728x90


https://www.rabbitmq.com/getstarted.html


AMQP와 돌아가는 로직에 대한 전반적 설명은 해당 사이트에 잘 설명되어 있으니 보도록하자.


rabbitmq를 처음 배우는 입장에서는 공식홈페이지에서 제공해주는 예제가 아주아주 좋다.

그래서 예제만 사용해도 되긴 하는데 추가적인 부연설명이 필요할것 같아서 이렇게 작성한다.


여기서는 1번 Hello World가 아주 태표적인 예제인데 이를 해보도록하겠다.

예제를 시행하는 언어들이 아주 많으나 여기서는 Java를 사용해서 해보도록 하겠다.



자바를 사용하기위해서는 RabbitMQ Java Client라이브러리가 필요하다.

해당 라이브러리를 설치해 주도록 하자.


해당예제에서는 위 같은 구조를 만들려고 한다.


Producer(생산자) : 위 그림의 P에 해당한다. 메시지를 만드는 녀석이다.

Queue : 위 그림의 hello에 해당한다. RabbitMQ는 큐의 이름을 정할 수 있는데 위의 경우 큐의 이름은 hello이다.

Consumer(소비자) : 위 그림의 C에 해당한다. 메시지를 사용하는 녀석이다.


위의 상황을 구현하려면 당연히 서버는 켜져있어야한다.



서버를 켜는 것은 이전 포스팅을 참고하라.


해당 경로의 파일을 수정해서 포트의 번호를 바꿀 수 있다.


GUI포트는 위를 읽어보면 바꿀 수 있다.


즉 우리는 언제나 포트가 바뀔수 있다고 생각해야한다.

그럼 코드를 보도록하자.


Producer

package net.theceres;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
private final static String QUEUE_NAME = "hello";

public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(35672);
factory.setUsername("uguest");
factory.setPassword("pguest");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
for (int i = 0; i <= 100000; i++) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!" + (int) (Math.random() * 100);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Set '" + message + "'");
Thread.sleep(10);
}
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

위의 코드는 아주 간단한 코드이지만 아주 중요한 코드이다. 해석해보도록하자.


ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(35672);
factory.setUsername("uguest");
factory.setPassword("pguest");

ConnectionFactory를 Connection을 만들어준다.

우리는 host와 port, username과 password를 정해주면된다.

만약 port가 5672라면, username과 password가 guest면서 같은 local에서 사용한다면 위 코드들중 일부는 없어져도 된다.


Connection connection = factory.newConnection(); Channel channel = connection.createChannel()

위의 코드는 connection을 만들고 channel을 만든다.


Connection과 Channel에 대한 설명은 위와 같다.


Connection - Application과 RabbitMQ Broker사이의 TCP 연결

Channel - connection내부에 정의된 가상의 연결. queue에서 데이터를 손볼 때 생기는 일종의 통로같은 개념


즉 우리는 데이터를 보내기위해서는 위와 같이 connection으로 연결한후 channel이라는 통로 역시 만들어줘야한다는 것이다.


channel.queueDeclare(QUEUE_NAME, false, false, false, null);

이제 이 channel에는 queue역시 만들어 줘야한다.

각각의 의미하는 바를 모를건데 아래를 보자.


https://rabbitmq.github.io/rabbitmq-java-client/api/current/


docs를 보면 각각의 의미하는 바를 알 수 있다.

아직 중요치는 않으니 모두 false로 해두지만 경우에 따라 설정할 수 있다.

queue를 만드는 로직이지만 이미 존재한다면 다시 만들지는 않고 기존에거를 그대로 사용한다.


channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

이제 publish해주면 제대로 데이터가 보내지는걸 확인할 수 있다.



basicPublish중에 첫번째 파라메터를 빈문자열을 넘겨주는데 이는 exchange이다.

그런데 이번 구조에서는 exchange가 필요 없기 때문에 여기서는 설명하지 않겠다.

빈문자열을 넘겨준다는건 기본 exchange를 사용하겠다는 뜻이다.


그럼 Producer의 설명이 끝났으니 Consumer를 보도록 하자.


Consumer


package net.theceres;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Recv {

private final static String QUEUE_NAME = "hello";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(35672);
factory.setUsername("uguest");
factory.setPassword("pguest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

이제 받는 코드인데 중복된 설명은 빼도록 하자.


DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};

이 코드는 당연히 외부의 데이터를 받는 코드이다.


channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

이 코드 후부터는 이제 종료되지 않고 계속해서 리스닝해서 데이터를 받게된다.


이제 한번 돌려보자.




각각 돌려보면서 모니터링을 어떻게 하는지 보도록하자.

+ Recent posts