Question

I'm trying to build a Google Dataflow pipeline by following this Medium post:

https://levelup.gitconnected.com/scaling-scikit-learn-with-apache-beam-251eb6fcf75b

However, it seems I'm missing the project argument, and the pipeline throws the following error. I'd appreciate any help in resolving it.

Error:

ERROR:apache_beam.runners.direct.executor:Giving up after 4 attempts.
WARNING:apache_beam.runners.direct.executor:A task failed with exception: Missing executing project information. Please use the --project command line option to specify it.

Code:

import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
import json

# Standard-SQL query run against the public natality sample table. It selects
# the columns listed below, converts mother_married to 0/1, exposes
# weight_pounds as `weight`, and adds a timestamp (`time`) and a generated
# UUID (`guid`) to each of the (at most) 100 returned rows.
query = """
    SELECT year, plurality, apgar_5min, 
    mother_age, father_age,    
       gestation_weeks, ever_born
       ,case when mother_married = true 
          then 1 else 0 end as mother_married
      ,weight_pounds as weight
      ,current_timestamp as time
      ,GENERATE_UUID() as guid
    FROM `bigquery-public-data.samples.natality` 
    limit 100    
"""

class ApplyDoFn(beam.DoFn):
    """Score each input row with a pickled model fetched from Cloud Storage.

    The model is downloaded lazily on the first processed element and then
    reused for every later element handled by the same instance.
    """

    def __init__(self):
        # Initialise the beam.DoFn base class before adding our own state.
        super(ApplyDoFn, self).__init__()
        self._model = None
        # The libraries are imported here and kept as attributes so that
        # `process` can reach them through `self`.
        from google.cloud import storage
        import pandas as pd
        import pickle as pkl
        self._storage = storage
        self._pkl = pkl
        self._pd = pd

    def process(self, element):
        """Predict a weight for one row.

        Args:
            element: mapping for a single input row; must contain the keys
                'guid' and 'time' in addition to the model features.

        Returns:
            A one-item list holding a dict with 'guid', 'weight' and 'time'
            (the time is converted to a string).
        """
        if self._model is None:
            bucket = self._storage.Client().get_bucket('dsp_model_store')
            blob = bucket.get_blob('natality/sklearn-linear')
            # SECURITY: unpickling executes arbitrary code embedded in the
            # payload. Only load this object from a bucket you fully trust.
            self._model = self._pkl.loads(blob.download_as_string())

        # Turn the single row into a one-row DataFrame, replacing missing
        # values with 0.
        new_x = (
            self._pd.DataFrame.from_dict(element, orient="index")
            .transpose()
            .fillna(0)
        )
        # Columns at positions 1..7 are passed to the model.
        # NOTE(review): this presumably relies on the row's key order matching
        # the SELECT order of the query -- confirm against the source.
        weight = self._model.predict(new_x.iloc[:, 1:8])[0]
        return [{
            'guid': element['guid'],
            'weight': weight,
            'time': str(element['time']),
        }]

# Output table schema: one entry per column, built from (name, type) pairs and
# passed through JSON because the Beam helper expects a JSON string.
schema = parse_table_schema_from_json(json.dumps({
    'fields': [
        {'name': column, 'type': bq_type}
        for column, bq_type in (
            ('guid', 'STRING'),
            ('weight', 'FLOAT64'),
            ('time', 'STRING'),
        )
    ],
}))

class PublishDoFn(beam.DoFn):
    """Write each scored element to Datastore as a 'natality-guid' entity."""

    def __init__(self):
        # Initialise the beam.DoFn base class before adding our own state.
        super(PublishDoFn, self).__init__()
        from google.cloud import datastore
        self._ds = datastore
        # Created lazily in `process` and then reused, so a new client is not
        # constructed for every element.
        self._client = None

    def process(self, element):
        """Store one element's weight and time under its guid.

        Args:
            element: mapping with the keys 'guid', 'weight' and 'time'.
        """
        if self._client is None:
            self._client = self._ds.Client()
        client = self._client

        key = client.key('natality-guid', element['guid'])
        entity = self._ds.Entity(key)
        entity['weight'] = element['weight']
        entity['time'] = element['time']
        client.put(entity)

# Split the command line: no arguments are declared on this parser, so every
# option given on the command line ends up in pipeline_args and is forwarded
# to Beam as pipeline options.
# NOTE(review): nothing in this script sets a Google Cloud project, so unless
# --project is passed on the command line the run fails with "Missing
# executing project information".
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(None)
pipeline_options = PipelineOptions(pipeline_args)

# Define the pipeline steps: read rows with the query, score them with
# ApplyDoFn, then send the scored rows both to BigQuery and to PublishDoFn.
p = beam.Pipeline(options=pipeline_options)
data = p | 'Read from BigQuery' >> beam.io.Read(
       beam.io.BigQuerySource(query=query, use_standard_sql=True))
scored = data | 'Apply Model' >> beam.ParDo(ApplyDoFn())
# Append the predictions to the 'weight_preds' table in the 'dsp_demo'
# dataset, creating the table with `schema` if it does not exist yet.
scored | 'Save to BigQuery' >> beam.io.Write(beam.io.BigQuerySink(
                'weight_preds', 'dsp_demo', schema = schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

# Second branch from the same scored collection: publish each row as an entity.
scored | 'Create entities' >> beam.ParDo(PublishDoFn())

# Run the pipeline and block until it has finished.
result = p.run()
result.wait_until_finish()
Was it helpful?

Solution

The error tells you to specify the ID of the Google Cloud project in which you want to run the Dataflow job.

In the Python SDK, you can set this and other Google Cloud related variables in the following way:

# Requires: from apache_beam.options.pipeline_options import GoogleCloudOptions
# View the existing pipeline options as Google Cloud options and set the
# project on that view before the pipeline is created.
gcloud_options = pipeline_options.view_as(GoogleCloudOptions)
gcloud_options.project = '<insert project ID here>'
(...)
# Build the pipeline from the same options object that was configured above.
p = beam.Pipeline(options=pipeline_options)

For the full snippet, you can check the Dataflow documentation.

Licensed under: CC-BY-SA with attribution
Not affiliated with datascience.stackexchange
scroll top