반응형
JSON 형태인 데이터(메시지)를 Producer가 보내고 Consumer가 읽는 코드를 간단하게 구현해보도록 하겠습니다.
아래 코드를 작성해보시고 실행해보시면 정상작동합니다.
예제 코드가 정상 작동하기 위해서는 당연히 예제 쓰일 topic이 생성되어야 하고, zookeeper와 kafka를 실행시켜주셔야합니다. (zookeeper는 지양되지만 현 예제에서는 zookeeper를 사용하도록 하겠습니다)
1. zookeeper 실행
D:\tool\kafka> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
2. (broker) 서버 실행
D:\tool\kafka> .\bin\windows\kafka-server-start.bat .\config\server.properties
3. topic 생성
D:\tool\kafka> .\bin\windows\kafka-topics.bat --create --topic topic1 --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092
의존 Kafka 추가
dependencies {
implementation 'org.springframework.kafka:spring-kafka'
}
Producer 작성
Message.java
package com.dutmdcjf.test.kafka.moel;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class Message {
private String name;
private String message;
}
KafkaProducerConfig.java
package com.dutmdcjf.test.kafka.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.dutmdcjf.test.kafka.moel.Message;
@Configuration
public class KafkaProducerConfig {
private static final String BOOTSTRAP_SERVER = "localhost:9092";
/**
* 메시지 발송 모듈에서 사용할 KafkaTemplate
* @return
*/
@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, Message> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // String 형태로 직렬화
return new DefaultKafkaProducerFactory<>(configProps);
}
}
- KafkaTemplate:
Producer는 Kafka에 메시지를 보내는 작업을 담당합니다. 이를 위해 KafkaTemplate을 사용하여 메시지를 전송합니다. KafkaTemplate에 ProducerFactory를 등록하여 Kafka 메시지를 보내는데 필요한 설정을 해줍니다.
KafkaProducerService.java
package com.dutmdcjf.test.kafka.service;
import java.util.concurrent.CompletableFuture;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import com.dutmdcjf.test.kafka.moel.Message;
import lombok.RequiredArgsConstructor;
@Service
@RequiredArgsConstructor
public class KafkaProducerService {
private static final String TOPIC_NAME = "topic1";
private final KafkaTemplate<String, Message> kafkaTemplate;
public void send(Message message) {
kafkaTemplate.send(TOPIC_NAME, message);
}
public void sendWithCallback(Message message) {
// Non Blocking (Async)
CompletableFuture<SendResult<String, Message>> future = kafkaTemplate.send(TOPIC_NAME, message);
future.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Sent: " + message + " offset: " + result.getRecordMetadata().offset());
} else {
System.out.println("Failure: " + message + " due to : " + ex.getMessage());
}
});
}
}
- ListenableFuture deprecated
Spring Boot 2 → Spring Boot 3이 되면서 ListenableFuture이 deprecated 되었습니다. ListenableFuture 대신 CompletableFuture을 사용하면 됩니다. 아래 링크는 관련 공식 문서 입니다.
https://docs.spring.io/spring-kafka/reference/kafka/sending-messages.html#examples
KafkaProducerController.java
package com.dutmdcjf.test.kafka.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.dutmdcjf.test.kafka.moel.Message;
import com.dutmdcjf.test.kafka.service.KafkaProducerService;
import lombok.RequiredArgsConstructor;
@RestController
@RequiredArgsConstructor
public class KafkaProducerController {
private final KafkaProducerService kafkaProducerService;
@RequestMapping("/publish")
public String publishJson(Message message) {
kafkaProducerService.send(message);
return "publish message>>> name: " + message.getName() + ", message: " + message.getMessage();
}
}
Consumer 작성
KafkaConsumerConfig.java
package com.dutmdcjf.test.kafka.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
private static final String BOOTSTRAP_SERVER = "localhost:9092";
private final static String GROUP_ID = "group_first";
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // --from-beginning 처럼 첫 레코드부터 읽음
return new DefaultKafkaConsumerFactory<>(props);
}
}
- ConcurrentKafkaListenerContainerFactory:
Consumer는 메시지를 받아들이고 처리하는 작업을 담당합니다. 이를 위해 Kafka 리스너를 설정해야 합니다.
Kafka 리스너를 설정해주기 위해 Kafka 리스너 컨테이너(ConcurrentKafkaListenerContainerFactory)를 생성해야합니다. ConcurrentKafkaListenerContainerFactory에 ConsumerFactory를 등록하여 Kafka 메시지를 처리하는데 필요한 설정을 해줍니다.
KafkaConsumer.java
package com.dutmdcjf.test.kafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.dutmdcjf.test.kafka.moel.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
@Component
public class KafkaConsumer {
private static final String TOPIC_NAME = "topic1";
ObjectMapper objectMapper = new ObjectMapper();
@KafkaListener(topics = TOPIC_NAME)
public void listenMessage(String jsonMessage) {
try {
Message message = objectMapper.readValue(jsonMessage, Message.class);
System.out.println("Kafka Listen>>> name: " + message.getName() + " message: " + message.getMessage());
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 지정한 주제(여기서는 topic1)로 메시지가 들어오면 Kafka Listener는 메시지를 읽습니다.
반응형
'Kafka' 카테고리의 다른 글
Kafka Kafka명령어, local 환경에 Kafka Cluster 구축 (1) | 2024.02.13 |
---|---|
Kafka 카프카(Kafka)란? (0) | 2024.02.07 |