728x90

최근 Kafka를 배우면서 Spring과 어떻게 연결할 수 있을까 고민하던차에 이미 그런게 존재한다는걸 알았다.

그런데 여기에 대해서 자세히 알려주는 곳이 없어서 좀 고생을 했었다.

결국 해결 방법들을 몇개 알아냈는데 이는 그 여러가지 방법중 가장 심플한 방법이다.

실제 프로덕션 레벨로 올 릴 수 있을 정도는 아니므로 프로덕션 레벨을 원한다면은 다른걸 알아보는게 좋다.

 

 

위의 자료들을 많이 참고 했지만 시중에 나와있는 예제들과는 좀 다를 수 있다.

쓰는 방식이 시시각각 변하고 있으므로 이용시에는 주의를 요한다.

 

해당 포스팅에서는 docker를 사용해서 배포를 한다.

만약 도커를 사용하지 않는다면 도커 부분은 빼고 직접해봐도 무방하다.

또한 kafka설정같은 기초적인 것부터 시작해서 실제로 Spring으로 동작시키는것 까지 설명할 것이다.

 

https://github.com/kukaro/Eris-DockerExampleTemplate/tree/master/Ubuntu-Sshd-Kafka-SpringBootCloudStream

전체 예제 코드는 위에서 확인할 수 있다.

 

일단 연동하기전에 우리가kafka binder를 사용할 것인지 아니면 kafka stream binder을 사용할 것인지 알아야한다.

사용하는 라이브러리도 조금 다르고 사용하는 방식도 조금 다르다.

여기 예제는 kafka binder를 사용할 것이다.

둘의 차이는 kafka binder는 메시지를 한개씩 처리하는 거고

kafka stream binder는 메시지를 타임 슬라이스로 잘라서 묶어서 처리하는 것이다.

 

1. kafka설치 및 구동(feat. zookeeper)

먼저 대상 서버에 zookeeper와 kafka를 설치하고 둘을 실행해줘야한다.

 

#install kafka
RUN apt-get install -y tar
RUN apt-get install -y openjdk-8-jdk
RUN cd ${HOME} && wget http://apache.mirror.cdnetworks.com/kafka/${KAFKA_VER}/kafka_2.12-${KAFKA_VER}.tgz
RUN cd ${HOME} && tar -xzf kafka_2.12-${KAFKA_VER}.tgz
RUN cd ${HOME} && rm kafka_2.12-${KAFKA_VER}.tgz
RUN cd ${HOME} && mv kafka_2.12-${KAFKA_VER} kafka

kafka를 설치하려면 공식 홈페이지에서 설치하면 된다.

tar과 8버전이상의 java가 필요하므로 설치해준다.

wget도 필요하다면 설치해주어야한다.

만약 실 도커에서 하고싶지 않다면 그냥 RUN떼고 실행해주면된다.

 

zookeeper를 설치하지 않았는데 설치해도 되고 설치 안해도된다.

왜냐하면 zookeeper는 kafka안에 기본적으로 내장되어 있기때문이다.

물론 따로 설치해줘도 상관 없다.

 

설치가 끝난 후에는 설정을 바꿔줄 필요가 있다.

 

설치된 kafka의 디렉터리에서 config/server.properties에가 보면,

위의 경우 36번째 라인인 listeners=PLAIN~~~~ 부분이 #으로 주석처리되어 있다.

이 부분을 사진처럼 바꿔줘야한다.

즉 주석을 지워주고 자신의 IP와 PORT를 적어준다.

 

여기서 PORT는 기본포트를 쓸거면 그냥 냅두면 되는데 IP는 반드시 자신의 IP를 적어줘야한다.

자신의 아이피를 보려면 hostname을 쓰면된다.

이를 스크립트로 쉽게 하고싶다면 아래와 같이 입력하면된다.

 

cd kafka #카프카가 설치되어있는 디렉터리로 이동한다.
hostname=`hostname -I | tr -d ' '`
sed -ri "s/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/listeners=PLAINTEXT:\/\/${hostname}:9092/" config/server.properties

위의 코드를 해석하면 hostname으로 ip를 뽑아낸다.

그런데 해당 명령어는 뒤에 공백이 있어서 tr로 제거해준다.

그다음 sed명령어로 덮어써준다.

 

이를 왜 해주냐면 위의 작업을 해주어야 외부 서버에서 접근할 수 있다.
위 작업을 안해주면 내부에서만 접근가능하다.
만약 다른 컴퓨터가 아니라 하나의 컴퓨터에서 하고 있다면 위 작업은 해줄 필요가 없다.

 

2. Kafka 실행하기

kafka를 실행한다는 의미는 아래의 의미를 함축하고 있다.

 

1. Kafka는 무조건 zookeeper의 실행을 우선시한다. 따라서 zookeeper server를 실행해야한다.
2. 그 다음 당연히도 kafka server를 실행한다.
3. kafka가 일을 하려면 topic이 필요하므로 topic도 생산해야한다.

 

즉 위의 세가지를 순서대로 실행해야한다.

 

cd kafka #카프카가 설치되어있는 디렉터리로 이동한다.

# zookeeper on
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties


# kafka on
./bin/kafka-server-start.sh -daemon config/server.properties

sleep 5s
# make topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor --partitions --topic test

위의 세가지를 순서대로 실행한다.

저기서 5초의 유예 시간을 뒀는데 사실 여러분이 도커로 돌릴께 아니라면 그냥 가만히 기다리면된다.

보통 5초 정도면 서버가 뜨기 때문에 기다려주면된다.

topic의 이름은 test로 하기로 하자.

 

3. spring cloud stream 설정하기

필수적인 gradle을 몇개 보도록 하자.

 

// https://mvnrepository.com/artifact/org.apache.kafka/kafka
implementation group'org.apache.kafka'name'kafka_2.12'version'2.3.0'

// https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter
implementation group'org.springframework.boot'name'spring-boot-starter'version'2.2.2.RELEASE'

// https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka
implementation group'org.springframework.cloud'name'spring-cloud-stream-binder-kafka'version'3.0.0.RELEASE'

위의 셋은 필수인데 먼저 카프카를 추가해준다.

이는 카프카 설정을 쓰려면 필요한다.

그리고 spring boot를 추가하고 spring cloud stream kafka binder를 설치한다.

이 셋만 있으면 예제를 동작할 수 있다.

 

4. spring cloud stream 자바 예제 코딩하기

프로젝트 구조는 위와 같다.

코드는 딸랑 하나이다. 심플하다. 간단하다.

중요한건 application.yml일 것이다.

그렇지만 일단 App.java부터 보도록 하자.

 

package KafkaProject;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import java.util.function.Consumer;

@SpringBootApplication
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    @Bean
    public Consumer<String> log() {
        return str -> System.out.println("Received: " + str);
    }
}

코드는 건단하다 못해 허탈하다.

심지어 중요한건 log함수밖에 없다.

예제수준이라서 짧은 것도 있지만 실제로도 길지는 않다.

중요한건 해당 범위는 SpringBootApplication에 속해야하며,

stream에 연결된 녀석은 Bean으로 지정해줘야 한다.

만약 다른 클래스에서 선어해서 쓰려고 한다면 해당 클래스는 @Component를 해줘야한다.

여기서는 SpringBootApplication 직속이므로 다행히도 그런 작업을 해줄 필요는 없다.

 

log함수는 Kafka에서 데이터를 받아서 처리를 하는 로직이다.

이 이름을 기억해둬야한다.

이제 application.yml을 보자

 

server.port0

spring.cloud.stream:
  kafka:
    binder:
      brokers: localhost
      defaultBrokerPort: 9092
  bindings:
    log-in-0:
      destination: test
      group: mygroup0

server.port를 0으로 정하면 spring의 서버 포트를 랜덤으로 정하게된다.

이는 다른 예제들과 포트를 안겹치게 하려고 해놓은 조치이다.

아무 설정 하지 않으면 내부 톰캣이 작동하므로 포트는 8080이된다.

 

brokers는 연결해줄 IP를 선택한다.

아무것도 선택하지 않으면 localhost가 된다.

다만 이는 필자는 docker 바인딩 때문에 localhost를 쓴거고 다른 서버라면 그 ip에 맞게 지정해줘야한다.

defaultBrokerPort는 기본값이 9092이다. 포트를 바꿔줬다면 이 역시 바꿔줘야한다.

defaultBrokerPort를 안적고 바로 위의 brokers에 포트를 바로 붙혀줘도 된다.

 

이제 중요한건 topic과 consumer group, streaming함수의 설정이다.

먼지 bindings아래의 log-in-0라는 것이 보일 텐데

이 log는 설마 위의 함수의 log인가? 라고 생각하면 맞다.

우리가 선언했던 함수의 이름을 기억해둬야한다고 했는데 그 함수의 이름이다.

그 다음 -뒤에 in 받아들인다는 뜻이다.

나가는것은 out을 사용하면 되긴하는데 이 예제에서 내보내는것은 하지 않겠다.

원한다면 해주면된다. 그리고 함수의 타입은 Consumer가 아니라 Function이 되야한다.

그래야 반환을 하니까.

(Consumer클래스는 반환값이 없는 함수형 인터페이스)

 

destination은 topic을 지정한다. 이는 rabbitmq를 사용해도 마찬가지이다.

그리고 group은 consumer group을 의미한다. 아무거나 하면되므로 원하는걸로 해준다.

 

이제 구동시켜보자.

 

5. 예제 구동

cd ~/kafka
hostname=`hostname -I | tr -d ' '`
./bin/kafka-console-producer.sh --broker-list "${hostname}":9092 --topic test

프로듀서를 구동한다.

우리는 카프카 메세지를 넣어볼 것이다.

 

또한 spring프로젝트 역시 빌드해서 구동시킨다.

프로듀서를 구동한후 안녕하세요를 입력해본다.

 

그 다음 데이터가 spring에서 처리되는것을 확인할 수 있다.

 

사실 이 정도 예제로 실서버에 적용하기에는 무리가 있으므로

위에 첨부했던 자료들을 많이 읽어보기를 권한다.

+ Recent posts