当前位置: 首页 > news >正文

毕节网站建设兼职20条优化措施

毕节网站建设兼职,20条优化措施,界面设计怎么写,展厅设计上海kafka消费积压 如果生产者发送消息的速度过快,或者是消费者处理消息的速度太慢,那么就会有越来越多的消息无法及时消费,也就是消费积压。 消费积压时, (1) 可以增加Topic的分区数,并且增加消费组的消费者数量&#…

kafka消费积压

如果生产者发送消息的速度过快,或者是消费者处理消息的速度太慢,那么就会有越来越多的消息无法及时消费,也就是消费积压。

消费积压时,

(1) 可以增加Topic的分区数,并且增加消费组的消费者数量,让消费者数等于分区数。
(2) 还可以使用多线程消费,提高消费速度。

kafka多线程消费的代码:

public class ThirdMultiConsumerThreadDemo {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumerThread consumerThread = new KafkaConsumerThread(props, TOPIC,Runtime.getRuntime().availableProcessors());consumerThread.start();}/**** kafka配置* @return*/public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return props;}/*** kafka消费者线程*/public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;private ExecutorService executorService;private int threadNumber;public KafkaConsumerThread(Properties props, String topic, int threadNumber) {kafkaConsumer = new KafkaConsumer<>(props);kafkaConsumer.subscribe(Collections.singletonList(topic));this.threadNumber = threadNumber;executorService = new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy());}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records =kafkaConsumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {executorService.submit(new RecordsHandler(records));}}} catch (Exception e) {log.error("run error", e);} finally {kafkaConsumer.close();}}}/*** 处理消息*/public static class RecordsHandler extends Thread {public final ConsumerRecords<String, String> records;public RecordsHandler(ConsumerRecords<String, String> records) {this.records = records;}@Overridepublic void run() {//处理records.for (ConsumerRecord<String, String> record : records) {System.out.println("==========>record:"+record.value() + ",thread:" + Thread.currentThread().getName());}}}}

发送消息后,使用多线程消息,运行结果如下:

==========>record:{"id":"1234","name":"lin"},thread:pool-1-thread-1
==========>record:{"id":"5678","name":"chen"},thread:pool-1-thread-2
==========>record:{"id":"91011","name":"wu"},thread:pool-1-thread-3

参考资料:

《深入理解Kafka:核心设计与实践原理》

http://www.khdw.cn/news/1687.html

相关文章:

  • 网站建设需要的技能有哪些上海谷歌seo
  • 有域名有空间怎么做网站上海网站建设服务
  • 代运营工作内容安卓优化大师hd
  • 网站seo快速排名软件友情链接2598
  • 黄石网站设计公司如何在百度搜索排名靠前
  • 济南的网站建设公司网站运营包括哪些内容
  • 电脑wordpress客服端seo企业优化顾问
  • 海南网站建设哪家专业创建网页
  • 阳江网站推广优化seo引擎优化是什
  • 网站制作应用知识东莞谷歌推广
  • 温州定制网站建设电话市场调研报告模板
  • 泉州外贸网站建设都有哪些公司360营销推广
  • 生物科技网站建设方案可以营销的十大产品
  • 送上门卤菜网站要怎么做网站建站教程
  • 产品互联网做推广做什么网站好内蒙古最新消息
  • 国内建站公司淮安网站seo
  • 网站建设工作分解网页是怎么制作的
  • 网站开发专业公司有哪些软文营销策划
  • 静海网站建设广州网站seo推广
  • wordpress万年历插件上海关键词优化报价
  • 备案的网站名称写什么百度做广告推广怎么样
  • 义乌城市投资建设集团网站淄博seo
  • 网店运营都要做什么汕头seo网络推广服务
  • html5公司网站源码网站怎么做的
  • 番禺网站建设设计公司产品推广文案
  • 郑州网站建设+论坛国家税务总局网
  • 渠道销售优化网站有哪些方法
  • 深圳外贸公司哪里集中seo百度发包工具
  • php网站开发薪资企业培训权威机构
  • 做销售的网站深圳网络推广建站