golang封装调用kafka的工具包

封装一个golang调用kafka的工具包,包含了consumer,producer,auth,在自己的生产环境上做过验证。可以做参考作用,也可以直接使用。

部分代码

// Run 执行消费动作
func (cg *ConsumerGroup) Run(ctx context.Context) {
	defer cg.close()
	for {
		select {
		case err := <-cg.consumer.Errors():
			cg.logger.WithError(err).Errorln("Error channel")
			cg.handleConsumeError(err)
		case <-ctx.Done():
			err := ctx.Err()
			if err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
				// 正常退出
				return
			}
			cg.logger.WithError(err).Errorln("上下文异常退出")
		default:
			if err := cg.consumer.Consume(ctx, cg.options.topics, cg.options); err != nil {
				cg.logger.WithError(err).Errorln("Consume Error channel")
				cg.handleConsumeError(err)
			}
		}
	}
}
func (kc *KafkaClient) CreateTopic(ctx context.Context, topicName string, ops ...TopicOption) (err error) {
	topicConf := &sarama.TopicDetail{
		NumPartitions:     3,
		ReplicationFactor: 1,
		ConfigEntries: map[string]*string{
			"cleanup.policy": &TopicTTLPolicy,
			"retention.ms":   &TopicTTLRetention,
		},
	}
	for _, op := range ops {
		op(topicConf)
	}
	return kc.cli.CreateTopic(topicName, topicConf, false)
}

代码太多,全部写出来不现实,详细代码在这个下载包里,有test文件可以查看是如何调用的

另外需要自己处理消费时的速度,比如用channel控制同时消费的数量
golang调用kafka下载

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/581070.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

每日一题:地下城游戏

恶魔们抓住了公主并将她关在了地下城 dungeon 的 右下角 。地下城是由 m x n 个房间组成的二维网格。我们英勇的骑士最初被安置在 左上角 的房间里&#xff0c;他必须穿过地下城并通过对抗恶魔来拯救公主。 骑士的初始健康点数为一个正整数。如果他的健康点数在某一时刻降至 0…

ThreeJs 环境配置及遇到问题的解决方法

一、环境搭建 ThreeJs在实际在实际使用中更多的是结合框架开发例如&#xff1a;vue框架、react框架&#xff0c;在使用时需要配置开发环境&#xff0c;本文使用的是vscode ThreeJs NodeJs vue 1、ThreeJs安装 下载路径&#xff1a;GitHub - mrdoob/three.js: JavaScript…

CentOS命令大全:掌握关键命令及其精妙用法!

CentOS是一种流行的开源企业级Linux发行版&#xff0c;它基于Red Hat Enterprise Linux (RHEL)的源代码构建。对于系统管理员和运维工程师来说&#xff0c;掌握CentOS的常用命令至关重要。 这些命令不仅可以帮助管理服务器&#xff0c;还可以进行故障排查、性能监控和安全加固等…

【idea】idea 中 git 分支多个提交合并一个提交到新的分支

一、方法原理讲解 我们在 dev 分支对不同的代码文件做了多次提交。现在我们想要把这些提交都合并到 test 分支。首先我们要明白四个 git 操作&#xff0c; commit&#xff1a;命令用于将你的代码变更保存到本地代码仓库中&#xff0c;它创建了一个新的提交&#xff08;commit…

Ubuntu编译安装MariaDB并进行初始化配置

Ubuntu编译安装MariaDB并进行初始化配置 1. 编译安装MariaDB2. 配置MariaDB3. Docker安装MariaDB 1. 编译安装MariaDB MariaDB官方安装文档&#xff1a;https://mariadb.com/kb/en/Build_Environment_Setup_for_Linux/    下载MariaDB源码&#xff1a;https://mariadb.org/ma…

Spring Boot 如何实现缓存预热

Spring Boot 实现缓存预热 1、使用启动监听事件实现缓存预热。2、使用 PostConstruct 注解实现缓存预热。3、使用 CommandLineRunner 或 ApplicationRunner 实现缓存预热。4、通过实现 InitializingBean 接口&#xff0c;并重写 afterPropertiesSet 方法实现缓存预热。 1、使用…

TCP-模拟BS架构通信

简介 bs是通过浏览器进行访问的每次访问都会开启一个短期的socket用来访问服务器的资源 响应报文的格式 服务端 bs架构中的b是浏览器&#xff0c;不需要我们书写&#xff0c;我们只需要书写服务端即可 服务端 public class Server {public static void main(String[] args) {S…

[C++]22:C++11_part_one

C11 一.列表初始化&#xff1a;1.{}初始化&#xff1a;2.C11扩大了列表初始化的范围&#xff1a;3.std::initializer_list1.简单类型的列表初始化&#xff1a;2.复杂类型的列表初始化3.实现vector的列表初始化4.实现list的列表初始化&#xff1a;5.不支持列表初始化&#xff1a…

制作一个RISC-V的操作系统十六-系统调用

文章目录 用户态和内核态mstatus设置模式切换核心流程封装代码背景解释代码示例解析解释目的 用户态和内核态 mstatus设置 此时UIE设置为1和MPIE为1&#xff0c;MPP设置为0 代表当前权限允许UIE中断发生&#xff0c;并且在第一个mret后将权限恢复为用户态&#xff0c;同时MIE也…

易错知识点(学习过程中不断记录)

快捷键专区&#xff1a; 注释&#xff1a;ctrl/ ctrlshift/ 保存&#xff1a;ctrls 调试&#xff1a; 知识点专区&#xff1a; 1基本数据类型 基本数据类型有四类&#xff1a;整型、浮点型、字符型、布尔型&#xff08;Boolean&#xff09;&#xff0c; 分为八种&#xff…

UE5 GAS开发P40 周期性效果,持续治疗

Periodic Gameplay Effects周期性的游戏效果 它们在一段时间内以固定的间隔重复应用到目标上。这种效果通常用于表示持续性伤害、治疗或其他影响&#xff0c;例如中毒、灼烧或回复效果。 修改GE_CrystalHeal,在Period改为每0.1秒执行一次 假如同时有三个持续时间在进行,那么这…

STM32与OLED显示屏通信(四针脚和七阵脚)

系列文章目录 STM32单片机系列专栏 C语言术语和结构总结专栏 文章目录 1. 单片机调试 2. OLED简介 3. 接线 4. OLED驱动函数 4.1 四针脚版本 OLED.c OLED.h OLED_Font.h 4.2 七针脚版本 引脚连接 OLED.c OLED.h OLED_Font.h 5. 主函数 工程文件模板 1. 单片机…

linux下安装deepspeed

安装步骤 一开始安装deepspeed不可以使用pip直接进行安装。 这时我们需要利用git进行clone下载到本地&#xff1a; git clone https://github.com/microsoft/DeepSpeed.git 进入到deepspeed的安装目录下 cd /home/bingxing2/ailab/group/ai4agr/wzf/Tools/DeepSpeed 激活…

verilog 从入门到看得懂---matlab 自动生成verilog

matlab 的强大不用多说&#xff0c;以前经常用simulink 生成c&#xff0c;最近尝试用simulink进行了verilog的生成&#xff0c;方法也很简单。 一个简单的示例如下。 1&#xff0c;新建一个模型文件&#xff0c;并且根据需要进行模型搭建 2.配置HDL生成模块 3.点击 generation…

纯血鸿蒙APP实战开发——全局状态保留能力弹窗

全局状态保留能力弹窗 介绍 全局状态保留能力弹窗一种很常见的能力&#xff0c;能够保持状态&#xff0c;且支持全局控制显隐状态以及自定义布局。使用效果参考评论组件 效果图预览 使用说明 使用案例参考短视频案例 首先程序入口页对全局弹窗初始化&#xff0c;使用Globa…

Linux学习之路 -- 进程篇 -- 自定义shell的编写

前面介绍了进程程序替换的相关知识&#xff0c;接下来&#xff0c;我将介绍如何基于前面的知识&#xff0c;编写一个简单的shell&#xff0c;另外本文的所展示的shell可能仅供参考。 目录 <1>获取用户的输入和打印命令行提示符 <2>切割字符串 <3>执行这个…

qt-C++笔记之滑动条QSlider和QProgressBar进度条

qt-C笔记之滑动条QSlider和QProgressBar进度条 —— 2024-04-28 杭州 本例来自《Qt6 C开发指南》 文章目录 qt-C笔记之滑动条QSlider和QProgressBar进度条1.运行2.阅读笔记3.文件结构4.samp4_06.pro5.main.cpp6.widget.h7.widget.cpp8.widget.ui 1.运行 2.阅读笔记 3.文件结构…

智慧供热一站式热网平衡多功能集成系统

供热管理地域分散的现实&#xff0c;决定必须采用先进技术手段开发软件系统&#xff0c;使各管理单位互联互通。在多年技术积累的基础上&#xff0c;公司采用目前成熟而且领先的技术架构&#xff0c;研发了适用于多个组织机构集中式管理的供热管理软件。使管理在技术上不再受地…

经典的目标检测算法有哪些?

一、经典的目标检测算法有哪些&#xff1f; 目标检测算法根据其处理流程可以分为两大类&#xff1a;One-Stage&#xff08;单阶段&#xff09;算法和Two-Stage&#xff08;两阶段&#xff09;算法。以下是一些经典的目标检测算法&#xff1a; 单阶段算法: YOLO (You Only Loo…

Java集合框架-Collection-queue

目录 一、Deque二、ArrayDequeArrayDeque层次结构图ArrayDeque概述ArrayDeque底层数据结构ArrayDeque常用方法(简略) 三、PriorityQueuePriorityQueue层次结构图PriorityQueue概述PriorityQueue 底层数据结构PriorityQueue常用方法(详细) Java里有一个叫做Stack的类&#xff0c…