PubSub

bibtutils.gcp.pubsub

Functionality making use of GCP’s PubSubs.

See also the official PubSub Python Client documentation.

bibtutils.gcp.pubsub.process_trigger(context, event=None, timeout_secs=1800, notify_slack=False, fail_alert_webhook_secret_uri='FAIL_ALERT_WEBHOOK_SECRET_URI')[source]

Checks the timestamp of the triggering event to catch infinite retry loops on ‘retry on fail’ Cloud Functions. It’s good practice to always call this function first in a Cloud Function. Additionally, be sure to wrap the call to this function in a try/except block where the except block returns normally. This ensures that an exception raised here does not result in an infinite retry loop.

If the timeout has been exceeded and notify_slack=True, the function attempts to send an alert via Slack. To do so, it fetches a webhook from Secret Manager, using the secret URI stored in the environment variable named in the function call. That environment variable must contain a full secret URI, not just a secret name!

If (and only if) the triggering pubsub’s event is also passed and has a payload, it will be decoded as utf-8 and returned. Otherwise, this function returns None.
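The decoding step described above can be pictured with a minimal sketch. It assumes the event is the dict that a Pub/Sub-triggered background Cloud Function receives, with the message body base64-encoded under the 'data' key. The helper name decode_payload is hypothetical and is not part of bibtutils.

```python
import base64


def decode_payload(event=None):
    """Return the event's payload as a UTF-8 string, or None if there is none."""
    # Assumption: the message body is base64-encoded under event['data'].
    if not event or not event.get('data'):
        return None
    return base64.b64decode(event['data']).decode('utf-8')


# Build a sample event the way Pub/Sub would encode a JSON message.
sample_event = {
    'data': base64.b64encode(b'{"favorite_color": "blue"}').decode('ascii')
}
print(decode_payload(sample_event))
print(decode_payload({}))
```

An event without a 'data' entry yields None, which matches the documented return value when no payload is present.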

import json
import logging

from bibtutils.gcp.pubsub import process_trigger

_LOGGER = logging.getLogger(__name__)

def main(event, context):
    try:
        payload = process_trigger(context, event=event)
        if not payload:
            raise IOError('No payload in triggering pubsub!')
        payload = json.loads(payload)
    except Exception as e:
        # Log and return normally so the failure does not trigger another retry.
        _LOGGER.critical(f'Exception while processing trigger: {type(e).__name__}: {e}')
        return
Parameters:
  • context (google.cloud.functions.Context) – the triggering pubsub’s context.

  • event (dict) – (Optional) the triggering pubsub’s event. Defaults to None.

  • timeout_secs (int) – (Optional) the number of seconds to consider as the timeout threshold from the original trigger time. Defaults to 1800.

  • notify_slack (bool) – (Optional) whether to attempt to notify the Slack channel whose webhook is referenced by fail_alert_webhook_secret_uri. A failed notification attempt is silently ignored. Defaults to False.

  • fail_alert_webhook_secret_uri (str) – (Optional) the name of the environment variable from which to read the secret URI. Defaults to 'FAIL_ALERT_WEBHOOK_SECRET_URI'. Secret URI format in the environment variable: 'projects/{host_project}/secrets/{secret_name}/versions/latest'.

Return type:

str OR None

Returns:

the pubsub payload, if present.
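The timeout check that process_trigger performs can be approximated as follows. This is a sketch under stated assumptions, not the library's implementation: it assumes context.timestamp is an RFC 3339 string in UTC (for example '2021-01-01T00:00:00.000Z') with at most microsecond precision. The helper names parse_timestamp and is_expired are hypothetical.

```python
from datetime import datetime, timezone


def parse_timestamp(timestamp):
    """Parse a UTC RFC 3339 timestamp, with or without fractional seconds."""
    for fmt in ('%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ'):
        try:
            return datetime.strptime(timestamp, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    raise ValueError(f'Unrecognized timestamp: {timestamp!r}')


def is_expired(timestamp, timeout_secs=1800, now=None):
    """Return True if the original trigger is older than timeout_secs."""
    now = now or datetime.now(timezone.utc)
    age_secs = (now - parse_timestamp(timestamp)).total_seconds()
    return age_secs > timeout_secs


# A fixed "now" keeps the example deterministic.
fixed_now = datetime(2021, 1, 1, 1, 0, 0, tzinfo=timezone.utc)
print(is_expired('2021-01-01T00:00:00.000Z', now=fixed_now))  # one hour old
print(is_expired('2021-01-01T00:45:00Z', now=fixed_now))      # 15 minutes old
```

Because a retried invocation keeps the timestamp of the original trigger, comparing it against the current time bounds how long a failing function can keep retrying.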

bibtutils.gcp.pubsub.retrigger_self(payload, proj_envar='_GOOGLE_PROJECT', topic_envar='_TRIGGER_TOPIC', **kwargs)[source]

Dispatches the next iteration of a PubSub-triggered Cloud Function. Any extra arguments (kwargs) are passed to the send_pubsub() function.

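from bibtutils.gcp.pubsub import process_trigger, retrigger_self

def main(event, context):
    # context is the first positional argument; the event is optional.
    payload = process_trigger(context, event=event)
    print(payload)
    # Publish the next message to the topic that triggers this function.
    retrigger_self('All work and no play makes Jack a dull boy')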
Parameters:
  • payload (dict OR str) – the pubsub payload. Can be either a dict or a str. It is converted to bytes before sending.

  • proj_envar (str) – (Optional) the environment variable to reference for current GCP project. Defaults to '_GOOGLE_PROJECT'.

  • topic_envar (str) – (Optional) the environment variable to reference for the triggering pubsub topic. Defaults to '_TRIGGER_TOPIC'.
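One plausible way to picture how the two environment variables combine is shown below. It assumes, without confirmation from the library source, that retrigger_self assembles a topic URI in the 'projects/{project_name}/topics/{topic_name}' format that send_pubsub expects. The helper name self_topic_uri and its environ parameter are hypothetical.

```python
import os


def self_topic_uri(proj_envar='_GOOGLE_PROJECT',
                   topic_envar='_TRIGGER_TOPIC',
                   environ=os.environ):
    """Build the URI of the triggering topic from two environment variables."""
    # A KeyError here means the function was deployed without the variable set.
    project = environ[proj_envar]
    topic = environ[topic_envar]
    return f'projects/{project}/topics/{topic}'


# Passing a plain dict instead of os.environ keeps the example self-contained.
fake_environ = {'_GOOGLE_PROJECT': 'my-project', '_TRIGGER_TOPIC': 'my-topic'}
print(self_topic_uri(environ=fake_environ))
```

Both variables must be set on the deployed function for a lookup like this to succeed.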

bibtutils.gcp.pubsub.send_pubsub(topic_uri, payload, credentials=None)[source]

Publishes a pubsub message to the specified topic. Executing account must have pubsub publisher permissions on the topic or in the project.

import os

from bibtutils.gcp.pubsub import process_trigger, send_pubsub

def main(event, context):
    # The context, not the event, is the required first argument.
    process_trigger(context)
    topic_uri = (
        f'projects/{os.environ["GOOGLE_PROJECT"]}'
        f'/topics/{os.environ["NEXT_TOPIC"]}'
    )
    send_pubsub(
        topic_uri=topic_uri,
        payload={'favorite_color': 'blue'}
    )
Parameters:
  • topic_uri (str) – the topic on which to publish. Topic URI format: 'projects/{project_name}/topics/{topic_name}'.

  • payload (dict OR str) – the pubsub payload. Can be either a dict or a str. It is converted to bytes before sending.

  • credentials (google.oauth2.credentials.Credentials) – (Optional) the credentials object to use when making the API call. If omitted, the account running the function is used for authentication. Defaults to None.
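The conversion of the payload to bytes might look like the sketch below. It assumes a dict is serialized as JSON and a str is encoded as UTF-8; the library's exact behavior is not shown in this page, and the helper name to_bytes is hypothetical.

```python
import json


def to_bytes(payload):
    """Convert a dict or str payload to the bytes a Pub/Sub message carries."""
    if isinstance(payload, dict):
        # Assumption: dict payloads are serialized as JSON text first.
        payload = json.dumps(payload)
    if isinstance(payload, str):
        return payload.encode('utf-8')
    raise TypeError(f'Unsupported payload type: {type(payload).__name__}')


print(to_bytes({'favorite_color': 'blue'}))
print(to_bytes('All work and no play makes Jack a dull boy'))
```

Under this assumption, a receiving function can recover a dict payload by passing the string returned from process_trigger to json.loads.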