Domanda

I am new to using RabbitMQ and Pika so please excuse if the answer is obvious...

We are feeding some data and passing the results into our rabbitmq message queue. The queue is being consumed by a process that writes the data into elasticsearch.

The data is being produced faster than it can be fed into elastic search and consequently the queue grows and almost never shrinks.

We are using pika and getting the warning:

UserWarning: Pika: Write buffer exceeded warning threshold at X bytes and an estimated X frames behind.

This continues for some time until Pika simply crashes with a strange error message:

NameError: global name 'log' is not defined

We are using the Pika BlockingConnection object (http://pika.github.com/connecting.html#blockingconnection).

My plan to fix this is to use the add_backpressure_callback function to have a function that will call time.sleep(0.5) every time that we need to apply back-pressure. However, this seems like it is too simple of a solution and that there must be a more appropriate way of dealing with something like this.

I would guess that it is a common situation that the queue is being populated faster than it is being consumed. I am looking for an example or even some advice as to what is the best way to slow down the queue.

Thanks!

È stato utile?

Soluzione

Interesting problem, and as you rightly point out this is probably quite common. I saw another related question on Stack Overflow with some pointers

Pika: Write buffer exceeded warning

Additionally, may you want to consider scaling up your elasticsearch, this is perhaps the fundamental bottleneck you want to fix. A quick look on the elasticsearch.org website came up with

"Distributed

One of the main features of Elastic Search is its distributed nature. Indices are broken down into shards, each shard with 0 or more replicas. Each data node within the cluster hosts one or more shards, and acts as a coordinator to delegate operations to the correct shard(s). Rebalancing and routing are done automatically and behind the scenes. "

(...although not sure if insertion is also distributed and scalable)

Afterall, RabbitMQ is not supposed to grow queues infinitely. Also may want to look at scaling up RabbitMQ itself, for example by using things like per-queue processes etc. in the RabbitMQ configuration.

Cheers!

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top