티스토리 뷰

공부흔적

RabbitMQ로 객체 전송하기

주디 𝙹𝚞𝚍𝚢 2021. 6. 16. 00:09

 입사한 회사의 곧 개발 시작할 프로젝트에서 RabbitMQ를 사용한다고 해서 RabbitMQ를 공부하고 있다. 간단한 예제를 찾아보니 문자열을 보내는 경우는 있는데, 객체를 전송하는 경우는 잘 안 보이는 것 같아서 간단히 정리해둔다. (RabbitMQ를 아예 처음 마주하는 나와 같은 조무래기 개발자들에게 도움이 되길)

 본 포스팅은 https://oingdaddy.tistory.com/166를 참고해서 객체 전송하는 기능으로 변경한 내용을 정리한 것이다. RabbitMQ에 대해 아예 이해가 없는 상태라면 위 링크를 먼저 보고 문자열 전송까지 완료한 후 본 포스팅을 참고하는 것이 도움이 될 것이다.


1. RabbitMQ 설치(Docker 이용)

 그냥 사용하고 있는 컴퓨터에 바로 RabbitMQ를 설치해도 되고, VM을 이용해도 되지만 난 예제용으로 쓸 생각이었기 때문에 도커를 이용했다.

docker run -d --name rabbitmq -p 5672:5672 -p 8080:15672 --restart=unless-stopped -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:management

// RabbitMQ 계정 : admin
// 비밀번호 : admin

 위 명령어를 실행하면 RabbitMQ컨테이너가 실행된다.

 

2. Exchange, Queue 생성 및 Binding 설정

localhost:8080

// 아이디, 비밀번호 모두 admin

왼쪽부터 Exchange, Queue 생성, Binding 설정

 이미지의 [Add a new exchange], [Add a new queue], [Add binding from this exchange]를 눈여겨보면 된다. [Add binding from this exchange]의 경우, [Exchanges] 메뉴를 선택해서 자신이 만든 exchange의 이름을 선택하면 뜨는 상세화면에서 볼 수 있다.

3. PublisherApp, ConsumerApp 생성

start.spring.io에서의 생성

 프로젝트는 Spring Starter Project로 만드는데, STS에서 [Spring Starter Project]를 선택해서 만들어도 되고, start.spring.io에서 만들어도 상관없다.

4. ConsumerApp 설정

// ConsumerApp의 application.properties

server.port=9080

# RabbitMQ와 연동을 위한 기본적인 설정
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
// ConsumerApp의 Config.java

package com.rabbitmq.example.config;

import com.rabbitmq.example.model.Student;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;

@Configuration
public class Config {

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new MessageConverter() {
            @Override
            public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
                return null;
            }

            @Override
            public Object fromMessage(Message message) throws MessageConversionException {
                try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message.getBody()))) {
                    return (Student)ois.readObject();
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        });

        return factory;
    }

}
package com.rabbitmq.example.model;

import java.io.Serializable;

// PublisherApp에서 ConsumerApp으로 전송할 객체
public class Student implements Serializable {

    private static final long serialVersionUID = 6529685098267757690L;
    private long id;
    private String name;
    private int age;
    private String address;
    private String tel;

    public Student(long id, String name, int age, String address, String tel) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.address = address;
        this.tel = tel;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                ", address='" + address + '\'' +
                ", tel='" + tel + '\'' +
                '}';
    }
}
package com.rabbitmq.example;

import com.rabbitmq.example.model.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SampleListener {

    private static final Logger log = LoggerFactory.getLogger(SampleListener.class);

    // 아래는 String 타입의 메세지를 전송받을 때 사용하는 코드
    // 본 포스팅에서는 객체 전송을 다루고 있으므로 주석처리했다.
//    @RabbitListener(queues = "sample.queue")
//    public void receiveMessage(final Message message) {
//        log.info(message.toString());
//    }

    // 아래는 객체를 전송받을 때 사용하는 코드
    @RabbitHandler
    @RabbitListener(queues = "sample.queue")
    public void receiveMessage(Student student) {
        log.info(student.toString());
    }

}

5. PublisherApp 설정

// PublisherApp의 application.properties

server.port=9081

# RabbitMQ와 연동을 위한 기본적인 설정
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
package com.rabbitmq.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;

public class SampleConfig {

    private static final String EXCHANGE_NAME = "sample.exchange";
    private static final String QUEUE_NAME = "sample.queue";
    private static final String ROUTING_KEY = "sample.ara.#";

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    Queue quque() {
        return new Queue(QUEUE_NAME);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

}
package com.rabbitmq.example.model;

import java.io.Serializable;

public class Student implements Serializable {

    private static final long serialVersionUID = 6529685098267757690L;
    private long id;
    private String name;
    private int age;
    private String address;
    private String tel;

    public Student(long id, String name, int age, String address, String tel) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.address = address;
        this.tel = tel;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                ", address='" + address + '\'' +
                ", tel='" + tel + '\'' +
                '}';
    }
}
package com.rabbitmq.example;

import com.rabbitmq.example.model.Student;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SampleController {

    private static final String EXCHANGE_NAME = "sample.exchange";

    @Autowired
    RabbitTemplate rabbitTemplate;

    // 아래의 코드는 문자열 전송시 사용하는 코드이다.
    @GetMapping("/sample/queue")
    public String samplePublish() {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "sample.ara.#", "RabbitMQ + Springboot = Success!");
        return "message sending!";
    }

    // 객체 메시지큐 테스트
    @GetMapping("/sample/student")
    public String objectPublish() {

        Student student = new Student(1, "홍길동", 20, "서울특별시 강남구", "010-1234-5678");

        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "sample.ara.#", student);
        return "object sending!";
    }

}

 PublisherApp과 ConsumerApp 코드는 여기에 올려뒀다.

6. 객체 전송 테스트

테스트를 하기 위해서는 메시지를 보낼 PublisherApp, 최종적으로 메시지를 받을 ConsumerApp, 중간에 메시지를 저장할 RabbitMQ 컨테이너가 모두 동작중이어야 한다.

localhost:9081/sample/student

// 9081은 PublisherApp의 포트번호

 위의 url로 접근하면 ConsumerApp 콘솔에 객체 정보가 뜨는 걸 확인할 수 있다.


RabbitMQ로 객체를 전송하기 위해서는 직렬화를 알아야 한다

 사실 이렇게 포스팅으로 정리하려고 생각하진 않았는데, 이 포스팅을 하게 된 이유가 있으니... 바로 직렬화때문이다. 직렬화에 대해서는 아래를 참고.

자바 직렬화, 그것이 알고싶다. 훑어보기편(우아한 형제들 기술 블로그)

자바 직렬화, 그것이 알고싶다. 실무편(우아한 형제들 기술 블로그)

 PublisherApp에서 직렬화를 거친 객체가 큐에 들어가고, 큐에서 나온 객체는 ConsumerApp에 전달되고 역직렬화를 거쳐야 우리가 아는 객체의 모습을 볼 수 있다. Student 클래스가 Serializable을 구현한 것이 그 이유이다. PublisherApp과 ConsumerApp의 Student 클래스가 Serializable을 구현하면 할 게 한 가지 더 있다. 기본으로 사용하는 SimpleMessageConverter는 메시지를 보낼 때 object를 바이트배열로 직렬화하는데, 이 직렬화된 데이터를 다시 원래대로 되돌리는 역직렬화를 해주도록 바꿔야 한다는 것이다. 그래서 역직렬화를 할 수 있도록 MessageConverter를 커스터마이즈한 부분이 ConsumerApp의 Config이다.

 본 이미지의 출처는 https://www.programmersought.com/article/6243566022/ 이다. JSON을 이용한 직렬화, 역직렬화 정보도 있으니 참고하면 좋을 것 같다.

 직렬화만 되면 객체로 바꿀 수 있는 것인가하면 그렇지 않다. 직렬화만 설정해주고 PublisherApp에서 객체를 보내게 되면 serialVersionUID가 맞지 않는다는 에러를 보게 될 것이다. 직렬화 - 역직렬화 시에 serialVersionUID를 키로 객체의 호환을 따지는데(같은 객체인지 보는 것) 이 값이 다르기 때문이다. 이 값은 클래스 구조정보에 따라 값을 생성하기 때문에 클래스 정보가 달라지면 값도 달라진다고 한다. 값을 지정해주지 않을 경우, 컴파일 환경에 따라 이 값이 변할 우려가 있기 때문에 사전에 지정해서 선언해주는 것이 좋다고 한다. (출처 : https://hevton.tistory.com/164) 그렇기 때문에 Student 클래스에서 아래와 같은 코드가 추가된 것이다.

private static final long serialVersionUID = 6529685098267757690L;

 

300x250
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/09   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30
글 보관함