示例:滑动窗口 - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:滑动窗口

在本练习中,您创建一个使用滑动窗口聚合数据的 Kinesis Data Analytics 应用程序。在 Flink 中,该功能状态。要禁用终止保护,请使用以下内容:

sink.producer.aggregation-enabled' = 'false'
注意

要为本练习设置所需的先决条件,请先完成入门 (DataStreamAPI)练习。

创建相关资源

在为本练习创建 Kinesis Data Analytics 应用程序之前,请创建以下相关资源:

  • 两个 Kinesis 数据流(ExampleInputStreamExampleOutputStream)。

  • Amazon S3 存储桶 (ka-app-code-<username>

您可以使用控制台创建 Kinesis 流和 Amazon S3 存储桶。有关创建这些资源的说明,请参阅以下主题:

  • 创建和更新数据流中的Amazon Kinesis Data Streams 开发者指南. 将数据流命名为 ExampleInputStreamExampleOutputStream

  • 如何创建 S3 存储桶?中的Amazon Storage Service. 附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如ka-app-code-<username>.

将示例记录写入输入流

在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。

注意

此部分需要 Amazon SDK for Python (Boto)

  1. 使用以下内容创建名为 stock.py 的文件:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'EVENT_TIME': datetime.datetime.now().isoformat(), 'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'PRICE': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))
  2. 运行 stock.py 脚本:

    $ python stock.py

    在完成本教程的其余部分时,请将脚本保持运行状态。

下载并检查应用程序代码

该示例的 Java 应用程序代码可从 GitHub. 要下载应用程序代码,请执行以下操作:

  1. 如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git

  2. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples
  3. 导航到 amazon-kinesis-data-analytics-java-examples/SlidingWindow目录。

应用程序代码位于 SlidingWindowStreamingJobWithParallelism.java 文件中。请注意有关应用程序代码的以下信息:

  • 应用程序使用 Kinesis 源从源流中进行读取。以下代码段创建了 Kinesis source:

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
  • 应用程序使用 timeWindow 操作符在 10 秒的滑动窗口(以 5 秒为增量)中查找每个股票代号的最小值。以下代码创建操作符,并将聚合的数据发送到新的 Kinesis Data Streams 接收器:

  • 添加以下 import 语句:

    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; //flink 1.13
  • 应用程序使用 timeWindow 操作符在 5 秒的滚动窗口中查找每个股票代号的值计数。以下代码创建操作符,并将聚合的数据发送到新的 Kinesis Data Streams 接收器:

    input.flatMap(new Tokenizer()) // Tokenizer for generating words .keyBy(0) // Logically partition the stream for each word //.timeWindow(Time.seconds(5)) // Tumbling window definition (Flink 1.11) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) //Flink 1.13 .sum(1) // Sum the number of words per partition .map(value -> value.f0 + "," + value.f1.toString() + "\n") .addSink(createSinkFromStaticConfig());

编译应用程序代码

要编译应用程序,请执行以下操作:

  1. 如果还没有 Java 和 Maven,请安装它们。有关更多信息,请参阅入门 (DataStreamAPI)教程中的先决条件

  2. 使用以下命令编译应用程序:

    mvn package -Dflink.version=1.13.2
    注意

    提供的源代码依赖于 Java 11 中的库。

编译应用程序将创建应用程序 JAR 文件 (target/aws-kinesis-analytics-java-apps-1.0.jar)。

上传 Apache Flink 流式处理 Java 代码

在本节中,您将应用程序代码上传到在一节中创建的 Amazon S3 存储桶创建相关资源部分。

  1. 在 Amazon S3 控制台中,选择ka-app-code-<username>存储桶,然后选择上传.

  2. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 aws-kinesis-analytics-java-apps-1.0.jar 文件。

  3. 您无需更改该对象的任何设置,因此,请选择 Upload (上传)

您的应用程序代码现在存储在存储Amazon S3 中,应用程序可以在其中访问代码。

创建和运行 Kinesis Data Analytics 应用程序

按照以下步骤,使用控制台创建、配置、更新和运行应用程序。

创建 应用程序

  1. 打开 Kinesis Data Analytics 控制台,网址为https://console.aws.amazon.com/kinesisanalytics.

  2. 在 Kinesis Data Analytics 仪表板上,选择创建分析应用程序.

  3. Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:

    • 对于 Application name (应用程序名称),输入 MyApplication

    • 对于 Runtime (运行时),请选择 Apache Flink

    • 将版本下拉菜单保留为Apache Flink 1.13.2(推荐版本).

  4. 对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  5. 选择 Create application(创建应用程序)。

注意

在使用控制台创建 Kinesis Data Analts 应用程序时,您可以选择为应用程序创建一个 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesis-analytics-MyApplication-us-west-2

编辑 IAM 策略

编辑 IAM 策略添加访问数据流数据流的Kinesis 限限限。

  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-west-2 策略。

  3. Summary (摘要) 页面上,选择 Edit policy (编辑策略)。请选择 JSON 选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (012345678901) 替换为您的账户 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/aws-kinesis-analytics-java-apps-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

配置应用程序

  1. 在存储库的MyApplication页面上,选择配置.

  2. Configure application (配置应用程序) 页面上,提供 Code location (代码位置)

    • 适用于Amazon S3 存储桶输入ka-app-code-<username>.

    • 适用于Amazon S3 对象的路径输入aws-kinesis-analytics-java-apps-1.0.jar.

  3. Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  4. Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)

  5. 适用于CloudWatch 记录,选择启用”复选框。

  6. 选择 Update(更新)。

注意

当您选择启用亚马逊时 CloudWatch 日志组和日Kinesis Data Analytics 和日志组日志组和日志志组日志组和日志志组日志 这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

该日志流用于监控应用程序。这与应用程序用于发送结果的日志流不同。

配置应用程序并行度

该应用程序示例使用任务的并行执行功能。以下应用程序代码设置 min 操作符的并行度:

.setParallelism(3) // Set parallelism for the min operator

应用程序并行度不能大于预置的并行度(默认为 1)。要增加应用程序的并行度,请使用以下 Amazon CLI 操作:

aws kinesisanalyticsv2 update-application --application-name MyApplication --current-application-version-id <VersionId> --application-configuration-update "{\"FlinkApplicationConfigurationUpdate\": { \"ParallelismConfigurationUpdate\": {\"ParallelismUpdate\": 5, \"ConfigurationTypeUpdate\": \"CUSTOM\" }}}"

您可以使用检索当前应用程序版本 IDDescribeApplication要么ListApplications行动。

运行应用程序

通过运行应用程序,打开 Apache Flink 控制面板,然后选择所需的 Flink 作业,可以查看 Flink 作业图。

你可以在上查看 Kinesis Data Analytics 指标 CloudWatch 控制台来验证应用程序是否正常工作。

清理 Amazon 资源

本节包含清理在滑动窗口教程中创建的 Amazon 资源的过程。

删除 Kinesis Data Analytics 应用程序

  1. 打开 Kinesis Data Analytics 控制台,网址为https://console.aws.amazon.com/kinesisanalytics.

  2. 在 Kinesis Data Analytics 面板中,选择MyApplication.

  3. 在应用程序的页面中,选择 Delete (删除),然后确认删除。

删除 Kinesis Data Streams

  1. 打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis

  2. 在 Kinesis Data Streams 面板中,选择ExampleInputStream.

  3. ExampleInputStream页面上,选择删除 Kinesis Streams然后确认删除。

  4. Kinesis Streams页面上,选择ExampleOutputStream,选择操作,选择Delete,然后确认删除。

删除您的 Amazon S3 对象和存储桶

  1. 通过以下网址打开 Simple Storage Service(Amazon S3)控制台:https://console.aws.amazon.com/s3/

  2. 选择ka-app-code-<username>存储桶。

  3. 选择 Delete (删除),然后输入存储桶名称以确认删除。

删除资源

  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 在导航栏中,选择策略

  3. 在筛选条件控件中,输入 kinesis

  4. 选择kinesis-analytics-service-MyApplication-<your-region>政策。

  5. 选择 Policy Actions (策略操作),然后选择 Delete (删除)

  6. 在导航栏中,选择 Roles(角色)

  7. 选择kinesines-anticticMyApplication-<your-region>角色。

  8. 选择 Delete role (删除角色),然后确认删除。

删除日期和时间 CloudWatch 资源

  1. 打开 CloudWatch 控制台https://console.aws.amazon.com/cloudwatch/.

  2. 在导航栏中,选择 Logs (日志)

  3. 选择/aws/kinesis-analytictics/MyApplication日志组。

  4. 选择 Delete Log Group (删除日志组),然后确认删除。