PubSub¶
bibtutils.gcp.pubsub¶
Functionality making use of GCP’s PubSubs.
See the official PubSub Python Client documentation here: link.
- bibtutils.gcp.pubsub.process_trigger(context, event=None, timeout_secs=1800, notify_slack=False, fail_alert_webhook_secret_uri='FAIL_ALERT_WEBHOOK_SECRET_URI')[source]¶
Check timestamp of triggering event; catches infinite retry loops on ‘retry on fail’ cloud functions. Its good practice to always call this function first in a Cloud Function. Additionally, be sure to wrap the call to this function in a try/except block where the except block returns normally. This ensures that an exception raised here does not result in an infinite rety loop.
If the timeout has been exceeded and notify_slack=True, will attempt to alert via Slack after fetching a webhook in Secret Manager whose URI should be provided in the environment variable specified in the function call. It expects to find a full secret URI in that environment variable, not just a secret name!
If (and only if) the triggering pubsub’s event is also passed and has a payload, it will be decoded as utf-8 and returned. Otherwise, this function returns
None
.import json from bibtutil.gcp.pubsub import process_trigger def main(event, context): try: payload = process_trigger(context, event=event) if not payload: raise IOError('No payload in triggering pubsub!') payload = json.loads(payload) except Exception as e: _LOGGER.critical(f'Exception while processing trigger: {type(e).__name__}: {e}') return
- Parameters:
context (
google.cloud.functions.Context
) – the triggering pubsub’s context.event (
dict
) – (Optional) the triggering pubsub’s event. defaults toNone
.timeout_secs (
int
) – (Optional) the number of seconds to consider as the timeout threshold from the original trigger time. Defaults to 1800.notify_slack (
bool
) – (Optional) whether or not to attempt to notify a slack channel specified by fail_alert_webhook_secret_uri. If the notification attempt fails, simply passes. Defaults to Falsefail_alert_webhook_secret_uri (
str
) – (Optional) the name of the environment variable from which to read the secret URI. Defaults to'FAIL_ALERT_WEBHOOK_SECRET_URI'
. secret uri format in the envar:'projects/{host_project}/secrets/{secret_name}/versions/latest'
.
- Return type:
str
ORNone
- Returns:
the pubsub payload, if present.
- bibtutils.gcp.pubsub.retrigger_self(payload, proj_envar='_GOOGLE_PROJECT', topic_envar='_TRIGGER_TOPIC', **kwargs)[source]¶
Dispatches the next iteration of a PubSub-triggered Cloud Function. Any extra arguments (
kwargs
) are passed to thesend_pubsub()
function.from bibtutils.gcp.pubsub import process_trigger, retrigger_self def main(event, context): payload = process_trigger(event, context=context) print(payload) retrigger_self('All work and no play makes Jack a dull boy')
- Parameters:
payload (
dict
ORstr
) – the pubsub payload. can be either adict
or astr
. will be converted to bytes before sending.proj_envar (
str
) – (Optional) the environment variable to reference for current GCP project. Defaults to'_GOOGLE_PROJECT'
.topic_envar (
str
) – (Optional) the environment variable to reference for the triggering pubsub topic. Defaults to'_TRIGGER_TOPIC'
.
- bibtutils.gcp.pubsub.send_pubsub(topic_uri, payload, credentials=None)[source]¶
Publishes a pubsub message to the specified topic. Executing account must have pubsub publisher permissions on the topic or in the project.
from bibtutils.gcp.pubsub import process_trigger, send_pubsub def main(event, context): process_trigger(event) topic_uri = ( f'projects/{os.environ["GOOGLE_PROJECT"]}' f'/topics/{os.environ["NEXT_TOPIC"]}' ) send_pubsub( topic_uri=topic_uri, payload={'favorite_color': 'blue'} )
- Parameters:
topic_uri (
str
) – the topic on which to publish. topic uri format:'projects/{project_name}/topics/{topic_name}'
payload (
dict
ORstr
) – the pubsub payload. can be either adict
or astr
. will be converted to bytes before sending.credentials (
google.oauth2.credentials.Credentials
) – the credentials object to use when making the API call, if not to use the account running the function for authentication.