流式计算
更新时间:2018-10-25 09:16:45
概述
流式计算介绍
目前对信息高时效性、可操作性的需求不断增长,这要求软件系统在更少的时间内能处理更多的数据。传统的大数据处理模型将在线事务处理和离线分析从时序上将两者完全分割开来,但显然该架构目前已经越来越落后于人们对于大数据实时处理的需求。
流计算的产生即来源于对于上述数据加工时效性的严苛需求: 数据的业务价值随着时间的流失而迅速降低,因此在数据发生后必须尽快对其进行计算和处理。而传统的大数据处理模式对于数据加工均遵循传统日清日毕模式,即以小时甚至以天为计算周期对当前数据进行累计并处理,显然这类处理方式无法满足数据实时计算的需求。在诸如实时大数据分析、风控预警、实时预测、金融交易等诸多业务场景领域,批量(或者说离线)处理对于上述对于数据处理时延要求苛刻的应用领域而言是完全无法胜任其业务需求的。而流计算作为一类针对流数据的实时计算模型,可有效地缩短全链路数据流时延、实时化计算逻辑、平摊计算成本,最终有效满足实时处理大数据的业务需求。
AliCloud StreamCompute(阿里云流计算)是运行在阿里云平台上的流式大数据分析平台,提供给用户在云上进行流式数据实时化分析工具。使用阿里云StreamSQL,用户可以轻松搭建自己的流式数据分析和计算服务,彻底规避掉底层流式处理逻辑的繁杂重复开发工作。利用阿里云流计算提供的全链路流式数据开发套件,用户可以享受到从数据集成、数据加工、数据运维全流程一站式解决方案,最大化实时化自身业务。
边缘流式计算
边缘流式计算是对阿里云流计算的扩展,解决物联网场景特有问题:
1)物联网数据采集频率高,数据量大,数据变化小,原始数据价值较低。数据全部上云分析的性价比低。
2)边缘和云平台的连接不稳定,数据上云无法满足实时计算的要求。
边缘流式计算引擎主要特点:
运行在边缘端,不依赖网络,低时延。
将数据采集、清洗、加工、聚合之后再上云,大大减少数据传输成本。
提供和阿里云流式计算完全相同的SQL语法。用户开发一次SQL,既可以在云上执行、也可以在边缘执行。
阿里云流计算已经得到广泛应用和开源社区支持,文档资料和使用案例丰富,学习成本低。提供了内建的字符串处理、时间、统计等各类计算函数。
支持处理乱序数据。
为IoT边缘计算深度定制,通过流计算与边缘端的消息路由的整合,打通了流计算与设备、函数计算、规则计算等边缘计算框架。
提供了流式计算开发的图形化控制台。
第一个流计算任务
1. 前提条件
登录本地控制台(具体登录方式详见本地控制台文档)
网关管理页网关在本地上线成功,可在运行状态tab看到实时上报的cpu_usage数据
- 进入计算管理-流式计算页面。
2. 新建任务
注意任务名称不可重复,字数限制在20以内。
输入任务名称后点击确定,即可在任务列表中看到该任务。
3. 编辑任务
左边可编辑SQL页面,右边输出日志。
点击保存按钮会将编辑框的内容保存,并进行语法校验,校验结果会显示在日志输出框。
点击启动按钮会运行当前任务。
点击退出按钮,退出当前界面,退出前会询问是否需要保存当前编辑的内容。
-- CPU高负载报警
create table property (
propertyName varchar,
propertyValue varchar,
productKey varchar,
deviceName varchar,
gmtCreate varchar,
ts varchar
) with (
type = 'custom',
tablefactoryclass = 'com.alibaba.blink.streaming.connector.edgehub.EdgeHubTableFactory',
jsonParser = 'device_property'
);
create table print_file (
productKey varchar,
deviceName varchar,
cpu double,
ts timestamp,
gmtCreate timestamp
) with (
type = 'custom',
class = 'com.alibaba.blink.connector.file.FileSink',
filePath = '/linkedge/run/debug/high_cpu_alter.txt'
);
insert into
print_file
select
productKey,
deviceName,
cast (propertyValue as double),
to_timestamp (cast (ts as bigint)),
to_timestamp (cast (gmtCreate as bigint))
from
property
where
propertyName = 'cpu_usage'
and cast (propertyValue as double) > 10;
4. 运行
点击启动按钮,大概五秒可以看到任务状态变为运行中。若运行失败,可以点击右边日志按钮查看失败原因。
5. 配置消息路由
任务运行成功后,会等待输入输入,并进行实时计算。
每个流计算任务默认是没有数据输入的,因此需要到消息路由中配置,要将哪些设备数据作为流计算任务的输入。
消息来源选择网关设备的productKey及devcieName,消息名选择属性,消息目标选择 流计算-CPU高负载报警
6. 查看运行结果
进入 /linkedge/run/debug/high_cpu_alter.txt
,查看输出结果
2018-10-13 15:01:26.404 -> productKey, deviceName, cpu, ts, gmtCreat
2018-10-13 15:01:43.964 -> a1hVtA6RAiU,gateway1010,21.243523,2018-10-13 15:01:43.6,2018-10-13 15:01:43.618
2018-10-13 15:02:13.607 -> a1hVtA6RAiU,gateway1010,20.971867,2018-10-13 15:02:13.587,2018-10-13 15:02:13.597
StreamSQL语法
阿里云流计算提供StreamSQL编写业务逻辑,为流式数据分析定制多种数据处理函数和操作符。
StreamSQL中有几个重要概念:
-- 声明一个数据源表
create table stream_source(word string) ;
-- 声明一个数据结果表
create table stream_result(word string, cnt bigint) ;
-- 声明计算逻辑,这里是统计word次数
insert into stream_result select
t.word
,count(1)
from stream_source t
group by t.word;
边缘流式计算SQL语法与阿里云计算语法完全一致。更多信息请参考阿里云流计算SQL文档。
下面介绍边缘流式计算引擎特有的几个语法。
EdgeHub
类别:源表(后续将支持edgehub作为结果表,V930版本暂未支持)
CREATE TABLE tabel_name (
-- Schema定义
) WITH (
type = 'custom',
tablefactoryclass = 'com.alibaba.blink.streaming.connector.edgehub.EdgeHubTableFactory'
jsonParser = ''
);
type、tablefactoryclass:固定,表示获取边缘端消息路由的消息(需要配路由规则)
jsonParser:定义消息解析器,不同的jsonParser对应不同的schema格式,为空表示default
productKey:产品Key
deviceName:设备名
propertyName:属性名
propertyValue:属性值
time:消息产生时间
gmtCreate:流计算引擎接收到消息的事件
示例
create table property (
propertyName varchar,
propertyValue varchar,
productKey varchar,
deviceName varchar,
gmtCreate varchar,
ts varchar,
tstamp as to_timestamp (cast (ts as bigint)),
WATERMARK wk FOR tstamp as withOffset (tstamp, 2000)
) with (
type = 'custom',
tablefactoryclass = 'com.alibaba.blink.streaming.connector.edgehub.EdgeHubTableFactory',
jsonParser = 'device_property'
);
2. 事件消息 jsonPaeser = 'device_event'
以下字段类型均为string:
productKey:产品Key
deviceName:设备名
eventCode:事件标识符
params:事件参数
time:消息产生时间
gmtCreate:流计算引擎接收到消息的事件
示例
create table event (
eventCode varchar,
params varchar,
productKey varchar,
deviceName varchar,
gmtCreate varchar,
ts varchar,
tstamp as to_timestamp (cast (ts as bigint)),
WATERMARK wk FOR tstamp as withOffset (tstamp, 2000)
) with (
type = 'custom',
tablefactoryclass = 'com.alibaba.blink.streaming.connector.edgehub.EdgeHubTableFactory',
jsonParser = 'device_event'
);
3. 默认Json格式 jsonPaeser = 'default'
json:从edgehub获取到的消息内容,json格式的字符串
jsonType:从edgehub获取到的消息类型
- device_property
- device_event
Json格式:
device_property
{ "gmtCreate": 1510292739881, "items": { "attribute_9": { "time": 1510292697471, "value": 560542025 }, "attribute_8": { "time": 1510292697470, "value": 715665571 } }, "productKey": "X5eCzh6fEH7", "deviceName": "xxxxxxxxxxxxxxxx" }
device_event
{ "eventCode": "BrokenInfo", "value": { "Power": "on", "structParam": { "param1": "abc", "param2": 123 } }, "time": 1510799670074, "productKey": "5RS5XTnNADg", "deviceName": "xxxxxxxxxxxxxxxx", "gmtCreate": 1510799670074 }
示例
create table property (
json varchar,
jsonType varchar,
gmtCreate as to_timestamp (
cast (json_value (json, '$.gmtCreate') as bigint)
),
deviceName as json_value (json, '$.deviceName'),
productKey as json_value (json, '$.productKey'),
ts as to_timestamp (
cast (
json_value (json, '$.items.temperature.time') as bigint
)
),
temperature as cast (
json_value (json, '$.items.temperature.value') as int
)
) with (
type = 'custom',
tablefactoryclass = 'com.alibaba.blink.streaming.connector.edgehub.EdgeHubTableFactory'
);
RDS
关系数据库(暂时只支持公网环境下的RDS for MySQL)
类别:结果表
create table db (
productKey varchar,
deviceName varchar,
ts timestamp,
temperature int
) with (
type = 'rds',
url = 'jdbc:mysql://${host}:${port}/${dbName}',
tableName = '${tableName}',
userName = '${userName}',
password = '${password}'
);
在实际使用过程中需要把${} 替换成具体的值
host:MySQL数据库host(若为localhost则为本地数据库)
port:MySQL数据库端口号
dbName:数据库名
tableName:表名
userName:数据库用户名
password:数据库密码
更多参数介绍见https://help.aliyun.com/document_detail/62525.html?spm=a2c4g.11186623.6.624.41PUuZ
File
文件(主要用于本地debug,文件路径可自定义)
类别:结果表
create table print_sink (
productKey varchar,
deviceName varchar,
eventCode varchar,
ts timestamp,
temperature int
) with (
type = 'custom',
class = 'com.alibaba.blink.connector.file.FileSink',
tag = 'prop_filter_b',
filePath = '/linkedge/run/debug/prop_filter_b.txt'
);
type、class:固定,表示输出到文件
tag:若存在,则每条输出前都附带tag信息
filePath:输出的文件路径
案例
1. 温湿度数据的统计和展现
-- 计算某工厂平均每分钟的温度,并写入数据库中,数据库的数据可用与展示工厂历史温度变化曲线。
-- 若某工厂有多个温度传感器,可支持计算多个传感器的平均温度(在消息路由中配置多个传感器即可)
-- 若想在单独计算某个传感器的平均每分钟的温度,在group by字段后加上productKey, deviceName即可
create table property (
propertyName varchar,
propertyValue varchar,
productKey varchar,
deviceName varchar,
gmtCreate varchar,
ts varchar,
tstamp as to_timestamp (cast (ts as bigint)),
WATERMARK wk FOR tstamp as withOffset (tstamp, 2000)
) with (
type = 'custom',
tablefactoryclass = 'com.alibaba.blink.streaming.connector.edgehub.EdgeHubTableFactory',
jsonParser = 'device_property'
);
create table db (
avg_temperature double,
t_start timestamp,
t_end timestamp,
t_r timestamp
) with (
type = 'rds',
url = 'jdbc:mysql://${host}:${port}/${dbName}',
tableName = '${tableName}',
userName = '${userName}',
password = '${password}'
);
insert into
db
select
avg (temperature) as avg_temperature,
tumble_start (tstamp, interval '1' minute),
tumble_end (tstamp, interval '1' minute),
tumble_rowtime (tstamp, interval '1' minute)
from (
select
cast (propertyValue as int) as temperature,
tstamp
from
property
where
propertyName = 'temperature'
)
group by
tumble (tstamp, interval '1' minute);
2. 高温高压报警
-- 锅炉温度超过60度则输出提示
create table property (
propertyName varchar,
propertyValue varchar,
productKey varchar,
deviceName varchar,
gmtCreate varchar,
ts varchar,
tstamp as to_timestamp (cast (ts as bigint)),
WATERMARK wk FOR tstamp as withOffset (tstamp, 2000)
) with (
type = 'custom',
tablefactoryclass = 'com.alibaba.blink.streaming.connector.edgehub.EdgeHubTableFactory',
jsonParser = 'device_property'
);
create table print_sink (
productKey varchar,
deviceName varchar,
temperature int,
ts timestamp,
gmtCreate timestamp
) with (
type = 'custom',
class = 'com.alibaba.blink.connector.file.FileSink',
filePath = '/linkedge/run/debug/high_alter.txt'
);
insert into
print_sink
select
productKey,
deviceName,
cast (propertyValue as int),
tstamp,
to_timestamp (cast (gmtCreate as bigint))
from
property
where
propertyName = 'temperature'
and cast (propertyValue as int) > 60;
3. 统计闸口平均每5分钟的人流量
-- 计算每个闸口每五分钟的人流量并输出(也可以写入数据库中)
-- 每个闸口检测到人会上报peopelPassed事件,并附带经过的人数count
create table event (
eventCode varchar,
params varchar,
productKey varchar,
deviceName varchar,
gmtCreate varchar,
ts varchar,
tstamp as to_timestamp (cast (ts as bigint)),
WATERMARK wk FOR tstamp as withOffset (tstamp, 2000)
) with (
type = 'custom',
tablefactoryclass = 'com.alibaba.blink.streaming.connector.edgehub.EdgeHubTableFactory',
jsonParser = 'device_event'
);
create table print_sink (
productKey varchar,
deviceName varchar,
flowrate int,
t_start timestamp,
t_end timestamp,
t_r timestamp
) with (
type = 'custom',
class = 'com.alibaba.blink.connector.file.FileSink',
filePath = '/linkedge/run/debug/visitor_flowrate.txt'
);
insert into
print_sink
select
productKey,
deviceName,
sum (cast (json_value (params, '$.count') as int)),
tumble_start (tstamp, interval '5' minute),
tumble_end (tstamp, interval '5' minute),
tumble_rowtime (tstamp, interval '5' minute)
from
event
where
eventCode = 'peopelPassed'
group by
tumble (tstamp, interval '5' minute),
productKey,
deviceName;
附1-本地MySQL的安装
sudo docker pull mysql:5.6.35
sudo docker run --name mysql -p ${port}:3306 -e MYSQL_ROOT_PASSWORD=${password} -d mysql:5.6.35
sudo docker exec -it mysql /bin/bash
port:宿主机一个空闲端口,会将将docker中MySQL的3306端口映射到该端口上
password:默认root用户的密码
示例,port=12345,password=admin1234
sudo docker pull mysql:5.6.35
sudo docker run --name mysql -p 12345:3306 -e MYSQL_ROOT_PASSWORD=linkedge1234 -d mysql:5.6.35
sudo docker exec -it mysql /bin/bash
当流计算使用本地MySQL数据库作为输出时,ip是宿主机ip(ifconfig看下,假设为192.168.1.166),端口是映射出来的端口12345,默认用户root,密码为linkedge1234,如下:
create table db (
productKey varchar,
deviceName varchar,
avg_temperature double,
t_start timestamp,
t_end timestamp,
t_r timestamp
) with (
type = 'rds',
url = 'jdbc:mysql://192.168.1.166:12345/iiot',
tableName = 'avg_temperature',
userName = 'root',
password = 'linkedge1234'
);
其中,数据库及表需要用户自己提前建好,且表的schema需要和流计算中rds结果表的schema保持一致(字段名和字段类型必须一致)。