By Appropri8 Team

Event-Driven Resilience: Building Robust Systems with Event Sourcing & Stateful Workflows

event-drivenevent-sourcingresiliencemicroservicesworkflowsarchitecturedistributed-systems

Modern distributed systems are complex. They span multiple services, handle millions of requests, and need to work even when things go wrong. Traditional request-response architectures struggle with this complexity. They’re brittle, hard to monitor, and difficult to recover from failures.

Event-driven architecture with event sourcing and stateful workflows offers a better way. Instead of trying to keep everything in sync, you let events tell the story of what happened. You build systems that can handle failures gracefully and recover automatically.

This approach isn’t new, but it’s gaining traction as systems get more complex. Companies are using it to handle everything from order processing to financial transactions. The key is understanding how to implement it properly.

The Problem with Traditional Architectures

Most systems start simple. You have a web app that talks to a database. When a user places an order, you update the database and send a confirmation email. This works fine for small systems.

But as you scale, problems emerge. What happens when the email service is down? What if the payment processor takes too long? How do you handle partial failures?

Traditional architectures try to solve this with transactions. You wrap everything in a database transaction and hope it works. But distributed transactions are slow, complex, and often fail. The two-phase commit protocol doesn’t scale.

You end up with systems that are hard to debug. When something goes wrong, you don’t know what state each service is in. You can’t easily replay what happened. You can’t handle long-running processes that span multiple services.

The Event-Driven Solution

Event-driven architecture flips this on its head. Instead of trying to keep everything in sync, you let events tell the story. When something happens, you publish an event. Other services listen to these events and react accordingly.

This approach has several benefits:

Resilience: If one service fails, others can continue working. Events are queued and processed when the service comes back online.

Auditability: Every event is logged. You can see exactly what happened and when.

Scalability: Services can process events independently. You can scale each service based on its workload.

Flexibility: New services can listen to existing events without changing existing code.

Event sourcing takes this further. Instead of storing the current state, you store all the events that led to that state. You can reconstruct any point in time by replaying events.

Understanding Event Sourcing

Event sourcing is simple in concept but powerful in practice. Instead of storing the current state of an object, you store all the events that changed that object.

Think about a bank account. Instead of storing the current balance, you store all the deposits and withdrawals. To get the current balance, you add up all the deposits and subtract all the withdrawals.

This might seem inefficient, but it has huge benefits:

Complete audit trail: You can see every change that ever happened.

Time travel: You can reconstruct the state at any point in time.

Debugging: When something goes wrong, you can see exactly what events led to the problem.

Compliance: Many industries require complete audit trails.

Flexibility: You can create new views of the data by processing events differently.

Here’s how it works in practice:

// Event definitions
interface OrderCreated {
  type: 'OrderCreated';
  orderId: string;
  customerId: string;
  items: OrderItem[];
  timestamp: Date;
}

interface PaymentProcessed {
  type: 'PaymentProcessed';
  orderId: string;
  amount: number;
  paymentId: string;
  timestamp: Date;
}

interface OrderShipped {
  type: 'OrderShipped';
  orderId: string;
  trackingNumber: string;
  timestamp: Date;
}

type OrderEvent = OrderCreated | PaymentProcessed | OrderShipped;

// Aggregate that handles events
class OrderAggregate {
  private events: OrderEvent[] = [];
  private orderId: string;
  private customerId: string;
  private items: OrderItem[];
  private status: OrderStatus;
  private paymentId?: string;
  private trackingNumber?: string;

  constructor(orderId: string) {
    this.orderId = orderId;
    this.status = OrderStatus.PENDING;
  }

  // Replay events to reconstruct state
  static fromEvents(events: OrderEvent[]): OrderAggregate {
    const order = new OrderAggregate(events[0].orderId);
    events.forEach(event => order.apply(event));
    return order;
  }

  // Apply an event to change state
  private apply(event: OrderEvent): void {
    this.events.push(event);

    switch (event.type) {
      case 'OrderCreated':
        this.customerId = event.customerId;
        this.items = event.items;
        this.status = OrderStatus.PENDING;
        break;
      case 'PaymentProcessed':
        this.paymentId = event.paymentId;
        this.status = OrderStatus.PAID;
        break;
      case 'OrderShipped':
        this.trackingNumber = event.trackingNumber;
        this.status = OrderStatus.SHIPPED;
        break;
    }
  }

  // Command methods that generate events
  createOrder(customerId: string, items: OrderItem[]): void {
    if (this.status !== OrderStatus.PENDING) {
      throw new Error('Order already exists');
    }

    const event: OrderCreated = {
      type: 'OrderCreated',
      orderId: this.orderId,
      customerId,
      items,
      timestamp: new Date()
    };

    this.apply(event);
  }

  processPayment(amount: number, paymentId: string): void {
    if (this.status !== OrderStatus.PENDING) {
      throw new Error('Order not in pending state');
    }

    const event: PaymentProcessed = {
      type: 'PaymentProcessed',
      orderId: this.orderId,
      amount,
      paymentId,
      timestamp: new Date()
    };

    this.apply(event);
  }

  shipOrder(trackingNumber: string): void {
    if (this.status !== OrderStatus.PAID) {
      throw new Error('Order not paid');
    }

    const event: OrderShipped = {
      type: 'OrderShipped',
      orderId: this.orderId,
      trackingNumber,
      timestamp: new Date()
    };

    this.apply(event);
  }

  // Get current state
  getState(): OrderState {
    return {
      orderId: this.orderId,
      customerId: this.customerId,
      items: this.items,
      status: this.status,
      paymentId: this.paymentId,
      trackingNumber: this.trackingNumber
    };
  }

  // Get all events for persistence
  getEvents(): OrderEvent[] {
    return [...this.events];
  }
}

Stateful Workflows and Orchestration

Event sourcing handles the “what happened” part. But complex business processes need more. They need to coordinate multiple services over time. They need to handle failures and retries. They need to manage long-running transactions.

This is where stateful workflows come in. A workflow is a state machine that coordinates multiple services to complete a business process.

There are two main approaches: orchestration and choreography.

Orchestration uses a central coordinator (the workflow engine) that tells each service what to do next. It’s like a conductor leading an orchestra.

Choreography lets each service decide what to do based on events. It’s like dancers who know the steps and coordinate themselves.

Both approaches have their place. Orchestration is better for complex processes with many decision points. Choreography is better for simple processes with loose coupling.

Here’s an example of orchestration using a workflow engine:

// Workflow definition
class OrderProcessingWorkflow {
  private workflowId: string;
  private orderId: string;
  private state: WorkflowState;
  private events: WorkflowEvent[] = [];

  constructor(workflowId: string, orderId: string) {
    this.workflowId = workflowId;
    this.orderId = orderId;
    this.state = WorkflowState.STARTED;
  }

  // Handle events and transition state
  async handleEvent(event: WorkflowEvent): Promise<void> {
    this.events.push(event);

    switch (this.state) {
      case WorkflowState.STARTED:
        if (event.type === 'OrderCreated') {
          await this.startPaymentProcess();
          this.state = WorkflowState.PAYMENT_PENDING;
        }
        break;

      case WorkflowState.PAYMENT_PENDING:
        if (event.type === 'PaymentSucceeded') {
          await this.startInventoryReservation();
          this.state = WorkflowState.INVENTORY_PENDING;
        } else if (event.type === 'PaymentFailed') {
          await this.cancelOrder();
          this.state = WorkflowState.CANCELLED;
        }
        break;

      case WorkflowState.INVENTORY_PENDING:
        if (event.type === 'InventoryReserved') {
          await this.startShippingProcess();
          this.state = WorkflowState.SHIPPING_PENDING;
        } else if (event.type === 'InventoryUnavailable') {
          await this.refundPayment();
          await this.cancelOrder();
          this.state = WorkflowState.CANCELLED;
        }
        break;

      case WorkflowState.SHIPPING_PENDING:
        if (event.type === 'OrderShipped') {
          await this.completeOrder();
          this.state = WorkflowState.COMPLETED;
        }
        break;
    }
  }

  // Start payment process
  private async startPaymentProcess(): Promise<void> {
    const paymentCommand = {
      type: 'ProcessPayment',
      orderId: this.orderId,
      amount: await this.getOrderAmount(),
      customerId: await this.getCustomerId()
    };

    await this.publishCommand(paymentCommand);
  }

  // Start inventory reservation
  private async startInventoryReservation(): Promise<void> {
    const inventoryCommand = {
      type: 'ReserveInventory',
      orderId: this.orderId,
      items: await this.getOrderItems()
    };

    await this.publishCommand(inventoryCommand);
  }

  // Start shipping process
  private async startShippingProcess(): Promise<void> {
    const shippingCommand = {
      type: 'ShipOrder',
      orderId: this.orderId,
      customerId: await this.getCustomerId(),
      items: await this.getOrderItems()
    };

    await this.publishCommand(shippingCommand);
  }

  // Compensation actions
  private async refundPayment(): Promise<void> {
    const refundCommand = {
      type: 'RefundPayment',
      orderId: this.orderId,
      paymentId: await this.getPaymentId()
    };

    await this.publishCommand(refundCommand);
  }

  private async cancelOrder(): Promise<void> {
    const cancelCommand = {
      type: 'CancelOrder',
      orderId: this.orderId,
      reason: 'Workflow failure'
    };

    await this.publishCommand(cancelCommand);
  }

  private async completeOrder(): Promise<void> {
    const completeCommand = {
      type: 'CompleteOrder',
      orderId: this.orderId
    };

    await this.publishCommand(completeCommand);
  }

  // Helper methods
  private async getOrderAmount(): Promise<number> {
    // Get from read model or event store
    return 100.00;
  }

  private async getCustomerId(): Promise<string> {
    // Get from read model or event store
    return 'customer-123';
  }

  private async getOrderItems(): Promise<OrderItem[]> {
    // Get from read model or event store
    return [];
  }

  private async getPaymentId(): Promise<string> {
    // Get from events
    const paymentEvent = this.events.find(e => e.type === 'PaymentSucceeded');
    return paymentEvent?.paymentId || '';
  }

  private async publishCommand(command: any): Promise<void> {
    // Publish to command bus
    console.log('Publishing command:', command);
  }
}

// Workflow engine
class WorkflowEngine {
  private workflows: Map<string, OrderProcessingWorkflow> = new Map();

  async startWorkflow(workflowId: string, orderId: string): Promise<void> {
    const workflow = new OrderProcessingWorkflow(workflowId, orderId);
    this.workflows.set(workflowId, workflow);
  }

  async handleEvent(event: WorkflowEvent): Promise<void> {
    const workflow = this.workflows.get(event.workflowId);
    if (workflow) {
      await workflow.handleEvent(event);
    }
  }

  async getWorkflowState(workflowId: string): Promise<WorkflowState | undefined> {
    const workflow = this.workflows.get(workflowId);
    return workflow?.state;
  }
}

Building the Complete System

Now let’s put it all together. Here’s how to build a complete event-driven system with event sourcing and stateful workflows:

// Event store implementation
class EventStore {
  private events: Map<string, DomainEvent[]> = new Map();

  async saveEvents(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
    const existingEvents = this.events.get(aggregateId) || [];
    
    if (existingEvents.length !== expectedVersion) {
      throw new Error('Concurrency conflict');
    }

    const newEvents = [...existingEvents, ...events];
    this.events.set(aggregateId, newEvents);

    // Publish events to event bus
    for (const event of events) {
      await this.publishEvent(event);
    }
  }

  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    return this.events.get(aggregateId) || [];
  }

  private async publishEvent(event: DomainEvent): Promise<void> {
    // Publish to event bus (Kafka, RabbitMQ, etc.)
    console.log('Publishing event:', event);
  }
}

// Command handler
class OrderCommandHandler {
  constructor(
    private eventStore: EventStore,
    private workflowEngine: WorkflowEngine
  ) {}

  async handle(command: CreateOrderCommand): Promise<void> {
    // Load aggregate from events
    const events = await this.eventStore.getEvents(command.orderId);
    const order = OrderAggregate.fromEvents(events);

    // Execute command
    order.createOrder(command.customerId, command.items);

    // Save events
    const newEvents = order.getEvents().slice(events.length);
    await this.eventStore.saveEvents(command.orderId, newEvents, events.length);

    // Start workflow
    await this.workflowEngine.startWorkflow(`order-${command.orderId}`, command.orderId);
  }

  async handle(command: ProcessPaymentCommand): Promise<void> {
    const events = await this.eventStore.getEvents(command.orderId);
    const order = OrderAggregate.fromEvents(events);

    order.processPayment(command.amount, command.paymentId);

    const newEvents = order.getEvents().slice(events.length);
    await this.eventStore.saveEvents(command.orderId, newEvents, events.length);

    // Notify workflow
    await this.workflowEngine.handleEvent({
      type: 'PaymentSucceeded',
      workflowId: `order-${command.orderId}`,
      orderId: command.orderId,
      paymentId: command.paymentId,
      timestamp: new Date()
    });
  }
}

// Event handler for projections
class OrderProjectionHandler {
  private readModel: Map<string, OrderView> = new Map();

  async handle(event: OrderCreated): Promise<void> {
    const view: OrderView = {
      orderId: event.orderId,
      customerId: event.customerId,
      items: event.items,
      status: 'PENDING',
      createdAt: event.timestamp
    };

    this.readModel.set(event.orderId, view);
  }

  async handle(event: PaymentProcessed): Promise<void> {
    const view = this.readModel.get(event.orderId);
    if (view) {
      view.status = 'PAID';
      view.paymentId = event.paymentId;
      view.paidAt = event.timestamp;
    }
  }

  async handle(event: OrderShipped): Promise<void> {
    const view = this.readModel.get(event.orderId);
    if (view) {
      view.status = 'SHIPPED';
      view.trackingNumber = event.trackingNumber;
      view.shippedAt = event.timestamp;
    }
  }

  getOrder(orderId: string): OrderView | undefined {
    return this.readModel.get(orderId);
  }
}

// Main application
class OrderService {
  private commandHandler: OrderCommandHandler;
  private projectionHandler: OrderProjectionHandler;
  private workflowEngine: WorkflowEngine;

  constructor() {
    const eventStore = new EventStore();
    this.workflowEngine = new WorkflowEngine();
    this.commandHandler = new OrderCommandHandler(eventStore, this.workflowEngine);
    this.projectionHandler = new OrderProjectionHandler();
  }

  async createOrder(orderId: string, customerId: string, items: OrderItem[]): Promise<void> {
    const command: CreateOrderCommand = {
      orderId,
      customerId,
      items
    };

    await this.commandHandler.handle(command);
  }

  async processPayment(orderId: string, amount: number, paymentId: string): Promise<void> {
    const command: ProcessPaymentCommand = {
      orderId,
      amount,
      paymentId
    };

    await this.commandHandler.handle(command);
  }

  async getOrder(orderId: string): Promise<OrderView | undefined> {
    return this.projectionHandler.getOrder(orderId);
  }
}

Best Practices and Resilience Patterns

Building resilient event-driven systems requires careful attention to several patterns:

Idempotency

Events can be delivered multiple times. Your handlers must be idempotent - they should produce the same result regardless of how many times they’re called.

class IdempotentEventHandler {
  private processedEvents: Set<string> = new Set();

  async handle(event: DomainEvent): Promise<void> {
    const eventId = `${event.type}-${event.aggregateId}-${event.timestamp.getTime()}`;
    
    if (this.processedEvents.has(eventId)) {
      console.log('Event already processed, skipping');
      return;
    }

    // Process event
    await this.processEvent(event);
    
    // Mark as processed
    this.processedEvents.add(eventId);
  }

  private async processEvent(event: DomainEvent): Promise<void> {
    // Actual event processing logic
  }
}

Event Versioning

Events evolve over time. You need a strategy for handling different versions of the same event.

interface EventVersion {
  version: number;
  eventType: string;
}

class EventVersionHandler {
  async handle(event: any): Promise<void> {
    const version = this.getEventVersion(event);
    
    switch (version) {
      case 1:
        await this.handleV1(event);
        break;
      case 2:
        await this.handleV2(event);
        break;
      default:
        throw new Error(`Unsupported event version: ${version}`);
    }
  }

  private getEventVersion(event: any): number {
    return event.version || 1;
  }

  private async handleV1(event: any): Promise<void> {
    // Handle version 1
  }

  private async handleV2(event: any): Promise<void> {
    // Handle version 2
  }
}

Monitoring and Observability

Event-driven systems need comprehensive monitoring. You need to track event lag, processing times, and failures.

class EventMonitoring {
  private metrics: Map<string, number> = new Map();

  recordEventProcessed(eventType: string, processingTime: number): void {
    const key = `events.${eventType}.processed`;
    this.metrics.set(key, (this.metrics.get(key) || 0) + 1);
    
    const timeKey = `events.${eventType}.processing_time`;
    this.metrics.set(timeKey, processingTime);
  }

  recordEventFailed(eventType: string, error: Error): void {
    const key = `events.${eventType}.failed`;
    this.metrics.set(key, (this.metrics.get(key) || 0) + 1);
    
    console.error(`Event processing failed: ${eventType}`, error);
  }

  recordEventLag(eventType: string, lag: number): void {
    const key = `events.${eventType}.lag`;
    this.metrics.set(key, lag);
  }

  getMetrics(): Map<string, number> {
    return new Map(this.metrics);
  }
}

Dead Letter Queues

When events can’t be processed, they should go to a dead letter queue for manual inspection.

class DeadLetterQueue {
  private failedEvents: FailedEvent[] = [];

  async addFailedEvent(event: DomainEvent, error: Error, retryCount: number): Promise<void> {
    const failedEvent: FailedEvent = {
      event,
      error: error.message,
      retryCount,
      timestamp: new Date()
    };

    this.failedEvents.push(failedEvent);
    
    // In production, this would go to a persistent store
    console.log('Event sent to dead letter queue:', failedEvent);
  }

  async retryEvent(failedEventId: string): Promise<void> {
    const failedEvent = this.failedEvents.find(fe => fe.id === failedEventId);
    if (failedEvent) {
      // Retry the event
      console.log('Retrying event:', failedEvent);
    }
  }
}

When to Adopt Event-Driven Architecture

Event-driven architecture isn’t right for every system. Here are the indicators that it might be a good fit:

Complex business processes: When you have processes that span multiple services and take time to complete.

High failure tolerance requirements: When you need systems that can handle partial failures gracefully.

Audit and compliance needs: When you need complete audit trails of all changes.

Scalability requirements: When you need to scale different parts of your system independently.

Integration complexity: When you have many services that need to communicate.

Adoption Roadmap

If you decide to adopt event-driven architecture, here’s a practical roadmap:

Phase 1: Pilot a Domain

Start with a single business domain that has clear boundaries. Choose something that’s not too complex but has enough moving parts to validate the approach.

Set up the basic infrastructure:

  • Event store (EventStore, Apache Kafka, or similar)
  • Event bus (Kafka, RabbitMQ, or similar)
  • Basic monitoring

Phase 2: Define Events

Work with domain experts to define the events that matter to your business. Focus on business events, not technical events.

Create event schemas and versioning strategies. Set up event contracts between services.

Phase 3: Build Projections

Create read models that serve your queries. Start with simple projections and add complexity as needed.

Implement CQRS patterns to separate read and write models.

Phase 4: Add Workflow Engine

Introduce workflow orchestration for complex processes. Start with simple workflows and add compensation logic.

Use tools like Temporal, Azure Durable Functions, or AWS Step Functions.

Phase 5: Measure and Iterate

Monitor the system closely. Measure event lag, processing times, and failure rates.

Use the data to optimize and improve the system.

Team and Process Considerations

Event-driven architecture requires changes to how teams work:

Event naming: Establish clear naming conventions for events. Use past tense (OrderCreated, not CreateOrder).

Producer/consumer contracts: Define clear contracts between services. Use schema registries to manage event schemas.

Data governance: Establish rules about who can publish what events and who can consume them.

Testing strategies: Test event handlers in isolation. Use event sourcing to create test scenarios.

Monitoring and alerting: Set up comprehensive monitoring for event processing, lag, and failures.

Conclusion

Event-driven architecture with event sourcing and stateful workflows provides a powerful foundation for building resilient distributed systems. It handles failures gracefully, provides complete audit trails, and scales well.

But it’s not a silver bullet. It adds complexity and requires careful design. You need to think about event schemas, versioning, monitoring, and team processes.

The key is to start small and learn as you go. Pick a domain that’s not too complex, set up the basic infrastructure, and gradually add more sophisticated patterns.

The benefits are real. You get systems that can handle failures, scale independently, and provide complete visibility into what’s happening. You get the foundation for building truly resilient applications.

If you’re dealing with complex business processes, high failure tolerance requirements, or the need for complete audit trails, event-driven architecture is worth considering. Start with a pilot project and see how it works for your team.

The future of distributed systems is event-driven. The question isn’t whether to adopt it, but when and how to do it effectively.

Join the Discussion

Have thoughts on this article? Share your insights and engage with the community.