当前位置: 首页 > news >正文

网站建设 经典书籍济南优化seo公司

网站建设 经典书籍,济南优化seo公司,陕西企业营销型网站,平台游戏kafka消费积压 如果生产者发送消息的速度过快,或者是消费者处理消息的速度太慢,那么就会有越来越多的消息无法及时消费,也就是消费积压。 消费积压时, (1) 可以增加Topic的分区数,并且增加消费组的消费者数量&#…

kafka消费积压

如果生产者发送消息的速度过快,或者是消费者处理消息的速度太慢,那么就会有越来越多的消息无法及时消费,也就是消费积压。

消费积压时,

(1) 可以增加Topic的分区数,并且增加消费组的消费者数量,让消费者数等于分区数。
(2) 还可以使用多线程消费,提高消费速度。

kafka多线程消费的代码:

public class ThirdMultiConsumerThreadDemo {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumerThread consumerThread = new KafkaConsumerThread(props, TOPIC,Runtime.getRuntime().availableProcessors());consumerThread.start();}/**** kafka配置* @return*/public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return props;}/*** kafka消费者线程*/public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;private ExecutorService executorService;private int threadNumber;public KafkaConsumerThread(Properties props, String topic, int threadNumber) {kafkaConsumer = new KafkaConsumer<>(props);kafkaConsumer.subscribe(Collections.singletonList(topic));this.threadNumber = threadNumber;executorService = new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy());}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records =kafkaConsumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {executorService.submit(new RecordsHandler(records));}}} catch (Exception e) {log.error("run error", e);} finally {kafkaConsumer.close();}}}/*** 处理消息*/public static class RecordsHandler extends Thread {public final ConsumerRecords<String, String> records;public RecordsHandler(ConsumerRecords<String, String> records) {this.records = records;}@Overridepublic void run() {//处理records.for (ConsumerRecord<String, String> record : records) {System.out.println("==========>record:"+record.value() + ",thread:" + Thread.currentThread().getName());}}}}

发送消息后,使用多线程消息,运行结果如下:

==========>record:{"id":"1234","name":"lin"},thread:pool-1-thread-1
==========>record:{"id":"5678","name":"chen"},thread:pool-1-thread-2
==========>record:{"id":"91011","name":"wu"},thread:pool-1-thread-3

参考资料:

《深入理解Kafka:核心设计与实践原理》

http://www.khdw.cn/news/9657.html

相关文章:

  • 本地推广找哪些网站世界十大网站排名出炉
  • 商丘网站建设广告网站视频
  • 梁山网站开发seo专业培训需要多久
  • 企业网站属于广告吗收录提交入口网址
  • seo是东莞企业网站排seo产品推广文案范例
  • 吉林市网站建设公司上海优化网站seo公司
  • 四川省的建设厅注册中心网站论坛推广方案
  • 网站建设平台哪个好搜索热词排名
  • 动态网站设计的要求学做网站需要学什么
  • 苏州高端网站建设开发搜索引擎营销的实现方法有哪些
  • 宜春住房和城乡建设部网站域名收录查询工具
  • 做网站需要啥百度电话销售
  • 西安网站制作工程师百度一下移动版首页
  • 中国住房城乡建设部网站在线查询网站收录
  • 做外贸网站一定要会英语吗友情链接检测的特点
  • 沈阳网站建设建设公司排名福州seo推广优化
  • 阿里云网站备案要多久谷歌sem和seo区别
  • 天天广告联盟官网智能优化大师下载
  • wordpress 七牛视频南宁seo外包靠谱吗
  • 网络营销的基本内容有哪些长春seo公司哪家好
  • wordpress 模版教程杭州优化公司在线留言
  • css网站做光晕效果站长工具网站
  • 人才网站怎么做新闻头条今日新闻下载
  • 做3个网站需要多大的服务器链接地址
  • 黄浦上海网站建设互联网营销师培训机构哪家好
  • 用模板做网站的方法谷歌浏览器官网入口
  • 网站备案和服务器备案吗百度爱采购官方网站
  • 爱站seo排名可以做哪些网站百度关键词推广价格查询
  • seo如何优化网站市场营销一般在哪上班
  • 四川省城乡和住房建设厅网站首页优化公司组织架构