Integration Events with DotNetCore.CAP
Integration events enable eventual consistency across modules and services using DotNetCore.CAP. Unlike domain events (which execute in the same transaction), integration events are published after successful database saves and delivered asynchronously with automatic retries.
When to Use Integration Events
✅ Use Integration Events When:
-
Cross-module communication - Notifying other modules/services of state changes
// Equipment module publishes event; Checkout module subscribes
integrationEventPublisher.PublishAsync(
new EquipmentCreatedIntegrationEvent(equipmentId, name)); -
Operations that should succeed independently - Email/notification failures shouldn't block main operation
// User registration succeeds even if welcome email fails
var user = User.Create(email);
await userRepository.SaveAsync(user);
// These can fail independently with automatic CAP retries
await integrationEventPublisher.PublishAsync(
new UserRegisteredIntegrationEvent(user.Id, user.Email)); -
Asynchronous side effects - Long-running operations that can happen outside request cycle
// Order processing sends confirmation email asynchronously
order.Complete();
await repository.SaveAsync(order);
// CAP queues and delivers email async, with retries
await integrationEventPublisher.PublishAsync(
new OrderCompletedIntegrationEvent(order.Id));
❌ Don't Use Integration Events For:
- Operations that must fail together (use domain events)
- Requirements for strong consistency within same aggregate
- Immediate state updates that must be visible to caller
Architecture Overview
Integration events in PearDrop flow through DotNetCore.CAP, which:
- Publishes events to SQL Server or Redis
- Persists event metadata in database queue
- Delivers to all subscribed handlers with automatic retry
- Handles failures gracefully with exponential backoff
┌─────────────────────────────────────────────────────────────┐
│ 1. Command Handler publishes event after SaveChanges │
│ PublishAsync(new EquipmentCreatedIntegrationEvent(...)) │
└────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 2. CAP persists to database queue or message broker │
│ • SQL Server: cap.capoutbox table │
│ • Redis: XADD to stream │
└────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 3. CAP discovers and routes to subscribers [ICapSubscribe] │
│ • EquipmentCreatedSubscriber (Checkout module) │
│ • NotificationService (Messaging module) │
└────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 4. Subscribers execute handlers with automatic retry │
│ • On failure: logged to cap.capreceivedmessage │
│ • Retry: exponential backoff (1s, 2s, 4s, 8s, 16s...) │
│ • After max retries: moved to failed queue for review │
└─────────────────────────────────────────────────────────────┘
Setup: Configuring CAP
Step 1: Install NuGet Packages
CAP requires the message broker (SQL Server or Redis) and base CAP package:
# Core CAP package
dotnet add package DotNetCore.CAP --version 7.2.1
# Message broker - choose ONE
# For SQL Server (built-in, uses existing database)
dotnet add package DotNetCore.CAP.SqlServer --version 7.2.1
# OR for Redis (distributed, high-performance)
dotnet add package DotNetCore.CAP.Redis --version 7.2.1
Step 2: Register CAP in Program.cs
PearDrop simplifies CAP registration via AddCapIntegrationEvents():
var builder = WebApplication.CreateBuilder(args);
// Get connection string for CAP persistence
var connectionString = builder.Configuration.GetConnectionString("PearDrop");
// Register CAP with SQL Server broker
builder.Services.AddCapIntegrationEvents(connectionString);
// OR with Redis broker (for distributed systems)
var redisConnection = builder.Configuration.GetConnectionString("Redis");
builder.Services.AddCapIntegrationEvents(connectionString, redisConnection);
var app = builder.Build();
app.Run();
Step 3: Database Setup
For SQL Server: CAP automatically creates tables on first run:
cap.capoutbox- Outgoing events waiting to be publishedcap.capreceivedmessage- Messages received by subscriberscap.capexpiredmessage- Failed events after max retries
For Redis: CAP uses Redis streams (no additional tables needed).
Publishing Integration Events
Define the Event
Integration events are immutable records implementing IIntegrationEvent:
// Location: Domain/{BoundedContext}/IntegrationEvents/
namespace Equipment.Domain.IntegrationEvents;
/// <summary>
/// Published when equipment is successfully created.
/// Other modules can subscribe to reserve capacity or update indexes.
/// </summary>
public sealed record EquipmentCreatedIntegrationEvent(
Guid EquipmentId,
string Name,
string Category,
DateTime CreatedAt) : IIntegrationEvent
{
// Event topic - use module constants for consistency
public string EventName => EquipmentCapTopics.EquipmentCreated;
}
Naming Convention:
- Suffix:
IntegrationEvent(e.g.,EquipmentCreatedIntegrationEvent) - Past tense: Describes what happened (Created, Updated, Deleted)
- Include all data subscribers will need (avoid forcing queries)
Define Module Constants
Create a constants file for all event topics in the module:
// Location: Constants/EquipmentCapTopics.cs
namespace Equipment.Constants;
/// <summary>
/// Event topic names for DotNetCore.CAP subscribers.
/// Change with caution - existing subscribers depend on these strings.
/// </summary>
public static class EquipmentCapTopics
{
public const string EquipmentCreated = "equipment.created";
public const string EquipmentDeleted = "equipment.deleted";
public const string EquipmentCategoryChanged = "equipment.category-changed";
}
Publish from Command Handler
After domain logic and SaveChanges, publish the event:
internal sealed class CreateEquipmentCommandHandler :
AuditableCommandHandler<CreateEquipmentCommand, CreateEquipmentCommandResult>
{
private readonly IIntegrationEventPublisher integrationEventPublisher;
public CreateEquipmentCommandHandler(
IEnumerable<IValidator<CreateEquipmentCommand>> validators,
ILogger<CreateEquipmentCommandHandler> logger,
ICommandStore commandStore,
IRepositoryFactory<Equipment> repositoryFactory,
IIntegrationEventPublisher integrationEventPublisher)
: base(validators, logger, commandStore, repositoryFactory)
{
this.integrationEventPublisher = integrationEventPublisher;
}
protected override async Task<CommandResult<CreateEquipmentCommandResult>>
HandleInternalWithRepository(
CreateEquipmentCommand request,
CancellationToken cancellationToken)
{
// Create and validate aggregate
var equipment = Equipment.Create(request.Name, request.Category);
this.Repository.Add(equipment);
// Save to database
var saveResult = await this.Repository.UnitOfWork.SaveEntitiesAsync(cancellationToken);
if (!saveResult.IsSuccess)
{
return CommandResult<CreateEquipmentCommandResult>.Failed(saveResult.Error!);
}
// ✅ AFTER successful save, publish integration event
// If publish fails, event is queued and will retry automatically
await this.integrationEventPublisher.PublishAsync(
new EquipmentCreatedIntegrationEvent(
equipment.Id,
equipment.Name,
equipment.Category,
DateTime.UtcNow),
cancellationToken);
return CommandResult<CreateEquipmentCommandResult>.Succeeded(
new CreateEquipmentCommandResult(equipment.Id));
}
}
Key Points:
- Publish AFTER SaveChanges - Only after database confirms success
- Subscriber failures don't block - Equipment creation succeeds even if downstream handlers fail
- CAP handles persistence - Event queued automatically if subscriber unavailable
- Automatic retries - Failed deliveries retry with exponential backoff
Subscribing to Integration Events
Implement ICapSubscribe Handler
Subscribers implement ICapSubscribe with [CapSubscribe] decorators:
// Location: Infrastructure/EventHandlers/
// or Domain/{BoundedContext}/EventHandlers/
namespace Checkout.Infrastructure.EventHandlers;
using DotNetCore.CAP;
using Equipment.Domain.IntegrationEvents;
using Equipment.Constants;
public sealed class EquipmentCreatedSubscriber : ICapSubscribe
{
private readonly ILogger<EquipmentCreatedSubscriber> logger;
private readonly ICheckoutService checkoutService;
public EquipmentCreatedSubscriber(
ILogger<EquipmentCreatedSubscriber> logger,
ICheckoutService checkoutService)
{
this.logger = logger;
this.checkoutService = checkoutService;
}
// Use constant from Equipment module
[CapSubscribe(EquipmentCapTopics.EquipmentCreated)]
public async Task Handle(
EquipmentCreatedIntegrationEvent @event,
CancellationToken cancellationToken)
{
this.logger.LogInformation(
"Equipment created: {EquipmentId}, reserving checkout capacity",
@event.EquipmentId);
try
{
// Subscribe logic - executed with CAP's automatic retry
await this.checkoutService.ReserveCapacityAsync(
@event.EquipmentId,
cancellationToken);
this.logger.LogInformation(
"Successfully reserved capacity for {EquipmentId}",
@event.EquipmentId);
}
catch (Exception ex)
{
this.logger.LogError(
ex,
"Failed to reserve capacity for equipment {EquipmentId}",
@event.EquipmentId);
// Rethrow to trigger CAP retry
throw;
}
}
}
Handler Rules:
- Implement
ICapSubscribeinterface - Decorate method with
[CapSubscribe(topicName)] - Use module constants for topic names
- Async Task (no result needed)
- Receives event object and CancellationToken
- Throw exceptions to trigger retries - Don't swallow errors
Auto-Discovery
CAP automatically discovers all ICapSubscribe implementations at startup:
- No manual registration needed
- Scans entire assembly for subscribers
- Works across multiple projects/modules
Error Handling & Retries
Automatic Retry Policy
When a subscriber throws an exception, CAP automatically retries:
Initial attempt
↓ (if fails)
Retry 1: wait 1 second
↓ (if fails)
Retry 2: wait 2 seconds
↓ (if fails)
Retry 3: wait 4 seconds
↓ (if fails)
Retry 4: wait 8 seconds
↓ (if fails)
Retry 5: wait 16 seconds
↓ (if still fails after max retries)
Move to cap.capexpiredmessage (dead letter queue)
Configure Retry Policy
public static class CapServiceCollectionExtensions
{
public static IServiceCollection AddCapIntegrationEvents(
this IServiceCollection services,
string sqlServerConnectionString)
{
services.AddCap(options =>
{
// SQL Server as message broker
options.UseSqlServer(sqlServerConnectionString);
// Retry configuration
options.FailedRetryCount = 5; // Max 5 retries (default)
// After max retries, this callback is invoked
options.FailedThresholdCallback = failedMessage =>
{
// Log to monitoring/error tracking service
var logger = services.BuildServiceProvider()
.GetRequiredService<ILogger>();
logger.LogError(
"Integration event delivery failed after {RetryCount} attempts: {Topic}. " +
"Message: {Message}",
options.FailedRetryCount,
failedMessage.Name,
failedMessage.Content);
// TODO: Alert ops team, create incident ticket, etc
};
});
services.AddScoped<IIntegrationEventPublisher, CapIntegrationEventPublisher>();
return services;
}
}
Handle Failed Messages
Failed events are stored in cap.capreceivedmessage table:
-- View failed messages
SELECT Id, Name, Content, StatusName, ExceptionMessage, CreatedAt
FROM cap.capreceivedmessage
WHERE StatusName = 'Failed'
ORDER BY CreatedAt DESC;
-- Manually retry a failed message (CAP UI tools available)
-- Or rebuild service to auto-retry on startup
Message Broker Configuration
Option 1: SQL Server (Default)
Uses existing SQL Server database for event persistence:
// appsettings.json
{
"ConnectionStrings": {
"PearDrop": "Server=localhost,1440;Database=MyApp;..."
}
}
// Program.cs
services.AddCap(options =>
{
options.UseSqlServer(connectionString);
// Events stored in cap.capoutbox table
});
Pros:
- No additional infrastructure
- Uses existing database
- Simple to get started
Cons:
- Database load increases with high event volume
- Single-machine deployment only
Option 2: Redis (Distributed)
Uses Redis for high-performance message delivery:
// appsettings.json
{
"ConnectionStrings": {
"PearDrop": "Server=localhost,1440;Database=MyApp;...",
"Redis": "localhost:6379"
}
}
// Program.cs
var redisConnection = builder.Configuration.GetConnectionString("Redis");
services.AddCap(options =>
{
// SQL Server for outbox (durability)
options.UseSqlServer(connectionString);
// Redis for message broker (performance)
options.UseRedis(redisConnection);
});
Pros:
- High-performance message delivery
- Scales to multiple servers
- Lower database load
Cons:
- Requires Redis infrastructure
- Additional operational complexity
Real-World Workflow Example
Scenario: Equipment Creation with Multi-Module Coordination
Modules:
- Equipment - Creates and manages equipment
- Checkout - Tracks equipment availability
- Notifications - Sends emails
Step-by-Step Flow
1. Equipment Module Creates Equipment
// Equipment command handler
var equipment = Equipment.Create("Projector", "Electronics");
equipmentRepository.Add(equipment);
await equipmentRepository.UnitOfWork.SaveEntitiesAsync();
// Publish event
await integrationEventPublisher.PublishAsync(
new EquipmentCreatedIntegrationEvent(
equipment.Id, equipment.Name, equipment.Category, DateTime.UtcNow));
// ✅ Returns immediately to user - event delivery happens async
2. CAP Queues Event
cap.capoutbox:
├─ EventId: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
├─ EventName: equipment.created
├─ Content: {"equipmentId":"...", "name":"Projector", "category":"Electronics"}
├─ CreateTime: 2026-03-05 10:00:00
└─ ExpirationTime: 2026-03-12 10:00:00
3. Checkout Module Subscribes
[CapSubscribe("equipment.created")]
public async Task Handle(
EquipmentCreatedIntegrationEvent @event,
CancellationToken cancellationToken)
{
// Reserve capacity in checkout system
var checkout = CheckoutAvailability.Create(@event.EquipmentId);
checkoutRepository.Add(checkout);
await checkoutRepository.UnitOfWork.SaveEntitiesAsync();
}
4. Notifications Module Subscribes
[CapSubscribe("equipment.created")]
public async Task Handle(
EquipmentCreatedIntegrationEvent @event,
CancellationToken cancellationToken)
{
// Send admin notification email
await emailService.SendAsync(
"admin@myapp.com",
$"New equipment added: {event.Name}",
$"Equipment ID: {event.EquipmentId}");
}
5. Results
✅ Equipment created and visible to users
✅ Checkout capacity reserved (or retrying if temporary DB outage)
✅ Admin email sent (or queued for retry)
• All async - user doesn't wait
• All durable - events won't be lost if server restarts
• All resilient - automatic retries handle transient failures
Best Practices
| Practice | Why | Example |
|---|---|---|
| Use module constants | Prevents typos, easier refactoring | EquipmentCapTopics.EquipmentCreated |
| Publish AFTER SaveChanges | Ensures data committed before notifying others | See command handler example |
| Throw in subscribers | Triggers CAP retry on failure | throw; in catch block |
| Log before and after | Incident tracking and debugging | Use ILogger in subscriber |
| Include all needed data | Avoids subscribers querying original module | Pass full EquipmentId, name, category |
| Name events in past tense | Clarifies what happened | ✅ EquipmentCreated, ❌ CreateEquipment |
| Test locally with SQL Server | Simpler than Redis for development | Use docker-compose for local PostgreSQL |
| Monitor failed message queue | Alerts to integration issues | Schedule job to check cap.capexpiredmessage |
Troubleshooting
Event Not Delivered
Symptom: Subscriber not called after event published
Solutions:
- Check topic name matches - Compare
EventNameproperty with[CapSubscribe]attribute - Verify subscriber is registered - ICapSubscribe must be implemented correctly
- Check CAP queue -
SELECT * FROM cap.capoutbox - Enable CAP logging - Set
LogLevelto Debug for CAP namespace - Restart application - CAP discovers subscribers at startup
Events Failing with Retries Exhausted
Symptom: Events moved to cap.capexpiredmessage after 5 retries
Solutions:
- Check subscriber error logs - Look for exceptions in subscriber handler
- Verify dependencies - Email service, database connection, etc
- Increase retry count - If transient issue, increase
FailedRetryCount - Manual retry - Use CAP dashboard or database query to republish
- Fix root cause - If database connection issue, fix connection string
Next Steps
- Domain Events Deep Dive - When to use domain vs integration events
peardrop add integration-event- CLI to scaffold events- DotNetCore.CAP Documentation - Official CAP docs