반응형
kafka는 무엇일까? 왜 쓸까? 어떻게 동작할까? 좀 딥하게 알아보자!
KAFKA
카프카는 분산형 스트리밍 플랫폼으로, 대용량의 실시간 데이터 스트림을 처리하는 데 특화된 오픈소스 솔루션이다.
카프카는 주로 데이터 파이프라인 구축, 이벤트 스트리밍, 실시간 데이터 피드 처리에 사용된다.
주요개념
- 토픽(Topic)
- 카프카에서 데이터가 저장되는 주제를 의미한다.
- 토픽은 데이터의 카테고리를 나타내며, 데이터 스트림은 하나 이상의 토픽에 분배 된다.
- 브로커(Broker)
- 카프카 클러스터를 구성하는 각 서버를 브로커 라고 한다.
- 각 브로커는 데이터를 저장하고 클라이언트 간 통신을 관리한다.
- 프로듀서(Producer)
- 데이터를 생산하고 카프카 토픽에 데이터를 전송하는 클라이언트이다.
- 여러 토픽으로 데이터를 보낼 수 있다.
- 컨슈머(Consumer)
- 토픽에서 데이터를 소비하는 클라이언트이다.
- 컨슈머 그룹을 구성하여 데이터를 병렬로 처리할 수 있다.
- 컨슈머 그룹(Consumer Group)
- 여러 컨슈머 인스턴스를 그룹으로 묶어 데이터를 병렬로 소비할 수 있는 단위 이다.
- 각 그룹은 토픽의 파티션을 나누어 데이터를 병렬로 처리 한다.
내부 동작
메시지 흐름
데이터 작성(프로듀서 -> 브로커)
- 프로듀서가 데이터 전송
- 프로듀서는 ProducerRecord 객체를 생성하여 토픽과 메시지를 지정한다.
- 메시지는 파티션 키에 따라 특정 파티션으로 라우팅 된다.
- 파티션을 선택하는 방식:
- 파티션 키가 있는 경우: 지정된 파티션에 저장.
- 키가 없는 경우: 라운드 로빈 방식으로 파티션을 선택.
- 데이터 압축
- 프로듀서는 데이터를 전송하기 전에 GZIP, Snappy, LZ4와 같은 압축 알고리즘을 사용해 메시지를 압축한다.
- 설정: compression.type
- 브로커로 전송
- 프로듀서는 acks 설정에 따라 브로커로부터 확인(acknowledgement)을 기다린다.
- ack 설정
- acks=0: 확인 요청 없음(최저 신뢰성, 최고 성능).
- acks=1: 리더 브로커만 데이터 확인.
- acks=all: 모든 복제 브로커가 데이터 확인(최고 신뢰성).
데이터 저장 (브로커 내부)
- 파티션과 세그먼트 구조
- 토픽은 여러 파티션으로 분할된다.
- 각 파티션은 세그먼트 파일(일정한 크기의 로그 파일)로 저장된다.
- 세그먼트는 로그 회전을 통해 관리된다.
- 설정
- log.segment.bytes: 세그먼트 파일의 최대 크기.
- log.retention.bytes / log.retention.hours: 데이터 보관 기간.
- 리더와 팔로워
- 각 파티션에는 리더(Leader)와 여러 팔로워(Follower)가 있다.
- 리더는 클라이언트 요청(읽기/쓰기)을 처리하며, 팔로워는 리더의 데이터를 복제 한다.
- 복제
- 카프카 데이터를 복제하여 장애 내성을 보장한다.
- 복제는 리더에서 팔로워로 데이터를 동기화하는 방식으로 동작한다.
- min.insync.replicas: 복제 중 데이터 동기화가 필요한 최소 브로커 수를 정의.
데이터 소비 (브로커 -> 컨슈머)
- 오프셋 관리
- 카프카는 각 컨슈머 그룹별로 오프셋을 관리한다.
- 컨슈머는 메시지를 읽을 때 오프셋을 수동 또는 자동으로 커밋할 수 있다.
- 설정:
- enable.auto.commit: 자동 커밋 여부.
- auto.commit.interval.ms: 자동 커밋 주기.
- 데이터 읽기
- 컨슈머는 브로커의 리더에서 데이터를 가져온다.
- 데이터를 가져오는 방식은 폴링(polling)이다.
- 컨슈머는 데이터를 병렬 처리하기 위해 컨슈머 그룹을 사용한다.
- 리밸런스
- 컨슈머 그룹 내에서 파티션 할당이 변경되면 리밸런싱이 발생한다.
- 리밸런스는 컨슈머 그룹 내에서 파티션의 소유권을 재분배하는 과정이다.
- 리밸러스는 지연을 유발할 수 있으므로 최대한 최소화해야 한다.
구성 요소(Zookeeper)
- Zookeeper
- 카프카는 브로커와 클러스터 메타데이터를 관리하기 위해 Zookeeper를 사용한다.
- Zookeeper의 역할
- 브로커 등록 및 클러스터 메타 데이터 관리.
- 리더 선출.
- 컨슈머 그룹 오프셋 관리.
- Kafka 2.8 이후: Zookeeper 없이 작동 가능한 새로운 KRaft 아키텍쳐 도입
- Replication Protocol
- ISR(In-Sync Replicas) 목록:
- 리더와 동기화된 팔로워들의 목록 이다.
- acks=all 설정 시, ISR 내 모든 복제의 확인이 완료되어야 메시지가 확인된다.
- ISR(In-Sync Replicas) 목록:
장애 처리
- 브로커 장애
- 리더 브로커가 장애가 발생하면, ISR 목록에서 새로운 리더를 선출한다.
- 컨슈머 장애
- 컨슈머가 장애가 발생하면 리밸런싱을 통해 남은 컨슈머가 파티션을 할당 받는다.
- 데이터 손실 방지
- acks=all 및 min.insync.replicas 설정을 통해 신뢰성을 높일 수 있다.
Kafka Streams
Kafka Streams는 카프카에 내장된 스트리밍 데이터 처리 라이브러리로, 카프카의 데이터를 실시간으로 처리하고 변환할 수 있다.
주요 기능
- 상태 저장(Stateful Processing): 데이터의 상태를 유지하면서 처리가 가능하다.
- 윈도우 처리(Windowing): 시간 기반으로 데이터를 그룹화하여 처리.
- 실시간 데이터 변환 및 집계 (map, filter, aggregate 등).
Kafka Streams를 사용하는 이유
Kafka Streams는 데이터 스트림 처리를 위한 라이브러리로, 단순한 메시지 전달을 넘어 복잡한 데이터 처리와 상태 관리를 쉽게 수행할 수 있게 해준다.
데이터 변환 및 복잡한 처리 로직
- Kafka Streams는 데이터를 실시간으로 변환, 필터링, 집계, 조인 등의 작업을 손쉽게 수행하도록 설계되어있다.
- 예: 로그 데이터에서 특정 키워드가 포함된 로그만 필터링한 뒤 실시간으로 통계를 집계하거나, 두 개의 서로 다른 토픽의 데이터를 조합하는 작업이 가능하다.
KStream<String, String> source = builder.stream("input-topic");
KStream<String, String> filtered = source.filter((key, value) -> value.contains("ERROR"));
filtered.to("error-logs-topic");
상태 저장 및 관리 (Stateful Processing)
- Kafka Streams는 데이터를 처리하는 동안 상태를 관리할 수 있도록 설계되었다.
- 예: 지난 5분 동안의 데이터에서 최대값이나 평균값을 계산하는 작업.
- 내부적으로 RocksDB 같은 내장 저장소를 활용하여 상태를 관리하며, 장애가 발생해도 상태를 복구할 수 있다.
KTable<String, Long> counts = source.groupByKey()
.count(Materialized.as("counts-store"));
처리 보장 (Exactly Once Processing)
- Kafka Streams는 Exactly Once Processing (정확히 한 번 처리)을 기본으로 지원한다.
- 단순한 Producer-Consumer 조합으로는 트랜잭션 처리를 구현해야 하지만, Kafka Streams는 이를 기본적으로 처리하므로 개발 복잡도가 낮아진다.
시간 기반 데이터 처리
- Kafka Streams는 데이터 처리 시 시간(event-time 또는 processing-time)을 기준으로 데이터를 처리하는 기능을 제공 한다.
- 예: 이벤트가 발생한 시간별로 집계를 수행하거나, 특정 시간 범위에서 데이터를 윈도우로 묶어 처리할 수 있다.
KTable<Windowed<String>, Long> windowedCounts = source.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
SQL 스타일 쿼리 가능 (ksqlDB 연계)
- Kafka Streams는 ksqlDB와 연계되어 스트림 데이터를 SQL처럼 쿼리할 수 있다.
- 예: 특정 조건에 맞는 데이터를 실시간으로 필터링하고 집계 결과를 토픽에 저장.
CREATE STREAM errors AS
SELECT * FROM logs WHERE level = 'ERROR';
Producer-Consumer로만 구현할 때의 한계점
- 데이터 처리가 복잡해짐: 복잡한 데이터 변환, 집계, 필터링 등을 처리하려면 많은 코드를 작성해야 한다.
- 상태 관리 어려움: 상태를 직접 관리하려면 외부 DB(Redis, PostgreSQL 등)를 사용하거나 별도 구현이 필요 한다.
- Exactly Once 보장 어려움: Producer와 Consumer만 사용하면 정확히 한 번 처리하려면 트랜잭션을 직접 관리해야 한다.
- 시간 기반 처리: 이벤트의 시간별로 데이터를 처리하려면 별도의 구현이 필요하다.
- 중복 코드 증가: 비슷한 로직을 처리하려면 재사용 가능한 코드를 직접 작성하고 관리해야 한다.
컨슈머에서 로그를 가져오는 것에 대한 추가적인 로직에는 뭐가 있을까?
- 데이터 처리 로직
- 데이터를 단순히 가져오는 것을 넘어서, 비즈니스 로직을 추가적으로 처리해야 하는 경우 이다.
- 장애 처리 로직(Error Handling)
- 데이터 유효성 검사
- 메시지가 손상되었거나 특정 조건을 만족하지 못하는 경우 이를 무시하거나 별도로 기록.
- 예: JSON 포맷이 깨진 로그가 들어왔을 때 처리 방법을 정의
- 재시도 로직
- 네트워크 이슈나 처리 중 오류 발생 시 재시도 메커니즘을 구현
- Dead Letter Queue(DLQ)를 설정하여 처리 실패한 메시지를 별도 저장소에 보관.
- 데이터 유효성 검사
- 상태 관리(Stateful Processing)
- 컨슈머가 처리 중인 데이터의 상태를 추적해야 할 수 도 있다.
- 예: 최근 10분간의 로그를 기반으로 에러율을 계산.
- Kafka Streams의 상태 저장소(State Store)나 외부 Redis 같은 캐시를 활용.
- 컨슈머가 처리 중인 데이터의 상태를 추적해야 할 수 도 있다.
- 병렬 처리(Parallel Processing)
- 대규모 데이터를 빠르게 처리하려면 메시지 단위로 병렬 처리가 필요할 수 있다.
- 스레드 풀이나 비동기 작업으로 처리 속도를 높임.
- 주의: 메시지 순서 보장이 필요한 경우 파티션 단위로 처리를 제한해야 함.
- 대규모 데이터를 빠르게 처리하려면 메시지 단위로 병렬 처리가 필요할 수 있다.
- 데이터 이동 및 저장
- 컨슈머가 데이터를 다른 시스템으로 전송하거나 저장하는 역할을 수행할 수도 있다.
- Elasticsearch, RDBMS 등에 데이터를 저장.
- 예: 로그 데이터를 분석용 클러스터로 보내거나 저장소에 적재.
- 컨슈머가 데이터를 다른 시스템으로 전송하거나 저장하는 역할을 수행할 수도 있다.
Kafka 디렉터리에는 뭐가 있을까? (docker에서 사용해보았다)
/opt/kafka_2.13-2.8.1/logs
- controller.log: Kafka 클러스터의 컨트롤러 노드에서 발생한 이벤트를 기록한다. 컨트롤러는 Kafka 클러스터 내에서 파티션 리더를 선출하거나, 브로커 상태를 관리하는 역할을 한다.
- 파티션 리더 선출 이벤트, 장애 복구 시 파티션 할당 변경, 브로커 추가/제거에 따른 파티션 재배치
- kafka-authorizer.log: Kafka에서 ACL(Access Control List)과 관련된 인증/권한 부여 이벤트를 기록한다.
- 권한 검증 결과, 인증 실패 또는 성공 이벤트, 보안 관련 이슈 디버깅 시 사용
- kafka-request.log: Kafka 브로커가 처리한 클라이언트 요청(프로듀서, 컨슈머, 관리자 요청 등)을 기록한다.
- 프로듀서나 컨슈머로부터 들어온 요청의 처리 시간과 결과, 브로커 간의 네트워크 통신 정보.
- kafkaServer-gc.log: JVM의 가비지 컬렉션(Garbage Collection) 관련 로그.
- GC가 실행된 시간과 실행 소요 시간, 메모리 사용량 변화
- 활용: JVM 튜닝이나 메모리 누수 문제를 디버깅할 때 유용.
- log-cleaner.log: Kafka의 로그 세그먼트 압축과 관련된 정보를 기록한다. Kafka는 토픽이 압축 모드로 설정된 경우 불필요한 메시지를 제거하고 로그를 정리 한다.
- 로그 압축 작업 시작/종료, 삭제된 메시지 개수, 압축 작업 중 발생한 에러.
- server.log: Kafka 브로커의 일반적인 동작과 관련된 가장 중요한 로그 이다.
- 브로커가 시작/중지될 때의 상태, 클라이언트 연결 요청 및 처리, 데이터 처리 관련 이벤트, 장애 또는 에러 정보
- 활용: Kafka 브로커의 전반적인 상태를 확인하고 디버깅할 때 가장 많이 참고되는 로그
- state-change.log: Kafka 브로커 내에서의 상태 변경 이벤트를 기록한다.
- 파티션 리더 변경, ISR(In-Sync Replica) 집합 변경, 브로커의 상태 변화
- 활용: 클러스터 안정성 문제를 추적하거나 파티션 리더 선출 과정을 확인할 때 유용.
root@7c6e85f3354f:/opt/kafka_2.13-2.8.1/logs# ls -al
total 1124
drwxr-xr-x 2 root root 4096 Jan 24 08:01 .
drwxr-xr-x 1 root root 4096 Mar 7 2024 ..
-rw-r--r-- 1 root root 2228 Jan 24 08:16 controller.log
-rw-r--r-- 1 root root 0 Mar 7 2024 kafka-authorizer.log
-rw-r--r-- 1 root root 0 Mar 7 2024 kafka-request.log
-rw-r--r-- 1 root root 10274 Jan 24 08:10 kafkaServer-gc.log
-rw-r--r-- 1 root root 172 Jan 24 07:56 log-cleaner.log
-rw-r--r-- 1 root root 103808 Jan 24 07:56 server.log
-rw-r--r-- 1 root root 14664 Jan 24 07:56 state-change.log
/opt/kafka_2.13-2.8.1/config
server.properties: Kafka 브로커(클러스터의 노드)를 설정하는 파일
- broker.id: 브로커의 고유한 ID (정수값), 클러스터 내에서 브로커를 구분하는 데 사용.
- log.dirs: 메시지가 저장되는 디스크 경로, 여러 디렉토리를 쉼표로 구분하여 지정 가능.
- listeners: 브로커가 클라이언트 요청을 받을 주소와 포트
- advertised.listeners: 클라이언트가 연결할 때 사용할 브로커 주소, 클러스터 외부에서 접근할 때 중요.
- num.partitions: 새로 생성되는 토픽의 기본 파티션 수.
- log.retention.hours: 메시지가 유지되는 시간(기본 168시간 = 7일)
- log.retention.bytes: 로그 파일의 최대 크기(byte)
- default.replication.factor: 새로 생성되는 토픽의 기본 복제본 수
- min.insync.replicas: 메시지를 쓰기 위해 살아 있어야 하는 최소 복제본 수.
- log.segment.bytes: 로그 세그먼트 파일의 최대 크기(byte)
- message.max.bytes: 브로커가 허용하는 최대 메시지 크기(byte)
############################# Server Basics #############################
broker.id=-1
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/kafka/kafka-logs-7c6e85f3354f
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=zookeeper:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
port=9092
advertised.host.name=127.0.0.1
producer.properties
- bootstrap.servers: Kafka 브로커의 주소, 프로듀서가 메시지를 보낼 브로커
- key.serializer: 메시지 키를 직렬화 하는 클래
- 예: key.serializer=org.apache.kafka.common.serialization.StringSerializer
- value.serializer: 메시지 값을 직렬화 하는 클래스
- 예: value.serializer=org.apache.kafka.common.serialization.StringSerializer
- asks: 브로커로부터 전송 성공 여부를 확인하는 수준.
- retries: 메시지 전송 실패 시 재시도 횟수.
- delivery.timeout.ms: 메시지 전송 완료까지 대기하는 최대 시간.
- batch.size: 한번에 전송할 메시지의 최대 크기(byte)
- linger.ms: 배치를 생성하기 위해 기다리는 시간(ms)
############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=
# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=
# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=
# the maximum size of a request in bytes
#max.request.size=
# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=
# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=
zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
/opt/kafka_2.13-2.8.1/bin
root@7c6e85f3354f:/opt/kafka_2.13-2.8.1/bin# cat kafka-
kafka-acls.sh kafka-consumer-groups.sh kafka-leader-election.sh kafka-reassign-partitions.sh kafka-streams-application-reset.sh
kafka-broker-api-versions.sh kafka-consumer-perf-test.sh kafka-log-dirs.sh kafka-replica-verification.sh kafka-topics.sh
kafka-cluster.sh kafka-delegation-tokens.sh kafka-metadata-shell.sh kafka-run-class.sh kafka-verifiable-consumer.sh
kafka-configs.sh kafka-delete-records.sh kafka-mirror-maker.sh kafka-server-start.sh kafka-verifiable-producer.sh
kafka-console-consumer.sh kafka-dump-log.sh kafka-preferred-replica-election.sh kafka-server-stop.sh
kafka-console-producer.sh kafka-features.sh kafka-producer-perf-test.sh kafka-storage.sh
아~ 진작에 공부할껄~
** 그냥 하루하루 개인 공부한 것을 끄적 거리는 공간입니다.
이곳 저곳에서 구글링한 것과 강의 들은 내용이 정리가 되었습니다.
그림들은 그림밑에 출처표시를 해놓았습니다.
문제가 될시 말씀해주시면 해당 부분은 삭제 하도록하겠습니다. **
반응형
'public void static main > Etc' 카테고리의 다른 글
비동기와 논블로킹 (0) | 2025.01.23 |
---|---|
[HTTP] Stateless/Stateful 뭘 사용해야 할까? (0) | 2025.01.07 |
[RAID] 레이드란 무엇인가! (0) | 2022.12.19 |
[WEB] 동기 & 비동기 (0) | 2022.10.14 |
[OS] 프로그램과 프로세스 (0) | 2022.02.21 |
댓글