본문 바로가기
public void static main/Etc

[Kafka] 카프카를 공부해보자

by 햄리뮤 2025. 1. 24.
반응형

kafka는 무엇일까? 왜 쓸까? 어떻게 동작할까? 좀 딥하게 알아보자!

KAFKA

카프카는 분산형 스트리밍 플랫폼으로, 대용량의 실시간 데이터 스트림을 처리하는 데 특화된 오픈소스 솔루션이다.

카프카는 주로 데이터 파이프라인 구축, 이벤트 스트리밍, 실시간 데이터 피드 처리에 사용된다.

주요개념

  1. 토픽(Topic)
    • 카프카에서 데이터가 저장되는 주제를 의미한다.
    • 토픽은 데이터의 카테고리를 나타내며, 데이터 스트림은 하나 이상의 토픽에 분배 된다.
  2. 브로커(Broker)
    • 카프카 클러스터를 구성하는 각 서버를 브로커 라고 한다.
    • 각 브로커는 데이터를 저장하고 클라이언트 간 통신을 관리한다.
  3. 프로듀서(Producer)
    • 데이터를 생산하고 카프카 토픽에 데이터를 전송하는 클라이언트이다.
    • 여러 토픽으로 데이터를 보낼 수 있다.
  4. 컨슈머(Consumer)
    • 토픽에서 데이터를 소비하는 클라이언트이다.
    • 컨슈머 그룹을 구성하여 데이터를 병렬로 처리할 수 있다.
  5. 컨슈머 그룹(Consumer Group)
    • 여러 컨슈머 인스턴스를 그룹으로 묶어 데이터를 병렬로 소비할 수 있는 단위 이다.
    • 각 그룹은 토픽의 파티션을 나누어 데이터를 병렬로 처리 한다.

내부 동작

https://medium.com/@cobch7/kafka-architecture-43333849e0f4

메시지 흐름

데이터 작성(프로듀서 -> 브로커)

  1. 프로듀서가 데이터 전송
    • 프로듀서는 ProducerRecord 객체를 생성하여 토픽과 메시지를 지정한다.
    • 메시지는 파티션 키에 따라 특정 파티션으로 라우팅 된다.
    • 파티션을 선택하는 방식:
      • 파티션 키가 있는 경우: 지정된 파티션에 저장.
      • 키가 없는 경우: 라운드 로빈 방식으로 파티션을 선택.
  2. 데이터 압축
    • 프로듀서는 데이터를 전송하기 전에 GZIP, Snappy, LZ4와 같은 압축 알고리즘을 사용해 메시지를 압축한다.
    • 설정: compression.type
  3. 브로커로 전송
    • 프로듀서는 acks 설정에 따라 브로커로부터 확인(acknowledgement)을 기다린다.
    • ack 설정
      • acks=0: 확인 요청 없음(최저 신뢰성, 최고 성능).
      • acks=1: 리더 브로커만 데이터 확인.
      • acks=all: 모든 복제 브로커가 데이터 확인(최고 신뢰성).

데이터 저장 (브로커 내부)

  1. 파티션과 세그먼트 구조
    • 토픽은 여러 파티션으로 분할된다.
    • 각 파티션은 세그먼트 파일(일정한 크기의 로그 파일)로 저장된다.
    • 세그먼트는 로그 회전을 통해 관리된다.
    • 설정
      • log.segment.bytes: 세그먼트 파일의 최대 크기.
      • log.retention.bytes / log.retention.hours: 데이터 보관 기간.
  2. 리더와 팔로워
    • 각 파티션에는 리더(Leader)와 여러 팔로워(Follower)가 있다.
    • 리더는 클라이언트 요청(읽기/쓰기)을 처리하며, 팔로워는 리더의 데이터를 복제 한다.
  3. 복제
    • 카프카 데이터를 복제하여 장애 내성을 보장한다.
    • 복제는 리더에서 팔로워로 데이터를 동기화하는 방식으로 동작한다.
    • min.insync.replicas: 복제 중 데이터 동기화가 필요한 최소 브로커 수를 정의.

데이터 소비 (브로커 -> 컨슈머)

  1. 오프셋 관리
    • 카프카는 각 컨슈머 그룹별로 오프셋을 관리한다.
    • 컨슈머는 메시지를 읽을 때 오프셋을 수동 또는 자동으로 커밋할 수 있다.
    • 설정:
      • enable.auto.commit: 자동 커밋 여부.
      • auto.commit.interval.ms: 자동 커밋 주기.
  2. 데이터 읽기
    • 컨슈머는 브로커의 리더에서 데이터를 가져온다.
    • 데이터를 가져오는 방식은 폴링(polling)이다.
    • 컨슈머는 데이터를 병렬 처리하기 위해 컨슈머 그룹을 사용한다.
  3. 리밸런스
    • 컨슈머 그룹 내에서 파티션 할당이 변경되면 리밸런싱이 발생한다.
    • 리밸런스는 컨슈머 그룹 내에서 파티션의 소유권을 재분배하는 과정이다.
    • 리밸러스는 지연을 유발할 수 있으므로 최대한 최소화해야 한다.

구성 요소(Zookeeper)

  1. Zookeeper
    • 카프카는 브로커와 클러스터 메타데이터를 관리하기 위해 Zookeeper를 사용한다.
    • Zookeeper의 역할
      • 브로커 등록 및 클러스터 메타 데이터 관리.
      • 리더 선출.
      • 컨슈머 그룹 오프셋 관리.
    • Kafka 2.8 이후: Zookeeper 없이 작동 가능한 새로운 KRaft 아키텍쳐 도입
  2. Replication Protocol
    • ISR(In-Sync Replicas) 목록:
      • 리더와 동기화된 팔로워들의 목록 이다.
      • acks=all 설정 시, ISR 내 모든 복제의 확인이 완료되어야 메시지가 확인된다.

장애 처리

  1. 브로커 장애
    • 리더 브로커가 장애가 발생하면, ISR 목록에서 새로운 리더를 선출한다.
  2. 컨슈머 장애
    • 컨슈머가 장애가 발생하면 리밸런싱을 통해 남은 컨슈머가 파티션을 할당 받는다.
  3. 데이터 손실 방지
    • acks=all 및 min.insync.replicas 설정을 통해 신뢰성을 높일 수 있다.

Kafka Streams

Kafka Streams는 카프카에 내장된 스트리밍 데이터 처리 라이브러리로, 카프카의 데이터를 실시간으로 처리하고 변환할 수 있다.

주요 기능

  • 상태 저장(Stateful Processing): 데이터의 상태를 유지하면서 처리가 가능하다.
  • 윈도우 처리(Windowing): 시간 기반으로 데이터를 그룹화하여 처리.
  • 실시간 데이터 변환 및 집계 (map, filter, aggregate 등).

Kafka Streams를 사용하는 이유

Kafka Streams는 데이터 스트림 처리를 위한 라이브러리로, 단순한 메시지 전달을 넘어 복잡한 데이터 처리와 상태 관리를 쉽게 수행할 수 있게 해준다.

데이터 변환 및 복잡한 처리 로직

  • Kafka Streams는 데이터를 실시간으로 변환, 필터링, 집계, 조인 등의 작업을 손쉽게 수행하도록 설계되어있다.
  • 예: 로그 데이터에서 특정 키워드가 포함된 로그만 필터링한 뒤 실시간으로 통계를 집계하거나, 두 개의 서로 다른 토픽의 데이터를 조합하는 작업이 가능하다.
KStream<String, String> source = builder.stream("input-topic");
KStream<String, String> filtered = source.filter((key, value) -> value.contains("ERROR"));
filtered.to("error-logs-topic");

상태 저장 및 관리 (Stateful Processing)

  • Kafka Streams는 데이터를 처리하는 동안 상태를 관리할 수 있도록 설계되었다.
  • 예: 지난 5분 동안의 데이터에서 최대값이나 평균값을 계산하는 작업.
  • 내부적으로 RocksDB 같은 내장 저장소를 활용하여 상태를 관리하며, 장애가 발생해도 상태를 복구할 수 있다.
KTable<String, Long> counts = source.groupByKey()
    .count(Materialized.as("counts-store"));

처리 보장 (Exactly Once Processing)

  • Kafka Streams는 Exactly Once Processing (정확히 한 번 처리)을 기본으로 지원한다.
  • 단순한 Producer-Consumer 조합으로는 트랜잭션 처리를 구현해야 하지만, Kafka Streams는 이를 기본적으로 처리하므로 개발 복잡도가 낮아진다.

시간 기반 데이터 처리

  • Kafka Streams는 데이터 처리 시 시간(event-time 또는 processing-time)을 기준으로 데이터를 처리하는 기능을 제공 한다.
  • 예: 이벤트가 발생한 시간별로 집계를 수행하거나, 특정 시간 범위에서 데이터를 윈도우로 묶어 처리할 수 있다.
KTable<Windowed<String>, Long> windowedCounts = source.groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .count();

SQL 스타일 쿼리 가능 (ksqlDB 연계)

  • Kafka Streams는 ksqlDB와 연계되어 스트림 데이터를 SQL처럼 쿼리할 수 있다.
  • 예: 특정 조건에 맞는 데이터를 실시간으로 필터링하고 집계 결과를 토픽에 저장.
CREATE STREAM errors AS 
SELECT * FROM logs WHERE level = 'ERROR';

Producer-Consumer로만 구현할 때의 한계점

  • 데이터 처리가 복잡해짐: 복잡한 데이터 변환, 집계, 필터링 등을 처리하려면 많은 코드를 작성해야 한다.
  • 상태 관리 어려움: 상태를 직접 관리하려면 외부 DB(Redis, PostgreSQL 등)를 사용하거나 별도 구현이 필요 한다.
  • Exactly Once 보장 어려움: Producer와 Consumer만 사용하면 정확히 한 번 처리하려면 트랜잭션을 직접 관리해야 한다.
  • 시간 기반 처리: 이벤트의 시간별로 데이터를 처리하려면 별도의 구현이 필요하다.
  • 중복 코드 증가: 비슷한 로직을 처리하려면 재사용 가능한 코드를 직접 작성하고 관리해야 한다.

컨슈머에서 로그를 가져오는 것에 대한 추가적인 로직에는 뭐가 있을까?

  1. 데이터 처리 로직
    • 데이터를 단순히 가져오는 것을 넘어서, 비즈니스 로직을 추가적으로 처리해야 하는 경우 이다.
  2. 장애 처리 로직(Error Handling)
    • 데이터 유효성 검사
      • 메시지가 손상되었거나 특정 조건을 만족하지 못하는 경우 이를 무시하거나 별도로 기록.
      • 예: JSON 포맷이 깨진 로그가 들어왔을 때 처리 방법을 정의
    • 재시도 로직
      • 네트워크 이슈나 처리 중 오류 발생 시 재시도 메커니즘을 구현
      • Dead Letter Queue(DLQ)를 설정하여 처리 실패한 메시지를 별도 저장소에 보관.
  3. 상태 관리(Stateful Processing)
    • 컨슈머가 처리 중인 데이터의 상태를 추적해야 할 수 도 있다.
      • 예: 최근 10분간의 로그를 기반으로 에러율을 계산.
      • Kafka Streams의 상태 저장소(State Store)나 외부 Redis 같은 캐시를 활용.
  4. 병렬 처리(Parallel Processing)
    • 대규모 데이터를 빠르게 처리하려면 메시지 단위로 병렬 처리가 필요할 수 있다.
      • 스레드 풀이나 비동기 작업으로 처리 속도를 높임.
      • 주의: 메시지 순서 보장이 필요한 경우 파티션 단위로 처리를 제한해야 함.
  5. 데이터 이동 및 저장
    • 컨슈머가 데이터를 다른 시스템으로 전송하거나 저장하는 역할을 수행할 수도 있다.
      • Elasticsearch, RDBMS 등에 데이터를 저장.
      • 예: 로그 데이터를 분석용 클러스터로 보내거나 저장소에 적재.

Kafka 디렉터리에는 뭐가 있을까? (docker에서 사용해보았다)

/opt/kafka_2.13-2.8.1/logs

  • controller.log: Kafka 클러스터의 컨트롤러 노드에서 발생한 이벤트를 기록한다. 컨트롤러는 Kafka 클러스터 내에서 파티션 리더를 선출하거나, 브로커 상태를 관리하는 역할을 한다.
    • 파티션 리더 선출 이벤트, 장애 복구 시 파티션 할당 변경, 브로커 추가/제거에 따른 파티션 재배치
  • kafka-authorizer.log: Kafka에서 ACL(Access Control List)과 관련된 인증/권한 부여 이벤트를 기록한다.
    • 권한 검증 결과, 인증 실패 또는 성공 이벤트, 보안 관련 이슈 디버깅 시 사용
  • kafka-request.log: Kafka 브로커가 처리한 클라이언트 요청(프로듀서, 컨슈머, 관리자 요청 등)을 기록한다.
    • 프로듀서나 컨슈머로부터 들어온 요청의 처리 시간과 결과, 브로커 간의 네트워크 통신 정보.
  • kafkaServer-gc.log: JVM의 가비지 컬렉션(Garbage Collection) 관련 로그.
    • GC가 실행된 시간과 실행 소요 시간, 메모리 사용량 변화
    • 활용: JVM 튜닝이나 메모리 누수 문제를 디버깅할 때 유용.
  • log-cleaner.log: Kafka의 로그 세그먼트 압축과 관련된 정보를 기록한다. Kafka는 토픽이 압축 모드로 설정된 경우 불필요한 메시지를 제거하고 로그를 정리 한다.
    • 로그 압축 작업 시작/종료, 삭제된 메시지 개수, 압축 작업 중 발생한 에러.
  • server.log: Kafka 브로커의 일반적인 동작과 관련된 가장 중요한 로그 이다.
    • 브로커가 시작/중지될 때의 상태, 클라이언트 연결 요청 및 처리, 데이터 처리 관련 이벤트, 장애 또는 에러 정보
    • 활용: Kafka 브로커의 전반적인 상태를 확인하고 디버깅할 때 가장 많이 참고되는 로그
  • state-change.log: Kafka 브로커 내에서의 상태 변경 이벤트를 기록한다.
    • 파티션 리더 변경, ISR(In-Sync Replica) 집합 변경, 브로커의 상태 변화
    • 활용: 클러스터 안정성 문제를 추적하거나 파티션 리더 선출 과정을 확인할 때 유용.
root@7c6e85f3354f:/opt/kafka_2.13-2.8.1/logs# ls -al
total 1124
drwxr-xr-x 2 root root   4096 Jan 24 08:01 .
drwxr-xr-x 1 root root   4096 Mar  7  2024 ..
-rw-r--r-- 1 root root   2228 Jan 24 08:16 controller.log
-rw-r--r-- 1 root root      0 Mar  7  2024 kafka-authorizer.log
-rw-r--r-- 1 root root      0 Mar  7  2024 kafka-request.log
-rw-r--r-- 1 root root  10274 Jan 24 08:10 kafkaServer-gc.log
-rw-r--r-- 1 root root    172 Jan 24 07:56 log-cleaner.log
-rw-r--r-- 1 root root 103808 Jan 24 07:56 server.log
-rw-r--r-- 1 root root  14664 Jan 24 07:56 state-change.log

/opt/kafka_2.13-2.8.1/config

server.properties: Kafka 브로커(클러스터의 노드)를 설정하는 파일

  • broker.id: 브로커의 고유한 ID (정수값), 클러스터 내에서 브로커를 구분하는 데 사용.
  • log.dirs: 메시지가 저장되는 디스크 경로, 여러 디렉토리를 쉼표로 구분하여 지정 가능.
  • listeners: 브로커가 클라이언트 요청을 받을 주소와 포트
  • advertised.listeners: 클라이언트가 연결할 때 사용할 브로커 주소, 클러스터 외부에서 접근할 때 중요.
  • num.partitions: 새로 생성되는 토픽의 기본 파티션 수.
  • log.retention.hours: 메시지가 유지되는 시간(기본 168시간 = 7일)
  • log.retention.bytes: 로그 파일의 최대 크기(byte)
  • default.replication.factor: 새로 생성되는 토픽의 기본 복제본 수
  • min.insync.replicas: 메시지를 쓰기 위해 살아 있어야 하는 최소 복제본 수.
  • log.segment.bytes: 로그 세그먼트 파일의 최대 크기(byte)
  • message.max.bytes: 브로커가 허용하는 최대 메시지 크기(byte)
############################# Server Basics #############################
broker.id=-1
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/kafka/kafka-logs-7c6e85f3354f
num.partitions=1
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=zookeeper:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
port=9092
advertised.host.name=127.0.0.1

producer.properties

  • bootstrap.servers: Kafka 브로커의 주소, 프로듀서가 메시지를 보낼 브로커
  • key.serializer: 메시지 키를 직렬화 하는 클래
    • 예: key.serializer=org.apache.kafka.common.serialization.StringSerializer
  • value.serializer: 메시지 값을 직렬화 하는 클래스
    • 예: value.serializer=org.apache.kafka.common.serialization.StringSerializer
  • asks: 브로커로부터 전송 성공 여부를 확인하는 수준.
  • retries: 메시지 전송 실패 시 재시도 횟수.
  • delivery.timeout.ms: 메시지 전송 완료까지 대기하는 최대 시간.
  • batch.size: 한번에 전송할 메시지의 최대 크기(byte)
  • linger.ms: 배치를 생성하기 위해 기다리는 시간(ms)
############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

 

zookeeper.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

/opt/kafka_2.13-2.8.1/bin

root@7c6e85f3354f:/opt/kafka_2.13-2.8.1/bin# cat kafka-
kafka-acls.sh                        kafka-consumer-groups.sh             kafka-leader-election.sh             kafka-reassign-partitions.sh         kafka-streams-application-reset.sh
kafka-broker-api-versions.sh         kafka-consumer-perf-test.sh          kafka-log-dirs.sh                    kafka-replica-verification.sh        kafka-topics.sh
kafka-cluster.sh                     kafka-delegation-tokens.sh           kafka-metadata-shell.sh              kafka-run-class.sh                   kafka-verifiable-consumer.sh
kafka-configs.sh                     kafka-delete-records.sh              kafka-mirror-maker.sh                kafka-server-start.sh                kafka-verifiable-producer.sh
kafka-console-consumer.sh            kafka-dump-log.sh                    kafka-preferred-replica-election.sh  kafka-server-stop.sh
kafka-console-producer.sh            kafka-features.sh                    kafka-producer-perf-test.sh          kafka-storage.sh

아~ 진작에 공부할껄~

 

 

** 그냥 하루하루 개인 공부한 것을 끄적 거리는 공간입니다.

이곳 저곳에서 구글링한 것과 강의 들은 내용이 정리가 되었습니다.

그림들은 그림밑에 출처표시를 해놓았습니다.

문제가 될시 말씀해주시면 해당 부분은 삭제 하도록하겠습니다. **

반응형

'public void static main > Etc' 카테고리의 다른 글

비동기와 논블로킹  (0) 2025.01.23
[HTTP] Stateless/Stateful 뭘 사용해야 할까?  (0) 2025.01.07
[RAID] 레이드란 무엇인가!  (0) 2022.12.19
[WEB] 동기 & 비동기  (0) 2022.10.14
[OS] 프로그램과 프로세스  (0) 2022.02.21

댓글