#! /usr/bin/env python from datetime import datetime import os import random import string import SDToolBox.outputmessages as om from netCDF4 import Dataset from sklearn.neighbors import BallTree as BallTree import xarray as xr import numpy as np def get_nearest_neighbor(value, data_array): """ search for nearest decimal degree in an array of decimal degrees and return the index. np.argmin returns the indices of minium value along an axis. so subtract value from all values in data_array, take absolute value and find index of minium. """ index_found = (np.abs(data_array - value)).argmin() value_found = data_array[index_found] return index_found, value_found class InputData: coord_list = [] input_dict = {} date_from = None date_to = None # These parameters are set in the extraction methods min_longitude = None max_longitude = None lon_key = 'lon' lat_key = 'lat' def validate(self, dataset: Dataset): """Validates the data structure based on the given NetCDF dataset Arguments: dataset {Dataset} -- Input Dataset Returns: Boolean -- Whether the validation succeeds or not """ valid_dates = self.validate_input_dates(self.date_from, self.date_to) if not valid_dates: return False valid_coords = self.validate_input_coordinates( dataset) if not valid_coords: return False def validate_input_coordinates(self, dataset: Dataset): """Validates whether the structure coordinates are valid for the given NetCDF dataset. Arguments: dataset {Dataset} -- Input dataset NetCDF4 based. Returns: Bool -- Status of the validation """ # Verify coordinates are valid. if not self.coord_list or len(self.coord_list) == 0: print("At least one coordinate point needs to be provided") return False if not dataset: print("No dataset was provided thus cannot be validated.") return False dataset_keys = list(dataset.variables.keys()) if self.lat_key not in dataset_keys \ or self.lon_key not in dataset_keys: print( "The dataset does not contain all needed variables, " + "latitude: {} ".format(self.lat_key) + "longitude: {}".format(self.lon_key)) return False xarray = xr.DataArray(dataset) lat_list = dataset.variables[self.lat_key][:] lon_list = dataset.variables[self.lon_key][:] # validate grid within points, otherwise create bounding box. if len(self.coord_list) == 1: # Only one point, meaning we need to retrieve its closest. lon, lat = self.coord_list[0] # near_coord = # self.__nearest_neighbors(dataset, self.coord_list[0]) # self.coord_list.clear() lat_approx = get_nearest_neighbor(lat, lat_list) lon_approx = get_nearest_neighbor(lon, lon_list) self.coord_list.append((lon, lat)) return True return False def __nearest_neighbors(self, dataset, coord): # lon = x, lat = y x, y = coord dataset_lon = dataset[self.lon_key][:] dataset_lat = dataset[self.lat_key][:] # lon[lon>180]=lon[lon>180]-360 grid_lon, grid_lat = np.meshgrid(dataset_lon, dataset_lat) kd = BallTree( np.column_stack( (grid_lon.ravel(), grid_lat.ravel())), leaf_size=10) _, indx = kd.query(np.column_stack((x, y))) pos_lon = np.where(dataset_lon == grid_lon.ravel()[indx].squeeze())[0] pos_lat = np.where(dataset_lat == grid_lat.ravel()[indx].squeeze())[0] return (pos_lon, pos_lat) @staticmethod def validate_input_dates(date_from: datetime, date_to: datetime): """Validates the given dates as a range. Arguments: date_from {datetime} -- Start input data in time. date_to {datetime} -- End of input data in time. Returns: Boolean -- Dates validation. """ # Verify date is valid. if not date_from or not date_to: print("Both date from and date to are required.") return False if date_from > date_to: print("Input date from needs cannot be later than the date to.") return False return True class OutputData: dim_station_key = 'station' dim_time_key = 'time' var_station_key = 'station' var_time_key = 'time' var_lat_key = 'lat' var_lon_key = 'lon' var_val_key = 'variables' var_proj_key = 'projection' epsg_code = 'EPSG:4326' __output_netcdf_file = None __output_xarray = None __data_dict = {} def __init__(self, var_list: list): """Creates a proxy dictionary with a dictionary of values from the given list. Arguments: var_list {list} -- List of variable names. Returns: dict -- Dictionary of formated input. """ if not var_list: var_list = [] print(om.error_missing_list_of_variables) values_dict = {} for var in var_list: # Set the values to None so we can # assign them later. values_dict[var] = None self.__data_dict = { OutputData.var_lat_key: [], OutputData.var_lon_key: [], OutputData.var_time_key: [], OutputData.var_val_key: values_dict } def get_data_dict(self): return self.__data_dict def get_xarray(self): return self.__output_xarray def get_netcdf_filepath(self): return self.__output_netcdf_file def generate_output( self, dir_path: str = None, base_name: str = None, dataset_code: str = None): """Generates an unified netcdf4 file and its xarray equivalent from the self.__data_dict. Keyword Arguments: dir_path {str} -- Target file path. (default: {None}) base_name {str} -- Base name for the output file. (default: {None}) dataset_code {str} -- Code of the dataset. (default: {None}) Returns: xarray {xarray.Dataset} -- Equivalent xarray of the dict. """ if not self.__data_dict or \ not self.__data_dict[self.var_lat_key] or \ not self.__data_dict[self.var_lon_key] or \ not self.__data_dict[self.var_time_key] or \ not self.__data_dict[self.var_val_key]: raise IOError(om.error_not_initialized_data_dict) if not dir_path or \ not base_name or \ not dataset_code: # Get current directory dir_path = os.getcwd() # temp_generated_netcdf base_name = 'temp_generated_{}'.format(self.__get_random_str(4)) dataset_code = 'CF' netcdf_filepath = self.generate_netcdf( dir_path=dir_path, base_name=base_name, dataset_code=dataset_code ) self.__output_netcdf_file = netcdf_filepath return self.generate_xarray_from_netcdf(netcdf_filepath) def generate_xarray_from_netcdf(self, file_path: str): """Generates an xarray and sets a new dataset Arguments: file_path {str} -- path to the new netcdf. Returns: xarray -- Xarray dataset. """ self.__output_xarray = xr.open_dataset(file_path) self.__output_xarray.close() # TODO: # we should probably update the data dictionary # so basically an importer. return self.__output_xarray def generate_netcdf( self, dir_path: str, base_name: str, dataset_code: str): """Generates an unified netcdf4 file from the self.__data_dict. Arguments: file_path {str} -- Target file path. base_name {str} -- Base name for the output file. dataset_code {str} -- Code of the dataset Returns: nc_filename {str} -- Filepath to the generated dataset. """ file_name = '{}_{}.nc'.format( base_name, dataset_code) nc_filename = os.path.join(dir_path, file_name) print('Writing {}'.format(nc_filename)) self.__cleanup_data_dict() with Dataset(nc_filename, mode='w', format="NETCDF4") as netcdf: self.__set_dataset_structure(netcdf) self.__set_wave_variables(netcdf, dataset_code) self.__set_wave_data(netcdf, dataset_code) self.__set_global_data(netcdf, dataset_code) self.__output_netcdf_file = nc_filename return nc_filename def __cleanup_data_dict(self): """Removes variable entries with no values. """ remove_keys = [ key for key in self.__data_dict[self.var_val_key].keys() if self.__data_dict[self.var_val_key][key] is None ] for key in remove_keys: del self.__data_dict[self.var_val_key][key] def __get_random_str(self, length: int = 10): """Generate a random string of fixed length """ letters = string.ascii_lowercase return ''.join(random.choice(letters) for i in range(length)) def __get_station_number(self): lon_keys = self.__data_dict[self.var_lon_key] lat_keys = self.__data_dict[self.var_lat_key] return len(lon_keys) * len(lat_keys) def __get_time_samples(self): time_array = self.__data_dict[self.var_time_key] if not time_array: return 0 return len(time_array) def __get_time_values(self): return self.__data_dict[self.var_time_key] def __set_global_data( self, netcdf: Dataset, dataset_code: str): """Sets the global data for our new output dataset. see: http://www.unidata.ucar.edu/software/thredds/current/ netcdf-java/formats/DataDiscoveryAttConvention.html Arguments: netcdf {Dataset} -- Output dataset output_format {str} -- Format for the netcdf file. dataset_code {str} -- Input dataset format. """ netcdf.featureType = 'timeSeries' netcdf.Conventions = 'CF-1.4' netcdf.standard_name_vocabulary = 'CF Standard Name Table vNN' netcdf.title = '{}'.format(dataset_code) + \ ' data North Sea Dutch coast' netcdf.summary = '' + \ 'North Sea Water Level data in coastal gauges for the Dutch coast' netcdf.period = '{}'.format(dataset_code) netcdf.keywords = '' + \ 'water level, storm surge, astronomical tide, mean sea level' netcdf.institution = 'Deltares' netcdf.publisher_name = 'Deltares' netcdf.publisher_url = 'https://www.deltares.nl' netcdf.publisher_email = 'jose.antolinez@deltares.nl' # Bounds (spatial, temporal) netcdf.lat_bounds = [ min(self.__data_dict[self.var_lat_key]), max(self.__data_dict[self.var_lat_key])] netcdf.lon_bounds = [ min(self.__data_dict[self.var_lat_key]), max(self.__data_dict[self.var_lat_key])] netcdf.time_bounds = [ self.__get_time_values()[0].strftime('%d/%m/%Y %H:%M:%S'), self.__get_time_values()[-1].strftime('%d/%m/%Y %H:%M:%S')] netcdf.geospatial_lat_units = 'degrees_north' netcdf.geospatial_lon_units = 'degrees_east' netcdf.time_coverage_resolution = 'hourly' netcdf.date_created = datetime.strftime( datetime.utcnow(), format='%Y-%m-%dT%H:%MZ') netcdf.date_modified = datetime.strftime( datetime.utcnow(), format='%Y-%m-%dT%H:%MZ') netcdf.date_issued = datetime.strftime( datetime.utcnow(), format='%Y-%m-%dT%H:%MZ') netcdf.cdm_data_type = 'gridded' def __set_dataset_structure( self, netcdf: Dataset): """Sets the default dimensions for our standarize outpud netCDF4 file. Arguments: netcdf {Dataset} -- Extracted dataset. """ self.__set_time_and_stations(netcdf) self.__set_geo_data(netcdf) def __set_time_and_stations(self, netcdf: Dataset): """Creates the needed header defined by the dataset time and station dimension and variables. Arguments: netcdf {Dataset} -- Output netCDF4. Returns: netCDF4.Dataset -- Output netCDF4 dataset. """ # Create dimensions netcdf.createDimension( self.dim_station_key, self.__get_station_number()) netcdf.createDimension( self.dim_time_key, self.__get_time_samples()) # Stations and Time station_variable = netcdf.createVariable( self.var_station_key, 'u2', (self.dim_station_key,), zlib=True, complevel=5, shuffle=True) station_variable.long_name = 'station name' station_variable.cf_role = 'timeseries_id' # TODO: not able to set correctly the dimensions. time_variable = netcdf.createVariable( self.var_time_key, 'u4', (self.dim_time_key,), zlib=True, complevel=5, shuffle=True) time_variable.long_name = 'time of simulation' time_variable.standard_name = 'time' # Start of epoch time_variable.units = '' + \ 'seconds since 1970-01-01 00:00:00' time_variable.calendar = 'julian' time_variable.axis = 'T' # As offset, # use the minimal occuring value in REAL UNITS, i.e. seconds. time_delta_from_origin = [ time - datetime(1970, 1, 1, 0, 0, 0) for time in self.__get_time_values()] netcdf.variables[self.var_time_key].add_offset = \ min(time_delta_from_origin).total_seconds() # The data is given per hour, therefore: 3600 scale-factor # is sufficient. However: # there are more than 65536 hours of data, # so it should still be stored as an u4 # resolution of one second time_variable.scale_factor = 1 # NO scaling when scale_factor is already applied time_variable[:] = list(map( lambda t: t.total_seconds(), time_delta_from_origin)) return netcdf def __set_geo_data(self, netcdf: Dataset): """Sets the Geo variables needed in the Dataset. Arguments: netcdf {Dataset} -- Output netCDF4 dataset. Returns: netCDF4.Dataset -- Output netCDF4 dataset. """ self.__set_geo_variable( netcdf, self.var_lat_key, 'station latitude', 'latitude', 'degrees_north', 'Y', self.__data_dict[self.var_lat_key] ) self.__set_geo_variable( netcdf, self.var_lon_key, 'station longitude', 'longitude', 'degrees_east', 'X', self.__data_dict[self.var_lon_key] ) # Set projection details. netcdf.createVariable( self.var_proj_key, '|S1', (), zlib=True, complevel=5, shuffle=True) netcdf.variables[self.var_proj_key].EPSG_code = 'EPSG:4326' return netcdf def __set_wave_data( self, netcdf: Dataset, dataset_code: str): """ Sets the variables and values for the waves. Arguments: netcdf {Dataset} -- Output dataset. dataset_code {str} -- Data type code. dict_value {dict} -- Dictionary of input variables. """ # Add data. variables = self.__data_dict[self.var_val_key] for var_name in variables.keys(): if var_name.lower() not in map(str.lower, netcdf.variables.keys()): print( 'parameter {} '.format(var_name) + 'not present in output netCDF') # to next parameter continue # Assign the data from the frame towards the netCDF-parameter print('writing variable {}'.format(var_name)) netcdf.variables[var_name.upper()][:] = variables[var_name] def __set_wave_variables(self, netcdf: Dataset, dataset_code: str): """Sets all sort of variables for the wave model. Arguments: netcdf {Dataset} -- Dataset where to store the variables. dataset_code {str} -- Auxiliar string with the input type. """ self.__set_model_variable( netcdf, 'SWH', 'significant wave height', 'sea_surface_wave_significant_height', 'significant wave height of combined wind waves and swell', 'lat lon', 'm', 0.01, 0 ) self.__set_model_variable( netcdf, 'PP1D', 'peak wave period of 1D spectr', 'sea_surface_wave_period_at_variance_spectral_density_maximum', 'peak wave period', 'lat lon', 's', 0.01, 0 ) self.__set_model_variable( netcdf, 'MP1', 'mean wave period based on first moment', 'sea_surface_wave_mean_period_from_variance_' + 'spectral_density_first_frequency_moment', 'mean wave period', 'lat lon', 's', 0.01, 0 ) self.__set_model_variable( netcdf, 'MWD', 'mean wave direction', 'sea_surface_wave_from_direction', 'mean wave direction', 'lat lon', 'degree', 0.01, 0 ) self.__set_model_variable( netcdf, 'WDW', 'wave spectral directional width', 'wave_spectral_directional_width', 'wave spectral directional width', 'lat lon', 'dimensionless', 0.01, 0 ) self.__set_model_variable( netcdf, 'WL', 'total water level', 'total_water_level', 'total water level', 'lat lon', 'm', 0.01, 0 ) self.__set_model_variable( netcdf, 'AT', 'astronomical tide', 'astronomical_tide', 'astronomical tide', 'lat lon', 'm', 0.01, 0 ) self.__set_model_variable( netcdf, 'SS', 'non tidal residual', 'non_tidal_residual', 'non tidal residual', 'lat lon', 'm', 0.01, 0 ) if dataset_code in['RCP45', 'RCP85']: self.__set_model_variable( netcdf, 'SLR', 'mean sea level', 'mean_sea_level', 'mean sea level', 'lat lon', 'm', 0.01, 0 ) def __set_model_variable( self, netcdf: Dataset, var_name: str, description: str, standard_name: str, long_name: str, coordinates: str, units: str, scale_factor: float, offset: float): """Creates a new variable in the given netCDF dataset. Arguments: netcdf {Dataset} -- Dataset for new variable. var_name {str} -- Variable name. description {str} -- Description of variable. standard_name {str} -- Standard name for variable. long_name {str} -- Long name for variable. coordinates {str} -- Coordinates name for variable. units {str} -- Units name for variable. scale_factor {float} -- Scale factor. offset {float} -- Offset for the given variable. Returns: Dataset.Variable -- New created variable. """ keys_lower = map(str.lower, self.__data_dict[self.var_val_key].keys()) if var_name.lower() not in keys_lower: return model_var = netcdf.createVariable( var_name, 'i2', (self.dim_time_key, self.dim_station_key), zlib=True, complevel=5, shuffle=True) model_var.coordinates = coordinates model_var.description = description model_var.standard_name = standard_name model_var.units = units model_var.long_name = long_name model_var.scale_factor = scale_factor model_var.add_offset = offset return model_var def __set_geo_variable( self, netcdf: Dataset, variable_name: str, long_name: str, standard_name: str, units: str, axis: str, values: list): """Creates a given geo variable with the standard parameters. Arguments: netcdf {Dataset} -- Output dataset. variable_name {str} -- Name to set for the new variable. long_name {str} -- Long name in NetCDF4. standard_name {str} -- Standard name in NetCDF4. units {str} -- Units for the given variable. axis {str} -- Axis for the given variable. values {list} -- Array of values for the given variable. Returns: Dataset.Variable -- New created geo variable. """ geo_variable = netcdf.createVariable( variable_name, 'f4', (self.dim_station_key,), zlib=True, complevel=5, shuffle=True) geo_variable.long_name = long_name geo_variable.standard_name = standard_name geo_variable.units = units geo_variable.axis = axis geo_variable[:] = values return geo_variable