from __future__ import unicode_literals
import logging
logger = logging.getLogger(__name__)
"""logging references with a __name__ set to this module."""
import os
import copy
from ship.tuflow.tuflowmodel import TuflowFilepartTypes
from ship.tuflow import FILEPART_TYPES as fpt
from ship.tuflow import tuflowfilepart as tuflowpart
from ship.utils import utilfunctions as uf
[docs]class TuflowFactory(object):
def __init__(self):
[docs] def getTuflowPart(cls, line, parent, part_type=None, logic=None):
filepart_types = TuflowFilepartTypes()
line = line.strip()
upline = line.upper()
if part_type is None:
found, key = filepart_types.find(upline)
found, key = filepart_types.find(upline, part_type)
if not found:
raise TypeError("Provided part type (%s) doesn't match line (%s)" % (part_type, line))
# vars = {'filepart_type': part_type}
vars = {}
if logic is not None:
vars['logic'] = logic
# Don't know what to do with it
if not found:
vars['filepart_type'] = fpt.UNKNOWN
vars['data'] = line
return [tuflowpart.UnknownPart(parent, **vars)]
key = checkMultiTypes(line, key)
vars['filepart_type'] = key
if key == fpt.MODEL:
parts = TuflowFactory.createModelType(line, parent, **vars)
elif key == fpt.GIS:
parts = TuflowFactory.createGisType(line, parent, **vars)
elif key == fpt.RESULT:
parts = TuflowFactory.createResultType(line, parent, **vars)
elif key == fpt.DATA:
parts = TuflowFactory.createDataType(line, parent, **vars)
elif key == fpt.VARIABLE:
parts = TuflowFactory.createVariableType(line, parent, **vars)
elif key == fpt.MODEL_VARIABLE:
parts = TuflowFactory.createModelVariableType(line, parent, **vars)
elif key == fpt.EVENT_VARIABLE:
parts = TuflowFactory.createBcEventVariable(line, parent, **vars)
elif key == fpt.USER_VARIABLE:
parts = TuflowFactory.createUserVariableType(line, parent, **vars)
return parts
TuflowFilepart type builders.
[docs] def createModelVariableType(line, parent, **kwargs):
command, kwargs['comment'], cchar = separateComment(line)
kwargs['command'], variable = breakLine(command)
split_var = variable.split('|')
if 'MODEL SCENARIOS' in kwargs['command'].upper():
prefix = 's'
prefix = 'e'
parts = []
for i, s in enumerate(split_var):
s = s.strip()
name = prefix + uf.encodeStr(str(i+1))
parts.append(tuflowpart.TuflowModelVariable(parent, **{
'logic': kwargs.get('logic', None), 'command': kwargs['command'],
'comment': kwargs['comment'], 'variable': s, 'name': name
parts = assignSiblings(parts)
return parts
[docs] def createUserVariableType(line, parent, **kwargs):
command, kwargs['comment'], cchar = separateComment(line)
kwargs['command'], kwargs['variable'] = breakLine(command)
part = tuflowpart.TuflowUserVariable(parent, **kwargs)
return [part]
[docs] def createBcEventVariable(line, parent, **kwargs):
l = line.strip().upper()
if l.startswith('BC EVENT NAME'):
return TuflowFactory.createVariableType(line, parent, **kwargs)
if l.startswith('BC EVENT TEXT'):
return TuflowFactory.createVariableType(line, parent, **kwargs)
if l.startswith('BC EVENT SOURCE'):
return TuflowFactory.createKeyValueType(line, parent, **kwargs)
[docs] def createVariableType(line, parent, **kwargs):
command, kwargs['comment'], cchar = separateComment(line)
kwargs['command'], kwargs['variable'] = breakLine(command)
part = tuflowpart.TuflowVariable(parent, **kwargs)
return [part]
[docs] def createKeyValueType(line, parent, **kwargs):
command, kwargs['comment'], cchar = separateComment(line)
kwargs['command'], kwargs['variable'] = breakLine(command)
part = tuflowpart.TuflowKeyValue(parent, **kwargs)
return [part]
[docs] def createDataType(line, parent, **kwargs):
command, kwargs['comment'], cchar = separateComment(line)
kwargs['command'], kwargs['path'] = breakLine(command)
kwargs['root'] = parent.root
part = tuflowpart.DataFile(parent, **kwargs)
return [part]
[docs] def createResultType(line, parent, **kwargs):
command, kwargs['comment'], cchar = separateComment(line)
kwargs['command'], kwargs['path'] = breakLine(command)
kwargs['root'] = parent.root
part = tuflowpart.ResultFile(parent, **kwargs)
part = resolveResult(part)
return [part]
[docs] def createGisType(line, parent, **kwargs):
command, kwargs['comment'], cchar = separateComment(line)
kwargs['command'], kwargs['path'] = breakLine(command)
kwargs['root'] = parent.root
if '|' in kwargs['path']:
return partsFromPipedFiles(tuflowpart.GisFile, parent, **kwargs)
part = tuflowpart.GisFile(parent, **kwargs)
return [part]
[docs] def createModelType(line, parent, **kwargs):
command, kwargs['comment'], cchar = separateComment(line)
# Check for Estry auto command
command, has_auto = checkEstryAuto(command, parent)
kwargs['has_auto'] = has_auto
kwargs['command'], kwargs['path'] = breakLine(command)
kwargs['root'] = parent.root
if takeParentType(kwargs['path']):
kwargs['model_type'] = parent.model_type
kwargs['model_type'] = getExtension(kwargs['path'])
part = tuflowpart.ModelFile(parent, **kwargs)
return [part]
[docs] def createIfLogic(parent, commands, terms, comments):
"""Create a new IfLogic object.
parent(ModelFile): the parent TuflowPart.
commands(list): a list of the command part to add for each clause.
e.g. 'If Scenario', 'Else', 'Else If Event', etc.
terms:(list(list)): terms to add for each clause. e.g.
[['scen1', 'scen2'], ['scen3']]
comments(list): the comment that should be attached to the end of
each clause line. e.g. 'If Scenario == scen1 | scen2 ! comment'
IfLogic - created with given args.
ValueError - if commands, terms and comments are not the same length.
if not len(commands) == len(terms) == len(comments):
raise ValueError('commands, terms and comments must be the same length')
iflogic = None
for i, c in enumerate(commands):
if i == 0:
vars = {'command': c, 'terms': terms[i], 'comment': comments[i]}
iflogic = tuflowpart.IfLogic(parent, **vars)
iflogic.addClause(c, terms[i], comments[i])
return iflogic
[docs] def createBlockLogic(parent, commands, terms, comments):
"""Create a new IfLogic object.
parent(ModelFile): the parent TuflowPart.
commands(str): the command part e.g. 'Define Event'.
terms:(list): terms to add for the clause ['scen1', 'scen2'].
comments(str): the comment that should be attached to the end of
the clause line. e.g. 'If Scenario == scen1 | scen2 ! comment'
BlockLogic - created with given args.
vars = {'command': commands, 'terms': terms, 'comment': comments}
blocklogic = tuflowpart.BlockLogic(parent, **vars)
return blocklogic
[docs]def partsFromPipedFiles(part_type, parent, **kwargs):
"""Separates piped file paths and creates a TuflowFilepart for each.
The 'path' entry in the kwargs will be replaced with the specific section
of the piped file that is being created for each part.
This function also call assignSiblings().
part_type(TuflowFilePart): class to crate the resultant parts with.
parent(TuflowFilePart): the control file part containing this command.
kwargs(dict): the arguments to use when building the TuflowFilePart.
These are dependent on the type of TuflowFilePart that you create.
list - containing TuflowFilePart instance of part_type type.
split_var = kwargs['path'].split('|')
parts = []
for i, s in enumerate(split_var):
vars = copy.deepcopy(kwargs)
s = s.strip()
vars['path'] = s
parts.append(part_type(parent, **vars))
parts = assignSiblings(parts)
return parts
[docs]def assignSiblings(parts):
"""Assign's next and previours sibling's in an ordered list.
parts(list): a list of TuflowFilepart's in associate order.
parts(list) - with sibling_next and sibling_prev objects set to
adjacent parts in the provided list.
for i, p in enumerate(parts):
if i == 0:
if len(parts) > 1:
parts[i].associates.sibling_next = parts[i+1]
if i > 0:
parts[i].associates.sibling_prev = parts[i-1]
if i < len(parts) - 1:
parts[i].associates.sibling_next = parts[i+1]
return parts
[docs]def checkMultiTypes(line, part_type):
"""Returns the corrent filepart _type for multi-type commands.
For example projection can be either a string variable or a gis file.
line(str): the control file line to check.
part_type(FILEPART_TYPE): the default value to return if everything
was already correct.
l = line.strip().upper()
if l.startswith('SHP PROJECTION') or l.startswith('MI PROJECTION'):
splitl = l.split('==')
val = splitl[1].upper()
if 'COORDSYS' in val and 'EARTH' in val and 'PROJECTION' in val:
return fpt.VARIABLE
return part_type
[docs]def checkEstryAuto(line, parent):
l = line.strip().upper()
has_auto = False
if l.startswith('ESTRY CONTROL FILE'):
if 'AUTO' in l:
has_auto = True
line = 'Estry Control File == ' + parent.filename + '.ecf'
return line, has_auto
[docs]def takeParentType(path):
types = ['TCF', 'TBC', 'TGC', 'TEF', 'ECF']
ext = getExtension(path)
if ext in types:
return False
return True
[docs]def getExtension(path, upper=True):
ext = os.path.splitext(path)[1][1:].upper()
if upper == True:
return ext.upper()
return ext.lower()
[docs]def breakLine(line):
"""Breaks a file line into it's command/instruction components.
Most lines in tuflow files are in the form::
Read Command == ..\some\path.ext ! comment
This separates the command from the rest.
line(str): the line as read from the file.
Tuple - (command, instruction).
line = line.strip()
command, instruction = line.split('==', 1)
command = command.strip()
instruction = instruction.strip()
return command, instruction
[docs]def resolveResult(result_part):
"""Fixes the self.type.RESULT paths after load.
Result and Check file paths are a bit tricky because they can be set
up in a range of ways as either relative or absolute e.g.:
If output is a checkfile no '\' on the end indicates that the final
string should be prepended to all files, but if it's a result output
it is the same as having a '\' on the end.
This function attempts to work out what's going on with it all and setup
the root, relative_root and filename accordingly.
Need to account for the case where a string follows the directory
is given that will be prepended to all output files.
ResultFile - ammended.
rtype = result_part.result_type.upper()
is_absolute = os.path.isabs(result_part.path_as_read)
basename = os.path.basename(result_part.path_as_read)
final_char = result_part.path_as_read[-1]
trailing_slash = final_char == '\\' or final_char == '/'
if is_absolute:
result_part.has_own_root = True
result_part.relative_root = ''
# If there's a slash on the end keep path as it is
if trailing_slash:
result_part.root = result_part.path_as_read
# Get directory for CHECK files so we can set a filename prefix later
# or stick a slash on the end for the others to make it easier to
# deal with it later
elif not trailing_slash and rtype == 'CHECK':
result_part.root = os.path.dirname(result_part.path_as_read) + os.sep
result_part.root = result_part.path_as_read + os.sep
# This shouldn't ever happen, but in case it does we set it to
# '' here so it doesn't catch anyone out later
if result_part.root is None: result_part.root = ''
result_part.has_own_root = False
if trailing_slash:
result_part.relative_root = result_part.path_as_read
elif not trailing_slash and rtype == 'CHECK':
result_part.relative_root = os.path.dirname(result_part.path_as_read)
result_part.relative_root = result_part.path_as_read + os.sep
result_part.filename = ''
result_part.extension = ''
# result_part.filename_is_prefix = False
# A trailing string is a prefix in check files so set that up here
if rtype == 'CHECK':
if not trailing_slash:
if not result_part.has_own_root:
result_part.relative_root = result_part.relative_root + os.sep
result_part.filename_is_prefix = True
result_part.filename = os.path.basename(result_part.path_as_read)
return result_part