#!/usr/bin/env python # -*- coding: utf-8 -*- # # PCR-GLOBWB (PCRaster Global Water Balance) Global Hydrological Model # # Copyright (C) 2016, Ludovicus P. H. (Rens) van Beek, Edwin H. Sutanudjaja, Yoshihide Wada, # Joyce H. C. Bosmans, Niels Drost, Inge E. M. de Graaf, Kor de Jong, Patricia Lopez Lopez, # Stefanie Pessenteiner, Oliver Schmitz, Menno W. Straatsma, Niko Wanders, Dominik Wisser, # and Marc F. P. Bierkens, # Faculty of Geosciences, Utrecht University, Utrecht, The Netherlands # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import os import pcraster as pcr import virtualOS as vos from wflow.wflow_lib import configsection from wflow.wflow_lib import configget class SoilAndTopoParameters(object): def __init__(self, iniItems, landmask, Dir, cloneMap, tmpDir): object.__init__(self) # cloneMap, tmpDir, inputDir based on the configuration/setting given in the ini/configuration file self.cloneMap = cloneMap # iniItems.cloneMap self.tmpDir = tmpDir # iniItems.tmpDir self.inputDir = Dir # iniItems.globalOptions['inputDir'] self.landmask = landmask # How many soil layers (excluding groundwater): self.numberOfLayers = int( configget(iniItems, "landSurfaceOptions", "numberOfUpperSoilLayers", "2") ) def read(self, iniItems, optionDict=None): self.readTopo(iniItems, optionDict) self.readSoil(iniItems, optionDict) def readTopo(self, iniItems, optionDict): # a dictionary/section of options that will be used if optionDict == None: optionDict = iniItems._sections["landSurfaceOptions"] # maps of elevation attributes: topoParams = ["tanslope", "slopeLength", "orographyBeta"] if optionDict["topographyNC"] == str(None): for var in topoParams: input = configget(iniItems, "landSurfaceOptions", str(var), "None") vars(self)[var] = vos.readPCRmapClone( input, self.cloneMap, self.tmpDir, self.inputDir ) if var != "slopeLength": vars(self)[var] = pcr.cover(vars(self)[var], 0.0) else: topoPropertiesNC = vos.getFullPath( optionDict["topographyNC"], self.inputDir ) for var in topoParams: vars(self)[var] = vos.netcdf2PCRobjCloneWithoutTime( topoPropertiesNC, var, cloneMapFileName=self.cloneMap ) if var != "slopeLength": vars(self)[var] = pcr.cover(vars(self)[var], 0.0) # ~ self.tanslope = pcr.max(self.tanslope, 0.00001) # In principle, tanslope can be zero. Zero tanslope will provide zero TCL (no interflow) # covering slopeLength with its maximum value self.slopeLength = pcr.cover(self.slopeLength, pcr.mapmaximum(self.slopeLength)) # maps of relative elevation above flood plains dzRel = [ "dzRel0001", "dzRel0005", "dzRel0010", "dzRel0020", "dzRel0030", "dzRel0040", "dzRel0050", "dzRel0060", "dzRel0070", "dzRel0080", "dzRel0090", "dzRel0100", ] if optionDict["topographyNC"] == str(None): for i in range(0, len(dzRel)): var = dzRel[i] input = optionDict[str(var)] vars(self)[var] = vos.readPCRmapClone( input, self.cloneMap, self.tmpDir, self.inputDir ) vars(self)[var] = pcr.cover(vars(self)[var], 0.0) if i > 0: vars(self)[var] = pcr.max(vars(self)[var], vars(self)[dzRel[i - 1]]) else: for i in range(0, len(dzRel)): var = dzRel[i] vars(self)[var] = vos.netcdf2PCRobjCloneWithoutTime( topoPropertiesNC, var, cloneMapFileName=self.cloneMap ) vars(self)[var] = pcr.cover(vars(self)[var], 0.0) if i > 0: vars(self)[var] = pcr.max(vars(self)[var], vars(self)[dzRel[i - 1]]) def readSoilMapOfFAO(self, iniItems, optionDict=None): # a dictionary/section of options that will be used if optionDict == None: optionDict = iniItems._sections[ "landSurfaceOptions" ] # iniItems.landSurfaceOptions # soil variable names given either in the ini or netCDF file: soilParameters = [ "airEntryValue1", "airEntryValue2", "poreSizeBeta1", "poreSizeBeta2", "resVolWC1", "resVolWC2", "satVolWC1", "satVolWC2", "KSat1", "KSat2", "percolationImp", ] if optionDict["soilPropertiesNC"] == str(None): for var in soilParameters: input = optionDict[str(var)] vars(self)[var] = vos.readPCRmapClone( input, self.cloneMap, self.tmpDir, self.inputDir ) vars(self)[var] = pcr.scalar(vars(self)[var]) if input == "percolationImp": vars(self)[var] = pcr.cover(vars(self)[var], 0.0) # extrapolation # - TODO: Make a general extrapolation option as a function in the virtualOS.py vars(self)[var] = pcr.cover( vars(self)[var], pcr.windowaverage(vars(self)[var], 0.75) ) vars(self)[var] = pcr.cover( vars(self)[var], pcr.windowaverage(vars(self)[var], 1.00) ) vars(self)[var] = pcr.cover( vars(self)[var], pcr.windowaverage(vars(self)[var], 1.00) ) vars(self)[var] = pcr.cover( vars(self)[var], pcr.windowaverage(vars(self)[var], 1.00) ) vars(self)[var] = pcr.cover( vars(self)[var], pcr.windowaverage(vars(self)[var], 1.00) ) vars(self)[var] = pcr.cover( vars(self)[var], pcr.windowaverage(vars(self)[var], 1.00) ) vars(self)[var] = pcr.cover(vars(self)[var], 0.0) else: soilPropertiesNC = vos.getFullPath( optionDict["soilPropertiesNC"], self.inputDir ) for var in soilParameters: vars(self)[var] = vos.netcdf2PCRobjCloneWithoutTime( soilPropertiesNC, var, cloneMapFileName=self.cloneMap ) if var == "percolationImp": vars(self)[var] = pcr.cover(vars(self)[var], 0.0) # extrapolation # - TODO: Make a general extrapolation option as a function in the virtualOS.py vars(self)[var] = pcr.cover( vars(self)[var], pcr.windowaverage(vars(self)[var], 0.75) ) vars(self)[var] = pcr.cover( vars(self)[var], pcr.windowaverage(vars(self)[var], 1.00) ) vars(self)[var] = pcr.cover( vars(self)[var], pcr.windowaverage(vars(self)[var], 1.00) ) vars(self)[var] = pcr.cover( vars(self)[var], pcr.windowaverage(vars(self)[var], 1.00) ) vars(self)[var] = pcr.cover( vars(self)[var], pcr.windowaverage(vars(self)[var], 1.00) ) vars(self)[var] = pcr.cover( vars(self)[var], pcr.windowaverage(vars(self)[var], 1.00) ) vars(self)[var] = pcr.cover(vars(self)[var], 0.01) # make sure that resVolWC1 <= satVolWC1 self.resVolWC1 = pcr.min(self.resVolWC1, self.satVolWC1) self.resVolWC2 = pcr.min(self.resVolWC2, self.satVolWC2) if self.numberOfLayers == 2: self.satVolMoistContUpp = ( self.satVolWC1 ) # saturated volumetric moisture content (m3.m-3) self.satVolMoistContLow = self.satVolWC2 self.resVolMoistContUpp = ( self.resVolWC1 ) # residual volumetric moisture content (m3.m-3) self.resVolMoistContLow = self.resVolWC2 self.airEntryValueUpp = ( self.airEntryValue1 ) # air entry value (m) according to soil water retention curve of Clapp & Hornberger (1978) self.airEntryValueLow = self.airEntryValue2 self.poreSizeBetaUpp = ( self.poreSizeBeta1 ) # pore size distribution parameter according to Clapp & Hornberger (1978) self.poreSizeBetaLow = self.poreSizeBeta2 self.kSatUpp = self.KSat1 # saturated hydraulic conductivity (m.day-1) self.kSatLow = self.KSat2 if self.numberOfLayers == 3: self.satVolMoistContUpp000005 = self.satVolWC1 self.satVolMoistContUpp005030 = self.satVolWC1 self.satVolMoistContLow030150 = self.satVolWC2 self.resVolMoistContUpp000005 = self.resVolWC1 self.resVolMoistContUpp005030 = self.resVolWC1 self.resVolMoistContLow030150 = self.resVolWC2 self.airEntryValueUpp000005 = self.airEntryValue1 self.airEntryValueUpp005030 = self.airEntryValue1 self.airEntryValueLow030150 = self.airEntryValue2 self.poreSizeBetaUpp000005 = self.poreSizeBeta1 self.poreSizeBetaUpp005030 = self.poreSizeBeta1 self.poreSizeBetaLow030150 = self.poreSizeBeta2 self.kSatUpp000005 = self.KSat1 self.kSatUpp005030 = self.KSat1 self.kSatLow030150 = self.KSat2 self.percolationImp = pcr.cover( self.percolationImp, 0.0 ) # fractional area where percolation to groundwater store is impeded (dimensionless) # soil thickness and storage variable names # as given either in the ini or netCDF file: soilStorages = [ "firstStorDepth", "secondStorDepth", "soilWaterStorageCap1", "soilWaterStorageCap2", ] if optionDict["soilPropertiesNC"] == str(None): for var in soilStorages: input = optionDict[str(var)] temp = str(var) + "Inp" vars(self)[temp] = vos.readPCRmapClone( input, self.cloneMap, self.tmpDir, self.inputDir ) # extrapolation # - TODO: Make a general extrapolation option as a function in the virtualOS.py vars(self)[temp] = pcr.cover( vars(self)[temp], pcr.windowaverage(vars(self)[temp], 0.75) ) vars(self)[temp] = pcr.cover( vars(self)[temp], pcr.windowaverage(vars(self)[temp], 1.05) ) vars(self)[temp] = pcr.cover( vars(self)[temp], pcr.windowaverage(vars(self)[temp], 1.05) ) vars(self)[temp] = pcr.cover( vars(self)[temp], pcr.windowaverage(vars(self)[temp], 1.05) ) vars(self)[temp] = pcr.cover( vars(self)[temp], pcr.windowaverage(vars(self)[temp], 1.05) ) vars(self)[temp] = pcr.cover( vars(self)[temp], pcr.windowaverage(vars(self)[temp], 1.05) ) vars(self)[temp] = pcr.cover(vars(self)[temp], 0.0) else: soilPropertiesNC = vos.getFullPath( optionDict["soilPropertiesNC"], self.inputDir ) for var in soilStorages: temp = str(var) + "Inp" vars(self)[temp] = vos.netcdf2PCRobjCloneWithoutTime( soilPropertiesNC, var, cloneMapFileName=self.cloneMap ) # extrapolation # - TODO: Make a general extrapolation option as a function in the virtualOS.py vars(self)[temp] = pcr.cover( vars(self)[temp], pcr.windowaverage(vars(self)[temp], 0.75) ) vars(self)[temp] = pcr.cover( vars(self)[temp], pcr.windowaverage(vars(self)[temp], 1.05) ) vars(self)[temp] = pcr.cover( vars(self)[temp], pcr.windowaverage(vars(self)[temp], 1.05) ) vars(self)[temp] = pcr.cover( vars(self)[temp], pcr.windowaverage(vars(self)[temp], 1.05) ) vars(self)[temp] = pcr.cover( vars(self)[temp], pcr.windowaverage(vars(self)[temp], 1.05) ) vars(self)[temp] = pcr.cover( vars(self)[temp], pcr.windowaverage(vars(self)[temp], 1.05) ) vars(self)[temp] = pcr.cover(vars(self)[temp], 0.0) # layer thickness if self.numberOfLayers == 2: self.thickUpp = (0.30 / 0.30) * self.firstStorDepthInp self.thickLow = (1.20 / 1.20) * self.secondStorDepthInp if self.numberOfLayers == 3: self.thickUpp000005 = (0.05 / 0.30) * self.firstStorDepthInp self.thickUpp005030 = (0.25 / 0.30) * self.firstStorDepthInp self.thickLow030150 = (1.20 / 1.20) * self.secondStorDepthInp # soil storage if self.numberOfLayers == 2: # ~ self.storCapUpp = (0.30/0.30)*self.soilWaterStorageCap1Inp # ~ self.storCapLow = (1.20/1.20)*self.soilWaterStorageCap2Inp # 22 Feb 2014: We can calculate this based on thickness and porosity. self.storCapUpp = self.thickUpp * ( self.satVolMoistContUpp - self.resVolMoistContUpp ) self.storCapLow = self.thickLow * ( self.satVolMoistContLow - self.resVolMoistContLow ) self.rootZoneWaterStorageCap = ( self.storCapUpp + self.storCapLow ) # This is called as WMAX in the original pcrcalc script. if self.numberOfLayers == 3: self.storCapUpp000005 = self.thickUpp000005 * ( self.satVolMoistContUpp000005 - self.resVolMoistContUpp000005 ) self.storCapUpp005030 = self.thickUpp005030 * ( self.satVolMoistContUpp005030 - self.resVolMoistContUpp005030 ) self.storCapLow030150 = self.thickLow030150 * ( self.satVolMoistContLow030150 - self.resVolMoistContLow030150 ) self.rootZoneWaterStorageCap = ( self.storCapUpp000005 + self.storCapUpp005030 + self.storCapLow030150 ) def readSoil(self, iniItems, optionDict=None): # a dictionary/section of options that will be used if optionDict == None: optionDict = iniItems._sections[ "landSurfaceOptions" ] # iniItems.landSurfaceOptions # default values of soil parameters that are constant/uniform for the entire domain: self.clappAddCoeff = pcr.scalar(3.0) # dimensionless self.matricSuctionFC = pcr.scalar(1.0) # unit: m # ~ self.matricSuction50 = pcr.scalar(10./3.) # unit: m self.matricSuction50 = pcr.scalar(3.33) # unit: m self.matricSuctionWP = pcr.scalar(156.0) # unit: m self.maxGWCapRise = pcr.scalar(5.0) # unit: m # # values defined in the ini/configuration file: soilParameterConstants = [ "clappAddCoeff", "matricSuctionFC", "matricSuction50", "matricSuctionWP", "maxGWCapRise", ] for var in soilParameterConstants: if var in configsection(iniItems, "landSurfaceOptions"): input = configget(iniItems, "landSurfaceOptions", str(var), "None") vars(self)[var] = vos.readPCRmapClone( input, self.cloneMap, self.tmpDir, self.inputDir ) # read soil parameter based on the FAO soil map: self.readSoilMapOfFAO(iniItems, optionDict) # assign Campbell's (1974) beta coefficient, as well as degree # of saturation at field capacity and corresponding unsaturated hydraulic conductivity # if self.numberOfLayers == 2: self.campbellBetaUpp = ( self.poreSizeBetaUpp * 2.0 + self.clappAddCoeff ) # Campbell's (1974) coefficient ; Rens's line: BCB = 2*BCH + BCH_ADD self.campbellBetaLow = self.poreSizeBetaLow * 2.0 + self.clappAddCoeff self.effSatAtFieldCapUpp = ( self.matricSuctionFC / self.airEntryValueUpp ) ** ( -1.0 / self.poreSizeBetaUpp ) # saturation degree at field capacity : THEFF_FC = (PSI_FC/PSI_A)**(-1/BCH) self.effSatAtFieldCapUpp = pcr.cover(self.effSatAtFieldCapUpp, 1.0) self.effSatAtFieldCapLow = ( self.matricSuctionFC / self.airEntryValueLow ) ** (-1.0 / self.poreSizeBetaLow) self.effSatAtFieldCapLow = pcr.cover(self.effSatAtFieldCapLow, 1.0) self.kUnsatAtFieldCapUpp = pcr.max( 0., (self.effSatAtFieldCapUpp ** self.campbellBetaUpp) * self.kSatUpp ) # unsaturated conductivity at field capacity: KTHEFF_FC = max(0,THEFF_FC[TYPE]**BCB*KS1) self.kUnsatAtFieldCapLow = pcr.max( 0., (self.effSatAtFieldCapLow ** self.campbellBetaLow) * self.kSatLow ) # if self.numberOfLayers == 3: self.campbellBetaUpp000005 = ( self.poreSizeBetaUpp000005 * 2.0 + self.clappAddCoeff ) self.campbellBetaUpp005030 = ( self.poreSizeBetaUpp005030 * 2.0 + self.clappAddCoeff ) self.campbellBetaLow030150 = ( self.poreSizeBetaLow030150 * 2.0 + self.clappAddCoeff ) self.effSatAtFieldCapUpp000005 = ( self.matricSuctionFC / self.airEntryValueUpp000005 ) ** (-1.0 / self.poreSizeBetaUpp000005) self.effSatAtFieldCapUpp005030 = ( self.matricSuctionFC / self.airEntryValueUpp005030 ) ** (-1.0 / self.poreSizeBetaUpp005030) self.effSatAtFieldCapLow030150 = ( self.matricSuctionFC / self.airEntryValueLow030150 ) ** (-1.0 / self.poreSizeBetaLow030150) self.kUnsatAtFieldCapUpp000005 = pcr.max( 0., (self.effSatAtFieldCapUpp000005 ** self.campbellBetaUpp000005) * self.kSatUpp000005, ) self.kUnsatAtFieldCapUpp005030 = pcr.max( 0., (self.effSatAtFieldCapUpp005030 ** self.campbellBetaUpp005030) * self.kSatUpp005030, ) self.kUnsatAtFieldCapLow030150 = pcr.max( 0., (self.effSatAtFieldCapLow030150 ** self.campbellBetaLow030150) * self.kSatLow030150, ) # calculate degree of saturation at which transpiration is halved (50) # and at wilting point # if self.numberOfLayers == 2: self.effSatAt50Upp = (self.matricSuction50 / self.airEntryValueUpp) ** ( -1.0 / self.poreSizeBetaUpp ) self.effSatAt50Upp = pcr.cover(self.effSatAt50Upp, 1.0) self.effSatAt50Low = (self.matricSuction50 / self.airEntryValueLow) ** ( -1.0 / self.poreSizeBetaLow ) self.effSatAt50Low = pcr.cover(self.effSatAt50Low, 1.0) self.effSatAtWiltPointUpp = pcr.cover( (self.matricSuctionWP / self.airEntryValueUpp) ** (-1.0 / self.poreSizeBetaUpp), 1.0, ) self.effSatAtWiltPointLow = pcr.cover( (self.matricSuctionWP / self.airEntryValueLow) ** (-1.0 / self.poreSizeBetaLow), 1.0, ) if self.numberOfLayers == 3: self.effSatAt50Upp000005 = ( self.matricSuction50 / self.airEntryValueUpp000005 ) ** (-1.0 / self.poreSizeBetaUpp000005) self.effSatAt50Upp005030 = ( self.matricSuction50 / self.airEntryValueUpp005030 ) ** (-1.0 / self.poreSizeBetaUpp005030) self.effSatAt50Low030150 = ( self.matricSuction50 / self.airEntryValueLow030150 ) ** (-1.0 / self.poreSizeBetaLow030150) self.effSatAtWiltPointUpp000005 = ( self.matricSuctionWP / self.airEntryValueUpp000005 ) ** (-1.0 / self.poreSizeBetaUpp000005) self.effSatAtWiltPointUpp005030 = ( self.matricSuctionWP / self.airEntryValueUpp005030 ) ** (-1.0 / self.poreSizeBetaUpp005030) self.effSatAtWiltPointLow030150 = ( self.matricSuctionWP / self.airEntryValueLow030150 ) ** (-1.0 / self.poreSizeBetaLow030150) # calculate interflow parameter (TCL): # if self.numberOfLayers == 2: self.interflowConcTime = (self.kSatLow * self.tanslope * 2.0) / ( self.slopeLength * (1. - self.effSatAtFieldCapLow) * (self.satVolMoistContLow - self.resVolMoistContLow) ) # TCL = Duration*(2*KS2*TANSLOPE)/(LSLOPE*(1-THEFF2_FC)*(THETASAT2-THETARES2)) # if self.numberOfLayers == 3: self.interflowConcTime = (self.kSatLow030150 * self.tanslope * 2.0) / ( self.slopeLength * (1. - self.effSatAtFieldCapLow030150) * (self.satVolMoistContLow030150 - self.resVolMoistContLow030150) ) self.interflowConcTime = pcr.max(0.0, pcr.cover(self.interflowConcTime, 0.0))