In the end there was no leak: I'm new to Java and was impatient with the garbage collector. In any case, I would still like to know how to reset or restart a scheduler.
Restarting a cancelled scheduler in Akka
Question
I am just starting with Akka and have created a test application. In it I create a bunch of actors, each of which creates a scheduler to generate a heartbeat event. Upon another type of event, I cancel the scheduler with heartbeat.cancel(), but I'd like to restart it when another event occurs. If I recreate the scheduler, I see that the memory consumption increases continuously.
The question, then, is either how to resume the scheduler or how to dispose of it properly.
This is the code for that actor:
import java.util.concurrent.TimeUnit;

import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import scala.concurrent.duration.Duration;

public class Device extends UntypedActor {

    enum CommunicationStatus {
        OK,
        FAIL,
        UNKNOWN
    }

    // Marker message the scheduler sends to this actor on every tick.
    private static class Heartbeat {
    }

    public final String deviceId;
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    private CommunicationStatus commStatus;
    private Cancellable heartBeatScheduler;

    public Device(String id) {
        deviceId = id;
        commStatus = CommunicationStatus.UNKNOWN;
    }

    @Override
    public void preStart() {
        getContext().system().eventStream().subscribe(getSelf(), DeviceCommunicationStatusUpdated.class);
        startHeartbeat();
    }

    @Override
    public void postStop() {
        stopHeartBeat();
    }

    // Schedules a new repeating Heartbeat message and keeps the handle so it can be cancelled later.
    private void startHeartbeat() {
        log.info("Starting heartbeat");
        heartBeatScheduler = getContext().system().scheduler().schedule(
                Duration.Zero(),
                Duration.create(1, TimeUnit.SECONDS),
                getSelf(),
                new Heartbeat(),
                getContext().system().dispatcher(),
                ActorRef.noSender());
    }

    // Cancels the current schedule, if there is one and it is still active.
    private void stopHeartBeat() {
        if (heartBeatScheduler != null && !heartBeatScheduler.isCancelled()) {
            log.info("Stopping heartbeat");
            heartBeatScheduler.cancel();
        }
    }

    public String getDeviceId() {
        return deviceId;
    }

    public CommunicationStatus getCommunicationStatus() {
        return commStatus;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Heartbeat) {
            log.info("Pum, pum");
        } else if (message instanceof DeviceCommunicationStatusUpdated) {
            DeviceCommunicationStatusUpdated event = (DeviceCommunicationStatusUpdated) message;
            // Compare strings by value; == only compares object references.
            if (deviceId.equals(event.deviceId)) {
                log.info("Received communication status update. '{}' is now {}", deviceId, event.state);
                this.commStatus =
                        event.state == DeviceCommunicationStatusUpdated.State.OK
                                ? CommunicationStatus.OK : CommunicationStatus.FAIL;
                if (commStatus == CommunicationStatus.OK) {
                    // Restart only if no heartbeat is currently running.
                    if (heartBeatScheduler == null || heartBeatScheduler.isCancelled()) {
                        startHeartbeat();
                    }
                } else {
                    stopHeartBeat();
                }
            }
        } else {
            unhandled(message);
        }
    }
}
Solution
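As far as I know, Akka's Cancellable only exposes cancel() and isCancelled(), so a cancelled schedule cannot be resumed; the usual approach is to cancel the old handle and schedule a new one, which is what startHeartbeat() in the question already does. The sketch below illustrates that cancel-then-reschedule pattern with the JDK's ScheduledExecutorService standing in for the Akka scheduler, so that it runs without any dependency. The class RestartableHeartbeat and its methods are hypothetical names made up for this illustration, not part of Akka or of the original code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: a heartbeat that can be stopped and started again.
// ScheduledFuture plays the role that Cancellable plays in Akka.
class RestartableHeartbeat {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger beats = new AtomicInteger();
    private final long periodMillis;
    private ScheduledFuture<?> handle; // null until the first start()

    RestartableHeartbeat(long periodMillis) {
        this.periodMillis = periodMillis;
    }

    // Schedules a fresh repeating task unless one is already active.
    synchronized void start() {
        if (isRunning()) {
            return;
        }
        handle = executor.scheduleAtFixedRate(
                () -> beats.incrementAndGet(), 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Cancels the active task; the old handle is simply replaced on the next start().
    synchronized void stop() {
        if (isRunning()) {
            handle.cancel(false);
        }
    }

    synchronized boolean isRunning() {
        return handle != null && !handle.isCancelled();
    }

    int beats() {
        return beats.get();
    }

    // Polls until at least `target` beats were counted or the timeout expires.
    boolean awaitBeats(int target, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (beats.get() < target) {
            if (System.nanoTime() > deadline) {
                return false;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Stops the underlying executor thread; call once when the heartbeat is no longer needed.
    void shutdown() {
        executor.shutdownNow();
    }
}
```

Calling start(), stop(), and start() again in sequence produces two independent schedules; the first handle is no longer referenced after the second start() and can be garbage collected. Memory that grows for a while after repeated restarts is therefore not necessarily a leak, which matches the observation in the update at the top.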
Licensed under: CC-BY-SA with attribution