# Imports added in r101 (reconstructed from the diff's import hunk).
from sklearn.metrics import explained_variance_score
from sklearn.metrics import mean_squared_error
from scipy.stats import pearsonr
from scipy.stats import ks_2samp


@staticmethod
def robutness_metrics(obs: np.array, sim: np.array) -> Dict[str, float]:
    """Compares observed vs. simulated data in terms of bias, ratio of
    variances, RMSE, explained variance, correlation coefficient and
    validity of the distribution (two-sample Kolmogorov-Smirnov test).

    Arguments:
        obs {np.array} -- Original data from the validation split.
        sim {np.array} -- Regression (simulated) result.

    Returns:
        Dict[str, float] -- Metric name -> result. NOTE: despite the
            annotation, 'Correlation coefficient' and 'Distribution
            validation' are (statistic, p-value) tuples as returned by
            scipy, and 'RMSE' / 'Explained variance' are per-output
            arrays because multioutput='raw_values' is requested;
            callers rely on these shapes, so they are kept as-is.
    """
    # NOTE(review): a constant simulation (sim.var() == 0) yields inf/nan
    # in 'Ratio variances' — presumably acceptable upstream; confirm.
    return {
        'BIAS': StatisticalModel.get_BIAS(obs, sim, 0),
        'RMSE': np.sqrt(mean_squared_error(obs, sim, multioutput='raw_values')),
        # Ratio of *variances*, matching the key and the docstring
        # (r101 divided standard deviations instead).
        'Ratio variances': obs.var() / sim.var(),
        'Explained variance': explained_variance_score(obs, sim, multioutput='raw_values'),
        'Correlation coefficient': pearsonr(obs, sim),
        'Distribution validation': ks_2samp(obs, sim),
    }
@staticmethod
def get_BIAS(
        measured_sample: np.array,
        predicted_sample: np.array,
        ax: int) -> np.array:
    """Calculates the bias between two samples.

    Arguments:
        measured_sample {np.array} -- Measured sample.
        predicted_sample {np.array} -- Predicted sample.
        ax {int} -- Axis along which the sample means are taken.

    Returns:
        np.array -- Mean of the predictions minus mean of the measurements.
    """
    return np.mean(predicted_sample, axis=ax) - np.mean(measured_sample, axis=ax)


@staticmethod
def bias_and_variance_correction(train_model):
    """Standardizes train_model and rescales the projection back to the
    sample's own per-column std / mean.

    (The r101 docstring for this method was elided between diff hunks —
    TODO(review): restore the original wording from revision 101.)
    """
    # NOTE(review): StandardScaler.transform already centers to zero mean
    # and unit std, so the extra (projection - mean)/std re-standardization
    # is a numerical no-op and the result is ~train_model again — confirm
    # the intent against the repository.
    scaler = preprocessing.StandardScaler().fit(train_model)
    projection = scaler.transform(train_model)
    return (projection - projection.mean(axis=0))/(projection.std(axis=0))*train_model.std(axis=0) + train_model.mean(axis=0)


@staticmethod
def multivariate_mlr(X: np.array, Y: np.array):
    """Multivariate multiple linear regression (not implemented yet).

    Arguments:
        X {np.array} -- Reference predictor data.
        Y {np.array} -- Reference predictand data.

    Raises:
        Exception -- If X and Y differ in length, or always afterwards
            (placeholder pending implementation).
    """
    if len(X) != len(Y):
        raise Exception('Argument arrays should have the same size.')

    # Dead code removed: r101 built `alpha_coeff = [Y[i] - X[i] ...]`
    # and then unconditionally raised anyway, discarding the result.
    raise Exception(om.error_function_not_implemented)


@staticmethod
def optimum_regression_model_ess(X: np.array, Y: np.array):
    """Optimum regression model via explained sum of squares.

    Raises:
        Exception -- Always; placeholder pending implementation.
    """
    raise Exception(om.error_function_not_implemented)


@staticmethod
def __get_angular_components(component: np.array) -> Tuple[np.array, np.array]:
    """Gets the angular components (cos / sin) for the given component.

    Arguments:
        component {np.array} -- Angles, presumably in degrees
            (converted with pi/180) — TODO confirm units with callers.

    Returns:
        Tuple[np.array, np.array] -- (cos, sin) of the converted angles.
    """
    pi = math.pi
    # Degrees -> radians, measured from pi/2 (bearing-style rotation).
    temp = pi/2 - component*pi/180
    # Wrap any value below -pi back into range (vectorized form of the
    # r101 index loop).
    temp = np.where(temp < -pi, temp + 2*pi, temp)
    # BUGFIX: r101 called math.cos/math.sin, which raise TypeError on
    # NumPy arrays; np.cos/np.sin handle arrays element-wise.
    return np.cos(temp), np.sin(temp)


@staticmethod
def regression_model_circular(X: np.array, Y: np.array):
    """Circular regression model (not implemented; prototype sketch kept
    below the raise for the eventual implementation).

    Raises:
        Exception -- Always; placeholder pending implementation.
    """
    raise Exception(om.error_function_not_implemented)
    # preinitialize
    # Ycalx, Ycaly = StatisticalModel.__get_angular_components(X)

    # # Regression
    # Xcal

    # # Validation

    # # Linear regression
    # regression = LinearRegression().fit(X, Y)
    # b = regress(Ycal, Xcal)
    # Ymod = [np.ones(len(Xcal))] * b[0]
    # Ymod = [ Ymod[i] + b[i+1]*Xcal[:, i+1] for i in range(0, len(b)-1)]
    # result = Ycal - Ymod
    # return sum(np.square(result)), result