寒玉 Blog
  • Home
  • Books
  • About Me
  • Categories
  • Tags
  • Archives

使用python链接stompy


STOMP协议详解

STOMP即Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互。STOMP协议由于设计简单,易于开发客户端,因此在多种语言和多种平台上得到广泛地应用。STOMP协议的前身是TTMP协议(一个简单的基于文本的协议),专为消息中间件设计。STOMP是一个非常简单和容易实现的协议,其设计灵感源自于HTTP的简单性。尽管STOMP协议在服务器端的实现可能有一定的难度,但客户端的实现却很容易。例如,可以使用Telnet登录到任何的STOMP代理,并与STOMP代理进行交互。

Stomp的实现

业界已经有很多优秀的Stomp的服务器/客户端的开源实现.

STOMP服务器

项目名 兼容STOMP的版本 描述
Apache Apollo 1.0 1.1 1.2 ActiveMQ的继承者 http://activemq.apache.org/apollo
Apache ActiveMQ 1.0 1.1 流行的开源消息服务器 http://activemq.apache.org/
HornetQ 1.0 来自JBoss的消息中间件 http://www.jboss.org/hornetq
RabbitMQ 1.0 1.1 1.2 基于Erlang、支持多种协议的消息Broker,通过插件支持STOMP协议 http://www.rabbitmq.com/plugins.html#rabbitmq-stomp
Stampy 1.2 STOMP 1.2规范的一个Java实现 http://mrstampy.github.com/Stampy/
StompServer 1.0 一个轻量级的纯Ruby实现的STOMP服务器 http://stompserver.rubyforge.org/

STOMP客户端

项目名 兼容STOMP的版本 描述
activemessaging 1.0 Ruby客户端库 http://code.google.com/p/activemessaging/
onstomp 1.0 1.1 Ruby客户端库 https://rubygems.org/gems/onstomp
Apache CMS 1.0 C++客户端库 http://activemq.apache.org/cms/
Net::STOMP::Client 1.0 1.1 1.2 Perl客户端库 http://search.cpan.org/dist/Net-STOMP-Client/
Gozirra 1.0 Java客户端库 http://www.germane-software.com/software/Java/Gozirra/
libstomp 1.0 C客户端库,基于APR库 http://stomp.codehaus.org/C
Stampy 1.2 Java客户端库 http://mrstampy.github.com/Stampy/
stomp.js 1.0 1.1 JavaScript客户端库 http://jmesnil.net/stomp-websocket/doc/
stompest 1.0 1.1 1.2 Python客户端库,全功能实现,包括同步和异步 https://github.com/nikipore/stompest
StompKit 1.2 Objective-C客户端库,事件驱动 https://github.com/mobile-web-messaging/StompKit/
stompngo 1.0 1.1 1.2 Go客户端库 https://github.com/gmallard/stompngo
stomp.py 1.0 1.1 1.2 Python客户端库 https://github.com/jasonrbriggs/stomp.py
tStomp 1.1 TCL客户端库 https://github.com/siemens/tstomp

STOMP协议分析

STOMP协议与HTTP协议很相似,它基于TCP协议,使用了以下命令:

  • CONNECT
  • SEND
  • SUBSCRIBE
  • UNSUBSCRIBE
  • BEGIN
  • COMMIT
  • ABORT
  • ACK
  • NACK
  • DISCONNECT

STOMP的客户端和服务器之间的通信是通过“帧”(Frame)实现的,每个帧由多“行”(Line)组成。 第一行包含了命令,然后紧跟键值对形式的Header内容。 第二行必须是空行。 第三行开始就是Body内容,末尾都以空字符结尾。 STOMP的客户端和服务器之间的通信是通过MESSAGE帧、RECEIPT帧或ERROR帧实现的,它们的格式相似。

Stomp的python Demo

在demo中,我们选择的类库是stomp.py, 使用的版本是4.1.20,stomp.py==4.1.20.

import time
import stomp
import json
from config import *
from engine.common import case_log


def default_call_back(data):
    print("******************Receive New Data******************")
    print data
    print("****************************************************")


class MyListener(stomp.ConnectionListener):
    def __init__(self, callback):
        self.callback = callback

    def on_error(self, headers, message):
        print('received an error %s' % message)

    def on_message(self, headers, message):
        print('received a headers %s' % headers)
        print('received a message %s' % message)
        # key = headers.get('ack', None)
        self.callback(headers, message)


class MessSendOrRecv(object):
    def __init__(self, ip='127.0.0.1', port=61613, user=None, pwd=None, callback=None):
        print("stomp init")
        case_log.info("stomp init.[{0}]:[{1}]".format(ip,port))
        self.conn = stomp.Connection12([(ip, port)])
        call_back = callback if callback else default_call_back
        self.conn.set_listener(name='logicServerQueue',
                               lstnr=MyListener(callback=call_back))
        self.conn.start()
        self.conn.connect(wait=True)
        print("ip: {0}, port: {1} ,connected!".format(ip, port))
        case_log.info("ip: {0}, port: {1} ,connected!".format(ip,port))

    def send_message(self, destination="QLOUD_TEST_ENGINE",
                     message=b"test", headers={'send_id':'123456', 'content_type':'text/plain'}):
        """
        发布信息到事业总线,发送消息到testQueue队列,指定consumerId='88.3@6006
        :param destination: 接收消息的队列名称
        :param message: 消息内容
        :return: 
        """
        # print("start to send message...")
        case_log.info("Start to send message......")
        self.conn.send(
            body=json.dumps(message),
            destination=destination,
            headers=headers,
        )

    def recv_message(self, destination="QLOUD_TEST_ENGINE", headers=None,
                     subscription_id=None):
        """
        从testQueue队列中接收消息,用selector过滤,只接收consumerId = '88.3@6006'的消息
        :param destination: 消息发送到的队列名称
        :param subscription_id: 消息订阅者的ID
        :return: None
        """
        case_log.info("Listen Topic:"+destination)
        self.conn.subscribe(destination=destination,
                             headers=headers,
                            id=subscription_id,
                            ack='auto')

    def ack_message(self, key=None, transaction=None, receipt=None):
        self.conn.ack(id=key, transaction=transaction, receipt=receipt)
        case_log.info("Send ACK Message, keyID:{0}".format(key))

    def disconnect(self):
        time.sleep(1.5)
        self.conn.disconnect()

    @staticmethod
    def run_forever():
        print "run forever"
        while True:
            time.sleep(1)


if __name__ == '__main__':
    stompmess = MessSendOrRecv(ip="127.0.0.1", port=30211)
    headers = {'send_id':123456}
    msg = {"order":"1", ""}
    stompmess.recv_message(destination='TEST_ENGINE', subscription_id=9527, headers=headers)
    stompmess.send_message(destination='TEST_ENGINE', message=msg, headers={'send_id':123, 'content-text':'application/json'})
    while True:
        time.sleep(1)

    stompmess.run_forever()
    # stompmess.disconnect()

参考

  • https://blog.csdn.net/jhfyuf/article/details/86800382

  • « Java线程池原理及简单介绍
  • Redash环境搭建及二次开发 »

Published

5 28, 2019

Category

Python

Tags

  • Python 4
  • Stompy 1
  • Powered by Pelican. Theme: Elegant by Talha Mansoor