Quickly restore BigQuery dataset with time travel and Cloud Workflows

By Guillaume Blaquiere, Sep 24, 2022

Data is gold for companies. Data lakes and data warehouses have become more popular year after year to keep and leverage that value.

The better the data quality and richness, the higher its value and the more powerful and accurate the use cases you can achieve with it.

However, data pipelines are IT assets and, like any of them, bugs, issues or misconfigurations can mess up your data, decrease its quality or break its consistency.

On Google Cloud, BigQuery offers a powerful feature to travel back in time, before the cataclysm!

BigQuery Time Travel

BigQuery is the flagship data warehouse of Google Cloud. It's serverless, you pay as you use, and it comes with tons of features.
One of them, not very well known, is time travel: you can access the state of your data at any point in time over the past 7 days.

To use it, add the system time that you want to look at in your query.

SELECT *
FROM `mydataset.mytable`
FOR SYSTEM_TIME AS OF <DATE|TIMESTAMP>
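
For instance, to read the state of a table as it was one hour ago (a minimal sketch, the table name is illustrative):

-- State of the table one hour ago
SELECT *
FROM `mydataset.mytable`
FOR SYSTEM_TIME AS OF TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
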
Extend your knowledge with the change history

In addition to the time travel feature, a brand new one lets you access the change history of a BigQuery table.
It's still limited to 7 days, but it helps you better understand what changed recently and how.

You can run that type of query

SELECT
  * EXCEPT (_CHANGE_TYPE, _CHANGE_TIMESTAMP),
  _CHANGE_TYPE AS change_type,
  _CHANGE_TIMESTAMP AS change_time
FROM
  APPENDS(TABLE `mydataset.mytable`, TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY), NULL);

That way, you can see when rows were changed and better understand the blast radius of your issue.
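
To gauge that blast radius, you can for instance count the appended rows per day (a minimal sketch, on the same illustrative table):

-- Count appended rows per day to spot when the suspicious load happened
SELECT
  DATE(_CHANGE_TIMESTAMP) AS change_day,
  _CHANGE_TYPE AS change_type,
  COUNT(*) AS nb_rows
FROM
  APPENDS(TABLE `mydataset.mytable`, TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY), NULL)
GROUP BY change_day, change_type
ORDER BY change_day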

Restore a BigQuery table

It is interesting to know the past values of your data. But now imagine:

  • A user performs a query that changes, or even deletes, the data
  • A nightly process is updated and disturbs all your data
  • On Monday, you discover a misconfiguration of the Friday evening process

You want to restore the data to a certain point in time.

And then replay your workload with a fixed pipeline to keep the data quality as high as you can.


For that, you can run this query on your table to get the previous state and store it in another table

CREATE TABLE mydataset.mynewtable AS 
SELECT *
FROM `mydataset.mytable`
FOR SYSTEM_TIME AS OF <DATE|TIMESTAMP>

Then you can

  • Drop the data of the current table
  • Insert the saved data in the current table
  • Delete the temporary new table

And you have to do that for all the tables of your dataset. So boring!!
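
In plain SQL, the manual sequence for a single table looks something like this (a sketch, the table names are illustrative):

-- 1. Save the previous state in a temporary table
CREATE TABLE `mydataset.mytable_restore` AS
SELECT *
FROM `mydataset.mytable`
FOR SYSTEM_TIME AS OF TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY);

-- 2. Drop the data of the current table and insert the saved data
TRUNCATE TABLE `mydataset.mytable`;
INSERT INTO `mydataset.mytable`
SELECT * FROM `mydataset.mytable_restore`;

-- 3. Delete the temporary table
DROP TABLE `mydataset.mytable_restore`;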

Let’s automate that!

Automate the Dataset restoration

For automation, Cloud Workflows is a useful service to loop over all the dataset’s tables and to invoke BigQuery APIs.

Because we use the API directly, we can simplify the process with the BigQuery Jobs API. In a single query job insert, we can:

  • Define the query on the source table with the time travel
  • Set the destination table equal to the source table
  • Set the write disposition to WRITE_TRUNCATE to clean the existing data before restoration

That way, with a single API call, we can restore the data directly in the source table!
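
For a single table, the same single-call restore can be sketched with the bq CLI (the table name and the date are illustrative):

bq query \
  --use_legacy_sql=false \
  --destination_table=mydataset.mytable \
  --replace \
  'SELECT * FROM `mydataset.mytable` FOR SYSTEM_TIME AS OF TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)'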

Let’s dive into the Workflows code.

Assign variables

Start by assigning the global variables and the parameter variables:

- assignStep:
    assign:
      - sourceDataset: ${param.source_dataset}
      - date: ${default(map.get(param,"date"),"TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -1 DAY)")}
      - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
      - location: "US"
      - maxResult: 100
      - pageToken: ""

In that example, the date is optional. If omitted, the data from yesterday is taken. The date is injected directly in the SQL (don't leave the script open to anyone!!). But that also means you can set a SQL formula.
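
For instance, because the value is injected as-is in the query, the date parameter can be a SQL expression rather than a literal (a sketch, the workflow name is illustrative; the deployment and run commands are detailed below):

gcloud workflows run my-restore-workflow \
  --data='{"source_dataset":"mydataset","date":"TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 HOUR)"}'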

List the tables in a dataset

The first step is to know which tables to restore. Most of the time, all the tables in a dataset must be restored to the same point in time to keep the data consistent.

The BigQuery tables list connector is used

- list-tables:
    call: googleapis.bigquery.v2.tables.list
    args:
      datasetId: ${sourceDataset}
      projectId: ${projectId}
      maxResults: ${maxResult}
      pageToken: ${pageToken}
    result: listResult

Run the query jobs in parallel

With the list of tables, we can iterate over them and invoke the BigQuery jobs insert connector.
To optimize the process, we use the parallel feature of Cloud Workflows:

- perform-restore:
    parallel:
      for:
        value: table
        in: ${listResult.tables}
        steps:
          - snapshot:
              call: googleapis.bigquery.v2.jobs.insert
              args:
                projectId: ${projectId}
                body:
                  configuration:
                    query:
                      useLegacySql: false
                      destinationTable:
                        datasetId: ${sourceDataset}
                        projectId: ${projectId}
                        tableId: ${table.tableReference.tableId}
                      query: ${"select * from `" + projectId + "." + sourceDataset + "." + table.tableReference.tableId + "` FOR SYSTEM_TIME AS OF " + date}
                      writeDisposition: "WRITE_TRUNCATE"

Note the date parameter injection in the time travel part. You can also notice that the table in the query and the destination table are the same.

Iterate over the tables’ pages

If your dataset has a lot of tables, the list tables query can have several pages. Iterate over them

- check-iterate-pages:
    switch:
      - condition: ${default(map.get(listResult, "nextPageToken"),"") != ""}
        steps:
          - loop-over-pages:
              assign:
                - pageToken: ${listResult.nextPageToken}
              next: list-tables

Here is the whole code

main:
  params: [param]
  steps:
    - assignStep:
        assign:
          - sourceDataset: ${param.source_dataset}
          - date: ${default(map.get(param,"date"),"TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -1 DAY)")}
          - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - location: "US"
          - maxResult: 100
          - pageToken: ""
    - list-tables:
        call: googleapis.bigquery.v2.tables.list
        args:
          datasetId: ${sourceDataset}
          projectId: ${projectId}
          maxResults: ${maxResult}
          pageToken: ${pageToken}
        result: listResult
    - perform-restore:
        parallel:
          for:
            value: table
            in: ${listResult.tables}
            steps:
              - snapshot:
                  call: googleapis.bigquery.v2.jobs.insert
                  args:
                    projectId: ${projectId}
                    body:
                      configuration:
                        query:
                          useLegacySql: false
                          destinationTable:
                            datasetId: ${sourceDataset}
                            projectId: ${projectId}
                            tableId: ${table.tableReference.tableId}
                          query: ${"select * from `" + projectId + "." + sourceDataset + "." + table.tableReference.tableId + "` FOR SYSTEM_TIME AS OF " + date}
                          writeDisposition: "WRITE_TRUNCATE"
    - check-iterate-pages:
        switch:
          - condition: ${default(map.get(listResult, "nextPageToken"),"") != ""}
            steps:
              - loop-over-pages:
                  assign:
                    - pageToken: ${listResult.nextPageToken}
                  next: list-tables

Deploy your workflow with that command

gcloud workflows deploy <workflowName> \
--source=<workflowFileName> \
--service-account=<runtimeServiceAccountEmail>

The runtime service account must have the BigQuery admin role on the dataset.
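
For instance, one way is to grant the role at the project level (broader than the dataset scope; a sketch, the names are illustrative):

gcloud projects add-iam-policy-binding <projectId> \
  --member="serviceAccount:<runtimeServiceAccountEmail>" \
  --role="roles/bigquery.admin"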

And run it

gcloud workflows run <workflowName> \
--data='{"source_dataset":"<YourDataset>"}'

You can also provide the date as input. Because the value is injected in the query, it must be a valid SQL timestamp expression

gcloud workflows run <workflowName> \
--data='{"source_dataset":"<YourDataset>", "date":"TIMESTAMP(\"2022-09-15\")"}'

Keep your data value

The data in your data warehouse is critical, and the combination of BigQuery and Cloud Workflows can help you keep its value.

I already presented how to snapshot your dataset to go beyond the 7 days of time travel.
This time, it's about a quick rollback in case of data corruption.

Issues happen, mistakes exist; find your way to protect the core value!


The original article was published on Medium.
