# Copyright 2016 Sandia Corporation and the National Renewable Energy
# Laboratory
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
The Extreme Sea State Contour (ESSC) module contains the tools necessary to
calculate environmental contours of extreme sea states for buoy data.
'''
import numpy as np
import scipy.stats as stats
import scipy.optimize as optim
import scipy.interpolate as interp
import matplotlib.pyplot as plt
import h5py
from sklearn.decomposition import PCA as skPCA
import requests
import bs4
import urllib.request
import re
from datetime import datetime, date
import os
import glob
import copy
import statsmodels.api as sm
from statsmodels import robust
import urllib
import matplotlib
[docs]class EA:
'''The Environmental Assessment (EA) class points to functions for
various contour methods (including getContours and getSamples) and allows
the user to plot results (plotData), sample along the contour
(getContourPoints), calculate the wave breaking steepness curve (steepness)
and/or use the bootstrap method to calculate 95% confidence bounds about
the contours (bootStrap).'''
def __init__():
return
[docs] def getContours():
'''Points to the getContours function in whatever contouring method is used'''
return
[docs] def getSamples():
'''Points to the getSamples function in whatever contouring method is
used, currently only implemented for PCA contours. Implementation for
additional contour methods planned for future release.'''
return
[docs] def saveContour(self, fileName=None):
'''Saves all available contour data obtained via the EA module to
a .h5 file
Parameters
----------
fileName : string
relevent path and filename where the .h5 file will be created and
saved. If no filename, the h5 file will be named NDBC(buoyNum).h5
'''
if (fileName is None):
fileName = 'NDBC' + str(self.buoy.buoyNum) + '.h5'
else:
_, file_extension = os.path.splitext(fileName)
if not file_extension:
fileName = fileName + '.h5'
print(fileName);
with h5py.File(fileName, 'a') as f:
if('method' in f):
f['method'][...] = self.method
else:
f.create_dataset('method', data=self.method)
if('parameters' in f):
gp = f['parameters']
else:
gp = f.create_group('parameters')
self._saveParams(gp)
if(self.Hs_ReturnContours is not None):
if('ReturnContours' in f):
grc = f['ReturnContours']
else:
grc = f.create_group('ReturnContours')
if('T_Return' in grc):
f_T_Return = grc['T_Return']
f_T_Return[...] = self.T_ReturnContours
else:
f_T_Return = grc.create_dataset('T_Return', data=self.T_ReturnContours)
f_T_Return.attrs['units'] = 's'
f_T_Return.attrs['description'] = 'contour, energy period'
if('Hs_Return' in grc):
f_Hs_Return = grc['Hs_Return']
f_Hs_Return[...] = self.Hs_ReturnContours
else:
f_Hs_Return = grc.create_dataset('Hs_Return', data=self.Hs_ReturnContours)
f_Hs_Return.attrs['units'] = 'm'
f_Hs_Return.attrs['description'] = 'contours, significant wave height'
# Samples for full sea state long term analysis
if(hasattr(self, 'Hs_SampleFSS') and self.Hs_SampleFSS is not None):
if('Samples_FullSeaState' in f):
gfss = f['Samples_FullSeaState']
else:
gfss = f.create_group('Samples_FullSeaState')
if('Hs_SampleFSS' in gfss):
f_Hs_SampleFSS = gfss['Hs_SampleFSS']
f_Hs_SampleFSS[...] = self.Hs_SampleFSS
else:
f_Hs_SampleFSS = gfss.create_dataset('Hs_SampleFSS', data=self.Hs_SampleFSS)
f_Hs_SampleFSS.attrs['units'] = 'm'
f_Hs_SampleFSS.attrs['description'] = 'full sea state significant wave height samples'
if('T_SampleFSS' in gfss):
f_T_SampleFSS = gfss['T_SampleFSS']
f_T_SampleFSS[...] = self.T_SampleFSS
else:
f_T_SampleFSS = gfss.create_dataset('T_SampleFSS', data=self.T_SampleFSS)
f_T_SampleFSS.attrs['units'] = 's'
f_T_SampleFSS.attrs['description'] = 'full sea state energy period samples'
if('Weight_SampleFSS' in gfss):
f_Weight_SampleFSS = gfss['Weight_SampleFSS']
f_Weight_SampleFSS[...] = self.Weight_SampleFSS
else:
f_Weight_SampleFSS = gfss.create_dataset('Weight_SampleFSS', data = self.Weight_SampleFSS)
f_Weight_SampleFSS.attrs['description'] = 'full sea state relative weighting samples'
# Samples for contour approach long term analysis
if(hasattr(self, 'Hs_SampleCA') and self.Hs_SampleCA is not None):
if('Samples_ContourApproach' in f):
gca = f['Samples_ContourApproach']
else:
gca = f.create_group('Samples_ContourApproach')
if('Hs_SampleCA' in gca):
f_Hs_sampleCA = gca['Hs_SampleCA']
f_Hs_sampleCA[...] = self.Hs_SampleCA
else:
f_Hs_sampleCA = gca.create_dataset('Hs_SampleCA', data=self.Hs_SampleCA)
f_Hs_sampleCA.attrs['units'] = 'm'
f_Hs_sampleCA.attrs['description'] = 'contour approach significant wave height samples'
if('T_SampleCA' in gca):
f_T_sampleCA = gca['T_SampleCA']
f_T_sampleCA[...] = self.T_SampleCA
else:
f_T_sampleCA = gca.create_dataset('T_SampleCA', data=self.T_SampleCA)
f_T_sampleCA.attrs['units'] = 's'
f_T_sampleCA.attrs['description'] = 'contour approach energy period samples'
[docs] def plotData(self):
"""
Display a plot of the 100-year return contour, full sea state samples
and contour samples
"""
plt.figure()
plt.plot(self.buoy.T, self.buoy.Hs, 'bo', alpha=0.1, label='NDBC data')
plt.plot(self.T_ReturnContours, self.Hs_ReturnContours, 'k-', label='100 year contour')
#plt.plot(self.T_SampleFSS, self.Hs_SampleFSS, 'ro', label='full sea state samples')
#plt.plot(self.T_SampleCA, self.Hs_SampleCA, 'y^', label='contour approach samples')
plt.legend(loc='lower right', fontsize='small')
plt.grid(True)
plt.xlabel('Energy period, $T_e$ [s]')
plt.ylabel('Sig. wave height, $H_s$ [m]')
plt.show()
[docs] def getContourPoints(self, T_Sample):
'''Get Hs points along a specified environmental contour using
user-defined T values.
Parameters
----------
T_Sample : nparray
points for sampling along return contour
Returns
-------
Hs_SampleCA : nparray
points sampled along return contour
Example
-------
To calculate Hs values along the contour at specific
user-defined T values:
import WDRT.ESSC as ESSC
import numpy as np
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create PCA EA object for buoy
pca46022 = ESSC.PCA(buoy46022)
# Declare required parameters
Time_SS = 1. # Sea state duration (hrs)
Time_r = 100 # Return periods (yrs) of interest
nb_steps = 1000 # Enter discretization of the circle in the normal space (optional)
# Generate contour
Hs_Return, T_Return = pca46022.getContours(Time_SS, Time_r,nb_steps)
# Use getContourPoints to find specific points along the contour
T_sampleCA = np.arange(12, 26, 2)
Hs_sampleCA = pca46022.getContourPoints(T_sampleCA)
'''
#finds minimum and maximum energy period values
amin = np.argmin(self.T_ReturnContours)
amax = np.argmax(self.T_ReturnContours)
#finds points along the contour
w1 = self.Hs_ReturnContours[amin:amax]
w2 = np.concatenate((self.Hs_ReturnContours[amax:], self.Hs_ReturnContours[:amin]))
if (np.max(w1) > np.max(w2)):
x1 = self.T_ReturnContours[amin:amax]
y = self.Hs_ReturnContours[amin:amax]
else:
x1 = np.concatenate((self.T_ReturnContours[amax:], self.T_ReturnContours[:amin]))
y1 = np.concatenate((self.Hs_ReturnContours[amax:], self.Hs_ReturnContours[:amin]))
#sorts data based on the max and min energy period values
ms = np.argsort(x1)
x = x1[ms]
y = y1[ms]
#interpolates the sorted data
si = interp.interp1d(x, y)
#finds the wave height based on the user specified energy period values
Hs_SampleCA = si(T_Sample)
self.T_SampleCA = T_Sample
self.Hs_SampleCA = Hs_SampleCA
return Hs_SampleCA
[docs] def steepness(self, SteepMax, T_vals, depth = None):
'''This function calculates a steepness curve to be plotted on an H vs. T
diagram. First, the function calculates the wavelength based on the
depth and T. The T vector can be the input data vector, or will be
created below to cover the span of possible T values.
The function solves the dispersion relation for water waves
using the Newton-Raphson method. All outputs are solved for exactly
using: :math:`hw^2/g = kh*tanh(khG)`
Approximations that could be used in place of this code for deep
and shallow water, as appropriate:
deep water: :math:`h/\lambda \geq 1/2, tanh(kh) \sim 1, \lambda = (gT^2)/(2\pi)`
shallow water: :math:`h/\lambda \leq 1/20, tanh(kh) \sim kh, \lambda = \sqrt{T(gh)}`
Parameters
----------
SteepMax: float
Wave breaking steepness estimate (e.g., 0.07).
T_vals :np.array
Array of T values [sec] at which to calculate the breaking height.
depth: float
Depth at site
Note: if not inputted, the depth will tried to be grabbed from the respective
buoy type's website.
Returns
-------
SteepH: np.array
H values [m] that correspond to the T_mesh values creating the
steepness curve.
T_steep: np.array
T values [sec] over which the steepness curve is defined.
Example
-------
To find limit the steepness of waves on a contour by breaking:
import numpy as np
import WDRT.ESSC as ESSC
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create PCA EA object for buoy
pca46022 = ESSC.PCA(buoy46022)
T_vals = np.arange(0.1, np.amax(buoy46022.T), 0.1)
# Enter estimate of breaking steepness
SteepMax = 0.07 # Reference DNV-RP-C205
# Declare required parameters
depth = 391.4 # Depth at measurement point (m)
SteepH = pca46022.steepness(depth,SteepMax,T_vals)
'''
# Calculate the wavelength at a given depth at each value of T
if depth == None:
depth = self.__fetchDepth()
lambdaT = []
g = 9.81 # [m/s^2]
omega = ((2 * np.pi) / T_vals)
lambdaT = []
for i in range(len(T_vals)):
# Initialize kh using Eckart 1952 (mentioned in Holthuijsen pg. 124)
kh = (omega[i]**2) * depth / \
(g * (np.tanh((omega[i]**2) * depth / g)**0.5))
# Find solution using the Newton-Raphson Method
for j in range(1000):
kh0 = kh
f0 = (omega[i]**2) * depth / g - kh0 * np.tanh(kh0)
df0 = -np.tanh(kh) - kh * (1 - np.tanh(kh)**2)
kh = -f0 / df0 + kh0
f = (omega[i]**2) * depth / g - kh * np.tanh(kh)
if abs(f0 - f) < 10**(-6):
break
lambdaT.append((2 * np.pi) / (kh / depth))
del kh, kh0
lambdaT = np.array(lambdaT, dtype=np.float)
SteepH = lambdaT * SteepMax
return SteepH
def __fetchDepth(self):
'''Obtains the depth from the website for a buoy (either NDBC or CDIP)'''
if self.buoy.buoyType == "NDBC":
url = "https://www.ndbc.noaa.gov/station_page.php?station=%s" % (46022)
ndbcURL = requests.get(url)
ndbcURL.raise_for_status()
ndbcHTML = bs4.BeautifulSoup(ndbcURL.text, "lxml")
header = ndbcHTML.find("b", text="Water depth:")
return float(str(header.nextSibling).split()[0])
elif self.buoy.buoyType == "CDIP":
url = "http://cdip.ucsd.edu/cgi-bin/wnc_metadata?ARCHIVE/%sp1/%sp1_historic" % (self.buoy.buoyNum, self.buoy.buoyNum)
cdipURL = requests.get(url)
cdipURL.raise_for_status()
cdipHTML = bs4.BeautifulSoup(cdipURL.text, "lxml")
#Parse the table for the depth value
depthString = str(cdipHTML.findChildren("td", {"class" : "plus"})[0])
depthString = depthString.split("<br/>")[2]
return float(re.findall(r"[-+]?\d*\.\d+|\d+", depthString)[0])
[docs] def bootStrap(self, boot_size=1000, plotResults=True):
'''Get 95% confidence bounds about a contour using the bootstrap
method. Warning - this function is time consuming. Computation
time depends on selected boot_size.
Parameters
----------
boot_size: int (optional)
Number of bootstrap samples that will be used to calculate 95%
confidence interval. Should be large enough to calculate stable
statistics. If left blank will be set to 1000.
plotResults: boolean (optional)
Option for showing plot of bootstrap confidence bounds. If left
blank will be set to True and plot will be shown.
Returns
-------
contourmean_Hs : nparray
Hs values for mean contour calculated as the average over all
bootstrap contours.
contourmean_T : nparray
T values for mean contour calculated as the average over all
bootstrap contours.
Example
-------
To generate 95% boostrap contours for a given contour method:
import WDRT.ESSC as ESSC
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create PCA EA object for buoy
pca46022 = ESSC.PCA(buoy46022)
# Declare required parameters
Time_SS = 1. # Sea state duration (hrs)
Time_r = 100 # Return periods (yrs) of interest
nb_steps = 1000 # Enter discretization of the circle in the normal space (optional)
# Contour generation
Hs_Return, T_Return = pca46022.getContours(Time_SS, Time_r,nb_steps)
# Calculate boostrap confidence interval
contourmean_Hs, contourmean_T = pca46022.bootStrap(boot_size=10)
'''
#preallocates arrays
n = len(self.buoy.Hs)
Hs_Return_Boot = np.zeros([self.nb_steps,boot_size])
T_Return_Boot = np.zeros([self.nb_steps,boot_size])
buoycopy = copy.deepcopy(self.buoy);
#creates copies of the data based on how it was modeled.
for i in range(boot_size):
boot_inds = np.random.randint(0, high=n, size=n)
buoycopy.Hs = copy.deepcopy(self.buoy.Hs[boot_inds])
buoycopy.T = copy.deepcopy(self.buoy.T[boot_inds])
essccopy=None
if self.method == "Principle component analysis":
essccopy = PCA(buoycopy, self.size_bin)
elif self.method == "Gaussian Copula":
essccopy = GaussianCopula(buoycopy, self.n_size, self.bin_1_limit, self.bin_step)
elif self.method == "Rosenblatt":
essccopy = Rosenblatt(buoycopy, self.n_size, self.bin_1_limit, self.bin_step)
elif self.method == "Clayton Copula":
essccopy = ClaytonCopula(buoycopy, self.n_size, self.bin_1_limit, self.bin_step)
elif self.method == "Gumbel Copula":
essccopy = GumbelCopula(buoycopy, self.n_size, self.bin_1_limit, self.bin_step, self.Ndata)
elif self.method == "Non-parametric Gaussian Copula":
essccopy = NonParaGaussianCopula(buoycopy, self.Ndata, self.max_T, self.max_Hs)
elif self.method == "Non-parametric Clayton Copula":
essccopy = NonParaClaytonCopula(buoycopy, self.Ndata, self.max_T, self.max_Hs)
elif self.method == "Non-parametric Gumbel Copula":
essccopy = NonParaGumbelCopula(buoycopy, self.Ndata, self.max_T, self.max_Hs)
Hs_Return_Boot[:,i],T_Return_Boot[:,i] = essccopy.getContours(self.time_ss, self.time_r, self.nb_steps)
#finds 95% CI values for wave height and energy
contour97_5_Hs = np.percentile(Hs_Return_Boot,97.5,axis=1)
contour2_5_Hs = np.percentile(Hs_Return_Boot,2.5,axis=1)
contourmean_Hs = np.mean(Hs_Return_Boot, axis=1)
contour97_5_T = np.percentile(T_Return_Boot,97.5,axis=1)
contour2_5_T = np.percentile(T_Return_Boot,2.5,axis=1)
contourmean_T = np.mean(T_Return_Boot, axis=1)
self.contourMean_Hs = contourmean_Hs
self.contourMean_T = contourmean_T
#plotting function
def plotResults():
plt.figure()
plt.plot(self.buoy.T, self.buoy.Hs, 'bo', alpha=0.1, label='NDBC data')
plt.plot(self.T_ReturnContours, self.Hs_ReturnContours, 'k-', label='100 year contour')
plt.plot(contour97_5_T, contour97_5_Hs, 'r--', label='95% bootstrap confidence interval')
plt.plot(contour2_5_T, contour2_5_Hs, 'r--')
plt.plot(contourmean_T, contourmean_Hs, 'r-', label='Mean bootstrap contour')
plt.legend(loc='lower right', fontsize='small')
plt.grid(True)
plt.xlabel('Energy period, $T_e$ [s]')
plt.ylabel('Sig. wave height, $H_s$ [m]')
plt.show()
if plotResults:
plotResults()
return contourmean_Hs, contourmean_T
[docs] def outsidePoints(self):
'''Determines which buoy observations are outside of a given contour.
Parameters
----------
None
Returns
-------
outsideHs : nparray
The Hs values of the observations that are outside of the contour
outsideT : nparray
The T values of the observations that are outside of the contour
Example
-------
To get correseponding T and Hs arrays of observations that are outside
of a given contour:
import WDRT.ESSC as ESSC
import numpy as np
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create PCA EA object for buoy
rosen46022 = ESSC.Rosenblatt(buoy46022)
# Declare required parameters
Time_SS = 1. # Sea state duration (hrs)
Time_r = 100 # Return periods (yrs) of interest
# Generate contour
Hs_Return, T_Return = rosen46022.getContours(Time_SS, Time_r)
# Return the outside point Hs/T combinations
outsideT, outsideHs = rosen46022.outsidePoints()
'''
#checks if the contour type is a KDE contour - if so, finds the outside points for the KDE contour.
if isinstance(self.T_ReturnContours,list):
contains_test = np.zeros(len(self.buoy.T),dtype=bool)
for t,hs in zip(self.T_ReturnContours,self.Hs_ReturnContours):
path_contour = []
path_contour = matplotlib.path.Path(np.column_stack((t,hs)))
contains_test = contains_test+path_contour.contains_points(np.column_stack((self.buoy.T,self.buoy.Hs)))
out_inds = np.where(~contains_test)
else: # For non-KDE methods (copulas, etc.)
path_contour = matplotlib.path.Path(np.column_stack((self.T_ReturnContours,self.Hs_ReturnContours)))
contains_test = path_contour.contains_points(np.column_stack((self.buoy.T,self.buoy.Hs)))
out_inds = np.where(~contains_test)
outsideHs =self.buoy.Hs[out_inds]
outsideT = self.buoy.T[out_inds]
return(outsideT, outsideHs)
[docs] def contourIntegrator(self):
'''Calculates the area of the contour over the two-dimensional input
space of interest.
Parameters
----------
None
Returns
-------
area : float
The area of the contour in TxHs units.
Example
-------
To obtain the area of the contour:
import WDRT.ESSC as ESSC
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create PCA EA object for buoy
rosen46022 = ESSC.Rosenblatt(buoy46022)
# Declare required parameters
Time_SS = 1. # Sea state duration (hrs)
Time_r = 100 # Return periods (yrs) of interest
# Generate contour
Hs_Return, T_Return = rosen46022.getContours(Time_SS, Time_r)
# Return the area of the contour
rosenArea = rosen46022.contourIntegrator()
'''
contourTs = self.T_ReturnContours
contourHs = self.Hs_ReturnContours
area = 0.5*np.abs(np.dot(contourTs,np.roll(contourHs,1))-np.dot(contourHs,np.roll(contourTs,1)))
return area
[docs] def dataContour(self, tStepSize = 1, hsStepSize = .5):
'''Creates a contour around the ordered pairs of buoy observations. How tightly
the contour fits around the data will be determined by step size parameters.
Please note that this function currently is in beta; it needs further work to be
optimized for use.
Parameters
----------
tStepSize : float
Determines how far to search for the next point in the T direction.
Smaller values will produce contours that follow the data more closely.
hsStepSize : float
Determines how far to search for the next point in the Hs direction.
Smaller values will produce contours that follow the data more closely.
Returns
-------
dataBoundryHs : nparray
The Hs values of the boundry observations
dataBoundryT : nparray
The Hs values of the boundry observations
Example
-------
To get the corresponding data contour:
import WDRT.ESSC as ESSC
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create PCA EA object for buoy
rosen46022 = ESSC.Rosenblatt(buoy46022)
# Calculate the data contour
dataHs, dataT = rosen46022.dataContour(tStepSize = 1, hsStepSize = .5)
'''
maxHs = max(self.buoy.Hs)
minHs = min(self.buoy.Hs)
sortedHsBuoy = copy.deepcopy(self.buoy)
sortedTBuoy = copy.deepcopy(self.buoy)
sortedTIndex = sorted(range(len(self.buoy.T)),key=lambda x:self.buoy.T[x])
sortedHsIndex = sorted(range(len(self.buoy.Hs)),key=lambda x:self.buoy.Hs[x])
sortedHsBuoy.Hs = self.buoy.Hs[sortedHsIndex]
sortedHsBuoy.T = self.buoy.T[sortedHsIndex]
sortedTBuoy.Hs = self.buoy.Hs[sortedTIndex]
sortedTBuoy.T = self.buoy.T[sortedTIndex]
hsBin1 = []
hsBin2 = []
hsBin3 = []
hsBin4 = []
tBin1 = []
tBin2 = []
tBin3 = []
tBin4 = []
startingPoint = sortedTBuoy.T[0]
hsBin4.append(sortedTBuoy.Hs[0])
tBin4.append(sortedTBuoy.T[0])
while True:
tempNextBinTs = sortedTBuoy.T[sortedTBuoy.T < startingPoint + tStepSize]
tempNextBinHs = sortedTBuoy.Hs[sortedTBuoy.T < startingPoint + tStepSize]
nextBinTs = tempNextBinTs[tempNextBinTs > startingPoint]
nextBinHs = tempNextBinHs[tempNextBinTs > startingPoint]
try:
nextHs = max(nextBinHs)
nextT = nextBinTs[nextBinHs.argmax(axis=0)]
hsBin4.append(nextHs)
tBin4.append(nextT)
startingPoint = nextT
except ValueError:
startingPoint += tStepSize
break
if nextHs == maxHs:
break
startingPoint = sortedTBuoy.T[0]
hsBin1.append(sortedTBuoy.Hs[0])
tBin1.append(sortedTBuoy.T[0])
while True:
tempNextBinTs = sortedTBuoy.T[sortedTBuoy.T < startingPoint + tStepSize]
tempNextBinHs = sortedTBuoy.Hs[sortedTBuoy.T < startingPoint + tStepSize]
nextBinTs = tempNextBinTs[tempNextBinTs > startingPoint]
nextBinHs = tempNextBinHs[tempNextBinTs > startingPoint]
try:
nextHs = min(nextBinHs)
nextT = nextBinTs[nextBinHs.argmin(axis=0)]
hsBin1.append(nextHs)
tBin1.append(nextT)
startingPoint = nextT
except ValueError:
startingPoint += tStepSize
break
if nextHs == minHs:
break
startingPoint = sortedHsBuoy.Hs[sortedHsBuoy.T.argmax(axis=0)]
hsBin3.append(sortedHsBuoy.Hs[sortedHsBuoy.T.argmax(axis=0)])
tBin3.append(sortedHsBuoy.T[sortedHsBuoy.T.argmax(axis=0)])
while True:
tempNextBinTs = sortedHsBuoy.T[sortedHsBuoy.Hs < startingPoint + hsStepSize]
tempNextBinHs = sortedHsBuoy.Hs[sortedHsBuoy.Hs < startingPoint + hsStepSize]
nextBinTs = tempNextBinTs[tempNextBinHs > startingPoint]
nextBinHs = tempNextBinHs[tempNextBinHs > startingPoint]
try:
nextT = max(nextBinTs)
nextHs = nextBinHs[nextBinTs.argmax(axis=0)]
if nextHs not in hsBin4 and nextHs not in hsBin1:
hsBin3.append(nextHs)
tBin3.append(nextT)
startingPoint = nextHs
except ValueError:
startingPoint += hsStepSize
break
if nextHs == maxHs:
break
startingPoint = sortedHsBuoy.Hs[sortedHsBuoy.T.argmax(axis=0)]
while True:
tempNextBinTs = sortedHsBuoy.T[sortedHsBuoy.Hs > startingPoint - hsStepSize]
tempNextBinHs = sortedHsBuoy.Hs[sortedHsBuoy.Hs > startingPoint - hsStepSize]
nextBinTs = tempNextBinTs[tempNextBinHs < startingPoint]
nextBinHs = tempNextBinHs[tempNextBinHs < startingPoint]
try:
nextT = max(nextBinTs)
nextHs = nextBinHs[nextBinTs.argmax(axis=0)]
if nextHs not in hsBin1 and nextHs not in hsBin4:
hsBin2.append(nextHs)
tBin2.append(nextT)
startingPoint = nextHs
except ValueError:
startingPoint = startingPoint - hsStepSize
break
if nextHs == minHs:
break
hsBin2 = hsBin2[::-1] # Reverses the order of the array
tBin2 = tBin2[::-1]
hsBin4 = hsBin4[::-1] # Reverses the order of the array
tBin4 = tBin4[::-1]
dataBoundryHs = np.concatenate((hsBin1,hsBin2,hsBin3,hsBin4),axis = 0)
dataBoundryT = np.concatenate((tBin1,tBin2,tBin3,tBin4),axis = 0)
dataBoundryHs = dataBoundryHs[::-1]
dataBoundryT = dataBoundryT[::-1]
return(dataBoundryHs, dataBoundryT)
def __getCopulaParams(self,n_size,bin_1_limit,bin_step):
sorted_idx = sorted(range(len(self.buoy.Hs)),key=lambda x:self.buoy.Hs[x])
Hs = self.buoy.Hs[sorted_idx]
T = self.buoy.T[sorted_idx]
# Estimate parameters for Weibull distribution for component 1 (Hs) using MLE
# Estimate parameters for Lognormal distribution for component 2 (T) using MLE
para_dist_1=stats.exponweib.fit(Hs,floc=0,fa=1)
para_dist_2=stats.norm.fit(np.log(T))
# Binning
ind = np.array([])
ind = np.append(ind,sum(Hs_val <= bin_1_limit for Hs_val in Hs))
# Make sure first bin isn't empty or too small to avoid errors
while ind == 0 or ind < n_size:
ind = np.array([])
bin_1_limit = bin_1_limit + bin_step
ind = np.append(ind,sum(Hs_val <= bin_1_limit for Hs_val in Hs))
for i in range(1,200):
bin_i_limit = bin_1_limit+bin_step*(i)
ind = np.append(ind,sum(Hs_val <= bin_i_limit for Hs_val in Hs))
if (ind[i-0]-ind[i-1]) < n_size:
break
# Parameters for conditional distribution of T|Hs for each bin
num=len(ind) # num+1: number of bins
para_dist_cond = []
hss = []
para_dist_cond.append(stats.norm.fit(np.log(T[range(0,int(ind[0]))]))) # parameters for first bin
hss.append(np.mean(Hs[range(0,int(ind[0])-1)])) # mean of Hs (component 1 for first bin)
para_dist_cond.append(stats.norm.fit(np.log(T[range(0,int(ind[1]))]))) # parameters for second bin
hss.append(np.mean(Hs[range(0,int(ind[1])-1)])) # mean of Hs (component 1 for second bin)
for i in range(2,num):
para_dist_cond.append(stats.norm.fit(np.log(T[range(int(ind[i-2]),int(ind[i]))])));
hss.append(np.mean(Hs[range(int(ind[i-2]),int(ind[i]))]))
# Estimate coefficient using least square solution (mean: third order, sigma: 2nd order)
para_dist_cond.append(stats.norm.fit(np.log(T[range(int(ind[num-2]),int(len(Hs)))]))); # parameters for last bin
hss.append(np.mean(Hs[range(int(ind[num-2]),int(len(Hs)))])) # mean of Hs (component 1 for last bin)
para_dist_cond = np.array(para_dist_cond)
hss = np.array(hss)
phi_mean = np.column_stack((np.ones(num+1),hss[:],hss[:]**2,hss[:]**3))
phi_std = np.column_stack((np.ones(num+1),hss[:],hss[:]**2))
# Estimate coefficients of mean of Ln(T|Hs)(vector 4x1) (cubic in Hs)
mean_cond = np.linalg.lstsq(phi_mean,para_dist_cond[:,0])[0]
# Estimate coefficients of standard deviation of Ln(T|Hs) (vector 3x1) (quadratic in Hs)
std_cond = np.linalg.lstsq(phi_std,para_dist_cond[:,1])[0]
return para_dist_1, para_dist_2, mean_cond, std_cond
def __getNonParaCopulaParams(self,Ndata, max_T, max_Hs):
sorted_idx = sorted(range(len(self.buoy.Hs)),key=lambda x:self.buoy.Hs[x])
Hs = self.buoy.Hs[sorted_idx]
T = self.buoy.T[sorted_idx]
# Calcualte KDE bounds (this may be added as an input later)
min_limit_1 = 0
max_limit_1 = max_Hs
min_limit_2 = 0
max_limit_2 = max_T
# Discretize for KDE
pts_hs = np.linspace(min_limit_1, max_limit_1, self.Ndata)
pts_t = np.linspace(min_limit_2, max_limit_2, self.Ndata)
# Calculate optimal bandwidth for T and Hs
sig = robust.scale.mad(T)
num = float(len(T))
bwT = sig*(4.0/(3.0*num))**(1.0/5.0)
sig = robust.scale.mad(Hs)
num = float(len(Hs))
bwHs = sig*(4.0/(3.0*num))**(1.0/5.0)
# Nonparametric PDF for T
temp = sm.nonparametric.KDEUnivariate(T)
temp.fit(bw = bwT)
f_t = temp.evaluate(pts_t)
# Nonparametric CDF for Hs
temp = sm.nonparametric.KDEUnivariate(Hs)
temp.fit(bw = bwHs)
tempPDF = temp.evaluate(pts_hs)
F_hs = tempPDF/sum(tempPDF)
F_hs = np.cumsum(F_hs)
# Nonparametric CDF for T
F_t = f_t/sum(f_t)
F_t = np.cumsum(F_t)
nonpara_dist_1 = np.transpose(np.array([pts_hs, F_hs]))
nonpara_dist_2 = np.transpose(np.array([pts_t, F_t]))
nonpara_pdf_2 = np.transpose(np.array([pts_t, f_t]))
return nonpara_dist_1, nonpara_dist_2, nonpara_pdf_2
def __gumbelCopula(self, u, alpha):
''' Calculates the Gumbel copula density
Parameters
----------
u: np.array
Vector of equally spaced points between 0 and twice the
maximum value of T.
alpha: float
Copula parameter. Must be greater than or equal to 1.
Returns
-------
y: np.array
Copula density function.
'''
#Ignore divide by 0 warnings and resulting NaN warnings
np.seterr(all='ignore')
v = -np.log(u)
v = np.sort(v, axis=0)
vmin = v[0, :]
vmax = v[1, :]
nlogC = vmax * (1 + (vmin / vmax) ** alpha) ** (1 / alpha)
y = (alpha - 1 +nlogC)*np.exp(-nlogC+np.sum((alpha-1)*np.log(v)+v, axis =0) +(1-2*alpha)*np.log(nlogC))
np.seterr(all='warn')
return(y)
[docs]class PCA(EA):
def __init__(self, buoy, size_bin=250.):
'''
Create a PCA EA class for a buoy object. Contours generated under this
class will use principal component analysis (PCA) with improved
distribution fitting (Eckert et. al 2015) and the I-FORM.
Parameters
___________
size_bin : float
chosen bin size
buoy : NDBCData
ESSC.Buoy Object
'''
self.method = "Principle component analysis"
self.buoy = buoy
if size_bin > len(buoy.Hs)*0.25:
self.size_bin = len(buoy.Hs)*0.25
print(round(len(buoy.Hs)*0.25,2),'is the max bin size for this buoy. The bin size has been set to this amount.')
else:
self.size_bin = size_bin
self.Hs_ReturnContours = None
self.Hs_SampleCA = None
self.Hs_SampleFSS = None
self.T_ReturnContours = None
self.T_SampleCA = None
self.T_SampleFSS = None
self.Weight_points = None
self.coeff, self.shift, self.comp1_params, self.sigma_param, self.mu_param = self.__generateParams(self.size_bin)
def __generateParams(self, size_bin=250.0):
pca = skPCA(n_components=2)
pca.fit(np.array((self.buoy.Hs - self.buoy.Hs.mean(axis=0), self.buoy.T - self.buoy.T.mean(axis=0))).T)
coeff = abs(pca.components_) # Apply correct/expected sign convention
coeff[1, 1] = -1.0 * coeff[1, 1] # Apply correct/expected sign convention
Comp1_Comp2 = np.dot (np.array((self.buoy.Hs, self.buoy.T)).T, coeff)
shift = abs(min(Comp1_Comp2[:, 1])) + 0.1 # Calculate shift
shift = abs(min(Comp1_Comp2[:, 1])) + 0.1 # Calculate shift
# Apply shift to Component 2 to make all values positive
Comp1_Comp2[:, 1] = Comp1_Comp2[:, 1] + shift
Comp1_Comp2_sort = Comp1_Comp2[Comp1_Comp2[:, 0].argsort(), :]
# Fitting distribution of component 1
comp1_params = stats.invgauss.fit(Comp1_Comp2_sort[:, 0], floc=0)
n_data = len(self.buoy.Hs) # Number of observations
edges = np.hstack((np.arange(0, size_bin * np.ceil(n_data / size_bin),
size_bin), n_data + 1))
ranks = np.arange(n_data)
hist_count, _ = np.histogram(ranks, bins=edges)
bin_inds = np.digitize(ranks, bins=edges) - 1
Comp2_bins_params = np.zeros((2, int(max(bin_inds) + 1)))
Comp1_mean = np.array([])
for bin_loop in range(np.max(bin_inds) + 1):
mask_bins = bin_inds == bin_loop # Find location of bin values
Comp2_bin = np.sort(Comp1_Comp2_sort[mask_bins, 1])
Comp1_mean = np.append(Comp1_mean,
np.mean(Comp1_Comp2_sort[mask_bins, 0]))
# Calcualte normal distribution parameters for C2 in each bin
Comp2_bins_params[:, bin_loop] = np.array(stats.norm.fit(Comp2_bin))
mu_param, pcov = optim.curve_fit(self.__mu_fcn,
Comp1_mean.T, Comp2_bins_params[0, :])
sigma_param = self.__sigma_fits(Comp1_mean, Comp2_bins_params[1, :])
return coeff, shift, comp1_params, sigma_param, mu_param
def _saveParams(self, groupObj):
if('nb_steps' in groupObj):
groupObj['nb_steps'][...] = self.nb_steps
else:
groupObj.create_dataset('nb_steps', data=self.nb_steps)
if('time_r' in groupObj):
groupObj['time_r'][...] = self.time_r
else:
groupObj.create_dataset('time_r', data=self.time_r)
if('time_ss' in groupObj):
groupObj['time_ss'][...] = self.time_ss
else:
groupObj.create_dataset('time_ss', data=self.time_ss)
if('coeff' in groupObj):
groupObj['coeff'][...] = self.coeff
else:
groupObj.create_dataset('coeff', data=self.coeff)
if('shift' in groupObj):
groupObj['shift'][...] = self.shift
else:
groupObj.create_dataset('shift', data=self.shift)
if('comp1_params' in groupObj):
groupObj['comp1_params'][...] = self.comp1_params
else:
groupObj.create_dataset('comp1_params', data=self.comp1_params)
if('sigma_param' in groupObj):
groupObj['sigma_param'][...] = self.sigma_param
else:
groupObj.create_dataset('sigma_param', data=self.sigma_param)
if('mu_param' in groupObj):
groupObj['mu_param'][...] = self.mu_param
else:
groupObj.create_dataset('mu_param', data=self.mu_param)
[docs] def getContours(self, time_ss, time_r, nb_steps=1000):
'''WDRT Extreme Sea State PCA Contour function
This function calculates environmental contours of extreme sea states using
principal component analysis and the inverse first-order reliability
method.
Parameters
___________
time_ss : float
Sea state duration (hours) of measurements in input.
time_r : np.array
Desired return period (years) for calculation of environmental
contour, can be a scalar or a vector.
nb_steps : int
Discretization of the circle in the normal space used for
inverse FORM calculation.
Returns
-------
Hs_Return : np.array
Calculated Hs values along the contour boundary following
return to original input orientation.
T_Return : np.array
Calculated T values along the contour boundary following
return to original input orientation.
nb_steps : float
Discretization of the circle in the normal space
Example
-------
To obtain the contours for a NDBC buoy::
import WDRT.ESSC as ESSC
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create PCA EA object for buoy
pca46022 = ESSC.PCA(buoy46022)
# Declare required parameters
Time_SS = 1. # Sea state duration (hrs)
Time_r = 100 # Return periods (yrs) of interest
nb_steps = 1000 # Enter discretization of the circle in the normal space (optional)
# Contour generation example
Hs_Return, T_Return = pca46022.getContours(Time_SS, Time_r,nb_steps)
'''
self.time_ss = time_ss
self.time_r = time_r
self.nb_steps = nb_steps
# IFORM
# Failure probability for the desired return period (time_R) given the
# duration of the measurements (time_ss)
p_f = 1 / (365 * (24 / time_ss) * time_r)
beta = stats.norm.ppf((1 - p_f), loc=0, scale=1) # Reliability
theta = np.linspace(0, 2 * np.pi, num = nb_steps)
# Vary U1, U2 along circle sqrt(U1^2+U2^2)=beta
U1 = beta * np.cos(theta)
U2 = beta * np.sin(theta)
# Calculate C1 values along the contour
Comp1_R = stats.invgauss.ppf(stats.norm.cdf(U1, loc=0, scale=1),
mu= self.comp1_params[0], loc=0,
scale= self.comp1_params[2])
# Calculate mu values at each point on the circle
mu_R = self.__mu_fcn(Comp1_R, self.mu_param[0], self.mu_param[1])
# Calculate sigma values at each point on the circle
sigma_R = self.__sigma_fcn(self.sigma_param, Comp1_R)
# Use calculated mu and sigma values to calculate C2 along the contour
Comp2_R = stats.norm.ppf(stats.norm.cdf(U2, loc=0, scale=1),
loc=mu_R, scale=sigma_R)
# Calculate Hs and T along the contour
Hs_Return, T_Return = self.__princomp_inv(Comp1_R, Comp2_R, self.coeff, self.shift)
Hs_Return = np.maximum(0, Hs_Return) # Remove negative values
self.Hs_ReturnContours = Hs_Return
self.T_ReturnContours = T_Return
return Hs_Return, T_Return
[docs] def getSamples(self, num_contour_points, contour_returns, random_seed=None):
'''WDRT Extreme Sea State Contour Sampling function.
This function calculates samples of Hs and T using the EA function to
sample between contours of user-defined return periods.
Parameters
----------
num_contour_points : int
Number of sample points to be calculated per contour interval.
contour_returns: np.array
Vector of return periods that define the contour intervals in
which samples will be taken. Values must be greater than zero and
must be in increasing order.
random_seed: int (optional)
Random seed for sample generation, required for sample
repeatability. If left blank, a seed will automatically be
generated.
Returns
-------
Hs_Samples: np.array
Vector of Hs values for each sample point.
Te_Samples: np.array
Vector of Te values for each sample point.
Weight_points: np.array
Vector of probabilistic weights for each sampling point
to be used in risk calculations.
Example
-------
To get weighted samples from a set of contours::
import numpy as np
import WDRT.ESSC as ESSC
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create PCA EA object for buoy
pca46022 = ESSC.PCA(buoy46022)
# Declare required parameters
Time_SS = 1. # Sea state duration (hrs)
Time_r = 100 # Return periods (yrs) of interest
num_contour_points = 10 # Number of points to be sampled for each contour interval
contour_returns = np.array([0.001, 0.01, 0.05, 0.1, 0.5, 1, 5, 10, 50, 100])
# Calculate contour to save required variables to PCA EA object
pca46022.getContours(Time_SS, Time_r,nb_steps)
# Probabilities defining sampling contour bounds.
random_seed = 2 # Random seed for sample generation
# Get samples for a full sea state long term analysis
Hs_sampleFSS, T_sampleFSS, Weight_sampleFSS = pca46022.getSamples(num_contour_points,
contour_returns, random_seed)
'''
# Calculate line where Hs = 0 to avoid sampling Hs in negative space
Te_zeroline = np.linspace(2.5, 30, 1000)
Te_zeroline = np.transpose(Te_zeroline)
Hs_zeroline = np.zeros(len(Te_zeroline))
# Transform zero line into principal component space
Comp_zeroline = np.dot(np.transpose(np.vstack([Hs_zeroline, Te_zeroline])),
self.coeff)
Comp_zeroline[:, 1] = Comp_zeroline[:, 1] + self.shift
# Find quantiles along zero line
C1_zeroline_prob = stats.invgauss.cdf(Comp_zeroline[:, 0],
mu = self.comp1_params[0], loc=0,
scale = self.comp1_params[2])
mu_zeroline = self.__mu_fcn(Comp_zeroline[:, 0], self.mu_param[0], self.mu_param[1])
sigma_zeroline = self.__sigma_fcn(self.sigma_param, Comp_zeroline[:, 0])
C2_zeroline_prob = stats.norm.cdf(Comp_zeroline[:, 1],
loc=mu_zeroline, scale=sigma_zeroline)
C1_normzeroline = stats.norm.ppf(C1_zeroline_prob, 0, 1)
C2_normzeroline = stats.norm.ppf(C2_zeroline_prob, 0, 1)
contour_probs = 1 / (365 * (24 / self.time_ss) * contour_returns)
# Reliability contour generation
beta_lines = stats.norm.ppf(
(1 - contour_probs), 0, 1) # Calculate reliability
beta_lines = np.hstack((0, beta_lines)) # Add zero as lower bound to first
# contour
theta_lines = np.linspace(0, 2 * np.pi, 1000) # Discretize the circle
contour_probs = np.hstack((1, contour_probs)) # Add probablity of 1 to the
# reliability set, corresponding to probability of the center point of the
# normal space
# Vary U1,U2 along circle sqrt(U1^2+U2^2) = beta
U1_lines = np.dot(np.cos(theta_lines[:, None]), beta_lines[None, :])
U2_lines = np.dot(np.sin(theta_lines[:, None]), beta_lines[None, :])
# Removing values on the H_s = 0 line that are far from the circles in the
# normal space that will be evaluated to speed up calculations
minval = np.amin(U1_lines) - 0.5
mask = C1_normzeroline > minval
C1_normzeroline = C1_normzeroline[mask]
C2_normzeroline = C2_normzeroline[mask]
# Transform to polar coordinates
Theta_zeroline = np.arctan2(C2_normzeroline, C1_normzeroline)
Rho_zeroline = np.sqrt(C1_normzeroline**2 + C2_normzeroline**2)
Theta_zeroline[Theta_zeroline < 0] = Theta_zeroline[
Theta_zeroline < 0] + 2 * np.pi
Sample_alpha, Sample_beta, Weight_points = self.__generateData(beta_lines,
Rho_zeroline, Theta_zeroline, num_contour_points,contour_probs,random_seed)
Hs_Sample, T_Sample = self.__transformSamples(Sample_alpha, Sample_beta)
self.Hs_SampleFSS = Hs_Sample
self.T_SampleFSS = T_Sample
self.Weight_SampleFSS = Weight_points
return Hs_Sample, T_Sample, Weight_points
[docs] def plotSampleData(self):
"""
Display a plot of the 100-year return contour, full sea state samples
and contour samples
"""
plt.figure()
plt.plot(self.buoy.T, self.buoy.Hs, 'bo', alpha=0.1, label='NDBC data')
plt.plot(self.T_ReturnContours, self.Hs_ReturnContours, 'k-', label='100 year contour')
plt.plot(self.T_SampleFSS, self.Hs_SampleFSS, 'ro', label='full sea state samples')
plt.plot(self.T_SampleCA, self.Hs_SampleCA, 'y^', label='contour approach samples')
plt.legend(loc='lower right', fontsize='small')
plt.grid(True)
plt.xlabel('Energy period, $T_e$ [s]')
plt.ylabel('Sig. wave height, $H_s$ [m]')
plt.show()
def __generateData(self, beta_lines, Rho_zeroline, Theta_zeroline, num_contour_points, contour_probs, random_seed):
"""
Calculates radius, angle, and weight for each sample point
"""
''' Data generating function that calculates the radius, angle, and
weight for each sample point.
Parameters
----------
beta_lines: np.array
Array of mu fitting function parameters.
Rho_zeroline: np.array
array of radii
Theta_zeroline: np.array
num_contour_points: np.array
contour_probs: np.array
random_seed: int
seed for generating random data.
Returns
-------
Sample_alpha: np.array
Array of fitted sample angle values.
Sample_beta: np.array
Array of fitted sample radius values.
Weight_points: np.array
Array of weights for each point.
'''
np.random.seed(random_seed)
num_samples = (len(beta_lines) - 1) * num_contour_points
Alpha_bounds = np.zeros((len(beta_lines) - 1, 2))
Angular_dist = np.zeros(len(beta_lines) - 1)
Angular_ratio = np.zeros(len(beta_lines) - 1)
Alpha = np.zeros((len(beta_lines) - 1, num_contour_points + 1))
Weight = np.zeros(len(beta_lines) - 1)
Sample_beta = np.zeros(num_samples)
Sample_alpha = np.zeros(num_samples)
Weight_points = np.zeros(num_samples)
for i in range(len(beta_lines) - 1): # Loop over contour intervals
# Check if any of the radii for the
r = Rho_zeroline - beta_lines[i + 1]
# Hs=0, line are smaller than the radii of the contour, meaning
# that these lines intersect
if any(r < 0):
left = np.amin(np.where(r < -0.01))
right = np.amax(np.where(r < -0.01))
Alpha_bounds[i, :] = (Theta_zeroline[left], Theta_zeroline[right] -
2 * np.pi) # Save sampling bounds
else:
Alpha_bounds[i, :] = np.array((0, 2 * np.pi))
# Find the angular distance that will be covered by sampling the disc
Angular_dist[i] = sum(abs(Alpha_bounds[i]))
# Calculate ratio of area covered for each contour
Angular_ratio[i] = Angular_dist[i] / (2 * np.pi)
# Discretize the remaining portion of the disc into 10 equally spaced
# areas to be sampled
Alpha[i, :] = np.arange(min(Alpha_bounds[i]),
max(Alpha_bounds[i]) + 0.1, Angular_dist[i] / num_contour_points)
# Calculate the weight of each point sampled per contour
Weight[i] = ((contour_probs[i] - contour_probs[i + 1]) *
Angular_ratio[i] / num_contour_points)
for j in range(num_contour_points):
# Generate sample radius by adding a randomly sampled distance to
# the 'disc' lower bound
Sample_beta[(i) * num_contour_points + j] = (beta_lines[i] +
np.random.random_sample() * (beta_lines[i + 1] - beta_lines[i]))
# Generate sample angle by adding a randomly sampled distance to
# the lower bound of the angle defining a discrete portion of the
# 'disc'
Sample_alpha[(i) * num_contour_points + j] = (Alpha[i, j] +
np.random.random_sample() * (Alpha[i, j + 1] - Alpha[i, j]))
# Save the weight for each sample point
Weight_points[(i) * num_contour_points + j] = Weight[i]
return Sample_alpha, Sample_beta, Weight_points
def __transformSamples(self, Sample_alpha, Sample_beta):
Sample_U1 = Sample_beta * np.cos(Sample_alpha)
Sample_U2 = Sample_beta * np.sin(Sample_alpha)
# Sample transformation to principal component space
Comp1_sample = stats.invgauss.ppf(stats.norm.cdf(Sample_U1, loc=0, scale=1),
mu=self.comp1_params[0], loc=0,
scale=self.comp1_params[2])
mu_sample = self.__mu_fcn(Comp1_sample, self.mu_param[0], self.mu_param[1])
# Calculate sigma values at each point on the circle
sigma_sample = self.__sigma_fcn(self.sigma_param, Comp1_sample)
# Use calculated mu and sigma values to calculate C2 along the contour
Comp2_sample = stats.norm.ppf(stats.norm.cdf(Sample_U2, loc=0, scale=1),
loc=mu_sample, scale=sigma_sample)
# Sample transformation into Hs-T space
Hs_Sample, T_Sample = self.__princomp_inv(
Comp1_sample, Comp2_sample, self.coeff, self.shift)
return Hs_Sample, T_Sample
def __mu_fcn(self, x, mu_p_1, mu_p_2):
''' Linear fitting function for the mean(mu) of Component 2 normal
distribution as a function of the Component 1 mean for each bin.
Used in the EA and getSamples functions.
Parameters
----------
mu_p: np.array
Array of mu fitting function parameters.
x: np.array
Array of values (Component 1 mean for each bin) at which to evaluate
the mu fitting function.
Returns
-------
mu_fit: np.array
Array of fitted mu values.
'''
mu_fit = mu_p_1 * x + mu_p_2
return mu_fit
def __sigma_fcn(self,sig_p, x):
'''Quadratic fitting formula for the standard deviation(sigma) of Component
2 normal distribution as a function of the Component 1 mean for each bin.
Used in the EA and getSamples functions.
Parameters
----------
sig_p: np.array
Array of sigma fitting function parameters.
x: np.array
Array of values (Component 1 mean for each bin) at which to evaluate
the sigma fitting function.
Returns
-------
sigma_fit: np.array
Array of fitted sigma values.
'''
sigma_fit = sig_p[0] * x**2 + sig_p[1] * x + sig_p[2]
return sigma_fit
def __princomp_inv(self, princip_data1, princip_data2, coeff, shift):
'''Takes the inverse of the principal component rotation given data,
coefficients, and shift. Used in the EA and getSamples functions.
Parameters
----------
princip_data1: np.array
Array of Component 1 values.
princip_data2: np.array
Array of Component 2 values.
coeff: np.array
Array of principal component coefficients.
shift: float
Shift applied to Component 2 to make all values positive.
Returns
-------
original1: np.array
Hs values following rotation from principal component space.
original2: np.array
T values following rotation from principal component space.
'''
original1 = np.zeros(len(princip_data1))
original2 = np.zeros(len(princip_data1))
for i in range(len(princip_data2)):
original1[i] = (((coeff[0, 1] * (princip_data2[i] - shift)) +
(coeff[0, 0] * princip_data1[i])) / (coeff[0, 1]**2 +
coeff[0, 0]**2))
original2[i] = (((coeff[0, 1] * princip_data1[i]) -
(coeff[0, 0] * (princip_data2[i] -
shift))) / (coeff[0, 1]**2 + coeff[0, 0]**2))
return original1, original2
def __betafcn(self, sig_p, rho):
'''Penalty calculation for sigma parameter fitting function to impose
positive value constraint.
Parameters
----------
sig_p: np.array
Array of sigma fitting function parameters.
rho: float
Penalty function variable that drives the solution towards
required constraint.
Returns
-------
Beta1: float
Penalty function variable that applies the constraint requiring
the y-intercept of the sigma fitting function to be greater than
or equal to 0.
Beta2: float
Penalty function variable that applies the constraint requiring
the minimum of the sigma fitting function to be greater than or
equal to 0.
'''
if -sig_p[2] <= 0:
Beta1 = 0.0
else:
Beta1 = rho
if -sig_p[2] + (sig_p[1]**2) / (4 * sig_p[0]) <= 0:
Beta2 = 0.0
else:
Beta2 = rho
return Beta1, Beta2
# Sigma function sigma_fcn defined outside of EA function
def __objfun(self, sig_p, x, y_actual):
'''Sum of least square error objective function used in sigma
minimization.
Parameters
----------
sig_p: np.array
Array of sigma fitting function parameters.
x: np.array
Array of values (Component 1 mean for each bin) at which to evaluate
the sigma fitting function.
y_actual: np.array
Array of actual sigma values for each bin to use in least
square error calculation with fitted values.
Returns
-------
obj_fun_result: float
Sum of least square error objective function for fitted
and actual values.
'''
obj_fun_result = np.sum((self.__sigma_fcn(sig_p, x) - y_actual)**2)
return obj_fun_result # Sum of least square error
def __objfun_penalty(self, sig_p, x, y_actual, Beta1, Beta2):
'''Penalty function used for sigma function constrained optimization.
Parameters
----------
sig_p: np.array
Array of sigma fitting function parameters.
x: np.array
Array of values (Component 1 mean for each bin) at which to evaluate
the sigma fitting function.
y_actual: np.array
Array of actual sigma values for each bin to use in least
square error calculation with fitted values.
Beta1: float
Penalty function variable that applies the constraint requiring
the y-intercept of the sigma fitting function to be greater than
or equal to 0.
Beta2: float
Penalty function variable that applies the constraint requiring
the minimum of the sigma fitting function to be greater than or
equal to 0.
Returns
-------
penalty_fcn: float
Objective function result with constraint penalties
applied for out of bound solutions.
'''
penalty_fcn = (self.__objfun(sig_p, x, y_actual) + Beta1 * (-sig_p[2])**2 +
Beta2 * (-sig_p[2] + (sig_p[1]**2) / (4 * sig_p[0]))**2)
return penalty_fcn
def __sigma_fits(self, Comp1_mean, sigma_vals):
'''Sigma parameter fitting function using penalty optimization.
Parameters
----------
Comp1_mean: np.array
Mean value of Component 1 for each bin of Component 2.
sigma_vals: np.array
Value of Component 2 sigma for each bin derived from normal
distribution fit.
Returns
-------
sig_final: np.array
Final sigma parameter values after constrained optimization.
'''
sig_0 = np.array((0.1, 0.1, 0.1)) # Set initial guess
rho = 1.0 # Set initial penalty value
# Set tolerance, very small values (i.e.,smaller than 10^-5) may cause
# instabilities
epsilon = 10**-5
# Set inital beta values using beta function
Beta1, Beta2 = self.__betafcn(sig_0, rho)
# Initial search for minimum value using initial guess
sig_1 = optim.fmin(func=self.__objfun_penalty, x0=sig_0,
args=(Comp1_mean, sigma_vals, Beta1, Beta2), disp=False)
# While either the difference between iterations or the difference in
# objective function evaluation is greater than the tolerance, continue
# iterating
while (np.amin(abs(sig_1 - sig_0)) > epsilon and
abs(self.__objfun(sig_1, Comp1_mean, sigma_vals) -
self.__objfun(sig_0, Comp1_mean, sigma_vals)) > epsilon):
sig_0 = sig_1
# Calculate penalties for this iteration
Beta1, Beta2 = self.__betafcn(sig_0, rho)
# Find a new minimum
sig_1 = optim.fmin(func=self.__objfun_penalty, x0=sig_0,
args=(Comp1_mean, sigma_vals, Beta1, Beta2), disp=False)
rho = 10 * rho # Increase penalization
sig_final = sig_1
return sig_final
[docs]class GaussianCopula(EA):
'''Create a GaussianCopula EA class for a buoy object. Contours generated
under this class will use a Gaussian copula.'''
def __init__(self, buoy, n_size=40., bin_1_limit=1., bin_step=0.25):
'''
Parameters
----------
buoy : NDBCData
ESSC.Buoy Object
n_size: float
minimum bin size used for Copula contour methods
bin_1_limit: float
maximum value of Hs for the first bin
bin_step: float
overlap interval for each bin
'''
self.method = "Gaussian Copula"
self.buoy = buoy
self.n_size = n_size
self.bin_1_limit = bin_1_limit
self.bin_step = bin_step
self.Hs_ReturnContours = None
# self.Hs_SampleCA = None
# self.Hs_SampleFSS = None
self.T_ReturnContours = None
# self.T_SampleCA = None
# self.T_SampleFSS = None
# self.Weight_points = None
# self.coeff, self.shift, self.comp1_params, self.sigma_param, self.mu_param = self.__generateParams(size_bin)
self.para_dist_1,self.para_dist_2,self.mean_cond,self.std_cond = self._EA__getCopulaParams(n_size,bin_1_limit,bin_step)
[docs] def getContours(self, time_ss, time_r, nb_steps = 1000):
'''WDRT Extreme Sea State Gaussian Copula Contour function.
This function calculates environmental contours of extreme sea states using
a Gaussian copula and the inverse first-order reliability
method.
Parameters
___________
time_ss : float
Sea state duration (hours) of measurements in input.
time_r : np.array
Desired return period (years) for calculation of environmental
contour, can be a scalar or a vector.
nb_steps : float
Discretization of the circle in the normal space used for
inverse FORM calculation.
Returns
-------
Hs_Return : np.array
Calculated Hs values along the contour boundary following
return to original input orientation.
T_Return : np.array
Calculated T values along the contour boundary following
return to original input orientation.
nb_steps : float
Discretization of the circle in the normal space
Example
-------
To obtain the contours for a NDBC buoy::
import WDRT.ESSC as ESSC
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create Environtmal Analysis object using above parameters
Gauss46022 = ESSC.GaussianCopula(buoy46022)
# Declare required parameters
Time_SS = 1. # Sea state duration (hrs)
Time_r = 100 # Return periods (yrs) of interest
nb_steps = 1000 # Enter discretization of the circle in the normal space (optional)
# Gaussian copula contour generation example
Hs_Return, T_Return = Gauss46022.getContours(Time_SS, Time_r,nb_steps)
'''
self.time_ss = time_ss
self.time_r = time_r
self.nb_steps = nb_steps
p_f = 1 / (365 * (24 / time_ss) * time_r)
beta = stats.norm.ppf((1 - p_f), loc=0, scale=1) # Reliability
theta = np.linspace(0, 2 * np.pi, num = nb_steps)
# Vary U1, U2 along circle sqrt(U1^2+U2^2)=beta
U1 = beta * np.cos(theta)
U2 = beta * np.sin(theta)
comp_1 = stats.exponweib.ppf(stats.norm.cdf(U1),a=self.para_dist_1[0],c=self.para_dist_1[1],loc=self.para_dist_1[2],scale=self.para_dist_1[3])
tau = stats.kendalltau(self.buoy.T,self.buoy.Hs)[0] # Calculate Kendall's tau
rho_gau=np.sin(tau*np.pi/2.)
z2_Gau=stats.norm.cdf(U2*np.sqrt(1.-rho_gau**2.)+rho_gau*U1);
comp_2_Gaussian = stats.lognorm.ppf(z2_Gau,s=self.para_dist_2[1],loc=0,scale=np.exp(self.para_dist_2[0])) #lognormalinverse
Hs_Return = comp_1
T_Return = comp_2_Gaussian
self.Hs_ReturnContours = Hs_Return
self.T_ReturnContours = T_Return
return Hs_Return, T_Return
[docs] def getSamples(self):
'''Currently not implemented in this version.'''
raise NotImplementedError
def _saveParams(self, groupObj):
groupObj.create_dataset('n_size', data=self.n_size)
groupObj.create_dataset('bin_1_limit', data=self.bin_1_limit)
groupObj.create_dataset('bin_step', data=self.bin_step)
groupObj.create_dataset('para_dist_1', data=self.para_dist_1)
groupObj.create_dataset('para_dist_2', data=self.para_dist_2)
groupObj.create_dataset('mean_cond', data=self.mean_cond)
groupObj.create_dataset('std_cond', data=self.std_cond)
[docs]class Rosenblatt(EA):
'''Create a Rosenblatt EA class for a buoy object. Contours generated
under this class will use a Rosenblatt transformation and the I-FORM.'''
def __init__(self, buoy, n_size=50., bin_1_limit= .5, bin_step=0.25):
'''
Parameters
----------
buoy : NDBCData
ESSC.Buoy Object
n_size: float
minimum bin size used for Copula contour methods
bin_1_limit: float
maximum value of Hs for the first bin
bin_step: float
overlap interval for each bin
'''
self.method = "Rosenblatt"
self.buoy = buoy
if n_size > 100:
self.n_size = 100
print(100,'is the maximum "minimum bin size" for this buoy. The minimum bin size has been set to this amount.')
else:
self.n_size = n_size
if bin_step > max(buoy.Hs)*.1:
self.bin_step = max(buoy.Hs)*.1
print(round(max(buoy.Hs)*.1,2),'is the maximum bin overlap for this buoy. The bin overlap has been set to this amount.')
else:
self.bin_step = bin_step
if bin_1_limit > max(buoy.Hs)*.25:
self.bin_1_limit = max(buoy.Hs)*.25
print(round(max(buoy.Hs)*.25,2),'is the maximum limit for the first for this buoy. The first bin limit has been set to this amount.')
else:
self.bin_1_limit = bin_1_limit
self.Hs_ReturnContours = None
# self.Hs_SampleCA = None
# self.Hs_SampleFSS = None
self.T_ReturnContours = None
# self.T_SampleCA = None
# self.T_SampleFSS = None
# self.Weight_points = None
# self.coeff, self.shift, self.comp1_params, self.sigma_param, self.mu_param = self.__generateParams(size_bin)
self.para_dist_1,self.para_dist_2,self.mean_cond,self.std_cond = self._EA__getCopulaParams(self.n_size,self.bin_1_limit,self.bin_step)
[docs] def getContours(self, time_ss, time_r, nb_steps = 1000):
'''WDRT Extreme Sea State Rosenblatt Copula Contour function.
This function calculates environmental contours of extreme sea states using
a Rosenblatt transformation and the inverse first-order reliability
method.
Parameters
___________
time_ss : float
Sea state duration (hours) of measurements in input.
time_r : np.array
Desired return period (years) for calculation of environmental
contour, can be a scalar or a vector.
nb_steps : float
Discretization of the circle in the normal space used for
inverse FORM calculation.
Returns
-------
Hs_Return : np.array
Calculated Hs values along the contour boundary following
return to original input orientation.
T_Return : np.array
Calculated T values along the contour boundary following
return to original input orientation.
nb_steps : float
Discretization of the circle in the normal space
Example
-------
To obtain the contours for a NDBC buoy::
import WDRT.ESSC as ESSC
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create Environtmal Analysis object using above parameters
Rosen46022 = ESSC.Rosenblatt(buoy46022)
# Declare required parameters
Time_SS = 1. # Sea state duration (hrs)
Time_r = 100 # Return periods (yrs) of interest
nb_steps = 1000 # Enter discretization of the circle in the normal space (optional)
# Rosenblatt contour generation example
Hs_Return, T_Return = Rosen46022.getContours(Time_SS, Time_r,nb_steps)
'''
self.time_ss = time_ss
self.time_r = time_r
self.nb_steps = nb_steps
p_f = 1 / (365 * (24 / time_ss) * time_r)
beta = stats.norm.ppf((1 - p_f), loc=0, scale=1) # Reliability
theta = np.linspace(0, 2 * np.pi, num = nb_steps)
# Vary U1, U2 along circle sqrt(U1^2+U2^2)=beta
U1 = beta * np.cos(theta)
U2 = beta * np.sin(theta)
comp_1 = stats.exponweib.ppf(stats.norm.cdf(U1),a=self.para_dist_1[0],c=self.para_dist_1[1],loc=self.para_dist_1[2],scale=self.para_dist_1[3])
lamda_cond=self.mean_cond[0]+self.mean_cond[1]*comp_1+self.mean_cond[2]*comp_1**2+self.mean_cond[3]*comp_1**3 # mean of Ln(T) as a function of Hs
sigma_cond=self.std_cond[0]+self.std_cond[1]*comp_1+self.std_cond[2]*comp_1**2 # Standard deviation of Ln(T) as a function of Hs
comp_2_Rosenblatt = stats.lognorm.ppf(stats.norm.cdf(U2),s=sigma_cond,loc=0,scale=np.exp(lamda_cond)) # lognormal inverse
Hs_Return = comp_1
T_Return = comp_2_Rosenblatt
self.Hs_ReturnContours = Hs_Return
self.T_ReturnContours = T_Return
return Hs_Return, T_Return
[docs] def getSamples(self):
'''Currently not implemented in this version'''
raise NotImplementedError
def _saveParams(self, groupObj):
groupObj.create_dataset('n_size', data=self.n_size)
groupObj.create_dataset('bin_1_limit', data=self.bin_1_limit)
groupObj.create_dataset('bin_step', data=self.bin_step)
groupObj.create_dataset('para_dist_1', data=self.para_dist_1)
groupObj.create_dataset('para_dist_2', data=self.para_dist_2)
groupObj.create_dataset('mean_cond', data=self.mean_cond)
groupObj.create_dataset('std_cond', data=self.std_cond)
[docs]class ClaytonCopula(EA):
'''Create a ClaytonCopula EA class for a buoy object. Contours generated
under this class will use a Clayton copula.'''
def __init__(self, buoy, n_size=40., bin_1_limit=1., bin_step=0.25):
'''
Parameters
----------
buoy : NDBCData
ESSC.Buoy Object
n_size: float
minimum bin size used for Copula contour methods
bin_1_limit: float
maximum value of Hs for the first bin
bin_step: float
overlap interval for each bin
'''
self.method = "Clayton Copula"
self.buoy = buoy
self.n_size = n_size
self.bin_1_limit = bin_1_limit
self.bin_step = bin_step
self.Hs_ReturnContours = None
# self.Hs_SampleCA = None
# self.Hs_SampleFSS = None
self.T_ReturnContours = None
# self.T_SampleCA = None
# self.T_SampleFSS = None
# self.Weight_points = None
# self.coeff, self.shift, self.comp1_params, self.sigma_param, self.mu_param = self.__generateParams(size_bin)
self.para_dist_1,self.para_dist_2,self.mean_cond,self.std_cond = self._EA__getCopulaParams(n_size,bin_1_limit,bin_step)
[docs] def getContours(self, time_ss, time_r, nb_steps = 1000):
'''WDRT Extreme Sea State Clayton Copula Contour function.
This function calculates environmental contours of extreme sea states using
a Clayton copula and the inverse first-order reliability
method.
Parameters
----------
time_ss : float
Sea state duration (hours) of measurements in input.
time_r : np.array
Desired return period (years) for calculation of environmental
contour, can be a scalar or a vector.
nb_steps : float
Discretization of the circle in the normal space used for
inverse FORM calculation.
Returns
-------
Hs_Return : np.array
Calculated Hs values along the contour boundary following
return to original input orientation.
T_Return : np.array
Calculated T values along the contour boundary following
return to original input orientation.
nb_steps : float
Discretization of the circle in the normal space
Example
-------
To obtain the contours for a NDBC buoy::
import WDRT.ESSC as ESSC
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create Environtmal Analysis object using above parameters
Clayton46022 = ESSC.ClaytonCopula(buoy46022)
# Declare required parameters
Time_SS = 1. # Sea state duration (hrs)
Time_r = 100 # Return periods (yrs) of interest
nb_steps = 1000 # Enter discretization of the circle in the normal space (optional)
# Clayton copula contour generation example
Hs_Return, T_Return = Clayton46022.getContours(Time_SS, Time_r,nb_steps)
'''
self.time_ss = time_ss
self.time_r = time_r
self.nb_steps = nb_steps
p_f = 1 / (365 * (24 / time_ss) * time_r)
beta = stats.norm.ppf((1 - p_f), loc=0, scale=1) # Reliability
theta = np.linspace(0, 2 * np.pi, num = nb_steps)
# Vary U1, U2 along circle sqrt(U1^2+U2^2)=beta
U1 = beta * np.cos(theta)
U2 = beta * np.sin(theta)
comp_1 = stats.exponweib.ppf(stats.norm.cdf(U1),a=self.para_dist_1[0],c=self.para_dist_1[1],loc=self.para_dist_1[2],scale=self.para_dist_1[3])
tau = stats.kendalltau(self.buoy.T,self.buoy.Hs)[0] # Calculate Kendall's tau
theta_clay = (2.*tau)/(1.-tau)
z2_Clay=((1.-stats.norm.cdf(U1)**(-theta_clay)+stats.norm.cdf(U1)**(-theta_clay)/stats.norm.cdf(U2))**(theta_clay/(1.+theta_clay)))**(-1./theta_clay)
comp_2_Clayton = stats.lognorm.ppf(z2_Clay,s=self.para_dist_2[1],loc=0,scale=np.exp(self.para_dist_2[0])) #lognormalinverse
Hs_Return = comp_1
T_Return = comp_2_Clayton
self.Hs_ReturnContours = Hs_Return
self.T_ReturnContours = T_Return
return Hs_Return, T_Return
[docs] def getSamples(self):
'''Currently not implemented in this version'''
raise NotImplementedError
def _saveParams(self, groupObj):
groupObj.create_dataset('n_size', data=self.n_size)
groupObj.create_dataset('bin_1_limit', data=self.bin_1_limit)
groupObj.create_dataset('bin_step', data=self.bin_step)
groupObj.create_dataset('para_dist_1', data=self.para_dist_1)
groupObj.create_dataset('para_dist_2', data=self.para_dist_2)
groupObj.create_dataset('mean_cond', data=self.mean_cond)
groupObj.create_dataset('std_cond', data=self.std_cond)
[docs]class GumbelCopula(EA):
'''Create a GumbelCopula EA class for a buoy object. Contours generated
under this class will use a Gumbel copula.'''
def __init__(self, buoy, n_size=40., bin_1_limit=1., bin_step=0.25,Ndata = 1000):
'''
Parameters
----------
buoy : NDBCData
ESSC.Buoy Object
n_size: float
minimum bin size used for Copula contour methods
bin_1_limit: float
maximum value of Hs for the first bin
bin_step: float
overlap interval for each bin
Ndata: int
discretization used in the Gumbel copula density estimation,
must be less than the number of contour points used in
getContours
'''
self.method = "Gumbel Copula"
self.buoy = buoy
self.n_size = n_size
self.bin_1_limit = bin_1_limit
self.bin_step = bin_step
self.Hs_ReturnContours = None
# self.Hs_SampleCA = None
# self.Hs_SampleFSS = None
self.T_ReturnContours = None
# self.T_SampleCA = None
# self.T_SampleFSS = None
# self.Weight_points = None
# self.coeff, self.shift, self.comp1_params, self.sigma_param, self.mu_param = self.__generateParams(size_bin)
self.Ndata = Ndata
self.min_limit_2 = 0.
self.max_limit_2 = np.ceil(np.amax(self.buoy.T)*2)
self.para_dist_1,self.para_dist_2,self.mean_cond,self.std_cond = self._EA__getCopulaParams(n_size,bin_1_limit,bin_step)
[docs] def getContours(self, time_ss, time_r, nb_steps = 1000):
'''WDRT Extreme Sea State Gumbel Copula Contour function
This function calculates environmental contours of extreme sea states using
a Gumbel copula and the inverse first-order reliability
method.
Parameters
___________
time_ss : float
Sea state duration (hours) of measurements in input.
time_r : np.array
Desired return period (years) for calculation of environmental
contour, can be a scalar or a vector.
nb_steps : float
Discretization of the circle in the normal space used for
inverse FORM calculation.
Returns
-------
Hs_Return : np.array
Calculated Hs values along the contour boundary following
return to original input orientation.
T_Return : np.array
Calculated T values along the contour boundary following
return to original input orientation.
nb_steps : float
Discretization of the circle in the normal space
Example
-------
To obtain the contours for a NDBC buoy::
import WDRT.ESSC as ESSC
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create Environtmal Analysis object using above parameters
Gumbel46022 = ESSC.GumbelCopula(buoy46022)
# Declare required parameters
Time_SS = 1. # Sea state duration (hrs)
Time_r = 100 # Return periods (yrs) of interest
nb_steps = 1000 # Enter discretization of the circle in the normal space (optional)
# Gumbel copula contour generation example
Hs_Return, T_Return = Gumbel46022.getContours(Time_SS,Time_r,nb_steps)
'''
self.time_ss = time_ss
self.time_r = time_r
self.nb_steps = nb_steps
p_f = 1 / (365 * (24 / time_ss) * time_r)
beta = stats.norm.ppf((1 - p_f), loc=0, scale=1) # Reliability
theta = np.linspace(0, 2 * np.pi, num = nb_steps)
# Vary U1, U2 along circle sqrt(U1^2+U2^2)=beta
U1 = beta * np.cos(theta)
U2 = beta * np.sin(theta)
comp_1 = stats.exponweib.ppf(stats.norm.cdf(U1),a=self.para_dist_1[0],c=self.para_dist_1[1],loc=self.para_dist_1[2],scale=self.para_dist_1[3])
tau = stats.kendalltau(self.buoy.T,self.buoy.Hs)[0] # Calculate Kendall's tau
theta_gum = 1./(1.-tau)
fi_u1=stats.norm.cdf(U1);
fi_u2=stats.norm.cdf(U2);
x2 = np.linspace(self.min_limit_2,self.max_limit_2,self.Ndata)
z2 = stats.lognorm.cdf(x2,s=self.para_dist_2[1],loc=0,scale=np.exp(self.para_dist_2[0]))
comp_2_Gumb = np.zeros(nb_steps)
for k in range(0,int(nb_steps)):
z1 = np.linspace(fi_u1[k],fi_u1[k],self.Ndata)
Z = np.array((z1,z2))
Y = self._EA__gumbelCopula(Z, theta_gum) # Copula density function
Y =np.nan_to_num(Y)
p_x2_x1 = Y*(stats.lognorm.pdf(x2, s = self.para_dist_2[1], loc=0, scale = np.exp(self.para_dist_2[0]))) # pdf 2|1, f(comp_2|comp_1)=c(z1,z2)*f(comp_2)
dum = np.cumsum(p_x2_x1)
cdf = dum/(dum[self.Ndata-1]) # Estimate CDF from PDF
table = np.array((x2, cdf)) # Result of conditional CDF derived based on Gumbel copula
table = table.T
for j in range(self.Ndata):
if fi_u2[k] <= table[0,1]:
comp_2_Gumb[k] = min(table[:,0])
break
elif fi_u2[k] <= table[j,1]:
comp_2_Gumb[k] = (table[j,0]+table[j-1,0])/2
break
else:
comp_2_Gumb[k] = table[:,0].max()
Hs_Return = comp_1
T_Return = comp_2_Gumb
self.Hs_ReturnContours = Hs_Return
self.T_ReturnContours = T_Return
return Hs_Return, T_Return
[docs] def getSamples(self):
'''Currently not implemented in this version.'''
raise NotImplementedError
def _saveParams(self, groupObj):
groupObj.create_dataset('Ndata', data=self.Ndata)
groupObj.create_dataset('min_limit_2', data=self.min_limit_2)
groupObj.create_dataset('max_limit_2', data=self.max_limit_2)
groupObj.create_dataset('n_size', data=self.n_size)
groupObj.create_dataset('bin_1_limit', data=self.bin_1_limit)
groupObj.create_dataset('bin_step', data=self.bin_step)
groupObj.create_dataset('para_dist_1', data=self.para_dist_1)
groupObj.create_dataset('para_dist_2', data=self.para_dist_2)
groupObj.create_dataset('mean_cond', data=self.mean_cond)
groupObj.create_dataset('std_cond', data=self.std_cond)
[docs]class NonParaGaussianCopula(EA):
'''Create a NonParaGaussianCopula EA class for a buoy object. Contours
generated under this class will use a Gaussian copula with non-parametric
marginal distribution fits.'''
def __init__(self, buoy, Ndata = 1000, max_T=None, max_Hs=None):
'''
Parameters
----------
buoy : NDBCData
ESSC.Buoy Object
NData: int
discretization resolution used in KDE construction
max_T:float
Maximum T value for KDE contstruction, must include possible
range of contour. Default value is 2*max(T)
max_Hs:float
Maximum Hs value for KDE contstruction, must include possible
range of contour. Default value is 2*max(Hs)
'''
self.method = "Non-parametric Gaussian Copula"
self.buoy = buoy
self.Ndata = Ndata
self.Hs_ReturnContours = None
# self.Hs_SampleCA = None
# self.Hs_SampleFSS = None
self.T_ReturnContours = None
# self.T_SampleCA = None
# self.T_SampleFSS = None
# self.Weight_points = None
# self.coeff, self.shift, self.comp1_params, self.sigma_param, self.mu_param = self.__generateParams(size_bin)
if max_T == None:
max_T = max(self.buoy.T)*2.
if max_Hs == None:
max_Hs = max(self.buoy.Hs)*2.
self.max_T = max_T
self.max_Hs = max_Hs
self.nonpara_dist_1,self.nonpara_dist_2,self.nonpara_pdf_2 = self._EA__getNonParaCopulaParams(Ndata,max_T,max_Hs)
[docs] def getContours(self, time_ss, time_r, nb_steps = 1000):
'''WDRT Extreme Sea State Gaussian Copula Contour function.
This function calculates environmental contours of extreme sea states
using a Gaussian copula with non-parametric marginal distribution fits
and the inverse first-order reliability method.
Parameters
___________
time_ss : float
Sea state duration (hours) of measurements in input.
time_r : np.array
Desired return period (years) for calculation of environmental
contour, can be a scalar or a vector.
nb_steps : float
Discretization of the circle in the normal space used for
inverse FORM calculation.
Returns
-------
Hs_Return : np.array
Calculated Hs values along the contour boundary following
return to original input orientation.
T_Return : np.array
Calculated T values along the contour boundary following
return to original input orientation.
nb_steps : float
Discretization of the circle in the normal space
Example
-------
To obtain the contours for a NDBC buoy::
import WDRT.ESSC as ESSC
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create Environtmal Analysis object using above parameters
NonParaGauss46022 = ESSC.NonParaGaussianCopula(buoy46022)
# Declare required parameters
Time_SS = 1. # Sea state duration (hrs)
Time_r = 100 # Return periods (yrs) of interest
nb_steps = 1000 # Enter discretization of the circle in the normal space (optional)
# Non-Parametric Gaussian copula contour generation example
Hs_Return, T_Return = NonParaGauss46022.getContours(Time_SS, Time_r,nb_steps)
'''
self.time_ss = time_ss
self.time_r = time_r
self.nb_steps = nb_steps
comp_1 = np.zeros(nb_steps)
comp_2_Gau = np.zeros(nb_steps)
# Inverse FORM
p_f = 1 / (365 * (24 / time_ss) * time_r)
beta = stats.norm.ppf((1 - p_f), loc=0, scale=1) # Reliability
# Normal Space
theta = np.linspace(0, 2 * np.pi, num = nb_steps)
U1 = beta * np.cos(theta)
U2 = beta * np.sin(theta)
# Copula parameters
tau = stats.kendalltau(self.buoy.T,self.buoy.Hs)[0]# Calculate Kendall's tau
rho_gau=np.sin(tau*np.pi/2.);
# Component 1 (Hs)
z1_Hs = stats.norm.cdf(U1)
for k in range(0,nb_steps):
for j in range(0,np.size(self.nonpara_dist_1,0)):
if z1_Hs[k] <= self.nonpara_dist_1[0,1]:
comp_1[k] = min(self.nonpara_dist_1[:,0])
break
elif z1_Hs[k] <= self.nonpara_dist_1[j,1]:
comp_1[k] = (self.nonpara_dist_1[j,0] + self.nonpara_dist_1[j-1,0])/2
break
else:
comp_1[k]= max(self.nonpara_dist_1[:,0])
# Component 2 (T)
z2_Gau=stats.norm.cdf(U2*np.sqrt(1.-rho_gau**2.)+rho_gau*U1);
for k in range(0,nb_steps):
for j in range(0,np.size(self.nonpara_dist_2,0)):
if z2_Gau[k] <= self.nonpara_dist_2[0,1]:
comp_2_Gau[k] = min(self.nonpara_dist_2[:,0])
break
elif z2_Gau[k] <= self.nonpara_dist_2[j,1]:
comp_2_Gau[k] = (self.nonpara_dist_2[j,0] + self.nonpara_dist_2[j-1,0])/2
break
else:
comp_2_Gau[k]= max(self.nonpara_dist_2[:,0])
Hs_Return = comp_1
T_Return = comp_2_Gau
self.Hs_ReturnContours = Hs_Return
self.T_ReturnContours = T_Return
return Hs_Return, T_Return
[docs] def getSamples(self):
'''Currently not implemented in this version.'''
raise NotImplementedError
def _saveParams(self, groupObj):
groupObj.create_dataset('nonpara_dist_1', data=self.nonpara_dist_1)
groupObj.create_dataset('nonpara_dist_2', data=self.nonpara_dist_2)
[docs]class NonParaClaytonCopula(EA):
'''Create a NonParaClaytonCopula EA class for a buoy object. Contours
generated under this class will use a Clayton copula with non-parametric
marginal distribution fits.'''
def __init__(self, buoy, Ndata = 1000, max_T=None, max_Hs=None):
'''
Parameters
----------
buoy : NDBCData
ESSC.Buoy Object
NData: int
discretization resolution used in KDE construction
max_T:float
Maximum T value for KDE contstruction, must include possible
range of contour. Default value is 2*max(T)
max_Hs:float
Maximum Hs value for KDE contstruction, must include possible
range of contour. Default value is 2*max(Hs)
'''
self.method = "Non-parametric Clayton Copula"
self.buoy = buoy
self.Ndata = Ndata
self.Hs_ReturnContours = None
# self.Hs_SampleCA = None
# self.Hs_SampleFSS = None
self.T_ReturnContours = None
# self.T_SampleCA = None
# self.T_SampleFSS = None
# self.Weight_points = None
# self.coeff, self.shift, self.comp1_params, self.sigma_param, self.mu_param = self.__generateParams(size_bin)
if max_T == None:
max_T = max(self.buoy.T)*2.
if max_Hs == None:
max_Hs = max(self.buoy.Hs)*2.
self.max_T = max_T
self.max_Hs = max_Hs
self.nonpara_dist_1,self.nonpara_dist_2,self.nonpara_pdf_2 = self._EA__getNonParaCopulaParams(Ndata,max_T,max_Hs)
[docs] def getContours(self, time_ss, time_r, nb_steps = 1000):
'''WDRT Extreme Sea State non-parameteric Clayton Copula Contour
function. This function calculates environmental contours of extreme
sea states using a Clayton copula with non-parametric marginal
distribution fits and the inverse first-order reliability method.
Parameters
___________
time_ss : float
Sea state duration (hours) of measurements in input.
time_r : np.array
Desired return period (years) for calculation of environmental
contour, can be a scalar or a vector.
nb_steps : float
Discretization of the circle in the normal space used for
inverse FORM calculation.
Returns
-------
Hs_Return : np.array
Calculated Hs values along the contour boundary following
return to original input orientation.
T_Return : np.array
Calculated T values along the contour boundary following
return to original input orientation.
nb_steps : float
Discretization of the circle in the normal space
Example
-------
To obtain the contours for a NDBC buoy::
import WDRT.ESSC as ESSC
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create Environtmal Analysis object using above parameters
NonParaClayton46022 = ESSC.NonParaClaytonCopula(buoy46022)
# Declare required parameters
Time_SS = 1. # Sea state duration (hrs)
Time_r = 100 # Return periods (yrs) of interest
nb_steps = 1000 # Enter discretization of the circle in the normal space (optional)
# Non-Parametric Clayton copula contour generation example
Hs_Return, T_Return = NonParaClayton46022.getContours(Time_SS, Time_r,nb_steps)
'''
self.time_ss = time_ss
self.time_r = time_r
self.nb_steps = nb_steps
comp_1 = np.zeros(nb_steps)
comp_2_Clay = np.zeros(nb_steps)
# Inverse FORM
p_f = 1 / (365 * (24 / time_ss) * time_r)
beta = stats.norm.ppf((1 - p_f), loc=0, scale=1) # Reliability
# Normal Space
theta = np.linspace(0, 2 * np.pi, num = nb_steps)
U1 = beta * np.cos(theta)
U2 = beta * np.sin(theta)
# Copula parameters
tau = stats.kendalltau(self.buoy.T,self.buoy.Hs)[0]# Calculate Kendall's tau
theta_clay = (2.*tau)/(1.-tau);
# Component 1 (Hs)
z1_Hs = stats.norm.cdf(U1)
for k in range(0,nb_steps):
for j in range(0,np.size(self.nonpara_dist_1,0)):
if z1_Hs[k] <= self.nonpara_dist_1[0,1]:
comp_1[k] = min(self.nonpara_dist_1[:,0])
break
elif z1_Hs[k] <= self.nonpara_dist_1[j,1]:
comp_1[k] = (self.nonpara_dist_1[j,0] + self.nonpara_dist_1[j-1,0])/2
break
else:
comp_1[k]= max(self.nonpara_dist_1[:,0])
# Component 2 (T)
z2_Clay=((1.-stats.norm.cdf(U1)**(-theta_clay)+stats.norm.cdf(U1)**(-theta_clay)/stats.norm.cdf(U2))**(theta_clay/(1.+theta_clay)))**(-1./theta_clay)
for k in range(0,nb_steps):
for j in range(0,np.size(self.nonpara_dist_2,0)):
if z2_Clay[k] <= self.nonpara_dist_2[0,1]:
comp_2_Clay[k,0] = min(self.nonpara_dist_2[:,0])
break
elif z2_Clay[k] <= self.nonpara_dist_2[j,1]:
comp_2_Clay[k] = (self.nonpara_dist_2[j,0] + self.nonpara_dist_2[j-1,0])/2
break
else:
comp_2_Clay[k]= max(self.nonpara_dist_2[:,0])
Hs_Return = comp_1
T_Return = comp_2_Clay
self.Hs_ReturnContours = Hs_Return
self.T_ReturnContours = T_Return
return Hs_Return, T_Return
[docs] def getSamples(self):
'''Currently not implemented in this version.'''
raise NotImplementedError
def _saveParams(self, groupObj):
groupObj.create_dataset('nonpara_dist_1', data=self.nonpara_dist_1)
groupObj.create_dataset('nonpara_dist_2', data=self.nonpara_dist_2)
[docs]class NonParaGumbelCopula(EA):
'''Create a NonParaGumbelCopula EA class for a buoy object. Contours
generated under this class will use a Gumbel copula with non-parametric
marginal distribution fits.'''
def __init__(self, buoy, Ndata = 1000, max_T=None, max_Hs=None):
'''
Parameters
----------
buoy : NDBCData
ESSC.Buoy Object
NData: int
discretization resolution used in KDE construction
max_T:float
Maximum T value for KDE contstruction, must include possible
range of contour. Default value is 2*max(T)
max_Hs:float
Maximum Hs value for KDE contstruction, must include possible
range of contour. Default value is 2*max(Hs)
'''
self.method = "Non-parametric Gumbel Copula"
self.buoy = buoy
self.Ndata = Ndata
self.Hs_ReturnContours = None
# self.Hs_SampleCA = None
# self.Hs_SampleFSS = None
self.T_ReturnContours = None
# self.T_SampleCA = None
# self.T_SampleFSS = None
# self.Weight_points = None
# self.coeff, self.shift, self.comp1_params, self.sigma_param, self.mu_param = self.__generateParams(size_bin)
if max_T == None:
max_T = max(self.buoy.T)*2.
if max_Hs == None:
max_Hs = max(self.buoy.Hs)*2.
self.max_T = max_T
self.max_Hs = max_Hs
self.nonpara_dist_1,self.nonpara_dist_2,self.nonpara_pdf_2 = self._EA__getNonParaCopulaParams(Ndata,max_T,max_Hs)
[docs] def getContours(self, time_ss, time_r, nb_steps = 1000):
'''WDRT Extreme Sea State non-parameteric Gumbel Copula Contour
function. This function calculates environmental contours of extreme
sea states using a Gumbel copula with non-parametric marginal
distribution fits and the inverse first-order reliability method.
Parameters
___________
time_ss : float
Sea state duration (hours) of measurements in input.
time_r : np.array
Desired return period (years) for calculation of environmental
contour, can be a scalar or a vector.
nb_steps : float
Discretization of the circle in the normal space used for
inverse FORM calculation.
Returns
-------
Hs_Return : np.array
Calculated Hs values along the contour boundary following
return to original input orientation.
T_Return : np.array
Calculated T values along the contour boundary following
return to original input orientation.
nb_steps : float
Discretization of the circle in the normal space
Example
-------
To obtain the contours for a NDBC buoy::
import WDRT.ESSC as ESSC
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create Environtmal Analysis object using above parameters
NonParaGumbel46022 = ESSC.NonParaGumbelCopula(buoy46022)
# Declare required parameters
Time_SS = 1. # Sea state duration (hrs)
Time_r = 100 # Return periods (yrs) of interest
nb_steps = 1000 # Enter discretization of the circle in the normal space (optional)
# Non-Parametric Gumbel copula contour generation example
Hs_Return, T_Return = NonParaGumbel46022.getContours(Time_SS, Time_r,nb_steps)
'''
self.time_ss = time_ss
self.time_r = time_r
self.nb_steps = nb_steps
comp_1 = np.zeros(nb_steps)
comp_2_Gumb = np.zeros(nb_steps)
# Inverse FORM
p_f = 1 / (365 * (24 / time_ss) * time_r)
beta = stats.norm.ppf((1 - p_f), loc=0, scale=1) # Reliability
# Normal Space
theta = np.linspace(0, 2 * np.pi, num = nb_steps)
U1 = beta * np.cos(theta)
U2 = beta * np.sin(theta)
# Copula parameters
tau = stats.kendalltau(self.buoy.T,self.buoy.Hs)[0]# Calculate Kendall's tau
theta_gum = 1./(1.-tau);
# Component 1 (Hs)
z1_Hs = stats.norm.cdf(U1)
for k in range(0,nb_steps):
for j in range(0,np.size(self.nonpara_dist_1,0)):
if z1_Hs[k] <= self.nonpara_dist_1[0,1]:
comp_1[k] = min(self.nonpara_dist_1[:,0])
break
elif z1_Hs[k] <= self.nonpara_dist_1[j,1]:
comp_1[k] = (self.nonpara_dist_1[j,0] + self.nonpara_dist_1[j-1,0])/2
break
else:
comp_1[k]= max(self.nonpara_dist_1[:,0])
# Component 2 (T)
fi_u1=stats.norm.cdf(U1);
fi_u2=stats.norm.cdf(U2);
for k in range(0,nb_steps):
z1 = np.linspace(fi_u1[k],fi_u1[k],self.Ndata)
Z = np.array((np.transpose(z1),self.nonpara_dist_2[:,1]))
Y = self._EA__gumbelCopula(Z, theta_gum)
Y =np.nan_to_num(Y) # Need to look into this
p_x2_x1 = Y*self.nonpara_pdf_2[:,1]
dum = np.cumsum(p_x2_x1)
cdf = dum/(dum[self.Ndata-1])
table = np.array((self.nonpara_pdf_2[:,0], cdf))
table = table.T
for j in range(self.Ndata):
if fi_u2[k] <= table[0,1]:
comp_2_Gumb[k] = min(table[:,0])
break
elif fi_u2[k] <= table[j,1]:
comp_2_Gumb[k] = (table[j,0]+table[j-1,0])/2
break
else:
comp_2_Gumb[k] = max(table[:,0])
Hs_Return = comp_1
T_Return = comp_2_Gumb
self.Hs_ReturnContours = Hs_Return
self.T_ReturnContours = T_Return
return Hs_Return, T_Return
[docs] def getSamples(self):
'''Currently not implemented in this version.'''
raise NotImplementedError
def _saveParams(self, groupObj):
groupObj.create_dataset('nonpara_dist_1', data=self.nonpara_dist_1)
groupObj.create_dataset('nonpara_dist_2', data=self.nonpara_dist_2)
[docs]class BivariateKDE(EA):
'''Create a BivariateKDE EA class for a buoy object. Contours
generated under this class will use a non-parametric KDE to fit the joint distribution.'''
def __init__(self, buoy, bw, NData = 100, logTransform = False, max_T=None, max_Hs=None):
'''
Parameters
----------
buoy : NDBCData
ESSC.Buoy Object
bw: np.array
Array containing KDE bandwidth for Hs and T
NData: int
Discretization resolution used in KDE construction
logTransform: Boolean
Logical. True if log transformation should be taken prior to
KDE construction. Default value is False.
max_T:float
Maximum T value for KDE contstruction, must include possible
range of contour. Default value is 2*max(T)
max_Hs:float
Maximum Hs value for KDE contstruction, must include possible
range of contour. Default value is 2*max(Hs)
'''
if logTransform:
self.method = "Bivariate KDE, Log Transform"
else:
self.method = "Bivariate KDE"
self.buoy = buoy
if max_T == None:
max_T = max(self.buoy.T)*2.
if max_Hs == None:
max_Hs = max(self.buoy.Hs)*2.
self.max_T = max_T
self.max_Hs = max_Hs
self.Hs_ReturnContours = None
self.T_ReturnContours = None
self.NData = NData
self.bw = bw
self.logTransform = logTransform
[docs] def getContours(self, time_ss, time_r):
'''WDRT Extreme Sea State non-parameteric bivariate KDE Contour
function. This function calculates environmental contours of extreme
sea states using a bivariate KDE to estimate the joint distribution.
The contour is then calculcated directly from the joint distribution.
Parameters
___________
time_ss : float
Sea state duration (hours) of measurements in input.
time_r : np.array
Desired return period (years) for calculation of environmental
contour, can be a scalar or a vector.
Returns
-------
Hs_Return : np.array
Calculated Hs values along the contour boundary following
return to original input orientation.
T_Return : np.array
Calculated T values along the contour boundary following
return to original input orientation.
Example
-------
To obtain the contours for a NDBC buoy::
import WDRT.ESSC as ESSC
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create Environmental Analysis object using above parameters
BivariateKDE46022 = ESSC.BivariateKDE(buoy46022, bw = [0.23,0.19], logTransform = False)
# Declare required parameters
Time_SS = 1. # Sea state duration (hrs)
Time_r = 100 # Return periods (yrs) of interest
# KDE contour generation example
Hs_Return, T_Return = BivariateKDE46022.getContours(Time_SS, Time_r)
'''
p_f = 1 / (365 * (24 / time_ss) * time_r)
if self.logTransform:
# Take log of both variables
logTp = np.log(self.buoy.T)
logHs = np.log(self.buoy.Hs)
ty = [logTp, logHs]
else:
ty = [self.buoy.T, self.buoy.Hs]
# Create grid of points
Ndata = self.NData
min_limit_1 = 0.01
max_limit_1 = self.max_T
min_limit_2 = 0.01
max_limit_2 = self.max_Hs
pts_tp = np.linspace(min_limit_1, max_limit_1, Ndata)
pts_hs = np.linspace(min_limit_2, max_limit_2, Ndata)
pt1,pt2 = np.meshgrid(pts_tp, pts_hs)
pts_tp = pt1.flatten()
pts_hs = pt2.flatten()
# Transform gridded points using log
xi = [pts_tp, pts_hs]
if self.logTransform:
txi = [np.log(pts_tp), np.log(pts_hs)]
else:
txi = xi
m = len(txi[0])
n = len(ty[0])
d = 2
# Create contour
f = np.zeros((1,m))
weight = np.ones((1,n))
for i in range(0,m):
ftemp = np.ones((n,1))
for j in range(0,d):
z = (txi[j][i] - ty[j])/self.bw[j]
fk = stats.norm.pdf(z)
if self.logTransform:
fnew = fk*(1/np.transpose(xi[j][i]))
else:
fnew = fk
fnew = np.reshape(fnew, (n,1))
ftemp = np.multiply(ftemp,fnew)
f[:,i] = np.dot(weight,ftemp)
fhat = f.reshape(100,100)
vals = plt.contour(pt1,pt2,fhat, levels = [p_f])
plt.clf()
self.Hs_ReturnContours = []
self.T_ReturnContours = []
for i,seg in enumerate(vals.allsegs[0]):
self.Hs_ReturnContours.append(seg[:,1])
self.T_ReturnContours.append(seg[:,0])
#
self.Hs_ReturnContours = np.transpose(np.asarray(self.Hs_ReturnContours)[0])
self.T_ReturnContours = np.transpose(np.asarray(self.T_ReturnContours)[0])
# self.vals = vals
# contourVals = np.empty((0,2))
# for seg in vals.allsegs[0]:
# contourVals = np.append(contourVals,seg, axis = 0)
# self.Hs_ReturnContours = contourVals[:,1]
# self.T_ReturnContours = contourVals[:,0]
return self.Hs_ReturnContours, self.T_ReturnContours
[docs]class Buoy(object):
'''
This class creates a buoy object to store buoy data for use in the
environmental assessment functions available in the ESSC module.
Attributes
__________
swdList : list
List that contains numpy arrays of the spectral wave density data,
separated by year.
freqList: list
List that contains numpy arrays that contain the frequency values
for each year
dateList : list
List that contains numpy arrays of the date values for each line of
spectral data, separated by year
Hs : list
Significant wave height.
T : list
Energy period.
dateNum : list
List of datetime objects.
'''
def __init__(self, buoyNum, buoyType):
'''
Parameters
___________
buoyNum : string
device number for desired buoy
buoyType : string
type of buoy device, available options are 'NDBC' or 'CDIP'
savePath : string
relative path where the data read from ndbc.noaa.gov will be stored
'''
self.swdList = []
self.freqList = []
self.dateList = []
self.Hs = []
self.T = []
self.dateNum = []
self.buoyNum = buoyNum
self.buoyType = buoyType.upper()
[docs] def fetchFromWeb(self, savePath = "./Data/",proxy=None):
'''
Calls either __fetchCDIP() or __fetchNDBC() depending on the given
buoy's type and fetches the necessary data from its respective website.
Parameters
----------
saveType: string
If set to to "h5", the data will be saved in a compressed .h5
file
If set to "txt", the data will be stored in a raw .txt file
Otherwise, a file will not be created
NOTE: Only applies
savePath : string
Relative path to place directory with data files.
proxy: dict
Proxy server and port, i.e., {http":"http://proxyserver:port"}
Example
_________
>>> import WDRT.ESSC as ESSC
>>> buoy = ESSC.Buoy('46022','NDBC')
>>> buoy.fetchFromWeb()
'''
if self.buoyType == "NDBC":
self.__fetchNDBC(proxy)
elif self.buoyType == "CDIP":
self.__fetchCDIP(savePath,proxy)
def __fetchNDBC(self, proxy):
'''
Searches ndbc.noaa.gov for the historical spectral wave density
data of a given device and writes the annual files from the website
to a single .txt file, and stores the values in the swdList, freqList,
and dateList member variables.
Parameters
----------
saveType: string
If set to to "h5", the data will be saved in a compressed .h5
file
If set to "txt", the data will be stored in a raw .txt file
Otherwise, a file will not be created
NOTE: Only applies
savePath : string
Relative path to place directory with data files.
'''
maxRecordedDateValues = 4
#preallocates data
numLines = 0
numCols = 0
numDates = 0
dateVals = []
spectralVals = []
#prepares to pull the data from the NDBC website
url = "https://www.ndbc.noaa.gov/station_history.php?station=%s" % (self.buoyNum)
if proxy == None:
ndbcURL = requests.get(url)
else:
ndbcURL = requests.get(url,proxies=proxy)
ndbcURL.raise_for_status()
ndbcHTML = bs4.BeautifulSoup(ndbcURL.text, "lxml")
headers = ndbcHTML.findAll("b", text="Spectral wave density data: ")
#checks for headers in differently formatted webpages
if len(headers) == 0:
raise Exception("Spectral wave density data for buoy #%s not found" % self.buoyNum)
if len(headers) == 2:
headers = headers[1]
else:
headers = headers[0]
links = [a["href"] for a in headers.find_next_siblings("a", href=True)]
#downloads files
for link in links:
dataLink = "https://ndbc.noaa.gov" + link
fileName = dataLink.replace('download_data', 'view_text_file')
data = urllib.request.urlopen(fileName)
print("Reading from:", data.geturl())
#First Line of every file contains the frequency data
frequency = data.readline()
if frequency.split()[4] == b'mm':
numDates = 5
else:
numDates = 4
frequency = np.array(frequency.split()[numDates:], dtype = np.float)
#splits and organizes data into arrays.
for line in data:
currentLine = line.split()
numCols = len(currentLine)
if numCols - numDates != len(frequency):
print("NDBC File is corrupted - Skipping and deleting data")
spectralVals = []
dateVals = []
break
if float(currentLine[numDates+1]) < 999:
numLines += 1
for j in range(maxRecordedDateValues):
dateVals.append(currentLine[j])
for j in range(numCols - numDates):
spectralVals.append(currentLine[j + numDates])
if len(spectralVals) != 0:
dateValues = np.array(dateVals, dtype=np.int)
spectralValues = np.array(spectralVals, dtype=np.float)
dateValues = np.reshape(dateValues, (numLines, maxRecordedDateValues))
spectralValues = np.reshape(spectralValues, (numLines,
(numCols - numDates)))
numLines = 0
numCols = 0
if len(spectralVals) != 0:
del dateVals[:]
del spectralVals[:]
self.swdList.append(spectralValues)
self.freqList.append(frequency)
self.dateList.append(dateValues)
self._prepData()
[docs] def loadFromTxt(self, dirPath = None):
'''Loads NDBC data previously downloaded to a series of text files in the
specified directory.
Parameters
----------
dirPath : string
Relative path to directory containing NDBC text files (created by
NBDCdata.fetchFromWeb). If left blank, the method will search
all directories for the data using the current directory as
the root.
Example
-------
To load data from previously downloaded files
created using fetchFromWeb():
import WDRT.ESSC as ESSC
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.loadFromText()
'''
#preallocates arrays
dateVals = []
spectralVals = []
numLines = 0
maxRecordedDateValues = 4
#finds the text files (if they exist on the machine)
if dirPath is None:
for dirpath, subdirs, files in os.walk('.'):
for dirs in subdirs:
if ("NDBC%s" % self.buoyNum) in dirs:
dirPath = os.path.join(dirpath,dirs)
break
if dirPath is None:
raise IOError("Could not find directory containing NDBC data")
fileList = glob.glob(os.path.join(dirPath,'SWD*.txt'))
if len(fileList) == 0:
raise IOError("No NDBC data files found in " + dirPath)
#reads in the files
for fileName in fileList:
print('Reading from: %s' % (fileName))
f = open(fileName, 'r')
frequency = f.readline().split()
numCols = len(frequency)
if frequency[4] == 'mm':
frequency = np.array(frequency[5:], dtype=np.float)
numTimeVals = 5
else:
frequency = np.array(frequency[4:], dtype=np.float)
numTimeVals = 4
for line in f:
currentLine = line.split()
if float(currentLine[numTimeVals + 1]) < 999:
numLines += 1
for i in range(maxRecordedDateValues):
dateVals.append(currentLine[i])
for i in range(numCols - numTimeVals):
spectralVals.append(currentLine[i + numTimeVals])
dateValues = np.array(dateVals, dtype=np.int)
spectralValues = np.array(spectralVals, dtype=np.double)
dateValues = np.reshape(dateValues, (numLines, maxRecordedDateValues))
spectralValues = np.reshape(
spectralValues, (numLines, (numCols - numTimeVals)))
del dateVals[:]
del spectralVals[:]
numLines = 0
numCols = 0
self.swdList.append(spectralValues)
self.freqList.append(frequency)
self.dateList.append(dateValues)
self._prepData()
[docs] def loadFile(self, dirPath = None):
'''Loads file depending on whether it's NDBC or CDIP.'''
if self.buoyType == "NDBC":
self.loadFromText(dirPath)
if self.buoyType == "CDIP":
self.loadCDIP(dirPath)
[docs] def loadFromH5(self, fileName = None):
"""
Loads NDBC data previously saved in a .h5 file
Parameters
----------
fileName : string
Name of the .h5 file to load data from.
Example
-------
To load data from previously downloaded files:
import WDRT.ESSC as ESSC
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
buoy46022.saveData()
buoy46022.loadFromH5('NDBC46022.h5')
"""
if fileName == None:
fileName = self.buoyType + self.buoyNum + ".h5"
_, file_extension = os.path.splitext(fileName)
if not file_extension:
fileName = fileName + '.h5'
print("Reading from: ", fileName)
try:
f = h5py.File(fileName, 'r')
except IOError:
raise IOError("Could not find file: " + fileName)
self.Hs = np.array(f['buoy_Data/Hs'][:])
self.T = np.array(f['buoy_Data/Te'][:])
self.dateNum = np.array(f['buoy_Data/dateNum'][:])
self.dateList = np.array(f['buoy_Data/dateList'][:])
print("----> SUCCESS")
[docs] def saveAsH5(self, fileName=None):
'''
Saves NDBC buoy data to h5 file after fetchFromWeb() or loadFromText().
This data can later be used to create a buoy object using the
loadFromH5() function.
Parameters
----------
fileName : string
relevent path and filename where the .h5 file will be created
and saved. If no filename, the h5 file will be named
NDBC(buoyNum).h5 in location where code is running.
Example
-------
To save data to h5 file after fetchFromWeb or loadFromText:
import WDRT.ESSC as ESSC
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
buoy46022.saveAsH5()
'''
if (fileName == None):
fileName = 'NDBC' + str(self.buoyNum) + '.h5'
else:
_, file_extension = os.path.splitext(fileName)
if not file_extension:
fileName = fileName + '.h5'
f = h5py.File(fileName, 'w')
self._saveData(f)
f.close()
print("Saved buoy data");
[docs] def saveAsTxt(self, savePath = "./Data/"):
"""
Saves spectral wave density data to a .txt file in the same format as the files
found on NDBC's website.
Parameters
----------
savePath : string
Relative file path where the .txt files will be saved.
Example
-------
To save data to h5 file after fetchFromWeb or loadFromText:
import WDRT.ESSC as ESSC
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
buoy46022.saveAsTxt()
"""
curYear = self.dateList[0][0]
dateIndexDiff = 0
bFile = False #NDBC sometimes splits years into two files, the second one titled "YYYYb"
saveDir = os.path.join(savePath, 'NDBC%s' % (self.buoyNum))
print("Saving in :", saveDir)
if not os.path.exists(saveDir):
os.makedirs(saveDir)
for i in range(len(self.swdList)):
if not bFile:
swdFile = open(os.path.join(saveDir, "SWD-%s-%d.txt" %
(self.buoyNum, curYear)), 'w')
else:
swdFile = open(os.path.join(saveDir, "SWD-%s-%db.txt" %
(self.buoyNum, curYear)), 'w')
bFile = False
freqLine = "YYYY MM DD hh"
for j in range(len(self.freqList[i])):
freqLine += (" " + "%2.4f" % self.freqList[i][j])
freqLine += "\n"
swdFile.write(freqLine)
for j in range(len(self.dateList)):
if (j + dateIndexDiff + 1) > len(self.dateList):
break
newYear = self.dateList[j + dateIndexDiff][0]
if curYear != newYear:
dateIndexDiff += (j)
curYear = newYear
break
if (j+1) > len(self.swdList[i]):
dateIndexDiff += (j)
bFile = True
break
swdLine = ' '.join("%0*d" % (2,dateVal) for dateVal in self.dateList[j + dateIndexDiff]) + " "
swdLine += " ".join("%6s" % val for val in self.swdList[i][j]) + "\n"
swdFile.write(swdLine)
[docs] def createSubsetBuoy(self, trainingSize):
'''Takes a given buoy and creates a subset buoy of a given length in years.
Parameters
----------
trainingSize : int
The size in years of the subset buoy you would like to create
Returns
-------
# subsetBuoy : ESSC.Buoy object
A buoy (with Hs, T, and dateList values) that is a subset of the given buoy
Example
-------
To get a corresponding subset of a buoy with a given number of years:
import WDRT.ESSC as ESSC
# Pull spectral data from NDBC website
buoy46022 = ESSC.Buoy('46022','NDBC')
buoy46022.fetchFromWeb()
# Create a subset of buoy 46022 consisting of the first 10 years
subsetBuoy = buoy46022.createSubsetBuoy(10)
'''
subsetBuoy = copy.deepcopy(self)
sortedIndex = sorted(range(len(self.dateNum)),key=lambda x:self.dateNum[x])
self.dateNum = self.dateNum[sortedIndex]
self.dateList = self.dateList[sortedIndex]
self.Hs = self.Hs[sortedIndex]
self.T = self.T[sortedIndex]
years = [0] * len(self.dateList)
for i in range(len(self.dateList)):
years[i] = self.dateList[i][0]
trainingYear = self.dateList[0][0] + trainingSize
cond = years <= trainingYear
subsetBuoy.Hs = self.Hs[cond]
subsetBuoy.T = self.T[cond]
subsetBuoy.dateList = self.dateList[cond]
return(subsetBuoy)
def _saveData(self, fileObj):
'''Organizes and saves wave height, energy period, and date data.'''
if(self.Hs is not None):
gbd = fileObj.create_group('buoy_Data')
f_Hs = gbd.create_dataset('Hs', data=self.Hs)
f_Hs.attrs['units'] = 'm'
f_Hs.attrs['description'] = 'significant wave height'
f_T = gbd.create_dataset('Te', data=self.T)
f_T.attrs['units'] = 'm'
f_T.attrs['description'] = 'energy period'
f_dateNum = gbd.create_dataset('dateNum', data=self.dateNum)
f_dateNum.attrs['description'] = 'datenum'
f_dateList = gbd.create_dataset('dateList', data=self.dateList)
f_dateList.attrs['description'] = 'date list'
else:
RuntimeError('Buoy object contains no data')
def __fetchCDIP(self,savePath,proxy):
"""
Fetches the Hs and T values of a CDIP site by downloading the respective .nc file from
http://cdip.ucsd.edu/
Parameters
----------
savePath : string
Relative path to place directory with data files.
"""
url = "http://thredds.cdip.ucsd.edu/thredds/fileServer/cdip/archive/" + str(self.buoyNum) + "p1/" + \
str(self.buoyNum) +"p1_historic.nc"
print("Downloading data from: " + url)
filePath = savePath + "/" + str(self.buoyNum) + "-CDIP.nc"
urllib.request.urlretrieve (url, filePath)
self.__processCDIPData(filePath)
[docs] def loadCDIP(self, filePath = None):
"""
Loads the Hs and T values of the given site from the .nc file downloaded from
http://cdip.ucsd.edu/
Parameters
----------
filePath : string
File path to the respective .nc file containing the Hs and T values
"""
if filePath == None:
filePath = "data/" + self.buoyNum + "-CDIP.nc"
self.__processCDIPData(filePath)
def __averageValues(self):
"""
Averages the Hs and T values of the given buoy to get hour time-steps rather than
half hour time-steps
"""
self.Hs = np.mean(self.Hs.reshape(-1,2), axis = 1)
self.T = np.mean(self.T.reshape(-1,2), axis = 1)
def __processCDIPData(self,filePath):
"""
Loads the Hs and T values from the .nc file downloaded from http://cdip.ucsd.edu/
Parameters
----------
filePath : string
File path to the respective .nc file containing the Hs and T values
"""
import netCDF4
try:
data = netCDF4.Dataset(filePath)
except IOError:
raise IOError("Could not find data for CDIP site: " + self.buoyNum)
self.Hs = np.array(data["waveHs"][:], dtype = np.double)
self.T = np.array(data["waveTa"][:], dtype = np.double)
data.close()
#Some CDIP buoys record data every half hour rather than every hour
if len(self.Hs)%2 == 0:
self.__averageValues()
def _prepData(self):
'''Runs _getStats and _getDataNums for full set of data, then removes any
NaNs. This cleans and prepares the data for use. Returns wave height,
energy period, and dates.
'''
n = len(self.swdList)
Hs = []
T = []
dateNum = []
for ii in range(n):
tmp1, tmp2 = _getStats(self.swdList[ii], self.freqList[ii])
Hs.extend(tmp1)
T.extend(tmp2)
dateNum.extend(_getDateNums(self.dateList[ii]))
dateList = [date for year in self.dateList for date in year]
Hs = np.array(Hs, dtype=np.float)
T = np.array(T, dtype=np.float)
dateNum = np.array(dateNum, dtype=np.float)
dateList = np.array(dateList)
# Removing NaN data, assigning T label depending on input (Te or Tp)
Nanrem = np.logical_not(np.isnan(T) | np.isnan(Hs))
# Find NaN data in Hs or T
dateNum = dateNum[Nanrem] # Remove any NaN data from DateNum
Hs = Hs[Nanrem] # Remove any NaN data from Hs
T = T[Nanrem] # Remove any NaN data from T
self.Hs = Hs
self.T = T
self.dateNum = dateNum
self.dateList = dateList
return Hs, T, dateNum, dateList
def _getDateNums(dateArr):
'''datetime objects
Parameters
----------
dateArr : np.array
Array of a specific years date vals from NDBC.fetchFromWeb
Returns
-------
dateNum : np.array
Array of datetime objects.
'''
dateNum = []
for times in dateArr:
if times[0] < 1900:
times[0] = 1900 + times[0]
dateNum.append(date.toordinal(datetime(times[0], times[1],
times[2], times[3])))
return dateNum
def _getStats(swdArr, freqArr):
'''Significant wave height and energy period
Parameters
----------
swdArr : np.array
Numpy array of the spectral wave density data for a specific year
freqArr: np.array
Numpy array that contains the frequency values for a specific year
Returns
-------
Hm0 : list
Significant wave height.
Te : list
Energy period.
'''
#Ignore divide by 0 warnings and resulting NaN warnings
np.seterr(all='ignore')
Hm0 = []
T = []
for line in swdArr:
m_1 = np.trapz(line * freqArr ** (-1), freqArr)
m0 = np.trapz(line, freqArr)
Hm0.append(4.004 * m0 ** 0.5)
T.append(m_1 / m0)
np.seterr(all='warn')
return Hm0, T