Question

I have a set of keywords (over 600) and I want to use the streaming API to track tweets that contain them. The Twitter API limits the number of keywords you are allowed to track to 200, so I decided to run several threads, each using its own OAuth token. This is how I do it:

String[] dbKeywords = KeywordImpl.listKeywords();
List<String[]> keywords = ditributeKeywords(dbKeywords);
for (String[] subList : keywords) {
    StreamCrawler streamCrawler = new StreamCrawler();
    streamCrawler.setKeywords(subList);
    Thread crawlerThread = new Thread(streamCrawler);
    crawlerThread.start();
}

This is how the keywords are distributed among the threads; each thread receives no more than 200 keywords. This is the implementation of the StreamCrawler:

public class StreamCrawler extends Crawler implements Runnable {

...

private String[] keywords;

public void setKeywords(String[] keywords) {
    this.keywords = keywords;
}

@Override
public void run() {
    TwitterStream twitterStream = getTwitterInstance();
    StatusListener listener = new StatusListener() {
        ArrayDeque<Tweet> tweetbuffer = new ArrayDeque<Tweet>();
        ArrayDeque<TwitterUser> userbuffer = new ArrayDeque<TwitterUser>();


        @Override
        public void onException(Exception arg0) {
            System.out.println(arg0);
        }

        @Override
        public void onDeletionNotice(StatusDeletionNotice arg0) {
            System.out.println(arg0);
        }

        @Override
        public void onScrubGeo(long arg0, long arg1) {
            System.out.println(arg1);
        }

        @Override
        public void onStatus(Status status) {
            // ... doing something with the message
        }

        @Override
        public void onTrackLimitationNotice(int arg0) {
            System.out.println(arg0);
            try {
                // Announce the pause before sleeping, not after it
                System.out.println("Will sleep for 5 minutes!");
                Thread.sleep(5 * 60 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onStallWarning(StallWarning arg0) {
            System.out.println(arg0);
        }

    };

    FilterQuery fq = new FilterQuery();
    String[] keywords = getKeywords();
    System.out.println(keywords.length);
    System.out.println("Listening for " + Arrays.toString(keywords));
    fq.track(keywords);
    twitterStream.addListener(listener);
    twitterStream.filter(fq);
}

private long getCurrentThreadId() {
    return Thread.currentThread().getId();
}

private TwitterStream getTwitterInstance() {
    TwitterConfiguration configuration = null;
    TwitterStream twitterStream = null;
    while (configuration == null) {
        configuration = TokenFactory.getAvailableToken();
        if (configuration != null) {
            System.out
                    .println("Token was obtained " + getCurrentThreadId());
            System.out.println(configuration.getTwitterAccount());
            setToken(configuration);
            ConfigurationBuilder cb = new ConfigurationBuilder();
            cb.setDebugEnabled(true);
            cb.setOAuthConsumerKey(configuration.getConsumerKey());
            cb.setOAuthConsumerSecret(configuration.getConsumerSecret());
            cb.setOAuthAccessToken(configuration.getAccessToken());
            cb.setOAuthAccessTokenSecret(configuration.getAccessSecret());
            twitterStream = new TwitterStreamFactory(cb.build())
                    .getInstance();
        } else {
            // If there is no available configuration, wait for 2 minutes
            // and try again
            try {
                System.out
                        .println("There were no available tokens, sleeping for 2 minutes.");
                Thread.sleep(2 * 60 * 1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    return twitterStream;
}
}

My problem is that when I start, for example, two threads, I get notifications that both of them open a stream and connect. But only the first one actually receives the stream and calls the onStatus method. The keyword array used in the second thread is not empty, and its TwitterConfiguration is also valid and unique. So I don't understand what might cause this behavior. Why does only the first thread return tweets?


Solution

As far as I can see, you're trying to make two simultaneous connections to the public streaming endpoints (a.k.a. general streams or stream.twitter.com) from the same IP.
More specifically, I think you want two active connections to stream.twitter.com/1.1/statuses/filter.json from the same IP.

Although the Twitter streaming API documentation doesn't clearly state that only one standing connection to the public endpoints is allowed, Twitter employees clarify this on the dev site: https://dev.twitter.com/discussions/7542

For general streams, you should only make one connection from the same IP.

This means that it doesn't matter whether you use two different Twitter applications/accounts to connect to the public streams; as long as you're connecting from the same IP address, you can have only one standing connection to the public streams. You said that both streams got connected; the explanation for this behaviour is given by a Twitter employee: https://dev.twitter.com/discussions/14935

You may find that at times stream.twitter.com lets you get away with more open connections here or there, but that behavior shouldn't be counted on.

If, for instance, you connect the 2nd thread to the user stream instead (the twitter4j TwitterStream user() method), then you'll really start receiving both the filter and user streams.

Regarding the 200 track keywords limit, the twitter4j.org javadoc is probably a little bit outdated. Here is what the Twitter API docs say:

The default access level allows up to 400 track keywords, 5,000 follow userids and 25 0.1-360 degree location boxes. If you need elevated access to the Streaming API, you should explore our partner providers of Twitter data ...

So, if you need to go beyond 400, you'll probably want to ask Twitter for an increased track access level for your Twitter account/application, or work with certified partner providers of Twitter data.
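
Until then, 600+ keywords won't fit into a single filter at the default level, so they would still have to be split into batches, with each batch tracked over its own connection from a different IP (given the one-connection rule quoted earlier). Here is a minimal sketch of such a split. The class name KeywordSplitter and the helper distributeKeywords are hypothetical, the limit is passed in as a parameter rather than hard-coded, and this is not the asker's actual ditributeKeywords implementation:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class KeywordSplitter {

    // Hypothetical helper: splits the keywords into consecutive batches,
    // each holding at most maxPerStream entries.
    static List<String[]> distributeKeywords(String[] keywords, int maxPerStream) {
        if (maxPerStream <= 0) {
            throw new IllegalArgumentException("maxPerStream must be positive");
        }
        List<String[]> batches = new ArrayList<>();
        for (int start = 0; start < keywords.length; start += maxPerStream) {
            // The last batch may be shorter than maxPerStream
            int end = Math.min(start + maxPerStream, keywords.length);
            batches.add(Arrays.copyOfRange(keywords, start, end));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Placeholder keywords, only to show the batch sizes
        String[] keywords = new String[600];
        for (int i = 0; i < keywords.length; i++) {
            keywords[i] = "keyword" + i;
        }
        for (String[] batch : distributeKeywords(keywords, 400)) {
            System.out.println(batch.length);
        }
    }
}
```

With 600 keywords and a limit of 400 this yields two batches, of 400 and 200 keywords, each of which could be passed to a separate FilterQuery.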

Another thing: you don't necessarily need to start new threads to get the streams, since the twitter4j filter (or user) "method internally creates a thread which manipulates TwitterStream and calls adequate listener methods continuously" (quoted from example code by Yusuke Yamamoto).

I hope this helps. (I couldn't post more links because I'm getting this "You need at least 10 reputation to post more than 2 links")

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow