Java class name: com.alibaba.alink.operator.stream.feature.TumbleTimeWindowStreamOp
Python class name: TumbleTimeWindowStreamOp
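A tumbling window is one kind of GroupWindow. It computes aggregate functions over each window and outputs that window's statistics. The features it generates are defined by clause (an expression).
clause supports all aggregate functions available in Flink, plus a set of additional aggregate functions.
For detailed usage, see http://aboutdata.top/api/tutorial/appendix_aggregate_function.html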
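The following pure-Python sketch illustrates how a tumbling window splits a stream. Each row goes into exactly one fixed-size, non-overlapping window, aligned to timestamp 0 as Flink's tumbling windows are by default. Each (group key, window) bucket is then aggregated. This is not Alink's implementation. `tumble_start` and `tumble_aggregate` are hypothetical helpers, and `len` stands in for a plain COUNT rather than Alink's `COUNT_PRECEDING`. The sample rows come from the Java example in this document, which uses a 6-second window.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, Iterable, List, Tuple


def tumble_start(ts_ms: int, size_ms: int) -> int:
    """Start of the tumbling window containing ts_ms (windows aligned to 0)."""
    return ts_ms - ts_ms % size_ms


def tumble_aggregate(
    rows: Iterable[Tuple[Hashable, int, object]],
    size_ms: int,
    agg: Callable[[List[object]], object],
) -> Dict[Tuple[Hashable, int, int], object]:
    """Assign each (group_key, ts_ms, value) row to one window, then apply
    `agg` to the values of every (group_key, window_start, window_end) bucket."""
    buckets: Dict[Tuple[Hashable, int, int], List[object]] = defaultdict(list)
    for key, ts_ms, value in rows:
        start = tumble_start(ts_ms, size_ms)
        buckets[(key, start, start + size_ms)].append(value)
    return {bucket: agg(values) for bucket, values in buckets.items()}


# (user, ts in ms, ip) rows from the Java example in this document.
ips = [0, 0, 1, 1, 3, 3, 4, 4, 2, 2]
rows = [(0, 1000 * (i + 1), ip) for i, ip in enumerate(ips)]

print(tumble_aggregate(rows, 6000, len))                     # {(0, 0, 6000): 5, (0, 6000, 12000): 5}
print(tumble_aggregate(rows, 6000, lambda v: len(set(v))))  # {(0, 0, 6000): 3, (0, 6000, 12000): 3}
```

Each output key plays the role of TUMBLE_START / TUMBLE_END for its group, and there is one aggregate value per window.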
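Among the windows supported by Alink, Group windows output the aggregate statistics of each window. OVER windows work differently: given one input row, they append the window features to it and output that row with the features.
For detailed usage of each window, see https://www.yuque.com/pinshu/alink_guide/dffffm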
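The hypothetical pure-Python sketch below (not Alink code) contrasts the two window styles. The group-window function returns one row per window. The OVER-window function keeps every input row and appends a feature to it. The OVER variant counts rows whose timestamp lies in [ts - range_ms, ts], which is a simplified stand-in for a time-range OVER window. The 3-second range is an arbitrary illustrative choice.

```python
from typing import Dict, List, Tuple


def group_window(rows: List[Tuple[int, int]], size_ms: int) -> List[Tuple[int, int, int]]:
    """Group-window style: one output row per tumbling window, (start, end, count)."""
    counts: Dict[int, int] = {}
    for ts, _ in rows:
        start = ts - ts % size_ms
        counts[start] = counts.get(start, 0) + 1
    return [(s, s + size_ms, n) for s, n in sorted(counts.items())]


def over_window(rows: List[Tuple[int, int]], range_ms: int) -> List[Tuple[int, int, int]]:
    """OVER-window style: every input row is kept and a feature is appended,
    here the number of rows whose timestamp lies in [ts - range_ms, ts]."""
    return [
        (ts, value, sum(1 for t, _ in rows if ts - range_ms <= t <= ts))
        for ts, value in rows
    ]


# (ts in ms, ip) pairs from the Java example in this document.
ips = [0, 0, 1, 1, 3, 3, 4, 4, 2, 2]
rows = [(1000 * (i + 1), ip) for i, ip in enumerate(ips)]

print(group_window(rows, 6000))  # [(0, 6000, 5), (6000, 12000, 5)]
print(over_window(rows, 3000))   # one output row per input row
```

The group window shrinks ten input rows to two summary rows. The OVER window returns all ten rows, each carrying its own feature value.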
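Name | Display name | Description | Type | Required? | Value range | Default |
---|---|---|---|---|---|---|
clause | Clause | Aggregation expression | String | ✓ | | |
timeCol | Timestamp column | Timestamp column (TIMESTAMP) | String | ✓ | Selected column must be of type [TIMESTAMP] | |
windowTime | Window size | Window size | String | ✓ | | |
groupCols | Group-by columns | Names of the columns to group by; multiple columns allowed; optional, none by default | String[] | | | null |
latency | Watermark latency | Delay of the watermark; default 0.0 | Double | | | 0.0 |
watermarkType | Watermark type | Type of watermark | String | | "PERIOD", "PUNCTUATED" | "PERIOD" |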
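The following sketch shows how latency influences results. It is a simplified, hypothetical model of an event-time watermark with a fixed delay, in the spirit of Flink's bounded-out-of-orderness strategy, and not Alink's code. The watermark trails the largest timestamp seen by latency. A window [start, end) is emitted once the watermark reaches end, and an event whose window has already been emitted is dropped as late. The sketch makes two assumptions. First, latency is in the same unit as the timestamps (milliseconds here), although the table does not state Alink's unit. Second, the watermark is recomputed after every event, which is closer to "PUNCTUATED"; with "PERIOD" it would presumably be emitted at fixed intervals instead. The events are hypothetical.

```python
from typing import Dict, List, Optional, Tuple


def run_tumbling_with_watermark(
    events: List[Tuple[int, str]],  # (ts_ms, payload) in arrival order
    size_ms: int,
    latency_ms: int,
) -> Tuple[List[Tuple[int, int, int]], List[Tuple[int, str]]]:
    """Simplified event-time tumbling count driven by a delayed watermark."""
    open_windows: Dict[int, int] = {}        # window start -> row count
    fired: List[Tuple[int, int, int]] = []   # emitted (start, end, count)
    late: List[Tuple[int, str]] = []         # events dropped as too late
    watermark: Optional[int] = None
    max_ts: Optional[int] = None
    for ts, payload in events:
        start = ts - ts % size_ms
        # Late: this event's window already closed under the current watermark.
        if watermark is not None and start + size_ms <= watermark:
            late.append((ts, payload))
            continue
        open_windows[start] = open_windows.get(start, 0) + 1
        max_ts = ts if max_ts is None else max(max_ts, ts)
        # The watermark trails the largest timestamp seen by latency_ms.
        watermark = max_ts - latency_ms
        # Emit every window whose end the watermark has reached.
        for s in sorted(w for w in open_windows if w + size_ms <= watermark):
            fired.append((s, s + size_ms, open_windows.pop(s)))
    return fired, late


# Hypothetical arrival order; "c" (ts 5000) arrives after "b" (ts 7000).
events = [(1000, "a"), (7000, "b"), (5000, "c"), (13000, "d")]
print(run_tumbling_with_watermark(events, 6000, 0))     # ([(0, 6000, 1), (6000, 12000, 1)], [(5000, 'c')])
print(run_tumbling_with_watermark(events, 6000, 2000))  # ([(0, 6000, 2)], [])
```

With zero latency, the out-of-order event "c" misses its window. A 2000 ms latency keeps [0, 6000) open long enough to count it, but results are emitted later. Windows still open when the input ends (for example [12000, 18000)) are never emitted in this sketch.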
```python
from pyalink.alink import *
import pandas as pd

# Run on a local environment with parallelism 1
useLocalEnv(1)

# Columns: user, device, ip, timeCol (a long that is converted to a timestamp below)
sourceFrame = pd.DataFrame([
    [0, 0, 0, 1],
    [0, 2, 0, 2],
    [0, 1, 1, 3],
    [0, 3, 1, 4],
    [0, 3, 3, 5],
    [0, 0, 3, 6],
    [0, 0, 4, 7],
    [0, 3, 4, 8],
    [0, 1, 2, 9],
    [0, 2, 2, 10],
])
streamSource = StreamOperator.fromDataframe(sourceFrame, schemaStr="user int, device long, ip long, timeCol long")

# Tumbling window per user over timeCol
op = TumbleTimeWindowStreamOp()\
    .setTimeCol("timeCol")\
    .setWindowTime(60)\
    .setGroupCols(["user"])\
    .setClause("count_preceding(ip) as countip")

# timeCol must be of type TIMESTAMP, so convert the long column before linking
streamSource.select('user, device, ip, to_timestamp(timeCol) as timeCol').link(op).print()
StreamOperator.execute()
```
```java
package com.alibaba.alink.operator.stream.feature;

import org.apache.flink.types.Row;

import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
import com.alibaba.alink.testutil.AlinkTestBase;
import org.junit.Test;

import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;

public class TumbleTimeWindowStreamOpTest extends AlinkTestBase {
	@Test
	public void test() throws Exception {
		List <Row> sourceFrame = Arrays.asList(
			Row.of(0, 0, 0, new Timestamp(1000L)),
			Row.of(0, 2, 0, new Timestamp(2000L)),
			Row.of(0, 1, 1, new Timestamp(3000L)),
			Row.of(0, 3, 1, new Timestamp(4000L)),
			Row.of(0, 3, 3, new Timestamp(5000L)),
			Row.of(0, 0, 3, new Timestamp(6000L)),
			Row.of(0, 0, 4, new Timestamp(7000L)),
			Row.of(0, 3, 4, new Timestamp(8000L)),
			Row.of(0, 1, 2, new Timestamp(9000L)),
			Row.of(0, 2, 2, new Timestamp(10000L))
		);
		StreamOperator <?> source = new MemSourceStreamOp(
			sourceFrame, new String[] {"user", "device", "ip", "ts"});
		source.link(
			new TumbleTimeWindowStreamOp()
				.setGroupCols("user")
				.setTimeCol("ts")
				.setWindowTime("6s")
				.setClause("TUMBLE_START() as start_time, TUMBLE_END() as end_time, COUNT_PRECEDING(ip) as count_ip")
		).print();
		StreamOperator.execute();
	}
}
```
Batch prediction result
user | countip |
---|---|
0 | 9 |
Stream prediction result
user | countip |
---|---|
0 | 9 |