当前位置:首页 > 大杂烩 > 正文内容

go rabbitmq 使用教程 ,go rabbitmq 简单队列,go rabbitmq work模式,go rabbitmq 订阅模式

高老师3年前 (2021-12-28)大杂烩2125

使用Go的过程记录了全部的rabbitmq的go代码,方便自己下次Copy,go的资料比较少,seo估计很好做,流量速度过来。

【一】.简单队列.生产者将消息发送到队列,消费者从队列中获取消息。

1.0.connection code

func NewRabbitMQ() *amqp.Channel {
	// 获取connection
	amqUrl := "amqp://admin:elecfans@spiderqueue.elecfans.net:5672/"
	connection, err := amqp.Dial(amqUrl)
	if err != nil {
		panic(fmt.Sprintf("获取connection异常:%s\n", err))
	}

	// 获取channel
	channel, err := connection.Channel()
	if err != nil {
		panic(fmt.Sprintf("获取channel异常:%s\n", err))
	}

	return channel
}

1.1.client code:

// 生产_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 生产_声明队列(不存在自动创建)
queueName := "ic_order_active"
_, err := channel.QueueDeclare(
    // 队列名称
    queueName,
    // 是否持久化
    false,
    // 是否自动删除
    false,
    // 是否具有排他性
    false,
    // 是否阻塞处理
    false,
    // 额外属性
    nil,
)
if err != nil {
    fmt.Printf("声明队列异常:%s", err)
    return
}

// 生产_发送消息到队列
message := "ic元器件活动来新单啦"
err = channel.Publish(
    // 交换机
    "",
    // 队列名称
    queueName,
    // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃
    false,
    // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列
    false,
    amqp.Publishing{
        ContentType: "text/plain",
        // 队列和消息同时设置持久化
        DeliveryMode: 2,
        Body:         []byte(message),
    },
)
if err != nil {
    fmt.Printf("发送消息到队列异常:%s", err)
    return
}

1.2.service code

// 消费_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 消费_声明队列
queueName := "ic_order_active"
_, err := channel.QueueDeclare(
    // 队列名称
    queueName,
    // 是否持久化
    false,
    // 是否自动删除
    false,
    // 是否具有排他性
    false,
    // 是否阻塞处理
    false,
    // 额外属性
    nil,
)
if err != nil {
    fmt.Printf("声明队列异常:%s", err)
    return
}

// 消费_获取队列中的消息
message, err := channel.Consume(
    // 队列名称
    queueName,
    // 消费者名称
    "ic订单消费者",
    // 是否自动ack
    false,
    // 是否排他性队列标识
    false,
    false,
    false,
    nil,
)
if err != nil {
    return
}

// 输出消息
for msg := range message {
    // 打印消息内容
    fmt.Printf("收到队列消息%s \n", msg.Body)
    // 确认收到消息
    msg.Ack(true)
}

【二】.Work模式.一个生产者,多个消费者,一个消息只能被一个消费者获取到

2.0.client code

// 生产_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 生产_声明队列(不存在自动创建)
queueName := "ic_order_active"
_, err := channel.QueueDeclare(
    // 队列名称
    queueName,
    // 是否持久化
    false,
    // 是否自动删除
    false,
    // 是否具有排他性
    false,
    // 是否阻塞处理
    false,
    // 额外属性
    nil,
)
if err != nil {
    fmt.Printf("声明队列异常:%s", err)
    return
}

// 生产_发送消息到队列
message := "ic元器件活动来新单啦,订单id"
messageSize := 10
for i := 0; i < messageSize; i++ {
    // 方便观察消费者
    time.Sleep(time.Second * 1)
    err = channel.Publish(
        // 交换机
        "",
        // 队列名称
        queueName,
        // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃
        false,
        // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            // 队列和消息同时设置持久化
            DeliveryMode: 2,
            Body:         []byte(message + strconv.Itoa(i)),
        },
    )
    if err != nil {
        fmt.Printf("发送消息到队列异常:%s", err)
        return
    }
}

2.1.service code

// 消费_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 消费_声明队列
queueName := "ic_order_active"
_, err := channel.QueueDeclare(
    // 队列名称
    queueName,
    // 是否持久化
    false,
    // 是否自动删除
    false,
    // 是否具有排他性
    false,
    // 是否阻塞处理
    false,
    // 额外属性
    nil,
)
if err != nil {
    fmt.Printf("声明队列异常:%s", err)
    return
}

// 设置同一时间服务器只会发送一条消息给消费者
channel.Qos(
    // 每次获取多少条
    10,
    // 预加载数量(rabbitMq不支持)
    0,
    // false->对当前队列可用 true->对channel可用(rabbitMq不支持)
    false,
)

// 消费_获取队列中的消息
message, err := channel.Consume(
    // 队列名称
    queueName,
    // 消费者名称
    "ic订单消费者",
    // 是否自动ack
    false,
    // 是否排他性队列标识
    false,
    false,
    false,
    nil,
)
if err != nil {
    return
}

// 输出消息
for msg := range message {
    // 打印消息内容
    fmt.Printf("收到队列消息%s \n", msg.Body)
    // 确认收到消息
    msg.Ack(true)
}

【三】.订阅模式(fanout).

一个生产者,多个消费者

每个消费者拥有自己的队列

生产者将消息发送到交换机

每个队列自己去绑定交换机

(交换机没有储存能力,发送到没有任何队列绑定的交换机则消息丢失)

3.0.client code

// 生产_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 生产_声明交换机
exchangeName := "notice"
err := channel.ExchangeDeclare(
    // 交换机名称
    exchangeName,
    // 交换机类型
    "fanout",
    // 持久化
    true,
    // true->当所有绑定都与交换器解绑后,会自动删除此交换器
    false,
    // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器
    false,
    // 是否非阻塞
    false,
    // 其他参数
    nil,
)
if err != nil {
    fmt.Printf("声明交换机异常:%s", err)
    return
}

// 生产_发送消息到交换机
message := "最新消息,华秋全场元器件3折起"
messageSize := 10
for i := 0; i < messageSize; i++ {
    // 方便观察消费者
    time.Sleep(time.Second * 1)
    err = channel.Publish(
        // 交换机
        exchangeName,
        // 路由key
        "",
        // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃
        false,
        // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            // 队列和消息同时设置持久化
            DeliveryMode: 2,
            Body:         []byte(message + strconv.Itoa(i)),
        },
    )
    if err != nil {
        fmt.Printf("发送消息到队列异常:%s", err)
        return
    }
}

【四】.直接匹配(direct)

4.0.client code

// 生产_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 生产_声明交换机
exchangeName := "pcb_layout_order"
err := channel.ExchangeDeclare(
    // 交换机名称
    exchangeName,
    // 交换机类型
    "direct",
    // 持久化
    true,
    // true->当所有绑定都与交换器解绑后,会自动删除此交换器
    false,
    // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器
    false,
    // 是否非阻塞
    false,
    // 其他参数
    nil,
)
if err != nil {
    fmt.Printf("声明交换机异常:%s", err)
    return
}

// 生产_发送消息到交换机
allRouteKey := []string{
    "order_insert", // 新增订单
    "order_delete", // 删除订单
}

// 循环发送到两个路由key
message := "订单id1事件"
for _, routeKey := range allRouteKey {
    err = channel.Publish(
        // 交换机
        exchangeName,
        // 路由key
        routeKey,
        // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃
        false,
        // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            // 队列和消息同时设置持久化
            DeliveryMode: 2,
            Body:         []byte(message),
        },
    )
}

4.1.service code

// 消费_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 消费_声明队列
queueName := "notice_queue"
_, err := channel.QueueDeclare(
    // 队列名称
    queueName,
    // 是否持久化
    false,
    // 是否自动删除
    false,
    // 是否具有排他性
    false,
    // 是否阻塞处理
    false,
    // 额外属性
    nil,
)
if err != nil {
    fmt.Printf("声明队列异常:%s", err)
    return
}

// 队列绑定交换机+绑定订单新增key
exchangeName := "pcb_layout_order"
allRouteKey := []string{
    "order_insert", // 新增订单
    "order_delete", // 删除订单
}
for _, routeKey := range allRouteKey {
    channel.QueueBind(
        // 队列名称
        queueName,
        // 绑定的键
        routeKey,
        // 交换机名称
        exchangeName,
        // 是否阻塞处理
        false,
        // 其他参数
        nil,
    )
}

// 设置同一时间服务器只会发送一条消息给消费者
channel.Qos(
    // 每次获取多少条
    10,
    // 预加载数量(rabbitMq不支持)
    0,
    // false->对当前队列可用 true->对channel可用(rabbitMq不支持)
    false,
)

// 消费_获取队列中的消息
message, err := channel.Consume(
    // 队列名称
    queueName,
    // 消费者名称
    "ic订单消费者",
    // 是否自动ack
    false,
    // 是否排他性队列标识
    false,
    false,
    false,
    nil,
)
if err != nil {
    return
}
// 输出消息
for msg := range message {
    // 打印消息内容
    fmt.Printf("收到队列消息%s \n", msg.Body)
    // 确认收到消息
    msg.Ack(true)
}

【五】.直接匹配(topic)

topic同样根据key匹配到队列,#匹配一个或者多个,*匹配一个.(切记:发往topic交换器的routing_key它必须是.分隔的几个词)

5.0.client code

// 生产_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 生产_声明交换机
exchangeName := "smt_steel_order"
err := channel.ExchangeDeclare(
    // 交换机名称
    exchangeName,
    // 交换机类型
    "topic",
    // 持久化
    true,
    // true->当所有绑定都与交换器解绑后,会自动删除此交换器
    false,
    // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器
    false,
    // 是否非阻塞
    false,
    // 其他参数
    nil,
)
if err != nil {
    fmt.Printf("声明交换机异常:%s", err)
    return
}

// 生产_发送消息到交换机
allRouteKey := []string{
    "order.insert", // 新增订单
    "order.delete", // 删除订单
}
for _, routeKey := range allRouteKey {
    //fmt.Print(routeKey)
    message := "来自" + routeKey + "的消息"
    err = channel.Publish(
        // 交换机
        exchangeName,
        // 路由key
        routeKey,
        // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃
        true,
        // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            // 队列和消息同时设置持久化
            DeliveryMode: 2,
            Body:         []byte(message),
        },
    )
}

5.1.service code

// 消费_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 消费_声明队列
queueName := "notice_queue"
_, err := channel.QueueDeclare(
    // 队列名称
    queueName,
    // 是否持久化
    false,
    // 是否自动删除
    false,
    // 是否具有排他性
    false,
    // 是否阻塞处理
    false,
    // 额外属性
    nil,
)
if err != nil {
    fmt.Printf("声明队列异常:%s", err)
    return
}

// 队列绑定交换机+绑定订单新增key
exchangeName := "smt_steel_order"
routeKey := "order.#"
channel.QueueBind(
    // 队列名称
    queueName,
    // 绑定的路由
    routeKey,
    // 交换机名称
    exchangeName,
    // 是否阻塞处理
    false,
    // 其他参数
    nil,
)

// 设置同一时间服务器只会发送一条消息给消费者
channel.Qos(
    // 每次获取多少条
    10,
    // 预加载数量(rabbitMq不支持)
    0,
    // false->对当前队列可用 true->对channel可用(rabbitMq不支持)
    false,
)

// 消费_获取队列中的消息
message, err := channel.Consume(
    // 队列名称
    queueName,
    // 消费者名称
    "smt订单消费者",
    // 是否自动ack
    false,
    // 是否排他性队列标识
    false,
    false,
    false,
    nil,
)
if err != nil {
    return
}

// 输出消息
for msg := range message {
    // 打印消息内容
    fmt.Printf("收到队列消息%s \n", msg.Body)
    // 确认收到消息
    msg.Ack(true)
}

扫描二维码推送至手机访问。

版权声明:本文由高久峰个人博客发布,如需转载请注明出处。

本文链接:http://blog.20230611.cn/post/614.html

分享给朋友:

“go rabbitmq 使用教程 ,go rabbitmq 简单队列,go rabbitmq work模式,go rabbitmq 订阅模式” 的相关文章

c#中string和StringBuilder效率对比

c#中string和StringBuilder效率对比

    c#中string和StringBuilder直接看看执行速度。(2).String类型累计赋值Test               ...

PHP安装mongodb扩展

PHP安装mongodb扩展

在安装之前我们先看看官方给出的依赖关系.首先是dll文件和mongodb软件的依赖关系然后是PHP文件和dll的依赖关系我的是phpstudy的集成环境PHP5.4.45 NTS+Apache+Mysql【一】.安装mongodb3.0软件对比依赖关系下载mongodb3.0.msi软件,完整名称:...

Application的错误使用

Application的错误使用

Application 对象用于存储和访问来自任意页面的变量,类似 Session 对象。不同之处在于所有的用户分享一个 Application 对象,而 session 对象和用户的关系是一一对应的。很多的书籍中介绍的Application对象都喜欢以统计在线人数来介绍Application 对象...

Git从远程仓库更新文件

Git从远程仓库更新文件

 git   pull  https://git.oschina.net/392223903/learn.git   master   换为您的git地址...

Git日志查看和版本切换

Git日志查看和版本切换

日志查看:git log版本切换:方式1:git  reset  --hard  HEAD^   倒退一个版本git  reset  --hard  HEAD^^  倒退两个版本方式2:(版本号的形式,建议版本号码补充完...

c#关闭计算机的代码

c#关闭计算机的代码

    1.关机Process.Start("shutdown", "-s -t 0");    2. 注销  Proc...