python.lang.security.audit.sqli.aiopg-sqli.aiopg-sqli
semgrep
Author
1,688
Download Count*
License
Detected string concatenation with a non-literal variable in an aiopg Python SQL statement. This could lead to SQL injection if the variable is user-controlled and not properly sanitized. In order to prevent SQL injection, use parameterized queries instead. You can create parameterized queries like so: 'cur.execute("SELECT %s FROM table", (user_value,))'.
Run Locally
Run in CI
Definition
# Semgrep rule: reports `execute` calls on an aiopg cursor whose query argument
# is built with string concatenation or string formatting.
rules:
- id: aiopg-sqli
  languages:
  - python
  message: "Detected string concatenation with a non-literal variable in an aiopg
    Python SQL statement. This could lead to SQL injection if the variable is
    user-controlled and not properly sanitized. In order to prevent SQL
    injection, use parameterized queries instead. You can create parameterized
    queries like so: 'cur.execute(\"SELECT %s FROM table\", (user_value,))'."
  metadata:
    references:
    - https://github.com/aio-libs/aiopg
    category: security
    cwe:
    - "CWE-89: Improper Neutralization of Special Elements used in an SQL
      Command ('SQL Injection')"
    technology:
    - aiopg
    owasp:
    - A01:2017 - Injection
    - A03:2021 - Injection
    cwe2022-top25: true
    cwe2021-top25: true
    subcategory:
    - audit
    likelihood: LOW
    impact: HIGH
    confidence: LOW
    license: Commons Clause License Condition v1.0[LGPL-2.1-only]
    vulnerability_class:
    - SQL Injection
  # All entries under `patterns` must hold for a finding to be reported.
  patterns:
  # 1) The shape of the call: either a variable that was built dynamically
  #    earlier, or a dynamically built string passed inline.
  - pattern-either:
    - patterns:
      - pattern: $CUR.$METHOD(...,$QUERY,...)
      # $QUERY was assigned via `+`, `+=`, `.format(...)`, `%` or an f-string
      # with an interpolated expression.
      - pattern-either:
        - pattern-inside: |
            $QUERY = $X + $Y
            ...
        - pattern-inside: |
            $QUERY += $X
            ...
        - pattern-inside: |
            $QUERY = '...'.format(...)
            ...
        - pattern-inside: |
            $QUERY = '...' % (...)
            ...
        - pattern-inside: |
            $QUERY = f'...{$USERINPUT}...'
            ...
      # Exclude assignments built only from string literals or from
      # formatting calls with no arguments.
      - pattern-not-inside: |
          $QUERY += "..."
          ...
      - pattern-not-inside: |
          $QUERY = "..." + "..."
          ...
      - pattern-not-inside: |
          $QUERY = '...'.format()
          ...
      - pattern-not-inside: |
          $QUERY = '...' % ()
          ...
    # Inline variants: the dynamic string is an argument of the call itself.
    - pattern: $CUR.$METHOD(..., $X + $Y, ...)
    - pattern: $CUR.$METHOD(..., '...'.format(...), ...)
    - pattern: $CUR.$METHOD(..., '...' % (...), ...)
    - pattern: $CUR.$METHOD(..., f'...{$USERINPUT}...', ...)
  # 2) $CUR must come from one of these aiopg connection / pool usages.
  - pattern-either:
    - pattern-inside: |
        $CONN = await aiopg.connect(...)
        ...
        $CUR = await $CONN.cursor(...)
        ...
    - pattern-inside: |
        $POOL = await aiopg.create_pool(...)
        ...
        async with $POOL.acquire(...) as $CONN:
          ...
          async with $CONN.cursor(...) as $CUR:
            ...
    - pattern-inside: |
        $POOL = await aiopg.create_pool(...)
        ...
        with (await $POOL.cursor(...)) as $CUR:
          ...
    - pattern-inside: |
        $POOL = await aiopg.create_pool(...)
        ...
        async with $POOL as $CONN:
          ...
          $CUR = await $CONN.cursor(...)
          ...
    - pattern-inside: |
        $POOL = await aiopg.create_pool(...)
        ...
        async with $POOL.cursor(...) as $CUR:
          ...
  # 3) Inline exclusions mirroring the literal-only cases above.
  - pattern-not: $CUR.$METHOD(..., "..." + "...", ...)
  - pattern-not: $CUR.$METHOD(..., '...'.format(), ...)
  - pattern-not: $CUR.$METHOD(..., '...'%(), ...)
  # 4) Only calls whose method name is exactly `execute` are reported.
  - metavariable-regex:
      metavariable: $METHOD
      regex: ^(execute)$
  severity: WARNING
Examples
aiopg-sqli.py
import asyncio
import asyncpg
async def bad1():
    """Fixture (expected finding): query concatenated with ``+`` then executed.

    Declared ``async`` because the body uses ``await``; a plain ``def``
    containing ``await`` is rejected by the Python compiler.
    """
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # Deliberately unsafe: the concatenation is what the rule must detect.
    query = "SELECT name FROM users WHERE age=" + req.FormValue("age")
    # ruleid: aiopg-sqli
    await cur.execute(query)
async def bad2():
    """Fixture (expected finding): query built with ``str.format`` on a pool cursor."""
    pool = await aiopg.create_pool(dsn)
    with (await pool.cursor()) as cur:
        # Deliberately unsafe: user_input is formatted straight into the SQL text.
        sql_query = 'SELECT * FROM {}'.format(user_input)
        # ruleid: aiopg-sqli
        await cur.execute(sql_query)
        ret = await cur.fetchone()
        assert ret == (1,), ret
async def bad3():
    """Fixture (expected finding): ``%``-formatted query run via an acquired connection."""
    pool = await aiopg.create_pool(dsn)
    async with pool.acquire() as conn:
        # Deliberately unsafe: user_input is interpolated with the % operator.
        sql_query = 'SELECT * FROM %s'%(user_input)
        async with conn.cursor() as cur:
            # ruleid: aiopg-sqli
            await cur.execute(sql_query)
            # Collect every row produced by iterating the cursor.
            ret = []
            async for row in cur:
                ret.append(row)
async def bad4(user_input):
    """Fixture (expected finding): f-string query executed inside ``async with pool``.

    Declared ``async`` because the body uses ``await`` and ``async with``,
    neither of which is valid inside a plain ``def``.
    """
    pool = await aiopg.create_pool(dsn)
    async with pool as conn:
        cur = await conn.cursor()
        # Deliberately unsafe: user_input is interpolated by the f-string.
        sql_query = f'SELECT * FROM {user_input}'
        # ruleid: aiopg-sqli
        await cur.execute(sql_query)
async def bad5():
    """Fixture (expected finding): ``+`` concatenation passed inline to ``execute``.

    Declared ``async`` because the body uses ``await`` and ``async with``,
    neither of which is valid inside a plain ``def``.
    """
    pool = await aiopg.create_pool(dsn)
    async with pool.cursor() as cur:
        # ruleid: aiopg-sqli
        await cur.execute("SELECT name FROM users WHERE age=" + req.FormValue("age"))
async def bad6(user_input):
    """Fixture (expected finding): ``str.format`` result passed inline to ``execute``.

    Declared ``async`` because the body uses ``await`` and ``async with``,
    neither of which is valid inside a plain ``def``.
    """
    pool = await aiopg.create_pool(dsn)
    async with pool.cursor() as cur:
        # ruleid: aiopg-sqli
        await cur.execute('SELECT * FROM {}'.format(user_input))
async def bad7(user_input):
    """Fixture (expected finding): ``%``-formatted string passed inline to ``execute``."""
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # NOTE(review): this call is not awaited; presumably intentional for the
    # fixture, since the rule patterns do not mention `await` -- confirm.
    # ruleid: aiopg-sqli
    cur.execute('SELECT * FROM %s'%(user_input))
async def bad8(user_input):
    """Fixture (expected finding): f-string with interpolation passed inline to ``execute``."""
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # NOTE(review): this call is not awaited; presumably intentional for the
    # fixture, since the rule patterns do not mention `await` -- confirm.
    # ruleid: aiopg-sqli
    cur.execute(f'SELECT * FROM {user_input}')
async def bad9(user_input):
    """Fixture (expected finding): ``%``-formatted SQL combined with bound parameters.

    The table name is inserted with ``%`` while the two values are passed as a
    separate parameter list; the call is still annotated as a finding.
    """
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # ruleid: aiopg-sqli
    cur.execute(
        "insert into %s values (%%s, %%s)" % ext.quote_ident(table_name),[10, 20])
async def ok1(user_input):
    """Fixture (no finding expected): literal query with bound parameters.

    Declared ``async`` because the body uses ``await``; a plain ``def``
    containing ``await`` is rejected by the Python compiler.
    """
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # ok: aiopg-sqli
    cur.execute("SELECT * FROM test WHERE id = %s", (3,))
async def ok2(user_input):
    """Fixture (no finding expected): query is two string literals joined with ``+``.

    Declared ``async`` because the body uses ``await``; a plain ``def``
    containing ``await`` is rejected by the Python compiler.
    """
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    query = "SELECT name FROM users WHERE age=" + "3"
    # ok: aiopg-sqli
    cur.execute(query)
async def ok3(user_input):
    """Fixture (no finding expected): literal query extended with a literal via ``+=``.

    Declared ``async`` because the body uses ``await``; a plain ``def``
    containing ``await`` is rejected by the Python compiler.
    """
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    query = "SELECT name FROM users WHERE age="
    query += "3"
    # ok: aiopg-sqli
    cur.execute(query)
async def ok4(user_input):
    """Fixture (no finding expected): ``.format()`` with no arguments.

    The query is passed to ``fetchval`` rather than ``execute``.

    Declared ``async`` because the body uses ``await``; a plain ``def``
    containing ``await`` is rejected by the Python compiler.
    """
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    query = 'SELECT * FROM John'.format()
    # ok: aiopg-sqli
    cur.fetchval(query)
async def ok5(user_input):
    """Fixture (no finding expected): ``%`` formatting with an empty tuple.

    Declared ``async`` because the body uses ``await``; a plain ``def``
    containing ``await`` is rejected by the Python compiler.
    """
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    query = 'SELECT * FROM John'% ()
    # ok: aiopg-sqli
    cur.execute(query)
async def ok6(user_input):
    """Fixture (no finding expected): f-string without any interpolated expression.

    Declared ``async`` because the body uses ``await``; a plain ``def``
    containing ``await`` is rejected by the Python compiler.
    """
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    query = f'SELECT * FROM John'
    # ok: aiopg-sqli
    cur.execute(query)
async def ok7(user_input):
    """Fixture (no finding expected): two string literals joined inline with ``+``.

    Declared ``async`` because the body uses ``await``; a plain ``def``
    containing ``await`` is rejected by the Python compiler.
    """
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # ok: aiopg-sqli
    cur.execute("SELECT name FROM users WHERE age=" + "3")
async def ok8(user_input):
    """Fixture (no finding expected): inline ``.format()`` with no arguments.

    Declared ``async`` because the body uses ``await``; a plain ``def``
    containing ``await`` is rejected by the Python compiler.
    """
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # ok: aiopg-sqli
    cur.execute('SELECT * FROM John'.format())
async def ok9(user_input):
    """Fixture (no finding expected): inline ``%`` formatting with an empty tuple.

    Declared ``async`` because the body uses ``await``; a plain ``def``
    containing ``await`` is rejected by the Python compiler.
    """
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # ok: aiopg-sqli
    cur.execute('SELECT * FROM John'% ())
async def ok10(user_input):
    """Fixture (no finding expected): inline f-string without interpolation.

    Declared ``async`` because the body uses ``await``; a plain ``def``
    containing ``await`` is rejected by the Python compiler.
    """
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # ok: aiopg-sqli
    cur.execute(f'SELECT * FROM John')
Short Link: https://sg.run/WgGL