Google Dataflow
10-12-2020
Question
I'm trying to build a Google Dataflow pipeline by following one of the posts on Medium:
https://levelup.gitconnected.com/scaling-scikit-learn-with-apache-beam-251eb6fcf75b
However, it seems like I'm missing the project argument, and it throws the following error. I'd appreciate your help in guiding me through it.
Error:
ERROR:apache_beam.runners.direct.executor:Giving up after 4 attempts.
WARNING:apache_beam.runners.direct.executor:A task failed with exception: Missing executing project information. Please use the --project command line option to specify it.
Code:
import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
import json

query = """
SELECT year, plurality, apgar_5min,
       mother_age, father_age,
       gestation_weeks, ever_born
      ,case when mother_married = true
         then 1 else 0 end as mother_married
      ,weight_pounds as weight
      ,current_timestamp as time
      ,GENERATE_UUID() as guid
FROM `bigquery-public-data.samples.natality`
limit 100
"""


class ApplyDoFn(beam.DoFn):

    def __init__(self):
        self._model = None
        # imports are done inside the DoFn so they are available on the workers
        from google.cloud import storage
        import pandas as pd
        import pickle as pkl
        self._storage = storage
        self._pkl = pkl
        self._pd = pd

    def process(self, element):
        # lazily load the sklearn model from Cloud Storage on first use
        if self._model is None:
            bucket = self._storage.Client().get_bucket('dsp_model_store')
            blob = bucket.get_blob('natality/sklearn-linear')
            self._model = self._pkl.loads(blob.download_as_string())

        new_x = self._pd.DataFrame.from_dict(
            element, orient="index").transpose().fillna(0)
        weight = self._model.predict(new_x.iloc[:, 1:8])[0]
        return [{'guid': element['guid'],
                 'weight': weight,
                 'time': str(element['time'])}]


schema = parse_table_schema_from_json(json.dumps({'fields':
    [{'name': 'guid', 'type': 'STRING'},
     {'name': 'weight', 'type': 'FLOAT64'},
     {'name': 'time', 'type': 'STRING'}]}))


class PublishDoFn(beam.DoFn):

    def __init__(self):
        from google.cloud import datastore
        self._ds = datastore

    def process(self, element):
        # write each prediction to Datastore, keyed by the row's GUID
        client = self._ds.Client()
        key = client.key('natality-guid', element['guid'])
        entity = self._ds.Entity(key)
        entity['weight'] = element['weight']
        entity['time'] = element['time']
        client.put(entity)


parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(None)
pipeline_options = PipelineOptions(pipeline_args)

# define the pipeline steps
p = beam.Pipeline(options=pipeline_options)
data = p | 'Read from BigQuery' >> beam.io.Read(
    beam.io.BigQuerySource(query=query, use_standard_sql=True))
scored = data | 'Apply Model' >> beam.ParDo(ApplyDoFn())
scored | 'Save to BigQuery' >> beam.io.Write(beam.io.BigQuerySink(
    'weight_preds', 'dsp_demo', schema=schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
scored | 'Create entities' >> beam.ParDo(PublishDoFn())

# run the pipeline
result = p.run()
result.wait_until_finish()
Solution
The error tells you to specify the ID of the Google Cloud project in which you want to run the Dataflow job.
In the Python SDK, you can set this and other Google Cloud-related options in the following way:
from apache_beam.options.pipeline_options import GoogleCloudOptions

gcloud_options = pipeline_options.view_as(GoogleCloudOptions)
gcloud_options.project = '<insert project ID here>'
(...)
p = beam.Pipeline(options=pipeline_options)
For the full snippet, you can check the Dataflow documentation.
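For instance, a minimal sketch of how the options block could look in this script; the project ID and bucket path below are placeholders, not values from the original post:

from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

pipeline_options = PipelineOptions(pipeline_args)
gcloud_options = pipeline_options.view_as(GoogleCloudOptions)
gcloud_options.project = 'my-project-id'             # placeholder: your GCP project ID
gcloud_options.temp_location = 'gs://my-bucket/tmp'  # placeholder: a GCS path for temporary files
p = beam.Pipeline(options=pipeline_options)

Alternatively, since the script already forwards unparsed command-line arguments to PipelineOptions via parse_known_args, passing the flag directly should work as well, e.g. python pipeline.py --project my-project-id.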
Licensed under: CC-BY-SA with attribution
Not affiliated with datascience.stackexchange