I've done this many times in other programming languages, and it was a fun learning exercise in Erlang.
The basic strategy is that you tell each process its starting byte offset, record size, and number of records to read. (If you have different-sized records in one file, you might pass a list of record sizes, like [200, 800, 800, 800, 800, 200, 100].) The workers process their chunks independently and return the results to the parent.
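For the different-sized-records case, the byte offset of each worker's chunk can be worked out up front from the list of sizes. Here's a minimal sketch of that bookkeeping; the module name chunk_plan and the function plan/2 are hypothetical names made up for illustration, and it assumes the sizes are in bytes and listed in file order:

```erlang
-module(chunk_plan).
-export([plan/2]).

%% plan(RecordSizes, RecordsPerWorker) -> [{ByteOffset, SizesForOneWorker}]
%% Hypothetical helper: splits the size list into per-worker chunks and
%% records the byte offset at which each chunk starts.
plan(RecordSizes, RecordsPerWorker) when RecordsPerWorker > 0 ->
    plan(RecordSizes, RecordsPerWorker, 0, []).

plan([], _RecordsPerWorker, _Offset, Acc) ->
    lists:reverse(Acc);
plan(RecordSizes, RecordsPerWorker, Offset, Acc) ->
    %% Take at most RecordsPerWorker sizes; the last chunk may be shorter.
    N = min(RecordsPerWorker, length(RecordSizes)),
    {Chunk, Rest} = lists:split(N, RecordSizes),
    %% The next chunk starts right after all the bytes in this one.
    plan(Rest, RecordsPerWorker, Offset + lists:sum(Chunk), [{Offset, Chunk} | Acc]).
```

With the example list and three records per worker, chunk_plan:plan([200, 800, 800, 800, 800, 200, 100], 3) returns [{0,[200,800,800]},{1800,[800,800,200]},{3600,[100]}]. Each tuple is what you would hand to one worker: where to seek, and how many bytes to read for each record.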
I think it's easy enough that you can figure it out yourself. You'll want to look at erlang:spawn, file:open, file:read, and file:position as the main pieces. But if you want the spoilers, here's my implementation of a module that reads numbers from a binary file and finds the average using multiple processes. (I'm not an Erlang expert, so there may be better ways of doing this.)
-module(average).
-export([write_file/3, read_file/4]).
-export([read_file_worker/5]).
%% Write each number as an unsigned big-endian integer of BlockSize bytes.
write_file(Filename, BlockSize, ListOfNumbers) ->
    BS = BlockSize * 8,  % segment size in bits
    BinData = [<<X:BS>> || X <- ListOfNumbers],
    {ok, IoDevice} = file:open(Filename, [write, raw, binary]),
    ok = file:write(IoDevice, BinData),
    file:close(IoDevice).
%% Spawn the workers, collect their partial sums, and print the average.
%% BlockSize is in bytes; TotalBlockCount must be greater than zero.
read_file(Filename, BlocksPerProcess, BlockSize, TotalBlockCount) ->
    {ok, SpawnCount} = read_file_spawner(Filename, BlocksPerProcess, BlockSize, TotalBlockCount, 0, 0),
    {ok, Sum} = read_file_listener(SpawnCount, 0),
    io:format("Total sum: ~p~nNumbers seen: ~p~nAverage: ~p~n", [Sum, TotalBlockCount, Sum / TotalBlockCount]).
%% Start one worker per chunk of BlocksPerProcess blocks; the last chunk may be shorter.
read_file_spawner(Filename, BlocksPerProcess, BlockSize, TotalBlockCount, BlockOffset, SpawnCount) when BlockOffset < TotalBlockCount ->
    Offset = BlockOffset * BlockSize,  % byte offset of this worker's first block
    MaxBlocks = min(BlocksPerProcess, TotalBlockCount - BlockOffset),
    spawn(?MODULE, read_file_worker, [self(), Filename, Offset, BlockSize, MaxBlocks]),
    read_file_spawner(Filename, BlocksPerProcess, BlockSize, TotalBlockCount, BlockOffset + BlocksPerProcess, SpawnCount + 1);
read_file_spawner(_Filename, _BlocksPerProcess, _BlockSize, _TotalBlockCount, _BlockOffset, SpawnCount) ->
    {ok, SpawnCount}.
%% Wait for one {ok, Number} message per worker; replies arrive in no particular order.
read_file_listener(0, Accum) ->
    {ok, Accum};
read_file_listener(SpawnCount, Accum) ->
    receive
        {ok, Number} ->
            io:format("Got ~p~n", [Number]),
            read_file_listener(SpawnCount - 1, Accum + Number)
    end.
%% Worker: seek to this chunk's byte offset, sum MaxBlocks numbers, report to the parent.
read_file_worker(MasterPid, Filename, Offset, BlockSize, MaxBlocks) ->
    {ok, IoDevice} = file:open(Filename, [read, raw, binary]),
    {ok, Offset} = file:position(IoDevice, {bof, Offset}),
    {ok, Sum} = read_file_worker_loop(IoDevice, BlockSize, 0, MaxBlocks, 0),
    ok = file:close(IoDevice),
    MasterPid ! {ok, Sum}.
%% Read BlockSize bytes at a time and decode each block as an unsigned big-endian integer.
read_file_worker_loop(IoDevice, BlockSize, BlocksRead, MaxBlocks, Accum) when BlocksRead < MaxBlocks ->
    {ok, BinData} = file:read(IoDevice, BlockSize),
    Number = binary:decode_unsigned(BinData),
    read_file_worker_loop(IoDevice, BlockSize, BlocksRead + 1, MaxBlocks, Accum + Number);
read_file_worker_loop(_IoDevice, _BlockSize, _BlocksRead, _MaxBlocks, Accum) ->
    {ok, Accum}.
And here it is in action, reading 10 numbers from a file using 5 processes:
12> Numbers = [1,1,2,4,6,10,10,1,0,500].
[1,1,2,4,6,10,10,1,0,500]
13> average:write_file("/tmp/test.binary", 32, Numbers).
ok
14> average:read_file("/tmp/test.binary", 2, 32, length(Numbers)).
Got 500
Got 11
Got 6
Got 16
Got 2
Total sum: 535
Numbers seen: 10
Average: 53.5
ok
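One caveat with the listener above: if a worker crashes (for example, because the file doesn't exist and the {ok, IoDevice} match fails), it never sends its message, and the parent waits in receive forever. A possible way around that is to monitor the workers. The following is only a sketch under my own assumptions: the module name monitored_sum and the function collect/1 are hypothetical, and each worker hands back its result as its exit reason rather than sending a message.

```erlang
-module(monitored_sum).
-export([collect/1]).

%% collect(Funs) -> {ok, Sum} | {error, Reason}
%% Hypothetical helper: runs each zero-arity fun in its own monitored
%% process and adds up the numbers they return.
collect(Funs) ->
    %% spawn_monitor/1 returns {Pid, MonitorRef}; only the reference is kept.
    Refs = [element(2, spawn_monitor(fun() -> exit({ok, F()}) end)) || F <- Funs],
    wait(Refs, 0).

wait([], Sum) ->
    {ok, Sum};
wait([Ref | Rest], Sum) ->
    receive
        %% The worker finished and passed its result as the exit reason.
        {'DOWN', Ref, process, _Pid, {ok, N}} ->
            wait(Rest, Sum + N);
        %% Any other exit reason means the worker failed; stop waiting.
        %% (DOWN messages from the remaining workers are left in the mailbox.)
        {'DOWN', Ref, process, _Pid, Reason} ->
            {error, Reason}
    end.
```

For example, monitored_sum:collect([fun() -> 2 end, fun() -> 6 end]) returns {ok, 8}, while a list containing fun() -> exit(boom) end returns {error, boom} instead of hanging. To use the idea with the average module, each fun would open the file, seek, and return its partial sum.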