利用golang怎么对卡夫卡进行连接

  介绍

本篇文章为大家展示了利用golang怎么对卡夫卡进行连接,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

1。首先初始化配置配置把卡夫卡和ES的地址配置好还有一个日志方便查看

配置信息如下用到的库是

github.com/astaxie/beego/config“刷:平原;”   (日志)   log_level =,调试   log_path =啊?logs/log_transfer.log"   (卡夫卡)   server_addr =192.168.0.134:9092   topic =nginx_log   [ES]   时间=addr  http://192.168.0.134:9200

2。读取之配置存取进结构体

type  LogConfig  struct  {   kafkaAddr 才能;字符串   ESAddr 才能;字符串   LogPath 才能;字符串   LogLevel 才能;字符串   Topic 才能;字符串   }   var  (   ,,logConfig  * LogConfig   )

3。读取之配置代码如下

func  initConfig (filename  conftype 字符串,字符串)(err 错误),{   ,,,err 相依:=,配置只NewConfig (conftype文件名)   ,,if  err  !=, nil  {   ,,,fmt只Println (,“new  config  faild,犯错:“,犯错)   ,,,,回来   ,,}   ,,logConfig =,, LogConfig {}   ,,logConfig.LogLevel =,参看只字符串(,“日志::log_level")   ,,if  len (logConfig.LogLevel),==, 0, {   ,,,,logConfig.LogLevel =,“debug"   ,,}   ,,logConfig.LogPath =,参看只字符串(,“日志::log_path")   ,,if  len (logConfig.LogPath),==, 0, {   ,,,,logConfig.LogPath =,“。/logs"   ,,}   ,,logConfig.kafkaAddr =,参看只字符串(,“卡夫卡::server_addr")   ,,if  len (logConfig.kafkaAddr),==, 0, {   ,,,,err =, fmt只Errorf (,“invalid  kafka  addr  err")   ,,,,回来   ,,}   ,,logConfig.ESAddr =,参看只字符串(,“ES:: addr")   ,,if  len (logConfig.ESAddr),==, 0, {   ,,,,err =, fmt只Errorf (,“invalid  ES  addr  err")   ,,,,回来   ,,}   ,,logConfig.Topic =,参看只字符串(,“卡夫卡::topic")   ,,if  len (logConfig.Topic),==, 0, {   ,,,,err =, fmt只Errorf (,“invalid  topic  addr  err")   ,,,,回来   ,,}   ,才能回来   }

4。完成了initConfig的初始化

5。初始化initLogger

func  convertLogLevel (level 字符串),int  {   ,,开关(高度),{   ,,,,case “debug":   ,,,,,,return  logs.LevelDebug   ,,,,case “warn":   ,,,,,,return  logs.LevelWarn   ,,,,case “info":   ,,,,,,return  logs.LevelInfo   ,,,,case “trace":   ,,,,,,return  logs.LevelTrace   ,,}   ,,return  logs.LevelDebug   }   字符串,func  initLogger (logpath  logLevel 字符串),(err 错误),{   ,,config :=,(,地图(,字符串)接口{})   配置才能[,“filename"],=logpath   配置才能[,“level"],=, convertLogLevel (logLevel)   ,,,err  configStr:=, json只元帅(配置)   ,,if 犯错!=,nil  {   ,,,fmt只Println (,“marshal 失败了,犯错:“,犯错)   ,,,,回来   ,,}   日志才能只SetLogger (logs.AdapterFile,字符串(configStr))   ,才能回来   }

6。初始化卡夫卡

type  KafkaClient  struct  {   client  sarama.Consumer才能   addr 才能;字符串   topic 才能;字符串   wg  sync.WaitGroup才能   }   var  (   ,,kafkaClient  * KafkaClient   )   func  initKafKa (topic  addr 字符串,字符串)(err 错误),{   ,,kafkaClient =,, KafkaClient {}   ,,,消费者,err :=, sarama只NewConsumer(字符串只分裂(addr,,,,,),, nil)   ,,if  err  !=, nil  {   ,,,日志只错误(,“Failed 用strat  consumer :“,犯错)   ,,,,return    ,,}   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null

利用golang怎么对卡夫卡进行连接