Kafkaをとりあえず動かす

最近TL等ではKafkaという単語をよく目にするようになってきましたが、kafkaとググっても日本語の情報がほとんどないのが悲しいですね。というわけで本家ドキュメントのQuick Start的なモノを動かしてみた話です。

本当に動かすだけです。が、以外にこのレベルの情報もない!のがKafkaですね。もっとあっても良さそうなのに。。。

Kafkaってなんなのよ、って話は今は触れずにとりあえずKafkaをローカル環境にて動かしてみます。

Apache Kafka

公式ドキュメントのQuick Startに従います。

とりあえず動かす

Kafkaを取ってくる

ミラーサイトからKafkaを取ってきます。最新版は8.2.0のようですが、公式ドキュメントは8.1.1なので、8.1.1を取ってきます。 適当に展開しましょう。

$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
$ tar xzvf kafka_2.9.2-0.8.1.1.tgz

ZooKeeperを起動する

KafkaはZooKeeperが必須のプロダクトとなっていますので、Kafkaを立ち上げる前に、ZooKeeperも起動しておきます。 先ほど取ってきたKafkaのディレクトリにZooKeeperも同梱されているのでテスト的に利用する際はそちらを利用するとよいでしょう。 bin/ディレクトリに各種操作実行用のスクリプトconfig/ディレクトリ以下にKafkaやZooKeeperの設定ファイル一式が入っています。

まずはZooKeeprの起動です。

$ cd kafka_2.9.2-0.8.1.1
$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2015-02-05 14:48:29,847] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2015-02-05 14:48:29,848] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
...

Kafka(Broker)を起動する

続いて、Kafkaの起動です。

$ bin/kafka-server-start.sh config/server.properties
[2015-02-05 14:50:55,521] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2015-02-05 14:50:55,560] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
...

topicの作成

続いて、topicを作成します。topicは、Kafkaにおけるメッセージのカテゴリ名のようなものです。メッセージが入る箱みたいなものですね。

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-sample
Created topic "kafka-sample".

topicが出来たようです。ついでにtopicを確認してみます。

$ bin/kafka-topics.sh --list --zookeeper localhost:2181
kafka-sample

確かにkafka-sampleが作られています。

データを投入する

同梱されているスクリプトを利用してデータを投入します。ちなみに、Kafkaではデータを投入する役割をProducer、メッセージを受け取る(一時的に保持する)役割がBroker、メッセージを取り出す役割をConsumerと言います。慣れるまではワケワカメですね。最初に立ち上げたKafkaはBrokerに相当します。

同梱しているスクリプトは標準入力を受け取り、Brokerにデータを渡す機能を提供しています。

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-sample
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
test
message  

という感じです。これでBrokerにメッセージが渡された状態になります。

データを取り出す

Consumer経由でデータを取り出します。こちらも同梱のスクリプトを利用します。

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka-sample --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
test
message

という感じで結果が返ってきます。確かにproducer側で投入したデータが返ってきますね。双方のスクリプトを動かした状態でProducer側で入力した結果は、すぐにConsumer側で表示されます。