python.aws-lambda.security.pymssql-sqli.pymssql-sqli

Author
unknown
Detected a SQL statement that is tainted by the event object. This could lead to SQL injection if the variable is user-controlled and not properly sanitized. To prevent SQL injection, use parameterized queries or prepared statements instead. You can use parameterized statements like so: cursor.execute('SELECT * FROM projects WHERE status = %s', 'active')
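To make the difference concrete, here is a minimal sketch that contrasts splicing a value into the SQL text with keeping it as a separate parameter. The helper names `unsafe_statement` and `safe_statement` are hypothetical and exist only for illustration; the `%s` placeholder style is the one shown in the rule message. No database is contacted.

```python
# Hypothetical helpers for illustration; nothing here talks to a database.
QUERY = "SELECT * FROM projects WHERE status = %s"


def unsafe_statement(status):
    # Vulnerable: the value is spliced into the SQL text before the driver sees it,
    # so quote characters in the value become part of the statement.
    return "SELECT * FROM projects WHERE status = '%s'" % status


def safe_statement(status):
    # Safer: the SQL text stays constant and the value travels separately,
    # ready to be passed as cursor.execute(sql, params).
    return QUERY, (status,)


payload = "x' OR '1'='1"

# The injected quote closes the string literal and appends a new condition.
print(unsafe_statement(payload))

# The statement text is unchanged; the payload is only data.
sql, params = safe_statement(payload)
print(sql, params)
```

In the first case the attacker's input rewrites the WHERE clause; in the second the driver receives a fixed statement and is responsible for quoting the value.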
Definition
rules:
  - id: pymssql-sqli
    languages:
      - python
    message: "Detected SQL statement that is tainted by `event` object. This could
      lead to SQL injection if the variable is user-controlled and not properly
      sanitized. In order to prevent SQL injection, use parameterized queries
      or prepared statements instead. You can use parameterized statements like
      so: `cursor.execute('SELECT * FROM projects WHERE status = %s',
      'active')`"
    mode: taint
    metadata:
      references:
        - https://pypi.org/project/pymssql/
      category: security
      owasp:
        - A01:2017 - Injection
        - A03:2021 - Injection
      cwe:
        - "CWE-89: Improper Neutralization of Special Elements used in an SQL
          Command ('SQL Injection')"
      technology:
        - aws-lambda
        - pymssql
      cwe2022-top25: true
      cwe2021-top25: true
      subcategory:
        - vuln
      likelihood: HIGH
      impact: MEDIUM
      confidence: MEDIUM
      license: Commons Clause License Condition v1.0[LGPL-2.1-only]
    pattern-sinks:
      - patterns:
          - pattern: $QUERY
          - pattern-inside: $CURSOR.execute($QUERY,...)
          - pattern-inside: |
              import pymssql
              ...
    pattern-sources:
      - patterns:
          - pattern: event
          - pattern-inside: |
              def $HANDLER(event, context):
                ...
    severity: WARNING
Examples
pymssql-sqli.py
import boto3
import json
import logging
import os
import pymssql


def lambda_handler(event, context):
    # Source: the value comes straight from the Lambda event and is untrusted.
    current_user = event['user_id']

    # get_secret_dict() is not defined in this excerpt.
    secret_dict = get_secret_dict()

    port = str(secret_dict['port']) if 'port' in secret_dict else '1433'
    dbname = secret_dict['dbname'] if 'dbname' in secret_dict else 'master'

    conn = pymssql.connect(server=secret_dict['host'],
                           user=secret_dict['username'],
                           password=secret_dict['password'],
                           database=dbname,
                           port=port,
                           login_timeout=5,
                           as_dict=True)

    cursor = conn.cursor(as_dict=True)

    # Vulnerable: the tainted value is formatted into the SQL text itself.
    query = "SELECT roleprin.name FROM sys.database_role_members rolemems "\
            "JOIN sys.database_principals roleprin ON roleprin.principal_id = rolemems.role_principal_id "\
            "JOIN sys.database_principals userprin ON userprin.principal_id = rolemems.member_principal_id "\
            "WHERE userprin.name = '%s'" % current_user
    # ruleid: pymssql-sqli
    cursor.execute(query)

    # Parameterized: the value is passed separately, so the placeholder is left unquoted.
    # ok: pymssql-sqli
    cursor.execute("SELECT * FROM user WHERE id = %s", current_user)

    return {
        'statusCode': 200,
        'body': 'ok'
    }
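One way the flagged role lookup above might be rewritten is sketched below, with the user name bound as a parameter instead of being formatted into the query. The names `ROLE_QUERY`, `fetch_roles`, and `RecordingCursor` are hypothetical. `RecordingCursor` is a stand-in that only records what would be sent, so the sketch runs without a database; with pymssql, the object from `conn.cursor()` would be passed instead.

```python
# A sketch under stated assumptions, not the rule author's recommended fix.
ROLE_QUERY = (
    "SELECT roleprin.name FROM sys.database_role_members rolemems "
    "JOIN sys.database_principals roleprin ON roleprin.principal_id = rolemems.role_principal_id "
    "JOIN sys.database_principals userprin ON userprin.principal_id = rolemems.member_principal_id "
    "WHERE userprin.name = %s"  # unquoted placeholder; the driver quotes the value
)


def fetch_roles(cursor, user_id):
    """Run the role lookup with user_id bound as a query parameter."""
    cursor.execute(ROLE_QUERY, (user_id,))
    return cursor.fetchall()


class RecordingCursor:
    """Hypothetical stand-in for a DB-API cursor; it records calls and returns no rows."""

    def __init__(self):
        self.calls = []

    def execute(self, operation, params=None):
        self.calls.append((operation, params))

    def fetchall(self):
        return []


cursor = RecordingCursor()
fetch_roles(cursor, "x' OR '1'='1")
operation, params = cursor.calls[0]
print(operation == ROLE_QUERY)  # the SQL text is never modified by the input
print(params)                   # the input travels only as a parameter
```

Keeping the statement in a module-level constant makes it easy to see that no request data ever reaches the SQL text.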
Short Link: https://sg.run/yXvP