일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- Call-by-reference
- query
- CSS
- 악성코드
- Kafka
- 자료구조
- 노드
- Sort
- System
- C
- array
- API
- UTF-8
- c++
- CLASS
- 포인터
- windows
- HTML
- 투자
- beans
- java
- JavaScript
- request
- algorithm
- function
- OOP
- 윈도우즈
- meta
- WebProgramming
- jsp
- Today
- Total
hahahia
Kafka Consumer 의 commitSync / seek 을 이용한 offset 관리 예제 본문
최근에 Kafka Consumer 를 구현하면서 commitSync 에 대해 겪었던 이슈를 공유합니다
보통 Kafka Consumer 를 구현할 때, commitSync 를 이용하여 해당 offset 을 명시적으로 commit 할 수 있습니다.
따라서 commitSync 를 하지 않으면, 다음 번 poll 에서는 commit 하지 않았던 이전 offset 을 가져오는 줄 알았지만, 실패했던 현재 offset 이 아닌 다음 offset(현재 offset + 1) 을 가져오는 이슈가 있었습니다.
이해를 돕기 위해 다음과 같은 순서로 정리해보았습니다.
- 자동 커밋 / 수동 커밋
- 수동 커밋 코드 예제
- commitSync 에 대한 오해?
- 해결책(seek)
- 결론
1. 자동 커밋 / 수동 커밋
-
자동 커밋
- 오프셋 및 파티션에 대한 관리를 사용자가 관리하지 않아도 되는 경우에 사용(로그성 데이터)
- enable.auto.commit=true
- auto.commit.interval.ms 옵션을 통해 자동 커밋 주기를 설정
-
수동 커밋
- 메시지 처리가 완료될 때까지 메시지를 가져온 것으로 간주되어서는 안되는 경우에 사용(비즈니스 로직, DB 관련 로직 등)
- enable.auto.commit=false 로 설정
- 명시적으로 commitSync 메소드를 호출하여 메시지 처리 완료 & 메시지를 가져온 것으로 설정
2. 수동 커밋 Consumer 코드 예제
해당 consume method 는 loop 로 돌면서, 계속해서 poll 을 호출하는 방식입니다 (spring kafka 가 아닌 apache kafka 사용).
private void consume(Consumer<String, String> consumer) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT));
for (ConsumerRecord<String, String> record : records) {
try {
if (process(record)) {
consumer.commitSync();
}
} catch (CommitFailedException e) {
LOG.error("Error:", e);
}
}
}
public boolean process(ConsumerRecord<String, String> record) {
try {
// record 처리 로직(DB insert 등)
return true;
} catch (Exception e) {
return false;
}
}
3. commitSync 에 대한 오해?
commitSync 을 통해 메시지 처리가 완료되었고, 메시지를 가져온 것으로 처리하기 때문에 commitSync 를 수행하지 않으면(여기에서는 process 의 return 이 false 일 경우) 실패한 현재 offset 을 다시 가져올 것이라고 생각했었습니다.
하지만, 제 예상과 달리 현재 offset 을 다시 가져오지 않고 그 다음 offset 을 가져옵니다.
Producer 예제 및 로그
- test 라는 topic 으로 produce-0,1,2,3,...18 이라는 message 를 producing 하는 Application
- 해당 topic(test) 의 partition 개수는 3개
- test 라는 topic 으로 총 19개의 Message 가 producing 되었습니다.
21:25:37.379 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-0
21:25:38.384 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-1
21:25:39.386 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-2
21:25:40.387 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-3
21:25:41.390 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-4
21:25:42.392 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-5
21:25:43.393 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-6
21:25:44.394 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-7
21:25:45.395 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-8
21:25:46.398 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-9
21:25:47.400 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-10
21:25:48.401 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-11
21:25:49.402 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-12
21:25:50.405 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-13
21:25:51.406 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-21
21:25:52.407 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-15
21:25:53.412 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-16
21:25:54.421 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-17
14:25:55.419 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-18
Consumer 예제 및 로그
- process 로직이 false 로 return 되게끔 강제하여, commitSync 를 하지 않는 예제
- 보기 편하게 partition 1 에 대한 로그만 보여드립니다.
2019-12-16 21:25:38.128 INFO 22149 --- [pool-1-thread-2] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 633, CreateTime = 1576473937363, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-0)
2019-12-16 21:25:40.402 INFO 22149 --- [pool-1-thread-2] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 634, CreateTime = 1576473940387, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-3)
2019-12-16 21:25:43.412 INFO 22149 --- [pool-1-thread-2] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 635, CreateTime = 1576473943393, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-6)
2019-12-16 21:25:46.412 INFO 22149 --- [pool-1-thread-2] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 636, CreateTime = 1576473946398, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-9)
2019-12-16 21:25:49.411 INFO 22149 --- [pool-1-thread-2] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 637, CreateTime = 1576473949402, serialized key size = -1, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-12)
2019-12-16 21:25:52.422 INFO 22149 --- [pool-1-thread-2] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 638, CreateTime = 1576473952407, serialized key size = -1, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-15)
2019-12-16 14:25:55.533 INFO 22149 --- [pool-1-thread-2] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 639, CreateTime = 1576473955419, serialized key size = -1, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-18)
Consuemr 로그의 offset 을 보면
commitSync 를 아예 하지 않는다면 가장 처음에 실패한 message-0 을 계속해서 poll(offset 633) 로 받아야 하는데 예상과 달리 message-18(offset 639까지) 까지 받는 것을 확인할 수 있었습니다.
따라서 commitSync 메소드를 호출하지 않는다면, 현재 offset 을 Skip 하고 다음 offset 을 poll 로 받는다는 것을 알 수 있었습니다.
그리고 Consumer Application 을 종료하고 다시 재시작 했을 때 로그를 살펴보니 다음과 같았습니다(보기 편하기 위해 1번 partition 에 대한 로그만 보여드립니다)
2019-12-16 21:30:26.073 INFO 27593 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 633, CreateTime = 1576473937363, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-0)
2019-12-16 21:30:26.073 INFO 27593 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 634, CreateTime = 1576473940387, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-3)
2019-12-16 21:30:26.073 INFO 27593 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 635, CreateTime = 1576473943393, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-6)
2019-12-16 21:30:26.073 INFO 27593 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 636, CreateTime = 1576473946398, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-9)
2019-12-16 21:30:26.074 INFO 27593 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 637, CreateTime = 1576473949402, serialized key size = -1, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-12)
2019-12-16 21:30:26.074 INFO 27593 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 638, CreateTime = 1576473952407, serialized key size = -1, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-15)
2019-12-16 21:30:26.074 INFO 27593 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 639, CreateTime = 1576473955419, serialized key size = -1, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-18)
재시작할 경우, 다시 실패했었던 offset 인 633, message-0 부터 consuming 하는 것을 확인할 수 있습니다.
4. 해결책(seek)
예상대로 process 를 실패했을 때 실패했던 record 의 offset 을 다시 poll 하기 위해서 seek 메소드를 이용해 이 문제를 해결해보았습니다.
private void consume(Consumer<String, String> consumer) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT));
for (ConsumerRecord<String, String> record : records) {
try {
if (process(record)) {
consumer.commitSync();
} else {
/** 실패했던 시점의 Offset 으로 되돌아가는 Seek */
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
}
} catch (CommitFailedException e) {
LOG.error("Error:", e);
}
}
}
seek 를 적용한 consumer 실행 결과 로그
2019-12-16 21:44:01.789 INFO 29554 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 633, CreateTime = 1576473937363, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-0)
2019-12-16 21:44:01.789 INFO 29554 --- [pool-1-thread-3] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-2, groupId=test-group-5] Seeking to offset 633 for partition test-1
2019-12-16 21:44:01.806 INFO 29554 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 633, CreateTime = 1576473937363, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-0)
2019-12-16 21:44:01.806 INFO 29554 --- [pool-1-thread-3] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-2, groupId=test-group-5] Seeking to offset 633 for partition test-1
2019-12-16 21:44:01.822 INFO 29554 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 633, CreateTime = 1576473937363, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-0)
2019-12-16 21:44:01.822 INFO 29554 --- [pool-1-thread-3] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-2, groupId=test-group-5] Seeking to offset 633 for partition test-1
proess 에서 false 인 경우, seeking to offset 633 for partition test-1 이라는 로그에서 보이듯이 해당 offset 으로 되돌아가는 것을 확인할 수 있습니다.
5. 결론
- seek 를 이용하여 실패한 offset 을 다시 poll 할 수 있음
- 이러한 방식을 사용하게 된다면 비즈니스 로직을에 대한 retry 정책을 따로 가져가야 할 필요가 있음
'Kafka' 카테고리의 다른 글
Spring Boot + Spring Kafka 를 이용한 Consumer 구현 (4) | 2019.12.14 |
---|