当前位置: 首页 > news >正文

永仁网站建设网站定制的公司

永仁网站建设,网站定制的公司,启东市住房建设局网站,江苏外贸型网站制作目录一、发送消息类型1、同步消息2、异步消息3、单向消息4、顺序消费5、延迟消费二、消费模式1、集群模式2、广播模式3、消费模式扩展4、如何配置三、其他用法1、事务消息2、过滤消息1)Tag过滤2)SQL方式过滤源码放到了GitHub仓库上,地址 http…

目录

  • 一、发送消息类型
    • 1、同步消息
    • 2、异步消息
    • 3、单向消息
    • 4、顺序消费
    • 5、延迟消费
  • 二、消费模式
    • 1、集群模式
    • 2、广播模式
    • 3、消费模式扩展
    • 4、如何配置
  • 三、其他用法
    • 1、事务消息
    • 2、过滤消息
      • 1)Tag过滤
      • 2)SQL方式过滤

源码放到了GitHub仓库上,地址 https://github.com/shengwanping/SpringBoot-RocketMQ/tree/dev_01

一、发送消息类型

1、同步消息

发送同步消息是指producer向 broker发送消息,执行API时同步等待,直到broker服务器返回发送结果

// 可以使用RocketMQTemplate类下面的syncSend方法
SendResult sendResult = rocketMQTemplate.syncSend("topic_001", "Hello RocketMQ 同步消息");
System.out.println(sendResult);

2、异步消息

指producer向broker发送消息时异步执行,不会影响后面逻辑。而异步里面会调用一个回调方法,来处理消息发送成功或失败的逻辑

// 可以使用RocketMQTemplate类下面的asyncSend方法
rocketMQTemplate.asyncSend("topic_001", "Hello RocketMQ 异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("异步消息 发送成功!");}@Overridepublic void onException(Throwable throwable) {System.out.println("异步消息 发送失败!");}});

3、单向消息

是指producer向 broker发送消息,执行API时直接返回,不等待broker 服务器的响应

// 可以使用RocketMQTemplate类下面的sendOneWay方法
rocketMQTemplate.sendOneWay("topic_001", "Hello RocketMQ 单项消息");

4、顺序消费

就是让消费者按照生产者发送消息的顺序去消费。

应用场景:比如电商系统需要实现,订单创建、支付、完成顺序的流程。RocketMQ默认是并发消费,没有顺序的。需要顺序消费需要通过如下配置:

首先消费者@RocketMQMessageListener注解,consumeMode 设置为ConsumeMode.ORDERLY

@RocketMQMessageListener(topic = "topic_001",consumerGroup = "${rocketmq.consumer.group}",consumeMode = ConsumeMode.ORDERLY)

然后生产者调用含有Orderly的方法:

	//                                topic          消息               队列rocketMQTemplate.syncSendOrderly("topic_001", "1001顺序消费-创建", "1001");rocketMQTemplate.syncSendOrderly("topic_001", "1001顺序消费-支付", "1001");rocketMQTemplate.syncSendOrderly("topic_001", "1001顺序消费-完成", "1001");rocketMQTemplate.syncSendOrderly("topic_001", "1002顺序消费-创建", "1002");rocketMQTemplate.syncSendOrderly("topic_001", "1002顺序消费-支付", "1002");rocketMQTemplate.syncSendOrderly("topic_001", "1002顺序消费-完成", "1002");

在消费者打印结果:

1001顺序消费-创建
1001顺序消费-消费
1001顺序消费-完成
1002顺序消费-创建
1002顺序消费-消费
1002顺序消费-完成

5、延迟消费

就是生产者设定延迟时间,时间到了消费者才能去消费

应用场景:一种比较常见的场要就是在电商系统中,订单创建后,会有一个等待用户支付的时间窗口,一般为30分钟,30分钟后customer会收到这条订单满息,然后程序去订单表中检查当前这条订单的支付状态,如果是未支付的状态,则自动清理掉,这样就不需要使用定时任务的方式去处理了,

RocketMQ不能自定义延迟时间,有特定等级如下
延迟等级 0 不延迟,1 延时1s,2 延时5s,3 延时10s,4 延时 30s,以此类推。。。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

// 延迟方法                         topic           消息      默认3秒,没有发送消息会抛出异常   延迟等级
public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel)
// 演示
rocketMQTemplate.syncSend("topic_001", MessageBuilder.withPayload("延迟消费5秒").build(), 3000, 2);
rocketMQTemplate.syncSend("topic_001", MessageBuilder.withPayload("延迟消费30秒").build(), 3000, 4);

二、消费模式

RocketMQ的消费者有两种消费模式:BROADCASTING广播模式,CLUSTERING集群模式,默认集群消费模式。

1、集群模式

理解:

如果这个消费者组都是集群模式,那么这个消费者组会去平分这个topic下面的消息,且一条消息只能被一个消费者消费

举个例子:

生产者给topic_1发送了10条消息,消费topic_1的这个消费者组有2个消费者,那么这两个消费者就会平分这10条消息,每个消费者5条消息。
但是经测试有时也会一个消费者6条,另一个消费者4条。(这个问题笔者占时也不清楚,待解答)

2、广播模式

理解:

如果这个消费者组都是广播模式,那么这个消费者组中的每个消费者都会去执行这个topic下面所有的消息,相当于一条消息会被执行多次

举个例子:

生产者给topic_2发送了10条消息,消费topic_2的这个消费者组有2个消费者,那么这两个消费者都会去消费这10条消息

3、消费模式扩展

生产者给Topic_1推送了10条消息,然后同时有两个消费者组对Topic_1进行消费。

消费者组1 中有两个消费者,分别是消费者A和消费者B,消费者A是集群模式,消费者B是广播模式。
消费者组2 中有一个消费者,是消费者C,消费者C是广播模式。

这个时候是怎么消费的呢?

答案是:消费者C会全量消费10条消息,消费者B也会全量消费10条消息,而消费者A只会消费一半消息(可能4条、5条、6条)

解释:由上可以看出,消费多少消息是由消费者的消费模式决定的。因为B、C都是广播模式,所以会消费这个Topic下面所有消息,而A是集群模式,他只会消费到的消息是
消费消息数量 = Topic中消息总数/消费者组中消费者数量

4、如何配置

在消费者 @RocketMQMessageListener注解中配置messageModel 参数,(没有设置默认集群模式)

设置为MessageModel.CLUSTERING,则是集群模式
设置为MessageModel.BROADCASTING,则是广播模式

如下:

@RocketMQMessageListener(topic = "topic_001",consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.CLUSTERING)

三、其他用法

1、事务消息

Half(Prepare) Message——半消息(预处理消息)

半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。

Message Status Check——消息状态回查

由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit或Rollback)。可以看出, Message Status Check主要用来解决分布式事务中的超时问题。

在这里插入图片描述
1.应用模块遇到要发送事务消息的场景时,先发送prepare消息给MQ。
2. prepare消息发送成功后,应用模块执行数据库事务(本地事务)。
3. 根据数据库事务执行的结果,再返回Commit或Rollback给MQ。
4. 如果是Commit, MQ把消息下发给Consumer端,如果是Rollback,直接删掉prepare消息。
5. 第3步的执行结果如果没响应,或是超时的,启动定时任务回查事务状态(最多重试15次,超过了默认丢弃此消息) ,处理结果同第4步。
6. MQ消费的成功机制由MQ自己保证。

生产者发送事务消息:

rocketMQTemplate.sendMessageInTransaction("topic_001", MessageBuilder.withPayload("事务消息").build(), null);
// rocketmq事务消息  配置类
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {// 执行本地事务,如果是COMMIT则消息发送成功,如果是ROLLBACK则直接丢弃消息,如果是UNKNOWN则调用checkLocalTransaction()try {System.out.println("executeLocalTransaction");}catch (Exception e){e.printStackTrace();return RocketMQLocalTransactionState.UNKNOWN;}return RocketMQLocalTransactionState.COMMIT;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {// 检查本地事务(最多调用15次,如果全部失败则ROLLBACK丢弃消息)System.out.println("checkLocalTransaction");return RocketMQLocalTransactionState.COMMIT;}
}

2、过滤消息

在消费端进行消息消费的时候,我们根据业务需求,可以对消息进行过滤处理需要的消息
尤其是广播模式下,消息过滤经常使用
RocketMQ提供了TAG和SQL表达式两种消息过滤方式

1)Tag过滤

生产者需要在Topic后面加上 冒号 + TAG
消费者需要配置 selectorType = SelectorType.TAGselectorExpression

消费端配置如下:

@Component
// 指定topic 和 消费者组
@RocketMQMessageListener(topic = "topic_001",consumerGroup = "${rocketmq.consumer.group}",selectorType = SelectorType.TAG, selectorExpression = "TAG1 || TAG2")
public class ConsumerMode implements RocketMQListener<String> { // 继承RocketMQListener接口@Overridepublic void onMessage(String s) {System.out.println("收到的消息是:"+s);}
}

生产端发送消息如下:

	rocketMQTemplate.convertAndSend("topic_001"+":"+"TAG1", MessageBuilder.withPayload("TAG1消息").build());rocketMQTemplate.convertAndSend("topic_001"+":"+"TAG2", MessageBuilder.withPayload("TAG2消息").build());rocketMQTemplate.convertAndSend("topic_001"+":"+"TAG3", MessageBuilder.withPayload("TAG3消息").build());

消费端打印:

收到的消息是:{"payload":"TAG1消息","headers":{"id":"3df5f1a5-cbb2-fac5-e95b-489f29bc4a77","timestamp":1678204919985}}
收到的消息是:{"payload":"TAG2消息","headers":{"id":"1112d1cf-e1a9-bc2c-b38d-59a667196385","timestamp":1678204920260}}

2)SQL方式过滤

SQL表达式方式可以根据发送消息时输入的属性进行一些计算。
RocketMQ的SQL表达式语法 只定义了一些基本的语法功能。

数字比较,如>,>=, <,<=,
BETWEEN, =;
字符比较,如:=,<>,IN;
IS NULL or IS NOT NULL;
逻辑运算符:AND, OR, NOT;

常量类型:
数值,如:123,3.1415;
字符,如: ‘abc’,必须使用单引号;
NULL,特殊常量
Boolean, TRUE or FALSE;

首先要在broker配置文件里面加入支持,否则会报错

1、rocketmq-4.4.0\conf\broker.conf 加入enablePropertyFilter = true
2、重启broker 并指定 配置文件 mqbroker.cmd -n localhost:9876 autoCreateToopicEnable=true -c ../conf/broker.conf

完成上面两步操作接口用sql方式过滤消息

生产者:

		Message msg1 = MessageBuilder.withPayload("rocketmq过滤消息测试01").build();Map<String, Object> headers = new HashMap<>();headers.put("name", "xiao ming");headers.put("a", 2) ;rocketMQTemplate.convertAndSend("topic_001", msg1, headers);Message msg2 = MessageBuilder.withPayload("rocketmq过滤消息测试02").build();Map<String, Object> headers1 = new HashMap<>();headers1.put("name", "xiao hua");headers1.put("a", 7) ;rocketMQTemplate.convertAndSend("topic_001", msg2, headers1);

消费者: 主要是selectorType = SelectorType.SQL92, selectorExpression = "name = 'xiao ming' and a < 5")

@Component
// 指定topic 和 消费者组
@RocketMQMessageListener(topic = "topic_001",consumerGroup = "${rocketmq.consumer.group}",
//        selectorType = SelectorType.TAG, selectorExpression = "TAG1 || TAG2")selectorType = SelectorType.SQL92, selectorExpression = "name = 'xiao ming' and a < 5")
public class ConsumerMode implements RocketMQListener<String> { // 继承RocketMQListener接口@Overridepublic void onMessage(String s) {System.out.println("收到的消息是:"+s);}
}

消费者打印:
根据过滤条件只打印了第一条消息

收到的消息是:{"payload":"rocketmq过滤消息测试01","headers":{"id":"da3ac866-4140-b440-2f4c-ecb5ce4d9965","timestamp":1678206096192}}
http://www.khdw.cn/news/49854.html

相关文章:

  • 深圳微信小程序开发网站建设怎样推广自己的商城
  • 做自媒体哪个平台最好seo去哪学
  • 厦门建设局网站首页6北京seo诊断
  • 公关公司是做什么的?长春做网络优化的公司
  • 网站建设公司市场开发方案搜狗推广登录平台官网
  • 日本网站 设计登录百度账号注册
  • 网站对公司的重要性正规职业技能培训机构
  • 建设网站建设费用关键词自动生成器
  • 面试学校网站开发东莞seo外包
  • wordpress板娘插件武汉seo收费
  • 房地产公司如何做网站怎么找一手app推广代理
  • 怎么做企业网站快手作品免费推广软件
  • 一元建站镇江seo
  • 网站页面打开速度慢搜索引擎优化方法与技巧
  • 江苏政府网站建设对比评估营销咨询服务
  • 杭州网站建设教育机构如何提高搜索引擎优化
  • 定制型网站关键词排名点击软件首页
  • 18成年人正能量软件长沙网站seo公司
  • 主机安装wordpress班级优化大师网页版
  • 北京工信部网站备案查询中国搜索引擎有哪些
  • 360门户网站怎样做工业设计公司
  • 网站密钥怎么做最近的新闻有哪些
  • wordpress 资讯官网seo是什么意思
  • 3g下订单的网站怎么做做推广的软件有哪些
  • 薪火相传网站建设刚刚刚刚刚刚刚刚刚刚刚刚刚刚
  • 有哪些外国网站做精油的企业推广软文
  • 如何做360购物网站找精准客户的app
  • 仁怀网站建设不好出手关键词seo排名
  • 网站提现功能开发网站建设服务公司
  • 金融视频直播网站开发b站2023年免费入口