# Source code for bibtutils.gcp.storage

"""
bibtutils.gcp.storage
~~~~~~~~~~~~~~~~~~~~~

Functionality making use of GCP's Cloud Storage.

See the official Cloud Storage Python Client documentation here: `link <https://googleapis.dev/python/storage/latest/index.html>`_.

"""
import datetime
import json
import logging
from warnings import warn

from google.api_core import exceptions as google_exceptions
from google.cloud import storage

# Emitted once, when this module is imported. ``stacklevel=2`` attributes the
# warning to the code performing the import instead of to this file, so that
# Python's default filters (which only display a DeprecationWarning attributed
# to ``__main__``) can surface it to users importing this module from a script.
warn(
    "This library is deprecated. Please use a supported library: "
    "https://broadinstitute.github.io/bibt-libraries/",
    DeprecationWarning,
    stacklevel=2,
)
# Module-level logger used by the functions in this module.
_LOGGER = logging.getLogger(__name__)


def create_bucket(project, bucket_name, location="US", credentials=None):
    """
    Creates a Google Cloud Storage bucket in the specified project.

    :type project: :py:class:`str`
    :param project: the project in which to create the bucket. The account being
        used **must** have "Storage Admin" rights on **the GCP project**.

    :type bucket_name: :py:class:`str`
    :param bucket_name: the name of the bucket to create. Note that bucket names
        must be **universally** unique in GCP, and need to adhere to the GCS
        bucket naming guidelines:
        https://cloud.google.com/storage/docs/naming-buckets

    :type location: (Optional) :py:class:`str`
    :param location: if specified, creates the bucket in the desired
        location/region. The supported locations and regions are listed at
        https://cloud.google.com/storage/docs/locations. If unspecified,
        defaults to ``"US"``.

    :type credentials: :py:class:`google_auth:google.oauth2.credentials.Credentials`
    :param credentials: the credentials object to use when making the API call,
        if not to use the account running the function for authentication.

    :rtype: :py:class:`gcp_storage:google.cloud.storage.bucket.Bucket`
    :returns: The bucket created during this function call.

    :raises google.api_core.exceptions.Forbidden: re-raised after logging a
        hint about the "Storage Admin" role.
    :raises google.api_core.exceptions.Conflict: propagated unchanged from the
        client.
    :raises google.api_core.exceptions.BadRequest: propagated unchanged from
        the client.
    """
    _LOGGER.info(
        f"Attempting to create bucket: [{bucket_name}] in project: [{project}]"
    )
    client = storage.Client(credentials=credentials)
    bucket = client.bucket(bucket_name)
    try:
        bucket = client.create_bucket(bucket, project=project, location=location)
    except google_exceptions.Forbidden:
        # Only a permissions failure warrants the IAM hint below; any other
        # API error (Conflict, BadRequest, ...) propagates untouched so it is
        # not mislabelled as a permissions problem.
        _LOGGER.error(
            "Current account does not have required permissions to create "
            f"buckets in GCP project: [{project}]. Navigate to "
            f"https://console.cloud.google.com/iam-admin/iam?project={project} "
            'and add the "Storage Admin" role to the appropriate account.'
        )
        raise
    _LOGGER.info(f"Bucket: [{bucket.name}] created successfully.")
    return bucket
def read_gcs(bucket_name, blob_name, decode=True, credentials=None):
    """
    Reads the contents of a blob from GCS. Service account must have (at least)
    read permissions on the bucket/blob. Note that for **extremely** large files
    having ``decode=True`` can increase runtime substantially.

    .. code:: python

        from bibtutils.gcp.storage import read_gcs
        data = read_gcs('my_bucket', 'my_blob')
        print(data)

    :type bucket_name: :py:class:`str`
    :param bucket_name: the bucket hosting the specified blob.

    :type blob_name: :py:class:`str`
    :param blob_name: the blob to read from GCS.

    :type decode: :py:class:`bool`
    :param decode: (Optional) whether or not to decode the blob contents into
        utf-8. Defaults to ``True``.

    :type credentials: :py:class:`google_auth:google.oauth2.credentials.Credentials`
    :param credentials: the credentials object to use when making the API call,
        if not to use the account running the function for authentication.

    :rtype: :py:class:`str` OR :py:class:`bytes`
    :returns: blob contents, decoded to utf-8 when ``decode`` is truthy,
        otherwise the raw bytes.

    :raises google.api_core.exceptions.NotFound: if the blob does not exist in
        the bucket.
    """
    _LOGGER.info(f"Getting gs://{bucket_name}/{blob_name}")
    client = storage.Client(credentials=credentials)
    blob = client.get_bucket(bucket_name).get_blob(blob_name)
    if blob is None:
        # get_blob() signals a missing blob by returning None; raise an
        # explicit error instead of failing later with an AttributeError.
        raise google_exceptions.NotFound(
            f"Blob not found: gs://{bucket_name}/{blob_name}"
        )
    contents = blob.download_as_bytes()
    if decode:
        return contents.decode("utf-8")
    return contents
def read_gcs_nldjson(bucket_name, blob_name, **kwargs):
    """
    Reads a blob in JSON NLD format from GCS and returns it as a list of dicts.
    Any extra arguments (``kwargs``) are passed to the
    :func:`~bibtutils.gcp.storage.read_gcs` function.

    .. code:: python

        from bibtutils.gcp.storage import read_gcs_nldjson
        data = read_gcs_nldjson('my_bucket', 'my_nldjson_blob')
        print([item['favorite_color'] for item in data])

    :type bucket_name: :py:class:`str`
    :param bucket_name: the bucket hosting the specified blob.

    :type blob_name: :py:class:`str`
    :param blob_name: the blob to read from GCS.

    :rtype: :py:class:`list`
    :returns: the data from the blob, one parsed JSON value per non-blank
        line (a list of :py:class:`dict` for a typical JSON NLD blob).

    :raises json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    json_nld = read_gcs(bucket_name, blob_name, decode=True, **kwargs)
    _LOGGER.info("Converting from JSON NLD to JSON...")
    # Parse each line on its own rather than splicing the lines into one JSON
    # array: blank lines are skipped instead of producing invalid ",,"
    # sequences. Split on "\n" only (not splitlines()) so that other Unicode
    # line separators appearing inside JSON strings are left alone;
    # json.loads() ignores surrounding whitespace such as a trailing "\r".
    return [json.loads(line) for line in json_nld.split("\n") if line.strip()]
def write_gcs(
    bucket_name,
    blob_name,
    data,
    mime_type="text/plain",
    create_bucket_if_not_found=False,
    timeout=storage.constants._DEFAULT_TIMEOUT,
    credentials=None,
):
    """
    Writes a String to GCS storage under a given blob name to the given bucket.
    The executing account must have (at least) write permissions to the bucket.
    If ``data`` is a `str`, will be encoded as utf-8 before uploading.

    .. code:: python

        from bibtutils.gcp.storage import write_gcs
        write_gcs('my_bucket', 'my_blob', data='my favorite color is blue')

    :type bucket_name: :py:class:`str`
    :param bucket_name: the name of the bucket to which to write.

    :type blob_name: :py:class:`str`
    :param blob_name: the name of the blob to write.

    :type data: :py:class:`str` OR :py:class:`bytes`
    :param data: the data to be written.

    :type mime_type: :py:class:`str`
    :param mime_type: (Optional) the
        `MIME type <https://www.iana.org/assignments/media-types/media-types.xhtml>`_
        being uploaded. Defaults to ``'text/plain'``.

    :type create_bucket_if_not_found: :py:class:`bool`
    :param create_bucket_if_not_found: (Optional) if ``True``, will attempt to
        create the bucket if it does not exist. Defaults to ``False``.

    :param timeout: (Optional) passed through unchanged as the ``timeout``
        argument of the blob upload call. Defaults to the storage client
        library's default timeout.

    :type credentials: :py:class:`google_auth:google.oauth2.credentials.Credentials`
    :param credentials: the credentials object to use when making the API call,
        if not to use the account running the function for authentication.

    :raises google.api_core.exceptions.NotFound: if the bucket does not exist
        and ``create_bucket_if_not_found`` is falsy.
    """
    client = storage.Client(credentials=credentials)
    try:
        bucket = client.get_bucket(bucket_name)
    except google_exceptions.NotFound as e:
        _LOGGER.error(e.message)
        _LOGGER.info(f"create_bucket_if_not_found=={create_bucket_if_not_found}")
        if not create_bucket_if_not_found:
            raise
        # Forward the caller's credentials so the bucket is created by the
        # same identity that performs the upload.
        bucket = create_bucket(client.project, bucket_name, credentials=credentials)
    blob = bucket.blob(blob_name)
    _LOGGER.info(f"Writing to GCS: gs://{bucket_name}/{blob_name}")
    blob.upload_from_string(data, content_type=mime_type, timeout=timeout)
    _LOGGER.info("Upload complete.")
def write_gcs_nldjson(bucket_name, blob_name, json_data, add_date=False, **kwargs):
    """
    Serializes ``json_data`` to a JSON NLD string and uploads it to the given
    bucket under the given blob name. The executing account must have (at
    least) write permissions to the bucket. Intended for use together with
    :func:`~bibtutils.gcp.bigquery.upload_gcs_json` when loading JSON data
    into BigQuery tables. Any extra keyword arguments (``kwargs``) are handed
    to :func:`~bibtutils.gcp.storage.write_gcs` unchanged (for example
    ``create_bucket_if_not_found``).

    .. code:: python

        from bibtutils.gcp.storage import write_gcs_nldjson
        write_gcs_nldjson(
            'my_bucket',
            'my_nldjson_blob',
            json_data=[
                {'name': 'leo', 'favorite_color': 'red'},
                {'name': 'matthew', 'favorite_color': 'blue'}
            ]
        )

    :type bucket_name: :py:class:`str`
    :param bucket_name: the name of the bucket to which to write.

    :type blob_name: :py:class:`str`
    :param blob_name: the name of the blob to write.

    :type json_data: :py:class:`list` OR :py:class:`dict`
    :param json_data: the rows to upload. A single dict is treated as one row.
        The data is converted to a JSON NLD formatted string before uploading.

    :type add_date: :py:class:`bool`
    :param add_date: (Optional) whether or not to add the upload date to each
        row before upload. Defaults to ``False``.
    """
    write_gcs(
        bucket_name,
        blob_name,
        _generate_json_nld(json_data, add_date),
        **kwargs,
    )
def _generate_json_nld(json_data, add_date):
    """
    Takes a dict (or a list of dicts) and returns a string in JSON NLD format,
    i.e. one JSON document per line, each terminated by a newline.
    Compatible with uploading to BQ. The input objects are never modified.

    :type json_data: :py:class:`list` OR :py:class:`dict`
    :param json_data: the data to be converted to JSON NLD. A single dict is
        treated as a one-row list.

    :type add_date: :py:class:`bool`
    :param add_date: whether or not to add an ``upload_date`` field (today's
        date in ISO format) to every row.

    :rtype: :py:class:`str`
    :returns: formatted JSON NLD.
    """
    _LOGGER.info("Generating JSON NLD...")
    if isinstance(json_data, dict):
        json_data = [json_data]
    if add_date:
        # Compute the date once so every row carries the same value, and
        # build shallow copies so the caller's dicts are left untouched.
        upload_date = datetime.date.today().isoformat()
        json_data = [{**item, "upload_date": upload_date} for item in json_data]
    json_nld = "".join(f"{json.dumps(item)}\n" for item in json_data)
    _LOGGER.info("Generated.")
    return json_nld