Question

I'm trying to set up a single logstash worker that takes all messages from one amqp/rabbitmq queue, filters some of the messages to send to statsD, but also sends ALL the messages to Elasticsearch. However, the following implementation doesn't send ANY messages to Elasticsearch.

input {
  rabbitmq {
    host => "amqp-host"
    queue => "elasticsearch"
    key => "elasticsearch"
    exchange => "elasticsearch"
    type => "all"
    durable => true
    auto_delete => false
    exclusive => false
    format => "json_event"
    debug => false
  }
}

filter {
    grep {
      add_tag => "grepped"
      match => ["@message", "Execution of .*? took .* sec"]
    }

    grok {
        tags => ["grepped"]
        add_tag => "grokked"
        pattern => "Execution of %{DATA:command_name} took %{DATA:response_time} sec"
    }

    mutate {
        tags => ["grepped", "grokked"]
        lowercase => [ "command_name" ]
        add_tag => ["mutated"]
    }
}

output {
  elasticsearch_river {
    type => "all"
    rabbitmq_host => "amqp-host"
    debug => false
    durable => true
    persistent => true
    es_host => "es-host"
    exchange => "logstash-elasticsearch"
    exchange_type => "direct"
    index => "logs-%{+YYYY.MM.dd}"
    index_type => "%{@type}"
    queue => "logstash-elasticsearch" 
  }

 statsd {
    type => "command-filter"
    tags => ["grepped", "grokked", "mutated"]
    host => "some.domain.local"
    port => 1234
    sender => ""
    namespace => ""
    timing => ["prefix.%{command_name}.suffix", "%{response_time}"]
    increment => ["prefix.%{command_name}.suffix"]
  }
}

Is there some catchall filter? Or a way to arrange the tags so some messages are filtered but ALL are forwarded to ES?

Was it helpful?

Solution

The clone filter came in handy. The following is my resulting config file.

input {
  rabbitmq {
    host => "amqp-host"
    queue => "elasticsearch"
    key => "elasticsearch"
    exchange => "elasticsearch"
    type => "all"
    durable => true
    auto_delete => false
    exclusive => false
    format => "json_event"
    debug => false
  }
}

filter {
    clone {
        exclude_tags => ["cloned"]
        clones => ["statsd", "elastic-search"]
        add_tag => ["cloned"]
    }

    grep {
      type => "statsd"
      add_tag => "grepped"
      match => ["@message", "Execution of .*Command took .* sec"]
    }

    grok {
        type => "statsd"
        tags => ["grepped"]
        add_tag => "grokked"
        pattern => "Execution of %{DATA:command_name}Command took %{DATA:response_time} sec"
    }

    mutate {
        type => "statsd"
        tags => ["grepped", "grokked"]
        lowercase => [ "command_name" ]
        add_tag => ["mutated"]
    }
}

output {
  elasticsearch_river {
    type => "all"
    rabbitmq_host => "amqp-host"
    debug => false
    durable => true
    persistent => true
    es_host => "es-host"
    exchange => "logstash-elasticsearch"
    exchange_type => "direct"
    index => "logs-%{+YYYY.MM.dd}"
    index_type => "%{@type}"
    queue => "logstash-elasticsearch" 
  }

  statsd {
    type => "statsd"
    tags => ["grepped", "grokked", "mutated"]
    host => "some.host.local"
    port => 1234
    sender => ""
    namespace => ""
    timing => ["commands.%{command_name}.responsetime", "%{response_time}"]
    increment => ["commands.%{command_name}.requests"]
  }
}

Other tips

The clone filter is actually unnecessary in your situation.

There are a few things I would recommend:

  • Please simplify your config until you get your "base" working. Don't set optional parameters for now
  • Make sure the type matches up
  • Use tags, they are a lifesaver
  • When "grepping" you should stay consistent and always use grok or always use grep. My preference is grok

Some other tips... Choose what you want to do with your messages based on what tags they have set. Here is an example config snippet of mine:

grok{
   type => "company"
   pattern => ["((status:new))"]
   add_tag => ["company-vprod-status-new", "company-vprod-status-new-%{company}", "company-vprod-status-new-%{candidate_username}"]
   tags=> "company-vprod-status-change"
}
output {
 elasticsearch {
    host => "127.0.0.1"
    type => "company"
    index => "logstash-syslog-%{+YYYY.MM.dd}"
 }
 statsd {
    host => "graphite.test.company.com"
    increment => ["vprod.statuses.new.all", "vprod.statuses.new.%{company}.all"]
    tags => ["company-vprod-status-new"]   
 }
}

Also, really pay close attention to your type. If you set the type attribute to a value that doesn't exist, then that block will never get triggered. This is why I prefer to use tags, unless there is an explicit reason for using types.

You can also add:

drop => false

at the end of the grep stanza (if you are still using grep that is)

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top