What is Event-Driven Architecture?
Event-Driven Architecture (EDA) is a software design pattern in which systems communicate through events instead of calling each other directly, giving the overall system loose coupling and high scalability.
Core Principles
- Event Producer - emits an event when something happens
- Event Consumer - receives events and runs the corresponding business logic
- Event Broker - the intermediary that routes events from producers to consumers (a minimal sketch follows the diagram below)
graph LR
A[Producer] -->|Event| B[Event Broker]
B --> C[Consumer 1]
B --> D[Consumer 2]
B --> E[Consumer N]
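To make these roles concrete, here is a minimal in-memory sketch (not a production broker) of how a producer, a broker, and consumers stay decoupled; the InMemoryBroker class and the handlers are purely illustrative:
// Minimal in-memory broker: producers and consumers only know the broker, never each other
class InMemoryBroker {
  constructor() {
    this.handlers = new Map(); // eventType -> array of handlers
  }

  subscribe(eventType, handler) {
    const list = this.handlers.get(eventType) || [];
    list.push(handler);
    this.handlers.set(eventType, list);
  }

  async publish(event) {
    const list = this.handlers.get(event.eventType) || [];
    await Promise.all(list.map(handler => handler(event)));
  }
}

const broker = new InMemoryBroker();

// Consumers react to the event without knowing who produced it
broker.subscribe('OrderCreated', async (event) => {
  console.log('Send confirmation email for', event.data.orderId);
});
broker.subscribe('OrderCreated', async (event) => {
  console.log('Update sales analytics for', event.data.orderId);
});

// The producer only talks to the broker; it does not know which consumers exist
await broker.publish({ eventType: 'OrderCreated', data: { orderId: 'order-123' } });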
What is an Event?
An event is a piece of data describing something that has happened in the system. Its key characteristics are:
- Immutable - once published, it never changes
- Past Tense - named with past-tense verbs, e.g. “OrderCreated”, “PaymentProcessed”
- Rich in Context - carries enough data for consumers to act without calling back to the producer
Event Structure
// Event Schema
const OrderCreatedEvent = {
eventId: 'uuid-here',
eventType: 'OrderCreated',
aggregateId: 'order-123',
timestamp: '2025-11-01T10:00:00Z',
version: 1,
data: {
orderId: 'order-123',
customerId: 'customer-456',
items: [
{ productId: 'prod-1', quantity: 2, price: 100 },
{ productId: 'prod-2', quantity: 1, price: 50 }
],
totalAmount: 250,
currency: 'THB'
},
metadata: {
source: 'order-service',
correlationId: 'req-789',
userId: 'user-999'
}
};
Event Sourcing
Event Sourcing is a technique that stores the sequence of events instead of storing only the current state.
Traditional vs Event Sourcing
// Traditional Approach - store only the current state
const account = {
id: 'acc-123',
balance: 1000,
lastUpdated: '2025-11-01T10:00:00Z'
};
// Event Sourcing - store the events themselves
const events = [
{ type: 'AccountOpened', amount: 0, timestamp: '2025-01-01T00:00:00Z' },
{ type: 'MoneyDeposited', amount: 1500, timestamp: '2025-10-15T14:30:00Z' },
{ type: 'MoneyWithdrawn', amount: 500, timestamp: '2025-11-01T10:00:00Z' }
];
// Rebuild State from Events
const balance = events.reduce((acc, event) => {
switch (event.type) {
case 'AccountOpened': return 0;
case 'MoneyDeposited': return acc + event.amount;
case 'MoneyWithdrawn': return acc - event.amount;
default: return acc;
}
}, 0); // balance = 1000
Event Store Implementation
class EventStore {
constructor(database) {
this.db = database;
}
async saveEvents(aggregateId, expectedVersion, events) {
const transaction = await this.db.beginTransaction();
try {
// Check version for optimistic locking
const currentVersion = await this.getVersion(aggregateId);
if (currentVersion !== expectedVersion) {
throw new Error('Concurrency conflict');
}
// Save events inside the transaction so a failure rolls everything back
for (let i = 0; i < events.length; i++) {
const event = {
...events[i],
aggregateId,
version: expectedVersion + i + 1,
timestamp: new Date().toISOString()
};
// Assumes the transaction handle exposes query(); adjust to your database client
await transaction.query(`
INSERT INTO events (aggregate_id, version, event_type, data, timestamp)
VALUES ($1, $2, $3, $4, $5)
`, [aggregateId, event.version, event.eventType, JSON.stringify(event.data), event.timestamp]);
}
await transaction.commit();
// Publish the saved events to the message broker
// (publishEvents is assumed to be implemented elsewhere, e.g. delegating to an event bus like the Kafka EventBus below)
await this.publishEvents(events);
} catch (error) {
await transaction.rollback();
throw error;
}
}
async getEvents(aggregateId, fromVersion = 0) {
const result = await this.db.query(`
SELECT * FROM events
WHERE aggregate_id = $1 AND version > $2
ORDER BY version ASC
`, [aggregateId, fromVersion]);
return result.rows.map(row => ({
eventId: row.id,
eventType: row.event_type,
aggregateId: row.aggregate_id,
version: row.version,
data: JSON.parse(row.data),
timestamp: row.timestamp
}));
}
async getVersion(aggregateId) {
const result = await this.db.query(`
SELECT MAX(version) as version FROM events WHERE aggregate_id = $1
`, [aggregateId]);
return result.rows[0].version || 0;
}
}
CQRS (Command Query Responsibility Segregation)
CQRS separates Commands (writes) from Queries (reads) into independent models.
Command Side (Write Model)
class OrderAggregate {
constructor(id) {
this.id = id;
this.version = 0;
this.events = [];
this.items = [];
this.status = null;
}
// Command Handler
createOrder(customerId, items) {
if (this.status !== null) {
throw new Error('Order already exists');
}
const totalAmount = items.reduce((sum, item) => sum + (item.price * item.quantity), 0);
this.addEvent({
eventType: 'OrderCreated',
data: {
customerId,
items,
totalAmount,
status: 'PENDING'
}
});
}
confirmPayment(paymentId, amount) {
if (this.status !== 'PENDING') {
throw new Error('Cannot confirm payment for non-pending order');
}
this.addEvent({
eventType: 'PaymentConfirmed',
data: { paymentId, amount }
});
}
addEvent(event) {
this.events.push(event);
this.apply(event);
}
// Event Handler (Apply)
apply(event) {
switch (event.eventType) {
case 'OrderCreated':
this.items = event.data.items;
this.status = 'PENDING';
break;
case 'PaymentConfirmed':
this.status = 'CONFIRMED';
break;
}
}
getUncommittedEvents() {
return this.events;
}
markEventsAsCommitted() {
this.events = [];
}
}
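To show how this write model plugs into the EventStore from earlier, here is a hedged sketch of a command handler; the CreateOrderHandler name and the load-replay-save flow are assumptions for illustration, not part of the original code:
// Command handler sketch: load past events, rebuild the aggregate, execute the command, save new events
class CreateOrderHandler {
  constructor(eventStore) {
    this.eventStore = eventStore; // the EventStore class shown above
  }

  async handle(command) {
    const { orderId, customerId, items } = command;

    // Rebuild current state from the event history
    const history = await this.eventStore.getEvents(orderId);
    const order = new OrderAggregate(orderId);
    history.forEach(event => order.apply(event));

    // Execute the command; the aggregate validates and records new events
    order.createOrder(customerId, items);

    // Persist with optimistic concurrency; history.length equals the stored version
    // because EventStore assigns contiguous versions starting at 1
    await this.eventStore.saveEvents(orderId, history.length, order.getUncommittedEvents());
    order.markEventsAsCommitted();
  }
}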
Query Side (Read Model)
class OrderProjection {
constructor(database) {
this.db = database;
}
// Event Handlers for Read Model
async handleOrderCreated(event) {
const { customerId, items, totalAmount, status } = event.data;
await this.db.query(`
INSERT INTO order_view (id, customer_id, total_amount, status, created_at)
VALUES ($1, $2, $3, $4, $5)
`, [event.aggregateId, customerId, totalAmount, status, event.timestamp]);
// Update customer statistics
await this.updateCustomerStats(customerId, totalAmount);
}
async handlePaymentConfirmed(event) {
await this.db.query(`
UPDATE order_view
SET status = 'CONFIRMED', confirmed_at = $2
WHERE id = $1
`, [event.aggregateId, event.timestamp]);
}
// Query Methods
async getOrderById(orderId) {
const result = await this.db.query(`
SELECT * FROM order_view WHERE id = $1
`, [orderId]);
return result.rows[0];
}
async getOrdersByCustomer(customerId, limit = 10) {
const result = await this.db.query(`
SELECT * FROM order_view
WHERE customer_id = $1
ORDER BY created_at DESC
LIMIT $2
`, [customerId, limit]);
return result.rows;
}
async getOrderStats() {
const result = await this.db.query(`
SELECT
COUNT(*) as total_orders,
SUM(total_amount) as total_revenue,
AVG(total_amount) as average_order_value
FROM order_view
WHERE status = 'CONFIRMED'
`);
return result.rows[0];
}
}
Message Brokers and Event Streaming
Apache Kafka Implementation
const { Kafka } = require('kafkajs');
class EventBus {
constructor() {
this.kafka = new Kafka({
clientId: 'order-service',
brokers: ['localhost:9092']
});
this.producer = this.kafka.producer();
this.consumer = this.kafka.consumer({ groupId: 'order-group' });
}
async connect() {
// kafkajs requires both the producer and the consumer to be connected before use
await this.producer.connect();
await this.consumer.connect();
}
async publish(eventType, data, options = {}) {
await this.producer.send({
topic: options.topic || 'events',
messages: [{
key: data.aggregateId,
value: JSON.stringify({
eventType,
data,
timestamp: new Date().toISOString(),
...options
}),
headers: {
eventType,
source: 'order-service'
}
}]
});
}
async subscribe(eventTypes, handler) {
await this.consumer.subscribe({ topic: 'events' });
await this.consumer.run({
eachMessage: async ({ message }) => {
const event = JSON.parse(message.value.toString());
if (eventTypes.includes(event.eventType)) {
try {
await handler(event);
console.log(`Processed event: ${event.eventType}`);
} catch (error) {
console.error(`Error processing event: ${event.eventType}`, error);
// Send to DLQ (Dead Letter Queue)
await this.sendToDeadLetterQueue(event, error);
}
}
}
});
}
async sendToDeadLetterQueue(event, error) {
await this.producer.send({
topic: 'dlq',
messages: [{
value: JSON.stringify({
originalEvent: event,
error: error.message,
timestamp: new Date().toISOString()
})
}]
});
}
}
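A possible way to wire this EventBus into a service is sketched below; the handler body and the event payload are illustrative, and connect() refers to the helper shown in the class above:
const eventBus = new EventBus();
await eventBus.connect(); // connect the producer and consumer before publishing/consuming

// Consumer side: react to selected event types from the 'events' topic
await eventBus.subscribe(['OrderCreated', 'PaymentConfirmed'], async (event) => {
  console.log('Handling', event.eventType, event.data);
});

// Producer side: publish an event after local state has been committed
await eventBus.publish('OrderCreated', {
  aggregateId: 'order-123',
  customerId: 'customer-456',
  totalAmount: 250
});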
Redis Streams Implementation
const redis = require('redis');
class RedisEventBus {
constructor() {
// Note: with node-redis v4 the client must be connected (await this.client.connect()) before use
this.client = redis.createClient();
this.subscribers = new Map();
}
async publish(stream, event) {
await this.client.xAdd(stream, '*', event);
}
async subscribe(stream, consumerGroup, consumerName, handler) {
try {
// Create consumer group if not exists
await this.client.xGroupCreate(stream, consumerGroup, '0', {
MKSTREAM: true
});
} catch (error) {
// Ignore "group already exists" errors (Redis replies BUSYGROUP), rethrow anything else
if (!error.message.includes('BUSYGROUP')) throw error;
}
// Start consuming
while (true) {
try {
const messages = await this.client.xReadGroup(
consumerGroup,
consumerName,
[{ key: stream, id: '>' }],
{ COUNT: 10, BLOCK: 1000 }
);
for (const message of messages || []) {
for (const entry of message.messages) {
try {
await handler(entry.message);
// Acknowledge message
await this.client.xAck(stream, consumerGroup, entry.id);
} catch (error) {
console.error('Error processing message:', error);
// Handle retry logic here
}
}
}
} catch (error) {
console.error('Error reading from stream:', error);
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
}
}
Saga Pattern in an Event-Driven System
A saga coordinates a multi-step business transaction as a chain of events across services; when a step fails, compensating events undo the work that earlier steps already completed, as the OrderSaga below illustrates.
class OrderSaga {
constructor(eventBus) {
this.eventBus = eventBus;
this.setupEventHandlers();
}
setupEventHandlers() {
this.eventBus.subscribe(['OrderCreated'], this.handleOrderCreated.bind(this));
this.eventBus.subscribe(['PaymentProcessed'], this.handlePaymentProcessed.bind(this));
this.eventBus.subscribe(['PaymentFailed'], this.handlePaymentFailed.bind(this));
this.eventBus.subscribe(['InventoryReserved'], this.handleInventoryReserved.bind(this));
this.eventBus.subscribe(['InventoryReservationFailed'], this.handleInventoryReservationFailed.bind(this));
}
async handleOrderCreated(event) {
const { orderId, customerId, totalAmount } = event.data;
// Step 1: Process Payment
await this.eventBus.publish('ProcessPayment', {
orderId,
customerId,
amount: totalAmount
});
}
async handlePaymentProcessed(event) {
const { orderId, paymentId } = event.data;
// Step 2: Reserve Inventory
await this.eventBus.publish('ReserveInventory', {
orderId,
paymentId,
items: event.data.items
});
}
async handlePaymentFailed(event) {
const { orderId, reason } = event.data;
// Compensate: Cancel Order
await this.eventBus.publish('CancelOrder', {
orderId,
reason: `Payment failed: ${reason}`
});
}
async handleInventoryReserved(event) {
const { orderId } = event.data;
// Step 3: Complete Order
await this.eventBus.publish('CompleteOrder', {
orderId,
status: 'COMPLETED'
});
}
async handleInventoryReservationFailed(event) {
const { orderId, paymentId } = event.data;
// Compensate: Refund Payment
await this.eventBus.publish('RefundPayment', {
orderId,
paymentId,
reason: 'Inventory not available'
});
}
}
Event Store Projections
class ProjectionEngine {
constructor(eventStore, projections, database) {
this.eventStore = eventStore;
this.projections = projections;
this.db = database; // used below to store projection checkpoints
this.checkpoints = new Map();
}
async start() {
for (const projection of this.projections) {
this.processProjection(projection);
}
}
async processProjection(projection) {
let lastCheckpoint = await this.getCheckpoint(projection.name);
while (true) {
try {
const events = await this.eventStore.getEventsSince(lastCheckpoint, 100);
if (events.length === 0) {
await this.sleep(1000);
continue;
}
for (const event of events) {
if (projection.eventTypes.includes(event.eventType)) {
await projection.handler(event);
}
lastCheckpoint = event.id;
}
await this.saveCheckpoint(projection.name, lastCheckpoint);
} catch (error) {
console.error(`Projection ${projection.name} failed:`, error);
await this.sleep(5000);
}
}
}
async getCheckpoint(projectionName) {
// Load from database
const result = await this.db.query(`
SELECT checkpoint FROM projections WHERE name = $1
`, [projectionName]);
return result.rows[0]?.checkpoint || 0;
}
async saveCheckpoint(projectionName, checkpoint) {
await this.db.query(`
INSERT INTO projections (name, checkpoint, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (name)
DO UPDATE SET checkpoint = $2, updated_at = NOW()
`, [projectionName, checkpoint]);
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// Usage
const projectionEngine = new ProjectionEngine(eventStore, [
{
name: 'order-summary',
eventTypes: ['OrderCreated', 'OrderCompleted', 'OrderCancelled'],
handler: async (event) => {
// Update order summary view
await orderSummaryProjection.handle(event);
}
},
{
name: 'customer-stats',
eventTypes: ['OrderCompleted'],
handler: async (event) => {
// Update customer statistics
await customerStatsProjection.handle(event);
}
}
]);
await projectionEngine.start();
Error Handling and Resilience
Dead Letter Queue
class ResilientEventHandler {
constructor(eventBus, maxRetries = 3) {
this.eventBus = eventBus;
this.maxRetries = maxRetries;
}
async handleEvent(event, handler) {
let attempt = 0;
while (attempt < this.maxRetries) {
try {
await handler(event);
return; // Success
} catch (error) {
attempt++;
if (attempt >= this.maxRetries) {
// Send to Dead Letter Queue
await this.sendToDeadLetterQueue(event, error);
throw new Error(`Failed after ${this.maxRetries} attempts: ${error.message}`);
}
// Exponential backoff
const delay = Math.pow(2, attempt) * 1000;
await this.sleep(delay);
}
}
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
async sendToDeadLetterQueue(event, error) {
await this.eventBus.publish('dlq', {
originalEvent: event,
error: error.message,
stack: error.stack,
timestamp: new Date().toISOString(),
retryCount: this.maxRetries
});
}
}
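One way to combine this wrapper with the subscription loop, assuming eventBus is the Kafka EventBus from earlier and orderProjection is an instance of the OrderProjection read model:
const resilient = new ResilientEventHandler(eventBus, 3);

// Wrap the real handler so each event gets retries with backoff, then the DLQ on repeated failure
await eventBus.subscribe(['OrderCreated'], (event) =>
  resilient.handleEvent(event, async (e) => {
    await orderProjection.handleOrderCreated(e);
  })
);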
Event Replay
class EventReplay {
constructor(eventStore, eventBus) {
this.eventStore = eventStore;
this.eventBus = eventBus;
}
async replayEvents(fromTimestamp, toTimestamp, eventTypes = []) {
console.log(`Replaying events from ${fromTimestamp} to ${toTimestamp}`);
const events = await this.eventStore.getEventsByTimeRange(
fromTimestamp,
toTimestamp,
eventTypes
);
for (const event of events) {
try {
// Add replay metadata
event.metadata = {
...event.metadata,
isReplay: true,
originalTimestamp: event.timestamp,
replayTimestamp: new Date().toISOString()
};
await this.eventBus.publish(event.eventType, event.data, {
metadata: event.metadata
});
console.log(`Replayed event: ${event.eventType} (${event.id})`);
} catch (error) {
console.error(`Failed to replay event ${event.id}:`, error);
}
}
}
async rebuildProjection(projectionName, fromDate = null) {
// Clear existing projection data
await this.clearProjection(projectionName);
// Replay all relevant events
const events = await this.eventStore.getAllEvents(fromDate);
for (const event of events) {
await this.processEventForProjection(projectionName, event);
}
}
}
Monitoring and Observability
class EventMetrics {
constructor() {
this.metrics = {
eventsPublished: 0,
eventsConsumed: 0,
eventsByType: {},
processingTimes: [],
errors: []
};
}
recordEventPublished(eventType) {
this.metrics.eventsPublished++;
this.metrics.eventsByType[eventType] =
(this.metrics.eventsByType[eventType] || 0) + 1;
}
recordEventProcessed(eventType, processingTime) {
this.metrics.eventsConsumed++;
this.metrics.processingTimes.push({
eventType,
duration: processingTime,
timestamp: Date.now()
});
}
recordError(eventType, error) {
this.metrics.errors.push({
eventType,
error: error.message,
timestamp: Date.now()
});
}
getMetrics() {
const avgProcessingTime = this.metrics.processingTimes.length > 0
? this.metrics.processingTimes.reduce((sum, t) => sum + t.duration, 0) / this.metrics.processingTimes.length
: 0;
return {
...this.metrics,
averageProcessingTime: Math.round(avgProcessingTime),
errorRate: this.metrics.eventsConsumed > 0
? this.metrics.errors.length / this.metrics.eventsConsumed
: 0
};
}
}
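A hedged sketch of how these metrics might wrap a consumer handler; the withMetrics helper and the reporting interval are assumptions:
const metrics = new EventMetrics();

// Wrap any event handler so consumption counts, latency, and errors are recorded
function withMetrics(handler) {
  return async (event) => {
    const startedAt = Date.now();
    try {
      await handler(event);
      metrics.recordEventProcessed(event.eventType, Date.now() - startedAt);
    } catch (error) {
      metrics.recordError(event.eventType, error);
      throw error; // let the resilience layer decide about retries/DLQ
    }
  };
}

// Report periodically, e.g. to logs or a /metrics endpoint
setInterval(() => console.log(metrics.getMetrics()), 60000);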
Best Practices
1. Event Design
// ❌ Poor Event Design
const badEvent = {
type: 'Update', // vague - not a specific, past-tense event name
data: { id: '123' } // missing the context consumers need
};
// ✅ Good Event Design
const goodEvent = {
eventId: 'uuid-v4',
eventType: 'CustomerAddressUpdated', // specific, past tense
aggregateId: 'customer-123',
version: 5,
timestamp: '2025-11-01T10:00:00Z',
data: {
customerId: 'customer-123',
oldAddress: { /* complete old address */ },
newAddress: { /* complete new address */ },
updatedBy: 'user-456'
},
metadata: {
correlationId: 'req-789',
causationId: 'cmd-101',
source: 'customer-service'
}
};
2. Idempotency
class IdempotentEventHandler {
constructor(database) {
this.db = database; // used below to persist processed event ids
this.processedEvents = new Set();
}
async handle(event) {
// Check if already processed
if (this.processedEvents.has(event.eventId)) {
console.log(`Event ${event.eventId} already processed, skipping`);
return;
}
try {
await this.processEvent(event);
// Mark as processed
this.processedEvents.add(event.eventId);
await this.saveProcessedEvent(event.eventId);
} catch (error) {
// Don't mark as processed if failed
throw error;
}
}
async saveProcessedEvent(eventId) {
await this.db.query(`
INSERT INTO processed_events (event_id, processed_at)
VALUES ($1, NOW())
ON CONFLICT (event_id) DO NOTHING
`, [eventId]);
}
}
3. Event Versioning
class VersionedEventHandler {
constructor() {
this.handlers = {
'CustomerCreated': {
v1: this.handleCustomerCreatedV1.bind(this),
v2: this.handleCustomerCreatedV2.bind(this)
}
};
}
async handle(event) {
// Events carry numeric versions (see the event schema above); map them onto the 'v1'/'v2' handler keys
const version = `v${event.version || 1}`;
const handler = this.handlers[event.eventType]?.[version];
if (!handler) {
throw new Error(`No handler for ${event.eventType} version ${version}`);
}
await handler(event);
}
async handleCustomerCreatedV1(event) {
// Handle old format
const { name, email } = event.data;
// Process...
}
async handleCustomerCreatedV2(event) {
// Handle new format with additional fields
const { firstName, lastName, email, phoneNumber } = event.data;
// Process...
}
}
Summary
Event-Driven Architecture is a powerful pattern for modern systems, offering:
- Loose Coupling - services are not tightly bound to one another
- Scalability - each part scales out on demand
- Resilience - the system tolerates failures in individual components
- Auditability - the full history of what happened is recorded
- Real-time Processing - events are processed as they occur
Using Event Sourcing and CQRS together lets you build highly efficient and flexible systems, but be mindful of the added complexity and the need to manage eventual consistency!