背景 因为最近需要开发一个新的exporter,这个exporter就是在每台服务器上运行,exporter读取机器的一些信息然后将信息传输到远程,比如调用某个HTTP POST接口将信息传送给远程的服务器,或者存储到远程的数据库。
目前决定不从零开发exporter,而是在telegraf的基础上二次开发。因为telegraf支持的input插件不包括kafka,所以需要自行开发kafka input插件。
之前公司用的kafka exporter是https://github.com/danielqsj/kafka_exporter,现在公司会用这个exporter读取kafka指标然后暴露metrics API给Prometheus。
现在可以考虑先阅读下https://github.com/danielqsj/kafka_exporter的源码,因为源码包括两个部分:分别是读取kafka的指标,暴露metrics接口给Prometheus以用于Prometheus调用metrics读取到kafka的指标。显然阅读的时候只需要关注读取kafka的指标这一部分即可,因为笔者对kafka的架构不熟悉不知道要读取哪些指标和这些指标应该如何获取。
在了解了https://github.com/danielqsj/kafka_exporter如何读取kafka指标后,可以参考其读取的方法,将读取方法“复制”到telegraf的kafka input插件中。
基本概念 kafka kafka 是由 Apache 软件基金会开发的一个高吞吐量的分布式消息队列系统 。它主要用于处理大规模的实时数据流,例如日志、事件、指标等。Kafka 设计用来解决传统消息中间件面临的问题,如低吞吐量、高延迟和可靠性不足等。
zookeeper 这里主要关注kafka与zookeeper的关系,因为https://github.com/danielqsj/kafka_exporter涉及到了zookeeper。
kafka会用到zookeeper,有以下几个原因:
(1)Kafka集群通过Zookeeper来管理kafka的配置,选举leader;
(2)在Consumer Group发生变化时进行rebalance
(3)所有的topic与broker的对应关系都由zk维护
2.kafka的哪些组件需要注册到zookeeper
(1)Broker注册到zk
(2)Topic注册到zk
(3)Consumer注册到zk
producer(生产者)不注册到zookeeper是因为生产者的状态是瞬间状态,发送完消息就可以宕机了。 3.kafka和zookeeper的理解
kafka使用zookeeper来实现动态的集群扩展,不需要更改客户端(producer和consumer)的配置。 broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新。
但是发送给Topic本身的数据是不会发到Zk上的。
telegraf Telegraf 是一个开源的、插件驱动的服务器代理程序,用于收集、处理和传输指标数据。它是 InfluxData 公司开发的一个组件,主要用于监控和数据收集。Telegraf 的设计目标是轻量级、高效和易于扩展,它可以用于从各种数据源采集指标数据,然后将这些数据传输到不同的目标存储或可视化工具中。
源代码地址为:https://github.com/influxdata/telegraf。
Telegraf最大的特点是支持插件式的二次开发,Telegraf有输入插件、输出插件、处理(processors)插件等多种插件。
Telegraf还可以设置采集数据的周期 ,以固定的时间间隔定期从数据源获取指标数据。
telegraf默认情况下已经实现了对很多对象的指标采集,比如Clickhouse,CPU,Docker等,具体可看:https://docs.influxdata.com/telegraf/v1.14/plugins/plugin-list/ 的Input plugins部分:
kafka exporter源码阅读 因为这个项目使用到了go vendor这种比较老的管理第三方依赖库的机制,所以笔者先删掉了vendor模块该用go mod模块管理系统作为第三方依赖库的管理工具。
这个项目主要使用的三方库有:
https://github.com/prometheus/client_golang:这是普罗米修斯的官方Go客户端。
https://github.com/IBM/sarama:这个是Apache Kafka的官方Go客户端。
https://github.com/krallistic/kazoo-go:Kazoo 是一个与 Zookeeper 中的 Kafka 元数据交互的库。 它提供集群代理、主题元数据和消费者组的发现。
这个项目的目录结构比较简单,主要逻辑就是在kafka_exporter.go
文件里。下面依次介绍一些比较关键的代码段:
1 2 3 4 5 6 7 8 9 exporter, err := NewExporter(opts, topicFilter, topicExclude, groupFilter, groupExclude)if err != nil { klog.Fatalln(err) }defer exporter.client.Close() prometheus.MustRegister(exporter)
prometheus.MustRegister接收的是一个interface,定义如下:
1 2 3 4 type Collector interface { Describe(chan <- *Desc) Collect(chan <- Metric) }
首先查看NewExporter返回的Exporter struct是如何实现Collect方法的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (e *Exporter) Collect(ch chan <- prometheus.Metric) { if e.allowConcurrent { e.collect(ch) return } e.sgMutex.Lock() e.sgChans = append (e.sgChans, ch) fmt.Println(".....len(e.sgChans)......." , len (e.sgChans)) if len (e.sgChans) == 1 { e.sgWaitCh = make (chan struct {}) go e.collectChans(e.sgWaitCh) } else { klog.V(TRACE).Info("concurrent calls detected, waiting for first to finish" ) } waiter := e.sgWaitCh e.sgMutex.Unlock() <-waiter }
关注collect方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 func (e *Exporter) collect(ch chan <- prometheus.Metric) { fmt.Println("......... collect(ch chan<- prometheus.Metric) {..." ) var wg = sync.WaitGroup{} ch <- prometheus.MustNewConstMetric( clusterBrokers, prometheus.GaugeValue, float64 (len (e.client.Brokers())), ) for _, b := range e.client.Brokers() { ch <- prometheus.MustNewConstMetric( clusterBrokerInfo, prometheus.GaugeValue, 1 , strconv.Itoa(int (b.ID())), b.Addr(), ) } ... topics, err := e.client.Topics() if err != nil { klog.Errorf("Cannot get topics: %v" , err) return } ... getTopicMetrics := func (topic string ) { defer wg.Done() if !e.topicFilter.MatchString(topic) || e.topicExclude.MatchString(topic) { return } partitions, err := e.client.Partitions(topic) if err != nil { klog.Errorf("Cannot get partitions of topic %s: %v" , topic, err) return } ch <- prometheus.MustNewConstMetric( topicPartitions, prometheus.GaugeValue, float64 (len (partitions)), topic, ) ...
collect的逻辑就是不断往ch里写不同类型的普罗米修斯指标,在allowConcurrent的情况下,这个ch实际上就是func (e *Exporter) Collect(ch chan<- prometheus.Metric)
的参数ch。
总的逻辑还是挺简单的,代码的主要实现逻辑就是通过Kafka的Go客户端连接到Kafka集群后,获取到kafka信息,再将信息通过类似prometheus.MustNewConstMetric
这样的新建普罗米修斯指标的信息来将信息补充到指标中。kafka信息包括有多少个Topic,Broker,分区等等。
要实现telegraf的input插件,需要实现Gather接口,可以参考官方文档 :
在custom-telegraf/plugins/inputs新建目录,如simple;
在simple目录中新建simple.go文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package a_simpleimport ( _ "embed" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" )var sampleConfig string type Simple struct { Ok bool `toml:"ok"` Log telegraf.Logger `toml:"-"` }func (*Simple) SampleConfig() string { return sampleConfig }func (s *Simple) Init() error { return nil }func (s *Simple) Gather(acc telegraf.Accumulator) error { if s.Ok { acc.AddCounter("state" , map [string ]interface {}{"value0" : 12 }, map [string ]string {}) acc.AddCounter("state" , map [string ]interface {}{"value" : 2 }, map [string ]string {}, time.Now()) acc.AddCounter("state" , map [string ]interface {}{"value1" : 2 , "value2" : 2 }, map [string ]string {}, time.Now()) acc.AddCounter("state" , map [string ]interface {}{"value3" : 3 , "value4" : 2 }, map [string ]string {"filed" : "value" }, time.Now()) } else { acc.AddFields("state" , map [string ]interface {}{"value" : "not great" }, nil ) } return nil }func init () { inputs.Add("simple" , func () telegraf.Input { return &Simple{} }) }
toml:"ok"
后缀表示需要名为ok的配置字段名。
在simple目录中新建对应的配置文件sample.conf:
1 2 [[inputs.simple]] ok = true
在simple目录中新建README.md,这个文件可以为空。
新建custom-telegraf/plugins/inputs/all/simple.go文件:
1 2 3 4 5 package allimport _ "github.com/influxdata/telegraf/plugins/inputs/a_simple"
至此修改完毕,下面在运行时使用的配置文件config.conf新增如下内容:
1 2 [[inputs.simple]] ok = true
执行:
1 2 cd cmd /telegraf go run . --config ../../config.conf --test --input-filter simple
运行结果为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 go run . --config ../../z_tele.conf --test --input-filter simple 2023-08-16T10:09:41Z I! Loading config: ../../config.conf 2023-08-16T10:09:41Z I! Starting Telegraf unknown 2023-08-16T10:09:41Z I! Available plugins: 242 inputs, 9 aggregators, 28 processors, 23 parsers, 59 outputs, 4 secret-stores 2023-08-16T10:09:41Z I! Loaded inputs: simple (2x) 2023-08-16T10:09:41Z I! Loaded aggregators: 2023-08-16T10:09:41Z I! Loaded processors: 2023-08-16T10:09:41Z I! Loaded secretstores: 2023-08-16T10:09:41Z W! Outputs are not used in testing mode! 2023-08-16T10:09:41Z I! Tags enabled: host=Rhetts-MacBook-Pro.local> state,host=Rhetts-MacBook-Pro.local value0=12i 1692180582000000000 > state,host=Rhetts-MacBook-Pro.local value0=12i 1692180582000000000 > state,host=Rhetts-MacBook-Pro.local value=2i 1692180582000000000 > state,host=Rhetts-MacBook-Pro.local value=2i 1692180582000000000 > state,host=Rhetts-MacBook-Pro.local value1=2i,value2=2i 1692180582000000000 > state,host=Rhetts-MacBook-Pro.local value1=2i,value2=2i 1692180582000000000 > state,filed=value,host=Rhetts-MacBook-Pro.local value3=3i,value4=2i 1692180582000000000 > state,filed=value,host=Rhetts-MacBook-Pro.local value3=3i,value4=2i 1692180582000000000
这样打印出来了新增的一些指标。
主要就是参考func (e *Exporter) collect(ch chan<- prometheus.Metric)
函数的实现,使用的实现指标的方法为 acc.AddCounter(matches[1], tm.Fields(), tags, tm.Time())
。