python.lang.security.audit.dangerous-asyncio-exec-tainted-env-args.dangerous-asyncio-exec-tainted-env-args

Author
unknown
Detected subprocess function '$LOOP.subprocess_exec' with user-controlled data. You may consider using 'shlex.quote()'.
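The quoting helper in Python's standard library is `shlex.quote()`, which is relevant when an untrusted value has to be embedded in a `sh -c` command string. The sketch below is illustrative only: `build_shell_argv` is a hypothetical helper, not part of the rule or of asyncio, and `shlex.quote()` is documented for POSIX shells only.

```python
import shlex


def build_shell_argv(user_value):
    """Return an argv for `sh -c` with the untrusted value quoted as one shell word.

    Hypothetical helper for illustration; the name is not part of any library.
    """
    # shlex.quote() wraps the value so a POSIX shell treats it as a single
    # literal token instead of interpreting metacharacters such as ';'.
    return ["sh", "-c", "echo " + shlex.quote(user_value)]
```

Values with no shell metacharacters come back unchanged, so `build_shell_argv("plain")` yields `["sh", "-c", "echo plain"]`. Where possible, passing the value as its own argv element without a shell avoids the need for quoting altogether.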
Definition
rules:
- id: dangerous-asyncio-exec-tainted-env-args
  mode: taint
  options:
    symbolic_propagation: true
  pattern-sources:
  - patterns:
    - pattern-either:
      - patterns:
        - pattern-either:
          - pattern: os.environ
          - pattern: os.environ.get('$FOO', ...)
          - pattern: os.environb
          - pattern: os.environb.get('$FOO', ...)
          - pattern: os.getenv('$ANYTHING', ...)
          - pattern: os.getenvb('$ANYTHING', ...)
      - patterns:
        - pattern-either:
          - patterns:
            - pattern-either:
              - pattern: sys.argv
              - pattern: sys.orig_argv
          - patterns:
            - pattern-inside: |
                $PARSER = argparse.ArgumentParser(...)
                ...
            - pattern-inside: |
                $ARGS = $PARSER.parse_args()
            - pattern: <... $ARGS ...>
          - patterns:
            - pattern-inside: |
                $PARSER = optparse.OptionParser(...)
                ...
            - pattern-inside: |
                $ARGS = $PARSER.parse_args()
            - pattern: <... $ARGS ...>
          - patterns:
            - pattern-either:
              - pattern-inside: |
                  $OPTS, $ARGS = getopt.getopt(...)
                  ...
              - pattern-inside: |
                  $OPTS, $ARGS = getopt.gnu_getopt(...)
                  ...
            - pattern-either:
              - patterns:
                - pattern-inside: |
                    for $O, $A in $OPTS:
                      ...
                - pattern: $A
              - pattern: $ARGS
  pattern-sinks:
  - pattern-either:
    - patterns:
      - pattern-not: $LOOP.subprocess_exec($PROTOCOL, "...", ...)
      - pattern-not: $LOOP.subprocess_exec($PROTOCOL, ["...",...], ...)
      - pattern: $LOOP.subprocess_exec(...)
    - patterns:
      - pattern-not: $LOOP.subprocess_exec($PROTOCOL, "=~/(sh|bash|ksh|csh|tcsh|zsh)/",
          "-c", "...", ...)
      - pattern: $LOOP.subprocess_exec($PROTOCOL, "=~/(sh|bash|ksh|csh|tcsh|zsh)/",
          "-c",...)
    - patterns:
      - pattern-not: $LOOP.subprocess_exec($PROTOCOL, ["=~/(sh|bash|ksh|csh|tcsh|zsh)/",
          "-c", "...", ...], ...)
      - pattern: $LOOP.subprocess_exec($PROTOCOL, ["=~/(sh|bash|ksh|csh|tcsh|zsh)/",
          "-c", ...], ...)
  message: Detected subprocess function '$LOOP.subprocess_exec' with
    user-controlled data. You may consider using 'shlex.quote()'.
  metadata:
    owasp:
    - A01:2017 - Injection
    - A03:2021 - Injection
    cwe:
    - "CWE-78: Improper Neutralization of Special Elements used in an OS
      Command ('OS Command Injection')"
    asvs:
      section: "V5: Validation, Sanitization and Encoding Verification Requirements"
      control_id: 5.3.8 OS Command Injection
      control_url: https://github.com/OWASP/ASVS/blob/master/4.0/en/0x13-V5-Validation-Sanitization-Encoding.md#v53-output-encoding-and-injection-prevention-requirements
      version: "4"
    references:
    - https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.subprocess_exec
    - https://docs.python.org/3/library/shlex.html
    - https://semgrep.dev/docs/cheat-sheets/python-command-injection/
    category: security
    technology:
    - python
    confidence: MEDIUM
    cwe2022-top25: true
    cwe2021-top25: true
    subcategory:
    - vuln
    likelihood: MEDIUM
    impact: HIGH
    license: Commons Clause License Condition v1.0[LGPL-2.1-only]
  languages:
  - python
  severity: ERROR
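The getopt branch of pattern-sources treats two things as user-controlled: each option value `$A` obtained by iterating over `$OPTS`, and the leftover positional list `$ARGS`. A minimal sketch of what those two values contain at runtime follows; `split_cli` and the option string `"o:v"` are hypothetical and chosen only for illustration.

```python
import getopt


def split_cli(argv):
    """Parse argv with getopt and return the two parts the rule treats as tainted.

    Hypothetical helper; "o:v" declares -o (takes a value) and -v (a flag).
    """
    opts, args = getopt.getopt(argv, "o:v")
    # Each entry of opts is a (flag, value) pair; the value corresponds to $A.
    option_values = [value for _flag, value in opts]
    # args holds the remaining positional arguments and corresponds to $ARGS.
    return option_values, args
```

Calling `split_cli(["-v", "-o", "out.txt", "extra"])` returns `(["", "out.txt"], ["extra"])`: both lists come straight from the command line, which is why the rule marks either one as a source.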
Examples
dangerous-asyncio-exec-tainted-env-args.py
import asyncio
import sys


class AsyncEventLoop:
    def __enter__(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        return self.loop

    def __exit__(self, *args):
        self.loop.close()


class WaitingProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future

    def process_exited(self):
        self.exit_future.set_result(True)


def vuln1():
    args = get_user_input()
    with AsyncEventLoop() as loop:
        exit_future = asyncio.Future(loop=loop)
        # fn: dangerous-asyncio-exec-tainted-env-args
        transport, _ = loop.run_until_complete(
            loop.subprocess_exec(lambda: WaitingProtocol(exit_future), *args)
        )
        loop.run_until_complete(exit_future)
        transport.close()


def vuln2():
    loop = asyncio.new_event_loop()
    exit_future = asyncio.Future(loop=loop)
    transport, _ = loop.run_until_complete(
        # ruleid: dangerous-asyncio-exec-tainted-env-args
        loop.subprocess_exec(
            lambda: WaitingProtocol(exit_future), ["bash", "-c", sys.argv[1]]
        )
    )
    loop.run_until_complete(exit_future)
    transport.close()


def ok1():
    loop = asyncio.new_event_loop()
    exit_future = asyncio.Future(loop=loop)
    # ok: dangerous-asyncio-exec-tainted-env-args
    transport, _ = loop.run_until_complete(
        loop.subprocess_exec(lambda: WaitingProtocol(exit_future), ["echo", "a"])
    )
    loop.run_until_complete(exit_future)
    transport.close()
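One way to reduce the risk the examples above illustrate is to let untrusted input select from a fixed set of commands instead of supplying the argv itself. The sketch below assumes a hypothetical allowlist (`ALLOWED_COMMANDS`) and hypothetical helper names (`select_command`, `run_allowed`); it is not taken from the rule, and whether the rule still reports such code depends on how the taint analysis treats the lookup.

```python
import asyncio
import sys

# Hypothetical allowlist: untrusted input picks a key, never the argv itself.
ALLOWED_COMMANDS = {
    "version": [sys.executable, "--version"],
    "hello": [sys.executable, "-c", "print('hello')"],
}


def select_command(name):
    """Map an untrusted name to a fixed argv, rejecting anything unknown."""
    try:
        # Return a copy so callers cannot mutate the allowlist entry.
        return list(ALLOWED_COMMANDS[name])
    except KeyError:
        raise ValueError(f"unsupported command: {name!r}") from None


async def run_allowed(name):
    """Spawn the selected argv via loop.subprocess_exec and return its exit code."""
    loop = asyncio.get_running_loop()
    exited = loop.create_future()

    class ExitProtocol(asyncio.SubprocessProtocol):
        def process_exited(self):
            # Called by the event loop once the child process has exited.
            exited.set_result(True)

    transport, _ = await loop.subprocess_exec(ExitProtocol, *select_command(name))
    await exited
    code = transport.get_returncode()
    transport.close()
    return code
```

A caller could run `asyncio.run(run_allowed("hello"))` with a name taken from `sys.argv`; any name outside the allowlist raises `ValueError` before a process is started.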
Short Link: https://sg.run/Apjp