Tomcat源码阅读之Connector设计与实现
先暂时跳过Service的分析,先来看看connector部分。。。。
好像在tomcat8这个版本中,只有Nio的Connector可以用了。。。
我们来看看在server.xml中一个connector的定义:
<Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443"/>
这里的参数应该很好理解,首先是端口,将会创建acceptor监听该端口,然后是协议的类型,接着是链接超时,然后是跳转端口,其实这里还可以有executor的属性,用于指定当前connector用哪一个executor,这一点与jetty就稍微不同了。。
在jetty中,是全局共享一个线程池,而在tomcat中,则是每一个connector有自己的线程池。。。
这里先来看看Connector是怎么创建的吧。。。在Catalina对象中,关于Connector部分的配置解析代码如下:
//为servie添加connector规则 digester.addRule("Server/Service/Connector", new ConnectorCreateRule()); //这个规则会创建connector,而且如果有属性执行executor的话,还会设置connector的executor digester.addRule("Server/Service/Connector", //<Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443"/> new SetAllPropertiesRule(new String[]{"executor"})); //这里会将所有的属性都设置,除了executor digester.addSetNext("Server/Service/Connector", "addConnector", "org.apache.catalina.connector.Connector"); //在service上面添加这个connector
那么接下来来看看这个ConnectorCreateRule对象究竟是怎么创建connector的吧:
//<Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443"/> @Override public void begin(String namespace, String name, Attributes attributes) throws Exception { Service svc = (Service)digester.peek(); //获取当前的service Executor ex = null; if ( attributes.getValue("executor")!=null ) { //如果有executor属性 ex = svc.getExecutor(attributes.getValue("executor")); //从service获取指定的executor,如果没有指定的话,那么将会自己创建executor } Connector con = new Connector(attributes.getValue("protocol")); //创建connector if ( ex != null ) _setExecutor(con,ex); //如果有配置的话,那么设置 digester.push(con); //将当前创建的connector放入digester } //其实是在protocolhandler上面设置classLoader public void _setExecutor(Connector con, Executor ex) throws Exception { Method m = IntrospectionUtils.findMethod(con.getProtocolHandler().getClass(),"setExecutor",new Class[] {java.util.concurrent.Executor.class}); if (m!=null) { m.invoke(con.getProtocolHandler(), new Object[] {ex}); }else { log.warn("Connector ["+con+"] does not support external executors. Method setExecutor(java.util.concurrent.Executor) not found."); } }
代码还算是蛮简单的吧,首先是创建connector对象,接着会看其是否有配置executor属性,如果有的话,那么获取service相应的的executor,然后将其设置给connector,这里还可以看到,其实executor最终是交给了protocolhandler对象去了。。。
这里在看看Connector类型的构造函数之前,先来看看它的一个简单的结构图:
首先它继承了LifecycleMBeanBase,那么表明Connector对象将会被注册到JMX上面去。。。嗯,在jconsole上面确实可以看到。。。
接着上图交代了connector依赖的两个重要属性,首先是protocolHandler,然后就是adapter,他们具体是干啥用的呢?
(1)protocolHandler用于具体的底层IO以及数据的处理,例如acceptor,http协议的什么的。。
(2)adapter用于将protocolHandler生成好的request以及response,将其交给container来处理。。。
好啦,来看看Connector的构造函数吧:
//构造函数,这里同时还要创建protocolHandler对象,一般都是用org.apache.coyote.http11.Http11NioProtocol public Connector(String protocol) { setProtocol(protocol); //着这里会根据协议的类型来指定待会创建的protocol的class类型 // Instantiate protocol handler ProtocolHandler p = null; try { Class<?> clazz = Class.forName(protocolHandlerClassName); //创建protocolhandler p = (ProtocolHandler) clazz.newInstance(); } catch (Exception e) { log.error(sm.getString( "coyoteConnector.protocolHandlerInstantiationFailed"), e); } finally { this.protocolHandler = p; //将创建protocolhandler保存起来 } if (!Globals.STRICT_SERVLET_COMPLIANCE) { URIEncoding = "UTF-8"; URIEncodingLower = URIEncoding.toLowerCase(Locale.ENGLISH); } }
这里还算蛮简单吧,首先根据connector用的协议类型,设置要创建的protocolhandler的类型,接着创建相应的protocolhandler对象。。。。
那么接下来来看看Connector的初始化方法吧:
@Override //用于真正的初始化当前的 connector的方法 protected void initInternal() throws LifecycleException { super.initInternal(); //这个里面主要是进行在jmx上面注册的部分 // Initialize adapter adapter = new CoyoteAdapter(this); //创建adapter对象,它用 protocolHandler.setAdapter(adapter); //在protolhanlder里面保存adapter // Make sure parseBodyMethodsSet has a default if( null == parseBodyMethodsSet ) { setParseBodyMethods(getParseBodyMethods());//设置用于parse 请求的body的方法 } if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) { throw new LifecycleException( sm.getString("coyoteConnector.protocolHandlerNoApr", getProtocolHandlerClassName())); } try { protocolHandler.init(); //初始化protocolHandler } catch (Exception e) { throw new LifecycleException (sm.getString ("coyoteConnector.protocolHandlerInitializationFailed"), e); } }
这里代码应该还是蛮简单的吧,首先是创建了一个adapter,然后这里最重要的无非是对protocolhandler的初始化、、、
那么再来看看启动的方法吧:
//启动当前connector的代码,其实主要是启动protocolhandler对象 protected void startInternal() throws LifecycleException { // Validate settings before starting if (getPort() < 0) { throw new LifecycleException(sm.getString( "coyoteConnector.invalidPort", Integer.valueOf(getPort()))); } setState(LifecycleState.STARTING); //设置当前组件的状态 try { protocolHandler.start(); //其实主要是启动protocolhandler对象 } catch (Exception e) { String errPrefix = ""; if(this.service != null) { errPrefix += "service.getName(): \"" + this.service.getName() + "\"; "; } throw new LifecycleException (errPrefix + " " + sm.getString ("coyoteConnector.protocolHandlerStartFailed"), e); } }
这里也没啥意思吧,其实最重要的还是对protocolhandler的启动。。。
好了,到这里基本的看了看Connector的定义,其实最主要的事情还是在adapter以及protocolhandler上面做的,那么这里就先来看看protocolhandler都要做啥吧。。。。
先来看看最顶层的接口:
package org.apache.coyote; import java.util.concurrent.Executor; // public interface ProtocolHandler { public void setAdapter(Adapter adapter); //设置适配器,用于处理生成的请求 public Adapter getAdapter(); public Executor getExecutor(); //返回下层的executor public void init() throws Exception; //初始化 public void start() throws Exception; //启动 public void pause() throws Exception; //暂停 public void resume() throws Exception; public void stop() throws Exception; public void destroy() throws Exception; public boolean isAprRequired(); //是否需要apr、native的需求 public boolean isCometSupported(); public boolean isCometTimeoutSupported(); public boolean isSendfileSupported(); //是否支持sendfile操作 }
接口的定义还算是蛮简单的吧,首先是设置以及获取adapter对象的方法,然后就还有获取executor,然后就还有一些启动,暂停什么的方法定义。。。
好啦,在具体的看常用到的protocolhandler类型Http11NioProtocol之前,先来看看一个结构图吧:
这里,其实protocolhandler的运行也主要是依靠内部的endpoint和connectionhandler,
(1)endpoint具体来维护acceptor以及poller对象
(2)connectionhandler用于来处理从poller里面selelct出来的channel,调用processor对象来解析http协议啥的,当请求生成了之后调用adapter对象来处理。。。
这里来看看Http11NioProtocol的够早函数吧。。。:
endpoint = new NioEndpoint(); //创建endpoint cHandler = new Http11ConnectionHandler(this); //创建一个Http11ConnectionHandler ((NioEndpoint) endpoint).setHandler(cHandler); //为endpoint设置connectionhandler setSoLinger(Constants.DEFAULT_CONNECTION_LINGER); setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT); setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY); }
其实这里主要是创建了endpoint对象以及connectionhandler对象,然后进行了一些参数的设置。。。
再来看看启动的方法吧:
public void start() throws Exception { super.start(); //在父类完成对endpoint的启动 if (npnHandler != null) { npnHandler.init(getEndpoint(), 0, getAdapter()); } }
其实这里主要也都是对endpoint的启动。。。
因此要理解connector对象的关键在于protocolhandler,而理解protocolhandler的关键在于endpoint。。。
好啦,那么接下来就来看看NioEndpoint的实现吧,先来看一张图:
上面的图基本把NioEndpoint最终要的东西都已经弄出来了吧。。。。
(1)acceptor,嗯,这个东西是干啥用的应该从名字里面很容易理解吧
(2)poller,嗯,这个是干嘛的用的应该也能够基本上从名字知道吧。。。
(3)executor,嗯,线程池,Tomcat与jetty在线程池的处理也有比较大的差异吧,Tomcat的线程池的粒度到了connector这个层级,而在jetty中就稍微粗糙了些,整个server都共享同一个线程池,而且在Tomcat中,acceptor与poller对象都有自己的线程,而在jetty中则是简单的派发到了线程池中运行。。。
这里就从启动的方法来开始看endpoint吧:
public final void start() throws Exception { if (bindState == BindState.UNBOUND) { bind(); //这里主要是绑定监听 bindState = BindState.BOUND_ON_START; } startInternal(); }
嗯,在父类中,主要的事情就是建立监听,也就是serversocketchannel,接下来的事情则在NioEndpoint里面的startInternal方法中:
//启动当前的endpoint public void startInternal() throws Exception { if (!running) { running = true; //设置表示为,表示已经看是运行了 paused = false; //没有暂停 // Create worker collection if ( getExecutor() == null ) { //如果没有executor,那么创建 createExecutor(); //创建executor } initializeConnectionLatch(); // // Start poller threads pollers = new Poller[getPollerThreadCount()]; //根据设置的poller数量来创建poller对象的数组 for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); // 创建poller对象 Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); // 创建相应的poller线程 pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); //启动poller } startAcceptorThreads(); //启动acceptor } }
嗯,这里要做的事情就是先搞定线程池,如果在Connector配置中并没有制定线程池,那么这里需要创建一个默认的,接着根据poller的数量来创建相应的poller对象,这里数量一般情况下与机器的cpu数量相等。。。接着就是开始创建acceptor对象,并启动了:
//这里主要是创建acceptor以及启动acceptor protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new Acceptor[count]; for (int i = 0; i < count; i++) { acceptors[i] = createAcceptor(); String threadName = getName() + "-Acceptor-" + i; acceptors[i].setThreadName(threadName); Thread t = new Thread(acceptors[i], threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } }
不要被这个for循环迷惑了,正常情况下也就一个acceptor。。。创建完后接着就启动就好了。。
好啦,poller与acceptor都已经启动了,那么endpoint也就启动起来了。。
好了,这里就从acceptor对象入手吧:
//这里主要是规定了acceptor的run方法 protected class Acceptor extends AbstractEndpoint.Acceptor { @Override public void run() { int errorDelay = 0; // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused && running) { //如果暂停了 state = AcceptorState.PAUSED; //更改当前acceptor的状态 try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { //如果没有运行,那么这里直接跳过 break; } state = AcceptorState.RUNNING; //设置当前acceptor的状态是running try { //if we have reached max connections, wait countUpOrAwaitConnection(); //增减闭锁的计数,如果connection数量已经达到了最大,那么暂停一下,这里用到的是connectionLimitLatch锁,可以理解为一个闭锁吧 SocketChannel socket = null; try { // Accept the next incoming connection from the server // socket socket = serverSock.accept(); //调用serversocket的accept方法 } catch (IOException ioe) { //we didn‘t get a socket countDownConnection(); //出了异常,并没有获取链接,那么这里减少闭锁的计数 // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } // Successful accept, reset the error delay errorDelay = 0; // setSocketOptions() will add channel to the poller // if successful if (running && !paused) { if (!setSocketOptions(socket)) { //这里主要是将socket加入到poller对象上面去 countDownConnection(); //加入poller对象失败了的话,那么将闭锁的计数减低 closeSocket(socket); //关闭刚刚 创建的这个socket } } else { countDownConnection(); closeSocket(socket); } } catch (SocketTimeoutException sx) { // Ignore: Normal condition } catch (IOException x) { if (running) { log.error(sm.getString("endpoint.accept.fail"), x); } } catch (OutOfMemoryError oom) { try { oomParachuteData = null; releaseCaches(); log.error("", oom); }catch ( Throwable oomt ) { try { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; //设置acceptor的状态为ended } }
这里主要来看看acceptor对象的run方法吧,相对来说还是比较容易等把,最主要的事情就是调用serversocketchannel的accept方法获取连接的socketchannel,然后再将其注册到poller对象上面去。。。这里可能需要注意的细节就是这里还涉及到了一个connection数量的限制,是通过闭锁来实现的,如果当前已经拥有的connection已经达到了限制,那么需要等待,暂停当前的acceptor。。。
好了,那么接下来来看看是如何将获取的socketchannel注册到poller上面去的吧。。。
//这里主要是设置刚刚获取的socket的一些配置,然后将它注册到poller上面去 protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { //disable blocking, APR style, we are gonna be polling it socket.configureBlocking(false); //将socket设置为非阻塞的 Socket sock = socket.socket(); //获取底层的socket socketProperties.setProperties(sock); //设置一些配置属性 NioChannel channel = nioChannels.pop(); //获取一个空闲的NioChannel,用于包装实际底层用到的socketchannel if ( channel == null ) { // SSL setup if (sslContext != null) { //如果是加密的 SSLEngine engine = createSSLEngine(); int appbufsize = engine.getSession().getApplicationBufferSize(); NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()), Math.max(appbufsize,socketProperties.getAppWriteBufSize()), socketProperties.getDirectBuffer()); channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool); } else { // normal tcp setup NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); //设置用到的底层channel if ( channel instanceof SecureNioChannel ) { SSLEngine engine = createSSLEngine(); ((SecureNioChannel)channel).reset(engine); } else { channel.reset(); //将channle复位 } } getPoller0().register(channel); //获取一个poller对象,在poller上面注册channel } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("",t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(t); } // Tell to close the socket return false; } return true; }
这里其实并没有立即将socketchannel对象注册到poller上面去,而是先将底层的socket设置为了非阻塞,并进行了一些参数的设置,然后用一个NioChannel对象来包装这个socketchannel,最后才是获取一个poller对象来进行注册。。。
public void register(final NioChannel socket) { socket.setPoller(this); //设置这个channle用到的poller对象 KeyAttachment key = keyCache.pop(); //获取一个空闲的KeyAttachment对象 final KeyAttachment ka = key!=null?key:new KeyAttachment(socket); //如果没有的话,那么需要创建一个 ka.reset(this,socket,getSocketProperties().getSoTimeout()); //对KeyAttachment进行复位,并设置相应的poller对象以及channel对象 ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); PollerEvent r = eventCache.pop(); //获取一个空闲的PollerEvent ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); // 如果没有的空闲的话,那么需要创建一个,OP_REGISTER表示这是新的channel需要注册 else r.reset(socket,ka,OP_REGISTER); addEvent(r); //加入pollerevent }
这里可以看到也并没有理解将channel注册到selector上面去,而是生成了一个PollerEvent对象,并生成了一个KeyAttachment对象,嗯从名字就知道KeyAttachment对象就是待会注册到selector上面的附件,然后会将pollerevent对象放到一个队列里面去,待会再poller自己的线程的循环中,会在队列里面取出这个pollerEvent对象,然后完成注册。。。
这里之所以这么做,是因为进行注册这个时候的线程是acceptor对象的线程,所以不行的,需要在selector自己的所在的线程中来完成具体的在selector上面的注册。。。。。好啦,接下来就来看看poller对象的run方法吧:
//其实主要是进行循环select,然后处理事件 public void run() { // Loop until destroy() is called while (true) { try { // Loop if endpoint is paused while (paused && (!close) ) { //如果当前poller已经暂停了 try { Thread.sleep(100); } catch (InterruptedException e) { // Ignore } } boolean hasEvents = false; // Time to terminate? if (close) { events(); // 处理队列里面的pollerevent对象 timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; } else { hasEvents = events(); //将待处理pollerevent处理掉 } try { if ( !close ) { // 如果没有关闭 if (wakeupCounter.getAndSet(-1) > 0) { //如果大于零,说明有事件需要处理,那么调用非阻塞的方法 //if we are here, means we have other stuff to do //do a non blocking select keyCount = selector.selectNow(); //调用selector的selectnow方法,立即返回的方法 } else { keyCount = selector.select(selectorTimeout); //调用阻塞的方法,返回key的数量 } wakeupCounter.set(0); //将其归0 } if (close) { //如果已经关闭了,呵呵,不用搞了,直接停掉吧 events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; } } catch ( NullPointerException x ) { //sun bug 5076772 on windows JDK 1.5 if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); if ( wakeupCounter == null || selector == null ) throw x; continue; } catch ( CancelledKeyException x ) { //sun bug 5076772 on windows JDK 1.5 if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); if ( wakeupCounter == null || selector == null ) throw x; continue; } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("",x); continue; } //either we timed out or we woke up, process events first //当select出来的key为0的时候 if ( keyCount == 0 ) { hasEvents = (hasEvents | events()); //首先处理队列中的事情,例如注册啥的。。 } Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; //获取select出来的key的迭代器 // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { //遍历所有select出来的事件 SelectionKey sk = iterator.next(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { //因为有可能在别的线程中取消了key的注册,所以有可能为空,那么直接移除就好了 iterator.remove(); } else { attachment.access(); //设置访问时间 iterator.remove(); //将其移除 processKey(sk, attachment); //处理select出来的 } }//while //process timeouts timeout(keyCount,hasEvents); //处理超时 if ( oomParachute > 0 && oomParachuteData == null ) checkParachute(); } catch (OutOfMemoryError oom) { try { oomParachuteData = null; releaseCaches(); log.error("", oom); }catch ( Throwable oomt ) { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } }//while stopLatch.countDown(); }
代码虽然挺长的,但是要做的事情还是比较简单,无非是调用selector的select方法,接着调用processKey方法用于处理select出来的key。。。当然这里还涉及出来前面提到的队列里面的pollerevent,还有超时。。这个就不细说了。。。
//处理select出来的key protected boolean processKey(SelectionKey sk, KeyAttachment attachment) { boolean result = true; try { if ( close ) { //如果已经关闭了,那么需要取消 cancelledKey(sk, SocketStatus.STOP); //取消在selector上面的注册 } else if ( sk.isValid() && attachment != null ) { attachment.access();//设置当前的访问时间,防止超时将其处理掉 NioChannel channel = attachment.getChannel(); //获取用到的channel if (sk.isReadable() || sk.isWritable() ) { //如果是可写或者可读 if ( attachment.getSendfileData() != null ) { //如果有sendfile的方法 processSendfile(sk,attachment, false); } else { if ( isWorkerAvailable() ) { //是否在工作 unreg(sk, attachment, sk.readyOps()); //这里会根据已经ready的事件进行操作,将已经ready的事件从感兴趣的事件中移除 boolean closeSocket = false; //没有关闭socket // Read goes before write if (sk.isReadable()) { //如果可读,那么处理 if (!processSocket(channel, SocketStatus.OPEN_READ, true)) { //处理当前的channel closeSocket = true; // 失败了,那么表明要关闭这个socket } } if (!closeSocket && sk.isWritable()) { //如果可写 if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { // 如果有需要关闭socket,那么取消注册 cancelledKey(sk,SocketStatus.DISCONNECT); } } else { result = false; } } } } else { //无效的key //invalid key cancelledKey(sk, SocketStatus.ERROR); } } catch ( CancelledKeyException ckx ) { cancelledKey(sk, SocketStatus.ERROR); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error("",t); } return result; }
这里其实要做的事情也很简单吧,这里主要的要做的事情就是调用processSocket方法来处理select出来的channel,这里另外的一些细节在注释也交代了。。。
好啦,接着看吧:
//具体的处理select出来的channel,第二个参数是用于表示当前要处理的事件,读或者写 protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) { try { KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false); //获取附件 if (attachment == null) { return false; } attachment.setCometNotify(false); //will get reset upon next reg SocketProcessor sc = processorCache.pop(); //获取一个空闲的socketprocessor if ( sc == null ) sc = new SocketProcessor(socket,status); //如果没有的话,那么需要创建一个 else sc.reset(socket,status); Executor executor = getExecutor(); //获取executor if (dispatch && executor != null) { executor.execute(sc); //一般都是在线程池中运行 } else { sc.run(); //居然就在当前线程run,我擦,还有这种 } } catch (RejectedExecutionException ree) { log.warn(sm.getString("endpoint.executor.fail", socket), ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString("endpoint.process.fail"), t); return false; } return true; }
这里其实将channel封装到一个SocketProcessor里面去,然后放到executor里面去调度执行,那么来看看SocketProcessor的run方法吧:
@Override public void run() { //处理读或者写 SelectionKey key = socket.getIOChannel().keyFor( socket.getPoller().getSelector()); //获取key KeyAttachment ka = null; if (key != null) { ka = (KeyAttachment)key.attachment(); //获取附件 } // Upgraded connections need to allow multiple threads to access the // connection at the same time to enable blocking IO to be used when // NIO has been configured if (ka != null && ka.isUpgraded() && //判断事件类型 SocketStatus.OPEN_WRITE == status) { synchronized (ka.getWriteThreadLock()) { doRun(key, ka); } } else { synchronized (socket) { doRun(key, ka); } } } private void doRun(SelectionKey key, KeyAttachment ka) { boolean launch = false; try { int handshake = -1; try { if (key != null) { //不为空 // For STOP there is no point trying to handshake as the // Poller has been stopped. if (socket.isHandshakeComplete() || status == SocketStatus.STOP) { handshake = 0; } else { handshake = socket.handshake( //ssl的握手 key.isReadable(), key.isWritable()); // The handshake process reads/writes from/to the // socket. status may therefore be OPEN_WRITE once // the handshake completes. However, the handshake // happens when the socket is opened so the status // must always be OPEN_READ after it completes. It // is OK to always set this as it is only used if // the handshake completes. status = SocketStatus.OPEN_READ; } } } catch (IOException x) { handshake = -1; if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x); } catch (CancelledKeyException ckx) { handshake = -1; } if (handshake == 0) { SocketState state = SocketState.OPEN; // Process the request from this socket if (status == null) { state = handler.process(ka, SocketStatus.OPEN_READ); //用connectionhandler来处理,刚开始肯定处理读取了 } else { state = handler.process(ka, status); } //根据返回的状态来处理 if (state == SocketState.CLOSED) { //如果socket已经关闭 // Close socket and pool try { if (ka!=null) ka.setComet(false); socket.getPoller().cancelledKey(key, SocketStatus.ERROR); //取消注册 if (running && !paused) { nioChannels.push(socket); //回收channel } socket = null; if (running && !paused && ka != null) { keyCache.push(ka); //回收key } ka = null; } catch (Exception x) { log.error("",x); } } else if (state == SocketState.LONG && ka != null && ka.isAsync() && ka.interestOps() > 0) { //we are async, and we are interested in operations ka.getPoller().add(socket, ka.interestOps()); } } else if (handshake == -1 ) { if (key != null) { socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT); } if (running && !paused) { nioChannels.push(socket); } socket = null; if (running && !paused && ka != null) { keyCache.push(ka); } ka = null; } else { ka.getPoller().add(socket,handshake); } } catch (CancelledKeyException cx) { socket.getPoller().cancelledKey(key, null); } catch (OutOfMemoryError oom) { try { oomParachuteData = null; log.error("", oom); if (socket != null) { socket.getPoller().cancelledKey(key,SocketStatus.ERROR); } releaseCaches(); } catch (Throwable oomt) { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); } catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } catch (VirtualMachineError vme) { ExceptionUtils.handleThrowable(vme); } catch (Throwable t) { log.error("", t); if (socket != null) { socket.getPoller().cancelledKey(key,SocketStatus.ERROR); } } finally { if (launch) { try { getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ)); } catch (NullPointerException npe) { if (running) { log.error(sm.getString("endpoint.launch.fail"), npe); } } } socket = null; status = null; //return to cache if (running && !paused) { processorCache.push(this); //回收利用 } } } }
代码也还挺长的吧,其实要做的事情就是调用以前提到过的protocolhandler里面的connectionhandler的process方法来处理就好了。。。
好啦。。这里代码就终于从endpoint里面出来了。。又回到了protocolhandler,那么来看看connectionhandler的process方法吧(AbstractConnectionHandler):
//处理select出来的channel,第二个参数是状态,读写啥的。。 public SocketState process(SocketWrapper<S> wrapper, SocketStatus status) { if (wrapper == null) { // Nothing to do. Socket has been closed. return SocketState.CLOSED; } S socket = wrapper.getSocket(); //获取channel if (socket == null) { //channel有可能已经关闭了 // Nothing to do. Socket has been closed. return SocketState.CLOSED; } Processor<S> processor = connections.get(socket); //获取当前channel对应的Http11NioProcessor对象,用于http协议的解析 if (status == SocketStatus.DISCONNECT && processor == null) { // Nothing to do. Endpoint requested a close and there is no // longer a processor associated with this socket. return SocketState.CLOSED; } wrapper.setAsync(false); // 默认不是异步的 ContainerThreadMarker.markAsContainerThread(); try { if (processor == null) { processor = recycledProcessors.pop(); //后去一个空闲的Http11NioProcessor } if (processor == null) { processor = createProcessor(); //如果没有空闲的话,那么需要创建一个新的Http11NioProcessor } initSsl(wrapper, processor); //如果有加密 SocketState state = SocketState.CLOSED; Iterator<DispatchType> dispatches = null; do { if (dispatches != null) { // Associate the processor with the connection as // these calls may result in a nested call to process() connections.put(socket, processor); //将channel与Http11NioProcessor对应起来 DispatchType nextDispatch = dispatches.next(); if (processor.isUpgrade()) { //如果是Upgrade state = processor.upgradeDispatch( nextDispatch.getSocketStatus()); } else { state = processor.asyncDispatch( nextDispatch.getSocketStatus()); } } else if (status == SocketStatus.DISCONNECT && !processor.isComet()) { // Do nothing here, just wait for it to get recycled // Don‘t do this for Comet we need to generate an end // event (see BZ 54022) } else if (processor.isAsync() || state == SocketState.ASYNC_END) { state = processor.asyncDispatch(status); } else if (processor.isComet()) { //继续操作,例如读取发送啥的 state = processor.event(status); } else if (processor.isUpgrade()) { state = processor.upgradeDispatch(status); } else { state = processor.process(wrapper); //其实一般常规的都是走这里 } if (state != SocketState.CLOSED && processor.isAsync()) { state = processor.asyncPostProcess(); } if (state == SocketState.UPGRADING) { //UPGRADING类型 // Get the HTTP upgrade handler HttpUpgradeHandler httpUpgradeHandler = processor.getHttpUpgradeHandler(); // Release the Http11 processor to be re-used release(wrapper, processor, false, false); // Create the upgrade processor processor = createUpgradeProcessor( wrapper, httpUpgradeHandler); // Mark the connection as upgraded wrapper.setUpgraded(true); // Associate with the processor with the connection connections.put(socket, processor); // Initialise the upgrade handler (which may trigger // some IO using the new protocol which is why the lines // above are necessary) // This cast should be safe. If it fails the error // handling for the surrounding try/catch will deal with // it. httpUpgradeHandler.init((WebConnection) processor); } if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + wrapper + "], Status in: [" + status + "], State out: [" + state + "]"); } if (dispatches == null || !dispatches.hasNext()) { // Only returns non-null iterator if there are // dispatches to process. dispatches = wrapper.getIteratorAndClearDispatches(); } } while (state == SocketState.ASYNC_END || state == SocketState.UPGRADING || dispatches != null && state != SocketState.CLOSED); if (state == SocketState.LONG) { //表示处理request或者response之间,例如数据还没有接收或者发送完,那么据需保持processor,并继续在poller上面注册 // In the middle of processing a request/response. Keep the // socket associated with the processor. Exact requirements // depend on type of long poll connections.put(socket, processor); longPoll(wrapper, processor); } else if (state == SocketState.OPEN) { //表示request已经搞定,但是还是keepalive的,那么回收processor对象,然后再将channel注册到poller上面去poller继续等待 // In keep-alive but between requests. OK to recycle // processor. Continue to poll for the next request. connections.remove(socket); release(wrapper, processor, false, true); } else if (state == SocketState.SENDFILE) { //如果在sendfile中 // Sendfile in progress. If it fails, the socket will be // closed. If it works, the socket will be re-added to the // poller connections.remove(socket); release(wrapper, processor, false, false); } else if (state == SocketState.UPGRADED) { // Don‘t add sockets back to the poller if this was a // non-blocking write otherwise the poller may trigger // multiple read events which may lead to thread starvation // in the connector. The write() method will add this socket // to the poller if necessary. if (status != SocketStatus.OPEN_WRITE) { longPoll(wrapper, processor); } } else { // Connection closed. OK to recycle the processor. Upgrade // processors are not recycled. connections.remove(socket); if (processor.isUpgrade()) { processor.getHttpUpgradeHandler().destroy(); } else { release(wrapper, processor, true, false); } } return state; } catch(java.net.SocketException e) { // SocketExceptions are normal getLog().debug(sm.getString( "abstractConnectionHandler.socketexception.debug"), e); } catch (java.io.IOException e) { // IOExceptions are normal getLog().debug(sm.getString( "abstractConnectionHandler.ioexception.debug"), e); } // Future developers: if you discover any other // rare-but-nonfatal exceptions, catch them here, and log as // above. catch (Throwable e) { ExceptionUtils.handleThrowable(e); // any other exception or error is odd. Here we log it // with "ERROR" level, so it will show up even on // less-than-verbose logs. getLog().error( sm.getString("abstractConnectionHandler.error"), e); } // Make sure socket/processor is removed from the list of current // connections connections.remove(socket); // Don‘t try to add upgrade processors back into the pool if (processor !=null && !processor.isUpgrade()) { release(wrapper, processor, true, false); } return SocketState.CLOSED; }
这里代码够长吧,其实简单的来说就是将channel封装到Http11NioProcessor上面去,让Http11NioProcessor对象来处理数据,例如读取数据,http协议解析啥的,然后根据处理的结果来看下次该怎么办,例如关闭channel,或者继续保持channel在poller的注册,如果一次http请求已经搞定了,那么还要回收processor啥的。。。
好啦,来看看Http11NioProcessor最常规的process方法吧:
// 常规的处理channel public SocketState process(SocketWrapper<S> socketWrapper) throws IOException { RequestInfo rp = request.getRequestProcessor(); rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); // Setting up the I/O setSocketWrapper(socketWrapper); getInputBuffer().init(socketWrapper, endpoint); getOutputBuffer().init(socketWrapper, endpoint); // Flags error = false; keepAlive = true; comet = false; openSocket = false; sendfileInProgress = false; readComplete = true; if (endpoint.getUsePolling()) { keptAlive = false; } else { keptAlive = socketWrapper.isKeptAlive(); } if (disableKeepAlive()) { socketWrapper.setKeepAliveLeft(0); } //处理requestline以及header while (!error && keepAlive && !comet && !isAsync() && httpUpgradeHandler == null && !endpoint.isPaused()) { // Parsing the request header try { setRequestLineReadTimeout(); //设置requestline读取超时 if (!getInputBuffer().parseRequestLine(keptAlive)) { //处理requestline if (handleIncompleteRequestLineRead()) { //表示读取requestline没有搞定 break; //跳出while循环吧,继续让poller来搞 } } if (endpoint.isPaused()) { //如果endpoint已经停止了,那么返回错误 // 503 - Service unavailable response.setStatus(503); error = true; } else { // Make sure that connectors that are non-blocking during // header processing (NIO) only set the start time the first // time a request is processed. if (request.getStartTime() < 0) { request.setStartTime(System.currentTimeMillis()); //设置刚开始的时间 } keptAlive = true; // Set this every time in case limit has been changed via JMX request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount()); //设置每次都要设置最大的header数量,因为可能通过jmx更改 // Currently only NIO will ever return false here if (!getInputBuffer().parseHeaders()) { //处理header,如果没有把header搞定完,那么跳出while循环 // We‘ve read part of the request, don‘t recycle it // instead associate it with the socket openSocket = true; readComplete = false; break; } if (!disableUploadTimeout) { setSocketTimeout(connectionUploadTimeout); //接下来是body的数据上传超时 } } } catch (IOException e) { if (getLog().isDebugEnabled()) { getLog().debug( sm.getString("http11processor.header.parse"), e); } error = true; break; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); UserDataHelper.Mode logMode = userDataHelper.getNextMode(); if (logMode != null) { String message = sm.getString( "http11processor.header.parse"); switch (logMode) { case INFO_THEN_DEBUG: message += sm.getString( "http11processor.fallToDebug"); //$FALL-THROUGH$ case INFO: getLog().info(message); break; case DEBUG: getLog().debug(message); } } // 400 - Bad Request response.setStatus(400); getAdapter().log(request, response, 0); error = true; } if (!error) { //如果没有错误 // Setting up filters, and parse some request headers rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE); try { prepareRequest(); //准备request } catch (Throwable t) { ExceptionUtils.handleThrowable(t); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString( "http11processor.request.prepare"), t); } // 400 - Internal Server Error response.setStatus(400); getAdapter().log(request, response, 0); error = true; } } if (maxKeepAliveRequests == 1) { keepAlive = false; } else if (maxKeepAliveRequests > 0 && socketWrapper.decrementKeepAlive() <= 0) { keepAlive = false; } // Process the request in the adapter if (!error) { try { rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); getAdapter().service(request, response); //调用adapter来处理request和response // Handle when the response was committed before a serious // error occurred. Throwing a ServletException should both // set the status to 500 and set the errorException. // If we fail here, then the response is likely already // committed, so we can‘t try and set headers. if(keepAlive && !error) { // Avoid checking twice. error = response.getErrorException() != null || (!isAsync() && statusDropsConnection(response.getStatus())); } setCometTimeouts(socketWrapper); //设置comet超时 } catch (InterruptedIOException e) { error = true; } catch (HeadersTooLargeException e) { //heander太多了 error = true; // The response should not have been committed but check it // anyway to be safe if (!response.isCommitted()) { response.reset(); response.setStatus(500); response.setHeader("Connection", "close"); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); getLog().error(sm.getString( "http11processor.request.process"), t); // 500 - Internal Server Error response.setStatus(500); getAdapter().log(request, response, 0); error = true; } } // Finish the handling of the request rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT); if (!isAsync() && !comet) { //如果不是异步的servlet if (error) { // If we know we are closing the connection, don‘t drain // input. This way uploading a 100GB file doesn‘t tie up the // thread if the servlet has rejected it. getInputBuffer().setSwallowInput(false); } //如果返回的response不正常,那么keeplive将会设置为false,待会会关闭connection if (response.getStatus() < 200 || response.getStatus() > 299) { if (expectation) { // Client sent Expect: 100-continue but received a // non-2xx response. Disable keep-alive (if enabled) to // ensure the connection is closed. Some clients may // still send the body, some may send the next request. // No way to differentiate, so close the connection to // force the client to send the next request. getInputBuffer().setSwallowInput(false); keepAlive = false; } } endRequest(); //结束这次http请求 } rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT); // If there was an error, make sure the request is counted as // and error, and update the statistics counter if (error) { response.setStatus(500); } request.updateCounters(); if (!isAsync() && !comet || error) { // 如果不是异步的,那么可以让buffer准备下次请求了 getInputBuffer().nextRequest(); getOutputBuffer().nextRequest(); } if (!disableUploadTimeout) { if(endpoint.getSoTimeout() > 0) { setSocketTimeout(endpoint.getSoTimeout()); } else { setSocketTimeout(0); } } rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE); if (breakKeepAliveLoop(socketWrapper)) { break; } } rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); if (error || endpoint.isPaused()) { return SocketState.CLOSED; } else if (isAsync() || comet) { //如果是异步的话或者还需要处理,那么返回long,将会维持processor以及在poller上面注册 return SocketState.LONG; } else if (isUpgrade()) { //类似于websocket的吧 return SocketState.UPGRADING; } else { if (sendfileInProgress) { //如果在sendfile过程中 return SocketState.SENDFILE; } else { // if (openSocket) { if (readComplete) { return SocketState.OPEN; } else { return SocketState.LONG; } } else { return SocketState.CLOSED; } } } }
这里代码也够长的了,其实主要也就是进行http协议的解析,然后调用adapter对象来处理生成的请求,然后根据处理的结果返回给connectionhandler,看接下来的工作。。。
这里可能很多细节都没有交代吧,例如异步的servlet啥的。。不过以后再说吧。。。
到这里。。整个connector的就算是差不多了。。。
用一张图来总结吧:
最后再来一段比较有意思的代码吧,直接用connector来返回hello world:
package registTest; import javax.management.Notification; import javax.management.NotificationListener; import javax.management.ObjectName; import org.apache.catalina.connector.Connector; import org.apache.coyote.Request; import org.apache.coyote.Response; import org.apache.coyote.http11.Http11NioProtocol; import org.apache.tomcat.util.modeler.BaseModelMBean; import org.apache.tomcat.util.modeler.ManagedBean; import org.apache.tomcat.util.modeler.Registry; import org.apache.tomcat.util.net.SocketStatus; public class Fjs { private String name; public String getName() { return name; } public void setName(String name) { this.name = name; } public void hello() { System.out.println("hello : " + name); } public static void main(String args[]) throws Exception { /* Registry registry = new Registry(); registry.loadDescriptors("registTest", Fjs.class.getClassLoader()); ObjectName name = new ObjectName("fjs:type=hello"); Fjs fjs = new Fjs(); ManagedBean manager = registry.findManagedBean(Fjs.class.getName()); BaseModelMBean dmb = (BaseModelMBean)manager.createMBean(fjs); dmb.addAttributeChangeNotificationListener(new NotificationListener(){ @Override public void handleNotification(Notification notification, Object handback) { // TODO Auto-generated method stub System.out.println(notification); } }, null, null); registry.getMBeanServer().registerMBean(dmb, name); Thread.currentThread().sleep(Long.MAX_VALUE);*/ final Connector con = new Connector(); con.setPort(80); con.init(); con.start(); con.getProtocolHandler().setAdapter(new org.apache.coyote.Adapter(){ @Override public void service(Request req, Response res) throws Exception { // TODO Auto-generated method stub org.apache.catalina.connector.Request request = con.createRequest(); request.setCoyoteRequest(req); org.apache.catalina.connector.Response response = con.createResponse(); response.setCoyoteResponse(res); request.setResponse(response); response.setRequest(request); response.getWriter().println("hello world"); response.getWriter().flush(); //response.addHeader("aa", "aaaaaa"); } @Override public boolean event(Request req, Response res, SocketStatus status) throws Exception { // TODO Auto-generated method stub return false; } @Override public boolean asyncDispatch(Request req, Response res, SocketStatus status) throws Exception { // TODO Auto-generated method stub return false; } @Override public void log(Request req, Response res, long time) { // TODO Auto-generated method stub } @Override public String getDomain() { // TODO Auto-generated method stub return null; } }); Thread.currentThread().sleep(Long.MAX_VALUE); } }
Tomcat源码阅读之Connector设计与实现,布布扣,bubuko.com
原文:http://blog.csdn.net/fjslovejhl/article/details/20375359