Google Cloud Functions is an event-driven, serverless computing platform. It provides an easy, auto-scaling way to stand up functions in the cloud that run in response to events and receive the triggering payload. One of its use cases is the real-time processing of files. Since Cloud Functions is a Google product, it provides an especially easy way to respond to change notifications emerging from Google Cloud Storage (GCS). These notifications can be configured to trigger in response to various events inside a bucket: object finalization, deletion, archiving, and metadata updates (learn more about those triggers: https://cloud.google.com/functions/docs/calling/storage). This guide shows an example of how to configure a solution that will update a BigQuery (BQ) table every time a new data file is loaded into a given GCS bucket. Along the way there will be notes about realistic configuration options, such as how to organize/name the necessary files, how to load dependencies, how to customize deployments, how to pass environment variables, and how to update deployments. Limitations of the solution will also be listed.
Architecture
Cloud Storage events used by Cloud Functions are based on Cloud Pub/Sub Notifications for Google Cloud Storage (https://cloud.google.com/storage/docs/pubsub-notifications). When a new item is uploaded to GCS, a notification is sent through Pub/Sub to Cloud Functions, which will run our function, updating the chosen BQ table.
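For reference, the data payload handed to the function for a finalize event is the Cloud Storage object resource. A trimmed-down example is shown below; the values are placeholders and only some of the available fields are listed:

# A trimmed-down example of the `data` dict delivered for a finalize event
# (placeholder values; only some of the available fields are shown).
data = {
    "kind": "storage#object",
    "bucket": "your-bucket-name",
    "name": "example.csv",
    "contentType": "text/csv",
    "size": "1234",                      # numeric fields arrive as strings
    "generation": "1587627537231057",
    "metageneration": "1",               # "1" indicates a brand-new object
    "timeCreated": "2020-04-23T07:38:57.230Z",
    "updated": "2020-04-23T07:38:57.230Z",
}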
Limitations:
- Cloud Functions can only be triggered by Cloud Storage buckets in the same Google Cloud Platform project.
- While git repos on GitHub and Bitbucket can be mirrored in Google Cloud Source Repositories and used as sources of the code for deployments, self-hosted Bitbucket servers are still not supported (https://stackoverflow.com/questions/54667858/is-it-possible-to-mirror-a-bitbucket-repository-hosted-in-a-private-domain-in-go). The only alternative to manual local deployment for folks with a self-hosted server is to have a process, running within the self-hosted solution, that zips the latest version of the code and pushes it to GCS.
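For teams in that position, the push step could look roughly like the sketch below, which zips the function source and uploads it to a GCS bucket that gcloud functions deploy --source can read from. All names and paths here are placeholders:

# push_source_to_gcs.py -- a hedged sketch of the "zip and push" step a self-hosted
# CI job could run; the directory, archive, and bucket names are all placeholders.
import zipfile
from pathlib import Path

from google.cloud import storage

SOURCE_DIR = Path("your_repo_name")          # contains main.py, requirements.txt, etc.
ARCHIVE = Path("function_source.zip")
DEPLOY_BUCKET = "your-deploy-artifacts-bucket"

# Zip the source with paths relative to the repo root so main.py ends up
# at the top level of the archive, as Cloud Functions expects.
with zipfile.ZipFile(ARCHIVE, "w", zipfile.ZIP_DEFLATED) as zf:
    for path in SOURCE_DIR.rglob("*"):
        if path.is_file():
            zf.write(path, path.relative_to(SOURCE_DIR))

# Upload the archive; it can then be deployed with
# `gcloud functions deploy ... --source gs://your-deploy-artifacts-bucket/function_source.zip`.
storage.Client().bucket(DEPLOY_BUCKET).blob(ARCHIVE.name).upload_from_filename(str(ARCHIVE))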
Steps
1. Install and initialize the Cloud SDK (https://cloud.google.com/sdk/docs/).
- Important Note: Several of the deploy flags we’ll use later require an up-to-date version of the SDK, so make sure to upgrade before starting.
2. Create a GCS bucket if you don’t already have one you’d like to use.
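If you’d rather script this step than use the Cloud Console or gsutil, a minimal sketch with the google-cloud-storage client is shown below; the project and bucket names are placeholders. Remember the bucket must live in the same project as the function (see Limitations above).

# create_bucket.py -- minimal sketch; names are placeholders.
from google.cloud import storage

client = storage.Client(project="your_project_name")
bucket = client.create_bucket("your-bucket-name")  # bucket names must be globally unique
print(f"Created bucket {bucket.name}")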
3. Create a repo with the following files and file structure:
your_repo_name/
├── main.py
├── requirements.txt
└── optional_custom_dependency/  # Could be a local or private remote dependency
    ├── __init__.py  # don’t forget to initialize the folder
    └── actual_module.py
- You must have a main.py file.
- Dependencies:
- If you have Python requirements, they must be listed in a requirements.txt file in the same directory as main.py.
- Dependencies are installed in a Cloud Build environment that does not provide access to SSH keys. Packages hosted in repositories that require SSH-based authentication (e.g. a self-hosted Bitbucket server) must be vendored and uploaded alongside your project’s code. In other words, you’d need to copy the modules/code from the Bitbucket repo and include it in the directory uploaded for your Cloud Function, as in the optional_custom_dependency folder above. See https://cloud.google.com/functions/docs/writing/specifying-dependencies-python
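To illustrate, using the hypothetical names from the tree above, the vendored package is imported in main.py like any other local package, while pip-installable packages stay in requirements.txt:

# In main.py: `some_helper` is a hypothetical function inside actual_module.py.
from optional_custom_dependency.actual_module import some_helper

# requirements.txt, by contrast, lists only pip-installable packages, e.g.:
#   google-cloud-bigquery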
4. Write the function that we want to run in the cloud
import logging
import os

from google.cloud import bigquery

# This env var is set automatically; see https://cloud.google.com/functions/docs/env-var#environment_variables_set_automatically
project_id = os.environ.get('GCP_PROJECT')


def from_gcs_trigger_to_bq(data, context, allowed_exts=("csv",)):
    """Background Cloud Function to be triggered by Cloud Storage.

    This function updates a BQ table with new files added to a GCS bucket.

    Args:
        data (dict): The Cloud Functions event payload.
        context (google.cloud.functions.Context): Metadata of triggering event.

    Returns:
        None; A BQ table as set in the environment variables is updated.
    """
    # Read environment variables and event payload data
    dataset_id = os.environ.get('DATASET_ID', 'Environment variable "DATASET_ID" is not set.')
    table_id = os.environ.get('TABLE_ID', 'Environment variable "TABLE_ID" is not set.')
    bucket_name = data['bucket']
    file_name = data['name']
    uri = f"gs://{bucket_name}/{file_name}"
    full_table_path = f"{project_id}.{dataset_id}.{table_id}"

    # Example check that the file meets requirements;
    # this example doesn't require reading the file into memory.
    assert file_name.endswith(allowed_exts), f"Only files with a type of {allowed_exts} allowed."

    # Determine whether to insert or update depending on whether the file was new or updated.
    # The only way to tell if a file is new or updated is with 'metageneration'.
    # See https://firebase.google.com/docs/functions/gcp-storage-events#access_storage_object_attributes
    # Note: metageneration arrives as a string in the event payload.
    if int(data['metageneration']) == 1:  # 1 indicates a new object
        # insert
        client = bigquery.Client()

        # See python docs: https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.LoadJobConfig.html
        # configuration options: https://cloud.google.com/bigquery/docs/reference/rest/v2/JobConfiguration#jobconfigurationload
        job_config = bigquery.LoadJobConfig()
        job_config.autodetect = True
        job_config.write_disposition = 'WRITE_APPEND'  # or 'WRITE_TRUNCATE'

        client.load_table_from_uri(uri, full_table_path, job_config=job_config)
    else:
        # update
        logging.info("This would update.")
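Because this is just a Python function, it can be exercised locally with a hand-built event before deploying. Below is a minimal, hypothetical smoke test; it assumes GCP credentials are available locally, that the placeholder names are replaced with real ones, and that example.csv already exists in the bucket:

# local_test.py -- hypothetical helper file, not part of the deployment.
import os

# The deployed function gets these from the environment; set placeholders locally
# before importing main, since main.py reads GCP_PROJECT at import time.
os.environ.setdefault("GCP_PROJECT", "your_project_name")
os.environ.setdefault("DATASET_ID", "your_dataset")
os.environ.setdefault("TABLE_ID", "your_table")

from main import from_gcs_trigger_to_bq

# Only the fields the function actually reads are needed here.
fake_event = {
    "bucket": "your-bucket-name",
    "name": "example.csv",
    "metageneration": "1",
}

from_gcs_trigger_to_bq(fake_event, context=None)  # context isn't used by the function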
5. Deploy the files to Cloud Functions
GOOGLE_PROJECT_NAME=your_project_name
CLOUD_FUNCTION_NAME=from_gcs_trigger_to_bq  # matches the function name above
GCS_BUCKET_NAME=your-bucket-name  # the chosen GCS bucket to read from
MEMORY=256MB  # This is the default; max of 2048MB
TIMEOUT=60s  # This is the default; max of 540s
DATASET_ID=your_dataset  # manually set env var
TABLE_ID=your_table  # manually set env var

gcloud functions deploy ${CLOUD_FUNCTION_NAME} \
    --memory ${MEMORY} \
    --retry \
    --runtime python37 \
    --timeout ${TIMEOUT} \
    --trigger-resource ${GCS_BUCKET_NAME} \
    --trigger-event google.storage.object.finalize \
    --update-env-vars DATASET_ID=${DATASET_ID},TABLE_ID=${TABLE_ID}  # use update instead of set, because it will add or update
Note: To update the deployed code or other settings, simply rerun the gcloud functions deploy command with only the changed flags.
6. Drop files into the chosen GCS bucket and watch the magic happen
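If you’d like to script that check, the hypothetical sketch below uploads a local CSV to the trigger bucket (which fires the finalize event) and then counts rows in the target table; all names are placeholders, and the sleep is a crude stand-in for watching the function’s logs:

# upload_and_verify.py -- hypothetical end-to-end check; names are placeholders.
import time

from google.cloud import bigquery, storage

BUCKET_NAME = "your-bucket-name"  # the bucket passed to --trigger-resource
FULL_TABLE_PATH = "your_project_name.your_dataset.your_table"

# Uploading the file fires the google.storage.object.finalize event.
storage.Client().bucket(BUCKET_NAME).blob("example.csv").upload_from_filename("example.csv")

# Crude wait for the function to run; checking the function's logs is more robust.
time.sleep(60)

rows = bigquery.Client().query(f"SELECT COUNT(*) AS n FROM `{FULL_TABLE_PATH}`").result()
print(f"Row count: {next(iter(rows)).n}")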
A full example repo, with the deployment command in deploy.sh can be found at https://github.com/ZaxR/gcs_triggered_google_cloud_functions.