Question

I'm trying to use the Flume connectors for Storm with a Flume instance that receives syslog messages.

My Storm config is the following:

flumeConf.put("flume-agent.source.type", "AVRO");
flumeConf.put("flume-agent.source.bind", "localhost");
flumeConf.put("flume-agent.source.port", "3564");
flumeConf.put("flume-agent.channel.type", "MEMORY");
flumeConf.put("flume-agent.channel.capacity", "2000");

My Flume config is the following:

mainAgent.sinks = consoleSink
mainAgent.sources = syslog-source
mainAgent.channels = mainChannel
mainAgent.sinks = stormSink

mainAgent.sinks.consoleSink.channel = mainChannel
mainAgent.sources.syslog-source.channels = mainChannel
mainAgent.sinks.stormSink.channel = mainChannel

# the source
mainAgent.sources.syslog-source.type = syslogudp
mainAgent.sources.syslog-source.bind = localhost
mainAgent.sources.syslog-source.port = 3216

mainAgent.channels.mainChannel.type = memory
mainAgent.channels.mainChannel.capacity = 2000
mainAgent.channels.mainChannel.transactionCapacity = 200

mainAgent.sinks.consoleSink.type = logger

# avro sink properties
mainAgent.sources.stormSink.type = avro
mainAgent.sources.stormSink.hostname = #my-storm-server.localdomain
mainAgent.sources.stormSink.port = 3564

When I try to run that I'm getting this:

[WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:697)] Could not configure sink  stormSink due to: Component has no type. Cannot configure. stormSink
org.apache.flume.conf.ConfigurationException: Component has no type. Cannot configure. stormSink
    at org.apache.flume.conf.ComponentConfiguration.configure(ComponentConfiguration.java:76)
    at org.apache.flume.conf.sink.SinkConfiguration.configure(SinkConfiguration.java:44)
    at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:680)
    at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:346)
    at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:212)
    at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:126)
    at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:108)
    at org.apache.flume.node.PropertiesFileConfigurationProvider.getFlumeConfiguration(PropertiesFileConfigurationProvider.java:193)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:94)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

So I'm obviously missing something. I tried changing the avro sink properties to:

# avro sink properties
mainAgent.sinks.stormSink.type = avro
mainAgent.sinks.stormSink.hostname = #my-storm-server-ip
mainAgent.sinks.stormSink.port = 3564

and now I'm getting:

 [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:126)] Attempting to create Avro Rpc client.
2013-11-25 15:12:47,550 (Old I/O datagram worker ([id: 0x50b279f3, 0.0.0.0/0.0.0.0:3216])) [ERROR - org.apache.flume.source.SyslogUDPSource$syslogHandler.messageReceived(SyslogUDPSource.java:80)] Error writting to channel
org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.MemoryChannel{name: mainChannel}
    at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
    at org.apache.flume.source.SyslogUDPSource$syslogHandler.messageReceived(SyslogUDPSource.java:76)
    at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:95)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.socket.oio.OioDatagramWorker.process(OioDatagramWorker.java:54)
    at org.jboss.netty.channel.socket.oio.AbstractOioWorker.run(AbstractOioWorker.java:70)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:128)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
    ... 12 more
2013-11-25 15:12:50,554 (Old I/O datagram worker ([id: 0x50b279f3, 0.0.0.0/0.0.0.0:3216])) [ERROR - org.apache.flume.source.SyslogUDPSource$syslogHandler.messageReceived(SyslogUDPSource.java:80)] Error writting to channel
org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.MemoryChannel{name: mainChannel}
    at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
    at org.apache.flume.source.SyslogUDPSource$syslogHandler.messageReceived(SyslogUDPSource.java:76)
    at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:95)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.socket.oio.OioDatagramWorker.process(OioDatagramWorker.java:54)
    at org.jboss.netty.channel.socket.oio.AbstractOioWorker.run(AbstractOioWorker.java:70)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:128)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
    ... 12 more
2013-11-25 15:12:53,555 (Old I/O datagram worker ([id: 0x50b279f3, 0.0.0.0/0.0.0.0:3216])) [ERROR - org.apache.flume.source.SyslogUDPSource$syslogHandler.messageReceived(SyslogUDPSource.java:80)] Error writting to channel
org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.MemoryChannel{name: mainChannel}
    at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
    at org.apache.flume.source.SyslogUDPSource$syslogHandler.messageReceived(SyslogUDPSource.java:76)
    at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:95)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.socket.oio.OioDatagramWorker.process(OioDatagramWorker.java:54)
    at org.jboss.netty.channel.socket.oio.AbstractOioWorker.run(AbstractOioWorker.java:70)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:128)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
    ... 12 more
2013-11-25 15:12:56,557 (Old I/O datagram worker ([id: 0x50b279f3, 0.0.0.0/0.0.0.0:3216])) [ERROR - org.apache.flume.source.SyslogUDPSource$syslogHandler.messageReceived(SyslogUDPSource.java:80)] Error writting to channel
org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.MemoryChannel{name: mainChannel}
    at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
    at org.apache.flume.source.SyslogUDPSource$syslogHandler.messageReceived(SyslogUDPSource.java:76)
    at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:95)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.socket.oio.OioDatagramWorker.process(OioDatagramWorker.java:54)
    at org.jboss.netty.channel.socket.oio.AbstractOioWorker.run(AbstractOioWorker.java:70)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:128)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
    ... 12 more
^C2013-11-25 15:12:57,444 (agent-shutdown-hook) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.stop(LifecycleSupervisor.java:79)] Stopping lifecycle supervisor 10
2013-11-25 15:12:59,559 (Old I/O datagram worker ([id: 0x50b279f3, 0.0.0.0/0.0.0.0:3216])) [ERROR - org.apache.flume.source.SyslogUDPSource$syslogHandler.messageReceived(SyslogUDPSource.java:80)] Error writting to channel
org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.MemoryChannel{name: mainChannel}
    at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
    at org.apache.flume.source.SyslogUDPSource$syslogHandler.messageReceived(SyslogUDPSource.java:76)
    at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:95)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.socket.oio.OioDatagramWorker.process(OioDatagramWorker.java:54)
    at org.jboss.netty.channel.socket.oio.AbstractOioWorker.run(AbstractOioWorker.java:70)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:128)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
    ... 12 more
2013-11-25 15:13:01,262 (lifecycleSupervisor-1-1) [WARN - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:291)] Unable to create Rpc client using hostname: <ip>, port: 3564
org.apache.flume.FlumeException: NettyAvroRpcClient { host: <ip>, port: 3564 }: RPC connection error
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:161)
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:115)
    at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:590)
    at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
    at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
    at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:209)
    at org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:289)
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.IOException: Error connecting to /<ip>:3564
    at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:147)
    ... 16 more
2013-11-25 15:13:01,263 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:300)] Rpc sink stormSink started.
2013-11-25 15:13:01,265 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:205)] Rpc sink stormSink: Building RpcClient with hostname: <ip>, port: 3564
2013-11-25 15:13:01,265 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:126)] Attempting to create Avro Rpc client.
2013-11-25 15:13:01,271 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:382)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient { host: <ip>, port: 3564 }: RPC connection error
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:161)
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:115)
    at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:590)
    at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
    at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
    at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:209)
    at org.apache.flume.sink.AbstractRpcSink.verifyConnection(AbstractRpcSink.java:269)
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:339)
    ... 3 more
Caused by: java.io.IOException: Error connecting to /<ip>:3564
    at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:147)
    ... 10 more
Caused by: java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:677)
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.connect(NioClientSocketPipelineSink.java:134)
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:97)
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:55)
    at org.jboss.netty.channel.Channels.connect(Channels.java:642)
    at org.jboss.netty.channel.AbstractChannel.connect(AbstractChannel.java:204)
    at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:230)
    at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:183)
    at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:253)
    ... 13 more

Storm is running, but with tcpdump I can't see any Flume-related traffic arriving on the Storm server.

Could you provide some info on what I am missing?

Solution

I changed my Storm config to:

flumeConf.put("flume-agent.source.type", "AVRO");
flumeConf.put("flume-agent.source.bind", "the ip of my storm server");
flumeConf.put("flume-agent.source.port", "3564");
flumeConf.put("flume-agent.channel.type", "MEMORY");
flumeConf.put("flume-agent.channel.capacity", "2000");

and the Flume config to:

# Name the components on this agent
mainAgent.sinks = consoleSink
mainAgent.sources = syslog-source
mainAgent.channels = mainChannel
mainAgent.sinks = stormSink

# Bind the source and sink to the channel
mainAgent.sinks.consoleSink.channel = mainChannel
mainAgent.sources.syslog-source.channels = mainChannel
mainAgent.sinks.stormSink.channel = mainChannel

# Describe/configure the source
mainAgent.sources.syslog-source.type = syslogudp
mainAgent.sources.syslog-source.bind = localhost
mainAgent.sources.syslog-source.port = 3216

# Use a channel which buffers events in memory
mainAgent.channels.mainChannel.type = memory
mainAgent.channels.mainChannel.capacity = 2000
mainAgent.channels.mainChannel.transactionCapacity = 200

# Describe the sink
mainAgent.sinks.consoleSink.type = logger

# avro sink properties
mainAgent.sinks.stormSink.type = avro
mainAgent.sinks.stormSink.hostname = the ip of my storm server
mainAgent.sinks.stormSink.port = 3564
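
For anyone comparing this file with the first one in the question: the "Component has no type" warning there is consistent with stormSink being listed under mainAgent.sinks while its type was written under mainAgent.sources. Below is a minimal Java sketch of a check for that kind of prefix slip. The names SinkTypeCheck, sinksMissingType and parse are made up for illustration and are not part of Flume; the sketch only uses java.util.Properties and makes no claim about how Flume validates its configuration internally.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Flume: reports sinks that are declared
// for an agent but have no matching "<agent>.sinks.<name>.type" key.
final class SinkTypeCheck {

    // Returns every sink named in "<agent>.sinks" that lacks a type key.
    static List<String> sinksMissingType(Properties props, String agent) {
        List<String> missing = new ArrayList<>();
        String declared = props.getProperty(agent + ".sinks", "").trim();
        if (declared.isEmpty()) {
            return missing;
        }
        // Sink names are split on whitespace, e.g. "consoleSink stormSink".
        for (String sink : declared.split("\\s+")) {
            if (props.getProperty(agent + ".sinks." + sink + ".type") == null) {
                missing.add(sink);
            }
        }
        return missing;
    }

    // Parses properties-file text held in a string.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String broken = "mainAgent.sinks = stormSink\n"
                + "mainAgent.sources.stormSink.type = avro\n";
        String fixed = "mainAgent.sinks = stormSink\n"
                + "mainAgent.sinks.stormSink.type = avro\n";
        System.out.println(sinksMissingType(parse(broken), "mainAgent")); // prints [stormSink]
        System.out.println(sinksMissingType(parse(fixed), "mainAgent"));  // prints []
    }
}
```

One side observation from the same sketch: java.util.Properties keeps only the last value of a repeated key, so a file containing both "mainAgent.sinks = consoleSink" and "mainAgent.sinks = stormSink" is read here as declaring stormSink alone. If both sinks are wanted, the usual form is a single space-separated line (mainAgent.sinks = consoleSink stormSink). Whether consoleSink was actually inactive in this setup is an inference, not something the logs above confirm.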

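and it works. Compared with the original setup, the avro sink properties now sit under mainAgent.sinks instead of mainAgent.sources, and the Avro source on the Storm side binds to the server's IP instead of localhost, so the Flume sink can reach it from the other machine.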
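
A likely reason the bind address mattered: a listener bound to localhost accepts connections only from the same machine, which would fit both the "RPC connection error" and the absence of traffic in tcpdump. The following Java sketch shows two quick checks that could be run before starting Flume. BindCheck, isLoopbackOnly and canConnect are hypothetical names chosen for this example, and the addresses in main are placeholders, not values from the setup above.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

// Hypothetical helper, not part of Flume or Storm.
final class BindCheck {

    // True if a listener bound to this host is reachable only from the same machine.
    static boolean isLoopbackOnly(String bindHost) {
        try {
            return InetAddress.getByName(bindHost).isLoopbackAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Cannot resolve " + bindHost, e);
        }
    }

    // True if a TCP connection to host:port can be opened within timeoutMillis.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder values: replace with the bind address and port in use.
        System.out.println(isLoopbackOnly("127.0.0.1"));  // prints true
        System.out.println(isLoopbackOnly("192.0.2.10")); // prints false
        System.out.println(canConnect("192.0.2.10", 3564, 2000));
    }
}
```

Running canConnect from the Flume host against the Storm server's address and port tells you whether the Avro source is listening on an interface the sink can reach, without having to read through the sink's stack traces.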

Licensed under: CC-BY-SA with attribution