Building a Production-Ready Event Store in PostgreSQL: Schema Design, Projections, and Replay
Your microservice just corrupted production data. A race condition in the order processing service double-charged 847 customers, and now you’re staring at a database that only knows the current state of each account. The CFO wants to know exactly which transactions were affected. Legal needs an audit trail. Your on-call engineer is asking the impossible question: “Can we just roll it back?”
You can’t. Your database doesn’t remember what happened—only what is. Every UPDATE statement overwrote the previous truth, leaving you with forensic breadcrumbs scattered across application logs, if you’re lucky enough to have logged the right things.
This is the moment every engineer wishes they had implemented event sourcing from the start.
Event sourcing flips the traditional model: instead of storing current state and discarding history, you store every state change as an immutable event. Your account balance becomes a projection derived from a sequence of deposits, withdrawals, and adjustments. When something goes wrong, you don’t guess—you replay the exact sequence of events that led to corruption, fix the bug, and rebuild state from the corrected event stream.
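To make "balance as a projection" concrete, here is a minimal pure-Python sketch. The `AccountEvent` type, the event kinds, and the sample history are illustrative assumptions, not part of any library or of the schema developed later in this article.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class AccountEvent:
    kind: str        # assumed kinds: "deposit", "withdrawal", "adjustment"
    amount: Decimal  # adjustments are signed; the other kinds are positive


def project_balance(events: list[AccountEvent]) -> Decimal:
    """Fold the event history into the current balance."""
    balance = Decimal("0")
    for event in events:
        if event.kind == "withdrawal":
            balance -= event.amount
        else:
            balance += event.amount
    return balance


history = [
    AccountEvent("deposit", Decimal("100.00")),
    AccountEvent("withdrawal", Decimal("30.00")),
    AccountEvent("withdrawal", Decimal("30.00")),  # the erroneous double charge
    AccountEvent("adjustment", Decimal("30.00")),  # compensating event added later
]

balance_before_fix = project_balance(history[:3])
balance_after_fix = project_balance(history)
```

The double charge is never erased. It stays in the history, and a later compensating event corrects the derived balance, which is what makes the audit trail trustworthy.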
The catch? Most teams assume event sourcing requires specialized infrastructure: EventStoreDB, Kafka, or some managed cloud service with its own operational overhead and learning curve. But if you’re already running PostgreSQL—and most of us are—you’re sitting on an underutilized event store that handles millions of events without breaking a sweat.
PostgreSQL gives you JSONB for flexible event payloads, partial indexes for projection optimization, table partitioning for managing event growth, and LISTEN/NOTIFY for real-time subscriptions. It’s infrastructure you already understand, operate, and trust in production.
Why PostgreSQL Makes Sense as Your Event Store
When teams adopt event sourcing, the conversation often jumps immediately to specialized databases: EventStoreDB, Kafka, or cloud-native offerings like AWS Kinesis. These tools excel at their intended purpose, but they carry costs that rarely surface during initial architecture discussions.

The Hidden Tax of Specialized Infrastructure
Introducing a dedicated event store means your operations team must master another database with its own backup strategies, failure modes, and scaling characteristics. Your developers need to understand a new query language or API. Your monitoring stack grows. Your on-call runbooks multiply.
For organizations already running PostgreSQL in production, this complexity tax compounds. You now maintain two data persistence layers, synchronize deployments between them, and handle the inevitable consistency questions when one system is available and the other isn’t.
The vendor lock-in concern is equally concrete. EventStoreDB uses its own projection language. Kafka requires specific client libraries and operational patterns. Migrating away from these systems means rebuilding your entire event infrastructure—a project that rarely gets prioritized until it’s urgent.
PostgreSQL’s Event Sourcing Toolkit
PostgreSQL ships with features that address core event sourcing requirements directly:
JSONB stores event payloads with full indexing capability. You can query specific fields within events, create partial indexes on event types, and evolve your event schemas without table migrations.
Table partitioning handles the inevitable growth of event tables. Partition by time range or aggregate ID, and PostgreSQL automatically routes writes and optimizes reads across partitions.
LISTEN/NOTIFY provides built-in pub/sub for event notifications. Projections can subscribe to new events without polling, and you avoid the complexity of external message brokers for internal event distribution.
Advisory locks enable coordination for projection rebuilds and concurrent writes, giving you the primitives needed for exactly-once processing guarantees.
Choosing Your Path
PostgreSQL event sourcing fits well when your event throughput stays under tens of thousands per second, when your team already has PostgreSQL expertise, and when you value operational simplicity over maximum theoretical throughput.
Consider EventStoreDB when you need built-in catch-up subscriptions with automatic checkpointing. Choose Kafka when event streaming across organizational boundaries is your primary use case, or when you’re processing millions of events per second.
💡 Pro Tip: Start with PostgreSQL. The migration path to specialized infrastructure is straightforward if you outgrow it—but many teams never do.
With the architectural decision made, the next step is designing a schema that captures events efficiently while supporting the queries your projections will need.
Designing the Event Store Schema
The events table sits at the heart of your event-sourced system. Every state change flows through it, every audit trail originates from it, and every projection rebuilds from it. Getting this schema right pays dividends across your entire architecture.
The Core Events Table
    CREATE TABLE events (
        id             BIGSERIAL PRIMARY KEY,
        aggregate_id   UUID NOT NULL,
        aggregate_type VARCHAR(255) NOT NULL,
        event_type     VARCHAR(255) NOT NULL,
        event_data     JSONB NOT NULL,
        metadata       JSONB DEFAULT '{}',
        version        INTEGER NOT NULL,
        created_at     TIMESTAMPTZ NOT NULL DEFAULT now(),

        CONSTRAINT unique_aggregate_version
            UNIQUE (aggregate_type, aggregate_id, version)
    );

Each column serves a specific purpose. The aggregate_id identifies the entity this event belongs to: an order, a user account, a shopping cart. The aggregate_type distinguishes between different domain entities, enabling you to query all events for a specific aggregate type. The version column tracks the sequence of events within each aggregate, starting at 1 and incrementing with each append.
The event_data column stores the actual event payload as JSONB, giving you flexible schema evolution without migrations. You can add new fields to events without altering the table structure, and older events remain queryable alongside newer ones. The metadata column captures cross-cutting concerns: correlation IDs for distributed tracing, causation IDs linking events to their triggers, user IDs for audit purposes, and timestamps from the originating service. Keeping metadata separate from event data maintains a clean separation between domain information and operational concerns.
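As an illustration of what the metadata column might carry, the sketch below builds a metadata dictionary in Python. The field names (`correlation_id`, `causation_id`, `user_id`, `origin_timestamp`) and the `build_metadata` helper are assumptions chosen for this example; use whatever conventions your tracing setup already follows.

```python
import json
import uuid
from datetime import datetime, timezone


def build_metadata(user_id, correlation_id=None, causation_id=None) -> dict:
    """Assemble cross-cutting metadata for one event (hypothetical field names)."""
    return {
        # One correlation ID is shared by every event in the same request or workflow
        "correlation_id": correlation_id or str(uuid.uuid4()),
        # ID of the event or command that directly caused this one, if any
        "causation_id": causation_id,
        "user_id": user_id,
        "origin_timestamp": datetime.now(timezone.utc).isoformat(),
    }


request_meta = build_metadata(user_id="user_42")
followup_meta = build_metadata(
    user_id="user_42",
    correlation_id=request_meta["correlation_id"],
    causation_id="evt_1001",
)
metadata_json = json.dumps(followup_meta)  # ready to store in the JSONB column
```

Reusing the correlation ID across the follow-up event is what lets you later pull every event belonging to one request out of the store.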
Optimistic Concurrency Control
The unique_aggregate_version constraint delivers optimistic concurrency control without additional application logic. When two concurrent processes attempt to append events with the same version number, PostgreSQL rejects one with a unique constraint violation. This approach eliminates the need for explicit locking, allowing your system to handle high concurrency while maintaining strict ordering guarantees within each aggregate.
    INSERT INTO events (aggregate_id, aggregate_type, event_type, event_data, version)
    VALUES (
        '550e8400-e29b-41d4-a716-446655440000',
        'Order',
        'OrderPlaced',
        '{"customer_id": "cust_123", "total": 99.99}'::jsonb,
        1
    );

If another transaction already inserted version 1 for this aggregate, this insert fails. Your application catches the constraint violation and either retries with a fresh aggregate state or returns a conflict error to the caller. The retry logic should reload the aggregate from scratch, reapply business validation against the current state, and attempt the append with the correct version number.
💡 Pro Tip: Include the expected current version in your append logic. Load the aggregate, note its version, apply business logic, then append with version = current_version + 1. This pattern catches stale reads immediately.
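The sketch below shows the constraint doing the concurrency work. It deliberately uses Python's built-in sqlite3 module as a stand-in for PostgreSQL so that it runs without a server. The table is a trimmed-down version of the schema above, and `try_append` is a hypothetical helper.

```python
import sqlite3

# SQLite stands in for PostgreSQL here purely so the sketch runs anywhere.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE events (
        id INTEGER PRIMARY KEY,
        aggregate_type TEXT NOT NULL,
        aggregate_id TEXT NOT NULL,
        event_type TEXT NOT NULL,
        version INTEGER NOT NULL,
        UNIQUE (aggregate_type, aggregate_id, version)
    )
    """
)


def try_append(aggregate_id: str, event_type: str, expected_version: int) -> bool:
    """Append at expected_version + 1; return False if another writer got there first."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO events (aggregate_type, aggregate_id, event_type, version) "
                "VALUES ('Order', ?, ?, ?)",
                (aggregate_id, event_type, expected_version + 1),
            )
        return True
    except sqlite3.IntegrityError:
        # The unique constraint rejected a duplicate version: a stale read
        return False


# Two writers both loaded the aggregate when it was at version 0.
first = try_append("order-1", "OrderPlaced", expected_version=0)
second = try_append("order-1", "OrderCancelled", expected_version=0)
stored = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

With psycopg 3 against PostgreSQL, the equivalent failure surfaces as `psycopg.errors.UniqueViolation`, which is the exception to catch before reloading and retrying.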
Indexing for Performance
Event stores have predictable access patterns: load all events for an aggregate, query events by type for projections, and scan events chronologically for replay. Design indexes around these patterns rather than adding indexes reactively.
    -- Primary access pattern: loading aggregate history.
    -- The unique_aggregate_version constraint already creates a unique index on
    -- (aggregate_type, aggregate_id, version), so no separate index is needed.

    -- Projection rebuilding: scan by event type
    CREATE INDEX idx_events_type_created ON events (event_type, created_at);

    -- Global ordering for full replay
    CREATE INDEX idx_events_created ON events (created_at, id);

The unique index behind unique_aggregate_version covers aggregate_type, aggregate_id, and version, so it serves aggregate loading with a single index scan, and creating a second index on the same columns would only add write overhead. Because version is the last column in that index, events return in the correct order without additional sorting. The event type index supports projection builders that need to process specific event types across all aggregates. The global ordering index enables full system replays for disaster recovery or building entirely new projections.
Consider the write amplification tradeoff when adding indexes. Each index adds overhead to every insert operation. For write-heavy event stores, benchmark the impact of additional indexes against your throughput requirements before deploying to production.
Partitioning for Scale
Once your events table grows beyond tens of millions of rows, partition by time range. This keeps individual partitions manageable and enables efficient archival of historical data. Partitioning also improves vacuum performance, as PostgreSQL can process each partition independently.
    CREATE TABLE events (
        id             BIGSERIAL,
        aggregate_id   UUID NOT NULL,
        aggregate_type VARCHAR(255) NOT NULL,
        event_type     VARCHAR(255) NOT NULL,
        event_data     JSONB NOT NULL,
        metadata       JSONB DEFAULT '{}',
        version        INTEGER NOT NULL,
        created_at     TIMESTAMPTZ NOT NULL DEFAULT now(),

        PRIMARY KEY (id, created_at),
        UNIQUE (aggregate_type, aggregate_id, version, created_at)
    ) PARTITION BY RANGE (created_at);

    CREATE TABLE events_2024_q1 PARTITION OF events
        FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');

Note that PostgreSQL requires the partition key in every unique constraint on a partitioned table, including the primary key. This has a cost: with created_at in the constraint, the database no longer rejects two events that share an aggregate and version but carry different timestamps, so the constraint alone stops providing optimistic concurrency control. Enforce the version check elsewhere, for example with a per-stream version row that is locked during the append, as in the next section. In return, queries that filter on created_at benefit from partition pruning. Plan your partition creation ahead of time: automate the creation of future partitions to avoid insertion failures when events arrive for a time range without a corresponding partition.
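One way to automate partition creation is to generate the DDL from a scheduled job. The following is a sketch under assumptions: the helper names are hypothetical, partitions are quarterly, and the naming follows the events_2024_q1 pattern used above. Running the statements against the database is left to your migration tooling.

```python
from datetime import date


def quarter_start(year: int, quarter: int) -> date:
    """First day of the given quarter (1-4)."""
    return date(year, 3 * (quarter - 1) + 1, 1)


def partition_ddl(year: int, quarter: int) -> str:
    """Build the CREATE TABLE statement for one quarterly partition."""
    start = quarter_start(year, quarter)
    end = quarter_start(year + 1, 1) if quarter == 4 else quarter_start(year, quarter + 1)
    return (
        f"CREATE TABLE IF NOT EXISTS events_{year}_q{quarter} "
        f"PARTITION OF events FOR VALUES FROM ('{start}') TO ('{end}');"
    )


def upcoming_partitions(today: date, count: int = 4) -> list[str]:
    """DDL for the current quarter and the following count - 1 quarters."""
    year, quarter = today.year, (today.month - 1) // 3 + 1
    statements = []
    for _ in range(count):
        statements.append(partition_ddl(year, quarter))
        year, quarter = (year + 1, 1) if quarter == 4 else (year, quarter + 1)
    return statements


statements = upcoming_partitions(date(2024, 11, 15), count=2)
```

Because the statements use IF NOT EXISTS, the job can run as often as you like. Creating partitions several quarters ahead leaves plenty of margin if the job fails for a while.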
With this schema in place, you have the foundation for reliable event storage. The next challenge is appending events atomically while handling the inevitable conflicts that arise in concurrent systems.
Appending Events with Conflict Detection
The append-only write pattern forms the backbone of event sourcing correctness. Every write operation must verify that no concurrent process has modified the aggregate since you last read it. PostgreSQL's transaction isolation and constraint enforcement make this straightforward to implement, while psycopg 3's type annotations let a static type checker catch many mistakes during development rather than in production.
Optimistic Concurrency with Expected Version
The core mechanism relies on optimistic locking through version checking. When appending events, you pass the version number you expect the stream to be at. If another process has already written events, your expected version won’t match, and the write fails cleanly. This approach avoids the performance penalties of pessimistic locking while maintaining strict consistency guarantees.
The expected version represents your last known state of the aggregate. Think of it as a lightweight checksum—if the version has changed since you loaded the aggregate, someone else has modified it, and your changes might be based on stale assumptions. Rather than blindly overwriting their changes or silently corrupting state, the system rejects your write and forces you to reload and reconsider.
    from dataclasses import dataclass
    from typing import Sequence
    from uuid import UUID

    import psycopg
    from psycopg.types.json import Json


    @dataclass
    class Event:
        event_type: str
        payload: dict
        metadata: dict | None = None


    class ConcurrencyError(Exception):
        """Raised when expected version doesn't match current stream version."""


    class EventStore:
        def __init__(self, conn: psycopg.Connection):
            self.conn = conn

        def append(
            self,
            stream_id: UUID,
            events: Sequence[Event],
            expected_version: int,
        ) -> int:
            """
            Append events to a stream with optimistic concurrency control.
            Returns the new stream version after appending.
            """
            with self.conn.transaction():
                # Lock the stream row and verify version
                result = self.conn.execute(
                    """
                    SELECT version FROM event_streams
                    WHERE stream_id = %s
                    FOR UPDATE
                    """,
                    (stream_id,),
                ).fetchone()

                # A stream with no row yet is treated as version 0
                current_version = result[0] if result else 0

                if current_version != expected_version:
                    raise ConcurrencyError(
                        f"Expected version {expected_version}, "
                        f"but stream is at {current_version}"
                    )

                # Insert all events with sequential versions
                new_version = current_version
                for event in events:
                    new_version += 1
                    self.conn.execute(
                        """
                        INSERT INTO events (
                            stream_id, version, event_type, payload, metadata
                        ) VALUES (%s, %s, %s, %s, %s)
                        """,
                        (
                            stream_id,
                            new_version,
                            event.event_type,
                            Json(event.payload),
                            Json(event.metadata or {}),
                        ),
                    )

                # Update stream version
                self.conn.execute(
                    """
                    INSERT INTO event_streams (stream_id, version)
                    VALUES (%s, %s)
                    ON CONFLICT (stream_id)
                    DO UPDATE SET version = EXCLUDED.version
                    """,
                    (stream_id, new_version),
                )

                return new_version

The FOR UPDATE clause acquires a row-level lock on the stream row, so a concurrent transaction appending to the same stream waits until this one finishes and then sees the updated version. This serializes writes to the same aggregate while allowing parallel writes to different aggregates. The lock is held only for the duration of the transaction, minimizing contention in most workloads. One gap remains: a brand-new stream has no row to lock, so two concurrent first appends can both read version 0. A unique constraint on (stream_id, version) in the events table turns that case into a constraint violation instead of a duplicate event.
Note that the version check happens inside the transaction after acquiring the lock. This ordering is critical: if the check ran before the lock was taken, two transactions could both read the same version, both pass the check, and then both append once each acquired the lock in turn, leaving two events with the same version number.
Retry Logic for Concurrent Conflicts
Concurrency conflicts are expected in high-throughput systems. Rather than failing immediately and bubbling errors up to users, implement retry logic that reloads the aggregate state and reapplies the command. The key insight is that most commands can simply be re-evaluated: if the aggregate state hasn't changed in a way that invalidates the command, reapplying it to the reloaded state produces the same result.
    from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential


    # Command, store.load_stream, and rebuild_aggregate are application-defined
    @retry(
        retry=retry_if_exception_type(ConcurrencyError),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=0.1, max=1),
    )
    def handle_command(store: EventStore, stream_id: UUID, command: Command) -> None:
        # Load current state
        events = store.load_stream(stream_id)
        aggregate = rebuild_aggregate(events)
        current_version = len(events)

        # Apply command to generate new events
        new_events = aggregate.handle(command)

        # Attempt to persist
        store.append(stream_id, new_events, expected_version=current_version)

The exponential backoff reduces contention by spreading out retry attempts. Starting with a 100ms delay and capping at 1 second provides a reasonable balance between responsiveness and system stability. For most workloads, conflicts resolve within the first retry as the competing transaction completes.
💡 Pro Tip: Keep retry counts low (2-3 attempts). Persistent conflicts indicate a design problem—either your aggregates are too large or you need to reconsider your domain boundaries.
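If you would rather not add a dependency for this, the same behavior can be sketched with the standard library alone. Everything here is an assumption made for illustration: `run_with_retries` is a hypothetical helper, `ConcurrencyError` is redefined locally so the block is self-contained, and the jitter range is an arbitrary choice.

```python
import random
import time


class ConcurrencyError(Exception):
    """Local stand-in for the error raised on a version mismatch."""


def run_with_retries(attempt_command, max_attempts=3, base_delay=0.1,
                     max_delay=1.0, sleep=time.sleep):
    """Call attempt_command until it succeeds or max_attempts is exhausted.

    attempt_command must reload the aggregate itself on every call, so each
    retry works from fresh state rather than the stale version.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return attempt_command()
        except ConcurrencyError:
            if attempt == max_attempts:
                raise
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Jitter keeps competing writers from retrying in lockstep
            sleep(delay * random.uniform(0.5, 1.0))


# Simulated command that hits a conflict twice before succeeding.
calls = {"count": 0}


def flaky_command():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConcurrencyError("stale version")
    return calls["count"]


recorded_delays = []
result = run_with_retries(flaky_command, sleep=recorded_delays.append)
```

Passing `sleep` as a parameter keeps the helper testable: the example records the delays instead of actually waiting.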
Multi-Aggregate Transactions
PostgreSQL transactions naturally span multiple stream appends. When a business operation affects several aggregates atomically, wrap the entire operation in a single transaction:
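    from decimal import Decimal
    from uuid import UUID


    def transfer_funds(
        store: EventStore,
        from_id: UUID,
        from_version: int,
        to_id: UUID,
        to_version: int,
        amount: Decimal,
    ) -> None:
        # The versions come from loading both aggregates before calling this function.
        # Debited and Credited are application-defined Event subclasses.
        # Each append's own transaction block becomes a savepoint inside this one.
        with store.conn.transaction():
            store.append(from_id, [Debited(amount)], expected_version=from_version)
            store.append(to_id, [Credited(amount)], expected_version=to_version)

Both appends succeed or neither does. This atomic guarantee across aggregates is something specialized event stores often lack, and it's a significant advantage of PostgreSQL. You get ACID semantics without building a distributed transaction coordinator or accepting eventual consistency between related aggregates.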
However, use multi-aggregate transactions judiciously. They create coupling between aggregates and can become a source of contention. If two aggregates frequently participate in the same transactions, consider whether they should be merged into a single aggregate or whether eventual consistency through downstream event processing would better serve your requirements.
With reliable event appending in place, the next challenge is making that data queryable. Projections transform your event streams into read-optimized views that support efficient queries.
Building Projections for Query Performance
Event sourcing separates the write path (appending events) from the read path (querying current state). While events capture what happened, projections transform that history into query-optimized read models. This separation lets you design read models specifically for your query patterns without compromising the integrity of your event log.

The key insight is that your event store serves as the single source of truth, while projections are derived, disposable views that can be rebuilt at any time. This means you can create multiple projections from the same event stream—one optimized for dashboard queries, another for search, and a third for reporting—each structured exactly as its consumers need.
Synchronous Projections: Consistency Within Transactions
The simplest projection strategy updates read models within the same transaction that appends events. This guarantees that queries always reflect the latest committed events—no eventual consistency to reason about.
    import psycopg
    from psycopg.types.json import Json


    def apply_order_event(conn: psycopg.Connection, event: dict) -> None:
        """Update the orders read model based on the event type."""
        if event["event_type"] == "OrderCreated":
            conn.execute(
                """
                INSERT INTO orders_view (order_id, customer_id, status, total, created_at)
                VALUES (%(aggregate_id)s, %(customer_id)s, 'pending', 0, %(occurred_at)s)
                """,
                {**event, **event["payload"]},
            )

        elif event["event_type"] == "ItemAdded":
            conn.execute(
                """
                UPDATE orders_view
                SET total = total + %(price)s * %(quantity)s
                WHERE order_id = %(aggregate_id)s
                """,
                {**event, **event["payload"]},
            )

        elif event["event_type"] == "OrderConfirmed":
            conn.execute(
                """
                UPDATE orders_view
                SET status = 'confirmed'
                WHERE order_id = %(aggregate_id)s
                """,
                event,
            )


    def append_event_with_projection(conn: psycopg.Connection, event: dict) -> None:
        """Append event and update projection in a single transaction."""
        with conn.transaction():
            conn.execute(
                """
                INSERT INTO events (aggregate_type, aggregate_id, event_type, payload, occurred_at)
                VALUES (%(aggregate_type)s, %(aggregate_id)s, %(event_type)s, %(payload)s, %(occurred_at)s)
                """,
                # psycopg cannot adapt a plain dict, so wrap the payload for the JSONB column
                {**event, "payload": Json(event["payload"])},
            )

            apply_order_event(conn, event)

Synchronous projections work well when you have a small number of read models and projection logic executes quickly. The tradeoff: write latency increases with each projection, and a projection bug can block event appends entirely. For this reason, many teams start with synchronous projections during early development, then migrate to asynchronous processing as the system matures and projection count grows.
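The "projection bug blocks appends" tradeoff is easy to demonstrate. The sketch below uses Python's built-in sqlite3 module as a stand-in for PostgreSQL so it runs without a server. The tables are simplified and the bug in `project` is deliberate.

```python
import json
import sqlite3

# SQLite stands in for PostgreSQL; the transactional behavior shown is the same idea.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, event_type TEXT, payload TEXT)")
conn.execute("CREATE TABLE orders_view (order_id TEXT PRIMARY KEY, status TEXT NOT NULL)")
conn.commit()


def project(event_type: str, payload: dict) -> None:
    if event_type == "OrderCreated":
        conn.execute(
            "INSERT INTO orders_view (order_id, status) VALUES (?, 'pending')",
            (payload["order_id"],),
        )
    elif event_type == "OrderConfirmed":
        # Deliberate bug: the key is "order_id", not "id", so this raises KeyError
        conn.execute(
            "UPDATE orders_view SET status = 'confirmed' WHERE order_id = ?",
            (payload["id"],),
        )


def append_with_projection(event_type: str, payload: dict) -> bool:
    try:
        with conn:  # one transaction: commit both writes or roll both back
            conn.execute(
                "INSERT INTO events (event_type, payload) VALUES (?, ?)",
                (event_type, json.dumps(payload)),
            )
            project(event_type, payload)
        return True
    except KeyError:
        return False


created = append_with_projection("OrderCreated", {"order_id": "o-1"})
confirmed = append_with_projection("OrderConfirmed", {"order_id": "o-1"})
event_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

The second append is rolled back along with the failed projection update, so the event log never records OrderConfirmed. The read model stays consistent with the log, at the price of rejecting a perfectly valid event.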
Asynchronous Projections: Scaling with LISTEN/NOTIFY
For systems with multiple projections or expensive transformations, decouple projection updates from event writes using PostgreSQL’s built-in pub/sub mechanism. This approach accepts eventual consistency in exchange for better write performance and fault isolation—a failing projection worker cannot block event appends.
First, configure your events table to broadcast new events:
    CREATE OR REPLACE FUNCTION notify_new_event() RETURNS TRIGGER AS $$
    BEGIN
        PERFORM pg_notify('new_events', json_build_object(
            'event_id', NEW.event_id,
            'aggregate_type', NEW.aggregate_type,
            'aggregate_id', NEW.aggregate_id,
            'event_type', NEW.event_type
        )::text);
        RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;

    CREATE TRIGGER events_notify_trigger
        AFTER INSERT ON events
        FOR EACH ROW EXECUTE FUNCTION notify_new_event();

Then run projection workers that listen for notifications:
    import json
    from typing import Callable

    import psycopg


    def run_projection_worker(dsn: str, projection_name: str, apply_fn: Callable) -> None:
        """Process events asynchronously using LISTEN/NOTIFY."""
        with psycopg.connect(dsn, autocommit=True) as conn:
            conn.execute("LISTEN new_events")

            # Process any events missed while worker was down
            process_pending_events(conn, projection_name, apply_fn)

            while True:
                # notifies() yields notifications as they arrive and returns once
                # `timeout` seconds have passed (the timeout argument needs psycopg 3.2+)
                for notify in conn.notifies(timeout=30.0):
                    event_meta = json.loads(notify.payload)
                    process_single_event(conn, projection_name, event_meta["event_id"], apply_fn)
                # Timeout elapsed: a natural place to check for a shutdown request

The timeout serves dual purposes: it keeps the worker from blocking indefinitely on a quiet channel and provides a natural interval for checking whether the worker should shut down gracefully.
💡 Pro Tip: LISTEN/NOTIFY notifications are not persisted. If your worker restarts or misses a notification, the checkpoint mechanism described below ensures no events are lost.
Maintaining Projection Checkpoints
Every async projection needs a checkpoint—the ID of the last successfully processed event. This enables reliable restarts and full rebuilds. Without checkpoints, you would have no way to know where processing left off after a worker crash or deployment.
    from typing import Callable

    import psycopg
    from psycopg.rows import dict_row


    def process_pending_events(conn: psycopg.Connection, projection_name: str, apply_fn: Callable) -> None:
        """Process all events after the projection's checkpoint."""
        checkpoint = conn.execute(
            "SELECT last_event_id FROM projection_checkpoints WHERE projection_name = %s",
            (projection_name,),
        ).fetchone()

        last_id = checkpoint[0] if checkpoint else 0

        # dict_row lets the handler and the checkpoint update read columns by name
        with conn.cursor(row_factory=dict_row) as cur:
            events = cur.execute(
                """
                SELECT * FROM events
                WHERE event_id > %s
                ORDER BY event_id
                """,
                (last_id,),
            ).fetchall()

        for event in events:
            with conn.transaction():
                apply_fn(conn, event)
                conn.execute(
                    """
                    INSERT INTO projection_checkpoints (projection_name, last_event_id, updated_at)
                    VALUES (%s, %s, NOW())
                    ON CONFLICT (projection_name)
                    DO UPDATE SET last_event_id = EXCLUDED.last_event_id, updated_at = NOW()
                    """,
                    (projection_name, event["event_id"]),
                )

The checkpoint update happens within the same transaction as the projection update. If either fails, both roll back, preventing the projection from advancing past events it hasn't actually processed. Because both writes commit together, each event's effect on the read model is applied exactly once, even in the face of crashes or network failures, as long as the read model lives in the same database as the checkpoint.
Storing checkpoints per projection lets you rebuild individual read models independently. When requirements change or bugs corrupt data, you reset that projection’s checkpoint to zero and let it reprocess the full event history. This rebuild capability is one of event sourcing’s most powerful features—your projections become truly disposable, and schema migrations transform from risky deployments into routine operations. The next section covers the mechanics of replaying events to leverage this capability effectively.
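The checkpoint-and-reset cycle can be sketched end to end with Python's built-in sqlite3 module standing in for PostgreSQL. The tables are minimal, the `revenue` projection is a made-up example, and `catch_up` and `reset` are hypothetical helpers. The projection deliberately uses a non-idempotent update (total = total + amount) to show that the checkpoint, not the handler, prevents double counting here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (event_id INTEGER PRIMARY KEY, amount INTEGER NOT NULL);
    CREATE TABLE totals (name TEXT PRIMARY KEY, total INTEGER NOT NULL);
    CREATE TABLE projection_checkpoints (
        projection_name TEXT PRIMARY KEY,
        last_event_id INTEGER NOT NULL
    );
    INSERT INTO events (amount) VALUES (10), (20), (30);
    INSERT INTO totals VALUES ('revenue', 0);
    INSERT INTO projection_checkpoints VALUES ('revenue', 0);
    """
)


def catch_up(name: str) -> int:
    """Apply every event after the checkpoint; return how many were processed."""
    last_id = conn.execute(
        "SELECT last_event_id FROM projection_checkpoints WHERE projection_name = ?",
        (name,),
    ).fetchone()[0]
    pending = conn.execute(
        "SELECT event_id, amount FROM events WHERE event_id > ? ORDER BY event_id",
        (last_id,),
    ).fetchall()
    for event_id, amount in pending:
        with conn:  # read-model update and checkpoint advance commit together
            conn.execute("UPDATE totals SET total = total + ? WHERE name = ?", (amount, name))
            conn.execute(
                "UPDATE projection_checkpoints SET last_event_id = ? WHERE projection_name = ?",
                (event_id, name),
            )
    return len(pending)


def reset(name: str) -> None:
    """Discard the read model and rewind the checkpoint for a full rebuild."""
    with conn:
        conn.execute("UPDATE totals SET total = 0 WHERE name = ?", (name,))
        conn.execute(
            "UPDATE projection_checkpoints SET last_event_id = 0 WHERE projection_name = ?",
            (name,),
        )


first_run = catch_up("revenue")
second_run = catch_up("revenue")   # nothing new, so nothing is reapplied
reset("revenue")
rebuilt = catch_up("revenue")
total = conn.execute("SELECT total FROM totals WHERE name = 'revenue'").fetchone()[0]
```

Note that the reset clears the read model and the checkpoint in one transaction. Rewinding only the checkpoint would double every total on the next catch-up.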
Event Replay and Projection Rebuilding
Projections are disposable by design. When you discover a bug in your projection logic, add new fields to a read model, or need to create an entirely new view of your data, you rebuild from the authoritative event stream. This capability transforms what would be a data migration nightmare in traditional systems into a straightforward replay operation. Unlike schema migrations that require careful orchestration and rollback plans, projection rebuilds are inherently safe—the source events remain immutable, and you can rebuild as many times as needed until the result is correct.
When Rebuilding Becomes Necessary
Several scenarios trigger projection rebuilds: fixing calculation errors in existing projections, adding denormalized fields that require historical data, creating new read models for features that didn’t exist at launch, or recovering from corrupted projection state. The event stream remains your single source of truth—projections are merely cached computations.
Consider a common example: your initial order summary projection tracks only order totals, but a new reporting requirement demands breakdowns by product category. In a traditional system, you’d write a complex migration script to backfill this data, likely requiring downtime and careful testing. With event sourcing, you simply update your projection handler to extract category information from the original OrderPlaced events and rebuild. The historical data was always there—you just weren’t capturing it before.
Batch Processing for Large Event Streams
Naive replay implementations load all events into memory and process them sequentially. This approach fails spectacularly with millions of events. Instead, use cursor-based batching that maintains consistent memory usage regardless of stream size.
    import psycopg
    from psycopg.rows import dict_row


    class ProjectionRebuilder:
        def __init__(self, conn_string: str, batch_size: int = 1000):
            self.conn_string = conn_string
            self.batch_size = batch_size

        def rebuild(self, projection_name: str, handler):
            with psycopg.connect(self.conn_string, row_factory=dict_row) as conn:
                last_position = 0

                while True:
                    with conn.cursor() as cur:
                        cur.execute(
                            """
                            SELECT event_id, stream_id, event_type, payload, metadata, global_position
                            FROM events
                            WHERE global_position > %s
                            ORDER BY global_position
                            LIMIT %s
                            """,
                            (last_position, self.batch_size),
                        )
                        events = cur.fetchall()

                    if not events:
                        break

                    for event in events:
                        handler.apply(event)
                        last_position = event['global_position']

                    conn.commit()
                    print(f"Processed up to position {last_position}")

The global_position column serves as your cursor. If you persist last_position after each batch, a rebuild that crashes midway can resume from that point instead of starting over. This design also opens the door to parallelization: you can partition the event stream by position ranges and process multiple batches concurrently, though you must ensure handlers for the same aggregate process events in order.
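The batching loop itself is independent of the database. As a rough sketch, the generator below captures the same keyset-pagination idea with the query abstracted behind a `fetch_after` callable. Both names, and the in-memory `EVENTS` list standing in for the events table, are assumptions made so the example runs on its own.

```python
from typing import Callable, Iterator


def iter_batches(
    fetch_after: Callable[[int, int], list[dict]],
    batch_size: int = 1000,
    start_position: int = 0,
) -> Iterator[list[dict]]:
    """Yield batches of events ordered by global_position, one query per batch."""
    position = start_position
    while True:
        batch = fetch_after(position, batch_size)
        if not batch:
            return
        yield batch
        # The last row's position becomes the lower bound for the next query
        position = batch[-1]["global_position"]


# In-memory stand-in for "WHERE global_position > %s ORDER BY global_position LIMIT %s"
EVENTS = [{"global_position": p, "event_type": "OrderPlaced"} for p in range(1, 8)]


def fetch_after(position: int, limit: int) -> list[dict]:
    return [e for e in EVENTS if e["global_position"] > position][:limit]


batch_sizes = [len(batch) for batch in iter_batches(fetch_after, batch_size=3)]

# Resuming after a crash: start from the last position that was persisted
resumed = [
    event["global_position"]
    for batch in iter_batches(fetch_after, batch_size=3, start_position=5)
    for event in batch
]
```

Only one batch is ever held in memory, and `start_position` is the hook for resuming from a stored checkpoint.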
Idempotent Event Handlers
Replays mean events get processed multiple times. Your handlers must produce identical results whether an event is applied once or a hundred times. Use upserts with deterministic keys derived from event data rather than auto-generated identifiers that would produce different values on each replay.
    class OrderProjectionHandler:
        def __init__(self, conn):
            self.conn = conn

        def apply(self, event: dict):
            if event['event_type'] == 'OrderPlaced':
                payload = event['payload']
                with self.conn.cursor() as cur:
                    cur.execute(
                        """
                        INSERT INTO order_summaries (order_id, customer_id, total, status, placed_at)
                        VALUES (%s, %s, %s, 'placed', %s)
                        ON CONFLICT (order_id) DO UPDATE SET
                            customer_id = EXCLUDED.customer_id,
                            total = EXCLUDED.total,
                            placed_at = EXCLUDED.placed_at
                        """,
                        (
                            payload['order_id'],
                            payload['customer_id'],
                            payload['total'],
                            event['metadata']['timestamp'],
                        ),
                    )

The ON CONFLICT clause ensures repeated application converges to the same state. Avoid side effects like sending emails or incrementing counters in replay handlers: either skip these operations during rebuilds using a flag, or track which events have already triggered side effects in a separate table.
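The flag-based approach to side effects might look like the following sketch. It keeps the read model in a plain dictionary so that it runs without a database. The class name, the `replaying` flag, and the `send_email` callback are all assumptions made for illustration.

```python
class OrderSummaryProjection:
    """In-memory read model keyed by order_id (hypothetical example)."""

    def __init__(self, send_email, replaying: bool = False):
        self.rows: dict[str, dict] = {}
        self.send_email = send_email
        self.replaying = replaying

    def apply(self, event: dict) -> None:
        if event["event_type"] != "OrderPlaced":
            return
        payload = event["payload"]
        # Keyed overwrite: applying the same event again yields the same row
        self.rows[payload["order_id"]] = {
            "customer_id": payload["customer_id"],
            "total": payload["total"],
            "status": "placed",
        }
        # Side effects are not idempotent, so skip them during a rebuild
        if not self.replaying:
            self.send_email(payload["customer_id"])


event = {
    "event_type": "OrderPlaced",
    "payload": {"order_id": "o-1", "customer_id": "cust_123", "total": 99.99},
}

live_emails: list[str] = []
live = OrderSummaryProjection(live_emails.append)
live.apply(event)

replay_emails: list[str] = []
replay = OrderSummaryProjection(replay_emails.append, replaying=True)
replay.apply(event)
replay.apply(event)  # a second pass changes nothing
```

The replayed projection ends up with the same row as the live one, but no customer receives a second confirmation email.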
Blue-Green Projection Deployment
Production systems cannot tolerate read model downtime during rebuilds. The blue-green pattern solves this elegantly: build the new projection in a separate table while the current one continues serving queries, then swap atomically. This approach also provides a natural rollback path—if the new projection contains errors, you still have the original table available.
    def rebuild_with_zero_downtime(conn, rebuilder, handler_class):
        # Assumes conn is not in autocommit mode, so the steps below share one transaction
        with conn.cursor() as cur:
            # Create staging table with identical schema
            cur.execute("""
                CREATE TABLE order_summaries_staging
                (LIKE order_summaries INCLUDING ALL)
            """)

        # Rebuild into staging table
        staging_handler = handler_class(conn, table='order_summaries_staging')
        rebuilder.rebuild('order_summaries', staging_handler)

        # Atomic swap: DDL is transactional in PostgreSQL, so readers see either
        # the old table or the new one, never a missing table
        with conn.cursor() as cur:
            cur.execute("DROP TABLE IF EXISTS order_summaries_old")
            cur.execute("ALTER TABLE order_summaries RENAME TO order_summaries_old")
            cur.execute("ALTER TABLE order_summaries_staging RENAME TO order_summaries")
        # order_summaries_old is kept as the rollback copy until the next rebuild
        conn.commit()

💡 Pro Tip: Track the last_processed_position in a metadata table for each projection. After swapping, your live projection handler continues from exactly where the rebuild finished, ensuring no events are missed or duplicated during the transition window.
Rebuilds on large event streams can take hours. For systems processing thousands of events per second, you’ll need strategies to reduce replay time—which brings us to snapshots and performance optimization techniques.
Snapshots and Performance Optimization
Event sourcing trades write complexity for read flexibility, but this trade-off has a cost: aggregate reconstruction time grows linearly with event count. An order aggregate with 10,000 events—common in high-frequency trading or IoT systems—requires loading and applying every event on each read. Snapshots solve this by periodically capturing aggregate state, allowing reconstruction from a known point rather than the beginning.
The snapshot pattern follows a simple principle: store the fully computed aggregate state at a specific event version. When reconstructing, load the most recent snapshot and replay only the events that occurred after it. This transforms O(n) reconstruction into O(k), where k is the number of events since the last snapshot—typically a small, bounded value.
The Snapshot Schema
Store snapshots in a dedicated table that references the event stream:
    CREATE TABLE snapshots (
        aggregate_id   UUID NOT NULL,
        aggregate_type TEXT NOT NULL,
        version        BIGINT NOT NULL,
        state          JSONB NOT NULL,
        created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        PRIMARY KEY (aggregate_id, version)
    );

    CREATE INDEX idx_snapshots_latest
        ON snapshots (aggregate_id, version DESC);

The version field corresponds to the event version at which the snapshot was taken. This creates an unambiguous recovery point: load the snapshot, then replay only events with versions greater than the snapshot version. The descending index on version ensures that fetching the latest snapshot remains efficient regardless of how many historical snapshots exist. PostgreSQL can also scan the primary key index backward to answer the same query, so the explicit index is optional if you want to minimize write overhead.
Consider retaining multiple snapshots rather than just the latest. Historical snapshots enable point-in-time debugging and provide fallback options if a snapshot becomes corrupted. A retention policy that keeps the last three snapshots per aggregate balances storage costs against operational flexibility.
Loading Aggregates with Snapshots
The reconstruction query becomes a two-phase operation:
    WITH latest_snapshot AS (
        SELECT state, version
        FROM snapshots
        WHERE aggregate_id = $1
        ORDER BY version DESC
        LIMIT 1
    ),
    subsequent_events AS (
        SELECT event_type, payload, version
        FROM events
        WHERE aggregate_id = $1
          AND version > COALESCE((SELECT version FROM latest_snapshot), 0)
    )
    SELECT
        ls.state AS snapshot_state,
        ls.version AS snapshot_version,
        COALESCE(
            -- ORDER BY inside the aggregate guarantees the events arrive in version order
            json_agg(se ORDER BY se.version) FILTER (WHERE se.version IS NOT NULL),
            '[]'
        ) AS events
    FROM latest_snapshot ls
    FULL OUTER JOIN subsequent_events se ON true
    GROUP BY ls.state, ls.version;

Your application layer deserializes the snapshot state, then applies each subsequent event. For an aggregate with 10,000 events and snapshots every 100 versions, reconstruction now processes at most 99 events instead of 10,000. The FULL OUTER JOIN handles aggregates that have no snapshots yet, returning all events from the beginning.
This query pattern also degrades gracefully. New aggregates with few events incur minimal overhead from the snapshot lookup, while mature aggregates with thousands of events see dramatic performance improvements.
Automated Snapshot Creation
A trigger can decide when a snapshot is due, which keeps that bookkeeping in the database layer:
    CREATE OR REPLACE FUNCTION create_snapshot_if_needed()
    RETURNS TRIGGER AS $$
    DECLARE
        snapshot_interval CONSTANT INT := 100;
        last_snapshot_version BIGINT;
    BEGIN
        SELECT COALESCE(MAX(version), 0) INTO last_snapshot_version
        FROM snapshots
        WHERE aggregate_id = NEW.aggregate_id;

        IF NEW.version - last_snapshot_version >= snapshot_interval THEN
            -- Application must populate this via a separate call
            -- since aggregate state computation requires business logic
            INSERT INTO snapshot_queue (aggregate_id, triggered_at)
            VALUES (NEW.aggregate_id, NOW())
            ON CONFLICT (aggregate_id) DO NOTHING;
        END IF;

        RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;

    CREATE TRIGGER trg_snapshot_check
    AFTER INSERT ON events
    FOR EACH ROW EXECUTE FUNCTION create_snapshot_if_needed();

💡 Pro Tip: The trigger queues snapshot creation rather than performing it inline. Computing aggregate state requires business logic that belongs in your application layer, not in database functions. A background worker processes the queue asynchronously, ensuring that snapshot creation never blocks event appends.
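The trigger writes to a `snapshot_queue` table that is not defined in this section. One possible shape for it, together with the claim step a background worker might run, is sketched below. The column set is inferred from the trigger's INSERT, and the function name `claim_snapshot_requests` is an assumption for illustration.

```sql
-- Hypothetical queue table. The primary key on aggregate_id is what allows
-- the trigger's ON CONFLICT (aggregate_id) DO NOTHING to deduplicate requests.
CREATE TABLE IF NOT EXISTS snapshot_queue (
    aggregate_id UUID PRIMARY KEY,
    triggered_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Worker step: claim and remove up to batch_size pending requests, oldest first.
-- SKIP LOCKED lets several workers poll concurrently without blocking each other.
CREATE OR REPLACE FUNCTION claim_snapshot_requests(batch_size INT DEFAULT 10)
RETURNS SETOF UUID AS $$
    DELETE FROM snapshot_queue
    WHERE aggregate_id IN (
        SELECT aggregate_id
        FROM snapshot_queue
        ORDER BY triggered_at
        LIMIT batch_size
        FOR UPDATE SKIP LOCKED
    )
    RETURNING aggregate_id;
$$ LANGUAGE sql;
```

If the worker calls this inside the same transaction that inserts the new snapshot rows, a failure rolls back the DELETE as well, so the request reappears in the queue for the next attempt.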
Choose snapshot intervals based on your read/write ratio. Write-heavy aggregates benefit from less frequent snapshots (every 500 events), while read-heavy aggregates warrant more aggressive snapshotting (every 50 events). Monitor reconstruction times and adjust accordingly—the goal is keeping reconstruction under a consistent latency threshold, typically 50-100 milliseconds for interactive applications.
Snapshot versioning also matters for schema evolution. When your aggregate structure changes, existing snapshots may become incompatible. Include a schema version in your snapshot metadata and implement migration logic that can upgrade snapshots or fall back to full replay when necessary.
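As a rough sketch of that idea, the snapshot table could carry a `schema_version` column, and the loader could ask only for snapshots the running code understands. The column name, its default of 1, and the `latest_compatible_snapshot` function are all assumptions for illustration.

```sql
-- Repeated from the schema above so the sketch runs on its own.
CREATE TABLE IF NOT EXISTS snapshots (
    aggregate_id UUID NOT NULL,
    aggregate_type TEXT NOT NULL,
    version BIGINT NOT NULL,
    state JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (aggregate_id, version)
);

-- Hypothetical column: the shape of "state" that the writing code produced.
ALTER TABLE snapshots
    ADD COLUMN IF NOT EXISTS schema_version INT NOT NULL DEFAULT 1;

-- Return the newest snapshot written with the given schema version.
-- An empty result tells the caller to fall back to a full replay.
CREATE OR REPLACE FUNCTION latest_compatible_snapshot(
    p_aggregate_id UUID,
    p_schema_version INT
) RETURNS TABLE (state JSONB, version BIGINT) AS $$
    SELECT s.state, s.version
    FROM snapshots s
    WHERE s.aggregate_id = p_aggregate_id
      AND s.schema_version = p_schema_version
    ORDER BY s.version DESC
    LIMIT 1;
$$ LANGUAGE sql STABLE;
```

Filtering on an exact version is the simplest policy. A migration-based approach would instead load any older snapshot and upgrade its state in the application before applying events.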
With snapshots handling performance at scale, the remaining challenge is operational: managing schema evolution, handling failed projections, and avoiding the pitfalls that trip up production event-sourced systems.
Operational Considerations and Common Pitfalls
A well-designed event store schema gets you started, but production longevity demands attention to evolution, compliance, and operational hygiene. This section covers the maintenance realities that separate prototype implementations from systems that run reliably for years.
Schema Evolution Strategies
Events are immutable, but your understanding of the domain evolves. When you need to add fields to existing event types, the safest approach treats all new fields as optional with sensible defaults. Your projection code reads events from day one alongside events from today, so it must handle both gracefully.
Breaking changes require upcasting—transforming old event shapes into new ones during replay. Implement this as a pipeline of version-specific transformers that run before your projection handlers see the event. Store a schema_version field in each event’s metadata to route events through the appropriate upcaster chain. Never modify stored events directly; the append-only guarantee is foundational to event sourcing’s auditability benefits.
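Upcasters usually live in application code, but the chaining idea can be illustrated in PL/pgSQL. The sketch below assumes a hypothetical `OrderPlaced` event whose version 1 stored `amount` in dollars, version 2 renamed it to `amount_cents`, and version 3 added a `currency` field; none of these shapes come from the article.

```sql
-- Sketch of an upcaster chain. Each step upgrades a payload by exactly one
-- schema version, so a v1 event passes through every later step in order.
-- Event type, field names, and the 'USD' default are illustrative assumptions.
CREATE OR REPLACE FUNCTION upcast_event(
    p_event_type TEXT,
    p_schema_version INT,
    p_payload JSONB
) RETURNS JSONB AS $$
DECLARE
    current_version INT := p_schema_version;
    upgraded JSONB := p_payload;
BEGIN
    IF p_event_type = 'OrderPlaced' THEN
        IF current_version = 1 THEN
            -- v1 -> v2: replace "amount" (dollars) with "amount_cents".
            upgraded := (upgraded - 'amount')
                || jsonb_build_object(
                       'amount_cents',
                       round((upgraded->>'amount')::numeric * 100));
            current_version := 2;
        END IF;
        IF current_version = 2 THEN
            -- v2 -> v3: add the currency field with a default.
            upgraded := upgraded || jsonb_build_object('currency', 'USD');
            current_version := 3;
        END IF;
    END IF;
    -- Unknown event types and already-current payloads pass through unchanged.
    RETURN upgraded;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
```

Applied at read time, this might look like `SELECT upcast_event(event_type, (metadata->>'schema_version')::int, payload) FROM events ORDER BY version`, assuming events carry a `metadata` JSONB column. The stored rows are never touched.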
Monitoring Your Event Store
Track three categories of metrics. First, ingestion health: events appended per second, append latency percentiles, and optimistic concurrency conflict rates. A spike in conflicts often indicates aggregate boundaries that are too broad. Second, projection lag: the gap between the latest event sequence and each projection’s processed sequence. Alert when lag exceeds your consistency SLA. Third, storage growth: events per day, average event size, and table bloat from PostgreSQL’s MVCC overhead.
Set up alerts for projection lag exceeding five minutes, conflict rates above 5%, and any projection that stops advancing entirely. Dead projections are silent failures that compound into data inconsistency.
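A projection lag check could be sketched as follows. The `projection_checkpoints` table, its columns, and the `projection_lag` function are assumptions for illustration; your projections may already track their position under different names.

```sql
-- Hypothetical bookkeeping table: one row per projection, updated whenever
-- the projection finishes processing an event.
CREATE TABLE IF NOT EXISTS projection_checkpoints (
    projection_name TEXT PRIMARY KEY,
    last_sequence BIGINT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Compare each projection's position with the head of the event log.
-- head_sequence is supplied by the caller, e.g. the highest global event
-- sequence number currently stored.
CREATE OR REPLACE FUNCTION projection_lag(head_sequence BIGINT)
RETURNS TABLE (
    projection_name TEXT,
    events_behind BIGINT,
    since_last_update INTERVAL
) AS $$
    SELECT c.projection_name,
           head_sequence - c.last_sequence,
           NOW() - c.updated_at
    FROM projection_checkpoints c
    ORDER BY 2 DESC;
$$ LANGUAGE sql STABLE;
```

An alerting job might poll `SELECT * FROM projection_lag(...) WHERE events_behind > 0 AND since_last_update > INTERVAL '5 minutes'`, which catches both slow projections and ones that have stopped advancing.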
GDPR Compliance Through Crypto-Shredding
Deleting events breaks replay, but regulations demand data erasure. Crypto-shredding solves this: encrypt personal data fields with a per-user key stored separately. When a user requests deletion, destroy their encryption key. The events remain intact for replay, but personal data becomes unrecoverable ciphertext. Store only the encrypted payload in the event; keep the key mapping in a dedicated table with its own retention policies.
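The mechanics can be sketched with the `pgcrypto` extension. The table and function names here are assumptions, and keeping keys in the same database is a simplification for illustration; in production the keys would more likely live in a separate store or key management service, as the paragraph above recommends.

```sql
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- Hypothetical key table: one random secret per user.
CREATE TABLE IF NOT EXISTS user_encryption_keys (
    user_id UUID PRIMARY KEY,
    secret TEXT NOT NULL
);

-- Encrypt one personal-data value, creating the user's key on first use.
-- The base64 result can be stored as a string inside the event payload.
CREATE OR REPLACE FUNCTION encrypt_for_user(p_user_id UUID, p_plaintext TEXT)
RETURNS TEXT AS $$
DECLARE
    user_secret TEXT;
BEGIN
    INSERT INTO user_encryption_keys (user_id, secret)
    VALUES (p_user_id, encode(gen_random_bytes(32), 'hex'))
    ON CONFLICT (user_id) DO NOTHING;

    SELECT k.secret INTO user_secret
    FROM user_encryption_keys k
    WHERE k.user_id = p_user_id;

    RETURN encode(pgp_sym_encrypt(p_plaintext, user_secret), 'base64');
END;
$$ LANGUAGE plpgsql;

-- Decrypt a value; returns NULL once the user's key has been destroyed.
CREATE OR REPLACE FUNCTION decrypt_for_user(p_user_id UUID, p_ciphertext TEXT)
RETURNS TEXT AS $$
    SELECT pgp_sym_decrypt(decode(p_ciphertext, 'base64'), k.secret)
    FROM user_encryption_keys k
    WHERE k.user_id = p_user_id;
$$ LANGUAGE sql STABLE;
```

Handling an erasure request then reduces to `DELETE FROM user_encryption_keys WHERE user_id = ...`. Projections that call `decrypt_for_user` during replay receive NULL for that user and need to treat it as redacted data.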
Common Mistakes to Avoid
Oversized events signal a design problem. Events exceeding 100KB typically contain data that belongs in a separate document store, referenced by ID. Large events slow replay, bloat WAL shipping, and strain your backup infrastructure.
Missing aggregate boundaries create artificial contention. If unrelated operations conflict on the same aggregate, you’ve drawn the boundary too wide. Split aggregates along true consistency requirements, not convenience.
Chatty event streams—dozens of granular events for a single user action—inflate storage and complicate debugging. Favor fewer, semantically meaningful events over mechanical state-change recordings.
With operational practices established, you have a complete PostgreSQL event store implementation ready for production workloads.
Key Takeaways
- Start with a simple events table using JSONB and add partitioning only when your event volume exceeds millions per month
- Always implement version-based optimistic concurrency—the complexity cost is minimal compared to debugging race conditions in production
- Build projections as separate, rebuildable read models from day one, even if you start with synchronous updates inside transactions
- Use LISTEN/NOTIFY for lightweight async projections before reaching for message queues—PostgreSQL can handle more than you expect