python.lang.security.audit.sqli.psycopg-sqli.psycopg-sqli

profile photo of semgrepsemgrep
Author
1,688
Download Count*

Detected string concatenation with a non-literal variable in a psycopg2 Python SQL statement. This could lead to SQL injection if the variable is user-controlled and not properly sanitized. In order to prevent SQL injection, use parameterized queries or prepared statements instead. You can use prepared statements by creating a 'sql.SQL' string. You can also use the pyformat binding style to create parameterized queries. For example: 'cur.execute(SELECT * FROM table WHERE name=%s, user_input)'

Run Locally

Run in CI

Defintion

rules:
  - id: psycopg-sqli
    languages:
      - python
    message: "Detected string concatenation with a non-literal variable in a
      psycopg2 Python SQL statement. This could lead to SQL injection if the
      variable is user-controlled and not properly sanitized. In order to
      prevent SQL injection, use parameterized queries or prepared statements
      instead. You can use prepared statements by creating a 'sql.SQL' string.
      You can also use the pyformat binding style to create parameterized
      queries. For example: 'cur.execute(SELECT * FROM table WHERE name=%s,
      user_input)'"
    metadata:
      cwe:
        - "CWE-89: Improper Neutralization of Special Elements used in an SQL
          Command ('SQL Injection')"
      references:
        - https://www.psycopg.org/docs/sql.html
      category: security
      technology:
        - psycopg
      owasp:
        - A01:2017 - Injection
        - A03:2021 - Injection
      cwe2022-top25: true
      cwe2021-top25: true
      subcategory:
        - audit
      likelihood: LOW
      impact: HIGH
      confidence: LOW
      license: Commons Clause License Condition v1.0[LGPL-2.1-only]
      vulnerability_class:
        - SQL Injection
    patterns:
      - pattern-either:
          - patterns:
              - pattern: $CUR.$METHOD(...,$QUERY,...)
              - pattern-either:
                  - pattern-inside: |
                      $QUERY = $X + $Y
                      ...
                  - pattern-inside: |
                      $QUERY += $X
                      ...
                  - pattern-inside: |
                      $QUERY = '...'.format(...)
                      ...
                  - pattern-inside: |
                      $QUERY = '...' % (...)
                      ...
                  - pattern-inside: |
                      $QUERY = f'...{$USERINPUT}...'
                      ...
              - pattern-not-inside: |
                  $QUERY += "..."
                  ...
              - pattern-not-inside: |
                  $QUERY = "..." + "..."
                  ...
              - pattern-not-inside: |
                  $QUERY = '...'.format()
                  ...
              - pattern-not-inside: |
                  $QUERY = '...' % ()
                  ...
          - pattern: $CUR.$METHOD(..., $X + $Y, ...)
          - pattern: $CUR.$METHOD(..., '...'.format(...), ...)
          - pattern: $CUR.$METHOD(..., '...' % (...), ...)
          - pattern: $CUR.$METHOD(..., f'...{$USERINPUT}...', ...)
      - pattern-either:
          - pattern-inside: |
              $CONN = psycopg2.connect(...)
              ...
              $CUR = $CONN.cursor(...)
              ...
          - pattern-inside: |
              $CONN = psycopg2.connect(...)
              ...
              with $CONN.cursor(...) as $CUR:
                ...
      - pattern-not: $CUR.$METHOD(..., "..." + "...", ...)
      - pattern-not: $CUR.$METHOD(..., '...'.format(), ...)
      - pattern-not: $CUR.$METHOD(..., '...'%(), ...)
      - metavariable-regex:
          metavariable: $METHOD
          regex: ^(execute|executemany|mogrify)$
    severity: WARNING

Examples

psycopg-sqli.py

import psycopg2

def bad1():
    conn = psycopg2.connect("dbname=test user=postgres")

    # Open a cursor to perform database operations
    cur = conn.cursor()

    # Execute a command: this creates a new table
    query = "SELECT name FROM users WHERE age=" + req.FormValue("age")
    # ruleid: psycopg-sqli
    cur.execute(query)

def bad2():
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    sql_query = 'SELECT * FROM {}'.format(user_input)
    # ruleid: psycopg-sqli
    cur.execute(sql_query)

def bad3():
    conn = psycopg2.connect(DSN)

    with conn:
        with conn.cursor() as cur:
            sql_query = 'SELECT * FROM %s'%(user_input)
            # ruleid: psycopg-sqli
            cur.execute(sql_query)

def bad4(user_input):
    conn = psycopg2.connect(DSN)
    with conn:
        with conn.cursor() as cur:
            sql_query = f'SELECT * FROM {user_input}'
            # ruleid: psycopg-sqli
            cur.execute(sql_query)

def bad5():
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    # ruleid: psycopg-sqli
    cur.executemany("SELECT name FROM users WHERE age=" + req.FormValue("age"))

def bad6(user_input):
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    # ruleid: psycopg-sqli
    cur.execute('SELECT * FROM {}'.format(user_input))

def bad7(user_input):
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    # ruleid: psycopg-sqli
    cur.execute('SELECT * FROM %s'%(user_input))

def bad8(user_input):
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    # ruleid: psycopg-sqli
    cur.execute(f'SELECT * FROM {user_input}')

def bad9():
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    # ruleid: psycopg-sqli
    cur.execute(
    "insert into %s values (%%s, %%s)" % ext.quote_ident(table_name),[10, 20])

def ok1(user_input):
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    SQL = "INSERT INTO authors (name) VALUES (%s);"
    # ok: psycopg-sqli
    cur.execute(SQL, user_input)

def ok2(user_input):
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    query = "SELECT name FROM users WHERE age=" + "3"
    # ok: psycopg-sqli
    cur.execute(query)

def ok3(user_input):
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    query = "SELECT name FROM users WHERE age="
    query += "3"
    # ok: psycopg-sqli
    cur.execute(query)

def ok4(user_input):
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    query = 'SELECT * FROM John'.format()
    # ok: psycopg-sqli
    cur.execute(query)

def ok5(user_input):
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    query = 'SELECT * FROM John'% ()
    # ok: psycopg-sqli
    cur.execute(query)

def ok6(user_input):
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    query = f'SELECT * FROM John'
    # ok: psycopg-sqli
    cur.execute(query)

def ok7(user_input):
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    # ok: psycopg-sqli
    cur.execute("SELECT name FROM users WHERE age=" + "3")

def ok8(user_input):
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    # ok: psycopg-sqli
    cur.execute('SELECT * FROM John'.format())

def ok9(user_input):
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    # ok: psycopg-sqli
    cur.execute('SELECT * FROM John'% ())

def ok10(user_input):
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    # ok: psycopg-sqli
    cur.execute(f'SELECT * FROM John')

def ok11(user_input):
    conn = psycopg2.connect("dbname=test user=postgres")
    cur = conn.cursor()
    query = sql.SQL("select {field} from {table} where {pkey} = %s").format(
    field=sql.Identifier('my_name'),
    table=sql.Identifier('some_table'),
    pkey=sql.Identifier('id'))
    # ok: psycopg-sqli
    cur.execute(query)