python.pyramid.security.sqlalchemy-sql-injection.pyramid-sqlalchemy-sql-injection

profile photo of semgrepsemgrep
Author
unknown
Download Count*

Distinct, Having, Group_by, Order_by, and Filter in SQLAlchemy can cause SQL injection if the developer passes raw SQL into these clauses. This pattern captures cases in which the developer passes raw SQL into the distinct, having, group_by, order_by, or filter clauses and injects user input into the raw SQL with any function other than "bindparams". Use bindparams to securely bind user input to SQL statements.

Run Locally

Run in CI

Definition

# Semgrep rule file containing a single taint-mode rule. It reports Pyramid
# view code where request-derived data is formatted into a string literal that
# is passed to a SQLAlchemy query clause method.
rules:
  - id: pyramid-sqlalchemy-sql-injection
    message: Distinct, Having, Group_by, Order_by, and Filter in SQLAlchemy can
      cause sql injections if the developer inputs raw SQL into the
      before-mentioned clauses. This pattern captures relevant cases in which
      the developer inputs raw SQL into the distinct, having, group_by, order_by
      or filter clauses and injects user-input into the raw SQL with any
      function besides "bindparams". Use bindParams to securely bind user-input
      to SQL statements.
    languages:
      - python
    severity: ERROR
    # Registry metadata: classification, references and triage hints.
    metadata:
      category: security
      cwe:
        - "CWE-89: Improper Neutralization of Special Elements used in an SQL
          Command ('SQL Injection')"
      owasp:
        - A01:2017 - Injection
        - A03:2021 - Injection
      references:
        - https://docs.sqlalchemy.org/en/14/tutorial/data_select.html#tutorial-selecting-data
      technology:
        - pyramid
      cwe2022-top25: true
      cwe2021-top25: true
      subcategory:
        - vuln
      likelihood: MEDIUM
      impact: HIGH
      confidence: MEDIUM
      license: Commons Clause License Condition v1.0[LGPL-2.1-only]
      vulnerability_class:
        - SQL Injection
    # Taint mode: a finding is reported when data from a source reaches a sink.
    mode: taint
    pattern-sources:
      # Source: any attribute access on the single parameter ($REQ) of a
      # function decorated with @view_config, in a file that imports
      # view_config from pyramid.view. $REQ.dbsession is excluded so the
      # database session itself is not treated as user input.
      - patterns:
          - pattern-inside: |
              from pyramid.view import view_config
              ...
              @view_config( ... )
              def $VIEW($REQ):
                ...
          - pattern: $REQ.$ANYTHING
          - pattern-not: $REQ.dbsession
    pattern-sinks:
      # Sink: an argument ($SINK) of a method called on a string literal, where
      # that call is itself the argument of a clause method on a query that was
      # assigned from $REQ.dbsession.query(...). Two shapes are covered: the
      # clause method called directly on $QUERY, and called after one .join(...).
      - patterns:
          - pattern-inside: |
              $QUERY = $REQ.dbsession.query(...)
              ...
          - pattern-either:
              - pattern: |
                  $QUERY.$SQLFUNC("...".$FORMATFUNC(..., $SINK, ...))
              - pattern: |
                  $QUERY.join(...).$SQLFUNC("...".$FORMATFUNC(..., $SINK, ...))
          # Narrow the reported/tainted range to the interpolated argument only.
          - pattern: $SINK
          # Only these clause methods are considered.
          - metavariable-regex:
              metavariable: $SQLFUNC
              regex: (group_by|order_by|distinct|having|filter)
          # Negative lookahead: accept any string method whose name does not
          # begin with "bindparams" (Semgrep left-anchors metavariable-regex).
          - metavariable-regex:
              metavariable: $FORMATFUNC
              regex: (?!bindparams)
    # Autofix: substitute "format" with "bindparams" in the matched text.
    # NOTE(review): the sink match is narrowed to $SINK above, so confirm this
    # substitution actually reaches the method name — TODO verify.
    fix-regex:
      regex: format
      replacement: bindparams

Examples

sqlalchemy-sql-injection.py

from pyramid.view import view_config

### True positives ###

@view_config(route_name='home_bad1', renderer='my_app:templates/mytemplate.jinja2')
def my_bad_home1(request):
    """True-positive fixture: request data formatted into ``distinct()``.

    Reads ``request.params['foo']`` and interpolates it with ``str.format``
    into the string passed to ``query.distinct``. Returns a dict for the
    renderer, or a 500 ``Response`` if ``SQLAlchemyError`` is raised.
    """
    try:
        param = request.params['foo']
        query = request.dbsession.query(models.MyModel)
        
        # Intentionally vulnerable; the annotation below must stay directly
        # above the flagged line.
        # ruleid: pyramid-sqlalchemy-sql-injection
        one = query.distinct("foo={}".format(param))
    except SQLAlchemyError:
        return Response("Database error", content_type='text/plain', status=500)
    return {'one': one, 'project': 'my_proj'}

@view_config(route_name='home_bad2', renderer='my_app:templates/mytemplate.jinja2')
def my_bad_home2(request):
    """True-positive fixture: request data formatted into ``having()`` after a join.

    Reads ``request.params['foo']`` and interpolates it with ``str.format``
    into the string passed to ``query.join(...).having``. Returns a dict for
    the renderer, or a 500 ``Response`` if ``SQLAlchemyError`` is raised.
    """
    try:
        param = request.params['foo']
        query = request.dbsession.query(models.MyModel)
        
        # Intentionally vulnerable; exercises the `.join(...)` sink shape.
        # ruleid: pyramid-sqlalchemy-sql-injection
        one = query.join(DeploymentPermission).having("oops{}".format(param))
    except SQLAlchemyError:
        return Response("Database error", content_type='text/plain', status=500)
    return {'one': one, 'project': 'my_proj'}

@view_config(route_name='home_bad3', renderer='my_app:templates/mytemplate.jinja2')
def my_bad_home3(request):
    """True-positive fixture: request data formatted into ``group_by()``.

    Reads ``request.params['foo']`` and interpolates it with ``str.format``
    into the string passed to ``query.group_by``. Returns a dict for the
    renderer, or a 500 ``Response`` if ``SQLAlchemyError`` is raised.
    """
    try:
        param = request.params['foo']
        query = request.dbsession.query(models.MyModel)
        
        # Intentionally vulnerable; keep the annotation adjacent to the call.
        # ruleid: pyramid-sqlalchemy-sql-injection
        one = query.group_by("oops{}".format(param))
    except SQLAlchemyError:
        return Response("Database error", content_type='text/plain', status=500)
    return {'one': one, 'project': 'my_proj'}

@view_config(route_name='home_bad4', renderer='my_app:templates/mytemplate.jinja2')
def my_bad_home4(request):
    """True-positive fixture: request data formatted into ``order_by()``.

    Reads ``request.params['foo']`` and interpolates it with ``str.format``
    into the string passed to ``query.order_by``, then calls ``.one()`` on the
    result. Returns a dict for the renderer, or a 500 ``Response`` if
    ``SQLAlchemyError`` is raised.
    """
    try:
        param = request.params['foo']
        query = request.dbsession.query(models.MyModel)
        
        # Intentionally vulnerable; the clause call is followed by `.one()`.
        # ruleid: pyramid-sqlalchemy-sql-injection
        one = query.order_by("oops{}".format(param)).one()
    except SQLAlchemyError:
        return Response("Database error", content_type='text/plain', status=500)
    return {'one': one, 'project': 'my_proj'}

@view_config(route_name='home_bad5', renderer='my_app:templates/mytemplate.jinja2')
def my_bad_home5(request):
    """True-positive fixture: request data formatted into ``filter()``.

    Reads ``request.params['foo']`` and interpolates it with ``str.format``
    into the string passed to ``query.filter``, then calls ``.one()`` on the
    result. Returns a dict for the renderer, or a 500 ``Response`` if
    ``SQLAlchemyError`` is raised.
    """
    try:
        param = request.params['foo']
        query = request.dbsession.query(models.MyModel)
        
        # Intentionally vulnerable; keep the annotation adjacent to the call.
        # ruleid: pyramid-sqlalchemy-sql-injection
        one = query.filter("oops{}".format(param)).one()
    except SQLAlchemyError:
        return Response("Database error", content_type='text/plain', status=500)
    return {'one': one, 'project': 'my_proj'}


### True negatives ###

@view_config(route_name='home_ok1', renderer='my_app:templates/mytemplate.jinja2')
def my_ok_home1(request):
    """True-negative fixture: ``filter()`` receives a comparison expression.

    No request attribute other than ``dbsession`` is read, and the filter
    argument is a comparison against the constant ``'one'`` rather than a
    formatted string. Returns a dict for the renderer, or a 500 ``Response``
    if ``SQLAlchemyError`` is raised.
    """
    try:
        query = request.dbsession.query(models.MyModel)
        # ok: pyramid-sqlalchemy-sql-injection
        one = query.filter(models.MyModel.name == 'one').one()
    except SQLAlchemyError:
        return Response("Database error", content_type='text/plain', status=500)
    return {'one': one, 'project': 'my_proj'}

@view_config(route_name='home_ok2', renderer='my_app:templates/mytemplate.jinja2')
def my_ok_home2(request):
    """True-negative fixture: the string method is ``bindparams``, not ``format``.

    Reads ``request.params['foo']`` but passes it through a method named
    ``bindparams``. Returns a dict for the renderer, or a 500 ``Response`` if
    ``SQLAlchemyError`` is raised.
    """
    try:
        param = request.params['foo']
        query = request.dbsession.query(models.MyModel)
        # NOTE(review): a plain Python `str` has no `bindparams` method, so this
        # line only exercises the rule's pattern; it is not runnable as written.
        # ok: pyramid-sqlalchemy-sql-injection
        one = query.filter("oops{}".bindparams(param)).one()
    except SQLAlchemyError:
        return Response("Database error", content_type='text/plain', status=500)
    return {'one': one, 'project': 'my_proj'}

def not_a_view(something):
    """True-negative fixture: same formatted-filter shape, but not a view.

    The function has no ``@view_config`` decorator, so attributes read from
    ``something`` are not expected to be treated as request input. Returns a
    dict, or a 500 ``Response`` if ``SQLAlchemyError`` is raised.
    """
    try:
        foo = something.params['foo']
        query = something.dbsession.query(models.MyModel)
        # ok: pyramid-sqlalchemy-sql-injection
        one = query.filter("{}".format(foo)).one()
    except SQLAlchemyError:
        return Response("Database error", content_type='text/plain', status=500)
    return {'one': one, 'project': 'my_proj'}