このカテゴリーの登録数:1件 表示 : 1 - 1 / 1
昨日の続き。 server の部分を thread pool から、nio な ノンブロッキング にしてみたら、思った以上に早かった
メイン
public class Server extends Thread { protected final BlockingQueue<Socket> accept = new LinkedBlockingQueue<Socket>(); protected final ExecutorService acceptPool; protected final Cache cache; protected final int port; protected final int maxConnection; public Server(final int port, final int maxConnection){ this(port, maxConnection, ByteSizeUnit.Mega.toLong(64)); } public Server(final int port, final int maxConnection, final long maxMemory){ this.port = port; this.maxConnection = maxConnection; this.cache = new LRUCache(maxMemory); this.acceptPool = Executors.newFixedThreadPool(maxConnection); } public static void main(String...args){ Server s = new Server(12221, 32); s.start(); try { s.join(); } catch(InterruptedException e){ e.printStackTrace(System.err); } } public void run(){ ServerSocketChannel channel = null; try { channel = ServerSocketChannel.open(); channel.configureBlocking(false); final ServerSocket serverSocket = channel.socket(); serverSocket.setReuseAddress(true); serverSocket.bind(new InetSocketAddress(port)); final Selector selector = Selector.open(); try { channel.register(selector, SelectionKey.OP_ACCEPT, new AcceptAction(cache)); while(0 < selector.select()){ Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while(keys.hasNext()){ final SelectionKey key = keys.next(); keys.remove(); if(!key.isValid()){ continue; } Action action = (Action) key.attachment(); action.execute(key); } } } finally { Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while(keys.hasNext()){ final SelectionKey key = keys.next(); key.channel().close(); } } } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { if(null != channel){ try { channel.close(); } catch(IOException e){ e.printStackTrace(); } } } } }
新規接続を受け入れるAccept部分
public class AcceptAction implements Action { private final Cache cache; public AcceptAction(final Cache cache){ this.cache = cache; } public void execute(SelectionKey selectionKey) { SocketChannel channel = null; try { channel = ((ServerSocketChannel) selectionKey.channel()).accept(); channel.configureBlocking(false); ReadAction action = new ReadAction(cache, new ByteBufferReader(channel)); channel.register(selectionKey.selector(), SelectionKey.OP_READ, action); } catch (IOException e) { e.printStackTrace(); } } }
んで、読み込みと書き込みは分けた
public class ReadAction implements Action { protected final Cache cache; protected final Reader reader; protected final Handler handler; public ReadAction(final Cache cache, final Reader reader){ this.cache = cache; this.reader = reader; this.handler = new Handler(cache, reader); } public void execute(SelectionKey selectionKey) { if(!selectionKey.isReadable()){ return; } SocketChannel channel = (SocketChannel) selectionKey.channel(); try { if(!reader.readable()){ channel.close(); selectionKey.cancel(); return; } String line = reader.readLine(); if(line == null){ channel.close(); selectionKey.cancel(); return; } Return r = handler.execute(line); WriteAction action = new WriteAction(r, this); channel.register(selectionKey.selector(), SelectionKey.OP_WRITE, action); } catch(IOException e){ e.printStackTrace(); } } private static class Handler implements CommandVisitor { private final Cache cache; private final Reader reader; private Handler(final Cache cache, final Reader reader){ this.cache = cache; this.reader = reader; } public Return execute(String line){ try { final StringReader r = new StringReader(line); final MemcacheParser parser = new MemcacheParser(r); Command command = parser.Command(); return command.accept(this, null); } catch(ParseException e){ e.printStackTrace(); return new Return(ResponseType.ERROR); } catch(Exception e){ e.printStackTrace(); return new Return(ResponseType.SERVER_ERROR, e.getMessage()); } } : : // 省略 : public Return visit(RetrievalCommand command, Parameter parameter) { final List<String> keys = command.getKeys(); final List<Return> returns = new ArrayList<Return>(); for(final String key: keys){ Entry entry = cache.get(key); if(null == entry){ returns.add(new Return(ResponseType.END)); continue; } returns.add(new Return(ResponseType.SEND_VALUE, key, entry.getFlag(), entry.getLength(), entry.getValue() )); } return new Return(returns.toArray(new Return[returns.size()])); } public Return visit(SetCommand command, Parameter parameter) { try { final String nextLine = reader.readLine(); if(cache.set(command.getKey(), nextLine, command.getFlags(), command.getExpTime())){ return new Return(ResponseType.STORED); } return new Return(ResponseType.NOT_STORED); } catch(IOException e){ e.printStackTrace(); return new Return(ResponseType.ERROR); } } public Return visit(VersionCommand command, Parameter parameter) { return null; } } }
public class WriteAction implements Action { private static final Charset ASCII = Charset.forName("US-ASCII"); protected final Return ret; protected final ReadAction action; public WriteAction(final Return ret, ReadAction action){ this.ret = ret; this.action = action; } public void execute(SelectionKey selectionKey) { if(!selectionKey.isWritable()){ return; } SocketChannel channel = (SocketChannel) selectionKey.channel(); final CharBuffer buf = CharBuffer.wrap(ret.renderMessage()); try { ByteBuffer bytes = ASCII.encode(buf); int capacity = bytes.capacity(); int size = channel.write(bytes); if(size < capacity){ int sum = size; while(sum < capacity){ if(size == 0){ return; } bytes.position(sum); size = channel.write(bytes); sum += size; } } try { channel.register(selectionKey.selector(), SelectionKey.OP_READ, action); } catch(ClosedChannelException e){ e.printStackTrace(); } } catch(IOException e){ e.printStackTrace(); } } }
ということで、これをつかって簡単に比較すると
$target = array( array('host' => 'localhost', 'port' => 11211), array('host' => 'localhost', 'port' => 12221) ); foreach($target as $t){ $memcache = new Memcache; $memcache->connect($t['host'], $t['port']); $fail = 0; $elapsed = microtime(true); for($i = 0; $i < 1000; ++$i){ if(false === $memcache->set('hoge', '1234')){ echo 'ERROR!!', PHP_EOL; } if(false === $memcache->set('hoge', '123')){ echo 'ERROR!!!', PHP_EOL; } $value = $memcache->get('hoge'); if(false === $memcache->set('hoge', $value + 1)){ echo 'ERRROR!!!!', PHP_EOL; } $result = $memcache->get('hoge'); if($result != 124){ $fail++; } } echo 'target host => ', $t['host'], ' port =>', $t['port'], PHP_EOL; echo 'elapsed: ', (microtime(true) - $elapsed), PHP_EOL; echo 'fail => ', $fail, PHP_EOL; }
1) target host => localhost port =>11211 elapsed: 0.66518497467 fail => 0 target host => localhost port =>12221 elapsed: 0.870754003525 fail => 0 2) target host => localhost port =>11211 elapsed: 1.1857790947 fail => 0 target host => localhost port =>12221 elapsed: 0.868647098541 fail => 0
memcacheに匹敵してきた。
ってか、ByteBufferを読み書きするのは初めて書いたので、すごく手こずった。。 単純に \r\n までの一行を読みたいだけなのに。。 ということで、BufferedReaderのreadLine的なのを書いて色々とお茶を濁す。。
public class ByteBufferReader { protected final SocketChannel channel; protected final ByteBuffer buffer; public ByteBufferReader(final SocketChannel channel){ this.channel = channel; this.buffer = ByteBuffer.allocateDirect(512); } public boolean readable() throws IOException { return buffer.hasRemaining(); } public String readLine() throws IOException { final StringBuilder sb = new StringBuilder(); readInto(sb); if(sb.length() < 1){ return null; } return sb.toString(); } private void readInto(final StringBuilder sb) throws IOException { while(true){ int read = channel.read(buffer); if(read == 0){ break; } if(read == -1){ return; } } buffer.flip(); while(more(sb)); buffer.compact(); } private boolean more(final StringBuilder sb) throws IOException { if(buffer.hasRemaining()){ char ch = (char) buffer.get(); switch(ch){ case '\n': return false; case '\r': return true; default: sb.append(ch); return true; } } buffer.clear(); return true; } }
とりあえず、一段落 NIOは難しかった。
詳細検索
/** * @author hata */
昨日の続き。
server の部分を thread pool から、nio な ノンブロッキング にしてみたら、思った以上に早かった
メイン
新規接続を受け入れるAccept部分
んで、読み込みと書き込みは分けた
ということで、これをつかって簡単に比較すると
memcacheに匹敵してきた。
ってか、ByteBufferを読み書きするのは初めて書いたので、すごく手こずった。。
単純に \r\n までの一行を読みたいだけなのに。。
ということで、BufferedReaderのreadLine的なのを書いて色々とお茶を濁す。。
とりあえず、一段落
NIOは難しかった。