当前位置: 首页 > news >正文

沈阳哪里有教做网站的广州网络营销公司

沈阳哪里有教做网站的,广州网络营销公司,wordpress 手机显示图片,做网站经济虚拟币诈骗定罪目录 减少发送mq的消息体内容 增加消费者数量 批量消费消息 临时队列转移 监控和预警机制 分阶段实施 最后还有一个方法就是开启队列的懒加载 这篇文章总结一下自己知道的解决消息积压得方法。 减少发送mq的消息体内容 像我们没有必要知道一个的中间状态,只需…

目录

减少发送mq的消息体内容

增加消费者数量

批量消费消息

临时队列转移

监控和预警机制

分阶段实施

最后还有一个方法就是开启队列的懒加载


这篇文章总结一下自己知道的解决消息积压得方法。

减少发送mq的消息体内容

像我们没有必要知道一个的中间状态,只需知道一个最终状态就可以了。
发送的消息体只用包含:id和状态等关键信息,不用发送一个完整的对象内容。
消费者收到消息之后,通过id调用原服务再将完整的消息对象内容查询出来即可,最后再进行消费处理。

增加消费者数量

采用动态增加消费者的数量

@Configuration
public class RabbitMQConfig {@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 设置并发消费者数量factory.setConcurrentConsumers(5);        // 初始消费者数量factory.setMaxConcurrentConsumers(20);    // 最大消费者数量// 动态调整消费者数量factory.setConsumerTagStrategy(queue -> "consumer-" + UUID.randomUUID());return factory;}
}
@Service
public class ConsumerManagerService {@Autowiredprivate RabbitListenerEndpointRegistry registry;public void adjustConsumerCount(String queueName, int count) {MessageListenerContainer container = registry.getListenerContainer(queueName);if (container instanceof SimpleMessageListenerContainer) {SimpleMessageListenerContainer simpleContainer = (SimpleMessageListenerContainer) container;simpleContainer.setConcurrentConsumers(count);}}
}

批量消费消息

@Service
public class BatchMessageConsumer {@RabbitListener(queues = "myQueue", containerFactory = "batchFactory")public void processBatch(List<Message> messages, Channel channel) {try {// 批量处理消息List<MessageDTO> dtos = messages.stream().map(this::convertToDTO).collect(Collectors.toList());// 批量保存到数据库batchSaveToDatabase(dtos);// 获取最后一条消息的deliveryTaglong lastDeliveryTag = messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag();// 批量确认channel.basicAck(lastDeliveryTag, true);} catch (Exception e) {// 批量拒绝handleBatchError(messages, channel);}}
}// 配置批量消费
@Configuration
public class BatchConsumerConfig {@Beanpublic SimpleRabbitListenerContainerFactory batchFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 启用批量消费factory.setBatchListener(true);// 批量大小factory.setBatchSize(100);// 批量超时时间factory.setReceiveTimeout(1000L);return factory;}
}

临时队列转移

@Service
public class MessageTransferService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void transferMessages(String sourceQueue, String tempQueue, int batchSize) {while (true) {// 从源队列批量获取消息List<Message> messages = new ArrayList<>();for (int i = 0; i < batchSize; i++) {Message message = rabbitTemplate.receive(sourceQueue);if (message == null) break;messages.add(message);}if (messages.isEmpty()) break;// 转移到临时队列messages.forEach(msg -> rabbitTemplate.send(tempQueue, msg));}}
}// 临时队列的消费者
@Component
public class TempQueueConsumer {@RabbitListener(queues = "#{tempQueue.name}")public void processMessage(Message message) {// 使用更高效的处理方式fastProcessMessage(message);}@Beanpublic Queue tempQueue() {return new Queue("temp-queue-" + UUID.randomUUID(), false, false, true);}
}

监控和预警机制

@Service
@Slf4j
public class QueueMonitorService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Scheduled(fixedRate = 60000) // 每分钟执行一次public void monitorQueueSize() {String queueName = "myQueue";// 获取队列信息Properties properties = rabbitTemplate.execute(channel -> channel.queueDeclarePassive(queueName));// 获取消息数量int messageCount = properties.getMessageCount();// 检查消息堆积if (messageCount > threshold) {// 发送告警sendAlert(queueName, messageCount);// 动态调整消费者adjustConsumers(queueName, messageCount);}}private void adjustConsumers(String queueName, int messageCount) {// 根据消息数量动态调整消费者数量int newConsumerCount = calculateConsumerCount(messageCount);consumerManagerService.adjustConsumerCount(queueName, newConsumerCount);}
}

分阶段实施

@Service
public class MessageHandlingStrategy {public void handleMessageBacklog() {// 1. 首先增加消费者数量adjustConsumerCount();// 2. 如果仍然堆积,启用批量处理if (isStillBacklogged()) {enableBatchProcessing();}// 3. 如果问题持续,使用临时队列if (isEmergency()) {transferToTemporaryQueue();}}
}

最后还有一个方法就是开启队列的懒加载

http://www.khdw.cn/news/35599.html

相关文章:

  • 如何利用网站做推广西安seo培训学校
  • 网站设计与开发培训交换友链平台
  • wordpress主题页添加seo外链自动群发工具
  • 百度做销售网站多少钱建网站建设
  • 正规的环保行业网站开发广点通投放平台
  • yy陪玩网站怎么做百度企业推广怎么收费
  • 淘宝网站建设目标免费行情网站大全搜狐网
  • 有自己团队做网站上线多久广安网站seo
  • 邢台网站制作长沙seo运营
  • 济南网站模板嘉兴网站建设
  • wordpress数字链接出现404搜索关键词优化
  • 顺义哪有做网站厂家谷歌seo推广培训班
  • 网站建设项目策划书格式朋友圈广告推广文字
  • ps做景观有哪些素材网站广东网站营销seo方案
  • 荔湾做网站石家庄网站建设培训
  • wordpress简洁移动主题百度seo搜索引擎优化培训
  • 幼儿园网站建设实践研究网店推广的作用是
  • python做网站源码互联网网络推广
  • 网站会过期吗北京网络营销推广公司
  • 站外推广营销方案网站建设公司seo关键词
  • 龙岗网站开发中国去中心化搜索引擎
  • 做网销好的网站如何制作网页链接教程
  • 大型网站服务器配置seo短视频入口引流
  • 上海制作网站公司哪家好最新军事报道
  • dw 怎么做钓鱼网站网页搜索快捷键
  • 是想建个网站 用本地做服务器世界杯大数据
  • 南昌营销型网站建设什么是电商?电商怎么做
  • 网站移动排名建站模板哪个好
  • 趴比库的网站是谁建设的重庆seo和网络推广
  • php网站后台教程800元做小程序网站