package com.ronsoft.books.nio.channels; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.nio.channels.SelectionKey; import java.util.List; import java.util.LinkedList; import java.io.IOException; /** * Specialization of the SelectSockets class which uses a thread pool * to service channels. The thread pool is an ad-hoc implementation * quicky lashed togther in a few hours for demonstration purposes. * It's definitely not production quality. * * Created May 2002 * @author Ron Hitchens (ron@ronsoft.com) * @version $Id: SelectSocketsThreadPool.java,v 1.5 2002/05/20 07:24:29 ron Exp $ */ public class SelectSocketsThreadPool extends SelectSockets { private static final int MAX_THREADS = 5; private ThreadPool pool = new ThreadPool (MAX_THREADS); // ------------------------------------------------------------- public static void main (String [] argv) throws Exception { new SelectSocketsThreadPool().go (argv); } // ------------------------------------------------------------- /** * Sample data handler method for a channel with data ready to read. * This method is invoked from the go() method in the parent class. * This handler delegates to a worker thread in a thread pool to * service the channel, then returns immediately. * @param key A SelectionKey object representing a channel * determined by the selector to be ready for reading. If the * channel returns an EOF condition, it is closed here, which * automatically invalidates the associated key. The selector * will then de-register the channel on the next select call. */ protected void readDataFromSocket (SelectionKey key) throws Exception { WorkerThread worker = pool.getWorker(); if (worker == null) { // No threads available, do nothing, the selection // loop will keep calling this method until a // thread becomes available. This design could // be improved. return; } // invoking this wakes up the worker thread then returns worker.serviceChannel (key); } // --------------------------------------------------------------- /** * A very simple thread pool class. The pool size is set at * construction time and remains fixed. Threads are cycled * through a FIFO idle queue. */ private class ThreadPool { List idle = new LinkedList(); ThreadPool (int poolSize) { // fill up the pool with worker threads for (int i = 0; i < poolSize; i++) { WorkerThread thread = new WorkerThread (this); // set thread name for debugging, start it thread.setName ("Worker" + (i + 1)); thread.start(); idle.add (thread); } } /** * Find an idle worker thread, if any. Could return null. */ WorkerThread getWorker() { WorkerThread worker = null; synchronized (idle) { if (idle.size() > 0) { worker = (WorkerThread) idle.remove (0); } } return (worker); } /** * Called by the worker thread to return itself to the * idle pool. */ void returnWorker (WorkerThread worker) { synchronized (idle) { idle.add (worker); } } } /** * A worker thread class which can drain channels and echo-back * the input. Each instance is constructed with a reference to * the owning thread pool object. When started, the thread loops * forever waiting to be awakened to service the channel associated * with a SelectionKey object. * The worker is tasked by calling its serviceChannel() method * with a SelectionKey object. The serviceChannel() method stores * the key reference in the thread object then calls notify() * to wake it up. When the channel has been drained, the worker * thread returns itself to its parent pool. */ private class WorkerThread extends Thread { private ByteBuffer buffer = ByteBuffer.allocate (1024); private ThreadPool pool; private SelectionKey key; WorkerThread (ThreadPool pool) { this.pool = pool; } // loop forever waiting for work to do public synchronized void run() { System.out.println (this.getName() + " is ready"); while (true) { try { // sleep and release object lock this.wait(); } catch (InterruptedException e) { e.printStackTrace(); // clear interrupt status this.interrupted(); } if (key == null) { continue; // just in case } System.out.println (this.getName() + " has been awakened"); try { drainChannel (key); } catch (Exception e) { System.out.println ("Caught '" + e + "' closing channel"); // close channel and nudge selector try { key.channel().close(); } catch (IOException ex) { ex.printStackTrace(); } key.selector().wakeup(); } key = null; // done, ready for more, return to pool this.pool.returnWorker (this); } } /** * Called to initiate a unit of work by this worker thread * on the provided SelectionKey object. This method is * synchronized, as is the run() method, so only one key * can be serviced at a given time. * Before waking the worker thread, and before returning * to the main selection loop, this key's interest set is * updated to remove OP_READ. This will cause the selector * to ignore read-readiness for this channel while the * worker thread is servicing it. */ synchronized void serviceChannel (SelectionKey key) { this.key = key; key.interestOps (key.interestOps() & (~SelectionKey.OP_READ)); this.notify(); // awaken the thread } /** * The actual code which drains the channel associated with * the given key. This method assumes the key has been * modified prior to invocation to turn off selection * interest in OP_READ. When this method completes it * re-enables OP_READ and calls wakeup() on the selector * so the selector will resume watching this channel. */ void drainChannel (SelectionKey key) throws Exception { SocketChannel channel = (SocketChannel) key.channel(); int count; buffer.clear(); // make buffer empty // loop while data available, channel is non-blocking while ((count = channel.read (buffer)) > 0) { buffer.flip(); // make buffer readable // send the data, may not go all at once while (buffer.hasRemaining()) { channel.write (buffer); } // WARNING: the above loop is evil. // See comments in superclass. buffer.clear(); // make buffer empty } if (count < 0) { // close channel on EOF, invalidates the key channel.close(); return; } // resume interest in OP_READ key.interestOps (key.interestOps() | SelectionKey.OP_READ); // cycle the selector so this key is active again key.selector().wakeup(); } } }