カテゴリー : nio

このカテゴリーの登録数:1件 表示 : 1 - 1 / 1

2010/01/13

NIO にしたら爆速になった(was: なんちゃってmemcache互換サーバ)

ポスト @ 2:37:24 | , ,     

昨日の続き。
server の部分を thread pool から、nio な ノンブロッキング にしてみたら、思った以上に早かった

メイン

public class Server extends Thread {
    
    protected final BlockingQueue<Socket> accept = new LinkedBlockingQueue<Socket>();
    
    protected final ExecutorService acceptPool;
    
    protected final Cache cache;
    
    protected final int port;
    
    protected final int maxConnection;
    
    public Server(final int port, final int maxConnection){
        this(port, maxConnection, ByteSizeUnit.Mega.toLong(64));
    }
    public Server(final int port, final int maxConnection, final long maxMemory){
        this.port = port;
        this.maxConnection = maxConnection;
        this.cache = new LRUCache(maxMemory);
        this.acceptPool = Executors.newFixedThreadPool(maxConnection);
    }
    
    public static void main(String...args){
        Server s = new Server(12221, 32);
        s.start();
        try {
            s.join();
        } catch(InterruptedException e){
            e.printStackTrace(System.err);
        }
    }

    public void run(){
        ServerSocketChannel channel = null;
        try {
            channel = ServerSocketChannel.open();
            channel.configureBlocking(false);
            
            final ServerSocket serverSocket = channel.socket();
            serverSocket.setReuseAddress(true);
            serverSocket.bind(new InetSocketAddress(port));
            
            final Selector selector = Selector.open();
            try {
                channel.register(selector, SelectionKey.OP_ACCEPT, new AcceptAction(cache));
                
                while(0 < selector.select()){
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while(keys.hasNext()){
                        final SelectionKey key = keys.next();
                        keys.remove();
                        
                        if(!key.isValid()){
                            continue;
                        }
                        
                        Action action = (Action) key.attachment();
                        action.execute(key);
                    }
                }
            } finally {
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while(keys.hasNext()){
                    final SelectionKey key = keys.next();
                    key.channel().close();
                }
            }
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if(null != channel){
                try {
                    channel.close();
                } catch(IOException e){
                    e.printStackTrace();
                }
            }
        }
    }
}

新規接続を受け入れるAccept部分

public class AcceptAction implements Action {
    
    private final Cache cache;
    
    public AcceptAction(final Cache cache){
        this.cache = cache;
    }

    public void execute(SelectionKey selectionKey) {
        SocketChannel channel = null;
        try {
            channel = ((ServerSocketChannel) selectionKey.channel()).accept();
            channel.configureBlocking(false);

            ReadAction action = new ReadAction(cache, new ByteBufferReader(channel));
            channel.register(selectionKey.selector(), SelectionKey.OP_READ, action);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

んで、読み込みと書き込みは分けた

public class ReadAction implements Action {
    
    protected final Cache cache;
    
    protected final Reader reader;
    
    protected final Handler handler;
    
    public ReadAction(final Cache cache, final Reader reader){
        this.cache = cache;
        this.reader = reader;
        this.handler = new Handler(cache, reader);
    }

    public void execute(SelectionKey selectionKey) {
        if(!selectionKey.isReadable()){
            return;
        }
        
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        try {
            if(!reader.readable()){
                channel.close();
                selectionKey.cancel();
                return;
            }
            String line = reader.readLine();
            if(line == null){
                channel.close();
                selectionKey.cancel();
                return;
            }
            Return r = handler.execute(line);

            WriteAction action = new WriteAction(r, this);
            channel.register(selectionKey.selector(), SelectionKey.OP_WRITE, action);
        } catch(IOException e){
            e.printStackTrace();
        }
    }

    private static class Handler implements CommandVisitor {
        private final Cache cache;
        private final Reader reader;
        private Handler(final Cache cache, final Reader reader){
            this.cache = cache;
            this.reader = reader;
        }
        public Return execute(String line){
            try {
                final StringReader r = new StringReader(line);
                final MemcacheParser parser = new MemcacheParser(r);
                
                Command command = parser.Command();
                return command.accept(this, null);
            } catch(ParseException e){
                e.printStackTrace();
                return new Return(ResponseType.ERROR);
            } catch(Exception e){
                e.printStackTrace();
                return new Return(ResponseType.SERVER_ERROR, e.getMessage());
            }
        }
        :
        : // 省略
        :    
        public Return visit(RetrievalCommand command, Parameter parameter) {
            final List<String> keys = command.getKeys();
            final List<Return> returns = new ArrayList<Return>();
            for(final String key: keys){
                Entry entry = cache.get(key);
                if(null == entry){
                    returns.add(new Return(ResponseType.END));
                    continue;
                }
                returns.add(new Return(ResponseType.SEND_VALUE,
                    key, entry.getFlag(), entry.getLength(), entry.getValue()
                ));
            }
            return new Return(returns.toArray(new Return[returns.size()]));
        }
    
        public Return visit(SetCommand command, Parameter parameter) {
            try {
                final String nextLine = reader.readLine();
                if(cache.set(command.getKey(), nextLine, command.getFlags(), command.getExpTime())){
                    return new Return(ResponseType.STORED);
                }
                return new Return(ResponseType.NOT_STORED);
            } catch(IOException e){
                e.printStackTrace();
                return new Return(ResponseType.ERROR);
            }
        }
    
        public Return visit(VersionCommand command, Parameter parameter) {
            return null;
        }
    }

}
public class WriteAction implements Action {
    
    private static final Charset ASCII = Charset.forName("US-ASCII"); 
    
    protected final Return ret;
    
    protected final ReadAction action;
    
    public WriteAction(final Return ret, ReadAction action){
        this.ret = ret;
        this.action = action;
    }

    public void execute(SelectionKey selectionKey) {
        if(!selectionKey.isWritable()){
            return;
        }
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        
        final CharBuffer buf = CharBuffer.wrap(ret.renderMessage());
        try {
            ByteBuffer bytes = ASCII.encode(buf);
            int capacity = bytes.capacity();
            int size = channel.write(bytes);
            if(size < capacity){
                int sum = size;
                while(sum < capacity){
                    if(size == 0){
                        return;
                    }
                    bytes.position(sum);
                    size = channel.write(bytes);
                    sum += size;
                }
            }
            try {
                channel.register(selectionKey.selector(), SelectionKey.OP_READ, action);
            } catch(ClosedChannelException e){
                e.printStackTrace();
            }
        } catch(IOException e){
            e.printStackTrace();
        }
    }
}

ということで、これをつかって簡単に比較すると

$target = array(
    array('host' => 'localhost', 'port' => 11211),
    array('host' => 'localhost', 'port' => 12221)
);
foreach($target as $t){
    $memcache = new Memcache;
    $memcache->connect($t['host'], $t['port']);

    $fail = 0;
    $elapsed = microtime(true);
    for($i = 0; $i < 1000; ++$i){
        if(false === $memcache->set('hoge', '1234')){
            echo 'ERROR!!', PHP_EOL;
        }
        if(false === $memcache->set('hoge', '123')){
            echo 'ERROR!!!', PHP_EOL;
        }
        $value = $memcache->get('hoge');
        if(false === $memcache->set('hoge', $value + 1)){
            echo 'ERRROR!!!!', PHP_EOL;
        }
        $result = $memcache->get('hoge');
        if($result != 124){
            $fail++;
        }
    }
    echo 'target host => ', $t['host'], ' port =>', $t['port'], PHP_EOL;
    echo 'elapsed: ', (microtime(true) - $elapsed), PHP_EOL;
    echo 'fail => ', $fail, PHP_EOL;
}
1)
target host => localhost port =>11211
elapsed: 0.66518497467
fail => 0
target host => localhost port =>12221
elapsed: 0.870754003525
fail => 0

2)
target host => localhost port =>11211
elapsed: 1.1857790947
fail => 0
target host => localhost port =>12221
elapsed: 0.868647098541
fail => 0

memcacheに匹敵してきた。

ってか、ByteBufferを読み書きするのは初めて書いたので、すごく手こずった。。
単純に \r\n までの一行を読みたいだけなのに。。
ということで、BufferedReaderのreadLine的なのを書いて色々とお茶を濁す。。

public class ByteBufferReader {
    
    protected final SocketChannel channel;
    
    protected final ByteBuffer buffer;
    
    public ByteBufferReader(final SocketChannel channel){
        this.channel = channel;
        this.buffer = ByteBuffer.allocateDirect(512);
    }
    
    public boolean readable() throws IOException {
        return buffer.hasRemaining();
    }
    
    public String readLine() throws IOException {
        final StringBuilder sb = new StringBuilder();
        readInto(sb);
        if(sb.length() < 1){
            return null;
        }
        return sb.toString();
    }
    
    private void readInto(final StringBuilder sb) throws IOException {
        while(true){
            int read = channel.read(buffer);
            if(read == 0){
                break;
            }
            if(read == -1){
                return;
            }
        }
        buffer.flip();
        while(more(sb));
        buffer.compact();
    }
    
    private boolean more(final StringBuilder sb) throws IOException {
        if(buffer.hasRemaining()){
            char ch = (char) buffer.get();
            switch(ch){
            case '\n':
                return false;
            case '\r':
                return true;
            default:
                sb.append(ch);
                return true;
            }
        }
        buffer.clear();
        return true;
    }

}

とりあえず、一段落
NIOは難しかった。