Building a Log Collection and Analysis System from Scratch

System Model

A while back I spent some time with ELK, reworking how the logs of our existing systems are collected and analyzed. The resulting system looks roughly like this:

ELK-Stack

The logs come from two kinds of sources. The first is new systems: a golang project, for example, can use logrus with a kafka hook to push its logs straight into a Kafka message queue.
The second is legacy systems whose logs live in log files on the machines. Without touching the original implementation, the only option is a third-party tool that asynchronously collects the log data and forwards it to Kafka. We originally used rsyslog for this and later switched to Filebeat, because Filebeat records the read offset of each log source, so even after a restart it resumes from where it left off.

Kafka sits in the middle as the message queue: it persists the data and also acts as a buffer.

Behind it is the ELK stack: Logstash subscribes to and consumes the data, the ES cluster handles indexing and storage, and finally Kibana aggregates and visualizes the data.

Setup Process

logrus kafka hook

package main

import (
    "crypto/tls"
    "log"
    "math/rand"
    "os"
    "strconv"
    "strings"
    "time"

    "github.com/Shopify/sarama"
    "github.com/sirupsen/logrus"
)

type KafkaHook struct {
    Topic       string
    AddHostname bool
    Hostname    string
    levels      []logrus.Level
    Formatter   logrus.Formatter
    Producer    sarama.AsyncProducer
}

func NewKafkaHook(levels []logrus.Level, formatter logrus.Formatter, topic string, brokers string, addHostname bool, tls *tls.Config) (*KafkaHook, error) {
    var (
        err error
        producer sarama.AsyncProducer
    )

    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForLocal
    // config.Producer.Compression = sarama.CompressionSnappy
    config.Producer.Flush.Frequency = 500 * time.Millisecond
    config.Producer.Timeout = 5 * time.Second

    if tls != nil {
        config.Net.TLS.Enable = true
        config.Net.TLS.Config = tls
    }

    producer, err = sarama.NewAsyncProducer(strings.Split(brokers, ","), config)
    if err != nil {
        return nil, err
    }

    // Drain producer errors in the background so failed deliveries are at least logged.
    go func() {
        for err := range producer.Errors() {
            log.Printf("Failed to send log entry to Kafka: %v\n", err)
        }
    }()

    hostname, err := os.Hostname()
    if err != nil {
        hostname = "localhost"
    }

    hook := &KafkaHook{
        Topic:       topic,
        AddHostname: addHostname,
        Hostname:    hostname,
        levels:      levels,
        Formatter:   formatter,
        Producer:    producer,
    }

    return hook, nil
}

func (kh *KafkaHook) Fire(entry *logrus.Entry) error {
    if kh.AddHostname {
        if _, ok := entry.Data["host"]; !ok {
            entry.Data["host"] = kh.Hostname
        }
    }

    bt, err := kh.Formatter.Format(entry)
    if err != nil {
        return err
    }

    value := sarama.ByteEncoder(bt)

    kh.Producer.Input() <- &sarama.ProducerMessage{
        Topic: kh.Topic,
        Value: value,
    }

    return nil
}

func (kh *KafkaHook) Levels() []logrus.Level {
    return kh.levels
}

func main() {
   hook, err := NewKafkaHook(
      []logrus.Level{
         logrus.DebugLevel,
         logrus.InfoLevel,
         logrus.ErrorLevel,
         logrus.WarnLevel,
      },
      &logrus.JSONFormatter{},
      "log-dev",
      "10.9.X.A:9092,10.9.X.B:9092,10.9.X.C:9092",
      true,
      nil,
   )
   if err != nil {
      log.Fatalln(err)
   }

   logger := logrus.New()
   logger.SetLevel(logrus.DebugLevel)
   logger.Hooks.Add(hook)

   n := 0
   for {
      n++
      r := rand.Intn(1<<10)
      time.Sleep(time.Millisecond * time.Duration(r))
      switch {
      case r < 512:
         logger.Info("this is an info message " + strconv.Itoa(n))
      case r < 869:
         logger.Warn("this is a warn message " + strconv.Itoa(n))
      case r < 985:
         logger.Debug("this is a debug message " + strconv.Itoa(n))
      default:
         logger.Error("this is an error message " + strconv.Itoa(n))
      }
   }
}
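
One thing the hook above never does is close the underlying producer, so messages still sitting in sarama's buffer can be lost when the process exits. A small Close method like the following (not part of the original hook, just a suggested addition) can be deferred in main so queued entries are flushed on shutdown:

// Close flushes buffered messages and shuts down the underlying async producer.
// Call (or defer) it on application shutdown so queued log entries are not dropped.
func (kh *KafkaHook) Close() error {
    return kh.Producer.Close()
}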

Filebeat

Install and configure Filebeat on every machine whose logs need to be collected.

Installation

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-installation.html

Using Ubuntu as an example:

$ curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.3.2-amd64.deb
$ sudo dpkg -i filebeat-6.3.2-amd64.deb

It's best to stick with version 6.3; when I tried upgrading to 6.4 the existing configuration broke.

Start and Stop

$ sudo service filebeat start # sudo /etc/init.d/filebeat start
$ sudo service filebeat restart # sudo /etc/init.d/filebeat restart
$ sudo service filebeat stop # sudo /etc/init.d/filebeat stop

Directory layout: https://www.elastic.co/guide/en/beats/filebeat/current/directory-layout.html

Default paths:

  • home: /usr/share/filebeat
  • bin: /usr/share/filebeat/bin
  • config: /etc/filebeat
  • data: /var/lib/filebeat
  • logs: /var/log/filebeat

Configuration

Edit the filebeat.yml file to add inputs, outputs, and other settings, for example:

$ sudo vi /etc/filebeat/filebeat.yml

## Log input options: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-log.html
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/req/req-worker-*.log # wildcards work in both file and directory names, but path/*/*.log does not match files directly under path
    fields: # extra fields attached to each event
      log_topic: "log-test"
      service: "backend-req"
    fields_under_root: true # put the custom fields at the root of the event instead of nesting them under a "fields" key
    ignore_older: 24h # ignore files last modified more than 24 hours ago
    scan_frequency: 11s # use a different interval per input so the scan peaks are staggered
    max_backoff: 11s
    backoff: 11s
    harvester_buffer_size: 51200 # harvester read buffer size
    close_timeout: 1h # these files roll over once an hour, so simply close the harvester after 1 hour
    clean_inactive: 25h # must be greater than ignore_older + scan_frequency; cleaning keeps the registry file and its entry count small
    harvester_limit: 10 # cap the number of concurrent harvesters; unlimited by default, and a large backlog of files can eat a lot of CPU at startup
  - type: log
    enabled: true
    paths:
      - /var/log/push/push-worker-*.log
    fields:
      log_topic: "log-test"
      service: "backend-push"
    fields_under_root: true
    ignore_older: 24h
    scan_frequency: 19s
    max_backoff: 19s
    backoff: 19s
    harvester_buffer_size: 51200
    close_timeout: 1h
    clean_inactive: 25h
    harvester_limit: 10
  - type: log
    enabled: true
    paths:
      - /var/log/send/send-worker-*.log
    fields:
      log_topic: "log-test"
      service: "backend-send"
    fields_under_root: true
    ignore_older: 24h
    scan_frequency: 13s
    max_backoff: 13s
    backoff: 13s
    harvester_buffer_size: 51200
    close_timeout: 1h
    clean_inactive: 25h
    harvester_limit: 10
  - type: log
    enabled: true
    paths:
      - /var/log/sync/sync-worker-*.log
    fields:
      log_topic: "log-test"
      service: "backend-sync"
    fields_under_root: true
    ignore_older: 24h
    scan_frequency: 17s
    max_backoff: 17s
    backoff: 17s
    harvester_buffer_size: 51200
    close_timeout: 1h
    clean_inactive: 25h
    harvester_limit: 10
  - type: log
    enabled: true
    paths:
      - /var/log/delete/delete-worker-*.log
    fields:
      log_topic: "log-test"
      service: "backend-delete"
    fields_under_root: true
    ignore_older: 24h
    scan_frequency: 71s
    max_backoff: 71s
    backoff: 71s
    harvester_buffer_size: 51200
    close_timeout: 1h
    clean_inactive: 25h
    harvester_limit: 10
  - type: log
    enabled: true
    paths:
        - /var/log/php/error.log
    fields:
        log_topic: "log-im"
        service: "backend-php-error"
    fields_under_root: true
    ignore_older: 24h
    scan_frequency: 23s
    max_backoff: 23s
    backoff: 23s
    clean_inactive: 25h
    close_renamed: true # this log file is periodically renamed and compressed, so close_renamed closes the harvester when that happens
    multiline.pattern: '^\[[0-9]{2}-[A-Z]{1}[a-z]{2}-[0-9]{4}' # merge multi-line logs: lines not starting like [06-Sep-2018 ... are appended to the preceding entry
    multiline.negate: true
    multiline.match: after
    harvester_limit: 10

## Kafka output options: https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html
output.kafka:
  enabled: true
  hosts: ["10.10.X.A:9092", "10.10.X.B:9092", "10.10.X.C:9092"]
  topic: '%{[log_topic]}'
  codec.format:
    string: '%{[beat][hostname]} %{[service]} %{[message]}' # on the logstash side this can be parsed with the grok filter plugin: `match => { "message" => "^%{DATA:hostname} %{DATA:service} (?<message>.*)"}`
    # setting it to '%{[message]}' forwards the message unchanged
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: none # the default is gzip; set it to none if you want to save the overhead
  bulk_max_size: 100
  max_message_bytes: 1000000 # must not exceed message.max.bytes on the Kafka broker, otherwise oversized events are dropped

output.file:
  enabled: false
  path: /home/ubuntu/test-log
  filename: output.log
  permissions: 0644
  codec.format:
    string: '%{[message]}'

Filebeat Configuration Details

Suppose Filebeat has to collect unstructured logs in formats like these:

Mon Sep 10 2018 12:06:16 GMT+0800 (CST) - info: type: SyncSessionRangeService  uid:9182093812301298392103{}

18-09-07 08:19:43 - info: GRPC REQUEST & RESPONSE{
        "uid": 1290381298494,
        "type": "SyncSessionRangeService",
        "data": {
            "session_id": 123456,
            "session_type": 2,
            "start_id": 0,
            "stop_id": 3
        },
        "localId": 349058034598,
        "result": {
            "type": 5,
            "is_sync": true,
            "is_ack": true,
            "require_ack": false,
            "error": 0,
            "continue": false,
            "sync_local_id": 0,
            "messages": []
        }
    }

18-09-06 22:07:33 - info: type: SyncSessionRangeService  uid:20373933{}

18-09-07 08:19:43 - info: GRPC REQUEST & RESPONSE{
            "uid": 5792715,
            "type": "SyncSessionRangeService",
            "data": {
                "session_id": 13416794,
                "session_type": 2,
                "start_id": 0,
                "stop_id": 3
            },
            "localId": 1536279501,
            "result": {
                "type": 5,
                "is_sync": true,
                "is_ack": true,
                "require_ack": false,
                "error": 0,
                "continue": false,
                "sync_local_id": 0,
                "messages": []
            }
        }

[08/Sep/2018:01:49:55 +0800] check photo https://sgchatfiles.bldimg.com/2018/9/8/1/49/16263641_1536342594629.jpg

[08/Sep/2018:01:49:55 +0800] check "result": {
    "type": 5,
    "is_sync": true,
    "is_ack": true,
    "require_ack": false,
    "error": 0,
    "continue": false,
    "sync_local_id": 0,
    "messages": []
}

Multi-line logs are handled with multiline.pattern: https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html

For example:

  • Mon Sep 10 2018 12:06:16 GMT+0800 (CST) -> '[T|M|W|S|F]{1}[a-z]{2} [J|F|M|A|S|O|N|D]{1}[a-z]{2} [0-9]{2} [0-9]{4} [0-9]{2}:[0-9]{2}:[0-9]{2} [GMT+]{4}[0-9]{4} [(CST)]{5}'
  • [06-Sep-2018 … -> '^\[[0-9]{2}-[A-Z]{1}[a-z]{2}-[0-9]{4}'
  • 18-09-07 08:19:43 -> '^[0-9]{2}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}'
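
As a concrete illustration, a multiline block for the last log style above (the 18-09-07 08:19:43 prefix) might look roughly like this; it is only a sketch, and the path is a placeholder rather than part of the original configuration:

  - type: log
    paths:
      - /var/log/grpc/grpc-*.log # placeholder path, adjust to the actual log location
    multiline.pattern: '^[0-9]{2}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}'
    multiline.negate: true # lines NOT starting with the timestamp ...
    multiline.match: after # ... are appended to the preceding line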

Migrating the registry file: https://www.elastic.co/guide/en/beats/filebeat/current/migration-registry-file.html

FAQ (in particular the settings related to the "Too many open file handlers" problem): https://www.elastic.co/guide/en/beats/filebeat/current/faq.html

Kafka

For Kafka I used an existing cluster and haven't set one up myself; the setup process is covered roughly in http://dearcharles.cn/2017/11/23/%E5%9F%BA%E4%BA%8EDocker%E6%90%AD%E5%BB%BA%E5%88%86%E5%B8%83%E5%BC%8F%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97Kafka/

I won't write it up in detail here, to avoid getting anything wrong.
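
Whichever way the cluster is built, the topics consumed below do need to exist (unless the brokers auto-create topics). A sketch of creating them, assuming the partition counts implied by the consumer_threads settings in the logstash config further down; the ZooKeeper addresses and replication factor are placeholders, and newer Kafka releases use --bootstrap-server instead of --zookeeper:

$ kafka-topics.sh --zookeeper 10.10.X.A:2181 --create --topic log-im --partitions 6 --replication-factor 2
$ kafka-topics.sh --zookeeper 10.9.X.A:2181 --create --topic log-dev --partitions 8 --replication-factor 2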

ELK

The ELK part is built mainly with docker-elk as a template, using version 6.3.

That project, however, is configured for a single node; quite a few things have to change for a multi-host production environment. Here is my modified docker-elk.

Directory structure

$ tree .
.
├── LICENSE
├── README.md
├── docker-compose.yml
├── elasticsearch
│   ├── Dockerfile
│   └── config
│       └── elasticsearch.yml
├── elasticsearch-reference.md
├── extensions
│   ├── README.md
│   ├── curator
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   ├── config
│   │   │   ├── curator.yml
│   │   │   └── delete_log_files_curator.yml
│   │   ├── curator-compose.yml
│   │   └── entrypoint.sh
│   └── logspout
│       ├── Dockerfile
│       ├── README.md
│       ├── build.sh
│       ├── logspout-compose.yml
│       └── modules.go
├── kibana
│   ├── Dockerfile
│   └── config
│       └── kibana.yml
├── logstash
│   ├── Dockerfile
│   ├── config
│   │   └── logstash.yml
│   └── pipeline
│       └── logstash.conf
└── logstash-reference.md


docker-compose.yml

version: '2.2'

services:
  elasticsearch:
    build:
      context: elasticsearch/
    container_name: elasticsearch
    volumes:
      - /data/elasticsearch/data:/usr/share/elasticsearch/data ## ES data directory
      - ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml:ro
    ports:
      - "9200:9200"
      - "9300:9300"
    cpus: 1.5 # limit CPU usage; on a two-core machine 1.5 means at most 1.5/2 = 75% of the CPU; requires docker-compose 1.16+
    mem_limit: 1g # cap the container's memory usage
    ulimits: # required for bootstrap.memory_lock: true to take effect
      memlock:
        soft: -1
        hard: -1
    environment:
      ES_JAVA_OPTS: "-Xmx512m -Xms512m" # Java heap size
      PUBLISH_HOST: $ELK_HOST # host IP
      CLUSTER_NAME: $ELK_CLUSTERNAME
      NODE_NAME: $ELK_NODENAME
    restart: always
    network_mode: "host"

  logstash:
    build:
      context: logstash/
    container_name: logstash
    volumes:
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
      - ./logstash/pipeline:/usr/share/logstash/pipeline:ro
    ports:
      - "5000:5000"
    cpus: 1.5
    mem_limit: 1g
    environment:
      LS_JAVA_OPTS: "-Xmx512m -Xms512m"
    restart: always
    network_mode: "host"
    depends_on:
      - elasticsearch

  kibana:
    build:
      context: kibana/
    container_name: kibana
    volumes:
      - ./kibana/config/:/usr/share/kibana/config:ro
    ports:
      - "5601:5601"
    restart: always
    network_mode: "host"
    depends_on:
      - elasticsearch

  cerebro:
    image: yannart/cerebro
    container_name: cerebro
    restart: always
    ports:
      - "9000:9000"
    network_mode: "host"
    depends_on:
      - elasticsearch

  curator:
    build:
      context: extensions/curator/
    container_name: curator
    environment:
      ELASTICSEARCH_HOST: $ELK_HOST
      CRON: "30 0 * * *" # at 00:30 every day, delete indices older than ${UNIT_COUNT} days
      CONFIG_FILE: /usr/share/curator/config/curator.yml
      COMMAND: /usr/share/curator/config/delete_log_files_curator.yml
      UNIT_COUNT: 10
    network_mode: "host"
    restart: always
    depends_on:
      - elasticsearch
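
The deletion itself is driven by the curator action file referenced in COMMAND above. The copy in the repo isn't reproduced here, but a minimal action file for this setup might look roughly like the following, assuming the dated log-* index naming used in the logstash output below; the prefix and unit_count are assumptions to adjust:

actions:
  1:
    action: delete_indices
    description: "Delete log indices older than 10 days, matched by the date in the index name."
    options:
      ignore_empty_list: True
      continue_if_exception: False
    filters:
      - filtertype: pattern
        kind: prefix
        value: log-          # assumed index prefix, e.g. log-im-2018.09.08
      - filtertype: age
        source: name
        direction: older
        timestring: '%Y.%m.%d'
        unit: days
        unit_count: 10       # keep in sync with UNIT_COUNT above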

logstash

The logstash.conf configuration is identical on every node; the instances simply act as consumers in the same group and share the messages in the Kafka queue among themselves.

Example configuration:

input {
    kafka {
        bootstrap_servers => "10.10.X.A:9092,10.10.X.B:9092,10.10.X.C:9092"
        consumer_threads => 6 # usually just set this to the number of partitions
        decorate_events => false
        topics => ["log-im"]
        group_id => "group-log-im"
        type => "log-im"
    }
    kafka {
        bootstrap_servers => "10.9.X.A:9092,10.9.X.B:9092,10.9.X.C:9092"
        consumer_threads => 8
        decorate_events => false
        topics => ["log-dev"]
        group_id => "group-log-dev"
        type => "log-dev"
    }
}

## Add your filters / logstash plugins configuration here
filter {
    mutate {
        remove_field => [ "@version" ]
    }
    if [type] == "log-im" {
        # the grok plugin parses unstructured logs into structured events
        # testing tool: http://grokdebug.herokuapp.com
        # recommended reading: https://www.elastic.co/blog/do-you-grok-grok
        grok {
            match => { "message" => "^%{DATA:hostname} %{DATA:service} (?<message>.*)" }
            overwrite => [ "message" ]
        }
        grok {
            match => {
                "message" => [
                    "^\[%{DATA:time}\] (?<message>.*)",
                    "^(?<time>[T|M|W|S|F]{1}[a-z]{2} [J|F|M|A|S|O|N|D]{1}[a-z]{2} [0-9]{2} [0-9]{4} [0-9]{2}:[0-9]{2}:[0-9]{2} [GMT+]{4}[0-9]{4} [(CST)]{5}) - %{LOGLEVEL:level}: (?<message>.*)",
                    "^(?<time>[0-9]{2}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) - %{LOGLEVEL:level}: (?<message>.*)",
                    "^(?<message>.*)"
                ]
            }
            overwrite => [ "message" ]
        }
        json {
            source => "message"
            skip_on_invalid_json => true
            remove_field => [ "message" ]
        }
        date {
            match => [ "time", "ISO8601", "EEE MMM dd yyyy HH:mm:ss 'GMT'Z '(CST)'", "dd/MMM/yyyy:HH:mm:ss Z", "yy-MM-dd HH:mm:ss", "dd-MMM-yyyy HH:mm:ss ZZZ", "yyyy-MM-dd HH:mm:ss ZZ", "yyyy-MM-dd HH:mm ZZ" ]
            remove_field => [ "time" ]
        }
        if [service] == "backend-php-error" {
            mutate {
                add_field => { "level" => "error" }
            }
        }
    }
}

output {
    ## route logs from different topics by type and store them in different indices, which makes aggregation in Kibana easier
    if [type] == "log-im" {
        elasticsearch {
            hosts => ["localhost:9200", "10.42.75.252:9200", "10.42.111.182:9200"]
            index => "%{type}-%{+YYYY.MM.dd}" # 最好使用日期,一天一个索引,这样方便删除旧索引
        }
    }
    if [type] == "log-dev-2" {
        elasticsearch {
            hosts => ["localhost:9200", "10.42.75.252:9200", "10.42.111.182:9200"]
            index => "log-dev-2"
        }
    }
}

logstash.yml:

## Default Logstash configuration from logstash-docker.
## from https://github.com/elastic/logstash-docker/blob/master/build/logstash/config/logstash-oss.yml
http.host: "0.0.0.0"
path.config: /usr/share/logstash/pipeline

pipeline.workers: 2 # set to the number of CPU cores or a multiple of it
pipeline.output.workers: 2
pipeline.batch.size: 1000
pipeline.batch.delay: 10

Elasticsearch

Setting up the ES cluster is a bit more involved. ES nodes can take on several roles, most commonly master, data, and ingest; here I use three nodes as master plus ingest nodes and another three as data nodes.
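
For reference, that role split maps onto node settings roughly like this (just a sketch of the relevant lines; the full elasticsearch.yml actually used follows below):

## master + ingest nodes
node.master: true
node.data: false
node.ingest: true

## data-only nodes
node.master: false
node.data: true
node.ingest: false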

For what the different node roles do and for the split-brain options, see https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html

Important configuration changes: https://www.elastic.co/guide/cn/elasticsearch/guide/current/important-configuration-changes.html

Reference configuration: https://github.com/wangriyu/docker-elk/blob/master/elasticsearch/config/reference.md

elasticsearch.yml:

## Default Elasticsearch configuration from elasticsearch-docker.
## from https://github.com/elastic/elasticsearch-docker/blob/master/build/elasticsearch/elasticsearch.yml
cluster.name: ${CLUSTER_NAME}

node.name: ${NODE_NAME}
node.master: true

network.host: 0.0.0.0
network.publish_host: ${PUBLISH_HOST}

## https://www.elastic.co/guide/en/elasticsearch/reference/6.3/setup-configuration-memory.html
## as the docs recommend, disable swap and set this to true; when running in a container the ulimit must be set as well
bootstrap.memory_lock: true

## unicast hosts for cluster discovery; new machines must be added to this list
discovery.zen.ping.unicast.hosts: ["10.42.X.A:9300", "10.42.X.B:9300", "10.42.X.C:9300"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s

## with this set, old fielddata entries are evicted once the cache reaches the limit, leaving room for new data
indices.fielddata.cache.size: 30%

## avoid shards being shuffled around when the whole cluster restarts; tune to the number of nodes
gateway.recover_after_nodes: 2
gateway.expected_nodes: 3
gateway.recover_after_time: 5m

# to prevent split brain, set this to (number of master-eligible nodes) / 2 + 1
# with the 3 master-eligible nodes here: 3 / 2 + 1 = 2
discovery.zen.minimum_master_nodes: 2

Cerebro is a web UI for inspecting the ES cluster and node information:

Cerebro

Cerebro

Kibana

By default Kibana just talks to the ES node on the same host.

kibana.yml:

## Default Kibana configuration from kibana-docker.
## from https://github.com/elastic/kibana-docker/blob/master/build/kibana/config/kibana.yml
server.name: kibana
server.host: "0"
elasticsearch.url: http://localhost:9200
elasticsearch.requestTimeout: 90000

Using Kibana:

The first time you open the page on port 5601, click Management in the sidebar and then Index Patterns to register the log sources.

Kibana-1

New sources can be added here later as well. When the log fields change, click the refresh-fields button in the upper right corner, otherwise unmatched fields may not be available for filtering.

Kibana-2

Once the index patterns are in place, click Discover in the sidebar to view the log stream. If the data doesn't update, set Auto Refresh in the upper right corner to 5s and pick a Time Range next to it.

Kibana-3

The search bar at the top supports filters such as level:error; you can also click Add a filter, and custom query expressions are supported as well.

Click Visualize in the sidebar to build visualizations, for example the line chart of info logs below:

Kibana-4

After configuring a chart, click Save at the top and give it a name.

Click Dashboard in the sidebar to create a dashboard and add the chart saved in the previous step.

Kibana-5

Finally, nginx can act as a load balancer: put the three Kibana instances into an upstream, so that even if one becomes unreachable the others still work. The visualizations and dashboards configured in Kibana are shared, since they are stored in ES.
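
A minimal upstream block for this could look like the following sketch; the server addresses are placeholders for the three Kibana hosts:

upstream kibana {
    server 10.42.X.A:5601;
    server 10.42.X.B:5601;
    server 10.42.X.C:5601;
}

server {
    listen 80;
    location / {
        proxy_pass http://kibana;
        proxy_set_header Host $host;
    }
}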

References
