Operational Intent
This document defines an implementation pattern for consuming PostgreSQL's native pgoutput logical decoding stream with psycopg2. The operational objective is singular: deploy a deterministic, low-latency WAL stream consumer with four properties. It decodes binary pgoutput messages into structured change events. It advances the slot's LSN only after changes are delivered. It prevents replication slot bloat. It avoids data loss across network partitions, because unacknowledged changes are re-sent after reconnecting. This workflow targets database engineers, data platform teams, Python ETL developers, and DevOps operators responsible for CDC pipeline automation.
PostgreSQL Parameter Baseline
Before initializing the consumer, the source cluster must meet the replication parameters below. Misconfiguration surfaces in three ways: rejected slot creation, terminated replication connections, or unbounded WAL retention. Values below assume PostgreSQL 14+.
| Parameter | Required Value | Operational Rationale |
|---|---|---|
| wal_level | logical | Enables logical decoding. With replica or minimal, creating a logical slot fails with "logical decoding requires wal_level >= logical". Requires a full restart. |
| max_replication_slots | ≥ 10 | Reserves shared memory for slot metadata. When every slot is in use, creating another fails with "all replication slots are in use". Requires restart. |
| max_wal_senders | ≥ 10 | Caps concurrent WAL sender processes. Must be at least the number of active logical plus physical replication consumers. Requires restart. |
| wal_sender_timeout | 60s | Terminates replication connections that send no status update within this window, so the consumer must send feedback more often than this. The slot itself survives the disconnect and keeps retaining WAL. Reload-safe. |
| wal_keep_size | 1GB (for physical standbys) | Keeps extra WAL in pg_wal for physical standbys. Logical slots already retain WAL from their restart_lsn regardless of this setting. To cap slot-driven WAL growth, set max_slot_wal_keep_size (PostgreSQL 13+). Size it from peak transaction throughput and checkpoint intervals. Reload-safe. |
Apply changes with ALTER SYSTEM or in postgresql.conf, then run SELECT pg_reload_conf(); to pick up reload-safe parameters. Parameters that require a restart take effect only after a controlled cluster restart; pg_settings.pending_restart shows which changes are still waiting. Verify with SHOW wal_level; and SELECT slot_name, active, restart_lsn FROM pg_replication_slots;.
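The baseline can also be checked from Python before the consumer starts. The sketch below is a minimal, assumption-laden example. It reads pg_settings over a regular (non-replication) connection and compares the restart-only parameters against the minimums in the table. check_baseline and fetch_settings are hypothetical helper names, and only wal_level, max_replication_slots and max_wal_senders are checked.

```python
from typing import Dict, List

# Minimums taken from the parameter table; all three require a restart to change
REQUIRED_WAL_LEVEL = "logical"
MIN_VALUES = {"max_replication_slots": 10, "max_wal_senders": 10}


def check_baseline(settings: Dict[str, str]) -> List[str]:
    """Return violations for a {name: setting} mapping read from pg_settings."""
    problems: List[str] = []
    wal_level = settings.get("wal_level")
    if wal_level != REQUIRED_WAL_LEVEL:
        problems.append(f"wal_level is {wal_level!r}, expected {REQUIRED_WAL_LEVEL!r}")
    for name, minimum in MIN_VALUES.items():
        value = int(settings.get(name, "0"))  # pg_settings.setting is always text
        if value < minimum:
            problems.append(f"{name}={value} is below {minimum}")
    return problems


def fetch_settings(dsn: str) -> Dict[str, str]:
    """Read the relevant rows from pg_settings over a regular connection."""
    import psycopg2  # imported lazily so check_baseline works without a driver installed

    conn = psycopg2.connect(dsn)
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT name, setting FROM pg_settings WHERE name = ANY(%s)",
                (["wal_level", *MIN_VALUES],),
            )
            return dict(cur.fetchall())
    finally:
        conn.close()
```

A deployment script might call check_baseline(fetch_settings(dsn)) and refuse to start the consumer when the returned list is non-empty.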
Connection & Slot Lifecycle Management
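psycopg2 exposes logical replication through psycopg2.extras.LogicalReplicationConnection. The output plugin is fixed when the slot is created, so the slot must be created with pgoutput. The replication cursor then does three things:

- names the slot;
- passes pgoutput's options (proto_version and publication_names);
- keeps decode=False (the default), so payloads arrive as raw bytes rather than being decoded as text.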
```python
import contextlib

import psycopg2
from psycopg2.extras import LogicalReplicationConnection

ADMIN_DSN = "host=primary-db port=5432 dbname=cdc_source user=replicator password=secure_pass"
# LogicalReplicationConnection adds replication=database itself; listing it is harmless
DSN = ADMIN_DSN + " replication=database"


def ensure_slot(slot_name: str, publication: str) -> None:
    """Idempotent slot creation using a regular (non-replication) connection."""
    # contextlib.closing guarantees the admin connection is closed afterwards
    with contextlib.closing(psycopg2.connect(ADMIN_DSN)) as admin_conn:
        admin_conn.autocommit = True
        with admin_conn.cursor() as cur:
            # Fail fast if the publication is missing instead of failing at stream time
            cur.execute("SELECT 1 FROM pg_publication WHERE pubname = %s", (publication,))
            if cur.fetchone() is None:
                raise RuntimeError(f"Publication {publication} does not exist.")
            cur.execute(
                "SELECT active FROM pg_replication_slots WHERE slot_name = %s",
                (slot_name,),
            )
            row = cur.fetchone()
            if row is None:
                cur.execute(
                    "SELECT pg_create_logical_replication_slot(%s, 'pgoutput')",
                    (slot_name,),
                )
                return
            if row[0]:
                raise RuntimeError(
                    f"Slot {slot_name} is already active. Terminate the existing consumer first."
                )
            # Slot exists and is inactive: streaming resumes from its confirmed_flush_lsn


def init_consumer(slot_name: str, publication: str):
    ensure_slot(slot_name, publication)
    conn = psycopg2.connect(DSN, connection_factory=LogicalReplicationConnection)
    cur = conn.cursor()
    cur.start_replication(
        slot_name=slot_name,
        options={"proto_version": "1", "publication_names": publication},
        decode=False,  # deliver msg.payload as raw bytes for binary parsing
    )
    return conn, cur
```

LogicalReplicationConnection appends replication=database to the DSN on its own, so the explicit parameter is harmless but optional. What matters is the connection factory, because a regular connection has no start_replication. Verify slot state before invoking start_replication. If another consumer already holds the slot, the server rejects the request because the slot is active for another process.
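Network partitions end with a dropped connection, so the consumer needs a reconnect loop. start_replication's default start_lsn=0 lets the server resume from the slot's confirmed_flush_lsn, which means the loop only has to rebuild the connection and start again. The sketch below makes several assumptions. connect is expected to behave like init_consumer and return (conn, cur), and consume is assumed to wrap cur.consume_stream(...). In production, transient would typically be (psycopg2.OperationalError,). run_with_reconnect is a hypothetical name, and the backoff constants are illustrative.

```python
import random
import time
from typing import Any, Callable, Tuple, Type


def run_with_reconnect(
    connect: Callable[[], Tuple[Any, Any]],
    consume: Callable[[Any], None],
    transient: Tuple[Type[BaseException], ...],
    max_backoff: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Reconnect with jittered backoff until consume returns normally."""
    backoff = 1.0
    while True:
        conn = None
        try:
            conn, cur = connect()  # e.g. init_consumer(slot_name, publication)
            backoff = 1.0          # connected: reset the delay
            consume(cur)           # e.g. a wrapper around cur.consume_stream(...)
            return
        except transient:
            # Unacknowledged changes are re-sent after reconnecting, so any partially
            # buffered transaction and cached RELATION metadata should be discarded here.
            pass
        finally:
            if conn is not None:
                conn.close()       # close before sleeping
        sleep(backoff + random.uniform(0, backoff / 2))
        backoff = min(backoff * 2, max_backoff)  # grows while connection attempts fail
```

ensure_slot raises RuntimeError while the old walsender still holds the slot. That error is not in transient here, so it propagates. Some teams may prefer to retry it until wal_sender_timeout releases the old session.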
```mermaid
stateDiagram-v2
    [*] --> BEGIN
    BEGIN --> RELATION: schema is new or changed
    BEGIN --> DML: relation already cached
    RELATION --> DML
    DML --> RELATION: next table not yet sent in this session
    DML --> DML: INSERT / UPDATE / DELETE
    DML --> COMMIT
    COMMIT --> [*]: flush LSN advances
```
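The ordering in the diagram suggests buffering a transaction's DML between BEGIN and COMMIT and releasing it as one unit, so downstream consumers never see a partial transaction. The sketch below assumes parsed events are dicts with a "type" key ("BEGIN", "RELATION", "I", "U", "D", "COMMIT"). It also assumes BEGIN carries "xid" and COMMIT carries "end_lsn". TransactionBuffer is a hypothetical name.

```python
from typing import Any, Dict, List, Optional, Tuple


class TransactionBuffer:
    """Hypothetical helper: collects DML between BEGIN and COMMIT and releases it as one batch."""

    def __init__(self) -> None:
        self._xid: Optional[int] = None
        self._changes: List[Dict[str, Any]] = []

    def feed(self, event: Dict[str, Any]) -> Optional[Tuple[int, int, List[Dict[str, Any]]]]:
        """Return (xid, end_lsn, changes) when a COMMIT closes a transaction, else None."""
        kind = event.get("type")
        if kind == "BEGIN":
            self._xid, self._changes = event["xid"], []
        elif kind in ("I", "U", "D"):
            if self._xid is None:
                raise RuntimeError("DML outside a transaction: stream is out of order")
            self._changes.append(event)
        elif kind == "COMMIT":
            if self._xid is None:
                raise RuntimeError("COMMIT without BEGIN")
            batch = (self._xid, event["end_lsn"], self._changes)
            self._xid, self._changes = None, []
            return batch
        # RELATION and other message types carry no row data and are ignored here
        return None
```

The end_lsn in the returned tuple is the position to acknowledge once the whole batch has been written downstream.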
Binary pgoutput Message Decoding
The pgoutput plugin emits a strict binary protocol. Each message begins with a single-byte type identifier followed by the message body. There is no separate length prefix, because psycopg2 delivers each message as one complete bytes object in msg.payload. Integers are big-endian (network byte order) at fixed offsets with no padding, and strings are null-terminated. Parsing therefore means reading each field at its exact offset and handling string terminators explicitly.
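To make the framing concrete, the sketch below builds a synthetic BEGIN message with struct.pack and decodes it at fixed offsets. It assumes the protocol version 1 layout: a type byte, an Int64 final LSN, an Int64 commit timestamp, and an Int32 xid. It also assumes timestamps count microseconds from 2000-01-01 UTC, PostgreSQL's epoch. decode_begin is a hypothetical helper name.

```python
import struct
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

# pgoutput timestamps are microseconds since the PostgreSQL epoch, not the Unix epoch
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


def decode_begin(payload: bytes) -> Dict[str, Any]:
    """Decode Byte1('B'), Int64 final LSN, Int64 timestamp, Int32 xid."""
    if payload[:1] != b"B":
        raise ValueError(f"expected BEGIN, got type byte {payload[:1]!r}")
    # "!" selects network byte order with no padding: fields start at offsets 1, 9 and 17
    final_lsn, ts_us, xid = struct.unpack_from("!QqI", payload, 1)
    return {
        "final_lsn": final_lsn,
        "xid": xid,
        "committed_at": PG_EPOCH + timedelta(microseconds=ts_us),
    }


# Synthetic 21-byte message: commit one second after the PostgreSQL epoch, xid 742
sample = struct.pack("!cQqI", b"B", 0x16B3748, 1_000_000, 742)
print(len(sample), decode_begin(sample))
```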
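```python
import io
import struct
from typing import Any, Dict, List, Optional


class PgOutputParser:
    """Minimal binary pgoutput decoder for protocol version 1 (proto_version "1")."""

    UNCHANGED_TOAST = "<unchanged-toast>"  # placeholder: value not sent, keep the old one

    @staticmethod
    def _read_string(buf: io.BytesIO) -> str:
        # Strings are null-terminated with no length prefix
        data = bytearray()
        while True:
            byte = buf.read(1)
            if not byte or byte == b"\x00":
                break
            data += byte
        return data.decode("utf-8", errors="replace")

    @staticmethod
    def _read_tuple(buf: io.BytesIO) -> List[Optional[str]]:
        # TupleData: Int16 column count, then a kind byte (and maybe a value) per column
        (natts,) = struct.unpack("!H", buf.read(2))
        values: List[Optional[str]] = []
        for _ in range(natts):
            kind = buf.read(1)
            if kind == b"n":  # NULL
                values.append(None)
            elif kind == b"u":  # unchanged TOASTed value, not transmitted
                values.append(PgOutputParser.UNCHANGED_TOAST)
            elif kind == b"t":  # text-format value: Int32 length, then bytes
                (length,) = struct.unpack("!I", buf.read(4))
                values.append(buf.read(length).decode("utf-8", errors="replace"))
            else:  # b"b" (binary option) is not handled by this decoder
                raise ValueError(f"unsupported tuple column kind {kind!r}")
        return values

    @staticmethod
    def parse_message(payload: bytes) -> Dict[str, Any]:
        """Parse a single pgoutput message (msg.payload from consume_stream)."""
        buf = io.BytesIO(payload)
        msg_type = buf.read(1)
        if not msg_type:
            return {}
        if msg_type == b"B":  # BEGIN: final LSN, commit timestamp, xid
            final_lsn, timestamp, xid = struct.unpack("!QqI", buf.read(20))
            return {"type": "BEGIN", "final_lsn": final_lsn, "xid": xid, "timestamp": timestamp}
        if msg_type == b"C":  # COMMIT: flags, commit LSN, end LSN, commit timestamp
            _flags, commit_lsn, end_lsn, timestamp = struct.unpack("!BQQq", buf.read(25))
            return {"type": "COMMIT", "commit_lsn": commit_lsn, "end_lsn": end_lsn,
                    "timestamp": timestamp}
        if msg_type == b"R":  # RELATION: schema for the rel_id used by later DML
            (rel_id,) = struct.unpack("!I", buf.read(4))
            namespace = PgOutputParser._read_string(buf)
            name = PgOutputParser._read_string(buf)
            replica_identity = buf.read(1).decode()
            (natts,) = struct.unpack("!H", buf.read(2))
            columns = []
            for _ in range(natts):
                (flags,) = struct.unpack("!B", buf.read(1))  # 1 = part of the key
                col_name = PgOutputParser._read_string(buf)
                type_oid, _typmod = struct.unpack("!Ii", buf.read(8))
                columns.append({"name": col_name, "type_oid": type_oid, "key": bool(flags & 1)})
            return {"type": "RELATION", "rel_id": rel_id, "schema": namespace, "table": name,
                    "replica_identity": replica_identity, "columns": columns}
        if msg_type in (b"I", b"U", b"D"):  # INSERT, UPDATE, DELETE
            (rel_id,) = struct.unpack("!I", buf.read(4))
            event: Dict[str, Any] = {"type": msg_type.decode(), "rel_id": rel_id,
                                     "old": None, "new": None}
            marker = buf.read(1)
            if marker in (b"K", b"O"):  # old key columns (K) or full old row (O)
                event["old"] = PgOutputParser._read_tuple(buf)
                marker = buf.read(1)  # UPDATE continues with N; DELETE ends here
            if marker == b"N":
                event["new"] = PgOutputParser._read_tuple(buf)
            return event
        # TRUNCATE (T), ORIGIN (O), TYPE (Y) and MESSAGE (M) are not decoded here
        return {"type": "unknown", "raw": payload.hex()}
```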
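Binary parsing must be stateful in production. Cache RELATION messages to map each rel_id to its fully qualified table name and column definitions. pgoutput sends a RELATION message only before the first change to a table in a session and again after that table's schema changes, so DML messages carry nothing but the rel_id. Reset the cache on reconnect, because the new session re-sends RELATION messages. This avoids catalog lookups and keeps Python CDC Parser Development workflows deterministic when schema evolution is tracked externally.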
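A relation cache can be sketched as follows. It assumes a parser that decodes RELATION column metadata and tuple data into dicts. RELATION events are assumed to have "rel_id", "schema", "table" and "columns" (a list of {"name": ...}). DML events are assumed to have "rel_id" plus "old" and "new" value lists. RelationCache is a hypothetical name, not part of psycopg2.

```python
from typing import Any, Dict, List, Optional


class RelationCache:
    """Hypothetical helper: remembers RELATION events and names the columns of DML events."""

    def __init__(self) -> None:
        self._relations: Dict[int, Dict[str, Any]] = {}

    def reset(self) -> None:
        # Call after reconnecting: the new session re-sends RELATION messages
        self._relations.clear()

    def apply(self, event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Cache RELATION events; return a named change for I/U/D events, else None."""
        kind = event.get("type")
        if kind == "RELATION":
            self._relations[event["rel_id"]] = event  # a newer definition replaces the old one
            return None
        if kind not in ("I", "U", "D"):
            return None
        rel = self._relations.get(event["rel_id"])
        if rel is None:
            raise KeyError(f"no RELATION seen for rel_id {event['rel_id']}")
        names: List[str] = [col["name"] for col in rel["columns"]]

        def named(values: Optional[List[Any]]) -> Optional[Dict[str, Any]]:
            return None if values is None else dict(zip(names, values))

        return {
            "op": kind,
            "table": f"{rel['schema']}.{rel['table']}",
            "old": named(event.get("old")),
            "new": named(event.get("new")),
        }
```

Raising on an unknown rel_id is deliberate. In a well-formed stream it should never happen, so it signals a parser or reconnect bug rather than something to skip silently.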
LSN Acknowledgment & Feedback Loop
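PostgreSQL retains WAL for a logical slot until the consumer reports a flush position in a standby status update. The slot's confirmed_flush_lsn then advances, and its restart_lsn follows as decoding progresses. send_feedback is a method of the replication cursor, not the connection. If flush_lsn never advances, the slot pins WAL without bound. If no status updates are sent at all, wal_sender_timeout terminates the connection.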
```python
import psycopg2.extras


def consume_stream(cur: psycopg2.extras.ReplicationCursor, parser: PgOutputParser) -> None:
    """Drive the consume_stream loop and acknowledge each transaction once it is handled."""

    def handle_msg(msg: psycopg2.extras.ReplicationMessage) -> None:
        event = parser.parse_message(msg.payload)
        # Route the event to the downstream sink here, before anything is acknowledged
        if event.get("type") == "COMMIT":
            # Acknowledge the transaction's end LSN so it is not re-sent after a restart.
            # send_feedback is a cursor method (msg.cursor is the replication cursor);
            # without reply/force it records the position for the next status update.
            msg.cursor.send_feedback(flush_lsn=event["end_lsn"])

    # Blocks, calling handle_msg for each message and sending status updates
    # every status_interval seconds (an argument of start_replication, default 10)
    cur.consume_stream(handle_msg)
```

In psycopg2 2.8.3 and later, calling send_feedback without reply or force only records the positions. consume_stream then transmits them on its own schedule, which keeps the connection inside wal_sender_timeout. Call send_feedback on the cursor that started replication (msg.cursor) from inside the callback, rather than from a separate thread. Passing reply=True only asks the server to answer with a keepalive message; it does not make confirmed_flush_lsn advance any faster. Never block the callback with slow downstream I/O. Instead, decouple consumption from sink writes using a bounded in-memory queue or async event loop, and acknowledge only LSNs whose events the sink has accepted.
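psycopg2 reports LSNs such as msg.data_start as 64-bit integers, while pg_replication_slots shows pg_lsn text such as 16/B374D848. The conversion sketch below assumes the standard format: the high and low 32-bit halves in uppercase hex, separated by a slash. With it you can compare the consumer's last acknowledged position against the slot's confirmed_flush_lsn. All three function names are hypothetical.

```python
def lsn_to_int(text: str) -> int:
    """Convert pg_lsn text such as '16/B374D848' to the integer form psycopg2 uses."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lsn_to_text(lsn: int) -> str:
    """Inverse of lsn_to_int, matching PostgreSQL's %X/%X formatting."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"


def unacknowledged_bytes(server_lsn: str, acked_lsn: int) -> int:
    """WAL bytes between a server-reported position and the consumer's last acknowledged LSN."""
    return max(0, lsn_to_int(server_lsn) - acked_lsn)
```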
Production Hardening & Pipeline Integration
Deploying a raw pgoutput consumer requires explicit handling of backpressure, schema drift, and failure recovery. Unlike managed connectors, this pattern places operational responsibility on the engineering team. When planning a CDC Pipeline Implementation with Python & Debezium, evaluate two options. One is a lightweight psycopg2 consumer, if it meets your latency SLAs. The other is a JVM-based connector with built-in offset management, if that level of support is warranted.
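Threshold Tuning & Backpressure: Keep wal_sender_timeout at 60s and send status updates at least every 5 seconds (in psycopg2, via the status_interval argument of start_replication). If downstream sinks (for example, Kafka brokers or Avro schema registries) slow down, buffer events in a bounded queue. When queue depth exceeds memory limits, either pause consumption or disconnect and resume later from the slot's confirmed_flush_lsn. Never acknowledge LSNs whose events have not been delivered, and never let status updates stop while you wait.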
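A bounded-queue handoff can be sketched with the standard library. It works under the following assumptions:

- the consume callback enqueues one (end LSN, batch) item per committed transaction;
- a worker thread writes batches to the sink;
- a LSN becomes eligible for acknowledgment only after the write returns.

While the queue is full, the callback keeps the walsender alive by calling a keepalive function instead of blocking silently. AckTracker, enqueue_with_keepalive and start_sink_worker are hypothetical names.

```python
import queue
import threading
from typing import Any, Callable, Optional, Tuple

Batch = Tuple[int, Any]  # (commit end LSN, transaction payload)


class AckTracker:
    """Hypothetical helper: highest commit LSN the sink has durably accepted."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._lsn = 0

    def mark(self, lsn: int) -> None:
        with self._lock:
            self._lsn = max(self._lsn, lsn)

    def value(self) -> int:
        with self._lock:
            return self._lsn


def enqueue_with_keepalive(q: "queue.Queue[Optional[Batch]]", item: Batch,
                           keepalive: Callable[[], None], timeout: float = 1.0) -> None:
    """Block until the bounded queue has room, calling keepalive each time a put times out."""
    while True:
        try:
            q.put(item, timeout=timeout)
            return
        except queue.Full:
            keepalive()  # e.g. a forced status update so wal_sender_timeout is not reached


def start_sink_worker(q: "queue.Queue[Optional[Batch]]", write: Callable[[Any], None],
                      tracker: AckTracker) -> threading.Thread:
    """Drain the queue into the sink; an LSN is marked only after write returns."""

    def run() -> None:
        while True:
            item = q.get()
            if item is None:  # shutdown sentinel
                return
            lsn, payload = item
            write(payload)
            tracker.mark(lsn)

    worker = threading.Thread(target=run, daemon=True)
    worker.start()
    return worker
```

In a psycopg2 callback the handoff could look like enqueue_with_keepalive(q, (event["end_lsn"], batch), lambda: msg.cursor.send_feedback(flush_lsn=tracker.value(), force=True)), followed by msg.cursor.send_feedback(flush_lsn=tracker.value()). The force parameter requires psycopg2 2.8.3 or later. Because only tracker.value() is ever acknowledged, a crash loses nothing that the slot will not re-send.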
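JSON to Avro Transformation & Event Routing: By default pgoutput sends column values in text format; binary format is used only when the binary option is enabled (PostgreSQL 14+). Transform these values into typed Avro records using a schema registry, using the type OIDs from RELATION messages to guide the mapping. Route events by schema.table to dedicated Kafka topics. Enforce idempotency by embedding xid and commit_lsn in the Avro payload, plus each change's position within its transaction. The sink can then deduplicate events that are redelivered after a reconnect.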
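Routing and sink-side deduplication can be sketched without tying the example to a particular Kafka or Avro client. The sketch makes three assumptions:

- the topic name follows a hypothetical "cdc.<schema>.<table>" convention;
- the dedup key combines the commit LSN with a per-transaction sequence number seq;
- commit LSNs arrive in increasing order per topic, which holds because pgoutput delivers transactions in commit order.

Avro serialization itself is left to your registry client. route_event and SinkDeduplicator are hypothetical names.

```python
from typing import Any, Dict, Tuple


def route_event(schema: str, table: str, xid: int, commit_lsn: int, seq: int,
                row: Dict[str, Any], topic_prefix: str = "cdc") -> Tuple[str, str, Dict[str, Any]]:
    """Return (topic, dedup key, record) for one change; seq is its index within the transaction."""
    topic = f"{topic_prefix}.{schema}.{table}"
    # Zero-padded hex keeps lexicographic key order equal to numeric LSN order
    key = f"{commit_lsn:016X}:{seq:06d}"
    record = {"xid": xid, "commit_lsn": commit_lsn, "seq": seq, "after": row}
    return topic, key, record


class SinkDeduplicator:
    """Hypothetical sink-side filter: drops changes at or below the last applied position."""

    def __init__(self) -> None:
        self._last: Dict[str, Tuple[int, int]] = {}

    def should_apply(self, topic: str, commit_lsn: int, seq: int) -> bool:
        position = (commit_lsn, seq)
        if position <= self._last.get(topic, (-1, -1)):
            return False  # already applied: a replay after reconnecting
        self._last[topic] = position
        return True
```

A real sink would persist the last applied position alongside the data it writes, so that the filter survives sink restarts.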
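Fallback Chains & Disaster Recovery: Replication slots are not replicated to standby nodes by default (PostgreSQL 17 adds opt-in slot synchronization for failover). Without it, implement a slot recreation routine for failover. Record the consumer's last acknowledged LSN externally, create a new slot on the promoted primary, and reattach the consumer. A new logical slot streams only changes after its creation point. Changes committed between the last acknowledged LSN and slot creation must therefore be backfilled, for example from a table snapshot. Archive WAL segments to object storage (archive_mode=on) to preserve point-in-time recovery windows. Archived WAL cannot be replayed into a lost logical slot, so it supports rebuilding state rather than resuming the stream. Monitor pg_replication_slots.wal_status:

- extended means the slot is retaining WAL beyond max_wal_size;
- unreserved means some required WAL will be removed at the next checkpoint;
- lost means the slot is no longer usable and requires manual intervention.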
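The wal_status check can be scripted as follows. The sketch assumes PostgreSQL 13+ (for wal_status) and runs on the primary, since pg_current_wal_lsn() errors during recovery. It measures retained WAL with pg_wal_lsn_diff against restart_lsn, which is NULL for lost slots. The 8 GiB warning threshold and the names classify_slot and report_slots are illustrative assumptions.

```python
from decimal import Decimal
from typing import Optional, Union

# Run on the primary; wal_status requires PostgreSQL 13+
SLOT_HEALTH_SQL = """
SELECT slot_name, active, wal_status,
       pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_bytes
FROM pg_replication_slots
WHERE slot_type = 'logical'
"""

Number = Union[int, float, Decimal]  # psycopg2 returns numeric columns as Decimal


def classify_slot(wal_status: Optional[str], retained_bytes: Optional[Number],
                  warn_bytes: int = 8 * 1024**3) -> str:
    """Map one slot's wal_status and retained WAL to an alert level."""
    if wal_status == "lost":
        return "critical: required WAL was removed; the slot is unusable"
    if wal_status == "unreserved":
        return "critical: required WAL will be removed at the next checkpoint"
    if wal_status == "extended" or (retained_bytes or 0) > warn_bytes:
        return "warning: slot is retaining more WAL than expected"
    return "ok"


def report_slots(dsn: str) -> None:
    import psycopg2  # imported lazily so classify_slot has no driver dependency

    conn = psycopg2.connect(dsn)
    try:
        with conn.cursor() as cur:
            cur.execute(SLOT_HEALTH_SQL)
            for slot_name, active, wal_status, retained in cur.fetchall():
                state = "active" if active else "inactive"
                print(f"{slot_name} ({state}): {classify_slot(wal_status, retained)}")
    finally:
        conn.close()
```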
This architecture is designed for low-latency, high-throughput CDC while maintaining strict PostgreSQL protocol compliance. Treat sub-100ms end-to-end latency as a target to measure, not a guarantee. Validate against your cluster's checkpoint frequency, transaction volume, and network topology before promoting to production.