2010/02/07
Using HiveJDBC through S2JDBC
Hadoop topics, part 3
With Hive's JDBC driver you can run HiveQL against a Hive instance running on a remote machine over JDBC (over Thrift), which is really handy.
ref - Hive/HiveClient - Hadoop Wiki
HiveJDBC behaves much like an ordinary JDBC driver, so you can use it as if you were sending SQL to a regular database server, like this:
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class HiveConnection {
    public static void main(String... args) throws SQLException {
        // Register the Hive JDBC driver
        try {
            Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            System.exit(1);
        }
        // HiveServer listens on port 10000 by default; user and password are empty
        Connection conn = DriverManager.getConnection("jdbc:hive://master1:10000/", "", "");
        Statement stmt = conn.createStatement();
        stmt.execute("SELECT distinct(id) FROM hoge");
        ResultSet rs = stmt.getResultSet();
        while (rs.next()) {
            System.out.println(rs.getString(1));
        }
    }
}
```
Useful things deserve to be combined with other useful things, so I made HiveJDBC work with everyone's favorite, S2JDBC.
Starting/stopping HiveServer
First of all, to use HiveJDBC you need to be able to reach HiveServer over Thrift.
Prepare scripts like the following.
start.sh
```bash
#!/usr/bin/env bash
pidfile=$HIVE_PID_DIR/hiveserver.pid
logfile=$HIVE_LOG_DIR/hiveserver.log

if [ -f "$pidfile" ]; then
  echo "running as process $(cat "$pidfile"). stop it first"
  exit 1
fi

nohup "$HIVE_HOME/bin/hive" --service hiveserver > "$logfile" 2>&1 < /dev/null &
echo $! > "$pidfile"
```
stop.sh
```bash
#!/usr/bin/env bash
pidfile=$HIVE_PID_DIR/hiveserver.pid

if [ -f "$pidfile" ]; then
  kill "$(cat "$pidfile")"
  rm "$pidfile"
else
  echo "no pidfile $pidfile"
fi
```
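This starts HiveServer on the default port 10000 on the machine where you run it.
Note that HADOOP_CONF_DIR, HIVE_CONF_DIR and so on must be set up properly beforehand (the scripts themselves also expect HIVE_HOME, HIVE_PID_DIR and HIVE_LOG_DIR).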
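Before pointing JDBC at the server, it can help to check that something is actually listening on the port. The following is a minimal sketch, not part of Hive: HiveServerCheck is a hypothetical class, the defaults (master1, 10000) are taken from the JDBC URL used in this post, and a successful TCP connect only proves a listener exists, not that it speaks Thrift. It assumes Java 7 or later for try-with-resources.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class HiveServerCheck {

    // True if a plain TCP connection to host:port succeeds within timeoutMillis.
    // This only shows that something is listening; it does not speak Thrift.
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // refused, timed out, or unknown host
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults: the host from this post's JDBC URL and HiveServer's default port
        String host = args.length > 0 ? args[0] : "master1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 10000;
        boolean up = isListening(host, port, 3000);
        System.out.println(host + ":" + port + (up ? " is accepting connections" : " is not reachable"));
    }
}
```

Run it right after start.sh; if it reports "not reachable", check the log file written by the script.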
Preparing HiveDialect
To say it up front: the HiveQL equivalent of SQL's order by ... is sort by ..., but I haven't handled that yet. I'll write it up next time.
```java
package org.seasar.extension.jdbc.dialect;

import org.seasar.extension.jdbc.SelectForUpdateType;

public class HiveDialect extends StandardDialect {

    @Override
    public String getName() {
        return "hive";
    }

    @Override
    public boolean supportsLimit() {
        return true;
    }

    // HiveQL has LIMIT but no OFFSET, so the offset argument is ignored
    @Override
    public String convertLimitSql(String sql, int offset, int limit) {
        StringBuilder buf = new StringBuilder(sql.length() + 20);
        buf.append(sql);
        buf.append(" limit ");
        buf.append(limit);
        return buf.toString();
    }

    // Everything below is not available in Hive
    @Override public boolean supportsBatchUpdateResults() { return false; }
    @Override public boolean supportsCursor() { return false; }
    @Override public boolean supportsForUpdate(SelectForUpdateType type, boolean withTarget) { return false; }
    @Override public boolean supportsGetGeneratedKeys() { return false; }
    @Override public boolean supportsIdentity() { return false; }
    @Override public boolean supportsInnerJoinForUpdate() { return false; }
    @Override public boolean supportsLockHint() { return false; }
    @Override public boolean supportsOffset() { return false; }
    @Override public boolean supportsOffsetWithoutLimit() { return false; }
    @Override public boolean supportsOuterJoinForUpdate() { return false; }
    @Override public boolean supportsSequence() { return false; }
}
```
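Since the order by to sort by rewrite is still missing, here is a rough sketch of what it could look like, together with the same limit handling as convertLimitSql above. HiveSqlRewriter is a hypothetical helper, not part of S2JDBC, and where it would hook into the dialect is left open. The regex is naive (it would also rewrite "order by" inside string literals), and note that in Hive SORT BY orders rows within each reducer, so it is not a global ordering unless a single reducer is used.

```java
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not part of S2JDBC or Hive): rewrites ORDER BY to SORT BY
 * and appends LIMIT the same way HiveDialect#convertLimitSql does.
 */
public class HiveSqlRewriter {

    // "order by" as whole words, any case, any whitespace between the two words
    private static final Pattern ORDER_BY =
            Pattern.compile("\\border\\s+by\\b", Pattern.CASE_INSENSITIVE);

    // Naive: also rewrites matches inside string literals or comments
    public static String orderByToSortBy(String sql) {
        return ORDER_BY.matcher(sql).replaceAll("sort by");
    }

    // Mirrors convertLimitSql: there is no offset because supportsOffset() is false
    public static String appendLimit(String sql, int limit) {
        return sql + " limit " + limit;
    }

    public static void main(String[] args) {
        String sql = "SELECT id FROM hoge WHERE id > 1234 ORDER BY id";
        // prints: SELECT id FROM hoge WHERE id > 1234 sort by id limit 100
        System.out.println(appendLimit(orderByToSortBy(sql), 100));
    }
}
```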
Preparing HiveConnectionPool
Hive cannot do XA transactions or Connection#close (that code is simply unimplemented... svn:..jdbc/HiveConnection), so I extend ConnectionPoolImpl and override the part that obtains the ConnectionWrapper.
```java
package org.seasar.extension.dbcp.impl;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.sql.XAConnection;
import javax.transaction.Transaction;

import org.seasar.extension.dbcp.ConnectionWrapper;
import org.seasar.framework.util.TransactionManagerUtil;

public class HiveConnectionPool extends ConnectionPoolImpl {

    // One wrapper per active transaction (entries are never removed in this version)
    private Map<Transaction, ConnectionWrapper> txActivePool =
            new ConcurrentHashMap<Transaction, ConnectionWrapper>();

    @Override
    public synchronized ConnectionWrapper checkOut() throws SQLException {
        Transaction tx = getTransaction();
        // ConcurrentHashMap rejects null keys, so only look up when a transaction exists
        ConnectionWrapper conn = (tx != null) ? txActivePool.get(tx) : null;
        if (conn == null) {
            conn = createConnection(tx);
            if (tx != null) {
                txActivePool.put(tx, conn);
            }
        }
        return conn;
    }

    protected ConnectionWrapper createConnection(Transaction transaction) throws SQLException {
        XAConnection xaConnection = getXADataSource().getXAConnection();
        Connection connection = xaConnection.getConnection();
        return new HiveConnectionWrapper(xaConnection, connection, this, transaction);
    }

    protected Transaction getTransaction() {
        return TransactionManagerUtil.getTransaction(getTransactionManager());
    }
}
```
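To make the checkout rule easier to see without Seasar2 on the classpath, here is a small standalone model of HiveConnectionPool#checkOut: the same connection for the same transaction, and a fresh one whenever there is no transaction. PerTransactionCache and release() are hypothetical names; the generic K and C stand in for Transaction and ConnectionWrapper, and release() is an assumption about what cleanup could look like, since the pool above never removes entries. It assumes Java 8 or later for Supplier.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Simplified, hypothetical model of the per-transaction checkout above. */
public class PerTransactionCache<K, C> {

    private final Map<K, C> active = new ConcurrentHashMap<>();
    private final Supplier<C> factory;

    public PerTransactionCache(Supplier<C> factory) {
        this.factory = factory;
    }

    public synchronized C checkOut(K tx) {
        if (tx == null) {
            // No transaction: never touch the map (it rejects null keys)
            return factory.get();
        }
        C conn = active.get(tx);
        if (conn == null) {
            conn = factory.get();
            active.put(tx, conn);
        }
        return conn;
    }

    // Would be called when the transaction ends, so entries do not pile up
    public synchronized void release(K tx) {
        if (tx != null) {
            active.remove(tx);
        }
    }
}
```

In the real pool the natural place to call something like release() would be a transaction completion callback; that wiring is not shown here.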
Preparing HiveConnectionWrapper
This exists for the PreparedStatement workaround described next. Even PreparedStatementWrapper needs its own wrapper, so I override ConnectionWrapperImpl so that it hands out that wrapper.
```java
package org.seasar.extension.dbcp.impl;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import javax.sql.XAConnection;
import javax.transaction.Transaction;

import org.seasar.extension.dbcp.ConnectionPool;
import org.seasar.framework.exception.SSQLException;

public class HiveConnectionWrapper extends ConnectionWrapperImpl {

    public HiveConnectionWrapper(XAConnection xaConnection, Connection physicalConnection,
            ConnectionPool connectionPool, Transaction tx) throws SQLException {
        super(xaConnection, physicalConnection, connectionPool, tx);
    }

    // Same logic as in ConnectionWrapperImpl, redefined here
    protected SQLException wrapException(final SQLException e, final String sql) {
        return new SSQLException("ESSR0072",
                new Object[] { sql, e.getMessage(), Integer.valueOf(e.getErrorCode()), e.getSQLState() },
                e.getSQLState(), e.getErrorCode(), e, sql);
    }

    protected void assertOpened() throws SQLException {
        if (isClosed()) {
            throw new SSQLException("ESSR0062", null);
        }
    }

    // Every prepareStatement overload returns a HivePreparedStatementWrapper
    @Override
    public PreparedStatement prepareStatement(final String sql) throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(
                    getPhysicalConnection().prepareStatement(sql), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final int resultSetType,
            final int resultSetConcurrency) throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(
                    sql, resultSetType, resultSetConcurrency), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final int resultSetType,
            final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(
                    sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys)
            throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(
                    getPhysicalConnection().prepareStatement(sql, autoGeneratedKeys), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes)
            throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(
                    getPhysicalConnection().prepareStatement(sql, columnIndexes), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final String[] columnNames)
            throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(
                    getPhysicalConnection().prepareStatement(sql, columnNames), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }
}
```
Preparing HivePreparedStatementWrapper
As with Connection, most of PreparedStatement is unimplemented (svn:jdbc/HivePreparedStatement), so I extend PreparedStatementWrapper and override the problematic methods.
```java
package org.seasar.extension.dbcp.impl;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.seasar.extension.jdbc.impl.PreparedStatementWrapper;
import org.seasar.framework.exception.SSQLException;

public class HivePreparedStatementWrapper extends PreparedStatementWrapper {

    protected final PreparedStatement original;
    protected final String sql;

    public HivePreparedStatementWrapper(PreparedStatement original, String sql) {
        super(original, sql);
        this.original = original;
        this.sql = sql;
    }

    protected SQLException wrapException(final SQLException e) {
        return wrapException(e, sql);
    }

    protected SQLException wrapException(final SQLException e, final String sql) {
        if (sql != null) {
            // Same message arguments as HiveConnectionWrapper#wrapException
            return new SSQLException("ESSR0072",
                    new Object[] { sql, e.getMessage(), Integer.valueOf(e.getErrorCode()), e.getSQLState() },
                    e.getSQLState(), e.getErrorCode(), e, sql);
        }
        return e;
    }

    @Override
    public void close() {
        // HivePreparedStatement does not support #close, so do nothing
    }

    // Result sets are wrapped so that HiveResultSetWrapper can patch unsupported methods
    @Override
    public ResultSet executeQuery() throws SQLException {
        try {
            return new HiveResultSetWrapper(original.executeQuery());
        } catch (final SQLException e) {
            throw wrapException(e);
        }
    }

    @Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        try {
            return new HiveResultSetWrapper(original.executeQuery(sql));
        } catch (final SQLException e) {
            throw wrapException(e, sql);
        }
    }

    @Override
    public ResultSet getResultSet() throws SQLException {
        try {
            return new HiveResultSetWrapper(original.getResultSet());
        } catch (final SQLException e) {
            throw wrapException(e);
        }
    }

    @Override
    public ResultSet getGeneratedKeys() throws SQLException {
        try {
            return new HiveResultSetWrapper(original.getGeneratedKeys());
        } catch (final SQLException e) {
            throw wrapException(e);
        }
    }
}
```
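For comparison, the "close() does nothing" part alone could also be done generically with java.lang.reflect.Proxy, so one helper covers Connection, PreparedStatement and ResultSet. This is an alternative sketch I did not use here: NoCloseProxy is a hypothetical name, it only neutralizes a no-argument close(), and the other unsupported-method workarounds (such as getDate) still need hand-written wrappers like the ones in this post.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/** Hypothetical helper: forwards every call to target except close(), which becomes a no-op. */
public final class NoCloseProxy {

    private NoCloseProxy() {
    }

    public static <T> T wrap(final T target, Class<T> iface) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if ("close".equals(method.getName()) && method.getParameterTypes().length == 0) {
                    return null; // swallow close(), like HivePreparedStatementWrapper#close
                }
                try {
                    return method.invoke(target, args);
                } catch (InvocationTargetException e) {
                    // rethrow the original exception (e.g. the driver's SQLException)
                    throw e.getCause();
                }
            }
        };
        return iface.cast(Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] { iface }, handler));
    }
}
```

Usage would be something like NoCloseProxy.wrap(physicalStatement, PreparedStatement.class). The hand-written wrappers in this post are more verbose, but they also plug into Seasar2's own exception wrapping.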
Preparing HiveResultSetWrapper
Still more to go... HiveResultSet also has unimplemented parts (svn:jdbc/HiveResultSet), so I override ResultSetWrapper.
```java
package org.seasar.extension.dbcp.impl;

import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.seasar.extension.jdbc.impl.ResultSetWrapper;

public class HiveResultSetWrapper extends ResultSetWrapper {

    protected static final String DOUBLE_QUOTE = "\"";

    protected final ResultSet rs;

    public HiveResultSetWrapper(ResultSet rs) {
        super(rs);
        this.rs = rs;
    }

    @Override
    public void close() throws SQLException {
        // HiveResultSet does not support #close, so do nothing
    }

    // Handle dates that arrive as strings, optionally wrapped in double quotes
    protected Date parseDate(final String dt) throws Exception {
        String tmp = dt;
        if (dt.length() >= 2 && dt.startsWith(DOUBLE_QUOTE) && dt.endsWith(DOUBLE_QUOTE)) {
            tmp = dt.substring(1, dt.length() - 1);
        }
        return Date.valueOf(tmp);
    }

    @Override
    public Date getDate(int columnIndex) throws SQLException {
        Object obj = getObject(columnIndex);
        if (obj == null) {
            return null;
        }
        if (obj instanceof String) {
            try {
                return parseDate((String) obj);
            } catch (Exception e) {
                throw new SQLException("Cannot convert column " + columnIndex + " to date: " + e, e);
            }
        }
        return super.getDate(columnIndex);
    }
}
```
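The date handling is the only real logic in this wrapper, so here is a standalone copy of parseDate that can be tried without Hive or Seasar2. HiveDateParser is a hypothetical class name; the behavior is the same quote stripping followed by java.sql.Date.valueOf, which expects the yyyy-[m]m-[d]d format and throws IllegalArgumentException otherwise.

```java
import java.sql.Date;

/** Hypothetical standalone copy of HiveResultSetWrapper#parseDate. */
public class HiveDateParser {

    private static final String DOUBLE_QUOTE = "\"";

    // Strip one pair of surrounding double quotes, then parse yyyy-[m]m-[d]d
    public static Date parseDate(String dt) {
        String tmp = dt;
        if (dt.length() >= 2 && dt.startsWith(DOUBLE_QUOTE) && dt.endsWith(DOUBLE_QUOTE)) {
            tmp = dt.substring(1, dt.length() - 1);
        }
        return Date.valueOf(tmp); // IllegalArgumentException on malformed input
    }

    public static void main(String[] args) {
        System.out.println(parseDate("\"2010-02-07\""));
        System.out.println(parseDate("2010-02-07"));
    }
}
```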
Modifying jdbc.dicon
Finally getting close to S2JDBC...
```xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE components PUBLIC "-//SEASAR2.1//DTD S2Container//EN"
    "http://www.seasar.org/dtd/components21.dtd">
<components namespace="jdbc_hive">
    <!-- Hive does not support JTA; jta.dicon is included for the TransactionManager component -->
    <include path="jta.dicon"/>

    <component name="hiveDataSource"
               class="org.seasar.extension.dbcp.impl.XADataSourceImpl">
        <property name="driverClassName">"org.apache.hadoop.hive.jdbc.HiveDriver"</property>
        <property name="URL">"jdbc:hive://master1:10000/"</property>
        <property name="user">""</property>
        <property name="password">""</property>
    </component>

    <component name="hive"
               class="org.seasar.extension.dbcp.impl.DataSourceImpl"
               autoBinding="none">
        <arg>
            <component name="hivePool"
                       class="org.seasar.extension.dbcp.impl.HiveConnectionPool"
                       autoBinding="none">
                <property name="xaDataSource">hiveDataSource</property>
                <property name="timeout">600</property>
                <property name="maxPoolSize">10</property>
                <property name="allowLocalTx">true</property>
                <property name="readOnly">true</property>
                <property name="transactionManager">TransactionManager</property>
                <destroyMethod name="close"/>
            </component>
        </arg>
    </component>
</components>
```
Preparing s2jdbc.dicon
Register the dialect so that S2JDBC picks it up.
```xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE components PUBLIC "-//SEASAR//DTD S2Container 2.4//EN"
    "http://www.seasar.org/dtd/components24.dtd">
<components>
    <include path="jta.dicon"/>
    <include path="tx.dicon"/>
    <include path="s2jdbc-internal.dicon"/>
    <include path="jdbc-hive.dicon"/>

    <component name="hiveDialect"
               class="org.seasar.extension.jdbc.dialect.HiveDialect"/>

    <component name="hiveJdbcManager"
               class="org.seasar.extension.jdbc.manager.JdbcManagerImpl">
        <property name="maxRows">0</property>
        <property name="fetchSize">0</property>
        <property name="queryTimeout">0</property>
        <property name="dialect">hiveDialect</property>
        <initMethod name="init"/>
    </component>
</components>
```
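Sending HiveQL through S2JDBC
Hive has no INSERT statement, so data is loaded with a LOAD DATA statement.
On top of that, values cannot be bound with a PreparedStatement, so the values are written directly into the query.
Furthermore, even getSingleResult gets no result set back at all (svn:hive/service/HiveServer), so things like selectBySql(Integer.class, ...) do not work.
I gave up and call selectBySql(String.class, ...), treating it as void and discarding the result.
For SELECT as well, values cannot be bound with a PreparedStatement, so they are written directly into the query.
ORDER BY is not rewritten to SORT BY yet, so orderBy cannot be used; write the sort clause directly in the query.
JOIN is also somewhat problematic...
And so on. So, if you give up on a few things...!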
```java
// Imports for the Seasar2 annotations (@Component, @InterType, @Property, PropertyType)
// and for the Hoge entity (mapped to the hoge table) are omitted here
import java.util.List;

import org.seasar.extension.jdbc.JdbcManager;

@Component
@InterType("aop.propertyInterType")
public class HogeService {

    @Property(PropertyType.WRITE)
    protected JdbcManager hive;

    // Hive has no INSERT, so data is loaded with LOAD DATA.
    // Values are concatenated into the SQL because binding is not supported.
    public void insert(String path, String part, String subPart) {
        hive.selectBySql(String.class, "LOAD DATA INPATH '" + path
                + "' INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "')")
            .getSingleResult();
    }

    public void insertFromLocal(String path, String part, String subPart) {
        hive.selectBySql(String.class, "LOAD DATA LOCAL INPATH '" + path
                + "' INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "')")
            .getSingleResult();
    }

    public void overwrite(String path, String part, String subPart) {
        hive.selectBySql(String.class, "LOAD DATA INPATH '" + path
                + "' OVERWRITE INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "')")
            .getSingleResult();
    }

    public void overwriteFromLocal(String path, String part, String subPart) {
        hive.selectBySql(String.class, "LOAD DATA LOCAL INPATH '" + path
                + "' OVERWRITE INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "')")
            .getSingleResult();
    }

    public List<Hoge> getHoge() {
        // limit(100) goes through HiveDialect#convertLimitSql (" limit 100" is appended)
        return hive.from(Hoge.class).where("id > 1234").limit(100).getResultList();
    }
}
```
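Because the values have to be concatenated into the SQL, the four LOAD DATA variants above are easy to get subtly wrong. A small builder keeps them in one place. This is a sketch, not part of S2JDBC: LoadDataSql is a hypothetical class, the partition column names p1/p2 come from the example table, and the escaping assumes HiveQL string literals accept backslash escapes (backslash first, then single quote).

```java
/**
 * Hypothetical helper: builds the LOAD DATA statements used by HogeService,
 * with string values quoted, since they cannot be bound with a PreparedStatement.
 */
public final class LoadDataSql {

    private LoadDataSql() {
    }

    // Assumption: HiveQL string literals accept backslash escapes
    public static String quote(String value) {
        return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'";
    }

    public static String build(String path, String table, boolean local, boolean overwrite,
                               String p1, String p2) {
        StringBuilder sql = new StringBuilder("LOAD DATA ");
        if (local) {
            sql.append("LOCAL ");
        }
        sql.append("INPATH ").append(quote(path)).append(' ');
        if (overwrite) {
            sql.append("OVERWRITE ");
        }
        sql.append("INTO TABLE ").append(table)
           .append(" PARTITION (p1 = ").append(quote(p1))
           .append(", p2 = ").append(quote(p2)).append(')');
        return sql.toString();
    }
}
```

With it, HogeService#insert could become hive.selectBySql(String.class, LoadDataSql.build(path, "hoge", false, false, part, subPart)).getSingleResult(); and the other three methods only differ in the two boolean flags.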
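\(^o^)/ Yay! It works!
The end.
So, for now, Hive can be used through S2JDBC!
Seasar's packages are really convenient! Almost everything already has a wrapper, and because components are swapped in the dicon files, changes are easy to make!
But the more core the package, the more private methods it has! Please make them at least protected, or add getters! (I ended up copying several of them for no good reason...)
Argh, but at this point it might have been faster to just fix HiveJDBC properly... orz
I'll give that a try next time...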