#! /usr/bin/env python
"""Extraction of ERA5 sea level pressure (msl) subsets from yearly
netCDF files at the grid points nearest to a set of input coordinates."""

# region // imports
import sys
import os
from datetime import datetime, timedelta
from SDToolBox import outputmessages as outputmessages
from SDToolBox import data_acquisition
from netCDF4 import Dataset
import numpy as np
# endregion

# region // variables
# endregion


class ExtractSeaLevelPressureERA5:
    """Extracts ERA5 sea level pressure time series for nearest-neighbor
    grid points of the coordinates given in the input data."""

    # NOTE: netCDF4 format identifiers are upper case; the original value
    # 'netCDF4' would be rejected by the library if ever used as a format.
    __ds_format = 'NETCDF4'
    # Variable names as they appear inside the ERA5 netCDF files.
    __lon_key = 'longitude'
    __lat_key = 'latitude'
    __time_key = 'time'
    # Parallel tuples of input longitudes / latitudes, filled in __init__.
    __input_lon = None
    __input_lat = None
    # Output dictionary keys, shared with data_acquisition.OutputData.
    __out_lat_key = data_acquisition.OutputData.var_lat_key
    __out_lon_key = data_acquisition.OutputData.var_lon_key
    __out_time_key = data_acquisition.OutputData.var_time_key
    __out_val_key = data_acquisition.OutputData.var_val_key

    def __init__(self, input_data: data_acquisition.InputData):
        """Initialize the sea level pressure extraction.

        Arguments:
            input_data {data_acquisition.InputData} -- Required. Must hold
                at least one (lon, lat) coordinate pair.

        Raises:
            IOError -- When input_data is missing or has no coordinates.
        """
        # Verify input_data is usable before storing anything.
        if not input_data or \
                not input_data.input_coordinates or \
                len(input_data.input_coordinates) < 1:
            raise IOError('No valid input data.')
        input_data.lon_key = self.__lon_key
        input_data.lat_key = self.__lat_key
        self.__input_data = input_data
        # Split the (lon, lat) pairs into two parallel tuples.
        self.__input_lon, self.__input_lat = zip(*input_data.input_coordinates)

    def subset_slp_era5(self, directory_path: str, year_from: int, year_to: int):
        """Extracts a collection of netCDF4 subsets based on the input data
        set given when creating this object.

        We suppose the input is already a point of coordinates (lon, lat),
        since the point is already calculated previously
        (see Slp_Grd_ERA5_extractR.m lines 11-41).

        Arguments:
            directory_path {str} -- Location of the per-year msl files.
            year_from {int} -- First year (inclusive) to extract.
            year_to {int} -- Last year (inclusive) to extract.

        Returns:
            data_acquisition.OutputData -- Accumulated values, times and
                nearest-neighbor coordinates per variable.
        """
        # Maps output variable name -> token used in the file name.
        variable_dict = {'msl': 'msl'}
        output_data = data_acquisition.OutputData(variable_dict.keys())
        cases_dict = output_data.get_data_dict()
        years = self.generate_years_array(year_from, year_to)
        # Nearest-neighbor indices are resolved once from a reference file
        # and reused for every year (the grid is assumed constant).
        nn_lon_idx, nn_lat_idx = self.__get_corrected_lon_lat(
            directory_path, cases_dict)
        for n_variable, variable_name in enumerate(variable_dict):
            case_name_value = variable_dict[variable_name]
            # Ensure a slot for this variable exists before accumulating into
            # it. (The original code had a bare no-op lookup here, which
            # raised KeyError whenever OutputData had not pre-filled the key.)
            cases_dict[self.__out_val_key].setdefault(variable_name, None)
            # Extract for all the years.
            for year_idx, year in enumerate(years):
                base_file_name = 'era5_Global_{}_p_{}.nc'.format(
                    case_name_value, year)
                case_file_path = os.path.join(directory_path, base_file_name)
                # If file does not exist simply go to the next one.
                if not os.path.exists(case_file_path):
                    print('File {} does not exist or could not be found.'.format(
                        case_file_path))
                    continue
                # BUGFIX: the format string was previously passed as the third
                # positional argument, which netCDF4 interprets as 'clobber'.
                with Dataset(case_file_path, 'r', format=self.__ds_format) \
                        as case_dataset:
                    # Find nearest point (we are only selecting a point) and
                    # append this year's values to the running array.
                    cases_dict[self.__out_val_key][variable_name] = \
                        self.__get_variable_subset(
                            cases_dict[self.__out_val_key][variable_name],
                            case_dataset,
                            variable_name,
                            (nn_lon_idx, nn_lat_idx))
                    # Get the time for the variable, derived from the netCDF
                    # 'units' attribute (e.g. 'hours since 1970-01-01 00:00:00.0').
                    if n_variable == 0:
                        reftime = case_dataset[self.__time_key].units.split(' ')
                        # This is an assumption that all the grids have the
                        # same scale in regards of time.
                        # NOTE(review): '%f' requires a fractional-seconds
                        # part in the units string — confirm all ERA5 files
                        # carry one, otherwise strptime raises ValueError.
                        cases_dict[self.__out_time_key].append(
                            [datetime.strptime(
                                reftime[2] + ' ' + reftime[3],
                                '%Y-%m-%d %H:%M:%S.%f')
                             + timedelta(hours=int(ti))
                             for ti in case_dataset[self.__time_key][:]])
        return output_data

    def __get_variable_subset(
            self,
            variable_values: list,
            netcdf_dataset: Dataset,
            variable_name: str,
            nn_idx):
        """Gets the subset of values for the given variable, appended to any
        previously accumulated values.

        Arguments:
            variable_values {list} -- Stored values (or None on first call).
            netcdf_dataset {Dataset} -- Input netCDF dataset.
            variable_name {str} -- Name of the variable.
            nn_idx {tuple} -- (lon index, lat index) nearest-neighbor pair.

        Returns:
            Array -- Array of values, concatenated along the time axis.
        """
        nn_lon_idx, nn_lat_idx = nn_idx
        if variable_values is None:
            # First year: nothing to concatenate with yet.
            return self.get_case_subset(
                netcdf_dataset, variable_name, nn_lon_idx, nn_lat_idx)
        return np.concatenate(
            (variable_values,
             self.get_case_subset(
                 netcdf_dataset, variable_name, nn_lon_idx, nn_lat_idx)),
            axis=0)

    def __get_corrected_lon_lat(
            self,
            directory_path: str,
            cases_dict: dict):
        """Gets the nearest-neighbor grid index and grid value for each of
        the input coordinates, recording the snapped coordinates in
        cases_dict.

        Arguments:
            directory_path {str} -- Parent directory of the ERA5 files.
            cases_dict {dict} -- Output dictionary; the snapped lon/lat
                values are appended under the output lon/lat keys.

        Returns:
            tuple -- (list of lon indices, list of lat indices).
        """
        nn_lon_idx = []
        nn_lat_idx = []
        # NOTE(review): the 1981 file is used only as a grid reference; this
        # assumes it is always present and shares the grid of every year.
        base_file_name = 'era5_Global_msl_p_1981.nc'
        ref_file_path = os.path.join(directory_path, base_file_name)
        # Extract index and value for all input lon, lat.
        # BUGFIX: format was previously passed positionally as 'clobber'.
        with Dataset(ref_file_path, 'r', format=self.__ds_format) \
                as ref_dataset:
            lat_list = ref_dataset.variables[self.__lat_key][:]
            lon_list = ref_dataset.variables[self.__lon_key][:]
            for lon_point in self.__input_lon:
                idx, value = data_acquisition.get_nearest_neighbor(
                    lon_point, lon_list)
                cases_dict[self.__out_lon_key].append(value)
                nn_lon_idx.append(idx)
            for lat_point in self.__input_lat:
                idx, value = data_acquisition.get_nearest_neighbor(
                    lat_point, lat_list)
                cases_dict[self.__out_lat_key].append(value)
                nn_lat_idx.append(idx)
        return nn_lon_idx, nn_lat_idx

    @staticmethod
    def get_case_subset(dataset, variable_name, lon, lat):
        """Return the full time series of variable_name at grid point
        (lat, lon); the netCDF variable is laid out (time, lat, lon)."""
        return dataset[variable_name][:, lat, lon]

    @staticmethod
    def generate_years_array(year_from, year_to):
        """Return the inclusive list of years [year_from, ..., year_to].

        Preserves the original degenerate behavior: when
        year_to < year_from the result is just [year_to].
        """
        return list(range(year_from, year_to)) + [year_to]