2010年11月23日火曜日

マルチスレッド デザインパターン(Active Object パターン) Ruby編

マルチスレッドデザインパターンについてまとめる。
今回はActive Objectパターン。


【利用用途】
activeとは能動的な、という意味である。
本パターンは、外部から非同期にメッセージを受け取っても、
自分固有のスレッドで、自分の都合のいいタイミングで処理を実行させたい場合に利用する。
非同期にリクエストを飛ばすクライアントと、それを処理するサーバを想像してもらえると分かりやすい。



【クラス役割】
クライアント側 クラス一覧
CalcClient
 計算をサーバに依頼する。
 計算は時間がかかるという前提で、Futureパターンを利用する。

PrintClient
 出力をサーバに依頼する。


◆ サーバ側 クラス一覧
Proxy
 クライアントからの処理を受け付ける窓口の役割を担う。
 また、スケジューラへのリクエストの登録を行う。
 本クラスの仕事は、メソッドの呼び出しをインスタンスに変換することである。
 それぞれのリクエストが、リクエストクラスを元に作られるインスタンスで
 管理されるということである。
 プロキシの名前から分かるとおり、実処理は行わない。
 プロキシを介するということは、インターフェース(API)を変えることなく、
 実装の変更もできる。
 起動(invocation)と実行(execution)の分離、というやつである。

Scheduler
 キューへの出し入れをスケジューリングする。
 プロキシからリクエストを渡され、キューに投入する。
 また、キューからリクエストを取得し、リクエストの実行を依頼する。
 スケジューリング処理は単純なFIFO(First In First Out)である。

ActivationQueue
 リクエストを一定の数だけ保持する。
 キューへのリクエストの出し入れはスケジューラが行う。

Request
 クライアントから依頼されたリクエストを管理する。
 ここでは実処理を行うインスタンスと、Futureパターンで利用するインスタンスを保持させる。

CalcRequest(Requestクラスの継承クラス)
 計算処理を実担当クラスへ依頼する。

PrintRequest(Requestクラスの継承クラス)
 出力処理を実担当クラスへ依頼する。

Servant
 実処理を担う。
 今回は、計算と出力を行うためのメソッドを保持している。

FutureResult
 futureパターン用の引換券クラスである。

RealResult
 futureパターン用の実データクラスである。

ActiveObjectFactory
 サーバ側の処理に必要なインスタンスを作成する。
 つまり初期化である。



【処理フロー】
◆ クライアント側 フロー
 サーバのプロキシへ処理を依頼 →
 クライアントは引換券を取得 
 その引換券を元に結果を取得

◆ サーバ側 フロー
 プロキシがリクエストを引き受け、スケジューラへ管理を依頼 
 スケジューラはキューへリクエストを投入 
 スケジューラは必要に応じてキューからリクエストを取得 
 リクエストを管理するインスタンスが実処理担当を担うインスタンスへ処理を依頼 
 リクエストを管理するインスタンスは、結果をフューチャ用インスタンスへセット
 ※プログラム上は、結果は実結果クラスとして管理している




【コード】
#!/usr/bin/ruby

require 'thread'

Thread.abort_on_exception = "yes"

##### CLIENT #####
class CalcClient
  def initialize(proxy, num)
    @proxy = proxy
    @num = num
  end

  def start()
    future_result = @proxy.calc(@num)
    @real_result = future_result.getResult()
    puts("CalcResult:" + @real_result.getRealValue().to_s)
  end
end


class PrintStrClient
  def initialize(proxy, str)
    @proxy = proxy
    @str = str
  end

  def start()
    @proxy.printStr(@str)
  end
end


##### SERVER #####
class Proxy
  def initialize(scheduler, servant)
    @scheduler = scheduler
    @servant = servant
  end

  # 実処理を行うインスタンスのメソッド名と同一である
  def calc(num)
    future = FutureResult.new()
    @scheduler.invoke(CalcRequest.new(@servant, future, num))
    return future
  end

  # こちらは引き換え券は不要なため、スケジューラに処理を移譲するのみである
  def printStr(str)
    @scheduler.invoke(PrintRequest.new(@servant, str))
  end
end


class Scheduler
  def initialize(queue)
    @queue = queue
  end

  def invoke(request)
    @queue.putRequest(request)
  end

  def start()
    Thread.start do
      # サーバとして稼動させると想定したのでループさせておけばよいとした
      # ただし本コードではクライアントスレッドが終了した時点で
      # 終了する(joinさせていないため)
      loop do
        request = @queue.takeRequest()
        request.execute()
      end
    end
  end
end


class ActivationQueue
  def initialize(max_request_size)
    @queue = Array.new
    @max_request_size = max_request_size
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def putRequest(request)
    @mutex.synchronize do
      while  @queue.size >= @max_request_size
        @cv.wait(@mutex)
      end
    
      @queue.push(request)
      @cv.broadcast
    end
   end

  def takeRequest()
    @mutex.synchronize do
      while @queue.size() == 0
        @cv.wait(@mutex)
      end
      request = @queue.shift
      @cv.broadcast
      return request
    end
  end
end


class Request
  def initialize(servant, future)
    @servant = servant
    @future = future
  end
end


class CalcRequest < Request
  def initialize(servant, future, num)
    super(servant, future)
    @num = num
  end

  def execute()
    real_result = @servant.calc(@num)
    @future.setResult(real_result)
  end
end


class PrintRequest < Request
  def initialize(servant, str)
    # 結果を得る必要はないので、引換券はnilにする
    super(servant, nil)
    @str = str
  end

  def execute()
    @servant.printStr(@str)
  end
end


class Servant
  def calc(num)
    answer = num * 10
    sleep 1
    return RealResult.new(answer)
  end

  def printStr(str)
    puts("PrintStr Result:" + str.to_s)
  end
end


class FutureResult
  def initialize()
    @mutex = Mutex.new
    @cv = ConditionVariable.new
    @is_ready = false
  end

  def setResult(real_result)
    @mutex.synchronize do
      @real_result = real_result
      @is_ready = true
      @cv.broadcast
    end
  end

  def getResult()
    @mutex.synchronize do
      while !@is_ready
        @cv.wait(@mutex)
      end
    end

    return @real_result
  end
end


class RealResult
  def initialize(value)
    @value = value
  end

  def getRealValue()
    return @value
  end
end


class ActiveObjectFactory
  def createActiveObject()
    # リクエストをキューイングするインスタンスの作成
    queue = ActivationQueue.new(10)

    # キューへの出し入れをスケジューリングするインスタンスの作成
    scheduler = Scheduler.new(queue)

    # 実処理を担うインスタンスの作成
    servant = Servant.new()

    # クライアントからの処理を受け付ける窓口の役割を担うインスタンスの作成
    proxy = Proxy.new(scheduler, servant)
    scheduler.start()
    return proxy
  end
end


################################
### サーバ側の処理に必要なインスタンスの作成
active_object_factory = ActiveObjectFactory.new()
proxy = active_object_factory.createActiveObject();


################################
### クライアント側の処理に必要なインスタンスの作成
threads = Array.new

threads << Thread.start do
  calc_client = CalcClient.new(proxy, 1)
  calc_client.start()
end

threads << Thread.start do
  print_str_client = PrintStrClient.new(proxy, 1)
  print_str_client.start()
end
 
threads << Thread.start do
  calc_client = CalcClient.new(proxy, 2)
  calc_client.start()

threads.each do |t|
  t.join
end

処理結果は下のようになる。
$ ./activeobject.rb
Calc     Result:10
PrintStr Result:1
Calc     Result:20

スレッドパターン以前の並行処理プログラミングの基礎はここを参考してほしい。