Google Dataflow
Dataflow is a fully managed service for running Apache Beam pipelines, which let you perform a variety of data processing tasks. Apache Beam is a programming model for defining pipelines that can then be executed on a number of runners, such as Apache Spark, Apache Flink and Apache Apex, as well as Google Cloud Dataflow. Dataflow pipelines can be either batch or streaming.
Choosing Dataflow as the runner lets you focus on the pipeline whilst Dataflow manages and auto-scales the resources on demand. The Dataflow runner uploads the executable code and dependencies to Cloud Storage, then creates a Dataflow job that executes the pipeline.
It is possible to set the runner programmatically, or specify it using the command-line. The following section demonstrates how Google Cloud customers can set the Dataflow runner programmatically to read data from a table in DoordaHost and load it into a Data Sink of their choice such as Google Cloud Storage.
Pre-requisites
The project to house the Dataflow job and associated resources has been created. For more details please see https://cloud.google.com/resource-manager/docs/creating-managing-projects
Activate the Cloud Shell
Within the Dataflow application, activate the Cloud Shell by clicking the Cloud Shell icon in the menu bar.
The Cloud Shell is a built-in command-line tool for the console, used to manage infrastructure and develop applications from within your browser. It comes with gcloud, Google's Cloud SDK, and an online code editor, Cloud Code.
Ensure the Cloud Shell prompt references the PROJECT_ID of the project described in the pre-requisites. If it does not, execute the following command:
> gcloud config set project [PROJECT_ID]
Setup Environment
Dataflow runs jobs built with the Apache Beam SDK. To submit jobs to the Dataflow service using Python, the development environment requires Python as well as the Apache Beam SDK for Python. This guide also uses pip3, Python's package manager, to manage SDK dependencies, and virtualenv to create isolated Python environments.
In the Cloud Shell, execute the following commands to create a new Python environment called env:
> pip3 install --upgrade virtualenv --user
> python3 -m virtualenv env
> source env/bin/activate
Install the required Python libraries:
> pip3 install apache-beam[gcp]
> pip3 install doorda-sdk
Setup a Cloud Storage Bucket
Dataflow uses Cloud Storage buckets to store output data and cache pipeline code. In the Cloud Shell, create a Cloud Storage bucket using gsutil, a Python application that provides command-line access to Cloud Storage:
> gsutil mb gs://<YOUR_BUCKET_NAME>
Create Pipeline
In order to use Beam, a driver program needs to be created. The driver program defines the pipeline, including all of the inputs, transforms and outputs. It also sets the execution options for the pipeline which are typically passed in using command line options. This includes the pipeline runner, which, in turn, determines what back-end the pipeline will run on.
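As a minimal sketch of how a driver can separate its own options from the runner's, the standard-library call `parse_known_args` returns the arguments it recognises plus a list of everything else, which could then be handed on to Beam's `PipelineOptions`. The function name `split_args` and the sample values are hypothetical; only the option names come from this page. The driver listed at the bottom of this page takes a different route and subclasses `PipelineOptions` instead.

```python
import argparse


def split_args(argv):
    """Separate user-defined options from the remaining pipeline options."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--table', required=True)
    parser.add_argument('--output', required=True)
    # parse_known_args does not fail on unrecognised flags; it returns them
    # as a list so they can be passed on unchanged.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


if __name__ == '__main__':
    # Sample values only; replace them with your own.
    known, rest = split_args([
        '--table=my_table',
        '--output=gs://my-bucket/out',
        '--runner=DataflowRunner',
        '--num_workers=3',
    ])
    print(known.table, known.output)
    print(rest)
```

Keeping the two groups apart means the driver validates only the options it owns and leaves the rest for the runner to interpret.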
The Beam SDK provides a number of abstractions that simplify the mechanics of large scale distributed processing. The code example at the bottom of this page demonstrates using the Python SDK to execute a Pipeline across an arbitrary number of machines based on the value passed in the num_workers command line option. The pipeline splits the reading of records from the specified table across a number of machines and writes the resulting JSON to Google Cloud Storage.
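The splitting arithmetic can be sketched on its own, without a database or Beam. The function below is an illustrative variant rather than the driver's exact code: the name `split_ranges` is hypothetical, and unlike the driver it returns an empty list for an empty table. Each worker receives a half-open `(start, stop)` range of row offsets of size `ceil(row_count / num_workers)`, with the last range truncated at the row count.

```python
import math


def split_ranges(row_count, num_workers):
    """Divide row offsets 0..row_count into contiguous (start, stop) ranges."""
    if num_workers < 1:
        raise ValueError('num_workers must be at least 1')
    split_size = math.ceil(row_count / num_workers)
    if split_size == 0:
        # Empty table: nothing to read.
        return []
    # Step through the offsets in blocks of split_size; the final block is
    # cut short so that it never runs past row_count.
    return [(start, min(start + split_size, row_count))
            for start in range(0, row_count, split_size)]


if __name__ == '__main__':
    print(split_ranges(10, 3))
```

For 10 rows and 3 workers the split size is 4, giving the ranges (0, 4), (4, 8) and (8, 10). Each range translates to `offset start limit stop - start` in the query.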
The driver can either be coded directly inside Google Dataflow using Cloud Code or in an external editor and then uploaded directly to the Cloud Shell.
Define the dependencies for parallel processing
In Dataflow, data processing work is represented by a pipeline.
To take advantage of Dataflow's parallel processing, all pipeline dependencies need to be available on the remote machines that Dataflow manages during job execution. This can be achieved by listing all pipeline dependencies in a requirements.txt file. The file is then referenced through the requirements_file pipeline option, so that Dataflow makes the dependencies available on each remote machine participating in the job execution.
In this example, doorda-sdk must be passed to all remote machines:
> pip3 freeze | grep doorda-sdk > requirements.txt
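For readers who prefer to build the file from Python, the sketch below does a similar selection on the text that `pip3 freeze` prints. The function name `select_requirements` and the version numbers in the sample are made up for illustration. Unlike `grep`, it matches whole package names, and it treats `-` and `_` as equivalent in names.

```python
def select_requirements(freeze_output, packages):
    """Return the lines of `pip freeze` output that pin the named packages."""
    wanted = {name.lower().replace('_', '-') for name in packages}
    selected = []
    for line in freeze_output.splitlines():
        # A pinned line looks like "name==version"; compare only the name part.
        name = line.split('==', 1)[0].strip().lower().replace('_', '-')
        if name in wanted:
            selected.append(line.strip())
    return selected


if __name__ == '__main__':
    # Sample text with made-up versions, standing in for real `pip3 freeze` output.
    sample = 'apache-beam==2.0.0\ndoorda-sdk==1.0.0\nnumpy==1.0.0\n'
    print('\n'.join(select_requirements(sample, ['doorda-sdk'])))
```

The returned lines can be written to requirements.txt one per line.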
Launch Pipeline
Use the Cloud Shell to launch the pipeline on the Cloud Dataflow service. The running pipeline is referred to as a job in the Dataflow application.
> python3 doorda_dataflow_demo.py [--pipeline_options]
The flags are divided between Dataflow-specific options and user-defined options. For convenience, the mandatory Dataflow options needed to run the pipeline are shown below.
Dataflow options:
Field | Value |
project | <YOUR_PROJECT_ID> |
runner | DataflowRunner |
temp_location | gs://<YOUR_BUCKET_NAME>/<TEMP_FOLDER_NAME> |
job_name | <Name to appear in the Dataflow console jobs list> |
num_workers | <INT> |
autoscaling_algorithm | NONE |
requirements_file | requirements.txt |
User defined options:
Field | Value |
username | <YOUR_DOORDA_USERNAME> |
password | <YOUR_DOORDA_PASSWORD> |
catalog | <DOORDA_CATALOG> |
schema | <DOORDA_SCHEMA> |
table | <DOORDA_TABLE> |
output | gs://<YOUR_BUCKET_NAME>/<OUTPUT_PREFIX> |
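The two tables map directly onto `--name=value` flags. As a rough sketch, the helper below turns two dictionaries into such a list and prints the resulting command line. The function name `build_pipeline_args` and every value in the example are placeholders, not real settings.

```python
import shlex


def build_pipeline_args(dataflow_options, user_options):
    """Turn two option dicts into a flat list of --name=value flags."""
    args = []
    for options in (dataflow_options, user_options):
        for name, value in options.items():
            args.append(f'--{name}={value}')
    return args


if __name__ == '__main__':
    # Placeholder values; substitute your own project, bucket and credentials.
    dataflow = {
        'project': 'my-project-id',
        'runner': 'DataflowRunner',
        'temp_location': 'gs://my-bucket/tmp',
        'job_name': 'doorda-demo',
        'num_workers': 3,
        'autoscaling_algorithm': 'NONE',
        'requirements_file': 'requirements.txt',
    }
    user = {'catalog': 'my_catalog', 'schema': 'my_schema', 'table': 'my_table'}
    command = ['python3', 'doorda_dataflow_demo.py'] + build_pipeline_args(dataflow, user)
    # shlex.join quotes any value that the shell would otherwise split.
    print(shlex.join(command))
```

Credentials are left out of the example on purpose; in practice they would be supplied the same way, ideally from a source other than shell history.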
Driver Program - doorda_dataflow_demo.py:
import logging
import math
from contextlib import contextmanager

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from doorda_sdk.host import client


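@contextmanager
def open_cursor(username, password, catalog, schema):
    """Open a DoordaHost cursor and close it when the block exits."""
    conn = client.connect(username=username,
                          password=password,
                          catalog=catalog,
                          schema=schema)
    logging.info('successfully connected to Doorda')
    cursor = conn.cursor()
    try:
        yield cursor
    finally:
        # Close the cursor even if the caller raises.
        cursor.close()

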
class CursorByName:
    """
    Iterator that yields each cursor row as a dict of column_name: value.
    """

    def __init__(self, cursor):
        self._cursor = cursor

    def __iter__(self):
        return self

    def __next__(self):
        # StopIteration from the cursor propagates and ends the iteration.
        row = next(self._cursor)
        return {description[0]: row[col]
                for col, description in enumerate(self._cursor.description)}


class DoordaOptions(PipelineOptions):
    """
    Application-specific pipeline options.
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--username', required=True)
        parser.add_argument('--password', required=True)
        parser.add_argument('--catalog', required=True)
        parser.add_argument('--schema', required=True)
        parser.add_argument('--table', required=True)
        parser.add_argument('--output', required=True)


class QuerySplitter:
    """
    Defines how a query will be divided across the cluster.
    """

    def __init__(self, options, num_workers=3):
        with open_cursor(options.username,
                         options.password,
                         options.catalog,
                         options.schema) as cursor:
            # Log only the table name: the full options include the password.
            logging.info(f'counting rows in {options.table}')
            cursor.execute(f'select count(1) from {options.catalog}.{options.schema}.{options.table}')
            row_count = cursor.fetchone()[0]
            logging.info(f'row count is {row_count}')
        self._start = 0
        self._stop = row_count
        self._num_workers = num_workers

    def split(self):
        """Yield (start, stop) row-offset pairs, roughly one per worker."""
        split_size = math.ceil(self._stop / self._num_workers)
        logging.info(f'split_size = {split_size}')
        i = self._start
        while i < self._stop - split_size:
            logging.info(f'yielding offset ({i}, {i + split_size})')
            yield (i, i + split_size)
            i += split_size
        # The last range takes whatever rows remain.
        logging.info(f'final offset yield ({i}, {self._stop})')
        yield (i, self._stop)


class QueryFn(beam.DoFn):
    """
    Query to parallelise across the cluster.
    """

    def __init__(self, options):
        self._options = options

    def process(self, query_range):
        start, stop = query_range
        logging.info(f'process {self._options.table}, offset={start}, limit={stop - start}')
        with open_cursor(self._options.username,
                         self._options.password,
                         self._options.catalog,
                         self._options.schema) as cursor:
            # Without an ORDER BY, the rows in each offset window are not
            # guaranteed to be stable between queries.
            cursor.execute(f'select * from {self._options.catalog}.{self._options.schema}.{self._options.table} offset {start} limit {stop - start}')
            for row in CursorByName(cursor):
                yield row


def run(argv=None):
    pipeline_options = DoordaOptions(argv)
    # Avoid logging the full options here: they include the password.
    # Fall back to the QuerySplitter default of 3 when --num_workers is not set.
    num_workers = pipeline_options.get_all_options().get('num_workers') or 3
    splitter = QuerySplitter(pipeline_options, num_workers)
    inputs = list(splitter.split())
    # Pickle the main session so that module-level names such as open_cursor
    # are available on all worker nodes.
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (pipeline
         # load data from DoordaHost
         | 'Doorda Get Tables' >> beam.Create(inputs)
         | 'Doorda Read' >> beam.ParDo(QueryFn(pipeline_options))
         | 'Write Records' >> beam.io.WriteToText(pipeline_options.output)
         )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
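If the output needs to be strict JSON, each record can be serialised explicitly before it is written. A dict written as plain text would otherwise most likely appear in Python's own representation, with single quotes. The sketch below shows the two steps in isolation, using plain lists in place of a database cursor. The names `rows_as_dicts` and `to_json_line` are hypothetical, and `default=str` is an assumption about how non-JSON values such as dates should be stored.

```python
import datetime
import json


def rows_as_dicts(column_names, rows):
    """Pair each row's values with the column names, one dict per row."""
    for row in rows:
        yield dict(zip(column_names, row))


def to_json_line(record):
    """Serialise one record as a single line of JSON."""
    # default=str is a fallback for values json cannot encode natively,
    # such as dates or decimals; they are stored as strings.
    return json.dumps(record, default=str)


if __name__ == '__main__':
    # Made-up sample data standing in for a query result.
    columns = ['id', 'registered']
    rows = [(1, datetime.date(2020, 1, 2)), (2, None)]
    for record in rows_as_dicts(columns, rows):
        print(to_json_line(record))
```

In a Beam pipeline, a function like `to_json_line` could be applied with `beam.Map` as a step just before the write.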