当前位置: 首页 > news >正文

免费模板样机素材网站他达拉非片多少钱一盒

免费模板样机素材网站,他达拉非片多少钱一盒,深圳做公司网站的公司,婚庆网站建设需求分析目录 一、参考二、路由规则(分片规则)三、触发重复消费的场景场景一:触发rebalance问题描述可能原因实际影响参数在kafka0.10.1 之前:在kafka0.10.1之后:解决方案 场景二:服务宕机可能原因解决方案 消息幂等性 四、kaf…

目录

  • 一、参考
  • 二、路由规则(分片规则)
  • 三、触发重复消费的场景
    • 场景一:触发rebalance
      • 问题描述
      • 可能原因
      • 实际影响参数
      • 在kafka0.10.1 之前:
      • 在kafka0.10.1之后:
      • 解决方案
    • 场景二:服务宕机
      • 可能原因
      • 解决方案
    • 消息幂等性
  • 四、kafka参数特性
    • 常见配置
  • 五、死信队列
    • 方案一:工厂类KafkaListenerContainerFactory
      • 配置工厂类
      • 死信队列消费者
    • 方案二:错误ConsumerAwareListenerErrorHandler
      • 配置
      • 消费者
  • 六、顺序消费
    • 场景

一、参考

kafka如何解决重复消费?
Kafka重复消费

二、路由规则(分片规则)

发一个消息,如何知道消息被默认分片到哪里

1.如果没有指定key,是随机分片

2.如果指定了key,即 kafkaTemplate.send(topic, null, jsonValue);

可以套用一下公式计算:

      key.hashCode() % 12

例如有一个topic 叫test,有8个patition,key=“1”,则日志文件在

    "1".hashCode() % 8=1

在 *****/log/test-1/ 目录下面

三、触发重复消费的场景

场景一:触发rebalance

参考:使用Kafka时一定要注意防止消费速度过慢触发rebalance而导致的重复消费
参考:spring设置kafka超时时间没有生效的解决方法(解决rebalancing问题)

rebalance就是kafka认为消费者已经离线或者挂掉,就会触发rebalance把消息分配给新的消费者kafka重新平衡是按group
即当消费速度过慢时有可能会触发rebalance, 这批消息被分配到另一个消费者,然后新的消费者还会消费过慢,再次rebalance, 这样一直恶性循环下去。发生这种情况最明显的标志就是日志里能看到CommitFailedException异常,然后还会带上下面一段话:

问题描述

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

可能原因

  • 原因1:强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)。
  • 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。
  • 原因3:(重复消费最常见的原因):消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
  • 原因4:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。
  • 原因5:当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。
  • 原因6:并发很大,可能在规定的时间(session.time.out默认30s)内没有消费完,就会可能导致reblance重平衡,导致一部分offset自动提交失败,然后重平衡后重复消费

实际影响参数

这里我们需要明确一下,在Kafka 0.10.1.0以后的版本中,影响rebalance触发的参数有三个,说明如下:

  • spring.kafka.properties.session.timeout.ms(默认10秒10000)
    这个参数定义了当broker多久没有收到consumer的心跳请求后就触发rebalance,默认值是10s。在0.10.1.0之前的版本中,由于心跳请求是在poll()拉取消息的方法中执行的,因此如果当前批次处理消息耗时太长,就会导致consumer没有机会按时发送心跳,broker认为消费者已死,触发rebalance。在0.10.1.0或更新的版本中解决了这个问题,心跳请求会在单独的线程中发送,因此就不会出现因为消息处理过长而发不出心跳的问题了。而每次发送心跳请求的时间 spring.kafka.properties.heartbeat.interval.ms = 3000(默认三秒)

  • spring.kafka.properties.max.poll.interval.ms(默认值为5分钟300000)
    这个参数定义了两次poll()之间的最大间隔,默认值为5分钟。如果超过这个间隔同样会触发rebalance。在多数情况下这个参数是导致rebalance消息重复的关键,即业务处理消息耗时太长,导致一直没有commit确认收到的消息,然后超过了消费者设置的最大拉取时间。有人可能会疑惑,如果5分钟都没处理完消息那肯定时出了问题,其实不然。能否在5min内处理完还取决于你每次拉取了多少条消息,如果一次拿到了成千上万条的话,5min就够呛了。有也可能是某个消费者节点正在调试,导致线程一直阻塞在那里,然后超过了最大拉取时间.

  • spring.kafka.consumer.max.poll.records
    这个参数定义了poll()方法最多可以返回多少条消息,默认值为500。注意这里的用词是"最多",也就是说如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,就只返回500。这个默认值是比较坑人的,如果你的消息处理逻辑比较重,比如需要查数据库,调用接口,甚至是复杂计算,那么你很难保证能够在5min内处理完500条消息,也就是说,如果上游真的突然大爆发生产了成千上万条消息,而平摊到每个消费者身上的消息达到了500的又无法按时消费完成的话就会触发rebalance, 然后这批消息会被分配到另一个消费者中,还是会处理不完,又会触发rebalance, 这样这批消息就永远也处理不完,而且一直在重复处理。

在kafka0.10.1 之前:

检查整个消费者死亡和检查消费则处理线程,使用的同一个线程,如果设置的max.poll.interval.ms大于session.timeout.ms,遇到一个处理时间过长的消息,会由于线程忙于处理消息,而无法发送心跳,导致kafka认为改消费则已完全死亡,进而进行Rebalance
所以推荐设置:heartbeat.inerval.ms < max.poll.interval.ms < session.timeout.ms

在kafka0.10.1之后:

session.timeout.ms 和 max.poll.interval.ms 解耦了,拆成了两个线程,不用再担心它们之间的依赖关系
推荐设置:heartbeat.interval.ms < session.timeout.ms

解决方案

要避免出现上述问题也很简单,那就是提前评估好处理一条消息最长需要多少时间,然后务必覆盖默认的max.poll.records参数。在spring-kafka中这个原生参数对应的参数项是max-poll-records。对于消息处理比较重的操作,建议把这个值改到50以下会保险一些。

  • 调整几个参数
spring.kafka.properties.max.poll.interval.ms = 600000
spring.kafka.consumer.max.poll.records = 20
spring.kafka.properties.session.timeout.ms = 25000# spring设置kafka参数session超时时间时,要小于请求超时时间与处理超时时间,例如:request.timeout.ms = 30000  session.timeout.ms = 15000    max.poll.interval.ms = 300000session.timeout.ms < request.timeout.mssession.timeout.ms < max.poll.interval.ms
  • 把这个组里比较重要的几个topic移动出去,换到其它组(java里只需要改一行):
//这里没有显式配置组,用的是上方KafkaConfig.java里的commonGroup组
//@KafkaListener(topics = "${kafka.topic.commit}")//改为了显式配置组,把这个topic移动到新组 commitGroup
@KafkaListener(topics = "${kafka.topic.commit}", groupId = "commitGroup")
  • 减少每次拉取的消息记录数和增大poll之间的时间间隔
  • 拉取到消息之后异步处理(保证成功消费)

场景二:服务宕机

可能原因

  • 消费者宕机、重启等。导致消息已经消费但是没有提交offset。

  • 由于网络问题,重复消费不可避免,因此,消费者需要实现消费幂等。#

解决方案

  • ①:消息表

  • ②:数据库唯一索引

  • ③:缓存消费过的消息id

消息幂等性

可以通过redis.setnx方法
key = topic:pardition:offset
redis.setnx(key ,alue);如果没设置过返回1,设置过返回0

四、kafka参数特性

新版kafka的broker幂等性具体实现原理:
  kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查PID和    Sequence Number,如果相同不会再接收。

PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID对用户完全是透明的。生产者如果重启则会生成新的PID。

常见配置

fetch.min.byte:配置Consumer一次拉取请求中能从Kafka中拉取的最小数据量,默认为1B,如果小于这个参数配置的值,就需要进行等待,直到数据量满足这个参数的配置大小。调大可以提交吞吐量,但也会造成延迟

fetch.max.bytes,一次拉取数据的最大数据量,默认为52428800B,也就是50M,但是如果设置的值过小,甚至小于每条消息的值,实际上也是能消费成功的

fetch.wait.max.ms,若是不满足fetch.min.bytes时,等待消费端请求的最长等待时间,默认是500ms

max.poll.records,单次poll调用返回的最大消息记录数,如果处理逻辑很轻量,可以适当提高该值。一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完,默认值为500

consumer.poll(100) ,100 毫秒是一个超时时间,一旦拿到足够多的数据(fetch.min.bytes 参数设置),consumer.poll(100)会立即返回 ConsumerRecords<String, String> records。如果没有拿到足够多的数据,会阻塞100ms,但不会超过100ms就会返回

max.poll.interval.ms,两次拉取消息的间隔,默认5分钟;通过消费组管理消费者时,该配置指定拉取消息线程最长空闲时间,若超过这个时间间隔没有发起poll操作,则消费组认为该消费者已离开了消费组,将进行再均衡操作(将分区分配给组内其他消费者成员)

若超过这个时间则报如下异常:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member. This means that the time between subsequent calls
to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is
spending too much time message processing. You can address this either by increasing the session timeout or by
reducing the maximum size of batches returned in poll() with max.poll.records. 
  即:无法完成提交,因为组已经重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多的时间来处理消息。

可以通过增加max.poll.interval.ms来解决这个问题,也可以通过减少在poll()中使用max.poll.records返回的批的最大大小来解决这个问题

max.partition.fetch.bytes:该属性指定了服务器从每个分区返回给消费者的最大字节数,默认为 1MB。

session.timeout.ms:消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s,将触发再均衡操作

对于每一个Consumer Group,Kafka集群为其从Broker集群中选择一个Broker作为其Coordinator。Coordinator主要做两件事:

维持Group成员的组成。这包括加入新的成员,检测成员的存活性,清除不再存活的成员。

协调Group成员的行为。

poll机制

①:每次poll的消息处理完成之后再进行下一次poll,是同步操作

②:每次poll之前检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移

③:每次poll时,consumer都将尝试使用上次消费的offset作为起始offset,然后依次拉取消息

④:poll(long timeout),timeout指等待轮询缓冲区的数据所花费的时间,单位是毫秒

五、死信队列

方案一:工厂类KafkaListenerContainerFactory

配置工厂类

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.*;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Optional;/*** 消费MQ的消息配置* @author demo* @create 2022-11-12*/@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@ConditionalOnProperty(name = "mq.pipeline", havingValue = IotConstant.MQ_KAFKA)
public class KafkaReceiverConfig {@Resourceprivate ConsumerFactory<String,String> consumerFactory;@Resourceprivate KafkaTemplate<String,String>  kafkaTemplate;@Beanpublic KafkaListenerContainerFactory<?> retryKafkaFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);// 最大重试次数2次,重试间隔10秒,超过2次(本身一次,重试一次)还没成功,进入死信队列// 注意:目前自动创建主题的配置关闭了,需要提前手动去创建好死信队列主题!!! 死信队列主题的命名方式:原主题名称 + .DLTfactory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(Collections.singletonMap(Object.class, kafkaTemplate), (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition())), new FixedBackOff(10 * 1000L, 2L)));return factory;}}`### 消费者`````java/*** 发送消息* @param consumerRecord 消息记录* @param topicGroupId 消费组*/@KafkaListener(topics = "#{'${mq.alarm.inner.topic.name}'.split(',')}", containerFactory = "retryKafkaFactory")public void consumer(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) {Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());if (kafkaMessage.isPresent()) {String id = null;try {String json = kafkaMessage.get();// todo...} catch (Throwable t) {// 判断是否可恢复异常if(isRecoverable(t)){// ...IotCacheUtil.deleteMessageId(id);// 送入死信队列throw  t;} else {log.error("消费失败 topic:{}, messageId:{}, offset:{}, partition:{} ,异常:{}",consumerRecord.topic(),id,consumerRecord.offset(),consumerRecord.partition(),t);}}}}

死信队列消费者

@KafkaListener( topics = "iotAiAlarmInner.DLT")public void messageListenerDLT(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) {log.info("告警死信队列 topic:{}, offset:{}, partition:{} ,key:{}, message:{}",consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),consumerRecord.key(),Optional.ofNullable(consumerRecord.value()).orElse(null));Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());if (kafkaMessage.isPresent()) {}}

方案二:错误ConsumerAwareListenerErrorHandler

配置


/*** 消费MQ的消息配置* @author demo* @create 2022-11-12*/@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@ConditionalOnProperty(name = "mq.pipeline", havingValue = IotConstant.MQ_KAFKA)
public class KafkaReceiverConfig {@Resourceprivate ConsumerFactory<String,String> consumerFactory;@Resourceprivate KafkaTemplate<String,Object>  kafkaTemplate;/*** 针对tag消息过滤* producer 将tag写进header里* @return ConcurrentKafkaListenerContainerFactory*/@Beanpublic ConcurrentKafkaListenerContainerFactory<String,String> filterContainerFactory() {ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);// 被过滤的消息将被丢弃factory.setAckDiscarded(true);factory.setRecordFilterStrategy(consumerRecord -> {if (Optional.ofNullable(consumerRecord.value()).isPresent()) {for (Header header : consumerRecord.headers()) {if (header.key().equals(IotConstant.MQ_TAG) && new String(header.value()).equals(new String(IotConstant.MQ_TAG_VALUE.getBytes(StandardCharsets.UTF_8)))) {return false;}}}//返回true将会被丢弃return true;});return factory;}@Beanpublic ConsumerAwareListenerErrorHandler consumerIotAlarmAwareErrorHandler() {return new ConsumerAwareListenerErrorHandler() {@Overridepublic Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {log.error("consumerAwareErrorHandler receive : {}, error:{}",message.getPayload(),e);//获取消息处理异常主题MessageHeaders headers = message.getHeaders();/*List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);*/Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
//                String topic=headers.get("kafka_receivedTopic")+TOPIC_DLT;String topic="iotAlarmInner"+ KafkaAlarmListener.TOPIC_DLT;//放入死信队列kafkaTemplate.send(topic,message.getPayload());return message;}};}
}

消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.net.UnknownHostException;
import java.util.Optional;/*** 内部自产自销消费者* @author demo* @create 2022-11-09*/
@Slf4j
@Component
@ConditionalOnProperty(name = "mq.pipeline", havingValue = IotConstant.MQ_KAFKA)
public class KafkaAlarmListener {@PostConstructpublic void init() {log.info("启动了通用告警消费者");}public static final String TOPIC_DLT=".DLT";@Resourceprivate DeviceService deviceService;/*** 发送消息* @param consumerRecord 消息记录* @param topicGroupId 消费组*/@KafkaListener(topics = "#{'${mq.alarm.inner.topic.name}'.split(',')}",errorHandler = "consumerIotAlarmAwareErrorHandler",concurrency = "3")public void consumer(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) {execute(consumerRecord, false);}/*** 死信队列* @param consumerRecord 消息记录* @param topicGroupId 消费组*/@KafkaListener( topics = "iotAlarmInner"+TOPIC_DLT)public void deadConsumer(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) {log.info("告警死信队列 topic:{}, offset:{}, partition:{} ,key:{}",consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),consumerRecord.key());execute(consumerRecord, true);}private void execute(ConsumerRecord<?, String> consumerRecord,boolean dead){String title = dead?"告警死信":"告警";Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());if (kafkaMessage.isPresent()) {String id = null;try {String json = kafkaMessage.get();// 解析AlarmMessageDTO message = JsonUtil.parse(json,AlarmMessageDTO.class);// 判断消息是否已处理id = message.getId();if(json.length()>10000){log.info("{} topic:{}, offset:{}, partition:{} ,key:{}, messageId:{}",title,consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),consumerRecord.key(),id);}else{log.info("{} topic:{}, offset:{}, partition:{} ,key:{}, content:{}, messageId:{}",title,consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),consumerRecord.key(),id, json);}if(IotCacheUtil.isExistMessageId(id)){log.error("{} topic:{}, messageId:{} 重复消息",title,consumerRecord.topic(), id);return;}// 处理deviceService.uploadAlarm(message);} catch (Throwable t) {if(dead){log.error("{} 消费异常 topic:{}, offset:{}, partition:{}, messageId:{} ,异常:{}",title,consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),id,t);return;}// 判断是否可恢复异常if(isRecoverable(t)){// ...IotCacheUtil.deleteMessageId(id);// 触发失败errorHandler死信队列throw  t;} else {log.error("{} 消费失败 topic:{}, offset:{}, partition:{}, messageId:{} ,异常:{}",title,consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),id,t);}}}}private boolean isRecoverable(Throwable t){if(t instanceof NullPointerException){return false;}else if( t instanceof UnknownHostException){return false;}return true;}}

六、顺序消费

利用kafka的分区(pardition)功能,通过生产者send的时候设置key,kafka的broker会根据key计算hash,发送到对应的分区

场景

比如用户三次修改名字
我们再发送消息的时候,把userId设置为key,这样保证三条消息都在一个pardition

http://www.khdw.cn/news/53292.html

相关文章:

  • 做很多网站引流app推广软件
  • 做视频写真网站犯法吗网络营销公司怎么注册
  • 青海中小企业网站建设网络优化网站
  • div使用太多影响网站收录今天全国疫情最新消息
  • 深圳企业公司有哪些关键词优化搜索引擎
  • 济南快速网站排名打开app下载
  • 企业网站设计总结指数平滑法
  • 中国城乡建设三农委员会官方网站优化搜狗排名
  • 做网站定制的一般什么价位百度一下首页网页手机版
  • 如何用dw做asp动态网站十大免费域名
  • 天津哪家做网站好客户引流推广方案
  • 做外包网站搭建谷歌google浏览器官方下载
  • 广州led网站建设seo网站优化软件价格
  • 网站注册费网络营销的推广
  • 网站怎样做免费推广公司的网站
  • 郑州网站制作_郑州网页制作_做网站设计_河南网站制作网域名注册阿里云
  • 电商网站建设方案模板长沙seo培训班
  • 萌兔网站做代销可靠吗学市场营销后悔死了
  • 门户型网站谷歌搜索入口 镜像
  • 微商城网站建设平台网络营销是干嘛的
  • 武汉网站开发有限公司美国站外推广网站
  • 自己怎么做免费网站泰安百度推广代理商
  • 外贸公司 如何做公司网站互联网去哪里学
  • 培训网站推荐小吃培训2000元学6项
  • 360网站卖东西怎么做营销型网站制作成都
  • 电商网站开发商怎么做网站平台
  • jsp网站开发的使用表格seo网站排名后退
  • 如何申请免费的网站空间关键词整站优化公司
  • 网站优化常见的优化技术竞价培训课程
  • 网站管理 上传模板搜索最多的关键词的排名