python.django.security.audit.custom-expression-as-sql.custom-expression-as-sql

Community Favorite
Author: semgrep
Download count: 82,263

Detected a custom expression '$EXPRESSION' calling 'as_sql(...)'. This could lead to SQL injection, which can let attackers exfiltrate sensitive data. Ensure that no user input reaches this function, or that any user input is properly sanitized.
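
The risk described above can be illustrated outside Django. The following is a minimal sketch using Python's standard `sqlite3` module; the `users` table and the attacker-controlled `user_input` value are assumptions made up for illustration and do not come from the rule. It contrasts interpolating a value into the SQL text with passing it as a bound parameter.

```python
import sqlite3

# Hypothetical in-memory table, used only for this illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, secret TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [("alice", "a-secret"), ("bob", "b-secret")],
)

# Attacker-controlled value containing quote characters.
user_input = "nobody' OR '1'='1"

# Unsafe: the value becomes part of the SQL text, so its quotes
# change the structure of the WHERE clause.
unsafe_sql = "SELECT secret FROM users WHERE name = '%s'" % user_input
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Safer: the value is passed separately and bound by the driver,
# so it is only ever compared as a literal string.
safe_rows = conn.execute(
    "SELECT secret FROM users WHERE name = ?", (user_input,)
).fetchall()

print("rows returned by interpolated query:", len(unsafe_rows))
print("rows returned by parameterized query:", len(safe_rows))
```

In this sketch the interpolated query returns every row in the table, while the parameterized query returns none, because no user is literally named `nobody' OR '1'='1`.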

Definition

rules:
  - id: custom-expression-as-sql
    languages:
      - python
    message: Detected a Custom Expression '$EXPRESSION' calling 'as_sql(...)'.
      This could lead to SQL injection, which can result in attackers
      exfiltrating sensitive data. Instead, ensure no user input enters this
      function or that user input is properly sanitized.
    metadata:
      cwe:
        - "CWE-89: Improper Neutralization of Special Elements used in an SQL
          Command ('SQL Injection')"
      owasp:
        - A01:2017 - Injection
        - A03:2021 - Injection
      references:
        - https://docs.djangoproject.com/en/3.0/ref/models/expressions/#django.db.models.Func.as_sql
        - https://semgrep.dev/blog/2020/preventing-sql-injection-a-django-authors-perspective/
      category: security
      technology:
        - django
      cwe2022-top25: true
      cwe2021-top25: true
      subcategory:
        - audit
      likelihood: LOW
      impact: HIGH
      confidence: LOW
      license: Commons Clause License Condition v1.0[LGPL-2.1-only]
      vulnerability_class:
        - SQL Injection
    pattern: $EXPRESSION.as_sql(...)
    severity: WARNING
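
To make the scope of `pattern: $EXPRESSION.as_sql(...)` concrete: it matches any call to a method named `as_sql` on any receiver, regardless of arguments, but not a `def as_sql(...)` definition. The following is a rough approximation built on Python's `ast` module, not how Semgrep itself is implemented; the helper name `find_as_sql_calls` and the `sample` source are hypothetical.

```python
import ast


def find_as_sql_calls(source):
    """Return the line numbers of calls shaped like <anything>.as_sql(...)."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "as_sql"
        ):
            hits.append(node.lineno)
    return sorted(hits)


# Line 1 of this sample is blank; the call sits on line 3.
sample = '''
def query_as_sql(query, connection):
    return query.get_compiler(connection=connection).as_sql()

def as_sql(self, qn, connection):
    return None
'''

print(find_as_sql_calls(sample))
```

Only the call on the third line of `sample` is reported; the method definition further down is not a call and is ignored. Because the match is purely syntactic, it cannot tell whether user input actually reaches the call, which is consistent with the rule's `confidence: LOW` and its `audit` subcategory.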

Examples

custom-expression-as-sql.py

import six
from django import VERSION
from django.db.models.aggregates import Aggregate as DjangoAggregate
from django.db.models.sql.aggregates import Aggregate as DjangoSqlAggregate

def query_as_sql(query, connection):
    # ruleid: custom-expression-as-sql
    return query.get_compiler(connection=connection).as_sql()

class SqlAggregate(DjangoSqlAggregate):
    conditional_template = '%(function)s(CASE WHEN %(condition)s THEN %(field)s ELSE null END)'

    def __init__(self, col, source=None, is_summary=False, condition=None, **extra):
        super(SqlAggregate, self).__init__(col, source, is_summary, **extra)
        self.condition = condition

    def relabel_aliases(self, change_map):
        if VERSION < (1, 7):
            super(SqlAggregate, self).relabel_aliases(change_map)
        if self.has_condition:
            condition_change_map = {
                k: v for k, v in change_map.items()
                if k in self.condition.query.alias_map
            }
            self.condition.query.change_aliases(condition_change_map)

    def relabeled_clone(self, change_map):
        self.relabel_aliases(change_map)
        return super(SqlAggregate, self).relabeled_clone(change_map)

    def as_sql(self, qn, connection):
        if self.has_condition:
            self.sql_template = self.conditional_template
            self.extra['condition'] = self._condition_as_sql(qn, connection)

        # ruleid: custom-expression-as-sql
        return super(SqlAggregate, self).as_sql(qn, connection)

    @property
    def has_condition(self):
        # Warning: bool(QuerySet) will hit the database
        return self.condition is not None

    def _condition_as_sql(self, qn, connection):
        '''
        Return sql for condition.
        '''
        def escape(value):
            if isinstance(value, bool):
                value = str(int(value))
            if isinstance(value, six.string_types):
                # Escape params used with LIKE
                if '%' in value:
                    value = value.replace('%', '%%')
                # Escape single quotes
                if "'" in value:
                    value = value.replace("'", "''")
                # Add single quote to text values
                value = "'" + value + "'"
            return value

        # ruleid: custom-expression-as-sql
        sql, param = self.condition.query.where.as_sql(qn, connection)
        param = map(escape, param)

        return sql % tuple(param)
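
The `_condition_as_sql` method above escapes parameters by hand and interpolates them into the SQL string, which is the kind of construction this rule asks a reviewer to inspect. One alternative is to keep the parameters separate from the SQL text, in the same `(sql, params)` shape that `as_sql` returns, and let the database driver bind them. The following is a minimal sketch under stated assumptions: it uses `sqlite3` with `?` placeholders rather than Django, and the helper `conditional_aggregate` and the `orders` table are hypothetical.

```python
import sqlite3


def conditional_aggregate(function, field, condition_sql, condition_params):
    """Build (sql, params) for FUNCTION(CASE WHEN <condition> THEN field ELSE null END).

    function, field and condition_sql must be trusted text chosen by the
    developer; only the values in condition_params may come from users.
    """
    sql = "%s(CASE WHEN %s THEN %s ELSE null END)" % (function, condition_sql, field)
    return sql, list(condition_params)


# Hypothetical in-memory table, used only for this illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (customer TEXT, amount INTEGER)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [("o'brien", 10), ("smith", 5), ("o'brien", 7)],
)

# The value containing a quote never appears in the SQL text.
fragment, params = conditional_aggregate("SUM", "amount", "customer = ?", ["o'brien"])
total = conn.execute("SELECT %s FROM orders" % fragment, params).fetchone()[0]

print(fragment)
print(total)
```

The design choice here is that quoting is left entirely to the driver: the helper never inspects or rewrites the parameter values, so there is no hand-written escaping logic to get wrong.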