Integration Events
Integration events enable eventual consistency across bounded contexts (modules) and external systems. They are published through the DotNetCore.CAP library after SaveChanges() succeeds, which provides durable, reliable delivery.
What are Integration Events?
Integration events represent significant business occurrences that other modules or systems need to know about:
- Eventual Consistency: Published after transaction commits
- Non-Blocking: Cannot roll back the originating operation
- Asynchronous: Processed by subscribers independently
- Cross-Module: Communication between bounded contexts
- Durable: Persisted to SQL Server, survives process restarts
- Reliable: Failed deliveries are retried automatically
Use integration events when:
- Communicating across module/bounded context boundaries
- Sending external notifications (email, webhooks, SMS)
- Running operations that shouldn't block the user's request
- Eventual consistency is acceptable
Don't use integration events for:
- Coordinating aggregates within the same context (use domain events)
- Operations requiring immediate rollback capability
- Strong consistency requirements
Integration Events vs Domain Events
| Aspect | Integration Events | Domain Events |
|---|---|---|
| When | After SaveChanges | Before SaveChanges |
| Transaction | Separate transaction | Same transaction |
| Consistency | Eventual | Strong |
| Can Rollback | No (already committed) | Yes (throw exception) |
| Scope | Cross-module | Within module |
| Library | DotNetCore.CAP | MediatR |
| Delivery | Asynchronous | Synchronous |
| Storage | SQL Server queue | In-memory |
Creating Integration Events
Step 1: Define Module Constants
CRITICAL: Always use module constants for topic names, never magic strings.
// Location: Constants/EquipmentCapTopics.cs
namespace MyApp.Module.Constants;
/// <summary>
/// CAP topic constants for Equipment module integration events.
/// All integration event names should reference these constants.
/// </summary>
public static class EquipmentCapTopics
{
public const string EquipmentCreated = "equipment.equipment-created";
public const string EquipmentStatusChanged = "equipment.status-changed";
public const string EquipmentCheckedOut = "equipment.checked-out";
public const string EquipmentCheckedIn = "equipment.checked-in";
}
Naming convention for topics:
- Format: {module}.{aggregate}.{event} or {module}.{event}
- Use kebab-case: equipment.equipment-created, not EquipmentCreated
- Be specific and descriptive
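The naming convention can be checked mechanically, for example in a unit test over the constants class. The following is a minimal sketch, assuming a hypothetical TopicNameConvention helper (not part of PearDrop or CAP) that accepts two or three dot-separated kebab-case segments:

```csharp
using System.Text.RegularExpressions;

// Hypothetical helper for illustration; not part of PearDrop or CAP.
public static class TopicNameConvention
{
    // One kebab-case segment: lowercase alphanumeric words joined by single hyphens.
    private const string Segment = "[a-z0-9]+(?:-[a-z0-9]+)*";

    // Two or three segments separated by dots:
    // {module}.{event} or {module}.{aggregate}.{event}
    private static readonly Regex Pattern = new Regex(
        $"^{Segment}(?:\\.{Segment}){{1,2}}\\z",
        RegexOptions.CultureInvariant);

    // Returns true when the topic follows the convention.
    public static bool IsValid(string topic) =>
        !string.IsNullOrEmpty(topic) && Pattern.IsMatch(topic);
}
```

Calling TopicNameConvention.IsValid on each constant in a topics class would flag a PascalCase or single-segment name before it reaches a subscriber.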
Step 2: Define Integration Event
Events implement IIntegrationEvent with an EventName property:
// Location: Domain/EquipmentAggregate/IntegrationEvents/EquipmentCreatedIntegrationEvent.cs
using PearDrop.Domain;
using MyApp.Module.Constants;
namespace MyApp.Module.Domain.EquipmentAggregate.IntegrationEvents;
/// <summary>
/// Published when equipment is successfully created.
/// Notifies other modules and external systems of new equipment.
/// </summary>
public sealed record EquipmentCreatedIntegrationEvent(
Guid EquipmentId,
string Name,
string Category,
EquipmentStatus Status,
DateTime CreatedAt) : IIntegrationEvent
{
// CRITICAL: Use module constant, NOT magic string
public string EventName => EquipmentCapTopics.EquipmentCreated;
}
Key points:
- Sealed record with positional parameters
- Implements IIntegrationEvent
- EventName property references module constant
- Past tense naming: EquipmentCreatedIntegrationEvent
- Include all data subscribers will need
Step 3: Publish From Command Handler
Publish integration events after successful SaveChanges:
internal sealed class CreateEquipmentCommandHandler :
AuditableCommandHandler<CreateEquipmentCommand, CreateEquipmentCommandResult>
{
private readonly IIntegrationEventPublisher integrationEventPublisher;
private readonly TimeProvider timeProvider;
public CreateEquipmentCommandHandler(
IEnumerable<IValidator<CreateEquipmentCommand>> validators,
ILogger<CreateEquipmentCommandHandler> logger,
ICommandStore commandStore,
IRepositoryFactory<Equipment> repositoryFactory,
IIntegrationEventPublisher integrationEventPublisher,
TimeProvider timeProvider)
: base(validators, logger, commandStore, repositoryFactory)
{
this.integrationEventPublisher = integrationEventPublisher;
this.timeProvider = timeProvider;
}
protected override async Task<CommandResult<CreateEquipmentCommandResult>>
HandleInternalWithRepository(
CreateEquipmentCommand request,
CancellationToken cancellationToken)
{
// Collect events to publish
var integrationEventsToPublish = new List<IIntegrationEvent>();
// Capture the timestamp once so the aggregate and the event agree
var createdAt = this.timeProvider.GetUtcNow();
// Create aggregate (may raise domain events)
var equipment = Equipment.Create(
request.Name,
request.Category,
createdAt);
this.Repository.Add(equipment);
// Prepare integration event (CreatedAt is a DateTime; GetUtcNow() returns a DateTimeOffset)
integrationEventsToPublish.Add(new EquipmentCreatedIntegrationEvent(
equipment.Id,
equipment.Name,
equipment.Category,
equipment.Status,
createdAt.UtcDateTime));
// Save database changes (domain events dispatched first)
var result = await this.Repository.UnitOfWork.SaveEntitiesAsync(cancellationToken);
if (result.IsSuccess)
{
// Publish events AFTER successful save (non-blocking)
foreach (var integrationEvent in integrationEventsToPublish)
{
await integrationEventPublisher.PublishAsync(
integrationEvent,
cancellationToken);
}
return CommandResult<CreateEquipmentCommandResult>.Succeeded(
new CreateEquipmentCommandResult(equipment.Id));
}
return CommandResult<CreateEquipmentCommandResult>.Failed(result.Error!);
}
}
Publishing pattern:
- Collect events to publish in a list
- Perform all domain logic and SaveChanges
- Only if SaveChanges succeeds, publish integration events
- Events persisted to CAP queue immediately
- Return success to caller (doesn't wait for subscribers)
Always publish integration events:
- After SaveChanges succeeds
- Before returning to caller
- Using IIntegrationEventPublisher.PublishAsync()
- In a loop if multiple events
CAP persists events to SQL Server and processes them asynchronously.
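The ordering rule (save first, publish only on success) can be isolated from the handler. The sketch below is one possible shape, using delegates in place of the unit of work and the publisher; the IIntegrationEvent interface, the SampleTopics class and the SaveThenPublish helper are stand-ins declared here for illustration, and the real PearDrop types may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins for illustration only; the real PearDrop interfaces may differ.
public interface IIntegrationEvent { string EventName { get; } }

public static class SampleTopics
{
    public const string EquipmentCreated = "equipment.equipment-created";
}

public sealed record SampleEquipmentCreated(Guid EquipmentId) : IIntegrationEvent
{
    public string EventName => SampleTopics.EquipmentCreated;
}

public static class SaveThenPublish
{
    // Runs the save step first and publishes the collected events only if it succeeded.
    // Returns the number of events handed to the publisher.
    public static async Task<int> RunAsync(
        Func<CancellationToken, Task<bool>> saveAsync,
        IReadOnlyList<IIntegrationEvent> events,
        Func<IIntegrationEvent, CancellationToken, Task> publishAsync,
        CancellationToken cancellationToken = default)
    {
        if (!await saveAsync(cancellationToken))
        {
            return 0; // Save failed: nothing leaves the module.
        }

        foreach (var integrationEvent in events)
        {
            await publishAsync(integrationEvent, cancellationToken);
        }

        return events.Count;
    }
}
```

Passing a save delegate that returns false leaves the publish delegate uncalled, which is the property the handler above relies on.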
Creating CAP Subscribers
Subscribers listen for published events and react accordingly.
Define Subscriber Class
// Location: Infrastructure/EventHandlers/EquipmentCreatedSubscriber.cs
using DotNetCore.CAP;
using Microsoft.Extensions.Logging;
using MyApp.Module.Constants;
using MyApp.Module.Domain.EquipmentAggregate.IntegrationEvents;
namespace MyApp.Module.Infrastructure.EventHandlers;
/// <summary>
/// Handles EquipmentCreatedIntegrationEvent to send notifications.
/// Processes events asynchronously after they're published.
/// </summary>
public sealed class EquipmentCreatedSubscriber : ICapSubscribe
{
private readonly ILogger<EquipmentCreatedSubscriber> logger;
private readonly IEmailService emailService;
public EquipmentCreatedSubscriber(
ILogger<EquipmentCreatedSubscriber> logger,
IEmailService emailService)
{
this.logger = logger;
this.emailService = emailService;
}
/// <summary>
/// Handles EquipmentCreatedIntegrationEvent.
/// Topic name must match event.EventName from module constants.
/// </summary>
[CapSubscribe(EquipmentCapTopics.EquipmentCreated)] // Use constant!
public async Task Handle(
EquipmentCreatedIntegrationEvent @event,
CancellationToken cancellationToken)
{
this.logger.LogInformation(
"Processing EquipmentCreated event for {EquipmentId}",
@event.EquipmentId);
try
{
// Send notification email
await emailService.SendEquipmentCreatedNotificationAsync(
@event.Name,
@event.Category,
cancellationToken);
this.logger.LogInformation(
"Successfully processed EquipmentCreated event for {EquipmentId}",
@event.EquipmentId);
}
catch (Exception ex)
{
this.logger.LogError(
ex,
"Error processing EquipmentCreated event for {EquipmentId}",
@event.EquipmentId);
throw; // Rethrow to trigger CAP retry
}
}
}
Subscriber characteristics:
- Implements ICapSubscribe interface
- Methods decorated with [CapSubscribe(topic)] attribute
- Topic references module constant (critical!)
- Constructor injection for dependencies
- Async method with CancellationToken
- Rethrow exceptions to trigger retries
Always use module constants in CapSubscribe attribute:
// ✅ CORRECT: Module constant
[CapSubscribe(EquipmentCapTopics.EquipmentCreated)]
// ❌ WRONG: Magic string
[CapSubscribe("equipment.equipment-created")]
Magic strings create duplication and risk typos breaking event delivery.
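Constants remove typos at the call site, but two constants can still carry the same value by accident. A small reflection check, sketched below, could run in a unit test to catch that. TopicConstants and SampleModuleTopics are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical test helper; not part of PearDrop or CAP.
public static class TopicConstants
{
    // Returns the values of all public const string fields declared on the type.
    public static IReadOnlyList<string> ValuesOf(Type topicsClass) =>
        topicsClass
            .GetFields(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly)
            .Where(f => f.IsLiteral && !f.IsInitOnly && f.FieldType == typeof(string))
            .Select(f => (string)f.GetRawConstantValue()!)
            .ToList();

    // Returns topic values that are declared more than once.
    public static IReadOnlyList<string> Duplicates(Type topicsClass) =>
        ValuesOf(topicsClass)
            .GroupBy(v => v, StringComparer.Ordinal)
            .Where(g => g.Count() > 1)
            .Select(g => g.Key)
            .ToList();
}

// Sample constants class with one deliberate duplicate value.
public static class SampleModuleTopics
{
    public const string EquipmentCreated = "equipment.equipment-created";
    public const string EquipmentCheckedOut = "equipment.checked-out";
    public const string CheckedOutAlias = "equipment.checked-out";
}
```

A test would assert that Duplicates(typeof(EquipmentCapTopics)) is empty.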
CAP Library Configuration
SQL Server Storage
CAP stores events in SQL Server for durability:
// Location: Program.cs or Module registration
builder.Services.AddCapIntegrationEvents(
builder.Configuration.GetConnectionString("PearDrop")!);
This creates CAP tables in your database:
- CAP.Published - Published events
- CAP.Received - Received events for subscribers
Retry Configuration
CAP automatically retries failed subscribers:
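// Requires: using DotNetCore.CAP.Messages; (GetId/GetName extensions)
// and using Microsoft.Extensions.DependencyInjection;
services.AddCap(options =>
{
options.UseSqlServer(connectionString);
// Retry configuration
options.FailedRetryCount = 5; // Retry 5 times
options.FailedRetryInterval = 60; // 60 seconds between retries
options.FailedThresholdCallback = failedInfo =>
{
// Callback after all retries exhausted.
// FailedInfo exposes ServiceProvider, MessageType and Message.
var logger = failedInfo.ServiceProvider
.GetRequiredService<ILogger<Program>>();
logger.LogError(
"{MessageType} message {MessageId} on topic {Topic} failed after all retries",
failedInfo.MessageType,
failedInfo.Message.GetId(),
failedInfo.Message.GetName());
};
});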
Retry behavior:
- Failed events are retried at the configured FailedRetryInterval (60 seconds above)
- Event marked as failed after FailedRetryCount retries
- FailedThresholdCallback enables custom alerting
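The interaction between the retry count, the interval and the threshold callback can be modelled in a few lines. This is a simplified model with a fixed interval, matching the FailedRetryInterval setting in the configuration. It is not CAP's implementation, and RetryModel is a hypothetical name.

```csharp
using System;
using System.Threading.Tasks;

// Simplified model for illustration; this is not CAP's implementation.
public static class RetryModel
{
    // Invokes the handler once, then retries up to failedRetryCount times,
    // waiting retryInterval between attempts. Calls onThresholdReached with
    // the last exception once every attempt has failed.
    // Returns true if any attempt succeeded.
    public static async Task<bool> ExecuteAsync(
        Func<Task> handler,
        int failedRetryCount,
        TimeSpan retryInterval,
        Action<Exception> onThresholdReached)
    {
        if (failedRetryCount < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(failedRetryCount));
        }

        var retriesUsed = 0;
        while (true)
        {
            try
            {
                await handler();
                return true;
            }
            catch (Exception ex)
            {
                // An exception escaping the handler is what marks the attempt as failed.
                if (retriesUsed >= failedRetryCount)
                {
                    onThresholdReached(ex);
                    return false;
                }
            }

            retriesUsed++;
            await Task.Delay(retryInterval);
        }
    }
}
```

The model also shows why subscribers must rethrow: an exception swallowed inside the handler looks like success, so no retry is scheduled.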
Cross-Module Communication
Integration events enable modules to communicate without direct dependencies:
Example: Auth Module → Files Module
Auth module publishes event:
// Auth module: User deleted
public sealed record UserDeletedIntegrationEvent(
Guid UserId,
string Email,
DateTime DeletedAt) : IIntegrationEvent
{
public string EventName => AuthenticationCapTopics.UserDeleted;
}
// Command handler publishes
if (result.IsSuccess)
{
await integrationEventPublisher.PublishAsync(
new UserDeletedIntegrationEvent(userId, email, DateTime.UtcNow),
cancellationToken);
}
Files module subscribes:
// Files module: Clean up user's files
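public sealed class UserDeletedSubscriber : ICapSubscribe
{
// IFileService is an assumed abstraction exposing DeleteUserFilesAsync
private readonly IFileService fileService;
public UserDeletedSubscriber(IFileService fileService)
{
this.fileService = fileService;
}
[CapSubscribe(AuthenticationCapTopics.UserDeleted)]
public async Task Handle(
UserDeletedIntegrationEvent @event,
CancellationToken cancellationToken)
{
// Delete all files owned by the user
await this.fileService.DeleteUserFilesAsync(@event.UserId, cancellationToken);
}
}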
Benefits:
- Auth module doesn't know about Files module
- Files module reacts autonomously
- Decoupled modules
- Easy to add more subscribers
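The decoupling comes from routing by topic name rather than by type reference. The in-memory sketch below illustrates that idea only; InMemoryTopicBus is a hypothetical class with none of CAP's durability, and the topic string in the usage note is an assumed value.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// In-memory stand-in for a topic-based bus, for illustration only.
// CAP provides the durable equivalent; this class is hypothetical.
public sealed class InMemoryTopicBus
{
    private readonly Dictionary<string, List<Func<object, Task>>> handlers = new();

    // Registers a handler for a topic. Any number of modules can subscribe.
    public void Subscribe(string topic, Func<object, Task> handler)
    {
        if (!handlers.TryGetValue(topic, out var list))
        {
            list = new List<Func<object, Task>>();
            handlers[topic] = list;
        }

        list.Add(handler);
    }

    // Delivers the payload to every handler registered for the topic and
    // returns how many handlers ran. The publisher never references a
    // subscriber type.
    public async Task<int> PublishAsync(string topic, object payload)
    {
        if (!handlers.TryGetValue(topic, out var list))
        {
            return 0;
        }

        foreach (var handler in list)
        {
            await handler(payload);
        }

        return list.Count;
    }
}
```

Adding a second subscriber (say, an audit module) to a topic such as "authentication.user-deleted" is one more Subscribe call; the publishing code does not change.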
Idempotency in Subscribers
Subscribers may receive duplicate events (network retries, etc.). Make handlers idempotent:
[CapSubscribe(EquipmentCapTopics.EquipmentCreated)]
public async Task Handle(EquipmentCreatedIntegrationEvent @event, CancellationToken ct)
{
// Check if already processed
var exists = await repository.ExistsAsync(@event.EquipmentId, ct);
if (exists)
{
logger.LogInformation("Event already processed for {EquipmentId}", @event.EquipmentId);
return; // Skip duplicate
}
// Process event
await ProcessEventAsync(@event, ct);
}
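When there is no natural entity to check for existence, another option is to record a key per processed event. The sketch below assumes a hypothetical ProcessedEventGuard kept in memory. A real module would more likely back it with a database table and a unique constraint so the record survives restarts.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical in-memory guard for illustration; not part of PearDrop or CAP.
public sealed class ProcessedEventGuard
{
    private readonly ConcurrentDictionary<Guid, bool> processed = new();

    // Runs the action only the first time a given key is seen.
    // Returns false when the key was already processed.
    public async Task<bool> RunOnceAsync(Guid key, Func<Task> action)
    {
        if (!processed.TryAdd(key, true))
        {
            return false; // Duplicate delivery: skip.
        }

        try
        {
            await action();
            return true;
        }
        catch
        {
            // Forget the key so a later retry can process the event again.
            processed.TryRemove(key, out _);
            throw;
        }
    }
}
```

Releasing the key on failure matters here: without it, a retry after a failed attempt would be skipped as a duplicate.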
Testing Integration Events
Testing Event Publishing
[Fact]
public async Task CreateEquipment_ShouldPublishIntegrationEvent()
{
// Arrange
var mockPublisher = new Mock<IIntegrationEventPublisher>();
var handler = new CreateEquipmentCommandHandler(
/* ... */,
mockPublisher.Object,
timeProvider);
// Act
var result = await handler.Handle(command, CancellationToken.None);
// Assert
mockPublisher.Verify(
p => p.PublishAsync(
It.Is<EquipmentCreatedIntegrationEvent>(e =>
e.EquipmentId == result.Data!.EquipmentId),
It.IsAny<CancellationToken>()),
Times.Once);
}
Testing Subscribers
[Fact]
public async Task Handle_ShouldSendEmail()
{
// Arrange
var mockEmailService = new Mock<IEmailService>();
var subscriber = new EquipmentCreatedSubscriber(logger, mockEmailService.Object);
var evt = new EquipmentCreatedIntegrationEvent(
Guid.NewGuid(), "Test", "Tools", EquipmentStatus.Available, DateTime.UtcNow);
// Act
await subscriber.Handle(evt, CancellationToken.None);
// Assert
mockEmailService.Verify(
s => s.SendEquipmentCreatedNotificationAsync(
"Test",
"Tools",
It.IsAny<CancellationToken>()),
Times.Once);
}
Generating Integration Events with CLI
# Generate integration event with subscriber
peardrop add integration-event EquipmentCreated \
--aggregate Equipment \
--properties "EquipmentId:Guid,Name:string,Category:string"
# Generated files:
# - Domain/EquipmentAggregate/IntegrationEvents/EquipmentCreatedIntegrationEvent.cs
# - Infrastructure/EventHandlers/EquipmentCreatedSubscriber.cs
# - Constants/EquipmentCapTopics.cs (updated or created)
# Cross-library subscriber (different project)
peardrop add integration-event EquipmentCreated \
--aggregate Equipment \
--handler-project ../OtherModule
# Event only (no subscriber)
peardrop add integration-event EquipmentCreated \
--aggregate Equipment \
--no-handler
The CLI generates:
- Event record implementing IIntegrationEvent
- Module constants class with topic name
- CAP subscriber implementing ICapSubscribe
- Proper [CapSubscribe(constant)] attribute
- Logger injection and error handling
No manual configuration needed - CAP auto-discovers ICapSubscribe implementations.
Best Practices
✅ DO
- Use module constants for topics
[CapSubscribe(EquipmentCapTopics.EquipmentCreated)] // ✅
- Publish after SaveChanges succeeds
if (result.IsSuccess)
{
await integrationEventPublisher.PublishAsync(evt, ct);
}
- Rethrow exceptions to trigger retries
catch (Exception ex)
{
logger.LogError(ex, "Processing failed");
throw; // CAP will retry
}
- Make subscribers idempotent
if (await repository.ExistsAsync(id)) return; // Skip duplicate
- Include all necessary data in events
public sealed record EventName(
Guid Id,
string Name, // Include data subscribers need
DateTime Timestamp) : IIntegrationEvent;
❌ DON'T
- Don't use magic strings in CapSubscribe
[CapSubscribe("equipment-created")] // ❌ Use constant
- Don't publish before SaveChanges
await publisher.PublishAsync(evt); // ❌ Too early
await repository.SaveChangesAsync(); // Event published even if this fails
- Don't swallow exceptions
catch (Exception ex)
{
logger.LogError(ex, "...");
// Missing throw - CAP won't retry ❌
}
- Don't block on subscriber processing
  - Subscribers process asynchronously
  - Don't expect immediate completion
- Don't create circular event chains
  - Module A publishes → Module B subscribes and publishes → Module A subscribes
  - Can cause infinite loops
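Where a reaction chain between modules cannot be avoided, one possible safeguard is to carry a hop count with each event and refuse to publish beyond a limit. The sketch below is an assumption for illustration. EventEnvelope, its Hops field and the MaxHops limit are hypothetical and not features of PearDrop or CAP.

```csharp
using System;

// Hypothetical envelope; the Hops field is an assumption, not a PearDrop or CAP feature.
public sealed record EventEnvelope(string Topic, Guid CorrelationId, int Hops)
{
    public const int MaxHops = 5;

    // Creates the envelope for an event published in reaction to this one.
    // Throws once the chain would grow past MaxHops, which breaks A → B → A loops.
    public EventEnvelope Next(string topic)
    {
        if (Hops >= MaxHops)
        {
            throw new InvalidOperationException(
                $"Event chain {CorrelationId} exceeded {MaxHops} hops at '{topic}'.");
        }

        return this with { Topic = topic, Hops = Hops + 1 };
    }
}
```

A subscriber that publishes a follow-up event would call Next on the incoming envelope, so a loop fails loudly after a bounded number of hops instead of running indefinitely.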
Monitoring CAP Events
CAP Dashboard (Optional)
Enable CAP's web dashboard to monitor events:
services.AddCap(options =>
{
options.UseDashboard(); // Enable dashboard at /cap
});
Access at https://yourapp/cap to view:
- Published events
- Received events
- Failed events and errors
- Retry status
Logging
CAP logs all event processing:
logger.LogInformation("Published {EventName}", @event.EventName);
logger.LogInformation("Received {EventName}", @event.EventName);
logger.LogError(ex, "Failed processing {EventName}", @event.EventName);
Summary
✅ Eventual Consistency: Integration events published after SaveChanges
✅ CAP Library: Durable delivery with SQL Server storage
✅ Module Constants: Always use constants for topic names
✅ Non-Blocking: Cannot roll back originating operation
✅ Asynchronous: Subscribers process independently with retries
✅ Cross-Module: Decouple modules via event-driven communication
✅ CLI Support: Generate events and subscribers with proper patterns
Integration events enable reliable, decoupled communication across bounded contexts while maintaining system resilience through durable storage and automatic retries.
Further Reading
- Domain Events - Within-context, transactional events
- Bounded Contexts - Module boundaries and communication
- Domain-Driven Design - Overall DDD approach in PearDrop
- CAP Subscriber Pattern Guide - Advanced CAP patterns