Try a preview of AI-powered search below!

Cross

Apache Beam

Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines. Using one of the open source Beam SDKs, you build a program that defines the pipeline. The pipeline is then executed by one of Beam’s supported distributed processing back-ends, which include Apache Flink, Apache Spark, and Google Cloud Dataflow.

Prerequisite

To use Apache Beam, ensure that you have YugabyteDB up and running. Download and install YugabyteDB by following the steps in Quick start.

Setup

To run Apache Beam with YugabyteDB, do the following:

  1. Create a folder apache-beam-test as follows:

    mkdir apache-beam-test && cd apache-beam-test
    
  2. Create a python virtual environment and activate it as follows:

    python3 -m venv myenv
    source myenv/bin/activate
    
  3. Install the latest Apache Beam Python SDK from Pypi using the following command:

    pip install beam-nuggets
    
  4. Create a python file, democode.py and add the following code to it:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from beam_nuggets.io import relational_db
    records = [
        {'name': 'Jan', 'num': 1},
        {'name': 'Feb', 'num': 2},
        {'name': 'Mar', 'num': 3},
        {'name': 'Apr', 'num': 4},
        {'name': 'May', 'num': 5},
    ]
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql',  #postgresql+pg8000
        host='127.0.0.1',
        port=5433,
        username=yugabyte,
        password='yugabyte',
        database='yugabyte',
        create_if_missing=True  # create the database if not there
    )
    table_config = relational_db.TableConfiguration(
        name='months_col01',
        create_if_missing=True,
        primary_key_columns=['num']
    )
    with beam.Pipeline(options=PipelineOptions()) as p:
        months = p | "Reading month records" >> beam.Create(records)
        months | 'Writing to DB table' >> relational_db.Write(
            source_config=source_config,
            table_config=table_config
        )
    if __name__ == "__main__":
        print('demo code ran successful')
    
  5. Run democode.py as follows:

    python democode.py
    
    demo code ran successful
    
  6. You can also verify the changes from a ysql shell as follows:

    select * from month_col01;
    

    You can see the following output:

     num | name
    -----+------
       5 | May
       1 | Jan
       4 | Apr
       2 | Feb
       3 | Mar
    (5 rows)