Quickstart¶
Single broker¶
Download the code¶
cd src
wget http://mirrors.cnnic.cn/apache/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz
tar xf kafka_2.11-0.10.1.0.tgz -C /opt/
cd /opt/kafka_2.11-0.10.1.0/
Kafka uses ZooKeeper, so start ZooKeeper first¶
bin/zookeeper-server-start.sh config/zookeeper.properties
Start Kafka in a new terminal¶
bin/kafka-server-start.sh config/server.properties
Create a topic¶
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Alternatively, the broker can be configured to create topics automatically when a message is published to a topic that does not exist.
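Auto-creation is controlled by broker settings in config/server.properties; a minimal sketch (these settings exist in Kafka, the values here are only illustrative defaults for new topics):
config/server.properties:
auto.create.topics.enable=true
num.partitions=1
default.replication.factor=1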
List topics¶
bin/kafka-topics.sh --list --zookeeper localhost:2181
Send messages¶
Each line of input is sent as a separate message.
Start the producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Start a consumer¶
Kafka ships with a command-line consumer that prints messages to standard output.
Run the following command in a new terminal:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Now type messages into the producer terminal started earlier; they will show up in the consumer.
All of the command-line tools used so far print usage information when run without arguments.
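For example, running the topics tool without arguments prints its usage:
bin/kafka-topics.sh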
Set up a multi-broker cluster¶
Here all brokers run on the same machine:
root@ubuntu47:/opt/kafka_2.11-0.10.1.0# pwd
/opt/kafka_2.11-0.10.1.0
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
Edit the config files:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
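The edits can also be scripted; a sketch, assuming the copies still contain the stock values (broker.id=0, a commented-out listeners line, and log.dirs=/tmp/kafka-logs); adjust the same way (2, 9094, kafka-logs-2) for server-2.properties:
sed -i -e 's/^broker\.id=0$/broker.id=1/' \
    -e 's|^#listeners=PLAINTEXT://:9092$|listeners=PLAINTEXT://:9093|' \
    -e 's|^log\.dirs=/tmp/kafka-logs$|log.dirs=/tmp/kafka-logs-1|' \
    config/server-1.properties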
The broker.id uniquely identifies each node in the cluster. ZooKeeper is already running, so we only need to start two new brokers from the config files just created.
Start the new brokers:
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
Create a topic with a replication factor of three:
root@ubuntu47:/opt/kafka_2.11-0.10.1.0# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".
The replication factor specifies how many brokers keep a copy of the topic's data. Use --describe to see which broker is doing what:
root@ubuntu47:/opt/kafka_2.11-0.10.1.0# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Describe the topic created earlier for comparison:
root@ubuntu47:/opt/kafka_2.11-0.10.1.0# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
As expected, the test topic has no extra replicas and lives only on server 0, the only broker in the cluster when it was created.
Publish a few messages to the new topic:
root@ubuntu47:/opt/kafka_2.11-0.10.1.0# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
Test fault tolerance¶
Node 0 is currently the leader, so kill that broker. Find its PID and terminate it:
root@ubuntu47:/opt/kafka_2.11-0.10.1.0# ps aux |grep server.properties
kill -9 <PID>
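Assuming node 0 is the only broker started from config/server.properties, the lookup and kill can be combined into one line:
kill -9 $(ps aux | grep '[s]erver\.properties' | awk '{print $2}')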
root@ubuntu47:/opt/kafka_2.11-0.10.1.0# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 0,2,1 Isr: 2,1
Leadership has switched to node 2 and node 0 is no longer in the in-sync replica set, but the messages are still available.
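The messages can still be consumed, as long as the consumer bootstraps from one of the surviving brokers (node 1 on port 9093 here):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-replicated-topic --from-beginning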
Import/export data with Kafka Connect¶
Kafka Connect can import data from a file into Kafka and export data from Kafka back out to a file. Create some seed data, then start two connectors in standalone mode:
echo -e "foo\nbar" > test.txt
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
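The first file configures the Connect worker; the other two configure the connectors themselves. The connector configs shipped with Kafka look roughly like this (the source reads test.txt into the connect-test topic, the sink writes that topic to test.sink.txt):
config/connect-file-source.properties:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
config/connect-file-sink.properties:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test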
root@ubuntu47:/opt/kafka_2.11-0.10.1.0# cat test.sink.txt
foo
bar
root@ubuntu47:/opt/kafka_2.11-0.10.1.0# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
In a new terminal, append another line to the source file:
echo "Another line" >> test.txt
The new line shows up in the console consumer output and in test.sink.txt.
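A plain tail on the sink file is enough to watch exported data arrive:
tail -f test.sink.txt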
Process data with Kafka Streams¶
Prepare some input data for Kafka Streams to process:
echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
Create the input topic:
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-file-input
Use the console producer to send this data to the topic:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
In a new terminal, run the WordCount demo to process the input data. Apart from log entries it produces no stdout output; the results are continuously written back to another topic (streams-wordcount-output):
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
Inspect the output of the WordCountDemo application by reading from its output topic:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all 1
lead 1
to 1
hello 1
streams 2
join 1
kafka 3
summit 1
The first column above is the message key and the second is the message value, both printed to standard output. Note that the output is really a continuous stream of updates, where each record (each line of the output) is the latest count for a single word, i.e. for a record key such as "kafka".
For multiple records with the same key, each later record is an update of the previous one; sending more text containing "kafka" to streams-file-input would eventually produce a new line like kafka 4, superseding kafka 3.
Troubleshooting¶
Kafka fails to start with an insufficient-memory error¶
root@ubuntu47:/opt/kafka_2.11-0.10.1.0# bin/kafka-server-start.sh config/server-1.properties &
[1] 10436
root@ubuntu47:/opt/kafka_2.11-0.10.1.0# Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /opt/kafka_2.11-0.10.1.0/hs_err_pid10436.log
Reduce the JVM heap size by changing the value of KAFKA_HEAP_OPTS in the startup scripts.
Locations:
bin/zookeeper-server-start.sh
bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx256M -Xms256M"
fi
Adjust -Xmx and -Xms according to the server's available memory.
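Because the script only sets KAFKA_HEAP_OPTS when it is unset, the heap can also be overridden without editing the script:
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
bin/kafka-server-start.sh config/server-1.properties &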
Kafka Connect fails to start¶
[2017-02-22 17:56:53,828] ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:289)
[2017-02-22 17:56:53,828] ERROR Failed to commit offsets for WorkerSourceTask{id=local-file-source-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:109)
Check whether the Kafka broker process is still running; if it has died or is wedged, stop and restart Kafka, then rerun Connect.
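A quick sanity check, assuming the broker listens on the default port 9092:
ps aux | grep 'kafka\.Kafka'
nc -vz localhost 9092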