admin管理员组

文章数量:1122847

监控系统

技术栈:Filebeat+Kafka+Flink+ElasticSearch+Kibana+Micrometer等

文章目录

  • 前言
  • 一、组成部分简介
  • 二、组成部分详解
    • 1.business-log.jar
      • metric
      • tag
      • 命名规范
      • 指标上报
      • 核心代码
    • 2.Filebeat
    • 3.Kafka
    • 4.Flink
    • 5.Elasticsearch
    • 6.Kibana
  • 三、业务方接入使用
    • 1.引入pom坐标
    • 2.埋点
    • 3.添加日志配置
  • 总结


前言

需求背景:需要对系统进行业务监控,如支付系统需要监控各个支付渠道的各种交易类型各时间段的下单量、支付成功/失败量/率等。 通过构建监控系统我们可以知道系统各时段交易状况,同时预设多种指标,当触发预设指标时进行告警通知。说白了就是避免系统出问题自己都不知道,等上游系统发现问题找过来再去解决问题黄花菜都凉了。
一个完整的监控系统通常由数据采集、数据存储、数据查询和处理、告警以及可视化展示等多个模块组成。所以,要从头搭建一个监控系统,其实也是一个很大的系统工程(CPU、内存、磁盘等系统资源监控不在此次讨论范围)。
本博文带大家搭建一套业务监控系统,涉及流程如下,本章首先介绍虚线框内相关内容:


一、组成部分简介

本系统基于系统埋点通过产生的日志进行分析做准实时监控,基本架构还是ELK相关技术栈,各组成部分如下:
(1)business-log.jar 作为日志收集包引入到业务系统,标准化日志收集格式,降低使用复杂度,减少业务方工作量,为防止收集到的业务日志过大,jar包会聚合出指定时间各个tag纬度的值(内部实现封装了micrometer用来预聚合),没啥好说的。
(2)filebeat作为kafaka生产方,将系统通过business-log.jar 包产生的日志发送到Kafka,没啥好说的。
(3)flink作为kafaka消费方,对日志数据进行数据格式清洗以及聚合,没啥好说的。
(4)elasticSearch存储flink处理后的日志数据,没啥好说的。
(5)kibana查询elasticSearch,对数据进行展示,通过配置Dashboard进行展示,没啥好说的。

二、组成部分详解

1.business-log.jar

我们把一个业务指标抽象为metric,它由两部分组成,metrictag

metric

metric是一个由 “ . ” 分隔的字符串,它描述了这个metric的基本含义,例如 paycore.order.count 描述了paycore系统下单请求量这一指标。

tag

metrics的本质是在某个时间点上, 某个key的某个value的一个快照, 它是一个随时间变化的时间序列数据。而metrics如果只考虑单机上查看这一场景的话, 其实只有metric就已经够了, 因为只需要把每个时间点的数据展示出来就好了。我们的metrics数据需要支持按业务多纬度进行聚合。这些数据存到es中,为了更好的支持在es中进行动态聚合操作,引入了tag的概念。

tag由两部分组成, tagKey和tagValue。一个tag可以理解为对一组数据全集的一个完整划分, tagKey表示划分方式, tagValue表示划分后的值。例如交易下单的场景, 如果需要知道用户是从pc下单, 还是mobile下单。 那么按下单来源划分的话,可以把所有交易下单划分来自pc和来自mobile, 所以tagKey就是source, tagValue就是pc或者mobile。

tag的存在使得时间序列数据库中的数据聚合变得更灵活。我们把订单创建按照来源进行划分之后,当指定了source=pc这一查询条件时, 就可以方便的聚合出,某个时间段内来自于pc端的量,这一点类似于传统数据库的group by操作。当不指定source这一tag的时候, 也能聚合出总的创建量。

命名规范

metrictag
paycore.order.countinstCode=payAli、transCode=deduct、isSuccess=true…
paycore.order.exception.countinstCode=payAli、transCode=deduct…

指标上报

metrictag1:instCodetag2:transCodetag3:isSuccesstag4:data
paycore.order.countpayAlideducttrue2
paycore.order.countpayAlideductfalse3
paycore.order.countpayAlideputetrue5
paycore.order.countpayAliwappaytrue10
paycore.order.countpayAliwappayfalse1
paycore.order.countpayWxdeducttrue30

详细示例日志如下:

[root@ls79UBdNss business]# tail -f all.8081-30dt.log
{"appName":"paycore","data":1.0,"env":"dev","errcode":"G003","errmsg":"失败","hostName":"192.168.0.4","instCode":"payWx","isSuccess":"false","metric":"paycore.order.count","timestamp":1664023972652,"transCode":"depute","units":"次"}
{"appName":"paycore","data":2.0,"env":"dev","errcode":"G001","errmsg":"失败","hostName":"192.168.0.4","instCode":"payWx","isSuccess":"false","metric":"paycore.order.count","timestamp":1664023972652,"transCode":"wapPay","units":"次"}
{"appName":"paycore","data":2.0,"env":"dev","errcode":"G003","errmsg":"失败","hostName":"192.168.0.4","instCode":"payWx","isSuccess":"false","metric":"paycore.order.count","timestamp":1664023972652,"transCode":"wapPay","units":"次"}
{"appName":"paycore","data":3.0,"env":"dev","errcode":"G000","errmsg":"成功","hostName":"192.168.0.4","instCode":"payWx","isSuccess":"true","metric":"paycore.order.count","timestamp":1664023972652,"transCode":"wapPay","units":"次"}
{"appName":"paycore","data":1.0,"env":"dev","errcode":"G002","errmsg":"失败","hostName":"192.168.0.4","instCode":"payAli","isSuccess":"false","metric":"paycore.order.count","timestamp":1664023972652,"transCode":"wapPay","units":"次"}

核心代码

    private static void log() {try {Collection<Meter> meters = Search.in(Metrics.globalRegistry).meters();AtomicInteger i = new AtomicInteger();Iterator var2 = meters.iterator();while(var2.hasNext()) {Meter each = (Meter)var2.next();Iterable<Measurement> measure = each.measure();Map<String, Object> logs = new TreeMap();logs.put("appName", appName);logs.put("env", Env.getEnv());logs.put("hostName", ip);logs.put("timestamp", System.currentTimeMillis());logs.put("metric", each.getId().getName());List<Tag> tags = each.getId().getTags();tags.forEach((tag) -> {logs.put(tag.getKey(), tag.getValue());});measure.forEach((measurement) -> {double value = measurement.getValue();logs.put("data", value);});String logstr = JSON.toJSONString(logs);double data = (Double)logs.get("data");if (data > 0.0D) {logger.info(logstr);i.addAndGet(1);if (i.get() == 4096) {try {Thread.sleep(500L);i.set(0);} catch (InterruptedException var11) {}}}}} catch (Exception ex) {ex.printStackTrace();}}

2.Filebeat

是一个轻量级日志采集器,其工作流程如下:当启动 Filebeat 程序时,它会启动一个或多个查找器去检测指定的日志目录或文件。对于查找器 prospector 所在的每个日志文件,FIlebeat 会启动收集进程 harvester。 每个 harvester 都会为新内容读取单个日志文件,并将新日志数据发送到后台处理程序,后台处理程序会集合这些事件,最后发送集合的数据到 output 指定的目的地。
配置文件filebeat.yml示例如下:

filebeat.inputs:
- type: logfields: {ip: ipv4address, log_type: monitor, log_topic: monitor}fields_under_root: trueinput_type: logmultiline.match: after multiline.negate: false multiline.pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}paths: [/日志路径//*.log]tail_files: truesymlinks: truespool_size: 2048harvester_buffer_size: 65536multiline.max_lines: 1000
output.kafka:compression: gzipversion: "0.10.2.1"hosts: ["kafka服务器地址:9092"]max_message_bytes: 1000000partition.round_robin: {reachable_only: false}required_acks: 1bulk_max_size: 4096keep_alive: 10worker: 2topic: 队列名称
processors:
- rename:fields:- {from: "log.file.path", to: "source"}
- drop_fields:fields: [beat.hostname, beat.name, beat.version]

3.Kafka

作为大数据里首选消息中间件,没啥可以介绍的了。

4.Flink

Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算,具体使用场景以及使用自行学习吧,我们此处指讲涉及到的相关内容。
flink作为kafka消费方,我们必须要知道经过filebeat将业务日志发送到kafka时数据格式。

业务系统打印的日志格式:
{"appName":"monitor","data":1.0,"env":"dev","errcode":"G000","errmsg":"成功","hostName":"192.168.0.4","instCode":"payAli","isSuccess":"true","metric":"pay.request.count","timestamp":1664035382807,"transCode":"deduct","units":"次"}
经过filebeat收集发送后的日志格式(此格式根据filebeat.yml配置可以变更):
{"@timestamp":"2022-09-24T16:03:03.550Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.4.2"},"log_type":"monitor","input":{"type":"log"},"ecs":{"version":"8.0.0"},"host":{"name":"ls79UBdNss"},"agent":{"type":"filebeat","version":"8.4.2","ephemeral_id":"868b12d1-463f-455b-b5e6-0f77ef3492a9","id":"38d72ecc-81ac-4e95-8a14-2e683773617c","name":"ls79UBdNss"},"source":"/home/finance/Logs/monitor/business/all.8081-30dt.log","message":"{\"appName\":\"monitor\",\"data\":1.0,\"env\":\"dev\",\"errcode\":\"G000\",\"errmsg\":\"成功\",\"hostName\":\"192.168.0.4\",\"instCode\":\"payAli\",\"isSuccess\":\"true\",\"metric\":\"pay.request.count\",\"timestamp\":1664035382807,\"transCode\":\"deduct\",\"units\":\"次\"}","log":{"offset":233,"file":{}},"ip":"ipv4address","log_topic":"monitor"}

当flink获取到如上格式的日志后进行清洗聚合,转换成我们需要的格式,并根据设定窗口时间输出,构建指定索引,将数据灌入到ES中,
核心代码如下:

SingleOutputStreamOperator<Map<String, Object>> streamOperator = stream.flatMap(new FlatMapFunction<KafkaRecord, Map<String, Object>>() {@Overridepublic void flatMap(KafkaRecord record, Collector<Map<String, Object>> out) throws Exception {if (!StringUtils.isEmpty(record.getValue())) {Map<String, Object> datamap = BusinessMetricParse.praseMsg(record.getValue());if (datamap != null) {out.collect(datamap);}} else {logger.info("收到的kafka消息为空");}}});SingleOutputStreamOperator<Tuple3<String, Map<String, Object>, Double>> keyStreamOperator = streamOperator.flatMap(new FlatMapFunction<Map<String, Object>, Tuple3<String, Map<String, Object>, Double>>() {@Overridepublic void flatMap(Map<String, Object> value, Collector<Tuple3<String, Map<String, Object>, Double>> out) throws Exception {Map<String, Object> data = new TreeMap(value);String k = (String)data.get("monitor_key");data.remove("monitor_key");data.remove("hostname");Tuple3<String, Map<String, Object>, Double> td = new Tuple3(k, data, (Double)data.get("data"));out.collect(td);}});SingleOutputStreamOperator<Tuple3<String, Map<String, Object>, Double>> data = keyStreamOperator.assignTimestampsAndWatermarks(new BusiTimeStamp()).keyBy(new int[]{0}).window(TumblingEventTimeWindows.of(Time.milliseconds(60000L))).sum(2);this.sendToEs(data, params, "busimetrics");env.execute("业务指标聚合->sink es");

5.Elasticsearch

存储到ES中的数据格式如下:

6.Kibana

配置索引相关信息


配置折线图:


以配置各渠道下各交易类型交易成功量折线图为例,其他什么成功率也是可以以此配置出来,想要什么自己配置吧,效果图如下:

三、业务方接入使用

监控平台一般是有专门团队进行开发维护,最终需要提供给业务方使用的,所以需要简单方式接入,降低业务方工作量,业务方只要进行埋点,不需要关心filebeat及之后的处理流程。

1.引入pom坐标

        <dependency><groupId>com.xxx</groupId><artifactId>xxx</artifactId><version>1.0</version></dependency>

2.埋点

 @RequestMapping("/logTest")public String  pay() {logger.info("请求下单");OrderResponse response = payService.pay();logger.info("请求下单结果:{}", JSON.toJSONString(response));RequestMonitorUtil.epayLogger(response.isSuccess,response.getTransCode(),response.getErrcode(),response.getErrmsg());return "Success";}/**
*工具类
*/
public class RequestMonitorUtil {public static void epayLogger(Boolean isSuccess,String transCode,String errcode,String errmsg){try{EpayLogger.setGlobalLoggerInterval(10);EpayLogger.getBuilder().units("次").metric("pay.request.count").addTag("isSuccess",isSuccess.toString()).addTag("transCode",transCode).addTag("errcode",errcode).addTag("errmsg",errmsg).build().increment(1);}catch (Exception e){e.printStackTrace();}}
}

3.添加日志配置

    <!--业务监控日志--><appender name="business-log" class="ch.qos.logback.core.rolling.RollingFileAppender"><file>${LOG_PATH}/business/all.${tomcat.port:-0000}-30dt.log</file><rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"><fileNamePattern>${LOG_PATH}/business/all.${tomcat.port:-0000}-30dt.log.%d{yyyy-MM-dd}.%i.zip</fileNamePattern><maxFileSize>1MB</maxFileSize><maxHistory>15</maxHistory> <!-- 记录近15天的日志 --></rollingPolicy><encoder><pattern>%msg%n</pattern></encoder></appender><!-- 添加异步日志包裹 --><appender name="async-business-log" class="com.kkk.business.log.util.log.LogbackAsyncAppender"><includeCallerData>true</includeCallerData><appender-ref ref="business-log"/></appender><logger name="com.kkk.business.log.util.log.EpayLogger" additivity="false" level="INFO"><appender-ref ref="async-business-log"/></logger>

只要三步,就完成了业务方的使用,是不是很简单就完成了业务监控。


总结

好久没有写博客了…

本文标签: 监控系统