当前位置: 首页 > news >正文

化工厂建设网站友链之家

化工厂建设网站,友链之家,十大安卓应用商店排名,企业自己如何做网站推广源码解析目标 分析最核心组件EventLoop在Netty运行过程中所参与的事情,以及具体实现 源码解析 依然用netty包example下Echo目录下的案例代码,单我们写一个NettyServer时候,第一句话就是 EventLoopGroup bossGroup new NioEventLoopGroup(…

源码解析目标

  • 分析最核心组件EventLoop在Netty运行过程中所参与的事情,以及具体实现

源码解析

  • 依然用netty包example下Echo目录下的案例代码,单我们写一个NettyServer时候,第一句话就是 EventLoopGroup bossGroup = new NioEventLoopGroup(1);,我们先来看看NioEventLoop的UML图

在这里插入图片描述

  • 首先我们看到ScheduledEecutorService接口,这个接口是concurrent包下的一个定时任务接口,EventLoop实现了这个接口,因此可以接受定时任务,所以我们在Debug的时候,能在EventLoop中找到一个scheduledTaskQueue
  • EventLoop接口我们看下源码,如下,从注释中我们了解到,EventLoop中一旦注册了Channel,就会处理该Channel对应的所有I/O操作
/*** Will handle all the I/O operations for a {@link Channel} once registered.** One {@link EventLoop} instance will usually handle more than one {@link Channel} but this may depend on* implementation details and internals.**/
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {@OverrideEventLoopGroup parent();
}
  • SingleThreadEventExecutor 也是一个比较重要的类,看源码注释,说明了SingleThreadEventExecutor 是一个单个线程的线程池

/*** Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.**/
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {......}
  • 在SingleThreadEventExecutor 类中实现了很多对线程池的操作,例如runAllTask,executer,takeTask,pollTask,看下其中一个构造方法:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,boolean addTaskWakesUp, int maxPendingTasks,RejectedExecutionHandler rejectedHandler) {super(parent);this.addTaskWakesUp = addTaskWakesUp;this.maxPendingTasks = Math.max(16, maxPendingTasks);this.executor = ObjectUtil.checkNotNull(executor, "executor");taskQueue = newTaskQueue(this.maxPendingTasks);rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");}
//其中taskQueue初始化protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {return new LinkedBlockingQueue<Runnable>(maxPendingTasks);}
  • 如上,SingleThreadEventExecutor 队列中元素是实现了Runnable接口的对象,线程池中最重要的方法当然是executer方法,EventLoop是SingleThreadEventExecutor 的子类,那么EventLoop 类也可以直接调用executer方法来完成对事件的执行,我们来看源码
 @Overridepublic void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else {startThread();addTask(task);if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}
  • inEventLoop(); 首先判断EventLoop中线程是否是当前线程,如果是,则直接将task添加到线程池队列中
  • 如果不是则尝试启动一个线程(因为是单个线程的线程池,所以只能且只需要启动一次),之后在将任务添加到队列中去
  • isShutdown() && removeTask(task)) 中逻辑 如果线程已经停止并且删除任务失败,则直接拒绝策略
  • 接着看下addTask的实现
 /*** Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown* before.*/protected void addTask(Runnable task) {if (task == null) {throw new NullPointerException("task");}if (!offerTask(task)) {reject(task);}}final boolean offerTask(Runnable task) {if (isShutdown()) {reject();}return taskQueue.offer(task);}
  • 从注释中可以看出,addTask方法会添加一个task任务到队列中,如果当前线程是shutdown的状态,那么直接抛出异常RejectedExecutionException
  • 接着来看executer方法中的startThread(); ,当我们判断当前线程不是EventLoop中的线程的时候会执行这个方法,他是NioEventLoop中的核心方法,如下源码
 private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {try {doStartThread();} catch (Throwable cause) {STATE_UPDATER.set(this, ST_NOT_STARTED);PlatformDependent.throwException(cause);}}}}private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = state;if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +"before run() implementation terminates.");}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.release();if (!taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});}
  • state == ST_NOT_STARTED 首先通过状态判断是否执行过,保证EventLoop只有一个线程

  • 如果没有启动 用cas的方式STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED) 去修改状态为ST_STARTED,直接调用doStartThread方法

  • 如果失败就回滚

  • 接着分析doStartThread方法,首先会调用Executor的execute方法,这个Executor 是我们在创建EventLoopGroup时候创建的是一个ThreadPerTaskExecutor类,如下图是在channel中对应的EventLoop找到的对象信息,该execute方法会将Runable包装成Netty的FastThreadLocalThread

在这里插入图片描述

  • 接着通过Thread.currentThread() 判断线程是否中断
  • updateLastExecutionTime(); 然后设置最后一次执行的事件
  • 核心方法是:SingleThreadEventExecutor.this.run(); 执行单曲NioEventLoop的run方法,等会重点关注
  • 接着完成run方法的事物处理后,在finally中使用cas不断的修改state状态,设置为ST_SHUTTING_DOWN,也就是当loop中run方法结束运行后,关闭线程,最后还会通过不断轮询来二次确认是否关闭,否则不会break跳出
  • 接下来分析EventLoop中的Run方法,我们进入Run方法,就到了我们之前分析过的NioEventLoop中的run方法,此方法做了三件事情,如下源码

@Overrideprotected void run() {for (;;) {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}
  • NioEventLoop 中的loop轮询是依靠run方法来执行的,在方法中可以看到是一个for循环其中三件事情,如下图中EventLoop部分

    • case SelectStrategy.SELECT: 当事件类似是SELECT 时候, 通过select(wakenUp.getAndSet(false));方法获取感兴趣的事件
    • processSelectedKeys(); 处理选中的事件
    • runAllTasks 执行队列中的任务。
      在这里插入图片描述
  • 上图不管是bossGroup还是WorkerGroup中的EventLoop都是次run方法执行流程

  • select方法实现

private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}if (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}int selectedKeys = selector.select(timeoutMillis);selectCnt ++;if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {break;}if (Thread.interrupted()) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because " +"Thread.currentThread().interrupt() was called. Use " +"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");}selectCnt = 1;break;}long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",selectCnt, selector);rebuildSelector();selector = this.selector;// Select again to populate selectedKeys.selector.selectNow();selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}}}
  • 关注点在于 select方法如何体现出非阻塞,如下图中,选择器获取对应事件debug
    在这里插入图片描述

  • 在select中传如参数是1 秒,也就是默认情况下阻塞1秒中,具体的算法: long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;

  • 其中 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 表示当前时间 - 定时任务的时间,那么timeoutMillis 意思就是,当有定时任务的时候 delayNanos(currentTimeNanos) 时间就部位空,那么定时任务剩余时间 t +0.5秒阻塞的时间,否则就默认1秒中阻塞时间

  • 接着判断: if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks())

    • 如果1秒(或者t+0.5)后能获取到selectedKeys :selectedKeys != 0
    • 或者select被用户唤醒 :oldWakenUp, wakenUp.get()
    • 或者任务队列中有任务存在 : hasTasks()
    • 或者有定时任务即将被执行 : hasScheduledTasks()
  • 有以上任何情况则跳出循环,否则继续沦陷,直到满足其中一个条件为止

  • 接着processSelectedKeys对获取到的selectKey处理

  • 在接着runAllTasks 执行队列任务

总结
  • 每次执行execute方法就会向队列中添加任务。当第一次添加时候就启动线程,执行run方法,run方法是EventLoop的核心实现,负责轮询获取事件,处理事件,执行队列中任务
  • 其中调用selector的select方法默认阻塞一秒,有定时任务就t+0.5,t是定时任务剩余时间,当执行execute方法时候,也就是添加任务的时候,唤醒selector,防止selector阻塞事件过长
  • 当selector返回的时候,会调用processSelectedKeys对selectKey进行处理
  • 当processSelectedKeys 方法执行结束,按照ioRatio比例执行runAllTasks方法默认是 IO 任务时间和非 IO 任务时间是相同的代码如下
  final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
http://www.khdw.cn/news/33510.html

相关文章:

  • html5标签安全优化大师下载
  • 蓝德网站建设农大南路网络营销推广优化
  • 合川网站建设seo营销推广服务公司
  • 如何自己做网站 开直播域名注册购买
  • 国内外政府网站建设经验seo网站培训班
  • 医疗美容 手机网站建设营销策划方案怎么做
  • 做视频类网站需要哪些许可证免费舆情监测平台
  • 南京建站公司网络营销工程师是做什么的
  • 中国人做跨电商有什么网站整站优化是什么意思
  • 野花日本韩国大全免费观看6seo专业培训中心
  • 一个企业做网站推广的优势自助建站网站哪个好
  • 建设银行投诉网站首页东莞网站建设平台
  • 程序开发接单巩义网站优化公司
  • 北京整站线上推广优化杭州seo技术
  • 网站开发 验收标准福州百度快照优化
  • 舌尖上的西安 网站怎么做百度收录怎么做
  • 网页设计与制作实例教程seo排名需要多少钱
  • 青岛做网站优化哪家好百度关键字推广费用
  • 天津大寺网站建设重庆网站搜索引擎seo
  • 柳州网站优化怎样提高百度推广排名
  • 做adsense对网站有什么要求百度推广怎么操作
  • 网站的客服怎么做关键词优化排名软件哪家好
  • 北京做日本旅游的公司网站计算机培训班培训费用
  • 深圳求职招聘网站网站优化种类
  • p2p网站制作 杭州企业管理系统
  • 做快餐料包的网站有哪些seo检测优化
  • 网站开发语言汇总seo关键词有话要多少钱
  • 个人网站建设视频教学搜索引擎广告形式有哪些
  • 自己做优惠券网站网站建设设计
  • 旅行社网站建设seo网站收录工具