2014年3月24日月曜日

cassandraの運用時に必要な定期バッチ処理の考察


cassandraを運用する上でどういった
定期バッチ処理が必要かについてまとめる。

しかし、cassandraの前提知識がないと
必要性を理解しにくいため、
下の流れで若干遠回りしながら説明していく。

◆ 分散システムの予備知識
◆ 修復機能の予備知識
◆ 定期的に必要となる処理の概要
◆ 定期的に必要となる処理の詳細



◆ 分散システムの予備知識

cassandraのような分散システムではノード間のデータで
同時に次の3つを保証することは難しい。
すべてを満たすためにはレイテンシー(遅延)を許容する必要がある。

CAP定理
Consistency:一貫性
Availability:可用性
Partition Tolerance:分断耐性

cassandraは可用性と分断耐性(AP)を重視し、一貫性を犠牲にしているが、
遅延とのトレードオフで一貫性を得ることができる。
コンシステンシレベルというパラメータでデータをいくつのノードに
レプリケーションするかを決められる。
ONE、TWO、THREE、QUORUM、ALL、LOCAL_QUORUM、EACH_QUORUM、ANYなどがある。

メモ
SQL:CAを重視
Hbase:CPを重視


もう少し一貫性という点を掘りさげてみる。
レプリケーション数をNとする。
READのときに読み込みを行うノード数をRとする。
WRITEのときに書込み完了を待つノード数をWとする。

例えば、Nが3で、Wが2の書き込み時は、
2台の書き込み完了を待ち、
残りの1台は非同期でレプリケーションが行われる。

このR、W、Nの間には下の関係があれば一貫性が保てる。
R + W > N

なぜこの式が成り立つのか。
証明は調べてもらうとして、直感的にはすぐに分かるだろう。

N=3、R=2、W=2、コンシステンシレベルがQUORUMの場合を考える。

3台のノード
 -----   -----   ----- 
| old | | old | | old |
 -----   -----   -----

書き込み(2箇所に書き込めたのでOK)
 -----   -----   -----
| new | | new | | old |
 -----   -----   -----

3台のノードのうち、少なくともW=2台のノードは最新の値が保存される。
3台からR=2台を選んで値を読み取るところを想像してほしい。
どの2台を選んでも,少なくともひとつは最新の値を参照できる。


余談だが、データの格納ノードにはいくつかの戦略がある。
● プライマリノードの選定(Partitioner)
row keyの値を元に以下の戦略で選定(詳細は割愛)。
・Murmur3Partitioner

・RandomPartitioner

・ByteOrderedPartitioner

・OrderPreservingPartitioner


● レプリカノードの選定(Replica Placement Strategy)
・Simple Strategy
リング上でプライマリレプリカの隣に配置されたノードを順次レプリカとして選択。
1号機がprimary replica nodeであれば、そのコピーは2号機と3号機で作られる。

・Network Topology Strategy
データセンター・ラック定義に従い、レプリカ配置先を選択。


本線に戻る。
次の疑問として、一貫性が失われている状態があった場合
いつ、どうやって解消されるのか。
それはHinted Handoff、Read Repair、Anti Entropyの修復機構で補われる。



◆ 修復機能の予備知識

 Hinted Handoff
WRITE時に担当ノードに障害が発生していた場合、
処理を受け取った正常なノードが追加データを一定期間預かる仕組み。

WRITEの際にそのキーを担当するレプリカノードの一つが停止していた場合、
正常に稼働しているコーディネータノード、またはレプリカノードの一つにヒント情報が保存され
停止していたノードの復活時に利用される。
※ver.1.0以降はコーディネータノードが受け持つ
※それ以前はレプリケーションノードが受け持っていた

max_hint_window_in_ms: これより古いhintは削除
hinted_handoff_throttle_delay_in_ms: HHリカバリー時の負荷制御


 Read Repair
READ時に複数の担当ノードから返却されたデータを比較して、
その結果が異なっていた場合にROW単位で最新のデータに更新する。


 Anti Entropy
各ノードがもっているデータの比較を行い、結果が異なる場合に、
Column Family単位で最新のデータに更新する仕組みである。
nodetoolコマンドにreapirオプションを付与して実行すると、
その結果としてAnti Entropyが走る。



◆ 定期的に必要となる処理の概要

修復機能を見ると気になることが出てこないだろうか。
削除操作を受け取れなかったレプリカが存在した場合の挙動時である。
そのレプリカが復旧すると、そのレプリカは消されるはずのデータを持ち、
それ以外のレプリカはデータを持たないという状況が発生する。
すると、本来は削除するべき操作のはずが、修復機能により
データが戻されてしまうのではないか。

その問題を回避するために、cassandraはデータを実際に消去する代わりに、
tombstoneと呼ばれるフラグをまず立てる。
ノード間のデータ整合時にtombstoneのフラグの有無の違いがある場合は
tombstoneが有りの状態が優先される。
そうでなければデータが復活することになるためである。

物理削除するまでの猶予期間(gc_grace_seconds)内で
ノード間のデータの不整合を修復し、
compaction(この後に説明)時にtombstoneフラグが立ったデータを実削除する。


つまり、定期的に動かすバッチ処理として次の2つが必須であることが想定できる。
・gc_grace_secondsの期間内での網羅的なデータの修復
・compation

gc_grace_secondsの期間内に網羅的な修復をしなかった場合に
どういった問題が発生するのか、具体例で考えてみる。


(問題なし)
gc_grece期間内に網羅的な修復をした場合

レプリカ数3、READ、WRITEのconsistency levelはQUORUM
f: 削除処理をしてtombstoneによる削除マーカが付与(gc_grace期間のスタート)
-: 完全削除
x: 何かしらの問題で削除処理が失敗した状態

     削除     網羅的な修復     compaction
         ↓          ↓               ↓
node1    f          f               -
node2    f          f               -
node3    x          f               -
      ------------------------------------------>
         |                    |
         |  gc_grace_seconds  |
         |                    |


(問題あり)
gc_grece期間内に網羅的な修復をしなかった場合

     削除                    compaction
         ↓                         ↓
node1    f                         -        x
node2    f                         -        x
node3    x                         x        x
      ------------------------------------------>
         |                    |
         |  gc_grace_seconds  |
         |                    |


compaction後に消したはずのデータが残り続ける。
Read Repair処理を覚えているだろうか。
READ時に複数のノードから返却される結果が異なっていた際に
自動的にデータ修復を行う機能であった。
データがない場合には、他ノードからデータの複製が行われ、
削除したはずのものが復活する事態を招く。



◆ 定期的に必要となる処理の詳細

● 網羅的なデータの修復

網羅的なデータの修復と一言で言っていたがどのようなものがあるのだろうか。

 Anti Entropy
nodetoolコマンドにreapirオプションを付与して実行すると、
Anti Entropyが走ると簡単に説明していた。
これによりデータの修復ができる。

nodetool repairをあるノードで実施すれば、そのノードが参加するレプリカノード間で
データ構造全体を比較し同期を行う。
全ノードで実施する必要はない。

例えば9台ノードがあり、3台でレプリケーションさせているのであれば、
1、4、7号機で実施すれば全データを網羅できる。

X号機:X号機(primaryデータ), Y号機(secondaryデータ), Z号機(thirdデータ)
1: 1(p), 9(s), 8(t)
2: 2(p), 1(s), 9(t)
3: 3(p), 2(s), 1(t)
4: 4(p), 3(s), 2(t)
5: 5(p), 4(s), 3(t)
6: 6(p), 5(s), 4(t)
7: 7(p), 6(s), 5(t)
8: 8(p), 7(s), 6(t)
9: 9(p), 8(s), 7(t)

1号機にAnti Entropy(nodetool repair)を実行すると以下の3つの処理が行われる。

・1号機上の1号機の担当するtokenのデータ1(primary)の修復のため、
1号機、2号機、3号機の1号機の担当するtokenのデータと比較し修復。

・1号機上の9号機の担当するtokenのデータ2(secondary)の修復のため、
9号機、1号機、2号機の9号機の担当するtokenのデータと比較し修復。

・1号機上の8号機の担当するtokenのデータ3(third)の修復のため、
8号機、9号機、1号機の8号機の担当するtokenのデータと比較し修復。


nodetool repairのオプションとして"-pr"を使うと動きが変わる。
・1号機上の"1号機の担当するtokenのデータ1(primary)の修復のため、
1号機、2号機、3号機の1号機の担当するtokenのデータと比較し修復。


Read Repairより速度は速いが修復が必要なデータサイズに応じて
一時領域が必要であるため、小さなデータのrepairに有利である。

(補足)
Anti Entropyが走ると最新の情報をノード間で比較するために、
まずMemTableの内容をSSTableに書きだすflush処理が行われる。
またSSTableとは別にノード間の差分をチェックするため
Merkle Tree(Hash Tree)が作られる。
MerkleTreeの作成にはノード上の全データのスキャンが必要であるため負荷をかけるが、
データセットをすべてレプリカ間で転送せずにチェック出来るため、
不整合の早期検知とデータ転送量を最小化することができる。


 Read Repair
ツール等で全データへのアクセスを強制的に実施することで
Read Repair機能を働かせてデータの不整合を修復する。
時間が比較して長くなるが一時領域が不要のため
大きなデータのRepairに有利である。
※cassandraの機能では未実装。



● compation

compactionの役割はSSTableの整理である。
compaction時にはflushが行われる。
SSTableを整理することでディスクの効率を高め、
またまたディスクのフリースペースを増やす効果もある。

そしてこの際にtombstoneのついたデータの削除を行う。

compactionと一言で言ってもいくつか種類がある。
それぞれの大きな相違点をまとめる。


 SizeTieredCompaction
・Major Compaction
(役割)
・すべてのSSTableを一つに併合
 (指定したCFのすべてのSSTableと同様の一時領域が必要)
・tombstoneの削除処理

(起動トリガー)
手動で実施(カラムファミリー毎SizeTieredCompactionStrategyで指定)
nodetool compactコマンドで実施
引数にカラムファミリを指定可能


・Minor Compaction
(役割)
・ファイルの個数を閾値として近い大きさのSSTableの併合
 (compaction対象となったSSTableと同様の一時領域が必要)
・tombstoneの削除処理

(起動トリガー)
自動実行(カラムファミリー毎SizeTieredCompactionStrategyで指定)

(動作概要)
SSTable併合ロジックは次の通り。
1. 対象Column FamilyのSSTableをサイズ別に小さい順でソート
    2. 近いサイズのSSTable(※1)ごとにグルーピング(バケット化)
 3. グループ=バケット毎に、マージ処理の実施
設定上限時個(※2)のSSTableがマージされる(※3)

※1
近いサイズとは、以下の条件を満たすものである
(avg * 0.5) < size < avg * 1.5

size: 対象のSSTableのサイズ
avg : 着目するバケット内のSSTableの平均サイズ

バケット内のSSTableの平均値に対して±50%
ただし、50KB以下のSSTableは同じバケット

※2
閾値の確認方法
表示:nodetool getcompactionthreshold
変更:nodetool setcompactionthreshold
デフォルト値 min=4, max=32
min=0またはmax=0でMinor Compaction無効になる

※3
バケット内に設定上限個以上のSSTableが含まれる場合、
いくつかのSSTableはマージされない
小さいサイズのものからマージされるため、
大きいサイズのSSTableはマージ対象になりにくくなる可能性がある。
またマージされないとtombstoneは削除対象にならなず
物理ディスク容量が下がらない。
物理削除可能なデータを閾値にcompactionを実施する(tombstone compaction)機能が
v1.2から利用可能。


 Leveled Compaction
(役割)
・SSTable1つあたりの大きさを一定とし、レベル分けして併合
 (Compaction対象となったSSTableと同様の一時領域が必要)
・tombstoneの削除処理

(起動トリガー)
自動実行(カラムファミリー毎にLeveledCompactionStrategyで指定)。
LeveledCompactionではflushが行われると直後に必ずComapctionが走る。

(動作概要)
compactionした結果、その階層のサイズを超過する場合は古いデータが
次の階層に追いやられる。
階層ごとに保持できるデータサイズは決まっており、
Level nの階層はSSTable最大サイズ * 10^n のデータを保持できる
(sstable_size_in_mb * 10^n)。

簡単な例で動きを追ってみる。
1. MemTableがflushされると作成されるSSTableはLevel0に追加される
2. Level0は即座にLevel1のSSTableとcompactionされる
3. compactionした結果、その階層のサイズを超過する場合は古いSSTableをLevel2に追いやる

カラムファミリごとに、どのSSTableがどの階層に属しているかはjsonファイルで管理されている。
同一階層ではRowKeyの重複は許されない。
そのことにより読み書きやcompactionに必要なリソースの低減がはかれる。

何かしらの理由で階層を初期化したい場合は、casanndraプロセスを停止後、
jsonファイルを全削除し、compactionを行えばよい。
cassandraを起動させるとcompactionは自動で全てのSSTableを対象として実施される。
jsonファイルがなくなり、階層判断が出来なくなるので
複数の階層にまたがっていたデータが1つにまとめられる。
jsonファイルが存在せずに階層構造が判断できないこの間は
データはLevel0に配置される。
そのためデータの参照ができないということはない

(注意点1)
階層が深くなるにつれレスポンスが悪化する可能性がある。
このコンパクションでは各世代でRowKeyの重複を許す実装になっている。
世代が下っていくと、あるRowKeyのデータを持つSSTableが複数存在する場合が出てくる。
Levelがn個あれば、最大でn個のSSTableが同じRowKeyのデータを持つこともあり得る。
参照するための検索コストだけではなく、その分ヒープメモリを消費するため、
それにともないGCの頻度や実行時間が増加する。

最も浅いLevelにあるデータだけを読めばいいのでは、と疑問をもつかもしれない。
しかし対象のRowKeyが存在するSSTableはすべて見なくてはならない。
RowとはRowKeyに紐づいて取得できるカラムの集合である。
一つのキーに対してカラムは一つとは限らない。
(例)
あるRowKey1に、Column1とColumn2が紐づいており、
SSTableのLevel2にRowKey1に紐づくColumn1が存在するとする。
この状態から、Rowkey1のColumw2 への修正が走ると、
MemTable上のRowKey1にはColumn2が作られる。
flushされると、Level1のRowKey1はColumn2が、
Level2のRowKey1はColumn1を持つという状態が生まれる。
よってすべてのLevelにあるSSTableを読む必要が出てくるわけである。

(注意点2)
Leveled Compactionではgc_grace期間を過ぎたデータが
compactionが実施されているにも関わらず残り続ける場合がある。

例としてあげた先の1~3のシーケンスの続きを使って考える。
これまでのフローであるRowKeyのデータはLevel2にあった。
4. Level2にあるデータが論理削除(物理削除ではない)されたとする。
5. しかし、tombstoneが付与されるデータはMemTable上に作られる
   ※Level2のデータにはtombstoneは付与されない
6. flushが起こり、MemTableから作成されるSSTableがLevel0に置かれる
7. Level0は即座にLevel1のSSTableとcompactionされる

同じRowKeyのデータであっても、階層間(Level1とLevel2)で
異なる状態(tombstoneの有無)が発生した。

では、このtombstoneが付与されたデータはいつ消されるのであろうか。
それはcompactionが走り、Level1にあるデータがLevel2に追加される段階で、
gc_grace期間を過ぎている場合に、tombstoneが付与されていないデータと共に削除される。

Leveled Compactionでは、gc_grace後のcompactionでは物理削除される条件にならず、
tombstoneフラグデータと、tombstoneフラグが付与されていない同一データが
邂逅した際に初めて物理的に消されることになる。
さらに深い階層にフラグが立っていないデータがあった場合は、
物理削除はより遅れることになる。

Anti Entropyが実行される際、ノード間の差分をチェックするために、
Merkle Tree(Hash Tree)が利用されることは説明した。
Level1とLevel2階層間でのtombstoneの有無の違いがある場合は、
Merkle Treeにはどちらが登録されるのであろうか。
階層に因らずtombstoneがついたデータが優先される。

(注意点3 ※ver1.2以降では解消されている)
Leveled Compactionが行うcompaction中に、
nodetool repairを手動で動かしてはならない。

Leveled Compactionのcompaction時、MemTableの内容を
SSTableに書きだすためにflush処理が動作する。
またnodetool repairコマンドによってAnti Entropyを働かせても
MemTableの内容をSSTableに書きだすためにflush処理が作動する。

flushの結果、作られるのはSSTableだけではなく
Bloom Filterを使ったファイルも作られる。
これはあるワードが存在するかもしれない(false positive)、
または確実に存在しないことを判定するアルゴリズムである。
SSTableを参照する前に、keyの検索前に存在の有無を定数オーダで
判定することで負荷を軽減できる。

nodetool repairの実行後は、Anti Entropyのために、
ノード間の差分をチェックするために利用されるMerkle Tree(Hash Tree)が作成された。

Merkle Treeだが、実はBloom Filterを元に読み込むサイズが計算される。
Leveled Compactionが行うcompactionの結果のBloom Filter用ファイルを
nodetool repairが参照し、見積ったサイズが小さい側に誤っていると
データの読み込みが不十分になり、Merkle Treeが正しく作られない。
(例)
Leveled Compaction    
      ↓ flush         nodetool repair
                            ↓ flush
BloomFilter #1        BloomFilter #2
                            ↓
                      MerkelTree #1を元に作成

Leveled CompactionのflushによってBloomFilter#1が作られている間に、
nodetool repairによるflushによるBloomFilter#2作成処理が重なった場合は、
後者の#2処理は待たされる。
#1の処理が終わった後に#2が作られるのではなく、#1ができた時点で、
#2の待機が解除され、#1を利用したMerkleTreeの作成に進む。

特定条件下で、削除データが復活する場合もありえる。
上の例で、Level1にはtombstoneがついたデータが、
Level2にはtombstoneがついていない同じデータ共存していた。

通常は、Merkel Treeにはtombstoneがあれば階層に因らずtombstoneが優先される。
しかし、tombstoneがついていないデータが参照された時点で、
Merkle Treeの作成が終了し、tombstoneがついたデータがMerke Treeに含まれないと、
それが他ノードに伝播し復活してしまう。

対策としては2つあるだろうか。
・nodetool flushをnodetool repairを行う前に行う
この間にLeveled Compactionのcompactionが動く可能性もゼロではないが、
nodetool flush後は該当CFのMemTableは空になるため、
次のflushの発生までにはそれなりの時間が必要になる。

・nodetool repairを使わない
代わりに網羅的なデータの修復で記載した
全データにアクセスし、Read Repairの機能経由で修復を行う。



~compactionのまとめ~
いくつかのcompactionを紹介したが、どういった観点で選択するばといいだろうか。
提供するサービスにより異なるが傾向としては次のようになるだろう。

データサイズ大、データ数小、IO低 のサービス
⇒ Major + Minor Compaction

データサイズ小、データ数大、IO高 のサービス
⇒ Leveled Compaction

Leveled ComCompactionがIO処理に有利な理由は、
compaction実行時に必要となるディスクサイズが
SizeTieredCompactionに比べて小さいため
DiskのIOも抑えられるためである。



(その他 関連記事)
cassandra リングから外れたノードの再追加方法
cassandraのモニタリングポイント