import os
import numpy as np
from ..pakbase import Package
from ..utils import parsenamefile
[docs]class ModflowFlwob(Package):
"""
Head-dependent flow boundary Observation package class. Minimal working
example that will be refactored in a future version.
Parameters
----------
nqfb : int
Number of cell groups for the head-dependent flow boundary
observations
nqcfb : int
Greater than or equal to the total number of cells in all cell groups
nqtfb : int
Total number of head-dependent flow boundary observations for all cell
groups
iufbobsv : int
unit number where output is saved
tomultfb : float
Time-offset multiplier for head-dependent flow boundary observations.
The product of tomultfb and toffset must produce a time value in units
consistent with other model input. tomultfb can be dimensionless or
can be used to convert the units of toffset to the time unit used in
the simulation.
nqobfb : int list of length nqfb
The number of times at which flows are observed for the group of cells
nqclfb : int list of length nqfb
Is a flag, and the absolute value of nqclfb is the number of cells in
the group. If nqclfb is less than zero, factor = 1.0 for all cells in
the group.
obsnam : string list of length nqtfb
Observation name
irefsp : int of length nqtfb
The zero-based stress period to which the observation time is
referenced.
The reference point is the beginning of the specified stress period.
toffset : float list of length nqtfb
Is the time from the beginning of the stress period irefsp to the time
of the observation. toffset must be in units such that the product of
toffset and tomultfb are consistent with other model input. For
steady state observations, specify irefsp as the steady state stress
period and toffset less than or equal to perlen of the stress period.
If perlen is zero, set toffset to zero. If the observation falls
within a time step, linearly interpolation is used between values at
the beginning and end of the time step.
flwobs : float list of length nqtfb
Observed flow value from the head-dependent flow boundary into the
aquifer (+) or the flow from the aquifer into the boundary (-)
layer : int list of length(nqfb, nqclfb)
The zero-based layer index for the cell included in the cell group.
row : int list of length(nqfb, nqclfb)
The zero-based row index for the cell included in the cell group.
column : int list of length(nqfb, nqclfb)
The zero-based column index of the cell included in the cell group.
factor : float list of length(nqfb, nqclfb)
Is the portion of the simulated gain or loss in the cell that is
included in the total gain or loss for this cell group (fn of eq. 5).
flowtype : string
String that corresponds to the head-dependent flow boundary condition
type (CHD, GHB, DRN, RIV)
extension : list of string
Filename extension. If extension is None, extension is set to
['chob','obc','gbob','obg','drob','obd', 'rvob','obr']
(default is None).
no_print : boolean
When True or 1, a list of flow observations will not be
written to the Listing File (default is False)
options : list of strings
Package options (default is None).
unitnumber : list of int
File unit number. If unitnumber is None, unitnumber is set to
[40, 140, 41, 141, 42, 142, 43, 143] (default is None).
filenames : str or list of str
Filenames to use for the package and the output files. If
filenames=None the package name will be created using the model name
and package extension and the flwob output name will be created using
the model name and .out extension (for example,
modflowtest.out), if iufbobsv is a number greater than zero.
If a single string is passed the package will be set to the string
and flwob output name will be created using the model name and .out
extension, if iufbobsv is a number greater than zero. To define the
names for all package files (input and output) the length of the list
of strings should be 2. Default is None.
Attributes
----------
Methods
-------
See Also
--------
Notes
-----
This represents a minimal working example that will be refactored in a
future version.
"""
def __init__(
self,
model,
nqfb=0,
nqcfb=0,
nqtfb=0,
iufbobsv=0,
tomultfb=1.0,
nqobfb=None,
nqclfb=None,
obsnam=None,
irefsp=None,
toffset=None,
flwobs=None,
layer=None,
row=None,
column=None,
factor=None,
flowtype=None,
extension=None,
no_print=False,
options=None,
filenames=None,
unitnumber=None,
):
if nqobfb is None:
nqobfb = []
if nqclfb is None:
nqclfb = []
if obsnam is None:
obsnam = []
if irefsp is None:
irefsp = []
if toffset is None:
toffset = []
if flwobs is None:
flwobs = []
if layer is None:
layer = []
if row is None:
row = []
if column is None:
column = []
if factor is None:
factor = []
if extension is None:
extension = [
"chob",
"obc",
"gbob",
"obg",
"drob",
"obd",
"rvob",
"obr",
]
pakunits = {"chob": 40, "gbob": 41, "drob": 42, "rvob": 43}
outunits = {"chob": 140, "gbob": 141, "drob": 142, "rvob": 143}
# if unitnumber is None:
# unitnumber = [40, 140, 41, 141, 42, 142, 43, 143]
if flowtype.upper().strip() == "CHD":
name = ["CHOB", "DATA"]
extension = extension[0:2]
# unitnumber = unitnumber[0:2]
# iufbobsv = unitnumber[1]
self._ftype = "CHOB"
self.url = "chob.html"
self.heading = "# CHOB for MODFLOW, generated by Flopy."
elif flowtype.upper().strip() == "GHB":
name = ["GBOB", "DATA"]
extension = extension[2:4]
# unitnumber = unitnumber[2:4]
# iufbobsv = unitnumber[1]
self._ftype = "GBOB"
self.url = "gbob.html"
self.heading = "# GBOB for MODFLOW, generated by Flopy."
elif flowtype.upper().strip() == "DRN":
name = ["DROB", "DATA"]
extension = extension[4:6]
# unitnumber = unitnumber[4:6]
# iufbobsv = unitnumber[1]
self._ftype = "DROB"
self.url = "drob.html"
self.heading = "# DROB for MODFLOW, generated by Flopy."
elif flowtype.upper().strip() == "RIV":
name = ["RVOB", "DATA"]
extension = extension[6:8]
# unitnumber = unitnumber[6:8]
# iufbobsv = unitnumber[1]
self._ftype = "RVOB"
self.url = "rvob.html"
self.heading = "# RVOB for MODFLOW, generated by Flopy."
else:
msg = "ModflowFlwob: flowtype must be CHD, GHB, DRN, or RIV"
raise KeyError(msg)
if unitnumber is None:
unitnumber = [pakunits[name[0].lower()], outunits[name[0].lower()]]
elif isinstance(unitnumber, int):
unitnumber = [unitnumber]
if len(unitnumber) == 1:
if unitnumber[0] in outunits.keys():
unitnumber = [pakunits[name[0].lower()], unitnumber[0]]
else:
unitnumber = [unitnumber[0], outunits[name[0].lower()]]
iufbobsv = unitnumber[1]
# call base package constructor
super().__init__(
model,
extension=extension,
name=name,
unit_number=unitnumber,
allowDuplicates=True,
filenames=self._prepare_filenames(filenames, 2),
)
self.nqfb = nqfb
self.nqcfb = nqcfb
self.nqtfb = nqtfb
self.iufbobsv = iufbobsv
self.tomultfb = tomultfb
self.nqobfb = nqobfb
self.nqclfb = nqclfb
self.obsnam = obsnam
self.irefsp = irefsp
self.toffset = toffset
self.flwobs = flwobs
self.layer = layer
self.row = row
self.column = column
self.factor = factor
# -create empty arrays of the correct size
self.layer = np.zeros(
(self.nqfb, max(np.abs(self.nqclfb))), dtype="int32"
)
self.row = np.zeros(
(self.nqfb, max(np.abs(self.nqclfb))), dtype="int32"
)
self.column = np.zeros(
(self.nqfb, max(np.abs(self.nqclfb))), dtype="int32"
)
self.factor = np.zeros(
(self.nqfb, max(np.abs(self.nqclfb))), dtype="float32"
)
self.nqobfb = np.zeros((self.nqfb), dtype="int32")
self.nqclfb = np.zeros((self.nqfb), dtype="int32")
self.irefsp = np.zeros((self.nqtfb), dtype="int32")
self.toffset = np.zeros((self.nqtfb), dtype="float32")
self.flwobs = np.zeros((self.nqtfb), dtype="float32")
# -assign values to arrays
self.nqobfb[:] = nqobfb
self.nqclfb[:] = nqclfb
self.obsnam[:] = obsnam
self.irefsp[:] = irefsp
self.toffset[:] = toffset
self.flwobs[:] = flwobs
for i in range(self.nqfb):
self.layer[i, : len(layer[i])] = layer[i]
self.row[i, : len(row[i])] = row[i]
self.column[i, : len(column[i])] = column[i]
self.factor[i, : len(factor[i])] = factor[i]
# add more checks here
self.no_print = no_print
self.np = 0
if options is None:
options = []
if self.no_print:
options.append("NOPRINT")
self.options = options
# add checks for input compliance (obsnam length, etc.)
self.parent.add_package(self)
[docs] def ftype(self):
return self._ftype
[docs] def write_file(self):
"""
Write the package file
Returns
-------
None
"""
# open file for writing
f_fbob = open(self.fn_path, "w")
# write header
f_fbob.write(f"{self.heading}\n")
# write sections 1 and 2 : NOTE- what about NOPRINT?
line = f"{self.nqfb:10d}"
line += f"{self.nqcfb:10d}"
line += f"{self.nqtfb:10d}"
line += f"{self.iufbobsv:10d}"
if self.no_print or "NOPRINT" in self.options:
line += "{: >10}".format("NOPRINT")
line += "\n"
f_fbob.write(line)
f_fbob.write(f"{self.tomultfb:10e}\n")
# write sections 3-5 looping through observations groups
c = 0
for i in range(self.nqfb):
# while (i < self.nqfb):
# write section 3
f_fbob.write(f"{self.nqobfb[i]:10d}{self.nqclfb[i]:10d}\n")
# Loop through observation times for the groups
for j in range(self.nqobfb[i]):
# write section 4
line = f"{self.obsnam[c]:12}"
line += f"{self.irefsp[c] + 1:8d}"
line += f"{self.toffset[c]:16.10g}"
line += f" {self.flwobs[c]:10.4g}\n"
f_fbob.write(line)
c += 1 # index variable
# write section 5 - NOTE- need to adjust factor for multiple
# observations in the same cell
for j in range(abs(self.nqclfb[i])):
# set factor to 1.0 for all cells in group
if self.nqclfb[i] < 0:
self.factor[i, :] = 1.0
line = f"{self.layer[i, j] + 1:10d}"
line += f"{self.row[i, j] + 1:10d}"
line += f"{self.column[i, j] + 1:10d}"
# note is 10f good enough here?
line += f"{self.factor[i, j]:10f}\n"
f_fbob.write(line)
f_fbob.close()
#
# swm: BEGIN hack for writing standard file
sfname = self.fn_path
sfname += "_ins"
# write header
f_ins = open(sfname, "w")
f_ins.write("jif @\n")
f_ins.write(f"StandardFile 0 1 {self.nqtfb}\n")
for i in range(0, self.nqtfb):
f_ins.write(f"{self.obsnam[i]}\n")
f_ins.close()
# swm: END hack for writing standard file
return
[docs] @classmethod
def load(cls, f, model, ext_unit_dict=None, check=True):
"""
Load an existing package.
Parameters
----------
f : filename or file handle
File to load.
model : model object
The model object (of type :class:`flopy.modflow.mf.Modflow`) to
which this package will be added.
ext_unit_dict : dictionary, optional
If the arrays in the file are specified using EXTERNAL,
or older style array control records, then `f` should be a file
handle. In this case ext_unit_dict is required, which can be
constructed using the function
:class:`flopy.utils.mfreadnam.parsenamefile`.
check : boolean
Check package data for common errors. (default True)
Returns
-------
flwob : ModflowFlwob package object
ModflowFlwob package object.
Examples
--------
>>> import flopy
>>> m = flopy.modflow.Modflow()
>>> hobs = flopy.modflow.ModflowFlwob.load('test.drob', m)
"""
if model.verbose:
print("loading flwob package file...")
openfile = not hasattr(f, "read")
if openfile:
filename = f
f = open(filename, "r")
# dataset 0 -- header
while True:
line = f.readline()
if line[0] != "#":
break
# read dataset 1 -- NQFB NQCFB NQTFB IUFBOBSV Options
t = line.strip().split()
nqfb = int(t[0])
nqcfb = int(t[1])
nqtfb = int(t[2])
iufbobsv = int(t[3])
options = []
if len(t) > 4:
options = t[4:]
# read dataset 2 -- TOMULTFB
line = f.readline()
t = line.strip().split()
tomultfb = float(t[0])
nqobfb = np.zeros(nqfb, dtype=np.int32)
nqclfb = np.zeros(nqfb, dtype=np.int32)
obsnam = []
irefsp = []
toffset = []
flwobs = []
layer = []
row = []
column = []
factor = []
# read datasets 3, 4, and 5 for each of nqfb groups
# of cells
nobs = 0
while True:
# read dataset 3 -- NQOBFB NQCLFB
line = f.readline()
t = line.strip().split()
nqobfb[nobs] = int(t[0])
nqclfb[nobs] = int(t[1])
# read dataset 4 -- OBSNAM IREFSP TOFFSET FLWOBS
ntimes = 0
while True:
line = f.readline()
t = line.strip().split()
obsnam.append(t[0])
irefsp.append(int(t[1]))
toffset.append(float(t[2]))
flwobs.append(float(t[3]))
ntimes += 1
if ntimes == nqobfb[nobs]:
break
# read dataset 5 -- Layer Row Column Factor
k = np.zeros(abs(nqclfb[nobs]), np.int32)
i = np.zeros(abs(nqclfb[nobs]), np.int32)
j = np.zeros(abs(nqclfb[nobs]), np.int32)
fac = np.zeros(abs(nqclfb[nobs]), np.float32)
ncells = 0
while True:
line = f.readline()
t = line.strip().split()
k[ncells] = int(t[0])
i[ncells] = int(t[1])
j[ncells] = int(t[2])
fac[ncells] = float(t[3])
ncells += 1
if ncells == abs(nqclfb[nobs]):
layer.append(k)
row.append(i)
column.append(j)
factor.append(fac)
break
nobs += 1
if nobs == nqfb:
break
irefsp = np.array(irefsp) - 1
layer = np.array(layer) - 1
row = np.array(row) - 1
column = np.array(column) - 1
factor = np.array(factor)
if openfile:
f.close()
# get ext_unit_dict if none passed
if ext_unit_dict is None:
namefile = os.path.join(model.model_ws, model.namefile)
ext_unit_dict = parsenamefile(namefile, model.mfnam_packages)
flowtype, ftype = _get_ftype_from_filename(f.name, ext_unit_dict)
# set package unit number
unitnumber = None
filenames = [None, None]
if ext_unit_dict is not None:
unitnumber, filenames[0] = model.get_ext_dict_attr(
ext_unit_dict, filetype=ftype.upper()
)
if iufbobsv > 0:
iu, filenames[1] = model.get_ext_dict_attr(
ext_unit_dict, unit=iufbobsv
)
model.add_pop_key_list(iufbobsv)
# create ModflowFlwob object instance
flwob = cls(
model,
iufbobsv=iufbobsv,
tomultfb=tomultfb,
nqfb=nqfb,
nqcfb=nqcfb,
nqtfb=nqtfb,
nqobfb=nqobfb,
nqclfb=nqclfb,
obsnam=obsnam,
irefsp=irefsp,
toffset=toffset,
flwobs=flwobs,
layer=layer,
row=row,
column=column,
factor=factor,
options=options,
flowtype=flowtype,
unitnumber=unitnumber,
filenames=filenames,
)
return flwob
def _get_ftype_from_filename(fn, ext_unit_dict=None):
"""
Returns the boundary flowtype and filetype for a given ModflowFlwob
package filename.
Parameters
----------
fn : str
The filename to be parsed.
ext_unit_dict : dictionary, optional
If the arrays in the file are specified using EXTERNAL,
or older style array control records, then `f` should be a file
handle. In this case ext_unit_dict is required, which can be
constructed using the function
:class:`flopy.utils.mfreadnam.parsenamefile`.
Returns
-------
flowtype : str
Corresponds to the type of the head-dependent boundary package for
which observations are desired (e.g. "CHD", "GHB", "DRN", or "RIV").
ftype : str
Corresponds to the observation file type (e.g. "CHOB", "GBOB",
"DROB", or "RVOB").
"""
ftype = None
# determine filetype from filename using ext_unit_dict
if ext_unit_dict is not None:
for key, value in ext_unit_dict.items():
if value.filename == fn:
ftype = value.filetype
break
# else, try to infer filetype from filename extension
else:
ext = fn.split(".")[-1].lower()
if "ch" in ext.lower():
ftype = "CHOB"
elif "gb" in ext.lower():
ftype = "GBOB"
elif "dr" in ext.lower():
ftype = "DROB"
elif "rv" in ext.lower():
ftype = "RVOB"
msg = f"ModflowFlwob: filetype cannot be inferred from file name {fn}"
if ftype is None:
raise AssertionError(msg)
flowtype_dict = {
"CHOB": "CHD",
"GOBO": "GHB",
"DROB": "DRN",
"RVOB": "RIV",
}
flowtype = flowtype_dict[ftype]
return flowtype, ftype