diff --git a/Directory.Packages.props b/Directory.Packages.props
index 59f4734dd..f9b53128f 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -22,10 +22,12 @@
0.77.3
+
+
@@ -37,6 +39,7 @@
+
@@ -62,6 +65,7 @@
+
@@ -112,6 +116,7 @@
+
diff --git a/Eventuous.slnx b/Eventuous.slnx
index e32a93a74..0d3c4e669 100644
--- a/Eventuous.slnx
+++ b/Eventuous.slnx
@@ -14,9 +14,11 @@
+
+
diff --git a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs
index daa30259a..4b6b4e447 100644
--- a/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs
+++ b/src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs
@@ -23,7 +23,7 @@ public class ServiceBusSubscription : EventSubscriptionConsume pipe instance
/// Logger factory (optional)
/// Event serializer (optional)
- public ServiceBusSubscription(ServiceBusClient client, ServiceBusSubscriptionOptions options, ConsumePipe consumePipe, ILoggerFactory? loggerFactory, IEventSerializer? eventSerializer) :
+ public ServiceBusSubscription(ServiceBusClient client, ServiceBusSubscriptionOptions options, ConsumePipe consumePipe, ILoggerFactory? loggerFactory, IEventSerializer? eventSerializer = null) :
base(options, consumePipe, loggerFactory, eventSerializer) {
_defaultErrorHandler = Options.ErrorHandler ?? DefaultErrorHandler;
diff --git a/src/Azure/src/Eventuous.Azure.Storage.Blobs/Eventuous.Azure.Storage.Blobs.csproj b/src/Azure/src/Eventuous.Azure.Storage.Blobs/Eventuous.Azure.Storage.Blobs.csproj
new file mode 100644
index 000000000..edf8dc2c3
--- /dev/null
+++ b/src/Azure/src/Eventuous.Azure.Storage.Blobs/Eventuous.Azure.Storage.Blobs.csproj
@@ -0,0 +1,39 @@
+
+
+
+ README.md
+ true
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Tools\TaskExtensions.cs
+
+
+ Tools\Ensure.cs
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Azure/src/Eventuous.Azure.Storage.Blobs/README.md b/src/Azure/src/Eventuous.Azure.Storage.Blobs/README.md
new file mode 100644
index 000000000..652db2965
--- /dev/null
+++ b/src/Azure/src/Eventuous.Azure.Storage.Blobs/README.md
@@ -0,0 +1,112 @@
+# Eventuous Azure Blob Storage Projections
+
+This package adds Azure Blob Storage projections to applications built with Eventuous. It allows you to project event store events to Azure Blob Storage as state objects, maintaining a separate state document for each event stream.
+
+## Using projections
+
+Create your own projection class that inherits from `StorageBlobsProjector` where `T` is your state type. The state type must be a class with a parameterless constructor.
+
+Register event handlers using the `On` methods. When an event is received, the projector retrieves the current state blob (or creates a new state instance if the blob doesn't exist), applies the event to the state using the registered event handler, and uploads the updated state back to Blob Storage.
+
+The class provides two constructors:
+
+* `StorageBlobsProjector(BlobContainerClient container, ...` where the container client is passed directly
+* `StorageBlobsProjector(BlobServiceClient serviceClient, string containerName, ...` where the service client is set up by Azure DI and the container name is set by the projection
+
+By using `IOptions` we can also use the Json serialization options as set in ASP DI.
+
+By default, the blob ID is extracted from the stream using `context.Stream.GetId()`. You can override this by providing a custom `getBlobId` function in the event registration:
+
+```csharp
+public class BookingProjection : StorageBlobsProjector {
+ public BookingProjection(BlobServiceClient client, IOptions serializerOptions)
+ : base(client, "bookings-container", serializerOptions.Value) {
+
+ // Uses default blob ID from stream
+ On((state, evt) => {
+ state.RoomId = evt.RoomId;
+ state.CheckInDate = evt.CheckIn;
+ return state;
+ });
+
+ // Custom blob ID using event data
+ On(
+ (state, evt) => {
+ state.PaidAmount += evt.AmountPaid;
+ return state;
+ },
+ context => new ValueTask($"custom-{context.Message.BookingId}")
+ );
+ }
+}
+```
+
+## Projector options
+
+The `StorageBlobProjectorOptions` class provides several configuration options for fine-tuning the projector behavior.
+
+| Option | Type | Default | Description |
+|--------|------|---------|-------------|
+| `JsonOptions` | `JsonSerializerOptions?` | `null` (uses `JsonSerializerOptions.Web`) | JSON serializer options for state serialization/deserialization. Controls formatting, naming policies, etc. |
+| `RaceRetries` | `int` | `0` | Number of retry attempts for optimistic concurrency conflicts. Increase when concurrent updates are likely. |
+| `IdempotencyMode` | `IdempotencyMode` | `IdempotencyMode.None` | Controls duplicate message detection behavior. |
+
+### Idempotency modes
+
+The `IdempotencyMode` enum controls how the projector handles duplicate messages:
+
+- **`None`** - No idempotency checks. Always processes messages and updates blobs.
+- **`ByGlobalPosition`** - Skips processing if existing blob has matching global position metadata.
+- **`ByMessageId`** - Skips processing if existing blob has matching message ID metadata.
+
+### Custom blob naming
+
+By default, blob names are generated using `GetBlobName(string id)` which creates names in the format `{id}/{T}.json`, where `id` defaults to the stream ID from `context.Stream.GetId()`.
+
+You can customize blob naming in two ways:
+
+**1. Override the virtual methods globally for all events:**
+
+```csharp
+protected override string GetBlobName(string id, IMessageConsumeContext context) {
+ // Use stream name and type in the path
+ var streamName = context.Stream.ToString();
+ return $"projections/{streamName}/{id}.json";
+}
+
+protected override string GetBlobName(string id) {
+ return $"{id}/{typeof(T).Name}.json";
+}
+```
+
+**2. Override blob ID per event handler using `getBlobId`:**
+
+```csharp
+On(
+ (state, evt) => {
+ state.PaidAmount += evt.AmountPaid;
+ return state;
+ },
+ // Custom blob ID for this specific event only
+ context => new ValueTask($"payments/{context.Message.BookingId}.json")
+);
+```
+
+Use per-event blob ID overrides when you need different events to target different blob paths or naming conventions within the same projector, such as when the business identifier differs from the stream identifier.
+## Features
+
+- **Automatic state management** - Creates new state instances when blobs don't exist
+- **Optimistic concurrency control** - Uses ETags for safe concurrent updates
+- **Idempotency** - Prevents duplicate processing with configurable modes
+- **Retry handling** - Automatic retries for race conditions
+- **Flexible blob naming** - Customizable blob ID and naming conventions
+- **Metadata storage** - Automatically stores stream info, positions, and message IDs
+
+## Background
+
+The projector stores each state as a separate blob in Azure Blob Storage. Each blob contains:
+- The serialized state object (JSON by default)
+- Metadata including stream name, message ID, stream position, and global position
+- Content type set to `application/json`
+
+This approach provides natural partitioning by stream and enables efficient state retrieval for individual streams.
\ No newline at end of file
diff --git a/src/Azure/src/Eventuous.Azure.Storage.Blobs/StorageBlobProjectorOptions.cs b/src/Azure/src/Eventuous.Azure.Storage.Blobs/StorageBlobProjectorOptions.cs
new file mode 100644
index 000000000..d150ad14b
--- /dev/null
+++ b/src/Azure/src/Eventuous.Azure.Storage.Blobs/StorageBlobProjectorOptions.cs
@@ -0,0 +1,53 @@
+using System.Text.Json;
+
+namespace Eventuous.Azure.Storage.Blobs;
+
+///
+/// Options for configuring the storage blob projector.
+///
+public class StorageBlobProjectorOptions {
+ ///
+ /// Gets or sets the JSON serializer options to use when serializing or deserializing projection state.
+ /// By default, the default JSON serializer options will be used if this property is not set.
+ ///
+ public JsonSerializerOptions? JsonOptions { get; set; }
+
+ ///
+ /// Gets or sets the number of retry attempts for race condition handling when saving projection state.
+ /// Default is 0 (no retries).
+ ///
+ public int RaceRetries { get; set; } = 0;
+
+ ///
+ /// Gets or sets the idempotency mode for the projector. When enabled, the projector will skip processing
+ /// if the blob already exists with a matching identifier (message ID or global position), preventing duplicate processing.
+ /// Default is (no idempotency checking).
+ ///
+ public IdempotencyMode IdempotencyMode { get; set; } = IdempotencyMode.None;
+}
+
+///
+/// Controls how the projection handles idempotency to prevent duplicate message processing.
+///
+public enum IdempotencyMode {
+ ///
+ /// No idempotency checks. The projector will always process messages and update blobs.
+ /// Use when duplicate processing is acceptable or when external mechanisms ensure message uniqueness.
+ ///
+ None,
+
+ ///
+ /// Skips processing if the existing blob was created from a message at the same global position.
+ /// Uses the GlobalPosition metadata stored with the blob for comparison.
+ /// Effective for append-only event streams where global position uniquely identifies a message.
+ ///
+ ByGlobalPosition,
+
+ ///
+ /// Skips processing if the existing blob was created from the same message ID.
+ /// Uses the MessageId metadata stored with the blob for comparison.
+ /// More precise than position-based checks, works even if messages are processed out of order.
+ /// Especially from external message queues where global position may not be available or reliable.
+ ///
+ ByMessageId
+}
\ No newline at end of file
diff --git a/src/Azure/src/Eventuous.Azure.Storage.Blobs/StorageBlobsProjector.cs b/src/Azure/src/Eventuous.Azure.Storage.Blobs/StorageBlobsProjector.cs
new file mode 100644
index 000000000..7733cd69c
--- /dev/null
+++ b/src/Azure/src/Eventuous.Azure.Storage.Blobs/StorageBlobsProjector.cs
@@ -0,0 +1,232 @@
+using Azure;
+using Azure.Storage.Blobs.Models;
+using Eventuous.Subscriptions;
+using Eventuous.Subscriptions.Context;
+using System.Text.Json;
+
+using static Eventuous.Subscriptions.Diagnostics.SubscriptionsEventSource;
+
+namespace Eventuous.Azure.Storage.Blobs;
+
+///
+/// Projects event store events to Azure Blob Storage as state objects of type T.
+///
+///
+///
+/// This projector works by maintaining a state object of type T in Azure Blob Storage for each event stream.
+/// When an event is received, it retrieves the current state blob (or creates a new state instance if the blob doesn't exist),
+/// applies the event to the state using the registered event handler, and uploads the updated state back to Blob Storage.
+/// The projector uses optimistic concurrency control via ETags to handle concurrent updates, and provides virtual methods
+/// for customizing blob naming conventions. Multiple event types can be handled by registering handlers using the On(TEvent) methods.
+/// The optional getBlobId parameter in event registration allows custom blob ID generation, which is useful when the default
+/// stream ID from context.Stream.GetId() needs to be overridden, such as using event metadata or custom business logic.
+///
+///
+public class StorageBlobsProjector : BaseEventHandler where T : class, new() {
+ /// Azure Blob Storage container client.
+ protected readonly BlobContainerClient ContainerClient;
+
+ readonly JsonSerializerOptions _jsonOptions;
+ readonly Dictionary>> _handlers = new();
+ readonly ITypeMapper _map;
+
+ private readonly int _raceRetries;
+ private readonly IdempotencyMode _idempotencyMode;
+
+ /// Delegate for custom blob ID generation from consume context.
+ /// Event type being consumed.
+ /// Event consume context.
+ /// Blob ID as string.
+ public delegate ValueTask GetBlobId(IMessageConsumeContext context) where TEvent : class;
+
+ ///
+ /// Initializes projector with existing container client.
+ ///
+ /// Azure Blob Storage container client.
+ /// Optional projector configuration.
+ /// Optional JSON serializer options.
+ /// Optional type mapper for event type resolution.
+ public StorageBlobsProjector(
+ BlobContainerClient container,
+ JsonSerializerOptions? serializerOptions = null,
+ StorageBlobProjectorOptions? projectorOptions = null,
+ ITypeMapper? mapper = null
+ ) {
+ ContainerClient = container;
+ _jsonOptions = new(projectorOptions?.JsonOptions ?? serializerOptions ?? JsonSerializerOptions.Web);
+ _map = mapper ?? TypeMap.Instance;
+ _raceRetries = projectorOptions?.RaceRetries ?? 0;
+ _idempotencyMode = projectorOptions?.IdempotencyMode ?? IdempotencyMode.None;
+ }
+
+ ///
+ /// Initializes projector with service client and container name.
+ ///
+ /// Azure Blob Storage service client.
+ /// Name of the container to use.
+ /// Optional JSON serializer options.
+ /// Optional type mapper for event type resolution.
+ /// Optional projector configuration.
+ public StorageBlobsProjector(
+ BlobServiceClient serviceClient,
+ string containerName,
+ JsonSerializerOptions? serializerOptions = null,
+ StorageBlobProjectorOptions? projectorOptions = null,
+ ITypeMapper? mapper = null
+ ) : this(serviceClient.GetBlobContainerClient(containerName), serializerOptions, projectorOptions, mapper) { }
+
+ /// Registers event handler with sync state update.
+ /// Event type to handle.
+ /// State update function receiving current state and event.
+ /// Optional custom blob ID generator.
+ protected void On(Func handler, GetBlobId? getBlobId = null) where TEvent : class
+ => On((ctx, state) => new ValueTask(handler(state, ctx.Message)), getBlobId);
+
+ /// Registers event handler with context and sync state update.
+ /// Event type to handle.
+ /// State update function receiving context, current state, and event.
+ /// Optional custom blob ID generator.
+ protected void On(Func, T, T> handler, GetBlobId? getBlobId = null) where TEvent : class
+ => On((ctx, state) => new ValueTask(handler(ctx, state)), getBlobId);
+
+ /// Registers event handler with async state update.
+ /// Event type to handle.
+ /// Async state update function receiving current state and event.
+ /// Optional custom blob ID generator.
+ protected void On(Func> handler, GetBlobId? getBlobId = null) where TEvent : class
+ => On((ctx, state) => handler(state, ctx.Message), getBlobId);
+
+ /// Registers event handler with context, async state update, and custom blob ID.
+ /// Event type to handle.
+ /// Async state update function receiving context, current state, and event.
+ /// Optional custom blob ID generator.
+ protected void On(Func, T, ValueTask> handler, GetBlobId? getBlobId = null) where TEvent : class {
+ if (!_handlers.TryAdd(typeof(TEvent), new Handler(this, handler, getBlobId).Handle)) {
+ throw new ArgumentException($"Type {typeof(TEvent).Name} already has a handler");
+ }
+
+ if (!_map.TryGetTypeName(out _)) {
+ Log.MessageTypeNotRegistered();
+ }
+ }
+
+ private BlobClient GetBlobContainerClient(string blobName) => ContainerClient.GetBlobClient(blobName);
+
+ /// Registers event handler without custom blob ID.
+ /// Event type to handle.
+ /// Async state update function receiving context, current state, and event.
+ protected void On(Func, T, ValueTask> handler) where TEvent : class
+ => On(handler, default);
+
+ /// Handles incoming event by dispatching to registered handler.
+ /// Event consume context.
+ /// Event handling status indicating success, failure, or ignore.
+ public override async ValueTask HandleEvent(IMessageConsumeContext context) =>
+ _handlers.TryGetValue(context.Message!.GetType(), out var handler)
+ ? await handler(context).NoContext()
+ : EventHandlingStatus.Ignored;
+
+ private T ToObjectFromJson(BinaryData content) => content.ToObjectFromJson(_jsonOptions) ?? new T();
+ private byte[] SerializeToUtf8Bytes(T updated) => JsonSerializer.SerializeToUtf8Bytes(updated, _jsonOptions);
+
+ /// Gets blob name from ID and context. Can be overridden for custom naming.
+ /// Blob identifier.
+ /// Event consume context.
+ /// Blob name as string.
+ protected virtual string GetBlobName(string id, IMessageConsumeContext context) => GetBlobName(id);
+
+ /// Gets blob name from ID. Default format: {id}/{T}.json
+ /// Blob identifier.
+ /// Blob name as string.
+ protected virtual string GetBlobName(string id) => $"{id}/{typeof(T).Name}.json";
+
+ private class Handler
+ where TEvent : class {
+ private readonly StorageBlobsProjector projector;
+ private readonly Func, T, ValueTask> EventHandler;
+ private readonly GetBlobId? GetBlobId;
+
+ public Handler(StorageBlobsProjector storageBlobsProjector, Func, T, ValueTask> handler, GetBlobId? getBlobId) {
+ projector = storageBlobsProjector;
+ EventHandler = handler;
+ GetBlobId = getBlobId;
+ }
+
+ public async ValueTask Handle(IMessageConsumeContext context) {
+ var typedContext = context as MessageConsumeContext ?? new MessageConsumeContext(context);
+ var blobId = GetBlobId == null
+ ? context.Stream.GetId()
+ : await GetBlobId(typedContext).NoContext();
+ var blobName = projector.GetBlobName(blobId, typedContext);
+
+ var blobClient = projector.GetBlobContainerClient(blobName);
+
+ return await ModifyBlobWithRetries(projector._raceRetries).NoContext();
+
+ async Task ModifyBlobWithRetries(int retries) {
+ try {
+ return await ModifyBlob().NoContext();
+ } catch (RequestFailedException ex) when (ex.Status == 412 || ex.Status == 409) {
+ return retries > 0 ? await ModifyBlobWithRetries(retries - 1).NoContext() : EventHandlingStatus.Failure;
+ }
+ }
+
+ async Task ModifyBlob() {
+ try {
+ var blobContent = await blobClient.DownloadContentAsync(typedContext.CancellationToken).NoContext();
+
+ // Check idempotency if enabled
+ if (projector._idempotencyMode != IdempotencyMode.None) {
+ if (IsDuplicate(blobContent.Value.Details.Metadata)) {
+ return EventHandlingStatus.Ignored;
+ }
+ }
+
+ var content = blobContent.Value.Content;
+ var current = projector.ToObjectFromJson(content);
+
+ await UploadUpdated(current, new BlobRequestConditions { IfMatch = blobContent.Value.Details.ETag }).NoContext();
+ return EventHandlingStatus.Success;
+ } catch (RequestFailedException ex) when (ex.Status == 404) {
+ // Blob doesn't exist, start with a new instance
+ await UploadUpdated(new T(), new BlobRequestConditions { IfNoneMatch = ETag.All }).NoContext();
+ return EventHandlingStatus.Success;
+ }
+ }
+
+ bool IsDuplicate(IDictionary metadata) => projector._idempotencyMode switch {
+ IdempotencyMode.ByGlobalPosition =>
+ metadata.TryGetValue("GlobalPosition", out var storedPosition) &&
+ storedPosition == typedContext.GlobalPosition.ToString(),
+ IdempotencyMode.ByMessageId =>
+ metadata.TryGetValue("MessageId", out var storedId) &&
+ storedId == typedContext.MessageId,
+ _ => false
+ };
+
+ async Task UploadUpdated(T current, BlobRequestConditions conditions) {
+ var task = EventHandler(typedContext, current);
+ var updated = task.IsCompletedSuccessfully
+ ? task.Result
+ : await task.NoContext();
+ var json = projector.SerializeToUtf8Bytes(updated);
+
+ var uploadOptions = new BlobUploadOptions {
+ Conditions = conditions,
+ HttpHeaders = new BlobHttpHeaders {
+ ContentType = "application/json"
+ },
+ Metadata = new Dictionary {
+ ["Stream"] = typedContext.Stream.ToString(),
+ ["MessageId"] = typedContext.MessageId,
+ ["StreamPosition"] = typedContext.StreamPosition.ToString(),
+ ["GlobalPosition"] = typedContext.GlobalPosition.ToString()
+ }
+ };
+
+ using var stream = new MemoryStream(json);
+ var response = await blobClient.UploadAsync(stream, uploadOptions, typedContext.CancellationToken).NoContext();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Azure/test/Eventuous.Tests.Azure.Storage.Blobs/Eventuous.Tests.Azure.Storage.Blobs.csproj b/src/Azure/test/Eventuous.Tests.Azure.Storage.Blobs/Eventuous.Tests.Azure.Storage.Blobs.csproj
new file mode 100644
index 000000000..4ab5a79fb
--- /dev/null
+++ b/src/Azure/test/Eventuous.Tests.Azure.Storage.Blobs/Eventuous.Tests.Azure.Storage.Blobs.csproj
@@ -0,0 +1,19 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
+
diff --git a/src/Azure/test/Eventuous.Tests.Azure.Storage.Blobs/Fixtures/IntegrationFixture.cs b/src/Azure/test/Eventuous.Tests.Azure.Storage.Blobs/Fixtures/IntegrationFixture.cs
new file mode 100644
index 000000000..2ccf549ad
--- /dev/null
+++ b/src/Azure/test/Eventuous.Tests.Azure.Storage.Blobs/Fixtures/IntegrationFixture.cs
@@ -0,0 +1,49 @@
+using Azure.Storage.Blobs;
+using Eventuous.TestHelpers;
+using Testcontainers.Azurite;
+using TUnit.Core.Interfaces;
+
+namespace Eventuous.Tests.Azure.Storage.Blobs.Fixtures;
+
+public sealed class IntegrationFixture : IAsyncInitializer, IAsyncDisposable {
+ public IEventStore EventStore { get; set; } = null!;
+ public BlobServiceClient BlobServiceClient { get; private set; } = null!;
+
+ static IEventSerializer Serializer { get; } = new DefaultEventSerializer(TestPrimitives.DefaultOptions);
+
+ AzuriteContainer _azuriteContainer = null!;
+
+ public async Task AppendEvent(
+ StreamName streamName,
+ object evt,
+ ExpectedStreamVersion? version = null
+ ) {
+ return await EventStore.AppendEvents(
+ streamName,
+ version ?? ExpectedStreamVersion.Any,
+ [new(Guid.NewGuid(), evt, new())],
+ CancellationToken.None
+ );
+ }
+
+ static IntegrationFixture() {
+ DefaultEventSerializer.SetDefaultSerializer(Serializer);
+ }
+
+ public async Task InitializeAsync() {
+ // Start Azurite container for blob storage
+ _azuriteContainer = new AzuriteBuilder()
+ .WithImage("mcr.microsoft.com/azure-storage/azurite:latest")
+ .WithCommand("--skipApiVersionCheck")
+ .Build();
+ await _azuriteContainer.StartAsync();
+
+ var connectionString = _azuriteContainer.GetConnectionString();
+ BlobServiceClient = new BlobServiceClient(connectionString);
+
+ }
+
+ public async ValueTask DisposeAsync() {
+ await _azuriteContainer.DisposeAsync();
+ }
+}
diff --git a/src/Azure/test/Eventuous.Tests.Azure.Storage.Blobs/StorageBlobsProjectorTests.cs b/src/Azure/test/Eventuous.Tests.Azure.Storage.Blobs/StorageBlobsProjectorTests.cs
new file mode 100644
index 000000000..934cccbdf
--- /dev/null
+++ b/src/Azure/test/Eventuous.Tests.Azure.Storage.Blobs/StorageBlobsProjectorTests.cs
@@ -0,0 +1,682 @@
+using System.Text.Json;
+using Azure.Storage.Blobs;
+using Eventuous.Azure.Storage.Blobs;
+using Eventuous.Subscriptions;
+using Eventuous.Subscriptions.Context;
+using Eventuous.Tests.Azure.Storage.Blobs.Fixtures;
+
+namespace Eventuous.Tests.Azure.Storage.Blobs;
+
+[ClassDataSource]
+public class StorageBlobsProjectorTests(IntegrationFixture fixture) {
+ const string DefaultStream = "stream";
+
+ // ========== HELPER METHODS (surface intent through naming) ==========
+
+ ///
+ /// Creates a test container for the given scenario, surfacing the handler type and test case.
+ /// Returns the container name for use with the new constructor.
+ ///
+ async Task SetupContainer(string scenarioName) {
+ var containerName = $"test-{scenarioName}";
+ var client = fixture.BlobServiceClient.GetBlobContainerClient(containerName);
+ await client.CreateAsync();
+ return containerName;
+ }
+
+ ///
+ /// Gets a BlobContainerClient for the given container name
+ ///
+ BlobContainerClient GetContainer(string containerName) =>
+ fixture.BlobServiceClient.GetBlobContainerClient(containerName);
+
+ ///
+ /// Sets up initial blob state for update scenarios
+ ///
+ async Task SetupExistingBlob(string containerName, string blobName, TState initialState) {
+ var blobClient = GetContainer(containerName).GetBlobClient(blobName);
+ var json = JsonSerializer.SerializeToUtf8Bytes(initialState);
+ await blobClient.UploadAsync(new MemoryStream(json), overwrite: true);
+ }
+
+ ///
+ /// Gets the state from blob, surfacing the expected state type
+ ///
+ async Task GetBlobState(string containerName, string blobName) {
+ var blobClient = GetContainer(containerName).GetBlobClient(blobName);
+ var blob = await blobClient.DownloadContentAsync();
+ return blob.Value.Content.ToObjectFromJson(JsonSerializerOptions.Web)!;
+ }
+
+ ///
+ /// Asserts that the projector result is Success
+ ///
+ static async Task AssertSuccess(EventHandlingStatus result) => await Assert.That(result).IsEqualTo(EventHandlingStatus.Success);
+
+ ///
+ /// Asserts that the projector result is Ignored
+ ///
+ static async Task AssertIgnored(EventHandlingStatus result) => await Assert.That(result).IsEqualTo(EventHandlingStatus.Ignored);
+
+ ///
+ /// Asserts that the projector result is Failure
+ ///
+ static async Task AssertFailure(EventHandlingStatus result) => await Assert.That(result).IsEqualTo(EventHandlingStatus.Failure);
+
+ // ========== SYNC STATE HANDLER TESTS ==========
+
+ [Test]
+ public async Task SyncStateHandler_NewBlob_ShouldCreateAndStoreState() {
+ // Arrange
+ var containerName = await SetupContainer("sync-state-new");
+ var projector = new SyncStateProjector(fixture.BlobServiceClient, containerName);
+ var context = CreateContext(new TestEvent { Value = 10 });
+
+ // Act
+ var result = await projector.HandleEvent(context);
+
+ // Assert
+ await AssertSuccess(result);
+
+ var state = await GetBlobState(containerName, $"{DefaultStream}/SyncState.json");
+ await Assert.That(state.Value).IsEqualTo(10);
+ }
+
+ [Test]
+ public async Task SyncStateHandler_ExistingBlob_ShouldUpdateState() {
+ // Arrange
+ var containerName = await SetupContainer("sync-state-existing");
+ var blobName = $"{DefaultStream}/SyncState.json";
+
+ await SetupExistingBlob(containerName, blobName, new SyncState { Value = 5 });
+
+ var projector = new SyncStateProjector(fixture.BlobServiceClient, containerName);
+ var context = CreateContext(new TestEvent { Value = 10 });
+
+ // Act
+ var result = await projector.HandleEvent(context);
+
+ // Assert
+ await AssertSuccess(result);
+
+ var state = await GetBlobState(containerName, blobName);
+ await Assert.That(state.Value).IsEqualTo(15); // 5 + 10
+ await Assert.That(state.Counter).IsEqualTo(1);
+ }
+
+ // ========== SYNC CONTEXT-AWARE HANDLER TESTS ==========
+
+ [Test]
+ public async Task SyncContextAwareHandler_NewBlob_ShouldUseContextAndStoreState() {
+ // Arrange
+ var containerName = await SetupContainer("sync-context-new");
+ var projector = new SyncContextAwareProjector(fixture.BlobServiceClient, containerName);
+ var context = CreateContext(new TestEvent { Value = 20 });
+
+ // Act
+ var result = await projector.HandleEvent(context);
+
+ // Assert
+ await AssertSuccess(result);
+
+ var state = await GetBlobState(containerName, $"{DefaultStream}/SyncContextState.json");
+ await Assert.That(state.Value).IsEqualTo(20);
+ await Assert.That(state.StreamId).IsEqualTo(DefaultStream);
+ }
+
+ // ========== ASYNC STATE HANDLER TESTS ==========
+
+ [Test]
+ public async Task AsyncStateHandler_NewBlob_ShouldCreateAndStoreState() {
+ // Arrange
+ var containerName = await SetupContainer("async-state-new");
+ var projector = new AsyncStateProjector(fixture.BlobServiceClient, containerName);
+ var context = CreateContext(new TestEvent { Value = 30 });
+
+ // Act
+ var result = await projector.HandleEvent(context);
+
+ // Assert
+ await AssertSuccess(result);
+
+ var state = await GetBlobState(containerName, $"{DefaultStream}/AsyncState.json");
+ await Assert.That(state.Value).IsEqualTo(30);
+ }
+
+ [Test]
+ public async Task AsyncStateHandler_ExistingBlob_ShouldUpdateState() {
+ // Arrange
+ var containerName = await SetupContainer("async-state-existing");
+ var blobName = $"{DefaultStream}/AsyncState.json";
+
+ await SetupExistingBlob(containerName, blobName, new AsyncState { Value = 5 });
+
+ var projector = new AsyncStateProjector(fixture.BlobServiceClient, containerName);
+ var context = CreateContext(new TestEvent { Value = 35 });
+
+ // Act
+ var result = await projector.HandleEvent(context);
+
+ // Assert
+ await AssertSuccess(result);
+
+ var state = await GetBlobState(containerName, blobName);
+ await Assert.That(state.Value).IsEqualTo(40); // 5 + 35
+ }
+
+ // ========== ASYNC CONTEXT-AWARE HANDLER TESTS ==========
+
+ [Test]
+ public async Task AsyncContextAwareHandler_NewBlob_ShouldUseContextAndStoreState() {
+ // Arrange
+ var containerName = await SetupContainer("async-context-new");
+ var projector = new AsyncContextAwareProjector(fixture.BlobServiceClient, containerName);
+ var context = CreateContext(new TestEvent { Value = 40, Name = "AsyncContext" });
+
+ // Act
+ var result = await projector.HandleEvent(context);
+
+ // Assert
+ await AssertSuccess(result);
+
+ var state = await GetBlobState(containerName, $"{DefaultStream}/AsyncContextState.json");
+ await Assert.That(state.Value).IsEqualTo(40);
+ await Assert.That(state.EventName).IsEqualTo("AsyncContext");
+ }
+
+ [Test]
+ public async Task AsyncContextAwareHandler_ExistingBlob_ShouldUpdateStateAndContext() {
+ // Arrange
+ var containerName = await SetupContainer("async-context-existing");
+ var blobName = $"{DefaultStream}/AsyncContextState.json";
+
+ await SetupExistingBlob(containerName, blobName, new AsyncContextState { Value = 10, EventName = "Initial" });
+
+ var projector = new AsyncContextAwareProjector(fixture.BlobServiceClient, containerName);
+ var context = CreateContext(new TestEvent { Value = 50, Name = "Update" });
+
+ // Act
+ var result = await projector.HandleEvent(context);
+
+ // Assert
+ await AssertSuccess(result);
+
+ var state = await GetBlobState(containerName, blobName);
+ await Assert.That(state.Value).IsEqualTo(60); // 10 + 50
+ await Assert.That(state.EventName).IsEqualTo("Update");
+ }
+
+ // ========== EDGE CASE TESTS ==========
+
+ [Test]
+ public async Task NoHandler_ShouldReturnIgnored() {
+ // Arrange
+ var containerName = await SetupContainer("no-handler");
+ var projector = new NoHandlerProjector(fixture.BlobServiceClient, containerName);
+ var context = CreateContext(new TestEvent { Value = 100 });
+
+ // Act
+ var result = await projector.HandleEvent(context);
+
+ // Assert
+ await AssertIgnored(result);
+ }
+
+ // ========== CUSTOM BLOB ID TESTS ==========
+
+ [Test]
+ public async Task CustomBlobId_NewBlob_ShouldUseEventIdForBlobName() {
+ // Arrange
+ var containerName = await SetupContainer("custom-blobid-new");
+ var eventId = Guid.NewGuid().ToString();
+ var projector = new CustomBlobIdProjector(fixture.BlobServiceClient, containerName);
+ var context = CreateContext(new TestEvent { Id = eventId, Value = 100 });
+
+ // Act
+ var result = await projector.HandleEvent(context);
+
+ // Assert
+ await AssertSuccess(result);
+
+ var blobName = $"{eventId}/CustomBlobIdState.json";
+ var state = await GetBlobState(containerName, blobName);
+ await Assert.That(state.Value).IsEqualTo(100);
+ }
+
+ [Test]
+ public async Task CustomBlobId_ExistingBlob_ShouldUpdateWithEventId() {
+ // Arrange
+ var containerName = await SetupContainer("custom-blobid-existing");
+ var eventId = Guid.NewGuid().ToString();
+ var blobName = $"{eventId}/CustomBlobIdState.json";
+
+ await SetupExistingBlob(containerName, blobName, new CustomBlobIdState { Value = 5 });
+
+ var projector = new CustomBlobIdProjector(fixture.BlobServiceClient, containerName);
+ var context = CreateContext(new TestEvent { Id = eventId, Value = 100 });
+
+ // Act
+ var result = await projector.HandleEvent(context);
+
+ // Assert
+ await AssertSuccess(result);
+
+ var state = await GetBlobState(containerName, blobName);
+ await Assert.That(state.Value).IsEqualTo(105); // 5 + 100
+ }
+
+ // ========== RACE RETRY TESTS ==========
+
+ [Test]
+ public async Task RaceRetries_WithOneRetry_ShouldSucceedAfterRaceCondition() {
+ // Arrange
+ var containerName = await SetupContainer("race-retry");
+ var blobName = $"{DefaultStream}/ConcurrentState.json";
+
+ await SetupExistingBlob(containerName, blobName, new ConcurrentState { Value = 1 });
+
+ var projector = new ConcurrentModificationProjector(
+ fixture.BlobServiceClient,
+ containerName,
+ messWithState: async () => {
+ // Simulate concurrent modification: modify the blob directly with a different value
+ var modifiedState = new ConcurrentState { Value = 999 };
+ var modifiedJson = JsonSerializer.SerializeToUtf8Bytes(modifiedState);
+ var blobClient = GetContainer(containerName).GetBlobClient(blobName);
+ await blobClient.UploadAsync(new MemoryStream(modifiedJson), overwrite: true);
+ },
+ raceRetries: 1);
+ var context = CreateContext(new TestEvent { Value = 10 });
+
+ // Act
+ var result = await projector.HandleEvent(context);
+
+ // Assert - with retry, this should succeed
+ await AssertSuccess(result);
+
+ var state = await GetBlobState(containerName, blobName);
+ // First attempt: concurrent modification sets value to 999, causing 412
+ // Retry: reads 999, adds 10, succeeds
+ await Assert.That(state.Value).IsEqualTo(1009); // 999 + 10 (retry succeeded)
+ }
+
+ [Test]
+ public async Task ConcurrentAdditionOfNewBlob_ShouldReturnFailure() {
+ // Arrange
+ var containerName = await SetupContainer("concurrent-new");
+ var blobName = "stream/ConcurrentState.json";
+
+ var projector = new ConcurrentModificationProjector(
+ fixture.BlobServiceClient,
+ containerName,
+ messWithState: async () => {
+ // Simulate concurrent modification: modify the blob directly with a different value
+ var modifiedState = new ConcurrentState { Value = 999 };
+ var modifiedJson = JsonSerializer.SerializeToUtf8Bytes(modifiedState);
+ var blobClient = GetContainer(containerName).GetBlobClient(blobName);
+ await blobClient.UploadAsync(new MemoryStream(modifiedJson), overwrite: true);
+ },
+ onCall: 1);
+ var context = CreateContext(new TestEvent { Value = 10 });
+
+ // This should now fail with 412 because the ETag won't match
+ var result2 = await projector.HandleEvent(context);
+ await StorageBlobsProjectorTests.AssertFailure(result2);
+ }
+
+ [Test]
+ public async Task ConcurrentModificationOfExistingBlob_ShouldReturnFailure() {
+ // Arrange
+ var containerName = await SetupContainer("concurrent-existing");
+ var blobName = "stream/ConcurrentState.json";
+
+ await SetupExistingBlob(containerName, blobName, new ConcurrentState { Value = 1 });
+
+ var projector = new ConcurrentModificationProjector(
+ fixture.BlobServiceClient,
+ containerName,
+ messWithState: async() => {
+ // Simulate concurrent modification: modify the blob directly with a different value
+ var modifiedState = new ConcurrentState { Value = 999 };
+ var modifiedJson = JsonSerializer.SerializeToUtf8Bytes(modifiedState);
+ var blobClient = GetContainer(containerName).GetBlobClient(blobName);
+ await blobClient.UploadAsync(new MemoryStream(modifiedJson), overwrite: true);
+ },
+ onCall: 2);
+ var context = CreateContext(new TestEvent { Value = 10 });
+
+ // First update should succeed
+ var result1 = await projector.HandleEvent(context);
+ await AssertSuccess(result1);
+
+ // This should now fail with 412 because the ETag won't match
+ var result2 = await projector.HandleEvent(context);
+ await StorageBlobsProjectorTests.AssertFailure(result2);
+ }
+
+ // ========== IDEMPOTENCY TESTS ==========
+
+ [Test]
+ public async Task Idempotency_ByMessageId_ShouldIgnoreDuplicateMessage() {
+ // Arrange
+ var containerName = await SetupContainer("idempotency-messageid");
+ var blobName = $"{DefaultStream}/SyncState.json";
+
+ var projector = new IdempotencyProjector(fixture.BlobServiceClient, containerName, IdempotencyMode.ByMessageId);
+ var messageId = Guid.NewGuid().ToString();
+
+ // First context with specific message ID
+ var context1 = CreateContext(new TestEvent { Value = 10 }, messageId: messageId);
+
+ // Act - first processing should succeed
+ var result1 = await projector.HandleEvent(context1);
+ await AssertSuccess(result1);
+
+ var state1 = await GetBlobState(containerName, blobName);
+ await Assert.That(state1.Value).IsEqualTo(10);
+
+ // Second context with SAME message ID (duplicate)
+ var context2 = CreateContext(new TestEvent { Value = 20 }, messageId: messageId);
+
+ // Act - second processing should be ignored
+ var result2 = await projector.HandleEvent(context2);
+ await AssertIgnored(result2);
+
+ // State should NOT have been updated (still 10, not 30)
+ var state2 = await GetBlobState(containerName, blobName);
+ await Assert.That(state2.Value).IsEqualTo(10);
+ }
+
+ [Test]
+ public async Task Idempotency_ByMessageId_ShouldProcessDifferentMessageId() {
+ // Arrange
+ var containerName = await SetupContainer("idempotency-messageid-different");
+ var blobName = $"{DefaultStream}/SyncState.json";
+
+ var projector = new IdempotencyProjector(fixture.BlobServiceClient, containerName, IdempotencyMode.ByMessageId);
+
+ var messageId1 = Guid.NewGuid().ToString();
+ var context1 = CreateContext(new TestEvent { Value = 10 }, messageId: messageId1);
+
+ // Act - first message
+ var result1 = await projector.HandleEvent(context1);
+ await AssertSuccess(result1);
+
+ var state1 = await GetBlobState(containerName, blobName);
+ await Assert.That(state1.Value).IsEqualTo(10);
+
+ // Different message ID
+ var messageId2 = Guid.NewGuid().ToString();
+ var context2 = CreateContext(new TestEvent { Value = 20 }, messageId: messageId2);
+
+ // Act - different message should be processed
+ var result2 = await projector.HandleEvent(context2);
+ await AssertSuccess(result2);
+
+ // State should have been updated (10 + 20 = 30)
+ var state2 = await GetBlobState(containerName, blobName);
+ await Assert.That(state2.Value).IsEqualTo(30);
+ }
+
+ [Test]
+ public async Task Idempotency_ByGlobalPosition_ShouldIgnoreDuplicatePosition() {
+ // Arrange
+ var containerName = await SetupContainer("idempotency-globalposition");
+ var blobName = $"{DefaultStream}/SyncState.json";
+
+ var projector = new IdempotencyProjector(fixture.BlobServiceClient, containerName, IdempotencyMode.ByGlobalPosition);
+
+ // First context with specific global position
+ var context1 = CreateContext(new TestEvent { Value = 10 }, globalPosition: 100u);
+
+ // Act - first processing should succeed
+ var result1 = await projector.HandleEvent(context1);
+ await AssertSuccess(result1);
+
+ var state1 = await GetBlobState(containerName, blobName);
+ await Assert.That(state1.Value).IsEqualTo(10);
+
+ // Second context with SAME global position (duplicate)
+ var context2 = CreateContext(new TestEvent { Value = 20 }, globalPosition: 100u);
+
+ // Act - second processing should be ignored
+ var result2 = await projector.HandleEvent(context2);
+ await AssertIgnored(result2);
+
+ // State should NOT have been updated (still 10, not 30)
+ var state2 = await GetBlobState(containerName, blobName);
+ await Assert.That(state2.Value).IsEqualTo(10);
+ }
+
+ [Test]
+ public async Task Idempotency_ByGlobalPosition_ShouldProcessDifferentPosition() {
+ // Arrange
+ var containerName = await SetupContainer("idempotency-globalposition-different");
+ var blobName = $"{DefaultStream}/SyncState.json";
+
+ var projector = new IdempotencyProjector(fixture.BlobServiceClient, containerName, IdempotencyMode.ByGlobalPosition);
+
+ // First context with specific global position
+ var context1 = CreateContext(new TestEvent { Value = 10 }, globalPosition: 100);
+
+ // Act - first processing should succeed
+ var result1 = await projector.HandleEvent(context1);
+ await AssertSuccess(result1);
+
+ var state1 = await GetBlobState(containerName, blobName);
+ await Assert.That(state1.Value).IsEqualTo(10);
+
+ // Different global position
+ var context2 = CreateContext(new TestEvent { Value = 20 }, globalPosition: 101u);
+
+ // Act - different position should be processed
+ var result2 = await projector.HandleEvent(context2);
+ await AssertSuccess(result2);
+
+ // State should have been updated (10 + 20 = 30)
+ var state2 = await GetBlobState(containerName, blobName);
+ await Assert.That(state2.Value).IsEqualTo(30);
+ }
+
+ [Test]
+ public async Task Idempotency_None_ShouldAlwaysProcess() {
+ // Arrange - explicitly set to None (which is also the default)
+ var containerName = await SetupContainer("idempotency-none");
+ var blobName = $"{DefaultStream}/SyncState.json";
+
+ var projector = new IdempotencyProjector(fixture.BlobServiceClient, containerName, IdempotencyMode.None);
+ var messageId = Guid.NewGuid().ToString();
+
+ // First context
+ var context1 = CreateContext(new TestEvent { Value = 10 }, messageId: messageId);
+
+ // Act
+ var result1 = await projector.HandleEvent(context1);
+ await AssertSuccess(result1);
+
+ var state1 = await GetBlobState(containerName, blobName);
+ await Assert.That(state1.Value).IsEqualTo(10);
+
+ // Second context with SAME message ID - should still process
+ var context2 = CreateContext(new TestEvent { Value = 20 }, messageId: messageId);
+
+ // Act - should process even with same message ID
+ var result2 = await projector.HandleEvent(context2);
+ await AssertSuccess(result2);
+
+ // State should have been updated (10 + 20 = 30) - no idempotency
+ var state2 = await GetBlobState(containerName, blobName);
+ await Assert.That(state2.Value).IsEqualTo(30);
+ }
+
+ // ========== TEST CONTEXT FACTORY ==========
+
+ static IMessageConsumeContext CreateContext(object message, string? messageId = null, ulong globalPosition = 0) =>
+ new MessageConsumeContext(
+ eventId: messageId ?? Guid.NewGuid().ToString(),
+ eventType: message.GetType().Name,
+ contentType: "application/json",
+ stream: DefaultStream,
+ eventNumber: 0,
+ streamPosition: 0,
+ globalPosition: globalPosition,
+ sequence: 0,
+ created: DateTime.UtcNow,
+ message: message,
+ metadata: new Metadata(),
+ subscriptionId: "test-subscription",
+ cancellationToken: CancellationToken.None
+ );
+
+ // ========== TEST STATE CLASSES ==========
+
+ class SyncState {
+ public int Value { get; set; }
+ public int Counter { get; set; }
+ }
+
+ class SyncContextState {
+ public int Value { get; set; }
+ public string StreamId { get; set; } = "";
+ }
+
+ class AsyncState {
+ public int Value { get; set; }
+ }
+
+ class AsyncContextState {
+ public int Value { get; set; }
+ public string EventName { get; set; } = "";
+ }
+
+ class ConcurrentState {
+ public int Value { get; set; }
+ }
+
+ class NoHandlerState { }
+
+ class CustomBlobIdState {
+ public int Value { get; set; }
+ }
+
+ // ========== TEST PROJECTOR CLASSES
+ // Intent: Each class name explicitly surfaces the handler pattern being tested ==========
+
+ ///
+ /// Tests sync handler: On(Func)
+ ///
+ class SyncStateProjector : StorageBlobsProjector {
+ public SyncStateProjector(BlobServiceClient serviceClient, string containerName)
+ : base(serviceClient, containerName) {
+ On((ctx, state) => {
+ state.Value += ctx.Message.Value;
+ state.Counter++;
+ return state;
+ });
+ }
+ }
+
+ ///
+ /// Tests sync context-aware handler: On(Func) with context access
+ ///
+ class SyncContextAwareProjector : StorageBlobsProjector {
+ public SyncContextAwareProjector(BlobServiceClient serviceClient, string containerName)
+ : base(serviceClient, containerName) {
+ On((ctx, state) => {
+ state.Value += ctx.Message.Value;
+ state.StreamId = ctx.Stream.GetId();
+ return state;
+ });
+ }
+ }
+
+ ///
+ /// Tests async handler: On(Func>)
+ ///
+ class AsyncStateProjector : StorageBlobsProjector {
+ public AsyncStateProjector(BlobServiceClient serviceClient, string containerName)
+ : base(serviceClient, containerName) {
+ On(async (ctx, state) => {
+ await Task.Delay(1);
+ state.Value += ctx.Message.Value;
+ return state;
+ });
+ }
+ }
+
+ ///
+ /// Tests async context-aware handler: On(Func>) with context access
+ ///
+ class AsyncContextAwareProjector : StorageBlobsProjector {
+ public AsyncContextAwareProjector(BlobServiceClient serviceClient, string containerName)
+ : base(serviceClient, containerName) {
+ On(async (ctx, state) => {
+ await Task.Delay(1);
+ state.Value += ctx.Message.Value;
+ state.EventName = ctx.Message.Name;
+ return state;
+ });
+ }
+ }
+
+ ///
+ /// Tests scenario with no handlers registered
+ ///
+ class NoHandlerProjector : StorageBlobsProjector {
+ public NoHandlerProjector(BlobServiceClient serviceClient, string containerName)
+ : base(serviceClient, containerName) { }
+ }
+
+ ///
+ /// Tests concurrent modification scenario with configurable race retries and onCall
+ ///
+ class ConcurrentModificationProjector : StorageBlobsProjector {
+ private int _callCount = 0;
+ private readonly Func? _messWithState;
+ private readonly int _onCall;
+
+ public ConcurrentModificationProjector(
+ BlobServiceClient serviceClient,
+ string containerName,
+ Func? messWithState = null,
+ int onCall = 1,
+ int raceRetries = 0
+ ) : base(serviceClient, containerName, projectorOptions: new StorageBlobProjectorOptions { RaceRetries = raceRetries }) {
+ _messWithState = messWithState;
+ _onCall = onCall;
+
+ On(async (ctx, state) => {
+ if (_messWithState != null && ++_callCount == _onCall)
+ await _messWithState();
+ state.Value += ctx.Message.Value;
+ return state;
+ });
+ }
+ }
+
+ ///
+ /// Tests custom blob ID using getBlobId parameter
+ ///
+ class CustomBlobIdProjector : StorageBlobsProjector {
+ public CustomBlobIdProjector(BlobServiceClient serviceClient, string containerName)
+ : base(serviceClient, containerName) {
+ On(async (ctx, state) => {
+ state.Value += ctx.Message.Value;
+ return state;
+ }, getBlobId: ctx => new ValueTask(ctx.Message.Id));
+ }
+ }
+
+ ///
+ /// Tests idempotency with configurable mode
+ ///
+ class IdempotencyProjector : StorageBlobsProjector {
+ public IdempotencyProjector(BlobServiceClient serviceClient, string containerName, IdempotencyMode mode)
+ : base(serviceClient, containerName, projectorOptions: new StorageBlobProjectorOptions { IdempotencyMode = mode }) {
+ On((ctx, state) => {
+ state.Value += ctx.Message.Value;
+ return state;
+ });
+ }
+ }
+}
diff --git a/src/Azure/test/Eventuous.Tests.Azure.Storage.Blobs/TestEvent.cs b/src/Azure/test/Eventuous.Tests.Azure.Storage.Blobs/TestEvent.cs
new file mode 100644
index 000000000..c41a76eb5
--- /dev/null
+++ b/src/Azure/test/Eventuous.Tests.Azure.Storage.Blobs/TestEvent.cs
@@ -0,0 +1,14 @@
+namespace Eventuous.Tests.Azure.Storage.Blobs;
+
+[EventType("V1.TestEvent")]
+public record TestEvent {
+ static TestEvent() => TypeMap.RegisterKnownEventTypes(typeof(TestEvent).Assembly);
+
+ public string Id { get; set; } = Guid.NewGuid().ToString();
+ public string Name { get; set; } = "Test Event";
+ public int Value { get; set; } = 42;
+
+ public static TestEvent Create() => new() { Id = "test-event", Name = "Test", Value = 1 };
+
+ public static TestEvent Create(int value) => new() { Id = "test-event", Name = "Test", Value = value };
+}