python.lang.security.audit.dangerous-asyncio-shell-tainted-env-args.dangerous-asyncio-shell-tainted-env-args

Author
unknown
Detected an asyncio subprocess function called with user-controlled data (environment variables or command-line arguments). Consider quoting the data with 'shlex.quote()'.
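The standard-library helper for shell quoting is `shlex.quote()`. The following is a minimal sketch of how it might be applied before calling `asyncio.create_subprocess_shell`; the function names `build_echo_command` and `run_echo` are hypothetical and chosen only for illustration. Note that `shlex.quote()` targets POSIX shells, and that the rule as written defines no sanitizers, so it may still report a call like this one.

```python
import asyncio
import shlex


def build_echo_command(untrusted: str) -> str:
    """Build a shell command line with the untrusted value quoted.

    shlex.quote() returns the value as a single literal word for a
    POSIX shell, so metacharacters such as ';' lose their meaning.
    """
    return "echo " + shlex.quote(untrusted)


async def run_echo(untrusted: str) -> str:
    """Run the quoted command through the shell and return its stdout."""
    proc = await asyncio.create_subprocess_shell(
        build_echo_command(untrusted),
        stdout=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    return stdout.decode()
```

With this sketch, an input such as `hi; id` reaches the shell as one quoted argument to `echo` instead of as two commands.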
Definition
rules:
- id: dangerous-asyncio-shell-tainted-env-args
  mode: taint
  options:
    symbolic_propagation: true
  pattern-sources:
  - patterns:
    - pattern-either:
      - patterns:
        - pattern-either:
          - pattern: os.environ
          - pattern: os.environ.get('$FOO', ...)
          - pattern: os.environb
          - pattern: os.environb.get('$FOO', ...)
          - pattern: os.getenv('$ANYTHING', ...)
          - pattern: os.getenvb('$ANYTHING', ...)
      - patterns:
        - pattern-either:
          - patterns:
            - pattern-either:
              - pattern: sys.argv
              - pattern: sys.orig_argv
          - patterns:
            - pattern-inside: |
                $PARSER = argparse.ArgumentParser(...)
                ...
            - pattern-inside: |
                $ARGS = $PARSER.parse_args()
            - pattern: <... $ARGS ...>
          - patterns:
            - pattern-inside: |
                $PARSER = optparse.OptionParser(...)
                ...
            - pattern-inside: |
                $ARGS = $PARSER.parse_args()
            - pattern: <... $ARGS ...>
          - patterns:
            - pattern-either:
              - pattern-inside: |
                  $OPTS, $ARGS = getopt.getopt(...)
                  ...
              - pattern-inside: |
                  $OPTS, $ARGS = getopt.gnu_getopt(...)
                  ...
            - pattern-either:
              - patterns:
                - pattern-inside: |
                    for $O, $A in $OPTS:
                      ...
                - pattern: $A
              - pattern: $ARGS
  pattern-sinks:
  - patterns:
    - pattern-either:
      - pattern-inside: $LOOP.subprocess_shell($PROTOCOL, $CMD)
      - pattern-inside: asyncio.subprocess.create_subprocess_shell($CMD, ...)
      - pattern-inside: asyncio.create_subprocess_shell($CMD, ...)
    - focus-metavariable: $CMD
    - pattern-not-inside: |
        $CMD = "..."
        ...
    - pattern-not: $LOOP.subprocess_shell($PROTOCOL, "...")
    - pattern-not: asyncio.subprocess.create_subprocess_shell("...", ...)
    - pattern-not: asyncio.create_subprocess_shell("...", ...)
  message: Detected asyncio subprocess function with user controlled data. You may
    consider using 'shlex.quote()'.
  metadata:
    owasp:
    - A01:2017 - Injection
    - A03:2021 - Injection
    cwe:
    - "CWE-78: Improper Neutralization of Special Elements used in an OS
      Command ('OS Command Injection')"
    asvs:
      section: "V5: Validation, Sanitization and Encoding Verification Requirements"
      control_id: 5.3.8 OS Command Injection
      control_url: https://github.com/OWASP/ASVS/blob/master/4.0/en/0x13-V5-Validation-Sanitization-Encoding.md#v53-output-encoding-and-injection-prevention-requirements
      version: "4"
    references:
    - https://docs.python.org/3/library/asyncio-subprocess.html
    - https://docs.python.org/3/library/shlex.html
    - https://semgrep.dev/docs/cheat-sheets/python-command-injection/
    category: security
    technology:
    - python
    confidence: MEDIUM
    cwe2022-top25: true
    cwe2021-top25: true
    subcategory:
    - vuln
    likelihood: MEDIUM
    impact: HIGH
    license: Commons Clause License Condition v1.0[LGPL-2.1-only]
  languages:
  - python
  severity: ERROR
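To make the `pattern-sources` section more concrete, the sketch below shows three ways a program might obtain values that roughly correspond to those patterns: an environment lookup, `argparse`, and `getopt`. The function names, the `TARGET_HOST` variable, and the `--host` / `-h` options are all hypothetical; whether Semgrep reports a given flow still depends on the value reaching one of the sinks.

```python
import argparse
import getopt
import os
import sys


def from_environment():
    # Corresponds to `os.environ.get('$FOO', ...)`: a lookup by literal name.
    # TARGET_HOST is a made-up variable name for this sketch.
    return os.environ.get("TARGET_HOST", "")


def from_argparse():
    # Corresponds to the argparse source: the variable assigned from
    # `$PARSER.parse_args()` is the taint source, and values read from it
    # (here `args.host`) would be reached by taint propagation.
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", default="")
    args = parser.parse_args()
    return args.host


def from_getopt():
    # Corresponds to the getopt source: option values ($A inside the
    # `for $O, $A in $OPTS` loop) and the leftover arguments ($ARGS).
    opts, args = getopt.getopt(sys.argv[1:], "h:")
    values = []
    for _flag, value in opts:
        values.append(value)
    return values + args
```

Passing any of these return values as the command string of `asyncio.create_subprocess_shell` is the kind of flow the rule is meant to catch.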
Examples
dangerous-asyncio-shell-tainted-env-args.py
import asyncio
import sys


class AsyncEventLoop:
    def __enter__(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        return self.loop

    def __exit__(self, *args):
        self.loop.close()


class WaitingProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future

    def process_exited(self):
        self.exit_future.set_result(True)


def vuln0():
    shell_command = sys.argv[2]
    with AsyncEventLoop() as loop:
        exit_future = asyncio.Future(loop=loop)
        transport, _ = loop.run_until_complete(
            # ruleid: dangerous-asyncio-shell-tainted-env-args
            loop.subprocess_shell(lambda: WaitingProtocol(exit_future), shell_command)
        )
        loop.run_until_complete(exit_future)
        transport.close()


def vuln1(shell_command):
    with AsyncEventLoop() as loop:
        exit_future = asyncio.Future(loop=loop)
        transport, _ = loop.run_until_complete(
            # fn: dangerous-asyncio-shell-tainted-env-args
            loop.subprocess_shell(lambda: WaitingProtocol(exit_future), shell_command)
        )
        loop.run_until_complete(exit_future)
        transport.close()


def vuln2(shell_command):
    with AsyncEventLoop() as loop:
        proc = loop.run_until_complete(
            # fn: dangerous-asyncio-shell-tainted-env-args
            asyncio.subprocess.create_subprocess_shell(shell_command)
        )
        loop.run_until_complete(proc.wait())


def ok1():
    shell_command = 'echo "Hello world"'
    with AsyncEventLoop() as loop:
        exit_future = asyncio.Future(loop=loop)
        # ok: dangerous-asyncio-shell-tainted-env-args
        transport, _ = loop.run_until_complete(
            loop.subprocess_shell(lambda: WaitingProtocol(exit_future), shell_command)
        )
        loop.run_until_complete(exit_future)
        transport.close()


def ok2():
    with AsyncEventLoop() as loop:
        # ok: dangerous-asyncio-shell-tainted-env-args
        proc = loop.run_until_complete(
            asyncio.subprocess.create_subprocess_shell('echo "foobar"')
        )
        loop.run_until_complete(proc.wait())
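As an alternative to quoting, the shell can be avoided entirely. The sketch below is not part of the rule's test file; it assumes `asyncio.create_subprocess_exec`, which passes each argument to the child process directly so that no shell parses the untrusted value. The function name `run_without_shell` and the child command (the current Python interpreter echoing its first argument) are hypothetical.

```python
import asyncio
import sys


async def run_without_shell(untrusted_arg: str) -> str:
    """Pass untrusted data as a separate argv entry instead of a shell string."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable,
        "-c",
        "import sys; print(sys.argv[1])",
        untrusted_arg,  # delivered verbatim; no shell interprets it
        stdout=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    return stdout.decode()
```

Because `create_subprocess_exec` is not one of the sinks listed in the rule, a call like this would not be matched by it.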
Short Link: https://sg.run/Dx8Y