By Keven Pinto. Mar 24, 2022
Run Spark batch workloads without having to bother with the provisioning and management of clusters!
If you are interested in running a simple pyspark pipeline in Serverless mode on the Google Cloud Platform, then read on.
The ability to run spark jobs in serverless mode is a great idea; however, the loss of control over the cluster demands that we structure and package our code in a slightly different manner. This article explains the method I have employed to get my pipeline running in serverless mode.
The approach taken in this article is one of several that can be employed while working with Dataproc Serverless; the other approach that comes to mind is using custom containers. Having started down that path, I eventually abandoned it for the following reasons:
- One more configurable item to maintain
- A constant need to modify the container if new pipelines utilise new python libraries
- Pipelines with dependencies on different versions of the same package.
Note: custom containers are not a bad feature; they just didn't fit my use case at this time.
Prerequisites
Before downloading the code from GitHub, ensure that you have the following set up on the machine you will be executing the code from:
- Poetry for dependency management and packaging.
- gcloud installed and authorised to your GCP Project
- gsutil
- bq command line tool
- make build tool
Please also ensure the following in the GCP Project in which you plan to execute the pyspark job:
- Dataproc API enabled
- Private Google Access enabled on the subnet you plan to use
- Billing enabled
- BigQuery API enabled
A bit about the Pipeline …
The Pipeline is a simple pyspark job that reads a file from a GCS bucket and saves the result to a table in BigQuery. If all goes well, you should see a table called stock_prices in a BigQuery dataset called serverless_spark_demo in your GCP Project.
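For orientation, a minimal PySpark job of this shape might look like the sketch below. This is not the exact code from the repo, just an illustration of the read-from-GCS, write-to-BigQuery flow using the Spark BigQuery connector; the dataset, table and argument names mirror those used later in this article.

from pyspark.sql import SparkSession

def run(project: str, file_uri: str, temp_bq_bucket: str) -> None:
    spark = SparkSession.builder.appName("spark_serverless_repo_exemplar").getOrCreate()
    # Read the source csv from the GCS data bucket
    df = spark.read.option("header", True).csv(file_uri)
    # Write to BigQuery via the Spark BigQuery connector; the connector stages
    # intermediate files in the temp bucket before loading them into the table
    (
        df.write.format("bigquery")
        .option("table", f"{project}.serverless_spark_demo.stock_prices")
        .option("temporaryGcsBucket", temp_bq_bucket)
        .mode("overwrite")
        .save()
    )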
Setting up the Pipeline for your GCP Project
- Download the code from here
- Edit the Makefile in the repo and update the PROJECT_ID & REGION variables
# Please add values without quotes
PROJECT_ID ?= <CHANGE ME>
REGION ?= <CHANGE ME>

for example:
PROJECT_ID ?= my-gcp-project-292607
REGION ?= europe-west2
- Run the following command from the root folder of the repo and ensure that you are in the correct GCP Project
gcloud config list
- Run the following command from the root folder of the repo to set up the required buckets and dataset in your GCP Project.
make setup
After this command executes, you should have the following assets in your GCP Project:
- GCS Bucket serverless-spark-code-repo-<project_number>: our Pyspark code gets uploaded here
- GCS Bucket serverless-spark-staging-<project_number>: used for BQ operations
- GCS Bucket serverless-spark-data-<project_number>: our source csv file is in here
- BigQuery Dataset called serverless_spark_demo
Packaging Your Code
As this is a serverless setup, we will package our Python code along with all its 3rd-party dependencies and submit it to the service as a single packaged file. Dataproc Serverless supports .py, .egg and .zip file types; we have chosen to go down the zip file route.
- To package the code, run the following command from the root folder of the repo:
make build
The above command does the following:
- Creates a build package in a folder called /dist in your repo root folder
- Uploads this folder to the GCP Bucket (serverless-spark-code-repo-<project_number>)
Let us take a look at what’s happening under the hood! Below is the build target from the Makefile.
build: clean ## Build Python Package with Dependencies
@mkdir -p ./dist
@poetry update
@poetry export -f requirements.txt --without-hashes -o requirements.txt
@poetry run pip install . -r requirements.txt -t ${SRC_WITH_DEPS}
@cd ./${SRC_WITH_DEPS}
@find . -name "*.pyc" -delete
@cd ./${SRC_WITH_DEPS} && zip -x "*.git*" -x "*.DS_Store" -x "*.pyc" -x "*/*__pycache__*/" -x ".idea*" -r ../dist/${SRC_WITH_DEPS}.zip .
@rm -Rf ./${SRC_WITH_DEPS}
@rm -f requirements.txt
@cp ./src/main.py ./dist
@mv ./dist/${SRC_WITH_DEPS}.zip ./dist/${APP_NAME}_${VERSION_NO}.zip
@gsutil cp -r ./dist gs://${CODE_BUCKET}
Poetry (poetry update) first looks at all the dependencies (3rd-party packages) required by the pipeline. It does this by interrogating the pyproject.toml file and updating the poetry.lock file (if required). I'd encourage you to look at the [tool.poetry.dependencies] section of pyproject.toml to get a view of the 3rd-party packages used in the project.
This .lock file then forms the basis for setting up all code and dependencies in a temp folder before zipping it all up into a single .zip file. This file is renamed to APP_NAME_VERSION_NO.zip in a later step; the values of APP_NAME and VERSION_NO are extracted from pyproject.toml.
One may notice that I have not added pyspark to the [tool.poetry.dependencies] section. That is deliberate: pyspark comes pre-installed on Google's standard Spark container.
The final bit I want to discuss around the build is the following line.
@cp ./src/main.py ./dist
Why are we copying this file to ./dist as well; isn't it already part of the zip?
No, is the short answer.
We need this step because Dataproc Serverless needs a Python file as the main entry point, and this cannot be inside a .zip or .egg file. So we need to keep it as an independent file under the ./dist folder. The dist folder now looks like this:
├── dist
│   ├── main.py
│   └── spark_serverless_repo_exemplar_0.1.0.zip
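To make that concrete: the zip supplied via --py-files is placed on the Python path by Spark at runtime, so the standalone main.py can import the packaged modules directly. A rough sketch of what that looks like (the module and function names here are illustrative, not the repo's actual ones):

# main.py - entry point submitted to Dataproc Serverless
import sys

# src.pipeline / run_pipeline are hypothetical names used for illustration;
# they stand in for whatever modules live inside the packaged zip
from src.pipeline import run_pipeline

if __name__ == "__main__":
    run_pipeline(sys.argv[1:])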
Another important point to note is the config settings in pyproject.toml: we exclude the main.py file that sits directly under /src from being packaged. Below is an excerpt from the .toml file that shows the setting.
exclude = ["src/main*.py"]
packages = [
{ include = "src/**/*.py" },
]
Execute the Pipeline
The pipeline can be executed by running the following command from the root folder of the Repo
make run
The code below is an excerpt from the Makefile and shows the gcloud command we have used to run spark in serverless mode.
run: ## Run the dataproc serverless job
gcloud beta dataproc batches submit --project ${PROJECT_ID} --region ${REGION} pyspark \
gs://${CODE_BUCKET}/dist/main.py --py-files=gs://${CODE_BUCKET}/dist/${APP_NAME}_${VERSION_NO}.zip \
--subnet default --properties spark.executor.instances=2,spark.driver.cores=4,spark.executor.cores=4,spark.app.name=spark_serverless_repo_exemplar \
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
-- --project=${PROJECT_ID} --file-uri=gs://${DATA_BUCKET}/stocks.csv --temp-bq-bucket=${TEMP_BUCKET}
Note: Spark communicates with BigQuery via a connector; this connector needs to be passed to the Dataproc job via the --jars flag.
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar
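Everything after the bare -- at the end of the command is passed straight through to main.py as program arguments. Inside the job they can be picked up with standard argparse, along these lines (a sketch: the flag names match the Makefile, but the parsing code itself is illustrative):

import argparse

parser = argparse.ArgumentParser(description="Serverless Spark demo pipeline")
parser.add_argument("--project", required=True, help="GCP project id")
parser.add_argument("--file-uri", required=True, help="gs:// path of the source csv")
parser.add_argument("--temp-bq-bucket", required=True, help="staging bucket for the BigQuery connector")
args = parser.parse_args()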
Once this pipeline completes successfully, one should see a table called stock_prices under the serverless_spark_demo dataset in BigQuery.
View Pipeline Execution
- Log on to your GCP Console
- Type dataproc in the search bar and select Dataproc
- Click on the Batches menu option on the left
This screen should give you a list of the Dataproc Serverless batch jobs you have executed; you should see the job you just submitted in either the Pending or the Succeeded state.
Click on the Batch ID of the job we just executed; this opens the detailed view for the job. Click on the Clone menu option and then click Submit.
You will observe a new batch job in the Dataproc Batches list; however, if you click on the Clusters menu option on the left, you will not see any clusters being instantiated… Magic!
Summing it all up
Congrats, you have just run your first Dataproc Serverless job. As we have seen, there is absolutely no cluster to manage or provision.
I hope you have found this useful and will try it out in your GCP projects. If you like what has been showcased, feel free to get in touch and we shall be happy to help you or your company on your cloud journey with GCP.
To learn a bit more about Dataproc Serverless, refer to this excellent article written by my colleague Ash.
About CTS
CTS is the largest dedicated Google Cloud practice in Europe and one of the world’s leading Google Cloud experts, winning 2020 Google Partner of the Year Awards for both Workspace and GCP.
We offer a unique full stack Google Cloud solution for businesses, encompassing cloud migration and infrastructure modernisation. Our data practice focuses on analysis and visualisation, providing industry-specific solutions for Retail, Financial Services, and Media and Entertainment.
We’re building talented teams ready to change the world using Google technologies. So if you’re passionate, curious and keen to get stuck in — take a look at our Careers Page and join us for the ride!
Disclaimer: This is to inform readers that the views, thoughts, and opinions expressed in the text belong solely to the author.
The original article was published on Medium.