Aggregates & Event Sourcing
Event sourcing flips the usual persistence model on its head: instead of storing the current state of an entity, you store the full sequence of events that produced it. State becomes a derived value — replay the events and you rebuild the aggregate exactly. NestJS supports this directly through the AggregateRoot base class in @nestjs/cqrs, which records domain events, hands them to the EventBus, and lets you reconstruct an aggregate from history. This gives you a perfect audit log, time-travel debugging, and read models that can be rebuilt at will.
The AggregateRoot base class
An aggregate is a consistency boundary — a cluster of objects treated as a single unit for data changes. In CQRS, you extend AggregateRoot so the object can record the events it produces. Two methods drive the pattern: apply(event) records an event and mutates internal state, and commit() flushes those recorded events to the EventBus so handlers and sagas can react.
// user/user.aggregate.ts
import { AggregateRoot } from '@nestjs/cqrs';
import { UserCreatedEvent } from './events/user-created.event';
import { EmailChangedEvent } from './events/email-changed.event';
export class UserAggregate extends AggregateRoot {
private id!: string;
private email!: string;
private active = false;
// Factory: a brand-new aggregate producing its first event.
static create(id: string, email: string): UserAggregate {
const user = new UserAggregate();
user.apply(new UserCreatedEvent(id, email));
return user;
}
changeEmail(newEmail: string): void {
if (newEmail === this.email) return;
this.apply(new EmailChangedEvent(this.id, newEmail));
}
// Event handlers: onEventName is auto-invoked by apply().
onUserCreatedEvent(event: UserCreatedEvent): void {
this.id = event.id;
this.email = event.email;
this.active = true;
}
onEmailChangedEvent(event: EmailChangedEvent): void {
this.email = event.email;
}
get snapshot() {
return { id: this.id, email: this.email, active: this.active };
}
}
When you call apply, the library looks for an on<EventClassName> method and invokes it to update state. The event is also queued internally until you commit().
The handler naming convention is strict:
apply(new EmailChangedEvent(...))looks foronEmailChangedEvent. A typo means state silently never updates — the event still publishes, but your in-memory aggregate is wrong.
Events are immutable facts
Where commands are imperatives, events are past-tense records of something that already happened. They should be small, serialisable, and never change once written.
// user/events/user-created.event.ts
import { IEvent } from '@nestjs/cqrs';
export class UserCreatedEvent implements IEvent {
constructor(
public readonly id: string,
public readonly email: string,
) {}
}
// user/events/email-changed.event.ts
import { IEvent } from '@nestjs/cqrs';
export class EmailChangedEvent implements IEvent {
constructor(
public readonly id: string,
public readonly email: string,
) {}
}
Hydrating from the EventPublisher
To get event publishing wired into an aggregate you constructed yourself, wrap it with EventPublisher.mergeObjectContext. This injects the publish plumbing so commit() actually reaches the EventBus. Inside a command handler the flow is: load or create the aggregate, merge it, perform the operation, then commit.
// user/commands/create-user.handler.ts
import { CommandHandler, ICommandHandler, EventPublisher } from '@nestjs/cqrs';
import { randomUUID } from 'node:crypto';
import { CreateUserCommand } from './create-user.command';
import { UserAggregate } from '../user.aggregate';
@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
constructor(private readonly publisher: EventPublisher) {}
async execute(command: CreateUserCommand): Promise<{ id: string }> {
const id = randomUUID();
const user = this.publisher.mergeObjectContext(
UserAggregate.create(id, command.email),
);
user.commit(); // flushes UserCreatedEvent to the EventBus
return { id };
}
}
Rebuilding state by replaying events
The heart of event sourcing is reconstruction. Given a stored stream of events for one aggregate, you create an empty aggregate and replay each event through apply — but during replay you do not want to re-publish those events, so use loadFromHistory, which applies events without queueing them for commit.
// user/user.repository.ts
import { Injectable } from '@nestjs/common';
import { EventPublisher, IEvent } from '@nestjs/cqrs';
import { UserAggregate } from './user.aggregate';
import { EventStore } from './event-store.service';
@Injectable()
export class UserRepository {
constructor(
private readonly store: EventStore,
private readonly publisher: EventPublisher,
) {}
async load(aggregateId: string): Promise<UserAggregate | null> {
const events: IEvent[] = await this.store.read(aggregateId);
if (events.length === 0) return null;
const user = new UserAggregate();
user.loadFromHistory(events); // replays without re-publishing
return this.publisher.mergeObjectContext(user);
}
}
A minimal event store
The event store is append-only persistence keyed by aggregate id. In production this is PostgreSQL, EventStoreDB, or DynamoDB; the contract is the same — append events, read them back in order.
// user/event-store.service.ts
import { Injectable } from '@nestjs/common';
import { IEvent } from '@nestjs/cqrs';
@Injectable()
export class EventStore {
private readonly streams = new Map<string, IEvent[]>();
async append(aggregateId: string, events: IEvent[]): Promise<void> {
const stream = this.streams.get(aggregateId) ?? [];
stream.push(...events);
this.streams.set(aggregateId, stream);
}
async read(aggregateId: string): Promise<IEvent[]> {
return [...(this.streams.get(aggregateId) ?? [])];
}
}
To persist on commit, override the aggregate’s commit flow or append in the handler before committing:
const user = this.publisher.mergeObjectContext(existing);
user.changeEmail(command.email);
await this.store.append(user.snapshot.id, user.getUncommittedEvents());
user.commit();
Output:
$ curl -s http://localhost:3000/users/6f1c
{"id":"6f1c","email":"[email protected]","active":true}
# state above was rebuilt by replaying 2 stored events:
# UserCreatedEvent, EmailChangedEvent
Commit semantics at a glance
| Method | Records state | Queues for publish | Publishes now | Use when |
|---|---|---|---|---|
apply(event) | Yes | Yes | No | Producing a new event during an operation |
loadFromHistory(events) | Yes | No | No | Rehydrating an aggregate from the store |
getUncommittedEvents() | — | reads queue | No | Persisting new events to the store |
commit() | — | drains queue | Yes | After the operation succeeds |
Best Practices
- Keep one aggregate per consistency boundary; never reach across aggregates inside a single transaction.
- Make events immutable, small, and serialisable — they live forever in the store.
- Always persist uncommitted events before you call
commit()so a publish failure cannot lose history. - Use
loadFromHistoryfor rehydration so replayed events are not accidentally re-published. - Version your event schemas (
UserCreatedEventV2) and upcast old events rather than mutating stored data. - Add periodic snapshots for long event streams so rebuilds stay fast.
- Treat read models as disposable projections that can be rebuilt by replaying the store.