#!/usr/bin/env python
# encoding: utf-8
#
# Copyright © 2019, SAS Institute Inc., Cary, NC, USA. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Base functionality for all services."""
import logging
import time
import warnings
from urllib.parse import quote
from .. import core
from ..core import HTTPError, PagedItemIterator, sasctl_command
from ..exceptions import JobTimeoutError
class Service(object): # skipcq PYL-R0205
"""Base class for all services. Should not be used directly."""
_SERVICE_ROOT = None
is_uuid = staticmethod(core.is_uuid)
get_link = staticmethod(core.get_link)
request_link = staticmethod(core.request_link)
log = logging.getLogger(__name__)
@property
def _SERVICE_ROOT(self):
raise NotImplementedError()
@classmethod
def is_available(cls):
"""Check if the service is currently available.
Returns
-------
bool
"""
try:
response = cls.head("/", format_="response")
return response.status_code == 200
except HTTPError:
return False
@classmethod
def info(cls):
"""Version and build information for the service.
Returns
-------
RestObj
"""
return cls.get("/apiMeta")
@classmethod
def request(cls, verb, path, session=None, format_="auto", **kwargs):
"""Send an HTTP request with a session.
Parameters
----------
verb : str
A valid HTTP request verb.
path : str
Path portion of URL to request. Assumed to be relative to
`_SERVICE_ROOT`.
session : Session, optional
Defaults to `current_session()`.
format_ : {'auto', 'response', 'content', 'json', 'text'}
The format of the return response. Defaults to `auto`.
response: the raw `Response` object.
content: Response.content
json: Response.json()
text: Response.text
auto: `RestObj` constructed from JSON if possible, otherwise same as
`text`.
**kwargs
Additional arguments are passed to the session `request` method.
Returns
-------
"""
if path.startswith("/"):
path = cls._SERVICE_ROOT + path
else:
path = cls._SERVICE_ROOT + "/" + path
return core.request(verb, path, session, format_, **kwargs)
@classmethod
def get(cls, *args, **kwargs):
"""Send a GET request."""
try:
return cls.request("get", *args, **kwargs)
except HTTPError as e:
if e.code == 404:
return None # Resource not found
raise e
@classmethod
def head(cls, *args, **kwargs):
"""Send a HEAD request."""
return cls.request("head", *args, **kwargs)
@classmethod
def post(cls, *args, **kwargs):
"""Send a POST request."""
return cls.request("post", *args, **kwargs)
@classmethod
def put(cls, *args, **kwargs):
"""Send a PUT request."""
return cls.request("put", *args, **kwargs)
@classmethod
def delete(cls, *args, **kwargs):
"""Send a DELETE request."""
return cls.request("delete", *args, **kwargs)
@staticmethod
def _crud_funcs(
path, single_term=None, plural_term=None, service_name=None, get_filter=None
):
"""Utility method for defining CRUD functions.
Can be used to define simple functions that perform CRUD operations
on a REST endpoint.
Parameters
----------
path : str
URL path to use for the requests
single_term : str
English name of the item being manipulated.
Defaults to `plural_term`.
plural_term : str
English name of the items being manipulated. Defaults to the last
segment of `path`.
service_name : str
Name of the service under which the command will be listed in
the `sasctl` CLI. Defaults to `plural_term`.
get_filter : callable, optional
A function that accepts an `item` and returns a dictionary
representing quary parameters to be added to the request for
filtering. Defaults to filter=eq(name, item.name).
Returns
-------
functions : tuple
tuple of `classmethod` instances representating CRUD functions:
list_items, get_item, update_item, delete_item
Examples
--------
>>> list_spam, get_spam, update_spam, delete_spam = _build_crud_funcs('/spam')
"""
# Set a default filter
if get_filter is None:
def default_filter(item):
return dict(filter='eq(name, "%s")' % item)
get_filter = default_filter
@sasctl_command("list")
def list_items(cls, filter=None, start=None, limit=None, **kwargs):
"""List all {items} available in the environment.
Parameters
----------
filter : str, optional
start : int, optional
Zero-based index of the first item to return. Defaults to 0.
limit : int, optional
The maximum number of items to return. Defaults to 20.
Returns
-------
list
A list of dictionaries containing the {items}.
Notes
-----
See the filtering_ reference for details on the `filter` parameter.
.. _filtering: https://developer.sas.com/reference/filtering/
"""
if filter is not None:
kwargs["filter"] = filter
if start is not None:
kwargs["start"] = int(start)
if limit is not None:
kwargs["limit"] = int(limit)
params = "&".join(
"%s=%s" % (k, quote(str(v), safe='/(),"')) for k, v in kwargs.items()
)
results = cls.get(path, params=params)
if results is None:
return []
if isinstance(results, (list, PagedItemIterator)):
return results
return [results]
@sasctl_command("get")
def get_item(cls, item, refresh=False):
"""Return a {item} instance.
Parameters
----------
item : str or dict
Name, ID, or dictionary representation of the {item}.
refresh : bool, optional
Obtain an updated copy of the {item}.
Returns
-------
RestObj or None
A dictionary containing the {item} attributes or None.
Notes
-----
If `item` is a complete representation of the {item} it will be
returned unless `refresh` is set. This prevents unnecessary REST
calls when data is already available on the client.
"""
if item is None:
return item
# If the input already appears to be the requested object just
# return it, unless a refresh of the data was explicitly requested.
if isinstance(item, dict) and all(k in item for k in ("id", "name")):
if refresh:
item = item["id"]
else:
return item
if cls.is_uuid(item):
return cls.get(path + "/{id}".format(id=item))
results = list_items(cls, **get_filter(item))
match = None
for result in results:
if result["name"] == str(item):
# The first result that matches on name should be stored.
# Will be returned after determining that there aren't additional
# matches
if match is None:
# Make a request for the specific object so that ETag
# is included, allowing updates.
if cls.get_link(result, "self"):
match = cls.request_link(result, "self")
else:
id_ = result.get("id", result["name"])
match = cls.get(path + "/{id}".format(id=id_))
# We already found a match so this is a duplicate. Warn the user so they know
# the item returned may not be the one they were expecting.
else:
warnings.warn(
"Multiple items found with name '%s'. Only the first result is returned."
% item
)
break
return match
@sasctl_command("update")
def update_item(cls, item):
"""Update a {item} instance.
Parameters
----------
item : dict
Returns
-------
None
"""
headers = getattr(item, "_headers", None)
if headers is None or headers.get("etag") is None:
raise ValueError("Could not find ETag for update of %s." % item)
id_ = getattr(item, "id", None)
if id_ is None:
raise ValueError(
"Could not find property `id` for update of %s." % item
)
headers = {
"If-Match": item._headers.get("etag"),
"Content-Type": item._headers.get("content-type"),
}
return cls.put(path + "/%s" % id_, json=item, headers=headers)
@sasctl_command("delete")
def delete_item(cls, item):
"""Delete a {item} instance.
Parameters
----------
item
Returns
-------
None
"""
item_name = str(item)
# Try to find the item if the id can't be found
if not (isinstance(item, dict) and "id" in item):
item = get_item(cls, item)
if item is None:
cls.log.info("Object '%s' not found. Skipping delete.", item_name)
return
if isinstance(item, dict) and "id" in item:
item = item["id"]
if cls.is_uuid(item):
response = cls.delete(path + "/{id}".format(id=item))
# Response generally seems to be an empty string. If so, just return None
# BUT, if the service provides an actual response, return it.
if response:
return response
return
raise ValueError("Unrecognized id '%s'" % item)
# Pull object name from path if unspecified (many paths end in
# nouns like /folders or /repositories).
plural_term = plural_term or str(path).split("/")[-1]
single_term = single_term or plural_term
service_name = service_name or plural_term
service_name = service_name.replace(" ", "_")
for func in [list_items, get_item, update_item, delete_item]:
func.__doc__ = func.__doc__.format(item=single_term, items=plural_term)
func._cli_service = service_name
prefix = func.__name__.split("_")[0] + "_"
suffix = plural_term if prefix == "list_" else single_term
func.__name__ = prefix + suffix
return [
classmethod(f) for f in (list_items, get_item, update_item, delete_item)
]
# Compatibility with Python 2.7 requires *args to be after key-words
# arguments.
# skipcq: PYL-W1113
@classmethod
def _get_rel(cls, item, rel, *args, func=None, filter_=None):
"""Get `item` and request a link.
Parameters
----------
item : str or dict
rel : str
*args
Passed to `func`
func : function, optional
Callable that takes (item, *args) and returns a RestObj of `item`
filter_ : str, optional
Returns
-------
list
"""
if func is not None:
obj = func(item, *args)
if obj is None:
return None
params = "filter={}".format(filter_) if filter_ is not None else {}
resources = cls.request_link(obj, rel, params=params)
if isinstance(resources, (list, PagedItemIterator)):
return resources
return [resources]
@classmethod
def _monitor_job(cls, job, max_retries=60):
"""Continually poll a job until it reaches the desired status.
Parameters
----------
job : dict
Dictionary representation of a currently execution job
max_retries : int
Maximum number of ties to refresh `job` before failing.
Returns
-------
job : dict
Raises
------
TimeoutError
`max_retries` reached with a successful status check
"""
def completed(job):
return job["state"].lower() in ("completed", "failed")
retries = 0
if cls.get_link(job, "self") is None:
raise ValueError("Link 'self' not found on %s" % job)
# TODO: Log
while not completed(job) and retries < max_retries:
time.sleep(0.5)
retries += 1
job = cls.request_link(job, "self")
if completed(job):
return job
raise JobTimeoutError("Timeout while waiting on job %s" % job)