# Source code for tests.interface.test_cas_interface

#!/usr/bin/env python
# encoding: utf-8
#
# Copyright SAS Institute
#
#  Licensed under the Apache License, Version 2.0 (the License);
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

from inspect import cleandoc
import os
import unittest
import warnings

from swat import CAS, SWATError

import sasoptpy as so

from tests.swat_config import create_cas_connection


class Arbitrary:
    """
    Plain attribute holder: every keyword argument becomes an attribute.
    """

    def __init__(self, **kwargs):
        # Each keyword is stored under its own name on the instance.
        for attr_name, attr_value in kwargs.items():
            setattr(self, attr_name, attr_value)


class MockCASResponse:
    """
    Canned response object handed back by the mocked CAS actions.

    Optional keyword arguments ``status`` (default ``'OK'``) and
    ``solutionStatus`` (default ``'OPTIMAL'``) are stored as attributes;
    any other keyword is ignored.
    """

    def __init__(self, **kwargs):
        settings = {'status': 'OK', 'solutionStatus': 'OPTIMAL'}
        settings.update(kwargs)
        self.status = settings['status']
        self.solutionStatus = settings['solutionStatus']

    def get_tables(self, *args):
        """Return the stored status wrapped in a one-element list."""
        tables = [self.status]
        return tables

    def keys(self):
        """Return an empty list."""
        return []


class MockCASServer:
    """
    Stand-in for a CAS session in unit tests.

    Creating an instance replaces ``so.util.get_session_type`` and
    ``so.util.package_utils.get_session_type`` with a function that always
    answers ``'CAS'``. This class never restores the original functions.

    The optional ``response`` keyword is the object returned by the mocked
    solver actions; it defaults to a fresh ``MockCASResponse``.
    """

    def __init__(self, **kwargs):
        if 'response' in kwargs:
            self.response = kwargs['response']
        else:
            self.response = MockCASResponse()
        self.optimization = Arbitrary()
        # Point both lookup locations at the mock's session-type function.
        for module in (so.util, so.util.package_utils):
            module.get_session_type = self.get_session_type

    def get_session_type(self, *args, **kwargs):
        """Report the session type as ``'CAS'`` whatever the arguments."""
        session_type = 'CAS'
        return session_type

    def loadactionset(self, *args, **kwargs):
        """Accept any arguments and do nothing."""
        return None

    def upload_frame(self, *args, **kwargs):
        """Return a placeholder object whose ``name`` attribute is 'name'."""
        uploaded = Arbitrary(name='name')
        return uploaded

    def runOptmodel(self, *args, **kwargs):
        """Return the configured response, ignoring all arguments."""
        canned = self.response
        return canned

    def solveLp(self, *args, **kwargs):
        """Return the configured response, ignoring all arguments."""
        canned = self.response
        return canned

    def solveMilp(self, *args, **kwargs):
        """Return the configured response, ignoring all arguments."""
        canned = self.response
        return canned


class TestCASInterface(unittest.TestCase):
    """
    Unit tests for the CAS interface

    Tests that need a live CAS session are skipped when ``setUpClass``
    could not open one. The remaining tests run against ``MockCASServer``.
    """

    @classmethod
    def setUpClass(cls):
        """Reset sasoptpy and try to open one CAS connection for the class."""
        so.reset()
        cls.conn = None
        try:
            cls.conn = create_cas_connection()
        except SWATError:
            warnings.warn('CAS connection is not available', RuntimeWarning)
        except TypeError:
            warnings.warn('CAS variables are not available', RuntimeWarning)

    @classmethod
    def tearDownClass(cls):
        """Close the shared CAS connection if one was opened."""
        if cls.conn is not None:
            cls.conn.close()

    def setUp(self):
        """Remember the session-type functions that MockCASServer replaces."""
        self.original_get_type = so.util.get_session_type
        self.original_package_get_type = so.util.package_utils.get_session_type

    def tearDown(self):
        """Put back the session-type functions saved in ``setUp``."""
        so.util.get_session_type = self.original_get_type
        so.util.package_utils.get_session_type = self.original_package_get_type

    @staticmethod
    def _collapse_ws(text):
        """
        Return *text* with every run of whitespace reduced to one space.

        Used to compare tabular and generated-code output without depending
        on column alignment or indentation.
        """
        return ' '.join(text.split())

    def test_unbounded(self):
        """Maximizing x with only a lower bound must raise a UserWarning."""
        if TestCASInterface.conn is None:
            self.skipTest('No session is available')

        m = so.Model(name='m', session=TestCASInterface.conn)
        x = m.add_variable(name='x', lb=0)
        m.set_objective(x, sense=so.MAX, name='obj')

        with self.assertWarns(UserWarning):
            m.solve(frame=True, name='unbounded_test')

    def test_forced_optmodel(self):
        """Exercise ``solve(mps=True)`` on models with non-MPS content."""
        if TestCASInterface.conn is None:
            self.skipTest('No session is available')

        # Forced to OPTMODEL due abstract elements
        m = so.Model(name='test_forced_optmodel',
                     session=TestCASInterface.conn)
        S = so.Set(name='S')
        m.include(S)
        x = m.add_variable(name='x', ub=1)
        m.set_objective(2 * x, sense=so.MAX, name='obj')

        with self.assertWarns(UserWarning):
            response = m.solve(mps=True, submit=False)
            self.assertIn('set S;', response)

        # Uploaded MPS Table
        m.drop(S)
        response = m.solve(mps=True, submit=False)
        self.assertIn('CASTable', str(response))

        # Forced to OPTMODEL due nonlinearity
        m.set_objective(x ** 2, sense=so.MAX, name='obj2')

        with self.assertWarns(UserWarning):
            response = m.solve(mps=True, submit=False)
            self.assertIn('var x', response)

        # Forced to give error
        with self.assertRaises(RuntimeError):
            m.solve(mps=True, options={'decomp': 'USER'})

        # No objective
        m.solve(mps=True, verbose=True, name='no_obj')

    def test_regular_lp(self):
        """A one-variable LP reports solver 'LP' and x at its lower bound."""
        if TestCASInterface.conn is None:
            self.skipTest('No session is available')

        m = so.Model(name='test_regular_lp', session=TestCASInterface.conn)
        x = m.add_variable(name='x', lb=1)
        m.add_constraint(x <= 3, name='c')
        m.set_objective(x, sense=so.MIN, name='obj')
        m.solve(name='simple_lp')

        self.assertEqual(
            m.get_solution_summary().loc['Solver', 'Value'], 'LP')
        self.assertEqual(x.get_value(), 1)

    def test_primalin(self):
        """Check the ``primalin`` and ``drop`` options of an MPS solve."""
        if TestCASInterface.conn is None:
            self.skipTest('No session is available.')
        session = TestCASInterface.conn

        from sasoptpy import integer, minimize

        m = so.Model(name='test_primalin_mps', session=session)
        x = m.add_variable(name='x', vartype=integer, ub=9.5, lb=0)
        y = m.add_variable(name='y', vartype=integer, ub=5, lb=2)
        m.add_constraint(x + 2 * y >= 1, name='c1')
        m.add_constraint(x - 2 * y >= 2, name='c2')
        m.set_objective(x + y, sense=minimize, name='obj')
        m.solve(mps=True)

        # Tables are compared token by token so the check does not depend
        # on how the frame's columns happen to be padded.
        self.assertEqual(
            self._collapse_ws(so.get_value_table(x, y).to_string()),
            'x y - 6.0 2.0')

        m.set_objective(x + 1.1 * y, sense=minimize, name='obj2')
        m.solve(mps=True, primalin=True)
        primalin_frame = session.CASTable('PRIMALINTABLE').to_frame()
        self.assertEqual(
            self._collapse_ws(primalin_frame.to_string()),
            '_VAR_ _VALUE_ 0 x 6.0 1 y 2.0')

        # Drop option
        m.solve(mps=True, primalin=True, drop=True)
        with self.assertRaises(SWATError):
            session.CASTable('PRIMALINTABLE').to_frame()

    def test_response(self):
        """An unbounded problem reports 'UNBOUNDED' and no objective value."""
        if TestCASInterface.conn is None:
            self.skipTest('No session is available.')

        from sasoptpy import maximize

        m = so.Model(name='test_response', session=TestCASInterface.conn)
        x = so.Variable(name='x', lb=1)
        o = so.Objective(x ** 2, sense=maximize, name='obj')
        m.include(x, o)
        m.solve(verbose=True)

        self.assertEqual(str(m.response.solutionStatus), 'UNBOUNDED')
        self.assertIsNone(m._objval)

    def test_lp(self):
        """Solve a two-variable LP through MPS and check the value of x."""
        if TestCASInterface.conn is None:
            self.skipTest('No session is available.')

        m = so.Model(name='model_lp', session=TestCASInterface.conn)
        x = m.add_variable(name='x', lb=1, ub=4)
        y = m.add_variable(name='y', lb=4, ub=10)
        m.add_constraint(x + y == 8, name='c1')
        m.set_objective(x - y, sense=so.MIN, name='obj')
        m.solve(mps=True, verbose=True)

        self.assertEqual(x.get_value(), 1)

    def test_workspace(self):
        """Generate OPTMODEL code from a workspace, submit it, read results."""
        if TestCASInterface.conn is None:
            self.skipTest('No session is available')

        with so.Workspace('w', session=TestCASInterface.conn) as w:
            x = so.VariableGroup(3, name='x')
            c = so.ConstraintGroup((x[i] <= i for i in range(3)), name='c')
            o = so.Objective(x[1], name='obj', sense=so.MAX)
            s = so.actions.solve()

        # Generated code and result tables are compared token by token;
        # indentation and column alignment are not part of the check.
        expected_code = cleandoc('''
            proc optmodel;
            var x {{0,1,2}};
            con c_0 : x[0] <= 0;
            con c_1 : x[1] <= 1;
            con c_2 : x[2] <= 2;
            max obj = x[1];
            solve;
            quit;''')
        self.assertEqual(self._collapse_ws(so.to_optmodel(w)),
                         self._collapse_ws(expected_code))

        r = w.submit()
        self.assertIsNotNone(s.get_solution_summary())
        self.assertIsNotNone(s.get_problem_summary())
        self.assertEqual(
            s.get_solution_summary().loc['Solver', 'Value'], 'LP')

        expected_table = cleandoc('''
            i var value lb ub rc
            0 1.0 x[0] 0.0 -1.797693e+308 1.797693e+308 0.0
            1 2.0 x[1] 1.0 -1.797693e+308 1.797693e+308 0.0
            2 3.0 x[2] 2.0 -1.797693e+308 1.797693e+308 0.0''')
        self.assertEqual(self._collapse_ws(r.to_string()),
                         self._collapse_ws(expected_table))
        self.assertEqual(x[1].get_value(), 1)

    def test_workspace_errors(self):
        """Mocked error statuses must surface as Python exceptions."""
        mock_response = MockCASResponse(status='Syntax Error')
        mock_session = MockCASServer(response=mock_response)
        with so.Workspace('w', session=mock_session) as w:
            x = so.Variable(name='x')
        with self.assertRaises(SyntaxError):
            w.submit()

        mock_response = MockCASResponse(status='Semantic Error')
        mock_session = MockCASServer(response=mock_response)
        mock_session.optimization = Arbitrary(runoptmodel=True)
        m = so.Model(name='test_runtime', session=mock_session)
        m.add_variable(name='x')
        with self.assertRaises(RuntimeError):
            m.solve()

        with so.Workspace('w', session=mock_session) as w:
            x = so.Variable(name='x')
        with self.assertRaises(RuntimeError):
            w.submit()

    def test_errors_and_warnings(self):
        """Model.solve must raise RuntimeError in two mocked scenarios."""
        s = MockCASServer()
        m = so.Model(name='test_errors_for_cas', session=s)
        x = m.add_variable(name='x', lb=0)

        # Quadratic objective solved with frame=False.
        m.set_objective(x ** 2, name='obj', sense=so.MIN)
        with self.assertRaises(RuntimeError):
            m.solve(frame=False)

        # Linear objective, but the mocked response carries status ['ERROR'].
        m.set_objective(x, name='obj', sense=so.MIN)
        s = MockCASServer(response=MockCASResponse(status=['ERROR']))
        m.set_session(s)
        with self.assertRaises(RuntimeError):
            m.solve(frame=True)

    def test_tuner(self):
        """Tune a small knapsack model and check the shape of the results."""
        if TestCASInterface.conn is None:
            self.skipTest('No session is available')

        import pandas as pd

        m = so.Model(name='knapsack_with_tuner',
                     session=TestCASInterface.conn)
        data = [
            ['clock', 8, 4, 3],
            ['mug', 10, 6, 5],
            ['headphone', 15, 7, 2],
            ['book', 20, 12, 10],
            ['pen', 1, 1, 15],
        ]
        df = pd.DataFrame(
            data, columns=['item', 'value', 'weight', 'limit']
        ).set_index(['item'])
        ITEMS = df.index
        value = df['value']
        weight = df['weight']
        limit = df['limit']
        total_weight = 55

        # Variables
        get = m.add_variables(ITEMS, name='get', vartype=so.INT)

        # Constraints
        m.add_constraints((get[i] <= limit[i] for i in ITEMS),
                          name='limit_con')
        m.add_constraint(
            so.expr_sum(weight[i] * get[i] for i in ITEMS) <= total_weight,
            name='weight_con')

        # Objective
        total_value = so.expr_sum(value[i] * get[i] for i in ITEMS)
        m.set_objective(total_value, name='total_value', sense=so.MAX)

        results = m.tune_parameters(tunerParameters={'maxConfigs': 10})

        self.assertEqual(len(results), 10)
        self.assertIn('Configuration', results.iloc[0].index)