Question

Rarely, when under more load than usual, my RabbitMQ application starts throwing SocketException: Broken pipe and basically stops processing any further messages.

The system uses the RPC pattern: workers listen on a few predefined queues for jobs, and clients submit tasks to those queues. Each client opens a temporary auto-delete queue, specifies it as the replyTo queue, and listens on it for replies, using a correlation ID to match replies to requests.

The code that actually leads to the Broken pipe is quite simple. It is in the client part and basically does:

factory = new ConnectionFactory();
factory.setUri(uri);
connection = factory.newConnection(); // this is when we get the exception

The exception is as follows:

2013-09-06 21:37:03,947 +0000 [http-bio-8080-exec-350] ERROR RabbitRpcClient:79  - IOException 
java.net.SocketException: Broken pipe
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:142)
    at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:488)
    at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:125)
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:316)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:292)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:285)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:383)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)     
    ...

I think this generally coincides with the workers taking longer than usual to finish their jobs, so more temporary client queues are open concurrently (about 20-30, perhaps?). However, as far as I know I'm not hitting any of the usual watermarks (memory, disk), though I could be running into some limit I don't know about.

I've reviewed the Rabbit logs and the only kind of errors I find there are:

=ERROR REPORT==== 6-Sep-2013::21:36:59 ===
closing AMQP connection <0.3105.1297> (10.118.69.132:42582 -> 10.12.111.134:5672):
{handshake_timeout,frame_header}

I checked both logs: the first "broken pipe" on the client appeared at 21:37:03, while the first ERROR of any kind in the RabbitMQ logs on that date appeared at 21:36:59, with errors of the same kind recurring regularly thereafter until the systems were restarted. Thus I believe the entries shown above correspond to each other.

I'm using the Rabbit Java client 3.1.4 (latest on Maven central) with Rabbit server 3.1.4 running on Amazon Linux on AWS EC2.

Here is the rabbitmqctl status output under normal conditions (unfortunately not during the failure; I will try to capture one the next time it occurs):

Status of node 'rabbit@ip-some-ip' ...
[{pid,2654},
 {running_applications,
     [{rabbitmq_management,"RabbitMQ Management Console","3.1.4"},
  {rabbitmq_management_agent,"RabbitMQ Management Agent","3.1.4"},
  {rabbit,"RabbitMQ","3.1.4"},
  {os_mon,"CPO  CXC 138 46","2.2.7"},
  {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.1.4"},
  {webmachine,"webmachine","1.10.3-rmq3.1.4-gite9359c7"},
  {mochiweb,"MochiMedia Web Server","2.7.0-rmq3.1.4-git680dba8"},
  {xmerl,"XML parser","1.2.10"},
  {inets,"INETS  CXC 138 49","5.7.1"},
  {mnesia,"MNESIA  CXC 138 12","4.5"},
  {amqp_client,"RabbitMQ AMQP Client","3.1.4"},
  {sasl,"SASL  CXC 138 11","2.1.10"},
  {stdlib,"ERTS  CXC 138 10","1.17.5"},
  {kernel,"ERTS  CXC 138 10","2.14.5"}]},
 {os,{unix,linux}},
 {erlang_version,
 "Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:2:2] [rq:2]     [async-threads:30] [kernel-poll:true]\n"},
{memory,
 [{total,331967824},
  {connection_procs,5389784},
  {queue_procs,2669016},
  {plugins,654768},
  {other_proc,10063336},
  {mnesia,90352},
  {mgmt_db,2706344},
  {msg_index,7148168},
  {other_ets,3495648},
  {binary,1952040},
  {code,17696200},
  {atom,1567425},
  {other_system,278534743}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,3126832332},
{disk_free_limit,1000000000},
{disk_free,1487147008},
{file_descriptors,
 [{total_limit,349900},
  {total_used,71},
  {sockets_limit,314908},
  {sockets_used,66}]},
{processes,[{limit,1048576},{used,930}]},
{run_queue,0},
 {uptime,5680}]
 ...done.

Any ideas what could be wrong or at least what I can do to debug this / get more clarity on what is happening?


Solution

I have changed my code to reuse Connection objects, even sharing them among multiple threads, and the problem does not seem to be recurring (fingers crossed).
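For illustration, here is a minimal sketch of what "reuse the Connection" could look like. It is not the exact code from my application: SharedConnection is a hypothetical helper name, it is kept generic so that it compiles without the RabbitMQ jar, and it assumes Java 8 or later.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

/**
 * Hypothetical helper: lazily opens one connection and hands the same
 * instance to every caller, reopening it only if it is no longer usable.
 */
final class SharedConnection<T> {
    private final Callable<T> opener;    // how to open a connection
    private final Predicate<T> isUsable; // whether the cached one still works
    private T current;                   // guarded by "this"

    SharedConnection(Callable<T> opener, Predicate<T> isUsable) {
        this.opener = opener;
        this.isUsable = isUsable;
    }

    /** Returns the shared connection, opening a new one only when needed. */
    synchronized T get() {
        if (current == null || !isUsable.test(current)) {
            try {
                current = opener.call();
            } catch (Exception e) {
                throw new IllegalStateException("could not open connection", e);
            }
        }
        return current;
    }
}
```

With the RabbitMQ Java client this would presumably be wired up as a SharedConnection<Connection> built from factory::newConnection and Connection::isOpen. Each request then calls get().createChannel() for its own Channel instead of calling factory.newConnection() every time, so the connection handshake happens once rather than once per request.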

OTHER TIPS

package com.rm.rabbitmq.tls;

import java.io.*;
import java.security.*;
import javax.net.ssl.*;

import com.rabbitmq.client.*;

public class Example2 {

    public static void main(String[] args) throws Exception {

        char[] keyPassphrase = "".toCharArray();
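        KeyStore ks = KeyStore.getInstance("PKCS12");
        // Load the client key store and close the file once it has been read.
        try (InputStream in = new FileInputStream("/Users/global/Documents/tls-gen/basic/result/client_key.p12")) {
            ks.load(in, keyPassphrase);
        }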

        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
        kmf.init(ks, keyPassphrase);

        char[] trustPassphrase = "welcome".toCharArray();
        KeyStore tks = KeyStore.getInstance("JKS");
        // Load the trust store and close the file once it has been read.
        try (InputStream in = new FileInputStream("/Users/global/Documents/tls-gen/basic/result/rabbitstore")) {
            tks.load(in, trustPassphrase);
        }

        TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
        tmf.init(tks);

        SSLContext c = SSLContext.getInstance("TLSv1.3");
        c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5671);
        factory.useSslProtocol(c);
        //factory.enableHostnameVerification();

        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();

        // Declare a non-durable, exclusive, auto-delete queue and publish one message to it.
        channel.queueDeclare("rabbitmq-java-test", false, true, true, null);
        channel.basicPublish("", "rabbitmq-java-test", null, "Hello, World".getBytes("UTF-8"));

        // Fetch the message back (autoAck = false) and print it.
        GetResponse chResponse = channel.basicGet("rabbitmq-java-test", false);
        if (chResponse == null) {
            System.out.println("No message retrieved");
        } else {
            byte[] body = chResponse.getBody();
            System.out.println("Received: " + new String(body, "UTF-8"));
        }

        channel.close();
        conn.close();
    }
}
Licensed under: CC-BY-SA with attribution