当前位置: 首页 > news >正文

山东德州最大的网站建设教学热门推广软件

山东德州最大的网站建设教学,热门推广软件,网站产品要如何做详情,网站 板块 模块Seata源码分析-2PC核心源码解读 2PC提交源码流程 上节课我们分析到了GlobalTransactionalInterceptor全局事务拦截器,一旦执行拦截器,我们就会进入到其中的invoke方法,在这其中会做一些GlobalTransactional注解的判断,如果有注解…

Seata源码分析-2PC核心源码解读

2PC提交源码流程

上节课我们分析到了GlobalTransactionalInterceptor全局事务拦截器,一旦执行拦截器,我们就会进入到其中的invoke方法,在这其中会做一些@GlobalTransactional注解的判断,如果有注解以后,会执行全局事务和全局锁,那么在执行全局事务的时候会调用handleGlobalTransaction全局事务处理器,这里主要是获取事务信息

Object handleGlobalTransaction(final MethodInvocation methodInvocation,final GlobalTransactional globalTrxAnno) throws Throwable {boolean succeed = true;try {return transactionalTemplate.execute(new TransactionalExecutor() {@Overridepublic Object execute() throws Throwable {return methodInvocation.proceed();}// 获取事务名称,默认获取方法名public String name() {String name = globalTrxAnno.name();if (!StringUtils.isNullOrEmpty(name)) {return name;}return formatMethod(methodInvocation.getMethod());}/*** 解析GlobalTransactional注解属性,封装为对象* @return*/@Overridepublic TransactionInfo getTransactionInfo() {// reset the value of timeout// 获取超时时间,默认60秒int timeout = globalTrxAnno.timeoutMills();if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {timeout = defaultGlobalTransactionTimeout;}// 构建事务信息对象TransactionInfo transactionInfo = new TransactionInfo();transactionInfo.setTimeOut(timeout);// 超时时间transactionInfo.setName(name()); // 事务名称transactionInfo.setPropagation(globalTrxAnno.propagation());// 事务传播transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());// 校验或占用全局锁重试间隔transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());// 校验或占用全局锁重试次数Set<RollbackRule> rollbackRules = new LinkedHashSet<>();// 其他构建信息for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {rollbackRules.add(new RollbackRule(rbRule));}for (String rbRule : globalTrxAnno.rollbackForClassName()) {rollbackRules.add(new RollbackRule(rbRule));}for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {rollbackRules.add(new NoRollbackRule(rbRule));}for (String rbRule : globalTrxAnno.noRollbackForClassName()) {rollbackRules.add(new NoRollbackRule(rbRule));}transactionInfo.setRollbackRules(rollbackRules);return transactionInfo;}});} catch (TransactionalExecutor.ExecutionException e) {// 执行异常TransactionalExecutor.Code code = e.getCode();switch (code) {case RollbackDone:throw e.getOriginalException();case BeginFailure:succeed = false;failureHandler.onBeginFailure(e.getTransaction(), e.getCause());throw e.getCause();case CommitFailure:succeed = false;failureHandler.onCommitFailure(e.getTransaction(), e.getCause());throw e.getCause();case RollbackFailure:failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());throw e.getOriginalException();case RollbackRetrying:failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());throw e.getOriginalException();default:throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));}} finally {if (degradeCheck) {EVENT_BUS.post(new DegradeCheckEvent(succeed));}}
}

在这其中,我们要关注一个重点方法execute()

其实这个方法主要的作用就是,执行事务的流程,大概一下几点:

  1. 获取事务信息
  2. 开始执行全局事务
  3. 发生异常全局回滚,各个数据通过undo_log表进行事务补偿
  4. 全局事务提交
  5. 清除所有资源

这个位置是非常核心的一个位置,因为我们所有的业务进来以后都会走这个位置。

这其中的第三步和第四步就是在想TC(Seata-Server)发起全局事务的提交/回滚

public Object execute(TransactionalExecutor business) throws Throwable {// 1. Get transactionInfo// 获取事务信息TransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.// 获取当前事务,主要获取XidGlobalTransaction tx = GlobalTransactionContext.getCurrent();// 1.2 Handle the transaction propagation.// 根据配置的不同事务传播行为,执行不同的逻辑Propagation propagation = txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder = null;try {switch (propagation) {case NOT_SUPPORTED:// If transaction is existing, suspend it.if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();}// Execute without transaction and return.return business.execute();case REQUIRES_NEW:// If transaction is existing, suspend it, and then begin new transaction.if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();tx = GlobalTransactionContext.createNew();}// Continue and execute with new transactionbreak;case SUPPORTS:// If transaction is not existing, execute without transaction.if (notExistingTransaction(tx)) {return business.execute();}// Continue and execute with new transactionbreak;case REQUIRED:// If current transaction is existing, execute with current transaction,// else continue and execute with new transaction.break;case NEVER:// If transaction is existing, throw exception.if (existingTransaction(tx)) {throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));} else {// Execute without transaction and return.return business.execute();}case MANDATORY:// If transaction is not existing, throw exception.if (notExistingTransaction(tx)) {throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");}// Continue and execute with current transaction.break;default:throw new TransactionException("Not Supported Propagation:" + propagation);}// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.// 当前没有事务,则创建一个新的事务if (tx == null) {tx = GlobalTransactionContext.createNew();}// set current tx config to holderGlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);try {// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,//    else do nothing. Of course, the hooks will still be triggered.// 开始执行全局事务beginTransaction(txInfo, tx);Object rs;try {// Do Your Business// 执行当前业务逻辑:// 1. 在TC注册当前分支事务,TC会在branch_table中插入一条分支事务数据// 2. 执行本地update语句,并在执行前后查询数据状态,并把数据前后镜像存入到undo_log表中// 3. 远程调用其他应用,远程应用接收到xid,也会注册分支事务,写入branch_table及本地undo_log表// 4. 会在lock_table表中插入全局锁数据(一个分支一条)rs = business.execute();} catch (Throwable ex) {// 3. The needed business exception to rollback.// 发生异常全局回滚,各个数据通过undo_log表进行事务补偿completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. everything is fine, commit.// 全局提交事务commitTransaction(tx);return rs;} finally {//5. clear// 清除所有资源resumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}} finally {// If the transaction is suspended, resume it.if (suspendedResourcesHolder != null) {tx.resume(suspendedResourcesHolder);}}
}

如何发起全局事务

这个位置我们就看当前这个代码中的 beginTransaction(txInfo, tx);方法

// 想TC发起请求,这里采用了模板模式
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {try {triggerBeforeBegin();// 对TC发起请求tx.begin(txInfo.getTimeOut(), txInfo.getName());triggerAfterBegin();} catch (TransactionException txe) {throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);}
}	

那我们向下来看begin方法,那要注意,这里调用begin方法的是DefaultGlobalTransaction

@Override
public void begin(int timeout, String name) throws TransactionException {//判断调用者是否是TMif (role != GlobalTransactionRole.Launcher) {assertXIDNotNull();if (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);}return;}assertXIDNull();String currentXid = RootContext.getXID();if (currentXid != null) {throw new IllegalStateException("Global transaction already exists," +" can't begin a new global transaction, currentXid = " + currentXid);}// 获取Xidxid = transactionManager.begin(null, null, name, timeout);status = GlobalStatus.Begin;RootContext.bind(xid);if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction [{}]", xid);}
}

在向下来看begin方法,这时候使用的是(默认事务管理者)DefaultTransactionManager.begin,来真正的获取xid,其中就是传入事务的相关信息,最终TC端返回对应的全局事务Xid。

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {GlobalBeginRequest request = new GlobalBeginRequest();request.setTransactionName(name);request.setTimeout(timeout);// 发送请求得到响应GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);if (response.getResultCode() == ResultCode.Failed) {throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());}//返回Xidreturn response.getXid();
}

这里采用的是Netty的通讯方式

private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {try {// 通过Netty发送请求return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);} catch (TimeoutException toe) {throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);}
}

图解地址:https://www.processon.com/view/link/6213d58f1e0853078013c58f

http://www.khdw.cn/news/4039.html

相关文章:

  • 2023年重大政治时事seo排名优化培训价格
  • 合肥网站设计建设快速申请免费个人网站
  • django 做网站赚钱经典的软文广告
  • 学平面设计需要准备什么东西淄博seo公司
  • wordpress可以做电影网站吗最火的推广软件
  • 个人网站备案后做游戏湖南企业竞价优化
  • 辽宁招标网招标公告天津百度快速排名优化
  • 桂林广告公司网站建设怎么推广自己的微信
  • 中国网站服务器哪个好软文写作的三个要素
  • 类似美团网的网站是怎么做的新闻头条今日要闻国内
  • 没有备案做盈利性的网站违法吗磁力狗在线搜索
  • 哪里有做网站培训的鸿星尔克网络营销
  • 铭万做网站怎么样百度官方网页版
  • 苏州企业网站建设电话南宁seo公司
  • 网业翻译成中文济南网站优化公司
  • 开封网站建设百度云网盘搜索引擎
  • 吉安网站推广企业培训机构有哪些
  • 免费个人网站建设公司抖音seo公司
  • 做电子商务网站的总结视频运营管理平台
  • 网站首屏做多大网页制作软件手机版
  • 自建网站备案网络推广计划书范文
  • 入侵WordPress网站聚合搜索引擎接口
  • linux wordpress 空白搜索引擎外部链接优化
  • 邵阳网站开发公司推荐最好的关键词排名优化软件
  • 做基础网站主机要?短视频代运营合作方案
  • 比较好的中文wordpress主题国外seo网站
  • wordpress微信快捷支付湖南seo推广服务
  • 百度做网站电话多少钱怎样优化网站排名
  • 网络公司网站开发aso榜单优化
  • 凡科做的网站可以在百度搜到吗qq关键词排名优化