本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
教程:使用 Kinesis Data Analytics 应用程序将数据从一个 MSK 集群复制到 VPC 中的另一个集群
以下教程说明了如何创建具有 Amazon MSK 集群和两个主题的 Amazon VPC,以及如何创建从一个 Amazon MSK 主题中读取并写入到另一个主题的 Kinesis Data Analts 应用程序。
要为本练习设置所需的先决条件,请先完成入门 (DataStreamAPI)练习。
使用 Amazon MSK 集群创建Amazon VPC
要创建示例 VPC 和 Amazon MSK 集群以从 Kinesis Data Analytics 应用程序中访问,请按照使用Amazon MSK 入门教程。
在完成本教程时,请注意以下几点:
In第 5 步:创建主题,重复
kafka-topics.sh --create命令以创建名为的目标主题AWSKafkaTutorialTopicDestination:bin/kafka-topics.sh --create --zookeeperZooKeeperConnectionString--replication-factor 3 --partitions 1 --topic AmazonKafkaTutorialTopicDestination记录集群的引导服务器列表。您可以使用以下命令获取引导服务器列表(替换
ClusterArn使用您的 MSK 集群的 ARN):aws kafka get-bootstrap-brokers --region us-west-2 --cluster-arnClusterArn{... "BootstrapBrokerStringTls": "b-2.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094,b-1.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094,b-3.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094" }在执行教程中的步骤时,请务必使用所选的Amazon代码、命令和控制台条目中的区域。
创建应用程序代码
在本节中,您下载并编译应用程序 JAR 文件。我们建议使用 Java 11。
该示例的 Java 应用程序代码可从 GitHub. 要下载应用程序代码,请执行以下操作:
如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git
。 使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples应用程序代码位于
amazon-kinesis-data-analytics-java-examples/KafkaConnectors/KafkaGettingStartedJob.java文件中。您可以检查代码以熟悉 Kinesis Data Analytics 应用程序代码的结构。使用命令行 Maven 工具或首选的开发环境以创建 JAR 文件。要使用命令行 Maven 工具编译 JAR 文件,请输入以下内容:
mvn package -Dflink.version=1.13.2如果构建成功,则会创建以下文件:
target/KafkaGettingStartedJob-1.0.jar注意 提供的源代码依赖于 Java 11 中的库。如果你使用的是开发环境,
上传 Apache Flink 流式处理 Java 代码
在本节中,您将应用程序代码上传到在一节中创建的 Amazon S3 存储桶入门 (DataStreamAPI)教程。
如果您从入门教程中删除了 Amazon S3 存储桶,请按照上传 Apache Flink 流式处理 Java 代码再次进行。
-
在 Amazon S3 控制台中,选择ka-app-code-
<username>存储桶,然后选择上传. -
在选择文件步骤中,选择添加文件。导航到您在上一步中创建的
KafkaGettingStartedJob-1.0.jar文件。 您无需更改该对象的任何设置,因此,请选择 Upload (上传)。
您的应用程序代码现在存储在存储Amazon S3 中,应用程序可以在其中访问代码。
创建 应用程序
打开 Kinesis Data Analytics 控制台,网址为https://console.aws.amazon.com/kinesisanalytics
. -
在 Amazon Kinesis Data Analytics 控制面板创建分析应用程序.
-
在 Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:
-
对于 Application name (应用程序名称),输入
MyApplication。 -
适用于运行时,选择Flink 版本 1.13.2.
-
-
对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色)
kinesis-analytics-MyApplication-us-west-2。 -
选择 Create application(创建应用程序)。
在使用控制台创建 Kinesis Data Analts 应用程序时,您可以选择为应用程序创建一个 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:
-
策略:
kinesis-analytics-service-MyApplication-us-west-2 -
角色:
kinesis-analytics-MyApplication-us-west-2
配置应用程序
-
在存储库的MyApplication页面上,选择配置.
-
在 Configure application (配置应用程序) 页面上,提供 Code location (代码位置):
-
适用于Amazon S3 存储桶输入
ka-app-code-.<username> -
适用于Amazon S3 对象的路径输入
KafkaGettingStartedJob-1.0.jar.
-
-
在 Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色)
kinesis-analytics-MyApplication-us-west-2。注意 使用控制台指定应用程序资源时(例如 CloudWatch 日志或 Amazon VPC)时,控制台修改应用程序执行角色以授予访问这些资源的权限。
-
在 Properties (属性) 下面,选择 Add Group (添加组)。使用以下属性创建一个名为
KafkaSource的属性组:密钥 值 topic AmazonKafkaTutorialTopic bootstrap.servers 您以前保存的引导服务器列表security.protocol SSL ssl.truststore.location /usr/usr/usr/security/security/security/security/securityts ssl.truststore.password changeit 注意 默认证书的 ssl.truststore.password 为“changeit”;如果使用默认证书,则不需要更改该值。
再次选择 Add Group (添加组)。使用以下属性创建一个名为
KafkaSink的属性组:密钥 值 topic AmazonKafkaTutorialTopicDestination bootstrap.servers 您以前保存的引导服务器列表security.protocol SSL ssl.truststore.location /usr/usr/usr/security/security/security/security/securityts ssl.truststore.password changeit 交易.timeout.ms 1000 应用程序代码读取上述应用程序属性,以配置用于与 VPC 和 Amazon MSK 集群交互的源和接收器。有关使用属性的更多信息,请参阅运行时属性。
-
在 Snapshots (快照) 下面,选择 Disable (禁用)。这样,就可以轻松更新应用程序,而无需加载无效的应用程序状态数据。
-
在 Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)。
-
适用于CloudWatch 记录,选择启用”复选框。
-
在 Virtual Private Cloud (VPC) 部分中,选择要与应用程序关联的 VPC。选择与您的 VPC 关联的子网和安全组,您希望应用程序使用它们访问 VPC 资源。
-
选择 Update(更新)。
当你选择启用 CloudWatch 日志组和日Kinesis Data Analytics 和日志组日志组和日志志组日志组和日志志组日志 这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication -
日志流:
kinesis-analytics-log-stream
该日志流用于监控应用程序。
运行应用程序
通过运行应用程序,打开 Apache Flink 控制面板,然后选择所需的 Flink 作业,可以查看 Flink 作业图。
测试应用程序
在本节中,您将记录写入到源主题。应用程序从源主题中读取记录,并将其写入到目标主题中。您可以将记录写入到源主题以及从目标主题中读取记录,以验证应用程序是否正常工作。
要写入和读取主题中的记录,请按照中的步骤操作步骤 6:生成和使用数据中的使用Amazon MSK 入门教程。
要从目标主题中读取,请在到集群的第二个连接中使用目标主题名称,而不是源主题:
bin/kafka-console-consumer.sh --bootstrap-serverBootstrapBrokerString--consumer.config client.properties --topic AmazonKafkaTutorialTopicDestination --from-beginning
如果在目标主题中没有任何记录,请参阅故障排除主题中的无法访问 VPC 中的资源一节。