Source code for sasctl._services.model_management

#!/usr/bin/env python
# encoding: utf-8
#
# Copyright © 2019, SAS Institute Inc., Cary, NC, USA.  All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import json

from ..utils.decorators import experimental
from .service import Service


class ModelManagement(Service):
    """The Model Management API provides basic resources for monitoring
    performance, comparing models, and running workflow processes.
    """

    # All endpoints in this service are relative to this root path.
    _SERVICE_ROOT = "/modelManagement"

    # Standard CRUD operations for performance task definitions, generated
    # by the shared Service helper.  Each name becomes a classmethod on
    # this service (list/get/update/delete).
    (
        list_performance_definitions,
        get_performance_definition,
        update_performance_definition,
        delete_performance_definition,
    ) = Service._crud_funcs(
        "/performanceTasks", "performance task", "performance tasks"
    )

    # TODO: set ds2MultiType
[docs] @classmethod def publish_model( cls, model, destination, name=None, force=False, reload_model_table=False ): """ Parameters ---------- model : str or dict The name or id of the model, or a dictionary representation of the model. destination : str Name of destination to publish the model to. name : str, optional Provide a custom name for the published model. Defaults to None. force : bool, optional Whether to overwrite the model if it already exists in the publishing `destination`. Defaults to False. reload_model_table : bool, optional Whether the model table in CAS should be reloaded. Defaults to False. Returns ------- """ from .model_publish import ModelPublish from .model_repository import ModelRepository mr = ModelRepository() mp = ModelPublish() model_obj = mr.get_model(model) if model_obj is None: model_name = model.name if hasattr(model, "name") else str(model) raise ValueError("Model '{}' was not found.".format(model_name)) # Include refresh in case get_model call does not return enough information model_uri = mr.get_model_link(model_obj, "self", True) # TODO: Verify allowed formats by destination type. # As of 19w04 MAS throws HTTP 500 if name is in invalid format. model_name = name or "{}_{}".format( model_obj["name"].replace(" ", ""), model_obj["id"] ).replace("-", "") request = { "name": model_obj.get("name"), "notes": model_obj.get("description"), "modelContents": [ { "modelName": mp._publish_name(model_name), "sourceUri": model_uri.get("uri"), "publishLevel": "model", } ], "destinationName": destination, } # Publishes a model that has already been registered in the model # repository. # Unlike model_publish service, does not require Code to be specified. r = cls.post( "/publish", json=request, params=dict(force=force, reloadModelTable=reload_model_table), headers={ "Content-Type": "application/vnd.sas.models.publishing.request.asynchronous+json" }, ) return r
[docs] @classmethod def create_performance_definition( cls, table_prefix, project=None, models=None, library_name="Public", name=None, description=None, monitor_champion=False, monitor_challenger=False, max_bins=None, scoring_required=False, all_data=False, save_output=True, output_library=None, autoload_output=False, cas_server=None, trace=False, ): """Create the performance task definition in the model project to monitor model performance. In order to execute the performance task definition, run the execute_performance_definition function with the output from this function. Note that the performance task definition cannot be created if the project is not a valid form of model, or if targetVariable or targetLevel are not defined. Parameters ---------- table_prefix : str The name used for the prefix of the performance data. project : str, optional Name or ID of the project. If no project is specified, it is inferred from the models argument. Defaults to None. models : str, list, or dict, optional The name or id of the model(s), or a dictionary representation of the model(s). For multiple models, input a list of model names, or a list of dictionaries. If no models are specified, all models in the project specified will be used. Defaults to None. library_name : str The library containing the input data, default is 'Public'. name : str, optional The name of the performance task, default is None. description : str, optional The description of the performance task, default is None. monitor_champion : bool, optional Indicates to monitor the project champion model, default is None. monitor_challenger : bool, optional Indicates to monitor challenger models, default is None. max_bins : int, optional The maximum bins number. Must be >= 2. Defaults to 10. scoring_required : bool, optional Whether model scoring must be performed on the input data before performance results can be computed. Should be `False` if target values are included in the `table_prefix` tables. 
Defaults to `False`. all_data : bool, optional Whether to run the performance job against all matching data tables in `library_name` or just the new tables. Defaults to `False`. save_output : bool, optional Whether to save the computed results to a table in `output_library`. Defaults to True. output_library : str, optional Name of a CASLIB where computed results should be saved. Defaults to 'ModelPerformanceData'. autoload_output : bool, optional Whether computed results should automatically be re-loaded after a CAS server restart, defaults to False. cas_server : str, optional The CAS Server for the monitoring task, default is 'cas-shared-default'. trace : bool, optional Whether to enable trace messages in the SAS job log when executing the performance definition, defaults to False. Returns ------- RestObj The performance task definition schema """ from .model_repository import ModelRepository as mr if not scoring_required and "_" in table_prefix: raise ValueError( "Parameter 'table_prefix' cannot contain underscores." " Received a value of '%s'." % table_prefix ) max_bins = 10 if max_bins is None else int(max_bins) if int(max_bins) < 2: raise ValueError( "Parameter 'max_bins' must be at least 2. Received a value of '%s'." % max_bins ) # Separate single models from multiple models if not isinstance(models, list): models = [models] if not project and not models[0]: raise ValueError( "No project or model specified for performance definition creation.\n" "Please specify at least one of the two values." 
) # If no models were specified, search the supplied project for all models elif not models[0]: project = mr.get_project(project) models = mr.list_models(filter=f"eq(projectName, '{project.name}')") else: for i, model in enumerate(models): models[i] = mr.get_model(model) if project: project = mr.get_project(project) else: project = mr.get_project(models[0].projectId) # Ensures that all models are in the same project if not all([model.projectId == project.id for model in models]): raise ValueError( "Not all models are contained within the same project. Try " "specifying a project in the arguments and verify that all models" "are from the same project." ) # Performance data cannot be captured unless certain project properties have been configured. for required in ["targetVariable", "targetLevel"]: if getattr(project, required, None) is None: raise ValueError( "Project %s must have the '%s' property set." % (project.name, required) ) if ( project.get("function") == "classification" and project.get("eventProbabilityVariable") is None ): raise ValueError( "Project %s must have the " "'eventProbabilityVariable' property set." % project.name ) if ( project.get("function") == "prediction" and project.get("predictionVariable") is None ): raise ValueError( "Project %s must have the 'predictionVariable' " "property set." 
% project.name ) request = { "projectId": project.id, "name": name or project.name + " Performance", "modelIds": [model.id for model in models], "championMonitored": monitor_champion, "challengerMonitored": monitor_challenger, "maxBins": max_bins, "resultLibrary": output_library or "ModelPerformanceData", "includeAllData": all_data, "scoreExecutionRequired": scoring_required, "performanceResultSaved": save_output, "loadPerformanceResult": autoload_output, "dataLibrary": library_name or "Public", "description": description or "Performance definition for model " + ", ".join([model.name for model in models]), "casServerId": cas_server or "cas-shared-default", "dataPrefix": table_prefix, "traceOn": trace, } # If model doesn't specify input/output variables, try to pull from project definition if models[0].get("inputVariables", []): request["inputVariables"] = [ v.get("name") for v in models[0]["inputVariables"] ] request["outputVariables"] = [ v.get("name") for v in models[0]["outputVariables"] ] else: request["inputVariables"] = [ v.get("name") for v in project.get("variables", []) if v.get("role") == "input" ] request["outputVariables"] = [ v.get("name") for v in project.get("variables", []) if v.get("role") == "output" ] return cls.post( "/performanceTasks", json=request, headers={ "Content-Type": "application/vnd.sas.models.performance.task+json" }, )
[docs] @classmethod def execute_performance_definition(cls, definition): """Launches a job to run a performance definition. Parameters ---------- definition : str or dict The id or dictionary representation of a performance definition. Returns ------- RestObj The executing job """ definition = cls.get_performance_definition(definition) return cls.post("/performanceTasks/%s" % definition.id)
[docs] @classmethod @experimental def list_model_workflow_definition(cls): """List all enabled Workflow Processes to execute on Model Project. Returns ------- list of RestObj The list of workflows """ from .workflow import Workflow return Workflow.list_enabled_definitions()
[docs] @classmethod @experimental def list_model_workflow_prompt(cls, workflowName): """List prompt Workflow Processes Definitions. Parameters ---------- workflowName : str Name or ID of an enabled workflow to retrieve inputs Returns ------- list The list of prompts for specific workflow """ from .workflow import Workflow return Workflow.list_workflow_prompt(workflowName)
[docs] @classmethod @experimental def list_model_workflow_executed(cls, projectName): """List prompt Workflow Processes Definitions. Parameters ---------- projectName : str Name of the Project list executed workflow Returns ------- RestObj List of workflows associated to project """ from .model_repository import ModelRepository mr = ModelRepository() project = mr.get_project(projectName) return cls.get( "/workflowProcesses?filter=eq(associations.solutionObjectId,%22" + project["id"] + "%22)" )
[docs] @classmethod @experimental def execute_model_workflow_definition( cls, project_name, workflow_name, prompts=None ): """Runs specific Workflow Processes Definitions. Parameters ---------- project_name : str Name of the Project that will execute workflow workflow_name : str Name or ID of an enabled workflow to execute prompts : dict, optional Input values to provide for the initial workflow prompts. Should be specified as name:value pairs. Returns ------- RestObj The executing workflow .. versionchanged:: 1.8.2 Renamed the `input` parameter to `prompts`. """ from .model_repository import ModelRepository from .workflow import Workflow mr = ModelRepository() project = mr.get_project(project_name) workflow = Workflow.run_workflow_definition(workflow_name, prompts=prompts) # Associate running workflow to model project. # NOTE: workflow has to be running data = { "processName": workflow["name"], "processId": workflow["id"], "objectType": "MM_Project", "solutionObjectName": project_name, "solutionObjectId": project["id"], "solutionObjectUri": "/modelRepository/projects/" + project["id"], "solutionObjectMediaType": "application/vnd.sas.models.project+json", } # Note: you can get a HTTP Error 404: # {"errorCode":74052,"message":"The workflow process for id <> cannot be found. # Associations can only be made to running processes.","details":["correlator: # e62c5562-2b11-45db-bcb7-933200cb0f0a","traceId: 3118c0fb1eb9702d","path: # /modelManagement/workflowAssociations"],"links":[],"version":2,"httpStatusCode":404} # Which is fine and expected like the Visual Experience. return cls.post( "/workflowAssociations", json=data, headers={ "Content-Type": "application/vnd.sas.workflow.object.association+json" }, )
[docs] @classmethod def create_custom_kpi( cls, model, project, timeLabel, kpiName, kpiValue, timeSK=None ): """Post a user supplied custom KPI to a SAS Model Manager project's MM_STD_KPI table. A custom KPI consists of the time label, KPI name, KPI value, and optionally the timeSK. Additionally, the model and project associated with the custom KPI are required. Multiple custom KPIs can be uploaded at once by passing lists in for the four arguments mentioned above. Parameters ---------- model : str or dict The name or id of the model, or a dictionary representation of the model. project : str or dict The name or id of the project, or a dictionary representation of the project. timeLabel : str or list Label associated with the dataset used within the performance definition. kpiName : str or list Name of the custom KPI. kpiValue : int or float or list Value of the custom KPI. timeSK : int or list, by default None Indicator for the MM_STD_KPI table to denote performance task order. """ from .model_repository import ModelRepository mr = ModelRepository() # Step through options to determine project UUID if cls.is_uuid(project): projectId = project elif isinstance(project, dict) and "id" in project: projectId = project["id"] else: project = mr.get_project(project) projectId = project["id"] # Step through options to determine model UUID if cls.is_uuid(model): modelId = model elif isinstance(model, dict) and "id" in model: modelId = model["id"] else: model = mr.list_models( filter=f"and(eq('projectId','{projectId}'), eq('name','{model}'))" ) modelId = model[0]["id"] model = mr.get_model(modelId) # If no timeSK is provided, create a list of 0's for the API call if not timeSK: timeSK = [0] * len(timeLabel) # Create a list of dicts mapped to each custom KPI value customKPI = [ {"TimeLabel": label, "KPI": name, "Value": str(value), "TimeSK": SK} for label, name, value, SK in zip(timeLabel, kpiName, kpiValue, timeSK) ] headers = {"Accept": "application/vnd.sas.collection+json"} 
requestData = {"ProjectID": projectId, "ModelID": modelId, "KPIs": customKPI} # Include a terminal output, since it can take up to 60 seconds to POST the API print("Uploading custom kpis to SAS Viya...") return cls.post( "/projects/{}/kpis".format(projectId), headers=headers, data=json.dumps(requestData), )