Question

I am very new to Logstash. So far I can just run the Logstash jar file and see the Kibana web page. It's cool!

Now I want to transform the following line (a syslog message) into the structure shown below it.

Feb 19 18:45:29 SD550 Jack: REG,0x1000,4,10,20,30,40
==>
{ 'timestamp': 'Feb 19 18:45:29', 
  'host': 'SD550', 0x1000:10, 0x1001:20, 0x1002:30, 0x1003:40 }

In the log message, '0x1000' is the starting register address, '4' is the number of register values, and the remaining values are the register contents. So the message means 0x1000:10, 0x1001:20, 0x1002:30, 0x1003:40. The important point is that the number of register values can change, so the length of the log message is variable. Whatever the length, I'd like to get the proper result (e.g., 0x2000,2,12,22 ==> 0x2000:12, 0x2001:22).

This is my incomplete Logstash config file. I found some filters such as grok, mutate, and extractnumbers, but I don't know how to combine them to do what I want.

input { 
  file { 
        path => "/var/log/syslog"
        type => "syslog"
  } 
}

filter {
   ???
}

output {
  elasticsearch { }
}

I know I'm asking a lot, sorry. My final goal is to draw a TIME (x) / VALUE (y) chart for a specific register in Kibana. Is that possible? Any advice would be appreciated.

Thank you, Youngmin Kim


Solution 2

Thank you to everybody who answered my question, especially Ben Lim.

With your help, I got this result.

{
      "@version" => "1",
    "@timestamp" => "2014-02-20T11:07:28.125Z",
          "type" => "syslog",
          "host" => "ymkim-SD550",
          "path" => "/var/log/syslog",
            "ts" => "Feb 20 21:07:27",
          "user" => "ymkim",
          "func" => "REG",
          "8192" => 16,
          "8193" => 32,
          "8194" => 17,
          "8195" => 109
}

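This came from the command $ logger REG,2000,4,10,20,11,6d. The address and the values are parsed as hexadecimal, so the address 0x2000 shows up as the field name 8192 and the value 6d as 109.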
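The decimal field names and values above follow from Ruby's String#hex, which the filter further down calls on the address and on each value. As a minimal sketch in plain Ruby (run outside Logstash; the variable names are only illustrative), the same conversion looks like this:

```ruby
# String#hex reads a string as hexadecimal; an optional "0x" prefix is accepted.
puts "0x1000".hex

start_address = "2000".hex        # the address from the logger command
raw_values    = %w[10 20 11 6d]   # the values from the logger command

# Pair each value with its register address (start address + position).
registers = {}
raw_values.each_with_index do |raw, i|
  registers[(start_address + i).to_s] = raw.hex
end

registers.each { |name, value| puts "#{name} => #{value}" }
```

Because the addresses are converted to integers and then back to strings, the field names come out in decimal (8192) rather than in the original hexadecimal form.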

This is my config file.

input { 
  file { 
        path => "/var/log/syslog"
        type => "syslog"
  } 
}

filter {
  grok {
        match => ["message", "%{SYSLOGTIMESTAMP:ts} %{SYSLOGHOST:hostname} %{WORD:user}: %{WORD:func},%{WORD:address},%{NUMBER:regNumber},%{GREEDYDATA:regValue}"]
  }

  if [func] == "REG" {  
      modbus_csv {
          start_address => "address"
          num_register => "regNumber"
          source => "regValue"
          remove_field => ["regValue", "hostname", "message", 
                "address", "regNumber"]
      }
  }

}

output {
    stdout { debug => true }
  elasticsearch { }
}

And this is the modified csv filter, saved as modbus_csv.rb.

# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"

require "csv"

# Modbus CSV filter (a modified copy of the csv filter). Takes an event field
# containing comma-separated register values, parses it, and stores each value
# in a field named after its register address.
class LogStash::Filters::MODBUS_CSV < LogStash::Filters::Base
  config_name "modbus_csv"
  milestone 2

  # The CSV data in the value of the source field will be expanded into a
  # datastructure.
  config :source, :validate => :string, :default => "message"

  # Define a list of column names (in the order they appear in the CSV,
  # as if it were a header line). If this is not specified or there
  # are not enough columns specified, the default column name is "columnX"
  # (where X is the field number, starting from 1).
  config :columns, :validate => :array, :default => []
  config :start_address, :validate => :string, :default => "0"
  config :num_register, :validate => :string, :default => "0"

  # Define the column separator value. If this is not specified the default
  # is a comma ','.
  # Optional.
  config :separator, :validate => :string, :default => ","

  # Define the character used to quote CSV fields. If this is not specified
  # the default is a double quote '"'.
  # Optional.
  config :quote_char, :validate => :string, :default => '"'

  # Define target for placing the data.
  # Defaults to writing to the root of the event.
  config :target, :validate => :string

  public
  def register

    # Nothing to do here

  end # def register

  public
  def filter(event)
    return unless filter?(event)

    @logger.debug("Running modbus_csv filter", :event => event)

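    # Build the column names from the register addresses:
    # start address, start address + 1, ... (one name per register).
    # The exclusive range (...) generates exactly num_register names.
    if event[@start_address] && event[@num_register]
      @logger.debug(event[@num_register].hex)
      for i in 0...(event[@num_register].hex)
        @columns[i] = event[@start_address].hex + i
      end
    end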
    if event[@source]
      if event[@source].is_a?(String)
        event[@source] = [event[@source]]
      end

      if event[@source].length > 1
        @logger.warn("modbus_csv filter only works on fields of length 1",
                     :source => @source, :value => event[@source],
                     :event => event)
        return
      end

      raw = event[@source].first
      begin
        values = CSV.parse_line(raw, :col_sep => @separator, :quote_char => @quote_char)

        if @target.nil?
          # Default is to write to the root of the event.
          dest = event
        else
          dest = event[@target] ||= {}
        end

        values.each_index do |i|
          # nil.to_s is "" (truthy), so test for nil before converting.
          field_name = @columns[i] ? @columns[i].to_s : "column#{i+1}"
          dest[field_name] = values[i].hex
        end

        filter_matched(event)
      rescue => e
        event.tag "_modbus_csvparsefailure"
        @logger.warn("Trouble parsing modbus_csv", :source => @source, :raw => raw,
                      :exception => e)
        return
      end # begin
    end # if event

    @logger.debug("Event after modbus_csv filter", :event => event)

  end # def filter

end # class LogStash::Filters::MODBUS_CSV

Finally, I got the chart I wanted. (Chart legend: func = REG (13), 4096 mean per 10m | (13 hits))

OTHER TIPS

I have one idea. To handle a variable-length log with multiple register address:value pairs, you can use the grok filter to split the message first, then use the csv filter to separate the register values.

Filter:

filter {
    grok {
            match => ["message", "%{MONTH:month} %{NUMBER:day} %{TIME:time} %{WORD:host} %{WORD:user}: %{WORD:unit},%{WORD:address},%{NUMBER:regNumber},%{GREEDYDATA:regValue}"]
            add_field => ["logdate","%{month} %{day} %{time}"]
            remove_field => ["month","day", "time"]
    }

    csv {
            source => "regValue"
            remove_field => ["regValue"]
    }
}

Output:

{
   "message" => "Feb 19 18:45:29 SD550 Jack: REG,0x1000,4,10,20,30,40",
  "@version" => "1",
"@timestamp" => "2014-02-20T02:05:53.608Z",
      "host" => "SD550",
      "user" => "Jack",
      "unit" => "REG",
   "address" => "0x1000",
 "regNumber" => "4",
   "logdate" => "Feb 19 18:45:29",
   "column1" => "10",
   "column2" => "20",
   "column3" => "30",
   "column4" => "40"
}

However, the field names (column1, column2, ...) are assigned by the csv filter. You can't set them with the csv filter's columns option, because the number of fields is variable. To get address-based field names as you require, you need to modify the csv filter.
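As a rough illustration of the renaming such a modified filter would have to do, here is a minimal sketch in plain Ruby. The helper name rename_columns_to_registers is hypothetical and not part of Logstash, and the event is modelled as an ordinary Hash with the field names from the output above:

```ruby
# Hypothetical helper (not a Logstash API): renames the csv filter's default
# "columnN" fields to register addresses, starting at event["address"].
def rename_columns_to_registers(event)
  base = event["address"].hex            # "0x1000" -> 4096
  result = {}
  event.each do |key, value|
    if (m = /\Acolumn(\d+)\z/.match(key))
      offset = m[1].to_i - 1             # column1 is the first register
      result[format("0x%04x", base + offset)] = value
    else
      result[key] = value                # leave all other fields untouched
    end
  end
  result
end

event = {
  "host" => "SD550",
  "address" => "0x1000",
  "column1" => "10", "column2" => "20", "column3" => "30", "column4" => "40"
}
renamed = rename_columns_to_registers(event)
renamed.each { |key, value| puts "#{key}: #{value}" }
```

The values are left as strings here; whether they should be read as decimal or hexadecimal depends on the device and is not decided by this sketch.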

You'll want to use grok to match the various fields; there are a number of built-in grok patterns that will help you with this. %{SYSLOGBASE} will get the timestamp and host for you, and the rest can probably be grabbed with patterns such as %{NUMBER} and others found at https://github.com/logstash/logstash/blob/v1.3.3/patterns/grok-patterns

Because of the variable log length, your patterns are liable to get a bit complex. However, I think you can get away with just matching all of the numbers and stashing them in an array, then mapping them onto the registers in your mutate.

As for generating the graph in Kibana, that won't be very difficult once your data is properly formatted. There is a built-in time series graph type that is easy to populate.
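The "collect the numbers into an array, then map them" idea can be sketched in plain Ruby as follows. This is only an illustration of the logic, not a Logstash filter: parse_registers is a hypothetical helper, and it assumes the count field is decimal and should equal the number of values that follow.

```ruby
# Hypothetical helper: parses the payload that follows "user: ", for example
# "REG,0x1000,4,10,20,30,40", into { "0x1000" => "10", "0x1001" => "20", ... }.
# Returns nil when the payload does not look like a valid REG message.
def parse_registers(payload)
  # The splat collects however many values there are into one array.
  func, address, count, *values = payload.split(",")
  return nil unless func == "REG" && address && count
  # Assumption: the count is decimal and must match the values present.
  return nil unless count.to_i == values.length

  base = address.hex
  values.each_with_index.map { |v, i| [format("0x%04x", base + i), v] }.to_h
end

parse_registers("REG,0x2000,2,12,22").each do |register, value|
  puts "#{register}: #{value}"
end
```

Rejecting messages whose count does not match is a design choice for the sketch; a real filter might prefer to tag the event instead of dropping the data.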

Feb 19 18:45:29 SD550 Jack: REG,0x1000,4,10,20,30,40

If you use the following config file on data that looks like the above and open Kibana, it works. It splits the fields into separate categories you can search on. I'm new to all this, but that's how I would do it. Below there is also a screenshot of a simple pie chart, made after I put in about 8 lines like the above with different time and address values.

input {
  tcp { 
    type => "test"
    port => 3333
  } 
}


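filter {
    grok {
        # MONTHDAY matches the numeric day of the month (DAY matches day names).
        # The numbers are comma-separated, so the commas must be in the pattern.
        # Note: this only matches messages with exactly four register values.
        match => ["message", "%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{WORD:sd550} %{WORD:name}: %{WORD:asmThing},%{WORD:address},%{NUMBER:firstno},%{NUMBER:2no},%{NUMBER:3no},%{NUMBER:4no},%{NUMBER:5no}"]
    }
}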
output {
  elasticsearch {
    # Setting 'embedded' will run  a real elasticsearch server inside logstash.
    # This option below saves you from having to run a separate process just
    # for ElasticSearch, so you can get started quicker!
    embedded => true
  }
}

(Screenshot: Test Kibana)

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow