2013年1月24日木曜日

Hadoop 最速マスター

Hadoopを利用するうえで必要な知識を密度濃くまとめた。


構成
           マスタ       スレーブ
HDFS       NameNode     DataNode
MapReduce  JobTracker   TaskTracker
                           mapper(×起動数)
                           reducer(×起動数)

本ブログ内では物理ホスト3台を利用する。
node01 マスタ
node02 スレーブ1
node03 スレーブ2


NameNodeのSecondaryNameNodeはどちらかを利用できる。
CheckpointNode
一定の間隔で同期する

BackupNode
常に同じ情報を保持し同期する
こちらは0.2.1以降でしか利用できない



 事前作業
javaをインストールする。
# su -
# yum install java-1.6.0-openjdk

hostsへ登録する。
# vi /etc/hosts
x.x.x.x node01
y.y.y.y node02
z.z.z.z node03
※127.0.0.1でホスト名を登録しておかないこと。

iptablesの設定を見直し、
各ノード間での通信を許可しておく。
# vi /etc/sysconfig/iptables

# /etc/init.d/iptables restart



 hadoopのインストール
hadoopのver1.0.3(0.20系)をインストールする。
0.2.1系以降とのコンフィグ差分はそのつど記載する。

アカウントを作成する。
# useradd hadoop

パスワードは何でもよい。
# passwd hadoop

# cd /usr/local/src/

# wget http://ftp.jaist.ac.jp/pub/apache/hadoop/common/hadoop-1.0.3/hadoop-1.0.3.tar.gz

# tar zxvf hadoop-1.0.3.tar.gz

# mv hadoop-1.0.3 /usr/local/

# ln -s /usr/local/hadoop-1.0.3 /usr/local/hadoop

# cd /usr/local/hadoop

# chown -R hadoop.hadoop /usr/local/hadoop/

hadoopユーザの環境変数を設定しておく。
# su - hadoop

$ vi ./.bashrc
export JAVA_HOME=/usr
export HADOOP_INSTALL=/usr/local/hadoop
export PATH=$PATH:$HADOOP_INSTALL/bin:$JAVA_HOME/bin

$ source ~/.bashrc

hadoopコマンドが使えることと、インストールしたバージョンを確認する。
$ hadoop version



 分散モードで利用するための設定
【事前作業】
マスタからスレーブへ鍵なしsshログインできなければならない。
スレーブ全サーバで秘密鍵と公開鍵を作成し、公開鍵をマスタへ配布しておくこと。
sshで各ノードにログインして各種プロセスを起動させるために、
起動ツールを動かすマスタ側だけで設定すればよい。

$ su - hadoop

スレーブ側で鍵を作成する。
$ ssh-keygen -t rsa -f ~/.ssh/id_rsa

作成した公開鍵をコピーする。
$ cat ~/.ssh/id_rsa.pub

コピーした公開鍵をマスタ側に記載する。
$ vi  ~/.ssh/authorized_keys

$ chmod 600 ~/.ssh/authorized_keys

鍵なしでsshできることを確認する。
$ ssh hadoop@XXX


【マスタだけの設定】
$ cd /usr/local/hadoop/

CheckpointNodeを起動させるホストを指定する。
今回はNameNodeと同じホストで起動させる。
$ vi conf/masters
node01
BackupNode(0.2.1以降)を利用する場合は不要である。
conf/hdfs-site.xml内に記載するためである。

DataNode、TaskTrackerを起動させるホストを指定する。
$ vi conf/slaves
node02
node03




【全ノードでの設定】
・共通設定
$ vi conf/core-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://node01:9000</value>
  </property>

  <property>
    <name>hadoop.tmp.dir</name>
    <value>/dsk01/hadoop-${user.name}</value>
  </property>
</configuration>


・HDFSの設定
$ vi conf/hdfs-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

  <property>
    <name>dfs.name.dir</name>
          dfs.namenode.name.dir(0.2.1系)
    <value>${hadoop.tmp.dir}/dfs/name</value>
  </property>

  <property>
    <name>dfs.data.dir</name>
          dfs.datanode.data.dir(0.2.1系)
    <value>${hadoop.tmp.dir}/dfs/data</value>
  </property>
  
  HDFSのブロックサイズよりも小さなファイルはそのまま配置される。
  134217728 / 1024 / 1024 = 128MB
  <property>
    <name>dfs.block.size</name>
    <value>134217728</value>
  </property>

  ディスクサイズはレプリケーション数分増える
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>


 ※※※※※※※※※※※※※※※※※※※※※
 ・CheckpointNode利用時
  <property>
    <name>fs.checkpoint.dir</name>
    <value>${hadoop.tmp.dir}/dfs/checkpoint</value>
  </property>

  <property>
    <name>dfs.namenode.checkpoint.period</name>
    <value>3600</value>
  </property>

  <property>
    <name>dfs.namenode.checkpoint.size</name>
    <value>67108864</value>
  </property>


 ・BackupNode利用時
 fsimageとeditsがhttp経由で行われるため明示的に書いておく必要がある。
  <property>
    <name>dfs.namenode.http-address</name>
    <value>node01:50070</value>
  </property>

  <property>
    <name>dfs.namenode.backup.address</name>
    <value>バックアップ機:50100</value>
  </property>

 ※BackupNodeのデータ領域もdfs.name.dirでの設定値が使われる。
  そのため、NameNodeと同一ホストで起動することはできない。
 ※※※※※※※※※※※※※※※※※※※※※

</configuration>


・MapReduceの設定
$ vi conf/mapred-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
    <name>mapred.job.tracker</name>
          mapreduce.jobtracker.address(0.2.1系)
    <value>node01:9001</value>
  </property>

  <property>
    <name>mapred.local.dir</name>
          mapreduce.cluster.local.dir(0.2.1系)
    <value>${hadoop.tmp.dir}/mapred/local</value>
  </property>

  <property>
     <name>mapred.job.reuse.jvm.num.tasks</name>
           mapreduce.job.jvm.numtasks(0.2.1系)
     <value>-1</value>
  </property>

  <property>
    <name>mapred.child.java.opts</name>
          mapreduce.map.java.opts && mapreduce.reduce.java.opts(0.2.1系)
    <value>-Xmx200m</value>
  </property>

  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
          mapreduce.tasktracker.map.tasks.maximum(0.2.1系)
    <value>8</value>
  </property>

  <property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
          mapreduce.tasktracker.reduce.tasks.maximum(0.2.1系)
    <value>8</value>
  </property>
</configuration>


こういったツールを用意しておけば便利だろう。
# vi copy.sh
for i in `seq -f %02g 2 3`
do
  scp /usr/local/hadoop/conf/core-site.xml   hadoop@node${i}:/usr/local/hadoop/conf/core-site.xml
  scp /usr/local/hadoop/conf/hdfs-site.xml   hadoop@node${i}:/usr/local/hadoop/conf/hdfs-site.xml
  scp /usr/local/hadoop/conf/mapred-site.xml hadoop@node${i}:/usr/local/hadoop/conf/mapred-site.xml
done



 HDFSのフォーマット
HDFSの実ディレクトリを念のため事前に削除しておく。
core-site.xmlのhadoop.tmp.dirで指定したディレクトリである。
各サーバで実施する。
$ rm -rf /dsk01/hadoop-hadoop

HDFSのフォーマットを実施する。
マスタノードだけで実行すればよい。
$ hadoop namenode -format



◆ hadoopの起動
マスタノードで実行する。
$ ./bin/start-all.sh
starting namenode, logging to /usr/local/hadoop-1.0.3/libexec/../logs/hadoop-hadoop-namenode-node01.out
node01: starting secondarynamenode, logging to /usr/local/hadoop-1.0.3/libexec/../logs/hadoop-hadoop-secondarynamenode-node01.out
node02: starting datanode, logging to /usr/local/hadoop-1.0.3/libexec/../logs/hadoop-hadoop-datanode-node02.out
node03: starting datanode, logging to /usr/local/hadoop-1.0.3/libexec/../logs/hadoop-hadoop-datanode-node03.out
starting jobtracker, logging to /usr/local/hadoop-1.0.3/libexec/../logs/hadoop-hadoop-jobtracker-node01.out
node02: starting tasktracker, logging to /usr/local/hadoop-1.0.3/libexec/../logs/hadoop-hadoop-tasktracker-node02.out
node03: starting tasktracker, logging to /usr/local/hadoop-1.0.3/libexec/../logs/hadoop-hadoop-tasktracker-node03.out


各ホストで以下を実施していってもよい。
$ ./bin/start-dfs.sh
$ ./bin/start-mapred.sh


停止するにはマスタから以下を実行する。
$ ./bin/stop-all.sh


WEBから起動を確認できる。
NameNode
http://node01:50070/

JobTracker
http://node01:50030/



 動作試験1
ファイル内の文字数をカウントさせる。
wordcountというプログラムがディフォルトで用意されているのでそれを使う。

HDFSが分散ファイルシステムであることをイメージするため、
各ノードそれぞれからファイルをHDFSへ置いてみる。

・node01側
$ mkdir input
$ vi input/file1
a b c

$ vi input/file2
d d e f g

$ vi input/file3
a d e f

$ hadoop fs -put input input
圧縮ファイルでもそのままputすればよい。


・node02側
$ mkdir input
$ vi input/file4
d e f

$ hadoop fs -put input/file4 input/


・node03側
$ mkdir input
$ vi input/file5
g h i i j k

$ hadoop fs -put input/file5 input/


当然どのノードからも実行でき、見え方にも相違はない。
$ hadoop fs -ls
drwxr-xr-x   - hadoop supergroup          0 2012-10-03 11:04 /user/hadoop/input

$ hadoop fs -ls /user/hadoop/input
Found 5 items
-rw-r--r--   2 hadoop supergroup          6 YYYY-MM-DD mm:dd /user/hadoop/input/file1
-rw-r--r--   2 hadoop supergroup         11 YYYY-MM-DD mm:dd /user/hadoop/input/file2
-rw-r--r--   2 hadoop supergroup          8 YYYY-MM-DD mm:dd /user/hadoop/input/file3
-rw-r--r--   2 hadoop supergroup          6 YYYY-MM-DD mm:dd /user/hadoop/input/file4
-rw-r--r--   2 hadoop supergroup         12 YYYY-MM-DD mm:dd /user/hadoop/input/file5
※ファイルがないと以下のエラーが出る
ls: Cannot access .: No such file or directory.

実行させる。
$ hadoop jar /usr/local/hadoop/hadoop-examples-1.0.3.jar wordcount input output

$ hadoop fs -ls output
-rw-r--r--   2 hadoop supergroup          0 YYYY-MM-DD mm:dd /user/hadoop/output/_SUCCESS
drwxr-xr-x   - hadoop supergroup          0 YYYY-MM-DD mm:dd /user/hadoop/output/_logs
-rw-r--r--   2 hadoop supergroup         44 YYYY-MM-DD mm:dd /user/hadoop/output/part-00000

$ hadoop fs -cat /user/hadoop/output/part-00000
a       2
b       1
c       1
(略)



 動作試験2
独自にmapperとreducerを作成し試験1と同様のことを実施する。

mapperとreducer用のファイルを作成する。
分散処理のコマンドを発行するホストのみにあればよい。
$ vi mapper.rb
#!/usr/bin/ruby

ARGF.each_line do |line|
  line.chomp!
  words = line.split(' ')
  words.each do |word|
    puts "#{word.upcase}\t1"
  end
end


$ vi reducer.rb
#!/usr/bin/ruby

counter = Hash.new {|h,k| h[k] = 0}

ARGF.each_line do |line|
  line.chomp!
  word, num = line.split(/\t/)
  counter[word] += num.to_i
end

counter.each do |word, counter|
  puts "#{word}\t#{counter}"
end


$ chmod 755  mapper.rb

$ chmod 755  reducer.rb

いきなり分散処理させる前に、作成したツールの動作試験をしておく。
$ cat input/* | ./mapper.rb | ./reducer.rb

MapReduce処理を実施させる。
その前に、動作試験1での結果ファイルは消しておく。
結果用のディレクトリ名を変えてもいい。
$ hadoop fs -rmr output

$ hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
-input input \
-output output \
-mapper 'mapper.rb' \
-reducer 'reducer.rb' \
-file mapper.rb \
-file reducer.rb

-fileオプションを指定したファイルはコマンド発行ホストからコピーされる。
外部ファイルが別ファイルシステムにある場合はここを参考にしてほしい。

$ hadoop fs -ls output

$ hadoop fs -cat /user/hadoop/output/part-00000


ジョブをキャンセルするには以下のようにする。
デーモン処理されているのでCtrl-Cではジョブは停止できない。
$ hadoop job -list all
$ hadoop job -kill job_xxxxxxxx



 DataNodeの縮退手順
メンテナンスなどのために、あるDataNodeを停止させるとする。
レプリケーション数を満たさなくなるが、停止するDataNodeが
持っているブロックを他のDataNodeにレプリケーションされてから停止してくれる。

マスタノード側で縮退させるノードを設定する。
$ vi conf/hdfs-site.xml
以下を追記する。
  <property>
      <name>dfs.hosts.exclude</name>
      <value>${HADOOP_HOME}/conf/dfs.hosts.exclude</value>
  </property>

$ vi ./conf/dfs.hosts.exclude
node02

ネームノードを更新する。
$ hadoop dfsadmin -refreshNodes

状態を確認する。
$ hadoop dfsadmin -report
Decommission Status : Normal

Decommission Status : Decommission in progress

Decommission Status : Decommissioned

レプリケーションするサーバ数が足りないままになると
"Decommission in progress"の状態から動かなくなった。


縮退させるDataNode側でDataNodeを停止する。
$ ./bin/hadoop-daemon.sh stop datanode

マスタ側のslavesファイルからも該当ノードも削除しておく。
$ vi conf/slaves

TaskTrackerは処理を停止することなく増減させられる。
JobTrackerへTaskTrackerは通知を上げてメンバへの脱退・加入をするためである。



 DataNodeの参加手順
該当ノードを削除する。
$ vi ./conf/dfs.hosts.exclude

ネームノードを更新する。
$ hadoop dfsadmin -refreshNodes

状態を確認する。
$ hadoop dfsadmin -report
Decommission Status : Normal

該当ノードを追加する。
$ vi conf/slaves

DataNodeを起動させる。
縮退させるDataNode側
$ ./bin/hadoop-daemon.sh start datanode



 NameNode故障時の復旧方法
1. fsimageやeditsを格納するname配下をすべてrmする。

2. BackupNodeのname配下のcurrentとimage配下だけをscpでコピーする。

以上である。

※BackupNodeの起動コマンドをメモ。
$ ./bin/hdfs namenode -backup



ここで書ききれなかったhadoopのチューニングポイントはここでまとめた。
hadoopでhdfs化したのちにhiveとimpalaを導入し高速ログ検索する内容はこちら