Aggregator that releases partial group based on correlation but holds on to rest of the messages

StackOverflow https://stackoverflow.com/questions/23349598

  •  11-07-2023

Question

I want to set the correlation strategy on an aggregator so that it uses a date taken from the name of the incoming file (the message) to correlate files, so all files with today's date belong to the same group. Since I might have multiple days' worth of data, it's possible that I have aggregated two days of files. I want to base the release strategy on a done file (message) that also includes the date in its filename, so essentially each day will have a bunch of files and a done file. Ingesting the done file should release that day's files from the aggregator but still keep the other day's files until the done file for that day is ingested.

So in this scenario, correlation is obviously simple, but what I am not sure about is how to release not all but only some specific messages from the group based on the correlation key. The documentation talks about a message reaper, but that goes into message store stuff and I want to do all this in memory.

Let me elaborate with an example.

I have these files in a directory which I'm polling with a file inbound channel adapter:

file-1-2014.04.27.dat

file-2-2014.04.27.dat

file-3-2014.04.27.dat

done-2014.04.27.dat

file-1-2014.04.28.dat

file-2-2014.04.28.dat

done-2014.04.28.dat

As these files are polled in, I have an aggregator in the flow where all incoming files are aggregated. To correlate, I was thinking I can extract the date and put it in the correlation_id header, so that the first 3 files are considered to belong to one group and the next 2 files belong to a second group. Once I consume the done-2014.04.27.dat file, I want to release the first 3 files to be processed further in the flow but hold on to

file-1-2014.04.28.dat

file-2-2014.04.28.dat

until I receive the

done-2014.04.28.dat

and then release these 2 files.

Any help would be appreciated. Thanks


Solution

I am not sure what you mean when you say "correlation is simple" but then go on to say you only want to release part of the group. If they have different dates then they will be in different groups, so there's no need to release part of a group, just release the whole group by running the reaper just after midnight (or any time the next day). It's not at all clear why you need a "done" message.

By default, the aggregator uses an in-memory message store (SimpleMessageStore).

EDIT:

Just put the done file in the same group and have your release strategy detect the presence of the done file. You could use an expression, but if the group can be large, it would be more efficient to implement ReleaseStrategy and iterate over MessageGroup.getMessages() looking for the done file.
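As a rough sketch of that idea, the logic could look something like the following. It is plain Java with no Spring dependency; the class name DoneFileRules and its methods are hypothetical and only illustrate the decisions involved. It assumes file names end in a yyyy.MM.dd date followed by .dat and that done files start with "done-", as in the question. In a real flow, correlationKey would be called from a CorrelationStrategy.getCorrelationKey(Message<?>) implementation and canRelease from a ReleaseStrategy.canRelease(MessageGroup) implementation that iterates over MessageGroup.getMessages().

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: names are illustrative and not part of Spring Integration.
final class DoneFileRules {

    // Assumed naming scheme: a trailing date such as "2014.04.27" right before ".dat".
    private static final Pattern DATE =
            Pattern.compile("(\\d{4}\\.\\d{2}\\.\\d{2})\\.dat$");

    private DoneFileRules() {
    }

    // Correlation key: the date in the file name, so the data files and the
    // done file for the same day end up in the same group.
    static String correlationKey(String fileName) {
        Matcher m = DATE.matcher(fileName);
        if (!m.find()) {
            throw new IllegalArgumentException("No date in file name: " + fileName);
        }
        return m.group(1);
    }

    // Assumed convention: done files are named "done-<date>.dat".
    static boolean isDoneFile(String fileName) {
        return fileName.startsWith("done-");
    }

    // Release check: the group can be released once it contains a done file.
    static boolean canRelease(Iterable<String> fileNamesInGroup) {
        for (String name : fileNamesInGroup) {
            if (isDoneFile(name)) {
                return true;
            }
        }
        return false;
    }
}
```

With the example files from the question, the three 2014.04.27 data files and done-2014.04.27.dat share the key "2014.04.27", so that group becomes releasable as soon as the done file arrives, while the 2014.04.28 group stays held until its own done file shows up.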

The next step depends on what's downstream of the aggregator. If you use a splitter to split them back to separate files, you can simply add a filter to drop the done file. If you deal with the collection of files directly, either ignore the done file, or add a transformer to remove it from the collection.
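If you handle the released collection directly, removing the done file could be sketched roughly as below. Again this is plain Java rather than an actual Spring Integration transformer; DoneFileFilter and withoutDoneFile are hypothetical names, and the "done-" prefix is assumed from the file names in the question.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: names are illustrative and not part of Spring Integration.
final class DoneFileFilter {

    private DoneFileFilter() {
    }

    // Returns a new list with the data files only, leaving the input untouched.
    // Assumed convention: done files are named "done-<date>.dat".
    static List<String> withoutDoneFile(List<String> releasedFileNames) {
        List<String> dataFiles = new ArrayList<>();
        for (String name : releasedFileNames) {
            if (!name.startsWith("done-")) {
                dataFiles.add(name);
            }
        }
        return dataFiles;
    }
}
```

The same predicate would serve as the selector of a filter placed after a splitter, if you split the group back into individual files instead.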

With respect to the reaper: assuming files arrive in real time, I was simply suggesting that if you run the reaper once a day (say at 01:00) with a group timeout of, say, 30 minutes, then the reaper will release yesterday's files without the need for a done file.

EDIT:

See my comment on your "answer" below - you have 2 subscribers on filesLogger.

Licensed under: CC-BY-SA with attribution