728x90
이 포스팅은 https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/의 내용을 바탕으로 포스팅 한 것이다. 
Spring Cloud Stream은 기본적으로 kafka를 사용한다.
rabbitmq의 예제는 아직까지는 추가할 생각이 없기 때문에 다른 문서를 참고하기를 바란다.
그리고 카프카의 실행과 프로듀싱에 대한 예제도 여기서는 기본적으로 다루지 않는다.
다만 1장에서 간략하게 예제들이 있으므로 간략하게 알고싶다면 이를 참고하라.

Spring Cloud Stream을 사용하기에 앞서서 이 포스팅은 카프카와 연동하기 때문에 카프카를 간략히 설치하고 실행하는 법을 보도록하자.

 

wget http://apache.mirror.cdnetworks.com/kafka/2.3.0/kafka_2.12-2.3.0.tgz

먼저 카프카를 설치하자.

설치는 공식 홈페이지를 하건 wget을 하건 그 외의 무슨 방법이든 일단 설치만 하자.

일단 필자는 wget으로 설치할 것이다.

 

그러면 하위에는 폴더가 위처럼 있는데 bin은 당연하지만 실행파일이 존재한다.

결국 주키퍼와 카프카를 모두 실행해야한다.

주키퍼와 카프카는 아무설정을 하지 않으면 각각 포트가 2181과 9092이다.

 

./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
./bin/kafka-server-start.sh -daemon config/server.properties
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

여러번 했던 이야기지만 카프카가 설치되면 자동으로 주키퍼도 설치된다.

물론 따로 주키퍼를 설치할 수도 있다.

zookeeper-server-start.sh를 사용해서 주키퍼를 실행해준다.

그 후 kafka-server-start.sh를 사용해서 카프카를 실행한다.

기본적으로 카프카는 실행되면 input이라는 토픽을 자동으로 가지고 있다.

즉 default토픽인 셈이다.

테스트용으로 우리는 test이라는 토픽을 추가로 만들기로 하자.

 

이제 카프카의 준비는 끝났으므로 spring cloud stream 프로젝트를 만들기로 하자.

https://start.spring.io/의 initializer를 사용해서 만들 것이다.

 

이니셜라이저는 여러분이 원하는데로 만들면된다.

하지만 몇가지 필자는 보편적인 설정들을 할것인데 이렇게 하는걸 추천한다.

 

Gradle을 지정해주자.

 

그 다음 아래에 Dependencies에서 검색을 눌러준다.

이걸 누르면 검색해서 찾을 수 있는데 stream을 검색하자.

 

그러면 Cloud Stream이 나오는데 우리가 원하던거니까 +를 눌러주자.

 

그 다음 카프카를 검색하면 여러가지가 뜨는데 Kafka Stream이나 Kafka를 선택하면되는데

우리는 Kafka를 선택해주자.

 

그러면 위처럼 둘이 선택되는데 이제 프로젝트로 내부내준다.

 

이걸 이제 IDE로 열어주자.

필자는 IntelliJ를 쓸건데... 여러분도 걍 이거 쓰는걸 추천한다.

 

Spring Cloud Stream과 Spring for Apache Kafka를 선택하면 위처럼 3가지의 dependencies가 추가된다.

 

이제 준비는 다 끝났으니까 코드를 작성해보도록하자.

 

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void handler(String person) {
        System.out.println("Received : " + person);
    }
}

예제는 아주 간단하다.

그런데 앞으로 이 예제가 복잡해 질 것이다.

일단 지금 코드들의 설명을 보도록하자.

 

@EnableBinding(Sink.class)

이 애너테이션을 사용해야 하는 Sink라는 클래스를 바인딩 해주면 바인딩이 일단은 활성화된다.

이제 StreamListener을 사용할 수 있게 되었다.

 

이 부분이 조금 안와 닿을 수 있는데 일반적으로 가장 기본적인 개념에서 Sink라는 클래스는 입력을 담당한다.

내부 로직을 보면 정말 아무것도 없는 interface라는 것을 알 수 있다.

 

public interface Sink {

    /**
     * Input channel name.
     */
    String INPUT = "input";

    /**
     * @return input channel.
     */
    @Input(Sink.INPUT)
    SubscribableChannel input();

}

Sink는 내부적으로 아무것도 없는 interface이다.

다만 이 형식을 따라서 바인딩이 될 뿐이다.

여기서 @Input안의 문자열이 토픽이 된다. 이는 rabbitmq나 kafka나 마찬가지이다.

즉 default토픽은 input이라는 이야기이다.

하지만 application.yml에서 토픽을 수정하면 input은 무시되고 거기서 만든 토픽으로 덮어 쓰게된다.

또한 Input이 붙은 녀석의 함수 이름은 아무 의미가 없는게 아니라 의미를 가진다.

바로 Enable할때 이 녀석의 함수의 이름을 Bean으로 등록하게된다.

이는 나중에 appliation.yml을 바인딩할 때 중요한 지표가된다.

 

@StreamListener(Sink.INPUT)
public void handler(String person) {
    System.out.println("Received : " + person);
}

@EnableBinding으로 SpringCloudStream에서 기본적으로 제공하는 Sink를 바인딩 함으로써

리스너를 사용할 수 있게 되었다.

여기서 중요한건 파라메터인데 보내는 쪽에서는 보내는데로 받고싶다면,

Object나 String으로 사용해야한다.

즉 위의 코드는 나는 주는 대로 받고 처리는 내부에서 하겠다라는 뜻이된다.

반대로 일종의 json형태를 띄고 있다면 클래스로 변환할 수도 있다.

 

리스너는 반드시 한개여야할 필요는 없다.

한개가 아니라 2개, 3개 n개도 가능하다.

그냥 토픽이 오는 순간 바로 리스닝해서 실행하는게 가능하다.

 

그럼 실행해보자.

asdf라는 카프카 메세지를 보내보자

 

그러면 받는데 성공할 것이다.

 

여기서 몇가지 상황에 대해서 더 알아보자.

 

@StreamListener(Sink.INPUT)
public void handler(Person person){
    System.out.println("Received : " + person);
}

만약 받는 값이 String이 아니라 객체일 경우 어떻게 될까?

 

public class Person {
    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Person{" +
                "name='" name '\'+
                '}';
    }
}

Person 객체는 name이라는 값을 받는다면 더이상 일반적인 문자열은 받을 수 없다.

특정한 객체로 값을 받으려면 무조건 json으로만 보낼 수 있다.

 

보낼때 문자열을 json 양식으로 맞춰서 보내줘야한다.

 

그러면 아무것도 안해도 자동으로 객체에 매핑이 된다.

 

또한 우리는 아직 application.yml설정을 아무것도 하지 않았다.

그럴경우 토픽은 input을 향해가는줄 알지만 컨슈머는 어떻게될까?

 

이 경우 그룹은 anonymous가 되며 만들때 마다 그때그때 임시 ID가 붙게된다.

특정 groupId를 사용하고싶다면 application.yml을 손봐야한다.

 

spring.cloud.stream:
  kafka:
    binder:
      brokers: localhost
      defaultBrokerPort: 9092
  bindings:
    input:
      destination: test
      group: mygroup

보통 아무리 적어도 이정도는 수정하게 되는 경우가 많다.

첨언하자면 의존성에서 rabbitmq binder혹은 kafka binder가 단 하나만 존재한다면 굳이 바인더를 호출하지 않아도

알아서 각각의 바인더에 붙는다.

우리가 위에 application.yml에 아무 손을 대지 않아도 자동으로 kafka binder에 붙었던건

다른 binder를 호출하지 않았기 때문이다.

다만 포트나 ip를 변경하려면 명시적으로 호출해 줘야한다.

 

여기서 눈여겨 봐야할 부분은 spring.cloud.stream.bindings 부분이다.

input이라는 녀석이 보이는데 이 녀석은 아까 우리가 Sink클래스 내부를 확인했을때,

함수의 이름이 input이였던걸 기억할 것이다.

그 때 Bean으로 등록한다고 했었는데 그 Bean의 이름이다.

즉 토픽이름이거나 단순 환경변수 이름은 아니다.

아래의 destination은 토픽인데 이 녀석을 쓰면 이제 아까 Sink클래스에서 정의한 input을 무시하게된다.

우리는 한번 test로 덮어서 써보자.

마지막으로 컨슈머 그룹역시 이를 통해서 지정해 줄 수 있다.

 

모든 걸 수정하고 나서 다시 한번 해보도록 하자.

test로 바꿨으니 test로 보내보자.

 

그럼 잘 전송되는걸 확인할 수 있다.

+ Recent posts