现在假设我们有如下的程序
env.addSource(consumer) .flatMap(new IteblogFlatMapper(BLACK_TABLE, HASH_TABLE)).name("My Map") .addSink(new ElasticsearchSink[IteblogEntry](config, esServers, new IteblogElasticsearchSinkFunction, new IteblogActionRequestFailureHandler)).name("es")
我们把这个程序的metric信息发送到Graphite监控系统中:
metrics.reporters: grph metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter metrics.reporter.grph.host: www.iteblog.com metrics.reporter.grph.port: 2003 metrics.reporter.grph.protocol: TCP
当我们运行这个程序,相关的变量(<host>,<operator_name>以及<subtask_index>等)会替换成下面的值:
<job_id> -> 9e81f84c50820c304b8af2b16fa8140b <task_id> -> cbc357ccb763df2852fee8c4fc7d55f2 <task_attempt_id> -> 690a6cebdd1bfa9edacfed50aa1d4807 <host> -> iteblog.com <operator_name> -> Sink: es <task_name> -> Source: Custom Source -> My Map -> Sink: es <task_attempt_num> -> 0 <job_name> -> MyFlinkJobs <tm_id> -> 34d31d54c0031328a6ec8910e571a7f8 <subtask_index> -> 0
所以metrics.scope.task
(默认值:<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>)范围指标的名字将会变成下面的路径:
flinkjobs.iteblog.com.taskmanager.34d31d54c0031328a6ec8910e571a7f8.MyFlinkJobs.Source: Custom Source -> TestFlat -> Map -> Sink: es.0
注意看<task_name>变量的值为Source: Custom Source -> My Map -> Sink: es
,而在Graphite系统中,其不支持空格、>以及:等特殊字符,因为指标名最终会以文件名的形式存储到文件系统中。metrics.scope.task
范围内的所有指标将无法在Graphite中显示。
解决这个问题主要两两种方法:
metrics.scope.task
属性的默认值;GraphiteReporter
类的相关代码。metrics.scope.task
属性的默认值这种方法最简单了,针对本文的情况,我们可以不将<task_name>变量写到指标名中,使得指标名中无特殊字符,比如我们做出如下修改(在 $FLINK_HOME/conf/flink-conf.yaml
文件里面修改):
metrics.scope.task: <host>.taskmanager.<tm_id>.<job_name>.custom_task_name.<subtask_index>
这样运行上面程序的时候,metrics.scope.task
属性最终的值如下:
flinkjobs.iteblog.com.taskmanager.34d31d54c0031328a6ec8910e571a7f8.MyFlinkJobs.custom_task_name.0
这样整个指标名中都无特殊字符,我们就可以在Graphite系统中看到相应的指标。
GraphiteReporter
类的相关代码上面那种方法我们将 <task_name> 变量的值写死了,有时候这并不是我们想要的,所有这时候我们可以修改 GraphiteReporter
类。GraphiteReporter
类继承自 org.apache.flink.dropwizard.ScheduledDropwizardReporter
,ScheduledDropwizardReporter
类中有个 filterCharacters
函数,我们可以在这个函数里将特殊字符全部替换掉,下面是一种实现:
@Override public String filterCharacters(String str) { char[] chars = null; final int strLen = str.length(); int pos = 0; for (int i = 0; i < strLen; i++) { final char c = str.charAt(i); switch (c) { case '>': case '<': case '"': // remove character by not moving cursor if (chars == null) { chars = str.toCharArray(); } break; case ' ': if (chars == null) { chars = str.toCharArray(); } chars[pos++] = '_'; break; case ',': case '=': case ';': case ':': case '?': case '\'': case '*': if (chars == null) { chars = str.toCharArray(); } chars[pos++] = '-'; break; default: if (chars != null) { chars[pos] = c; } pos++; } } return chars == null ? str : new String(chars, 0, pos); }
我们把
<
,>
以及"
特殊字符直接去掉了,
,=
,;
, :
, ?
,\
以及*
特殊字符替换成-
_
然后编译相关的模块,并替换flink-metrics-graphite-1.3.1.jar,我们再运行上面的例子,这时候metrics.scope.task
属性最终的值如下:
flinkjobs.iteblog.com.taskmanager.34d31d54c0031328a6ec8910e571a7f8.MyFlinkJobs.Source: Custom_Source_-_TestFlat_-_Map_-_Sink: es.0
这时候我们也可以在Graphite监控系统中看到相关的数据了。
本博客文章除特别声明,全部都是原创!