本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将自定义指标与Amazon Kinesis Data Analytics 结合使用自定义指标
Kinesis Data Analytics CloudWatch,包括资源使用率和吞吐量的指标。此外,您可以创建自己的指标来跟踪特定于应用程序的数据,例如处理事件或访问外部资源。
工作方式
Kinesis Data Analytics 中的自定义指标使用 Apache Flink 指标系统。Apache Flink 指标具有以下属性:
类型:指标的类型描述了它如何衡量和报告数据。可用的 Apache Flink 量度类型包括计数、量规、直方图和仪表。有关 Apache Flink 指标类型的更多信息,请参阅。指标类型
. 注意 Amazon CloudWatch 指标不支持直方图 Apache Flink 量度类型。 CloudWatch 只能显示 Apache Flink 计数、仪表盘和计量器类型的指标。
Scope: 指标的范围由其标识符和一组键值对组成,这些键值对指示指标将如何报告给 CloudWatch. 指标的标识符包含以下各项:
系统范围,表示报告指标的级别(例如 Operator)。
用户范围,用于定义诸如用户变量或指标组名称之类的属性。这些属性是使用定义的
MetricGroup.addGroup(key, value)要么 MetricGroup.addGroup(name).
有关指标范围的更多信息,请参阅范围
.
有关 Apache Flink 指标的更多信息,请参阅。指标
要在适用于 Apache Flink 的 Kinesis Data Analytics 应用程序中创建自定义指标,您可以从任何扩展的用户函数访问 Apache Flink 指标系统RichFunction通过调用GetMetricGroupKinesisAnalytics到 CloudWatch. 您定义的自定义指标具有以下特征:
您的自定义指标有指标名称和组名称。这些名称必须由字母数字字符组成。
您在用户作用域中定义的属性(除了
KinesisAnalytics量度组)发布为 CloudWatch 维度。自定义指标发布在
Application默认是级别。维度(任务/运算符/并行度)根据应用程序的监视级别添加到指标中。您可以使用设置应用程序的监视级别MonitoringConfigurationParameterCreateApplication动作,或者或MonitoringConfigurationUpdateParameterUpdateApplicationaction.
示例
以下代码示例演示了如何创建映射类、如何创建和递增自定义指标,以及如何通过将映射类添加到DataStream对象。
记录计数自定义指标
以下代码示例演示如何创建一个映射类,该类用于创建对数据流中的记录进行计数的量度(功能与numRecordsIn指标):
private static class NoOpMapperFunction extends RichMapFunction<String, String> { private transient int valueToExpose = 0; private final String customMetricName; public NoOpMapperFunction(final String customMetricName) { this.customMetricName = customMetricName; } @Override public void open(Configuration config) { getRuntimeContext().getMetricGroup() .addGroup("kinesisanalytics") .addGroup("Program", "RecordCountApplication") .addGroup("NoOpMapperFunction") .gauge(customMetricName, (Gauge<Integer>) () -> valueToExpose); } @Override public String map(String value) throws Exception { valueToExpose++; return value; } }
在上一示例中,valueToExpose变量为应用程序处理的每条记录递增。
定义映射类后,您可以创建一个实现映射的应用程序内流:
DataStream<String> noopMapperFunctionAfterFilter = kinesisProcessed.map(new NoOpMapperFunction("FilteredRecords"));
有关此应用程序的完整代码,请参阅记录计数自定义指标应用程序
字数自定义指标
以下代码示例说明了如何创建映射类,以创建对数据流中的字数进行计数的量度:
private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext().getMetricGroup() .addGroup("kinesisanalytics") .addGroup("Service", "WordCountApplication") .addGroup("Tokenizer") .counter("TotalWords"); } @Override public void flatMap(String value, Collector<Tuple2<String, Integer>>out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { counter.inc(); out.collect(new Tuple2<>(token, 1)); } } } }
在上一示例中,counter对于应用程序处理的每个单词,变量都会递增。
定义映射类后,您可以创建一个实现映射的应用程序内流:
// Split up the lines in pairs (2-tuples) containing: (word,1), and // group by the tuple field "0" and sum up tuple field "1" DataStream<Tuple2<String, Integer>> wordCountStream = input.flatMap(new Tokenizer()).keyBy(0).sum(1); // Serialize the tuple to string format, and publish the output to kinesis sink wordCountStream.map(tuple -> tuple.toString()).addSink(createSinkFromStaticConfig());
有关此应用程序的完整代码,请参阅字数统计自定义指标应用程序
查看自定义指标
应用程序的自定义指标将显示在 CloudWatch 中的指标控制台AWS/KinesisAnalytics控制面板,在应用程序指标组。