วันที่เจอปัญหา “Message ดันหาย!”
วันนั้นเป็นวันศุกร์ เวลาเกือบ 6 โมงเย็น กำลังจะปิดเครื่องกลับบ้าน แล้วมี notification เข้ามา:
“ผู้ใช้ร้องเรียนว่าการชำระเงินสำเร็จแล้ว แต่ไม่ได้รับ email ยืนยันครับ!” 😰
ตอนนั้นใช้ Redis pub/sub แบบง่ายๆ:
// Publisher (Payment Service)
await redisClient.publish('order_completed', JSON.stringify({
orderId: 12345,
userId: 678,
email: 'user@example.com',
amount: 1500
}));
// Subscriber (Email Service)
redisClient.subscribe('order_completed');
redisClient.on('message', async (channel, message) => {
const orderData = JSON.parse(message);
await sendOrderCompletedEmail(orderData);
});
ปัญหาคือ: Redis pub/sub ไม่มี persistence ถ้า subscriber ตายก่อนได้ message มาก็หายไปเลย!
หลังจากเกิดเรื่องนี้ 3 ครั้ง ก็ตัดสินใจ migrate ไป Kafka
การเรียนรู้ครั้งแรกกับ Kafka
ครั้งแรกที่ลองใช้ Kafka งงมาก เพราะมี concept เยอะ:
- Topics - เหมือน channels
- Partitions - แบ่ง topic ออกเป็นส่วนๆ
- Producers - ส่ง messages
- Consumers - รับ messages
- Consumer Groups - แบ่งปัน work กัน
- Brokers - Kafka servers
- Zookeeper - จัดการ cluster (ตอนนั้นยังใช้)
Setup แรกของผม:
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ความผิดพลาดแรกๆ ที่เจอ
1. Partition Count ผิด
// สร้าง topic ด้วย 1 partition
await admin.createTopics([{
topic: 'order_events',
numPartitions: 1, // ❌ น้อยเกินไป!
replicationFactor: 1
}]);
ปัญหา: message ทั้งหมดไปใน partition เดียว = ไม่มี parallelism
แก้โดยเพิ่ม partitions:
await admin.createTopics([{
topic: 'order_events',
numPartitions: 6, // ✅ แบ่งเป็น 6 partitions
replicationFactor: 3 // ✅ ใน production ต้องมี replication
}]);
2. Consumer Groups ไม่เข้าใจ
ตอนแรกสร้าง consumers หลายตัวแต่ไม่ใส่ groupId:
// ❌ ไม่มี groupId - consumers ทั้งหมดได้ message เดียวกัน
const consumer1 = kafka.consumer();
const consumer2 = kafka.consumer();
// ✅ ใส่ groupId - consumers แบ่งงานกัน
const consumer1 = kafka.consumer({ groupId: 'email-service' });
const consumer2 = kafka.consumer({ groupId: 'email-service' });
ผลต่าง:
- ไม่มี groupId: ทุก consumer ได้ message ทุกข้อความ
- มี groupId: message ถูกแบ่งให้ consumers ใน group
3. Serialization/Deserialization ที่ลืม
// Producer
await producer.send({
topic: 'user_events',
messages: [{
key: userId.toString(),
value: JSON.stringify(userData) // ❌ ลืมเช็คว่า stringify ได้ไหม
}]
});
// Consumer
consumer.run({
eachMessage: async ({ message }) => {
const data = JSON.parse(message.value); // ❌ ถ้า JSON ผิดจะ crash
}
});
แก้โดยเพิ่ม error handling:
// Producer - validation
const sendMessage = async (topic, key, data) => {
try {
const serialized = JSON.stringify(data);
await producer.send({
topic,
messages: [{
key: key.toString(),
value: serialized,
timestamp: Date.now()
}]
});
} catch (error) {
console.error('Failed to send message:', error);
// อาจจะ retry หรือส่งไป dead letter queue
}
};
// Consumer - safe parsing
consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
const data = JSON.parse(message.value.toString());
await processMessage(data);
} catch (error) {
console.error('Failed to process message:', {
topic,
partition,
offset: message.offset,
error: error.message
});
// ส่งไป dead letter queue หรือ skip
await sendToDeadLetterQueue(message);
}
}
});
เคสจริง: Event Sourcing Architecture
หลังจากใช้ Kafka ได้สักพัก เริ่มเรียนรู้ Event Sourcing pattern:
// Events แทน direct database updates
const events = [
{
type: 'ORDER_CREATED',
orderId: '12345',
userId: '678',
items: [{ productId: 'A', quantity: 2 }],
timestamp: Date.now()
},
{
type: 'PAYMENT_PROCESSED',
orderId: '12345',
paymentId: 'pay_789',
amount: 1500,
timestamp: Date.now()
},
{
type: 'ORDER_SHIPPED',
orderId: '12345',
trackingNumber: 'TH123456789',
timestamp: Date.now()
}
];
// Producer ส่ง events
for (const event of events) {
await producer.send({
topic: 'order_events',
messages: [{
key: event.orderId,
value: JSON.stringify(event),
headers: {
eventType: event.type,
version: '1.0'
}
}]
});
}
// Consumer สร้าง projections (views)
const orderProjection = new Map();
consumer.run({
eachMessage: async ({ message }) => {
const event = JSON.parse(message.value.toString());
switch (event.type) {
case 'ORDER_CREATED':
orderProjection.set(event.orderId, {
id: event.orderId,
userId: event.userId,
items: event.items,
status: 'created',
createdAt: event.timestamp
});
break;
case 'PAYMENT_PROCESSED':
const order = orderProjection.get(event.orderId);
if (order) {
order.status = 'paid';
order.paymentId = event.paymentId;
order.amount = event.amount;
}
break;
case 'ORDER_SHIPPED':
const shippedOrder = orderProjection.get(event.orderId);
if (shippedOrder) {
shippedOrder.status = 'shipped';
shippedOrder.trackingNumber = event.trackingNumber;
}
break;
}
}
});
ปัญหาใหญ่: Consumer Lag
วันหนึ่งเจอว่า consumer lag สูงมาก messages เยอะเกินกว่า consumers จะประมวลผลได้:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group email-service
# Output:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# order_events 0 1000 5000 4000 😱
# order_events 1 1200 5200 4000 😱
# order_events 2 900 4900 4000 😱
สาเหตุ:
- Message processing ช้า (send email ใช้เวลานาน)
- Consumers น้อยเกินไป
- Partition count ไม่เพียงพอ
แก้โดย:
1. เพิ่ม Consumers
// แทนที่จะมี consumer 1 ตัว
const consumer = kafka.consumer({ groupId: 'email-service' });
// เพิ่มเป็น 3 ตัว (เท่ากับจำนวน partitions)
const consumers = [];
for (let i = 0; i < 3; i++) {
consumers.push(kafka.consumer({
groupId: 'email-service',
clientId: `email-consumer-${i}`
}));
}
2. Batch Processing
consumer.run({
eachBatch: async ({ batch }) => {
const emailBatch = [];
for (const message of batch.messages) {
const event = JSON.parse(message.value.toString());
emailBatch.push({
to: event.email,
subject: 'Order Confirmation',
template: 'order_completed',
data: event
});
}
// ส่ง emails แบบ batch (เร็วกว่า)
await sendBulkEmails(emailBatch);
}
});
3. Async Processing
// แทนที่จะ process synchronous
consumer.run({
eachMessage: async ({ message }) => {
await processMessage(message); // รอจนเสร็จ
}
});
// ใช้ async queue
const queue = new Queue();
consumer.run({
eachMessage: async ({ message }) => {
// ใส่ใน queue แล้วก็ continue
queue.add('process-message', {
value: message.value.toString(),
offset: message.offset
});
}
});
// Worker processes แยกต่างหาก
queue.process('process-message', async (job) => {
await processMessage(job.data);
});
Advanced Kafka Patterns
1. Dead Letter Queues
const DLQ_TOPIC = 'failed_messages_dlq';
const MAX_RETRIES = 3;
consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const retryCount = parseInt(message.headers?.retryCount || '0');
try {
await processMessage(message);
} catch (error) {
if (retryCount < MAX_RETRIES) {
// Retry
await producer.send({
topic,
messages: [{
...message,
headers: {
...message.headers,
retryCount: (retryCount + 1).toString(),
lastError: error.message
}
}]
});
} else {
// ส่งไป Dead Letter Queue
await producer.send({
topic: DLQ_TOPIC,
messages: [{
key: message.key,
value: message.value,
headers: {
originalTopic: topic,
failureReason: error.message,
retryCount: retryCount.toString()
}
}]
});
}
}
}
});
2. Message Deduplication
const redis = require('redis').createClient();
const DEDUP_TTL = 3600; // 1 hour
consumer.run({
eachMessage: async ({ message }) => {
const messageId = message.headers?.messageId;
if (!messageId) {
console.warn('Message without ID, processing anyway');
await processMessage(message);
return;
}
// Check if already processed
const exists = await redis.get(`processed:${messageId}`);
if (exists) {
console.log(`Message ${messageId} already processed, skipping`);
return;
}
await processMessage(message);
// Mark as processed
await redis.setex(`processed:${messageId}`, DEDUP_TTL, '1');
}
});
3. Saga Pattern กับ Kafka
// Order Saga - orchestrator
class OrderSaga {
constructor() {
this.state = new Map(); // เก็บ saga state
}
async handleEvent(event) {
const sagaId = event.orderId;
let saga = this.state.get(sagaId) || { step: 0, orderId: sagaId };
switch (event.type) {
case 'ORDER_CREATED':
// Step 1: Reserve inventory
await this.sendCommand('RESERVE_INVENTORY', {
orderId: sagaId,
items: event.items
});
saga.step = 1;
break;
case 'INVENTORY_RESERVED':
// Step 2: Process payment
await this.sendCommand('PROCESS_PAYMENT', {
orderId: sagaId,
amount: event.amount
});
saga.step = 2;
break;
case 'PAYMENT_FAILED':
// Compensate: Release inventory
await this.sendCommand('RELEASE_INVENTORY', {
orderId: sagaId
});
saga.status = 'failed';
break;
case 'PAYMENT_PROCESSED':
// Success!
await this.sendEvent('ORDER_COMPLETED', {
orderId: sagaId
});
saga.status = 'completed';
break;
}
this.state.set(sagaId, saga);
}
async sendCommand(type, data) {
await producer.send({
topic: 'commands',
messages: [{
key: data.orderId,
value: JSON.stringify({ type, ...data })
}]
});
}
async sendEvent(type, data) {
await producer.send({
topic: 'order_events',
messages: [{
key: data.orderId,
value: JSON.stringify({ type, ...data })
}]
});
}
}
Kafka Monitoring & Operations
1. Key Metrics ที่ต้องดู
// Custom metrics collector
const prometheus = require('prom-client');
const kafkaLag = new prometheus.Gauge({
name: 'kafka_consumer_lag',
help: 'Consumer lag by topic and partition',
labelNames: ['topic', 'partition', 'consumer_group']
});
const throughput = new prometheus.Counter({
name: 'kafka_messages_processed_total',
help: 'Total processed messages',
labelNames: ['topic', 'consumer_group', 'status']
});
// Collect metrics
setInterval(async () => {
const admin = kafka.admin();
const groups = await admin.listGroups();
for (const group of groups.groups) {
if (group.groupId.startsWith('my-service')) {
const description = await admin.describeGroups([group.groupId]);
// ... collect lag metrics
}
}
}, 30000);
2. Health Checks
const healthCheck = async () => {
try {
const admin = kafka.admin();
// Check cluster health
const metadata = await admin.fetchTopicMetadata();
const brokerCount = Object.keys(metadata.brokers).length;
if (brokerCount === 0) {
throw new Error('No brokers available');
}
// Check consumer lag
const consumerGroups = await admin.listGroups();
for (const group of consumerGroups.groups) {
const offsets = await admin.fetchOffsets({
groupId: group.groupId,
topics: ['order_events', 'user_events']
});
// Check if lag > threshold
for (const topic of offsets) {
for (const partition of topic.partitions) {
if (partition.lag > 10000) { // threshold
throw new Error(`High lag detected: ${partition.lag}`);
}
}
}
}
return { status: 'healthy', brokers: brokerCount };
} catch (error) {
return { status: 'unhealthy', error: error.message };
}
};
Kafka Streams (ขั้นสูง)
const { KafkaStreams } = require('kafka-streams');
const stream = kafkaStreams.getKStream('user_events');
// Real-time analytics
stream
.filter(message => message.value.eventType === 'page_view')
.groupByKey()
.window(60 * 1000) // 1 minute windows
.aggregate(
() => ({ count: 0, users: new Set() }),
(oldVal, message) => ({
count: oldVal.count + 1,
users: oldVal.users.add(message.value.userId)
})
)
.to('page_view_stats');
// Join streams
const userStream = kafkaStreams.getKStream('user_events');
const orderStream = kafkaStreams.getKStream('order_events');
userStream
.join(
orderStream,
(user, order) => ({ ...user, ...order }),
60 * 1000, // join window
'user_order_joined'
)
.to('user_behavior_analysis');
Production Lessons ที่เจ็บปวด
1. Partition Strategy
// ❌ ผิด - ใช้ random partitioning
await producer.send({
topic: 'user_events',
messages: [{
value: JSON.stringify(event) // ไม่มี key = random partition
}]
});
// ✅ ถูก - ใช้ consistent partitioning
await producer.send({
topic: 'user_events',
messages: [{
key: event.userId, // same user = same partition = order preserved
value: JSON.stringify(event)
}]
});
2. Replication Factor
// ❌ Production ไม่ควรใช้
await admin.createTopics([{
topic: 'critical_events',
numPartitions: 6,
replicationFactor: 1 // ถ้า broker ตาย = ข้อมูลหาย
}]);
// ✅ Production ควรใช้
await admin.createTopics([{
topic: 'critical_events',
numPartitions: 6,
replicationFactor: 3, // ทนได้ 2 brokers ตาย
configEntries: [
{ name: 'min.insync.replicas', value: '2' },
{ name: 'retention.ms', value: '604800000' } // 7 days
]
}]);
3. Consumer Group Coordination
# เช็ค consumer group status
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group my-service --verbose
# Reset offsets (ระวัง! จะ reprocess messages)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-service \
--reset-offsets --to-earliest \
--topic my-topic --execute
เครื่องมือที่ช่วยชีวิต
1. Kafka UI Tools
- Kafka Magic - GUI management
- Conduktor - Professional tool
- Kafdrop - Web UI
- AKHQ - Open source UI
2. CLI Tools
# Create topic
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 6 \
--replication-factor 3
# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092
# Consume messages
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic \
--from-beginning
# Produce messages
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic
3. Schema Registry
// Avro schema evolution
const avro = require('avsc');
const userSchema = {
type: 'record',
name: 'User',
fields: [
{ name: 'id', type: 'string' },
{ name: 'name', type: 'string' },
{ name: 'email', type: ['null', 'string'], default: null } // optional field
]
};
const type = avro.Type.forSchema(userSchema);
// Serialize
const buffer = type.toBuffer({
id: '123',
name: 'John',
email: 'john@example.com'
});
await producer.send({
topic: 'users',
messages: [{
key: '123',
value: buffer
}]
});
สรุป: Kafka ที่ทำให้นอนไม่หลับ
ช่วงแรก: ติดตั้งง่าย แต่เจอปัญหาเยอะ (consumer lag, message loss, rebalancing)
ช่วงกลาง: เรียนรู้ patterns และ best practices ระบบเริ่มเสถียร
ตอนนี้: ใช้เป็น backbone ของ event-driven architecture
ข้อดีที่ได้จริง:
- Durability - messages ไม่หาย
- Scalability - รับได้หลายล้าน messages/second
- Fault tolerance - broker ตายไม่กระทบระบบ
- Replay capability - ย้อนดู events ได้
- Real-time processing - latency ต่ำมาก
ข้อเสียที่ต้องทน:
- Operational complexity - setup และ monitoring ซับซ้อน
- Resource hungry - กิน RAM และ disk เยอะ
- Learning curve - concept เยอะต้องเรียนรู้
- Network overhead - replication ใช้ bandwidth
คำแนะนำสุดท้าย:
- เริ่มจาก simple use case ก่อน
- Monitor everything - metrics มีไว้ดู
- Plan capacity - disk, memory, network
- Test failure scenarios - broker ตาย, network partition
- Understand trade-offs - consistency vs availability
Kafka มันเหมือน เครื่องยนต์เจ็ท powerful มาก แต่ต้องบำรุงรักษาให้ดี
เพราะพอมีปัญหา มันจะทำให้ ระบบทั้งหมดล่ม ได้! ✈️😅
แต่พอใช้เป็นแล้ว การันตีได้ว่า ไม่อยากกลับไปใช้ simple message queue แล้ว 🚀