Source code for sasctl.pzmm.write_score_code

# Copyright (c) 2020, SAS Institute Inc., Cary, NC, USA.  All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import re
import textwrap
from pathlib import Path
from typing import Any, Callable, Generator, List, Optional, Tuple, Union
from warnings import warn

import pandas as pd
from pandas import DataFrame

from .._services.model_repository import ModelRepository as mr
from ..core import RestObj, current_session

MAS_CODE_NAME = "dmcas_packagescorecode.sas"
CAS_CODE_NAME = "dmcas_epscorecode.sas"
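# File names for the DS2 wrapper score code generated for SAS Viya 3.5 models:
# MAS_CODE_NAME is used for SAS Micro Analytic Service scoring or publishing, and
# CAS_CODE_NAME is used for CAS scoring or publishing.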


class ScoreCode:
    def __init__(self):
        self.score_code = ""
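
    # The score_code attribute accumulates the generated Python score code as a
    # single string; the helper methods below append their sections to it.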

    def write_score_code(
        self,
        model_prefix: str,
        input_data: Union[DataFrame, List[dict]],
        predict_method: Union[Callable[..., List], List[Any]],
        target_variable: Optional[str] = None,
        target_values: Optional[List] = None,
        score_metrics: Optional[List[str]] = None,
        predict_threshold: Optional[float] = None,
        model: Union[str, dict, RestObj, None] = None,
        pickle_type: str = "pickle",
        missing_values: Union[bool, list, DataFrame] = False,
        score_cas: Optional[bool] = True,
        score_code_path: Union[Path, str, None] = None,
        target_index: Optional[int] = None,
        preprocess_function: Optional[Callable[[DataFrame], DataFrame]] = None,
        **kwargs,
    ) -> Union[dict, None]:
        """
        Generate Python score code based on the training data used to create the
        model object.

        If a score_code_path argument is included, then a Python file is written to
        disk and can be included in the zip archive that is imported or registered
        into the common model repository. If no path is provided, then a dictionary
        is returned with the relevant score code files as strings.

        The following files are generated by this function if score_code_path is
        provided:

        - '\*_score.py'
            - The Python score code file for the model.
        - 'dmcas_epscorecode.sas' (for SAS Viya 3.5 models)
            - Python score code wrapped in DS2 and prepared for CAS scoring or
              publishing.
        - 'dmcas_packagescorecode.sas' (for SAS Viya 3.5 models)
            - Python score code wrapped in DS2 and prepared for SAS Micro Analytic
              Service scoring or publishing.

        The function determines the type of model based on the following arguments:
        score_metrics, target_values, and predict_threshold.

        As an example, consider the popular iris dataset, in which the input dataset
        contains a number of flower features and their numerical values.

        For a binary classification model, where the model determines whether or not
        a flower is the `setosa` species, the following can be passed:

        - score_metrics = ["Setosa"] or ["Setosa", "Setosa_Proba"]
        - target_values = ["1", "0"]
        - predict_threshold = 0.4

        For a multi-classification model, where the model determines which of three
        species a flower is, the following can be passed:

        - score_metrics = ["Species"] or ["Species", "Setosa_Proba",
          "Versicolor_Proba", "Virginica_Proba"]
        - target_values = ["Setosa", "Versicolor", "Virginica"]
        - predict_threshold = None

        Disclaimer: The score code that is generated is designed to be a working
        template for any Python model, but it is not guaranteed to work out of the
        box for scoring, publishing, or validating the model.

        Parameters
        ----------
        model_prefix : str
            The variable for the model name that is used when naming model files.
            (For example: hmeqClassTree + [Score.py || .pickle]).
        input_data : pandas.DataFrame or list of dict
            The :class:`pandas.DataFrame` object contains the training data and
            includes only the predictor columns. The write_score_code function
            currently supports int(64), float(64), and string data types for
            scoring. Providing a list of dict objects signals that the model files
            are being created from an MLFlow model.
        predict_method : Callable or list of Any
            The Python function used for model predictions and the expected output
            types. The expected output types can be passed as example values or as
            the value types. For example, if the model is a Scikit-Learn
            DecisionTreeClassifier, then pass either of the following:

            * [sklearn.tree.DecisionTreeClassifier.predict, ["A"]]
            * [sklearn.tree.DecisionTreeClassifier.predict_proba, [0.4, float]]

        target_variable : str, optional
            Target variable to be predicted by the model. The default value is None.
        target_values : list of str, optional
            A list of target values for the target variable. The default value is
            None.
        score_metrics : list of str, optional
            The scoring metrics for the model. For classification models, it is
            assumed that the first value in the list represents the classification
            output. This function supports single- and multi-classification models
            as well as prediction models. The default value is None.
        predict_threshold : float, optional
            The prediction threshold for normalized probability score_metrics.
            Values are expected to be between 0 and 1. The default value is None.
        model : str, dict, or RestObj, optional
            The name or id of the model, or a dictionary representation of the
            model. The default value is None and is only necessary for models that
            will be hosted on SAS Viya 3.5.
        pickle_type : str, optional
            Indicator for the package used to serialize the model file to be
            uploaded to SAS Model Manager. The default value is `pickle`.
        missing_values : bool, list, or dict, optional
            Sets whether data handled by the score code will impute for missing
            values. If set to True, then the function determines the imputed values
            based on the input_data argument. In order to set the imputation values,
            pass a dict with variable and value key-value pairs or a list of values
            in the same variable order as the input_data argument. The default value
            is False.
        score_cas : bool, optional
            Sets whether models registered to SAS Viya 3.5 should be able to be
            scored and validated through both CAS and SAS Micro Analytic Service. If
            set to False, then the model will only be able to be scored and
            validated through SAS Micro Analytic Service. The default value is True.
        score_code_path : str or pathlib.Path, optional
            Path for output score code file(s) to be generated. If no value is
            supplied, a dict is returned instead. The default value is None.
        target_index : int, optional
            Sets the index of success for a binary model. If target_values are
            given, this index should match the index of the target outcome in
            target_values. If target_values are not given, this index should
            indicate whether the target probability variable is the first or second
            variable returned by the model. The default value is 1.
        preprocess_function : Callable[[DataFrame], DataFrame], optional
            A function that preprocesses the input data before the model's predict
            method is called. The default value is None.
        **kwargs
            Other keyword arguments are passed to one of the following functions:

            * sasctl.pzmm.ScoreCode._write_imports(pickle_type, mojo_model=None,
              binary_h2o_model=None, binary_string=None)
            * sasctl.pzmm.ScoreCode._viya35_model_load(model_id, pickle_type,
              model_file_name, mojo_model=None, binary_h2o_model=None)
            * sasctl.pzmm.ScoreCode._viya4_model_load(pickle_type, model_file_name,
              mojo_model=None, binary_h2o_model=None)
            * sasctl.pzmm.ScoreCode._predict_method(predict_method, input_var_list,
              dtype_list=None, statsmodels_model=None)
            * sasctl.pzmm.ScoreCode._predictions_to_metrics(output_variables,
              target_values=None, predict_threshold=None, h2o_model=None)
        """
        # Extract the variable names and types from the input data
        input_var_list, input_dtypes_list = self._input_var_lists(input_data)

        model_id = self._check_viya_version(model)
        sanitized_model_prefix = self.sanitize_model_prefix(model_prefix)

        # Set the model_file_name based on kwargs input
        if "model_file_name" in kwargs and "binary_string" in kwargs:
            raise ValueError(
                "Please specify either the binary_string or the model_file_name "
                "argument. This function does not support a binary string model and a "
                "serialized model file within the same model."
            )
        elif "model_file_name" in kwargs:
            model_file_name = kwargs["model_file_name"]
            binary_string = None
        elif "binary_string" in kwargs:
            model_file_name = None
            binary_string = kwargs["binary_string"]
        else:
            raise ValueError(
                "Either the binary_string or the model_file_name argument needs to be"
                " specified in order to generate the score code."
            )

        # Add the core imports to the score code with the specified model serializer
        self._write_imports(
            pickle_type,
            mojo_model="mojo_model" in kwargs,
            binary_h2o_model="binary_h2o_model" in kwargs,
            tf_model="tf_keras_model" in kwargs or "tf_core_model" in kwargs,
            binary_string=binary_string,
        )

        # Generate model loading code for SAS Viya 3.5 models without binary strings
        if model_id and not binary_string:
            model_load = self._viya35_model_load(
                model_id,
                model_file_name,
                pickle_type=pickle_type,
                mojo_model="mojo_model" in kwargs,
                binary_h2o_model="binary_h2o_model" in kwargs,
            )
        # As above, but for SAS Viya 4 models
        elif not binary_string:
            model_load = self._viya4_model_load(
                model_file_name,
                pickle_type=pickle_type,
                mojo_model="mojo_model" in kwargs,
                binary_h2o_model="binary_h2o_model" in kwargs,
                tf_keras_model="tf_keras_model" in kwargs,
                tf_core_model="tf_core_model" in kwargs,
            )
        else:
            model_load = None

        # Define the score function using the variables found in input_data
        self.score_code += f"def score({', '.join(input_var_list)}):\n"
        """
        def score(var1, var2, var3, var4):
        """

        if not score_metrics:
            score_metrics = self._determine_score_metrics(
                predict_method[1], target_variable, target_values
            )
        # Set the output variables in the line below from score_metrics
        self.score_code += f"{'':4}\"Output: {', '.join(score_metrics)}\"\n\n"
        """
        "Output: classification_variable, prediction_variable"
        """

        # Run a try/except block to catch errors for model loading (skip binary string)
        if model_load:
            self.score_code += (
                f"{'':4}try:\n{'':8}global model\n{'':4}"
                f"except NameError:\n{model_load}\n"
            )
            """
            try:
                global model
            except NameError:
                with open(settings.pickle_path + "model.pickle", "rb") as pickle_file:
                    model = pickle.load(pickle_file)
            """

        # Create the appropriate style of input array and write out the predict method
        if any(x in ["mojo_model", "binary_h2o_model"] for x in kwargs):
            self._predict_method(
                predict_method[0],
                input_var_list,
                missing_values=missing_values,
                dtype_list=input_dtypes_list,
                preprocess_function=preprocess_function,
            )
            self._predictions_to_metrics(
                score_metrics,
                predict_method[1],
                target_values=target_values,
                predict_threshold=predict_threshold,
                target_index=target_index,
                h2o_model=True,
            )
        else:
            self._predict_method(
                predict_method[0],
                input_var_list,
                missing_values=missing_values,
                statsmodels_model="statsmodels_model" in kwargs,
                tf_model="tf_keras_model" in kwargs or "tf_core_model" in kwargs,
                preprocess_function=preprocess_function,
            )
            # Include check for numpy values and a conversion operation as needed
            self.score_code += (
                f"\n{'':4}# Check for numpy values and convert to a CAS readable "
                f"representation\n"
                f"{'':4}if isinstance(prediction, np.ndarray):\n"
                f"{'':8}prediction = prediction.tolist()\n\n"
            )
            """
            # Check for numpy values and convert to a CAS readable representation
            if isinstance(prediction, np.ndarray):
                prediction = prediction.tolist()
            """
            self._predictions_to_metrics(
                score_metrics,
                predict_method[1],
                target_values=target_values,
                predict_threshold=predict_threshold,
                target_index=target_index,
            )

        if missing_values:
            self._impute_missing_values(input_data, missing_values)

        if preprocess_function:
            self._add_preprocess_code(preprocess_function)

        # SAS Viya 3.5 model
        if model_id:
            mas_code, cas_code = self._viya35_score_code_import(
                model_prefix, model_id, score_cas
            )

        if score_code_path:
            py_code_path = Path(score_code_path) / f"score_{sanitized_model_prefix}.py"
            with open(py_code_path, "w") as py_file:
                py_file.write(self.score_code)
            if model_id and score_cas:
                with open(Path(score_code_path) / MAS_CODE_NAME, "w") as sas_file:
                    # noinspection PyUnboundLocalVariable
                    sas_file.write(mas_code)
                with open(Path(score_code_path) / CAS_CODE_NAME, "w") as sas_file:
                    # noinspection PyUnboundLocalVariable
                    sas_file.write(cas_code)
        else:
            output_dict = {f"score_{sanitized_model_prefix}.py": self.score_code}
            if model_id and score_cas:
                # noinspection PyUnboundLocalVariable
                output_dict[MAS_CODE_NAME] = mas_code
                # noinspection PyUnboundLocalVariable
                output_dict[CAS_CODE_NAME] = cas_code
            return output_dict
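
    # Example usage (an illustrative sketch, not part of this module): "X" is a
    # pandas DataFrame of predictor columns and "tree_model" is a fitted
    # sklearn.tree.DecisionTreeClassifier; both names are assumptions made for
    # the example.
    #
    #     sc = ScoreCode()
    #     score_files = sc.write_score_code(
    #         model_prefix="hmeqClassTree",
    #         input_data=X,
    #         predict_method=[tree_model.predict_proba, [0.4, float]],
    #         target_values=["1", "0"],
    #         predict_threshold=0.5,
    #         model_file_name="hmeqClassTree.pickle",
    #     )
    #     # Without score_code_path, a dict mapping file names (for example,
    #     # "score_hmeqClassTree.py") to score code strings is returned.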

    @staticmethod
    def upload_and_copy_score_resources(
        model: Union[str, dict, RestObj], files: List[Any]
    ) -> RestObj:
        """
        Upload score resources to SAS Model Manager and copy them to the Compute
        server.

        Parameters
        ----------
        model : str, dict, or RestObj
            The name or id of the model, or a dictionary representation of the
            model.
        files : list of Any
            The list of score resource files to upload.

        Returns
        -------
        RestObj
            API response to the call to copy resources to the Compute server.
        """
        for file in files:
            mr.add_model_content(model, **file)
        return mr.copy_python_resources(model)
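
    # Illustrative sketch of calling this helper: each dict in "files" is unpacked
    # as keyword arguments to mr.add_model_content, so its keys must match that
    # method's parameters. The "file"/"name" keys and the score_code_str variable
    # shown below are assumptions made for the example, not part of this module.
    #
    #     ScoreCode.upload_and_copy_score_resources(
    #         model="hmeqClassTree",
    #         files=[{"file": score_code_str, "name": "score_hmeqClassTree.py"}],
    #     )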
@staticmethod def _get_model_id(model: Union[str, dict, RestObj]) -> str: """ Get the model uuid from SAS Model Manager. Parameters ---------- model : str, dict, or RestObj The name or id of the model, or a dictionary representation of the model. Returns ------- model_id : str UUID representation of the model from SAS Model Manager. """ if not model: raise ValueError( "No model identification was provided. Python score code" " generation for SAS Viya 3.5 requires the model's UUID." ) else: model_response = mr.get_model(model) try: model_id = model_response["id"] except TypeError: raise ValueError( "No model could be found using the model argument provided." ) return model_id @staticmethod def _check_for_invalid_variable_names(var_list: List[str]) -> None: """ Check for invalid variable names in the input dataset. Input data predictors must be valid Python variable names in order for the score code to be executed. Parameters ---------- var_list : list of str A list of strings pulled from the input dataset. Raises ------ SyntaxError If an invalid variable name is supplied. """ invalid_variables = [] for name in var_list: if not str(name).isidentifier(): invalid_variables.append(str(name)) if len(invalid_variables) > 0: raise SyntaxError( f"The following are not valid variable names: " f"{', '.join(invalid_variables)}. Please confirm that all variable " f"names can be used as Python variables. " f"E.g. `str(name).isidentifier() == True`." ) def _write_imports( self, pickle_type: Optional[str] = None, mojo_model: Optional[bool] = False, binary_h2o_model: Optional[bool] = False, tf_model: Optional[bool] = False, binary_string: Optional[str] = None, ) -> None: """ Write the import section of the Python score code. The session connection to SAS Viya is utilized to determine if the settings package used solely in SAS Viya 4 is needed. Parameters ---------- pickle_type : str, optional Indicator for the package used to serialize the model file to be uploaded to SAS Model Manager. The default value is `pickle`. mojo_model : bool, optional Flag to indicate that the model is a H2O.ai MOJO model. The default value is None. binary_h2o_model : bool, optional Flag to indicate that the model is a H2O.ai binary model. The default value is None. tf_model : bool, optional Flag to indicate that the model is a tensorflow model. The default value is None. binary_string : str, optional A binary representation of the Python model object. The default value is None. """ pickle_type = pickle_type if pickle_type else "pickle" self.score_code += ( f"import math\nimport {pickle_type}\nimport pandas as pd\n" "import numpy as np\nfrom pathlib import Path\n\n" ) """ import math import pickle import pandas as pd import numpy as np from pathlib import Path """ try: if current_session().version_info() != 3.5: self.score_code += "import settings\n\n" """ import settings """ except AttributeError: warn( "No current session connection was found to a SAS Viya server. Score " "code will be written under the assumption that the target server is " "SAS Viya 4." 
) self.score_code += "import settings\n\n" if mojo_model or binary_h2o_model: self.score_code += "import h2o\n\nh2o.init()\n\n" """ import h2o h2o.init() """ elif tf_model: self.score_code += "import tensorflow as tf\n\n" """ import tensorflow as tf """ elif binary_string: self.score_code += ( f'import codecs\n\nbinary_string = "{binary_string}"' f"\nmodel = {pickle_type}.loads(codecs.decode(binary_string" '.encode(), "base64"))\n\n' ) """ import codecs binary_string = "<binary string>" model = pickle.load(codecs.decode(binary_string.encode(), "base64")) """ def _viya35_model_load( self, model_id: str, model_file_name: str, pickle_type: Optional[str] = None, mojo_model: Optional[bool] = False, binary_h2o_model: Optional[bool] = False, ) -> str: """ Write the model load section of the score code assuming the model is being uploaded to SAS Viya 3.5. Parameters ---------- model_id : str UUID representation of the model from SAS Model Manager. model_file_name : str Name of the model file that contains the model. pickle_type : str, optional Indicator for the package used to serialize the model file to be uploaded to SAS Model Manager. The default value is `pickle`. mojo_model : bool, optional Flag to indicate that the model is a H2O.ai MOJO model. The default value is None. binary_h2o_model : bool, optional Flag to indicate that the model is a H2O.ai binary model. The default value is None. Returns ------- str Preformatted string for the next section of score code. """ pickle_type = pickle_type if pickle_type else "pickle" if mojo_model: self.score_code += ( f"model = h2o.import_mojo(str(Path(" f'"/models/resources/viya/{model_id}/{model_file_name}")))\n\n' ) """ model = h2o.import_mojo(str(Path("/models/resources/viya/<UUID>/model.mojo"))) """ return ( f"{'':8}model = h2o.import_mojo(str(Path(" f'"/models/resources/viya/{model_id}/{model_file_name}")))' ) elif binary_h2o_model: self.score_code += ( f'model = h2o.load(str(Path("/models/resources/viya/' f'{model_id}/{model_file_name}")))\n\n' ) """ model = h2o.load(str(Path("/models/resources/viya/<UUID>/model.h2o"))) """ return ( f"{'':8}model = h2o.load(str(Path(\"/models/resources/viya/" f'{model_id}/{model_file_name}")))' ) else: self.score_code += ( f'model_path = Path("/models/resources/viya/{model_id}' f'")\nwith open(model_path / "{model_file_name}", ' f"\"rb\") as pickle_model:\n{'':4}model = {pickle_type}" ".load(pickle_model)\n\n" ) """ model_path = Path("/models/resources/viya/<UUID>") with open(model_path / "model.pickle", "rb") as pickle_model: model = pickle.load(pickle_model) """ return ( f"{'':8}model_path = Path(\"/models/resources/viya/{model_id}" f"\")\n{'':8}with open(model_path / \"{model_file_name}\", " f"\"rb\") as pickle_model:\n{'':12}model = {pickle_type}" ".load(pickle_model)" ) def _viya4_model_load( self, model_file_name: str, pickle_type: Optional[str] = None, mojo_model: Optional[bool] = False, binary_h2o_model: Optional[bool] = False, tf_keras_model: Optional[bool] = False, tf_core_model: Optional[bool] = False, ) -> str: """ Write the model load section of the score code assuming the model is being uploaded to SAS Viya 4. Parameters ---------- model_file_name : string Name of the model file that contains the model. pickle_type : string, optional Indicator for the package used to serialize the model file to be uploaded to SAS Model Manager. The default value is `pickle`. mojo_model : boolean, optional Flag to indicate that the model is a H2O.ai MOJO model. The default value is None. 
binary_h2o_model : boolean, optional Flag to indicate that the model is a H2O.ai binary model. The default value is None. tf_keras_model : boolean, optional Flag to indicate that the model is a tensorflow keras model. The default value is False. tf_core_model : boolean, optional Flag to indicate that the model is a tensorflow core model. The default value is False. """ pickle_type = pickle_type if pickle_type else "pickle" if mojo_model: self.score_code += ( f"model = h2o.import_mojo(str(Path(settings.pickle_path" f') / "{model_file_name}"))\n\n' ) """ model = h2o.import_mojo(str(Path(settings.pickle_path) / "model.mojo")) """ return ( f"{'':8}model = h2o.import_mojo(str(Path(settings.pickle_path) / " f'"{model_file_name}"))\n\n' ) elif binary_h2o_model: self.score_code += ( f"model = h2o.load(str(Path(settings.pickle_path) / " f"{model_file_name}))\n\n" ) """ model = h2o.load(str(Path(settings.pickle_path) / "model.h2o")) """ return ( f"{'':8}model = h2o.load(str(Path(settings.pickle_path) / " f"{model_file_name}))\n\n" ) elif tf_keras_model: self.score_code += ( f"model = tf.keras.models.load_model(Path(settings.pickle_path) / " f"\"{str(Path(model_file_name).with_suffix('.h5'))}\", " f"safe_mode=True)\n\n" ) """ model = tf.keras.models.load_model(Path(settings.pickle_path) / "model.h5", safe_mode=True) """ return ( f"{'':8}model = tf.keras.models.load_model(Path(settings.pickle_path) " f"/ \"{str(Path(model_file_name).with_suffix('.h5'))}\", " f"safe_mode=True)\n" ) else: self.score_code += ( f"with open(Path(settings.pickle_path) / " f'"{model_file_name}", "rb") as pickle_model:\n' f"{'':4}model = {pickle_type}.load(pickle_model)\n\n" ) """ with open(Path(settings.pickle_path) / "model.pickle", "rb") as pickle_model: model = pickleload(pickle_model) """ return ( f"{'':8}with open(Path(settings.pickle_path) / " f'"{model_file_name}", "rb") as pickle_model:\n' f"{'':12}model = {pickle_type}.load(pickle_model)\n\n" ) def _impute_missing_values( self, data: DataFrame, missing_values: Union[bool, list, dict] ) -> None: """ Write the missing value imputation function of the score code. This section of the score code is optional and is in a separate function at the bottom of the generated score code. Parameters ---------- data : pandas.DataFrame Input dataset for model training or predictions. 
missing_values : bool, list, or dict """ self.score_code += "\n\ndef impute_missing_values(data):\n" """ def impute_missing_values(data): """ if isinstance(missing_values, bool): numeric_columns = [ col for col in data.columns if pd.api.types.is_numeric_dtype(data[col]) ] character_columns = data.columns.difference(numeric_columns).tolist() binary_columns = [] for col in data.columns: unique_values = data[col].dropna().unique() if len(unique_values) == 2 and all( value in [0, 1] for value in unique_values ): binary_columns.append(col) numeric_columns = list(set(numeric_columns) - set(binary_columns)) character_columns = list(set(character_columns) - set(binary_columns)) impute_values = {} for col in data[numeric_columns]: impute_values[col] = data[col].mean() for col in data[character_columns]: impute_values[col] = "" for col in data[binary_columns]: impute_values[col] = data[col].mode().iloc[0] elif isinstance(missing_values, list): impute_values = {} for col, imp_val in zip(data.columns.tolist(), missing_values): impute_values[col] = imp_val else: impute_values = missing_values self.score_code += f"{'':4}impute_values = \\\n" + self._wrap_indent_string( impute_values, 8 ) """ impute_values = \\\n + {"var1": 0, "var2": "", "var3": 125.3} """ self.score_code += f"\n{'':4}return data.replace(' .', np.nan).fillna(impute_values).apply(pd.to_numeric, errors='ignore')\n" """ return data.replace(' .', np.nan).fillna(impute_values).apply(pd.to_numeric, errors='ignore') """ # TODO: Needs unit test @staticmethod def _wrap_indent_string(text, indent=0): """ Use the textwrap package to wrap and indent strings longer than 88 characters in width. The indent value is subtracted from 88 to determine the correct length. Parameters ---------- text : any value accepted by builtins.str() String text to be wrapped and indented. indent : int, optional Indent length for the wrapped text. Default value is 0. Returns ------- str Wrapped and indented string. """ wrapped_lines = textwrap.fill(str(text), width=88 - indent).split("\n") if indent > 0: return "\n".join(f"{'':{indent}}" + line for line in wrapped_lines) else: return "\n".join(line for line in wrapped_lines) def _predict_method( self, method: Callable[..., List], var_list: List[str], dtype_list: Optional[List[str]] = None, missing_values: Optional[Any] = None, statsmodels_model: Optional[bool] = False, tf_model: Optional[bool] = False, preprocess_function: Optional[Callable[[DataFrame], DataFrame]] = None, ) -> None: """ Write the model prediction section of the score code. Parameters ---------- method : function -> list The Python function used for model predictions. var_list : list of str List of variable names. dtype_list : list of str, optional List of variable data types. The default value is None. missing_values : any, optional Flag for indicating if missing values should be imputed. The default value is None. statsmodels_model : bool, optional Flag to indicate that the model is a statsmodels model. The default value is False. tf_model : bool, optional Flag to indicate that the model is a tensorflow model. The default value is False. 
""" self.score_code += ( f"{'':4}index=None\n" f"{'':4}if not isinstance({var_list[0]}, pd.Series):\n" f"{'':8}index=[0]\n" ) """ index=None if not isinstance(var1, pd.Series): index=[0] """ # H2O models if dtype_list: column_types = "{" for var, dtype in zip(var_list, dtype_list): if any(x in dtype for x in ["int", "float"]): col_type = "numeric" else: col_type = "string" column_types += f'"{var}": "{col_type}", ' column_types = column_types.rstrip(", ") column_types += "}" input_dict = [f'"{var}": {var}' for var in var_list] self.score_code += f"{'':4}input_array = pd.DataFrame(\n" input_frame = f'{{{", ".join(input_dict)}}}, index=index' self.score_code += self._wrap_indent_string(input_frame, 8) self.score_code += f"\n{'':4})\n" if missing_values: self.score_code += ( f"{'':4}input_array = impute_missing_values(input_array)\n" ) if preprocess_function: self.score_code += ( f"{'':4}input_array = {preprocess_function.__name__}(input_array)\n" ) self.score_code += ( f"{'':4}column_types = {column_types}\n" f"{'':4}h2o_array = h2o.H2OFrame(input_array, " f"column_types=column_types)\n{'':4}prediction = " f"model.{method.__name__}(h2o_array)\n{'':4}prediction" f" = h2o.as_list(prediction, use_pandas=prediction.shape[0]>1)\n" ) """ input_array = pd.DataFrame( {"var1": var1, "var2": var2, "var3": var3} ) input_array = impute_missing_values(input_array) column_types = {"var1": "string", "var2": "numeric", "var3": "numeric"} h2o_array = h2o.H2OFrame(input_array, column_types=column_types) prediction = model.predict(h2o_array) prediction = h2o.as_list(prediction, use_pandas=False) """ # Statsmodels models elif statsmodels_model: var_list.insert(0, "const") input_dict = [f'"{var}": {var}' for var in var_list] self.score_code += ( f"{'':4}if not isinstance(\"{var_list[0]}\", pd.Series):\n" f"{'':8}const = 1\n" f"{'':4}else:\n" f"{'':8}const = pd.Series([1 for x in len({var_list[0]})])" ) """ if not isinstance("var1", pd.Series): const = 1 else: const = pd.Series([1 for x in len(var1)]) """ self.score_code += f"{'':4}input_array = pd.DataFrame(\n" input_frame = f'{{{", ".join(input_dict)}}}, index=index' self.score_code += self._wrap_indent_string(input_frame, 8) self.score_code += f"\n{'':4})\n" if missing_values: self.score_code += ( f"{'':4}input_array = impute_missing_values(input_array)\n" ) if preprocess_function: self.score_code += ( f"{'':4}input_array = {preprocess_function.__name__}(input_array)\n" ) self.score_code += ( f"{'':4}prediction = model.{method.__name__}(input_array)\n" ) """ input_array = pd.DataFrame( {"const": const, "var1": var1, "var2": var2, "var3": var3} ) input_array = impute_missing_values(input_array) prediction = model.predict(input_array) """ elif tf_model: input_dict = [f'"{var}": {var}' for var in var_list] self.score_code += f"{'':4}input_array = pd.DataFrame(\n" input_frame = f'{{{", ".join(input_dict)}}}, index=index' self.score_code += self._wrap_indent_string(input_frame, 8) self.score_code += f"\n{'':4})\n" if missing_values: self.score_code += ( f"{'':4}input_array = impute_missing_values(input_array)\n" ) if preprocess_function: self.score_code += ( f"{'':4}input_array = {preprocess_function.__name__}(input_array)\n" ) self.score_code += ( f"{'':4}prediction = model.{method.__name__}(input_array)\n\n" f"{'':4} # Check if model returns logits or probabilities\n" f"{'':4}if not math.isclose(sum(predictions[0]), 1, rel_tol=.01):\n" f"{'':8}predictions = [tf.nn.softmax(p).numpy().tolist() for p in " f"predictions]\n{'':4}else:\n" f"{'':8}predictions = 
[p.tolist() for p in predictions]\n" ) """ input_array = pd.DataFrame( {"var1": var1, "var2": var2, "var3": var3} ) input_array = impute_missing_values(input_array) prediction = model.predict(input_array) # Check if model returns logits or probabilities if not math.isclose(sum(predictions[0]), 1, rel_tol=.01): predictions = [tf.nn.softmax(p).numpy().tolist() for p in predictions] else: predictions = [p.tolist() for p in predictions] """ else: input_dict = [f'"{var}": {var}' for var in var_list] self.score_code += f"{'':4}input_array = pd.DataFrame(\n" input_frame = f'{{{", ".join(input_dict)}}}, index=index' self.score_code += self._wrap_indent_string(input_frame, 8) self.score_code += f"\n{'':4})\n" if missing_values: self.score_code += ( f"{'':4}input_array = impute_missing_values(input_array)\n" ) if preprocess_function: self.score_code += ( f"{'':4}input_array = {preprocess_function.__name__}(input_array)\n" ) self.score_code += ( f"{'':4}prediction = model.{method.__name__}(input_array).tolist()\n" ) """ input_array = pd.DataFrame( {"var1": var1, "var2": var2, "var3": var3} ) input_array = impute_missing_values(input_array) prediction = model.predict(input_array) """ @classmethod def _determine_score_metrics( cls, predict_returns: List[Any], target_variable: Optional[str] = None, target_values: Optional[List] = None, ) -> List[str]: """ Using the types from the prediction method returns in `predict_method`, create output score metrics for the score code. If no target_variable is provided and classification outputs are expected, the function will produce a warning and use the generic "I_Classification" score metric. Parameters ---------- predict_returns : list A list of the return types of the prediction method. These can be direct types or example values. target_variable : str Target variable to be predicted by the model. The default value is None. target_values : list, optional A list of target values for the target variable. The default is None. Returns ------- list of str A list containing string values for the model's score metrics. """ # Create a list mapped to the prediction returns that signifies their type predict_returns = cls._determine_returns_type(predict_returns) # No target values signals that the model is a prediction (regression) model if not target_values: if len(predict_returns) > 1: raise ValueError( "When no target values are provided, a prediction model is assumed." " Currently, SAS Model Manager only supports prediction models with" " singular outputs, therefore the score code cannot be written for " "this model. To continue with generating the score code for this" "model, please either provide the target values or define the score" " metrics explicitly." ) # Use generic prediction variable if none provided elif not target_variable: warn( "WARNING: No score metrics or target variables were provided for a " "prediction model. Therefore the output score metric is defaulted " 'to "I_Prediction"' ) return ["I_Prediction"] elif target_variable: return [f"I_{target_variable}"] # A model with only one expected target value will always get the same answer elif len(target_values) == 1 or not isinstance(target_values, list): raise ValueError( "Please provide all possible values for the target variable, including" " a no-event value." 
) # Binary classification models elif len(target_values) == 2: if predict_returns.count(True) > 1 or predict_returns.count(False) > 2: raise ValueError( "Binary classification models should not return more than 1 " "classification value or more than 2 probability values. For " "example: [I_Class, P_1, P_0] has the maximum number of returns." ) else: gen = cls._yield_score_metrics( predict_returns, target_values, target_variable ) return [metric for metric in gen] # Multiclass classification models elif len(target_values) > 2: if predict_returns.count(True) > 1: raise ValueError( "SAS Model Manager does not currently support models with more than" " one target variable." ) elif predict_returns.count(False) not in [0, len(target_values)]: raise ValueError( "The number of target values provided does not match the number of" " returns from the prediction method that are probabilities." ) else: gen = cls._yield_score_metrics( predict_returns, target_values, target_variable ) return [metric for metric in gen] @staticmethod def _yield_score_metrics( returns: List[bool], values: list, variable: Optional[str] = None, ) -> Generator: """ For classification models without provided metrics, yield score metrics as determined by the target values, target variable, and returns from the prediction method. Parameters ---------- returns : list of bool A list of bools, such that `True` represents classification values and `False` represents probability or prediction values. variable : str Target variable to be predicted by the model. values : list A list of target values for the target variable. Yields ------ generator A generator containing metrics corresponding to the prediction returns. """ proba_count = 0 for val in returns: if val: # If True, then classification score metric if not variable: warn( "WARNING: No target variable was provided, therefore the " 'classification variable defaults to "I_Classification".' ) yield "I_" + variable if variable else "I_Classification" else: # If False, then prediction or probability score metric yield "P_" + str(values[proba_count]) proba_count += 1 @staticmethod def _determine_returns_type(outputs: List[Any]) -> List[bool]: """ Determine the return type of the prediction method. Returns a list of equal size to input argument, which contains `True` for classification values and `False` for probability or prediction values. Parameters ---------- outputs : list The list of expected outputs from the prediction method. Returns ------- returns : List of bool A list mapped to the input argument, such that `True` represents classification values and `False` represents probability or prediction values. """ def is_str(val): if isinstance(val, str) or val == str: return True elif isinstance(val, (float, int)) or val in [float, int]: return False else: return True return [is_str(val) for val in outputs] def _predictions_to_metrics( self, metrics: List[str], predict_returns: List[Any], target_values: Optional[List[str]] = None, predict_threshold: Optional[float] = None, h2o_model: Optional[bool] = False, target_index: Optional[int] = 1, ) -> None: """ Using the provided arguments, write in to the score code the method for handling the generated predictions. Errors are raised for improper combinations of metrics, target values, and predict method returns. Parameters ---------- metrics : list of str A list of strings corresponding to the outputs of the model to SAS Model Manager. predict_returns : list A list of the return types of the prediction method. 
These can be direct types or example values. target_values : list of str, optional A list of target values for the target variable. The default value is None. predict_threshold : float, optional The prediction threshold for normalized probability score_metrics. Values are expected to be between 0 and 1. The default value is None. h2o_model : bool, optional Flag to indicate that the model is an H2O.ai model. The default value is False. target_index : int, optional Sets the index of success for a binary model. If target_values are given, this index should match the index of the target outcome in target_values. If target_values are not given, this index should indicate whether the the target probability variable is the first or second variable returned by the model. The default value is 1. """ if not target_index: target_index = 1 if len(metrics) == 1 and isinstance(metrics, list): # Flatten single valued list metrics = metrics[0] # Prediction model or no-calculation classification model if not (target_values or predict_threshold): self._no_targets_no_thresholds(metrics, predict_returns, h2o_model) elif not target_values and predict_threshold: raise ValueError( "A threshold was provided to interpret the prediction results, however " "a target value was not, therefore, a valid output cannot be generated." ) # Binary classification model elif len(target_values) == 2: self._binary_target( metrics, target_values, predict_returns, predict_threshold, target_index, h2o_model, ) # Multiclass classification model elif len(target_values) > 2: self._nonbinary_targets(metrics, target_values, predict_returns, h2o_model) def _no_targets_no_thresholds( self, metrics: Union[List[str], str], returns: List[Any], h2o_model: Optional[bool] = False, ) -> None: """ Handle prediction outputs where the prediction does not expect handling by the score code. Parameters ---------- metrics : list of str or str A list of strings corresponding to the outputs of the model to SAS Model Manager. returns : list The list of expected outputs from the prediction method. h2o_model : bool, optional Flag to indicate that the model is an H2O.ai model. The default value is False. """ if ( (len(returns) != len(metrics) and not isinstance(metrics, str)) or (len(returns) != 1 and isinstance(metrics, str)) ) and not h2o_model: raise ValueError( "The number of returns from the predict function does not match the " "number of score metrics provided. Either provide target values for the" " score code to use in calculating the classification value or update " "the provided score metrics and prediction returns." 
) elif isinstance(metrics, str): # Classification (with only classification output) or prediction model if h2o_model: self.score_code += ( f"{'':4}if input_array.shape[0] == 1:\n" f"{'':8}{metrics} = prediction[1][0]\n{'':8}return {metrics}\n" f"{'':4}else:\n" f"{'':8}output_table = prediction.drop(prediction.columns[1:], axis=1)\n" f"{'':8}output_table.columns = ['{metrics}']\n" f"{'':8}return output_table" ) """ if input_array.shape[0] == 1: Classification = prediction[1][0] return Classification else: output_table = prediction.drop(prediction.columns[1:], axis=1) output_table.columns = ['Classification'] return output_table """ else: self.score_code += ( f"{'':4}if input_array.shape[0] == 1:\n" f"{'':8}{metrics} = prediction[0][0]\n{'':8}return {metrics}\n" f"{'':4}else:\n" f"{'':8}output_table = pd.DataFrame({{'{metrics}': prediction}})\n" f"{'':8}return output_table" ) """ if input_array.shape[0] == 1: Classification = prediction[0][0] return Classification else: output_table = pd.DataFrame({'Classification': prediction}) return output_table """ else: # Classification model including predictions and classification if h2o_model: self.score_code += ( f"{'':4}if input_array.shape[0] == 1:\n" f"{'':8}{metrics[0]} = prediction[1][0]\n" ) for i in range(len(metrics) - 1): self.score_code += ( f"{'':8}{metrics[i + 1]} = float(prediction[1][{i + 1}])\n" ) self.score_code += ( f"{'':8}return {', '.join(metrics)}\n" f"{'':4}else:\n" f"{'':8}prediction.columns = {metrics}\n" f"{'':8}return prediction\n" ) """ if input_array.shape[0] == 1: Classification = prediction[1][0] Proba_A = float(prediction[1][1]) Proba_B = float(prediction[1][2]) Proba_C = float(prediction[1][3]) return Classification, Proba_A, Proba_B, Proba_C else: prediction.columns = ['Classification', 'Proba_A', 'Proba_B', 'Proba_C'] return prediction """ else: self.score_code += f"{'':4}if input_array.shape[0] == 1:\n" for i in range(len(metrics)): self.score_code += f"{'':8}{metrics[i]} = prediction[0][{i}]\n" self.score_code += f"\n{'':8}return {', '.join(metrics)}\n" self.score_code += ( f"{'':4}else:\n" f"{'':8}output_table = pd.DataFrame(prediction, columns={metrics})" f"\n{'':8}return output_table\n" ) """ if input_array.shape[0] == 1: Classification = prediction[0][0] Proba_A = prediction[0][1] Proba_B = prediction[0][2] Proba_C = prediction[0][3] return Classification, Proba_A, Proba_B, Proba_C else: output_table = pd.DataFrame(prediction, columns=['Classification', 'Proba_A', 'Proba_B', 'Proba_C']) return output_table """ def _binary_target( self, metrics: Union[List[str], str], target_values: List[str], returns: List[Any], threshold: Optional[float] = None, target_index: Optional[int] = 1, h2o_model: Optional[bool] = None, ) -> None: """ Handle binary model prediction outputs. Parameters ---------- metrics : list of str or str A list of strings corresponding to the outputs of the model to SAS Model Manager. target_values : list of str A list of target values for the target variable. returns : list The list of expected outputs from the prediction method. threshold : float, optional The prediction threshold for normalized probability score_metrics. Values are expected to be between 0 and 1. The default value is None. h2o_model : bool, optional Flag to indicate that the model is an H2O.ai model. The default value is False. target_index : int, optional Sets the index of the probability value to be returned from a binary model. The default value is two for h2o models, and one otherwise. 
""" if not threshold: # Set default threshold threshold = 0.5 returns = self._determine_returns_type(returns) if len(returns) > 3: raise ValueError( f"The prediction method has {len(returns)} returns. The score code " f"generation cannot parse that many return values for a binary " f"classification model." ) elif sum(returns) >= 2: raise ValueError( "Based on the return types provided, the prediction method returns " "multiple classification values. Multilabel models are not supported." ) if isinstance(metrics, str): # For h2o models with only one metric provided, return the classification if h2o_model: self.score_code += ( f"{'':4}if input_array.shape[0] == 1:\n" f"{'':8}if prediction[1][{target_index+1}] > {threshold}:\n" f"{'':12}{metrics} = \"{target_values[target_index]}\"\n" f"{'':8}else:\n" f"{'':12}{metrics} = \"{target_values[abs(target_index-1)]}\"\n" f"{'':8}return {metrics}\n" f"{'':4}else:\n" f"{'':8}output_table = pd.DataFrame({{'{metrics}': np.where(prediction[prediction.columns[{target_index+1}]] > {threshold}, '{target_values[target_index]}', '{target_values[abs(target_index-1)]}')}})\n" f"{'':8}return output_table" ) """ if input_array.shape[0] == 1: if prediction[1][2] > 0.5: Classification = "A" else: Classification = "B" return Classification else: output_table = pd.DataFrame({'Classification': np.where(prediction[prediction.columns[2]] > .5, 'B', 'A')}) return output_table """ # One return that is the classification elif len(returns) == 1 and returns[0]: self.score_code += ( f"{'':4}if input_array.shape[0] == 1:\n" f"{'':8}return prediction[0]\n" f"{'':4}else:\n" f"{'':8}return pd.DataFrame({{'{metrics}': prediction}})" ) """ if input_array.shape[0] == 1: return prediction[0] else: return pd.DataFrame({'Classification': prediction}) """ # One return that is a probability elif len(returns) == 1 and not returns[0]: self.score_code += ( f"{'':4}if input_array.shape[0] == 1:\n" f"{'':8}if prediction[0] > {threshold}:\n" f"{'':12}{metrics} = \"{target_values[target_index]}\"\n" f"{'':8}else:\n" f"{'':12}{metrics} = \"{target_values[abs(target_index-1)]}\"\n" f"{'':8}return {metrics}\n" f"{'':4}else:\n" f"{'':8}return pd.DataFrame({{'{metrics}': ['{target_values[target_index]}' if p > {threshold} else '{target_values[abs(target_index-1)]}' for p in prediction]}})\n" ) """ if input_array.shape[0] == 1: if prediction[0] > 0.5: Classification = "A" else: Classification = "B" return Classification else: return pd.DataFrame({'Classification': ['B' if p > 0.5 else 'A' for p in prediction]}) """ # Two returns from the prediction method elif len(returns) == 2 and sum(returns) == 0: # Only probabilities returned; return classification for larger value self.score_code += ( f"{'':4}if input_array.shape[0] == 1:\n" f"{'':8}if prediction[0][{target_index}] > {threshold}:\n" f"{'':12}{metrics} = \"{target_values[target_index]}\"\n" f"{'':8}else:\n" f"{'':12}{metrics} = \"{target_values[abs(target_index-1)]}\"\n\n" f"{'':8}return {metrics}\n" f"{'':4}else:\n" f"{'':8}target_values = {target_values}\n" f"{'':8}prediction = pd.DataFrame(prediction)\n" f"{'':8}output_table = pd.DataFrame({{'{metrics}': np.where(prediction[prediction.columns[{target_index}]] > {threshold}, '{target_values[target_index]}', '{target_values[abs(target_index-1)]}')}})\n" f"{'':8}return output_table" ) """ if input_array.shape[0] == 1: if prediction[0][0] > .5: Classification = "B" else: Classification = "A" return Classification else: target_values = ['A', 'B'] output_table = pd.DataFrame({'Classification' : 
np.array(target_values)[np.argmax(prediction, axis=1)]}) return output_table """ # Classification and probability returned; return classification value elif len(returns) > 1 and sum(returns) == 1: # TODO: Either figure out how to handle threshold or add warning # Determine which return is the classification value class_index = [i for i, x in enumerate(returns) if x][0] self.score_code += ( f"{'':4}if input_array.shape[0] == 1:\n" f"{'':8}{metrics} = prediction[0][{class_index}]\n{'':8}return {metrics}\n" f"{'':4}else:\n" f"{'':8}output_table = pd.DataFrame({{'{metrics}': [p[{class_index}] for p in prediction]}})\n" f"{'':8}return output_table" ) """ if input_array.shape[0] == 1: Classification = prediction[0][1] return Classification else: output_table = pd.DataFrame({'Classification': [p[1] for p in prediction]}) return output_table """ else: self._invalid_predict_config() elif len(metrics) == 2: # H2O models with two metrics are assumed to be classification + probability if h2o_model: warn( "For H2O models, it is assumed if two metrics are provided, the " "score code should output the classification and probability for " "the target event to occur." ) self.score_code += ( f"{'':4}if input_array.shape[0] == 1:\n" f"{'':8}if prediction[1][{target_index+1}] > {threshold}:\n" f"{'':12}{metrics[0]} = '{target_values[target_index]}'\n" f"{'':8}else:\n" f"{'':12}{metrics[0]} = '{target_values[abs(target_index-1)]}'\n" f"{'':8}return {metrics[0]}, float(prediction[1][{target_index+1}])\n" f"{'':4}else:\n" f"{'':8}output_table = prediction.drop(prediction.columns[{abs(target_index-1)+1}], axis=1)\n" f"{'':8}classifications = np.where(prediction[prediction.columns[{target_index+1}]] > {threshold}, '{target_values[target_index]}', '{target_values[abs(target_index-1)]}')\n" f"{'':8}output_table.columns = {metrics}\n" f"{'':8}output_table['{metrics[0]}'] = classifications\n" f"{'':8}return output_table" ) """ if input_array.shape[0] == 1: if prediction[1][1] > 0.5: Classification = '1' else: Classification = '0' return EM_CLASSIFICATION, float(prediction[1][1]) else: output_table = prediction.drop(prediction.columns[2], axis=1) classifications = np.where(prediction[prediction.columns[1]] > 0.5, '0', '1') output_table.columns = ['EM_CLASSIFICATION', 'EM_EVENTPROBABILITY'] output_table['EM_CLASSIFICATION'] = classifications """ # Calculate the classification; return the classification and probability elif sum(returns) == 0 and len(returns) == 1: warn( "Due to the ambiguity of differentiating the classification and " "probability output metrics, it is assumed that the classification " "metric is returned first." 
) self.score_code += ( f"{'':4}if input_array.shape[0] == 1:\n" f"{'':8}if prediction[0] > {threshold}:\n" f"{'':12}{metrics[0]} = \"{target_values[target_index]}\"\n" f"{'':8}else:\n" f"{'':12}{metrics[0]} = \"{target_values[abs(target_index-1)]}\"\n\n" f"{'':8}return {metrics[0]}, prediction[0]\n" f"{'':4}else:\n" f"{'':8}classifications = ['{target_values[target_index]}' if p > {threshold} else '{target_values[abs(target_index-1)]}' for p in prediction]\n" f"{'':8}return pd.DataFrame({{'{metrics[0]}': classifications, '{metrics[1]}': prediction}})" ) """ if input_array.shape[0] == 1: if prediction[0] > 0.5: Classification = "B" else: Classification = "A" return Classification, prediction[0] else: classifications = ['B' if p > 0.5 else 'A' for p in prediction] return pd.DataFrame({'Classification': classifications, 'Probability': prediction}) """ # Calculate the classification; return the classification and probability elif sum(returns) == 0 and len(returns) == 2: warn( "Due to the ambiguity of the provided metrics and prediction return" " types, the score code assumes that a classification and the " "target event probability should be returned." ) self.score_code += ( f"{'':4}if input_array.shape[0] == 1:\n" f"{'':8}if prediction[0][{target_index}] > {threshold}:\n" f"{'':12}{metrics[0]} = \"{target_values[target_index]}\"\n" f"{'':8}else:\n" f"{'':12}{metrics[0]} = \"{target_values[abs(target_index-1)]}\"\n" f"{'':8}return {metrics[0]}, prediction[0][{target_index}]\n" f"{'':4}else:\n" f"{'':8}df = pd.DataFrame(prediction)\n" f"{'':8}proba = df[{target_index}]\n" f"{'':8}classifications = np.where(df[{target_index}] > {threshold}, '{target_values[target_index]}', '{target_values[abs(target_index-1)]}')\n" f"{'':8}return pd.DataFrame({{'{metrics[0]}': classifications, '{metrics[1]}': proba}})" ) """ if input_array.shape[0] == 1: if prediction[0][1] > .5: Classification = "B" else: Classification = "A" return Classification, prediction[0][1] else: df = pd.DataFrame(prediction) proba = df[0] classifications = np.where(df[1] > .5, 'B', 'A') return pd.DataFrame({'Classification': classifications, 'Probability': proba}) """ # TODO: Potentially add threshold # Return classification and probability value elif sum(returns) == 1 and len(returns) == 2: self.score_code += ( f"{'':4}if input_array.shape[0] == 1:\n" f"{'':8}return prediction[0][0], prediction[0][1]\n" f"{'':4}else:\n" f"{'':8}return pd.DataFrame(prediction, columns={metrics})" ) """ if input_array.shape[0] == 1: return prediction[0][0], prediction[0][1] else: return pd.DataFrame(prediction, columns=['Classification', 'Probability']) """ elif sum(returns) == 1 and len(returns) == 3: warn( "Due to the ambiguity of the provided metrics and prediction return" " types, the score code assumes that a classification and the " "target event probability should be returned." 
) # Determine which return is the classification value class_index = [i for i, x in enumerate(returns) if x][0] if class_index == 0: self.score_code += ( f"{'':4}if input_array.shape[0] == 1:\n" f"{'':8}return prediction[0][0], prediction[0][{target_index+1}]\n" f"{'':4}else:\n" f"{'':8}prediction = pd.DataFrame(prediction)\n" f"{'':8}output_table = prediction.drop(prediction.columns[{abs(target_index-1)+1}], axis=1)\n" f"{'':8}output_table.columns = {metrics}\n" f"{'':8}return output_table" ) """ if input_array.shape[0] == 1: return prediction[0][0], prediction[0][2] else: output_table = prediction.drop(prediction.columns[1], axis=1) output_table.columns = ["Classification", "Probability"] return output_table """ else: self.score_code += ( f"{'':4}if input_array.shape[0] == 1:\n" f"{'':8}return prediction[0][{class_index}], prediction[0][{target_index}]\n" f"{'':4}else:\n" f"{'':8}prediction = pd.DataFrame(prediction)\n" f"{'':8}output_table = prediction.drop(prediction.columns[{abs(target_index-1)}], axis=1)\n" f"{'':8}output_table = output_table[output_table.columns[::-1]]\n" f"{'':8}output_table.columns = {metrics}\n" f"{'':8}return output_table" ) """ if input_array.shape[0] == 1: return prediction[0][2], prediction[0][0] else: output_table = prediction.drop(prediction.columns[0], axis=1) output_table = output_table[output_table.columns[::-1]] output_table.columns = ["Classification", "Probability"] return output_table.drop('drop', axis=1) """ else: self._invalid_predict_config() elif len(metrics) == 3: if h2o_model: self.score_code += ( f"{'':4}if input_array.shape[0] == 1:\n" f"{'':8}return prediction[1][0], float(prediction[1][1]), " f"float(prediction[1][2])\n" f"{'':4}else:\n" f"{'':8}prediction.columns = {metrics}\n" f"{'':8}return prediction" ) """ if input_array.shape[0] == 1: return prediction[1][0], float(prediction[1][1]), float(prediction[1][2]) else: prediction.columns = ['Classification', 'Proba_0', 'Proba_1'] return prediction """ elif sum(returns) == 0 and len(returns) == 1: warn( "Due to the ambiguity of the provided metrics and prediction return" " types, the score code assumes the return order to be: " "[classification, probability of event, probability of no event]." 
                )
                self.score_code += (
                    f"{'':4}if input_array.shape[0] == 1:\n"
                    f"{'':8}if prediction[0] > {threshold}:\n"
                    f"{'':12}{metrics[0]} = \"{target_values[target_index]}\"\n"
                    f"{'':8}else:\n"
                    f"{'':12}{metrics[0]} = \"{target_values[abs(target_index-1)]}\"\n"
                    f"{'':8}return {metrics[0]}, prediction[0], 1 - prediction[0]\n"
                    f"{'':4}else:\n"
                    f"{'':8}classifications = ['{target_values[target_index]}' if p > {threshold} else '{target_values[abs(target_index-1)]}' for p in prediction]\n"
                    f"{'':8}output_table = pd.DataFrame({{'{metrics[0]}': classifications, '{metrics[1]}': prediction}})\n"
                    f"{'':8}output_table['{metrics[2]}'] = 1 - output_table['{metrics[1]}']\n"
                    f"{'':8}return output_table"
                )
                """
                if input_array.shape[0] == 1:
                    if prediction[0] > 0.5:
                        Classification = "B"
                    else:
                        Classification = "A"
                    return Classification, prediction[0], 1 - prediction[0]
                else:
                    classifications = ['B' if p > 0.5 else 'A' for p in prediction]
                    output_table = pd.DataFrame({'Classification': classifications, 'Proba_0': prediction})
                    output_table['Proba_1'] = 1 - output_table['Proba_0']
                    return output_table
                """
            elif sum(returns) == 0 and len(returns) == 2:
                # TODO: Make decision on whether ordering should follow given
                #  pattern or reflect input ordering
                warn(
                    "Due to the ambiguity of the provided metrics and prediction return"
                    " types, the score code assumes the return order to be: "
                    "[classification, probability of event, probability of no event] "
                    "for a single return. For batch scoring, the return order of the "
                    "probabilities will mirror their return order in the model."
                )
                self.score_code += (
                    f"{'':4}if input_array.shape[0] == 1:\n"
                    f"{'':8}if prediction[0][{target_index}] > {threshold}:\n"
                    f"{'':12}{metrics[0]} = \"{target_values[target_index]}\"\n"
                    f"{'':8}else:\n"
                    f"{'':12}{metrics[0]} = \"{target_values[abs(target_index-1)]}\"\n"
                    f"{'':8}return {metrics[0]}, prediction[0][{target_index}], prediction[0][{abs(target_index-1)}]\n"
                    f"{'':4}else:\n"
                    f"{'':8}output_table = pd.DataFrame(prediction, columns={metrics[1:]})\n"
                    f"{'':8}classifications = np.where(output_table[output_table.columns[{target_index}]] > {threshold}, '{target_values[target_index]}', '{target_values[abs(target_index-1)]}')\n"
                    f"{'':8}output_table.insert(loc=0, column='{metrics[0]}', value=classifications)\n"
                    f"{'':8}return output_table"
                )
                """
                if input_array.shape[0] == 1:
                    if prediction[0][0] > prediction[0][1]:
                        Classification = "A"
                    else:
                        Classification = "B"
                    return Classification, prediction[0][0], prediction[0][1]
                else:
                    output_table = pd.DataFrame(prediction, columns=["Proba_0","Proba_1"])
                    classifications = np.where(output_table[output_table.columns[1]] > 0.5, 'B', 'A')
                    output_table.insert(loc=0, column='Classification', value=classifications)
                    return output_table
                """
            # Find which return is the classification, then return probabilities
            elif sum(returns) == 1 and len(returns) == 2:
                # Determine which return is the classification value
                class_index = [i for i, x in enumerate(returns) if x][0]
                if class_index == 0:
                    self.score_code += (
                        f"{'':4}if input_array.shape[0] == 1:\n"
                        f"{'':8}return prediction[0][0], prediction[0][1], 1 - prediction[0][1]\n"
                        f"{'':4}else:\n"
                        f"{'':8}output_table = pd.DataFrame(prediction, columns={metrics[:2]})\n"
                        f"{'':8}output_table['{metrics[2]}'] = 1 - output_table['{metrics[1]}']\n"
                        f"{'':8}return output_table"
                    )
                    """
                    if input_array.shape[0] == 1:
                        return prediction[0][0], prediction[0][1], 1 - prediction[0][1]
                    else:
                        output_table = pd.DataFrame(prediction, columns=["Classification","Proba_0"])
                        output_table['Proba_1'] = 1 - output_table['Proba_0']
                        return output_table
                    """
                else:
                    self.score_code += (
                        f"{'':4}if input_array.shape[0] == 1:\n"
                        f"{'':8}return prediction[0][1], prediction[0][0], 1 - prediction[0][0]\n"
                        f"{'':4}else:\n"
                        f"{'':8}output_table = pd.DataFrame(prediction, columns={metrics[1::-1]})\n"
                        f"{'':8}output_table = output_table[output_table.columns[::-1]]\n"
                        f"{'':8}output_table['{metrics[2]}'] = 1 - output_table['{metrics[1]}']\n"
                        f"{'':8}return output_table"
                    )
                    """
                    if input_array.shape[0] == 1:
                        return prediction[0][1], prediction[0][0], 1 - prediction[0][0]
                    else:
                        output_table = pd.DataFrame(prediction, columns=["Proba_0","Classification"])
                        output_table = output_table[output_table.columns[::-1]]
                        output_table['Proba_1'] = 1 - output_table['Proba_0']
                        return output_table
                    """
            # Return all values from prediction method
            elif sum(returns) == 1 and len(returns) == 3:
                self.score_code += (
                    f"{'':4}if input_array.shape[0] == 1:\n"
                    f"{'':8}return prediction[0][0], prediction[0][1], prediction[0][2]\n"
                    f"{'':4}else:\n"
                    f"{'':8}return pd.DataFrame(prediction, columns={metrics})"
                )
                """
                if input_array.shape[0] == 1:
                    return prediction[0][0], prediction[0][1], prediction[0][2]
                else:
                    return pd.DataFrame(prediction, columns=['Classification', 'Proba_0', 'Proba_1'])
                """
            else:
                self._invalid_predict_config()
        else:
            raise ValueError(
                "Too many score metrics were provided for a binary model."
            )

    def _nonbinary_targets(
        self,
        metrics: Union[List[str], str],
        target_values: List[str],
        returns: List[Any],
        h2o_model: Optional[bool] = None,
    ) -> None:
        """
        Handle multiclass model prediction outputs.

        Parameters
        ----------
        metrics : list of str or str
            A list of strings corresponding to the outputs of the model to SAS Model
            Manager.
        target_values : list of str, optional
            A list of target values for the target variable.
        returns : list
            The list of expected outputs from the prediction method.
        h2o_model : bool, optional
            Flag to indicate that the model is an H2O.ai model. The default value is
            False.
        """
        returns = self._determine_returns_type(returns)
        if sum(returns) >= 2:
            raise ValueError(
                "Based on the return types provided, the prediction method returns "
                "multiple classification values. Multilabel models are not supported."
            )

        # Find the target value with the highest probability
        if isinstance(metrics, str):
            # For h2o models with only one metric provided, return the classification
            if h2o_model:
                self.score_code += (
                    f"{'':4}target_values = {target_values}\n"
                    f"{'':4}if input_array.shape[0] == 1:\n"
                    f"{'':8}{metrics} = target_values[prediction[1][1:]."
                    f"index(max(prediction[1][1:]))]\n"
                    f"{'':8}return {metrics}\n"
                    f"{'':4}else:\n"
                    f"{'':8}output_table = pd.DataFrame({{'{metrics}': np.array(target_values)[np.argmax(prediction.iloc[:, 1:].values, axis=1)]}})\n"
                    f"{'':8}return output_table"
                )
                """
                target_values = ['A', 'B', 'C']
                if input_array.shape[0] == 1:
                    Classification = target_values[prediction[1][1:].index(max(prediction[1][1:]))]
                    return Classification
                else:
                    output_table = pd.DataFrame({'Classification': np.array(target_values)[np.argmax(prediction.iloc[:, 1:].values, axis=1)]})
                    return output_table
                """
            # One return that is the classification
            elif len(returns) == 1:
                self.score_code += (
                    f"{'':4}if input_array.shape[0] == 1:\n"
                    f"{'':8}return prediction[0][0]\n"
                    f"{'':4}else:\n"
                    f"{'':8}return pd.DataFrame({{'{metrics}': prediction}})"
                )
                """
                if input_array.shape[0] == 1:
                    return prediction[0]
                else:
                    return pd.DataFrame({'Classification': prediction})
                """
            elif len(returns) == len(target_values):
                self.score_code += (
                    f"{'':4}if input_array.shape[0] == 1:\n"
                    f"{'':8}target_values = {target_values}\n"
                    f"{'':8}return target_values[prediction[0].index(max(prediction[0]))]\n"
                    f"{'':4}else:\n"
                    f"{'':8}output_table = pd.DataFrame({{'{metrics}' : np.array({target_values})[np.argmax(prediction, axis=1)]}})\n"
                    f"{'':8}return output_table"
                )
                """
                if input_array.shape[0] == 1:
                    target_values = ['A', 'B', 'C']
                    return target_values[prediction[0].index(max(prediction[0]))]
                else:
                    output_table = pd.DataFrame({'Classification' : np.array(['A', 'B', 'C'])[np.argmax(prediction, axis=1)]})
                    return output_table
                """
            elif len(returns) == (len(target_values) + 1):
                # Determine which return is the classification value
                class_index = [i for i, x in enumerate(returns) if x][0]
                self.score_code += (
                    f"{'':4}if input_array.shape[0] == 1:\n"
                    f"{'':8}return prediction[0][{class_index}]\n"
                    f"{'':4}else:\n"
                    f"{'':8}return pd.DataFrame({{'{metrics}': [p[{class_index}] for p in prediction]}})"
                )
                """
                if input_array.shape[0] == 1:
                    return prediction[0][0]
                else:
                    return pd.DataFrame({'Classification': [p[0] for p in prediction]})
                """
            else:
                self._invalid_predict_config()
        elif len(metrics) == 2:
            if h2o_model:
                self.score_code += (
                    f"{'':4}target_values = {target_values}\n"
                    f"{'':4}if input_array.shape[0] == 1:\n"
                    f"{'':8}{metrics[0]} = target_values[prediction[1][1:]."
                    f"index(max(prediction[1][1:]))]\n"
                    f"{'':8}return {metrics[0]}, max(prediction[1][1:])\n"
                    f"{'':4}else:\n"
                    f"{'':8}index = np.argmax(prediction.iloc[0:, 1:].values, axis=1)\n"
                    f"{'':8}return pd.DataFrame({{'{metrics[0]}': np.array(target_values)[index], '{metrics[1]}': np.max(prediction.iloc[0:, 1:], axis=1)}})\n"
                )
                """
                target_values = ['A', 'B', 'C']
                if input_array.shape[0] == 1:
                    Classification = target_values[prediction[1][1:].index(max(prediction[1][1:]))]
                    return Classification, max(prediction[1][1:])
                else:
                    index = np.argmax(prediction.iloc[0:, 1:].values, axis=1)
                    return pd.DataFrame({'Classification': np.array(target_values)[index], 'Probability': np.max(prediction.iloc[0:, 1:], axis=1)})
                """
            elif len(returns) == len(target_values):
                self.score_code += (
                    f"{'':4}target_values = {target_values}\n"
                    f"{'':4}if input_array.shape[0] == 1:\n"
                    f"{'':8}return target_values[prediction[0].index(max(prediction[0]))], "
                    f"max(prediction[0])\n"
                    f"{'':4}else:\n"
                    f"{'':8}df = pd.DataFrame(prediction)\n"
                    f"{'':8}index = np.argmax(df.values, axis=1)\n"
                    f"{'':8}classifications = np.array(target_values)[index]\n"
                    f"{'':8}max_proba = np.max(df.values, axis=1)\n"
                    f"{'':8}return pd.DataFrame({{'{metrics[0]}': classifications, '{metrics[1]}': max_proba}})"
                )
                """
                target_values = ['A', 'B', 'C']
                if input_array.shape[0] == 1:
                    return target_values[prediction[0].index(max(prediction[0]))], max(prediction[0])
                else:
                    df = pd.DataFrame(prediction)
                    index = np.argmax(df.values, axis=1)
                    classifications = np.array(target_values)[index]
                    max_proba = np.max(df.values, axis=1)
                    return pd.DataFrame({'Classification': classifications, 'Probability': max_proba})
                """
            elif len(returns) == (len(target_values) + 1):
                # Determine which return is the classification value
                class_index = [i for i, x in enumerate(returns) if x][0]
                self.score_code += (
                    f"{'':4}if input_array.shape[0] == 1:\n"
                    f"{'':8}return prediction[0][{class_index}], "
                    f"max(prediction[0][:{class_index}] + prediction[0][{class_index + 1}:])\n"
                    f"{'':4}else:\n"
                    f"{'':8}df = pd.DataFrame(prediction)\n"
                    f"{'':8}probas = df.drop({class_index}, axis=1)\n"
                    f"{'':8}max_proba = np.max(probas.values, axis=1)\n"
                    f"{'':8}return pd.DataFrame({{'{metrics[0]}': df[{class_index}], '{metrics[1]}': max_proba}})"
                )
                """
                if input_array.shape[0] == 1:
                    return prediction[0][0], max(prediction[0][:0] + prediction[0][1:])
                else:
                    df = pd.DataFrame(prediction)
                    probas = df.drop(0, axis=1)
                    max_proba = np.max(probas.values, axis=1)
                    return pd.DataFrame({'Classification': df[0], 'Probability': max_proba})
                """
            else:
                self._invalid_predict_config()
        elif len(metrics) > 2:
            if h2o_model:
                if len(metrics) == len(target_values):
                    h2o_returns = [f"prediction[1][{i+1}]" for i in range(len(metrics))]
                    self.score_code += (
                        f"{'':4}if input_array.shape[0] == 1:\n"
                        f"{'':8}return {', '.join(h2o_returns)}\n"
                        f"{'':4}else:\n"
                        f"{'':8}output_table = prediction.drop(prediction.columns[0], axis=1)\n"
                        f"{'':8}output_table.columns = {metrics}\n"
                        f"{'':8}return output_table"
                    )
                    """
                    if input_array.shape[0] == 1:
                        return prediction[1][1], prediction[1][2], prediction[1][3]
                    else:
                        output_table = prediction.drop(prediction.columns[0], axis=1)
                        output_table.columns = ['Proba_0', 'Proba_1', 'Proba_2']
                        return output_table
                    """
                elif len(metrics) == (len(target_values) + 1):
                    h2o_returns = [f"prediction[1][{i}]" for i in range(len(metrics))]
                    self.score_code += (
                        f"{'':4}if input_array.shape[0] == 1:\n"
                        f"{'':8}return {', '.join(h2o_returns)}\n"
                        f"{'':4}else:\n"
                        f"{'':8}prediction.columns = {metrics}\n"
                        f"{'':8}return prediction"
                    )
                    """
                    if input_array.shape[0] == 1:
                        return prediction[1][0], prediction[1][1], prediction[1][2], prediction[1][3]
                    else:
                        prediction.columns = ['Classification', 'Proba_0', 'Proba_1', 'Proba_2']
                        return prediction
                    """
            elif (
                len(metrics) == len(target_values) == len(returns)
                and sum(returns) == 0
            ) or (
                len(metrics) == (len(target_values) + 1) == len(returns)
                and sum(returns) == 1
            ):
                proba_returns = [f"prediction[0][{i}]" for i in range(len(returns))]
                self.score_code += (
                    f"{'':4}if input_array.shape[0] == 1:\n"
                    f"{'':8}return {', '.join(proba_returns)}\n"
                    f"{'':4}else:\n"
                    f"{'':8}output_table = pd.DataFrame(prediction, columns={metrics})\n"
                    f"{'':8}return output_table"
                )
                """
                if input_array.shape[0] == 1:
                    return prediction[0][0], prediction[0][1], prediction[0][2]
                else:
                    output_table = pd.DataFrame(prediction, columns=['Proba_0', 'Proba_1', 'Proba_2'])
                    return output_table
                """
            elif (len(metrics) - 1) == len(returns) == len(target_values) and sum(
                returns
            ) == 0:
                proba_returns = [f"prediction[0][{i}]" for i in range(len(returns))]
                self.score_code += (
                    f"{'':4}target_values = {target_values}\n\n"
                    f"{'':4}if input_array.shape[0] == 1:\n"
                    f"{'':8}return target_values[prediction[0].index(max(prediction[0]))], "
                    f"{', '.join(proba_returns)}\n"
                    f"{'':4}else:\n"
                    f"{'':8}output_table = pd.DataFrame(prediction, columns={metrics[1:]})\n"
                    f"{'':8}classifications = np.array(target_values)[np.argmax(output_table.values, axis=1)]\n"
                    f"{'':8}output_table.insert(0, '{metrics[0]}', classifications)\n"
                    f"{'':8}return output_table"
                )
                """
                target_values = ['A', 'B', 'C']
                if input_array.shape[0] == 1:
                    return target_values[prediction[0].index(max(prediction[0]))], prediction[0][0], prediction[0][1], prediction[0][2]
                else:
                    output_table = pd.DataFrame(prediction, columns=['Proba_0', 'Proba_1', 'Proba_2'])
                    classifications = np.array(target_values)[np.argmax(output_table.values, axis=1)]
                    output_table.insert(0, 'Classification', classifications)
                    return output_table
                """
            else:
                self._invalid_predict_config()

    @staticmethod
    def _invalid_predict_config():
        raise ValueError(
            "An invalid combination of score metrics, target values, predict "
            "returns, and predict return types was provided, such that the "
            "expected return statement for the score code could not be "
            "determined."
        )
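    # Editor's illustration (assumed helper, not part of sasctl): the score code
    # emitted by the (len(metrics) - 1) == len(returns) == len(target_values) branch
    # above behaves roughly like this sketch for a hypothetical three-class model.
    # The method name and sample values are assumptions made for illustration only.
    @staticmethod
    def _example_multiclass_return(prediction, target_values):
        import numpy as np
        import pandas as pd

        # Build one probability column per class, then prepend the winning class
        output_table = pd.DataFrame(
            prediction, columns=[f"Proba_{i}" for i in range(len(target_values))]
        )
        classifications = np.array(target_values)[np.argmax(output_table.values, axis=1)]
        output_table.insert(0, "Classification", classifications)
        # e.g. _example_multiclass_return([[0.2, 0.7, 0.1]], ["A", "B", "C"])
        # classifies the single row as "B"
        return output_table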
    @staticmethod
    def convert_mas_to_cas(mas_code: str, model: Union[str, dict, RestObj]) -> str:
        """
        Using the generated score.sas code from the Python wrapper API, convert the
        SAS Microanalytic Service based code to CAS compatible.

        Parameters
        ----------
        mas_code : str
            String representation of the dmcas_packagescorecode.sas DS2 wrapper
        model : str, dict, or RestObj
            The name or id of the model, or a dictionary representation of the model

        Returns
        -------
        CASCode : str
            String representation of the dmcas_epscorecode.sas DS2 wrapper code
        """
        model = mr.get_model(model)
        output_string = ""
        for out_var in model["outputVariables"]:
            output_string += "dcl "
            if out_var["type"] == "string":
                output_string = output_string + "varchar(100) "
            else:
                output_string += "double "
            output_string += out_var["name"] + ";\n"
        start = mas_code.find("score(")
        finish = mas_code[start:].find(");")
        score_vars = mas_code[start + 6 : start + finish]
        input_string = " ".join(
            [
                x
                for x in score_vars.split(" ")
                if (x != "double" and x != "in_out" and x != "varchar(100)")
            ]
        )
        end_block = (
            f"method run();\n{'':4}set SASEP.IN;\n{'':4}score({input_string});\nend;"
            f"\nenddata;"
        )
        replace_strings = {
            "package pythonScore / overwrite=yes;": "data sasep.out;",
            "dcl int resultCode revision;": "dcl double resultCode revision;\n"
            + output_string,
            "endpackage;": end_block,
        }
        replace_strings = dict((re.escape(k), v) for k, v in replace_strings.items())
        pattern = re.compile("|".join(replace_strings.keys()))
        cas_code = pattern.sub(
            lambda m: replace_strings[re.escape(m.group(0))], mas_code
        )
        return cas_code
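    # Editor's usage sketch (assumptions: an active SAS Viya session exists, the model
    # is registered in SAS Model Manager, and a local copy of
    # dmcas_packagescorecode.sas is available; the model name, file paths, and method
    # name below are hypothetical).
    @staticmethod
    def _example_convert_mas_to_cas_usage():
        from pathlib import Path

        # Read the MAS DS2 wrapper, convert it, and write the CAS-compatible wrapper
        mas_code = Path("dmcas_packagescorecode.sas").read_text()
        cas_code = ScoreCode.convert_mas_to_cas(mas_code, "HMEQClassTree")
        Path("dmcas_epscorecode.sas").write_text(cas_code)
        return cas_code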
    @classmethod
    def _input_var_lists(
        cls, input_data: Union[DataFrame, List[dict]]
    ) -> Tuple[List[str], List[str]]:
        """
        Using an input dataset, generate lists of variables and their types.

        MLFlow models are handled differently and expect a list of dicts instead of a
        Pandas DataFrame.

        Parameters
        ----------
        input_data : pandas.DataFrame or list of dict
            The `DataFrame` object contains the training data, and includes only the
            predictor columns. The write_score_code function currently supports
            int(64), float(64), and string data types for scoring. Providing a list of
            dict objects signals that the model files are being created from an MLFlow
            model.

        Returns
        -------
        input_var_list : List of str
            A list of variable names for the input dataset.
        input_dtypes_list : List of str
            A list of variable types for the input dataset.
        """
        if isinstance(input_data, pd.DataFrame):
            # From the input dataframe columns, create a list of input variables,
            # then check for viability
            input_var_list = input_data.columns.to_list()
            cls._check_for_invalid_variable_names(input_var_list)
            input_dtypes_list = input_data.dtypes.astype(str).to_list()
        else:
            # For MLFlow models, extract the variables and data types
            input_var_list = [var["name"] for var in input_data]
            cls._check_for_invalid_variable_names(input_var_list)
            input_dtypes_list = [var["type"] for var in input_data]

        return input_var_list, input_dtypes_list

    @classmethod
    def _check_viya_version(cls, model: Union[str, dict, RestObj]) -> Union[str, None]:
        """
        Check that a valid SAS Viya version and model argument are provided.

        For SAS Viya 3.5, model score code requires the model UUID.

        Parameters
        ----------
        model : str, dict, or RestObj
            The name or id of the model, or a dictionary representation of the model.
            The default value is None and is only necessary for models that will be
            hosted on SAS Viya 3.5.

        Returns
        -------
        model_id : str or None
            SAS Model Manager model uuid for SAS Viya 3.5 models or None
        """
        # No session supplied, assume SAS Viya 4 model
        if not current_session():
            warn(
                "No current session connection was found to a SAS Viya server. Score "
                "code will be written under the assumption that the target server is "
                "SAS Viya 4."
            )
            return None
        # Session and no model, raise error if SAS Viya 3.5 model
        elif current_session() and not model:
            if current_session().version_info() == 3.5:
                raise SystemError(
                    "Score code for SAS Viya 3.5 requires the model's UUID. Please "
                    "provide either the model name, UUID, or dictionary response from "
                    "mr.get_model(model)."
                )
            else:
                return None
        # Session and model, return uuid if SAS Viya 3.5 model
        elif current_session() and model:
            if current_session().version_info() == 3.5:
                return cls._get_model_id(model)
            else:
                return None
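    # Editor's illustration (assumed sample data and method name): the two input_data
    # shapes accepted by _input_var_lists above yield equivalent variable and dtype
    # lists, one from a pandas DataFrame and one from MLFlow-style signature dicts.
    @classmethod
    def _example_input_var_lists(cls):
        import pandas as pd

        df = pd.DataFrame({"LOAN": [1100, 1300], "REASON": ["HomeImp", "DebtCon"]})
        var_list, dtype_list = cls._input_var_lists(df)
        # var_list -> ["LOAN", "REASON"]; dtype_list -> typically ["int64", "object"]

        mlflow_style = [
            {"name": "LOAN", "type": "long"},
            {"name": "REASON", "type": "string"},
        ]
        var_list, dtype_list = cls._input_var_lists(mlflow_style)
        # var_list -> ["LOAN", "REASON"]; dtype_list -> ["long", "string"]
        return var_list, dtype_list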
    @staticmethod
    def sanitize_model_prefix(prefix: str) -> str:
        """
        Check the model_prefix for a valid Python function name.

        Parameters
        ----------
        prefix : str
            The variable for the model name that is used when naming model files.
            (For example: hmeqClassTree + [Score.py || .pickle]).

        Returns
        -------
        model_prefix : str
            Returns a model_prefix, adjusted as needed for valid Python function names.
        """
        # Replace model_prefix if a valid function name is not provided
        if not prefix.isidentifier():
            new_prefix = re.sub(r"\W|^(?=\d)", "_", prefix)
            warn(
                f"The model_prefix argument needs to be a valid Python function "
                f"name. The provided value of {prefix} has been replaced "
                f"with {new_prefix}."
            )
            return new_prefix
        else:
            return prefix
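    # Editor's illustration (example values assumed): sanitize_model_prefix leaves a
    # valid identifier untouched and rewrites anything else into one, warning as it
    # does so.
    @staticmethod
    def _example_sanitize_model_prefix():
        # A valid identifier passes through unchanged
        assert ScoreCode.sanitize_model_prefix("hmeqClassTree") == "hmeqClassTree"
        # Spaces, dashes, and a leading digit are all replaced with underscores
        assert ScoreCode.sanitize_model_prefix("1st model-v2") == "_1st_model_v2"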
    def _viya35_score_code_import(
        self, prefix: str, model_id: str, score_cas: bool
    ) -> Tuple[str, str]:
        """
        Upload the score code to SAS Model Manager and generate DS2 wrappers as needed.

        If score_cas is True, then the function pulls down the score.sas default
        wrapper generated in SAS Viya 3.5 and modifies it to work in both MAS and CAS.

        Parameters
        ----------
        prefix : str
            The variable for the model name that is used when naming model files.
            (For example: hmeqClassTree + [Score.py || .pickle]).
        model_id : str
            SAS Model Manager uuid for the model.
        score_cas : bool
            Sets whether models registered to SAS Viya 3.5 should be able to be scored
            and validated through both CAS and SAS Micro Analytic Service. If set to
            false, then the model will only be able to be scored and validated through
            SAS Micro Analytic Service. The default value is True.

        Returns
        -------
        mas_code : str
            A string representation of the dmcas_packagescorecode.sas code used in MAS.
        cas_code : str
            A string representation of the dmcas_epscorecode.sas code used in CAS.
        """
        files = [
            {
                "name": f"score_{prefix}.py",
                "file": self.score_code,
                "role": "score",
            }
        ]
        self.upload_and_copy_score_resources(model_id, files)
        # The typeConversion endpoint is only valid for models with Python score code
        model = mr.get_model(model_id)
        model["scoreCodeType"] = "Python"
        model = mr.update_model(model)
        mr.convert_python_to_ds2(model)
        if score_cas:
            model_contents = mr.get_model_contents(model)
            for file in model_contents:
                if file.name == "score.sas":
                    mas_code = mr.get(
                        f"models/{file.modelId}/contents/{file.id}/content"
                    )
                    self.upload_and_copy_score_resources(
                        model_id,
                        [
                            {
                                "name": MAS_CODE_NAME,
                                "file": mas_code,
                                "role": "score",
                            }
                        ],
                    )
                    cas_code = self.convert_mas_to_cas(mas_code, model)
                    self.upload_and_copy_score_resources(
                        model,
                        [
                            {
                                "name": CAS_CODE_NAME,
                                "file": cas_code,
                                "role": "score",
                            }
                        ],
                    )
                    model = mr.get_model(model_id)
                    model["scoreCodeType"] = "ds2MultiType"
                    mr.update_model(model)
                    return mas_code, cas_code

    def _add_preprocess_code(
        self, preprocess_function: Callable[[DataFrame], DataFrame]
    ):
        """
        Places the given preprocess function, which must both take a DataFrame as an
        argument and return a DataFrame, into the score code. If the preprocess
        function does not return anything, an error is thrown.

        Parameters
        ----------
        preprocess_function : function
            The preprocess function to be added to the score code.
        """
        import inspect

        preprocess_code = inspect.getsource(preprocess_function)
        if "return" not in preprocess_code:
            raise ValueError(
                "The given preprocess function does not return a value. To allow the "
                "score code to work correctly, please ensure the preprocessed data is "
                "returned."
            )
        if self.score_code[-1] == "\n":
            self.score_code += preprocess_code
        else:
            self.score_code += "\n" + preprocess_code
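    # Editor's sketch of a preprocess function suitable for the preprocess_function
    # argument handled by _add_preprocess_code above: it must both accept and return a
    # pandas DataFrame. The column names used here are assumptions for illustration,
    # not values taken from the library.
    @staticmethod
    def _example_preprocess_function(data: DataFrame) -> DataFrame:
        # Fill missing values and normalize a hypothetical string column before scoring
        data = data.fillna(0)
        if "REASON" in data.columns:
            data["REASON"] = data["REASON"].str.upper()
        return data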