들어가기에 앞서
- 개인적인 공부 용도로 기록한 것입니다. 상업적인 용도가 아닙니다.
- 오역이 있을 수 있습니다.
- 모호하거나 이해가 되지 않는 부분은 초록색으로 표시하였습니다.
Permission to make digital or hard copies of all or part of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. To copy otherwise, or republish, to post on servers or to redistribute to lists, requires prior specific permission and/or a fee. NetDB'11, Jun. 12, 2011, Athens, Greece. Copyright 2011 ACM 978-1-4503-0652-2/11/06…$10.00.
ABSTRACT (요약)
Log processing has become a critical component of the data pipeline for consumer internet companies. We introduce Kafka, a distributed messaging system that we developed for collecting and delivering high volumes of log data with low latency. Our system incorporates ideas from existing log aggregators and messaging systems, and is suitable for both offline and online message consumption. We made quite a few unconventional yet practical design choices in Kafka to make our system efficient and scalable. Our experimental results show that Kafka has superior performance when comparedd to two popular messaging systems. We have been using Kafka in production for some time and it is processing hundreds of gigabytes of new data each day.
로그 처리는 소비자를 중시하는 인터넷 기업들의 중요한 데이터 파이프라인 요소가 되어가고 있다. 본 논문에서는 카프카를 소개한다. 카프카는 고용량의 데이터를 작은 지연으로 수집하고 이동시키는 분산 메시지 시스템이다. 카프카 시스템은 현재 존재하는 로그 수집기들과 메시지 시스템의 개념들을 포함하며 온라인과 오프라인 메시지 소비에 적합하다. 우리는 일반적이지는 않지만 실용적인 디자인을 선택함으로써 효율성과 확장성을 꾀했다. 우리의 실험적 결과물은 기존 두 개의 대표적인 메시지 시스템과 비교했을 때 우월한 성능을 가진다. 카프카를 실제로 도입한 이후로 몇 백 기가바이트의 데이터를 매일매일 처리할 수 있게 되었다.
General Terms (대표 용어)
Management, Performance, Design, Experimentation
Keywords (핵심 단어)
messaging, distributed, log processing, throughput, online
1. Introduction (도입)
There is a large amount of “log” data generated at any sizable internet company. This data typically includes (1) user activity events corresponding to logins, pageviews, clicks, “likes”, sharing, comments, and search queries; (2) operational metrics such as service call stack, call latency, errors, and system metrics such as CPU, memory, network, or disk utilization on each machine. Log data has long been a component of analytics used to track user engagement, system utilization, and other metrics. However recent trends in internet applications have made activity data a part of the production data pipeline used directly in site features. These uses include (1) search relevance, (2) recommendations which may be driven by item popularity or cooccurrence in the activity stream, (3) ad targeting and reporting, and (4) security applications that protect against abusive behaviors such as spam or unauthorized data scraping, and (5) newsfeed features that aggregate user status updates or actions for their “friends” or “connections” to read.
규모에 상관없이 어떤 인터넷 회사라도 로그 데이터는 생성된다. 로그 데이터는 다음과 같은 특징을 가진다.
(1) 유저 활동 이벤트 - 로그인, 페이지 뷰, 클릭, 좋아요, 공유, 댓글, 검색 쿼리
(2) 운영 메트릭 - 서비스 콜 스택, 호출 지연, 에러, 시스템 메트릭 (각 기기의 CPU, 메모리, 네트워크, 디스크 사용)
데이터 분석의 요소로서 로그 데이터는 유저 행동 파악, 시스템 사용, 기타 메트릭에 오랫동안 사용되어 왔다. 하지만 최근 인터넷 애플리케이션의 추세는 운영 데이터 파이프라인의 활동 데이터를 기능 개발에 직접적으로 사용한다. 예시는 다음과 같다.
(1) 검색 연관성
(2) 상품의 인기나 활동 스트림에서의 동시 출현에 따른 추천
(3) 광고 유도나 보고
(4) 스팸이나 권한 없는 데이터 스크래핑 같은 악용을 막는 보안 애플리케이션
(5) 친구나 연결된 사람에게 보여줄 사용자의 상태 갱신이나 활동을 모아두는 뉴스 피드
This production, real-time usage of log data creates new challenges for data systems because its volume is orders of magnitude larger than the “real” data. For example, search, recommendations, and advertising often require computing granular click-through rates, which generate log records not only for every user click, but also for dozens of items on each page that are not clicked. Every day, China Mobile collects 5–8TB of phone call records [11] and Facebook gathers almost 6TB of various user activity events [12].
실시간 로그 데이터 사용은 하나의 도전이 되었는데 왜냐하면 "실제" 데이터와는 다르게 크기가 굉장히 컸기 때문이다. 예를 들면 검색, 추천, 광고 같은 경우에는 세분화된 클릭률 계산을 요구했다. 세분화된 클릭률이 생성하는 로그 레코드들은 단순히 유저의 클릭뿐만 아니라 각 페이지에 속해 있는 클릭하지 않은 요소들까지 포함한 것이다. 따라서 중국 모바일의 경우에는 매일 5-8TB 정도의 전화 기록을 수집하였고, 페이스북의 경우에는 매일 6TB 정도의 다양한 유저 활동 이벤트를 수집하게 되었다.
Many early systems for processing this kind of data relied on physically scraping log files off production servers for analysis. In recent years, several specialized distributed log aggregators have been built, including Facebook’s Scribe [6], Yahoo’s Data Highway [4], and Cloudera’s Flume [3]. Those systems are primarily designed for collecting and loading the log data into a data warehouse or Hadoop [8] for offline consumption. At LinkedIn (a social network site), we found that in addition to traditional offline analytics, we needed to support most of the real-time applications mentioned above with delays of no more than a few seconds.
이러한 데이터를 처리하는 많은 초창기 시스템에서는 분석을 위해 운영 서버에서 로그 파일들을 물리적으로 스크래핑했다. 최근에서야 페이스북의 스크라이브, 야후의 데이터 하이웨이, 클라우데라의 플룸같은 분산 로그 수집기들이 만들어졌다. 이러한 시스템들은 로그 데이터를 모으고 데이터 웨어하우스나 하둡 같은 곳으로 적제 하는 것이 최우선 되도록 디자인되었다. 링크드인에서 우리는 기존의 오프라인 분석에 더해 지연이 없는 실시간 데이터 처리 애플리케이션이 필요하게 되었다.
We have built a novel messaging system for log processing called Kafka [18] that combines the benefits of traditional log aggregators and messaging systems. On the one hand, Kafka is distributed and scalable, and offers high throughput. On the other hand, Kafka provides an API similar to a messaging system and allows applications to consume log events in real time. Kafka has been open sourced and used successfully in production at LinkedIn for more than 6 months. It greatly simplifies our infrastructure, since we can exploit a single piece of software for both online and offline consumption of the log data of all types. The rest of the paper is organized as follows. We revisit traditional messaging systems and log aggregators in Section 2. In Section 3, we describe the architecture of Kafka and its key design principles. We describe our deployment of Kafka at LinkedIn in Section 4 and the performance results of Kafka in Section 5. We discuss future work and conclude in Section 6.
우리는 카프카라고 불리는 새로운 로그 처리 메시지 시스템을 만들었다. 카프카는 기존의 로그 수집기와 메시지 시스템의 장점을 결합한 것이다. 카프카는 분산되어 있고, 확장성 있으며, 높은 처리율을 제공한다. 또한 메시지 시스템과 유사한 API를 제공하며 애플리케이션에게 실시간으로 로그 이벤트를 소비할 수 있게 한다. 카프카는 오픈 소스이며 6개월 넘게 링크드인에서 사용되었다. 카프카는 단순하게 모든 로그 데이터 유형을 하나의 소프트웨어로 오프라인, 온라인 처리할 수 있기 때문에 링크드인의 인프라 구조를 매우 간단하게 만들었다. 이 논문의 나머지 파트를 요약하자면, 2장에서는 전통적인 메시지 시스템과 로그 수집기에 대해서 살펴볼 것이다. 3장에서는 카프카 구조와 중요 디자인 원칙에 대해서 설명할 것이다. 4장에서는 링크드인에서의 쓰임에 대해 서술한다. 5장에서는 성능 결과에 대해서 설명한다. 마지막 6장에서는 후행 연구와 마무리를 지을 것이다.
2. Related Work (관련 연구)
Traditional enterprise messaging systems [1][7][15][17] have existed for a long time and often play a critical role as an event bus for processing asynchronous data flows. However, there are a few reasons why they tend not to be a good fit for log processing. First, there is a mismatch in features offered by enterprise systems. Those systems often focus on offering a rich set of delivery guarantees. For example, IBM Websphere MQ [7] has transactional supports that allow an application to insert messages into multiple queues atomically. The JMS [14] specification allows each individual message to be acknowledged after consumption, potentially out of order. Such delivery guarantees are often overkill for collecting log data. For instance, losing a few pageview events occasionally is certainly not the end of the world. Those unneeded features tend to increase the complexity of both the API and the underlying implementation of those systems. Second, many systems do not focus as strongly on throughput as their primary design constraint. For example, JMS has no API to allow the producer to explicitly batch multiple messages into a single request. This means each message requires a full TCP/IP roundtrip, which is not feasible for the throughput requirements of our domain. Third, those systems are weak in distributed support. There is no easy way to partition and store messages on multiple machines. Finally, many messaging systems assume near immediate consumption of messages, so the queue of unconsumed messages is always fairly small. Their performance degrades significantly if messages are allowed to accumulate, as is the case for offline consumers such as data warehousing applications that do periodic large loads rather than continuous consumption.
기존의 기업용 메시징 시스템은 오랫동안 존재해오면서 비동기 데이터 흐름의 처리를 위한 이벤트 버스라는 중요한 역할을 해왔다. 하지만 이 시스템들이 로그 프로세싱에는 적합하지 않은 몇 가지 이유가 존재한다.
첫 번째로, 엔터프라이즈 시스템이 사용하기에는 부적합한 기능이 있다. 기존 시스템은 주로 전달 보장에 초점을 맞춘다. 예를 들면 IBM의 웹스피어 메시지큐는 애플리케이션이 다중 큐에 원자적으로 메시지를 삽입하는 것을 지원한다. 자바 메시지 큐는 각각의 메시지를 소비한 이후 순서에 상관없이 수신 확인을 개별적으로 할 수 있게 해준다. 전달 보장은 로그 데이터를 수집에는 과한 기능이다. 가끔씩 몇 개의 페이지뷰 이벤트를 잃는다고 세상이 끝나지 않는다. 필요하지 않은 기능들이 API와 시스템 구현의 복잡성을 증가시킨다.
두 번째로, 많은 시스템들은 설계에 있어서 처리량을 소홀히 하는 경우가 있다. 예를 들면 자바 메시지 큐에는 하나의 요청에 여러 메시지를 명시적으로 담을 수 있게 하는 API가 존재하지 않는다. 따라서 각각의 모든 메시지들은 TCP/IP 통신을 요구한다. 이건 우리가 필요로 하는 처리량을 만족시키기에는 불가능한 처리다.
세 번째로, 분산 처리 기능이 미흡하다. 파티션을 나누고 여러 기기에 메시지를 저장하는 쉬운 방법이 없다.
마지막으로, 많은 메시지 시스템은 메시지가 곧 소비되기 때문에 소비되지 않은 메시지들은 작다고 가정한다. 이러한 가정으로 인해 메시지가 큐에 누적되게 되면 성능이 떨어지게 된다. 예를 들어 데이터 웨어하우스 같은 오프라인 소비자들은 지속적으로 메시지를 소비하는 것이 아닌 긴 구간 단위로 소비한다.
A number of specialized log aggregators have been built over the last few years. Facebook uses a system called Scribe. Each front- end machine can send log data to a set of Scribe machines over sockets. Each Scribe machine aggregates the log entries and periodically dumps them to HDFS [9] or an NFS device. Yahoo’s data highway project has a similar dataflow. A set of machines aggregate events from the clients and roll out “minute” files, which are then added to HDFS. Flume is a relatively new log aggregator developed by Cloudera. It supports extensible “pipes” and “sinks”, and makes streaming log data very flexible. It also has more integrated distributed support. However, most of those systems are built for consuming the log data offline, and often expose implementation details unnecessarily (e.g. “minute files”) to the consumer. Additionally, most of them use a “push” model in which the broker forwards data to consumers. At LinkedIn, we find the “pull” model more suitable for our applications since each consumer can retrieve the messages at the maximum rate it can sustain and avoid being flooded by messages pushed faster than it can handle. The pull model also makes it easy to rewind a consumer and we discuss this benefit at the end of Section 3.2.
로그 수집기는 최근 몇 년 간 개발되어왔다. 페이스북은 스크라이브라는 시스템을 사용한다. 각각의 프론트엔드 기기들은 스크라이브 기기 집합에 소켓을 통하여 로그 데이터를 보낸다. 각각의 스크라이브 기기들은 로그를 수집하고 주기적으로 HDFS나 NFS 기기로 보낸다. 야후의 데이터 하이웨이 프로젝트도 비슷한 데이터 흐름을 가진다. 기기 집합에서 클라이언트로 부터의 이벤트들을 수집하고 "분" 파일로 정리한뒤 하둡 파일 시스템에 추가한다. 플룸은 클라우드데라에서 개발한 로그 수집기다. 플룸은 파이프들과 싱크들을 늘리는 것을 지원하고 로그 데이터들을 스트리밍하는 것을 유연하게 만든다. 플룸이 좀 더 통합된 분산 지원을 하지만 로그 데이터를 오프라인에서 소비하는 것을 주 목적으로 하며 소비자에게 불필요하게 구현을 노출한다. ("분 파일" 같은) 추가적으로, 브로커가 소비자에게 데이터를 보내는 "푸쉬" 모델을 사용한다. 링크드인에서는 데이터 소비가 많은 애플리케이션을 만들기 위해서는 소비자가 "풀" 모델이어야 적절하다고 생각한다.
각각의 소비자가 자신의 한계치만큼 메시지를 소비할 수 있게 해야, 처리할 수 없을만큼의 메시지가 빠르게 들어오더라도 유지가 가능하기 때문이다. 풀 모델은 소비자가 되감기 하는 것을 쉽게 만드는데 3.2 절에서 이 장점에 대해 이야기한다.
More recently, Yahoo! Research developed a new distributed pub/sub system called HedWig [13]. HedWig is highly scalable and available, and offers strong durability guarantees. However, it is mainly intended for storing the commit log of a data store.
야후! 연구실은 헤드위그라는 새로운 분산 발생/구독 시스템을 개발했다. 헤드위그는 확장성있고 고가용성, 그리고 강한 내구성을 제공한다. 하지만 데이터 저장소의 커밋 로그를 저장하는 것이 주된 목적이다.
3. Kafka Architecture and Design Principles (카프카 아키텍처와 디자인 원칙)
Because of limitations in existing systems, we developed a new messaging-based log aggregator Kafka. We first introduce the basic concepts in Kafka. A stream of messages of a particular type is defined by a topic. A producer can publish messages to a topic. The published messages are then stored at a set of servers called brokers. A consumer can subscribe to one or more topics from the brokers, and consume the subscribed messages by pulling data from the brokers.
현재 시스템들의 한계점 때문에, 메시지 기반 로그 수집기인 카프카를 개발해야 했다. 먼저 카프카의 기본 개념에 대해서 소개한다. 특정 타입의 메시지 흐름은 토픽으로 규정된다. 생산자는 메시지를 토픽으로 발행한다. 발행된 메시지는 브로커라고 불리는 서버 집합에 저장된다. 소비자는 하나 이상의 토픽을 브로커로부터 구독할 수 있으며 브로커로부터 구독 메시지를 풀링하는 방식으로 소비할 수 있다.
Messaging is conceptually simple, and we have tried to make the Kafka API equally simple to reflect this. Instead of showing the exact API, we present some sample code to show how the API is used. The sample code of the producer is given below. A message is defined to contain just a payload of bytes. A user can choose her favorite serialization method to encode a message. For efficiency, the producer can send a set of messages in a single publish request.
메시징은 간단한 개념이기 때문에 카프카 API에도 반영하려고 했다. 완벽한 API 대신에 아래의 샘플코드로 어떻게 API가 사용되는지를 나타내었다. 메시지는 단지 바이트를 담을 수 있도록 설계되었다. 사용자는 원하는 직렬화 방식을 선택하여 메시지를 인코딩할 수 있다. 효율성을 위해서 생산자는 메시지 집합을 하나의 발생 요청에 담아 보낼 수도 있다.
Sample producer code:
producer = new Producer(...);
message = new Message(“test message str”.getBytes());
set = new MessageSet(message);
producer.send(“topic1”, set);
To subscribe to a topic, a consumer first creates one or more message streams for the topic. The messages published to that topic will be evenly distributed into these sub-streams. The details about how Kafka distributes the messages are described later in Section 3.2. Each message stream provides an iterator interface over the continual stream of messages being produced. The consumer then iterates over every message in the stream and processes the payload of the message. Unlike traditional iterators, the message stream iterator never terminates. If there are currently no more messages to consume, the iterator blocks until new messages are published to the topic. We support both the point-to- point delivery model in which multiple consumers jointly consume a single copy of all messages in a topic, as well as the publish/subscribe model in which multiple consumers each retrieve its own copy of a topic.
토픽을 구독하기 위해서, 소비자는 먼저 하나 이상의 토픽에 대한 메시지 스트림을 만들어야 한다. 메시지가 토픽에 발행되고나면 하위 스트림에 분산된다. 어떻게 카프카가 메시지들을 분산시키는지에 대해서는 3.2 절에서 논의한다. 각 메시지 스트림은 메시지가 계속 생성되고 있는 스트림에 대한 반복자 인터페이스를 제공한다. 소비자는 스트림에 있는 메시지를 반복적으로 소비한다. 일반적인 반복자와는 달리 메시지 스트림 반복자는 끝이 없다. 만약에 더 이상 소비될 메시지가 없다면 메시지 스트림 반복자는 새로운 메시지가 토픽에 발행될 때까지 멈추게 된다. 카프카는 여러개의 소비자가 동시에 토픽의 단일 메시지 사본을 소비하는 점대점 모델과 여러개의 소비자가 각자의 토픽에서 각자 토픽 메시지의 사본을 소비하는 발행/구독 모델을 지원한다.
Sample consumer code:
streams[] = Consumer.createMessageStreams(“topic1”, 1);
for (message : streams[0]) {
bytes = message.payload();// do something with the bytes
}
The overall architecture of Kafka is shown in Figure 1. Since Kafka is distributed in nature, an Kafka cluster typically consists of multiple brokers. To balance load, a topic is divided into multiple partitions and each broker stores one or more of those partitions. Multiple producers and consumers can publish and retrieve messages at the same time. In Section 3.1, we describe the layout of a single partition on a broker and a few design choices that we selected to make accessing a partition efficient. In Section 3.2, we describe how the producer and the consumer interact with multiple brokers in a distributed setting. We discuss the delivery guarantees of Kafka in Section 3.3.
그림 1이 카프카의 구조 개략도이다. 카프카는 자연스럽게 분산되기 때문에 카프카 클러스터는 여러개의 브로커로 이루어진다는 특징이 있다. 부하의 균형을 위해서 토픽은 여러 개의 파티션으로 나뉘게되고 각각의 브로커는 하나 이상의 파티션을 저장한다. 여러 생산자와 소비자는 생산과 소비를 동시에 할 수 있다. 3.1 장에서 브로커에 배치된 단일 파티션 및 파티션 접근을 효율적으로 하기 위해 선택한 디자인에 대해 설명한다. 3.2 장에서는 생산자와 소비자가 어떻게 분산된 상황에서 여러 개의 브로커들과 상호작용하는지 설명한다. 3.3 장에서는 카프카의 전달 보장에 대해서 설명한다.
3.1 Efficiency on a Single Partition (단일 파티션에서의 효율성)
We made a few decisions in Kafka to make the system efficient.
카프카를 시스템 효율적으로 만들기 위해서 몇 가지 선택을 했다.
Simple storage: Kafka has a very simple storage layout. Each partition of a topic corresponds to a logical log. Physically, a log is implemented as a set of segment files of approximately the same size (e.g., 1GB). Every time a producer publishes a message to a partition, the broker simply appends the message to the last segment file. For better performance, we flush the segment files to disk only after a configurable number of messages have been published or a certain amount of time has elapsed. A message is only exposed to the consumers after it is flushed.
단순한 저장소: 카프카는 매우 간단한 저장소 배치를 가진다. 토픽의 각 파티션은 하나의 논리적인 로그에 대응된다. 물리적으로 하나의 로그는 거의 동일한 크기의 세그먼트 파일로 구현되어있다. 파티션에 생산자가 메시지를 발행할 때 마다 브로커는 가장 최근 세그먼트 파일에 메시지를 단순히 추가할 뿐이다. 더 나은 성능을 위해서 일정 수의 메시지가 발행되었거나 일정 시간이 지난 이후에 세그먼트 파일이 디스크로 써진다. 소비자들은 메시지가 써진 이후에 메시지를 확인할 수 있다.
Unlike typical messaging systems, a message stored in Kafka doesn’t have an explicit message id. Instead, each message is addressed by its logical offset in the log. This avoids the overhead of maintaining auxiliary, seek-intensive random-access index structures that map the message ids to the actual message locations. Note that our message ids are increasing but not consecutive. To compute the id of the next message, we have to add the length of the current message to its id. From now on, we will use message ids and offsets interchangeably.
일반적인 메시지 시스템과는 다르게 카프카에 저장된 메시지는 명확한 메시지 식별자를 가지지 않는다. 대신에 각 메시지들은 로그에서의 논리적인 변위로 식별성이 생긴다. 이 특징은 메시지 아이디가 실제 메시지 위치를 가리키는 인덱스 구조에서 나타나는 랜덤-엑세스의 읽기 부하를 피할 수 있게 한다. 메시지 식별자는 증가하지만 단조 증가가 아님에 주의해야한다. 다음 메시지의 식별자를 계산하기 위해서는 현재 메시지의 식별자에 길이를 더하면 된다. 지금부터 메시지 식별자와 변위를 대체해서 사용하겠다.
A consumer always consumes messages from a particular partition sequentially. If the consumer acknowledges a particular message offset, it implies that the consumer has received all messages prior to that offset in the partition. Under the covers, the consumer is issuing asynchronous pull requests to the broker to have a buffer of data ready for the application to consume. Each pull request contains the offset of the message from which the consumption begins and an acceptable number of bytes to fetch. Each broker keeps in memory a sorted list of offsets, including the offset of the first message in every segment file. The broker locates the segment file where the requested message resides by searching the offset list, and sends the data back to the consumer. After a consumer receives a message, it computes the offset of the next message to consume and uses it in the next pull request. The layout of an Kafka log and the in-memory index is depicted in Figure 2. Each box shows the offset of a message.
소비자는 항상 특정 파티션에서 메시지를 순차적으로 소비한다. 만약에 소비자가 특정 메시지의 변위를 알고 있다면 소비자가 변위의 이전 모든 메시지에 대해 소비했었다는 것을 의미한다. 보이지는 않지만 소비자는 애플리케이션이 사용할 데이터에 대해 요청을 미리 비동기로 브로커에 보내어 버퍼에 준비 시킨다. 각 풀 요청은 소비의 시작 메시지의 변위와 수용할 수 있는 바이트 수를 포함한다. 브로커는 메모리에 각 세그먼트 파일의 첫 번째 메시지 변위를 정렬된 리스트로 보관한다. 브로커는 변위 리스트를 사용해서 요청 메시지가 들어있는 세그먼트 파일을 찾고 데이터를 소비자에게 보낸다. 소비자는 메시지를 받은 후 소비를 위해 다음 메시지의 변위를 계산하고 다음 요청에 사용한다. 카프카 로그와 메모리 인덱스의 개략도는 그림 2에 표시되어 있다. 각 네모는 메시지의 변위를 나타낸다.

Efficient transfer: We are very careful about transferring data in and out of Kafka. Earlier, we have shown that the producer can submit a set of messages in a single send request. Although the end consumer API iterates one message at a time, under the covers, each pull request from a consumer also retrieves multiple messages up to a certain size, typically hundreds of kilobytes.
효율적인 전송: 우리는 카프카의 데이터 수신/발신에 대해서 매우 고민했다. 앞서 생산자가 하나의 보내기 요청에 여러개의 메시지 모음을 보낼 수 있음을 보여줬다. 소비자 API 는 한 번에 하나의 메시지를 소비하는 것으로 보이지만 소비자 역시 특정 크기 만큼 (일반적으로 몇 백 킬로바이트) 메시지를 가져오고 있다.
Another unconventional choice that we made is to avoid explicitly caching messages in memory at the Kafka layer. Instead, we rely on the underlying file system page cache. This has the main benefit of avoiding double buffering---messages are only cached in the page cache. This has the additional benefit of retaining warm cache even when a broker process is restarted. Since Kafka doesn’t cache messages in process at all, it has very little overhead in garbage collecting its memory, making efficient implementation in a VM-based language feasible. Finally, since both the producer and the consumer access the segment files sequentially, with the consumer often lagging the producer by a small amount, normal operating system caching heuristics are very effective (specifically write-through caching and read- ahead). We have found that both the production and the consumption have consistent performance linear to the data size, up to many terabytes of data.
또 하나 특이한 선택은 메시지를 카프카 계층에 있는 메모리 대신 시스템 페이지 캐시에 캐시했다. 이건 더블 버퍼링을 막아주는 장점을 가져온다. 메시지는 오직 페이지 캐시에만 캐시되어 있다. 브로커 프로세서가 재시작되더라도 캐시에 데이터를 계속 유지할 수 있다는 장점도 있다. 카프카는 프로세스에 메시지를 전혀 캐시하지 않기 때문에 가비지 콜랙팅의 오버헤드가 매우 적어지게 되어 VM 기반의 언어에서의 구현이 매우 효율적이게 되었다. 마지막으로, 생산자와 소비자가 세그먼트 파일을 순차적으로 접근하고 소비자가 생산자보다 조금 뒤쳐진 상황에서 일반적인 운영체제의 캐시 예측이 매우 효율적으로 작동한다. (특히 캐시에 데이터를 쓴 뒤에 바로 읽을 경우) 우리는 발행과 소비가 테라바이트가 넘는 데이터 크기를 가지더라도 일정한 성능을 가지는 것을 관찰했다.
In addition we optimize the network access for consumers. Kafka is a multi-subscriber system and a single message may be consumed multiple times by different consumer applications. A typical approach to sending bytes from a local file to a remote socket involves the following steps: (1) read data from the storage media to the page cache in an OS, (2) copy data in the page cache to an application buffer, (3) copy application buffer to another kernel buffer, (4) send the kernel buffer to the socket. This includes 4 data copying and 2 system calls. On Linux and other Unix operating systems, there exists a sendfile API [5] that can directly transfer bytes from a file channel to a socket channel. This typically avoids 2 of the copies and 1 system call introduced in steps (2) and (3). Kafka exploits the sendfile API to efficiently deliver bytes in a log segment file from a broker to a consumer.
추가적으로 우리는 소비자의 네트워크 접근을 최적화했다. 카프카는 다중-구독자 시스템이고 하나의 메시지가 여러 다른 소비자 애플리케이션에 의해서 여러 번 소비될 수 있다. 일반적으로 로컬 파일의 바이트를 원격 소켓에 보내는 접근 방법은 다음과 같다.
(1) 저장 매체에서 운영체제의 페이지 캐시로 데이터를 읽는다.
(2) 페이지 캐시에서 애플리케이션의 버퍼로 데이터를 복사한다.
(3) 애플리케이션의 버퍼에서 커널의 버퍼로 데이터를 복사한다.
(4) 커널의 버퍼를 소켓에 보낸다.
위 과정은 4번의 데이터 복사와 2번의 시스템 콜을 포함한다. 리눅스와 유닉스 운영체제에서 sendFile API 라는 것이 존재하는데 이건 파일 채널에서 소켓 채널로 바로 바이트를 보내는 기능이다. 이 기능을 사용하면 (2), (3)의 단계를 생략할 수 있게 되어 2번의 데이터 복사와 1번의 시스템 콜을 안 할 수 있게 된다. 카프카는 로그 세그먼트 파일을 브로커에서 소비자에게 보낼 때 sendFile API를 이용한다.
Stateless broker: Unlike most other messaging systems, in Kafka, the information about how much each consumer has consumed is not maintained by the broker, but by the consumer itself. Such a design reduces a lot of the complexity and the overhead on the broker. However, this makes it tricky to delete a message, since a broker doesn’t know whether all subscribers have consumed the message. Kafka solves this problem by using a simple time-based SLA for the retention policy. A message is automatically deleted if it has been retained in the broker longer than a certain period, typically 7 days. This solution works well in practice. Most consumers, including the offline ones, finish consuming either daily, hourly, or in real-time. The fact that the performance of Kafka doesn’t degrade with a larger data size makes this long retention feasible.
무상태 브로커: 다른 메시지 프로그램과 다르게 카프카는 브로커가 소비자의 소비 상태를 관리하지 않는다. 대신에 소비자가 자신의 소비 상태를 관리한다. 이러한 디자인은 많은 복잡성과 브로커의 부하를 줄여준다. 하지만 메시지를 삭제하는 게 어려운데 이건 브로커가 모든 소비자가 메시지를 소비했는지에 대해 모르기 때문이다. 따라서 메시지는 저장 후 7일이 지나면 자동으로 삭제된다. 이러한 해결책은 실제 잘 작동한다. 많은 소비자가 오프라인 소비자를 포함하더라도 하루, 한 시간, 실시간 단위로 메시지를 소비 완료하기 때문이다. 카프카의 성능이 데이터 크기에 따라 저하되지 않는다는 사실이 이러한 긴 보존을 가능하게 한다.
There is an important side benefit of this design. A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but proves to be an essential feature for many consumers. For example, when there is an error in application logic in the consumer, the application can re-play certain messages after the error is fixed. This is particularly important to ETL data loads into our data warehouse or Hadoop system. As another example, the consumed data may be flushed to a persistent store only periodically (e.g, a full-text indexer). If the consumer crashes, the unflushed data is lost. In this case, the consumer can checkpoint the smallest offset of the unflushed messages and re-consume from that offset when it’s restarted. We note that rewinding a consumer is much easier to support in the pull model than the push model.
이런 디자인의 또 다른 장점은 소비자가 변위를 되돌려서 데이터를 다시 소비할 수 있게 할 수 있게 하는 것이다. 이건 일반적인 큐의 원칙을 위반하지만 많은 소비자에게 필요한 기능이다. 예를 들어서 소비자의 어떤 애플리케이션 로직에 문제가 생겼다고 가정하자. 애플리케이션의 문제를 고치고 특정 메시지를 다시 소비하면 된다. 이건 특히 데이터 웨어하우스나 하둡 시스템으로의 ETL 데이터 부하에 중요하다. 다른 예시로 소비 데이터가 주기적으로 영구 저장소에 저장된다고 가정하자. 만약에 소비자가 문제가 생긴다면 저장되지 않은 데이터는 유실이다. 이러한 상황에서 소비자는 저장하지 않은 메시지 중 최소 변위 값을 가진 메시지를 알 수 있고 해당 데이터를 기점으로 다시 실행한다면 메시지를 다시 소비할 수 있게 된다. 우리는 소비자를 되돌리는 것은 푸쉬 모델보다 풀 모델에서의 지원이 더 쉽다는 사실을 중요하게 여겼다.
3.2 Distributed Coordination (분산 조정)
We now describe how the producers and the consumers behave in a distributed setting. Each producer can publish a message to either a randomly selected partition or a partition semantically determined by a partitioning key and a partitioning function. We will focus on how the consumers interact with the brokers.
우리는 생산자와 소비자가 어떻게 분산된 상태에서 행동하는 지에 대해서 서술할 것이다. 각 생산자는 무작위로 선택된 파티션 또는 파티션 식별자와 파티셔닝 함수에 의해서 결정된 파티션에 메시지를 발행한다. 앞으로 소비자가 브로커와 어떻게 상호작용하는지에 대해 서술한다.
Kafka has the concept of consumer groups. Each consumer group consists of one or more consumers that jointly consume a set of subscribed topics, i.e., each message is delivered to only one of the consumers within the group. Different consumer groups each independently consume the full set of subscribed messages and no coordination is needed across consumer groups. The consumers within the same group can be in different processes or on different machines. Our goal is to divide the messages stored in the brokers evenly among the consumers, without introducing too much coordination overhead.
카프카는 소비자 그룹이라는 개념이 있다. 각 소비자 그룹은 하나 또는 여러 개의 소비자로 이루어져 있고 이들이 연합하여 하나의 구독 토픽을 소비한다. 예를 들면 하나의 그룹 내에 있는 오직 하나의 소비자가 각 메시지를 소비한다. 다른 소비자 그룹은 독립적으로 구독한 전체 메시지 묶음을 소비하고 다른 소비자 그룹과 어떤 연관도 없다. 하나의 그룹에 있는 소비자는 다른 프로세스, 다른 기기로 이루어질 수 있다. 우리의 목표는 조정에 따른 부하는 없으면서 브로커에 저장되어 있는 메시지가 소비자 그룹간 나누어지는 것이다.
Our first decision is to make a partition within a topic the smallest unit of parallelism. This means that at any given time, all messages from one partition are consumed only by a single consumer within each consumer group. Had we allowed multiple consumers to simultaneously consume a single partition, they would have to coordinate who consumes what messages, which necessitates locking and state maintenance overhead. In contrast, in our design consuming processes only need co-ordinate when the consumers rebalance the load, an infrequent event. In order for the load to be truly balanced, we require many more partitions in a topic than the consumers in each group. We can easily achieve this by over partitioning a topic.
우리의 첫 번째 선택은 토픽에 있는 파티션을 병렬의 가장 작은 단위로 만드는 것이였다. 이 말은 어떤 경우에도 하나의 파티션에 있는 모든 메시지는 각 소비자 그룹에 있는 단 하나의 소비자에 의해서만 소비가 된다는 뜻이다. 여러 소비자가 동시에 하나의 파티션을 소비하는 것을 허용했었더라면, 소비자들은 누가 어떤 메시지를 소비했는지에 대해서 협업해야 한다. 이런 과정은 잠금을 필요로하고 상태 유지의 부하를 만든다. 반대로, 우리의 소비 과정은 소비자들이 리벨런싱 할 때만 협업이 필요하다. 리벨런싱은 자주 일어나는 일이 아니다. 부하를 분산시키려면 각 소비자 그룹에 속해 있는 소비자보다 많은 파티션이 필요하다. 이때 토픽을 더 파티셔닝하는 것으로 간단하게 해결할 수 있다.
The second decision that we made is to not have a central “master” node, but instead let consumers coordinate among themselves in a decentralized fashion. Adding a master can complicate the system since we have to further worry about master failures. To facilitate the coordination, we employ a highly available consensus service Zookeeper [10]. Zookeeper has a very simple, file system like API. One can create a path, set the value of a path, read the value of a path, delete a path, and list the children of a path. It does a few more interesting things: (a) one can register a watcher on a path and get notified when the children of a path or the value of a path has changed; (b) a path can be created as ephemeral (as oppose to persistent), which means that if the creating client is gone, the path is automatically removed by the Zookeeper server; (c) zookeeper replicates its data to multiple servers, which makes the data highly reliable and available.
두 번째 결정은 하나의 중앙 집중화된 마스터 노드가 없다는 것이다 대신에 소비자들이 중앙 집중화되지 않은 방식으로 협업하게 했다. 마스터를 추가한다는 것은 우리가 마스터가 실패 상태에 있는 것을 지나치게 고려하게 만들기 때문에 시스템을 복잡하게 만들 수 밖에 없게 된다. 협업 기능을 구현하기 위해서 고성능 합의 서비스인 주키퍼를 사용했다. 주키퍼는 API처럼 매우 간단한 파일 시스템을 가지고 있다. 경로를 만들 수 있고, 경로의 값을 설정할 수 있고, 경로의 값을 읽을 수 있고, 경로를 삭제할 수 있고, 경로의 자식들을 나열할 수 있다.
몇 가지 더 흥미로운 것들은 다음과 같다.
(a) 하나의 경로에 감시자를 등록할 수 있고 경로의 자식이나 경로의 값이 바뀔 경우에 감지할 수 있게 할 수 있다.
(b) 하나의 경로를 임시로 등록할 수 있다. 이말은 경로를 생성한 클라이언트가 떠나게 되면 경로는 주키퍼 서버에 의해서 자동으로 삭제된다.
(c) 주키퍼는 자신의 데이터를 여러 서버에 복제하여 자신의 데이터를 신뢰성과 고가용성이 있게 한다.
Kafka uses Zookeeper for the following tasks: (1) detecting the addition and the removal of brokers and consumers, (2) triggering a rebalance process in each consumer when the above events happen, and (3) maintaining the consumption relationship and keeping track of the consumed offset of each partition. Specifically, when each broker or consumer starts up, it stores its information in a broker or consumer registry in Zookeeper. The broker registry contains the broker’s host name and port, and the set of topics and partitions stored on it. The consumer registry includes the consumer group to which a consumer belongs and the set of topics that it subscribes to. Each consumer group is associated with an ownership registry and an offset registry in Zookeeper. The ownership registry has one path for every subscribed partition and the path value is the id of the consumer currently consuming from this partition (we use the terminology that the consumer owns this partition). The offset registry stores for each subscribed partition, the offset of the last consumed message in the partition.
카프카는 주키퍼를 다음 과정에 따라 사용한다.
(1) 브로커, 소비자의 추가나 삭제를 감지한다.
(2) 위 사건이 일어나게 되는 경우 각 소비자에 대해 리벨런싱 과정이 발동된다.
(3) 소비 관계를 유지하고 각 파티션의 소비된 변위에 대해서 추적한다.
구체적으로, 각각의 브로커나 소비자가 시작할 때, 각자의 구성 정보를 주키퍼에 저장한다. 브로커 구성 정보는 브로커의 호스트 이름과 포트, 저장하고 있는 토픽 집합과 파티션등을 포함한다. 소비자 구성 정보는 소비자 그룹에 어떤 소비자들이 속해 있는지와 어떤 토픽을 구독하고 있는지를 포함한다. 각 소비자 그룹은 주키퍼의 소유권 구성 정보 및 변위 구성 정보와 연관이 있다. 소유권 구성 정보는 모든 구독 파티션에 대한 하나의 경로를 가지며 경로 값은 현재 이 파티션을 소비하고 있는 소비자의 식별자다. (우리는 이것을 소비자가 파티션을 점유했다는 용어를 사용한다.) 변위 구성 정보는 구독한 각 파티션에 대해서 가장 최근 소비한 메시지 변위 값을 저장한다.
The paths created in Zookeeper are ephemeral for the broker registry, the consumer registry and the ownership registry, and persistent for the offset registry. If a broker fails, all partitions on it are automatically removed from the broker registry. The failure of a consumer causes it to lose its entry in the consumer registry and all partitions that it owns in the ownership registry. Each consumer registers a Zookeeper watcher on both the broker registry and the consumer registry, and will be notified whenever a change in the broker set or the consumer group occurs.
During the initial startup of a consumer or when the consumer is notified about a broker/consumer change through the watcher, the consumer initiates a rebalance process to determine the new subset of partitions that it should consume from. The process is described in Algorithm 1. By reading the broker and the consumer registry from Zookeeper, the consumer first computes the set (PT) of partitions available for each subscribed topic T and the set (CT) of consumers subscribing to T. It then range-partitions PT into |CT| chunks and deterministically picks one chunk to own. For each partition the consumer picks, it writes itself as the new owner of the partition in the ownership registry. Finally, the consumer begins a thread to pull data from each owned partition, starting from the offset stored in the offset registry. As messages get pulled from a partition, the consumer periodically updates the latest consumed offset in the offset registry.
주키퍼에 생성된 경로들은 임시 브로커 구성 정보, 임시 소비자 구성 정보, 임시 소유권 구성 정보와 영구적인 변위 구성 정보를 가진다. 만약에 브로커가 고장날 경우, 고장 난 브로커에 있는 모든 파티션은 자동으로 브로커 구성 정보에서 삭제된다. 소비자가 고장날 경우 소비자 구성 정보의 자신의 정보와 소비자가 소유권 구성 정보에 가지고 있던 모든 파티션도 잃게 된다. 각각의 소비자는 주키퍼 감시자를 브로커 구성 정보와 소비자 구성 정보에 등록하고 브로커와 소비자 그룹의 변화가 있다면 연락을 받는다.
소비자가 초기 시작하는 경우나 소비자가 감시자로부터 감시자로부터 변화를 감지한 경우 소비자는 리벨런스 절차를 시작하는데 리벨런스 절차란 소비자가 소비할 새로운 파티션을 결정하는 것이다. 이러한 절차는 알고리즘 1에 서술했다. 브로커와 소비자 구성 정보를 주키퍼로부터 읽음으로서 소비자는 각 구독할 토픽에서 사용가능한 파티션과 토픽을 구독한 소비자를 계산한다. 소비자가 각 파티션을 고르고 파티션의 소유자라는 것을 소유자 구성 정보에 기록한다. 마지막으로 각 소비자는 소유하고 있는 파티션에 대해서 데이터를 풀링하는 쓰레드를 실행한다. 이때 변위 구성 정보에 저장된 변위로부터 시작한다. 소비자는 메시지를 파티션으로부터 풀링하고 가장 최근에 소비했던 변위에 대해 변위 구정 정보에 영구적으로 저장한다.
Algorithm 1: rebalance process for consumer Ci in group G
For each topic T that Ci subscribes to {
remove partitions owned by Ci from the ownership registry
read the broker and the consumer registries from Zookeeper
compute PT = partitions available in all brokers under topic T
compute CT = all consumers in G that subscribe to topic T
sort PT and CT
let j be the index position of Ci in CT and let N = |PT|/|CT|
assign partitions from j*N to (j+1)*N - 1 in PT to consumer Ci
for each assigned partition p {
set the owner of p to Ci in the ownership registry
let Op = the offset of partition p stored in the offset registry
invoke a thread to pull data in partition p from offset Op
}
}
알고리즘 1: 그룹 G에 있는 소비자 Ci에 대한 리벨런스 과정
Ci가 구독하고 있는 각 토픽에 대해서 {
Ci가 소유하고 있는 파티션을 소유권 등록 정보에서 제거한다.
브로커와 소비자 등록 정보를 주키퍼로부터 읽는다.
PT를 계산한다 = 동작하는 브로커내에 있는 토픽 T의 사용가능한 파티션
CT를 계산한다 = G에 있는 토픽 T를 구독한 모든 소비자
PT와 CT를 정렬한다.
j를 CT 내부의 Ci의 인덱스라고 가정하자. 또한 N은 |PT| / |CT|라고 가정한다.
PT에 있는 j * N ~ (j + 1) * N - 1 범위의 파티션을 Ci에 할당한다.
각 할당된 파티션에 대해서 {
소유권 등록 정보에서 p의 소유권을 Ci로 세팅한다.
Op를 파티션 p의 변위 등록 정보라고 하자.
파티션 p를 변위 Op에서부터 풀링하는 쓰레드를 실행한다.
}
}
When there are multiple consumers within a group, each of them will be notified of a broker or a consumer change. However, the notification may come at slightly different times at the consumers. So, it is possible that one consumer tries to take ownership of a partition still owned by another consumer. When this happens, the first consumer simply releases all the partitions that it currently owns, waits a bit and retries the rebalance process. In practice, the rebalance process often stabilizes after only a few retries.
여러 소비자가 그룹에 있을 때, 각각은 브로커나 컨슈머의 변화에 대해 알림을 받는다. 하지만 소비자들에게 알림은 미세하게 다른 시간에 올 수 있다. 이건 하나의 소비자가 다른 소비자의 파티션을 가져오게 만들 수 잇다. 이런 경우에 첫 번째 소비자가 단순하게 자신이 소유하고 있는 모든 파티션을 포기하고 잠시 기다리면 리벨런스를 다시 시도하게 된다. 실제로 리벨런스 과정은 이런 몇 번의 과정을 거치게 되면 안정화되게 된다.
When a new consumer group is created, no offsets are available in the offset registry. In this case, the consumers will begin with either the smallest or the largest offset (depending on a configuration) available on each subscribed partition, using an API that we provide on the brokers.
만약에 새로운 소비자 그룹이 만들어지게 된다면 변위 등록 정보에 아무런 변위 정보도 없게 된다. 이런 경우 설정에 따라 브로커의 API를 사용하여 구독한 파티션의 가장 크거나 가장 작은 변위를 소비하기 시작한다.
3.3 Delivery Guarantees (전달 보장)
In general, Kafka only guarantees at-least-once delivery. Exactly- once delivery typically requires two-phase commits and is not necessary for our applications. Most of the time, a message is delivered exactly once to each consumer group. However, in the case when a consumer process crashes without a clean shutdown, the consumer process that takes over those partitions owned by the failed consumer may get some duplicate messages that are after the last offset successfully committed to zookeeper. If an application cares about duplicates, it must add its own de-duplication logic, either using the offsets that we return to the consumer or some unique key within the message. This is usually a more cost-effective approach than using two-phase commits.
일반적으로 카프카는 적어도 한 번 전송만을 보장한다. 정확히 한 번 전송은 두 단계의 커밋이 필요한데 이건 우리의 애플리케이션에서 필요한 사안은 아니다. 대부분의 상황에서 메시지는 각 소비자 그룹에 정확히 한 번 전송된다. 하지만 예기치 못하게 소비자의 프로세스가 충돌이 났고 후처리가 안 된 경우, 실패한 소비자가 가지고 있던 파티션을 넘겨받은 소비자는 주키퍼에 성공적으로 커밋한 변위 이후의 메시지를 소비하게 되는데 이때 중복된 소비를 할 수 있게 된다. 중복을 피하려면 소비자에게 돌려주는 변위를 사용하거나 메시지에 식별자를 사용하는 방식을 추가해야 한다. 이게 두 단계 커밋보다 효율적인 접근 방법이다.
Kafka guarantees that messages from a single partition are delivered to a consumer in order. However, there is no guarantee on the ordering of messages coming from different partitions. To avoid log corruption, Kafka stores a CRC for each message in the log. If there is any I/O error on the broker, Kafka runs a recovery process to remove those messages with inconsistent CRCs. Having the CRC at the message level also allows us to check network errors after a message is produced or consumed.
카프카는 하나의 파티션의 메시지는 소비자에게 순차적으로 전송됨을 보장한다. 하지만 서로 다른 파티션에서 온 메시지들 간 순서는 보장하지 않는다. 로그 오염을 막기 위해서 카프카는 로그에 있는 각 메시지에 CRC를 저장한다. 만약에 브로커에 I/O 에러가 있으면 카프카는 복구 절차를 실행하며 CRC가 일관적이지 않은 메시지들을 삭제한다. 메시지 단계에서 CRC를 가지는 것은 메시지가 생산되거나 소비된 후에도 네트워크 오류를 확인할 수 있게 한다.
If a broker goes down, any message stored on it not yet consumed becomes unavailable. If the storage system on a broker is permanently damaged, any unconsumed message is lost forever. In the future, we plan to add built-in replication in Kafka to redundantly store each message on multiple brokers.
만약에 브로커가 종료된다면 브로커에 저장되었지만 소비되지 않은 메시지는 사용 불가능한 상태가 된다. 만약에 브로커의 저장 시스템이 영구적인 손상을 받게 되면 소비되지 않은 메시지는 영원히 유실되게 된다. 미래에는 각 메시지의 여유 복제분을 다중 브로커에 저장하는 것을 계획하고 있다.
4. Kafka Usage at LinkedIn (링크드인에서 카프카의 쓰임)
In this section, we describe how we use Kafka at LinkedIn. Figure 3 shows a simplified version of our deployment. We have one Kafka cluster co-located with each datacenter where our user- facing services run. The frontend services generate various kinds of log data and publish it to the local Kafka brokers in batches. We rely on a hardware load-balancer to distribute the publish requests to the set of Kafka brokers evenly. The online consumers of Kafka run in services within the same datacenter.
이번 장에서는 링크드인에서 카프카를 어떻게 사용했는지를 서술한다. 그림 3은 우리의 배포 상황이다. 우리는 서비스가 실행되고 있는 각 데이터 센터에 위치한 하나의 카프카 클러스터가 있었다. 프론트앤드 서비스는 다양한 종류의 로그 데이터를 생성했고 그 지역 카프카 브로커에 배치 방식으로 발행했다. 우리는 발행 요청에 대해서 하드웨어 로드벨런서에 의존했다. 하드웨어 로드벨런서는 브로커 집합에 골고루 요청을 분배했다. 카프카의 온라인 소비자도 동일한 데이터센터에서 운영했다.

We also deploy a cluster of Kafka in a separate datacenter for offline analysis, located geographically close to our Hadoop cluster and other data warehouse infrastructure. This instance of Kafka runs a set of embedded consumers to pull data from the Kafka instances in the live datacenters. We then run data load jobs to pull data from this replica cluster of Kafka into Hadoop and our data warehouse, where we run various reporting jobs and analytical process on the data. We also use this Kafka cluster for prototyping and have the ability to run simple scripts against the raw event streams for ad hoc querying. Without too much tuning, the end-to-end latency for the complete pipeline is about 10 seconds on average, good enough for our requirements.
우리는 오프라인 분석을 위해서 분리된 데이터센터에 카프카 클러스터를 배포했다. 지역적으로 하둡 클러스터, 그리고 다른 데이터 웨어하우스와 가까운 곳에 위치시켰다. 이 카프카는 온라인 데이터센터에서 데이터들을 가져오는 내장 소비자들을 동작시켰다. 우리는 카프카 복제본으로부터 데이터를 복제하여 하둡이나 데이터 웨어하우스로 끌어왔다. 그리고 데이터 웨어하우스에서 이 데이터에 대한 보고 작업이나 분석 작업을 실행시켰다. 우리는 이 카프카를 초기 모델에 사용할 수 있었고 원시적인 이벤트 스트림에 대한 ad-hoc 쿼리를 위해서 간단한 스크립트를 동작시킬 능력을 가지고 있었다. 많은 성능 개선 없이 모든 완성된 파이프라인이 평균 10초 내외의 지연 시간을 가지는데 이건 요구사항에 있어서 괜찮은 성능이었다.
Currently, Kafka accumulates hundreds of gigabytes of data and close to a billion messages per day, which we expect will grow significantly as we finish converting legacy systems to take advantage of Kafka. More types of messages will be added in the future. The rebalance process is able to automatically redirect the consumption when the operation staffs start or stop brokers for software or hardware maintenance.
현재, 카프카는 몇 백 기가바이트의 데이터와 십 억 개에 가까운 메시지를 매일 처리한다. 우리는 이것이 기존 시스템을 카프카로 변경시키는 것이 마무리 된다면 더 증가할 것이라고 예측한다. 더 많은 유형의 메시지가 미래에 추가될 것이다. 리벨런스 과정은 관리자가 소프트웨어나 하드웨어 유지보수를 위해 브로커를 실행시키거나 종료시켰을 때 소비를 자동으로 조정한다.
Our tracking also includes an auditing system to verify that there is no data loss along the whole pipeline. To facilitate that, each message carries the timestamp and the server name when they are generated. We instrument each producer such that it periodically generates a monitoring event, which records the number of messages published by that producer for each topic within a fixed time window. The producer publishes the monitoring events to Kafka in a separate topic. The consumers can then count the number of messages that they have received from a given topic and validate those counts with the monitoring events to validate the correctness of data.
우리의 추적은 감사 시스템이 전체 파이프라인에서 데이터의 유실이 없다는 것을 확인한 것을 포함한다. 이것을 가능하게 하기 위해서 각각의 메시지는 시간과 메시지를 생성한 서버 이름을 포함한다. 우리는 각 생성자가 주기적으로 모니터링 이벤트를 생성하게 했다. 이 모니터링 이벤트는 각 토픽에 대해 고정된 시간 구간 내에 생성자가 발행한 메시지의 수를 기록한다. 생성자는 별도의 토픽에 대해서 모니터링 이벤트를 발행한다. 소비자는 주어진 토픽에 대해서 메시지 수를 세어 모니터링 이벤트와 비교하는 것으로 데이터의 정합성에 대해 확인한다.
Loading into the Hadoop cluster is accomplished by implementing a special Kafka input format that allows MapReduce jobs to directly read data from Kafka. A MapReduce job loads the raw data and then groups and compresses it for efficient processing in the future. The stateless broker and client-side storage of message offsets again come into play here, allowing the MapReduce task management (which allows tasks to fail and be restarted) to handle the data load in a natural way without duplicating or losing messages in the event of a task restart. Both data and offsets are stored in HDFS only on the successful completion of the job.
하둡 클러스터에 불러들이는 것은 카프카로부터 맵리듀스 작업이 데이터를 바로 읽게해 주는 카프카 입력 양식을 구현하는 것으로 완성했다. 맵리듀스 작업은 원래의 데이터를 불러오고 모아서 압축시켜 미래에 효율적으로 다룰 수 있게 했다. 상태 없는 브로커와 클라이언트 측 메시지 변위 저장은 맵리듀스 작업의 관리를 가능하게 한다. 맵리듀스는 재시작이 있더라도 데이터의 중복이나 유실 없이 자유자재로 다룰 수 있다. HDFS에 저장된 데이터와 변위는 오직 성공한 작업만이 존재하게 된다.
We chose to use Avro [2] as our serialization protocol since it is efficient and supports schema evolution. For each message, we store the id of its Avro schema and the serialized bytes in the payload. This schema allows us to enforce a contract to ensure compatibility between data producers and consumers. We use a lightweight schema registry service to map the schema id to the actual schema. When a consumer gets a message, it looks up in the schema registry to retrieve the schema, which is used to decode the bytes into an object (this lookup need only be done once per schema, since the values are immutable).
우리는 에이브로 직렬화 프로트콜을 사용했다. 효율적이고 스키마 발전을 지원하기 때문이다. 각각의 메시지에 대해 우리는 에이브로 스키마의 식별자와 본문에는 직렬화된 바이트를 저장한다. 이 스키마는 우리에게 데이터 생산자와 소비자 간에 호환성 있는 계약을 지키게 만든다. 우리는 스키마의 식별자를 실제 스키마에 매핑할 수 있게하는 경량의 스키마 등록 서비스를 사용했다. 소비자가 메시지를 받으면 스키마 등록 정보를 확인하고 스키마를 획득한다. 이 스키마는 메시지를 오브젝트로 풀어내는데 사용한다. (하나의 스키마에 오직 한 번의 확인만 필요하다. 값이 불변하기 때문이다.)
5. Experimental Results (실험 결과)
We conducted an experimental study, comparing the performance of Kafka with Apache ActiveMQ v5.4 [1], a popular open-source implementation of JMS, and RabbitMQ v2.4 [16], a message system known for its performance. We used ActiveMQ’s default persistent message store KahaDB. Although not presented here, we also tested an alternative AMQ message store and found its performance very similar to that of KahaDB. Whenever possible, we tried to use comparable settings in all systems.
실험 연구를 위해서 카프카와 아파치 엑티브엠큐 5.4, 인기 있는 오픈소스인 자바 메시지 서비스 레빗엠큐의 성능을 비교한다. 엑티브엠큐의 기본 메시지 저장 시스템으로는 카하 데이터베이스를 사용한다. 여기서 나타내지는 않았지만 AMQ 메시지 저장소도 실험했다. 이건 카하 데이터베이스와 비슷한 성능을 가졌다. 모든 시스템에서 가능하면 비슷한 설정을 사용하려고 했다.
We ran our experiments on 2 Linux machines, each with 8 2GHz cores, 16GB of memory, 6 disks with RAID 10. The two machines are connected with a 1Gb network link. One of the machines was used as the broker and the other machine was used as the producer or the consumer.
우리는 2개의 리눅스 기기에서 실험했다. 각각은 8.2GHz 코어, 16GB 메모리, RAID 10을 사용하는 6개의 디스크를 가진다. 두 개의 기기는 1Gb의 네트워크 선과 연결된다. 하나의 기기는 브로커로 사용했고 다른 하나의 기기는 생산자와 소비자로 사용했다.

Producer Test: We configured the broker in all systems to asynchronously flush messages to its persistence store. For each system, we ran a single producer to publish a total of 10 million messages, each of 200 bytes. We configured the Kafka producer to send messages in batches of size 1 and 50. ActiveMQ and RabbitMQ don’t seem to have an easy way to batch messages and we assume that it used a batch size of 1. The results are shown in Figure 4. The x-axis represents the amount of data sent to the broker over time in MB, and the y-axis corresponds to the producer throughput in messages per second. On average, Kafka can publish messages at the rate of 50,000 and 400,000 messages per second for batch size of 1 and 50, respectively. These numbers are orders of magnitude higher than that of ActiveMQ, and at least 2 times higher than RabbitMQ.
생산자 실험: 우리는 모든 시스템에 있는 브로커가 비동기로 브로커의 영속 저장소에 메시지를 저장하도록 설정했다. 우리는 하나의 생산자가 총 10,000,000 개의 메시지를 생산하도록 했다. 각 메시지는 200 바이트를 가진다. 우리는 카프카 생산자가 메시지를 보내는데 있어서 각각 1의 배치 크기와 50의 배치 크기를 가지도록 했다. 엑티브앰큐와 레빗엠큐는 메시지를 배치로 보내는 쉬운 방법이 없기 때문에 배치 크기가 1이라고 가정한다. 결과는 그림 4와 같다. X축은 시간에 따른 브로커가 보낸 데이터 크기를 나타낸다. 그리고 Y축은 1초 당 메시지 처리량을 나타낸다. 카프카는 배치 사이즈가 1일 때 50,000 메시지, 배치 사이즈가 50일 때 400,000 메시지를 처리할 수 있다. 이 결과는 엑티브엠큐보다는 몇 배나 높은 수치이며 레빗엠큐보다는 적어도 2 배 이상 높은 수치이다.
There are a few reasons why Kafka performed much better. First, the Kafka producer currently doesn’t wait for acknowledgements from the broker and sends messages as faster as the broker can handle. This significantly increased the throughput of the publisher. With a batch size of 50, a single Kafka producer almost saturated the 1Gb link between the producer and the broker. This is a valid optimization for the log aggregation case, as data must be sent asynchronously to avoid introducing any latency into the live serving of traffic. We note that without acknowledging the producer, there is no guarantee that every published message is actually received by the broker. For many types of log data, it is desirable to trade durability for throughput, as long as the number of dropped messages is relatively small. However, we do plan to address the durability issue for more critical data in the future.
카프카의 성능이 더 좋은 몇 가지 이유가 있는데,
첫 번째로 카프카는 브로커의 승낙을 기다리지 않는다. 그리고 브로커가 다룰 수 있을 만큼 빠르게 메시지를 보낸다. 이것이 생산자의 처리률을 급격하게 상승시킨다. 배치 크기가 50인 경우 하나의 카프카 생산자가 생산자와 브로커 사이 1Gb 연결을 거의 포화시킨다. 이건 로그 집계의 경우에는 유효한 최적화이며 데이터는 반드시 비동기로 보내어져 트래픽이 만들어 내는 모든 지연을 피한다. 프로듀서의 승낙 절차가 없으면 발행한 모든 메시지가 브로커에 반드시 도달한다는 보장은 없다. 많은 유형의 로그 데이터는 처리를 위해서 완벽한 저장을 포기하는 경우가 더 효율적이다. 하지만 데이터가 중요하다면 저장을 보장해야 하는데 이 문제에 대해서는 해결할 계획이 있다.
Second, Kafka has a more efficient storage format. On average, each message had an overhead of 9 bytes in Kafka, versus 144 bytes in ActiveMQ. This means that ActiveMQ was using 70% more space than Kafka to store the same set of 10 million messages. One overhead in ActiveMQ came from the heavy message header, required by JMS. Another overhead was the cost of maintaining various indexing structures. We observed that one of the busiest threads in ActiveMQ spent most of its time accessing a B-Tree to maintain message metadata and state. Finally, batching greatly improved the throughput by amortizing the RPC overhead. In Kafka, a batch size of 50 messages improved the throughput by almost an order of magnitude.
두 번째로, 카프카는 효율적인 저장 형식을 가지고 있다. 평균적으로 각각의 메시지는 9바이트 정도의 비용을 가진다. 반면에 엑티브엠큐는 144 바이트의 비용을 가진다. 이 말은 엑티브엠큐가 10,000,000 개의 메시지를 카프카와 동일하게 저장한다고 했을 때 70%의 공간을 더 사용하는 것과 같다. 자바 메시지 서비스에 필요한 무거운 메시지 헤더가 엑티브앰큐의 첫 번째 비용이고 다른 비용은 다양한 인덱스 구조를 유지하기 위한 비용이다. 우리는 엑티브엠큐에서 가장 바쁜 쓰레드가 대부분의 시간을 메시지의 메타데이터와 상태를 유지하기 위해 비트리를 접근하는데 쓴다는 사실을 관찰했다.
마지막으로 배치는 RPC 비용을 줄이기 때문에 처리량을 향상시켰다. 카프카에서 배치 크기를 50으로 하는 것은 처리량을 거의 10 배 상승시켰다.

Consumer Test: In the second experiment, we tested the performance of the consumer. Again, for all systems, we used a single consumer to retrieve a total of 10 millions messages. We configured all systems so that each pull request should prefetch approximately the same amount data---up to 1000 messages or about 200KB. For both ActiveMQ and RabbitMQ, we set the consumer acknowledge mode to be automatic. Since all messages fit in memory, all systems were serving data from the page cache of the underlying file system or some in-memory buffers. The results are presented in Figure 5.
소비자 실험: 두 번째 실험에서는 소비자의 성능에 대해서 실험했다. 우리는 모든 시스템에서 하나의 소비자가 10,000,000 개의 메시지를 받도록 했다. 각 풀 요청은 거의 비슷한 크기의 데이터 양을 가져오도록 했다. 데이터 양은 1000 개의 메시지 또는 200KB의 데이터를 의미한다. 엑티브엠큐 그리고 래빗엠큐의 경우에는 소비자 승낙 모드를 자동으로 설정했다. 모든 메모리가 메시지에 비해 작기 때문에 모든 시스템은 파일 시스템의 페이지 캐시 또는 메모리 버퍼를 새용해서 데이터를 제공햇다. 그림 5는 결과를 나타낸다.
On average, Kafka consumed 22,000 messages per second, more than 4 times that of ActiveMQ and RabbitMQ. We can think of several reasons. First, since Kafka has a more efficient storage format, fewer bytes were transferred from the broker to the consumer in Kafka. Second, the broker in both ActiveMQ and RabbitMQ had to maintain the delivery state of every message. We observed that one of the ActiveMQ threads was busy writing KahaDB pages to disks during this test. In contrast, there were no disk write activities on the Kafka broker. Finally, by using the sendfile API, Kafka reduces the transmission overhead.
카프카는 22,000 개의 메시지를 초당 소비한다. 이건 엑티브엠큐와 레빗엠큐보다 4배나 더 많은 수치이다. 첫 번째로 카프카가 효율적인 저장 형식을 가지고 있기 때문에 약간의 바이트만 브로커에서 소비자로 이동한다. 두 번째로 엑티브엠큐와 레빗엠큐는 모든 메시지의 상태를 유지해야 한다. 실험을 하는 동안 엑티브엠큐의 쓰레드가 카하 데이터베이스의 페이지를 디스크에 쓰기 위해 바쁘다는 사실을 관찰했다. 이와 반대로 카프카 브로커에는 어떠한 디스크 쓰기 작업이 존재하지 않았다. 마지막으로 sendfile API를 사용함으로써 카프카는 데이터 이동 비용을 감소시켰다.
We close the section by noting that the purpose of the experiment is not to show that other messaging systems are inferior to Kafka. After all, both ActiveMQ and RabbitMQ have more features than Kafka. The main point is to illustrate the potential performance gain that can be achieved by a specialized system.
이 실험의 목적이 다른 메시지 시스템이 카프카보다 열등하다라는 것을 보여주기 위함을 나타내는 것이 아님을 밝힌다. 엑티브엠큐나 레빗엠큐 같은 경우 카프카보다 기능이 더 많다. 특수화된 시스템이 좋은 성능을 보일 수 있다는 것을 나타내는 것이 중점이다.
6. Conclusion and Future Works (결론 및 향후 연구)
We present a novel system called Kafka for processing huge volume of log data streams. Like a messaging system, Kafka employs a pull-based consumption model that allows an application to consume data at its own rate and rewind the consumption whenever needed. By focusing on log processing applications, Kafka achieves much higher throughput than conventional messaging systems. It also provides integrated distributed support and can scale out. We have been using Kafka successfully at LinkedIn for both offline and online applications.
우리는 대용량 로그 데이터 스트림 처리를 위해 새로운 시스템인 카프카를 선보였다. 메시지 시스템처럼, 카프카의 풀 기반 소비 모델은 애플리케이션이 각자의 속도로 데이터를 소비하고 언제든지 다시 재소비를 할 수 있게 한다. 로그 처리 에플리케이션을 중점으로 보자면 카프카는 기존의 메시지 시스템보다 훨씬 높은 성능을 가지고 있다. 또한 통합된 분산 지원과 수평확장이 가능하다. 카프카는 링크드인의 오프라인, 온라인 에플리케이션에 성공적으로 사용되고 있다.
There are a number of directions that we’d like to pursue in the future. First, we plan to add built-in replication of messages across multiple brokers to allow durability and data availability guarantees even in the case of unrecoverable machine failures. We’d like to support both asynchronous and synchronous replication models to allow some tradeoff between producer latency and the strength of the guarantees provided. An application can choose the right level of redundancy based on its requirement on durability, availability and throughput. Second, we want to add some stream processing capability in Kafka. After retrieving messages from Kafka, real time applications often perform similar operations such as window-based counting and joining each message with records in a secondary store or with messages in another stream. At the lowest level this is supported by semantically partitioning messages on the join key during publishing so that all messages sent with a particular key go to the same partition and hence arrive at a single consumer process. This provides the foundation for processing distributed streams across a cluster of consumer machines. On top of this we feel a library of helpful stream utilities, such as different windowing functions or join techniques will be beneficial to this kind of applications.
우리가 미래에 추구하는 몇 가지 방향이 있다. 첫 번째로 메시지의 내장 복제이다. 여러 개의 브로커에 메시지가 분산되어 기기가 고장나더라도 데이터 영속성과 가용성 보장을 가능하게 한다. 우리는 비동기, 동기 복제 모델 둘 다 지원해서 생산자의 지연과 보장의 정도에서의 상충 관계를 지원할 것이다. 그래서 영속성, 가용성, 처리량의 요구 사항에 따라 선택할 수 있게 할 것이다. 두 번재로 카프카에서의 스트림 처리 기능을 추가할 것이다. 카프카로부터 메시지를 받고 실시간 서비스는 시간 기반 개수 계산이나 보조 장치, 다른 스트림 메시지 간의 조인 연산등을 수행한다. 이 방법은 가장 낮은 단계에서 지원된다. 조인 식별자로 분할된 메시지가 하나의 특정 파티션에 가게 함으로써 하나의 소비자의 처리에 도착하게 한다. 이 방법은 소비자 클러스터가 분산 스트림 처리를 할 수 있게 한다. 이를 기반으로한 다양한 스트림 기능, 구간 함수, 조인 기술들이 애플리케이션을 더 유익하게 만들 것이다.
6. REFERENCES (참고 자료)
[1] http://activemq.apache.org/
[3] Cloudera’s Flume, https://github.com/cloudera/flume
[4] http://developer.yahoo.com/blogs/hadoop/posts/2010/06/enabling_hadoop_batch_processi_1/
[5] Efficient data transfer through zero copy: https://www.ibm.com/developerworks/linux/library/j-zerocopy/
[6] Facebook’s Scribe, http://www.facebook.com/note.php?note_id=32008268919
[7] IBM Websphere MQ: http://www- 01.ibm.com/software/integration/wmq/
[10] http://hadoop.apache.org/zookeeper/
[11] http://www.slideshare.net/cloudera/hw09-hadoop-based-data-mining-platform-for-the-telecom-industry
[12] http://www.slideshare.net/prasadc/hive-percona-2009
[13] https://issues.apache.org/jira/browse/ZOOKEEPER-775
[14] JAVA Message Service: http://download.oracle.com/javaee/1.3/jms/tutorial/1_3_1-%20fcs/doc/jms_tutorialTOC.html.
[15] Oracle Enterprise Messaging Service: http://www.oracle.com/technetwork/middleware/ias/index-093455.html
[17] TIBCO Enterprise Message Service: http://www.tibco.com/products/soa/messaging/
[18] Kafka, http://sna-projects.com/kafka/
느낀 점
논문을 정독하고 한 문장 한 문장 번역하는 일은 쉬운 일이 아니었다. 맥락은 이해하면서도 한글로 표현하는 것이 서툴러서 쓰고 지우고를 반복했다. 발견하지 못한, 여전히 매끄럽지 않고 틀린 부분도 많을 것이다. 몇 장 안 되는 논문도 이 정도인데 번역서를 만드는 작업은 더더욱 힘든 작업이겠구나 생각이 들었다. 새삼 번역서를 만드는 작가님들의 고초를 잠시나마 느껴볼 수 있었다.
기술의 발전은 빠르고 모든 것은 쉽게 바뀌지만 변화 속에서도 오래가는 원칙과 설계는 존재한다. 어떤 시스템이 오랫동안 사랑받는다면 견고하고 치밀하게 만들어진 시스템일 가능성이 높다. (2024년 기준) 논문을 읽고 나면 카프카가 10년이 넘은 기술이지만 왜 여전히 실시간 데이터 처리 시스템의 중심에 있는지를 알 수 있다. 논문은 카프카를 만들게 된 계기와 대용량 데이터 처리 시스템 디자인을 소개한다. 기존 데이터 처리 시스템은 무언가 하나씩 부족한 점이 있거나 대용량 처리에는 과한 기능으로 인해 처리 성능에 있어 문제가 있었다. 링크드인 개발자들은 필요 없는 기능은 없에고 필요한 기능은 더하는 방식으로 카프카라는 새로운 시스템을 만들었다. 카프카 기술 기저에 있는 메시지를 풀 한다는 개념이나 하나의 요청에 여러 개의 메시지를 담아보내는 개념은 그리 어려운 개념이 아니다. 간단하고 복잡하지 않은 개념들이 모여 복잡하지 않은 시스템을 만든다. 이에 시스템은 빠른 성능으로 보답한다. 진정으로 멋있는 시스템이다. 초창기 카프카 시스템을 구축하고 기여하는 작업을 한 링크드인 개발자들이 부러워졌다. 언젠가는 나도 이렇게 지구 저편 누군가에게 존경받는 시스템을 만들어야겠다는 생각이 들었다.
'study > paper' 카테고리의 다른 글
MapReduce: Simplified Data Processing on Large Clusters by Jeffrey Dean, Sanjay Ghemawat; (0) | 2024.05.30 |
---|