#!/usr/bin/env python
# encoding: utf-8
#
# Copyright SAS Institute
#
# Licensed under the Apache License, Version 2.0 (the License);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import sys
import unittest
import warnings
import sasoptpy as so
from inspect import cleandoc
from sasoptpy.actions import read_data
from tests.swat_config import create_cas_connection
class TestReadData(unittest.TestCase):
    """
    Unit tests for READ DATA statements

    Most tests declare sets/parameters/variables inside a
    :class:`sasoptpy.Workspace`, add a ``read_data`` statement, render the
    container with :func:`sasoptpy.to_optmodel` and compare the result to an
    expected OPTMODEL listing using an exact string comparison.

    NOTE(review): the expected listings are compared with ``assertEqual``
    after ``inspect.cleandoc``, so whitespace *inside* each expected line is
    significant -- confirm the literals against the generated output.
    """

    @classmethod
    def setUpClass(cls):
        """Reset sasoptpy and try to open a CAS connection shared by all tests.

        ``cls.conn`` is left as ``None`` and a ``RuntimeWarning`` is issued
        when ``create_cas_connection`` raises ``SWATError`` or ``TypeError``.
        """
        so.reset()
        cls.conn = None
        # Imported lazily so that swat is only required when the tests run.
        from swat import SWATError
        try:
            cls.conn = create_cas_connection()
        except SWATError:
            warnings.warn('CAS connection is not available', RuntimeWarning)
        except TypeError:
            warnings.warn('CAS variables are not available', RuntimeWarning)

    @classmethod
    def tearDownClass(cls):
        """Close the shared CAS connection if one was opened."""
        if cls.conn is not None:
            cls.conn.close()

    def setUp(self):
        """No per-test setup is required."""
        pass

    def test_read_regular_existing(self):
        """Read a table into a set and a parameter group, then solve."""
        from sasoptpy import Workspace, VariableGroup
        from sasoptpy.abstract import Set, ParameterGroup
        from sasoptpy.actions import solve

        with Workspace('test_workspace') as ws:
            ITEMS = Set(name='ITEMS')
            value = ParameterGroup(ITEMS, name='value', init=0)
            get = VariableGroup(ITEMS, name='get', vartype=so.INT, lb=0)
            read_data(
                table="values",
                index={'target': ITEMS, 'key': None},
                columns=[{'target': value}])
            solve(options={'with': so.BLACKBOX, 'maxgen': 10})

        optmodel_code = so.to_optmodel(ws)
        self.assertEqual(optmodel_code, cleandoc("""
            proc optmodel;
            set ITEMS;
            num value {ITEMS} init 0;
            var get {{ITEMS}} integer >= 0;
            read data values into ITEMS value;
            solve with blackbox / maxgen=10;
            quit;
            """))

    def test_read_data_index(self):
        """Use ``so.N`` as the index key of the target set."""
        with so.Workspace('test_read_data_n') as ws:
            ASSETS = so.Set(name='ASSETS')
            ret = so.ParameterGroup(ASSETS, name='return', ptype=so.NUM)
            read_data(
                table='means',
                index={'target': ASSETS, 'key': so.N},
                columns=[{'target': ret}]
            )

        optmodel_code = so.to_optmodel(ws)
        self.assertEqual(optmodel_code, cleandoc("""
            proc optmodel;
            set ASSETS;
            num return {ASSETS};
            read data means into ASSETS=[_N_] return;
            quit;
            """))

    def test_read_data_no_index_expression(self):
        """Read with a two-iterator key and no index target set."""
        from sasoptpy.util import iterate

        with so.Workspace('test_read_data_no_index_expression') as ws:
            ASSETS = so.Set(name='ASSETS')
            cov = so.ParameterGroup(ASSETS, ASSETS, name='cov', init=0)
            with iterate(ASSETS, 'asset1') as asset1, \
                    iterate(ASSETS, 'asset2') as asset2:
                read_data(
                    table='covdata',
                    index={'key': [asset1, asset2]},
                    columns=[
                        {'target': cov},
                        {'target': cov[asset2, asset1],
                         'column': 'cov'}
                    ]
                )

        optmodel_code = so.to_optmodel(ws)
        self.assertEqual(optmodel_code, cleandoc("""
            proc optmodel;
            set ASSETS;
            num cov {ASSETS, ASSETS} init 0;
            read data covdata into [asset1 asset2] cov cov[asset2, asset1]=cov;
            quit;
            """))

    def test_read_data_col(self):
        """Map targets to a literal column name and a ``concat`` expression."""
        from sasoptpy.util import concat

        with so.Workspace('test_read_data_col') as ws:
            n = so.Parameter(name='n', init=2)
            indx = so.Set(name='indx', settype=['num'])
            p = so.ParameterGroup(indx, name='p')
            q = so.ParameterGroup(indx, name='q')
            read_data(
                table='exdata',
                index={
                    'target': indx,
                    'key': so.N
                },
                columns=[
                    {'target': p, 'column': 'column1'},
                    {'target': q, 'column': concat('column', n)}
                ]
            )

        optmodel_code = so.to_optmodel(ws)
        self.assertEqual(optmodel_code, cleandoc("""
            proc optmodel;
            num n init 2;
            set indx;
            num p {indx};
            num q {indx};
            read data exdata into indx=[_N_] p=column1 q=col('column' || n);
            quit;
            """))

    def test_read_data_indexed_column(self):
        """Append an iterated column to an existing read_data statement."""
        from sasoptpy.util import iterate, concat

        with so.Workspace(name='test_read_data_idx_col') as ws:
            dow = so.Set(name='DOW', value=so.exp_range(1, 6))
            locs = so.Set(name='LOCS', settype=so.STR)
            demand = so.ParameterGroup(locs, dow, name='demand')
            with iterate(locs, name='loc') as loc:
                r = read_data(
                    table='dmnd',
                    index={'target': locs, 'key': loc}
                )
                with iterate(dow, name='d') as d:
                    r.append({
                        'index': d,
                        'target': demand[loc, d],
                        'column': concat('day', d)
                    })

        optmodel_code = so.to_optmodel(ws)
        self.assertEqual(optmodel_code, cleandoc("""
            proc optmodel;
            set DOW = 1..5;
            set <str> LOCS;
            num demand {LOCS, DOW};
            read data dmnd into LOCS=[loc] {d in DOW} < demand[loc, d]=col('day' || d) >;
            quit;"""))

    def test_with_cas_data(self):
        """Upload a frame to CAS, read it, and check the printed result.

        Skipped when no CAS connection was established in ``setUpClass``.
        """
        if TestReadData.conn is None:
            self.skipTest('No session is available')

        from sasoptpy.util import concat
        import pandas as pd

        df = pd.DataFrame([
            [1, 2], [3, 4]
        ], columns=['column1', 'column2'])

        s = TestReadData.conn
        exdata = s.upload_frame(df, casout='exdata')

        with so.Workspace('test_read_data_col', session=s) as ws:
            n = so.Parameter(name='n', init=2)
            indx = so.Set(name='indx', settype=['num'])
            p = so.ParameterGroup(indx, name='p')
            q = so.ParameterGroup(indx, name='q')
            read_data(
                table=exdata,
                index={
                    'target': indx,
                    'key': so.N
                },
                columns=[
                    {'target': p, 'column': 'column1'},
                    {'target': q, 'column': concat('column', n)}
                ]
            )
            from sasoptpy.actions import print_item
            print_item(p, q)

        optmodel_code = so.to_optmodel(ws)
        self.assertEqual(optmodel_code, cleandoc("""
            proc optmodel;
            num n init 2;
            set indx;
            num p {indx};
            num q {indx};
            read data EXDATA into indx=[_N_] p=column1 q=col('column' || n);
            print p q;
            quit;
            """))

        ws.submit(verbose=True)
        # NOTE(review): this is compared with DataFrame.to_string() output;
        # confirm the column spacing of the literal below.
        self.assertEqual(ws.response['Print1.PrintTable'].to_string(), cleandoc(
            """
            COL1 p q
            0 1.0 1.0 2.0
            1 2.0 3.0 4.0
            """
        ))

    def test_read_data_no_index_target(self):
        """Read with an index key (``so.N``) but no index target set."""
        with so.Workspace('test_no_index_target') as ws:
            NODES = so.Set(name='NODES', settype=so.STR)
            nodesByPri = so.ParameterGroup(NODES, name='nodesByPri')
            read_data(table='temppri',
                      index={'key': so.N},
                      columns=[
                          {'target': nodesByPri, 'column': 'id'}
                      ])

        self.assertEqual(so.to_optmodel(ws), cleandoc("""
            proc optmodel;
            set <str> NODES;
            num nodesByPri {NODES};
            read data temppri into [_N_] nodesByPri=id;
            quit;
            """))

    def test_read_data_same_name_cols(self):
        """Columns given by target only, column only, or both."""
        from sasoptpy.actions import set_objective, solve

        with so.Workspace('test_same_name_cols') as ws:
            x = so.Variable(name='x')
            y = so.Variable(name='y')
            c_xx = so.Parameter(name='c_xx')
            c_x = so.Parameter(name='c_x')
            c_y = so.Parameter(name='c_y')
            c_xy = so.Parameter(name='c_xy')
            c_yy = so.Parameter(name='c_yy')
            read_data(table='coeff',
                      index={},
                      columns=[
                          {'target': c_xx, 'column': 'c_xx'},
                          {'target': c_x},
                          {'column': 'c_y'},
                          {'target': c_xy, 'column': 'c_xy'},
                          {'target': c_yy}
                      ])
            set_objective(c_xx * x**2 + c_x * x + c_xy * x * y + c_yy * y**2,
                          name='z', sense=so.MIN)
            solve()

        self.assertEqual(so.to_optmodel(ws), cleandoc("""
            proc optmodel;
            var x;
            var y;
            num c_xx;
            num c_x;
            num c_y;
            num c_xy;
            num c_yy;
            read data coeff into c_xx c_x c_y c_xy c_yy;
            MIN z = c_xx * (x) ^ (2) + c_x * x + c_xy * x * y + c_yy * (y) ^ (2);
            solve;
            quit;
            """))

    def test_read_data_var_bound(self):
        """Read a column into a variable's upper bound (``prod[p].ub``)."""
        with so.Workspace('test_read_data_var_bound') as ws:
            plants = so.Set(name='plants', settype=so.STR)
            prod = so.VariableGroup(plants, name='prod', lb=0)
            cost = so.Parameter(name='cost', ptype=so.NUM)
            p = so.SetIterator(plants, name='p')
            read_data(
                table='pdat',
                index={'target': plants, 'key': p},
                columns=[
                    {'target': prod[p].ub, 'column': 'maxprod'},
                    {'target': cost}
                ])

        self.assertEqual(so.to_optmodel(ws), cleandoc("""
            proc optmodel;
            set <str> plants;
            var prod {{plants}} >= 0;
            num cost;
            read data pdat into plants=[p] prod[p].ub=maxprod cost;
            quit;
            """))

    def test_read_data_with_exp(self):
        """Use an arithmetic expression of ``so.N`` as the target subscript."""
        with so.Workspace('test_read_data_with_exp') as ws:
            ss = so.Set(name='subscripts', value=so.exp_range(1, 5),
                        settype=so.NUM)
            letter = so.ParameterGroup(ss, name='letter', ptype=so.STR)
            read_data(
                table='abcd',
                index={'key': so.N},
                columns=[
                    {'target': letter[5-so.N]}
                ]
            )

        self.assertEqual(so.to_optmodel(ws), cleandoc("""
            proc optmodel;
            set subscripts = 1..4;
            str letter {subscripts};
            read data abcd into [_N_] letter[- _N_ + 5];
            quit;
            """))

    def test_read_data_append_later(self):
        """Append an iterated column after the statement has been created."""
        from sasoptpy.util import concat, iterate

        with so.Workspace('let') as ws:
            actors = so.Set(name='ACTORS')
            actor_name = so.ParameterGroup(actors, ptype=so.STR,
                                           name='actor_name')
            daily_fee = so.ParameterGroup(actors, name='daily_fee')
            most_scenes = so.Parameter(name='most_scenes', value=9)
            scene_list = so.ParameterGroup(
                actors, so.exp_range(1, most_scenes), name='scene_list')
            r = read_data(
                table='scene',
                index={
                    'target': actors,
                    'key': so.N
                },
                columns=[
                    {
                        'target': actor_name,
                        'column': 'Actor'
                    },
                    {
                        'target': daily_fee,
                        'column': 'DailyFee'
                    }
                ]
            )
            with iterate(so.exp_range(1, most_scenes), name='j') as j:
                r.append({
                    'index': j,
                    'target': scene_list[so.N, j],
                    'column': concat('S_Var', j)
                })

        self.assertEqual(so.to_optmodel(ws), cleandoc("""
            proc optmodel;
            set ACTORS;
            str actor_name {ACTORS};
            num daily_fee {ACTORS};
            num most_scenes = 9;
            num scene_list {ACTORS, 1..most_scenes};
            read data scene into ACTORS=[_N_] actor_name=Actor daily_fee=DailyFee {j in 1..most_scenes} < scene_list[_N_, j]=col('S_Var' || j) >;
            quit;
            """))

    def test_read_data_multi_index(self):
        """Read into a two-dimensional set keyed by two column names."""
        with so.Workspace('test_read_data_multi_index') as ws:
            arcs = so.Set(name='ARCS', settype=[so.STR, so.STR])
            arc_lower = so.ParameterGroup(arcs, name='arcLower')
            arc_upper = so.ParameterGroup(arcs, name='arcUpper')
            arc_cost = so.ParameterGroup(arcs, name='arcCost')
            read_data(
                table='arcdata',
                index={
                    'target': arcs,
                    'key': ['_tail_', '_head_']
                },
                columns=[
                    {'target': arc_lower, 'column': '_lo_'},
                    {'target': arc_upper, 'column': '_capac_'},
                    {'target': arc_cost, 'column': '_cost_'}
                ]
            )

        self.assertEqual(so.to_optmodel(ws), cleandoc("""
            proc optmodel;
            set <str, str> ARCS;
            num arcLower {ARCS};
            num arcUpper {ARCS};
            num arcCost {ARCS};
            read data arcdata into ARCS=[_tail_ _head_] arcLower=_lo_ arcUpper=_capac_ arcCost=_cost_;
            quit;
            """))

    def test_read_data_single_col(self):
        """Use a set iterator itself as the column of an appended entry."""
        from sasoptpy.util import iterate

        with so.Workspace('test_read_data_single_col') as ws:
            daily_employee_slots = so.Set(
                name='DailyEmployeeSlots', settype=[so.STR, so.NUM])
            weekdays = so.Set(
                name='WeekDays', value=['mon', 'tue', 'wed', 'thu', 'fri'])
            preference_weights = so.ParameterGroup(
                daily_employee_slots, weekdays, name='PreferenceWeights')
            with iterate(daily_employee_slots, name=['name', 'slot']) as keys:
                r = read_data(
                    table='preferences',
                    index={
                        'target': daily_employee_slots,
                        'key': keys
                    }
                )
                with iterate(weekdays, name='day') as day:
                    r.append({
                        'index': day,
                        'target': preference_weights[
                            keys['name'], keys['slot'], day],
                        'column': day
                    })

        optmodel_code = so.to_optmodel(ws)
        self.assertEqual(optmodel_code, cleandoc("""
            proc optmodel;
            set <str, num> DailyEmployeeSlots;
            set WeekDays = {'mon','tue','wed','thu','fri'};
            num PreferenceWeights {DailyEmployeeSlots, WeekDays};
            read data preferences into DailyEmployeeSlots=[name slot] {day in WeekDays} < PreferenceWeights[name, slot, day]=col(day) >;
            quit;
            """))

    def test_read_data_N_in_index(self):
        """Use ``so.N`` inside the subscript of an iterated target."""
        from sasoptpy.util import concat

        with so.Workspace('read_data_N_as_index') as ws:
            tasks = so.Set(name='TASKS', value=so.exp_range(1, 25))
            machines = so.Set(name='MACHINES', value=so.exp_range(1, 9))
            profit = so.ParameterGroup(machines, tasks, name='profit')
            j = so.SetIterator(tasks, name='j')
            read_data(
                table='profit_data',
                index={'key': so.N},
                columns=[
                    {'index': j, 'target': profit[so.N, j],
                     'column': concat('p', j)}
                ]
            )

        self.assertEqual(so.to_optmodel(ws), cleandoc("""
            proc optmodel;
            set TASKS = 1..24;
            set MACHINES = 1..8;
            num profit {MACHINES, TASKS};
            read data profit_data into [_N_] {j in TASKS} < profit[_N_, j]=col('p' || j) >;
            quit;
            """))

    def test_with_model(self):
        """Include a read_data statement in a ``so.Model`` instead of a workspace."""
        m = so.Model(name='m')
        ITEMS = m.add_set(name='ITEMS')
        value = m.add_parameter(ITEMS, name='value', init=0)
        weight = m.add_parameter(ITEMS, name='weight')
        limit = m.add_parameter(ITEMS, name='limit')
        get = m.add_variables(ITEMS, name='get', vartype=so.INT, lb=0)
        m.set_objective(so.expr_sum(get[i] for i in ITEMS),
                        name='max_get', sense=so.MAX)
        m.include(read_data(table='values',
                            index={'target': ITEMS, 'key': None},
                            columns=[value, weight, limit]))

        self.assertEqual(
            so.to_optmodel(m, options={'with': so.BLACKBOX, 'maxgen': 10}),
            cleandoc('''
            proc optmodel;
            set ITEMS;
            num value {{ITEMS}} init 0;
            num weight {{ITEMS}};
            num limit {{ITEMS}};
            var get {{ITEMS}} integer >= 0;
            max max_get = sum {i in ITEMS} (get[i]);
            read data values into ITEMS value weight limit;
            solve with blackbox / maxgen=10;
            quit;'''))

    def tearDown(self):
        """No per-test cleanup is required."""
        pass