python.aws-lambda.security.pymysql-sqli.pymysql-sqli

Author
unknown
Download Count*
License
Detected SQL statement that is tainted by event
object. This could lead to SQL injection if the variable is user-controlled and not properly sanitized. In order to prevent SQL injection, use parameterized queries or prepared statements instead. You can use parameterized statements like so: cursor.execute('SELECT * FROM projects WHERE status = %s', ('active'))
Run Locally
Run in CI
Defintion
rules:
- id: pymysql-sqli
languages:
- python
message: "Detected SQL statement that is tainted by `event` object. This could
lead to SQL injection if the variable is user-controlled and not properly
sanitized. In order to prevent SQL injection, use parameterized queries or
prepared statements instead. You can use parameterized statements like so:
`cursor.execute('SELECT * FROM projects WHERE status = %s', ('active'))`"
mode: taint
metadata:
references:
- https://pypi.org/project/PyMySQL/#id4
category: security
owasp:
- A01:2017 - Injection
- A03:2021 - Injection
cwe:
- "CWE-89: Improper Neutralization of Special Elements used in an SQL
Command ('SQL Injection')"
technology:
- aws-lambda
- pymysql
cwe2022-top25: true
cwe2021-top25: true
subcategory:
- vuln
likelihood: HIGH
impact: MEDIUM
confidence: MEDIUM
license: Commons Clause License Condition v1.0[LGPL-2.1-only]
pattern-sinks:
- patterns:
- pattern: $QUERY
- pattern-inside: $CURSOR.execute($QUERY,...)
- pattern-either:
- pattern-inside: |
import pymysql
...
- pattern-inside: |
import pymysql.cursors
...
pattern-sources:
- patterns:
- pattern: event
- pattern-inside: |
def $HANDLER(event, context):
...
severity: WARNING
Examples
pymysql-sqli.py
import boto3
import base64
import json
import os
import pymysql
import pymysql.cursors
DB_CREDS = os.environ['DB_CREDS']
DB_NAME = os.environ['DB_NAME']
def connection_info(db_creds):
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager'
)
get_secret_value_response = client.get_secret_value(SecretId=db_creds)
if 'SecretString' in get_secret_value_response:
secret = json.loads(get_secret_value_response['SecretString'])
else:
secret = json.loads(base64.b64decode(get_secret_value_response['SecretBinary']))
return secret
def lambda_handler(event, context):
status_code = 400
try:
user_id = event['requestContext']['identity']['cognitoIdentityId']
sql = '''
SELECT
id
,userId
,stationId
,stationName
,duration
,price
,createdDate
FROM
rideTransactions
WHERE
userId = "{}"
ORDER BY
createdDate DESC;
'''.format(user_id)
conn_info = connection_info(DB_CREDS)
conn = pymysql.connect(host=conn_info['host'], user=conn_info['username'], password=conn_info['password'], database=conn_info['dbname'], connect_timeout=30, cursorclass=pymysql.cursors.DictCursor)
with conn.cursor() as cur:
# ruleid: pymysql-sqli
cur.execute(sql)
rows = cur.fetchall()
# ok: pymysql-sqli
cur.execute('SELECT * FROM foobar')
rows2 = cur.fetchall()
conn.close()
output = [{'id': c['id'], 'userId': c['userId'], 'stationId': c['stationId'], 'stationName': c['stationName'], 'duration': c['duration'], 'price': float(c['price']), 'createdDate': c['createdDate'].isoformat()} for c in rows]
status_code = 200
except Exception as e:
print('ERROR: ', e)
output = '{}'.format(e)
response = {
'statusCode': status_code,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Credentials': True,
'Access-Control-Allow-Headers': 'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token,X-Amz-User-Agent',
'Access-Control-Allow-Methods': 'GET,POST,PUT,DELETE,OPTIONS,HEAD,PATCH',
'Content-Type': 'application/json'
},
'body': json.dumps(output)
}
print('[INFO] Query response: {}'.format(json.dumps(response)))
return response
Short Link: https://sg.run/reve