728x90
이 포스팅은 https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/의 내용을 바탕으로 포스팅 한 것이다. 
Spring Cloud Stream은 기본적으로 kafka를 사용한다.
rabbitmq의 예제는 아직까지는 추가할 생각이 없기 때문에 다른 문서를 참고하기를 바란다.
그리고 카프카의 실행과 프로듀싱에 대한 예제도 여기서는 기본적으로 다루지 않는다.
다만 1장에서 간략하게 예제들이 있으므로 간략하게 알고싶다면 이를 참고하라.

참고
[Kafka][Java][SpringBoot][SpringCloudStream]Spring Cloud Stream 사용하기 - (1)

저번에 기본적으로 input bind를 Sink 클래스를 사용해서 해보았다.

하지만 기본적으로 cloud stream을 사용하려면 output도 중요하다.

다만 Sink는 input밖에 없기 때문에 사용할 수 없었던 것이고 그럼 output을 사용해서

특정 데이터가 들어왔을 때 그걸 가공해서 외부로 다시 보내는 작업을 해보도록하자.

 

일단 앞장에서 설명하지 않았던 개념들을 몇개 설명하도록 하자.

 

자주 보는 그림이며 컬러 스키마는 이클립스로 되어있는걸 보면 좀 된거 같다.

여튼 여기서 binder, binding이라는 용어가 존재한다.

이에 대한 공식 docs에 대한 설명은 다음과 같다. 

 

Destination Binders: Components responsible to provide integration with the external messaging systems.

Destination Bindings: Bridge between the external messaging systems and application provided Producers and Consumers of messages (created by the Destination Binders).

Message: The canonical data structure used by producers and consumers to communicate with Destination Binders (and thus other applications via external messaging systems).

영어를 한글로 해석하자면 대충 아래와 같다. 위의 그림을 보면서 이해하면 편하다.

 

Destination Binders: 외부 메세지 시스템과 내부 어플리케이션을 연결하는 접합부이다. 

Destination Bindings: 외부 메세지 시스템과 내부 어플리케이션의 연결이다.

Message: 생산자(만드는 놈)와 소비자(받는 놈)의 연결(바인더)에 소통하는 데이터. 그냥 쉽게 말해서 주고받는 데이터를 의미

 

spring.cloud.stream:
  kafka:
    binder:
      brokers: localhost
      defaultBrokerPort: 9092
  bindings:
    input:
      destination: test
      group: mygroup

저번에 썼던 application.yml을 다시 설명하면서 보면 이해하기 편할 것이다.

우리는 binder를 설정하고 bindings를 설정했다.

위의 경우 kafka라는 바인더를 설정했고 또한 input이라는 바인딩을 설정했다.

 

cloud stream은 여러 바인더 상황에 대처하게 고려되어 있다.

가령 전혀 다른 카프카 두대를 사용하고 싶다?

공식 예제를 보면 답을 알 수 있다.

보면 전혀 다른 카프카 두개를 사용하는 예제가 보인다.

 

그럼 카프카 바인더랑 rabbitmq 바인더 둘다 사용할래! 이런것도 가능할까?

당연하지만 사용할 수 있다.

여기서 bindings에서 우리는 binder키워드를 사용하지 않았는데

이 키워드를 사용하지 않으면 기본적으로 선택된 메세지큐를 사용하게 된다.

우리가 이때까지 이 binder를 사용하지 않았던 것은 어짜피 kafka 딱 한개만 쓰기 때문이다.

 

공식 홈페이지에 가보면 여러가지 메세지큐 시스템을 지원하는 것을 알 수 있다.

 

그럼 이제 어느정도 설명을 했으니까 output예제를 만들어 보도록하자.

 

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
@EnableBinding({Sink.class, Source.class})
public class LoggingConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    @SendTo(Source.OUTPUT)
    public String handler(String str) {
        System.out.println("Received : " + str);
        return str + "!!";
    }
}

저번 예제와 별 다를거 없는 것 같지만 사실 엄청난 차이가 추가됬다.

바로 SendTo와 Source이다.

 

EnableBinding에는 이제 Source.class도 함께 추가해준다.

배열로 넣어주면된다.

 

저번에 Sink는 입력 전용이라고 했다.

여기서 출력이 가능하려면 Source를 추가해줘야한다.

Source는 출력 전용이다.

 

public interface Source {

   /**
    * Name of the output channel.
    */
   String OUTPUT "output";

   /**
    * @return output channel
    */
   @Output(Source.OUTPUT)
   MessageChannel output();

}

Source역시 Sink와 마찬가지로 별 내용이 없다.

여기서 말하는 OUTPUT이 가진 변수 output은 기본 토픽이다.

다행히도 카프카는 기본적으로 output이라는 토픽을 기본적으로 가지고 있다.

만약 여러분이 토픽을 바꾸고싶다면 전장을 확인해서 input을 바꿨던것처럼 똑같이 바꾸면된다.

 

위의 로직은 input으로 들어와서 나갈 때 느낌표가 두개 붙어서 다시 토픽으로 쏘게 된다.

결과를 확인해보자.

 

토픽으로 input으로 넣어주면

 

Spring Cloud Stream을 통해서

 

output토픽으로 나오게된다.

 

그럼 output도 토픽이나 그룹 바꾸고싶다면 어떻게 해야하느냐?

앞의 포스팅을 봤다면 사실 알껀데 그래도 다시 설명하겠다.

 

public interface Source {

   /**
    * Name of the output channel.
    */
   String OUTPUT "output";

   /**
    * @return output channel
    */
   @Output(Source.OUTPUT)
   MessageChannel output();

}

Source의 OUTPUT 프로퍼티의 output값은 아무것도 설정하지 않을 때의 기본 토픽이다.

그리고 아래의 output이라는 메소드의 이름은 @Output으로 인해 Bean으로 등록된다.

즉 우리는 output bean을 application.yml로 덮어 써야한다.

 

spring.cloud.stream:
  kafka:
    binder:
      brokers: localhost
      defaultBrokerPort: 9092
  bindings:
    input:
      destination: test
      group: mygroup
      binder: kafka
    output:
      destination: tost
      group: mygorup
      binder: kafka

그러면 spring.cloud.stream.bindings의 output을 지정해준다.

이 녀석은 output Bean을 의미하는 녀석이다.

만약 custom output을 만든다면 이 output이름을 반드식 기억해둬야한다.

그리고 destination으로 topic을 바꿔준다.

필자는 tost로 바꿔줄 것이다.

컨슈머 그룹 설정하고 binder는 kafka로 지정하는데 위에서도 줄기차게 이야기했지만

우리는 하나만 쓰므로 지정안해도 문제는 없다.

 

토스트라는 토픽을 따로 만들어주고

 

test 토픽으로 전송하먄

 

tost 토픽으로 다시 들어가는걸 확인할 수 있다.

 

또한 Sink와 Source를 합한 개념인 Processor라는 것이있다.

입출력을 따로하지않고 한꺼번에 정의하고싶다면 그냥 Processor로 하는게 편하다.

 

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
@EnableBinding(Processor.class)
public class LoggingConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String handler(String str) {
        System.out.println("Received : " + str);
        return str + "!!";
    }
}

Processor는 사실 Sink와 Source를 합한 녀석이다.

뜯어보면 알 수 있다.

 

public interface Processor extends SourceSink {

}

그래서 완전하게 같은 결과를 내는 것이다.

+ Recent posts