Skip to main content

Integration Events with DotNetCore.CAP

Integration events enable eventual consistency across modules and services using DotNetCore.CAP. Unlike domain events (which execute in the same transaction), integration events are published after successful database saves and delivered asynchronously with automatic retries.

When to Use Integration Events

Use Integration Events When:

  1. Cross-module communication - Notifying other modules/services of state changes

    // Equipment module publishes event; Checkout module subscribes
    integrationEventPublisher.PublishAsync(
    new EquipmentCreatedIntegrationEvent(equipmentId, name));
  2. Operations that should succeed independently - Email/notification failures shouldn't block main operation

    // User registration succeeds even if welcome email fails
    var user = User.Create(email);
    await userRepository.SaveAsync(user);

    // These can fail independently with automatic CAP retries
    await integrationEventPublisher.PublishAsync(
    new UserRegisteredIntegrationEvent(user.Id, user.Email));
  3. Asynchronous side effects - Long-running operations that can happen outside request cycle

    // Order processing sends confirmation email asynchronously
    order.Complete();
    await repository.SaveAsync(order);

    // CAP queues and delivers email async, with retries
    await integrationEventPublisher.PublishAsync(
    new OrderCompletedIntegrationEvent(order.Id));

Don't Use Integration Events For:

  • Operations that must fail together (use domain events)
  • Requirements for strong consistency within same aggregate
  • Immediate state updates that must be visible to caller

Architecture Overview

Integration events in PearDrop flow through DotNetCore.CAP, which:

  1. Publishes events to SQL Server or Redis
  2. Persists event metadata in database queue
  3. Delivers to all subscribed handlers with automatic retry
  4. Handles failures gracefully with exponential backoff
┌─────────────────────────────────────────────────────────────┐
│ 1. Command Handler publishes event after SaveChanges │
│ PublishAsync(new EquipmentCreatedIntegrationEvent(...)) │
└────────────────────────┬────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 2. CAP persists to database queue or message broker │
│ • SQL Server: cap.capoutbox table │
│ • Redis: XADD to stream │
└────────────────────────┬────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 3. CAP discovers and routes to subscribers [ICapSubscribe] │
│ • EquipmentCreatedSubscriber (Checkout module) │
│ • NotificationService (Messaging module) │
└────────────────────────┬────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 4. Subscribers execute handlers with automatic retry │
│ • On failure: logged to cap.capreceivedmessage │
│ • Retry: exponential backoff (1s, 2s, 4s, 8s, 16s...) │
│ • After max retries: moved to failed queue for review │
└─────────────────────────────────────────────────────────────┘

Setup: Configuring CAP

Step 1: Install NuGet Packages

CAP requires the message broker (SQL Server or Redis) and base CAP package:

# Core CAP package
dotnet add package DotNetCore.CAP --version 7.2.1

# Message broker - choose ONE
# For SQL Server (built-in, uses existing database)
dotnet add package DotNetCore.CAP.SqlServer --version 7.2.1

# OR for Redis (distributed, high-performance)
dotnet add package DotNetCore.CAP.Redis --version 7.2.1

Step 2: Register CAP in Program.cs

PearDrop simplifies CAP registration via AddCapIntegrationEvents():

var builder = WebApplication.CreateBuilder(args);

// Get connection string for CAP persistence
var connectionString = builder.Configuration.GetConnectionString("PearDrop");

// Register CAP with SQL Server broker
builder.Services.AddCapIntegrationEvents(connectionString);

// OR with Redis broker (for distributed systems)
var redisConnection = builder.Configuration.GetConnectionString("Redis");
builder.Services.AddCapIntegrationEvents(connectionString, redisConnection);

var app = builder.Build();
app.Run();

Step 3: Database Setup

For SQL Server: CAP automatically creates tables on first run:

  • cap.capoutbox - Outgoing events waiting to be published
  • cap.capreceivedmessage - Messages received by subscribers
  • cap.capexpiredmessage - Failed events after max retries

For Redis: CAP uses Redis streams (no additional tables needed).


Publishing Integration Events

Define the Event

Integration events are immutable records implementing IIntegrationEvent:

// Location: Domain/{BoundedContext}/IntegrationEvents/
namespace Equipment.Domain.IntegrationEvents;

/// <summary>
/// Published when equipment is successfully created.
/// Other modules can subscribe to reserve capacity or update indexes.
/// </summary>
public sealed record EquipmentCreatedIntegrationEvent(
Guid EquipmentId,
string Name,
string Category,
DateTime CreatedAt) : IIntegrationEvent
{
// Event topic - use module constants for consistency
public string EventName => EquipmentCapTopics.EquipmentCreated;
}

Naming Convention:

  • Suffix: IntegrationEvent (e.g., EquipmentCreatedIntegrationEvent)
  • Past tense: Describes what happened (Created, Updated, Deleted)
  • Include all data subscribers will need (avoid forcing queries)

Define Module Constants

Create a constants file for all event topics in the module:

// Location: Constants/EquipmentCapTopics.cs
namespace Equipment.Constants;

/// <summary>
/// Event topic names for DotNetCore.CAP subscribers.
/// Change with caution - existing subscribers depend on these strings.
/// </summary>
public static class EquipmentCapTopics
{
public const string EquipmentCreated = "equipment.created";
public const string EquipmentDeleted = "equipment.deleted";
public const string EquipmentCategoryChanged = "equipment.category-changed";
}

Publish from Command Handler

After domain logic and SaveChanges, publish the event:

internal sealed class CreateEquipmentCommandHandler : 
AuditableCommandHandler<CreateEquipmentCommand, CreateEquipmentCommandResult>
{
private readonly IIntegrationEventPublisher integrationEventPublisher;

public CreateEquipmentCommandHandler(
IEnumerable<IValidator<CreateEquipmentCommand>> validators,
ILogger<CreateEquipmentCommandHandler> logger,
ICommandStore commandStore,
IRepositoryFactory<Equipment> repositoryFactory,
IIntegrationEventPublisher integrationEventPublisher)
: base(validators, logger, commandStore, repositoryFactory)
{
this.integrationEventPublisher = integrationEventPublisher;
}

protected override async Task<CommandResult<CreateEquipmentCommandResult>>
HandleInternalWithRepository(
CreateEquipmentCommand request,
CancellationToken cancellationToken)
{
// Create and validate aggregate
var equipment = Equipment.Create(request.Name, request.Category);
this.Repository.Add(equipment);

// Save to database
var saveResult = await this.Repository.UnitOfWork.SaveEntitiesAsync(cancellationToken);

if (!saveResult.IsSuccess)
{
return CommandResult<CreateEquipmentCommandResult>.Failed(saveResult.Error!);
}

// ✅ AFTER successful save, publish integration event
// If publish fails, event is queued and will retry automatically
await this.integrationEventPublisher.PublishAsync(
new EquipmentCreatedIntegrationEvent(
equipment.Id,
equipment.Name,
equipment.Category,
DateTime.UtcNow),
cancellationToken);

return CommandResult<CreateEquipmentCommandResult>.Succeeded(
new CreateEquipmentCommandResult(equipment.Id));
}
}

Key Points:

  1. Publish AFTER SaveChanges - Only after database confirms success
  2. Subscriber failures don't block - Equipment creation succeeds even if downstream handlers fail
  3. CAP handles persistence - Event queued automatically if subscriber unavailable
  4. Automatic retries - Failed deliveries retry with exponential backoff

Subscribing to Integration Events

Implement ICapSubscribe Handler

Subscribers implement ICapSubscribe with [CapSubscribe] decorators:

// Location: Infrastructure/EventHandlers/
// or Domain/{BoundedContext}/EventHandlers/
namespace Checkout.Infrastructure.EventHandlers;

using DotNetCore.CAP;
using Equipment.Domain.IntegrationEvents;
using Equipment.Constants;

public sealed class EquipmentCreatedSubscriber : ICapSubscribe
{
private readonly ILogger<EquipmentCreatedSubscriber> logger;
private readonly ICheckoutService checkoutService;

public EquipmentCreatedSubscriber(
ILogger<EquipmentCreatedSubscriber> logger,
ICheckoutService checkoutService)
{
this.logger = logger;
this.checkoutService = checkoutService;
}

// Use constant from Equipment module
[CapSubscribe(EquipmentCapTopics.EquipmentCreated)]
public async Task Handle(
EquipmentCreatedIntegrationEvent @event,
CancellationToken cancellationToken)
{
this.logger.LogInformation(
"Equipment created: {EquipmentId}, reserving checkout capacity",
@event.EquipmentId);

try
{
// Subscribe logic - executed with CAP's automatic retry
await this.checkoutService.ReserveCapacityAsync(
@event.EquipmentId,
cancellationToken);

this.logger.LogInformation(
"Successfully reserved capacity for {EquipmentId}",
@event.EquipmentId);
}
catch (Exception ex)
{
this.logger.LogError(
ex,
"Failed to reserve capacity for equipment {EquipmentId}",
@event.EquipmentId);

// Rethrow to trigger CAP retry
throw;
}
}
}

Handler Rules:

  • Implement ICapSubscribe interface
  • Decorate method with [CapSubscribe(topicName)]
  • Use module constants for topic names
  • Async Task (no result needed)
  • Receives event object and CancellationToken
  • Throw exceptions to trigger retries - Don't swallow errors

Auto-Discovery

CAP automatically discovers all ICapSubscribe implementations at startup:

  • No manual registration needed
  • Scans entire assembly for subscribers
  • Works across multiple projects/modules

Error Handling & Retries

Automatic Retry Policy

When a subscriber throws an exception, CAP automatically retries:

Initial attempt
↓ (if fails)
Retry 1: wait 1 second
↓ (if fails)
Retry 2: wait 2 seconds
↓ (if fails)
Retry 3: wait 4 seconds
↓ (if fails)
Retry 4: wait 8 seconds
↓ (if fails)
Retry 5: wait 16 seconds
↓ (if still fails after max retries)
Move to cap.capexpiredmessage (dead letter queue)

Configure Retry Policy

public static class CapServiceCollectionExtensions
{
public static IServiceCollection AddCapIntegrationEvents(
this IServiceCollection services,
string sqlServerConnectionString)
{
services.AddCap(options =>
{
// SQL Server as message broker
options.UseSqlServer(sqlServerConnectionString);

// Retry configuration
options.FailedRetryCount = 5; // Max 5 retries (default)

// After max retries, this callback is invoked
options.FailedThresholdCallback = failedMessage =>
{
// Log to monitoring/error tracking service
var logger = services.BuildServiceProvider()
.GetRequiredService<ILogger>();

logger.LogError(
"Integration event delivery failed after {RetryCount} attempts: {Topic}. " +
"Message: {Message}",
options.FailedRetryCount,
failedMessage.Name,
failedMessage.Content);

// TODO: Alert ops team, create incident ticket, etc
};
});

services.AddScoped<IIntegrationEventPublisher, CapIntegrationEventPublisher>();
return services;
}
}

Handle Failed Messages

Failed events are stored in cap.capreceivedmessage table:

-- View failed messages
SELECT Id, Name, Content, StatusName, ExceptionMessage, CreatedAt
FROM cap.capreceivedmessage
WHERE StatusName = 'Failed'
ORDER BY CreatedAt DESC;

-- Manually retry a failed message (CAP UI tools available)
-- Or rebuild service to auto-retry on startup

Message Broker Configuration

Option 1: SQL Server (Default)

Uses existing SQL Server database for event persistence:

// appsettings.json
{
"ConnectionStrings": {
"PearDrop": "Server=localhost,1440;Database=MyApp;..."
}
}

// Program.cs
services.AddCap(options =>
{
options.UseSqlServer(connectionString);
// Events stored in cap.capoutbox table
});

Pros:

  • No additional infrastructure
  • Uses existing database
  • Simple to get started

Cons:

  • Database load increases with high event volume
  • Single-machine deployment only

Option 2: Redis (Distributed)

Uses Redis for high-performance message delivery:

// appsettings.json
{
"ConnectionStrings": {
"PearDrop": "Server=localhost,1440;Database=MyApp;...",
"Redis": "localhost:6379"
}
}

// Program.cs
var redisConnection = builder.Configuration.GetConnectionString("Redis");
services.AddCap(options =>
{
// SQL Server for outbox (durability)
options.UseSqlServer(connectionString);

// Redis for message broker (performance)
options.UseRedis(redisConnection);
});

Pros:

  • High-performance message delivery
  • Scales to multiple servers
  • Lower database load

Cons:

  • Requires Redis infrastructure
  • Additional operational complexity

Real-World Workflow Example

Scenario: Equipment Creation with Multi-Module Coordination

Modules:

  • Equipment - Creates and manages equipment
  • Checkout - Tracks equipment availability
  • Notifications - Sends emails

Step-by-Step Flow

1. Equipment Module Creates Equipment

// Equipment command handler
var equipment = Equipment.Create("Projector", "Electronics");
equipmentRepository.Add(equipment);
await equipmentRepository.UnitOfWork.SaveEntitiesAsync();

// Publish event
await integrationEventPublisher.PublishAsync(
new EquipmentCreatedIntegrationEvent(
equipment.Id, equipment.Name, equipment.Category, DateTime.UtcNow));

// ✅ Returns immediately to user - event delivery happens async

2. CAP Queues Event

cap.capoutbox:
├─ EventId: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
├─ EventName: equipment.created
├─ Content: {"equipmentId":"...", "name":"Projector", "category":"Electronics"}
├─ CreateTime: 2026-03-05 10:00:00
└─ ExpirationTime: 2026-03-12 10:00:00

3. Checkout Module Subscribes

[CapSubscribe("equipment.created")]
public async Task Handle(
EquipmentCreatedIntegrationEvent @event,
CancellationToken cancellationToken)
{
// Reserve capacity in checkout system
var checkout = CheckoutAvailability.Create(@event.EquipmentId);
checkoutRepository.Add(checkout);
await checkoutRepository.UnitOfWork.SaveEntitiesAsync();
}

4. Notifications Module Subscribes

[CapSubscribe("equipment.created")]
public async Task Handle(
EquipmentCreatedIntegrationEvent @event,
CancellationToken cancellationToken)
{
// Send admin notification email
await emailService.SendAsync(
"admin@myapp.com",
$"New equipment added: {event.Name}",
$"Equipment ID: {event.EquipmentId}");
}

5. Results

✅ Equipment created and visible to users
✅ Checkout capacity reserved (or retrying if temporary DB outage)
✅ Admin email sent (or queued for retry)

• All async - user doesn't wait
• All durable - events won't be lost if server restarts
• All resilient - automatic retries handle transient failures

Best Practices

PracticeWhyExample
Use module constantsPrevents typos, easier refactoringEquipmentCapTopics.EquipmentCreated
Publish AFTER SaveChangesEnsures data committed before notifying othersSee command handler example
Throw in subscribersTriggers CAP retry on failurethrow; in catch block
Log before and afterIncident tracking and debuggingUse ILogger in subscriber
Include all needed dataAvoids subscribers querying original modulePass full EquipmentId, name, category
Name events in past tenseClarifies what happened✅ EquipmentCreated, ❌ CreateEquipment
Test locally with SQL ServerSimpler than Redis for developmentUse docker-compose for local PostgreSQL
Monitor failed message queueAlerts to integration issuesSchedule job to check cap.capexpiredmessage

Troubleshooting

Event Not Delivered

Symptom: Subscriber not called after event published

Solutions:

  1. Check topic name matches - Compare EventName property with [CapSubscribe] attribute
  2. Verify subscriber is registered - ICapSubscribe must be implemented correctly
  3. Check CAP queue - SELECT * FROM cap.capoutbox
  4. Enable CAP logging - Set LogLevel to Debug for CAP namespace
  5. Restart application - CAP discovers subscribers at startup

Events Failing with Retries Exhausted

Symptom: Events moved to cap.capexpiredmessage after 5 retries

Solutions:

  1. Check subscriber error logs - Look for exceptions in subscriber handler
  2. Verify dependencies - Email service, database connection, etc
  3. Increase retry count - If transient issue, increase FailedRetryCount
  4. Manual retry - Use CAP dashboard or database query to republish
  5. Fix root cause - If database connection issue, fix connection string

Next Steps