python.sqlalchemy.security.sqlalchemy-execute-raw-query.sqlalchemy-execute-raw-query

profile photo of semgrepsemgrep
Author
6,908
Download Count*

Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

Run Locally

Run in CI

Defintion

rules:
  - id: sqlalchemy-execute-raw-query
    message: "Avoiding SQL string concatenation: untrusted input concatenated with
      raw SQL query can result in SQL Injection. In order to execute raw query
      safely, prepared statement should be used. SQLAlchemy provides TextualSQL
      to easily used prepared statement with named parameters. For complex SQL
      composition, use SQL Expression Language or Schema Definition Language. In
      most cases, SQLAlchemy ORM will be a better option."
    metadata:
      cwe:
        - "CWE-89: Improper Neutralization of Special Elements used in an SQL
          Command ('SQL Injection')"
      owasp:
        - A01:2017 - Injection
        - A03:2021 - Injection
      references:
        - https://docs.sqlalchemy.org/en/14/core/tutorial.html#using-textual-sql
        - https://www.tutorialspoint.com/sqlalchemy/sqlalchemy_quick_guide.htm
        - https://docs.sqlalchemy.org/en/14/core/tutorial.html#using-more-specific-text-with-table-expression-literal-column-and-expression-column
      category: security
      technology:
        - sqlalchemy
      cwe2022-top25: true
      cwe2021-top25: true
      subcategory:
        - audit
      likelihood: LOW
      impact: HIGH
      confidence: LOW
      license: Commons Clause License Condition v1.0[LGPL-2.1-only]
      vulnerability_class:
        - SQL Injection
    severity: ERROR
    languages:
      - python
    pattern-either:
      - pattern: |
          $CONNECTION.execute( $SQL + ..., ... )
      - pattern: |
          $CONNECTION.execute( $SQL % (...), ...)
      - pattern: |
          $CONNECTION.execute( $SQL.format(...), ... )
      - pattern: |
          $CONNECTION.execute(f"...{...}...", ...)
      - patterns:
          - pattern-inside: |
              $QUERY = $SQL + ...
              ...
          - pattern: |
              $CONNECTION.execute($QUERY, ...)
      - patterns:
          - pattern-inside: |
              $QUERY = $SQL % (...)
              ...
          - pattern: |
              $CONNECTION.execute($QUERY, ...)
      - patterns:
          - pattern-inside: |
              $QUERY = $SQL.format(...)
              ...
          - pattern: |
              $CONNECTION.execute($QUERY, ...)
      - patterns:
          - pattern-inside: |
              $QUERY = f"...{...}..."
              ...
          - pattern: |
              $CONNECTION.execute($QUERY, ...)

Examples

sqlalchemy-execute-raw-query.py

##########################################################################
# Connectionless query
##########################################################################

# String concatenation using + operator
engine = create_engine('postgresql://user@localhost/database')
echo("database connexion: ok")
# ruleid: sqlalchemy-execute-raw-query
engine.execute("INSERT INTO person (name) VALUES ('" + name + "')")
# ruleid: sqlalchemy-execute-raw-query
engine.execute("INSERT INTO person (name) VALUES ('" + name + "')", multi=False)

# String concatenation using + operator
engine = create_engine('postgresql://user@localhost/database')
# ruleid: sqlalchemy-execute-raw-query
engine.execute("INSERT INTO person (firstname, lastname) VALUES ('" + firstname + "','" + lastname + "')")

# String formating using % operator (old style)
engine = create_engine('postgresql://user@localhost/database')
# ruleid: sqlalchemy-execute-raw-query
engine.execute("INSERT INTO person (name) VALUES ('%s')" % (name))

# String formating (new style)
engine = create_engine('postgresql://user@localhost/database')
# ruleid: sqlalchemy-execute-raw-query
engine.execute("INSERT INTO person (name) VALUES ('{}')".format(name))

# String concatenation using fstrings
engine = create_engine('postgresql://user@localhost/database')
# ruleid: sqlalchemy-execute-raw-query
engine.execute(f"INSERT INTO person (name) VALUES ('{name}')")

# String concatenation using + operator
engine = create_engine('postgresql://user@localhost/database')
query = "INSERT INTO person (name) VALUES ('" + name + "')"
# ruleid: sqlalchemy-execute-raw-query
engine.execute(query)

# String formating using % operator (old style)
engine = create_engine('postgresql://user@localhost/database')
query = "INSERT INTO person (name) VALUES ('%s')" % (name)
# ruleid: sqlalchemy-execute-raw-query
engine.execute(query)

# String formating (new style)
engine = create_engine('postgresql://user@localhost/database')

query = "INSERT INTO person (name) VALUES ('{}')".format(name)
# ruleid: sqlalchemy-execute-raw-query
engine.execute(query)

# String formating using fstrings
engine = create_engine('postgresql://user@localhost/database')

query = f"INSERT INTO person (name) VALUES ('{name}')"
# ruleid: sqlalchemy-execute-raw-query
engine.execute(query)

# fstrings
engine = create_engine('postgresql://user@localhost/database')
query: str = f"INSERT INTO person (name) VALUES ('{name}')"
# ruleid: sqlalchemy-execute-raw-query
engine.execute(query)

# Query without concatenation
# ok: sqlalchemy-execute-raw-query
engine = create_engine('postgresql://user@localhost/database')
engine.execute("INSERT INTO person (name) VALUES ('Frodon Sacquet')")

##########################################################################
# Execute query without "With" block
##########################################################################

# Execute query from string concatenation using + operator
engine = create_engine('postgresql://user@localhost/database')
connection = engine.connect()
# ruleid: sqlalchemy-execute-raw-query
connection.execute("INSERT INTO person (name) VALUES ('" + name + "')")

# Execute query from String formating using % operator (old style)
engine = create_engine('postgresql://user@localhost/database')
connection = engine.connect()
# ruleid: sqlalchemy-execute-raw-query
connection.execute("INSERT INTO person (name) VALUES ('%s')" % (name))

# Execute query from string formating (new style)
engine = create_engine('postgresql://user@localhost/database')
connection = engine.connect()
# ruleid: sqlalchemy-execute-raw-query
connection.execute("INSERT INTO person (name) VALUES ('{}')".format(name))

# Execute query from string concatenation fstrings
engine = create_engine('postgresql://user@localhost/database')
connection = engine.connect()
# ruleid: sqlalchemy-execute-raw-query
connection.execute(f"INSERT INTO person (name) VALUES ('{name}')")

##########################################################################
# Execute query in With block
##########################################################################

# Execute query in With block from String concatenation using + operator
engine = create_engine('postgresql://user@localhost/database')
with engine.connect() as connection:
    # ruleid: sqlalchemy-execute-raw-query
    connection.execute("INSERT INTO person (name) VALUES ('" + name + "')")

# Execute query in With block from string formating using % operator (old style)
engine = create_engine('postgresql://user@localhost/database')
with engine.connect() as connection:
    # ruleid: sqlalchemy-execute-raw-query
    connection.execute("INSERT INTO person (name) VALUES ('%s')" % (name))

# Execute query in With block from String formating (new style)
engine = create_engine('postgresql://user@localhost/database')
with engine.connect() as connection:
    # ruleid: sqlalchemy-execute-raw-query
    connection.execute("INSERT INTO person (name) VALUES ('{}')".format(name))

# Execute query in With block from String concatenation  fstrings
engine = create_engine('postgresql://user@localhost/database')
with engine.connect() as connection:
    # ruleid: sqlalchemy-execute-raw-query
    connection.execute(f"INSERT INTO person (name) VALUES ('{name}')")

##########################################################################
# Execute query in With block and using a variable
##########################################################################

# Execute query in With block from variable set by string concatenation using + operator
engine = create_engine('postgresql://user@localhost/database')
with engine.connect() as connection:
    query = "INSERT INTO person (name) VALUES ('" + name + "')"
    # ruleid: sqlalchemy-execute-raw-query
    connection.execute(query)

# Execute query in With block from variable (type) set by String concatenation using + operator
engine = create_engine('postgresql://user@localhost/database')
with engine.connect() as connection:
    query: str = "INSERT INTO person (name) VALUES ('" + name + "')"
    # ruleid: sqlalchemy-execute-raw-query
    connection.execute(query)


# Execute query in With block from variable set by String formating using % operator (old style)
engine = create_engine('postgresql://user@localhost/database')
with engine.connect() as connection:
    query = "INSERT INTO person (name) VALUES ('%s')" % (name)
    # ruleid: sqlalchemy-execute-raw-query
    connection.execute(query)

# Execute query in With block from variable (type) set by String formating using % operator (old style)
engine = create_engine('postgresql://user@localhost/database')
with engine.connect() as connection:
    query: str = "INSERT INTO person (name) VALUES ('%s')" % (name)
    # ruleid: sqlalchemy-execute-raw-query
    connection.execute(query)

# Execute query in With block from variable set by String formating (new style)
engine = create_engine('postgresql://user@localhost/database')
with engine.connect() as connection:
    query = "INSERT INTO person (name) VALUES ('{}')".format(name)
    # ruleid: sqlalchemy-execute-raw-query
    connection.execute(query)

# Execute query in With block from variable (typed) set by String formating (new style)
engine = create_engine('postgresql://user@localhost/database')
with engine.connect() as connection:
    query: str = "INSERT INTO person (name) VALUES ('{}')".format(name)
    # ruleid: sqlalchemy-execute-raw-query
    connection.execute(query)

# Execute query in With block from variable set by String concatenation  fstrings
engine = create_engine('postgresql://user@localhost/database')
with engine.connect() as connection:
    
    query = f"INSERT INTO person (name) VALUES ('{name}')"
    # ruleid: sqlalchemy-execute-raw-query
    connection.execute(query)

# Execute query in With block from variable (typed) set by String concatenation  fstrings
engine = create_engine('postgresql://user@localhost/database')
with engine.connect() as connection:
    
    query: str = f"INSERT INTO person (name) VALUES ('{name}')"
    # ruleid: sqlalchemy-execute-raw-query
    connection.execute(query)

########################################################################

# Query using prepared statement with named parameters
# ok: sqlalchemy-execute-raw-query
engine = create_engine('postgresql://user@localhost/database')
stmt = text("INSERT INTO table (name) VALUES(:name)")
connection.execute(stmt, name='Frodon Sacquet')

# SQL Composition and prepared statement
# ok: sqlalchemy-execute-raw-query
engine = create_engine('postgresql://user@localhost/database')
query = select(literal_column("users.fullname", String) + ', ' + literal_column("addresses.email_address").label("title")).where(and_(literal_column("users.id") == literal_column("addresses.user_id"), text("users.name BETWEEN 'm' AND 'z'"), text("(addresses.email_address LIKE :x OR addresses.email_address LIKE :y)"))).select_from(table('users')).select_from(table('addresses'))
conn.execute(query, {"x":"%@aol.com", "y":"%@msn.com"}).fetchall()


# SQL Composition using SQL Expression
connection_string = 'sqlite:///db.sqlite'
engine = create_engine(connection_string, echo=True)
with engine.connect() as connection:
  meta = MetaData()
  meta.reflect(bind=connection)
  product_table = meta.tables['product']
  # ok: sqlalchemy-execute-raw-query
  stmt = (
    select(product_table)
    .where(product_table.columns[field_name] == value_name)
  )
  result = connection.execute(stmt)

# Insert multi data record using SQL Expression
connection_string = 'sqlite:///db.sqlite'
engine = create_engine(connection_string, echo=True)
with engine.connect() as connection:
  meta = MetaData()
  meta.reflect(bind=connection)
  product_table = meta.tables['product']
  # ok: sqlalchemy-execute-raw-query
  stmt = insert(product_table)
  values = [
      {field_name: 'hazelnut', field_price: 5},
      {field_name: 'banana', field_price: 8}
  ]
  print(stmt)
  connection.execute(stmt, values)

# Insert multi data record using SQL Expression
connection_string = 'sqlite:///db.sqlite'
engine = create_engine(connection_string, echo=True)
with engine.connect() as connection:
  meta = MetaData()
  meta.reflect(bind=connection)
  product_table = meta.tables['product']
  stmt = insert(product_table) + 'test'
  values = [
      {field_name: 'hazelnut', field_price: 5},
      {field_name: 'banana', field_price: 8}
  ]
  # ruleid: sqlalchemy-execute-raw-query
  connection.execute(stmt, values)