本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
创建并运行适用于 Python 的 Kinesis Data Analytics 应用程序
在本练习中,您将使用 Kinesis 流作为源和接收器来创建适用于 Python 应用程序的 Kinesis Data Analytics 应用程序。
本节包含以下步骤。
创建相关资源
在为本练习创建适用于 Apache Flink 的 Kinesis Data Analytics 应用程序之前,请创建以下相关资源:
-
两个 Kinesis 流用于输入和输出。
-
Amazon S3 存储段,用于存储应用程序代码和输出 (
ka-app-)<username>
创建两个 Kinesis Streams
对于本练习 Kinesis Data Analytics 应用程序,请创建两个 Kinesis Data Analytics(ExampleInputStream和ExampleOutputStream)。您的应用程序将这些数据流用于应用程序源和目标流。
您可以使用 Amazon Kinesis 控制台或以下控制台创建这些流。Amazon CLI命令。有关控制台说明,请参阅创建和更新数据流中的Amazon Kinesis Data Streams.
创建数据流 (Amazon CLI)
-
创建第一个流 (
ExampleInputStream),使用以下Amazon Kinesiscreate-streamAmazon CLI命令。$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser -
要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为
ExampleOutputStream)。$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
创建 AAmazon S3 存储桶
您可以使用控制台创建 Amazon S3 存储段。有关创建此资源的说明,请参阅以下主题:
-
如何创建 S3 存储桶?中的Amazon Simple Service. 附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如
ka-app-.<username>
其他资源
当您创建应用程序时,Kinesis Data Analytics 将创建以下亚马逊 CloudWatch 资源(如果不存在):
-
名为
/aws/kinesis-analytics-java/MyApplication的日志组。 -
名为
kinesis-analytics-log-stream的日志流。
将示例记录写入输入流
在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。
此部分需要 Amazon SDK for Python (Boto)
本节中的 Python 脚本使用Amazon CLI. 你必须配置你的Amazon CLI使用您的账户凭证和默认区域。配置 CourceAmazon CLI,输入以下内容:
aws configure
-
使用以下内容创建名为
stock.py的文件:import datetime import json import random import boto3 import time STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") time.sleep(2) if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis')) -
运行
stock.py脚本:$ python stock.py在完成本教程的其余部分时,请将脚本保持运行状态。
创建和检查 Apache Flink 流式处理 Python 代码
本示例的 Python 应用程序代码可从 GitHub. 要下载应用程序代码,请执行以下操作:
如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git
。 使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples导航到
amazon-kinesis-data-analytics-java-examples/python/GettingStarted目录。
应用程序代码位于 streaming-file-sink.py 文件中。请注意有关应用程序代码的以下信息:
应用程序使用 Kinesis 表源从源流中进行读取。以下片段调用
create_table函数来创建 Kinesis 表源:table_env.execute_sql( create_table(input_table_name, input_stream, input_region, stream_initpos) )这些区域有:
create_table函数使用 SQL 命令创建由流式处理源支持的表:def create_table(table_name, stream_name, region, stream_initpos): return """ CREATE TABLE {0} ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ('connector' = 'kinesis','stream' = '{1}', 'aws.region' = '{2}', 'scan.stream.initpos' = '{3}', 'sink.partitioner-field-delimiter' = ';', 'sink.producer.collection-max-count' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format( table_name, stream_name, region, stream_initpos )应用程序创建两个表,然后将一个表的内容写入另一个表。
# 2. Creates a source table from a Kinesis Data Stream table_env.execute_sql( create_table(input_table_name, input_stream, input_region, stream_initpos) ) # 3. Creates a sink table writing to a Kinesis Data Stream table_env.execute_sql( create_table(output_table_name, output_stream, output_region, stream_initpos) ) # 4. Inserts the source table data into the sink table table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))该应用程序使用 Flink 连接器,从flink- sql-connector-kinesis_1.13.2
文件。 使用第三方 python 软件包时(例如boto3
),则需要将添加到中GettingStarted文件夹在哪里 getting-started.py位于中。无需在 Apache Flink 或 Kinesis Data Analytics 中添加任何其他配置。可以在以下网站找到示例:如何在 PyfLink 中使用 boto3.
上传 Apache Flink 流式处理 Python 代码
在本节中,您将创建 Amazon S3 存储桶并上传应用程序代码。
使用控制台上传应用程序代码:
使用您首选的压缩应用程序来压缩
getting-started.py和https://mvnrepository.com/artifact/org.apache.flink/flink- sql-connector-kinesis_1.13.2文件。命名存档 myapp.zip. 如果在档案中包含外部文件夹,则必须将其包含在配置文件中的代码的路径中:GettingStarted/getting-started.py.通过以下网址打开 Amazon S3 控制台:https://console.aws.amazon.com/s3/
。 -
选择 Create bucket (创建存储桶)。
-
Enter
ka-app-code-中的Bucket name字段中返回的子位置类型。将后缀(如您的用户名)添加到存储桶名称,以使其具有全局唯一性。选择 Next(下一步)。<username> -
在配置选项步骤中,让设置保持原样,然后选择下一步。
-
在设置权限步骤中,让设置保持原样,然后选择下一步。
-
请选择 Create bucket (创建存储桶)。
-
在 Amazon S3 控制台中,选择ka-app-code-
<username>存储桶,然后选择上传. -
在选择文件步骤中,选择添加文件。导航到您在上一步中创建的
myapp.zip文件。选择 Next(下一步)。 -
您无需更改该对象的任何设置,因此,请选择 Upload (上传)。
使用上传应用程序代码Amazon CLI:
不要使用 Finder (macOS) 或 Windows 资源管理器 (Windows) 中的压缩功能来创建myapp.zip存档。这可能会导致应用程序代码无效。
使用您首选的压缩应用程序来压缩
getting-started.py和https://mvnrepository.com/artifact/org.apache.flink/flink- sql-connector-kinesis_1.13.2文件。命名存档 myapp.zip. 如果在档案中包含外部文件夹,则必须将其包含在配置文件中的代码的路径中:GettingStarted/getting-started.py.运行以下 命令:
$ aws s3 --regionaws regioncp myapp.zip s3://ka-app-code-<username>
您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。
创建和运行 Kinesis Data Analytics 应用程序
按照以下步骤,使用控制台创建、配置、更新和运行应用程序。
创建 应用程序
打开 Kinesis Data Analytics 控制台,网址为https://console.aws.amazon.com/kinesisanalytics
. -
在 Kinesis Data Analytics 仪表板上,选择创建分析应用程序.
-
在 Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:
-
对于 Application name (应用程序名称),输入
MyApplication。 -
对于描述,输入
My java test app。 -
对于 Runtime (运行时),请选择 Apache Flink。
-
将版本保留为Apache Flink 版本 1.13.1(推荐版本).
-
-
对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色)
kinesis-analytics-MyApplication-us-west-2。 -
选择 Create application(创建应用程序)。
在使用控制台创建 Kinesis Data Analytics 应用程序时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:
-
策略:
kinesis-analytics-service-MyApplication-us-west-2 -
角色:
kinesis-analytics-MyApplication-us-west-2
配置应用程序
可以使用以下过程配置该应用程序。
配置应用程序
-
在存储库的MyApplication页面上,选择配置.
-
在 Configure application (配置应用程序) 页面上,提供 Code location (代码位置):
-
适用于Amazon S3 存储桶,输入
ka-app-code-.<username> -
适用于Amazon S3 对象的路径,输入
myapp.zip.
-
-
在 Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色)
kinesis-analytics-MyApplication-us-west-2。 -
UNDER属性,选择添加组. 适用于Group ID (组 ID),输入
consumer.config.0. -
输入以下应用程序属性和值:
密钥 值 input.stream.nameExampleInputStreamaws.regionus-west-2flink.stream.initposLATEST选择Save(保存)。
UNDER属性,选择添加组。适用于Group ID (组 ID),输入
producer.config.0.输入以下应用程序属性和值:
密钥 值 output.stream.nameExampleOutputStreamaws.regionus-west-2shard.count1UNDER属性,选择添加组。适用于Group ID (组 ID),输入
kinesis.analytics.flink.run.options. 这个特殊的属性组告诉你的应用程序在哪里可以找到它的代码资源。有关更多信息,请参阅 指定代码文件。输入以下应用程序属性和值:
密钥 值 pythongetting-started.pyjarfileflink-sql-connector-kinesis_2.12-1.13.1.jar-
在 Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)。
-
适用于CloudWatch 记录,选择启用”复选框。
-
选择 Update(更新)。
当您选择启用亚马逊时 CloudWatch 日志记录,Kinesis Data Analytics 将为您创建日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication -
日志流:
kinesis-analytics-log-stream
编辑 IAM 策略
编辑 IAM 策略以添加权限以访问 Amazon S3 存储桶。
编辑 IAM 策略以添加 S3 存储桶权限
通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/
。 -
选择策略。选择控制台在上一部分中为您创建的
kinesis-analytics-service-MyApplication-us-west-2策略。 -
在 Summary (摘要) 页面上,选择 Edit policy (编辑策略)。请选择 JSON 选项卡。
-
将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (
012345678901) 替换为您的账户 ID。{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/myapp.zip" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" }] }
运行应用程序
通过运行应用程序,打开 Apache Flink 控制面板,然后选择所需的 Flink 作业,可以查看 Flink 作业图。
停止应用程序
要停止应用程序,在MyApplication页面上,选择停止. 确认该操作。