Question

I have a data source that produces point at a potentially high rate, and I'd like to perform a possibly time-consuming operation on each point; but I would also like the system to degrade gracefully when it becomes overloaded, by dropping excess data points.

As far as I can tell, using a gen_event will never skip events. Conceptually, what I would like the gen_event to do is to drop all but the latest pending events before running the handlers again.

Is there a way to do this with standard OTP ? or is there a good reason why I should not handle things that way ?

So far the best I have is using a gen_server and relying on the timeout to trigger the expensive events:

-behaviour(gen_server).
init() -> 
    {ok, Pid} = gen_event:start_link(),
    {ok, {Pid, none}}.

handle_call({add, H, A},_From,{Pid,Data}) ->
    {reply, gen_event:add_handler(Pid,H,A), {Pid,Data}}.

handle_cast(Data,{Pid,_OldData}) -> 
    {noreply, {Pid,Data,0}}.  % set timeout to 0 

handle_info(timeout, {Pid,Data}) ->
    gen_event:sync_notify(Pid,Data),
    {noreply, {Pid,Data}}.

Is this approach correct ? (esp. with respect to supervision ? )

Was it helpful?

Solution

Is there a way to do this with standard OTP ?

No.

is there a good reason why I should not handle things that way ?

No, timing out early can increase the performance of the entire system. Read about how here.

Is this approach correct ? (esp. with respect to supervision ? )

No idea, you haven't provided the supervision code.


As a bit of extra information to your first question:

If you can use 3rd party libraries outside of OTP, there are a few out there that can add preemptive timeouts, which is what you are describing.

There are two that I am familiar with the first is dispcount, and the second is chick (I'm the author of chick, i'll try not to advertise the project here).

Dispcount works really good for single resources that only have a limited number of jobs that can be run at the same time and does no queuing. you can read about it here (warning lots of really interesting information!).

Dispcount didn't work for me because i would have had to spawn 4000+ pools of processes to handle the amount of different queues inside of my app. I wrote chick because I needed a way to dynamically increase and decrease my queue length, as well as being able to queue up requests and deny others, without having to spawn 4000+ pools of processes.

If I were you I would try out discount first (as most solutions do not need chick), and then if you need something a bit more dynamic then a pool that can respond to a certain number of requests try out chick.

OTHER TIPS

I can't comment on supervision, but I would implement this as a queue with expiring items.

I've implemented something that you can use below.

I made it a gen_server; when you create it you give it a maximum age for old items.

Its interface is that you can send it items to be processed and you can request items that have not been dequeued It records the time at which it receives every item. Every time it receives an item to be processed, it checks all the items in the queue, dequeueing and discarding those that are older than the maximum age. (If you want the maximum age to be always respected, you can filter the queue before you return queued items)

Your data source will cast data ({process_this, Anything}) to the work queue and your (potentially slow) consumers process will call (gimme) to get data.

-module(work_queue).
-behavior(gen_server).

-export([init/1, handle_cast/2, handle_call/3]).

init(DiscardAfter) ->
  {ok, {DiscardAfter, queue:new()}}.

handle_cast({process_this, Data}, {DiscardAfter, Queue0}) ->
  Instant = now(),
  Queue1 = queue:filter(fun({Stamp, _}) -> not too_old(Stamp, Instant, DiscardAfter) end, Queue0),
  Queue2 = queue:in({Instant, Data}, Queue1),
  {noreply, {DiscardAfter, Queue2}}.

handle_call(gimme, From, State = {DiscardAfter, Queue0}) ->
  case queue:is_empty(Queue0) of
    true ->
      {reply, no_data, State};
    false ->
      {{value, {_Stamp, Data}}, Queue1} = queue:out(Queue0),
      {reply, {data, Data}, {DiscardAfter, Queue1}}
  end.

delta({Mega1, Unit1, Micro1}, {Mega2, Unit2, Micro2}) ->
  ((Mega2 - Mega1) * 1000000 + Unit2 - Unit1) * 1000000 + Micro2 - Micro1.

too_old(Stamp, Instant, DiscardAfter) ->
  delta(Stamp, Instant) > DiscardAfter.

Little demo at the REPL:

c(work_queue).
{ok, PidSrv} = gen_server:start(work_queue, 10 * 1000000, []).         
gen_server:cast(PidSrv, {process_this, <<"going_to_go_stale">>}),      
timer:sleep(11 * 1000),                                                
gen_server:cast(PidSrv, {process_this, <<"going to push out previous">>}),
{gen_server:call(PidSrv, gimme), gen_server:call(PidSrv, gimme)}.        
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top