python.aws-lambda.security.dangerous-asyncio-exec.dangerous-asyncio-exec

Author
unknown
Detected subprocess function '$LOOP.subprocess_exec' with an argument tainted by the `event` object. If a malicious actor can control this data, the call may be vulnerable to command injection. Audit the call to ensure that no externally controlled value reaches it. If the value must be passed to a shell, consider quoting it with 'shlex.quote()'.
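As a rough illustration of the quoting advice, the sketch below builds an argument list for `sh -c` in which the untrusted value is quoted as a single shell word. The helper name `build_shell_argv` and the `echo` command are hypothetical and are not part of the rule. Note that `shlex.quote()` is designed for POSIX shells only.

```python
import shlex


def build_shell_argv(untrusted):
    """Return an argv list for `sh -c` with the untrusted value quoted as one shell word.

    Hypothetical helper for illustration only.
    """
    # shlex.quote() wraps the value in single quotes, so characters such as
    # `;`, `|`, and `$` are passed to `echo` literally instead of being
    # interpreted by the shell.
    return ["sh", "-c", "echo " + shlex.quote(untrusted)]


argv = build_shell_argv("hello; rm -rf /tmp/x")
# The shell would parse the command string as two words: `echo` and the quoted value.
words = shlex.split(argv[2])
```

Quoting only matters when a shell interprets the string. When the program and its arguments are passed to `subprocess_exec` as separate values and no shell is involved, validating the values themselves is the more relevant control.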

Definition

rules:
  - id: dangerous-asyncio-exec
    mode: taint
    pattern-sources:
      - patterns:
          - pattern: event
          - pattern-inside: |
              def $HANDLER(event, context):
                ...
    pattern-sinks:
      - patterns:
          - focus-metavariable: $CMD
          - pattern-either:
              - pattern: $LOOP.subprocess_exec($PROTOCOL, $CMD, ...)
              - pattern: $LOOP.subprocess_exec($PROTOCOL, [$CMD, ...], ...)
              - pattern: $LOOP.subprocess_exec($PROTOCOL, "=~/(sh|bash|ksh|csh|tcsh|zsh)/",
                  "-c", $CMD, ...)
              - pattern: $LOOP.subprocess_exec($PROTOCOL, ["=~/(sh|bash|ksh|csh|tcsh|zsh)/",
                  "-c", $CMD, ...], ...)
    message: Detected subprocess function '$LOOP.subprocess_exec' with argument
      tainted by `event` object. If this data can be controlled by a malicious
      actor, it may be an instance of command injection. Audit the use of this
      call to ensure it is not controllable by an external resource. You may
      consider using 'shlex.quote()'.
    metadata:
      owasp:
        - A01:2017 - Injection
        - A03:2021 - Injection
      cwe:
        - "CWE-78: Improper Neutralization of Special Elements used in an OS
          Command ('OS Command Injection')"
      asvs:
        section: "V5: Validation, Sanitization and Encoding Verification Requirements"
        control_id: 5.3.8 OS Command Injection
        control_url: https://github.com/OWASP/ASVS/blob/master/4.0/en/0x13-V5-Validation-Sanitization-Encoding.md#v53-output-encoding-and-injection-prevention-requirements
        version: "4"
      references:
        - https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.subprocess_exec
        - https://docs.python.org/3/library/shlex.html
      category: security
      technology:
        - python
        - aws-lambda
      license: Commons Clause License Condition v1.0[LGPL-2.1-only]
      cwe2022-top25: true
      cwe2021-top25: true
      subcategory:
        - vuln
      likelihood: HIGH
      impact: MEDIUM
      confidence: MEDIUM
      vulnerability_class:
        - Command Injection
    languages:
      - python
    severity: ERROR

Examples

dangerous-asyncio-exec.py

import asyncio

class AsyncEventLoop:
    def __enter__(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        return self.loop

    def __exit__(self, *args):
        self.loop.close()

class WaitingProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future

    def process_exited(self):
        self.exit_future.set_result(True)

def handler(event, context):
    args = event['commands']
    with AsyncEventLoop() as loop:
        exit_future = asyncio.Future(loop=loop)
        # ruleid: dangerous-asyncio-exec
        transport, _ = loop.run_until_complete(loop.subprocess_exec(lambda: WaitingProtocol(exit_future), *args))
        loop.run_until_complete(exit_future)
        transport.close()

def lambda_handler(event, context):
    loop = asyncio.new_event_loop()
    exit_future = asyncio.Future(loop=loop)
    cmd = event['cmd']
    # ruleid: dangerous-asyncio-exec
    transport, _ = loop.run_until_complete(loop.subprocess_exec(lambda: WaitingProtocol(exit_future), ["bash", "-c", cmd]))
    loop.run_until_complete(exit_future)
    transport.close()

def ok_handler(event, context):
    loop = asyncio.new_event_loop()
    exit_future = asyncio.Future(loop=loop)
    # ok: dangerous-asyncio-exec
    transport, _ = loop.run_until_complete(loop.subprocess_exec(lambda: WaitingProtocol(exit_future), ["echo", "a"]))
    loop.run_until_complete(exit_future)
    transport.close()
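
For contrast with the flagged handlers above, here is a minimal sketch of one way to keep event data out of the command line entirely: the event only selects a key from a fixed allowlist, and the program and arguments come from that allowlist. The names `ALLOWED_COMMANDS`, `run_allowed`, and `safe_handler` and the `action` event field are assumptions made for this illustration. It uses the higher-level `asyncio.create_subprocess_exec` rather than `loop.subprocess_exec`, and `sys.executable` stands in for whatever fixed program a real function would run.

```python
import asyncio
import sys

# Hypothetical allowlist: each permitted action maps to a fixed argv list.
# Nothing from the event is ever placed into these lists.
ALLOWED_COMMANDS = {
    "version": [sys.executable, "--version"],
    "hello": [sys.executable, "-c", "print('hello')"],
}


async def run_allowed(action):
    """Run the fixed command registered for an allowlisted action."""
    argv = ALLOWED_COMMANDS[action]
    # No shell is involved: the program and its arguments are passed separately.
    proc = await asyncio.create_subprocess_exec(
        *argv, stdout=asyncio.subprocess.PIPE
    )
    stdout, _ = await proc.communicate()
    return proc.returncode, stdout.decode().strip()


def safe_handler(event, context):
    action = event.get("action")
    # Reject anything that is not a known action name before running a process.
    if not isinstance(action, str) or action not in ALLOWED_COMMANDS:
        return {"status": "rejected"}
    returncode, output = asyncio.run(run_allowed(action))
    return {"status": "ok", "returncode": returncode, "output": output}
```

With this shape, an event such as `{"action": "hello"}` runs the registered command, while any other value is rejected without starting a process.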