Preface

Logstash is an open source, server-side data processing pipeline that ingests data from a multitude of sources simultaneously, transforms it, and then sends it to your favorite "stash."

That is the official description of Logstash. Put plainly, Logstash is a log collector: it gathers log files such as server login logs, website access logs, and so on. Because these logs vary in content and their raw formats are hard to read, we can use Logstash to collect them and apply uniform filtering, turning them into highly readable output. That makes it easier for developers and operators to inspect the logs, analyze how a system or project is performing, and lay the groundwork for monitoring and alerting.

Collect, transform, and stash your data

From the official description we can identify Logstash's three main functions (my own reading): centralized log collection, content transformation/filtering, and data storage. In a Logstash configuration file, these three functions are defined and configured with the input, filter, and output sections.

Input - ingest data of all shapes, sizes, and sources

Data is often scattered or siloed across many systems in many formats. Logstash supports a variety of inputs and can pull in events from a multitude of common sources at the same time. It can easily ingest data from your logs, metrics, web applications, data stores, and various AWS services, all in a continuous, streaming fashion.

Filter/transform - parse and transform data on the fly

As data travels from source to store, Logstash filters parse each event, identify named fields to build structure, and transform them into a common format for easier, faster analysis and business value.

  • Derive structure from unstructured data with grok
  • Decipher geo coordinates from IP addresses
  • Anonymize PII data and exclude sensitive fields entirely (see the sketch below)
  • Ease overall processing, independent of data source, format, or schema
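
For example, sensitive fields can be dropped or hashed before an event ever leaves the pipeline. A minimal sketch of the PII point above, using the mutate and fingerprint filters (the field names here are hypothetical, not from the demo later in this article):

filter {
    mutate {
        # drop fields that may contain PII (hypothetical field names)
        remove_field => ["password", "id_card"]
    }
    fingerprint {
        # store a SHA-256 hash of the client IP instead of the raw address
        source => "remote_ip"
        target => "remote_ip_hash"
        method => "SHA256"
        key => "some-secret"    # optional; when set, an HMAC is computed
    }
}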


Output - choose your stash, transport your data

While Elasticsearch is our output of choice and opens up a world of search and analytics possibilities, it is not the only option. Logstash offers numerous outputs, so you can route data wherever you want, with the flexibility to unlock a wide range of downstream use cases.


Installation

# Install the Java dependency
yum -y install java-1.8.0-openjdk

# Install Logstash
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.1.rpm
rpm -ivh logstash-6.6.1.rpm

or

rpm -Uvh https://artifacts.elastic.co/downloads/logstash/logstash-6.6.1.rpm

or  use yum (after adding the Elastic repository):
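
A sketch of the yum route, following Elastic's documented 6.x repository layout (you create /etc/yum.repos.d/logstash.repo yourself):

# import Elastic's signing key
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

# /etc/yum.repos.d/logstash.repo
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

# then install
yum -y install logstash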

Configuration and usage

Once installation is complete, use -e to run a quick test. A Logstash pipeline has two required elements, input and output, plus an optional element, filter. Input plugins consume data from a source, filter plugins modify the data as you specify, and output plugins write the data to a destination.

Below is a simple example that reads from standard input and writes to standard output.

[root@openresty conf.d]# /usr/share/logstash/bin/logstash -e 'input { stdin {} } output { stdout {} }'
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[WARN ] 2019-03-13 11:38:45.011 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[INFO ] 2019-03-13 11:38:45.039 [LogStash::Runner] runner - Starting Logstash {"logstash.version"=>"6.6.1"}
[INFO ] 2019-03-13 11:38:53.968 [Converge PipelineAction::Create<main>] pipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[INFO ] 2019-03-13 11:38:54.238 [Converge PipelineAction::Create<main>] pipeline - Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x2cda5673 run>"}
The stdin plugin is now waiting for input:
[INFO ] 2019-03-13 11:38:54.358 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[INFO ] 2019-03-13 11:38:54.760 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
hello www.lianst.com
{
      "host" => "openresty",
   "message" => "hello www.lianst.com",
  "@version" => "1",
"@timestamp" => 2019-03-13T03:39:20.147Z
}

In production, Logstash pipelines are far more complex and may need multiple input, filter, and output plugins configured.

input {
    # where the logs come from
}

filter {
    # filters / transformations
}

output {
    # where the processed logs go
}


Common plugins and configuration examples

Input plugins

Basic examples

Logstash inputs generally come in three flavors:

  • Standard input: stdin
  • File input: file (reads from one or more paths)
  • Other inputs: beats, redis, and so on

input {    # multiple inputs shown together purely for illustration; enable the ones you need
    stdin {}    # standard input
    file {      # file input
        path => ["/data/wwwlogs/*.log"]
        start_position => "beginning"
    }
    beats {     # beats input (e.g. from Filebeat)
        port => 5044
    }
}

filter {
    # filters
}

output {
    # where the logs are stored
}

Production configuration

In production, Filebeat is typically used to ship log lines to Logstash. The Filebeat client is a lightweight, resource-friendly tool that collects logs from files on a server and forwards them to your Logstash instance for processing. Filebeat is designed for reliability and low latency. It has a light resource footprint on the host, and the Beats input plugin minimizes the resource demands on the Logstash instance.

Note: in a typical use case, Filebeat and the Logstash instance run on separate machines. In this article they run on the same machine.

1. Install Filebeat

wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.6.1-x86_64.rpm
rpm -ivh filebeat-6.6.1-x86_64.rpm

2. Configure filebeat.yml

[root@openresty ~]# cat /etc/filebeat/filebeat.yml  | grep "^\s*[^# \t].*$"
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /data/wwwlogs/*.log
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.kibana:
output.logstash:
  hosts: ["10.10.0.2:5044"]
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
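
After editing the configuration, verify it and start the service. A short sketch, assuming the systemd unit installed by the rpm (filebeat test config and filebeat test output are standard Filebeat 6.x subcommands):

# check the config file and the connection to the Logstash output
filebeat test config
filebeat test output

# start Filebeat now and on boot
systemctl start filebeat
systemctl enable filebeat

On the Logstash side, a beats input listening on port 5044, as in the earlier input example, receives these events.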

Filter plugins

Logstash has many filter plugins, such as grok, geoip, and so on. The demonstration below uses the grok plugin.

Create the configuration file file-filter-output.conf:

  
input {
    file {
        path => ["/data/wwwlogs/*.log"]
        start_position => "beginning"
    }
}

filter {
    grok {
        match => { "message" => "%{NGINXACCESS}" }
        remove_field => "message"
    }
    date {
        match => ["timestamp", "dd/MMM/YYYY:H:m:s Z"]
        timezone => "Asia/Shanghai"
        remove_field => "timestamp"
    }
    geoip {
        source => "remote_ip"
        target => "geoip"
        database => "/etc/logstash/maxmind/GeoLite2-City.mmdb"
        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
        rename => {
            "agent" => "user_agent"
        }
    }
}

output {
    stdout { codec => rubydebug }
}
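
Note that %{NGINXACCESS} is not one of grok's built-in patterns; it has to be supplied as a custom pattern file and the grok filter pointed at it with patterns_dir. A plausible definition, assuming nginx's default combined log format plus an X-Forwarded-For field (the path and the exact pattern are assumptions, reverse-engineered from the field names in the output below):

# /etc/logstash/patterns/nginx  (hypothetical path; reference the directory via patterns_dir in grok)
NGINXACCESS %{IPORHOST:remote_ip} - %{USERNAME:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:status} %{NUMBER:bytes} %{QS:referer} %{QS:agent} %{QS:http_x_forward_for}

The geoip filter likewise assumes the free MaxMind GeoLite2-City database has already been downloaded to the path given in database.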

Run /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/file-filter-output.conf -t to check the configuration for syntax errors; if it passes, start the pipeline with /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/file-filter-output.conf:

             "remote_ip" => "10.10.0.2",
                  "tags" => [
        [0] "_geoip_lookup_failure"
    ],
                  "host" => "openresty",
           "httpversion" => "1.1",
                 "bytes" => "8",
            "user_agent" => "\"curl/7.29.0\"",
               "request" => "/test12.html",
                  "path" => "/data/wwwlogs/host.access.log",
              "@version" => "1",
    "http_x_forward_for" => "\"185.105.159.83\""
}
{
                 "geoip" => {},
                "status" => "200",
                "method" => "GET",
            "@timestamp" => 2019-03-13T02:30:40.000Z,
               "referer" => "\"-\"",
             "remote_ip" => "10.10.0.2",
                  "tags" => [
        [0] "_geoip_lookup_failure"
    ],
                  "host" => "openresty",
           "httpversion" => "1.1",
                 "bytes" => "7",
            "user_agent" => "\"curl/7.29.0\"",
               "request" => "/test4.html",
                  "path" => "/data/wwwlogs/host.access.log",
              "@version" => "1",
    "http_x_forward_for" => "\"91.208.233.46\""
}
{
                 "geoip" => {},
                "status" => "200",
                "method" => "GET",
            "@timestamp" => 2019-03-13T02:30:41.000Z,
               "referer" => "\"-\"",
             "remote_ip" => "10.10.0.2",
                  "tags" => [
        [0] "_geoip_lookup_failure"
    ],
                  "host" => "openresty",
           "httpversion" => "1.1",
                 "bytes" => "8",
            "user_agent" => "\"curl/7.29.0\"",
               "request" => "/test13.html",
                  "path" => "/data/wwwlogs/host.access.log",
              "@version" => "1",
    "http_x_forward_for" => "\"115.85.242.137\""
}
{
                 "geoip" => {},
                "status" => "200",
                "method" => "GET",
            "@timestamp" => 2019-03-13T02:30:42.000Z,
               "referer" => "\"-\"",
             "remote_ip" => "10.10.0.2",
                  "tags" => [
        [0] "_geoip_lookup_failure"
    ],
                  "host" => "openresty",
           "httpversion" => "1.1",
                 "bytes" => "8",
            "user_agent" => "\"curl/7.29.0\"",
               "request" => "/test15.html",
                  "path" => "/data/wwwlogs/host.access.log",
              "@version" => "1",
    "http_x_forward_for" => "\"65.118.70.78\""
}
{
                 "geoip" => {},
                "status" => "200",
                "method" => "GET",
            "@timestamp" => 2019-03-13T02:30:43.000Z,
               "referer" => "\"-\"",
             "remote_ip" => "10.10.0.2",
                  "tags" => [
        [0] "_geoip_lookup_failure"

Output plugins

In a real production environment, common output destinations include Kafka, Redis, Elasticsearch, and Hadoop; here we use Elasticsearch.
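
Switching stores is just a matter of swapping the output block. As a point of comparison, a minimal Kafka output might look like this (the broker address and topic are hypothetical):

output {
    kafka {
        bootstrap_servers => "10.10.0.5:9092"    # hypothetical Kafka broker
        topic_id => "nginx_logs"
        codec => json
    }
}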

Create the file-filter-ela.conf configuration file:

input {
    file {
        path => ["/data/wwwlogs/*.log"]
        start_position => "beginning"
    }
}

filter {
    grok {
        match => { "message" => "%{NGINXACCESS}" }
        remove_field => "message"
    }
    date {
        match => ["timestamp", "dd/MMM/YYYY:H:m:s Z"]
        timezone => "Asia/Shanghai"
        remove_field => "timestamp"
    }
    geoip {
        source => "remote_ip"
        target => "geoip"
        database => "/etc/logstash/maxmind/GeoLite2-City.mmdb"
        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
        rename => {
            "agent" => "user_agent"
        }
    }
}

output {
    elasticsearch {
        hosts => ["10.10.0.3:9200", "10.10.0.4:9200"]
        index => "logstash-%{+YYYY.MM.dd}"
        document_type => "nginx_logs"    # deprecated as of 6.x, but still accepted
    }
}

As before, run /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/file-filter-ela.conf -t to check the syntax; once it passes, run /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/file-filter-ela.conf.

Query Elasticsearch and the data shows up in a search.

[root@openresty ~]# curl http://node1:9200/logstash-*/_search?q=153.35.215.165 | jq .
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  8773  100  8773    0     0  84887      0 --:--:-- --:--:-- --:--:-- 85174
{
  "took": 98,
  "timed_out": false,
  "_shards": {
    "total": 10,
    "successful": 10,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 31,
    "max_score": 2.9549103,
    "hits": [
      {
        "_index": "logstash-2019.03.12",
        "_type": "nginx_logs",
        "_id": "WCirdGkBu_5TaHayAdky",
        "_score": 2.9549103,
        "_source": {
          "http_x_forward_for": "\"-\"",
          "host": "openresty",
          "user_agent": "\"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36\"",
          "referer": "\"-\"",
          "@timestamp": "2019-03-12T09:43:39.000Z",
          "geoip": {
            "continent_code": "AS",
            "location": {
              "lon": 120.2625,
              "lat": 32.0142
            },
            "latitude": 32.0142,
            "coordinates": [
              "120.2625",
              "32.0142"
            ],
            "country_code3": "CN",
            "country_name": "China",
            "ip": "153.35.215.165",
            "timezone": "Asia/Shanghai",
            "city_name": "Jingjiang",
            "longitude": 120.2625,
            "region_name": "Jiangsu",
            "country_code2": "CN",
            "region_code": "JS"
          },
          "httpversion": "1.1",
          "path": "/data/wwwlogs/host.access.log",
          "method": "GET",
          "status": "404",
          "@version": "1",
          "bytes": "577",
          "request": "/test",
          "remote_ip": "153.35.215.165"
        }
      },