티스토리 뷰

 

 

로컬 호스트 서버에서 댓글 서비스의 웹훅을 사용해야되는데 문제가 생겼다.

 

웹훅을 사용하기 위해 서버의 URL을 입력해야하는 상황이었는데, localhost를 입력할 수는 없으니 말이다.

 

그래서 중간다리로 AWS SQS를 써보기로 했다.

 

파이프라인은

 

댓글 서버 웹훅 -> AWS Lambda -> AWS SQS -> local(SQS Listener) 구조로 잡고 구현을 시작했다.

 

들어가기전에 SQS가 뭔지 짚고 넘어가자.

 

Amazon Simple Queue Service (Amazon SQS)란?

Amazon Simple Queue Service (Amazon SQS)는 분산 소프트웨어 시스템 및 구성 요소를 통합 및 분리할 수 있는 안전하고 안정적이며 사용 가능한 호스팅 대기열을 제공합니다. Amazon SQS는 데드 레터 큐 및 비용 할당 태그와 같은 일반적인 구조를 제공합니다. AWS SDK가 지원하는 모든 프로그래밍 언어를 사용하여 액세스할 수 있는 일반 웹 서비스 API를 제공합니다.
- AWS SQS 공식사이트

 

SQS의 기능을 요약하면 이름 그대로 단순한 대기열(큐) 서비스이다.

 

텍스트부터 바이너리 형식의 데이터를 문자열로 인코딩해서 주고 받을 수 있다.

 

SQS와 다른 메시지 큐(카프카, RabbitMQ)와의 차이점은 Subscribe/Publisher 방식은 사용하지 못하고, Point to Point 형태를 갖는다는 것이다.(카프카, RabbitMQ는 둘다 지원)

 

출처 : https://starplatina.tistory.com/entry/JMS-with-spring-1

 

즉, SQS는 하나의 Consumer(소비자)를 갖고 그 consumer가 데이터를 소비하도록 만드는 구조다.

 

물론, 두 개의 Consumer를 갖게끔 할 수 도 있지만 그럼 한 쪽은 데이터를 받았다 못받았다하게 된다.

 

이름 그대로 단순하게 구현된 것 같다.

 

SNS+다중SQS를 이용한다면 Subscribe/Publisher 방식으로도 구현이 가능하다고 한다.

 

아직 해보진 않았지만, 구조상으로도 가능할 것 같다.

 

각설하고, 내가 사용할 기능은 SQS의 SqsListener라는 기능인데, SQS에 데이터가 들어오면 그걸 캐치해서 서버로 가져오는 기능이다.

 

바로 진행해보자.

 

AWS 큐 만들기

SQS 홈으로

대기열 생성 클릭

 

별다른 설정은 건드리지 않고 그대로 만들어도 별 문제는 없다.

 

각각 사항들은 [정보] 를 클릭하면 알아볼 수 있는데, 간단히 요약해보면

 

표시 제한 시간 : 대기열에서 수신한 메시지가 다른 메시지 소비자에게 보이지 않게 되는 시간

메시지 보존 기간 : Amazon SQS가 삭제되지 않은 메시지를 보관하는 기간

전송 지연 : 메시지를 처리하는 데 추가 시간이 필요한 경우 대기열에 들어오는 각 새 메시지의 전송을 지연

최대 메시지 크기 : 대기열의 최대 메시지 크기(256kb)

메시지 수신 대기 시간 : 폴링이 메시지를 수신할 수 있을 때까지 대기하는 최대 시간

 

암호화, 액세스 정책 등은 따로 건드리지 않았다.

 

그대로 생성하면 바로 생성이 된다.

인증 키 생성하기

인증 키 생성 방법에서 확인

 

클라이언트에서 별도의 설정을 하지 않을 거라

스프링부트에서 SQS SDK 2.0 사용하기

사용환경 

SpringBoot 3.0.5

Java 17

 

build.gradle

dependencyManagement {
    imports { mavenBom "org.springframework.cloud:spring-cloud-dependencies:2022.0.4" }
}

implementation platform("io.awspring.cloud:spring-cloud-aws-dependencies:3.0.1")
implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs'

특이한 점은 SqsListener 만을 사용하는거면, 디폴트로 설정된 클라이언트를 사용하는 클라이언트를 자동으로 생성하는지

 

Configuration 정의가 필요하지 않다.

 

사용법은 매우 간단하다.

@SqsListener(value = "dan-test-queue")
public void SqsListener(@Payload String payload){
    System.out.println(payload);
}

value에 내가 생성한 큐 이름을 넣으면 끝이다.

 

SDK 1.0에서는 delation Policy라고 삭제정책이 있었던거 같은데 여긴 못 찾음..

 

실제로 사용한건 여기 까지 기능인데, 이렇게만 작성하면 뭔가 아쉬우니 큐에 직접 엑세스해서 데이터를 올리고 내려봤다.

 

여기서부터는 클라이언트가 필요하다.

 

Configuration

@Configuration
public class AwsClient {
    @Bean
    SqsClient sqsClient() {
        return SqsClient.builder().build();
    }
}

 

SQS에 데이터 보내기

@PostMapping("/sqs")
public void SqsSend() {
    CreateQueueRequest request = CreateQueueRequest.builder()
                                                   .queueName("dan-test-queue")
                                                   .build();
    sqsClient.createQueue(request);

    GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder()
                                                           .queueName("dan-test-queue")
                                                           .build();

    String queueUrl = sqsClient.getQueueUrl(getQueueRequest).queueUrl();

    String exampleMessage = "이게 될까요?";
    SendMessageRequest sendMsgRequest = SendMessageRequest.builder()
                                                          .queueUrl(queueUrl)
                                                          .messageBody(exampleMessage)
                                                          .delaySeconds(5)
                                                          .build();

    System.out.printf("메시지를 보내자. %s, %s%n", exampleMessage, LocalDateTime.now());
    sqsClient.sendMessage(sendMsgRequest);
}

리스너도 조금 바꿔줬다.

@SqsListener(value = "dan-test-queue")
public void SqsListener(@Payload String payload){
    System.out.printf("리스너에서 받았다 : %s, %s%n", payload, LocalDateTime.now());
}

요청을 날려보자.

 

아주 잘된다.

 

실제로 사용하진 않았지만, SQS에서 리스너를 통해 받는게 아니라 그냥 요청을 통해 받을 수도 있다.

 

SQS에 데이터 받기

@GetMapping("/sqs")
public void sqsGet() {
    GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder()
                                                           .queueName("dan-test-queue")
                                                           .build();

    String queueUrl = sqsClient.getQueueUrl(getQueueRequest).queueUrl();

    ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
                                                                       .queueUrl(queueUrl)
                                                                       .maxNumberOfMessages(5)
                                                                       .build();

    ReceiveMessageResponse messageResponse = sqsClient.receiveMessage(receiveMessageRequest);
    if(messageResponse != null) {
        String message = messageResponse.messages()
                       .get(0)
                       .body();
        System.out.printf("요청을 통해 메시지를 받았다. %s, %s%n", message, LocalDateTime.now());
    } else {
        System.out.println("아직 큐에 데이터가 안들어옴");
    }
}

큐에 데이터를 넣고, 너무 빠른 Recieve 요청을 보내면 익셉션이 발생하니 익셉션 제어를 위한 코드도 넣었다.

 

아무튼 결과를 보면 잘 나온다.

 

 

마치며

처음 사용할땐 spring cloud messaging 에서 제공하는 sqs sdk 1.0를 사용했는데,

 

이걸 사용했을 때 서비스에선 문제가 발생했었다. 이건 다음에 정리하도록 하겠다.

 

늘 짧고 간단하게 쓰려고하는데 잘 안되는 것 같다.

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/11   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
글 보관함