python.lang.security.audit.sqli.aiopg-sqli.aiopg-sqli

profile photo of semgrepsemgrep
Author
1,688
Download Count*

Detected string concatenation with a non-literal variable in an aiopg Python SQL statement. This could lead to SQL injection if the variable is user-controlled and not properly sanitized. In order to prevent SQL injection, use parameterized queries instead. You can create parameterized queries like so: 'cur.execute("SELECT %s FROM table", (user_value,))'.

Run Locally

Run in CI

Definition

rules:
  # Flags aiopg cursor `execute` calls whose SQL argument is assembled with
  # string concatenation, `.format()`, `%` formatting, or an f-string.
  - id: aiopg-sqli
    languages:
      - python
    message: "Detected string concatenation with a non-literal variable in an aiopg
      Python SQL statement. This could lead to SQL injection if the variable is
      user-controlled and not properly sanitized. In order to prevent SQL
      injection, use parameterized queries instead. You can create parameterized
      queries like so: 'cur.execute(\"SELECT %s FROM table\", (user_value,))'."
    metadata:
      references:
        - https://github.com/aio-libs/aiopg
      category: security
      cwe:
        - "CWE-89: Improper Neutralization of Special Elements used in an SQL
          Command ('SQL Injection')"
      technology:
        - aiopg
      owasp:
        - A01:2017 - Injection
        - A03:2021 - Injection
      cwe2022-top25: true
      cwe2021-top25: true
      subcategory:
        - audit
      likelihood: LOW
      impact: HIGH
      confidence: LOW
      license: Commons Clause License Condition v1.0[LGPL-2.1-only]
      vulnerability_class:
        - SQL Injection
    # Every top-level entry under `patterns` must hold for a finding:
    # (1) the call shape, (2) the cursor's aiopg origin, (3) the literal-only
    # exclusions, and (4) the method-name filter.
    patterns:
      # (1) Call shape: either a named query built earlier, or a query
      # expression built inline in the argument list.
      - pattern-either:
          # Named query: $QUERY is passed to the call and was previously
          # assigned from one of the string-building forms below.
          - patterns:
              - pattern: $CUR.$METHOD(...,$QUERY,...)
              - pattern-either:
                  - pattern-inside: |
                      $QUERY = $X + $Y
                      ...
                  - pattern-inside: |
                      $QUERY += $X
                      ...
                  - pattern-inside: |
                      $QUERY = '...'.format(...)
                      ...
                  - pattern-inside: |
                      $QUERY = '...' % (...)
                      ...
                  - pattern-inside: |
                      $QUERY = f'...{$USERINPUT}...'
                      ...
              # Exclusions for the named-query case: appending or joining
              # string literals only, or formatting with no arguments.
              - pattern-not-inside: |
                  $QUERY += "..."
                  ...
              - pattern-not-inside: |
                  $QUERY = "..." + "..."
                  ...
              - pattern-not-inside: |
                  $QUERY = '...'.format()
                  ...
              - pattern-not-inside: |
                  $QUERY = '...' % ()
                  ...
          # Inline query: the string-building expression is itself an argument.
          - pattern: $CUR.$METHOD(..., $X + $Y, ...)
          - pattern: $CUR.$METHOD(..., '...'.format(...), ...)
          - pattern: $CUR.$METHOD(..., '...' % (...), ...)
          - pattern: $CUR.$METHOD(..., f'...{$USERINPUT}...', ...)
      # (2) Cursor origin: $CUR must be obtained, earlier in the same scope,
      # from a connection or pool created via aiopg.connect / aiopg.create_pool.
      - pattern-either:
          - pattern-inside: |
              $CONN = await aiopg.connect(...)
              ...
              $CUR = await $CONN.cursor(...)
              ...
          - pattern-inside: |
              $POOL = await aiopg.create_pool(...)
              ...
              async with $POOL.acquire(...) as $CONN:
                ...
                async with $CONN.cursor(...) as $CUR:
                  ...
          - pattern-inside: |
              $POOL = await aiopg.create_pool(...)
              ...
              with (await $POOL.cursor(...)) as $CUR:
                ...
          - pattern-inside: |
              $POOL = await aiopg.create_pool(...)
              ...
              async with $POOL as $CONN:
                ...
                $CUR = await $CONN.cursor(...)
                ...
          - pattern-inside: |
              $POOL = await aiopg.create_pool(...)
              ...
              async with $POOL.cursor(...) as $CUR:
                ...
      # (3) Exclusions for the inline case: literal-only concatenation and
      # formatting with no arguments.
      - pattern-not: $CUR.$METHOD(..., "..." + "...", ...)
      - pattern-not: $CUR.$METHOD(..., '...'.format(), ...)
      - pattern-not: $CUR.$METHOD(..., '...'%(), ...)
      # (4) Only calls whose method name is exactly `execute` are reported.
      - metavariable-regex:
          metavariable: $METHOD
          regex: ^(execute)$
    severity: WARNING

Examples

aiopg-sqli.py

import asyncio
import asyncpg

async def bad1():
    """Positive fixture: query concatenated with `+` and passed by name.

    The SQL string is deliberately built from ``req.FormValue("age")`` so the
    ``aiopg-sqli`` rule reports the ``execute`` call annotated with
    ``# ruleid``. Do not parameterize this query; the unsafe form is the
    point of the fixture.
    """
    # Declared `async` because the body uses `await`, which is a SyntaxError
    # inside a plain `def`.
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    query = "SELECT name FROM users WHERE age=" + req.FormValue("age")
    # ruleid: aiopg-sqli
    await cur.execute(query)

async def bad2():
    """Positive fixture: query built with ``str.format`` and passed by name.

    The cursor comes from ``with (await pool.cursor())`` on a pool created by
    ``aiopg.create_pool``. The ``execute`` call annotated with ``# ruleid`` is
    the expected finding; the unsafe formatting is intentional.
    """
    pool = await aiopg.create_pool(dsn)

    with (await pool.cursor()) as cur:
        # Intentionally unsafe: user_input is interpolated into the SQL text.
        sql_query = 'SELECT * FROM {}'.format(user_input)
        # ruleid: aiopg-sqli
        await cur.execute(sql_query)
        ret = await cur.fetchone()
        assert ret == (1,), ret

async def bad3():
    """Positive fixture: query built with ``%`` formatting and passed by name.

    The connection is acquired from an ``aiopg.create_pool`` pool and the
    cursor is opened with a nested ``async with``. The ``execute`` call
    annotated with ``# ruleid`` is the expected finding.
    """
    pool = await aiopg.create_pool(dsn)
    async with pool.acquire() as conn:
        # Intentionally unsafe: the query is built before the cursor is opened.
        sql_query = 'SELECT * FROM %s'%(user_input)
        async with conn.cursor() as cur:
            # ruleid: aiopg-sqli
            await cur.execute(sql_query)
            # Collect every row produced by iterating the cursor.
            ret = []
            async for row in cur:
                ret.append(row)

async def bad4(user_input):
    """Positive fixture: query built with an f-string and passed by name.

    The pool itself is used as the ``async with`` context to obtain the
    connection. The ``execute`` call annotated with ``# ruleid`` is the
    expected finding; the unsafe interpolation is intentional.

    Args:
        user_input: Value interpolated directly into the SQL text.
    """
    # Declared `async` because the body uses `await` / `async with`, which
    # are SyntaxErrors inside a plain `def`.
    pool = await aiopg.create_pool(dsn)
    async with pool as conn:
        cur = await conn.cursor()
        sql_query = f'SELECT * FROM {user_input}'
        # ruleid: aiopg-sqli
        await cur.execute(sql_query)

async def bad5():
    """Positive fixture: ``+`` concatenation written inline in ``execute``.

    The cursor comes from ``async with pool.cursor()``. The call annotated
    with ``# ruleid`` is the expected finding; the unsafe concatenation is
    intentional.
    """
    # Declared `async` because the body uses `await` / `async with`, which
    # are SyntaxErrors inside a plain `def`.
    pool = await aiopg.create_pool(dsn)
    async with pool.cursor() as cur:
        # ruleid: aiopg-sqli
        await cur.execute("SELECT name FROM users WHERE age=" + req.FormValue("age"))

async def bad6(user_input):
    """Positive fixture: ``str.format`` written inline in ``execute``.

    The call annotated with ``# ruleid`` is the expected finding; the unsafe
    formatting is intentional.

    Args:
        user_input: Value interpolated directly into the SQL text.
    """
    # Declared `async` because the body uses `await` / `async with`, which
    # are SyntaxErrors inside a plain `def`.
    pool = await aiopg.create_pool(dsn)
    async with pool.cursor() as cur:
        # ruleid: aiopg-sqli
        await cur.execute('SELECT * FROM {}'.format(user_input))

async def bad7(user_input):
    """Positive fixture: ``%`` formatting written inline in ``execute``.

    The call annotated with ``# ruleid`` is the expected finding; the unsafe
    formatting is intentional.

    NOTE(review): unlike the earlier fixtures, ``cur.execute`` is not awaited
    here. Presumably this is deliberate so the rule is also exercised on a
    bare call; confirm before changing.

    Args:
        user_input: Value interpolated directly into the SQL text.
    """
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # ruleid: aiopg-sqli
    cur.execute('SELECT * FROM %s'%(user_input))

async def bad8(user_input):
    """Positive fixture: f-string written inline in ``execute``.

    The call annotated with ``# ruleid`` is the expected finding; the unsafe
    interpolation is intentional.

    NOTE(review): ``cur.execute`` is not awaited here. Presumably this is
    deliberate so the rule is also exercised on a bare call; confirm before
    changing.

    Args:
        user_input: Value interpolated directly into the SQL text.
    """
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # ruleid: aiopg-sqli
    cur.execute(f'SELECT * FROM {user_input}')

async def bad9(user_input):
    """Positive fixture: ``%`` formatting combined with bound parameters.

    The table name is spliced into the SQL text with ``%`` (after passing
    through ``ext.quote_ident``) while the row values are supplied as a
    separate parameter list. The call annotated with ``# ruleid`` is still
    the expected finding.

    NOTE(review): ``cur.execute`` is not awaited and ``user_input`` is unused;
    ``ext`` and ``table_name`` are not defined in the visible file. Presumably
    acceptable for a pattern-matching fixture; confirm before changing.

    Args:
        user_input: Unused by this fixture.
    """
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # ruleid: aiopg-sqli
    cur.execute(
    "insert into %s values (%%s, %%s)" % ext.quote_ident(table_name),[10, 20])

async def ok1(user_input):
    """Negative fixture: parameterized query with a literal SQL string.

    The call annotated with ``# ok`` must not be reported by ``aiopg-sqli``.

    Args:
        user_input: Unused by this fixture.
    """
    # Declared `async` because the body uses `await`, which is a SyntaxError
    # inside a plain `def`.
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # ok: aiopg-sqli
    cur.execute("SELECT * FROM test WHERE id = %s", (3,))

async def ok2(user_input):
    """Negative fixture: named query built by joining two string literals.

    The call annotated with ``# ok`` must not be reported by ``aiopg-sqli``.

    Args:
        user_input: Unused by this fixture.
    """
    # Declared `async` because the body uses `await`, which is a SyntaxError
    # inside a plain `def`.
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    query = "SELECT name FROM users WHERE age=" + "3"
    # ok: aiopg-sqli
    cur.execute(query)

async def ok3(user_input):
    """Negative fixture: named query extended with ``+=`` of a string literal.

    The call annotated with ``# ok`` must not be reported by ``aiopg-sqli``.

    Args:
        user_input: Unused by this fixture.
    """
    # Declared `async` because the body uses `await`, which is a SyntaxError
    # inside a plain `def`.
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    query = "SELECT name FROM users WHERE age="
    query += "3"
    # ok: aiopg-sqli
    cur.execute(query)

async def ok4(user_input):
    """Negative fixture: named query built with an argument-less ``.format()``.

    The call annotated with ``# ok`` must not be reported by ``aiopg-sqli``.

    NOTE(review): this fixture calls ``cur.fetchval`` rather than ``execute``,
    so it is also excluded by the rule's method-name filter. Presumably carried
    over from a sibling fixture file; confirm whether ``execute`` was intended.

    Args:
        user_input: Unused by this fixture.
    """
    # Declared `async` because the body uses `await`, which is a SyntaxError
    # inside a plain `def`.
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    query = 'SELECT * FROM John'.format()
    # ok: aiopg-sqli
    cur.fetchval(query)

async def ok5(user_input):
    """Negative fixture: named query built with ``%`` and an empty tuple.

    The call annotated with ``# ok`` must not be reported by ``aiopg-sqli``.

    Args:
        user_input: Unused by this fixture.
    """
    # Declared `async` because the body uses `await`, which is a SyntaxError
    # inside a plain `def`.
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    query = 'SELECT * FROM John'% ()
    # ok: aiopg-sqli
    cur.execute(query)

async def ok6(user_input):
    """Negative fixture: named query written as an f-string with no fields.

    The call annotated with ``# ok`` must not be reported by ``aiopg-sqli``.

    Args:
        user_input: Unused by this fixture.
    """
    # Declared `async` because the body uses `await`, which is a SyntaxError
    # inside a plain `def`.
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    query = f'SELECT * FROM John'
    # ok: aiopg-sqli
    cur.execute(query)

async def ok7(user_input):
    """Negative fixture: inline concatenation of two string literals.

    The call annotated with ``# ok`` must not be reported by ``aiopg-sqli``.

    Args:
        user_input: Unused by this fixture.
    """
    # Declared `async` because the body uses `await`, which is a SyntaxError
    # inside a plain `def`.
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # ok: aiopg-sqli
    cur.execute("SELECT name FROM users WHERE age=" + "3")

async def ok8(user_input):
    """Negative fixture: inline argument-less ``.format()`` on a literal.

    The call annotated with ``# ok`` must not be reported by ``aiopg-sqli``.

    Args:
        user_input: Unused by this fixture.
    """
    # Declared `async` because the body uses `await`, which is a SyntaxError
    # inside a plain `def`.
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # ok: aiopg-sqli
    cur.execute('SELECT * FROM John'.format())

async def ok9(user_input):
    """Negative fixture: inline ``%`` formatting with an empty tuple.

    The call annotated with ``# ok`` must not be reported by ``aiopg-sqli``.

    Args:
        user_input: Unused by this fixture.
    """
    # Declared `async` because the body uses `await`, which is a SyntaxError
    # inside a plain `def`.
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # ok: aiopg-sqli
    cur.execute('SELECT * FROM John'% ())

async def ok10(user_input):
    """Negative fixture: inline f-string with no replacement fields.

    The call annotated with ``# ok`` must not be reported by ``aiopg-sqli``.

    Args:
        user_input: Unused by this fixture.
    """
    # Declared `async` because the body uses `await`, which is a SyntaxError
    # inside a plain `def`.
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    # ok: aiopg-sqli
    cur.execute(f'SELECT * FROM John')