kafkaの基本的な概念は理解してる前提で細かい説明省略して書くので、ご了承ください。
前提として
- 旧consumer (JavaではなくScalaで書かれてるやつ。新consumerはよく調べてない)
- version 0.9.0.0(旧consumerなら他のversionでも同じ気はする)
- partition.assignment.strategyはデフォルトのrange
- あるtopicに関して、partition数よりconsumer数が多い(実際わりとそれが普通?以下で説明)
という場合です。まず
「あるtopicに関して、partition数よりconsumer数が多い」とは、多い余ったconsumerは単にconsumeせずスタンバイ状態にしたいだけです。
スタンバイ状態にしたい、というか、あるtopicに関してpartition数より多いconsumerは、完全に同時には動かそうとしても動かないはずです。
*1
ある程度以上の規模で運用するなら、consumerを動かすサーバーは、
- 耐障害性
- サーバー増やして横にスケール
という理由により、複数台用意するのは当然ですね?
それで、あるtopicに関して
- partitionが1でも2つ(以上)のconsumerを動かす
- partitionが2でも3つ(以上)のconsumerを動かす
というのは普通にありえる、というか、やっておかないと、consumerが動いているどこかのサーバーが何らかの原因で死んだ*2ら、consumeされないtopic/partitionが発生してしまいますね?*3
なので、そういった観点からスタンバイ状態にしておくために多めにconsumer作っていたわけですが、なぜか特定のサーバーに偏るのです。複数台サーバーあるのに1台だけに偏り、そのサーバー以外のconsumerが暇してる、みたいな状態が発生して悩んでいました。
それで、その解決方法がわかったので、今書いているところです。
結論としては
「"consumer.id"をいい感じに設定する」
なのですが、まずconsumer.id自体の説明を書きます。
公式の説明には少ししかありません
http://kafka.apache.org/090/documentation.html#oldconsumerconfigs
Property Default Description consumer.id null Generated automatically if not set.
「デフォルトがnullで、明示されてなかったら自動生成されるよ」
と言ってます。何も間違ってはいないですが、もう少し説明欲しかったですね。さて「自動生成」とはどう生成されるのでしょうか?
ソースコード見てみましょう。*4
val consumerIdString = { var consumerUuid : String = null config.consumerId match { case Some(consumerId) // for testing only => consumerUuid = consumerId case None // generate unique consumerId automatically => val uuid = UUID.randomUUID() consumerUuid = "%s-%d-%s".format( InetAddress.getLocalHost.getHostName, System.currentTimeMillis, uuid.getMostSignificantBits().toHexString.substring(0,8)) } config.groupId + "_" + consumerUuid }
「host名 + 現在時刻 + UUID」です。
それで
「partition数より多いconsumerがいた場合に、どのconsumerが実際にconsumeすることになるか?」
の決定のためのアルゴリズムが、そのconsumer idの辞書順らしいです。
*5
なので、先頭がhost名なので、特定のhost上のJVMプロセスで動いてるconsumerばかりが動く、という微妙に残念な挙動になりました(つらい
なので、consumer id を明示するわけですが、以下の様な要件を考えた結果
- JVMプロセスの再起動(デプロイ)のたびに、consumeするマシンが必要以上に動的に変わるのはさけたい
- ひとまず完全に均等である必要はないし、topicやpartitionは増えるので、完全に手動で決めるのも嫌だし、なんとなくで分散してほしい
- idが衝突したらダメなので、衝突はしないようにしたい (名前の登録にzk利用してるので、衝突するとおそらくzk登録時にエラーになってバグる?)
最終的に以下の様なアルゴリズムで登録することにしました
SHA256でhash(host名 + "group.id"の値) + host名 + ランダム文字列
動的に変えたくないがhost毎にいい感じに違う値にしたかったので、host名と group.id でハッシュをとり、かつ辞書順なのでそれを先頭に持ってきて、
さらに万が一のhash衝突のためにhost名を追加してユニーク*6になるようにして、
さらにローカルやJenkins上のテスト環境で衝突したので、一番後ろにランダム文字列追加
となりました。
見事にこれを試して分散したので、同じような境遇の人にはおすすめです。
もしくは、もっといい感じな工夫してる人いたら教えて下さい。
*1:なにか間違ってたら指摘ください
*2:丸ごとでも、JVMプロセスでも、どんな死に方でもいいけど
*3:死んだら動的に増やす仕組みでも用意しておかない限り
*4:ところでこのコード、match式が値返すことを利用すればvalにできるはずのvarが存在していて、つらくなりますね?
*5:ドキュメントで "For each topic, we lay out the available partitions in numeric order and the consumer threads in lexicographic order." と書いてあるところ?
*6:かつhash衝突しても、辞書順sortの結果が動的に変わらない