キーワード : memcache

8 件 表示 : 1 - 8 / 8

2010/01/20

PHPで200行で作る memcached 互換サーバ

ポスト @ 22:00:52 | ,     

まさに誰得。

/**
 * Single-process TCP server that accepts connections on 0.0.0.0:11222 and
 * serves them one at a time through a MemcachedAcceptHandler.
 *
 * A connection is served until its handler returns, so a second client
 * waits in the listen backlog until the first one disconnects.
 */
class MemcachedServer {
    /** @var MemcachedCommand dispatcher handed to every connection handler */
    protected $command;
    /**
     * @param MemcachedCommand $command dispatcher that executes client commands
     */
    public function __construct(MemcachedCommand $command){
        $this->command = $command;
    }
    /**
     * Starts the server; simply delegates to run().
     */
    public function start(){
        $this->run();
    }
    /**
     * Creates the listening socket and enters the endless accept loop.
     *
     * @throws Exception when the socket cannot be created, bound or listened on
     */
    public function run(){
        $socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if(false === $socket){
            // no socket exists yet, so only the global last error is available
            $code = socket_last_error();
            $msg = socket_strerror($code);
            throw new Exception(sprintf('socket_create was error(%s):%s', $code, $msg));
        }
        socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);

        if(false === @socket_bind($socket, '0.0.0.0', 11222)){
            throw $this->socketFailure('socket_bind', $socket);
        }
        if(false === @socket_listen($socket)){
            throw $this->socketFailure('socket_listen', $socket);
        }

        echo 'server start on tcp://0.0.0.0:11222', PHP_EOL;
        while(true){
            $accept = @socket_accept($socket);
            if(false === $accept){
                continue;
            }
            $handler = new MemcachedAcceptHandler($accept, $this->command);
            try {
                $handler->init();
                $handler->execute();
            } catch(Exception $e){
                // a broken connection must not take the whole server down
                echo 'connection error: ', $e->getMessage(), PHP_EOL;
            }
            $handler->destroy();
            // drop the last reference so the handler's destructor closes the socket now
            unset($handler);
        }
    }
    /**
     * Builds the exception for a failed call on the listening socket and
     * closes that socket so it does not leak.
     *
     * @param string $function name of the socket function that failed
     * @param mixed  $socket   the listening socket the call was made on
     * @return Exception
     */
    private function socketFailure($function, $socket){
        // the error of a bind/listen call is recorded on the socket itself
        $code = socket_last_error($socket);
        $msg = socket_strerror($code);
        socket_close($socket);
        return new Exception(sprintf('%s was error(%s):%s', $function, $code, $msg));
    }
}

/**
 * Line-oriented channel that command implementations use to talk to a client.
 */
interface StreamReadWrite {
    /** Line terminator appended by writeLine() and stripped by readLine() implementations. */
    const DELIMITER = "\r\n";
    /**
     * Reads the next line from the peer.
     *
     * The implementation in this file returns the line without its delimiter,
     * or null when nothing usable was read.
     */
    public function readLine();
    /**
     * Sends one line to the peer.
     *
     * @param string $str line content without the trailing delimiter
     */
    public function writeLine($str);
}

/**
 * Serves a single accepted client socket: reads command lines, passes them
 * to the command dispatcher and writes the replies back to the client.
 */
class MemcachedAcceptHandler implements StreamReadWrite {
    /** @var mixed accepted client socket; null once it has been closed */
    protected $socket;
    /** @var MemcachedCommand dispatcher that executes the parsed commands */
    protected $command;
    /** @var bool set to false by readLine() when a read fails or returns no data */
    protected $connected = true;
    /**
     * @param mixed            $socket  socket returned by socket_accept()
     * @param MemcachedCommand $command dispatcher for the commands read from the socket
     */
    public function __construct($socket, MemcachedCommand $command){
        $this->socket = $socket;
        $this->command = $command;
    }
    /**
     * Closes the client socket if it is still held.
     */
    public function __destruct(){
        if(null !== $this->socket){
            @socket_close($this->socket);
            $this->socket = null;
        }
    }
    /**
     * Announces the new connection on stdout.
     */
    public function init(){
        echo 'new connection', PHP_EOL;
    }
    /**
     * Reads and dispatches commands until the connection is flagged as closed.
     *
     * @throws RuntimeException when socket_select() fails
     */
    public function execute(){
        while($this->connected){
            $read = array($this->socket);
            $write = array();
            $except = array();
            // wait at most one second so the connected flag is re-checked regularly
            $select = @socket_select($read, $write, $except, 1);
            if(false === $select){
                throw new RuntimeException('socket_select');
            }
            if($select < 1){
                continue;
            }

            $line = $this->readLine();
            if(null === $line){
                continue;
            }
            // "delete hoge 0" => mode "delete", args array("hoge", "0").
            // Taking the whole first word (not a fixed 3 characters) is what
            // makes commands longer than "get"/"set" reachable.
            $args = preg_split('/ +/', trim($line));
            $mode = array_shift($args);
            $this->command->call($this, $mode, $args);
        }
    }
    /**
     * Announces the end of the connection and shuts the socket down.
     */
    public function destroy(){
        echo 'close connection', PHP_EOL;
        @socket_shutdown($this->socket);
    }
    /**
     * Reads one DELIMITER-terminated line, one byte at a time.
     *
     * @return string|null the line without its delimiter; null for an empty or
     *                     whitespace-only line, or when the read failed (in
     *                     which case the connection is flagged as closed)
     */
    public function readLine(){
        $line = '';
        while(true){
            $buf = @socket_read($this->socket, 1);
            if(false === $buf || '' === $buf){
                // read error or no data: treat the peer as gone
                $this->connected = false;
                return null;
            }
            $line .= $buf;
            if(self::DELIMITER == substr($line, -2)){
                $line = substr($line, 0, -2);
                break;
            }
        }
        // compare against '' explicitly: empty() would also discard a literal "0"
        if('' === $line){
            return null;
        }
        if(preg_match('/^\s+$/', $line)){
            return null;
        }
        return $line;
    }
    /**
     * Writes $str followed by DELIMITER to the client.
     *
     * @param string $str line content
     * @return int|false result of socket_write()
     */
    public function writeLine($str){
        return @socket_write($this->socket, $str . self::DELIMITER);
    }
}

/**
 * Executes one parsed client command.
 */
interface MemcachedCommand {
    /**
     * @param StreamReadWrite $reader channel to the client issuing the command
     * @param string          $mode   command name, e.g. "get"
     * @param array           $args   remaining words of the command line
     */
    public function call(StreamReadWrite $reader, $mode, array $args);
}

/**
 * Dispatches command names to same-named protected methods of the subclass.
 */
abstract class AbstractMemcachedCommand implements MemcachedCommand {
    /** @var ReflectionObject reflection of the concrete command object */
    protected $reflector;
    /**
     * Names a client is allowed to invoke. Without this list any method of
     * the object (call, error, concat, ...) could be reached from the wire.
     * Subclasses that add commands extend this list.
     *
     * @var array
     */
    protected $commands = array('get', 'set', 'delete');
    public function __construct(){
        $this->reflector = new ReflectionObject($this);
    }
    /**
     * Returns $a with the values of $b appended.
     */
    protected static function concat(array $a, array $b){
        array_splice($a, count($a), 0, $b);
        return $a;
    }
    /**
     * Invokes the method named $mode with ($rw, ...$args).
     *
     * Replies ERROR instead when the command is not allowed, not implemented,
     * or was sent with fewer arguments than the method requires.
     */
    public final function call(StreamReadWrite $rw, $mode, array $args){
        if(!in_array($mode, $this->commands, true) || !$this->reflector->hasMethod($mode)){
            return $this->error($rw);
        }
        // +1 accounts for the StreamReadWrite passed as first argument
        $required = $this->reflector->getMethod($mode)->getNumberOfRequiredParameters();
        if(count($args) + 1 < $required){
            return $this->error($rw);
        }
        echo 'command => ', $mode, ' ', join(' ', $args), PHP_EOL;
        return call_user_func_array(array($this, $mode), self::concat(array($rw), $args));
    }
    /**
     * Sends the generic ERROR reply.
     */
    protected function error($rw){
        $rw->writeLine('ERROR');
    }
    protected abstract function get(StreamReadWrite $rw, $keys);
    protected abstract function set(StreamReadWrite $rw, $key, $flag, $expire, $length);
    protected abstract function delete(StreamReadWrite $rw, $key, $expire = 0);
}
/**
 * get/set/delete backed by a plain PHP array held in this object.
 */
class StorageMemcacheCommand extends AbstractMemcachedCommand {
    /** Exptime values above 30 days are absolute Unix timestamps in the memcached protocol. */
    const RELATIVE_LIMIT = 2592000;
    /** @var array key => stdClass(flag, expire, length, value); expire 0 means "never" */
    protected $cache = array();
    /**
     * Replies with a VALUE block for every requested key that is present and
     * not expired, followed by END. All arguments after $rw are keys.
     */
    protected function get(StreamReadWrite $rw, $keys){
        $args = func_get_args();
        array_shift($args);
        foreach($args as $key){
            if(!isset($this->cache[$key])){
                continue;
            }
            $value = $this->cache[$key];
            if($this->isExpired($value)){
                // free the slot instead of keeping dead entries forever
                unset($this->cache[$key]);
                continue;
            }
            $rw->writeLine(sprintf('VALUE %s %d %d', $key, $value->flag, $value->length));
            $rw->writeLine($value->value);
        }
        $rw->writeLine('END');
    }
    /**
     * Stores the next line read from the client under $key and replies STORED.
     *
     * @param string $expire protocol exptime: 0 = never expires, up to 30 days =
     *                       seconds from now, larger = absolute Unix timestamp
     */
    protected function set(StreamReadWrite $rw, $key, $flag, $expire, $length){
        $value = new stdClass;
        $value->flag = $flag;
        $value->expire = $this->expiresAt((int) $expire);
        $value->length = $length;
        $value->value = $rw->readLine();
        $this->cache[$key] = $value;
        $rw->writeLine('STORED');
    }
    /**
     * Removes $key immediately and replies DELETED, or NOT_FOUND when the key
     * is absent or already expired. $expire is accepted but ignored.
     */
    protected function delete(StreamReadWrite $rw, $key, $expire = 0){
        $found = isset($this->cache[$key]) && !$this->isExpired($this->cache[$key]);
        unset($this->cache[$key]);
        // the client blocks until it gets a reply, so always send one
        $rw->writeLine($found ? 'DELETED' : 'NOT_FOUND');
    }
    /**
     * Converts a protocol exptime into the stored expiry timestamp (0 = never).
     */
    private function expiresAt($seconds){
        if(0 === $seconds){
            return 0;
        }
        if($seconds > self::RELATIVE_LIMIT){
            return $seconds;
        }
        // clamp to 1 so a negative exptime can never collide with the "never" marker 0
        return max(1, time() + $seconds);
    }
    /**
     * Tells whether a stored entry has passed its expiry time.
     */
    private function isExpired($value){
        return 0 !== $value->expire && $value->expire < time();
    }
}

// Wire the array-backed command set into the server and start serving.
$command = new StorageMemcacheCommand;
$server = new MemcachedServer($command);
$server->start();

とりあえず、set/get/deleteだけ。threadとか色々ないので、実際には削除されない。。
他にも構文解析とかしてない(しなくていいくらい memcache はシンプルなんだけど)ので上手いことハンドリングしないといけない。。
同時アクセスに難あり。。

(中略)

などなど、色々あるので、開発用にしか向いてない。というか、よっぽどストイックにPHPを愛している人じゃないと向いていないかも。
まぁ最近は色々memcached互換系のがでてきたので、クライアント作成時のdebug用途向けかも。

今回は、PHPのsocket 関数をゴリゴリ使っています。
blocking modeとかはちょっと動きが不安定(?)なので、あまり性能はでない
C言語版(本家 memcached)との比較を、この前のスクリプトを使ってやってみた

// Runs the same set/get round trip against each target server and prints
// the elapsed time and the number of wrong results per target.
$target = array(
    array('host' => 'localhost', 'port' => 11211),
    array('host' => 'localhost', 'port' => 11222)
);
foreach($target as $t){
    $memcache = new Memcache;
    // without a connection every call below would fail and skew the numbers
    if(false === @$memcache->connect($t['host'], $t['port'])){
        echo 'connect failed => ', $t['host'], ' port =>', $t['port'], PHP_EOL;
        continue;
    }

    $fail = 0;
    $elapsed = microtime(true);
    $count = 500;
    for($i = 0; $i < $count; ++$i){
        if(false === $memcache->set('hoge', 1234, 0, 10)){
            echo 'ERROR!!', PHP_EOL;
        }
        if(false === $memcache->set('hoge', 123, 0, 10)){
            echo 'ERROR!!!', PHP_EOL;
        }
        $value = (int) $memcache->get('hoge');
        if(false === $memcache->set('hoge', $value + 1, 0, 10)){
            echo 'ERROR!!!!', PHP_EOL;
        }
        // 123 was stored last, so the incremented value must read back as 124
        $result = (int) $memcache->get('hoge');
        if($result != 124){
            $fail++;
        }
    }
    echo 'target host => ', $t['host'], ' port =>', $t['port'], PHP_EOL;
    echo 'elapsed: ', (microtime(true) - $elapsed), PHP_EOL;
    echo 'fail => ', $fail, PHP_EOL;
    $memcache->close();
}
target host => localhost port =>11211
elapsed: 0.634283065796
fail => 0

target host => localhost port =>11222
elapsed: 1.12030887604
fail => 0

とまあ、こんな感じ。(そこそこ?)
ちなみに、stream_socketを使って書いてみたものも置いておく。fwriteまわりでハマったので動かないと思うけど。

// 動かない><
/**
 * Stream-based variant of the server: listens on tcp://0.0.0.0:11222 and
 * hands each accepted connection to a MemcachedAcceptHandler, one at a time.
 */
class MemcachedServer {
    /** @var MemcachedCommand dispatcher passed to every connection handler */
    protected $command;
    public function __construct(MemcachedCommand $command){
        $this->command = $command;
    }
    /**
     * Entry point; delegates to run().
     */
    public function start(){
        $this->run();
    }
    /**
     * Opens the listening stream and loops forever accepting clients.
     *
     * @throws RuntimeException when the server stream cannot be opened or
     *                          switched to non-blocking mode
     */
    public function run(){
        $server = stream_socket_server('tcp://0.0.0.0:11222', $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
        if(false === $server){
            throw new RuntimeException(sprintf('stream_socket_server was error(%s):%s', $errno, $errstr));
        }
        if(false === stream_set_blocking($server, 0)){
            throw new RuntimeException('stream_set_blocking: set blocking mode error');
        }

        echo 'server start on tcp://0.0.0.0:11222', PHP_EOL;
        for(;;){
            // wait up to one second for a client, then poll again
            $client = @stream_socket_accept($server, 1);
            if(false !== $client){
                $handler = new MemcachedAcceptHandler($client, $this->command);
                $handler->init();
                $handler->execute();
                $handler->destroy();
            }
        }
    }
}

/**
 * Line-oriented channel that command implementations use to talk to a client.
 */
interface StreamReadWrite {
    /** Line terminator of the text protocol. */
    const DELIMITER = "\r\n";
    /**
     * Reads the next line from the peer.
     *
     * The implementation in this file returns null when no usable line was read.
     */
    public function readLine();
    /**
     * Sends one line to the peer.
     *
     * @param string $str line content without a trailing terminator
     */
    public function writeLine($str);
}

/**
 * Serves a single accepted client stream: reads command lines, passes them
 * to the command dispatcher and writes the replies back to the client.
 */
class MemcachedAcceptHandler implements StreamReadWrite {
    /** @var mixed client stream returned by stream_socket_accept() */
    protected $socket;
    /** @var MemcachedCommand dispatcher that executes the parsed commands */
    protected $command;
    public function __construct($socket, MemcachedCommand $command){
        $this->socket = $socket;
        $this->command = $command;
    }
    /**
     * Closes the client stream if it is still held.
     */
    public function __destruct(){
        if(null !== $this->socket){
            fclose($this->socket);
            unset($this->socket);
        }
    }
    /**
     * Puts the stream into blocking mode with a 60 second timeout and
     * disables write buffering so replies go out immediately.
     */
    public function init(){
        echo 'new connection', PHP_EOL;
        stream_set_blocking($this->socket, true);
        stream_set_timeout($this->socket, 60);
        // disable writebuffer
        stream_set_write_buffer($this->socket, 0);
    }
    /**
     * Peeks one byte without consuming it; true when nothing could be peeked.
     */
    protected function feof(){
        $recv = stream_socket_recvfrom($this->socket, 1, STREAM_PEEK);
        return strlen($recv) < 1;
    }
    /**
     * Reads and dispatches commands until readLine() yields null.
     *
     * @throws RuntimeException when stream_select() fails
     */
    public function execute(){
        while(true){
            $read = array($this->socket);
            $write = array();
            $except = array();
            $select = @stream_select($read, $write, $except, 1);
            if(false === $select){
                throw new RuntimeException('stream_select');
            }
            if($select < 1){
                continue;
            }

            $line = $this->readLine();
            echo 'line => ', $line, PHP_EOL;
            if(null === $line){
                return;
            }
            // get hoge => get
            $mode = substr($line, 0, 3);
            // get hoge foo => array(hoge, foo)
            $args = explode(' ', substr($line, 4));
            $this->command->call($this, $mode, $args);
        }
    }
    /**
     * Announces the end of the connection and shuts the stream down in both directions.
     */
    public function destroy(){
        echo 'close connection', PHP_EOL;
        stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR);
    }
    /**
     * Reads one DELIMITER-terminated line of at most 2048 bytes.
     *
     * @return string|null the line, or null when nothing was read or the line
     *                     is empty or whitespace-only
     */
    public function readLine(){
        $line = stream_get_line($this->socket, 2048, self::DELIMITER);
        // stream_get_line() yields false on failure; compare explicitly because
        // empty() would also throw away a literal "0" payload
        if(false === $line || '' === $line){
            return null;
        }
        if(preg_match('/^\s+$/', $line)){
            return null;
        }
        return $line;
    }
    /**
     * Writes $str followed by DELIMITER to the client.
     *
     * Replies must end in "\r\n": a client reading the text protocol keeps
     * waiting for that terminator and never sees a reply ending in "\n\0".
     */
    public function writeLine($str){
        fputs($this->socket, $str . self::DELIMITER);
    }
}

/**
 * Executes one parsed client command.
 */
interface MemcachedCommand {
    /**
     * @param StreamReadWrite $reader channel to the client issuing the command
     * @param string          $mode   command name taken from the start of the line
     * @param array           $args   remaining words of the command line
     */
    public function call(StreamReadWrite $reader, $mode, array $args);
}

/**
 * Routes a command name to the same-named method of the concrete subclass.
 */
abstract class AbstractMemcachedCommand implements MemcachedCommand {
    /** @var ReflectionObject reflection of the concrete command object */
    protected $reflector;
    public function __construct(){
        $this->reflector = new ReflectionObject($this);
    }
    /**
     * Returns the values of $b appended to $a.
     */
    protected static function concat(array $a, array $b){
        return array_merge($a, array_values($b));
    }
    /**
     * Calls the method named $mode with ($rw, ...$args); replies ERROR when
     * no such method exists.
     */
    public final function call(StreamReadWrite $rw, $mode, array $args){
        if($this->reflector->hasMethod($mode)){
            $arguments = self::concat(array($rw), $args);
            return call_user_func_array(array($this, $mode), $arguments);
        }
        return $this->error($rw);
    }
    /**
     * Sends the generic ERROR reply.
     */
    protected function error($rw){
        $rw->writeLine('ERROR');
    }
    protected abstract function get(StreamReadWrite $rw, $keys);
    protected abstract function set(StreamReadWrite $rw, $key, $flag, $expire, $length);
}
/**
 * get/set backed by a plain PHP array held in this object (no expiry handling).
 */
class StorageMemcacheCommand extends AbstractMemcachedCommand {
    /** @var array key => stdClass(flag, expire, length, value) */
    protected $cache = array();
    /**
     * Replies with a VALUE block for every requested key that is stored,
     * followed by END. All arguments after $rw are keys.
     */
    protected function get(StreamReadWrite $rw, $keys){
        $args = func_get_args();
        array_shift($args);
        foreach($args as $key){
            if(!isset($this->cache[$key])){
                continue;
            }
            $value = $this->cache[$key];
            // the third field of the VALUE header is the byte count of the
            // data block, not the expiry time
            $rw->writeLine(sprintf('VALUE %s %d %d', $key, $value->flag, $value->length));
            $rw->writeLine($value->value);
        }
        $rw->writeLine('END');
    }
    /**
     * Stores the next line read from the client under $key and replies STORED.
     */
    protected function set(StreamReadWrite $rw, $key, $flag, $expire, $length){
        $value = new stdClass;
        $value->flag = $flag;
        $value->expire = $expire;
        $value->length = $length;
        $value->value = $rw->readLine();
        $this->cache[$key] = $value;
        $rw->writeLine('STORED');
    }

}

// Wire the array-backed command set into the server and start serving.
$command = new StorageMemcacheCommand;
$server = new MemcachedServer($command);
$server->start();

memcacheの cache の部分を java で(その2)

ポスト @ 0:21:29 , 修正 @ 2010/01/20 0:32:43 | ,     

前回の続き。

前回の結果、とりあえず、DelayQueueによって期限切れのEntryは取り出せるようになった。
だけど、この仕組みの状態では期限切れになったEntryは容赦なく消えていってしまう。
つまり、そのEntryがまだ利用されるかもしれないのに、消えてしまうのは(色々な意味で)もったいない。
特に、javaだとHashMapとかのloadFactorまわりの動きは(きっと)もったいない

もう一度LRUについてwikipediaに確認すると

Least Recently Used (LRU) はキャッシュメモリや仮想メモリが扱うデータのリソースへの割り当てを決定するアルゴリズムである。対義語はMost Recently Used (MRU)。
和訳すると「最近最も使われなかったもの」つまり「使われてから最も長い時間が経ったもの」「参照される頻度が最も低いもの」である。

ということなので(?)、ホントに使われ無かったもの(アクセスが少ないもの)から順番に消えるように考えてみた。

その結果がこれ

/**
 * Cache whose entries carry a priority that rises on every access.
 *
 * Entries wait in a {@link DelayQueue} until their expiry time. An expired
 * entry handed to {@link #purge(PrioritalEntry)} loses one priority point and
 * is re-queued with a back-off delay; it is only removed from the map once
 * its priority drops below 1.
 */
public class LRUCache implements CacheLifeCycle {
    
    private static final Log log = LogFactory.getLog(LRUCache.class);
    
    /** key to entry mapping */
    protected final ConcurrentMap<String, PrioritalEntry> cache = new ConcurrentHashMap<String, PrioritalEntry>();

    /** entries ordered by expiry time; consumed by the executor's monitor */
    protected final DelayQueue<PrioritalEntry> expiredQueue =  new DelayQueue<PrioritalEntry>();
    
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    
    private final Lock writeLock = lock.writeLock();
    
    private final Lock readLock = lock.readLock();
    
    /**
     * Hands this cache and its expiry queue to the executor.
     */
    public void register(LifeCycleExecutor executor){
        executor.add(this, expiredQueue);
    }
    
    /**
     * Processes an entry whose delay has elapsed: lowers its priority and
     * either removes it or re-queues it with a new delay.
     */
    public void purge(PrioritalEntry entry){
        writeLock.lock();
        try {
            // the key may have been re-mapped to a newer entry in the meantime;
            // a stale entry must neither be re-queued nor evict its successor
            if(cache.get(entry.getKey()) != entry){
                return;
            }
            // lower the priority
            if(entry.decrementPriority() < 1){
                // below 1: nobody seems to be using it, so drop it
                if(log.isDebugEnabled()){
                    log.debug("purge entry => " + entry);
                }
                cache.remove(entry.getKey(), entry);
                return;
            }
            // give it one more chance
            entry.setExpiredAt(retransmission(entry.getPriority()));
            expiredQueue.offer(entry);
        } finally {
            writeLock.unlock();
        }
    }
    
    /**
     * Stores a value. An existing entry for the key is reused (value and
     * expiry replaced, priority raised) instead of being recreated.
     *
     * @param expiredAt lifetime in seconds; 0 means the entry never expires
     * @return true when the entry is stored
     */
    public boolean set(String key, String value, long expiredAt){
        writeLock.lock();
        try {
            PrioritalEntry newEntry = new PrioritalEntry(key, value, expiredAt);
            PrioritalEntry previousEntry = cache.putIfAbsent(key, newEntry);
            // no existing value, so this is a new entry
            if(null == previousEntry){
                return expiredQueue.offer(newEntry);
            }
            if(0 == expiredAt){
                // setExpiredAt(0) means "expire now", while a fresh entry built
                // with 0 never expires; swap in the fresh entry so both paths agree
                cache.put(key, newEntry);
                expiredQueue.remove(previousEntry);
                return expiredQueue.offer(newEntry);
            }
            previousEntry.setValue(value);
            reschedule(previousEntry, expiredAt);
            previousEntry.incrementPriority();
            return true;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Looks up a live entry and raises its priority.
     *
     * @return the entry, or null when the key is unknown or the entry is expired
     */
    public PrioritalEntry get(String key) {
        readLock.lock();
        try {
            PrioritalEntry entry = cache.get(key);
            // expired entries are hidden from callers
            if(null == entry || entry.isExpired()){
                return null;
            }
            // somebody used it, so raise the priority
            entry.incrementPriority();
            return entry;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Expires the entry for the key immediately.
     */
    public void remove(String key) {
        remove(key, 0);
    }
    
    /**
     * Expires the entry for the key after the given number of seconds and
     * lowers its priority.
     */
    public void remove(String key, long expiredAt){
        writeLock.lock();
        try {
            PrioritalEntry entry = cache.get(key);
            if(null != entry){
                reschedule(entry, expiredAt);
                entry.decrementPriority();
            }
        } finally {
            writeLock.unlock();
        }
    }
    
    /**
     * Expires every entry immediately.
     */
    public void flush() {
        flush(0);
    }
    
    /**
     * Expires every entry after the given number of seconds.
     */
    public void flush(final long expiredAt){
        writeLock.lock();
        try {
            // Empty the queue before touching expiry times and refill it
            // afterwards: changing the ordering key of queued elements in
            // place would corrupt the queue's internal heap order.
            final PrioritalEntry[] queued = expiredQueue.toArray(new PrioritalEntry[0]);
            expiredQueue.clear();
            for(PrioritalEntry value : cache.values()){
                value.setExpiredAt(expiredAt);
            }
            for(PrioritalEntry value : queued){
                expiredQueue.offer(value);
            }
        } finally {
            writeLock.unlock();
        }
    }
    
    /**
     * Changes an entry's expiry while keeping the queue ordering valid.
     * Must be called with the write lock held.
     */
    private void reschedule(final PrioritalEntry entry, final long expiredAt){
        // an entry currently being handed to purge() is not queued; purge()
        // re-queues it itself, so only put back what was actually taken out
        final boolean queued = expiredQueue.remove(entry);
        entry.setExpiredAt(expiredAt);
        if(queued){
            expiredQueue.offer(entry);
        }
    }
    
    /**
     * Computes the delay, in seconds, before an expired entry is looked at
     * again; grows roughly exponentially with the priority and is capped at
     * the value for MAX_PRIORITY - 1.
     */
    protected static long retransmission(final int currentPriority){
        if(currentPriority < PrioritalEntry.MAX_PRIORITY){
            // binary exponential backoff
            long freq = currentPriority + Math.round(Math.pow(2, currentPriority));
            return Math.round(0.875 * freq) + Math.round(0.125 * currentPriority);
        }
        return retransmission(PrioritalEntry.MAX_PRIORITY - 1);
    }
}

んで、PrioritalEntryの実装はこれ

public class PrioritalEntry implements Delayed {
    
    public static final int MAX_PRIORITY = 10;
    
    public static final int DEFAULT_PRIORITY = 5;
    
    private final AtomicInteger priority = new AtomicInteger(DEFAULT_PRIORITY);
    
    private final String key;
    
    private final AtomicReference<String> value;
    
    private final AtomicLong expiredAt;
    
    public PrioritalEntry(final String key, final String value, final long expiredAt){
        this.key = key;
        this.value = new AtomicReference<String>(value);
        if(0 == expiredAt){
            this.expiredAt = new AtomicLong(Long.MAX_VALUE);
        } else if(expiredAt < 0) {
            this.expiredAt = new AtomicLong(0);
        } else {
            this.expiredAt = new AtomicLong(relative(expiredAt));
        }
    }

    protected static long relative(long expiredAt){
        // 現在時間 + 指定時間
        return System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(expiredAt);
    }
    
    protected static long absolute(long expiredAt){
        return TimeUnit.SECONDS.toMillis(expiredAt);
    }
    
    public int incrementPriority(){
        if(MAX_PRIORITY < priority.get()){
            return MAX_PRIORITY;
        }
        return priority.incrementAndGet();
    }
    
    public int decrementPriority(){
        return priority.decrementAndGet();
    }
    
    public int getPriority(){
        return priority.get();
    }

    public String getKey(){
        return key;
    }
    
    public String getValue(){
        return value.get();
    }
    
    public void setValue(String value){
        this.value.set(value);
    }
    
    public void setExpiredAt(long newValue){
        // 現在時間より次の期限を設定
        this.expiredAt.set(relative(newValue));
    }

    private long elapsed(){
        return expiredAt.get() - System.currentTimeMillis();
    }
    
    public boolean isExpired(){
        return elapsed() < 1;
    }

    public long getDelay(TimeUnit unit) {
        // expiredの時間から経過時間を引き、残り時間を算出
        return unit.convert(elapsed(), TimeUnit.MILLISECONDS);
    }
    
    public int compareTo(Delayed o) {
        PrioritalEntry target = (PrioritalEntry) o;
        final long x = expiredAt.get();
        final long y = target.expiredAt.get();
        if(x < y){
            return -1;
        }
        if(x > y){
            return 1;
        }
        return 0;
    }
    
    public String toString(){
        StringBuilder buf = new StringBuilder();
        buf.append("key=").append(key).append(",");
        buf.append("value=").append(value).append(",");
        buf.append("priority=").append(priority);
        return buf.toString();
    }
    
}

他にも

/**
 * Contract between a cache and the executor that watches its expiry queue.
 */
public interface CacheLifeCycle {
    /**
     * Lets the cache attach itself to the given executor.
     */
    public void register(LifeCycleExecutor executor);

    /**
     * Called with an entry that was taken from the cache's expiry queue.
     */
    public void purge(PrioritalEntry entry);
}

/**
 * Runs one monitor task per registered cache; each monitor takes entries
 * from the cache's queue and passes them to {@link CacheLifeCycle#purge}.
 */
public class LifeCycleExecutor {
    
    protected final ExecutorService executor;
    
    public LifeCycleExecutor(final ExecutorService executor){
        this.executor = executor;
    }
    
    /**
     * Starts a monitor that feeds entries taken from the queue to lifeCycle.purge().
     */
    public void add(CacheLifeCycle lifeCycle, BlockingQueue<PrioritalEntry> queue){
        executor.execute(new Monitor(lifeCycle, queue));
    }
    
    /**
     * Stops all monitors and waits up to 10 seconds for them to finish.
     */
    public void shutdown(){
        // Monitors loop forever and block in queue.take(); they only stop
        // when interrupted, so a graceful shutdown() would always run into
        // the timeout. Interrupt them right away.
        executor.shutdownNow();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch(InterruptedException e){
            // keep the interrupt visible to the caller
            Thread.currentThread().interrupt();
        }
    }
    
    /**
     * Endless consumer of one queue; ends when its thread is interrupted.
     */
    private static class Monitor implements Runnable {
        private static final Log log = LogFactory.getLog(Monitor.class);
        private final CacheLifeCycle lifeCycle;
        private final BlockingQueue<PrioritalEntry> queue;
        private Monitor(CacheLifeCycle lifeCycle, BlockingQueue<PrioritalEntry> queue){
            this.lifeCycle = lifeCycle;
            this.queue = queue;
        }
        public void run(){
            try {
                while(true){
                    PrioritalEntry entry = queue.take();
                    if(log.isDebugEnabled()){
                        log.debug("find entry => " + entry);
                    }
                    try {
                        lifeCycle.purge(entry);
                    } catch(RuntimeException e){
                        // one failing purge must not end expiry handling for good
                        log.error("purge failed => " + entry, e);
                    }
                }
            } catch(InterruptedException e){
                // interrupted by shutdown: restore the flag and let the task end
                Thread.currentThread().interrupt();
            }
        }
    }

}

細かく解説していく(誰に?)と

まず、PrioritalEntry(スペルとか意味は気にしない)の実装側から

ほとんどの実装は前回のと同じ。
今回は他にpriority(優先度)も一緒に持ってみた
インスタンス生成時は

    public static final int MAX_PRIORITY = 10;
    
    public static final int DEFAULT_PRIORITY = 5;
    
    private final AtomicInteger priority = new AtomicInteger(DEFAULT_PRIORITY);

で作られるんだけど、LRUCacheのgetとかの呼び出しが行われる毎に、priorityをincrementしてる

// class LRUCache
    /**
     * Returns the live entry for the key and raises its priority, or null
     * when the key is unknown or the entry is expired.
     */
    public PrioritalEntry get(String key) {
        readLock.lock();
        try {
            final PrioritalEntry found = cache.get(key);
            // unknown key, or expired and therefore hidden
            final boolean visible = null != found && !found.isExpired();
            if(visible){
                // somebody used it, so raise the priority
                found.incrementPriority();
                return found;
            }
            return null;
        } finally {
            readLock.unlock();
        }
    }

// class PrioritalEntry
    /**
     * Raises the priority by one without ever exceeding MAX_PRIORITY.
     *
     * @return the priority after the call
     */
    public int incrementPriority(){
        // compare-and-set loop: the bound check and the increment must be one
        // atomic step, otherwise concurrent callers can push the value past the cap
        while(true){
            final int current = priority.get();
            if(MAX_PRIORITY <= current){
                return MAX_PRIORITY;
            }
            if(priority.compareAndSet(current, current + 1)){
                return current + 1;
            }
        }
    }
    
    /**
     * Lowers the priority by one (no lower bound).
     *
     * @return the priority after the call
     */
    public int decrementPriority(){
        return priority.decrementAndGet();
    }
    
    public int getPriority(){
        return priority.get();
    }

これで、最も "使われる頻度の高い" Entryは "priorityが高い" という状態を作り出してる。

その他にも、setで値が更新される際にも、既に同じkeyでEntryが作られてたらその部分を "再利用" してみた

    /**
     * Stores a value. An existing entry for the key is reused (value and
     * expiry replaced, priority raised) instead of being recreated.
     *
     * @param expiredAt lifetime in seconds; 0 means the entry never expires
     * @return true when the entry is stored
     */
    public boolean set(String key, String value, long expiredAt){
        writeLock.lock();
        try {
            PrioritalEntry newEntry = new PrioritalEntry(key, value, expiredAt);
            PrioritalEntry previousEntry = cache.putIfAbsent(key, newEntry);
            // no existing value, so this is a new entry
            if(null == previousEntry){
                return expiredQueue.offer(newEntry);
            }
            if(0 == expiredAt){
                // setExpiredAt(0) means "expire now", while a fresh entry built
                // with 0 never expires; swap in the fresh entry so both paths agree
                cache.put(key, newEntry);
                expiredQueue.remove(previousEntry);
                return expiredQueue.offer(newEntry);
            }
            // take the entry out of the queue while its expiry changes: mutating
            // the ordering key of a queued element corrupts the queue's heap order
            final boolean queued = expiredQueue.remove(previousEntry);
            previousEntry.setValue(value);
            previousEntry.setExpiredAt(expiredAt);
            previousEntry.incrementPriority();
            if(queued){
                expiredQueue.offer(previousEntry);
            }
            return true;
        } finally {
            writeLock.unlock();
        }
    }

ということで、priorityという値によって最近使われているかどうかまで、表現できたので、次は値の削除について

    /**
     * Processes an entry whose delay has elapsed: lowers its priority and
     * either removes it or re-queues it with a new delay.
     */
    public void purge(PrioritalEntry entry){
        writeLock.lock();
        try {
            // the key may have been re-mapped to a newer entry in the meantime;
            // a stale entry must neither be re-queued nor evict its successor
            if(cache.get(entry.getKey()) != entry){
                return;
            }
            // lower the priority
            if(entry.decrementPriority() < 1){
                // below 1: nobody seems to be using it, so drop it
                if(log.isDebugEnabled()){
                    log.debug("purge entry => " + entry);
                }
                cache.remove(entry.getKey(), entry);
                return;
            }
            // give it one more chance
            entry.setExpiredAt(retransmission(entry.getPriority()));
            expiredQueue.offer(entry);
        } finally {
            writeLock.unlock();
        }
    }

purgeというメソッドでは、LifeCycleExecutorによって、DelayQueueの中身が取り出された "期限切れ" の Entry が引数に渡される

        /**
         * Takes entries from the queue and hands them to purge() until the
         * thread is interrupted.
         */
        public void run(){
            try {
                while(true){
                    PrioritalEntry entry = queue.take();
                    if(log.isDebugEnabled()){
                        log.debug("find entry => " + entry);
                    }
                    try {
                        lifeCycle.purge(entry);
                    } catch(RuntimeException e){
                        // one failing purge must not end expiry handling for good
                        log.error("purge failed => " + entry, e);
                    }
                }
            } catch(InterruptedException e){
                // interrupted by shutdown: restore the flag and let the task end
                Thread.currentThread().interrupt();
            }
        }

期限切れの Entry であっても、すぐに削除は行わずに "再利用" されるチャンスを与えるため、期限切れの Entry であっても Delay Queue に再送(retransmission)している。
ただ単純に再送するのではなく、再送のアルゴリズムとして昔どこかで覚えた "binary exponential backoff" を使ってみた(うろ覚えバージョン)

// binary exponential backoff
long freq = currentPriority + Math.round(Math.pow(2, currentPriority));
return Math.round(0.875 * freq) + Math.round(0.125 * currentPriority);

これを実行してみるとこんな時間が求められる

retransmission(0) => 1
retransmission(1) => 3
retransmission(2) => 5
retransmission(3) => 10
retransmission(4) => 19
retransmission(5) => 33
retransmission(6) => 62
retransmission(7) => 119
retransmission(8) => 232
retransmission(9) => 457
:
:
:

これは再送時間を作るときに、本来ならリトライ回数を使うんだけど(リトライの数が多いほど、次回の再送時間までの待ちが増える、よってこまめに再送させない)、ここでは priority を使用している。
これによって、priorityが高いものほど、時間切れでも生存時間を多くすることができる。これによって再利用のチャンスが増えるはず。(priorityが高い = 最も再利用される可能性が高そう)

ということで、簡単に動かしてみる

// Demo: three entries, one of which ("foo") is read once, then watch the
// monitor's log output for two minutes.
LRUCache cache = new LRUCache();
LifeCycleExecutor executor = new LifeCycleExecutor(Executors.newCachedThreadPool());
cache.register(executor);

try {
    cache.set("hoge", "123", 5);
    cache.set("foo", "456", 2);
    cache.set("bar", "789", 2);

    cache.get("foo");

    TimeUnit.SECONDS.sleep(120);
} catch(InterruptedException e){
    // keep the interrupt visible instead of swallowing it
    Thread.currentThread().interrupt();
} finally {
    // always stop the monitor thread, otherwise the JVM cannot exit
    executor.shutdown();
}

で、これを動かしてみると。。。

LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=6
LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=5
LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=5
LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=4
LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=4
LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=3
LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=3
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=5
LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=2
LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=2
LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=1
LRUCache: purge entry => key=bar,value=789,priority=0
LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=1
LRUCache: purge entry => key=hoge,value=123,priority=0
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=4
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=3
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=2
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=1
LRUCache: purge entry => key=foo,value=456,priority=0

アクセスのあった "foo" キーは、 同じタイムアウトが設定されている "bar" よりも後に削除されている。
これでなんちゃって LRU はできた(?)

ちなみに、writeLockとかreadLockなどはたぶん使ってない。というか使わなくてもいいはず。

ちなみに、本家 memcached は slab allocationとかを使っているので、メモリの効率からいうとそっちの方がいいかも。
こういった部分を効率よく切り替えられるようにしたいものです

次回は、分散ハッシュ(というか複数台で連携)など。

2010/01/15

memcacheの cache の部分を java で

ポスト @ 0:06:52 , 修正 @ 2010/01/15 0:11:13 | ,     

指定時間後に消える(というか参照出来なくなる)をやってみることに。
DelayQueueとDelayedで実装してみる。

Cacheの部分

/**
 * Cache whose entries stop being visible once their lifetime has passed.
 * Expired entries are physically removed lazily, on the next set().
 */
public class RetireCache implements Cache {
    
    /** key to entry mapping */
    protected final java.util.concurrent.ConcurrentMap<String, PeriodEntry> cache = new ConcurrentHashMap<String, PeriodEntry>();
    
    /** entries ordered by expiry time; drained by check() */
    protected final DelayQueue<PeriodEntry> expiredQueue =  new DelayQueue<PeriodEntry>();

    /**
     * Stores a value, replacing any previous entry for the key.
     *
     * @param expiredAt lifetime in seconds; 0 means the entry never expires
     */
    public boolean set(String key, String value, long expiredAt) {
        check();
        final PeriodEntry entry = new PeriodEntry(key, value, expiredAt);
        final PeriodEntry previous = cache.put(key, entry);
        if(null != previous){
            // a replaced entry left in the queue would, once it expires,
            // make check() remove the mapping of its successor
            expiredQueue.remove(previous);
        }
        return expiredQueue.add(entry);
    }

    /**
     * @return the entry, or null when the key is unknown or the entry is expired
     */
    public Entry get(String key) {
        // single lookup: a containsKey()/get() pair can observe a removal by
        // check() in between and then dereference null
        final PeriodEntry entry = cache.get(key);
        if(null == entry || entry.isExpired()){
            return null;
        }
        return entry;
    }
    
    /**
     * Expires the entry for the key immediately.
     */
    public void remove(String key) {
        remove(key, 0);
    }
    
    /**
     * Expires the entry for the key after the given number of seconds.
     */
    public void remove(String key, long expiredAt){
        PeriodEntry entry = cache.get(key);
        if(null != entry){
            synchronized(this){
                // take the entry out of the queue while its expiry changes:
                // mutating the ordering key of a queued element corrupts the
                // queue's heap order
                final boolean queued = expiredQueue.remove(entry);
                entry.setExpiredAt(expiredAt);
                if(queued){
                    expiredQueue.add(entry);
                }
            }
        }
    }
    
    /**
     * Expires every entry immediately.
     */
    public void flush() {
        flush(0);
    }
    
    /**
     * Expires every entry after the given number of seconds.
     */
    public void flush(long expiredAt){
        synchronized(this){
            // empty the queue, update all expiry times, then refill it so the
            // queue is re-ordered by the new values
            final PeriodEntry[] queued = expiredQueue.toArray(new PeriodEntry[0]);
            expiredQueue.clear();
            for(PeriodEntry entry : cache.values()){
                entry.setExpiredAt(expiredAt);
            }
            for(PeriodEntry entry : queued){
                expiredQueue.add(entry);
            }
        }
    }
    
    /**
     * Removes every entry whose delay has elapsed from the map.
     */
    protected void check(){
        synchronized(this){
            PeriodEntry e = null;
            while((e = expiredQueue.poll()) != null){
                // only remove the mapping if it still points at this very entry
                cache.remove(e.getKey(), e);
            }
        }
    }
}

mapにputするときは、同じentryのインスタンスをqueueにも入れておく
getするときには、isExpiredを見て期限切れはnullを返す。

なんとなく、期限切れのqueueの参照にスレッドを立てるのもあれなので、setの時にentryの削除をやってみる

Entryの部分

/**
 * Immutable key/value pair with a mutable expiry time, usable as an element
 * of a {@link java.util.concurrent.DelayQueue}.
 */
public class PeriodEntry implements Entry, Delayed {
    
    protected final String key;
    
    protected final String value;
    
    /**
     * Expiry time in epoch milliseconds. Volatile because it is written by
     * setExpiredAt() and read by other threads; a plain long gives no
     * visibility guarantee between threads.
     */
    protected volatile long expiredAt;
    
    /**
     * @param expiredAt lifetime in seconds from now; 0 means never expires,
     *                  a negative value means already expired
     */
    public PeriodEntry(final String key, final String value, final long expiredAt){
        this.key = key;
        this.value = value;
        if(0 == expiredAt){
            this.expiredAt = Long.MAX_VALUE;
        } else if(expiredAt < 0) {
            this.expiredAt = 0;
        } else {
            this.expiredAt = relative(expiredAt);
        }
    }
    
    /**
     * @return current time plus the given number of seconds, in epoch milliseconds
     */
    protected static long relative(long expiredAt){
        return System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(expiredAt);
    }
    
    /**
     * @return the given number of seconds converted to milliseconds
     */
    protected static long absolute(long expiredAt){
        return TimeUnit.SECONDS.toMillis(expiredAt);
    }

    public String getKey(){
        return key;
    }
    
    public String getValue(){
        return value;
    }
    
    /**
     * Sets the expiry to the given number of seconds from now. Unlike the
     * constructor, 0 is not special here: it expires the entry immediately.
     */
    public void setExpiredAt(long newValue){
        this.expiredAt = relative(newValue);
    }

    /** remaining lifetime in milliseconds; zero or negative once expired */
    private long elapsed(){
        return expiredAt - System.currentTimeMillis();
    }
    
    public boolean isExpired(){
        return elapsed() < 1;
    }

    /**
     * @return the remaining lifetime converted to the given unit
     */
    public long getDelay(TimeUnit unit) {
        return unit.convert(elapsed(), TimeUnit.MILLISECONDS);
    }
    
    /**
     * Orders entries by expiry time; other Delayed implementations are
     * compared by their remaining delay instead of failing on a cast.
     */
    public int compareTo(Delayed o) {
        if(o == this){
            return 0;
        }
        final long x;
        final long y;
        if(o instanceof PeriodEntry){
            x = expiredAt;
            y = ((PeriodEntry) o).expiredAt;
        } else {
            x = getDelay(TimeUnit.MILLISECONDS);
            y = o.getDelay(TimeUnit.MILLISECONDS);
        }
        if(x < y){
            return -1;
        }
        if(x > y){
            return 1;
        }
        return 0;
    }

}

entryは基本的に相対的な時間を設定するようにしてる以外は、普通のでDelayedな実装で。

テスト

/**
 * Expiry behaviour tests for RetireCache.
 *
 * assertEquals is called as (expected, actual) so that a failure report
 * labels the two values correctly.
 */
public class RetireCacheTest {
    
    /** Sleeps for the given number of seconds, keeping the interrupt flag if interrupted. */
    private static void sleepSeconds(final long seconds){
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch(InterruptedException e){
            Thread.currentThread().interrupt();
        }
    }
    
    /** An entry stored with a 2 second TTL is no longer returned after 2 seconds. */
    @Test
    public void 指定時間で参照出来なくなる(){
        Cache cache = new RetireCache();
        cache.set("hoge", "hogeValue", 2);
        Assert.assertEquals("hogeValue", cache.get("hoge").getValue());
        sleepSeconds(2);
        Assert.assertNull("参照できなくなってる", cache.get("hoge"));
    }
    
    /** remove(key, 5) is called one second in; the entry is expected to be readable one second later. */
    @Test
    public void 指定時間後に消えてる(){
        Cache cache = new RetireCache();
        cache.set("hoge", "hogeValue", 2);
        sleepSeconds(1);
        Assert.assertEquals("hogeValue", cache.get("hoge").getValue());
        cache.remove("hoge", 5);
        sleepSeconds(1);
        Assert.assertEquals("これは消えてるのが正解?", "hogeValue", cache.get("hoge").getValue());
    }
    
    /** A TTL of 0 means the entry never expires, while a 1 second TTL does. */
    @Test
    public void ゼロを指定すると消えない(){
        Cache cache = new RetireCache();
        cache.set("hoge", "hogeValue", 0);
        cache.set("foo", "fooValue", 1);
        sleepSeconds(1);
        Assert.assertNull("これは1秒後消えてる", cache.get("foo"));
        Assert.assertEquals("これは消えない", "hogeValue", cache.get("hoge").getValue());
    }

    /** Looking up a key that was never stored returns null. */
    @Test
    public void キーがない(){
        Cache cache = new RetireCache();
        cache.set("hoge", "1", 0);
        Assert.assertNull(cache.get("foo"));
    }

    :
    : 省略
    :
}

memcacheのdeleteと一時実装が違うかな。本物は、delete key time でtimeにどんな時間を指定してもすぐに参照出来なくなる

時間を指定しない場合

set hoge 0 30 4
1234
STORED

delete hoge
DELETED

get hoge
END

消える時間を指定しても...

set hoge 0 30 4
1234
STORED

get hoge
VALUE hoge 0 4
1234
END

delete hoge 10000
DELETED

get hoge
END

なかなか難しい
次は LRU

2010/01/13

NIO にしたら爆速になった(was: なんちゃってmemcache互換サーバ)

ポスト @ 2:37:24 | , ,     

昨日の続き。
server の部分を thread pool から、nio な ノンブロッキング にしてみたら、思った以上に速かった

メイン

/**
 * Single-threaded, selector-driven server.
 *
 * One thread accepts connections and dispatches every ready key to the
 * {@link Action} attached to it.
 */
public class Server extends Thread {
    
    // Not referenced by the code in this class; kept for subclasses.
    protected final BlockingQueue<Socket> accept = new LinkedBlockingQueue<Socket>();
    
    // Not referenced by the code in this class; kept for subclasses.
    protected final ExecutorService acceptPool;
    
    protected final Cache cache;
    
    protected final int port;
    
    protected final int maxConnection;
    
    /** Creates a server with a 64 mega-unit LRU cache (see ByteSizeUnit). */
    public Server(final int port, final int maxConnection){
        this(port, maxConnection, ByteSizeUnit.Mega.toLong(64));
    }

    /**
     * @param port          TCP port to listen on
     * @param maxConnection size of the (currently unused) accept thread pool
     * @param maxMemory     capacity handed to the LRU cache
     */
    public Server(final int port, final int maxConnection, final long maxMemory){
        this.port = port;
        this.maxConnection = maxConnection;
        this.cache = new LRUCache(maxMemory);
        this.acceptPool = Executors.newFixedThreadPool(maxConnection);
    }
    
    public static void main(String...args){
        Server s = new Server(12221, 32);
        s.start();
        try {
            s.join();
        } catch(InterruptedException e){
            e.printStackTrace(System.err);
        }
    }

    /**
     * Event loop: binds the listening channel, then dispatches ready keys
     * until the thread is interrupted or an I/O error occurs.
     */
    public void run(){
        ServerSocketChannel channel = null;
        Selector selector = null;
        try {
            channel = ServerSocketChannel.open();
            channel.configureBlocking(false);
            
            final ServerSocket serverSocket = channel.socket();
            serverSocket.setReuseAddress(true);
            serverSocket.bind(new InetSocketAddress(port));
            
            selector = Selector.open();
            channel.register(selector, SelectionKey.OP_ACCEPT, new AcceptAction(cache));
            
            // select() may legitimately return 0 (wakeup, or every ready key
            // was cancelled), so a zero result must not end the server.
            // Interrupting the thread makes select() return and stops the loop.
            while(selector.isOpen() && !isInterrupted()){
                if(selector.select() < 1){
                    continue;
                }
                final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while(keys.hasNext()){
                    final SelectionKey key = keys.next();
                    keys.remove();
                    
                    if(!key.isValid()){
                        continue;
                    }
                    dispatch(key);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeSelector(selector);
            if(null != channel){
                try {
                    channel.close();
                } catch(IOException e){
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Runs the action attached to a ready key. An unexpected runtime failure
     * drops only the offending client connection instead of the whole server.
     */
    private void dispatch(final SelectionKey key){
        try {
            final Action action = (Action) key.attachment();
            action.execute(key);
        } catch(RuntimeException e){
            e.printStackTrace();
            if(key.channel() instanceof ServerSocketChannel){
                // keep the listening channel alive
                return;
            }
            key.cancel();
            try {
                key.channel().close();
            } catch(IOException closeError){
                closeError.printStackTrace();
            }
        }
    }

    /** Closes every channel still registered with the selector, then the selector itself. */
    private void closeSelector(final Selector selector){
        if(null == selector){
            return;
        }
        // Copy first: closing a channel cancels its key.
        for(final SelectionKey key: new java.util.ArrayList<SelectionKey>(selector.keys())){
            try {
                key.channel().close();
            } catch(IOException e){
                e.printStackTrace();
            }
        }
        try {
            selector.close();
        } catch(IOException e){
            e.printStackTrace();
        }
    }
}

新規接続を受け入れるAccept部分

/**
 * Action attached to the listening channel: accepts a pending connection
 * and registers it for reading.
 */
public class AcceptAction implements Action {
    
    private final Cache cache;
    
    public AcceptAction(final Cache cache){
        this.cache = cache;
    }

    /**
     * Accepts one connection from the server channel behind
     * {@code selectionKey} and registers it with the same selector for
     * OP_READ, with a fresh ReadAction attached.
     */
    public void execute(SelectionKey selectionKey) {
        SocketChannel channel = null;
        try {
            channel = ((ServerSocketChannel) selectionKey.channel()).accept();
            if(null == channel){
                // In non-blocking mode accept() returns null when no
                // connection is pending.
                return;
            }
            channel.configureBlocking(false);

            ReadAction action = new ReadAction(cache, new ByteBufferReader(channel));
            channel.register(selectionKey.selector(), SelectionKey.OP_READ, action);
        } catch (IOException e) {
            e.printStackTrace();
            // Do not leak a connection that was accepted but could not be registered.
            if(null != channel){
                try {
                    channel.close();
                } catch(IOException closeError){
                    closeError.printStackTrace();
                }
            }
        }
    }
}

んで、読み込みと書き込みは分けた

/**
 * Action attached to a client channel while it is registered for OP_READ.
 * Reads one command line, runs it through the nested Handler and hands the
 * result to a WriteAction.
 */
public class ReadAction implements Action {
    
    protected final Cache cache;
    
    protected final Reader reader;
    
    protected final Handler handler;
    
    public ReadAction(final Cache cache, final Reader reader){
        this.cache = cache;
        this.reader = reader;
        this.handler = new Handler(cache, reader);
    }

    /**
     * Does nothing unless the key is readable. Closes the channel and cancels
     * the key when the reader reports it is not readable or returns no line.
     * Otherwise executes the line and re-registers the channel for OP_WRITE
     * with a WriteAction that carries the result and this ReadAction.
     */
    public void execute(SelectionKey selectionKey) {
        if(!selectionKey.isReadable()){
            return;
        }
        
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        try {
            if(!reader.readable()){
                channel.close();
                selectionKey.cancel();
                return;
            }
            String line = reader.readLine();
            if(line == null){
                channel.close();
                selectionKey.cancel();
                return;
            }
            Return r = handler.execute(line);

            // Registering again on the same selector replaces the interest set
            // and attachment of the existing key.
            WriteAction action = new WriteAction(r, this);
            channel.register(selectionKey.selector(), SelectionKey.OP_WRITE, action);
        } catch(IOException e){
            // NOTE(review): the channel is left open and registered here.
            e.printStackTrace();
        }
    }

    /** Parses a command line and executes it against the cache (visitor over Command types). */
    private static class Handler implements CommandVisitor {
        private final Cache cache;
        private final Reader reader;
        private Handler(final Cache cache, final Reader reader){
            this.cache = cache;
            this.reader = reader;
        }
        /**
         * Parses {@code line} with MemcacheParser and dispatches the resulting
         * command to the matching visit method. A ParseException yields ERROR;
         * any other exception yields SERVER_ERROR with the exception message.
         */
        public Return execute(String line){
            try {
                final StringReader r = new StringReader(line);
                final MemcacheParser parser = new MemcacheParser(r);
                
                Command command = parser.Command();
                return command.accept(this, null);
            } catch(ParseException e){
                e.printStackTrace();
                return new Return(ResponseType.ERROR);
            } catch(Exception e){
                e.printStackTrace();
                return new Return(ResponseType.SERVER_ERROR, e.getMessage());
            }
        }
        :
        : // 省略
        :    
        /**
         * Looks up every requested key. A missing key contributes an END
         * result, a hit contributes a SEND_VALUE result; all results are
         * wrapped into one composite Return.
         * NOTE(review): this emits one END per missing key; confirm against
         * how Return renders SEND_VALUE and the composite.
         */
        public Return visit(RetrievalCommand command, Parameter parameter) {
            final List<String> keys = command.getKeys();
            final List<Return> returns = new ArrayList<Return>();
            for(final String key: keys){
                Entry entry = cache.get(key);
                if(null == entry){
                    returns.add(new Return(ResponseType.END));
                    continue;
                }
                returns.add(new Return(ResponseType.SEND_VALUE,
                    key, entry.getFlag(), entry.getLength(), entry.getValue()
                ));
            }
            return new Return(returns.toArray(new Return[returns.size()]));
        }
    
        /**
         * Reads the next line from the reader as the value and stores it.
         * Returns STORED or NOT_STORED depending on cache.set, or ERROR when
         * reading fails with an IOException.
         */
        public Return visit(SetCommand command, Parameter parameter) {
            try {
                final String nextLine = reader.readLine();
                if(cache.set(command.getKey(), nextLine, command.getFlags(), command.getExpTime())){
                    return new Return(ResponseType.STORED);
                }
                return new Return(ResponseType.NOT_STORED);
            } catch(IOException e){
                e.printStackTrace();
                return new Return(ResponseType.ERROR);
            }
        }
    
        /**
         * Not implemented: returns null.
         * NOTE(review): execute() above passes the result straight to
         * WriteAction, which calls renderMessage() on it; a null here looks
         * like it would cause a NullPointerException there.
         */
        public Return visit(VersionCommand command, Parameter parameter) {
            return null;
        }
    }

}
/**
 * Action attached to a client channel while it is registered for OP_WRITE.
 * Writes the rendered response and then hands the channel back to its
 * ReadAction.
 *
 * The encoded response is kept between invocations, so a response that does
 * not fit into the socket send buffer is continued from where it stopped
 * instead of being encoded and sent again from the beginning.
 */
public class WriteAction implements Action {
    
    private static final Charset ASCII = Charset.forName("US-ASCII"); 
    
    protected final Return ret;
    
    protected final ReadAction action;
    
    /** Encoded bytes still to be written; created on the first execute() call. */
    private ByteBuffer pending;
    
    public WriteAction(final Return ret, ReadAction action){
        this.ret = ret;
        this.action = action;
    }

    /**
     * Writes as much of the response as the channel accepts. When everything
     * has been written the channel is re-registered for OP_READ with the
     * original ReadAction; otherwise it stays registered for OP_WRITE and the
     * remainder is sent on the next invocation.
     */
    public void execute(SelectionKey selectionKey) {
        if(!selectionKey.isWritable()){
            return;
        }
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        
        if(null == pending){
            pending = ASCII.encode(CharBuffer.wrap(ret.renderMessage()));
        }
        try {
            // write() advances the buffer position itself, so hasRemaining()
            // is the only progress indicator needed.
            while(pending.hasRemaining()){
                if(channel.write(pending) == 0){
                    // Send buffer is full: wait for the next OP_WRITE event.
                    return;
                }
            }
            try {
                channel.register(selectionKey.selector(), SelectionKey.OP_READ, action);
            } catch(ClosedChannelException e){
                e.printStackTrace();
            }
        } catch(IOException e){
            e.printStackTrace();
            // The connection is unusable; drop it so the selector does not
            // keep reporting the broken channel as writable.
            selectionKey.cancel();
            try {
                channel.close();
            } catch(IOException closeError){
                closeError.printStackTrace();
            }
        }
    }
}

ということで、これをつかって簡単に比較すると

// Servers to benchmark, one after the other, with the same workload.
$servers = array(
    array('host' => 'localhost', 'port' => 11211),
    array('host' => 'localhost', 'port' => 12221)
);
foreach($servers as $server){
    $client = new Memcache;
    $client->connect($server['host'], $server['port']);

    $failures = 0;
    $startedAt = microtime(true);
    $round = 0;
    while($round < 1000){
        // Two plain writes, then a read-modify-write of the same key.
        if($client->set('hoge', '1234') === false){
            echo 'ERROR!!', PHP_EOL;
        }
        if($client->set('hoge', '123') === false){
            echo 'ERROR!!!', PHP_EOL;
        }
        $current = $client->get('hoge');
        if($client->set('hoge', $current + 1) === false){
            echo 'ERRROR!!!!', PHP_EOL;
        }
        // The value read back must be 123 + 1.
        $readBack = $client->get('hoge');
        if(!($readBack == 124)){
            $failures += 1;
        }
        ++$round;
    }
    echo 'target host => ', $server['host'], ' port =>', $server['port'], PHP_EOL;
    $took = microtime(true) - $startedAt;
    echo 'elapsed: ', $took, PHP_EOL;
    echo 'fail => ', $failures, PHP_EOL;
}
1)
target host => localhost port =>11211
elapsed: 0.66518497467
fail => 0
target host => localhost port =>12221
elapsed: 0.870754003525
fail => 0

2)
target host => localhost port =>11211
elapsed: 1.1857790947
fail => 0
target host => localhost port =>12221
elapsed: 0.868647098541
fail => 0

memcacheに匹敵してきた。

ってか、ByteBufferを読み書きするのは初めて書いたので、すごく手こずった。。
単純に \r\n までの一行を読みたいだけなのに。。
ということで、BufferedReaderのreadLine的なのを書いて色々とお茶を濁す。。

public class ByteBufferReader {
    
    protected final SocketChannel channel;
    
    protected final ByteBuffer buffer;
    
    public ByteBufferReader(final SocketChannel channel){
        this.channel = channel;
        this.buffer = ByteBuffer.allocateDirect(512);
    }
    
    public boolean readable() throws IOException {
        return buffer.hasRemaining();
    }
    
    public String readLine() throws IOException {
        final StringBuilder sb = new StringBuilder();
        readInto(sb);
        if(sb.length() < 1){
            return null;
        }
        return sb.toString();
    }
    
    private void readInto(final StringBuilder sb) throws IOException {
        while(true){
            int read = channel.read(buffer);
            if(read == 0){
                break;
            }
            if(read == -1){
                return;
            }
        }
        buffer.flip();
        while(more(sb));
        buffer.compact();
    }
    
    private boolean more(final StringBuilder sb) throws IOException {
        if(buffer.hasRemaining()){
            char ch = (char) buffer.get();
            switch(ch){
            case '\n':
                return false;
            case '\r':
                return true;
            default:
                sb.append(ch);
                return true;
            }
        }
        buffer.clear();
        return true;
    }

}

とりあえず、一段落
NIOは難しかった。

2010/01/11

JavaCC で memcache text protocol の BNF(と、なんちゃってmemcache互換サーバ)

ポスト @ 23:39:32 | , , ,     

ANTLRのやつがあったけど、JavaCCのがみつからなかったので、でっちあげた
via - http://harward.us/~nharward/antlr/memcached_protocol.g

できあがったのは、こんな感じ

// Lexical specification for the memcached text protocol.
// Whitespace and line terminators only separate tokens.
SKIP: {
    " " | "\t" | "\r" | "\n"
}
// Numeric tokens. FLAGS, TIME, LENGTH, CREMENT_VALUE and CAS_UNIQUE have the
// same pattern as NUMBER; the productions below only reference NUMBER.
TOKEN: {
    < NUMBER: ["1"-"9"] (["0"-"9"])* | "0" >
  | < FLAGS: < NUMBER > >
  | < TIME: < NUMBER >  >
  | < LENGTH: < NUMBER > >
  | < CREMENT_VALUE: < NUMBER > >
  | < CAS_UNIQUE: < NUMBER > >
}
// Storage commands: set / add / replace / append / prepend / cas.
TOKEN: {
  < SET_STATEMENT: "set" >
  | < ADD_STATEMENT: "add" >
  | < REPLACE_STATEMENT: "replace" >
  | < APPEND_STATEMENT: "append" >
  | < PREPEND_STATEMENT: "prepend" >
  | < CAS_STATEMENT: "cas" >
  | < STORAGE_STATEMENT:
        < SET_STATEMENT >
        | < ADD_STATEMENT >
        | < REPLACE_STATEMENT >
        | < APPEND_STATEMENT >
        | < PREPEND_STATEMENT >
    >
  | < STORAGE_COMMAND:
        (
          < STORAGE_STATEMENT > < KEY > < FLAGS > < TIME > < LENGTH >
          | < CAS_STATEMENT > < KEY > < FLAGS > < TIME > < LENGTH > < CAS_UNIQUE >
        )
        (< NOREPLY >)?
    >
}
// Retrieval commands: get / gets.
TOKEN: {
  < RETRIEVAL_STATEMENT: "get" | "gets" >
  | < RETRIEVAL_COMMAND:
        < RETRIEVAL_STATEMENT > < KEY >
    >
}
// delete <key> [<time>] [noreply]
TOKEN: {
  < DELETE_STATEMENT: "delete" >
  | < DELETE_COMMAND:
        < DELETE_STATEMENT > < KEY > (< TIME >)? (< NOREPLY >)?
    >
}
// incr <key> <value> [noreply]
TOKEN: {
  < INCREMENT_STATEMENT: "incr" >
  | < INCREMENT_COMMAND:
        < INCREMENT_STATEMENT > < KEY > < CREMENT_VALUE > (< NOREPLY >)?
    >
}
// decr <key> <value> [noreply]
TOKEN: {
  < DECREMENT_STATEMENT: "decr" >
  | < DECREMENT_COMMAND:
        < DECREMENT_STATEMENT > < KEY > < CREMENT_VALUE > (< NOREPLY >)?
    >
}
// Statistics. The client command is "stats"; "STAT" is the prefix of the
// server's reply lines, not something a client sends.
TOKEN: {
  < STATISTICS_STATEMENT: "stats" >
  | < STATISTICS_OPTION: "items" | "slabs" | "sizes" >
  | < STATISTICS_COMMAND:
        < STATISTICS_STATEMENT > (< STATISTICS_OPTION >)?
    >
}
// flush_all [<time>] [noreply]
TOKEN: {
  < FLUSH_STATEMENT: "flush_all" >
  | < FLUSH_COMMAND:
        < FLUSH_STATEMENT > (< TIME >)? (< NOREPLY >)?
    >
}
// version
TOKEN: {
  < VERSION_STATEMENT: "version" >
  | < VERSION_COMMAND:
        < VERSION_STATEMENT >
    >
}
TOKEN: {
    < NOREPLY: "noreply" >
}
// Must stay last: KEY matches any run of non-blank characters, so on equal
// match length the keyword and number tokens declared above take precedence.
TOKEN: {
    < KEY: (~[" ", "\r","\n"])+ >
}

少し、数値まわりのToken(TIME, LENGTHとか)が適当すぎるかな。

んで、これにテキトーなNodeをparseしてあげてみる

// Entry point: parses one command line.
// Only retrieval, storage, delete and version commands are accepted here.
Command Command():
{
  Command command;
}
{
  (
    command = RetrievalCommand()
  | command = StorageCommand()
  | command = DeleteCommand()
  | command = VersionCommand()
  )
  {
    return command;
  }
}
// <storage keyword> <key> <flags> <exptime> <length> [noreply]
// The concrete command object is chosen by createStorageCommand() and then
// filled with the parsed fields.
StorageCommand StorageCommand():
{
  StorageCommand command;
  String key;
  Long flags = 0L;
  Long time = 0L;
  Long length = 0L;
  Boolean noreply = Boolean.FALSE;
}
{
  command = createStorageCommand()
  key = Key()
  flags = Flags()
  time = Time()
  length = Length()
  noreply = Noreply()
  {
    command.setNode(jjtThis);
    command.setKey(key);
    command.setFlags(flags);
    command.setExpTime(time);
    command.setLength(length);
    command.setNoreply(noreply);
    return command;
  }
}
// Consumes the storage keyword and returns the matching empty command
// object. "cas" is not handled here.
StorageCommand createStorageCommand():
{}
{
  (
    < SET_STATEMENT >
    {
      return new SetCommand();
    }
    | < ADD_STATEMENT >
    {
      return new AddCommand();
    }
    | < REPLACE_STATEMENT >
    {
      return new ReplaceCommand();
    }
    | < APPEND_STATEMENT >
    {
      return new AppendCommand();
    }
    | < PREPEND_STATEMENT >
    {
      return new PrependCommand();
    }
  )
}

// get|gets <key> [<key> ...]
// Requires at least one key; every key is added to the command in order.
RetrievalCommand RetrievalCommand():
{
  RetrievalCommand command = new RetrievalCommand();
  String key;
}
{
  < RETRIEVAL_STATEMENT >
  (
    key = Key()
    {
      command.addKey(key);
    }
  )+
  {
    command.setNode(jjtThis);
    return command;
  }
}
// delete <key> [<time>] [noreply]
// The time argument is optional, so a plain "delete <key>" parses and the
// expiry time defaults to 0.
DeleteCommand DeleteCommand():
{
  DeleteCommand command = new DeleteCommand();
  String key;
  Long time = 0L;
  Boolean noreply = Boolean.FALSE;
}
{
  < DELETE_STATEMENT >
  key = Key()
  [ time = Time() ]
  noreply = Noreply()
  {
    command.setNode(jjtThis);
    command.setKey(key);
    command.setExpTime(time);
    command.setNoreply(noreply);
    return command;
  }
}
// version
VersionCommand VersionCommand():
{}
{
  < VERSION_STATEMENT >
  {
    VersionCommand command = new VersionCommand();
    command.setNode(jjtThis);
    return command;
  }
}

// A cache key. An all-digit key is tokenized as NUMBER (declared before KEY,
// same match length), so NUMBER is accepted here as well; otherwise a
// command such as "get 123" would be rejected.
String Key():
{ Token key; }
{
  ( key = < KEY > | key = < NUMBER > )
  {
    return key.image;
  }
}

// Client flags field: a NUMBER token converted to Long.
Long Flags():
{ Token flags; }
{
  flags = < NUMBER >
  {
    return Long.valueOf(flags.image);
  }
}

// Expiry time field: a NUMBER token converted to Long.
Long Time():
{ Token time; }
{
  time = < NUMBER >
  {
    return Long.valueOf(time.image);
  }
}

// Data length field: a NUMBER token converted to Long.
Long Length():
{ Token length; }
{
  length = < NUMBER >
  {
    return Long.valueOf(length.image);
  }
}

// Optional "noreply" marker: TRUE when present, FALSE otherwise.
Boolean Noreply():
{ Boolean noreply = Boolean.FALSE; }
{
  [< NOREPLY >{noreply = Boolean.TRUE;}]
  {
    return noreply;
  }
}

これに、適当なコードを投げてあげると

// Single-key retrieval.
{
    StringReader reader = new StringReader("get hoge\r\n");
    MemcacheParser parser = new MemcacheParser(reader);
    try {
        parser.Command();
    } catch (ParseException e) {
        e.printStackTrace();
    }
}
// Multi-key retrieval with "gets".
{
    StringReader reader = new StringReader("gets hoge foo\r\n");
    MemcacheParser parser = new MemcacheParser(reader);
    try {
        parser.Command();
    } catch (ParseException e) {
        e.printStackTrace();
    }
}
// Storage command header line: key, flags, exptime, length.
{
    StringReader reader = new StringReader("set xyzkey 0 0 6\r\n");
    MemcacheParser parser = new MemcacheParser(reader);
    try {
        parser.Command();
    } catch (ParseException e) {
        e.printStackTrace();
    }
}
Call:   Command
  Call:   RetrievalCommand
    Consumed token: <<RETRIEVAL_STATEMENT>: "get" at line 1 column 1>
    Call:   Key
      Consumed token: <<KEY>: "hoge" at line 1 column 5>
    Return: Key
  Return: RetrievalCommand
Return: Command
Call:   Command
  Call:   RetrievalCommand
    Consumed token: <<RETRIEVAL_STATEMENT>: "gets" at line 1 column 1>
    Call:   Key
      Consumed token: <<KEY>: "hoge" at line 1 column 6>
    Return: Key
    Call:   Key
      Consumed token: <<KEY>: "foo" at line 1 column 11>
    Return: Key
  Return: RetrievalCommand
Return: Command
Call:   Command
  Call:   StorageCommand
    Call:   createStorageCommand
      Consumed token: <"set" at line 1 column 1>
    Return: createStorageCommand
    Call:   Key
      Consumed token: <<KEY>: "xyzkey" at line 1 column 5>
    Return: Key
    Call:   Flags
      Consumed token: <<NUMBER>: "0" at line 1 column 12>
    Return: Flags
    Call:   Time
      Consumed token: <<NUMBER>: "0" at line 1 column 14>
    Return: Time
    Call:   Length
      Consumed token: <<NUMBER>: "6" at line 1 column 16>
    Return: Length
    Call:   Noreply
    Return: Noreply
  Return: StorageCommand
Return: Command

と、こんな感じになる。
<NUMBER>とか、ホント、マジメにtokenが書けてないですね。。

ここまでできたので、set と get しかない、memcached 互換ものをでっち上げてみた

/**
 * Blocking, thread-per-connection server that understands only the
 * {@code get} and {@code set} commands of the memcached text protocol.
 * Every other command is answered with ERROR.
 */
public class Server extends Thread {
    
    // Not referenced by the code in this class.
    protected final BlockingQueue<Socket> accept = new LinkedBlockingQueue<Socket>();
    
    protected final Cache<String, String> cache = new LRUCache<String, String>();

    /** Runs one AcceptHandler per client connection. */
    protected final ExecutorService acceptPool;
    
    protected final int port;
    
    /** Used both as the listen backlog and as the worker pool size. */
    protected final int maxConnection;
    
    public Server(final int port, final int maxConnection){
        this.port = port;
        this.maxConnection = maxConnection;
        this.acceptPool = Executors.newFixedThreadPool(maxConnection);
    }
    
    public static void main(String...args){
        Server s = new Server(12221, 32);
        s.start();
        
        // Block until the server thread ends instead of polling isAlive().
        try {
            s.join();
        } catch(InterruptedException e){
            Thread.currentThread().interrupt();
        }
    }

    /** Accept loop: hands every accepted socket to the worker pool. */
    public void run(){
        ServerSocket socket = null;
        try {
            // SO_REUSEADDR has to be set before the socket is bound, so the
            // socket is created unbound and bound explicitly afterwards.
            socket = ServerSocketFactory.getDefault().createServerSocket();
            socket.setReuseAddress(true);
            socket.bind(new java.net.InetSocketAddress(port), maxConnection);
            
            while(!socket.isClosed()){
                final Socket accept = socket.accept();
                acceptPool.execute(new AcceptHandler(accept));
            }
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Let running handlers finish, accept no new ones.
            acceptPool.shutdown();
            if(null != socket){
                try {
                    socket.close();
                } catch(IOException e){
                    // nop
                }
            }
        }
    }
    
    /** Serves one client connection until it is closed or a read fails. */
    protected class AcceptHandler implements Runnable {
        private final Socket socket;
        public AcceptHandler(final Socket socket){
            this.socket = socket;
        }
        public void run(){
            try {
                final InputStream in = socket.getInputStream();
                final OutputStream out = socket.getOutputStream();
                
                // ISO-8859-1 maps every byte to exactly one char, so
                // String.length() equals the byte count and writeBytes(),
                // which emits the low byte of each char, sends the data back
                // unchanged regardless of the platform default charset.
                final BufferedReader reader = new BufferedReader(new InputStreamReader(in, "ISO-8859-1"));
                final DataOutputStream writer = new DataOutputStream(out);
                final CommandWorker worker = new CommandWorker(reader);
                while(!socket.isClosed()){
                    if(!worker.prepare()){
                        break;
                    }
                    Return r = worker.call();
                    writer.writeBytes(r.renderMessage());
                }
            } catch(IOException e){
                e.printStackTrace();
            } finally {
                try {
                    socket.close();
                } catch(IOException e){
                    // nop
                }
            }
        }
    }

    /** Reads one command line at a time and executes it against the cache. */
    private class CommandWorker implements CommandVisitor {
        private final BufferedReader reader;
        private String currentLine;
        public CommandWorker(final BufferedReader reader) {
            this.reader = reader;
        }
        
        /**
         * Reads the next command line.
         *
         * @return {@code false} at end of stream or on a read error
         */
        public boolean prepare(){
            try {
                String line = reader.readLine();
                if(null == line){
                    return false;
                }
                currentLine = line;
                return true;
            } catch(IOException e){
                return false;
            }
        }
        
        /**
         * Parses and executes the line read by {@link #prepare()}.
         *
         * @return the response to send; never {@code null}
         */
        public Return call() {
            try {
                final StringReader r = new StringReader(currentLine);
                final MemcacheParser parser = new MemcacheParser(r);
                
                Command command = parser.Command();
                final Return result = command.accept(this, null);
                return (null != result) ? result : unsupported();
            } catch(ParseException e){
                return new Return(ResponseType.ERROR);
            }
        }

        /** Response for commands this server does not implement. */
        private Return unsupported(){
            return new Return(ResponseType.ERROR);
        }
        
        public Return visit(Command command, Parameter parameter) {
            return unsupported();
        }
    
        public Return visit(AddCommand command, Parameter parameter) {
            return unsupported();
        }
    
        public Return visit(AppendCommand command, Parameter parameter) {
            return unsupported();
        }
    
        public Return visit(CasCommand command, Parameter parameter) {
            return unsupported();
        }
    
        public Return visit(DeleteCommand command, Parameter parameter) {
            return unsupported();
        }
    
        public Return visit(PrependCommand command, Parameter parameter) {
            return unsupported();
        }
    
        public Return visit(ReplaceCommand command, Parameter parameter) {
            return unsupported();
        }
    
        /** Looks up the first requested key only; additional keys are ignored. */
        public Return visit(RetrievalCommand command, Parameter parameter) {
            final String key = command.getKeys().get(0);
            String value = cache.get(key);
            if(null == value){
                return new Return(ResponseType.END);
            }
            return new Return(ResponseType.SEND_VALUE,
                key, 0, value.length(),
                value
            );
        }
    
        /** Reads the value from the next line and stores it under the command's key. */
        public Return visit(SetCommand command, Parameter parameter) {
            try {
                final String nextLine = reader.readLine();
                if(null == nextLine){
                    // Client went away before sending the data block.
                    return new Return(ResponseType.ERROR);
                }
                cache.put(command.getKey(), nextLine, command.getExpTime().longValue());
                return new Return(ResponseType.STORED);
            } catch(IOException e){
                e.printStackTrace();
                return new Return(ResponseType.ERROR);
            }
        }
    
        public Return visit(VersionCommand command, Parameter parameter) {
            return unsupported();
        }
    }
}

見事に、setとgetしか実装してません。しかもハンドリングは少し適当。

ということで、これと(java-lang)、memcached(c-lang)で比較してみた。

// Servers to benchmark, one after the other, with the same workload.
$servers = array(
    array('host' => 'localhost', 'port' => 11211),
    array('host' => 'localhost', 'port' => 12221)
);
foreach($servers as $server){
    $client = new Memcache;
    $client->connect($server['host'], $server['port']);

    $startedAt = microtime(true);
    $round = 0;
    while($round < 1000){
        // Two writes and one read per round; results are not checked.
        $client->set('hoge', '123');
        $client->set('hoge', '124');
        $client->get('hoge');
        ++$round;
    }
    echo 'target host => ', $server['host'], ' port =>', $server['port'], PHP_EOL;
    $took = microtime(true) - $startedAt;
    echo 'elapsed: ', $took, PHP_EOL;
}
target host => localhost port =>11211
elapsed: 0.420136213303
target host => localhost port =>12221
elapsed: 1.34848499298

なんつーか、「もうちょっとがんばりま賞」って感じで残念感があります。(約3倍遅い)
とりあえず、動きそうなので、他の実装も頑張る。

2006/11/06

S2Container.PHP5-1.1.2リリース!

ポスト @ 1:39:48 |     

S2Container.PHP5-1.1.2をリリースしました
ref - http://s2container.php5.seasar.org/

詳細はChangelogへ。
大きな変更点としては、

  • パッケージをPEAR Package2へ移行
  • S2AOPにInterTypeの対応
  • YAML形式のダイコンファイルに対応しました。
  • キャッシュ機構の変更と、memcache関連の追加。
  • __autoload関係の追加
  • その他細かな機能修正

ということで、結構機能などが増えました。
バグトラッキングなどはJIRA、もしくはMLに。

InterType、Yaml、 キャッシュ関連のドキュメントは以下を参考にしていただくと良いと思います。

今回、kloveさんと最も議論を交わす事となったキャッシュ関連は結構改良されています。
多くはS2Containerのコンテナキャッシュ(serialize)としても使えますが、AOP時に発行することになるPHPコードもキャッシュ可能です。
これはCacheLiteを利用するキャッシュの他に、Memcacheを利用するキャッシュも用意しています。APC, eAccelerator, XCacheが何らかの理由で導入できない場合には有効でしょう。

もしS2Containerについて分からないことなどがあればML等にてサポートしていますので、ダウンロードして導入してみてください。

2006/02/13

S2ContainerMemcacheFactoryの速度測定

ポスト @ 2:00:50 , 修正 @ 2006/02/13 2:09:28 | ,     

前回の続き。
果たしてmemcacheはfilecacheより速いのか。

memcachedのインストール及び、libeventのインストールについては ./configure --prefix=/usr; make; make install と簡単なものです。

pecl::memcache
pecl install memcache でインストールしたので、導入は非常に簡単。(5.1.2からはext/memcacheとかあったっけかな。)

memcache関数については、 PHP マニュアルを。
ref - PHP: Memcache 関数 - Manual

また、memcachedのサーバは石器時代に作られたもの

nowel@linux:~> cat /proc/cpuinfo
processor       : 0
vendor_id       : GenuineTMx86
cpu family      : 6
model           : 4
model name      : Transmeta(tm) Crusoe(tm) Processor TMTM5600
stepping        : 3
cpu MHz         : 592.382
cache size      : 512 KB

しかも、メモリは256くらいしかないorz

memcachedサーバと実行するクライアントは分離し、kloveさん作成のcacheClientでFileCacheと比較。

まずはFileCacheから。

※ FileCacheは、cacheClient.php を実行しているクライアント機で実行してます。(memcachedサーバで実行ではない)

1回目

nowel@linux: examples/cache> php cacheClient.php
[INFO ] S2ContainerCachingFactory::create - cache directory not available.
time : 0.37139296531677
[INFO ] S2ContainerCachingFactory::create - cache directory not available.
time : 0.0055041313171387
[INFO ] S2ContainerCachingFactory::create - create container and cache it.
time : 0.047283172607422
A Object
(
)
[INFO ] S2ContainerCachingFactory::create - create container and cache it.
time : 0.0068569183349609
A Object
(
)

2回目

nowel@linux: examples/cache> php cacheClient.php
[INFO ] S2ContainerCachingFactory::create - cache directory not available.
time : 0.078088998794556
[INFO ] S2ContainerCachingFactory::create - cache directory not available.
time : 0.0063409805297852
[INFO ] S2ContainerCachingFactory::create - cached container available.
time : 0.055350065231323
A Object
(
)
[INFO ] S2ContainerCachingFactory::create - cached container available.
time : 0.0024549961090088
A Object
(
)

var/ の中には こんな感じでキャッシュされます。

nowel@linux: examples/cache> ls var/
c49172ffd634f5b1638945678d08e1a9  testC.dat

次に memcache で。

1回目

nowel@linux: examples/cache> php memcacheClient.php
[INFO ] S2ContainerMemcacheFactory::create - create container and cache it.
time : 0.21873593330383
[INFO ] S2ContainerMemcacheFactory::create - cached container available.
time : 0.0363450050354
[INFO ] S2ContainerMemcacheFactory::create - create container and cache it.
time : 0.061532020568848
A Object
(
)
[INFO ] S2ContainerMemcacheFactory::create - create container and cache it.
time : 0.07548189163208
A Object
(
)

2回目

nowel@linux: examples/cache> php memcacheClient.php
[INFO ] S2ContainerMemcacheFactory::create - cached container available.
time : 0.07556676864624
[INFO ] S2ContainerMemcacheFactory::create - cached container available.
time : 0.028607130050659
[INFO ] S2ContainerMemcacheFactory::create - cached container available.
time : 0.032361030578613
A Object
(
)
[INFO ] S2ContainerMemcacheFactory::create - cached container available.
time : 0.029422044754028
A Object
(
)

また、100回くらいやってみた後のログ

[INFO ] S2ContainerMemcacheFactory::create - cached container available.
time : 0.057204008102417
[INFO ] S2ContainerMemcacheFactory::create - cached container available.
time : 0.0099189281463623
[INFO ] S2ContainerMemcacheFactory::create - cached container available.
time : 0.0084459781646729
A Object
(
)
[INFO ] S2ContainerMemcacheFactory::create - cached container available.
time : 0.026761054992676
A Object
(
)

memcached -vv で起動しているので、ログの一部

nowel@linux:~> memcached -vv
<3 server listening
<7 new client connection
<7 get e06a6a6c91a02862ba2ba87827f427d10921436e
>7 END
<7 set e06a6a6c91a02862ba2ba87827f427d10921436e 0 0 7518
>7 STORED
<7 get e06a6a6c91a02862ba2ba87827f427d10921436e
>7 sending key e06a6a6c91a02862ba2ba87827f427d10921436e
>7 END
<7 get 18d960f7c265a58c76598f4d8b19568127fa2699
>7 END
<7 set 18d960f7c265a58c76598f4d8b19568127fa2699 0 0 7518
>7 STORED
<7 get testC.dat
>7 END
<7 set testC.dat 0 0 7518
>7 STORED
<7 connection closed.
<7 new client connection
<7 get e06a6a6c91a02862ba2ba87827f427d10921436e
>7 sending key e06a6a6c91a02862ba2ba87827f427d10921436e
>7 END
<7 get e06a6a6c91a02862ba2ba87827f427d10921436e
>7 sending key e06a6a6c91a02862ba2ba87827f427d10921436e
>7 END
<7 get 18d960f7c265a58c76598f4d8b19568127fa2699
>7 sending key 18d960f7c265a58c76598f4d8b19568127fa2699
>7 END
<7 get testC.dat
>7 sending key testC.dat
>7 END
<7 connection closed.

今回、memcachedはあんま速くなかったけど、負荷分散できるんで悪くないかも。...しれません。
FileCacheは速いけど、ディスクシークが嫌ーって人はmemcacheかな

# memcache は pconnectとかtimeout設定で良くなるかなー
## って、またplanet php japanのログ汚したな。俺(だったらやめろよ

2006/02/09

S2ContainerFactoryをmemcache

ポスト @ 13:56:32 , 修正 @ 2006/02/09 14:04:10 | ,     

kloveさんがS2ContainerCachingFactory作ってたので、memcache版を作ってみた。

memcached に container を突っ込むヤツ

/**
 * Static factory that builds S2Container instances from a dicon file and
 * caches the serialized container in memcached (via the pecl Memcache class).
 *
 * Typical use: call setMemcache() once with the connection settings, then
 * call create($diconPath) wherever a container is needed.
 */
final class S2ContainerMemcacheFactory {

    /** memcached host name. */
    private static $host = 'localhost';
    /** memcached TCP port. */
    private static $port = 11211;
    /** Connect timeout; null means "do not pass one, use the extension default". */
    private static $timeout;
    /** Lazily created, connected Memcache handle (null until first use). */
    private static $memcache = null;
    /** Truthy to store cache entries compressed. */
    private static $cache_compress = false;
    /** Expiration passed to Memcache::set()/replace(); 0 is passed through unchanged. */
    private static $cache_expire = 0;
    /** When true, create() calls $container->init() before the container is cached. */
    public static $INITIALIZE_BEFORE_CACHE = false;

    /**
     * Overrides the connection / cache settings.
     *
     * Recognised keys: host, port, timeout, cache_compress, cache_expire.
     * Keys that are absent (or null) leave the current setting untouched.
     *
     * @param array $prop settings keyed by name
     */
    public static function setMemcache(array $prop){
        // Read the keys explicitly instead of extract(): a missing 'host' must
        // not raise an undefined-variable warning and wipe the default host.
        if(isset($prop['host'])){
            self::$host = $prop['host'];
        }
        if(isset($prop['port'])){
            self::$port = $prop['port'];
        }
        if(isset($prop['timeout'])){
            self::$timeout = $prop['timeout'];
        }
        if(isset($prop['cache_compress'])){
            self::$cache_compress = $prop['cache_compress'];
        }
        if(isset($prop['cache_expire'])){
            self::$cache_expire = $prop['cache_expire'];
        }
    }

    /**
     * Returns the shared Memcache handle, connecting on first use.
     *
     * @return Memcache connected handle
     * @throws Exception when the connection cannot be established
     */
    public static function getInstance(){
        if(null === self::$memcache){
            $memcache = new Memcache();
            // Only pass a timeout when one was configured, so that a null
            // value is never handed to connect().
            if(null === self::$timeout){
                $connected = $memcache->connect(self::$host, self::$port);
            }else{
                $connected = $memcache->connect(self::$host, self::$port, self::$timeout);
            }
            if(!$connected){
                throw new Exception("connection failure");
            }
            // Publish the handle only after a successful connect; otherwise a
            // failed attempt would leave a dead handle cached for later calls.
            self::$memcache = $memcache;
        }
        return self::$memcache;
    }

    /**
     * Closes the shared connection, if any, when an instance is destroyed.
     */
    public function __destruct(){
        if(null !== self::$memcache){
            self::$memcache->close();
            // Static properties cannot be unset(); reset to null instead so
            // getInstance() reconnects on the next use.
            self::$memcache = null;
        }
    }

    /**
     * Returns the container for $diconPath, from memcached when cached,
     * otherwise freshly built by S2ContainerFactory and then cached.
     *
     * @param string      $diconPath path handed to S2ContainerFactory::create()
     * @param string|null $cacheName cache key; derived from $diconPath when null or ''
     * @return S2Container
     * @throws Exception on connection failure or when the cached value is not an S2Container
     */
    public static function create($diconPath, $cacheName = null){
        if(null === $cacheName || '' === $cacheName){
            $cacheName = self::getCacheKeyName($diconPath);
        }

        $cache = self::get($cacheName);
        if($cache !== false){
            // NOTE(review): unserialize() trusts whatever is stored under the
            // key; the memcached instance must not be writable by untrusted
            // parties.
            $container = unserialize($cache);
            if(!($container instanceof S2Container)){
                throw new Exception("invalid cache found.");
            }
            $container->reconstruct(S2Container_ComponentDef::RECONSTRUCT_FORCE);
            return $container;
        }

        $container = S2ContainerFactory::create($diconPath);

        if(self::$INITIALIZE_BEFORE_CACHE){
            $container->init();
        }

        self::set($cacheName, serialize($container));

        return $container;
    }

    /**
     * Stores $item under $key using the configured compression and expiry.
     */
    protected static function set($key, $item){
        return self::getInstance()->set($key, $item, self::getCompressFlag(), self::$cache_expire);
    }

    /**
     * Fetches the value stored under $key (false on a miss, see create()).
     */
    protected static function get($key){
        return self::getInstance()->get($key);
    }

    /**
     * Deletes the value stored under $key.
     */
    protected static function remove($key){
        return self::getInstance()->delete($key);
    }

    /**
     * Replaces the value stored under $key using the configured compression and expiry.
     */
    protected static function replace($key, $item){
        return self::getInstance()->replace($key, $item, self::getCompressFlag(), self::$cache_expire);
    }

    /**
     * Derives the cache key (SHA-1 hex digest) from a dicon path or any
     * string-convertible value.
     */
    protected static function getCacheKeyName($item){
        return sha1((string)$item);
    }

    /**
     * Translates the cache_compress setting into the flag expected by
     * Memcache::set()/replace(). Passing boolean true directly would be
     * coerced to 1, which is not MEMCACHE_COMPRESSED.
     */
    private static function getCompressFlag(){
        return self::$cache_compress ? MEMCACHE_COMPRESSED : 0;
    }

}

setMemcache()でmemcachedなどを設定(てか、メソッドのネーミングがorz)

// Point the factory at the memcached server, but only when the class is loaded.
if (class_exists('S2ContainerMemcacheFactory')) {
    $memcache = array(
        'host' => 'localhost',
        'port' => 11211,
    );
    S2ContainerMemcacheFactory::setMemcache($memcache);
}

後は殆んど一緒

// Obtain the container for the dicon file at $PATH via the memcache-backed factory.
$container = S2ContainerMemcacheFactory::create($PATH);

// Look up component "a" and dump it for inspection.
$a = $container->getComponent('a');
var_dump($a);

速度的には...ビミョー(マシンが旧石器時代ってのもあるんですが)

ref - PHP: Memcache 関数 - Manual
ref - Java Memcached Server

p.s.
このブログのパスワードは後から変更できるようになってます。ユーザ認証とかカッコいいのはないっす>kloveさん
というかsmtp死んでてメール送れないっぽ...orz