2010/02/07

HiveJDBC を S2JDBC 経由で使えるようにする

ポスト @ 4:36:35 , 修正 @ 2010/02/07 4:51:57 | , , , ,     

hadoop の話題。その3

HiveのJDBCを使えばリモート上で動いているHiveに対して、JDBC(over thrift)経由でHive QLを実行出来るのですごく便利です。
ref - Hive/HiveClient - Hadoop Wiki

HiveJDBCはフツーのJDBCっぽく使えるので、こんな感じで普通のDBサーバにSQLを投げる感覚で使える

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class HiveConnection {

    /**
     * Connects to the HiveServer on master1:10000 and prints the first column of
     * every row produced by {@code SELECT distinct(id) FROM hoge}.
     *
     * @param args unused
     * @throws SQLException if connecting to Hive or running the query fails
     */
    public static void main(String... args) throws SQLException {
        // Register the Hive JDBC driver; without it nothing else can work.
        try {
            Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
        } catch (ClassNotFoundException driverMissing) {
            driverMissing.printStackTrace();
            System.exit(1);
        }

        Connection connection =
                DriverManager.getConnection("jdbc:hive://master1:10000/", "", "");
        Statement statement = connection.createStatement();
        statement.execute("SELECT distinct(id) FROM hoge");

        // Dump column 1 of each returned row.
        for (ResultSet rows = statement.getResultSet(); rows.next();) {
            System.out.println(rows.getString(1));
        }
    }
}

なので、便利なものは便利なものと合体させてしまえ。ということで、みんな大好き S2JDBCで使えるようにしてみた

HiveServerを起動/停止

とりあえず、HiveJDBCを利用するには、thrift経由でHiveServerを叩ける必要がある。
以下のようなスクリプトを用意

start.sh

#!/usr/bin/env bash
#
# Start HiveServer (the Thrift service used by HiveJDBC) in the background and
# record its PID in $HIVE_PID_DIR/hiveserver.pid for stop.sh.
#
# Requires HIVE_HOME, HIVE_PID_DIR and HIVE_LOG_DIR. Fails fast if any is unset
# instead of silently writing /hiveserver.pid and /hiveserver.log.

: "${HIVE_HOME:?HIVE_HOME is not set}"
: "${HIVE_PID_DIR:?HIVE_PID_DIR is not set}"
: "${HIVE_LOG_DIR:?HIVE_LOG_DIR is not set}"

pidfile="$HIVE_PID_DIR/hiveserver.pid"
logfile="$HIVE_LOG_DIR/hiveserver.log"

# Refuse to start a second instance while a pidfile exists.
if [ -f "$pidfile" ]; then
    echo "running as process $(cat "$pidfile"). stop it first"
    exit 1
fi

# Detach from the terminal: no stdin, stdout+stderr go to the log file.
nohup "$HIVE_HOME/bin/hive" --service hiveserver > "$logfile" 2>&1 < /dev/null &
echo $! > "$pidfile"

stop.sh

#!/usr/bin/env bash
#
# Stop the HiveServer started by start.sh, using the PID stored in
# $HIVE_PID_DIR/hiveserver.pid.

: "${HIVE_PID_DIR:?HIVE_PID_DIR is not set}"

pidfile="$HIVE_PID_DIR/hiveserver.pid"

if [ -f "$pidfile" ]; then
    kill "$(cat "$pidfile")"
    # The pidfile is removed even if kill failed (e.g. a stale PID), so
    # start.sh can run again.
    rm -f "$pidfile"
else
    echo "no pidfile $pidfile"
fi

これで、実行したサーバでデフォルトのポート10000で起動する。

ちなみに、HADOOP_CONF_DIRとかHIVE_CONF_DIRは別途ちゃんと設定すること

HiveDialectを用意

先に書いておくと、SQLのorder by ...に相当するものは、HiveQLだと sort by ...なんだけど、今はまだ用意してない。今度書く

package org.seasar.extension.jdbc.dialect;

import org.seasar.extension.jdbc.SelectForUpdateType;

/**
 * S2JDBC dialect for Hive (HiveQL through the Hive JDBC driver).
 *
 * <p>Only a trailing {@code LIMIT} is supported for paging. Offsets, cursors,
 * batch update results, generated keys, identity columns, sequences, lock hints
 * and every {@code FOR UPDATE} variant are reported as unsupported.
 *
 * <p>NOTE(review): SQL {@code ORDER BY} corresponds to HiveQL {@code SORT BY};
 * no such rewriting is done here yet.
 */
public class HiveDialect extends StandardDialect {

    /**
     * Returns the dialect name.
     *
     * @return {@code "hive"}
     */
    @Override
    public String getName() {
        return "hive";
    }

    /** Hive accepts a trailing {@code LIMIT n} clause. */
    @Override
    public boolean supportsLimit() {
        return true;
    }

    /**
     * Appends a {@code limit} clause to {@code sql}.
     *
     * <p>A non-positive {@code limit} leaves the SQL unchanged. Emitting
     * {@code limit 0} would make Hive return no rows at all.
     * {@code offset} is never rendered, because {@link #supportsOffset()}
     * returns {@code false}. NOTE(review): confirm how S2JDBC applies a non-zero
     * offset for dialects without offset support.
     *
     * @param sql the SQL to decorate
     * @param offset ignored (Hive has no OFFSET clause)
     * @param limit maximum number of rows; {@code <= 0} means no limit clause
     * @return the SQL with {@code " limit N"} appended when {@code limit > 0}
     */
    @Override
    public String convertLimitSql(String sql, int offset, int limit) {
        if (limit <= 0) {
            return sql;
        }
        StringBuilder buf = new StringBuilder(sql.length() + 20);
        buf.append(sql);
        buf.append(" limit ");
        buf.append(limit);
        return buf.toString();
    }

    /** Hive's JDBC driver does not report batch update counts. */
    @Override
    public boolean supportsBatchUpdateResults() {
        return false;
    }

    /** Scrollable cursors are not available. */
    @Override
    public boolean supportsCursor() {
        return false;
    }

    /** No form of {@code SELECT ... FOR UPDATE} is supported. */
    @Override
    public boolean supportsForUpdate(SelectForUpdateType type, boolean withTarget) {
        return false;
    }

    /** {@code getGeneratedKeys} is not supported. */
    @Override
    public boolean supportsGetGeneratedKeys() {
        return false;
    }

    /** Identity columns are not supported. */
    @Override
    public boolean supportsIdentity() {
        return false;
    }

    /** {@code FOR UPDATE} with inner joins is not supported. */
    @Override
    public boolean supportsInnerJoinForUpdate() {
        return false;
    }

    /** Lock hints are not supported. */
    @Override
    public boolean supportsLockHint() {
        return false;
    }

    /** HiveQL has no OFFSET clause. */
    @Override
    public boolean supportsOffset() {
        return false;
    }

    /** HiveQL has no OFFSET clause, with or without LIMIT. */
    @Override
    public boolean supportsOffsetWithoutLimit() {
        return false;
    }

    /** {@code FOR UPDATE} with outer joins is not supported. */
    @Override
    public boolean supportsOuterJoinForUpdate() {
        return false;
    }

    /** Sequences are not supported. */
    @Override
    public boolean supportsSequence() {
        return false;
    }

}

HiveConnectionPoolを用意

これは、HiveJDBC が XA transaction や Connection#close などを実行できない(未実装のコードなので...svn:..jdbc/HiveConnection)ので、ConnectionPoolImpl を継承して ConnectionWrapper を取得する部分を override しています。

package org.seasar.extension.dbcp.impl;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.sql.XAConnection;
import javax.transaction.Transaction;

import org.seasar.extension.dbcp.ConnectionWrapper;
import org.seasar.framework.util.TransactionManagerUtil;

/**
 * Connection pool for Hive.
 *
 * <p>It overrides {@code checkOut()} so that connections are obtained directly
 * from the XA data source and wrapped in {@link HiveConnectionWrapper}. The
 * Hive JDBC driver does not implement XA transactions or
 * {@code Connection#close()}. One connection is reused per active JTA
 * transaction.
 */
public class HiveConnectionPool extends ConnectionPoolImpl {

    /**
     * Connections handed out per active transaction, so repeated
     * {@code checkOut()} calls within one transaction reuse a single
     * connection. Only non-null transactions are stored, because
     * ConcurrentHashMap rejects null keys.
     * NOTE(review): entries are never removed here, so every transaction seen
     * stays referenced. Confirm whether cleanup on transaction completion is
     * needed.
     */
    private Map<Transaction, ConnectionWrapper> txActivePool = new ConcurrentHashMap<Transaction, ConnectionWrapper>();

    /**
     * Returns the connection bound to the current transaction, creating it on
     * first use. When no transaction is active, a new connection is created on
     * every call.
     *
     * @return a Hive connection wrapper
     * @throws SQLException if a new physical connection cannot be obtained
     */
    @Override
    public synchronized ConnectionWrapper checkOut() throws SQLException {
        Transaction tx = getTransaction();
        if (tx == null) {
            // No active transaction. ConcurrentHashMap.get(null) would throw
            // NullPointerException, so hand out an unpooled connection.
            return createConnection(null);
        }
        ConnectionWrapper conn = txActivePool.get(tx);
        if (conn == null) {
            conn = createConnection(tx);
            txActivePool.put(tx, conn);
        }
        return conn;
    }

    /**
     * Opens a new physical connection via the XA data source and wraps it.
     *
     * @param transaction transaction to associate with the wrapper (null when none is active)
     * @return a new {@link HiveConnectionWrapper}
     * @throws SQLException if the XA or physical connection cannot be obtained
     */
    protected ConnectionWrapper createConnection(Transaction transaction) throws SQLException {
        XAConnection xaConnection = getXADataSource().getXAConnection();
        Connection connection = xaConnection.getConnection();
        return new HiveConnectionWrapper(xaConnection, connection, this, transaction);
    }

    /**
     * Returns the transaction associated with the current thread, as reported
     * by the configured transaction manager, or null if there is none.
     */
    protected Transaction getTransaction(){
        return TransactionManagerUtil.getTransaction(getTransactionManager());
    }

}

HiveConnectionWrapper を用意する

これは後述する PreparedStatement 対策として用意。PreparedStatement にも wrapper を用意する必要があるので、ConnectionWrapperImpl を継承して prepareStatement を override します。

package org.seasar.extension.dbcp.impl;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import javax.sql.XAConnection;
import javax.transaction.Transaction;

import org.seasar.extension.dbcp.ConnectionPool;
import org.seasar.framework.exception.SSQLException;

/**
 * Connection wrapper for Hive that returns {@link HivePreparedStatementWrapper}s
 * instead of the driver's raw prepared statements. This works around operations
 * HivePreparedStatement does not implement.
 */
public class HiveConnectionWrapper extends ConnectionWrapperImpl {

    /**
     * Creates a wrapper around a physical Hive connection.
     *
     * @param xaConnection the XA connection the physical connection came from
     * @param physicalConnection the driver's connection
     * @param connectionPool the pool that owns this wrapper
     * @param tx the transaction the pool associated with this connection
     * @throws SQLException propagated from the superclass constructor
     */
    public HiveConnectionWrapper(XAConnection xaConnection,
            Connection physicalConnection, ConnectionPool connectionPool,
            Transaction tx) throws SQLException {
        super(xaConnection, physicalConnection, connectionPool, tx);
    }

    /**
     * Converts a driver exception into an SSQLException with message ESSR0072.
     * The message arguments are the SQL, the driver message, the vendor error
     * code and the SQL state.
     */
    protected SQLException wrapException(final SQLException e, final String sql) {
        return new SSQLException("ESSR0072",
                new Object[] { sql, e.getMessage(),
                        Integer.valueOf(e.getErrorCode()), e.getSQLState() },
                e.getSQLState(), e.getErrorCode(), e, sql);
    }

    /** Throws SSQLException ESSR0062 if this wrapper is already closed. */
    protected void assertOpened() throws SQLException {
        if (isClosed()) {
            throw new SSQLException("ESSR0062", null);
        }
    }

    /** Wraps a freshly prepared driver statement for Hive. */
    private PreparedStatement wrapHiveStatement(final PreparedStatement physical, final String sql) {
        return new HivePreparedStatementWrapper(physical, sql);
    }

    /**
     * Handles a failed prepareStatement call. It releases this connection, then
     * returns the wrapped exception for the caller to throw.
     */
    private SQLException releaseAndWrap(final SQLException ex, final String sql) throws SQLException {
        release();
        return wrapException(ex, sql);
    }

    @Override
    public PreparedStatement prepareStatement(final String sql) throws SQLException {
        assertOpened();
        try {
            return wrapHiveStatement(getPhysicalConnection().prepareStatement(sql), sql);
        } catch (final SQLException ex) {
            throw releaseAndWrap(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
        assertOpened();
        try {
            return wrapHiveStatement(getPhysicalConnection().prepareStatement(sql, resultSetType, resultSetConcurrency), sql);
        } catch (final SQLException ex) {
            throw releaseAndWrap(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
        assertOpened();
        try {
            return wrapHiveStatement(getPhysicalConnection().prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql);
        } catch (final SQLException ex) {
            throw releaseAndWrap(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
        assertOpened();
        try {
            return wrapHiveStatement(getPhysicalConnection().prepareStatement(sql, autoGeneratedKeys), sql);
        } catch (final SQLException ex) {
            throw releaseAndWrap(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException {
        assertOpened();
        try {
            return wrapHiveStatement(getPhysicalConnection().prepareStatement(sql, columnIndexes), sql);
        } catch (final SQLException ex) {
            throw releaseAndWrap(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException {
        assertOpened();
        try {
            return wrapHiveStatement(getPhysicalConnection().prepareStatement(sql, columnNames), sql);
        } catch (final SQLException ex) {
            throw releaseAndWrap(ex, sql);
        }
    }
}

HivePreparedStatementWrapperを用意する

これもConnectionと同じように、PreparedStatementの大半のコードが未実装(svn:jdbc/HivePreparedStatement)なので、PreparedStatementWrapper を継承して override

package org.seasar.extension.dbcp.impl;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.seasar.extension.jdbc.impl.PreparedStatementWrapper;
import org.seasar.framework.exception.SSQLException;

/**
 * PreparedStatement wrapper that works around unimplemented parts of
 * HivePreparedStatement. {@link #close()} is a no-op, and every ResultSet handed
 * out is wrapped in {@link HiveResultSetWrapper}.
 */
public class HivePreparedStatementWrapper extends PreparedStatementWrapper {

    /** The driver's statement; calls are delegated to it. */
    protected final PreparedStatement original;

    /** SQL this statement was prepared with; used in exception messages. */
    protected final String sql;

    public HivePreparedStatementWrapper(PreparedStatement original, String sql) {
        super(original, sql);
        this.original = original;
        this.sql = sql;
    }

    /** Wraps {@code e} using the SQL this statement was prepared with. */
    protected SQLException wrapException(final SQLException e) {
        return wrapException(e, sql);
    }

    /**
     * Converts a driver exception into an SSQLException with message ESSR0072.
     * The message arguments are the SQL, the driver message, the vendor error
     * code and the SQL state, matching HiveConnectionWrapper. Returns {@code e}
     * unchanged when no SQL is known.
     */
    protected SQLException wrapException(final SQLException e, final String sql) {
        if (sql != null) {
            return new SSQLException("ESSR0072", new Object[] { sql,
                    e.getMessage(), Integer.valueOf(e.getErrorCode()),
                    e.getSQLState() }, e.getSQLState(), e.getErrorCode(), e,
                    sql);
        }
        return e;
    }

    /**
     * Wraps a driver ResultSet. A null ResultSet (no result available) stays
     * null, so callers can still detect it.
     */
    private static ResultSet wrapHiveResultSet(final ResultSet rs) {
        return rs == null ? null : new HiveResultSetWrapper(rs);
    }

    @Override
    public void close() {
        // Intentionally a no-op: HivePreparedStatement does not support #close.
    }

    @Override
    public ResultSet executeQuery() throws SQLException {
        try {
            return wrapHiveResultSet(original.executeQuery());
        } catch (final SQLException e) {
            throw wrapException(e);
        }
    }

    @Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        try {
            return wrapHiveResultSet(original.executeQuery(sql));
        } catch (final SQLException e) {
            throw wrapException(e, sql);
        }
    }

    /** Returns the current result wrapped for Hive, or null if there is none. */
    @Override
    public ResultSet getResultSet() throws SQLException {
        try {
            return wrapHiveResultSet(original.getResultSet());
        } catch (final SQLException e) {
            throw wrapException(e);
        }
    }

    /** Returns generated keys wrapped for Hive, or null if the driver returned none. */
    @Override
    public ResultSet getGeneratedKeys() throws SQLException {
        try {
            return wrapHiveResultSet(original.getGeneratedKeys());
        } catch (final SQLException e) {
            throw wrapException(e);
        }
    }
}

HiveResultSetWrapper を用意

まだまだ続きます...HiveResultSet にも未実装の部分がある(svn:jdbc/HiveResultSet)ので、ResultSetWrapper を継承して override

package org.seasar.extension.dbcp.impl;

import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.seasar.extension.jdbc.impl.ResultSetWrapper;

/**
 * ResultSet wrapper for Hive.
 *
 * <p>{@link #close()} is a no-op because HiveResultSet does not support it.
 * {@link #getDate(int)} also accepts string values, optionally wrapped in
 * double quotes, in {@code yyyy-[m]m-[d]d} form.
 */
public class HiveResultSetWrapper extends ResultSetWrapper {

    protected static final String DOUBLE_QUOTE = "\"";

    /** The driver's result set. */
    protected final ResultSet rs;

    public HiveResultSetWrapper(ResultSet rs){
        super(rs);
        this.rs = rs;
    }

    @Override
    public void close() throws SQLException {
        // Intentionally a no-op: HiveResultSet does not support #close.
    }

    /**
     * Parses a date string, first stripping one pair of surrounding double
     * quotes if present.
     *
     * @param dt the raw column value
     * @return the parsed date
     * @throws Exception if the value is not a valid {@link Date#valueOf} string
     */
    protected Date parseDate(final String dt) throws Exception {
        String tmp = dt;
        // The length check keeps a lone "\"" from reaching substring(1, 0).
        if (dt.length() >= 2 && dt.startsWith(DOUBLE_QUOTE) && dt.endsWith(DOUBLE_QUOTE)) {
            tmp = dt.substring(1, dt.length() - 1);
        }
        return Date.valueOf(tmp);
    }

    /**
     * Returns column {@code columnIndex} as a date. String values are parsed via
     * {@link #parseDate(String)}; other values are delegated to the superclass.
     *
     * @throws SQLException if a string value cannot be parsed (the cause is kept)
     */
    @Override
    public Date getDate(int columnIndex) throws SQLException {
        Object obj = getObject(columnIndex);
        if (obj == null) {
            return null;
        }
        if(obj instanceof String){
            try {
                return parseDate((String) obj);
            } catch(Exception e){
                SQLException ex = new SQLException("Cannot convert column " + columnIndex + " to date: " + e.toString());
                ex.initCause(e);
                throw ex;
            }
        }
        return super.getDate(columnIndex);
    }

}

jdbc-hive.dicon を用意(jdbc.dicon 相当。後述の s2jdbc.dicon から include する)

やっと、s2jdbcに近づいてきました...

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE components PUBLIC "-//SEASAR2.1//DTD S2Container//EN" "http://www.seasar.org/dtd/components21.dtd">
<!--
  Hive data source definitions (namespace jdbc_hive).
  Exposes the "hive" DataSource, backed by HiveConnectionPool, which works
  around the parts of the Hive JDBC driver that are not implemented
  (XA transactions, Connection#close, ...).
-->
<components namespace="jdbc_hive">
    <!--
      Hive itself does not support JTA/XA. jta.dicon is still included,
      presumably because hivePool below references the TransactionManager
      component.
    -->
    <include path="jta.dicon"/>
    
    <!-- Raw Hive JDBC settings: HiveServer on master1:10000, empty user and password. -->
    <component name="hiveDataSource" class="org.seasar.extension.dbcp.impl.XADataSourceImpl">
        <property name="driverClassName">"org.apache.hadoop.hive.jdbc.HiveDriver"</property>
        <property name="URL">"jdbc:hive://master1:10000/"</property>
        <property name="user">""</property>
        <property name="password">""</property>
    </component>

    <!--
      The DataSource applications use, read-only, on top of HiveConnectionPool.
      NOTE(review): HiveConnectionPool overrides checkOut() and creates
      connections directly, so maxPoolSize and timeout are presumably not
      enforced for it. Confirm against ConnectionPoolImpl.
    -->
    <component name="hive" class="org.seasar.extension.dbcp.impl.DataSourceImpl" autoBinding="none">
        <arg>
            <component name="hivePool" class="org.seasar.extension.dbcp.impl.HiveConnectionPool" autoBinding="none">
                <property name="xaDataSource">hiveDataSource</property>
                <property name="timeout">600</property>
                <property name="maxPoolSize">10</property>
                <property name="allowLocalTx">true</property>
                <property name="readOnly">true</property>
                <property name="transactionManager">TransactionManager</property>
                <destroyMethod name="close"/>
            </component>
        </arg>
    </component>
</components>

s2jdbc.diconを用意

dialectをs2jdbcに読ませる

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE components PUBLIC "-//SEASAR//DTD S2Container 2.4//EN" "http://www.seasar.org/dtd/components24.dtd">
<!-- S2JDBC configuration that registers a JdbcManager for Hive. -->
<components>
    <!-- Transaction infrastructure and S2JDBC internals. -->
    <include path="jta.dicon"/>
    <include path="tx.dicon"/>
    <include path="s2jdbc-internal.dicon"/>
    
    <!-- Hive data source definitions (namespace jdbc_hive). -->
    <include path="jdbc-hive.dicon" />
    
    <!-- Dialect describing what Hive supports (LIMIT only; no offset, cursor, FOR UPDATE, ...). -->
    <component name="hiveDialect" class="org.seasar.extension.jdbc.dialect.HiveDialect" />
            
    <!--
      JdbcManager used for HiveQL, configured with hiveDialect.
      maxRows, fetchSize and queryTimeout are all set to 0.
      NOTE(review): no dataSource property is set here. Confirm that
      auto-binding resolves it to the Hive data source and not another one.
    -->
    <component name="hiveJdbcManager" class="org.seasar.extension.jdbc.manager.JdbcManagerImpl">
        <property name="maxRows">0</property>
        <property name="fetchSize">0</property>
        <property name="queryTimeout">0</property>
        <property name="dialect">hiveDialect</property>
        <initMethod name="init" />
    </component>
</components>

S2JDBC経由でHiveQLを投げてみる

Hiveはinsert文は無いので、LOAD DATA 文です。
しかも、PreparedStatement で値の bind ができないので、直接クエリを書きます。
さらに、getSingleResult であっても何も結果セットが返ってこない(svn:hive/service/HiveServer)ので、selectBySql(Integer.class...) などはできません。
あきらめて、selectBySql(String.class) で void にしてます。。

select についても、PreparedStatement で値の bind ができないので、直接クエリを書きます。
今はまだ、order by の書き換え(sort by)を行っていないので、orderByが使えません。直接クエリに書きます。
joinについても少し難あり。。。

等など、ということで、いくらか諦めると...!

/**
 * Loads files into the partitioned Hive table {@code hoge} and reads rows back
 * through S2JDBC.
 *
 * <p>HiveQL has no INSERT, and the Hive JDBC driver cannot bind parameters, so
 * LOAD DATA statements are built as literal strings. HiveServer returns no rows
 * for them, so {@code selectBySql(String.class, ...).getSingleResult()} is used
 * only to execute the statement.
 *
 * <p>SECURITY NOTE(review): path, part and subPart are concatenated verbatim
 * into single-quoted literals. A value containing {@code '} breaks the statement
 * or injects HiveQL, so only pass trusted values.
 */
@Component
@InterType("aop.propertyInterType")
public class HogeService {
    @Property(PropertyType.WRITE)
    protected JdbcManager hive;

    /** Moves an HDFS file into partition (p1=part, p2=subPart), keeping existing data. */
    public void insert(String path, String part, String subPart){
        loadData(path, false, false, part, subPart);
    }

    /** Copies a local file into partition (p1=part, p2=subPart), keeping existing data. */
    public void insertFromLocal(String path, String part, String subPart){
        loadData(path, true, false, part, subPart);
    }

    /** Moves an HDFS file into partition (p1=part, p2=subPart), replacing its contents. */
    public void overwrite(String path, String part, String subPart){
        loadData(path, false, true, part, subPart);
    }

    /** Copies a local file into partition (p1=part, p2=subPart), replacing its contents. */
    public void overwriteFromLocal(String path, String part, String subPart){
        loadData(path, true, true, part, subPart);
    }

    /** Returns up to 100 rows with id greater than 1234. */
    public List<Hoge> getHoge(){
        return hive.from(Hoge.class).where("id > 1234").limit(100).getResultList();
    }

    /** Executes the LOAD DATA statement built by {@link #buildLoadDataSql}. */
    private void loadData(String path, boolean local, boolean overwrite, String part, String subPart) {
        hive.selectBySql(String.class, buildLoadDataSql(path, local, overwrite, part, subPart)).getSingleResult();
    }

    /**
     * Builds {@code LOAD DATA [LOCAL] INPATH '<path>' [OVERWRITE] INTO TABLE hoge
     * PARTITION (p1 = '<part>', p2 = '<subPart>')}, including the closing parenthesis.
     */
    private static String buildLoadDataSql(String path, boolean local, boolean overwrite, String part, String subPart) {
        StringBuilder sql = new StringBuilder("LOAD DATA ");
        if (local) {
            sql.append("LOCAL ");
        }
        sql.append("INPATH '").append(path).append("' ");
        if (overwrite) {
            sql.append("OVERWRITE ");
        }
        sql.append("INTO TABLE hoge PARTITION (p1 = '").append(part)
                .append("', p2 = '").append(subPart).append("')");
        return sql.toString();
    }
}

\(^o^)/ やたー!うごいたー!

おわり。

ってことで、とりあえず、HiveをS2JDBC経由で利用できるようになったよ!

Seasarのパッケージはホント便利!ほとんどwrapperが用意されてるし、diconで切り替えれるから修正が楽!
でも、coreなパッケージほど、 private メソッド多いよ!せめて protected か getter を用意して!(無駄にコピーしちゃったのがいくつかある...)

ああああ、でもこれなら HiveJDBC をちゃんと修正した方が早かったかも。。orz
今度やってみよう。。

HDFS の HadoopThriftServer をなんとかする

ポスト @ 3:27:10 , 修正 @ 2010/02/07 3:31:41 | , , ,     

hadoop の話題。その2

hadoop を支える HDFS は、HDFS-API を通すことでプログラム中から読み書きできるようになっています。(たぶん、hdfs-s3 なんかもこの API 経由(? ソース読んでない))

(中略)

んで、この HDFS-API のなかに、Thrift を使ってリモートから HDFS の読み書きをできるようにする HadoopThriftServer(thriftfs) があります。

この thriftfs の起動方法はドキュメントに書かれているのですが、そのままだとフォアグラウンドで動いてシェルを握ってしまいます。そこで、nohup でバックグラウンド起動する以下のようなスクリプトにしました。

#!/usr/bin/env bash
#
# Start Hadoop's Thrift filesystem server (HadoopThriftServer) on port 10010 in
# the background and record its PID in $HADOOP_PID_DIR/thrift.pid.
#
# Requires HADOOP_HOME, HADOOP_CONF_DIR, HADOOP_PID_DIR and HADOOP_LOG_DIR.
# HADOOP_VERSION selects the jar file names and defaults to 0.20.1.
# HDFS (start-dfs.sh) must already be running.

: "${HADOOP_HOME:?HADOOP_HOME is not set}"
: "${HADOOP_CONF_DIR:?HADOOP_CONF_DIR is not set}"
: "${HADOOP_PID_DIR:?HADOOP_PID_DIR is not set}"
: "${HADOOP_LOG_DIR:?HADOOP_LOG_DIR is not set}"
HADOOP_VERSION="${HADOOP_VERSION:-0.20.1}"

THRIFTFS_PID_FILE="$HADOOP_PID_DIR/thrift.pid"
THRIFTFS_LOG_FILE="$HADOOP_LOG_DIR/thrift.log"

# Refuse to start a second instance while a pidfile exists.
if [ -f "$THRIFTFS_PID_FILE" ]; then
    echo "running as process $(cat "$THRIFTFS_PID_FILE"). stop it first"
    exit 1
fi

CLASSPATH="$HADOOP_CONF_DIR"
CLASSPATH="$CLASSPATH:$HADOOP_HOME/hadoop-$HADOOP_VERSION-core.jar:$HADOOP_HOME/hadoop-$HADOOP_VERSION-tools.jar"
CLASSPATH="$CLASSPATH:$HADOOP_HOME/lib/commons-logging-1.0.4.jar"
CLASSPATH="$CLASSPATH:$HADOOP_HOME/lib/commons-logging-api-1.0.4.jar"
CLASSPATH="$CLASSPATH:$HADOOP_HOME/lib/log4j-1.2.15.jar"
CLASSPATH="$CLASSPATH:$HADOOP_HOME/contrib/thriftfs/hadoop-$HADOOP_VERSION-thriftfs.jar"
CLASSPATH="$CLASSPATH:$HADOOP_HOME/src/contrib/thriftfs/lib/hadoopthriftapi.jar"
CLASSPATH="$CLASSPATH:$HADOOP_HOME/src/contrib/thriftfs/lib/libthrift.jar"

# Detach from the terminal: no stdin, stdout+stderr go to the log file.
nohup java -Dcom.sun.management.jmxremote -cp "$CLASSPATH" org.apache.hadoop.thriftfs.HadoopThriftServer 10010 > "$THRIFTFS_LOG_FILE" 2>&1 < /dev/null &
echo $! > "$THRIFTFS_PID_FILE"

# Force the NameNode out of safe mode so clients can write right away.
# NOTE(review): this runs unconditionally; consider leaving safe mode manually
# once the cluster is healthy.
"$HADOOP_HOME/bin/hadoop" dfsadmin -safemode leave

同様に 停止は

#!/usr/bin/env bash
#
# Stop the HadoopThriftServer, using the PID stored in
# $HADOOP_PID_DIR/thrift.pid.

: "${HADOOP_PID_DIR:?HADOOP_PID_DIR is not set}"

THRIFTFS_PID_FILE="$HADOOP_PID_DIR/thrift.pid"

if [ -f "$THRIFTFS_PID_FILE" ]; then
    kill "$(cat "$THRIFTFS_PID_FILE")"
    # The pidfile is removed even if kill failed (e.g. a stale PID), so the
    # server can be started again.
    rm -f "$THRIFTFS_PID_FILE"
else
    echo "no pidfile $THRIFTFS_PID_FILE"
fi

起動する際は dfs($HADOOP_HOME/bin/start-dfs.sh)が起動している状態で起動する必要があります。
(dfsadmin -safemode leave は適宜行ってください)

んで、このThriftFSは、こんな感じでリモート上の DFS のファイルを読み書きできます

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.fs.permission.FsPermission;

import org.apache.hadoop.thriftfs.api.Pathname;
import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
import org.apache.hadoop.thriftfs.api.ThriftHandle;
import org.apache.hadoop.thriftfs.api.ThriftIOException;

import com.facebook.thrift.TException;
import com.facebook.thrift.protocol.TBinaryProtocol;
import com.facebook.thrift.protocol.TProtocol;
import com.facebook.thrift.transport.TSocket;
import com.facebook.thrift.transport.TTransportException;

public class HDFSInput {

    /**
     * Writes "hello world" to /tmp/hoge on the remote HDFS through the
     * HadoopThriftServer at master1:10010, then reads up to 1024 bytes of it
     * back and prints them.
     *
     * @param args unused
     */
    public static void main(String...args) {
        // Client-side Hadoop configuration, used only to pick default sizes.
        Configuration config = new Configuration();
        final int defaultBufferSize = config.getInt("io.file.buffer.size", 4096);
        final long defaultBlockSize = config.getLong("dfs.block.size", FSConstants.DEFAULT_BLOCK_SIZE);
        final short defaultReplication = (short) config.getInt("dfs.replication", 3);

        TSocket socket = new TSocket("master1", 10010);
        // Client timeout: 5 seconds. The timeout belongs to the socket transport
        // and is given in milliseconds.
        socket.setTimeout(5000);
        TProtocol protocol = new TBinaryProtocol(socket);

        try {
            socket.open();
            try {
                ThriftHadoopFileSystem.Client client = new ThriftHadoopFileSystem.Client(protocol);
                Pathname hoge = new Pathname("/tmp/hoge");

                FsPermission permission = FsPermission.createImmutable((short) 0655);
                boolean overwrite = true;

                ThriftHandle writeHandler = client.createFile(hoge, permission.toShort(), overwrite, defaultBufferSize, defaultReplication, defaultBlockSize);
                client.write(writeHandler, "hello world");
                client.close(writeHandler);

                ThriftHandle readHandler = client.open(hoge);
                System.out.println(client.read(readHandler, 0, 1024));
                client.close(readHandler);
            } catch (TTransportException e) {
                e.printStackTrace();
            } catch (ThriftIOException e) {
                e.printStackTrace();
            }
        } catch (TException e) {
            e.printStackTrace();
        } finally {
            socket.close();
        }
    }
}

read時にBufferedReaderにwrapするともう少し便利に読み書きできる(これは今度書く)

んで、読み書きできるようになったんだけど、どうも複数のクライアントから連続して読み書きをすると、整合性が取れなくなってしまう(?)のかエラーがでるようになった。
元のソースを読むと...なんともエレガントな。。。

ということで、java.util.concurrent(AtomicLong と ConcurrentHashMap)を使って書き直してみた(ここが本題)

package org.apache.hadoop.thriftfs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.thriftfs.api.Pathname;
import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
import org.apache.hadoop.thriftfs.api.ThriftHandle;
import org.apache.hadoop.thriftfs.api.ThriftIOException;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;

import com.facebook.thrift.protocol.TBinaryProtocol;
import com.facebook.thrift.server.TServer;
import com.facebook.thrift.server.TThreadPoolServer;
import com.facebook.thrift.transport.TServerSocket;
import com.facebook.thrift.transport.TServerTransport;
import com.facebook.thrift.transport.TTransportFactory;

public class HadoopThriftServer extends ThriftHadoopFileSystem {

    // Port the Thrift server listens on. 0 is only the initial value
    // (presumably replaced by the command-line argument; TODO confirm).
    static int serverPort = 0;                    // default port
    // The Thrift server instance; null until it is created.
    TServer    server = null;

    public static class HadoopThriftHandler implements ThriftHadoopFileSystem.Iface
    {

      // Logger used by all handler code.
      public static final Log LOG = LogFactory.getLog("org.apache.hadoop.thrift");

      // HDFS glue: configuration and the FileSystem the constructor opens from it.
      Configuration conf;
      FileSystem fs;

      // Structure that maps each Thrift handle id to a Hadoop stream object.
      // nextId starts at a random value and is incremented atomically by insert().
      private AtomicLong nextId = new AtomicLong(new Random().nextLong());
      // handle id -> open FSDataInputStream / FSDataOutputStream
      private ConcurrentHashMap<Long, Object> hadoopHash = new ConcurrentHashMap<Long, Object>();
      // Inactivity monitor thread, created in the constructor.
      // NOTE(review): no start() call is visible in this part of the file.
      private Daemon inactivityThread = null;

      // Detect inactive session. These are static, i.e. shared by every handler instance.
      private static volatile long inactivityPeriod = 3600 * 1000; // 1 hr, in ms
      private static volatile long inactivityRecheckInterval = 60 * 1000; // 1 min, in ms
      // Keeps InactivityMonitor looping. NOTE(review): nothing visible here sets it to false.
      private static volatile boolean fsRunning = true;
      // Time (ms) of the most recent RPC; the RPC methods refresh it on entry.
      // Note that this field shares its name with the static method now().
      private AtomicLong now = new AtomicLong(now());

      /**
       * Hook intended to let an outsider change options such as the
       * hadoopthrift path. Currently a no-op: {@code key} and {@code val} are
       * ignored.
       */
      public void setOption(String key, String val) {
      }

      /**
       * Reads the wall clock.
       *
       * @return milliseconds since the epoch, as given by System.currentTimeMillis()
       */
      static long now() {
        final long millis = System.currentTimeMillis();
        return millis;
      }

      /**
       * Reports the version of this Thrift interface implementation.
       *
       * @return the constant {@code "0.1"}
       */
      public String getVersion() {
        final String interfaceVersion = "0.1";
        return interfaceVersion;
      }

      /**
       * Closes the file system and terminates the JVM with {@code status}.
       * A failure to close is logged, including its cause, and does not
       * prevent the exit.
       *
       * @param status exit status passed to Runtime.exit
       */
      public void shutdown(int status) {
        LOG.info("HadoopThriftServer shutting down.");
        try {
          fs.close();
        } catch (IOException e) {
          LOG.warn("Unable to close file system", e);
        }
        Runtime.getRuntime().exit(status);
      }

      /**
       * Periodically checks for inactivity. If no RPC has refreshed {@code now}
       * within {@code inactivityPeriod} ms, it shuts the server down with
       * status -1. It rechecks every {@code inactivityRecheckInterval} ms while
       * {@code fsRunning} is true, and stops when its thread is interrupted.
       */
      class InactivityMonitor implements Runnable {
        public void run() {
          while (fsRunning) {
            try {
              if (now() > now.get() + inactivityPeriod) {
                LOG.warn("HadoopThriftServer Inactivity period of " +
                         inactivityPeriod + " expired... Stopping Server.");
                shutdown(-1);
              }
            } catch (Exception e) {
              LOG.error(StringUtils.stringifyException(e));
            }
            try {
              Thread.sleep(inactivityRecheckInterval);
            } catch (InterruptedException ie) {
              // Treat interruption as a request to stop monitoring. Restore the
              // interrupt status and return; looping again would make sleep()
              // fail immediately on every iteration.
              Thread.currentThread().interrupt();
              return;
            }
          }
        }
      }

      /**
       * HadoopThriftServer
       *
       * Constructor for the HadoopThriftServer glue with Thrift Class.
       * It creates a default Configuration, opens the FileSystem from it, and
       * creates (but does not start) the inactivity monitor thread. It exits
       * the JVM with status -1 if the file system cannot be opened.
       *
       * @param name - the name of this handler (not used by this constructor)
       */
      public HadoopThriftHandler(String name) {
        conf = new Configuration();
        now.set(now());
        try {
          inactivityThread = new Daemon(new InactivityMonitor());
          fs = FileSystem.get(conf);
        } catch (IOException e) {
          LOG.warn("Unable to open hadoop file system...", e);
          Runtime.getRuntime().exit(-1);
        }
      }

      /**
        * printStackTrace
        *
        * Helper that logs an exception to the log instead of stderr. The type,
        * message, stack trace and cause chain go into a single ERROR entry,
        * formatted the same way InactivityMonitor logs its errors.
        *
        * @param e the exception
        */
      static private void printStackTrace(Exception e) {
        LOG.error(StringUtils.stringifyException(e));
      }

      /**
       * Looks up the Hadoop stream registered under a Thrift handle id.
       *
       * @param id handle id returned by insert()
       * @return the registered object, or null if no such handle is open
       */
      private Object lookup(long id) {
        // ConcurrentHashMap is thread-safe on its own; no extra lock is needed.
        return hadoopHash.get(id);
      }

      /**
       * Registers a Hadoop stream under a fresh handle id and returns the id.
       * The AtomicLong counter and the ConcurrentHashMap are each thread-safe,
       * and ids are unique, so no extra lock is needed.
       *
       * @param o the stream to register
       * @return the new handle id
       */
      private long insert(Object o) {
        long id = nextId.incrementAndGet();
        hadoopHash.put(id, o);
        return id;
      }

      /**
       * Unregisters a handle.
       *
       * @param id handle id returned by insert()
       * @return the object that was registered, or null if none was
       */
      private Object remove(long id) {
        // ConcurrentHashMap is thread-safe on its own; no extra lock is needed.
        return hadoopHash.remove(id);
      }

      /*
       * ---- API exported by this thrift server ----
       */

      /**
       * Sets the inactivity timeout. If the server receives no RPC calls for
       * this long, it shuts itself down. The recheck interval is shortened to
       * the period if it was longer.
       *
       * @param periodInSeconds timeout in seconds
       */
      public synchronized void setInactivityTimeoutPeriod(long periodInSeconds) {
        inactivityPeriod = periodInSeconds * 1000; // seconds -> milliseconds
        boolean recheckTooSlow = inactivityRecheckInterval > inactivityPeriod;
        if (recheckTooSlow) {
          inactivityRecheckInterval = inactivityPeriod;
        }
      }


      /**
       * Creates a file with default settings, opens it for writing and
       * registers the stream.
       *
       * @param path file to create
       * @return handle for the new output stream
       * @throws ThriftIOException if HDFS fails to create the file
       */
      public ThriftHandle create(Pathname path) throws ThriftIOException {
        now.set(now());
        LOG.debug("create: " + path);
        try {
          FSDataOutputStream stream = fs.create(new Path(path.pathname));
          long handleId = insert(stream);
          ThriftHandle handle = new ThriftHandle(handleId);
          LOG.debug("created: " + path + " id: " + handleId);
          return handle;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Creates a file with explicit permission, overwrite flag, buffer size,
       * replication and block size, opens it for writing and registers the
       * stream.
       *
       * @return handle for the new output stream
       * @throws ThriftIOException if HDFS fails to create the file
       */
      public ThriftHandle createFile(Pathname path, 
                                     short mode,
                                     boolean  overwrite,
                                     int bufferSize,
                                     short replication,
                                     long blockSize) throws ThriftIOException {
        now.set(now());
        LOG.debug("create: " + path + " permission: " + mode
            + " overwrite: " + overwrite + " bufferSize: " + bufferSize
            + " replication: " + replication + " blockSize: " + blockSize);
        try {
          Path target = new Path(path.pathname);
          FsPermission permission = new FsPermission(mode);
          FSDataOutputStream stream = fs.create(target, permission, overwrite,
                                                bufferSize, replication, blockSize,
                                                null); // no progress callback
          long handleId = insert(stream);
          ThriftHandle handle = new ThriftHandle(handleId);
          LOG.debug("created: " + path + " id: " + handleId);
          return handle;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Opens an existing file for reading and registers the stream.
       *
       * @param path file to open
       * @return handle for the new input stream
       * @throws ThriftIOException if HDFS fails to open the file
       */
      public ThriftHandle open(Pathname path) throws ThriftIOException {
        now.set(now());
        LOG.debug("open: " + path);
        try {
          FSDataInputStream in = fs.open(new Path(path.pathname));
          long handleId = insert(in);
          ThriftHandle handle = new ThriftHandle(handleId);
          LOG.debug("opened: " + path + " id: " + handleId);
          return handle;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Opens an existing file for appending and registers the stream.
       *
       * @param path file to append to
       * @return handle for the new output stream
       * @throws ThriftIOException if HDFS fails to open the file for append
       */
      public ThriftHandle append(Pathname path) throws ThriftIOException {
        now.set(now());
        LOG.debug("append: " + path);
        try {
          FSDataOutputStream stream = fs.append(new Path(path.pathname));
          long handleId = insert(stream);
          ThriftHandle handle = new ThriftHandle(handleId);
          LOG.debug("appended: " + path + " id: " + handleId);
          return handle;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Writes {@code data}, encoded as UTF-8, to an open output handle.
       *
       * @param tout handle returned by create/createFile/append
       * @param data text to write
       * @return true on success
       * @throws ThriftIOException if the handle is not an open output stream
       *         or HDFS reports an I/O error
       */
      public boolean write(ThriftHandle tout, String data) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("write: " + tout.id);
          Object handle = lookup(tout.id);
          if (!(handle instanceof FSDataOutputStream)) {
            // The handle is unknown, already closed, or a read handle. Report it
            // like close() does instead of failing with an NPE or a
            // ClassCastException.
            throw new ThriftIOException("Unknown thrift handle.");
          }
          FSDataOutputStream out = (FSDataOutputStream) handle;
          byte[] tmp = data.getBytes("UTF-8");
          out.write(tmp, 0, tmp.length);
          HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
          return true;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Reads up to {@code length} bytes starting at {@code offset} and
       * decodes them as UTF-8.
       *
       * @param tout handle returned by open
       * @param offset file position to read from
       * @param length maximum number of bytes to read; must not be negative
       * @return the decoded text, or "" if nothing was read (e.g. at end of file)
       * @throws ThriftIOException if the handle is not an open input stream,
       *         the length is negative, or HDFS reports an I/O error
       */
      public String read(ThriftHandle tout, long offset,
                         int length) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("read: " + tout.id +
                                       " offset: " + offset +
                                       " length: " + length);
          Object handle = lookup(tout.id);
          if (!(handle instanceof FSDataInputStream)) {
            // The handle is unknown, already closed, or a write handle.
            throw new ThriftIOException("Unknown thrift handle.");
          }
          if (length < 0) {
            throw new ThriftIOException("Invalid read length: " + length);
          }
          FSDataInputStream in = (FSDataInputStream) handle;

          if (in.getPos() != offset) {
            in.seek(offset);
          }
          byte[] tmp = new byte[length];
          // Positional read; it returns -1 when offset is at end of file.
          int numbytes = in.read(offset, tmp, 0, length);
          HadoopThriftHandler.LOG.debug("read done: " + tout.id);
          if (numbytes <= 0) {
            // new String(tmp, 0, -1, ...) would throw at EOF.
            return "";
          }
          // NOTE(review): a chunk boundary can split a multi-byte UTF-8 character.
          return new String(tmp, 0, numbytes, "UTF-8");
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Deletes a file or directory.
       *
       * @param path      the path to delete
       * @param recursive passed through to {@code FileSystem.delete}
       * @return the result of {@code FileSystem.delete}
       * @throws ThriftIOException if the delete fails with an I/O error
       */
      public boolean rm(Pathname path, boolean recursive) 
                            throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("rm: " + path +
                                       " recursive: " + recursive);
          boolean ret = fs.delete(new Path(path.pathname), recursive);
          // Distinct completion message (the start message is "rm: ..."),
          // matching the "... done" convention of the other handlers.
          HadoopThriftHandler.LOG.debug("rm done: " + path);
          return ret;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Moves (renames) a file or directory.
       *
       * @param path source path
       * @param dest destination path
       * @return the result of {@code FileSystem.rename}
       * @throws ThriftIOException if the rename fails with an I/O error
       */
      public boolean rename(Pathname path, Pathname dest) 
                            throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("rename: " + path +
                                       " destination: " + dest);
          boolean ret = fs.rename(new Path(path.pathname), 
                                  new Path(dest.pathname));
          // Distinct completion message so start and end can be told apart.
          HadoopThriftHandler.LOG.debug("rename done: " + path);
          return ret;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Unregisters the stream behind {@code tout} and closes it.
       *
       * The handle is removed from the table before anything else, so it is
       * gone even if it turns out to be unknown or closing fails.
       *
       * @param tout handle to release
       * @return always {@code true}; failures are reported by exception
       * @throws ThriftIOException if the handle does not refer to an input or
       *         output stream, or if closing the stream fails
       */
      public boolean close(ThriftHandle tout) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("close: " + tout.id);
          final Object stream = remove(tout.id);
          final boolean isOutput = stream instanceof FSDataOutputStream;
          final boolean isInput = stream instanceof FSDataInputStream;
          if (!isOutput && !isInput) {
            throw new ThriftIOException("Unknown thrift handle.");
          }
          if (isOutput) {
            ((FSDataOutputStream) stream).close();
          } else {
            ((FSDataInputStream) stream).close();
          }
          HadoopThriftHandler.LOG.debug("closed: " + tout.id);
          return true;
        } catch (IOException ioe) {
          throw new ThriftIOException(ioe.getMessage());
        }
      }

       /**
        * Create a directory
        */
      public boolean mkdirs(Pathname path) throws ThriftIOException {
        // Returns the result of FileSystem.mkdirs; I/O errors become
        // ThriftIOException.
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("mkdirs: " + path);
          boolean ret = fs.mkdirs(new Path(path.pathname));
          // Distinct completion message so start and end can be told apart.
          HadoopThriftHandler.LOG.debug("mkdirs done: " + path);
          return ret;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /**
       * Reports whether {@code path} exists.
       *
       * @param path the path to check
       * @return the result of {@code FileSystem.exists}
       * @throws ThriftIOException if the check fails with an I/O error
       */
      public boolean exists(Pathname path) throws ThriftIOException {
        final boolean found;
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("exists: " + path);
          found = fs.exists(new Path(path.pathname));
          HadoopThriftHandler.LOG.debug("exists done: " + path);
        } catch (IOException ioe) {
          throw new ThriftIOException(ioe.getMessage());
        }
        return found;
      }

      /**
       * Returns the status of {@code path}, copied into the Thrift
       * {@code FileStatus} structure (the permission is sent as its string form).
       *
       * @param path the path to inspect
       * @return the converted status
       * @throws ThriftIOException if the status lookup fails with an I/O error
       */
      public org.apache.hadoop.thriftfs.api.FileStatus stat(
                              Pathname path) throws ThriftIOException {
        final org.apache.hadoop.fs.FileStatus status;
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("stat: " + path);
          status = fs.getFileStatus(new Path(path.pathname));
          HadoopThriftHandler.LOG.debug("stat done: " + path);
        } catch (IOException ioe) {
          throw new ThriftIOException(ioe.getMessage());
        }
        final String permission = status.getPermission().toString();
        return new org.apache.hadoop.thriftfs.api.FileStatus(
            status.getPath().toString(),
            status.getLen(),
            status.isDir(),
            status.getReplication(),
            status.getBlockSize(),
            status.getModificationTime(),
            permission,
            status.getOwner(),
            status.getGroup());
      }

      /**
       * Lists the entries of the directory {@code path}, each copied into the
       * Thrift {@code FileStatus} structure.
       *
       * @param path directory to list
       * @return one status per directory entry
       * @throws ThriftIOException if listing fails or the path does not exist
       */
      public List<org.apache.hadoop.thriftfs.api.FileStatus> listStatus(
                              Pathname path) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("listStatus: " + path);

          org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus(
                                             new Path(path.pathname));
          HadoopThriftHandler.LOG.debug("listStatus done: " + path);
          // Older FileSystem implementations (e.g. Hadoop 0.20) return null for a
          // missing path instead of throwing. Report that as a Thrift error
          // rather than failing with a NullPointerException on stat.length.
          if (stat == null) {
            throw new ThriftIOException("File does not exist: " + path.pathname);
          }
          List<org.apache.hadoop.thriftfs.api.FileStatus> value =
            new java.util.ArrayList<org.apache.hadoop.thriftfs.api.FileStatus>(stat.length);
          for (org.apache.hadoop.fs.FileStatus entry : stat) {
            value.add(toThriftFileStatus(entry));
          }
          return value;
        } catch (IOException e) {
          throw new ThriftIOException(e.getMessage());
        }
      }

      /** Copies one Hadoop FileStatus into the Thrift FileStatus structure. */
      private org.apache.hadoop.thriftfs.api.FileStatus toThriftFileStatus(
                              org.apache.hadoop.fs.FileStatus s) {
        return new org.apache.hadoop.thriftfs.api.FileStatus(
                    s.getPath().toString(),
                    s.getLen(),
                    s.isDir(),
                    s.getReplication(),
                    s.getBlockSize(),
                    s.getModificationTime(),
                    s.getPermission().toString(),
                    s.getOwner(),
                    s.getGroup());
      }

      /**
       * Sets the permission bits of {@code path}.
       *
       * @param path the path to change
       * @param mode permission bits, wrapped in an {@code FsPermission}
       * @throws ThriftIOException if the change fails with an I/O error
       */
      public void chmod(Pathname path, short mode) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("chmod: " + path + 
                                       " mode " + mode);
          final Path target = new Path(path.pathname);
          final FsPermission permission = new FsPermission(mode);
          fs.setPermission(target, permission);
          HadoopThriftHandler.LOG.debug("chmod done: " + path);
        } catch (IOException ioe) {
          throw new ThriftIOException(ioe.getMessage());
        }
      }

      /**
       * Sets the owner and group of {@code path}.
       *
       * @param path  the path to change
       * @param owner new owner, passed to {@code FileSystem.setOwner}
       * @param group new group, passed to {@code FileSystem.setOwner}
       * @throws ThriftIOException if the change fails with an I/O error
       */
      public void chown(Pathname path, String owner, String group) 
                                                         throws ThriftIOException {
        try {
          now.set(now());
          final String startMessage =
              "chown: " + path + " owner: " + owner + " group: " + group;
          HadoopThriftHandler.LOG.debug(startMessage);
          final Path target = new Path(path.pathname);
          fs.setOwner(target, owner, group);
          HadoopThriftHandler.LOG.debug("chown done: " + path);
        } catch (IOException ioe) {
          throw new ThriftIOException(ioe.getMessage());
        }
      }

      /**
       * Sets the replication factor of a file.
       *
       * @param path the file to change
       * @param repl requested replication factor
       * @throws ThriftIOException if the change fails with an I/O error
       */
      public void setReplication(Pathname path, short repl) throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("setrepl: " + path +
                                       " replication factor: " + repl);
          final Path target = new Path(path.pathname);
          // The boolean result of FileSystem.setReplication is not passed back;
          // this Thrift method returns void.
          fs.setReplication(target, repl);
          HadoopThriftHandler.LOG.debug("setrepl done: " + path);
        } catch (IOException ioe) {
          throw new ThriftIOException(ioe.getMessage());
        }
      }

      /**
       * Returns the block locations HDFS reports for {@code path} over the
       * range given by {@code start} and {@code length}, copied into Thrift
       * {@code BlockLocation} structures.
       *
       * @param path   the file to inspect
       * @param start  start of the byte range
       * @param length length of the byte range
       * @return one Thrift BlockLocation per block returned by HDFS
       * @throws ThriftIOException if the lookup fails with an I/O error
       */
      public List<org.apache.hadoop.thriftfs.api.BlockLocation> 
               getFileBlockLocations(Pathname path, long start, long length) 
                                           throws ThriftIOException {
        try {
          now.set(now());
          HadoopThriftHandler.LOG.debug("getFileBlockLocations: " + path);

          final org.apache.hadoop.fs.FileStatus fileStatus =
              fs.getFileStatus(new Path(path.pathname));
          final org.apache.hadoop.fs.BlockLocation[] blocks =
              fs.getFileBlockLocations(fileStatus, start, length);
          HadoopThriftHandler.LOG.debug("getFileBlockLocations done: " + path);

          final List<org.apache.hadoop.thriftfs.api.BlockLocation> result =
              new LinkedList<org.apache.hadoop.thriftfs.api.BlockLocation>();
          for (org.apache.hadoop.fs.BlockLocation block : blocks) {
            // Copy the HDFS host-name and host:port arrays into Thrift lists.
            final List<String> hosts = new LinkedList<String>();
            for (String host : block.getHosts()) {
              hosts.add(host);
            }
            final List<String> names = new LinkedList<String>();
            for (String name : block.getNames()) {
              names.add(name);
            }
            result.add(new org.apache.hadoop.thriftfs.api.BlockLocation(
                hosts, names, block.getOffset(), block.getLength()));
          }
          return result;
        } catch (IOException ioe) {
          throw new ThriftIOException(ioe.getMessage());
        }
      }
    }

    /**
     * Creates a server socket with SO_REUSEADDR set and binds it to
     * {@code port}. A port of 0 binds to an ephemeral port and stores the
     * chosen port in {@code serverPort}.
     *
     * @param port port to bind, or 0 for any free port
     * @return the bound socket
     * @throws IOException if the socket cannot be created or bound; the
     *         original failure is attached as the cause
     */
    private ServerSocket createServerSocket(int port) throws IOException {
      ServerSocket sock = null;
      try {
        sock = new ServerSocket();
        // Prevent 2MSL delay problem on server restarts
        sock.setReuseAddress(true);
        // Bind to listening port
        if (port == 0) {
          sock.bind(null);
          serverPort = sock.getLocalPort();
        } else {
          sock.bind(new InetSocketAddress(port));
        }
        return sock;
      } catch (IOException ioe) {
        // Do not leak the unbound socket when setup fails.
        if (sock != null) {
          try {
            sock.close();
          } catch (IOException ignored) {
            // Best effort: the setup failure below is the error to report.
          }
        }
        throw new IOException("Could not create ServerSocket on port " + port + "." +
                              ioe, ioe);
      }
    }

    /**
     * Constructs a server object. The first command-line argument, if present,
     * overrides {@code serverPort}. The server is configured but not started;
     * call {@code server.serve()} to run it.
     *
     * @param args command-line arguments; {@code args[0]} is an optional port
     * @throws NumberFormatException if {@code args[0]} is not an integer
     * @throws IllegalStateException if the socket or Thrift server cannot be
     *         set up (the underlying exception is attached as the cause)
     */
    public HadoopThriftServer(String [] args) {

      if (args.length > 0) {
        serverPort = Integer.parseInt(args[0]);
      }
      try {
        ServerSocket ssock = createServerSocket(serverPort);
        TServerTransport serverTransport = new TServerSocket(ssock);
        Iface handler = new HadoopThriftHandler("hdfs-thrift-dhruba");
        ThriftHadoopFileSystem.Processor processor = new ThriftHadoopFileSystem.Processor(handler);
        TThreadPoolServer.Options options = new TThreadPoolServer.Options();
        options.minWorkerThreads = 10;
        server = new TThreadPoolServer(processor, serverTransport,
                                               new TTransportFactory(),
                                               new TTransportFactory(),
                                               new TBinaryProtocol.Factory(),
                                               new TBinaryProtocol.Factory(), 
                                               options);
        System.out.println("Starting the hadoop thrift server on port [" + serverPort + "]...");
        HadoopThriftHandler.LOG.info("Starting the hadoop thrift server on port [" +serverPort + "]...");
        System.out.flush();

      } catch (Exception x) {
        HadoopThriftHandler.LOG.error("Could not start the hadoop thrift server on port ["
                                      + serverPort + "]", x);
        // Fail fast. Swallowing the error would leave 'server' null, and the
        // caller would only fail later with a NullPointerException.
        throw new IllegalStateException("Could not start the hadoop thrift server", x);
      }
    }

    /**
     * Command-line entry point: builds the server (optional port as the first
     * argument) and serves requests on the current thread.
     */
    public static void main(String [] args) {
      final HadoopThriftServer instance = new HadoopThriftServer(args);
      instance.server.serve();
    }
}

安定した。気がする。。
いつか問題となるようなコードを書いてpatchを作ろう

Hive の Local Metastore に derby を使う

ポスト @ 2:37:53 , 修正 @ 2010/02/07 2:39:17 | , ,     

突然ですが、hadoop の話題

Hadoop Hive といえば、SQL 感覚で MapReduce できるんですが、Hive は SQL のように記述できるようにするために、metastore 形式でメタデータを管理してます。
そのあたりは、「Hive/AdminManual/MetastoreAdmin - Hadoop Wiki」、「metastore_usage.pptx」、「HiveのmetastoreをMySQLを使ってLocal Metastore形式で利用する - blog.katsuma.tv」などに詳しく書かれているので省略。

ただ、local metastore にいちいち mysql をセットアップするのもあれだったんで、今回は derby にしておきます

derby編

derby のインストール

Apache Derbyからダウンロードしてきます。とりあえず最新のを落としてくる

> wget http://..../db-derby-10.5.3.0-bin.tar.gz
> tar xzvf db-derby-10.5.3.0-bin.tar.gz
> sudo mv db-derby-10.5.3.0-bin /opt/local/derby

次に、データ保存用の /var/db/derby を作っておく

> sudo mkdir /var/db/derby
> sudo chown nowel:users /var/db/derby

chown は derby 用のユーザでもいいので、実行ユーザに変えておく

環境変数の設定

環境変数名とかは、Hadoop のそれっぽくしておく

#!/usr/bin/env bash

# Derby settings, named after Hadoop's *_HOME / *_OPTS / *_LOG_DIR style.
# derby.drda.host=0.0.0.0 makes the network server accept connections on every
# interface, and no authentication properties are set here: development use only.
DERBY_HOME=/opt/local/derby
DERBY_OPTS="-Dderby.drda.startNetworkServer=true -Dderby.drda.maxThreads=30 -Dderby.system.home=/var/db/derby -Dderby.drda.portNumber=1527 -Dderby.drda.host=0.0.0.0"
DERBY_LOG_DIR=/var/log
export DERBY_HOME DERBY_OPTS DERBY_LOG_DIR

これを $HOME/.derby_profile とかしておいて、.bashrcなんかに

source $HOME/.derby_profile

と記述しておくといちいち source しなくて楽

起動用のスクリプトを用意

start/stop/status くらいは何度か確認することがあるので、こんな感じで用意

start.sh

#!/usr/bin/env bash

# Start the Derby network server in the background, logging to
# $DERBY_LOG_DIR/derby.log. Fail early if the environment is not loaded,
# instead of running /bin/NetworkServerControl or writing /derby.log.
: "${DERBY_HOME:?DERBY_HOME is not set}"
: "${DERBY_LOG_DIR:?DERBY_LOG_DIR is not set}"

logfile="$DERBY_LOG_DIR/derby.log"

nohup "$DERBY_HOME/bin/NetworkServerControl" start > "$logfile" 2>&1 < /dev/null &

stop.sh

#!/usr/bin/env bash

# Ask the running Derby network server to shut down.
: "${DERBY_HOME:?DERBY_HOME is not set}"

"$DERBY_HOME/bin/NetworkServerControl" shutdown

status.sh

#!/usr/bin/env bash

# Print Derby system information, then the network server's runtime information.
: "${DERBY_HOME:?DERBY_HOME is not set}"

ctl="$DERBY_HOME/bin/NetworkServerControl"
"$ctl" sysinfo
"$ctl" runtimeinfo

とこんな感じ。

起動と停止

さっき用意した start.sh ファイル群を $HOME/bin とかに置いているのであれば

> $HOME/bin/start.sh

で起動できる

ちなみに、derbyはstartNetworkServerというスクリプトが用意されているけど、素のまま起動すると、ネットワーク越しにアクセスできない(正確には起動した同一ホストからしかアクセスできない)

ネットワーク越しに利用するならstartNetworkServer -h 0.0.0.0とするか、derby.drda.host=0.0.0.0みたいな変数を利用する。
(ここで結構ハマった...とりあえず、開発用なら 0.0.0.0 で始めるといいかと)

Hive編

hadoop の mapred/hdfs などはセットアップ済みとして進めます。

hive のセットアップ

先に書いておくと、http://www.apache.org/dyn/closer.cgi/hadoop/hive/とかに置いてある hive-0.4.1 を使ってセットアップを進めても

FAILED: Error in metadata: org.datanucleus.jdo.exceptions.TransactionNotReadableException: Cant read fields outside of transactions. You may want to set 'NontransactionalRead=true'.
FailedObject:1[OID]org.apache.hadoop.hive.metastore.model.MDatabase
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask

が発生してなかなか前に進めなくなる。
Hive CLI の起動時に

hive > set javax.jdo.option.NontransactionalRead=true;

としておくことで、一時的になんとかなるけど、接続毎にやることになるので、おすすめはしません。
(jpox.properties を使うというのもあるけど、設定の二重管理になりそうだし、後述する新しいのでは必要なさそうなので今回は 0.4.1 を使わないという方向で進みます)

Hiveをtrunkからもってきてビルドする

たぶん、trunkに入ってるのは hive-0.5.x 系

> svn export http://svn.apache.org/repos/asf/hadoop/hive/trunk hive
> cd hive
> ant package
> :
> : (しばし待つ)
> :
> cp -r build/dist/lib/* lib
> sudo mv hive /opt/local/hive-0.5.0-trunk

環境変数の設定

こんな感じで用意する

#!/usr/bin/env bash

# Hive locations used by the CLI and the hiveserver start/stop scripts.
# NOTE(review): /var/run and /var/log usually need root to write; confirm the
# user running hiveserver can create files there.
HIVE_HOME=/opt/local/hive-0.5.0-trunk
HIVE_CONF_DIR=/home/hive/conf
HIVE_PID_DIR=/var/run
HIVE_LOG_DIR=/var/log
export HIVE_HOME HIVE_CONF_DIR HIVE_PID_DIR HIVE_LOG_DIR

HIVE_CONF_DIR は HADOOP_CONF_DIR と違って、hive-default.xml を $HIVE_HOME/conf から読んでくれない(?)ので、hive-default.xml と hive-log4j.properties は HIVE_CONF_DIR にシンボリックリンクしておく

> ln -s $HIVE_HOME/conf/hive-default.xml $HIVE_CONF_DIR/hive-default.xml
> ln -s $HIVE_HOME/conf/hive-log4j.properties $HIVE_CONF_DIR/hive-log4j.properties

hive-site.xmlを用意

$HIVE_CONF_DIR に hive-site.xml をこんな感じで用意

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>hive.metastore.local</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/hive/warehouse</value>
    </property>
    <property>
        <name>hive.metastore.rawstore.impl</name>
        <value>org.apache.hadoop.hive.metastore.ObjectStore</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:derby://master1:1527/metastore;create=true</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>org.apache.derby.jdbc.ClientDriver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value><!-- !empty --></value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value><!-- !empty --></value>
    </property>
    <property>
        <name>datanucleus.autoCreateTables</name>
        <value>true</value>
    </property>
</configuration>

Hive CLIの起動

とりあえず、derby の metastore が使えるかどうかを確認するため、$HIVE_HOME/bin/hive で Hive CLI を起動して show tables する

> $HIVE_HOME/bin/hive
hive> show tables;
OK
Time taken: 8.613 seconds
hive> exit;

とここまで出れば derby で Local Metastore が使えるようになってます。

おわり

と、ここまで書いてみたけど、Hiveのwikiに同じようなものがあった orz
ref - HiveDerbyServerMode - Hadoop Wiki

2010/01/20

PHPで200行で作る memcached 互換サーバ

ポスト @ 22:00:52 | ,     

まさに誰得。

/**
 * Single-threaded TCP server speaking a subset of the memcached text
 * protocol. Accepted connections are served one at a time.
 */
class MemcachedServer {
    protected $command;
    protected $port;

    /**
     * @param MemcachedCommand $command dispatcher for protocol commands
     * @param int              $port    TCP port to listen on (default 11222)
     */
    public function __construct(MemcachedCommand $command, $port = 11222){
        $this->command = $command;
        $this->port = (int) $port;
    }
    public function start(){
        $this->run();
    }

    /**
     * Listens on 0.0.0.0:$port and serves accepted connections sequentially,
     * forever. A failure on one connection is reported and the loop goes on.
     *
     * @throws Exception when the listening socket cannot be set up
     */
    public function run(){
        $socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if(false === $socket){
            throw self::socketError('socket_create');
        }
        socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);

        if(false === @socket_bind($socket, '0.0.0.0', $this->port)){
            throw self::socketError('socket_bind');
        }
        if(false === @socket_listen($socket)){
            throw self::socketError('socket_listen');
        }

        echo 'server start on tcp://0.0.0.0:', $this->port, PHP_EOL;
        while(true){
            $accept = @socket_accept($socket);
            if(false === $accept){
                continue;
            }
            $handler = new MemcachedAcceptHandler($accept, $this->command);
            $handler->init();
            try {
                $handler->execute();
            } catch (Exception $e) {
                // Do not let one broken connection stop the whole server.
                echo 'connection error: ', $e->getMessage(), PHP_EOL;
            }
            $handler->destroy();
        }
    }

    /** Builds an exception describing the last socket error for $function. */
    protected static function socketError($function){
        $code = socket_last_error();
        $msg = socket_strerror($code);
        return new Exception(sprintf('%s was error(%s):%s', $function, $code, $msg));
    }
}

/**
 * Line-oriented access to one client connection. Lines are terminated by
 * DELIMITER, the memcached text-protocol line ending.
 */
interface StreamReadWrite {
    const DELIMITER = "\r\n";
    /** Writes $str followed by DELIMITER. */
    public function writeLine($str);
    /** Reads one line without its DELIMITER. */
    public function readLine();
}

/**
 * Serves one accepted client socket: reads command lines, hands them to a
 * MemcachedCommand, and writes replies back on the same socket.
 */
class MemcachedAcceptHandler implements StreamReadWrite {
    protected $socket;
    protected $command;
    protected $connected = true;
    public function __construct($socket, MemcachedCommand $command){
        $this->socket = $socket;
        $this->command = $command;
    }
    public function __destruct(){
        if(null !== $this->socket){
            @socket_close($this->socket);
            unset($this->socket);
        }
    }
    public function init(){
        echo 'new connection', PHP_EOL;
    }

    /**
     * Processes command lines until readLine() sees EOF or a read error and
     * clears $connected.
     *
     * @throws RuntimeException if socket_select() fails
     */
    public function execute(){
        while($this->connected){
            $read = array($this->socket);
            $write = array();
            $except = array();
            $select = @socket_select($read, $write, $except, 1);
            if(false === $select){
                throw new RuntimeException('socket_select');
            }
            if($select < 1){
                continue;
            }

            $line = $this->readLine();
            if(null === $line){
                continue;
            }
            // "get hoge foo" => mode "get", args array("hoge", "foo").
            // Split on whitespace instead of taking the first three characters,
            // so longer commands such as "delete" keep their full name.
            $args = preg_split('/\s+/', $line, -1, PREG_SPLIT_NO_EMPTY);
            $mode = array_shift($args);
            $this->command->call($this, $mode, $args);
        }
    }
    public function destroy(){
        echo 'close connection', PHP_EOL;
        @socket_shutdown($this->socket);
    }

    /**
     * Reads one "\r\n"-terminated line, byte by byte.
     *
     * @return string|null the line without its delimiter; null for a blank or
     *                     whitespace-only line, or when the peer is gone (which
     *                     also clears $connected)
     */
    public function readLine(){
        $line = '';
        while(true){
            $buf = @socket_read($this->socket, 1);
            if(false === $buf || '' === $buf){
                // EOF or read error: stop serving this connection.
                $this->connected = false;
                return null;
            }
            $line .= $buf;
            if(self::DELIMITER == substr($line, -2)){
                $line = substr($line, 0, -2);
                break;
            }
        }
        // Strict comparison: empty() would also discard the valid line "0".
        if('' === $line){
            return null;
        }
        if(preg_match('/^\s+$/', $line)){
            return null;
        }
        return $line;
    }
    public function writeLine($str){
        return @socket_write($this->socket, $str . self::DELIMITER);
    }
}

/**
 * Executes one parsed protocol command against a client connection.
 */
interface MemcachedCommand {
    /**
     * @param StreamReadWrite $reader connection to read payload from and reply to
     * @param string          $mode   command name
     * @param array           $args   the command's arguments
     */
    public function call(StreamReadWrite $reader, $mode, array $args);
}

/**
 * Dispatches protocol commands to same-named methods on the concrete class.
 */
abstract class AbstractMemcachedCommand implements MemcachedCommand {
    protected $reflector;
    /**
     * Command names clients may invoke. Only these are dispatched, so internal
     * methods such as call, error, concat or __construct cannot be triggered
     * from the network. Subclasses that add commands extend this list.
     */
    protected $commands = array('get', 'set', 'delete');

    public function __construct(){
        $this->reflector = new ReflectionObject($this);
    }
    protected static function concat(array $a, array $b){
        array_splice($a, count($a), 0, $b);
        return $a;
    }

    /**
     * Invokes the method named $mode with ($rw, ...$args). Replies "ERROR" for
     * unknown commands or when too few arguments are supplied.
     */
    public final function call(StreamReadWrite $rw, $mode, array $args){
        if(!in_array($mode, $this->commands, true) || !$this->reflector->hasMethod($mode)){
            return $this->error($rw);
        }
        $callArgs = self::concat(array($rw), $args);
        // Missing arguments would otherwise raise an error inside the call.
        if(count($callArgs) < $this->reflector->getMethod($mode)->getNumberOfRequiredParameters()){
            return $this->error($rw);
        }
        echo 'command => ', $mode, ' ', join(' ', $args), PHP_EOL;
        return call_user_func_array(array($this, $mode), $callArgs);
    }
    protected function error($rw){
        $rw->writeLine('ERROR');
    }
    protected abstract function get(StreamReadWrite $rw, $keys);
    protected abstract function set(StreamReadWrite $rw, $key, $flag, $expire, $length);
    protected abstract function delete(StreamReadWrite $rw, $key, $expire = 0);
}
/**
 * In-memory get/set/delete backed by a PHP array. Expired entries are dropped
 * lazily, when they are next looked up.
 */
class StorageMemcacheCommand extends AbstractMemcachedCommand {
    /**
     * memcached treats expiration times above 30 days (in seconds) as absolute
     * Unix timestamps instead of offsets from now.
     */
    const MAX_RELATIVE_EXPIRE = 2592000;
    protected $cache = array();

    protected function get(StreamReadWrite $rw, $keys){
        $args = func_get_args();
        array_shift($args);
        foreach($args as $key){
            $value = $this->findLive($key);
            if(null === $value){
                continue;
            }
            // Send the stored byte length so the client reads exactly the data
            // line that follows.
            $rw->writeLine(sprintf('VALUE %s %d %d', $key, $value->flag, strlen($value->value)));
            $rw->writeLine($value->value);
        }
        $rw->writeLine('END');
    }
    protected function set(StreamReadWrite $rw, $key, $flag, $expire, $length){
        $value = new stdClass;
        $value->flag = $flag;
        $value->expire = self::expireAt($expire);
        $value->length = $length;
        $value->value = $rw->readLine();
        $this->cache[$key] = $value;
        $rw->writeLine('STORED');
    }
    /**
     * Removes $key and replies DELETED, or NOT_FOUND when it is absent or has
     * expired. The protocol requires a reply, or the client waits forever.
     */
    protected function delete(StreamReadWrite $rw, $key, $expire = 0){
        if(null === $this->findLive($key)){
            $rw->writeLine('NOT_FOUND');
            return;
        }
        unset($this->cache[$key]);
        $rw->writeLine('DELETED');
    }

    /** Returns the unexpired entry for $key, dropping it if it has expired. */
    protected function findLive($key){
        if(!isset($this->cache[$key])){
            return null;
        }
        $value = $this->cache[$key];
        if(null !== $value->expire && $value->expire < time()){
            unset($this->cache[$key]);
            return null;
        }
        return $value;
    }

    /**
     * Converts a protocol exptime into an absolute timestamp: 0 means "never"
     * (null), large values are already absolute, anything else is relative.
     */
    protected static function expireAt($expire){
        $expire = (int) $expire;
        if(0 === $expire){
            return null;
        }
        if($expire > self::MAX_RELATIVE_EXPIRE){
            return $expire;
        }
        return time() + $expire;
    }
}

// Wire the in-memory command set into the server and serve clients forever.
$command = new StorageMemcacheCommand();
$server = new MemcachedServer($command);
$server->start();

とりあえず、set/get/deleteだけ。threadとか色々ないので、実際には削除されない。。
他にも構文解析とかしてない(しなくていいくらい memcache はシンプルなんだけど)ので上手いことハンドリングしないといけない。。
同時アクセスに難あり。。

(中略)

などなど、色々あるので、開発用にしか向いてない。というか、よっぽどストイックにPHPを愛している人じゃないと向いていないかも。
まぁ最近は色々memcached互換系のがでてきたので、クライアント作成時のdebug用途向けかも。

今回は、PHPのsocket 関数をゴリゴリ使っています。
blocking modeとかはちょっと動きが不安定(?)なので、あまり性能はでない
C 言語版（本家 memcached、ポート 11211）との比較を、この前のスクリプトを使ってやってみた

$target = array(
    array('host' => 'localhost', 'port' => 11211),
    array('host' => 'localhost', 'port' => 11222)
);
foreach($target as $t){
    $memcache = new Memcache;
    $memcache->connect($t['host'], $t['port']);

    $fail = 0;
    $fails = array();
    $elapsed = microtime(true);
    $count = 500;
    for($i = 0; $i < $count; ++$i){
        if(false === $memcache->set('hoge', 1234, 0, 10)){
            echo 'ERROR!!', PHP_EOL;
        }
        if(false === $memcache->set('hoge', 123, 0, 10)){
            echo 'ERROR!!!', PHP_EOL;
        }
        $value = (int) $memcache->get('hoge');
        if(false === $memcache->set('hoge', ((int)$value + 1), 0, 10)){
            echo 'ERROR!!!!', PHP_EOL;
        }
        $result = (int)$memcache->get('hoge');
        if($result != 124){
            $fails[] = $result;
            $fail++;
        }
    }
    echo 'target host => ', $t['host'], ' port =>', $t['port'], PHP_EOL;
    echo 'elapsed: ', (microtime(true) - $elapsed), PHP_EOL;
    echo 'fail => ', $fail, PHP_EOL;
}
target host => localhost port =>11211
elapsed: 0.634283065796
fail => 0

target host => localhost port =>11222
elapsed: 1.12030887604
fail => 0

とまあ、こんな感じ。(そこそこ?)
ちなみに、stream_socketを使って書いてみたものも置いておく。fwriteまわりでハマったので動かないと思うけど。

// 動かない><
/**
 * stream_socket-based variant of the memcached-compatible server. Accepted
 * connections are served one at a time.
 */
class MemcachedServer {
    protected $command;
    protected $port;

    /**
     * @param MemcachedCommand $command dispatcher for protocol commands
     * @param int              $port    TCP port to listen on (default 11222)
     */
    public function __construct(MemcachedCommand $command, $port = 11222){
        $this->command = $command;
        $this->port = (int) $port;
    }
    public function start(){
        $this->run();
    }

    /**
     * Listens on 0.0.0.0:$port and serves connections sequentially, forever.
     *
     * @throws RuntimeException when the listening socket cannot be set up
     */
    public function run(){
        $address = 'tcp://0.0.0.0:' . $this->port;
        $socket = stream_socket_server($address, $code, $msg, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
        if(false === $socket){
            throw new RuntimeException(sprintf('stream_socket_server was error(%s):%s', $code, $msg));
        }
        if(false === stream_set_blocking($socket, 0)){
            throw new RuntimeException('stream_set_blocking: set blocking mode error');
        }

        echo 'server start on ', $address, PHP_EOL;
        while(true){
            // Wait up to one second for a client, then loop again.
            $accept = @stream_socket_accept($socket, 1);
            if(false === $accept){
                continue;
            }
            $handler = new MemcachedAcceptHandler($accept, $this->command);
            $handler->init();
            $handler->execute();
            $handler->destroy();
        }
    }
}

/**
 * Line-oriented access to one client connection. Lines are terminated by
 * DELIMITER, the memcached text-protocol line ending.
 */
interface StreamReadWrite {
    const DELIMITER = "\r\n";
    /** Writes $str as one protocol line. */
    public function writeLine($str);
    /** Reads one line without its DELIMITER. */
    public function readLine();
}

/**
 * Serves one accepted stream: reads command lines, hands them to a
 * MemcachedCommand, and writes replies back on the same stream.
 */
class MemcachedAcceptHandler implements StreamReadWrite {
    protected $socket;
    protected $command;
    public function __construct($socket, MemcachedCommand $command){
        $this->socket = $socket;
        $this->command = $command;
    }
    public function __destruct(){
        if(null !== $this->socket){
            fclose($this->socket);
            unset($this->socket);
        }
    }
    public function init(){
        echo 'new connection', PHP_EOL;
        stream_set_blocking($this->socket, true);
        stream_set_timeout($this->socket, 60);
        // disable writebuffer
        stream_set_write_buffer($this->socket, 0);
    }
    protected function feof(){
        $recv = stream_socket_recvfrom($this->socket, 1, STREAM_PEEK);
        return strlen($recv) < 1;
    }

    /**
     * Processes command lines until readLine() returns null (EOF or a blank
     * line), which ends this connection.
     *
     * @throws RuntimeException if stream_select() fails
     */
    public function execute(){
        while(true){
            $read = array($this->socket);
            $write = array();
            $except = array();
            $select = @stream_select($read, $write, $except, 1);
            if(false === $select){
                throw new RuntimeException('stream_select');
            }
            if($select < 1){
                continue;
            }

            $line = $this->readLine();
            echo 'line => ', $line, PHP_EOL;
            if(null === $line){
                return;
            }
            // "get hoge foo" => mode "get", args array("hoge", "foo").
            // Split on whitespace so longer command names stay intact.
            $args = preg_split('/\s+/', $line, -1, PREG_SPLIT_NO_EMPTY);
            $mode = array_shift($args);
            $this->command->call($this, $mode, $args);
        }
    }
    public function destroy(){
        echo 'close connection', PHP_EOL;
        stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR);
    }

    /**
     * Reads one "\r\n"-terminated line of at most 2048 bytes.
     *
     * @return string|null the line, or null at EOF or for a blank or
     *                     whitespace-only line
     */
    public function readLine(){
        $line = stream_get_line($this->socket, 2048, self::DELIMITER);
        // stream_get_line() returns false at EOF. Compare strictly: empty()
        // would also discard the valid line "0".
        if(false === $line || '' === $line){
            return null;
        }
        if(preg_match('/^\s+$/', $line)){
            return null;
        }
        return $line;
    }

    /**
     * Writes $str terminated by "\r\n". The previous "\n\0" ending is not valid
     * memcached protocol, so clients never recognized the replies.
     */
    public function writeLine($str){
        return fwrite($this->socket, $str . self::DELIMITER);
    }
}

/**
 * Executes one parsed protocol command against a client connection.
 */
interface MemcachedCommand {
    /**
     * @param StreamReadWrite $reader connection to read payload from and reply to
     * @param string          $mode   command name
     * @param array           $args   the command's arguments
     */
    public function call(StreamReadWrite $reader, $mode, array $args);
}

/**
 * Dispatches protocol commands to same-named methods on the concrete class.
 */
abstract class AbstractMemcachedCommand implements MemcachedCommand {
    protected $reflector;
    /**
     * Command names clients may invoke. Only these are dispatched, so internal
     * methods (call, error, concat, __construct) cannot be triggered remotely.
     */
    protected $commands = array('get', 'set');

    public function __construct(){
        $this->reflector = new ReflectionObject($this);
    }
    protected static function concat(array $a, array $b){
        array_splice($a, count($a), 0, $b);
        return $a;
    }

    /**
     * Invokes the method named $mode with ($rw, ...$args). Replies "ERROR" for
     * unknown commands or when too few arguments are supplied.
     */
    public final function call(StreamReadWrite $rw, $mode, array $args){
        if(!in_array($mode, $this->commands, true) || !$this->reflector->hasMethod($mode)){
            return $this->error($rw);
        }
        $callArgs = self::concat(array($rw), $args);
        if(count($callArgs) < $this->reflector->getMethod($mode)->getNumberOfRequiredParameters()){
            return $this->error($rw);
        }
        return call_user_func_array(array($this, $mode), $callArgs);
    }
    protected function error($rw){
        $rw->writeLine('ERROR');
    }
    protected abstract function get(StreamReadWrite $rw, $keys);
    protected abstract function set(StreamReadWrite $rw, $key, $flag, $expire, $length);
}
/**
 * In-memory get/set backed by a PHP array. Expired entries are dropped
 * lazily, when they are next looked up.
 */
class StorageMemcacheCommand extends AbstractMemcachedCommand {
    /**
     * memcached treats expiration times above 30 days (in seconds) as absolute
     * Unix timestamps instead of offsets from now.
     */
    const MAX_RELATIVE_EXPIRE = 2592000;
    protected $cache = array();

    protected function get(StreamReadWrite $rw, $keys){
        $args = func_get_args();
        array_shift($args);
        foreach($args as $key){
            $value = $this->findLive($key);
            if(null === $value){
                continue;
            }
            // The third VALUE field is the data length in bytes; the original
            // sent the expire time here, which breaks client parsing.
            $rw->writeLine(sprintf('VALUE %s %d %d', $key, $value->flag, strlen($value->value)));
            $rw->writeLine($value->value);
        }
        $rw->writeLine('END');
    }
    protected function set(StreamReadWrite $rw, $key, $flag, $expire, $length){
        $value = new stdClass;
        $value->flag = $flag;
        $value->expire = self::expireAt($expire);
        $value->length = $length;
        $value->value = $rw->readLine();
        $this->cache[$key] = $value;
        $rw->writeLine('STORED');
    }

    /** Returns the unexpired entry for $key, dropping it if it has expired. */
    protected function findLive($key){
        if(!isset($this->cache[$key])){
            return null;
        }
        $value = $this->cache[$key];
        if(null !== $value->expire && $value->expire < time()){
            unset($this->cache[$key]);
            return null;
        }
        return $value;
    }

    /**
     * Converts a protocol exptime into an absolute timestamp: 0 means "never"
     * (null), large values are already absolute, anything else is relative.
     */
    protected static function expireAt($expire){
        $expire = (int) $expire;
        if(0 === $expire){
            return null;
        }
        if($expire > self::MAX_RELATIVE_EXPIRE){
            return $expire;
        }
        return time() + $expire;
    }
}

// Wire the in-memory command set into the server and serve clients forever.
$command = new StorageMemcacheCommand();
$server = new MemcachedServer($command);
$server->start();

memcacheの cache の部分を java で(その2)

ポスト @ 0:21:29 , 修正 @ 2010/01/20 0:32:43 | ,     

前回の続き。

前回の結果、とりあえず、DelayQueueによって期限切れのEntryは取り出せるようになった。
だけど、この仕組みの状態では期限切れになったEntryは容赦なく消えていってしまう。
つまり、そのEntryがまだ利用されるかもしれないのに、消えてしまうのは(色々な意味で)もったいない。
特に、javaだとHashMapとかのloadFactorまわりの動きは(きっと)もったいない

もう一度LRUについてwikipediaに確認すると

Least Recently Used (LRU) はキャッシュメモリや仮想メモリが扱うデータのリソースへの割り当てを決定するアルゴリズムである。対義語はMost Recently Used (MRU)。
和訳すると「最近最も使われなかったもの」つまり「使われてから最も長い時間が経ったもの」「参照される頻度が最も低いもの」である。

ということなので(?)、ホントに使われなかったもの(アクセスが少ないもの)から順番に消えるように考えてみた。

その結果がこれ

/**
 * Cache that evicts its least-used entries. Every access raises an entry's
 * priority and every expiry lowers it. An expired entry is re-scheduled with a
 * back-off delay until its priority drops below 1; then it is removed.
 *
 * <p>Expired entries are taken from {@link #expiredQueue} by a
 * {@link LifeCycleExecutor} and handed to {@link #purge(PrioritalEntry)}.
 */
public class LRUCache implements CacheLifeCycle {
    
    private static final Log log = LogFactory.getLog(LRUCache.class);

    /**
     * Seconds passed to {@link PrioritalEntry#setExpiredAt(long)} to mean "never
     * expires" (about 146 million years). It is half of the representable range,
     * so that "now + offset" in milliseconds cannot overflow into the past.
     */
    private static final long NEVER_EXPIRES_SECONDS = Long.MAX_VALUE / 2000L;
    
    protected final ConcurrentMap<String, PrioritalEntry> cache = new ConcurrentHashMap<String, PrioritalEntry>();

    protected final DelayQueue<PrioritalEntry> expiredQueue =  new DelayQueue<PrioritalEntry>();
    
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    
    private final Lock writeLock = lock.writeLock();
    
    private final Lock readLock = lock.readLock();
    
    /** Lets the executor monitor this cache's expiry queue. */
    public void register(LifeCycleExecutor executor){
        executor.add(this, expiredQueue);
    }
    
    /**
     * Handles an entry whose delay has elapsed. Its priority is lowered. Below 1
     * it is evicted; otherwise it is re-queued with a back-off delay.
     */
    public void purge(PrioritalEntry entry){
        writeLock.lock();
        try {
            if(entry.decrementPriority() < 1){
                // Priority below 1: apparently nobody uses it any more.
                if(log.isDebugEnabled()){
                    log.debug("purge entry => " + entry);
                }
                // Only remove the mapping if it still points at this very instance.
                cache.remove(entry.getKey(), entry);
                return;
            }
            // Another chance: the entry was taken off the queue, so offering it is safe.
            entry.setExpiredAt(retransmission(entry.getPriority()));
            expiredQueue.offer(entry);
        } finally {
            writeLock.unlock();
        }
    }
    
    /**
     * Stores a value.
     *
     * @param expiredAt 0 = never expires, negative = already expired,
     *                  positive = seconds from now
     * @return true when the value was stored
     */
    public boolean set(String key, String value, long expiredAt){
        writeLock.lock();
        try {
            PrioritalEntry newEntry = new PrioritalEntry(key, value, expiredAt);
            PrioritalEntry previousEntry = cache.putIfAbsent(key, newEntry);
            if(null == previousEntry){
                // New key: schedule its expiry.
                return expiredQueue.offer(newEntry);
            }
            // Existing key: reuse the entry so its usage history (priority) is kept.
            previousEntry.setValue(value);
            // setExpiredAt treats 0 as "now", but for set() 0 means "never expires".
            reschedule(previousEntry, 0 == expiredAt ? NEVER_EXPIRES_SECONDS : expiredAt);
            previousEntry.incrementPriority();
            return true;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Returns the live entry for the key and raises its priority. Returns null
     * when the key is absent or its entry has expired.
     */
    public PrioritalEntry get(String key) {
        readLock.lock();
        try {
            final PrioritalEntry entry = cache.get(key);
            if(null == entry || entry.isExpired()){
                return null;
            }
            // Someone used it, so it deserves a higher priority.
            entry.incrementPriority();
            return entry;
        } finally {
            readLock.unlock();
        }
    }

    /** Expires the key now. */
    public void remove(String key) {
        remove(key, 0);
    }
    
    /** Sets the key's expiry to the given seconds from now and lowers its priority. */
    public void remove(String key, long expiredAt){
        writeLock.lock();
        try {
            PrioritalEntry entry = cache.get(key);
            if(null != entry){
                reschedule(entry, expiredAt);
                entry.decrementPriority();
            }
        } finally {
            writeLock.unlock();
        }
    }
    
    /** Expires every entry now. */
    public void flush() {
        flush(0);
    }
    
    /** Sets every entry's expiry to the given seconds from now. */
    public void flush(final long expiredAt){
        writeLock.lock();
        try {
            for(final PrioritalEntry value : cache.values()){
                reschedule(value, expiredAt);
            }
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Changes an entry's deadline without corrupting the DelayQueue.
     *
     * <p>The queue is a priority heap that only re-orders elements on insert and
     * remove. A queued entry must therefore leave the queue before its deadline
     * changes and be offered again afterwards.
     *
     * <p>An entry that is not queued has been taken by the monitor and is about
     * to go through purge(). purge() re-queues it, and also overwrites the
     * deadline set here.
     *
     * <p>Must be called with {@code writeLock} held.
     */
    private void reschedule(final PrioritalEntry entry, final long expiredAt){
        final boolean queued = expiredQueue.remove(entry);
        entry.setExpiredAt(expiredAt);
        if(queued){
            expiredQueue.offer(entry);
        }
    }
    
    /**
     * Returns the extra lifetime in seconds for an expired entry that gets
     * another chance. It grows roughly as 2^priority; priorities at or above
     * MAX_PRIORITY are treated as MAX_PRIORITY - 1.
     */
    protected static long retransmission(final int currentPriority){
        if(currentPriority < PrioritalEntry.MAX_PRIORITY){
            // exponential back-off smoothed with 7/8 and 1/8 weights
            long freq = currentPriority + Math.round(Math.pow(2, currentPriority));
            return Math.round(0.875 * freq) + Math.round(0.125 * currentPriority);
        }
        return retransmission(PrioritalEntry.MAX_PRIORITY - 1);
    }
}

んで、PrioritalEntryの実装はこれ

/**
 * Cache entry with an absolute expiry deadline and a usage priority. Entries
 * are ordered by deadline so they can live in a DelayQueue.
 */
public class PrioritalEntry implements Delayed {
    
    /** Upper bound reached by {@link #incrementPriority()}. */
    public static final int MAX_PRIORITY = 10;
    
    /** Priority of a newly created entry. */
    public static final int DEFAULT_PRIORITY = 5;
    
    private final AtomicInteger priority = new AtomicInteger(DEFAULT_PRIORITY);
    
    private final String key;
    
    private final AtomicReference<String> value;
    
    /** Absolute deadline in epoch milliseconds. */
    private final AtomicLong expiredAt;
    
    /**
     * @param key       cache key
     * @param value     cached value
     * @param expiredAt 0 = never expires, negative = already expired,
     *                  positive = seconds from now
     */
    public PrioritalEntry(final String key, final String value, final long expiredAt){
        this.key = key;
        this.value = new AtomicReference<String>(value);
        if(0 == expiredAt){
            this.expiredAt = new AtomicLong(Long.MAX_VALUE);
        } else if(expiredAt < 0) {
            this.expiredAt = new AtomicLong(0);
        } else {
            this.expiredAt = new AtomicLong(relative(expiredAt));
        }
    }

    /**
     * Returns the current time plus the given seconds, in epoch milliseconds.
     * Saturates at Long.MAX_VALUE instead of overflowing into the past.
     */
    protected static long relative(long expiredAt){
        final long now = System.currentTimeMillis();
        final long delta = TimeUnit.SECONDS.toMillis(expiredAt);
        if(delta > Long.MAX_VALUE - now){
            return Long.MAX_VALUE;
        }
        return now + delta;
    }
    
    /** Converts seconds to milliseconds. */
    protected static long absolute(long expiredAt){
        return TimeUnit.SECONDS.toMillis(expiredAt);
    }
    
    /**
     * Raises the priority by one, never above MAX_PRIORITY.
     * The cap is enforced atomically, so concurrent callers cannot overshoot it.
     *
     * @return the priority after the call
     */
    public int incrementPriority(){
        while(true){
            final int current = priority.get();
            if(MAX_PRIORITY <= current){
                return MAX_PRIORITY;
            }
            if(priority.compareAndSet(current, current + 1)){
                return current + 1;
            }
        }
    }
    
    /** Lowers the priority by one (no lower bound) and returns the new value. */
    public int decrementPriority(){
        return priority.decrementAndGet();
    }
    
    public int getPriority(){
        return priority.get();
    }

    public String getKey(){
        return key;
    }
    
    public String getValue(){
        return value.get();
    }
    
    public void setValue(String value){
        this.value.set(value);
    }
    
    /** Sets the deadline to the given number of seconds from now (0 = now). */
    public void setExpiredAt(long newValue){
        this.expiredAt.set(relative(newValue));
    }

    /** Milliseconds left until the deadline; zero or negative once expired. */
    private long elapsed(){
        return expiredAt.get() - System.currentTimeMillis();
    }
    
    public boolean isExpired(){
        return elapsed() < 1;
    }

    /** Remaining time until the deadline, in the requested unit. */
    public long getDelay(TimeUnit unit) {
        return unit.convert(elapsed(), TimeUnit.MILLISECONDS);
    }
    
    /**
     * Orders entries by deadline (earliest first). Other Delayed implementations
     * are compared by remaining delay instead of throwing ClassCastException.
     */
    public int compareTo(Delayed o) {
        if(o == this){
            return 0;
        }
        final long x;
        final long y;
        if(o instanceof PrioritalEntry){
            x = expiredAt.get();
            y = ((PrioritalEntry) o).expiredAt.get();
        } else {
            x = getDelay(TimeUnit.MILLISECONDS);
            y = o.getDelay(TimeUnit.MILLISECONDS);
        }
        if(x < y){
            return -1;
        }
        if(x > y){
            return 1;
        }
        return 0;
    }
    
    public String toString(){
        StringBuilder buf = new StringBuilder();
        buf.append("key=").append(key).append(",");
        buf.append("value=").append(value).append(",");
        buf.append("priority=").append(priority);
        return buf.toString();
    }
    
}

他にも

/**
 * Contract between a cache and the {@link LifeCycleExecutor} that watches its
 * expiry queue.
 */
public interface CacheLifeCycle {
    /** Hands this cache's expiry queue to the executor for monitoring. */
    void register(LifeCycleExecutor executor);

    /** Receives an entry whose delay has elapsed. */
    void purge(PrioritalEntry entry);
}

/**
 * Runs one monitor task per registered cache. Each monitor takes expired
 * entries from the cache's queue and passes them to
 * {@link CacheLifeCycle#purge(PrioritalEntry)}.
 */
public class LifeCycleExecutor {
    
    /** Grace period used by {@link #shutdown()}. */
    private static final long DEFAULT_SHUTDOWN_TIMEOUT_SECONDS = 10;
    
    protected final ExecutorService executor;
    
    public LifeCycleExecutor(final ExecutorService executor){
        this.executor = executor;
    }
    
    /** Starts a monitor that feeds entries taken from {@code queue} to {@code lifeCycle}. */
    public void add(CacheLifeCycle lifeCycle, BlockingQueue<PrioritalEntry> queue){
        executor.execute(new Monitor(lifeCycle, queue));
    }
    
    /** Stops the executor, waiting up to 10 seconds before interrupting its tasks. */
    public void shutdown(){
        shutdown(DEFAULT_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }
    
    /**
     * Stops the executor, waiting up to the given time before interrupting its
     * tasks. Monitors only stop when interrupted, so the interrupt step is what
     * ends them.
     */
    public void shutdown(final long timeout, final TimeUnit unit){
        executor.shutdown();
        try {
            if(!executor.awaitTermination(timeout, unit)){
                executor.shutdownNow();
            }
        } catch(InterruptedException e){
            // Still stop the monitors, and preserve the caller's interrupt status.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
    
    private static class Monitor implements Runnable {
        private static final Log log = LogFactory.getLog(Monitor.class);
        private final CacheLifeCycle lifeCycle;
        private final BlockingQueue<PrioritalEntry> queue;
        private Monitor(CacheLifeCycle lifeCycle, BlockingQueue<PrioritalEntry> queue){
            this.lifeCycle = lifeCycle;
            this.queue = queue;
        }
        public void run(){
            try {
                while(!Thread.currentThread().isInterrupted()){
                    final PrioritalEntry entry = queue.take();
                    if(log.isDebugEnabled()){
                        log.debug("find entry => " + entry);
                    }
                    try {
                        lifeCycle.purge(entry);
                    } catch(RuntimeException e){
                        // A failing purge must not end the monitor, or expiry stops for the whole cache.
                        log.error("purge failed => " + entry, e);
                    }
                }
            } catch(InterruptedException e){
                // Interruption (shutdownNow) is the stop signal; keep the flag set for the pool.
                Thread.currentThread().interrupt();
            }
        }
    }

}

細かく解説していく(誰に?)と

まず、PrioritalEntry(スペルとか意味は気にしない)の実装側から

ほとんどの実装は前回のと同じ。
今回は他にpriority(優先度)も一緒に持ってみた
インスタンス生成時は

    // Upper bound used by incrementPriority().
    public static final int MAX_PRIORITY = 10;
    
    // Priority every new entry starts with.
    public static final int DEFAULT_PRIORITY = 5;
    
    // Usage counter: raised by get()/set(), lowered on each expiry in purge().
    private final AtomicInteger priority = new AtomicInteger(DEFAULT_PRIORITY);

で作られるんだけど、LRUCacheのgetとかの呼び出しが行われる毎に、priorityをincrementしてる

// class LRUCache
    // Returns the entry for the key and raises its priority;
    // null when the key is absent or the entry has expired.
    public PrioritalEntry get(String key) {
        readLock.lock();
        try {
            if(!cache.containsKey(key)){
                return null;
            }
            
            PrioritalEntry entry = cache.get(key);
            // Expired: hide it from callers.
            if(entry.isExpired()){
                return null;
            }
            // Someone used it, so raise its priority.
            entry.incrementPriority();
            return entry;
        } finally {
            readLock.unlock();
        }
    }

// class PrioritalEntry
    // Raises the priority by one, meant to be capped at MAX_PRIORITY.
    // NOTE(review): `MAX_PRIORITY < priority.get()` still increments when the
    // priority equals MAX_PRIORITY, so it can reach MAX_PRIORITY + 1; the
    // check and the increment are also not one atomic step.
    public int incrementPriority(){
        if(MAX_PRIORITY < priority.get()){
            return MAX_PRIORITY;
        }
        return priority.incrementAndGet();
    }
    
    // Lowers the priority by one (no lower bound) and returns the new value.
    public int decrementPriority(){
        return priority.decrementAndGet();
    }
    
    // Current priority.
    public int getPriority(){
        return priority.get();
    }

これで、最も "使われる頻度の高い" Entryは "priorityが高い" という状態を作り出してる。

その他にも、setで値が更新される際にも、既に同じkeyでEntryが作られてたらその部分を "再利用" してみた

    // Stores a value; an existing entry for the key is reused and its priority raised.
    // NOTE(review): the reused entry's deadline is changed while it is still inside
    // the DelayQueue, which does not re-order it. Also, setExpiredAt(0) means "now",
    // whereas the constructor treats 0 as "never expires".
    public boolean set(String key, String value, long expiredAt){
        writeLock.lock();
        try {
            PrioritalEntry newEntry = new PrioritalEntry(key, value, expiredAt);
            PrioritalEntry previousEntry = cache.putIfAbsent(key, newEntry);
            // No existing entry for this key...
            if(null == previousEntry){
                // ...so it is a new value: schedule its expiry.
                return expiredQueue.offer(newEntry);
            }
            previousEntry.setValue(value);
            previousEntry.setExpiredAt(expiredAt);
            previousEntry.incrementPriority();
            return true;
        } finally {
            writeLock.unlock();
        }
    }

ということで、priorityという値によって最近使われているかどうかまで、表現できたので、次は値の削除について

    // Called with an entry whose delay has elapsed: evict it, or give it another chance.
    public void purge(PrioritalEntry entry){
        writeLock.lock();
        try {
            // Lower the priority.
            if(entry.decrementPriority() < 1){
                // Below 1: apparently nobody uses it any more, so evict it.
                if(log.isDebugEnabled()){
                    log.debug("purge entry => " + entry);
                }
                cache.remove(entry.getKey());
                return;
            }
            // One more chance: re-queue with a back-off delay based on the priority.
            entry.setExpiredAt(retransmission(entry.getPriority()));
            expiredQueue.offer(entry);
        } finally {
            writeLock.unlock();
        }
    }

purgeというメソッドでは、LifeCycleExecutorによって、DelayQueueの中身が取り出された "期限切れ" の Entry が引数に渡される

        // Monitor loop: blocks until an entry's delay elapses, then hands it to purge().
        public void run(){
            try {
                while(true){
                    PrioritalEntry entry = queue.take();
                    if(log.isDebugEnabled()){
                        log.debug("find entry => " + entry);
                    }
                    lifeCycle.purge(entry);
                }
            } catch(InterruptedException e){
                // Interruption ends the loop; the interrupt status is not restored here.
            }
        }

期限切れの Entry であっても、すぐに削除は行わずに "再利用" されるチャンスを与えるため、期限切れの Entry であっても Delay Queue に再送(retransmission)している。
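ただ単純に再送するのではなく、再送のアルゴリズムとして昔どこかで覚えた "binary exponential backoff" を使ってみた(うろ覚えバージョン)。厳密な binary exponential backoff ではなく、待ち時間が 2^priority に比例して伸びる近似で、0.875 / 0.125 という重みは TCP の RTT 平滑化(RFC 6298)で使われる係数 7/8・1/8 と同じ値。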

// back-off delay in seconds (passed to setExpiredAt): grows roughly as 2^priority,
// smoothed with 7/8 and 1/8 weights ("binary exponential backoff" in the article)
long freq = currentPriority + Math.round(Math.pow(2, currentPriority));
return Math.round(0.875 * freq) + Math.round(0.125 * currentPriority);

これを実行してみるとこんな時間が求められる

retransmission(0) => 1
retransmission(1) => 3
retransmission(2) => 5
retransmission(3) => 10
retransmission(4) => 19
retransmission(5) => 33
retransmission(6) => 62
retransmission(7) => 119
retransmission(8) => 232
retransmission(9) => 457
:
:
:

これは再送時間を作るときに、本来ならリトライ回数を使うんだけど(リトライの数が多いほど、次回の再送時間までの待ちが増える、よってこまめに再送させない)、ここでは priority を使用している。
これによって、priorityが高いものほど、時間切れでも生存時間を多くすることができる。これによって再利用のチャンスが増えるはず。(priorityが高い = 最も再利用される可能性が高そう)

ということで、簡単に動かしてみる

// Wire the cache to a monitor thread that receives expired entries.
LRUCache cache = new LRUCache();
LifeCycleExecutor executor = new LifeCycleExecutor(Executors.newCachedThreadPool());
cache.register(executor);

// Expiry in seconds: hoge after 5s, foo and bar after 2s.
cache.set("hoge", "123", 5);
cache.set("foo", "456", 2);
cache.set("bar", "789", 2);

// Only foo is read, so it starts with a higher priority than bar.
cache.get("foo");

// Let the monitor purge everything (output shown below).
try {
    TimeUnit.SECONDS.sleep(120);
} catch(InterruptedException e){
}

executor.shutdown();

で、これを動かしてみると。。。

LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=6
LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=5
LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=5
LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=4
LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=4
LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=3
LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=3
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=5
LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=2
LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=2
LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=1
LRUCache: purge entry => key=bar,value=789,priority=0
LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=1
LRUCache: purge entry => key=hoge,value=123,priority=0
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=4
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=3
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=2
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=1
LRUCache: purge entry => key=foo,value=456,priority=0

アクセスのあった "foo" キーは、 同じタイムアウトが設定されている "bar" よりも後に削除されている。
これでなんちゃって LRU はできた(?)

ちなみに、writeLockやreadLockを使っているけど、たぶん必要ない。というか使わなくてもいいはず。

ちなみに、本家 memcached は slab allocationとかを使っているので、メモリの効率からいうとそっちの方がいいかも。
こういった部分を効率よく切り替えれるようにしたいものです

次回は、分散ハッシュ(というか複数台で連携)など。

2010/01/17

最近の我が家の電源まわり事情

ポスト @ 20:39:11 , 修正 @ 2010/01/17 21:21:32 | ,     

最近、我が家の電源まわりが結構すっきりしてきたので、一部紹介しようかと思います

電源タップがすっきりした!

以前はごちゃごちゃしていた、電源まわりが↓のように非常にすっきりしてます。

というのも、ここ最近あらゆる電源まわりがUSB(mini-usb/micro-usb)経由で充電できるものが増えてきたため、USBハブに電源まわりを一元化させてます。

(タコ足ならぬタコUSB状態)

ネット環境もイーモバのPocketWifiにすることで、大きかった(十分小さいけど)、バッファローのルータなどはおさらばです。
でも、その代わりにUSBで電源を供給できるようになった!

100円ショップを活用

ほぼ全てのガジェット関係はUSBから電源を供給できるようになっているものの、一部、付属してなかったりオプションだったりするのですが
最近の100円ショップは進化してます。
USB部分が、mini-usb化されていて、必要なオプションだけ選んで買えというスタイル。

(先端の小さいフタみたいのを個別に買ってくる。これでDSもUSBから電源を供給している)

そうそう。こんな感じのが100円ショップで売ってる

しかも100円ショップでも1メートル級のUSBケーブルが売っていたりするので、電源から遠い場所でも安心!

(びよーん。比較にipod mini置いてみた)

なんでもUSB

ただ、100円ショップは微妙に品揃えが悪かったりする。例えば、iPod関係のUSBポートが売ってなかったりする。。
そんな時はドンキホーテ。100円ショップに匹敵かそれ以上のUSB関係の機器が売ってます。

(これはドンキで買ってきた。iPod/iPod Touch用とiPod mini用)

もう、USBまわりを調整するなら100円ショップとドンキがあれば確実です。

常に動かすものも、USBで

ちょっと例が悪いかもしれないですが、最近池袋にあるゲーセンのUFOキャッチャーで取れたこれ。

(画像が汚いけど、これ・・加湿器なんだぜ・・・)

夏は、扇風機もUSB経由になることは、すでに予想されます

(扇風機もUSB・・・卓上用だけど・・・)

高度にUSB化された家電ライフを目指して

たぶん、100円ショップとドンキホーテの企業努力で、今後とも我が家の電源は、ありとあらゆるものはUSB化されていくと思います。

engadgetで読んだけど、"電源から直接USB"とか"49ポートUSB"のが日本でもぞくぞく登場すると嬉しいなぁ。

今は、USBハブ(4ポートとか)をタコHub化してて使い勝手が悪い。

ってか、たぶん、正しいUSBHubはこんな感じだと思う

(家中のUSBメモリを挿してみました(合計4+8+2GB)。思ったより少なかった)

では、Happy USB Life を!

2010/01/15

memcacheの cache の部分を java で

ポスト @ 0:06:52 , 修正 @ 2010/01/15 0:11:13 | ,     

指定時間後に消える(というか参照出来なくなる)をやってみることに。
DelayQueueとDelayedで実装してみる。

Cacheの部分

/**
 * Cache whose entries become invisible once their expiry time has passed.
 * Expired entries are removed from the map lazily, at the start of the next set().
 */
public class RetireCache implements Cache {
    
    protected final ConcurrentMap<String, PeriodEntry> cache = new ConcurrentHashMap<String, PeriodEntry>();
    
    protected final DelayQueue<PeriodEntry> expiredQueue =  new DelayQueue<PeriodEntry>();

    /**
     * Stores a value, replacing any previous entry for the key.
     *
     * @param expiredAt 0 = never expires, negative = already expired,
     *                  positive = seconds from now
     */
    public boolean set(String key, String value, long expiredAt) {
        check();
        final PeriodEntry entry = new PeriodEntry(key, value, expiredAt);
        final PeriodEntry previous = cache.put(key, entry);
        if(null != previous){
            // The replaced entry must leave the queue; otherwise its expiry would
            // later evict the new value stored under the same key.
            expiredQueue.remove(previous);
        }
        return expiredQueue.add(entry);
    }

    /** Returns the entry for the key, or null when it is absent or expired. */
    public Entry get(String key) {
        // Single lookup: check() may remove the key concurrently, so a separate
        // containsKey() followed by get() could yield null and fail with a NullPointerException.
        final PeriodEntry entry = cache.get(key);
        if(null == entry || entry.isExpired()){
            return null;
        }
        return entry;
    }
    
    /** Expires the key now. */
    public void remove(String key) {
        remove(key, 0);
    }
    
    /** Sets the key's expiry to the given number of seconds from now. */
    public void remove(String key, long expiredAt){
        PeriodEntry entry = cache.get(key);
        if(null != entry){
            reschedule(entry, expiredAt);
        }
    }
    
    /** Expires every entry now. */
    public void flush() {
        flush(0);
    }
    
    /** Sets every entry's expiry to the given number of seconds from now. */
    public void flush(long expiredAt){
        for(final PeriodEntry entry : cache.values()){
            reschedule(entry, expiredAt);
        }
    }
    
    /** Drops every entry whose delay has elapsed from the map. */
    protected void check(){
        synchronized(this){
            PeriodEntry e = null;
            while((e = expiredQueue.poll()) != null){
                // Remove only if the key still maps to this exact entry.
                cache.remove(e.getKey(), e);
            }
        }
    }

    /**
     * Changes an entry's deadline while keeping the DelayQueue consistent. The
     * queue is a heap ordered at insert time, so a queued entry is taken out,
     * updated and put back.
     */
    private void reschedule(final PeriodEntry entry, final long expiredAt){
        final boolean queued = expiredQueue.remove(entry);
        entry.setExpiredAt(expiredAt);
        if(queued){
            expiredQueue.add(entry);
        }
    }
}

mapにputするときは、同じentryのインスタンスをqueueにも入れておく
getするときには、isExpiredを見て期限切れはnullを返す。

なんとなく、期限切れのqueueの参照にスレッドを立てるのもあれなので、setの時にentryの削除をやってみる

Entryの部分

/**
 * Immutable key/value pair with an expiry deadline. Entries are ordered by
 * deadline so they can live in a DelayQueue.
 */
public class PeriodEntry implements Entry, Delayed {
    
    protected final String key;
    
    protected final String value;
    
    /**
     * Absolute deadline in epoch milliseconds. It is volatile because get(), the
     * DelayQueue and remove()/flush() read and write it from different threads,
     * and plain long writes are not guaranteed atomic or visible.
     */
    protected volatile long expiredAt;
    
    /**
     * @param expiredAt 0 = never expires, negative = already expired,
     *                  positive = seconds from now
     */
    public PeriodEntry(final String key, final String value, final long expiredAt){
        this.key = key;
        this.value = value;
        if(0 == expiredAt){
            this.expiredAt = Long.MAX_VALUE;
        } else if(expiredAt < 0) {
            this.expiredAt = 0;
        } else {
            this.expiredAt = relative(expiredAt);
        }
    }
    
    /**
     * Returns the current time plus the given seconds, in epoch milliseconds.
     * Saturates at Long.MAX_VALUE instead of overflowing into the past.
     */
    protected static long relative(long expiredAt){
        final long now = System.currentTimeMillis();
        final long delta = TimeUnit.SECONDS.toMillis(expiredAt);
        if(delta > Long.MAX_VALUE - now){
            return Long.MAX_VALUE;
        }
        return now + delta;
    }
    
    /** Converts seconds to milliseconds. */
    protected static long absolute(long expiredAt){
        return TimeUnit.SECONDS.toMillis(expiredAt);
    }

    public String getKey(){
        return key;
    }
    
    public String getValue(){
        return value;
    }
    
    /** Sets the deadline to the given number of seconds from now (0 = now). */
    public void setExpiredAt(long newValue){
        this.expiredAt = relative(newValue);
    }

    /** Milliseconds left until the deadline; zero or negative once expired. */
    private long elapsed(){
        return expiredAt - System.currentTimeMillis();
    }
    
    public boolean isExpired(){
        return elapsed() < 1;
    }

    /** Remaining time until the deadline, in the requested unit. */
    public long getDelay(TimeUnit unit) {
        return unit.convert(elapsed(), TimeUnit.MILLISECONDS);
    }
    
    /**
     * Orders entries by deadline (earliest first). Other Delayed implementations
     * are compared by remaining delay instead of throwing ClassCastException.
     */
    public int compareTo(Delayed o) {
        if(o == this){
            return 0;
        }
        final long x;
        final long y;
        if(o instanceof PeriodEntry){
            x = expiredAt;
            y = ((PeriodEntry) o).expiredAt;
        } else {
            x = getDelay(TimeUnit.MILLISECONDS);
            y = o.getDelay(TimeUnit.MILLISECONDS);
        }
        if(x < y){
            return -1;
        }
        if(x > y){
            return 1;
        }
        return 0;
    }

}

entryは、期限を相対時間(現在時刻からの秒数)で設定するようにしている以外は、普通のDelayedの実装です。

テスト

public class RetireCacheTest {
    
    /** An entry stored with a 2-second expiry is no longer visible after 2 seconds. */
    @Test
    public void 指定時間で参照出来なくなる() throws InterruptedException {
        Cache cache = new RetireCache();
        cache.set("hoge", "hogeValue", 2);
        // JUnit's assertEquals takes (expected, actual).
        Assert.assertEquals("hogeValue", cache.get("hoge").getValue());
        // An interrupted sleep now fails the test instead of silently shortening the wait.
        TimeUnit.SECONDS.sleep(2);
        Assert.assertNull("参照できなくなってる", cache.get("hoge"));
    }
    
    /** remove(key, seconds) postpones expiry: the entry stays visible in the meantime. */
    @Test
    public void 指定時間後に消えてる() throws InterruptedException {
        Cache cache = new RetireCache();
        cache.set("hoge", "hogeValue", 2);
        TimeUnit.SECONDS.sleep(1);
        // JUnit's assertEquals takes (expected, actual).
        Assert.assertEquals("hogeValue", cache.get("hoge").getValue());
        // Re-schedule expiry to 5 seconds from now.
        cache.remove("hoge", 5);
        TimeUnit.SECONDS.sleep(1);
        // NOTE(review): real memcached hides a deleted key immediately, whatever time is given.
        Assert.assertEquals("これは消えてるのが正解?", "hogeValue", cache.get("hoge").getValue());
    }
    
    /** An expiry of 0 means "never expires", while a 1-second entry disappears. */
    @Test
    public void ゼロを指定すると消えない() throws InterruptedException {
        Cache cache = new RetireCache();
        cache.set("hoge", "hogeValue", 0);
        cache.set("foo", "fooValue", 1);
        TimeUnit.SECONDS.sleep(1);
        Assert.assertNull("これは1秒後消えてる", cache.get("foo"));
        // JUnit's assertEquals takes (message, expected, actual).
        Assert.assertEquals("これは消えない", "hogeValue", cache.get("hoge").getValue());
    }

    /** Looking up a key that was never stored yields null. */
    @Test
    public void キーがない(){
        final Cache cache = new RetireCache();
        cache.set("hoge", "1", 0);
        final Object missing = cache.get("foo");
        Assert.assertNull(missing);
    }

    :
    : 省略
    :
}

memcacheのdeleteとは一部実装が違うかな。本物は、delete key time でtimeにどんな時間を指定しても、すぐに参照出来なくなる。

時間を指定しない場合

set hoge 0 30 4
1234
STORED

delete hoge
DELETED

get hoge
END

消える時間を指定しても...

set hoge 0 30 4
1234
STORED

get hoge
VALUE hoge 0 4
1234
END

delete hoge 10000
DELETED

get hoge
END

なかなか難しい
次は LRU

2010/01/13

NIO にしたら爆速になった(was: なんちゃってmemcache互換サーバ)

ポスト @ 2:37:24 | , ,     

昨日の続き。
server の部分を thread pool から NIO のノンブロッキング I/O にしてみたら、思った以上に速かった

メイン

/**
 * Memcached-compatible server driven by a single NIO selector loop. The Action
 * attached to each ready key handles its accept, read or write event.
 * Interrupting the thread stops the loop and closes every registered channel.
 */
public class Server extends Thread {
    
    // NOTE(review): accept and acceptPool are not used by the selector loop below.
    protected final BlockingQueue<Socket> accept = new LinkedBlockingQueue<Socket>();
    
    protected final ExecutorService acceptPool;
    
    protected final Cache cache;
    
    protected final int port;
    
    protected final int maxConnection;
    
    public Server(final int port, final int maxConnection){
        this(port, maxConnection, ByteSizeUnit.Mega.toLong(64));
    }
    public Server(final int port, final int maxConnection, final long maxMemory){
        this.port = port;
        this.maxConnection = maxConnection;
        this.cache = new LRUCache(maxMemory);
        this.acceptPool = Executors.newFixedThreadPool(maxConnection);
    }
    
    public static void main(String...args){
        Server s = new Server(12221, 32);
        s.start();
        try {
            s.join();
        } catch(InterruptedException e){
            e.printStackTrace(System.err);
        }
    }

    public void run(){
        ServerSocketChannel channel = null;
        Selector selector = null;
        try {
            channel = ServerSocketChannel.open();
            channel.configureBlocking(false);
            
            final ServerSocket serverSocket = channel.socket();
            serverSocket.setReuseAddress(true);
            serverSocket.bind(new InetSocketAddress(port));
            
            selector = Selector.open();
            channel.register(selector, SelectionKey.OP_ACCEPT, new AcceptAction(cache));

            while(!Thread.currentThread().isInterrupted()){
                // select() may legitimately return 0 (wakeup(), interrupt, cancelled
                // keys); that must not end the server, so only dispatch when keys are ready.
                if(0 < selector.select()){
                    dispatchSelected(selector);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeRegistered(selector);
            if(null != channel){
                try {
                    channel.close();
                } catch(IOException e){
                    e.printStackTrace();
                }
            }
        }
    }

    /** Runs the attached Action of every selected key, removing each key from the selected set. */
    private static void dispatchSelected(final Selector selector){
        final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
        while(keys.hasNext()){
            final SelectionKey key = keys.next();
            keys.remove();
            
            if(!key.isValid()){
                continue;
            }
            
            final Action action = (Action) key.attachment();
            action.execute(key);
        }
    }

    /**
     * Closes every channel still registered with the selector, then the selector itself.
     * The registered-key set (not the selected-key set) holds all open connections.
     */
    private static void closeRegistered(final Selector selector){
        if(null == selector || !selector.isOpen()){
            return;
        }
        for(final SelectionKey key : selector.keys().toArray(new SelectionKey[0])){
            try {
                key.channel().close();
            } catch(IOException e){
                e.printStackTrace();
            }
        }
        try {
            selector.close();
        } catch(IOException e){
            e.printStackTrace();
        }
    }
}

新規接続を受け入れるAccept部分

/**
 * Accepts a pending connection and registers it for reading with a fresh ReadAction.
 */
public class AcceptAction implements Action {
    
    private final Cache cache;
    
    public AcceptAction(final Cache cache){
        this.cache = cache;
    }

    public void execute(SelectionKey selectionKey) {
        SocketChannel channel = null;
        try {
            channel = ((ServerSocketChannel) selectionKey.channel()).accept();
            if(null == channel){
                // A non-blocking accept() returns null when no connection is pending.
                return;
            }
            channel.configureBlocking(false);

            ReadAction action = new ReadAction(cache, new ByteBufferReader(channel));
            channel.register(selectionKey.selector(), SelectionKey.OP_READ, action);
        } catch (IOException e) {
            e.printStackTrace();
            // Do not leak a connection that could not be set up.
            closeQuietly(channel);
        }
    }

    /** Closes the channel if present, reporting but not propagating failures. */
    private static void closeQuietly(final SocketChannel channel){
        if(null == channel){
            return;
        }
        try {
            channel.close();
        } catch(IOException e){
            e.printStackTrace();
        }
    }
}

んで、読み込みと書き込みは分けた

public class ReadAction implements Action {
    
    protected final Cache cache;
    
    protected final Reader reader;
    
    protected final Handler handler;
    
    /**
     * Creates the read handler for one client connection.
     *
     * @param cache  cache the parsed commands operate on
     * @param reader line reader over the client's channel
     */
    public ReadAction(final Cache cache, final Reader reader){
        this.reader = reader;
        this.cache = cache;
        this.handler = new Handler(cache, reader);
    }

    /**
     * Reads one command line, runs it, and switches the connection to writing the reply.
     * The connection is closed when nothing can be read or an I/O error occurs.
     */
    public void execute(SelectionKey selectionKey) {
        if(!selectionKey.isReadable()){
            return;
        }
        
        final SocketChannel channel = (SocketChannel) selectionKey.channel();
        try {
            if(!reader.readable()){
                closeConnection(channel, selectionKey);
                return;
            }
            final String line = reader.readLine();
            if(line == null){
                closeConnection(channel, selectionKey);
                return;
            }
            final Return r = handler.execute(line);

            final WriteAction action = new WriteAction(r, this);
            channel.register(selectionKey.selector(), SelectionKey.OP_WRITE, action);
        } catch(IOException e){
            e.printStackTrace();
            // A broken connection would otherwise stay registered and keep firing read events.
            closeConnection(channel, selectionKey);
        }
    }

    /** Cancels the key and closes the channel, reporting but not propagating close failures. */
    private static void closeConnection(final SocketChannel channel, final SelectionKey selectionKey){
        selectionKey.cancel();
        try {
            channel.close();
        } catch(IOException e){
            e.printStackTrace();
        }
    }

    private static class Handler implements CommandVisitor {
        private final Cache cache;
        private final Reader reader;
        /** Binds the command visitor to the cache and to the connection's line reader. */
        private Handler(final Cache cache, final Reader reader){
            this.reader = reader;
            this.cache = cache;
        }
        /**
         * Parses one protocol line and runs the resulting command against this visitor.
         *
         * @return the command's reply; ERROR when the line does not parse, or
         *         SERVER_ERROR carrying the exception message for any other failure
         */
        public Return execute(String line){
            try {
                final Command parsed = new MemcacheParser(new StringReader(line)).Command();
                return parsed.accept(this, null);
            } catch(ParseException e){
                e.printStackTrace();
                return new Return(ResponseType.ERROR);
            } catch(Exception e){
                e.printStackTrace();
                return new Return(ResponseType.SERVER_ERROR, e.getMessage());
            }
        }
        :
        : // 省略
        :    
        /**
         * Answers a retrieval ("get") command: a SEND_VALUE reply for each cached
         * key and an END reply for each missing key, combined into one Return.
         * NOTE(review): memcached sends a single END after all values; whether
         * Return renders this as intended cannot be confirmed from here.
         */
        public Return visit(RetrievalCommand command, Parameter parameter) {
            final List<String> requested = command.getKeys();
            final List<Return> replies = new ArrayList<Return>();
            for(final String key : requested){
                final Entry hit = cache.get(key);
                if(null != hit){
                    replies.add(new Return(ResponseType.SEND_VALUE,
                        key, hit.getFlag(), hit.getLength(), hit.getValue()
                    ));
                } else {
                    replies.add(new Return(ResponseType.END));
                }
            }
            final Return[] asArray = new Return[replies.size()];
            return new Return(replies.toArray(asArray));
        }
    
        /**
         * Handles a storage ("set") command: reads the data line that follows the
         * command and stores it, replying STORED or NOT_STORED (ERROR on I/O failure).
         */
        public Return visit(SetCommand command, Parameter parameter) {
            try {
                final String data = reader.readLine();
                final boolean stored = cache.set(command.getKey(), data, command.getFlags(), command.getExpTime());
                return new Return(stored ? ResponseType.STORED : ResponseType.NOT_STORED);
            } catch(IOException e){
                e.printStackTrace();
                return new Return(ResponseType.ERROR);
            }
        }
    
        /**
         * Handles "version". The reply must not be null: ReadAction passes it to
         * WriteAction, which calls renderMessage() on it.
         * NOTE(review): memcached answers "VERSION <x>"; no matching ResponseType is
         * visible here, so an explicit server error is returned instead.
         */
        public Return visit(VersionCommand command, Parameter parameter) {
            return new Return(ResponseType.SERVER_ERROR, "version command not supported");
        }
    }

}
/**
 * Writes one rendered reply to the client, then switches the connection back
 * to reading. A reply that does not fit into the socket's send buffer is
 * finished on later write events, resuming where the previous write stopped.
 */
public class WriteAction implements Action {
    
    private static final Charset ASCII = Charset.forName("US-ASCII"); 
    
    protected final Return ret;
    
    protected final ReadAction action;

    /** Encoded reply still to be written; created on the first write event. */
    private ByteBuffer pending;
    
    public WriteAction(final Return ret, ReadAction action){
        this.ret = ret;
        this.action = action;
    }

    public void execute(SelectionKey selectionKey) {
        if(!selectionKey.isWritable()){
            return;
        }
        final SocketChannel channel = (SocketChannel) selectionKey.channel();
        try {
            if(null == pending){
                pending = ASCII.encode(CharBuffer.wrap(ret.renderMessage()));
            }
            // write() advances the buffer's position, so a partial write resumes correctly.
            while(pending.hasRemaining()){
                if(channel.write(pending) == 0){
                    // Send buffer full: stay registered for OP_WRITE and continue on the next event.
                    return;
                }
            }
            channel.register(selectionKey.selector(), SelectionKey.OP_READ, action);
        } catch(IOException e){
            e.printStackTrace();
            selectionKey.cancel();
            try {
                channel.close();
            } catch(IOException ignored){
                // Already failing; nothing more to do for this connection.
            }
        }
    }
}

ということで、これをつかって簡単に比較すると

// Compare 1000 set/get round trips against memcached (11211) and the Java server (12221).
$target = array(
    array('host' => 'localhost', 'port' => 11211),
    array('host' => 'localhost', 'port' => 12221)
);
foreach($target as $t){
    $memcache = new Memcache;
    if(!$memcache->connect($t['host'], $t['port'])){
        // Skip unreachable targets instead of timing 1000 failing calls.
        echo 'connect failed => ', $t['host'], ':', $t['port'], PHP_EOL;
        continue;
    }

    $fail = 0;
    $elapsed = microtime(true);
    for($i = 0; $i < 1000; ++$i){
        if(false === $memcache->set('hoge', '1234')){
            echo 'ERROR!!', PHP_EOL;
        }
        if(false === $memcache->set('hoge', '123')){
            echo 'ERROR!!!', PHP_EOL;
        }
        $value = $memcache->get('hoge');
        if(false === $memcache->set('hoge', $value + 1)){
            echo 'ERRROR!!!!', PHP_EOL;
        }
        $result = $memcache->get('hoge');
        // '123' + 1 must read back as 124
        if($result != 124){
            $fail++;
        }
    }
    echo 'target host => ', $t['host'], ' port =>', $t['port'], PHP_EOL;
    echo 'elapsed: ', (microtime(true) - $elapsed), PHP_EOL;
    echo 'fail => ', $fail, PHP_EOL;
    $memcache->close();
}
1)
target host => localhost port =>11211
elapsed: 0.66518497467
fail => 0
target host => localhost port =>12221
elapsed: 0.870754003525
fail => 0

2)
target host => localhost port =>11211
elapsed: 1.1857790947
fail => 0
target host => localhost port =>12221
elapsed: 0.868647098541
fail => 0

memcacheに匹敵してきた。

ってか、ByteBufferを読み書きするのは初めて書いたので、すごく手こずった。。
単純に \r\n までの一行を読みたいだけなのに。。
ということで、BufferedReaderのreadLine的なのを書いて色々とお茶を濁す。。

/**
 * Line reader over a {@link SocketChannel}, in the spirit of
 * {@code BufferedReader.readLine()} for NIO.
 *
 * <p>Bytes are read into a 512-byte direct buffer and split on {@code '\n'};
 * every {@code '\r'} is dropped. Each byte is mapped to exactly one char
 * (ISO-8859-1 style), so multi-byte text is not decoded.
 *
 * <p>The fill loop stops as soon as {@code read} returns 0, so the channel is
 * expected to be in non-blocking mode; on a blocking channel
 * {@link #readLine()} would block until the buffer is full or the peer closes.
 */
public class ByteBufferReader {
    
    protected final SocketChannel channel;
    
    /** Kept in fill (write) mode between calls; holds bytes not scanned yet. */
    protected final ByteBuffer buffer;

    /** Characters of a line whose terminating '\n' has not arrived yet. */
    private final StringBuilder pending = new StringBuilder();
    
    public ByteBufferReader(final SocketChannel channel){
        this.channel = channel;
        this.buffer = ByteBuffer.allocateDirect(512);
    }
    
    /**
     * Returns {@code buffer.hasRemaining()}. Because the buffer is kept in
     * fill mode between calls, this is true while it still has free space.
     * NOTE(review): the name suggests "data is available"; confirm what
     * callers expect.
     */
    public boolean readable() throws IOException {
        return buffer.hasRemaining();
    }
    
    /**
     * Returns the next complete line without its terminator, or {@code null}
     * when no complete line is available yet.
     *
     * <p>A partial line is remembered and completed by later calls, so lines
     * longer than the buffer work. An empty line is consumed and reported as
     * {@code null}, as before. A final line that is never terminated by
     * {@code '\n'} is never returned.
     *
     * @return the line, or {@code null} if none is complete (or it was empty)
     * @throws IOException if reading from the channel fails
     */
    public String readLine() throws IOException {
        fill();
        buffer.flip();
        final boolean terminated = drainLine(buffer, pending);
        // keep any bytes after the '\n' for the next call
        buffer.compact();
        if(!terminated){
            return null;
        }
        final String line = pending.toString();
        pending.setLength(0);
        return line.length() < 1 ? null : line;
    }
    
    /**
     * Reads from the channel until the buffer is full, no data is currently
     * available (read returns 0) or end-of-stream (read returns -1). Bytes
     * already buffered are still processed after end-of-stream.
     */
    private void fill() throws IOException {
        while(buffer.hasRemaining()){
            if(channel.read(buffer) <= 0){
                return;
            }
        }
    }
    
    /**
     * Moves bytes from {@code source} (in drain mode) into {@code line} up to
     * and including the first {@code '\n'}, dropping every {@code '\r'}.
     *
     * @param source buffer positioned at the first unread byte
     * @param line   receives the decoded characters (the '\n' is not appended)
     * @return {@code true} if a {@code '\n'} was consumed, {@code false} if
     *         {@code source} ran out first
     */
    static boolean drainLine(final ByteBuffer source, final StringBuilder line){
        while(source.hasRemaining()){
            // & 0xFF: a plain (char) cast sign-extends bytes >= 0x80 to U+FF80..U+FFFF
            final char ch = (char) (source.get() & 0xFF);
            if(ch == '\n'){
                return true;
            }
            if(ch != '\r'){
                line.append(ch);
            }
        }
        return false;
    }

}

とりあえず、一段落
NIOは難しかった。

2010/01/11

JavaCC で memcache text protocol の BNF(と、なんちゃってmemcache互換サーバ)

ポスト @ 23:39:32 | , , ,     

ANTLRのやつがあったけど、JavaCCのがみつからなかったので、でっちあげた
via - http://harward.us/~nharward/antlr/memcached_protocol.g

できあがったのは、こんな感じ

SKIP: {
    " " | "\t" | "\r" | "\n"
}
// Only NUMBER is ever emitted for a run of digits. The aliases below match
// exactly the same text as NUMBER, and JavaCC resolves equal-length matches
// in favour of the earliest definition, so as public tokens they could never
// be produced. They are private (#) and only serve as descriptive names
// inside the *_COMMAND expressions.
TOKEN: {
    < NUMBER: ["1"-"9"] (["0"-"9"])* | "0" >
  | < #FLAGS: < NUMBER > >
  | < #TIME: < NUMBER >  >
  | < #LENGTH: < NUMBER > >
  | < #CREMENT_VALUE: < NUMBER > >
  | < #CAS_UNIQUE: < NUMBER > >
}
// The *_COMMAND expressions below describe whole command lines for reference
// only. They must stay private: as public tokens they beat <KEY> on length
// ties (earlier definition wins), so keys such as "getter" or "deleted" would
// be lexed as a command token instead of <KEY> and fail to parse.
TOKEN: {
  < SET_STATEMENT: "set" >
  | < ADD_STATEMENT: "add" >
  | < REPLACE_STATEMENT: "replace" >
  | < APPEND_STATEMENT: "append" >
  | < PREPEND_STATEMENT: "prepend" >
  | < CAS_STATEMENT: "cas" >
  | < #STORAGE_STATEMENT:
        < SET_STATEMENT >
        | < ADD_STATEMENT >
        | < REPLACE_STATEMENT >
        | < APPEND_STATEMENT >
        | < PREPEND_STATEMENT >
    >
  | < #STORAGE_COMMAND:
        (
          < STORAGE_STATEMENT > < KEY > < FLAGS > < TIME > < LENGTH >
          | < CAS_STATEMENT > < KEY > < FLAGS > < TIME > < LENGTH > < CAS_UNIQUE >
        )
        (< NOREPLY >)?
    >
}
TOKEN: {
  < RETRIEVAL_STATEMENT: "get" | "gets" >
  | < #RETRIEVAL_COMMAND:
        < RETRIEVAL_STATEMENT > < KEY >
    >
}
TOKEN: {
  < DELETE_STATEMENT: "delete" >
  | < #DELETE_COMMAND:
        < DELETE_STATEMENT > < KEY > (< TIME >)? (< NOREPLY >)?
    >
}
TOKEN: {
  < INCREMENT_STATEMENT: "incr" >
  | < #INCREMENT_COMMAND:
        < INCREMENT_STATEMENT > < KEY > < CREMENT_VALUE > (< NOREPLY >)?
    >
}
TOKEN: {
  < DECREMENT_STATEMENT: "decr" >
  | < #DECREMENT_COMMAND:
        < DECREMENT_STATEMENT > < KEY > < CREMENT_VALUE > (< NOREPLY >)?
    >
}
TOKEN: {
  // the request is "stats"; "STAT" is only the prefix of response lines
  < STATISTICS_STATEMENT: "stats" >
  | < STATISTICS_OPTION: "items" | "slabs" | "sizes" >
  | < #STATISTICS_COMMAND:
        < STATISTICS_STATEMENT > (< STATISTICS_OPTION >)?
    >
}
TOKEN: {
  < FLUSH_STATEMENT: "flush_all" >
  | < #FLUSH_COMMAND:
        < FLUSH_STATEMENT > (< TIME >)? (< NOREPLY >)?
    >
}
TOKEN: {
  < VERSION_STATEMENT: "version" >
  | < #VERSION_COMMAND:
        < VERSION_STATEMENT >
    >
}
TOKEN: {
    < NOREPLY: "noreply" >
}
// last match
// Any other run of non-whitespace. Tab is excluded as well because it is
// skipped as whitespace above and memcached keys may not contain it.
TOKEN: {
    < KEY: (~[" ", "\t", "\r", "\n"])+ >
}

少し、数値まわりのToken(TIME, LENGTHとか)が適当すぎるかな。

んで、これらのtokenを使って、テキトーなNode(Commandオブジェクト)を組み立てる構文規則(production)を書いてみる

// Entry point: one command line -> the matching Command object.
// Each alternative starts with a distinct keyword token, so no lookahead is needed.
Command Command():
{
  Command parsed;
}
{
  (
    parsed = RetrievalCommand()
  | parsed = StorageCommand()
  | parsed = DeleteCommand()
  | parsed = VersionCommand()
  )
  { return parsed; }
}
// "<set|add|replace|append|prepend> <key> <flags> <exptime> <bytes> [noreply]"
// The concrete subclass comes from createStorageCommand(); the remaining fields
// are copied onto it in the same order as they appear on the line.
StorageCommand StorageCommand():
{
  StorageCommand storage;
  String storageKey;
  Long clientFlags = 0L;
  Long expiry = 0L;
  Long byteCount = 0L;
  Boolean quiet = Boolean.FALSE;
}
{
  storage = createStorageCommand()
  storageKey = Key()
  clientFlags = Flags()
  expiry = Time()
  byteCount = Length()
  quiet = Noreply()
  {
    storage.setNode(jjtThis);
    storage.setKey(storageKey);
    storage.setFlags(clientFlags);
    storage.setExpTime(expiry);
    storage.setLength(byteCount);
    storage.setNoreply(quiet);
    return storage;
  }
}
// Consumes the storage keyword and instantiates the matching command class.
StorageCommand createStorageCommand():
{
  StorageCommand created;
}
{
  (
    < SET_STATEMENT >     { created = new SetCommand(); }
  | < ADD_STATEMENT >     { created = new AddCommand(); }
  | < REPLACE_STATEMENT > { created = new ReplaceCommand(); }
  | < APPEND_STATEMENT >  { created = new AppendCommand(); }
  | < PREPEND_STATEMENT > { created = new PrependCommand(); }
  )
  { return created; }
}

// "get|gets <key>+" : every key on the line is collected, in order.
RetrievalCommand RetrievalCommand():
{
  RetrievalCommand retrieval = new RetrievalCommand();
  String requested;
}
{
  < RETRIEVAL_STATEMENT >
  (
    requested = Key()
    { retrieval.addKey(requested); }
  )+
  {
    retrieval.setNode(jjtThis);
    return retrieval;
  }
}
// "delete <key> [<time>] [noreply]"
// Current memcached sends just "delete <key> [noreply]"; older protocol
// revisions allowed a numeric time. The time is therefore optional (as in the
// DELETE_COMMAND token spec) and defaults to 0 when omitted.
DeleteCommand DeleteCommand():
{
  DeleteCommand command = new DeleteCommand();
  String key;
  Long time = 0L;
  Boolean noreply = Boolean.FALSE;
}
{
  < DELETE_STATEMENT >
  key = Key()
  [ time = Time() ]
  noreply = Noreply()
  {
    command.setNode(jjtThis);
    command.setKey(key);
    command.setExpTime(time);
    command.setNoreply(noreply);
    return command;
  }
}
// "version" : no arguments.
VersionCommand VersionCommand():
{
  VersionCommand version = new VersionCommand();
}
{
  < VERSION_STATEMENT >
  {
    version.setNode(jjtThis);
    return version;
  }
}

// A cache key. A purely numeric key such as "123" is lexed as <NUMBER>
// (NUMBER is defined before KEY and wins the length tie), so it is accepted
// here too; otherwise "get 123" would be a parse error.
// NOTE(review): keys spelled like keywords ("set", "noreply", ...) are still
// lexed as keyword tokens and rejected.
String Key():
{ Token key; }
{
  ( key = < KEY > | key = < NUMBER > )
  {
    return key.image;
  }
}

// Opaque client flags: a decimal <NUMBER> converted to Long.
Long Flags():
{ Token digits; }
{
  digits = < NUMBER >
  { return Long.valueOf(digits.image); }
}

// Expiration time: a decimal <NUMBER> converted to Long.
Long Time():
{ Token digits; }
{
  digits = < NUMBER >
  { return Long.valueOf(digits.image); }
}

// Byte count of the data block: a decimal <NUMBER> converted to Long.
Long Length():
{ Token digits; }
{
  digits = < NUMBER >
  { return Long.valueOf(digits.image); }
}

// Optional trailing "noreply"; TRUE when present, FALSE otherwise.
Boolean Noreply():
{ Boolean quiet = Boolean.FALSE; }
{
  ( < NOREPLY > { quiet = Boolean.TRUE; } )?
  { return quiet; }
}

これに、適当なコードを投げてあげると

{
    // Feed three sample command lines through the generated parser, one
    // fresh parser per line; parse errors are only printed.
    final String[] samples = {
        "get hoge\r\n",
        "gets hoge foo\r\n",
        "set xyzkey 0 0 6\r\n"
    };
    for (final String sample : samples) {
        final MemcacheParser parser = new MemcacheParser(new StringReader(sample));
        try {
            parser.Command();
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}
Call:   Command
  Call:   RetrievalCommand
    Consumed token: <<RETRIEVAL_STATEMENT>: "get" at line 1 column 1>
    Call:   Key
      Consumed token: <<KEY>: "hoge" at line 1 column 5>
    Return: Key
  Return: RetrievalCommand
Return: Command
Call:   Command
  Call:   RetrievalCommand
    Consumed token: <<RETRIEVAL_STATEMENT>: "gets" at line 1 column 1>
    Call:   Key
      Consumed token: <<KEY>: "hoge" at line 1 column 6>
    Return: Key
    Call:   Key
      Consumed token: <<KEY>: "foo" at line 1 column 11>
    Return: Key
  Return: RetrievalCommand
Return: Command
Call:   Command
  Call:   StorageCommand
    Call:   createStorageCommand
      Consumed token: <"set" at line 1 column 1>
    Return: createStorageCommand
    Call:   Key
      Consumed token: <<KEY>: "xyzkey" at line 1 column 5>
    Return: Key
    Call:   Flags
      Consumed token: <<NUMBER>: "0" at line 1 column 12>
    Return: Flags
    Call:   Time
      Consumed token: <<NUMBER>: "0" at line 1 column 14>
    Return: Time
    Call:   Length
      Consumed token: <<NUMBER>: "6" at line 1 column 16>
    Return: Length
    Call:   Noreply
    Return: Noreply
  Return: StorageCommand
Return: Command

と、こんな感じになる。
<NUMBER>とか、ホント、マジメにtokenが書けてないですね。。

ここまでできたので、set と get しかない、memcached 互換ものをでっち上げてみた

/**
 * Toy memcached-compatible server (text protocol) that only implements
 * {@code get} and {@code set}. Every accepted connection is served by one
 * {@link AcceptHandler} on {@link #acceptPool}; every other command is
 * answered with {@code ERROR}.
 */
public class Server extends Thread {
    
    // NOTE(review): not used anywhere in this class; kept because it is protected API.
    protected final BlockingQueue<Socket> accept = new LinkedBlockingQueue<Socket>();
    
    /**
     * Store shared by all connections.
     * NOTE(review): accessed from several pool threads; confirm LRUCache is thread-safe.
     */
    protected final Cache<String, String> cache = new LRUCache<String, String>();

    /** Runs one AcceptHandler per connection, at most {@link #maxConnection} at once. */
    protected final ExecutorService acceptPool;
    
    protected final int port;
    
    /** Size of the handler pool and backlog of the listening socket. */
    protected final int maxConnection;
    
    public Server(final int port, final int maxConnection){
        this.port = port;
        this.maxConnection = maxConnection;
        this.acceptPool = Executors.newFixedThreadPool(maxConnection);
    }
    
    public static void main(String...args){
        Server s = new Server(12221, 32);
        s.start();
        // wait for the server thread instead of polling isAlive() every 10 microseconds
        try {
            s.join();
        } catch(InterruptedException e){
            Thread.currentThread().interrupt();
        }
    }

    /** Accepts connections until the listening socket fails or is closed. */
    public void run(){
        ServerSocket socket = null;
        try {
            ServerSocketFactory factory = ServerSocketFactory.getDefault();
            socket = factory.createServerSocket();
            // SO_REUSEADDR must be set before bind(); afterwards its effect is undefined
            socket.setReuseAddress(true);
            socket.bind(new java.net.InetSocketAddress(port), maxConnection);
            
            while(!socket.isClosed()){
                final Socket accepted = socket.accept();
                acceptPool.execute(new AcceptHandler(accepted));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if(null != socket){
                try {
                    socket.close();
                } catch(IOException e){
                    // best effort: the server is shutting down anyway
                }
            }
            // lets in-flight connections finish, but no longer keeps the JVM alive afterwards
            acceptPool.shutdown();
        }
    }
    
    /** Serves one client: reads command lines and writes one response per command. */
    protected class AcceptHandler implements Runnable {
        private final Socket socket;
        public AcceptHandler(final Socket socket){
            this.socket = socket;
        }
        public void run(){
            try {
                final InputStream in = socket.getInputStream();
                final OutputStream out = socket.getOutputStream();
                
                final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
                final DataOutputStream writer = new DataOutputStream(out);
                final CommandWorker worker = new CommandWorker(reader);
                while(!socket.isClosed()){
                    if(!worker.prepare()){
                        break;
                    }
                    Return r = worker.call();
                    writer.writeBytes(r.renderMessage());
                }
            } catch(IOException e){
                e.printStackTrace();
            } finally {
                try {
                    socket.close();
                } catch(IOException e){
                    // nop
                }
            }
        }
    }

    /**
     * Parses the current command line and executes it against {@link #cache}.
     * {@link #call()} never returns {@code null}: unsupported commands answer ERROR.
     */
    private class CommandWorker implements CommandVisitor {
        private final BufferedReader reader;
        private String currentLine;
        public CommandWorker(final BufferedReader reader) {
            this.reader = reader;
        }
        
        /** Reads the next command line; false on end of stream or I/O error. */
        public boolean prepare(){
            try {
                String line = reader.readLine();
                if(null == line){
                    return false;
                }
                currentLine = line;
                return true;
            } catch(IOException e){
                return false;
            }
        }
        
        public Return call() {
            try {
                final StringReader r = new StringReader(currentLine);
                final MemcacheParser parser = new MemcacheParser(r);
                
                Command command = parser.Command();
                Return result = command.accept(this, null);
                // a null result would make the handler fail with NullPointerException
                return null == result ? errorResponse() : result;
            } catch(ParseException e){
                return errorResponse();
            }
        }

        /** Generic "ERROR" response for unparsable or unsupported commands. */
        private Return errorResponse(){
            return new Return(ResponseType.ERROR);
        }

        /**
         * Discards the data line that follows an unsupported storage command,
         * so that it is not parsed as the next command, then answers ERROR.
         */
        private Return rejectStorage(){
            try {
                reader.readLine();
            } catch(IOException e){
                e.printStackTrace();
            }
            return errorResponse();
        }
        
        public Return visit(Command command, Parameter parameter) {
            return errorResponse();
        }
    
        public Return visit(AddCommand command, Parameter parameter) {
            return rejectStorage();
        }
    
        public Return visit(AppendCommand command, Parameter parameter) {
            return rejectStorage();
        }
    
        public Return visit(CasCommand command, Parameter parameter) {
            return rejectStorage();
        }
    
        public Return visit(DeleteCommand command, Parameter parameter) {
            return errorResponse();
        }
    
        public Return visit(PrependCommand command, Parameter parameter) {
            return rejectStorage();
        }
    
        public Return visit(ReplaceCommand command, Parameter parameter) {
            return rejectStorage();
        }
    
        // NOTE(review): only the first key of "get k1 k2 ..." is answered.
        public Return visit(RetrievalCommand command, Parameter parameter) {
            String value = cache.get(command.getKeys().get(0));
            if(null == value){
                return new Return(ResponseType.END);
            }
            return new Return(ResponseType.SEND_VALUE,
                command.getKeys().get(0), 0, value.length(),
                value
            );
        }
    
        /** Stores the following line as the value; the declared length is not checked. */
        public Return visit(SetCommand command, Parameter parameter) {
            try {
                final String nextLine = reader.readLine();
                if(null == nextLine){
                    // client went away before sending the data block
                    return errorResponse();
                }
                cache.put(command.getKey(), nextLine, command.getExpTime().longValue());
                return new Return(ResponseType.STORED);
            } catch(IOException e){
                e.printStackTrace();
                return errorResponse();
            }
        }
    
        public Return visit(VersionCommand command, Parameter parameter) {
            return errorResponse();
        }
    }
}

見事に、setとgetしか実装してません。しかもハンドリングは少し適当。

ということで、これと(java-lang)、memcached(c-lang)で比較してみた。

// Time 1000 x (set, set, get) against memcached (11211) and the Java server (12221).
$target = array(
    array('host' => 'localhost', 'port' => 11211),
    array('host' => 'localhost', 'port' => 12221)
);
foreach($target as $t){
    $memcache = new Memcache;
    // connect() returns false on failure; skip the target instead of timing failures
    if(!$memcache->connect($t['host'], $t['port'])){
        echo 'cannot connect to ', $t['host'], ':', $t['port'], PHP_EOL;
        continue;
    }

    $elapsed = microtime(true);
    for($i = 0; $i < 1000; ++$i){
        $memcache->set('hoge', '123');
        $memcache->set('hoge', '124');
        $memcache->get('hoge');

    }
    echo 'target host => ', $t['host'], ' port =>', $t['port'], PHP_EOL;
    echo 'elapsed: ', (microtime(true) - $elapsed), PHP_EOL;
    // closed after the timing so it does not affect the measurement
    $memcache->close();
}
target host => localhost port =>11211
elapsed: 0.420136213303
target host => localhost port =>12221
elapsed: 1.34848499298

なんつーか、「もうちょっとがんばりま賞」って感じで残念感があります。(約3倍遅い)
とりあえず、動きそうなので、他の実装も頑張る。

2010/01/07

PHP で ConsistentHash

ポスト @ 23:46:51 , 修正 @ 2010/01/07 23:59:10 | , ,     

ConsistentHashの動きをPHPでもやってみた
via - http://www.hyuki.com/yukiwiki/wiki.cgi?ConsistentHashing

こんな感じで実装

/**
 * Turns a key (or a node name plus replica index) into a position on the ring.
 */
interface HashFunction {
    public function hash($key);
}

/**
 * Minimal write contract of the ring storage.
 */
interface Circle {
    public function put($key, $value);
}

/**
 * A key/value store that can be placed on the ring.
 */
interface Node {
    public function getName();
    public function put($key, $value);
    public function get($key);
    public function has($key);
    public function keys();
}

/**
 * Poor man's sorted map on top of ArrayObject: supports lookup by key, the
 * smallest key, and the sub-map of keys >= a given key.
 *
 * Keys are sorted lazily with ksort() (default SORT_REGULAR comparison) the
 * first time an ordered view is needed after a put(), instead of cloning and
 * re-sorting the whole map on every firstKey()/tailMap() call.
 */
class TreeMap implements Circle {
    private $values;
    // true while $values is known to be in ksort() order
    private $sorted = false;
    public function __construct(array $values = array()){
        $this->values = new ArrayObject($values);
    }
    public function put($key, $value){
        $this->values->offsetSet($key, $value);
        $this->sorted = false;
    }
    public function get($key){
        return $this->values->offsetGet($key);
    }
    public function remove($key){
        // removing an entry keeps the remaining keys in order
        $this->values->offsetUnset($key);
    }
    public function has($key){
        return $this->values->offsetExists($key);
    }
    public function isEmpty(){
        return $this->values->count() < 1;
    }
    /**
     * @return mixed the smallest key, or null when the map is empty
     */
    public function firstKey(){
        $keys = $this->sortedKeys();
        return empty($keys) ? null : $keys[0];
    }
    /**
     * @return TreeMap a new map with every entry whose key is >= $key
     */
    public function tailMap($key){
        $results = array();
        foreach($this->sortedKeys() as $candidate){
            if($key <= $candidate){
                $results[$candidate] = $this->values->offsetGet($candidate);
            }
        }
        return new self($results);
    }
    // Sorts $values in place if needed and returns its keys in order.
    private function sortedKeys(){
        if(!$this->sorted){
            $this->values->ksort();
            $this->sorted = true;
        }
        return array_keys($this->values->getArrayCopy());
    }
}

/**
 * Consistent-hash ring: each node is placed numberOfReplicas times at
 * hash(name . i), and a key belongs to the first point at or after hash(key),
 * wrapping around to the smallest point.
 */
class ConsistentHash {
    private $hashFunction;
    private $numberOfReplicas;
    private $circle;
    private $nodes = array();
    public function __construct(HashFunction $hashFunction, $numberOfReplicas){
        $this->hashFunction = $hashFunction;
        $this->numberOfReplicas = $numberOfReplicas;
        $this->circle = new TreeMap;
    }
    /**
     * @return array the nodes currently on the ring, in insertion order
     */
    public function getNodes(){
        return $this->nodes;
    }
    public function add(Node $node){
        for($i = 0; $i < $this->numberOfReplicas; ++$i){
            $nodeKey = $this->hashFunction->hash($node->getName() . $i);
            $this->circle->put($nodeKey, $node);
        }
        $this->nodes[] = $node;
    }
    /**
     * @return Node|null the node responsible for $key, or null if the ring is empty
     */
    public function get($key){
        if($this->circle->isEmpty()){
            return null;
        }
        $hash = $this->hashFunction->hash($key);
        if(!$this->circle->has($hash)){
            $tailMap = $this->circle->tailMap($hash);
            if($tailMap->isEmpty()){
                // past the last point: wrap around the ring
                $hash = $this->circle->firstKey();
            } else {
                $hash = $tailMap->firstKey();
            }
        }
        return $this->circle->get($hash);
    }
    /**
     * Takes $node off the ring and out of getNodes(). Keys already stored on
     * the node are not moved.
     */
    public function remove(Node $node){
        for($i = 0; $i < $this->numberOfReplicas; ++$i){
            $nodeKey = $this->hashFunction->hash($node->getName() . $i);
            // only drop points that still belong to this node; a hash collision
            // may have handed the point to another node in add()
            if($this->circle->has($nodeKey) && $this->circle->get($nodeKey) === $node){
                $this->circle->remove($nodeKey);
            }
        }
        $remaining = array();
        foreach($this->nodes as $candidate){
            if($candidate !== $node){
                $remaining[] = $candidate;
            }
        }
        $this->nodes = $remaining;
    }
}

/**
 * Node facade that routes every operation to whichever node the
 * ConsistentHash picks for the key, and remembers which keys were written.
 */
class ConsistentHashNode implements Node {
    private $hash;
    // distinct keys ever put through this facade, in first-put order
    private $keys = array();
    public function __construct(ConsistentHash $hash){
        $this->hash = $hash;
    }
    /**
     * @throws RuntimeException when no node has been added to the ring
     */
    public function put($key, $value){
        $this->requireNode($key)->put($key, $value);
        // strict: loose in_array would treat e.g. 'key0' == 0 as equal on PHP < 8
        if(!in_array($key, $this->keys, true)){
            $this->keys[] = $key;
        }
    }
    /**
     * @return mixed the stored value, or null when the ring has no node
     */
    public function get($key){
        $node = $this->hash->get($key);
        return null === $node ? null : $node->get($key);
    }
    public function has($key){
        $node = $this->hash->get($key);
        return null !== $node && $node->has($key);
    }
    public function keys(){
        return $this->keys;
    }
    public function getName(){
        return __CLASS__;
    }
    // Node responsible for $key; fails loudly instead of calling a method on null.
    private function requireNode($key){
        $node = $this->hash->get($key);
        if(null === $node){
            throw new RuntimeException('no node has been added to the consistent hash');
        }
        return $node;
    }
}

/**
 * In-memory node identified by a name; values live in a plain PHP array.
 */
class IdentNode implements Node {
    private $name;
    private $values = array();
    public function __construct($name){
        $this->name = $name;
    }
    public function put($key, $value){
        $this->values[$key] = $value;
    }
    /**
     * @return mixed the stored value, or null when $key is absent
     */
    public function get($key){
        return $this->has($key) ? $this->values[$key] : null;
    }
    public function has($key){
        // was isset($this->values), which is true for any key;
        // array_key_exists also reports keys whose value is null
        return array_key_exists($key, $this->values);
    }
    public function keys(){
        return array_keys($this->values);
    }
    public function getName(){
        return $this->name;
    }
}

PHPでTreeMapみたいな実装(keyを並び替えながら挿入、指定key以降のmap取得)が思いつかなかったので、ksortとかつかって、なんちゃって実装

んで、ハッシュアルゴリズムとかは、こんな感じで実装して、必要に応じて切り替える

/**
 * Ring position from md5: the hex digest hex-encoded once more with
 * unpack('H*'), which yields a digit-only string.
 */
class HashMD5Integer implements HashFunction {
    public function hash($key){
        list(, $encoded) = unpack('H*', md5($key));
        return $encoded;
    }
}
/**
 * Same as HashMD5Integer but based on the sha1 hex digest.
 */
class HashSha1Integer implements HashFunction {
    public function hash($key){
        list(, $encoded) = unpack('H*', sha1($key));
        return $encoded;
    }
}
/**
 * Ring position from crc32: the checksum (converted to its decimal string by
 * unpack) hex-encoded with unpack('H*').
 */
class HashCRC32Integer implements HashFunction {
    public function hash($key){
        list(, $encoded) = unpack('H*', crc32($key));
        return $encoded;
    }
}

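これについても、ハッシュ値を並び替えるために数値として扱いたかったけど、ハッシュ文字列(md5)を数値(digit)に変換するうまい方法が思いつかなかったので、unpack('H*')でお茶を濁した(md5の16進文字列をさらにunpack('H*')で16進化すると、'0'〜'9'は30〜39、'a'〜'f'は61〜66になるので、結果は数字だけの文字列になる)。もっといい方法があると思う。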

こんな風に動かす

// Three nodes with 8 replicas each; put key0..key9 and show where they landed.
$hash = new ConsistentHash(new HashMD5Integer, 8);
foreach(array('hoge1', 'hoge2', 'hoge3') as $nodeName){
    $hash->add(new IdentNode($nodeName));
}
$map = new ConsistentHashNode($hash);
foreach(range(0, 9) as $n){
    $map->put('key' . $n, 'value' . $n);
}
foreach($hash->getNodes() as $node){
    echo $node->getName(), ':', implode(',', $node->keys()), PHP_EOL;
}

実行結果としては、こんな感じで出た。まあそこそこバラけてる(?)

hoge1:key1,key4,key6
hoge2:key3,key5,key7
hoge3:key0,key2,key8,key9

試しに他のアルゴリズムでも実行して比較してみた

// For each hash algorithm: 10 nodes x 32 replicas, 100 keys, then print the
// share of keys owned by every node.
$hashAlgos = array(
    new HashMD5Integer,
    new HashSha1Integer,
    new HashCRC32Integer
);
foreach($hashAlgos as $algorithm){
    echo get_class($algorithm), PHP_EOL;

    $hash = new ConsistentHash($algorithm, 32);
    foreach(range(0, 9) as $n){
        $hash->add(new IdentNode('hoge' . $n));
    }

    $map = new ConsistentHashNode($hash);
    foreach(range(0, 99) as $n){
        $map->put('*' . $n, $n);
    }

    $total = count($map->keys());
    foreach($hash->getNodes() as $node){
        $owned = $node->keys();
        $share = (count($owned) / $total) * 100;
        echo 'node(', $share, '%):', $node->getName(), ', keys:', implode(',', $owned), PHP_EOL;
    }
}
HashMD5Integer
node(12%):hoge0, keys:*2,*9,*21,*27,*30,*47,*49,*52,*53,*70,*77,*89
node(12%):hoge1, keys:*0,*8,*10,*20,*23,*33,*40,*41,*69,*86,*90,*98
node(11%):hoge2, keys:*16,*22,*31,*38,*39,*63,*64,*72,*75,*79,*88
node(10%):hoge3, keys:*12,*48,*56,*58,*62,*65,*67,*73,*83,*84
node(4%):hoge4, keys:*6,*61,*91,*95
node(11%):hoge5, keys:*1,*4,*7,*13,*17,*28,*42,*44,*54,*55,*94
node(13%):hoge6, keys:*5,*11,*26,*35,*37,*43,*50,*51,*57,*74,*87,*92,*97
node(8%):hoge7, keys:*18,*32,*36,*45,*46,*60,*82,*85
node(8%):hoge8, keys:*15,*19,*25,*59,*68,*78,*81,*96
node(11%):hoge9, keys:*3,*14,*24,*29,*34,*66,*71,*76,*80,*93,*99
HashSha1Integer
node(11%):hoge0, keys:*29,*33,*36,*38,*46,*55,*67,*74,*81,*86,*92
node(8%):hoge1, keys:*1,*2,*18,*48,*69,*89,*93,*99
node(8%):hoge2, keys:*6,*57,*58,*62,*66,*68,*71,*78
node(12%):hoge3, keys:*8,*14,*32,*39,*42,*51,*56,*75,*79,*88,*95,*97
node(14%):hoge4, keys:*3,*9,*12,*13,*34,*40,*43,*45,*53,*59,*64,*65,*77,*94
node(10%):hoge5, keys:*7,*23,*24,*41,*49,*61,*72,*76,*80,*91
node(10%):hoge6, keys:*0,*10,*11,*15,*17,*25,*28,*35,*87,*98
node(8%):hoge7, keys:*4,*20,*21,*27,*31,*50,*63,*85
node(12%):hoge8, keys:*5,*16,*22,*26,*30,*37,*44,*47,*52,*70,*90,*96
node(7%):hoge9, keys:*19,*54,*60,*73,*82,*83,*84
HashCRC32Integer
node(13%):hoge0, keys:*11,*12,*13,*14,*16,*17,*40,*42,*44,*45,*47,*49,*98
node(9%):hoge1, keys:*58,*62,*63,*64,*65,*74,*75,*81,*96
node(3%):hoge2, keys:*29,*30,*36
node(14%):hoge3, keys:*4,*5,*9,*21,*25,*27,*32,*38,*39,*51,*78,*87,*92,*99
node(7%):hoge4, keys:*43,*50,*56,*59,*80,*86,*97
node(22%):hoge5, keys:*10,*15,*18,*19,*34,*41,*48,*52,*53,*54,*55,*57,*60,*61,*66,*67,*68,*69,*70,*71,*85,*93
node(5%):hoge6, keys:*8,*24,*28,*31,*79
node(5%):hoge7, keys:*0,*1,*35,*46,*82
node(16%):hoge8, keys:*2,*3,*6,*22,*23,*26,*37,*72,*73,*76,*77,*83,*88,*90,*94,*95
node(6%):hoge9, keys:*7,*20,*33,*84,*89,*91

crc32とかだと、似たようなキー(52-55,66-71とか)は一部集中してる感じ。逆に md5, sha1 あたりは一様にバラけてるので、バラ撒くなら後者の2つを使う方が良さそう。
というか、crc32は一部に偏りすぎな気もする(実装の問題?)

ノードを増やしてみたりした結果はこれ

// Start with three nodes, then alternately add a node and put a batch of
// keys, and finally print each node's share of all keys written.
$hash = new ConsistentHash(new HashMD5Integer, 32);
foreach(array('hoge1', 'hoge2', 'hoge3') as $nodeName){
    $hash->add(new IdentNode($nodeName));
}

$map = new ConsistentHashNode($hash);
// each step: node to add first (or null), then keys key$from .. key($to - 1)
$steps = array(
    array(null, 0, 10),
    array('hoge4', 10, 20),
    array('hoge5', 30, 40),
);
foreach($steps as $step){
    list($newNode, $from, $to) = $step;
    if(null !== $newNode){
        $hash->add(new IdentNode($newNode));
    }
    for($i = $from; $i < $to; ++$i){
        $map->put('key' . $i, 'value' . $i);
    }
}

$allKeys = $map->keys();
foreach($hash->getNodes() as $node){
    $keys = $node->keys();
    echo 'node(', (count($keys) / count($allKeys)) * 100, '%):', $node->getName(), ', keys:', implode(',', $keys), PHP_EOL;
}
node(30%):hoge1, keys:key0,key4,key6,key7,key16,key17,key30,key31,key34
node(16.6666666667%):hoge2, keys:key2,key3,key5,key9,key19
node(23.3333333333%):hoge3, keys:key1,key8,key10,key11,key12,key14,key15
node(20%):hoge4, keys:key13,key18,key32,key35,key37,key39
node(10%):hoge5, keys:key33,key36,key38

これはこういう動きが正しいのかわからない。。。(先に存在していたノードはキーが増えるよなぁ。ノードを追加しても、それ以前にputしたキーは再配置していないので、古いノードほどキーが多くなるのは自然な結果だと思う。なお、3回目のループは key30〜key39 なので、キーは全部で30個)

んで、結論。

ConsistentHashは面白い

以前のログ