이전 글이 Kafka를 실행하는 법이었다면, 이제 Spring에서 Producer와 Consumer를 설정하는 것에 대한 게시글이다.
Spring에서 Kafka Consumer, Producer 설정
Consumer 설정
- 먼저, kafka 라이브러리 의존성을 추가한다.
- Maven 기준이며, Kafka 2.8.1을 썼기 때문에 라이브러리 버전도 맞춰줬다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.1</version>
</dependency>
- 이제 Configuration 클래스를 작성한다.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
- @Configuration 어노테이션을 붙인 클래스를 생성한다.
- @EnableKafka 어노테이션의 경우 Consumer 클래스에 필요하다. 그렇지 않으면 @KafkaListener 어노테이션으로 메시지를 가져올 수 없다.
- ConsumerFactory에서는 Consumer를 설정한다.
- 기본 포트인 9092를 사용하고 있으며, Consumer 그룹은 group_1으로 설정했다.
- 토픽:데이터 형태로 데이터를 전달받는다.
- 이때, 토픽은 String으로 명시하고 넘겨줄 데이터는 JSON형식으로 직렬화해서 Producer에서 전달할 것이므로, 동일한 타입으로 역직렬화를 명시해줬다.
- ConcurrentKafkaListenerContainerFactory에서는 Consumer를 KafkaListener에 연결해준다.
- 실제로 Listener가 동작하는 코드는 다음과 같다.
@KafkaListener(topics="prod_compensation", groupId = "group_1")
public void incrInventoryQuantity(HashMap<String, Object> data) {
List<String> itemId = new ArrayList<>();
List<Integer> increment = new ArrayList<>();
for (String key : data.keySet()) {
itemId.add(key);
increment.add((int) data.get(key));
}
catalogService.rollBackInventoryQuantity(itemId, increment);
}
- Controller에 리스너를 등록했다.
- 받아오는 데이터가 HashMap<>이기 때문에 인자에 HashMap<>으로 데이터를 가져온다.
- 이때, KafkaListener 어노테이션에서 topics는 Producer에서 발행한 메시지의 토픽과 일치하게 지정하면 된다.
- 우리는 받아온 메시지가 group_1의 모든 컨슈머에게 전달되기를 원하기 때문에 앞서 설정한 그룹명도 명시해준다.
- 아래의 로직은 보상 트랜잭션이기 때문에 다른 서비스를 호출해 데이터를 롤백하는 로직이다.
Producer 설정
- Consumer와 동일한 라이브러리 의존성을 추가한다.
- Producer를 Configuration하는 클래스를 작성한다.
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
- 앞서 얘기한 것과 같이 토픽은 String이며 밸류는 객체를 JSON형식으로 직렬화 할 것 이므로 위와 같이 설정한다.
- KafkaTemplate는 send()와 같은 메소드를 통해 쉽게 메시지를 발행할 수 있도록 해주는 객체이다.
- 이제 실제로 메시지를 발행해보자
@Autowired
private final KafkaTemplate<String, Object> kafkaTemplate;
@Transactional
public void insertOrder(Order order) {
Map<String, Object> param = new HashMap<>();
try {
// ...
} catch(Exception e) {
// 주문 오류 시
kafkaTemplate.send("prod_compensation", param);
}
}
- "prod_compensation" 이라는 토픽으로, HashMap 객체인 param을 넘겨줬다.
- 실제로 동작시키고, Producer에서 발행한 메시지를 Consumer에서 실행시키면 아주 잘 받아오는 것을 알 수 있다.
추천글
2025.02.14 - [개발] - Kafka를 window 및 Mac os에서 설치 및 실행하는 법
Kafka를 window 및 Mac os에서 설치 및 실행하는 법
Kafka를 프로젝트에 들여올 일이 생기면서, Widnow 환경과 Mac os에서 설치 및 실행시킬 필요가 있었다. 나도 나중에 까먹지 않도록 기록하려고 한다. Kafka를 window 및 Mac os에서 설치 및 실행하
se-dobby.tistory.com
'개발' 카테고리의 다른 글
Kafka 그리고 Kafka의 파티션(Partition)과 컨슈머 그룹(Consumer group) (0) | 2025.02.14 |
---|---|
Kafka를 window 및 Mac os에서 설치 및 실행하는 법 (3) | 2025.02.14 |
분산 트랜잭션 및 보상 트랜잭션 (0) | 2025.02.12 |
CORS에서 Cross-Origin과 쿠키의 Same-Site (0) | 2025.02.10 |
트랜잭션과 격리성 그리고 낙관적 락과 비관적락에 대한 얘기 (0) | 2025.02.09 |