hahahia

Kafka Consumer 의 commitSync / seek 을 이용한 offset 관리 예제 본문

Kafka

Kafka Consumer 의 commitSync / seek 을 이용한 offset 관리 예제

hahahia 2019. 12. 16. 21:51

최근에 Kafka Consumer 를 구현하면서 commitSync 에 대해 겪었던 이슈를 공유합니다

보통 Kafka Consumer 를 구현할 때, commitSync 를 이용하여 해당 offset 을 명시적으로 commit 할 수 있습니다.
따라서 commitSync 를 하지 않으면, 다음 번 poll 에서는 commit 하지 않았던 이전 offset 을 가져오는 줄 알았지만, 실패했던 현재 offset 이 아닌 다음 offset(현재 offset + 1) 을 가져오는 이슈가 있었습니다.

이해를 돕기 위해 다음과 같은 순서로 정리해보았습니다.

  1. 자동 커밋 / 수동 커밋
  2. 수동 커밋 코드 예제
  3. commitSync 에 대한 오해?
  4. 해결책(seek)
  5. 결론

 

1. 자동 커밋 / 수동 커밋 

  • 자동 커밋

    • 오프셋 및 파티션에 대한 관리를 사용자가 관리하지 않아도 되는 경우에 사용(로그성 데이터)
    • enable.auto.commit=true
    • auto.commit.interval.ms 옵션을 통해 자동 커밋 주기를 설정
  • 수동 커밋

    • 메시지 처리가 완료될 때까지 메시지를 가져온 것으로 간주되어서는 안되는 경우에 사용(비즈니스 로직, DB 관련 로직 등)
    • enable.auto.commit=false 로 설정
    • 명시적으로 commitSync 메소드를 호출하여 메시지 처리 완료 & 메시지를 가져온 것으로 설정

 

2. 수동 커밋 Consumer 코드 예제 

해당 consume method 는 loop 로 돌면서, 계속해서 poll 을 호출하는 방식입니다 (spring kafka 가 아닌 apache kafka 사용).

private void consume(Consumer<String, String> consumer) {
	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT));
	for (ConsumerRecord<String, String> record : records) {
		try {
			if (process(record)) {
				consumer.commitSync();
			}
		} catch (CommitFailedException e) {
			LOG.error("Error:", e);
		}
	}
}
public boolean process(ConsumerRecord<String, String> record) {
	try {
		// record 처리 로직(DB insert 등)
		return true;
	} catch (Exception e) {
		return false;
	}
}

 

3. commitSync 에 대한 오해?

commitSync 을 통해 메시지 처리가 완료되었고, 메시지를 가져온 것으로 처리하기 때문에 commitSync 를 수행하지 않으면(여기에서는 process 의 return 이 false 일 경우) 실패한 현재 offset 을 다시 가져올 것이라고 생각했었습니다.
하지만, 제 예상과 달리 현재 offset 을 다시 가져오지 않고 그 다음 offset 을 가져옵니다.

Producer 예제 및 로그

  • test 라는 topic 으로 produce-0,1,2,3,...18 이라는 message 를 producing 하는 Application
  • 해당 topic(test) 의 partition 개수는 3개
  • test 라는 topic 으로 총 19개의 Message 가 producing 되었습니다.
21:25:37.379 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-0
21:25:38.384 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-1
21:25:39.386 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-2
21:25:40.387 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-3
21:25:41.390 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-4
21:25:42.392 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-5
21:25:43.393 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-6
21:25:44.394 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-7
21:25:45.395 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-8
21:25:46.398 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-9
21:25:47.400 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-10
21:25:48.401 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-11
21:25:49.402 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-12
21:25:50.405 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-13
21:25:51.406 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-21
21:25:52.407 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-15
21:25:53.412 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-16
21:25:54.421 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-17
14:25:55.419 [main] INFO com.demo.kafkasample.config.ProducerAPp - produce-18

 

Consumer 예제 및 로그

  • process 로직이 false 로 return 되게끔 강제하여, commitSync 를 하지 않는 예제
  • 보기 편하게 partition 1 에 대한 로그만 보여드립니다.
2019-12-16 21:25:38.128  INFO 22149 --- [pool-1-thread-2] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 633, CreateTime = 1576473937363, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-0)
2019-12-16 21:25:40.402  INFO 22149 --- [pool-1-thread-2] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 634, CreateTime = 1576473940387, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-3)
2019-12-16 21:25:43.412  INFO 22149 --- [pool-1-thread-2] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 635, CreateTime = 1576473943393, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-6)
2019-12-16 21:25:46.412  INFO 22149 --- [pool-1-thread-2] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 636, CreateTime = 1576473946398, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-9)
2019-12-16 21:25:49.411  INFO 22149 --- [pool-1-thread-2] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 637, CreateTime = 1576473949402, serialized key size = -1, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-12)
2019-12-16 21:25:52.422  INFO 22149 --- [pool-1-thread-2] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 638, CreateTime = 1576473952407, serialized key size = -1, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-15)
2019-12-16 14:25:55.533  INFO 22149 --- [pool-1-thread-2] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 639, CreateTime = 1576473955419, serialized key size = -1, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-18)

Consuemr 로그의 offset 을 보면
commitSync 를 아예 하지 않는다 가장 처음에 실패한 message-0 을 계속해서 poll(offset 633) 로 받아야 하는데 예상과 달리 message-18(offset 639까지) 까지 받는 것을 확인할 수 있었습니다.
따라서 commitSync 메소드를 호출하지 않는다면, 현재 offset 을 Skip 하고 다음 offset 을 poll 로 받는다는 것을 알 수 있었습니다.

그리고 Consumer Application 을 종료하고 다시 재시작 했을 때 로그를 살펴보니 다음과 같았습니다(보기 편하기 위해 1번 partition 에 대한 로그만 보여드립니다)

2019-12-16 21:30:26.073  INFO 27593 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 633, CreateTime = 1576473937363, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-0)
2019-12-16 21:30:26.073  INFO 27593 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 634, CreateTime = 1576473940387, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-3)
2019-12-16 21:30:26.073  INFO 27593 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 635, CreateTime = 1576473943393, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-6)
2019-12-16 21:30:26.073  INFO 27593 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 636, CreateTime = 1576473946398, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-9)
2019-12-16 21:30:26.074  INFO 27593 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 637, CreateTime = 1576473949402, serialized key size = -1, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-12)
2019-12-16 21:30:26.074  INFO 27593 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 638, CreateTime = 1576473952407, serialized key size = -1, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-15)
2019-12-16 21:30:26.074  INFO 27593 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 639, CreateTime = 1576473955419, serialized key size = -1, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-18)

재시작할 경우, 다시 실패했었던 offset 인 633, message-0 부터 consuming 하는 것을 확인할 수 있습니다.

4. 해결책(seek)

예상대로 process 를 실패했을 때 실패했던 record 의 offset 을 다시 poll 하기 위해서 seek 메소드를 이용해 이 문제를 해결해보았습니다.

private void consume(Consumer<String, String> consumer) {
	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT));
	for (ConsumerRecord<String, String> record : records) {
		try {
			if (process(record)) {
				consumer.commitSync();
			} else {
				/** 실패했던 시점의 Offset 으로 되돌아가는 Seek */
				consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
			}
		} catch (CommitFailedException e) {
			LOG.error("Error:", e);
		}
	}
}

 

seek 를 적용한 consumer 실행 결과 로그

2019-12-16 21:44:01.789  INFO 29554 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 633, CreateTime = 1576473937363, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-0)
2019-12-16 21:44:01.789  INFO 29554 --- [pool-1-thread-3] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-2, groupId=test-group-5] Seeking to offset 633 for partition test-1
2019-12-16 21:44:01.806  INFO 29554 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 633, CreateTime = 1576473937363, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-0)
2019-12-16 21:44:01.806  INFO 29554 --- [pool-1-thread-3] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-2, groupId=test-group-5] Seeking to offset 633 for partition test-1
2019-12-16 21:44:01.822  INFO 29554 --- [pool-1-thread-3] c.n.b.c.test.MyTestKafkaConsumer   : ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 633, CreateTime = 1576473937363, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-0)
2019-12-16 21:44:01.822  INFO 29554 --- [pool-1-thread-3] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-2, groupId=test-group-5] Seeking to offset 633 for partition test-1

 

proess 에서 false 인 경우, seeking to offset 633 for partition test-1 이라는 로그에서 보이듯이 해당 offset 으로 되돌아가는 것을 확인할 수 있습니다.

5. 결론

  • seek 를 이용하여 실패한 offset 을 다시 poll 할 수 있음
  • 이러한 방식을 사용하게 된다면 비즈니스 로직을에 대한 retry 정책을 따로 가져가야 할 필요가 있음

'Kafka' 카테고리의 다른 글

Spring Boot + Spring Kafka 를 이용한 Consumer 구현  (4) 2019.12.14
Comments