/* This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2, or (at your option) * any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA * 02111-1307, USA. * * http://www.gnu.org/copyleft/gpl.html */ package org.mmocore.network; import java.io.IOException; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Set; import javolution.util.FastList; /** * @author KenM
* Parts of design based on networkcore from WoodenGil */ public final class SelectorThread> extends Thread { private final Selector _selector; // Implementations private final IPacketHandler _packetHandler; private final IPacketHandler _udpPacketHandler; private final IMMOExecutor _executor; private final IClientFactory _clientFactory; private final IAcceptFilter _acceptFilter; private final UDPHeaderHandler _udpHeaderHandler; private final TCPHeaderHandler _tcpHeaderHandler; private volatile boolean _shutdown; // Pending Close private final FastList> _pendingClose = new FastList>(); // Configs private final int HELPER_BUFFER_SIZE; private final int HELPER_BUFFER_COUNT; private final int MAX_SEND_PER_PASS; private final int HEADER_SIZE = 2; private final ByteOrder BYTE_ORDER; private final long SLEEP_TIME; // MAIN BUFFERS private final ByteBuffer DIRECT_WRITE_BUFFER; private final ByteBuffer WRITE_BUFFER; private final ByteBuffer READ_BUFFER; // ByteBuffers General Purpose Pool private final FastList _bufferPool = new FastList(); public SelectorThread(SelectorConfig sc, IMMOExecutor executor, IClientFactory clientFactory, IAcceptFilter acceptFilter) throws IOException { HELPER_BUFFER_SIZE = sc.getHelperBufferSize(); HELPER_BUFFER_COUNT = sc.getHelperBufferCount(); MAX_SEND_PER_PASS = sc.getMaxSendPerPass(); BYTE_ORDER = sc.getByteOrder(); SLEEP_TIME = sc.getSelectorSleepTime(); DIRECT_WRITE_BUFFER = ByteBuffer.allocateDirect(sc.getWriteBufferSize()).order(BYTE_ORDER); WRITE_BUFFER = ByteBuffer.wrap(new byte[sc.getWriteBufferSize()]).order(BYTE_ORDER); READ_BUFFER = ByteBuffer.wrap(new byte[sc.getReadBufferSize()]).order(BYTE_ORDER); _udpHeaderHandler = sc.getUDPHeaderHandler(); _tcpHeaderHandler = sc.getTCPHeaderHandler(); initBufferPool(); _acceptFilter = acceptFilter; _packetHandler = sc.getTCPPacketHandler(); _udpPacketHandler = sc.getUDPPacketHandler(); _clientFactory = clientFactory; _executor = executor; setName("SelectorThread-" + getId()); _selector = Selector.open(); } private void initBufferPool() { for (int i = 0; i < HELPER_BUFFER_COUNT; i++) { getFreeBuffers().addLast(ByteBuffer.wrap(new byte[HELPER_BUFFER_SIZE]).order(BYTE_ORDER)); } } public void openServerSocket(InetAddress address, int tcpPort) throws IOException { ServerSocketChannel selectable = ServerSocketChannel.open(); selectable.configureBlocking(false); ServerSocket ss = selectable.socket(); if (address == null) { ss.bind(new InetSocketAddress(tcpPort)); } else { ss.bind(new InetSocketAddress(address, tcpPort)); } selectable.register(getSelector(), SelectionKey.OP_ACCEPT); } public void openDatagramSocket(InetAddress address, int udpPort) throws IOException { DatagramChannel selectable = DatagramChannel.open(); selectable.configureBlocking(false); DatagramSocket ss = selectable.socket(); if (address == null) { ss.bind(new InetSocketAddress(udpPort)); } else { ss.bind(new InetSocketAddress(address, udpPort)); } selectable.register(getSelector(), SelectionKey.OP_READ); } ByteBuffer getPooledBuffer() { if (getFreeBuffers().isEmpty()) return ByteBuffer.wrap(new byte[HELPER_BUFFER_SIZE]).order(BYTE_ORDER); else return getFreeBuffers().removeFirst(); } void recycleBuffer(ByteBuffer buf) { if (getFreeBuffers().size() < HELPER_BUFFER_COUNT) { buf.clear(); getFreeBuffers().addLast(buf); } } private FastList getFreeBuffers() { return _bufferPool; } @Override public void run() { // main loop for (;;) { // check for shutdown if (isShuttingDown()) { close(); break; } boolean hasPendingWrite = false; try { if (getSelector().selectNow() > 0) { Set keys = getSelector().selectedKeys(); for (SelectionKey key : keys) { switch (key.readyOps()) { case SelectionKey.OP_CONNECT: finishConnection(key); break; case SelectionKey.OP_ACCEPT: acceptConnection(key); break; case SelectionKey.OP_READ: readPacket(key); break; case SelectionKey.OP_WRITE: hasPendingWrite |= writePacket2(key); break; case SelectionKey.OP_READ | SelectionKey.OP_WRITE: hasPendingWrite |= writePacket2(key); // key might have been invalidated on writePacket if (key.isValid()) readPacket(key); break; } } keys.clear(); } } catch (Exception e) { e.printStackTrace(); } closePendingConnections(); try { if (!hasPendingWrite) Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { e.printStackTrace(); } closePendingConnections(); } } private void closePendingConnections() { // process pending close synchronized (getPendingClose()) { for (FastList.Node> n = getPendingClose().head(), end = getPendingClose().tail(); (n = n.getNext()) != end;) { final MMOConnection con = n.getValue(); synchronized (con) { if (con.getSendQueue2().isEmpty() && !con.hasPendingWriteBuffer() || con.closeTimeouted()) { FastList.Node> temp = n.getPrevious(); getPendingClose().delete(n); n = temp; closeConnectionImpl(con, false); } } } } } private void finishConnection(SelectionKey key) { try { ((SocketChannel)key.channel()).finishConnect(); } catch (IOException e) { @SuppressWarnings("unchecked") T con = (T)key.attachment(); closeConnectionImpl(con, true); } // key might have been invalidated on finishConnect() if (key.isValid()) { key.interestOps(key.interestOps() | SelectionKey.OP_READ); key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); } } private void acceptConnection(SelectionKey key) { SocketChannel sc; try { while ((sc = ((ServerSocketChannel)key.channel()).accept()) != null) { if (getAcceptFilter() == null || getAcceptFilter().accept(sc)) { sc.configureBlocking(false); SelectionKey clientKey = sc.register(getSelector(), SelectionKey.OP_READ); clientKey.attach(getClientFactory().create(this, new TCPSocket(sc.socket()), clientKey)); } else { sc.socket().close(); } } } catch (IOException e) { e.printStackTrace(); } } private void readPacket(SelectionKey key) { if (key.channel() instanceof SocketChannel) { readTCPPacket(key); } else { readUDPPacket(key); } } private void readTCPPacket(SelectionKey key) { @SuppressWarnings("unchecked") T con = (T)key.attachment(); ByteBuffer buf; if ((buf = con.getReadBuffer()) == null) { buf = READ_BUFFER; } int result = -2; // if we try to to do a read with no space in the buffer it will read 0 bytes // going into infinite loop if (buf.position() == buf.limit()) { // should never happen System.err.println("POS ANTES SC.READ(): " + buf.position() + " limit: " + buf.limit()); System.err.println("NOOBISH ERROR " + (buf == READ_BUFFER ? "READ_BUFFER" : "temp")); System.exit(0); } //System.out.println("POS ANTES SC.READ(): "+buf.position()+" limit: "+buf.limit()+" - buf: "+(buf == READ_BUFFER ? "READ_BUFFER" : "TEMP")); try { result = con.getReadableByteChannel().read(buf); } catch (IOException e) { //error handling goes bellow } //System.out.println("LEU: "+result+" pos: "+buf.position()); if (result > 0) { // TODO this should be done vefore even reading if (!con.isClosed()) { buf.flip(); // try to read as many packets as possible while (tryReadPacket2(key, con, buf)) { // ... } } else { if (buf == READ_BUFFER) { READ_BUFFER.clear(); } } } else if (result == 0) { // read interest but nothing to read? wtf? System.out.println("NOOBISH ERROR 2 THE MISSION"); //System.exit(0); } else if (result == -1) { closeConnectionImpl(con, false); } else { closeConnectionImpl(con, true); } } private void readUDPPacket(SelectionKey key) { int result = -2; ByteBuffer buf = READ_BUFFER; DatagramChannel dc = (DatagramChannel)key.channel(); if (!dc.isConnected()) { try { dc.configureBlocking(false); SocketAddress address = dc.receive(buf); buf.flip(); _udpHeaderHandler.onUDPConnection(this, dc, address, buf); } catch (IOException e) { } buf.clear(); } else { //System.err.println("UDP CONN "+buf.remaining()); try { result = dc.read(buf); } catch (IOException e) { //error handling goes bellow //System.err.println("UDP ERR: "+e.getMessage()); } //System.out.println("LEU: "+result+" pos: "+buf.position()); if (result > 0) { buf.flip(); // try to read as many packets as possible while (tryReadUDPPacket(key, buf)) { // ... } } else if (result == 0) { // read interest but nothing to read? wtf? System.out.println("CRITICAL ERROR ON SELECTOR"); System.exit(0); } else { // TODO kill and cleanup this UDP connection //System.err.println("UDP ERROR: "+result); } } } private boolean tryReadPacket2(SelectionKey key, T con, ByteBuffer buf) { //System.out.println("BUFF POS ANTES DE LER: "+buf.position()+" - REMAINING: "+buf.remaining()); if (buf.hasRemaining()) { TCPHeaderHandler handler = _tcpHeaderHandler; // parse all jeaders HeaderInfo ret; while (!handler.isChildHeaderHandler()) { handler.handleHeader(key, buf); handler = handler.getSubHeaderHandler(); } // last header ret = handler.handleHeader(key, buf); if (ret != null) { int result = buf.remaining(); // then check if header was processed if (ret.headerFinished()) { // get expected packet size int size = ret.getDataPending(); //System.out.println("IF: ("+size+" <= "+result+") => (size <= result)"); // do we got enough bytes for the packet? if (size <= result) { // avoid parsing dummy packets (packets without body) if (size > 0) { int pos = buf.position(); parseClientPacket(getPacketHandler(), buf, size, con); buf.position(pos + size); } // if we are done with this buffer if (!buf.hasRemaining()) { //System.out.println("BOA 2"); if (buf != READ_BUFFER) { con.setReadBuffer(null); recycleBuffer(buf); } else { READ_BUFFER.clear(); } return false; } else { // nothing } return true; } else { // we dont have enough bytes for the dataPacket so we need to read con.enableReadInterest(); //System.out.println("LIMIT "+buf.limit()); if (buf == READ_BUFFER) { buf.position(buf.position() - HEADER_SIZE); allocateReadBuffer(con); } else { buf.position(buf.position() - HEADER_SIZE); buf.compact(); } return false; } } else { // we dont have enough data for header so we need to read con.enableReadInterest(); if (buf == READ_BUFFER) { allocateReadBuffer(con); } else { buf.compact(); } return false; } } else { // null ret means critical error // kill the connection closeConnectionImpl(con, true); return false; } } else //con.disableReadInterest(); return false; //empty buffer } private boolean tryReadUDPPacket(SelectionKey key, ByteBuffer buf) { if (buf.hasRemaining()) { UDPHeaderHandler handler = _udpHeaderHandler; // parse all jeaders HeaderInfo ret; while (!handler.isChildHeaderHandler()) { handler.handleHeader(buf); handler = handler.getSubHeaderHandler(); } // last header ret = handler.handleHeader(buf); if (ret != null) { int result = buf.remaining(); // then check if header was processed if (ret.headerFinished()) { T con = ret.getClient(); // get expected packet size int size = ret.getDataPending(); //System.out.println("IF: ("+size+" <= "+result+") => (size <= result)"); // do we got enough bytes for the packet? if (size <= result) { if (ret.isMultiPacket()) { while (buf.hasRemaining()) { parseClientPacket(_udpPacketHandler, buf, buf.remaining(), con); } } else { // avoid parsing dummy packets (packets without body) if (size > 0) { int pos = buf.position(); parseClientPacket(_udpPacketHandler, buf, size, con); buf.position(pos + size); } } // if we are done with this buffer if (!buf.hasRemaining()) { //System.out.println("BOA 2"); if (buf != READ_BUFFER) { con.setReadBuffer(null); recycleBuffer(buf); } else { READ_BUFFER.clear(); } return false; } else { // nothing } return true; } else { // we dont have enough bytes for the dataPacket so we need to read con.enableReadInterest(); //System.out.println("LIMIT "+buf.limit()); if (buf == READ_BUFFER) { buf.position(buf.position() - HEADER_SIZE); allocateReadBuffer(con); } else { buf.position(buf.position() - HEADER_SIZE); buf.compact(); } return false; } } else { buf.clear(); // READ_BUFFER return false; } } else { buf.clear(); // READ_BUFFER return false; } } else { //con.disableReadInterest(); buf.clear(); return false; //empty buffer } } private void allocateReadBuffer(T con) { //System.out.println("con: "+Integer.toHexString(con.hashCode())); //Util.printHexDump(READ_BUFFER); con.setReadBuffer(getPooledBuffer().put(READ_BUFFER)); READ_BUFFER.clear(); } private void parseClientPacket(IPacketHandler handler, ByteBuffer buf, int dataSize, T client) { int pos = buf.position(); boolean ret = client.decrypt(buf, dataSize); //buf.position(pos); //can be annoying for some decrypt impl decrypt should place the pos at the right place itself //System.out.println("pCP -> BUF: POS: "+buf.position()+" - LIMIT: "+buf.limit()+" == Packet: SIZE: "+dataSize); if (buf.hasRemaining() && ret) { // apply limit int limit = buf.limit(); buf.limit(pos + dataSize); //System.out.println("pCP2 -> BUF: POS: "+buf.position()+" - LIMIT: "+buf.limit()+" == Packet: SIZE: "+size); ReceivablePacket cp = handler.handlePacket(buf, client); if (cp != null) { cp.setByteBuffer(buf); cp.setClient(client); if (cp.read()) { getExecutor().execute(cp); } } buf.limit(limit); } } private boolean writePacket2(SelectionKey key) { @SuppressWarnings("unchecked") T con = (T)key.attachment(); prepareWriteBuffer2(con); DIRECT_WRITE_BUFFER.flip(); int size = DIRECT_WRITE_BUFFER.remaining(); //System.err.println("WRITE SIZE: "+size); int result = -1; try { result = con.getWritableChannel().write(DIRECT_WRITE_BUFFER); } catch (IOException e) { // error handling goes on the if bellow //System.err.println("IOError: " + e.getMessage()); } // check if no error happened if (result >= 0) { // check if we writed everything if (result == size) { // complete write //System.err.println("FULL WRITE"); //System.err.flush(); // if there was a a pending write then we need to finish the operation /*if (con.getWriterMark() > 0) { con.finishPrepending(con.getWriterMark()); }*/ synchronized (con) { if (con.getSendQueue2().isEmpty() && !con.hasPendingWriteBuffer()) { con.disableWriteInterest(); return false; } else return true; } } else //incomplete write { con.createWriteBuffer(DIRECT_WRITE_BUFFER); return false; //System.err.println("DEBUG: INCOMPLETE WRITE - write size: "+size); //System.err.flush(); } //if (result == 0) //{ //System.err.println("DEBUG: write result: 0 - write size: "+size+" - DWB rem: "+DIRECT_WRITE_BUFFER.remaining()); //System.err.flush(); //} } else { //System.err.println("IOError: "+result); //System.err.flush(); closeConnectionImpl(con, true); return false; } } private void prepareWriteBuffer2(T con) { DIRECT_WRITE_BUFFER.clear(); // if theres pending content add it if (con.hasPendingWriteBuffer()) { con.movePendingWriteBufferTo(DIRECT_WRITE_BUFFER); //System.err.println("ADDED PENDING TO DIRECT "+DIRECT_WRITE_BUFFER.position()); } if (DIRECT_WRITE_BUFFER.remaining() > 1 && !con.hasPendingWriteBuffer()) { synchronized (con) { final FastList> sendQueue = con.getSendQueue2(); for (int i = 0; !sendQueue.isEmpty() && i < MAX_SEND_PER_PASS; i++) { // put into WriteBuffer putPacketIntoWriteBuffer(con, sendQueue.removeFirst()); WRITE_BUFFER.flip(); //System.err.println("WB SIZE: "+WRITE_BUFFER.limit()); if (DIRECT_WRITE_BUFFER.remaining() >= WRITE_BUFFER.limit()) { /*if (i == 0) { // mark begining of new data from previous pending data con.setWriterMark(DIRECT_WRITE_BUFFER.position()); }*/ DIRECT_WRITE_BUFFER.put(WRITE_BUFFER); } else { // there is no more space in the direct buffer //con.addWriteBuffer(getPooledBuffer().put(WRITE_BUFFER)); con.createWriteBuffer(WRITE_BUFFER); break; } } } } } private final void putPacketIntoWriteBuffer(T client, SendablePacket sp) { WRITE_BUFFER.clear(); // set the write buffer sp.setByteBuffer(WRITE_BUFFER); // reserve space for the size int headerPos = sp.getByteBuffer().position(); int headerSize = sp.getHeaderSize(); sp.getByteBuffer().position(headerPos + headerSize); // write content to buffer sp.write(client); // size (incl header) int dataSize = sp.getByteBuffer().position() - headerPos - headerSize; sp.getByteBuffer().position(headerPos + headerSize); client.encrypt(sp.getByteBuffer(), dataSize); // recalculate size after encryption dataSize = sp.getByteBuffer().position() - headerPos - headerSize; // prepend header //prependHeader(headerPos, size); sp.getByteBuffer().position(headerPos); sp.writeHeader(dataSize); sp.getByteBuffer().position(headerPos + headerSize + dataSize); } private Selector getSelector() { return _selector; } private IMMOExecutor getExecutor() { return _executor; } private IPacketHandler getPacketHandler() { return _packetHandler; } private IClientFactory getClientFactory() { return _clientFactory; } private IAcceptFilter getAcceptFilter() { return _acceptFilter; } void closeConnection(MMOConnection con) { synchronized (getPendingClose()) { getPendingClose().addLast(con); } } private void closeConnectionImpl(MMOConnection con, boolean forced) { try { if (forced) con.onForcedDisconnection(); } catch (RuntimeException e) { e.printStackTrace(); } try { // notify connection con.onDisconnection(); } catch (RuntimeException e) { e.printStackTrace(); } finally { try { // close socket and the SocketChannel con.getSocket().close(); } catch (IOException e) { // ignore, we are closing anyway } finally { con.releaseBuffers(); // clear attachment con.getSelectionKey().attach(null); // cancel key con.getSelectionKey().cancel(); } } } private FastList> getPendingClose() { return _pendingClose; } public void shutdown() throws InterruptedException { _shutdown = true; join(); } private boolean isShuttingDown() { return _shutdown; } private void close() { for (SelectionKey key : getSelector().keys()) { try { key.channel().close(); } catch (IOException e) { // ignore } } try { getSelector().close(); } catch (IOException e) { // Ignore } } }