"""
Code for preprocessing streamflow data.
"""
import collections.abc
import os
from pathlib import Path
from shapely.geometry import Point
import numpy as np
import pandas as pd
from gisutils import shp2df, df2shp, project, get_values_at_points
from mfsetup.obs import make_obsname
from mfsetup.units import convert_volume_units, convert_time_units
from mapgwm.utils import makedirs, assign_geographic_obsgroups, cull_data_to_active_area
def preprocess_flows(data, metadata=None, flow_data_columns=['flow'],
start_date=None, active_area=None,
active_area_id_column=None,
active_area_feature_id=None,
source_crs=4269, dest_crs=5070,
datetime_col='datetime',
site_no_col='site_no',
line_id_col='line_id',
x_coord_col='x',
y_coord_col='y',
name_col='name',
flow_qualifier_column=None,
default_qualifier='measured',
include_sites=None,
include_line_ids=None,
source_volume_units='ft3',
source_time_units='s',
dest_volume_units='m3',
dest_time_units='d',
geographic_groups=None,
geographic_groups_col=None,
max_obsname_len=None,
add_leading_zeros_to_sw_site_nos=False,
column_renames=None,
outfile=None,
):
"""Preprocess stream flow observation data, for example, from NWIS or another data source that
outputs time series in CSV format with site locations and identifiers.
* Data are reprojected from a `source_crs` (Coordinate reference system; assumed to be in geographic coordinates)
to the CRS of the model (`dest_crs`)
* Data are culled to a `start_date` and optionally, a polygon or set of polygons defining the model area
* Volume and time units are converted to those of the groundwater model.
* Prefixes for observation names (with an optional length limit) that identify the location are generated
* Preliminary observation groups can also be assigned, based on geographic areas defined by polygons
(`geographic_groups` parameter)
Parameters
----------
data : csv file or DataFrame
Time series of stream flow observations.
Columns:
===================== ======================================
site_no site identifier
datetime measurement dates/times
x x-coordinate of site
y y-coordinate of site
flow_data_columns Columns of observed streamflow values
flow_qualifier_column Optional column with qualifiers for flow values
===================== ======================================
Notes:
* x and y columns can alternatively be in the metadata table
* Data columns are denoted in `flow_data_columns`; multiple
columns can be included to process base flow and total flow, or
other statistics in tandem
* For example, `flow_qualifier_column` may have "estimated" or "measured"
flags denoting whether streamflows were derived from measured values
or statistical estimates.
metadata : csv file or DataFrame
Stream flow observation site information.
May include columns:
================= ================================================================================
site_no site identifier
x x-coordinate of site
y y-coordinate of site
name name of site
line_id_col Identifier for a line in a hydrography dataset that the site is associated with.
================= ================================================================================
Notes:
* other columns in metadata will be passed through to the metadata output
flow_data_columns : list of strings
Columns in data with flow values or their statistics.
By default, ['flow']
start_date : str (YYYY-mm-dd)
Simulation start date (cull observations before this date)
active_area : str
Shapefile with polygon to cull observations to. Automatically reprojected
to dest_crs if the shapefile includes a .prj file.
by default, None.
active_area_id_column : str, optional
Column in active_area with feature ids.
By default, None, in which case all features are used.
active_area_feature_id : str, optional
ID of feature to use for the active area.
By default, None, in which case all features are used.
source_crs : obj
Coordinate reference system of the head observation locations.
A Python int, dict, str, or :class:`pyproj.crs.CRS` instance
passed to :meth:`pyproj.crs.CRS.from_user_input`
Can be any of:
- PROJ string
- Dictionary of PROJ parameters
- PROJ keyword arguments for parameters
- JSON string with PROJ parameters
- CRS WKT string
- An authority string [i.e. 'epsg:4326']
- An EPSG integer code [i.e. 4326]
- A tuple of ("auth_name", "auth_code") [i.e. ('epsg', '4326')]
- An object with a `to_wkt` method.
- A :class:`pyproj.crs.CRS` class
By default, epsg:4269
dest_crs : obj
Coordinate reference system of the model. Same input types
as ``source_crs``.
By default, epsg:5070
datetime_col : str, optional
Column name in data with observation date/times,
by default 'datetime'
site_no_col : str, optional
Column name in data and metadata with site identifiers,
by default 'site_no'
line_id_col : str, optional
Column name in data or metadata with identifiers for
hydrography lines associated with observation sites.
by default 'line_id'
x_coord_col : str, optional
Column name in data or metadata with x-coordinates,
by default 'x'
y_coord_col : str, optional
Column name in data or metadata with y-coordinates,
by default 'y'
name_col : str, optional
Column name in data or metadata with observation site names,
by default 'name'
flow_qualifier_column : str, optional
Column name in data with flow observation qualifiers, such
as "measured" or "estimated"
by default None, in which case the output 'category' column
is filled with `default_qualifier`.
default_qualifier : str, optional
Default qualifier to populate flow_qualifier_column if it
is None. By default, "measured"
include_sites : list-like, optional
Limit output to these sites.
by default, None (include all sites)
include_line_ids : list-like, optional
Limit output to sites associated with these line identifiers.
by default, None (include all sites)
source_volume_units : str, 'm3', 'cubic meters', 'ft3', etc.
Volume units of the source data. By default, 'ft3'
source_time_units : str, 's', 'seconds', 'days', etc.
Time units of the source data. By default, 's'
dest_volume_units : str, 'm3', 'cubic meters', 'ft3', etc.
Volume units of the output (model). By default, 'm3'
dest_time_units : str, 's', 'seconds', 'days', etc.
Time units of the output (model). By default, 'd'
geographic_groups : file, dict or list-like
Option to group observations by area(s) of interest. Can
be a shapefile, list of shapefiles, or dictionary of shapely polygons.
A 'group' column will be created in the metadata, and observation
sites within each polygon will be assigned the group name
associated with that polygon.
For example::
geographic_groups='../source_data/extents/CompositeHydrographArea.shp'
geographic_groups=['../source_data/extents/CompositeHydrographArea.shp']
geographic_groups={'cha': <shapely Polygon>}
Where 'cha' is an observation group name for observations located within
the area defined by CompositeHydrographArea.shp. For shapefiles,
group names are provided in a `geographic_groups_col`.
geographic_groups_col : str
Field name in the `geographic_groups` shapefile(s) containing the
observation group names associated with each polygon.
max_obsname_len : int or None
Maximum length for observation name prefix. Default of 13
allows for a PEST obsnme of 20 characters or less with
<prefix>_yyyymm or <prefix>_<per>d<per>
(e.g. <prefix>_2d1 for a difference between stress periods 2 and 1)
If None, observation names will not be truncated. PEST++ does not have
a limit on observation name length.
add_leading_zeros_to_sw_site_nos : bool
Whether or not to pad site numbers using the
:func:`~mapgwm.swflows.format_usgs_sw_site_id` function.
By default, False.
column_renames : dict, optional
Option to rename columns in the data or metadata that are different than those listed above.
For example, if the data file has a 'SITE_NO' column instead of 'site_no'::
column_renames={'SITE_NO': 'site_no'}
by default None, in which case only the default renames listed above are used.
Note that the rename values (the new column names) must match the column names
listed above for :func:`mapgwm.swflows.preprocess_flows` to work.
outfile : str
Where output file will be written. Metadata are written to a file
with the same name, with an additional "_info" suffix prior to
the file extension.
Returns
-------
data : DataFrame
Preprocessed time series
metadata : DataFrame
Preprocessed metadata
References
----------
`The PEST++ Manual <https://github.com/usgs/pestpp/tree/master/documentation>`_
Notes
-----
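Examples
--------
A minimal usage sketch; the file paths and the 'q_cfs' flow column
here are hypothetical, not part of the package::

    from mapgwm.swflows import preprocess_flows

    data, metadata = preprocess_flows('source_data/nwis_flows.csv',
                                      metadata='source_data/nwis_sites.csv',
                                      flow_data_columns=['q_cfs'],
                                      start_date='2010-01-01',
                                      source_crs=4269, dest_crs=5070,
                                      outfile='processed/flows.csv')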
"""
# outputs
if outfile is not None:
outpath, filename = os.path.split(outfile)
makedirs(outpath)
outname, ext = os.path.splitext(outfile)
out_info_csvfile = outname + '_info.csv'
out_data_csvfile = outfile
out_shapefile = outname + '_info.shp'
# read the source data
if not isinstance(data, pd.DataFrame):
df = pd.read_csv(data, dtype={site_no_col: object})
else:
df = data.copy()
# check the columns
for col in [datetime_col] + flow_data_columns:
assert col in df.columns, "Column {} not found in {}".format(col,
data)
assert any({site_no_col, line_id_col}.intersection(df.columns)), \
"Neither {} or {} found in {}. Need to specify a site_no_col or line_id_col".format(site_no_col,
line_id_col, data)
# rename input columns to these names,
# for consistent output
dest_columns = {datetime_col: 'datetime',
site_no_col: 'site_no',
line_id_col: 'line_id',
x_coord_col: 'x',
y_coord_col: 'y',
name_col: 'name',
flow_qualifier_column: 'category'
}
# update the default column renames
# with any supplied via column_renames parameter
if isinstance(column_renames, collections.abc.Mapping):
dest_columns.update(column_renames)
df.rename(columns=dest_columns, inplace=True)
flow_data_columns = [c if c not in dest_columns else dest_columns[c]
for c in flow_data_columns]
# convert site numbers to strings;
# add leading 0s to any USGS sites that should have them
if 'site_no' in df.columns:
df['site_no'] = format_site_ids(df['site_no'], add_leading_zeros_to_sw_site_nos)
else:
df['site_no'] = df['line_id']  # line_id_col was renamed to 'line_id' above
# read the source metadata
if metadata is not None:
if not isinstance(metadata, pd.DataFrame):
md = pd.read_csv(metadata, dtype={site_no_col: object})
else:
md = metadata.copy()
if site_no_col not in md.columns or 'site_no' not in df.columns:
raise IndexError('If metadata are supplied, both data and metadata must '
'have a site_no column.')
md.rename(columns=dest_columns, inplace=True)
md['site_no'] = format_site_ids(md['site_no'], add_leading_zeros_to_sw_site_nos)
md.index = md['site_no']
by_site = df.groupby('site_no')
md['start_dt'] = pd.DataFrame(by_site['datetime'].first())
else:
by_site = df.groupby('site_no')
md = pd.DataFrame(by_site['datetime'].first())
md.columns = ['start_dt']
md['site_no'] = md.index
md['end_dt'] = pd.DataFrame(by_site['datetime'].last())
md['n'] = pd.DataFrame(by_site['datetime'].count())
md.reset_index(inplace=True, drop=True)
# assign metadata if supplied
for col in 'x', 'y', 'line_id', 'name':
if col in df.columns and col not in md.columns:
by_site_no = dict(zip(df['site_no'], df[col]))
md[col] = [by_site_no[sn] for sn in md['site_no']]
if col != 'line_id':
df.drop(col, axis=1, inplace=True)
# index the dataframe to times;
# truncate data before start date
df.index = pd.to_datetime(df['datetime'])
df.index.name = 'datetime'
df = df.loc[start_date:].copy()
# project x, y to model crs
x_pr, y_pr = project((md.x.values, md.y.values), source_crs, dest_crs)
md['x'], md['y'] = x_pr, y_pr
md['geometry'] = [Point(x, y) for x, y in zip(x_pr, y_pr)]
# cull data to that within the model area
if active_area is not None:
df, md = cull_data_to_active_area(df, active_area,
active_area_id_column,
active_area_feature_id,
data_crs=dest_crs, metadata=md)
# get the hydrography IDs corresponding to each site
# using the included lookup table
#if 'line_id' not in df.columns:
# assert line_id_lookup is not None, \
# "need to include line_ids in a column, or line_id_lookup dictionary mapping line_ids to site numbers"
# df = df.loc[df['site_no'].isin(line_id_lookup)].copy()
# df['line_id'] = [line_id_lookup[sn] for sn in df['site_no']]
if include_sites is not None:
md = md.loc[md.site_no.isin(include_sites)]
df = df.loc[df.site_no.isin(include_sites)]
if include_line_ids is not None:
md = md.loc[md.line_id.isin(include_line_ids)]
df = df.loc[df.line_id.isin(include_line_ids)]
# convert units
# ensure that flow values are numeric (may be objects if taken directly from NWIS)
unit_conversion = (convert_volume_units(source_volume_units, dest_volume_units) /
convert_time_units(source_time_units, dest_time_units))
for flow_col in flow_data_columns:
df[flow_col] = pd.to_numeric(df[flow_col], errors='coerce') * unit_conversion
df.dropna(subset=flow_data_columns, axis=0, inplace=True)
# reformat qualifiers for consistent output
# (lump to dest category columns of either estimated or measured)
# with measured including values derived from baseflow separation or actual measurements)
# output column name for flow qualifier column:
dest_flow_qualifier_column = 'category'
if flow_qualifier_column is not None:
flow_qualifiers = {'calculated': 'measured',
'base flow separated from measured values': 'measured',
'measured total flow': 'measured',
'estimated gaged': 'estimated',
'estimated ungaged': 'estimated'}
df[dest_flow_qualifier_column] = df[flow_qualifier_column].replace(flow_qualifiers)
else:
df['category'] = default_qualifier
# make unique n-character prefixes (site identifiers) for each observation location
# 13 character length allows for <prefix>_yyyymm in 20 character observation names
# (BeoPEST limit)
unique_obsnames = set()
obsnames = []
for sn in md['site_no'].tolist():
if max_obsname_len is not None:
name = make_obsname(sn, unique_names=unique_obsnames,
maxlen=max_obsname_len)
assert name not in unique_obsnames
else:
name = sn
unique_obsnames.add(name)
obsnames.append(name)
md['obsprefix'] = obsnames
# add area of interest information
md['group'] = 'fluxes'
md = assign_geographic_obsgroups(md, geographic_groups,
geographic_groups_col,
metadata_crs=dest_crs)
# data columns
data_cols = ['site_no', 'line_id', 'datetime'] + flow_data_columns + ['category']
#if 'line_id' in md.columns and 'line_id' not in df.columns:
# # only map line_ids to data if there are more site numbers
# # implying that no site number maps to more than one line_id
# if len(set(df.site_no)) >= len(set(df.line_id)):
# ids = dict(zip(md['site_no'], md['line_id']))
# df['line_id'] = [ids[sn] for sn in df['site_no']]
data_cols = [c for c in data_cols if c in df.columns]
df = df[data_cols]
md.index = md['site_no']
# save out the results
if outfile is not None:
df2shp(md.drop(['x', 'y'], axis=1),
out_shapefile, crs=dest_crs)
print('writing {}'.format(out_info_csvfile))
md.drop('geometry', axis=1).to_csv(out_info_csvfile, index=False, float_format='%g')
print('writing {}'.format(out_data_csvfile))
df.to_csv(out_data_csvfile, index=False, float_format='%g')
return df, md
def combine_measured_estimated_values(measured_values, estimated_values,
measured_values_data_col, estimated_values_data_col,
dest_values_col='obsval',
resample_freq='MS', how='mean'):
"""Combine time series of measured and estimated values for multiple sites,
giving preference to measured values.
Parameters
----------
measured_values : csv file or DataFrame
Time series of measured values at multiple sites, similar to that
output by :func:`~mapgwm.swflows.preprocess_flows`.
Columns:
===================== ===========================================
site_no site identifiers; read-in as strings
datetime measurement dates/times
data columns columns with floating-point data to combine
===================== ===========================================
estimated_values : csv file or DataFrame
Time series of estimated values at multiple sites, similar to that
output by :func:`~mapgwm.swflows.preprocess_flows`.
Columns:
===================== ===========================================
site_no site identifiers; read-in as strings
datetime measurement dates/times
data columns columns with floating-point data to combine
===================== ===========================================
measured_values_data_col : str
Column in `measured_values` with data to combine.
estimated_values_data_col : str
Column in `estimated_values` with data to combine.
dest_values_col : str
Output column with combined data from `measured_values_data_col`
and `estimated_values_data_col`, by default 'obsval'
resample_freq : str or DateOffset
Any `pandas frequency alias <https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#timeseries-offset-aliases>`_
The data columns in `measured_values` and `estimated_values` are resampled
to this frequency using the method specified by ``how``.
By default, 'MS' (month-start)
how : str
Resample method. Can be any of the method calls on the pandas
`Resampler <https://pandas.pydata.org/pandas-docs/stable/reference/resampling.html>`_
object. By default, 'mean'
Returns
-------
combined : DataFrame
DataFrame containing all columns from `estimated_values`, the data columns
from `measured_values`, and a `dest_values_col` consisting of measured values where
present, and estimated values otherwise. An ``"est_"`` prefix is added to the
estimated data columns, and a ``"meas_"`` prefix is added to the measured data columns.
Example:
======== ========== ========= ============== ============== ============
site_no datetime category est_qbase_m3d meas_qbase_m3d obsval
======== ========== ========= ============== ============== ============
07288000 2017-10-01 measured 47872.1 28438.7 28438.7
07288000 2017-11-01 measured 47675.9 24484.5 24484.5
======== ========== ========= ============== ============== ============
Where ``category`` denotes whether the value in obsval is measured or estimated.
Notes
-----
All columns with a floating-point dtype are identified as "Data columns," and
are resampled as specified by the `resample_freq` and `how` arguments. For all other
columns, the first value for each time at each site is used. The resampled measured
data are joined to the resampled estimated data on the basis of site numbers
and times (as a pandas `MultiIndex`).
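Examples
--------
A minimal sketch, assuming both input CSVs were written by
:func:`~mapgwm.swflows.preprocess_flows` and share a hypothetical
'qbase_m3d' data column::

    from mapgwm.swflows import combine_measured_estimated_values

    combined = combine_measured_estimated_values(
        'processed/measured_flows.csv',
        'processed/estimated_flows.csv',
        measured_values_data_col='qbase_m3d',
        estimated_values_data_col='qbase_m3d',
        resample_freq='MS', how='mean')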
"""
# read in the data
if not isinstance(measured_values, pd.DataFrame):
measured = pd.read_csv(measured_values, dtype={'site_no': object})
measured['datetime'] = pd.to_datetime(measured['datetime'])
measured.index = measured['datetime']
else:
measured = measured_values.copy()
if not isinstance(estimated_values, pd.DataFrame):
df = pd.read_csv(estimated_values, dtype={'site_no': object})
df['datetime'] = pd.to_datetime(df['datetime'])
df.index = df['datetime']
else:
df = estimated_values.copy()
# resample both timeseries to the resample_freq
df_rs = resample_group_timeseries(df, resample_freq=resample_freq, how=how,
add_data_prefix='est')
df_rs['category'] = 'estimated'
measured_rs = resample_group_timeseries(measured, resample_freq=resample_freq, how=how,
add_data_prefix='meas')
# label the resampled measured values
measured_rs['category'] = 'measured'
# add the measured values to the estimated
measured_data_columns = measured_rs.select_dtypes(include=[np.floating]).columns
# join the data columns
combined = df_rs.join(measured_rs[measured_data_columns], how='outer')
# update the site_no, datetime and category columns from measured
# (that were not filled in outer join)
combined.update(measured_rs)
# make the obsval column, starting with the estimated values
combined[dest_values_col] = combined['est_' + estimated_values_data_col]
# prefix was added to measured data column by resample_group_timeseries
measured_values_data_col = 'meas_' + measured_values_data_col
# populate obsval column with all measured values
# (over-writing any pre-existing estimated values)
has_measured = ~combined[measured_values_data_col].isna()
combined.loc[has_measured, dest_values_col] = combined.loc[has_measured, measured_values_data_col]
# verify that only nan values in obsval column are for measured sites
# (that don't have estimated values)
assert np.all(combined.loc[combined[dest_values_col].isna(), 'category'] == 'measured')
# drop observations with no measured or estimated values
combined.dropna(subset=[dest_values_col], axis=0, inplace=True)
return combined.reset_index(drop=True)
def resample_group_timeseries(df, resample_freq='MS', how='mean',
add_data_prefix=None):
"""Resample a DataFrame with both groups (e.g. measurement sites)
and time series (measurements at each site).
Parameters
----------
df : DataFrame
Time series of values at multiple sites, similar to that
output by :func:`~mapgwm.swflows.preprocess_flows`.
Columns:
===================== ===========================================
site_no site identifiers; read-in as strings
datetime measurement dates/times
data columns columns with floating-point data to resample
===================== ===========================================
resample_freq : str or DateOffset
Any `pandas frequency alias <https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#timeseries-offset-aliases>`_
The data columns in `df` are resampled
to this frequency using the method specified by ``how``.
By default, 'MS' (month-start)
how : str
Resample method. Can be any of the method calls on the pandas
`Resampler <https://pandas.pydata.org/pandas-docs/stable/reference/resampling.html>`_
object. By default, 'mean'
add_data_prefix : str
Option to add prefix to data columns. By default, None
Returns
-------
resampled : DataFrame
Resampled data at each site.
Notes
-----
All columns with a floating-point dtype are identified as "Data columns," and
are resampled as specified by the `resample_freq` and `how` arguments. For all other
columns, the first value for each time at each site is used.
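Examples
--------
A minimal sketch with a small made-up DataFrame; note that the input
must be indexed by datetime::

    import pandas as pd
    from mapgwm.swflows import resample_group_timeseries

    df = pd.DataFrame({
        'site_no': ['07288000', '07288000', '07288500', '07288500'],
        'datetime': pd.to_datetime(['2017-10-01', '2017-10-15',
                                    '2017-10-02', '2017-10-20']),
        'flow': [1., 3., 10., 20.]})
    df.index = df['datetime']
    # monthly mean flows by site, with the data column renamed to 'meas_flow'
    resampled = resample_group_timeseries(df, resample_freq='MS', how='mean',
                                          add_data_prefix='meas')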
"""
if not isinstance(df.index, pd.DatetimeIndex):
raise ValueError("Input must have a DatetimeIndex.")
df_resampler = df.groupby('site_no').resample(resample_freq)
df_rs = df_resampler.first().copy()
# resample the data columns
data_columns = df_rs.select_dtypes(include=[np.floating]).columns
non_data_columns = [c for c in df.columns if c not in data_columns]
resampled_values = getattr(df_resampler[data_columns], how)()
# put the non-data and data columns back together
df_rs = df_rs[non_data_columns].join(resampled_values)
# add prefix (e.g. 'est') to data column names
if add_data_prefix is not None:
for col in data_columns:
df_rs[add_data_prefix + '_' + col] = df_rs[col]
df_rs.drop(col, axis=1, inplace=True)
# index dates may vary depending on freq (e.g. 'MS' vs 'M')
df_rs['site_no'] = df_rs.index.get_level_values(0)
df_rs['datetime'] = df_rs.index.get_level_values(1)
return df_rs
def aggregrate_values_to_stress_periods(data, perioddata,
datetime_col='datetime',
values_col='values',
id_col='id',
category_col='qualifier',
keep_columns=None):
"""Pandas sausage-making to take flow values at arbitrary times,
and average them to model stress periods defined in a perioddata dataframe.
Optionally, a category column identifying flow values as 'measured' or
'estimated' can also be read. Measured values are used for the averages
where available; the number of measured and estimated values contributing
to each average are tallied in the output.
Parameters
----------
data : DataFrame
Input data
perioddata : DataFrame
Stress Period start/end times. Must have columns:
============== ================= ========================
per int MODFLOW Stress Period
start_datetime str or datetime64 Period start time
end_datetime str or datetime64 Period end time
============== ================= ========================
datetime_col : str
Column in data with measurement dates (str or datetime64)
values_col : str
Column in data with observed or estimated flow values.
id_col : str
Column in data identifying the hydrography line or measurement site
associated with each flow value. Valid identifiers corresponding to the source
hydrography (e.g. NHDPlus COMIDs) may be needed for locating the flows
within the stream network, unless x and y coordinates are available.
category_col : str, optional
Column in data with 'measured' or 'estimated' flags indicating how each flow
value was derived. If None, 'measured' is used for all flows. By default, 'qualifier'.
keep_columns : list of str, optional
Additional columns in data (for example, measurement site numbers or names that
are different than the hydrography identifiers in `id_col`) to pass through
to the output. By default, None.
Returns
-------
df_per : DataFrame
Stress period averages, and metadata describing the source of the
averages (number of estimated vs. measured values). For each site,
also includes averages and standard deviations for measurements
from outside the stress periods defined in perioddata. For sites
with no measurements within a stress period, an average of all
other measurements is used.
Notes
-----
This method is similar to :func:`mfsetup.tdis.aggregate_dataframe_to_stress_period` in
what it does (resample time series to model stress periods), and for modflow-setup,
is superseded by that method (called via the TransientTabularData object). This
method is kept here in case it is needed in the future.
Key differences between the two methods:
* this method operates on the whole time series of model stress periods instead of a single stress period
* this method allows for specification of both measured and estimated values; the number of estimated and measured values
contributing to each average are included in the output
* duplicate sites in mfsetup.tdis.aggregate_dataframe_to_stress_period (for example,
to handle wells located in the same model cell) can be aggregated with sum, mean, first, etc., or
by raising an error. In this method, the duplicate site with the most measurements (vs. estimates) is retained.
* this method fills stress periods without measurements using the mean values for all time.
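Examples
--------
A minimal sketch, assuming ``data`` is a DataFrame with the default
'datetime', 'values', 'id' and 'qualifier' columns described above::

    import pandas as pd

    perioddata = pd.DataFrame({'per': [0, 1],
                               'start_datetime': ['2019-01-01', '2019-02-01'],
                               'end_datetime': ['2019-02-01', '2019-03-01']})
    df_per = aggregrate_values_to_stress_periods(data, perioddata)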
"""
# make dictionary of data keyed by site
values = data.copy()
# optionally keep track of auxiliary data
# (site number, name, etc.)
if keep_columns is not None:
auxiliary_info = {}
for col in keep_columns:
auxiliary_info[col] = dict(zip(values[id_col], values[col]))
# create category column if there is none, to conform to logic below
if category_col not in values.columns:
values[category_col] = 'measured'
values[datetime_col] = pd.to_datetime(values[datetime_col])
values.index = pd.MultiIndex.from_tuples(list(zip(values[id_col], values[datetime_col], values[category_col])))
values = {s: values.loc[s] for s in values.index.levels[0]}
# Compute average base flow for each site, stress period
# fill missing values using means
# add columns describing source of base flow values
# vs = [] # list of processed monthly baseflow dataframes
vpers = [] # list of processed period baseflow dataframes
for k, v in values.items():
# create single set of values
# Use measured data where available; estimates where unavailable
# Note: unstack will fail if there are two measurement sites on the same line_id
# (with the same dates, which would be duplicates wrt the line_id)
# look at the fraction of values that are estimated;
# keep the site with the most measurements
if np.any(v.index.duplicated()):
duplicate_sites = np.array(list(set(v.site_no)))
pct_estimated = []
for site_no in duplicate_sites:
site_values = v.loc[v.site_no == site_no]
pct_estimated.append(np.sum(site_values[category_col] == 'estimated') / len(site_values))
inds = np.argsort(pct_estimated)
duplicate_sites = duplicate_sites[inds]
v = v.loc[v.site_no == duplicate_sites[0]].copy()
v = v.drop([datetime_col, category_col], axis=1).unstack(level=1)
if 'measured' in v[values_col].columns:
if len(v[values_col].columns) > 1:
v['picked'] = v[values_col]['measured']
notmeasured = np.isnan(v[values_col]['measured'])
v.loc[notmeasured, 'picked'] = v[values_col]['estimated'].loc[notmeasured]
v['method'] = 'measured'
v.loc[notmeasured, 'method'] = 'estimated'
if np.any(notmeasured):
assert v.loc[notmeasured].method.unique()[0] == 'estimated'
if np.any(~notmeasured):
assert v.loc[~notmeasured].method.unique()[0] == 'measured'
else:
v['picked'] = v[values_col]['measured']
v['method'] = 'measured'
elif 'estimated' in v[values_col].columns:
v['picked'] = v[values_col]['estimated']
v['method'] = 'estimated'
elif 'measured total flow' in v[values_col].columns:
v['picked'] = v[values_col]['measured total flow'] # so that tallies go in "n_measured"
v['method'] = 'measured total flow'
else:
pass
v = v[['picked', 'method']].copy()
v.columns = v.columns.droplevel(level=1)
# Figure out which stress period each discharge value is in
# -9999 indicates values outside of the simulation timeframe
v['per'] = -9999
dfs = []
for i, r in perioddata.iterrows():
v.loc[r.start_datetime:r.end_datetime, 'per'] = r.per
# shave end date off dates in period
# (otherwise first month of next period will be included)
per_end_datetime = pd.Timestamp(r.end_datetime) - pd.Timedelta(1, 'd')
df = v.loc[r.start_datetime:per_end_datetime].copy()
df['per'] = r.per
dfs.append(df)
dfs.append(v.loc[v.per == -9999]) # keep values outside of the simulation timeframe
v = pd.concat(dfs)
# Tally number of measured vs. estimated values for each SP
vper = v.groupby(['per', 'method']).count().picked.unstack()
vper.rename(columns={'estimated': 'n_estimated',
'measured': 'n_measured'}, inplace=True)
for c in ['n_estimated', 'n_measured']:
if c not in vper.columns:
vper[c] = 0
# compute mean value and stdev for each stress period
vper['Q_avg'] = v.groupby('per')['picked'].mean()
vper['Q_std'] = v.groupby('per')['picked'].std()
vper.columns.name = None
# reindex to model times dataframe
# (pandas will fill empty periods with nans)
# use an outer join to retain values outside of the model time period
vper = perioddata.join(vper, how='outer')
# fill in the start/end times for data outside of simulation period
vper.loc[-9999, 'start_datetime'] = v.loc[v.per == -9999].index.min()
vper.loc[-9999, 'end_datetime'] = v.loc[v.per == -9999].index.max()
vper.loc[-9999, 'per'] = -9999
vper.per = vper.per.astype(int)
# nans in n_measured and n_estimated columns mean there were no values
# for that period. Fill with zero, convert columns to ints.
vper.fillna({'n_measured': 0, 'n_estimated': 0}, inplace=True)
vper['n_measured'] = vper.n_measured.astype(int)
vper['n_estimated'] = vper.n_estimated.astype(int)
# fill in the missing data with mean value
vper['filled'] = np.isnan(vper.Q_avg)
vper['Q_avg'] = vper.Q_avg.fillna(vper.Q_avg.mean())
v[id_col] = k
vper[id_col] = k
# map any auxiliary info back to output using site numbers
if keep_columns is not None:
for col in keep_columns:
vper[col] = auxiliary_info[col][k]
# vs.append(v)
vpers.append(vper)
df_per = pd.concat(vpers, sort=True)
return df_per