マルチスレッドデザインパターンについてまとめる。
今回はActive Objectパターン。
activeとは能動的な、という意味である。今回はActive Objectパターン。
【利用用途】
本パターンは、外部から非同期にメッセージを受け取っても、
自分固有のスレッドで、自分の都合のいいタイミングで処理を実行させたい場合に利用する。
非同期にリクエストを飛ばすクライアントと、それを処理するサーバを想像してもらえると分かりやすい。
◆ クライアント側 クラス一覧
・ CalcClient
計算をサーバに依頼する。
計算は時間がかかるという前提で、Futureパターンを利用する。
・ PrintClient
出力をサーバに依頼する。
◆ サーバ側 クラス一覧
・ Proxy
クライアントからの処理を受け付ける窓口の役割を担う。
また、スケジューラへのリクエストの登録を行う。
本クラスの仕事は、メソッドの呼び出しをインスタンスに変換することである。
それぞれのリクエストが、リクエストクラスを元に作られるインスタンスで
管理されるということである。
プロキシの名前から分かるとおり、実処理は行わない。
プロキシを介するということは、インターフェース(API)を変えることなく、
実装の変更もできる。
起動(invocation)と実行(execution)の分離、というやつである。
・ Scheduler
キューへの出し入れをスケジューリングする。
プロキシからリクエストを渡され、キューに投入する。
また、キューからリクエストを取得し、リクエストの実行を依頼する。
スケジューリング処理は単純なFIFO(First In First Out)である。
・ ActivationQueue
リクエストを一定の数だけ保持する。
キューへのリクエストの出し入れはスケジューラが行う。
・ Request
クライアントから依頼されたリクエストを管理する。
ここでは実処理を行うインスタンスと、Futureパターンで利用するインスタンスを保持させる。
・ CalcRequest(Requestクラスの継承クラス)
計算処理を実担当クラスへ依頼する。
・ PrintRequest(Requestクラスの継承クラス)
出力処理を実担当クラスへ依頼する。
・ Servant
実処理を担う。
今回は、計算と出力を行うためのメソッドを保持している。
・ FutureResult
futureパターン用の引換券クラスである。
・ RealResult
futureパターン用の実データクラスである。
・ ActiveObjectFactory
サーバ側の処理に必要なインスタンスを作成する。
つまり初期化である。
【処理フロー】
◆ クライアント側 フロー
サーバのプロキシへ処理を依頼 →
クライアントは引換券を取得 →
その引換券を元に結果を取得
◆ サーバ側 フロー
プロキシがリクエストを引き受け、スケジューラへ管理を依頼 →
スケジューラはキューへリクエストを投入 →
スケジューラは必要に応じてキューからリクエストを取得 →
リクエストを管理するインスタンスが実処理担当を担うインスタンスへ処理を依頼 →
リクエストを管理するインスタンスは、結果をフューチャ用インスタンスへセット
※プログラム上は、結果は実結果クラスとして管理している
【コード】
#!/usr/bin/ruby require 'thread' Thread.abort_on_exception = "yes" ##### CLIENT ##### class CalcClient def initialize(proxy, num) @proxy = proxy @num = num end def start() future_result = @proxy.calc(@num) @real_result = future_result.getResult() puts("CalcResult:" + @real_result.getRealValue().to_s) end end class PrintStrClient def initialize(proxy, str) @proxy = proxy @str = str end def start() @proxy.printStr(@str) end end ##### SERVER ##### class Proxy def initialize(scheduler, servant) @scheduler = scheduler @servant = servant end # 実処理を行うインスタンスのメソッド名と同一である def calc(num) future = FutureResult.new() @scheduler.invoke(CalcRequest.new(@servant, future, num)) return future end # こちらは引き換え券は不要なため、スケジューラに処理を移譲するのみである def printStr(str) @scheduler.invoke(PrintRequest.new(@servant, str)) end end class Scheduler def initialize(queue) @queue = queue end def invoke(request) @queue.putRequest(request) end def start() Thread.start do # サーバとして稼動させると想定したのでループさせておけばよいとした # ただし本コードではクライアントスレッドが終了した時点で # 終了する(joinさせていないため) loop do request = @queue.takeRequest() request.execute() end end end end class ActivationQueue def initialize(max_request_size) @queue = Array.new @max_request_size = max_request_size @mutex = Mutex.new @cv = ConditionVariable.new end def putRequest(request) @mutex.synchronize do while @queue.size >= @max_request_size @cv.wait(@mutex) end @queue.push(request) @cv.broadcast end end def takeRequest() @mutex.synchronize do while @queue.size() == 0 @cv.wait(@mutex) end request = @queue.shift @cv.broadcast return request end end end class Request def initialize(servant, future) @servant = servant @future = future end end class CalcRequest < Request def initialize(servant, future, num) super(servant, future) @num = num end def execute() real_result = @servant.calc(@num) @future.setResult(real_result) end end class PrintRequest < Request def initialize(servant, str) # 結果を得る必要はないので、引換券はnilにする super(servant, nil) @str = str end def execute() @servant.printStr(@str) end end class Servant def calc(num) answer = num * 10 sleep 1 return RealResult.new(answer) end def printStr(str) puts("PrintStr Result:" + str.to_s) end end class FutureResult def initialize() @mutex = Mutex.new @cv = ConditionVariable.new @is_ready = false end def setResult(real_result) @mutex.synchronize do @real_result = real_result @is_ready = true @cv.broadcast end end def getResult() @mutex.synchronize do while !@is_ready @cv.wait(@mutex) end end return @real_result end end class RealResult def initialize(value) @value = value end def getRealValue() return @value end end class ActiveObjectFactory def createActiveObject() # リクエストをキューイングするインスタンスの作成 queue = ActivationQueue.new(10) # キューへの出し入れをスケジューリングするインスタンスの作成 scheduler = Scheduler.new(queue) # 実処理を担うインスタンスの作成 servant = Servant.new() # クライアントからの処理を受け付ける窓口の役割を担うインスタンスの作成 proxy = Proxy.new(scheduler, servant) scheduler.start() return proxy end end ################################ ### サーバ側の処理に必要なインスタンスの作成 active_object_factory = ActiveObjectFactory.new() proxy = active_object_factory.createActiveObject(); ################################ ### クライアント側の処理に必要なインスタンスの作成 threads = Array.new threads << Thread.start do calc_client = CalcClient.new(proxy, 1) calc_client.start() end threads << Thread.start do print_str_client = PrintStrClient.new(proxy, 1) print_str_client.start() end threads << Thread.start do calc_client = CalcClient.new(proxy, 2) calc_client.start() threads.each do |t| t.join end
処理結果は下のようになる。
$ ./activeobject.rb Calc Result:10 PrintStr Result:1 Calc Result:20
スレッドパターン以前の並行処理プログラミングの基礎はここを参考してほしい。