Index: trunk/SDToolBox/extract_data_EARTH.py
===================================================================
diff -u
--- trunk/SDToolBox/extract_data_EARTH.py	(revision 0)
+++ trunk/SDToolBox/extract_data_EARTH.py	(revision 55)
@@ -0,0 +1,237 @@
+#! /usr/bin/env python
+"""
+Extraction of EC-Earth (RCP4.5 MSLP) netCDF data for the SDToolBox.
+"""
+
+# region // imports
+import sys
+import os
+from typing import Tuple
+from datetime import datetime, timedelta
+from SDToolBox import outputmessages as outputmessages
+from SDToolBox import data_acquisition
+from SDToolBox.extract_data import ExtractData
+from netCDF4 import Dataset
+import numpy as np
+
+# endregion
+
+# region // variables
+
+# endregion
+
+
+class ExtractDataEARTH(ExtractData):
+
+    __lon_key = 'lon'
+    __lat_key = 'lat'
+
+    _variable_dict = {
+        'var151': 'var151'
+    }
+
+    def subset_sea_level_pressure(self, directory_path: str):
+        """Extracts a collection of netCDF4 subsets based on the input data
+        set provided when creating this object.
+
+        Arguments:
+            directory_path {str} -- Location of all the variable directories.
+
+        Returns:
+            OutputData -- Collection of netCDF4 subsets per variable.
+        """
+        month = 1
+
+        filtered_dict, output_data, cases_dict = \
+            self.__get_initial_extraction_data()
+        nn_idx = None
+
+        for n_variable, variable_name in enumerate(filtered_dict):
+            case_name_value = filtered_dict[variable_name]
+            for yearidx, year in enumerate(self._input_years):
+                if month < 10:
+                    base_file_name = self._input_EARTH_scenario + '_\\' + 'EC-Earth_RCP4.5_MSLP_' + str(year) + '0' + str(month) + '.nc'
+                else:
+                    base_file_name = self._input_EARTH_scenario + '_\\' + 'EC-Earth_RCP4.5_MSLP_' + str(year) + str(month) + '.nc'
+                # If the file does not exist simply go to the next one.
+                # case_dir = os.path.join(directory_path, case_name_value)
+                case_file_path = os.path.join(directory_path, base_file_name)
+                nn_idx = self.__get_case_subset_from_netcdf(
+                    case_file_path,
+                    cases_dict,
+                    nn_idx,
+                    variable_name,
+                    n_variable
+                )
+                month = month + 1
+
+        return output_data
+
+    def __get_filtered_dict(self):
+        """Filters the defined dictionary down to the
+        values provided by the user as input_variables.
+
+        Returns:
+            dict -- Dictionary of type str: str.
+        """
+        return {
+            k: v
+            for k, v in self._variable_dict.items()
+            if k in self._input_variables}
+
+    def __get_initial_extraction_data(self):
+        """Gets the basic elements for extracting EC-Earth data.
+
+        Returns:
+            Tuple(dict, OutputData, dict) --
+                Tuple of values needed for extracting data.
+        """
+        filtered_dict = self.__get_filtered_dict()
+
+        output_data = data_acquisition.OutputData(
+            self._input_variables
+        )
+        cases_dict = output_data.get_data_dict()
+
+        # Correct input longitudes above 180 degrees (see __check_for_longitude).
+        self.__input_lon = [
+            self.__check_for_longitude(lon)
+            for lon in self._input_lon]
+
+        return filtered_dict, output_data, cases_dict
+
+    def __get_case_subset_from_netcdf(
+            self,
+            case_file_path: str,
+            cases_dict: dict,
+            nn_idx: Tuple[int, int],
+            variable_name: str,
+            n_variable: int):
+        """Gets all the values from a netCDF for the given variable
+        and delimited nearest neighbors.
+
+        Arguments:
+            case_file_path {str} -- Path to the netCDF file.
+            cases_dict {dict} -- Output values.
+            nn_idx {Tuple[int, int]} -- Nearest neighbors lon/lat.
+            variable_name {str} -- Name of the variable to extract.
+            n_variable {int} -- Index of the variable to search.
+
+        Returns:
+            Tuple[int, int] -- Nearest neighbors lon/lat.
+        """
+
+        # If the file does not exist simply go to the next one.
+        if not os.path.exists(case_file_path):
+            print(
+                'File {} '.format(case_file_path) +
+                'does not exist or could not be found.')
+            return
+
+        if not nn_idx:
+            nn_idx = self.__get_corrected_lon_lat(
+                case_file_path, cases_dict
+            )
+
+        # Lazy loading of the dataset.
+        with Dataset(case_file_path, 'r', self._ds_format) \
+                as case_dataset:
+            cases_dict[self._out_val_key][variable_name] = \
+                self.__get_variable_subset(
+                    cases_dict[self._out_val_key][variable_name],
+                    case_dataset,
+                    variable_name,
+                    nn_idx
+                )
+            # Get the time for the variable.
+            if n_variable == 0:
+                # Get the reference time automatically from the
+                # units attribute, just in case.
+                reftime = \
+                    case_dataset[self._out_time_key].units.split(' ')
+                # This assumes that all the grids have
+                # the same scale with regard to time.
+                cases_dict[self._out_time_key].extend(
+                    [datetime.strptime(
+                        reftime[2]+' '+reftime[3],
+                        '%Y-%m-%d %H:%M:%S') +
+                        timedelta(hours=int(ti))
+                        for ti in case_dataset[self._time_key][:]]
+                )
+        return nn_idx
+
+    def __get_variable_subset(
+            self,
+            variable_values: list,
+            netcdf_dataset: Dataset,
+            variable_name: str,
+            nn_idx):
+        """Gets the subset of values for the given variable.
+
+        Arguments:
+            variable_values {list} -- Stored values.
+            netcdf_dataset {Dataset} -- Input netCDF dataset.
+            variable_name {str} -- Name of the variable.
+            nn_idx {tuple} -- Tuple of lon/lat nearest-neighbor indices.
+
+        Returns:
+            Array -- Array of values.
+        """
+        nn_lon_idx, nn_lat_idx = nn_idx
+        if variable_values is None:
+            return self.__get_case_subset(
+                netcdf_dataset,
+                variable_name,
+                nn_lon_idx,
+                nn_lat_idx)
+        return np.concatenate(
+            (variable_values,
+                self.__get_case_subset(
+                    netcdf_dataset,
+                    variable_name,
+                    nn_lon_idx,
+                    nn_lat_idx)),
+            axis=0)
+
+    def __get_corrected_lon_lat(
+            self, ref_file_path: str, cases_dict: dict):
+        """Gets the corrected index and value for the given input coordinates.
+
+        Arguments:
+            ref_file_path {str} -- Path to the reference netCDF file.
+            cases_dict {dict} -- Dictionary with all values that need format.
+
+        Returns:
+            Tuple[list, list] -- Nearest-neighbor lon and lat indices.
+        """
+        nn_lon_idx = []
+        nn_lat_idx = []
+
+        # Extract index and value for all input lat, lon.
+        with Dataset(ref_file_path, 'r', self._ds_format) \
+                as ref_dataset:
+            lat_list = ref_dataset.variables[self.__lat_key][:]
+            lon_list = ref_dataset.variables[self.__lon_key][:]
+            for lon_point in self._input_lon:
+                idx, value = data_acquisition.get_nearest_neighbor(
+                    lon_point,
+                    lon_list)
+                cases_dict[self._out_lon_key].append(value)
+                nn_lon_idx.append(idx)
+            for lat_point in self._input_lat:
+                idx, value = data_acquisition.get_nearest_neighbor(
+                    lat_point,
+                    lat_list)
+                cases_dict[self._out_lat_key].append(value)
+                nn_lat_idx.append(idx)
+        return nn_lon_idx, nn_lat_idx
+
+    @staticmethod
+    def __get_case_subset(dataset, variable_name, lon, lat):
+        return dataset[variable_name][:, lat, lon]
+
+    @staticmethod
+    def __check_for_longitude(longitude):
+        if longitude > 180:
+            return longitude-180
+        return longitude