Hadoopを利用するうえで必要な知識を密度濃くまとめた。
◆ 構成
マスタ スレーブ
HDFS NameNode DataNode
MapReduce JobTracker TaskTracker
mapper(×起動数)
reducer(×起動数)
本ブログ内では物理ホスト3台を利用する。
node01 マスタ
node02 スレーブ1
node03 スレーブ2
NameNodeのSecondaryNameNodeはどちらかを利用できる。
CheckpointNode
一定の間隔で同期する
BackupNode
常に同じ情報を保持し同期する
こちらは0.2.1以降でしか利用できない
◆ 事前作業
javaをインストールする。
# su -
# yum install java-1.6.0-openjdk
hostsへ登録する。
# vi /etc/hosts
x.x.x.x node01
y.y.y.y node02
z.z.z.z node03
※127.0.0.1でホスト名を登録しておかないこと。
iptablesの設定を見直し、
各ノード間での通信を許可しておく。
# vi /etc/sysconfig/iptables
# /etc/init.d/iptables restart
◆ hadoopのインストール
hadoopのver1.0.3(0.20系)をインストールする。
0.2.1系以降とのコンフィグ差分はそのつど記載する。
アカウントを作成する。
# useradd hadoop
パスワードは何でもよい。
# passwd hadoop
# cd /usr/local/src/
# wget http://ftp.jaist.ac.jp/pub/apache/hadoop/common/hadoop-1.0.3/hadoop-1.0.3.tar.gz
# tar zxvf hadoop-1.0.3.tar.gz
# mv hadoop-1.0.3 /usr/local/
# ln -s /usr/local/hadoop-1.0.3 /usr/local/hadoop
# cd /usr/local/hadoop
# chown -R hadoop.hadoop /usr/local/hadoop/
hadoopユーザの環境変数を設定しておく。
# su - hadoop
$ vi ./.bashrc
export JAVA_HOME=/usr
export HADOOP_INSTALL=/usr/local/hadoop
export PATH=$PATH:$HADOOP_INSTALL/bin:$JAVA_HOME/bin
$ source ~/.bashrc
hadoopコマンドが使えることと、インストールしたバージョンを確認する。
$ hadoop version
◆ 分散モードで利用するための設定
【事前作業】
マスタからスレーブへ鍵なしsshログインできなければならない。
スレーブ全サーバで秘密鍵と公開鍵を作成し、公開鍵をマスタへ配布しておくこと。
sshで各ノードにログインして各種プロセスを起動させるために、
起動ツールを動かすマスタ側だけで設定すればよい。
$ su - hadoop
スレーブ側で鍵を作成する。
$ ssh-keygen -t rsa -f ~/.ssh/id_rsa
作成した公開鍵をコピーする。
$ cat ~/.ssh/id_rsa.pub
コピーした公開鍵をマスタ側に記載する。
$ vi ~/.ssh/authorized_keys
$ chmod 600 ~/.ssh/authorized_keys
鍵なしでsshできることを確認する。
$ ssh hadoop@XXX
【マスタだけの設定】
$ cd /usr/local/hadoop/
CheckpointNodeを起動させるホストを指定する。
今回はNameNodeと同じホストで起動させる。
$ vi conf/masters
node01
BackupNode(0.2.1以降)を利用する場合は不要である。
conf/hdfs-site.xml内に記載するためである。
DataNode、TaskTrackerを起動させるホストを指定する。
$ vi conf/slaves
node02
node03
【全ノードでの設定】
・共通設定
$ vi conf/core-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://node01:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/dsk01/hadoop-${user.name}</value>
</property>
</configuration>
・HDFSの設定
$ vi conf/hdfs-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<property>
<name>dfs.name.dir</name>
dfs.namenode.name.dir(0.2.1系)
<value>${hadoop.tmp.dir}/dfs/name</value>
</property>
<property>
<name>dfs.data.dir</name>
dfs.datanode.data.dir(0.2.1系)
<value>${hadoop.tmp.dir}/dfs/data</value>
</property>
HDFSのブロックサイズよりも小さなファイルはそのまま配置される。
134217728 / 1024 / 1024 = 128MB
<property>
<name>dfs.block.size</name>
<value>134217728</value>
</property>
ディスクサイズはレプリケーション数分増える
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
※※※※※※※※※※※※※※※※※※※※※
・CheckpointNode利用時
<property>
<name>fs.checkpoint.dir</name>
<value>${hadoop.tmp.dir}/dfs/checkpoint</value>
</property>
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
</property>
<property>
<name>dfs.namenode.checkpoint.size</name>
<value>67108864</value>
</property>
・BackupNode利用時
fsimageとeditsがhttp経由で行われるため明示的に書いておく必要がある。
<property>
<name>dfs.namenode.http-address</name>
<value>node01:50070</value>
</property>
<property>
<name>dfs.namenode.backup.address</name>
<value>バックアップ機:50100</value>
</property>
※BackupNodeのデータ領域もdfs.name.dirでの設定値が使われる。
そのため、NameNodeと同一ホストで起動することはできない。
※※※※※※※※※※※※※※※※※※※※※
</configuration>
・MapReduceの設定
$ vi conf/mapred-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>mapred.job.tracker</name>
mapreduce.jobtracker.address(0.2.1系)
<value>node01:9001</value>
</property>
<property>
<name>mapred.local.dir</name>
mapreduce.cluster.local.dir(0.2.1系)
<value>${hadoop.tmp.dir}/mapred/local</value>
</property>
<property>
<name>mapred.job.reuse.jvm.num.tasks</name>
mapreduce.job.jvm.numtasks(0.2.1系)
<value>-1</value>
</property>
<property>
<name>mapred.child.java.opts</name>
mapreduce.map.java.opts && mapreduce.reduce.java.opts(0.2.1系)
<value>-Xmx200m</value>
</property>
<property>
<name>mapred.tasktracker.map.tasks.maximum</name>
mapreduce.tasktracker.map.tasks.maximum(0.2.1系)
<value>8</value>
</property>
<property>
<name>mapred.tasktracker.reduce.tasks.maximum</name>
mapreduce.tasktracker.reduce.tasks.maximum(0.2.1系)
<value>8</value>
</property>
</configuration>
こういったツールを用意しておけば便利だろう。
# vi copy.sh
for i in `seq -f %02g 2 3`
do
scp /usr/local/hadoop/conf/core-site.xml hadoop@node${i}:/usr/local/hadoop/conf/core-site.xml
scp /usr/local/hadoop/conf/hdfs-site.xml hadoop@node${i}:/usr/local/hadoop/conf/hdfs-site.xml
scp /usr/local/hadoop/conf/mapred-site.xml hadoop@node${i}:/usr/local/hadoop/conf/mapred-site.xml
done
◆ HDFSのフォーマット
HDFSの実ディレクトリを念のため事前に削除しておく。
core-site.xmlのhadoop.tmp.dirで指定したディレクトリである。
各サーバで実施する。
$ rm -rf /dsk01/hadoop-hadoop
HDFSのフォーマットを実施する。
マスタノードだけで実行すればよい。
$ hadoop namenode -format
◆ hadoopの起動
マスタノードで実行する。
$ ./bin/start-all.sh
starting namenode, logging to /usr/local/hadoop-1.0.3/libexec/../logs/hadoop-hadoop-namenode-node01.out
node01: starting secondarynamenode, logging to /usr/local/hadoop-1.0.3/libexec/../logs/hadoop-hadoop-secondarynamenode-node01.out
node02: starting datanode, logging to /usr/local/hadoop-1.0.3/libexec/../logs/hadoop-hadoop-datanode-node02.out
node03: starting datanode, logging to /usr/local/hadoop-1.0.3/libexec/../logs/hadoop-hadoop-datanode-node03.out
starting jobtracker, logging to /usr/local/hadoop-1.0.3/libexec/../logs/hadoop-hadoop-jobtracker-node01.out
node02: starting tasktracker, logging to /usr/local/hadoop-1.0.3/libexec/../logs/hadoop-hadoop-tasktracker-node02.out
node03: starting tasktracker, logging to /usr/local/hadoop-1.0.3/libexec/../logs/hadoop-hadoop-tasktracker-node03.out
各ホストで以下を実施していってもよい。
$ ./bin/start-dfs.sh
$ ./bin/start-mapred.sh
停止するにはマスタから以下を実行する。
$ ./bin/stop-all.sh
WEBから起動を確認できる。
NameNode
http://node01:50070/
JobTracker
http://node01:50030/
◆ 動作試験1
ファイル内の文字数をカウントさせる。
wordcountというプログラムがディフォルトで用意されているのでそれを使う。
HDFSが分散ファイルシステムであることをイメージするため、
各ノードそれぞれからファイルをHDFSへ置いてみる。
・node01側
$ mkdir input
$ vi input/file1
a b c
$ vi input/file2
d d e f g
$ vi input/file3
a d e f
$ hadoop fs -put input input
圧縮ファイルでもそのままputすればよい。
・node02側
$ mkdir input
$ vi input/file4
d e f
$ hadoop fs -put input/file4 input/
・node03側
$ mkdir input
$ vi input/file5
g h i i j k
$ hadoop fs -put input/file5 input/
当然どのノードからも実行でき、見え方にも相違はない。
$ hadoop fs -ls
drwxr-xr-x - hadoop supergroup 0 2012-10-03 11:04 /user/hadoop/input
$ hadoop fs -ls /user/hadoop/input
Found 5 items
-rw-r--r-- 2 hadoop supergroup 6 YYYY-MM-DD mm:dd /user/hadoop/input/file1
-rw-r--r-- 2 hadoop supergroup 11 YYYY-MM-DD mm:dd /user/hadoop/input/file2
-rw-r--r-- 2 hadoop supergroup 8 YYYY-MM-DD mm:dd /user/hadoop/input/file3
-rw-r--r-- 2 hadoop supergroup 6 YYYY-MM-DD mm:dd /user/hadoop/input/file4
-rw-r--r-- 2 hadoop supergroup 12 YYYY-MM-DD mm:dd /user/hadoop/input/file5
※ファイルがないと以下のエラーが出る
ls: Cannot access .: No such file or directory.
実行させる。
$ hadoop jar /usr/local/hadoop/hadoop-examples-1.0.3.jar wordcount input output
$ hadoop fs -ls output
-rw-r--r-- 2 hadoop supergroup 0 YYYY-MM-DD mm:dd /user/hadoop/output/_SUCCESS
drwxr-xr-x - hadoop supergroup 0 YYYY-MM-DD mm:dd /user/hadoop/output/_logs
-rw-r--r-- 2 hadoop supergroup 44 YYYY-MM-DD mm:dd /user/hadoop/output/part-00000
$ hadoop fs -cat /user/hadoop/output/part-00000
a 2
b 1
c 1
(略)
◆ 動作試験2
独自にmapperとreducerを作成し試験1と同様のことを実施する。
mapperとreducer用のファイルを作成する。
分散処理のコマンドを発行するホストのみにあればよい。
$ vi mapper.rb
#!/usr/bin/ruby
ARGF.each_line do |line|
line.chomp!
words = line.split(' ')
words.each do |word|
puts "#{word.upcase}\t1"
end
end
$ vi reducer.rb
#!/usr/bin/ruby
counter = Hash.new {|h,k| h[k] = 0}
ARGF.each_line do |line|
line.chomp!
word, num = line.split(/\t/)
counter[word] += num.to_i
end
counter.each do |word, counter|
puts "#{word}\t#{counter}"
end
$ chmod 755 mapper.rb
$ chmod 755 reducer.rb
いきなり分散処理させる前に、作成したツールの動作試験をしておく。
$ cat input/* | ./mapper.rb | ./reducer.rb
MapReduce処理を実施させる。
その前に、動作試験1での結果ファイルは消しておく。
結果用のディレクトリ名を変えてもいい。
$ hadoop fs -rmr output
$ hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
-input input \
-output output \
-mapper 'mapper.rb' \
-reducer 'reducer.rb' \
-file mapper.rb \
-file reducer.rb
-fileオプションを指定したファイルはコマンド発行ホストからコピーされる。
外部ファイルが別ファイルシステムにある場合はここを参考にしてほしい。
$ hadoop fs -ls output
$ hadoop fs -cat /user/hadoop/output/part-00000
ジョブをキャンセルするには以下のようにする。
デーモン処理されているのでCtrl-Cではジョブは停止できない。
$ hadoop job -list all
$ hadoop job -kill job_xxxxxxxx
◆ DataNodeの縮退手順
メンテナンスなどのために、あるDataNodeを停止させるとする。
レプリケーション数を満たさなくなるが、停止するDataNodeが
持っているブロックを他のDataNodeにレプリケーションされてから停止してくれる。
マスタノード側で縮退させるノードを設定する。
$ vi conf/hdfs-site.xml
以下を追記する。
<property>
<name>dfs.hosts.exclude</name>
<value>${HADOOP_HOME}/conf/dfs.hosts.exclude</value>
</property>
$ vi ./conf/dfs.hosts.exclude
node02
ネームノードを更新する。
$ hadoop dfsadmin -refreshNodes
状態を確認する。
$ hadoop dfsadmin -report
Decommission Status : Normal
↓
Decommission Status : Decommission in progress
↓
Decommission Status : Decommissioned
レプリケーションするサーバ数が足りないままになると
"Decommission in progress"の状態から動かなくなった。
縮退させるDataNode側でDataNodeを停止する。
$ ./bin/hadoop-daemon.sh stop datanode
マスタ側のslavesファイルからも該当ノードも削除しておく。
$ vi conf/slaves
TaskTrackerは処理を停止することなく増減させられる。
JobTrackerへTaskTrackerは通知を上げてメンバへの脱退・加入をするためである。
◆ DataNodeの参加手順
該当ノードを削除する。
$ vi ./conf/dfs.hosts.exclude
ネームノードを更新する。
$ hadoop dfsadmin -refreshNodes
状態を確認する。
$ hadoop dfsadmin -report
Decommission Status : Normal
該当ノードを追加する。
$ vi conf/slaves
DataNodeを起動させる。
縮退させるDataNode側
$ ./bin/hadoop-daemon.sh start datanode
◆ NameNode故障時の復旧方法
1. fsimageやeditsを格納するname配下をすべてrmする。
2. BackupNodeのname配下のcurrentとimage配下だけをscpでコピーする。
以上である。
※BackupNodeの起動コマンドをメモ。
$ ./bin/hdfs namenode -backup
ここで書ききれなかったhadoopのチューニングポイントはここでまとめた。
hadoopでhdfs化したのちにhiveとimpalaを導入し高速ログ検索する内容はこちら。