python.aws-lambda.security.sqlalchemy-sqli.sqlalchemy-sqli

Author
unknown
Download Count*
License
Detected SQL statement that is tainted by event
object. This could lead to SQL injection if the variable is user-controlled and not properly sanitized. In order to prevent SQL injection, use parameterized queries or prepared statements instead. You can use parameterized statements like so: cursor.execute('SELECT * FROM projects WHERE status = ?', 'active')
Run Locally
Run in CI
Defintion
rules:
- id: sqlalchemy-sqli
languages:
- python
message: "Detected SQL statement that is tainted by `event` object. This could
lead to SQL injection if the variable is user-controlled and not properly
sanitized. In order to prevent SQL injection, use parameterized queries or
prepared statements instead. You can use parameterized statements like so:
`cursor.execute('SELECT * FROM projects WHERE status = ?', 'active')`"
mode: taint
metadata:
references:
- https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Connection.execute
category: security
owasp:
- A01:2017 - Injection
- A03:2021 - Injection
cwe:
- "CWE-89: Improper Neutralization of Special Elements used in an SQL
Command ('SQL Injection')"
technology:
- aws-lambda
- sqlalchemy
cwe2022-top25: true
cwe2021-top25: true
subcategory:
- vuln
likelihood: HIGH
impact: MEDIUM
confidence: MEDIUM
license: Commons Clause License Condition v1.0[LGPL-2.1-only]
pattern-sinks:
- patterns:
- pattern: $QUERY
- pattern-inside: $CURSOR.execute($QUERY,...)
- pattern-inside: |
import sqlalchemy
...
pattern-sources:
- patterns:
- pattern: event
- pattern-inside: |
def $HANDLER(event, context):
...
severity: WARNING
Examples
sqlalchemy-sqli.py
import json
import os
import typing as t
import boto3
import sqlalchemy
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
SERVICE = "lambda-connection-pooling-demo"
logger = Logger(service=SERVICE)
tracer = Tracer(service=SERVICE)
class LambdaProxyIntegrationResponse(t.TypedDict, total=False):
statusCode: int
body: str
headers: t.Dict[str, t.Any]
DB_USER_SECRET_NAME = os.environ.get("DB_USER_SECRET_NAME")
DB_HOST = os.environ.get("DB_HOST")
secrentsmanager = boto3.client(service_name='secretsmanager')
get_secret_value_response = secrentsmanager.get_secret_value(SecretId=DB_USER_SECRET_NAME)
secret = json.loads(get_secret_value_response["SecretString"])
db_user = secret["username"]
db_password = secret["password"]
db_host = DB_HOST or secret["host"]
db_port = secret["port"]
url = f"mysql+pymysql://{db_user}:{db_password}@{db_host}:{db_port}/"
engine = sqlalchemy.create_engine(
url,
connect_args={
"ssl": {
"ssl_ca": "./AmazonRootCA1.pem",
}
},
pool_recycle=50,
)
def handler(event, context):
logger.debug("connecting to db...")
with engine.connect() as connection:
try:
# ruleid: sqlalchemy-sqli
connection.execute(f"SELECT * FROM foobar WHERE id = '{event['id']}'")
# ok: sqlalchemy-sqli
connection.execute("SELECT * FROM foobar WHERE id = '?'", event['id'])
except Exception as e:
logger.error("An error occured:")
print(e)
return {
"statusCode": 200,
"body": json.dumps({
"state": "ERROR",
"message": f"response from '{context.log_stream_name}'"
})
}
return {
"statusCode": 200,
"body": json.dumps({
"state": "SUCCESS",
"message": f"response from '{context.log_stream_name}'"
})
}
Short Link: https://sg.run/b48W