Question

I have a few binary files of moderate size (1 to 5 GB) that I want to read and process using Erlang.

Each file contains records of varying sizes: one record may be 200 KB while another may be 800 KB. The size of a record can be obtained by reading the first few bytes of the record. As these are binary files, there is no delimiter between two records.

To process these files we could write a multithreaded program, but just for fun I thought of processing the files in parallel using Erlang.

I am a newbie to Erlang, so I don't know how I can split the files into chunks and pass those chunks to Erlang processes.

Can anyone please give some ideas?

Solution

I've done this many times in other programming languages, and it was a fun learning exercise in Erlang.

The basic strategy is that you tell each process its starting byte offset, record size, and number of records to read. (If you have different-sized records in one file, you might pass a list of record sizes instead, like [200, 800, 800, 800, 800, 200, 100]; there's a sketch of that variant after the demo below.) The workers process their chunks independently and return their results to the parent.

I think it's easy enough that you can figure it out yourself. You'll want to look at erlang:spawn, file:open, file:read, and file:position as the main pieces. But if you want the spoilers, here's my implementation of a module that reads numbers from a binary file and finds the average using multiple processes. (I'm not an Erlang expert, so there may be better ways of doing this.)

-module(average).

-export([write_file/3, read_file/4]).
-export([read_file_worker/5]).

%% Write each number as a fixed-size block of BlockSize bytes.
write_file(Filename, BlockSize, ListOfNumbers) ->
    BS = BlockSize * 8,  %% block size in bits for the bit syntax below
    BinData = [<<X:BS>> || X <- ListOfNumbers],
    {ok, IoDevice} = file:open(Filename, [write, raw, binary]),
    file:write(IoDevice, BinData),
    file:close(IoDevice).

%% Spawn one worker per chunk of BlocksPerProcess blocks, collect the
%% partial sums, and print the total and average.
read_file(Filename, BlocksPerProcess, BlockSize, TotalBlockCount) ->
    {ok, SpawnCount} = read_file_spawner(Filename, BlocksPerProcess, BlockSize, TotalBlockCount, 0, 0),
    {ok, Sum} = read_file_listener(SpawnCount, 0),
    io:format("Total sum: ~p~nNumbers seen: ~p~nAverage: ~p~n", [Sum, TotalBlockCount, Sum/TotalBlockCount]).

%% Walk the file chunk by chunk, spawning a worker for each chunk,
%% and return how many workers were spawned.
read_file_spawner(Filename, BlocksPerProcess, BlockSize, TotalBlockCount, BlockOffset, SpawnCount) when BlockOffset < TotalBlockCount ->
    Offset = BlockOffset * BlockSize,  %% byte offset where this chunk starts
    MaxBlocks = min(BlocksPerProcess, TotalBlockCount - BlockOffset),  %% final chunk may be short
    spawn(?MODULE, read_file_worker, [self(), Filename, Offset, BlockSize, MaxBlocks]),
    read_file_spawner(Filename, BlocksPerProcess, BlockSize, TotalBlockCount, BlockOffset + BlocksPerProcess, SpawnCount + 1);
read_file_spawner(_Filename, _BlocksPerProcess, _BlockSize, _TotalBlockCount, _BlockOffset, SpawnCount) ->
    {ok, SpawnCount}.

%% Wait for exactly one {ok, Sum} message from each spawned worker.
read_file_listener(0, Accum) ->
    {ok, Accum};
read_file_listener(SpawnCount, Accum) ->
    receive
        {ok, Number} ->
            io:format("Got ~p~n", [Number]),
            read_file_listener(SpawnCount - 1, Accum + Number)
    end.

%% Each worker opens its own file handle, seeks to its chunk, sums its
%% blocks, and sends the partial sum back to the parent.
read_file_worker(MasterPid, Filename, Offset, BlockSize, MaxBlocks) ->
    {ok, IoDevice} = file:open(Filename, [read, raw, binary]),
    {ok, Offset} = file:position(IoDevice, {bof, Offset}),
    {ok, Sum} = read_file_worker_loop(IoDevice, BlockSize, 0, MaxBlocks, 0),
    file:close(IoDevice),
    MasterPid ! {ok, Sum}.

read_file_worker_loop(IoDevice, BlockSize, BlocksRead, MaxBlocks, Accum) when BlocksRead < MaxBlocks ->
    {ok, BinData} = file:read(IoDevice, BlockSize),
    Number = binary:decode_unsigned(BinData),
    read_file_worker_loop(IoDevice, BlockSize, BlocksRead + 1, MaxBlocks, Accum + Number);
read_file_worker_loop(_IoDevice, _BlockSize, _BlocksRead, _MaxBlocks, Accum) ->
    {ok, Accum}.

And here it is in action, reading 10 numbers from a file using 5 processes:

12> Numbers = [1,1,2,4,6,10,10,1,0,500].                                  
[1,1,2,4,6,10,10,1,0,500]
13> average:write_file("/tmp/test.binary", 32, Numbers).                  
ok
14> average:read_file("/tmp/test.binary", 2, 32, length(Numbers)).
Got 500
Got 11
Got 6
Got 16
Got 2
Total sum: 535
Numbers seen: 10
Average: 53.5
ok
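
The module above assumes fixed-size blocks, while your real records carry their size in their first few bytes. Here's a minimal sketch of how a worker could read such length-prefixed records, assuming (since your exact header format isn't specified) a 4-byte big-endian unsigned length prefix that counts the payload bytes that follow:

-module(varrec).

-export([read_records/3]).

%% Read up to MaxRecords length-prefixed records starting at byte
%% Offset. Assumes each record begins with a 4-byte big-endian
%% unsigned length giving the size of the payload that follows.
read_records(Filename, Offset, MaxRecords) ->
    {ok, IoDevice} = file:open(Filename, [read, raw, binary]),
    {ok, Offset} = file:position(IoDevice, {bof, Offset}),
    Records = read_records_loop(IoDevice, MaxRecords, []),
    file:close(IoDevice),
    Records.

read_records_loop(_IoDevice, 0, Acc) ->
    lists:reverse(Acc);
read_records_loop(IoDevice, N, Acc) ->
    case file:read(IoDevice, 4) of
        {ok, <<Len:32/big-unsigned>>} ->
            {ok, Payload} = file:read(IoDevice, Len),
            read_records_loop(IoDevice, N - 1, [Payload | Acc]);
        eof ->
            lists:reverse(Acc)
    end.

To divide a file like this among workers, the parent could make one cheap pass reading only the headers, skipping each payload with file:position/2 to collect the record offsets, then hand each worker a starting offset and a record count just as in the fixed-size version.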

Other tips

Why not launch a process for each file? I can't see the value of splitting files of this size into chunks; they are already small enough to be handled whole. This can also be parallelized across many machines.
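
For completeness, here's a minimal sketch of that one-process-per-file approach; process_all/1 and process_file/1 are hypothetical names, and process_file/1 is just a placeholder for your real per-file record processing:

-module(per_file).

-export([process_all/1]).

%% Spawn one worker per file and collect one result per worker,
%% in the same order as the input list.
process_all(Filenames) ->
    Parent = self(),
    Pids = [spawn(fun() -> Parent ! {self(), process_file(F)} end)
            || F <- Filenames],
    [receive {Pid, Result} -> Result end || Pid <- Pids].

%% Placeholder: just report the file size. Real code would stream the
%% records with file:open/2, file:read/2, and file:position/2 rather
%% than loading a multi-gigabyte file into memory.
process_file(Filename) ->
    filelib:file_size(Filename).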

Licensed under: CC-BY-SA with attribution
Not affiliated with Stack Overflow