
Replicate data from BigQuery to Cloud SQL with Cloud Workflow

By guillaume blaquiere. Nov 23, 2021


With the popularity and power of the cloud, some think that unicorns exist: one database to rule them all!
However, technology and physics can't deliver this dream, so each cloud provider offers many data storage options, each fitting very specific use cases.

The BigQuery issue

Analytics databases, like BigQuery, are very efficient at processing petabytes of data in seconds. Aggregations, complex queries and joins over billions of rows create new insights and new value for companies.

The downside is latency. To process a large volume of data, a cluster of CPU/memory pairs (named “slots” in BigQuery) needs to be provisioned before the query starts. Even though this is very fast, it takes about 1 second, and the process is the same even if the volume of data is low or the query is simple.

It’s not a concern for analytics, but it is for real-time use cases, like website or API serving.

A low latency database like Cloud SQL is required.

Cloud SQL option

Cloud SQL is a managed database service that can run MySQL, PostgreSQL and SQL Server. These database engines can answer simple queries in a few milliseconds and are perfect for websites and API servers.

However, Cloud SQL databases can’t scale out (add servers in parallel to process the data); they can only scale up (increase the number of CPUs and the memory on one server). Even with read replicas (additional servers in parallel), a given query still runs on only one server (the replica).

Because of that, performance is poor when processing a large amount of data (which is limited to 10 terabytes).

Serving powerful insights at low latency

Businesses need powerful insights, with the capacity to compute over and dive deep into petabytes of data, and the ability to browse the results at low latency.

The ideal would be to combine both worlds, but unicorns do not exist!

The solution is to perform the computation in BigQuery and to export the result to Cloud SQL for low latency.

Data duplication concern

Data duplication can be scary or viewed as an anti-pattern. It’s not so obvious.

  • Data duplication increases the cost!
    Yes, it’s true. However, data storage costs about 10 times less than data processing, so duplicating the data to optimise the processing is a fine way to save money!
  • Duplicated data are out of control.
    It could be true. In this case, we need to define the “golden source”. Here it’s BigQuery; the Cloud SQL data are read-only and no update is allowed. Two-way updates can be a management nightmare; here, for reporting data, we avoid this concern with a “main/replica” pattern.

Load BigQuery data into Cloud SQL

BigQuery can export data as CSV files and store them in Cloud Storage.
Cloud SQL can import CSV files from Cloud Storage.

In principle, the process seems obvious. However, it’s not so simple!

  • BigQuery exports a large volume of data as several files.
  • Cloud SQL can import only one file at a time. Concurrent imports aren’t supported.

To sequence this process, orchestration is required. Cloud Workflow is perfect for that!

Let’s write this pipeline!

Export BigQuery Data

BigQuery can export the data stored in a table. It’s free but you can’t select the data that you want or format/transform them. It’s a raw copy and paste of BigQuery storage to Cloud Storage in CSV format.

It’s not ideal, because we have to format the data correctly before inserting it into Cloud SQL. Fortunately, another option exists: the EXPORT DATA statement.

This statement takes export options and a query as parameters. The result of the query is stored in one or several files in Cloud Storage.
A simple query performs the export! And a workflow connector exists for running queries!

Let’s start our Workflow with that step.

    - export-query:
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${projectid}
          body:
            query: ${"EXPORT DATA OPTIONS( uri='gs://" + bucket + "/" + prefix + "*.csv', format='CSV', overwrite=true,header=false) AS " + query}
            useLegacySql: false

Because the workflow has to create a job and read data, the service account that runs it must have access to the data (at least bigquery.dataViewer on the table) and be able to create a job in the project (bigquery.jobUser). To write the export files to Cloud Storage, the role storage.objectAdmin is required.

Import data in Cloud SQL

Now, we have to import the data into Cloud SQL. This can be done with a call to the Cloud SQL Admin API.
However, this time, no connector exists, and we have to call the API directly.

    - callImport:
        call: http.post
        args:
          url: ${"https://sqladmin.googleapis.com/v1/projects/" + projectid + "/instances/" + instance + "/import"}
          auth:
            type: OAuth2
          body:
            importContext:
              uri: ${file}
              database: ${databaseschema}
              fileType: CSV
              csvImportOptions:
                table: ${importtable}
        result: operation

Note the OAuth2 auth argument in the call. It’s required so that Workflow adds an authentication header to the API call. The Workflow service account needs the Cloud SQL Admin role to perform an import.

Because:

  • The operation is asynchronous
  • There is no connector for that async operation
  • When several files have to be imported, only one import can run at a time

We have to:

  • Detect the end of the current import by regularly checking the state of the import operation.
  • When the job is done, continue the process: iterate over the other files or exit.

To achieve that, we can read the status from the result of the import call, stored in the variable named operation.
If the status is not DONE, we have to wait and check it again; otherwise we can exit.

    - chekoperation:
        switch:
          - condition: ${operation.body.status != "DONE"}
            next: wait
        next: completed
    - completed:
        return: "done"
    - wait:
        call: sys.sleep
        args:
          seconds: 1
        next: getoperation
    - getoperation:
        call: http.get
        args:
          url: ${operation.body.selfLink}
          auth:
            type: OAuth2
        result: operation
        next: chekoperation

Helpfully, the API provides a selfLink in the operation to get its status, which makes the status check easier.
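
The loop above treats every DONE status as a success. A possible refinement, sketched here under the assumption that the Cloud SQL Admin API fills an error field on the operation when the import fails, is to stop the workflow in that case. The failed step is a hypothetical name introduced for illustration; the wait and completed steps are the ones already shown.

```yaml
# Sketch only: assumes a failed import operation carries an "error" field.
- chekoperation:
    switch:
      # Not finished yet: sleep, then poll the operation again
      - condition: ${operation.body.status != "DONE"}
        next: wait
      # Finished, but the API reported an error (assumed field name)
      - condition: ${"error" in operation.body}
        next: failed
    next: completed
- failed:
    # Hypothetical step: surface the error and end the execution as failed
    raise: ${operation.body.error}
```

Routing to a dedicated step keeps the switch limited to conditions and jumps, in the same style as the original check.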

Browse the export’s files in Cloud Storage

We have the two sides of the process: the export and the import. We must bind them, and Cloud Storage is the place where the link is made.
But… the connection is not so easy!

Indeed, if you look closely at the import definition, you can import only one file at a time, whereas a BigQuery export can create a set of files.

Therefore, we have to iterate over the files created by BigQuery during the export and invoke, for each file, the Cloud SQL import process (as a subworkflow).

    - list-files:
        call: googleapis.storage.v1.objects.list
        args:
          bucket: ${bucket}
          pageToken: ${pagetoken}
          prefix: ${prefix}
        result: listResult
    - process-files:
        for:
          value: file
          in: ${listResult.items}
          steps:
            - wait-import:
                call: load_file
                args:
                  projectid: ${projectid}
                  instance: ${instance}
                  databaseschema: ${databaseschema}
                  importtable: ${importtable}
                  file: ${"gs://" + bucket + "/" + file.name}

In addition, if you have a large number of files, the Cloud Storage list API answer can span several pages. We must iterate over the files AND over the pages.

As you can see in my GitHub repository, the subworkflow list_file_to_import is not recursive: it doesn’t call itself when there is a next page to browse. The whole result, listResult, is sent back to the invoker.

In fact, Cloud Workflow limits the depth of subworkflow calls, and you can get a RecursionError if you reach that limit.
So, the trick is to exit the subworkflow with the information the invoker needs to decide whether to call the same subworkflow again with different parameters, the nextPageToken in this case.
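
A minimal sketch of that non-recursive loop on the invoker side is shown below. It assumes that list_file_to_import accepts the parameters listed and returns the raw listResult, and that an empty page token is accepted for the first call. The step names and the parameter list are illustrative and may differ from the repository.

```yaml
# Sketch only: step names and the parameter list are assumptions, not the repository's code.
- init-page:
    assign:
      # Assumed: an empty token requests the first page
      - pagetoken: ""
- import-page:
    call: list_file_to_import
    args:
      pagetoken: ${pagetoken}
      bucket: ${bucket}
      prefix: ${prefix}
      projectid: ${projectid}
      instance: ${instance}
      databaseschema: ${databaseschema}
      importtable: ${importtable}
    result: listResult
- check-next-page:
    switch:
      # nextPageToken is only present when more results remain to be listed
      - condition: ${"nextPageToken" in listResult}
        next: set-next-page
    next: finished
- set-next-page:
    assign:
      - pagetoken: ${listResult.nextPageToken}
    # Jump back instead of recursing, so the call depth stays constant
    next: import-page
- finished:
    return: "done"
```

Because the loop is a jump between steps of the same workflow, the number of pages does not increase the subworkflow call depth.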


You can find the full Workflow code in the GitHub repository. You need to customize the assignment step to set your values, or update this first step to get the values dynamically from the Workflow argument.
You might also need to add error management checks if you have issues to handle automatically.
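
As a rough sketch of the second option, the first step could read its values from the execution argument. The params name args and the keys read from it are assumptions made for illustration; the repository's assignment step may use different names.

```yaml
# Sketch only: "args" and its keys are illustrative names, not the repository's code.
main:
  params: [args]
  steps:
    - assignment:
        assign:
          # Built-in environment variable holding the workflow's project ID
          - projectid: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          # Values supplied as a JSON object when the execution is started
          - bucket: ${args.bucket}
          - prefix: ${args.prefix}
          - query: ${args.query}
          - instance: ${args.instance}
          - databaseschema: ${args.databaseschema}
          - importtable: ${args.importtable}
```

An execution would then be started with a JSON object containing those keys. A missing key makes the assignment step fail, which surfaces configuration mistakes before any export starts.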

Productionize your synchronization

With that workflow built and deployed, you have to execute it when required. It can be on a schedule, with Cloud Scheduler, or on an event.

For now, you have to write a Cloud Function (or a Cloud Run/App Engine service) to catch the event and start a workflow execution. However, soon, you will be able to do it with Eventarc out of the box.


Data analytics and low latency databases are complementary and, to leverage the power of both, a safe and orchestrated workflow is the right way to reach the next level.


The original article was published on Medium.
