golang 消化 kafka 的坑

问题所在

用 golang 连接 kafka 做推送或消化操作,一般都会说用 github.com/Shopify/sarama 这个包,但是我在实际使用中发现是有问题的。

sarama 文档中给了几个操作例子,其中 producer 的例子均是正确且好用的。问题出在 consumer 上,文档中给的 consumer 的例子是这样的:

consumer, err := NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
    panic(err)
}

defer func() {
    if err := consumer.Close(); err != nil {
        log.Fatalln(err)
    }
}()

partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
if err != nil {
    panic(err)
}

defer func() {
    if err := partitionConsumer.Close(); err != nil {
        log.Fatalln(err)
    }
}()

// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

consumed := 0
ConsumerLoop:
for {
    select {
    case msg := <-partitionConsumer.Messages():
        log.Printf("Consumed message offset %d\n", msg.Offset)
        consumed++
    case <-signals:
        break ConsumerLoop
    }
}

log.Printf("Consumed: %d\n", consumed)

重要的事情不需要说三遍:这个例子是个非常 demo 的 demo,它仅消化了某个 topic 的仅一个 partition 上的消息,且每次都从最近一条消化,也没有加入 group。

如果照抄这个例子用于实际生产,会有很多问题,大致有 3 条:

  1. 如果 topic 有多个 partition,除了 0 号,其他 partition 上的消息均不会被消化,这就不是丢几条消息的事了,是丢掉了超过一半的消息,partition 越多,丢的越多。
  2. 如果程序挂掉了,一段时间没有消化消息,下次程序重启时,只会从最近一条消息(OffsetNewest)开始消化,如果程序挂掉的这段时间内有消息推入,这些消息就会丢失。
  3. 由于没有加入 group,多个消化程序之间是无法自动相互“互斥”、“替补”的,也就难以实现集群式消化。

kafka 的复杂模型

而据我所知,目前 sarama 并没有对 consumer 做进一步的“傻瓜式”封装。个人觉得这是 sarama 的欠缺之处,但无可厚非,因为 kafka 的队列模型本身就挺复杂,不宜粗暴封装。

简单介绍一下 kafka 的队列模型:

首先,每个 topic 可以有多个 partition,每个 partition 才是严格意义上的消息队列(消息先进先出),而不同 partition 之间是互不影响的。举个例子来说,有消息 A 和消息 B,如果要保证 A 一定要比 B 先被消化,就必须要保证 A 一定要先被投递到某个 partition,且 B 再被投递到同一个 partition。如果 B 被投递到了不一样 partition,那么 B 是有可能先于 A 被消化的。

其次,“一个 topic 的一个 partition”是保证无重复消息的最小消化单元,换句话说,如果有两个消化程序消化同一个 topic 的同一个 partition,那它们消化的消息事实上是彼此重复的。所以为保证所有 partition 均被消化,且不会被同一个业务(属于同一个 group)的多个消化程序重复消化,是需要一个分配策略来决定每个消化程序应当消化哪几个或哪一个 partition,又或者应当做作为候补(当消化程序数量大于 partition 数量时发生),而 kafka 是希望用户程序自行实现这个分配策略的。

这个看似复杂讨厌的缺陷,个人认为其实是一个特性,这是因为这个特性,kafka 可以应对多样的、复杂的应用场景,直至被玩出花来。举例说明一下。

  • 消息没有严格有序性要求,任意消息可以被任意消化程序消化,且消化各消息耗时相近——可以将消息投递到任意 partition,partition 任意均等分配到各个消化程序;
  • 所有消息要求严格有序,但消息量不大——可以配置仅一个 partition,一个消化程序负责消化,其他消化程序作为替补;
  • 同组消息要求有序,不同组消息不要求有序,例如同一个用户的消化要求有序,不同用户的消息不要求有序——可以将属于同一组的消息投递到同一个 partition,比如拿 UID 对 partition 数量取模;
  • 特定组的消息仅可以被特定消化程序消化——可以在将该特定组的消息投递到特定 partition,配置时指定到特定消化程序;
  • 某些消息消化耗时长且要求有序,有些消息消化耗时短且不要求有序——可以将分为两组 partition,一组实行针对有序消息的策略,且多一些 partition、多一些消化程序增大处理能力,另一组实行针对无序消息的的策略,且少一些 partition、少一些消化程序节省资源。

我觉得我可以一种说下去,各组神奇的场景都能蹦出来,更神奇的是 kafka 均能应付自如,然而代价就是开发负担都压到使用者身上了。事实上,大多数时候我们面临的场景就是“消息没有严格有序性要求,任意消息可以被任意消化程序消化,且消化各消息耗时相近”,所以,本文就该场景提供一些解决方案,故如果您确认您的需求就是我所述的,方可继续往下看

尝试用代码自行解决

sarama 虽然没有做傻瓜式封装,但也提供了也谢接口。若要解决上述的问题 1、2,即实现自动消化所有 partition、自动更新 offset,可以在代码中自行完成的,我尝试实现了一下,代码如下:

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V0_10_2_0
	client, err := sarama.NewClient([]string{ADDR}, config)
	if err != nil {
		log.Fatalln(err)
	}

	offsetManager, err := sarama.NewOffsetManagerFromClient(GROUP, client)
	if err != nil {
		log.Fatalln(err)
	}

	pids, err := client.Partitions(TOPIC)
	if err != nil {
		log.Fatalln(err)
	}

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		log.Fatalln(err)
	}

	defer consumer.Close()

	wg := &sync.WaitGroup{}

	for _, v := range pids {
		wg.Add(1)
		go consume(wg, consumer, offsetManager, v)
	}

	wg.Wait()
}

func consume(wg *sync.WaitGroup, c sarama.Consumer, om sarama.OffsetManager, p int32) {
	defer wg.Done()

	pom, err := om.ManagePartition(TOPIC, p)
	if err != nil {
		log.Fatalln(err)
	}
	defer pom.Close()

	offset, _ := pom.NextOffset()
	if offset == -1 {
		offset = sarama.OffsetOldest
	}

	pc, err := c.ConsumePartition(TOPIC, p, offset)
	if err != nil {
		log.Fatalln(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		log.Printf("[%v] Consumed message offset %v\n", p, msg.Offset)
		pom.MarkOffset(msg.Offset + 1, "")
	}
}

接着实现问题3,即实现加入 group 自动分配 partition 到各个消息程序,很抱歉我失败了,要完成这一步需要精读 kafka 的 group 分配操作通信协议,在充分了解协议的前提下写代码,这件事的精力成本远远高于我的预算了,所以我准备打退堂鼓,好在转机来了,确切的说,是三方包来了。

三方包来了

本文开头是这样的:

用 golang 连接 kafka 做推送或消化操作,一般都会说用 github.com/Shopify/sarama 这个包,但是我在实际使用中发现是有问题的。

言外之意就是还有更好的包呗!确实如此,我在尝试解决这些问题是多次查看了 sarama 的 issues 列表,在 #958 的对话中发现了这样一个包:github.com/bsm/sarama-cluster——救我出水火。

sarama-cluster 对 sarama 的消化功能做了进一步封装,实现了自动更新 offset、自动加入 group、自动分配 partition。注意它不是 sarama 的替代品,而是一个补充品,sarama-cluster 依赖 sarama。它的用法在该仓库的 README 中已经阐述了,封装的很好,用法很简单,这里就不复制粘贴了。

另外还有一个问题是 kafka 的版本,实际测试中(2017年11月测的),sarama-cluster 不支持 0.11 且以上版本,会报协议方面的错误,期待作者跟进。

重申一遍,前提条件是需求必须是:消息没有严格有序性要求,任意消息可以被任意消化程序消化,且消化各消息耗时相近。