python.lang.security.audit.dangerous-asyncio-create-exec-tainted-env-args.dangerous-asyncio-create-exec-tainted-env-args

Detected 'create_subprocess_exec' function with user controlled data. You may consider using 'shlex.quote()'.
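
The standard library's quoting helper is `shlex.quote()`. Since `create_subprocess_exec` does not itself invoke a shell, quoting mainly matters when the program being launched is a shell run with `-c`. The following is a minimal sketch under that assumption; `build_shell_argv` is a hypothetical helper name, not part of the rule or of any library.

```python
import shlex


def build_shell_argv(user_value):
    """Build an argv list that echoes user_value through `sh -c`."""
    # shlex.quote() returns a shell-escaped version of the string, so the
    # shell parses it as a single literal word instead of as syntax.
    return ["sh", "-c", "echo " + shlex.quote(user_value)]


argv = build_shell_argv("hello; touch /tmp/pwned")
# shlex.split() applies shell-like parsing; the payload stays one token.
tokens = shlex.split(argv[2])
```

The resulting list could be unpacked into `asyncio.create_subprocess_exec(*argv)`. Where possible, passing the value as its own argv element to a non-shell program avoids the need for quoting altogether.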

Definition

rules:
  - id: dangerous-asyncio-create-exec-tainted-env-args
    mode: taint
    options:
      symbolic_propagation: true
    pattern-sources:
      - patterns:
          - pattern-either:
              - patterns:
                  - pattern-either:
                      - pattern: os.environ
                      - pattern: os.environ.get('$FOO', ...)
                      - pattern: os.environb
                      - pattern: os.environb.get('$FOO', ...)
                      - pattern: os.getenv('$ANYTHING', ...)
                      - pattern: os.getenvb('$ANYTHING', ...)
              - patterns:
                  - pattern-either:
                      - patterns:
                          - pattern-either:
                              - pattern: sys.argv
                              - pattern: sys.orig_argv
                      - patterns:
                          - pattern-inside: |
                              $PARSER = argparse.ArgumentParser(...)
                              ...
                          - pattern-inside: |
                              $ARGS = $PARSER.parse_args()
                          - pattern: <... $ARGS ...>
                      - patterns:
                          - pattern-inside: |
                              $PARSER = optparse.OptionParser(...)
                              ...
                          - pattern-inside: |
                              $ARGS = $PARSER.parse_args()
                          - pattern: <... $ARGS ...>
                      - patterns:
                          - pattern-either:
                              - pattern-inside: |
                                  $OPTS, $ARGS = getopt.getopt(...)
                                  ...
                              - pattern-inside: |
                                  $OPTS, $ARGS = getopt.gnu_getopt(...)
                                  ...
                          - pattern-either:
                              - patterns:
                                  - pattern-inside: |
                                      for $O, $A in $OPTS:
                                        ...
                                  - pattern: $A
                              - pattern: $ARGS
    pattern-sinks:
      - pattern-either:
          - patterns:
              - pattern-not: asyncio.create_subprocess_exec($PROG, "...", ...)
              - pattern-not: asyncio.create_subprocess_exec($PROG, ["...",...], ...)
              - pattern: asyncio.create_subprocess_exec(...)
          - patterns:
              - pattern-not: asyncio.create_subprocess_exec($PROG,
                  "=~/(sh|bash|ksh|csh|tcsh|zsh)/", "-c", "...", ...)
              - pattern: asyncio.create_subprocess_exec($PROG, "=~/(sh|bash|ksh|csh|tcsh|zsh)/",
                  "-c",...)
          - patterns:
              - pattern-not: asyncio.create_subprocess_exec($PROG,
                  ["=~/(sh|bash|ksh|csh|tcsh|zsh)/", "-c", "...", ...], ...)
              - pattern: asyncio.create_subprocess_exec($PROG,
                  ["=~/(sh|bash|ksh|csh|tcsh|zsh)/", "-c", ...], ...)
          - patterns:
              - pattern-not: asyncio.subprocess.create_subprocess_exec($PROG, "...", ...)
              - pattern-not: asyncio.subprocess.create_subprocess_exec($PROG, ["...",...], ...)
              - pattern: asyncio.subprocess.create_subprocess_exec(...)
          - patterns:
              - pattern-not: asyncio.subprocess.create_subprocess_exec($PROG,
                  "=~/(sh|bash|ksh|csh|tcsh|zsh)/", "-c", "...", ...)
              - pattern: asyncio.subprocess.create_subprocess_exec($PROG,
                  "=~/(sh|bash|ksh|csh|tcsh|zsh)/", "-c",...)
          - patterns:
              - pattern-not: asyncio.subprocess.create_subprocess_exec($PROG,
                  ["=~/(sh|bash|ksh|csh|tcsh|zsh)/", "-c", "...", ...], ...)
              - pattern: asyncio.subprocess.create_subprocess_exec($PROG,
                  ["=~/(sh|bash|ksh|csh|tcsh|zsh)/", "-c", ...], ...)
    message: Detected 'create_subprocess_exec' function with user controlled data.
      You may consider using 'shlex.quote()'.
    metadata:
      owasp:
        - A01:2017 - Injection
        - A03:2021 - Injection
      cwe:
        - "CWE-78: Improper Neutralization of Special Elements used in an OS
          Command ('OS Command Injection')"
      asvs:
        section: "V5: Validation, Sanitization and Encoding Verification Requirements"
        control_id: 5.3.8 OS Command Injection
        control_url: https://github.com/OWASP/ASVS/blob/master/4.0/en/0x13-V5-Validation-Sanitization-Encoding.md#v53-output-encoding-and-injection-prevention-requirements
        version: "4"
      references:
        - https://docs.python.org/3/library/asyncio-subprocess.html#asyncio.create_subprocess_exec
        - https://docs.python.org/3/library/shlex.html
        - https://semgrep.dev/docs/cheat-sheets/python-command-injection/
      category: security
      technology:
        - python
      confidence: LOW
      cwe2022-top25: true
      cwe2021-top25: true
      subcategory:
        - vuln
      likelihood: LOW
      impact: MEDIUM
      license: Commons Clause License Condition v1.0[LGPL-2.1-only]
      vulnerability_class:
        - Command Injection
    languages:
      - python
    severity: ERROR
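
Several of the sink patterns above look for a shell name followed by "-c" among the arguments after $PROG. The check can be mirrored in plain Python as a rough sketch. It assumes the `=~/.../` string pattern behaves like an unanchored regular-expression search (an assumption, not something this page states), and `looks_like_shell_dash_c` is a hypothetical helper name.

```python
import re

# Same alternation the rule uses for shell names.
SHELL_RE = re.compile(r"(sh|bash|ksh|csh|tcsh|zsh)")


def looks_like_shell_dash_c(args):
    """Return True when args (the arguments after the program) start
    with a shell-like name followed by "-c"."""
    return (
        len(args) >= 2
        and isinstance(args[0], str)
        # Assumption: search semantics, so the name may match anywhere.
        and SHELL_RE.search(args[0]) is not None
        and args[1] == "-c"
    )
```

Under this assumption a name such as "fish" also matches, because it contains "sh". That errs on the side of reporting more calls rather than fewer.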

Examples

dangerous-asyncio-create-exec-tainted-env-args.py

import asyncio
import sys


class AsyncEventLoop:
    def __enter__(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        return self.loop

    def __exit__(self, *args):
        self.loop.close()


def vuln1():
    args = get_user_input()
    program = args[0]
    with AsyncEventLoop() as loop:
        # fn: dangerous-asyncio-create-exec-tainted-env-args
        proc = loop.run_until_complete(
            asyncio.subprocess.create_subprocess_exec(program, *args)
        )
        loop.run_until_complete(proc.communicate())


def vuln2():
    program = "bash"
    loop = asyncio.new_event_loop()
    proc = loop.run_until_complete(
        # ruleid: dangerous-asyncio-create-exec-tainted-env-args
        asyncio.subprocess.create_subprocess_exec(program, [program, "-c", sys.argv[1]])
    )
    loop.run_until_complete(proc.communicate())


def ok1():
    program = "echo"
    loop = asyncio.new_event_loop()
    # ok: dangerous-asyncio-create-exec-tainted-env-args
    proc = loop.run_until_complete(
        asyncio.subprocess.create_subprocess_exec(program, [program, "123"])
    )
    loop.run_until_complete(proc.communicate())
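
One possible way to harden code like the vulnerable examples above is to resolve the program name through a fixed allowlist before it reaches `create_subprocess_exec`. This is a minimal sketch, not the rule authors' recommended fix; `ALLOWED_PROGRAMS`, `resolve_program`, and `run_allowed` are hypothetical names, and the sketch has not been checked against the rule itself.

```python
import asyncio
import sys

# Hypothetical allowlist mapping user-facing names to trusted executables.
ALLOWED_PROGRAMS = {"python": sys.executable}


def resolve_program(name):
    """Map a user-supplied name to a trusted executable path, or raise."""
    try:
        return ALLOWED_PROGRAMS[name]
    except KeyError:
        raise ValueError(f"program not allowed: {name!r}") from None


async def run_allowed(name, *args):
    """Run an allowlisted program; args are passed as separate argv items."""
    program = resolve_program(name)
    proc = await asyncio.create_subprocess_exec(
        program, *args, stdout=asyncio.subprocess.PIPE
    )
    stdout, _ = await proc.communicate()
    return proc.returncode, stdout.decode()
```

A call such as `asyncio.run(run_allowed("python", "-c", "print('hi')"))` runs the interpreter, while `run_allowed("bash", ...)` raises `ValueError` before any process is started. The arguments are still user influenced, so they may need their own validation depending on the program.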