
Running pyspark jobs on Google Cloud using Serverless Dataproc

By Keven Pinto. Mar 24, 2022

Run Spark batch workloads without having to bother with the provisioning and management of clusters!

If you are interested in running a simple pyspark pipeline in serverless mode on the Google Cloud Platform, then read on.

The ability to run spark jobs in serverless mode is a great idea; however, the loss of control over the cluster demands that we structure and package our code in a slightly different manner. This article explains the method I have employed to get my pipeline running in serverless mode.

The approach taken in this article is one of many that one can employ while working with Dataproc Serverless; another that comes to mind is using custom containers. Having started down that path, I eventually abandoned it for the following reasons:

  • One more configurable item to maintain
  • A constant need to modify the container if new pipelines utilise new python libraries
  • Pipelines with dependencies on different versions of the same package.

Note: I would like to state that custom containers are not a bad feature; they just didn’t fit my use case at this time.

Prerequisites

Prior to downloading the code from GitHub, ensure that you have the following set up on the machine from which you will be executing the code.

  • Poetry for dependency management and packaging.
  • gcloud installed and authorised to your GCP Project
  • gsutil
  • bq command line tool
  • make build tool

Please also ensure the following in the GCP Project in which you plan to execute the pyspark job:

A bit about the Pipeline …

The pipeline is a simple pyspark job that reads a file from a GCS bucket and saves the result to a table in BigQuery. If all works, you should see a table called stock_prices in a BigQuery dataset called serverless_spark_demo in your GCP Project.
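For orientation, the core of such a job looks roughly like the sketch below. Treat it as an illustration rather than the exact code in the repo: the function name load_stocks, the header/schema handling and the write mode are assumptions; the dataset and table names and the connector options mirror what this article describes.

from pyspark.sql import DataFrame, SparkSession


def load_stocks(spark: SparkSession, file_uri: str, project: str, temp_bq_bucket: str) -> None:
    """Illustrative sketch: read the source csv and load it into BigQuery."""
    # Read the csv from the data bucket; header and schema handling are assumed.
    df: DataFrame = (
        spark.read.option("header", True)
        .option("inferSchema", True)
        .csv(file_uri)
    )

    # Write to BigQuery via the spark-bigquery connector. The temporary bucket
    # is the staging bucket the connector uses for its (indirect) load into BQ.
    (
        df.write.format("bigquery")
        .option("table", f"{project}.serverless_spark_demo.stock_prices")
        .option("temporaryGcsBucket", temp_bq_bucket)
        .mode("overwrite")
        .save()
    )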

Setting up the Pipeline for your GCP Project
  • Download the code from here
  • Edit the Makefile in the Repo and update the PROJECT_ID & REGION variables
# Please add values without quotes
PROJECT_ID ?= <CHANGE ME>
REGION  ?= <CHANGE ME>
for example:
PROJECT_ID ?= my-gcp-project-292607
REGION ?= europe-west2
  • Run the following command from the root folder of the repo and ensure that you are in the correct GCP Project
gcloud config list
  • Run the following command from the root folder of the repo to set up the required buckets and dataset in your GCP Project.
make setup

After this command executes, you should have the following assets in your GCP Project:

  1. GCS Bucket serverless-spark-code-repo-<project_number> — our Pyspark Code gets uploaded here
  2. GCS Bucket serverless-spark-staging-<project_number> — Used for BQ operations
  3. GCS Bucket serverless-spark-data-<project_number> — Our source csv file is in here
  4. BigQuery Dataset called serverless_spark_demo

Packaging Your Code

As this is a serverless setup, we will be packaging our python code along with all its 3rd party python dependencies and submitting this as a single packaged file to the service. Dataproc Serverless supports .py, .egg and .zip file types; we have chosen to go down the zip file route.

  • To Package the code, run the following command from the root folder of the repo
make build

The above command does the following:

  • Creates a build package in a folder called /dist in your repo root folder
  • Uploads this folder to the GCS bucket (serverless-spark-code-repo-<project_number>)

Let us take a look at what’s happening under the hood! Below is the build target from the Makefile.

build: clean ## Build Python Package with Dependencies
   @mkdir -p ./dist
   @poetry update
   @poetry export -f requirements.txt --without-hashes -o requirements.txt
   @poetry run pip install . -r requirements.txt -t ${SRC_WITH_DEPS}
   @cd ./${SRC_WITH_DEPS} && find . -name "*.pyc" -delete
   @cd ./${SRC_WITH_DEPS} && zip -x "*.git*" -x "*.DS_Store" -x "*.pyc" -x "*/*__pycache__*/" -x ".idea*" -r ../dist/${SRC_WITH_DEPS}.zip .
   @rm -Rf ./${SRC_WITH_DEPS}
   @rm -f requirements.txt
   @cp ./src/main.py ./dist
   @mv ./dist/${SRC_WITH_DEPS}.zip ./dist/${APP_NAME}_${VERSION_NO}.zip
   @gsutil cp -r ./dist gs://${CODE_BUCKET}

Poetry (poetry update) first looks at all the dependencies (3rd party packages) required by the pipeline. It does this by interrogating the pyproject.toml file and updating the poetry.lock file (if required). I’d encourage you to look at the [tool.poetry.dependencies] section of the pyproject.toml to get a view of the 3rd party packages used in the project.

This .lock file then forms the basis for setting up all the code and dependencies in a temp folder before zipping it all up into a single .zip file. This file is renamed to APP_NAME_VERSION_NO.zip in a later step – the values of APP_NAME and VERSION_NO are extracted from pyproject.toml.

One may notice that I have not added pyspark to the [tool.poetry.dependencies] section… that’s deliberate – pyspark comes pre-installed on Google’s standard Spark container.

The final bit I want to discuss around the build is the following line.

@cp ./src/main.py ./dist

Why are we copying this file to ./dist as well? Isn’t it already part of the zip?

No! is the short answer.

The reason we need this step is that Dataproc Serverless needs a python file as the main entry point, and this file cannot be inside a .zip or .egg file. So we need to keep it as an independent file under the ./dist folder. The dist folder now looks like this:

├── dist
│   ├── main.py
│   └── spark_serverless_repo_exemplar_0.1.0.zip
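As an illustration, a minimal main.py along these lines would satisfy that requirement. The flag names mirror the make run target shown later in the article; the module path src.pipeline and the function load_stocks are assumptions for the sketch, not the exact contents of the repo.

import argparse

from pyspark.sql import SparkSession

# The zip supplied via --py-files is placed on the PYTHONPATH by Dataproc
# Serverless, so code packaged inside it can be imported here even though
# main.py itself sits outside the zip. The module path is illustrative.
from src.pipeline import load_stocks


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Serverless Dataproc exemplar")
    parser.add_argument("--project", required=True)
    parser.add_argument("--file-uri", required=True)
    parser.add_argument("--temp-bq-bucket", required=True)
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    spark = SparkSession.builder.getOrCreate()
    load_stocks(spark, args.file_uri, args.project, args.temp_bq_bucket)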

Another important point to note is the config setting in pyproject.toml – we exclude the main.py file that sits directly under /src from being packaged. Below is an excerpt from the .toml file that shows the setting.

exclude = ["src/main*.py"]
packages = [
{ include = "src/**/*.py" },
]

Execute the Pipeline

The pipeline can be executed by running the following command from the root folder of the Repo

make run

The code below is an excerpt from the Makefile and shows the gcloud command we have used to run spark in serverless mode.

run: ## Run the dataproc serverless job
   gcloud beta dataproc batches submit --project ${PROJECT_ID} --region ${REGION} pyspark \
   gs://${CODE_BUCKET}/dist/main.py --py-files=gs://${CODE_BUCKET}/dist/${APP_NAME}_${VERSION_NO}.zip \
   --subnet default --properties spark.executor.instances=2,spark.driver.cores=4,spark.executor.cores=4,spark.app.name=spark_serverless_repo_exemplar \
   --jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
   -- --project=${PROJECT_ID} --file-uri=gs://${DATA_BUCKET}/stocks.csv --temp-bq-bucket=${TEMP_BUCKET}

Note: Spark communicates with BigQuery via a connector; this connector needs to be passed to the dataproc job via the --jars flag

--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar
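As an aside, the same jar registers the bigquery data source for reads as well as writes, so a job could read the freshly loaded table back through the same connector. A minimal hedged sketch (the read-back is not part of the repo’s pipeline, purely an illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The "bigquery" data source below is registered by the connector jar passed
# to the job via --jars; without it, Spark cannot resolve this format.
stock_prices = (
    spark.read.format("bigquery")
    .option("table", "serverless_spark_demo.stock_prices")
    .load()
)
stock_prices.show(5)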

Once this pipeline completes successfully, one should see a table called stock_prices under the serverless_spark_demo dataset in BigQuery.

View Pipeline Execution
  • Log on to your GCP Console
  • Type dataproc in the Search bar and select Dataproc
  • Click on the Batches Menu option on the Left
Dataproc Batches — Console View

This screen should give you a list of the Dataproc Serverless batch jobs you have executed; you should see the job you just submitted in either the Pending or the Succeeded state.

Click on the Batch ID of the job we just executed; this opens up the detailed view for the job. Click on the Clone menu option and then click Submit.


You will observe a new batch job in the Dataproc Batches list; however, if you click on the Clusters menu option on the left, you will not see any clusters being instantiated… Magic!

Summing it all up

Congrats, you have just run your first Dataproc Serverless job. As we have seen, there is absolutely no cluster to manage or provision.

I am hopeful you have found this useful and will try it out in your GCP projects. If you like what has been showcased, feel free to get in touch and we shall be happy to help you or your company on your cloud journey with GCP.

To learn a bit more about Dataproc Serverless, kindly refer to this excellent article written by my colleague Ash.

About CTS

CTS is the largest dedicated Google Cloud practice in Europe and one of the world’s leading Google Cloud experts, winning 2020 Google Partner of the Year Awards for both Workspace and GCP.

We offer a unique full stack Google Cloud solution for businesses, encompassing cloud migration and infrastructure modernisation. Our data practice focuses on analysis and visualisation, providing industry-specific solutions for Retail, Financial Services, and Media and Entertainment.

We’re building talented teams ready to change the world using Google technologies. So if you’re passionate, curious and keen to get stuck in — take a look at our Careers Page and join us for the ride!

Disclaimer: This is to inform readers that the views, thoughts, and opinions expressed in the text belong solely to the author.


The original article was published on Medium.
