Question

I am just starting with Akka and have created a test application. In it I create a bunch of actors, each of which creates a scheduler to generate a heartbeat event. On another type of event I cancel the scheduler with heartbeat.cancel();, but I'd like to restart it when yet another event occurs. If I recreate the scheduler, I see memory consumption increase continuously.

The question, then, is either how to resume the scheduler or how to dispose of it properly.

This is the code for that actor:

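import java.util.concurrent.TimeUnit;

import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import scala.concurrent.duration.Duration;

public class Device extends UntypedActor {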

    enum CommunicationStatus{
        OK,
        FAIL,
        UNKNOWN
    }

    private static class Heartbeat {

    }

    public final String deviceId;
    private CommunicationStatus commStatus;
    private Cancellable heartBeatScheduler;

    public Device(String Id)
    {
        deviceId = Id;
        commStatus = CommunicationStatus.UNKNOWN;

    }

    @Override
    public void preStart() {
        getContext().system().eventStream().subscribe(getSelf(), DeviceCommunicationStatusUpdated.class);
        startHeartbeat();
    }

    @Override
    public void postStop() {
        stopHeartBeat();
    }

    private void startHeartbeat() {

        LoggingAdapter log = Logging.getLogger(getContext().system(), this);
        log.info("Starting heartbeat");

        heartBeatScheduler = getContext().system().scheduler().
                schedule(Duration.Zero(),
                        Duration.create(1, TimeUnit.SECONDS),
                        getContext().self(),
                        new Heartbeat(),
                        getContext().system().dispatcher(),
                        ActorRef.noSender());
    }

    private void stopHeartBeat() {
        if(!heartBeatScheduler.isCancelled()) {
            LoggingAdapter log = Logging.getLogger(getContext().system(), this);
            log.info("Stopping heartbeat");
            heartBeatScheduler.cancel();
        }
    }

    public String getDeviceId() {
        return deviceId;
    }

    public CommunicationStatus getCommunicationStatus(){
        return commStatus;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        LoggingAdapter log = Logging.getLogger(getContext().system(), this);

        if(message instanceof Heartbeat){
            log.info("Pum, pum");
        }
        else if (message instanceof DeviceCommunicationStatusUpdated){
            DeviceCommunicationStatusUpdated event = (DeviceCommunicationStatusUpdated) message;

            // Compare string contents; == only compares object references
            if(this.deviceId.equals(event.deviceId)){
                log.info("Received communication status update. '{}' is now {}", deviceId, event.state);
                this.commStatus =
                        event.state == DeviceCommunicationStatusUpdated.State.OK ?
                                CommunicationStatus.OK : CommunicationStatus.FAIL;

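                if(commStatus == CommunicationStatus.OK){
                    // Restart only if the previous schedule was cancelled;
                    // a heartbeat that is already running is left alone
                    if(heartBeatScheduler.isCancelled()){
                        startHeartbeat();
                    }
                }
                else {
                    stopHeartBeat();
                }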

            }
        }

        else unhandled(message);
    }
}

Solution

In the end there was no leak: I'm new to Java and was simply impatient with the garbage collector. In any case, I would still like to know how to reset or restart a scheduler.
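On the restart part: as far as I know, Akka's Cancellable only offers cancel() and isCancelled(), so a cancelled schedule cannot be resumed. The usual pattern is what the question's code already does, namely cancel the old handle and schedule a new one. Below is a minimal sketch of that cancel-then-reschedule pattern using only the JDK's ScheduledExecutorService instead of Akka, so it runs without extra dependencies. The class name RestartableHeartbeat and its methods are hypothetical, made up for illustration; with Akka, the scheduler().schedule(...) call from the question would take the place of scheduleAtFixedRate.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: a periodic task that can be stopped and started again. */
class RestartableHeartbeat {
    private final ScheduledExecutorService executor;
    private final Runnable tick;
    private final long periodMillis;
    private ScheduledFuture<?> handle; // null while the heartbeat is stopped

    RestartableHeartbeat(ScheduledExecutorService executor, Runnable tick, long periodMillis) {
        this.executor = executor;
        this.tick = tick;
        this.periodMillis = periodMillis;
    }

    /** Schedules a fresh periodic task; does nothing if one is already running. */
    synchronized void start() {
        if (handle != null) {
            return;
        }
        handle = executor.scheduleAtFixedRate(tick, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Cancels the current task and drops the handle so it can be garbage collected. */
    synchronized void stop() {
        if (handle == null) {
            return;
        }
        handle.cancel(false); // let a tick that is already running finish
        handle = null;
    }

    synchronized boolean isRunning() {
        return handle != null;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        RestartableHeartbeat heartbeat =
                new RestartableHeartbeat(executor, () -> System.out.println("Pum, pum"), 200);

        heartbeat.start();   // first schedule
        Thread.sleep(500);
        heartbeat.stop();    // cancel it
        Thread.sleep(500);   // no ticks are scheduled during this pause
        heartbeat.start();   // "restart" is just a new schedule
        Thread.sleep(500);
        heartbeat.stop();
        executor.shutdown();
    }
}
```

Each restart creates a new task object and the old one becomes garbage once it is cancelled and unreferenced, which would be consistent with memory climbing for a while and then dropping at the next collection.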

Licensed under: CC-BY-SA with attribution