article

Database Transactions - การจัดการข้อมูลอย่างมืออาชีพ

5 min read

Transaction คืออะไร?

Transaction เป็นหน่วยงานที่เล็กที่สุดของการดำเนินงานกับฐานข้อมูลที่สามารถ Commit (บันทึก) หรือ Rollback (ยกเลิก) ได้เป็นหน่วยเดียว โดย Transaction จะต้องมีคุณสมบัติ ACID ครบถ้วน

ACID Properties คืออะไร?

A - Atomicity (ความเป็นหน่วยเดียว)

  • Transaction จะต้องเป็น “all or nothing”
  • ถ้าส่วนใดส่วนหนึ่งล้มเหลว ทั้งหมดจะต้องถูกยกเลิก
BEGIN TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
-- ถ้าการ UPDATE ใดการหนึ่งล้มเหลว ทั้งคู่จะถูก Rollback
COMMIT;

C - Consistency (ความสอดคล้อง)

  • ฐานข้อมูลจะต้องอยู่ในสถานะที่ถูกต้องเสมอ
  • ข้อมูลต้องผ่าน Business Rules และ Constraints

I - Isolation (การแยกตัว)

  • Transaction แต่ละตัวจะต้องไม่รบกวนกัน
  • มี Isolation Levels ต่างๆ ให้เลือก

D - Durability (ความคงทน)

  • เมื่อ Commit แล้วข้อมูลจะต้องถาวร แม้ระบบจะขัดข่าย

Isolation Levels

1. Read Uncommitted

  • อ่านข้อมูลที่ยังไม่ได้ Commit
  • มีปัญหา Dirty Read

2. Read Committed (Default ในหลายระบบ)

  • อ่านได้เฉพาะข้อมูลที่ Commit แล้ว
  • แต่ยังมีปัญหา Non-Repeatable Read

3. Repeatable Read

  • การอ่านซ้ำจะได้ผลลัพธ์เหมือนเดิม
  • แต่ยังมีปัญหา Phantom Read

4. Serializable

  • Isolation Level สูงสุด
  • ป้องกันปัญหาทั้งหมด แต่ Performance ต่ำที่สุด
-- ตั้ง Isolation Level
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
BEGIN TRANSACTION;
-- operations here
COMMIT;

การจัดการ Transaction ในภาษาต่างๆ

Node.js with PostgreSQL

const { Client } = require('pg');

class TransactionManager {
  constructor(client) {
    this.client = client;
  }

  async executeTransaction(operations) {
    await this.client.query('BEGIN');
    try {
      const results = [];
      for (const operation of operations) {
        const result = await this.client.query(operation.query, operation.params);
        results.push(result);
      }
      await this.client.query('COMMIT');
      return results;
    } catch (error) {
      await this.client.query('ROLLBACK');
      throw error;
    }
  }
}

// การใช้งาน
const transferMoney = async (fromAccount, toAccount, amount) => {
  const operations = [
    {
      query: 'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
      params: [amount, fromAccount]
    },
    {
      query: 'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
      params: [amount, toAccount]
    }
  ];
  
  return await transactionManager.executeTransaction(operations);
};

Python with SQLAlchemy

from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager

class TransactionManager:
    def __init__(self, engine):
        self.Session = sessionmaker(bind=engine)
    
    @contextmanager
    def transaction_scope(self):
        session = self.Session()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

# การใช้งาน
def transfer_money(from_account_id, to_account_id, amount):
    with transaction_manager.transaction_scope() as session:
        from_account = session.query(Account).filter_by(id=from_account_id).first()
        to_account = session.query(Account).filter_by(id=to_account_id).first()
        
        if from_account.balance < amount:
            raise ValueError("Insufficient balance")
        
        from_account.balance -= amount
        to_account.balance += amount

Distributed Transactions

เมื่อต้องทำ Transaction ข้าม Database หลายตัว จะต้องใช้ Two-Phase Commit (2PC)

Two-Phase Commit Protocol

Phase 1: Prepare Phase

  1. Coordinator ส่ง “Prepare” ไปยัง Participants ทั้งหมด
  2. แต่ละ Participant เตรียมพร้อมและตอบกลับ “Yes” หรือ “No”

Phase 2: Commit Phase

  1. ถ้าทุกคนตอบ “Yes” → Coordinator ส่ง “Commit”
  2. ถ้าใครตอบ “No” → Coordinator ส่ง “Abort”
class DistributedTransactionManager {
  constructor(databases) {
    this.databases = databases;
  }

  async executeDistributedTransaction(operations) {
    // Phase 1: Prepare
    const prepared = [];
    try {
      for (let i = 0; i < this.databases.length; i++) {
        await this.databases[i].query('BEGIN');
        await this.databases[i].query(operations[i].query, operations[i].params);
        prepared.push(i);
      }

      // Phase 2: Commit
      for (const db of this.databases) {
        await db.query('COMMIT');
      }
    } catch (error) {
      // Rollback prepared transactions
      for (const index of prepared) {
        await this.databases[index].query('ROLLBACK');
      }
      throw error;
    }
  }
}

Saga Pattern สำหรับ Microservices

เนื่องจาก 2PC มีปัญหาเรื่อง Performance และ Availability ใน Microservices จึงใช้ Saga Pattern

Choreography-Based Saga

// Order Service
class OrderService {
  async createOrder(orderData) {
    const order = await this.orderRepository.save(orderData);
    
    // Publish event
    await this.eventBus.publish('OrderCreated', {
      orderId: order.id,
      customerId: order.customerId,
      amount: order.amount
    });
    
    return order;
  }

  async handlePaymentFailed(event) {
    await this.orderRepository.cancel(event.orderId);
    await this.eventBus.publish('OrderCancelled', event);
  }
}

// Payment Service
class PaymentService {
  async handleOrderCreated(event) {
    try {
      await this.processPayment(event.customerId, event.amount);
      await this.eventBus.publish('PaymentProcessed', event);
    } catch (error) {
      await this.eventBus.publish('PaymentFailed', event);
    }
  }
}

Orchestration-Based Saga

class OrderSagaOrchestrator {
  async executeOrderSaga(orderData) {
    const sagaId = this.generateSagaId();
    
    try {
      // Step 1: Create Order
      const order = await this.orderService.createOrder(orderData);
      
      // Step 2: Process Payment
      await this.paymentService.processPayment(order.customerId, order.amount);
      
      // Step 3: Reserve Inventory
      await this.inventoryService.reserve(order.items);
      
      // Step 4: Arrange Shipping
      await this.shippingService.schedule(order);
      
      await this.completeOrder(order.id);
    } catch (error) {
      await this.compensate(sagaId, error);
    }
  }

  async compensate(sagaId, error) {
    const steps = await this.getSagaSteps(sagaId);
    
    // Execute compensation in reverse order
    for (const step of steps.reverse()) {
      await step.compensate();
    }
  }
}

Best Practices

1. ใช้ Connection Pooling

const { Pool } = require('pg');

const pool = new Pool({
  user: 'username',
  host: 'localhost',
  database: 'mydb',
  password: 'password',
  port: 5432,
  max: 20, // maximum number of clients
  idleTimeoutMillis: 30000,
});

2. ตั้ง Timeout

-- PostgreSQL
SET statement_timeout = '30s';
SET lock_timeout = '10s';

3. ใช้ Retry Pattern

class RetryableTransaction {
  async execute(operation, maxRetries = 3) {
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        return await operation();
      } catch (error) {
        if (attempt === maxRetries || !this.isRetryableError(error)) {
          throw error;
        }
        await this.delay(Math.pow(2, attempt) * 1000); // Exponential backoff
      }
    }
  }

  isRetryableError(error) {
    return error.code === 'ECONNRESET' || 
           error.code === '40001' || // serialization failure
           error.code === '40P01';   // deadlock detected
  }

  delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

4. Monitor และ Logging

class TransactionMonitor {
  async executeWithMonitoring(operation) {
    const startTime = Date.now();
    const transactionId = this.generateId();
    
    console.log(`Transaction ${transactionId} started`);
    
    try {
      const result = await operation();
      const duration = Date.now() - startTime;
      
      console.log(`Transaction ${transactionId} completed in ${duration}ms`);
      this.metrics.recordSuccess(duration);
      
      return result;
    } catch (error) {
      const duration = Date.now() - startTime;
      
      console.error(`Transaction ${transactionId} failed after ${duration}ms:`, error);
      this.metrics.recordFailure(error.code);
      
      throw error;
    }
  }
}

สรุป

Transaction เป็นพื้นฐานสำคัญในการจัดการข้อมูล การเข้าใจ ACID Properties, Isolation Levels และรูปแบบการจัดการ Transaction ในระบบ Distributed จะช่วยให้เราสร้างระบบที่เสถียรและน่าเชื่อถือได้

สำหรับ Microservices ควรพิจารณาใช้ Saga Pattern แทน Traditional Transaction เพื่อ Performance และ Scalability ที่ดีกว่า

จำไว้ว่า Transaction ที่ดีคือ Transaction ที่สั้น รวดเร็ว และจัดการ Error ได้อย่างเหมาะสม!