#!/usr/bin/env python
# encoding: utf-8
#
# Copyright © 2019, SAS Institute Inc., Cary, NC, USA. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import os
from typing import TextIO, Union
from .service import Service
QUERY_PARAMETERS = "query parameters"
DEFAULT_SERVER = "cas-shared-default"
DEFAULT_CASLIB = "Public"
[docs]
def check_keys(valid_keys: list, input_keys: list, parameters: str):
"""Compares the input_keys against the valid_keys
to see if they are allowed to be passed
as parameters in the request.
Parameters
----------
valid_keys: list
List of allowed parameters
input_keys: list
List of input parameters
parameters: str
String describing the type of parameters
that are being tested.
Raises
------
ValueError
if input_keys are not valid
"""
if not all(key in valid_keys for key in input_keys):
raise ValueError(
"The only acceptable values for %s are %s" % (parameters, valid_keys)
)
[docs]
def check_required_key(
required_key: Union[str, list], input_keys: list, parameters: str
):
"""Check whether the required parameters
are in the list of input_key.
Parameters
----------
required_key: str or list
Required parameters
input_keys: list
The input parameters
parameters: str
String describing the type of parameters
that are being tested.
Raises
------
ValueError
if required_key is not present.
TypeError
if required_key is neither a list or a string.
"""
if isinstance(required_key, str):
if required_key not in input_keys:
raise ValueError(
"The %s is a required %s parameter." % (required_key, parameters)
)
elif isinstance(required_key, list):
required_set = set(required_key)
input_set = set(input_keys)
if not required_set.issubset(input_set):
raise ValueError(
"The %s are required %s parameters." % (required_key, parameters)
)
else:
raise TypeError(
"Please enter either a list or a string of required parameters."
)
[docs]
class CASManagement(Service):
"""The CAS Management service provides the ability to manage and perform
actions on common resources as they relate to Cloud Analytic Services (CAS).
"""
_SERVICE_ROOT = "/casManagement"
list_servers, get_server, _, _ = Service._crud_funcs("/servers", "server")
[docs]
@classmethod
def list_sessions(cls, query_params: dict = None, server: str = None):
"""
Returns a collection of sessions available on the CAS server.
Parameters
----------
query_params : dict, optional
Query parameters.
Valid keys are `start`, `limit`, `filter`,
`sortBy`, `excludeItemLink`, `sessionId`.
server : str, optional
Name of the CAS server. Defaults to 'cas-shared-default'.
Returns
-------
list
A collection of :class:`.RestObj` instances.
"""
server = server or DEFAULT_SERVER
if query_params is not None:
allowed_query = [
"start",
"limit",
"filter",
"sortBy",
"excludeItemLink",
"sessionId",
]
check_keys(allowed_query, query_params, QUERY_PARAMETERS)
else:
query_params = {}
sess_list = cls.get("/servers/%s/sessions" % server, params=query_params)
return sess_list
[docs]
@classmethod
def create_session(cls, properties: dict, server: str = None):
"""Creates a new session on the CAS server.
Parameters
----------
properties : dict
Properties of the session.
Valid keys are `authenticationType` (required),
`locale`, `name`, `nodeCount`, `timeOut`.
server : str
Name of the CAS server. Defaults to 'cas-shared-default'.
Returns
-------
RestObj
"""
server = server or DEFAULT_SERVER
allowed_body = [
"authenticationType",
"locale",
"name",
"nodeCount",
"replace",
"timeOut",
]
check_keys(allowed_body, properties.keys(), "body")
check_required_key("authenticationType", properties.keys(), "body")
sess = cls.post("/servers/%s/sessions" % (server), json=properties)
return sess
[docs]
@classmethod
def delete_session(
cls, sess_id: str, server: str = None, query_params: dict = None
):
"""Terminates a session on the CAS server.
Parameters
----------
sess_id : str
A string indicating the Session id.
server : str
Name of the CAS server. Defaults to 'cas-shared-default'.
query_params : dict, optional
Query parameters.
Valid keys are `force`, `superUserSessionId`.
Returns
-------
RestObj
"""
server = server or DEFAULT_SERVER
if query_params is not None:
allowed_query = ["force", "superUserSessionId"]
check_keys(allowed_query, query_params, QUERY_PARAMETERS)
else:
query_params = {}
sess = cls.delete(
"/servers/%s/sessions/%s" % (server, sess_id), params=query_params
)
return sess
[docs]
@classmethod
def list_caslibs(cls, server: Union[str, dict], filter_: str = None):
"""List caslibs available on a server.
Parameters
----------
server : str or dict
Name, ID, or dictionary representation of the server.
filter_ : str, optional
A `formatted <https://developer.sas.com/reference/filtering>`_
filter string.
Returns
-------
list
A collection of :class:`.RestObj` instances.
"""
return (
cls._get_rel(server, "caslibs", func=cls.get_server, filter_=filter_) or []
)
[docs]
@classmethod
def get_caslib(cls, name: str, server: str = None):
"""Get a caslib by name.
Parameters
----------
name : str
Name of the caslib
server : str, optional
Name of the CAS server. Defaults to 'cas-shared-default'.
Returns
-------
RestObj or None
"""
server = server or DEFAULT_SERVER
caslibs = cls.list_caslibs(server, filter_='eq($primary,name, "%s")' % name)
if caslibs:
return caslibs[0]
return None
[docs]
@classmethod
def list_tables(
cls, caslib: Union[str, dict], server: str = None, filter_: str = None
):
"""List tables available in a caslib.
Parameters
----------
caslib : str or dict
Name, ID, or dictionary representation of the caslib.
server : str, optional
Server where the `caslib` is registered.
filter_ : str, optional
Filter string in the `https://developer.sas.com/reference/filtering
/` format.
Returns
-------
list
A collection of :class:`.RestObj` instances.
"""
return (
cls._get_rel(caslib, "tables", server, func=cls.get_caslib, filter_=filter_)
or []
)
[docs]
@classmethod
def get_table(cls, name: str, caslib: Union[str, dict] = None, server: str = None):
"""Get a table by name.
Parameters
----------
name : str
Name of the table.
caslib : str or dict, optional
Name, ID, or dictionary representation of the caslib. Defaults to
CASUSER.
server : str, optional
Server where the `caslib` is registered.
Returns
-------
RestObj or None
"""
caslib = caslib or DEFAULT_CASLIB
tables = cls.list_tables(
caslib, server=server, filter_='eq($primary,name, "%s")' % name
)
if tables:
return tables[0]
return None
[docs]
@classmethod
def upload_file(
cls,
file: Union[str, TextIO],
name: str,
caslib: str = None,
server: str = None,
header: bool = None,
format_: str = None,
*,
detail: dict = None
):
"""Upload a file to a CAS table.
Uploads the contents of a CSV, XLS, XLSX, SAS7BDT or SASHDAT file to a
newly created CAS table.
Parameters
----------
file : str or typing.TextIO
File containing data to upload or path to the file.
name : str
Name of the table to create
caslib : str, optional
caslib in which the table will be created. Defaults to 'Public'.
server : str, optional
CAS server on which the table will be created. Defaults to
cas-shared-default.
header : bool, optional
Whether the first row of data contains column headers. Defaults to
True.
format_ : str, optional
Choose from {"csv", "xls", "xlsx", "sas7bdat", "sashdat"}.
File of input `file`. Not required if format can be discerned from
the file path.
detail : dict, optional
Additional body parameters. Allowed parameters are
'sessionId', 'variables', 'label', 'scope', 'replace', 'encoding',
'allowTruncation', 'allowEmbeddedNewLines', 'delimiter',
'varchars', 'scanRows', 'threadCount', 'stripBlanks', 'sheetName',
'password', 'decryptionKey', 'stringLengthMultiplier',
'varcharConversionThreshold'.
Returns
-------
RestObj
Table reference
"""
name = str(name)
caslib = caslib or DEFAULT_CASLIB
server = server or DEFAULT_SERVER
header = True if header is None else bool(header)
# Not a file-like object, assuming it's a file path
if not hasattr(file, "read"):
path = os.path.abspath(os.path.expanduser(file))
format_ = os.path.splitext(path)[-1].lstrip(".").lower()
# Extension should be supported & needs to be explicitly set in
# the "format" parameter to avoid errors.
if format_ not in ("csv", "xls", "xlsx", "sas7bdat", "sashdat"):
raise ValueError("File '%s' has an unsupported file type." % file)
with open(path, "rb") as f:
file = f.read()
data = {
"tableName": name,
"containsHeaderRow": header,
}
if format_ is not None:
data["format"] = format_
allowed_body = [
"sessionId",
"variables",
"label",
"scope",
"replace",
"encoding",
"allowTruncation",
"allowEmbeddedNewLines",
"delimiter",
"varchars",
"scanRows",
"threadCount",
"stripBlanks",
"sheetName",
"password",
"decryptionKey",
"stringLengthMultiplier",
"varcharConversionThreshold",
]
allowed_csv = [
"sessionId",
"variables",
"label",
"scope",
"replace",
"encoding",
"allowTruncation",
"allowEmbeddedNewLines",
"delimiter",
"varchars",
"scanRows",
"threadCount",
"stripBlanks",
]
allowed_xls = [
"sessionId",
"variables",
"label",
"scope",
"replace",
"sheetName",
]
allowed_sas = [
"sessionId",
"variables",
"label",
"scope",
"replace",
"password",
"decryptionKey",
"stringLengthMultiplier",
"varcharConversionThreshold",
]
if detail is not None:
check_keys(allowed_body, detail.keys(), "body")
if format_ == "csv":
check_keys(allowed_csv, detail.keys(), "csv files")
elif format_ in ["xls", "xlsx"]:
check_keys(allowed_xls, detail.keys(), "excel files")
elif format_ in ["sashdat", "sas7bdat"]:
check_keys(allowed_sas, detail.keys(), "sas files")
data.update(detail)
tbl = cls.post(
"/servers/%s/caslibs/%s/tables" % (server, caslib),
data=data,
files={"file": (name, file)},
)
return tbl
[docs]
@classmethod
def update_state_table(
cls,
value: str,
name: str,
caslib: str = None,
server: str = None,
*,
query_params: dict = None,
body: dict = None
):
"""Modifies the state of a table to loaded or unloaded.
Returns loaded or unloaded to indicate the state after the operation.
Parameters
----------
value : str
State to which to set the table. Valid values include `loaded` or `unloaded`.
name : str
Name of the table.
caslib : str, optional
Name of the caslib. Defaults to 'Public'.
server : str, optional
Server where the `caslib` is registered.
Defaults to 'cas-shared-default'.
query_params: dict, optional
Additional query parameters.
Valid keys are `sessionId`, `scope`, `sourceTableName`, `createRelationships`
body : dict, optional
Extra instructions providing greater control over the output when a state change to loaded is requested.
Valid keys are `copies`, `label`, `outputCaslibName`, `outputTableName`, `parameters`,
`replace`, `replaceMode`, `scope`.
Returns
-------
RestObj
"""
server = server or DEFAULT_SERVER
caslib = caslib or DEFAULT_CASLIB
if value in ["loaded", "unloaded"]:
query = {"value": value}
else:
raise ValueError(
"The state can only have values of `loaded` or `unloaded`."
)
if query_params is not None:
allowed_query = [
"sessionId",
"scope",
"sourceTableName",
"createRelationships",
]
check_keys(allowed_query, query_params.keys(), QUERY_PARAMETERS)
query.update(query_params)
if body is not None:
allowed_body = [
"copies",
"label",
"outputCaslibName",
"outputTableName",
"parameters",
"replace",
"replaceMode",
"scope",
]
check_keys(allowed_body, body.keys(), "body parameters")
else:
body = {}
tbl = cls.put(
"/servers/%s/caslibs/%s/tables/%s/state" % (server, caslib, name),
params=query,
json=body,
)
return tbl
[docs]
@classmethod
def save_table(
cls,
name: str,
caslib: str,
properties: dict = None,
sess_id: str = None,
server: str = None,
):
"""Saves a CAS table to a source table
Parameters
----------
name : str
Name of the table.
sess_id: str
The session ID
caslib : str
Name of the caslib.
properties : dict, optional
Properties of the table.
Valid keys are `caslibName`, `format`, `replace`,
`compress`, `tableName`, `sourceTableName`, `parameters`.
server : str
Server where the `caslib` is registered. Defaults to 'cas-shared-default'.
Returns
-------
RestObj
"""
server = server or DEFAULT_SERVER
if properties is not None:
allowed_body = [
"caslibName",
"format",
"replace",
"compress",
"tableName",
"sourceTableName",
"parameters",
]
check_keys(allowed_body, properties.keys(), "body")
else:
properties = {}
query = {"sessionId": sess_id} if sess_id else {}
sess = cls.post(
"/servers/%s/caslibs/%s/tables/%s" % (server, caslib, name),
params=query,
json=properties,
)
return sess
[docs]
@classmethod
def del_table(
cls,
name: str,
query_params: dict = None,
caslib: str = None,
server: str = None,
):
"""Deletes a table from Caslib source. Note that is not an unload.
This operation physically removes the source table (if the source is writable).
For path-based caslibs, this physically removes the file.
Parameters
----------
name : str
Name of the table.
query_params : dict
Query parameters.
The allowed query parameters are `sessionId`,
`sourceTableName`, `quiet`, `removeAcs`.
Note that the last three are required.
caslib : str
Name of the caslib. Defaults to 'Public'
server : str
Server where the `caslib` is registered.
Defaults to 'cas-shared-default'.
Returns
-------
RestObj
Raises
------
ValueError
If `query_params` is empty
"""
server = server or DEFAULT_SERVER
caslib = caslib or DEFAULT_CASLIB
required_queries = ["sourceTableName", "quiet", "removeAcs"]
if query_params is not None:
allowed_query = ["sessionId", "sourceTableName", "quiet", "removeAcs"]
check_keys(allowed_query, query_params.keys(), QUERY_PARAMETERS)
check_required_key(required_queries, query_params.keys(), "query")
else:
raise ValueError(
"You must provide these query parameters: %s" % required_queries
)
tbl = cls.delete(
"servers/%s/caslibs/%s/tables/%s" % (server, caslib, name),
params=query_params,
)
return tbl