Google Dataflow

    Dataflow is a fully managed service for running Apache Beam pipelines, which allow you to perform a variety of data processing tasks. Apache Beam is a programming model for defining pipelines that can then be executed on a number of runners, such as Apache Spark, Apache Flink, Apache Apex and Google Cloud Dataflow. Dataflow pipelines can be either batch or streaming.

    Choosing Dataflow as the runner allows the user to focus on the pipeline whilst Dataflow manages and auto-scales the resources on demand. The Dataflow runner uploads the executable code and dependencies to Cloud Storage and then creates a Dataflow job that executes the pipeline.

    It is possible to set the runner programmatically or to specify it on the command line. The following section demonstrates how Google Cloud customers can set the Dataflow runner programmatically to read data from a table in DoordaHost and load it into a data sink of their choice, such as Google Cloud Storage.
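
    For illustration, the runner can be set programmatically when constructing the pipeline options with the Beam Python SDK. The snippet below is a minimal sketch; the project, region and bucket values are placeholders to substitute with your own.

    from apache_beam.options.pipeline_options import PipelineOptions

    # Minimal sketch: placeholder project, region and bucket values
    options = PipelineOptions(
        runner='DataflowRunner',                      # run the pipeline on the Cloud Dataflow service
        project='<YOUR_PROJECT_ID>',
        region='<YOUR_REGION>',
        temp_location='gs://<YOUR_BUCKET_NAME>/tmp')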

    Pre-requisites

    A Google Cloud project to house the Dataflow job and its associated resources must already exist. For more details, please see https://cloud.google.com/resource-manager/docs/creating-managing-projects

    Activate the Cloud Shell

    Within the Dataflow application, activate the Cloud Shell by clicking the associated icon in the menu bar.

    The Cloud Shell is a built-in command-line tool for the console. It is used to manage infrastructure and develop applications from within your browser. Cloud Shell comes with gcloud, the Google Cloud SDK command-line tool, and Cloud Code, an online code editor.

    Ensure the Cloud Shell prompt references the PROJECT_ID of the project described in the pre-requisites. If not, execute the following command:

    > gcloud config set project [PROJECT_ID]

    Set Up the Environment

    Dataflow runs jobs using the Apache Beam SDK. In order to submit jobs to the Dataflow service using Python, the development environment requires Python and the Apache Beam SDK for Python. Additionally, Dataflow uses pip3, Python's package manager, to manage SDK dependencies, and virtualenv to create isolated Python environments.


    In the Cloud Shell, execute the following commands to create a new Python environment called env:


    > pip3 install --upgrade virtualenv --user
    > python3 -m virtualenv env
    > source env/bin/activate
    

    Install the required Python libraries:


    > pip3 install apache-beam[gcp]
    > pip3 install doorda-sdk
    

     

    Set Up a Cloud Storage Bucket


    Dataflow uses Cloud Storage buckets to store output data and to cache pipeline code. In Cloud Shell, create a Cloud Storage bucket using gsutil, a Python application that provides command-line access to Cloud Storage:


    > gsutil mb gs://<YOUR_BUCKET_NAME>

    Create Pipeline 

     

    In order to use Beam, a driver program needs to be created. The driver program defines the pipeline, including all of its inputs, transforms and outputs. It also sets the execution options for the pipeline, which are typically passed in as command-line options. These include the pipeline runner, which in turn determines which back-end the pipeline will run on.
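
    For orientation, the outline below is a minimal sketch of such a driver program. The ExampleOptions class, its --output argument and the transforms are illustrative placeholders; the complete driver used in this article is listed at the bottom of the page.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    class ExampleOptions(PipelineOptions):
      """Illustrative application-specific options (placeholder)."""
      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument('--output', required=True)

    def run():
      # Parses sys.argv, including standard Beam flags such as --runner=DataflowRunner
      options = ExampleOptions()
      with beam.Pipeline(options=options) as pipeline:
        (pipeline
        | 'Read' >> beam.Create(['a', 'b', 'c'])           # placeholder input
        | 'Write' >> beam.io.WriteToText(options.output))  # write results to the output path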

     

    The Beam SDK provides a number of abstractions that simplify the mechanics of large-scale distributed processing. The code example at the bottom of this page demonstrates using the Python SDK to execute a pipeline across an arbitrary number of machines, based on the value passed in the num_workers command-line option. The pipeline splits the reading of records from the specified table across a number of machines and writes the resulting JSON to Google Cloud Storage.

     

    The driver can either be coded directly in the Cloud Shell using the Cloud Code editor, or written in an external editor and then uploaded to the Cloud Shell.


    Define the dependencies for parallel processing

     

    In Dataflow, data processing work is represented by a pipeline.   

     

    In order to take advantage of Dataflow's parallel processing, all pipeline dependencies need to be made available on the remote machines that Dataflow manages during job execution. This can be achieved by listing all pipeline dependencies in a requirements.txt file, which is then referenced as a pipeline option so that Dataflow copies the dependencies to each remote machine participating in the job execution.

     

    In this example, doorda-sdk must be passed to all remote machines:


    > pip3 freeze | grep doorda-sdk > requirements.txt
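
    The requirements file is referenced through the requirements_file pipeline option (see the Dataflow options table below). Equivalently, as a minimal sketch, the option can be set programmatically via Beam's SetupOptions:

    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

    options = PipelineOptions()
    # Tell Dataflow to install the listed dependencies on every worker
    options.view_as(SetupOptions).requirements_file = 'requirements.txt'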


    Launch Pipeline

     

    Use the Cloud Shell to launch the pipeline on the Cloud Dataflow service. A running pipeline is referred to as a job in the Dataflow application.


    > python3 doorda_dataflow_demo.py [--pipeline_options]


    The flags are divided between Dataflow-specific options and user-defined options. For convenience, the mandatory Dataflow options required to run the pipeline are shown below.


    Dataflow options:

    Field                    Value
    project                  <YOUR_PROJECT_ID>
    runner                   DataflowRunner
    temp_location            gs://<YOUR_BUCKET_NAME>/<TEMP_FOLDER_NAME>
    job_name                 <Name to appear in the Dataflow console jobs list>
    num_workers              <INT>
    autoscaling_algorithm    NONE
    requirements_file        requirements.txt

    User-defined options:

    Field       Value
    username    <YOUR_DOORDA_USERNAME>
    password    <YOUR_DOORDA_PASSWORD>
    catalog     <DOORDA_CATALOG>
    schema      <DOORDA_SCHEMA>
    table       <DOORDA_TABLE>
    output      gs://<YOUR_BUCKET_NAME>/<OUTPUT_PATH>
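
    Putting the options together, an example launch command might look like the following (all bracketed values are placeholders for your own settings):

    > python3 doorda_dataflow_demo.py \
        --project=<YOUR_PROJECT_ID> \
        --runner=DataflowRunner \
        --temp_location=gs://<YOUR_BUCKET_NAME>/<TEMP_FOLDER_NAME> \
        --job_name=<JOB_NAME> \
        --num_workers=3 \
        --autoscaling_algorithm=NONE \
        --requirements_file=requirements.txt \
        --username=<YOUR_DOORDA_USERNAME> \
        --password=<YOUR_DOORDA_PASSWORD> \
        --catalog=<DOORDA_CATALOG> \
        --schema=<DOORDA_SCHEMA> \
        --table=<DOORDA_TABLE> \
        --output=gs://<YOUR_BUCKET_NAME>/<OUTPUT_PATH>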


    Driver Program - doorda_dataflow_demo.py:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    from doorda_sdk.host import client

    from contextlib import contextmanager
    import logging
    import math
    
    @contextmanager
    def open_cursor(username, password, catalog, schema):
      """Yields a DoordaHost cursor and ensures it is closed afterwards."""
      conn = client.connect(username=username,
                            password=password,
                            catalog=catalog,
                            schema=schema)
      logging.info('Successfully connected to DoordaHost')
      cursor = conn.cursor()
      try:
        yield cursor
      finally:
        cursor.close()
    
    class CursorByName():
      """
      Iterator that produces json output consisting of column_name:value from cursor
      """
      def __init__(self, cursor):
        self._cursor = cursor
    
      def __iter__(self):
        return self
    
      def __next__(self):
        row = self._cursor.__next__()
    
        return {description[0]: row[col] for col, description in enumerate(self._cursor.description) }
    
    class DoordaOptions(PipelineOptions):
      """
      Application specific Pipeline Options
      """
      
      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument('--username', required=True)
        parser.add_argument('--password', required=True)
        parser.add_argument('--catalog', required=True)
        parser.add_argument('--schema', required=True)
        parser.add_argument('--table', required=True)
        parser.add_argument('--output', required=True)
    
    class QuerySplitter:
      """
      Defines how a query will be divided across the cluster
      """
    
      def __init__(self, options, num_workers=3):
        with open_cursor(options.username,
                         options.password,
                         options.catalog,
                         options.schema) as cursor:
          logging.info(f'options are {options}')
          cursor.execute(f'select count(1) from {options.catalog}.{options.schema}.{options.table}')
          row_count = cursor.fetchone()[0]
          logging.info(f'row count is {row_count}')
          self._start = 0
          self._stop = row_count
          self._num_workers = num_workers
    
      def split(self):
    
        split_size = math.ceil(self._stop/self._num_workers)
        logging.info(f'split_size = {split_size}')
        i = self._start
        while i < self._stop - split_size:
          logging.info(f'yielding offset ({i}, {i+split_size})')
          yield (i, i+ split_size)
          i += split_size
        logging.info(f'final offset yield ({i}, {self._stop})')
        yield (i, self._stop)
    
    
    class QueryFn(beam.DoFn):
      """
      Query to Parallelise across Cluster.
      """
    
      def __init__(self, options):
        self._options = options
    
      def process(self, query_range):
        start = query_range[0]
        stop = query_range[1]
    
        logging.info(f'process {self._options.table}, offset={start}, limit={stop-start}')
        with open_cursor(self._options.username,
                         self._options.password,
                         self._options.catalog,
                         self._options.schema) as cursor:
          cursor.execute(f'select * from {self._options.catalog}.{self._options.schema}.{self._options.table} offset {start} limit {stop-start}')
          #for row in cursor.iter_result():
          for row in CursorByName(cursor):
            yield row
    
    
    def run(argv=None):
      pipeline_options = DoordaOptions(argv)
      logging.info(f'arguments are: {pipeline_options.get_all_options()}')
      # Fall back to QuerySplitter's default of 3 workers if --num_workers is not supplied
      num_workers = pipeline_options.get_all_options()["num_workers"] or 3
      splitter = QuerySplitter(pipeline_options, num_workers)
      inputs = list(splitter.split())
      # save_main_session is required so that module-level globals such as
      # open_cursor are available on the remote worker nodes.
      pipeline_options.view_as(SetupOptions).save_main_session = True
      with beam.Pipeline(options=pipeline_options) as pipeline:
        (pipeline
        # create one element per (start, stop) offset range to distribute the reads
        | 'Doorda Get Tables' >> beam.Create(inputs)
        # each worker reads its own slice of the table from DoordaHost
        | 'Doorda Read' >> beam.ParDo(QueryFn(pipeline_options))
        # write the resulting records to Cloud Storage as text
        | 'Write Records' >> beam.io.WriteToText(pipeline_options.output)
        )
    
    
    
    if __name__ == '__main__':
      logging.getLogger().setLevel(logging.INFO)
      run()



