Batch Inserting Millions of Rows with SQLAlchemy core.execute

To batch insert millions of rows with SQLAlchemy 2.0 Core, initialize an AsyncEngine with create_async_engine(), open a connection in an async with engine.connect() block, and pass each chunk as a list of parameter dictionaries to await conn.execute(stmt, chunk). SQLAlchemy 2.0 detects a sequence of dictionaries and delegates execution to the underlying DBAPI's executemany implementation. Because engine.connect() does not auto-commit, always call await conn.commit() explicitly before the context manager exits to prevent a silent rollback. This execution model aligns with established Advanced Query Patterns and Bulk Data Operations for scalable, production-grade data pipelines.

Step-by-Step: Memory-Safe Chunking & Async Execution

Accumulating millions of rows in Python lists can exhaust available memory and raise MemoryError. Replace list accumulation with generator functions that yield row dictionaries on demand, and use itertools.islice() to materialize fixed-size chunks (10,000–50,000 rows) per iteration. Wrap the execution loop in try/except to guarantee transactional integrity, and keep the insert statement free of .returning() so SQLAlchemy never fetches generated primary keys back into application RAM.
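The chunking step itself can be sketched with nothing but itertools. This is a minimal, database-free illustration of materializing fixed-size slices from a generator; the helper name and sizes are illustrative:

```python
from itertools import islice
from typing import Any, Dict, Iterator, List

def iter_chunks(
    rows: Iterator[Dict[str, Any]], chunk_size: int
) -> Iterator[List[Dict[str, Any]]]:
    """Yield lists of at most chunk_size row dicts; holds only one chunk at a time."""
    while True:
        chunk = list(islice(rows, chunk_size))
        if not chunk:
            return
        yield chunk

# Simulate a 25-row stream chunked into batches of 10.
stream = ({"id": i} for i in range(25))
sizes = [len(c) for c in iter_chunks(stream, 10)]
print(sizes)  # [10, 10, 5]
```

Because islice() consumes the generator in place, peak memory is bounded by one chunk regardless of total row count.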

Memory-Safe Data Streaming Pattern

from typing import Iterator, Dict, Any
import csv

def stream_rows_from_source(filepath: str) -> Iterator[Dict[str, Any]]:
    """Yield dictionaries row-by-row to prevent OOM on multi-million-row datasets."""
    with open(filepath, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield dict(row)  # Ensure a mutable dict copy if needed
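A quick way to sanity-check the streaming generator is against a small fixture file. The pattern is repeated here so the snippet is self-contained; the fixture columns are illustrative:

```python
import csv
import tempfile
from typing import Any, Dict, Iterator

def stream_rows_from_source(filepath: str) -> Iterator[Dict[str, Any]]:
    """Yield dictionaries row-by-row to prevent OOM on multi-million-row datasets."""
    with open(filepath, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield dict(row)

# Write a tiny fixture file, then confirm rows are produced lazily.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") as tmp:
    writer = csv.DictWriter(tmp, fieldnames=["id", "name"])
    writer.writeheader()
    writer.writerows([{"id": "1", "name": "a"}, {"id": "2", "name": "b"}])
    path = tmp.name

rows = stream_rows_from_source(path)
first = next(rows)  # only one row materialized so far
print(first)  # {'id': '1', 'name': 'a'}
```

Note that csv.DictReader yields string values; any type coercion belongs inside the generator so it stays per-row rather than per-dataset.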

Async Core Execute with Generator Chunking

import asyncio
from itertools import islice
from typing import Iterator, Dict, Any
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy import Table, insert

async def execute_chunked_inserts(
    engine: AsyncEngine,
    table: Table,
    data_stream: Iterator[Dict[str, Any]],
    chunk_size: int = 25000,
) -> None:
    """Execute bulk inserts using an async connection and generator slicing."""
    stmt = insert(table)
    chunk = list(islice(data_stream, chunk_size))

    async with engine.connect() as conn:
        try:
            while chunk:
                # SQLAlchemy 2.0 auto-detects sequences and uses executemany
                await conn.execute(stmt, chunk)
                chunk = list(islice(data_stream, chunk_size))
            await conn.commit()
        except Exception:
            await conn.rollback()
            raise
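The control flow above (execute each chunk, commit once, roll back on failure) can be exercised without a database by stubbing the connection. Everything below is a test harness, not SQLAlchemy API; StubConn and run_chunked are hypothetical names:

```python
import asyncio
from itertools import islice
from typing import Any, Dict, Iterator, List

class StubConn:
    """Minimal stand-in for an AsyncConnection that records calls."""
    def __init__(self) -> None:
        self.batches: List[int] = []
        self.committed = False

    async def execute(self, stmt: Any, params: List[Dict[str, Any]]) -> None:
        self.batches.append(len(params))  # record executemany batch size

    async def commit(self) -> None:
        self.committed = True

async def run_chunked(
    conn: StubConn, rows: Iterator[Dict[str, Any]], chunk_size: int
) -> None:
    # Same loop shape as execute_chunked_inserts: drain, execute, commit once.
    chunk = list(islice(rows, chunk_size))
    while chunk:
        await conn.execute("INSERT ...", chunk)
        chunk = list(islice(rows, chunk_size))
    await conn.commit()

conn = StubConn()
asyncio.run(run_chunked(conn, ({"id": i} for i in range(7)), 3))
print(conn.batches, conn.committed)  # [3, 3, 1] True
```

The recorded batch sizes confirm the loop never materializes more than one chunk and that commit happens exactly once, after the final batch.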

Niche Optimization: Driver Tuning & Connection Pooling

Driver-level configuration strongly affects throughput and memory stability during massive async inserts. For asyncpg, disable the prepared statement cache (prepared_statement_cache_size=0) to prevent unbounded memory growth during high-volume parameter binding. Note that executemany_mode, which enables multi-row VALUES generation, belongs to the synchronous psycopg2 dialect; the async psycopg (version 3) dialect batches through the driver's native executemany and, for statements with RETURNING, SQLAlchemy's insertmanyvalues feature (tunable via insertmanyvalues_page_size). Tune pool_size and max_overflow to accommodate concurrent worker processes without exhausting database connection limits. For deeper connection reuse strategies and transaction batching, reference High-Performance Bulk Inserts and Updates.

Driver-Specific Execution Options

from sqlalchemy.ext.asyncio import create_async_engine

# asyncpg: disable the prepared statement cache to avoid unbounded
# memory growth during massive inserts
asyncpg_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@host/db",
    prepared_statement_cache_size=0,
    pool_size=20,
    max_overflow=10,
)

# psycopg (version 3): async-capable; batching uses the driver's native
# executemany, with insertmanyvalues_page_size governing RETURNING inserts
psycopg_engine = create_async_engine(
    "postgresql+psycopg://user:pass@host/db",
    insertmanyvalues_page_size=10000,
    pool_size=20,
    max_overflow=10,
)

Error Resolution: Common Async & Bulk Insert Failures

  • InterfaceError: connection already closed. Root cause: the transaction scope exits before commit. Production fix: ensure await conn.commit() executes explicitly before __aexit__.
  • MemoryError. Root cause: loading the full dataset into Python memory. Production fix: cap chunk size at 10k–50k rows and stream data via generators.
  • StatementError (psycopg2.errors.UndefinedTable). Root cause: schema mismatch, or a synchronous fallback inside the async loop. Production fix: verify the table metadata is reflected or created, and that the dialect is initialized with create_async_engine().
  • asyncpg.exceptions.TooManyConnectionsError. Root cause: connection pool exhaustion under concurrent load. Production fix: adjust create_async_engine(pool_size=20, max_overflow=10) and implement backoff retries.
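The backoff retries suggested for pool exhaustion can be sketched with plain asyncio. retry_with_backoff and its parameters are illustrative helpers, not SQLAlchemy or asyncpg API:

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

async def retry_with_backoff(
    op: Callable[[], Awaitable[T]],
    retries: int = 5,
    base_delay: float = 0.05,
) -> T:
    """Retry an async operation with exponential backoff; re-raise on final failure."""
    for attempt in range(retries):
        try:
            return await op()
        except Exception:
            if attempt == retries - 1:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))
    raise RuntimeError("unreachable")

# Demo: an operation that fails twice before succeeding.
calls = {"n": 0}

async def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("too many connections")
    return "ok"

result = asyncio.run(retry_with_backoff(flaky, retries=5, base_delay=0.001))
print(result, calls["n"])  # ok 3
```

In production you would narrow the except clause to connection-level exceptions rather than catching everything.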

Critical Pitfalls to Avoid

  • Synchronous execute() in async loops: Calling conn.execute() without await leaves a never-awaited coroutine that silently does nothing, and invoking sync-style SQLAlchemy calls against an async engine raises sqlalchemy.exc.MissingGreenlet.
  • Omitted await conn.commit(): SQLAlchemy 2.0 does not auto-commit on context exit. Missing explicit commits cause silent transaction rollbacks.
  • Single-list parameter dumping: Passing millions of rows as one sequence can exceed DBAPI parameter limits (e.g., PostgreSQL's 65,535 bind parameters per statement). Always chunk.
  • Enabling echo=True in production: Serializes millions of parameter bindings to stdout, causing I/O bottlenecks and process crashes.
  • Adding .returning() to bulk inserts: Forces SQLAlchemy to fetch generated IDs for every row, exhausting application RAM and adding network latency. Omit RETURNING unless the generated values are genuinely needed.
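The 65,535-parameter ceiling mentioned above translates directly into a maximum safe chunk size per column count. A small helper makes the arithmetic explicit; the constant reflects PostgreSQL's wire protocol, and other databases have different limits:

```python
PG_MAX_BIND_PARAMS = 65_535  # PostgreSQL wire-protocol limit per statement

def max_rows_per_chunk(num_columns: int, param_limit: int = PG_MAX_BIND_PARAMS) -> int:
    """Largest row count whose total bind parameters fit under the driver limit."""
    if num_columns <= 0:
        raise ValueError("num_columns must be positive")
    return param_limit // num_columns

# A 10-column table caps out at 6,553 rows per single multi-VALUES statement.
print(max_rows_per_chunk(10))  # 6553
```

This ceiling applies when many rows are folded into one multi-VALUES statement; a driver executing one statement per row is not bound by it, but loses the batching benefit.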

FAQ

Does SQLAlchemy 2.0 core.execute automatically batch rows? Yes. Passing a sequence of dictionaries implicitly routes execution through the executemany path: SQLAlchemy delegates batch execution to the underlying DBAPI driver's optimized bulk code.

What is the optimal chunk size for millions of rows? Between 10,000 and 50,000 rows per chunk. This range balances network round-trip overhead, transaction log growth, and Python memory allocation without hitting driver parameter limits.

How do I handle auto-incrementing IDs during async bulk inserts? Omit .returning() from the insert statement so no generated IDs are fetched, and rely on database-level sequences or defaults. Fetching millions of RETURNING values negates bulk performance gains.

Can I use INSERT ... ON CONFLICT with async core.execute? Yes. Construct the statement with the PostgreSQL dialect's insert() (sqlalchemy.dialects.postgresql.insert) and .on_conflict_do_nothing() or .on_conflict_do_update(), then pass it to await conn.execute() with chunked dictionaries.
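Since the generic insert() construct has no on_conflict_* methods, the statement must come from the PostgreSQL dialect. A minimal sketch with a hypothetical users table, compiling the statement to verify the generated SQL (requires SQLAlchemy installed):

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert as pg_insert

metadata = MetaData()
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
)

# Only the PostgreSQL-dialect insert() exposes ON CONFLICT clauses.
stmt = pg_insert(users).on_conflict_do_nothing(index_elements=["id"])
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

The same stmt object is then passed to await conn.execute(stmt, chunk) exactly like a plain insert; the conflict clause rides along with every batch.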