Transaction คืออะไร?
Transaction เป็นหน่วยงานที่เล็กที่สุดของการดำเนินงานกับฐานข้อมูลที่สามารถ Commit (บันทึก) หรือ Rollback (ยกเลิก) ได้เป็นหน่วยเดียว โดย Transaction จะต้องมีคุณสมบัติ ACID ครบถ้วน
ACID Properties คืออะไร?
A - Atomicity (ความเป็นหน่วยเดียว)
- Transaction จะต้องเป็น “all or nothing”
- ถ้าส่วนใดส่วนหนึ่งล้มเหลว ทั้งหมดจะต้องถูกยกเลิก
BEGIN TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
-- ถ้าการ UPDATE ใดการหนึ่งล้มเหลว ทั้งคู่จะถูก Rollback
COMMIT;
C - Consistency (ความสอดคล้อง)
- ฐานข้อมูลจะต้องอยู่ในสถานะที่ถูกต้องเสมอ
- ข้อมูลต้องผ่าน Business Rules และ Constraints
I - Isolation (การแยกตัว)
- Transaction แต่ละตัวจะต้องไม่รบกวนกัน
- มี Isolation Levels ต่างๆ ให้เลือก
D - Durability (ความคงทน)
- เมื่อ Commit แล้วข้อมูลจะต้องถาวร แม้ระบบจะขัดข่าย
Isolation Levels
1. Read Uncommitted
- อ่านข้อมูลที่ยังไม่ได้ Commit
- มีปัญหา Dirty Read
2. Read Committed (Default ในหลายระบบ)
- อ่านได้เฉพาะข้อมูลที่ Commit แล้ว
- แต่ยังมีปัญหา Non-Repeatable Read
3. Repeatable Read
- การอ่านซ้ำจะได้ผลลัพธ์เหมือนเดิม
- แต่ยังมีปัญหา Phantom Read
4. Serializable
- Isolation Level สูงสุด
- ป้องกันปัญหาทั้งหมด แต่ Performance ต่ำที่สุด
-- ตั้ง Isolation Level
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
BEGIN TRANSACTION;
-- operations here
COMMIT;
การจัดการ Transaction ในภาษาต่างๆ
Node.js with PostgreSQL
const { Client } = require('pg');
class TransactionManager {
constructor(client) {
this.client = client;
}
async executeTransaction(operations) {
await this.client.query('BEGIN');
try {
const results = [];
for (const operation of operations) {
const result = await this.client.query(operation.query, operation.params);
results.push(result);
}
await this.client.query('COMMIT');
return results;
} catch (error) {
await this.client.query('ROLLBACK');
throw error;
}
}
}
// การใช้งาน
const transferMoney = async (fromAccount, toAccount, amount) => {
const operations = [
{
query: 'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
params: [amount, fromAccount]
},
{
query: 'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
params: [amount, toAccount]
}
];
return await transactionManager.executeTransaction(operations);
};
Python with SQLAlchemy
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
class TransactionManager:
def __init__(self, engine):
self.Session = sessionmaker(bind=engine)
@contextmanager
def transaction_scope(self):
session = self.Session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
# การใช้งาน
def transfer_money(from_account_id, to_account_id, amount):
with transaction_manager.transaction_scope() as session:
from_account = session.query(Account).filter_by(id=from_account_id).first()
to_account = session.query(Account).filter_by(id=to_account_id).first()
if from_account.balance < amount:
raise ValueError("Insufficient balance")
from_account.balance -= amount
to_account.balance += amount
Distributed Transactions
เมื่อต้องทำ Transaction ข้าม Database หลายตัว จะต้องใช้ Two-Phase Commit (2PC)
Two-Phase Commit Protocol
Phase 1: Prepare Phase
- Coordinator ส่ง “Prepare” ไปยัง Participants ทั้งหมด
- แต่ละ Participant เตรียมพร้อมและตอบกลับ “Yes” หรือ “No”
Phase 2: Commit Phase
- ถ้าทุกคนตอบ “Yes” → Coordinator ส่ง “Commit”
- ถ้าใครตอบ “No” → Coordinator ส่ง “Abort”
class DistributedTransactionManager {
constructor(databases) {
this.databases = databases;
}
async executeDistributedTransaction(operations) {
// Phase 1: Prepare
const prepared = [];
try {
for (let i = 0; i < this.databases.length; i++) {
await this.databases[i].query('BEGIN');
await this.databases[i].query(operations[i].query, operations[i].params);
prepared.push(i);
}
// Phase 2: Commit
for (const db of this.databases) {
await db.query('COMMIT');
}
} catch (error) {
// Rollback prepared transactions
for (const index of prepared) {
await this.databases[index].query('ROLLBACK');
}
throw error;
}
}
}
Saga Pattern สำหรับ Microservices
เนื่องจาก 2PC มีปัญหาเรื่อง Performance และ Availability ใน Microservices จึงใช้ Saga Pattern
Choreography-Based Saga
// Order Service
class OrderService {
async createOrder(orderData) {
const order = await this.orderRepository.save(orderData);
// Publish event
await this.eventBus.publish('OrderCreated', {
orderId: order.id,
customerId: order.customerId,
amount: order.amount
});
return order;
}
async handlePaymentFailed(event) {
await this.orderRepository.cancel(event.orderId);
await this.eventBus.publish('OrderCancelled', event);
}
}
// Payment Service
class PaymentService {
async handleOrderCreated(event) {
try {
await this.processPayment(event.customerId, event.amount);
await this.eventBus.publish('PaymentProcessed', event);
} catch (error) {
await this.eventBus.publish('PaymentFailed', event);
}
}
}
Orchestration-Based Saga
class OrderSagaOrchestrator {
async executeOrderSaga(orderData) {
const sagaId = this.generateSagaId();
try {
// Step 1: Create Order
const order = await this.orderService.createOrder(orderData);
// Step 2: Process Payment
await this.paymentService.processPayment(order.customerId, order.amount);
// Step 3: Reserve Inventory
await this.inventoryService.reserve(order.items);
// Step 4: Arrange Shipping
await this.shippingService.schedule(order);
await this.completeOrder(order.id);
} catch (error) {
await this.compensate(sagaId, error);
}
}
async compensate(sagaId, error) {
const steps = await this.getSagaSteps(sagaId);
// Execute compensation in reverse order
for (const step of steps.reverse()) {
await step.compensate();
}
}
}
Best Practices
1. ใช้ Connection Pooling
const { Pool } = require('pg');
const pool = new Pool({
user: 'username',
host: 'localhost',
database: 'mydb',
password: 'password',
port: 5432,
max: 20, // maximum number of clients
idleTimeoutMillis: 30000,
});
2. ตั้ง Timeout
-- PostgreSQL
SET statement_timeout = '30s';
SET lock_timeout = '10s';
3. ใช้ Retry Pattern
class RetryableTransaction {
async execute(operation, maxRetries = 3) {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
if (attempt === maxRetries || !this.isRetryableError(error)) {
throw error;
}
await this.delay(Math.pow(2, attempt) * 1000); // Exponential backoff
}
}
}
isRetryableError(error) {
return error.code === 'ECONNRESET' ||
error.code === '40001' || // serialization failure
error.code === '40P01'; // deadlock detected
}
delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
4. Monitor และ Logging
class TransactionMonitor {
async executeWithMonitoring(operation) {
const startTime = Date.now();
const transactionId = this.generateId();
console.log(`Transaction ${transactionId} started`);
try {
const result = await operation();
const duration = Date.now() - startTime;
console.log(`Transaction ${transactionId} completed in ${duration}ms`);
this.metrics.recordSuccess(duration);
return result;
} catch (error) {
const duration = Date.now() - startTime;
console.error(`Transaction ${transactionId} failed after ${duration}ms:`, error);
this.metrics.recordFailure(error.code);
throw error;
}
}
}
สรุป
Transaction เป็นพื้นฐานสำคัญในการจัดการข้อมูล การเข้าใจ ACID Properties, Isolation Levels และรูปแบบการจัดการ Transaction ในระบบ Distributed จะช่วยให้เราสร้างระบบที่เสถียรและน่าเชื่อถือได้
สำหรับ Microservices ควรพิจารณาใช้ Saga Pattern แทน Traditional Transaction เพื่อ Performance และ Scalability ที่ดีกว่า
จำไว้ว่า Transaction ที่ดีคือ Transaction ที่สั้น รวดเร็ว และจัดการ Error ได้อย่างเหมาะสม!