article

Kafka ที่ทำให้ผมนอนไม่หลับ (แต่ระบบทำงานได้ 24/7)

10 min read

วันที่เจอปัญหา “Message ดันหาย!”

วันนั้นเป็นวันศุกร์ เวลาเกือบ 6 โมงเย็น กำลังจะปิดเครื่องกลับบ้าน แล้วมี notification เข้ามา:

“ผู้ใช้ร้องเรียนว่าการชำระเงินสำเร็จแล้ว แต่ไม่ได้รับ email ยืนยันครับ!” 😰

ตอนนั้นใช้ Redis pub/sub แบบง่ายๆ:

// Publisher (Payment Service)
await redisClient.publish('order_completed', JSON.stringify({
  orderId: 12345,
  userId: 678,
  email: 'user@example.com',
  amount: 1500
}));

// Subscriber (Email Service)  
redisClient.subscribe('order_completed');
redisClient.on('message', async (channel, message) => {
  const orderData = JSON.parse(message);
  await sendOrderCompletedEmail(orderData);
});

ปัญหาคือ: Redis pub/sub ไม่มี persistence ถ้า subscriber ตายก่อนได้ message มาก็หายไปเลย!

หลังจากเกิดเรื่องนี้ 3 ครั้ง ก็ตัดสินใจ migrate ไป Kafka

การเรียนรู้ครั้งแรกกับ Kafka

ครั้งแรกที่ลองใช้ Kafka งงมาก เพราะมี concept เยอะ:

  • Topics - เหมือน channels
  • Partitions - แบ่ง topic ออกเป็นส่วนๆ
  • Producers - ส่ง messages
  • Consumers - รับ messages
  • Consumer Groups - แบ่งปัน work กัน
  • Brokers - Kafka servers
  • Zookeeper - จัดการ cluster (ตอนนั้นยังใช้)

Setup แรกของผม:

# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

ความผิดพลาดแรกๆ ที่เจอ

1. Partition Count ผิด

// สร้าง topic ด้วย 1 partition
await admin.createTopics([{
  topic: 'order_events',
  numPartitions: 1, // ❌ น้อยเกินไป!
  replicationFactor: 1
}]);

ปัญหา: message ทั้งหมดไปใน partition เดียว = ไม่มี parallelism

แก้โดยเพิ่ม partitions:

await admin.createTopics([{
  topic: 'order_events',
  numPartitions: 6, // ✅ แบ่งเป็น 6 partitions
  replicationFactor: 3 // ✅ ใน production ต้องมี replication
}]);

2. Consumer Groups ไม่เข้าใจ

ตอนแรกสร้าง consumers หลายตัวแต่ไม่ใส่ groupId:

// ❌ ไม่มี groupId - consumers ทั้งหมดได้ message เดียวกัน
const consumer1 = kafka.consumer();
const consumer2 = kafka.consumer();

// ✅ ใส่ groupId - consumers แบ่งงานกัน
const consumer1 = kafka.consumer({ groupId: 'email-service' });
const consumer2 = kafka.consumer({ groupId: 'email-service' });

ผลต่าง:

  • ไม่มี groupId: ทุก consumer ได้ message ทุกข้อความ
  • มี groupId: message ถูกแบ่งให้ consumers ใน group

3. Serialization/Deserialization ที่ลืม

// Producer
await producer.send({
  topic: 'user_events',
  messages: [{
    key: userId.toString(),
    value: JSON.stringify(userData) // ❌ ลืมเช็คว่า stringify ได้ไหม
  }]
});

// Consumer  
consumer.run({
  eachMessage: async ({ message }) => {
    const data = JSON.parse(message.value); // ❌ ถ้า JSON ผิดจะ crash
  }
});

แก้โดยเพิ่ม error handling:

// Producer - validation
const sendMessage = async (topic, key, data) => {
  try {
    const serialized = JSON.stringify(data);
    await producer.send({
      topic,
      messages: [{
        key: key.toString(),
        value: serialized,
        timestamp: Date.now()
      }]
    });
  } catch (error) {
    console.error('Failed to send message:', error);
    // อาจจะ retry หรือส่งไป dead letter queue
  }
};

// Consumer - safe parsing
consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    try {
      const data = JSON.parse(message.value.toString());
      await processMessage(data);
    } catch (error) {
      console.error('Failed to process message:', {
        topic,
        partition,
        offset: message.offset,
        error: error.message
      });
      
      // ส่งไป dead letter queue หรือ skip
      await sendToDeadLetterQueue(message);
    }
  }
});

เคสจริง: Event Sourcing Architecture

หลังจากใช้ Kafka ได้สักพัก เริ่มเรียนรู้ Event Sourcing pattern:

// Events แทน direct database updates
const events = [
  {
    type: 'ORDER_CREATED',
    orderId: '12345',
    userId: '678',
    items: [{ productId: 'A', quantity: 2 }],
    timestamp: Date.now()
  },
  {
    type: 'PAYMENT_PROCESSED',
    orderId: '12345',
    paymentId: 'pay_789',
    amount: 1500,
    timestamp: Date.now()
  },
  {
    type: 'ORDER_SHIPPED', 
    orderId: '12345',
    trackingNumber: 'TH123456789',
    timestamp: Date.now()
  }
];

// Producer ส่ง events
for (const event of events) {
  await producer.send({
    topic: 'order_events',
    messages: [{
      key: event.orderId,
      value: JSON.stringify(event),
      headers: {
        eventType: event.type,
        version: '1.0'
      }
    }]
  });
}

// Consumer สร้าง projections (views)
const orderProjection = new Map();

consumer.run({
  eachMessage: async ({ message }) => {
    const event = JSON.parse(message.value.toString());
    
    switch (event.type) {
      case 'ORDER_CREATED':
        orderProjection.set(event.orderId, {
          id: event.orderId,
          userId: event.userId,
          items: event.items,
          status: 'created',
          createdAt: event.timestamp
        });
        break;
        
      case 'PAYMENT_PROCESSED':
        const order = orderProjection.get(event.orderId);
        if (order) {
          order.status = 'paid';
          order.paymentId = event.paymentId;
          order.amount = event.amount;
        }
        break;
        
      case 'ORDER_SHIPPED':
        const shippedOrder = orderProjection.get(event.orderId);
        if (shippedOrder) {
          shippedOrder.status = 'shipped';
          shippedOrder.trackingNumber = event.trackingNumber;
        }
        break;
    }
  }
});

ปัญหาใหญ่: Consumer Lag

วันหนึ่งเจอว่า consumer lag สูงมาก messages เยอะเกินกว่า consumers จะประมวลผลได้:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group email-service

# Output:
# TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# order_events    0       1000            5000           4000  😱
# order_events    1       1200            5200           4000  😱
# order_events    2       900             4900           4000  😱

สาเหตุ:

  1. Message processing ช้า (send email ใช้เวลานาน)
  2. Consumers น้อยเกินไป
  3. Partition count ไม่เพียงพอ

แก้โดย:

1. เพิ่ม Consumers

// แทนที่จะมี consumer 1 ตัว
const consumer = kafka.consumer({ groupId: 'email-service' });

// เพิ่มเป็น 3 ตัว (เท่ากับจำนวน partitions)
const consumers = [];
for (let i = 0; i < 3; i++) {
  consumers.push(kafka.consumer({ 
    groupId: 'email-service',
    clientId: `email-consumer-${i}`
  }));
}

2. Batch Processing

consumer.run({
  eachBatch: async ({ batch }) => {
    const emailBatch = [];
    
    for (const message of batch.messages) {
      const event = JSON.parse(message.value.toString());
      emailBatch.push({
        to: event.email,
        subject: 'Order Confirmation',
        template: 'order_completed',
        data: event
      });
    }
    
    // ส่ง emails แบบ batch (เร็วกว่า)
    await sendBulkEmails(emailBatch);
  }
});

3. Async Processing

// แทนที่จะ process synchronous
consumer.run({
  eachMessage: async ({ message }) => {
    await processMessage(message); // รอจนเสร็จ
  }
});

// ใช้ async queue
const queue = new Queue();

consumer.run({
  eachMessage: async ({ message }) => {
    // ใส่ใน queue แล้วก็ continue
    queue.add('process-message', {
      value: message.value.toString(),
      offset: message.offset
    });
  }
});

// Worker processes แยกต่างหาก
queue.process('process-message', async (job) => {
  await processMessage(job.data);
});

Advanced Kafka Patterns

1. Dead Letter Queues

const DLQ_TOPIC = 'failed_messages_dlq';
const MAX_RETRIES = 3;

consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const retryCount = parseInt(message.headers?.retryCount || '0');
    
    try {
      await processMessage(message);
    } catch (error) {
      if (retryCount < MAX_RETRIES) {
        // Retry
        await producer.send({
          topic,
          messages: [{
            ...message,
            headers: {
              ...message.headers,
              retryCount: (retryCount + 1).toString(),
              lastError: error.message
            }
          }]
        });
      } else {
        // ส่งไป Dead Letter Queue
        await producer.send({
          topic: DLQ_TOPIC,
          messages: [{
            key: message.key,
            value: message.value,
            headers: {
              originalTopic: topic,
              failureReason: error.message,
              retryCount: retryCount.toString()
            }
          }]
        });
      }
    }
  }
});

2. Message Deduplication

const redis = require('redis').createClient();
const DEDUP_TTL = 3600; // 1 hour

consumer.run({
  eachMessage: async ({ message }) => {
    const messageId = message.headers?.messageId;
    
    if (!messageId) {
      console.warn('Message without ID, processing anyway');
      await processMessage(message);
      return;
    }
    
    // Check if already processed
    const exists = await redis.get(`processed:${messageId}`);
    if (exists) {
      console.log(`Message ${messageId} already processed, skipping`);
      return;
    }
    
    await processMessage(message);
    
    // Mark as processed
    await redis.setex(`processed:${messageId}`, DEDUP_TTL, '1');
  }
});

3. Saga Pattern กับ Kafka

// Order Saga - orchestrator
class OrderSaga {
  constructor() {
    this.state = new Map(); // เก็บ saga state
  }
  
  async handleEvent(event) {
    const sagaId = event.orderId;
    let saga = this.state.get(sagaId) || { step: 0, orderId: sagaId };
    
    switch (event.type) {
      case 'ORDER_CREATED':
        // Step 1: Reserve inventory
        await this.sendCommand('RESERVE_INVENTORY', {
          orderId: sagaId,
          items: event.items
        });
        saga.step = 1;
        break;
        
      case 'INVENTORY_RESERVED':
        // Step 2: Process payment
        await this.sendCommand('PROCESS_PAYMENT', {
          orderId: sagaId,
          amount: event.amount
        });
        saga.step = 2;
        break;
        
      case 'PAYMENT_FAILED':
        // Compensate: Release inventory
        await this.sendCommand('RELEASE_INVENTORY', {
          orderId: sagaId
        });
        saga.status = 'failed';
        break;
        
      case 'PAYMENT_PROCESSED':
        // Success!
        await this.sendEvent('ORDER_COMPLETED', {
          orderId: sagaId
        });
        saga.status = 'completed';
        break;
    }
    
    this.state.set(sagaId, saga);
  }
  
  async sendCommand(type, data) {
    await producer.send({
      topic: 'commands',
      messages: [{
        key: data.orderId,
        value: JSON.stringify({ type, ...data })
      }]
    });
  }
  
  async sendEvent(type, data) {
    await producer.send({
      topic: 'order_events',
      messages: [{
        key: data.orderId,
        value: JSON.stringify({ type, ...data })
      }]
    });
  }
}

Kafka Monitoring & Operations

1. Key Metrics ที่ต้องดู

// Custom metrics collector
const prometheus = require('prom-client');

const kafkaLag = new prometheus.Gauge({
  name: 'kafka_consumer_lag',
  help: 'Consumer lag by topic and partition',
  labelNames: ['topic', 'partition', 'consumer_group']
});

const throughput = new prometheus.Counter({
  name: 'kafka_messages_processed_total',  
  help: 'Total processed messages',
  labelNames: ['topic', 'consumer_group', 'status']
});

// Collect metrics
setInterval(async () => {
  const admin = kafka.admin();
  const groups = await admin.listGroups();
  
  for (const group of groups.groups) {
    if (group.groupId.startsWith('my-service')) {
      const description = await admin.describeGroups([group.groupId]);
      // ... collect lag metrics
    }
  }
}, 30000);

2. Health Checks

const healthCheck = async () => {
  try {
    const admin = kafka.admin();
    
    // Check cluster health
    const metadata = await admin.fetchTopicMetadata();
    const brokerCount = Object.keys(metadata.brokers).length;
    
    if (brokerCount === 0) {
      throw new Error('No brokers available');
    }
    
    // Check consumer lag
    const consumerGroups = await admin.listGroups();
    for (const group of consumerGroups.groups) {
      const offsets = await admin.fetchOffsets({
        groupId: group.groupId,
        topics: ['order_events', 'user_events']
      });
      
      // Check if lag > threshold
      for (const topic of offsets) {
        for (const partition of topic.partitions) {
          if (partition.lag > 10000) { // threshold
            throw new Error(`High lag detected: ${partition.lag}`);
          }
        }
      }
    }
    
    return { status: 'healthy', brokers: brokerCount };
  } catch (error) {
    return { status: 'unhealthy', error: error.message };
  }
};

Kafka Streams (ขั้นสูง)

const { KafkaStreams } = require('kafka-streams');

const stream = kafkaStreams.getKStream('user_events');

// Real-time analytics
stream
  .filter(message => message.value.eventType === 'page_view')
  .groupByKey()
  .window(60 * 1000) // 1 minute windows
  .aggregate(
    () => ({ count: 0, users: new Set() }),
    (oldVal, message) => ({
      count: oldVal.count + 1,
      users: oldVal.users.add(message.value.userId)
    })
  )
  .to('page_view_stats');

// Join streams
const userStream = kafkaStreams.getKStream('user_events');
const orderStream = kafkaStreams.getKStream('order_events');

userStream
  .join(
    orderStream,
    (user, order) => ({ ...user, ...order }),
    60 * 1000, // join window
    'user_order_joined'
  )
  .to('user_behavior_analysis');

Production Lessons ที่เจ็บปวด

1. Partition Strategy

// ❌ ผิด - ใช้ random partitioning  
await producer.send({
  topic: 'user_events',
  messages: [{
    value: JSON.stringify(event) // ไม่มี key = random partition
  }]
});

// ✅ ถูก - ใช้ consistent partitioning
await producer.send({
  topic: 'user_events', 
  messages: [{
    key: event.userId, // same user = same partition = order preserved
    value: JSON.stringify(event)
  }]
});

2. Replication Factor

// ❌ Production ไม่ควรใช้
await admin.createTopics([{
  topic: 'critical_events',
  numPartitions: 6,
  replicationFactor: 1 // ถ้า broker ตาย = ข้อมูลหาย
}]);

// ✅ Production ควรใช้
await admin.createTopics([{
  topic: 'critical_events',
  numPartitions: 6,
  replicationFactor: 3, // ทนได้ 2 brokers ตาย
  configEntries: [
    { name: 'min.insync.replicas', value: '2' },
    { name: 'retention.ms', value: '604800000' } // 7 days
  ]
}]);

3. Consumer Group Coordination

# เช็ค consumer group status
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-service --verbose

# Reset offsets (ระวัง! จะ reprocess messages)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-service \
  --reset-offsets --to-earliest \
  --topic my-topic --execute

เครื่องมือที่ช่วยชีวิต

1. Kafka UI Tools

  • Kafka Magic - GUI management
  • Conduktor - Professional tool
  • Kafdrop - Web UI
  • AKHQ - Open source UI

2. CLI Tools

# Create topic
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --partitions 6 \
  --replication-factor 3

# List topics  
kafka-topics.sh --list --bootstrap-server localhost:9092

# Consume messages
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --from-beginning

# Produce messages
kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic

3. Schema Registry

// Avro schema evolution
const avro = require('avsc');

const userSchema = {
  type: 'record',
  name: 'User',
  fields: [
    { name: 'id', type: 'string' },
    { name: 'name', type: 'string' },
    { name: 'email', type: ['null', 'string'], default: null } // optional field
  ]
};

const type = avro.Type.forSchema(userSchema);

// Serialize
const buffer = type.toBuffer({ 
  id: '123', 
  name: 'John',
  email: 'john@example.com' 
});

await producer.send({
  topic: 'users',
  messages: [{
    key: '123',
    value: buffer
  }]
});

สรุป: Kafka ที่ทำให้นอนไม่หลับ

ช่วงแรก: ติดตั้งง่าย แต่เจอปัญหาเยอะ (consumer lag, message loss, rebalancing)

ช่วงกลาง: เรียนรู้ patterns และ best practices ระบบเริ่มเสถียร

ตอนนี้: ใช้เป็น backbone ของ event-driven architecture

ข้อดีที่ได้จริง:

  • Durability - messages ไม่หาย
  • Scalability - รับได้หลายล้าน messages/second
  • Fault tolerance - broker ตายไม่กระทบระบบ
  • Replay capability - ย้อนดู events ได้
  • Real-time processing - latency ต่ำมาก

ข้อเสียที่ต้องทน:

  • Operational complexity - setup และ monitoring ซับซ้อน
  • Resource hungry - กิน RAM และ disk เยอะ
  • Learning curve - concept เยอะต้องเรียนรู้
  • Network overhead - replication ใช้ bandwidth

คำแนะนำสุดท้าย:

  1. เริ่มจาก simple use case ก่อน
  2. Monitor everything - metrics มีไว้ดู
  3. Plan capacity - disk, memory, network
  4. Test failure scenarios - broker ตาย, network partition
  5. Understand trade-offs - consistency vs availability

Kafka มันเหมือน เครื่องยนต์เจ็ท powerful มาก แต่ต้องบำรุงรักษาให้ดี

เพราะพอมีปัญหา มันจะทำให้ ระบบทั้งหมดล่ม ได้! ✈️😅

แต่พอใช้เป็นแล้ว การันตีได้ว่า ไม่อยากกลับไปใช้ simple message queue แล้ว 🚀