Contains the convenience collection pattern UnitCollection.
This is used to hold all of the isisunit objects loaded from the dat
Provides convenience methods for retrieving units and getting key
meta-data on the units held in this collection.
Duncan Runnacles
01 Apr 2016
Duncan Runnacles 2016
from __future__ import unicode_literals
import os
from datetime import datetime
from ship.fmp.datunits.isisunit import AUnit
from ship.fmp.datunits import ROW_DATA_TYPES as rdt
from ship.fmp.datunits.isisunit import CommentUnit
from ship.fmp import fmpunitfactory as iuf
from ship.fmp import unitgroups as ugroups
from ship.utils import utilfunctions as uf
from ship.utils import filetools as ft
import logging
logger = logging.getLogger(__name__)
"""logging references with a __name__ set to this module."""
[docs]class DatCollection(object):
"""Collection of isisunit type classes.
This is a sort of composite/facade for all of the isisunit concrete
classes loaded.
Each unit that is loaded is added to this class. They can then be accessed
through the convenience methods outlined here.
def __init__(self, path_holder):
Setup the list that will hold the units.
path_holder (PathHolder): object containing the references to the
file path details of the .dat file.
See Also:
PathHolder class.
self.units = []
self.path_holder = path_holder
self._ic_index = -999 # DON'T MESS WITH THIS!
self._gis_index = -999 # DON'T MESS WITH THIS!
self._min = 0
self._max = len(self.units)
self._current = 0
def __iter__(self):
"""Return an iterator for the units list"""
return iter(self.units)
def __next__(self):
"""Iterate to the next unit"""
if self._current > self._max or self._current < self._min:
raise StopIteration
self._current += 1
return self.units[self._current]
def __getitem__(self, key):
"""Gets a value from units using index notation.
contents of the units element at index.
return self.units[key]
# def __setitem__(self, key, value):
# """Sets a value using index notation
# Calls the setValue() function to do the hard work.
# Args:
# key (int): index to update.
# value: the value to add to the units.
# """
# self.units[key] = value
[docs] def addUnit(self, unit, index=None, **kwargs):
"""Adds a new isisunit type to the collection.
If the index value provided is greater than the index of the
InitialConditions unit the unit will be added before the IC unit. This
is the last slot in the ordering of the units in the dat file.
Accepts ``**kwargs``:
update_node_count(bool): if True will update the node count
value at the top of the .dat file. You probably want to do this.
if missing it will default to True
ics(dict): inital conditions to add for the unit being put in the
collection. If missing the default values will be applied.
These will only be applied if the unit supports initial conditions.
unit (AIsisInit): The instance to add to the collection.
index=None(int): Index to insert the unit at.
AttributeError: When a non-isisunit type is given.
if not isinstance(unit, AUnit):
raise AttributeError ('Given unit is not of type AUnit')
update_node_count = kwargs.get('update_node_count', True)
ics = kwargs.get('ics', {})
Treat initial_conditions, gis_info and header a little differently.
They are always at the top and bottom af the file.
if unit._unit_type == 'header' or unit._unit_type == 'gis_info' or \
unit._unit_type == 'initial_conditions':
if unit._unit_type == 'header':
if self.units and self.units[0]._unit_type == 'header':
self.units[0] = unit
self.units.insert(0, unit)
elif unit._unit_type == 'gis_info':
if self._gis_index == -999:
self._gis_index = len(self.units) -1
self.units[self._gis_index] = unit
# If it already exists in the collection
if self._ic_index != -999:
self.units[self._ic_index] = unit
# If gis_info unit exists put it before that, otherwise it
# goes on the end
if self._gis_index == -999:
self._ic_index = len(self.units) -1
self.units.insert(self._gis_index, unit)
self._ic_index = self._gis_index - 1
self._max = len(self.units)
All the others.
if index is None: index = len(self.units)
# Check if we need to go in front of ic and gis end of file units
if self._ic_index != -999 and index > self._ic_index:
index = self._ic_index
elif self._gis_index != -999 and index > self._gis_index:
index = self._gis_index
elif index > len(self.units):
index = None
if index is not None:
self.units.insert(index, unit)
if self._ic_index != -999: self._ic_index += 1
if self._gis_index != -999: self._gis_index += 1
self._max = len(self.units)
if not self._ic_index == -999 and update_node_count and unit.has_ics:
header = self.units[0]
temp = unit.icLabels()
for name in unit.icLabels():
ics[rdt.LABEL] = unit._name
node_count = self.units[self._ic_index].addRow(ics, unit._unit_type,
header.head_data['node_count'].value = node_count
[docs] def removeUnit(self, unit, unit_type=None, **kwargs):
"""Remove one of the units previously added to the list.
Accepts ``**kwargs``:
update_node_count(bool): if True will update the node count
value at the top of the .dat file. You probably want to do this.
if missing it will default to True
unit(str | AUnit): Can either be the AUnit.name or the
unit_type=None(str): If unit is a name str this must be provided to
ensure that the correct unit is removed. E.g. a RiverUnit and
an RefhUnit can both have the same AUnit.name value, but
different .unit_type's.
If unit is an AUnit this will be ignored.
KeyError: if the name doesn't exist.
update_node_count = kwargs.get('update_node_count', True)
if isinstance(unit, AUnit):
index = self.index(unit)
if unit_type is None:
raise AttributeError('A unit_type must be given when a unit name is supplied')
index = self.index(unit, unit_type)
if index != -1:
name = self.units[index]._name
name_ds = self.units[index]._name_ds
utype = self.units[index]._unit_type
if update_node_count:
ic = self.unit('initial_conditions')
header = self.unit('header')
for l in self.units[index].icLabels():
ic.deleteRowByName(l, utype)
except KeyError:
logger.warning('No intitial conditions found for initial conditions label: ' + name)
header.head_data['node_count'].value = ic.node_count
del self.units[index]
self._max = len(self.units)
return True
return False
[docs] def index(self, unit, unit_type=None):
"""Get the index a particular AUnit in the collection.
Either the unit itself or its name can be provided as the argument.
If a name is supplied a unit_type should also be given. This is because
some units can have the same name (e.g. river and refh) and it is not
possible to know which one to return with the name alone. If no unit_type
is given the first unit with the matching name will be returned.
unit(AUnit or str): the AUnit or the name of the AUnit
to find the index for.
unit_type=None(str): the unit_type member of the AUnit (e.g.
for a USBPR bridge the unit_category == Bridge and unit_type == 'Usbpr').
int - the index of the given unit, or -1 if it could not be found.
index = -1
if isinstance(unit, AUnit):
index = self.units.index(unit)
elif uf.isString(unit):
for i, u in enumerate(self.units):
if u.name == unit:
if unit_type == u.unit_type:
index = i
elif unit_type is None:
index = i
index = -1
return index
[docs] def getPrintableContents(self):
"""Get the formatted contents of each isisunit in the collection.
Iterates through each of the units in the collection and
calls their getData() method.
List containing all lines for each unit formatted for printing
out to the dat file.
out_data = []
logger.debug('Returning printable unit data')
# For each unit call the isisunit object and ask it
# for its .DAT file formatted text to save to file
for u in self.units:
logger.debug('Unit Type: ' + u._unit_type)
return out_data
[docs] def write(self, filepath=None, overwrite=False):
"""Write the contents of this file to disk.
Writes out to file in the format required for reading by ISIS/FMP.
If a filepath is not provided and the settings in this objects
PathHolder class have not been updated you will write over the
file that was loaded.
filepath=None(str): if a filename is provided it the file will be
written to that location. If not, the current settings in this
object path_holder object will be used.
overwrite=False(bool): if the file already exists it will raise
an IOError.
IOError - If unable to write to file.
if filepath is None:
filepath = self.path_holder.absolutePath()
if not overwrite and os.path.exists(filepath):
raise IOError('filepath %s already exists. Set overwrite=True to ignore this warning.' % filepath)
contents = self.getPrintableContents()
ft.writeFile(contents, filepath)
[docs] def unitsByCategory(self, unit_keys):
"""Return all the units in the requested unit(s).
Iterate through the collection and get all of the different categories
within the model.
Categories are defined by the AUnits. For example:
USBPR and Arch bridge units are different, but both will be
categorised as 'bridge'.
unit_keys (str | []): The unit variables defined in
the unit. Can be either a string representing a single CATEGORY
of AUnit or a list of strings for multiple types.
List containing all the specified CATEGORY of unit in the model or
False if there are none of the CATEGORY in the
if uf.isString(unit_keys):
unit_keys = [unit_keys]
types = []
for u in self.units:
if u.unit_category in unit_keys:
return types
[docs] def unitsByType(self, type_keys):
"""Return all of the units of the requested type.
Iterate through the collection and get all of the different unit types
within the model.
Types are set by the isisunit subclasses. They differentiate the
from categories by providing further definition. For example:
USBPR and ARCH bridges would both be returned in the same UNIT_CATEGORY,
but on ARCH bridges would be return using the ArchBridgeUnit.TYPE.
Use the class constants in the isisunit classes as the type key
See Also:
type_keys (str | []): The unit_type variables defined in
the unit. Can be either a string representing a single type
of AUnit or a list of strings for multiple types.
List of the specified unit type.
if uf.isString(type_keys):
type_keys = [type_keys]
types = []
for u in self.units:
if u.unit_type in type_keys:
return types
[docs] def allUnits(self):
"""Get all of the isisunit in the collection
Don't use this it is being deprecated and will probably be
removed in a later release.
list of isisunit objects
Remove this function it can be accessed through the variables or
by setting up a property if needed.
return self.units
[docs] def unit(self, key, unit_type=None, unit_category=None):
"""Fetch a unit from the collection by name.
Each isisunit in the collection is guaranteed to have a unique id.
You can access the unit if you know it's ID. The ID is the
AUnit.name variable.
Sometimes different units can have the same name (e.g. RefhUnit and
RiverUnit). This function will always return the first unit it finds.
To avoid this you can specifiy an AUnit.UNIT_TYPE to retrieve::
>>> getUnit(river.name, river.UNIT_TYPE)
Note that if both unit_type and unit_category are given whichever is
found to match against the unit.name first will be returned. This
shouldn't make any difference in practise.
name_key (str): name of the unit.
unit_type=None(str): the AUnit.TYPE to find.
unit_category=None(str): the AUnit.CATEGORY to find.
isisunit object corresponding to the given name, or False
if the name doesn't exist.
# Do a quick lookup on these as we know roughly where they are
if key == 'initial_conditions':
if self.units and self.units[-1]._unit_type == 'initial_conditions':
return self.units[-1]
elif len(self.units) > 1 and self.units[-2]._unit_type == 'initial_conditions':
return self.units[-2]
return False
if key == 'header':
if self.units and self.units[0]._unit_type == 'header':
return self.units[0]
return False
for u in self.units:
if u.name == key:
if unit_type is None and unit_category is None:
return u
elif unit_type and u.unit_type == unit_type:
return u
elif unit_category and u.unit_category == unit_category:
return u
return False
[docs] def setUnit(self, unit):
"""Replace the contents of a certain unit with the given one.
Each isisunit have a .name and .unit_type variable. The .nane and
.unit_type will be checked against the collection. If the name and
matching unit_type is found within thecollection that unit will be
replaced with the given one.
unit (AUnit): the unit to replace.
NameError, AttributeError - if the .name or .unit_type could not
be found.
name = unit._name
utype = unit._unit_type
except (NameError, AttributeError) as err:
logger.error('Provided AUnit does not have a name and/or unit_type variable - Data Corruption!')
index = self.index(unit._name, unit._unit_type)
self.units[index] = unit
for i, u in enumerate(self.units, 0):
if u.name == unit.name:
self.units[i] = unit
[docs] def numberOfUnits(self):
"""The number of units currently held in the collection.
Int Units in the collection.
return len(self.units)
[docs] def linkedUnits(self, unit):
linksect = ugroups.LinkedUnits(unit)
index = self.index(unit)
linksect.addLinkedUnit(self.units[index-1], 'upstream')
linksect.addLinkedUnit(self.units[index+1], 'downstream')
unit_links = [val for val in unit.linkLabels().values() if val.strip() != '']
unit_links = set(unit_links)
associates = [] # Used for named_units in linksect
temp_junctions = [] # Store any junctions that refer to this unit
# Store the name, name_ds and index of the units in this collection
# so that we don't have to loop twice
temp_locations = {}
for i, u in enumerate(self.units):
# Add the name, name_ds and index of all the units
# Same name's/name_ds' are stored in a list
if not u.name in temp_locations.keys():
temp_locations[u.name] = []
if u.name_ds != 'unknown' and not u.name_ds in temp_locations.keys():
temp_locations[u.name_ds] = []
if not i in temp_locations[u.name]:
if u.name_ds != 'unknown' and not i in temp_locations[u.name_ds]:
# Check if unit is in the linkLabels of u and add to associates or
# temp_junctions as appropriate if it is
links = [val for val in u.linkLabels().values() if val.strip() != '']
if not unit_links.isdisjoint(links):
if u.unit_type == 'junction':
temp_junctions.append((u, links))
if not u == unit:
# Check all of the junction linkLabels that we found and put the actual
# unit in the list for each name it references
junctions = []
if temp_junctions:
for i, junc in enumerate(temp_junctions):
junctions.append((junc[0], []))
for link in junc[1]:
for index in temp_locations[link]:
if not self.units[index] in junctions[i][1] and not \
self.units[index].unit_type == 'junction':
linksect.named_units = associates
linksect.junctions = junctions
del junctions
del associates
del temp_junctions
del temp_locations
return linksect
[docs] def initialisedDat(cls, dat_path, units=[], **kwargs):
"""Create a new ISIS .dat file with basic header info and no units.
Creates the equivelant of generating a new .dat unit in the software. The
DatCollection returned can then be manipulated in the same way that any
other one loaded from file would be.
A single comment unit will be be added to the file stating that it was
created by the SHIP library at timestamp.
Example unit_kwargs. Note that first index is a placholder for no args::
unit_kwargs== [{},
'ics': {rdt.FLOW: 3.0, rdt.STAGE: 15.0, rdt.ELEVATION: 14.5},
unit_kwargs(list): contains a dict with the kwargs for each unit
that is included in units. The list must be the same length
as units, or not included. If no kwargs for a particular unit
are to be given an empty dict should be used as a placholder in
the list.
dat_path(str): the path to set for the newly created .dat file.
DatCollection - setup as an empty ISIS .dat file.
unit_kwargs = kwargs.get('unit_kwargs', [{}]*len(units))
if not len(unit_kwargs) == len(units):
raise ValueError('unit_kwargs kwarg must be the same length as unit or not be given')
path_holder = ft.PathHolder(dat_path)
dat = cls(path_holder)
hunit = iuf.FmpUnitFactory.createUnit('header')
icunit = iuf.FmpUnitFactory.createUnit('initial_conditions')
cunit = CommentUnit(text=('Created by SHIP library on %s' % datetime.now().strftime('%Y-%M-%d %H:%M')))
dat.addUnit(hunit, update_node_count=False)
dat.addUnit(cunit, update_node_count=False)
dat.addUnit(icunit, update_node_count=False)
for i, u in enumerate(units):
dat.addUnit(u, **unit_kwargs[i])
return dat