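Introduction
Siddhi CEP is an open-source, lightweight, easy-to-use Complex Event Processing (CEP) engine. It has its own DSL, rich pattern-matching capabilities, and is extensible. Official site: https://siddhi.io/
Architecture
Name | Description |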
---|---|
Stream | A logical series of events ordered in time with a uniquely identifiable name, and a defined set of typed attributes defining its schema. |
Event | An event is a single event object associated with a stream. All events of a stream contain a timestamp and an identical set of typed attributes based on the schema of the stream they belong to. |
Table | A structured representation of data stored with a defined schema. Stored data can be backed by In-Memory, or external data stores such as RDBMS, MongoDB, etc. The tables can be accessed and manipulated at runtime. |
Named-Window | A structured representation of data stored with a defined schema and eviction policy. Window data is stored In-Memory and automatically cleared by the named-window constraint. Other Siddhi elements can only query the values in windows at runtime; they cannot modify them. |
Named-Aggregation | A structured representation of data that's incrementally aggregated and stored with a defined schema and aggregation granularity such as seconds, minutes, hours, etc. Aggregation data is stored both In-Memory and in external data stores such as RDBMS. Other Siddhi elements can only query the values in aggregations at runtime; they cannot modify them. |
Query | A logical construct that processes events in a streaming manner by consuming data from one or more streams, tables, windows and aggregations, and publishes output events into a stream, table or window. |
Source | A construct that consumes data from external sources (such as TCP, Kafka, HTTP, etc.) with various event formats such as XML, JSON, binary, etc., converts them to Siddhi events, and passes them into streams for processing. |
Sink | A construct that consumes events arriving at a stream, maps them to a predefined data format (such as XML, JSON, binary, etc), and publishes them to external endpoints (such as E-mail, TCP, Kafka, HTTP, etc). |
Input Handler | A mechanism to programmatically inject events into streams. |
Stream/Query Callback | A mechanism to programmatically consume output events from streams or queries. |
Partition | A logical container that isolates the processing of queries based on the partition keys derived from the events. |
Inner Stream | A positionable stream that connects partitioned queries with each other within the partition. |
Components
siddhi-runner
The main Siddhi application, responsible for scheduling and running rules.
siddhi-tooling
Siddhi's rule editor, used to edit, test, and publish rules.
Deployment
Prerequisites
- Memory - 128 MB (minimum), 500 MB (recommended); more memory might be needed depending on the in-memory data stored for processing
- Cores - 2 cores (recommended); use a lower number of cores after testing Siddhi Apps for performance
- JDK - 8 or 11
- To build Siddhi from the source distribution, you need JDK version 8 or 11 and Maven 3.0.4 or later
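The JDK requirement above can be checked before installing. The following is a minimal sketch, not part of the Siddhi distribution: the helper names `java_major_version` and `is_supported_jdk` are hypothetical, and the parsing assumes the usual "1.8.0_292" (legacy) and "11.0.2" (modern) version string shapes.

```shell
# Hypothetical helper: print the major version of a Java version string.
# Handles the legacy "1.8.0_292" scheme and the modern "11.0.2" scheme.
java_major_version() {
    version="$1"
    case "$version" in
        1.*) version="${version#1.}" ;;   # legacy scheme: 1.8.0_292 -> 8.0_292
    esac
    # Strip everything from the first non-digit onward, leaving the major number.
    echo "${version%%[!0-9]*}"
}

# Hypothetical helper: succeed only for the JDK versions listed in the prerequisites.
is_supported_jdk() {
    case "$(java_major_version "$1")" in
        8|11) return 0 ;;
        *)    return 1 ;;
    esac
}
```

One possible use is to feed it the installed version, for example `is_supported_jdk "$(java -version 2>&1 | awk -F'"' '/version/ {print $2; exit}')" || echo "JDK 8 or 11 is required"`.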
Deploying Siddhi-runner
wget https://github.com/siddhi-io/distribution/releases/download/v5.1.0/siddhi-runner-5.1.0.zip
mkdir -p /opt/siddhi
mv siddhi-runner-5.1.0.zip /opt/siddhi/
cd /opt/siddhi
unzip siddhi-runner-5.1.0.zip
./siddhi-runner-5.1.0/bin/runner.sh  # start the runner
Deploying Siddhi-tooling
wget https://github.com/siddhi-io/distribution/releases/download/v5.1.0/siddhi-tooling-5.1.0.zip
mkdir -p /opt/siddhi
mv siddhi-tooling-5.1.0.zip /opt/siddhi/
cd /opt/siddhi
unzip siddhi-tooling-5.1.0.zip
./siddhi-tooling-5.1.0/bin/tooling.sh  # start tooling
Once tooling has started, open http://localhost:9390/editor#
The home page looks like this:
Flow visualization view
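Extensions
Siddhi offers many extensions that broaden the ways it can receive, process, and publish messages. Extension list: https://siddhi.io/en/v5.1/docs/extensions/
siddhi-io-kafka
siddhi-io-kafka lets Siddhi receive messages from Kafka and send the processed results back to a Kafka topic.
Installing siddhi-io-kafka
Installing this extension requires the following Kafka JARs: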
kafka_2.11-*.jar
kafka-clients-*.jar
metrics-core-*.jar
scala-library-2.11.*.jar
scala-parser-combinators_2.11.*.jar (if exists)
zkclient-*.jar
zookeeper-*.jar
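Siddhi's extension JARs are placed in the jars and bundles directories under its root directory.
Convert the JAR files into OSGi bundle JARs as follows.
Create a directory named source and put the Kafka JAR files in it. Create a directory named destination. Then run: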
{WSO2SPHome}/bin/jartobundle.sh source destination
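Add the JAR files from the destination directory to Siddhi's bundles directory, add the JAR files from the source directory to Siddhi's jars directory, and restart Siddhi to complete the installation.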
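The conversion and copy steps can be scripted. The following is a sketch under stated assumptions rather than an official installer: the function name `install_kafka_libs` is hypothetical, all three arguments describe your own directory layout, and it uses a temporary working directory in place of hand-made source and destination directories. It copies only the JARs named in the list above.

```shell
# Hypothetical helper. Arguments (all assumptions about your layout):
#   $1 = Siddhi root directory (must contain jars/ and bundles/)
#   $2 = directory holding the Kafka JARs
#   $3 = path to the jartobundle.sh converter
install_kafka_libs() {
    siddhi_home="$1"
    kafka_libs="$2"
    converter="$3"

    work_dir="$(mktemp -d)" || return 1
    mkdir "$work_dir/source" "$work_dir/destination" || return 1

    # Collect only the JARs named in the list above; missing ones are skipped.
    for pattern in 'kafka_2.11-*.jar' 'kafka-clients-*.jar' 'metrics-core-*.jar' \
                   'scala-library-2.11.*.jar' 'scala-parser-combinators_2.11.*.jar' \
                   'zkclient-*.jar' 'zookeeper-*.jar'; do
        for jar in "$kafka_libs"/$pattern; do
            if [ -e "$jar" ]; then
                cp "$jar" "$work_dir/source/" || return 1
            fi
        done
    done

    # Convert to OSGi bundles, then copy bundles and plain JARs into place.
    "$converter" "$work_dir/source" "$work_dir/destination" || return 1
    cp "$work_dir/destination"/*.jar "$siddhi_home/bundles/" || return 1
    cp "$work_dir/source"/*.jar "$siddhi_home/jars/" || return 1
    rm -rf "$work_dir"
}
```

Siddhi still has to be restarted afterwards; the function deliberately leaves that step to the operator.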
Note: Siddhi already ships with the siddhi-io-kafka plugin installed, so all we are actually doing here is adding the Kafka client libraries it depends on.
@App:name('HelloKafkaApp')
@App:description('Description of the plan')
-- define stream siddhiStream1 (symbol string, price double, volume long);
-- @sink(type='kafka', bootstrap.servers='localhost:6667',topic='siddhi_sink_kafka',is.binary.message='True' )
-- @sink(type='kafka' , bootstrap.servers='localhost:6667',topic='siddhi_sink_kafka',is.binary.message='True')
-- define stream outputStream (symbol string, price double, volume long);
-- @sink(type='log', prefix='LOGGER')
-- @sink(type='kafka' , bootstrap.servers='localhost:6667',topic='siddhi_sink_kafka',is.binary.message='True', @map(type='json') )
-- @sink(type='log', prefix='LOGGER')
@sink(type = 'kafka', bootstrap.servers = "localhost:6667", topic = "siddhi_sink_kafka", is.binary.message = "True", @map(type = 'json'))
define stream OutputStream1 (price double);
-- Please refer to https://siddhi.io/en/v5.1/docs/quick-start/ on getting started with Siddhi editor.
-- , enclosing.element='$', @attributes(price="price")
@source(type = 'kafka', bootstrap.servers = "localhost:6667", topic.list = "siddhi_kafka", group.id = "siddhi_group", threading.option = "single.thread", @map(type = 'json'))
define stream siddhiStream1 (price double);
-- -- define stream OutputStream(symbol string, price double, volume long);
@info(name = 'query1')
from siddhiStream1[price > 20]
select price
insert into OutputStream1;
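To exercise HelloKafkaApp, a test event can be pushed into the source topic. The sketch below assumes the JSON mapper's default envelope (a top-level "event" object keyed by attribute name) and that Kafka's console scripts are on the PATH; the helper name `make_price_event` is hypothetical. The Kafka commands are shown as comments because they need a running broker.

```shell
# Hypothetical helper: wrap a price in the JSON envelope assumed above.
make_price_event() {
    printf '{"event":{"price":%s}}\n' "$1"
}

# Usage against a running broker (not executed here). --broker-list is the
# producer flag in the Kafka releases built for Scala 2.11:
#   make_price_event 25.5 | kafka-console-producer.sh \
#       --broker-list localhost:6667 --topic siddhi_kafka
#   kafka-console-consumer.sh --bootstrap-server localhost:6667 \
#       --topic siddhi_sink_kafka --from-beginning
```

Given the filter `price > 20` in query1, an event with price 25.5 should be forwarded to siddhi_sink_kafka, while one with price 10 should be dropped.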
siddhi-store-rdbms
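The siddhi-store-rdbms plugin lets Siddhi persist events to, and retrieve events from, databases such as MySQL, MS SQL, PostgreSQL, H2, and Oracle.
Installing siddhi-store-rdbms
Installing this component only requires copying the relevant JDBC connector JAR into Siddhi.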
@App:name("SquidAlertRule")
@App:description("Description of the plan")
-- Please refer to https://siddhi.io/en/v5.1/docs/quick-start/ on getting started with Siddhi editor.
@source(type = 'kafka', bootstrap.servers = "localhost:6667", topic.list = "siddhi_squid_kafka", group.id = "siddhi_group", threading.option = "single.thread", @map(type = 'json'))
define stream SquidDataStream (domain string);
-- @store(type="rdbms", jdbc.url="jdbc:mysql://localhost:3306/hotel",
-- username="siddhi", password="123",
-- jdbc.driver.name="com.mysql.jdbc.Driver",
-- @cache(size="1000", retention.period="5 min", purge.interval="1 min"))
-- define table BlackDomainTable (username string, salary double);
-- @store(type='rdbms' , jdbc.url='jdbc:mariadb://localhost:30939/siddhi',username='root',password='******',jdbc.driver.name='org.mariadb.jdbc.Driver',table.name='blackDomain')
@store(type='rdbms' , jdbc.url='jdbc:mysql://localhost:3306/siddhi',username='root',password='******',jdbc.driver.name='com.mysql.cj.jdbc.Driver',table.name='blackDomain')
define table blackDomain (domain string);
@sink(type = 'kafka', bootstrap.servers = "localhost:6667", topic = "siddhi_squid_sink_kafka", is.binary.message = "True", @map(type = 'json'))
define stream SquidSinkDataStream (domain string, is_alert bool);
@info(name='QueryBlack')
from SquidDataStream[blackDomain.domain == domain in blackDomain]
select domain,True as is_alert
insert into SquidSinkDataStream;
@info(name='QueryNotBlack')
from SquidDataStream[ not (blackDomain.domain == domain in blackDomain)]
select domain, False as is_alert
insert into SquidSinkDataStream;
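For SquidAlertRule to flag anything, the blackDomain table needs rows. The following sketch generates the INSERT statements; the helper name `blacklist_insert_sql` is hypothetical, the table and column names are taken from the table definition above, and the escaping only doubles single quotes, which is adequate for ordinary domain names but is not a general-purpose SQL sanitizer.

```shell
# Hypothetical helper: print an INSERT statement for one blacklisted domain.
blacklist_insert_sql() {
    # Double any single quotes so the value stays a valid SQL string literal.
    escaped=$(printf '%s' "$1" | sed "s/'/''/g")
    printf "INSERT INTO blackDomain (domain) VALUES ('%s');\n" "$escaped"
}

# Usage (not executed here), assuming the mysql client and the siddhi
# database from the jdbc.url above:
#   blacklist_insert_sql 'bad.example.com' | mysql -u root -p siddhi
```

After seeding, an event whose domain is bad.example.com should come out of QueryBlack with is_alert set to true, and any other domain should come out of QueryNotBlack with is_alert set to false.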
Rule Editor
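Rule Editor is a visual rule editor that supports rule editing, rule management, event simulation, and application deployment.
The interface looks like this:
Rule Deployment
Once a rule has been written, it can be deployed directly.
- Click deploy in the menu
- Add a server; the default username can be left blank
- Select the Siddhi script to deploy and the server to deploy it to, then click deploy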