강의 정리

[Redis & Kafka] Redis와 kafka를 활용한 선착순 이벤트 시스템 개발(Feat. 인프런 강의)

Emil :) 2024. 1. 26. 23:55
728x90
반응형

서론


최근 신입 & 주니어 백엔드 엔지니어에게 사용해본 오픈소스 중 경험이 있다면 좋다고 생각 되는 것이 Redis나 Kafka라고 생각한다.

왜냐하면, 백엔드 엔지니어에게 중요하게 요구되는 사항 중 하나가 바로 '대용량 트래픽 핸들링 경험' 인데, 관련 업무를 수행하기 위해 거의 필수적으로 사용되는 것이 Redis와 Kafka이기 때문이다.

특히 Kafka의 경우, 최근 많은 회사에서 도입하거나, 기존에 사용하면서 인기도가 많이 올라간 것이 체감된다.
면접을 볼 때나, 공고에 걸린 JD를 볼 때도, Kafka 경험이 있으면 우대사항이 있더라.

그래서 평소에도 Redis나 Kafka에 관심이 많은 상태였는데, 저번 주에 봤던 면접에서 Kafka 사용 경험과, 내가 이전 직장에서 담당했던 선착순 이벤트에 대해 질문이 들어왔다.

선착순 이벤트는 오라클 시퀀스를 활용해서 개발을 진행했는데, 다른 방식으로는 없을까요?

라는 질문을 면접관님이 해주셨고, 평상시에 알고 있던 대로 Redis를 활용해서 하는 방법이 있다고만 답변했다. (사용은 못해봄)

면접에서 질문도 나왔겠다, 평상시에 궁금도 했겠다. 이번 기회에 간단하게 사용해보면서 어떤 느낌인지 파악해보기 위해 인프런에 있는 해당 강의를 보며, 간단하게라도 찍먹해보기로 했다.

이번에 간단하게 찍먹해보고 사용법을 익힌 후에, 현재 부트캠프에서 개발중인 프로젝트에 부하테스트를 진행한 뒤에, 프로젝트에 도입 후 전후 부하 차이를 확인해볼 예정이다.

포스트를 작성하면서, Redis와 Kafka에 대한 이해를 돕기 위해 짤막짤막하게 설명하도록 하겠다.

전체 코드는 여기

참고


https://www.inflearn.com/course/%EC%84%A0%EC%B0%A9%EC%88%9C-%EC%9D%B4%EB%B2%A4%ED%8A%B8-%EC%8B%9C%EC%8A%A4%ED%85%9C-%EC%8B%A4%EC%8A%B5#

 

실습으로 배우는 선착순 이벤트 시스템 강의 - 인프런

선착순 이벤트 시스템을 구현할 때 어떤 문제가 발생할 수 있고 어떻게 해결할 수 있는지 배워봅니다., 선착순 이벤트 시스템도 자신있게! 예제를 통해 실전 감각을 잡아보세요.  [임베딩 영상]

www.inflearn.com

본론


환경 세팅

프로젝트를 진행하면서 기존에 도커가 깔려있었으므로, 데이터베이스 추가 작업만 진행해줬다.

도커를 깔고 MySQL 이미지 받고 이런 내용들은 패스하도록 하겠다. (간단한거라..)

create database coupon_example

그 다음, build.gradle과 application.yml을 세팅해주자.

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.2.2'
    id 'io.spring.dependency-management' version '1.1.4'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'

java {
    sourceCompatibility = '17'
}

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-redis'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'com.mysql:mysql-connector-j'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

tasks.named('test') {
    useJUnitPlatform()
}
spring:
  jpa:
    hibernate:
      ddl-auto: create
    show-sql: true
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/coupon_example
    username: root
    password: root1234

  data:
    redis:
      port: 63799

 이렇게만 해주면 세팅은 끝이다.

간단한 발급 로직 구현

이 강의에선 테스트코드를 통해 간단한 예제를 구현했다. (먼가 신기했음)

먼저 엔티티를 만들어준다.

@Entity(name = "coupon")
@Getter
public class Coupon {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "id", nullable = false)
    private Long id;

    private Long userId;

    public Coupon() {
    }

    public Coupon(Long userId) {
        this.userId = userId;
    }
}

 그다음 서비스도 만들어주자

@Service
@RequiredArgsConstructor
public class CouponService {

    private final CouponRepository couponRepository;

    public void apply(Long memberId) {
        long count = couponRepository.count();

        if (count > 100) {
            return;
        }

        couponRepository.save(new Coupon(memberId));
    }
}

레파지토리도 만들어주자

public interface CouponRepository extends JpaRepository<Coupon, Long> {
}

이제 서비스 테스트코드를 만들어준다.

@SpringBootTest
class CouponServiceTest {

    @Autowired
    private CouponService couponService;

    @Autowired
    private CouponRepository couponRepository;
}

 이렇게 하면 밑작업은 완료가 되었다. 먼저 하나의 쿠폰을 한번만 응모했을 때의 테스트코드를 작성해보자.

@Test
void 한번만응모() {
    couponService.apply(1L);
    long count = couponRepository.count();

    System.out.println("count = " + count);
    assertThat(count).isEqualTo(1L);
}
잘 된다.

이제 1000개의 요청이 들어오고, 100개의 쿠폰만 발행해야 하는 로직을 작성해보자.
ExecutorService와 CountDonwLatch는 비동기로 로직을 처리하기 위한 자바 클래스이다.

@Test
void 여러번응모() throws InterruptedException {
    int threadCount = 1000;

    // 32개의 쓰레드 생성
    ExecutorService executorService = Executors.newFixedThreadPool(32);

    // 1000개의 작업이 끝나야 다음으로 진행할 수 있도록 하는 장치의 클래스
    CountDownLatch latch = new CountDownLatch(threadCount);

    /*
    1. executorService.submit() 은 비동기로 수행,
    2. 메인스레드는 이 모든 latch 작업이 끝나기 전까지 대기 상태에 빠짐
    3. 이후 count가 0이되면 메인 스레드를 깨워 남은 로직 수행
     */

    for (int i = 1; i < 1000; i++) {
        long memberId = i;
        executorService.submit(() -> {
            try {
                couponService.apply(memberId);
            } finally {
                // 1000부터 줄여나가서 0이 되면 메인 스레드를 대기상태에서 해제한다.
                latch.countDown();
            }
        });
    }

    // 메인 스레드를 대기상태로 전환
    latch.await();

    long count = couponRepository.count();
    System.out.println("count = " + count);
    assertThat(count).isEqualTo(100);


}

 이렇게 하면 count가 어떻게 되야할까? 당연히 100명이 나와야겠지만, 답은 그렇지 않다.

120개가 발급이 되었다.

실제로는 120개가 발급이 된걸 확인할 수 있다. 이렇게 되면 20개의 쿠폰이 초과 발급되어, 회사에 손실이 발생한 상황이다.

원인 분석

왜 그럴까? 흔히들 아는 동시성 이슈 때문이다.
여러 개의 스레드가 하나의 자원에 동시에 접근하기 때문에, 발생한 것이다.
그림으로 정리해보면 다음과 같다.

예제는 스레드와 db간의 관계지만, 실제로는 클라이언트-서버-db 의 구조를 많이 띄고있고,
실제로 내가 업무에서 맞닥뜨린 문제 또한 api단에서 발생한 이슈였으니 클라이언트-서버-db의 형태로 알기쉽게 설명하겠다. 

먼저 정상적인 흐름은 다음과 같다. 

클라이언트가 서버에게 쿠폰을 요청하면, 서버는 db에 쿠폰이 남아있는지 체크하고, 남아있다고 확인이 되면 쿠폰을 하나 발급해준다.
클라이언트에게 정상적으로 쿠폰 발급이 완료되고 나서야 DB에 잔여 쿠폰량이 반영된다.

이 때, 문제가 되는 상황은 다음과 같다.

하나가 남은 상황이라고 가정해보자.
보통 서버가 DB 데이터를 업데이트할 때는 트랜잭션 처리가 적용되어있다.
그런데, 처음에 온 사람의 요청을 마저 처리하기 전에 다른 사람의 요청이 들어와버린다면, 서버가 DB에게 잔여 쿠폰의 개수를 확인을 완료하기도 전에 요청에 대해서도 추가 발급 처리를 해준다.

왜? 아직 DB 입장에선 처음 발급 요청한 사람의 발급 처리가 완료되지 않았기 때문이다.

따라서 쿠폰을 추가 발급해주는 상황이 연출되는 건데, 이제 이걸 Redis를 사용해 막아보자.

Redis 환경 설치

도커를 활용해서 설치해주자.

docker pull redis
docker run --name redis -d -p 63799:6379 redis

앞에서 application.yml 은 설정해줬으니 생략하겠다.

Redis는 싱글스레드 기반의 In-memory DB이다. 따라서, 메모리를 사용하기 때문에 속도가 굉장히 빠르고, 싱글 스레드 기반이기 때문에 Race condition으로부터 자유롭다.
따라서 이런 동시성 이슈나, 캐싱처리를 할 때 주로 사용된다.

Redis 로직 구현

@Repository
@RequiredArgsConstructor
public class CouponCountRepository {

    private final RedisTemplate<String, String> redisTemplate;

    public Long increase() {
        return redisTemplate
                .opsForValue()
                .increment("coupon_count");
    }
}

자바에서 Redis는 RedisTemplate이나 RedisRepository를 이용한다고 한다. 두 개의 차이는 해당 블로그를 참고해보자.
간단히 말하자면 RedisTemplate은 좀 더 세밀한 컨트롤을 요할 때, RedisRepository는 간단하게 사용할 때 사용된다고 한다.

기존엔 RDB(MySQL)에서 갯수를 확인했는데, 이젠 Redis가 확인하도록 로직을 수정해주자.

@Service
@RequiredArgsConstructor
public class CouponService {

    private final CouponRepository couponRepository;
    private final CouponCountRepository couponCountRepository;

    public void apply(Long memberId) {
        Long count = couponCountRepository.increase();

        if (count > 100) {
            return;
        }

        couponRepository.save(new Coupon(memberId));
    }
}

이제 Redis가 적용된 상태로 테스트코드를 실행해보자. 실행 중인 Redis docker container에 접속해 다음 명령어를 때려보자.

docker ps
docker exec -it [PID] redis-cli
flushall

flushall은 생성된 key:value 맵을 모두 삭제하는 명령어이다. 참고로 Redis의 자료형은 SortedMap이다.
이제 기존에 작성했던 여러명응모() 테스트코드를 실행시켜보면..

와우. 잘된다.

왜 이게 가능한걸까?

요청을 받아올 땐 멀티스레드로 받아오지만, 쿠폰을 apply() 시키는 부분에 있어선 Redis를 활용해 싱글스레드로 작업하기 때문이다.
따라서 이러한 동시성 이슈가 발생하지 않는것이다.

그렇다면, 여기서 문제가 또 발생한다.
만약, 1000번의 요청이 아니라 100만번이라던지, 엄청나게 큰 부하가 걸린다면 어떻게 될까?

뭘 어떻게 되겠는가, 당연히 터지겠지.
이제 Kafka를 사용해서 대용량 트래픽에 대한 핸들링이 가능하도록 수정해보자.

docker compose로 여러 컨테이너 핸들링하기

갑자기 뜬금없이 웬 도커 컴포즈? 라고 생각하겠지만, 강의에서 이렇게 진행하니 그러려니 하자.
안그래도 프로젝트에서 도커 컴포즈로 컨테이너 올리려고 했어서, 이것도 찍먹해보니 일석이조였다.

아무튼 도커 컴포즈는 docker-compose.yml 이라는 파일 하나로 컨테이너를 어떤걸 생성하고, 어떻게 실행할건지에 대한 명세를 스크립트로 작성하고, 한번에 실행시켜주는 프로그램이다.
즉, 여러 개의 컨테이너를 쉽게 컨트롤할 수 있도록 도와주는 기능이다.

도커 desktop이 깔려있으면 기본적으로 docker compose도 깔린다. 확인해보자.

docker-compose -v

이제 로컬 아무곳에나 docker-compose.yml 이라는 파일을 만들자.
주키퍼와 카프카 컨테이너를 실행시키는 구문이다.

나는 그냥 프로젝트 루트에 만들었음.

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

 그리고 실행시켜보자.

docker-compose up -d

아까 올린 mysql, redis와 더불어 잘 돌아가고 있다.

Kafka 개념 톺아보기

Kafka는 분산 스트리밍 플랫폼이다. 말이 어려운데, 그냥 "큐를 하나 만들어서 요청을 순차적으로 처리해주는 메세지 큐" 정도로 이해하자.
클라이언트의 요청이 서버에 다다르기 전에 들렀다 가는 곳이다.
비슷한 메세지큐로 대표적인 RabbitMQ가 있는데, 용량이나 처리속도 측면에서 Kafka가 훨씬 더 좋다고 한다.

그래서 이걸 왜 쓰는걸까?
메세지 큐를 사용하면 발신자와 수신자가 서로를 직접 알 필요가 없어지기 때문에 결합도가 느슨해진다.
또한, 서버가 장애가 발생해서 응답을 하지 못하더라도, 클라이언트의 요청이 메세지큐에 남아 있기 때문에 요청이 유실되지 않는 '보장성' 을 갖는다. 메세지 큐는 여러 서비스가 복합적으로 얽혀있는 MSA 환경에서 빛을 발한다.

비동기 통신에도 매우 유리한데, 클라이언트는 메세지 큐에 요청을 담아놓고 다른 로직을 수행하면 되고, 서버도 다른 행동을 하다가 메세지 큐에서 요청을 꺼내쓰면 되기 때문이다.

메세지 큐는 Point to Point 와 Pub / Sub 모델로 구분할 수 있는데, 우리가 사용할 Kafka는 Pub/Sub 모델이다.
Point to Point는 점대점, 즉 전송 대상이 한명.
Pub/Sub는 전송 대상이 다수이다.

Kafka의 대표적인 개념은 Producer(생산자) 와 Consumer(소비자), Topic이 존재한다.
Producer가 클라이언트, Consumer가 서버, Topic이 요청라고 생각하면 이해가 쉽다.
Zookeeper는 컨슈머와 통신하고, 카프카의 메타데이터를 정보해서 카프카의 상태관리에 사용되는 서비스이다.
전반적인 아키텍처는 다음과 같고, 추가적인 개념학습은 다른 포스팅에 더 잘되어있으니, 참고하도록 하자.

Kafka 테스트해보기

이제 터미널을 열고 테스트를 해보자. 먼저 테스트 토픽을 하나 만들어주자.

docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic testTopic
// Created topic testTopic

그리고 Producer와 Consumer를 생성해주자. 각각 다른 터미널에서 실행해줘야 한다. 

docker exec -it kafka kafka-console-producer.sh --topic testTopic --broker-list 0.0.0.0:9092

docker exec -it kafka kafka-console-consumer.sh --topic testTopic --bootstrap-server localhost:9092

 이제 Producer 터미널에서 아무거나 입력해보자, 토픽이 생성되고, Consumer가 수신하는 것을 볼 수 있다.

위가 프로듀서, 아래가 컨슈머

자바에 적용하기

이제 기본적인 사용법은 알았으니, 코드에 적용해보도록 하자.

build.gradle에 추가해주자.

implementation 'org.springframework.kafka:spring-kafka'

KafkaTemplate을 반환해주는 Config를 하나 만들어준다.

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Long> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Long> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

쿠폰 발급을 요청하는 프로듀서도 생성해주자.

@Component
public class CouponCreateProducer {

    private final KafkaTemplate<String, Long> kafkaTemplate;

    public CouponCreateProducer(KafkaTemplate<String, Long> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void create(Long userId) {
        kafkaTemplate.send("coupon_create", userId);
    }
}

서비스를 수정해줬다.
기존엔 repository.save를 사용해서 db에 저장했지만, 이제는 프로듀서의 요청을 생성해준다.

@Service
@RequiredArgsConstructor
public class CouponService {

    private final CouponRepository couponRepository;
    private final CouponCountRepository couponCountRepository;
    private final CouponCreateProducer couponCreateProducer;

    public void apply(Long memberId) {
        long count = couponCountRepository.increase();

        if (count > 100) {
            return;
        }

        couponCreateProducer.create(memberId);
    }
}

자바 코드의 컨슈머를 구현하기에 앞서, 터미널에서 토픽을 생성 후, 자바 프로듀서로부터 생성된 요청을 터미널을 통한 컨슈머로 확인해보자.
먼저 토픽을 생성해준다.

docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic coupon_create

그리고 이 토픽을 받아올 컨슈머를 생성해준다.

docker exec -it kafka kafka-console-consumer.sh --topic coupon_create --bootstrap-server localhost:9092 --key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" --value-deserializer "org.apache.kafka.common.serialization.LongDeserializer"

그런데.. 여기서 뭔가 막힌다. 강의에선 잘만되는데.. 확인을 해보자

정상적으로 수행된다면 컨슈머에 count값들이 찍히게 된다.

에러 분석 (정상적으로 확인이 된다면 패스해도 좋음)

먼저, docker의 kafka로 접속해 토픽이 생성됐는지 확인해보자.

docker exec -it kafka /bin/bash
cd /opt/kafka
bin/kafka-topics.sh --list --zookeeper localhost:2181

내 경우는 카프카의 토픽을 가져오지 못했다. 타임아웃 에러가 발생함.
그래서 그냥 내가 만든 토픽을 지우고 다시 생성해봤다.

bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic coupon_create
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic coupon_create

 

이렇게 아예 토픽을 지워버리고 재생성 하니 정상적으로 돌아왔다.
만약 해결이 안된다면 그냥 도커 컨테이너를 밀어버리고 다시 docker-compose up -d 로 컨테이너를 올려주자.

자바 단에서의 컨슈머 구현

먼저 컨슈머 설정클래스를 작성해주자. 프로듀서와 매우 유사하다.

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Long> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");              // 브로커 주소
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");                              // 그룹 명시
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // Key 직렬화 명시
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); // Value 직렬화 명시

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Long> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

}

그리고 컨슈머도 구현해주자.
일반적인 코드지만 @KafkaListener 어노테이션을 달아줌으로써 토픽으로부터 메시지를 수신하게 된다.

@Component
@RequiredArgsConstructor
public class CouponCreatedConsumer {

    private final CouponRepository couponRepository;
    private final Logger logger = LoggerFactory.getLogger(CouponCreatedConsumer.class);

    @KafkaListener(topics = "coupon_create", groupId = "group_1")
    public void listener(Long userId) {
        try {
            System.out.println("memberId = " + userId);
            couponRepository.save(new Coupon(userId));
        } catch (Exception e) {
            logger.error("failed to create coupon::" + userId);
        }
    }
}

이제 테스트코드를 실행해보자. 실행 전 이전 테스트코드에서 redis를 통해 키가 생성되어있는 상태이다. flushall을 통해 redis를 날리고 테스트코드를 돌려주자.

이런식으로 카프카를 메시지를 수신하고, 잘 들어가는것을 확인할 수 있다.

그런데 여기서 함정. 이 테스트코드는 원래 아래와 같이 실패해야 정상이다.

뭘 바꿨길래 안되는걸까?

바로 카프카의 동작시간보다 테스트코드의 속도가 더 빨랐기 때문이다.
프로듀서 - 토픽 - 컨슈머 - DB 까지 흘러가는 시간이 5초라고 가정한다면,
테스트코드는 2초만에 실행이 끝나버려서, DB에 저장이 끝나기 전에 테스트코드가 종료되어 버린것이다.

처음 테스트 코드가 통과한 이유는 DB에 저장할 시간을 주도록 테스트코드에 sleep()을 줬기 떄문이다.

latch.await();

// 여기 추가
Thread.sleep(5000);

long count = couponRepository.count();

assertThat(count).isEqualTo(100);

위와 같이 일정 대기시간을 줘서 카프카의 처리량을 조절해, DB의 부하를 줄일 수 있다는 장점이 있다는 것도 추가적으로 알게되었다!

요건 변경해보기

이제 비즈니스 로직을 추가해서 복잡도를 올려보자. 기존의 요건은 다음과 같다.

선착순 100명에게 할인쿠폰을 제공하는 이벤트를 제공하고자 한다.
이 이벤트는 아래와 같은 조건을 만족하여야 한다.

  • 선착순 100명에게만 지급되어야 한다.
  • 101개 이상이 지급되면 안된다.
  • 순간적으로 몰리는 트래픽을 버틸 수 있어야 한다.

여기서 다음과 같은 조건을 추가하자.

  • 쿠폰은 1인 1매로 제한된다.

기존 로직대로라면 손만 빠르다면 한 사람이 많은 수의 쿠폰을 발급해버릴 수 있는 상황이 존재한다.
왜냐하면 Coupon 테이블은 쿠폰 id만 존재할 뿐, 별도로 복합키 처리가 되어있다거나 하지 않기 때문이다.

이러한 현상을 막으려면 다음과 같은 방법들이 존재한다.

  • DB 조회 후, 발급받은 적이 없는 고객이면 쿠폰 발급해주기
  • DB에 키를 추가해 복합키 방식으로 해결하기
  • 등등..

여러 방법들이 존재하겠지만, 로직까지 가지 않고 자료구조 단에서 해결할 수 있게, 우리는 이 문제를 Redis의 특성인 Set 자료구조를 통해 해결해보도록 하자.
Set은 기본적으로 중복을 허용하지 않기때문에, 로직까지 갈 필요도 없이 해결이 가능하다.

sadd test 1

"test" : 1 형태로 set이 생성되었다. 
제대로 적용되었는지 확인해보자.

smembers test

test를 키로 갖는 set의 value가 정상적으로 출력된다.

이제 이걸 자바 코드로 구현해보자.

@Repository
@RequiredArgsConstructor
public class AppliedMemberRepository {

    private final RedisTemplate<String, String> redisTemplate;

    public Long add(Long memberId) {
        return redisTemplate
                .opsForSet()
                .add("applied_member", memberId.toString());
    }

}

 

Redis에 Set 데이터를 생성해주는 코드이다.

서비스를 수정해주자.
생성된 set에 add를 해주면, 반영된 값만큼 apply 값이 return된다.
만약 발급받은 적이 있는 고객이라면, 이는 곧 apply값이 0이 될것이고, (set이라서)
해당 고객은 추가 로직을 밟지 않고 바로 return해주는 로직이다.

public void apply(Long userId) {
    Long apply = appliedMemberRepository.add(userId);

    if (apply != 1) {
        return;
    }

    Long count = couponCountRepository.increase();

    if (count > 100) {
        return;
    }

    couponCreateProducer.create(userId);
}

 이제 1명이 1000번의 요청을 하는 테스트코드를 작성해보자.

@Test
void 일인일매응모() throws InterruptedException {
    int threadCount = 1000;

    ExecutorService executorService = Executors.newFixedThreadPool(32);
    CountDownLatch latch = new CountDownLatch(threadCount);

    for (int i = 1; i <= 1000; i++) {
        long memberId = 1L;
        executorService.submit(() -> {
            try {
                couponService.apply(memberId);
            } finally {

                // 1000부터 줄여나가서 0이 되면 메인 스레드를 대기상태에서 해제한다.
                latch.countDown();
            }
        });
    }

    // 메인 스레드를 대기상태로 전환한다.
    latch.await();

    Thread.sleep(10000);

    long count = couponRepository.count();
    System.out.println("count = " + count);
    assertThat(count).isEqualTo(1);
}

한명이 수백, 수천번을 요청해도 coupon테이블엔 하나의 row만 존재해야 한다.

기존 로직의 결함

이렇게 해서 기본적인 로직은 완성이 되었다. 하지만 기존 로직은 결함이 존재한다. 서비스 코드를 살펴보자.

public void apply(Long userId) {
        Long apply = appliedMemberRepository.add(userId);

        if (apply != 1) {
            return;
        }

        Long count = couponCountRepository.increase();

        if (count > 100) {
            return;
        }

        couponCreateProducer.create(userId);
    }

해당 코드에서 couponCreateProducer.create() 구문은 카프카의 영역이다. 카프카를 통해 쿠폰을 발급해주고 있는데, 카프카가 모종의 이유로 에러가 발생해서 정상 동작하지 않더라도, repository를 통해 db에 저장되므로, 쿠폰이 발급되지도 않았는데 count가 올라가서 최종적으로는 모든 고객이 쿠폰을 받지못하는 불상사가 발생할 수도 있다.

이에 대한 리팩토링은 다음 포스팅에서 해보도록 하자. 막차 끊기기 전에 집가야돼....!!

결론


이렇게 Redis & Kafka를 간단하게 찍먹해봤다.

어떤 느낌인지는 알겠는데, 지금은 Consumer와 Producer가 하나의 자바 코드로 수행이 되고 있기에, 정말 간단한 실습정도라고 보면 되겠다.

실제로 프로젝트를 진행한다면 REST API를 통해 개발되므로, 어떤식으로 요청을 받아야 하는지는 추가적인 학습이 필요해보인다.

주말에 공부해봐야지.

구독 및 하트는 정보 포스팅 제작에 큰 힘이됩니다♡

728x90
반응형