python.lang.security.audit.dangerous-asyncio-create-exec.dangerous-asyncio-create-exec

profile photo of returntocorpreturntocorp
Author
1,200
Download Count*

Detected 'create_subprocess_exec' function without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Run Locally

Run in CI

Defintion

rules:
  - id: dangerous-asyncio-create-exec
    pattern-either:
      - patterns:
          - pattern-not: asyncio.create_subprocess_exec($PROG, "...", ...)
          - pattern-not: asyncio.create_subprocess_exec($PROG, ["...",...], ...)
          - pattern: asyncio.create_subprocess_exec(...)
      - patterns:
          - pattern-not: asyncio.create_subprocess_exec($PROG,
              "=~/(sh|bash|ksh|csh|tcsh|zsh)/", "-c", "...", ...)
          - pattern: asyncio.create_subprocess_exec($PROG, "=~/(sh|bash|ksh|csh|tcsh|zsh)/",
              "-c",...)
      - patterns:
          - pattern-not: asyncio.create_subprocess_exec($PROG,
              ["=~/(sh|bash|ksh|csh|tcsh|zsh)/", "-c", "...", ...], ...)
          - pattern: asyncio.create_subprocess_exec($PROG,
              ["=~/(sh|bash|ksh|csh|tcsh|zsh)/", "-c", ...], ...)
      - patterns:
          - pattern-not: asyncio.subprocess.create_subprocess_exec($PROG, "...", ...)
          - pattern-not: asyncio.subprocess.create_subprocess_exec($PROG, ["...",...], ...)
          - pattern: asyncio.subprocess.create_subprocess_exec(...)
      - patterns:
          - pattern-not: asyncio.subprocess.create_subprocess_exec($PROG,
              "=~/(sh|bash|ksh|csh|tcsh|zsh)/", "-c", "...", ...)
          - pattern: asyncio.subprocess.create_subprocess_exec($PROG,
              "=~/(sh|bash|ksh|csh|tcsh|zsh)/", "-c",...)
      - patterns:
          - pattern-not: asyncio.subprocess.create_subprocess_exec($PROG,
              ["=~/(sh|bash|ksh|csh|tcsh|zsh)/", "-c", "...", ...], ...)
          - pattern: asyncio.subprocess.create_subprocess_exec($PROG,
              ["=~/(sh|bash|ksh|csh|tcsh|zsh)/", "-c", ...], ...)
    message: Detected 'create_subprocess_exec' function without a static string. If
      this data can be controlled by a malicious actor, it may be an instance of
      command injection. Audit the use of this call to ensure it is not
      controllable by an external resource. You may consider using
      'shlex.escape()'.
    metadata:
      owasp: "A1: Injection"
      cwe: "CWE-78: Improper Neutralization of Special Elements used in an OS Command
        ('OS Command Injection')"
      asvs:
        section: "V5: Validation, Sanitization and Encoding Verification Requirements"
        control_id: 5.3.8 OS Command Injection
        control_url: https://github.com/OWASP/ASVS/blob/master/4.0/en/0x13-V5-Validation-Sanitization-Encoding.md#v53-output-encoding-and-injection-prevention-requirements
        version: "4"
      references:
        - https://docs.python.org/3/library/asyncio-subprocess.html#asyncio.create_subprocess_exec
        - https://docs.python.org/3/library/shlex.html
      category: security
      technology:
        - python
      license: Commons Clause License Condition v1.0[LGPL-2.1-only]
    languages:
      - python
    severity: ERROR

Examples

dangerous-asyncio-create-exec.py

import asyncio

class AsyncEventLoop:
    def __enter__(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        return self.loop

    def __exit__(self, *args):
        self.loop.close()

def vuln1():
    args = get_user_input()
    program = args[0]
    with AsyncEventLoop() as loop:
        # ruleid: dangerous-asyncio-create-exec
        proc = loop.run_until_complete(asyncio.subprocess.create_subprocess_exec(program, *args))
        loop.run_until_complete(proc.communicate())

def vuln2():
    program = "bash"
    loop = asyncio.new_event_loop()
    # ruleid: dangerous-asyncio-create-exec
    proc = loop.run_until_complete(asyncio.subprocess.create_subprocess_exec(program, [program, "-c", sys.argv[1]]))
    loop.run_until_complete(proc.communicate())

def ok1():
    program = "echo"
    loop = asyncio.new_event_loop()
    # ok: dangerous-asyncio-create-exec
    proc = loop.run_until_complete(asyncio.subprocess.create_subprocess_exec(program, [program, "123"]))
    loop.run_until_complete(proc.communicate())