2010/03/08

JNA で mecab

ポスト @ 1:21:07 | , ,     

JNIなmecabを使っていたけど、libmecab(JNI) の *.soとか、*.dylib のコンパイルを環境(LinuxとかMacとか)毎に用意するのが面倒になってきたので、JNAでアクセスできないかと挑戦中...。

mecab.h をよみつつ、こんなインタフェースを用意

package org.chasen.mecab;

import com.sun.jna.Library;
import com.sun.jna.PointerType;
import com.sun.jna.Structure;

public interface LibMecab extends Library {
    
    final int MECAB_NOR_NODE = 0;
    final int MECAB_UNK_NODE = 1;
    final int MECAB_BOS_NODE = 2;
    final int MECAB_EOS_NODE = 3;
    final int MECAB_EON_NODE = 4;
    
    final int MECAB_USR_DIC = 1;
    final int MECAB_SYS_DIC = 0;
    final int MECAB_UNK_DIC = 2;

    int mecab_do(int argc, String...argv);

    mecab_t mecab_new(int argc, String...argv);
    
    mecab_t mecab_new2(final String arg);
    String mecab_version();
    
    String mecab_strerror(mecab_t mecab);
    void mecab_destroy(mecab_t mecab);

    int mecab_get_partial(mecab_t mecab);
    void mecab_set_partial(mecab_t mecab, int partial);
    float mecab_get_theta(mecab_t mecab);
    void mecab_set_theta(mecab_t mecab, float theta);
    int mecab_get_lattice_level(mecab_t mecab);
    void mecab_set_lattice_level(mecab_t mecab, int level);
    int mecab_get_all_morphs(mecab_t mecab);
    void mecab_set_all_morphs(mecab_t mecab, int all_morphs);

    String mecab_sparse_tostr(mecab_t mecab, final String str);
    String mecab_sparse_tostr2(mecab_t mecab, final String str, int len);
    String mecab_sparse_tostr3(mecab_t mecab, final String str, int len, String ostr, int olen);
    mecab_node_t mecab_sparse_tonode(mecab_t mecab, final String chr);
    mecab_node_t mecab_sparse_tonode2(mecab_t mecab, final String chr, int size);
    String mecab_nbest_sparse_tostr(mecab_t mecab, int N, final String str);
    String mecab_nbest_sparse_tostr2(mecab_t mecab, int N, final String str, int len);
    String mecab_nbest_sparse_tostr3(mecab_t mecab, int N, final String str, int len, String ostr, int olen);
    int mecab_nbest_init(mecab_t mecab, final String str);
    int mecab_nbest_init2(mecab_t mecab, final String str, int len);
    String mecab_nbest_next_tostr(mecab_t mecab);
    String mecab_nbest_next_tostr2(mecab_t mecab, String ostr, int olen);
    mecab_node_t mecab_nbest_next_tonode(mecab_t mecab);
    String mecab_format_node(mecab_t mecab, final mecab_node_t node);
    
    mecab_dictionary_info_t mecab_dictionary_info(mecab_t mecab);
    int mecab_dict_index(int argc, String...argv);
    int mecab_dict_gen(int argc, String...argv);
    int mecab_cost_train(int argc, String...argv);
    int mecab_system_eval(int argc, String...argv);
    int mecab_test_gen(int argc, String...argv);
    
    class mecab_t extends PointerType {
        public static class Type extends Structure {
            public int allocated;
        }
        public Type get(){
            Type type = (Type) Structure.newInstance(Type.class);
            type.writeField("allocated", getPointer().getInt(0));
            return type;
        }
    }
    class mecab_node_t extends PointerType {
    }
    class mecab_dictionary_info_t extends PointerType {
    }
    
    abstract class Tagger extends PointerType {
        abstract String parse(final String str);
        abstract Node parseToNode(final String str);
        abstract String parseNBest(int N, final String str);
        abstract boolean parseNBestInit(final String str);
        abstract Node nextNode();
        abstract String next();
        abstract String formatNode(final Node node);

        // configuration
        abstract boolean partial();
        abstract void set_partial(boolean partial);
        abstract float theta();
        abstract void set_theta(float theta);
        abstract int lattice_level();
        abstract void set_lattice_level(int level);
        abstract boolean all_morphs();
        abstract void set_all_morphs(boolean all_morphs);
        
        abstract DictionaryInfo dictionary_info();
        abstract String what();

        static Tagger create(int argc, String...argv){
            return null;
        }
        static Tagger create(String...argv){
            return null;
        }
    }
    
    class Node extends PointerType {
        
    }
    
    class DictionaryInfo extends PointerType {
        
    }
}

で、いざ実行...。

package org.chasen.mecab;

import org.chasen.mecab.LibMecab.mecab_t;

import com.sun.jna.Native;

public class Main {
    public static void main(String...args){
        LibMecab mecab = (LibMecab) Native.loadLibrary("mecab", LibMecab.class);
        System.out.println(mecab.mecab_version());
        
        mecab_t _ = mecab.mecab_new2("");
        System.out.println(mecab.mecab_strerror(_));
        
        // 文字化けだし、動かない...
        String result = mecab.mecab_sparse_tostr(_, "こんにちは!");
        System.out.println(result);
        mecab.mecab_destroy(_);
    }
}

こんな結果は出た。バージョンはちゃんと取れてた。けど、ちゃんと動かない...。

0.97

?	M
??	t
???	d
?ち	t
?	M
??	t
??	d
EOS

うーん。もう少し。。これができれば結構楽になるハズ...。
継続していきます
ref - http://github.com/nowelium/JNA-Mecab

2010/03/06

PHP でも Protocol Buffers しよう!

ポスト @ 21:22:40 , 修正 @ 2010/03/06 22:32:22 | , , , ,     

どうも巷では、Google の protocol buffers のPHP版の実装で pb4php が使われてるみたいですが、実装が(個人的に)アレすぎるので、phpbuf を使いましょうという話

これの使い方は README に書いてあるので省略しますが、最近こいつを fork して RPC (protobuf-socket-rpc) の実装を追加したので、ここに書いておくよ。
ref - http://github.com/nowelium/phpbuf

ということで、実装方法を紹介

実装方法(サンプル)

こんな proto を用意します。

package com.github.nowelium.phpbuf;

option optimize_for = SPEED;

message Web {
    enum SiteType {
        BLOG = 0;
        NEWS = 1;
        VIDEO = 2;
        UNKNOWN = 3;
    }
    message Site {
        required string url = 1;
        required string title = 2;
        required SiteType type = 3;
        optional string summary = 4;
    }
}
message Request {
    message SearchRequest {
        required string query = 1;
    }
}
message Response {
    message SearchResult {
        repeated Web.Site sites = 1;
    }
}

service SearchService {
    rpc search(Request.SearchRequest) returns(Response.SearchResult);
}

サーバサイド(java)

ちょー簡単です。
SocketRpcServer の実装は用意されているので、 Service の実装を register するだけ

package com.github.nowelium.phpbuf;

import java.util.concurrent.Executors;

import com.github.nowelium.phpbuf.ExampleSearch.SearchService;
import com.googlecode.protobuf.socketrpc.SocketRpcServer;

public class Server {
    
    public static void main(String...args){
        SocketRpcServer server = new SocketRpcServer(12345, Executors.newCachedThreadPool());
        server.registerBlockingService(SearchService.newReflectiveBlockingService(new SearchServiceImpl()));
        try {
            server.startServer();
            
            synchronized(server){
                server.wait();
            }
        } catch(InterruptedException e){
            // nop
        } finally {
            server.shutDown();
        }
    }

}

Service側の実装。これも簡単で、proto で定義したインタフェースどおりに実装するだけ

package com.github.nowelium.phpbuf;

import java.util.logging.Logger;

import com.github.nowelium.phpbuf.ExampleSearch.Web;
import com.github.nowelium.phpbuf.ExampleSearch.Request.SearchRequest;
import com.github.nowelium.phpbuf.ExampleSearch.Response.SearchResult;
import com.github.nowelium.phpbuf.ExampleSearch.Web.SiteType;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

public class SearchServiceImpl implements ExampleSearch.SearchService.BlockingInterface {

    private final Logger logger = Logger.getLogger(SearchServiceImpl.class.getName());
    
    public SearchResult search(RpcController controller, SearchRequest request) throws ServiceException {
        String query = request.getQuery();
        logger.info("input query => " + query);
        return search(query);
    }
    
    protected SearchResult search(String query){
        final SearchResult.Builder result = SearchResult.newBuilder();
        {
            Web.Site.Builder builder = Web.Site.newBuilder();
            builder.setTitle("example 1 - " + query);
            builder.setUrl("http://www.example.com/");
            builder.setType(SiteType.NEWS);
            result.addSites(builder.build());
        }
        {
            Web.Site.Builder builder = Web.Site.newBuilder();
            builder.setTitle("example 2 - " + query);
            builder.setUrl("http://www.example.net/");
            builder.setType(SiteType.UNKNOWN);
            result.addSites(builder.build());
        }
        return result.build();
    }

}

クライアントサイド(php)

まずは、proto の message 定義を行います。

class Message_Web_Site extends PhpBuf_Message_Abstract {
    public function __construct(){
        $this->setField('url', PhpBuf_Type::STRING, PhpBuf_Rule::REQUIRED, 1);
        $this->setField('title', PhpBuf_Type::STRING, PhpBuf_Rule::REQUIRED, 2);
        $this->setField('type', PhpBuf_Type::ENUM, PhpBuf_Rule::REQUIRED, 3, Message_SiteType::values());
        $this->setField('summary', PhpBuf_Type::STRING, PhpBuf_Rule::OPTIONAL, 4);
    }
    public static function name(){
        return __CLASS__;
    }
}

class Message_SiteType extends Message_Enum_Abstract {
    const BLOG = 0;
    const NEWS = 1;
    const VIDEO = 2;
    const UNKNOWN = 3;
    public static function values(){
        return array(
            self::BLOG,
            self::NEWS,
            self::VIDEO,
            self::UNKNOWN
        );
    }
}

class Message_Request_SearchRequest extends PhpBuf_Message_Abstract {
    public function __construct(){
        $this->setField('query', PhpBuf_Type::STRING, PhpBuf_Rule::REQUIRED, 1);
    }
    public static function name(){
        return __CLASS__;
    }
}

class Message_Response_SearchResult extends PhpBuf_Message_Abstract {
    public function __construct(){
        $this->setField('sites', PhpBuf_Type::MESSAGE, PhpBuf_Rule::REPEATED, 1, Message_Web_Site::name());
    }
    public static function name(){
        return __CLASS__;
    }
}

次に、Service 部分の実装。これも proto の定義で

class Service_SearchService extends PhpBuf_RPC_Socket_Service_Client {
    public function __construct($host, $port){
        parent::__construct($host, $port);
        $this->setServiceFullQualifiedName('com.github.nowelium.phpbuf.SearchService');
        $this->registerMethodResponderClass('search', Message_Response_SearchResult::name());
    }
}

main処理

$request = new Message_Request_SearchRequest;
$request->query = 'hello world';

$service = new Service_SearchService('localhost', 12345);
$result = $service->search($request);

foreach($result->sites as $site){
    echo 'title => ', $site->title, PHP_EOL;
    echo 'url => ', $site->url, PHP_EOL;
    echo 'type => ', $site->type, PHP_EOL;
    echo PHP_EOL;
}

実装がすごく簡単ですね

実際に起動してみる

先ほどの java を起動しておきます。

shell > java -cp bin:protobuf-java-2.2.0.jar:protobuf-socket-rpc.jar com.github.nowelium.phpbuf.Server
Picked up _JAVA_OPTIONS: -Dfile.encoding=UTF-8
2010/03/06 21:01:47 com.googlecode.protobuf.socketrpc.SocketRpcServer$ServerThread runServer
情報: Listening for requests on port: 12345

次に php で通信してみます

shell > php search_client.php 
title => example 1 - hello world
url => http://www.example.com/
type => 1

title => example 2 - hello world
url => http://www.example.net/
type => 3

通信できてるのがわかります。

おわり

ということで、PHPでも protobuf で RPC できるようになったよ!やったー!\(^o^)/

ここまでのサンプルは phpbuf-rpc-example 置いてます。省略した部分もあるので、詳細はソースでどうぞ

ちなみに、php-protobuf の元作者のページは ここ なんですが、ロシア語(?)で何を書いてるのかわからないです。
いつか、日本語にしてみたいですね。

# 後は、php code を gen できればいいかな。。(これもいつか..。)

2010/02/07

HiveJDBC を S2JDBC 経由で使えるようにする

ポスト @ 4:36:35 , 修正 @ 2010/02/07 4:51:57 | , , , ,     

hadoop の話題。その3

HiveのJDBCを使えばリモート上で動いているHiveに対して、JDBC(over thrift)経由でHive QLを実行出来るのですごく便利です。
ref - Hive/HiveClient - Hadoop Wiki

HiveJDBCはフツーのJDBCっぽく使えるので、こんな感じで普通のDBサーバにSQLを投げる感覚で使える

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class HiveConnection {
    public static void main(String...args) throws SQLException {
        try {
            Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            System.exit(1);
        }
        Connection conn = DriverManager.getConnection("jdbc:hive://master1:10000/", "", "");
        Statement stmt = conn.createStatement();
        stmt.execute("SELECT distinct(id) FROM hoge");
        
        ResultSet rs = stmt.getResultSet();
        while(rs.next()){
            System.out.println(rs.getString(1));
        }
    }
}

なので、便利なものは便利なものと合体させてしまえ。ということで、みんな大好き S2JDBCで使えるようにしてみた

HiveServerを起動/停止

とりあえず、HiveJDBCを利用するには、thrift経由でHiveServerを叩ける必要がある。
以下のようなスクリプトを用意

start.sh

#!/usr/bin/env bash

pidfile=$HIVE_PID_DIR/hiveserver.pid
logfile=$HIVE_LOG_DIR/hiveserver.log

if [ -f $pidfile ]; then
    echo running as process `cat $pidfile`. stop it first
    exit 1
fi

nohup $HIVE_HOME/bin/hive --service hiveserver > $logfile 2>&1 < /dev/null &
echo $! > $pidfile

stop.sh

#!/usr/bin/env bash

pidfile=$HIVE_PID_DIR/hiveserver.pid
logfile=$HIVE_LOG_DIR/hiveserver.log

if [ -f $pidfile ]; then
    kill `cat $pidfile`
    rm $pidfile
else
    echo no pidfile $pidfile
fi

これで、実行したサーバでデフォルトのポート10000で起動する。

ちなみに、HADOOP_CONF_DIRとかHIVE_CONF_DIRは別途ちゃんと設定すること

HiveDialectを用意

先に書いておくと、SQLのorder by ...に相当するものは、HiveQLだと sort by ...なんだけど、今はまだ用意してない。今度書く

package org.seasar.extension.jdbc.dialect;

import org.seasar.extension.jdbc.SelectForUpdateType;

public class HiveDialect extends StandardDialect {
    @Override
    public String getName() {
        return "hive";
    }

    @Override
    public boolean supportsLimit() {
        return true;
    }
    
    @Override
    public String convertLimitSql(String sql, int offset, int limit) {
        StringBuilder buf = new StringBuilder(sql.length() + 20);
        buf.append(sql);
        buf.append(" limit ");
        buf.append(limit);
        return buf.toString();
    }

    @Override
    public boolean supportsBatchUpdateResults() {
        return false;
    }

    @Override
    public boolean supportsCursor() {
        return false;
    }

    @Override
    public boolean supportsForUpdate(SelectForUpdateType type, boolean withTarget) {
        return false;
    }

    @Override
    public boolean supportsGetGeneratedKeys() {
        return false;
    }

    @Override
    public boolean supportsIdentity() {
        return false;
    }

    @Override
    public boolean supportsInnerJoinForUpdate() {
        return false;
    }

    @Override
    public boolean supportsLockHint() {
        return false;
    }

    @Override
    public boolean supportsOffset() {
        return false;
    }

    @Override
    public boolean supportsOffsetWithoutLimit() {
        return false;
    }

    @Override
    public boolean supportsOuterJoinForUpdate() {
        return false;
    }

    @Override
    public boolean supportsSequence() {
        return false;
    }
    
}

HiveConnectionPoolを用意

これは、HiveがXA transactionとかconnection#closeなんかを実行することができない(未実装のコードなので...svn:..jdbc/HiveConnection)ので ConnectionPoolImpl を継承して ConnectionWrapper を取る部る部分を override

package org.seasar.extension.dbcp.impl;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.sql.XAConnection;
import javax.transaction.Transaction;

import org.seasar.extension.dbcp.ConnectionWrapper;
import org.seasar.framework.util.TransactionManagerUtil;

public class HiveConnectionPool extends ConnectionPoolImpl {
    
    private Map<Transaction, ConnectionWrapper> txActivePool = new ConcurrentHashMap<Transaction, ConnectionWrapper>();

    @Override
    public synchronized ConnectionWrapper checkOut() throws SQLException {
        Transaction tx = getTransaction();
        ConnectionWrapper conn = txActivePool.get(tx);
        if(null == conn){
            conn = createConnection(tx);
        }
        if(null != tx){
            txActivePool.put(tx, conn);
        }
        return conn;
    }
    
    protected ConnectionWrapper createConnection(Transaction transaction) throws SQLException {
        XAConnection xaConnection = getXADataSource().getXAConnection();
        Connection connection = xaConnection.getConnection();
        return new HiveConnectionWrapper(xaConnection, connection, this, transaction);
    }
    
    protected Transaction getTransaction(){
        return TransactionManagerUtil.getTransaction(getTransactionManager());
    }
    
}

HiveConnectionWrapper を用意する

これは後述するPreparedStatement対策として用意。PreparedStatementWrapper さえも wrapper を用意する必要があるので、ConnectionWrapperImpl を override

package org.seasar.extension.dbcp.impl;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import javax.sql.XAConnection;
import javax.transaction.Transaction;

import org.seasar.extension.dbcp.ConnectionPool;
import org.seasar.framework.exception.SSQLException;

public class HiveConnectionWrapper extends ConnectionWrapperImpl {

    public HiveConnectionWrapper(XAConnection xaConnection,
            Connection physicalConnection, ConnectionPool connectionPool,
            Transaction tx) throws SQLException {
        super(xaConnection, physicalConnection, connectionPool, tx);
    }
    
    protected SQLException wrapException(final SQLException e, final String sql) {
        return new SSQLException("ESSR0072",
                new Object[] { sql, e.getMessage(),
                        new Integer(e.getErrorCode()), e.getSQLState() }, e
                        .getSQLState(), e.getErrorCode(), e, sql);
    }
    
    protected void assertOpened() throws SQLException {
        if (isClosed()) {
            throw new SSQLException("ESSR0062", null);
        }
    }
    
    @Override
    public PreparedStatement prepareStatement(final String sql) throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }
    
    @Override
    public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, resultSetType, resultSetConcurrency), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }
    
    @Override
    public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, autoGeneratedKeys), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, columnIndexes), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, columnNames), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }
}

HivePreparedStatementWrapperを用意する

これもConnectionと同じように、PreparedStatementの大半のコードが未実装(svn:jdbc/HivePreparedStatement)なので、PreparedStatementWrapper を継承して override

package org.seasar.extension.dbcp.impl;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.seasar.extension.jdbc.impl.PreparedStatementWrapper;
import org.seasar.framework.exception.SSQLException;

public class HivePreparedStatementWrapper extends PreparedStatementWrapper {

    protected final PreparedStatement original;
    
    protected final String sql;
    
    public HivePreparedStatementWrapper(PreparedStatement original, String sql) {
        super(original, sql);
        this.original = original;
        this.sql = sql;
    }
    
    protected SQLException wrapException(final SQLException e) {
        return wrapException(e, sql);
    }
    
    protected SQLException wrapException(final SQLException e, final String sql) {
        if (sql != null) {
            return new SSQLException("ESSR0072", new Object[] { sql,
                    String.valueOf(e.getErrorCode()), e.getSQLState() }, e
                    .getSQLState(), e.getErrorCode(), e, sql);
        }
        return e;
    }
    
    @Override
    public void close() {
        // HivePreparedStatement was not supported in #close
    }
    
    @Override
    public ResultSet executeQuery() throws SQLException {
        try {
            return new HiveResultSetWrapper(original.executeQuery());
        } catch (final SQLException e) {
            throw wrapException(e);
        }
    }
    
    @Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        try {
            return new HiveResultSetWrapper(original.executeQuery(sql));
        } catch (final SQLException e) {
            throw wrapException(e, sql);
        }
    }
    
    @Override
    public ResultSet getResultSet() throws SQLException {
        try {
            return new HiveResultSetWrapper(original.getResultSet());
        } catch (final SQLException e) {
            throw wrapException(e);
        }
    }
    
    @Override
    public ResultSet getGeneratedKeys() throws SQLException {
        try {
            return new HiveResultSetWrapper(original.getGeneratedKeys());
        } catch (final SQLException e) {
            throw wrapException(e);
        }
    }
}

HiveResultSetWrapper を用意

まだまだ続きます...HiveResultSet にも未実装が(svn:jdbc/HiveResultSet)なので、ResultSetWrapper を override

package org.seasar.extension.dbcp.impl;

import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.seasar.extension.jdbc.impl.ResultSetWrapper;

public class HiveResultSetWrapper extends ResultSetWrapper {
    
    protected static final String DOUBLE_QUOTE = "\"";
    
    protected final ResultSet rs;
    
    public HiveResultSetWrapper(ResultSet rs){
        super(rs);
        this.rs = rs;
    }

    @Override
    public void close() throws SQLException {
        // HiveResultSet was not supported in #close
    }
    
    protected Date parseDate(final String dt) throws Exception {
        String tmp = dt;
        if(dt.startsWith(DOUBLE_QUOTE) && dt.endsWith(DOUBLE_QUOTE)){
            int length = dt.length();
            tmp = dt.substring(1, length - 1);
        }
        return Date.valueOf(tmp);
    }
    
    @Override
    public Date getDate(int columnIndex) throws SQLException {
        Object obj = getObject(columnIndex);
        if (obj == null) {
            return null;
        }
        if(obj instanceof String){
            try {
                return parseDate((String) obj);
            } catch(Exception e){
                throw new SQLException("Cannot convert column " + columnIndex + " to date: " + e.toString());
            }
        }
        return super.getDate(columnIndex);
    }
    
}

jdbc.diconを修正

やっと、s2jdbcに近づいてきました...

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE components PUBLIC "-//SEASAR2.1//DTD S2Container//EN" "http://www.seasar.org/dtd/components21.dtd">
<components namespace="jdbc_hive">
    <!-- Hive does not support in jta -->
    <include path="jta.dicon"/>
    
    <component name="hiveDataSource" class="org.seasar.extension.dbcp.impl.XADataSourceImpl">
        <property name="driverClassName">"org.apache.hadoop.hive.jdbc.HiveDriver"</property>
        <property name="URL">"jdbc:hive://master1:10000/"</property>
        <property name="user">""</property>
        <property name="password">""</property>
    </component>

    <component name="hive" class="org.seasar.extension.dbcp.impl.DataSourceImpl" autoBinding="none">
        <arg>
            <component name="hivePool" class="org.seasar.extension.dbcp.impl.HiveConnectionPool" autoBinding="none">
                <property name="xaDataSource">hiveDataSource</property>
                <property name="timeout">600</property>
                <property name="maxPoolSize">10</property>
                <property name="allowLocalTx">true</property>
                <property name="readOnly">true</property>
                <property name="transactionManager">TransactionManager</property>
                <destroyMethod name="close"/>
            </component>
        </arg>
    </component>
</components>

s2jdbc.diconを用意

dialectをs2jdbcに読ませる

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE components PUBLIC "-//SEASAR//DTD S2Container 2.4//EN" "http://www.seasar.org/dtd/components24.dtd">
<components>
    <include path="jta.dicon"/>
    <include path="tx.dicon"/>
    <include path="s2jdbc-internal.dicon"/>
    
    <include path="jdbc-hive.dicon" />
    
    <component name="hiveDialect" class="org.seasar.extension.jdbc.dialect.HiveDialect" />
            
    <component name="hiveJdbcManager" class="org.seasar.extension.jdbc.manager.JdbcManagerImpl">
        <property name="maxRows">0</property>
        <property name="fetchSize">0</property>
        <property name="queryTimeout">0</property>
        <property name="dialect">hiveDialect</property>
        <initMethod name="init" />
    </component>
</components>

S2JDBC経由でHiveQLを投げてみる

Hiveはinsert文は無いので、LOAD DATA 文です。
しかも、preparedStatementで値のbindできないので、直接クエリを書きます。
さらに、getSingleResult であっても、何も結果セットがかえってこない(svn:hive/service/HiveServier)ので、selectBySql(Integer.class...)とかはできません。
あきらめて、selectBySql(String.class) で void にしてます。。

selectについても、preparedStatementでbindで値のbindができないので、直接クエリを書きます。
今はまだ、order by の書き換え(sort by)を行っていないので、orderByが使えません。直接クエリに書きます。
joinについても少し難あり。。。

等など、ということで、いくらか諦めると...!

@Component
@InterType("aop.propertyInterType")
public class HogeService {
    @Property(PropertyType.WRITE)
    protected JdbcManager hive;

    public void insert(String path, String part, String subPart){
        hive.selectBySql(String.class, "LOAD DATA INPATH '" + path + "' INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "'").getSingleResult();
    }
    public void insertFromLocal(String path, String part, String subPart){
        hive.selectBySql(String.class, "LOAD DATA LOCAL INPATH '" + path + "' INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "'").getSingleResult();
    }
    public void overwrite(String path, String part, String subPart){
        hive.selectBySql(String.class, "LOAD DATA INPATH '" + path + "' OVERWRITE INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "'").getSingleResult();
    }
    public void overwriteFromLocal(String path, String part, String subPart){
        hive.selectBySql(String.class, "LOAD DATA INPATH '" + path + "' OVERWRITE INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "'").getSingleResult();
    }
    public List<Hoge> getHoge(){
        return hive.from(Hoge.class).where("id > 1234").limit(100).getResultList();
    }
}

\(^o^)/ やたー!うごいたー!

おわり。

ってことで、とりあえず、HiveをS2JDBC経由で利用できるようになったよ!

Seasarのパッケージはホント便利!ほとんどwrapperが用意されてるし、diconで切り替えれるから修正が楽!
でも、coreなパッケージほど、 private メソッド多いよ!せめて protected か getter を用意して!(無駄にコピーしちゃったのがいくつかある...)

ああああ、でもこれなら HiveJDBC をちゃんと修正した方が早かったかも。。orz
今度やってみよう。。

HDFS の HadoopThriftServer をなんとかする

ポスト @ 3:27:10 , 修正 @ 2010/02/07 3:31:41 | , , ,     

hadoop の話題。その2

hadoop を支える HDFS には HDFS-APIを通すことで、プログラム中から HDFS の読み書きが出きるようになります。(たぶん、hdfs-s3 なんかもこのAPI経由(? ソース読んでない))

(中略)

んで、この HDFS-API のなかに、Thrift を使って リモート上から HDFS の読み書きをできるようにしている HadoopThriftServer(theiftfs) があります。

この thriftfs の起動は に書かれているのですが、shellを握ってしまうのでこんな感じにしました。

#!/usr/bin/env bash

THRIFTFS_PID_FILE=$HADOOP_PID_DIR/thrift.pid
THRIFTFS_LOG_FILE=$HADOOP_LOG_DIR/thrift.log

if [ -f $THRIFTFS_PID_FILE ]; then
    echo running as process `cat $THRIFTFS_PID_FILE`. stop it first
    exit 1
fi

CLASSPATH=$HADOOP_CONF_DIR
CLASSPATH=$CLASSPATH:$HADOOP_HOME/hadoop-0.20.1-core.jar:$HADOOP_HOME/hadoop-0.20.1-tools.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib/commons-logging-1.0.4.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib/commons-logging-api-1.0.4.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib/log4j-1.2.15.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/contrib/thriftfs/hadoop-0.20.1-thriftfs.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/src/contrib/thriftfs/lib/hadoopthriftapi.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/src/contrib/thriftfs/lib/libthrift.jar

nohup java -Dcom.sun.management.jmxremote -cp $CLASSPATH org.apache.hadoop.thriftfs.HadoopThriftServer 10010 > $THRIFTFS_LOG_FILE 2>&1 < /dev/null &
echo $! > $THRIFTFS_PID_FILE

$HADOOP_HOME/bin/hadoop dfsadmin -safemode leave

同様に 停止は

#!/usr/bin/env bash

THRIFT_FS_PID_FILE=$HADOOP_PID_DIR/thrift.pid
THRIFT_FS_LOG_FILE=$HADOOP_LOG_DIR/thrift.log

if [ -f $THRIFT_FS_PID_FILE ]; then
    kill `cat $THRIFT_FS_PID_FILE`
    rm $THRIFT_FS_PID_FILE
else
    echo no pidfile $THRIFT_FS_PID_FILE
fi

起動する際は dfs($HADOOP_HOME/bin/start-dfs.sh)が起動している状態で起動する必要があります。
(dfsadmin -safemode leave は適宜行ってください)

んで、このThriftFSは、こんな感じでリモート上の DFS のファイルを読み書きできます

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.fs.permission.FsPermission;

import org.apache.hadoop.thriftfs.api.Pathname;
import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
import org.apache.hadoop.thriftfs.api.ThriftHandle;
import org.apache.hadoop.thriftfs.api.ThriftIOException;

import com.facebook.thrift.TException;
import com.facebook.thrift.protocol.TBinaryProtocol;
import com.facebook.thrift.protocol.TProtocol;
import com.facebook.thrift.transport.TSocket;
import com.facebook.thrift.transport.TTransportException;

public class HDFSInput {
    
    public static void main(String...args) {
        final int defaultBufferSize = config.getInt("io.file.buffer.size", 4096);
        final long defaultBlockSize = config.getLong("dfs.block.size", FSConstants.DEFAULT_BLOCK_SIZE);
        final short defaultReplication = (short) config.getInt("dfs.replication", 3);

        TSocket socket = new TSocket("master1", 10010);
        TProtocol protocol = new TBinaryProtocol(socket);
        
        try {
            socket.open();
            try {
                ThriftHadoopFileSystem.Client client = new ThriftHadoopFileSystem.Client(protocol);
                // client timeout 5 sec
                client.setTimeout(5);
                Pathname hoge = new Pathname("/tmp/hoge");

                FsPermission permission = FsPermission.createImmutable((short) 0655);
                boolean overwrite = true;

                ThriftHandle writeHandler = client.createFile(hoge, permission.toShort(), overwrite, defaultBufferSize, defaultReplication, defaultBlockSize);
                client.write(writeHandler, "hello world");
                client.close(writeHandler);

                ThriftHandle readHandler = client.open(hoge);
                System.out.println(client.read(readHandler, 0, 1024));
                client.close(readHandler);
            } catch (TTransportException e) {
                e.printStackTrace();
            } catch (ThriftIOException e) {
                e.printStackTrace();
            }
        } catch (TException e) {
            e.printStackTrace();
        } finally {
            socket.close();
        }
    }
}

read時にBufferedReaderにwrapするともう少し便利に読み書きできる(これは今度書く)

んで、読み書きできるようになったんだけど、どうも複数のクライアントから連続して読み書きをすると、整合性が取れなくなってしまう(?)のかエラーがでるようになった。
元のソースを読むと...なんともエレガントな。。。

ということで、java.util.concurrent.atomic を使って書き直してみた(ここが本題)

package org.apache.hadoop.thriftfs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.thriftfs.api.Pathname;
import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
import org.apache.hadoop.thriftfs.api.ThriftHandle;
import org.apache.hadoop.thriftfs.api.ThriftIOException;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;

import com.facebook.thrift.protocol.TBinaryProtocol;
import com.facebook.thrift.server.TServer;
import com.facebook.thrift.server.TThreadPoolServer;
import com.facebook.thrift.transport.TServerSocket;
import com.facebook.thrift.transport.TServerTransport;
import com.facebook.thrift.transport.TTransportFactory;

public class HadoopThriftServer extends ThriftHadoopFileSystem {

    static int serverPort = 0;                    // default port
    TServer    server = null;

    public static class HadoopThriftHandler implements ThriftHadoopFileSystem.Iface
    {

      public static final Log LOG = LogFactory.getLog("org.apache.hadoop.thrift");

      // HDFS glue
      Configuration conf;
      FileSystem fs;
          
      // stucture that maps each Thrift object into an hadoop object
      private AtomicLong nextId = new AtomicLong(new Random().nextLong());
      private ConcurrentHashMap<Long, Object> hadoopHash = new ConcurrentHashMap<Long, Object>();
      private Daemon inactivityThread = null;

      // Detect inactive session
      private static volatile long inactivityPeriod = 3600 * 1000; // 1 hr
      private static volatile long inactivityRecheckInterval = 60 * 1000;
      private static volatile boolean fsRunning = true;
      private AtomicLong now = new AtomicLong(now());

      // allow outsider to change the hadoopthrift path
      public void setOption(String key, String val) {
      }

      /**
       * Current system time.
       * @return current time in msec.
       */
      static long now() {
        return System.currentTimeMillis();
      }

      /**
      * getVersion
      *
      * @return current version of the interface.
      */
      public String getVersion() {
        return "0.1";
      }

      /**
       * shutdown
       *
       * cleanly closes everything and exit.
       */
      public void shutdown(int status) {
        LOG.info("HadoopThriftServer shutting down.");
        try {
          fs.close();
        } catch (IOException e) {
          LOG.warn("Unable to close file system");
        }
        Runtime.getRuntime().exit(status);
      }

      /**
       * Periodically checks to see if there is inactivity
       */
      class InactivityMonitor implements Runnable {
        public void run() {
          while (fsRunning) {
            try {
              if (now() > now.get() + inactivityPeriod) {
                LOG.warn("HadoopThriftServer Inactivity period of " +
                         inactivityPeriod + " expired... Stopping Server.");
                shutdown(-1);
              }
            } catch (Exception e) {
              LOG.error(StringUtils.stringifyException(e));
            }
            try {
              Thread.sleep(inactivityRecheckInterval);
            } catch (InterruptedException ie) {
            }
          }
        }
      }

      /**
       * HadoopThriftServer
       *
       * Constructor for the HadoopThriftServer glue with Thrift Class.
       *
       * @param name - the name of this handler
       */
      public HadoopThriftHandler(String name) {
        conf = new Configuration();
        now.set(now());
        try {
          inactivityThread = new Daemon(new InactivityMonitor());
          fs = FileSystem.get(conf);
        } catch (IOException e) {
          LOG.warn("Unable to open hadoop file system...");
          Runtime.getRuntime().exit(-1);
        }
      }

      /**
        * printStackTrace
        *
        * Helper function to print an exception stack trace to the log and not stderr
        *
        * @param e the exception
        *
        */
      static private void printStackTrace(Exception e) {
        for(StackTraceElement s: e.getStackTrace()) {
          LOG.error(s);
        }
      }

      /**
       * Lookup a thrift object into a hadoop object
       */
      private synchronized Object lookup(long id) {
        return hadoopHash.get(new Long(id));
      }

      /**
       * Insert a thrift object into a hadoop object. Return its id.
       */
      private synchronized long insert(Object o) {
        long next = nextId.incrementAndGet();
        hadoopHash.put(next, o);
        return next;
      }

      /**
       * Delete a thrift object from the hadoop store.
       */
      private synchronized Object remove(long id) {
        return hadoopHash.remove(new Long(id));
      }

      /**
        * Implement the API exported by this thrift server
        */

      /** Set inactivity timeout period. The period is specified in seconds.
        * if there are no RPC calls to the HadoopThrift server for this much
        * time, then the server kills itself.
        */
      public synchronized void setInactivityTimeoutPeriod(long periodInSeconds) {
        inactivityPeriod = periodInSeconds * 1000; // in milli seconds
        if (inactivityRecheckInterval > inactivityPeriod ) {
          inactivityRecheckInterval = inactivityPeriod;
        }
      }


      /**
        * Create a file and open it for writing
        */
      public ThriftHandle create(Pathname path) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("create: " + path);
          FSDataOutputStream out = fs.create(new Path(path.pathname));
          long id = insert(out);
          ThriftHandle obj = new ThriftHandle(id);
          HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
          return obj;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
        * Create a file and open it for writing, delete file if it exists
        */
      public ThriftHandle createFile(Pathname path, 
                                     short mode,
                                     boolean  overwrite,
                                     int bufferSize,
                                     short replication,
                                     long blockSize) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("create: " + path +
                                       " permission: " + mode +
                                       " overwrite: " + overwrite +
                                       " bufferSize: " + bufferSize +
                                       " replication: " + replication +
                                       " blockSize: " + blockSize);
          FSDataOutputStream out = fs.create(new Path(path.pathname), 
                                             new FsPermission(mode),
                                             overwrite,
                                             bufferSize,
                                             replication,
                                             blockSize,
                                             null); // progress
          long id = insert(out);
          ThriftHandle obj = new ThriftHandle(id);
          HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
          return obj;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Opens an existing file and returns a handle to read it
       */
      public ThriftHandle open(Pathname path) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("open: " + path);
          FSDataInputStream out = fs.open(new Path(path.pathname));
          long id = insert(out);
          ThriftHandle obj = new ThriftHandle(id);
          HadoopThriftHandler.LOG.debug("opened: " + path + " id: " + id);
          return obj;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Opens an existing file to append to it.
       */
      public ThriftHandle append(Pathname path) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("append: " + path);
          FSDataOutputStream out = fs.append(new Path(path.pathname));
          long id = insert(out);
          ThriftHandle obj = new ThriftHandle(id);
          HadoopThriftHandler.LOG.debug("appended: " + path + " id: " + id);
          return obj;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * write to a file
       */
      public boolean write(ThriftHandle tout, String data) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("write: " + tout.id);
          FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
          byte[] tmp = data.getBytes("UTF-8");
          out.write(tmp, 0, tmp.length);
          HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
          return true;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * read from a file
       */
      public String read(ThriftHandle tout, long offset,
                         int length) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("read: " + tout.id +
                                       " offset: " + offset +
                                       " length: " + length);
          FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
          
          if (in.getPos() != offset) {
            in.seek(offset);
          }
          byte[] tmp = new byte[length];
          int numbytes = in.read(offset, tmp, 0, length);
          HadoopThriftHandler.LOG.debug("read done: " + tout.id);
          return new String(tmp, 0, numbytes, "UTF-8");
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Delete a file/directory
       */
      public boolean rm(Pathname path, boolean recursive) 
                            throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("rm: " + path +
                                       " recursive: " + recursive);
          boolean ret = fs.delete(new Path(path.pathname), recursive);
          HadoopThriftHandler.LOG.debug("rm: " + path);
          return ret;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Move a file/directory
       */
      public boolean rename(Pathname path, Pathname dest) 
                            throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("rename: " + path +
                                       " destination: " + dest);
          boolean ret = fs.rename(new Path(path.pathname), 
                                  new Path(dest.pathname));
          HadoopThriftHandler.LOG.debug("rename: " + path);
          return ret;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       *  close file
       */
       public boolean close(ThriftHandle tout) throws ThriftIOException {
         try {
           now.set(now());
           HadoopThriftHandler.LOG.debug("close: " + tout.id);
           Object obj = remove(tout.id);
           
           if (obj instanceof FSDataOutputStream) {
             FSDataOutputStream out = (FSDataOutputStream)obj;
             out.close();
           } else if (obj instanceof FSDataInputStream) {
             FSDataInputStream in = (FSDataInputStream)obj;
             in.close();
           } else {
             throw new ThriftIOException("Unknown thrift handle.");
           }
           HadoopThriftHandler.LOG.debug("closed: " + tout.id);
           return true;
         } catch (IOException e) {
           throw new ThriftIOException(e.getMessage());
         }
       }

       /**
        * Create a directory
        */
      public boolean mkdirs(Pathname path) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("mkdirs: " + path);
          boolean ret = fs.mkdirs(new Path(path.pathname));
          HadoopThriftHandler.LOG.debug("mkdirs: " + path);
          return ret;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Does this pathname exist?
       */
      public boolean exists(Pathname path) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("exists: " + path);
          boolean ret = fs.exists(new Path(path.pathname));
          HadoopThriftHandler.LOG.debug("exists done: " + path);
          return ret;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Returns status about the specified pathname
       */
      public org.apache.hadoop.thriftfs.api.FileStatus stat(
                              Pathname path) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("stat: " + path);
          org.apache.hadoop.fs.FileStatus stat = fs.getFileStatus(
                                             new Path(path.pathname));
          HadoopThriftHandler.LOG.debug("stat done: " + path);
          return new org.apache.hadoop.thriftfs.api.FileStatus(
            stat.getPath().toString(),
            stat.getLen(),
            stat.isDir(),
            stat.getReplication(),
            stat.getBlockSize(),
            stat.getModificationTime(),
            stat.getPermission().toString(),
            stat.getOwner(),
            stat.getGroup());
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * If the specified pathname is a directory, then return the
       * list of pathnames in this directory
       */
      public List<org.apache.hadoop.thriftfs.api.FileStatus> listStatus(
                              Pathname path) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("listStatus: " + path);

          org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus(
                                             new Path(path.pathname));
          HadoopThriftHandler.LOG.debug("listStatus done: " + path);
          org.apache.hadoop.thriftfs.api.FileStatus tmp;
          List<org.apache.hadoop.thriftfs.api.FileStatus> value = 
            new LinkedList<org.apache.hadoop.thriftfs.api.FileStatus>();

          for (int i = 0; i < stat.length; i++) {
            tmp = new org.apache.hadoop.thriftfs.api.FileStatus(
                        stat[i].getPath().toString(),
                        stat[i].getLen(),
                        stat[i].isDir(),
                        stat[i].getReplication(),
                        stat[i].getBlockSize(),
                        stat[i].getModificationTime(),
                        stat[i].getPermission().toString(),
                        stat[i].getOwner(),
                        stat[i].getGroup());
            value.add(tmp);
          }
          return value;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Sets the permission of a pathname
       */
      public void chmod(Pathname path, short mode) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("chmod: " + path + 
                                       " mode " + mode);
          fs.setPermission(new Path(path.pathname), new FsPermission(mode));
          HadoopThriftHandler.LOG.debug("chmod done: " + path);
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Sets the owner & group of a pathname
       */
      public void chown(Pathname path, String owner, String group) 
                                                         throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("chown: " + path +
                                       " owner: " + owner +
                                       " group: " + group);
          fs.setOwner(new Path(path.pathname), owner, group);
          HadoopThriftHandler.LOG.debug("chown done: " + path);
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Sets the replication factor of a file
       */
      public void setReplication(Pathname path, short repl) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("setrepl: " + path +
                                       " replication factor: " + repl);
          fs.setReplication(new Path(path.pathname), repl);
          HadoopThriftHandler.LOG.debug("setrepl done: " + path);
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }

      }

      /**
       * Returns the block locations of this file
       */
      public List<org.apache.hadoop.thriftfs.api.BlockLocation> 
               getFileBlockLocations(Pathname path, long start, long length) 
                                           throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("getFileBlockLocations: " + path);

          org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(
                                                   new Path(path.pathname));

          org.apache.hadoop.fs.BlockLocation[] stat = 
              fs.getFileBlockLocations(status, start, length);
          HadoopThriftHandler.LOG.debug("getFileBlockLocations done: " + path);

          org.apache.hadoop.thriftfs.api.BlockLocation tmp;
          List<org.apache.hadoop.thriftfs.api.BlockLocation> value = 
            new LinkedList<org.apache.hadoop.thriftfs.api.BlockLocation>();

          for (int i = 0; i < stat.length; i++) {

            // construct the list of hostnames from the array returned
            // by HDFS
            List<String> hosts = new LinkedList<String>();
            String[] hostsHdfs = stat[i].getHosts();
            for (int j = 0; j < hostsHdfs.length; j++) {
              hosts.add(hostsHdfs[j]);
            }

            // construct the list of host:port from the array returned
            // by HDFS
            List<String> names = new LinkedList<String>();
            String[] namesHdfs = stat[i].getNames();
            for (int j = 0; j < namesHdfs.length; j++) {
              names.add(namesHdfs[j]);
            }
            tmp = new org.apache.hadoop.thriftfs.api.BlockLocation(
                        hosts, names, stat[i].getOffset(), stat[i].getLength());
            value.add(tmp);
          }
          return value;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }
    }

    // Bind to port. If the specified port is 0, then bind to random port.
    private ServerSocket createServerSocket(int port) throws IOException {
      try {
        ServerSocket sock = new ServerSocket();
        // Prevent 2MSL delay problem on server restarts
        sock.setReuseAddress(true);
        // Bind to listening port
        if (port == 0) {
          sock.bind(null);
          serverPort = sock.getLocalPort();
        } else {
          sock.bind(new InetSocketAddress(port));
        }
        return sock;
      } catch (IOException ioe) {
        throw new IOException("Could not create ServerSocket on port " + port + "." +
                              ioe);
      }
    }

    /**
     * Constrcts a server object
     */
    public HadoopThriftServer(String [] args) {

      if (args.length > 0) {
        serverPort = new Integer(args[0]);
      }
      try {
        ServerSocket ssock = createServerSocket(serverPort);
        TServerTransport serverTransport = new TServerSocket(ssock);
        Iface handler = new HadoopThriftHandler("hdfs-thrift-dhruba");
        ThriftHadoopFileSystem.Processor processor = new ThriftHadoopFileSystem.Processor(handler);
        TThreadPoolServer.Options options = new TThreadPoolServer.Options();
        options.minWorkerThreads = 10;
        server = new TThreadPoolServer(processor, serverTransport,
                                               new TTransportFactory(),
                                               new TTransportFactory(),
                                               new TBinaryProtocol.Factory(),
                                               new TBinaryProtocol.Factory(), 
                                               options);
        System.out.println("Starting the hadoop thrift server on port [" + serverPort + "]...");
        HadoopThriftHandler.LOG.info("Starting the hadoop thrift server on port [" +serverPort + "]...");
        System.out.flush();

      } catch (Exception x) {
        x.printStackTrace();
      }
    }

    public static void main(String [] args) {
      HadoopThriftServer me = new HadoopThriftServer(args);
      me.server.serve();
    }
}

安定した。気がする。。
いつか問題となるようなコードを書いてpatchを作ろう

Hive の Local Metastore に derby を使う

ポスト @ 2:37:53 , 修正 @ 2010/02/07 2:39:17 | , ,     

突然ですが、hadoop の話題

Hadoop Hive といえば、SQL 感覚で MapReduce できるんですが、Hive は SQL のように記述できるようにするために、metastore 形式でメタデータを管理してます。
そのあたりは、Hive/AdminManual/MetastoreAdmin - Hadoop Wikimetastore_usage.pptxHiveのmetastoreをMySQLを使ってLocal Metastore形式で利用する - blog.katsuma.tv などに詳しく書かれているので省略。

ただ、local metastore にいちいち mysql をセットアップするのもあれだったんで、今回は derby にしておきます

derby編

derby のインストール

Apache Derbyからダウンロードしてきます。とりあえず最新のを落としてくる

> wget http://..../db-derby-10.5.3.0-bin.tar.gz
> tar xzvf db-derby-10.5.3.0-bin.tar.gz
> sudo mv db-derby-10.5.3.0-bin /opt/local/derby

次に、データ保存用の /var/db/derby を作っておく

> sudo mkdir /var/db/derby
> sudo chown nowel:users /var/db/derby

chown は derby 用のユーザでもいいので、実行ユーザに変えておく

環境変数の設定

環境変数名とかは、Hadoop のそれっぽくしておく

#!/usr/bin/env bash

export DERBY_HOME=/opt/local/derby
export DERBY_OPTS="-Dderby.drda.startNetworkServer=true -Dderby.drda.maxThreads=30 -Dderby.system.home=/var/db/derby -Dderby.drda.portNumber=1527 -Dderby.drda.host=0.0.0.0"
export DERBY_LOG_DIR=/var/log

これを $HOME/.derby_profile とかしておいて、.bashrcなんかに

source $HOME/.derby_profile

と記述しておくといちいち source しなくて楽

起動用のスクリプトを用意

start/stop/status くらいは何度か確認することがあるので、こんな感じで用意

start.sh

#!/usr/bin/env bash

logfile=$DERBY_LOG_DIR/derby.log

nohup $DERBY_HOME/bin/NetworkServerControl start > $logfile 2>&1 < /dev/null &

stop.sh

#!/usr/bin/env bash

$DERBY_HOME/bin/NetworkServerControl shutdown

status.sh

#!/usr/bin/env bash

$DERBY_HOME/bin/NetworkServerControl sysinfo
$DERBY_HOME/bin/NetworkServerControl runtimeinfo

とこんな感じ。

起動と停止

さっき用意した start.sh ファイル群を $HOME/bin とかに置いているのであれば

> $HOME/bin/start.sh

で起動できる

ちなみに、derbyはstartNetworkServerというスクリプトが用意されているけど、素のまま起動すると、ネットワーク越しにアクセスできない(正確には起動した同一ホストからしかアクセスできない)

ネットワーク越しに利用するならstartNetworkServer -h 0.0.0.0とするか、derby.drda.host=0.0.0.0みたいな変数を利用する。
(ここで結構ハマった...とりあえず、開発用なら 0.0.0.0 で始めるといいかと)

Hive編

hadoop の mapred/hdfs などはセットアップ済みとして進めます。

hive のセットアップ

先に書いておくと、http://www.apache.org/dyn/closer.cgi/hadoop/hive/とかに置いてある hive-0.4.1 を使ってセットアップを進めても

FAILED: Error in metadata: org.datanucleus.jdo.exceptions.TransactionNotReadableException: Cant read fields outside of transactions. You may want to set 'NontransactionalRead=true'.
FailedObject:1[OID]org.apache.hadoop.hive.metastore.model.MDatabase
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask

が発生してなかなか前に進めなくなる。
Hive CLI の起動時に

hive > set javax.jdo.option.NontransactionalRead=true;

としておくことで、一時的になんとかなるけど、接続毎にやることになるので、おすすめはしません。
(jpox.properties を使うというのもあるけど、設定の二重管理になりそうだし、後述する新しいのでは必要なさそうなので今回は 0.4.1 を使わないという方向で進みます)

Hiveをtrunkからもってきてビルドする

たぶん、trunkに入ってるのは hive-0.5.x 系

> svn export http://svn.apache.org/repos/asf/hadoop/hive/trunk hive
> cd hive
> ant package
> :
> : (しばし待つ)
> :
> cp -r build/dist/lib/* lib
> sudo mv hive /opt/local/hive-0.5.0-trunk

環境変数の設定

こんな感じで用意する

#!/usr/bin/env bash

export HIVE_HOME=/opt/local/hive-0.5.0-trunk
export HIVE_CONF_DIR=/home/hive/conf
export HIVE_PID_DIR=/var/run
export HIVE_LOG_DIR=/var/log

HIVE_CONF_DIRHADOOP_CONF_DIRと違って、hive-default.xml を HIVE-HOME/conf から読んでくれない(?)ので、hive-default.xmlとhive-log4j.propertiesは HIVE_CONF_DIR にシンボリックリンクしておく

> ln -s $HIVE_HOME/conf/hive-default.xml $HIVE_CONF_DIR/hive-default.xml
> ln -s $HIVE_HOME/conf/hive-log4j.properties $HIVE_CONF_DIR/hive-log4j.properties

hive-site.xmlを用意

$HIVE_CONF_DIR に hive-site.xml をこんな感じで用意

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>hive.metastore.local</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/hive/warehouse</value>
    </property>
    <property>
        <name>hive.metastore.rawstore.impl</name>
        <value>org.apache.hadoop.hive.metastore.ObjectStore</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:derby://master1:1527/metastore;create=true</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>org.apache.derby.jdbc.ClientDriver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value><!-- !empty --></value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value><!-- !empty --></value>
    </property>
    <property>
        <name>datanucleus.autoCreateTables</name>
        <value>true</value>
    </property>
</configuration>

Hive CLIの起動

とりあえず、derby の metastorage が使えるかどうかを確認するため、$HIVE_HOME/bin/hive で Hive CLI を起動して show tables する

> $HIVE_HOME/bin/hive
hive> show tables;
OK
Time taken: 8.613 seconds
hive> exit;

とここまで出れば derby で Local Metastore が使えるようになってます。

おわり

と、ここまで書いてみたけど、Hiveのwikiに同じようなものがあった orz
ref - HiveDerbyServerMode - Hadoop Wiki

2010/01/20

PHPで200行で作る memcached 互換サーバ

ポスト @ 22:00:52 | ,     

まさに誰得。

class MemcachedServer {
    protected $command;
    public function __construct(MemcachedCommand $command){
        $this->command = $command;
    }
    public function start(){
        $this->run();
    }
    public function run(){
        $socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if(false === $socket){
            $code = socket_last_error();
            $msg = socket_strerror($code);
            throw new Exception(sprintf('socket_create was error(%s):%s', $code, $msg));
        }
        socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);

        $binded = @socket_bind($socket, 0, 11222);
        if(false === $binded){
            $code = socket_last_error();
            $msg = socket_strerror($code);
            throw new Exception(sprintf('socket_bind was error(%s):%s', $code, $msg));
        }
        $listend = @socket_listen($socket);
        if(false === $listend){
            $code = socket_last_error();
            $msg = socket_strerror($code);
            throw new Exception(sprintf('socket_listen was error(%s):%s', $code, $msg));
        }
        //socket_set_nonblock($socket);
       
        echo 'server start on tcp://0.0.0.0:11222', PHP_EOL;
        $read = array($socket);
        $write = null;
        $except = null;
        while(true){
            $accept = @socket_accept($socket);
            if(false === $accept){
                continue;
            }
            $handler = new MemcachedAcceptHandler($accept, $this->command);
            $handler->init();
            $handler->execute();
            $handler->destroy();
        }
    }
}

interface StreamReadWrite {
    const DELIMITER = "\r\n";
    public function readLine();
    public function writeLine($str);
}

class MemcachedAcceptHandler implements StreamReadWrite {
    protected $socket;
    protected $command;
    protected $connected = true;
    public function __construct($socket, MemcachedCommand $command){
        $this->socket = $socket;
        $this->command = $command;
    }
    public function __destruct(){
        if(null !== $this->socket){
            @socket_close($this->socket);
            unset($this->socket);
        }
    }
    public function init(){
        echo 'new connection', PHP_EOL;
        //socket_set_nonblock($this->socket);
    }
    public function execute(){
        while($this->connected){
            $read = array($this->socket);
            $write = array();
            $except = array();
            $select = @socket_select($read, $write, $except, 1);
            if(false === $select){
                throw new RuntimeException('socket_select');
            }
            if($select < 1){
                continue;
            }

            $line = $this->readLine();
            if(null === $line){
                continue;
            }
            // get hoge => get
            $mode = substr($line, 0, 3);
            // get hoge foo => array(hoge, foo)
            $args = explode(' ', substr($line, 4));
            $this->command->call($this, $mode, $args);
        }
    }
    public function destroy(){
        echo 'close connection', PHP_EOL;
        @socket_shutdown($this->socket);
    }
    public function readLine(){
        $line = '';
        while(true){
            $buf = @socket_read($this->socket, 1);
            if(false === $buf || '' === $buf){
                // FIXME!!!
                $this->connected = false;
                return null;
            }
            $line .= $buf;
            if(self::DELIMITER == substr($line, -2)){
                $line = substr($line, 0, -2);
                break;
            }
        }
        if(empty($line)){
            return null;
        }
        if(preg_match('/^\s+$/', $line)){
            return null;
        }
        return $line;
    }
    public function writeLine($str){
        return @socket_write($this->socket, $str . self::DELIMITER);
    }
}

interface MemcachedCommand {
    public function call(StreamReadWrite $reader, $mode, array $args);
}

abstract class AbstractMemcachedCommand implements MemcachedCommand {
    protected $reflector;
    public function __construct(){
        $this->reflector = new ReflectionObject($this);
    }
    protected static function concat(array $a, array $b){
        array_splice($a, count($a), 0, $b);
        return $a;
    }
    public final function call(StreamReadWrite $rw, $mode, array $args){
        if(!$this->reflector->hasMethod($mode)){
            return $this->error($rw);
        }
        echo 'command => ', $mode, ' ', join(' ', $args), PHP_EOL;
        return call_user_func_array(array($this, $mode), self::concat(array($rw), $args));
    }
    protected function error($rw){
        $rw->writeLine('ERROR');
    }
    protected abstract function get(StreamReadWrite $rw, $keys);
    protected abstract function set(StreamReadWrite $rw, $key, $flag, $expire, $length);
    protected abstract function delete(StreamReadWrite $rw, $key, $expire = 0);
}
class StorageMemcacheCommand extends AbstractMemcachedCommand {
    protected $cache = array();
    protected function get(StreamReadWrite $rw, $keys){
        $args = func_get_args();
        array_shift($args);
        foreach($args as $key){
            if(!isset($this->cache[$key])){
                continue;
            }
            $value = $this->cache[$key];
            if($value->expire < time()){
                continue;
            }
            $rw->writeLine(sprintf('VALUE %s %d %d', $key, $value->flag, $value->length));
            $rw->writeLine($value->value);
        }
        $rw->writeLine('END');
    }
    protected function set(StreamReadWrite $rw, $key, $flag, $expire, $length){
        $value = new stdClass;
        $value->flag = $flag;
        $value->expire = time() + $expire;
        $value->length = $length;
        $value->value = $rw->readLine();
        $this->cache[$key] = $value;
        $rw->writeLine('STORED');
    }
    protected function delete(StreamReadWrite $rw, $key, $expire = 0){
        if(isset($this->cache[$key])){
            $this->cache[$key]->expire = $expire;
        }
    }
}

$server = new MemcachedServer(new StorageMemcacheCommand);
$server->start();

とりあえず、set/get/deleteだけ。threadとか色々ないので、実際には削除されない。。
他にも構文解析とかしてない(しなくていいくらい memcache はシンプルなんだけど)ので上手いことハンドリングしないといけない。。
同時アクセスに難あり。。

(中略)

などなど、色々あるので、開発用にしか向いてない。というか、よっぽどストイックにPHPを愛している人じゃないと向いていないかも。
まぁ最近は色々memcached互換系のがでてきたので、クライアント作成時のdebug用途向けかも。

今回は、PHPのsocket 関数をゴリゴリ使っています。
blocking modeとかはちょっと動きが不安定(?)なので、あまり性能はでない
c-lang版と比較を、この前のスクリプトを使ってみた

$target = array(
    array('host' => 'localhost', 'port' => 11211),
    array('host' => 'localhost', 'port' => 11222)
);
foreach($target as $t){
    $memcache = new Memcache;
    $memcache->connect($t['host'], $t['port']);

    $fail = 0;
    $fails = array();
    $elapsed = microtime(true);
    $count = 500;
    for($i = 0; $i < $count; ++$i){
        if(false === $memcache->set('hoge', 1234, 0, 10)){
            echo 'ERROR!!', PHP_EOL;
        }
        if(false === $memcache->set('hoge', 123, 0, 10)){
            echo 'ERROR!!!', PHP_EOL;
        }
        $value = (int) $memcache->get('hoge');
        if(false === $memcache->set('hoge', ((int)$value + 1), 0, 10)){
            echo 'ERROR!!!!', PHP_EOL;
        }
        $result = (int)$memcache->get('hoge');
        if($result != 124){
            $fails[] = $result;
            $fail++;
        }
    }
    echo 'target host => ', $t['host'], ' port =>', $t['port'], PHP_EOL;
    echo 'elapsed: ', (microtime(true) - $elapsed), PHP_EOL;
    echo 'fail => ', $fail, PHP_EOL;
}
target host => localhost port =>11211
elapsed: 0.634283065796
fail => 0

target host => localhost port =>11222
elapsed: 1.12030887604
fail => 0

とまあ、こんな感じ。(そこそこ?)
ちなみに、stream_socketを使って書いてみたものも置いておく。fwriteまわりでハマったので動かないと思うけど。

// 動かない><
class MemcachedServer {
    protected $command;
    public function __construct(MemcachedCommand $command){
        $this->command = $command;
    }
    public function start(){
        $this->run();
    }
    public function run(){
        $socket = stream_socket_server('tcp://0.0.0.0:11222', $code, $msg, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
        if(false === $socket){
            throw new RuntimeException(sprintf('stream_socket_server was error(%s):%s', $code, $msg));
        }
        $nonBlocking = stream_set_blocking($socket, 0);
        if(false === $nonBlocking){
            throw new RuntimeException('stream_set_blocking: set blocking mode error');
        }

        echo 'server start on tcp://0.0.0.0:11222', PHP_EOL;
        $read = array($socket);
        $write = null;
        $except = null;
        while(true){
            $accept = @stream_socket_accept($socket, 1);
            if(false === $accept){
                continue;
            }
            $handler = new MemcachedAcceptHandler($accept, $this->command);
            $handler->init();
            $handler->execute();
            $handler->destroy();
        }
    }
}

interface StreamReadWrite {
    const DELIMITER = "\r\n";
    public function readLine();
    public function writeLine($str);
}

class MemcachedAcceptHandler implements StreamReadWrite {
    protected $socket;
    protected $command;
    public function __construct($socket, MemcachedCommand $command){
        $this->socket = $socket;
        $this->command = $command;
    }
    public function __destruct(){
        if(null !== $this->socket){
            fclose($this->socket);
            unset($this->socket);
        }
    }
    public function init(){
        echo 'new connection', PHP_EOL;
        stream_set_blocking($this->socket, true);
        stream_set_timeout($this->socket, 60);
        // disable writebuffer
        stream_set_write_buffer($this->socket, 0);
    }
    protected function feof(){
        $recv = stream_socket_recvfrom($this->socket, 1, STREAM_PEEK);
        return strlen($recv) < 1;
    }
    public function execute(){
        while(true){
            $read = array($this->socket);
            $write = array();
            $except = array();
            $select = @stream_select($read, $write, $except, 1);
            if(false === $select){
                throw new RuntimeException('stream_select');
            }
            if($select < 1){
                continue;
            }

            $line = $this->readLine();
            echo 'line => ', $line, PHP_EOL;
            if(null === $line){
                return;
            }
            // get hoge => get
            $mode = substr($line, 0, 3);
            // get hoge foo => array(hoge, foo)
            $args = explode(' ', substr($line, 4));
            $this->command->call($this, $mode, $args);
        }
    }
    public function destroy(){
        echo 'close connection', PHP_EOL;
        // socket_close($socket)
        stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR);
    }
    public function readLine(){
        // 1048576 = 1024 * 1024
        //socket_read($socket, 1048576, PHP_NORMAL_READ
        $line = stream_get_line($this->socket, 2048, self::DELIMITER);
        if(empty($line)){
            return null;
        }
        if(preg_match('/^\s+$/', $line)){
            return null;
        }
        return $line;
    }
    public function writeLine($str){
        fputs($this->socket, $str . "\n\0");
    }
}

interface MemcachedCommand {
    public function call(StreamReadWrite $reader, $mode, array $args);
}

abstract class AbstractMemcachedCommand implements MemcachedCommand {
    protected $reflector;
    public function __construct(){
        $this->reflector = new ReflectionObject($this);
    }
    protected static function concat(array $a, array $b){
        array_splice($a, count($a), 0, $b);
        return $a;
    }
    public final function call(StreamReadWrite $rw, $mode, array $args){
        if(!$this->reflector->hasMethod($mode)){
            return $this->error($rw);
        }
        return call_user_func_array(array($this, $mode), self::concat(array($rw), $args));
    }
    protected function error($rw){
        $rw->writeLine('ERROR');
    }
    protected abstract function get(StreamReadWrite $rw, $keys);
    protected abstract function set(StreamReadWrite $rw, $key, $flag, $expire, $length);
}
class StorageMemcacheCommand extends AbstractMemcachedCommand {
    protected $cache = array();
    protected function get(StreamReadWrite $rw, $keys){
        $args = func_get_args();
        array_shift($args);
        foreach($args as $key){
            if(!isset($this->cache[$key])){
                continue;
            }
            $value = $this->cache[$key];
            $rw->writeLine(sprintf('VALUE %s %d %d', $key, $value->flag, $value->expire));
            $rw->writeLine($value->value);
        }
        $rw->writeLine('END');
    }
    protected function set(StreamReadWrite $rw, $key, $flag, $expire, $length){
        $value = new stdClass;
        $value->flag = $flag;
        $value->expire = $expire;
        $value->length = $length;
        $value->value = $rw->readLine();
        $this->cache[$key] = $value;
        $rw->writeLine('STORED');
    }

}

$server = new MemcachedServer(new StorageMemcacheCommand);
$server->start();

memcacheの cache の部分を java で(その2)

ポスト @ 0:21:29 , 修正 @ 2010/01/20 0:32:43 | ,     

前回の続き。

前回の結果、とりあえず、DelayQueueによって期限切れのEntryは取り出せるようになった。
だけど、この仕組みの状態では期限切れになったEntryは容赦なく消えていってしまう。
つまり、そのEntryがまだ利用されるかもしれないのに、消えてしまうのは(色々な意味で)もったいない。
特に、javaだとHashMapとかのloadFactorまわりの動きは(きっと)もったいない

もう一度LRUについてwikipediaに確認すると

Least Recently Used (LRU) はキャッシュメモリや仮想メモリが扱うデータのリソースへの割り当てを決定するアルゴリズムである。対義語はMost Recently Used (MRU)。
和訳すると「最近最も使われなかったもの」つまり「使われてから最も長い時間が経ったもの」「参照される頻度が最も低いもの」である。

ということなので(?)、ホントに使われ無かったもの(アクセスが少ないもの)から順番に消えるように考えてみた。

その結果がこれ

public class LRUCache implements CacheLifeCycle {
    
    private static final Log log = LogFactory.getLog(LRUCache.class);
    
    protected final ConcurrentMap<String, PrioritalEntry> cache = new ConcurrentHashMap<String, PrioritalEntry>();

    protected final DelayQueue<PrioritalEntry> expiredQueue =  new DelayQueue<PrioritalEntry>();
    
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    
    private final Lock writeLock = lock.writeLock();
    
    private final Lock readLock = lock.readLock();
    
    public void register(LifeCycleExecutor executor){
        executor.add(this, expiredQueue);
    }
    
    public void purge(PrioritalEntry entry){
        writeLock.lock();
        try {
            // priorityを下げる
            if(entry.decrementPriority() < 1){
                // 1以下なら誰も使ってなさそうなので消す
                if(log.isDebugEnabled()){
                    log.debug("purge entry => " + entry);
                }
                cache.remove(entry.getKey());
                return;
            }
            // もう一度チャンス
            entry.setExpiredAt(retransmission(entry.getPriority()));
            expiredQueue.offer(entry);
        } finally {
            writeLock.unlock();
        }
    }
    
    public boolean set(String key, String value, long expiredAt){
        writeLock.lock();
        try {
            PrioritalEntry newEntry = new PrioritalEntry(key, value, expiredAt);
            PrioritalEntry previousEntry = cache.putIfAbsent(key, newEntry);
            // 既存の値がない
            if(null == previousEntry){
                // ということは新しい値
                return expiredQueue.offer(newEntry);
            }
            previousEntry.setValue(value);
            previousEntry.setExpiredAt(expiredAt);
            previousEntry.incrementPriority();
            return true;
        } finally {
            writeLock.unlock();
        }
    }

    public PrioritalEntry get(String key) {
        readLock.lock();
        try {
            if(!cache.containsKey(key)){
                return null;
            }
            
            PrioritalEntry entry = cache.get(key);
            // 時間切れなので見えなくする
            if(entry.isExpired()){
                return null;
            }
            // 使う人がいたのでpriorityをあげる
            entry.incrementPriority();
            return entry;
        } finally {
            readLock.unlock();
        }
    }

    public void remove(String key) {
        remove(key, 0);
    }
    
    public void remove(String key, long expiredAt){
        writeLock.lock();
        try {
            PrioritalEntry entry = cache.get(key);
            if(null != entry){
                entry.setExpiredAt(expiredAt);
                entry.decrementPriority();
            }
        } finally {
            writeLock.unlock();
        }
    }
    
    public void flush() {
        flush(0);
    }
    
    public void flush(final long expiredAt){
        writeLock.lock();
        try {
            Iterator<Map.Entry<String, PrioritalEntry>> entries = cache.entrySet().iterator();
            while(entries.hasNext()){
                final Map.Entry<String, PrioritalEntry> entry = entries.next();
                final PrioritalEntry value = entry.getValue();
                value.setExpiredAt(expiredAt);
            }
        } finally {
            writeLock.unlock();
        }
    }
    
    protected static long retransmission(final int currentPriority){
        if(currentPriority < PrioritalEntry.MAX_PRIORITY){
            // binary exponential backoff
            long freq = currentPriority + Math.round(Math.pow(2, currentPriority));
            return Math.round(0.875 * freq) + Math.round(0.125 * currentPriority);
        }
        return retransmission(PrioritalEntry.MAX_PRIORITY - 1);
    }
}

んで、PrioritalEntryの実装はこれ

public class PrioritalEntry implements Delayed {
    
    public static final int MAX_PRIORITY = 10;
    
    public static final int DEFAULT_PRIORITY = 5;
    
    private final AtomicInteger priority = new AtomicInteger(DEFAULT_PRIORITY);
    
    private final String key;
    
    private final AtomicReference<String> value;
    
    private final AtomicLong expiredAt;
    
    public PrioritalEntry(final String key, final String value, final long expiredAt){
        this.key = key;
        this.value = new AtomicReference<String>(value);
        if(0 == expiredAt){
            this.expiredAt = new AtomicLong(Long.MAX_VALUE);
        } else if(expiredAt < 0) {
            this.expiredAt = new AtomicLong(0);
        } else {
            this.expiredAt = new AtomicLong(relative(expiredAt));
        }
    }

    protected static long relative(long expiredAt){
        // 現在時間 + 指定時間
        return System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(expiredAt);
    }
    
    protected static long absolute(long expiredAt){
        return TimeUnit.SECONDS.toMillis(expiredAt);
    }
    
    public int incrementPriority(){
        if(MAX_PRIORITY < priority.get()){
            return MAX_PRIORITY;
        }
        return priority.incrementAndGet();
    }
    
    public int decrementPriority(){
        return priority.decrementAndGet();
    }
    
    public int getPriority(){
        return priority.get();
    }

    public String getKey(){
        return key;
    }
    
    public String getValue(){
        return value.get();
    }
    
    public void setValue(String value){
        this.value.set(value);
    }
    
    public void setExpiredAt(long newValue){
        // 現在時間より次の期限を設定
        this.expiredAt.set(relative(newValue));
    }

    private long elapsed(){
        return expiredAt.get() - System.currentTimeMillis();
    }
    
    public boolean isExpired(){
        return elapsed() < 1;
    }

    public long getDelay(TimeUnit unit) {
        // expiredの時間から経過時間を引き、残り時間を算出
        return unit.convert(elapsed(), TimeUnit.MILLISECONDS);
    }
    
    public int compareTo(Delayed o) {
        PrioritalEntry target = (PrioritalEntry) o;
        final long x = expiredAt.get();
        final long y = target.expiredAt.get();
        if(x < y){
            return -1;
        }
        if(x > y){
            return 1;
        }
        return 0;
    }
    
    public String toString(){
        StringBuilder buf = new StringBuilder();
        buf.append("key=").append(key).append(",");
        buf.append("value=").append(value).append(",");
        buf.append("priority=").append(priority);
        return buf.toString();
    }
    
}

他にも

public interface CacheLifeCycle {
    public void register(LifeCycleExecutor executor);

    public void purge(PrioritalEntry entry);
}

public class LifeCycleExecutor {
    
    protected final ExecutorService executor;
    
    public LifeCycleExecutor(final ExecutorService executor){
        this.executor = executor;
    }
    
    public void add(CacheLifeCycle lifeCycle, BlockingQueue<PrioritalEntry> queue){
        executor.execute(new Monitor(lifeCycle, queue));
    }
    
    public void shutdown(){
        executor.shutdown();
        try {
            if(!executor.awaitTermination(10, TimeUnit.SECONDS)){
                executor.shutdownNow();
            }
        } catch(InterruptedException e){
            e.printStackTrace(System.err);
        }
    }
    
    private static class Monitor implements Runnable {
        private static final Log log = LogFactory.getLog(Monitor.class);
        private final CacheLifeCycle lifeCycle;
        private final BlockingQueue<PrioritalEntry> queue;
        private Monitor(CacheLifeCycle lifeCycle, BlockingQueue<PrioritalEntry> queue){
            this.lifeCycle = lifeCycle;
            this.queue = queue;
        }
        public void run(){
            try {
                while(true){
                    PrioritalEntry entry = queue.take();
                    if(log.isDebugEnabled()){
                        log.debug("find entry => " + entry);
                    }
                    lifeCycle.purge(entry);
                }
            } catch(InterruptedException e){
            }
        }
    }

}

細かく解説していく(誰に?)と

まず、PrioritalEntry(スペルとか意味は気にしない)の実装側から

ほとんどの実装は前回のと同じ。
今回は他にpriority(優先度)も一緒に持ってみた
インスタンス生成時は

    public static final int MAX_PRIORITY = 10;
    
    public static final int DEFAULT_PRIORITY = 5;
    
    private final AtomicInteger priority = new AtomicInteger(DEFAULT_PRIORITY);

で作られるんだけど、LRUCacheのgetとかの呼び出しが行われる毎に、priorityをincrementしてる

// class LRUCache
    public PrioritalEntry get(String key) {
        readLock.lock();
        try {
            if(!cache.containsKey(key)){
                return null;
            }
            
            PrioritalEntry entry = cache.get(key);
            // 時間切れなので見えなくする
            if(entry.isExpired()){
                return null;
            }
            // 使う人がいたのでpriorityをあげる
            entry.incrementPriority();
            return entry;
        } finally {
            readLock.unlock();
        }
    }

// class PrioritalEntry
    public int incrementPriority(){
        if(MAX_PRIORITY < priority.get()){
            return MAX_PRIORITY;
        }
        return priority.incrementAndGet();
    }
    
    public int decrementPriority(){
        return priority.decrementAndGet();
    }
    
    public int getPriority(){
        return priority.get();
    }

これで、最も "使われる頻度の高い" Entryは "priorityが高い" という状態を作り出してる。

その他にも、setで値が更新される際にも、既に同じkeyでEntryが作られてたらその部分を "再利用" してみた

    public boolean set(String key, String value, long expiredAt){
        writeLock.lock();
        try {
            PrioritalEntry newEntry = new PrioritalEntry(key, value, expiredAt);
            PrioritalEntry previousEntry = cache.putIfAbsent(key, newEntry);
            // 既存の値がない
            if(null == previousEntry){
                // ということは新しい値
                return expiredQueue.offer(newEntry);
            }
            previousEntry.setValue(value);
            previousEntry.setExpiredAt(expiredAt);
            previousEntry.incrementPriority();
            return true;
        } finally {
            writeLock.unlock();
        }
    }

ということで、priorityという値によって最近使われているかどうかまで、表現できたので、次は値の削除について

    public void purge(PrioritalEntry entry){
        writeLock.lock();
        try {
            // priorityを下げる
            if(entry.decrementPriority() < 1){
                // 1以下なら誰も使ってなさそうなので消す
                if(log.isDebugEnabled()){
                    log.debug("purge entry => " + entry);
                }
                cache.remove(entry.getKey());
                return;
            }
            // もう一度チャンス
            entry.setExpiredAt(retransmission(entry.getPriority()));
            expiredQueue.offer(entry);
        } finally {
            writeLock.unlock();
        }
    }

purgeというメソッドでは、LifeCycleExecutorによって、DelayQueueの中身が取り出された "期限切れ" の Entry が引数に渡される

        public void run(){
            try {
                while(true){
                    PrioritalEntry entry = queue.take();
                    if(log.isDebugEnabled()){
                        log.debug("find entry => " + entry);
                    }
                    lifeCycle.purge(entry);
                }
            } catch(InterruptedException e){
            }
        }

期限切れの Entry であっても、すぐに削除は行わずに "再利用" されるチャンスを与えるため、期限切れの Entry であっても Delay Queue に再送(retransmission)している。
ただ単純に再送するのではなく、再送のアルゴリズムとして昔どこかで覚えた "binary exponential backoff" を使ってみた(うろ覚えバージョン)

// binary exponential backoff
long freq = currentPriority + Math.round(Math.pow(2, currentPriority));
return Math.round(0.875 * freq) + Math.round(0.125 * currentPriority);

これを実行してみるとこんな時間が求められる

retransmission(0) => 1
retransmission(1) => 3
retransmission(2) => 5
retransmission(3) => 10
retransmission(4) => 19
retransmission(5) => 33
retransmission(6) => 62
retransmission(7) => 119
retransmission(8) => 232
retransmission(9) => 457
:
:
:

これは再送時間を作るときに、本来ならリトライ回数を使うんだけど(リトライの数が多いほど、次回の再送時間までの待ちが増える、よってこまめに再送させない)、ここでは priority を使用している。
これによって、priorityが高いものほど、時間切れでも生存時間を多くすることができる。これによって再利用のチャンスが増えるはず。(priorityが高い = 最も再利用される可能性が高そう)

ということで、簡単に動かしてみる

LRUCache cache = new LRUCache();
LifeCycleExecutor executor = new LifeCycleExecutor(Executors.newCachedThreadPool());
cache.register(executor);

cache.set("hoge", "123", 5);
cache.set("foo", "456", 2);
cache.set("bar", "789", 2);

cache.get("foo");

try {
    TimeUnit.SECONDS.sleep(120);
} catch(InterruptedException e){
}

executor.shutdown();

で、これを動かしてみると。。。

LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=6
LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=5
LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=5
LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=4
LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=4
LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=3
LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=3
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=5
LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=2
LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=2
LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=1
LRUCache: purge entry => key=bar,value=789,priority=0
LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=1
LRUCache: purge entry => key=hoge,value=123,priority=0
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=4
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=3
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=2
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=1
LRUCache: purge entry => key=foo,value=456,priority=0

アクセスのあった "foo" キーは、 同じタイムアウトが設定されている "bar" よりも後に削除されている。
これでなんちゃって LRU はできた(?)

ちなみに、writeLockとかreadLockなどはたぶん使ってない。というか使わなくてもいいはず。

ちなみに、本家 memcached は slab allocationとかを使っているので、メモリの効率からいうとそっちの方がいいかも。
こういった部分を効率よく切り替えれるようにしたいものです

次回は、分散ハッシュ(というか複数台で連携)など。

2010/01/17

最近の我が家の電源まわり事情

ポスト @ 20:39:11 , 修正 @ 2010/01/17 21:21:32 | ,     

最近、我が家の電源まわりが結構すっきりしてきたので、一部紹介しようかと思います

電源タップがすっきりした!

以前はごちゃごちゃしていた、電源まわりが↓のように非常にすっきりしてます。

というのも、ここ最近あらゆる電源まわりがUSB(mini-usb/micro-usb)経由で充電できるものが増えてきたため、USBハブに電源まわりを一元化させてます。

(タコ足ならぬタコUSB状態)

ネット環境もイーモバのPocketWifiにすることで、大きかった(十分小さいけど)、バッファローのルータなどはおさらばです。
でも、その代わりにUSBで電源を供給できるようになった!

100円ショップを活用

ほぼ全てのガジェット関係はUSBから電源を供給できるようになっているものの、一部、付属してなかったりオプションだったりするのですが
最近の100円ショップは進化してます。
USB部分が、mini-usb化されていて、必要なオプションだけ選んで買えというスタイル。

(先端の小さいフタみたいのを個別に買ってくる。これでDSもUSBから電源を供給している)

そうそう。こんな感じのが100円ショップで売ってる

しかも100円ショップでも、1メートル級のUSBも売っていたりすので、電源から遠い場所でも安心!

(びよーん。比較にipod mini置いてみた)

なんでもUSB

ただ、100円ショップは微妙に品揃えが悪かったりする。例えば、iPod関係のUSBポートが売ってなかったりする。。
そんな時はドンキホーテ。100円ショップに匹敵かそれ以上のUSB関係の機器が売ってます。

(これはドンキで買ってきた。iPod/iPod Touch用とiPod mini用)

もう、USBまわりを調整するなら100円ショップとドンキがあれば確実です。

常に動かすものも、USBで

ちょっと例が悪いかもしれないですが、最近池袋にあるゲーセンのUFOキャッチャーで取れたこれ。

(画像が汚いけど、これ・・加湿器なんだぜ・・・)

夏は、扇風機もUSB経由になることは、すでに予想されます

(扇風機もUSB・・・卓上用だけど・・・)

高度にUSB化された家電ライフを目指して

たぶん、100円ショップとドンキホーテの企業努力で、今後とも我が家の電源は、ありとあらゆるものはUSB化されていくと思います。

engadgetで読んだけど、"電源から直接USB"とか"49ポートUSB"のが日本でもぞくぞく登場すると嬉しいなぁ。

今は、USBハブ(4ポートとか)をタコHub化してて使い勝手が悪い。

ってか、たぶん、正しいUSBHubはこんな感じだと思う

(家中のUSBメモリ指してみました(合計4+8+2GB)。思ったより少なかった)

では、Happy USB Life を!

2010/01/15

memcacheの cache の部分を java で

ポスト @ 0:06:52 , 修正 @ 2010/01/15 0:11:13 | ,     

指定時間後に消える(というか参照出来なくなる)をやってみることに。
DelayQueueとDelayedで実装してみる。

Cacheの部分

public class RetireCache implements Cache {
    
    protected final Map<String, PeriodEntry> cache = new ConcurrentHashMap<String, PeriodEntry>();
    
    protected final DelayQueue<PeriodEntry> expiredQueue =  new DelayQueue<PeriodEntry>();

    public boolean set(String key, String value, long expiredAt) {
        check();
        final PeriodEntry entry = new PeriodEntry(key, value, expiredAt);
        cache.put(key, entry);
        return expiredQueue.add(entry);
    }

    public Entry get(String key) {
        if(!cache.containsKey(key)){
            return null;
        }
        PeriodEntry entry = cache.get(key);
        if(entry.isExpired()){
            return null;
        }
        return entry;
    }
    
    public void remove(String key) {
        remove(key, 0);
    }
    
    public void remove(String key, long expiredAt){
        PeriodEntry entry = cache.get(key);
        if(null != entry){
            entry.setExpiredAt(expiredAt);
        }
    }
    
    public void flush() {
        flush(0);
    }
    
    public void flush(long expiredAt){
        Iterator<Map.Entry<String, PeriodEntry>> entries = cache.entrySet().iterator();
        while(entries.hasNext()){
            Map.Entry<String, PeriodEntry> entry = entries.next();
            entry.getValue().setExpiredAt(expiredAt);
        }
    }
    
    protected void check(){
        synchronized(this){
            PeriodEntry e = null;
            while((e = expiredQueue.poll()) != null){
                cache.remove(e.getKey());
            }
        }
    }
}

mapにputするときは、同じentryのインスタンスをqueueにも入れておく
getするときには、isExpiredを見て期限切れはnullを返す。

なんとなく、期限切れのqueueの参照にスレッドを立てるのもあれなので、setの時にentryの削除をやってみる

Entryの部分

public class PeriodEntry implements Entry, Delayed {
    
    protected final String key;
    
    protected final String value;
    
    protected long expiredAt;
    
    public PeriodEntry(final String key, final String value, final long expiredAt){
        this.key = key;
        this.value = value;
        if(0 == expiredAt){
            this.expiredAt = Long.MAX_VALUE;
        } else if(expiredAt < 0) {
            this.expiredAt = 0;
        } else {
            this.expiredAt = relative(expiredAt);
        }
    }
    
    protected static long relative(long expiredAt){
        // 現在時間 + 指定時間
        return System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(expiredAt);
    }
    
    protected static long absolute(long expiredAt){
        return TimeUnit.SECONDS.toMillis(expiredAt);
    }

    public String getKey(){
        return key;
    }
    
    public String getValue(){
        return value;
    }
    
    public void setExpiredAt(long newValue){
        // 現在時間より次の期限を設定
        this.expiredAt = relative(newValue);
    }

    private long elapsed(){
        return expiredAt - System.currentTimeMillis();
    }
    
    public boolean isExpired(){
        return elapsed() < 1;
    }

    public long getDelay(TimeUnit unit) {
        // expiredの時間から経過時間を引き、残り時間を算出
        return unit.convert(elapsed(), TimeUnit.MILLISECONDS);
    }
    
    public int compareTo(Delayed o) {
        PeriodEntry target = (PeriodEntry) o;
        if(expiredAt < target.expiredAt){
            return -1;
        }
        if(expiredAt > target.expiredAt){
            return 1;
        }
        return 0;
    }

}

entryは基本的に相対的な時間を設定するようにしてる以外は、普通のでDelayedな実装で。

テスト

public class RetireCacheTest {
    
    @Test
    public void 指定時間で参照出来なくなる(){
        Cache cache = new RetireCache();
        cache.set("hoge", "hogeValue", 2);
        Assert.assertEquals(cache.get("hoge").getValue(), "hogeValue");
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch(InterruptedException e){
        }
        Assert.assertNull("参照できなくなってる", cache.get("hoge"));
    }
    
    @Test
    public void 指定時間後に消えてる(){
        Cache cache = new RetireCache();
        cache.set("hoge", "hogeValue", 2);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch(InterruptedException e){
        }
        Assert.assertEquals(cache.get("hoge").getValue(), "hogeValue");
        cache.remove("hoge", 5);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch(InterruptedException e){
        }
        Assert.assertEquals("これは消えてるのが正解?", cache.get("hoge").getValue(), "hogeValue");
    }
    
    @Test
    public void ゼロを指定すると消えない(){
        Cache cache = new RetireCache();
        cache.set("hoge", "hogeValue", 0);
        cache.set("foo", "fooValue", 1);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch(InterruptedException e){
        }
        Assert.assertNull("これは1秒後消えてる", cache.get("foo"));
        Assert.assertEquals("これは消えない", cache.get("hoge").getValue(), "hogeValue");
    }

    @Test
    public void キーがない(){
        Cache cache = new RetireCache();
        cache.set("hoge", "1", 0);
        Assert.assertNull(cache.get("foo"));
    }

    :
    : 省略
    :
}

memcacheのdeleteと一時実装が違うかな。本物は、delete key time でtimeにどんな時間を指定してもすぐに参照出来なくなる

時間を指定しない場合

set hoge 0 30 4
1234
STORED

delete hoge
DELETED

get hoge
END

消える時間を指定しても...

set hoge 0 30 4
1234
STORED

get hoge
VALUE hoge 0 4
1234
END

delete hoge 10000
DELETED

get hoge
END

なかなか難しい
次は LRU

2010/01/13

NIO にしたら爆速になった(was: なんちゃってmemcache互換サーバ)

ポスト @ 2:37:24 | , ,     

昨日の続き。
server の部分を thread pool から、nio な ノンブロッキング にしてみたら、思った以上に早かった

メイン

public class Server extends Thread {
    
    protected final BlockingQueue<Socket> accept = new LinkedBlockingQueue<Socket>();
    
    protected final ExecutorService acceptPool;
    
    protected final Cache cache;
    
    protected final int port;
    
    protected final int maxConnection;
    
    public Server(final int port, final int maxConnection){
        this(port, maxConnection, ByteSizeUnit.Mega.toLong(64));
    }
    public Server(final int port, final int maxConnection, final long maxMemory){
        this.port = port;
        this.maxConnection = maxConnection;
        this.cache = new LRUCache(maxMemory);
        this.acceptPool = Executors.newFixedThreadPool(maxConnection);
    }
    
    public static void main(String...args){
        Server s = new Server(12221, 32);
        s.start();
        try {
            s.join();
        } catch(InterruptedException e){
            e.printStackTrace(System.err);
        }
    }

    public void run(){
        ServerSocketChannel channel = null;
        try {
            channel = ServerSocketChannel.open();
            channel.configureBlocking(false);
            
            final ServerSocket serverSocket = channel.socket();
            serverSocket.setReuseAddress(true);
            serverSocket.bind(new InetSocketAddress(port));
            
            final Selector selector = Selector.open();
            try {
                channel.register(selector, SelectionKey.OP_ACCEPT, new AcceptAction(cache));
                
                while(0 < selector.select()){
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while(keys.hasNext()){
                        final SelectionKey key = keys.next();
                        keys.remove();
                        
                        if(!key.isValid()){
                            continue;
                        }
                        
                        Action action = (Action) key.attachment();
                        action.execute(key);
                    }
                }
            } finally {
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while(keys.hasNext()){
                    final SelectionKey key = keys.next();
                    key.channel().close();
                }
            }
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if(null != channel){
                try {
                    channel.close();
                } catch(IOException e){
                    e.printStackTrace();
                }
            }
        }
    }
}

新規接続を受け入れるAccept部分

public class AcceptAction implements Action {
    
    private final Cache cache;
    
    public AcceptAction(final Cache cache){
        this.cache = cache;
    }

    public void execute(SelectionKey selectionKey) {
        SocketChannel channel = null;
        try {
            channel = ((ServerSocketChannel) selectionKey.channel()).accept();
            channel.configureBlocking(false);

            ReadAction action = new ReadAction(cache, new ByteBufferReader(channel));
            channel.register(selectionKey.selector(), SelectionKey.OP_READ, action);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

んで、読み込みと書き込みは分けた

public class ReadAction implements Action {
    
    protected final Cache cache;
    
    protected final Reader reader;
    
    protected final Handler handler;
    
    public ReadAction(final Cache cache, final Reader reader){
        this.cache = cache;
        this.reader = reader;
        this.handler = new Handler(cache, reader);
    }

    public void execute(SelectionKey selectionKey) {
        if(!selectionKey.isReadable()){
            return;
        }
        
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        try {
            if(!reader.readable()){
                channel.close();
                selectionKey.cancel();
                return;
            }
            String line = reader.readLine();
            if(line == null){
                channel.close();
                selectionKey.cancel();
                return;
            }
            Return r = handler.execute(line);

            WriteAction action = new WriteAction(r, this);
            channel.register(selectionKey.selector(), SelectionKey.OP_WRITE, action);
        } catch(IOException e){
            e.printStackTrace();
        }
    }

    private static class Handler implements CommandVisitor {
        private final Cache cache;
        private final Reader reader;
        private Handler(final Cache cache, final Reader reader){
            this.cache = cache;
            this.reader = reader;
        }
        public Return execute(String line){
            try {
                final StringReader r = new StringReader(line);
                final MemcacheParser parser = new MemcacheParser(r);
                
                Command command = parser.Command();
                return command.accept(this, null);
            } catch(ParseException e){
                e.printStackTrace();
                return new Return(ResponseType.ERROR);
            } catch(Exception e){
                e.printStackTrace();
                return new Return(ResponseType.SERVER_ERROR, e.getMessage());
            }
        }
        :
        : // 省略
        :    
        public Return visit(RetrievalCommand command, Parameter parameter) {
            final List<String> keys = command.getKeys();
            final List<Return> returns = new ArrayList<Return>();
            for(final String key: keys){
                Entry entry = cache.get(key);
                if(null == entry){
                    returns.add(new Return(ResponseType.END));
                    continue;
                }
                returns.add(new Return(ResponseType.SEND_VALUE,
                    key, entry.getFlag(), entry.getLength(), entry.getValue()
                ));
            }
            return new Return(returns.toArray(new Return[returns.size()]));
        }
    
        public Return visit(SetCommand command, Parameter parameter) {
            try {
                final String nextLine = reader.readLine();
                if(cache.set(command.getKey(), nextLine, command.getFlags(), command.getExpTime())){
                    return new Return(ResponseType.STORED);
                }
                return new Return(ResponseType.NOT_STORED);
            } catch(IOException e){
                e.printStackTrace();
                return new Return(ResponseType.ERROR);
            }
        }
    
        public Return visit(VersionCommand command, Parameter parameter) {
            return null;
        }
    }

}
public class WriteAction implements Action {
    
    private static final Charset ASCII = Charset.forName("US-ASCII"); 
    
    protected final Return ret;
    
    protected final ReadAction action;
    
    public WriteAction(final Return ret, ReadAction action){
        this.ret = ret;
        this.action = action;
    }

    public void execute(SelectionKey selectionKey) {
        if(!selectionKey.isWritable()){
            return;
        }
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        
        final CharBuffer buf = CharBuffer.wrap(ret.renderMessage());
        try {
            ByteBuffer bytes = ASCII.encode(buf);
            int capacity = bytes.capacity();
            int size = channel.write(bytes);
            if(size < capacity){
                int sum = size;
                while(sum < capacity){
                    if(size == 0){
                        return;
                    }
                    bytes.position(sum);
                    size = channel.write(bytes);
                    sum += size;
                }
            }
            try {
                channel.register(selectionKey.selector(), SelectionKey.OP_READ, action);
            } catch(ClosedChannelException e){
                e.printStackTrace();
            }
        } catch(IOException e){
            e.printStackTrace();
        }
    }
}

ということで、これをつかって簡単に比較すると

$target = array(
    array('host' => 'localhost', 'port' => 11211),
    array('host' => 'localhost', 'port' => 12221)
);
foreach($target as $t){
    $memcache = new Memcache;
    $memcache->connect($t['host'], $t['port']);

    $fail = 0;
    $elapsed = microtime(true);
    for($i = 0; $i < 1000; ++$i){
        if(false === $memcache->set('hoge', '1234')){
            echo 'ERROR!!', PHP_EOL;
        }
        if(false === $memcache->set('hoge', '123')){
            echo 'ERROR!!!', PHP_EOL;
        }
        $value = $memcache->get('hoge');
        if(false === $memcache->set('hoge', $value + 1)){
            echo 'ERRROR!!!!', PHP_EOL;
        }
        $result = $memcache->get('hoge');
        if($result != 124){
            $fail++;
        }
    }
    echo 'target host => ', $t['host'], ' port =>', $t['port'], PHP_EOL;
    echo 'elapsed: ', (microtime(true) - $elapsed), PHP_EOL;
    echo 'fail => ', $fail, PHP_EOL;
}
1)
target host => localhost port =>11211
elapsed: 0.66518497467
fail => 0
target host => localhost port =>12221
elapsed: 0.870754003525
fail => 0

2)
target host => localhost port =>11211
elapsed: 1.1857790947
fail => 0
target host => localhost port =>12221
elapsed: 0.868647098541
fail => 0

memcacheに匹敵してきた。

ってか、ByteBufferを読み書きするのは初めて書いたので、すごく手こずった。。
単純に \r\n までの一行を読みたいだけなのに。。
ということで、BufferedReaderのreadLine的なのを書いて色々とお茶を濁す。。

public class ByteBufferReader {
    
    protected final SocketChannel channel;
    
    protected final ByteBuffer buffer;
    
    public ByteBufferReader(final SocketChannel channel){
        this.channel = channel;
        this.buffer = ByteBuffer.allocateDirect(512);
    }
    
    public boolean readable() throws IOException {
        return buffer.hasRemaining();
    }
    
    public String readLine() throws IOException {
        final StringBuilder sb = new StringBuilder();
        readInto(sb);
        if(sb.length() < 1){
            return null;
        }
        return sb.toString();
    }
    
    private void readInto(final StringBuilder sb) throws IOException {
        while(true){
            int read = channel.read(buffer);
            if(read == 0){
                break;
            }
            if(read == -1){
                return;
            }
        }
        buffer.flip();
        while(more(sb));
        buffer.compact();
    }
    
    private boolean more(final StringBuilder sb) throws IOException {
        if(buffer.hasRemaining()){
            char ch = (char) buffer.get();
            switch(ch){
            case '\n':
                return false;
            case '\r':
                return true;
            default:
                sb.append(ch);
                return true;
            }
        }
        buffer.clear();
        return true;
    }

}

とりあえず、一段落
NIOは難しかった。

以前のログ