<?xml version="1.0" encoding="utf-8"?>
<?xml-stylesheet href="http://blog.xole.net/rss/style.css" type="text/css"?>
<rdf:RDF xmlns="http://purl.org/rss/1.0/"
         xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
         xmlns:content="http://purl.org/rss/1.0/modules/content/"
         xmlns:dc="http://purl.org/dc/elements/1.1/"
         xml:lang="ja">
<channel rdf:about="http://blog.xole.net/rss/recent.php">
<title>ハタさんのブログ(復刻版)</title>
<link>http://blog.xole.net/index.php</link>
<dc:date>2010-02-07T04:36:35+09:00</dc:date>
<description>
ハタさんのブログ(復刻版) - RSS (RDF Site Summary) Feed.
</description>
<items>
<rdf:Seq>
<rdf:li rdf:resource="http://blog.xole.net/article.php?id=746" />
<rdf:li rdf:resource="http://blog.xole.net/article.php?id=745" />
<rdf:li rdf:resource="http://blog.xole.net/article.php?id=744" />
<rdf:li rdf:resource="http://blog.xole.net/article.php?id=743" />
<rdf:li rdf:resource="http://blog.xole.net/article.php?id=742" />
<rdf:li rdf:resource="http://blog.xole.net/article.php?id=741" />
<rdf:li rdf:resource="http://blog.xole.net/article.php?id=740" />
<rdf:li rdf:resource="http://blog.xole.net/article.php?id=739" />
<rdf:li rdf:resource="http://blog.xole.net/article.php?id=738" />
<rdf:li rdf:resource="http://blog.xole.net/article.php?id=737" />
</rdf:Seq>
</items>
</channel>
<item rdf:about="http://blog.xole.net/article.php?id=746">
<title>HiveJDBC を S2JDBC 経由で使えるようにする</title>
<link>http://blog.xole.net/article.php?id=746</link>
<dc:date>2010-02-07T04:36:35+09:00</dc:date>
<description>hadoop の話題。その3

HiveのJDBCを使えばリモート上で動いているHiveに対して、JDBC(over thrift)経由でHive QLを実行出来るのですごく便利です。
ref - Hive/HiveClient - ...</description>
<content:encoded>
<![CDATA[
<p>hadoop の話題。その3</p>

<p>HiveのJDBCを使えばリモート上で動いているHiveに対して、JDBC(over thrift)経由でHive QLを実行出来るのですごく便利です。<br />
ref - <a href="http://wiki.apache.org/hadoop/Hive/HiveClient#head-fd2d8ae9e17fdc3d9b7048d088b2c23a53a6857d">Hive/HiveClient - Hadoop Wiki</a>
</p>

<p>HiveJDBCはフツーのJDBCっぽく使えるので、こんな感じで普通のDBサーバにSQLを投げる感覚で使える</p>

<pre class="java">
<span class="keyword" >import</span> java.sql.Connection;
<span class="keyword" >import</span> java.sql.DriverManager;
<span class="keyword" >import</span> java.sql.ResultSet;
<span class="keyword" >import</span> java.sql.SQLException;
<span class="keyword" >import</span> java.sql.Statement;

<span class="keyword" >public</span> <span class="keyword" >class</span> HiveConnection {
    <span class="keyword" >public</span> <span class="keyword" >static</span> <span class="keyword" >void</span> main(String...args) <span class="keyword" >throws</span> SQLException {
        <span class="keyword" >try</span> {
            Class.forName(<span class="string" >"org.apache.hadoop.hive.jdbc.HiveDriver"</span>);
        } <span class="keyword" >catch</span> (ClassNotFoundException e) {
            e.printStackTrace();
            System.exit(<span class="number" >1</span>);
        }
        Connection conn = DriverManager.getConnection(<span class="string" >"jdbc:hive://master1:10000/"</span>, <span class="string" >""</span>, <span class="string" >""</span>);
        Statement stmt = conn.createStatement();
        stmt.execute(<span class="string" >"SELECT distinct(id) FROM hoge"</span>);
        
        ResultSet rs = stmt.getResultSet();
        <span class="keyword" >while</span>(rs.next()){
            System.out.println(rs.getString(<span class="number" >1</span>));
        }
    }
}</pre>

<p>なので、便利なものは便利なものと合体させてしまえ。ということで、みんな大好き <a href="http://s2container.seasar.org/2.4/ja/s2jdbc.html">S2JDBC</a>で使えるようにしてみた</p>

<h3>HiveServerを起動/停止</h3>
<p>とりあえず、HiveJDBCを利用するには、thrift経由でHiveServerを叩ける必要がある。<br />
以下のようなスクリプトを用意</p>

<p>start.sh</p>
<pre class="java">#!/usr/bin/env bash

pidfile=$HIVE_PID_DIR/hiveserver.pid
logfile=$HIVE_LOG_DIR/hiveserver.log

<span class="keyword" >if</span> [ -f $pidfile ]; then
    echo running as process `cat $pidfile`. stop it first
    exit <span class="number" >1</span>
fi

nohup $HIVE_HOME/bin/hive --service hiveserver &gt; $logfile <span class="number" >2</span>&gt;&amp;<span class="number" >1</span> &lt; /dev/<span class="keyword" >null</span> &amp;
echo $! &gt; $pidfile
</pre>

<p>stop.sh</p>
<pre class="java">#!/usr/bin/env bash

pidfile=$HIVE_PID_DIR/hiveserver.pid
logfile=$HIVE_LOG_DIR/hiveserver.log

<span class="keyword" >if</span> [ -f $pidfile ]; then
    kill `cat $pidfile`
    rm $pidfile
<span class="keyword" >else</span>
    echo no pidfile $pidfile
fi
</pre>

<p>これで、実行したサーバでデフォルトのポート10000で起動する。</p>
<p>ちなみに、HADOOP_CONF_DIRとかHIVE_CONF_DIRは別途ちゃんと設定すること</p>


<h3>HiveDialectを用意</h3>

<p>先に書いておくと、SQLのorder by ...に相当するものは、HiveQLだと <a href="http://wiki.apache.org/hadoop/Hive/LanguageManual/SortBy">sort by</a> ...なんだけど、今はまだ用意してない。今度書く</p>

<pre class="java">
<span class="keyword" >package</span> org.seasar.extension.jdbc.dialect;

<span class="keyword" >import</span> org.seasar.extension.jdbc.SelectForUpdateType;

<span class="keyword" >public</span> <span class="keyword" >class</span> HiveDialect <span class="keyword" >extends</span> StandardDialect {
    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> String getName() {
        <span class="keyword" >return</span> <span class="string" >"hive"</span>;
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> supportsLimit() {
        <span class="keyword" >return</span> <span class="keyword" >true</span>;
    }
    
    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> String convertLimitSql(String sql, <span class="keyword" >int</span> offset, <span class="keyword" >int</span> limit) {
        StringBuilder buf = <span class="keyword" >new</span> StringBuilder(sql.length() + <span class="number" >20</span>);
        buf.append(sql);
        buf.append(<span class="string" >" limit "</span>);
        buf.append(limit);
        <span class="keyword" >return</span> buf.toString();
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> supportsBatchUpdateResults() {
        <span class="keyword" >return</span> <span class="keyword" >false</span>;
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> supportsCursor() {
        <span class="keyword" >return</span> <span class="keyword" >false</span>;
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> supportsForUpdate(SelectForUpdateType type, <span class="keyword" >boolean</span> withTarget) {
        <span class="keyword" >return</span> <span class="keyword" >false</span>;
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> supportsGetGeneratedKeys() {
        <span class="keyword" >return</span> <span class="keyword" >false</span>;
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> supportsIdentity() {
        <span class="keyword" >return</span> <span class="keyword" >false</span>;
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> supportsInnerJoinForUpdate() {
        <span class="keyword" >return</span> <span class="keyword" >false</span>;
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> supportsLockHint() {
        <span class="keyword" >return</span> <span class="keyword" >false</span>;
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> supportsOffset() {
        <span class="keyword" >return</span> <span class="keyword" >false</span>;
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> supportsOffsetWithoutLimit() {
        <span class="keyword" >return</span> <span class="keyword" >false</span>;
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> supportsOuterJoinForUpdate() {
        <span class="keyword" >return</span> <span class="keyword" >false</span>;
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> supportsSequence() {
        <span class="keyword" >return</span> <span class="keyword" >false</span>;
    }
    
}</pre>

<h3>HiveConnectionPoolを用意</h3>

<p>これは、HiveがXA transactionとかconnection#closeなんかを実行することができない(未実装のコードなので...<a href="http://svn.apache.org/repos/asf/hadoop/hive/trunk/jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveConnection.java">svn:..jdbc/HiveConnection</a>)ので ConnectionPoolImpl を継承して ConnectionWrapper を取る部る部分を override </p>

<pre class="java">
<span class="keyword" >package</span> org.seasar.extension.dbcp.impl;

<span class="keyword" >import</span> java.sql.Connection;
<span class="keyword" >import</span> java.sql.SQLException;
<span class="keyword" >import</span> java.util.Map;
<span class="keyword" >import</span> java.util.concurrent.ConcurrentHashMap;

<span class="keyword" >import</span> javax.sql.XAConnection;
<span class="keyword" >import</span> javax.transaction.Transaction;

<span class="keyword" >import</span> org.seasar.extension.dbcp.ConnectionWrapper;
<span class="keyword" >import</span> org.seasar.framework.util.TransactionManagerUtil;

<span class="keyword" >public</span> <span class="keyword" >class</span> HiveConnectionPool <span class="keyword" >extends</span> ConnectionPoolImpl {
    
    <span class="keyword" >private</span> Map&lt;Transaction, ConnectionWrapper&gt; txActivePool = <span class="keyword" >new</span> ConcurrentHashMap&lt;Transaction, ConnectionWrapper&gt;();

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> <span class="keyword" >synchronized</span> ConnectionWrapper checkOut() <span class="keyword" >throws</span> SQLException {
        Transaction tx = getTransaction();
        ConnectionWrapper conn = txActivePool.get(tx);
        <span class="keyword" >if</span>(<span class="keyword" >null</span> == conn){
            conn = createConnection(tx);
        }
        <span class="keyword" >if</span>(<span class="keyword" >null</span> != tx){
            txActivePool.put(tx, conn);
        }
        <span class="keyword" >return</span> conn;
    }
    
    <span class="keyword" >protected</span> ConnectionWrapper createConnection(Transaction transaction) <span class="keyword" >throws</span> SQLException {
        XAConnection xaConnection = getXADataSource().getXAConnection();
        Connection connection = xaConnection.getConnection();
        <span class="keyword" >return</span> <span class="keyword" >new</span> HiveConnectionWrapper(xaConnection, connection, <span class="keyword" >this</span>, transaction);
    }
    
    <span class="keyword" >protected</span> Transaction getTransaction(){
        <span class="keyword" >return</span> TransactionManagerUtil.getTransaction(getTransactionManager());
    }
    
}
</pre>

<h3>HiveConnectionWrapper を用意する</h3>
<p>これは後述するPreparedStatement対策として用意。PreparedStatementWrapper さえも wrapper を用意する必要があるので、ConnectionWrapperImpl を override</p>

<pre class="java">
<span class="keyword" >package</span> org.seasar.extension.dbcp.impl;

<span class="keyword" >import</span> java.sql.Connection;
<span class="keyword" >import</span> java.sql.PreparedStatement;
<span class="keyword" >import</span> java.sql.SQLException;

<span class="keyword" >import</span> javax.sql.XAConnection;
<span class="keyword" >import</span> javax.transaction.Transaction;

<span class="keyword" >import</span> org.seasar.extension.dbcp.ConnectionPool;
<span class="keyword" >import</span> org.seasar.framework.exception.SSQLException;

<span class="keyword" >public</span> <span class="keyword" >class</span> HiveConnectionWrapper <span class="keyword" >extends</span> ConnectionWrapperImpl {

    <span class="keyword" >public</span> HiveConnectionWrapper(XAConnection xaConnection,
            Connection physicalConnection, ConnectionPool connectionPool,
            Transaction tx) <span class="keyword" >throws</span> SQLException {
        <span class="keyword" >super</span>(xaConnection, physicalConnection, connectionPool, tx);
    }
    
    <span class="keyword" >protected</span> SQLException wrapException(<span class="keyword" >final</span> SQLException e, <span class="keyword" >final</span> String sql) {
        <span class="keyword" >return</span> <span class="keyword" >new</span> SSQLException(<span class="string" >"ESSR0072"</span>,
                <span class="keyword" >new</span> Object[] { sql, e.getMessage(),
                        <span class="keyword" >new</span> Integer(e.getErrorCode()), e.getSQLState() }, e
                        .getSQLState(), e.getErrorCode(), e, sql);
    }
    
    <span class="keyword" >protected</span> <span class="keyword" >void</span> assertOpened() <span class="keyword" >throws</span> SQLException {
        <span class="keyword" >if</span> (isClosed()) {
            <span class="keyword" >throw</span> <span class="keyword" >new</span> SSQLException(<span class="string" >"ESSR0062"</span>, <span class="keyword" >null</span>);
        }
    }
    
    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> PreparedStatement prepareStatement(<span class="keyword" >final</span> String sql) <span class="keyword" >throws</span> SQLException {
        assertOpened();
        <span class="keyword" >try</span> {
            <span class="keyword" >return</span> <span class="keyword" >new</span> HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql), sql);
        } <span class="keyword" >catch</span> (<span class="keyword" >final</span> SQLException ex) {
            release();
            <span class="keyword" >throw</span> wrapException(ex, sql);
        }
    }
    
    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> PreparedStatement prepareStatement(<span class="keyword" >final</span> String sql, <span class="keyword" >final</span> <span class="keyword" >int</span> resultSetType, <span class="keyword" >final</span> <span class="keyword" >int</span> resultSetConcurrency) <span class="keyword" >throws</span> SQLException {
        assertOpened();
        <span class="keyword" >try</span> {
            <span class="keyword" >return</span> <span class="keyword" >new</span> HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, resultSetType, resultSetConcurrency), sql);
        } <span class="keyword" >catch</span> (<span class="keyword" >final</span> SQLException ex) {
            release();
            <span class="keyword" >throw</span> wrapException(ex, sql);
        }
    }
    
    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> PreparedStatement prepareStatement(<span class="keyword" >final</span> String sql, <span class="keyword" >final</span> <span class="keyword" >int</span> resultSetType, <span class="keyword" >final</span> <span class="keyword" >int</span> resultSetConcurrency, <span class="keyword" >final</span> <span class="keyword" >int</span> resultSetHoldability) <span class="keyword" >throws</span> SQLException {
        assertOpened();
        <span class="keyword" >try</span> {
            <span class="keyword" >return</span> <span class="keyword" >new</span> HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql);
        } <span class="keyword" >catch</span> (<span class="keyword" >final</span> SQLException ex) {
            release();
            <span class="keyword" >throw</span> wrapException(ex, sql);
        }
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> PreparedStatement prepareStatement(<span class="keyword" >final</span> String sql, <span class="keyword" >final</span> <span class="keyword" >int</span> autoGeneratedKeys) <span class="keyword" >throws</span> SQLException {
        assertOpened();
        <span class="keyword" >try</span> {
            <span class="keyword" >return</span> <span class="keyword" >new</span> HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, autoGeneratedKeys), sql);
        } <span class="keyword" >catch</span> (<span class="keyword" >final</span> SQLException ex) {
            release();
            <span class="keyword" >throw</span> wrapException(ex, sql);
        }
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> PreparedStatement prepareStatement(<span class="keyword" >final</span> String sql, <span class="keyword" >final</span> <span class="keyword" >int</span>[] columnIndexes) <span class="keyword" >throws</span> SQLException {
        assertOpened();
        <span class="keyword" >try</span> {
            <span class="keyword" >return</span> <span class="keyword" >new</span> HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, columnIndexes), sql);
        } <span class="keyword" >catch</span> (<span class="keyword" >final</span> SQLException ex) {
            release();
            <span class="keyword" >throw</span> wrapException(ex, sql);
        }
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> PreparedStatement prepareStatement(<span class="keyword" >final</span> String sql, <span class="keyword" >final</span> String[] columnNames) <span class="keyword" >throws</span> SQLException {
        assertOpened();
        <span class="keyword" >try</span> {
            <span class="keyword" >return</span> <span class="keyword" >new</span> HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, columnNames), sql);
        } <span class="keyword" >catch</span> (<span class="keyword" >final</span> SQLException ex) {
            release();
            <span class="keyword" >throw</span> wrapException(ex, sql);
        }
    }
}</pre>

<h3>HivePreparedStatementWrapperを用意する</h3>
<p>これもConnectionと同じように、PreparedStatementの大半のコードが未実装(<a href="http://svn.apache.org/repos/asf/hadoop/hive/trunk/jdbc/src/java/org/apache/hadoop/hive/jdbc/HivePreparedStatement.java">svn:jdbc/HivePreparedStatement</a>)なので、PreparedStatementWrapper を継承して override</p>

<pre class="java">
<span class="keyword" >package</span> org.seasar.extension.dbcp.impl;

<span class="keyword" >import</span> java.sql.PreparedStatement;
<span class="keyword" >import</span> java.sql.ResultSet;
<span class="keyword" >import</span> java.sql.SQLException;

<span class="keyword" >import</span> org.seasar.extension.jdbc.impl.PreparedStatementWrapper;
<span class="keyword" >import</span> org.seasar.framework.exception.SSQLException;

<span class="keyword" >public</span> <span class="keyword" >class</span> HivePreparedStatementWrapper <span class="keyword" >extends</span> PreparedStatementWrapper {

    <span class="keyword" >protected</span> <span class="keyword" >final</span> PreparedStatement original;
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> String sql;
    
    <span class="keyword" >public</span> HivePreparedStatementWrapper(PreparedStatement original, String sql) {
        <span class="keyword" >super</span>(original, sql);
        <span class="keyword" >this</span>.original = original;
        <span class="keyword" >this</span>.sql = sql;
    }
    
    <span class="keyword" >protected</span> SQLException wrapException(<span class="keyword" >final</span> SQLException e) {
        <span class="keyword" >return</span> wrapException(e, sql);
    }
    
    <span class="keyword" >protected</span> SQLException wrapException(<span class="keyword" >final</span> SQLException e, <span class="keyword" >final</span> String sql) {
        <span class="keyword" >if</span> (sql != <span class="keyword" >null</span>) {
            <span class="keyword" >return</span> <span class="keyword" >new</span> SSQLException(<span class="string" >"ESSR0072"</span>, <span class="keyword" >new</span> Object[] { sql,
                    String.valueOf(e.getErrorCode()), e.getSQLState() }, e
                    .getSQLState(), e.getErrorCode(), e, sql);
        }
        <span class="keyword" >return</span> e;
    }
    
    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> <span class="keyword" >void</span> close() {
        <span class="comment" >// HivePreparedStatement was not supported in #close</span>
    }
    
    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> ResultSet executeQuery() <span class="keyword" >throws</span> SQLException {
        <span class="keyword" >try</span> {
            <span class="keyword" >return</span> <span class="keyword" >new</span> HiveResultSetWrapper(original.executeQuery());
        } <span class="keyword" >catch</span> (<span class="keyword" >final</span> SQLException e) {
            <span class="keyword" >throw</span> wrapException(e);
        }
    }
    
    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> ResultSet executeQuery(<span class="keyword" >final</span> String sql) <span class="keyword" >throws</span> SQLException {
        <span class="keyword" >try</span> {
            <span class="keyword" >return</span> <span class="keyword" >new</span> HiveResultSetWrapper(original.executeQuery(sql));
        } <span class="keyword" >catch</span> (<span class="keyword" >final</span> SQLException e) {
            <span class="keyword" >throw</span> wrapException(e, sql);
        }
    }
    
    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> ResultSet getResultSet() <span class="keyword" >throws</span> SQLException {
        <span class="keyword" >try</span> {
            <span class="keyword" >return</span> <span class="keyword" >new</span> HiveResultSetWrapper(original.getResultSet());
        } <span class="keyword" >catch</span> (<span class="keyword" >final</span> SQLException e) {
            <span class="keyword" >throw</span> wrapException(e);
        }
    }
    
    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> ResultSet getGeneratedKeys() <span class="keyword" >throws</span> SQLException {
        <span class="keyword" >try</span> {
            <span class="keyword" >return</span> <span class="keyword" >new</span> HiveResultSetWrapper(original.getGeneratedKeys());
        } <span class="keyword" >catch</span> (<span class="keyword" >final</span> SQLException e) {
            <span class="keyword" >throw</span> wrapException(e);
        }
    }
}</pre>

<h3>HiveResultSetWrapper を用意</h3>
<p>まだまだ続きます...HiveResultSet にも未実装が(<a href="http://svn.apache.org/repos/asf/hadoop/hive/trunk/jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveResultSet.java">svn:jdbc/HiveResultSet</a>)なので、ResultSetWrapper を override</p>

<pre class="java">
<span class="keyword" >package</span> org.seasar.extension.dbcp.impl;

<span class="keyword" >import</span> java.sql.Date;
<span class="keyword" >import</span> java.sql.ResultSet;
<span class="keyword" >import</span> java.sql.SQLException;

<span class="keyword" >import</span> org.seasar.extension.jdbc.impl.ResultSetWrapper;

<span class="keyword" >public</span> <span class="keyword" >class</span> HiveResultSetWrapper <span class="keyword" >extends</span> ResultSetWrapper {
    
    <span class="keyword" >protected</span> <span class="keyword" >static</span> <span class="keyword" >final</span> String DOUBLE_QUOTE = <span class="string" >"\""</span>;
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> ResultSet rs;
    
    <span class="keyword" >public</span> HiveResultSetWrapper(ResultSet rs){
        <span class="keyword" >super</span>(rs);
        <span class="keyword" >this</span>.rs = rs;
    }

    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> <span class="keyword" >void</span> close() <span class="keyword" >throws</span> SQLException {
        <span class="comment" >// HiveResultSet was not supported in #close</span>
    }
    
    <span class="keyword" >protected</span> Date parseDate(<span class="keyword" >final</span> String dt) <span class="keyword" >throws</span> Exception {
        String tmp = dt;
        <span class="keyword" >if</span>(dt.startsWith(DOUBLE_QUOTE) &amp;&amp; dt.endsWith(DOUBLE_QUOTE)){
            <span class="keyword" >int</span> length = dt.length();
            tmp = dt.substring(<span class="number" >1</span>, length - <span class="number" >1</span>);
        }
        <span class="keyword" >return</span> Date.valueOf(tmp);
    }
    
    <span class="annotation" >@Override</span>
    <span class="keyword" >public</span> Date getDate(<span class="keyword" >int</span> columnIndex) <span class="keyword" >throws</span> SQLException {
        Object obj = getObject(columnIndex);
        <span class="keyword" >if</span> (obj == <span class="keyword" >null</span>) {
            <span class="keyword" >return</span> <span class="keyword" >null</span>;
        }
        <span class="keyword" >if</span>(obj <span class="keyword" >instanceof</span> String){
            <span class="keyword" >try</span> {
                <span class="keyword" >return</span> parseDate((String) obj);
            } <span class="keyword" >catch</span>(Exception e){
                <span class="keyword" >throw</span> <span class="keyword" >new</span> SQLException(<span class="string" >"Cannot convert column "</span> + columnIndex + <span class="string" >" to date: "</span> + e.toString());
            }
        }
        <span class="keyword" >return</span> <span class="keyword" >super</span>.getDate(columnIndex);
    }
    
}
</pre>

<h3>jdbc.diconを修正</h3>
<p>やっと、s2jdbcに近づいてきました...</p>

<pre class="java">&lt;?xml version=<span class="string" >"1.0"</span> encoding=<span class="string" >"UTF-8"</span>?&gt;
&lt;!DOCTYPE components PUBLIC <span class="string" >"-//SEASAR2.1//DTD S2Container//EN"</span> <span class="string" >"http://www.seasar.org/dtd/components21.dtd"</span>&gt;
&lt;components namespace=<span class="string" >"jdbc_hive"</span>&gt;
    &lt;!-- Hive does not support in jta --&gt;
    &lt;include path=<span class="string" >"jta.dicon"</span>/&gt;
    
    &lt;component name=<span class="string" >"hiveDataSource"</span> <span class="keyword" >class</span>=<span class="string" >"org.seasar.extension.dbcp.impl.XADataSourceImpl"</span>&gt;
        &lt;property name=<span class="string" >"driverClassName"</span>&gt;<span class="string" >"org.apache.hadoop.hive.jdbc.HiveDriver"</span>&lt;/property&gt;
        &lt;property name=<span class="string" >"URL"</span>&gt;<span class="string" >"jdbc:hive://master1:10000/"</span>&lt;/property&gt;
        &lt;property name=<span class="string" >"user"</span>&gt;<span class="string" >""</span>&lt;/property&gt;
        &lt;property name=<span class="string" >"password"</span>&gt;<span class="string" >""</span>&lt;/property&gt;
    &lt;/component&gt;

    &lt;component name=<span class="string" >"hive"</span> <span class="keyword" >class</span>=<span class="string" >"org.seasar.extension.dbcp.impl.DataSourceImpl"</span> autoBinding=<span class="string" >"none"</span>&gt;
        &lt;arg&gt;
            &lt;component name=<span class="string" >"hivePool"</span> <span class="keyword" >class</span>=<span class="string" >"org.seasar.extension.dbcp.impl.HiveConnectionPool"</span> autoBinding=<span class="string" >"none"</span>&gt;
                &lt;property name=<span class="string" >"xaDataSource"</span>&gt;hiveDataSource&lt;/property&gt;
                &lt;property name=<span class="string" >"timeout"</span>&gt;<span class="number" >600</span>&lt;/property&gt;
                &lt;property name=<span class="string" >"maxPoolSize"</span>&gt;<span class="number" >10</span>&lt;/property&gt;
                &lt;property name=<span class="string" >"allowLocalTx"</span>&gt;<span class="keyword" >true</span>&lt;/property&gt;
                &lt;property name=<span class="string" >"readOnly"</span>&gt;<span class="keyword" >true</span>&lt;/property&gt;
                &lt;property name=<span class="string" >"transactionManager"</span>&gt;TransactionManager&lt;/property&gt;
                &lt;destroyMethod name=<span class="string" >"close"</span>/&gt;
            &lt;/component&gt;
        &lt;/arg&gt;
    &lt;/component&gt;
&lt;/components&gt;</pre>

<h3>s2jdbc.diconを用意</h3>
<p>dialectをs2jdbcに読ませる</p>

<pre class="java">&lt;?xml version=<span class="string" >"1.0"</span> encoding=<span class="string" >"UTF-8"</span>?&gt;
&lt;!DOCTYPE components PUBLIC <span class="string" >"-//SEASAR//DTD S2Container 2.4//EN"</span> <span class="string" >"http://www.seasar.org/dtd/components24.dtd"</span>&gt;
&lt;components&gt;
    &lt;include path=<span class="string" >"jta.dicon"</span>/&gt;
    &lt;include path=<span class="string" >"tx.dicon"</span>/&gt;
    &lt;include path=<span class="string" >"s2jdbc-internal.dicon"</span>/&gt;
    
    &lt;include path=<span class="string" >"jdbc-hive.dicon"</span> /&gt;
    
    &lt;component name=<span class="string" >"hiveDialect"</span> <span class="keyword" >class</span>=<span class="string" >"org.seasar.extension.jdbc.dialect.HiveDialect"</span> /&gt;
            
    &lt;component name=<span class="string" >"hiveJdbcManager"</span> <span class="keyword" >class</span>=<span class="string" >"org.seasar.extension.jdbc.manager.JdbcManagerImpl"</span>&gt;
        &lt;property name=<span class="string" >"maxRows"</span>&gt;<span class="number" >0</span>&lt;/property&gt;
        &lt;property name=<span class="string" >"fetchSize"</span>&gt;<span class="number" >0</span>&lt;/property&gt;
        &lt;property name=<span class="string" >"queryTimeout"</span>&gt;<span class="number" >0</span>&lt;/property&gt;
        &lt;property name=<span class="string" >"dialect"</span>&gt;hiveDialect&lt;/property&gt;
        &lt;initMethod name=<span class="string" >"init"</span> /&gt;
    &lt;/component&gt;
&lt;/components&gt;</pre>

<h3>S2JDBC経由でHiveQLを投げてみる</h3>
<p>Hiveはinsert文は無いので、LOAD DATA 文です。<br />
しかも、preparedStatementで値のbindできないので、直接クエリを書きます。<br />
さらに、getSingleResult であっても、何も結果セットがかえってこない(<a href="http://svn.apache.org/repos/asf/hadoop/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java">svn:hive/service/HiveServier</a>)ので、<code>selectBySql(Integer.class...)</code>とかはできません。<br />
あきらめて、selectBySql(String.class) で void にしてます。。</p>

<p>selectについても、preparedStatementでbindで値のbindができないので、直接クエリを書きます。<br />
今はまだ、order by の書き換え(sort by)を行っていないので、orderByが使えません。直接クエリに書きます。<br />
joinについても少し難あり。。。</p>

<p>等など、ということで、いくらか諦めると...!</p>

<pre class="java">
<span class="annotation" >@Component</span>
<span class="annotation" >@InterType</span>(<span class="string" >"aop.propertyInterType"</span>)
<span class="keyword" >public</span> <span class="keyword" >class</span> HogeService {
    <span class="annotation" >@Property</span>(PropertyType.WRITE)
    <span class="keyword" >protected</span> JdbcManager hive;

    <span class="keyword" >public</span> <span class="keyword" >void</span> insert(String path, String part, String subPart){
        hive.selectBySql(String.<span class="keyword" >class</span>, <span class="string" >"LOAD DATA INPATH '"</span> + path + <span class="string" >"' INTO TABLE hoge PARTITION (p1 = '"</span> + part + <span class="string" >"', p2 = '"</span> + subPart + <span class="string" >"'"</span>).getSingleResult();
    }
    <span class="keyword" >public</span> <span class="keyword" >void</span> insertFromLocal(String path, String part, String subPart){
        hive.selectBySql(String.<span class="keyword" >class</span>, <span class="string" >"LOAD DATA LOCAL INPATH '"</span> + path + <span class="string" >"' INTO TABLE hoge PARTITION (p1 = '"</span> + part + <span class="string" >"', p2 = '"</span> + subPart + <span class="string" >"'"</span>).getSingleResult();
    }
    <span class="keyword" >public</span> <span class="keyword" >void</span> overwrite(String path, String part, String subPart){
        hive.selectBySql(String.<span class="keyword" >class</span>, <span class="string" >"LOAD DATA INPATH '"</span> + path + <span class="string" >"' OVERWRITE INTO TABLE hoge PARTITION (p1 = '"</span> + part + <span class="string" >"', p2 = '"</span> + subPart + <span class="string" >"'"</span>).getSingleResult();
    }
    <span class="keyword" >public</span> <span class="keyword" >void</span> overwriteFromLocal(String path, String part, String subPart){
        hive.selectBySql(String.<span class="keyword" >class</span>, <span class="string" >"LOAD DATA INPATH '"</span> + path + <span class="string" >"' OVERWRITE INTO TABLE hoge PARTITION (p1 = '"</span> + part + <span class="string" >"', p2 = '"</span> + subPart + <span class="string" >"'"</span>).getSingleResult();
    }
    <span class="keyword" >public</span> List&lt;Hoge&gt; getHoge(){
        <span class="keyword" >return</span> hive.from(Hoge.<span class="keyword" >class</span>).where(<span class="string" >"id &gt; 1234"</span>).limit(<span class="number" >100</span>).getResultList();
    }
}
</pre>

<p>＼(^o^)／　やたー！うごいたー！</p>

<h3>おわり。</h3>
<p>ってことで、とりあえず、HiveをS2JDBC経由で利用できるようになったよ！</p>
<p>Seasarのパッケージはホント便利！ほとんどwrapperが用意されてるし、diconで切り替えれるから修正が楽！<br />
でも、coreなパッケージほど、 private メソッド多いよ！せめて protected か getter を用意して！(無駄にコピーしちゃったのがいくつかある...)</p>

<p>ああああ、でもこれなら HiveJDBC をちゃんと修正した方が早かったかも。。orz<br />
今度やってみよう。。</p>
]]>
</content:encoded>
</item>
<item rdf:about="http://blog.xole.net/article.php?id=745">
<title>HDFS の HadoopThriftServer をなんとかする</title>
<link>http://blog.xole.net/article.php?id=745</link>
<dc:date>2010-02-07T03:27:10+09:00</dc:date>
<description>hadoop の話題。その2
hadoop を支える HDFS には HDFS-APIを通すことで、プログラム中から HDFS の読み書きが出きるようになります。(たぶん、hdfs-s3 なんかもこのAPI経由(? ソース読んでない))...</description>
<content:encoded>
<![CDATA[
<p>hadoop の話題。その2</p>
<p>hadoop を支える <a href="http://hadoop.apache.org/hdfs/">HDFS</a> には <a href="http://wiki.apache.org/hadoop/HDFS-APIs">HDFS-API</a>を通すことで、プログラム中から HDFS の読み書きが出きるようになります。(たぶん、<a href="http://wiki.apache.org/hadoop/AmazonS3">hdfs-s3</a> なんかもこのAPI経由(? ソース読んでない))</p>

<p>(中略)</p>

<p>んで、この HDFS-API のなかに、Thrift を使って リモート上から HDFS の読み書きをできるようにしている <a href="http://svn.apache.org/repos/asf/hadoop/hdfs/trunk/src/contrib/thriftfs/">HadoopThriftServer(theiftfs)</a> があります。</p>

<p>この thriftfs の起動は <a href="http://svn.apache.org/repos/asf/hadoop/hdfs/trunk/src/contrib/thriftfs/scripts/start_thrift_server.sh">
</a> に書かれているのですが、shellを握ってしまうのでこんな感じにしました。</p>

<pre class="javascript">
<span class="preprocessor" >#!/usr/bin/env bash</span>

THRIFTFS_PID_FILE=$HADOOP_PID_DIR/thrift.pid
THRIFTFS_LOG_FILE=$HADOOP_LOG_DIR/thrift.log

<span class="keyword" >if</span> [ -f $THRIFTFS_PID_FILE ]; then
    echo running as process `cat $THRIFTFS_PID_FILE`. stop it first
    exit 1
fi

CLASSPATH=$HADOOP_CONF_DIR
CLASSPATH=$CLASSPATH:$HADOOP_HOME/hadoop-0.20.1-core.jar:$HADOOP_HOME/hadoop-0.20.1-tools.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib/commons-logging-1.0.4.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib/commons-logging-api-1.0.4.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib/log4j-1.2.15.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/contrib/thriftfs/hadoop-0.20.1-thriftfs.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/src/contrib/thriftfs/lib/hadoopthriftapi.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/src/contrib/thriftfs/lib/libthrift.jar

nohup java -Dcom.sun.management.jmxremote -cp $CLASSPATH org.apache.hadoop.thriftfs.HadoopThriftServer 10010 &gt; $THRIFTFS_LOG_FILE 2&gt;&amp;1 &lt; /dev/<span class="keyword" >null</span> &amp;
echo $! &gt; $THRIFTFS_PID_FILE

$HADOOP_HOME/bin/hadoop dfsadmin -safemode leave</pre>

<p>同様に 停止は</p>


<pre class="javascript">
<span class="preprocessor" >#!/usr/bin/env bash</span>

THRIFT_FS_PID_FILE=$HADOOP_PID_DIR/thrift.pid
THRIFT_FS_LOG_FILE=$HADOOP_LOG_DIR/thrift.log

<span class="keyword" >if</span> [ -f $THRIFT_FS_PID_FILE ]; then
    kill `cat $THRIFT_FS_PID_FILE`
    rm $THRIFT_FS_PID_FILE
<span class="keyword" >else</span>
    echo no pidfile $THRIFT_FS_PID_FILE
fi</pre>

<p>起動する際は dfs($HADOOP_HOME/bin/start-dfs.sh)が起動している状態で起動する必要があります。<br />
(dfsadmin -safemode leave は適宜行ってください)</p>

<p>んで、このThriftFSは、こんな感じでリモート上の DFS のファイルを読み書きできます</p>

<pre class="java">
<span class="keyword" >import</span> org.apache.hadoop.conf.Configuration;
<span class="keyword" >import</span> org.apache.hadoop.hdfs.protocol.FSConstants;
<span class="keyword" >import</span> org.apache.hadoop.fs.permission.FsPermission;

<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.Pathname;
<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.ThriftHandle;
<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.ThriftIOException;

<span class="keyword" >import</span> com.facebook.thrift.TException;
<span class="keyword" >import</span> com.facebook.thrift.protocol.TBinaryProtocol;
<span class="keyword" >import</span> com.facebook.thrift.protocol.TProtocol;
<span class="keyword" >import</span> com.facebook.thrift.transport.TSocket;
<span class="keyword" >import</span> com.facebook.thrift.transport.TTransportException;

<span class="keyword" >public</span> <span class="keyword" >class</span> HDFSInput {
    
    <span class="keyword" >public</span> <span class="keyword" >static</span> <span class="keyword" >void</span> main(String...args) {
        <span class="keyword" >final</span> <span class="keyword" >int</span> defaultBufferSize = config.getInt(<span class="string" >"io.file.buffer.size"</span>, <span class="number" >4096</span>);
        <span class="keyword" >final</span> <span class="keyword" >long</span> defaultBlockSize = config.getLong(<span class="string" >"dfs.block.size"</span>, FSConstants.DEFAULT_BLOCK_SIZE);
        <span class="keyword" >final</span> <span class="keyword" >short</span> defaultReplication = (<span class="keyword" >short</span>) config.getInt(<span class="string" >"dfs.replication"</span>, <span class="number" >3</span>);

        TSocket socket = <span class="keyword" >new</span> TSocket(<span class="string" >"master1"</span>, <span class="number" >10010</span>);
        TProtocol protocol = <span class="keyword" >new</span> TBinaryProtocol(socket);
        
        <span class="keyword" >try</span> {
            socket.open();
            <span class="keyword" >try</span> {
                ThriftHadoopFileSystem.Client client = <span class="keyword" >new</span> ThriftHadoopFileSystem.Client(protocol);
                <span class="comment" >// client timeout 5 sec</span>
                client.setTimeout(<span class="number" >5</span>);
                Pathname hoge = <span class="keyword" >new</span> Pathname(<span class="string" >"/tmp/hoge"</span>);

                FsPermission permission = FsPermission.createImmutable((<span class="keyword" >short</span>) <span class="number" >0655</span>);
                <span class="keyword" >boolean</span> overwrite = <span class="keyword" >true</span>;

                ThriftHandle writeHandler = client.createFile(hoge, permission.toShort(), overwrite, defaultBufferSize, defaultReplication, defaultBlockSize);
                client.write(writeHandler, <span class="string" >"hello world"</span>);
                client.close(writeHandler);

                ThriftHandle readHandler = client.open(hoge);
                System.out.println(client.read(readHandler, <span class="number" >0</span>, <span class="number" >1024</span>));
                client.close(readHandler);
            } <span class="keyword" >catch</span> (TTransportException e) {
                e.printStackTrace();
            } <span class="keyword" >catch</span> (ThriftIOException e) {
                e.printStackTrace();
            }
        } <span class="keyword" >catch</span> (TException e) {
            e.printStackTrace();
        } <span class="keyword" >finally</span> {
            socket.close();
        }
    }
}
</pre>

<p>read時にBufferedReaderにwrapするともう少し便利に読み書きできる(これは今度書く)</p>

<p>んで、読み書きできるようになったんだけど、どうも複数のクライアントから連続して読み書きをすると、整合性が取れなくなってしまう(?)のかエラーがでるようになった。<br />
元の<a href="http://svn.apache.org/repos/asf/hadoop/hdfs/trunk/src/contrib/thriftfs/src/java/org/apache/hadoop/thriftfs/HadoopThriftServer.java">ソース</a>を読むと...なんともエレガントな。。。</p>

<p>ということで、java.util.concurrent.atomic を使って書き直してみた(ここが本題)</p>
<pre class="java">
<span class="keyword" >package</span> org.apache.hadoop.thriftfs;

<span class="keyword" >import</span> java.io.IOException;
<span class="keyword" >import</span> java.net.InetSocketAddress;
<span class="keyword" >import</span> java.net.ServerSocket;
<span class="keyword" >import</span> java.util.LinkedList;
<span class="keyword" >import</span> java.util.List;
<span class="keyword" >import</span> java.util.Random;
<span class="keyword" >import</span> java.util.concurrent.ConcurrentHashMap;
<span class="keyword" >import</span> java.util.concurrent.atomic.AtomicLong;

<span class="keyword" >import</span> org.apache.commons.logging.Log;
<span class="keyword" >import</span> org.apache.commons.logging.LogFactory;
<span class="keyword" >import</span> org.apache.hadoop.conf.Configuration;
<span class="keyword" >import</span> org.apache.hadoop.fs.FSDataInputStream;
<span class="keyword" >import</span> org.apache.hadoop.fs.FSDataOutputStream;
<span class="keyword" >import</span> org.apache.hadoop.fs.FileSystem;
<span class="keyword" >import</span> org.apache.hadoop.fs.Path;
<span class="keyword" >import</span> org.apache.hadoop.fs.permission.FsPermission;
<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.Pathname;
<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.ThriftHandle;
<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.ThriftIOException;
<span class="keyword" >import</span> org.apache.hadoop.util.Daemon;
<span class="keyword" >import</span> org.apache.hadoop.util.StringUtils;

<span class="keyword" >import</span> com.facebook.thrift.protocol.TBinaryProtocol;
<span class="keyword" >import</span> com.facebook.thrift.server.TServer;
<span class="keyword" >import</span> com.facebook.thrift.server.TThreadPoolServer;
<span class="keyword" >import</span> com.facebook.thrift.transport.TServerSocket;
<span class="keyword" >import</span> com.facebook.thrift.transport.TServerTransport;
<span class="keyword" >import</span> com.facebook.thrift.transport.TTransportFactory;

<span class="keyword" >public</span> <span class="keyword" >class</span> HadoopThriftServer <span class="keyword" >extends</span> ThriftHadoopFileSystem {

    <span class="keyword" >static</span> <span class="keyword" >int</span> serverPort = <span class="number" >0</span>;                    <span class="comment" >// default port</span>
    TServer    server = <span class="keyword" >null</span>;

    <span class="keyword" >public</span> <span class="keyword" >static</span> <span class="keyword" >class</span> HadoopThriftHandler <span class="keyword" >implements</span> ThriftHadoopFileSystem.Iface
    {

      <span class="keyword" >public</span> <span class="keyword" >static</span> <span class="keyword" >final</span> Log LOG = LogFactory.getLog(<span class="string" >"org.apache.hadoop.thrift"</span>);

      <span class="comment" >// HDFS glue</span>
      Configuration conf;
      FileSystem fs;
          
      <span class="comment" >// stucture that maps each Thrift object into an hadoop object</span>
      <span class="keyword" >private</span> AtomicLong nextId = <span class="keyword" >new</span> AtomicLong(<span class="keyword" >new</span> Random().nextLong());
      <span class="keyword" >private</span> ConcurrentHashMap&lt;Long, Object&gt; hadoopHash = <span class="keyword" >new</span> ConcurrentHashMap&lt;Long, Object&gt;();
      <span class="keyword" >private</span> Daemon inactivityThread = <span class="keyword" >null</span>;

      <span class="comment" >// Detect inactive session</span>
      <span class="keyword" >private</span> <span class="keyword" >static</span> <span class="keyword" >volatile</span> <span class="keyword" >long</span> inactivityPeriod = <span class="number" >3600</span> * <span class="number" >1000</span>; <span class="comment" >// 1 hr</span>
      <span class="keyword" >private</span> <span class="keyword" >static</span> <span class="keyword" >volatile</span> <span class="keyword" >long</span> inactivityRecheckInterval = <span class="number" >60</span> * <span class="number" >1000</span>;
      <span class="keyword" >private</span> <span class="keyword" >static</span> <span class="keyword" >volatile</span> <span class="keyword" >boolean</span> fsRunning = <span class="keyword" >true</span>;
      <span class="keyword" >private</span> AtomicLong now = <span class="keyword" >new</span> AtomicLong(now());

      <span class="comment" >// allow outsider to change the hadoopthrift path</span>
      <span class="keyword" >public</span> <span class="keyword" >void</span> setOption(String key, String val) {
      }

      <span class="comment" >/**
       * Current system time.
       * @return current time in msec.
       */</span>
      <span class="keyword" >static</span> <span class="keyword" >long</span> now() {
        <span class="keyword" >return</span> System.currentTimeMillis();
      }

      <span class="comment" >/**
      * getVersion
      *
      * @return current version of the interface.
      */</span>
      <span class="keyword" >public</span> String getVersion() {
        <span class="keyword" >return</span> <span class="string" >"0.1"</span>;
      }

      <span class="comment" >/**
       * shutdown
       *
       * cleanly closes everything and exit.
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >void</span> shutdown(<span class="keyword" >int</span> status) {
        LOG.info(<span class="string" >"HadoopThriftServer shutting down."</span>);
        <span class="keyword" >try</span> {
          fs.close();
        } <span class="keyword" >catch</span> (IOException e) {
          LOG.warn(<span class="string" >"Unable to close file system"</span>);
        }
        Runtime.getRuntime().exit(status);
      }

      <span class="comment" >/**
       * Periodically checks to see if there is inactivity
       */</span>
      <span class="keyword" >class</span> InactivityMonitor <span class="keyword" >implements</span> Runnable {
        <span class="keyword" >public</span> <span class="keyword" >void</span> run() {
          <span class="keyword" >while</span> (fsRunning) {
            <span class="keyword" >try</span> {
              <span class="keyword" >if</span> (now() &gt; now.get() + inactivityPeriod) {
                LOG.warn(<span class="string" >"HadoopThriftServer Inactivity period of "</span> +
                         inactivityPeriod + <span class="string" >" expired... Stopping Server."</span>);
                shutdown(-<span class="number" >1</span>);
              }
            } <span class="keyword" >catch</span> (Exception e) {
              LOG.error(StringUtils.stringifyException(e));
            }
            <span class="keyword" >try</span> {
              Thread.sleep(inactivityRecheckInterval);
            } <span class="keyword" >catch</span> (InterruptedException ie) {
            }
          }
        }
      }

      <span class="comment" >/**
       * HadoopThriftServer
       *
       * Constructor for the HadoopThriftServer glue with Thrift Class.
       *
       * @param name - the name of this handler
       */</span>
      <span class="keyword" >public</span> HadoopThriftHandler(String name) {
        conf = <span class="keyword" >new</span> Configuration();
        now.set(now());
        <span class="keyword" >try</span> {
          inactivityThread = <span class="keyword" >new</span> Daemon(<span class="keyword" >new</span> InactivityMonitor());
          fs = FileSystem.get(conf);
        } <span class="keyword" >catch</span> (IOException e) {
          LOG.warn(<span class="string" >"Unable to open hadoop file system..."</span>);
          Runtime.getRuntime().exit(-<span class="number" >1</span>);
        }
      }

      <span class="comment" >/**
        * printStackTrace
        *
        * Helper function to print an exception stack trace to the log and not stderr
        *
        * @param e the exception
        *
        */</span>
      <span class="keyword" >static</span> <span class="keyword" >private</span> <span class="keyword" >void</span> printStackTrace(Exception e) {
        <span class="keyword" >for</span>(StackTraceElement s: e.getStackTrace()) {
          LOG.error(s);
        }
      }

      <span class="comment" >/**
       * Lookup a thrift object into a hadoop object
       */</span>
      <span class="keyword" >private</span> <span class="keyword" >synchronized</span> Object lookup(<span class="keyword" >long</span> id) {
        <span class="keyword" >return</span> hadoopHash.get(<span class="keyword" >new</span> Long(id));
      }

      <span class="comment" >/**
       * Insert a thrift object into a hadoop object. Return its id.
       */</span>
      <span class="keyword" >private</span> <span class="keyword" >synchronized</span> <span class="keyword" >long</span> insert(Object o) {
        <span class="keyword" >long</span> next = nextId.incrementAndGet();
        hadoopHash.put(next, o);
        <span class="keyword" >return</span> next;
      }

      <span class="comment" >/**
       * Delete a thrift object from the hadoop store.
       */</span>
      <span class="keyword" >private</span> <span class="keyword" >synchronized</span> Object remove(<span class="keyword" >long</span> id) {
        <span class="keyword" >return</span> hadoopHash.remove(<span class="keyword" >new</span> Long(id));
      }

      <span class="comment" >/**
        * Implement the API exported by this thrift server
        */</span>

      <span class="comment" >/** Set inactivity timeout period. The period is specified in seconds.
        * if there are no RPC calls to the HadoopThrift server for this much
        * time, then the server kills itself.
        */</span>
      <span class="keyword" >public</span> <span class="keyword" >synchronized</span> <span class="keyword" >void</span> setInactivityTimeoutPeriod(<span class="keyword" >long</span> periodInSeconds) {
        inactivityPeriod = periodInSeconds * <span class="number" >1000</span>; <span class="comment" >// in milli seconds</span>
        <span class="keyword" >if</span> (inactivityRecheckInterval &gt; inactivityPeriod ) {
          inactivityRecheckInterval = inactivityPeriod;
        }
      }


      <span class="comment" >/**
        * Create a file and open it for writing
        */</span>
      <span class="keyword" >public</span> ThriftHandle create(Pathname path) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"create: "</span> + path);
          FSDataOutputStream out = fs.create(<span class="keyword" >new</span> Path(path.pathname));
          <span class="keyword" >long</span> id = insert(out);
          ThriftHandle obj = <span class="keyword" >new</span> ThriftHandle(id);
          HadoopThriftHandler.LOG.debug(<span class="string" >"created: "</span> + path + <span class="string" >" id: "</span> + id);
          <span class="keyword" >return</span> obj;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
        * Create a file and open it for writing, delete file if it exists
        */</span>
      <span class="keyword" >public</span> ThriftHandle createFile(Pathname path, 
                                     <span class="keyword" >short</span> mode,
                                     <span class="keyword" >boolean</span>  overwrite,
                                     <span class="keyword" >int</span> bufferSize,
                                     <span class="keyword" >short</span> replication,
                                     <span class="keyword" >long</span> blockSize) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"create: "</span> + path +
                                       <span class="string" >" permission: "</span> + mode +
                                       <span class="string" >" overwrite: "</span> + overwrite +
                                       <span class="string" >" bufferSize: "</span> + bufferSize +
                                       <span class="string" >" replication: "</span> + replication +
                                       <span class="string" >" blockSize: "</span> + blockSize);
          FSDataOutputStream out = fs.create(<span class="keyword" >new</span> Path(path.pathname), 
                                             <span class="keyword" >new</span> FsPermission(mode),
                                             overwrite,
                                             bufferSize,
                                             replication,
                                             blockSize,
                                             <span class="keyword" >null</span>); <span class="comment" >// progress</span>
          <span class="keyword" >long</span> id = insert(out);
          ThriftHandle obj = <span class="keyword" >new</span> ThriftHandle(id);
          HadoopThriftHandler.LOG.debug(<span class="string" >"created: "</span> + path + <span class="string" >" id: "</span> + id);
          <span class="keyword" >return</span> obj;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Opens an existing file and returns a handle to read it
       */</span>
      <span class="keyword" >public</span> ThriftHandle open(Pathname path) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"open: "</span> + path);
          FSDataInputStream out = fs.open(<span class="keyword" >new</span> Path(path.pathname));
          <span class="keyword" >long</span> id = insert(out);
          ThriftHandle obj = <span class="keyword" >new</span> ThriftHandle(id);
          HadoopThriftHandler.LOG.debug(<span class="string" >"opened: "</span> + path + <span class="string" >" id: "</span> + id);
          <span class="keyword" >return</span> obj;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Opens an existing file to append to it.
       */</span>
      <span class="keyword" >public</span> ThriftHandle append(Pathname path) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"append: "</span> + path);
          FSDataOutputStream out = fs.append(<span class="keyword" >new</span> Path(path.pathname));
          <span class="keyword" >long</span> id = insert(out);
          ThriftHandle obj = <span class="keyword" >new</span> ThriftHandle(id);
          HadoopThriftHandler.LOG.debug(<span class="string" >"appended: "</span> + path + <span class="string" >" id: "</span> + id);
          <span class="keyword" >return</span> obj;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * write to a file
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >boolean</span> write(ThriftHandle tout, String data) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"write: "</span> + tout.id);
          FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
          <span class="keyword" >byte</span>[] tmp = data.getBytes(<span class="string" >"UTF-8"</span>);
          out.write(tmp, <span class="number" >0</span>, tmp.length);
          HadoopThriftHandler.LOG.debug(<span class="string" >"wrote: "</span> + tout.id);
          <span class="keyword" >return</span> <span class="keyword" >true</span>;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * read from a file
       */</span>
      <span class="keyword" >public</span> String read(ThriftHandle tout, <span class="keyword" >long</span> offset,
                         <span class="keyword" >int</span> length) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"read: "</span> + tout.id +
                                       <span class="string" >" offset: "</span> + offset +
                                       <span class="string" >" length: "</span> + length);
          FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
          
          <span class="keyword" >if</span> (in.getPos() != offset) {
            in.seek(offset);
          }
          <span class="keyword" >byte</span>[] tmp = <span class="keyword" >new</span> <span class="keyword" >byte</span>[length];
          <span class="keyword" >int</span> numbytes = in.read(offset, tmp, <span class="number" >0</span>, length);
          HadoopThriftHandler.LOG.debug(<span class="string" >"read done: "</span> + tout.id);
          <span class="keyword" >return</span> <span class="keyword" >new</span> String(tmp, <span class="number" >0</span>, numbytes, <span class="string" >"UTF-8"</span>);
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Delete a file/directory
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >boolean</span> rm(Pathname path, <span class="keyword" >boolean</span> recursive) 
                            <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"rm: "</span> + path +
                                       <span class="string" >" recursive: "</span> + recursive);
          <span class="keyword" >boolean</span> ret = fs.delete(<span class="keyword" >new</span> Path(path.pathname), recursive);
          HadoopThriftHandler.LOG.debug(<span class="string" >"rm: "</span> + path);
          <span class="keyword" >return</span> ret;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Move a file/directory
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >boolean</span> rename(Pathname path, Pathname dest) 
                            <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"rename: "</span> + path +
                                       <span class="string" >" destination: "</span> + dest);
          <span class="keyword" >boolean</span> ret = fs.rename(<span class="keyword" >new</span> Path(path.pathname), 
                                  <span class="keyword" >new</span> Path(dest.pathname));
          HadoopThriftHandler.LOG.debug(<span class="string" >"rename: "</span> + path);
          <span class="keyword" >return</span> ret;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       *  close file
       */</span>
       <span class="keyword" >public</span> <span class="keyword" >boolean</span> close(ThriftHandle tout) <span class="keyword" >throws</span> ThriftIOException {
         <span class="keyword" >try</span> {
           now.set(now());
           HadoopThriftHandler.LOG.debug(<span class="string" >"close: "</span> + tout.id);
           Object obj = remove(tout.id);
           
           <span class="keyword" >if</span> (obj <span class="keyword" >instanceof</span> FSDataOutputStream) {
             FSDataOutputStream out = (FSDataOutputStream)obj;
             out.close();
           } <span class="keyword" >else</span> <span class="keyword" >if</span> (obj <span class="keyword" >instanceof</span> FSDataInputStream) {
             FSDataInputStream in = (FSDataInputStream)obj;
             in.close();
           } <span class="keyword" >else</span> {
             <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(<span class="string" >"Unknown thrift handle."</span>);
           }
           HadoopThriftHandler.LOG.debug(<span class="string" >"closed: "</span> + tout.id);
           <span class="keyword" >return</span> <span class="keyword" >true</span>;
         } <span class="keyword" >catch</span> (IOException e) {
           <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
         }
       }

       <span class="comment" >/**
        * Create a directory
        */</span>
      <span class="keyword" >public</span> <span class="keyword" >boolean</span> mkdirs(Pathname path) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"mkdirs: "</span> + path);
          <span class="keyword" >boolean</span> ret = fs.mkdirs(<span class="keyword" >new</span> Path(path.pathname));
          HadoopThriftHandler.LOG.debug(<span class="string" >"mkdirs: "</span> + path);
          <span class="keyword" >return</span> ret;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Does this pathname exist?
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >boolean</span> exists(Pathname path) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"exists: "</span> + path);
          <span class="keyword" >boolean</span> ret = fs.exists(<span class="keyword" >new</span> Path(path.pathname));
          HadoopThriftHandler.LOG.debug(<span class="string" >"exists done: "</span> + path);
          <span class="keyword" >return</span> ret;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Returns status about the specified pathname
       */</span>
      <span class="keyword" >public</span> org.apache.hadoop.thriftfs.api.FileStatus stat(
                              Pathname path) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"stat: "</span> + path);
          org.apache.hadoop.fs.FileStatus stat = fs.getFileStatus(
                                             <span class="keyword" >new</span> Path(path.pathname));
          HadoopThriftHandler.LOG.debug(<span class="string" >"stat done: "</span> + path);
          <span class="keyword" >return</span> <span class="keyword" >new</span> org.apache.hadoop.thriftfs.api.FileStatus(
            stat.getPath().toString(),
            stat.getLen(),
            stat.isDir(),
            stat.getReplication(),
            stat.getBlockSize(),
            stat.getModificationTime(),
            stat.getPermission().toString(),
            stat.getOwner(),
            stat.getGroup());
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * If the specified pathname is a directory, then return the
       * list of pathnames in this directory
       */</span>
      <span class="keyword" >public</span> List&lt;org.apache.hadoop.thriftfs.api.FileStatus&gt; listStatus(
                              Pathname path) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"listStatus: "</span> + path);

          org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus(
                                             <span class="keyword" >new</span> Path(path.pathname));
          HadoopThriftHandler.LOG.debug(<span class="string" >"listStatus done: "</span> + path);
          org.apache.hadoop.thriftfs.api.FileStatus tmp;
          List&lt;org.apache.hadoop.thriftfs.api.FileStatus&gt; value = 
            <span class="keyword" >new</span> LinkedList&lt;org.apache.hadoop.thriftfs.api.FileStatus&gt;();

          <span class="keyword" >for</span> (<span class="keyword" >int</span> i = <span class="number" >0</span>; i &lt; stat.length; i++) {
            tmp = <span class="keyword" >new</span> org.apache.hadoop.thriftfs.api.FileStatus(
                        stat[i].getPath().toString(),
                        stat[i].getLen(),
                        stat[i].isDir(),
                        stat[i].getReplication(),
                        stat[i].getBlockSize(),
                        stat[i].getModificationTime(),
                        stat[i].getPermission().toString(),
                        stat[i].getOwner(),
                        stat[i].getGroup());
            value.add(tmp);
          }
          <span class="keyword" >return</span> value;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Sets the permission of a pathname
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >void</span> chmod(Pathname path, <span class="keyword" >short</span> mode) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"chmod: "</span> + path + 
                                       <span class="string" >" mode "</span> + mode);
          fs.setPermission(<span class="keyword" >new</span> Path(path.pathname), <span class="keyword" >new</span> FsPermission(mode));
          HadoopThriftHandler.LOG.debug(<span class="string" >"chmod done: "</span> + path);
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Sets the owner &amp; group of a pathname
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >void</span> chown(Pathname path, String owner, String group) 
                                                         <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"chown: "</span> + path +
                                       <span class="string" >" owner: "</span> + owner +
                                       <span class="string" >" group: "</span> + group);
          fs.setOwner(<span class="keyword" >new</span> Path(path.pathname), owner, group);
          HadoopThriftHandler.LOG.debug(<span class="string" >"chown done: "</span> + path);
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Sets the replication factor of a file
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >void</span> setReplication(Pathname path, <span class="keyword" >short</span> repl) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"setrepl: "</span> + path +
                                       <span class="string" >" replication factor: "</span> + repl);
          fs.setReplication(<span class="keyword" >new</span> Path(path.pathname), repl);
          HadoopThriftHandler.LOG.debug(<span class="string" >"setrepl done: "</span> + path);
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }

      }

      <span class="comment" >/**
       * Returns the block locations of this file
       */</span>
      <span class="keyword" >public</span> List&lt;org.apache.hadoop.thriftfs.api.BlockLocation&gt; 
               getFileBlockLocations(Pathname path, <span class="keyword" >long</span> start, <span class="keyword" >long</span> length) 
                                           <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"getFileBlockLocations: "</span> + path);

          org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(
                                                   <span class="keyword" >new</span> Path(path.pathname));

          org.apache.hadoop.fs.BlockLocation[] stat = 
              fs.getFileBlockLocations(status, start, length);
          HadoopThriftHandler.LOG.debug(<span class="string" >"getFileBlockLocations done: "</span> + path);

          org.apache.hadoop.thriftfs.api.BlockLocation tmp;
          List&lt;org.apache.hadoop.thriftfs.api.BlockLocation&gt; value = 
            <span class="keyword" >new</span> LinkedList&lt;org.apache.hadoop.thriftfs.api.BlockLocation&gt;();

          <span class="keyword" >for</span> (<span class="keyword" >int</span> i = <span class="number" >0</span>; i &lt; stat.length; i++) {

            <span class="comment" >// construct the list of hostnames from the array returned</span>
            <span class="comment" >// by HDFS</span>
            List&lt;String&gt; hosts = <span class="keyword" >new</span> LinkedList&lt;String&gt;();
            String[] hostsHdfs = stat[i].getHosts();
            <span class="keyword" >for</span> (<span class="keyword" >int</span> j = <span class="number" >0</span>; j &lt; hostsHdfs.length; j++) {
              hosts.add(hostsHdfs[j]);
            }

            <span class="comment" >// construct the list of host:port from the array returned</span>
            <span class="comment" >// by HDFS</span>
            List&lt;String&gt; names = <span class="keyword" >new</span> LinkedList&lt;String&gt;();
            String[] namesHdfs = stat[i].getNames();
            <span class="keyword" >for</span> (<span class="keyword" >int</span> j = <span class="number" >0</span>; j &lt; namesHdfs.length; j++) {
              names.add(namesHdfs[j]);
            }
            tmp = <span class="keyword" >new</span> org.apache.hadoop.thriftfs.api.BlockLocation(
                        hosts, names, stat[i].getOffset(), stat[i].getLength());
            value.add(tmp);
          }
          <span class="keyword" >return</span> value;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }
    }

    <span class="comment" >// Bind to port. If the specified port is 0, then bind to random port.</span>
    <span class="keyword" >private</span> ServerSocket createServerSocket(<span class="keyword" >int</span> port) <span class="keyword" >throws</span> IOException {
      <span class="keyword" >try</span> {
        ServerSocket sock = <span class="keyword" >new</span> ServerSocket();
        <span class="comment" >// Prevent 2MSL delay problem on server restarts</span>
        sock.setReuseAddress(<span class="keyword" >true</span>);
        <span class="comment" >// Bind to listening port</span>
        <span class="keyword" >if</span> (port == <span class="number" >0</span>) {
          sock.bind(<span class="keyword" >null</span>);
          serverPort = sock.getLocalPort();
        } <span class="keyword" >else</span> {
          sock.bind(<span class="keyword" >new</span> InetSocketAddress(port));
        }
        <span class="keyword" >return</span> sock;
      } <span class="keyword" >catch</span> (IOException ioe) {
        <span class="keyword" >throw</span> <span class="keyword" >new</span> IOException(<span class="string" >"Could not create ServerSocket on port "</span> + port + <span class="string" >"."</span> +
                              ioe);
      }
    }

    <span class="comment" >/**
     * Constrcts a server object
     */</span>
    <span class="keyword" >public</span> HadoopThriftServer(String [] args) {

      <span class="keyword" >if</span> (args.length &gt; <span class="number" >0</span>) {
        serverPort = <span class="keyword" >new</span> Integer(args[<span class="number" >0</span>]);
      }
      <span class="keyword" >try</span> {
        ServerSocket ssock = createServerSocket(serverPort);
        TServerTransport serverTransport = <span class="keyword" >new</span> TServerSocket(ssock);
        Iface handler = <span class="keyword" >new</span> HadoopThriftHandler(<span class="string" >"hdfs-thrift-dhruba"</span>);
        ThriftHadoopFileSystem.Processor processor = <span class="keyword" >new</span> ThriftHadoopFileSystem.Processor(handler);
        TThreadPoolServer.Options options = <span class="keyword" >new</span> TThreadPoolServer.Options();
        options.minWorkerThreads = <span class="number" >10</span>;
        server = <span class="keyword" >new</span> TThreadPoolServer(processor, serverTransport,
                                               <span class="keyword" >new</span> TTransportFactory(),
                                               <span class="keyword" >new</span> TTransportFactory(),
                                               <span class="keyword" >new</span> TBinaryProtocol.Factory(),
                                               <span class="keyword" >new</span> TBinaryProtocol.Factory(), 
                                               options);
        System.out.println(<span class="string" >"Starting the hadoop thrift server on port ["</span> + serverPort + <span class="string" >"]..."</span>);
        HadoopThriftHandler.LOG.info(<span class="string" >"Starting the hadoop thrift server on port ["</span> +serverPort + <span class="string" >"]..."</span>);
        System.out.flush();

      } <span class="keyword" >catch</span> (Exception x) {
        x.printStackTrace();
      }
    }

    <span class="keyword" >public</span> <span class="keyword" >static</span> <span class="keyword" >void</span> main(String [] args) {
      HadoopThriftServer me = <span class="keyword" >new</span> HadoopThriftServer(args);
      me.server.serve();
    }
}</pre>

<p>安定した。気がする。。<br />
いつか問題となるようなコードを書いてpatchを作ろう</p>
]]>
</content:encoded>
</item>
<item rdf:about="http://blog.xole.net/article.php?id=744">
<title>Hive の Local Metastore に derby を使う</title>
<link>http://blog.xole.net/article.php?id=744</link>
<dc:date>2010-02-07T02:37:53+09:00</dc:date>
<description>突然ですが、hadoop の話題

Hadoop Hive といえば、SQL 感覚で MapReduce できるんですが、Hive は SQL のように記述できるようにするために、metastore 形式でメタデータを管理してます。
そ...</description>
<content:encoded>
<![CDATA[
<p>突然ですが、hadoop の話題</p>
<p>
<a href="http://hadoop.apache.org/hive/">Hadoop Hive</a> といえば、SQL 感覚で MapReduce できるんですが、Hive は SQL のように記述できるようにするために、metastore 形式でメタデータを管理してます。<br />
そのあたりは、<a href="http://wiki.apache.org/hadoop/Hive/AdminManual/MetastoreAdmin">Hive/AdminManual/MetastoreAdmin - Hadoop Wiki</a> や <a href="http://wiki.apache.org/hadoop/Hive/AdminManual/MetastoreAdmin?action=AttachFile&do=view&target=metastore_usage.pptx">metastore_usage.pptx</a>や<a href="http://blog.katsuma.tv/2009/10/hive_local_metastore.html">HiveのmetastoreをMySQLを使ってLocal Metastore形式で利用する - blog.katsuma.tv</a> などに詳しく書かれているので省略。</p>
<p>ただ、local metastore にいちいち mysql をセットアップするのもあれだったんで、今回は derby にしておきます</p>

<h2>derby編</h2>

<h3>derby のインストール</h3>
<p>
<a href="http://db.apache.org/derby/">Apache Derby</a>からダウンロードしてきます。とりあえず最新のを落としてくる</p>

<pre class="javascript">&gt; wget http:<span class="comment" >//..../db-derby-10.5.3.0-bin.tar.gz</span>
&gt; tar xzvf db-derby-10.5.3.0-bin.tar.gz
&gt; sudo mv db-derby-10.5.3.0-bin /opt/local/derby
</pre>

<h3>次に、データ保存用の /var/db/derby を作っておく</h3>

<pre class="javascript">&gt; sudo mkdir /<span class="keyword" >var</span>/db/derby
&gt; sudo chown nowel:users /<span class="keyword" >var</span>/db/derby
</pre>

<p>chown は derby 用のユーザでもいいので、実行ユーザに変えておく</p>

<h3>環境変数の設定</h3>

<p>環境変数名とかは、Hadoop のそれっぽくしておく
<pre class="javascript">
<span class="preprocessor" >#!/usr/bin/env bash</span>

<span class="keyword" >export</span> DERBY_HOME=/opt/local/derby
<span class="keyword" >export</span> DERBY_OPTS=<span class="string" >"-Dderby.drda.startNetworkServer=true -Dderby.drda.maxThreads=30 -Dderby.system.home=/var/db/derby -Dderby.drda.portNumber=1527 -Dderby.drda.host=0.0.0.0"</span>
<span class="keyword" >export</span> DERBY_LOG_DIR=/<span class="keyword" >var</span>/log</pre>

<p>これを $HOME/.derby_profile とかしておいて、.bashrcなんかに</p>
<pre>
<code>source $HOME/.derby_profile</code>
</pre>
<p>と記述しておくといちいち source しなくて楽</p>

<h3>起動用のスクリプトを用意</h3>

<p>start/stop/status くらいは何度か確認することがあるので、こんな感じで用意</p>

<p>start.sh</p>

<pre class="javascript">
<span class="preprocessor" >#!/usr/bin/env bash</span>

logfile=$DERBY_LOG_DIR/derby.log

nohup $DERBY_HOME/bin/NetworkServerControl start &gt; $logfile 2&gt;&amp;1 &lt; /dev/<span class="keyword" >null</span> &amp;</pre>

<p>stop.sh</p>
<pre class="javascript">
<span class="preprocessor" >#!/usr/bin/env bash</span>

$DERBY_HOME/bin/NetworkServerControl shutdown</pre>

<p>status.sh</p>
<pre class="javascript">
<span class="preprocessor" >#!/usr/bin/env bash</span>

$DERBY_HOME/bin/NetworkServerControl sysinfo
$DERBY_HOME/bin/NetworkServerControl runtimeinfo</pre>

<p>とこんな感じ。</p>

<h3>起動と停止</h3>

<p>さっき用意した start.sh ファイル群を $HOME/bin とかに置いているのであれば</p>
<pre>
<code>&gt; $HOME/bin/start.sh</code>
</pre>
<p>で起動できる</p>


<p>ちなみに、derbyはstartNetworkServerというスクリプトが用意されているけど、素のまま起動すると、ネットワーク越しにアクセスできない(正確には起動した同一ホストからしかアクセスできない)</p>
<p>ネットワーク越しに利用するなら<code>startNetworkServer -h 0.0.0.0</code>とするか、<code>derby.drda.host=0.0.0.0</code>みたいな変数を利用する。<br />
(ここで結構ハマった...とりあえず、開発用なら 0.0.0.0 で始めるといいかと)</p>

<h2>Hive編</h3>

<p>hadoop の mapred/hdfs などはセットアップ済みとして進めます。</p>

<h3>hive のセットアップ</h3>

<p>先に書いておくと、<a href="http://www.apache.org/dyn/closer.cgi/hadoop/hive/">http://www.apache.org/dyn/closer.cgi/hadoop/hive/</a>とかに置いてある hive-0.4.1 を使ってセットアップを進めても</p>

<pre class="javascript">FAILED: Error <span class="keyword" >in</span> metadata: org.datanucleus.jdo.exceptions.TransactionNotReadableException: Cant read fields outside of transactions. You may want to set <span class="string" >'NontransactionalRead=true'</span>.
FailedObject:1[OID]org.apache.hadoop.hive.metastore.model.MDatabase
FAILED: Execution Error, <span class="keyword" >return</span> code 1 from org.apache.hadoop.hive.ql.exec.DDLTask
</pre>

<p>が発生してなかなか前に進めなくなる。<br />
Hive CLI の起動時に</p>

<pre class="javascript">hive &gt; set javax.jdo.option.NontransactionalRead=<span class="keyword" >true</span>;
</pre>

<p>としておくことで、一時的になんとかなるけど、接続毎にやることになるので、おすすめはしません。<br />
(jpox.properties を使うというのもあるけど、設定の二重管理になりそうだし、後述する新しいのでは必要なさそうなので今回は 0.4.1 を使わないという方向で進みます)</p>


<h3>Hiveをtrunkからもってきてビルドする</h3>

<p>たぶん、trunkに入ってるのは hive-0.5.x 系</p>

<pre class="javascript">&gt; svn <span class="keyword" >export</span> http:<span class="comment" >//svn.apache.org/repos/asf/hadoop/hive/trunk hive</span>
&gt; cd hive
&gt; ant <span class="keyword" >package</span>
&gt; :
&gt; : (しばし待つ)
&gt; :
&gt; cp -r build/dist/lib/* lib
&gt; sudo mv hive /opt/local/hive-0.5.0-trunk
</pre>

<h3>環境変数の設定</h3>

<p>こんな感じで用意する</p>

<pre class="javascript">
<span class="preprocessor" >#!/usr/bin/env bash</span>

<span class="keyword" >export</span> HIVE_HOME=/opt/local/hive-0.5.0-trunk
<span class="keyword" >export</span> HIVE_CONF_DIR=/home/hive/conf
<span class="keyword" >export</span> HIVE_PID_DIR=/<span class="keyword" >var</span>/run
<span class="keyword" >export</span> HIVE_LOG_DIR=/<span class="keyword" >var</span>/log
</pre>

<p>
<code>HIVE_CONF_DIR</code>は<code>HADOOP_CONF_DIR</code>と違って、hive-default.xml を HIVE-HOME/conf から読んでくれない(?)ので、hive-default.xmlとhive-log4j.propertiesは <code>HIVE_CONF_DIR</code> にシンボリックリンクしておく</p>

<pre class="javascript">&gt; ln -s $HIVE_HOME/conf/hive-<span class="keyword" >default</span>.xml $HIVE_CONF_DIR/hive-<span class="keyword" >default</span>.xml
&gt; ln -s $HIVE_HOME/conf/hive-log4j.properties $HIVE_CONF_DIR/hive-log4j.properties
</pre>

<h3>hive-site.xmlを用意</h3>

<p>$HIVE_CONF_DIR に hive-site.xml をこんな感じで用意</p>

<pre class="javascript">&lt;?xml version=<span class="string" >"1.0"</span>?&gt;
&lt;?xml-stylesheet type=<span class="string" >"text/xsl"</span> href=<span class="string" >"configuration.xsl"</span>?&gt;

&lt;configuration&gt;
    &lt;property&gt;
        &lt;name&gt;hive.metastore.local&lt;/name&gt;
        &lt;value&gt;<span class="keyword" >true</span>&lt;/value&gt;
    &lt;/property&gt;
    &lt;property&gt;
        &lt;name&gt;hive.metastore.warehouse.dir&lt;/name&gt;
        &lt;value&gt;/hive/warehouse&lt;/value&gt;
    &lt;/property&gt;
    &lt;property&gt;
        &lt;name&gt;hive.metastore.rawstore.impl&lt;/name&gt;
        &lt;value&gt;org.apache.hadoop.hive.metastore.ObjectStore&lt;/value&gt;
    &lt;/property&gt;
    &lt;property&gt;
        &lt;name&gt;javax.jdo.option.ConnectionURL&lt;/name&gt;
        &lt;value&gt;jdbc:derby:<span class="comment" >//master1:1527/metastore;create=true&lt;/value&gt;</span>
    &lt;/property&gt;
    &lt;property&gt;
        &lt;name&gt;javax.jdo.option.ConnectionDriverName&lt;/name&gt;
        &lt;value&gt;org.apache.derby.jdbc.ClientDriver&lt;/value&gt;
    &lt;/property&gt;
    &lt;property&gt;
        &lt;name&gt;javax.jdo.option.ConnectionUserName&lt;/name&gt;
        &lt;value&gt;&lt;!-- !empty --&gt;&lt;/value&gt;
    &lt;/property&gt;
    &lt;property&gt;
        &lt;name&gt;javax.jdo.option.ConnectionPassword&lt;/name&gt;
        &lt;value&gt;&lt;!-- !empty --&gt;&lt;/value&gt;
    &lt;/property&gt;
    &lt;property&gt;
        &lt;name&gt;datanucleus.autoCreateTables&lt;/name&gt;
        &lt;value&gt;<span class="keyword" >true</span>&lt;/value&gt;
    &lt;/property&gt;
&lt;/configuration&gt;</pre>

<h3>Hive CLIの起動</h3>

<p>とりあえず、derby の metastorage が使えるかどうかを確認するため、$HIVE_HOME/bin/hive で Hive CLI を起動して <code>show tables</code> する</p>

<pre class="javascript">&gt; $HIVE_HOME/bin/hive
hive&gt; show tables;
OK
Time taken: 8.613 seconds
hive&gt; exit;
</pre>

<p>とここまで出れば derby で Local Metastore が使えるようになってます。</p>

<h3>おわり</h3>

<p>と、ここまで書いてみたけど、Hiveのwikiに同じようなものがあった orz<br />
ref - <a href="http://wiki.apache.org/hadoop/HiveDerbyServerMode">HiveDerbyServerMode - Hadoop Wiki</a>
</p>
]]>
</content:encoded>
</item>
<item rdf:about="http://blog.xole.net/article.php?id=743">
<title>PHPで200行で作る memcached 互換サーバ</title>
<link>http://blog.xole.net/article.php?id=743</link>
<dc:date>2010-01-20T22:00:52+09:00</dc:date>
<description>まさに誰得。


class MemcachedServer {
    protected $command;
    public function __construct(MemcachedCommand $command){...</description>
<content:encoded>
<![CDATA[
<p>まさに誰得。</p>

<pre class="php">
<span class="keyword" >class</span> MemcachedServer {
    <span class="keyword" >protected</span> <span class="vars" >$command</span>;
    <span class="keyword" >public</span> <span class="keyword" >function</span> __construct(MemcachedCommand <span class="vars" >$command</span>){
        <span class="vars" >$this</span>-&gt;command = <span class="vars" >$command</span>;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> start(){
        <span class="vars" >$this</span>-&gt;run();
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> run(){
        <span class="vars" >$socket</span> = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        <span class="keyword" >if</span>(false === <span class="vars" >$socket</span>){
            <span class="vars" >$code</span> = socket_last_error();
            <span class="vars" >$msg</span> = socket_strerror(<span class="vars" >$code</span>);
            <span class="keyword" >throw</span> <span class="keyword" >new</span> Exception(sprintf(<span class="string" >'socket_create was error(%s):%s'</span>, <span class="vars" >$code</span>, <span class="vars" >$msg</span>));
        }
        socket_set_option(<span class="vars" >$socket</span>, SOL_SOCKET, SO_REUSEADDR, 1);

        <span class="vars" >$binded</span> = @socket_bind(<span class="vars" >$socket</span>, 0, 11222);
        <span class="keyword" >if</span>(false === <span class="vars" >$binded</span>){
            <span class="vars" >$code</span> = socket_last_error();
            <span class="vars" >$msg</span> = socket_strerror(<span class="vars" >$code</span>);
            <span class="keyword" >throw</span> <span class="keyword" >new</span> Exception(sprintf(<span class="string" >'socket_bind was error(%s):%s'</span>, <span class="vars" >$code</span>, <span class="vars" >$msg</span>));
        }
        <span class="vars" >$listend</span> = @socket_listen(<span class="vars" >$socket</span>);
        <span class="keyword" >if</span>(false === <span class="vars" >$listend</span>){
            <span class="vars" >$code</span> = socket_last_error();
            <span class="vars" >$msg</span> = socket_strerror(<span class="vars" >$code</span>);
            <span class="keyword" >throw</span> <span class="keyword" >new</span> Exception(sprintf(<span class="string" >'socket_listen was error(%s):%s'</span>, <span class="vars" >$code</span>, <span class="vars" >$msg</span>));
        }
        <span class="comment" >//socket_set_nonblock($socket);</span>
       
        <span class="func" >echo</span> <span class="string" >'server start on tcp://0.0.0.0:11222'</span>, PHP_EOL;
        <span class="vars" >$read</span> = <span class="keyword" >array</span>(<span class="vars" >$socket</span>);
        <span class="vars" >$write</span> = null;
        <span class="vars" >$except</span> = null;
        <span class="keyword" >while</span>(true){
            <span class="vars" >$accept</span> = @socket_accept(<span class="vars" >$socket</span>);
            <span class="keyword" >if</span>(false === <span class="vars" >$accept</span>){
                <span class="keyword" >continue</span>;
            }
            <span class="vars" >$handler</span> = <span class="keyword" >new</span> MemcachedAcceptHandler(<span class="vars" >$accept</span>, <span class="vars" >$this</span>-&gt;command);
            <span class="vars" >$handler</span>-&gt;init();
            <span class="vars" >$handler</span>-&gt;execute();
            <span class="vars" >$handler</span>-&gt;destroy();
        }
    }
}

<span class="keyword" >interface</span> StreamReadWrite {
    <span class="keyword" >const</span> DELIMITER = <span class="string" >"\r\n"</span>;
    <span class="keyword" >public</span> <span class="keyword" >function</span> readLine();
    <span class="keyword" >public</span> <span class="keyword" >function</span> writeLine(<span class="vars" >$str</span>);
}

<span class="keyword" >class</span> MemcachedAcceptHandler <span class="keyword" >implements</span> StreamReadWrite {
    <span class="keyword" >protected</span> <span class="vars" >$socket</span>;
    <span class="keyword" >protected</span> <span class="vars" >$command</span>;
    <span class="keyword" >protected</span> <span class="vars" >$connected</span> = true;
    <span class="keyword" >public</span> <span class="keyword" >function</span> __construct(<span class="vars" >$socket</span>, MemcachedCommand <span class="vars" >$command</span>){
        <span class="vars" >$this</span>-&gt;socket = <span class="vars" >$socket</span>;
        <span class="vars" >$this</span>-&gt;command = <span class="vars" >$command</span>;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> __destruct(){
        <span class="keyword" >if</span>(null !== <span class="vars" >$this</span>-&gt;socket){
            @socket_close(<span class="vars" >$this</span>-&gt;socket);
            unset(<span class="vars" >$this</span>-&gt;socket);
        }
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> init(){
        <span class="func" >echo</span> <span class="string" >'new connection'</span>, PHP_EOL;
        <span class="comment" >//socket_set_nonblock($this-&gt;socket);</span>
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> execute(){
        <span class="keyword" >while</span>(<span class="vars" >$this</span>-&gt;connected){
            <span class="vars" >$read</span> = <span class="keyword" >array</span>(<span class="vars" >$this</span>-&gt;socket);
            <span class="vars" >$write</span> = <span class="keyword" >array</span>();
            <span class="vars" >$except</span> = <span class="keyword" >array</span>();
            <span class="vars" >$select</span> = @socket_select(<span class="vars" >$read</span>, <span class="vars" >$write</span>, <span class="vars" >$except</span>, 1);
            <span class="keyword" >if</span>(false === <span class="vars" >$select</span>){
                <span class="keyword" >throw</span> <span class="keyword" >new</span> RuntimeException(<span class="string" >'socket_select'</span>);
            }
            <span class="keyword" >if</span>(<span class="vars" >$select</span> &lt; 1){
                <span class="keyword" >continue</span>;
            }

            <span class="vars" >$line</span> = <span class="vars" >$this</span>-&gt;readLine();
            <span class="keyword" >if</span>(null === <span class="vars" >$line</span>){
                <span class="keyword" >continue</span>;
            }
            <span class="comment" >// get hoge =&gt; get</span>
            <span class="vars" >$mode</span> = <span class="func" >substr</span>(<span class="vars" >$line</span>, 0, 3);
            <span class="comment" >// get hoge foo =&gt; array(hoge, foo)</span>
            <span class="vars" >$args</span> = <span class="func" >explode</span>(<span class="string" >' '</span>, <span class="func" >substr</span>(<span class="vars" >$line</span>, 4));
            <span class="vars" >$this</span>-&gt;command-&gt;call(<span class="vars" >$this</span>, <span class="vars" >$mode</span>, <span class="vars" >$args</span>);
        }
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> destroy(){
        <span class="func" >echo</span> <span class="string" >'close connection'</span>, PHP_EOL;
        @socket_shutdown(<span class="vars" >$this</span>-&gt;socket);
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> readLine(){
        <span class="vars" >$line</span> = <span class="string" >''</span>;
        <span class="keyword" >while</span>(true){
            <span class="vars" >$buf</span> = @socket_read(<span class="vars" >$this</span>-&gt;socket, 1);
            <span class="keyword" >if</span>(false === <span class="vars" >$buf</span> || <span class="string" >''</span> === <span class="vars" >$buf</span>){
                <span class="comment" >// FIXME!!!</span>
                <span class="vars" >$this</span>-&gt;connected = false;
                <span class="keyword" >return</span> null;
            }
            <span class="vars" >$line</span> .= <span class="vars" >$buf</span>;
            <span class="keyword" >if</span>(self::DELIMITER == <span class="func" >substr</span>(<span class="vars" >$line</span>, -2)){
                <span class="vars" >$line</span> = <span class="func" >substr</span>(<span class="vars" >$line</span>, 0, -2);
                <span class="keyword" >break</span>;
            }
        }
        <span class="keyword" >if</span>(<span class="func" >empty</span>(<span class="vars" >$line</span>)){
            <span class="keyword" >return</span> null;
        }
        <span class="keyword" >if</span>(preg_match(<span class="string" >'/^\s+$/'</span>, <span class="vars" >$line</span>)){
            <span class="keyword" >return</span> null;
        }
        <span class="keyword" >return</span> <span class="vars" >$line</span>;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> writeLine(<span class="vars" >$str</span>){
        <span class="keyword" >return</span> @socket_write(<span class="vars" >$this</span>-&gt;socket, <span class="vars" >$str</span> . self::DELIMITER);
    }
}

<span class="keyword" >interface</span> MemcachedCommand {
    <span class="keyword" >public</span> <span class="keyword" >function</span> call(StreamReadWrite <span class="vars" >$reader</span>, <span class="vars" >$mode</span>, <span class="keyword" >array</span> <span class="vars" >$args</span>);
}

<span class="keyword" >abstract</span> <span class="keyword" >class</span> AbstractMemcachedCommand <span class="keyword" >implements</span> MemcachedCommand {
    <span class="keyword" >protected</span> <span class="vars" >$reflector</span>;
    <span class="keyword" >public</span> <span class="keyword" >function</span> __construct(){
        <span class="vars" >$this</span>-&gt;reflector = <span class="keyword" >new</span> ReflectionObject(<span class="vars" >$this</span>);
    }
    <span class="keyword" >protected</span> <span class="keyword" >static</span> <span class="keyword" >function</span> concat(<span class="keyword" >array</span> <span class="vars" >$a</span>, <span class="keyword" >array</span> <span class="vars" >$b</span>){
        <span class="func" >array_splice</span>(<span class="vars" >$a</span>, <span class="func" >count</span>(<span class="vars" >$a</span>), 0, <span class="vars" >$b</span>);
        <span class="keyword" >return</span> <span class="vars" >$a</span>;
    }
    <span class="keyword" >public</span> final <span class="keyword" >function</span> call(StreamReadWrite <span class="vars" >$rw</span>, <span class="vars" >$mode</span>, <span class="keyword" >array</span> <span class="vars" >$args</span>){
        <span class="keyword" >if</span>(!<span class="vars" >$this</span>-&gt;reflector-&gt;hasMethod(<span class="vars" >$mode</span>)){
            <span class="keyword" >return</span> <span class="vars" >$this</span>-&gt;error(<span class="vars" >$rw</span>);
        }
        <span class="func" >echo</span> <span class="string" >'command =&gt; '</span>, <span class="vars" >$mode</span>, <span class="string" >' '</span>, join(<span class="string" >' '</span>, <span class="vars" >$args</span>), PHP_EOL;
        <span class="keyword" >return</span> call_user_func_array(<span class="keyword" >array</span>(<span class="vars" >$this</span>, <span class="vars" >$mode</span>), self::concat(<span class="keyword" >array</span>(<span class="vars" >$rw</span>), <span class="vars" >$args</span>));
    }
    <span class="keyword" >protected</span> <span class="keyword" >function</span> error(<span class="vars" >$rw</span>){
        <span class="vars" >$rw</span>-&gt;writeLine(<span class="string" >'ERROR'</span>);
    }
    <span class="keyword" >protected</span> <span class="keyword" >abstract</span> <span class="keyword" >function</span> get(StreamReadWrite <span class="vars" >$rw</span>, <span class="vars" >$keys</span>);
    <span class="keyword" >protected</span> <span class="keyword" >abstract</span> <span class="keyword" >function</span> set(StreamReadWrite <span class="vars" >$rw</span>, <span class="vars" >$key</span>, <span class="vars" >$flag</span>, <span class="vars" >$expire</span>, <span class="vars" >$length</span>);
    <span class="keyword" >protected</span> <span class="keyword" >abstract</span> <span class="keyword" >function</span> <span class="func" >delete</span>(StreamReadWrite <span class="vars" >$rw</span>, <span class="vars" >$key</span>, <span class="vars" >$expire</span> = 0);
}
<span class="keyword" >class</span> StorageMemcacheCommand <span class="keyword" >extends</span> AbstractMemcachedCommand {
    <span class="keyword" >protected</span> <span class="vars" >$cache</span> = <span class="keyword" >array</span>();
    <span class="keyword" >protected</span> <span class="keyword" >function</span> get(StreamReadWrite <span class="vars" >$rw</span>, <span class="vars" >$keys</span>){
        <span class="vars" >$args</span> = func_get_args();
        <span class="func" >array_shift</span>(<span class="vars" >$args</span>);
        <span class="keyword" >foreach</span>(<span class="vars" >$args</span> <span class="keyword" >as</span> <span class="vars" >$key</span>){
            <span class="keyword" >if</span>(!isset(<span class="vars" >$this</span>-&gt;cache[<span class="vars" >$key</span>])){
                <span class="keyword" >continue</span>;
            }
            <span class="vars" >$value</span> = <span class="vars" >$this</span>-&gt;cache[<span class="vars" >$key</span>];
            <span class="keyword" >if</span>(<span class="vars" >$value</span>-&gt;expire &lt; time()){
                <span class="keyword" >continue</span>;
            }
            <span class="vars" >$rw</span>-&gt;writeLine(sprintf(<span class="string" >'VALUE %s %d %d'</span>, <span class="vars" >$key</span>, <span class="vars" >$value</span>-&gt;flag, <span class="vars" >$value</span>-&gt;length));
            <span class="vars" >$rw</span>-&gt;writeLine(<span class="vars" >$value</span>-&gt;value);
        }
        <span class="vars" >$rw</span>-&gt;writeLine(<span class="string" >'END'</span>);
    }
    <span class="keyword" >protected</span> <span class="keyword" >function</span> set(StreamReadWrite <span class="vars" >$rw</span>, <span class="vars" >$key</span>, <span class="vars" >$flag</span>, <span class="vars" >$expire</span>, <span class="vars" >$length</span>){
        <span class="vars" >$value</span> = <span class="keyword" >new</span> stdClass;
        <span class="vars" >$value</span>-&gt;flag = <span class="vars" >$flag</span>;
        <span class="vars" >$value</span>-&gt;expire = time() + <span class="vars" >$expire</span>;
        <span class="vars" >$value</span>-&gt;length = <span class="vars" >$length</span>;
        <span class="vars" >$value</span>-&gt;value = <span class="vars" >$rw</span>-&gt;readLine();
        <span class="vars" >$this</span>-&gt;cache[<span class="vars" >$key</span>] = <span class="vars" >$value</span>;
        <span class="vars" >$rw</span>-&gt;writeLine(<span class="string" >'STORED'</span>);
    }
    <span class="keyword" >protected</span> <span class="keyword" >function</span> <span class="func" >delete</span>(StreamReadWrite <span class="vars" >$rw</span>, <span class="vars" >$key</span>, <span class="vars" >$expire</span> = 0){
        <span class="keyword" >if</span>(isset(<span class="vars" >$this</span>-&gt;cache[<span class="vars" >$key</span>])){
            <span class="vars" >$this</span>-&gt;cache[<span class="vars" >$key</span>]-&gt;expire = <span class="vars" >$expire</span>;
        }
    }
}

<span class="vars" >$server</span> = <span class="keyword" >new</span> MemcachedServer(<span class="keyword" >new</span> StorageMemcacheCommand);
<span class="vars" >$server</span>-&gt;start();</pre>

<p>とりあえず、set/get/deleteだけ。threadとか色々ないので、実際には削除されない。。<br />
他にも構文解析とかしてない（しなくていいくらい memcache はシンプルなんだけど）ので上手いことハンドリングしないといけない。。<br />
同時アクセスに難あり。。</p>

<p>(中略)</p>

<p>などなど、色々あるので、開発用にしか向いてない。というか、よっぽどストイックにPHPを愛している人じゃないと向いていないかも。<br />
まぁ最近は色々memcached互換系のがでてきたので、クライアント作成時のdebug用途向けかも。</p>

<p>今回は、PHPの<a href="http://jp2.php.net/manual/ja/book.sockets.php">socket</a> 関数をゴリゴリ使っています。<br />
blocking modeとかはちょっと動きが不安定（？）なので、あまり性能はでない<br />
c-lang版と比較を、この前のスクリプトを使ってみた</p>

<pre class="php">
<span class="vars" >$target</span> = <span class="keyword" >array</span>(
    <span class="keyword" >array</span>(<span class="string" >'host'</span> =&gt; <span class="string" >'localhost'</span>, <span class="string" >'port'</span> =&gt; 11211),
    <span class="keyword" >array</span>(<span class="string" >'host'</span> =&gt; <span class="string" >'localhost'</span>, <span class="string" >'port'</span> =&gt; 11222)
);
<span class="keyword" >foreach</span>(<span class="vars" >$target</span> <span class="keyword" >as</span> <span class="vars" >$t</span>){
    <span class="vars" >$memcache</span> = <span class="keyword" >new</span> Memcache;
    <span class="vars" >$memcache</span>-&gt;connect(<span class="vars" >$t</span>[<span class="string" >'host'</span>], <span class="vars" >$t</span>[<span class="string" >'port'</span>]);

    <span class="vars" >$fail</span> = 0;
    <span class="vars" >$fails</span> = <span class="keyword" >array</span>();
    <span class="vars" >$elapsed</span> = microtime(true);
    <span class="vars" >$count</span> = 500;
    <span class="keyword" >for</span>(<span class="vars" >$i</span> = 0; <span class="vars" >$i</span> &lt; <span class="vars" >$count</span>; ++<span class="vars" >$i</span>){
        <span class="keyword" >if</span>(false === <span class="vars" >$memcache</span>-&gt;set(<span class="string" >'hoge'</span>, 1234, 0, 10)){
            <span class="func" >echo</span> <span class="string" >'ERROR!!'</span>, PHP_EOL;
        }
        <span class="keyword" >if</span>(false === <span class="vars" >$memcache</span>-&gt;set(<span class="string" >'hoge'</span>, 123, 0, 10)){
            <span class="func" >echo</span> <span class="string" >'ERROR!!!'</span>, PHP_EOL;
        }
        <span class="vars" >$value</span> = (int) <span class="vars" >$memcache</span>-&gt;get(<span class="string" >'hoge'</span>);
        <span class="keyword" >if</span>(false === <span class="vars" >$memcache</span>-&gt;set(<span class="string" >'hoge'</span>, ((int)<span class="vars" >$value</span> + 1), 0, 10)){
            <span class="func" >echo</span> <span class="string" >'ERROR!!!!'</span>, PHP_EOL;
        }
        <span class="vars" >$result</span> = (int)<span class="vars" >$memcache</span>-&gt;get(<span class="string" >'hoge'</span>);
        <span class="keyword" >if</span>(<span class="vars" >$result</span> != 124){
            <span class="vars" >$fails</span>[] = <span class="vars" >$result</span>;
            <span class="vars" >$fail</span>++;
        }
    }
    <span class="func" >echo</span> <span class="string" >'target host =&gt; '</span>, <span class="vars" >$t</span>[<span class="string" >'host'</span>], <span class="string" >' port =&gt;'</span>, <span class="vars" >$t</span>[<span class="string" >'port'</span>], PHP_EOL;
    <span class="func" >echo</span> <span class="string" >'elapsed: '</span>, (microtime(true) - <span class="vars" >$elapsed</span>), PHP_EOL;
    <span class="func" >echo</span> <span class="string" >'fail =&gt; '</span>, <span class="vars" >$fail</span>, PHP_EOL;
}</pre>

<pre class="php">target host =&gt; localhost port =&gt;11211
elapsed: 0.634283065796
fail =&gt; 0

target host =&gt; localhost port =&gt;11222
elapsed: 1.12030887604
fail =&gt; 0
</pre>

<p>とまあ、こんな感じ。（そこそこ？）<br />
ちなみに、stream_socketを使って書いてみたものも置いておく。fwriteまわりでハマったので動かないと思うけど。</p>

<pre class="php">
<span class="comment" >// 動かない＞＜</span>
<span class="keyword" >class</span> MemcachedServer {
    <span class="keyword" >protected</span> <span class="vars" >$command</span>;
    <span class="keyword" >public</span> <span class="keyword" >function</span> __construct(MemcachedCommand <span class="vars" >$command</span>){
        <span class="vars" >$this</span>-&gt;command = <span class="vars" >$command</span>;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> start(){
        <span class="vars" >$this</span>-&gt;run();
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> run(){
        <span class="vars" >$socket</span> = stream_socket_server(<span class="string" >'tcp://0.0.0.0:11222'</span>, <span class="vars" >$code</span>, <span class="vars" >$msg</span>, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
        <span class="keyword" >if</span>(false === <span class="vars" >$socket</span>){
            <span class="keyword" >throw</span> <span class="keyword" >new</span> RuntimeException(sprintf(<span class="string" >'stream_socket_server was error(%s):%s'</span>, <span class="vars" >$code</span>, <span class="vars" >$msg</span>));
        }
        <span class="vars" >$nonBlocking</span> = stream_set_blocking(<span class="vars" >$socket</span>, 0);
        <span class="keyword" >if</span>(false === <span class="vars" >$nonBlocking</span>){
            <span class="keyword" >throw</span> <span class="keyword" >new</span> RuntimeException(<span class="string" >'stream_set_blocking: set blocking mode error'</span>);
        }

        <span class="func" >echo</span> <span class="string" >'server start on tcp://0.0.0.0:11222'</span>, PHP_EOL;
        <span class="vars" >$read</span> = <span class="keyword" >array</span>(<span class="vars" >$socket</span>);
        <span class="vars" >$write</span> = null;
        <span class="vars" >$except</span> = null;
        <span class="keyword" >while</span>(true){
            <span class="vars" >$accept</span> = @stream_socket_accept(<span class="vars" >$socket</span>, 1);
            <span class="keyword" >if</span>(false === <span class="vars" >$accept</span>){
                <span class="keyword" >continue</span>;
            }
            <span class="vars" >$handler</span> = <span class="keyword" >new</span> MemcachedAcceptHandler(<span class="vars" >$accept</span>, <span class="vars" >$this</span>-&gt;command);
            <span class="vars" >$handler</span>-&gt;init();
            <span class="vars" >$handler</span>-&gt;execute();
            <span class="vars" >$handler</span>-&gt;destroy();
        }
    }
}

<span class="keyword" >interface</span> StreamReadWrite {
    <span class="keyword" >const</span> DELIMITER = <span class="string" >"\r\n"</span>;
    <span class="keyword" >public</span> <span class="keyword" >function</span> readLine();
    <span class="keyword" >public</span> <span class="keyword" >function</span> writeLine(<span class="vars" >$str</span>);
}

<span class="keyword" >class</span> MemcachedAcceptHandler <span class="keyword" >implements</span> StreamReadWrite {
    <span class="keyword" >protected</span> <span class="vars" >$socket</span>;
    <span class="keyword" >protected</span> <span class="vars" >$command</span>;
    <span class="keyword" >public</span> <span class="keyword" >function</span> __construct(<span class="vars" >$socket</span>, MemcachedCommand <span class="vars" >$command</span>){
        <span class="vars" >$this</span>-&gt;socket = <span class="vars" >$socket</span>;
        <span class="vars" >$this</span>-&gt;command = <span class="vars" >$command</span>;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> __destruct(){
        <span class="keyword" >if</span>(null !== <span class="vars" >$this</span>-&gt;socket){
            fclose(<span class="vars" >$this</span>-&gt;socket);
            unset(<span class="vars" >$this</span>-&gt;socket);
        }
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> init(){
        <span class="func" >echo</span> <span class="string" >'new connection'</span>, PHP_EOL;
        stream_set_blocking(<span class="vars" >$this</span>-&gt;socket, true);
        stream_set_timeout(<span class="vars" >$this</span>-&gt;socket, 60);
        <span class="comment" >// disable writebuffer</span>
        stream_set_write_buffer(<span class="vars" >$this</span>-&gt;socket, 0);
    }
    <span class="keyword" >protected</span> <span class="keyword" >function</span> <span class="func" >feof</span>(){
        <span class="vars" >$recv</span> = stream_socket_recvfrom(<span class="vars" >$this</span>-&gt;socket, 1, STREAM_PEEK);
        <span class="keyword" >return</span> <span class="func" >strlen</span>(<span class="vars" >$recv</span>) &lt; 1;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> execute(){
        <span class="keyword" >while</span>(true){
            <span class="vars" >$read</span> = <span class="keyword" >array</span>(<span class="vars" >$this</span>-&gt;socket);
            <span class="vars" >$write</span> = <span class="keyword" >array</span>();
            <span class="vars" >$except</span> = <span class="keyword" >array</span>();
            <span class="vars" >$select</span> = @stream_select(<span class="vars" >$read</span>, <span class="vars" >$write</span>, <span class="vars" >$except</span>, 1);
            <span class="keyword" >if</span>(false === <span class="vars" >$select</span>){
                <span class="keyword" >throw</span> <span class="keyword" >new</span> RuntimeException(<span class="string" >'stream_select'</span>);
            }
            <span class="keyword" >if</span>(<span class="vars" >$select</span> &lt; 1){
                <span class="keyword" >continue</span>;
            }

            <span class="vars" >$line</span> = <span class="vars" >$this</span>-&gt;readLine();
            <span class="func" >echo</span> <span class="string" >'line =&gt; '</span>, <span class="vars" >$line</span>, PHP_EOL;
            <span class="keyword" >if</span>(null === <span class="vars" >$line</span>){
                <span class="keyword" >return</span>;
            }
            <span class="comment" >// get hoge =&gt; get</span>
            <span class="vars" >$mode</span> = <span class="func" >substr</span>(<span class="vars" >$line</span>, 0, 3);
            <span class="comment" >// get hoge foo =&gt; array(hoge, foo)</span>
            <span class="vars" >$args</span> = <span class="func" >explode</span>(<span class="string" >' '</span>, <span class="func" >substr</span>(<span class="vars" >$line</span>, 4));
            <span class="vars" >$this</span>-&gt;command-&gt;call(<span class="vars" >$this</span>, <span class="vars" >$mode</span>, <span class="vars" >$args</span>);
        }
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> destroy(){
        <span class="func" >echo</span> <span class="string" >'close connection'</span>, PHP_EOL;
        <span class="comment" >// socket_close($socket)</span>
        stream_socket_shutdown(<span class="vars" >$this</span>-&gt;socket, STREAM_SHUT_RDWR);
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> readLine(){
        <span class="comment" >// 1048576 = 1024 * 1024</span>
        <span class="comment" >//socket_read($socket, 1048576, PHP_NORMAL_READ</span>
        <span class="vars" >$line</span> = stream_get_line(<span class="vars" >$this</span>-&gt;socket, 2048, self::DELIMITER);
        <span class="keyword" >if</span>(<span class="func" >empty</span>(<span class="vars" >$line</span>)){
            <span class="keyword" >return</span> null;
        }
        <span class="keyword" >if</span>(preg_match(<span class="string" >'/^\s+$/'</span>, <span class="vars" >$line</span>)){
            <span class="keyword" >return</span> null;
        }
        <span class="keyword" >return</span> <span class="vars" >$line</span>;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> writeLine(<span class="vars" >$str</span>){
        <span class="func" >fputs</span>(<span class="vars" >$this</span>-&gt;socket, <span class="vars" >$str</span> . <span class="string" >"\n\0"</span>);
    }
}

<span class="keyword" >interface</span> MemcachedCommand {
    <span class="keyword" >public</span> <span class="keyword" >function</span> call(StreamReadWrite <span class="vars" >$reader</span>, <span class="vars" >$mode</span>, <span class="keyword" >array</span> <span class="vars" >$args</span>);
}

<span class="keyword" >abstract</span> <span class="keyword" >class</span> AbstractMemcachedCommand <span class="keyword" >implements</span> MemcachedCommand {
    <span class="keyword" >protected</span> <span class="vars" >$reflector</span>;
    <span class="keyword" >public</span> <span class="keyword" >function</span> __construct(){
        <span class="vars" >$this</span>-&gt;reflector = <span class="keyword" >new</span> ReflectionObject(<span class="vars" >$this</span>);
    }
    <span class="keyword" >protected</span> <span class="keyword" >static</span> <span class="keyword" >function</span> concat(<span class="keyword" >array</span> <span class="vars" >$a</span>, <span class="keyword" >array</span> <span class="vars" >$b</span>){
        <span class="func" >array_splice</span>(<span class="vars" >$a</span>, <span class="func" >count</span>(<span class="vars" >$a</span>), 0, <span class="vars" >$b</span>);
        <span class="keyword" >return</span> <span class="vars" >$a</span>;
    }
    <span class="keyword" >public</span> final <span class="keyword" >function</span> call(StreamReadWrite <span class="vars" >$rw</span>, <span class="vars" >$mode</span>, <span class="keyword" >array</span> <span class="vars" >$args</span>){
        <span class="keyword" >if</span>(!<span class="vars" >$this</span>-&gt;reflector-&gt;hasMethod(<span class="vars" >$mode</span>)){
            <span class="keyword" >return</span> <span class="vars" >$this</span>-&gt;error(<span class="vars" >$rw</span>);
        }
        <span class="keyword" >return</span> call_user_func_array(<span class="keyword" >array</span>(<span class="vars" >$this</span>, <span class="vars" >$mode</span>), self::concat(<span class="keyword" >array</span>(<span class="vars" >$rw</span>), <span class="vars" >$args</span>));
    }
    <span class="keyword" >protected</span> <span class="keyword" >function</span> error(<span class="vars" >$rw</span>){
        <span class="vars" >$rw</span>-&gt;writeLine(<span class="string" >'ERROR'</span>);
    }
    <span class="keyword" >protected</span> <span class="keyword" >abstract</span> <span class="keyword" >function</span> get(StreamReadWrite <span class="vars" >$rw</span>, <span class="vars" >$keys</span>);
    <span class="keyword" >protected</span> <span class="keyword" >abstract</span> <span class="keyword" >function</span> set(StreamReadWrite <span class="vars" >$rw</span>, <span class="vars" >$key</span>, <span class="vars" >$flag</span>, <span class="vars" >$expire</span>, <span class="vars" >$length</span>);
}
<span class="keyword" >class</span> StorageMemcacheCommand <span class="keyword" >extends</span> AbstractMemcachedCommand {
    <span class="keyword" >protected</span> <span class="vars" >$cache</span> = <span class="keyword" >array</span>();
    <span class="keyword" >protected</span> <span class="keyword" >function</span> get(StreamReadWrite <span class="vars" >$rw</span>, <span class="vars" >$keys</span>){
        <span class="vars" >$args</span> = func_get_args();
        <span class="func" >array_shift</span>(<span class="vars" >$args</span>);
        <span class="keyword" >foreach</span>(<span class="vars" >$args</span> <span class="keyword" >as</span> <span class="vars" >$key</span>){
            <span class="keyword" >if</span>(!isset(<span class="vars" >$this</span>-&gt;cache[<span class="vars" >$key</span>])){
                <span class="keyword" >continue</span>;
            }
            <span class="vars" >$value</span> = <span class="vars" >$this</span>-&gt;cache[<span class="vars" >$key</span>];
            <span class="vars" >$rw</span>-&gt;writeLine(sprintf(<span class="string" >'VALUE %s %d %d'</span>, <span class="vars" >$key</span>, <span class="vars" >$value</span>-&gt;flag, <span class="vars" >$value</span>-&gt;expire));
            <span class="vars" >$rw</span>-&gt;writeLine(<span class="vars" >$value</span>-&gt;value);
        }
        <span class="vars" >$rw</span>-&gt;writeLine(<span class="string" >'END'</span>);
    }
    <span class="keyword" >protected</span> <span class="keyword" >function</span> set(StreamReadWrite <span class="vars" >$rw</span>, <span class="vars" >$key</span>, <span class="vars" >$flag</span>, <span class="vars" >$expire</span>, <span class="vars" >$length</span>){
        <span class="vars" >$value</span> = <span class="keyword" >new</span> stdClass;
        <span class="vars" >$value</span>-&gt;flag = <span class="vars" >$flag</span>;
        <span class="vars" >$value</span>-&gt;expire = <span class="vars" >$expire</span>;
        <span class="vars" >$value</span>-&gt;length = <span class="vars" >$length</span>;
        <span class="vars" >$value</span>-&gt;value = <span class="vars" >$rw</span>-&gt;readLine();
        <span class="vars" >$this</span>-&gt;cache[<span class="vars" >$key</span>] = <span class="vars" >$value</span>;
        <span class="vars" >$rw</span>-&gt;writeLine(<span class="string" >'STORED'</span>);
    }

}

<span class="vars" >$server</span> = <span class="keyword" >new</span> MemcachedServer(<span class="keyword" >new</span> StorageMemcacheCommand);
<span class="vars" >$server</span>-&gt;start();</pre>
]]>
</content:encoded>
</item>
<item rdf:about="http://blog.xole.net/article.php?id=742">
<title>memcacheの cache の部分を java で（その2）</title>
<link>http://blog.xole.net/article.php?id=742</link>
<dc:date>2010-01-20T00:21:29+09:00</dc:date>
<description>
前回の続き。

前回の結果、とりあえず、DelayQueueによって期限切れのEntryは取り出せるようになった。
だけど、この仕組みの状態では期限切れになったEntryは容赦なく消えていってしまう。
つまり、そのEntryがまだ...</description>
<content:encoded>
<![CDATA[
<p>
<a href="http://blog.xole.net/article.php?id=740">前回</a>の続き。</p>

<p>前回の結果、とりあえず、DelayQueueによって期限切れのEntryは取り出せるようになった。<br />
だけど、この仕組みの状態では期限切れになったEntryは容赦なく消えていってしまう。<br />
つまり、そのEntryがまだ利用されるかもしれないのに、消えてしまうのは（色々な意味で）もったいない。<br />
特に、javaだとHashMapとかのloadFactorまわりの動きは（きっと）もったいない</p>

<p>もう一度LRUについてwikipediaに確認すると</p>

<blockquote cite="http://ja.wikipedia.org/wiki/Least_Recently_Used">
<p>Least Recently Used (LRU) はキャッシュメモリや仮想メモリが扱うデータのリソースへの割り当てを決定するアルゴリズムである。対義語はMost Recently Used (MRU)。
<br />
和訳すると「最近最も使われなかったもの」つまり「使われてから最も長い時間が経ったもの」「参照される頻度が最も低いもの」である。</p>
</blockquote>

<p>ということなので（？）、ホントに使われ無かったもの（アクセスが少ないもの）から順番に消えるように考えてみた。</p>

<p>その結果がこれ</p>

<pre class="java">
<span class="keyword" >public</span> <span class="keyword" >class</span> LRUCache <span class="keyword" >implements</span> CacheLifeCycle {
    
    <span class="keyword" >private</span> <span class="keyword" >static</span> <span class="keyword" >final</span> Log log = LogFactory.getLog(LRUCache.<span class="keyword" >class</span>);
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> ConcurrentMap&lt;String, PrioritalEntry&gt; cache = <span class="keyword" >new</span> ConcurrentHashMap&lt;String, PrioritalEntry&gt;();

    <span class="keyword" >protected</span> <span class="keyword" >final</span> DelayQueue&lt;PrioritalEntry&gt; expiredQueue =  <span class="keyword" >new</span> DelayQueue&lt;PrioritalEntry&gt;();
    
    <span class="keyword" >private</span> <span class="keyword" >final</span> ReadWriteLock lock = <span class="keyword" >new</span> ReentrantReadWriteLock();
    
    <span class="keyword" >private</span> <span class="keyword" >final</span> Lock writeLock = lock.writeLock();
    
    <span class="keyword" >private</span> <span class="keyword" >final</span> Lock readLock = lock.readLock();
    
    <span class="keyword" >public</span> <span class="keyword" >void</span> register(LifeCycleExecutor executor){
        executor.add(<span class="keyword" >this</span>, expiredQueue);
    }
    
    <span class="keyword" >public</span> <span class="keyword" >void</span> purge(PrioritalEntry entry){
        writeLock.lock();
        <span class="keyword" >try</span> {
            <span class="comment" >// priorityを下げる</span>
            <span class="keyword" >if</span>(entry.decrementPriority() &lt; <span class="number" >1</span>){
                <span class="comment" >// 1以下なら誰も使ってなさそうなので消す</span>
                <span class="keyword" >if</span>(log.isDebugEnabled()){
                    log.debug(<span class="string" >"purge entry =&gt; "</span> + entry);
                }
                cache.remove(entry.getKey());
                <span class="keyword" >return</span>;
            }
            <span class="comment" >// もう一度チャンス</span>
            entry.setExpiredAt(retransmission(entry.getPriority()));
            expiredQueue.offer(entry);
        } <span class="keyword" >finally</span> {
            writeLock.unlock();
        }
    }
    
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> set(String key, String value, <span class="keyword" >long</span> expiredAt){
        writeLock.lock();
        <span class="keyword" >try</span> {
            PrioritalEntry newEntry = <span class="keyword" >new</span> PrioritalEntry(key, value, expiredAt);
            PrioritalEntry previousEntry = cache.putIfAbsent(key, newEntry);
            <span class="comment" >// 既存の値がない</span>
            <span class="keyword" >if</span>(<span class="keyword" >null</span> == previousEntry){
                <span class="comment" >// ということは新しい値</span>
                <span class="keyword" >return</span> expiredQueue.offer(newEntry);
            }
            previousEntry.setValue(value);
            previousEntry.setExpiredAt(expiredAt);
            previousEntry.incrementPriority();
            <span class="keyword" >return</span> <span class="keyword" >true</span>;
        } <span class="keyword" >finally</span> {
            writeLock.unlock();
        }
    }

    <span class="keyword" >public</span> PrioritalEntry get(String key) {
        readLock.lock();
        <span class="keyword" >try</span> {
            <span class="keyword" >if</span>(!cache.containsKey(key)){
                <span class="keyword" >return</span> <span class="keyword" >null</span>;
            }
            
            PrioritalEntry entry = cache.get(key);
            <span class="comment" >// 時間切れなので見えなくする</span>
            <span class="keyword" >if</span>(entry.isExpired()){
                <span class="keyword" >return</span> <span class="keyword" >null</span>;
            }
            <span class="comment" >// 使う人がいたのでpriorityをあげる</span>
            entry.incrementPriority();
            <span class="keyword" >return</span> entry;
        } <span class="keyword" >finally</span> {
            readLock.unlock();
        }
    }

    <span class="keyword" >public</span> <span class="keyword" >void</span> remove(String key) {
        remove(key, <span class="number" >0</span>);
    }
    
    <span class="keyword" >public</span> <span class="keyword" >void</span> remove(String key, <span class="keyword" >long</span> expiredAt){
        writeLock.lock();
        <span class="keyword" >try</span> {
            PrioritalEntry entry = cache.get(key);
            <span class="keyword" >if</span>(<span class="keyword" >null</span> != entry){
                entry.setExpiredAt(expiredAt);
                entry.decrementPriority();
            }
        } <span class="keyword" >finally</span> {
            writeLock.unlock();
        }
    }
    
    <span class="keyword" >public</span> <span class="keyword" >void</span> flush() {
        flush(<span class="number" >0</span>);
    }
    
    <span class="keyword" >public</span> <span class="keyword" >void</span> flush(<span class="keyword" >final</span> <span class="keyword" >long</span> expiredAt){
        writeLock.lock();
        <span class="keyword" >try</span> {
            Iterator&lt;Map.Entry&lt;String, PrioritalEntry&gt;&gt; entries = cache.entrySet().iterator();
            <span class="keyword" >while</span>(entries.hasNext()){
                <span class="keyword" >final</span> Map.Entry&lt;String, PrioritalEntry&gt; entry = entries.next();
                <span class="keyword" >final</span> PrioritalEntry value = entry.getValue();
                value.setExpiredAt(expiredAt);
            }
        } <span class="keyword" >finally</span> {
            writeLock.unlock();
        }
    }
    
    <span class="keyword" >protected</span> <span class="keyword" >static</span> <span class="keyword" >long</span> retransmission(<span class="keyword" >final</span> <span class="keyword" >int</span> currentPriority){
        <span class="keyword" >if</span>(currentPriority &lt; PrioritalEntry.MAX_PRIORITY){
            <span class="comment" >// binary exponential backoff</span>
            <span class="keyword" >long</span> freq = currentPriority + Math.round(Math.pow(<span class="number" >2</span>, currentPriority));
            <span class="keyword" >return</span> Math.round(<span class="number" >0.875</span> * freq) + Math.round(<span class="number" >0.125</span> * currentPriority);
        }
        <span class="keyword" >return</span> retransmission(PrioritalEntry.MAX_PRIORITY - <span class="number" >1</span>);
    }
}</pre>

<p>んで、PrioritalEntryの実装はこれ</p>

<pre class="java">
<span class="keyword" >public</span> <span class="keyword" >class</span> PrioritalEntry <span class="keyword" >implements</span> Delayed {
    
    <span class="keyword" >public</span> <span class="keyword" >static</span> <span class="keyword" >final</span> <span class="keyword" >int</span> MAX_PRIORITY = <span class="number" >10</span>;
    
    <span class="keyword" >public</span> <span class="keyword" >static</span> <span class="keyword" >final</span> <span class="keyword" >int</span> DEFAULT_PRIORITY = <span class="number" >5</span>;
    
    <span class="keyword" >private</span> <span class="keyword" >final</span> AtomicInteger priority = <span class="keyword" >new</span> AtomicInteger(DEFAULT_PRIORITY);
    
    <span class="keyword" >private</span> <span class="keyword" >final</span> String key;
    
    <span class="keyword" >private</span> <span class="keyword" >final</span> AtomicReference&lt;String&gt; value;
    
    <span class="keyword" >private</span> <span class="keyword" >final</span> AtomicLong expiredAt;
    
    <span class="keyword" >public</span> PrioritalEntry(<span class="keyword" >final</span> String key, <span class="keyword" >final</span> String value, <span class="keyword" >final</span> <span class="keyword" >long</span> expiredAt){
        <span class="keyword" >this</span>.key = key;
        <span class="keyword" >this</span>.value = <span class="keyword" >new</span> AtomicReference&lt;String&gt;(value);
        <span class="keyword" >if</span>(<span class="number" >0</span> == expiredAt){
            <span class="keyword" >this</span>.expiredAt = <span class="keyword" >new</span> AtomicLong(Long.MAX_VALUE);
        } <span class="keyword" >else</span> <span class="keyword" >if</span>(expiredAt &lt; <span class="number" >0</span>) {
            <span class="keyword" >this</span>.expiredAt = <span class="keyword" >new</span> AtomicLong(<span class="number" >0</span>);
        } <span class="keyword" >else</span> {
            <span class="keyword" >this</span>.expiredAt = <span class="keyword" >new</span> AtomicLong(relative(expiredAt));
        }
    }

    <span class="keyword" >protected</span> <span class="keyword" >static</span> <span class="keyword" >long</span> relative(<span class="keyword" >long</span> expiredAt){
        <span class="comment" >// 現在時間 + 指定時間</span>
        <span class="keyword" >return</span> System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(expiredAt);
    }
    
    <span class="keyword" >protected</span> <span class="keyword" >static</span> <span class="keyword" >long</span> absolute(<span class="keyword" >long</span> expiredAt){
        <span class="keyword" >return</span> TimeUnit.SECONDS.toMillis(expiredAt);
    }
    
    <span class="keyword" >public</span> <span class="keyword" >int</span> incrementPriority(){
        <span class="keyword" >if</span>(MAX_PRIORITY &lt; priority.get()){
            <span class="keyword" >return</span> MAX_PRIORITY;
        }
        <span class="keyword" >return</span> priority.incrementAndGet();
    }
    
    <span class="keyword" >public</span> <span class="keyword" >int</span> decrementPriority(){
        <span class="keyword" >return</span> priority.decrementAndGet();
    }
    
    <span class="keyword" >public</span> <span class="keyword" >int</span> getPriority(){
        <span class="keyword" >return</span> priority.get();
    }

    <span class="keyword" >public</span> String getKey(){
        <span class="keyword" >return</span> key;
    }
    
    <span class="keyword" >public</span> String getValue(){
        <span class="keyword" >return</span> value.get();
    }
    
    <span class="keyword" >public</span> <span class="keyword" >void</span> setValue(String value){
        <span class="keyword" >this</span>.value.set(value);
    }
    
    <span class="keyword" >public</span> <span class="keyword" >void</span> setExpiredAt(<span class="keyword" >long</span> newValue){
        <span class="comment" >// 現在時間より次の期限を設定</span>
        <span class="keyword" >this</span>.expiredAt.set(relative(newValue));
    }

    <span class="keyword" >private</span> <span class="keyword" >long</span> elapsed(){
        <span class="keyword" >return</span> expiredAt.get() - System.currentTimeMillis();
    }
    
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> isExpired(){
        <span class="keyword" >return</span> elapsed() &lt; <span class="number" >1</span>;
    }

    <span class="keyword" >public</span> <span class="keyword" >long</span> getDelay(TimeUnit unit) {
        <span class="comment" >// expiredの時間から経過時間を引き、残り時間を算出</span>
        <span class="keyword" >return</span> unit.convert(elapsed(), TimeUnit.MILLISECONDS);
    }
    
    <span class="keyword" >public</span> <span class="keyword" >int</span> compareTo(Delayed o) {
        PrioritalEntry target = (PrioritalEntry) o;
        <span class="keyword" >final</span> <span class="keyword" >long</span> x = expiredAt.get();
        <span class="keyword" >final</span> <span class="keyword" >long</span> y = target.expiredAt.get();
        <span class="keyword" >if</span>(x &lt; y){
            <span class="keyword" >return</span> -<span class="number" >1</span>;
        }
        <span class="keyword" >if</span>(x &gt; y){
            <span class="keyword" >return</span> <span class="number" >1</span>;
        }
        <span class="keyword" >return</span> <span class="number" >0</span>;
    }
    
    <span class="keyword" >public</span> String toString(){
        StringBuilder buf = <span class="keyword" >new</span> StringBuilder();
        buf.append(<span class="string" >"key="</span>).append(key).append(<span class="string" >","</span>);
        buf.append(<span class="string" >"value="</span>).append(value).append(<span class="string" >","</span>);
        buf.append(<span class="string" >"priority="</span>).append(priority);
        <span class="keyword" >return</span> buf.toString();
    }
    
}</pre>

<p>他にも</p>
<pre class="java">
<span class="keyword" >public</span> <span class="keyword" >interface</span> CacheLifeCycle {
    <span class="keyword" >public</span> <span class="keyword" >void</span> register(LifeCycleExecutor executor);

    <span class="keyword" >public</span> <span class="keyword" >void</span> purge(PrioritalEntry entry);
}

<span class="keyword" >public</span> <span class="keyword" >class</span> LifeCycleExecutor {
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> ExecutorService executor;
    
    <span class="keyword" >public</span> LifeCycleExecutor(<span class="keyword" >final</span> ExecutorService executor){
        <span class="keyword" >this</span>.executor = executor;
    }
    
    <span class="keyword" >public</span> <span class="keyword" >void</span> add(CacheLifeCycle lifeCycle, BlockingQueue&lt;PrioritalEntry&gt; queue){
        executor.execute(<span class="keyword" >new</span> Monitor(lifeCycle, queue));
    }
    
    <span class="keyword" >public</span> <span class="keyword" >void</span> shutdown(){
        executor.shutdown();
        <span class="keyword" >try</span> {
            <span class="keyword" >if</span>(!executor.awaitTermination(<span class="number" >10</span>, TimeUnit.SECONDS)){
                executor.shutdownNow();
            }
        } <span class="keyword" >catch</span>(InterruptedException e){
            e.printStackTrace(System.err);
        }
    }
    
    <span class="keyword" >private</span> <span class="keyword" >static</span> <span class="keyword" >class</span> Monitor <span class="keyword" >implements</span> Runnable {
        <span class="keyword" >private</span> <span class="keyword" >static</span> <span class="keyword" >final</span> Log log = LogFactory.getLog(Monitor.<span class="keyword" >class</span>);
        <span class="keyword" >private</span> <span class="keyword" >final</span> CacheLifeCycle lifeCycle;
        <span class="keyword" >private</span> <span class="keyword" >final</span> BlockingQueue&lt;PrioritalEntry&gt; queue;
        <span class="keyword" >private</span> Monitor(CacheLifeCycle lifeCycle, BlockingQueue&lt;PrioritalEntry&gt; queue){
            <span class="keyword" >this</span>.lifeCycle = lifeCycle;
            <span class="keyword" >this</span>.queue = queue;
        }
        <span class="keyword" >public</span> <span class="keyword" >void</span> run(){
            <span class="keyword" >try</span> {
                <span class="keyword" >while</span>(<span class="keyword" >true</span>){
                    PrioritalEntry entry = queue.take();
                    <span class="keyword" >if</span>(log.isDebugEnabled()){
                        log.debug(<span class="string" >"find entry =&gt; "</span> + entry);
                    }
                    lifeCycle.purge(entry);
                }
            } <span class="keyword" >catch</span>(InterruptedException e){
            }
        }
    }

}</pre>

<p>細かく解説していく（誰に？）と</p>

<p>まず、PrioritalEntry(スペルとか意味は気にしない)の実装側から</p>

<p>ほとんどの実装は前回のと同じ。<br />今回は他にpriority(優先度)も一緒に持ってみた<br />
インスタンス生成時は</p>

<pre class="java">    <span class="keyword" >public</span> <span class="keyword" >static</span> <span class="keyword" >final</span> <span class="keyword" >int</span> MAX_PRIORITY = <span class="number" >10</span>;
    
    <span class="keyword" >public</span> <span class="keyword" >static</span> <span class="keyword" >final</span> <span class="keyword" >int</span> DEFAULT_PRIORITY = <span class="number" >5</span>;
    
    <span class="keyword" >private</span> <span class="keyword" >final</span> AtomicInteger priority = <span class="keyword" >new</span> AtomicInteger(DEFAULT_PRIORITY);</pre>

<p>で作られるんだけど、LRUCacheのgetとかの呼び出しが行われる毎に、priorityをincrementしてる</p>

<pre class="java">
<span class="comment" >// class LRUCache</span>
    <span class="keyword" >public</span> PrioritalEntry get(String key) {
        readLock.lock();
        <span class="keyword" >try</span> {
            <span class="keyword" >if</span>(!cache.containsKey(key)){
                <span class="keyword" >return</span> <span class="keyword" >null</span>;
            }
            
            PrioritalEntry entry = cache.get(key);
            <span class="comment" >// 時間切れなので見えなくする</span>
            <span class="keyword" >if</span>(entry.isExpired()){
                <span class="keyword" >return</span> <span class="keyword" >null</span>;
            }
            <span class="comment" >// 使う人がいたのでpriorityをあげる</span>
            entry.incrementPriority();
            <span class="keyword" >return</span> entry;
        } <span class="keyword" >finally</span> {
            readLock.unlock();
        }
    }

<span class="comment" >// class PrioritalEntry</span>
    <span class="keyword" >public</span> <span class="keyword" >int</span> incrementPriority(){
        <span class="keyword" >if</span>(MAX_PRIORITY &lt; priority.get()){
            <span class="keyword" >return</span> MAX_PRIORITY;
        }
        <span class="keyword" >return</span> priority.incrementAndGet();
    }
    
    <span class="keyword" >public</span> <span class="keyword" >int</span> decrementPriority(){
        <span class="keyword" >return</span> priority.decrementAndGet();
    }
    
    <span class="keyword" >public</span> <span class="keyword" >int</span> getPriority(){
        <span class="keyword" >return</span> priority.get();
    }</pre>

<p>これで、最も "使われる頻度の高い" Entryは "priorityが高い" という状態を作り出してる。</p>

<p>その他にも、setで値が更新される際にも、既に同じkeyでEntryが作られてたらその部分を "再利用" してみた</p>

<pre class="java">    <span class="keyword" >public</span> <span class="keyword" >boolean</span> set(String key, String value, <span class="keyword" >long</span> expiredAt){
        writeLock.lock();
        <span class="keyword" >try</span> {
            PrioritalEntry newEntry = <span class="keyword" >new</span> PrioritalEntry(key, value, expiredAt);
            PrioritalEntry previousEntry = cache.putIfAbsent(key, newEntry);
            <span class="comment" >// 既存の値がない</span>
            <span class="keyword" >if</span>(<span class="keyword" >null</span> == previousEntry){
                <span class="comment" >// ということは新しい値</span>
                <span class="keyword" >return</span> expiredQueue.offer(newEntry);
            }
            previousEntry.setValue(value);
            previousEntry.setExpiredAt(expiredAt);
            previousEntry.incrementPriority();
            <span class="keyword" >return</span> <span class="keyword" >true</span>;
        } <span class="keyword" >finally</span> {
            writeLock.unlock();
        }
    }</pre>

<p>ということで、priorityという値によって最近使われているかどうかまで、表現できたので、次は値の削除について</p>

<pre class="java">    <span class="keyword" >public</span> <span class="keyword" >void</span> purge(PrioritalEntry entry){
        writeLock.lock();
        <span class="keyword" >try</span> {
            <span class="comment" >// priorityを下げる</span>
            <span class="keyword" >if</span>(entry.decrementPriority() &lt; <span class="number" >1</span>){
                <span class="comment" >// 1以下なら誰も使ってなさそうなので消す</span>
                <span class="keyword" >if</span>(log.isDebugEnabled()){
                    log.debug(<span class="string" >"purge entry =&gt; "</span> + entry);
                }
                cache.remove(entry.getKey());
                <span class="keyword" >return</span>;
            }
            <span class="comment" >// もう一度チャンス</span>
            entry.setExpiredAt(retransmission(entry.getPriority()));
            expiredQueue.offer(entry);
        } <span class="keyword" >finally</span> {
            writeLock.unlock();
        }
    }</pre>

<p>purgeというメソッドでは、LifeCycleExecutorによって、DelayQueueの中身が取り出された "期限切れ" の Entry が引数に渡される</p>

<pre class="java">        <span class="keyword" >public</span> <span class="keyword" >void</span> run(){
            <span class="keyword" >try</span> {
                <span class="keyword" >while</span>(<span class="keyword" >true</span>){
                    PrioritalEntry entry = queue.take();
                    <span class="keyword" >if</span>(log.isDebugEnabled()){
                        log.debug(<span class="string" >"find entry =&gt; "</span> + entry);
                    }
                    lifeCycle.purge(entry);
                }
            } <span class="keyword" >catch</span>(InterruptedException e){
            }
        }</pre>

<p>期限切れの Entry であっても、すぐに削除は行わずに "再利用" されるチャンスを与えるため、期限切れの Entry であっても Delay Queue に再送(retransmission)している。<br />
ただ単純に再送するのではなく、再送のアルゴリズムとして昔どこかで覚えた "<a href="http://www.atmarkit.co.jp/fwin2k/network/tcpip006/tcpip04.html">binary exponential backoff</a>" を使ってみた（うろ覚えバージョン）</p>

<pre class="java">
<span class="comment" >// binary exponential backoff</span>
<span class="keyword" >long</span> freq = currentPriority + Math.round(Math.pow(<span class="number" >2</span>, currentPriority));
<span class="keyword" >return</span> Math.round(<span class="number" >0.875</span> * freq) + Math.round(<span class="number" >0.125</span> * currentPriority);</pre>

<p>これを実行してみるとこんな時間が求められる</p>

<pre class="java">retransmission(<span class="number" >0</span>) =&gt; <span class="number" >1</span>
retransmission(<span class="number" >1</span>) =&gt; <span class="number" >3</span>
retransmission(<span class="number" >2</span>) =&gt; <span class="number" >5</span>
retransmission(<span class="number" >3</span>) =&gt; <span class="number" >10</span>
retransmission(<span class="number" >4</span>) =&gt; <span class="number" >19</span>
retransmission(<span class="number" >5</span>) =&gt; <span class="number" >33</span>
retransmission(<span class="number" >6</span>) =&gt; <span class="number" >62</span>
retransmission(<span class="number" >7</span>) =&gt; <span class="number" >119</span>
retransmission(<span class="number" >8</span>) =&gt; <span class="number" >232</span>
retransmission(<span class="number" >9</span>) =&gt; <span class="number" >457</span>
:
:
:</pre>

<p>これは再送時間を作るときに、本来ならリトライ回数を使うんだけど（リトライの数が多いほど、次回の再送時間までの待ちが増える、よってこまめに再送させない）、ここでは priority を使用している。<br />
これによって、priorityが高いものほど、時間切れでも生存時間を多くすることができる。これによって再利用のチャンスが増えるはず。（priorityが高い = 最も再利用される可能性が高そう）</p>

<p>ということで、簡単に動かしてみる</p>

<pre class="java">LRUCache cache = <span class="keyword" >new</span> LRUCache();
LifeCycleExecutor executor = <span class="keyword" >new</span> LifeCycleExecutor(Executors.newCachedThreadPool());
cache.register(executor);

cache.set(<span class="string" >"hoge"</span>, <span class="string" >"123"</span>, <span class="number" >5</span>);
cache.set(<span class="string" >"foo"</span>, <span class="string" >"456"</span>, <span class="number" >2</span>);
cache.set(<span class="string" >"bar"</span>, <span class="string" >"789"</span>, <span class="number" >2</span>);

cache.get(<span class="string" >"foo"</span>);

<span class="keyword" >try</span> {
    TimeUnit.SECONDS.sleep(<span class="number" >120</span>);
} <span class="keyword" >catch</span>(InterruptedException e){
}

executor.shutdown();</pre>

<p>で、これを動かしてみると。。。</p>

<pre class="java">LifeCycleExecutor$Monitor: find entry =&gt; key=foo,value=<span class="number" >456</span>,priority=<span class="number" >6</span>
LifeCycleExecutor$Monitor: find entry =&gt; key=bar,value=<span class="number" >789</span>,priority=<span class="number" >5</span>
LifeCycleExecutor$Monitor: find entry =&gt; key=hoge,value=<span class="number" >123</span>,priority=<span class="number" >5</span>
LifeCycleExecutor$Monitor: find entry =&gt; key=bar,value=<span class="number" >789</span>,priority=<span class="number" >4</span>
LifeCycleExecutor$Monitor: find entry =&gt; key=hoge,value=<span class="number" >123</span>,priority=<span class="number" >4</span>
LifeCycleExecutor$Monitor: find entry =&gt; key=bar,value=<span class="number" >789</span>,priority=<span class="number" >3</span>
LifeCycleExecutor$Monitor: find entry =&gt; key=hoge,value=<span class="number" >123</span>,priority=<span class="number" >3</span>
LifeCycleExecutor$Monitor: find entry =&gt; key=foo,value=<span class="number" >456</span>,priority=<span class="number" >5</span>
LifeCycleExecutor$Monitor: find entry =&gt; key=bar,value=<span class="number" >789</span>,priority=<span class="number" >2</span>
LifeCycleExecutor$Monitor: find entry =&gt; key=hoge,value=<span class="number" >123</span>,priority=<span class="number" >2</span>
LifeCycleExecutor$Monitor: find entry =&gt; key=bar,value=<span class="number" >789</span>,priority=<span class="number" >1</span>
LRUCache: purge entry =&gt; key=bar,value=<span class="number" >789</span>,priority=<span class="number" >0</span>
LifeCycleExecutor$Monitor: find entry =&gt; key=hoge,value=<span class="number" >123</span>,priority=<span class="number" >1</span>
LRUCache: purge entry =&gt; key=hoge,value=<span class="number" >123</span>,priority=<span class="number" >0</span>
LifeCycleExecutor$Monitor: find entry =&gt; key=foo,value=<span class="number" >456</span>,priority=<span class="number" >4</span>
LifeCycleExecutor$Monitor: find entry =&gt; key=foo,value=<span class="number" >456</span>,priority=<span class="number" >3</span>
LifeCycleExecutor$Monitor: find entry =&gt; key=foo,value=<span class="number" >456</span>,priority=<span class="number" >2</span>
LifeCycleExecutor$Monitor: find entry =&gt; key=foo,value=<span class="number" >456</span>,priority=<span class="number" >1</span>
LRUCache: purge entry =&gt; key=foo,value=<span class="number" >456</span>,priority=<span class="number" >0</span>
</pre>

<p>アクセスのあった "foo" キーは、 同じタイムアウトが設定されている "bar" よりも後に削除されている。<br />
これでなんちゃって LRU はできた（？）</p>

<p>ちなみに、writeLockとかreadLockなどはたぶん使ってない。というか使わなくてもいいはず。</p>

<p>ちなみに、本家 memcached は <a href="http://gihyo.jp/dev/feature/01/memcached/0002">slab allocation</a>とかを使っているので、メモリの効率からいうとそっちの方がいいかも。<br />
こういった部分を効率よく切り替えれるようにしたいものです</p>

<p>次回は、分散ハッシュ（というか複数台で連携）など。</p>
]]>
</content:encoded>
</item>
<item rdf:about="http://blog.xole.net/article.php?id=741">
<title>最近の我が家の電源まわり事情</title>
<link>http://blog.xole.net/article.php?id=741</link>
<dc:date>2010-01-17T20:39:11+09:00</dc:date>
<description>最近、我が家の電源まわりが結構すっきりしてきたので、一部紹介しようかと思います

電源タップがすっきりした！
以前はごちゃごちゃしていた、電源まわりが↓のように非常にすっきりしてます。




というのも、ここ最近あらゆる電源ま...</description>
<content:encoded>
<![CDATA[
<p>最近、我が家の電源まわりが結構すっきりしてきたので、一部紹介しようかと思います</p>

<h3>電源タップがすっきりした！</h3>
<p>以前はごちゃごちゃしていた、電源まわりが↓のように非常にすっきりしてます。</p>
<p>
<img src="http://blog.xole.net/resources/pic001.jpg" />
</p>

<p>というのも、ここ最近あらゆる電源まわりがUSB(mini-usb/micro-usb)経由で充電できるものが増えてきたため、USBハブに電源まわりを一元化させてます。<br />
<img src="http://blog.xole.net/resources/pic002.jpg" />
<br />（タコ足ならぬタコUSB状態）</p>

<p>ネット環境もイーモバの<a href="http://emobile.jp/products/hw/d25hw/">PocketWifi</a>にすることで、大きかった（十分小さいけど）、バッファローのルータなどはおさらばです。<br />
でも、その代わりにUSBで電源を供給できるようになった！</p>

<h3>100円ショップを活用</h3>
<p>ほぼ全てのガジェット関係はUSBから電源を供給できるようになっているものの、一部、付属してなかったりオプションだったりするのですが<br />
最近の100円ショップは進化してます。<br />
USB部分が、mini-usb化されていて、必要なオプションだけ選んで買えというスタイル。<br />
<img src="http://blog.xole.net/resources/pic003.jpg" />
<br/>（先端の小さいフタみたいのを個別に買ってくる。これでDSもUSBから電源を供給している）</p>

<p>そうそう。こんな感じのが100円ショップで売ってる</p>
<iframe src="http://rcm-jp.amazon.co.jp/e/cm?lt1=_blank&bc1=FFFFFF&IS2=1&npa=1&bg1=FFFFFF&fc1=000000&lc1=0000FF&t=lilydaselfish-22&o=9&p=8&l=as1&m=amazon&f=ifr&md=1X69VDGQCMF7Z30FM082&asins=B00280MGS4" style="width:120px;height:240px;" scrolling="no" marginwidth="0" marginheight="0" frameborder="0">
</iframe>

<p>しかも100円ショップでも、1メートル級のUSBも売っていたりすので、電源から遠い場所でも安心！<br />
<img src="http://blog.xole.net/resources/pic0033.jpg" />
<br />
（びよーん。比較にipod mini置いてみた）</p>


<h3>なんでもUSB</h3>
<p>ただ、100円ショップは微妙に品揃えが悪かったりする。例えば、iPod関係のUSBポートが売ってなかったりする。。<br />
そんな時は<a href="http://www.donki.com">ドンキホーテ</a>。100円ショップに匹敵かそれ以上のUSB関係の機器が売ってます。<br />
<img src="http://blog.xole.net/resources/pic004.jpg" />
<br />（これはドンキで買ってきた。iPod/iPod Touch用とiPod mini用）</p>
<p>もう、USBまわりを調整するなら100円ショップとドンキがあれば確実です。</p>

<h3>常に動かすものも、USBで</h3>
<p>ちょっと例が悪いかもしれないですが、最近池袋にあるゲーセンのUFOキャッチャーで取れたこれ。<br />
<img src="http://blog.xole.net/resources/pic006.jpg" />
<br />
（画像が汚いけど、これ・・加湿器なんだぜ・・・）</p>

<p>夏は、扇風機もUSB経由になることは、すでに予想されます<br />
<img src="http://blog.xole.net/resources/pic0066.jpg" />
<br />
（扇風機もUSB・・・卓上用だけど・・・）</p>

<h3>高度にUSB化された家電ライフを目指して</h3>
<p>たぶん、100円ショップとドンキホーテの企業努力で、今後とも我が家の電源は、ありとあらゆるものはUSB化されていくと思います。<br />
<p>engadgetで読んだけど、"<a href="http://japanese.engadget.com/2008/03/11/belkin-usb/">電源から直接USB</a>"とか"<a href="http://japanese.engadget.com/2009/07/23/49-usb/">49ポートUSB</a>"のが日本でもぞくぞく登場すると嬉しいなぁ。</p>
<p>今は、USBハブ(4ポートとか)をタコHub化してて使い勝手が悪い。</p>

<p>ってか、たぶん、正しいUSBHubはこんな感じだと思う<br />
<img src="http://blog.xole.net/resources/pic005.jpg" />
<br />
（家中のUSBメモリ指してみました(合計4+8+2GB)。思ったより少なかった）</p>

<p>では、Happy USB Life を！</p>
]]>
</content:encoded>
</item>
<item rdf:about="http://blog.xole.net/article.php?id=740">
<title>memcacheの cache の部分を java で</title>
<link>http://blog.xole.net/article.php?id=740</link>
<dc:date>2010-01-15T00:06:52+09:00</dc:date>
<description>指定時間後に消える（というか参照出来なくなる）をやってみることに。
DelayQueueとDelayedで実装してみる。

Cacheの部分

public class RetireCache implements Cache {...</description>
<content:encoded>
<![CDATA[
<p>指定時間後に消える（というか参照出来なくなる）をやってみることに。<br />
DelayQueueとDelayedで実装してみる。</p>

<p>Cacheの部分</p>
<pre class="java">
<span class="keyword" >public</span> <span class="keyword" >class</span> RetireCache <span class="keyword" >implements</span> Cache {
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> Map&lt;String, PeriodEntry&gt; cache = <span class="keyword" >new</span> ConcurrentHashMap&lt;String, PeriodEntry&gt;();
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> DelayQueue&lt;PeriodEntry&gt; expiredQueue =  <span class="keyword" >new</span> DelayQueue&lt;PeriodEntry&gt;();

    <span class="keyword" >public</span> <span class="keyword" >boolean</span> set(String key, String value, <span class="keyword" >long</span> expiredAt) {
        check();
        <span class="keyword" >final</span> PeriodEntry entry = <span class="keyword" >new</span> PeriodEntry(key, value, expiredAt);
        cache.put(key, entry);
        <span class="keyword" >return</span> expiredQueue.add(entry);
    }

    <span class="keyword" >public</span> Entry get(String key) {
        <span class="keyword" >if</span>(!cache.containsKey(key)){
            <span class="keyword" >return</span> <span class="keyword" >null</span>;
        }
        PeriodEntry entry = cache.get(key);
        <span class="keyword" >if</span>(entry.isExpired()){
            <span class="keyword" >return</span> <span class="keyword" >null</span>;
        }
        <span class="keyword" >return</span> entry;
    }
    
    <span class="keyword" >public</span> <span class="keyword" >void</span> remove(String key) {
        remove(key, <span class="number" >0</span>);
    }
    
    <span class="keyword" >public</span> <span class="keyword" >void</span> remove(String key, <span class="keyword" >long</span> expiredAt){
        PeriodEntry entry = cache.get(key);
        <span class="keyword" >if</span>(<span class="keyword" >null</span> != entry){
            entry.setExpiredAt(expiredAt);
        }
    }
    
    <span class="keyword" >public</span> <span class="keyword" >void</span> flush() {
        flush(<span class="number" >0</span>);
    }
    
    <span class="keyword" >public</span> <span class="keyword" >void</span> flush(<span class="keyword" >long</span> expiredAt){
        Iterator&lt;Map.Entry&lt;String, PeriodEntry&gt;&gt; entries = cache.entrySet().iterator();
        <span class="keyword" >while</span>(entries.hasNext()){
            Map.Entry&lt;String, PeriodEntry&gt; entry = entries.next();
            entry.getValue().setExpiredAt(expiredAt);
        }
    }
    
    <span class="keyword" >protected</span> <span class="keyword" >void</span> check(){
        <span class="keyword" >synchronized</span>(<span class="keyword" >this</span>){
            PeriodEntry e = <span class="keyword" >null</span>;
            <span class="keyword" >while</span>((e = expiredQueue.poll()) != <span class="keyword" >null</span>){
                cache.remove(e.getKey());
            }
        }
    }
}</pre>

<p>mapにputするときは、同じentryのインスタンスをqueueにも入れておく<br />
getするときには、isExpiredを見て期限切れはnullを返す。</p>
<p>なんとなく、期限切れのqueueの参照にスレッドを立てるのもあれなので、setの時にentryの削除をやってみる</p>

<p>Entryの部分</p>

<pre class="java">
<span class="keyword" >public</span> <span class="keyword" >class</span> PeriodEntry <span class="keyword" >implements</span> Entry, Delayed {
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> String key;
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> String value;
    
    <span class="keyword" >protected</span> <span class="keyword" >long</span> expiredAt;
    
    <span class="keyword" >public</span> PeriodEntry(<span class="keyword" >final</span> String key, <span class="keyword" >final</span> String value, <span class="keyword" >final</span> <span class="keyword" >long</span> expiredAt){
        <span class="keyword" >this</span>.key = key;
        <span class="keyword" >this</span>.value = value;
        <span class="keyword" >if</span>(<span class="number" >0</span> == expiredAt){
            <span class="keyword" >this</span>.expiredAt = Long.MAX_VALUE;
        } <span class="keyword" >else</span> <span class="keyword" >if</span>(expiredAt &lt; <span class="number" >0</span>) {
            <span class="keyword" >this</span>.expiredAt = <span class="number" >0</span>;
        } <span class="keyword" >else</span> {
            <span class="keyword" >this</span>.expiredAt = relative(expiredAt);
        }
    }
    
    <span class="keyword" >protected</span> <span class="keyword" >static</span> <span class="keyword" >long</span> relative(<span class="keyword" >long</span> expiredAt){
        <span class="comment" >// 現在時間 + 指定時間</span>
        <span class="keyword" >return</span> System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(expiredAt);
    }
    
    <span class="keyword" >protected</span> <span class="keyword" >static</span> <span class="keyword" >long</span> absolute(<span class="keyword" >long</span> expiredAt){
        <span class="keyword" >return</span> TimeUnit.SECONDS.toMillis(expiredAt);
    }

    <span class="keyword" >public</span> String getKey(){
        <span class="keyword" >return</span> key;
    }
    
    <span class="keyword" >public</span> String getValue(){
        <span class="keyword" >return</span> value;
    }
    
    <span class="keyword" >public</span> <span class="keyword" >void</span> setExpiredAt(<span class="keyword" >long</span> newValue){
        <span class="comment" >// 現在時間より次の期限を設定</span>
        <span class="keyword" >this</span>.expiredAt = relative(newValue);
    }

    <span class="keyword" >private</span> <span class="keyword" >long</span> elapsed(){
        <span class="keyword" >return</span> expiredAt - System.currentTimeMillis();
    }
    
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> isExpired(){
        <span class="keyword" >return</span> elapsed() &lt; <span class="number" >1</span>;
    }

    <span class="keyword" >public</span> <span class="keyword" >long</span> getDelay(TimeUnit unit) {
        <span class="comment" >// expiredの時間から経過時間を引き、残り時間を算出</span>
        <span class="keyword" >return</span> unit.convert(elapsed(), TimeUnit.MILLISECONDS);
    }
    
    <span class="keyword" >public</span> <span class="keyword" >int</span> compareTo(Delayed o) {
        PeriodEntry target = (PeriodEntry) o;
        <span class="keyword" >if</span>(expiredAt &lt; target.expiredAt){
            <span class="keyword" >return</span> -<span class="number" >1</span>;
        }
        <span class="keyword" >if</span>(expiredAt &gt; target.expiredAt){
            <span class="keyword" >return</span> <span class="number" >1</span>;
        }
        <span class="keyword" >return</span> <span class="number" >0</span>;
    }

}</pre>

<p>entryは基本的に相対的な時間を設定するようにしてる以外は、普通のでDelayedな実装で。</p>

<p>テスト</p>

<pre class="java">
<span class="keyword" >public</span> <span class="keyword" >class</span> RetireCacheTest {
    
    <span class="annotation" >@Test</span>
    <span class="keyword" >public</span> <span class="keyword" >void</span> 指定時間で参照出来なくなる(){
        Cache cache = <span class="keyword" >new</span> RetireCache();
        cache.set(<span class="string" >"hoge"</span>, <span class="string" >"hogeValue"</span>, <span class="number" >2</span>);
        Assert.assertEquals(cache.get(<span class="string" >"hoge"</span>).getValue(), <span class="string" >"hogeValue"</span>);
        <span class="keyword" >try</span> {
            TimeUnit.SECONDS.sleep(<span class="number" >2</span>);
        } <span class="keyword" >catch</span>(InterruptedException e){
        }
        Assert.assertNull(<span class="string" >"参照できなくなってる"</span>, cache.get(<span class="string" >"hoge"</span>));
    }
    
    <span class="annotation" >@Test</span>
    <span class="keyword" >public</span> <span class="keyword" >void</span> 指定時間後に消えてる(){
        Cache cache = <span class="keyword" >new</span> RetireCache();
        cache.set(<span class="string" >"hoge"</span>, <span class="string" >"hogeValue"</span>, <span class="number" >2</span>);
        <span class="keyword" >try</span> {
            TimeUnit.SECONDS.sleep(<span class="number" >1</span>);
        } <span class="keyword" >catch</span>(InterruptedException e){
        }
        Assert.assertEquals(cache.get(<span class="string" >"hoge"</span>).getValue(), <span class="string" >"hogeValue"</span>);
        cache.remove(<span class="string" >"hoge"</span>, <span class="number" >5</span>);
        <span class="keyword" >try</span> {
            TimeUnit.SECONDS.sleep(<span class="number" >1</span>);
        } <span class="keyword" >catch</span>(InterruptedException e){
        }
        Assert.assertEquals(<span class="string" >"これは消えてるのが正解?"</span>, cache.get(<span class="string" >"hoge"</span>).getValue(), <span class="string" >"hogeValue"</span>);
    }
    
    <span class="annotation" >@Test</span>
    <span class="keyword" >public</span> <span class="keyword" >void</span> ゼロを指定すると消えない(){
        Cache cache = <span class="keyword" >new</span> RetireCache();
        cache.set(<span class="string" >"hoge"</span>, <span class="string" >"hogeValue"</span>, <span class="number" >0</span>);
        cache.set(<span class="string" >"foo"</span>, <span class="string" >"fooValue"</span>, <span class="number" >1</span>);
        <span class="keyword" >try</span> {
            TimeUnit.SECONDS.sleep(<span class="number" >1</span>);
        } <span class="keyword" >catch</span>(InterruptedException e){
        }
        Assert.assertNull(<span class="string" >"これは1秒後消えてる"</span>, cache.get(<span class="string" >"foo"</span>));
        Assert.assertEquals(<span class="string" >"これは消えない"</span>, cache.get(<span class="string" >"hoge"</span>).getValue(), <span class="string" >"hogeValue"</span>);
    }

    <span class="annotation" >@Test</span>
    <span class="keyword" >public</span> <span class="keyword" >void</span> キーがない(){
        Cache cache = <span class="keyword" >new</span> RetireCache();
        cache.set(<span class="string" >"hoge"</span>, <span class="string" >"1"</span>, <span class="number" >0</span>);
        Assert.assertNull(cache.get(<span class="string" >"foo"</span>));
    }

    :
    : 省略
    :
}</pre>

<p>memcacheのdeleteと一時実装が違うかな。本物は、delete key time でtimeにどんな時間を指定してもすぐに参照出来なくなる</p>

<p>時間を指定しない場合</p>
<pre class="java">set hoge <span class="number" >0</span> <span class="number" >30</span> <span class="number" >4</span>
<span class="number" >1234</span>
STORED

delete hoge
DELETED

get hoge
END</pre>


<p>消える時間を指定しても...</p>
<pre class="java">set hoge <span class="number" >0</span> <span class="number" >30</span> <span class="number" >4</span>
<span class="number" >1234</span>
STORED

get hoge
VALUE hoge <span class="number" >0</span> <span class="number" >4</span>
<span class="number" >1234</span>
END

delete hoge <span class="number" >10000</span>
DELETED

get hoge
END
</pre>

<p>なかなか難しい<br />
次は LRU</p>
]]>
</content:encoded>
</item>
<item rdf:about="http://blog.xole.net/article.php?id=739">
<title>NIO にしたら爆速になった(was: なんちゃってmemcache互換サーバ)</title>
<link>http://blog.xole.net/article.php?id=739</link>
<dc:date>2010-01-13T02:37:24+09:00</dc:date>
<description>昨日の続き。
server の部分を thread pool から、nio な ノンブロッキング にしてみたら、思った以上に早かった

メイン

public class Server extends Thread {
    ...</description>
<content:encoded>
<![CDATA[
<p>昨日の続き。<br />
server の部分を thread pool から、nio な ノンブロッキング にしてみたら、思った以上に早かった</p>

<p>メイン</p>
<pre class="java">
<span class="keyword" >public</span> <span class="keyword" >class</span> Server <span class="keyword" >extends</span> Thread {
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> BlockingQueue&lt;Socket&gt; accept = <span class="keyword" >new</span> LinkedBlockingQueue&lt;Socket&gt;();
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> ExecutorService acceptPool;
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> Cache cache;
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> <span class="keyword" >int</span> port;
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> <span class="keyword" >int</span> maxConnection;
    
    <span class="keyword" >public</span> Server(<span class="keyword" >final</span> <span class="keyword" >int</span> port, <span class="keyword" >final</span> <span class="keyword" >int</span> maxConnection){
        <span class="keyword" >this</span>(port, maxConnection, ByteSizeUnit.Mega.toLong(<span class="number" >64</span>));
    }
    <span class="keyword" >public</span> Server(<span class="keyword" >final</span> <span class="keyword" >int</span> port, <span class="keyword" >final</span> <span class="keyword" >int</span> maxConnection, <span class="keyword" >final</span> <span class="keyword" >long</span> maxMemory){
        <span class="keyword" >this</span>.port = port;
        <span class="keyword" >this</span>.maxConnection = maxConnection;
        <span class="keyword" >this</span>.cache = <span class="keyword" >new</span> LRUCache(maxMemory);
        <span class="keyword" >this</span>.acceptPool = Executors.newFixedThreadPool(maxConnection);
    }
    
    <span class="keyword" >public</span> <span class="keyword" >static</span> <span class="keyword" >void</span> main(String...args){
        Server s = <span class="keyword" >new</span> Server(<span class="number" >12221</span>, <span class="number" >32</span>);
        s.start();
        <span class="keyword" >try</span> {
            s.join();
        } <span class="keyword" >catch</span>(InterruptedException e){
            e.printStackTrace(System.err);
        }
    }

    <span class="keyword" >public</span> <span class="keyword" >void</span> run(){
        ServerSocketChannel channel = <span class="keyword" >null</span>;
        <span class="keyword" >try</span> {
            channel = ServerSocketChannel.open();
            channel.configureBlocking(<span class="keyword" >false</span>);
            
            <span class="keyword" >final</span> ServerSocket serverSocket = channel.socket();
            serverSocket.setReuseAddress(<span class="keyword" >true</span>);
            serverSocket.bind(<span class="keyword" >new</span> InetSocketAddress(port));
            
            <span class="keyword" >final</span> Selector selector = Selector.open();
            <span class="keyword" >try</span> {
                channel.register(selector, SelectionKey.OP_ACCEPT, <span class="keyword" >new</span> AcceptAction(cache));
                
                <span class="keyword" >while</span>(<span class="number" >0</span> &lt; selector.select()){
                    Iterator&lt;SelectionKey&gt; keys = selector.selectedKeys().iterator();
                    <span class="keyword" >while</span>(keys.hasNext()){
                        <span class="keyword" >final</span> SelectionKey key = keys.next();
                        keys.remove();
                        
                        <span class="keyword" >if</span>(!key.isValid()){
                            <span class="keyword" >continue</span>;
                        }
                        
                        Action action = (Action) key.attachment();
                        action.execute(key);
                    }
                }
            } <span class="keyword" >finally</span> {
                Iterator&lt;SelectionKey&gt; keys = selector.selectedKeys().iterator();
                <span class="keyword" >while</span>(keys.hasNext()){
                    <span class="keyword" >final</span> SelectionKey key = keys.next();
                    key.channel().close();
                }
            }
        } <span class="keyword" >catch</span> (UnknownHostException e) {
            e.printStackTrace();
        } <span class="keyword" >catch</span> (IOException e) {
            e.printStackTrace();
        } <span class="keyword" >finally</span> {
            <span class="keyword" >if</span>(<span class="keyword" >null</span> != channel){
                <span class="keyword" >try</span> {
                    channel.close();
                } <span class="keyword" >catch</span>(IOException e){
                    e.printStackTrace();
                }
            }
        }
    }
}</pre>

<p>新規接続を受け入れるAccept部分</p>

<pre class="java">
<span class="keyword" >public</span> <span class="keyword" >class</span> AcceptAction <span class="keyword" >implements</span> Action {
    
    <span class="keyword" >private</span> <span class="keyword" >final</span> Cache cache;
    
    <span class="keyword" >public</span> AcceptAction(<span class="keyword" >final</span> Cache cache){
        <span class="keyword" >this</span>.cache = cache;
    }

    <span class="keyword" >public</span> <span class="keyword" >void</span> execute(SelectionKey selectionKey) {
        SocketChannel channel = <span class="keyword" >null</span>;
        <span class="keyword" >try</span> {
            channel = ((ServerSocketChannel) selectionKey.channel()).accept();
            channel.configureBlocking(<span class="keyword" >false</span>);

            ReadAction action = <span class="keyword" >new</span> ReadAction(cache, <span class="keyword" >new</span> ByteBufferReader(channel));
            channel.register(selectionKey.selector(), SelectionKey.OP_READ, action);
        } <span class="keyword" >catch</span> (IOException e) {
            e.printStackTrace();
        }
    }
}</pre>

<p>んで、読み込みと書き込みは分けた</p>

<pre class="java">
<span class="keyword" >public</span> <span class="keyword" >class</span> ReadAction <span class="keyword" >implements</span> Action {
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> Cache cache;
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> Reader reader;
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> Handler handler;
    
    <span class="keyword" >public</span> ReadAction(<span class="keyword" >final</span> Cache cache, <span class="keyword" >final</span> Reader reader){
        <span class="keyword" >this</span>.cache = cache;
        <span class="keyword" >this</span>.reader = reader;
        <span class="keyword" >this</span>.handler = <span class="keyword" >new</span> Handler(cache, reader);
    }

    <span class="keyword" >public</span> <span class="keyword" >void</span> execute(SelectionKey selectionKey) {
        <span class="keyword" >if</span>(!selectionKey.isReadable()){
            <span class="keyword" >return</span>;
        }
        
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        <span class="keyword" >try</span> {
            <span class="keyword" >if</span>(!reader.readable()){
                channel.close();
                selectionKey.cancel();
                <span class="keyword" >return</span>;
            }
            String line = reader.readLine();
            <span class="keyword" >if</span>(line == <span class="keyword" >null</span>){
                channel.close();
                selectionKey.cancel();
                <span class="keyword" >return</span>;
            }
            Return r = handler.execute(line);

            WriteAction action = <span class="keyword" >new</span> WriteAction(r, <span class="keyword" >this</span>);
            channel.register(selectionKey.selector(), SelectionKey.OP_WRITE, action);
        } <span class="keyword" >catch</span>(IOException e){
            e.printStackTrace();
        }
    }

    <span class="keyword" >private</span> <span class="keyword" >static</span> <span class="keyword" >class</span> Handler <span class="keyword" >implements</span> CommandVisitor {
        <span class="keyword" >private</span> <span class="keyword" >final</span> Cache cache;
        <span class="keyword" >private</span> <span class="keyword" >final</span> Reader reader;
        <span class="keyword" >private</span> Handler(<span class="keyword" >final</span> Cache cache, <span class="keyword" >final</span> Reader reader){
            <span class="keyword" >this</span>.cache = cache;
            <span class="keyword" >this</span>.reader = reader;
        }
        <span class="keyword" >public</span> Return execute(String line){
            <span class="keyword" >try</span> {
                <span class="keyword" >final</span> StringReader r = <span class="keyword" >new</span> StringReader(line);
                <span class="keyword" >final</span> MemcacheParser parser = <span class="keyword" >new</span> MemcacheParser(r);
                
                Command command = parser.Command();
                <span class="keyword" >return</span> command.accept(<span class="keyword" >this</span>, <span class="keyword" >null</span>);
            } <span class="keyword" >catch</span>(ParseException e){
                e.printStackTrace();
                <span class="keyword" >return</span> <span class="keyword" >new</span> Return(ResponseType.ERROR);
            } <span class="keyword" >catch</span>(Exception e){
                e.printStackTrace();
                <span class="keyword" >return</span> <span class="keyword" >new</span> Return(ResponseType.SERVER_ERROR, e.getMessage());
            }
        }
        :
        : <span class="comment" >// 省略</span>
        :    
        <span class="keyword" >public</span> Return visit(RetrievalCommand command, Parameter parameter) {
            <span class="keyword" >final</span> List&lt;String&gt; keys = command.getKeys();
            <span class="keyword" >final</span> List&lt;Return&gt; returns = <span class="keyword" >new</span> ArrayList&lt;Return&gt;();
            <span class="keyword" >for</span>(<span class="keyword" >final</span> String key: keys){
                Entry entry = cache.get(key);
                <span class="keyword" >if</span>(<span class="keyword" >null</span> == entry){
                    returns.add(<span class="keyword" >new</span> Return(ResponseType.END));
                    <span class="keyword" >continue</span>;
                }
                returns.add(<span class="keyword" >new</span> Return(ResponseType.SEND_VALUE,
                    key, entry.getFlag(), entry.getLength(), entry.getValue()
                ));
            }
            <span class="keyword" >return</span> <span class="keyword" >new</span> Return(returns.toArray(<span class="keyword" >new</span> Return[returns.size()]));
        }
    
        <span class="keyword" >public</span> Return visit(SetCommand command, Parameter parameter) {
            <span class="keyword" >try</span> {
                <span class="keyword" >final</span> String nextLine = reader.readLine();
                <span class="keyword" >if</span>(cache.set(command.getKey(), nextLine, command.getFlags(), command.getExpTime())){
                    <span class="keyword" >return</span> <span class="keyword" >new</span> Return(ResponseType.STORED);
                }
                <span class="keyword" >return</span> <span class="keyword" >new</span> Return(ResponseType.NOT_STORED);
            } <span class="keyword" >catch</span>(IOException e){
                e.printStackTrace();
                <span class="keyword" >return</span> <span class="keyword" >new</span> Return(ResponseType.ERROR);
            }
        }
    
        <span class="keyword" >public</span> Return visit(VersionCommand command, Parameter parameter) {
            <span class="keyword" >return</span> <span class="keyword" >null</span>;
        }
    }

}</pre>

<pre class="java">
<span class="keyword" >public</span> <span class="keyword" >class</span> WriteAction <span class="keyword" >implements</span> Action {
    
    <span class="keyword" >private</span> <span class="keyword" >static</span> <span class="keyword" >final</span> Charset ASCII = Charset.forName(<span class="string" >"US-ASCII"</span>); 
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> Return ret;
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> ReadAction action;
    
    <span class="keyword" >public</span> WriteAction(<span class="keyword" >final</span> Return ret, ReadAction action){
        <span class="keyword" >this</span>.ret = ret;
        <span class="keyword" >this</span>.action = action;
    }

    <span class="keyword" >public</span> <span class="keyword" >void</span> execute(SelectionKey selectionKey) {
        <span class="keyword" >if</span>(!selectionKey.isWritable()){
            <span class="keyword" >return</span>;
        }
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        
        <span class="keyword" >final</span> CharBuffer buf = CharBuffer.wrap(ret.renderMessage());
        <span class="keyword" >try</span> {
            ByteBuffer bytes = ASCII.encode(buf);
            <span class="keyword" >int</span> capacity = bytes.capacity();
            <span class="keyword" >int</span> size = channel.write(bytes);
            <span class="keyword" >if</span>(size &lt; capacity){
                <span class="keyword" >int</span> sum = size;
                <span class="keyword" >while</span>(sum &lt; capacity){
                    <span class="keyword" >if</span>(size == <span class="number" >0</span>){
                        <span class="keyword" >return</span>;
                    }
                    bytes.position(sum);
                    size = channel.write(bytes);
                    sum += size;
                }
            }
            <span class="keyword" >try</span> {
                channel.register(selectionKey.selector(), SelectionKey.OP_READ, action);
            } <span class="keyword" >catch</span>(ClosedChannelException e){
                e.printStackTrace();
            }
        } <span class="keyword" >catch</span>(IOException e){
            e.printStackTrace();
        }
    }
}</pre>

<p>ということで、これをつかって簡単に比較すると</p>

<pre class="php">
<span class="vars" >$target</span> = <span class="keyword" >array</span>(
    <span class="keyword" >array</span>(<span class="string" >'host'</span> =&gt; <span class="string" >'localhost'</span>, <span class="string" >'port'</span> =&gt; 11211),
    <span class="keyword" >array</span>(<span class="string" >'host'</span> =&gt; <span class="string" >'localhost'</span>, <span class="string" >'port'</span> =&gt; 12221)
);
<span class="keyword" >foreach</span>(<span class="vars" >$target</span> <span class="keyword" >as</span> <span class="vars" >$t</span>){
    <span class="vars" >$memcache</span> = <span class="keyword" >new</span> Memcache;
    <span class="vars" >$memcache</span>-&gt;connect(<span class="vars" >$t</span>[<span class="string" >'host'</span>], <span class="vars" >$t</span>[<span class="string" >'port'</span>]);

    <span class="vars" >$fail</span> = 0;
    <span class="vars" >$elapsed</span> = microtime(true);
    <span class="keyword" >for</span>(<span class="vars" >$i</span> = 0; <span class="vars" >$i</span> &lt; 1000; ++<span class="vars" >$i</span>){
        <span class="keyword" >if</span>(false === <span class="vars" >$memcache</span>-&gt;set(<span class="string" >'hoge'</span>, <span class="string" >'1234'</span>)){
            <span class="func" >echo</span> <span class="string" >'ERROR!!'</span>, PHP_EOL;
        }
        <span class="keyword" >if</span>(false === <span class="vars" >$memcache</span>-&gt;set(<span class="string" >'hoge'</span>, <span class="string" >'123'</span>)){
            <span class="func" >echo</span> <span class="string" >'ERROR!!!'</span>, PHP_EOL;
        }
        <span class="vars" >$value</span> = <span class="vars" >$memcache</span>-&gt;get(<span class="string" >'hoge'</span>);
        <span class="keyword" >if</span>(false === <span class="vars" >$memcache</span>-&gt;set(<span class="string" >'hoge'</span>, <span class="vars" >$value</span> + 1)){
            <span class="func" >echo</span> <span class="string" >'ERRROR!!!!'</span>, PHP_EOL;
        }
        <span class="vars" >$result</span> = <span class="vars" >$memcache</span>-&gt;get(<span class="string" >'hoge'</span>);
        <span class="keyword" >if</span>(<span class="vars" >$result</span> != 124){
            <span class="vars" >$fail</span>++;
        }
    }
    <span class="func" >echo</span> <span class="string" >'target host =&gt; '</span>, <span class="vars" >$t</span>[<span class="string" >'host'</span>], <span class="string" >' port =&gt;'</span>, <span class="vars" >$t</span>[<span class="string" >'port'</span>], PHP_EOL;
    <span class="func" >echo</span> <span class="string" >'elapsed: '</span>, (microtime(true) - <span class="vars" >$elapsed</span>), PHP_EOL;
    <span class="func" >echo</span> <span class="string" >'fail =&gt; '</span>, <span class="vars" >$fail</span>, PHP_EOL;
}
</pre>

<pre class="php">1)
target host =&gt; localhost port =&gt;11211
elapsed: 0.66518497467
fail =&gt; 0
target host =&gt; localhost port =&gt;12221
elapsed: 0.870754003525
fail =&gt; 0

2)
target host =&gt; localhost port =&gt;11211
elapsed: 1.1857790947
fail =&gt; 0
target host =&gt; localhost port =&gt;12221
elapsed: 0.868647098541
fail =&gt; 0
</pre>

<p>memcacheに匹敵してきた。</p>

<p>ってか、ByteBufferを読み書きするのは初めて書いたので、すごく手こずった。。<br />
単純に \r\n までの一行を読みたいだけなのに。。<br />
ということで、BufferedReaderのreadLine的なのを書いて色々とお茶を濁す。。</p>

<pre class="java">
<span class="keyword" >public</span> <span class="keyword" >class</span> ByteBufferReader {
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> SocketChannel channel;
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> ByteBuffer buffer;
    
    <span class="keyword" >public</span> ByteBufferReader(<span class="keyword" >final</span> SocketChannel channel){
        <span class="keyword" >this</span>.channel = channel;
        <span class="keyword" >this</span>.buffer = ByteBuffer.allocateDirect(<span class="number" >512</span>);
    }
    
    <span class="keyword" >public</span> <span class="keyword" >boolean</span> readable() <span class="keyword" >throws</span> IOException {
        <span class="keyword" >return</span> buffer.hasRemaining();
    }
    
    <span class="keyword" >public</span> String readLine() <span class="keyword" >throws</span> IOException {
        <span class="keyword" >final</span> StringBuilder sb = <span class="keyword" >new</span> StringBuilder();
        readInto(sb);
        <span class="keyword" >if</span>(sb.length() &lt; <span class="number" >1</span>){
            <span class="keyword" >return</span> <span class="keyword" >null</span>;
        }
        <span class="keyword" >return</span> sb.toString();
    }
    
    <span class="keyword" >private</span> <span class="keyword" >void</span> readInto(<span class="keyword" >final</span> StringBuilder sb) <span class="keyword" >throws</span> IOException {
        <span class="keyword" >while</span>(<span class="keyword" >true</span>){
            <span class="keyword" >int</span> read = channel.read(buffer);
            <span class="keyword" >if</span>(read == <span class="number" >0</span>){
                <span class="keyword" >break</span>;
            }
            <span class="keyword" >if</span>(read == -<span class="number" >1</span>){
                <span class="keyword" >return</span>;
            }
        }
        buffer.flip();
        <span class="keyword" >while</span>(more(sb));
        buffer.compact();
    }
    
    <span class="keyword" >private</span> <span class="keyword" >boolean</span> more(<span class="keyword" >final</span> StringBuilder sb) <span class="keyword" >throws</span> IOException {
        <span class="keyword" >if</span>(buffer.hasRemaining()){
            <span class="keyword" >char</span> ch = (<span class="keyword" >char</span>) buffer.get();
            <span class="keyword" >switch</span>(ch){
            <span class="keyword" >case</span> <span class="string" >'\n'</span>:
                <span class="keyword" >return</span> <span class="keyword" >false</span>;
            <span class="keyword" >case</span> <span class="string" >'\r'</span>:
                <span class="keyword" >return</span> <span class="keyword" >true</span>;
            <span class="keyword" >default</span>:
                sb.append(ch);
                <span class="keyword" >return</span> <span class="keyword" >true</span>;
            }
        }
        buffer.clear();
        <span class="keyword" >return</span> <span class="keyword" >true</span>;
    }

}</pre>

<p>とりあえず、一段落<br />
NIOは難しかった。</p>
]]>
</content:encoded>
</item>
<item rdf:about="http://blog.xole.net/article.php?id=738">
<title>JavaCC で memcache text protocol の BNF(と、なんちゃってmemcache互換サーバ)</title>
<link>http://blog.xole.net/article.php?id=738</link>
<dc:date>2010-01-11T23:39:32+09:00</dc:date>
<description>ANTLRのやつがあったけど、JavaCCのがみつからなかったので、でっちあげた
via - http://harward.us/~nharward/antlr/memcached_protocol.g


できあがったのは、こんな...</description>
<content:encoded>
<![CDATA[
<p>ANTLRのやつがあったけど、JavaCCのがみつからなかったので、でっちあげた<br />
via - <a href="http://harward.us/~nharward/antlr/memcached_protocol.g">http://harward.us/~nharward/antlr/memcached_protocol.g</a>
</p>

<p>できあがったのは、こんな感じ</p>

<pre class="javascript">SKIP: {
    <span class="string" >" "</span> | <span class="string" >"\t"</span> | <span class="string" >"\r"</span> | <span class="string" >"\n"</span>
}
TOKEN: {
    &lt; NUMBER: [<span class="string" >"1"</span>-<span class="string" >"9"</span>] ([<span class="string" >"0"</span>-<span class="string" >"9"</span>])* | <span class="string" >"0"</span> &gt;
  | &lt; FLAGS: &lt; NUMBER &gt; &gt;
  | &lt; TIME: &lt; NUMBER &gt;  &gt;
  | &lt; LENGTH: &lt; NUMBER &gt; &gt;
  | &lt; CREMENT_VALUE: &lt; NUMBER &gt; &gt;
  | &lt; CAS_UNIQUE: &lt; NUMBER &gt; &gt;
}
TOKEN: {
  &lt; SET_STATEMENT: <span class="string" >"set"</span> &gt;
  | &lt; ADD_STATEMENT: <span class="string" >"add"</span> &gt;
  | &lt; REPLACE_STATEMENT: <span class="string" >"replace"</span> &gt;
  | &lt; APPEND_STATEMENT: <span class="string" >"append"</span> &gt;
  | &lt; PREPEND_STATEMENT: <span class="string" >"prepend"</span> &gt;
  | &lt; CAS_STATEMENT: <span class="string" >"cas"</span> &gt;
  | &lt; STORAGE_STATEMENT:
        &lt; SET_STATEMENT &gt;
        | &lt; ADD_STATEMENT &gt;
        | &lt; REPLACE_STATEMENT &gt;
        | &lt; APPEND_STATEMENT &gt;
        | &lt; PREPEND_STATEMENT &gt;
    &gt;
  | &lt; STORAGE_COMMAND:
        (
          &lt; STORAGE_STATEMENT &gt; &lt; KEY &gt; &lt; FLAGS &gt; &lt; TIME &gt; &lt; LENGTH &gt;
          | &lt; CAS_STATEMENT &gt; &lt; KEY &gt; &lt; FLAGS &gt; &lt; TIME &gt; &lt; LENGTH &gt; &lt; CAS_UNIQUE &gt;
        )
        (&lt; NOREPLY &gt;)?
    &gt;
}
TOKEN: {
  &lt; RETRIEVAL_STATEMENT: <span class="string" >"get"</span> | <span class="string" >"gets"</span> &gt;
  | &lt; RETRIEVAL_COMMAND:
        &lt; RETRIEVAL_STATEMENT &gt; &lt; KEY &gt;
    &gt;
}
TOKEN: {
  &lt; DELETE_STATEMENT: <span class="string" >"delete"</span> &gt;
  | &lt; DELETE_COMMAND:
        &lt; DELETE_STATEMENT &gt; &lt; KEY &gt; (&lt; TIME &gt;)? (&lt; NOREPLY &gt;)?
    &gt;
}
TOKEN: {
  &lt; INCREMENT_STATEMENT: <span class="string" >"incr"</span> &gt;
  | &lt; INCREMENT_COMMAND:
        &lt; INCREMENT_STATEMENT &gt; &lt; KEY &gt; &lt; CREMENT_VALUE &gt; (&lt; NOREPLY &gt;)?
    &gt;
}
TOKEN: {
  &lt; DECREMENT_STATEMENT: <span class="string" >"decr"</span> &gt;
  | &lt; DECREMENT_COMMAND:
        &lt; DECREMENT_STATEMENT &gt; &lt; KEY &gt; &lt; CREMENT_VALUE &gt; (&lt; NOREPLY &gt;)?
    &gt;
}
TOKEN: {
  &lt; STATISTICS_STATEMENT: <span class="string" >"STAT"</span> &gt;
  | &lt; STATISTICS_OPTION: <span class="string" >"items"</span> | <span class="string" >"slabs"</span> | <span class="string" >"sizes"</span> &gt;
  | &lt; STATISTICS_COMMAND:
        &lt; STATISTICS_STATEMENT &gt; (&lt; STATISTICS_OPTION &gt;)?
    &gt;
}
TOKEN: {
  &lt; FLUSH_STATEMENT: <span class="string" >"flush_all"</span> &gt;
  | &lt; FLUSH_COMMAND:
        &lt; FLUSH_STATEMENT &gt; (&lt; TIME &gt;)? (&lt; NOREPLY &gt;)?
    &gt;
}
TOKEN: {
  &lt; VERSION_STATEMENT: <span class="string" >"version"</span> &gt;
  | &lt; VERSION_COMMAND:
        &lt; VERSION_STATEMENT &gt;
    &gt;
}
TOKEN: {
    &lt; NOREPLY: <span class="string" >"noreply"</span> &gt;
}
<span class="comment" >// last match</span>
TOKEN: {
    &lt; KEY: (~[<span class="string" >" "</span>, <span class="string" >"\r"</span>,<span class="string" >"\n"</span>])+ &gt;
}</pre>

<p>少し、数値まわりのToken(TIME, LENGTHとか)が適当すぎるかな。</p>

<p>んで、これにテキトーなNodeをparseしてあげてみる</p>

<pre class="java">Command Command():
{
  Command command;
}
{
  (
    command = RetrievalCommand()
  | command = StorageCommand()
  | command = DeleteCommand()
  | command = VersionCommand()
  )
  {
    <span class="keyword" >return</span> command;
  }
}
StorageCommand StorageCommand():
{
  StorageCommand command;
  String key;
  Long flags = 0L;
  Long time = 0L;
  Long length = 0L;
  Boolean noreply = Boolean.FALSE;
}
{
  command = createStorageCommand()
  key = Key()
  flags = Flags()
  time = Time()
  length = Length()
  noreply = Noreply()
  {
    command.setNode(jjtThis);
    command.setKey(key);
    command.setFlags(flags);
    command.setExpTime(time);
    command.setLength(length);
    command.setNoreply(noreply);
    <span class="keyword" >return</span> command;
  }
}
StorageCommand createStorageCommand():
{}
{
  (
    &lt; SET_STATEMENT &gt;
    {
      <span class="keyword" >return</span> <span class="keyword" >new</span> SetCommand();
    }
    | &lt; ADD_STATEMENT &gt;
    {
      <span class="keyword" >return</span> <span class="keyword" >new</span> AddCommand();
    }
    | &lt; REPLACE_STATEMENT &gt;
    {
      <span class="keyword" >return</span> <span class="keyword" >new</span> ReplaceCommand();
    }
    | &lt; APPEND_STATEMENT &gt;
    {
      <span class="keyword" >return</span> <span class="keyword" >new</span> AppendCommand();
    }
    | &lt; PREPEND_STATEMENT &gt;
    {
      <span class="keyword" >return</span> <span class="keyword" >new</span> PrependCommand();
    }
  )
}

RetrievalCommand RetrievalCommand():
{
  RetrievalCommand command = <span class="keyword" >new</span> RetrievalCommand();
  String key;
}
{
  &lt; RETRIEVAL_STATEMENT &gt;
  (
    key = Key()
    {
      command.addKey(key);
    }
  )+
  {
    command.setNode(jjtThis);
    <span class="keyword" >return</span> command;
  }
}
DeleteCommand DeleteCommand():
{
  DeleteCommand command = <span class="keyword" >new</span> DeleteCommand();
  String key;
  Long time = 0L;
  Boolean noreply = Boolean.FALSE;
}
{
  &lt; DELETE_STATEMENT &gt;
  key = Key()
  time = Time()
  noreply = Noreply()
  {
    command.setNode(jjtThis);
    command.setKey(key);
    command.setExpTime(time);
    command.setNoreply(noreply);
    <span class="keyword" >return</span> command;
  }
}
VersionCommand VersionCommand():
{}
{
  &lt; VERSION_STATEMENT &gt;
  {
    VersionCommand command = <span class="keyword" >new</span> VersionCommand();
    command.setNode(jjtThis);
    <span class="keyword" >return</span> command;
  }
}

String Key():
{ Token key; }
{
  key = &lt; KEY &gt;
  {
    <span class="keyword" >return</span> key.image;
  }
}

Long Flags():
{ Token flags; }
{
  flags = &lt; NUMBER &gt;
  {
    <span class="keyword" >return</span> Long.valueOf(flags.image);
  }
}

Long Time():
{ Token time; }
{
  time = &lt; NUMBER &gt;
  {
    <span class="keyword" >return</span> Long.valueOf(time.image);
  }
}

Long Length():
{ Token length; }
{
  length = &lt; NUMBER &gt;
  {
    <span class="keyword" >return</span> Long.valueOf(length.image);
  }
}

Boolean Noreply():
{ Boolean noreply = Boolean.FALSE; }
{
  [&lt; NOREPLY &gt;{noreply = Boolean.TRUE;}]
  {
    <span class="keyword" >return</span> noreply;
  }
}</pre>

<p>これに、適当なコードを投げてあげると</p>

<pre class="java">{
    StringReader reader = <span class="keyword" >new</span> StringReader(<span class="string" >"get hoge\r\n"</span>);
    MemcacheParser parser = <span class="keyword" >new</span> MemcacheParser(reader);
    <span class="keyword" >try</span> {
        parser.Command();
    } <span class="keyword" >catch</span> (ParseException e) {
        e.printStackTrace();
    }
}
{
    StringReader reader = <span class="keyword" >new</span> StringReader(<span class="string" >"gets hoge foo\r\n"</span>);
    MemcacheParser parser = <span class="keyword" >new</span> MemcacheParser(reader);
    <span class="keyword" >try</span> {
        parser.Command();
    } <span class="keyword" >catch</span> (ParseException e) {
        e.printStackTrace();
    }
}
{
    StringReader reader = <span class="keyword" >new</span> StringReader(<span class="string" >"set xyzkey 0 0 6\r\n"</span>);
    MemcacheParser parser = <span class="keyword" >new</span> MemcacheParser(reader);
    <span class="keyword" >try</span> {
        parser.Command();
    } <span class="keyword" >catch</span> (ParseException e) {
        e.printStackTrace();
    }
}</pre>

<pre class="java">Call:   Command
  Call:   RetrievalCommand
    Consumed token: &lt;&lt;RETRIEVAL_STATEMENT&gt;: <span class="string" >"get"</span> at line <span class="number" >1</span> column <span class="number" >1</span>&gt;
    Call:   Key
      Consumed token: &lt;&lt;KEY&gt;: <span class="string" >"hoge"</span> at line <span class="number" >1</span> column <span class="number" >5</span>&gt;
    Return: Key
  Return: RetrievalCommand
Return: Command
Call:   Command
  Call:   RetrievalCommand
    Consumed token: &lt;&lt;RETRIEVAL_STATEMENT&gt;: <span class="string" >"gets"</span> at line <span class="number" >1</span> column <span class="number" >1</span>&gt;
    Call:   Key
      Consumed token: &lt;&lt;KEY&gt;: <span class="string" >"hoge"</span> at line <span class="number" >1</span> column <span class="number" >6</span>&gt;
    Return: Key
    Call:   Key
      Consumed token: &lt;&lt;KEY&gt;: <span class="string" >"foo"</span> at line <span class="number" >1</span> column <span class="number" >11</span>&gt;
    Return: Key
  Return: RetrievalCommand
Return: Command
Call:   Command
  Call:   StorageCommand
    Call:   createStorageCommand
      Consumed token: &lt;<span class="string" >"set"</span> at line <span class="number" >1</span> column <span class="number" >1</span>&gt;
    Return: createStorageCommand
    Call:   Key
      Consumed token: &lt;&lt;KEY&gt;: <span class="string" >"xyzkey"</span> at line <span class="number" >1</span> column <span class="number" >5</span>&gt;
    Return: Key
    Call:   Flags
      Consumed token: &lt;&lt;NUMBER&gt;: <span class="string" >"0"</span> at line <span class="number" >1</span> column <span class="number" >12</span>&gt;
    Return: Flags
    Call:   Time
      Consumed token: &lt;&lt;NUMBER&gt;: <span class="string" >"0"</span> at line <span class="number" >1</span> column <span class="number" >14</span>&gt;
    Return: Time
    Call:   Length
      Consumed token: &lt;&lt;NUMBER&gt;: <span class="string" >"6"</span> at line <span class="number" >1</span> column <span class="number" >16</span>&gt;
    Return: Length
    Call:   Noreply
    Return: Noreply
  Return: StorageCommand
Return: Command</pre>

<p>と、こんな感じになる。<br />
&lt;NUMBER&gt;とか、ホント、マジメにtokenが書けてないですね。。</p>

<p>ここまでできたので、set と get しかない、memcached 互換ものをでっち上げてみた</p>

<pre class="java">
<span class="keyword" >public</span> <span class="keyword" >class</span> Server <span class="keyword" >extends</span> Thread {
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> BlockingQueue&lt;Socket&gt; accept = <span class="keyword" >new</span> LinkedBlockingQueue&lt;Socket&gt;();
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> Cache&lt;String, String&gt; cache = <span class="keyword" >new</span> LRUCache&lt;String, String&gt;();

    <span class="keyword" >protected</span> <span class="keyword" >final</span> ExecutorService acceptPool;
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> <span class="keyword" >int</span> port;
    
    <span class="keyword" >protected</span> <span class="keyword" >final</span> <span class="keyword" >int</span> maxConnection;
    
    <span class="keyword" >public</span> Server(<span class="keyword" >final</span> <span class="keyword" >int</span> port, <span class="keyword" >final</span> <span class="keyword" >int</span> maxConnection){
        <span class="keyword" >this</span>.port = port;
        <span class="keyword" >this</span>.maxConnection = maxConnection;
        <span class="keyword" >this</span>.acceptPool = Executors.newFixedThreadPool(maxConnection);
    }
    
    <span class="keyword" >public</span> <span class="keyword" >static</span> <span class="keyword" >void</span> main(String...args){
        Server s = <span class="keyword" >new</span> Server(<span class="number" >12221</span>, <span class="number" >32</span>);
        s.start();
        
        <span class="keyword" >while</span>(s.isAlive()){
            <span class="keyword" >try</span> {
                TimeUnit.MICROSECONDS.sleep(<span class="number" >10</span>);
            } <span class="keyword" >catch</span>(InterruptedException e){}
        }
    }

    <span class="keyword" >public</span> <span class="keyword" >void</span> run(){
        <span class="keyword" >try</span> {
            ServerSocketFactory factory = ServerSocketFactory.getDefault();
            ServerSocket socket = factory.createServerSocket(port, maxConnection);
            socket.setReuseAddress(<span class="keyword" >true</span>);
            
            <span class="keyword" >while</span>(!socket.isClosed()){
                <span class="keyword" >final</span> Socket accept = socket.accept();
                acceptPool.execute(<span class="keyword" >new</span> AcceptHandler(accept));
            }
        } <span class="keyword" >catch</span> (UnknownHostException e) {
            e.printStackTrace();
        } <span class="keyword" >catch</span> (IOException e) {
            e.printStackTrace();
        }
    }
    
    <span class="keyword" >protected</span> <span class="keyword" >class</span> AcceptHandler <span class="keyword" >implements</span> Runnable {
        <span class="keyword" >private</span> <span class="keyword" >final</span> Socket socket;
        <span class="keyword" >public</span> AcceptHandler(<span class="keyword" >final</span> Socket socket){
            <span class="keyword" >this</span>.socket = socket;
        }
        <span class="keyword" >public</span> <span class="keyword" >void</span> run(){
            <span class="keyword" >try</span> {
                <span class="keyword" >final</span> InputStream in = socket.getInputStream();
                <span class="keyword" >final</span> OutputStream out = socket.getOutputStream();
                
                <span class="keyword" >final</span> BufferedReader reader = <span class="keyword" >new</span> BufferedReader(<span class="keyword" >new</span> InputStreamReader(in));
                <span class="keyword" >final</span> DataOutputStream writer = <span class="keyword" >new</span> DataOutputStream(out);
                <span class="keyword" >final</span> CommandWorker worker = <span class="keyword" >new</span> CommandWorker(reader);
                <span class="keyword" >while</span>(!socket.isClosed()){
                    <span class="keyword" >if</span>(!worker.prepare()){
                        <span class="keyword" >break</span>;
                    }
                    Return r = worker.call();
                    writer.writeBytes(r.renderMessage());
                }
            } <span class="keyword" >catch</span>(IOException e){
                e.printStackTrace();
            } <span class="keyword" >finally</span> {
                <span class="keyword" >try</span> {
                    socket.close();
                } <span class="keyword" >catch</span>(IOException e){
                    <span class="comment" >// nop</span>
                }
            }
        }
    }
    <span class="keyword" >private</span> <span class="keyword" >class</span> CommandWorker <span class="keyword" >implements</span> CommandVisitor {
        <span class="keyword" >private</span> <span class="keyword" >final</span> BufferedReader reader;
        <span class="keyword" >private</span> String currentLine;
        <span class="keyword" >public</span> CommandWorker(<span class="keyword" >final</span> BufferedReader reader) {
            <span class="keyword" >this</span>.reader = reader;
        }
        
        <span class="keyword" >public</span> <span class="keyword" >boolean</span> prepare(){
            <span class="keyword" >try</span> {
                String line = reader.readLine();
                <span class="keyword" >if</span>(<span class="keyword" >null</span> == line){
                    <span class="keyword" >return</span> <span class="keyword" >false</span>;
                }
                currentLine = line;
                <span class="keyword" >return</span> <span class="keyword" >true</span>;
            } <span class="keyword" >catch</span>(IOException e){
                <span class="keyword" >return</span> <span class="keyword" >false</span>;
            }
        }
        
        <span class="keyword" >public</span> Return call() {
            <span class="keyword" >try</span> {
                <span class="keyword" >final</span> StringReader r = <span class="keyword" >new</span> StringReader(currentLine);
                <span class="keyword" >final</span> MemcacheParser parser = <span class="keyword" >new</span> MemcacheParser(r);
                
                Command command = parser.Command();
                <span class="keyword" >return</span> command.accept(<span class="keyword" >this</span>, <span class="keyword" >null</span>);
            } <span class="keyword" >catch</span>(ParseException e){
                <span class="keyword" >return</span> <span class="keyword" >new</span> Return(ResponseType.ERROR);
            }
        }
        
        <span class="keyword" >public</span> Return visit(Command command, Parameter parameter) {
            <span class="keyword" >return</span> <span class="keyword" >null</span>;
        }
    
        <span class="keyword" >public</span> Return visit(AddCommand command, Parameter parameter) {
            <span class="keyword" >return</span> <span class="keyword" >null</span>;
        }
    
        <span class="keyword" >public</span> Return visit(AppendCommand command, Parameter parameter) {
            <span class="keyword" >return</span> <span class="keyword" >null</span>;
        }
    
        <span class="keyword" >public</span> Return visit(CasCommand command, Parameter parameter) {
            <span class="keyword" >return</span> <span class="keyword" >null</span>;
        }
    
        <span class="keyword" >public</span> Return visit(DeleteCommand command, Parameter parameter) {
            <span class="keyword" >return</span> <span class="keyword" >null</span>;
        }
    
        <span class="keyword" >public</span> Return visit(PrependCommand command, Parameter parameter) {
            <span class="keyword" >return</span> <span class="keyword" >null</span>;
        }
    
        <span class="keyword" >public</span> Return visit(ReplaceCommand command, Parameter parameter) {
            <span class="keyword" >return</span> <span class="keyword" >null</span>;
        }
    
        <span class="keyword" >public</span> Return visit(RetrievalCommand command, Parameter parameter) {
            String value = cache.get(command.getKeys().get(<span class="number" >0</span>));
            <span class="keyword" >if</span>(<span class="keyword" >null</span> == value){
                <span class="keyword" >return</span> <span class="keyword" >new</span> Return(ResponseType.END);
            }
            <span class="keyword" >return</span> <span class="keyword" >new</span> Return(ResponseType.SEND_VALUE,
                command.getKeys().get(<span class="number" >0</span>), <span class="number" >0</span>, value.length(),
                value
            );
        }
    
        <span class="keyword" >public</span> Return visit(SetCommand command, Parameter parameter) {
            <span class="keyword" >try</span> {
                <span class="keyword" >final</span> String nextLine = reader.readLine();
                cache.put(command.getKey(), nextLine, command.getExpTime().longValue());
                <span class="keyword" >return</span> <span class="keyword" >new</span> Return(ResponseType.STORED);
            } <span class="keyword" >catch</span>(IOException e){
                e.printStackTrace();
                <span class="keyword" >return</span> <span class="keyword" >new</span> Return(ResponseType.ERROR);
            }
        }
    
        <span class="keyword" >public</span> Return visit(VersionCommand command, Parameter parameter) {
            <span class="keyword" >return</span> <span class="keyword" >null</span>;
        }
    }
}</pre>

<p>見事に、setとgetしか実装してません。しかもハンドリングは少し適当。</p>

<p>ということで、これと(java-lang)、memcached(c-lang)で比較してみた。</p>

<pre class="java">$target = array(
    array(<span class="string" >'host'</span> =&gt; <span class="string" >'localhost'</span>, <span class="string" >'port'</span> =&gt; <span class="number" >11211</span>),
    array(<span class="string" >'host'</span> =&gt; <span class="string" >'localhost'</span>, <span class="string" >'port'</span> =&gt; <span class="number" >12221</span>)
);
foreach($target as $t){
    $memcache = <span class="keyword" >new</span> Memcache;
    $memcache-&gt;connect($t[<span class="string" >'host'</span>], $t[<span class="string" >'port'</span>]);

    $elapsed = microtime(<span class="keyword" >true</span>);
    <span class="keyword" >for</span>($i = <span class="number" >0</span>; $i &lt; <span class="number" >1000</span>; ++$i){
        $memcache-&gt;set(<span class="string" >'hoge'</span>, <span class="string" >'123'</span>);
        $memcache-&gt;set(<span class="string" >'hoge'</span>, <span class="string" >'124'</span>);
        $memcache-&gt;get(<span class="string" >'hoge'</span>);

    }
    echo <span class="string" >'target host =&gt; '</span>, $t[<span class="string" >'host'</span>], <span class="string" >' port =&gt;'</span>, $t[<span class="string" >'port'</span>], PHP_EOL;
    echo <span class="string" >'elapsed: '</span>, (microtime(<span class="keyword" >true</span>) - $elapsed), PHP_EOL;
}</pre>

<pre class="java">target host =&gt; localhost port =&gt;<span class="number" >11211</span>
elapsed: <span class="number" >0.420136213303</span>
target host =&gt; localhost port =&gt;<span class="number" >12221</span>
elapsed: <span class="number" >1.34848499298</span>
</pre>

<p>なんつーか、「もうちょっとがんばりま賞」って感じで残念感があります。(約3倍遅い)<br />
とりあえず、動きそうなので、他の実装も頑張る。</p>
]]>
</content:encoded>
</item>
<item rdf:about="http://blog.xole.net/article.php?id=737">
<title>PHP で ConsistentHash</title>
<link>http://blog.xole.net/article.php?id=737</link>
<dc:date>2010-01-07T23:46:51+09:00</dc:date>
<description>ConsistentHashの動きをPHPでもやってみた
via - http://www.hyuki.com/yukiwiki/wiki.cgi?ConsistentHashing


こんな感じで実装

interface H...</description>
<content:encoded>
<![CDATA[
<p>ConsistentHashの動きをPHPでもやってみた<br />
via - <a href="http://www.hyuki.com/yukiwiki/wiki.cgi?ConsistentHashing">http://www.hyuki.com/yukiwiki/wiki.cgi?ConsistentHashing</a>
</p>

<p>こんな感じで実装</p>
<pre class="php">
<span class="keyword" >interface</span> HashFunction {
    <span class="keyword" >public</span> <span class="keyword" >function</span> hash(<span class="vars" >$key</span>);
}
<span class="keyword" >interface</span> Circle {
    <span class="keyword" >public</span> <span class="keyword" >function</span> put(<span class="vars" >$key</span>, <span class="vars" >$value</span>);
}
<span class="keyword" >interface</span> Node {
    <span class="keyword" >public</span> <span class="keyword" >function</span> put(<span class="vars" >$key</span>, <span class="vars" >$value</span>);
    <span class="keyword" >public</span> <span class="keyword" >function</span> get(<span class="vars" >$key</span>);
    <span class="keyword" >public</span> <span class="keyword" >function</span> has(<span class="vars" >$key</span>);
    <span class="keyword" >public</span> <span class="keyword" >function</span> keys();
    <span class="keyword" >public</span> <span class="keyword" >function</span> getName();
}

<span class="keyword" >class</span> TreeMap <span class="keyword" >implements</span> Circle {
    <span class="keyword" >private</span> <span class="vars" >$values</span>;
    <span class="keyword" >public</span> <span class="keyword" >function</span> __construct(<span class="keyword" >array</span> <span class="vars" >$values</span> = <span class="keyword" >array</span>()){
        <span class="vars" >$this</span>-&gt;values = <span class="keyword" >new</span> ArrayObject(<span class="vars" >$values</span>);
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> put(<span class="vars" >$key</span>, <span class="vars" >$value</span>){
        <span class="vars" >$this</span>-&gt;values-&gt;offsetSet(<span class="vars" >$key</span>, <span class="vars" >$value</span>);
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> get(<span class="vars" >$key</span>){
        <span class="keyword" >return</span> <span class="vars" >$this</span>-&gt;values-&gt;offsetGet(<span class="vars" >$key</span>);
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> remove(<span class="vars" >$key</span>){
        <span class="vars" >$this</span>-&gt;values-&gt;offsetUnset(<span class="vars" >$key</span>);
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> has(<span class="vars" >$key</span>){
        <span class="keyword" >return</span> <span class="vars" >$this</span>-&gt;values-&gt;offsetExists(<span class="vars" >$key</span>);
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> isEmpty(){
        <span class="keyword" >return</span> <span class="vars" >$this</span>-&gt;values-&gt;<span class="func" >count</span>() &lt; 1;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> firstKey(){
        <span class="vars" >$map</span> = clone <span class="vars" >$this</span>-&gt;values;
        <span class="vars" >$map</span>-&gt;ksort();
        <span class="vars" >$arrayKeys</span> = <span class="func" >array_keys</span>(<span class="vars" >$map</span>-&gt;getArrayCopy());
        <span class="keyword" >return</span> <span class="vars" >$arrayKeys</span>[0];
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> tailMap(<span class="vars" >$key</span>){
        <span class="vars" >$map</span> = clone <span class="vars" >$this</span>-&gt;values;
        <span class="vars" >$map</span>-&gt;ksort();
        <span class="vars" >$array</span> = <span class="vars" >$map</span>-&gt;getArrayCopy();
        <span class="vars" >$arrayKeys</span> = <span class="func" >array_keys</span>(<span class="vars" >$array</span>);

        <span class="vars" >$results</span> = <span class="keyword" >array</span>();
        <span class="keyword" >foreach</span>(<span class="vars" >$arrayKeys</span> <span class="keyword" >as</span> <span class="vars" >$arrayKey</span>){
            <span class="keyword" >if</span>(<span class="vars" >$key</span> &lt;= <span class="vars" >$arrayKey</span>){
                <span class="vars" >$results</span>[<span class="vars" >$arrayKey</span>] = <span class="vars" >$array</span>[<span class="vars" >$arrayKey</span>];
            }
        }
        <span class="keyword" >return</span> <span class="keyword" >new</span> self(<span class="vars" >$results</span>);
    }
}

<span class="keyword" >class</span> ConsistentHash {
    <span class="keyword" >private</span> <span class="vars" >$hashFunction</span>;
    <span class="keyword" >private</span> <span class="vars" >$numberOfReplicas</span>;
    <span class="keyword" >private</span> <span class="vars" >$circle</span>;
    <span class="keyword" >private</span> <span class="vars" >$nodes</span> = <span class="keyword" >array</span>();
    <span class="keyword" >public</span> <span class="keyword" >function</span> __construct(HashFunction <span class="vars" >$hashFunction</span>, <span class="vars" >$numberOfReplicas</span>){
        <span class="vars" >$this</span>-&gt;hashFunction = <span class="vars" >$hashFunction</span>;
        <span class="vars" >$this</span>-&gt;numberOfReplicas = <span class="vars" >$numberOfReplicas</span>;
        <span class="vars" >$this</span>-&gt;circle = <span class="keyword" >new</span> TreeMap;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> getNodes(){
        <span class="keyword" >return</span> <span class="vars" >$this</span>-&gt;nodes;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> add(Node <span class="vars" >$node</span>){
        <span class="keyword" >for</span>(<span class="vars" >$i</span> = 0; <span class="vars" >$i</span> &lt; <span class="vars" >$this</span>-&gt;numberOfReplicas; ++<span class="vars" >$i</span>){
            <span class="vars" >$nodeKey</span> = <span class="vars" >$this</span>-&gt;hashFunction-&gt;hash(<span class="vars" >$node</span>-&gt;getName() . <span class="vars" >$i</span>);
            <span class="vars" >$this</span>-&gt;circle-&gt;put(<span class="vars" >$nodeKey</span>, <span class="vars" >$node</span>);
        }
        <span class="vars" >$this</span>-&gt;nodes[] = <span class="vars" >$node</span>;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> get(<span class="vars" >$key</span>){
        <span class="keyword" >if</span>(<span class="vars" >$this</span>-&gt;circle-&gt;isEmpty()){
            <span class="keyword" >return</span> null;
        }
        <span class="vars" >$hash</span> = <span class="vars" >$this</span>-&gt;hashFunction-&gt;hash(<span class="vars" >$key</span>);
        <span class="keyword" >if</span>(!<span class="vars" >$this</span>-&gt;circle-&gt;has(<span class="vars" >$hash</span>)){
            <span class="vars" >$tailMap</span> = <span class="vars" >$this</span>-&gt;circle-&gt;tailMap(<span class="vars" >$hash</span>);
            <span class="keyword" >if</span>(<span class="vars" >$tailMap</span>-&gt;isEmpty()){
                <span class="vars" >$hash</span> = <span class="vars" >$this</span>-&gt;circle-&gt;firstKey();
            } <span class="keyword" >else</span> {
                <span class="vars" >$hash</span> = <span class="vars" >$tailMap</span>-&gt;firstKey();
            }
        }
        <span class="keyword" >return</span> <span class="vars" >$this</span>-&gt;circle-&gt;get(<span class="vars" >$hash</span>);
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> remove(Node <span class="vars" >$node</span>){
        <span class="keyword" >for</span>(<span class="vars" >$i</span> = 0; <span class="vars" >$i</span> &lt; <span class="vars" >$this</span>-&gt;numberOfReplicas; ++<span class="vars" >$i</span>){
            <span class="vars" >$nodeKey</span> = <span class="vars" >$this</span>-&gt;hashFunction-&gt;hash(<span class="vars" >$node</span>-&gt;getName() . <span class="vars" >$i</span>);
            <span class="vars" >$this</span>-&gt;circle-&gt;remove(<span class="vars" >$nodeKey</span>);
        }
    }
}

<span class="keyword" >class</span> ConsistentHashNode <span class="keyword" >implements</span> Node {
    <span class="keyword" >private</span> <span class="vars" >$hash</span>;
    <span class="keyword" >private</span> <span class="vars" >$keys</span> = <span class="keyword" >array</span>();
    <span class="keyword" >public</span> <span class="keyword" >function</span> __construct(ConsistentHash <span class="vars" >$hash</span>){
        <span class="vars" >$this</span>-&gt;hash = <span class="vars" >$hash</span>;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> put(<span class="vars" >$key</span>, <span class="vars" >$value</span>){
        <span class="vars" >$this</span>-&gt;hash-&gt;get(<span class="vars" >$key</span>)-&gt;put(<span class="vars" >$key</span>, <span class="vars" >$value</span>);
        <span class="vars" >$this</span>-&gt;keys[] = <span class="vars" >$key</span>;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> get(<span class="vars" >$key</span>){
        <span class="keyword" >return</span> <span class="vars" >$this</span>-&gt;hash-&gt;get(<span class="vars" >$key</span>)-&gt;get(<span class="vars" >$key</span>);
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> has(<span class="vars" >$key</span>){
        <span class="keyword" >return</span> <span class="vars" >$this</span>-&gt;hash-&gt;get(<span class="vars" >$key</span>)-&gt;has(<span class="vars" >$key</span>);
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> keys(){
        <span class="keyword" >return</span> <span class="vars" >$this</span>-&gt;keys;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> getName(){
        <span class="keyword" >return</span> <span class="keyword" >__CLASS__</span>;
    }
}

<span class="keyword" >class</span> IdentNode <span class="keyword" >implements</span> Node {
    <span class="keyword" >private</span> <span class="vars" >$name</span>;
    <span class="keyword" >private</span> <span class="vars" >$values</span> = <span class="keyword" >array</span>();
    <span class="keyword" >public</span> <span class="keyword" >function</span> __construct(<span class="vars" >$name</span>){
        <span class="vars" >$this</span>-&gt;name = <span class="vars" >$name</span>;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> put(<span class="vars" >$key</span>, <span class="vars" >$value</span>){
        <span class="vars" >$this</span>-&gt;values[<span class="vars" >$key</span>] = <span class="vars" >$value</span>;
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> get(<span class="vars" >$key</span>){
        <span class="keyword" >return</span> <span class="vars" >$this</span>-&gt;values[<span class="vars" >$key</span>];
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> has(<span class="vars" >$key</span>){
        <span class="keyword" >return</span> isset(<span class="vars" >$this</span>-&gt;values);
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> keys(){
        <span class="keyword" >return</span> <span class="func" >array_keys</span>(<span class="vars" >$this</span>-&gt;values);
    }
    <span class="keyword" >public</span> <span class="keyword" >function</span> getName(){
        <span class="keyword" >return</span> <span class="vars" >$this</span>-&gt;name;
    }
}</pre>

<p>PHPでTreeMapみたいな実装(keyを並び替えながら挿入、指定key以降のmap取得)が思いつかなかったので、ksortとかつかって、なんちゃって実装</p>
<p>んで、ハッシュアルゴリズムとかは、こんな感じで実装して、必要に応じて切り替える</p>

<pre class="php">
<span class="keyword" >class</span> HashMD5Integer <span class="keyword" >implements</span> HashFunction {
    <span class="keyword" >public</span> <span class="keyword" >function</span> hash(<span class="vars" >$key</span>){
        <span class="vars" >$values</span> = unpack(<span class="string" >'H*'</span>, md5(<span class="vars" >$key</span>));
        <span class="keyword" >return</span> <span class="vars" >$values</span>[1];
    }
}
<span class="keyword" >class</span> HashSha1Integer <span class="keyword" >implements</span> HashFunction {
    <span class="keyword" >public</span> <span class="keyword" >function</span> hash(<span class="vars" >$key</span>){
        <span class="vars" >$values</span> = unpack(<span class="string" >'H*'</span>, sha1(<span class="vars" >$key</span>));
        <span class="keyword" >return</span> <span class="vars" >$values</span>[1];
    }
}
<span class="keyword" >class</span> HashCRC32Integer <span class="keyword" >implements</span> HashFunction {
    <span class="keyword" >public</span> <span class="keyword" >function</span> hash(<span class="vars" >$key</span>){
        <span class="vars" >$values</span> = unpack(<span class="string" >'H*'</span>, crc32(<span class="vars" >$key</span>));
        <span class="keyword" >return</span> <span class="vars" >$values</span>[1];
    }
}</pre>

<p>これについても、ハッシュ値を並び替えるため、数値にしたくて、ハッシュ文字列(md5)を数値(digit)にする方法が思いつかなかったので、unpack(H*)。もっといい方法があると思う。</p>

<p>こんな風に動かす</p>
<pre class="php">
<span class="vars" >$hash</span> = <span class="keyword" >new</span> ConsistentHash(<span class="keyword" >new</span> HashMD5Integer, 8);
<span class="vars" >$hash</span>-&gt;add(<span class="keyword" >new</span> IdentNode(<span class="string" >'hoge1'</span>));
<span class="vars" >$hash</span>-&gt;add(<span class="keyword" >new</span> IdentNode(<span class="string" >'hoge2'</span>));
<span class="vars" >$hash</span>-&gt;add(<span class="keyword" >new</span> IdentNode(<span class="string" >'hoge3'</span>));
<span class="vars" >$map</span> = <span class="keyword" >new</span> ConsistentHashNode(<span class="vars" >$hash</span>);
<span class="keyword" >for</span>(<span class="vars" >$i</span> = 0; <span class="vars" >$i</span> &lt; 10; ++<span class="vars" >$i</span>){
    <span class="vars" >$map</span>-&gt;put(<span class="string" >'key'</span> . <span class="vars" >$i</span>, <span class="string" >'value'</span> . <span class="vars" >$i</span>);
}
<span class="keyword" >foreach</span>(<span class="vars" >$hash</span>-&gt;getNodes() <span class="keyword" >as</span> <span class="vars" >$node</span>){
    <span class="func" >echo</span> <span class="vars" >$node</span>-&gt;getName(), <span class="string" >':'</span>, join(<span class="string" >','</span>, <span class="vars" >$node</span>-&gt;keys()), PHP_EOL;
}
</pre>

<p>実行結果としては、こんな感じで出た。まあそこそこバラけてる(?)</p>
<pre class="php">hoge1:key1,key4,key6
hoge2:key3,key5,key7
hoge3:key0,key2,key8,key9
</pre>

<p>試しに他のアルゴリズムでも実行して比較してみた</p>

<pre class="php">
<span class="vars" >$hashAlgos</span> = <span class="keyword" >array</span>(
    <span class="keyword" >new</span> HashMD5Integer,
    <span class="keyword" >new</span> HashSha1Integer,
    <span class="keyword" >new</span> HashCRC32Integer
);
<span class="keyword" >foreach</span>(<span class="vars" >$hashAlgos</span> <span class="keyword" >as</span> <span class="vars" >$algo</span>){
    <span class="func" >echo</span> get_class(<span class="vars" >$algo</span>), PHP_EOL;
    <span class="vars" >$hash</span> = <span class="keyword" >new</span> ConsistentHash(<span class="vars" >$algo</span>, 32);
    <span class="keyword" >for</span>(<span class="vars" >$i</span> = 0; <span class="vars" >$i</span> &lt; 10; <span class="vars" >$i</span>++){
       <span class="vars" >$hash</span>-&gt;add(<span class="keyword" >new</span> IdentNode(<span class="string" >'hoge'</span> . <span class="vars" >$i</span>));
    }

    <span class="vars" >$map</span> = <span class="keyword" >new</span> ConsistentHashNode(<span class="vars" >$hash</span>);
    <span class="keyword" >for</span>(<span class="vars" >$i</span> = 0; <span class="vars" >$i</span> &lt; 100; <span class="vars" >$i</span>++){
        <span class="vars" >$map</span>-&gt;put(<span class="string" >'*'</span> . <span class="vars" >$i</span>, <span class="vars" >$i</span>);
    }

    <span class="vars" >$allKeys</span> = <span class="vars" >$map</span>-&gt;keys();
    <span class="keyword" >foreach</span>(<span class="vars" >$hash</span>-&gt;getNodes() <span class="keyword" >as</span> <span class="vars" >$node</span>){
        <span class="vars" >$keys</span> = <span class="vars" >$node</span>-&gt;keys();
        <span class="func" >echo</span> <span class="string" >'node('</span>, (<span class="func" >count</span>(<span class="vars" >$keys</span>) / <span class="func" >count</span>(<span class="vars" >$allKeys</span>)) * 100, <span class="string" >'%):'</span>, <span class="vars" >$node</span>-&gt;getName(), <span class="string" >', keys:'</span>, join(<span class="string" >','</span>, <span class="vars" >$keys</span>), PHP_EOL;
    }
}</pre>

<pre class="php">HashMD5Integer
node(12%):hoge0, keys:*2,*9,*21,*27,*30,*47,*49,*52,*53,*70,*77,*89
node(12%):hoge1, keys:*0,*8,*10,*20,*23,*33,*40,*41,*69,*86,*90,*98
node(11%):hoge2, keys:*16,*22,*31,*38,*39,*63,*64,*72,*75,*79,*88
node(10%):hoge3, keys:*12,*48,*56,*58,*62,*65,*67,*73,*83,*84
node(4%):hoge4, keys:*6,*61,*91,*95
node(11%):hoge5, keys:*1,*4,*7,*13,*17,*28,*42,*44,*54,*55,*94
node(13%):hoge6, keys:*5,*11,*26,*35,*37,*43,*50,*51,*57,*74,*87,*92,*97
node(8%):hoge7, keys:*18,*32,*36,*45,*46,*60,*82,*85
node(8%):hoge8, keys:*15,*19,*25,*59,*68,*78,*81,*96
node(11%):hoge9, keys:*3,*14,*24,*29,*34,*66,*71,*76,*80,*93,*99
HashSha1Integer
node(11%):hoge0, keys:*29,*33,*36,*38,*46,*55,*67,*74,*81,*86,*92
node(8%):hoge1, keys:*1,*2,*18,*48,*69,*89,*93,*99
node(8%):hoge2, keys:*6,*57,*58,*62,*66,*68,*71,*78
node(12%):hoge3, keys:*8,*14,*32,*39,*42,*51,*56,*75,*79,*88,*95,*97
node(14%):hoge4, keys:*3,*9,*12,*13,*34,*40,*43,*45,*53,*59,*64,*65,*77,*94
node(10%):hoge5, keys:*7,*23,*24,*41,*49,*61,*72,*76,*80,*91
node(10%):hoge6, keys:*0,*10,*11,*15,*17,*25,*28,*35,*87,*98
node(8%):hoge7, keys:*4,*20,*21,*27,*31,*50,*63,*85
node(12%):hoge8, keys:*5,*16,*22,*26,*30,*37,*44,*47,*52,*70,*90,*96
node(7%):hoge9, keys:*19,*54,*60,*73,*82,*83,*84
HashCRC32Integer
node(13%):hoge0, keys:*11,*12,*13,*14,*16,*17,*40,*42,*44,*45,*47,*49,*98
node(9%):hoge1, keys:*58,*62,*63,*64,*65,*74,*75,*81,*96
node(3%):hoge2, keys:*29,*30,*36
node(14%):hoge3, keys:*4,*5,*9,*21,*25,*27,*32,*38,*39,*51,*78,*87,*92,*99
node(7%):hoge4, keys:*43,*50,*56,*59,*80,*86,*97
node(22%):hoge5, keys:*10,*15,*18,*19,*34,*41,*48,*52,*53,*54,*55,*57,*60,*61,*66,*67,*68,*69,*70,*71,*85,*93
node(5%):hoge6, keys:*8,*24,*28,*31,*79
node(5%):hoge7, keys:*0,*1,*35,*46,*82
node(16%):hoge8, keys:*2,*3,*6,*22,*23,*26,*37,*72,*73,*76,*77,*83,*88,*90,*94,*95
node(6%):hoge9, keys:*7,*20,*33,*84,*89,*91</pre>

<p>crc32とかだと、似たようなキー(52-55,66-71とか)は一部集中してる感じ。逆に md5, sha1 あたりは一様にバラけてるので、バラ撒くなら後者の2つを使う方が良さそう。<br />
というか、crc32は一部に偏りすぎな気もする（実装の問題?)</p>

<p>ノードを増やしてみたりした結果はこれ</p>
<pre class="php">
<span class="vars" >$hash</span> = <span class="keyword" >new</span> ConsistentHash(<span class="keyword" >new</span> HashMD5Integer, 32);
<span class="vars" >$hash</span>-&gt;add(<span class="keyword" >new</span> IdentNode(<span class="string" >'hoge1'</span>));
<span class="vars" >$hash</span>-&gt;add(<span class="keyword" >new</span> IdentNode(<span class="string" >'hoge2'</span>));
<span class="vars" >$hash</span>-&gt;add(<span class="keyword" >new</span> IdentNode(<span class="string" >'hoge3'</span>));

<span class="vars" >$map</span> = <span class="keyword" >new</span> ConsistentHashNode(<span class="vars" >$hash</span>);
<span class="keyword" >for</span>(<span class="vars" >$i</span> = 0; <span class="vars" >$i</span> &lt; 10; ++<span class="vars" >$i</span>){
    <span class="vars" >$map</span>-&gt;put(<span class="string" >'key'</span> . <span class="vars" >$i</span>, <span class="string" >'value'</span> . <span class="vars" >$i</span>);
}
<span class="vars" >$hash</span>-&gt;add(<span class="keyword" >new</span> IdentNode(<span class="string" >'hoge4'</span>));
<span class="keyword" >for</span>(<span class="vars" >$i</span> = 10; <span class="vars" >$i</span> &lt; 20; ++<span class="vars" >$i</span>){
    <span class="vars" >$map</span>-&gt;put(<span class="string" >'key'</span> . <span class="vars" >$i</span>, <span class="string" >'value'</span> . <span class="vars" >$i</span>);
}
<span class="vars" >$hash</span>-&gt;add(<span class="keyword" >new</span> IdentNode(<span class="string" >'hoge5'</span>));
<span class="keyword" >for</span>(<span class="vars" >$i</span> = 30; <span class="vars" >$i</span> &lt; 40; ++<span class="vars" >$i</span>){
    <span class="vars" >$map</span>-&gt;put(<span class="string" >'key'</span> . <span class="vars" >$i</span>, <span class="string" >'value'</span> . <span class="vars" >$i</span>);
}

<span class="vars" >$allKeys</span> = <span class="vars" >$map</span>-&gt;keys();
<span class="keyword" >foreach</span>(<span class="vars" >$hash</span>-&gt;getNodes() <span class="keyword" >as</span> <span class="vars" >$node</span>){
    <span class="vars" >$keys</span> = <span class="vars" >$node</span>-&gt;keys();
    <span class="func" >echo</span> <span class="string" >'node('</span>, (<span class="func" >count</span>(<span class="vars" >$keys</span>) / <span class="func" >count</span>(<span class="vars" >$allKeys</span>)) * 100, <span class="string" >'%):'</span>, <span class="vars" >$node</span>-&gt;getName(), <span class="string" >', keys:'</span>, join(<span class="string" >','</span>, <span class="vars" >$keys</span>), PHP_EOL;
}
</pre>

<pre class="php">node(30%):hoge1, keys:key0,key4,key6,key7,key16,key17,key30,key31,key34
node(16.6666666667%):hoge2, keys:key2,key3,key5,key9,key19
node(23.3333333333%):hoge3, keys:key1,key8,key10,key11,key12,key14,key15
node(20%):hoge4, keys:key13,key18,key32,key35,key37,key39
node(10%):hoge5, keys:key33,key36,key38
</pre>

<p>これはこういう動きが正しいのかわからない。。。(先に存在していたノードはキーが増えるよなぁ）</p>

<p>んで、結論。</p>
<p>ConsistentHashは面白い</p>
]]>
</content:encoded>
</item>

</rdf:RDF>