Question

I am trying to build a Jersey web service that lets clients create Jobs. The Jobs are stored in a database, with Hibernate as the persistence provider. They will be executed in the background by a scheduled service, which I would like to schedule with Spring.

I created a Spring Scheduled method, like this:

@Service
public class MyTimedService
{
    @Inject
    IJobs allJobs;

    private static final Logger LOG = LoggerFactory.getLogger( MyTimedService.class );


    @Scheduled(fixedRate=5000)
    public void processJobs()
    {
        for(BaseJob job: allJobs.getQueuedJobs())
        {
            processJob(job, new JobContext());
        }
    }


    private void processJob( final BaseJob job, JobContext context ) throws JobException
    {
        job.start();

        LOG.info( "Starting: " + job.getName() );
        job.execute( context );
        LOG.info( "Finished: " + job.getName() );

        if ( job.getErrors().size() > 0 )
        {
            Throwable e = job.getErrors().get( 0 );
            throw new JobException( e );
        }
        job.finished();

    }
...
}

Because a Job runs for a long time, I need job.start() to report its state change (from QUEUE to IN_PROGRESS) to the database right away. Previously I used a command-line implementation with my own transaction management: basically begin() and commit() just around job.start().

Now I need to make it work using Spring...

Any advice on how to separate the concerns and make this work?


Solution

Edit

One thing I do not really understand is why the doWork needs one big transaction.

It does not have to be. There are caveats in either direction. I have noted some of them in the revised class below (JobRunnerService), above the doWork(...) method. Those notes are worth reading.

What I would like to achieve is that doWork can regularly set the job's progress

This may or may not be difficult to achieve, depending on whether you want doWork(...) bound to a transaction and whether each Job can be broken up the same way (i.e. the updates would always occur at a fixed location in the code). I don't know all of your requirements, so I can't really answer this question. However, I would reiterate my advice to look into Spring Batch.

JobRunnerService

import me.mike.jobs.model.Job;
import me.mike.jobs.model.JobState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;

/**
 * !!This bean is STATEFUL!!
 */
@Service
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class JobRunnerService {
    @Autowired
    private JobService js;

    public void processJob(Job job) {
        job.setState(JobState.WORKING_0);
        js.update(job);
        try {
            doWork(job);
            job.setState(JobState.COMPLETE);
        } catch (Exception e) {
            job.setState(JobState.FAILED);
        }
        System.out.println("I'm done working.");
        js.update(job);
    }

    /**
     * Spring rolls back on unchecked exceptions (RuntimeException, Error) by default.  Be sure that any checked
     * exception you throw gets added into the "rollbackFor", since it won't trigger a rollback if you don't...
     *
     * The @Transactional is optional - I assumed you would want the work performed in the job to be transactional.
     *
     * Note: Remember, when doing the work represented by these jobs, that your EntityManager (or SessionFactory) is
     * configured with a TransactionManager and, as such, will throw exceptions when you attempt to do work within them
     * without a Transaction.  You will either need a separate EntityManager (SessionFactory) or something like a
     * JdbcTemplate.
     *
     * Note: If the Job's work DOES need to be Transactional, this will probably not work.  A very simple solution
     * would be to split up the work within the job into "steps" or "stages."  The processJob(...) method above
     * could then call each stage and, at the conclusion, update the Job's state appropriately.  This, of course,
     * would not work if each Job had N number of stages where N could vary an indeterminate amount.
     */
    //@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = { IllegalArgumentException.class })
    public void doWork(Job job) throws IllegalArgumentException {
        // With the @Transactional above enabled, and when called through the Spring proxy (from another bean),
        // this method begins its own transaction every single time it's called.
        // Do some work...
        job.setState(JobState.WORKING_10);
        js.update(job);
        // Do more work...
        job.setState(JobState.WORKING_90);
        js.update(job);
        // At the conclusion, the transaction bound to this method is committed, unless a rollback was initiated.
    }
}
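
The "steps" or "stages" idea from the notes above could look roughly like this. It is a minimal plain-Java sketch, not Spring code: StagedJobRunner, runStages and the progress callback are hypothetical names. In the real service the callback would set the Job's state and call js.update(job), so that each progress update commits on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical helper, not part of the classes above: runs a job as a fixed list of stages.
class StagedJobRunner {

    /**
     * Runs each stage in order and reports the percentage completed after each one.
     * In the setup above, the callback would update the Job's state through js.update(job).
     */
    static void runStages(List<Runnable> stages, IntConsumer progressCallback) {
        int total = stages.size();
        for (int i = 0; i < total; i++) {
            stages.get(i).run();                              // do one slice of the work
            progressCallback.accept((i + 1) * 100 / total);   // report progress after the stage
        }
    }
}

class StagedJobDemo {
    public static void main(String[] args) {
        List<Integer> reported = new ArrayList<>();
        List<Runnable> stages = Arrays.asList(
                () -> System.out.println("stage 1"),
                () -> System.out.println("stage 2"),
                () -> System.out.println("stage 3"),
                () -> System.out.println("stage 4"));
        StagedJobRunner.runStages(stages, p -> reported.add(p));
        System.out.println("progress: " + reported); // progress: [25, 50, 75, 100]
    }
}
```

As the note says, this only fits jobs whose work can be cut into a known list of stages up front.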

Preface: I think it would be wise of you to look into leveraging something like Spring Batch. It may require more config than this, but it also gives a lot more support.

If I understand you right, you want to store "Jobs" in a table (RESTful creation). You want an @Scheduled task that periodically runs in the background to perform the work that each of those jobs represents. You further want to change the state (heh) on each of those entities before and after you work on them. The caveat is that the initial state change needs to occur within its own transactional bounds, as does the inevitable ending state change.

I've run this code against a MySQL 5.x DB using Spring, JPA, and Hibernate. If you need, I can provide my applicationContext and my rest-servlet xml files for you.

This will perform what I understand your stated goals to be:

The Model:

import org.hibernate.validator.constraints.Length;

import javax.persistence.*;
import javax.validation.constraints.NotNull;
import java.util.UUID;

@Entity
public class Job {
    @Id
    private String id;

    @Column
    @NotNull
    @Length(min = 3, max = 50)
    private String name;

    @Enumerated(EnumType.STRING)
    @Column(length = 50, nullable = false)
    private JobState state;

    public UUID getId() {
        return UUID.fromString(id);
    }

    public void setId(UUID id) {
        this.id = id.toString();
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public JobState getState() {
        return state;
    }

    public void setState(JobState state) {
        this.state = state;
    }
}

The Repository:

import me.mike.jobs.model.Job;
import me.mike.jobs.model.JobState;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Root;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

@Repository
public class JobDao {
    @PersistenceContext
    private EntityManager em;


    @Transactional(propagation = Propagation.REQUIRED)
    public void create(Job job) {
        // ...
    }

    @Transactional(propagation = Propagation.REQUIRED, readOnly = true)
    public Set<Job> readAll() {
        // ...
    }

    @Transactional(propagation = Propagation.REQUIRED, readOnly = true)
    public Job readById(UUID id) {
        // ...
    }

    @Transactional(propagation = Propagation.REQUIRED, readOnly = true)
    public Set<Job> readByState(JobState state) {
        // ...
    }

    @Transactional(propagation = Propagation.REQUIRED)
    public void update(Job job) {
        // ...
    }

    @Transactional(propagation = Propagation.REQUIRED)
    public void delete(Job job) {
        // ...
    }
}

The JobService (This handles the RESTful actions on your Job entity)

import me.mike.jobs.dao.JobDao;
import me.mike.jobs.model.Job;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.Set;

@Service
public class JobService {
    @Autowired
    private JobDao jd;

    @Transactional(propagation = Propagation.REQUIRED)
    public void create(Job job) {
        // Business logic...
        jd.create(job);
        // More business logic...
    }

    @Transactional(propagation = Propagation.REQUIRED, readOnly = true)
    public Set<Job> read() {
        // Business logic...
        Set<Job> jobs = jd.readAll();
        // More business logic...
        return jobs;
    }

    @Transactional(propagation = Propagation.REQUIRED)
    public void update(Job job) {
        // Business logic...
        jd.update(job);
        // More business logic...
    }

    @Transactional(propagation = Propagation.REQUIRED)
    public void delete(Job job) {
        // Business logic...
        jd.delete(job);
        // More business logic...
    }
}

The MaintenanceService (This guy would hold all of your @Scheduled methods)

import me.mike.jobs.dao.JobDao;
import me.mike.jobs.model.Job;
import me.mike.jobs.model.JobState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class MaintenanceService {
    @Autowired
    private JobRunnerService jrs;

    @Autowired
    private JobDao jd;

    @Scheduled(fixedDelay = 5000, initialDelay = 5000)
    public void processQueuedJobs() {
        // This may be somewhat dangerous depending on how many jobs could potentially be racked up during the 'downtime'
        for (Job curJob : jd.readByState(JobState.QUEUED))
            jrs.processJob(curJob);
    }

    // Any other timed service methods...
}

The JobRunnerService (This is the Service that actually runs the jobs)

import me.mike.jobs.model.Job;
import me.mike.jobs.model.JobState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**
 * !!This bean is STATEFUL!!
 */
@Service
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class JobRunnerService {
    @Autowired
    private JobService js;

    public void processJob(Job job) {
        job.setState(JobState.WORKING);
        js.update(job);
        try {
            doWork(job);
            job.setState(JobState.COMPLETE);
        } catch (Exception e) {
            job.setState(JobState.FAILED);
        }
        System.out.println("I'm done working.");
        js.update(job);
    }

    /**
     * Spring rolls back on unchecked exceptions (RuntimeException, Error) by default.  Be sure that any checked
     * exception you throw gets added into the "rollbackFor", since it won't trigger a rollback if you don't...
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = { IllegalArgumentException.class })
    public void doWork(Job job) throws IllegalArgumentException {
        // Provided the call comes through the Spring proxy (i.e. from another bean, not via this.doWork(...)),
        // this method begins its own transaction every single time it's called.
        // Do your work here...
        // At the conclusion, the transaction bound to this method is committed, unless a rollback was initiated.
    }
}
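
One caveat about the class above: processJob(...) calls doWork(...) on the same object. With Spring's default proxy-based transaction support, annotations such as @Transactional are applied by a proxy wrapped around the bean, so a call made on "this" does not pass through it. The following is a plain JDK sketch of that mechanism, not Spring itself; Runner, PlainRunner and SelfInvocationDemo are hypothetical names, and the recording handler only stands in for a transaction interceptor.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface standing in for a bean with two public methods.
interface Runner {
    void processJob();
    void doWork();
}

class PlainRunner implements Runner {
    @Override
    public void processJob() {
        doWork(); // plain call on "this": the proxy never sees it
    }

    @Override
    public void doWork() {
        // the actual work would go here
    }
}

class SelfInvocationDemo {
    /** Wraps the target in a JDK proxy that records every call it intercepts. */
    static Runner proxyOf(Runner target, List<String> intercepted) {
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName()); // an interceptor would begin/commit a transaction here
            return method.invoke(target, args);
        };
        return (Runner) Proxy.newProxyInstance(
                Runner.class.getClassLoader(), new Class<?>[] { Runner.class }, handler);
    }

    public static void main(String[] args) {
        List<String> intercepted = new ArrayList<>();
        Runner runner = proxyOf(new PlainRunner(), intercepted);
        runner.processJob();
        System.out.println(intercepted); // [processJob]
    }
}
```

A common way around this is to move doWork(...) into a separate bean and inject it, so the call crosses a proxy boundary.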

OTHER TIPS

I'm assuming that you have annotation-driven transaction management enabled in your Spring configuration.

@Service
public class MyTimedService {

    @Inject
    IJobs allJobs;

    @Inject
    JobService jobService;

    private static final Logger LOG = LoggerFactory.getLogger( MyTimedService.class );

    @Scheduled(fixedRate=5000)
    public void processJobs() {
        for(BaseJob job: allJobs.getQueuedJobs()) {
            processJob(job, new JobContext());
        }
    }

    private void processJob( final BaseJob job, JobContext context ) throws JobException {
        jobService.start(job);

        LOG.info( "Starting: " + job.getName() );
        job.execute( context );
        LOG.info( "Finished: " + job.getName() );

        if ( job.getErrors().size() > 0 ) {
            Throwable e = job.getErrors().get( 0 );
            throw new JobException( e );
        }

        jobService.complete(job);

    }

}

@Service
public class JobService {

    @Transactional(readOnly = false, propagation = Propagation.REQUIRES_NEW)
    public void start(BaseJob job){
        job.start();
    }

    @Transactional(readOnly = false, propagation = Propagation.REQUIRES_NEW)
    public void complete(BaseJob job){
        job.finished();
    }

}

Another point to be kept in mind

If there is an exception in processing the job, its status will remain IN_PROGRESS instead of something like COMPLETED_WITH_EXCEPTION.
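
One way to handle that case is to catch the failure and record a terminal status explicitly. The sketch below is plain Java with an in-memory map instead of the database; DemoJobStatus, StatusTrackingRunner and the fail step are assumptions for illustration. In the code above, the three put(...) calls would correspond to jobService.start(job), jobService.complete(job) and a third REQUIRES_NEW method that does not exist yet.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical status values; the real names depend on your BaseJob implementation.
enum DemoJobStatus { QUEUED, IN_PROGRESS, COMPLETED, COMPLETED_WITH_EXCEPTION }

class StatusTrackingRunner {
    // In-memory stand-in for the job table.
    private final Map<String, DemoJobStatus> statusByJob = new HashMap<>();

    /** Runs the work and always records a terminal status, even when the work throws. */
    DemoJobStatus run(String jobName, Runnable work) {
        statusByJob.put(jobName, DemoJobStatus.IN_PROGRESS);      // would be jobService.start(job)
        try {
            work.run();
            statusByJob.put(jobName, DemoJobStatus.COMPLETED);    // would be jobService.complete(job)
        } catch (RuntimeException e) {
            // would be a hypothetical jobService.fail(job), also in its own transaction
            statusByJob.put(jobName, DemoJobStatus.COMPLETED_WITH_EXCEPTION);
        }
        return statusByJob.get(jobName);
    }

    public static void main(String[] args) {
        StatusTrackingRunner runner = new StatusTrackingRunner();
        System.out.println(runner.run("ok", () -> { }));
        System.out.println(runner.run("broken", () -> { throw new IllegalStateException("boom"); }));
    }
}
```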

Before I suggest my idea, I should say that the problem you describe is quite general and can be approached from different perspectives. I have tried to reuse your code as much as possible.

  1. Configure the Spring transactions (spring-tx) module for your project. This lets you use @Transactional on the methods that need to run in a database transaction.
  2. I assume that what you denote by IJobs is a job repository that follows one of the standard Spring persistence implementations, such as Spring JPA or Spring repositories.
  3. I reuse your code in the following, with these changes:
    • Try to separate the model that represents the job and is persisted (JobModel) from the object that represents an executable job (ExecutableJob). You can have a simple method to map between the two.
    • Make the "smallest" possible code block "transactional". The method updateJobStatus has a single responsibility: it updates the status of a job.
    • Use the method that updates the job status wherever necessary. This includes starting the job, finishing the job successfully, and the case in which the job finishes with a failure or a runtime exception occurs and you want to report the status again.

Reused schematic code:

@Service
public class LongRunningJobService {

    @Inject
    JobRepository jobs; // IJobs

    @Scheduled(fixedDelay = 60000)
    public void processJobs() {
        for (JobModel j : jobs.getQueuedJobs()) {
            JobContext context = null;
            processJob(j, context);
        }
    }

    protected void processJob(JobModel jobModel, JobContext context) {
        // update the status of the job
        updateJobStatus(jobModel, JobStatus.RUNNING);

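        // Schematic: replace null with the result of a mapping method such as createJob(jobModel);
        // as written, the next line would throw a NullPointerException.
        ExecutableJob job = null; // createJob(jobModel);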
        job.execute(context);

        // process job results
            // if necessary, catch exceptions and again update job status

        // success
        updateJobStatus(jobModel, JobStatus.FINISHED);

    }

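    // Caution: processJob(...) calls this method on "this", so with Spring's default proxy-based
    // transaction support the annotation is not applied here. Either jobs.updateJobStatus(...) must be
    // transactional itself, or this method must move to another bean.
    @Transactional
    protected void updateJobStatus(JobModel jobModel, JobStatus status) {
        jobs.updateJobStatus(jobModel, status);
    }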

    static enum JobStatus {
        QUEUED, RUNNING, FINISHED;
    }

}
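
The commented-out createJob(jobModel) call in the schematic code could be filled in along these lines. This is only a sketch under assumptions: the type and payload fields, the registry of builders, and an execute() that returns a String (so the result is visible) are all hypothetical and not taken from your code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical persisted model: only the data needed to pick and configure the work.
class JobModelSketch {
    final String type;
    final String payload;

    JobModelSketch(String type, String payload) {
        this.type = type;
        this.payload = payload;
    }
}

// Hypothetical executable counterpart: holds behaviour and is never persisted.
interface ExecutableJobSketch {
    String execute();
}

class JobFactorySketch {
    private final Map<String, Function<JobModelSketch, ExecutableJobSketch>> builders = new HashMap<>();

    /** Registers how to build the executable job for one job type. */
    void register(String type, Function<JobModelSketch, ExecutableJobSketch> builder) {
        builders.put(type, builder);
    }

    /** Maps a persisted model to the object that performs its work. */
    ExecutableJobSketch createJob(JobModelSketch model) {
        Function<JobModelSketch, ExecutableJobSketch> builder = builders.get(model.type);
        if (builder == null) {
            throw new IllegalArgumentException("Unknown job type: " + model.type);
        }
        return builder.apply(model);
    }

    public static void main(String[] args) {
        JobFactorySketch factory = new JobFactorySketch();
        factory.register("upper", m -> () -> m.payload.toUpperCase());
        ExecutableJobSketch job = factory.createJob(new JobModelSketch("upper", "abc"));
        System.out.println(job.execute()); // ABC
    }
}
```

Keeping the mapping in one place means the persisted entity carries no execution logic, which is the separation the first bullet above asks for.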
Licensed under: CC-BY-SA with attribution