Batch Inserting Millions of Rows with SQLAlchemy core.execute
To batch insert millions of rows using SQLAlchemy 2.0 Core, initialize an AsyncEngine with create_async_engine(), open a connection in an async with engine.connect() block, and pass chunked lists of dictionaries directly to await conn.execute(stmt, chunk). SQLAlchemy 2.0 automatically detects a sequence of dictionaries and delegates execution to the underlying DBAPI's executemany implementation. Always call await conn.commit() explicitly before the context manager exits to prevent silent rollbacks. This execution model aligns with established Advanced Query Patterns and Bulk Data Operations for scalable, production-grade data pipelines.
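As a minimal sketch of that flow, the snippet below inserts a single pre-built chunk; the table definition, connection URL, and chunk contents are illustrative assumptions, and the full generator-driven, multi-chunk version appears in the next section.

```python
import asyncio

from sqlalchemy import Column, Integer, MetaData, String, Table, insert
from sqlalchemy.ext.asyncio import create_async_engine

# Hypothetical table and placeholder URL; the table is assumed to already exist.
metadata = MetaData()
events = Table(
    "events",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(100)),
)

async def insert_one_chunk() -> None:
    engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")
    chunk = [{"name": f"event-{i}"} for i in range(10_000)]  # one pre-built chunk
    async with engine.connect() as conn:
        # A list of dicts routes to the driver's executemany path.
        await conn.execute(insert(events), chunk)
        await conn.commit()  # connect() does not auto-commit on exit
    await engine.dispose()

asyncio.run(insert_one_chunk())
```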
Step-by-Step: Memory-Safe Chunking & Async Execution
Accumulating millions of rows in Python lists triggers immediate MemoryError exceptions. Replace list accumulation with generator functions that yield row dictionaries on-demand. Use itertools.islice() to materialize fixed-size chunks (10,000–50,000 rows) per iteration. Wrap the execution loop in try/except to guarantee transactional integrity, and disable result fetching via execution_options(no_returning=True) to prevent RAM exhaustion from generated primary keys.
Memory-Safe Data Streaming Pattern
```python
import csv
from typing import Any, Dict, Iterator

def stream_rows_from_source(filepath: str) -> Iterator[Dict[str, Any]]:
    """Yield dictionaries row-by-row to prevent OOM on multi-million-row datasets."""
    with open(filepath, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield dict(row)  # ensure a mutable dict copy if needed
```
Async Core Execute with Generator Chunking
```python
from itertools import islice
from typing import Any, Dict, Iterator

from sqlalchemy import Table, insert
from sqlalchemy.ext.asyncio import AsyncEngine

async def execute_chunked_inserts(
    engine: AsyncEngine,
    table: Table,
    data_stream: Iterator[Dict[str, Any]],
    chunk_size: int = 25000,
) -> None:
    """Execute bulk inserts using an async connection and generator slicing."""
    stmt = insert(table)
    chunk = list(islice(data_stream, chunk_size))
    async with engine.connect() as conn:
        try:
            while chunk:
                # SQLAlchemy 2.0 auto-detects the sequence of dicts and uses executemany
                await conn.execute(stmt, chunk)
                chunk = list(islice(data_stream, chunk_size))
            # Commit once after all chunks so the entire load is a single transaction
            await conn.commit()
        except Exception:
            await conn.rollback()
            raise
```
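A short usage sketch tying the two functions above together. The events table name, the events.csv path, and the connection URL are assumptions for illustration; the table is reflected from the database so insert(table) has real columns.

```python
import asyncio

from sqlalchemy import MetaData, Table
from sqlalchemy.ext.asyncio import create_async_engine

async def main() -> None:
    engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")  # placeholder URL
    metadata = MetaData()
    async with engine.connect() as conn:
        # Reflect the existing target table via run_sync (async engines cannot reflect directly)
        events = await conn.run_sync(
            lambda sync_conn: Table("events", metadata, autoload_with=sync_conn)
        )
    rows = stream_rows_from_source("events.csv")  # hypothetical CSV path
    await execute_chunked_inserts(engine, events, rows, chunk_size=25000)
    await engine.dispose()

asyncio.run(main())
```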
Niche Optimization: Driver Tuning & Connection Pooling
Driver-level configuration drastically impacts throughput and memory stability during massive async inserts. For asyncpg, disable the prepared statement cache to prevent unbounded memory growth during high-volume parameter binding. For psycopg, multi-row VALUES batching comes from SQLAlchemy 2.0's built-in insertmanyvalues feature rather than a driver flag; tune its page size to control how many rows are folded into each statement and reduce network round-trips. Tune pool_size and max_overflow to accommodate concurrent worker processes without exhausting database connection limits. For deeper connection reuse strategies and transaction batching, reference High-Performance Bulk Inserts and Updates.
Driver-Specific Execution Options
```python
from sqlalchemy.ext.asyncio import create_async_engine

# asyncpg: disable the prepared statement cache to avoid unbounded memory growth
asyncpg_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@host/db",
    prepared_statement_cache_size=0,
    pool_size=20,
    max_overflow=10,
    execution_options={"no_returning": True},
)

# psycopg: multi-row VALUES batching is handled by SQLAlchemy's insertmanyvalues
# feature; tune its page size here (psycopg2's executemany_mode flag does not
# apply to the psycopg dialect)
psycopg_engine = create_async_engine(
    "postgresql+psycopg://user:pass@host/db",
    insertmanyvalues_page_size=25000,
    pool_size=20,
    max_overflow=10,
    execution_options={"no_returning": True},
)
```
Error Resolution: Common Async & Bulk Insert Failures
| Error | Root Cause | Production Fix |
|---|---|---|
| `InterfaceError: connection already closed` | Transaction scope exits before commit. | Ensure `await conn.commit()` executes explicitly before `__aexit__`. |
| `MemoryError` | Loading the full dataset into Python memory. | Cap chunk size at 10k–50k rows and stream data via generators. |
| `StatementError: (psycopg2.errors.UndefinedTable)` | Schema mismatch or sync fallback in the async loop. | Verify the table metadata is reflected/created and the dialect is initialized with `create_async_engine()`. |
| `asyncpg.exceptions.TooManyConnectionsError` | Connection pool exhaustion under concurrent load. | Adjust `create_async_engine(pool_size=20, max_overflow=10)` and implement backoff retries. |
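The last row suggests backoff retries for pool exhaustion. A minimal sketch of that idea follows; the attempt count, delay schedule, and the broad DBAPIError catch are illustrative assumptions.

```python
import asyncio

from sqlalchemy import Table, insert
from sqlalchemy.exc import DBAPIError
from sqlalchemy.ext.asyncio import AsyncEngine

async def execute_chunk_with_backoff(
    engine: AsyncEngine,
    table: Table,
    chunk,
    max_attempts: int = 5,
) -> None:
    """Retry a single chunk insert with exponential backoff on driver-level errors."""
    for attempt in range(max_attempts):
        try:
            async with engine.connect() as conn:
                await conn.execute(insert(table), chunk)
                await conn.commit()
            return
        except DBAPIError:
            if attempt == max_attempts - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # 1s, 2s, 4s, 8s ...
```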
Critical Pitfalls to Avoid
- Synchronous `execute()` in async loops: Calling `conn.execute()` without `await`, or using synchronous engines inside `asyncio`, triggers `RuntimeError: cannot run in event loop`.
- Omitted `await conn.commit()`: SQLAlchemy 2.0 does not auto-commit on context exit. Missing explicit commits cause silent transaction rollbacks.
- Single-list parameter dumping: Passing millions of rows as one sequence exceeds DBAPI parameter limits (e.g., PostgreSQL's 65,535 cap). Always chunk; see the sizing sketch after this list.
- Enabling `echo=True` in production: Serializes millions of parameter bindings to stdout, causing I/O bottlenecks and process crashes.
- Neglecting `no_returning=True`: Forces SQLAlchemy to fetch generated IDs for every row, exhausting application RAM and doubling network latency.
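As referenced in the chunking bullet, one way to bound chunk size is to divide the parameter cap by the column count. The cap value is the figure cited above and the 12-column table is an assumption for illustration.

```python
# Each row consumes one bind parameter per column, so the driver's parameter
# cap divides down to a maximum safe chunk size.
PARAM_CAP = 65_535   # cap cited above; check your driver's actual limit
NUM_COLUMNS = 12     # illustrative column count

max_rows_per_chunk = PARAM_CAP // NUM_COLUMNS   # 5461 rows
chunk_size = min(25_000, max_rows_per_chunk)    # never exceed the derived bound
print(chunk_size)  # -> 5461
```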
FAQ
Does SQLAlchemy 2.0 core.execute automatically batch rows?
Yes. Passing a sequence of dictionaries implicitly triggers an executemany-style execution. SQLAlchemy delegates the batch to the underlying DBAPI driver's optimized bulk path.
What is the optimal chunk size for millions of rows?
Between 10,000 and 50,000 rows per chunk. This range balances network round-trip overhead, transaction log growth, and Python memory allocation without hitting driver parameter limits.
How do I handle auto-incrementing IDs during async bulk inserts?
Use execution_options={"no_returning": True} to skip fetching generated IDs, or rely on database-level sequences. Fetching millions of RETURNING values negates bulk performance gains.
Can I use INSERT ... ON CONFLICT with async core.execute?
Yes. Construct the statement with the PostgreSQL dialect's insert() construct (sqlalchemy.dialects.postgresql.insert), call .on_conflict_do_nothing() or .on_conflict_do_update(), then pass the statement to await conn.execute() along with the chunked dictionaries.
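A brief sketch of that pattern, assuming the hypothetical events table used earlier with a unique name column (the conflict target is an assumption):

```python
from sqlalchemy import Table
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncEngine

async def upsert_chunk(engine: AsyncEngine, table: Table, chunk) -> None:
    """Insert a chunk, skipping rows that collide on the 'name' unique constraint."""
    stmt = pg_insert(table).on_conflict_do_nothing(index_elements=["name"])
    async with engine.connect() as conn:
        await conn.execute(stmt, chunk)  # still routes through executemany
        await conn.commit()
```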