Source code for bibtutils.gcp.pubsub
"""
bibtutils.gcp.pubsub
~~~~~~~~~~~~~~~~~~~~
Functionality making use of GCP's PubSubs.
See the official PubSub Python Client documentation here: `link <https://googleapis.dev/python/pubsub/latest/index.html>`_.
"""
import base64
import json
import logging
import os
from datetime import datetime
from datetime import timezone
from warnings import warn
from dateutil.parser import parse
from google.cloud import pubsub_v1
from bibtutils.gcp.secrets import get_secret_by_uri
from bibtutils.slack.error import send_cf_fail_alert
warn(
"This library is deprecated. Please use a supported library: "
"https://broadinstitute.github.io/bibt-libraries/",
DeprecationWarning,
)
_LOGGER = logging.getLogger(__name__)
[docs]
def send_pubsub(topic_uri, payload, credentials=None):
"""
Publishes a pubsub message to the specified topic. Executing account
must have pubsub publisher permissions on the topic or in the project.
.. code:: python
from bibtutils.gcp.pubsub import process_trigger, send_pubsub
def main(event, context):
process_trigger(event)
topic_uri = (
f'projects/{os.environ["GOOGLE_PROJECT"]}'
f'/topics/{os.environ["NEXT_TOPIC"]}'
)
send_pubsub(
topic_uri=topic_uri,
payload={'favorite_color': 'blue'}
)
:type topic_uri: :py:class:`str`
:param topic_uri: the topic on which to publish.
topic uri format: ``'projects/{project_name}/topics/{topic_name}'``
:type payload: :py:class:`dict` OR :py:class:`str`
:param payload: the pubsub payload. can be either a ``dict`` or a ``str``.
will be converted to bytes before sending.
:type credentials: :py:class:`google_auth:google.oauth2.credentials.Credentials`
:param credentials: the credentials object to use when making the API call, if not to
use the account running the function for authentication.
"""
publisher = pubsub_v1.PublisherClient(credentials=credentials)
_LOGGER.info(f"Payload: {payload}\nPubSub: {topic_uri}")
# Convert to Bytes then publish message.
if isinstance(payload, dict):
payload = json.dumps(payload, default=str)
payload_bytes = payload.encode("utf-8")
publisher.publish(topic=topic_uri, data=payload_bytes)
_LOGGER.info("PubSub sent.")
return
[docs]
def retrigger_self(
payload, proj_envar="_GOOGLE_PROJECT", topic_envar="_TRIGGER_TOPIC", **kwargs
):
"""
Dispatches the next iteration of a PubSub-triggered Cloud Function.
Any extra arguments (``kwargs``) are passed to the :func:`~bibtutils.gcp.pubsub.send_pubsub` function.
.. code:: python
from bibtutils.gcp.pubsub import process_trigger, retrigger_self
def main(event, context):
payload = process_trigger(event, context=context)
print(payload)
retrigger_self('All work and no play makes Jack a dull boy')
:type payload: :py:class:`dict` OR :py:class:`str`
:param payload: the pubsub payload. can be either a ``dict`` or a ``str``.
will be converted to bytes before sending.
:type proj_envar: :py:class:`str`
:param proj_envar: (Optional) the environment variable to
reference for current GCP project. Defaults to ``'_GOOGLE_PROJECT'``.
:type topic_envar: :py:class:`str`
:param topic_envar: (Optional) the environment variable to
reference for the triggering pubsub topic. Defaults to ``'_TRIGGER_TOPIC'``.
"""
_LOGGER.info(f"Dispatching next worker.")
topic = (
f"projects/{os.environ.get(proj_envar)}/topics/{os.environ.get(topic_envar)}"
)
send_pubsub(topic, payload, **kwargs)
return
[docs]
def process_trigger(
context,
event=None,
timeout_secs=1800,
notify_slack=False,
fail_alert_webhook_secret_uri="FAIL_ALERT_WEBHOOK_SECRET_URI",
):
"""Check timestamp of triggering event; catches infinite retry
loops on 'retry on fail' cloud functions. Its good practice to always call
this function first in a Cloud Function. Additionally, **be sure to wrap the call
to this function in a try/except block where the except block returns normally.**
This ensures that an exception raised here does not result in an infinite rety loop.
If the timeout has been exceeded and `notify_slack=True`, will attempt to alert via Slack after
fetching a webhook in Secret Manager whose URI should be provided in the
environment variable specified in the function call. **It expects to find a
full secret URI in that environment variable, not just a secret name!**
**If (and only if)** the triggering pubsub's event is also passed **and has a payload**,
it will be decoded as utf-8 and returned. Otherwise, this function returns ``None``.
.. code-block:: python
import json
from bibtutil.gcp.pubsub import process_trigger
def main(event, context):
try:
payload = process_trigger(context, event=event)
if not payload:
raise IOError('No payload in triggering pubsub!')
payload = json.loads(payload)
except Exception as e:
_LOGGER.critical(f'Exception while processing trigger: {type(e).__name__}: {e}')
return
:type context: :class:`google.cloud.functions.Context`
:param context: the triggering pubsub's context.
:type event: :py:class:`dict`
:param event: (Optional) the triggering pubsub's event. defaults to :py:class:`None`.
:type timeout_secs: :py:class:`int`
:param timeout_secs: (Optional) the number of seconds to consider as
the timeout threshold from the original trigger time. Defaults to 1800.
:type notify_slack: :py:class:`bool`
:param notify_slack: (Optional) whether or not to attempt to notify
a slack channel specified by `fail_alert_webhook_secret_uri`. If
the notification attempt fails, simply passes. Defaults to `False`
:type fail_alert_webhook_secret_uri: :py:class:`str`
:param fail_alert_webhook_secret_uri: (Optional) the name of the
environment variable from which to read the secret URI. Defaults
to ``'FAIL_ALERT_WEBHOOK_SECRET_URI'``. secret uri format in the
envar: ``'projects/{host_project}/secrets/{secret_name}/versions/latest'``.
:rtype: :py:class:`str` OR :py:class:`None`
:returns: the pubsub payload, if present.
"""
_LOGGER.info(f"Processing PubSub: {context.event_id}")
utctime = datetime.now(timezone.utc)
eventtime = parse(context.timestamp)
lapsed = utctime - eventtime
lapsed = datetime.now(timezone.utc) - parse(context.timestamp)
_LOGGER.info(f"Lapsed time since triggering event: {lapsed.total_seconds()}")
if lapsed.total_seconds() > timeout_secs:
_LOGGER.critical(
f"Threshold of {timeout_secs} seconds exceeded by "
f"{lapsed.total_seconds()-timeout_secs} seconds. Exiting."
)
if notify_slack == True:
try:
webhook = get_secret_by_uri(
os.environ.get(fail_alert_webhook_secret_uri)
)
webhook = json.loads(webhook)
try:
send_cf_fail_alert(utctime, eventtime, webhook["hook"])
except Exception as e:
_LOGGER.error(
f"Could not send fail alert to Slack: {type(e).__name__} : {e}"
)
pass
except Exception as e:
_LOGGER.error(
"Could not get the Slack alert webhook from envar: "
f"{fail_alert_webhook_secret_uri}. Did you set a value "
f"here? Exception: {type(e).__name__} : {e}"
)
pass
raise TimeoutError(
f"Threshold of {timeout_secs} seconds exceeded by "
f"{lapsed.total_seconds()-timeout_secs} seconds. Exiting."
)
if event != None and "data" in event:
return base64.b64decode(event["data"]).decode("utf-8")
return None