일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- WebProgramming
- function
- jsp
- c++
- Call-by-reference
- JavaScript
- CLASS
- 포인터
- beans
- 자료구조
- CSS
- 윈도우즈
- Sort
- windows
- OOP
- array
- HTML
- query
- request
- API
- 악성코드
- 노드
- System
- 투자
- meta
- C
- Kafka
- UTF-8
- algorithm
- java
- Today
- Total
hahahia
Spring Boot + Spring Kafka 를 이용한 Consumer 구현 본문
최근 프로젝트에서 Kafka 를 구축하고, Spring Boot 기반의 Consumer 를 구현하게 되었는데 여러가지 조건이 있었다.
일단 SASL 연동 및 수동 Commit 을 예제로 만들어 보았다. Spring Boot Project 생성하는 방법은 Skip!
Spring Boot 에서 spring-kafka 사용하는 방법
사실 Java 에서 kafka library 는 spring-kafka 와 apache-kafka client 가 있는데, 쉽게 보면 고수준 / 저수준 client 정도의 차이라고 생각하면 된다. 지금 예제에서는 spring-kafka 를 사용한다.
build.gradle 에 다음과 같은 dependency 를 추가한다
plugins {
id 'org.springframework.boot' version '2.2.2.RELEASE'
id 'io.spring.dependency-management' version '1.0.8.RELEASE'
id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka' // 이거 추가
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
testImplementation 'org.springframework.kafka:spring-kafka-test'
}
test {
useJUnitPlatform()
}
application.properties
ackMode 에 자세한 내용은 아래 링크를 참고하는 것이 좋다. 지금 예제에서는 명시적으로 Ack 를 수행해야 하기 때문에 MANUAL 로 설정하였다.
https://docs.spring.io/spring-kafka/reference/html/#committing-offsets
kafka.bootstrap-servers=localhost:9092
kafka.username=username
kafka.password=qwer!@#$
kafka.enable.auto.commit=false
kafka.ackmode=MANUAL
kafka.offset.reset=latest
MyKafkaConsumerProperties.java
JAAS File 을 따로 import 하지 않고 JAAS Template 를 String Format 으로 관리한다. username/password 만 파라미터로 받아 지정하도록 구현하였다.
package com.example.hahahia.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.HashMap;
import java.util.Map;
public class MyKafkaConsumerProperties {
/** SASL 인증을 위한 JAAS Template */
private static final String JAAS_TEMPLATE = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
public static Map<String, Object> getProperties(String bootstrapServers, String userName, String password) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 명시적으로 Ack 를 수행하기 위해서 false
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest or latest
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
String jaasConfig = String.format(JAAS_TEMPLATE, userName, password);
props.put("sasl.jaas.config", jaasConfig);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
return props;
}
}
MyKafkaConsumerConfig.java
해당 예제에서는 명시적 Ack를 수행해주어야 하기 때문에 ConcurrentKafkaListenerFactory Bean을 생성할 때, setMessageListener 를 별도로 해주었다.
package com.example.hahahia.config;
import com.example.hahahia.service.MyAcknowledgingMessageListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
@Configuration
@EnableKafka
public class MyKafkaConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.username}")
private String userName;
@Value("${kafka.password}")
private String password;
@Value("${kafka.offset.reset}")
private String offsetResetMode;
@Value("${kafka.ackmode}")
private ContainerProperties.AckMode ackMode;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
/**
* https://docs.spring.io/spring-kafka/reference/html/#committing-offsets
* */
factory.getContainerProperties().setAckMode(ackMode);
/**
* consumer 를 처리하는 Thread 개수(consumer 개수를 의미하는 것은 아님)
* */
factory.setConcurrency(3);
/**
* 명시적으로 Ack 처리를 위해 별도의 AcknowledgingMessageListener 구현
* https://docs.spring.io/spring-kafka/reference/html/#message-listeners
* */
factory.getContainerProperties().setMessageListener(myAcknowledgingMessageListener());
factory.getContainerProperties().setPollTimeout(5000);
return factory;
}
public MyAcknowledgingMessageListener myAcknowledgingMessageListener() {
return new MyAcknowledgingMessageListener();
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(MyKafkaConsumerProperties.getProperties(bootstrapServers, userName, password));
}
}
MyAcknowledgingMessageListener.java
package com.example.hahahia.service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class MyAcknowledgingMessageListener implements AcknowledgingMessageListener<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(MyAcknowledgingMessageListener.class);
@Override
@KafkaListener(topics = "test", groupId = "test-group-1234", containerFactory = "kafkaListenerContainerFactory")
public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {
try {
LOGGER.info("consume data: " + data.toString());
acknowledgment.acknowledge();
} catch (Exception e) {
LOGGER.error("consume cause exception : " + e);
}
}
}
여기에서 실제 topic 과 consumer group id 를 설정을 기반으로 Consumer 로직이 동작하게 된다.
Spring Kafka 에서는 AckMode 를 사용하기 위해서는 AcknowledgingMessageListener interface 를 구현해야 한다. 자세한 내용은 message listener 와 위에서 언급하였던 commit offset 모드에 대한 튜토리얼을 참고하는 것이 좋다.
https://docs.spring.io/spring-kafka/reference/html/#message-listeners
실행 결과
2019-12-14 08:49:51.709 INFO 1729 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test-group-1234: partitions assigned: [test-0]
2019-12-14 08:49:51.709 INFO 1729 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer : test-group-1234: partitions assigned: [test-1]
2019-12-14 08:49:51.709 INFO 1729 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer : test-group-1234: partitions assigned: [test-2]
2019-12-14 08:49:51.748 INFO 1729 --- [ntainer#0-2-C-1] c.e.h.S.MyAcknowledgingMessageListener : consume data: ConsumerRecord(topic = test, partition = 2, leaderEpoch = 0, offset = 393, CreateTime = 1576224998491, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-1)
2019-12-14 08:49:51.748 INFO 1729 --- [ntainer#0-1-C-1] c.e.h.S.MyAcknowledgingMessageListener : consume data: ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 392, CreateTime = 1576224999492, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-2)
2019-12-14 08:49:51.748 INFO 1729 --- [ntainer#0-0-C-1] c.e.h.S.MyAcknowledgingMessageListener : consume data: ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 11294, CreateTime = 1576224997477, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-0)
2019-12-14 08:49:51.748 INFO 1729 --- [ntainer#0-1-C-1] c.e.h.S.MyAcknowledgingMessageListener : consume data: ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 393, CreateTime = 1576225002503, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-5)
2019-12-14 08:49:51.748 INFO 1729 --- [ntainer#0-2-C-1] c.e.h.S.MyAcknowledgingMessageListener : consume data: ConsumerRecord(topic = test, partition = 2, leaderEpoch = 0, offset = 394, CreateTime = 1576225001498, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-4)
2019-12-14 08:49:51.748 INFO 1729 --- [ntainer#0-0-C-1] c.e.h.S.MyAcknowledgingMessageListener : consume data: ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 11295, CreateTime = 1576225000497, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-3)
해당 Topic 은 3개의 Partition으로 구성되어 있다. 그리고 현재는 Consumer Group 내 consumer는 단 하나(지금 동작하는 SpringBoot Application) 이기 때문에 지금 구현한 Consumer가 3개의 Partition에 대해 Consume 을 수행하는 것을 확인할 수 있다.
로그를 보면 partitions assigned: test-0, test-1, test2
정상적으로 Consume을 하는 것을 확인할 수 있다.
'Kafka' 카테고리의 다른 글
Kafka Consumer 의 commitSync / seek 을 이용한 offset 관리 예제 (5) | 2019.12.16 |
---|