about: S2PHP5
about: Io Language
2010/03/08
JNA で mecab
JNIなmecabを使っていたけど、libmecab(JNI) の *.soとか、*.dylib のコンパイルを環境(LinuxとかMacとか)毎に用意するのが面倒になってきたので、JNAでアクセスできないかと挑戦中...。
mecab.h をよみつつ、こんなインタフェースを用意
package org.chasen.mecab; import com.sun.jna.Library; import com.sun.jna.PointerType; import com.sun.jna.Structure; public interface LibMecab extends Library { final int MECAB_NOR_NODE = 0; final int MECAB_UNK_NODE = 1; final int MECAB_BOS_NODE = 2; final int MECAB_EOS_NODE = 3; final int MECAB_EON_NODE = 4; final int MECAB_USR_DIC = 1; final int MECAB_SYS_DIC = 0; final int MECAB_UNK_DIC = 2; int mecab_do(int argc, String...argv); mecab_t mecab_new(int argc, String...argv); mecab_t mecab_new2(final String arg); String mecab_version(); String mecab_strerror(mecab_t mecab); void mecab_destroy(mecab_t mecab); int mecab_get_partial(mecab_t mecab); void mecab_set_partial(mecab_t mecab, int partial); float mecab_get_theta(mecab_t mecab); void mecab_set_theta(mecab_t mecab, float theta); int mecab_get_lattice_level(mecab_t mecab); void mecab_set_lattice_level(mecab_t mecab, int level); int mecab_get_all_morphs(mecab_t mecab); void mecab_set_all_morphs(mecab_t mecab, int all_morphs); String mecab_sparse_tostr(mecab_t mecab, final String str); String mecab_sparse_tostr2(mecab_t mecab, final String str, int len); String mecab_sparse_tostr3(mecab_t mecab, final String str, int len, String ostr, int olen); mecab_node_t mecab_sparse_tonode(mecab_t mecab, final String chr); mecab_node_t mecab_sparse_tonode2(mecab_t mecab, final String chr, int size); String mecab_nbest_sparse_tostr(mecab_t mecab, int N, final String str); String mecab_nbest_sparse_tostr2(mecab_t mecab, int N, final String str, int len); String mecab_nbest_sparse_tostr3(mecab_t mecab, int N, final String str, int len, String ostr, int olen); int mecab_nbest_init(mecab_t mecab, final String str); int mecab_nbest_init2(mecab_t mecab, final String str, int len); String mecab_nbest_next_tostr(mecab_t mecab); String mecab_nbest_next_tostr2(mecab_t mecab, String ostr, int olen); mecab_node_t mecab_nbest_next_tonode(mecab_t mecab); String mecab_format_node(mecab_t mecab, final mecab_node_t node); mecab_dictionary_info_t mecab_dictionary_info(mecab_t mecab); int mecab_dict_index(int argc, String...argv); int mecab_dict_gen(int argc, String...argv); int mecab_cost_train(int argc, String...argv); int mecab_system_eval(int argc, String...argv); int mecab_test_gen(int argc, String...argv); class mecab_t extends PointerType { public static class Type extends Structure { public int allocated; } public Type get(){ Type type = (Type) Structure.newInstance(Type.class); type.writeField("allocated", getPointer().getInt(0)); return type; } } class mecab_node_t extends PointerType { } class mecab_dictionary_info_t extends PointerType { } abstract class Tagger extends PointerType { abstract String parse(final String str); abstract Node parseToNode(final String str); abstract String parseNBest(int N, final String str); abstract boolean parseNBestInit(final String str); abstract Node nextNode(); abstract String next(); abstract String formatNode(final Node node); // configuration abstract boolean partial(); abstract void set_partial(boolean partial); abstract float theta(); abstract void set_theta(float theta); abstract int lattice_level(); abstract void set_lattice_level(int level); abstract boolean all_morphs(); abstract void set_all_morphs(boolean all_morphs); abstract DictionaryInfo dictionary_info(); abstract String what(); static Tagger create(int argc, String...argv){ return null; } static Tagger create(String...argv){ return null; } } class Node extends PointerType { } class DictionaryInfo extends PointerType { } }
で、いざ実行...。
package org.chasen.mecab; import org.chasen.mecab.LibMecab.mecab_t; import com.sun.jna.Native; public class Main { public static void main(String...args){ LibMecab mecab = (LibMecab) Native.loadLibrary("mecab", LibMecab.class); System.out.println(mecab.mecab_version()); mecab_t _ = mecab.mecab_new2(""); System.out.println(mecab.mecab_strerror(_)); // 文字化けだし、動かない... String result = mecab.mecab_sparse_tostr(_, "こんにちは!"); System.out.println(result); mecab.mecab_destroy(_); } }
こんな結果は出た。バージョンはちゃんと取れてた。けど、ちゃんと動かない...。
0.97
? M
?? t
??? d
?ち t
? M
?? t
?? d
EOS
うーん。もう少し。。これができれば結構楽になるハズ...。
継続していきます
ref - http://github.com/nowelium/JNA-Mecab
2010/03/06
PHP でも Protocol Buffers しよう!
どうも巷では、Google の protocol buffers のPHP版の実装で pb4php が使われてるみたいですが、実装が(個人的に)アレすぎるので、phpbuf を使いましょうという話
これの使い方は README に書いてあるので省略しますが、最近こいつを fork して RPC (protobuf-socket-rpc) の実装を追加したので、ここに書いておくよ。
ref - http://github.com/nowelium/phpbuf
ということで、実装方法を紹介
実装方法(サンプル)
こんな proto を用意します。
package com.github.nowelium.phpbuf; option optimize_for = SPEED; message Web { enum SiteType { BLOG = 0; NEWS = 1; VIDEO = 2; UNKNOWN = 3; } message Site { required string url = 1; required string title = 2; required SiteType type = 3; optional string summary = 4; } } message Request { message SearchRequest { required string query = 1; } } message Response { message SearchResult { repeated Web.Site sites = 1; } } service SearchService { rpc search(Request.SearchRequest) returns(Response.SearchResult); }
サーバサイド(java)
ちょー簡単です。
SocketRpcServer の実装は用意されているので、 Service の実装を register するだけ
package com.github.nowelium.phpbuf; import java.util.concurrent.Executors; import com.github.nowelium.phpbuf.ExampleSearch.SearchService; import com.googlecode.protobuf.socketrpc.SocketRpcServer; public class Server { public static void main(String...args){ SocketRpcServer server = new SocketRpcServer(12345, Executors.newCachedThreadPool()); server.registerBlockingService(SearchService.newReflectiveBlockingService(new SearchServiceImpl())); try { server.startServer(); synchronized(server){ server.wait(); } } catch(InterruptedException e){ // nop } finally { server.shutDown(); } } }
Service側の実装。これも簡単で、proto で定義したインタフェースどおりに実装するだけ
package com.github.nowelium.phpbuf; import java.util.logging.Logger; import com.github.nowelium.phpbuf.ExampleSearch.Web; import com.github.nowelium.phpbuf.ExampleSearch.Request.SearchRequest; import com.github.nowelium.phpbuf.ExampleSearch.Response.SearchResult; import com.github.nowelium.phpbuf.ExampleSearch.Web.SiteType; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; public class SearchServiceImpl implements ExampleSearch.SearchService.BlockingInterface { private final Logger logger = Logger.getLogger(SearchServiceImpl.class.getName()); public SearchResult search(RpcController controller, SearchRequest request) throws ServiceException { String query = request.getQuery(); logger.info("input query => " + query); return search(query); } protected SearchResult search(String query){ final SearchResult.Builder result = SearchResult.newBuilder(); { Web.Site.Builder builder = Web.Site.newBuilder(); builder.setTitle("example 1 - " + query); builder.setUrl("http://www.example.com/"); builder.setType(SiteType.NEWS); result.addSites(builder.build()); } { Web.Site.Builder builder = Web.Site.newBuilder(); builder.setTitle("example 2 - " + query); builder.setUrl("http://www.example.net/"); builder.setType(SiteType.UNKNOWN); result.addSites(builder.build()); } return result.build(); } }
クライアントサイド(php)
まずは、proto の message 定義を行います。
class Message_Web_Site extends PhpBuf_Message_Abstract { public function __construct(){ $this->setField('url', PhpBuf_Type::STRING, PhpBuf_Rule::REQUIRED, 1); $this->setField('title', PhpBuf_Type::STRING, PhpBuf_Rule::REQUIRED, 2); $this->setField('type', PhpBuf_Type::ENUM, PhpBuf_Rule::REQUIRED, 3, Message_SiteType::values()); $this->setField('summary', PhpBuf_Type::STRING, PhpBuf_Rule::OPTIONAL, 4); } public static function name(){ return __CLASS__; } } class Message_SiteType extends Message_Enum_Abstract { const BLOG = 0; const NEWS = 1; const VIDEO = 2; const UNKNOWN = 3; public static function values(){ return array( self::BLOG, self::NEWS, self::VIDEO, self::UNKNOWN ); } } class Message_Request_SearchRequest extends PhpBuf_Message_Abstract { public function __construct(){ $this->setField('query', PhpBuf_Type::STRING, PhpBuf_Rule::REQUIRED, 1); } public static function name(){ return __CLASS__; } } class Message_Response_SearchResult extends PhpBuf_Message_Abstract { public function __construct(){ $this->setField('sites', PhpBuf_Type::MESSAGE, PhpBuf_Rule::REPEATED, 1, Message_Web_Site::name()); } public static function name(){ return __CLASS__; } }
次に、Service 部分の実装。これも proto の定義で
class Service_SearchService extends PhpBuf_RPC_Socket_Service_Client { public function __construct($host, $port){ parent::__construct($host, $port); $this->setServiceFullQualifiedName('com.github.nowelium.phpbuf.SearchService'); $this->registerMethodResponderClass('search', Message_Response_SearchResult::name()); } }
main処理
$request = new Message_Request_SearchRequest; $request->query = 'hello world'; $service = new Service_SearchService('localhost', 12345); $result = $service->search($request); foreach($result->sites as $site){ echo 'title => ', $site->title, PHP_EOL; echo 'url => ', $site->url, PHP_EOL; echo 'type => ', $site->type, PHP_EOL; echo PHP_EOL; }
実装がすごく簡単ですね
実際に起動してみる
先ほどの java を起動しておきます。
shell > java -cp bin:protobuf-java-2.2.0.jar:protobuf-socket-rpc.jar com.github.nowelium.phpbuf.Server Picked up _JAVA_OPTIONS: -Dfile.encoding=UTF-8 2010/03/06 21:01:47 com.googlecode.protobuf.socketrpc.SocketRpcServer$ServerThread runServer 情報: Listening for requests on port: 12345
次に php で通信してみます
shell > php search_client.php title => example 1 - hello world url => http://www.example.com/ type => 1 title => example 2 - hello world url => http://www.example.net/ type => 3
通信できてるのがわかります。
おわり
ということで、PHPでも protobuf で RPC できるようになったよ!やったー!\(^o^)/
ここまでのサンプルは phpbuf-rpc-example 置いてます。省略した部分もあるので、詳細はソースでどうぞ
ちなみに、php-protobuf の元作者のページは ここ なんですが、ロシア語(?)で何を書いてるのかわからないです。
いつか、日本語にしてみたいですね。
# 後は、php code を gen できればいいかな。。(これもいつか..。)
2010/02/07
HiveJDBC を S2JDBC 経由で使えるようにする
hadoop の話題。その3
HiveのJDBCを使えばリモート上で動いているHiveに対して、JDBC(over thrift)経由でHive QLを実行出来るのですごく便利です。
ref - Hive/HiveClient - Hadoop Wiki
HiveJDBCはフツーのJDBCっぽく使えるので、こんな感じで普通のDBサーバにSQLを投げる感覚で使える
import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; public class HiveConnection { public static void main(String...args) throws SQLException { try { Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver"); } catch (ClassNotFoundException e) { e.printStackTrace(); System.exit(1); } Connection conn = DriverManager.getConnection("jdbc:hive://master1:10000/", "", ""); Statement stmt = conn.createStatement(); stmt.execute("SELECT distinct(id) FROM hoge"); ResultSet rs = stmt.getResultSet(); while(rs.next()){ System.out.println(rs.getString(1)); } } }
なので、便利なものは便利なものと合体させてしまえ。ということで、みんな大好き S2JDBCで使えるようにしてみた
HiveServerを起動/停止
とりあえず、HiveJDBCを利用するには、thrift経由でHiveServerを叩ける必要がある。
以下のようなスクリプトを用意
start.sh
#!/usr/bin/env bash pidfile=$HIVE_PID_DIR/hiveserver.pid logfile=$HIVE_LOG_DIR/hiveserver.log if [ -f $pidfile ]; then echo running as process `cat $pidfile`. stop it first exit 1 fi nohup $HIVE_HOME/bin/hive --service hiveserver > $logfile 2>&1 < /dev/null & echo $! > $pidfile
stop.sh
#!/usr/bin/env bash pidfile=$HIVE_PID_DIR/hiveserver.pid logfile=$HIVE_LOG_DIR/hiveserver.log if [ -f $pidfile ]; then kill `cat $pidfile` rm $pidfile else echo no pidfile $pidfile fi
これで、実行したサーバでデフォルトのポート10000で起動する。
ちなみに、HADOOP_CONF_DIRとかHIVE_CONF_DIRは別途ちゃんと設定すること
HiveDialectを用意
先に書いておくと、SQLのorder by ...に相当するものは、HiveQLだと sort by ...なんだけど、今はまだ用意してない。今度書く
package org.seasar.extension.jdbc.dialect; import org.seasar.extension.jdbc.SelectForUpdateType; public class HiveDialect extends StandardDialect { @Override public String getName() { return "hive"; } @Override public boolean supportsLimit() { return true; } @Override public String convertLimitSql(String sql, int offset, int limit) { StringBuilder buf = new StringBuilder(sql.length() + 20); buf.append(sql); buf.append(" limit "); buf.append(limit); return buf.toString(); } @Override public boolean supportsBatchUpdateResults() { return false; } @Override public boolean supportsCursor() { return false; } @Override public boolean supportsForUpdate(SelectForUpdateType type, boolean withTarget) { return false; } @Override public boolean supportsGetGeneratedKeys() { return false; } @Override public boolean supportsIdentity() { return false; } @Override public boolean supportsInnerJoinForUpdate() { return false; } @Override public boolean supportsLockHint() { return false; } @Override public boolean supportsOffset() { return false; } @Override public boolean supportsOffsetWithoutLimit() { return false; } @Override public boolean supportsOuterJoinForUpdate() { return false; } @Override public boolean supportsSequence() { return false; } }
HiveConnectionPoolを用意
これは、HiveがXA transactionとかconnection#closeなんかを実行することができない(未実装のコードなので...svn:..jdbc/HiveConnection)ので ConnectionPoolImpl を継承して ConnectionWrapper を取る部る部分を override
package org.seasar.extension.dbcp.impl; import java.sql.Connection; import java.sql.SQLException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.sql.XAConnection; import javax.transaction.Transaction; import org.seasar.extension.dbcp.ConnectionWrapper; import org.seasar.framework.util.TransactionManagerUtil; public class HiveConnectionPool extends ConnectionPoolImpl { private Map<Transaction, ConnectionWrapper> txActivePool = new ConcurrentHashMap<Transaction, ConnectionWrapper>(); @Override public synchronized ConnectionWrapper checkOut() throws SQLException { Transaction tx = getTransaction(); ConnectionWrapper conn = txActivePool.get(tx); if(null == conn){ conn = createConnection(tx); } if(null != tx){ txActivePool.put(tx, conn); } return conn; } protected ConnectionWrapper createConnection(Transaction transaction) throws SQLException { XAConnection xaConnection = getXADataSource().getXAConnection(); Connection connection = xaConnection.getConnection(); return new HiveConnectionWrapper(xaConnection, connection, this, transaction); } protected Transaction getTransaction(){ return TransactionManagerUtil.getTransaction(getTransactionManager()); } }
HiveConnectionWrapper を用意する
これは後述するPreparedStatement対策として用意。PreparedStatementWrapper さえも wrapper を用意する必要があるので、ConnectionWrapperImpl を override
package org.seasar.extension.dbcp.impl; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import javax.sql.XAConnection; import javax.transaction.Transaction; import org.seasar.extension.dbcp.ConnectionPool; import org.seasar.framework.exception.SSQLException; public class HiveConnectionWrapper extends ConnectionWrapperImpl { public HiveConnectionWrapper(XAConnection xaConnection, Connection physicalConnection, ConnectionPool connectionPool, Transaction tx) throws SQLException { super(xaConnection, physicalConnection, connectionPool, tx); } protected SQLException wrapException(final SQLException e, final String sql) { return new SSQLException("ESSR0072", new Object[] { sql, e.getMessage(), new Integer(e.getErrorCode()), e.getSQLState() }, e .getSQLState(), e.getErrorCode(), e, sql); } protected void assertOpened() throws SQLException { if (isClosed()) { throw new SSQLException("ESSR0062", null); } } @Override public PreparedStatement prepareStatement(final String sql) throws SQLException { assertOpened(); try { return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql), sql); } catch (final SQLException ex) { release(); throw wrapException(ex, sql); } } @Override public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException { assertOpened(); try { return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, resultSetType, resultSetConcurrency), sql); } catch (final SQLException ex) { release(); throw wrapException(ex, sql); } } @Override public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { assertOpened(); try { return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql); } catch (final SQLException ex) { release(); throw wrapException(ex, sql); } } @Override public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException { assertOpened(); try { return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, autoGeneratedKeys), sql); } catch (final SQLException ex) { release(); throw wrapException(ex, sql); } } @Override public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException { assertOpened(); try { return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, columnIndexes), sql); } catch (final SQLException ex) { release(); throw wrapException(ex, sql); } } @Override public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException { assertOpened(); try { return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, columnNames), sql); } catch (final SQLException ex) { release(); throw wrapException(ex, sql); } } }
HivePreparedStatementWrapperを用意する
これもConnectionと同じように、PreparedStatementの大半のコードが未実装(svn:jdbc/HivePreparedStatement)なので、PreparedStatementWrapper を継承して override
package org.seasar.extension.dbcp.impl; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import org.seasar.extension.jdbc.impl.PreparedStatementWrapper; import org.seasar.framework.exception.SSQLException; public class HivePreparedStatementWrapper extends PreparedStatementWrapper { protected final PreparedStatement original; protected final String sql; public HivePreparedStatementWrapper(PreparedStatement original, String sql) { super(original, sql); this.original = original; this.sql = sql; } protected SQLException wrapException(final SQLException e) { return wrapException(e, sql); } protected SQLException wrapException(final SQLException e, final String sql) { if (sql != null) { return new SSQLException("ESSR0072", new Object[] { sql, String.valueOf(e.getErrorCode()), e.getSQLState() }, e .getSQLState(), e.getErrorCode(), e, sql); } return e; } @Override public void close() { // HivePreparedStatement was not supported in #close } @Override public ResultSet executeQuery() throws SQLException { try { return new HiveResultSetWrapper(original.executeQuery()); } catch (final SQLException e) { throw wrapException(e); } } @Override public ResultSet executeQuery(final String sql) throws SQLException { try { return new HiveResultSetWrapper(original.executeQuery(sql)); } catch (final SQLException e) { throw wrapException(e, sql); } } @Override public ResultSet getResultSet() throws SQLException { try { return new HiveResultSetWrapper(original.getResultSet()); } catch (final SQLException e) { throw wrapException(e); } } @Override public ResultSet getGeneratedKeys() throws SQLException { try { return new HiveResultSetWrapper(original.getGeneratedKeys()); } catch (final SQLException e) { throw wrapException(e); } } }
HiveResultSetWrapper を用意
まだまだ続きます...HiveResultSet にも未実装が(svn:jdbc/HiveResultSet)なので、ResultSetWrapper を override
package org.seasar.extension.dbcp.impl; import java.sql.Date; import java.sql.ResultSet; import java.sql.SQLException; import org.seasar.extension.jdbc.impl.ResultSetWrapper; public class HiveResultSetWrapper extends ResultSetWrapper { protected static final String DOUBLE_QUOTE = "\""; protected final ResultSet rs; public HiveResultSetWrapper(ResultSet rs){ super(rs); this.rs = rs; } @Override public void close() throws SQLException { // HiveResultSet was not supported in #close } protected Date parseDate(final String dt) throws Exception { String tmp = dt; if(dt.startsWith(DOUBLE_QUOTE) && dt.endsWith(DOUBLE_QUOTE)){ int length = dt.length(); tmp = dt.substring(1, length - 1); } return Date.valueOf(tmp); } @Override public Date getDate(int columnIndex) throws SQLException { Object obj = getObject(columnIndex); if (obj == null) { return null; } if(obj instanceof String){ try { return parseDate((String) obj); } catch(Exception e){ throw new SQLException("Cannot convert column " + columnIndex + " to date: " + e.toString()); } } return super.getDate(columnIndex); } }
jdbc.diconを修正
やっと、s2jdbcに近づいてきました...
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE components PUBLIC "-//SEASAR2.1//DTD S2Container//EN" "http://www.seasar.org/dtd/components21.dtd"> <components namespace="jdbc_hive"> <!-- Hive does not support in jta --> <include path="jta.dicon"/> <component name="hiveDataSource" class="org.seasar.extension.dbcp.impl.XADataSourceImpl"> <property name="driverClassName">"org.apache.hadoop.hive.jdbc.HiveDriver"</property> <property name="URL">"jdbc:hive://master1:10000/"</property> <property name="user">""</property> <property name="password">""</property> </component> <component name="hive" class="org.seasar.extension.dbcp.impl.DataSourceImpl" autoBinding="none"> <arg> <component name="hivePool" class="org.seasar.extension.dbcp.impl.HiveConnectionPool" autoBinding="none"> <property name="xaDataSource">hiveDataSource</property> <property name="timeout">600</property> <property name="maxPoolSize">10</property> <property name="allowLocalTx">true</property> <property name="readOnly">true</property> <property name="transactionManager">TransactionManager</property> <destroyMethod name="close"/> </component> </arg> </component> </components>
s2jdbc.diconを用意
dialectをs2jdbcに読ませる
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE components PUBLIC "-//SEASAR//DTD S2Container 2.4//EN" "http://www.seasar.org/dtd/components24.dtd"> <components> <include path="jta.dicon"/> <include path="tx.dicon"/> <include path="s2jdbc-internal.dicon"/> <include path="jdbc-hive.dicon" /> <component name="hiveDialect" class="org.seasar.extension.jdbc.dialect.HiveDialect" /> <component name="hiveJdbcManager" class="org.seasar.extension.jdbc.manager.JdbcManagerImpl"> <property name="maxRows">0</property> <property name="fetchSize">0</property> <property name="queryTimeout">0</property> <property name="dialect">hiveDialect</property> <initMethod name="init" /> </component> </components>
S2JDBC経由でHiveQLを投げてみる
Hiveはinsert文は無いので、LOAD DATA 文です。
しかも、preparedStatementで値のbindできないので、直接クエリを書きます。
さらに、getSingleResult であっても、何も結果セットがかえってこない(svn:hive/service/HiveServier)ので、selectBySql(Integer.class...)とかはできません。
あきらめて、selectBySql(String.class) で void にしてます。。
selectについても、preparedStatementでbindで値のbindができないので、直接クエリを書きます。
今はまだ、order by の書き換え(sort by)を行っていないので、orderByが使えません。直接クエリに書きます。
joinについても少し難あり。。。
等など、ということで、いくらか諦めると...!
@Component @InterType("aop.propertyInterType") public class HogeService { @Property(PropertyType.WRITE) protected JdbcManager hive; public void insert(String path, String part, String subPart){ hive.selectBySql(String.class, "LOAD DATA INPATH '" + path + "' INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "'").getSingleResult(); } public void insertFromLocal(String path, String part, String subPart){ hive.selectBySql(String.class, "LOAD DATA LOCAL INPATH '" + path + "' INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "'").getSingleResult(); } public void overwrite(String path, String part, String subPart){ hive.selectBySql(String.class, "LOAD DATA INPATH '" + path + "' OVERWRITE INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "'").getSingleResult(); } public void overwriteFromLocal(String path, String part, String subPart){ hive.selectBySql(String.class, "LOAD DATA INPATH '" + path + "' OVERWRITE INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "'").getSingleResult(); } public List<Hoge> getHoge(){ return hive.from(Hoge.class).where("id > 1234").limit(100).getResultList(); } }
\(^o^)/ やたー!うごいたー!
おわり。
ってことで、とりあえず、HiveをS2JDBC経由で利用できるようになったよ!
Seasarのパッケージはホント便利!ほとんどwrapperが用意されてるし、diconで切り替えれるから修正が楽!
でも、coreなパッケージほど、 private メソッド多いよ!せめて protected か getter を用意して!(無駄にコピーしちゃったのがいくつかある...)
ああああ、でもこれなら HiveJDBC をちゃんと修正した方が早かったかも。。orz
今度やってみよう。。
HDFS の HadoopThriftServer をなんとかする
hadoop の話題。その2
hadoop を支える HDFS には HDFS-APIを通すことで、プログラム中から HDFS の読み書きが出きるようになります。(たぶん、hdfs-s3 なんかもこのAPI経由(? ソース読んでない))
(中略)
んで、この HDFS-API のなかに、Thrift を使って リモート上から HDFS の読み書きをできるようにしている HadoopThriftServer(theiftfs) があります。
この thriftfs の起動は に書かれているのですが、shellを握ってしまうのでこんな感じにしました。
#!/usr/bin/env bash THRIFTFS_PID_FILE=$HADOOP_PID_DIR/thrift.pid THRIFTFS_LOG_FILE=$HADOOP_LOG_DIR/thrift.log if [ -f $THRIFTFS_PID_FILE ]; then echo running as process `cat $THRIFTFS_PID_FILE`. stop it first exit 1 fi CLASSPATH=$HADOOP_CONF_DIR CLASSPATH=$CLASSPATH:$HADOOP_HOME/hadoop-0.20.1-core.jar:$HADOOP_HOME/hadoop-0.20.1-tools.jar CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib/commons-logging-1.0.4.jar CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib/commons-logging-api-1.0.4.jar CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib/log4j-1.2.15.jar CLASSPATH=$CLASSPATH:$HADOOP_HOME/contrib/thriftfs/hadoop-0.20.1-thriftfs.jar CLASSPATH=$CLASSPATH:$HADOOP_HOME/src/contrib/thriftfs/lib/hadoopthriftapi.jar CLASSPATH=$CLASSPATH:$HADOOP_HOME/src/contrib/thriftfs/lib/libthrift.jar nohup java -Dcom.sun.management.jmxremote -cp $CLASSPATH org.apache.hadoop.thriftfs.HadoopThriftServer 10010 > $THRIFTFS_LOG_FILE 2>&1 < /dev/null & echo $! > $THRIFTFS_PID_FILE $HADOOP_HOME/bin/hadoop dfsadmin -safemode leave
同様に 停止は
#!/usr/bin/env bash THRIFT_FS_PID_FILE=$HADOOP_PID_DIR/thrift.pid THRIFT_FS_LOG_FILE=$HADOOP_LOG_DIR/thrift.log if [ -f $THRIFT_FS_PID_FILE ]; then kill `cat $THRIFT_FS_PID_FILE` rm $THRIFT_FS_PID_FILE else echo no pidfile $THRIFT_FS_PID_FILE fi
起動する際は dfs($HADOOP_HOME/bin/start-dfs.sh)が起動している状態で起動する必要があります。
(dfsadmin -safemode leave は適宜行ってください)
んで、このThriftFSは、こんな感じでリモート上の DFS のファイルを読み書きできます
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.thriftfs.api.Pathname; import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem; import org.apache.hadoop.thriftfs.api.ThriftHandle; import org.apache.hadoop.thriftfs.api.ThriftIOException; import com.facebook.thrift.TException; import com.facebook.thrift.protocol.TBinaryProtocol; import com.facebook.thrift.protocol.TProtocol; import com.facebook.thrift.transport.TSocket; import com.facebook.thrift.transport.TTransportException; public class HDFSInput { public static void main(String...args) { final int defaultBufferSize = config.getInt("io.file.buffer.size", 4096); final long defaultBlockSize = config.getLong("dfs.block.size", FSConstants.DEFAULT_BLOCK_SIZE); final short defaultReplication = (short) config.getInt("dfs.replication", 3); TSocket socket = new TSocket("master1", 10010); TProtocol protocol = new TBinaryProtocol(socket); try { socket.open(); try { ThriftHadoopFileSystem.Client client = new ThriftHadoopFileSystem.Client(protocol); // client timeout 5 sec client.setTimeout(5); Pathname hoge = new Pathname("/tmp/hoge"); FsPermission permission = FsPermission.createImmutable((short) 0655); boolean overwrite = true; ThriftHandle writeHandler = client.createFile(hoge, permission.toShort(), overwrite, defaultBufferSize, defaultReplication, defaultBlockSize); client.write(writeHandler, "hello world"); client.close(writeHandler); ThriftHandle readHandler = client.open(hoge); System.out.println(client.read(readHandler, 0, 1024)); client.close(readHandler); } catch (TTransportException e) { e.printStackTrace(); } catch (ThriftIOException e) { e.printStackTrace(); } } catch (TException e) { e.printStackTrace(); } finally { socket.close(); } } }
read時にBufferedReaderにwrapするともう少し便利に読み書きできる(これは今度書く)
んで、読み書きできるようになったんだけど、どうも複数のクライアントから連続して読み書きをすると、整合性が取れなくなってしまう(?)のかエラーがでるようになった。
元のソースを読むと...なんともエレガントな。。。
ということで、java.util.concurrent.atomic を使って書き直してみた(ここが本題)
package org.apache.hadoop.thriftfs; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.util.LinkedList; import java.util.List; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.thriftfs.api.Pathname; import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem; import org.apache.hadoop.thriftfs.api.ThriftHandle; import org.apache.hadoop.thriftfs.api.ThriftIOException; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.StringUtils; import com.facebook.thrift.protocol.TBinaryProtocol; import com.facebook.thrift.server.TServer; import com.facebook.thrift.server.TThreadPoolServer; import com.facebook.thrift.transport.TServerSocket; import com.facebook.thrift.transport.TServerTransport; import com.facebook.thrift.transport.TTransportFactory; public class HadoopThriftServer extends ThriftHadoopFileSystem { static int serverPort = 0; // default port TServer server = null; public static class HadoopThriftHandler implements ThriftHadoopFileSystem.Iface { public static final Log LOG = LogFactory.getLog("org.apache.hadoop.thrift"); // HDFS glue Configuration conf; FileSystem fs; // stucture that maps each Thrift object into an hadoop object private AtomicLong nextId = new AtomicLong(new Random().nextLong()); private ConcurrentHashMap<Long, Object> hadoopHash = new ConcurrentHashMap<Long, Object>(); private Daemon inactivityThread = null; // Detect inactive session private static volatile long inactivityPeriod = 3600 * 1000; // 1 hr private static volatile long inactivityRecheckInterval = 60 * 1000; private static volatile boolean fsRunning = true; private AtomicLong now = new AtomicLong(now()); // allow outsider to change the hadoopthrift path public void setOption(String key, String val) { } /** * Current system time. * @return current time in msec. */ static long now() { return System.currentTimeMillis(); } /** * getVersion * * @return current version of the interface. */ public String getVersion() { return "0.1"; } /** * shutdown * * cleanly closes everything and exit. */ public void shutdown(int status) { LOG.info("HadoopThriftServer shutting down."); try { fs.close(); } catch (IOException e) { LOG.warn("Unable to close file system"); } Runtime.getRuntime().exit(status); } /** * Periodically checks to see if there is inactivity */ class InactivityMonitor implements Runnable { public void run() { while (fsRunning) { try { if (now() > now.get() + inactivityPeriod) { LOG.warn("HadoopThriftServer Inactivity period of " + inactivityPeriod + " expired... Stopping Server."); shutdown(-1); } } catch (Exception e) { LOG.error(StringUtils.stringifyException(e)); } try { Thread.sleep(inactivityRecheckInterval); } catch (InterruptedException ie) { } } } } /** * HadoopThriftServer * * Constructor for the HadoopThriftServer glue with Thrift Class. * * @param name - the name of this handler */ public HadoopThriftHandler(String name) { conf = new Configuration(); now.set(now()); try { inactivityThread = new Daemon(new InactivityMonitor()); fs = FileSystem.get(conf); } catch (IOException e) { LOG.warn("Unable to open hadoop file system..."); Runtime.getRuntime().exit(-1); } } /** * printStackTrace * * Helper function to print an exception stack trace to the log and not stderr * * @param e the exception * */ static private void printStackTrace(Exception e) { for(StackTraceElement s: e.getStackTrace()) { LOG.error(s); } } /** * Lookup a thrift object into a hadoop object */ private synchronized Object lookup(long id) { return hadoopHash.get(new Long(id)); } /** * Insert a thrift object into a hadoop object. Return its id. */ private synchronized long insert(Object o) { long next = nextId.incrementAndGet(); hadoopHash.put(next, o); return next; } /** * Delete a thrift object from the hadoop store. */ private synchronized Object remove(long id) { return hadoopHash.remove(new Long(id)); } /** * Implement the API exported by this thrift server */ /** Set inactivity timeout period. The period is specified in seconds. * if there are no RPC calls to the HadoopThrift server for this much * time, then the server kills itself. */ public synchronized void setInactivityTimeoutPeriod(long periodInSeconds) { inactivityPeriod = periodInSeconds * 1000; // in milli seconds if (inactivityRecheckInterval > inactivityPeriod ) { inactivityRecheckInterval = inactivityPeriod; } } /** * Create a file and open it for writing */ public ThriftHandle create(Pathname path) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("create: " + path); FSDataOutputStream out = fs.create(new Path(path.pathname)); long id = insert(out); ThriftHandle obj = new ThriftHandle(id); HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id); return obj; } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * Create a file and open it for writing, delete file if it exists */ public ThriftHandle createFile(Pathname path, short mode, boolean overwrite, int bufferSize, short replication, long blockSize) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("create: " + path + " permission: " + mode + " overwrite: " + overwrite + " bufferSize: " + bufferSize + " replication: " + replication + " blockSize: " + blockSize); FSDataOutputStream out = fs.create(new Path(path.pathname), new FsPermission(mode), overwrite, bufferSize, replication, blockSize, null); // progress long id = insert(out); ThriftHandle obj = new ThriftHandle(id); HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id); return obj; } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * Opens an existing file and returns a handle to read it */ public ThriftHandle open(Pathname path) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("open: " + path); FSDataInputStream out = fs.open(new Path(path.pathname)); long id = insert(out); ThriftHandle obj = new ThriftHandle(id); HadoopThriftHandler.LOG.debug("opened: " + path + " id: " + id); return obj; } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * Opens an existing file to append to it. */ public ThriftHandle append(Pathname path) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("append: " + path); FSDataOutputStream out = fs.append(new Path(path.pathname)); long id = insert(out); ThriftHandle obj = new ThriftHandle(id); HadoopThriftHandler.LOG.debug("appended: " + path + " id: " + id); return obj; } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * write to a file */ public boolean write(ThriftHandle tout, String data) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("write: " + tout.id); FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id); byte[] tmp = data.getBytes("UTF-8"); out.write(tmp, 0, tmp.length); HadoopThriftHandler.LOG.debug("wrote: " + tout.id); return true; } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * read from a file */ public String read(ThriftHandle tout, long offset, int length) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("read: " + tout.id + " offset: " + offset + " length: " + length); FSDataInputStream in = (FSDataInputStream)lookup(tout.id); if (in.getPos() != offset) { in.seek(offset); } byte[] tmp = new byte[length]; int numbytes = in.read(offset, tmp, 0, length); HadoopThriftHandler.LOG.debug("read done: " + tout.id); return new String(tmp, 0, numbytes, "UTF-8"); } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * Delete a file/directory */ public boolean rm(Pathname path, boolean recursive) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("rm: " + path + " recursive: " + recursive); boolean ret = fs.delete(new Path(path.pathname), recursive); HadoopThriftHandler.LOG.debug("rm: " + path); return ret; } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * Move a file/directory */ public boolean rename(Pathname path, Pathname dest) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("rename: " + path + " destination: " + dest); boolean ret = fs.rename(new Path(path.pathname), new Path(dest.pathname)); HadoopThriftHandler.LOG.debug("rename: " + path); return ret; } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * close file */ public boolean close(ThriftHandle tout) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("close: " + tout.id); Object obj = remove(tout.id); if (obj instanceof FSDataOutputStream) { FSDataOutputStream out = (FSDataOutputStream)obj; out.close(); } else if (obj instanceof FSDataInputStream) { FSDataInputStream in = (FSDataInputStream)obj; in.close(); } else { throw new ThriftIOException("Unknown thrift handle."); } HadoopThriftHandler.LOG.debug("closed: " + tout.id); return true; } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * Create a directory */ public boolean mkdirs(Pathname path) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("mkdirs: " + path); boolean ret = fs.mkdirs(new Path(path.pathname)); HadoopThriftHandler.LOG.debug("mkdirs: " + path); return ret; } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * Does this pathname exist? */ public boolean exists(Pathname path) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("exists: " + path); boolean ret = fs.exists(new Path(path.pathname)); HadoopThriftHandler.LOG.debug("exists done: " + path); return ret; } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * Returns status about the specified pathname */ public org.apache.hadoop.thriftfs.api.FileStatus stat( Pathname path) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("stat: " + path); org.apache.hadoop.fs.FileStatus stat = fs.getFileStatus( new Path(path.pathname)); HadoopThriftHandler.LOG.debug("stat done: " + path); return new org.apache.hadoop.thriftfs.api.FileStatus( stat.getPath().toString(), stat.getLen(), stat.isDir(), stat.getReplication(), stat.getBlockSize(), stat.getModificationTime(), stat.getPermission().toString(), stat.getOwner(), stat.getGroup()); } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * If the specified pathname is a directory, then return the * list of pathnames in this directory */ public List<org.apache.hadoop.thriftfs.api.FileStatus> listStatus( Pathname path) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("listStatus: " + path); org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus( new Path(path.pathname)); HadoopThriftHandler.LOG.debug("listStatus done: " + path); org.apache.hadoop.thriftfs.api.FileStatus tmp; List<org.apache.hadoop.thriftfs.api.FileStatus> value = new LinkedList<org.apache.hadoop.thriftfs.api.FileStatus>(); for (int i = 0; i < stat.length; i++) { tmp = new org.apache.hadoop.thriftfs.api.FileStatus( stat[i].getPath().toString(), stat[i].getLen(), stat[i].isDir(), stat[i].getReplication(), stat[i].getBlockSize(), stat[i].getModificationTime(), stat[i].getPermission().toString(), stat[i].getOwner(), stat[i].getGroup()); value.add(tmp); } return value; } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * Sets the permission of a pathname */ public void chmod(Pathname path, short mode) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("chmod: " + path + " mode " + mode); fs.setPermission(new Path(path.pathname), new FsPermission(mode)); HadoopThriftHandler.LOG.debug("chmod done: " + path); } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * Sets the owner & group of a pathname */ public void chown(Pathname path, String owner, String group) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("chown: " + path + " owner: " + owner + " group: " + group); fs.setOwner(new Path(path.pathname), owner, group); HadoopThriftHandler.LOG.debug("chown done: " + path); } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * Sets the replication factor of a file */ public void setReplication(Pathname path, short repl) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("setrepl: " + path + " replication factor: " + repl); fs.setReplication(new Path(path.pathname), repl); HadoopThriftHandler.LOG.debug("setrepl done: " + path); } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } /** * Returns the block locations of this file */ public List<org.apache.hadoop.thriftfs.api.BlockLocation> getFileBlockLocations(Pathname path, long start, long length) throws ThriftIOException { try { now.set(now()); HadoopThriftHandler.LOG.debug("getFileBlockLocations: " + path); org.apache.hadoop.fs.FileStatus status = fs.getFileStatus( new Path(path.pathname)); org.apache.hadoop.fs.BlockLocation[] stat = fs.getFileBlockLocations(status, start, length); HadoopThriftHandler.LOG.debug("getFileBlockLocations done: " + path); org.apache.hadoop.thriftfs.api.BlockLocation tmp; List<org.apache.hadoop.thriftfs.api.BlockLocation> value = new LinkedList<org.apache.hadoop.thriftfs.api.BlockLocation>(); for (int i = 0; i < stat.length; i++) { // construct the list of hostnames from the array returned // by HDFS List<String> hosts = new LinkedList<String>(); String[] hostsHdfs = stat[i].getHosts(); for (int j = 0; j < hostsHdfs.length; j++) { hosts.add(hostsHdfs[j]); } // construct the list of host:port from the array returned // by HDFS List<String> names = new LinkedList<String>(); String[] namesHdfs = stat[i].getNames(); for (int j = 0; j < namesHdfs.length; j++) { names.add(namesHdfs[j]); } tmp = new org.apache.hadoop.thriftfs.api.BlockLocation( hosts, names, stat[i].getOffset(), stat[i].getLength()); value.add(tmp); } return value; } catch (IOException e) { throw new ThriftIOException(e.getMessage()); } } } // Bind to port. If the specified port is 0, then bind to random port. private ServerSocket createServerSocket(int port) throws IOException { try { ServerSocket sock = new ServerSocket(); // Prevent 2MSL delay problem on server restarts sock.setReuseAddress(true); // Bind to listening port if (port == 0) { sock.bind(null); serverPort = sock.getLocalPort(); } else { sock.bind(new InetSocketAddress(port)); } return sock; } catch (IOException ioe) { throw new IOException("Could not create ServerSocket on port " + port + "." + ioe); } } /** * Constrcts a server object */ public HadoopThriftServer(String [] args) { if (args.length > 0) { serverPort = new Integer(args[0]); } try { ServerSocket ssock = createServerSocket(serverPort); TServerTransport serverTransport = new TServerSocket(ssock); Iface handler = new HadoopThriftHandler("hdfs-thrift-dhruba"); ThriftHadoopFileSystem.Processor processor = new ThriftHadoopFileSystem.Processor(handler); TThreadPoolServer.Options options = new TThreadPoolServer.Options(); options.minWorkerThreads = 10; server = new TThreadPoolServer(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), options); System.out.println("Starting the hadoop thrift server on port [" + serverPort + "]..."); HadoopThriftHandler.LOG.info("Starting the hadoop thrift server on port [" +serverPort + "]..."); System.out.flush(); } catch (Exception x) { x.printStackTrace(); } } public static void main(String [] args) { HadoopThriftServer me = new HadoopThriftServer(args); me.server.serve(); } }
安定した。気がする。。
いつか問題となるようなコードを書いてpatchを作ろう
Hive の Local Metastore に derby を使う
突然ですが、hadoop の話題
Hadoop Hive といえば、SQL 感覚で MapReduce できるんですが、Hive は SQL のように記述できるようにするために、metastore 形式でメタデータを管理してます。
そのあたりは、Hive/AdminManual/MetastoreAdmin - Hadoop Wiki や metastore_usage.pptxやHiveのmetastoreをMySQLを使ってLocal Metastore形式で利用する - blog.katsuma.tv などに詳しく書かれているので省略。
ただ、local metastore にいちいち mysql をセットアップするのもあれだったんで、今回は derby にしておきます
derby編
derby のインストール
Apache Derbyからダウンロードしてきます。とりあえず最新のを落としてくる
> wget http://..../db-derby-10.5.3.0-bin.tar.gz
> tar xzvf db-derby-10.5.3.0-bin.tar.gz
> sudo mv db-derby-10.5.3.0-bin /opt/local/derby
次に、データ保存用の /var/db/derby を作っておく
> sudo mkdir /var/db/derby > sudo chown nowel:users /var/db/derby
chown は derby 用のユーザでもいいので、実行ユーザに変えておく
環境変数の設定
環境変数名とかは、Hadoop のそれっぽくしておく
#!/usr/bin/env bash export DERBY_HOME=/opt/local/derby export DERBY_OPTS="-Dderby.drda.startNetworkServer=true -Dderby.drda.maxThreads=30 -Dderby.system.home=/var/db/derby -Dderby.drda.portNumber=1527 -Dderby.drda.host=0.0.0.0" export DERBY_LOG_DIR=/var/log
これを $HOME/.derby_profile とかしておいて、.bashrcなんかに
source $HOME/.derby_profile
と記述しておくといちいち source しなくて楽
起動用のスクリプトを用意
start/stop/status くらいは何度か確認することがあるので、こんな感じで用意
start.sh
#!/usr/bin/env bash logfile=$DERBY_LOG_DIR/derby.log nohup $DERBY_HOME/bin/NetworkServerControl start > $logfile 2>&1 < /dev/null &
stop.sh
#!/usr/bin/env bash
$DERBY_HOME/bin/NetworkServerControl shutdown
status.sh
#!/usr/bin/env bash
$DERBY_HOME/bin/NetworkServerControl sysinfo
$DERBY_HOME/bin/NetworkServerControl runtimeinfo
とこんな感じ。
起動と停止
さっき用意した start.sh ファイル群を $HOME/bin とかに置いているのであれば
> $HOME/bin/start.sh
で起動できる
ちなみに、derbyはstartNetworkServerというスクリプトが用意されているけど、素のまま起動すると、ネットワーク越しにアクセスできない(正確には起動した同一ホストからしかアクセスできない)
ネットワーク越しに利用するならstartNetworkServer -h 0.0.0.0とするか、derby.drda.host=0.0.0.0みたいな変数を利用する。
(ここで結構ハマった...とりあえず、開発用なら 0.0.0.0 で始めるといいかと)
Hive編
hadoop の mapred/hdfs などはセットアップ済みとして進めます。
hive のセットアップ
先に書いておくと、http://www.apache.org/dyn/closer.cgi/hadoop/hive/とかに置いてある hive-0.4.1 を使ってセットアップを進めても
FAILED: Error in metadata: org.datanucleus.jdo.exceptions.TransactionNotReadableException: Cant read fields outside of transactions. You may want to set 'NontransactionalRead=true'. FailedObject:1[OID]org.apache.hadoop.hive.metastore.model.MDatabase FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask
が発生してなかなか前に進めなくなる。
Hive CLI の起動時に
hive > set javax.jdo.option.NontransactionalRead=true;
としておくことで、一時的になんとかなるけど、接続毎にやることになるので、おすすめはしません。
(jpox.properties を使うというのもあるけど、設定の二重管理になりそうだし、後述する新しいのでは必要なさそうなので今回は 0.4.1 を使わないという方向で進みます)
Hiveをtrunkからもってきてビルドする
たぶん、trunkに入ってるのは hive-0.5.x 系
> svn export http://svn.apache.org/repos/asf/hadoop/hive/trunk hive > cd hive > ant package > : > : (しばし待つ) > : > cp -r build/dist/lib/* lib > sudo mv hive /opt/local/hive-0.5.0-trunk
環境変数の設定
こんな感じで用意する
#!/usr/bin/env bash export HIVE_HOME=/opt/local/hive-0.5.0-trunk export HIVE_CONF_DIR=/home/hive/conf export HIVE_PID_DIR=/var/run export HIVE_LOG_DIR=/var/log
HIVE_CONF_DIRはHADOOP_CONF_DIRと違って、hive-default.xml を HIVE-HOME/conf から読んでくれない(?)ので、hive-default.xmlとhive-log4j.propertiesは HIVE_CONF_DIR にシンボリックリンクしておく
> ln -s $HIVE_HOME/conf/hive-default.xml $HIVE_CONF_DIR/hive-default.xml > ln -s $HIVE_HOME/conf/hive-log4j.properties $HIVE_CONF_DIR/hive-log4j.properties
hive-site.xmlを用意
$HIVE_CONF_DIR に hive-site.xml をこんな感じで用意
<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>hive.metastore.local</name> <value>true</value> </property> <property> <name>hive.metastore.warehouse.dir</name> <value>/hive/warehouse</value> </property> <property> <name>hive.metastore.rawstore.impl</name> <value>org.apache.hadoop.hive.metastore.ObjectStore</value> </property> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:derby://master1:1527/metastore;create=true</value> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>org.apache.derby.jdbc.ClientDriver</value> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value><!-- !empty --></value> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value><!-- !empty --></value> </property> <property> <name>datanucleus.autoCreateTables</name> <value>true</value> </property> </configuration>
Hive CLIの起動
とりあえず、derby の metastorage が使えるかどうかを確認するため、$HIVE_HOME/bin/hive で Hive CLI を起動して show tables する
> $HIVE_HOME/bin/hive hive> show tables; OK Time taken: 8.613 seconds hive> exit;
とここまで出れば derby で Local Metastore が使えるようになってます。
おわり
と、ここまで書いてみたけど、Hiveのwikiに同じようなものがあった orz
ref - HiveDerbyServerMode - Hadoop Wiki
2010/01/20
PHPで200行で作る memcached 互換サーバ
まさに誰得。
class MemcachedServer { protected $command; public function __construct(MemcachedCommand $command){ $this->command = $command; } public function start(){ $this->run(); } public function run(){ $socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if(false === $socket){ $code = socket_last_error(); $msg = socket_strerror($code); throw new Exception(sprintf('socket_create was error(%s):%s', $code, $msg)); } socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1); $binded = @socket_bind($socket, 0, 11222); if(false === $binded){ $code = socket_last_error(); $msg = socket_strerror($code); throw new Exception(sprintf('socket_bind was error(%s):%s', $code, $msg)); } $listend = @socket_listen($socket); if(false === $listend){ $code = socket_last_error(); $msg = socket_strerror($code); throw new Exception(sprintf('socket_listen was error(%s):%s', $code, $msg)); } //socket_set_nonblock($socket); echo 'server start on tcp://0.0.0.0:11222', PHP_EOL; $read = array($socket); $write = null; $except = null; while(true){ $accept = @socket_accept($socket); if(false === $accept){ continue; } $handler = new MemcachedAcceptHandler($accept, $this->command); $handler->init(); $handler->execute(); $handler->destroy(); } } } interface StreamReadWrite { const DELIMITER = "\r\n"; public function readLine(); public function writeLine($str); } class MemcachedAcceptHandler implements StreamReadWrite { protected $socket; protected $command; protected $connected = true; public function __construct($socket, MemcachedCommand $command){ $this->socket = $socket; $this->command = $command; } public function __destruct(){ if(null !== $this->socket){ @socket_close($this->socket); unset($this->socket); } } public function init(){ echo 'new connection', PHP_EOL; //socket_set_nonblock($this->socket); } public function execute(){ while($this->connected){ $read = array($this->socket); $write = array(); $except = array(); $select = @socket_select($read, $write, $except, 1); if(false === $select){ throw new RuntimeException('socket_select'); } if($select < 1){ continue; } $line = $this->readLine(); if(null === $line){ continue; } // get hoge => get $mode = substr($line, 0, 3); // get hoge foo => array(hoge, foo) $args = explode(' ', substr($line, 4)); $this->command->call($this, $mode, $args); } } public function destroy(){ echo 'close connection', PHP_EOL; @socket_shutdown($this->socket); } public function readLine(){ $line = ''; while(true){ $buf = @socket_read($this->socket, 1); if(false === $buf || '' === $buf){ // FIXME!!! $this->connected = false; return null; } $line .= $buf; if(self::DELIMITER == substr($line, -2)){ $line = substr($line, 0, -2); break; } } if(empty($line)){ return null; } if(preg_match('/^\s+$/', $line)){ return null; } return $line; } public function writeLine($str){ return @socket_write($this->socket, $str . self::DELIMITER); } } interface MemcachedCommand { public function call(StreamReadWrite $reader, $mode, array $args); } abstract class AbstractMemcachedCommand implements MemcachedCommand { protected $reflector; public function __construct(){ $this->reflector = new ReflectionObject($this); } protected static function concat(array $a, array $b){ array_splice($a, count($a), 0, $b); return $a; } public final function call(StreamReadWrite $rw, $mode, array $args){ if(!$this->reflector->hasMethod($mode)){ return $this->error($rw); } echo 'command => ', $mode, ' ', join(' ', $args), PHP_EOL; return call_user_func_array(array($this, $mode), self::concat(array($rw), $args)); } protected function error($rw){ $rw->writeLine('ERROR'); } protected abstract function get(StreamReadWrite $rw, $keys); protected abstract function set(StreamReadWrite $rw, $key, $flag, $expire, $length); protected abstract function delete(StreamReadWrite $rw, $key, $expire = 0); } class StorageMemcacheCommand extends AbstractMemcachedCommand { protected $cache = array(); protected function get(StreamReadWrite $rw, $keys){ $args = func_get_args(); array_shift($args); foreach($args as $key){ if(!isset($this->cache[$key])){ continue; } $value = $this->cache[$key]; if($value->expire < time()){ continue; } $rw->writeLine(sprintf('VALUE %s %d %d', $key, $value->flag, $value->length)); $rw->writeLine($value->value); } $rw->writeLine('END'); } protected function set(StreamReadWrite $rw, $key, $flag, $expire, $length){ $value = new stdClass; $value->flag = $flag; $value->expire = time() + $expire; $value->length = $length; $value->value = $rw->readLine(); $this->cache[$key] = $value; $rw->writeLine('STORED'); } protected function delete(StreamReadWrite $rw, $key, $expire = 0){ if(isset($this->cache[$key])){ $this->cache[$key]->expire = $expire; } } } $server = new MemcachedServer(new StorageMemcacheCommand); $server->start();
とりあえず、set/get/deleteだけ。threadとか色々ないので、実際には削除されない。。
他にも構文解析とかしてない(しなくていいくらい memcache はシンプルなんだけど)ので上手いことハンドリングしないといけない。。
同時アクセスに難あり。。
(中略)
などなど、色々あるので、開発用にしか向いてない。というか、よっぽどストイックにPHPを愛している人じゃないと向いていないかも。
まぁ最近は色々memcached互換系のがでてきたので、クライアント作成時のdebug用途向けかも。
今回は、PHPのsocket 関数をゴリゴリ使っています。
blocking modeとかはちょっと動きが不安定(?)なので、あまり性能はでない
c-lang版と比較を、この前のスクリプトを使ってみた
$target = array( array('host' => 'localhost', 'port' => 11211), array('host' => 'localhost', 'port' => 11222) ); foreach($target as $t){ $memcache = new Memcache; $memcache->connect($t['host'], $t['port']); $fail = 0; $fails = array(); $elapsed = microtime(true); $count = 500; for($i = 0; $i < $count; ++$i){ if(false === $memcache->set('hoge', 1234, 0, 10)){ echo 'ERROR!!', PHP_EOL; } if(false === $memcache->set('hoge', 123, 0, 10)){ echo 'ERROR!!!', PHP_EOL; } $value = (int) $memcache->get('hoge'); if(false === $memcache->set('hoge', ((int)$value + 1), 0, 10)){ echo 'ERROR!!!!', PHP_EOL; } $result = (int)$memcache->get('hoge'); if($result != 124){ $fails[] = $result; $fail++; } } echo 'target host => ', $t['host'], ' port =>', $t['port'], PHP_EOL; echo 'elapsed: ', (microtime(true) - $elapsed), PHP_EOL; echo 'fail => ', $fail, PHP_EOL; }
target host => localhost port =>11211 elapsed: 0.634283065796 fail => 0 target host => localhost port =>11222 elapsed: 1.12030887604 fail => 0
とまあ、こんな感じ。(そこそこ?)
ちなみに、stream_socketを使って書いてみたものも置いておく。fwriteまわりでハマったので動かないと思うけど。
// 動かない>< class MemcachedServer { protected $command; public function __construct(MemcachedCommand $command){ $this->command = $command; } public function start(){ $this->run(); } public function run(){ $socket = stream_socket_server('tcp://0.0.0.0:11222', $code, $msg, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN); if(false === $socket){ throw new RuntimeException(sprintf('stream_socket_server was error(%s):%s', $code, $msg)); } $nonBlocking = stream_set_blocking($socket, 0); if(false === $nonBlocking){ throw new RuntimeException('stream_set_blocking: set blocking mode error'); } echo 'server start on tcp://0.0.0.0:11222', PHP_EOL; $read = array($socket); $write = null; $except = null; while(true){ $accept = @stream_socket_accept($socket, 1); if(false === $accept){ continue; } $handler = new MemcachedAcceptHandler($accept, $this->command); $handler->init(); $handler->execute(); $handler->destroy(); } } } interface StreamReadWrite { const DELIMITER = "\r\n"; public function readLine(); public function writeLine($str); } class MemcachedAcceptHandler implements StreamReadWrite { protected $socket; protected $command; public function __construct($socket, MemcachedCommand $command){ $this->socket = $socket; $this->command = $command; } public function __destruct(){ if(null !== $this->socket){ fclose($this->socket); unset($this->socket); } } public function init(){ echo 'new connection', PHP_EOL; stream_set_blocking($this->socket, true); stream_set_timeout($this->socket, 60); // disable writebuffer stream_set_write_buffer($this->socket, 0); } protected function feof(){ $recv = stream_socket_recvfrom($this->socket, 1, STREAM_PEEK); return strlen($recv) < 1; } public function execute(){ while(true){ $read = array($this->socket); $write = array(); $except = array(); $select = @stream_select($read, $write, $except, 1); if(false === $select){ throw new RuntimeException('stream_select'); } if($select < 1){ continue; } $line = $this->readLine(); echo 'line => ', $line, PHP_EOL; if(null === $line){ return; } // get hoge => get $mode = substr($line, 0, 3); // get hoge foo => array(hoge, foo) $args = explode(' ', substr($line, 4)); $this->command->call($this, $mode, $args); } } public function destroy(){ echo 'close connection', PHP_EOL; // socket_close($socket) stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR); } public function readLine(){ // 1048576 = 1024 * 1024 //socket_read($socket, 1048576, PHP_NORMAL_READ $line = stream_get_line($this->socket, 2048, self::DELIMITER); if(empty($line)){ return null; } if(preg_match('/^\s+$/', $line)){ return null; } return $line; } public function writeLine($str){ fputs($this->socket, $str . "\n\0"); } } interface MemcachedCommand { public function call(StreamReadWrite $reader, $mode, array $args); } abstract class AbstractMemcachedCommand implements MemcachedCommand { protected $reflector; public function __construct(){ $this->reflector = new ReflectionObject($this); } protected static function concat(array $a, array $b){ array_splice($a, count($a), 0, $b); return $a; } public final function call(StreamReadWrite $rw, $mode, array $args){ if(!$this->reflector->hasMethod($mode)){ return $this->error($rw); } return call_user_func_array(array($this, $mode), self::concat(array($rw), $args)); } protected function error($rw){ $rw->writeLine('ERROR'); } protected abstract function get(StreamReadWrite $rw, $keys); protected abstract function set(StreamReadWrite $rw, $key, $flag, $expire, $length); } class StorageMemcacheCommand extends AbstractMemcachedCommand { protected $cache = array(); protected function get(StreamReadWrite $rw, $keys){ $args = func_get_args(); array_shift($args); foreach($args as $key){ if(!isset($this->cache[$key])){ continue; } $value = $this->cache[$key]; $rw->writeLine(sprintf('VALUE %s %d %d', $key, $value->flag, $value->expire)); $rw->writeLine($value->value); } $rw->writeLine('END'); } protected function set(StreamReadWrite $rw, $key, $flag, $expire, $length){ $value = new stdClass; $value->flag = $flag; $value->expire = $expire; $value->length = $length; $value->value = $rw->readLine(); $this->cache[$key] = $value; $rw->writeLine('STORED'); } } $server = new MemcachedServer(new StorageMemcacheCommand); $server->start();
memcacheの cache の部分を java で(その2)
前回の続き。
前回の結果、とりあえず、DelayQueueによって期限切れのEntryは取り出せるようになった。
だけど、この仕組みの状態では期限切れになったEntryは容赦なく消えていってしまう。
つまり、そのEntryがまだ利用されるかもしれないのに、消えてしまうのは(色々な意味で)もったいない。
特に、javaだとHashMapとかのloadFactorまわりの動きは(きっと)もったいない
もう一度LRUについてwikipediaに確認すると
Least Recently Used (LRU) はキャッシュメモリや仮想メモリが扱うデータのリソースへの割り当てを決定するアルゴリズムである。対義語はMost Recently Used (MRU)。
和訳すると「最近最も使われなかったもの」つまり「使われてから最も長い時間が経ったもの」「参照される頻度が最も低いもの」である。
ということなので(?)、ホントに使われ無かったもの(アクセスが少ないもの)から順番に消えるように考えてみた。
その結果がこれ
public class LRUCache implements CacheLifeCycle { private static final Log log = LogFactory.getLog(LRUCache.class); protected final ConcurrentMap<String, PrioritalEntry> cache = new ConcurrentHashMap<String, PrioritalEntry>(); protected final DelayQueue<PrioritalEntry> expiredQueue = new DelayQueue<PrioritalEntry>(); private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock writeLock = lock.writeLock(); private final Lock readLock = lock.readLock(); public void register(LifeCycleExecutor executor){ executor.add(this, expiredQueue); } public void purge(PrioritalEntry entry){ writeLock.lock(); try { // priorityを下げる if(entry.decrementPriority() < 1){ // 1以下なら誰も使ってなさそうなので消す if(log.isDebugEnabled()){ log.debug("purge entry => " + entry); } cache.remove(entry.getKey()); return; } // もう一度チャンス entry.setExpiredAt(retransmission(entry.getPriority())); expiredQueue.offer(entry); } finally { writeLock.unlock(); } } public boolean set(String key, String value, long expiredAt){ writeLock.lock(); try { PrioritalEntry newEntry = new PrioritalEntry(key, value, expiredAt); PrioritalEntry previousEntry = cache.putIfAbsent(key, newEntry); // 既存の値がない if(null == previousEntry){ // ということは新しい値 return expiredQueue.offer(newEntry); } previousEntry.setValue(value); previousEntry.setExpiredAt(expiredAt); previousEntry.incrementPriority(); return true; } finally { writeLock.unlock(); } } public PrioritalEntry get(String key) { readLock.lock(); try { if(!cache.containsKey(key)){ return null; } PrioritalEntry entry = cache.get(key); // 時間切れなので見えなくする if(entry.isExpired()){ return null; } // 使う人がいたのでpriorityをあげる entry.incrementPriority(); return entry; } finally { readLock.unlock(); } } public void remove(String key) { remove(key, 0); } public void remove(String key, long expiredAt){ writeLock.lock(); try { PrioritalEntry entry = cache.get(key); if(null != entry){ entry.setExpiredAt(expiredAt); entry.decrementPriority(); } } finally { writeLock.unlock(); } } public void flush() { flush(0); } public void flush(final long expiredAt){ writeLock.lock(); try { Iterator<Map.Entry<String, PrioritalEntry>> entries = cache.entrySet().iterator(); while(entries.hasNext()){ final Map.Entry<String, PrioritalEntry> entry = entries.next(); final PrioritalEntry value = entry.getValue(); value.setExpiredAt(expiredAt); } } finally { writeLock.unlock(); } } protected static long retransmission(final int currentPriority){ if(currentPriority < PrioritalEntry.MAX_PRIORITY){ // binary exponential backoff long freq = currentPriority + Math.round(Math.pow(2, currentPriority)); return Math.round(0.875 * freq) + Math.round(0.125 * currentPriority); } return retransmission(PrioritalEntry.MAX_PRIORITY - 1); } }
んで、PrioritalEntryの実装はこれ
public class PrioritalEntry implements Delayed { public static final int MAX_PRIORITY = 10; public static final int DEFAULT_PRIORITY = 5; private final AtomicInteger priority = new AtomicInteger(DEFAULT_PRIORITY); private final String key; private final AtomicReference<String> value; private final AtomicLong expiredAt; public PrioritalEntry(final String key, final String value, final long expiredAt){ this.key = key; this.value = new AtomicReference<String>(value); if(0 == expiredAt){ this.expiredAt = new AtomicLong(Long.MAX_VALUE); } else if(expiredAt < 0) { this.expiredAt = new AtomicLong(0); } else { this.expiredAt = new AtomicLong(relative(expiredAt)); } } protected static long relative(long expiredAt){ // 現在時間 + 指定時間 return System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(expiredAt); } protected static long absolute(long expiredAt){ return TimeUnit.SECONDS.toMillis(expiredAt); } public int incrementPriority(){ if(MAX_PRIORITY < priority.get()){ return MAX_PRIORITY; } return priority.incrementAndGet(); } public int decrementPriority(){ return priority.decrementAndGet(); } public int getPriority(){ return priority.get(); } public String getKey(){ return key; } public String getValue(){ return value.get(); } public void setValue(String value){ this.value.set(value); } public void setExpiredAt(long newValue){ // 現在時間より次の期限を設定 this.expiredAt.set(relative(newValue)); } private long elapsed(){ return expiredAt.get() - System.currentTimeMillis(); } public boolean isExpired(){ return elapsed() < 1; } public long getDelay(TimeUnit unit) { // expiredの時間から経過時間を引き、残り時間を算出 return unit.convert(elapsed(), TimeUnit.MILLISECONDS); } public int compareTo(Delayed o) { PrioritalEntry target = (PrioritalEntry) o; final long x = expiredAt.get(); final long y = target.expiredAt.get(); if(x < y){ return -1; } if(x > y){ return 1; } return 0; } public String toString(){ StringBuilder buf = new StringBuilder(); buf.append("key=").append(key).append(","); buf.append("value=").append(value).append(","); buf.append("priority=").append(priority); return buf.toString(); } }
他にも
public interface CacheLifeCycle { public void register(LifeCycleExecutor executor); public void purge(PrioritalEntry entry); } public class LifeCycleExecutor { protected final ExecutorService executor; public LifeCycleExecutor(final ExecutorService executor){ this.executor = executor; } public void add(CacheLifeCycle lifeCycle, BlockingQueue<PrioritalEntry> queue){ executor.execute(new Monitor(lifeCycle, queue)); } public void shutdown(){ executor.shutdown(); try { if(!executor.awaitTermination(10, TimeUnit.SECONDS)){ executor.shutdownNow(); } } catch(InterruptedException e){ e.printStackTrace(System.err); } } private static class Monitor implements Runnable { private static final Log log = LogFactory.getLog(Monitor.class); private final CacheLifeCycle lifeCycle; private final BlockingQueue<PrioritalEntry> queue; private Monitor(CacheLifeCycle lifeCycle, BlockingQueue<PrioritalEntry> queue){ this.lifeCycle = lifeCycle; this.queue = queue; } public void run(){ try { while(true){ PrioritalEntry entry = queue.take(); if(log.isDebugEnabled()){ log.debug("find entry => " + entry); } lifeCycle.purge(entry); } } catch(InterruptedException e){ } } } }
細かく解説していく(誰に?)と
まず、PrioritalEntry(スペルとか意味は気にしない)の実装側から
ほとんどの実装は前回のと同じ。
今回は他にpriority(優先度)も一緒に持ってみた
インスタンス生成時は
public static final int MAX_PRIORITY = 10; public static final int DEFAULT_PRIORITY = 5; private final AtomicInteger priority = new AtomicInteger(DEFAULT_PRIORITY);
で作られるんだけど、LRUCacheのgetとかの呼び出しが行われる毎に、priorityをincrementしてる
// class LRUCache public PrioritalEntry get(String key) { readLock.lock(); try { if(!cache.containsKey(key)){ return null; } PrioritalEntry entry = cache.get(key); // 時間切れなので見えなくする if(entry.isExpired()){ return null; } // 使う人がいたのでpriorityをあげる entry.incrementPriority(); return entry; } finally { readLock.unlock(); } } // class PrioritalEntry public int incrementPriority(){ if(MAX_PRIORITY < priority.get()){ return MAX_PRIORITY; } return priority.incrementAndGet(); } public int decrementPriority(){ return priority.decrementAndGet(); } public int getPriority(){ return priority.get(); }
これで、最も "使われる頻度の高い" Entryは "priorityが高い" という状態を作り出してる。
その他にも、setで値が更新される際にも、既に同じkeyでEntryが作られてたらその部分を "再利用" してみた
public boolean set(String key, String value, long expiredAt){ writeLock.lock(); try { PrioritalEntry newEntry = new PrioritalEntry(key, value, expiredAt); PrioritalEntry previousEntry = cache.putIfAbsent(key, newEntry); // 既存の値がない if(null == previousEntry){ // ということは新しい値 return expiredQueue.offer(newEntry); } previousEntry.setValue(value); previousEntry.setExpiredAt(expiredAt); previousEntry.incrementPriority(); return true; } finally { writeLock.unlock(); } }
ということで、priorityという値によって最近使われているかどうかまで、表現できたので、次は値の削除について
public void purge(PrioritalEntry entry){ writeLock.lock(); try { // priorityを下げる if(entry.decrementPriority() < 1){ // 1以下なら誰も使ってなさそうなので消す if(log.isDebugEnabled()){ log.debug("purge entry => " + entry); } cache.remove(entry.getKey()); return; } // もう一度チャンス entry.setExpiredAt(retransmission(entry.getPriority())); expiredQueue.offer(entry); } finally { writeLock.unlock(); } }
purgeというメソッドでは、LifeCycleExecutorによって、DelayQueueの中身が取り出された "期限切れ" の Entry が引数に渡される
public void run(){ try { while(true){ PrioritalEntry entry = queue.take(); if(log.isDebugEnabled()){ log.debug("find entry => " + entry); } lifeCycle.purge(entry); } } catch(InterruptedException e){ } }
期限切れの Entry であっても、すぐに削除は行わずに "再利用" されるチャンスを与えるため、期限切れの Entry であっても Delay Queue に再送(retransmission)している。
ただ単純に再送するのではなく、再送のアルゴリズムとして昔どこかで覚えた "binary exponential backoff" を使ってみた(うろ覚えバージョン)
// binary exponential backoff long freq = currentPriority + Math.round(Math.pow(2, currentPriority)); return Math.round(0.875 * freq) + Math.round(0.125 * currentPriority);
これを実行してみるとこんな時間が求められる
retransmission(0) => 1 retransmission(1) => 3 retransmission(2) => 5 retransmission(3) => 10 retransmission(4) => 19 retransmission(5) => 33 retransmission(6) => 62 retransmission(7) => 119 retransmission(8) => 232 retransmission(9) => 457 : : :
これは再送時間を作るときに、本来ならリトライ回数を使うんだけど(リトライの数が多いほど、次回の再送時間までの待ちが増える、よってこまめに再送させない)、ここでは priority を使用している。
これによって、priorityが高いものほど、時間切れでも生存時間を多くすることができる。これによって再利用のチャンスが増えるはず。(priorityが高い = 最も再利用される可能性が高そう)
ということで、簡単に動かしてみる
LRUCache cache = new LRUCache(); LifeCycleExecutor executor = new LifeCycleExecutor(Executors.newCachedThreadPool()); cache.register(executor); cache.set("hoge", "123", 5); cache.set("foo", "456", 2); cache.set("bar", "789", 2); cache.get("foo"); try { TimeUnit.SECONDS.sleep(120); } catch(InterruptedException e){ } executor.shutdown();
で、これを動かしてみると。。。
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=6 LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=5 LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=5 LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=4 LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=4 LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=3 LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=3 LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=5 LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=2 LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=2 LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=1 LRUCache: purge entry => key=bar,value=789,priority=0 LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=1 LRUCache: purge entry => key=hoge,value=123,priority=0 LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=4 LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=3 LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=2 LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=1 LRUCache: purge entry => key=foo,value=456,priority=0
アクセスのあった "foo" キーは、 同じタイムアウトが設定されている "bar" よりも後に削除されている。
これでなんちゃって LRU はできた(?)
ちなみに、writeLockとかreadLockなどはたぶん使ってない。というか使わなくてもいいはず。
ちなみに、本家 memcached は slab allocationとかを使っているので、メモリの効率からいうとそっちの方がいいかも。
こういった部分を効率よく切り替えれるようにしたいものです
次回は、分散ハッシュ(というか複数台で連携)など。
2010/01/17
最近の我が家の電源まわり事情
最近、我が家の電源まわりが結構すっきりしてきたので、一部紹介しようかと思います
電源タップがすっきりした!
以前はごちゃごちゃしていた、電源まわりが↓のように非常にすっきりしてます。
というのも、ここ最近あらゆる電源まわりがUSB(mini-usb/micro-usb)経由で充電できるものが増えてきたため、USBハブに電源まわりを一元化させてます。
(タコ足ならぬタコUSB状態)
ネット環境もイーモバのPocketWifiにすることで、大きかった(十分小さいけど)、バッファローのルータなどはおさらばです。
でも、その代わりにUSBで電源を供給できるようになった!
100円ショップを活用
ほぼ全てのガジェット関係はUSBから電源を供給できるようになっているものの、一部、付属してなかったりオプションだったりするのですが
最近の100円ショップは進化してます。
USB部分が、mini-usb化されていて、必要なオプションだけ選んで買えというスタイル。
(先端の小さいフタみたいのを個別に買ってくる。これでDSもUSBから電源を供給している)
そうそう。こんな感じのが100円ショップで売ってる
しかも100円ショップでも、1メートル級のUSBも売っていたりすので、電源から遠い場所でも安心!
(びよーん。比較にipod mini置いてみた)
なんでもUSB
ただ、100円ショップは微妙に品揃えが悪かったりする。例えば、iPod関係のUSBポートが売ってなかったりする。。
そんな時はドンキホーテ。100円ショップに匹敵かそれ以上のUSB関係の機器が売ってます。
(これはドンキで買ってきた。iPod/iPod Touch用とiPod mini用)
もう、USBまわりを調整するなら100円ショップとドンキがあれば確実です。
常に動かすものも、USBで
ちょっと例が悪いかもしれないですが、最近池袋にあるゲーセンのUFOキャッチャーで取れたこれ。
(画像が汚いけど、これ・・加湿器なんだぜ・・・)
夏は、扇風機もUSB経由になることは、すでに予想されます
(扇風機もUSB・・・卓上用だけど・・・)
高度にUSB化された家電ライフを目指して
たぶん、100円ショップとドンキホーテの企業努力で、今後とも我が家の電源は、ありとあらゆるものはUSB化されていくと思います。
engadgetで読んだけど、"電源から直接USB"とか"49ポートUSB"のが日本でもぞくぞく登場すると嬉しいなぁ。
今は、USBハブ(4ポートとか)をタコHub化してて使い勝手が悪い。
ってか、たぶん、正しいUSBHubはこんな感じだと思う
(家中のUSBメモリ指してみました(合計4+8+2GB)。思ったより少なかった)
では、Happy USB Life を!
2010/01/15
memcacheの cache の部分を java で
指定時間後に消える(というか参照出来なくなる)をやってみることに。
DelayQueueとDelayedで実装してみる。
Cacheの部分
public class RetireCache implements Cache { protected final Map<String, PeriodEntry> cache = new ConcurrentHashMap<String, PeriodEntry>(); protected final DelayQueue<PeriodEntry> expiredQueue = new DelayQueue<PeriodEntry>(); public boolean set(String key, String value, long expiredAt) { check(); final PeriodEntry entry = new PeriodEntry(key, value, expiredAt); cache.put(key, entry); return expiredQueue.add(entry); } public Entry get(String key) { if(!cache.containsKey(key)){ return null; } PeriodEntry entry = cache.get(key); if(entry.isExpired()){ return null; } return entry; } public void remove(String key) { remove(key, 0); } public void remove(String key, long expiredAt){ PeriodEntry entry = cache.get(key); if(null != entry){ entry.setExpiredAt(expiredAt); } } public void flush() { flush(0); } public void flush(long expiredAt){ Iterator<Map.Entry<String, PeriodEntry>> entries = cache.entrySet().iterator(); while(entries.hasNext()){ Map.Entry<String, PeriodEntry> entry = entries.next(); entry.getValue().setExpiredAt(expiredAt); } } protected void check(){ synchronized(this){ PeriodEntry e = null; while((e = expiredQueue.poll()) != null){ cache.remove(e.getKey()); } } } }
mapにputするときは、同じentryのインスタンスをqueueにも入れておく
getするときには、isExpiredを見て期限切れはnullを返す。
なんとなく、期限切れのqueueの参照にスレッドを立てるのもあれなので、setの時にentryの削除をやってみる
Entryの部分
public class PeriodEntry implements Entry, Delayed { protected final String key; protected final String value; protected long expiredAt; public PeriodEntry(final String key, final String value, final long expiredAt){ this.key = key; this.value = value; if(0 == expiredAt){ this.expiredAt = Long.MAX_VALUE; } else if(expiredAt < 0) { this.expiredAt = 0; } else { this.expiredAt = relative(expiredAt); } } protected static long relative(long expiredAt){ // 現在時間 + 指定時間 return System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(expiredAt); } protected static long absolute(long expiredAt){ return TimeUnit.SECONDS.toMillis(expiredAt); } public String getKey(){ return key; } public String getValue(){ return value; } public void setExpiredAt(long newValue){ // 現在時間より次の期限を設定 this.expiredAt = relative(newValue); } private long elapsed(){ return expiredAt - System.currentTimeMillis(); } public boolean isExpired(){ return elapsed() < 1; } public long getDelay(TimeUnit unit) { // expiredの時間から経過時間を引き、残り時間を算出 return unit.convert(elapsed(), TimeUnit.MILLISECONDS); } public int compareTo(Delayed o) { PeriodEntry target = (PeriodEntry) o; if(expiredAt < target.expiredAt){ return -1; } if(expiredAt > target.expiredAt){ return 1; } return 0; } }
entryは基本的に相対的な時間を設定するようにしてる以外は、普通のでDelayedな実装で。
テスト
public class RetireCacheTest { @Test public void 指定時間で参照出来なくなる(){ Cache cache = new RetireCache(); cache.set("hoge", "hogeValue", 2); Assert.assertEquals(cache.get("hoge").getValue(), "hogeValue"); try { TimeUnit.SECONDS.sleep(2); } catch(InterruptedException e){ } Assert.assertNull("参照できなくなってる", cache.get("hoge")); } @Test public void 指定時間後に消えてる(){ Cache cache = new RetireCache(); cache.set("hoge", "hogeValue", 2); try { TimeUnit.SECONDS.sleep(1); } catch(InterruptedException e){ } Assert.assertEquals(cache.get("hoge").getValue(), "hogeValue"); cache.remove("hoge", 5); try { TimeUnit.SECONDS.sleep(1); } catch(InterruptedException e){ } Assert.assertEquals("これは消えてるのが正解?", cache.get("hoge").getValue(), "hogeValue"); } @Test public void ゼロを指定すると消えない(){ Cache cache = new RetireCache(); cache.set("hoge", "hogeValue", 0); cache.set("foo", "fooValue", 1); try { TimeUnit.SECONDS.sleep(1); } catch(InterruptedException e){ } Assert.assertNull("これは1秒後消えてる", cache.get("foo")); Assert.assertEquals("これは消えない", cache.get("hoge").getValue(), "hogeValue"); } @Test public void キーがない(){ Cache cache = new RetireCache(); cache.set("hoge", "1", 0); Assert.assertNull(cache.get("foo")); } : : 省略 : }
memcacheのdeleteと一時実装が違うかな。本物は、delete key time でtimeにどんな時間を指定してもすぐに参照出来なくなる
時間を指定しない場合
set hoge 0 30 4 1234 STORED delete hoge DELETED get hoge END
消える時間を指定しても...
set hoge 0 30 4 1234 STORED get hoge VALUE hoge 0 4 1234 END delete hoge 10000 DELETED get hoge END
なかなか難しい
次は LRU
2010/01/13
NIO にしたら爆速になった(was: なんちゃってmemcache互換サーバ)
昨日の続き。
server の部分を thread pool から、nio な ノンブロッキング にしてみたら、思った以上に早かった
メイン
public class Server extends Thread { protected final BlockingQueue<Socket> accept = new LinkedBlockingQueue<Socket>(); protected final ExecutorService acceptPool; protected final Cache cache; protected final int port; protected final int maxConnection; public Server(final int port, final int maxConnection){ this(port, maxConnection, ByteSizeUnit.Mega.toLong(64)); } public Server(final int port, final int maxConnection, final long maxMemory){ this.port = port; this.maxConnection = maxConnection; this.cache = new LRUCache(maxMemory); this.acceptPool = Executors.newFixedThreadPool(maxConnection); } public static void main(String...args){ Server s = new Server(12221, 32); s.start(); try { s.join(); } catch(InterruptedException e){ e.printStackTrace(System.err); } } public void run(){ ServerSocketChannel channel = null; try { channel = ServerSocketChannel.open(); channel.configureBlocking(false); final ServerSocket serverSocket = channel.socket(); serverSocket.setReuseAddress(true); serverSocket.bind(new InetSocketAddress(port)); final Selector selector = Selector.open(); try { channel.register(selector, SelectionKey.OP_ACCEPT, new AcceptAction(cache)); while(0 < selector.select()){ Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while(keys.hasNext()){ final SelectionKey key = keys.next(); keys.remove(); if(!key.isValid()){ continue; } Action action = (Action) key.attachment(); action.execute(key); } } } finally { Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while(keys.hasNext()){ final SelectionKey key = keys.next(); key.channel().close(); } } } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { if(null != channel){ try { channel.close(); } catch(IOException e){ e.printStackTrace(); } } } } }
新規接続を受け入れるAccept部分
public class AcceptAction implements Action { private final Cache cache; public AcceptAction(final Cache cache){ this.cache = cache; } public void execute(SelectionKey selectionKey) { SocketChannel channel = null; try { channel = ((ServerSocketChannel) selectionKey.channel()).accept(); channel.configureBlocking(false); ReadAction action = new ReadAction(cache, new ByteBufferReader(channel)); channel.register(selectionKey.selector(), SelectionKey.OP_READ, action); } catch (IOException e) { e.printStackTrace(); } } }
んで、読み込みと書き込みは分けた
public class ReadAction implements Action { protected final Cache cache; protected final Reader reader; protected final Handler handler; public ReadAction(final Cache cache, final Reader reader){ this.cache = cache; this.reader = reader; this.handler = new Handler(cache, reader); } public void execute(SelectionKey selectionKey) { if(!selectionKey.isReadable()){ return; } SocketChannel channel = (SocketChannel) selectionKey.channel(); try { if(!reader.readable()){ channel.close(); selectionKey.cancel(); return; } String line = reader.readLine(); if(line == null){ channel.close(); selectionKey.cancel(); return; } Return r = handler.execute(line); WriteAction action = new WriteAction(r, this); channel.register(selectionKey.selector(), SelectionKey.OP_WRITE, action); } catch(IOException e){ e.printStackTrace(); } } private static class Handler implements CommandVisitor { private final Cache cache; private final Reader reader; private Handler(final Cache cache, final Reader reader){ this.cache = cache; this.reader = reader; } public Return execute(String line){ try { final StringReader r = new StringReader(line); final MemcacheParser parser = new MemcacheParser(r); Command command = parser.Command(); return command.accept(this, null); } catch(ParseException e){ e.printStackTrace(); return new Return(ResponseType.ERROR); } catch(Exception e){ e.printStackTrace(); return new Return(ResponseType.SERVER_ERROR, e.getMessage()); } } : : // 省略 : public Return visit(RetrievalCommand command, Parameter parameter) { final List<String> keys = command.getKeys(); final List<Return> returns = new ArrayList<Return>(); for(final String key: keys){ Entry entry = cache.get(key); if(null == entry){ returns.add(new Return(ResponseType.END)); continue; } returns.add(new Return(ResponseType.SEND_VALUE, key, entry.getFlag(), entry.getLength(), entry.getValue() )); } return new Return(returns.toArray(new Return[returns.size()])); } public Return visit(SetCommand command, Parameter parameter) { try { final String nextLine = reader.readLine(); if(cache.set(command.getKey(), nextLine, command.getFlags(), command.getExpTime())){ return new Return(ResponseType.STORED); } return new Return(ResponseType.NOT_STORED); } catch(IOException e){ e.printStackTrace(); return new Return(ResponseType.ERROR); } } public Return visit(VersionCommand command, Parameter parameter) { return null; } } }
public class WriteAction implements Action { private static final Charset ASCII = Charset.forName("US-ASCII"); protected final Return ret; protected final ReadAction action; public WriteAction(final Return ret, ReadAction action){ this.ret = ret; this.action = action; } public void execute(SelectionKey selectionKey) { if(!selectionKey.isWritable()){ return; } SocketChannel channel = (SocketChannel) selectionKey.channel(); final CharBuffer buf = CharBuffer.wrap(ret.renderMessage()); try { ByteBuffer bytes = ASCII.encode(buf); int capacity = bytes.capacity(); int size = channel.write(bytes); if(size < capacity){ int sum = size; while(sum < capacity){ if(size == 0){ return; } bytes.position(sum); size = channel.write(bytes); sum += size; } } try { channel.register(selectionKey.selector(), SelectionKey.OP_READ, action); } catch(ClosedChannelException e){ e.printStackTrace(); } } catch(IOException e){ e.printStackTrace(); } } }
ということで、これをつかって簡単に比較すると
$target = array( array('host' => 'localhost', 'port' => 11211), array('host' => 'localhost', 'port' => 12221) ); foreach($target as $t){ $memcache = new Memcache; $memcache->connect($t['host'], $t['port']); $fail = 0; $elapsed = microtime(true); for($i = 0; $i < 1000; ++$i){ if(false === $memcache->set('hoge', '1234')){ echo 'ERROR!!', PHP_EOL; } if(false === $memcache->set('hoge', '123')){ echo 'ERROR!!!', PHP_EOL; } $value = $memcache->get('hoge'); if(false === $memcache->set('hoge', $value + 1)){ echo 'ERRROR!!!!', PHP_EOL; } $result = $memcache->get('hoge'); if($result != 124){ $fail++; } } echo 'target host => ', $t['host'], ' port =>', $t['port'], PHP_EOL; echo 'elapsed: ', (microtime(true) - $elapsed), PHP_EOL; echo 'fail => ', $fail, PHP_EOL; }
1) target host => localhost port =>11211 elapsed: 0.66518497467 fail => 0 target host => localhost port =>12221 elapsed: 0.870754003525 fail => 0 2) target host => localhost port =>11211 elapsed: 1.1857790947 fail => 0 target host => localhost port =>12221 elapsed: 0.868647098541 fail => 0
memcacheに匹敵してきた。
ってか、ByteBufferを読み書きするのは初めて書いたので、すごく手こずった。。
単純に \r\n までの一行を読みたいだけなのに。。
ということで、BufferedReaderのreadLine的なのを書いて色々とお茶を濁す。。
public class ByteBufferReader { protected final SocketChannel channel; protected final ByteBuffer buffer; public ByteBufferReader(final SocketChannel channel){ this.channel = channel; this.buffer = ByteBuffer.allocateDirect(512); } public boolean readable() throws IOException { return buffer.hasRemaining(); } public String readLine() throws IOException { final StringBuilder sb = new StringBuilder(); readInto(sb); if(sb.length() < 1){ return null; } return sb.toString(); } private void readInto(final StringBuilder sb) throws IOException { while(true){ int read = channel.read(buffer); if(read == 0){ break; } if(read == -1){ return; } } buffer.flip(); while(more(sb)); buffer.compact(); } private boolean more(final StringBuilder sb) throws IOException { if(buffer.hasRemaining()){ char ch = (char) buffer.get(); switch(ch){ case '\n': return false; case '\r': return true; default: sb.append(ch); return true; } } buffer.clear(); return true; } }
とりあえず、一段落
NIOは難しかった。

S2Container/S2Dao.PHP5を使っていて困ったこと・質問などがありましたら、S2Container-PHP5のMLにてお願いします。