python.lang.security.audit.sqli.asyncpg-sqli.asyncpg-sqli
semgrep
Author
1,688
Download Count*
License
Detected string concatenation with a non-literal variable in an asyncpg Python SQL statement. This could lead to SQL injection if the variable is user-controlled and not properly sanitized. In order to prevent SQL injection, use parameterized queries or prepared statements instead. You can create parameterized queries like so: 'conn.fetch("SELECT $1 FROM table", value)'. You can also create prepared statements with 'Connection.prepare': 'stmt = await conn.prepare("SELECT $1 FROM table"); await stmt.fetch(user_value)'
Run Locally
Run in CI
Definition
rules:
# Flags asyncpg query calls whose SQL argument is assembled with string
# concatenation, str.format, %-formatting or an f-string instead of being
# passed as a parameterized query.
- id: asyncpg-sqli
  languages:
  - python
  message: "Detected string concatenation with a non-literal variable in a asyncpg
    Python SQL statement. This could lead to SQL injection if the variable is
    user-controlled and not properly sanitized. In order to prevent SQL
    injection, use parameterized queries or prepared statements instead. You
    can create parameterized queries like so: 'conn.fetch(\"SELECT $1 FROM
    table\", value)'. You can also create prepared statements with
    'Connection.prepare': 'stmt = await conn.prepare(\"SELECT $1 FROM table\");
    await stmt.fetch(user_value)'"
  metadata:
    references:
    - https://github.com/MagicStack/asyncpg
    - https://magicstack.github.io/asyncpg/current/
    category: security
    cwe:
    - "CWE-89: Improper Neutralization of Special Elements used in an SQL
      Command ('SQL Injection')"
    technology:
    - asyncpg
    owasp:
    - A01:2017 - Injection
    - A03:2021 - Injection
    cwe2022-top25: true
    cwe2021-top25: true
    subcategory:
    - audit
    likelihood: LOW
    impact: HIGH
    confidence: LOW
    license: Commons Clause License Condition v1.0[LGPL-2.1-only]
    vulnerability_class:
    - SQL Injection
  patterns:
  # 1) The query argument is either a variable that was built dynamically
  #    earlier in scope, or a dynamic string expression written inline.
  - pattern-either:
    - patterns:
      - pattern: $CONN.$METHOD(...,$QUERY,...)
      - pattern-either:
        - pattern-inside: |
            $QUERY = $X + $Y
            ...
        - pattern-inside: |
            $QUERY += $X
            ...
        - pattern-inside: |
            $QUERY = '...'.format(...)
            ...
        - pattern-inside: |
            $QUERY = '...' % (...)
            ...
        - pattern-inside: |
            $QUERY = f'...{$USERINPUT}...'
            ...
      # Constant-only constructions of $QUERY are not reported.
      - pattern-not-inside: |
          $QUERY += "..."
          ...
      - pattern-not-inside: |
          $QUERY = "..." + "..."
          ...
      - pattern-not-inside: |
          $QUERY = '...'.format()
          ...
      - pattern-not-inside: |
          $QUERY = '...' % ()
          ...
    - pattern: $CONN.$METHOD(..., $X + $Y, ...)
    - pattern: $CONN.$METHOD(..., $Y.format(...), ...)
    - pattern: $CONN.$METHOD(..., '...'.format(...), ...)
    - pattern: $CONN.$METHOD(..., '...' % (...), ...)
    - pattern: $CONN.$METHOD(..., f'...{$USERINPUT}...', ...)
  # 2) $CONN must be recognisable as an asyncpg connection or pool.
  - pattern-either:
    - pattern-inside: |
        $CONN = await asyncpg.connect(...)
        ...
    - pattern-inside: |
        async with asyncpg.create_pool(...) as $CONN:
            ...
    - pattern-inside: |
        async with $POOL.acquire(...) as $CONN:
            ...
    - pattern-inside: |
        $CONN = await $POOL.acquire(...)
        ...
    - pattern-inside: |
        def $FUNCNAME(..., $CONN: Connection, ...):
            ...
    - pattern-inside: |
        def $FUNCNAME(..., $CONN: asyncpg.Connection, ...):
            ...
  # 3) Inline constant-only expressions are not reported.
  - pattern-not: $CONN.$METHOD(..., "..." + "...", ...)
  - pattern-not: $CONN.$METHOD(..., '...'.format(), ...)
  - pattern-not: $CONN.$METHOD(..., '...'%(), ...)
  # 4) Restrict $METHOD to asyncpg calls that take a SQL string.
  #    `copy_from_query` is the actual asyncpg method name; the previous
  #    spelling `copyfromquery` is kept so existing matches are unaffected.
  - metavariable-regex:
      metavariable: $METHOD
      regex: ^(fetch|fetchrow|fetchval|execute|executemany|prepare|cursor|copy_from_query|copyfromquery)$
  severity: WARNING
Examples
asyncpg-sqli.py
import asyncio
import asyncpg
async def bad1():
    """Rule fixture: a concatenated query variable is passed to ``conn.fetch``."""
    conn = await asyncpg.connect(user='user', password='password',
                                 database='database', host='127.0.0.1')
    query = "SELECT name FROM users WHERE age=" + req.FormValue("age")
    # ruleid: asyncpg-sqli
    values = await conn.fetch(query)
    await conn.close()
async def bad2(conn: Connection):
    """Rule fixture: a ``str.format``-built query variable is passed to ``conn.cursor``."""
    async with conn.transaction():
        sql_query = 'SELECT * FROM {}'.format(user_input)
        # ruleid: asyncpg-sqli
        cur = await conn.cursor(sql_query)
async def bad3(connection: Connection):
    """Rule fixture: a %-formatted query variable is passed to ``connection.execute``."""
    async with connection.transaction():
        sql_query = 'SELECT * FROM %s'%(user_input)
        # ruleid: asyncpg-sqli
        await connection.execute(sql_query)
async def bad4(user_input):
    """Rule fixture: an f-string query variable is passed to ``pool.fetch``."""
    async with asyncpg.create_pool(user='postgres',
                                   command_timeout=60) as pool:
        sql_query = f'SELECT * FROM {user_input}'
        # ruleid: asyncpg-sqli
        await pool.fetch(sql_query)
async def bad5():
    """Rule fixture: an inline concatenation is passed to ``con.execute`` on an acquired connection."""
    async with asyncpg.create_pool(user='postgres',
                                   command_timeout=60) as pool:
        async with pool.acquire() as con:
            # ruleid: asyncpg-sqli
            await con.execute("SELECT name FROM users WHERE age=" + req.FormValue("age"))
async def bad6(user_input):
    """Rule fixture: an inline ``str.format`` query is passed to ``con.execute``."""
    pool = await asyncpg.create_pool(user='postgres', command_timeout=60)
    con = await pool.acquire()
    try:
        # ruleid: asyncpg-sqli
        await con.execute('SELECT * FROM {}'.format(user_input))
    finally:
        # Hand the connection back to the pool whether or not the query failed.
        await pool.release(con)
async def bad7(conn: Connection, user_input):
    """Rule fixture: an inline %-formatted query is passed to ``conn.execute``."""
    # ruleid: asyncpg-sqli
    conn.execute('SELECT * FROM %s'%(user_input))
async def bad8(conn: Connection, user_input):
    """Rule fixture: an inline f-string query is passed to ``conn.fetchrow``."""
    # ruleid: asyncpg-sqli
    conn.fetchrow(f'SELECT * FROM {user_input}')
async def bad9(conn: Connection, user_input):
    """Rule fixture: a %-formatted statement with a non-tuple operand is passed to ``conn.execute``."""
    # ruleid: asyncpg-sqli
    conn.execute(
        "insert into %s values (%%s, %%s)" % ext.quote_ident(table_name),[10, 20])
async def bad10(conn: asyncpg.Connection):
    """Rule fixture: same as ``str.format`` + ``cursor``, with the parameter annotated ``asyncpg.Connection``."""
    async with conn.transaction():
        sql_query = 'SELECT * FROM {}'.format(user_input)
        # ruleid: asyncpg-sqli
        cur = await conn.cursor(sql_query)
def bad11(conn: asyncpg.Connection):
    """Rule fixture: ``.format`` is called on a non-literal template taken from another module."""
    import common
    # ruleid: asyncpg-sqli
    cur = conn.fetch(common.bad_query_1.format(user_input))
async def ok1(user_input):
    """Rule fixture (no finding): ``copy_from_query`` with a literal query and a bound ``$1`` argument."""
    con = await asyncpg.connect(user='postgres')
    # ok: asyncpg-sqli
    result = await con.copy_from_query(
        'SELECT foo, bar FROM mytable WHERE foo > $1', 10,
        output='file.csv', format='csv')
    print(result)
async def ok2(user_input):
    """Rule fixture (no finding): the query variable is a concatenation of two literals."""
    con = await asyncpg.connect(user='postgres')
    query = "SELECT name FROM users WHERE age=" + "3"
    # ok: asyncpg-sqli
    con.execute(query)
def ok3(con: Connection, user_input):
    """Rule fixture (no finding): the query variable is extended with ``+=`` using a literal."""
    query = "SELECT name FROM users WHERE age="
    query += "3"
    # ok: asyncpg-sqli
    con.execute(query)
async def ok4(user_input):
    """Rule fixture (no finding): the query variable comes from ``.format()`` with no arguments."""
    con = await asyncpg.connect(user='postgres')
    query = 'SELECT * FROM John'.format()
    # ok: asyncpg-sqli
    con.fetchval(query)
async def ok5(user_input):
    """Rule fixture (no finding): the query variable comes from ``%`` with an empty tuple."""
    con = await asyncpg.connect(user='postgres')
    query = 'SELECT * FROM John'% ()
    # ok: asyncpg-sqli
    con.execute(query)
def ok6(con: Connection, user_input):
    """Rule fixture (no finding): the query variable is an f-string with no placeholders."""
    query = f'SELECT * FROM John'
    # ok: asyncpg-sqli
    con.execute(query)
def ok7(con: Connection, user_input):
    """Rule fixture (no finding): an inline concatenation of two literals."""
    # ok: asyncpg-sqli
    con.execute("SELECT name FROM users WHERE age=" + "3")
async def ok8(user_input):
    """Rule fixture (no finding): an inline ``.format()`` with no arguments."""
    con = await asyncpg.connect(user='postgres')
    # ok: asyncpg-sqli
    con.execute('SELECT * FROM John'.format())
async def ok9(user_input):
    """Rule fixture (no finding): an inline ``%`` with an empty tuple."""
    con = await asyncpg.connect(user='postgres')
    # ok: asyncpg-sqli
    con.execute('SELECT * FROM John'% ())
async def ok10(user_input):
    """Rule fixture (no finding): an inline f-string with no placeholders."""
    con = await asyncpg.connect(user='postgres')
    # ok: asyncpg-sqli
    con.execute(f'SELECT * FROM John')
async def ok11(user_input):
    """Rule fixture (no finding): a literal statement is passed to ``con.prepare``."""
    con = await asyncpg.connect(user='postgres')
    # ok: asyncpg-sqli
    stmt = await con.prepare('SELECT ($1::int, $2::text)')
    print(stmt.get_parameters())
Short Link: https://sg.run/0nBB