
A Chat About Tomcat's BIO and NIO

 

I won't describe the two I/O models themselves or their general use with raw sockets; the aim here is to explore how Tomcat applies them. Two versions are compared: Tomcat 4 and Tomcat 6.

1. Tomcat 4

Tomcat 4 uses BIO only. Start with this class:

public final class HttpConnector
    implements Connector, Lifecycle, Runnable

It implements both the Runnable and Lifecycle interfaces. In Tomcat, every class implementing Lifecycle must provide start, stop, and related methods; by convention, start is invoked explicitly after the class is instantiated, and the corresponding events are fired on startup and destruction. I won't dwell on that here.
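As a rough illustration of that contract, here is a minimal sketch (illustrative only; the real org.apache.catalina.Lifecycle also manages LifecycleListeners and defines event-name constants such as START_EVENT):

public interface Lifecycle {

    // invoked explicitly after the component is instantiated; fires START_EVENT
    void start() throws LifecycleException;

    // invoked on shutdown; fires STOP_EVENT
    void stop() throws LifecycleException;

}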

After the HttpConnector is instantiated, its start method gets called:

 

private Stack processors = new Stack();

public void start() throws LifecycleException {

    // Validate and update our current state
    if (started)
        throw new LifecycleException
            (sm.getString("httpConnector.alreadyStarted"));
    threadName = "HttpConnector[" + port + "]";
    lifecycle.fireLifecycleEvent(START_EVENT, null);
    started = true;

    // Start our background thread
    threadStart();

    // Create the specified minimum number of processors
    while (curProcessors < minProcessors) {
        if ((maxProcessors > 0) && (curProcessors >= maxProcessors))
            break;
        HttpProcessor processor = newProcessor();
        recycle(processor);
    }

}

private void threadStart() {

    log(sm.getString("httpConnector.starting"));

    thread = new Thread(this, threadName);
    thread.setDaemon(true);
    thread.start();

}

public void run() {
    // Loop until we receive a shutdown command
    while (!stopped) {
        // Accept the next incoming connection from the server socket
        Socket socket = null;
        try {
            socket = serverSocket.accept();
            if (connectionTimeout > 0)
                socket.setSoTimeout(connectionTimeout);
            socket.setTcpNoDelay(tcpNoDelay);
        } catch (AccessControlException ace) {
            log("socket accept security exception", ace);
            continue;
        } catch (IOException e) {
            try {
                // If reopening fails, exit
                synchronized (threadSync) {
                    if (started && !stopped)
                        log("accept error: ", e);
                    if (!stopped) {
                        serverSocket.close();
                        serverSocket = open();
                    }
                }
            } catch (IOException ioe) {
                log("socket reopen, io problem: ", ioe);
                break;
            } catch (KeyStoreException kse) {
                log("socket reopen, keystore problem: ", kse);
                break;
            } catch (NoSuchAlgorithmException nsae) {
                log("socket reopen, keystore algorithm problem: ", nsae);
                break;
            } catch (CertificateException ce) {
                log("socket reopen, certificate problem: ", ce);
                break;
            } catch (UnrecoverableKeyException uke) {
                log("socket reopen, unrecoverable key: ", uke);
                break;
            } catch (KeyManagementException kme) {
                log("socket reopen, key management problem: ", kme);
                break;
            }

            continue;
        }

        // Hand this socket off to an appropriate processor
        HttpProcessor processor = createProcessor();
        if (processor == null) {
            try {
                log(sm.getString("httpConnector.noProcessor"));
                socket.close();
            } catch (IOException e) {
                ;
            }
            continue;
        }
        processor.assign(socket);

        // The processor will recycle itself when it finishes

    }

    // Notify the threadStop() method that we have shut ourselves down
    synchronized (threadSync) {
        threadSync.notifyAll();
    }

}

 

When start is called the background thread is launched; the ServerSocket has already been created by then, and if it is ever closed abnormally it gets reopened (see the open method below). The run method then loops indefinitely, calling accept on the ServerSocket to take the next socket off the pending-connection queue.

 

private ServerSocket open()
throws IOException, KeyStoreException, NoSuchAlgorithmException,
       CertificateException, UnrecoverableKeyException,
       KeyManagementException
{

    // Acquire the server socket factory for this Connector
    ServerSocketFactory factory = getFactory();

    // If no address is specified, open a connection on all addresses
    if (address == null) {
        log(sm.getString("httpConnector.allAddresses"));
        try {
            return (factory.createSocket(port, acceptCount));
        } catch (BindException be) {
            throw new BindException(be.getMessage() + ":" + port);
        }
    }

    // Open a server socket on the specified address
    try {
        InetAddress is = InetAddress.getByName(address);
        log(sm.getString("httpConnector.anAddress", address));
        try {
            return (factory.createSocket(port, acceptCount, is));
        } catch (BindException be) {
            throw new BindException(be.getMessage() + ":" + address +
                                    ":" + port);
        }
    } catch (Exception e) {
        log(sm.getString("httpConnector.noAddress", address));
        try {
            return (factory.createSocket(port, acceptCount));
        } catch (BindException be) {
            throw new BindException(be.getMessage() + ":" + port);
        }
    }

}

 

Each accepted socket is handed to another thread and processed asynchronously, so multiple threads can serve multiple requests at the same time instead of one request monopolizing a single thread while the rest block. HttpConnector reuses threads through a pool: a Stack holds a set of HttpProcessor instances, and when a request arrives one is popped off the stack to handle it.

private HttpProcessor createProcessor() {

    synchronized (processors) {
        if (processors.size() > 0) {
            // Reuse an existing processor from the stack
            return ((HttpProcessor) processors.pop());
        }
        if ((maxProcessors > 0) && (curProcessors < maxProcessors)) {
            // Below the cap: create a new processor
            return (newProcessor());
        } else {
            if (maxProcessors < 0) {
                // A negative maxProcessors means "unbounded"
                return (newProcessor());
            } else {
                // Pool exhausted and at the cap: the caller gets null
                return (null);
            }
        }
    }

}

 

Now look at the class that handles requests:

final class HttpProcessor
    implements Lifecycle, Runnable

It too implements Runnable; its run method loops continuously, and whenever a new socket is assigned it calls process to handle it.

synchronized void assign(Socket socket) {

    // Wait for the Processor to get the previous Socket
    while (available) {
        try {
            wait();
        } catch (InterruptedException e) {
        }
    }

    // Store the newly available Socket and notify our thread
    this.socket = socket;
    available = true;
    notifyAll();

    if ((debug >= 1) && (socket != null))
        log(" An incoming request is being assigned");

}
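
assign is one half of a classic guarded handoff; the other half is the await method that run calls below. Tomcat 4 implements it as the mirror image of assign, roughly like this (a sketch reconstructed from the pattern above; treat it as illustrative rather than the verbatim source):

private synchronized Socket await() {

    // Wait until assign() hands us a new socket
    while (!available) {
        try {
            wait();
        } catch (InterruptedException e) {
        }
    }

    // Take the socket and notify assign() that the slot is free again
    Socket socket = this.socket;
    available = false;
    notifyAll();

    return (socket);

}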

public void run() {

    // Process requests until we receive a shutdown signal
    while (!stopped) {

        // Wait for the next socket to be assigned
        Socket socket = await();
        if (socket == null)
            continue;

        // Process the request from this socket
        try {
            process(socket);
        } catch (Throwable t) {
            log("process.invoke", t);
        }

        // Finish up this request
        connector.recycle(this);

    }

    // Tell threadStop() we have shut ourselves down successfully
    synchronized (threadSync) {
        threadSync.notifyAll();
    }

}
 
When assign is called the worker thread wakes up, the while loop in run proceeds, and the socket request is processed; once a socket is done, recycle is called to return the processor to the pool.
void recycle(HttpProcessor processor) {

    processors.push(processor);

}
As you can see, it simply calls push on the Stack.
To sum up: Tomcat 4 builds on blocking I/O, maintains its thread pool with a Stack, and spins up multiple threads so that concurrent requests are handled asynchronously and in parallel.
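
The whole flow can be distilled into a few dozen lines. The sketch below is illustrative only (simplified names, no Lifecycle plumbing, no HTTP parsing), but it has the same moving parts: one blocking acceptor thread, a Stack of reusable workers, and a wait/notify handoff per request.

import java.net.ServerSocket;
import java.net.Socket;
import java.util.Stack;

public class BioSketch {

    private static final Stack<Worker> pool = new Stack<Worker>();

    static class Worker extends Thread {
        private Socket socket;

        // handoff, as in HttpProcessor.assign()
        synchronized void assign(Socket s) {
            socket = s;
            notify();
        }

        public synchronized void run() {
            while (true) {
                while (socket == null) {
                    try { wait(); } catch (InterruptedException e) { return; }
                }
                try (Socket s = socket) {
                    s.getOutputStream().write("HTTP/1.0 200 OK\r\n\r\nok".getBytes());
                } catch (Exception ignored) {
                }
                socket = null;
                // recycle ourselves, as in HttpConnector.recycle()
                synchronized (pool) { pool.push(this); }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(8081)) {
            while (true) {
                Socket s = server.accept();          // blocking accept on one thread
                Worker w;
                synchronized (pool) { w = pool.isEmpty() ? null : pool.pop(); }
                if (w == null) {                     // grow the pool, like createProcessor()
                    w = new Worker();
                    w.setDaemon(true);
                    w.start();
                }
                w.assign(s);                         // hand the socket to the worker
            }
        }
    }
}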
 
2. Tomcat 6
Tomcat 6 introduces NIO support, which can be switched on in the conf/server.xml configuration file:
<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol" 
               connectionTimeout="20000" 
               redirectPort="8443" /> 
 
The classes examined below live in the org.apache.catalina.tribes.transport package. (Strictly speaking these belong to Tribes, Tomcat's cluster-communication module, rather than to the HTTP connector configured above, but they exhibit the same BIO and NIO patterns.)
The default is still BIO mode; the implementation differs from Tomcat 4, yet the core idea is much the same. The relevant classes are now:
// starts the ServerSocket and listens
public class BioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback

// the class that actually handles each socket
public class BioReplicationThread extends WorkerThread
 
The thread-pool implementation is also different from before: it maintains two LinkedLists, idle and used, and each WorkerThread shuttles back and forth between the two.
package org.apache.catalina.tribes.transport;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/**
 * @author not attributable
 * @version 1.0
 */

public class ThreadPool
{
    /**
     * A very simple thread pool class.  The pool size is set at
     * construction time and remains fixed.  Threads are cycled
     * through a FIFO idle queue.
     */

    List idle = new LinkedList();
    List used = new LinkedList();
    
    Object mutex = new Object();
    boolean running = true;
    
    private static int counter = 1;
    private int maxThreads;
    private int minThreads;
    
    private ThreadCreator creator = null;

    private static synchronized int inc() {
        return counter++;
    }

    
    public ThreadPool (int maxThreads, int minThreads, ThreadCreator creator) throws Exception {
        // fill up the pool with worker threads
        this.maxThreads = maxThreads;
        this.minThreads = minThreads;
        this.creator = creator;
        //for (int i = 0; i < minThreads; i++) {
        for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem
            WorkerThread thread = creator.getWorkerThread();
            setupThread(thread);
            idle.add (thread);
        }
    }
    
    protected void setupThread(WorkerThread thread) {
        synchronized (thread) {
            thread.setPool(this);
            thread.setName(thread.getClass().getName() + "[" + inc() + "]");
            thread.setDaemon(true);
            thread.setPriority(Thread.MAX_PRIORITY);
            thread.start();
            try {thread.wait(500); }catch ( InterruptedException x ) {}
        }
    }

    /**
     * Find an idle worker thread, if any.  Could return null.
     */
    public WorkerThread getWorker()
    {
        WorkerThread worker = null;
        synchronized (mutex) {
            while ( worker == null && running ) {
                if (idle.size() > 0) {
                    try {
                        worker = (WorkerThread) idle.remove(0);
                    } catch (java.util.NoSuchElementException x) {
                        //this means that there are no available workers
                        worker = null;
                    }
                } else if ( used.size() < this.maxThreads && creator != null) {
                    worker = creator.getWorkerThread();
                    setupThread(worker);
                } else {
                    try { mutex.wait(); } catch ( InterruptedException x ) {Thread.currentThread().interrupted();}
                }
            }//while
            if ( worker != null ) used.add(worker);
        }
        return (worker);
    }
    
    public int available() {
        return idle.size();
    }

    /**
     * Called by the worker thread to return itself to the
     * idle pool.
     */
    public void returnWorker (WorkerThread worker) {
        if ( running ) {
            synchronized (mutex) {
                used.remove(worker);
                //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
                if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
                else {
                    worker.setDoRun(false);
                    synchronized (worker){worker.notify();}
                }
                mutex.notify();
            }
        }else {
            worker.setDoRun(false);
            synchronized (worker){worker.notify();}
        }
    }

    public int getMaxThreads() {
        return maxThreads;
    }

    public int getMinThreads() {
        return minThreads;
    }

    public void stop() {
        running = false;
        synchronized (mutex) {
            Iterator i = idle.iterator();
            while ( i.hasNext() ) {
                WorkerThread worker = (WorkerThread)i.next();
                returnWorker(worker);
                i.remove();
            }
        }
    }

    public void setMaxThreads(int maxThreads) {
        this.maxThreads = maxThreads;
    }

    public void setMinThreads(int minThreads) {
        this.minThreads = minThreads;
    }

    public ThreadCreator getThreadCreator() {
        return this.creator;
    }
    
    public static interface ThreadCreator {
        public WorkerThread getWorkerThread();
    }
}
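
Note the handshake in setupThread: it calls thread.start() while holding the thread's monitor and then waits up to 500 ms; the worker's run method (see NioReplicationThread below) begins with this.notify() to signal that it is up and about to park. Here is a distilled, self-contained illustration of that start/notify handshake (an illustrative sketch, not the actual Tomcat classes):

public class HandoffDemo {

    static class Worker extends Thread {
        public synchronized void run() {
            notify();                          // tell setup we are alive; we hold our own monitor
            while (!isInterrupted()) {
                try {
                    wait();                    // park until work is assigned
                } catch (InterruptedException e) {
                    return;                    // demo shutdown
                }
                System.out.println("servicing work");
            }
        }

        public synchronized void assign() {
            notify();                          // wake the parked worker
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Worker w = new Worker();
        synchronized (w) {                     // hold the worker's monitor...
            w.start();                         // ...so its synchronized run() blocks
            w.wait(500);                       // release it; run() notifies us once parked
        }
        w.assign();                            // normally finds the worker already waiting
        Thread.sleep(100);                     // let it service the work
        w.interrupt();                         // end the demo
    }
}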
 
With NIO mode enabled, Tomcat 6 starts up and operates on top of NIO. For how the NIO main thread starts and listens, look at this class:
public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback
 
Here are the key parts of the code:
public void start() throws IOException {
    try {
    //            setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
        setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this));
    } catch (Exception x) {
        log.fatal("ThreadPool can initilzed. Listener not started", x);
        if ( x instanceof IOException ) throw (IOException)x;
        else throw new IOException(x.getMessage());
    }
    try {
        getBind();
        bind();
        Thread t = new Thread(this, "NioReceiver");
        t.setDaemon(true);
        t.start();
    } catch (Exception x) {
        log.fatal("Unable to start cluster receiver", x);
        if ( x instanceof IOException ) throw (IOException)x;
        else throw new IOException(x.getMessage());
    }
}

protected void bind() throws IOException {
    // allocate an unbound server socket channel
    serverChannel = ServerSocketChannel.open();
    // Get the associated ServerSocket to bind it with
    ServerSocket serverSocket = serverChannel.socket();
    // create a new Selector for use below
    selector = Selector.open();
    // set the port the server channel will listen to
    //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
    bind(serverSocket,getTcpListenPort(),getAutoBind());
    // set non-blocking mode for the listening socket
    serverChannel.configureBlocking(false);
    // register the ServerSocketChannel with the Selector
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    
}

protected void listen() throws Exception {
    if (doListen()) {
        log.warn("ServerSocketChannel already started");
        return;
    }
    
    setListen(true);

    while (doListen() && selector != null) {
        // this may block for a long time, upon return the
        // selected set contains keys of the ready channels
        try {
            events();
            socketTimeouts();
            int n = selector.select(getTcpSelectorTimeout());
            if (n == 0) {
                //there is a good chance that we got here
                //because the TcpReplicationThread called
                //selector wakeup().
                //if that happens, we must ensure that that
                //thread has enough time to call interestOps
//                    synchronized (interestOpsMutex) {
                    //if we got the lock, means there are no
                    //keys trying to register for the
                    //interestOps method
//                    }
                continue; // nothing to do
            }
            // get an iterator over the set of selected keys
            Iterator it = selector.selectedKeys().iterator();
            // look at each key in the selected set
            while (it.hasNext()) {
                SelectionKey key = (SelectionKey) it.next();
                // Is a new connection coming in?
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    channel.socket().setReceiveBufferSize(getRxBufSize());
                    channel.socket().setSendBufferSize(getTxBufSize());
                    channel.socket().setTcpNoDelay(getTcpNoDelay());
                    channel.socket().setKeepAlive(getSoKeepAlive());
                    channel.socket().setOOBInline(getOoBInline());
                    channel.socket().setReuseAddress(getSoReuseAddress());
                    channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
                    channel.socket().setTrafficClass(getSoTrafficClass());
                    channel.socket().setSoTimeout(getTimeout());
                    Object attach = new ObjectReader(channel);
                    registerChannel(selector,
                                    channel,
                                    SelectionKey.OP_READ,
                                    attach);
                }
                // is there data to read on this channel?
                if (key.isReadable()) {
                    readDataFromSocket(key);
                } else {
                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
                }

                // remove key from selected set, it's been handled
                it.remove();
            }
        } catch (java.nio.channels.ClosedSelectorException cse) {
            // ignore is normal at shutdown or stop listen socket
        } catch (CancelledKeyException nx) {
            log.warn("Replication client disconnected, error when polling key. Ignoring client.");
        } catch (Throwable x) {
            try {
                log.error("Unable to process request in NioReceiver", x);
            }catch ( Throwable tx ) {
                //in case an out of memory error, will affect the logging framework as well
                tx.printStackTrace();
            }
        }

    }
    serverChannel.close();
    if (selector != null)
        selector.close();
}

protected void registerChannel(Selector selector,
                               SelectableChannel channel,
                               int ops,
                               Object attach) throws Exception {
    if (channel == null)return; // could happen
    // set the new channel non-blocking
    channel.configureBlocking(false);
    // register it with the selector
    channel.register(selector, ops, attach);
}

public void run() {
    try {
        listen();
    } catch (Exception x) {
        log.error("Unable to run replication listener.", x);
    }
}

protected void readDataFromSocket(SelectionKey key) throws Exception {
    NioReplicationThread worker = (NioReplicationThread) getPool().getWorker();
    if (worker == null) {
        // No threads available, do nothing, the selection
        // loop will keep calling this method until a
        // thread becomes available, the thread pool itself has a waiting mechanism
        // so we will not wait here.
        if (log.isDebugEnabled())
            log.debug("No TcpReplicationThread available");
    } else {
        // invoking this wakes up the worker thread then returns
        worker.serviceChannel(key);
    }
}
 
The thread is started and run executes: a ServerSocketChannel and a Selector are created, the port is bound, and the channel is registered for accept events. The while loop then calls select on the selector with a timeout, so the thread neither blocks forever when no requests are pending nor spins in a tight loop burning CPU. When a request arrives there is an acceptable SelectionKey; the main thread works through the selected keys, takes the SocketChannel from each accept, and registers it with the selector for read events. A subsequent select call returns those keys once they are readable, and readDataFromSocket is invoked, fetching a worker from the thread pool to handle the request. Note also the interestOps dance: serviceChannel clears OP_READ before waking a worker, and when the worker finishes it re-registers interest by queueing a Runnable back onto the selector thread (registerForRead) and waking the selector, which is why the loop calls events() on every pass.
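Stripped of Tomcat's configuration and error handling, the accept-and-register loop boils down to the following self-contained sketch (illustrative only: it echoes data back inline on the selector thread, where Tomcat instead clears OP_READ and wakes a pooled worker):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.socket().bind(new InetSocketAddress(8082));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
        ByteBuffer buf = ByteBuffer.allocate(4096);        // reused across reads
        while (true) {
            if (selector.select(1000) == 0) continue;      // timeout: neither block forever nor spin
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();                               // handled; drop from the selected set
                if (key.isAcceptable()) {
                    SocketChannel ch = ((ServerSocketChannel) key.channel()).accept();
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ);   // watch for incoming data
                } else if (key.isReadable()) {
                    SocketChannel ch = (SocketChannel) key.channel();
                    buf.clear();
                    if (ch.read(buf) < 0) {                // remote end closed
                        key.cancel();
                        ch.close();
                        continue;
                    }
                    buf.flip();
                    ch.write(buf);    // Tomcat hands off to a worker here instead
                }
            }
        }
    }
}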
Now look at the NioReplicationThread implementation:
public class NioReplicationThread extends WorkerThread {
    
    private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( NioReplicationThread.class );
    
    private ByteBuffer buffer = null;
    private SelectionKey key;
    private int rxBufSize;
    private NioReceiver receiver;
    public NioReplicationThread (ListenCallback callback, NioReceiver receiver)
    {
        super(callback);
        this.receiver = receiver;
    }

    // loop forever waiting for work to do
    public synchronized void run() { 
        this.notify();
        if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
            buffer = ByteBuffer.allocateDirect(getRxBufSize());
        }else {
            buffer = ByteBuffer.allocate (getRxBufSize());
        }
        while (isDoRun()) {
            try {
                // sleep and release object lock
                this.wait();
            } catch (InterruptedException e) {
                if(log.isInfoEnabled()) log.info("TCP worker thread interrupted in cluster",e);
                // clear interrupt status
                Thread.interrupted();
            }
            if (key == null) {
                continue;	// just in case
            }
            if ( log.isTraceEnabled() ) 
                log.trace("Servicing key:"+key);

            try {
                ObjectReader reader = (ObjectReader)key.attachment();
                if ( reader == null ) {
                    if ( log.isTraceEnabled() ) 
                        log.trace("No object reader, cancelling:"+key);
                    cancelKey(key);
                } else {
                    if ( log.isTraceEnabled() ) 
                        log.trace("Draining channel:"+key);

                    drainChannel(key, reader);
                }
            } catch (Exception e) {
                //this is common, since the sockets on the other
                //end expire after a certain time.
                if ( e instanceof CancelledKeyException ) {
                    //do nothing
                } else if ( e instanceof IOException ) {
                    //dont spew out stack traces for IO exceptions unless debug is enabled.
                    if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
                    else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
                } else if ( log.isErrorEnabled() ) {
                    //this is a real error, log it.
                    log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
                } 
                cancelKey(key);
            } finally {
                
            }
            key = null;
            // done, ready for more, return to pool
            getPool().returnWorker (this);
        }
    }

    /**
     * Called to initiate a unit of work by this worker thread
     * on the provided SelectionKey object.  This method is
     * synchronized, as is the run() method, so only one key
     * can be serviced at a given time.
     * Before waking the worker thread, and before returning
     * to the main selection loop, this key's interest set is
     * updated to remove OP_READ.  This will cause the selector
     * to ignore read-readiness for this channel while the
     * worker thread is servicing it.
     */
    public synchronized void serviceChannel (SelectionKey key) {
        if ( log.isTraceEnabled() ) 
            log.trace("About to service key:"+key);
        ObjectReader reader = (ObjectReader)key.attachment();
        if ( reader != null ) reader.setLastAccess(System.currentTimeMillis());
        this.key = key;
        key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
        key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
        this.notify();		// awaken the thread
    }

    /**
     * The actual code which drains the channel associated with
     * the given key.  This method assumes the key has been
     * modified prior to invocation to turn off selection
     * interest in OP_READ.  When this method completes it
     * re-enables OP_READ and calls wakeup() on the selector
     * so the selector will resume watching this channel.
     */
    protected void drainChannel (final SelectionKey key, ObjectReader reader) throws Exception {
        reader.setLastAccess(System.currentTimeMillis());
        reader.access();
        SocketChannel channel = (SocketChannel) key.channel();
        int count;
        buffer.clear();			// make buffer empty

        // loop while data available, channel is non-blocking
        while ((count = channel.read (buffer)) > 0) {
            buffer.flip();		// make buffer readable
            if ( buffer.hasArray() ) 
                reader.append(buffer.array(),0,count,false);
            else 
                reader.append(buffer,count,false);
            buffer.clear();		// make buffer empty
            //do we have at least one package?
            if ( reader.hasPackage() ) break;
        }

        int pkgcnt = reader.count();
        
        if (count < 0 && pkgcnt == 0 ) {
            //end of stream, and no more packages to process
            remoteEof(key);
            return;
        }
        
        ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute();
        
        registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks
        
        for ( int i=0; i<msgs.length; i++ ) {
            /**
             * Use send ack here if you want to ack the request to the remote 
             * server before completing the request
             * This is considered an asynchronized request
             */
            if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
            try {
                if ( Logs.MESSAGES.isTraceEnabled() ) {
                    try {
                        Logs.MESSAGES.trace("NioReplicationThread - Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis()));
                    }catch ( Throwable t ) {}
                }
                //process the message
                getCallback().messageDataReceived(msgs[i]);
                /**
                 * Use send ack here if you want the request to complete on this 
                 * server before sending the ack to the remote server
                 * This is considered a synchronized request
                 */
                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
            }catch ( RemoteProcessException e ) {
                if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e);
                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
            }catch ( Exception e ) {
                log.error("Processing of cluster message failed.",e);
                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
            }
            if ( getUseBufferPool() ) {
                BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
                msgs[i].setMessage(null);
            }
        }                        
        
        if (count < 0) {
            remoteEof(key);
            return;
        }
    }

    private void remoteEof(SelectionKey key) {
        // close channel on EOF, invalidates the key
        if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
        cancelKey(key);
    }

    protected void registerForRead(final SelectionKey key, ObjectReader reader) {
        if ( log.isTraceEnabled() ) 
            log.trace("Adding key for read event:"+key);
        reader.finish();
        //register our OP_READ interest
        Runnable r = new Runnable() {
            public void run() {
                try {
                    if (key.isValid()) {
                        // cycle the selector so this key is active again
                        key.selector().wakeup();
                        // resume interest in OP_READ, OP_WRITE
                        int resumeOps = key.interestOps() | SelectionKey.OP_READ;
                        key.interestOps(resumeOps);
                        if ( log.isTraceEnabled() ) 
                            log.trace("Registering key for read:"+key);
                    }
                } catch (CancelledKeyException ckx ) {
                    NioReceiver.cancelledKey(key);
                    if ( log.isTraceEnabled() ) 
                        log.trace("CKX Cancelling key:"+key);

                } catch (Exception x) {
                    log.error("Error registering key for read:"+key,x);
                }
            }
        };
        receiver.addEvent(r);
    }

    private void cancelKey(final SelectionKey key) {
        if ( log.isTraceEnabled() ) 
            log.trace("Adding key for cancel event:"+key);

        ObjectReader reader = (ObjectReader)key.attachment();
        if ( reader != null ) {
            reader.setCancelled(true);
            reader.finish();
        }
        Runnable cx = new Runnable() {
            public void run() {
                if ( log.isTraceEnabled() ) 
                    log.trace("Cancelling key:"+key);

                NioReceiver.cancelledKey(key);
            }
        };
        receiver.addEvent(cx);
    }
    
    



    /**
     * send a reply-acknowledgement (6,2,3)
     * @param key
     * @param channel
     */
    protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) {
        
        try {
            ByteBuffer buf = ByteBuffer.wrap(command);
            int total = 0;
            while ( total < command.length ) {
                total += channel.write(buf);
            }
            if (log.isTraceEnabled()) {
                log.trace("ACK sent to " + channel.socket().getPort());
            }
        } catch ( IOException x ) {
            log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
        }
    }

    public void setRxBufSize(int rxBufSize) {
        this.rxBufSize = rxBufSize;
    }

    public int getRxBufSize() {
        return rxBufSize;
    }
}
 
At bottom this is just taking a SocketChannel and processing it, much like the socket handling in the BIO case. The interesting part is that Tomcat attaches its own ObjectReader wrapper to the SelectionKey, so this stateful reader can be advanced step by step across readiness events. The ObjectReader maintains an XByteBuffer, which can be taken from a pool instead of allocated fresh each time; just like the ByteBuffer kept by the WorkerThread, this avoids constant allocation and reduces memory overhead. Tomcat's processing largely mirrors the examples in the book "Java NIO", only more fully fleshed out.
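The attachment pattern itself is tiny; here is a hypothetical stand-in for ObjectReader showing how state rides on the key across partial reads (ConnState and onReadable are illustrative names, not Tomcat's):

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;

// Hypothetical per-connection state, standing in for Tomcat's ObjectReader.
class ConnState {

    final ByteBuffer pending = ByteBuffer.allocate(4096); // accumulates partial packages

    // At registration: channel.register(selector, SelectionKey.OP_READ, new ConnState());
    // On every readiness event the same object comes back via key.attachment():
    static void onReadable(SelectionKey key, ByteBuffer justRead) {
        ConnState state = (ConnState) key.attachment();
        state.pending.put(justRead);   // append the new bytes to what was buffered before
        // ... once a complete package is buffered, hand it to the callback ...
    }
}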
 
 