728x90


https://www.rabbitmq.com/tutorials/tutorial-two-java.html


우리가 사용할 예제는 RabbitMQ의 Java에 Work Queue부분이다.

한국 번역에서 조금 이상한 부분들도 있기에 몇가지 부분을 자세히 설명하려고한다.

반드시 전 포스팅을 보는걸 추천하는데 몇가지 부분은 다시 설명하도록 하겠다.


저번에는 구조를 만들었었다.


이번에는 위와 같은 구조를 만들어볼건데 거기에 몇가지를 더 설명하도록 하겠다.

먼저 사용할 코드는 아래와 같다.


아래에서 신나게 떠들어댈 Work Queue는 그냥 Queue다.

Producer는 메세지를 생성하는 녀석이고 Consumer는 메세지를 사용하는 녀석이다.

이녀석들은 하나의 독립적인 서버일 수도있고 아니면 어플리케이션의 말단일 수도 있다.






코드를 설명하면서 work queue에 대해서 같이 설명하면서 또한 세세한 부분도 설명하겠다.

아마 공식 홈페이지 예제 코드에 비해서 조금 많이 수정해서 당황스러울 수 있다.


Producer의

int TEST_MESSAGES = 4;

와 Consumer의

int TEST_Count = 3;
int TEST_TIME = 20000;
int TEST_qos = 1;

는 우리가 수정해 줄 변수이다. 그래서 예제동안 위의 변수는 바뀔 것이다.


일단 위의 예제에서 몇가지를 수정해보자.


int TEST_TIME = 200;

위의 시간을 200으로 수정을 해보자.

그러면 200ms이므로 0.2초만에 작업을 처리하게 될 것이다.


그 다음 RabbitMQ를 실행해준다.

필자의 경우 RabbitMQ의 포트와 아이디 비번을 모두 바꿨다.

안바꿨으면 각각 기본 정보는 아래와 같다.


RabbitMQ 기본 포트 - 5672

RabbitMQ 대시보드 포트 - 15672

RabbitMQ 아이디 - guest

RabbitMQ 비밀번호 - guest


필자 코드에서 포트와 아이디 비번을 입력하는 코드가 있다.


factory.setPort(35672);
factory.setUsername("uguest");
factory.setPassword("pguest");

만약 이게 기본 포트,아이디,비번이면 해당 소스부분은 생략해도 된다.

즉 안적어도 된다는 뜻이다.


이제 준비가 끝났다.


일단 Producer로 총 4개의 메세지를 만든다.



큐에 그럼 데이터가 4개가 적재 됬다.

이제 소비해보자.



Worker를 돌려주자.



그러면 큐에서 데이터를 꺼집어내서 소비하는걸 확인할 수 있다.

하지만 여러분은 이번 예제와 저번 예제의 차이점을 눈치채야한다.


이번에는 워커가 3개이다!


우리는 이번에 보면서 하나의 queue에 워커가 여러개일 때 어떻게 동작하는지 알 수 있다.


int TEST_Count = 3;
for (int i = 0; i < TEST_Count; i++) {
new MyThread(i).start();
}

위 코드로인해서 우리는 총 3개의 워커를 만들어서 돌렸다.

만약 워커의 갯수를 바꾸고 싶다면 변수를 바꿔주면된다.


DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");

System.out.println(num + " [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(num + " [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};

우리는 deliverCallback이 데이터가 전송됬을 때 작동하는 녀석이라는 것을 알고 있다.

위의 코드는 총 3가지 작업을 하는걸 확인할 수 있다.(이건 RabbitMQ가 하는게 아니라 필자가 짠거다.)


1. doWork를 하기 전

2. doWork

3. doWork를 한 후


System.out.println(num + " [x] Received '" + message + "'");

위의 코드는 doWork를 하기 직전에 출력할 것이다. 즉 1번작업이다.

num은 현재 작업하는 쓰레드의 번호이다.


doWork(message);

위의 코드는 doWork를 하는 동안이다. 사실 필자코드는 딱히 뭐하는건 없고 그냥 몇초 쉬는 것이다. 

2번 작업이다.


System.out.println(num + " [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

그리고 마지막으로 Done을 출력하고 확인 메세지를 보낸다.

num은 현재 작업하는 쓰레드의 번호이다.

이는 3번 작업이다.



다시 출력된 데이터를 보면 앞의 숫자는 쓰레드의 번호라고 하였다.

즉 어떤 쓰레들이 어떤 작업을 시작하고 끝냈는지를 알 수 있다.


여기서 우리가 알 수 있는 점은 작업은 워커의 갯수에 맞춰 라운드로빈(균등분배)하게 움직인다는 것이다.


int TEST_MESSAGES = 100;

이번에는 작업의 갯수를 아주 늘려보자. 100개를 돌려보자.



갯수가 적재되면 이제 컨슈머를 돌리자.



보면 알겠지만 0번 쓰레드, 1번 쓰레드, 2번 쓰레드가 번갈아가며 동작하는걸 확인할 수 있다.


Acknowlegement(승인)


RabbitMQ에서는 승인이라는게 아주 중요하다.

이를 놓치면 문제가 생기는데 코드와 DOCS들로 확인해보자.


channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

Consumer의 코드를 보면 위와같은 구절들이 있다.

위는 데이터를 수령하는 코드이고 아래는 데이터의 승인을 보내는 코드이다.


가장 간단하게 보면 Producer, Queue, Consumer의 관계는 위와 같다.

여기서 볼려는건 Queue와 Consumer사이의 승인이다.


Queue는 데이터를 Consumer에게 전송한다.

하지만 전송이 완료되는 순간은 Consumer가 Queue에게 Acknowlegement(승인)을 넘겨준 순간이다.

Queue는 Consumer에게 Acknowlegement를 받아야만 다음 데이터를 넘겨준다.


여기서 우리는 소비를 담당하는 basicConsume을 보도록하자.


보면 두번째 파라메터가 autoAck인걸 알 수 있다.

이 구문이 뭘 의미 하느냐?


이전 포스팅에도 비슷한 구문이 있었다.


channel.basicConsume(TASK_QUEUE_NAME, true, deliverCallback, consumerTag -> {

지난번 포스팅에서는 autoAck가 true였다. 그리고 승인을 날리는 코드는 존재하지 않았다.

여기서보면 위의 역활은 명확하다. basicConsume의 autoAck가 true우리가 코드를 따로 작성하지 않더라도,

자동으로 승인(Acknowlegement)를 날리게 된다.


한편 우리는 두줄이 저번 포스팅과 다르다.

channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

여기서 autoAck가 false면 우리가 직접적으로 반환코드를 작성해줘야한다.

바로 두번째 코드처럼 말이다. 반환종류는 세종류가 존재한다.


총 ack, nack, reject로 3개가 존재하는데 아래 두개는 거절하는거고 ack만 승인하는 것이다.

이 승인을 반드시 달아줘야하는데 달아주지 않을 경우 어떤일이 일어나는지 한번 보도록 하자.


int TEST_MESSAGES = 8;

Producer의 message갯수를 8개로 줄여서 테스트 해보자.


DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");

System.out.println(num + " [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(num + " [x] Done");
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};

basicAck구문을 아예 주석처리해서 보도록하자.



Done이 출력되긴 하는데 주석처리를 하지 않아서 그렇고 사실 Ack가 보내지진 않았다.

그리고 더이상 데이터를 수령하지 않는것을 볼 수 있다.


큐를 보면 Ready는 5개지만 Unacked가 3개인걸 확인할 수 있다.

이게 의미하는건 확인받아야하는 승인이 3개 남았다는걸 알 수 있다.

여기서 발생하는 심각한 문제는


1. Consumer는 더이상 데이터를 받지 못하고

2. Queue는 Unacked데이터들이 공간만 잡아먹고 있다.


그럼 이제 Consumer를 종료하거나 Connection을 끊어보고 대시보드를 보도록하자.


Connection이 끊어지면 이제 데이터를 다시 큐로 되돌린다.

그리고 다시 뿌려주게된다.


이런식으로 RabbitMQ는 데이터 손실을 막게된다.

즉 승인을 받지 못한 데이터는 아직 처리되지 않은걸로 간주해서 다시 큐에 넣는다.

그리고 다른 워커거 이걸 실행하게 된다.


여기서 승인을 해주는건 아주아주아주 중요하다.

일단 승인을 안해주면 consumer가 멈추게된다.

하지만 더 문제가 될 수 있는건 consumer가 승인을 보내지 않고 커넥션을 끊은후 다시 접속하는 경우이다.

이 경우 그 경우 데이터는 복구 되기 위해서 다시 큐에 적재되는데 이 경우 이미 처리한데이터를 또 처리되기 때문이다.

그렇기 때문에 승인을 반드시 해줘야한다.


메세지 영속성(HA, 내구성)


channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);



우리는 Queue를 선언할 때 저번 포스팅과는 다르게 두번째 파라메터를 true로 해줬다.

이 파라메터는 아주 중요한게 durable이라는 속성인데 이는 HA적으로 생각해도 중요하다.

이 경우 RabbitMQ가 종료되더라도 데이터를 디스크에써서 데이터를 유지하며

RabbitMQ가 다시 시작될 경우 디스크에서 불러와서 큐를 유지하게 된다.


지금 데이터가 위처럼 8개가 적재된 상태라고 가정하고 RabbitMQ를 종료해보자.

종료하는 방법은 두가지가 있는데 해당 포스팅을 참조하길 바란다.



필자는 서비스로 켰기에 서비스로 껐다.


종료가 된걸 확인하면 다시 키자.



필자는 서비스로 다시 켰다.


이 경우 데이터가 복구되어있는걸 확인할 수 있다.


channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

그럼 저 두번째 파라메터인 durable을 false로 바꿔서 돌려보자 .

이 설정은 당연히 Producer와 Consumer 둘다 해줘야한다.


channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);

위 처럼 바꿔보자.



바꾸고 실행할 때 위처럼 에러가 뜰 수 있다.

이는 성질이 다른 동명의 Queue가 존재하기 때문이다.

그래서 기존의 Queue를 삭제해야한다.

삭제하는 방법은 크게 3가지가 있다.



자 시간이 길었는데 테스트 해보자.



자 데이터가 다시 쌓였는데 이제 껐다 켜보자.



껐다 켜보자.


데이터가 소멸한걸 확인할 수 있다.



여기서 중요한건 영속성에 대한 것이다.

읽어보면 영속성설명에 fsync(2)라는 구절이 있다.

fsync(2)는 c 함수에 파라메터 2를 넣은것으로 disk에 무조건 write하란 뜻이다.


즉 disk에 쓰므로 지연이 발생하는데 이 지연을 최소화하기 위해서 유휴시간에 일종의 일괄처리를 한다고 한다.


그래서 주의사항을 읽어보면 HA를 완벽하게는 구성하지 않는다고 한다.

당연하다 일종의 배치처리를 하니까 이를 구성하기 위해서는 더 낮은단계까지 내려가야하며

이는 라이브러리(여기서 말하는 라이브러리는 언어와 플랫폼)에 따라 차이가 있다고한다.

이를 주의해서 사용해야한다.


영속성 요약


1. 영속성은 fsync(2)를 호출해서 데이터를 디스크에 쓴다.

2. 따라서 자주 발생하면 성능 하락이 발생, 그래서 일괄(batch)처리를 한다.

3. 일괄처리 특성상 디스크에 쓰지 못하는 데이터가 발생할 수 있다.

4. 이 HA를 높이고 싶으면 더 Row레벨까지 핸들링해야한다.



공정(Fair)한 데이터 분배



데이터를 분배할 때 RabbitMQ는 해당 Consumer가 바쁜지 안바쁜지 모르고 데이터를 분배하게 될것이다.

이는 Connection이 유지되있다면 보내게 될건데 이 경우 바쁜녀석의 버퍼에 계속 쌓이게되는 경우가 존재한다.

이를 위해서 존재하는게 바로 prefetch count라는 개념이다.


prefetch란 예비추출(선출)이라는 뜻인데 실제 영어에서는 안쓰고 컴퓨터에서 사용하는 pre + fetch 합성어이다.

그 카운트가 의미하는것은 "이녀석은 한번에 1개의 데이터만 처리 가능"이라는 것이다.

이를 설정하는 함수는 여러분이 봤던 함수이다.


int TEST_qos = 1;
channel.basicQos(TEST_qos);

이 때 썻던 함수는 basicQos이다.




QoS는 위에서 보면 알겠지만 quality of service로 HA와 관련되있는걸 알 수 있다.

즉 1로 지정하면 무슨일이 있어도 데이터는 1개씩 받게된다.

2로 지정하면 데이터를 두개씩 받게 된다.

정말 그런지 확인해보자.


int TEST_qos = 2;

이를 2로 바꾸고 시작해보자.



다시 Queue에 데이터를  4개를 넣어준다.



그러면 1쓰레드가 동작하지 않고 0, 2쓰레드, 즉 2개의 Consumer만 동작한걸 확인할 수 있다.

그 이유는 동시에 데이터를 2개씩 처리할 수 있기 때문에 1번 쓰레드한테 주기전에 다른 녀석에게 줘서 그렇다.

+ Recent posts