[✉️ Kafka] Kafka 설치 및 예제로 실습해보기

2024. 9. 27. 20:06·DevOps
728x90
반응형

Docker를 사용해 Kafka 설치하기

 

Docker 관련 포스팅은 👇🏻

2024.08.20 - [Docker] - [🐳 Docker] Dockerfile & Docker Compose 사용하기

 

[🐳 Docker] Dockerfile & Docker Compose 사용하기

Dockerfile  Dockerfile : 컴퓨터에서 돌아가는 앱을 만들기 위한 레시피 같은 존재. Dockerfile을 토대로 DockerImage 생성 가능. DockerImage는 앱을 실행하는데 필요한 모든 것을 담고 있다. Dockerfile 예제 ># D

tildacoderecorder.tistory.com

 


 

1️⃣  kafka 컨테이너를 생성할 docker-compose.yml 파일 작성

 

version: '3.8'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    platform: linux/amd64
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: wurstmeister/kafka:latest
    platform: linux/amd64
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    platform: linux/amd64
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_READONLY: "false"

 

위와 같이 도커 컴포즈 파일 작성 후 콘솔창에서 파일이 작성된 위치로 이동해 도커 컴포즈를 실행한다.

docker compose up -d

 

 

→ 버전 관련 에러가 나서 zookeeper의 image를 wurstmeister/zookeeper:latest로 변경

 

 

실행 후 localhost:8080에 접속하면 Kafka UI를 확인 할 수 있다.

 

2️⃣  Producer Application

 

 

🔗 https://start.spring.io/ 에서 위와 같은 의존성을 추가해 프로젝트를 생성한다.

 

application.properties

spring.application.name=producer
server.port=8090

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

 

Kafka의 포트 번호는 9092 이다. 콘솔창에서 docker ps로 확인 가능.

 

ProducerApplicationKafkaConfig.java

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class ProducerApplicationKafkaConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

ProducerController

import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
public class ProducerController {

    private final ProducerService producerService;

    @GetMapping("/send")
    public String sendMessage(@RequestParam("topic") String topic,
                              @RequestParam("key") String key,
                              @RequestParam("message") String message) {
        producerService.sendMessage(topic, key, message);
        return "Message sent to Kafka topic";
    }
}

 

ProducerService

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class ProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;


    public void sendMessage(String topic , String key, String message) {
        for (int i = 0; i < 10; i++) {

            kafkaTemplate.send(topic, key, message + " " + i);
        }

    }
}

 

반복문은 메세지를 받으면 10개 정도 보내는 임의의 옵션.

 

3️⃣  Consumer Application

 

Producer Application과 같은 의존성으로 이름만 수정한 뒤 스프링 프로젝트를 생성해준다.

 

Application.properties

spring.application.name=consumer
server.port=8091

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

ConsumerApplicationKafkaConfig

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

// 이 클래스는 Kafka 컨슈머 설정을 위한 Spring 설정 클래스입니다.
@EnableKafka // Kafka 리스너를 활성화하는 어노테이션입니다.
@Configuration // Spring 설정 클래스로 선언하는 어노테이션입니다.
public class ConsumerApplicationKafkaConfig {

    // Kafka 컨슈머 팩토리를 생성하는 빈을 정의합니다.
    // ConsumerFactory는 Kafka 컨슈머 인스턴스를 생성하는 데 사용됩니다.
    // 각 컨슈머는 이 팩토리를 통해 생성된 설정을 기반으로 작동합니다.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        // 컨슈머 팩토리 설정을 위한 맵을 생성합니다.
        Map<String, Object> configProps = new HashMap<>();
        // Kafka 브로커의 주소를 설정합니다.
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 메시지 키의 디시리얼라이저 클래스를 설정합니다.
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 메시지 값의 디시리얼라이저 클래스를 설정합니다.
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 설정된 프로퍼티로 DefaultKafkaConsumerFactory를 생성하여 반환합니다.
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    // Kafka 리스너 컨테이너 팩토리를 생성하는 빈을 정의합니다.
    // ConcurrentKafkaListenerContainerFactory는 Kafka 메시지를 비동기적으로 수신하는 리스너 컨테이너를 생성하는 데 사용됩니다.
    // 이 팩토리는 @KafkaListener 어노테이션이 붙은 메서드들을 실행할 컨테이너를 제공합니다.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // ConcurrentKafkaListenerContainerFactory를 생성합니다.
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 컨슈머 팩토리를 리스너 컨테이너 팩토리에 설정합니다.
        factory.setConsumerFactory(consumerFactory());
        // 설정된 리스너 컨테이너 팩토리를 반환합니다.
        return factory;
    }
}

 

ConsumerService

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ConsumerService {

    // 이 메서드는 Kafka에서 메시지를 소비하는 리스너 메서드입니다.
    // @KafkaListener 어노테이션은 이 메서드를 Kafka 리스너로 설정합니다.
    @KafkaListener(groupId = "group_a", topics = "topic1")
    // Kafka 토픽 "test-topic"에서 메시지를 수신하면 이 메서드가 호출됩니다.
    // groupId는 컨슈머 그룹을 지정하여 동일한 그룹에 속한 다른 컨슈머와 메시지를 분배받습니다.
    public void consumeFromGroupA(String message) {
        log.info("Group A consumed message from topic1: " + message);
    }

    // 동일한 토픽을 다른 그룹 ID로 소비하는 또 다른 리스너 메서드입니다.
    @KafkaListener(groupId = "group_b", topics = "topic1")
    public void consumeFromGroupB(String message) {
        log.info("Group B consumed message from topic1: " + message);
    }

    // 다른 토픽을 다른 그룹 ID로 소비하는 리스너 메서드입니다.
    @KafkaListener(groupId = "group_c", topics = "topic2")
    public void consumeFromTopicC(String message) {
        log.info("Group C consumed message from topic2: " + message);
    }

    // 다른 토픽을 다른 그룹 ID로 소비하는 리스너 메서드입니다.
    @KafkaListener(groupId = "group_c", topics = "topic3")
    public void consumeFromTopicD(String message) {
        log.info("Group C consumed message from topic3: " + message);
    }

    @KafkaListener(groupId = "group_d", topics = "topic4")
    public void consumeFromPartition0(String message) {
        log.info("Group D consumed message from topic4: " + message);
    }
}

 

  • 토픽이 같은데 컨슈머 그룹이 다른 경우
  • 토픽은 다른데 컨슈머 그룹이 같은 경우
  • 토픽과 컨슈머 그룹 모두 다른 경우

이렇게 3가지 경우로 나누어 테스트 진행.

 

 

4️⃣  확인

 

 

두 애플리케이션을 실행하고 Kafka UI 에서 Topics와 Consumers를 확인해보면 위와 같은 화면을 볼 수 있다.

 

🧪 테스트 1 : topic을 test-topic으로 지정하고 메시지 요청해보기

 

 

포스트맨을 사용해 다음과 같이 request를 보내고 ConsumerApplication의 로그를 확인해보면 아무것도 뜨지 않는걸 알 수 있다.

test-topic을 읽는 리스너가 존재하지 않기 때문이다.

Kafka UI를 확인해보면 test-topic이 생성되었고 메세지가 10개 생성되었음을 알 수 있다.

 

 

메시지를 확인해보면 key가 동일하기 때문에 같은 파티션으로 분리되었음을 알 수 있다.

그러나 위에서 말했듯 이 토픽을 읽는 리스너가 존재하지 않기 때문에 consumer 탭을 누르면 아무것도 뜨지 않는다.

 

🧪 테스트 2 : topic을 topic1로 지정하고 요청해보기

→ 토픽이 같고 그룹도 같은 경우

 

ConsumerApplication 로그를 확인해보면 그룹 A와 B가 같은 메시지를 받는 것을 알 수 있다.

 

🧪 테스트 3 : topic을 topic2로 지정하고 요청해보기

→ 토픽은 다른데 그룹이 같은 경우

 

로그를 확인해보면 그룹 C의 컨슈머 리스너만 동작 했음을 알 수 있다.

메시지들은 토픽을 대상으로 발행되기 때문에 같은 그룹에서 다양한 토픽을 받을 때 해당 토픽의 리스너만 처리된다.

위와 같은 원리로 topic3에 요청을 해도 group c의 컨슈머 리스너만 동작하며, topic4를 요청하면 group d의 컨슈머 리스너가 동작한다.

 

 

728x90
반응형
저작자표시 비영리 변경금지 (새창열림)

'DevOps' 카테고리의 다른 글

도커로 프로메테우스 & 그라파나 띄우기 + 프로비저닝 기능까지 1  (0) 2024.10.13
Docker로 Cassandra 띄우기 및 실습하기  (1) 2024.10.04
[✉️ Kafka] Kafka 이해하기2 - Zookeper, Broker, Message  (2) 2024.09.27
[✉️ Kafka] Kafka 이해하기1 - Producer, Consumer, Topic, Partition  (0) 2024.09.27
Docker로 pgAdmin 띄워서 PostgreSQL 손쉽게 활용하기  (1) 2024.09.25
'DevOps' 카테고리의 다른 글
  • 도커로 프로메테우스 & 그라파나 띄우기 + 프로비저닝 기능까지 1
  • Docker로 Cassandra 띄우기 및 실습하기
  • [✉️ Kafka] Kafka 이해하기2 - Zookeper, Broker, Message
  • [✉️ Kafka] Kafka 이해하기1 - Producer, Consumer, Topic, Partition
waVwe
waVwe
    반응형
  • waVwe
    waVwe 개발 블로그
    waVwe
  • 전체
    오늘
    어제
    • ALL (184)
      • Python (1)
      • Spring (15)
      • DevOps (10)
      • Git (6)
      • JAVA (4)
      • C (22)
      • 코테 문제 풀이 (124)
        • 프로그래머스 (43)
        • 백준 (2)
        • 정올 (64)
        • SW Expert Academy (1)
        • 온코더 oncoder (14)
  • 블로그 메뉴

    • 홈
    • 방명록
  • 링크

    • 🐙 Github
  • 공지사항

  • 인기 글

  • 태그

    연결리스트
    progate
    도커
    내일배움캠프
    코테
    알고리즘
    깃헙
    C
    devops
    자료구조
    자바
    docker
    Til
    스파르타코딩
    스파르타코딩클럽
    java
    스프링부트
    온코더
    CI/CD
    깃
    while문
    스프링
    형변환
    프로그래머스
    C언어
    springboot
    정올
    MSA
    이진트리
    아파치카프카
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.0
waVwe
[✉️ Kafka] Kafka 설치 및 예제로 실습해보기
상단으로

티스토리툴바