介绍
本篇文章为大家展示了利用golang怎么对卡夫卡进行连接,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
1。首先初始化配置配置把卡夫卡和ES的地址配置好还有一个日志方便查看
配置信息如下用到的库是
github.com/astaxie/beego/config“刷:平原;” (日志) log_level =,调试 log_path =啊?logs/log_transfer.log" (卡夫卡) server_addr =192.168.0.134:9092 topic =nginx_log [ES] 时间=addr http://192.168.0.134:9200
2。读取之配置存取进结构体
type LogConfig struct { kafkaAddr 才能;字符串 ESAddr 才能;字符串 LogPath 才能;字符串 LogLevel 才能;字符串 Topic 才能;字符串 } var ( ,,logConfig * LogConfig )
3。读取之配置代码如下
func initConfig (filename  conftype 字符串,字符串)(err 错误),{ ,,,err 相依:=,配置只NewConfig (conftype文件名) ,,if err  !=, nil { ,,,fmt只Println (,“new  config faild,犯错:“,犯错) ,,,,回来 ,,} ,,logConfig =,, LogConfig {} ,,logConfig.LogLevel =,参看只字符串(,“日志::log_level") ,,if len (logConfig.LogLevel),==, 0, { ,,,,logConfig.LogLevel =,“debug" ,,} ,,logConfig.LogPath =,参看只字符串(,“日志::log_path") ,,if len (logConfig.LogPath),==, 0, { ,,,,logConfig.LogPath =,“。/logs" ,,} ,,logConfig.kafkaAddr =,参看只字符串(,“卡夫卡::server_addr") ,,if len (logConfig.kafkaAddr),==, 0, { ,,,,err =, fmt只Errorf (,“invalid  kafka addr err") ,,,,回来 ,,} ,,logConfig.ESAddr =,参看只字符串(,“ES:: addr") ,,if len (logConfig.ESAddr),==, 0, { ,,,,err =, fmt只Errorf (,“invalid  ES addr err") ,,,,回来 ,,} ,,logConfig.Topic =,参看只字符串(,“卡夫卡::topic") ,,if len (logConfig.Topic),==, 0, { ,,,,err =, fmt只Errorf (,“invalid  topic addr err") ,,,,回来 ,,} ,才能回来 }
4。完成了initConfig的初始化
5。初始化initLogger
func convertLogLevel (level 字符串),int { ,,开关(高度),{ ,,,,case “debug": ,,,,,,return logs.LevelDebug ,,,,case “warn": ,,,,,,return logs.LevelWarn ,,,,case “info": ,,,,,,return logs.LevelInfo ,,,,case “trace": ,,,,,,return logs.LevelTrace ,,} ,,return logs.LevelDebug } 字符串,func initLogger (logpath logLevel 字符串),(err 错误),{ ,,config :=,(,地图(,字符串)接口{}) 配置才能[,“filename"],=logpath 配置才能[,“level"],=, convertLogLevel (logLevel) ,,,err configStr:=, json只元帅(配置) ,,if 犯错!=,nil { ,,,fmt只Println (,“marshal 失败了,犯错:“,犯错) ,,,,回来 ,,} 日志才能只SetLogger (logs.AdapterFile,字符串(configStr)) ,才能回来 }
6。初始化卡夫卡
type KafkaClient struct { client sarama.Consumer才能 addr 才能;字符串 topic 才能;字符串 wg sync.WaitGroup才能 } var ( ,,kafkaClient * KafkaClient ) func initKafKa (topic  addr 字符串,字符串)(err 错误),{ ,,kafkaClient =,, KafkaClient {} ,,,消费者,err :=, sarama只NewConsumer(字符串只分裂(addr,,,,,),, nil) ,,if err  !=, nil { ,,,日志只错误(,“Failed 用strat consumer :“,犯错) ,,,,return ,,} null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null利用golang怎么对卡夫卡进行连接