Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
12e568e
initial aspire
quezlatch Jun 19, 2026
fb3c88a
test: add Eventuous.Tests.Azure.Storage.Blobs project with tests for …
quezlatch Jun 19, 2026
ceab0b4
refactor: extract duplication from StorageBlobsProjectorTests and sur…
quezlatch Jun 19, 2026
b154f98
feat: add constructor overload to StorageBlobsProjector that takes Bl…
quezlatch Jun 19, 2026
fa5ded0
fix tests
quezlatch Jun 19, 2026
6251ad9
make context typed in On methods
quezlatch Jun 19, 2026
7dc10ad
update azure sample to use blob storage
quezlatch Jun 19, 2026
e47c589
update aspire sql databases
quezlatch Jun 19, 2026
5f07032
add race condition check
quezlatch Jun 20, 2026
ad263da
refactor
quezlatch Jun 21, 2026
4872b2d
add projector options
quezlatch Jun 21, 2026
b70b26e
enhance blob name resolution in projection
quezlatch Jun 21, 2026
b09e9f0
adjust blob name generation
quezlatch Jun 21, 2026
83664b4
refactor
quezlatch Jun 22, 2026
2b6cdad
add xml docs
quezlatch Jun 22, 2026
86c9933
documentation
quezlatch Jun 22, 2026
ff371ba
add scalar as swagger/openapi client
quezlatch Jun 25, 2026
b0812fa
refactor
quezlatch Jun 25, 2026
5fbaf53
retries on race conditions
quezlatch Jun 25, 2026
3b1fbcb
tidy
quezlatch Jun 25, 2026
eae7f04
oops
quezlatch Jun 25, 2026
4022b4d
review feedback
quezlatch Jun 25, 2026
8471b94
add .NoContext()
quezlatch Jun 25, 2026
f21f2df
correct aspire http endpoints
quezlatch Jun 26, 2026
d5142c6
remove azure sample
quezlatch Jul 1, 2026
88b47c8
add idempotency functionality
quezlatch Jul 1, 2026
c07f9d5
refactor tests
quezlatch Jul 1, 2026
5406f30
update readme
quezlatch Jul 1, 2026
bafa903
Merge branch 'dev' into aspire-and-blob-storage
quezlatch Jul 1, 2026
4fe57c2
tidy
quezlatch Jul 1, 2026
f0c9a4a
tidy
quezlatch Jul 1, 2026
70cfd2d
update readme
quezlatch Jul 2, 2026
aa05e06
make options non-generic by removing serialisation overrides
quezlatch Jul 2, 2026
5bc95ad
do not use IOptions wrapper
quezlatch Jul 4, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
<TUnitVersion>0.77.3</TUnitVersion>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="Azure.Storage.Blobs" Version="12.29.0" />
<PackageVersion Include="BenchmarkDotNet" Version="0.14.0" />
<PackageVersion Include="FluentValidation" Version="12.0.0" />
<PackageVersion Include="IsExternalInit" Version="1.0.3" />
<PackageVersion Include="KurrentDB.Client" Version="1.3.0" />
<PackageVersion Include="Microsoft.Extensions.Azure" Version="1.14.0" />
<PackageVersion Include="Microsoft.Extensions.Configuration" Version="$(MicrosoftExtensionsVer)" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Json" Version="$(MicrosoftExtensionsVer)" />
<PackageVersion Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="$(MicrosoftExtensionsVer)" />
Expand All @@ -37,6 +39,7 @@
<PackageVersion Include="Microsoft.Extensions.Logging" Version="$(MicrosoftExtensionsVer)" />
<PackageVersion Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="$(MicrosoftExtensionsVer)" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageVersion Include="Scalar.Aspire" Version="0.11.0" />
<PackageVersion Include="Shouldly" Version="4.2.1" />
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="$(MicrosoftExtensionsVer)" />
<PackageVersion Include="System.Linq.AsyncEnumerable" Version="10.0.0" />
Expand All @@ -62,6 +65,7 @@
</ItemGroup>
<ItemGroup Label="Testcontainers">
<PackageVersion Include="Testcontainers" Version="$(TestcontainersVersion)" />
<PackageVersion Include="Testcontainers.Azurite" Version="$(TestcontainersVersion)" />
<PackageVersion Include="Testcontainers.KurrentDb" Version="$(TestcontainersVersion)" />
<PackageVersion Include="Testcontainers.Kafka" Version="$(TestcontainersVersion)" />
<PackageVersion Include="Testcontainers.MongoDb" Version="$(TestcontainersVersion)" />
Expand Down Expand Up @@ -112,6 +116,7 @@
<PackageVersion Include="OpenTelemetry.Exporter.Zipkin" Version="1.15.3" />
<PackageVersion Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.15.2" />
<PackageVersion Include="OpenTelemetry.Instrumentation.GrpcNetClient" Version="1.15.1-beta.1" />
<PackageVersion Include="OpenTelemetry.Instrumentation.SqlClient" Version="1.15.2" />
<PackageVersion Include="Serilog.AspNetCore" Version="9.0.0" />
<PackageVersion Include="Serilog.Extensions.Hosting" Version="9.0.0" />
<PackageVersion Include="Serilog.Sinks.Console" Version="6.0.0" />
Expand Down
2 changes: 2 additions & 0 deletions Eventuous.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
<Folder Name="/Brokers/Azure/" />
<Folder Name="/Brokers/Azure/src/">
<Project Path="src/Azure/src/Eventuous.Azure.ServiceBus/Eventuous.Azure.ServiceBus.csproj" />
<Project Path="src/Azure/src/Eventuous.Azure.Storage.Blobs/Eventuous.Azure.Storage.Blobs.csproj" />
</Folder>
<Folder Name="/Brokers/Azure/test/">
<Project Path="src/Azure/test/Eventuous.Tests.Azure.ServiceBus/Eventuous.Tests.Azure.ServiceBus.csproj" />
<Project Path="src/Azure/test/Eventuous.Tests.Azure.Storage.Blobs/Eventuous.Tests.Azure.Storage.Blobs.csproj" />
</Folder>
<Folder Name="/Brokers/GooglePubSub/" />
<Folder Name="/Brokers/GooglePubSub/src/">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class ServiceBusSubscription : EventSubscription<ServiceBusSubscriptionOp
/// <param name="consumePipe">Consume pipe instance</param>
/// <param name="loggerFactory">Logger factory (optional)</param>
/// <param name="eventSerializer">Event serializer (optional)</param>
public ServiceBusSubscription(ServiceBusClient client, ServiceBusSubscriptionOptions options, ConsumePipe consumePipe, ILoggerFactory? loggerFactory, IEventSerializer? eventSerializer) :
public ServiceBusSubscription(ServiceBusClient client, ServiceBusSubscriptionOptions options, ConsumePipe consumePipe, ILoggerFactory? loggerFactory, IEventSerializer? eventSerializer = null) :
base(options, consumePipe, loggerFactory, eventSerializer) {
_defaultErrorHandler = Options.ErrorHandler ?? DefaultErrorHandler;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<PackageReadmeFile>README.md</PackageReadmeFile>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Storage.Blobs" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="$(CoreRoot)\Eventuous.Subscriptions\Eventuous.Subscriptions.csproj" />
</ItemGroup>
<ItemGroup>
<None Include="README.md" Pack="true" PackagePath="\" />
</ItemGroup>

<ItemGroup>
<Using Include="Azure.Storage.Blobs" />
<Using Include="Microsoft.Extensions.Logging" />
</ItemGroup>

<ItemGroup>
<Compile Include="$(CoreRoot)\Eventuous.Shared\Tools\TaskExtensions.cs">
<Link>Tools\TaskExtensions.cs</Link>
</Compile>
<Compile Include="$(CoreRoot)\Eventuous.Shared\Tools\Ensure.cs">
<Link>Tools\Ensure.cs</Link>
</Compile>
<Using Include="Eventuous.Tools" />
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="Eventuous.Tests.Azure.Storage.Blobs" />
</ItemGroup>

</Project>
112 changes: 112 additions & 0 deletions src/Azure/src/Eventuous.Azure.Storage.Blobs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Eventuous Azure Blob Storage Projections

This package adds Azure Blob Storage projections to applications built with Eventuous. It allows you to project event store events to Azure Blob Storage as state objects, maintaining a separate state document for each event stream.

## Using projections

Create your own projection class that inherits from `StorageBlobsProjector<T>` where `T` is your state type. The state type must be a class with a parameterless constructor.

Register event handlers using the `On<TEvent>` methods. When an event is received, the projector retrieves the current state blob (or creates a new state instance if the blob doesn't exist), applies the event to the state using the registered event handler, and uploads the updated state back to Blob Storage.

The class provides two constructors:

* `StorageBlobsProjector(BlobContainerClient container, ...` where the container client is passed directly
* `StorageBlobsProjector(BlobServiceClient serviceClient, string containerName, ...` where the service client is set up by Azure DI and the container name is set by the projection

By using `IOptions<JsonSerializerOptions>` we can also use the Json serialization options as set in ASP DI.

By default, the blob ID is extracted from the stream using `context.Stream.GetId()`. You can override this by providing a custom `getBlobId` function in the event registration:

```csharp
public class BookingProjection : StorageBlobsProjector<BookingState> {
public BookingProjection(BlobServiceClient client, IOptions<JsonSerializerOptions> serializerOptions)
: base(client, "bookings-container", serializerOptions.Value) {

// Uses default blob ID from stream
On<BookingImported>((state, evt) => {
state.RoomId = evt.RoomId;
state.CheckInDate = evt.CheckIn;
return state;
});

// Custom blob ID using event data
On<BookingPaymentRegistered>(
(state, evt) => {
state.PaidAmount += evt.AmountPaid;
return state;
},
context => new ValueTask<string>($"custom-{context.Message.BookingId}")
);
}
}
```

## Projector options

The `StorageBlobProjectorOptions<T>` class provides several configuration options for fine-tuning the projector behavior.

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `JsonOptions` | `JsonSerializerOptions?` | `null` (uses `JsonSerializerOptions.Web`) | JSON serializer options for state serialization/deserialization. Controls formatting, naming policies, etc. |
| `RaceRetries` | `int` | `0` | Number of retry attempts for optimistic concurrency conflicts. Increase when concurrent updates are likely. |
| `IdempotencyMode` | `IdempotencyMode` | `IdempotencyMode.None` | Controls duplicate message detection behavior. |

### Idempotency modes

The `IdempotencyMode` enum controls how the projector handles duplicate messages:

- **`None`** - No idempotency checks. Always processes messages and updates blobs.
- **`ByGlobalPosition`** - Skips processing if existing blob has matching global position metadata.
- **`ByMessageId`** - Skips processing if existing blob has matching message ID metadata.

### Custom blob naming

By default, blob names are generated using `GetBlobName(string id)` which creates names in the format `{id}/{T}.json`, where `id` defaults to the stream ID from `context.Stream.GetId()`.

You can customize blob naming in two ways:

**1. Override the virtual methods globally for all events:**

```csharp
protected override string GetBlobName(string id, IMessageConsumeContext context) {
// Use stream name and type in the path
var streamName = context.Stream.ToString();
return $"projections/{streamName}/{id}.json";
}

protected override string GetBlobName(string id) {
return $"{id}/{typeof(T).Name}.json";
}
```

**2. Override blob ID per event handler using `getBlobId`:**

```csharp
On<BookingPaymentRegistered>(
(state, evt) => {
state.PaidAmount += evt.AmountPaid;
return state;
},
// Custom blob ID for this specific event only
context => new ValueTask<string>($"payments/{context.Message.BookingId}.json")
);
```

Use per-event blob ID overrides when you need different events to target different blob paths or naming conventions within the same projector, such as when the business identifier differs from the stream identifier.
## Features

- **Automatic state management** - Creates new state instances when blobs don't exist
- **Optimistic concurrency control** - Uses ETags for safe concurrent updates
- **Idempotency** - Prevents duplicate processing with configurable modes
- **Retry handling** - Automatic retries for race conditions
- **Flexible blob naming** - Customizable blob ID and naming conventions
- **Metadata storage** - Automatically stores stream info, positions, and message IDs

## Background

The projector stores each state as a separate blob in Azure Blob Storage. Each blob contains:
- The serialized state object (JSON by default)
- Metadata including stream name, message ID, stream position, and global position
- Content type set to `application/json`

This approach provides natural partitioning by stream and enables efficient state retrieval for individual streams.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using System.Text.Json;

namespace Eventuous.Azure.Storage.Blobs;

/// <summary>
/// Options for configuring the storage blob projector.
/// </summary>
public class StorageBlobProjectorOptions {
/// <summary>
/// Gets or sets the JSON serializer options to use when serializing or deserializing projection state.
/// By default, the default JSON serializer options will be used if this property is not set.
/// </summary>
public JsonSerializerOptions? JsonOptions { get; set; }

/// <summary>
/// Gets or sets the number of retry attempts for race condition handling when saving projection state.
/// Default is 0 (no retries).
/// </summary>
public int RaceRetries { get; set; } = 0;

/// <summary>
/// Gets or sets the idempotency mode for the projector. When enabled, the projector will skip processing
/// if the blob already exists with a matching identifier (message ID or global position), preventing duplicate processing.
/// Default is <see cref="IdempotencyMode.None"/> (no idempotency checking).
/// </summary>
public IdempotencyMode IdempotencyMode { get; set; } = IdempotencyMode.None;
}

/// <summary>
/// Controls how the projection handles idempotency to prevent duplicate message processing.
/// </summary>
public enum IdempotencyMode {
/// <summary>
/// No idempotency checks. The projector will always process messages and update blobs.
/// Use when duplicate processing is acceptable or when external mechanisms ensure message uniqueness.
/// </summary>
None,

/// <summary>
/// Skips processing if the existing blob was created from a message at the same global position.
/// Uses the <c>GlobalPosition</c> metadata stored with the blob for comparison.
/// Effective for append-only event streams where global position uniquely identifies a message.
/// </summary>
ByGlobalPosition,

/// <summary>
/// Skips processing if the existing blob was created from the same message ID.
/// Uses the <c>MessageId</c> metadata stored with the blob for comparison.
/// More precise than position-based checks, works even if messages are processed out of order.
/// Especially from external message queues where global position may not be available or reliable.
/// </summary>
ByMessageId
}
Loading