Question

I am using Apache HttpClient 4 to connect to Twitter's streaming API with default-level access. It works perfectly well in the beginning, but after a few minutes of retrieving data it bails out with this error:

2012-03-28 16:17:00,040 DEBUG org.apache.http.impl.conn.SingleClientConnManager: Get connection for route HttpRoute[{tls}->http://myproxy:80->https://stream.twitter.com:443]
2012-03-28 16:17:00,040 WARN com.cloudera.flume.core.connector.DirectDriver: Exception in source: TestTwitterSource
java.lang.IllegalStateException: Invalid use of SingleClientConnManager: connection still allocated.
Make sure to release the connection before allocating another one.
    at org.apache.http.impl.conn.SingleClientConnManager.getConnection(SingleClientConnManager.java:216)
    at org.apache.http.impl.conn.SingleClientConnManager$1.getConnection(SingleClientConnManager.java:190)

I understand why I am facing this issue. I am trying to use this HttpClient in a Flume cluster as a Flume source. The code looks like this:

public Event next() throws IOException, InterruptedException {

    HttpHost target = new HttpHost("stream.twitter.com", 443, "https");
    HttpPost httpPost = new HttpPost("/1/statuses/filter.json");
    StringEntity postEntity = new StringEntity("track=birthday", "UTF-8");
    postEntity.setContentType("application/x-www-form-urlencoded");
    httpPost.setEntity(postEntity);
    HttpResponse response = httpClient.execute(target, httpPost,
            new BasicHttpContext());
    BufferedReader reader = new BufferedReader(new InputStreamReader(
            response.getEntity().getContent()));
    String line = null;
    StringBuffer buffer = new StringBuffer();
    while ((line = reader.readLine()) != null) {
        buffer.append(line);
        if (buffer.length() > 30000) break;
    }
    return new EventImpl(buffer.toString().getBytes());
}

I am trying to buffer 30,000 characters from the response stream into a StringBuffer and then return that as the event data. I am obviously not closing the connection, but I do not want to close it just yet, I guess. Twitter's dev guide talks about this. It reads:

Some HTTP client libraries only return the response body after the connection has been closed by the server. These clients will not work for accessing the Streaming API. You must use an HTTP client that will return response data incrementally. Most robust HTTP client libraries will provide this functionality. The Apache HttpClient will handle this use case, for example.

It clearly says that HttpClient will return response data incrementally. I've gone through the examples and tutorials, but I haven't found anything that comes close to doing this. If you have used an HTTP client (Apache's or another) to read Twitter's streaming API incrementally, please let me know how you achieved it. Those who haven't, please feel free to contribute answers. TIA.
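For reference, the "incremental" behaviour the guide describes is simply that the entity's InputStream yields data as it arrives, before the server closes the connection: you call readLine() repeatedly on a still-open stream. A minimal, offline sketch of that consumption pattern (the ByteArrayInputStream stands in for response.getEntity().getContent(); it is not a live connection):

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class IncrementalRead {

    // Read a single line from an open stream. readLine() blocks only until
    // the next '\n' arrives, not until the connection closes, which is what
    // makes line-by-line consumption of a streaming API possible.
    static String readOneLine(BufferedReader reader) throws IOException {
        return reader.readLine();
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the content stream of a live streaming response.
        InputStream stream = new ByteArrayInputStream(
                "{\"text\":\"hello\"}\n{\"text\":\"world\"}\n".getBytes());
        BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
        System.out.println(readOneLine(reader)); // first line, available immediately
        System.out.println(readOneLine(reader)); // second line
    }
}
```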

UPDATE

I tried the following: 1) I moved obtaining the stream handle to the open method of the Flume source. 2) I used a plain InputStream and read the data into a byte buffer. So here is what the method body looks like now:

byte[] buffer = new byte[30000];

while (true) {
    int count = instream.read(buffer);
    if (count == -1)
        continue;
    else
        break;
}
return new EventImpl(buffer);

This works to an extent - I get tweets, and they are nicely written to a destination. The problem is with the instream.read(buffer) return value. Even when there is no data on the stream, the buffer still holds its default \u0000 bytes (30,000 of them), and these get written to the destination. So the destination file looks like this: " tweets..tweets..tweets.. \u0000\u0000\u0000\u0000\u0000\u0000\u0000...tweets..tweets... ". I understand count won't return -1 because this is a never-ending stream, so how do I figure out whether the buffer has new content after the read call?
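For what it's worth, the return value of read() is exactly the signal being asked about: it is the number of bytes placed into the buffer by that call, and everything at index count and beyond is stale. Copying only that prefix keeps the \u0000 padding out of the event. A minimal sketch (the helper name is illustrative, not Flume API):

```java
import java.util.Arrays;

public class ReadSlice {

    // Copy only the bytes the last read() call actually filled, so trailing
    // \u0000 padding from the oversized buffer never reaches the destination.
    static byte[] slice(byte[] buffer, int count) {
        if (count <= 0) {
            return new byte[0]; // nothing read (or end of stream)
        }
        return Arrays.copyOf(buffer, count);
    }

    public static void main(String[] args) {
        byte[] buffer = new byte[30000];
        byte[] data = "tweet".getBytes();
        System.arraycopy(data, 0, buffer, 0, data.length);
        int count = data.length; // what instream.read(buffer) would have returned
        byte[] event = slice(buffer, count);
        System.out.println(event.length);      // 5, not 30000
        System.out.println(new String(event)); // tweet
    }
}
```

The event would then be built with new EventImpl(slice(buffer, count)) instead of passing the whole 30,000-byte array.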


Solution 2

It turns out that it was a Flume issue. Flume is optimized to transfer events of up to 32 KB; anything beyond 32 KB makes Flume bail out. (The workaround is to tune the maximum event size to be greater than 32 KB.) So I've changed my code to cap the buffer at roughly 20,000 characters. It kind of works, but it is not foolproof: it can still fail if the buffered length exceeds 32 KB. However, it hasn't failed in an hour of testing; I believe that has to do with the fact that Twitter doesn't send a lot of data on its public stream.

while ((line = reader.readLine()) != null) {
    buffer.append(line);
    if (buffer.length() > 20000) break;
}
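If the hard limit is 32 KB, checking the prospective size before appending makes the cap deterministic rather than "hasn't failed yet". A sketch of that check (the 32000 figure is an assumed safety margin, not a Flume constant, and note the caveat that it counts characters, which only matches bytes for ASCII-only content):

```java
import java.util.List;

public class CappedBuffer {

    // Assumed safety margin under the 32 KB event limit; counts chars,
    // which equals bytes only for ASCII content.
    static final int MAX_EVENT_CHARS = 32000;

    // Append lines only while the result stays under the cap; stop before
    // the line that would overflow, so the event can never exceed the limit.
    static String appendUpTo(StringBuilder buffer, Iterable<String> lines) {
        for (String line : lines) {
            if (buffer.length() + line.length() > MAX_EVENT_CHARS) {
                break; // flush now; this line belongs in the next event
            }
            buffer.append(line);
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        StringBuilder buffer = new StringBuilder();
        String chunk = "x".repeat(20000);
        String event = appendUpTo(buffer, List.of(chunk, chunk, chunk));
        System.out.println(event.length()); // 20000: a second chunk would exceed the cap
    }
}
```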

OTHER TIPS

The problem is that your code is leaking connections. Make sure that, no matter what, you either close the content stream or abort the request.

InputStream instream = response.getEntity().getContent();
try {
    BufferedReader reader = new BufferedReader(
            new InputStreamReader(instream));
    String line = null;
    StringBuffer buffer = new StringBuffer();
    while ((line = reader.readLine()) != null) {
        buffer.append(line);
        if (buffer.length() > 30000) {
            httpPost.abort();
            // connection will not be re-used
            break;
        }
    }
    return new EventImpl(buffer.toString().getBytes());
} finally {
    // if the request is not aborted the connection can be re-used
    try {
        instream.close();
    } catch (IOException ex) {
        // log or ignore
    }
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow