Integration Events

Integration events enable eventual consistency across bounded contexts (modules) and external systems. They are published after SaveChanges() succeeds, and the DotNetCore.CAP library handles durable, reliable delivery.

What are Integration Events?

Integration events represent significant business occurrences that other modules or systems need to know about:

  • Eventual Consistency: Published after transaction commits
  • Non-Blocking: Cannot roll back the originating operation
  • Asynchronous: Processed by subscribers independently
  • Cross-Module: Communication between bounded contexts
  • Durable: Persisted to SQL Server, survives process restarts
  • Reliable: Automatic retries for failed deliveries

Use integration events when:

  • Communicating across module/bounded context boundaries
  • Sending external notifications (email, webhooks, SMS)
  • Operations that shouldn't block the user's request
  • Eventual consistency is acceptable

Don't use integration events for:

  • Coordinating aggregates within the same context (use domain events)
  • Operations requiring immediate rollback capability
  • Strong consistency requirements

Integration Events vs Domain Events

| Aspect | Integration Events | Domain Events |
| --- | --- | --- |
| When | After SaveChanges | Before SaveChanges |
| Transaction | Separate transaction | Same transaction |
| Consistency | Eventual | Strong |
| Can roll back | No (already committed) | Yes (throw exception) |
| Scope | Cross-module | Within module |
| Library | DotNetCore.CAP | MediatR |
| Delivery | Asynchronous | Synchronous |
| Storage | SQL Server queue | In-memory |

Creating Integration Events

Step 1: Define Module Constants

CRITICAL: Always use module constants for topic names, never magic strings.

// Location: Constants/EquipmentCapTopics.cs
namespace MyApp.Module.Constants;

/// <summary>
/// CAP topic constants for Equipment module integration events.
/// All integration event names should reference these constants.
/// </summary>
public static class EquipmentCapTopics
{
    public const string EquipmentCreated = "equipment.equipment-created";
    public const string EquipmentStatusChanged = "equipment.status-changed";
    public const string EquipmentCheckedOut = "equipment.checked-out";
    public const string EquipmentCheckedIn = "equipment.checked-in";
}

Naming convention for topics:

  • Format: {module}.{aggregate}.{event} or {module}.{event}
  • Use kebab-case: equipment.equipment-created, not EquipmentCreated
  • Be specific and descriptive
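
The naming convention above can be checked mechanically, for example in a unit test over the constants class. The following is a minimal sketch, assuming topics have two or three dot-separated segments and that each segment is lowercase kebab-case. TopicNameConvention and IsValid are hypothetical names used for illustration; they are not part of PearDrop or CAP.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper (not part of PearDrop or CAP): checks that a topic name
// follows the dot-separated, kebab-case convention described above.
public static class TopicNameConvention
{
    // Two or three segments separated by dots. Each segment starts with a
    // lowercase letter and may contain lowercase letters, digits, and hyphens.
    private static readonly Regex Pattern = new(
        @"^[a-z][a-z0-9]*(-[a-z0-9]+)*(\.[a-z][a-z0-9]*(-[a-z0-9]+)*){1,2}$",
        RegexOptions.Compiled | RegexOptions.CultureInvariant);

    public static bool IsValid(string topic) =>
        !string.IsNullOrEmpty(topic) && Pattern.IsMatch(topic);
}
```

With this sketch, TopicNameConvention.IsValid("equipment.equipment-created") passes, while a PascalCase name such as "EquipmentCreated" is rejected.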

Step 2: Define Integration Event

Events implement IIntegrationEvent with an EventName property:

// Location: Domain/EquipmentAggregate/IntegrationEvents/EquipmentCreatedIntegrationEvent.cs
using PearDrop.Domain;
using MyApp.Module.Constants;

namespace MyApp.Module.Domain.EquipmentAggregate.IntegrationEvents;

/// <summary>
/// Published when equipment is successfully created.
/// Notifies other modules and external systems of new equipment.
/// </summary>
public sealed record EquipmentCreatedIntegrationEvent(
    Guid EquipmentId,
    string Name,
    string Category,
    EquipmentStatus Status,
    DateTime CreatedAt) : IIntegrationEvent
{
    // CRITICAL: Use module constant, NOT magic string
    public string EventName => EquipmentCapTopics.EquipmentCreated;
}

Key points:

  • Sealed record with positional parameters
  • Implements IIntegrationEvent
  • EventName property references module constant
  • Past tense naming: EquipmentCreatedIntegrationEvent
  • Include all data subscribers will need

Step 3: Publish From Command Handler

Publish integration events after successful SaveChanges:

internal sealed class CreateEquipmentCommandHandler :
    AuditableCommandHandler<CreateEquipmentCommand, CreateEquipmentCommandResult>
{
    private readonly IIntegrationEventPublisher integrationEventPublisher;
    private readonly TimeProvider timeProvider;

    public CreateEquipmentCommandHandler(
        IEnumerable<IValidator<CreateEquipmentCommand>> validators,
        ILogger<CreateEquipmentCommandHandler> logger,
        ICommandStore commandStore,
        IRepositoryFactory<Equipment> repositoryFactory,
        IIntegrationEventPublisher integrationEventPublisher,
        TimeProvider timeProvider)
        : base(validators, logger, commandStore, repositoryFactory)
    {
        this.integrationEventPublisher = integrationEventPublisher;
        this.timeProvider = timeProvider;
    }

    protected override async Task<CommandResult<CreateEquipmentCommandResult>>
        HandleInternalWithRepository(
            CreateEquipmentCommand request,
            CancellationToken cancellationToken)
    {
        // Collect events to publish
        var integrationEventsToPublish = new List<IIntegrationEvent>();

        // Create aggregate (may raise domain events)
        var equipment = Equipment.Create(
            request.Name,
            request.Category,
            timeProvider.GetUtcNow());

        this.Repository.Add(equipment);

        // Prepare integration event.
        // GetUtcNow() returns DateTimeOffset; the event's CreatedAt is a DateTime.
        integrationEventsToPublish.Add(new EquipmentCreatedIntegrationEvent(
            equipment.Id,
            equipment.Name,
            equipment.Category,
            equipment.Status,
            timeProvider.GetUtcNow().UtcDateTime));

        // Save database changes (domain events dispatched first)
        var result = await this.Repository.UnitOfWork.SaveEntitiesAsync(cancellationToken);

        if (result.IsSuccess)
        {
            // Publish events AFTER successful save (does not wait for subscribers)
            foreach (var integrationEvent in integrationEventsToPublish)
            {
                await integrationEventPublisher.PublishAsync(
                    integrationEvent,
                    cancellationToken);
            }

            return CommandResult<CreateEquipmentCommandResult>.Succeeded(
                new CreateEquipmentCommandResult(equipment.Id));
        }

        return CommandResult<CreateEquipmentCommandResult>.Failed(result.Error!);
    }
}

Publishing pattern:

  1. Collect events to publish in a list
  2. Perform all domain logic and SaveChanges
  3. Only if SaveChanges succeeds, publish integration events
  4. Events persisted to CAP queue immediately
  5. Return success to caller (doesn't wait for subscribers)

Integration Event Pattern

Always publish integration events:

  • After SaveChanges succeeds
  • Before returning to caller
  • Using IIntegrationEventPublisher.PublishAsync()
  • In a loop if multiple events

CAP persists events to SQL Server and processes them asynchronously.

Creating CAP Subscribers

Subscribers listen for published events and react accordingly.

Define Subscriber Class

// Location: Infrastructure/EventHandlers/EquipmentCreatedSubscriber.cs
using DotNetCore.CAP;
using Microsoft.Extensions.Logging;
using MyApp.Module.Constants;
using MyApp.Module.Domain.EquipmentAggregate.IntegrationEvents;

namespace MyApp.Module.Infrastructure.EventHandlers;

/// <summary>
/// Handles EquipmentCreatedIntegrationEvent to send notifications.
/// Processes events asynchronously after they're published.
/// </summary>
public sealed class EquipmentCreatedSubscriber : ICapSubscribe
{
    private readonly ILogger<EquipmentCreatedSubscriber> logger;
    private readonly IEmailService emailService;

    public EquipmentCreatedSubscriber(
        ILogger<EquipmentCreatedSubscriber> logger,
        IEmailService emailService)
    {
        this.logger = logger;
        this.emailService = emailService;
    }

    /// <summary>
    /// Handles EquipmentCreatedIntegrationEvent.
    /// Topic name must match event.EventName from module constants.
    /// </summary>
    [CapSubscribe(EquipmentCapTopics.EquipmentCreated)] // Use constant!
    public async Task Handle(
        EquipmentCreatedIntegrationEvent @event,
        CancellationToken cancellationToken)
    {
        this.logger.LogInformation(
            "Processing EquipmentCreated event for {EquipmentId}",
            @event.EquipmentId);

        try
        {
            // Send notification email
            await emailService.SendEquipmentCreatedNotificationAsync(
                @event.Name,
                @event.Category,
                cancellationToken);

            this.logger.LogInformation(
                "Successfully processed EquipmentCreated event for {EquipmentId}",
                @event.EquipmentId);
        }
        catch (Exception ex)
        {
            this.logger.LogError(
                ex,
                "Error processing EquipmentCreated event for {EquipmentId}",
                @event.EquipmentId);

            throw; // Rethrow to trigger CAP retry
        }
    }
}

Subscriber characteristics:

  • Implements ICapSubscribe interface
  • Methods decorated with [CapSubscribe(topic)] attribute
  • Topic references module constant (critical!)
  • Constructor injection for dependencies
  • Async method with CancellationToken
  • Rethrow exceptions to trigger retries

Module Constants Required

Always use module constants in CapSubscribe attribute:

// ✅ CORRECT: Module constant
[CapSubscribe(EquipmentCapTopics.EquipmentCreated)]

// ❌ WRONG: Magic string
[CapSubscribe("equipment.equipment-created")]

Magic strings create duplication and risk typos breaking event delivery.

CAP Library Configuration

SQL Server Storage

CAP stores events in SQL Server for durability:

// Location: Program.cs or Module registration
builder.Services.AddCapIntegrationEvents(
    builder.Configuration.GetConnectionString("PearDrop")!);

This creates CAP tables in your database:

  • CAP.Published - Published events
  • CAP.Received - Received events for subscribers

Retry Configuration

CAP automatically retries failed subscribers:

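// Requires: using DotNetCore.CAP.Messages; (for GetName())
//           using Microsoft.Extensions.DependencyInjection; (for GetRequiredService)
services.AddCap(options =>
{
    options.UseSqlServer(connectionString);

    // Retry configuration
    options.FailedRetryCount = 5;     // Retry up to 5 times
    options.FailedRetryInterval = 60; // 60 seconds between retries
    options.FailedThresholdCallback = failedInfo =>
    {
        // Called once the retry count reaches FailedRetryCount.
        // FailedInfo exposes ServiceProvider, MessageType and Message,
        // so the logger is resolved from the provider CAP passes in.
        var logger = failedInfo.ServiceProvider
            .GetRequiredService<ILogger<Program>>();

        logger.LogError(
            "Event {MessageName} ({MessageType}) failed after {RetryCount} retries",
            failedInfo.Message.GetName(),
            failedInfo.MessageType,
            options.FailedRetryCount);
    };
});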

Retry behavior:

  • Failed messages are retried at the configured FailedRetryInterval (60 seconds in this example)
  • Event marked as failed after max retries
  • FailedThresholdCallback enables custom alerting

Cross-Module Communication

Integration events enable modules to communicate without direct dependencies:

Example: Auth Module → Files Module

Auth module publishes event:

// Auth module: User deleted
public sealed record UserDeletedIntegrationEvent(
    Guid UserId,
    string Email,
    DateTime DeletedAt) : IIntegrationEvent
{
    public string EventName => AuthenticationCapTopics.UserDeleted;
}

// Command handler publishes
if (result.IsSuccess)
{
    await integrationEventPublisher.PublishAsync(
        new UserDeletedIntegrationEvent(userId, email, DateTime.UtcNow),
        cancellationToken);
}

Files module subscribes:

// Files module: Clean up user's files
public sealed class UserDeletedSubscriber : ICapSubscribe
{
    [CapSubscribe(AuthenticationCapTopics.UserDeleted)]
    public async Task Handle(
        UserDeletedIntegrationEvent @event,
        CancellationToken cancellationToken)
    {
        // Delete all files owned by the user
        await fileService.DeleteUserFilesAsync(@event.UserId, cancellationToken);
    }
}

Benefits:

  • Auth module doesn't know about Files module
  • Files module reacts autonomously
  • Decoupled modules
  • Easy to add more subscribers

Idempotency in Subscribers

Subscribers may receive duplicate events (network retries, etc.). Make handlers idempotent:

[CapSubscribe(EquipmentCapTopics.EquipmentCreated)]
public async Task Handle(EquipmentCreatedIntegrationEvent @event, CancellationToken ct)
{
    // Check if already processed
    var exists = await repository.ExistsAsync(@event.EquipmentId, ct);
    if (exists)
    {
        logger.LogInformation("Event already processed for {EquipmentId}", @event.EquipmentId);
        return; // Skip duplicate
    }

    // Process event
    await ProcessEventAsync(@event, ct);
}
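
The existence check above works when the handler creates a record keyed by the event's ID. For handlers whose only effect is a side effect (sending an email, calling a webhook), one option is to track which event keys have already been handled. The following is a minimal in-memory sketch of that idea. ProcessedEventGuard and RunOnceAsync are hypothetical names, not PearDrop or CAP APIs, and a real implementation would most likely persist the keys (for example in the module's database) so that they survive restarts and are shared across instances.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical in-memory guard (illustration only, not part of PearDrop or CAP).
public sealed class ProcessedEventGuard
{
    private readonly ConcurrentDictionary<string, byte> processed = new();

    // Runs the handler only the first time a given key is seen.
    // Returns true if the handler ran, false if the key was a duplicate.
    public async Task<bool> RunOnceAsync(string key, Func<Task> handler)
    {
        // TryAdd is atomic, so concurrent duplicates cannot both pass.
        if (!processed.TryAdd(key, 0))
        {
            return false;
        }

        try
        {
            await handler();
            return true;
        }
        catch
        {
            // Release the key so that a later retry can process the event,
            // then rethrow so the caller still sees the failure.
            processed.TryRemove(key, out _);
            throw;
        }
    }
}
```

A subscriber could build the key from the topic and the event's identifier, for example $"{EquipmentCapTopics.EquipmentCreated}:{@event.EquipmentId}", and wrap its processing call in RunOnceAsync. Because the key is released on failure, rethrowing still lets CAP retry the event.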

Testing Integration Events

Testing Event Publishing

[Fact]
public async Task CreateEquipment_ShouldPublishIntegrationEvent()
{
    // Arrange
    var mockPublisher = new Mock<IIntegrationEventPublisher>();
    var handler = new CreateEquipmentCommandHandler(
        /* ... */,
        mockPublisher.Object,
        timeProvider);

    // Act
    var result = await handler.Handle(command, CancellationToken.None);

    // Assert
    mockPublisher.Verify(
        p => p.PublishAsync(
            It.Is<EquipmentCreatedIntegrationEvent>(e =>
                e.EquipmentId == result.Data!.EquipmentId),
            It.IsAny<CancellationToken>()),
        Times.Once);
}

Testing Subscribers

[Fact]
public async Task Handle_ShouldSendEmail()
{
    // Arrange
    var mockEmailService = new Mock<IEmailService>();
    var subscriber = new EquipmentCreatedSubscriber(logger, mockEmailService.Object);
    var evt = new EquipmentCreatedIntegrationEvent(
        Guid.NewGuid(), "Test", "Tools", EquipmentStatus.Available, DateTime.UtcNow);

    // Act
    await subscriber.Handle(evt, CancellationToken.None);

    // Assert
    mockEmailService.Verify(
        s => s.SendEquipmentCreatedNotificationAsync(
            "Test",
            "Tools",
            It.IsAny<CancellationToken>()),
        Times.Once);
}

Generating Integration Events with CLI

# Generate integration event with subscriber
peardrop add integration-event EquipmentCreated \
  --aggregate Equipment \
  --properties "EquipmentId:Guid,Name:string,Category:string"

# Generated files:
# - Domain/EquipmentAggregate/IntegrationEvents/EquipmentCreatedIntegrationEvent.cs
# - Infrastructure/EventHandlers/EquipmentCreatedSubscriber.cs
# - Constants/EquipmentCapTopics.cs (updated or created)

# Cross-library subscriber (different project)
peardrop add integration-event EquipmentCreated \
  --aggregate Equipment \
  --handler-project ../OtherModule

# Event only (no subscriber)
peardrop add integration-event EquipmentCreated \
  --aggregate Equipment \
  --no-handler

CLI Scaffolding

The CLI generates:

  • Event record implementing IIntegrationEvent
  • Module constants class with topic name
  • CAP subscriber implementing ICapSubscribe
  • Proper [CapSubscribe(constant)] attribute
  • Logger injection and error handling

No manual configuration needed - CAP auto-discovers ICapSubscribe implementations.

Best Practices

✅ DO

  • Use module constants for topics

    [CapSubscribe(EquipmentCapTopics.EquipmentCreated)]  // ✅
  • Publish after SaveChanges succeeds

    if (result.IsSuccess)
    {
        await integrationEventPublisher.PublishAsync(evt, ct);
    }
  • Rethrow exceptions to trigger retries

    catch (Exception ex)
    {
        logger.LogError(ex, "Processing failed");
        throw; // CAP will retry
    }
  • Make subscribers idempotent

    if (await repository.ExistsAsync(id)) return;  // Skip duplicate
  • Include all necessary data in events

    public sealed record EventName(
        Guid Id,
        string Name, // Include data subscribers need
        DateTime Timestamp) : IIntegrationEvent;

❌ DON'T

  • Don't use magic strings in CapSubscribe

    [CapSubscribe("equipment-created")]  // ❌ Use constant
  • Don't publish before SaveChanges

    await publisher.PublishAsync(evt);  // ❌ Too early
    await repository.SaveChangesAsync(); // Event published even if this fails
  • Don't swallow exceptions

    catch (Exception ex)
    {
        logger.LogError(ex, "...");
        // Missing throw - CAP won't retry ❌
    }
  • Don't block on subscriber processing

    • Subscribers process asynchronously
    • Don't expect immediate completion
  • Don't create circular event chains

    • Module A publishes → Module B subscribes and publishes → Module A subscribes
    • Can cause infinite loops

Monitoring CAP Events

CAP Dashboard (Optional)

Enable CAP's web dashboard to monitor events:

services.AddCap(options =>
{
    options.UseDashboard(); // Enable dashboard at /cap
});

Access at https://yourapp/cap to view:

  • Published events
  • Received events
  • Failed events and errors
  • Retry status

Logging

CAP logs all event processing:

logger.LogInformation("Published {EventName}", @event.EventName);
logger.LogInformation("Received {EventName}", notification.EventName);
logger.LogError("Failed processing {EventName}: {Error}", eventName, ex.Message);

Summary

  • Eventual Consistency: Integration events are published after SaveChanges succeeds
  • CAP Library: Durable delivery with SQL Server storage
  • Module Constants: Always use constants for topic names
  • Non-Blocking: Cannot roll back the originating operation
  • Asynchronous: Subscribers process independently with retries
  • Cross-Module: Decouple modules via event-driven communication
  • CLI Support: Generate events and subscribers with proper patterns

Integration events enable reliable, decoupled communication across bounded contexts while maintaining system resilience through durable storage and automatic retries.
