"""Functions for reading and writing stuff to disk, and working with file paths."""importdatetimeasdtimportinspectimportjsonimportosimportshutilimportsysimporttimefrompathlibimportPathimportflopyimportnumpyasnpimportpandasaspdimportyamlfromflopy.mf6.dataimportmfstructurefromflopy.mf6.mfbaseimport(ExtFileAction,FlopyException,MFDataException,MFFileMgmt,PackageContainer,PackageContainerType,VerbosityLevel,)fromflopy.mf6.modflowimportmfims,mftdisfromflopy.modflow.mfimportModflowGlobalfromflopy.utilsimportmfreadnamimportmfsetupfrommfsetup.gridimportMFsetupGridfrommfsetup.utilsimportget_input_arguments,update
def check_source_files(fileslist):
    """Check that the files in fileslist exist.
    """
    if isinstance(fileslist, str):
        fileslist = [fileslist]
    for f in fileslist:
        f = Path(f)
        if not f.exists():
            raise IOError(f'Cannot find {f.absolute()}')
def load(filename):
    """Load a configuration file."""
    filename = Path(filename)
    if set(filename.suffixes).intersection({'.yml', '.yaml'}):
        return load_yml(filename)
    elif filename.suffix == '.json':
        return load_json(filename)
def dump(filename, data):
    """Write a dictionary to a configuration file."""
    if str(filename).endswith('.yml') or str(filename).endswith('.yaml'):
        return dump_yml(filename, data)
    elif str(filename).endswith('.json'):
        return dump_json(filename, data)
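# Example round trip with load() and dump() (hypothetical file name; a minimal
# sketch assuming a dictionary of plain YAML-serializable values):
# >>> cfg = {'model': {'modelname': 'example', 'version': 'mf6'}}
# >>> dump('example_config.yml', cfg)
# >>> load('example_config.yml') == cfg
# True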
def load_json(jsonfile):
    """Convenience function to load a json file; replacing some escaped characters."""
    with open(jsonfile) as f:
        return json.load(f)
def dump_json(jsonfile, data):
    """Write a dictionary to a json file."""
    with open(jsonfile, 'w') as output:
        json.dump(data, output, indent=4, sort_keys=True)
    print('wrote {}'.format(jsonfile))
def load_modelgrid(filename):
    """Create a MFsetupGrid instance from model config json file."""
    cfg = load(filename)
    rename = {'xll': 'xoff',
              'yll': 'yoff',
              }
    for k, v in rename.items():
        if k in cfg:
            cfg[v] = cfg.pop(k)
    if np.isscalar(cfg['delr']):
        cfg['delr'] = np.ones(cfg['ncol']) * cfg['delr']
    if np.isscalar(cfg['delc']):
        cfg['delc'] = np.ones(cfg['nrow']) * cfg['delc']
    kwargs = get_input_arguments(cfg, MFsetupGrid)
    return MFsetupGrid(**kwargs)
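# Example usage of load_modelgrid() (hypothetical file name and contents; a
# minimal sketch assuming a grid config json with row/column spacing and a
# lower-left offset):
# >>> # grid.json might contain:
# >>> # {"nrow": 10, "ncol": 10, "delr": 250.0, "delc": 250.0,
# >>> #  "xll": 500000.0, "yll": 1200000.0}
# >>> modelgrid = load_modelgrid('grid.json')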
def load_yml(yml_file):
    """Load yaml file into a dictionary."""
    with open(yml_file) as src:
        cfg = yaml.load(src, Loader=yaml.Loader)
    return cfg
def dump_yml(yml_file, data):
    """Write a dictionary to a yaml file."""
    with open(yml_file, 'w') as output:
        yaml.dump(data, output)  #, Dumper=yaml.Dumper)
    print('wrote {}'.format(yml_file))
def load_array(filename, shape=None, nodata=-9999):
    """Load an array, ensuring the correct shape."""
    t0 = time.time()
    if not isinstance(filename, list):
        filename = [filename]
    shape2d = shape
    if shape is not None and len(shape) == 3:
        shape2d = shape[1:]
    arraylist = []
    for f in filename:
        if isinstance(f, dict):
            f = f['filename']
        txt = 'loading {}'.format(f)
        if shape2d is not None:
            txt += ', shape={}'.format(shape2d)
        print(txt, end=', ')
        # pd.read_csv is >3x faster than np.loadtxt
        arr = pd.read_csv(f, delim_whitespace=True, header=None).values
        if shape2d is not None:
            if arr.shape != shape2d:
                if arr.size == np.prod(shape2d):
                    arr = np.reshape(arr, shape2d)
                else:
                    raise ValueError("Data in {} have size {}; should be {}"
                                     .format(f, arr.shape, shape2d))
        arraylist.append(arr)
    array = np.squeeze(arraylist)
    if issubclass(array.dtype.type, np.floating):
        array[array == nodata] = np.nan
    print("took {:.2f}s".format(time.time() - t0))
    return array
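# Example usage of load_array() (hypothetical file names; a minimal sketch
# assuming whitespace-delimited text arrays, one file per model layer):
# >>> top = load_array('top.dat', shape=(100, 150))
# >>> botm = load_array(['botm0.dat', 'botm1.dat'], shape=(2, 100, 150))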
def save_array(filename, arr, nodata=-9999, **kwargs):
    """Save an array and print that it was written."""
    if isinstance(filename, dict) and 'filename' in filename.keys():
        filename = filename.copy().pop('filename')
    t0 = time.time()
    if np.issubdtype(arr.dtype, np.unsignedinteger):
        arr = arr.copy()
        arr = arr.astype(int)
    arr[np.isnan(arr)] = nodata
    np.savetxt(filename, arr, **kwargs)
    print('wrote {}'.format(filename), end=', ')
    print("took {:.2f}s".format(time.time() - t0))
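# Example usage of save_array() (hypothetical file name and values; NaNs are
# written as the nodata value; extra kwargs are passed to np.savetxt):
# >>> arr = np.random.rand(100, 150)
# >>> save_array('strt.dat', arr, nodata=-9999, fmt='%.2f')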
def append_csv(filename, df, **kwargs):
    """Read data from filename, append to dataframe, and write appended dataframe
    back to filename."""
    if os.path.exists(filename):
        written = pd.read_csv(filename)
        df = pd.concat([df, written], axis=0)
    df.to_csv(filename, **kwargs)
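# Example usage of append_csv() (hypothetical file name; rows already in the
# file are retained and the new rows are appended):
# >>> df = pd.DataFrame({'per': [0, 1], 'q': [-100., -200.]})
# >>> append_csv('pumping.csv', df, index=False)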
def load_cfg(cfgfile, verbose=False, default_file=None):
    """Load a YAML or JSON configuration file, apply configuration defaults
    from a default_file if specified, add the absolute file path of the
    configuration file to the configuration dictionary, and convert any
    relative paths in the configuration dictionary to absolute paths, assuming
    the paths are relative to the configuration file location.

    Parameters
    ----------
    cfgfile : str
        Path to MFsetup configuration file (json or yaml)
    verbose : bool
        Passed to the model configuration as cfg['model']['verbose'].
    default_file : str
        Name of a default configuration file, relative to the mfsetup
        package location.

    Returns
    -------
    cfg : dict
        Dictionary of configuration data

    Notes
    -----
    This function is used by the model instance load and setup_from_yaml
    classmethods, so that configuration defaults can be applied to the
    simulation and model blocks before they are passed to the flopy simulation
    constructor and the model constructor.
    """
    print('loading configuration file {}...'.format(cfgfile))
    source_path = Path(__file__).parent
    default_file = Path(default_file)
    check_source_files([cfgfile, source_path / default_file])

    # default configuration
    default_cfg = {}
    if default_file is not None:
        default_cfg = load(source_path / default_file)
        default_cfg['filename'] = source_path / default_file

        # for now, only apply defaults for the model and simulation blocks
        # which are needed for the model instance constructor
        # other defaults are applied in _set_cfg,
        # which is called by model.__init__
        # intermediate_data is needed by some tests
        apply_defaults = {'simulation', 'model', 'intermediate_data'}
        default_cfg = {k: v for k, v in default_cfg.items()
                       if k in apply_defaults}

    # recursively update defaults with information from yaml file
    cfg = default_cfg.copy()
    user_specified_cfg = load(cfgfile)
    update(cfg, user_specified_cfg)
    cfg['model'].update({'verbose': verbose})
    cfg['filename'] = os.path.abspath(cfgfile)

    # convert relative paths in the configuration dictionary
    # to absolute paths, based on the location of the config file
    config_file_location = os.path.split(os.path.abspath(cfgfile))[0]
    cfg = set_cfg_paths_to_absolute(cfg, config_file_location)
    return cfg
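# Example usage of load_cfg() (hypothetical configuration file name;
# default_file is a defaults file packaged with mfsetup, such as
# mf6_defaults.yml):
# >>> cfg = load_cfg('example_config.yml', default_file='mf6_defaults.yml')
# >>> cfg['filename']  # absolute path of the configuration file is recorded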
def set_cfg_paths_to_absolute(cfg, config_file_location):
    """Convert relative paths in a configuration dictionary to absolute paths,
    based on the locations of the configuration file and model workspace."""
    version = None
    if 'simulation' in cfg:
        version = 'mf6'
    else:
        version = cfg['model'].get('version')
    if version == 'mf6':
        file_path_keys_relative_to_config = [
            'simulation.sim_ws',
            'parent.model_ws',
            'parent.simulation.sim_ws',
            'parent.headfile',
            #'setup_grid.lgr.config_file'
        ]
        model_ws = os.path.normpath(os.path.join(config_file_location,
                                                 cfg['simulation']['sim_ws']))
    else:
        file_path_keys_relative_to_config = [
            'model.model_ws',
            'parent.model_ws',
            'parent.simulation.sim_ws',
            'parent.headfile',
            'nwt.use_existing_file'
        ]
        model_ws = os.path.normpath(os.path.join(config_file_location,
                                                 cfg['model']['model_ws']))
    file_path_keys_relative_to_model_ws = [
        'setup_grid.grid_file'
    ]

    # add additional paths by looking for source_data
    # within these input blocks, and convert the file paths to absolute
    look_for_files_in = ['source_data',
                         'perimeter_boundary',
                         'lgr',
                         'sfrmaker_options'
                         ]
    for pckgname, pckg in cfg.items():
        if isinstance(pckg, dict):
            for input_block in look_for_files_in:
                if input_block in pckg.keys():
                    # handle LGR sub-blocks separately
                    # if LGR configuration is specified within the yaml file
                    # (or as a dictionary), we don't want to touch it at this point
                    # (just convert filepaths to configuration files for sub-models)
                    if input_block == 'lgr':
                        for model_name, config in pckg[input_block].items():
                            if 'filename' in config:
                                file_keys = _parse_file_path_keys_from_source_data(
                                    {model_name: config})
                    else:
                        file_keys = _parse_file_path_keys_from_source_data(
                            pckg[input_block])
                    for key in file_keys:
                        file_path_keys_relative_to_config.append(
                            '.'.join([pckgname, input_block, key]))
            for loc in ['output_files',
                        'output_folders',
                        'output_folder',
                        'output_path']:
                if loc in pckg.keys():
                    file_keys = _parse_file_path_keys_from_source_data(
                        pckg[loc], paths=True)
                    for key in file_keys:
                        file_path_keys_relative_to_model_ws.append(
                            '.'.join([pckgname, loc, key]).strip('.'))

    # set locations that are relative to the configuration file
    cfg = _set_absolute_paths_to_location(file_path_keys_relative_to_config,
                                          config_file_location, cfg)

    # set locations that are relative to model_ws
    cfg = _set_absolute_paths_to_location(file_path_keys_relative_to_model_ws,
                                          model_ws, cfg)
    return cfg
def _set_path(keys, abspath, cfg):
    """From a sequence of keys that point to a file path in a nested dictionary,
    convert the file path at that location from relative to absolute,
    based on a provided absolute path.

    Parameters
    ----------
    keys : sequence, or str of dict keys separated by '.'
        Keys that point to a relative path.
        Example: 'parent.model_ws' for cfg['parent']['model_ws']
    abspath : absolute path
    cfg : dictionary

    Returns
    -------
    Updates cfg with an absolute path based on abspath,
    at the location in the dictionary specified by keys.
    """
    if isinstance(keys, str):
        keys = keys.split('.')
    d = cfg.get(keys[0])
    if d is not None:
        for level in range(1, len(keys)):
            if level == len(keys) - 1:
                k = keys[level]
                if k in d:
                    if d[k] is not None:
                        d[k] = os.path.normpath(os.path.join(abspath, d[k]))
                elif k.isdigit():
                    k = int(k)
                    if d[k] is not None:
                        d[k] = os.path.join(abspath, d[k])
            else:
                key = keys[level]
                if key in d:
                    d = d[keys[level]]
    return cfg


def _set_absolute_paths_to_location(paths, location, cfg):
    """Set relative file paths in a configuration dictionary
    to a specified location.

    Parameters
    ----------
    paths : sequence
        Sequence of dictionary keys read by _set_path.
        e.g. ['parent.model_ws', 'parent.headfile']
    location : str (path to folder)
    cfg : configuration dictionary (as read in by load_cfg)
    """
    for keys in paths:
        cfg = _set_path(keys, location, cfg)
    return cfg


def _parse_file_path_keys_from_source_data(source_data, prefix=None, paths=False):
    """Parse a source data entry in the configuration file.

    pseudo code:
    For each key or item in source_data,
        If it is a string that ends with a valid extension,
            a file is expected.
        If it is a dict or list,
            it is expected to be a file or set of files with metadata.
        For each item in the dict or list,
            If it is a string that ends with a valid extension,
                a file is expected.
            If it is a dict or list,
                a set of files corresponding to
                model layers or stress periods is expected.

    valid source data file extensions: csv, shp, tif, asc

    Parameters
    ----------
    source_data : dict
    prefix : str
        Text to prepend to results, e.g.
        keys = prefix.keys
    paths : bool
        If True, overrides check for valid extension.

    Returns
    -------
    keys
    """
    valid_extensions = ['csv', 'shp', 'tif',
                        'ref', 'dat',
                        'nc',
                        'yml', 'json',
                        'hds', 'cbb', 'cbc',
                        'grb']
    file_keys = ['filename',
                 'filenames',
                 'binaryfile',
                 'nhdplus_paths']
    keys = []
    if source_data is None:
        return []
    if isinstance(source_data, str):
        return ['']
    if isinstance(source_data, list):
        items = enumerate(source_data)
    elif isinstance(source_data, dict):
        items = source_data.items()
    for k0, v in items:
        if isinstance(v, str):
            if k0 in file_keys:
                keys.append(k0)
            elif v[-3:] in valid_extensions or paths:
                keys.append(k0)
            elif 'output' in source_data:
                keys.append(k0)
        elif isinstance(v, list):
            for i, v1 in enumerate(v):
                if k0 in file_keys:
                    keys.append('.'.join([str(k0), str(i)]))
                elif paths or isinstance(v1, str) and v1[-3:] in valid_extensions:
                    keys.append('.'.join([str(k0), str(i)]))
        elif isinstance(v, dict):
            keys += _parse_file_path_keys_from_source_data(v, prefix=k0,
                                                           paths=paths)
    if prefix is not None:
        keys = ['{}.{}'.format(prefix, k) for k in keys]
    return keys
def setup_external_filepaths(model, package, variable_name,
                             filename_format, file_numbers=None,
                             relative_external_paths=True):
    """Set up external file paths for a MODFLOW package variable. Sets paths
    for intermediate files, which are written from the (processed) source data.
    Intermediate files are supplied to Flopy as external files for a given package
    variable. Flopy writes external files to a specified location when the MODFLOW
    package file is written. This method gets the external file paths that
    will be written by FloPy, and puts them in the configuration dictionary
    under their respective variables.

    Parameters
    ----------
    model : mfsetup.MF6model or mfsetup.MFnwtModel instance
        Model with cfg attribute to update.
    package : str
        Three-letter package abbreviation (e.g. 'DIS' for discretization)
    variable_name : str
        FloPy name of variable represented by external files
        (e.g. 'top' or 'botm')
    filename_format : str
        File path to the external file(s).
        Can be a string representing a single file (e.g. 'top.dat'), or for
        variables where a file is written for each layer or stress period,
        a format string that will be formatted with the zero-based layer
        number (e.g. 'botm{}.dat') for files botm0.dat, botm1.dat, ...
    file_numbers : list of ints
        List of numbers for the external files. Usually these represent
        zero-based layers or stress periods.

    Returns
    -------
    filepaths : list
        List of external file paths

    Adds intermediate file paths to model.cfg[<package>]['intermediate_data'].
    For MODFLOW-6 models, adds external file paths to
    model.cfg[<package>][<variable_name>].
    """
    package = package.lower()
    if file_numbers is None:
        file_numbers = [0]

    # in lieu of a way to get these from Flopy somehow
    griddata_variables = ['top', 'botm', 'idomain', 'strt',
                          'k', 'k33', 'sy', 'ss']
    transient2D_variables = {'rech', 'recharge',
                             'finf', 'pet', 'extdp', 'extwc',
                             }
    transient3D_variables = {'lakarr', 'bdlknc'}
    tabular_variables = {'connectiondata'}
    transient_tabular_variables = {'stress_period_data'}
    transient_variables = (transient2D_variables | transient3D_variables |
                           transient_tabular_variables)

    model.get_package(package)

    # intermediate data
    filename_format = os.path.split(filename_format)[-1]
    if not relative_external_paths:
        intermediate_files = [os.path.normpath(
            os.path.join(model.tmpdir, filename_format).format(i))
            for i in file_numbers]
    else:
        intermediate_files = [os.path.join(
            model.tmpdir, filename_format).format(i) for i in file_numbers]

    if variable_name in transient2D_variables or \
            variable_name in transient_tabular_variables:
        model.cfg['intermediate_data'][variable_name] = \
            {per: f for per, f in zip(file_numbers, intermediate_files)}
    elif variable_name in transient3D_variables:
        model.cfg['intermediate_data'][variable_name] = {0: intermediate_files}
    elif variable_name in tabular_variables:
        model.cfg['intermediate_data']['{}_{}'.format(package, variable_name)] = \
            intermediate_files
    else:
        model.cfg['intermediate_data'][variable_name] = intermediate_files

    # external array(s) read by MODFLOW
    # (set to reflect expected locations where flopy will save them)
    if not relative_external_paths:
        external_files = [os.path.normpath(
            os.path.join(model.model_ws, model.external_path,
                         filename_format.format(i)))
            for i in file_numbers]
    else:
        external_files = [os.path.join(model.model_ws, model.external_path,
                                       filename_format.format(i))
                          for i in file_numbers]

    if variable_name in transient2D_variables or \
            variable_name in transient_tabular_variables:
        model.cfg['external_files'][variable_name] = \
            {per: f for per, f in zip(file_numbers, external_files)}
    elif variable_name in transient3D_variables:
        model.cfg['external_files'][variable_name] = {0: external_files}
    else:
        model.cfg['external_files'][variable_name] = external_files

    if model.version == 'mf6':
        # skip these for now (not implemented yet for MF6)
        if variable_name in transient3D_variables:
            return
        ext_files_key = 'external_files'
        if variable_name not in transient_variables:
            filepaths = [{'filename': f}
                         for f in model.cfg[ext_files_key][variable_name]]
        else:
            filepaths = {per: {'filename': f} for per, f in
                         model.cfg[ext_files_key][variable_name].items()}

        # set package variable input (to Flopy)
        if variable_name in griddata_variables:
            model.cfg[package]['griddata'][variable_name] = filepaths
        elif variable_name in tabular_variables:
            model.cfg[package][variable_name] = filepaths[0]
            model.cfg[ext_files_key]['{}_{}'.format(package, variable_name)] = \
                model.cfg[ext_files_key].pop(variable_name)
        #elif variable_name in transient_variables:
        #    filepaths = {per: {'filename': f} for per, f in
        #                 zip(file_numbers, model.cfg[ext_files_key][variable_name])}
        #    model.cfg[package][variable_name] = filepaths
        elif variable_name in transient_tabular_variables:
            model.cfg[package][variable_name] = filepaths
            model.cfg[ext_files_key]['{}_{}'.format(package, variable_name)] = \
                model.cfg[ext_files_key].pop(variable_name)
        else:
            model.cfg[package][variable_name] = filepaths
            # {per: d for per, d in zip(file_numbers, filepaths)}
    else:
        filepaths = model.cfg['intermediate_data'][variable_name]
        model.cfg[package][variable_name] = filepaths

    return filepaths
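# Example usage of setup_external_filepaths() (a minimal sketch assuming an
# MF6model instance with a DIS package; file name format is hypothetical):
# >>> filepaths = setup_external_filepaths(model, 'dis', 'botm', 'botm{}.dat',
# ...                                      file_numbers=list(range(model.nlay)))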
def flopy_mf2005_load(m, load_only=None, forgive=False, check=False):
    """Execute the code in flopy.modflow.Modflow.load on an existing
    flopy.modflow.Modflow instance."""
    version = m.version
    verbose = m.verbose
    model_ws = m.model_ws

    # similar to modflow command: if file does not exist, try file.nam
    namefile_path = os.path.join(model_ws, m.namefile)
    if (not os.path.isfile(namefile_path) and
            os.path.isfile(namefile_path + '.nam')):
        namefile_path += '.nam'
    if not os.path.isfile(namefile_path):
        raise IOError('cannot find name file: ' + str(namefile_path))

    files_successfully_loaded = []
    files_not_loaded = []

    # set the reference information
    attribs = mfreadnam.attribs_from_namfile_header(namefile_path)
    #ref_attributes = SpatialReference.load(namefile_path)

    # read name file
    ext_unit_dict = mfreadnam.parsenamefile(
        namefile_path, m.mfnam_packages, verbose=verbose)
    if m.verbose:
        print('\n{}\nExternal unit dictionary:\n{}\n{}\n'
              .format(50 * '-', ext_unit_dict, 50 * '-'))

    # create a dict where key is the package name, value is unitnumber
    ext_pkg_d = {v.filetype: k for (k, v) in ext_unit_dict.items()}

    # reset version based on packages in the name file
    if "NWT" in ext_pkg_d or "UPW" in ext_pkg_d:
        version = "mfnwt"
    if "GLOBAL" in ext_pkg_d:
        if version != "mf2k":
            m.glo = ModflowGlobal(m)
        version = "mf2k"
    if "SMS" in ext_pkg_d:
        version = "mfusg"
    if "DISU" in ext_pkg_d:
        version = "mfusg"
        m.structured = False
    # update the modflow version
    m.set_version(version)

    # reset unit number for glo file
    if version == "mf2k":
        if "GLOBAL" in ext_pkg_d:
            unitnumber = ext_pkg_d["GLOBAL"]
            filepth = os.path.basename(ext_unit_dict[unitnumber].filename)
            m.glo.unit_number = [unitnumber]
            m.glo.file_name = [filepth]
        else:
            # TODO: is this necessary? it's not done for LIST.
            m.glo.unit_number = [0]
            m.glo.file_name = [""]

    # reset unit number for list file
    if 'LIST' in ext_pkg_d:
        unitnumber = ext_pkg_d['LIST']
        filepth = os.path.basename(ext_unit_dict[unitnumber].filename)
        m.lst.unit_number = [unitnumber]
        m.lst.file_name = [filepth]

    # look for the free format flag in bas6
    bas_key = ext_pkg_d.get('BAS6')
    if bas_key is not None:
        bas = ext_unit_dict[bas_key]
        start = bas.filehandle.tell()
        line = bas.filehandle.readline()
        while line.startswith("#"):
            line = bas.filehandle.readline()
        if "FREE" in line.upper():
            m.free_format_input = True
        bas.filehandle.seek(start)
    if verbose:
        print("ModflowBas6 free format:{0}\n".format(m.free_format_input))

    # load dis
    dis_key = ext_pkg_d.get('DIS') or ext_pkg_d.get('DISU')
    if dis_key is None:
        raise KeyError('discretization entry not found in nam file')
    disnamdata = ext_unit_dict[dis_key]
    dis = disnamdata.package.load(disnamdata.filename, m,
                                  ext_unit_dict=ext_unit_dict, check=False)
    files_successfully_loaded.append(disnamdata.filename)
    if m.verbose:
        print('   {:4s} package load...success'.format(dis.name[0]))
    m.setup_grid()  # reset model grid now that DIS package is loaded
    assert m.pop_key_list.pop() == dis_key
    ext_unit_dict.pop(dis_key)  #.filehandle.close()

    if load_only is None:
        # load all packages/files
        load_only = ext_pkg_d.keys()
    else:
        # check items in list
        if not isinstance(load_only, list):
            load_only = [load_only]
        not_found = []
        for i, filetype in enumerate(load_only):
            load_only[i] = filetype = filetype.upper()
            if filetype not in ext_pkg_d:
                not_found.append(filetype)
        if not_found:
            raise KeyError("the following load_only entries were not found "
                           "in the ext_unit_dict: " + str(not_found))

    # zone, mult, pval
    if "PVAL" in ext_pkg_d:
        m.mfpar.set_pval(m, ext_unit_dict)
        assert m.pop_key_list.pop() == ext_pkg_d.get("PVAL")
    if "ZONE" in ext_pkg_d:
        m.mfpar.set_zone(m, ext_unit_dict)
        assert m.pop_key_list.pop() == ext_pkg_d.get("ZONE")
    if "MULT" in ext_pkg_d:
        m.mfpar.set_mult(m, ext_unit_dict)
        assert m.pop_key_list.pop() == ext_pkg_d.get("MULT")

    # try loading packages in ext_unit_dict
    for key, item in ext_unit_dict.items():
        if item.package is not None:
            if item.filetype in load_only:
                if forgive:
                    try:
                        package_load_args = \
                            list(inspect.getfullargspec(item.package.load))[0]
                        if "check" in package_load_args:
                            item.package.load(item.filename, m,
                                              ext_unit_dict=ext_unit_dict,
                                              check=False)
                        else:
                            item.package.load(item.filename, m,
                                              ext_unit_dict=ext_unit_dict)
                        files_successfully_loaded.append(item.filename)
                        if m.verbose:
                            print('   {:4s} package load...success'
                                  .format(item.filetype))
                    except Exception as e:
                        m.load_fail = True
                        if m.verbose:
                            print('   {:4s} package load...failed\n{!s}'
                                  .format(item.filetype, e))
                        files_not_loaded.append(item.filename)
                else:
                    package_load_args = \
                        list(inspect.getfullargspec(item.package.load))[0]
                    if "check" in package_load_args:
                        item.package.load(item.filename, m,
                                          ext_unit_dict=ext_unit_dict,
                                          check=False)
                    else:
                        item.package.load(item.filename, m,
                                          ext_unit_dict=ext_unit_dict)
                    files_successfully_loaded.append(item.filename)
                    if m.verbose:
                        print('   {:4s} package load...success'
                              .format(item.filetype))
            else:
                if m.verbose:
                    print('   {:4s} package load...skipped'
                          .format(item.filetype))
                files_not_loaded.append(item.filename)
        elif "data" not in item.filetype.lower():
            files_not_loaded.append(item.filename)
            if m.verbose:
                print('   {:4s} package load...skipped'
                      .format(item.filetype))
        elif "data" in item.filetype.lower():
            if m.verbose:
                print('   {} file load...skipped\n      {}'
                      .format(item.filetype,
                              os.path.basename(item.filename)))
            if key not in m.pop_key_list:
                # do not add unit number (key) if it already exists
                if key not in m.external_units:
                    m.external_fnames.append(item.filename)
                    m.external_units.append(key)
                    m.external_binflag.append("binary" in item.filetype.lower())
                    m.external_output.append(False)
        else:
            raise KeyError('unhandled case: {}, {}'.format(key, item))

    # pop binary output keys and any external file units that are now
    # internal
    for key in m.pop_key_list:
        try:
            m.remove_external(unit=key)
            ext_unit_dict.pop(key)
        except KeyError:
            if m.verbose:
                print('Warning: external file unit {} does not exist in '
                      'ext_unit_dict.'.format(key))

    # write message indicating packages that were successfully loaded
    if m.verbose:
        print('')
        print('   The following {0} packages were successfully loaded.'
              .format(len(files_successfully_loaded)))
        for fname in files_successfully_loaded:
            print('      ' + os.path.basename(fname))
        if len(files_not_loaded) > 0:
            print('   The following {0} packages were not loaded.'
                  .format(len(files_not_loaded)))
            for fname in files_not_loaded:
                print('      ' + os.path.basename(fname))

    if check:
        m.check(f='{}.chk'.format(m.name), verbose=m.verbose, level=0)

    # return model object
    return m
def flopy_mfsimulation_load(sim, model, strict=True, load_only=None,
                            verify_data=False):
    """Execute the code in flopy.mf6.MFSimulation.load on existing instances
    of flopy.mf6.MFSimulation and flopy.mf6.MF6model."""

    instance = sim
    if not isinstance(model, list):
        model_instances = [model]
    else:
        model_instances = model
    version = sim.version
    exe_name = sim.exe_name
    verbosity_level = instance.simulation_data.verbosity_level

    if verbosity_level.value >= VerbosityLevel.normal.value:
        print('loading simulation...')

    # build case consistent load_only dictionary for quick lookups
    load_only = PackageContainer._load_only_dict(load_only)

    # load simulation name file
    if verbosity_level.value >= VerbosityLevel.normal.value:
        print('  loading simulation name file...')
    instance.name_file.load(strict)

    # load TDIS file
    tdis_pkg = 'tdis{}'.format(mfstructure.MFStructure().get_version_string())
    tdis_attr = getattr(instance.name_file, tdis_pkg)
    instance._tdis_file = mftdis.ModflowTdis(instance,
                                             filename=tdis_attr.get_data())
    instance._tdis_file._filename = instance.simulation_data.mfdata[
        ('nam', 'timing', tdis_pkg)].get_data()
    if verbosity_level.value >= VerbosityLevel.normal.value:
        print('  loading tdis package...')
    instance._tdis_file.load(strict)

    # load models
    try:
        model_recarray = instance.simulation_data.mfdata[('nam', 'models',
                                                          'models')]
        models = model_recarray.get_data()
    except MFDataException as mfde:
        message = ('Error occurred while loading model names from the '
                   'simulation name file.')
        raise MFDataException(mfdata_except=mfde,
                              model=instance.name,
                              package='nam',
                              message=message)
    for item in models:
        # resolve model working folder and name file
        path, name_file = os.path.split(item[1])

        # get the existing model instance
        # corresponding to its entry in the simulation name file
        # (in flopy the model instance is obtained from
        # PackageContainer.model_factory below)
        model_obj = [m for m in model_instances if m.namefile == name_file]
        if len(model_obj) == 0:
            print('model {} attached to {} not found in {}'
                  .format(item, instance, model_instances))
            return
        model_obj = model_obj[0]
        #model_obj = PackageContainer.model_factory(item[0][:-1].lower())

        # load model
        if verbosity_level.value >= VerbosityLevel.normal.value:
            print('  loading model {}...'.format(item[0].lower()))

        instance._models[item[2]] = flopy_mf6model_load(instance, model_obj,
                                                        strict=strict,
                                                        model_rel_path=path,
                                                        load_only=load_only)
        # original flopy code to load model
        #instance._models[item[2]] = model_obj.load(
        #    instance,
        #    instance.structure.model_struct_objs[item[0].lower()], item[2],
        #    name_file, version, exe_name, strict, path, load_only)

    # load exchange packages and dependent packages
    try:
        exchange_recarray = instance.name_file.exchanges
        has_exch_data = exchange_recarray.has_data()
    except MFDataException as mfde:
        message = ('Error occurred while loading exchange names from the '
                   'simulation name file.')
        raise MFDataException(mfdata_except=mfde,
                              model=instance.name,
                              package='nam',
                              message=message)
    if has_exch_data:
        try:
            exch_data = exchange_recarray.get_data()
        except MFDataException as mfde:
            message = ('Error occurred while loading exchange names from '
                       'the simulation name file.')
            raise MFDataException(mfdata_except=mfde,
                                  model=instance.name,
                                  package='nam',
                                  message=message)
        for exgfile in exch_data:
            if load_only is not None and not \
                    PackageContainer._in_pkg_list(load_only, exgfile[0],
                                                  exgfile[2]):
                if instance.simulation_data.verbosity_level.value >= \
                        VerbosityLevel.normal.value:
                    print('    skipping package {}...'
                          .format(exgfile[0].lower()))
                continue
            # get exchange type by removing numbers from exgtype
            exchange_type = ''.join([char for char in exgfile[0]
                                     if not char.isdigit()]).upper()
            # get exchange number for this type
            if exchange_type not in instance._exg_file_num:
                exchange_file_num = 0
                instance._exg_file_num[exchange_type] = 1
            else:
                exchange_file_num = instance._exg_file_num[exchange_type]
                instance._exg_file_num[exchange_type] += 1

            exchange_name = '{}_EXG_{}'.format(exchange_type,
                                               exchange_file_num)
            # find the package class that corresponds to this exchange type
            package_obj = PackageContainer.package_factory(
                exchange_type.replace('-', '').lower(), '')
            if not package_obj:
                message = ('An error occurred while loading the '
                           'simulation name file. Invalid exchange type '
                           '"{}" specified.'.format(exchange_type))
                type_, value_, traceback_ = sys.exc_info()
                raise MFDataException(instance.name,
                                      'nam',
                                      'nam',
                                      'loading simulation name file',
                                      exchange_recarray.structure.name,
                                      inspect.stack()[0][3],
                                      type_, value_, traceback_, message,
                                      instance._simulation_data.debug)

            # build and load exchange package object
            exchange_file = package_obj(instance,
                                        exgtype=exgfile[0],
                                        exgmnamea=exgfile[2],
                                        exgmnameb=exgfile[3],
                                        filename=exgfile[1],
                                        pname=exchange_name,
                                        loading_package=True)
            if verbosity_level.value >= VerbosityLevel.normal.value:
                print('  loading exchange package {}...'
                      .format(exchange_file._get_pname()))
            exchange_file.load(strict)
            # Flopy >= 3.9
            if hasattr(instance, '_package_container'):
                instance._package_container.add_package(exchange_file)
            instance._exchange_files[exgfile[1]] = exchange_file

    # load simulation packages
    solution_recarray = instance.simulation_data.mfdata[('nam',
                                                         'solutiongroup',
                                                         'solutiongroup')]
    try:
        solution_group_dict = solution_recarray.get_data()
    except MFDataException as mfde:
        message = ('Error occurred while loading solution groups from '
                   'the simulation name file.')
        raise MFDataException(mfdata_except=mfde,
                              model=instance.name,
                              package='nam',
                              message=message)
    for solution_group in solution_group_dict.values():
        for solution_info in solution_group:
            if load_only is not None and not PackageContainer._in_pkg_list(
                    load_only, solution_info[0], solution_info[2]):
                if instance.simulation_data.verbosity_level.value >= \
                        VerbosityLevel.normal.value:
                    print('    skipping package {}...'
                          .format(solution_info[0].lower()))
                continue
            ims_file = mfims.ModflowIms(instance, filename=solution_info[1],
                                        pname=solution_info[2])
            if verbosity_level.value >= VerbosityLevel.normal.value:
                print('  loading ims package {}...'
                      .format(ims_file._get_pname()))
            ims_file.load(strict)

    instance.simulation_data.mfpath.set_last_accessed_path()
    if verify_data:
        instance.check()
    return instance
def flopy_mf6model_load(simulation, model, strict=True, model_rel_path='.',
                        load_only=None):
    """Execute the code in flopy.mf6.MFmodel.load_base on an
    existing instance of MF6model."""

    instance = model
    modelname = model.name
    structure = model.structure

    # build case consistent load_only dictionary for quick lookups
    load_only = PackageContainer._load_only_dict(load_only)

    # load name file
    instance.name_file.load(strict)

    # order packages
    vnum = mfstructure.MFStructure().get_version_string()
    # FIX: Transport - Priority packages maybe should not be hard coded
    priority_packages = {'dis{}'.format(vnum): 1, 'disv{}'.format(vnum): 1,
                         'disu{}'.format(vnum): 1}
    packages_ordered = []
    package_recarray = instance.simulation_data.mfdata[(modelname, 'nam',
                                                        'packages', 'packages')]
    for item in package_recarray.get_data():
        if item[0] in priority_packages:
            packages_ordered.insert(0, (item[0], item[1], item[2]))
        else:
            packages_ordered.append((item[0], item[1], item[2]))

    # load packages
    sim_struct = mfstructure.MFStructure().sim_struct
    instance._ftype_num_dict = {}
    for ftype, fname, pname in packages_ordered:
        ftype_orig = ftype
        ftype = ftype[0:-1].lower()
        if ftype in structure.package_struct_objs or \
                ftype in sim_struct.utl_struct_objs:
            if (load_only is not None
                    and not PackageContainer._in_pkg_list(priority_packages,
                                                          ftype_orig, pname)
                    and not PackageContainer._in_pkg_list(load_only,
                                                          ftype_orig, pname)):
                if (simulation.simulation_data.verbosity_level.value >=
                        VerbosityLevel.normal.value):
                    print(f"    skipping package {ftype}...")
                continue
            if model_rel_path and model_rel_path != '.':
                # strip off model relative path from the file path
                filemgr = simulation.simulation_data.mfpath
                fname = filemgr.strip_model_relative_path(modelname, fname)
            if simulation.simulation_data.verbosity_level.value >= \
                    VerbosityLevel.normal.value:
                print('    loading package {}...'.format(ftype))
            # load package
            instance.load_package(ftype, fname, pname, strict, None)

    # load referenced packages
    if modelname in instance.simulation_data.referenced_files:
        for ref_file in \
                instance.simulation_data.referenced_files[modelname].values():
            if (ref_file.file_type in structure.package_struct_objs or
                    ref_file.file_type in sim_struct.utl_struct_objs) and \
                    not ref_file.loaded:
                instance.load_package(ref_file.file_type,
                                      ref_file.file_name, None, strict,
                                      ref_file.reference_path)
                ref_file.loaded = True

    # TODO: fix jagged lists where appropriate
    return instance
def which(program):
    """Check for the existence of an executable.

    https://stackoverflow.com/questions/377017/test-if-executable-exists-in-python
    """
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, fname = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file
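# Example usage of which() (executable name is hypothetical; returns None if
# the program is not found on the system PATH):
# >>> mf6_exe = which('mf6')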
def add_version_to_fileheader(filename, model_info=None):
    """Add modflow-setup, flopy and optionally model version info to an
    existing file header denoted by the comment characters ``#``, ``!``, or ``//``.
    """
    tempfile = str(filename) + '.temp'
    shutil.copy(filename, tempfile)
    with open(tempfile) as src:
        with open(filename, 'w') as dest:
            if model_info is None:
                header = ''
            else:
                header = f'# {model_info}\n'
            read_header = True
            for line in src:
                if read_header and len(line.strip()) > 0 and \
                        line.strip()[0] in {'#', '!', '//'}:
                    if model_info is None or model_info not in line:
                        header += line
                elif read_header:
                    if 'modflow-setup' not in header:
                        headerlist = header.strip().split('\n')
                        if 'flopy' in header.lower():
                            pos, flopy_info = [(i, s) for i, s in enumerate(headerlist)
                                               if 'flopy' in s.lower()][0]
                            #flopy_info = header.strip().split('\n')[-1]
                            if 'version' not in flopy_info.lower():
                                flopy_version = f'flopy version {flopy.__version__}'
                                flopy_info = flopy_info.lower().replace('flopy',
                                                                        flopy_version)
                                headerlist[pos] = flopy_info
                                #header = '\n'.join(header.split('\n')[:-2] +
                                #                   [flopy_info + '\n'])
                            mfsetup_text = '# via '
                            pos += 1  # insert mfsetup header after flopy
                        else:
                            mfsetup_text = '# File created by '
                            pos = -1  # insert mfsetup header at end
                        mfsetup_text += 'modflow-setup version {}'.format(mfsetup.__version__)
                        mfsetup_text += ' at {:%Y-%m-%d %H:%M:%S}'.format(dt.datetime.now())
                        headerlist.insert(pos, mfsetup_text)
                        header = '\n'.join(headerlist) + '\n'
                    dest.write(header)
                    read_header = False
                    dest.write(line)
                else:
                    dest.write(line)
    os.remove(tempfile)
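# Example usage of add_version_to_fileheader() (hypothetical file name and
# model info; version information is inserted into the file's comment header):
# >>> add_version_to_fileheader('example.dis', model_info='example model version 0.1')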
def remove_file_header(filename):
    """Remove the header of a MODFLOW input file,
    to allow comparison between files that have different headers
    but are otherwise the same, for example."""
    backup_file = str(filename) + '.backup'
    shutil.copy(filename, backup_file)
    with open(backup_file) as src:
        with open(filename, 'w') as dest:
            for line in src:
                if not line.strip().startswith('#'):
                    dest.write(line)
    os.remove(backup_file)
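# Example usage of remove_file_header() (hypothetical file name; strips
# '#' comment lines so files with different headers can be compared):
# >>> remove_file_header('example.dis')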