hahahia

Spring Boot + Spring Kafka 를 이용한 Consumer 구현 본문

Kafka

Spring Boot + Spring Kafka 를 이용한 Consumer 구현

hahahia 2019. 12. 14. 09:12

최근 프로젝트에서 Kafka 를 구축하고, Spring Boot 기반의 Consumer 를 구현하게 되었는데 여러가지 조건이 있었다. 

일단 SASL 연동 및 수동 Commit 을 예제로 만들어 보았다. Spring Boot Project 생성하는 방법은 Skip!

프로젝트 구성(gradle 기반 springboot)

Spring Boot 에서 spring-kafka 사용하는 방법

사실 Java 에서 kafka library 는 spring-kafka 와 apache-kafka client 가 있는데, 쉽게 보면 고수준 / 저수준 client 정도의 차이라고 생각하면 된다. 지금 예제에서는 spring-kafka 를 사용한다.

build.gradle 에 다음과 같은 dependency 를 추가한다

plugins {
    id 'org.springframework.boot' version '2.2.2.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.kafka:spring-kafka' // 이거 추가
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

test {
    useJUnitPlatform()
}

 

application.properties 

ackMode 에 자세한 내용은 아래 링크를 참고하는 것이 좋다. 지금 예제에서는 명시적으로 Ack 를 수행해야 하기 때문에 MANUAL 로 설정하였다.

https://docs.spring.io/spring-kafka/reference/html/#committing-offsets

kafka.bootstrap-servers=localhost:9092
kafka.username=username
kafka.password=qwer!@#$
kafka.enable.auto.commit=false
kafka.ackmode=MANUAL
kafka.offset.reset=latest

 

MyKafkaConsumerProperties.java

JAAS File 을 따로 import 하지 않고 JAAS Template 를 String Format 으로 관리한다. username/password 만 파라미터로 받아 지정하도록 구현하였다.

package com.example.hahahia.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.HashMap;
import java.util.Map;

public class MyKafkaConsumerProperties {

    /** SASL 인증을 위한 JAAS Template */
    private static final String JAAS_TEMPLATE = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";

    public static Map<String, Object> getProperties(String bootstrapServers, String userName, String password) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 명시적으로 Ack 를 수행하기 위해서 false
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest or latest
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        String jaasConfig = String.format(JAAS_TEMPLATE, userName, password);
        props.put("sasl.jaas.config", jaasConfig);

        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        return props;

    }
}

 

MyKafkaConsumerConfig.java

해당 예제에서는 명시적 Ack를 수행해주어야 하기 때문에 ConcurrentKafkaListenerFactory Bean을 생성할 때, setMessageListener 를 별도로 해주었다.

package com.example.hahahia.config;

import com.example.hahahia.service.MyAcknowledgingMessageListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class MyKafkaConsumerConfig {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.username}")
    private String userName;

    @Value("${kafka.password}")
    private String password;

    @Value("${kafka.offset.reset}")
    private String offsetResetMode;

    @Value("${kafka.ackmode}")
    private ContainerProperties.AckMode ackMode;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        /**
         * https://docs.spring.io/spring-kafka/reference/html/#committing-offsets
         * */
        factory.getContainerProperties().setAckMode(ackMode);

        /**
         * consumer 를 처리하는 Thread 개수(consumer 개수를 의미하는 것은 아님)
         * */
        factory.setConcurrency(3);
        /**
         * 명시적으로 Ack 처리를 위해 별도의 AcknowledgingMessageListener 구현
         * https://docs.spring.io/spring-kafka/reference/html/#message-listeners
         * */
        factory.getContainerProperties().setMessageListener(myAcknowledgingMessageListener());
        factory.getContainerProperties().setPollTimeout(5000);

        return factory;
    }

    public MyAcknowledgingMessageListener myAcknowledgingMessageListener() {
        return new MyAcknowledgingMessageListener();
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(MyKafkaConsumerProperties.getProperties(bootstrapServers, userName, password));
    }
}

 

MyAcknowledgingMessageListener.java

package com.example.hahahia.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MyAcknowledgingMessageListener implements AcknowledgingMessageListener<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyAcknowledgingMessageListener.class);

    @Override
    @KafkaListener(topics = "test", groupId = "test-group-1234", containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {
        try {
            LOGGER.info("consume data: " + data.toString());
            acknowledgment.acknowledge();
        } catch (Exception e) {
            LOGGER.error("consume cause exception : " + e);
        }
    }

}

여기에서 실제 topic 과 consumer group id 를 설정을 기반으로 Consumer 로직이 동작하게 된다.

Spring Kafka 에서는 AckMode 를 사용하기 위해서는 AcknowledgingMessageListener interface 를 구현해야 한다. 자세한 내용은 message listener 와 위에서 언급하였던 commit offset 모드에 대한 튜토리얼을 참고하는 것이 좋다.

https://docs.spring.io/spring-kafka/reference/html/#message-listeners

 

 

실행 결과

2019-12-14 08:49:51.709  INFO 1729 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test-group-1234: partitions assigned: [test-0]
2019-12-14 08:49:51.709  INFO 1729 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : test-group-1234: partitions assigned: [test-1]
2019-12-14 08:49:51.709  INFO 1729 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : test-group-1234: partitions assigned: [test-2]
2019-12-14 08:49:51.748  INFO 1729 --- [ntainer#0-2-C-1] c.e.h.S.MyAcknowledgingMessageListener   : consume data: ConsumerRecord(topic = test, partition = 2, leaderEpoch = 0, offset = 393, CreateTime = 1576224998491, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-1)
2019-12-14 08:49:51.748  INFO 1729 --- [ntainer#0-1-C-1] c.e.h.S.MyAcknowledgingMessageListener   : consume data: ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 392, CreateTime = 1576224999492, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-2)
2019-12-14 08:49:51.748  INFO 1729 --- [ntainer#0-0-C-1] c.e.h.S.MyAcknowledgingMessageListener   : consume data: ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 11294, CreateTime = 1576224997477, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-0)
2019-12-14 08:49:51.748  INFO 1729 --- [ntainer#0-1-C-1] c.e.h.S.MyAcknowledgingMessageListener   : consume data: ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 393, CreateTime = 1576225002503, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-5)
2019-12-14 08:49:51.748  INFO 1729 --- [ntainer#0-2-C-1] c.e.h.S.MyAcknowledgingMessageListener   : consume data: ConsumerRecord(topic = test, partition = 2, leaderEpoch = 0, offset = 394, CreateTime = 1576225001498, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-4)
2019-12-14 08:49:51.748  INFO 1729 --- [ntainer#0-0-C-1] c.e.h.S.MyAcknowledgingMessageListener   : consume data: ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 11295, CreateTime = 1576225000497, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message-3)

 

해당 Topic 은 3개의 Partition으로 구성되어 있다. 그리고 현재는 Consumer Group 내 consumer는 단 하나(지금 동작하는 SpringBoot Application) 이기 때문에 지금 구현한 Consumer가 3개의 Partition에 대해 Consume 을 수행하는 것을 확인할 수 있다.

로그를 보면 partitions assigned: test-0, test-1, test2

정상적으로 Consume을 하는 것을 확인할 수 있다.

Comments