본문 바로가기
개발

Spring에서 Kafka Consumer, Producer 설정

by 뿔난 도비 2025. 2. 14.
반응형

이전 글이 Kafka를 실행하는 법이었다면, 이제 Spring에서 Producer와 Consumer를 설정하는 것에 대한 게시글이다.

 

Spring에서 Kafka Consumer, Producer 설정

 

 

Consumer 설정

 

- 먼저, kafka 라이브러리 의존성을 추가한다.

- Maven 기준이며, Kafka 2.8.1을 썼기 때문에 라이브러리 버전도 맞춰줬다.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.1</version>
</dependency>

 

- 이제 Configuration 클래스를 작성한다.

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

 

- @Configuration 어노테이션을 붙인 클래스를 생성한다.

- @EnableKafka 어노테이션의 경우 Consumer 클래스에 필요하다. 그렇지 않으면 @KafkaListener 어노테이션으로 메시지를 가져올 수 없다.

- ConsumerFactory에서는 Consumer를 설정한다.

- 기본 포트인 9092를 사용하고 있으며, Consumer 그룹은 group_1으로 설정했다.

- 토픽:데이터 형태로 데이터를 전달받는다.

- 이때, 토픽은 String으로 명시하고 넘겨줄 데이터는 JSON형식으로 직렬화해서 Producer에서 전달할 것이므로, 동일한 타입으로 역직렬화를 명시해줬다.

 

- ConcurrentKafkaListenerContainerFactory에서는 Consumer를 KafkaListener에 연결해준다.

 

- 실제로 Listener가 동작하는 코드는 다음과 같다.

@KafkaListener(topics="prod_compensation", groupId = "group_1")
public void incrInventoryQuantity(HashMap<String, Object> data) {
    List<String> itemId = new ArrayList<>();
    List<Integer> increment = new ArrayList<>();

    for (String key : data.keySet()) {
        itemId.add(key);
        increment.add((int) data.get(key));
    }
    catalogService.rollBackInventoryQuantity(itemId, increment);
}

 

- Controller에 리스너를 등록했다.

- 받아오는 데이터가 HashMap<>이기 때문에 인자에 HashMap<>으로 데이터를 가져온다.

- 이때, KafkaListener 어노테이션에서 topics는 Producer에서 발행한 메시지의 토픽과 일치하게 지정하면 된다.

- 우리는 받아온 메시지가 group_1의 모든 컨슈머에게 전달되기를 원하기 때문에 앞서 설정한 그룹명도 명시해준다.

- 아래의 로직은 보상 트랜잭션이기 때문에 다른 서비스를 호출해 데이터를 롤백하는 로직이다.

Producer 설정

 

- Consumer와 동일한 라이브러리 의존성을 추가한다.

 

- Producer를 Configuration하는 클래스를 작성한다.

@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

- 앞서 얘기한 것과 같이 토픽은 String이며 밸류는 객체를 JSON형식으로 직렬화 할 것 이므로 위와 같이 설정한다.

- KafkaTemplate는 send()와 같은 메소드를 통해 쉽게 메시지를 발행할 수 있도록 해주는 객체이다.

- 이제 실제로 메시지를 발행해보자

 

@Autowired
private final KafkaTemplate<String, Object> kafkaTemplate;

@Transactional
public void insertOrder(Order order) {
    Map<String, Object> param = new HashMap<>();
    try {
    	// ...
    } catch(Exception e) {
      // 주문 오류 시
      kafkaTemplate.send("prod_compensation", param);
    }
}

 

- "prod_compensation" 이라는 토픽으로, HashMap 객체인 param을 넘겨줬다.

- 실제로 동작시키고, Producer에서 발행한 메시지를 Consumer에서 실행시키면 아주 잘 받아오는 것을 알 수 있다.

추천글

2025.02.14 - [개발] - Kafka를 window 및 Mac os에서 설치 및 실행하는 법

 

Kafka를 window 및 Mac os에서 설치 및 실행하는 법

Kafka를 프로젝트에 들여올 일이 생기면서, Widnow 환경과 Mac os에서 설치 및 실행시킬 필요가 있었다. 나도 나중에 까먹지 않도록 기록하려고 한다.  Kafka를 window 및 Mac os에서 설치 및 실행하

se-dobby.tistory.com

 

반응형