2010/02/07

Making HiveJDBC usable through S2JDBC

Posted @ 4:36:35, modified @ 2010/02/07 4:51:57

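Hadoop topic, part 3.

With Hive's JDBC driver you can run Hive QL against a Hive running on a remote host over JDBC (over Thrift), which is really handy.
ref - Hive/HiveClient - Hadoop Wiki

HiveJDBC behaves much like ordinary JDBC, so you can use it as if you were sending SQL to a regular DB server, like this: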

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class HiveConnection {
    public static void main(String...args) throws SQLException {
        try {
            Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            System.exit(1);
        }
        Connection conn = DriverManager.getConnection("jdbc:hive://master1:10000/", "", "");
        Statement stmt = conn.createStatement();
        stmt.execute("SELECT distinct(id) FROM hoge");
        
        ResultSet rs = stmt.getResultSet();
        while(rs.next()){
            System.out.println(rs.getString(1));
        }
    }
}

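So, why not combine one handy thing with another? I made it usable from everyone's favorite, S2JDBC.

Starting and stopping HiveServer

First of all, to use HiveJDBC you need a HiveServer that can be reached over Thrift.
Prepare scripts like the following.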

start.sh

#!/usr/bin/env bash

pidfile=$HIVE_PID_DIR/hiveserver.pid
logfile=$HIVE_LOG_DIR/hiveserver.log

if [ -f $pidfile ]; then
    echo running as process `cat $pidfile`. stop it first
    exit 1
fi

nohup $HIVE_HOME/bin/hive --service hiveserver > $logfile 2>&1 < /dev/null &
echo $! > $pidfile

stop.sh

#!/usr/bin/env bash

pidfile=$HIVE_PID_DIR/hiveserver.pid
logfile=$HIVE_LOG_DIR/hiveserver.log

if [ -f $pidfile ]; then
    kill `cat $pidfile`
    rm $pidfile
else
    echo no pidfile $pidfile
fi

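This starts HiveServer on the machine where you run the script, listening on the default port 10000.

By the way, make sure HADOOP_CONF_DIR, HIVE_CONF_DIR and the like are set properly beforehand (the scripts above also read HIVE_HOME, HIVE_PID_DIR and HIVE_LOG_DIR).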
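
Before pointing JDBC at the server, it can help to confirm that something is actually listening on the port. The following is a minimal sketch using only the JDK; the class name PortCheck and the timeout parameter are my own inventions, not part of Hive.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper, not part of Hive: checks whether a TCP port accepts connections. */
final class PortCheck {

    private PortCheck() {
    }

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, unknown host and so on all count as "not listening".
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing useful to do if closing the probe socket fails.
            }
        }
    }
}
```

With the host name used in this post, the check would be PortCheck.isListening("master1", 10000, 3000). It only proves that the port is open, not that the process behind it is HiveServer.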

Preparing HiveDialect

One note up front: the HiveQL counterpart of SQL's order by ... is sort by ..., but I haven't handled that yet. I'll write it later.

package org.seasar.extension.jdbc.dialect;

import org.seasar.extension.jdbc.SelectForUpdateType;

public class HiveDialect extends StandardDialect {
    @Override
    public String getName() {
        return "hive";
    }

    @Override
    public boolean supportsLimit() {
        return true;
    }
    
    @Override
    public String convertLimitSql(String sql, int offset, int limit) {
        StringBuilder buf = new StringBuilder(sql.length() + 20);
        buf.append(sql);
        buf.append(" limit ");
        buf.append(limit);
        return buf.toString();
    }

    @Override
    public boolean supportsBatchUpdateResults() {
        return false;
    }

    @Override
    public boolean supportsCursor() {
        return false;
    }

    @Override
    public boolean supportsForUpdate(SelectForUpdateType type, boolean withTarget) {
        return false;
    }

    @Override
    public boolean supportsGetGeneratedKeys() {
        return false;
    }

    @Override
    public boolean supportsIdentity() {
        return false;
    }

    @Override
    public boolean supportsInnerJoinForUpdate() {
        return false;
    }

    @Override
    public boolean supportsLockHint() {
        return false;
    }

    @Override
    public boolean supportsOffset() {
        return false;
    }

    @Override
    public boolean supportsOffsetWithoutLimit() {
        return false;
    }

    @Override
    public boolean supportsOuterJoinForUpdate() {
        return false;
    }

    @Override
    public boolean supportsSequence() {
        return false;
    }
    
}
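
The order by rewrite that HiveDialect does not do yet could start from something like the sketch below. This is not code from S2JDBC or Hive: HiveOrderByRewriter is a hypothetical class, and the approach is a naive regex replacement that does not skip string literals or comments.

```java
import java.util.regex.Pattern;

/** Hypothetical helper, not part of S2JDBC or Hive. */
final class HiveOrderByRewriter {

    // Matches "order by" as whole words, case-insensitively, with any whitespace in between.
    private static final Pattern ORDER_BY =
            Pattern.compile("\\border\\s+by\\b", Pattern.CASE_INSENSITIVE);

    private HiveOrderByRewriter() {
    }

    /** Replaces every ORDER BY keyword pair with "sort by"; other text is left untouched. */
    static String rewrite(String sql) {
        return ORDER_BY.matcher(sql).replaceAll("sort by");
    }
}
```

For example, HiveOrderByRewriter.rewrite("select id from hoge ORDER BY id") yields "select id from hoge sort by id". Keep in mind that SORT BY orders rows within each reducer, so the output is only globally ordered when a single reducer handles it.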

Preparing HiveConnectionPool

Hive cannot do XA transactions or even connection#close (the code is unimplemented... svn:..jdbc/HiveConnection), so I extend ConnectionPoolImpl and override the part that obtains the ConnectionWrapper.

package org.seasar.extension.dbcp.impl;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.sql.XAConnection;
import javax.transaction.Transaction;

import org.seasar.extension.dbcp.ConnectionWrapper;
import org.seasar.framework.util.TransactionManagerUtil;

public class HiveConnectionPool extends ConnectionPoolImpl {
    
    private Map<Transaction, ConnectionWrapper> txActivePool = new ConcurrentHashMap<Transaction, ConnectionWrapper>();

    @Override
    public synchronized ConnectionWrapper checkOut() throws SQLException {
        Transaction tx = getTransaction();
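        // ConcurrentHashMap rejects null keys, so only look up when a transaction is active
        ConnectionWrapper conn = (tx != null) ? txActivePool.get(tx) : null;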
        if(null == conn){
            conn = createConnection(tx);
        }
        if(null != tx){
            txActivePool.put(tx, conn);
        }
        return conn;
    }
    
    protected ConnectionWrapper createConnection(Transaction transaction) throws SQLException {
        XAConnection xaConnection = getXADataSource().getXAConnection();
        Connection connection = xaConnection.getConnection();
        return new HiveConnectionWrapper(xaConnection, connection, this, transaction);
    }
    
    protected Transaction getTransaction(){
        return TransactionManagerUtil.getTransaction(getTransactionManager());
    }
    
}
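
Stripped of the Seasar types, the idea behind checkOut is "one connection per transaction, a fresh one when there is no transaction". Here is a minimal sketch of that pattern with plain JDK classes. PerKeyCache is hypothetical: K stands in for the Transaction, V for the ConnectionWrapper, and the release method is my own addition that the pool above does not have.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical, simplified stand-in for the checkOut logic; not Seasar code. */
final class PerKeyCache<K, V> {

    private final Map<K, V> active = new ConcurrentHashMap<K, V>();
    private final Supplier<V> factory;

    PerKeyCache(Supplier<V> factory) {
        this.factory = factory;
    }

    /** Returns the value bound to key, creating it on first use; a null key always gets a fresh value. */
    synchronized V checkOut(K key) {
        if (key == null) {
            // ConcurrentHashMap does not accept null keys, so nothing is cached in this case.
            return factory.get();
        }
        V value = active.get(key);
        if (value == null) {
            value = factory.get();
            active.put(key, value);
        }
        return value;
    }

    /** Forgets the value bound to key, for example once the transaction has ended. */
    synchronized void release(K key) {
        if (key != null) {
            active.remove(key);
        }
    }
}
```

Without something like release, entries stay in the map for as long as the pool lives, which is worth keeping in mind for a long-running process.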

Preparing HiveConnectionWrapper

This exists as a workaround for the PreparedStatement problem described below. Even PreparedStatementWrapper needs its own wrapper, so I extend ConnectionWrapperImpl and override the prepareStatement methods.

package org.seasar.extension.dbcp.impl;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import javax.sql.XAConnection;
import javax.transaction.Transaction;

import org.seasar.extension.dbcp.ConnectionPool;
import org.seasar.framework.exception.SSQLException;

public class HiveConnectionWrapper extends ConnectionWrapperImpl {

    public HiveConnectionWrapper(XAConnection xaConnection,
            Connection physicalConnection, ConnectionPool connectionPool,
            Transaction tx) throws SQLException {
        super(xaConnection, physicalConnection, connectionPool, tx);
    }
    
    protected SQLException wrapException(final SQLException e, final String sql) {
        return new SSQLException("ESSR0072",
                new Object[] { sql, e.getMessage(),
                        new Integer(e.getErrorCode()), e.getSQLState() }, e
                        .getSQLState(), e.getErrorCode(), e, sql);
    }
    
    protected void assertOpened() throws SQLException {
        if (isClosed()) {
            throw new SSQLException("ESSR0062", null);
        }
    }
    
    @Override
    public PreparedStatement prepareStatement(final String sql) throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }
    
    @Override
    public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, resultSetType, resultSetConcurrency), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }
    
    @Override
    public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, autoGeneratedKeys), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, columnIndexes), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException {
        assertOpened();
        try {
            return new HivePreparedStatementWrapper(getPhysicalConnection().prepareStatement(sql, columnNames), sql);
        } catch (final SQLException ex) {
            release();
            throw wrapException(ex, sql);
        }
    }
}

Preparing HivePreparedStatementWrapper

As with Connection, most of the PreparedStatement code is unimplemented (svn:jdbc/HivePreparedStatement), so I extend PreparedStatementWrapper and override.

package org.seasar.extension.dbcp.impl;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.seasar.extension.jdbc.impl.PreparedStatementWrapper;
import org.seasar.framework.exception.SSQLException;

public class HivePreparedStatementWrapper extends PreparedStatementWrapper {

    protected final PreparedStatement original;
    
    protected final String sql;
    
    public HivePreparedStatementWrapper(PreparedStatement original, String sql) {
        super(original, sql);
        this.original = original;
        this.sql = sql;
    }
    
    protected SQLException wrapException(final SQLException e) {
        return wrapException(e, sql);
    }
    
    protected SQLException wrapException(final SQLException e, final String sql) {
        if (sql != null) {
            return new SSQLException("ESSR0072", new Object[] { sql,
                    String.valueOf(e.getErrorCode()), e.getSQLState() }, e
                    .getSQLState(), e.getErrorCode(), e, sql);
        }
        return e;
    }
    
    @Override
    public void close() {
        // HivePreparedStatement does not support close(), so this is intentionally a no-op
    }
    
    @Override
    public ResultSet executeQuery() throws SQLException {
        try {
            return new HiveResultSetWrapper(original.executeQuery());
        } catch (final SQLException e) {
            throw wrapException(e);
        }
    }
    
    @Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        try {
            return new HiveResultSetWrapper(original.executeQuery(sql));
        } catch (final SQLException e) {
            throw wrapException(e, sql);
        }
    }
    
    @Override
    public ResultSet getResultSet() throws SQLException {
        try {
            return new HiveResultSetWrapper(original.getResultSet());
        } catch (final SQLException e) {
            throw wrapException(e);
        }
    }
    
    @Override
    public ResultSet getGeneratedKeys() throws SQLException {
        try {
            return new HiveResultSetWrapper(original.getGeneratedKeys());
        } catch (final SQLException e) {
            throw wrapException(e);
        }
    }
}
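
The wrappers in this post subclass Seasar's wrapper classes. A different technique that achieves the "ignore close()" part alone is a JDK dynamic proxy. The sketch below is not what the code above does: NoCloseProxy is a hypothetical helper that works for any interface, java.sql.PreparedStatement and java.sql.ResultSet included.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/** Hypothetical helper: wraps an object behind an interface so that close() does nothing. */
final class NoCloseProxy {

    private NoCloseProxy() {
    }

    @SuppressWarnings("unchecked")
    static <T> T wrap(final T target, Class<T> iface) {
        InvocationHandler handler = new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if ("close".equals(method.getName()) && method.getParameterTypes().length == 0) {
                    // Swallow close() instead of delegating to the unsupported implementation.
                    return null;
                }
                try {
                    return method.invoke(target, args);
                } catch (InvocationTargetException e) {
                    // Rethrow the exception raised by the target, e.g. a SQLException.
                    throw e.getCause();
                }
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] { iface }, handler);
    }
}
```

Usage would look like NoCloseProxy.wrap(stmt, PreparedStatement.class). The subclassing approach in this post still has the advantage that it plugs directly into S2JDBC's exception wrapping.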

Preparing HiveResultSetWrapper

There's still more... HiveResultSet also has unimplemented methods (svn:jdbc/HiveResultSet), so I override ResultSetWrapper.

package org.seasar.extension.dbcp.impl;

import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.seasar.extension.jdbc.impl.ResultSetWrapper;

public class HiveResultSetWrapper extends ResultSetWrapper {
    
    protected static final String DOUBLE_QUOTE = "\"";
    
    protected final ResultSet rs;
    
    public HiveResultSetWrapper(ResultSet rs){
        super(rs);
        this.rs = rs;
    }

    @Override
    public void close() throws SQLException {
        // HiveResultSet does not support close(), so this is intentionally a no-op
    }
    
    protected Date parseDate(final String dt) throws Exception {
        String tmp = dt;
        if(dt.startsWith(DOUBLE_QUOTE) && dt.endsWith(DOUBLE_QUOTE)){
            int length = dt.length();
            tmp = dt.substring(1, length - 1);
        }
        return Date.valueOf(tmp);
    }
    
    @Override
    public Date getDate(int columnIndex) throws SQLException {
        Object obj = getObject(columnIndex);
        if (obj == null) {
            return null;
        }
        if(obj instanceof String){
            try {
                return parseDate((String) obj);
            } catch(Exception e){
                throw new SQLException("Cannot convert column " + columnIndex + " to date: " + e.toString());
            }
        }
        return super.getDate(columnIndex);
    }
    
}
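
To see what the date handling accepts, here is the same quote-stripping logic as a standalone sketch with no Seasar dependency. QuotedDateParser is a hypothetical name, and the extra length check is my addition so that a lone quote character is passed through to Date.valueOf instead of being sliced.

```java
import java.sql.Date;

/** Hypothetical standalone version of the parseDate logic above. */
final class QuotedDateParser {

    private static final String DOUBLE_QUOTE = "\"";

    private QuotedDateParser() {
    }

    /** Strips one pair of surrounding double quotes, if present, then parses a yyyy-mm-dd date. */
    static Date parse(String raw) {
        String value = raw;
        if (raw.length() >= 2 && raw.startsWith(DOUBLE_QUOTE) && raw.endsWith(DOUBLE_QUOTE)) {
            value = raw.substring(1, raw.length() - 1);
        }
        // Date.valueOf throws IllegalArgumentException for anything that is not in JDBC date escape format.
        return Date.valueOf(value);
    }
}
```

Both parse("\"2010-02-07\"") and parse("2010-02-07") give the same Date, while a value such as "2010/02/07" is rejected with an IllegalArgumentException.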

Modifying jdbc.dicon

Finally getting close to s2jdbc...

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE components PUBLIC "-//SEASAR2.1//DTD S2Container//EN" "http://www.seasar.org/dtd/components21.dtd">
<components namespace="jdbc_hive">
    <!-- Hive itself does not support JTA -->
    <include path="jta.dicon"/>
    
    <component name="hiveDataSource" class="org.seasar.extension.dbcp.impl.XADataSourceImpl">
        <property name="driverClassName">"org.apache.hadoop.hive.jdbc.HiveDriver"</property>
        <property name="URL">"jdbc:hive://master1:10000/"</property>
        <property name="user">""</property>
        <property name="password">""</property>
    </component>

    <component name="hive" class="org.seasar.extension.dbcp.impl.DataSourceImpl" autoBinding="none">
        <arg>
            <component name="hivePool" class="org.seasar.extension.dbcp.impl.HiveConnectionPool" autoBinding="none">
                <property name="xaDataSource">hiveDataSource</property>
                <property name="timeout">600</property>
                <property name="maxPoolSize">10</property>
                <property name="allowLocalTx">true</property>
                <property name="readOnly">true</property>
                <property name="transactionManager">TransactionManager</property>
                <destroyMethod name="close"/>
            </component>
        </arg>
    </component>
</components>

Preparing s2jdbc.dicon

Make s2jdbc load the dialect.

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE components PUBLIC "-//SEASAR//DTD S2Container 2.4//EN" "http://www.seasar.org/dtd/components24.dtd">
<components>
    <include path="jta.dicon"/>
    <include path="tx.dicon"/>
    <include path="s2jdbc-internal.dicon"/>
    
    <include path="jdbc-hive.dicon" />
    
    <component name="hiveDialect" class="org.seasar.extension.jdbc.dialect.HiveDialect" />
            
    <component name="hiveJdbcManager" class="org.seasar.extension.jdbc.manager.JdbcManagerImpl">
        <property name="maxRows">0</property>
        <property name="fetchSize">0</property>
        <property name="queryTimeout">0</property>
        <property name="dialect">hiveDialect</property>
        <initMethod name="init" />
    </component>
</components>

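Sending HiveQL through S2JDBC

Hive has no insert statement, so you use a LOAD DATA statement instead.
On top of that, values cannot be bound through a PreparedStatement, so the query is written out directly.
Furthermore, even with getSingleResult no result set comes back (svn:hive/service/HiveServer), so things like selectBySql(Integer.class...) don't work.
I gave up and call selectBySql(String.class), treating the method as void..

For select as well, values cannot be bound through a PreparedStatement, so the query is written out directly.
The order by rewrite (to sort by) isn't done yet, so orderBy cannot be used. Write it directly in the query.
Joins are also a bit problematic...

And so on. So, if you give up on a few things...!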

@Component
@InterType("aop.propertyInterType")
public class HogeService {
    @Property(PropertyType.WRITE)
    protected JdbcManager hive;

    public void insert(String path, String part, String subPart){
        hive.selectBySql(String.class, "LOAD DATA INPATH '" + path + "' INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "')").getSingleResult();
    }
    public void insertFromLocal(String path, String part, String subPart){
        hive.selectBySql(String.class, "LOAD DATA LOCAL INPATH '" + path + "' INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "')").getSingleResult();
    }
    public void overwrite(String path, String part, String subPart){
        hive.selectBySql(String.class, "LOAD DATA INPATH '" + path + "' OVERWRITE INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "')").getSingleResult();
    }
    public void overwriteFromLocal(String path, String part, String subPart){
        hive.selectBySql(String.class, "LOAD DATA LOCAL INPATH '" + path + "' OVERWRITE INTO TABLE hoge PARTITION (p1 = '" + part + "', p2 = '" + subPart + "')").getSingleResult();
    }
    public List<Hoge> getHoge(){
        return hive.from(Hoge.class).where("id > 1234").limit(100).getResultList();
    }
}
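
Since values cannot be bound and end up concatenated into the statement, it may be worth building the LOAD DATA string in one place and refusing values that could break out of the quoted literal. The sketch below is one way to do that. LoadDataSql is hypothetical, the table name hoge and the partition columns p1 and p2 follow the service above, and the allowlist of characters is an assumption you would adjust to your own paths.

```java
import java.util.regex.Pattern;

/** Hypothetical helper that builds the LOAD DATA statements used by HogeService. */
final class LoadDataSql {

    // Assumed allowlist: letters, digits and a few path characters. Quotes and backslashes are rejected.
    private static final Pattern SAFE = Pattern.compile("[A-Za-z0-9_./:=-]+");

    private LoadDataSql() {
    }

    static String build(String path, boolean local, boolean overwrite, String part, String subPart) {
        StringBuilder buf = new StringBuilder("LOAD DATA ");
        if (local) {
            buf.append("LOCAL ");
        }
        buf.append("INPATH '").append(requireSafe(path)).append("' ");
        if (overwrite) {
            buf.append("OVERWRITE ");
        }
        buf.append("INTO TABLE hoge PARTITION (p1 = '").append(requireSafe(part))
           .append("', p2 = '").append(requireSafe(subPart)).append("')");
        return buf.toString();
    }

    /** Returns value unchanged if it only contains allowlisted characters, otherwise throws. */
    private static String requireSafe(String value) {
        if (value == null || !SAFE.matcher(value).matches()) {
            throw new IllegalArgumentException("unsafe value for a Hive literal: " + value);
        }
        return value;
    }
}
```

Each service method could then pass LoadDataSql.build(path, local, overwrite, part, subPart) to selectBySql, which keeps the four variants from drifting apart.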

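\(^o^)/ Yay! It works!

The end.

So, for now, Hive can be used through S2JDBC!

The Seasar packages are really handy! Wrappers are provided for almost everything, and since you can switch implementations in the dicon, changes are easy!
But the more core a package is, the more private methods it has! At least provide protected methods or getters! (I ended up needlessly copying a few things...)

Aaah, but at this rate it might have been quicker to fix HiveJDBC properly.. orz
I'll try that next time..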

