Saga Pattern คืออะไร?
Saga Pattern เป็นรูปแบบการออกแบบสำหรับจัดการ Long-Running Transactions ที่กระจายไปหลายๆ Services ใน Microservices Architecture โดยไม่ใช้ Traditional ACID Transactions
ปัญหาของ Distributed Transactions
ใน Monolith เราสามารถใช้ Database Transaction ธรรมดา:
-- Monolith example: both balance updates run inside one local database
-- transaction, so the COMMIT applies both changes together (or neither).
BEGIN TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;
แต่ใน Microservices แต่ละ Service มี Database ของตัวเอง:
graph LR
A[Order Service] --> B[Order DB]
C[Payment Service] --> D[Payment DB]
E[Inventory Service] --> F[Inventory DB]
G[Shipping Service] --> H[Shipping DB]
Two-Phase Commit (2PC) มีปัญหา:
- Performance: ช้า เพราะต้องรอทุก Service
- Availability: ถ้า Coordinator ล้มระหว่างขั้นตอน Commit ทุก Service ที่เข้าร่วมจะถูกบล็อกและถือ Lock ค้างไว้จนกว่า Coordinator จะกลับมา
- Scalability: ไม่เหมาะกับ Microservices ขนาดใหญ่
Saga คือคำตอบ
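Saga แบ่ง Long-Running Transaction เป็น Local Transactions หลายๆ ตัว แต่ละตัว Commit ใน Database ของ Service นั้นทันที และมี Compensation Action คู่กันไว้สำหรับ "ย้อนผล" เมื่อขั้นตอนถัดไปล้มเหลว — Compensation คือธุรกรรมใหม่ที่หักล้างผลทางธุรกิจ (เช่น คืนเงิน, ยกเลิก Order) ไม่ใช่ Rollback ระดับ Database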
รูปแบบของ Saga
1. Choreography-Based Saga
Services สื่อสารกันผ่าน Events โดยไม่มี Central Controller
// Order Service
/**
 * Choreography participant that owns the order record.
 *
 * Collaborators `orderRepository` and `eventBus` are expected to be assigned
 * on the instance (no constructor is shown in this example).
 */
class OrderService {
  /**
   * Persist a new order in PENDING state and announce it with `OrderCreated`.
   *
   * On any failure an `OrderCreationFailed` event is published and the
   * original error is rethrown.
   *
   * @param {object} orderData - Order fields; `customerId`, `items` and
   *   `totalAmount` of the saved order are forwarded in the event.
   * @returns {Promise<object>} The order returned by `orderRepository.save`.
   * @throws The error raised by `save` or by publishing `OrderCreated`.
   */
  async createOrder(orderData) {
    let order = null;
    try {
      // Local transaction
      order = await this.orderRepository.save({
        ...orderData,
        status: 'PENDING',
      });

      // Publish event
      await this.eventBus.publish('OrderCreated', {
        orderId: order.id,
        customerId: order.customerId,
        items: order.items,
        totalAmount: order.totalAmount,
      });
      return order;
    } catch (error) {
      // If the save succeeded, the persisted order carries the id; orderData.id
      // is only set when the caller supplied one.
      const orderId = order?.id ?? orderData.id;
      try {
        await this.eventBus.publish('OrderCreationFailed', {
          orderId,
          reason: error.message,
        });
      } catch (publishError) {
        // A failing notification must not hide the root cause from the caller.
        console.error('Failed to publish OrderCreationFailed:', publishError);
      }
      throw error;
    }
  }

  /**
   * Compensation handler for `PaymentFailed`: cancel the order and publish
   * `OrderCancelled` with the payment failure reason.
   *
   * @param {{orderId: *, reason: string}} event
   */
  async handlePaymentFailed(event) {
    const { orderId, reason } = event;
    // Compensate: Cancel order
    await this.orderRepository.updateStatus(orderId, 'CANCELLED');
    await this.eventBus.publish('OrderCancelled', {
      orderId,
      reason: `Payment failed: ${reason}`,
    });
  }

  /**
   * Compensation handler for `InventoryReservationFailed`: cancel the order and
   * request a refund, because payment was already processed at this point.
   *
   * @param {{orderId: *}} event
   */
  async handleInventoryReservationFailed(event) {
    const { orderId } = event;
    // Compensate: Cancel order and request payment refund
    await this.orderRepository.updateStatus(orderId, 'CANCELLED');
    await this.eventBus.publish('OrderCancelled', { orderId });
    await this.eventBus.publish('RefundRequested', {
      orderId,
      reason: 'Inventory not available',
    });
  }
}
// Payment Service
/**
 * Choreography participant that charges customers and issues refunds.
 *
 * Relies on `processPayment`, `refundPayment`, `paymentRepository` and
 * `eventBus` being provided on the instance.
 */
class PaymentService {
  /**
   * Reacts to `OrderCreated` by charging the customer.
   * Emits `PaymentProcessed` on success, or `PaymentFailed` if any step
   * (including the publish itself) throws.
   *
   * @param {{orderId: *, customerId: *, totalAmount: number}} event
   */
  async handleOrderCreated(event) {
    const { orderId, customerId, totalAmount } = event;
    try {
      // Local transaction
      const charge = await this.processPayment(customerId, totalAmount);
      const processedEvent = {
        orderId,
        paymentId: charge.id,
        amount: charge.amount,
      };
      await this.eventBus.publish('PaymentProcessed', processedEvent);
    } catch (failure) {
      await this.eventBus.publish('PaymentFailed', { orderId, reason: failure.message });
    }
  }

  /**
   * Reacts to `RefundRequested` by refunding the order's payment, but only
   * when a payment exists and its status is 'COMPLETED'; otherwise it does
   * nothing. Emits `PaymentRefunded` on success or `RefundFailed` on error.
   *
   * @param {{orderId: *}} event
   */
  async handleRefundRequested(event) {
    const { orderId } = event;
    try {
      const payment = await this.paymentRepository.findByOrderId(orderId);
      const isRefundable = Boolean(payment) && payment.status === 'COMPLETED';
      if (!isRefundable) {
        return;
      }
      await this.refundPayment(payment.id);
      const refundedEvent = {
        orderId,
        paymentId: payment.id,
        amount: payment.amount,
      };
      await this.eventBus.publish('PaymentRefunded', refundedEvent);
    } catch (failure) {
      await this.eventBus.publish('RefundFailed', { orderId, reason: failure.message });
    }
  }
}
// Inventory Service
/**
 * Choreography participant that reserves and releases stock.
 *
 * Relies on `getOrderItems`, `reserveItems`, `releaseReservation`,
 * `reservationRepository` and `eventBus` being provided on the instance.
 */
class InventoryService {
  /**
   * Reacts to `PaymentProcessed` by reserving the order's items.
   * Emits `InventoryReserved` on success or `InventoryReservationFailed` if any
   * step throws.
   *
   * @param {{orderId: *}} event
   */
  async handlePaymentProcessed(event) {
    const { orderId } = event;
    try {
      // Get order items (might need to call Order Service or use event data)
      const orderItems = await this.getOrderItems(orderId);
      // Local transaction: Reserve inventory
      const reservation = await this.reserveItems(orderItems);
      await this.eventBus.publish('InventoryReserved', {
        orderId,
        reservationId: reservation.id,
        items: reservation.items,
      });
    } catch (error) {
      await this.eventBus.publish('InventoryReservationFailed', {
        orderId,
        reason: error.message,
      });
    }
  }

  /**
   * Compensation for `OrderCancelled`: release the order's reservation if one
   * exists, then emit `InventoryReleased`.
   *
   * A failed release is reported with `InventoryReleaseFailed` (mirroring
   * `RefundFailed` in the payment service) instead of only being logged, so
   * stock left locked by a failed compensation is visible to the rest of the
   * system.
   *
   * @param {{orderId: *}} event
   */
  async handleOrderCancelled(event) {
    const { orderId } = event;
    try {
      // Compensate: Release reserved inventory
      const reservation = await this.reservationRepository.findByOrderId(orderId);
      if (reservation) {
        await this.releaseReservation(reservation.id);
        await this.eventBus.publish('InventoryReleased', {
          orderId,
          reservationId: reservation.id,
        });
      }
    } catch (error) {
      console.error('Failed to release inventory:', error);
      await this.eventBus.publish('InventoryReleaseFailed', {
        orderId,
        reason: error.message,
      });
    }
  }
}
// Shipping Service
/**
 * Choreography participant that schedules and cancels shipments.
 *
 * Relies on `scheduleShipment`, `cancelShipment`, `shipmentRepository` and
 * `eventBus` being provided on the instance.
 */
class ShippingService {
  /**
   * Reacts to `InventoryReserved` by scheduling a shipment.
   * Emits `ShippingScheduled` on success or `ShippingSchedulingFailed` if any
   * step throws.
   *
   * @param {{orderId: *, items: Array}} event
   */
  async handleInventoryReserved(event) {
    const { orderId, items } = event;
    try {
      // Local transaction: Schedule shipping
      const shipment = await this.scheduleShipment(orderId, items);
      await this.eventBus.publish('ShippingScheduled', {
        orderId,
        shipmentId: shipment.id,
        estimatedDelivery: shipment.estimatedDelivery,
      });
    } catch (error) {
      await this.eventBus.publish('ShippingSchedulingFailed', {
        orderId,
        reason: error.message,
      });
    }
  }

  /**
   * Compensation for `OrderCancelled`: cancel the order's shipment unless its
   * status is already 'SHIPPED', then emit `ShippingCancelled`.
   *
   * A failed cancellation is reported with `ShippingCancellationFailed`
   * (mirroring `RefundFailed` in the payment service) instead of only being
   * logged, so a failed compensation does not go unnoticed.
   *
   * @param {{orderId: *}} event
   */
  async handleOrderCancelled(event) {
    const { orderId } = event;
    try {
      // Compensate: Cancel shipping
      const shipment = await this.shipmentRepository.findByOrderId(orderId);
      if (shipment && shipment.status !== 'SHIPPED') {
        await this.cancelShipment(shipment.id);
        await this.eventBus.publish('ShippingCancelled', {
          orderId,
          shipmentId: shipment.id,
        });
      }
    } catch (error) {
      console.error('Failed to cancel shipping:', error);
      await this.eventBus.publish('ShippingCancellationFailed', {
        orderId,
        reason: error.message,
      });
    }
  }
}
2. Orchestration-Based Saga
มี Saga Orchestrator ที่ควบคุม Workflow
/**
 * Central coordinator that runs the order saga steps in sequence and, on
 * failure, runs compensations for completed steps in reverse order.
 */
class OrderSagaOrchestrator {
  constructor(
    orderService,
    paymentService,
    inventoryService,
    shippingService,
    sagaRepository
  ) {
    this.orderService = orderService;
    this.paymentService = paymentService;
    this.inventoryService = inventoryService;
    this.shippingService = shippingService;
    this.sagaRepository = sagaRepository;
  }

  /**
   * Run the full order saga, persisting the saga after every step.
   *
   * @param {object} orderData - Input passed to `orderService.createOrder`.
   * @returns {Promise<{success: true, orderId: *}>}
   * @throws The error of the first failing step, after compensation has run.
   */
  async processOrder(orderData) {
    const sagaId = this.generateSagaId();
    // Create saga instance
    const saga = new OrderSaga(sagaId, orderData);
    await this.sagaRepository.save(saga);
    try {
      // Step 1: Create Order
      const order = await this.orderService.createOrder(orderData);
      saga.addStep('CREATE_ORDER', order);
      await this.sagaRepository.save(saga);

      // Step 2: Process Payment
      const payment = await this.paymentService.processPayment(
        order.customerId,
        order.totalAmount
      );
      saga.addStep('PROCESS_PAYMENT', payment);
      await this.sagaRepository.save(saga);

      // Step 3: Reserve Inventory
      const reservation = await this.inventoryService.reserveItems(order.items);
      saga.addStep('RESERVE_INVENTORY', reservation);
      await this.sagaRepository.save(saga);

      // Step 4: Schedule Shipping
      const shipment = await this.shippingService.scheduleShipment(order);
      saga.addStep('SCHEDULE_SHIPPING', shipment);
      await this.sagaRepository.save(saga);

      // Mark saga as completed
      saga.complete();
      await this.sagaRepository.save(saga);
      return { success: true, orderId: order.id };
    } catch (error) {
      console.error('Saga failed:', error);
      await this.compensate(saga, error);
      throw error;
    }
  }

  /**
   * Undo every completed step, newest first, then persist the saga.
   *
   * Each compensation's outcome is recorded with `saga.addCompensation`; a
   * failing compensation does not stop the remaining ones.
   *
   * @param {object} saga - Saga exposing markAsFailed/getCompletedSteps/addCompensation.
   * @param {*} error - The failure that triggered compensation.
   */
  async compensate(saga, error) {
    saga.markAsFailed(error?.message ?? String(error));

    // Copy before reversing so the saga's own step list is never reordered.
    const completedSteps = [...saga.getCompletedSteps()].reverse();
    let allCompensated = true;
    for (const step of completedSteps) {
      try {
        await this.executeCompensation(step);
        saga.addCompensation(step.name, { success: true });
      } catch (compensationError) {
        allCompensated = false;
        console.error(`Compensation failed for ${step.name}:`, compensationError);
        saga.addCompensation(step.name, {
          success: false,
          error: compensationError.message,
        });
      }
    }

    if (allCompensated) {
      // Nothing left to undo, so the saga is terminal. Marking it FAILED stops
      // pending-saga queries (which select 'STARTED'/'COMPENSATING') from
      // picking it up again, and makes it visible to status = 'FAILED'
      // reports. If any compensation failed, it stays COMPENSATING so it
      // remains a candidate for recovery.
      saga.status = 'FAILED';
      saga.endTime = new Date();
    }
    await this.sagaRepository.save(saga);
  }

  /**
   * Run the compensating action for one recorded step. Steps with no data, or
   * with no identifier in their data, are skipped.
   *
   * @param {{name: string, data: ?object}} step
   */
  async executeCompensation(step) {
    const data = step.data ?? {};
    switch (step.name) {
      case 'SCHEDULE_SHIPPING':
        if (data.id) {
          await this.shippingService.cancelShipment(data.id);
        }
        break;
      case 'RESERVE_INVENTORY': {
        // The step stores the reservation object itself, whose identifier is
        // `id`; `reservationId` is accepted too for event-shaped data.
        const reservationId = data.id ?? data.reservationId;
        if (reservationId) {
          await this.inventoryService.releaseReservation(reservationId);
        }
        break;
      }
      case 'PROCESS_PAYMENT':
        if (data.id) {
          await this.paymentService.refundPayment(data.id);
        }
        break;
      case 'CREATE_ORDER':
        if (data.id) {
          await this.orderService.cancelOrder(data.id);
        }
        break;
    }
  }

  /** @returns {string} A random v4 UUID from Node's built-in crypto module. */
  generateSagaId() {
    return require('crypto').randomUUID();
  }
}
// Saga State Management
/**
 * In-memory state of a single order saga: the steps that ran, the
 * compensations attempted, timing information and the current status.
 */
class OrderSaga {
  /**
   * @param {string} id - Saga identifier.
   * @param {object} orderData - The input that started the saga.
   */
  constructor(id, orderData) {
    Object.assign(this, {
      id,
      orderData,
      status: 'STARTED',
      steps: [],
      compensations: [],
      startTime: new Date(),
      endTime: null,
      error: null,
    });
  }

  /** Record a step that finished successfully, along with its result data. */
  addStep(name, data) {
    const entry = { name, data, timestamp: new Date(), status: 'COMPLETED' };
    this.steps.push(entry);
  }

  /** Record the outcome of compensating the named step. */
  addCompensation(stepName, result) {
    const entry = { stepName, result, timestamp: new Date() };
    this.compensations.push(entry);
  }

  /** Mark the saga as completed and stamp its end time. */
  complete() {
    this.endTime = new Date();
    this.status = 'COMPLETED';
  }

  /**
   * Enter the COMPENSATING state and remember why the saga failed.
   * @param {string} errorMessage
   */
  markAsFailed(errorMessage) {
    this.error = errorMessage;
    this.status = 'COMPENSATING';
  }

  /** @returns {Array<object>} A new array of the steps whose status is 'COMPLETED'. */
  getCompletedSteps() {
    return this.steps.filter(({ status }) => status === 'COMPLETED');
  }

  /** @returns {number} Milliseconds from start to end (or to now if still running). */
  getDuration() {
    const finishedAt = this.endTime ? this.endTime : new Date();
    return finishedAt.getTime() - this.startTime.getTime();
  }
}
Saga State Management
Database Schema for Orchestration
-- Saga instances
-- One row per saga. `type` receives the saga's class name and `data` a JSON
-- snapshot of the whole saga object (see SagaRepository.save in this document).
-- Status values used in this document: 'STARTED', 'COMPENSATING',
-- 'COMPLETED', 'FAILED'.
-- completed_at is read by the dashboard statistics as the end time
-- (COALESCE(completed_at, NOW())).
CREATE TABLE sagas (
id VARCHAR(36) PRIMARY KEY,
type VARCHAR(50) NOT NULL,
status VARCHAR(20) NOT NULL,
data JSONB NOT NULL,
error_message TEXT,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
completed_at TIMESTAMP
);
-- Saga steps
-- One row per recorded step; the SERIAL id increases in insertion order.
-- NOTE(review): saga_id is nullable and has no ON DELETE rule; consider
-- NOT NULL and ON DELETE CASCADE.
-- NOTE(review): in PostgreSQL NOW() is the transaction start time, so steps
-- inserted in the same transaction share executed_at; confirm ordering needs.
CREATE TABLE saga_steps (
id SERIAL PRIMARY KEY,
saga_id VARCHAR(36) REFERENCES sagas(id),
step_name VARCHAR(100) NOT NULL,
step_data JSONB,
status VARCHAR(20) NOT NULL,
error_message TEXT,
executed_at TIMESTAMP DEFAULT NOW(),
compensated_at TIMESTAMP
);
-- Indexes
-- Status filters are used by the pending-saga lookup and dashboard queries;
-- saga_id supports the sagas/saga_steps join.
CREATE INDEX idx_sagas_status ON sagas(status);
CREATE INDEX idx_sagas_type_status ON sagas(type, status);
CREATE INDEX idx_saga_steps_saga_id ON saga_steps(saga_id);
Saga Repository Implementation
/**
 * Persists sagas in the `sagas` / `saga_steps` tables.
 */
class SagaRepository {
  /**
   * @param {object} database - Must provide `getConnection()` (resolving to a
   *   client with `query(sql, params)` and `release()`) and `query(sql, params)`.
   */
  constructor(database) {
    this.db = database;
  }

  /**
   * Upsert the saga row and append any steps not yet stored, in one transaction.
   *
   * @param {object} saga - Saga with id, status, error, steps and optional endTime.
   * @throws The first database error; the transaction is rolled back.
   */
  async save(saga) {
    const client = await this.db.getConnection();
    try {
      await client.query('BEGIN');

      // Upsert saga. completed_at is filled in from endTime but never cleared.
      await client.query(`
        INSERT INTO sagas (id, type, status, data, error_message, updated_at, completed_at)
        VALUES ($1, $2, $3, $4, $5, NOW(), $6)
        ON CONFLICT (id)
        DO UPDATE SET
          status = $3,
          data = $4,
          error_message = $5,
          updated_at = NOW(),
          completed_at = COALESCE($6, sagas.completed_at)
      `, [
        saga.id,
        saga.constructor.name,
        saga.status,
        JSON.stringify(saga),
        saga.error,
        saga.endTime || null,
      ]);

      // Insert new steps
      const newSteps = await this.collectNewSteps(client, saga);
      for (const step of newSteps) {
        await client.query(`
          INSERT INTO saga_steps (saga_id, step_name, step_data, status)
          VALUES ($1, $2, $3, $4)
        `, [
          saga.id,
          step.name,
          JSON.stringify(step.data),
          step.status,
        ]);
      }

      await client.query('COMMIT');
    } catch (error) {
      try {
        await client.query('ROLLBACK');
      } catch (rollbackError) {
        // Keep the original failure as the thrown error.
        console.error('Failed to roll back saga save:', rollbackError);
      }
      throw error;
    } finally {
      client.release();
    }
  }

  /**
   * Steps of `saga` that are not yet in saga_steps.
   *
   * Uses `saga.getNewSteps()` when the saga provides it. Otherwise it treats
   * steps as append-only: the rows already stored are a prefix of
   * `saga.steps`. The sagas row was upserted earlier in the same transaction,
   * so concurrent saves of the same saga wait on that row before counting.
   */
  async collectNewSteps(client, saga) {
    if (typeof saga.getNewSteps === 'function') {
      return saga.getNewSteps();
    }
    const { rows } = await client.query(
      'SELECT COUNT(*) AS count FROM saga_steps WHERE saga_id = $1',
      [saga.id],
    );
    // COUNT(*) may come back as a string (bigint), hence Number().
    const persistedCount = Number(rows[0] ? rows[0].count : 0);
    return (saga.steps || []).slice(persistedCount);
  }

  /**
   * Load one saga with its steps (ordered by insertion id), or null if absent.
   */
  async findById(sagaId) {
    const result = await this.db.query(`
      SELECT s.*,
        COALESCE(
          json_agg(
            json_build_object(
              'name', st.step_name,
              'data', st.step_data,
              'status', st.status,
              'executed_at', st.executed_at
            ) ORDER BY st.id
          ) FILTER (WHERE st.id IS NOT NULL),
          '[]'
        ) as steps
      FROM sagas s
      LEFT JOIN saga_steps st ON s.id = st.saga_id
      WHERE s.id = $1
      GROUP BY s.id
    `, [sagaId]);

    if (result.rows.length === 0) {
      return null;
    }
    return this.mapToSaga(result.rows[0]);
  }

  /**
   * Oldest sagas that are still STARTED or COMPENSATING.
   * @param {number} [limit=100]
   */
  async findPendingSagas(limit = 100) {
    const result = await this.db.query(`
      SELECT * FROM sagas
      WHERE status IN ('STARTED', 'COMPENSATING')
      ORDER BY created_at ASC
      LIMIT $1
    `, [limit]);
    return result.rows.map((row) => this.mapToSaga(row));
  }

  /**
   * Rebuild an OrderSaga from a `sagas` row.
   *
   * Steps come from the aggregated `steps` column when present (findById).
   * Otherwise they come from the JSON snapshot in `data` (findPendingSagas),
   * so recovery sees the steps that already ran.
   */
  mapToSaga(row) {
    const sagaData = this.parseJsonColumn(row.data) || {};
    const saga = new OrderSaga(row.id, sagaData.orderData);
    saga.status = row.status;
    saga.steps = this.parseJsonColumn(row.steps) || sagaData.steps || [];
    if (Array.isArray(sagaData.compensations)) {
      saga.compensations = sagaData.compensations;
    }
    saga.error = row.error_message;
    if (row.created_at) {
      saga.startTime = new Date(row.created_at);
    }
    if (row.completed_at) {
      saga.endTime = new Date(row.completed_at);
    }
    return saga;
  }

  /**
   * Drivers such as node-postgres return JSON/JSONB columns already parsed;
   * others return text. Accept both.
   */
  parseJsonColumn(value) {
    return typeof value === 'string' ? JSON.parse(value) : value;
  }
}
Error Handling และ Recovery
Retry Mechanism
/**
 * Runs saga steps with retries and exponential backoff.
 */
class ResilientSagaOrchestrator {
  /**
   * @param {object} sagaRepository
   * @param {number} [maxRetries=3] - Retries after the first attempt.
   * @param {number} [baseDelayMs=1000] - Backoff unit; retry k waits 2^k * baseDelayMs.
   */
  constructor(sagaRepository, maxRetries = 3, baseDelayMs = 1000) {
    this.sagaRepository = sagaRepository;
    this.maxRetries = maxRetries;
    this.baseDelayMs = baseDelayMs;
  }

  /**
   * Run `operation` up to `maxRetries + 1` times, then record its result as a
   * step on `saga`.
   *
   * @param {object} saga - Saga exposing `addStep(name, data)`.
   * @param {string} stepName
   * @param {() => Promise<*>} operation
   * @returns {Promise<*>} The operation's result.
   * @throws {Error} After the last failed attempt; `cause` holds the last error.
   */
  async executeStep(saga, stepName, operation) {
    const result = await this.runWithRetries(stepName, operation);
    // Recorded outside the retry loop: a failure here must not re-run an
    // operation that already succeeded.
    saga.addStep(stepName, result);
    return result;
  }

  /** Retry loop used by executeStep. */
  async runWithRetries(stepName, operation) {
    for (let attempt = 0; ; attempt++) {
      try {
        return await operation();
      } catch (error) {
        if (attempt >= this.maxRetries) {
          throw new Error(
            `Step ${stepName} failed after ${this.maxRetries} retries: ${error.message}`,
            { cause: error },
          );
        }
        const retryNumber = attempt + 1;
        // Exponential backoff
        const delay = 2 ** retryNumber * this.baseDelayMs;
        await this.sleep(delay);
        console.log(`Retrying step ${stepName}, attempt ${retryNumber}`);
      }
    }
  }

  sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
Saga Recovery Service
/**
 * Periodically resumes sagas that were left STARTED or COMPENSATING.
 */
class SagaRecoveryService {
  constructor(sagaRepository, orchestrator) {
    this.sagaRepository = sagaRepository;
    this.orchestrator = orchestrator;
  }

  /** Recover every pending saga; one failing saga does not stop the rest. */
  async recoverPendingSagas() {
    const pendingSagas = await this.sagaRepository.findPendingSagas();
    console.log(`Found ${pendingSagas.length} pending sagas`);
    for (const saga of pendingSagas) {
      try {
        await this.recoverSaga(saga);
      } catch (error) {
        console.error(`Failed to recover saga ${saga.id}:`, error);
      }
    }
  }

  /**
   * Continue compensation for COMPENSATING sagas; otherwise resume from the
   * first step not yet completed, or mark the saga complete if none remain.
   */
  async recoverSaga(saga) {
    if (saga.status === 'COMPENSATING') {
      // Continue compensation
      await this.orchestrator.continueCompensation(saga);
      return;
    }
    // Determine next step and continue
    const nextStep = this.determineNextStep(saga);
    if (nextStep) {
      await this.orchestrator.continueFromStep(saga, nextStep);
    } else {
      // All steps completed, mark as done
      saga.complete();
      await this.sagaRepository.save(saga);
    }
  }

  /**
   * @returns {?string} The first step (in saga order) not among the saga's
   *   completed steps, or null when all have completed.
   */
  determineNextStep(saga) {
    const completed = new Set(saga.getCompletedSteps().map((s) => s.name));
    const allSteps = ['CREATE_ORDER', 'PROCESS_PAYMENT', 'RESERVE_INVENTORY', 'SCHEDULE_SHIPPING'];
    const next = allSteps.find((step) => !completed.has(step));
    return next === undefined ? null : next;
  }

  /**
   * Run recovery every `intervalMs`. A tick is skipped while the previous
   * pass is still running, so the same saga is never recovered by two
   * overlapping passes.
   *
   * @param {number} [intervalMs=60000]
   * @returns {*} The interval handle; pass it to `clearInterval` to stop.
   */
  startRecoveryScheduler(intervalMs = 60000) {
    let isRunning = false;
    return setInterval(async () => {
      if (isRunning) {
        return;
      }
      isRunning = true;
      try {
        await this.recoverPendingSagas();
      } catch (error) {
        console.error('Recovery scheduler error:', error);
      } finally {
        isRunning = false;
      }
    }, intervalMs);
  }
}
Monitoring และ Observability
Saga Metrics
/**
 * In-process counters for saga executions, forwarded as events to an optional
 * `global.monitoringClient`.
 */
class SagaMetrics {
  constructor() {
    this.metrics = {
      sagasStarted: 0,
      sagasCompleted: 0,
      sagasFailed: 0,
      compensationsExecuted: 0,
      averageExecutionTime: 0,
      stepExecutions: {},
      errorsByStep: {},
    };
  }

  recordSagaStarted(sagaType) {
    this.metrics.sagasStarted++;
    this.recordEvent('saga_started', { type: sagaType });
  }

  /** @param {number} duration - Execution time (same unit as the running average). */
  recordSagaCompleted(sagaType, duration) {
    this.metrics.sagasCompleted++;
    this.updateAverageExecutionTime(duration);
    this.recordEvent('saga_completed', { type: sagaType, duration });
  }

  /** @param {*} error - Usually an Error; non-Error values are stringified. */
  recordSagaFailed(sagaType, stepName, error) {
    this.metrics.sagasFailed++;
    this.metrics.errorsByStep[stepName] = (this.metrics.errorsByStep[stepName] || 0) + 1;
    this.recordEvent('saga_failed', {
      type: sagaType,
      step: stepName,
      error: error?.message ?? String(error),
    });
  }

  /** Count one executed compensation (previously `compensationsExecuted` was never incremented). */
  recordCompensationExecuted(sagaType, stepName, success = true) {
    this.metrics.compensationsExecuted++;
    this.recordEvent('compensation_executed', { type: sagaType, step: stepName, success });
  }

  /** Track per-step totals, success/failure counts and mean duration. */
  recordStepExecution(stepName, success, duration) {
    if (!this.metrics.stepExecutions[stepName]) {
      this.metrics.stepExecutions[stepName] = {
        total: 0,
        successful: 0,
        failed: 0,
        avgDuration: 0,
      };
    }
    const stepMetric = this.metrics.stepExecutions[stepName];
    stepMetric.total++;
    if (success) {
      stepMetric.successful++;
    } else {
      stepMetric.failed++;
    }
    // Incremental mean: avg_n = avg_{n-1} + (x_n - avg_{n-1}) / n
    stepMetric.avgDuration += (duration - stepMetric.avgDuration) / stepMetric.total;
  }

  /** @returns {string|number} Completed / (completed + failed) as a 2-decimal percentage string, or 0. */
  getSuccessRate() {
    const total = this.metrics.sagasCompleted + this.metrics.sagasFailed;
    return total > 0 ? (this.metrics.sagasCompleted / total * 100).toFixed(2) : 0;
  }

  updateAverageExecutionTime(duration) {
    const total = this.metrics.sagasCompleted;
    if (total <= 0) {
      return;
    }
    this.metrics.averageExecutionTime += (duration - this.metrics.averageExecutionTime) / total;
  }

  recordEvent(type, data) {
    console.log(`[SagaMetrics] ${type}:`, data);
    // Send to monitoring system (Prometheus, DataDog, etc.)
    if (global.monitoringClient) {
      global.monitoringClient.recordEvent(type, data);
    }
  }

  exportMetrics() {
    return {
      ...this.metrics,
      successRate: `${this.getSuccessRate()}%`,
      timestamp: new Date().toISOString(),
    };
  }
}
Saga Dashboard
/**
 * Read-only queries over the saga tables for an operational dashboard.
 */
class SagaDashboard {
  /**
   * @param {object} sagaRepository - Must expose `db.query(sql, params)`.
   * @param {object} metrics - Must expose `exportMetrics()`.
   */
  constructor(sagaRepository, metrics) {
    this.sagaRepository = sagaRepository;
    this.metrics = metrics;
  }

  async getDashboardData() {
    const [
      activeSagas,
      recentFailures,
      statistics,
    ] = await Promise.all([
      this.getActiveSagas(),
      this.getRecentFailures(),
      this.getStatistics(),
    ]);
    return {
      activeSagas,
      recentFailures,
      statistics,
      metrics: this.metrics.exportMetrics(),
    };
  }

  /** Up to 50 STARTED/COMPENSATING sagas, newest first, with step counts. */
  async getActiveSagas() {
    const result = await this.sagaRepository.db.query(`
      SELECT
        s.id,
        s.type,
        s.status,
        s.created_at,
        EXTRACT(EPOCH FROM (NOW() - s.created_at)) as duration_seconds,
        COUNT(st.id) as steps_completed
      FROM sagas s
      LEFT JOIN saga_steps st ON s.id = st.saga_id
      WHERE s.status IN ('STARTED', 'COMPENSATING')
      GROUP BY s.id, s.type, s.status, s.created_at
      ORDER BY s.created_at DESC
      LIMIT 50
    `);
    return result.rows;
  }

  /**
   * Up to 20 FAILED sagas updated within the last `hours`.
   *
   * `hours` is sent as a bound parameter rather than spliced into the SQL
   * text, because it may come from a request.
   *
   * @param {number} [hours=24]
   * @throws {RangeError} If `hours` is not a positive finite number.
   */
  async getRecentFailures(hours = 24) {
    const windowHours = this.parseWindowHours(hours);
    const result = await this.sagaRepository.db.query(`
      SELECT
        s.id,
        s.type,
        s.error_message,
        s.updated_at,
        (
          SELECT step_name
          FROM saga_steps st
          WHERE st.saga_id = s.id
          ORDER BY st.executed_at DESC
          LIMIT 1
        ) as last_step
      FROM sagas s
      WHERE s.status = 'FAILED'
        AND s.updated_at > NOW() - (INTERVAL '1 hour' * $1::float8)
      ORDER BY s.updated_at DESC
      LIMIT 20
    `, [windowHours]);
    return result.rows;
  }

  /**
   * Aggregate counts and mean duration for sagas created within the last `hours`.
   * @param {number} [hours=24]
   */
  async getStatistics(hours = 24) {
    const windowHours = this.parseWindowHours(hours);
    const result = await this.sagaRepository.db.query(`
      SELECT
        COUNT(*) as total_sagas,
        COUNT(*) FILTER (WHERE status = 'COMPLETED') as completed_sagas,
        COUNT(*) FILTER (WHERE status = 'FAILED') as failed_sagas,
        COUNT(*) FILTER (WHERE status IN ('STARTED', 'COMPENSATING')) as active_sagas,
        AVG(EXTRACT(EPOCH FROM (COALESCE(completed_at, NOW()) - created_at))) as avg_duration_seconds
      FROM sagas
      WHERE created_at > NOW() - (INTERVAL '1 hour' * $1::float8)
    `, [windowHours]);
    return result.rows[0];
  }

  /** Validate a time-window argument and return it as a number. */
  parseWindowHours(hours) {
    const value = Number(hours);
    if (!Number.isFinite(value) || value <= 0) {
      throw new RangeError(`hours must be a positive number, got: ${String(hours)}`);
    }
    return value;
  }
}
Best Practices
1. Idempotency
/**
 * Wraps a saga step so that it runs at most once per `stepId`, returning the
 * recorded result on repeated calls.
 *
 * Results are kept in memory and, when a `db` is supplied, in a
 * `step_results` table so the guarantee also holds across restarts.
 * TODO(review): two processes that miss the stored result at the same time
 * can both run the operation; a row lock or insert-first claim is needed for
 * strict cross-process idempotency.
 */
class IdempotentSagaStep {
  /**
   * @param {string} stepId - Idempotency key.
   * @param {(data: *) => Promise<*>} operation - The side-effecting step.
   * @param {?{query: Function}} [db=null] - Optional store exposing `query(sql, params)`.
   */
  constructor(stepId, operation, db = null) {
    this.stepId = stepId;
    this.operation = operation;
    this.db = db;
    this.executedSteps = new Set();
    this.results = new Map();
  }

  /**
   * Run the operation unless this step already ran; otherwise return the
   * recorded result. A failed operation is not recorded, so it may be retried.
   */
  async execute(data) {
    if (this.executedSteps.has(this.stepId)) {
      console.log(`Step ${this.stepId} already executed, skipping`);
      return this.results.has(this.stepId)
        ? this.results.get(this.stepId)
        : await this.getExistingResult(this.stepId);
    }

    // Check durable storage so a restarted process doesn't repeat the step.
    const stored = await this.findStoredResult(this.stepId);
    if (stored.found) {
      console.log(`Step ${this.stepId} already executed, skipping`);
      this.executedSteps.add(this.stepId);
      this.results.set(this.stepId, stored.result);
      return stored.result;
    }

    const result = await this.operation(data);
    // Mark as executed before writing, so a failed write cannot make this
    // instance run the operation a second time.
    this.executedSteps.add(this.stepId);
    this.results.set(this.stepId, result);
    await this.storeStepResult(this.stepId, result);
    return result;
  }

  /** Persist the result for idempotency checks; no-op without a db. */
  async storeStepResult(stepId, result) {
    if (!this.db) {
      return;
    }
    await this.db.query(`
      INSERT INTO step_results (step_id, result, created_at)
      VALUES ($1, $2, NOW())
      ON CONFLICT (step_id) DO NOTHING
    `, [stepId, JSON.stringify(result)]);
  }

  /** @returns {Promise<*>} The stored result, or null if none/no db. */
  async getExistingResult(stepId) {
    const { result } = await this.findStoredResult(stepId);
    return result;
  }

  /**
   * Look up a stored result, distinguishing "not stored" from a stored null.
   * @returns {Promise<{found: boolean, result: *}>}
   */
  async findStoredResult(stepId) {
    if (!this.db) {
      return { found: false, result: null };
    }
    const { rows } = await this.db.query(`
      SELECT result FROM step_results WHERE step_id = $1
    `, [stepId]);
    if (rows.length === 0) {
      return { found: false, result: null };
    }
    const raw = rows[0].result;
    // Text columns hold serialized JSON; JSON/JSONB columns may arrive parsed.
    return { found: true, result: typeof raw === 'string' ? JSON.parse(raw) : (raw ?? null) };
  }
}
2. Timeout Handling
/**
 * Wraps a saga step so it rejects if it does not settle within `timeoutMs`.
 * The wrapped operation itself is not cancelled; only the wait is bounded.
 */
class TimeoutAwareSagaStep {
  /**
   * @param {(data: *) => Promise<*>} operation
   * @param {number} [timeoutMs=30000]
   */
  constructor(operation, timeoutMs = 30000) {
    this.operation = operation;
    this.timeoutMs = timeoutMs;
  }

  /**
   * Race the operation against the timeout. The timer is always cleared
   * afterwards, so a fast operation doesn't leave a pending timer that keeps
   * the process alive.
   *
   * @throws {Error} `Operation timed out after <ms>ms`, or the operation's own error.
   */
  async execute(data) {
    let timer;
    const timeout = this.createTimeoutPromise((handle) => { timer = handle; });
    try {
      return await Promise.race([
        this.operation(data),
        timeout,
      ]);
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * @param {(handle: *) => void} [onTimerCreated] - Receives the timer handle
   *   so the caller can cancel it.
   * @returns {Promise<never>} Rejects after `timeoutMs`.
   */
  createTimeoutPromise(onTimerCreated) {
    return new Promise((_, reject) => {
      const handle = setTimeout(() => {
        reject(new Error(`Operation timed out after ${this.timeoutMs}ms`));
      }, this.timeoutMs);
      if (typeof onTimerCreated === 'function') {
        onTimerCreated(handle);
      }
    });
  }
}
3. Compensation Design
/**
 * Pairs an action with the compensation that undoes it.
 */
class CompensatableAction {
  /**
   * @param {(data: *) => Promise<object>} action - Must resolve to an object.
   * @param {(compensationData: {originalData: *, actionResult: object}) => Promise<*>} compensation
   */
  constructor(action, compensation) {
    this.action = action;
    this.compensation = compensation;
  }

  /**
   * Run the action and attach `compensationData` to its result.
   *
   * The property is non-enumerable: it points back at the result itself, and
   * an enumerable self-reference would make `JSON.stringify(result)` throw
   * (e.g. when the result is persisted as saga step data).
   *
   * @throws {TypeError} If the action resolves to something other than an object.
   */
  async execute(data) {
    const result = await this.action(data);
    if (result === null || (typeof result !== 'object' && typeof result !== 'function')) {
      throw new TypeError(
        'CompensatableAction: action must resolve to an object so compensation data can be attached',
      );
    }
    // Store compensation data
    Object.defineProperty(result, 'compensationData', {
      value: {
        originalData: data,
        actionResult: result,
      },
      enumerable: false,
      writable: true,
      configurable: true,
    });
    return result;
  }

  /**
   * Invoke the compensation with the data captured by `execute`.
   * @throws {Error} 'No compensation data available' if `result` lacks it (including null).
   */
  async compensate(result) {
    if (!result?.compensationData) {
      throw new Error('No compensation data available');
    }
    return await this.compensation(result.compensationData);
  }
}
// Usage
const paymentAction = new CompensatableAction(
  // Action: charge the customer
  async (data) => {
    return await paymentService.processPayment(data.customerId, data.amount);
  },
  // Compensation: refund the charge. The payment record's identifier is `id`
  // (as used for PaymentProcessed.paymentId and refundPayment elsewhere).
  async (compensationData) => {
    const { actionResult } = compensationData;
    return await paymentService.refundPayment(actionResult.id);
  }
);
เปรียบเทียบ Choreography vs Orchestration
Choreography
ข้อดี:
- ✅ Loose Coupling
- ✅ High Scalability
- ✅ No Single Point of Failure
- ✅ Event-Driven Architecture
ข้อเสีย:
- ❌ Complex Debugging
- ❌ Hard to Track Flow
- ❌ Cyclic Dependencies Risk
- ❌ Eventually Consistent
Orchestration
ข้อดี:
- ✅ Clear Control Flow
- ✅ Easy to Debug
- ✅ Centralized Monitoring
- ✅ Better Error Handling
ข้อเสีย:
- ❌ Tighter Coupling
- ❌ Single Point of Failure
- ❌ Performance Bottleneck
- ❌ More Complex Implementation
สรุป
Saga Pattern เป็นเครื่องมือที่สำคัญสำหรับการจัดการ Distributed Transactions ใน Microservices:
- เลือกรูปแบบให้เหมาะสม: Choreography สำหรับ Simple Flow, Orchestration สำหรับ Complex Flow
- ออกแบบ Compensation ให้ดี: ทุก Action ต้องมี Compensation ที่ใช้งานได้
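- จัดการ Idempotency: ทุก Operation (รวมถึง Compensation) ต้องเรียกซ้ำได้อย่างปลอดภัย โดยให้ผลลัพธ์เหมือนการเรียกเพียงครั้งเดียว เพราะ Event อาจถูกส่งซ้ำ และ Recovery อาจรันขั้นตอนเดิมอีกครั้ง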
- Monitor และ Observe: ติดตาม Saga State และ Performance
- Plan for Failures: เตรียมพร้อมสำหรับ Recovery และ Rollback
Saga Pattern ช่วยให้เราสร้าง Resilient Microservices ที่สามารถจัดการกับความซับซ้อนของ Distributed Systems ได้อย่างมีประสิทธิภาพ!