本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
滑动窗口
您可以不使用 GROUP BY 对记录分组,而是定义基于时间或基于行的窗口。您应通过添加显式 WINDOW 子句执行此操作。
在这种情况下,当窗口随着时间滑动时,在流中出现新记录时,Amazon Kinesis Data Analytics 将发送输出。Kinesis Data Analytics 将通过在窗口中处理行来发送此输出。窗口在这种类型的处理中可以重叠,一个记录可以属于多个窗口并且可随各个窗口一起处理。以下示例说明了滑动的窗口。
考虑创建一个简单的查询对流中的记录进行计数。此示例假定有一个 5 秒的窗口。在以下示例流中,新记录在 t 的时间到达1, t2, t6和 t7,三条记录在时间 t 到达8秒。
记住以下内容:
-
此示例假定有一个 5 秒的窗口。该 5 秒窗口持续随着时间滑动。
-
对于进入窗口的每一行,滑动窗口会发送输出行。应用程序启动后不久,您会看到查询针对出现在流中的每个新记录发送输出,即使尚未经过 5 秒窗口。例如,当记录出现在第一秒和第二秒时,查询会发送输出。稍后,查询会处理 5 秒窗口中的记录。
-
该窗口随着时间滑动。如果流中的旧记录落后于窗口,查询将不会发送任何输出,除非流中也有一个新记录落在该 5 秒窗口中。
假设查询在 t 开始执行。0. 那么,将出现以下情况:
-
当时 t0,查询开始执行。查询不发送输出 (计数值),因为此时没有记录。
-
在时间 t1,将在流中显示一个新记录,并且查询发送计数值 1。
-
在时间 t2,将出现另一个记录,并且查询发送计数 2。
-
此 5 秒窗口随着时间滑动:
-
在 t3,滑动窗口 t3至 t0
-
在 t4 处 (滑动窗口 t4至 t0)
-
在 t5滑动窗口 t5—t0
在所有这些时间,此 5 秒窗口具有相同的记录 — 没有新记录。因此,查询不会发送任何输出。
-
-
在时间 t6,此 5 秒窗口为 (t)6至 t1)。查询在 t 处检测到一个新记录。6所以它发出输出 2。t 的记录1不再位于窗口中,计数时不考虑。
-
在时间 t7,此 5 秒窗口为 t7至 t2. 查询在 t 处检测到一个新记录。7所以它发出输出 2。t 的记录2不再位于 5 秒窗口中,因此计数时不考虑。
-
在时间 t8,此 5 秒窗口为 t8至 t3. 查询检测到三个新记录,因此发送了记录计数 5。
总之,窗口是固定大小,并且随时间滑动。当出现新记录时,查询会发送输出。
我们建议使用滑动窗口的时间不要超过 1 小时。如果您使用时间更长的窗口,应用程序在常规系统维护之后需要更长的时间才能重新启动。这是因为必须再次从流中读取源数据。
以下示例查询使用 WINDOW 子句定义窗口和执行聚合。由于查询不指定 GROUP BY,因此查询使用滑动窗口方法处理流中的记录。
示例 1:使用 1 分钟滑动窗口处理流
在填充应用程序内部流 SOURCE_SQL_STREAM_001 时,请考虑“入门”练习中的演示流。下面是架构。
(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)
假设您希望应用程序使用 1 分钟滑动窗口计算聚合。也就是说,对于出现在流中的每个新记录,您希望应用程序通过对前面的 1 分钟窗口中的记录应用聚合来发送输出。
您可以使用以下基于时间的窗口式查询。查询使用 WINDOW 子句定义 1 分钟范围间隔。WINDOW 子句中的 PARTITION BY 按照滑动窗口中的股票行情机值对记录进行分组。
SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
测试查询
-
按照入门练习设置应用程序。
-
使用先前的
SELECT查询替换应用程序代码中的SELECT语句。生成的应用程序代码如下。CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
示例 2:查询在滑动窗口中应用聚合
针对演示流的以下查询将返回一个 10 秒窗口中,每个股票行情机的价格的平均百分比变化。
SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);
测试查询
-
按照入门练习设置应用程序。
-
使用先前的
SELECT查询替换应用程序代码中的SELECT语句。生成的应用程序代码如下。CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);
示例 3:从同一流中的多个滑动窗口查询数据
您可以编写查询以发送输出,其中的每个列值都是使用同一流上定义的不同滑动窗口计算的。
在以下示例中,查询将发送输出股票行情机、价格、a2 和 a10。查询将发送股票代码的输出,这些代码的两行移动平均值超过了 10 行移动平均值。a2 和 a10 列值派生自 2 行和 10 行滑动窗口。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);
要针对演示流测试此查询,请按照示例 1中介绍的测试过程操作。