about: S2PHP5
about: Io Language
2010/02/07
HiveJDBC を S2JDBC 経由で使えるようにする
hadoop の話題。その3
HiveのJDBCを使えばリモート上で動いているHiveに対して、JDBC(over thrift)経由でHive QLを実行出来るのですごく便利です。
ref - Hive/HiveClient - Hadoop Wiki
HiveJDBCはフツーのJDBCっぽく使えるので、こんな感じで普通のDBサーバにSQLを投げる感覚で使える
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * Minimal example that runs a Hive QL query through the Hive JDBC driver
 * and prints the first column of every returned row to standard output.
 *
 * <p>The connection, statement and result set are never closed by this
 * sample; the process simply ends when {@code main} returns.
 */
public class HiveConnection {

    /**
     * Loads the Hive driver, connects to {@code jdbc:hive://master1:10000/}
     * with an empty user and password, and prints each distinct id.
     *
     * @param args ignored
     * @throws SQLException if connecting, executing or reading the result fails
     */
    public static void main(String... args) throws SQLException {
        registerDriver();

        Connection connection = DriverManager.getConnection("jdbc:hive://master1:10000/", "", "");
        Statement statement = connection.createStatement();
        statement.execute("SELECT distinct(id) FROM hoge");
        printFirstColumn(statement.getResultSet());
    }

    /** Loads the Hive JDBC driver class; exits with status 1 if it is not on the classpath. */
    private static void registerDriver() {
        try {
            Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /** Writes column 1 of every remaining row to standard output, one row per line. */
    private static void printFirstColumn(ResultSet rows) throws SQLException {
        while (rows.next()) {
            String value = rows.getString(1);
            System.out.println(value);
        }
    }
}
なので、便利なものは便利なものと合体させてしまえ。ということで、みんな大好き S2JDBCで使えるようにしてみた
HiveServerを起動/停止
とりあえず、HiveJDBCを利用するには、thrift経由でHiveServerを叩ける必要がある。
以下のようなスクリプトを用意
start.sh
#!/usr/bin/env bash
# Starts HiveServer in the background and records its PID.
#
# Reads HIVE_PID_DIR, HIVE_LOG_DIR and HIVE_HOME from the environment.
# Refuses to start (exit status 1) when the pid file already exists.

pidfile="$HIVE_PID_DIR/hiveserver.pid"
logfile="$HIVE_LOG_DIR/hiveserver.log"

# All expansions are quoted so that directories containing spaces or glob
# characters are not split into several words.
if [ -f "$pidfile" ]; then
  echo "running as process $(cat "$pidfile"). stop it first"
  exit 1
fi

# Detach from the terminal: stdout/stderr go to the log file, stdin is /dev/null.
nohup "$HIVE_HOME/bin/hive" --service hiveserver > "$logfile" 2>&1 < /dev/null &

# $! is the PID of the background job started just above.
echo $! > "$pidfile"
stop.sh
#!/usr/bin/env bash
# Stops the HiveServer whose PID is recorded in $HIVE_PID_DIR/hiveserver.pid.
#
# Reads HIVE_PID_DIR from the environment. Prints a message and does nothing
# else when the pid file does not exist.

pidfile="$HIVE_PID_DIR/hiveserver.pid"

# Expansions are quoted so that a directory containing spaces is handled as
# one path instead of being word-split.
if [ -f "$pidfile" ]; then
  kill "$(cat "$pidfile")"
  # The pid file is removed whether or not kill succeeded.
  rm "$pidfile"
else
  echo "no pidfile $pidfile"
fi
これで、実行したサーバでデフォルトのポート10000で起動する。
ちなみに、HADOOP_CONF_DIRとかHIVE_CONF_DIRは別途ちゃんと設定すること
HiveDialectを用意
先に書いておくと、SQLのorder by ...に相当するものは、HiveQLだと sort by ...なんだけど、今はまだ用意してない。今度書く
package org.seasar.extension.jdbc.dialect;

import org.seasar.extension.jdbc.SelectForUpdateType;

/**
 * S2JDBC dialect for Hive QL.
 *
 * <p>Only {@code limit} is reported as supported; every other optional
 * capability overridden here returns {@code false}.
 */
public class HiveDialect extends StandardDialect {

    /** @return the dialect name, {@code "hive"} */
    @Override
    public String getName() {
        return "hive";
    }

    // ---- paging -------------------------------------------------------

    @Override
    public boolean supportsLimit() {
        return true;
    }

    @Override
    public boolean supportsOffset() {
        return false;
    }

    @Override
    public boolean supportsOffsetWithoutLimit() {
        return false;
    }

    /**
     * Appends {@code " limit <limit>"} to the given statement.
     *
     * @param sql    the statement to extend
     * @param offset not used by this implementation
     * @param limit  the maximum number of rows
     * @return {@code sql} followed by the limit clause
     */
    @Override
    public String convertLimitSql(String sql, int offset, int limit) {
        return sql + " limit " + limit;
    }

    // ---- locking ------------------------------------------------------

    @Override
    public boolean supportsForUpdate(SelectForUpdateType type, boolean withTarget) {
        return false;
    }

    @Override
    public boolean supportsInnerJoinForUpdate() {
        return false;
    }

    @Override
    public boolean supportsOuterJoinForUpdate() {
        return false;
    }

    @Override
    public boolean supportsLockHint() {
        return false;
    }

    // ---- key generation -----------------------------------------------

    @Override
    public boolean supportsGetGeneratedKeys() {
        return false;
    }

    @Override
    public boolean supportsIdentity() {
        return false;
    }

    @Override
    public boolean supportsSequence() {
        return false;
    }

    // ---- miscellaneous ------------------------------------------------

    @Override
    public boolean supportsBatchUpdateResults() {
        return false;
    }

    @Override
    public boolean supportsCursor() {
        return false;
    }
}
HiveConnectionPoolを用意
これは、HiveがXA transactionとかconnection#closeなんかを実行することができない(未実装のコードなので...svn:..jdbc/HiveConnection)ので ConnectionPoolImpl を継承して ConnectionWrapper を取る部分を override
package org.seasar.extension.dbcp.impl; import java.sql.Connection; import java.sql.SQLException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.sql.XAConnection; import javax.transaction.Transaction; import org.seasar.extension.dbcp.ConnectionWrapper; import org.seasar.framework.util.TransactionManagerUtil; public class HiveConnectionPool extends ConnectionPoolImpl { private Map<Transaction, ConnectionWrapper> txActivePool = new ConcurrentHashMap<Transaction, ConnectionWrapper>(); @Override public synchronized ConnectionWrapper checkOut() throws SQLException { Transaction tx = getTransaction(); ConnectionWrapper conn = txActivePool.get(tx); if(null == conn){ conn = createConnection(tx); } if(null != tx){ txActivePool.put(tx, conn); } return conn; } protected ConnectionWrapper createConnection(Transaction transaction) throws SQLException { XAConnection xaConnection = getXADataSource().getXAConnection(); Connection connection = xaConnection.getConnection(); return new HiveConnectionWrapper(xaConnection, connection, this, transaction); } protected Transaction getTransaction(){ return TransactionManagerUtil.getTransaction(getTransactionManager()); } }
HiveConnectionWrapper を用意する
これは後述するPreparedStatement対策として用意。PreparedStatementWrapper さえも wrapper を用意する必要があるので、ConnectionWrapperImpl を override
package org.seasar.extension.dbcp.impl; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import javax.sql.XAConnection; import javax.transaction.Transaction; import org.seasar.extension.dbcp.ConnectionPool; import org.seasar.framework.exception.SSQLException; public class HiveConnectionWrapper extends ConnectionWrapperImpl { public HiveConnectionWrapper(XAConnection xaConnection, Connection physicalConnection, ConnectionPool connectionPool, Transaction tx) throws SQLException { super(xaConnection, physicalConnection, connectionPool, tx); } protected SQLException wrapException(final SQLException e, final String sql) { return new SSQLException("ESSR0072", new Object[] { sql, e.getMessage(), new Integer(e.getErrorCode()), e.getSQLState() }, e .getSQLState(), e.getErrorCode(), e, sql); } protected void assertOpened() throws SQLException { if (isClosed()) { throw new SSQLException("ESSR0062", null); } } @Override public PreparedStatement prepareStatement(final String sql) throws SQLException { assertOpened(); try { return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql), sql); } catch (final SQLException ex) { release(); throw wrapException(ex, sql); } } @Override public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException { assertOpened(); try { return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, resultSetType, resultSetConcurrency), sql); } catch (final SQLException ex) { release(); throw wrapException(ex, sql); } } @Override public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { assertOpened(); try { return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql); } catch (final 
SQLException ex) { release(); throw wrapException(ex, sql); } } @Override public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException { assertOpened(); try { return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, autoGeneratedKeys), sql); } catch (final SQLException ex) { release(); throw wrapException(ex, sql); } } @Override public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException { assertOpened(); try { return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, columnIndexes), sql); } catch (final SQLException ex) { release(); throw wrapException(ex, sql); } } @Override public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException { assertOpened(); try { return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, columnNames), sql); } catch (final SQLException ex) { release(); throw wrapException(ex, sql); } } }
HivePreparedStatementWrapperを用意する
これもConnectionと同じように、PreparedStatementの大半のコードが未実装(svn:jdbc/HivePreparedStatement)なので、PreparedStatementWrapper を継承して override
package org.seasar.extension.dbcp.impl; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import org.seasar.extension.jdbc.impl.PreparedStatementWrapper; import org.seasar.framework.exception.SSQLException; public class HivePreparedStatementWrapper extends PreparedStatementWrapper { protected final PreparedStatement original; protected final String sql; public HivePreparedStatementWrapper(PreparedStatement original, String sql) { super(original, sql); this.original = original; this.sql = sql; } protected SQLException wrapException(final SQLException e) { return wrapException(e, sql); } protected SQLException wrapException(final SQLException e, final String sql) { if (sql != null) { return new SSQLException("ESSR0072", new Object[] { sql, String.valueOf(e.getErrorCode()), e.getSQLState() }, e .getSQLState(), e.getErrorCode(), e, sql); } return e; } @Override public void close() { // HivePreparedStatement was not supported in #close } @Override public ResultSet executeQuery() throws SQLException { try { return new HiveResultSetWrapper(original.executeQuery()); } catch (final SQLException e) { throw wrapException(e); } } @Override public ResultSet executeQuery(final String sql) throws SQLException { try { return new HiveResultSetWrapper(original.executeQuery(sql)); } catch (final SQLException e) { throw wrapException(e, sql); } } @Override public ResultSet getResultSet() throws SQLException { try { return new HiveResultSetWrapper(original.getResultSet()); } catch (final SQLException e) { throw wrapException(e); } } @Override public ResultSet getGeneratedKeys() throws SQLException { try { return new HiveResultSetWrapper(original.getGeneratedKeys()); } catch (final SQLException e) { throw wrapException(e); } } }
HiveResultSetWrapper を用意
まだまだ続きます...HiveResultSet にも未実装のメソッドがある(svn:jdbc/HiveResultSet)ので、ResultSetWrapper を継承して override
package org.seasar.extension.dbcp.impl; import java.sql.Date; import java.sql.ResultSet; import java.sql.SQLException; import org.seasar.extension.jdbc.impl.ResultSetWrapper; public class HiveResultSetWrapper extends ResultSetWrapper { protected static final String DOUBLE_QUOTE = "\""; protected final ResultSet rs; public HiveResultSetWrapper(ResultSet rs){ super(rs); this.rs = rs; } @Override public void close() throws SQLException { // HiveResultSet was not supported in #close } protected Date parseDate(final String dt) throws Exception { String tmp = dt; if(dt.startsWith(DOUBLE_QUOTE) && dt.endsWith(DOUBLE_QUOTE)){ int length = dt.length(); tmp = dt.substring(1, length - 1); } return Date.valueOf(tmp); } @Override public Date getDate(int columnIndex) throws SQLException { Object obj = getObject(columnIndex); if (obj == null) { return null; } if(obj instanceof String){ try { return parseDate((String) obj); } catch(Exception e){ throw new SQLException("Cannot convert column " + columnIndex + " to date: " + e.toString()); } } return super.getDate(columnIndex); } }
jdbc.diconを修正
やっと、s2jdbcに近づいてきました...
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE components PUBLIC "-//SEASAR2.1//DTD S2Container//EN"
    "http://www.seasar.org/dtd/components21.dtd">
<components namespace="jdbc_hive">

    <!-- Hive itself does not support JTA. jta.dicon is still included;
         presumably it supplies the TransactionManager referenced by hivePool below. -->
    <include path="jta.dicon"/>

    <!-- XA data source pointing at the HiveServer (empty user and password). -->
    <component name="hiveDataSource" class="org.seasar.extension.dbcp.impl.XADataSourceImpl">
        <property name="driverClassName">"org.apache.hadoop.hive.jdbc.HiveDriver"</property>
        <property name="URL">"jdbc:hive://master1:10000/"</property>
        <property name="user">""</property>
        <property name="password">""</property>
    </component>

    <!-- Data source named "hive", backed by the Hive-specific connection pool. -->
    <component name="hive" class="org.seasar.extension.dbcp.impl.DataSourceImpl" autoBinding="none">
        <arg>
            <component name="hivePool" class="org.seasar.extension.dbcp.impl.HiveConnectionPool"
                    autoBinding="none">
                <property name="xaDataSource">hiveDataSource</property>
                <property name="timeout">600</property>
                <property name="maxPoolSize">10</property>
                <property name="allowLocalTx">true</property>
                <property name="readOnly">true</property>
                <property name="transactionManager">TransactionManager</property>
                <destroyMethod name="close"/>
            </component>
        </arg>
    </component>

</components>
s2jdbc.diconを用意
dialectをs2jdbcに読ませる
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE components PUBLIC "-//SEASAR//DTD S2Container 2.4//EN"
    "http://www.seasar.org/dtd/components24.dtd">
<components>

    <include path="jta.dicon"/>
    <include path="tx.dicon"/>
    <include path="s2jdbc-internal.dicon"/>
    <include path="jdbc-hive.dicon" />

    <!-- Dialect used by the JdbcManager below. -->
    <component name="hiveDialect" class="org.seasar.extension.jdbc.dialect.HiveDialect" />

    <!-- JdbcManager for Hive; maxRows, fetchSize and queryTimeout are all set to 0. -->
    <component name="hiveJdbcManager" class="org.seasar.extension.jdbc.manager.JdbcManagerImpl">
        <property name="maxRows">0</property>
        <property name="fetchSize">0</property>
        <property name="queryTimeout">0</property>
        <property name="dialect">hiveDialect</property>
        <initMethod name="init" />
    </component>

</components>
S2JDBC経由でHiveQLを投げてみる
Hiveはinsert文は無いので、LOAD DATA 文です。
しかも、preparedStatementで値のbindできないので、直接クエリを書きます。
さらに、getSingleResult であっても、何も結果セットがかえってこない(svn:hive/service/HiveServer)ので、selectBySql(Integer.class...)とかはできません。
あきらめて、selectBySql(String.class) で void にしてます。。
selectについても、preparedStatementで値のbindができないので、直接クエリを書きます。
今はまだ、order by の書き換え(sort by)を行っていないので、orderByが使えません。直接クエリに書きます。
joinについても少し難あり。。。
等など、ということで、いくらか諦めると...!
/**
 * Sample service that loads files into the partitioned Hive table
 * {@code hoge} and reads rows back through S2JDBC.
 *
 * <p>All load statements are issued through {@code selectBySql(String.class, ...)}
 * and the result is discarded.
 *
 * <p>NOTE(review): {@code path}, {@code part} and {@code subPart} are
 * concatenated into the statement without escaping. Callers must not pass
 * untrusted values; a value containing a single quote produces a broken or
 * altered statement.
 */
@Component
@InterType("aop.propertyInterType")
public class HogeService {

    @Property(PropertyType.WRITE)
    protected JdbcManager hive;

    /** Appends the file at {@code path} to partition {@code (p1 = part, p2 = subPart)}. */
    public void insert(String path, String part, String subPart) {
        load(path, part, subPart, false, false);
    }

    /** Same as {@link #insert} but reads {@code path} with {@code LOAD DATA LOCAL}. */
    public void insertFromLocal(String path, String part, String subPart) {
        load(path, part, subPart, true, false);
    }

    /** Replaces partition {@code (p1 = part, p2 = subPart)} with the file at {@code path}. */
    public void overwrite(String path, String part, String subPart) {
        load(path, part, subPart, false, true);
    }

    /** Same as {@link #overwrite} but reads {@code path} with {@code LOAD DATA LOCAL}. */
    public void overwriteFromLocal(String path, String part, String subPart) {
        // Previously issued without LOCAL, which made it identical to overwrite().
        load(path, part, subPart, true, true);
    }

    /** Returns up to 100 rows of {@code Hoge} matching {@code id > 1234}. */
    public List<Hoge> getHoge() {
        return hive.from(Hoge.class).where("id > 1234").limit(100).getResultList();
    }

    /** Builds the LOAD DATA statement and executes it, ignoring the result. */
    private void load(String path, String part, String subPart, boolean local, boolean overwrite) {
        String statement = buildLoadStatement(path, part, subPart, local, overwrite);
        hive.selectBySql(String.class, statement).getSingleResult();
    }

    /**
     * Builds
     * {@code LOAD DATA [LOCAL] INPATH '<path>' [OVERWRITE] INTO TABLE hoge PARTITION (p1 = '<part>', p2 = '<subPart>')}.
     */
    private static String buildLoadStatement(String path, String part, String subPart,
            boolean local, boolean overwrite) {
        StringBuilder hql = new StringBuilder("LOAD DATA ");
        if (local) {
            hql.append("LOCAL ");
        }
        hql.append("INPATH '").append(path).append("' ");
        if (overwrite) {
            hql.append("OVERWRITE ");
        }
        hql.append("INTO TABLE hoge PARTITION (p1 = '").append(part)
                .append("', p2 = '").append(subPart)
                // The closing parenthesis of the PARTITION clause was missing before.
                .append("')");
        return hql.toString();
    }
}
\(^o^)/ やたー!うごいたー!
おわり。
ってことで、とりあえず、HiveをS2JDBC経由で利用できるようになったよ!
Seasarのパッケージはホント便利!ほとんどwrapperが用意されてるし、diconで切り替えれるから修正が楽!
でも、coreなパッケージほど、 private メソッド多いよ!せめて protected か getter を用意して!(無駄にコピーしちゃったのがいくつかある...)
ああああ、でもこれなら HiveJDBC をちゃんと修正した方が早かったかも。。orz
今度やってみよう。。
HDFS の HadoopThriftServer をなんとかする
hadoop の話題。その2
hadoop を支える HDFS には HDFS-APIを通すことで、プログラム中から HDFS の読み書きができるようになります。(たぶん、hdfs-s3 なんかもこのAPI経由(? ソース読んでない))
(中略)
んで、この HDFS-API のなかに、Thrift を使って リモート上から HDFS の読み書きをできるようにしている HadoopThriftServer(thriftfs) があります。
この thriftfs の起動は に書かれているのですが、shellを握ってしまうのでこんな感じにしました。
#!/usr/bin/env bash
# Starts HadoopThriftServer on port 10010 in the background, records its PID,
# then asks the name node to leave safe mode.
#
# Reads HADOOP_PID_DIR, HADOOP_LOG_DIR, HADOOP_CONF_DIR and HADOOP_HOME from
# the environment. Refuses to start (exit status 1) when the pid file exists.

THRIFTFS_PID_FILE="$HADOOP_PID_DIR/thrift.pid"
THRIFTFS_LOG_FILE="$HADOOP_LOG_DIR/thrift.log"

# All expansions are quoted so that paths containing spaces are not word-split.
if [ -f "$THRIFTFS_PID_FILE" ]; then
  echo "running as process $(cat "$THRIFTFS_PID_FILE"). stop it first"
  exit 1
fi

# Configuration directory first, then the Hadoop, logging and thriftfs jars.
CLASSPATH="$HADOOP_CONF_DIR"
CLASSPATH="$CLASSPATH:$HADOOP_HOME/hadoop-0.20.1-core.jar:$HADOOP_HOME/hadoop-0.20.1-tools.jar"
CLASSPATH="$CLASSPATH:$HADOOP_HOME/lib/commons-logging-1.0.4.jar"
CLASSPATH="$CLASSPATH:$HADOOP_HOME/lib/commons-logging-api-1.0.4.jar"
CLASSPATH="$CLASSPATH:$HADOOP_HOME/lib/log4j-1.2.15.jar"
CLASSPATH="$CLASSPATH:$HADOOP_HOME/contrib/thriftfs/hadoop-0.20.1-thriftfs.jar"
CLASSPATH="$CLASSPATH:$HADOOP_HOME/src/contrib/thriftfs/lib/hadoopthriftapi.jar"
CLASSPATH="$CLASSPATH:$HADOOP_HOME/src/contrib/thriftfs/lib/libthrift.jar"

# Detach from the terminal: stdout/stderr go to the log file, stdin is /dev/null.
nohup java -Dcom.sun.management.jmxremote -cp "$CLASSPATH" \
  org.apache.hadoop.thriftfs.HadoopThriftServer 10010 \
  > "$THRIFTFS_LOG_FILE" 2>&1 < /dev/null &

# $! is the PID of the background job started just above.
echo $! > "$THRIFTFS_PID_FILE"

"$HADOOP_HOME/bin/hadoop" dfsadmin -safemode leave
同様に 停止は
#!/usr/bin/env bash
# Stops the HadoopThriftServer whose PID is recorded in $HADOOP_PID_DIR/thrift.pid.
#
# Reads HADOOP_PID_DIR from the environment. Prints a message and does nothing
# else when the pid file does not exist.

THRIFT_FS_PID_FILE="$HADOOP_PID_DIR/thrift.pid"

# Expansions are quoted so that a directory containing spaces is handled as
# one path instead of being word-split.
if [ -f "$THRIFT_FS_PID_FILE" ]; then
  kill "$(cat "$THRIFT_FS_PID_FILE")"
  # The pid file is removed whether or not kill succeeded.
  rm "$THRIFT_FS_PID_FILE"
else
  echo "no pidfile $THRIFT_FS_PID_FILE"
fi
起動する際は dfs($HADOOP_HOME/bin/start-dfs.sh)が起動している状態で起動する必要があります。
(dfsadmin -safemode leave は適宜行ってください)
んで、このThriftFSは、こんな感じでリモート上の DFS のファイルを読み書きできます
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.thriftfs.api.Pathname; import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem; import org.apache.hadoop.thriftfs.api.ThriftHandle; import org.apache.hadoop.thriftfs.api.ThriftIOException; import com.facebook.thrift.TException; import com.facebook.thrift.protocol.TBinaryProtocol; import com.facebook.thrift.protocol.TProtocol; import com.facebook.thrift.transport.TSocket; import com.facebook.thrift.transport.TTransportException; public class HDFSInput { public static void main(String...args) { final int defaultBufferSize = config.getInt("io.file.buffer.size", 4096); final long defaultBlockSize = config.getLong("dfs.block.size", FSConstants.DEFAULT_BLOCK_SIZE); final short defaultReplication = (short) config.getInt("dfs.replication", 3); TSocket socket = new TSocket("master1", 10010); TProtocol protocol = new TBinaryProtocol(socket); try { socket.open(); try { ThriftHadoopFileSystem.Client client = new ThriftHadoopFileSystem.Client(protocol); // client timeout 5 sec client.setTimeout(5); Pathname hoge = new Pathname("/tmp/hoge"); FsPermission permission = FsPermission.createImmutable((short) 0655); boolean overwrite = true; ThriftHandle writeHandler = client.createFile(hoge, permission.toShort(), overwrite, defaultBufferSize, defaultReplication, defaultBlockSize); client.write(writeHandler, "hello world"); client.close(writeHandler); ThriftHandle readHandler = client.open(hoge); System.out.println(client.read(readHandler, 0, 1024)); client.close(readHandler); } catch (TTransportException e) { e.printStackTrace(); } catch (ThriftIOException e) { e.printStackTrace(); } } catch (TException e) { e.printStackTrace(); } finally { socket.close(); } } }
read時にBufferedReaderにwrapするともう少し便利に読み書きできる(これは今度書く)
んで、読み書きできるようになったんだけど、どうも複数のクライアントから連続して読み書きをすると、整合性が取れなくなってしまう(?)のかエラーがでるようになった。
元のソースを読むと...なんともエレガントな。。。
ということで、java.util.concurrent.atomic を使って書き直してみた(ここが本題)
package org.apache.hadoop.thriftfs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.thriftfs.api.Pathname;
import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
import org.apache.hadoop.thriftfs.api.ThriftHandle;
import org.apache.hadoop.thriftfs.api.ThriftIOException;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;

import com.facebook.thrift.protocol.TBinaryProtocol;
import com.facebook.thrift.server.TServer;
import com.facebook.thrift.server.TThreadPoolServer;
import com.facebook.thrift.transport.TServerSocket;
import com.facebook.thrift.transport.TServerTransport;
import com.facebook.thrift.transport.TTransportFactory;

/**
 * Thrift front end for a Hadoop {@link FileSystem}.
 *
 * <p>Open streams are kept in a concurrent map keyed by a handle id so that
 * worker threads of the thread-pool server can serve several clients at once.
 */
public class HadoopThriftServer extends ThriftHadoopFileSystem {

  static int serverPort = 0;                    // default port (0 = pick a free one)
  TServer    server = null;

  /**
   * Handler that maps each Thrift call onto the corresponding
   * {@link FileSystem} operation.
   */
  public static class HadoopThriftHandler implements ThriftHadoopFileSystem.Iface {

    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.thrift");

    // HDFS glue
    Configuration conf;
    FileSystem fs;

    // Structure that maps each Thrift handle id onto a hadoop stream object.
    // Ids are generated atomically and the map is concurrent, so no extra
    // locking is needed around them.
    private final AtomicLong nextId = new AtomicLong(new Random().nextLong());
    private final ConcurrentHashMap<Long, Object> hadoopHash =
        new ConcurrentHashMap<Long, Object>();
    private Daemon inactivityThread = null;

    // Detect inactive session
    private static volatile long inactivityPeriod = 3600 * 1000; // 1 hr
    private static volatile long inactivityRecheckInterval = 60 * 1000;
    private static volatile boolean fsRunning = true;
    // Timestamp (msec) of the most recent RPC, refreshed by every API call.
    private final AtomicLong now = new AtomicLong(now());

    // allow outsider to change the hadoopthrift path
    public void setOption(String key, String val) {
    }

    /**
     * Current system time.
     * @return current time in msec.
     */
    static long now() {
      return System.currentTimeMillis();
    }

    /**
     * getVersion
     *
     * @return current version of the interface.
     */
    public String getVersion() {
      return "0.1";
    }

    /**
     * shutdown
     *
     * cleanly closes everything and exit.
     *
     * @param status process exit status
     */
    public void shutdown(int status) {
      LOG.info("HadoopThriftServer shutting down.");
      try {
        fs.close();
      } catch (IOException e) {
        LOG.warn("Unable to close file system");
      }
      Runtime.getRuntime().exit(status);
    }

    /**
     * Periodically checks to see if there is inactivity
     */
    class InactivityMonitor implements Runnable {
      public void run() {
        while (fsRunning) {
          try {
            if (now() > now.get() + inactivityPeriod) {
              LOG.warn("HadoopThriftServer Inactivity period of " +
                       inactivityPeriod + " expired... Stopping Server.");
              shutdown(-1);
            }
          } catch (Exception e) {
            LOG.error(StringUtils.stringifyException(e));
          }
          try {
            Thread.sleep(inactivityRecheckInterval);
          } catch (InterruptedException ie) {
            // Restore the interrupt flag and stop monitoring instead of
            // silently swallowing the interruption.
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    }

    /**
     * HadoopThriftServer
     *
     * Constructor for the HadoopThriftServer glue with Thrift Class.
     *
     * @param name - the name of this handler
     */
    public HadoopThriftHandler(String name) {
      conf = new Configuration();
      now.set(now());
      try {
        // NOTE(review): the monitor thread is created but never started in
        // this class; confirm whether that is intentional.
        inactivityThread = new Daemon(new InactivityMonitor());
        fs = FileSystem.get(conf);
      } catch (IOException e) {
        LOG.warn("Unable to open hadoop file system...");
        Runtime.getRuntime().exit(-1);
      }
    }

    /**
     * printStackTrace
     *
     * Helper function to print an exception stack trace to the log and not stderr
     *
     * @param e the exception
     *
     */
    static private void printStackTrace(Exception e) {
      for (StackTraceElement s : e.getStackTrace()) {
        LOG.error(s);
      }
    }

    /**
     * Lookup a thrift object into a hadoop object
     *
     * @return the registered object, or null when the id is unknown
     */
    private Object lookup(long id) {
      return hadoopHash.get(Long.valueOf(id));
    }

    /**
     * Lookup a handle that must refer to an open output stream.
     *
     * @throws ThriftIOException when the id is unknown or is not an output stream
     */
    private FSDataOutputStream lookupOutput(long id) throws ThriftIOException {
      Object o = lookup(id);
      if (!(o instanceof FSDataOutputStream)) {
        throw new ThriftIOException("Unknown thrift handle.");
      }
      return (FSDataOutputStream) o;
    }

    /**
     * Lookup a handle that must refer to an open input stream.
     *
     * @throws ThriftIOException when the id is unknown or is not an input stream
     */
    private FSDataInputStream lookupInput(long id) throws ThriftIOException {
      Object o = lookup(id);
      if (!(o instanceof FSDataInputStream)) {
        throw new ThriftIOException("Unknown thrift handle.");
      }
      return (FSDataInputStream) o;
    }

    /**
     * Insert a thrift object into a hadoop object. Return its id.
     */
    private long insert(Object o) {
      long next = nextId.incrementAndGet();
      hadoopHash.put(Long.valueOf(next), o);
      return next;
    }

    /**
     * Delete a thrift object from the hadoop store.
     *
     * @return the removed object, or null when the id is unknown
     */
    private Object remove(long id) {
      return hadoopHash.remove(Long.valueOf(id));
    }

    /**
     * Convert a hadoop file status into its thrift representation.
     */
    private static org.apache.hadoop.thriftfs.api.FileStatus toThrift(
        org.apache.hadoop.fs.FileStatus stat) {
      return new org.apache.hadoop.thriftfs.api.FileStatus(
          stat.getPath().toString(),
          stat.getLen(),
          stat.isDir(),
          stat.getReplication(),
          stat.getBlockSize(),
          stat.getModificationTime(),
          stat.getPermission().toString(),
          stat.getOwner(),
          stat.getGroup());
    }

    /**
     * Implement the API exported by this thrift server
     */

    /** Set inactivity timeout period. The period is specified in seconds.
      * if there are no RPC calls to the HadoopThrift server for this much
      * time, then the server kills itself.
      */
    public synchronized void setInactivityTimeoutPeriod(long periodInSeconds) {
      inactivityPeriod = periodInSeconds * 1000; // in milli seconds
      if (inactivityRecheckInterval > inactivityPeriod) {
        inactivityRecheckInterval = inactivityPeriod;
      }
    }

    /**
     * Create a file and open it for writing
     */
    public ThriftHandle create(Pathname path) throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("create: " + path);
        FSDataOutputStream out = fs.create(new Path(path.pathname));
        long id = insert(out);
        ThriftHandle obj = new ThriftHandle(id);
        LOG.debug("created: " + path + " id: " + id);
        return obj;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Create a file and open it for writing, delete file if it exists
     */
    public ThriftHandle createFile(Pathname path,
                                   short mode,
                                   boolean overwrite,
                                   int bufferSize,
                                   short replication,
                                   long blockSize) throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("create: " + path +
                  " permission: " + mode +
                  " overwrite: " + overwrite +
                  " bufferSize: " + bufferSize +
                  " replication: " + replication +
                  " blockSize: " + blockSize);
        FSDataOutputStream out = fs.create(new Path(path.pathname),
                                           new FsPermission(mode),
                                           overwrite,
                                           bufferSize,
                                           replication,
                                           blockSize,
                                           null); // progress
        long id = insert(out);
        ThriftHandle obj = new ThriftHandle(id);
        LOG.debug("created: " + path + " id: " + id);
        return obj;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Opens an existing file and returns a handle to read it
     */
    public ThriftHandle open(Pathname path) throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("open: " + path);
        FSDataInputStream out = fs.open(new Path(path.pathname));
        long id = insert(out);
        ThriftHandle obj = new ThriftHandle(id);
        LOG.debug("opened: " + path + " id: " + id);
        return obj;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Opens an existing file to append to it.
     */
    public ThriftHandle append(Pathname path) throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("append: " + path);
        FSDataOutputStream out = fs.append(new Path(path.pathname));
        long id = insert(out);
        ThriftHandle obj = new ThriftHandle(id);
        LOG.debug("appended: " + path + " id: " + id);
        return obj;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * write to a file
     *
     * @throws ThriftIOException on I/O failure or when the handle is not an
     *         open output stream
     */
    public boolean write(ThriftHandle tout, String data) throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("write: " + tout.id);
        FSDataOutputStream out = lookupOutput(tout.id);
        byte[] tmp = data.getBytes("UTF-8");
        out.write(tmp, 0, tmp.length);
        LOG.debug("wrote: " + tout.id);
        return true;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * read from a file
     *
     * @return the decoded bytes, or an empty string when nothing could be
     *         read (for example at end of file)
     * @throws ThriftIOException on I/O failure, negative length, or when the
     *         handle is not an open input stream
     */
    public String read(ThriftHandle tout, long offset,
                       int length) throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("read: " + tout.id +
                  " offset: " + offset +
                  " length: " + length);
        if (length < 0) {
          throw new ThriftIOException("Negative read length: " + length);
        }
        FSDataInputStream in = lookupInput(tout.id);
        if (in.getPos() != offset) {
          in.seek(offset);
        }
        byte[] tmp = new byte[length];
        int numbytes = in.read(offset, tmp, 0, length);
        LOG.debug("read done: " + tout.id);
        // The positional read reports end of file as -1, which must not be
        // passed on to the String constructor as a length.
        if (numbytes <= 0) {
          return "";
        }
        return new String(tmp, 0, numbytes, "UTF-8");
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Delete a file/directory
     */
    public boolean rm(Pathname path, boolean recursive)
                          throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("rm: " + path + " recursive: " + recursive);
        boolean ret = fs.delete(new Path(path.pathname), recursive);
        LOG.debug("rm: " + path);
        return ret;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Move a file/directory
     */
    public boolean rename(Pathname path, Pathname dest)
                          throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("rename: " + path + " destination: " + dest);
        boolean ret = fs.rename(new Path(path.pathname),
                                new Path(dest.pathname));
        LOG.debug("rename: " + path);
        return ret;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * close file
     *
     * @throws ThriftIOException on I/O failure or when the handle is unknown
     */
    public boolean close(ThriftHandle tout) throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("close: " + tout.id);
        // Removing first guarantees that only one caller closes the stream.
        Object obj = remove(tout.id);
        if (obj instanceof FSDataOutputStream) {
          FSDataOutputStream out = (FSDataOutputStream) obj;
          out.close();
        } else if (obj instanceof FSDataInputStream) {
          FSDataInputStream in = (FSDataInputStream) obj;
          in.close();
        } else {
          throw new ThriftIOException("Unknown thrift handle.");
        }
        LOG.debug("closed: " + tout.id);
        return true;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Create a directory
     */
    public boolean mkdirs(Pathname path) throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("mkdirs: " + path);
        boolean ret = fs.mkdirs(new Path(path.pathname));
        LOG.debug("mkdirs: " + path);
        return ret;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Does this pathname exist?
     */
    public boolean exists(Pathname path) throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("exists: " + path);
        boolean ret = fs.exists(new Path(path.pathname));
        LOG.debug("exists done: " + path);
        return ret;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Returns status about the specified pathname
     */
    public org.apache.hadoop.thriftfs.api.FileStatus stat(
                            Pathname path) throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("stat: " + path);
        org.apache.hadoop.fs.FileStatus stat = fs.getFileStatus(
                                           new Path(path.pathname));
        LOG.debug("stat done: " + path);
        return toThrift(stat);
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * If the specified pathname is a directory, then return the
     * list of pathnames in this directory
     */
    public List<org.apache.hadoop.thriftfs.api.FileStatus> listStatus(
                            Pathname path) throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("listStatus: " + path);
        org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus(
                                           new Path(path.pathname));
        LOG.debug("listStatus done: " + path);
        List<org.apache.hadoop.thriftfs.api.FileStatus> value =
          new LinkedList<org.apache.hadoop.thriftfs.api.FileStatus>();
        for (int i = 0; i < stat.length; i++) {
          value.add(toThrift(stat[i]));
        }
        return value;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Sets the permission of a pathname
     */
    public void chmod(Pathname path, short mode) throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("chmod: " + path + " mode " + mode);
        fs.setPermission(new Path(path.pathname), new FsPermission(mode));
        LOG.debug("chmod done: " + path);
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Sets the owner & group of a pathname
     */
    public void chown(Pathname path, String owner, String group)
                                                       throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("chown: " + path +
                  " owner: " + owner +
                  " group: " + group);
        fs.setOwner(new Path(path.pathname), owner, group);
        LOG.debug("chown done: " + path);
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Sets the replication factor of a file
     */
    public void setReplication(Pathname path, short repl) throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("setrepl: " + path +
                  " replication factor: " + repl);
        fs.setReplication(new Path(path.pathname), repl);
        LOG.debug("setrepl done: " + path);
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Returns the block locations of this file
     */
    public List<org.apache.hadoop.thriftfs.api.BlockLocation>
             getFileBlockLocations(Pathname path, long start, long length)
                                         throws ThriftIOException {
      try {
        now.set(now());
        LOG.debug("getFileBlockLocations: " + path);
        org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(
                                                 new Path(path.pathname));
        org.apache.hadoop.fs.BlockLocation[] stat =
            fs.getFileBlockLocations(status, start, length);
        LOG.debug("getFileBlockLocations done: " + path);

        org.apache.hadoop.thriftfs.api.BlockLocation tmp;
        List<org.apache.hadoop.thriftfs.api.BlockLocation> value =
          new LinkedList<org.apache.hadoop.thriftfs.api.BlockLocation>();

        for (int i = 0; i < stat.length; i++) {

          // construct the list of hostnames from the array returned
          // by HDFS
          List<String> hosts = new LinkedList<String>();
          String[] hostsHdfs = stat[i].getHosts();
          for (int j = 0; j < hostsHdfs.length; j++) {
            hosts.add(hostsHdfs[j]);
          }

          // construct the list of host:port from the array returned
          // by HDFS
          List<String> names = new LinkedList<String>();
          String[] namesHdfs = stat[i].getNames();
          for (int j = 0; j < namesHdfs.length; j++) {
            names.add(namesHdfs[j]);
          }
          tmp = new org.apache.hadoop.thriftfs.api.BlockLocation(
                      hosts, names, stat[i].getOffset(), stat[i].getLength());
          value.add(tmp);
        }
        return value;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }
  }

  /**
   * Bind to port. If the specified port is 0, then bind to random port.
   *
   * @param port the port to listen on, or 0 for any free port
   * @return the bound server socket
   * @throws IOException when the socket cannot be created or bound
   */
  private ServerSocket createServerSocket(int port) throws IOException {
    ServerSocket sock = null;
    try {
      sock = new ServerSocket();
      // Prevent 2MSL delay problem on server restarts
      sock.setReuseAddress(true);
      // Bind to listening port
      if (port == 0) {
        sock.bind(null);
        serverPort = sock.getLocalPort();
      } else {
        sock.bind(new InetSocketAddress(port));
      }
      return sock;
    } catch (IOException ioe) {
      // Do not leak the half-initialised socket when binding fails.
      if (sock != null) {
        try {
          sock.close();
        } catch (IOException ignored) {
          // nothing more can be done; the bind failure is reported below
        }
      }
      IOException wrapped = new IOException(
          "Could not create ServerSocket on port " + port + "." + ioe);
      wrapped.initCause(ioe);
      throw wrapped;
    }
  }

  /**
   * Constrcts a server object
   *
   * @param args command line arguments; args[0], when present, is the port
   */
  public HadoopThriftServer(String [] args) {

    if (args.length > 0) {
      serverPort = Integer.parseInt(args[0]);
    }
    try {
      ServerSocket ssock = createServerSocket(serverPort);
      TServerTransport serverTransport = new TServerSocket(ssock);
      Iface handler = new HadoopThriftHandler("hdfs-thrift-dhruba");
      ThriftHadoopFileSystem.Processor processor =
          new ThriftHadoopFileSystem.Processor(handler);
      TThreadPoolServer.Options options = new TThreadPoolServer.Options();
      options.minWorkerThreads = 10;
      server = new TThreadPoolServer(processor, serverTransport,
                                     new TTransportFactory(),
                                     new TTransportFactory(),
                                     new TBinaryProtocol.Factory(),
                                     new TBinaryProtocol.Factory(),
                                     options);
      System.out.println("Starting the hadoop thrift server on port [" +
                         serverPort + "]...");
      HadoopThriftHandler.LOG.info("Starting the hadoop thrift server on port [" +
                                   serverPort + "]...");
      System.out.flush();

    } catch (Exception x) {
      x.printStackTrace();
    }
  }

  public static void main(String [] args) {
    HadoopThriftServer me = new HadoopThriftServer(args);
    // The constructor only logs failures, so `server` may still be null here.
    if (me.server == null) {
      System.err.println("Unable to start the hadoop thrift server.");
      System.exit(1);
    }
    me.server.serve();
  }
}
安定した。気がする。。
いつか問題となるようなコードを書いてpatchを作ろう
Hive の Local Metastore に derby を使う
突然ですが、hadoop の話題
Hadoop Hive といえば、SQL 感覚で MapReduce できるんですが、Hive は SQL のように記述できるようにするために、metastore 形式でメタデータを管理してます。
そのあたりは、Hive/AdminManual/MetastoreAdmin - Hadoop Wiki や metastore_usage.pptxやHiveのmetastoreをMySQLを使ってLocal Metastore形式で利用する - blog.katsuma.tv などに詳しく書かれているので省略。
ただ、local metastore にいちいち mysql をセットアップするのもあれだったんで、今回は derby にしておきます
derby編
derby のインストール
Apache Derbyからダウンロードしてきます。とりあえず最新のを落としてくる
> wget http://..../db-derby-10.5.3.0-bin.tar.gz
> tar xzvf db-derby-10.5.3.0-bin.tar.gz
> sudo mv db-derby-10.5.3.0-bin /opt/local/derby
次に、データ保存用の /var/db/derby を作っておく
> sudo mkdir /var/db/derby > sudo chown nowel:users /var/db/derby
chown は derby 用のユーザでもいいので、実行ユーザに変えておく
環境変数の設定
環境変数名とかは、Hadoop のそれっぽくしておく
#!/usr/bin/env bash export DERBY_HOME=/opt/local/derby export DERBY_OPTS="-Dderby.drda.startNetworkServer=true -Dderby.drda.maxThreads=30 -Dderby.system.home=/var/db/derby -Dderby.drda.portNumber=1527 -Dderby.drda.host=0.0.0.0" export DERBY_LOG_DIR=/var/log
これを $HOME/.derby_profile とかしておいて、.bashrcなんかに
source $HOME/.derby_profile
と記述しておくといちいち source しなくて楽
起動用のスクリプトを用意
start/stop/status くらいは何度か確認することがあるので、こんな感じで用意
start.sh
#!/usr/bin/env bash logfile=$DERBY_LOG_DIR/derby.log nohup $DERBY_HOME/bin/NetworkServerControl start > $logfile 2>&1 < /dev/null &
stop.sh
#!/usr/bin/env bash
$DERBY_HOME/bin/NetworkServerControl shutdown
status.sh
#!/usr/bin/env bash
$DERBY_HOME/bin/NetworkServerControl sysinfo
$DERBY_HOME/bin/NetworkServerControl runtimeinfo
とこんな感じ。
起動と停止
さっき用意した start.sh ファイル群を $HOME/bin とかに置いているのであれば
> $HOME/bin/start.sh
で起動できる
ちなみに、derbyはstartNetworkServerというスクリプトが用意されているけど、素のまま起動すると、ネットワーク越しにアクセスできない(正確には起動した同一ホストからしかアクセスできない)
ネットワーク越しに利用するならstartNetworkServer -h 0.0.0.0とするか、derby.drda.host=0.0.0.0みたいな変数を利用する。
(ここで結構ハマった...とりあえず、開発用なら 0.0.0.0 で始めるといいかと)
Hive編
hadoop の mapred/hdfs などはセットアップ済みとして進めます。
hive のセットアップ
先に書いておくと、http://www.apache.org/dyn/closer.cgi/hadoop/hive/とかに置いてある hive-0.4.1 を使ってセットアップを進めても
FAILED: Error in metadata: org.datanucleus.jdo.exceptions.TransactionNotReadableException: Cant read fields outside of transactions. You may want to set 'NontransactionalRead=true'. FailedObject:1[OID]org.apache.hadoop.hive.metastore.model.MDatabase FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask
が発生してなかなか前に進めなくなる。
Hive CLI の起動時に
hive > set javax.jdo.option.NontransactionalRead=true;
としておくことで、一時的になんとかなるけど、接続毎にやることになるので、おすすめはしません。
(jpox.properties を使うというのもあるけど、設定の二重管理になりそうだし、後述する新しいのでは必要なさそうなので今回は 0.4.1 を使わないという方向で進みます)
Hiveをtrunkからもってきてビルドする
たぶん、trunkに入ってるのは hive-0.5.x 系
> svn export http://svn.apache.org/repos/asf/hadoop/hive/trunk hive > cd hive > ant package > : > : (しばし待つ) > : > cp -r build/dist/lib/* lib > sudo mv hive /opt/local/hive-0.5.0-trunk
環境変数の設定
こんな感じで用意する
#!/usr/bin/env bash export HIVE_HOME=/opt/local/hive-0.5.0-trunk export HIVE_CONF_DIR=/home/hive/conf export HIVE_PID_DIR=/var/run export HIVE_LOG_DIR=/var/log
HIVE_CONF_DIRはHADOOP_CONF_DIRと違って、hive-default.xml を $HIVE_HOME/conf から読んでくれない(?)ので、hive-default.xmlとhive-log4j.propertiesは HIVE_CONF_DIR にシンボリックリンクしておく
> ln -s $HIVE_HOME/conf/hive-default.xml $HIVE_CONF_DIR/hive-default.xml > ln -s $HIVE_HOME/conf/hive-log4j.properties $HIVE_CONF_DIR/hive-log4j.properties
hive-site.xmlを用意
$HIVE_CONF_DIR に hive-site.xml をこんな感じで用意
<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>hive.metastore.local</name> <value>true</value> </property> <property> <name>hive.metastore.warehouse.dir</name> <value>/hive/warehouse</value> </property> <property> <name>hive.metastore.rawstore.impl</name> <value>org.apache.hadoop.hive.metastore.ObjectStore</value> </property> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:derby://master1:1527/metastore;create=true</value> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>org.apache.derby.jdbc.ClientDriver</value> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value><!-- !empty --></value> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value><!-- !empty --></value> </property> <property> <name>datanucleus.autoCreateTables</name> <value>true</value> </property> </configuration>
Hive CLIの起動
とりあえず、derby の metastore が使えるかどうかを確認するため、$HIVE_HOME/bin/hive で Hive CLI を起動して show tables する
> $HIVE_HOME/bin/hive hive> show tables; OK Time taken: 8.613 seconds hive> exit;
とここまで出れば derby で Local Metastore が使えるようになってます。
おわり
と、ここまで書いてみたけど、Hiveのwikiに同じようなものがあった orz
ref - HiveDerbyServerMode - Hadoop Wiki
2010/01/20
PHPで200行で作る memcached 互換サーバ
まさに誰得。
/**
 * Minimal blocking TCP server that speaks a subset of the memcached text
 * protocol on port 11222. Connections are served one at a time.
 */
class MemcachedServer {
    protected $command;

    public function __construct(MemcachedCommand $command){
        $this->command = $command;
    }

    public function start(){
        $this->run();
    }

    /**
     * Create the listening socket and serve accepted connections forever.
     *
     * @throws Exception when the socket cannot be created, bound or listened on
     */
    public function run(){
        $socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if(false === $socket){
            $code = socket_last_error();
            $msg = socket_strerror($code);
            throw new Exception(sprintf('socket_create was error(%s):%s', $code, $msg));
        }
        socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);

        $binded = @socket_bind($socket, '0.0.0.0', 11222);
        if(false === $binded){
            $code = socket_last_error();
            $msg = socket_strerror($code);
            throw new Exception(sprintf('socket_bind was error(%s):%s', $code, $msg));
        }

        $listend = @socket_listen($socket);
        if(false === $listend){
            $code = socket_last_error();
            $msg = socket_strerror($code);
            throw new Exception(sprintf('socket_listen was error(%s):%s', $code, $msg));
        }
        //socket_set_nonblock($socket);

        echo 'server start on tcp://0.0.0.0:11222', PHP_EOL;
        while(true){
            $accept = @socket_accept($socket);
            if(false === $accept){
                continue;
            }
            // One handler per connection; the next accept happens only after
            // this client has disconnected.
            $handler = new MemcachedAcceptHandler($accept, $this->command);
            $handler->init();
            $handler->execute();
            $handler->destroy();
        }
    }
}

/**
 * Line oriented channel to a client. readLine() returns the line without its
 * trailing delimiter, or null once the peer has gone away.
 */
interface StreamReadWrite {
    const DELIMITER = "\r\n";
    public function readLine();
    public function writeLine($str);
}

/**
 * Reads command lines from one accepted socket and dispatches them.
 */
class MemcachedAcceptHandler implements StreamReadWrite {
    protected $socket;
    protected $command;
    protected $connected = true;

    public function __construct($socket, MemcachedCommand $command){
        $this->socket = $socket;
        $this->command = $command;
    }

    public function __destruct(){
        if(null !== $this->socket){
            @socket_close($this->socket);
            unset($this->socket);
        }
    }

    public function init(){
        echo 'new connection', PHP_EOL;
        //socket_set_nonblock($this->socket);
    }

    /**
     * Serve commands until the client disconnects.
     *
     * @throws RuntimeException when socket_select fails
     */
    public function execute(){
        while($this->connected){
            $read = array($this->socket);
            $write = array();
            $except = array();
            $select = @socket_select($read, $write, $except, 1);
            if(false === $select){
                throw new RuntimeException('socket_select');
            }
            if($select < 1){
                continue;
            }
            $line = $this->readLine();
            // null: peer closed; blank line: nothing to dispatch.
            if(null === $line || '' === trim($line)){
                continue;
            }

            // "get hoge foo" => mode "get", args array(hoge, foo).
            // The command name is the whole first word, so commands longer
            // than three characters (e.g. "delete") are recognised as well.
            $args = preg_split('/ +/', $line, -1, PREG_SPLIT_NO_EMPTY);
            $mode = array_shift($args);
            $this->command->call($this, $mode, $args);
        }
    }

    public function destroy(){
        echo 'close connection', PHP_EOL;
        @socket_shutdown($this->socket);
    }

    /**
     * Read one delimiter-terminated line, byte by byte.
     *
     * @return string|null the line without delimiter (possibly ''), or null
     *                     when the connection was closed or the read failed
     */
    public function readLine(){
        $line = '';
        while(true){
            $buf = @socket_read($this->socket, 1);
            if(false === $buf || '' === $buf){
                // FIXME!!! a read error and an orderly close are not told apart
                $this->connected = false;
                return null;
            }
            $line .= $buf;
            if(self::DELIMITER == substr($line, -2)){
                // Returned verbatim: values such as "0" or " " are valid
                // payloads and must not be turned into null.
                return substr($line, 0, -2);
            }
        }
    }

    public function writeLine($str){
        return @socket_write($this->socket, $str . self::DELIMITER);
    }
}

interface MemcachedCommand {
    public function call(StreamReadWrite $reader, $mode, array $args);
}

/**
 * Dispatches a protocol command name to the method of the same name.
 */
abstract class AbstractMemcachedCommand implements MemcachedCommand {
    protected $reflector;

    /** Command names a client is allowed to invoke. */
    protected $commands = array('get', 'set', 'delete');

    public function __construct(){
        $this->reflector = new ReflectionObject($this);
    }

    protected static function concat(array $a, array $b){
        array_splice($a, count($a), 0, $b);
        return $a;
    }

    /**
     * Run the command $mode with $args, replying ERROR for anything that is
     * not an allowed command or that lacks required arguments.
     */
    public final function call(StreamReadWrite $rw, $mode, array $args){
        // Only whitelisted names are dispatched; the name comes straight from
        // the network and must not reach arbitrary methods of this object.
        if(!in_array($mode, $this->commands, true) || !$this->reflector->hasMethod($mode)){
            return $this->error($rw);
        }
        // +1 accounts for the leading $rw parameter.
        $required = $this->reflector->getMethod($mode)->getNumberOfRequiredParameters();
        if(count($args) + 1 < $required){
            return $this->error($rw);
        }
        echo 'command => ', $mode, ' ', join(' ', $args), PHP_EOL;
        return call_user_func_array(array($this, $mode), self::concat(array($rw), $args));
    }

    protected function error($rw){
        $rw->writeLine('ERROR');
    }

    protected abstract function get(StreamReadWrite $rw, $keys);
    protected abstract function set(StreamReadWrite $rw, $key, $flag, $expire, $length);
    protected abstract function delete(StreamReadWrite $rw, $key, $expire = 0);
}

/**
 * Keeps the cached values in a PHP array for the lifetime of the process.
 */
class StorageMemcacheCommand extends AbstractMemcachedCommand {
    /** Expire marker for entries removed with delete. */
    const DELETED = -1;

    protected $cache = array();

    /**
     * True when the entry must no longer be visible to clients.
     * An expire of 0 means "never expires".
     */
    protected function isExpired($value){
        return 0 !== $value->expire && $value->expire < time();
    }

    /**
     * Reply with a VALUE block for every requested key that is present and
     * not expired, followed by END.
     */
    protected function get(StreamReadWrite $rw, $keys){
        $args = func_get_args();
        array_shift($args);
        foreach($args as $key){
            if(!isset($this->cache[$key])){
                continue;
            }
            $value = $this->cache[$key];
            if($this->isExpired($value)){
                continue;
            }
            $rw->writeLine(sprintf('VALUE %s %d %d', $key, $value->flag, $value->length));
            $rw->writeLine($value->value);
        }
        $rw->writeLine('END');
    }

    /**
     * Store the data line that follows the command line and reply STORED.
     */
    protected function set(StreamReadWrite $rw, $key, $flag, $expire, $length){
        $data = $rw->readLine();
        if(null === $data){
            // Connection went away before the data block arrived.
            return;
        }
        $value = new stdClass;
        $value->flag = $flag;
        // exptime 0 means the item never expires.
        $value->expire = (0 === (int) $expire) ? 0 : time() + (int) $expire;
        $value->length = $length;
        $value->value = $data;
        $this->cache[$key] = $value;
        $rw->writeLine('STORED');
    }

    /**
     * Hide the entry from subsequent gets and reply DELETED, or NOT_FOUND
     * when there is no visible entry. The entry is only marked, not removed
     * from the array. $expire is accepted for protocol compatibility and is
     * not used.
     */
    protected function delete(StreamReadWrite $rw, $key, $expire = 0){
        if(isset($this->cache[$key]) && !$this->isExpired($this->cache[$key])){
            $this->cache[$key]->expire = self::DELETED;
            $rw->writeLine('DELETED');
            return;
        }
        $rw->writeLine('NOT_FOUND');
    }
}

$server = new MemcachedServer(new StorageMemcacheCommand);
$server->start();
とりあえず、set/get/deleteだけ。threadとか色々ないので、実際には削除されない。。
他にも構文解析とかしてない(しなくていいくらい memcache はシンプルなんだけど)ので上手いことハンドリングしないといけない。。
同時アクセスに難あり。。
(中略)
などなど、色々あるので、開発用にしか向いてない。というか、よっぽどストイックにPHPを愛している人じゃないと向いていないかも。
まぁ最近は色々memcached互換系のがでてきたので、クライアント作成時のdebug用途向けかも。
今回は、PHPのsocket 関数をゴリゴリ使っています。
blocking modeとかはちょっと動きが不安定(?)なので、あまり性能はでない
C言語版(本家 memcached)との比較を、この前のスクリプトを使ってやってみた
// Benchmark: run the same set/get sequence against the reference memcached
// (port 11211) and the PHP implementation (port 11222) and report the
// elapsed time and the number of wrong read-backs.
$target = array(
    array('host' => 'localhost', 'port' => 11211),
    array('host' => 'localhost', 'port' => 11222)
);

foreach($target as $t){
    $memcache = new Memcache;
    // Skip a target that is not listening instead of timing 500 rounds of
    // failing calls against it.
    if(false === $memcache->connect($t['host'], $t['port'])){
        echo 'target host => ', $t['host'], ' port =>', $t['port'], PHP_EOL;
        echo 'connect failed', PHP_EOL;
        continue;
    }

    $fail = 0;
    $elapsed = microtime(true);
    $count = 500;
    for($i = 0; $i < $count; ++$i){
        if(false === $memcache->set('hoge', 1234, 0, 10)){
            echo 'ERROR!!', PHP_EOL;
        }
        if(false === $memcache->set('hoge', 123, 0, 10)){
            echo 'ERROR!!!', PHP_EOL;
        }

        // Read back 123, store 124, and verify the increment round-tripped.
        $value = (int) $memcache->get('hoge');
        if(false === $memcache->set('hoge', ($value + 1), 0, 10)){
            echo 'ERROR!!!!', PHP_EOL;
        }

        $result = (int) $memcache->get('hoge');
        if($result != 124){
            $fail++;
        }
    }

    echo 'target host => ', $t['host'], ' port =>', $t['port'], PHP_EOL;
    echo 'elapsed: ', (microtime(true) - $elapsed), PHP_EOL;
    echo 'fail => ', $fail, PHP_EOL;

    $memcache->close();
}
target host => localhost port =>11211 elapsed: 0.634283065796 fail => 0 target host => localhost port =>11222 elapsed: 1.12030887604 fail => 0
とまあ、こんな感じ。(そこそこ?)
ちなみに、stream_socketを使って書いてみたものも置いておく。fwriteまわりでハマったので動かないと思うけど。
// Variant of the server built on the stream_* functions. The original was
// published as non-working; the reply framing and command parsing below have
// been corrected, but this has not been run against a real client here.

/**
 * TCP server on port 11222 using stream_socket_server. Connections are
 * served one at a time.
 */
class MemcachedServer {
    protected $command;

    public function __construct(MemcachedCommand $command){
        $this->command = $command;
    }

    public function start(){
        $this->run();
    }

    /**
     * Open the listening socket and serve accepted connections forever.
     *
     * @throws RuntimeException when the server socket cannot be set up
     */
    public function run(){
        $socket = stream_socket_server('tcp://0.0.0.0:11222', $code, $msg, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
        if(false === $socket){
            throw new RuntimeException(sprintf('stream_socket_server was error(%s):%s', $code, $msg));
        }

        $nonBlocking = stream_set_blocking($socket, 0);
        if(false === $nonBlocking){
            throw new RuntimeException('stream_set_blocking: set blocking mode error');
        }

        echo 'server start on tcp://0.0.0.0:11222', PHP_EOL;
        while(true){
            // Wait up to one second for a client, then poll again.
            $accept = @stream_socket_accept($socket, 1);
            if(false === $accept){
                continue;
            }
            $handler = new MemcachedAcceptHandler($accept, $this->command);
            $handler->init();
            $handler->execute();
            $handler->destroy();
        }
    }
}

/**
 * Line oriented channel to a client. readLine() returns the line without its
 * trailing delimiter, or null when no line could be read.
 */
interface StreamReadWrite {
    const DELIMITER = "\r\n";
    public function readLine();
    public function writeLine($str);
}

/**
 * Reads command lines from one accepted stream and dispatches them.
 */
class MemcachedAcceptHandler implements StreamReadWrite {
    protected $socket;
    protected $command;

    public function __construct($socket, MemcachedCommand $command){
        $this->socket = $socket;
        $this->command = $command;
    }

    public function __destruct(){
        if(is_resource($this->socket)){
            fclose($this->socket);
        }
        unset($this->socket);
    }

    public function init(){
        echo 'new connection', PHP_EOL;
        stream_set_blocking($this->socket, true);
        stream_set_timeout($this->socket, 60);
        // disable writebuffer
        stream_set_write_buffer($this->socket, 0);
    }

    /** True when peeking at the socket yields no pending byte. */
    protected function feof(){
        $recv = stream_socket_recvfrom($this->socket, 1, STREAM_PEEK);
        return strlen($recv) < 1;
    }

    /**
     * Serve commands until a line can no longer be read from the client.
     *
     * @throws RuntimeException when stream_select fails
     */
    public function execute(){
        while(true){
            $read = array($this->socket);
            $write = array();
            $except = array();
            $select = @stream_select($read, $write, $except, 1);
            if(false === $select){
                throw new RuntimeException('stream_select');
            }
            if($select < 1){
                continue;
            }
            $line = $this->readLine();
            echo 'line => ', $line, PHP_EOL;
            if(null === $line){
                return;
            }
            if('' === trim($line)){
                continue;
            }

            // "get hoge foo" => mode "get", args array(hoge, foo)
            $args = preg_split('/ +/', $line, -1, PREG_SPLIT_NO_EMPTY);
            $mode = array_shift($args);
            $this->command->call($this, $mode, $args);
        }
    }

    public function destroy(){
        echo 'close connection', PHP_EOL;
        // socket_close($socket)
        stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR);
    }

    /**
     * Read one delimiter-terminated line of at most 2048 bytes.
     *
     * @return string|null the line (possibly ''), or null when
     *                     stream_get_line reported failure
     */
    public function readLine(){
        // 1048576 = 1024 * 1024
        //socket_read($socket, 1048576, PHP_NORMAL_READ
        $line = stream_get_line($this->socket, 2048, self::DELIMITER);
        if(false === $line){
            return null;
        }
        // Returned verbatim so that payloads like "0" survive.
        return $line;
    }

    public function writeLine($str){
        // Replies must end with the protocol delimiter; the previous "\n\0"
        // terminator is never recognised by a memcached client.
        return fwrite($this->socket, $str . self::DELIMITER);
    }
}

interface MemcachedCommand {
    public function call(StreamReadWrite $reader, $mode, array $args);
}

/**
 * Dispatches a protocol command name to the method of the same name.
 */
abstract class AbstractMemcachedCommand implements MemcachedCommand {
    protected $reflector;

    /** Command names a client is allowed to invoke. */
    protected $commands = array('get', 'set');

    public function __construct(){
        $this->reflector = new ReflectionObject($this);
    }

    protected static function concat(array $a, array $b){
        array_splice($a, count($a), 0, $b);
        return $a;
    }

    /**
     * Run the command $mode with $args, replying ERROR for anything that is
     * not an allowed command or that lacks required arguments.
     */
    public final function call(StreamReadWrite $rw, $mode, array $args){
        // The name comes from the network; only whitelisted commands run.
        if(!in_array($mode, $this->commands, true) || !$this->reflector->hasMethod($mode)){
            return $this->error($rw);
        }
        // +1 accounts for the leading $rw parameter.
        $required = $this->reflector->getMethod($mode)->getNumberOfRequiredParameters();
        if(count($args) + 1 < $required){
            return $this->error($rw);
        }
        return call_user_func_array(array($this, $mode), self::concat(array($rw), $args));
    }

    protected function error($rw){
        $rw->writeLine('ERROR');
    }

    protected abstract function get(StreamReadWrite $rw, $keys);
    protected abstract function set(StreamReadWrite $rw, $key, $flag, $expire, $length);
}

/**
 * Keeps the cached values in a PHP array for the lifetime of the process.
 * This variant stores the expire value but does not enforce it.
 */
class StorageMemcacheCommand extends AbstractMemcachedCommand {
    protected $cache = array();

    /**
     * Reply with a VALUE block for every requested key that is present,
     * followed by END.
     */
    protected function get(StreamReadWrite $rw, $keys){
        $args = func_get_args();
        array_shift($args);
        foreach($args as $key){
            if(!isset($this->cache[$key])){
                continue;
            }
            $value = $this->cache[$key];
            // Third field of the VALUE header is the data length in bytes.
            $rw->writeLine(sprintf('VALUE %s %d %d', $key, $value->flag, $value->length));
            $rw->writeLine($value->value);
        }
        $rw->writeLine('END');
    }

    /**
     * Store the data line that follows the command line and reply STORED.
     */
    protected function set(StreamReadWrite $rw, $key, $flag, $expire, $length){
        $value = new stdClass;
        $value->flag = $flag;
        $value->expire = $expire;
        $value->length = $length;
        $value->value = $rw->readLine();
        $this->cache[$key] = $value;
        $rw->writeLine('STORED');
    }
}

$server = new MemcachedServer(new StorageMemcacheCommand);
$server->start();
memcacheの cache の部分を java で(その2)
前回の続き。
前回の結果、とりあえず、DelayQueueによって期限切れのEntryは取り出せるようになった。
だけど、この仕組みの状態では期限切れになったEntryは容赦なく消えていってしまう。
つまり、そのEntryがまだ利用されるかもしれないのに、消えてしまうのは(色々な意味で)もったいない。
特に、javaだとHashMapとかのloadFactorまわりの動きは(きっと)もったいない
もう一度LRUについてwikipediaに確認すると
Least Recently Used (LRU) はキャッシュメモリや仮想メモリが扱うデータのリソースへの割り当てを決定するアルゴリズムである。対義語はMost Recently Used (MRU)。
和訳すると「最近最も使われなかったもの」つまり「使われてから最も長い時間が経ったもの」「参照される頻度が最も低いもの」である。
ということなので(?)、ホントに使われ無かったもの(アクセスが少ないもの)から順番に消えるように考えてみた。
その結果がこれ
public class LRUCache implements CacheLifeCycle { private static final Log log = LogFactory.getLog(LRUCache.class); protected final ConcurrentMap<String, PrioritalEntry> cache = new ConcurrentHashMap<String, PrioritalEntry>(); protected final DelayQueue<PrioritalEntry> expiredQueue = new DelayQueue<PrioritalEntry>(); private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock writeLock = lock.writeLock(); private final Lock readLock = lock.readLock(); public void register(LifeCycleExecutor executor){ executor.add(this, expiredQueue); } public void purge(PrioritalEntry entry){ writeLock.lock(); try { // priorityを下げる if(entry.decrementPriority() < 1){ // 1以下なら誰も使ってなさそうなので消す if(log.isDebugEnabled()){ log.debug("purge entry => " + entry); } cache.remove(entry.getKey()); return; } // もう一度チャンス entry.setExpiredAt(retransmission(entry.getPriority())); expiredQueue.offer(entry); } finally { writeLock.unlock(); } } public boolean set(String key, String value, long expiredAt){ writeLock.lock(); try { PrioritalEntry newEntry = new PrioritalEntry(key, value, expiredAt); PrioritalEntry previousEntry = cache.putIfAbsent(key, newEntry); // 既存の値がない if(null == previousEntry){ // ということは新しい値 return expiredQueue.offer(newEntry); } previousEntry.setValue(value); previousEntry.setExpiredAt(expiredAt); previousEntry.incrementPriority(); return true; } finally { writeLock.unlock(); } } public PrioritalEntry get(String key) { readLock.lock(); try { if(!cache.containsKey(key)){ return null; } PrioritalEntry entry = cache.get(key); // 時間切れなので見えなくする if(entry.isExpired()){ return null; } // 使う人がいたのでpriorityをあげる entry.incrementPriority(); return entry; } finally { readLock.unlock(); } } public void remove(String key) { remove(key, 0); } public void remove(String key, long expiredAt){ writeLock.lock(); try { PrioritalEntry entry = cache.get(key); if(null != entry){ entry.setExpiredAt(expiredAt); entry.decrementPriority(); } } finally { writeLock.unlock(); } } public void 
flush() { flush(0); } public void flush(final long expiredAt){ writeLock.lock(); try { Iterator<Map.Entry<String, PrioritalEntry>> entries = cache.entrySet().iterator(); while(entries.hasNext()){ final Map.Entry<String, PrioritalEntry> entry = entries.next(); final PrioritalEntry value = entry.getValue(); value.setExpiredAt(expiredAt); } } finally { writeLock.unlock(); } } protected static long retransmission(final int currentPriority){ if(currentPriority < PrioritalEntry.MAX_PRIORITY){ // binary exponential backoff long freq = currentPriority + Math.round(Math.pow(2, currentPriority)); return Math.round(0.875 * freq) + Math.round(0.125 * currentPriority); } return retransmission(PrioritalEntry.MAX_PRIORITY - 1); } }
んで、PrioritalEntryの実装はこれ
/**
 * Cache entry with an expiry time and a usage priority, usable as an element
 * of a {@link java.util.concurrent.DelayQueue}.
 *
 * <p>Expiry is stored as an absolute time in epoch milliseconds; callers pass
 * lifetimes in seconds.
 */
public class PrioritalEntry implements Delayed {

    public static final int MAX_PRIORITY = 10;
    public static final int DEFAULT_PRIORITY = 5;

    private final AtomicInteger priority = new AtomicInteger(DEFAULT_PRIORITY);
    private final String key;
    private final AtomicReference<String> value;
    private final AtomicLong expiredAt;

    /**
     * @param key       cache key
     * @param value     stored value
     * @param expiredAt lifetime in seconds; 0 means "never expires", a negative
     *                  value creates an entry that is already expired
     */
    public PrioritalEntry(final String key, final String value, final long expiredAt) {
        this.key = key;
        this.value = new AtomicReference<String>(value);
        if (0 == expiredAt) {
            this.expiredAt = new AtomicLong(Long.MAX_VALUE);
        } else if (expiredAt < 0) {
            this.expiredAt = new AtomicLong(0);
        } else {
            this.expiredAt = new AtomicLong(relative(expiredAt));
        }
    }

    /**
     * Converts a lifetime in seconds into an absolute deadline (now + lifetime),
     * saturating at {@code Long.MAX_VALUE} instead of overflowing into the past.
     */
    protected static long relative(long expiredAt) {
        final long now = System.currentTimeMillis();
        final long millis = TimeUnit.SECONDS.toMillis(expiredAt);
        if (millis > 0 && millis > Long.MAX_VALUE - now) {
            return Long.MAX_VALUE;
        }
        return now + millis;
    }

    /** Converts seconds to milliseconds without adding the current time. */
    protected static long absolute(long expiredAt) {
        return TimeUnit.SECONDS.toMillis(expiredAt);
    }

    /**
     * Raises the priority by one, never beyond {@link #MAX_PRIORITY}.
     *
     * @return the priority after the call
     */
    public int incrementPriority() {
        // compare-and-set loop: the cap holds even with concurrent callers
        while (true) {
            final int current = priority.get();
            if (MAX_PRIORITY <= current) {
                return MAX_PRIORITY;
            }
            if (priority.compareAndSet(current, current + 1)) {
                return current + 1;
            }
        }
    }

    /** Lowers the priority by one (no lower bound) and returns the new value. */
    public int decrementPriority() {
        return priority.decrementAndGet();
    }

    public int getPriority() {
        return priority.get();
    }

    public String getKey() {
        return key;
    }

    public String getValue() {
        return value.get();
    }

    public void setValue(String value) {
        this.value.set(value);
    }

    /**
     * Sets the deadline to {@code newValue} seconds from now. Unlike the
     * constructor, 0 here means "expires now".
     */
    public void setExpiredAt(long newValue) {
        this.expiredAt.set(relative(newValue));
    }

    /** Remaining lifetime in milliseconds; zero or negative once expired. */
    private long elapsed() {
        return expiredAt.get() - System.currentTimeMillis();
    }

    public boolean isExpired() {
        return elapsed() < 1;
    }

    public long getDelay(TimeUnit unit) {
        return unit.convert(elapsed(), TimeUnit.MILLISECONDS);
    }

    /**
     * Orders entries by deadline, earliest first. Other {@link Delayed}
     * implementations are compared by remaining delay instead of failing with
     * a ClassCastException.
     */
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        final long x;
        final long y;
        if (o instanceof PrioritalEntry) {
            x = expiredAt.get();
            y = ((PrioritalEntry) o).expiredAt.get();
        } else {
            x = getDelay(TimeUnit.MILLISECONDS);
            y = o.getDelay(TimeUnit.MILLISECONDS);
        }
        if (x < y) {
            return -1;
        }
        if (x > y) {
            return 1;
        }
        return 0;
    }

    @Override
    public String toString() {
        StringBuilder buf = new StringBuilder();
        buf.append("key=").append(key).append(",");
        buf.append("value=").append(value).append(",");
        buf.append("priority=").append(priority);
        return buf.toString();
    }
}
他にも
/** A cache whose expired entries are purged by a {@link LifeCycleExecutor}. */
public interface CacheLifeCycle {

    /** Hands this cache's expiry queue to the executor. */
    public void register(LifeCycleExecutor executor);

    /** Called with each entry taken from the expiry queue. */
    public void purge(PrioritalEntry entry);
}

/**
 * Runs one background monitor per registered cache. Each monitor blocks on the
 * cache's queue and forwards every entry it takes to
 * {@link CacheLifeCycle#purge(PrioritalEntry)}.
 */
public class LifeCycleExecutor {

    protected final ExecutorService executor;

    public LifeCycleExecutor(final ExecutorService executor) {
        this.executor = executor;
    }

    /** Starts a monitor that feeds entries from {@code queue} to {@code lifeCycle}. */
    public void add(CacheLifeCycle lifeCycle, BlockingQueue<PrioritalEntry> queue) {
        executor.execute(new Monitor(lifeCycle, queue));
    }

    /**
     * Stops accepting work, waits up to 10 seconds, then interrupts what is
     * still running. Monitors only end when interrupted, so the forced
     * shutdown is the normal way they stop.
     */
    public void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // We were asked to stop waiting: still stop the monitors, and keep
            // the interrupt visible to the caller instead of swallowing it.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    private static class Monitor implements Runnable {

        private static final Log log = LogFactory.getLog(Monitor.class);

        private final CacheLifeCycle lifeCycle;
        private final BlockingQueue<PrioritalEntry> queue;

        private Monitor(CacheLifeCycle lifeCycle, BlockingQueue<PrioritalEntry> queue) {
            this.lifeCycle = lifeCycle;
            this.queue = queue;
        }

        /** Takes entries from the queue until the thread is interrupted. */
        public void run() {
            try {
                while (true) {
                    PrioritalEntry entry = queue.take();
                    if (log.isDebugEnabled()) {
                        log.debug("find entry => " + entry);
                    }
                    lifeCycle.purge(entry);
                }
            } catch (InterruptedException e) {
                // interruption is the stop signal; restore the flag for the pool
                Thread.currentThread().interrupt();
            }
        }
    }
}
細かく解説していく(誰に?)と
まず、PrioritalEntry(スペルとか意味は気にしない)の実装側から
ほとんどの実装は前回のと同じ。
今回は他にpriority(優先度)も一緒に持ってみた
インスタンス生成時は
public static final int MAX_PRIORITY = 10; public static final int DEFAULT_PRIORITY = 5; private final AtomicInteger priority = new AtomicInteger(DEFAULT_PRIORITY);
で作られるんだけど、LRUCacheのgetとかの呼び出しが行われる毎に、priorityをincrementしてる
// class LRUCache public PrioritalEntry get(String key) { readLock.lock(); try { if(!cache.containsKey(key)){ return null; } PrioritalEntry entry = cache.get(key); // 時間切れなので見えなくする if(entry.isExpired()){ return null; } // 使う人がいたのでpriorityをあげる entry.incrementPriority(); return entry; } finally { readLock.unlock(); } } // class PrioritalEntry public int incrementPriority(){ if(MAX_PRIORITY < priority.get()){ return MAX_PRIORITY; } return priority.incrementAndGet(); } public int decrementPriority(){ return priority.decrementAndGet(); } public int getPriority(){ return priority.get(); }
これで、最も "使われる頻度の高い" Entryは "priorityが高い" という状態を作り出してる。
その他にも、setで値が更新される際にも、既に同じkeyでEntryが作られてたらその部分を "再利用" してみた
public boolean set(String key, String value, long expiredAt){ writeLock.lock(); try { PrioritalEntry newEntry = new PrioritalEntry(key, value, expiredAt); PrioritalEntry previousEntry = cache.putIfAbsent(key, newEntry); // 既存の値がない if(null == previousEntry){ // ということは新しい値 return expiredQueue.offer(newEntry); } previousEntry.setValue(value); previousEntry.setExpiredAt(expiredAt); previousEntry.incrementPriority(); return true; } finally { writeLock.unlock(); } }
ということで、priorityという値によって最近使われているかどうかまで、表現できたので、次は値の削除について
public void purge(PrioritalEntry entry){ writeLock.lock(); try { // priorityを下げる if(entry.decrementPriority() < 1){ // 1以下なら誰も使ってなさそうなので消す if(log.isDebugEnabled()){ log.debug("purge entry => " + entry); } cache.remove(entry.getKey()); return; } // もう一度チャンス entry.setExpiredAt(retransmission(entry.getPriority())); expiredQueue.offer(entry); } finally { writeLock.unlock(); } }
purgeというメソッドでは、LifeCycleExecutorによって、DelayQueueの中身が取り出された "期限切れ" の Entry が引数に渡される
/** Takes entries from the queue and hands them to purge() until the thread is interrupted. */
public void run() {
    try {
        while (true) {
            PrioritalEntry entry = queue.take();
            if (log.isDebugEnabled()) {
                log.debug("find entry => " + entry);
            }
            lifeCycle.purge(entry);
        }
    } catch (InterruptedException e) {
        // interruption is the stop signal; restore the flag instead of swallowing it
        Thread.currentThread().interrupt();
    }
}
期限切れの Entry であっても、すぐに削除は行わずに "再利用" されるチャンスを与えるため、期限切れの Entry であっても Delay Queue に再送(retransmission)している。
ただ単純に再送するのではなく、再送のアルゴリズムとして昔どこかで覚えた "binary exponential backoff" を使ってみた(うろ覚えバージョン)
// binary exponential backoff long freq = currentPriority + Math.round(Math.pow(2, currentPriority)); return Math.round(0.875 * freq) + Math.round(0.125 * currentPriority);
これを実行してみるとこんな時間が求められる
retransmission(0) => 1 retransmission(1) => 3 retransmission(2) => 5 retransmission(3) => 10 retransmission(4) => 19 retransmission(5) => 33 retransmission(6) => 62 retransmission(7) => 119 retransmission(8) => 232 retransmission(9) => 457 : : :
これは再送時間を作るときに、本来ならリトライ回数を使うんだけど(リトライの数が多いほど、次回の再送時間までの待ちが増える、よってこまめに再送させない)、ここでは priority を使用している。
これによって、priorityが高いものほど、時間切れでも生存時間を多くすることができる。これによって再利用のチャンスが増えるはず。(priorityが高い = 最も再利用される可能性が高そう)
ということで、簡単に動かしてみる
// Demo: attach a cache to a background purge monitor, store three entries,
// touch one of them, then let the monitor work for two minutes.
final LifeCycleExecutor executor = new LifeCycleExecutor(Executors.newCachedThreadPool());
final LRUCache cache = new LRUCache();
cache.register(executor);

cache.set("hoge", "123", 5);
cache.set("foo", "456", 2);
cache.set("bar", "789", 2);

// one read access raises the priority of "foo"
cache.get("foo");

try {
    TimeUnit.SECONDS.sleep(120);
} catch (InterruptedException e) {
}
executor.shutdown();
で、これを動かしてみると。。。
LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=6 LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=5 LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=5 LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=4 LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=4 LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=3 LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=3 LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=5 LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=2 LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=2 LifeCycleExecutor$Monitor: find entry => key=bar,value=789,priority=1 LRUCache: purge entry => key=bar,value=789,priority=0 LifeCycleExecutor$Monitor: find entry => key=hoge,value=123,priority=1 LRUCache: purge entry => key=hoge,value=123,priority=0 LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=4 LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=3 LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=2 LifeCycleExecutor$Monitor: find entry => key=foo,value=456,priority=1 LRUCache: purge entry => key=foo,value=456,priority=0
アクセスのあった "foo" キーは、 同じタイムアウトが設定されている "bar" よりも後に削除されている。
これでなんちゃって LRU はできた(?)
ちなみに、writeLockとかreadLockなどはたぶん使ってない。というか使わなくてもいいはず。
ちなみに、本家 memcached は slab allocationとかを使っているので、メモリの効率からいうとそっちの方がいいかも。
こういった部分を効率よく切り替えられるようにしたいものです
次回は、分散ハッシュ(というか複数台で連携)など。
2010/01/17
最近の我が家の電源まわり事情
最近、我が家の電源まわりが結構すっきりしてきたので、一部紹介しようかと思います
電源タップがすっきりした!
以前はごちゃごちゃしていた、電源まわりが↓のように非常にすっきりしてます。
というのも、ここ最近あらゆる電源まわりがUSB(mini-usb/micro-usb)経由で充電できるものが増えてきたため、USBハブに電源まわりを一元化させてます。
(タコ足ならぬタコUSB状態)
ネット環境もイーモバのPocketWifiにすることで、大きかった(十分小さいけど)、バッファローのルータなどはおさらばです。
でも、その代わりにUSBで電源を供給できるようになった!
100円ショップを活用
ほぼ全てのガジェット関係はUSBから電源を供給できるようになっているものの、一部、付属してなかったりオプションだったりするのですが
最近の100円ショップは進化してます。
USB部分が、mini-usb化されていて、必要なオプションだけ選んで買えというスタイル。
(先端の小さいフタみたいのを個別に買ってくる。これでDSもUSBから電源を供給している)
そうそう。こんな感じのが100円ショップで売ってる
しかも100円ショップでも、1メートル級のUSBも売っていたりするので、電源から遠い場所でも安心!
(びよーん。比較にipod mini置いてみた)
なんでもUSB
ただ、100円ショップは微妙に品揃えが悪かったりする。例えば、iPod関係のUSBポートが売ってなかったりする。。
そんな時はドンキホーテ。100円ショップに匹敵かそれ以上のUSB関係の機器が売ってます。
(これはドンキで買ってきた。iPod/iPod Touch用とiPod mini用)
もう、USBまわりを調整するなら100円ショップとドンキがあれば確実です。
常に動かすものも、USBで
ちょっと例が悪いかもしれないですが、最近池袋にあるゲーセンのUFOキャッチャーで取れたこれ。
(画像が汚いけど、これ・・加湿器なんだぜ・・・)
夏は、扇風機もUSB経由になることは、すでに予想されます
(扇風機もUSB・・・卓上用だけど・・・)
高度にUSB化された家電ライフを目指して
たぶん、100円ショップとドンキホーテの企業努力で、今後とも我が家の電源は、ありとあらゆるものはUSB化されていくと思います。
engadgetで読んだけど、"電源から直接USB"とか"49ポートUSB"のが日本でもぞくぞく登場すると嬉しいなぁ。
今は、USBハブ(4ポートとか)をタコHub化してて使い勝手が悪い。
ってか、たぶん、正しいUSBHubはこんな感じだと思う
(家中のUSBメモリ挿してみました(合計4+8+2GB)。思ったより少なかった)
では、Happy USB Life を!
2010/01/15
memcacheの cache の部分を java で
指定時間後に消える(というか参照出来なくなる)をやってみることに。
DelayQueueとDelayedで実装してみる。
Cacheの部分
/**
 * Cache whose entries stop being visible after a given lifetime. Each entry is
 * stored in a map and also queued in a DelayQueue; expired entries are swept
 * out of the map lazily, at the start of every {@link #set}.
 */
public class RetireCache implements Cache {

    protected final java.util.concurrent.ConcurrentMap<String, PeriodEntry> cache = new ConcurrentHashMap<String, PeriodEntry>();
    protected final DelayQueue<PeriodEntry> expiredQueue = new DelayQueue<PeriodEntry>();

    /**
     * Stores a value, replacing any previous entry for the key.
     *
     * @param expiredAt lifetime in seconds; 0 means "never expires"
     */
    public boolean set(String key, String value, long expiredAt) {
        check();
        final PeriodEntry entry = new PeriodEntry(key, value, expiredAt);
        final PeriodEntry replaced = cache.put(key, entry);
        if (null != replaced) {
            // the old entry must not linger in the queue and expire on behalf of the new one
            expiredQueue.remove(replaced);
        }
        return expiredQueue.add(entry);
    }

    /** Returns the live entry for the key, or null when it is unknown or expired. */
    public Entry get(String key) {
        // single lookup: a concurrent sweep cannot slip in between "contains" and "get"
        final PeriodEntry entry = cache.get(key);
        if (null == entry || entry.isExpired()) {
            return null;
        }
        return entry;
    }

    /** Expires the entry for the key immediately. */
    public void remove(String key) {
        remove(key, 0);
    }

    /** Makes the entry for the key expire {@code expiredAt} seconds from now (0 = now). */
    public void remove(String key, long expiredAt) {
        final PeriodEntry entry = cache.get(key);
        if (null != entry) {
            // DelayQueue orders elements on insertion: take the entry out before
            // changing its expiry and put it back afterwards
            final boolean queued = expiredQueue.remove(entry);
            entry.setExpiredAt(expiredAt);
            if (queued) {
                expiredQueue.add(entry);
            }
        }
    }

    /** Expires every entry immediately. */
    public void flush() {
        flush(0);
    }

    /**
     * Makes every entry expire {@code expiredAt} seconds from now. All entries
     * get practically the same expiry, so they are not re-queued.
     */
    public void flush(long expiredAt) {
        for (final PeriodEntry entry : cache.values()) {
            entry.setExpiredAt(expiredAt);
        }
    }

    /** Removes from the map every entry whose delay has run out. */
    protected void check() {
        synchronized (this) {
            PeriodEntry e = null;
            while ((e = expiredQueue.poll()) != null) {
                // two-argument remove: only drop the map entry if it is still this
                // very object, never a newer value stored under the same key
                cache.remove(e.getKey(), e);
            }
        }
    }
}
mapにputするときは、同じentryのインスタンスをqueueにも入れておく
getするときには、isExpiredを見て期限切れはnullを返す。
なんとなく、期限切れのqueueの参照にスレッドを立てるのもあれなので、setの時にentryの削除をやってみる
Entryの部分
public class PeriodEntry implements Entry, Delayed { protected final String key; protected final String value; protected long expiredAt; public PeriodEntry(final String key, final String value, final long expiredAt){ this.key = key; this.value = value; if(0 == expiredAt){ this.expiredAt = Long.MAX_VALUE; } else if(expiredAt < 0) { this.expiredAt = 0; } else { this.expiredAt = relative(expiredAt); } } protected static long relative(long expiredAt){ // 現在時間 + 指定時間 return System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(expiredAt); } protected static long absolute(long expiredAt){ return TimeUnit.SECONDS.toMillis(expiredAt); } public String getKey(){ return key; } public String getValue(){ return value; } public void setExpiredAt(long newValue){ // 現在時間より次の期限を設定 this.expiredAt = relative(newValue); } private long elapsed(){ return expiredAt - System.currentTimeMillis(); } public boolean isExpired(){ return elapsed() < 1; } public long getDelay(TimeUnit unit) { // expiredの時間から経過時間を引き、残り時間を算出 return unit.convert(elapsed(), TimeUnit.MILLISECONDS); } public int compareTo(Delayed o) { PeriodEntry target = (PeriodEntry) o; if(expiredAt < target.expiredAt){ return -1; } if(expiredAt > target.expiredAt){ return 1; } return 0; } }
entryは基本的に相対的な時間を設定するようにしてる以外は、普通のでDelayedな実装で。
テスト
public class RetireCacheTest { @Test public void 指定時間で参照出来なくなる(){ Cache cache = new RetireCache(); cache.set("hoge", "hogeValue", 2); Assert.assertEquals(cache.get("hoge").getValue(), "hogeValue"); try { TimeUnit.SECONDS.sleep(2); } catch(InterruptedException e){ } Assert.assertNull("参照できなくなってる", cache.get("hoge")); } @Test public void 指定時間後に消えてる(){ Cache cache = new RetireCache(); cache.set("hoge", "hogeValue", 2); try { TimeUnit.SECONDS.sleep(1); } catch(InterruptedException e){ } Assert.assertEquals(cache.get("hoge").getValue(), "hogeValue"); cache.remove("hoge", 5); try { TimeUnit.SECONDS.sleep(1); } catch(InterruptedException e){ } Assert.assertEquals("これは消えてるのが正解?", cache.get("hoge").getValue(), "hogeValue"); } @Test public void ゼロを指定すると消えない(){ Cache cache = new RetireCache(); cache.set("hoge", "hogeValue", 0); cache.set("foo", "fooValue", 1); try { TimeUnit.SECONDS.sleep(1); } catch(InterruptedException e){ } Assert.assertNull("これは1秒後消えてる", cache.get("foo")); Assert.assertEquals("これは消えない", cache.get("hoge").getValue(), "hogeValue"); } @Test public void キーがない(){ Cache cache = new RetireCache(); cache.set("hoge", "1", 0); Assert.assertNull(cache.get("foo")); } : : 省略 : }
memcacheのdeleteと一時実装が違うかな。本物は、delete key time でtimeにどんな時間を指定してもすぐに参照出来なくなる
時間を指定しない場合
set hoge 0 30 4 1234 STORED delete hoge DELETED get hoge END
消える時間を指定しても...
set hoge 0 30 4 1234 STORED get hoge VALUE hoge 0 4 1234 END delete hoge 10000 DELETED get hoge END
なかなか難しい
次は LRU
2010/01/13
NIO にしたら爆速になった(was: なんちゃってmemcache互換サーバ)
昨日の続き。
server の部分を thread pool から、nio な ノンブロッキング にしてみたら、思った以上に速かった
メイン
/**
 * Single-threaded, selector-based server. Every selection key carries an
 * {@code Action} attachment which is executed when the key becomes ready.
 */
public class Server extends Thread {

    // NOTE(review): 'accept' and 'acceptPool' are not used by the selector loop
    // in this version; they are kept because they are protected members.
    protected final BlockingQueue<Socket> accept = new LinkedBlockingQueue<Socket>();
    protected final ExecutorService acceptPool;
    protected final Cache cache;
    protected final int port;
    protected final int maxConnection;

    /** Creates a server with a 64 MB cache. */
    public Server(final int port, final int maxConnection) {
        this(port, maxConnection, ByteSizeUnit.Mega.toLong(64));
    }

    public Server(final int port, final int maxConnection, final long maxMemory) {
        this.port = port;
        this.maxConnection = maxConnection;
        this.cache = new LRUCache(maxMemory);
        this.acceptPool = Executors.newFixedThreadPool(maxConnection);
    }

    public static void main(String... args) {
        Server s = new Server(12221, 32);
        s.start();
        try {
            s.join();
        } catch (InterruptedException e) {
            e.printStackTrace(System.err);
        }
    }

    /**
     * Binds the listening channel and dispatches ready keys until the thread is
     * interrupted or an I/O error occurs; all channels are closed on the way out.
     */
    public void run() {
        ServerSocketChannel channel = null;
        Selector selector = null;
        try {
            channel = ServerSocketChannel.open();
            channel.configureBlocking(false);
            final ServerSocket serverSocket = channel.socket();
            serverSocket.setReuseAddress(true);
            serverSocket.bind(new InetSocketAddress(port));

            selector = Selector.open();
            channel.register(selector, SelectionKey.OP_ACCEPT, new AcceptAction(cache));
            while (!isInterrupted()) {
                // select() may legitimately return 0 (e.g. after a wakeup or when
                // the only ready key was cancelled); that must not stop the server
                if (selector.select() == 0) {
                    continue;
                }
                dispatch(selector);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeAll(selector, channel);
        }
    }

    /** Runs the attached action of every key selected in the last select() call. */
    private void dispatch(final Selector selector) {
        final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
        while (keys.hasNext()) {
            final SelectionKey key = keys.next();
            keys.remove();
            if (!key.isValid()) {
                continue;
            }
            Action action = (Action) key.attachment();
            action.execute(key);
        }
    }

    /** Closes every registered channel (not just the currently selected ones), then the selector. */
    private static void closeAll(final Selector selector, final ServerSocketChannel channel) {
        if (null != selector) {
            // snapshot: closing a channel cancels its key
            for (final SelectionKey key : selector.keys().toArray(new SelectionKey[0])) {
                try {
                    key.channel().close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (null != channel) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
新規接続を受け入れるAccept部分
/**
 * Action attached to the listening channel: accepts a pending connection and
 * registers it for reading with its own {@code ReadAction}.
 */
public class AcceptAction implements Action {

    private final Cache cache;

    public AcceptAction(final Cache cache) {
        this.cache = cache;
    }

    public void execute(SelectionKey selectionKey) {
        SocketChannel channel = null;
        try {
            channel = ((ServerSocketChannel) selectionKey.channel()).accept();
            if (null == channel) {
                // non-blocking accept returns null when no connection is pending
                return;
            }
            channel.configureBlocking(false);
            ReadAction action = new ReadAction(cache, new ByteBufferReader(channel));
            channel.register(selectionKey.selector(), SelectionKey.OP_READ, action);
        } catch (IOException e) {
            e.printStackTrace();
            // do not leak a connection that was accepted but could not be registered
            if (null != channel) {
                try {
                    channel.close();
                } catch (IOException ignored) {
                    // already failing; nothing more to do for this connection
                }
            }
        }
    }
}
んで、読み込みと書き込みは分けた
public class ReadAction implements Action { protected final Cache cache; protected final Reader reader; protected final Handler handler; public ReadAction(final Cache cache, final Reader reader){ this.cache = cache; this.reader = reader; this.handler = new Handler(cache, reader); } public void execute(SelectionKey selectionKey) { if(!selectionKey.isReadable()){ return; } SocketChannel channel = (SocketChannel) selectionKey.channel(); try { if(!reader.readable()){ channel.close(); selectionKey.cancel(); return; } String line = reader.readLine(); if(line == null){ channel.close(); selectionKey.cancel(); return; } Return r = handler.execute(line); WriteAction action = new WriteAction(r, this); channel.register(selectionKey.selector(), SelectionKey.OP_WRITE, action); } catch(IOException e){ e.printStackTrace(); } } private static class Handler implements CommandVisitor { private final Cache cache; private final Reader reader; private Handler(final Cache cache, final Reader reader){ this.cache = cache; this.reader = reader; } public Return execute(String line){ try { final StringReader r = new StringReader(line); final MemcacheParser parser = new MemcacheParser(r); Command command = parser.Command(); return command.accept(this, null); } catch(ParseException e){ e.printStackTrace(); return new Return(ResponseType.ERROR); } catch(Exception e){ e.printStackTrace(); return new Return(ResponseType.SERVER_ERROR, e.getMessage()); } } : : // 省略 : public Return visit(RetrievalCommand command, Parameter parameter) { final List<String> keys = command.getKeys(); final List<Return> returns = new ArrayList<Return>(); for(final String key: keys){ Entry entry = cache.get(key); if(null == entry){ returns.add(new Return(ResponseType.END)); continue; } returns.add(new Return(ResponseType.SEND_VALUE, key, entry.getFlag(), entry.getLength(), entry.getValue() )); } return new Return(returns.toArray(new Return[returns.size()])); } public Return visit(SetCommand command, Parameter parameter) { 
try { final String nextLine = reader.readLine(); if(cache.set(command.getKey(), nextLine, command.getFlags(), command.getExpTime())){ return new Return(ResponseType.STORED); } return new Return(ResponseType.NOT_STORED); } catch(IOException e){ e.printStackTrace(); return new Return(ResponseType.ERROR); } } public Return visit(VersionCommand command, Parameter parameter) { return null; } } }
/**
 * Action that writes one response to a client and then switches the channel
 * back to reading with the originating {@code ReadAction}.
 */
public class WriteAction implements Action {

    private static final Charset ASCII = Charset.forName("US-ASCII");

    protected final Return ret;
    protected final ReadAction action;

    /** Encoded response; kept between calls so a partial write resumes where it stopped. */
    private ByteBuffer pending;

    public WriteAction(final Return ret, ReadAction action) {
        this.ret = ret;
        this.action = action;
    }

    // NOTE(review): 'ret' may be null when a command handler returns null;
    // renderMessage() would then throw here - confirm how that should be answered.
    public void execute(SelectionKey selectionKey) {
        if (!selectionKey.isWritable()) {
            return;
        }
        final SocketChannel channel = (SocketChannel) selectionKey.channel();
        try {
            if (null == pending) {
                pending = ASCII.encode(CharBuffer.wrap(ret.renderMessage()));
            }
            // hasRemaining() tracks position/limit; capacity() may be larger than
            // the number of encoded bytes
            while (pending.hasRemaining()) {
                if (channel.write(pending) == 0) {
                    // socket buffer is full: stay registered for OP_WRITE and
                    // continue from the current position on the next call
                    return;
                }
            }
            channel.register(selectionKey.selector(), SelectionKey.OP_READ, action);
        } catch (IOException e) {
            e.printStackTrace();
            // a broken connection would otherwise stay selected for writing forever
            try {
                channel.close();
            } catch (IOException ignored) {
                // nothing more to do for this connection
            }
        }
    }
}
ということで、これをつかって簡単に比較すると
// Benchmark: run the same set/get round trips against each server and
// print the elapsed time and the number of wrong results.
$targets = array(
    array('host' => 'localhost', 'port' => 11211),
    array('host' => 'localhost', 'port' => 12221),
);

foreach ($targets as $server) {
    $client = new Memcache;
    $client->connect($server['host'], $server['port']);

    $failures = 0;
    $startedAt = microtime(true);
    for ($round = 0; $round < 1000; ++$round) {
        if (false === $client->set('hoge', '1234')) {
            echo 'ERROR!!', PHP_EOL;
        }
        if (false === $client->set('hoge', '123')) {
            echo 'ERROR!!!', PHP_EOL;
        }
        // read the value back, store value + 1, and verify the result
        $current = $client->get('hoge');
        if (false === $client->set('hoge', $current + 1)) {
            echo 'ERRROR!!!!', PHP_EOL;
        }
        $stored = $client->get('hoge');
        if ($stored != 124) {
            $failures++;
        }
    }

    echo 'target host => ', $server['host'], ' port =>', $server['port'], PHP_EOL;
    echo 'elapsed: ', (microtime(true) - $startedAt), PHP_EOL;
    echo 'fail => ', $failures, PHP_EOL;
}
1) target host => localhost port =>11211 elapsed: 0.66518497467 fail => 0 target host => localhost port =>12221 elapsed: 0.870754003525 fail => 0 2) target host => localhost port =>11211 elapsed: 1.1857790947 fail => 0 target host => localhost port =>12221 elapsed: 0.868647098541 fail => 0
memcacheに匹敵してきた。
ってか、ByteBufferを読み書きするのは初めて書いたので、すごく手こずった。。
単純に \r\n までの一行を読みたいだけなのに。。
ということで、BufferedReaderのreadLine的なのを書いて色々とお茶を濁す。。
/**
 * Minimal line reader on top of a non-blocking SocketChannel: a
 * BufferedReader.readLine() look-alike for the text protocol.
 */
public class ByteBufferReader {

    protected final SocketChannel channel;
    protected final ByteBuffer buffer;

    public ByteBufferReader(final SocketChannel channel) {
        this.channel = channel;
        this.buffer = ByteBuffer.allocateDirect(512);
    }

    /** Reports whether the internal buffer still has room for more data. */
    public boolean readable() throws IOException {
        return buffer.hasRemaining();
    }

    /**
     * Reads whatever the channel offers right now and returns the text up to the
     * next '\n' ('\r' is dropped). When the available data ends before a '\n',
     * the text read so far is returned.
     *
     * @return the line, or null when nothing could be read
     */
    public String readLine() throws IOException {
        final StringBuilder sb = new StringBuilder();
        readInto(sb);
        if (sb.length() < 1) {
            return null;
        }
        return sb.toString();
    }

    private void readInto(final StringBuilder sb) throws IOException {
        // Pull in everything available: 0 = nothing more right now (or buffer full),
        // -1 = end of stream. Either way, bytes already buffered are still processed.
        int read;
        do {
            read = channel.read(buffer);
        } while (read > 0);

        buffer.flip();
        try {
            while (more(sb)) {
                // keep consuming until end of line or end of buffered data
            }
        } finally {
            // keep unread bytes (the start of the next line) for the next call
            buffer.compact();
        }
    }

    /**
     * Consumes one byte into {@code sb}.
     *
     * @return false at end of line or when no buffered data is left
     */
    private boolean more(final StringBuilder sb) throws IOException {
        if (!buffer.hasRemaining()) {
            // Out of data. Clearing the buffer here would expose stale bytes to the
            // caller's loop, so just stop.
            return false;
        }
        // mask: bytes >= 0x80 must not be sign-extended into 0xFFxx chars
        final char ch = (char) (buffer.get() & 0xFF);
        switch (ch) {
            case '\n':
                return false;
            case '\r':
                return true;
            default:
                sb.append(ch);
                return true;
        }
    }
}
とりあえず、一段落
NIOは難しかった。
2010/01/11
JavaCC で memcache text protocol の BNF(と、なんちゃってmemcache互換サーバ)
ANTLRのやつがあったけど、JavaCCのがみつからなかったので、でっちあげた
via - http://harward.us/~nharward/antlr/memcached_protocol.g
できあがったのは、こんな感じ
// Whitespace and line terminators between tokens are discarded.
SKIP: {
" " | "\t" | "\r" | "\n"
}
// Numeric tokens. FLAGS, TIME, LENGTH, CREMENT_VALUE and CAS_UNIQUE are all
// defined as exactly NUMBER; the productions in this grammar only consume NUMBER.
// NOTE(review): since NUMBER is declared first, the aliases presumably never win
// a match - confirm against the JavaCC token-matching rules.
TOKEN: {
< NUMBER: ["1"-"9"] (["0"-"9"])* | "0" >
| < FLAGS: < NUMBER > >
| < TIME: < NUMBER > >
| < LENGTH: < NUMBER > >
| < CREMENT_VALUE: < NUMBER > >
| < CAS_UNIQUE: < NUMBER > >
}
// Storage command keywords, plus composite tokens describing a whole storage
// command line. The productions in this grammar consume the individual keyword
// tokens; the composite STORAGE_STATEMENT / STORAGE_COMMAND tokens are not
// referenced by them.
TOKEN: {
< SET_STATEMENT: "set" >
| < ADD_STATEMENT: "add" >
| < REPLACE_STATEMENT: "replace" >
| < APPEND_STATEMENT: "append" >
| < PREPEND_STATEMENT: "prepend" >
| < CAS_STATEMENT: "cas" >
| < STORAGE_STATEMENT:
< SET_STATEMENT >
| < ADD_STATEMENT >
| < REPLACE_STATEMENT >
| < APPEND_STATEMENT >
| < PREPEND_STATEMENT >
>
| < STORAGE_COMMAND:
(
< STORAGE_STATEMENT > < KEY > < FLAGS > < TIME > < LENGTH >
| < CAS_STATEMENT > < KEY > < FLAGS > < TIME > < LENGTH > < CAS_UNIQUE >
)
(< NOREPLY >)?
>
}
// Retrieval keywords: "get" and "gets" share one token.
TOKEN: {
< RETRIEVAL_STATEMENT: "get" | "gets" >
| < RETRIEVAL_COMMAND:
< RETRIEVAL_STATEMENT > < KEY >
>
}
// Delete keyword.
TOKEN: {
< DELETE_STATEMENT: "delete" >
| < DELETE_COMMAND:
< DELETE_STATEMENT > < KEY > (< TIME >)? (< NOREPLY >)?
>
}
// Increment keyword (no parser production uses it yet).
TOKEN: {
< INCREMENT_STATEMENT: "incr" >
| < INCREMENT_COMMAND:
< INCREMENT_STATEMENT > < KEY > < CREMENT_VALUE > (< NOREPLY >)?
>
}
// Decrement keyword (no parser production uses it yet).
TOKEN: {
< DECREMENT_STATEMENT: "decr" >
| < DECREMENT_COMMAND:
< DECREMENT_STATEMENT > < KEY > < CREMENT_VALUE > (< NOREPLY >)?
>
}
// Statistics keyword and its optional argument. The client command in the
// memcached text protocol is "stats"; "STAT" is the prefix of the server's
// reply lines, not a command.
TOKEN: {
< STATISTICS_STATEMENT: "stats" >
| < STATISTICS_OPTION: "items" | "slabs" | "sizes" >
| < STATISTICS_COMMAND:
< STATISTICS_STATEMENT > (< STATISTICS_OPTION >)?
>
}
// flush_all keyword (no parser production uses it yet).
TOKEN: {
< FLUSH_STATEMENT: "flush_all" >
| < FLUSH_COMMAND:
< FLUSH_STATEMENT > (< TIME >)? (< NOREPLY >)?
>
}
// version keyword.
TOKEN: {
< VERSION_STATEMENT: "version" >
| < VERSION_COMMAND:
< VERSION_STATEMENT >
>
}
// Optional "noreply" suffix of a command line.
TOKEN: {
< NOREPLY: "noreply" >
}
// KEY is any run of characters other than space, CR and LF. It is declared
// last so that the keyword and number tokens above are preferred when they
// match text of the same length.
TOKEN: {
< KEY: (~[" ", "\r","\n"])+ >
}
少し、数値まわりのToken(TIME, LENGTHとか)が適当すぎるかな。
んで、これにテキトーなNodeをparseしてあげてみる
/**
 * Entry point: parses one command line and returns the matching Command
 * object. Only retrieval, storage, delete and version commands are handled.
 */
Command Command():
{
Command command;
}
{
(
command = RetrievalCommand()
| command = StorageCommand()
| command = DeleteCommand()
| command = VersionCommand()
)
{
return command;
}
}
/**
 * Parses "<storage keyword> <key> <flags> <exptime> <length> [noreply]" and
 * fills the command object created by createStorageCommand(). The data block
 * that follows on the next line is not part of this production.
 */
StorageCommand StorageCommand():
{
StorageCommand command;
String key;
Long flags = 0L;
Long time = 0L;
Long length = 0L;
Boolean noreply = Boolean.FALSE;
}
{
command = createStorageCommand()
key = Key()
flags = Flags()
time = Time()
length = Length()
noreply = Noreply()
{
command.setNode(jjtThis);
command.setKey(key);
command.setFlags(flags);
command.setExpTime(time);
command.setLength(length);
command.setNoreply(noreply);
return command;
}
}
/**
 * Consumes the storage keyword (set / add / replace / append / prepend) and
 * returns a new command object of the corresponding class. "cas" is not
 * handled here.
 */
StorageCommand createStorageCommand():
{}
{
(
< SET_STATEMENT >
{
return new SetCommand();
}
| < ADD_STATEMENT >
{
return new AddCommand();
}
| < REPLACE_STATEMENT >
{
return new ReplaceCommand();
}
| < APPEND_STATEMENT >
{
return new AppendCommand();
}
| < PREPEND_STATEMENT >
{
return new PrependCommand();
}
)
}
/**
 * Parses "get <key>+" or "gets <key>+" and collects every key into a
 * RetrievalCommand. At least one key is required.
 */
RetrievalCommand RetrievalCommand():
{
RetrievalCommand command = new RetrievalCommand();
String key;
}
{
< RETRIEVAL_STATEMENT >
(
key = Key()
{
command.addKey(key);
}
)+
{
command.setNode(jjtThis);
return command;
}
}
/**
 * Parses "delete <key> [<time>] [noreply]".
 * The time argument is optional: a plain "delete <key>" must parse as well.
 * When it is omitted, the expiry time stays at its default of 0.
 */
DeleteCommand DeleteCommand():
{
DeleteCommand command = new DeleteCommand();
String key;
Long time = 0L;
Boolean noreply = Boolean.FALSE;
}
{
< DELETE_STATEMENT >
key = Key()
[ time = Time() ]
noreply = Noreply()
{
command.setNode(jjtThis);
command.setKey(key);
command.setExpTime(time);
command.setNoreply(noreply);
return command;
}
}
/** Parses the "version" command, which takes no arguments. */
VersionCommand VersionCommand():
{}
{
< VERSION_STATEMENT >
{
VersionCommand command = new VersionCommand();
command.setNode(jjtThis);
return command;
}
}
/** Consumes one KEY token and returns its text. */
String Key():
{ Token key; }
{
key = < KEY >
{
return key.image;
}
}
/** Consumes one NUMBER token as the flags field and returns it as a Long. */
Long Flags():
{ Token flags; }
{
flags = < NUMBER >
{
return Long.valueOf(flags.image);
}
}
/** Consumes one NUMBER token as the expiry time and returns it as a Long. */
Long Time():
{ Token time; }
{
time = < NUMBER >
{
return Long.valueOf(time.image);
}
}
/** Consumes one NUMBER token as the data length and returns it as a Long. */
Long Length():
{ Token length; }
{
length = < NUMBER >
{
return Long.valueOf(length.image);
}
}
/** Returns TRUE when the optional "noreply" keyword is present, FALSE otherwise. */
Boolean Noreply():
{ Boolean noreply = Boolean.FALSE; }
{
[< NOREPLY >{noreply = Boolean.TRUE;}]
{
return noreply;
}
}
これに、適当なコードを投げてあげると
// Demo 1: a retrieval command with a single key.
{
StringReader reader = new StringReader("get hoge\r\n");
MemcacheParser parser = new MemcacheParser(reader);
try {
parser.Command();
} catch (ParseException e) {
e.printStackTrace();
}
}
// Demo 2: a retrieval command with two keys.
{
StringReader reader = new StringReader("gets hoge foo\r\n");
MemcacheParser parser = new MemcacheParser(reader);
try {
parser.Command();
} catch (ParseException e) {
e.printStackTrace();
}
}
// Demo 3: the command line of a storage command (key, flags, exptime, length).
{
StringReader reader = new StringReader("set xyzkey 0 0 6\r\n");
MemcacheParser parser = new MemcacheParser(reader);
try {
parser.Command();
} catch (ParseException e) {
e.printStackTrace();
}
}
Call: Command
Call: RetrievalCommand
Consumed token: <<RETRIEVAL_STATEMENT>: "get" at line 1 column 1>
Call: Key
Consumed token: <<KEY>: "hoge" at line 1 column 5>
Return: Key
Return: RetrievalCommand
Return: Command
Call: Command
Call: RetrievalCommand
Consumed token: <<RETRIEVAL_STATEMENT>: "gets" at line 1 column 1>
Call: Key
Consumed token: <<KEY>: "hoge" at line 1 column 6>
Return: Key
Call: Key
Consumed token: <<KEY>: "foo" at line 1 column 11>
Return: Key
Return: RetrievalCommand
Return: Command
Call: Command
Call: StorageCommand
Call: createStorageCommand
Consumed token: <"set" at line 1 column 1>
Return: createStorageCommand
Call: Key
Consumed token: <<KEY>: "xyzkey" at line 1 column 5>
Return: Key
Call: Flags
Consumed token: <<NUMBER>: "0" at line 1 column 12>
Return: Flags
Call: Time
Consumed token: <<NUMBER>: "0" at line 1 column 14>
Return: Time
Call: Length
Consumed token: <<NUMBER>: "6" at line 1 column 16>
Return: Length
Call: Noreply
Return: Noreply
Return: StorageCommand
Return: Command
と、こんな感じになる。
<NUMBER>とか、ホント、マジメにtokenが書けてないですね。。
ここまでできたので、set と get しかない、memcached 互換ものをでっち上げてみた
public class Server extends Thread { protected final BlockingQueue<Socket> accept = new LinkedBlockingQueue<Socket>(); protected final Cache<String, String> cache = new LRUCache<String, String>(); protected final ExecutorService acceptPool; protected final int port; protected final int maxConnection; public Server(final int port, final int maxConnection){ this.port = port; this.maxConnection = maxConnection; this.acceptPool = Executors.newFixedThreadPool(maxConnection); } public static void main(String...args){ Server s = new Server(12221, 32); s.start(); while(s.isAlive()){ try { TimeUnit.MICROSECONDS.sleep(10); } catch(InterruptedException e){} } } public void run(){ try { ServerSocketFactory factory = ServerSocketFactory.getDefault(); ServerSocket socket = factory.createServerSocket(port, maxConnection); socket.setReuseAddress(true); while(!socket.isClosed()){ final Socket accept = socket.accept(); acceptPool.execute(new AcceptHandler(accept)); } } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } protected class AcceptHandler implements Runnable { private final Socket socket; public AcceptHandler(final Socket socket){ this.socket = socket; } public void run(){ try { final InputStream in = socket.getInputStream(); final OutputStream out = socket.getOutputStream(); final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); final DataOutputStream writer = new DataOutputStream(out); final CommandWorker worker = new CommandWorker(reader); while(!socket.isClosed()){ if(!worker.prepare()){ break; } Return r = worker.call(); writer.writeBytes(r.renderMessage()); } } catch(IOException e){ e.printStackTrace(); } finally { try { socket.close(); } catch(IOException e){ // nop } } } } private class CommandWorker implements CommandVisitor { private final BufferedReader reader; private String currentLine; public CommandWorker(final BufferedReader reader) { this.reader = reader; } public boolean 
prepare(){ try { String line = reader.readLine(); if(null == line){ return false; } currentLine = line; return true; } catch(IOException e){ return false; } } public Return call() { try { final StringReader r = new StringReader(currentLine); final MemcacheParser parser = new MemcacheParser(r); Command command = parser.Command(); return command.accept(this, null); } catch(ParseException e){ return new Return(ResponseType.ERROR); } } public Return visit(Command command, Parameter parameter) { return null; } public Return visit(AddCommand command, Parameter parameter) { return null; } public Return visit(AppendCommand command, Parameter parameter) { return null; } public Return visit(CasCommand command, Parameter parameter) { return null; } public Return visit(DeleteCommand command, Parameter parameter) { return null; } public Return visit(PrependCommand command, Parameter parameter) { return null; } public Return visit(ReplaceCommand command, Parameter parameter) { return null; } public Return visit(RetrievalCommand command, Parameter parameter) { String value = cache.get(command.getKeys().get(0)); if(null == value){ return new Return(ResponseType.END); } return new Return(ResponseType.SEND_VALUE, command.getKeys().get(0), 0, value.length(), value ); } public Return visit(SetCommand command, Parameter parameter) { try { final String nextLine = reader.readLine(); cache.put(command.getKey(), nextLine, command.getExpTime().longValue()); return new Return(ResponseType.STORED); } catch(IOException e){ e.printStackTrace(); return new Return(ResponseType.ERROR); } } public Return visit(VersionCommand command, Parameter parameter) { return null; } } }
見事に、setとgetしか実装してません。しかもハンドリングは少し適当。
ということで、これ(java-lang)と memcached(c-lang)を比較してみた。
// Run the same set/set/get loop against each server and print the wall-clock time.
// Port 12221 is the port used by the Java Server's main(); 11211 is presumably the
// stock memcached instance it is compared against.
$servers = array(
    array('host' => 'localhost', 'port' => 11211),
    array('host' => 'localhost', 'port' => 12221),
);
foreach ($servers as $server) {
    $client = new Memcache;
    $client->connect($server['host'], $server['port']);
    $startedAt = microtime(true);
    $round = 0;
    while ($round < 1000) {
        $client->set('hoge', '123');
        $client->set('hoge', '124');
        $client->get('hoge');
        ++$round;
    }
    echo 'target host => ' . $server['host'] . ' port =>' . $server['port'] . PHP_EOL;
    // The stop time is taken after the line above is printed, as in the original.
    echo 'elapsed: ' . (microtime(true) - $startedAt) . PHP_EOL;
}
target host => localhost port =>11211 elapsed: 0.420136213303 target host => localhost port =>12221 elapsed: 1.34848499298
なんつーか、「もうちょっとがんばりま賞」って感じで残念感があります。(約3倍遅い)
とりあえず、動きそうなので、他の実装も頑張る。
2010/01/07
PHP で ConsistentHash
ConsistentHashの動きをPHPでもやってみた
via - http://www.hyuki.com/yukiwiki/wiki.cgi?ConsistentHashing
こんな感じで実装
/** Maps a key to a position on the hash ring. */
interface HashFunction
{
    public function hash($key);
}

/** Minimal contract of the ring storage. */
interface Circle
{
    public function put($key, $value);
}

/** A storage node that can be placed on the ring. */
interface Node
{
    public function put($key, $value);
    public function get($key);
    public function has($key);
    public function keys();
    public function getName();
}

/**
 * Poor man's sorted map: entries are kept unsorted and a sorted copy is built
 * whenever an ordered view (firstKey / tailMap) is requested.
 */
class TreeMap implements Circle
{
    private $values;

    public function __construct(array $values = array())
    {
        $this->values = new ArrayObject($values);
    }

    public function put($key, $value)
    {
        $this->values->offsetSet($key, $value);
    }

    public function get($key)
    {
        return $this->values->offsetGet($key);
    }

    public function remove($key)
    {
        $this->values->offsetUnset($key);
    }

    public function has($key)
    {
        return $this->values->offsetExists($key);
    }

    public function isEmpty()
    {
        return $this->values->count() < 1;
    }

    /**
     * @return mixed the smallest key, or null when the map is empty
     */
    public function firstKey()
    {
        $keys = array_keys($this->sortedCopy());
        // Avoid reading an undefined offset on an empty map.
        return count($keys) > 0 ? $keys[0] : null;
    }

    /**
     * @param mixed $key lower bound (inclusive)
     * @return TreeMap a new map holding every entry whose key is >= $key
     */
    public function tailMap($key)
    {
        $results = array();
        foreach ($this->sortedCopy() as $candidate => $value) {
            if ($key <= $candidate) {
                $results[$candidate] = $value;
            }
        }
        return new self($results);
    }

    /** Returns the entries as a plain array sorted by key; the map itself is not modified. */
    private function sortedCopy()
    {
        $copy = $this->values->getArrayCopy();
        ksort($copy);
        return $copy;
    }
}

/**
 * Consistent-hash ring. Every node is placed on the ring $numberOfReplicas times,
 * at the positions hash(name . 0), hash(name . 1), ...
 */
class ConsistentHash
{
    private $hashFunction;
    private $numberOfReplicas;
    private $circle;
    private $nodes = array();

    public function __construct(HashFunction $hashFunction, $numberOfReplicas)
    {
        $this->hashFunction = $hashFunction;
        $this->numberOfReplicas = $numberOfReplicas;
        $this->circle = new TreeMap;
    }

    /** @return Node[] the nodes currently registered, in insertion order */
    public function getNodes()
    {
        return $this->nodes;
    }

    public function add(Node $node)
    {
        for ($i = 0; $i < $this->numberOfReplicas; ++$i) {
            $nodeKey = $this->hashFunction->hash($node->getName() . $i);
            $this->circle->put($nodeKey, $node);
        }
        $this->nodes[] = $node;
    }

    /**
     * @return Node|null the node responsible for $key, or null when the ring is empty
     */
    public function get($key)
    {
        if ($this->circle->isEmpty()) {
            return null;
        }
        $hash = $this->hashFunction->hash($key);
        if (!$this->circle->has($hash)) {
            // Walk clockwise to the next replica; wrap around to the first one
            // when there is nothing after this position.
            $tailMap = $this->circle->tailMap($hash);
            $hash = $tailMap->isEmpty()
                ? $this->circle->firstKey()
                : $tailMap->firstKey();
        }
        return $this->circle->get($hash);
    }

    public function remove(Node $node)
    {
        for ($i = 0; $i < $this->numberOfReplicas; ++$i) {
            $nodeKey = $this->hashFunction->hash($node->getName() . $i);
            $this->circle->remove($nodeKey);
        }
        // Keep getNodes() in step with the ring: add() registers the node in both places.
        foreach ($this->nodes as $index => $registered) {
            if ($registered === $node) {
                unset($this->nodes[$index]);
            }
        }
        $this->nodes = array_values($this->nodes);
    }
}

/** Node facade that routes every operation to the ring node responsible for the key. */
class ConsistentHashNode implements Node
{
    private $hash;
    private $keys = array();

    public function __construct(ConsistentHash $hash)
    {
        $this->hash = $hash;
    }

    public function put($key, $value)
    {
        $this->locate($key)->put($key, $value);
        $this->keys[] = $key;
    }

    public function get($key)
    {
        return $this->locate($key)->get($key);
    }

    public function has($key)
    {
        return $this->locate($key)->has($key);
    }

    /** @return array every key passed to put(), in call order (repeated keys are listed again) */
    public function keys()
    {
        return $this->keys;
    }

    public function getName()
    {
        return __CLASS__;
    }

    /**
     * @throws RuntimeException when the ring has no nodes; this used to surface as a
     *         fatal "call to a member function on a non-object" error
     */
    private function locate($key)
    {
        $node = $this->hash->get($key);
        if (null === $node) {
            throw new RuntimeException('no node is registered in the hash ring');
        }
        return $node;
    }
}

/** Simple in-memory node identified by a name. */
class IdentNode implements Node
{
    private $name;
    private $values = array();

    public function __construct($name)
    {
        $this->name = $name;
    }

    public function put($key, $value)
    {
        $this->values[$key] = $value;
    }

    /** @return mixed the stored value, or null when $key is unknown */
    public function get($key)
    {
        return isset($this->values[$key]) ? $this->values[$key] : null;
    }

    /** @return bool whether a value was stored under $key */
    public function has($key)
    {
        // The original tested isset($this->values), which is true for every key.
        return array_key_exists($key, $this->values);
    }

    public function keys()
    {
        return array_keys($this->values);
    }

    public function getName()
    {
        return $this->name;
    }
}
PHPでTreeMapみたいな実装(keyを並び替えながら挿入、指定key以降のmap取得)が思いつかなかったので、ksortとかつかって、なんちゃって実装
んで、ハッシュアルゴリズムとかは、こんな感じで実装して、必要に応じて切り替える
/**
 * Ring position derived from md5: the hex digest is hex-encoded once more, which
 * yields a string made of decimal digits only (same value as unpack('H*', ...)).
 */
class HashMD5Integer implements HashFunction
{
    public function hash($key)
    {
        return bin2hex(md5($key));
    }
}

/** Same scheme as HashMD5Integer, based on sha1. */
class HashSha1Integer implements HashFunction
{
    public function hash($key)
    {
        return bin2hex(sha1($key));
    }
}

/** Same scheme, based on the decimal representation of the crc32 checksum. */
class HashCRC32Integer implements HashFunction
{
    public function hash($key)
    {
        // crc32() can be negative on 32-bit PHP; the leading "-" would turn the
        // result into a non-numeric string. "%u" always gives the unsigned decimal
        // form and is identical to the plain cast wherever crc32() is non-negative.
        return bin2hex(sprintf('%u', crc32($key)));
    }
}
これについても、ハッシュ値を並び替えるため、数値にしたくて、ハッシュ文字列(md5)を数値(digit)にする方法が思いつかなかったので、unpack(H*)。もっといい方法があると思う。
こんな風に動かす
// Three nodes with 8 replicas each, ten keys stored through the routing facade,
// then one line per node listing the keys that ended up on it.
$ring = new ConsistentHash(new HashMD5Integer, 8);
foreach (array('hoge1', 'hoge2', 'hoge3') as $nodeName) {
    $ring->add(new IdentNode($nodeName));
}

$store = new ConsistentHashNode($ring);
foreach (range(0, 9) as $n) {
    $store->put('key' . $n, 'value' . $n);
}

foreach ($ring->getNodes() as $member) {
    echo $member->getName() . ':' . implode(',', $member->keys()) . PHP_EOL;
}
実行結果としては、こんな感じで出た。まあそこそこバラけてる(?)
hoge1:key1,key4,key6 hoge2:key3,key5,key7 hoge3:key0,key2,key8,key9
試しに他のアルゴリズムでも実行して比較してみた
// For each hash function: build a ring of ten nodes (32 replicas each), store
// 100 keys and print the share of keys every node received.
$algorithms = array(
    new HashMD5Integer,
    new HashSha1Integer,
    new HashCRC32Integer,
);
foreach ($algorithms as $algorithm) {
    echo get_class($algorithm) . PHP_EOL;

    $ring = new ConsistentHash($algorithm, 32);
    foreach (range(0, 9) as $n) {
        $ring->add(new IdentNode('hoge' . $n));
    }

    $store = new ConsistentHashNode($ring);
    foreach (range(0, 99) as $n) {
        $store->put('*' . $n, $n);
    }

    $total = count($store->keys());
    foreach ($ring->getNodes() as $member) {
        $owned = $member->keys();
        $percent = (count($owned) / $total) * 100;
        echo 'node(' . $percent . '%):' . $member->getName()
            . ', keys:' . implode(',', $owned) . PHP_EOL;
    }
}
HashMD5Integer node(12%):hoge0, keys:*2,*9,*21,*27,*30,*47,*49,*52,*53,*70,*77,*89 node(12%):hoge1, keys:*0,*8,*10,*20,*23,*33,*40,*41,*69,*86,*90,*98 node(11%):hoge2, keys:*16,*22,*31,*38,*39,*63,*64,*72,*75,*79,*88 node(10%):hoge3, keys:*12,*48,*56,*58,*62,*65,*67,*73,*83,*84 node(4%):hoge4, keys:*6,*61,*91,*95 node(11%):hoge5, keys:*1,*4,*7,*13,*17,*28,*42,*44,*54,*55,*94 node(13%):hoge6, keys:*5,*11,*26,*35,*37,*43,*50,*51,*57,*74,*87,*92,*97 node(8%):hoge7, keys:*18,*32,*36,*45,*46,*60,*82,*85 node(8%):hoge8, keys:*15,*19,*25,*59,*68,*78,*81,*96 node(11%):hoge9, keys:*3,*14,*24,*29,*34,*66,*71,*76,*80,*93,*99 HashSha1Integer node(11%):hoge0, keys:*29,*33,*36,*38,*46,*55,*67,*74,*81,*86,*92 node(8%):hoge1, keys:*1,*2,*18,*48,*69,*89,*93,*99 node(8%):hoge2, keys:*6,*57,*58,*62,*66,*68,*71,*78 node(12%):hoge3, keys:*8,*14,*32,*39,*42,*51,*56,*75,*79,*88,*95,*97 node(14%):hoge4, keys:*3,*9,*12,*13,*34,*40,*43,*45,*53,*59,*64,*65,*77,*94 node(10%):hoge5, keys:*7,*23,*24,*41,*49,*61,*72,*76,*80,*91 node(10%):hoge6, keys:*0,*10,*11,*15,*17,*25,*28,*35,*87,*98 node(8%):hoge7, keys:*4,*20,*21,*27,*31,*50,*63,*85 node(12%):hoge8, keys:*5,*16,*22,*26,*30,*37,*44,*47,*52,*70,*90,*96 node(7%):hoge9, keys:*19,*54,*60,*73,*82,*83,*84 HashCRC32Integer node(13%):hoge0, keys:*11,*12,*13,*14,*16,*17,*40,*42,*44,*45,*47,*49,*98 node(9%):hoge1, keys:*58,*62,*63,*64,*65,*74,*75,*81,*96 node(3%):hoge2, keys:*29,*30,*36 node(14%):hoge3, keys:*4,*5,*9,*21,*25,*27,*32,*38,*39,*51,*78,*87,*92,*99 node(7%):hoge4, keys:*43,*50,*56,*59,*80,*86,*97 node(22%):hoge5, keys:*10,*15,*18,*19,*34,*41,*48,*52,*53,*54,*55,*57,*60,*61,*66,*67,*68,*69,*70,*71,*85,*93 node(5%):hoge6, keys:*8,*24,*28,*31,*79 node(5%):hoge7, keys:*0,*1,*35,*46,*82 node(16%):hoge8, keys:*2,*3,*6,*22,*23,*26,*37,*72,*73,*76,*77,*83,*88,*90,*94,*95 node(6%):hoge9, keys:*7,*20,*33,*84,*89,*91
crc32とかだと、似たようなキー(52-55,66-71とか)は一部集中してる感じ。逆に md5, sha1 あたりは一様にバラけてるので、バラ撒くなら後者の2つを使う方が良さそう。
というか、crc32は一部に偏りすぎな気もする(実装の問題?)
ノードを増やしてみたりした結果はこれ
// Start with three nodes, then add a fourth and a fifth while more keys are
// being stored, and finally print the share of keys held by every node.
$ring = new ConsistentHash(new HashMD5Integer, 32);
foreach (array('hoge1', 'hoge2', 'hoge3') as $nodeName) {
    $ring->add(new IdentNode($nodeName));
}

$store = new ConsistentHashNode($ring);
foreach (range(0, 9) as $n) {
    $store->put('key' . $n, 'value' . $n);
}

$ring->add(new IdentNode('hoge4'));
foreach (range(10, 19) as $n) {
    $store->put('key' . $n, 'value' . $n);
}

$ring->add(new IdentNode('hoge5'));
// The last batch is key30..key39, as in the original; key20..key29 are never stored.
foreach (range(30, 39) as $n) {
    $store->put('key' . $n, 'value' . $n);
}

$total = count($store->keys());
foreach ($ring->getNodes() as $member) {
    $owned = $member->keys();
    $percent = (count($owned) / $total) * 100;
    echo 'node(' . $percent . '%):' . $member->getName()
        . ', keys:' . implode(',', $owned) . PHP_EOL;
}
node(30%):hoge1, keys:key0,key4,key6,key7,key16,key17,key30,key31,key34 node(16.6666666667%):hoge2, keys:key2,key3,key5,key9,key19 node(23.3333333333%):hoge3, keys:key1,key8,key10,key11,key12,key14,key15 node(20%):hoge4, keys:key13,key18,key32,key35,key37,key39 node(10%):hoge5, keys:key33,key36,key38
これはこういう動きが正しいのかわからない。。。(先に存在していたノードはキーが増えるよなぁ)
んで、結論。
ConsistentHashは面白い

S2Container/S2Dao.PHP5を使っていて困ったこと・質問などがありましたら、S2Container-PHP5のMLにてお願いします。