Blog
Kafkaを使ってチャットツールを作ってみました
こんにちは、15卒のフィ(@dxhuy)です。 最近仕事でちょっとKafkaを使う機会がありました。Kafkaは汎用的に色々な用途で使えるとても面白いミドルウェアです。そこでKafkaの知識を深めるために何か作ろうかなと考えて、コマンドラインで実現する簡単なチャットツールを作ってみました。
Kafkaとは
Apache Kafka は Publish-Subscribe 型のメッセージングシステムであり、オフライン・オンライン両方のメッセージ取得に適している。これ は大容量のイベントとログデータを低遅延で収集および配信する目的で、当初は LinkedIn で開発された メッセージングシステムであった。
- Producer:情報を作る側。
- Consumer : 情報を使う側。
- Topic: ConsumerがTopicにSubscribeして情報取り出し、ProducerがTopicにPublishする。Topicは簡単に「情報の箱」のようなものです。
- Offset : Consumerがどこまで使ったのかを表す。
- Consumer グループ:TopicにSubscribeのはConsumer単位じゃなくて、Consumerのグループです。一つのメッセージはConsumerグループに送られ、そのグループが一回だけ使えるのがポイントです。そこで、もし全部のConsumerが一つのグループに属したら、キューとして扱えるし、Consumerが別のグループになったら、Pub/SubシステムになるのがKafkaの面白い点です。
- Zookeeper:Kafkaのクラスターの情報を保存するためのミドルウェアです。
今回はKafkaのPub/Sub機能を使うことにより、チャットツールを数行で作れることができ、感動しました。
プログラムの設計
今回作りたいのは簡単にコマンドラインのチャットツールですので、主に二つの機能が必要です
- チャットメッセージを送る機能 (ChatConsole)
- チャットメッセージを表示機能 (RoomConsole)
- その他ユーザ名・チャット部屋の表示・設定できるようにする機能も必要ですね
この設計でKafkaのPub/Subシステムどう適用するために
- 一つのユーザが一つの Consumer Group とする
- 一つのチャット部屋が一つの Topic とする
- メッセージの送信するときは送信した人をプレフィクスとして識別する、送信先は決めたTopicです
実装の詳細
まずは標準Consumerと標準Producerをラップするクラスを作ります。ラップしたConsumerは受け取ったメッセージを標準出力に出力するだけで、ラップしたProducerは決めたユーザ名に対して、メッセージのプレフィクスを付けるようにしました。 Consumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
package hkafka import java.nio.charset.StandardCharsets import java.util.Properties import kafka.consumer.{Whitelist, Consumer, ConsumerConfig} import kafka.serializer.DefaultDecoder class HConsumer(topic: String, groupId: String, zkHost: String) { val consumerProps = new Properties() consumerProps.put("group.id", groupId) consumerProps.put("auto.offset.reset", "smallest") consumerProps.put("zookeeper.connect", zkHost) val config = new ConsumerConfig(consumerProps) val connector = Consumer.create(config) def consume(callBack: String => Unit) = { Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { connector.shutdown() } }) val filterSpec = new Whitelist(topic) val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).head val iter = stream for (messageAndTopic <- iter) { val str = new String(messageAndTopic.message, StandardCharsets.UTF_8); callBack(str) } } } |
Producer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
package hkafka import java.util.Properties import kafka.producer.{KeyedMessage, Producer => KafkaProducer, ProducerConfig} case class HProducer(zkHost: String ,topic: String, messagePrefix: String, brokers: List[String]) { val props = new Properties props.put("zookeeper.connect", s"${zkHost}") props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("metadata.broker.list", brokers.mkString(",")) protected val config = new ProducerConfig(props) private lazy val producer = new KafkaProducer[String, String](config) def send(message: String) = { val key = new KeyedMessage[String, String](topic, s"$messagePrefix $message") producer.send(key) } } |
ChatConsole(チャット送信モジュール):User nameとチャット部屋をArgumentから受け取って、Producerを作成する。そして、ユーザから入力一行ずつを読み取って Topicに送信するという非常に簡単なロジックです。ユーザ入力を簡単に取れるために、jlineライブラリを使いました。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
package chat import hkafka.HProducer import jline.console.ConsoleReader import joptsimple._ import hkafka.{HZooKeeper} object ChatConsole extends ChatArgument { def main(args: Array[String]) = { val options: OptionSet = tryParse(parser, args) val room = options.valueOf(chatRoom) val uname = options.valueOf(userName) val host = options.valueOf(server) val hZooKeeper = new HZooKeeper(host) val brokers = hZooKeeper.getBrokerLists val producer = new HProducer(host, room, uname, brokers) val reader = new ConsoleReader() reader.setPrompt(s"${options.valueOf(userName)}: ") var line: String = null while ((line = reader.readLine()) != null) { producer.send(line) } } } |
RoomConsole(送信した内容を表示するモジュール):User nameとチャット部屋をArgumentから受け取って、Consumerを作成する。ConsumerをKafkaサーバにZookeeperを通してつながり、最新の10メッセージをチャット歴史として表示する
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
package chat import hkafka.{HZooKeeper, HConsumer} import joptsimple.{OptionSet} object RoomConsole extends ChatArgument { def main(args: Array[String]) = { val options: OptionSet = tryParse(parser, args) val room = options.valueOf(chatRoom) val uname = options.valueOf(userName) val host = options.valueOf(server) val consumer = new HConsumer(room , uname, host) val hZooKeeper = new HZooKeeper(host) try { val partition = hZooKeeper.getPartions(room).values.head hZooKeeper.setToDesireOffset( uname, room, partition.head, 10L ) } catch { case e: Exception => //オフセットが10以下の場合オフセットを戻せない場合がある } consumer.consume(printOut) } def printOut(message: String) = { Console.out.print(s"$message \n") } } |
10メッセージチャット歴史を表示するため、今回はクライアントのオフセットをZooKeeper側で管理しますので、直接Zookeeperのメタ情報を修正する処理も必要です (ちょっと面倒くさいなあと思いました、現在Kafkaチームが次世代のクライアントを再設計しています(Consumer Client Re-Design)ので、もうちょっと簡単にオフセットを修正できる方法を期待します。。)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
def setToDesireOffset(groupId: String, topic: String, partition: Int, approximateMessagesBack: Long) = { val path = s"${ZkUtils.ConsumersPath}/${groupId}/offsets/${topic}/${partition}" if (zkClient.exists(path)) { val currentOffset = zkClient.readData[Any](path) val desiredOffset = math.max(0, (currentOffset.toString.toLong - approximateMessagesBack)) zkClient.writeData(path, desiredOffset.toString) } else { throw new RuntimeException( s""" | Unable to find the move the consumer back | in ZK. This may or may not be an issue, depending on whether you expect | the path to exist. Path: " + ${path}) """.stripMargin ) } } |
ここで完成!!!! 成果を確認してみました 数行でチャットシステムを簡単に作りました〜
感想
- Kafkaのコンセプトが非常に面白くて、Queueシステムとして、Pub/Subシステムとして両方できるのが嬉しい
- Kafkaのメッセージは永続化されているため、メッセージロスなく安全!
- メッセージの読み込みは各Subscriberのオフセットで管理できるため、オフセットを変更すればメッセージのリプレイもできてすごい便利でした。
- Consumerのオフセットの調整はZookeeperで直接削除・書き込みしないといけないため結構面倒。
- Kafka標準のConsumer/ProducerのAPIはちょっと使いづらい、またクラスターのホスト発見、オフセットの調整などのZooKeeperの処理は資料がほぼなく、直接にKafkaのソースコード(http://github.com/apache/kafka)を見る必要がある。
ソースコードは https://github.com/huydx/KafkaChat に置いてありますので、ご興味ある方はどうぞ〜
Author