Question

### TOPOLOGY ###
class WordCountTopology < DSL::Topology
  spout RandomSentenceSpout, :parallelism => 2

  bolt SplitSentenceBolt, :parallelism => 2 do
    source RandomSentenceSpout, :shuffle
  end

  bolt WordCountBolt, :parallelism => 2 do
    source SplitSentenceBolt, :fields => ["word"]
  end

  configure :word_count do |env|
    debug true
    max_task_parallelism 4
    if env == :cluster
      num_workers 6
      max_spout_pending(1000)
    end
  end

  on_submit do |env|
    if env == :local
      sleep(60)
      cluster.shutdown
    end
  end
end



### SPOUT ###
class RandomSentenceSpout < DSL::Spout
  output_fields :word

  on_send {@sentences[rand(@sentences.length)]}
  sleep(10)

  on_init do
    @sentences = [
      "the cow jumped over the moon",
      "an apple a day keeps the doctor away",
      "four score and seven years ago",
      "snow white and the seven dwarfs",
      "i am at two with nature"
    ]
  end
end


### SPLITSENTENCEBOLT ###
class SplitSentenceBolt < DSL::Bolt
  output_fields :word
  on_receive {|tuple| tuple[0].split(' ').map{|w| [w]}}
end




### WORDCOUNTBOLT ###
class WordCountBolt < DSL::Bolt
  output_fields :word, :count
  on_init {@counts = Hash.new{|h, k| h[k] = 0}}

  on_receive do |tuple|
    word = tuple[0].to_s
    @counts[word] += 1

    [word, @counts[word]]
  end
end

I want to implement a tick tuple: every 60 seconds, the word count bolt should emit its results and reset the counters to 0.

Maybe something like this:

// in SplitSentenceBolt
function to send a tick tuple every 60 seconds

// in WordCountBolt
if (tick_tuple) {
  emit results
  @counts = []  # reinitialize
}

Can anyone help me implement this? I am new to the Storm world.


Solution

You can use Storm's built-in tick tuple functionality to have a bolt receive a tick tuple at a specified interval.

In the bolt section of the topology definition simply add the following configuration:

bolt MyBolt do
  ...
  set Backtype::Config::TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60
  ...
end

In your bolt, you can test for the tick tuple like this:

if tuple.source_stream_id == "__tick"
  ...
end
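
To make the count, emit, and reset step concrete, here is a minimal plain-Ruby sketch of the logic that could sit inside the word count bolt's on_receive block. It does not use any Storm or RedStorm classes: FakeTuple and WindowedCounter are hypothetical names made up for this illustration, and the only thing taken from the answer is the "__tick" stream id check.

```ruby
# Plain-Ruby sketch of the count-and-flush logic; no Storm classes are used.
# FakeTuple and WindowedCounter are hypothetical names for illustration only.
FakeTuple = Struct.new(:source_stream_id, :values)

class WindowedCounter
  TICK_STREAM = "__tick" # stream id tested in the answer above

  def initialize
    @counts = Hash.new(0)
  end

  # Returns the [word, count] pairs to emit: an empty list for an ordinary
  # tuple, or all counts of the current window when a tick tuple arrives.
  def receive(tuple)
    if tuple.source_stream_id == TICK_STREAM
      results = @counts.to_a
      @counts = Hash.new(0) # reset the counters for the next window
      results
    else
      @counts[tuple.values[0].to_s] += 1
      []
    end
  end
end

# Simulate four words followed by one tick tuple.
counter = WindowedCounter.new
%w[the cow the moon].each do |word|
  counter.receive(FakeTuple.new("default", [word]))
end
flushed = counter.receive(FakeTuple.new("__tick", []))
p flushed
```

In the real bolt, the body of receive would go in on_receive, returning an array of [word, count] arrays on a tick in the same way SplitSentenceBolt returns an array of one-element arrays. Whether the DSL treats an empty array as "emit nothing" is an assumption worth checking. Also note that with :parallelism => 2 each bolt task should receive its own tick tuples and flush only the words routed to it.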

OTHER TIPS

I don't know Ruby, but I hope this Python will help you figure it out. This is how we handle it in our bolt:

def process(self, tup):
    """
    If a tick tuple arrives, perform the upload.
    """
    if tup.stream == '__tick':
        self.perform_upload()
        storm.ack(tup)
        return
Licensed under: CC-BY-SA with attribution