"""
Module to read MODFLOW output files. The module contains shared
abstract classes that should not be directly accessed.
"""
import os
from pathlib import Path
from typing import Union
import numpy as np
[docs]class LayerFile:
"""
The LayerFile class is the abstract base class from which specific derived
classes are formed. LayerFile This class should not be instantiated
directly.
"""
def __init__(
self, filename: Union[str, os.PathLike], precision, verbose, kwargs
):
from ..discretization.structuredgrid import StructuredGrid
self.filename = Path(filename).expanduser().absolute()
self.precision = precision
self.verbose = verbose
self.file = open(self.filename, "rb")
# Get filesize to ensure this is not an empty file
self.file.seek(0, 2)
totalbytes = self.file.tell()
self.file.seek(0, 0) # reset to beginning
assert self.file.tell() == 0
if totalbytes == 0:
raise ValueError(f"datafile error: file is empty: {filename}")
self.nrow = 0
self.ncol = 0
self.nlay = 0
self.times = []
self.kstpkper = []
self.recordarray = []
self.iposarray = []
if precision == "single":
self.realtype = np.float32
elif precision == "double":
self.realtype = np.float64
else:
raise Exception(f"Unknown precision specified: {precision}")
self.model = None
self.dis = None
self.mg = None
if "model" in kwargs.keys():
self.model = kwargs.pop("model")
self.mg = self.model.modelgrid
self.dis = self.model.dis
if "dis" in kwargs.keys():
self.dis = kwargs.pop("dis")
self.mg = self.dis.parent.modelgrid
if "modelgrid" in kwargs.keys():
self.mg = kwargs.pop("modelgrid")
if len(kwargs.keys()) > 0:
args = ",".join(kwargs.keys())
raise Exception(f"LayerFile error: unrecognized kwargs: {args}")
# read through the file and build the pointer index
self._build_index()
# now that we read the data and know nrow and ncol,
# we can make a generic mg if needed
if self.mg is None:
self.mg = StructuredGrid(
delc=np.ones((self.nrow,)),
delr=np.ones(
self.ncol,
),
nlay=self.nlay,
xoff=0.0,
yoff=0.0,
angrot=0.0,
)
[docs] def to_shapefile(
self,
filename: Union[str, os.PathLike],
kstpkper=None,
totim=None,
mflay=None,
attrib_name="lf_data",
verbose=False,
):
"""
Export model output data to a shapefile at a specific location
in LayerFile instance.
Parameters
----------
filename : str or PathLike
Shapefile path to write
kstpkper : tuple of ints
A tuple containing the time step and stress period (kstp, kper).
These are zero-based kstp and kper values.
totim : float
The simulation time.
mflay : integer
MODFLOW zero-based layer number to return. If None, then layer 1
will be written
attrib_name : str
Base name of attribute columns. (default is 'lf_data')
verbose : bool
Whether to print verbose output
Returns
----------
None
See Also
--------
Notes
-----
Examples
--------
>>> import flopy
>>> hdobj = flopy.utils.HeadFile('test.hds')
>>> times = hdobj.get_times()
>>> hdobj.to_shapefile('test_heads_sp6.shp', totim=times[-1])
"""
plotarray = np.atleast_3d(
self.get_data(
kstpkper=kstpkper, totim=totim, mflay=mflay
).transpose()
).transpose()
if mflay != None:
attrib_dict = {f"{attrib_name}{mflay}": plotarray[0, :, :]}
else:
attrib_dict = {}
for k in range(plotarray.shape[0]):
name = f"{attrib_name}{k}"
attrib_dict[name] = plotarray[k]
from ..export.shapefile_utils import write_grid_shapefile
write_grid_shapefile(filename, self.mg, attrib_dict, verbose)
[docs] def plot(
self,
axes=None,
kstpkper=None,
totim=None,
mflay=None,
filename_base=None,
**kwargs,
):
"""
Plot 3-D model output data in a specific location
in LayerFile instance
Parameters
----------
axes : list of matplotlib.pyplot.axis
List of matplotlib.pyplot.axis that will be used to plot
data for each layer. If axes=None axes will be generated.
(default is None)
kstpkper : tuple of ints
A tuple containing the time step and stress period (kstp, kper).
These are zero-based kstp and kper values.
totim : float
The simulation time.
mflay : int
MODFLOW zero-based layer number to return. If None, then all
all layers will be included. (default is None)
filename_base : str
Base file name that will be used to automatically generate file
names for output image files. Plots will be exported as image
files if file_name_base is not None. (default is None)
**kwargs : dict
pcolor : bool
Boolean used to determine if matplotlib.pyplot.pcolormesh
plot will be plotted. (default is True)
colorbar : bool
Boolean used to determine if a color bar will be added to
the matplotlib.pyplot.pcolormesh. Only used if pcolor=True.
(default is False)
contour : bool
Boolean used to determine if matplotlib.pyplot.contour
plot will be plotted. (default is False)
clabel : bool
Boolean used to determine if matplotlib.pyplot.clabel
will be plotted. Only used if contour=True. (default is False)
grid : bool
Boolean used to determine if the model grid will be plotted
on the figure. (default is False)
masked_values : list
List of unique values to be excluded from the plot.
file_extension : str
Valid matplotlib.pyplot file extension for savefig(). Only used
if filename_base is not None. (default is 'png')
Returns
----------
None
See Also
--------
Notes
-----
Examples
--------
>>> import flopy
>>> hdobj = flopy.utils.HeadFile('test.hds')
>>> times = hdobj.get_times()
>>> hdobj.plot(totim=times[-1])
"""
if "file_extension" in kwargs:
fext = kwargs.pop("file_extension")
fext = fext.replace(".", "")
else:
fext = "png"
masked_values = kwargs.pop("masked_values", [])
if self.model is not None:
if hasattr(self.model, "bas6") and self.model.bas6 is not None:
masked_values.append(self.model.bas6.hnoflo)
kwargs["masked_values"] = masked_values
filenames = None
if filename_base is not None:
if mflay is not None:
i0 = int(mflay)
if i0 + 1 >= self.nlay:
i0 = self.nlay - 1
i1 = i0 + 1
else:
i0 = 0
i1 = self.nlay
filenames = [
f"{filename_base}_Layer{k + 1}.{fext}" for k in range(i0, i1)
]
# make sure we have a (lay,row,col) shape plotarray
plotarray = np.atleast_3d(
self.get_data(
kstpkper=kstpkper, totim=totim, mflay=mflay
).transpose()
).transpose()
from ..plot.plotutil import PlotUtilities
return PlotUtilities._plot_array_helper(
plotarray,
model=self.model,
axes=axes,
filenames=filenames,
mflay=mflay,
modelgrid=self.mg,
**kwargs,
)
def _build_index(self):
"""
Build the recordarray and iposarray, which maps the header information
to the position in the formatted file.
"""
raise Exception(
"Abstract method _build_index called in LayerFile. "
"This method needs to be overridden."
)
[docs] def list_records(self):
"""
Print a list of all of the records in the file
obj.list_records()
"""
for header in self.recordarray:
print(header)
return
def _get_data_array(self, totim=0):
"""
Get the three dimensional data array for the
specified kstp and kper value or totim value.
"""
if totim >= 0.0:
keyindices = np.where(self.recordarray["totim"] == totim)[0]
if len(keyindices) == 0:
msg = f"totim value ({totim}) not found in file..."
raise Exception(msg)
else:
raise Exception("Data not found...")
# initialize head with nan and then fill it
idx = keyindices[0]
nrow = self.recordarray["nrow"][idx]
ncol = self.recordarray["ncol"][idx]
data = np.empty((self.nlay, nrow, ncol), dtype=self.realtype)
data[:, :, :] = np.nan
for idx in keyindices:
ipos = self.iposarray[idx]
ilay = self.recordarray["ilay"][idx]
if self.verbose:
msg = f"Byte position in file: {ipos} for layer {ilay}"
print(msg)
self.file.seek(ipos, 0)
nrow = self.recordarray["nrow"][idx]
ncol = self.recordarray["ncol"][idx]
shp = (nrow, ncol)
data[ilay - 1] = self._read_data(shp)
return data
[docs] def get_times(self):
"""
Get a list of unique times in the file
Returns
----------
out : list of floats
List contains unique simulation times (totim) in binary file.
"""
return self.times
[docs] def get_kstpkper(self):
"""
Get a list of unique stress periods and time steps in the file
Returns
----------
out : list of (kstp, kper) tuples
List of unique kstp, kper combinations in binary file. kstp and
kper values are presently zero-based.
"""
kstpkper = []
for kstp, kper in self.kstpkper:
kstpkper.append((kstp - 1, kper - 1))
return kstpkper
[docs] def get_data(self, kstpkper=None, idx=None, totim=None, mflay=None):
"""
Get data from the file for the specified conditions.
Parameters
----------
idx : int
The zero-based record number. The first record is record 0.
kstpkper : tuple of ints
A tuple containing the time step and stress period (kstp, kper).
These are zero-based kstp and kper values.
totim : float
The simulation time.
mflay : integer
MODFLOW zero-based layer number to return. If None, then all
all layers will be included. (Default is None.)
Returns
----------
data : numpy array
Array has size (nlay, nrow, ncol) if mflay is None or it has size
(nrow, ncol) if mlay is specified.
See Also
--------
Notes
-----
if both kstpkper and totim are None, will return the last entry
Examples
--------
"""
# One-based kstp and kper for pulling out of recarray
if kstpkper is not None:
kstp1 = kstpkper[0] + 1
kper1 = kstpkper[1] + 1
idx = np.where(
(self.recordarray["kstp"] == kstp1)
& (self.recordarray["kper"] == kper1)
)
if idx[0].shape[0] == 0:
raise Exception(
f"get_data() error: kstpkper not found:{kstpkper}"
)
totim1 = self.recordarray[idx]["totim"][0]
elif totim is not None:
totim1 = totim
elif idx is not None:
totim1 = self.recordarray["totim"][idx]
else:
totim1 = self.times[-1]
data = self._get_data_array(totim1)
if mflay is None:
return data
else:
return data[mflay, :, :]
[docs] def get_alldata(self, mflay=None, nodata=-9999):
"""
Get all of the data from the file.
Parameters
----------
mflay : integer
MODFLOW zero-based layer number to return. If None, then all
all layers will be included. (Default is None.)
nodata : float
The nodata value in the data array. All array values that have the
nodata value will be assigned np.nan.
Returns
----------
data : numpy array
Array has size (ntimes, nlay, nrow, ncol) if mflay is None or it
has size (ntimes, nrow, ncol) if mlay is specified.
See Also
--------
Notes
-----
Examples
--------
"""
rv = []
for totim in self.times:
h = self.get_data(totim=totim, mflay=mflay)
rv.append(h)
rv = np.array(rv)
rv[rv == nodata] = np.nan
return rv
def _read_data(self, shp):
"""
Read data from file
"""
raise Exception(
"Abstract method _read_data called in LayerFile. "
"This method needs to be overridden."
)
def _build_kijlist(self, idx):
if isinstance(idx, list):
kijlist = idx
elif isinstance(idx, tuple):
kijlist = [idx]
else:
raise Exception("Could not build kijlist from ", idx)
# Check to make sure that k, i, j are within range, otherwise
# the seek approach won't work. Can't use k = -1, for example.
for k, i, j in kijlist:
fail = False
if k < 0 or k > self.nlay - 1:
fail = True
if i < 0 or i > self.nrow - 1:
fail = True
if j < 0 or j > self.ncol - 1:
fail = True
if fail:
raise Exception(
"Invalid cell index. Cell {} not within model grid: "
"{}".format((k, i, j), (self.nlay, self.nrow, self.ncol))
)
return kijlist
def _get_nstation(self, idx, kijlist):
if isinstance(idx, list):
return len(kijlist)
elif isinstance(idx, tuple):
return 1
def _init_result(self, nstation):
# Initialize result array and put times in first column
result = np.empty((len(self.times), nstation + 1), dtype=self.realtype)
result[:, :] = np.nan
result[:, 0] = np.array(self.times)
return result
[docs] def close(self):
"""
Close the file handle.
"""
self.file.close()