2010/01/11

JavaCC で memcache text protocol の BNF(と、なんちゃってmemcache互換サーバ)

ポスト @ 23:39:32 | , , ,     このエントリーを含むはてなブックマーク

ANTLRのやつがあったけど、JavaCCのがみつからなかったので、でっちあげた
via - http://harward.us/~nharward/antlr/memcached_protocol.g

できあがったのは、こんな感じ

// Characters listed here are discarded by the lexer between tokens, so the parser
// never sees the blanks or the line terminator of a command line.
SKIP: {
    " " | "\t" | "\r" | "\n"
}
// Numeric tokens: NUMBER is a non-negative decimal integer without leading zeros.
// FLAGS, TIME, LENGTH, CREMENT_VALUE and CAS_UNIQUE are defined as plain aliases of
// NUMBER; the Flags(), Time() and Length() productions consume < NUMBER > directly.
// NOTE(review): since the aliases match exactly the same text as NUMBER, which is
// declared first, they presumably never come out of the lexer as their own kind.
TOKEN: {
    < NUMBER: ["1"-"9"] (["0"-"9"])* | "0" >
  | < FLAGS: < NUMBER > >
  | < TIME: < NUMBER >  >
  | < LENGTH: < NUMBER > >
  | < CREMENT_VALUE: < NUMBER > >
  | < CAS_UNIQUE: < NUMBER > >
}
// Storage commands: one keyword token per command, followed by two composite patterns
// that spell out the shape of a whole storage command line.
// createStorageCommand() matches the individual *_STATEMENT keywords.
// NOTE(review): STORAGE_STATEMENT and STORAGE_COMMAND read like BNF-style documentation
// of the line format; a token pattern contains no skipped whitespace between its
// parts, so confirm they are not expected to match real input.
TOKEN: {
  < SET_STATEMENT: "set" >
  | < ADD_STATEMENT: "add" >
  | < REPLACE_STATEMENT: "replace" >
  | < APPEND_STATEMENT: "append" >
  | < PREPEND_STATEMENT: "prepend" >
  | < CAS_STATEMENT: "cas" >
  | < STORAGE_STATEMENT:
        < SET_STATEMENT >
        | < ADD_STATEMENT >
        | < REPLACE_STATEMENT >
        | < APPEND_STATEMENT >
        | < PREPEND_STATEMENT >
    >
  | < STORAGE_COMMAND:
        (
          < STORAGE_STATEMENT > < KEY > < FLAGS > < TIME > < LENGTH >
          | < CAS_STATEMENT > < KEY > < FLAGS > < TIME > < LENGTH > < CAS_UNIQUE >
        )
        (< NOREPLY >)?
    >
}
// Retrieval commands: RETRIEVAL_STATEMENT is the keyword "get" or "gets" and is what
// RetrievalCommand() consumes. RETRIEVAL_COMMAND describes the keyword followed by a
// single key (the production itself accepts one or more keys).
TOKEN: {
  < RETRIEVAL_STATEMENT: "get" | "gets" >
  | < RETRIEVAL_COMMAND:
        < RETRIEVAL_STATEMENT > < KEY >
    >
}
// Delete command: the "delete" keyword, consumed by DeleteCommand(), and a composite
// pattern describing the line as: delete <key> [time] [noreply].
TOKEN: {
  < DELETE_STATEMENT: "delete" >
  | < DELETE_COMMAND:
        < DELETE_STATEMENT > < KEY > (< TIME >)? (< NOREPLY >)?
    >
}
// Increment command: the "incr" keyword and a composite pattern describing the line
// as: incr <key> <value> [noreply].
// NOTE(review): none of the parser productions shown alongside this grammar consume
// these tokens yet.
TOKEN: {
  < INCREMENT_STATEMENT: "incr" >
  | < INCREMENT_COMMAND:
        < INCREMENT_STATEMENT > < KEY > < CREMENT_VALUE > (< NOREPLY >)?
    >
}
// Decrement command: the "decr" keyword and a composite pattern describing the line
// as: decr <key> <value> [noreply].
// NOTE(review): none of the parser productions shown alongside this grammar consume
// these tokens yet.
TOKEN: {
  < DECREMENT_STATEMENT: "decr" >
  | < DECREMENT_COMMAND:
        < DECREMENT_STATEMENT > < KEY > < CREMENT_VALUE > (< NOREPLY >)?
    >
}
// Statistics command: the keyword, its optional argument, and a composite pattern
// describing the line as: stats [items|slabs|sizes].
// The keyword is the lowercase command word "stats" sent by the client; "STAT" is the
// prefix of the lines the server sends back, not something a client ever sends.
TOKEN: {
  < STATISTICS_STATEMENT: "stats" >
  | < STATISTICS_OPTION: "items" | "slabs" | "sizes" >
  | < STATISTICS_COMMAND:
        < STATISTICS_STATEMENT > (< STATISTICS_OPTION >)?
    >
}
// Flush command: the "flush_all" keyword and a composite pattern describing the line
// as: flush_all [time] [noreply].
// NOTE(review): none of the parser productions shown alongside this grammar consume
// these tokens yet.
TOKEN: {
  < FLUSH_STATEMENT: "flush_all" >
  | < FLUSH_COMMAND:
        < FLUSH_STATEMENT > (< TIME >)? (< NOREPLY >)?
    >
}
// Version command: the "version" keyword, consumed by VersionCommand(), and a
// composite pattern that consists of that keyword alone.
TOKEN: {
  < VERSION_STATEMENT: "version" >
  | < VERSION_COMMAND:
        < VERSION_STATEMENT >
    >
}
// Optional trailing "noreply" keyword; Noreply() turns its presence into a Boolean.
TOKEN: {
    < NOREPLY: "noreply" >
}
// last match
// KEY is declared after every keyword and numeric token so that, for text of equal
// length, those tokens win and KEY only catches what nothing else matched.
// A key is a run of characters without whitespace. Tab is excluded as well because
// SKIP treats it as a separator; otherwise "a<TAB>b" would lex as one single key.
TOKEN: {
    < KEY: (~[" ", "\t", "\r", "\n"])+ >
}

少し、数値まわりのToken(TIME, LENGTHとか)が適当すぎるかな。

んで、このTokenを使って、テキトーなNode(Command)を返すparse部分を書いてみる

/**
 * Entry production: parses one command.
 * Tries the retrieval, storage, delete and version productions as alternatives and
 * returns the Command object built by whichever one matched.
 */
Command Command():
{
  Command command;
}
{
  (
    command = RetrievalCommand()
  | command = StorageCommand()
  | command = DeleteCommand()
  | command = VersionCommand()
  )
  {
    return command;
  }
}
/**
 * Parses a storage command: keyword, key, flags, time, length and an optional
 * "noreply", in that order, and copies the parsed values onto the command object
 * that createStorageCommand() chose for the keyword.
 */
StorageCommand StorageCommand():
{
  StorageCommand storage;
  String keyName;
  Long flagBits;
  Long expiry;
  Long dataLength;
  Boolean suppressReply;
}
{
  storage = createStorageCommand()
  keyName = Key()
  flagBits = Flags()
  expiry = Time()
  dataLength = Length()
  suppressReply = Noreply()
  {
    // Every field above is mandatory in the grammar, so all locals are assigned here.
    storage.setNode(jjtThis);
    storage.setKey(keyName);
    storage.setFlags(flagBits);
    storage.setExpTime(expiry);
    storage.setLength(dataLength);
    storage.setNoreply(suppressReply);
    return storage;
  }
}
/**
 * Consumes the storage keyword and returns a fresh command object of the matching
 * type (set, add, replace, append or prepend).
 * The "cas" keyword is not among the alternatives, so a cas line is not accepted here.
 */
StorageCommand createStorageCommand():
{}
{
  (
    < SET_STATEMENT >
    {
      return new SetCommand();
    }
    | < ADD_STATEMENT >
    {
      return new AddCommand();
    }
    | < REPLACE_STATEMENT >
    {
      return new ReplaceCommand();
    }
    | < APPEND_STATEMENT >
    {
      return new AppendCommand();
    }
    | < PREPEND_STATEMENT >
    {
      return new PrependCommand();
    }
  )
}

/**
 * Parses a retrieval command: the "get"/"gets" keyword followed by one or more keys.
 * Every key is added to the returned command in the order it appears.
 */
RetrievalCommand RetrievalCommand():
{
  RetrievalCommand command = new RetrievalCommand();
  String key;
}
{
  < RETRIEVAL_STATEMENT >
  // at least one key is required; further keys are collected as long as they follow
  (
    key = Key()
    {
      command.addKey(key);
    }
  )+
  {
    command.setNode(jjtThis);
    return command;
  }
}
/**
 * Parses a delete command: delete <key> [time] [noreply].
 * The time is optional, matching the DELETE_COMMAND token description; when it is
 * omitted the command keeps the default of 0, so a plain "delete <key>" is accepted.
 */
DeleteCommand DeleteCommand():
{
  DeleteCommand command = new DeleteCommand();
  String key;
  Long time = 0L;
  Boolean noreply = Boolean.FALSE;
}
{
  < DELETE_STATEMENT >
  key = Key()
  // optional: only consumed when a number follows the key
  [ time = Time() ]
  noreply = Noreply()
  {
    command.setNode(jjtThis);
    command.setKey(key);
    command.setExpTime(time);
    command.setNoreply(noreply);
    return command;
  }
}
/**
 * Parses the version command, which consists of the "version" keyword only, and
 * returns a new VersionCommand bound to the current tree node.
 */
VersionCommand VersionCommand():
{}
{
  < VERSION_STATEMENT >
  {
    VersionCommand command = new VersionCommand();
    command.setNode(jjtThis);
    return command;
  }
}

/**
 * Parses a key and returns its text.
 * A key made only of digits is tokenized as NUMBER (declared before KEY), so NUMBER
 * is accepted here too; otherwise a command such as "get 123" would be rejected.
 * NOTE(review): keys that spell a keyword ("set", "get", "version", ...) are still
 * tokenized as that keyword and are not accepted as keys.
 */
String Key():
{ Token key; }
{
  (
    key = < KEY >
  | key = < NUMBER >
  )
  {
    return key.image;
  }
}

/** Parses the flags field: one NUMBER token, returned as a Long. */
Long Flags():
{ Token number; }
{
  number = < NUMBER >
  {
    final String digits = number.image;
    return Long.valueOf(digits);
  }
}

/** Parses the time field: one NUMBER token, returned as a Long. */
Long Time():
{ Token number; }
{
  number = < NUMBER >
  {
    final String digits = number.image;
    return Long.valueOf(digits);
  }
}

/** Parses the length field: one NUMBER token, returned as a Long. */
Long Length():
{ Token number; }
{
  number = < NUMBER >
  {
    final String digits = number.image;
    return Long.valueOf(digits);
  }
}

/**
 * Parses the optional "noreply" keyword.
 * Returns Boolean.TRUE when the keyword is present and Boolean.FALSE when it is not.
 */
Boolean Noreply():
{ boolean present = false; }
{
  ( < NOREPLY > { present = true; } )?
  {
    return present ? Boolean.TRUE : Boolean.FALSE;
  }
}

これに、適当なコードを投げてあげると

// Feed three sample command lines to the parser, one fresh parser per line.
// A line the grammar rejects prints the stack trace of its ParseException.
for (String sampleLine : new String[] {
        "get hoge\r\n",
        "gets hoge foo\r\n",
        "set xyzkey 0 0 6\r\n" }) {
    MemcacheParser sampleParser = new MemcacheParser(new StringReader(sampleLine));
    try {
        sampleParser.Command();
    } catch (ParseException e) {
        e.printStackTrace();
    }
}
Call:   Command
  Call:   RetrievalCommand
    Consumed token: <<RETRIEVAL_STATEMENT>: "get" at line 1 column 1>
    Call:   Key
      Consumed token: <<KEY>: "hoge" at line 1 column 5>
    Return: Key
  Return: RetrievalCommand
Return: Command
Call:   Command
  Call:   RetrievalCommand
    Consumed token: <<RETRIEVAL_STATEMENT>: "gets" at line 1 column 1>
    Call:   Key
      Consumed token: <<KEY>: "hoge" at line 1 column 6>
    Return: Key
    Call:   Key
      Consumed token: <<KEY>: "foo" at line 1 column 11>
    Return: Key
  Return: RetrievalCommand
Return: Command
Call:   Command
  Call:   StorageCommand
    Call:   createStorageCommand
      Consumed token: <"set" at line 1 column 1>
    Return: createStorageCommand
    Call:   Key
      Consumed token: <<KEY>: "xyzkey" at line 1 column 5>
    Return: Key
    Call:   Flags
      Consumed token: <<NUMBER>: "0" at line 1 column 12>
    Return: Flags
    Call:   Time
      Consumed token: <<NUMBER>: "0" at line 1 column 14>
    Return: Time
    Call:   Length
      Consumed token: <<NUMBER>: "6" at line 1 column 16>
    Return: Length
    Call:   Noreply
    Return: Noreply
  Return: StorageCommand
Return: Command

と、こんな感じになる。
<NUMBER>とか、ホント、マジメにtokenが書けてないですね。。

ここまでできたので、set と get しかない、memcached 互換のものをでっち上げてみた

public class Server extends Thread {
    
    protected final BlockingQueue<Socket> accept = new LinkedBlockingQueue<Socket>();
    
    protected final Cache<String, String> cache = new LRUCache<String, String>();

    protected final ExecutorService acceptPool;
    
    protected final int port;
    
    protected final int maxConnection;
    
    public Server(final int port, final int maxConnection){
        this.port = port;
        this.maxConnection = maxConnection;
        this.acceptPool = Executors.newFixedThreadPool(maxConnection);
    }
    
    public static void main(String...args){
        Server s = new Server(12221, 32);
        s.start();
        
        while(s.isAlive()){
            try {
                TimeUnit.MICROSECONDS.sleep(10);
            } catch(InterruptedException e){}
        }
    }

    public void run(){
        try {
            ServerSocketFactory factory = ServerSocketFactory.getDefault();
            ServerSocket socket = factory.createServerSocket(port, maxConnection);
            socket.setReuseAddress(true);
            
            while(!socket.isClosed()){
                final Socket accept = socket.accept();
                acceptPool.execute(new AcceptHandler(accept));
            }
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    protected class AcceptHandler implements Runnable {
        private final Socket socket;
        public AcceptHandler(final Socket socket){
            this.socket = socket;
        }
        public void run(){
            try {
                final InputStream in = socket.getInputStream();
                final OutputStream out = socket.getOutputStream();
                
                final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
                final DataOutputStream writer = new DataOutputStream(out);
                final CommandWorker worker = new CommandWorker(reader);
                while(!socket.isClosed()){
                    if(!worker.prepare()){
                        break;
                    }
                    Return r = worker.call();
                    writer.writeBytes(r.renderMessage());
                }
            } catch(IOException e){
                e.printStackTrace();
            } finally {
                try {
                    socket.close();
                } catch(IOException e){
                    // nop
                }
            }
        }
    }
    private class CommandWorker implements CommandVisitor {
        private final BufferedReader reader;
        private String currentLine;
        public CommandWorker(final BufferedReader reader) {
            this.reader = reader;
        }
        
        public boolean prepare(){
            try {
                String line = reader.readLine();
                if(null == line){
                    return false;
                }
                currentLine = line;
                return true;
            } catch(IOException e){
                return false;
            }
        }
        
        public Return call() {
            try {
                final StringReader r = new StringReader(currentLine);
                final MemcacheParser parser = new MemcacheParser(r);
                
                Command command = parser.Command();
                return command.accept(this, null);
            } catch(ParseException e){
                return new Return(ResponseType.ERROR);
            }
        }
        
        public Return visit(Command command, Parameter parameter) {
            return null;
        }
    
        public Return visit(AddCommand command, Parameter parameter) {
            return null;
        }
    
        public Return visit(AppendCommand command, Parameter parameter) {
            return null;
        }
    
        public Return visit(CasCommand command, Parameter parameter) {
            return null;
        }
    
        public Return visit(DeleteCommand command, Parameter parameter) {
            return null;
        }
    
        public Return visit(PrependCommand command, Parameter parameter) {
            return null;
        }
    
        public Return visit(ReplaceCommand command, Parameter parameter) {
            return null;
        }
    
        public Return visit(RetrievalCommand command, Parameter parameter) {
            String value = cache.get(command.getKeys().get(0));
            if(null == value){
                return new Return(ResponseType.END);
            }
            return new Return(ResponseType.SEND_VALUE,
                command.getKeys().get(0), 0, value.length(),
                value
            );
        }
    
        public Return visit(SetCommand command, Parameter parameter) {
            try {
                final String nextLine = reader.readLine();
                cache.put(command.getKey(), nextLine, command.getExpTime().longValue());
                return new Return(ResponseType.STORED);
            } catch(IOException e){
                e.printStackTrace();
                return new Return(ResponseType.ERROR);
            }
        }
    
        public Return visit(VersionCommand command, Parameter parameter) {
            return null;
        }
    }
}

見事に、setとgetしか実装してません。しかもハンドリングは少し適当。

ということで、これ(java-lang)と、memcached(c-lang)で比較してみた。

// Benchmark targets: port 11211 is the memcached (C) instance, port 12221 the Java server.
$target = array(
    array('host' => 'localhost', 'port' => 11211),
    array('host' => 'localhost', 'port' => 12221)
);
foreach($target as $t){
    $memcache = new Memcache;
    // connect() returns false on failure; skip the target instead of timing failed calls.
    if(!$memcache->connect($t['host'], $t['port'])){
        echo 'connect failed: host => ', $t['host'], ' port =>', $t['port'], PHP_EOL;
        continue;
    }

    // 1000 rounds of two sets and one get against the same key.
    $started = microtime(true);
    for($i = 0; $i < 1000; ++$i){
        $memcache->set('hoge', '123');
        $memcache->set('hoge', '124');
        $memcache->get('hoge');

    }
    // Stop the clock right after the loop so that printing is not part of the measurement.
    $elapsed = microtime(true) - $started;
    echo 'target host => ', $t['host'], ' port =>', $t['port'], PHP_EOL;
    echo 'elapsed: ', $elapsed, PHP_EOL;
}
target host => localhost port =>11211
elapsed: 0.420136213303
target host => localhost port =>12221
elapsed: 1.34848499298

なんつーか、「もうちょっとがんばりま賞」って感じで残念感があります。(約3倍遅い)
とりあえず、動きそうなので、他の実装も頑張る。


Trackback

No Trackbacks

Track from Your Website

http://blog.xole.net/trackback/tb.php?id=738

Comment

No Comments

Post Your Comment


*は入力必須です。E-Mailは公開されません。

1 + 2 =