Zookeeper #1. 개요
Zookeeper #2. 설치와 설정
Zookeeper #3. 구동과 확인
Kafka #1. 개요
Kafka #2. 설치
Kafka 클러스터 구축을 위해서는 zookeeper 가 필요하다. 일반적으로 kafka 클러스터 구축은 3개 이상의 broker를 가지는데 이 ‘세 개 이상의 broker가 하나처럼 동작하기 위한 정보는 zookeeper가 관리’하기 때문에다.
기본적으로 프로듀서 혹은 컨슈머는 카프카에 연결할 때 직접 카프카 브로커에 연결하는 것이 아니라 주키퍼에 연결하여 카프카 브로커, 토픽, 파티션 등의 정보를 취득한 뒤 브로커에 연결하게 된다.1실제 카프카 연결 시 연결 옵션으로 브로커의 정보만 입력하더라도 주키퍼에 연결한 뒤 다시 브로커에 연결된다.
Zookeeper 설치
zoookeeper 설치는 zookeeper #1. 개요, #2. 설치와 설정, #3. 구동과 확인 포스트를 참고하면 된다.
다운로드 및 압축 해제
- Apache kafka 공식 페이지에서 바이너리를 다운로드 한다.
- 다운로드한 파일을 FTP 또는 SFTP등을 이용하여 서버에 업로드한다.
- 또는 wget 명령을 이용하여 서버에서 다운로드 한다
설정
카프카 클러스터의 설정을 위해서는 ‘zookeeper’ 클러스터가 존재하는 상태에서 $KAFKA_HOME/config/server.config 파일에 기재된 설정만 잘 조절해주면 된다.
############################# Server Basics ############################# # 브로커 고유 ID. 각각의 브로커는 중복되지 않은 고유한 숫자 값을 가진다. broker.id=1 ############################# Socket Server Settings ############################# # Broker 가 사용하는 호스트와 포트 listeners=PLAINTEXT://dist01.haedongg.net:9092 # Producer와 Consumer가 접근하는 호스트와 포트 # 다음의 연결 옵션을 제공한다 -PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL advertised.listeners=PLAINTEXT://dist01.haedongg.net:9092 # 네트워크 요청 처리 스레드 num.network.threads=3 # IO 발생 시 생기는 스레드 수 num.io.threads=8 # 소켓 서버가 사용하는 송수신 버퍼 socket.send.buffer.bytes=1024000 socket.receive.buffer.bytes=1024000 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 ############################# Log Basics ############################# # Broker가 받은 데이터 관리를 위한 저장 공간 log.dirs=/home/kafka/data # 여러 개의 디스크, 여러 개의 디렉토리를 사용하는 경우 콤마로 구분하여 추가한다. # log.dirs=/home/kafka/data,/mnt/sdb1/kafka/data,/third_disk/mq-data # 토픽 당 파티션의 수. 값 만큼 병렬 처리 가능하고, 파일 수도 늘어난다. # 기본 값으로써 토픽을 생성할 때 파티션 숫자를 지정할 수 있고, 변경도 가능하다. num.partitions=1 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. # 토픽에 설정된 replication의 인수가 지정한 값보다 크면 새로운 토픽을 생성하고 작을 경우 브로커의 숫자와 같게 된다. offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Flush Policy ############################# # The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# # 수집 데이터 파일 삭제 주기. (시간, 168h=7days) # 실제 수신된 메시지의 보존 기간을 의미한다. log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log unless the remaining # segments drop below log.retention.bytes. Functions independently of log.retention.hours. #log.retention.bytes=1073741824 # 토픽별 수집 데이터 보관 파일의 크기. 파일 크기를 초과하면 새로운 파일이 생성 된다. log.segment.bytes=1073741824 # 수집 데이터 파일 삭제 여부 확인 주기 (밀리초, 300000ms=5min) log.retention.check.interval.ms=300000 # 삭제 대상 수집 데이터 파일의 처리 방법. (delete=삭제, compact=불필요 내용만 제거) log.cleanup.policy=delete # 수집 데이터 파일 삭제를 위한 스레드 수 log.cleaner.threads=1 ############################# Zookeeper ############################# # zookeeper 정보. # zookeeper 클러스터 모두 콤마(,)로 구분해서 기재한다. zookeeper.connect=dist01.haedongg.net:2181,dist02.haedongg.net:2181,dist03.haedongg.net:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000 ############################# Group Coordinator Settings ############################# # 초기 GroupCoordinator 재조정 지연 시간 (밀리초) # 운영환경에서는 3초를 권장 group.initial.rebalance.delay.ms=3000 ############################# 기타 ################################ # 최대 메시지 크기 (byte) # 예시는 15MB 까지의 메시지를 수신할 수 있다. message.max.bytes=15728640
구동 및 확인
Kafka broker 구동을 위해서는 JAVA가 필요하다. JDK가 설치되어있고 환경변수($JAVA_HOME 및 $PATH)가 설정되어 있다면 별도의 설정 없이 카프카 브로커를 구동할 수 있다. 만약 JDK가 설치되어있지 않거나, 별도의 JAVA를 사용하고자 한다면 export 명령을 이용하여 변수를 선언하거나, $KAFKA_HOME/bin/kafka-run-class.sh 파일에 JAVA관련 변수를 넣어주도록 한다.
# Memory options # 다음 $JAVA_HOME 관련 옵션을 적절히 수정한다. # JAVA_HOME=/WHERE/YOU/INSTALLED/JAVA 이렇게 JAVA_HOME 변수를 지정하면 된다. # 만약 JAVA_HOME 변수가 선언되지 않았다면 PATH에 등록된 경로에서 java 명령을 찾게 된다. if [ -z "$JAVA_HOME" ]; then JAVA="java" else JAVA="$JAVA_HOME/bin/java" fi # Memory options # 카프카 구동을 위한 java heap memory 옵션 if [ -z "$KAFKA_HEAP_OPTS" ]; then # KAFKA_HEAP_OPTS="-Xmx256M" 이 값이 기본 KAFKA_HEAP_OPTS="-Xms256M -Xmx1G" fi
broker 실행
- broker로 구동할 모든 Kafka 서버에서 수행한다.
$KAFKA_HOME/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties ex) /home/apps/kafka/bin/kafka-server-start.sh -daemon /home/apps/kafka/config/server.properties # 만약 -daemon 옵션을 추가하지 않으면 브로커가 background가 아닌 foreground에서 구동된다.
- 프로세스 및 리슨 포트 확인
jps -l 18320 kafka.Kafka 18403 sun.tools.jps.Jps 17899 org.apache.zookeeper.server.quorum.QuorumPeerMain netstat -nltp Active Internet connections (only servers) Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 973/sshd tcp 0 0 127.0.0.1:25 0.0.0.0:* LISTEN 1286/master tcp6 0 0 :::8080 :::* LISTEN 17899/java tcp6 0 0 :::40434 :::* LISTEN 17899/java tcp6 0 0 :::22 :::* LISTEN 973/sshd tcp6 0 0 ::1:25 :::* LISTEN 1286/master tcp6 0 0 :::34880 :::* LISTEN 18320/java tcp6 0 0 :::9092 :::* LISTEN 18320/java tcp6 0 0 :::2181 :::* LISTEN 17899/java
- 종료
$KAFKA_HOME/bin/kafka-server-stop.sh