python.lang.security.audit.sqli.pg8000-sqli.pg8000-sqli

profile photo of semgrepsemgrep
Author
1,688
Download Count*

Detected string concatenation with a non-literal variable in a pg8000 Python SQL statement. This could lead to SQL injection if the variable is user-controlled and not properly sanitized. In order to prevent SQL injection, use parameterized queries or prepared statements instead. You can create parameterized queries like so: 'conn.run("SELECT :value FROM table", value=myvalue)'. You can also create prepared statements with 'conn.prepare': 'conn.prepare("SELECT (:v) FROM table")'

Run Locally

Run in CI

Definition

# Semgrep rule: flags pg8000 query calls whose SQL argument is built with
# `+`, `+=`, `str.format`, `%`-formatting, or an f-string interpolation.
rules:
  - id: pg8000-sqli
    languages:
      - python
    message: "Detected string concatenation with a non-literal variable in a pg8000
      Python SQL statement. This could lead to SQL injection if the variable is
      user-controlled and not properly sanitized. In order to prevent SQL
      injection, use parameterized queries or prepared statements instead. You
      can create parameterized queries like so: 'conn.run(\"SELECT :value FROM
      table\", value=myvalue)'. You can also create prepared statements with
      'conn.prepare': 'conn.prepare(\"SELECT (:v) FROM table\")'"
    metadata:
      references:
        - https://github.com/tlocke/pg8000
      cwe:
        - "CWE-89: Improper Neutralization of Special Elements used in an SQL
          Command ('SQL Injection')"
      category: security
      technology:
        - pg8000
      owasp:
        - A01:2017 - Injection
        - A03:2021 - Injection
      cwe2022-top25: true
      cwe2021-top25: true
      subcategory:
        - audit
      likelihood: LOW
      impact: HIGH
      confidence: LOW
      license: Commons Clause License Condition v1.0[LGPL-2.1-only]
      vulnerability_class:
        - SQL Injection
    patterns:
      # Part 1: the shape of the query argument.
      - pattern-either:
          # (a) The query is a variable that was built dynamically earlier in
          #     the same scope, excluding builds made purely from literals.
          - patterns:
              - pattern: $CONN.$METHOD(...,$QUERY,...)
              - pattern-either:
                  - pattern-inside: |
                      $QUERY = $X + $Y
                      ...
                  - pattern-inside: |
                      $QUERY += $X
                      ...
                  - pattern-inside: |
                      $QUERY = '...'.format(...)
                      ...
                  - pattern-inside: |
                      $QUERY = '...' % (...)
                      ...
                  - pattern-inside: |
                      $QUERY = f'...{$USERINPUT}...'
                      ...
              - pattern-not-inside: |
                  $QUERY += "..."
                  ...
              - pattern-not-inside: |
                  $QUERY = "..." + "..."
                  ...
              - pattern-not-inside: |
                  $QUERY = '...'.format()
                  ...
              - pattern-not-inside: |
                  $QUERY = '...' % ()
                  ...
          # (b) The query is built inline in the call itself.
          - pattern: $CONN.$METHOD(..., $X + $Y, ...)
          - pattern: $CONN.$METHOD(..., '...'.format(...), ...)
          - pattern: $CONN.$METHOD(..., '...' % (...), ...)
          - pattern: $CONN.$METHOD(..., f'...{$USERINPUT}...', ...)
      # Part 2: the receiver must come from a pg8000 connection or cursor
      # created earlier in the same scope.
      - pattern-either:
          - pattern-inside: |
              $CONN = pg8000.native.Connection(...)
              ...
          # pg8000's DB-API module is `pg8000.dbapi` (was misspelled `dhapi`,
          # which made this branch unmatchable).
          - pattern-inside: |
              $CONN = pg8000.dbapi.connect(...)
              ...
          - pattern-inside: |
              $CONN1 = pg8000.connect(...)
              ...
              $CONN = $CONN1.cursor(...)
              ...
          - pattern-inside: |
              $CONN = pg8000.connect(...)
              ...
      # Part 3: inline builds made only of literals are not reported.
      - pattern-not: $CONN.$METHOD(..., "..." + "...", ...)
      - pattern-not: $CONN.$METHOD(..., '...'.format(), ...)
      - pattern-not: $CONN.$METHOD(..., '...'%(), ...)
      # Part 4: restrict to the methods that take SQL text.
      - metavariable-regex:
          metavariable: $METHOD
          regex: ^(run|execute|executemany|prepare)$
    severity: WARNING

Examples

pg8000-sqli.py

import pg8000.native as pg
import pg8000.dbapi

def bad1():
    """Positive fixture: query built with `+` and stored in a variable.

    The concatenated string is assigned to `query` and then passed to
    `conn.run` on a `pg.Connection`; the `ruleid` marker expects a finding.
    NOTE(review): `req` is not defined in this snippet -- this looks like a
    static-analysis fixture that is never executed; confirm.
    """
    conn = pg.Connection("postgres", password="cpsnow")

    query = "SELECT name FROM users WHERE age=" + req.FormValue("age")
    # ruleid: pg8000-sqli
    conn.run(query)

def bad2():
    """Positive fixture: query built with `str.format`, run through a cursor.

    The cursor is obtained from a `pg8000.connect(...)` connection, with an
    unrelated statement in between, and the formatted string is passed to
    `cursor.execute`; the `ruleid` marker expects a finding.
    NOTE(review): `self`, `db_connect` and `user_input` are not defined in
    this snippet -- presumably intentional for a fixture; confirm.
    """
    db = pg8000.connect(**db_connect)
    self.assertEqual(db.notifies, [])
    cursor = db.cursor()
    sql_query = 'SELECT * FROM {}'.format(user_input)
    # ruleid: pg8000-sqli
    cursor.execute(sql_query)

def bad3():
    """Positive fixture: query built with `%`-formatting into a variable.

    The formatted string is passed to `connection.run` on a
    `pg8000.connect(...)` connection; the `ruleid` marker expects a finding.
    NOTE(review): `os` and `user_input` are not defined in the visible
    snippet -- presumably intentional for a fixture; confirm.
    """
    connection = pg8000.connect(os.environ['DB_USER'], password=os.environ['DB_PASSWORD'], port=os.environ['DB_PORT'], host=os.environ['DB_HOST'])
    sql_query = 'SELECT * FROM %s'%(user_input)
    # ruleid: pg8000-sqli
    connection.run(sql_query)

def bad4(user_input):
    """Positive fixture: f-string interpolating `user_input` into the query.

    The f-string is stored in `sql_query` and passed to `cursor.execute` on
    a cursor taken from a `pg8000.connect(...)` connection; the `ruleid`
    marker expects a finding.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    cursor = conn.cursor()
    sql_query = f'SELECT * FROM {user_input}'
    # ruleid: pg8000-sqli
    cursor.execute(sql_query)

def bad5():
    """Positive fixture: `+` concatenation written inline in the call.

    The concatenated expression is passed directly to `conn.executemany`;
    the `ruleid` marker expects a finding.
    NOTE(review): `req` is not defined in this snippet -- presumably
    intentional for a fixture; confirm.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    # ruleid: pg8000-sqli
    conn.executemany("SELECT name FROM users WHERE age=" + req.FormValue("age"))

def bad6(user_input):
    """Positive fixture: `str.format(user_input)` written inline in the call.

    The formatted string is passed directly to `conn.run`; the `ruleid`
    marker expects a finding.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    # ruleid: pg8000-sqli
    conn.run('SELECT * FROM {}'.format(user_input))

def bad7(user_input):
    """Positive fixture: `%`-formatting with `user_input` inline in the call.

    The formatted string is passed directly to `conn.run`; the `ruleid`
    marker expects a finding.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    # ruleid: pg8000-sqli
    conn.run('SELECT * FROM %s'%(user_input))

def bad8(user_input):
    """Positive fixture: f-string interpolating `user_input` inline in the call.

    The f-string is passed directly to `conn.execute`; the `ruleid` marker
    expects a finding.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    # ruleid: pg8000-sqli
    conn.execute(f'SELECT * FROM {user_input}')

def bad9():
    """Positive fixture: `%`-formatted SQL plus a separate parameter list.

    The table name is formatted into the SQL text with `%` while the values
    are passed as a second argument to `conn.execute`; the call is split
    across two lines and the `ruleid` marker expects a finding.
    NOTE(review): `table_name` is not defined in this snippet -- presumably
    intentional for a fixture; confirm.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    # ruleid: pg8000-sqli
    conn.execute(
    "insert into %s values (%%s, %%s)" % table_name,[10, 20])

def ok1(user_input):
    """Negative fixture: literal SQL with a named placeholder.

    `user_input` is passed as the keyword argument `userinput` rather than
    being spliced into the SQL text; the `ok` marker expects no finding.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    SQL = "INSERT INTO authors (name) VALUES :userinput;"
    # ok: pg8000-sqli
    conn.execute(SQL, userinput=user_input)

def ok2(user_input):
    """Negative fixture: variable built by concatenating two string literals.

    `user_input` is unused; the `ok` marker expects no finding.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    query = "SELECT name FROM users WHERE age=" + "3"
    # ok: pg8000-sqli
    conn.execute(query)

def ok3(user_input):
    """Negative fixture: literal query extended with `+=` and another literal.

    `user_input` is unused; the `ok` marker expects no finding.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    query = "SELECT name FROM users WHERE age="
    query += "3"
    # ok: pg8000-sqli
    conn.execute(query)

def ok4(user_input):
    """Negative fixture: `str.format()` called with no arguments.

    `user_input` is unused; the `ok` marker expects no finding.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    query = 'SELECT * FROM John'.format()
    # ok: pg8000-sqli
    conn.execute(query)

def ok5(user_input):
    """Negative fixture: `%`-formatting with an empty tuple.

    `user_input` is unused; the `ok` marker expects no finding.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    query = 'SELECT * FROM John'% ()
    # ok: pg8000-sqli
    conn.execute(query)

def ok6(user_input):
    """Negative fixture: f-string with no interpolated expression.

    `user_input` is unused; the `ok` marker expects no finding.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    query = f'SELECT * FROM John'
    # ok: pg8000-sqli
    conn.execute(query)

def ok7(user_input):
    """Negative fixture: two string literals concatenated inline in the call.

    `user_input` is unused; the `ok` marker expects no finding.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    # ok: pg8000-sqli
    conn.execute("SELECT name FROM users WHERE age=" + "3")

def ok8(user_input):
    """Negative fixture: argument-less `str.format()` inline in the call.

    `user_input` is unused; the `ok` marker expects no finding.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    # ok: pg8000-sqli
    conn.execute('SELECT * FROM John'.format())

def ok9(user_input):
    """Negative fixture: `%`-formatting with an empty tuple inline in the call.

    `user_input` is unused; the `ok` marker expects no finding.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    # ok: pg8000-sqli
    conn.execute('SELECT * FROM John'% ())

def ok10(user_input):
    """Negative fixture: f-string without interpolation inline in the call.

    `user_input` is unused; the `ok` marker expects no finding.
    """
    conn = pg8000.connect(user='postgres', password='password', database='andromedabot')
    # ok: pg8000-sqli
    conn.execute(f'SELECT * FROM John')

def ok11(user_input):
    """Negative fixture: prepared statement run with a bound parameter.

    The SQL text is a literal containing the `:v` placeholder, and
    `user_input` is supplied only as the keyword argument `v` when the
    prepared statement is run; the `ok` marker expects no finding.
    """
    conn = pg8000.native.Connection("postgres", password="cpsnow")
    # Keep the prepared statement: previously the result of `prepare` was
    # discarded and `run` was called on the undefined name `cur`.
    cur = conn.prepare("SELECT (:v) FROM table")
    # ok: pg8000-sqli
    cur.run(v = user_input)