Question

I'm trying to run the following code, but the status variable is always "PENDING". Could you please tell me what I am doing wrong?

Job execute = bigquery.jobs().insert(PROJECT_ID, runJob).execute();

String status = execute.getStatus().getState();
while(status.equalsIgnoreCase("PENDING")) {
  status = execute.getStatus().getState();
  System.out.println("Status: " + status);
  Thread.sleep(1000);
}

Solution

Your code isn't making a request to BigQuery to get the updated state; it's just re-reading the state of the Job object returned by the insert call, and that object never changes.

Instead, you should poll for the state of the job by issuing a jobs.get request, and check that state, e.g.:

Job job = bigquery.jobs().insert(PROJECT_ID, runJob).execute();
String status = job.getStatus().getState();
while(!status.equalsIgnoreCase("DONE")) {
  Thread.sleep(1000);
  // jobs.get expects the bare job ID from the job reference, not job.getId()
  status = bigquery.jobs().get(PROJECT_ID, job.getJobReference().getJobId()).execute().getStatus().getState();
  System.out.println("Status: " + status);
}

*Edited based on Jordan Tigani's comment.
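A slightly more defensive variant of the loop above can be factored into a helper that sleeps between requests and gives up after a bounded number of attempts. This is a hypothetical sketch: `JobPolling`, `waitForDone`, and the `Supplier<String>` that fetches the current state are illustrative names, not part of the BigQuery client. The key assumption is that the supplier issues a fresh `jobs.get` request each time it is called, which is exactly what the question's loop was missing.

```java
import java.util.function.Supplier;

public class JobPolling {

  /**
   * Hypothetical helper: polls until the fetched state is "DONE" or maxAttempts
   * requests have been made. fetchState must issue a NEW request on every call;
   * re-reading a Job object that was already returned would never change.
   *
   * @return true if DONE was observed, false on timeout or interruption
   */
  public static boolean waitForDone(Supplier<String> fetchState, long sleepMillis, int maxAttempts) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      String state = fetchState.get(); // fresh state on every iteration
      System.out.println("Status: " + state);
      if ("DONE".equalsIgnoreCase(state)) {
        return true;
      }
      try {
        Thread.sleep(sleepMillis); // sleep(), not wait(): wait() requires holding the object's monitor
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        return false;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // Fake job for illustration only: reports PENDING, then RUNNING, then DONE.
    String[] states = {"PENDING", "RUNNING", "DONE"};
    int[] calls = {0};
    Supplier<String> fake = () -> states[Math.min(calls[0]++, states.length - 1)];
    System.out.println("finished=" + waitForDone(fake, 10, 5));
  }
}
```

With the real client, the supplier would wrap something like `bigquery.jobs().get(PROJECT_ID, jobId).execute().getStatus().getState()`; since `execute()` throws `IOException`, that call would need to be wrapped (for example in an `UncheckedIOException`) to fit a `Supplier`.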

Other suggestions

I have realized that polling until the status is "DONE" doesn't always surface the error. For some errors the job goes straight from "PENDING" to "DONE", skipping the "RUNNING" stage, so the error only shows up once the job is already "DONE". Therefore, it is a good idea to check the error field (errorResult) in job['status'] even after the job is "DONE".
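In other words, "DONE" only means the job has stopped, not that it succeeded. The sketch below illustrates that rule with a hypothetical `JobStatus` stand-in holding a state and an optional error message; in the Java client used in this thread the corresponding value is read with `job.getStatus().getErrorResult()`. The class names and error strings here are illustrative assumptions.

```java
import java.util.Objects;

public class JobOutcome {

  /** Hypothetical stand-in for a job's status: a state plus an optional error message. */
  public static final class JobStatus {
    final String state;
    final String errorResult; // null when no error was reported

    public JobStatus(String state, String errorResult) {
      this.state = Objects.requireNonNull(state);
      this.errorResult = errorResult;
    }
  }

  /** A job is only successful if it is DONE and no errorResult is present. */
  public static String describe(JobStatus status) {
    if (!"DONE".equalsIgnoreCase(status.state)) {
      return "in progress (" + status.state + ")";
    }
    // DONE may mean the job stopped because it failed, possibly without ever reporting RUNNING.
    return status.errorResult == null ? "succeeded" : "failed: " + status.errorResult;
  }

  public static void main(String[] args) {
    System.out.println(describe(new JobStatus("PENDING", null)));
    System.out.println(describe(new JobStatus("DONE", null)));
    System.out.println(describe(new JobStatus("DONE", "example error")));
  }
}
```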

Rather than having a busy-wait loop synchronously block the thread running the insert, I've gone with a scheduled thread that maintains a queue of job IDs. It loops through the jobs, checks their status, and logs errors when it finds them.

The crucial bits here are:

  1. Schedule a thread to monitor jobs:

    jobPollScheduler.scheduleAtFixedRate(new JobPoll(), SCHEDULE_SECONDS, SCHEDULE_SECONDS, TimeUnit.SECONDS);
  2. Loop through a queue of jobs and check their progress. Re-queue anything that isn't DONE:

    while ((job = jobs.poll()) != null) {
      final Job statusJob = bigQuery.jobs().get(projectId, job.jobId).execute();
      if ("DONE".equals(statusJob.getStatus().getState())) {
        final ErrorProto errorResult = statusJob.getStatus().getErrorResult();
        if (errorResult == null) {
          logger.debug("status={}, job={}", statusJob.getStatus().getState(), job);
        } else {
          logger.error("status={}, errorResult={}, job={}", statusJob.getStatus().getState(), errorResult, job);
        }
      } else {
        // job isn't done, yet. Add it back to queue.
        add(job.jobId);
        logger.debug("will check again, status={}, job={}", statusJob.getStatus().getState(), job);
      }
    }

The full working set of classes:


import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nonnull;

/**
 * Monitor BigQuery inserts
 */
public class BigQueryMonitorSo21064586 {

  private static final Logger logger = LoggerFactory.getLogger(BigQueryMonitorSo21064586.class);
  private static final int SCHEDULE_SECONDS = 5;

  private final ScheduledExecutorService jobPollScheduler =
      Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("big-query-monitor-%d").build());
  private final Queue<DelayedJobCheck> jobs = new DelayQueue<>();
  private final Supplier<Bigquery> connectionSupplier;
  private final String projectId;

  /**
   * @param connectionSupplier gives us a connection to BigQuery
   * @param projectId Google cloud project
   */
  public BigQueryMonitorSo21064586(@Nonnull final Supplier<Bigquery> connectionSupplier, @Nonnull final String projectId) {
    this.connectionSupplier = connectionSupplier;
    this.projectId = projectId;
  }

  public BigQueryMonitorSo21064586 start() {
    jobPollScheduler.scheduleAtFixedRate(new JobPoll(), SCHEDULE_SECONDS, SCHEDULE_SECONDS, TimeUnit.SECONDS);
    return this;
  }

  /**
   * @param jobId insert query job id
   */
  public void add(final String jobId) {
    final DelayedJobCheck job = new DelayedJobCheck(jobId);
    try {
      if (!jobs.offer(job)) {
        logger.error("could not enqueue BigQuery job, job={}", job);
      }
    } catch (final Exception e) {
      logger.error("failed to add job to queue, job={}", job, e);
    }
  }

  public void shutdown() {
    jobPollScheduler.shutdown();
  }

  private class JobPoll implements Runnable {

    /**
     * drain every job whose delay has expired: log finished jobs and re-queue unfinished ones
     */
    @Override
    public void run() {
      try {
        final Bigquery bigQuery = connectionSupplier.get();
        DelayedJobCheck job;
        while ((job = jobs.poll()) != null) {
          final Job statusJob = bigQuery.jobs().get(projectId, job.jobId).execute();
          if ("DONE".equals(statusJob.getStatus().getState())) {
            final ErrorProto errorResult = statusJob.getStatus().getErrorResult();
            if (errorResult == null) {
              logger.debug("status={}, job={}", statusJob.getStatus().getState(), job);
            } else {
              logger.error("status={}, errorResult={}, job={}", statusJob.getStatus().getState(), errorResult, job);
            }
          } else {
            // job isn't done, yet. Add it back to queue.
            add(job.jobId);
            logger.debug("will check again, status={}, job={}", statusJob.getStatus().getState(), job);
          }
        }
      } catch (final Exception e) {
        logger.error("exception monitoring big query status, size={}", jobs.size(), e);
      }
    }
  }

  private static class DelayedJobCheck extends DelayedImpl {

    private final String jobId;

    DelayedJobCheck(final String jobId) {
      super(SCHEDULE_SECONDS, TimeUnit.SECONDS);
      this.jobId = jobId;
    }

    @Override
    public boolean equals(final Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null || getClass() != obj.getClass()) {
        return false;
      }
      if (!super.equals(obj)) {
        return false;
      }
      final DelayedJobCheck other = (DelayedJobCheck) obj;
      return Objects.equals(jobId, other.jobId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(super.hashCode(), jobId);
    }
  }

  private static class DelayedImpl implements Delayed {

    /**
     * timestamp when delay expires
     */
    private final long expiry;

    /**
     * @param amount how long the delay should be
     * @param timeUnit units of the delay
     */
    DelayedImpl(final long amount, final TimeUnit timeUnit) {
      final long more = TimeUnit.MILLISECONDS.convert(amount, timeUnit);
      expiry = System.currentTimeMillis() + more;
    }

    @Override
    public long getDelay(@Nonnull final TimeUnit unit) {
      final long diff = expiry - System.currentTimeMillis();
      return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(@Nonnull final Delayed o) {
      return Longs.compare(expiry, ((DelayedImpl) o).expiry);
    }

    @Override
    public boolean equals(final Object obj) {
      if (this == obj) {
        return true;
      }
      if (!(obj instanceof DelayedImpl)) {
        return false;
      }
      final DelayedImpl delayed = (DelayedImpl) obj;
      return expiry == delayed.expiry;
    }

    @Override
    public int hashCode() {
      return Objects.hash(expiry);
    }
  }
}
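The monitor relies on a property of `java.util.concurrent.DelayQueue`: `poll()` returns only an element whose delay has expired, and `null` otherwise. That is why the `while ((job = jobs.poll()) != null)` loop in `JobPoll` terminates even though unfinished jobs are re-added during the loop: each re-added `DelayedJobCheck` gets a fresh `SCHEDULE_SECONDS` delay, so it is not visible until a later run. The standalone sketch below demonstrates this with a hypothetical `Tick` element and uses only the JDK.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {

  /** Hypothetical element that becomes available after a fixed delay. */
  public static final class Tick implements Delayed {
    final String name;
    final long expiryNanos;

    public Tick(String name, long delay, TimeUnit unit) {
      this.name = name;
      this.expiryNanos = System.nanoTime() + unit.toNanos(delay);
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(expiryNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
      return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
  }

  /** Drains every expired element and re-queues it with a new delay, mirroring JobPoll. */
  public static int drainOnce(DelayQueue<Tick> queue) {
    int drained = 0;
    Tick tick;
    while ((tick = queue.poll()) != null) { // null as soon as no element has expired
      drained++;
      queue.offer(new Tick(tick.name, 1, TimeUnit.HOURS)); // not visible to this pass
    }
    return drained;
  }

  public static void main(String[] args) {
    DelayQueue<Tick> queue = new DelayQueue<>();
    queue.offer(new Tick("ready", 0, TimeUnit.SECONDS));
    queue.offer(new Tick("later", 1, TimeUnit.HOURS));
    System.out.println("drained=" + drainOnce(queue) + ", size=" + queue.size()); // drained=1, size=2
  }
}
```

Only the expired element is drained; the re-queued copy and the not-yet-expired element both stay in the queue for a later pass.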
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow