2011年9月28日水曜日

Cassandraを使った高速ログ検索システムの構築


◆ 目的
サーバに届いたメールログをNoSQLであるCassandra(カサンドラ)のデータベースに格納させ、
高速に検索できるようにする。事前にCassandraの予備知識があると理解しやすいだろう。

SQLではないので、テーブルに格納されたデータを柔軟にselectすることはできない。
NoSQLはkey-value形式である。
そのため設計時に何から何を求めるか決めておく必要がある。
今回は次の目的を達成できるようにする。

a) fromアドレスと日時からのメールログの検索
⇒ fromアドレスと日時をキーにするわけである

b) 日時からのメールログの検索
⇒ 日時をキーにするわけである

追記
ここではcassandra-0.6を使っているのだが、既に0.8までアップデートされているようだ。
しかも大幅にいろいろと変更があるため、具体的な手順は参考にならないかもしれない。
ここではThrift APIを使っているのだが、SQL的なコマンドが使えるCQL(Cassandra Query Language)も
充実してきており、データベースの操作方法をも変わってくるだろう。
ただ、NoSQLの考え方さえ身につけば今後どのようなアップデートがあろうとも、
応用はできるはずだ。とうことで、このまま説明は続ける。



◆ データベースへ投入する前のログファイル
ログファイルは通常一行で並んでいるだろうか。
それをYAML形式にして、Cassandraに渡すようにする。
YAML形式にするための処理は力技で対応できるだろう。

ログファイル('---'はYAMLの区切り文字列である)

---
date     : '20110920013059'
mta      : mta101
msg      : Message Received
fromip   : 192.0.2.80
from     : user1@example.com
rcpts    : user5@example.com
uniq     : 7EF042644D002253005E1653
---
date     : '20110920013100'
mta      : mta101
msg      : Message Received
fromip   : 192.0.2.200
from     : user2@example.com
rcpts    : user9@example.com
uniq     : 7EF042644D002253005E1651
---
date     : '20110920015100'
mta      : mta102
msg      : User Unknown
fromip   : 192.0.2.3
from     : user8@example.com
rcpts    : user3@example.com
uniq     : 7EF04264-4D002253-005E1650
---

ログの説明は下のとおりである。
date ⇒ 日時
mta ⇒ トラフィックを処理したサーバ名
msg ⇒ トラフィックを処理した際のログ内容
fromip ⇒ メールの送信元IPアドレス
from ⇒ メールの送信元メールアドレス
rcpts ⇒ メールの送信先メールアドレス
uniq ⇒ メールシステムでは各ログを一意に特定するためのキーが付与されるだろう。
            それを用いる。メッセージIDは使えないのか、と疑問を持つかもしれないが、
            メッセージIDは決して一意ではないので使えない。



◆ データベース データモデル
カラムファミリ(SQLでいうところのテーブル)は3つ用意する。
・from_date 
・date 
・uniq 

検索条件をキーにする。
そしてそのキーから得られる値を、さらにキーにして検索しなおすところがポイントである。
例えば、送信元アドレスと、日時をキーにして検索すると、
その条件にマッチする一意なユニークな値が得られる()。
そのユニークな値をキーにしたカラムファミリを用意しておけば、
そこからdate、mta、msg、msgid、fromip、from、rcptsなどのロウ(row)が検索できる()。
あるキーに紐づいて取得できる(スーパ)カラムの集合がロウである。

日時をキーにして検索すると、
その条件にマッチする一意なユニークな値が得られる()。
先と同じく、そのユニークな値をキーにしたカラムファミリから、
date、mta、msg、msgid、fromip、from、rcptsなどのロウ求めることができる()。



(SuperColumn)
(Key)          (Key)          (Column)
                              (Name)         (Value)            (TimeStamp)
elem['from'] - elem['date'] - elem['uniq'] - elem['uniq']     - 自動で付与 

"date"       - elem['date'] - elem['uniq'] - elem['uniq']     - 自動で付与 

               elem['uniq'] - "date"       - elem['date']     - 自動で付与 
                            - "mta"        - elem['mta']      - 自動で付与
                            - "msg"        - elem['msg']      - 自動で付与
                            - "msgid"      - elem['msgid']    - 自動で付与
                            - "fromip"     - elem['fromip']   - 自動で付与
                            - "from"       - elem['from']     - 自動で付与
                            - "rcpts"      - elem['rcpts']    - 自動で付与


※elem[XXX]はYAML形式にしたログの各要素を表すとする。



注意
2つ目のdateカラムファミリにスーパーカラムは必要なのか?
不要だが、いるのだ。
日時は範囲検索(いつからいつまで)させたいのだが、
その検索条件をさせるために、今回利用するThrift APIが
キーのキーの範囲しか指定できなかったためであるorz



◆ コンフィグ
/usr/local/src/apache-cassandra-0.6.6/conf/storage-conf.xml

<Keyspaces>
    <Keyspace Name="Log">
      <ColumnFamily Name="from_date"
                    ColumnType="Super"
                    CompareWith="BytesType"
                    CompareSubcolumnsWith="BytesType" />

      <ColumnFamily Name="date"
                    ColumnType="Super"
                    CompareWith="BytesType"
                    CompareSubcolumnsWith="BytesType" />

      <ColumnFamily Name="uniq"
                    CompareWith="UTF8Type"
                    KeysCached="1000"
                    RowsCached="100"
                    RowCacheSavePeriodInSeconds="0"
                    KeyCacheSavePeriodInSeconds="3600"/>

    </Keyspace>
</keyspaces>




◆ コード概要
ログの読み込みとデータベースへの書き込みを同時にしなくてはならない。
スレッドを使って役割を分けてやり、処理の待ち合わせを行いながら実行させていければいいだろう。

マルチスレッド デザインパターンである、Producer-Consumer パターンを使えばいいだろうか。
ファイルの読み込みとデータベースへの書き込みでは処理スピードが違うため
(前者の方が断然早い)スループットが落ちる。
そこで中継地点としてTableクラスをおき、そこにデータを保持させ、
処理スピードの違いを吸収させる。

あとはThrift APIを使って処理していけばいい。
今回はRubyを使う。Rubyを使ったCassandra操作はここを参照されたい。




◆ コード
#!/usr/bin/ruby

require 'rubygems'
require 'cassandra'
require 'yaml'

class Table

  def initialize()
    @queue_num = 0

    @queues = Array.new
    @queue_max = 5

    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end


  def push(num)
    @mutex.synchronize do
      while @queues.size() >= @queue_max
        @cv.wait(@mutex)
      end
      @queues.push(num)
      @cv.broadcast
    end
  end


  def take()
    @mutex.synchronize do
      while @queues.size() == 0
        @cv.wait(@mutex)
      end
      queue = @queues.shift()
      @cv.broadcast
      return queue
    end
  end

end



class Database

  def initialize(table, file)
    @table = table
    @client = Cassandra.new('Log', '127.0.0.1:9160')
    @file  = open(file)

    @t_inserts = Array.new
    @t_insert_num = 3
  end


  def start()
    load_thread()
    insert_thread()
    @t_inserts.each do |t|
      t.join
    end
    @t_load.join
  end


  def load_thread()
    @t_load = Thread.new() do
      YAML.load_documents(@file) do |elem|
        @table.push(elem)
      end

      @t_insert_num.times do
        @table.push(nil)
      end
    end
  end


  def insert_thread()
    @t_insert_num.times do
      @t_inserts << Thread.new() do
        client = Cassandra.new('Log', '127.0.0.1:9160')
        loop do
          elem = @table.take()
          break if elem == nil
          insert(elem, client)
        end
      end
    end
  end


  def insert(elem, client)
    begin
      insert_uniqkey(elem, client)
      insert_fromkey_datekey(elem, client)
      insert_date_key(elem, client)
    rescue => e
      puts e
    end
  end


  def insert_uniqkey(elem, client)
    client.insert(:uniq, elem['uniq'], \
      {'date'     => elem['date'], \
      'mta'      => elem['mta'], \
      'msg'      => elem['msg'], \
      'fromip'   => elem['fromip'], \
      'from'     => elem['from'], \
      'rcpts'    => elem['rcpts'] })
  end


  def insert_fromkey_datekey(elem, client)
    client.insert(:from_date, elem['from'], \
                  {elem['date'][0..11] => \
                  {elem['uniq'] => elem['uniq'] }})
  end


  def insert_date_key(elem, client)
    client.insert(:date, "date", \
                  {elem['date'][0..11] => \
                  {elem['uniq'] => elem['uniq']}})
  end


  ### fromアドレスと日時から検索
  ### 今回は横着して検索する条件を埋め込んでいる
  def get_from_date()
    @client.get(:from_date, "user1@example.com", \
           :start => "201109200000" , :finish => "201109201500").each do |obj|
      obj[1].each do |v|
        result=@client.get(:uniq, v[1])
      end
    end
  end


  ### 日時から検索
  ### 今回は横着して検索する条件を埋め込んでいる
  def get_date()
    @client.get(:date, 'date', \
          :start => "201109200100" , :finish => "201109200150").each do |obj|
      obj[1].each do |v|
        tmp = @client.get(:uniq, v[1])
          puts "date: #{tmp["date"][0..11]}, from: #{tmp["from"]} "
      end
    end
  end

end


if  ARGV.size != 1
  puts "ARG ERROR"
  exit 1;
end


file = ARGV.shift

table = Table.new
database = Database.new(table, file)

database.start()

database.get_from_date()
database.get_date()