By guillaume blaquiere. Dec 18, 2021
Distributed computing is the key to processing big data at scale. Data processing systems such as Hadoop, Dataflow and, of course, BigQuery all rely on clusters of VMs.
Dozens or hundreds of processes can run at the same time and perform parallel operations to speed up the process.
The BigQuery export statement is a great example: with a SQL statement, you can spawn hundreds of slots (i.e. slices of VMs) to export the data from BigQuery. But because the export is performed in parallel by hundreds of slots, it generates hundreds of files.
I have already described how to leverage BigQuery to handle, merge and clean CSV files, with the tradeoff of getting several CSV files as output, and how to export data from BigQuery and load it into Cloud SQL by iterating over all the CSV files exported by BigQuery. An issue has since been opened on my GitHub repository with a great optimisation proposal:
Instead of importing a lot of small files into Cloud SQL, why not merge all the files and import only one into Cloud SQL?
Let’s solve that challenge!
Cloud Storage Compose Feature
Cloud Storage stores binary objects (blobs) of any type. When an export is performed by BigQuery, Cloud Storage is used to store the exported files.
Cloud Storage is very efficient (and affordable) to store blobs, but its processing capacities are limited:
- Can serve gzipped files in plain format with the transcoding feature
- Can enforce a lifecycle on objects
- Can compose several files into a single one
That’s roughly all!
The last bullet point, Compose, isn’t well known because it’s not available through the console. It allows you to compose (i.e. append) up to 32 files into a single one.
But this feature is limited to 32 files per API call, and when you have hundreds of files, you need to loop over that command.
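That loop can be sketched in plain Python. This is a minimal in-memory simulation, not the Cloud Storage client: `compose` and `merge_all` are hypothetical helpers, and the blobs are stand-in byte strings. It only illustrates that each call accepts at most 32 sources and that, when appending to an output file, the output itself takes one of those 32 slots.

```python
COMPOSE_LIMIT = 32  # maximum number of source objects per compose call


def compose(sources):
    """Hypothetical stand-in for a compose call: concatenate up to 32 blobs."""
    if len(sources) > COMPOSE_LIMIT:
        raise ValueError("compose accepts at most 32 source objects")
    return b"".join(sources)


def merge_all(parts):
    """Append every part to an initially empty output, 31 parts per call."""
    output = b""
    calls = 0
    step = COMPOSE_LIMIT - 1  # one slot is taken by the output itself
    for start in range(0, len(parts), step):
        output = compose([output] + parts[start:start + step])
        calls += 1
    return output, calls


# Made-up data: 100 small parts standing in for exported files.
parts = [f"row{i}\n".encode() for i in range(100)]
merged, calls = merge_all(parts)
print(calls)  # 4 calls: 31 + 31 + 31 + 7 parts
```

With hundreds of exported files, the number of calls grows as the file count divided by 31, rounded up.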
Cloud Workflow Solution
Cloud Workflow lets you declare, in a YAML file, a sequence of API calls with flow control (iteration, conditions, retries, ...).
It is therefore a perfect fit to browse all the files exported by BigQuery and append them into a single CSV file.
Let’s define that workflow!
BigQuery Export call
The first step is to query BigQuery with the export statement. The format used is CSV, because it’s easy to merge CSV by appending the files (without headers).
The BigQuery job query connector is used to perform that query.
The service account that runs the Workflow must have the “BigQuery Data Viewer” role on the data to query and the “BigQuery Job User” role on the project to be able to run a query job.
- export-query:
    call: googleapis.bigquery.v2.jobs.query
    args:
      projectId: ${projectid}
      body:
        query: ${"EXPORT DATA OPTIONS( uri='gs://" + bucket + "/" + prefix + "*.csv', format='CSV', overwrite=true,header=false) AS " + query}
        useLegacySql: false
The header=false option is important: it prevents a header from being added to each file, which would cause issues when the files are appended.
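To see why, here is a small Python sketch with made-up data (the shards below are illustrative, not real export output). Appending CSV shards that each carry their own header leaves header rows in the middle of the merged file.

```python
import csv
import io

# Two hypothetical export shards, each written with its own header row.
shards_with_header = ["id,name\n1,alice\n", "id,name\n2,bob\n"]
# The same shards as they would look when exported with header=false.
shards_without_header = ["1,alice\n", "2,bob\n"]


def merged_rows(shards):
    """Append the shards as-is (what a compose does) and parse the result."""
    return list(csv.reader(io.StringIO("".join(shards))))


print(merged_rows(shards_with_header))
# [['id', 'name'], ['1', 'alice'], ['id', 'name'], ['2', 'bob']]
print(merged_rows(shards_without_header))
# [['1', 'alice'], ['2', 'bob']]
```

In the first case the second header ends up as a data row, which would typically break or pollute a later import.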
Cloud Storage management
Cloud Storage is the core service of that workflow. Several of its APIs and features are used.
In that section, the service account that runs the Workflow must have the Storage Object Admin role (viewer + creator) to be allowed to perform the following operations.
Output file
As output, I want a new file with all the exported data in it. I don’t want to change the files generated by BigQuery, both to preserve the integrity of the BigQuery export and to be able to re-run the compose operation if something goes wrong.
So, the idea is to create an initial file in which all the other CSVs will be appended.
For now (December 2021), there is an issue in the connector, and the team is on it. Therefore, I use the Cloud Storage JSON API directly:
- create-final-file:
    call: http.post
    args:
      url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + bucket + "/o?name=" + finalFileName + "&uploadType=media"}
      auth:
        type: OAuth2
      body:
That file can be empty (as in my case, where body: is empty), but you can add a header manually if your use case needs one.
List the files
The other use of Cloud Storage is the file listing, to get all the exported files. The good news is that the BigQuery export statement requires a Cloud Storage file prefix to store the exported files.
Cloud Storage can only filter on file prefix.
We simply have to reuse the same one!
Let’s use the Cloud Storage object list connector for that
- list-files:
    call: googleapis.storage.v1.objects.list
    args:
      bucket: ${bucket}
      pageToken: ${pagetoken}
      prefix: ${prefix}
      maxResults: 62
    result: listResult
I will discuss the maxResults and pageToken parameters in the optimization section.
Iterate over the files
Now that we have listed all the files, we need to iterate over them. When we have accumulated 32 files (the Compose limit), or reached the end of the file list, we can run a Cloud Storage compose operation.
The iteration is based on the list of files returned by the Cloud Storage list object. We append the file names in a list, and when the list reaches 32 elements, a compose operation is triggered.
- init-iter:
    assign:
      - finalFileFormatted:
          name: ${finalFileName}
      - fileList:
          - ${finalFileFormatted}
- process-files:
    for:
      value: file
      in: ${listResult.items}
      steps:
        - concat-file:
            assign:
              - fileFormatted:
                  name: ${file.name}
              - fileList: ${list.concat(fileList, fileFormatted)}
        - test-concat:
            switch:
              - condition: ${len(fileList) == 32}
                steps:
                  - compose-files:
                      call: compose_file
                      args:
                        fileList: ${fileList}
                        projectid: ${projectid}
                        bucket: ${bucket}
                        finalFileName: ${finalFileName}
                      next: init-for-iter
                  - init-for-iter:
                      assign:
                        - fileList:
                            - ${finalFileFormatted}
- finish-compose: # Process the latest files in the fileList buffer
    switch:
      - condition: ${len(fileList) > 1} # If there is more than the finalFileName in the list
        steps:
          - last-compose-files:
              call: compose_file
              args:
                fileList: ${fileList}
                projectid: ${projectid}
                bucket: ${bucket}
                finalFileName: ${finalFileName}
The list is never empty: its first element is the desired output file, and all the other files are appended to it.
Compose the final file
The last and most important step is the compose operation. Here, we use the connector and append all the files to the destinationObject.
- compose:
    call: googleapis.storage.v1.objects.compose
    args:
      destinationBucket: ${bucket}
      destinationObject: ${text.replace_all(finalFileName,"/","%2F")}
      userProject: ${projectid}
      body:
        sourceObjects: ${fileList}
As you can see, the destinationObject is “special”. We must transform it to avoid incompatible characters. Here the / is replaced by %2F, its URL-encoded equivalent. You may need a similar replacement for the space character or <>.
The team is working on new encoding functions to do that automatically.
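Outside of Workflow, this transformation is plain percent-encoding. As an illustration in Python (not part of the workflow), the standard library's `urllib.parse.quote` called with `safe=""` encodes `/`, spaces and `<>` in one go. The `encode_object_name` helper and the object name below are made up for the example.

```python
from urllib.parse import quote


def encode_object_name(name):
    """Percent-encode an object name so it fits in a single URL path segment."""
    # safe="" also encodes "/", which quote() leaves untouched by default.
    return quote(name, safe="")


print(encode_object_name("export/final file<1>.csv"))
# export%2Ffinal%20file%3C1%3E.csv
```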
Deploy it and run it
You can find the full code in my GitHub repository and follow the instructions in the README.md file to deploy and run the workflow.
Issues and optimization
In the workflow YAML definition, I included some optimizations and fixes.
Firstly, when the time between the end of the BigQuery export and the compose operation is too short, the files aren’t found on Cloud Storage (I don’t know why). I added a 1-second sleep to work around that latency issue:
- waitForGCS: # fix latency issue with Cloud Storage
    call: sys.sleep
    args:
      seconds: 1
Then, to optimize (i.e. minimize) the number of calls to the Compose API, I list the objects in multiples of 31.
Why 31?
Because the limit is 32 files per compose, and we have to take the final output file into account. That leaves only 31 new files to compose with the final output file in each call.
- list-files:
    call: googleapis.storage.v1.objects.list
    args:
      bucket: ${bucket}
      pageToken: ${pagetoken}
      prefix: ${prefix}
      maxResults: 62
But why 62, and not 93, 124 or more? Here we hit a Workflow limit on the size of variables. Today it’s 64 KB, soon 256 KB, so you can’t store too many results in a variable without crashing your execution.
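The benefit of a page size that is a multiple of 31 can be checked with a little arithmetic. The Python sketch below assumes that the leftover files are composed at the end of each page rather than carried over to the next page. `compose_calls` is a hypothetical helper and the figure of 310 files is made up.

```python
import math

NEW_FILES_PER_COMPOSE = 31  # 32 sources minus the output file


def compose_calls(total_files, page_size):
    """Count compose calls, assuming the buffer is flushed after each page."""
    calls = 0
    remaining = total_files
    while remaining > 0:
        page = min(page_size, remaining)
        calls += math.ceil(page / NEW_FILES_PER_COMPOSE)
        remaining -= page
    return calls


for page_size in (50, 62):
    print(page_size, compose_calls(310, page_size))
# 50 13
# 62 10
```

With pages of 62, every compose call is full (31 new files), so 310 files need the minimum of 10 calls. With pages of 50, each page ends with a partially filled call and the total rises to 13.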
The last issue is about avoiding too much call depth: if you recursively call a sub-workflow, you can get a RecursionError at execution.
The solution is to perform the check and the iteration outside the sub-workflow itself, and to call it again as long as there are still pages to process:
- composefiles:
    call: list_and_compose_file
    args:
      pagetoken: ${listResult.nextPageToken}
      bucket: ${bucket}
      prefix: ${prefix}
      projectid: ${projectid}
      finalFileName: ${finalFileName}
    result: listResult
- missing-files: # Non recursive loop, to prevent depth errors
    switch:
      - condition: ${"nextPageToken" in listResult}
        next: composefiles
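The same flat-loop pattern can be sketched in Python. `list_page` is a hypothetical stand-in for the object listing call (it pages over an in-memory list and uses the start index as a token), and the file names are made up. The point is only that the caller loops on `nextPageToken` instead of having the page handler call itself.

```python
def list_page(files, page_token, page_size=62):
    """Hypothetical stand-in for an object listing call, paging over a list."""
    start = int(page_token) if page_token else 0
    result = {"items": files[start:start + page_size]}
    if start + page_size < len(files):
        result["nextPageToken"] = str(start + page_size)
    return result


def process_all_pages(files):
    """Iterate page by page in a flat loop instead of recursing."""
    pages = 0
    seen = []
    result = list_page(files, None)
    while True:
        seen.extend(result["items"])  # the compose step would happen here
        pages += 1
        if "nextPageToken" not in result:
            break
        result = list_page(files, result["nextPageToken"])
    return pages, seen


files = [f"export-{i:012d}.csv" for i in range(150)]
pages, seen = process_all_pages(files)
print(pages, len(seen))  # 3 150
```

The call depth stays constant regardless of the number of pages.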
Combine, optimize, innovate
Cloud Workflow is a wonderful tool to easily orchestrate API calls. You can combine products without having code to deploy, applications to write or complex architecture to set up. You can solve challenges that previously took much more effort to achieve.
The product is not yet perfect, and several developer features are still on the roadmap, but it already offers elegant solutions, it evolves very quickly and it is already Generally Available (GA). Give it a try!
The original article was published on Medium.