The clone filter came in handy; the resulting Logstash configuration file is shown below.
# Consume pre-formatted json_event messages from the durable
# "elasticsearch" queue on the AMQP broker. Every event entering the
# pipeline from here is typed "all".
input {
  rabbitmq {
    host        => "amqp-host"
    exchange    => "elasticsearch"
    queue       => "elasticsearch"
    key         => "elasticsearch"
    type        => "all"
    format      => "json_event"
    durable     => true
    auto_delete => false
    exclusive   => false
    debug       => false
  }
}
filter {
  # Duplicate each incoming event into a "statsd" copy and an
  # "elastic-search" copy. The "cloned" tag marks events that have
  # already been duplicated, and exclude_tags keeps clones from being
  # cloned again.
  # NOTE(review): downstream filters match type => "statsd", which
  # presumably is the type the "statsd" clone receives — confirm for
  # this Logstash version.
  clone {
    exclude_tags => ["cloned"]
    clones       => ["statsd", "elastic-search"]
    add_tag      => ["cloned"]
  }

  # Keep only statsd-typed events whose message reports a command
  # execution time; successful matches are tagged "grepped".
  grep {
    type    => "statsd"
    match   => ["@message", "Execution of .*Command took .* sec"]
    add_tag => "grepped"
  }

  # Extract the command name and its response time from the message
  # into the command_name / response_time fields; tag "grokked".
  grok {
    type    => "statsd"
    tags    => ["grepped"]
    pattern => "Execution of %{DATA:command_name}Command took %{DATA:response_time} sec"
    add_tag => "grokked"
  }

  # Normalize command_name to lowercase so it forms a stable statsd
  # metric key; tag "mutated" once the full chain has run.
  mutate {
    type      => "statsd"
    tags      => ["grepped", "grokked"]
    lowercase => ["command_name"]
    add_tag   => ["mutated"]
  }
}
output {
  # Ship events to Elasticsearch via the river/AMQP transport, into a
  # daily index whose document type mirrors the event's type.
  elasticsearch_river {
    type          => "all"
    es_host       => "es-host"
    rabbitmq_host => "amqp-host"
    exchange      => "logstash-elasticsearch"
    exchange_type => "direct"
    queue         => "logstash-elasticsearch"
    durable       => true
    persistent    => true
    index         => "logs-%{+YYYY.MM.dd}"
    index_type    => "%{@type}"
    debug         => false
  }

  # Events that survived the whole grep -> grok -> mutate chain become
  # statsd metrics: one timing sample and one request counter per
  # command.
  statsd {
    type      => "statsd"
    tags      => ["grepped", "grokked", "mutated"]
    host      => "some.host.local"
    port      => 1234
    sender    => ""
    namespace => ""
    timing    => ["commands.%{command_name}.responsetime", "%{response_time}"]
    increment => ["commands.%{command_name}.requests"]
  }
}