python.aws-lambda.security.psycopg-sqli.psycopg-sqli

Author
unknown
Detected SQL statement that is tainted by the `event` object. This could lead to SQL injection if the variable is user-controlled and not properly sanitized. To prevent SQL injection, use parameterized queries or prepared statements instead. You can use parameterized statements like so: cursor.execute('SELECT * FROM projects WHERE status = %s', ('active',))
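To illustrate the difference, here is a minimal sketch that uses Python's built-in `sqlite3` module as a stand-in so that it runs without a PostgreSQL server. The `projects` table, its rows, and the function names are made up for the illustration. Note that `sqlite3` uses `?` as its placeholder where psycopg2 uses `%s`; the principle of passing values separately from the SQL text is the same.

```python
import sqlite3

# Hypothetical in-memory table, used only for this illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE projects (name TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO projects VALUES (?, ?)",
    [("alpha", "active"), ("beta", "archived")],
)


def find_unsafe(status):
    # Vulnerable: the value is spliced into the SQL text itself.
    query = "SELECT name FROM projects WHERE status = '{}'".format(status)
    return [row[0] for row in conn.execute(query)]


def find_safe(status):
    # Parameterized: the value travels separately from the SQL text.
    query = "SELECT name FROM projects WHERE status = ?"
    return [row[0] for row in conn.execute(query, (status,))]


payload = "nothing' OR '1'='1"
unsafe_rows = find_unsafe(payload)  # the payload rewrites the WHERE clause
safe_rows = find_safe(payload)      # the payload is compared as a plain string
```

With the formatted query the payload widens the `WHERE` clause and every row comes back; with the parameterized query the same payload matches nothing.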
Definition
rules:
  - id: psycopg-sqli
    languages:
      - python
    message: "Detected SQL statement that is tainted by `event` object. This could
      lead to SQL injection if the variable is user-controlled and not properly
      sanitized. In order to prevent SQL injection, use parameterized queries
      or prepared statements instead. You can use parameterized statements like
      so: `cursor.execute('SELECT * FROM projects WHERE status = %s',
      ('active',))`"
    mode: taint
    metadata:
      references:
        - https://www.psycopg.org/docs/cursor.html#cursor.execute
        - https://www.psycopg.org/docs/cursor.html#cursor.executemany
        - https://www.psycopg.org/docs/cursor.html#cursor.mogrify
      category: security
      owasp:
        - A01:2017 - Injection
        - A03:2021 - Injection
      cwe:
        - "CWE-89: Improper Neutralization of Special Elements used in an SQL
          Command ('SQL Injection')"
      technology:
        - aws-lambda
        - psycopg
        - psycopg2
      cwe2022-top25: true
      cwe2021-top25: true
      subcategory:
        - vuln
      likelihood: HIGH
      impact: MEDIUM
      confidence: MEDIUM
      license: Commons Clause License Condition v1.0[LGPL-2.1-only]
    pattern-sinks:
      - patterns:
          - pattern: $QUERY
          - pattern-either:
              - pattern-inside: $CURSOR.execute($QUERY,...)
              - pattern-inside: $CURSOR.executemany($QUERY,...)
              - pattern-inside: $CURSOR.mogrify($QUERY,...)
          - pattern-inside: |
              import psycopg2
              ...
    pattern-sources:
      - patterns:
          - pattern: event
          - pattern-inside: |
              def $HANDLER(event, context):
                ...
    severity: WARNING
Examples
psycopg-sqli.py
import json

import boto3
import psycopg2


def lambda_handler(event, context):
    # Read the connection settings from SSM Parameter Store.
    ssm = boto3.client('ssm')
    database = ssm.get_parameter(Name='t2-db-dbname')
    user = ssm.get_parameter(Name='t2-db-user')
    port = ssm.get_parameter(Name='t2-db-port')
    tableName = ssm.get_parameter(Name='t2-db-tablename')
    password = ssm.get_parameter(Name='t2-db-password', WithDecryption=True)
    host = ssm.get_parameter(Name='t2-db-host', WithDecryption=True)

    engine = psycopg2.connect(
        database=database['Parameter']['Value'],
        user=user['Parameter']['Value'],
        password=password['Parameter']['Value'],
        host=host['Parameter']['Value'],
        port=port['Parameter']['Value']
    )
    tableName = tableName['Parameter']['Value']

    # These values come straight from the Lambda event, so they are tainted.
    keyphrase = event['keyphrase']
    username = event['username']
    language = event['translateTarget']

    cur = engine.cursor()
    # Intentionally vulnerable: event values are formatted into the SQL text.
    findQuery = '''SELECT file_name FROM {tableName} WHERE '{keyphrase}' = ANY (keyphrases) AND target_language = '{language}' AND username = '{username}' '''.format(username=username, keyphrase=keyphrase, language=language, tableName=tableName)
    # ruleid: psycopg-sqli
    cur.execute(findQuery)
    result = cur.fetchone()
    returnList = []

    # The query text is a literal; the event value is passed as a parameter.
    # ok: psycopg-sqli
    cur.execute("SELECT * FROM foobar WHERE id = %s", (username,))

    if result is None:
        returnList.append('None')
    else:
        for i in range(len(result)):
            returnList.append(result[i])

    response = {
        'searchedFiles': returnList,
        'language': language
    }
    engine.commit()
    engine.close()
    return response
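For comparison, the flagged lookup could be rewritten along the following lines. This is only a sketch under stated assumptions: `find_file`, `ALLOWED_TABLES`, the table name `translations`, and the `RecordingCursor` stub are hypothetical names introduced for illustration and are not part of the rule or its test file. The stub stands in for a psycopg2 cursor so that the snippet runs without a database. psycopg2 also provides the `psycopg2.sql` module (`sql.SQL`, `sql.Identifier`) for composing identifiers; an allow-list is used here only so that the sketch does not need the driver installed.

```python
# Hypothetical allow-list; real table names would come from your own configuration.
ALLOWED_TABLES = {"translations"}


def find_file(cur, table_name, keyphrase, language, username):
    """Run the lookup with all event-derived values passed as parameters."""
    # Identifiers cannot be bound as parameters, so check the table name
    # against an allow-list before placing it in the SQL text.
    if table_name not in ALLOWED_TABLES:
        raise ValueError("unexpected table name: {!r}".format(table_name))
    query = (
        "SELECT file_name FROM {} "
        "WHERE %s = ANY (keyphrases) AND target_language = %s AND username = %s"
    ).format(table_name)
    # The values are handed to the cursor separately from the SQL text.
    cur.execute(query, (keyphrase, language, username))
    return cur.fetchone()


class RecordingCursor:
    """Stand-in for a psycopg2 cursor that records what it was asked to run."""

    def __init__(self):
        self.calls = []

    def execute(self, query, params=None):
        self.calls.append((query, params))

    def fetchone(self):
        return None


cur = RecordingCursor()
find_file(cur, "translations", "x' OR '1'='1", "de", "alice")
recorded_query, recorded_params = cur.calls[0]
```

In this version the SQL text contains only `%s` placeholders and the checked table name, while the values taken from `event` travel in the parameter tuple.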
Short Link: https://sg.run/9L8r