STOMP协议详解
STOMP即Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互。STOMP协议由于设计简单,易于开发客户端,因此在多种语言和多种平台上得到广泛地应用。STOMP协议的前身是TTMP协议(一个简单的基于文本的协议),专为消息中间件设计。STOMP是一个非常简单和容易实现的协议,其设计灵感源自于HTTP的简单性。尽管STOMP协议在服务器端的实现可能有一定的难度,但客户端的实现却很容易。例如,可以使用Telnet登录到任何的STOMP代理,并与STOMP代理进行交互。
Stomp的实现
业界已经有很多优秀的Stomp的服务器/客户端的开源实现.
STOMP服务器
项目名 | 兼容STOMP的版本 | 描述 |
---|---|---|
Apache Apollo | 1.0 1.1 1.2 | ActiveMQ的继承者 http://activemq.apache.org/apollo |
Apache ActiveMQ | 1.0 1.1 | 流行的开源消息服务器 http://activemq.apache.org/ |
HornetQ | 1.0 | 来自JBoss的消息中间件 http://www.jboss.org/hornetq |
RabbitMQ | 1.0 1.1 1.2 | 基于Erlang、支持多种协议的消息Broker,通过插件支持STOMP协议 http://www.rabbitmq.com/plugins.html#rabbitmq-stomp |
Stampy | 1.2 | STOMP 1.2规范的一个Java实现 http://mrstampy.github.com/Stampy/ |
StompServer | 1.0 | 一个轻量级的纯Ruby实现的STOMP服务器 http://stompserver.rubyforge.org/ |
STOMP客户端
项目名 | 兼容STOMP的版本 | 描述 |
---|---|---|
activemessaging | 1.0 | Ruby客户端库 http://code.google.com/p/activemessaging/ |
onstomp | 1.0 1.1 | Ruby客户端库 https://rubygems.org/gems/onstomp |
Apache CMS | 1.0 | C++客户端库 http://activemq.apache.org/cms/ |
Net::STOMP::Client | 1.0 1.1 1.2 | Perl客户端库 http://search.cpan.org/dist/Net-STOMP-Client/ |
Gozirra | 1.0 | Java客户端库 http://www.germane-software.com/software/Java/Gozirra/ |
libstomp | 1.0 | C客户端库,基于APR库 http://stomp.codehaus.org/C |
Stampy | 1.2 | Java客户端库 http://mrstampy.github.com/Stampy/ |
stomp.js | 1.0 1.1 | JavaScript客户端库 http://jmesnil.net/stomp-websocket/doc/ |
stompest | 1.0 1.1 1.2 | Python客户端库,全功能实现,包括同步和异步 https://github.com/nikipore/stompest |
StompKit | 1.2 | Objective-C客户端库,事件驱动 https://github.com/mobile-web-messaging/StompKit/ |
stompngo | 1.0 1.1 1.2 | Go客户端库 https://github.com/gmallard/stompngo |
stomp.py | 1.0 1.1 1.2 | Python客户端库 https://github.com/jasonrbriggs/stomp.py |
tStomp | 1.1 | TCL客户端库 https://github.com/siemens/tstomp |
STOMP协议分析
STOMP协议与HTTP协议很相似,它基于TCP协议,使用了以下命令:
- CONNECT
- SEND
- SUBSCRIBE
- UNSUBSCRIBE
- BEGIN
- COMMIT
- ABORT
- ACK
- NACK
- DISCONNECT
STOMP的客户端和服务器之间的通信是通过“帧”(Frame)实现的,每个帧由多“行”(Line)组成。 第一行包含了命令,然后紧跟键值对形式的Header内容。 第二行必须是空行。 第三行开始就是Body内容,末尾都以空字符结尾。 STOMP的客户端和服务器之间的通信是通过MESSAGE帧、RECEIPT帧或ERROR帧实现的,它们的格式相似。
Stomp的python Demo
在demo中,我们选择的类库是stomp.py, 使用的版本是4.1.20,stomp.py==4.1.20
.
import time
import stomp
import json
from config import *
from engine.common import case_log
def default_call_back(data):
print("******************Receive New Data******************")
print data
print("****************************************************")
class MyListener(stomp.ConnectionListener):
def __init__(self, callback):
self.callback = callback
def on_error(self, headers, message):
print('received an error %s' % message)
def on_message(self, headers, message):
print('received a headers %s' % headers)
print('received a message %s' % message)
# key = headers.get('ack', None)
self.callback(headers, message)
class MessSendOrRecv(object):
def __init__(self, ip='127.0.0.1', port=61613, user=None, pwd=None, callback=None):
print("stomp init")
case_log.info("stomp init.[{0}]:[{1}]".format(ip,port))
self.conn = stomp.Connection12([(ip, port)])
call_back = callback if callback else default_call_back
self.conn.set_listener(name='logicServerQueue',
lstnr=MyListener(callback=call_back))
self.conn.start()
self.conn.connect(wait=True)
print("ip: {0}, port: {1} ,connected!".format(ip, port))
case_log.info("ip: {0}, port: {1} ,connected!".format(ip,port))
def send_message(self, destination="QLOUD_TEST_ENGINE",
message=b"test", headers={'send_id':'123456', 'content_type':'text/plain'}):
"""
发布信息到事业总线,发送消息到testQueue队列,指定consumerId='88.3@6006
:param destination: 接收消息的队列名称
:param message: 消息内容
:return:
"""
# print("start to send message...")
case_log.info("Start to send message......")
self.conn.send(
body=json.dumps(message),
destination=destination,
headers=headers,
)
def recv_message(self, destination="QLOUD_TEST_ENGINE", headers=None,
subscription_id=None):
"""
从testQueue队列中接收消息,用selector过滤,只接收consumerId = '88.3@6006'的消息
:param destination: 消息发送到的队列名称
:param subscription_id: 消息订阅者的ID
:return: None
"""
case_log.info("Listen Topic:"+destination)
self.conn.subscribe(destination=destination,
headers=headers,
id=subscription_id,
ack='auto')
def ack_message(self, key=None, transaction=None, receipt=None):
self.conn.ack(id=key, transaction=transaction, receipt=receipt)
case_log.info("Send ACK Message, keyID:{0}".format(key))
def disconnect(self):
time.sleep(1.5)
self.conn.disconnect()
@staticmethod
def run_forever():
print "run forever"
while True:
time.sleep(1)
if __name__ == '__main__':
stompmess = MessSendOrRecv(ip="127.0.0.1", port=30211)
headers = {'send_id':123456}
msg = {"order":"1", ""}
stompmess.recv_message(destination='TEST_ENGINE', subscription_id=9527, headers=headers)
stompmess.send_message(destination='TEST_ENGINE', message=msg, headers={'send_id':123, 'content-text':'application/json'})
while True:
time.sleep(1)
stompmess.run_forever()
# stompmess.disconnect()