Zookeeper #1. 개요
Zookeeper #2. 설치와 설정
Zookeeper #3. 구동과 확인
Kafka #1. 개요
Kafka #2. 설치
Kafka 클러스터 구축을 위해서는 zookeeper 가 필요하다. 일반적으로 kafka 클러스터 구축은 3개 이상의 broker를 가지는데 이 ‘세 개 이상의 broker가 하나처럼 동작하기 위한 정보는 zookeeper가 관리’하기 때문에다.
기본적으로 프로듀서 혹은 컨슈머는 카프카에 연결할 때 직접 카프카 브로커에 연결하는 것이 아니라 주키퍼에 연결하여 카프카 브로커, 토픽, 파티션 등의 정보를 취득한 뒤 브로커에 연결하게 된다.
Zookeeper 설치
zoookeeper 설치는 zookeeper #1. 개요, #2. 설치와 설정, #3. 구동과 확인 포스트를 참고하면 된다.
다운로드 및 압축 해제
- Apache kafka 공식 페이지에서 바이너리를 다운로드 한다.
- 다운로드한 파일을 FTP 또는 SFTP등을 이용하여 서버에 업로드한다.
- 또는 wget 명령을 이용하여 서버에서 다운로드 한다
설정
카프카 클러스터의 설정을 위해서는 ‘zookeeper’ 클러스터가 존재하는 상태에서 $KAFKA_HOME/config/server.config 파일에 기재된 설정만 잘 조절해주면 된다.
############################# Server Basics #############################
# 브로커 고유 ID. 각각의 브로커는 중복되지 않은 고유한 숫자 값을 가진다.
broker.id=1
############################# Socket Server Settings #############################
# Broker 가 사용하는 호스트와 포트
listeners=PLAINTEXT://dist01.haedongg.net:9092
# Producer와 Consumer가 접근하는 호스트와 포트
# 다음의 연결 옵션을 제공한다 -PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
advertised.listeners=PLAINTEXT://dist01.haedongg.net:9092
# 네트워크 요청 처리 스레드
num.network.threads=3
# IO 발생 시 생기는 스레드 수
num.io.threads=8
# 소켓 서버가 사용하는 송수신 버퍼
socket.send.buffer.bytes=1024000
socket.receive.buffer.bytes=1024000
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# Broker가 받은 데이터 관리를 위한 저장 공간
log.dirs=/home/kafka/data
# 여러 개의 디스크, 여러 개의 디렉토리를 사용하는 경우 콤마로 구분하여 추가한다.
# log.dirs=/home/kafka/data,/mnt/sdb1/kafka/data,/third_disk/mq-data
# 토픽 당 파티션의 수. 값 만큼 병렬 처리 가능하고, 파일 수도 늘어난다.
# 기본 값으로써 토픽을 생성할 때 파티션 숫자를 지정할 수 있고, 변경도 가능하다.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
# 토픽에 설정된 replication의 인수가 지정한 값보다 크면 새로운 토픽을 생성하고 작을 경우 브로커의 숫자와 같게 된다.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# 수집 데이터 파일 삭제 주기. (시간, 168h=7days)
# 실제 수신된 메시지의 보존 기간을 의미한다.
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# 토픽별 수집 데이터 보관 파일의 크기. 파일 크기를 초과하면 새로운 파일이 생성 된다.
log.segment.bytes=1073741824
# 수집 데이터 파일 삭제 여부 확인 주기 (밀리초, 300000ms=5min)
log.retention.check.interval.ms=300000
# 삭제 대상 수집 데이터 파일의 처리 방법. (delete=삭제, compact=불필요 내용만 제거)
log.cleanup.policy=delete
# 수집 데이터 파일 삭제를 위한 스레드 수
log.cleaner.threads=1
############################# Zookeeper #############################
# zookeeper 정보.
# zookeeper 클러스터 모두 콤마(,)로 구분해서 기재한다.
zookeeper.connect=dist01.haedongg.net:2181,dist02.haedongg.net:2181,dist03.haedongg.net:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
# 초기 GroupCoordinator 재조정 지연 시간 (밀리초)
# 운영환경에서는 3초를 권장
group.initial.rebalance.delay.ms=3000
############################# 기타 ################################
# 최대 메시지 크기 (byte)
# 예시는 15MB 까지의 메시지를 수신할 수 있다.
message.max.bytes=15728640
구동 및 확인
Kafka broker 구동을 위해서는 JAVA가 필요하다. JDK가 설치되어있고 환경변수($JAVA_HOME 및 $PATH)가 설정되어 있다면 별도의 설정 없이 카프카 브로커를 구동할 수 있다. 만약 JDK가 설치되어있지 않거나, 별도의 JAVA를 사용하고자 한다면 export 명령을 이용하여 변수를 선언하거나, $KAFKA_HOME/bin/kafka-run-class.sh 파일에 JAVA관련 변수를 넣어주도록 한다.
# Memory options
# 다음 $JAVA_HOME 관련 옵션을 적절히 수정한다.
# JAVA_HOME=/WHERE/YOU/INSTALLED/JAVA 이렇게 JAVA_HOME 변수를 지정하면 된다.
# 만약 JAVA_HOME 변수가 선언되지 않았다면 PATH에 등록된 경로에서 java 명령을 찾게 된다.
if [ -z "$JAVA_HOME" ]; then
JAVA="java"
else
JAVA="$JAVA_HOME/bin/java"
fi
# Memory options
# 카프카 구동을 위한 java heap memory 옵션
if [ -z "$KAFKA_HEAP_OPTS" ]; then
# KAFKA_HEAP_OPTS="-Xmx256M" 이 값이 기본
KAFKA_HEAP_OPTS="-Xms256M -Xmx1G"
fi
broker 실행
- broker로 구동할 모든 Kafka 서버에서 수행한다.
$KAFKA_HOME/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
ex) /home/apps/kafka/bin/kafka-server-start.sh -daemon /home/apps/kafka/config/server.properties
# 만약 -daemon 옵션을 추가하지 않으면 브로커가 background가 아닌 foreground에서 구동된다.
jps -l
18320 kafka.Kafka
18403 sun.tools.jps.Jps
17899 org.apache.zookeeper.server.quorum.QuorumPeerMain
netstat -nltp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 973/sshd
tcp 0 0 127.0.0.1:25 0.0.0.0:* LISTEN 1286/master
tcp6 0 0 :::8080 :::* LISTEN 17899/java
tcp6 0 0 :::40434 :::* LISTEN 17899/java
tcp6 0 0 :::22 :::* LISTEN 973/sshd
tcp6 0 0 ::1:25 :::* LISTEN 1286/master
tcp6 0 0 :::34880 :::* LISTEN 18320/java
tcp6 0 0 :::9092 :::* LISTEN 18320/java
tcp6 0 0 :::2181 :::* LISTEN 17899/java
$KAFKA_HOME/bin/kafka-server-stop.sh