#!/usr/bin/env python
# encoding: utf-8
#
# Copyright © 2019, SAS Institute Inc., Cary, NC, USA. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Contains utilities for wrapping Python models in DS2 for publishing."""
from __future__ import print_function
import base64
import importlib
import os
import pickle # skipcq BAN-B301
import re
import sys
from collections import OrderedDict
from ..decorators import versionadded, versionchanged
from ..misc import random_string
from .ds2 import DS2PyMASPackage, DS2Thread, DS2Variable
from .python import ds2_variables
[docs]
@versionchanged(reason="Added `name` parameter.", version="1.5")
def build_wrapper_function(
func, variables, array_input, name="wrapper", setup=None, return_msg=None
):
"""Wraps a function to ensure compatibility when called by PyMAS.
PyMAS has strict expectations regarding the format of any function called
directly by PyMAS. Isolating the desired function inside a wrapping
function provides a simple way to ensure that functions called by PyMAS
are compliant.
Parameters
----------
func : Callable or str
Function name or an instance of Function which will be wrapped
variables : list of DS2Variable
array_input : bool
Whether `variables` should be combined into a single array before
passing to `func`
name : str, optional
Name of the generated wrapper function. Defaults to 'wrapper'.
setup : Iterable
Python source code lines to be executed during package setup
return_msg : bool, optional
Deprecated.
Whether error messages will be captured and returned as an additional
output variable. Defaults to True.
Returns
-------
str
The Python definition for the wrapper function.
Notes
-----
The format for the `# Output: ` is very strict. It must be exactly
"# Output: <var>, <var>". Any changes to spelling, capitalization,
punctuation, or spacing will result in an error when the DS2 code is
executed.
"""
if return_msg is not None:
raise DeprecationWarning(
"The 'return_msg' parameter is ignored and "
"will be removed completely in a future "
"version"
)
# Always return an error message out of any Python method call.
return_msg = True
# need to know func & input/output vars for each func
input_names = [v.name for v in variables if not v.out]
output_names = [v.name for v in variables if v.out]
args = input_names
func = func.__name__ if callable(func) else func
# HELPER: SAS to python char issue where SAS char have spaces and python string does not.
# NOTE: we assume SAS char always need white space to be trimmed. This seems to match python model built so far
# Note: Using this string fix to help with None or blank input situation for numerics
string_input = ("",)
for v in variables:
if not v.out:
if v.type == "char":
string_input += (" if {0}: {0} = {0}.strip()".format(v.name),)
else:
string_input += (" if {0} is None: {0} = np.nan".format(v.name),)
# Statement to execute the function w/ provided parameters
if array_input:
middle = string_input + (
" input_array = np.array([{}]).reshape((1, -1))".format(
", ".join(args)
),
" columns = [{}]".format(", ".join('"{0}"'.format(w) for w in args)),
" input_df = pd.DataFrame(data=input_array, columns=columns)",
" result = {}(input_df)".format(func),
)
else:
func_call = "{}({})".format(func, ",".join(args))
middle = (" result = {}".format(func_call),)
if setup:
header = (
("try:",)
+ tuple(" " + line for line in setup)
+ (
" _compile_error = None",
"except Exception as e:",
" _compile_error = e",
"",
)
)
else:
header = ("",)
# NOTE: 'Output:' section is required. All return variables must be listed
# separated by ', '
definition = (
header
+ (
"def {name}({args}):".format(name=name, args=", ".join(args)),
' "Output: {}"'.format(
", ".join(output_names + ["msg"])
if return_msg
else ", ".join(output_names)
),
" result = None",
" msg = None" if return_msg else "",
" try:",
" global _compile_error",
" if _compile_error is not None:",
" raise _compile_error",
" import numpy as np",
" import pandas as pd",
)
+ middle
+ (
" result = tuple(result.ravel()) if hasattr(result, "
'"ravel") else tuple(result)',
" if len(result) == 0:",
" result = tuple(None for i in range(%s))" % len(output_names),
' elif "numpy" in str(type(result[0])):',
" result = tuple(np.asscalar(i) for i in result)",
" except Exception as e:",
" from traceback import format_exc",
" msg = str(e) + format_exc()" if return_msg else "",
" if result is None:",
" result = tuple(None for i in range(%s))" % len(output_names),
" return result + (msg, )",
)
)
return "\n".join(definition)
[docs]
@versionadded(version="1.5")
def wrap_predict_method(func, variables, **kwargs):
"""Create a PyMAS wrapper designed for Sci-kit's `.predict` methods.
Parameters
----------
func : Callable or str
Function name or an instance of Function which will be wrapped. Assumed
to behave as `.predict()` methods.
variables : list of DS2Variable
Input and output variables for the function
**kwargs
Will be passed to :meth:`build_wrapper_function`.
Returns
-------
str
the Python definition for the wrapper function.
See Also
--------
build_wrapper_function
"""
kwargs.setdefault("array_input", True)
kwargs.setdefault("name", "predict")
return build_wrapper_function(func, variables, **kwargs)
[docs]
@versionadded(version="1.5")
def wrap_predict_proba_method(func, variables, **kwargs):
"""Create a PyMAS wrapper designed for Sci-kit's `.predict_proba` methods.
Parameters
----------
func : Callable or str
Function name or an instance of Function which will be wrapped. Assumed
to behave as `.predict_proba()` methods.
variables : list of DS2Variable
Input and output variables for the function
**kwargs
Will be passed to `build_wrapper_function`.
Returns
-------
str
the Python definition for the wrapper function.
See Also
--------
build_wrapper_function
"""
kwargs.setdefault("array_input", True)
kwargs.setdefault("name", "predict_proba")
wrapper = build_wrapper_function(func, variables, **kwargs)
old_code = r"if result.size == 1:\s*result = np.asscalar\(result\)"
new_code = "assert result.shape[0] == 1\n"
new_code += " result = tuple(result[0].tolist())"
return re.sub(old_code, new_code, wrapper)
[docs]
@versionchanged("Return code and message are disabled by default.", version="1.5")
def from_inline(
func, input_types=None, array_input=False, return_code=None, return_message=None
):
"""Creates a PyMAS wrapper to execute the inline python function.
Parameters
----------
func : Callable
A Python function object to be used
input_types : list of type, optional
The expected type for each input value of `func`. Can be ommitted if
`func` includes type hints.
array_input : bool
Whether the function inputs should be treated as an array instead of
individual parameters
return_code : bool
Deprecated.
Whether the DS2-generated return code should be included
return_message : bool
Deprecated.
Whether the DS2-generated return message should be included
Returns
-------
PyMAS
Generated DS2 code which can be executed in a SAS scoring environment
"""
if return_code is not None or return_message is not None:
raise DeprecationWarning(
"The 'return_code' and 'return_message' "
"parameters are ignored and will be "
"removed completely in a future version"
)
obj = pickle.dumps(func)
return from_pickle(obj, None, input_types, array_input, return_code, return_message)
[docs]
@versionchanged("Return code and message are disabled by default.", version="1.5")
def from_python_file(
file,
func_name=None,
input_types=None,
array_input=False,
return_code=None,
return_message=None,
):
"""Creates a PyMAS wrapper to execute a function defined in an
external .py file.
Parameters
----------
file : str
The path to a python source code file
func_name : str
Name of the target function to call
input_types : list of type, optional
The expected type for each input value of the target function.
Can be ommitted if target function includes type hints.
array_input : bool
Whether the function inputs should be treated as an array instead of
individual parameters
return_code : bool
Deprecated
Whether the DS2-generated return code should be included
return_message : bool
Deprecated
Whether the DS2-generated return message should be included
Returns
-------
PyMAS
Generated DS2 code which can be executed in a SAS scoring environment
"""
if return_code is not None or return_message is not None:
raise DeprecationWarning(
"The 'return_code' and 'return_message' "
"parameters are ignored and will be "
"removed completely in a future version"
)
if not str(file.lower().endswith(".py")):
raise ValueError("File {} does not have a .py extension.".format(file))
# Extract just the filename from the path
directory, file_name = os.path.split(file)
# Drop the file extension to get the module name
module_name = os.path.splitext(file_name)[0]
# Temporarily add the module location to Python's path
sys.path.append(directory)
module = importlib.import_module(module_name)
sys.path.pop()
target_func = getattr(module, func_name)
if not callable(target_func):
raise RuntimeError("Could not find a valid function named %s" % func_name)
with open(file, "r") as f:
code = [line.strip("\n") for line in f.readlines()]
return _build_pymas(target_func, None, input_types, array_input, code=code)
[docs]
@versionchanged("Return code and message are disabled by default.", version="1.5")
def from_pickle(
file,
func_name=None,
input_types=None,
array_input=False,
return_code=None,
return_message=None,
):
"""Create a deployable DS2 package from a Python pickle file.
Parameters
----------
file : str or bytes or io.BytesIO
Pickled object to use. String is assumed to be a path to a picked
file, file_like is assumed to be an open file handle to a pickle
object, and bytes is assumed to be the raw pickled bytes.
func_name : str
Name of the target function to call
input_types : pandas.DataFrame, type, list of type, or dict of str, optional
The expected type for each input value of the target function.
Can be omitted if target function includes type hints. If a DataFrame
is provided, the columns will be inspected to determine type information.
If a single type is provided, all columns will be assumed to be that type,
otherwise a list of column types or a dictionary of column_name: type
may be provided.
array_input : bool
Whether the function inputs should be treated as an array instead of
individual parameters
return_code : bool
Deprecated.
Whether the DS2-generated return code should be included
return_message : bool
Deprecated.
Whether the DS2-generated return message should be included
Returns
-------
PyMAS
Generated DS2 code which can be executed in a SAS scoring environment
"""
if return_code is not None or return_message is not None:
raise DeprecationWarning(
"The 'return_code' and 'return_message' "
"parameters are ignored and will be "
"removed completely in a future version"
)
try:
# In Python2 str could either be a path or the binary pickle data,
# so check if its a valid filepath too.
is_file_path = isinstance(file, str) and os.path.isfile(file)
except TypeError:
is_file_path = False
# Path to a pickle file
if is_file_path:
with open(file, "rb") as f:
obj = pickle.load(f) # skipcq BAN-B301
# The actual pickled bytes
elif isinstance(file, bytes):
obj = pickle.loads(file) # skipcq BAN-B301
else:
obj = pickle.load(file) # skipcq BAN-B301
# Encode the pickled data so we can inline it in the DS2 package
pkl = base64.b64encode(pickle.dumps(obj))
code = (
"import pickle, base64",
# Replace b' with " before embedding in DS2.
"bytes = {}".format(pkl).replace("'", '"'),
"obj = pickle.loads(base64.b64decode(bytes))",
)
return _build_pymas(
obj, func_name, input_types, array_input, func_prefix="obj.", code=code
)
def _build_pymas(
obj,
func_name=None,
input_types=None,
array_input=False,
func_prefix=None,
return_code=None,
return_message=None,
code=None,
):
"""
Parameters
----------
obj
func_name
input_types
array_input
return_code
return_message
code
Returns
-------
"""
code = code or []
if not isinstance(func_name, list):
func_name = [func_name]
def parse_function(obj, func_name):
# If the object passed was a function, no need to search for
# target function
if callable(obj) and (func_name is None or obj.__name__ == func_name):
target_func = obj
elif func_name is None:
raise ValueError("Parameter `func_name` must be specified.")
else:
target_func = getattr(obj, func_name)
if not callable(target_func):
raise RuntimeError("Could not find a valid function named %s" % func_name)
target_func_name = target_func.__name__
if target_func_name.lower() == "predict_proba":
names = "P_"
else:
names = None
# Need to create DS2Variable instances to pass to PyMAS
if hasattr(input_types, "columns"):
# Assuming input is a DataFrame representing model inputs. Use to
# get input variables
vars = ds2_variables(input_types)
# Run one observation through the model and use the result to
# determine output variables
output = target_func(input_types.head(1))
output_vars = ds2_variables(output, output_vars=True, names=names)
vars.extend(output_vars)
elif isinstance(input_types, type):
params = OrderedDict(
[(k, input_types) for k in target_func.__code__.co_varnames]
)
vars = ds2_variables(params)
elif isinstance(input_types, dict):
vars = ds2_variables(input_types)
else:
# Inspect the Python method to determine arguments
vars = ds2_variables(target_func)
if not any(v for v in vars if v.out):
vars.append(DS2Variable(name="result", type="float", out=True))
return target_func_name, vars
# Get variables for each function
temp = [parse_function(obj, f) for f in func_name]
target_func, variables = zip(*temp)
# Convert from tuples to lists
target_func = list(target_func)
variables = list(variables)
return PyMAS(
target_func, variables, code, array_input=array_input, func_prefix=func_prefix
)
[docs]
class PyMAS:
"""
Parameters
----------
target_function : str
The Python function to be executed.
variables : list of DS2Variable
The input/ouput variables be declared in the module.
python_source : str
Additional Python code to be executed during setup.
return_code : bool
Deprecated.
Whether the DS2-generated return code should be included.
return_msg : bool
Deprecated.
Whether the DS2-generated return message should be included.
**kwargs
Passed to :func:`build_wrapper_function`.
"""
def __init__(
self,
target_function,
variables,
python_source,
return_code=None,
return_msg=None,
func_prefix=None,
**kwargs
):
if return_code is not None or return_msg is not None:
raise DeprecationWarning(
"The 'return_code' and 'return_msg' "
"parameters are ignored and will be "
"removed completely in a future version"
)
func_prefix = func_prefix or ""
if not isinstance(target_function, list):
target_function = [target_function]
variables = [variables]
return_code = False
return_msg = False
# Replicate parameter for each function to be exposed
if isinstance(return_code, bool):
return_code = [return_code] * len(target_function)
# Replicate parameter for each function to be exposed
if isinstance(return_msg, bool):
return_msg = [return_msg] * len(target_function)
self.wrapper = []
wrapper_names = []
for func, vars, code, msg in zip(
target_function, variables, return_code, return_msg
):
wrapper_names.append("_" + random_string(20))
if func.lower() == "predict":
lines = wrap_predict_method(
func_prefix + func,
vars,
setup=python_source,
name=wrapper_names[-1],
**kwargs
)
elif func.lower() == "predict_proba":
lines = wrap_predict_proba_method(
func_prefix + func,
vars,
name=wrapper_names[-1],
setup=python_source,
**kwargs
)
else:
lines = build_wrapper_function(
func_prefix + func,
vars,
name=wrapper_names[-1],
setup=python_source,
**kwargs
)
# Add DS2 variables for returning error codes/messages
# NOTE: add these *after* wrapper function is generated to prevent
# double-counting them.
if code:
vars.append(DS2Variable(name="rc", type="int32", out=True))
if msg:
vars.append(DS2Variable(name="msg", type="char", out=True))
# Clear setup code once it's been added once. No need to duplicate
# if multiple functions are defined.
python_source = None
self.wrapper.extend(lines.split("\n"))
self.variables = variables[0]
# self.return_code = return_code[0]
self.return_message = return_msg[0]
self.package = DS2PyMASPackage(self.wrapper)
for idx, func in enumerate(target_function):
self.package.add_method(func, wrapper_names[idx], variables[idx])
[docs]
@versionchanged(version="1.4", reason="Added `dest='Python'` option")
def score_code(self, input_table=None, output_table=None, columns=None, dest="MAS"):
"""Generate DS2 score code
Parameters
----------
input_table : str
The name of the table to execute the function against
output_table : str
The name of the table where execution results will be written
columns : list of str
Names of the columns from `table` that will be passed to `func`
dest : str
Choose from ``{'MAS', 'EP', 'CAS', 'Python'}``.
Specifies the publishing destination for the score code to ensure
that compatible code is generated.
Returns
-------
str
Score code
"""
dest = dest.upper()
# Check for names that could result in DS2 errors.
DS2_KEYWORDS = ["input", "output"]
for k in DS2_KEYWORDS:
if input_table and k == input_table.lower():
raise ValueError(
"Input table name `{}` is a reserved term.".format(input_table)
)
if output_table and k == output_table.lower():
raise ValueError(
"Output table name `{}` is a reserved term.".format(output_table)
)
# Get package code
code = tuple(self.package.code().split("\n"))
if dest == "EP":
code = (
("data sasep.out;",)
+ code
+ (
" method run();",
" set SASEP.IN;",
" end;",
" method term();",
" end;",
"enddata;",
)
)
elif dest == "CAS":
thread = DS2Thread(
self.variables,
input_table,
column_names=columns,
return_message=self.return_message,
package=self.package,
)
code += (
str(thread),
"data SASEP.out;",
" dcl thread {} t;".format(thread.name),
" method run();",
" set from t;",
" output;",
" end;",
"enddata;",
)
elif dest == "PYTHON":
# Python code return
code = self.package._python_code
return "\n".join(code)