Question

I was trying out the Kafka spout for Storm from https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka with the configuration shown below.

    BrokerHosts brokerHosts = KafkaConfig.StaticHosts.fromHostString(
            ImmutableList.of("localhost"), 1);
    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, // list of Kafka brokers
            "test", // topic to read from
            "/kafkastorm", // the root path in Zookeeper for the spout to
                           // store the consumer offsets
            "discovery"); // an id for this consumer for storing the
                          // consumer offsets in Zookeeper
    spoutConfig.scheme = new StringScheme();
    spoutConfig.stateUpdateIntervalMs = 1000;


    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

    TridentTopology topology = new TridentTopology();
    InetSocketAddress inetSocketAddress = new InetSocketAddress(
            "localhost", 6379);
    TridentState wordsCount = topology
            .newStream(SPOUT_FIRST, kafkaSpout)
            .parallelismHint(1)
            .each(new Fields("str"), new TestSplit(), new Fields("words"))
            .groupBy(new Fields("words"))
            .persistentAggregate(
                    RedisState.transactional(inetSocketAddress),
                    new Count(), new Fields("counts")).parallelismHint(100);

    Config conf = new Config();
    conf.setMaxTaskParallelism(200);
    // conf.setDebug( true );
    // conf.setMaxSpoutPending(20);

    // This topology can only be run as local because it is a toy example
    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("symbolCounter", conf, topology.build());

However, the spout fetches messages from the Kafka topic at only about 7,000 per second, whereas I expect a load of around 50,000 messages per second. I have tried increasing the fetch buffer size in spoutConfig, with no visible improvement.

Has anyone faced a similar issue, where Storm cannot consume a Kafka topic as fast as the producer writes messages to it?

Solution

I set "topology.spout.max.batch.size" in the topology config to about 64*1024, and Storm processing then became fast.
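
As a rough sketch of that change: Storm's `Config` class extends `HashMap<String, Object>`, so the setting is an ordinary map entry. The snippet below uses a plain `HashMap` as a stand-in so that it runs without Storm on the classpath; the class and method names are hypothetical and only illustrate the idea. In the question's code, the same `put` call would go on `conf` before `cluster.submitTopology(...)`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, not part of Storm or storm-kafka.
class BatchSizeConfigSketch {

    // Config key quoted in the answer above.
    static final String MAX_BATCH_SIZE_KEY = "topology.spout.max.batch.size";

    // Builds a stand-in for the topology config. With Storm available,
    // "new Config()" would replace "new HashMap<>()" (assumption: Config
    // is used as a plain map here, as in the question's code).
    static Map<String, Object> buildConf() {
        Map<String, Object> conf = new HashMap<>();
        // Value from the answer: about 64 * 1024 tuples per batch.
        conf.put(MAX_BATCH_SIZE_KEY, 64 * 1024);
        return conf;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = buildConf();
        System.out.println(MAX_BATCH_SIZE_KEY + " = " + conf.get(MAX_BATCH_SIZE_KEY));
    }
}
```

The value 64*1024 is simply what worked in this case; a suitable batch size for another topology would need to be found by measurement.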

Licensed under: CC-BY-SA with attribution