Flink - Table API 之 window (窗口)

news/2024/7/3 21:04:51

窗口

  • 时间语义,要配合窗口操作才能发挥作用
  • 在Table API 和 SQL 中,主要有两种窗口

Group Window(分组窗口)

  • 根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数

Over Windows

  • 针对每个输入行,计算相邻范围内的聚合

Group Windows

  • Group Windows是使用window(w:GroupWindows)子句定义的,并且必须由as子句指定一个别名

  • 为了按窗口对表进行分组,窗口的别名必须在group by子句中,像常规的分组字段一样引用

    Table table = input
    	.window(w:GroupWindow] as "w") //定义窗口,别名为"w"
    	.groupBy("w","a")  // 按照字段a和窗口w分组
    	.select("a,b.sum"); // 聚合
    
  • Table API 提供了一组具有特定语义的预定义Window类,这些类会被转换为底层DataStream或DataSet的窗口操作

滚动窗口(Tumbling windows)

  • 滚动窗口要用Tunble类来定义

    //Tumbling Event-time Window
    .window(Tumble.over("10.minutes").on("rowtime").as("w"))
    
    // Tumbling Processing-tiem Window
    .window(Tumble.over("10.minutes").on("proctime").as("w"))
    
    // Tumbling Row-count Window
    .window(Tumble.over("10.rows").on("proctime").as("w"))
    

滑动窗口(Sliding windows)

  • 滑动窗口要用Slide类来定义

    //Sliding Event-time Window
    .window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))
    
    // Sliding Processing-tiem Window
    .window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))
    
    // Sliding Row-count Window
    .window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))
    

会话窗口 (Session Window)

  • 会话窗口要用Session类来定义

    // Session Event-time window
    .window(Session.withGap("10.minutes").on("rowtime").as("w"))
    
    // Session Processing-time window
    .window(Session.withGap("10.minutes").on("proctime").as("w"))
    

SQL中的Group Windows

  • Group Windows定义在SQL查询的Group By子句中

    TUMBLE(time_attr,interval)

    • 定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口字段

    HOP(time_attr,interval,interval)

    • 定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三是窗口长度

    SESSION(time_attr,interval)

    • 定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔。
  • 案例代码

    public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setParallelism(1);
    
            DataStream<String> fileDataStream = env.readTextFile("data/temps.txt");
    
    
            DataStream<TempInfo> dataStream = fileDataStream.map(new MapFunction<String, TempInfo>() {
                @Override
                public TempInfo map(String value) throws Exception {
                    String[] split = value.split(",");
                    return new TempInfo(split[0], new Long(split[1]), new Double(split[2]));
                }
            }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<TempInfo>(Time.seconds(1)) {
                @Override
                public long extractTimestamp(TempInfo element) {
                    return element.getTimeStamp() * 1000L;
                }
            });
    
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            //Table dataTable = tableEnv.fromDataStream(dataStream, "id, timeStamp.rowtime as ts , temp, pt.proctime");
    
            //Table dataTable = tableEnv.fromDataStream(dataStream, "id, timeStamp.rowtime as ts , temp, rt.rowtime");
            Table dataTable = tableEnv.fromDataStream(dataStream,"id, timeStamp as ts, temp, rt.rowtime");
            
            // 构建视图
            tableEnv.createTemporaryView("sensor",dataTable);
    
    
            // 窗口操作
            // 1. Group Window
            // table API
            Table resTable = dataTable.window(Tumble.over("10.seconds").on("rt").as("tw"))
                    .groupBy("id,tw")
                    .select("id, id.count, temp.avg");
    
            // SQL
           Table resSqlTable = tableEnv.sqlQuery("SELECT id, count(id) as cnt, avg(temp) as avgTemp, tumble_end(rt,interval '10' second)" +
                    "from sensor group by id, tumble(rt,interval '10' second) ");
    
    
    
            // 2. Over Window
            // table API
            Table overTable = dataTable.window(Over.partitionBy("id").orderBy("rt").preceding("2.rows").as("ow"))
                    .select("id, rt, id.count over ow,temp.avg over ow");
    
            // SQL
            tableEnv.sqlQuery("select id, rt, count(id) over ow, avg(temp) over ow "+
                    " from sensor "+
                    " window ow as (partition by id order by rt rows between 2 preceding and current row)");
    
    
    
            tableEnv.toAppendStream(resTable, Row.class).print("group window");
            tableEnv.toAppendStream(resSqlTable, Row.class).print("resSqlTable");
            tableEnv.toRetractStream(overTable,Row.class).print("over window");
    
            env.execute();
        }
    

Over Windows

  • Over window聚合是标准SQL中已有的(over子句),可以在查询的SELECT 子句中定义

  • Over window聚合,会针对每个输入行,计算相邻行范围内的聚合

  • Over windows使用window(w:overwindows*)子句定义,并在select()方法中通过别名来引用

    Table table = input
    	.window([w:OverWindow] as "w")
    	.select("a, b.sum over w, c.min over w");
    
  • Table API 提供了Over类,来配置Over窗口的属性

无界 Over Windows

  • 可以在事件时间或处理时间,以及指定为时间间隔,或行计数的范围内,定义Over windows
  • 无界的over windows 是使用常量指定的
// 无界的事件时间 over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_RANGE).as("w"))

// 无界的处理时间 over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w"))

// 无界的事件时间 Row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_ROW).as("w"))

// 无界的处理时间 Row-count over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_ROW).as("w"))

有界 Over Windows

  • 有界的over window 是用间隔的大小指定的
// 有界的事件时间over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))

// 有界的处理时间 over window
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))

// 有界的事件时间 Row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))

// 有届的处理时间 Row-count over window
.window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))

SQL中的Over Windows

  • 用Over做窗口聚合时,所有聚合必须在同一个窗口上定义,也就是说必须是相同的分区,排序和范围

  • 目前仅支持在当前行范围之前的窗口

  • ORDER BY 必须在单一的时间属性上指定

    SELECT COUNT(amount) OVER(
    	PARTITION BY user
    	ORDER BY proctime
    	ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
    FROM Orders
    

http://www.niftyadmin.cn/n/3018944.html

相关文章

lwip 网络接口结构体 netif

2019独角兽企业重金招聘Python工程师标准>>> 在lwIP中&#xff0c;是通过结构体netif来描述一个硬件网络接口的&#xff0c;在单网卡中&#xff0c;这个结构体只有一个&#xff0c;多网卡中可有何网卡数目相同的netif结构体&#xff0c;它们构成一个数据链。 /** Ge…

嵌入式linux学习笔记-- linux下inotify的使用(转载)

原文地址&#xff1a;http://www.cnblogs.com/jimmychange/p/3498862.html 有时候我们需要检测某个目录下文件或者子目录的改动状况&#xff0c;如添加、删除、以及更新等&#xff0c;Linux系统上提供了inotify来完成这个功能。inotify是在版本2.6.13的内核中首次出现&#xff…

微信接口测试

""用于向指定用户发送消息前提&#xff1a; 1. 申请账号 "appid": wx65d37317efb972e0, "secret": f59dc1e2f5e3641145a213027fb122cc, 2. 知道用户的微信ID othEL0hlOrdBIRLLXuX0BA8frGZE"""import …

嵌入式linux学习笔记---TCP立即发出 以及 TCP的keep alive

事情的起因是公司的产品的某一个功能存在的bug&#xff0c;所以就有了本次的探索。 需求&#xff1a; 产品在某一个端口上 定时的向外发送1440 字节的数据包&#xff0c;该数据包包含了产品当前的各种状态。 需求2 &#xff1a; 产品本身绑定一个本地的端口 接收来自外部的字符…

Flink Sort-Shuffle写简析

文章目录1、配置2、初始创建3、成员变量4、写shuffle文件4.1、获取SortBuffer4.2、追加数据4.3、buffer不足的处理4.4、buffer不足数据未读完5、关于排序5.1、segment申请5.2、writeIndex5.2.1、获取当前可用segment5.2.2、写入index到segment5.2.3、更新partition最后数据的索…

Flink Sort-Shuffle读简析

文章目录1、SortMergeResultPartition的创建使用2、PartitionedFileReader2.1、moveToNextReadableRegion2.2、readCurrentRegion2.3、hasRemaining3、读操作的调用4、数据返回4.1、读入缓存4.2、buffersRead读取1、SortMergeResultPartition的创建使用 首先是一个读过程的一个…

信息社会

消息 通知 公告(简报) 新闻(深度分析文章) 历史(沉淀形成知识) 各部门的信息系统 陕西省住房和城乡建设厅 陕西省建筑市场监管平台陕西省质量安全监管信息系统陕西省标准定额协同管理平台陕西省房地产市场监管信息系统陕西省城市园林绿化企业信息管理系统陕西省执业资格注册人员…

java 实现方法_java常见代码(1)------常见实现方法

1.equals 和 hashcode方法class Students {String name;int age;byte[] idSequence;Overridepublic boolean equals(Object obj) {if (!(obj instanceof Students))return false;Students other (Students) obj;return name.equals(other.name)&& age other.age&…