青柳です。
メッセージキューを使おうかと思って調べていたら面白そうなキューのライブラリがあったので試してみました。
○メッセージキューとは
メッセージキューメッセージキュー(英: Message queue)は、プロセス間通信や同一プロセス内のスレッド間通信に使われるソフトウェアコンポーネントである。制御やデータを伝達するメッセージのキューである。
[Wikipeida]
MQ【メッセージキューイング】
アプリケーションソフト間でデータを交換して連携動作させる際に、送信するデータをいったん保管しておき、相手の処理の完了を待つことなく次の処理を行う方式。
[e-Word]
メッセージ指向ミドルウェア (MOM)
イラスト参考
[Oracle]
MOMとは
MOMとは、異なるプラットフォーム間でアプリケーション同士が双方向に情報をやり取りするためのソフトウェアのことである。
[Binary]
MOM(メッセージ指向ミドルウェア)の存在意義って何?
日本ではMOMがあまり活用されていないようですが、MOMを使う理由はどこにあって、どうしてあまりはやらないのでしょうか。
[togetter]
○ØMQとは
本家
The Intelligent Transport Layer
[ØMQ]
ØMQ (ZeroMQ) 序論
異なったソケットタイプ,接続処理,フレーミング,さらにはルーティングといった低レベルな詳細事項を,いくらかでも抽象化できたら素晴らしいとは思わないでしょうか? ZeroMQ (ØMQ/ZMQ) ネットワークライブラリは,まさにそのためのものです。"このライブラリはメッセージ全体をインプロセスや IPC,TCP,マルチキャストなど,さまざまなトランスポートを越えて送信できるソケットを提供します。ファンアウト,PubSub,タスク分散,要求/応答などのパターンによる N 対 N の接続が,ソケットを使って可能になるのです。"
[InfoQ]
ØMQ(zeromq)について調査する。
N-N通信を実現する、socket API風軽量メッセージングライブラリ。
自動的な再接続や、メッセージのキューイングを行ってくれる。
複数のメッセージングパターンと呼ばれるものを組み合わせることによって、柔軟なメッセージ配信を行うことができる。
[グニャラくんのwktk運営日記]
ActiveMQ or RabbitMQ or ZeroMQ or ...
どうやら ZeroMQ はシンプルで高速、RabbitMQ は割と高速でスケーラビリティが高い、ActiveMQ は遅いけど機能豊富といった感じらしい。ま、ActiveMQ は JMS 実装だしね……ESB 向きなんだろうなぁ。
[wivlog]
システム間連携 その4:ZeroMQ
ZeroMQを用いる事により、Berkeley socketsと同様のコーディングでありながら、Berkeley socketsで提供されずユーザーが実装しなければならなかった障害対応等の実アプリケーションで必須な機能が使用でき、簡単にシステム間連携が実現できます。
[TeckSketch]
○Rubyでの実装
■request/response
response.rb
#coding: utf-8request.rb
require 'ffi-rzmq'
context = ZMQ::Context.new(1)
# Socket to talk to server
socket = context.socket(ZMQ::REP)
socket.bind("tcp://127.0.0.1:5555")
while true
msg = ''
res = socket.recv_string(msg)
puts "recive message " + msg
socket.send_string(msg + " World")
end
#coding: utf-8■publish/subscribe
require 'rubygems'
require 'ffi-rzmq'
context = ZMQ::Context.new(1)
# Socket to talk to server
puts "Connecting to hello world server…"
requester = context.socket(ZMQ::REQ)
requester.connect("tcp://127.0.0.1:5555")
0.upto(9) do |request_nbr|
puts "Sending request #{request_nbr}…"
requester.send_string ARGV[0]
reply = ''
rc = requester.recv_string(reply)
puts "Received reply #{request_nbr}: [#{reply}]"
end
publish.rb
#coding: utf-8susbcribe.rb
require 'ffi-rzmq'
context = ZMQ::Context.new(1)
pub = context.socket(ZMQ::PUB)
pub.bind("tcp://127.0.0.1:5555")
0.upto(9) do |i|
puts "Sending #{i}…"
pub.send_string i.to_s
sleep(1)
end
pub.close
context.terminate
#coding: utf-8■push/pull
require 'ffi-rzmq'
context = ZMQ::Context.new(1)
sub = context.socket(ZMQ::SUB)
sub.setsockopt(ZMQ::SUBSCRIBE, '')
sub.connect("tcp://127.0.0.1:5555")
while true
msg = ''
res = sub.recv_string(msg)
puts ARGV[0] + " " + msg
end
push.rb
#coding: utf-8pull.rb
require 'ffi-rzmq'
context = ZMQ::Context.new(1)
push = context.socket(ZMQ::PUSH)
push.bind("tcp://127.0.0.1:5555")
0.upto(9) do |i|
puts "Sending #{i}…"
push.send_string i.to_s
sleep(1)
end
push.close
context.terminate
#coding: utf-8■pipeline
require 'ffi-rzmq'
context = ZMQ::Context.new(1)
pull = context.socket(ZMQ::PULL)
pull.connect("tcp://127.0.0.1:5555")
while true
msg = ''
res = pull.recv_string(msg)
puts ARGV[0] + " " + msg
end
task.rb
#coding: utf-8worker.rb
require 'ffi-rzmq'
context = ZMQ::Context.new(1)
push = context.socket(ZMQ::PUSH)
push.bind("tcp://127.0.0.1:5555")
pull = context.socket(ZMQ::PULL)
pull.bind("tcp://127.0.0.1:5556")
th_push = Thread.new do
0.upto(9) do |i|
puts "Sending #{i}…"
push.send_string i.to_s
sleep(1)
end
push.close
end
th_pull = Thread.new do
while true
msg = ''
res = pull.recv_string(msg)
puts "Recv " + msg
end
end
th_push.join
th_pull.join
context.terminate
#coding: utf-8■まとめ
require 'ffi-rzmq'
context = ZMQ::Context.new(1)
pull = context.socket(ZMQ::PULL)
pull.connect("tcp://127.0.0.1:5555")
push = context.socket(ZMQ::PUSH)
push.connect("tcp://127.0.0.1:5556")
while true
msg = ''
res = pull.recv_string(msg)
back = ARGV[0] + " " + msg
puts back
push.send_string(back)
end
非同期の仕組みが驚くほど簡単に実装できます。
大量のクロールを走らせるような仕組みを構築するのに使えそうです。
ただし、キューはメモリ上で行われるためメッセージが確実に処理されたかどうかの保証がありません。
クロールを作ろうとすると実際にクロールされたかどうか、現在クロール中なのかどうかをDBなどで管理する必要がありそうです。
0 件のコメント:
コメントを投稿