Getting Started

Wallaby streams row changes from Postgres logical replication, materializes them into your mapped EF Core entities, lets you transform/enrich them, and routes the resulting documents to pluggable sinks (destinations).

Install

bash
dotnet add package Wallaby
dotnet add package Wallaby.Sinks.Meilisearch   # optionally add a pre-built sink

Wallaby requires .NET 10+.

Server prerequisites

Your Postgres server must already have:

  • wal_level = logical set in postgresql.conf (required for logical replication).
  • A role with the REPLICATION attribute (or superuser) for the connection string you give Wallaby.
  • Headroom in max_replication_slots and max_wal_senders (one slot/sender per Wallaby cluster).

Wallaby validates these on startup and fails fast with an actionable error if something is missing.
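
To check the same prerequisites by hand before the first deployment, a preflight query along these lines may help. This is a minimal sketch, not Wallaby's own validation logic: it assumes the Npgsql package is referenced, and `ReplicationPreflight`, `ReplicationSettings`, and the problem messages are hypothetical names introduced for illustration.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Npgsql;

// Hypothetical snapshot of the server settings listed above.
public sealed record ReplicationSettings(
    string WalLevel, bool CanReplicate,
    int MaxSlots, int UsedSlots,
    int MaxWalSenders, int ActiveWalSenders);

public static class ReplicationPreflight
{
    // One round trip that reads every prerequisite from the server.
    private const string Sql = @"
        SELECT current_setting('wal_level'),
               (SELECT rolreplication OR rolsuper
                  FROM pg_roles WHERE rolname = current_user),
               current_setting('max_replication_slots')::int,
               (SELECT count(*)::int FROM pg_replication_slots),
               current_setting('max_wal_senders')::int,
               (SELECT count(*)::int FROM pg_stat_replication)";

    public static async Task<ReplicationSettings> ReadAsync(
        string connectionString, CancellationToken ct = default)
    {
        await using var conn = new NpgsqlConnection(connectionString);
        await conn.OpenAsync(ct);
        await using var cmd = new NpgsqlCommand(Sql, conn);
        await using var reader = await cmd.ExecuteReaderAsync(ct);
        await reader.ReadAsync(ct);
        return new ReplicationSettings(
            reader.GetString(0), reader.GetBoolean(1),
            reader.GetInt32(2), reader.GetInt32(3),
            reader.GetInt32(4), reader.GetInt32(5));
    }

    // Pure check, so it can be unit-tested without a database.
    // Note: UsedSlots includes Wallaby's own slot once it exists,
    // so the headroom checks are most meaningful before the first start.
    public static IReadOnlyList<string> Evaluate(ReplicationSettings s)
    {
        var problems = new List<string>();
        if (s.WalLevel != "logical")
            problems.Add($"wal_level is '{s.WalLevel}', expected 'logical'.");
        if (!s.CanReplicate)
            problems.Add("The connecting role lacks REPLICATION and is not a superuser.");
        if (s.UsedSlots >= s.MaxSlots)
            problems.Add($"No free replication slot ({s.UsedSlots}/{s.MaxSlots} in use).");
        if (s.ActiveWalSenders >= s.MaxWalSenders)
            problems.Add($"No free WAL sender ({s.ActiveWalSenders}/{s.MaxWalSenders} in use).");
        return problems;
    }
}
```

Calling `ReplicationPreflight.Evaluate(await ReplicationPreflight.ReadAsync(conn))` returns an empty list when all four checks pass. Keeping the evaluation separate from the query is a design choice that lets the rules be tested without a live server.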

Register

AddWallaby<TContext> is driven by your existing DbContext. You must also register an IDbContextFactory<TContext> (Wallaby opens fresh contexts for enrichment), and supply a connection string via UseConnectionString(...).

csharp
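using Microsoft.EntityFrameworkCore;   // needed for UseNpgsql
using Wallaby.Abstractions;
using Wallaby.DependencyInjection;
using Wallaby.Sinks.Meilisearch;

var builder = Host.CreateApplicationBuilder(args);

// Placeholder configuration keys; substitute wherever you keep these values.
var conn = builder.Configuration.GetConnectionString("Postgres");
var key = builder.Configuration["Meilisearch:ApiKey"];

builder.Services.AddDbContextFactory<AppDbContext>(o => o.UseNpgsql(conn));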

builder.Services.AddWallaby<AppDbContext>(cdc =>
{
    cdc.UseConnectionString(conn)
       .ConfigureOptions(o =>
       {
           o.SlotName = "app_cdc";
           o.PublicationName = "app_cdc_pub";
       })
       .AddMeilisearchSink("meili", m => { m.Host = "http://localhost:7700"; m.ApiKey = key; })

       // specify mapping and then configure the transform and destination
       .Map<Product>()
            .ToSink("meili", destination: "products")
            .WithBackfillVersion("v1")
            .UsingTransform((_, changes, _) =>
            {
                var docs = new Dictionary<DocumentKey, CdcDocument?>(changes.Count);
                foreach (var c in changes)
                    docs[c.Key] = new CdcDocument { ["name"] = c.Entity!.Name, ["price"] = c.Entity!.Price };
                return Task.FromResult<IReadOnlyDictionary<DocumentKey, CdcDocument?>>(docs);
            });
});

await builder.Build().RunAsync();

On startup Wallaby validates the server, creates the wallaby state schema, the publication, and the replication slot, backfills the mapped tables, then streams live changes. Wallaby holds a distributed lock, so in an HA deployment these operations run on only one node at a time.

What gets tracked

Only entities you declare are captured and added to the publication:

  • Map<T>() declares a table and routes it to a sink.
  • CaptureAllMappedTables() opts every mapped entity in (not recommended).

Tables a mapping DependsOn are captured automatically. Captured tables must have a primary key.
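
For reference, the Product entity and AppDbContext used in the Register example might look like the following. This is a sketch under assumptions: the original does not show the model, so the `Id` key and the property types are guesses chosen for illustration. EF Core treats a property named `Id` as the primary key by convention, which satisfies the primary-key requirement above.

```csharp
using Microsoft.EntityFrameworkCore;

// Assumed shape of the entity; only Name and Price appear in the original example.
public class Product
{
    public int Id { get; set; }              // primary key by EF Core convention
    public string Name { get; set; } = "";
    public decimal Price { get; set; }
}

public class AppDbContext : DbContext
{
    // AddDbContextFactory<AppDbContext> needs this options constructor.
    public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { }

    public DbSet<Product> Products => Set<Product>();
}
```

A keyless entity type (for example one configured with `HasNoKey()`) would not meet the requirement.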

Options

ConfigureOptions(o => ...) exposes:

| Option | Default | Purpose |
| --- | --- | --- |
| SlotName / PublicationName | efcore_cdc_slot / efcore_cdc_pub | Names Wallaby creates/uses. |
| ChunkSize | 500 | Backfill keyset page size. |
| MaxBatchSize | 1000 | Max records per dispatched batch (and per inline dependent fan-out page). Bounds memory and sink batch size for large transactions, fan-out, and backfill. |
| ManagePublicationTables | true | Reconcile the publication's table set to the model. |
| RequireFullReplicaIdentity | false | Fail (vs warn) when a table needs REPLICA IDENTITY FULL. |
| AutoBackfillNewTables | true | Backfill a newly declared table on first run. |
| AutoBackfillOnVersionChange | true | Re-backfill when a mapping's WithBackfillVersion changes. |
| StandbyRetryInterval / LeaderRetryInterval | 5s | Leader-election retry cadence. |
| DeadLetterPolicy | Halt | Halt stops on a permanent sink failure; Skip logs and drops the batch. |
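
ChunkSize is described as a keyset page size. As a rough illustration of keyset pagination in general (not Wallaby's actual backfill code), the sketch below pages through an in-memory sequence the way a `WHERE id > @last ORDER BY id LIMIT @chunk` query would page through a table. `KeysetPaging` and `Pages` are hypothetical names, and an integer key is assumed for simplicity.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class KeysetPaging
{
    // Yields pages of at most chunkSize rows in key order.
    // Each page starts strictly after the last key of the previous page,
    // so no OFFSET is needed and rows already read are never rescanned by position.
    public static IEnumerable<IReadOnlyList<T>> Pages<T>(
        IEnumerable<T> rows, Func<T, int> key, int chunkSize)
    {
        if (chunkSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(chunkSize));

        int? last = null; // no lower bound for the first page
        while (true)
        {
            var page = rows
                .Where(r => last is null || key(r) > last.Value)
                .OrderBy(key)
                .Take(chunkSize)
                .ToList();

            if (page.Count == 0)
                yield break;

            yield return page;
            last = key(page[^1]); // resume after the highest key seen
        }
    }
}
```

With 1,200 rows and a chunk size of 500, this yields pages of 500, 500, and 200 rows. A smaller chunk size means more round trips but less data held per page.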

Next steps