Source code for flopy.utils.datafile

"""
Module to read MODFLOW output files.  The module contains shared
abstract classes that should not be directly accessed.

"""
import os
from pathlib import Path
from typing import Union

import numpy as np





[docs]class LayerFile: """ The LayerFile class is the abstract base class from which specific derived classes are formed. LayerFile This class should not be instantiated directly. """ def __init__( self, filename: Union[str, os.PathLike], precision, verbose, kwargs ): from ..discretization.structuredgrid import StructuredGrid self.filename = Path(filename).expanduser().absolute() self.precision = precision self.verbose = verbose self.file = open(self.filename, "rb") # Get filesize to ensure this is not an empty file self.file.seek(0, 2) totalbytes = self.file.tell() self.file.seek(0, 0) # reset to beginning assert self.file.tell() == 0 if totalbytes == 0: raise ValueError(f"datafile error: file is empty: {filename}") self.nrow = 0 self.ncol = 0 self.nlay = 0 self.times = [] self.kstpkper = [] self.recordarray = [] self.iposarray = [] if precision == "single": self.realtype = np.float32 elif precision == "double": self.realtype = np.float64 else: raise Exception(f"Unknown precision specified: {precision}") self.model = None self.dis = None self.mg = None if "model" in kwargs.keys(): self.model = kwargs.pop("model") self.mg = self.model.modelgrid self.dis = self.model.dis if "dis" in kwargs.keys(): self.dis = kwargs.pop("dis") self.mg = self.dis.parent.modelgrid if "modelgrid" in kwargs.keys(): self.mg = kwargs.pop("modelgrid") if len(kwargs.keys()) > 0: args = ",".join(kwargs.keys()) raise Exception(f"LayerFile error: unrecognized kwargs: {args}") # read through the file and build the pointer index self._build_index() # now that we read the data and know nrow and ncol, # we can make a generic mg if needed if self.mg is None: self.mg = StructuredGrid( delc=np.ones((self.nrow,)), delr=np.ones( self.ncol, ), nlay=self.nlay, xoff=0.0, yoff=0.0, angrot=0.0, )
[docs] def to_shapefile( self, filename: Union[str, os.PathLike], kstpkper=None, totim=None, mflay=None, attrib_name="lf_data", verbose=False, ): """ Export model output data to a shapefile at a specific location in LayerFile instance. Parameters ---------- filename : str or PathLike Shapefile path to write kstpkper : tuple of ints A tuple containing the time step and stress period (kstp, kper). These are zero-based kstp and kper values. totim : float The simulation time. mflay : integer MODFLOW zero-based layer number to return. If None, then layer 1 will be written attrib_name : str Base name of attribute columns. (default is 'lf_data') verbose : bool Whether to print verbose output Returns ---------- None See Also -------- Notes ----- Examples -------- >>> import flopy >>> hdobj = flopy.utils.HeadFile('test.hds') >>> times = hdobj.get_times() >>> hdobj.to_shapefile('test_heads_sp6.shp', totim=times[-1]) """ plotarray = np.atleast_3d( self.get_data( kstpkper=kstpkper, totim=totim, mflay=mflay ).transpose() ).transpose() if mflay != None: attrib_dict = {f"{attrib_name}{mflay}": plotarray[0, :, :]} else: attrib_dict = {} for k in range(plotarray.shape[0]): name = f"{attrib_name}{k}" attrib_dict[name] = plotarray[k] from ..export.shapefile_utils import write_grid_shapefile write_grid_shapefile(filename, self.mg, attrib_dict, verbose)
[docs] def plot( self, axes=None, kstpkper=None, totim=None, mflay=None, filename_base=None, **kwargs, ): """ Plot 3-D model output data in a specific location in LayerFile instance Parameters ---------- axes : list of matplotlib.pyplot.axis List of matplotlib.pyplot.axis that will be used to plot data for each layer. If axes=None axes will be generated. (default is None) kstpkper : tuple of ints A tuple containing the time step and stress period (kstp, kper). These are zero-based kstp and kper values. totim : float The simulation time. mflay : int MODFLOW zero-based layer number to return. If None, then all all layers will be included. (default is None) filename_base : str Base file name that will be used to automatically generate file names for output image files. Plots will be exported as image files if file_name_base is not None. (default is None) **kwargs : dict pcolor : bool Boolean used to determine if matplotlib.pyplot.pcolormesh plot will be plotted. (default is True) colorbar : bool Boolean used to determine if a color bar will be added to the matplotlib.pyplot.pcolormesh. Only used if pcolor=True. (default is False) contour : bool Boolean used to determine if matplotlib.pyplot.contour plot will be plotted. (default is False) clabel : bool Boolean used to determine if matplotlib.pyplot.clabel will be plotted. Only used if contour=True. (default is False) grid : bool Boolean used to determine if the model grid will be plotted on the figure. (default is False) masked_values : list List of unique values to be excluded from the plot. file_extension : str Valid matplotlib.pyplot file extension for savefig(). Only used if filename_base is not None. (default is 'png') Returns ---------- None See Also -------- Notes ----- Examples -------- >>> import flopy >>> hdobj = flopy.utils.HeadFile('test.hds') >>> times = hdobj.get_times() >>> hdobj.plot(totim=times[-1]) """ if "file_extension" in kwargs: fext = kwargs.pop("file_extension") fext = fext.replace(".", "") else: fext = "png" masked_values = kwargs.pop("masked_values", []) if self.model is not None: if hasattr(self.model, "bas6") and self.model.bas6 is not None: masked_values.append(self.model.bas6.hnoflo) kwargs["masked_values"] = masked_values filenames = None if filename_base is not None: if mflay is not None: i0 = int(mflay) if i0 + 1 >= self.nlay: i0 = self.nlay - 1 i1 = i0 + 1 else: i0 = 0 i1 = self.nlay filenames = [ f"{filename_base}_Layer{k + 1}.{fext}" for k in range(i0, i1) ] # make sure we have a (lay,row,col) shape plotarray plotarray = np.atleast_3d( self.get_data( kstpkper=kstpkper, totim=totim, mflay=mflay ).transpose() ).transpose() from ..plot.plotutil import PlotUtilities return PlotUtilities._plot_array_helper( plotarray, model=self.model, axes=axes, filenames=filenames, mflay=mflay, modelgrid=self.mg, **kwargs, )
def _build_index(self): """ Build the recordarray and iposarray, which maps the header information to the position in the formatted file. """ raise Exception( "Abstract method _build_index called in LayerFile. " "This method needs to be overridden." )
[docs] def list_records(self): """ Print a list of all of the records in the file obj.list_records() """ for header in self.recordarray: print(header) return
def _get_data_array(self, totim=0): """ Get the three dimensional data array for the specified kstp and kper value or totim value. """ if totim >= 0.0: keyindices = np.where(self.recordarray["totim"] == totim)[0] if len(keyindices) == 0: msg = f"totim value ({totim}) not found in file..." raise Exception(msg) else: raise Exception("Data not found...") # initialize head with nan and then fill it idx = keyindices[0] nrow = self.recordarray["nrow"][idx] ncol = self.recordarray["ncol"][idx] data = np.empty((self.nlay, nrow, ncol), dtype=self.realtype) data[:, :, :] = np.nan for idx in keyindices: ipos = self.iposarray[idx] ilay = self.recordarray["ilay"][idx] if self.verbose: msg = f"Byte position in file: {ipos} for layer {ilay}" print(msg) self.file.seek(ipos, 0) nrow = self.recordarray["nrow"][idx] ncol = self.recordarray["ncol"][idx] shp = (nrow, ncol) data[ilay - 1] = self._read_data(shp) return data
[docs] def get_times(self): """ Get a list of unique times in the file Returns ---------- out : list of floats List contains unique simulation times (totim) in binary file. """ return self.times
[docs] def get_kstpkper(self): """ Get a list of unique stress periods and time steps in the file Returns ---------- out : list of (kstp, kper) tuples List of unique kstp, kper combinations in binary file. kstp and kper values are presently zero-based. """ kstpkper = [] for kstp, kper in self.kstpkper: kstpkper.append((kstp - 1, kper - 1)) return kstpkper
[docs] def get_data(self, kstpkper=None, idx=None, totim=None, mflay=None): """ Get data from the file for the specified conditions. Parameters ---------- idx : int The zero-based record number. The first record is record 0. kstpkper : tuple of ints A tuple containing the time step and stress period (kstp, kper). These are zero-based kstp and kper values. totim : float The simulation time. mflay : integer MODFLOW zero-based layer number to return. If None, then all all layers will be included. (Default is None.) Returns ---------- data : numpy array Array has size (nlay, nrow, ncol) if mflay is None or it has size (nrow, ncol) if mlay is specified. See Also -------- Notes ----- if both kstpkper and totim are None, will return the last entry Examples -------- """ # One-based kstp and kper for pulling out of recarray if kstpkper is not None: kstp1 = kstpkper[0] + 1 kper1 = kstpkper[1] + 1 idx = np.where( (self.recordarray["kstp"] == kstp1) & (self.recordarray["kper"] == kper1) ) if idx[0].shape[0] == 0: raise Exception( f"get_data() error: kstpkper not found:{kstpkper}" ) totim1 = self.recordarray[idx]["totim"][0] elif totim is not None: totim1 = totim elif idx is not None: totim1 = self.recordarray["totim"][idx] else: totim1 = self.times[-1] data = self._get_data_array(totim1) if mflay is None: return data else: return data[mflay, :, :]
[docs] def get_alldata(self, mflay=None, nodata=-9999): """ Get all of the data from the file. Parameters ---------- mflay : integer MODFLOW zero-based layer number to return. If None, then all all layers will be included. (Default is None.) nodata : float The nodata value in the data array. All array values that have the nodata value will be assigned np.nan. Returns ---------- data : numpy array Array has size (ntimes, nlay, nrow, ncol) if mflay is None or it has size (ntimes, nrow, ncol) if mlay is specified. See Also -------- Notes ----- Examples -------- """ rv = [] for totim in self.times: h = self.get_data(totim=totim, mflay=mflay) rv.append(h) rv = np.array(rv) rv[rv == nodata] = np.nan return rv
def _read_data(self, shp): """ Read data from file """ raise Exception( "Abstract method _read_data called in LayerFile. " "This method needs to be overridden." ) def _build_kijlist(self, idx): if isinstance(idx, list): kijlist = idx elif isinstance(idx, tuple): kijlist = [idx] else: raise Exception("Could not build kijlist from ", idx) # Check to make sure that k, i, j are within range, otherwise # the seek approach won't work. Can't use k = -1, for example. for k, i, j in kijlist: fail = False if k < 0 or k > self.nlay - 1: fail = True if i < 0 or i > self.nrow - 1: fail = True if j < 0 or j > self.ncol - 1: fail = True if fail: raise Exception( "Invalid cell index. Cell {} not within model grid: " "{}".format((k, i, j), (self.nlay, self.nrow, self.ncol)) ) return kijlist def _get_nstation(self, idx, kijlist): if isinstance(idx, list): return len(kijlist) elif isinstance(idx, tuple): return 1 def _init_result(self, nstation): # Initialize result array and put times in first column result = np.empty((len(self.times), nstation + 1), dtype=self.realtype) result[:, :] = np.nan result[:, 0] = np.array(self.times) return result
[docs] def close(self): """ Close the file handle. """ self.file.close()