# Source code for bibt.gcp.asset.classes

"""
Classes
~~~~~~~

"""

import logging
import re

import google.auth.transport.requests
from google.cloud import asset_v1

# Module-level logger shared by everything in this module.
_LOGGER = logging.getLogger(__name__)

# Matches the full resource name of a project identified by its *number*,
# e.g. ``//cloudresourcemanager.googleapis.com/projects/123456789``.
# The named group ``project_id`` captures the ``projects/<number>`` suffix.
# The dots in the hostname are escaped so they only match a literal ".",
# and the end anchor sits outside the capture group.
_GCP_PROJECT_NUM_REGEX = (
    r"^//cloudresourcemanager\.googleapis\.com/"
    r"(?P<project_id>projects/[0-9]{5,20})$"
)


class Client:
    """Instantiates a Client object for further API calls.

    .. code:: python

        from bibt.gcp import asset
        client = asset.Client(os.environ["GCP_ORG_ID"])
        asset = client.get_asset(...)

    :type gcp_org_id: :py:class:`str`
    :param gcp_org_id: your GCP organization ID. needed to query the
        Cloud Asset Inventory API.

    :type credentials: :py:class:`google_auth:google.oauth2.credentials.Credentials`
    :param credentials: the credentials object to use when making API calls, if not
        using the account running the function for authentication.
    """

    def __init__(self, gcp_org_id, credentials=None):
        self._client = asset_v1.AssetServiceClient(credentials=credentials)
        # The expiry is only logged for debugging, so look it up defensively:
        # a missing private attribute must not break construction.
        expiry = getattr(self._get_credentials(), "expiry", None)
        _LOGGER.debug(f"Client token will expire: [{str(expiry)}]")
        self.gcp_org_id = gcp_org_id

    def _get_credentials(self):
        """Returns the credentials object held by the underlying API client.

        Looks at ``self._client._credentials`` first and falls back to
        ``self._client._transport._credentials``.

        :returns: the credentials object, or ``None`` if neither attribute
            is available.
        """
        client = self._client
        credentials = getattr(client, "_credentials", None)
        if credentials is None:
            transport = getattr(client, "_transport", None)
            credentials = getattr(transport, "_credentials", None)
        return credentials

    @staticmethod
    def _as_asset_type_list(asset_types):
        """Normalizes an ``asset_types`` argument for use in a request.

        :param asset_types: ``None``, a single asset type, or a collection
            (list, tuple, set) of asset types.

        :rtype: :py:class:`list`
        :returns: ``None`` if ``None`` was passed; the list itself if a list
            was passed; a new list of the elements for tuples and sets; and a
            one-element list wrapping any other single value (e.g. a string).
        """
        if asset_types is None:
            return None
        if isinstance(asset_types, list):
            return asset_types
        if isinstance(asset_types, (tuple, set, frozenset)):
            return list(asset_types)
        return [asset_types]

    def _ensure_valid_client(self):
        """Refreshes the client's credentials if they are not currently usable.

        Credentials are refreshed when they are reported as not valid or have
        no expiry set. If no credentials object can be found on the client, an
        error is logged and nothing else happens.
        """
        credentials = self._get_credentials()
        if credentials is None:
            _LOGGER.error("Could not verify credentials in client.")
            return
        if not credentials.valid or not credentials.expiry:
            _LOGGER.info(
                "Refreshing client credentials, token expired: "
                f"[{str(credentials.expiry)}]"
            )
            request = google.auth.transport.requests.Request()
            credentials.refresh(request=request)
            _LOGGER.info(f"New expiration: [{str(credentials.expiry)}]")
        else:
            _LOGGER.debug(
                f"Token is valid: [{credentials.valid}] "
                f"expires: [{str(credentials.expiry)}]"
            )
        return

    def list_assets(self, parent, asset_types=None, content_type=None, page_size=1000):
        """List assets from the CAI API. For more information, view `the documentation
        <https://cloud.google.com/asset-inventory/docs/reference/rest/v1/assets/list>`__.

        :type parent: :py:class:`str`
        :param parent: the parent resource to search under. Can be one of:
            ``organizations/1234``, ``folders/1234``, ``projects/1234``,
            or ``projects/name``.

        :type asset_types: :py:class:`list`
        :param asset_types: a list of asset types to return, in the format
            ``compute.googleapis.com/Disk`` (for example). A single value is
            wrapped in a list. See `here
            <https://cloud.google.com/asset-inventory/docs/supported-asset-types#searchable_asset_types>`__
            for all supported types.

        :type content_type: :py:class:`str`
        :param content_type: the type of data to return. usually you'll want
            ``RESOURCE``, but can be any value from `here
            <https://cloud.google.com/asset-inventory/docs/reference/rest/v1/feeds#ContentType>`__.

        :type page_size: :py:class:`int`
        :param page_size: the number of results to return per page. a lower
            number will result in more API calls.

        :rtype: `ListAssetsPager
            <https://cloud.google.com/python/docs/reference/cloudasset/latest/google.cloud.asset_v1.services.asset_service.pagers.ListAssetsPager>`__
        :returns: a pager which may be iterated on to retrieve results. contains
            `Asset <https://cloud.google.com/asset-inventory/docs/reference/rest/v1/Asset>`__
            objects.
        """  # noqa: E501
        _LOGGER.info(
            "Building list_assets request with parent "
            f"[{parent}] and type {asset_types}"
        )
        request = {
            "parent": parent,
            "read_time": None,
            "page_size": page_size,
        }
        asset_types = self._as_asset_type_list(asset_types)
        if asset_types is not None:
            request["asset_types"] = asset_types
        if content_type is not None:
            request["content_type"] = content_type
        _LOGGER.debug(f"Request: {request}")

        self._ensure_valid_client()
        result = self._client.list_assets(request=request)
        # Only a diagnostic: an empty result is still returned to the caller.
        if len(result.assets) < 1:
            _LOGGER.warning(f"No assets returned for list_assets({request})")
        return result

    def get_asset(
        self, scope, asset_name, asset_types=None, detailed=True, page_size=1000
    ):
        """Get a specific asset by name from the CAI API.

        :type scope: :py:class:`str`
        :param scope: the parent resource to search under. Can be one of:
            ``organizations/1234``, ``folders/1234``, ``projects/1234``,
            or ``projects/name``.

        :type asset_name: :py:class:`str`
        :param asset_name: the name of the asset, in the form of
            ``//cloudsql.googleapis.com/projects/my-project/instances/my-db``.

        :type asset_types: :py:class:`list`
        :param asset_types: a list of asset types to return, in the format
            ``compute.googleapis.com/Disk`` (for example). See `here
            <https://cloud.google.com/asset-inventory/docs/supported-asset-types#searchable_asset_types>`__
            for all supported types.

        :type detailed: :py:class:`bool`
        :param detailed: if true, will get the full resource metadata from a
            ``list_assets`` call, otherwise just returns basic metadata
            from ``search_assets``. If the ``list_assets`` call yields no
            asset with a matching name, the basic search result is returned.

        :type page_size: :py:class:`int`
        :param page_size: the number of results to return per page of the
            ``list_assets`` call. a lower number will result in more API calls.

        :rtype: `Asset <https://cloud.google.com/asset-inventory/docs/reference/rest/v1/Asset>`__
        :returns: an asset object (or ``None``).
        """  # noqa: E501
        _LOGGER.info(
            f"Searching for asset: {asset_name} under scope "
            f"{scope} with type {asset_types}"
        )
        search_str = self._generate_asset_search_str(asset_name)
        _LOGGER.debug(f"Searching: {search_str}")
        result = self.search_assets(
            scope,
            search_str,
            asset_types=asset_types,
            page_size=1,
        )
        if len(result.results) < 1:
            _LOGGER.warning(
                f"No asset returned for {search_str} under scope "
                f"{scope} with type {asset_types}"
            )
            return None

        asset = result.results[0]
        if not detailed:
            return asset

        _LOGGER.info("Getting detailed metadata from list_assets endpoint...")
        for candidate in self.list_assets(
            asset.project,
            asset_types=[asset.asset_type],
            content_type="RESOURCE",
            page_size=page_size,
        ):
            if candidate.name == asset.name:
                _LOGGER.debug(f"Match found on {asset.name}")
                return candidate
            _LOGGER.debug(f"Does not match: {candidate.name} != {asset.name}")

        # No exact name match: fall back to the basic search result, but say so
        # instead of silently returning a less detailed object.
        _LOGGER.warning(
            f"No detailed metadata found for {asset.name}; "
            "returning basic search result."
        )
        return asset

    def get_parent_project(self, scope, asset):
        """For a given scope and asset, attempts to retrieve the parent project.
        If a project is passed, simply returns that.

        :type scope: :py:class:`str`
        :param scope: the parent resource to search under. Can be one of:
            ``organizations/1234``, ``folders/1234``, ``projects/1234``,
            or ``projects/name``.

        :type asset: `Asset <https://cloud.google.com/asset-inventory/docs/reference/rest/v1/Asset>`__
        :param asset: the asset object for which to return the parent.

        :rtype: `Asset <https://cloud.google.com/asset-inventory/docs/reference/rest/v1/Asset>`__
        :returns: an asset object representing a project (or ``None`` if no
            parent could be found).

        :raises Exception: if the asset (or one of the parents walked through)
            is a folder or an organization, which have no parent project.
        """  # noqa: E501
        _LOGGER.info(
            f"Trying to get parent project of {asset.name} using scope {scope}"
        )
        if asset.asset_type in (
            "cloudresourcemanager.googleapis.com/Folder",
            "cloudresourcemanager.googleapis.com/Organization",
        ):
            raise Exception(
                "Parent project cannot be retrieved for folders or organizations!"
            )
        if asset.asset_type == "cloudresourcemanager.googleapis.com/Project":
            return asset

        _LOGGER.debug("Trying to get parent project using asset.project attribute...")
        try:
            return self.search_assets(
                scope,
                f'project="{asset.project}"',
                asset_types=["cloudresourcemanager.googleapis.com/Project"],
                page_size=1,
            ).results[0]
        except Exception as e:
            # Deliberate best effort: any failure here (missing attribute, API
            # error, empty result) falls through to the parent-name lookup.
            _LOGGER.debug(f"That didn't work: {type(e).__name__}: {e}")

        _LOGGER.debug(
            "Trying to get parent project using "
            "asset.parent_full_resource_name attribute..."
        )
        search_str = self._generate_asset_search_str(asset.parent_full_resource_name)
        _LOGGER.debug(f"Searching: {search_str}")
        parent = self.search_assets(
            scope,
            search_str,
            asset_types=[asset.parent_asset_type],
            page_size=1,
        )
        if len(parent.results) > 0:
            # Walk up one level and retry from the parent asset.
            return self.get_parent_project(scope, parent.results[0])

        _LOGGER.warning(f"No asset returned for get_parent_project({asset})")
        return None

    def search_assets(
        self, scope, query, asset_types=None, order_by=None, page_size=1000
    ):
        """Search assets from the CAI API. This provides minimal asset metadata,
        use ``list_assets`` for full information. For more information, view
        `the documentation
        <https://cloud.google.com/asset-inventory/docs/searching-resources#search_resources>`__.

        :type scope: :py:class:`str`
        :param scope: the parent resource to search under. Can be one of:
            ``organizations/1234``, ``folders/1234``, ``projects/1234``,
            or ``projects/name``.

        :type query: :py:class:`str`
        :param query: an asset query, see `here
            <https://cloud.google.com/asset-inventory/docs/query-syntax>`__
            for more information.

        :type asset_types: :py:class:`list`
        :param asset_types: a list of asset types to return, in the format
            ``compute.googleapis.com/Disk`` (for example). A single value is
            wrapped in a list. See `here
            <https://cloud.google.com/asset-inventory/docs/supported-asset-types#searchable_asset_types>`__
            for all supported types.

        :type order_by: :py:class:`str`
        :param order_by: the field(s) to order results by. sorting can increase
            response wait time. view the documentation linked above
            for usable fields.

        :type page_size: :py:class:`int`
        :param page_size: the number of results to return per page. a lower
            number will result in more API calls.

        :rtype: `SearchAllResourcesPager
            <https://cloud.google.com/python/docs/reference/cloudasset/latest/google.cloud.asset_v1.services.asset_service.pagers.SearchAllResourcesPager>`__
        :returns: a pager which may be iterated on to retrieve results. contains
            `ResourceSearchResult
            <https://cloud.google.com/python/docs/reference/cloudasset/latest/google.cloud.asset_v1.types.ResourceSearchResult>`__
            objects.
        """  # noqa: E501
        _LOGGER.info(
            f"Searching assets with scope {scope} query "
            f"[{query}] asset_types = {asset_types}"
        )
        request = {
            "scope": scope,
            "query": query,
            "page_size": page_size,
        }
        asset_types = self._as_asset_type_list(asset_types)
        if asset_types is not None:
            request["asset_types"] = asset_types
        if order_by is not None:
            request["order_by"] = order_by
        _LOGGER.debug(f"Request: {request}")

        self._ensure_valid_client()
        result = self._client.search_all_resources(request=request)
        # Only a diagnostic: an empty result is still returned to the caller.
        if len(result.results) < 1:
            _LOGGER.warning(f"No assets returned for search_assets({request})")
        return result

    def search_asset_iam_policy(self, scope, query):
        """Search IAM policies from the CAI API. For more information, view
        `the documentation
        <https://cloud.google.com/asset-inventory/docs/searching-iam-policies#search_policies>`__.

        :type scope: :py:class:`str`
        :param scope: the parent resource to search under. Can be one of:
            ``organizations/1234``, ``folders/1234``, ``projects/1234``,
            or ``projects/name``.

        :type query: :py:class:`str`
        :param query: an asset query, see `here
            <https://cloud.google.com/asset-inventory/docs/query-syntax>`__
            for more information.

        :rtype: `SearchAllIamPoliciesPager
            <https://cloud.google.com/python/docs/reference/cloudasset/latest/google.cloud.asset_v1.services.asset_service.pagers.SearchAllIamPoliciesPager>`__
        :returns: a pager which may be iterated on to retrieve results. contains
            `IamPolicySearchResult
            <https://cloud.google.com/python/docs/reference/cloudasset/latest/google.cloud.asset_v1.types.IamPolicySearchResult>`__
            objects.
        """  # noqa: E501
        _LOGGER.info(f"Searching IAM policies with scope {scope} and query {query}")
        request = {"scope": scope, "query": query}

        self._ensure_valid_client()
        result = self._client.search_all_iam_policies(request=request)
        # Only a diagnostic: an empty result is still returned to the caller.
        if len(result.results) < 1:
            _LOGGER.warning(
                f"No IAM policy returned for search_asset_iam_policy({request})"
            )
        return result

    def _generate_asset_search_str(self, asset_name):
        """Generates a query string for ``get_asset`` based on whether
        or not a project ID is passed.

        If ``asset_name`` matches ``_GCP_PROJECT_NUM_REGEX`` the query filters
        on the ``project`` field using the captured ``projects/<number>``
        value; otherwise it filters on the exact asset ``name``.
        """
        match = re.match(_GCP_PROJECT_NUM_REGEX, asset_name)
        if match:
            project_id = match.group("project_id")
            return f'project="{project_id}"'
        return f'name="{asset_name}"'