◆ 目的
サーバに届いたメールログをNoSQLであるCassandra(カサンドラ)のデータベースに格納させ、
高速に検索できるようにする。事前にCassandraの予備知識があると理解しやすいだろう。
SQLではないので、テーブルに格納されたデータを柔軟にselectすることはできない。
NoSQLはkey-value形式である。
そのため設計時に何から何を求めるか決めておく必要がある。
今回は次の目的を達成できるようにする。
a) fromアドレスと日時からのメールログの検索
⇒ fromアドレスと日時をキーにするわけである
b) 日時からのメールログの検索
⇒ 日時をキーにするわけである
追記
ここではcassandra-0.6を使っているのだが、既に0.8までアップデートされているようだ。
しかも大幅にいろいろと変更があるため、具体的な手順は参考にならないかもしれない。
ここではThrift APIを使っているのだが、SQL的なコマンドが使えるCQL(Cassandra Query Language)も
充実してきており、データベースの操作方法をも変わってくるだろう。
ただ、NoSQLの考え方さえ身につけば今後どのようなアップデートがあろうとも、
応用はできるはずだ。とうことで、このまま説明は続ける。
◆ データベースへ投入する前のログファイル
ログファイルは通常一行で並んでいるだろうか。
それをYAML形式にして、Cassandraに渡すようにする。
YAML形式にするための処理は力技で対応できるだろう。
ログファイル('---'はYAMLの区切り文字列である)
--- date : '20110920013059' mta : mta101 msg : Message Received fromip : 192.0.2.80 from : user1@example.com rcpts : user5@example.com uniq : 7EF042644D002253005E1653 --- date : '20110920013100' mta : mta101 msg : Message Received fromip : 192.0.2.200 from : user2@example.com rcpts : user9@example.com uniq : 7EF042644D002253005E1651 --- date : '20110920015100' mta : mta102 msg : User Unknown fromip : 192.0.2.3 from : user8@example.com rcpts : user3@example.com uniq : 7EF04264-4D002253-005E1650 ---
ログの説明は下のとおりである。
date ⇒ 日時
mta ⇒ トラフィックを処理したサーバ名
msg ⇒ トラフィックを処理した際のログ内容
fromip ⇒ メールの送信元IPアドレス
from ⇒ メールの送信元メールアドレス
rcpts ⇒ メールの送信先メールアドレス
uniq ⇒ メールシステムでは各ログを一意に特定するためのキーが付与されるだろう。
それを用いる。メッセージIDは使えないのか、と疑問を持つかもしれないが、
メッセージIDは決して一意ではないので使えない。
◆ データベース データモデル
カラムファミリ(SQLでいうところのテーブル)は3つ用意する。
・from_date ①
・date ②
・uniq ③
検索条件をキーにする。
そしてそのキーから得られる値を、さらにキーにして検索しなおすところがポイントである。
例えば、送信元アドレスと、日時をキーにして検索すると、
その条件にマッチする一意なユニークな値が得られる(①)。
そのユニークな値をキーにしたカラムファミリを用意しておけば、
そこからdate、mta、msg、msgid、fromip、from、rcptsなどのロウ(row)が検索できる(③)。
あるキーに紐づいて取得できる(スーパ)カラムの集合がロウである。
日時をキーにして検索すると、
その条件にマッチする一意なユニークな値が得られる(②)。
先と同じく、そのユニークな値をキーにしたカラムファミリから、
date、mta、msg、msgid、fromip、from、rcptsなどのロウ求めることができる(③)。
(SuperColumn)
(Key) (Key) (Column)
(Name) (Value) (TimeStamp)
elem['from'] - elem['date'] - elem['uniq'] - elem['uniq'] - 自動で付与 ①
"date" - elem['date'] - elem['uniq'] - elem['uniq'] - 自動で付与 ②
elem['uniq'] - "date" - elem['date'] - 自動で付与 ③
- "mta" - elem['mta'] - 自動で付与
- "msg" - elem['msg'] - 自動で付与
- "msgid" - elem['msgid'] - 自動で付与
- "fromip" - elem['fromip'] - 自動で付与
- "from" - elem['from'] - 自動で付与
- "rcpts" - elem['rcpts'] - 自動で付与
※elem[XXX]はYAML形式にしたログの各要素を表すとする。
注意
2つ目のdateカラムファミリにスーパーカラムは必要なのか?
不要だが、いるのだ。
日時は範囲検索(いつからいつまで)させたいのだが、
その検索条件をさせるために、今回利用するThrift APIが
キーのキーの範囲しか指定できなかったためであるorz
◆ コンフィグ
/usr/local/src/apache-cassandra-0.6.6/conf/storage-conf.xml
<Keyspaces>
<Keyspace Name="Log">
<ColumnFamily Name="from_date"
ColumnType="Super"
CompareWith="BytesType"
CompareSubcolumnsWith="BytesType" />
<ColumnFamily Name="date"
ColumnType="Super"
CompareWith="BytesType"
CompareSubcolumnsWith="BytesType" />
<ColumnFamily Name="uniq"
CompareWith="UTF8Type"
KeysCached="1000"
RowsCached="100"
RowCacheSavePeriodInSeconds="0"
KeyCacheSavePeriodInSeconds="3600"/>
</Keyspace>
</keyspaces>
◆ コード概要
ログの読み込みとデータベースへの書き込みを同時にしなくてはならない。
スレッドを使って役割を分けてやり、処理の待ち合わせを行いながら実行させていければいいだろう。
マルチスレッド デザインパターンである、Producer-Consumer パターンを使えばいいだろうか。
ファイルの読み込みとデータベースへの書き込みでは処理スピードが違うため
(前者の方が断然早い)スループットが落ちる。
そこで中継地点としてTableクラスをおき、そこにデータを保持させ、
処理スピードの違いを吸収させる。
あとはThrift APIを使って処理していけばいい。
今回はRubyを使う。Rubyを使ったCassandra操作はここを参照されたい。
◆ コード
#!/usr/bin/ruby
require 'rubygems'
require 'cassandra'
require 'yaml'
class Table
def initialize()
@queue_num = 0
@queues = Array.new
@queue_max = 5
@mutex = Mutex.new
@cv = ConditionVariable.new
end
def push(num)
@mutex.synchronize do
while @queues.size() >= @queue_max
@cv.wait(@mutex)
end
@queues.push(num)
@cv.broadcast
end
end
def take()
@mutex.synchronize do
while @queues.size() == 0
@cv.wait(@mutex)
end
queue = @queues.shift()
@cv.broadcast
return queue
end
end
end
class Database
def initialize(table, file)
@table = table
@client = Cassandra.new('Log', '127.0.0.1:9160')
@file = open(file)
@t_inserts = Array.new
@t_insert_num = 3
end
def start()
load_thread()
insert_thread()
@t_inserts.each do |t|
t.join
end
@t_load.join
end
def load_thread()
@t_load = Thread.new() do
YAML.load_documents(@file) do |elem|
@table.push(elem)
end
@t_insert_num.times do
@table.push(nil)
end
end
end
def insert_thread()
@t_insert_num.times do
@t_inserts << Thread.new() do
client = Cassandra.new('Log', '127.0.0.1:9160')
loop do
elem = @table.take()
break if elem == nil
insert(elem, client)
end
end
end
end
def insert(elem, client)
begin
insert_uniqkey(elem, client)
insert_fromkey_datekey(elem, client)
insert_date_key(elem, client)
rescue => e
puts e
end
end
def insert_uniqkey(elem, client)
client.insert(:uniq, elem['uniq'], \
{'date' => elem['date'], \
'mta' => elem['mta'], \
'msg' => elem['msg'], \
'fromip' => elem['fromip'], \
'from' => elem['from'], \
'rcpts' => elem['rcpts'] })
end
def insert_fromkey_datekey(elem, client)
client.insert(:from_date, elem['from'], \
{elem['date'][0..11] => \
{elem['uniq'] => elem['uniq'] }})
end
def insert_date_key(elem, client)
client.insert(:date, "date", \
{elem['date'][0..11] => \
{elem['uniq'] => elem['uniq']}})
end
### fromアドレスと日時から検索
### 今回は横着して検索する条件を埋め込んでいる
def get_from_date()
@client.get(:from_date, "user1@example.com", \
:start => "201109200000" , :finish => "201109201500").each do |obj|
obj[1].each do |v|
result=@client.get(:uniq, v[1])
end
end
end
### 日時から検索
### 今回は横着して検索する条件を埋め込んでいる
def get_date()
@client.get(:date, 'date', \
:start => "201109200100" , :finish => "201109200150").each do |obj|
obj[1].each do |v|
tmp = @client.get(:uniq, v[1])
puts "date: #{tmp["date"][0..11]}, from: #{tmp["from"]} "
end
end
end
end
if ARGV.size != 1
puts "ARG ERROR"
exit 1;
end
file = ARGV.shift
table = Table.new
database = Database.new(table, file)
database.start()
database.get_from_date()
database.get_date()