示例:将 EFO 使用者与 Kinesis Data Streams 配合使用 - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:将 EFO 使用者与 Kinesis Data Streams 配合使用

在本练习中,您将创建一个 Kinesis Data Analytics 应用程序,该应用程序使用增强型扇出功能 (EFO)使用者。如果 Kinesis 使用者使用 EFO,Kinesis Data Streams 服务会为其提供自己的专用带宽,而不是让使用者与从流中读取数据的其他使用者共享流的固定带宽。

有关将 EFO 与 Kinesis 使用者结合使用的更多信息,请参阅FLINUX 针对 Kinesis 消费者的增强型扇出功能.

您在本示例中创建的应用程序使用AmazonKinesis 连接器 (flink-connector-kinesis) 1.13.2。

注意

要为本练习设置所需的先决条件,请先完成入门 (DataStreamAPI)练习。

创建相关资源

在为本练习创建 Kinesis Data Analytics 应用程序之前,请创建以下相关资源:

  • 两个 Kinesis 数据流 (ExampleInputStreamExampleOutputStream

  • Amazon S3 存储桶 (ka-app-code-<username>

您可以使用控制台创建 Kinesis 流和 Amazon S3 存储桶。有关创建这些资源的说明,请参阅以下主题:

  • 创建和更新数据流中的Amazon Kinesis Data Streams 开发者指南. 将数据流命名为 ExampleInputStreamExampleOutputStream

  • 如何创建 S3 存储桶?中的Amazon Storage Service. 附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如ka-app-code-<username>.

将示例记录写入输入流

在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。

注意

此部分需要 Amazon SDK for Python (Boto)

  1. 使用以下内容创建名为 stock.py 的文件:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'EVENT_TIME': datetime.datetime.now().isoformat(), 'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'PRICE': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))
  2. 运行 stock.py 脚本:

    $ python stock.py

    在完成本教程的其余部分时,请将脚本保持运行状态。

下载并检查应用程序代码

该示例的 Java 应用程序代码可从 GitHub. 要下载应用程序代码,请执行以下操作:

  1. 如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git

  2. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples
  3. 导航到 amazon-kinesis-data-analytics-java-examples/EfoConsumer目录。

应用程序代码位于 EfoApplication.java 文件中。请注意有关应用程序代码的以下信息:

  • 您可以通过在 Kinesis 使用器上设置以下参数来启用 EFO 使用者:

    • 记录_PUBLISHER_TYPE: 将该参数设置为EFO让您的应用程序使用 EFO 使用者访问 Kinesis 数据流数据。

    • EFO_CONSUMER_NAME: 将此参数设置为在此流的使用者中唯一的字符串值。在同一 Kinesis Data Stream 中重复使用使用者名称将导致以前使用该名称的使用者被终止。

  • 以下代码示例演示如何将值分配给使用者配置属性以使用 EFO 使用者从源流中读取:

    consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");

编译应用程序代码

要编译应用程序,请执行以下操作:

  1. 如果还没有 Java 和 Maven,请安装它们。有关更多信息,请参阅入门 (DataStreamAPI)教程中的先决条件

  2. 使用以下命令编译应用程序:

    mvn package -Dflink.version=1.13.2
    注意

    提供的源代码依赖于 Java 11 中的库。

编译应用程序将创建应用程序 JAR 文件 (target/aws-kinesis-analytics-java-apps-1.0.jar)。

上传 Apache Flink 流式处理 Java 代码

在本节中,您将应用程序代码上传到在一节中创建的 Amazon S3 存储桶创建相关资源部分。

  1. 在 Amazon S3 控制台中,选择ka-app-code-<username>存储桶,然后选择上传.

  2. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 aws-kinesis-analytics-java-apps-1.0.jar 文件。

  3. 您无需更改该对象的任何设置,因此,请选择 Upload (上传)

您的应用程序代码现在存储在存储Amazon S3 中,应用程序可以在其中访问代码。

创建和运行 Kinesis Data Analytics 应用程序

按照以下步骤,使用控制台创建、配置、更新和运行应用程序。

创建 应用程序

  1. 打开 Kinesis Data Analytics 控制台,网址为https://console.aws.amazon.com/kinesisanalytics.

  2. 在 Kinesis Data Analytics 仪表板上,选择创建分析应用程序.

  3. Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:

    • 对于 Application name (应用程序名称),输入 MyApplication

    • 对于 Runtime (运行时),请选择 Apache Flink

      注意

      Kinesis Data Analytics 使用 Apache Flink 版本 1.132。

    • 将版本下拉菜单保留为Apache Flink 1.13.2(推荐版本).

  4. 对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  5. 选择 Create application(创建应用程序)。

注意

在使用控制台创建 Kinesis Data Analts 应用程序时,您可以选择为应用程序创建一个 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesis-analytics-MyApplication-us-west-2

编辑 IAM 策略

编辑 IAM 策略添加访问数据流数据流的Kinesis 限限限。

  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-west-2 策略。

  3. Summary (摘要) 页面上,选择 Edit policy (编辑策略)。请选择 JSON 选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (012345678901) 替换为您的账户 ID。

    注意

    这些权限授予应用程序访问 EFO 消费者的能力。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/aws-kinesis-analytics-java-apps-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "AllStreams", "Effect": "Allow", "Action": [ "kinesis:ListShards", "kinesis:ListStreamConsumers", "kinesis:DescribeStreamSummary" ], "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/*" }, { "Sid": "Stream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:RegisterStreamConsumer", "kinesis:DeregisterStreamConsumer" ], "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" }, { "Sid": "Consumer", "Effect": "Allow", "Action": [ "kinesis:DescribeStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": [ "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream/consumer/my-flink-efo-consumer", "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream/consumer/my-flink-efo-consumer:*" ] } ] }

配置应用程序

  1. 在存储库的MyApplication页面上,选择配置.

  2. Configure application (配置应用程序) 页面上,提供 Code location (代码位置)

    • 适用于Amazon S3 存储桶输入ka-app-code-<username>.

    • 适用于Amazon S3 对象的路径输入aws-kinesis-analytics-java-apps-1.0.jar.

  3. Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  4. UNDER属性,选择创建组. 适用于Group ID (组 ID)输入ConsumerConfigProperties.

  5. 输入以下应用程序属性和值:

    密钥
    flink.stream.recordpublisher EFO
    flink.stream.efo.consumername my-flink-efo-consumer
    INPUT_STREAM ExampleInputStream
    flink.inputstream.initpos LATEST
    AWS_REGION us-west-2
  6. UNDER属性,选择创建组. 适用于Group ID (组 ID)输入ProducerConfigProperties.

  7. 输入以下应用程序属性和值:

    密钥
    OUTPUT_STREAM ExampleOutputStream
    AWS_REGION us-west-2
    AggregationEnabled false
  8. Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)

  9. 适用于CloudWatch 记录,选择启用”复选框。

  10. 选择 Update(更新)。

注意

当你选择启用 CloudWatch 日志组和日Kinesis Data Analytics 和日志组日志组和日志志组日志组和日志志组日志 这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

该日志流用于监控应用程序。这与应用程序用于发送结果的日志流不同。

运行应用程序

通过运行应用程序,打开 Apache Flink 控制面板,然后选择所需的 Flink 作业,可以查看 Flink 作业图。

你可以在上查看 Kinesis Data Analytics 指标 CloudWatch 控制台来验证应用程序是否正常工作。

您也可以在数据流的 Kinesis Data Streams 控制台中查看增强的扇出选项卡,用于您的消费者的姓名 (my-flink-efo-consumer)。

清理 Amazon 资源

本节包含清理过程Amazon在 efo 窗口教程中创建的资源。

删除 Kinesis Data Analytics 应用程序

  1. 打开 Kinesis Data Analytics 控制台,网址为https://console.aws.amazon.com/kinesisanalytics.

  2. 在 Kinesis Data Analytics 面板中,选择MyApplication.

  3. 在应用程序的页面中,选择 Delete (删除),然后确认删除。

删除 Kinesis Data Streams

  1. 打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis

  2. 在 Kinesis Data Streams 面板中,选择ExampleInputStream.

  3. ExampleInputStream页面上,选择删除 Kinesis Streams然后确认删除。

  4. Kinesis Streams页面上,选择ExampleOutputStream,选择操作,选择Delete,然后确认删除。

删除您的 Amazon S3 对象和存储桶

  1. 通过以下网址打开 Simple Storage Service(Amazon S3)控制台:https://console.aws.amazon.com/s3/

  2. 选择ka-app-code-<username>存储桶。

  3. 选择 Delete (删除),然后输入存储桶名称以确认删除。

删除资源

  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 在导航栏中,选择策略

  3. 在筛选条件控件中,输入 kinesis

  4. 选择kinesis-analytics-service-MyApplication-<your-region>政策。

  5. 选择 Policy Actions (策略操作),然后选择 Delete (删除)

  6. 在导航栏中,选择 Roles(角色)

  7. 选择kinesines-anticticMyApplication-<your-region>角色。

  8. 选择 Delete role (删除角色),然后确认删除。

删除日期和时间 CloudWatch 资源

  1. 打开 CloudWatch 控制台https://console.aws.amazon.com/cloudwatch/.

  2. 在导航栏中,选择 Logs (日志)

  3. 选择/aws/kinesis-analytictics/MyApplication日志组。

  4. 选择 Delete Log Group (删除日志组),然后确认删除。