Source code for sasctl._services.saslogon
#!/usr/bin/env python
# encoding: utf-8
#
# Copyright © 2022, SAS Institute Inc., Cary, NC, USA.  All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""The SAS Logon service provides standard OAuth endpoints for client management."""
from ..core import HTTPError
from .service import Service
[docs]
class SASLogon(Service):
    """The SAS Logon service client management related endpoints.
    Provides functionality for managing client IDs and secrets  This class
    is somewhat different from the other Service classes because many of
    the operations on the associated SA SLogon REST service are related to
    authentication.  In sasctl all authentication is handled in the
    `Session` class, so only the operations that are not related to
    authentication are implemented here.
    The operations provided by this service are only accessible to users
    with administrator permissions.
    """
    _SERVICE_ROOT = "/SASLogon"
[docs]
    @classmethod
    def create_client(
        cls,
        client_id,
        client_secret,
        scopes=None,
        redirect_uri=None,
        allow_password=False,
        allow_client_secret=False,
        allow_auth_code=False,
    ):
        """Register a new client with the SAS Viya environment.
        Parameters
        ----------
        client_id : str
            The ID to be assigned to the client.
        client_secret : str
            The client secret used for authentication.
        scopes : list of str, optional
            Specifies the levels of access that the client will be able to
            obtain on behalf of users when not using client credential
            authentication.  If `allow_password` or `allow_auth_code` are
            true, the 'openid' scope will also be included.  This is used
            to assert the identity of the user that the client is acting on
            behalf of.  For clients that only use client credential
            authentication and therefore do not act on behalf of users,
            the 'uaa.none' scope will automatically be included.
        redirect_uri : str, optional
            The allowed URI pattern for redirects during authorization.
            Defaults to 'urn:ietf:wg:oauth:2.0:oob'.
        allow_password : bool, optional
            Whether to allow username & password authentication with this
            client.  Defaults to false.
        allow_client_secret : bool
            Whether to allow authentication using just the client ID and
            client secret.  Defaults to false.
        allow_auth_code : bool, optional
            Whether to allow authorization code access using this client.
            Defaults to false.
        Returns
        -------
        RestObj
        """
        scopes = set() if scopes is None else set(scopes)
        # Include default scopes depending on allowed grant types
        if allow_password or allow_auth_code:
            scopes.add("openid")
        elif allow_client_secret:
            scopes.add("uaa.none")
        else:
            raise ValueError("At least one authentication method must be allowed.")
        redirect_uri = redirect_uri or "urn:ietf:wg:oauth:2.0:oob"
        grant_types = set()
        if allow_auth_code:
            grant_types.update(["authorization_code", "refresh_token"])
        if allow_client_secret:
            grant_types.add("client_credentials")
        if allow_password:
            grant_types.update(["password", "refresh_token"])
        data = {
            "client_id": client_id,
            "client_secret": client_secret,
            "scope": list(scopes),
            "authorized_grant_types": list(grant_types),
            "redirect_uri": redirect_uri,
        }
        # Use access token to define a new client, along with client secret & allowed
        # authorization types (auth code)
        response = cls.post("/oauth/clients", json=data)
        return response 
[docs]
    @classmethod
    def delete_client(cls, client):
        """Remove and existing client.
        Parameters
        ----------
        client : str or RestObj
            The client ID or a RestObj containing the client details.
        Returns
        -------
        RestObj
            The deleted client
        Raises
        ------
        ValueError
            If `client` is not found.
        """
        id_ = client.get("client_id") if isinstance(client, dict) else str(client)
        try:
            return cls.delete(f"/oauth/clients/{id_}")
        except HTTPError as e:
            if e.code == 404:
                raise ValueError(f"Client with ID '{id_}' not found.") from e
            raise 
[docs]
    @classmethod
    def get_client(cls, client_id):
        """Retrieve information about a specific client
        Parameters
        ----------
        client_id : str
            The id of the client.
        Returns
        -------
        RestObj or None
        """
        return cls.get(f"/oauth/clients/{client_id}") 
[docs]
    @classmethod
    def list_clients(cls, start_index=None, count=None, descending=False):
        """Retrieve a details of multiple clients.
        Parameters
        ----------
        start_index : int, optional
            Index of first client to return.  Defaults to 1.
        count : int, optional
            Number of clients to retrieve.  Defaults to 100.
        descending : bool, optional
            Whether to clients should be returned in descending order.
        Returns
        -------
        list of dict
            Each dict contains details for a single client.  If no
            clients were found and empty list is returned.
        """
        params = {}
        if start_index:
            params["startIndex"] = int(start_index)
        if count:
            params["count"] = int(count)
        if descending:
            params["sortOrder"] = "descending"
        results = cls.get("/oauth/clients", params=params)
        if results is None:
            return []
        # Response does not conform to format expected by PagedList (items
        # under an 'items' property and a URL to request additional items).
        # Instead, just return the raw list.
        return results["resources"] 
[docs]
    @classmethod
    def update_client_secret(cls, client, secret):
        """
        Parameters
        ----------
        client : str or RestObj
            The client ID or a RestObj containing the client details.
        secret : str
            The new client secret.
        Returns
        -------
        None
        Raises
        ------
        ValueError
            If `client` is not found.
        """
        id_ = client.get("client_id") if isinstance(client, dict) else str(client)
        data = {"secret": secret}
        try:
            # Ignoring response ({"status": "ok", "message": "secret updated"})
            _ = cls.put(f"/oauth/clients/{id_}/secret", json=data)
        except HTTPError as e:
            if e.code == 404:
                raise ValueError(f"Client with ID '{id_}' not found.") from e
            raise