网站开发 经常要清理缓存,网站开发设计费 怎么入账,广州seo推荐,html网页基础代码Connector容器主要负责解析socket请求#xff0c;在tomcat中的源码位于org.apache.catalina.connector和org.apache.coyote包路径下#xff1b;通过上两节的分析#xff0c;我们知道了Connector是Service的子容器#xff0c;而Service又是Server的子容器。在server.xml文件…Connector容器主要负责解析socket请求在tomcat中的源码位于org.apache.catalina.connector和org.apache.coyote包路径下通过上两节的分析我们知道了Connector是Service的子容器而Service又是Server的子容器。在server.xml文件中配置然后在Catalina类中通过Digester完成实例化。在server.xml中默认配置了两种Connector的实现分别用来处理Http请求和AJP请求。Connector的实现一共有以下三种 1、Http Connector解析HTTP请求又分为BIO Http Connector和NIO Http Connector即阻塞IO Connector和非阻塞IO Connector。本文主要分析NIO Http Connector的实现过程。 2、AJP基于AJP协议用于Tomcat与HTTP服务器通信定制的协议能提供较高的通信速度和效率。如与Apache服务器集成时采用这个协议。 3、APR HTTP Connector用C实现通过JNI调用的。主要提升对静态资源如HTML、图片、CSS、JS等的访问性能。 具体要使用哪种Connector可以在server.xml文件中通过protocol属性配置如下 Connector port8080 protocolHTTP/1.1connectionTimeout20000redirectPort8443 / 然后看一下Connector的构造器 public Connector(String protocol) {setProtocol(protocol);// Instantiate protocol handlerProtocolHandler p null;try {Class? clazz Class.forName(protocolHandlerClassName);p (ProtocolHandler) clazz.getConstructor().newInstance();} catch (Exception e) {log.error(sm.getString(coyoteConnector.protocolHandlerInstantiationFailed), e);} finally {this.protocolHandler p;}if (Globals.STRICT_SERVLET_COMPLIANCE) {uriCharset StandardCharsets.ISO_8859_1;} else {uriCharset StandardCharsets.UTF_8;}
}public void setProtocol(String protocol) {boolean aprConnector AprLifecycleListener.isAprAvailable() AprLifecycleListener.getUseAprConnector();if (HTTP/1.1.equals(protocol) || protocol null) {if (aprConnector) {setProtocolHandlerClassName(org.apache.coyote.http11.Http11AprProtocol);} else {setProtocolHandlerClassName(org.apache.coyote.http11.Http11NioProtocol);}} else if (AJP/1.3.equals(protocol)) {if (aprConnector) {setProtocolHandlerClassName(org.apache.coyote.ajp.AjpAprProtocol);} else {setProtocolHandlerClassName(org.apache.coyote.ajp.AjpNioProtocol);}} else {setProtocolHandlerClassName(protocol);}
} 通过分析Connector构造器的源码可以知道每一个Connector对应了一个protocolHandler一个protocolHandler被设计用来监听服务器某个端口的网络请求但并不负责处理请求(处理请求由Container组件完成)。下面就以Http11NioProtocol为例分析Http请求的解析过程。在Connector的startInterval方法中启动了protocolHandler,代码如下 protected void startInternal() throws LifecycleException {// Validate settings before startingif (getPort() 0) {throw new LifecycleException(sm.getString(coyoteConnector.invalidPort, Integer.valueOf(getPort())));}setState(LifecycleState.STARTING);try {protocolHandler.start();} catch (Exception e) {throw new LifecycleException(sm.getString(coyoteConnector.protocolHandlerStartFailed), e);}
} Http11NioProtocol创建一个org.apache.tomcat.util.net.NioEndpoint实例,然后将监听端口并解析请求的工作全被委托给NioEndpoint实现。tomcat在使用Http11NioProtocol解析HTTP请求时一共设计了三种线程分别为AcceptorPoller和Worker。 1、Acceptor线程 Acceptor实现了Runnable接口根据其命名就知道它是一个接收器负责接收socket其接收方法是serverSocket.accept()方式获得SocketChannel对象然后封装成tomcat自定义的org.apache.tomcat.util.net.NioChannel。虽然是Nio但在接收socket时仍然使用传统的方法使用阻塞方式实现。Acceptor以线程池的方式被创建和管理在NioEndpoint的startInternal()方法中完成Acceptor的启动源码如下 public void startInternal() throws Exception {if (!running) {running true;paused false;processorCache new SynchronizedStack(SynchronizedStack.DEFAULT_SIZE,socketProperties.getProcessorCache());eventCache new SynchronizedStack(SynchronizedStack.DEFAULT_SIZE,socketProperties.getEventCache());nioChannels new SynchronizedStack(SynchronizedStack.DEFAULT_SIZE,socketProperties.getBufferPool());// Create worker collectionif ( getExecutor() null ) {createExecutor();}//设置最大连接数,默认值为maxConnections 10000通过同步器AQS实现。initializeConnectionLatch();//默认是2个Math.min(2,Runtime.getRuntime().availableProcessors());和虚拟机处理器个数比较// Start poller threadspollers new Poller[getPollerThreadCount()];for (int i0; ipollers.length; i) {pollers[i] new Poller();Thread pollerThread new Thread(pollers[i], getName() -ClientPoller-i);pollerThread.setPriority(threadPriority);pollerThread.setDaemon(true);pollerThread.start();}startAcceptorThreads();}
} 继续追踪startAcceptorThreads的源码 protected final void startAcceptorThreads() {//启动Acceptor线程默认是1个int count getAcceptorThreadCount();acceptors new Acceptor[count];for (int i 0; i count; i) {acceptors[i] createAcceptor();String threadName getName() -Acceptor- i;acceptors[i].setThreadName(threadName);Thread t new Thread(acceptors[i], threadName);t.setPriority(getAcceptorThreadPriority());t.setDaemon(getDaemon());t.start();}
} Acceptor线程的核心代码在它的run方法中 protected class Acceptor extends AbstractEndpoint.Acceptor {Overridepublic void run() {int errorDelay 0;// Loop until we receive a shutdown commandwhile (running) {// Loop if endpoint is pausedwhile (paused running) {state AcceptorState.PAUSED;try {Thread.sleep(50);} catch (InterruptedException e) {// Ignore}}if (!running) {break;}state AcceptorState.RUNNING;try {//if we have reached max connections, waitcountUpOrAwaitConnection();SocketChannel socket null;try {// Accept the next incoming connection from the server// socket//接收socket请求socket serverSock.accept();} catch (IOException ioe) {// We didnt get a socketcountDownConnection();if (running) {// Introduce delay if necessaryerrorDelay handleExceptionWithDelay(errorDelay);// re-throwthrow ioe;} else {break;}}// Successful accept, reset the error delayerrorDelay 0;// Configure the socketif (running !paused) {// setSocketOptions() will hand the socket off to// an appropriate processor if successfulif (!setSocketOptions(socket)) {closeSocket(socket);}} else {closeSocket(socket);}} catch (Throwable t) {ExceptionUtils.handleThrowable(t);log.error(sm.getString(endpoint.accept.fail), t);}}state AcceptorState.ENDED;}private void closeSocket(SocketChannel socket) {countDownConnection();try {socket.socket().close();} catch (IOException ioe) {if (log.isDebugEnabled()) {log.debug(sm.getString(endpoint.err.close), ioe);}}try {socket.close();} catch (IOException ioe) {if (log.isDebugEnabled()) {log.debug(sm.getString(endpoint.err.close), ioe);}}}
} Acceptor完成了socket请求的接收然后交给NioEndpoint 进行配置继续追踪Endpoint的setSocketOptions方法。 protected boolean setSocketOptions(SocketChannel socket) {// Process the connectiontry {//disable blocking, APR style, we are gonna be polling it//设置为非阻塞socket.configureBlocking(false);Socket sock socket.socket();socketProperties.setProperties(sock);NioChannel channel nioChannels.pop();if (channel null) {SocketBufferHandler bufhandler new SocketBufferHandler(socketProperties.getAppReadBufSize(),socketProperties.getAppWriteBufSize(),socketProperties.getDirectBuffer());if (isSSLEnabled()) {channel new SecureNioChannel(socket, bufhandler, selectorPool, this);} else {channel new NioChannel(socket, bufhandler);}} else {channel.setIOChannel(socket);channel.reset();}//轮训pollers数组元素调用Poller的register方法完成channel的注册。getPoller0().register(channel);} catch (Throwable t) {ExceptionUtils.handleThrowable(t);try {log.error(,t);} catch (Throwable tt) {ExceptionUtils.handleThrowable(tt);}// Tell to close the socketreturn false;}return true;
} 分析setSocketOptions的源码可以知道该方法的主要功能是利用传入的SocketChannel参数生成SecureNioChannel或者NioChannel然后注册到Poller线程的selector中可以进一步了解Java nio的相关知识对这一块内容有更深的理解。 2、Poller线程 Poller同样实现了Runnable接口是NioEndpoint类的内部类。在Endpoint的startInterval方法中创建、配置并启动了Poller线程见代码清单4。Poller主要职责是不断轮询其selector检查准备就绪的socket(有数据可读或可写)实现io的多路复用。其构造其中初始化了selector。 public Poller() throws IOException {this.selector Selector.open();
} 在分析Acceptor的时候提到了Acceptor接受到一个socket请求后调用NioEndpoint的setSocketOptions方法(代码清单6),该方法生成了NioChannel后调用Poller的register方法生成PoolorEvent后加入到Eventqueueregister方法的源码如下 public void register(final NioChannel socket) {socket.setPoller(this);NioSocketWrapper ka new NioSocketWrapper(socket, NioEndpoint.this);socket.setSocketWrapper(ka);ka.setPoller(this);ka.setReadTimeout(getSocketProperties().getSoTimeout());ka.setWriteTimeout(getSocketProperties().getSoTimeout());ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());ka.setSecure(isSSLEnabled());ka.setReadTimeout(getConnectionTimeout());ka.setWriteTimeout(getConnectionTimeout());PollerEvent r eventCache.pop();ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.//生成PoolorEvent并加入到Eventqueueif ( rnull) r new PollerEvent(socket,ka,OP_REGISTER);else r.reset(socket,ka,OP_REGISTER);addEvent(r);
} Poller的核心代码也在其run方法中: public void run() {// Loop until destroy() is called// 调用了destroy()方法后终止此循环while (true) {boolean hasEvents false;try {if (!close) {hasEvents events();if (wakeupCounter.getAndSet(-1) 0) {//if we are here, means we have other stuff to do//do a non blocking select//非阻塞的 selectkeyCount selector.selectNow();} else {//阻塞selector直到有准备就绪的socketkeyCount selector.select(selectorTimeout);}wakeupCounter.set(0);}if (close) {//该方法遍历了eventqueue中的所有PollerEvent然后依次调用PollerEvent的run将socket注册到selector中。events();timeout(0, false);try {selector.close();} catch (IOException ioe) {log.error(sm.getString(endpoint.nio.selectorCloseFail), ioe);}break;}} catch (Throwable x) {ExceptionUtils.handleThrowable(x);log.error(,x);continue;}//either we timed out or we woke up, process events firstif ( keyCount 0 ) hasEvents (hasEvents | events());IteratorSelectionKey iterator keyCount 0 ? selector.selectedKeys().iterator() : null;// Walk through the collection of ready keys and dispatch// any active event.//遍历就绪的socket事件while (iterator ! null iterator.hasNext()) {SelectionKey sk iterator.next();NioSocketWrapper attachment (NioSocketWrapper)sk.attachment();// Attachment may be null if another thread has called// cancelledKey()if (attachment null) {iterator.remove();} else {iterator.remove();//调用processKey方法对有数据读写的socket进行处理在分析Worker线程时会分析该方法processKey(sk, attachment);}}//while//process timeoutstimeout(keyCount,hasEvents);}//while
getStopLatch().countDown();
} run方法中调用了events方法 public boolean events() {boolean result false;PollerEvent pe null;for (int i 0, size events.size(); i size (pe events.poll()) ! null; i ) {result true;try {//将pollerEvent中的每个socketChannel注册到selector中pe.run();pe.reset();if (running !paused) {//将注册了的pollerEvent加到endPoint.eventCacheeventCache.push(pe);}} catch ( Throwable x ) {log.error(,x);}}return result;
} 继续跟进PollerEvent的run方法 public void run() {if (interestOps OP_REGISTER) {try {//将SocketChannel注册到selector中,注册时间为SelectionKey.OP_READ读事件socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);} catch (Exception x) {log.error(sm.getString(endpoint.nio.registerFail), x);}} else {final SelectionKey key socket.getIOChannel().keyFor(socket.getPoller().getSelector());try {if (key null) {// The key was cancelled (e.g. due to socket closure)// and removed from the selector while it was being// processed. Count down the connections at this point// since it wont have been counted down when the socket// closed.socket.socketWrapper.getEndpoint().countDownConnection();((NioSocketWrapper) socket.socketWrapper).closed true;} else {final NioSocketWrapper socketWrapper (NioSocketWrapper) key.attachment();if (socketWrapper ! null) {//we are registering the key to start with, reset the fairness counter.int ops key.interestOps() | interestOps;socketWrapper.interestOps(ops);key.interestOps(ops);} else {socket.getPoller().cancelledKey(key);}}} catch (CancelledKeyException ckx) {try {socket.getPoller().cancelledKey(key);} catch (Exception ignore) {}}}
} 3、Worker线程 Worker线程即SocketProcessor是用来处理Socket请求的。SocketProcessor也同样是Endpoint的内部类。在Poller的run方法中(代码清单8)监听到准备就绪的socket时会调用processKey方法进行处理 protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {try {if ( close ) {cancelledKey(sk);} else if ( sk.isValid() attachment ! null ) {//有读写事件就绪时if (sk.isReadable() || sk.isWritable() ) {if ( attachment.getSendfileData() ! null ) {processSendfile(sk,attachment, false);} else {unreg(sk, attachment, sk.readyOps());boolean closeSocket false;// Read goes before write// socket可读时先处理读事件if (sk.isReadable()) {//调用processSocket方法进一步处理if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {closeSocket true;}}//写事件if (!closeSocket sk.isWritable()) {//调用processSocket方法进一步处理if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {closeSocket true;}}if (closeSocket) {cancelledKey(sk);}}}} else {//invalid keycancelledKey(sk);}} catch ( CancelledKeyException ckx ) {cancelledKey(sk);} catch (Throwable t) {ExceptionUtils.handleThrowable(t);log.error(,t);}
} 继续跟踪processSocket方法 public boolean processSocket(SocketWrapperBaseS socketWrapper,SocketEvent event, boolean dispatch) {try {if (socketWrapper null) {return false;}// 尝试循环利用之前回收的SocketProcessor对象如果没有可回收利用的则创建新的SocketProcessor对象SocketProcessorBaseS sc processorCache.pop();if (sc null) {sc createSocketProcessor(socketWrapper, event);} else {// 循环利用回收的SocketProcessor对象sc.reset(socketWrapper, event);}Executor executor getExecutor();if (dispatch executor ! null) {//SocketProcessor实现了Runneble接口可以直接传入execute方法进行处理executor.execute(sc);} else {sc.run();}} catch (RejectedExecutionException ree) {getLog().warn(sm.getString(endpoint.executor.fail, socketWrapper) , ree);return false;} catch (Throwable t) {ExceptionUtils.handleThrowable(t);// This means we got an OOM or similar creating a thread, or that// the pool and its queue are fullgetLog().error(sm.getString(endpoint.process.fail), t);return false;}return true;
}//NioEndpoint中createSocketProcessor创建一个SocketProcessor。
protected SocketProcessorBaseNioChannel createSocketProcessor(SocketWrapperBaseNioChannel socketWrapper, SocketEvent event) {return new SocketProcessor(socketWrapper, event);
} 总结 Http11NioProtocol是基于Java Nio实现的创建了Acceptor、Poller和Worker线程实现多路io的复用。三类线程之间的关系如下图所示 Acceptor和Poller之间是生产者消费者模式的关系Acceptor不断向EventQueue中添加PollerEventPollor轮询检查EventQueue中就绪的PollerEvent然后发送给Work线程进行处理。 转载于:https://www.cnblogs.com/grasp/p/10099897.html