Question

I'm using Logstash and Elasticsearch to build a logging system. RabbitMQ is used to queue log messages between two Logstash instances.

The message path looks like this:

source log -> logstash -> rabbitMQ -> logstash(parse) -> elasticsearch

But I found that no matter how many machines I add to the RabbitMQ cluster, it only uses one machine's resources to process messages.

I found some articles saying that clustering only increases reliability and redundancy to prevent message loss.

But what I want is to increase the whole RabbitMQ cluster's throughput (in and out) by adding more machines.

How do I configure my RabbitMQ cluster to increase its throughput?

Any comments are appreciated.

--

P.S. Some more information:

In my tests, the limit of the system is about 7,000 messages/s in and 1,700 messages/s out on a 4-machine cluster. HA is not enabled, and there is just one exchange bound to one queue, which lives on a single node. I guess that this one queue on one node is the throughput bottleneck. It's difficult to change the routing key now, so we have only one routing key, and we want to distribute messages to different nodes to increase the throughput of the whole system.

Below is my logstash-indexer config:

rabbitmq {
  codec          => "json"
  auto_delete    => false
  durable        => true
  exchange       => "logstash-exchange"
  key            => "logstash-routingKey"
  queue          => "logstash-queue"
  host           => "VIP-of-rabbitMQ"
  user           => "guest"
  password       => "guest"
  passive        => false
  exclusive      => false
  threads        => 4
  prefetch_count => 512
}

Solution

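You need to add more queues. I guess you are using only one queue, so in other words you are tied to one Erlang process: each queue is a single Erlang process living on a single node, so adding nodes to the cluster does not make that one queue any faster. What you want is to use multiple queues.

Here is a quick and dirty example of how to add some logic to Logstash so that it sends messages to different queues: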

filter {

  # check if path contains source subfolder
  if "foo" in [path] {
    mutate { add_field => [ "source", "foo"] }
  }
  else if "bar" in [path] {
    mutate { add_field => [ "source", "bar"] }
  }
  else {
    mutate { add_field => [ "source", "unknown"] }
  }
}

Then in your output:

  rabbitmq {
    debug => true
    durable => true
    exchange_type => "direct"
    host => "your_rabbit_ip"
    key => "%{source}"
    exchange => "my_exchange"
    persistent => true
    port => 5672
    user => "logstash"
    password => "xxxxxxxxxx"
    workers => 12
  }
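The output above only publishes to `my_exchange`; with a direct exchange, a message whose routing key has no bound queue is dropped, so each key (`foo`, `bar`, `unknown`) needs its own queue bound to that exchange. Below is a minimal sketch of the matching logstash-indexer side, assuming the same rabbitmq input options as the config in the question: one input per queue. The queue names and the node hostnames (`rabbit-node1` etc.) are hypothetical placeholders. A classic (non-mirrored) queue lives on the node where it was first declared, so pointing each input at a different node, instead of at the VIP, is one way to spread the queues, and therefore the load, across the cluster.

```
input {
  # One rabbitmq input per queue; each queue is bound to my_exchange
  # with one of the routing keys set by the shipper's filter.
  # Hostnames and queue names below are hypothetical placeholders.
  rabbitmq {
    host           => "rabbit-node1"   # queue gets declared on node 1
    exchange       => "my_exchange"
    key            => "foo"
    queue          => "logstash-foo"
    durable        => true
    codec          => "json"
    user           => "logstash"
    password       => "xxxxxxxxxx"
    prefetch_count => 512
  }
  rabbitmq {
    host           => "rabbit-node2"   # queue gets declared on node 2
    exchange       => "my_exchange"
    key            => "bar"
    queue          => "logstash-bar"
    durable        => true
    codec          => "json"
    user           => "logstash"
    password       => "xxxxxxxxxx"
    prefetch_count => 512
  }
  rabbitmq {
    host           => "rabbit-node3"   # queue gets declared on node 3
    exchange       => "my_exchange"
    key            => "unknown"
    queue          => "logstash-unknown"
    durable        => true
    codec          => "json"
    user           => "logstash"
    password       => "xxxxxxxxxx"
    prefetch_count => 512
  }
}
```

Start the indexers before the shippers so the queues and bindings exist by the time messages arrive; otherwise anything published in the meantime is discarded by the exchange.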

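Update:

Take a look at the repositories of this developer: https://github.com/simonmacmullen

I think you will be interested in this one: https://github.com/simonmacmullen/random-exchange

This exchange type is for load balancing among consumers: instead of copying a message to every queue bound with the matching routing key, it delivers each message to one randomly chosen bound queue, so a single routing key can be spread over several queues.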
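If installing a broker plugin is not an option, a similar spread can be approximated inside Logstash itself, without changing the routing key the log source uses. A minimal sketch, assuming the `ruby` filter is available in your Logstash version: it stores a random shard number in a hypothetical `shard` field, which the rabbitmq output would then use as `key => "%{shard}"` in place of `"%{source}"`. Each shard value ("0" to "3" here) then needs its own bound queue on the indexer side.

```
filter {
  # Hypothetical "shard" field: a random value "0".."3", one per queue.
  # Event API of Logstash 1.x/2.x; on 5.x and later the equivalent is
  # event.set('shard', rand(4).to_s)
  ruby {
    code => "event['shard'] = rand(4).to_s"
  }
}
```

Random assignment gives an even spread regardless of how the log paths are distributed, at the cost of any ordering guarantee across queues.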
Licensed under: CC-BY-SA with attribution