Source code for ship.utils.fileloaders.tuflowloader

from __future__ import unicode_literals

import logging
logger = logging.getLogger(__name__)
"""logging references with a __name__ set to this module."""

import os
import uuid

from ship.tuflow import FILEPART_TYPES as fpt
from ship.utils import utilfunctions as uf
from ship.tuflow.tuflowmodel import TuflowModel, TuflowFilepartTypes, UserVariables
from ship.tuflow import controlfile as control
from ship.tuflow import tuflowfilepart as tuflowpart
from ship.utils.fileloaders.loader import ALoader
from ship.utils import filetools
from ship.tuflow import tuflowfactory as tfactory


[docs]class TuflowLoader(ALoader): def __init__(self): ALoader.__init__(self) self.types = TuflowFilepartTypes() self.user_variables = UserVariables() # self.scenario_vals = {} # """Any scenario values that are passed through.""" # # self.event_vals = {} """Any event values that are passed through.""" self.tuflow_model = None """TuflowModel class instance""" # Internal stuff self._resetLoader() def _resetLoader(self): self._file_queue = uf.FileQueue() self.user_variables = UserVariables() self._load_list = {} self._file_list = {} self._logic_list = {} self._bc_event = {} # self.scenario_vals = {} # self.event_vals = {} self.tuflow_model = None self._control_files = []
[docs] def loadFile(self, tcf_path, arg_dict={}): """Main loader function defined by the ALoader interface. The presumption with this loader is that a .tcf file (i.e. the root of all tuflow models will be given as a starting point. The rest of the model can then be accessed from there. """ return self.loadModel(tcf_path, arg_dict)
[docs] def loadModel(self, tcf_path, arg_dict={}): """Load a full tuflow model from the given tcf path.""" self._resetLoader() if 'scenario' in arg_dict.keys(): self._has_scenario = True self.user_Variables.has_cmd_vals = True self.scenario_vals = arg_dict['scenario'] for key, val in arg_dict['scenario'].items(): self.user_variables.addVariable(tuflowpart.TuflowModelVariable.noParent(key, val), 'scen') if 'event' in arg_dict.keys(): self._has_event = True self.user_Variables.has_cmd_vals = True self.event_vals = arg_dict['event'] for key, val in arg_dict['event'].items(): self.user_variables.addVariable(tuflowpart.TuflowModelVariable.noParent(key, val), 'evnt') # Check that the tcf exists if not os.path.exists(tcf_path): raise IOError('Tcf file at %s does not exist' % tcf_path) root, tcf_name = os.path.split(tcf_path) root = uf.encodeStr(root) # root = unicode(root) tcf_name = uf.encodeStr(tcf_name) self.tuflow_model = TuflowModel(root) # Parse the tuflow control files main_file = tuflowpart.ModelFile(None, **{'path': tcf_name, 'command': None, 'comment': None, 'model_type': 'TCF', 'root': root}) tcf_path = main_file.absolutePath() # self.tuflow_model.main_file = main_file # Setup the file and object holders self._file_queue.enqueue(main_file) # Read the control files and their contents into memory self._fetchTuflowModel(root) # Order the input and create the actual ControlFile objects self._orderModel(tcf_path) # Return the loaded model self.tuflow_model.root = root self.tuflow_model.control_files = self._control_files self.tuflow_model.bc_event = self._bc_event self.tuflow_model.user_variables = self.user_variables self.tuflow_model.control_files['TCF'].add_callback = self.tuflow_model.addTcfModelFile self.tuflow_model.control_files['TCF'].remove_callback = self.tuflow_model.removeTcfModelFile self.tuflow_model.control_files['TCF'].replace_callback = self.tuflow_model.replaceTcfModelFile self.tuflow_model.missing_model_files = self.missing_model_files return self.tuflow_model
[docs] def loadControlFile(self, model_file): """ """ path = model_file.absolutePath() if not os.path.exists(path): raise IOError('model_file path does not exists at: ' + path) self._resetLoader() root = model_file.root self._file_queue.enqueue(model_file) self._fetchTuflowModel(root) _load_list = self._load_list[path] model = self._file_list[path] mtype = model_file.model_type self.current_control = [mtype] self._control_files = {mtype: control.ControlFile(mtype)} self._control_files[mtype].logic.add(self._logic_list[path]) self._control_files[mtype].control_files.append(model_file) self.buildControlFiles(_load_list, model) return self._control_files[mtype]
def _orderModel(self, tcf_path): """Setup the ControlFile's with ordered TuflowFilepart's. There will always be a tcf file as an entry point, so that is setup first. Then builControlFiles is called to deal with all the others. """ _load_list = self._load_list[tcf_path] model = self._file_list[tcf_path] self.current_control = ['TCF'] self._control_files = {'TCF': control.TcfControlFile(model)} self._control_files['TCF'].logic.add(self._logic_list[tcf_path]) self._control_files['TCF'].control_files.append(model) self.buildControlFiles(_load_list, model)
[docs] def buildControlFiles(self, _load_list, model): """Add TuflowFilepart's to the correct ControlFile's. Loops through the list of TuflowFilePart's created when parsing the tuflow input files. When it finds a TuflowFilepart containing a reference to a tuflow control file it will recursively call itself and start adding parts to the ControlFile for that type instead. It will either find another control file and, again, make a recursive call, or will finish adding the TuflowFileparts to the ControlFile and drop back into the previous function call (recursive) to continue reading the previous list of TuflowFilepart's. """ for part in _load_list: self._control_files[self.current_control[-1]].parts.append(part) if part.obj_type == 'model': p = part.absolutePath() if p in self.missing_model_files: continue if not part.model_type in self._control_files.keys(): self._control_files[part.model_type] = control.ControlFile(part.model_type) self._control_files[part.model_type].logic.add(self._logic_list[p]) self.current_control.append(part.model_type) self._control_files[part.model_type].control_files.append(part) self.buildControlFiles(self._load_list[p], self._file_list[p]) elif part.associates.parent.model_type == 'TCF' and part.filepart_type == fpt.EVENT_VARIABLE: self._bc_event[part.command] = part.variable self.current_control.pop()
''' # Control file parsers. # ''' def _fetchTuflowModel(self, root): """Read all of the control files into memory. """ self.missing_model_files = [] # Keep processing control files until there are none left in the queue while not self._file_queue.isEmpty(): control_part = self._file_queue.dequeue() cpath = control_part.absolutePath() raw_contents = self.getFile(cpath) # If we couldn't load the file add it to the missing list if raw_contents == False: self.missing_model_files.append(cpath) continue contents, logic = self._readControlFile(raw_contents, root, control_part) self._load_list[cpath] = contents self._logic_list[cpath] = logic self._file_list[cpath] = control_part del self._file_queue def _readControlFile(self, raw_contents, root, control_part): """Load the content of a control file. """ contents = [] unknown_store = [] logic = [] logic_done = [] factory = tfactory.TuflowFactory() def createUnknown(unknown_store, l): """Creates an UnknownPart from the current list.""" if unknown_store: vars = {} vars['data'] = '\n'.join(unknown_store) vars['logic'] = l contents.append(tuflowpart.UnknownPart(control_part, **vars)) return [] def addLogicAssociate(lpart, logic_stack): if logic: lpart.associates.logic = logic[-1] logic[-1].addPart(lpart, skip_callback=True) logic.append(lpart) return logic for line in raw_contents: current_logic = logic[-1] if logic else None line = line.strip() upline = line.upper() if tfactory.checkIsComment(line): unknown_store.append(line) continue found, key = self.types.find(upline) # If we don't know what it is if not found: unknown_store.append(line) continue else: unknown_store = createUnknown(unknown_store, current_logic) # Build logic types if key == fpt.IF_LOGIC: vars = self.parseIfLogic(line, control_part, root, key) if vars['command'].upper().strip().startswith('IF'): lfile = tuflowpart.IfLogic(control_part, **vars) logic = addLogicAssociate(lfile, logic) elif vars['command'].upper().strip().startswith('END IF'): logic_done.append(logic.pop()) else: current_logic.addClause(vars['command'], vars['terms'], vars['comment']) continue elif key == fpt.EVENT_LOGIC or key == fpt.SECTION_LOGIC: vars = self.parseDefineLogic(line, control_part, key) if vars['command'].upper().strip().startswith('END'): logic_done.append(logic.pop()) else: if key == fpt.EVENT_LOGIC: dfile = tuflowpart.BlockLogic(control_part, **vars) else: dfile = tuflowpart.SectionLogic(control_part, **vars) logic = addLogicAssociate(dfile, logic) continue # All other FilePart types else: parts = factory.getTuflowPart(line, control_part, key, current_logic) if key == fpt.MODEL: for p in parts: self._file_queue.enqueue(p) if key == fpt.MODEL_VARIABLE: for p in parts: if not self.user_variables.has_cmd_args: self.user_variables.add(p) if key == fpt.USER_VARIABLE: for p in parts: self.user_variables.add(p) for p in parts: contents.append(p) if logic: current_logic.addPart(p, skip_callback=True) unknown_store = createUnknown(unknown_store, current_logic) return contents, logic_done ''' # TuflowFilepart type builders. # '''
[docs] def parseDefineLogic(self, line, parent, key): vars = {} command, vars['comment'], cchar = tfactory.separateComment(line) if '==' in line: vars['command'], vars['terms'] = tfactory.breakLine(command) vars['terms'] = [vars['terms']] else: vars['command'] = command vars['terms'] = None vars['filepart_type'] = key return vars
[docs] def parseIfLogic(self, line, parent, root, key): vars = {} command, vars['comment'], cchar = tfactory.separateComment(line) if '==' in line: vars['command'], vars['terms'] = tfactory.breakLine(command) vars['terms'] = vars['terms'].split('|') else: vars['command'] = command vars['terms'] = None vars['type'] = key return vars
def _addModelVariable(self, part): """Adds any model variables read in to the global variables dict. The dict will only be updated if there are no values in it already. This is inline with the Tuflow manual guidance. Args: part(TuflowVariable): with self.filepart_type == FILEPART_TYPE.MODEL_VARIABLE. """ if not part.filepart_type == fpt.MODEL_VARIABLE: return if 'EVENT' in part.command.upper(): if not self.event_vals: for i, e in enumerate(part.split_variable): self.event_vals['e' + uf.encodeStr(i+1)] = e else: if not self.scenario_vals: for i, s in enumerate(part.split_variable): self.scenario_vals['s' + uf.encodeStr(i+1)] = s ''' # File processing methods # '''
[docs] def getFile(self, path): """Load the file into the contents list. Args: file_path (str): path to the required file. Returns: True if loaded ok, False otherwise. """ logger.debug('loading File: ' + path) try: raw_contents = filetools.getFile(path) except IOError: logger.error('IOError - Unable to load file') return False if(raw_contents == None): logger.error('model file is empty at: ' + path) return False return raw_contents