Server-Side Filtering

Mat McLoughlin  |  19 July 2020

In the latest version of EventStoreDB, we brought out a new feature, server-side filtering. This feature allows you to pass a filter when querying EventStoreDB so that it will only return events that you asked for.

You will be able to filter by event type or stream name using either a regular expression or a prefix and it is available in all of our current clients, gRPC, TCP and ATOM.

Why?

The main reason for introducing this feature was to reduce the need for projections. Although these are a powerful feature of EventStoreDB they don’t come without costs. Each projection you introduce increases the write amplification on the server as each time you write an event it now has to be propagated out via any projections that are listening to it.

Server-side filtering is an easy way to filter out any events that you don’t care about without the need to write a projection.

How?

Each of the clients implements the filtering in a slightly different way due to the implications of each protocol. We’ll take a look at .NET gRPC in this article as this is going to be the recommended client going forwards.

First, we need to get EventStoreDB node running. The easiest way to do this is by using docker compose. Create a file called docker-compose.yml and paste in the following contents.

version: '3'
services:
 eventstore:
 image: eventstore/eventstore:20.6.0-buster-slim
 environment:
 - EVENTSTORE_DEV=true
 ports:
 - 2113:2113

Navigate to the directory and run the command.

docker-compose up

This is assuming you have docker already installed. If not go to getting started for instructions.

This will start a single EventStoreDB node in development mode.

gRPC filtering

Because the gRPC client is relatively new, filtering is only currently available when you subscribe to a stream.

First, create a new .NET core console application and then install the EventStore.Client.Grpc.Streams package.

dotnet add package EventStore.Client.Grpc.Streams --version 20.6.0

If you you aren’t familiar with the new gRPC client we have split the functionality up into several different nuget packages. The streams package contains all the api’s for appending to, subscribing to, and reading from streams.

Then you can create a connection to the client.

var settings = new EventStoreClientSettings {
    CreateHttpMessageHandler = () =>
        new HttpClientHandler {
            ServerCertificateCustomValidationCallback =
                (message, certificate2, x509Chain, sslPolicyErrors) => true
        },
        ConnectivitySettings = {
            Address = new Uri("https://localhost:2113")
        }
};

var client = new EventStoreClient(settings);

And set up a new subscription to the $all stream.

await client.SubscribeToAllAsync(Position.Start,
    (s, e, c) =>
    {
        Console.WriteLine($"{e.Event.EventType} @ {e.Event.Position.PreparePosition}");
        return Task.CompletedTask;
    }
);

This will subscribe to all events in the store and output the type and position. Then we can start appending events to a stream.

for (var i = 0; i < 10; i++)
{
    var eventData = new EventData(
        Uuid.NewUuid(),
        i % 2 == 0 ? "some-event" : "other-event",
        Encoding.UTF8.GetBytes("{\"id\": \"1\" \"value\": \"some value\"}")
    );

    await client.AppendToStreamAsync(
        "some-stream",
        StreamRevision.None,
        new List<EventData> {eventData}
    );
}

If we run this we’ll see the following output.

$metadata @ 231
$metadata @ 397
$metadata @ 565
$UserCreated @ 750
$UserCreated @ 1045
$User @ 1330
$User @ 1436
some-event @ 1540
other-event @ 1705
some-event @ 1871
other-event @ 2036
some-event @ 2202
other-event @ 2367
some-event @ 2533
other-event @ 2698
some-event @ 2864
other-event @ 3029

More often than not you won’t care about the $ system events so we can filter those out by adding a filter to your subscription.

await client.SubscribeToAllAsync(Position.Start,
    (s, e, c) =>
    {
        Console.WriteLine($"{e.Event.EventType} @ {e.Event.Position.PreparePosition}");
        return Task.CompletedTask;
    },
    filterOptions: new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents())
);

And if you run it again you’ll now get the following output.

some-event @ 1540
other-event @ 1705
some-event @ 1871
other-event @ 2036
some-event @ 2202
other-event @ 2367
some-event @ 2533
other-event @ 2698
some-event @ 2864
other-event @ 3029

But what if you are only interested in some-event and want to ignore all the other events including the other-event? In this case, you can pass in a prefix filter.

new SubscriptionFilterOptions(EventTypeFilter.Prefix("some-"))

To get the following output.

some-event @ 1540
some-event @ 1871
some-event @ 2202
some-event @ 2533
some-event @ 2864

As mentioned above, you can also filter by regular expressions and stream names. Following are a few more examples that may be useful.

new SubscriptionFilterOptions(EventTypeFilter.RegularExpression(@"some|other")
// will filter for event types containing the words "some" or "other"

new SubscriptionFilterOptions(StreamFilter.Prefix("user"))
// will filter for streams beginning with "user"

new SubscriptionFilterOptions(StreamFilter.RegularExpression("^user|^company")
// Will only return events beginning with "user" or "company"

As you can see, this will hopefully replace the use case for projections in a lot of scenarios, which in turn will reduce write amplification and improve the performance of EventStoreDB.

Checkpointing

There is one thing to consider with this solution, and that is when events that match your filter are few and far between. In this scenario, you might find yourself in the situation where EventStoreDB has searched through 1 million events and the last thing you want to happen is for the server to get to event 900k and then have your client crash. It won’t have been able to take a checkpoint and upon restart, you’d have to go back to the beginning and start again.

It’s for this reason we have introduced an additional delegate that will be triggered every n number of events.

To make use of it set up the extra delegate checkpointReached on the SubscriptionFilterOptions class.

var filterOptions = new SubscriptionFilterOptions(
    EventTypeFilter.ExcludeSystemEvents(),
    checkpointReached: (s, p, c) =>
    {
        Console.WriteLine($"checkpoint taken at {p.PreparePosition}");
        return Task.CompletedTask;
    });

This will be called every n number of events. If you want to be specific about the number of events threshold you can also pass that in.

var filterOptions = new SubscriptionFilterOptions(
    EventTypeFilter.ExcludeSystemEvents(),
    checkpointInterval: 1000,
    checkpointReached: (s, p, c) =>
    {
        Console.WriteLine($"checkpoint taken at {p.PreparePosition}");
        return Task.CompletedTask;
    });

Conclusion

As you can see, this will hopefully replace the need for projections in a lot of cases. Both the TCP client and HTTP client also implement the filtering and you can find out more about them in the documentation.

The sample code for this application can be found here.


Photo of Mat McLoughlin

Mat McLoughlin Mat is Event Store's Former Head of Developer Advocacy; he uses his previous experience as an Event Store customer building event-sourced systems to identify areas where the developer experience can be improved, and his objective is to help more developers learn and successfully implement Event Store in their systems. He has now left Event Store.