Getting Started
Wallaby streams row changes from Postgres logical replication, materializes them into your mapped EF Core entities, lets you transform/enrich them, and routes the resulting documents to pluggable sinks (destinations).
Install
dotnet add package Wallaby
dotnet add package Wallaby.Sinks.Meilisearch # optionally add a pre-built sinkWallaby requires .NET 10+.
Server prerequisites
Your Postgres server must already have:
wal_level = logicalset inpostgresql.confrequired for logical replication.- A role with the
REPLICATIONattribute (or superuser) for the connection string you give Wallaby. - Headroom in
max_replication_slotsandmax_wal_senders(one slot/sender per Wallaby cluster).
Wallaby validates these on startup and fails fast with an actionable error if something is missing.
Register
AddWallaby<TContext> is driven by your existing DbContext. You must also register an IDbContextFactory<TContext> (Wallaby opens fresh contexts for enrichment), and supply a connection string via UseConnectionString(...).
using Wallaby.Abstractions;
using Wallaby.DependencyInjection;
using Wallaby.Sinks.Meilisearch;
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddDbContextFactory<AppDbContext>(o => o.UseNpgsql(conn));
builder.Services.AddWallaby<AppDbContext>(cdc =>
{
cdc.UseConnectionString(conn)
.ConfigureOptions(o =>
{
o.SlotName = "app_cdc";
o.PublicationName = "app_cdc_pub";
})
.AddMeilisearchSink("meili", m => { m.Host = "http://localhost:7700"; m.ApiKey = key; })
// specify mapping and then configure the transform and destination
.Map<Product>()
.ToSink("meili", destination: "products")
.WithBackfillVersion("v1")
.UsingTransform((_, changes, _) =>
{
var docs = new Dictionary<DocumentKey, CdcDocument?>(changes.Count);
foreach (var c in changes)
docs[c.Key] = new CdcDocument { ["name"] = c.Entity!.Name, ["price"] = c.Entity!.Price };
return Task.FromResult<IReadOnlyDictionary<DocumentKey, CdcDocument?>>(docs);
});
});
await builder.Build().RunAsync();On startup Wallaby validates the server, creates the wallaby state schema, the publication, and the replication slot, backfills the mapped tables, then streams live changes. Wallably holds a distributed lock so these operations will only run on a single node within a HA environment.
What gets tracked
Only entities you declare are captured and added to the publication:
Map<T>()declares a table and routes it to a sink.CaptureAllMappedTables()opts every mapped entity in (not recommended).
Tables a mapping DependsOn are captured automatically. Captured tables must have a primary key.
Options
ConfigureOptions(o => ...) exposes:
| Option | Default | Purpose |
|---|---|---|
SlotName / PublicationName | efcore_cdc_slot / efcore_cdc_pub | Names Wallaby creates/uses. |
ChunkSize | 500 | Backfill keyset page size. |
MaxBatchSize | 1000 | Max records per dispatched batch (and per inline dependent fan-out page). Bounds memory and sink batch size for large transactions, fan-out, and backfill. |
ManagePublicationTables | true | Reconcile the publication's table set to the model. |
RequireFullReplicaIdentity | false | Fail (vs warn) when a table needs REPLICA IDENTITY FULL. |
AutoBackfillNewTables | true | Backfill a newly declared table on first run. |
AutoBackfillOnVersionChange | true | Re-backfill when a mapping's WithBackfillVersion changes. |
StandbyRetryInterval / LeaderRetryInterval | 5s | Leader-election retry cadence. |
DeadLetterPolicy | Halt | Halt stops on a permanent sink failure; Skip logs and drops the batch. |
Next steps
- Transforms — shaping and enriching documents.
- Meilisearch sink and custom sinks.
- Backfill — initial snapshots and version-triggered reindex.
- Multi-tenancy — per-row scoped contexts and destinations.
- External slots — provision extra publications/slots for an ELT or other CDC consumer.
- Observability — OpenTelemetry metrics and traces.