티스토리 뷰

오늘은 SpringBoot와 Apache Kafka를 활용하여 간단한 메시징 예제를 만들어보겠습니다. Apache Kafka는 분산 스트리밍 플랫폼으로, 대규모 데이터 처리 및 실시간 데이터 스트리밍에 강력한 도구입니다. 

 

1. Apache Kafka 소개

Apache Kafka는 분산형 스트리밍 플랫폼으로, 대량의 실시간 데이터를 처리하고 저장하는 데 특화된 시스템입니다. LinkedIn에서 개발되어 현재 Apache Software Foundation에서 관리하고 있으며 아래와 같은 특징을 가지고 있습니다.

  1. 메시지 브로커: Kafka는 생산자(Producer)와 소비자(Consumer) 사이에서 메시지를 전송하는 역할을 합니다. 데이터를 토픽(Topic)으로 구분하여 관리하고, 각 토픽은 여러 파티션(Partition)으로 나뉘어 데이터의 병렬 처리를 지원합니다.
  2. 높은 처리량과 내구성: Kafka는 높은 처리량을 자랑하며, 데이터를 디스크에 기록하여 내구성을 보장합니다. 또한, 데이터를 여러 서버에 복제하여 장애에 대한 복원력을 강화합니다.
  3. 확장성: Kafka는 클러스터 형태로 배포되어 수평 확장이 용이합니다. 노드를 추가하여 시스템의 용량과 성능을 쉽게 확장할 수 있습니다.
  4. 실시간 스트리밍: Kafka는 실시간 데이터 스트리밍을 지원하여, 실시간으로 데이터를 수집하고 처리할 수 있습니다. 이를 통해 이벤트 기반 아키텍처를 구현하거나 로그 분석, 데이터 파이프라인 구축 등에 활용할 수 있습니다.
  5. 내장된 로그 저장: Kafka는 데이터를 로그 형태로 저장하여, 데이터가 발생한 시점의 정확한 상태를 추적할 수 있습니다. 이러한 로그는 필요에 따라 재처리할 수 있어, 데이터의 재처리와 분석에 유용합니다.

 

2. Kafka 서버 실행

https://kafka.apache.org/downloads
  • 위 경로로 접속하여 다운로드 받습니다.
  • 저는 윈도우 환경에서 진행하였으므로 "Binary downloads" 다운받았습니다.

 

2.1. zookeeper 서버 실행

설치된 경로로 이동한 후, cmd 창을 열어 아래와 같이 명령어를 수행합니다.

D:\kafka>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
  • "입력 줄이 너무 깁니다." 오류가 발생할 수 있어 적당한 경로로 이동후 실행시켜주세요.

 

2.2. kafka 서버 실행

D:\kafka>bin\windows\kafka-server-start.bat config\server.properties

 

2.3. 토픽 생성

# 토픽 생성
D:\kafka>bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --topic test_topic

# 토픽 확인
D:\kafka>bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

 

3. SpringBoot 연동

3.1. build.gradle

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.3.3'
    id 'io.spring.dependency-management' version '1.1.6'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(17)
    }
}

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    // basic
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'

    // kafka
    implementation 'org.springframework.kafka:spring-kafka'

    // lombok
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'

    // undertow
    implementation 'org.springframework.boot:spring-boot-starter-undertow'
}

tasks.named('test') {
    useJUnitPlatform()
}

configurations {
    configureEach {
        // was tomcat 제외
        exclude module: 'spring-boot-starter-tomcat'
    }
}

 

3.2. application.yml

server:
  port: 1234

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

 

 

3.3. KafkaProducer.java

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

  private static final String TOPIC = "test_topic";
  private final KafkaTemplate<String, String> kafkaTemplate;

  public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  public void sendMessage(String message) {
    kafkaTemplate.send(TOPIC, message);
    System.out.println("Produced message : " + message);
  }

}

 

3.4. KafkaConsumer.java

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

  @KafkaListener(topics = "test_topic", groupId = "test-group")
  public void listen(String message) {
    System.out.println("Consumed message : " + message);
  }
}

 

3.5. KafkaController.java

import com.example.kafka_tester.service.KafkaProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
public class KafkaController {

  private final KafkaProducer kafkaProducer;

  @GetMapping("/send")
  public String sendMessage(@RequestParam("message") String message) {
    kafkaProducer.sendMessage(message);
    return "Message sent to kafka Topic";
  }
}

 

4. 실행

최근에 올라온 글
Total
Today
Yesterday