By Keven Pinto, Jun 19, 2022
Hive to BigQuery is a common data migration path and there are plenty of ways to get there. This article describes the approach we took with one of our clients. The accompanying sample code is a scaled down version of our production code and is provided as an exemplar for anyone wanting to undertake a similar activity.
The Ask
A Discovery Workshop with the client revealed the following:
1) Table(s) on premise (Cloudera 6.x) are Hive managed
2) Underlying storage format for these tables is Parquet (SNAPPY compression)
3) These Parquet files will be uploaded by the client to a designated GCS bucket, i.e. the Upload Bucket; each file will be between 1–4 GB in size
4) The files will be encrypted using GPG
5) They will have an extension of .gpg when uploaded to the GCS Bucket
6) The root folder of the Upload Bucket will hold one or more manifest files (.manifest extension). These files provide metadata about the uploaded data. Below is an excerpt from a manifest file:
{
  "files": [
    {
      "file": "part-0000-8978787.snappy.parquet",
      "md5sum": "d4adfabba91bb2c2dbaa774ea09bf83d",
      "cdhPath": "/cdh/hive/table1"
    },
    { ..... }
  ]
}
7) The “cdhPath” key in the manifest file will allow us to identify the name of the target table; for example, “cdhPath”: “/cdh/hive/table1” points to table1 (see the short sketch after this list)
8) The Upload Bucket will follow the exact subfolder structure specified in the “cdhPath” key of the manifest file
9) The “md5sum” values in the manifest file are calculated before GPG encryption
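To make requirement 7 concrete, here is a minimal sketch of deriving the target table name from the “cdhPath” key. The manifest content simply mirrors the excerpt above; nothing else is assumed.

```python
import json

# Hypothetical manifest content, mirroring the excerpt above.
manifest = json.loads("""
{ "files": [ { "file": "part-0000-8978787.snappy.parquet",
               "md5sum": "d4adfabba91bb2c2dbaa774ea09bf83d",
               "cdhPath": "/cdh/hive/table1" } ] }
""")

for entry in manifest["files"]:
    # The last segment of cdhPath gives us the target table name.
    table_name = entry["cdhPath"].rstrip("/").split("/")[-1]
    print(entry["file"], "->", table_name)  # part-0000-8978787.snappy.parquet -> table1
```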
Our Approach
A review of these requirements meant we would need to deal with the following challenges:
- Make the data available locally to the decryption service, i.e. Mounted to a Machine/Pod/Container (as we cannot decrypt using a gs:// URI) or use a service that already has a bucket(s) mounted to worker nodes
- Ensure that we are able to orchestrate the entire process and reduce any manual intervention
- Design a workflow that allows us to decrypt files in parallel as we felt this would be a long running task given the size and number of the files
- Have a secure way of storing GPG keys and making them available to compute resources
Architecture
Based on the requirements and challenges, we felt Cloud Composer would be the best tool for the job, as it gave us the following:
- Access to the gs://DAG_BUCKET/data subfolder on every node. This is mounted as /home/airflow/gcs/data by default by Cloud Composer. This took care of challenge 1
- Use TaskGroups in Airflow to decrypt in parallel. This took care of challenges 2 & 3
- Use Airflow's seamless integration with GCP Secret Manager to ensure keys are securely passed to the nodes performing decryption. This took care of challenge 4
Composer Configuration
The first thing we needed to do to our existing Composer 2 instance was to modify it to use GCP Secret Manager as its Secrets Backend. A Secrets Backend is a secure store from which Cloud Composer accesses sensitive information such as passwords and connection details. Kindly refer to this doc from Google for more information on how to set this up.
As you can see from the screenshot (backend_kwargs), we have informed Cloud Composer that all our variables in Secret Manager will start with the word “migration”, immediately followed by a hyphen.
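For reference, the values we set look roughly like the sketch below, expressed as Python constants for readability. The kwargs follow the standard CloudSecretManagerBackend settings, and the project id is a placeholder.

```python
# A sketch of the Airflow secrets-backend override applied to the Composer
# environment (the [secrets] section); values here are examples, not our
# production configuration.
SECRETS_BACKEND = (
    "airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend"
)
SECRETS_BACKEND_KWARGS = {
    "variables_prefix": "migration",  # Airflow Variables resolve to secrets named migration-<name>
    "sep": "-",                       # the hyphen between the prefix and the variable name
    "project_id": "my-gcp-project",   # placeholder: the project holding the secrets
}
```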
The DAG also looks for an Airflow Variable called source_location, which holds the name of the DAG bucket. Make sure to set this if you plan to download the code and use it.
GCP Secret Manager Configuration
We next had to set up our GPG Private Key, GPG Public Key and GPG Passphrase in Secret Manager. As mentioned in the section above, these variables needed to conform to the naming convention of migration-<name>.
As you can see above, we have created 3 secrets, and their names conform to the naming standard we configured in Cloud Composer. At this point we’d also like to point out that Airflow will mask information stored in any variable whose name contains one of the following words: ‘access_token’, ‘api_key’, ‘apikey’, ‘authorization’, ‘passphrase’, ‘passwd’, ‘password’, ‘private_key’, ‘secret’, ‘token’. This means that the output from these variables always appears as ‘***’ in any Airflow logs and output messages (security out of the box!). This feature dictated what the names of our variables would be. Kindly refer to this document for more details on this masking feature.
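As an illustration of how this plays out in a DAG, the sketch below assumes a hypothetical secret named migration-gpg_passphrase; with the prefix configured earlier, a plain Variable lookup resolves to it.

```python
from airflow.models import Variable

# With variables_prefix="migration" and sep="-", this lookup resolves to the
# Secret Manager secret named "migration-gpg_passphrase" (hypothetical name).
gpg_passphrase = Variable.get("gpg_passphrase")

# Because the variable name contains "passphrase", Airflow masks its value, so
# it is rendered as '***' in task logs and the UI.
```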
DAG Design
The following principles guided our work:
- Use Google Cloud SDK tools (gsutil, gcloud, etc.) via the BashOperator wherever possible
- Avoid writing custom ETL (if possible); leverage BigQuery's bq CLI tool instead
- Make sure the process is idempotent
- Ensure individual Tasks are atomic in nature
Overview of DAG Tasks
init (BashOperator)
Create necessary Folders under /home/airflow/gcs/data/
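A minimal sketch of how this task might look; the DAG id, schedule and folder names are illustrative rather than our exact production values.

```python
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="hive_to_bq_migration_sketch",
    start_date=pendulum.datetime(2022, 6, 1, tz="UTC"),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Create the working folders under the Composer data directory, which is
    # mounted on every worker as /home/airflow/gcs/data.
    init = BashOperator(
        task_id="init",
        bash_command=(
            "mkdir -p /home/airflow/gcs/data/encrypted "
            "/home/airflow/gcs/data/decrypted "
            "/home/airflow/gcs/data/scripts"
        ),
    )
```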
attach (BashOperator)
Copy data (gsutil cp) from the Upload Bucket to the /encrypted folder. This makes the data available to all worker nodes in Airflow and allows us to gpg-decrypt the files.
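A sketch of this task, assuming it sits inside the DAG context above; the Upload Bucket name is a placeholder.

```python
from airflow.operators.bash import BashOperator

# Copy the encrypted uploads into the Composer data folder so every worker can
# see them locally. "my-upload-bucket" is a placeholder name.
attach = BashOperator(
    task_id="attach",
    bash_command=(
        "gsutil -m cp -r 'gs://my-upload-bucket/*' "
        "/home/airflow/gcs/data/encrypted/"
    ),
)
```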
decrypt_hive_files (TaskGroup of BashOperators)
Decrypt the files in parallel and store them in the /decrypted folder.
Given that we are using Airflow 2.2.x, we are not able to take advantage of the dynamic task mapping feature introduced in Airflow 2.3. (n.b. I’m open to ideas if anyone has a better way to do this.)
Note: We tried creating tasks dynamically based on the manifest, but this process would create multiple tasks for the same table.
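For illustration only, the sketch below builds the TaskGroup with a fixed fan-out of four tasks, each decrypting every Nth file. This is a simplified alternative to our production logic, and it assumes the GPG private key has already been imported into each worker's keyring; the DAG wrapper and slot count are placeholders.

```python
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

DATA_DIR = "/home/airflow/gcs/data"
PARALLEL_SLOTS = 4  # illustrative fixed fan-out instead of dynamic task mapping

with DAG(
    dag_id="decrypt_sketch",
    start_date=pendulum.datetime(2022, 6, 1, tz="UTC"),
    schedule_interval=None,
    catchup=False,
):
    with TaskGroup(group_id="decrypt_hive_files") as decrypt_hive_files:
        for slot in range(PARALLEL_SLOTS):
            # Each slot picks every Nth encrypted file so the work is spread
            # evenly across the parallel tasks.
            BashOperator(
                task_id=f"decrypt_slot_{slot}",
                bash_command=(
                    f"files=$(find {DATA_DIR}/encrypted -name '*.gpg' | sort "
                    f"| awk 'NR % {PARALLEL_SLOTS} == {slot}'); "
                    "for f in $files; do "
                    "gpg --batch --yes --pinentry-mode loopback "
                    '--passphrase "{{ var.value.gpg_passphrase }}" '
                    f"--output {DATA_DIR}/decrypted/$(basename ${{f%.gpg}}) --decrypt $f; "
                    "done"
                ),
            )
```

Note how the passphrase is pulled via the templated Variable lookup, so it comes from Secret Manager and is masked in the logs.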
calc_file_md5 (BashOperator)
Calculates md5 sums on the decrypted file(s) and stores them in the /scripts folder.
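A sketch of this step, assuming the folder layout above; the output file name is illustrative.

```python
from airflow.operators.bash import BashOperator

DATA_DIR = "/home/airflow/gcs/data"

# Compute an md5 sum per decrypted file so it can be compared with the
# manifest values in the next task.
calc_file_md5 = BashOperator(
    task_id="calc_file_md5",
    bash_command=(
        f"cd {DATA_DIR}/decrypted && "
        f"md5sum *.parquet > {DATA_DIR}/scripts/cloud_md5.txt"
    ),
)
```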
validate_md5 (PythonOperator)
Compares the on-prem and cloud md5 sums.
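A sketch of the comparison, assuming a single illustrative manifest copied into the /scripts folder and the md5 output file from the previous step.

```python
import json
from pathlib import Path

from airflow.operators.python import PythonOperator

DATA_DIR = Path("/home/airflow/gcs/data")

def _validate_md5() -> None:
    """Compare the md5 sums in the manifest with the ones computed on the
    decrypted files and fail the task on any mismatch. File locations
    (upload.manifest, cloud_md5.txt) are illustrative."""
    cloud = {}
    for line in (DATA_DIR / "scripts" / "cloud_md5.txt").read_text().splitlines():
        md5, name = line.split()  # md5sum output: "<hash>  <filename>"
        cloud[name] = md5

    manifest = json.loads((DATA_DIR / "scripts" / "upload.manifest").read_text())
    mismatches = [
        entry["file"]
        for entry in manifest["files"]
        if cloud.get(entry["file"]) != entry["md5sum"]
    ]
    if mismatches:
        raise ValueError(f"md5 mismatch for: {mismatches}")

validate_md5 = PythonOperator(task_id="validate_md5", python_callable=_validate_md5)
```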
gen_load_script (PythonOperator)
This step dynamically creates two shell scripts based on the values in the manifest files (a simplified sketch follows the list below):
- group_files_by_table.sh
- load_parquets_to_bq.sh
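A simplified sketch of the generator; the manifest location, target dataset and DAG bucket name are placeholders (the real DAG reads the bucket from the source_location Variable).

```python
import json
from pathlib import Path

from airflow.operators.python import PythonOperator

DATA_DIR = Path("/home/airflow/gcs/data")
DAG_BUCKET = "my-dag-bucket"  # placeholder; taken from the source_location Variable in the real DAG
BQ_DATASET = "my_dataset"     # placeholder target dataset

def _gen_load_script() -> None:
    """Generate the two helper scripts from the manifest. Paths and names here
    are illustrative, not our production values."""
    manifest = json.loads((DATA_DIR / "scripts" / "upload.manifest").read_text())

    group_lines = ["#!/bin/bash", "set -e"]
    load_lines = ["#!/bin/bash", "set -e"]
    tables = set()
    for entry in manifest["files"]:
        table = entry["cdhPath"].rstrip("/").split("/")[-1]
        tables.add(table)
        # Move each decrypted parquet file into a directory named after its table.
        group_lines.append(
            f"mkdir -p {DATA_DIR}/decrypted/{table} && "
            f"mv {DATA_DIR}/decrypted/{entry['file']} {DATA_DIR}/decrypted/{table}/"
        )
    for table in sorted(tables):
        # /home/airflow/gcs/data mirrors gs://<DAG_BUCKET>/data, so bq can load
        # the grouped files straight from GCS with a wildcard.
        load_lines.append(
            "bq load --source_format=PARQUET --replace "
            f"{BQ_DATASET}.{table} gs://{DAG_BUCKET}/data/decrypted/{table}/*.parquet"
        )

    (DATA_DIR / "scripts" / "group_files_by_table.sh").write_text("\n".join(group_lines) + "\n")
    (DATA_DIR / "scripts" / "load_parquets_to_bq.sh").write_text("\n".join(load_lines) + "\n")

gen_load_script = PythonOperator(task_id="gen_load_script", python_callable=_gen_load_script)
```

Using --replace on the load keeps reruns idempotent, in line with the design principles above.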
group_files_by_table (BashOperator)
This step uses the dynamically generated script from the previous step to group all parquet files for a table under a single directory; each directory is named after its table (sketched together with run_bq_loader below).
run_bq_loader (BashOperator)
Uses the script created in the gen_load_script task to load data into BigQuery with the bq command-line utility.
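A sketch of these two final tasks, plus the overall ordering. The trailing space after each script path is a standard BashOperator workaround so the .sh file is not treated as a Jinja template.

```python
from airflow.operators.bash import BashOperator

SCRIPTS_DIR = "/home/airflow/gcs/data/scripts"

# Run the generated scripts; the trailing space stops the BashOperator from
# trying to render the .sh path as a Jinja template file.
group_files_by_table = BashOperator(
    task_id="group_files_by_table",
    bash_command=f"bash {SCRIPTS_DIR}/group_files_by_table.sh ",
)

run_bq_loader = BashOperator(
    task_id="run_bq_loader",
    bash_command=f"bash {SCRIPTS_DIR}/load_parquets_to_bq.sh ",
)

# Putting it together (with the tasks sketched earlier, inside one DAG context):
# init >> attach >> decrypt_hive_files >> calc_file_md5 >> validate_md5 \
#     >> gen_load_script >> group_files_by_table >> run_bq_loader
```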
Gotchas
There were quite a few, but these bit us the most.
- Profile your data and identify any outliers. We observed timestamp columns with values of "0000-00-00 00:00:00"; this data will break your load. Fix at source or exclude at source if possible
- Hive allows column names that start with a number, for example "3_month_rolling_average"; BigQuery does not. Ideally, create a new staging table on-prem with such columns renamed, move the data to this staging table, and then extract and ship the parquet files from the staging table to GCS.
The source code is available here.
Conclusion
We now have a fully orchestrated process to move data from on-prem to BigQuery without writing a single line of ETL code! We do hope you will try and implement some of the ideas and concepts mentioned here.
Disclaimer: This is to inform readers that the views, thoughts, and opinions expressed in the text belong solely to the author.
About CTS
CTS is the largest dedicated Google Cloud practice in Europe and one of the world’s leading Google Cloud experts, winning 2020 Google Partner of the Year Awards for both Workspace and GCP.
We offer a unique full stack Google Cloud solution for businesses, encompassing cloud migration and infrastructure modernisation. Our data practice focuses on analysis and visualisation, providing industry-specific solutions for Retail, Financial Services, and Media and Entertainment.
We’re building talented teams ready to change the world using Google technologies. So if you’re passionate, curious and keen to get stuck in — take a look at our Careers Page and join us for the ride!
The original article was published on Medium.