Event-Driven Architecture - Systems That React to Events

What Is Event-Driven Architecture?

Event-Driven Architecture (EDA) is a software design pattern in which systems communicate through events instead of calling each other directly. This keeps components loosely coupled and lets them scale independently.

Core Principles

  1. Event Producer - emits an event when something happens
  2. Event Consumer - receives events and runs the business logic for them
  3. Event Broker - the intermediary that routes events from producers to consumers

graph LR
    A[Producer] -->|Event| B[Event Broker]
    B --> C[Consumer 1]
    B --> D[Consumer 2]
    B --> E[Consumer N]
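
The three roles above can be sketched with a tiny in-process broker. This is only an illustration under simplifying assumptions: `InMemoryBroker`, `subscribe`, and `publish` are hypothetical names, delivery is synchronous, and nothing is persisted or retried.

```javascript
// Minimal in-process broker (hypothetical names, for illustration only).
// Real brokers such as Kafka or Redis Streams add durability and retries.
class InMemoryBroker {
  constructor() {
    this.subscribers = new Map(); // eventType -> array of handlers
  }

  subscribe(eventType, handler) {
    const handlers = this.subscribers.get(eventType) || [];
    handlers.push(handler);
    this.subscribers.set(eventType, handlers);
  }

  publish(event) {
    // The producer only knows the broker, never the individual consumers
    for (const handler of this.subscribers.get(event.eventType) || []) {
      handler(event);
    }
  }
}

const broker = new InMemoryBroker();
const received = [];

// Two independent consumers of the same event type
broker.subscribe('OrderCreated', (e) => received.push(`email:${e.data.orderId}`));
broker.subscribe('OrderCreated', (e) => received.push(`stock:${e.data.orderId}`));

// Producer side: publish once, both consumers are notified
broker.publish({ eventType: 'OrderCreated', data: { orderId: 'order-123' } });
```

Adding a third consumer requires no change to the producer, which is the loose coupling described above.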

What Is an Event?

An event is a record of something that happened in the system. Events have these characteristics:

  • Immutable - cannot be changed once published
  • Past Tense - named with the past form of a verb, e.g. “OrderCreated”, “PaymentProcessed”
  • Rich in Context - carries all the data a consumer needs
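
One way to enforce immutability in JavaScript is to freeze the event when it is created. The sketch below assumes Node.js (for `crypto.randomUUID`); `deepFreeze` and `createEvent` are hypothetical helper names, not part of any library.

```javascript
const { randomUUID } = require('crypto');

// Recursively freeze an object so nested data cannot be changed either
function deepFreeze(value) {
  if (value && typeof value === 'object' && !Object.isFrozen(value)) {
    Object.freeze(value);
    Object.values(value).forEach(deepFreeze);
  }
  return value;
}

// Hypothetical factory: builds an event envelope and freezes it
function createEvent(eventType, aggregateId, data) {
  return deepFreeze({
    eventId: randomUUID(),
    eventType, // past tense, e.g. 'OrderCreated'
    aggregateId,
    timestamp: new Date().toISOString(),
    // Copy the payload so the caller's own object is left unfrozen
    data: JSON.parse(JSON.stringify(data))
  });
}

const orderCreated = createEvent('OrderCreated', 'order-123', {
  customerId: 'customer-456',
  items: [{ productId: 'prod-1', quantity: 2, price: 100 }]
});
```

After this, an assignment such as `orderCreated.data.items[0].quantity = 5` has no effect (and throws a `TypeError` in strict mode).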

Event Structure

// Event Schema
const OrderCreatedEvent = {
  eventId: 'uuid-here',
  eventType: 'OrderCreated',
  aggregateId: 'order-123',
  timestamp: '2025-11-01T10:00:00Z',
  version: 1,
  data: {
    orderId: 'order-123',
    customerId: 'customer-456',
    items: [
      { productId: 'prod-1', quantity: 2, price: 100 },
      { productId: 'prod-2', quantity: 1, price: 50 }
    ],
    totalAmount: 250,
    currency: 'THB'
  },
  metadata: {
    source: 'order-service',
    correlationId: 'req-789',
    userId: 'user-999'
  }
};

Event Sourcing

Event Sourcing is a technique that stores the sequence of events instead of storing the current state directly. The current state is derived by replaying those events.

Traditional vs Event Sourcing

// Traditional Approach - store the current state
const account = {
  id: 'acc-123',
  balance: 1000,
  lastUpdated: '2025-11-01T10:00:00Z'
};

// Event Sourcing - store the events
const events = [
  { type: 'AccountOpened', amount: 0, timestamp: '2025-01-01T00:00:00Z' },
  { type: 'MoneyDeposited', amount: 1500, timestamp: '2025-10-15T14:30:00Z' },
  { type: 'MoneyWithdrawn', amount: 500, timestamp: '2025-11-01T10:00:00Z' }
];

// Rebuild State from Events
const balance = events.reduce((acc, event) => {
  switch (event.type) {
    case 'AccountOpened': return 0;
    case 'MoneyDeposited': return acc + event.amount;
    case 'MoneyWithdrawn': return acc - event.amount;
    default: return acc;
  }
}, 0); // balance = 1000
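
Because every change is kept, the same fold can answer questions about the past, which a stored current state cannot. The following is a minimal sketch that reuses the event shapes above; `applyEvent` and `balanceAt` are hypothetical helper names.

```javascript
const accountEvents = [
  { type: 'AccountOpened', amount: 0, timestamp: '2025-01-01T00:00:00Z' },
  { type: 'MoneyDeposited', amount: 1500, timestamp: '2025-10-15T14:30:00Z' },
  { type: 'MoneyWithdrawn', amount: 500, timestamp: '2025-11-01T10:00:00Z' }
];

// Same reducer as above, extracted so it can be reused
function applyEvent(balance, event) {
  switch (event.type) {
    case 'AccountOpened': return 0;
    case 'MoneyDeposited': return balance + event.amount;
    case 'MoneyWithdrawn': return balance - event.amount;
    default: return balance;
  }
}

// Replay only the events up to (and including) a point in time
function balanceAt(events, isoTimestamp) {
  const cutoff = Date.parse(isoTimestamp);
  return events
    .filter((event) => Date.parse(event.timestamp) <= cutoff)
    .reduce(applyEvent, 0);
}

const balanceEndOfOctober = balanceAt(accountEvents, '2025-10-31T23:59:59Z'); // 1500
const balanceAfterWithdrawal = balanceAt(accountEvents, '2025-11-01T10:00:00Z'); // 1000
```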

Event Store Implementation

class EventStore {
  constructor(database) {
    this.db = database;
  }

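  async saveEvents(aggregateId, expectedVersion, events) {
    const transaction = await this.db.beginTransaction();
    const storedEvents = [];

    try {
      // Check version for optimistic locking.
      // A UNIQUE (aggregate_id, version) constraint is still advisable to close
      // the race between this check and the inserts below.
      const currentVersion = await this.getVersion(aggregateId);
      if (currentVersion !== expectedVersion) {
        throw new Error('Concurrency conflict');
      }

      // Save events. Depending on the database client, these statements may
      // need to be issued through the transaction handle rather than this.db.
      for (let i = 0; i < events.length; i++) {
        const event = {
          ...events[i],
          aggregateId,
          version: expectedVersion + i + 1,
          timestamp: new Date().toISOString()
        };

        await this.db.query(`
          INSERT INTO events (aggregate_id, version, event_type, data, timestamp)
          VALUES ($1, $2, $3, $4, $5)
        `, [aggregateId, event.version, event.eventType, JSON.stringify(event.data), event.timestamp]);

        storedEvents.push(event);
      }

      await transaction.commit();
    } catch (error) {
      await transaction.rollback();
      throw error;
    }

    // Publish only after a successful commit, and outside the try block, so a
    // broker failure never triggers a rollback of an already committed transaction.
    // publishEvents is assumed to be implemented elsewhere (e.g. on top of an event bus).
    await this.publishEvents(storedEvents);
  }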

  async getEvents(aggregateId, fromVersion = 0) {
    const result = await this.db.query(`
      SELECT * FROM events 
      WHERE aggregate_id = $1 AND version > $2
      ORDER BY version ASC
    `, [aggregateId, fromVersion]);

    return result.rows.map(row => ({
      eventId: row.id,
      eventType: row.event_type,
      aggregateId: row.aggregate_id,
      version: row.version,
      data: JSON.parse(row.data),
      timestamp: row.timestamp
    }));
  }

  async getVersion(aggregateId) {
    const result = await this.db.query(`
      SELECT MAX(version) as version FROM events WHERE aggregate_id = $1
    `, [aggregateId]);
    
    return result.rows[0].version || 0;
  }
}
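
The optimistic-locking check in `saveEvents` is easier to see without a database. Below is a minimal in-memory sketch; `InMemoryEventStore` and `append` are hypothetical names and this is not a replacement for the SQL-backed store above.

```javascript
// In-memory stand-in for the events table (hypothetical, for illustration)
class InMemoryEventStore {
  constructor() {
    this.streams = new Map(); // aggregateId -> array of stored events
  }

  getVersion(aggregateId) {
    return (this.streams.get(aggregateId) || []).length;
  }

  append(aggregateId, expectedVersion, events) {
    const stream = this.streams.get(aggregateId) || [];
    if (stream.length !== expectedVersion) {
      throw new Error(
        `Concurrency conflict: expected v${expectedVersion}, found v${stream.length}`
      );
    }
    events.forEach((event, i) => {
      stream.push({ ...event, aggregateId, version: expectedVersion + i + 1 });
    });
    this.streams.set(aggregateId, stream);
  }
}

const store = new InMemoryEventStore();

// Two writers both load the aggregate at version 0
const versionSeenByA = store.getVersion('order-123');
const versionSeenByB = store.getVersion('order-123');

store.append('order-123', versionSeenByA, [{ eventType: 'OrderCreated' }]);

let conflict = null;
try {
  // Writer B is now stale, so its write is rejected instead of overwriting A
  store.append('order-123', versionSeenByB, [{ eventType: 'OrderCancelled' }]);
} catch (error) {
  conflict = error.message;
}
```

Writer B would typically reload the aggregate, re-run its command against the new state, and retry.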

CQRS (Command Query Responsibility Segregation)

CQRS separates Commands (writes) from Queries (reads), so each side can use a model suited to its job.

Command Side (Write Model)

class OrderAggregate {
  constructor(id) {
    this.id = id;
    this.version = 0;
    this.events = [];
    this.items = [];
    this.status = null;
  }

  // Command Handler
  createOrder(customerId, items) {
    if (this.status !== null) {
      throw new Error('Order already exists');
    }

    const totalAmount = items.reduce((sum, item) => sum + (item.price * item.quantity), 0);

    this.addEvent({
      eventType: 'OrderCreated',
      data: {
        customerId,
        items,
        totalAmount,
        status: 'PENDING'
      }
    });
  }

  confirmPayment(paymentId, amount) {
    if (this.status !== 'PENDING') {
      throw new Error('Cannot confirm payment for non-pending order');
    }

    this.addEvent({
      eventType: 'PaymentConfirmed',
      data: { paymentId, amount }
    });
  }

  addEvent(event) {
    this.events.push(event);
    this.apply(event);
  }

  // Event Handler (Apply)
  apply(event) {
    switch (event.eventType) {
      case 'OrderCreated':
        this.items = event.data.items;
        this.status = 'PENDING';
        break;
      case 'PaymentConfirmed':
        this.status = 'CONFIRMED';
        break;
    }
  }

  getUncommittedEvents() {
    return this.events;
  }

  markEventsAsCommitted() {
    this.events = [];
  }
}
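
The aggregate above records new events but does not show how it is loaded again. A common approach is to replay the stored events through `apply`. The sketch below uses a reduced stand-in class; `Order` and `fromHistory` are hypothetical names.

```javascript
// Minimal stand-in for the aggregate above, reduced to what rehydration needs
class Order {
  constructor(id) {
    this.id = id;
    this.version = 0;
    this.status = null;
  }

  apply(event) {
    switch (event.eventType) {
      case 'OrderCreated':
        this.status = 'PENDING';
        break;
      case 'PaymentConfirmed':
        this.status = 'CONFIRMED';
        break;
    }
  }

  // Rebuild state by applying stored events in order.
  // Unlike addEvent, this does not record them as uncommitted.
  static fromHistory(id, history) {
    const order = new Order(id);
    for (const event of history) {
      order.apply(event);
      order.version = event.version;
    }
    return order;
  }
}

const history = [
  { eventType: 'OrderCreated', version: 1, data: { customerId: 'customer-456' } },
  { eventType: 'PaymentConfirmed', version: 2, data: { paymentId: 'pay-1', amount: 250 } }
];

const order = Order.fromHistory('order-123', history);
// order.status is 'CONFIRMED' and order.version is 2
```

The resulting `version` is the value to pass as `expectedVersion` when saving the next batch of events.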

Query Side (Read Model)

class OrderProjection {
  constructor(database) {
    this.db = database;
  }

  // Event Handlers for Read Model
  async handleOrderCreated(event) {
    const { customerId, items, totalAmount, status } = event.data;
    
    await this.db.query(`
      INSERT INTO order_view (id, customer_id, total_amount, status, created_at)
      VALUES ($1, $2, $3, $4, $5)
    `, [event.aggregateId, customerId, totalAmount, status, event.timestamp]);

    // Update customer statistics
    await this.updateCustomerStats(customerId, totalAmount);
  }

  async handlePaymentConfirmed(event) {
    await this.db.query(`
      UPDATE order_view 
      SET status = 'CONFIRMED', confirmed_at = $2
      WHERE id = $1
    `, [event.aggregateId, event.timestamp]);
  }

  // Query Methods
  async getOrderById(orderId) {
    const result = await this.db.query(`
      SELECT * FROM order_view WHERE id = $1
    `, [orderId]);
    
    return result.rows[0];
  }

  async getOrdersByCustomer(customerId, limit = 10) {
    const result = await this.db.query(`
      SELECT * FROM order_view 
      WHERE customer_id = $1 
      ORDER BY created_at DESC 
      LIMIT $2
    `, [customerId, limit]);
    
    return result.rows;
  }

  async getOrderStats() {
    const result = await this.db.query(`
      SELECT 
        COUNT(*) as total_orders,
        SUM(total_amount) as total_revenue,
        AVG(total_amount) as average_order_value
      FROM order_view
      WHERE status = 'CONFIRMED'
    `);
    
    return result.rows[0];
  }
}

Message Brokers and Event Streaming

Apache Kafka Implementation

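const { Kafka } = require('kafkajs');

class EventBus {
  constructor() {
    this.kafka = new Kafka({
      clientId: 'order-service',
      brokers: ['localhost:9092']
    });
    
    this.producer = this.kafka.producer();
    this.consumer = this.kafka.consumer({ groupId: 'order-group' });
  }

  // Call once at startup, before publish() or subscribe()
  async connect() {
    await this.producer.connect();
    await this.consumer.connect();
  }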

  async publish(eventType, data, options = {}) {
    await this.producer.send({
      topic: options.topic || 'events',
      messages: [{
        key: data.aggregateId,
        value: JSON.stringify({
          eventType,
          data,
          timestamp: new Date().toISOString(),
          ...options
        }),
        headers: {
          eventType,
          source: 'order-service'
        }
      }]
    });
  }

  async subscribe(eventTypes, handler) {
    await this.consumer.subscribe({ topic: 'events' });
    
    await this.consumer.run({
      eachMessage: async ({ message }) => {
        const event = JSON.parse(message.value.toString());
        
        if (eventTypes.includes(event.eventType)) {
          try {
            await handler(event);
            console.log(`Processed event: ${event.eventType}`);
          } catch (error) {
            console.error(`Error processing event: ${event.eventType}`, error);
            // Send to DLQ (Dead Letter Queue)
            await this.sendToDeadLetterQueue(event, error);
          }
        }
      }
    });
  }

  async sendToDeadLetterQueue(event, error) {
    await this.producer.send({
      topic: 'dlq',
      messages: [{
        value: JSON.stringify({
          originalEvent: event,
          error: error.message,
          timestamp: new Date().toISOString()
        })
      }]
    });
  }
}
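
The mapping between an event and the `{ key, value, headers }` message shape used in `publish` can be kept in two small pure functions, which makes it testable without a broker. The helper names `toKafkaMessage` and `fromKafkaMessage` are hypothetical.

```javascript
// Convert an event envelope to the message shape passed to producer.send
function toKafkaMessage(event) {
  return {
    // With the default partitioner, messages with the same key go to the same
    // partition, so events of one aggregate keep their order
    key: event.aggregateId,
    value: JSON.stringify(event),
    headers: { eventType: event.eventType }
  };
}

// Consumers receive `value` as a Buffer; toString() also works on strings
function fromKafkaMessage(message) {
  return JSON.parse(message.value.toString());
}

const outgoing = toKafkaMessage({
  eventType: 'OrderCreated',
  aggregateId: 'order-123',
  data: { totalAmount: 250 }
});

// Simulate what a consumer would see
const incoming = fromKafkaMessage({ value: Buffer.from(outgoing.value) });
```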

Redis Streams Implementation

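const redis = require('redis');

class RedisEventBus {
  constructor() {
    this.client = redis.createClient();
    this.subscribers = new Map();
  }

  // node-redis v4+ clients must be connected before issuing commands
  async connect() {
    await this.client.connect();
  }

  async publish(stream, event) {
    // Stream entries are flat maps of string fields, so `event` must not be nested
    await this.client.xAdd(stream, '*', event);
  }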

  async subscribe(stream, consumerGroup, consumerName, handler) {
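    try {
      // Create consumer group if not exists
      await this.client.xGroupCreate(stream, consumerGroup, '0', {
        MKSTREAM: true
      });
    } catch (error) {
      // Ignore "group already exists" (BUSYGROUP); surface anything else
      if (!String(error.message).includes('BUSYGROUP')) {
        throw error;
      }
    }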

    // Start consuming
    while (true) {
      try {
        const messages = await this.client.xReadGroup(
          consumerGroup,
          consumerName,
          [{ key: stream, id: '>' }],
          { COUNT: 10, BLOCK: 1000 }
        );

        for (const message of messages || []) {
          for (const entry of message.messages) {
            try {
              await handler(entry.message);
              
              // Acknowledge message
              await this.client.xAck(stream, consumerGroup, entry.id);
            } catch (error) {
              console.error('Error processing message:', error);
              // Handle retry logic here
            }
          }
        }
      } catch (error) {
        console.error('Error reading from stream:', error);
        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    }
  }
}
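
Redis Stream entries are flat field/value maps of strings, so a nested event has to be serialized before `xAdd` and parsed again in the handler. A minimal sketch, with hypothetical helper names `toStreamFields` and `fromStreamFields`:

```javascript
// Flatten an event into string fields suitable for a stream entry
function toStreamFields(event) {
  return {
    eventType: event.eventType,
    aggregateId: event.aggregateId,
    data: JSON.stringify(event.data) // nested payload stored as one JSON string
  };
}

// Rebuild the event from the fields of a stream entry
function fromStreamFields(fields) {
  return {
    eventType: fields.eventType,
    aggregateId: fields.aggregateId,
    data: JSON.parse(fields.data)
  };
}

const fields = toStreamFields({
  eventType: 'OrderCreated',
  aggregateId: 'order-123',
  data: { items: [{ productId: 'prod-1', quantity: 2 }] }
});

const restored = fromStreamFields(fields);
```

With the class above, this would be used as `publish(stream, toStreamFields(event))` and `handler(fromStreamFields(entry.message))`.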

Saga Pattern in an Event-Driven System

class OrderSaga {
  constructor(eventBus) {
    this.eventBus = eventBus;
    this.setupEventHandlers();
  }

  setupEventHandlers() {
    // Register one subscription and dispatch by event type: the Kafka-based
    // EventBus wraps a single consumer, which should only be started once.
    const handlers = {
      OrderCreated: this.handleOrderCreated.bind(this),
      PaymentProcessed: this.handlePaymentProcessed.bind(this),
      PaymentFailed: this.handlePaymentFailed.bind(this),
      InventoryReserved: this.handleInventoryReserved.bind(this),
      InventoryReservationFailed: this.handleInventoryReservationFailed.bind(this)
    };

    this.eventBus
      .subscribe(Object.keys(handlers), (event) => handlers[event.eventType](event))
      .catch((error) => console.error('Saga subscription failed:', error));
  }

  async handleOrderCreated(event) {
    const { orderId, customerId, totalAmount } = event.data;
    
    // Step 1: Process Payment
    await this.eventBus.publish('ProcessPayment', {
      orderId,
      customerId,
      amount: totalAmount
    });
  }

  async handlePaymentProcessed(event) {
    const { orderId, paymentId } = event.data;
    
    // Step 2: Reserve Inventory
    await this.eventBus.publish('ReserveInventory', {
      orderId,
      paymentId,
      items: event.data.items
    });
  }

  async handlePaymentFailed(event) {
    const { orderId, reason } = event.data;
    
    // Compensate: Cancel Order
    await this.eventBus.publish('CancelOrder', {
      orderId,
      reason: `Payment failed: ${reason}`
    });
  }

  async handleInventoryReserved(event) {
    const { orderId } = event.data;
    
    // Step 3: Complete Order
    await this.eventBus.publish('CompleteOrder', {
      orderId,
      status: 'COMPLETED'
    });
  }

  async handleInventoryReservationFailed(event) {
    const { orderId, paymentId } = event.data;
    
    // Compensate: Refund Payment
    await this.eventBus.publish('RefundPayment', {
      orderId,
      paymentId,
      reason: 'Inventory not available'
    });
  }
}
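
The saga above is effectively a state machine: each incoming event maps to the next command, and failure events map to compensating commands. Writing that mapping as a table makes the flow easy to review and test. The table and the state names below are hypothetical; only the event and command names come from the class above.

```javascript
// Incoming event -> command to publish next and resulting saga state
const SAGA_TRANSITIONS = {
  OrderCreated: { command: 'ProcessPayment', state: 'AWAITING_PAYMENT' },
  PaymentProcessed: { command: 'ReserveInventory', state: 'AWAITING_INVENTORY' },
  PaymentFailed: { command: 'CancelOrder', state: 'CANCELLED' },
  InventoryReserved: { command: 'CompleteOrder', state: 'COMPLETED' },
  InventoryReservationFailed: { command: 'RefundPayment', state: 'REFUNDING' }
};

function nextStep(eventType) {
  const step = SAGA_TRANSITIONS[eventType];
  if (!step) {
    throw new Error(`Saga does not handle ${eventType}`);
  }
  return step;
}

// Walk an order whose inventory reservation fails through the saga
const commands = ['OrderCreated', 'PaymentProcessed', 'InventoryReservationFailed']
  .map((eventType) => nextStep(eventType).command);
// ['ProcessPayment', 'ReserveInventory', 'RefundPayment']
```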

Event Store Projections

class ProjectionEngine {
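  constructor(eventStore, projections) {
    this.eventStore = eventStore;
    this.projections = projections;
    // Reuse the event store's connection for checkpoint reads and writes
    this.db = eventStore.db;
  }

  async start() {
    // Each projection runs its own endless loop, so start them concurrently
    // instead of awaiting one before starting the next
    for (const projection of this.projections) {
      this.processProjection(projection).catch((error) => {
        console.error(`Projection ${projection.name} stopped:`, error);
      });
    }
  }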

  async processProjection(projection) {
    let lastCheckpoint = await this.getCheckpoint(projection.name);
    
    while (true) {
      try {
        const events = await this.eventStore.getEventsSince(lastCheckpoint, 100);
        
        if (events.length === 0) {
          await this.sleep(1000);
          continue;
        }

        for (const event of events) {
          if (projection.eventTypes.includes(event.eventType)) {
            await projection.handler(event);
          }
          lastCheckpoint = event.id;
        }

        await this.saveCheckpoint(projection.name, lastCheckpoint);
      } catch (error) {
        console.error(`Projection ${projection.name} failed:`, error);
        await this.sleep(5000);
      }
    }
  }

  async getCheckpoint(projectionName) {
    // Load from database
    const result = await this.db.query(`
      SELECT checkpoint FROM projections WHERE name = $1
    `, [projectionName]);
    
    return result.rows[0]?.checkpoint || 0;
  }

  async saveCheckpoint(projectionName, checkpoint) {
    await this.db.query(`
      INSERT INTO projections (name, checkpoint, updated_at) 
      VALUES ($1, $2, NOW())
      ON CONFLICT (name) 
      DO UPDATE SET checkpoint = $2, updated_at = NOW()
    `, [projectionName, checkpoint]);
  }

  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// Usage
const projectionEngine = new ProjectionEngine(eventStore, [
  {
    name: 'order-summary',
    eventTypes: ['OrderCreated', 'OrderCompleted', 'OrderCancelled'],
    handler: async (event) => {
      // Update order summary view
      await orderSummaryProjection.handle(event);
    }
  },
  {
    name: 'customer-stats',
    eventTypes: ['OrderCompleted'],
    handler: async (event) => {
      // Update customer statistics
      await customerStatsProjection.handle(event);
    }
  }
]);

await projectionEngine.start();
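
The checkpoint logic is the core of the engine: read events after the last checkpoint, handle the relevant ones, then move the checkpoint forward. A single catch-up pass can be sketched in memory as follows; `log`, `checkpoints`, and `catchUp` are hypothetical names, and the database and polling loop are left out.

```javascript
// Global event log, ordered by id
const log = [
  { id: 1, eventType: 'OrderCreated' },
  { id: 2, eventType: 'OrderCompleted' },
  { id: 3, eventType: 'OrderCreated' }
];

const checkpoints = new Map(); // projection name -> last processed event id

// Process at most batchSize events after the projection's checkpoint
function catchUp(projection, batchSize) {
  const checkpoint = checkpoints.get(projection.name) || 0;
  const batch = log.filter((event) => event.id > checkpoint).slice(0, batchSize);

  for (const event of batch) {
    if (projection.eventTypes.includes(event.eventType)) {
      projection.handler(event);
    }
    // Advance even for skipped event types so they are not read again
    checkpoints.set(projection.name, event.id);
  }
  return batch.length;
}

const seen = [];
const projection = {
  name: 'order-summary',
  eventTypes: ['OrderCreated'],
  handler: (event) => seen.push(event.id)
};

catchUp(projection, 2); // processes events 1 and 2
catchUp(projection, 2); // resumes at event 3
// seen is [1, 3]; the checkpoint is 3
```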

Error Handling and Resilience

Dead Letter Queue

class ResilientEventHandler {
  constructor(eventBus, maxRetries = 3) {
    this.eventBus = eventBus;
    this.maxRetries = maxRetries;
  }

  async handleEvent(event, handler) {
    let attempt = 0;
    
    while (attempt < this.maxRetries) {
      try {
        await handler(event);
        return; // Success
      } catch (error) {
        attempt++;
        
        if (attempt >= this.maxRetries) {
          // Send to Dead Letter Queue
          await this.sendToDeadLetterQueue(event, error);
          throw new Error(`Failed after ${this.maxRetries} attempts: ${error.message}`);
        }
        
        // Exponential backoff: 2s after the first failure, then 4s, 8s, ...
        const delay = Math.pow(2, attempt) * 1000;
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }
  }

  async sendToDeadLetterQueue(event, error) {
    await this.eventBus.publish('dlq', {
      originalEvent: event,
      error: error.message,
      stack: error.stack,
      timestamp: new Date().toISOString(),
      retryCount: this.maxRetries
    });
  }
}
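
The delays produced by the retry loop above grow as 2^attempt seconds. With a larger `maxRetries` it is usually worth capping them; the sketch below shows the resulting schedule. `backoffDelay` and the 30-second cap are assumptions for illustration, not part of the handler above.

```javascript
// Delay before the retry that follows failed attempt number `attempt` (1-based),
// using the same 2^attempt formula as above plus a hypothetical upper bound
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000) {
  return Math.min(Math.pow(2, attempt) * baseMs, maxMs);
}

const schedule = [1, 2, 3, 4, 5].map((attempt) => backoffDelay(attempt));
// [2000, 4000, 8000, 16000, 30000]
```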

Event Replay

class EventReplay {
  constructor(eventStore, eventBus) {
    this.eventStore = eventStore;
    this.eventBus = eventBus;
  }

  async replayEvents(fromTimestamp, toTimestamp, eventTypes = []) {
    console.log(`Replaying events from ${fromTimestamp} to ${toTimestamp}`);
    
    const events = await this.eventStore.getEventsByTimeRange(
      fromTimestamp, 
      toTimestamp,
      eventTypes
    );

    for (const event of events) {
      try {
        // Add replay metadata
        event.metadata = {
          ...event.metadata,
          isReplay: true,
          originalTimestamp: event.timestamp,
          replayTimestamp: new Date().toISOString()
        };

        await this.eventBus.publish(event.eventType, event.data, {
          metadata: event.metadata
        });

        console.log(`Replayed event: ${event.eventType} (${event.id})`);
      } catch (error) {
        console.error(`Failed to replay event ${event.id}:`, error);
      }
    }
  }

  async rebuildProjection(projectionName, fromDate = null) {
    // Clear existing projection data
    await this.clearProjection(projectionName);
    
    // Replay all relevant events
    const events = await this.eventStore.getAllEvents(fromDate);
    
    for (const event of events) {
      await this.processEventForProjection(projectionName, event);
    }
  }
}
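
Replayed events reach the same consumers as live ones, so handlers with external side effects should check the `isReplay` flag that `replayEvents` adds to the metadata. A minimal sketch with a hypothetical consumer, where `emailsSent` stands in for a real email call:

```javascript
const view = new Map();
const emailsSent = [];

function handleOrderCreated(event) {
  // Updating the read model is safe to repeat on replay
  view.set(event.data.orderId, { status: 'PENDING' });

  // External side effects should happen for live events only
  if (!event.metadata || !event.metadata.isReplay) {
    emailsSent.push(event.data.orderId);
  }
}

handleOrderCreated({ data: { orderId: 'order-1' }, metadata: {} });
handleOrderCreated({ data: { orderId: 'order-2' }, metadata: { isReplay: true } });
// view has both orders; emailsSent is ['order-1']
```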

Monitoring and Observability

class EventMetrics {
  constructor() {
    this.metrics = {
      eventsPublished: 0,
      eventsConsumed: 0,
      eventsByType: {},
      processingTimes: [],
      errors: []
    };
  }

  recordEventPublished(eventType) {
    this.metrics.eventsPublished++;
    this.metrics.eventsByType[eventType] = 
      (this.metrics.eventsByType[eventType] || 0) + 1;
  }

  recordEventProcessed(eventType, processingTime) {
    this.metrics.eventsConsumed++;
    this.metrics.processingTimes.push({
      eventType,
      duration: processingTime,
      timestamp: Date.now()
    });
  }

  recordError(eventType, error) {
    this.metrics.errors.push({
      eventType,
      error: error.message,
      timestamp: Date.now()
    });
  }

  getMetrics() {
    const avgProcessingTime = this.metrics.processingTimes.length > 0
      ? this.metrics.processingTimes.reduce((sum, t) => sum + t.duration, 0) / this.metrics.processingTimes.length
      : 0;

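    return {
      ...this.metrics,
      averageProcessingTime: Math.round(avgProcessingTime),
      // Guard against division by zero before any event has been consumed
      errorRate: this.metrics.eventsConsumed > 0
        ? this.metrics.errors.length / this.metrics.eventsConsumed
        : 0
    };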
  }
}
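
An average can hide slow outliers, so it may help to report percentiles of the recorded durations as well. The sketch below uses the nearest-rank method; `percentile` is a hypothetical helper and the durations are made-up sample values.

```javascript
// Nearest-rank percentile over recorded durations (in milliseconds)
function percentile(durations, p) {
  if (durations.length === 0) return 0;
  const sorted = [...durations].sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.max(rank, 1) - 1];
}

// Sample data: nine fast events and one slow one
const durations = [12, 15, 11, 14, 13, 900, 16, 12, 14, 13];

const avg = durations.reduce((sum, d) => sum + d, 0) / durations.length; // 102
const p50 = percentile(durations, 50); // 13
const p95 = percentile(durations, 95); // 900
```

Here the average (102 ms) describes neither the typical event (13 ms) nor the slow one (900 ms), while the two percentiles show both.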

Best Practices

1. Event Design

// ❌ Poor Event Design
const badEvent = {
  type: 'Update', // ambiguous: does not say what happened
  data: { id: '123' } // incomplete: consumers must look up the rest
};

// ✅ Good Event Design
const goodEvent = {
  eventId: 'uuid-v4',
  eventType: 'CustomerAddressUpdated', // clear, past tense
  aggregateId: 'customer-123',
  version: 5,
  timestamp: '2025-11-01T10:00:00Z',
  data: {
    customerId: 'customer-123',
    oldAddress: { /* complete old address */ },
    newAddress: { /* complete new address */ },
    updatedBy: 'user-456'
  },
  metadata: {
    correlationId: 'req-789',
    causationId: 'cmd-101',
    source: 'customer-service'
  }
};

2. Idempotency

class IdempotentEventHandler {
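  constructor(database) {
    this.db = database;
    // In-memory cache only; the processed_events table is the durable record
    this.processedEvents = new Set();
  }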

  async handle(event) {
    // Check if already processed
    if (this.processedEvents.has(event.eventId)) {
      console.log(`Event ${event.eventId} already processed, skipping`);
      return;
    }

    try {
      await this.processEvent(event);
      
      // Mark as processed
      this.processedEvents.add(event.eventId);
      await this.saveProcessedEvent(event.eventId);
    } catch (error) {
      // Don't mark as processed if failed
      throw error;
    }
  }

  async saveProcessedEvent(eventId) {
    await this.db.query(`
      INSERT INTO processed_events (event_id, processed_at) 
      VALUES ($1, NOW())
      ON CONFLICT (event_id) DO NOTHING
    `, [eventId]);
  }
}
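
The effect of the check is easiest to see with a duplicate delivery. The following in-memory sketch uses hypothetical names (`handleDeposit`, `processed`); a real handler would keep the processed ids in a durable store, as the class above does.

```javascript
const processed = new Set();
let balance = 0;

// Returns true if the event was applied, false if it was a duplicate
function handleDeposit(event) {
  if (processed.has(event.eventId)) {
    return false;
  }
  balance += event.data.amount;
  processed.add(event.eventId);
  return true;
}

const deposit = {
  eventId: 'evt-1',
  eventType: 'MoneyDeposited',
  data: { amount: 1500 }
};

const first = handleDeposit(deposit);  // true
const second = handleDeposit(deposit); // false: redelivery is ignored
// balance is 1500, not 3000
```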

3. Event Versioning

class VersionedEventHandler {
  constructor() {
    this.handlers = {
      'CustomerCreated': {
        v1: this.handleCustomerCreatedV1.bind(this),
        v2: this.handleCustomerCreatedV2.bind(this)
      }
    };
  }

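  async handle(event) {
    // Handlers are keyed 'v1', 'v2', ... so normalize a numeric version (e.g. 2)
    // to that form. This must be the schema version of the payload, not the
    // aggregate sequence number that an event store may also call "version".
    const raw = event.version ?? 1;
    const version = typeof raw === 'number' ? `v${raw}` : raw;
    const handler = this.handlers[event.eventType]?.[version];
    
    if (!handler) {
      throw new Error(`No handler for ${event.eventType} version ${version}`);
    }

    await handler(event);
  }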

  async handleCustomerCreatedV1(event) {
    // Handle old format
    const { name, email } = event.data;
    // Process...
  }

  async handleCustomerCreatedV2(event) {
    // Handle new format with additional fields
    const { firstName, lastName, email, phoneNumber } = event.data;
    // Process...
  }
}
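
An alternative to keeping one handler per version is upcasting: old events are converted to the newest shape before they reach a single handler. This is a different technique from the class above, sketched under the assumption that a v1 `name` can be split on whitespace; `upcastCustomerCreated` is a hypothetical name.

```javascript
// Convert a v1 CustomerCreated payload to the v2 shape; pass v2 through unchanged
function upcastCustomerCreated(event) {
  if (event.version !== 'v1') {
    return event;
  }
  const [firstName, ...rest] = event.data.name.trim().split(/\s+/);
  return {
    ...event,
    version: 'v2',
    data: {
      firstName,
      lastName: rest.join(' '),
      email: event.data.email,
      phoneNumber: null // not present in v1
    }
  };
}

const v1Event = {
  eventType: 'CustomerCreated',
  version: 'v1',
  data: { name: 'Somchai Jaidee', email: 'somchai@example.com' }
};

const upcasted = upcastCustomerCreated(v1Event);
// upcasted.data is { firstName: 'Somchai', lastName: 'Jaidee', email: ..., phoneNumber: null }
```

The stored v1 event is never modified; the conversion happens on read, which keeps events immutable.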

Summary

Event-Driven Architecture is a powerful pattern for modern systems. Its main benefits are:

  1. Loose Coupling - components do not depend on each other directly
  2. Scalability - scales with demand
  3. Resilience - tolerates failures
  4. Auditability - provides a complete audit trail
  5. Real-time Processing - events are processed as they occur

Combining Event Sourcing with CQRS helps build efficient, highly flexible systems, but watch out for the added complexity and the need to handle eventual consistency.