<?xml version="1.0" encoding="utf-8"?>
<rss version="2.0"
    xmlns:dc="http://purl.org/dc/elements/1.1/"
    xmlns:sy="http://purl.org/rss/1.0/modules/syndication/"
    xmlns:admin="http://webns.net/mvcb/"
    xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
    xmlns:content="http://purl.org/rss/1.0/modules/content/">
<channel>
<title>Hata's Blog (Restored Edition)</title>
<link>http://blog.xole.net/index.php</link>
<pubDate>Sun, 07 Feb 2010 03:27:10 +0900</pubDate>
<description>
Hata's Blog (Restored Edition) - RSS 2.0 (Really Simple Syndication).
</description>
<item>
<title>Doing Something About the HDFS HadoopThriftServer</title>
<link>http://blog.xole.net/article.php?id=745</link>
<pubDate>Sun, 07 Feb 2010 03:27:10 +0900</pubDate>
<description>More on Hadoop, part 2.
HDFS, the file system underpinning Hadoop, can be read and written from inside a program through the HDFS-API. (hdfs-s3 probably goes through this API too; I haven't read the source.)...</description>
<content:encoded>
<![CDATA[<p>More on Hadoop, part 2.</p>
<p><a href="http://hadoop.apache.org/hdfs/">HDFS</a>, the file system underpinning Hadoop, can be read and written from inside a program through the <a href="http://wiki.apache.org/hadoop/HDFS-APIs">HDFS-API</a>. (<a href="http://wiki.apache.org/hadoop/AmazonS3">hdfs-s3</a> probably goes through this API too; I haven't read the source.)</p>

<p>(snip)</p>

<p>Now, among these HDFS APIs is <a href="http://svn.apache.org/repos/asf/hadoop/hdfs/trunk/src/contrib/thriftfs/">HadoopThriftServer (thriftfs)</a>, which uses Thrift to let you read and write HDFS from a remote machine.</p>

<p>How to start thriftfs is shown in <a href="http://svn.apache.org/repos/asf/hadoop/hdfs/trunk/src/contrib/thriftfs/scripts/start_thrift_server.sh">start_thrift_server.sh</a>, but that script holds on to the shell, so I start it like this instead.</p>

<pre class="javascript">
<span class="preprocessor" >#!/usr/bin/env bash</span>

THRIFTFS_PID_FILE=$HADOOP_PID_DIR/thrift.pid
THRIFTFS_LOG_FILE=$HADOOP_LOG_DIR/thrift.log

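<span class="comment" ># Refuse to start while a PID file from an earlier run exists.</span>
<span class="keyword" >if</span> [ -f "$THRIFTFS_PID_FILE" ]; then
    echo "running as process $(cat "$THRIFTFS_PID_FILE"). stop it first"
    exit 1
fi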

CLASSPATH=$HADOOP_CONF_DIR
CLASSPATH=$CLASSPATH:$HADOOP_HOME/hadoop-0.20.1-core.jar:$HADOOP_HOME/hadoop-0.20.1-tools.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib/commons-logging-1.0.4.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib/commons-logging-api-1.0.4.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib/log4j-1.2.15.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/contrib/thriftfs/hadoop-0.20.1-thriftfs.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/src/contrib/thriftfs/lib/hadoopthriftapi.jar
CLASSPATH=$CLASSPATH:$HADOOP_HOME/src/contrib/thriftfs/lib/libthrift.jar

nohup java -Dcom.sun.management.jmxremote -cp $CLASSPATH org.apache.hadoop.thriftfs.HadoopThriftServer 10010 &gt; $THRIFTFS_LOG_FILE 2&gt;&amp;1 &lt; /dev/<span class="keyword" >null</span> &amp;
echo $! &gt; $THRIFTFS_PID_FILE

$HADOOP_HOME/bin/hadoop dfsadmin -safemode leave</pre>

<p>Likewise, to stop it:</p>


<pre class="javascript">
<span class="preprocessor" >#!/usr/bin/env bash</span>

THRIFT_FS_PID_FILE=$HADOOP_PID_DIR/thrift.pid
THRIFT_FS_LOG_FILE=$HADOOP_LOG_DIR/thrift.log

<span class="keyword" >if</span> [ -f $THRIFT_FS_PID_FILE ]; then
    kill `cat $THRIFT_FS_PID_FILE`
    rm $THRIFT_FS_PID_FILE
<span class="keyword" >else</span>
    echo no pidfile $THRIFT_FS_PID_FILE
fi</pre>

<p>The Thrift server has to be started while DFS is already running (that is, after $HADOOP_HOME/bin/start-dfs.sh).<br />
(Run dfsadmin -safemode leave as needed.)</p>

<p>With this ThriftFS you can then read and write files on a remote DFS like this:</p>

<pre class="java">
<span class="keyword" >import</span> org.apache.hadoop.conf.Configuration;
<span class="keyword" >import</span> org.apache.hadoop.hdfs.protocol.FSConstants;
<span class="keyword" >import</span> org.apache.hadoop.fs.permission.FsPermission;

<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.Pathname;
<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.ThriftHandle;
<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.ThriftIOException;

<span class="keyword" >import</span> com.facebook.thrift.TException;
<span class="keyword" >import</span> com.facebook.thrift.protocol.TBinaryProtocol;
<span class="keyword" >import</span> com.facebook.thrift.protocol.TProtocol;
<span class="keyword" >import</span> com.facebook.thrift.transport.TSocket;
<span class="keyword" >import</span> com.facebook.thrift.transport.TTransportException;

<span class="keyword" >public</span> <span class="keyword" >class</span> HDFSInput {
    
    <span class="keyword" >public</span> <span class="keyword" >static</span> <span class="keyword" >void</span> main(String...args) {
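        <span class="comment" >// Load the Hadoop configuration; the defaults below are read from it.</span>
        Configuration config = <span class="keyword" >new</span> Configuration();
        <span class="keyword" >final</span> <span class="keyword" >int</span> defaultBufferSize = config.getInt(<span class="string" >"io.file.buffer.size"</span>, <span class="number" >4096</span>);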
        <span class="keyword" >final</span> <span class="keyword" >long</span> defaultBlockSize = config.getLong(<span class="string" >"dfs.block.size"</span>, FSConstants.DEFAULT_BLOCK_SIZE);
        <span class="keyword" >final</span> <span class="keyword" >short</span> defaultReplication = (<span class="keyword" >short</span>) config.getInt(<span class="string" >"dfs.replication"</span>, <span class="number" >3</span>);

        TSocket socket = <span class="keyword" >new</span> TSocket(<span class="string" >"master1"</span>, <span class="number" >10010</span>);
        TProtocol protocol = <span class="keyword" >new</span> TBinaryProtocol(socket);
        
        <span class="keyword" >try</span> {
            socket.open();
            <span class="keyword" >try</span> {
                ThriftHadoopFileSystem.Client client = <span class="keyword" >new</span> ThriftHadoopFileSystem.Client(protocol);
                <span class="comment" >// client timeout 5 sec</span>
                client.setTimeout(<span class="number" >5</span>);
                Pathname hoge = <span class="keyword" >new</span> Pathname(<span class="string" >"/tmp/hoge"</span>);

                FsPermission permission = FsPermission.createImmutable((<span class="keyword" >short</span>) <span class="number" >0655</span>);
                <span class="keyword" >boolean</span> overwrite = <span class="keyword" >true</span>;

                ThriftHandle writeHandler = client.createFile(hoge, permission.toShort(), overwrite, defaultBufferSize, defaultReplication, defaultBlockSize);
                client.write(writeHandler, <span class="string" >"hello world"</span>);
                client.close(writeHandler);

                ThriftHandle readHandler = client.open(hoge);
                System.out.println(client.read(readHandler, <span class="number" >0</span>, <span class="number" >1024</span>));
                client.close(readHandler);
            } <span class="keyword" >catch</span> (TTransportException e) {
                e.printStackTrace();
            } <span class="keyword" >catch</span> (ThriftIOException e) {
                e.printStackTrace();
            }
        } <span class="keyword" >catch</span> (TException e) {
            e.printStackTrace();
        } <span class="keyword" >finally</span> {
            socket.close();
        }
    }
}
</pre>

<p>Wrapping the reads in a BufferedReader makes reading a little more convenient (I'll write that up another time).</p>
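<p>As a rough idea of what such a wrapper could look like, here is a minimal sketch, not the author's code. <code>ChunkFetcher</code>, <code>ChunkedReader</code> and <code>ChunkedReaderDemo</code> are hypothetical names; the fetcher stands in for a call such as <code>client.read(handle, offset, length)</code>. It assumes single-byte (ASCII) content, so that byte offsets and character counts agree, and it treats an empty result as end of file, which may not match how the real server behaves at end of file.</p>

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback standing in for a remote read(handle, offset, length).
interface ChunkFetcher {
    String fetch(long offset, int length) throws IOException;
}

// Reader that pulls fixed-size chunks on demand, so it can sit under a BufferedReader.
// Assumption: content is single-byte (ASCII), so byte offsets match char counts.
final class ChunkedReader extends Reader {
    private final ChunkFetcher fetcher;
    private final int chunkSize;
    private long offset = 0;   // next byte offset to request
    private String chunk = ""; // chunk currently being consumed
    private int pos = 0;       // read position inside chunk
    private boolean eof = false;

    ChunkedReader(ChunkFetcher fetcher, int chunkSize) {
        this.fetcher = fetcher;
        this.chunkSize = chunkSize;
    }

    @Override
    public int read(char[] buf, int off, int len) throws IOException {
        if (len == 0) {
            return 0;
        }
        if (pos >= chunk.length()) {
            if (eof) {
                return -1;
            }
            String next = fetcher.fetch(offset, chunkSize);
            pos = 0;
            if (next == null || next.isEmpty()) {
                // Assumption: an empty chunk means end of file.
                eof = true;
                chunk = "";
                return -1;
            }
            chunk = next;
            offset += chunk.getBytes(StandardCharsets.UTF_8).length;
        }
        int n = Math.min(len, chunk.length() - pos);
        chunk.getChars(pos, pos + n, buf, off);
        pos += n;
        return n;
    }

    @Override
    public void close() {
        // Nothing to release here; the caller closes the remote handle.
    }
}

final class ChunkedReaderDemo {
    // Reads every line of data through a ChunkedReader backed by an in-memory fetcher.
    static List<String> readAllLines(String data, int chunkSize) {
        ChunkFetcher inMemory = (offset, length) -> {
            int from = (int) Math.min(offset, data.length());
            int to = Math.min(from + length, data.length());
            return data.substring(from, to);
        };
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new ChunkedReader(inMemory, chunkSize))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        System.out.println(readAllLines("hello\nworld\nfoo", 4));
    }
}
```

<p>Against the real client, the lambda would forward to the Thrift read call instead of slicing a string; multi-byte text would need the offset bookkeeping reworked, since a chunk boundary can split a character.</p>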

<p>So now I could read and write, but when several clients read and write in quick succession, consistency seems to break down (?) and errors start to show up.<br />
Reading the original <a href="http://svn.apache.org/repos/asf/hadoop/hdfs/trunk/src/contrib/thriftfs/src/java/org/apache/hadoop/thriftfs/HadoopThriftServer.java">source</a>... well, how very elegant...</p>

<p>So I rewrote it using java.util.concurrent.atomic (this is the main point of the post).</p>
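<p>The core pattern, taken in isolation, might be sketched as follows. This is a standalone illustration under assumptions, not the thriftfs code: <code>HandleRegistry</code> and <code>distinctIdsUnderContention</code> are hypothetical names. Ids come from an <code>AtomicLong</code> and the handle table is a <code>ConcurrentHashMap</code>, so many threads can register handles at once without two of them receiving the same id.</p>

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a server-side handle table; not the thriftfs code.
final class HandleRegistry {
    // incrementAndGet() hands every caller a distinct id without a lock.
    private final AtomicLong nextId = new AtomicLong(0);
    // Safe for concurrent put/get/remove without external locking.
    private final ConcurrentHashMap<Long, Object> handles = new ConcurrentHashMap<>();

    long insert(Object resource) {
        long id = nextId.incrementAndGet();
        handles.put(id, resource);
        return id;
    }

    Object lookup(long id) {
        return handles.get(id);
    }

    Object remove(long id) {
        return handles.remove(id);
    }

    // Registers perThread objects from each of `threads` threads at the same time
    // and returns how many distinct ids were handed out.
    static int distinctIdsUnderContention(int threads, int perThread) {
        HandleRegistry registry = new HandleRegistry();
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                try {
                    start.await(); // release all workers together
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int i = 0; i < perThread; i++) {
                    seen.add(registry.insert(new Object()));
                }
            });
            workers[t].start();
        }
        start.countDown();
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // 8 threads x 10,000 inserts: prints 80000, one distinct id per insert.
        System.out.println(distinctIdsUnderContention(8, 10_000));
    }
}
```

<p>The sketch starts its counter at zero to keep things simple. The rewritten server follows.</p>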
<pre class="java">
<span class="keyword" >package</span> org.apache.hadoop.thriftfs;

<span class="keyword" >import</span> java.io.IOException;
<span class="keyword" >import</span> java.net.InetSocketAddress;
<span class="keyword" >import</span> java.net.ServerSocket;
<span class="keyword" >import</span> java.util.LinkedList;
<span class="keyword" >import</span> java.util.List;
<span class="keyword" >import</span> java.util.Random;
<span class="keyword" >import</span> java.util.concurrent.ConcurrentHashMap;
<span class="keyword" >import</span> java.util.concurrent.atomic.AtomicLong;

<span class="keyword" >import</span> org.apache.commons.logging.Log;
<span class="keyword" >import</span> org.apache.commons.logging.LogFactory;
<span class="keyword" >import</span> org.apache.hadoop.conf.Configuration;
<span class="keyword" >import</span> org.apache.hadoop.fs.FSDataInputStream;
<span class="keyword" >import</span> org.apache.hadoop.fs.FSDataOutputStream;
<span class="keyword" >import</span> org.apache.hadoop.fs.FileSystem;
<span class="keyword" >import</span> org.apache.hadoop.fs.Path;
<span class="keyword" >import</span> org.apache.hadoop.fs.permission.FsPermission;
<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.Pathname;
<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.ThriftHandle;
<span class="keyword" >import</span> org.apache.hadoop.thriftfs.api.ThriftIOException;
<span class="keyword" >import</span> org.apache.hadoop.util.Daemon;
<span class="keyword" >import</span> org.apache.hadoop.util.StringUtils;

<span class="keyword" >import</span> com.facebook.thrift.protocol.TBinaryProtocol;
<span class="keyword" >import</span> com.facebook.thrift.server.TServer;
<span class="keyword" >import</span> com.facebook.thrift.server.TThreadPoolServer;
<span class="keyword" >import</span> com.facebook.thrift.transport.TServerSocket;
<span class="keyword" >import</span> com.facebook.thrift.transport.TServerTransport;
<span class="keyword" >import</span> com.facebook.thrift.transport.TTransportFactory;

<span class="keyword" >public</span> <span class="keyword" >class</span> HadoopThriftServer <span class="keyword" >extends</span> ThriftHadoopFileSystem {

    <span class="keyword" >static</span> <span class="keyword" >int</span> serverPort = <span class="number" >0</span>;                    <span class="comment" >// default port</span>
    TServer    server = <span class="keyword" >null</span>;

    <span class="keyword" >public</span> <span class="keyword" >static</span> <span class="keyword" >class</span> HadoopThriftHandler <span class="keyword" >implements</span> ThriftHadoopFileSystem.Iface
    {

      <span class="keyword" >public</span> <span class="keyword" >static</span> <span class="keyword" >final</span> Log LOG = LogFactory.getLog(<span class="string" >"org.apache.hadoop.thrift"</span>);

      <span class="comment" >// HDFS glue</span>
      Configuration conf;
      FileSystem fs;
          
      <span class="comment" >// structure that maps each Thrift object to a Hadoop object</span>
      <span class="keyword" >private</span> AtomicLong nextId = <span class="keyword" >new</span> AtomicLong(<span class="keyword" >new</span> Random().nextLong());
      <span class="keyword" >private</span> ConcurrentHashMap&lt;Long, Object&gt; hadoopHash = <span class="keyword" >new</span> ConcurrentHashMap&lt;Long, Object&gt;();
      <span class="keyword" >private</span> Daemon inactivityThread = <span class="keyword" >null</span>;

      <span class="comment" >// Detect inactive session</span>
      <span class="keyword" >private</span> <span class="keyword" >static</span> <span class="keyword" >volatile</span> <span class="keyword" >long</span> inactivityPeriod = <span class="number" >3600</span> * <span class="number" >1000</span>; <span class="comment" >// 1 hr</span>
      <span class="keyword" >private</span> <span class="keyword" >static</span> <span class="keyword" >volatile</span> <span class="keyword" >long</span> inactivityRecheckInterval = <span class="number" >60</span> * <span class="number" >1000</span>;
      <span class="keyword" >private</span> <span class="keyword" >static</span> <span class="keyword" >volatile</span> <span class="keyword" >boolean</span> fsRunning = <span class="keyword" >true</span>;
      <span class="keyword" >private</span> AtomicLong now = <span class="keyword" >new</span> AtomicLong(now());

      <span class="comment" >// allow outsider to change the hadoopthrift path</span>
      <span class="keyword" >public</span> <span class="keyword" >void</span> setOption(String key, String val) {
      }

      <span class="comment" >/**
       * Current system time.
       * @return current time in msec.
       */</span>
      <span class="keyword" >static</span> <span class="keyword" >long</span> now() {
        <span class="keyword" >return</span> System.currentTimeMillis();
      }

      <span class="comment" >/**
      * getVersion
      *
      * @return current version of the interface.
      */</span>
      <span class="keyword" >public</span> String getVersion() {
        <span class="keyword" >return</span> <span class="string" >"0.1"</span>;
      }

      <span class="comment" >/**
       * shutdown
       *
       * cleanly closes everything and exit.
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >void</span> shutdown(<span class="keyword" >int</span> status) {
        LOG.info(<span class="string" >"HadoopThriftServer shutting down."</span>);
        <span class="keyword" >try</span> {
          fs.close();
        } <span class="keyword" >catch</span> (IOException e) {
          LOG.warn(<span class="string" >"Unable to close file system"</span>);
        }
        Runtime.getRuntime().exit(status);
      }

      <span class="comment" >/**
       * Periodically checks to see if there is inactivity
       */</span>
      <span class="keyword" >class</span> InactivityMonitor <span class="keyword" >implements</span> Runnable {
        <span class="keyword" >public</span> <span class="keyword" >void</span> run() {
          <span class="keyword" >while</span> (fsRunning) {
            <span class="keyword" >try</span> {
              <span class="keyword" >if</span> (now() &gt; now.get() + inactivityPeriod) {
                LOG.warn(<span class="string" >"HadoopThriftServer Inactivity period of "</span> +
                         inactivityPeriod + <span class="string" >" expired... Stopping Server."</span>);
                shutdown(-<span class="number" >1</span>);
              }
            } <span class="keyword" >catch</span> (Exception e) {
              LOG.error(StringUtils.stringifyException(e));
            }
            <span class="keyword" >try</span> {
              Thread.sleep(inactivityRecheckInterval);
            } <span class="keyword" >catch</span> (InterruptedException ie) {
            }
          }
        }
      }

      <span class="comment" >/**
       * HadoopThriftServer
       *
       * Constructor for the HadoopThriftServer glue with Thrift Class.
       *
       * @param name - the name of this handler
       */</span>
      <span class="keyword" >public</span> HadoopThriftHandler(String name) {
        conf = <span class="keyword" >new</span> Configuration();
        now.set(now());
        <span class="keyword" >try</span> {
          inactivityThread = <span class="keyword" >new</span> Daemon(<span class="keyword" >new</span> InactivityMonitor());
          fs = FileSystem.get(conf);
        } <span class="keyword" >catch</span> (IOException e) {
          LOG.warn(<span class="string" >"Unable to open hadoop file system..."</span>);
          Runtime.getRuntime().exit(-<span class="number" >1</span>);
        }
      }

      <span class="comment" >/**
        * printStackTrace
        *
        * Helper function to print an exception stack trace to the log and not stderr
        *
        * @param e the exception
        *
        */</span>
      <span class="keyword" >static</span> <span class="keyword" >private</span> <span class="keyword" >void</span> printStackTrace(Exception e) {
        <span class="keyword" >for</span>(StackTraceElement s: e.getStackTrace()) {
          LOG.error(s);
        }
      }

      <span class="comment" >/**
       * Lookup a thrift object into a hadoop object
       */</span>
      <span class="keyword" >private</span> <span class="keyword" >synchronized</span> Object lookup(<span class="keyword" >long</span> id) {
        <span class="keyword" >return</span> hadoopHash.get(Long.valueOf(id));
      }

      <span class="comment" >/**
       * Insert a thrift object into a hadoop object. Return its id.
       */</span>
      <span class="keyword" >private</span> <span class="keyword" >synchronized</span> <span class="keyword" >long</span> insert(Object o) {
        <span class="keyword" >long</span> next = nextId.incrementAndGet();
        hadoopHash.put(next, o);
        <span class="keyword" >return</span> next;
      }

      <span class="comment" >/**
       * Delete a thrift object from the hadoop store.
       */</span>
      <span class="keyword" >private</span> <span class="keyword" >synchronized</span> Object remove(<span class="keyword" >long</span> id) {
        <span class="keyword" >return</span> hadoopHash.remove(Long.valueOf(id));
      }

      <span class="comment" >/**
        * Implement the API exported by this thrift server
        */</span>

      <span class="comment" >/** Set inactivity timeout period. The period is specified in seconds.
        * if there are no RPC calls to the HadoopThrift server for this much
        * time, then the server kills itself.
        */</span>
      <span class="keyword" >public</span> <span class="keyword" >synchronized</span> <span class="keyword" >void</span> setInactivityTimeoutPeriod(<span class="keyword" >long</span> periodInSeconds) {
        inactivityPeriod = periodInSeconds * <span class="number" >1000</span>; <span class="comment" >// in milli seconds</span>
        <span class="keyword" >if</span> (inactivityRecheckInterval &gt; inactivityPeriod ) {
          inactivityRecheckInterval = inactivityPeriod;
        }
      }


      <span class="comment" >/**
        * Create a file and open it for writing
        */</span>
      <span class="keyword" >public</span> ThriftHandle create(Pathname path) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"create: "</span> + path);
          FSDataOutputStream out = fs.create(<span class="keyword" >new</span> Path(path.pathname));
          <span class="keyword" >long</span> id = insert(out);
          ThriftHandle obj = <span class="keyword" >new</span> ThriftHandle(id);
          HadoopThriftHandler.LOG.debug(<span class="string" >"created: "</span> + path + <span class="string" >" id: "</span> + id);
          <span class="keyword" >return</span> obj;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
        * Create a file and open it for writing, delete file if it exists
        */</span>
      <span class="keyword" >public</span> ThriftHandle createFile(Pathname path, 
                                     <span class="keyword" >short</span> mode,
                                     <span class="keyword" >boolean</span>  overwrite,
                                     <span class="keyword" >int</span> bufferSize,
                                     <span class="keyword" >short</span> replication,
                                     <span class="keyword" >long</span> blockSize) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"create: "</span> + path +
                                       <span class="string" >" permission: "</span> + mode +
                                       <span class="string" >" overwrite: "</span> + overwrite +
                                       <span class="string" >" bufferSize: "</span> + bufferSize +
                                       <span class="string" >" replication: "</span> + replication +
                                       <span class="string" >" blockSize: "</span> + blockSize);
          FSDataOutputStream out = fs.create(<span class="keyword" >new</span> Path(path.pathname), 
                                             <span class="keyword" >new</span> FsPermission(mode),
                                             overwrite,
                                             bufferSize,
                                             replication,
                                             blockSize,
                                             <span class="keyword" >null</span>); <span class="comment" >// progress</span>
          <span class="keyword" >long</span> id = insert(out);
          ThriftHandle obj = <span class="keyword" >new</span> ThriftHandle(id);
          HadoopThriftHandler.LOG.debug(<span class="string" >"created: "</span> + path + <span class="string" >" id: "</span> + id);
          <span class="keyword" >return</span> obj;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Opens an existing file and returns a handle to read it
       */</span>
      <span class="keyword" >public</span> ThriftHandle open(Pathname path) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"open: "</span> + path);
          FSDataInputStream out = fs.open(<span class="keyword" >new</span> Path(path.pathname));
          <span class="keyword" >long</span> id = insert(out);
          ThriftHandle obj = <span class="keyword" >new</span> ThriftHandle(id);
          HadoopThriftHandler.LOG.debug(<span class="string" >"opened: "</span> + path + <span class="string" >" id: "</span> + id);
          <span class="keyword" >return</span> obj;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Opens an existing file to append to it.
       */</span>
      <span class="keyword" >public</span> ThriftHandle append(Pathname path) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"append: "</span> + path);
          FSDataOutputStream out = fs.append(<span class="keyword" >new</span> Path(path.pathname));
          <span class="keyword" >long</span> id = insert(out);
          ThriftHandle obj = <span class="keyword" >new</span> ThriftHandle(id);
          HadoopThriftHandler.LOG.debug(<span class="string" >"appended: "</span> + path + <span class="string" >" id: "</span> + id);
          <span class="keyword" >return</span> obj;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * write to a file
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >boolean</span> write(ThriftHandle tout, String data) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"write: "</span> + tout.id);
          FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
          <span class="keyword" >byte</span>[] tmp = data.getBytes(<span class="string" >"UTF-8"</span>);
          out.write(tmp, <span class="number" >0</span>, tmp.length);
          HadoopThriftHandler.LOG.debug(<span class="string" >"wrote: "</span> + tout.id);
          <span class="keyword" >return</span> <span class="keyword" >true</span>;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * read from a file
       */</span>
      <span class="keyword" >public</span> String read(ThriftHandle tout, <span class="keyword" >long</span> offset,
                         <span class="keyword" >int</span> length) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"read: "</span> + tout.id +
                                       <span class="string" >" offset: "</span> + offset +
                                       <span class="string" >" length: "</span> + length);
          FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
          
          <span class="keyword" >if</span> (in.getPos() != offset) {
            in.seek(offset);
          }
          <span class="keyword" >byte</span>[] tmp = <span class="keyword" >new</span> <span class="keyword" >byte</span>[length];
          <span class="keyword" >int</span> numbytes = in.read(offset, tmp, <span class="number" >0</span>, length);
          HadoopThriftHandler.LOG.debug(<span class="string" >"read done: "</span> + tout.id);
          <span class="keyword" >return</span> <span class="keyword" >new</span> String(tmp, <span class="number" >0</span>, numbytes, <span class="string" >"UTF-8"</span>);
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Delete a file/directory
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >boolean</span> rm(Pathname path, <span class="keyword" >boolean</span> recursive) 
                            <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"rm: "</span> + path +
                                       <span class="string" >" recursive: "</span> + recursive);
          <span class="keyword" >boolean</span> ret = fs.delete(<span class="keyword" >new</span> Path(path.pathname), recursive);
          HadoopThriftHandler.LOG.debug(<span class="string" >"rm: "</span> + path);
          <span class="keyword" >return</span> ret;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Move a file/directory
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >boolean</span> rename(Pathname path, Pathname dest) 
                            <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"rename: "</span> + path +
                                       <span class="string" >" destination: "</span> + dest);
          <span class="keyword" >boolean</span> ret = fs.rename(<span class="keyword" >new</span> Path(path.pathname), 
                                  <span class="keyword" >new</span> Path(dest.pathname));
          HadoopThriftHandler.LOG.debug(<span class="string" >"rename: "</span> + path);
          <span class="keyword" >return</span> ret;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       *  close file
       */</span>
       <span class="keyword" >public</span> <span class="keyword" >boolean</span> close(ThriftHandle tout) <span class="keyword" >throws</span> ThriftIOException {
         <span class="keyword" >try</span> {
           now.set(now());
           HadoopThriftHandler.LOG.debug(<span class="string" >"close: "</span> + tout.id);
           Object obj = remove(tout.id);
           
           <span class="keyword" >if</span> (obj <span class="keyword" >instanceof</span> FSDataOutputStream) {
             FSDataOutputStream out = (FSDataOutputStream)obj;
             out.close();
           } <span class="keyword" >else</span> <span class="keyword" >if</span> (obj <span class="keyword" >instanceof</span> FSDataInputStream) {
             FSDataInputStream in = (FSDataInputStream)obj;
             in.close();
           } <span class="keyword" >else</span> {
             <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(<span class="string" >"Unknown thrift handle."</span>);
           }
           HadoopThriftHandler.LOG.debug(<span class="string" >"closed: "</span> + tout.id);
           <span class="keyword" >return</span> <span class="keyword" >true</span>;
         } <span class="keyword" >catch</span> (IOException e) {
           <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
         }
       }

       <span class="comment" >/**
        * Create a directory
        */</span>
      <span class="keyword" >public</span> <span class="keyword" >boolean</span> mkdirs(Pathname path) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"mkdirs: "</span> + path);
          <span class="keyword" >boolean</span> ret = fs.mkdirs(<span class="keyword" >new</span> Path(path.pathname));
          HadoopThriftHandler.LOG.debug(<span class="string" >"mkdirs: "</span> + path);
          <span class="keyword" >return</span> ret;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Does this pathname exist?
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >boolean</span> exists(Pathname path) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"exists: "</span> + path);
          <span class="keyword" >boolean</span> ret = fs.exists(<span class="keyword" >new</span> Path(path.pathname));
          HadoopThriftHandler.LOG.debug(<span class="string" >"exists done: "</span> + path);
          <span class="keyword" >return</span> ret;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Returns status about the specified pathname
       */</span>
      <span class="keyword" >public</span> org.apache.hadoop.thriftfs.api.FileStatus stat(
                              Pathname path) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"stat: "</span> + path);
          org.apache.hadoop.fs.FileStatus stat = fs.getFileStatus(
                                             <span class="keyword" >new</span> Path(path.pathname));
          HadoopThriftHandler.LOG.debug(<span class="string" >"stat done: "</span> + path);
          <span class="keyword" >return</span> <span class="keyword" >new</span> org.apache.hadoop.thriftfs.api.FileStatus(
            stat.getPath().toString(),
            stat.getLen(),
            stat.isDir(),
            stat.getReplication(),
            stat.getBlockSize(),
            stat.getModificationTime(),
            stat.getPermission().toString(),
            stat.getOwner(),
            stat.getGroup());
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * If the specified pathname is a directory, then return the
       * list of pathnames in this directory
       */</span>
      <span class="keyword" >public</span> List&lt;org.apache.hadoop.thriftfs.api.FileStatus&gt; listStatus(
                              Pathname path) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"listStatus: "</span> + path);

          org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus(
                                             <span class="keyword" >new</span> Path(path.pathname));
          HadoopThriftHandler.LOG.debug(<span class="string" >"listStatus done: "</span> + path);
          org.apache.hadoop.thriftfs.api.FileStatus tmp;
          List&lt;org.apache.hadoop.thriftfs.api.FileStatus&gt; value = 
            <span class="keyword" >new</span> LinkedList&lt;org.apache.hadoop.thriftfs.api.FileStatus&gt;();

          <span class="keyword" >for</span> (<span class="keyword" >int</span> i = <span class="number" >0</span>; i &lt; stat.length; i++) {
            tmp = <span class="keyword" >new</span> org.apache.hadoop.thriftfs.api.FileStatus(
                        stat[i].getPath().toString(),
                        stat[i].getLen(),
                        stat[i].isDir(),
                        stat[i].getReplication(),
                        stat[i].getBlockSize(),
                        stat[i].getModificationTime(),
                        stat[i].getPermission().toString(),
                        stat[i].getOwner(),
                        stat[i].getGroup());
            value.add(tmp);
          }
          <span class="keyword" >return</span> value;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Sets the permission of a pathname
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >void</span> chmod(Pathname path, <span class="keyword" >short</span> mode) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"chmod: "</span> + path + 
                                       <span class="string" >" mode "</span> + mode);
          fs.setPermission(<span class="keyword" >new</span> Path(path.pathname), <span class="keyword" >new</span> FsPermission(mode));
          HadoopThriftHandler.LOG.debug(<span class="string" >"chmod done: "</span> + path);
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Sets the owner &amp; group of a pathname
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >void</span> chown(Pathname path, String owner, String group) 
                                                         <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"chown: "</span> + path +
                                       <span class="string" >" owner: "</span> + owner +
                                       <span class="string" >" group: "</span> + group);
          fs.setOwner(<span class="keyword" >new</span> Path(path.pathname), owner, group);
          HadoopThriftHandler.LOG.debug(<span class="string" >"chown done: "</span> + path);
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }

      <span class="comment" >/**
       * Sets the replication factor of a file
       */</span>
      <span class="keyword" >public</span> <span class="keyword" >void</span> setReplication(Pathname path, <span class="keyword" >short</span> repl) <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"setrepl: "</span> + path +
                                       <span class="string" >" replication factor: "</span> + repl);
          fs.setReplication(<span class="keyword" >new</span> Path(path.pathname), repl);
          HadoopThriftHandler.LOG.debug(<span class="string" >"setrepl done: "</span> + path);
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }

      }

      <span class="comment" >/**
       * Returns the block locations of this file
       */</span>
      <span class="keyword" >public</span> List&lt;org.apache.hadoop.thriftfs.api.BlockLocation&gt; 
               getFileBlockLocations(Pathname path, <span class="keyword" >long</span> start, <span class="keyword" >long</span> length) 
                                           <span class="keyword" >throws</span> ThriftIOException {
        <span class="keyword" >try</span> {
          now.set(now());
          HadoopThriftHandler.LOG.debug(<span class="string" >"getFileBlockLocations: "</span> + path);

          org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(
                                                   <span class="keyword" >new</span> Path(path.pathname));

          org.apache.hadoop.fs.BlockLocation[] stat = 
              fs.getFileBlockLocations(status, start, length);
          HadoopThriftHandler.LOG.debug(<span class="string" >"getFileBlockLocations done: "</span> + path);

          org.apache.hadoop.thriftfs.api.BlockLocation tmp;
          List&lt;org.apache.hadoop.thriftfs.api.BlockLocation&gt; value = 
            <span class="keyword" >new</span> LinkedList&lt;org.apache.hadoop.thriftfs.api.BlockLocation&gt;();

          <span class="keyword" >for</span> (<span class="keyword" >int</span> i = <span class="number" >0</span>; i &lt; stat.length; i++) {

            <span class="comment" >// construct the list of hostnames from the array returned</span>
            <span class="comment" >// by HDFS</span>
            List&lt;String&gt; hosts = <span class="keyword" >new</span> LinkedList&lt;String&gt;();
            String[] hostsHdfs = stat[i].getHosts();
            <span class="keyword" >for</span> (<span class="keyword" >int</span> j = <span class="number" >0</span>; j &lt; hostsHdfs.length; j++) {
              hosts.add(hostsHdfs[j]);
            }

            <span class="comment" >// construct the list of host:port from the array returned</span>
            <span class="comment" >// by HDFS</span>
            List&lt;String&gt; names = <span class="keyword" >new</span> LinkedList&lt;String&gt;();
            String[] namesHdfs = stat[i].getNames();
            <span class="keyword" >for</span> (<span class="keyword" >int</span> j = <span class="number" >0</span>; j &lt; namesHdfs.length; j++) {
              names.add(namesHdfs[j]);
            }
            tmp = <span class="keyword" >new</span> org.apache.hadoop.thriftfs.api.BlockLocation(
                        hosts, names, stat[i].getOffset(), stat[i].getLength());
            value.add(tmp);
          }
          <span class="keyword" >return</span> value;
        } <span class="keyword" >catch</span> (IOException e) {
          <span class="keyword" >throw</span> <span class="keyword" >new</span> ThriftIOException(e.getMessage());
        }
      }
    }

    <span class="comment" >// Bind to port. If the specified port is 0, then bind to random port.</span>
    <span class="keyword" >private</span> ServerSocket createServerSocket(<span class="keyword" >int</span> port) <span class="keyword" >throws</span> IOException {
      <span class="keyword" >try</span> {
        ServerSocket sock = <span class="keyword" >new</span> ServerSocket();
        <span class="comment" >// Prevent 2MSL delay problem on server restarts</span>
        sock.setReuseAddress(<span class="keyword" >true</span>);
        <span class="comment" >// Bind to listening port</span>
        <span class="keyword" >if</span> (port == <span class="number" >0</span>) {
          sock.bind(<span class="keyword" >null</span>);
          serverPort = sock.getLocalPort();
        } <span class="keyword" >else</span> {
          sock.bind(<span class="keyword" >new</span> InetSocketAddress(port));
        }
        <span class="keyword" >return</span> sock;
      } <span class="keyword" >catch</span> (IOException ioe) {
        <span class="keyword" >throw</span> <span class="keyword" >new</span> IOException(<span class="string" >"Could not create ServerSocket on port "</span> + port + <span class="string" >"."</span>, ioe);
      }
    }

    <span class="comment" >/**
     * Constructs a server object
     */</span>
    <span class="keyword" >public</span> HadoopThriftServer(String [] args) {

      <span class="keyword" >if</span> (args.length &gt; <span class="number" >0</span>) {
        serverPort = Integer.parseInt(args[<span class="number" >0</span>]);
      }
      <span class="keyword" >try</span> {
        ServerSocket ssock = createServerSocket(serverPort);
        TServerTransport serverTransport = <span class="keyword" >new</span> TServerSocket(ssock);
        Iface handler = <span class="keyword" >new</span> HadoopThriftHandler(<span class="string" >"hdfs-thrift-dhruba"</span>);
        ThriftHadoopFileSystem.Processor processor = <span class="keyword" >new</span> ThriftHadoopFileSystem.Processor(handler);
        TThreadPoolServer.Options options = <span class="keyword" >new</span> TThreadPoolServer.Options();
        options.minWorkerThreads = <span class="number" >10</span>;
        server = <span class="keyword" >new</span> TThreadPoolServer(processor, serverTransport,
                                               <span class="keyword" >new</span> TTransportFactory(),
                                               <span class="keyword" >new</span> TTransportFactory(),
                                               <span class="keyword" >new</span> TBinaryProtocol.Factory(),
                                               <span class="keyword" >new</span> TBinaryProtocol.Factory(), 
                                               options);
        System.out.println(<span class="string" >"Starting the hadoop thrift server on port ["</span> + serverPort + <span class="string" >"]..."</span>);
        HadoopThriftHandler.LOG.info(<span class="string" >"Starting the hadoop thrift server on port ["</span> +serverPort + <span class="string" >"]..."</span>);
        System.out.flush();

      } <span class="keyword" >catch</span> (Exception x) {
        x.printStackTrace();
      }
    }

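    <span class="keyword" >public</span> <span class="keyword" >static</span> <span class="keyword" >void</span> main(String [] args) {
      HadoopThriftServer me = <span class="keyword" >new</span> HadoopThriftServer(args);
      <span class="comment" >// The constructor only prints the stack trace on failure, so server may still be null here</span>
      <span class="keyword" >if</span> (me.server == <span class="keyword" >null</span>) {
        System.exit(<span class="number" >1</span>);
      }
      me.server.serve();
    }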
}</pre>
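
<p>A side note on <code>createServerSocket</code> in the listing: when the port is 0, the OS assigns an ephemeral port, and the code reads it back with <code>getLocalPort()</code>. Here is a minimal standalone sketch of just that behaviour, with no Hadoop or Thrift dependency. The class name <code>EphemeralPortSketch</code> and the helper <code>open</code> are made up for illustration and are not part of thriftfs.</p>

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class EphemeralPortSketch {

    // Hypothetical helper that mirrors the shape of createServerSocket:
    // create an unbound socket, set SO_REUSEADDR, then bind.
    public static ServerSocket open(int port) throws IOException {
        ServerSocket sock = new ServerSocket();
        // Set before bind so a restarted server can reuse the address
        sock.setReuseAddress(true);
        // Port 0 asks the OS to pick a free ephemeral port
        sock.bind(new InetSocketAddress(port));
        return sock;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket sock = open(0)) {
            // getLocalPort() reports the port that was actually assigned
            System.out.println("requested 0, bound to " + sock.getLocalPort());
        }
    }
}
```

<p>Running it prints whichever port the OS handed out, so the number differs from run to run.</p>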

<p>It seems stable now. I think.<br />
Someday I'll write code that actually triggers the problem and turn it into a patch.</p>]]>
</content:encoded>
</item>
</channel>
</rss>