서론
회사에서 이벤트 브로커를 만들 수 있는 좋은 기회가 생겼다.
기존에 우리 회사는 IoT 기기로부터 전달된 이벤트를 받아서 쓰기 때문에, 이를 저장하거나, 갈무리해 관리할 수 있는 구조가 아니었다.
이번에 새로운 3rd-party 플랫폼을 추가하게 되어서, 해당 작업을 맡게 되었다.
전반적인 아키텍처는 구도가 잡혔는데, 파티션 수, 토픽의 개수, 순서 보장 방법, 클러스터 내 브로커 개수 등의 세부적인 설계가 필요한 상황이 되었다.
카프카의 스펙을 어떻게 설정하는지, 기초부터 차근차근 따져가면서 생각해보기로 하자.
설계가 필요한 다른 사람들에게 도움이 되길 바란다.
본론
배경
먼저 현 상황에 대해 간단히 기술해보자면
- TPS 기준으로, 평상시 700~800, 특정 타임 1500~2500, 스파이크 시 3500 이상의 이벤트 발생
- 기존 브로커는 토픽 3개, 파티션은 개당 6, 7, 9개로 설정되어있음.
- 모든 이벤트를 받아 이벤트 속성 별로 토픽을 분리가 필요.
- 이벤트 속성은 총 22개, 그러나 비즈니스에 사용되는 토픽은 2개뿐. 추후 사용 가능성이 존재.
- 토픽 내의 파티션에 들어가는 정보는, 이벤트가 발생한 IoT기기(이하 기기)의 ID % 파티션 수 로 각각 파티션 큐에 삽입
- 해당 이벤트를 3rd-party 컨슈머가 가져감
이런 식의 구조를 띄고 있다.
스파이크 타임을 고려하여 3500개의 이벤트까지 안전하게 유실없이, 빠르게 보장하려면 데이터 처리 속도가 보장되어야 한다.
나이브하게 생각해본다면, 고려해볼 스펙의 리스트는 다음과 같다.
- 파티션 수
- 토픽 수
- 컨슈머 수
또한, 이전에 해당 내용과 관련해 팀원분과 함께(라고 말하고 버스를 탔다) 테스트해서 알아낸 내용들을 종합해보면, 다음과 같다.
- 제조사로부터 이벤트 수신 -> consumer가 kafka에 publish -> 다른 consumer가 이를 consume 까지의 시간은 약 0.4~5초
- 순서 보장은 검증됨
- 기기 ID를 기반으로 제조사 측에서 이벤트를 송신
- 제조사로부터 수신하는 spring application이 shutdown 될 때, 카프카는 30초동안은 메시지를 처리하고 프로세스를 종료한다.
- graceful shutdown과는 관계가 없었음, kafka에서 자체적으로 처리
데이터 처리 속도는 어떻게 올리나?
데이터 처리 속도를 올리는 방법은 크게 2가지다.
컨슈머의 처리량 늘리기
컨슈머의 처리량을 늘리는 것은 단순히 서버의 스케일 업을 하거나, GC튜닝 등의 작업이 수반된다.
그러나 이런 작업은 난이도가 상당할 뿐더러, 시간도 오래걸리고 다른 시스템과 엮여있기 때문에 섣불리 작업하기가 쉽지 않다.
물론 내 경우는 새로 만드는 것이긴 하지만, 너무 단순하게 문제를 접근하는 것 같으니 패스하자.
파티션의 개수를 늘리고 파티션 개수만큼 컨슈머를 추가하기
일종의 스케일 아웃이다. 컨슈머(서버)를 그만큼 추가하는 것이기 때문에 돈이 더 드는 거 아닌가? 라고 생각할 수도 있지만, 위에서 언급한 스케일 업이나 스케일 아웃으로 나가는 비용이나 큰 차이 없다고 생각한다.
오히려 스케일 아웃으로 서비스의 안정성을 확보하고, 서버가 무거워지지 않는 것에 초점을 둔다면 더 좋은 방법이라고 볼 수 있다.
프로듀서가 보내는 데이터가 초당 1,000개고, 컨슈머가 처리할 수 있는 데이터가 100개라면 1,000 / 100 = 10 개의 파티션이 필요해진다.
프로듀서 전송 데이터량 < 컨슈머 데이터 처리량 * 파티션 개수
그럼 여기서 중요한건 컨슈머의 처리량인데, 컨슈머의 처리량을 어떻게 알 수 있을까?
파티션 수 조절해보기
정확한 방법이 아닐 수도 있지만, 일단 당장 떠오른 것은 로컬에 세팅해보는 것이다.
컨슈머의 스펙은 메모리 4기가로 할당해봤다.
생각보다 잘 받아온다. 물론 샘플코드라 이벤트의 크기도 작고, 메모리를 4기가씩 할당해주는 것도 한 몫 할 것 같다. 1기가로 극단적으로 줄여보자.
1기가로 서버 스펙을 줄였음에도 불구하고, 너무 잘 받아온다. 그런데 한 1분정도 지나니까 애플리케이션이 정신을 못차리더라. 그래서 1기가는 너무하고.. 넉넉잡아 4기가로 테스트를 진행해봤다.
현 상태에서의 파티션 수는 로컬에서 publish하고 consume해서 그런지 1개로 확인된다.
@KafkaListener(topics = TOPIC, groupId = "high-throughput-group")
public void listen(String message) {
int partitionCount = getPartitionCount(TOPIC);
System.out.println("현재 토픽 '" + TOPIC + "'의 파티션 개수: " + partitionCount);
}
private int getPartitionCount(String topicName) {
try {
KafkaFuture<Map<String, TopicDescription>> future = adminClient.describeTopics(List.of(topicName)).allTopicNames();
TopicDescription topicDescription = future.get().get(topicName);
return topicDescription.partitions().size();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
코드단에서 파티션 1개로 확인되어서.. 너무 작은거 아닌가? 하고 kafka에서 직접 파티션 수를 확인해봤는데, 기본 파티션 수가 1로 설정되어 있었다.
그런데 테스트를 하다보니, 로컬에서 publish하고 consume하는 거라 너무 수월하게 테스트가 진행되었다.
그래서 로컬에서 발행한 이벤트 말고, 실제 이벤트를 토대로 테스트해보기로 했다.
파티션의 개수를 늘려보면서 서버 부하를 확인해볼건데, 여기서부터는 매트릭 정보를 확인하기 위해 프로메테우스와 그라파나 세팅을 진행했다.
어떻게 세팅하는지는 따로 검색해보도록 하자.
서버의 매트릭 정보를 정상적으로 따오는 것 까지는 확인이 되었다.
이제 이걸 ec2에 올려놓고, 약 4시간 단위로 파티션 수를 바꿔가면서 실제 데이터와 비교해보려고 한다.
실제 데이터를 수집하고 부하 테스트를 해야하다보니.. 시간도 필요하고 글도 길어질 것 같으니 다음 포스팅으로 빤스런해보겠다.
결론
카프카의 스펙 설정에 대해 검토가 필요한 항목들은 어떤 것들이 있는지 살펴보고 테스트 환경을 구축했다.
사실 빠른 시간안에 부하테스트를 하려면 할 수는 있는데, 아무래도 로컬 컴퓨터 하나로만 하다보니 네트워킹 시간이 빠져 정확도가 낮을 것이다.
테스트 서버로 사용하는 ec2에 직접 올려놓고 테스트 하는것이 더 정확하기에, 1-2일 정도 파티션 수, 컨슈머 수 등을 조절해가면서 찍히는 그래프를 토대로 스펙을 결정해보자.
참고
How many partitions you need for topic? Best practices for data partitioning in Apache Kafka. Part 3