apache kafkaでconsumer.idを指定しないとconsumerが偏る件

kafkaの基本的な概念は理解してる前提で細かい説明省略して書くので、ご了承ください。


前提として

  • 旧consumer (JavaではなくScalaで書かれてるやつ。新consumerはよく調べてない)
  • version 0.9.0.0(旧consumerなら他のversionでも同じ気はする)
  • partition.assignment.strategyはデフォルトのrange
  • あるtopicに関して、partition数よりconsumer数が多い(実際わりとそれが普通?以下で説明)


という場合です。まず
「あるtopicに関して、partition数よりconsumer数が多い」とは、多い余ったconsumerは単にconsumeせずスタンバイ状態にしたいだけです。
スタンバイ状態にしたい、というか、あるtopicに関してpartition数より多いconsumerは、完全に同時には動かそうとしても動かないはずです。
*1

ある程度以上の規模で運用するなら、consumerを動かすサーバーは、

  • 耐障害性
  • サーバー増やして横にスケール

という理由により、複数台用意するのは当然ですね?
それで、あるtopicに関して

  • partitionが1でも2つ(以上)のconsumerを動かす
  • partitionが2でも3つ(以上)のconsumerを動かす

というのは普通にありえる、というか、やっておかないと、consumerが動いているどこかのサーバーが何らかの原因で死んだ*2ら、consumeされないtopic/partitionが発生してしまいますね?*3


なので、そういった観点からスタンバイ状態にしておくために多めにconsumer作っていたわけですが、なぜか特定のサーバーに偏るのです。複数台サーバーあるのに1台だけに偏り、そのサーバー以外のconsumerが暇してる、みたいな状態が発生して悩んでいました。



それで、その解決方法がわかったので、今書いているところです。



結論としては
「"consumer.id"をいい感じに設定する」
なのですが、まずconsumer.id自体の説明を書きます。

公式の説明には少ししかありません

http://kafka.apache.org/090/documentation.html#oldconsumerconfigs

Property Default Description
consumer.id null Generated automatically if not set.

「デフォルトがnullで、明示されてなかったら自動生成されるよ」
と言ってます。何も間違ってはいないですが、もう少し説明欲しかったですね。さて「自動生成」とはどう生成されるのでしょうか?
ソースコード見てみましょう。*4

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L114-L126

  val consumerIdString = {
    var consumerUuid : String = null
    config.consumerId match {
      case Some(consumerId) // for testing only
      => consumerUuid = consumerId
      case None // generate unique consumerId automatically
      => val uuid = UUID.randomUUID()
      consumerUuid = "%s-%d-%s".format(
        InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
        uuid.getMostSignificantBits().toHexString.substring(0,8))
    }
    config.groupId + "_" + consumerUuid
  }

「host名 + 現在時刻 + UUID」です。

それで
「partition数より多いconsumerがいた場合に、どのconsumerが実際にconsumeすることになるか?」
の決定のためのアルゴリズムが、そのconsumer idの辞書順らしいです。
*5


なので、先頭がhost名なので、特定のhost上のJVMプロセスで動いてるconsumerばかりが動く、という微妙に残念な挙動になりました(つらい


なので、consumer id を明示するわけですが、以下の様な要件を考えた結果

  • JVMプロセスの再起動(デプロイ)のたびに、consumeするマシンが必要以上に動的に変わるのはさけたい
  • ひとまず完全に均等である必要はないし、topicやpartitionは増えるので、完全に手動で決めるのも嫌だし、なんとなくで分散してほしい
  • idが衝突したらダメなので、衝突はしないようにしたい (名前の登録にzk利用してるので、衝突するとおそらくzk登録時にエラーになってバグる?)


最終的に以下の様なアルゴリズムで登録することにしました

SHA256でhash(host名 + "group.id"の値) + host名 + ランダム文字列

動的に変えたくないがhost毎にいい感じに違う値にしたかったので、host名と group.id でハッシュをとり、かつ辞書順なのでそれを先頭に持ってきて、
さらに万が一のhash衝突のためにhost名を追加してユニーク*6になるようにして、
さらにローカルやJenkins上のテスト環境で衝突したので、一番後ろにランダム文字列追加

となりました。
見事にこれを試して分散したので、同じような境遇の人にはおすすめです。

もしくは、もっといい感じな工夫してる人いたら教えて下さい。

*1:なにか間違ってたら指摘ください

*2:丸ごとでも、JVMプロセスでも、どんな死に方でもいいけど

*3:死んだら動的に増やす仕組みでも用意しておかない限り

*4:ところでこのコード、match式が値返すことを利用すればvalにできるはずのvarが存在していて、つらくなりますね?

*5:ドキュメントで "For each topic, we lay out the available partitions in numeric order and the consumer threads in lexicographic order." と書いてあるところ?

*6:かつhash衝突しても、辞書順sortの結果が動的に変わらない