Question

We copied a 150 MB CSV file into Flume's spool directory. When it was loaded into HDFS, the file was split into many small files of about 80 KB each. Is there a way to load the file with Flume without it being split into smaller files? Many small files mean more metadata for the NameNode to hold, so we need to avoid this.

My flume-ng configuration looks like this:

# Initialize agent's source, channel and sink
agent.sources = TwitterExampleDir
agent.channels = memoryChannel
agent.sinks = flumeHDFS

# Setting the source to spool directory where the file exists
agent.sources.TwitterExampleDir.type = spooldir
agent.sources.TwitterExampleDir.spoolDir = /usr/local/flume/live

# Setting the channel to memory
agent.channels.memoryChannel.type = memory
# Max number of events stored in the memory channel
agent.channels.memoryChannel.capacity = 10000
# agent.channels.memoryChannel.batchSize = 15000
agent.channels.memoryChannel.transactioncapacity = 1000000

# Setting the sink to HDFS
agent.sinks.flumeHDFS.type = hdfs
agent.sinks.flumeHDFS.hdfs.path = hdfs://info3s7:54310/spool5
agent.sinks.flumeHDFS.hdfs.fileType = DataStream

# Write format can be text or writable
agent.sinks.flumeHDFS.hdfs.writeFormat = Text

# use a single csv file at a time
agent.sinks.flumeHDFS.hdfs.maxOpenFiles = 1

# rollover file based on maximum size of 10 MB
agent.sinks.flumeHDFS.hdfs.rollCount=0
agent.sinks.flumeHDFS.hdfs.rollInterval=2000
agent.sinks.flumeHDFS.hdfs.rollSize = 0
agent.sinks.flumeHDFS.hdfs.batchSize =1000000

# never rollover based on the number of events
agent.sinks.flumeHDFS.hdfs.rollCount = 0

# rollover file based on max time of 1 min
#agent.sinks.flumeHDFS.hdfs.rollInterval = 0
# agent.sinks.flumeHDFS.hdfs.idleTimeout = 600

# Connect source and sink with channel
agent.sources.TwitterExampleDir.channels = memoryChannel
agent.sinks.flumeHDFS.channel = memoryChannel

Solution

What you want is this:

# rollover file based on maximum size of 10 MB
agent.sinks.flumeHDFS.hdfs.rollCount = 0
agent.sinks.flumeHDFS.hdfs.rollInterval = 0
agent.sinks.flumeHDFS.hdfs.rollSize = 10000000
agent.sinks.flumeHDFS.hdfs.batchSize = 10000

From the Flume documentation:

hdfs.rollSize: File size to trigger roll, in bytes (0: never roll based on file size)

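In your example, rollSize is 0, so the file is never rolled based on size, while rollInterval is 2000, so the file is rolled on a timer (every 2000 seconds) no matter how much data it holds. That can leave you with small files. Setting rollInterval and rollCount to 0 and rollSize to a byte count makes file size the only roll trigger.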
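Note that with rollSize = 10000000 a 150 MB input still ends up as roughly 15 files of about 10 MB each. If the goal is one output file, rollSize has to be larger than the input. The following is a minimal sketch under that assumption; the 200000000 and 60 values are illustrative choices, not values from the question. hdfs.idleTimeout is included because, with every roll trigger disabled or never reached, something else has to close the file once the source stops sending events.

```properties
# Roll only on size, with a threshold above the 150 MB input (assumed value)
agent.sinks.flumeHDFS.hdfs.rollCount = 0
agent.sinks.flumeHDFS.hdfs.rollInterval = 0
agent.sinks.flumeHDFS.hdfs.rollSize = 200000000

# Close the file after 60 seconds without new events (assumed value)
agent.sinks.flumeHDFS.hdfs.idleTimeout = 60
```

If several files are copied into the spool directory back to back, their events may land in the same HDFS file until the size threshold or the idle timeout is reached.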

Also note that batchSize is the number of events written to a file before it is flushed to HDFS, not the number of events before the file is closed and a new one is created. Set it small enough that writing a batch does not time out, but large enough to avoid the overhead of many small requests to HDFS.
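The sink's batch size also has to fit the channel. A memory channel's transactionCapacity must not exceed its capacity, and the sink's batchSize should be no larger than the channel's transactionCapacity. As far as I know the property keys are case-sensitive, so the lowercase transactioncapacity in the question is likely ignored and the default is used instead. A sketch with assumed numbers that keeps the three values consistent:

```properties
# Channel holds up to 100000 events (assumed value)
agent.channels.memoryChannel.capacity = 100000
# Note the camel case; must be <= capacity
agent.channels.memoryChannel.transactionCapacity = 10000

# Sink batch must be <= the channel's transactionCapacity
agent.sinks.flumeHDFS.hdfs.batchSize = 10000
```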

Licensed under: CC-BY-SA with attribution