Index: DamClients/DamPythonInterface/trunk/src/tests/test_dampythonframework.py =================================================================== diff -u -r3495 -r3529 --- DamClients/DamPythonInterface/trunk/src/tests/test_dampythonframework.py (.../test_dampythonframework.py) (revision 3495) +++ DamClients/DamPythonInterface/trunk/src/tests/test_dampythonframework.py (.../test_dampythonframework.py) (revision 3529) @@ -19,23 +19,377 @@ # Stichting Deltares and remain full property of Stichting Deltares at all times. # All rights reserved. - +from trunk.src.dampythoninterface.location import Location +from trunk.src.dampythoninterface.soil import Soil +from trunk.src.dampythoninterface.surface_line import Point from dampythoninterface import __version__ -import dampythoninterface +import dampythoninterface as dpi +from .utils import TestUtils +import pandas as pd +from pathlib import Path import pytest -@pytest.mark.acceptance -def test_version(): - assert __version__ == "0.1.0" +class TestDamPythonInterface: + @pytest.mark.acceptance + def test_version(self): + assert __version__ == "0.1.0" + @pytest.mark.unittest + def test_imports(self): + assert dpi.surface_line + assert dpi.segment + assert dpi.soil + assert dpi.stability_parameters + assert dpi.soilprofile1D + assert dpi.input -@pytest.mark.unittest -def test_imports(): - assert dampythoninterface.surface_line - assert dampythoninterface.segment - assert dampythoninterface.soil - assert dampythoninterface.stability_parameters - assert dampythoninterface.soilprofile1D - assert dampythoninterface.input + +class TestTutorial: + def get_failure_mechanism(self, soil_profile: str): + import dampythoninterface as dpi + + output_dictionary = { + "All": dpi.SegmentFailureMechanismTypeClass.All, + "Stability": dpi.SegmentFailureMechanismTypeClass.Stability, + "Piping": dpi.SegmentFailureMechanismTypeClass.Piping, + "Liquefaction": dpi.SegmentFailureMechanismTypeClass.Liquefaction, + } + return output_dictionary[soil_profile] + + def read_segment_csv_file(self, file_name: str): + # read csv file using pandas + segment_csv = pd.read_csv(file_name, delimiter=";") + # initialize segment list that will be used for the dam input + segments_list = [] + # Loop through all unique segment names of the csv + for segment_id in segment_csv["segment_id"].unique(): + # loop through all different probabilities and create a list of the SoilGeometryProbabilityElement + list_of_probability_elements = [] + for csv_row in segment_csv[segment_csv["segment_id"] == segment_id].index: + list_of_probability_elements.append( + dpi.SoilGeometryProbabilityElement( + **{ + "SoilProfileName": segment_csv["soilprofile_id"][csv_row], + "SoilProfileType": dpi.SoilProfileTypeClass.ProfileType1D, + "Probability": segment_csv["probability"][csv_row], + "SegmentFailureMechanismType": self.get_failure_mechanism( + segment_csv["calculation_type"][csv_row] + ), + } + ) + ) + # create segment and append it to the segment list + segments_list.append( + dpi.Segment( + Name=str(segment_id), + SoilGeometryProbability=list_of_probability_elements, + ) + ) + return segments_list + + def read_surface_lines_csv(self, file_name: str): + # read csv file using pandas + surface_lines_csv = pd.read_csv(file_name, delimiter=";", index_col=False) + surface_lines = {} + # loop though all locations + for location_id in surface_lines_csv["LOCATIONID"]: + # get row of one of the locations + row_csv = surface_lines_csv[surface_lines_csv["LOCATIONID"] == location_id] + row_csv = row_csv.values[0][1:] + # reshape row so that each point is separated + row_csv = row_csv.reshape((-1, 3)) + # create list of points all values all values have a none type + surface_lines[location_id] = [] + for point in row_csv: + # check if point is defined or if it is nan + if not (str(point[0]) == "nan") or not (str(point[-1]) == "nan"): + surface_lines[location_id].append( + dpi.Point( + X=point[0], Z=point[-1], PointType=dpi.PointTypeEnum.NONE + ) + ) + return surface_lines + + def get_characteristic_point_type(self, point_type: str): + output_dictionary = { + "Maaiveld buitenwaarts": dpi.PointTypeEnum.SurfaceLevelOutside, + "Teen dijk buitenwaarts": dpi.PointTypeEnum.DikeToeAtRiver, + "Kruin buitenberm": dpi.PointTypeEnum.ShoulderTopOutside, + "Insteek buitenberm": dpi.PointTypeEnum.ShoulderBaseOutside, + "Kruin buitentalud": dpi.PointTypeEnum.DikeTopAtRiver, + "referentielijn": dpi.PointTypeEnum.DikeLine, + "Verkeersbelasting kant buitenwaarts": dpi.PointTypeEnum.TrafficLoadOutside, + "Verkeersbelasting kant binnenwaarts": dpi.PointTypeEnum.TrafficLoadInside, + "Kruin binnentalud": dpi.PointTypeEnum.DikeTopAtPolder, + "Insteek binnenberm": dpi.PointTypeEnum.ShoulderBaseInside, + "Kruin binnenberm": dpi.PointTypeEnum.ShoulderTopInside, + "Teen dijk binnenwaarts": dpi.PointTypeEnum.DikeToeAtPolder, + "Insteek sloot dijkzijde": dpi.PointTypeEnum.DitchDikeSide, + "Slootbodem dijkzijde": dpi.PointTypeEnum.BottomDitchDikeSide, + "Slootbodem polderzijde": dpi.PointTypeEnum.BottomDitchPolderSide, + "Insteek sloot polderzijde": dpi.PointTypeEnum.DitchPolderSide, + "Maaiveld binnenwaarts": dpi.PointTypeEnum.SurfaceLevelInside, + } + return output_dictionary.get(point_type, dpi.PointTypeEnum.NONE) + + def read_characteristic_points(self, file_name: str): + # read csv file using pandas + characteristic_points_csv = pd.read_csv(file_name, delimiter=";") + column_groups = [ + char_type_point.split("_")[-1] + for char_type_point in characteristic_points_csv.columns[1:] + ] + # remove duplicates + column_groups = list(dict.fromkeys(column_groups)) + characteristic_points_results = {} + # loop though all locations + for characteristic_points in characteristic_points_csv["LOCATIONID"]: + row_csv = characteristic_points_csv[ + characteristic_points_csv["LOCATIONID"] == characteristic_points + ] + characteristic_points_results[characteristic_points] = [] + for characteristic_point_type in column_groups: + # get relevant columns + relevant_columns = [ + char_type_point + for char_type_point in characteristic_points_csv.columns + if characteristic_point_type in char_type_point + ] + char_point = row_csv[relevant_columns] + if not (char_point.isin([-1]).sum().sum() == 3): + characteristic_points_results[characteristic_points].append( + Point( + X=list(char_point["X_" + characteristic_point_type])[0], + Z=list(char_point["Z_" + characteristic_point_type])[0], + PointType=self.get_characteristic_point_type( + characteristic_point_type + ), + ) + ) + return characteristic_points_results + + def get_X(self, d): + return d.X + + def read_profile(self, file_name: str): + # read csv file using pandas + profile_csv = pd.read_csv(file_name, delimiter=";") + # initialize profile list that will be used for the dam input + profile_list = [] + # Loop through all unique profile names of the csv + for profile_id in profile_csv["soilprofile_id"].unique(): + list_of_1D_layers = [] + for csv_row in profile_csv[ + profile_csv["soilprofile_id"] == profile_id + ].index: + list_of_1D_layers.append( + dpi.Layer1D( + **{ + "Name": profile_csv["soil_name"][csv_row], + "SoilName": profile_csv["soil_name"][csv_row], + "TopLevel": profile_csv["top_level"][csv_row], + "IsAquifer": False, + "WaterpressureInterpolationModel": dpi.WaterpressureInterpolationModelType.Automatic, + } + ) + ) + # create profile and append it to the profile list + profile_list.append( + dpi.SoilProfile1D( + Name=str(profile_id), + BottomLevel=-30, + Layers1D=list_of_1D_layers, + ) + ) + return profile_list + + def get_zone_type(self, csv_zone_type: str): + dictionary = { + "NoZones": dpi.ZoneTypeClass.NoZones, + "ZoneAreas": dpi.ZoneTypeClass.ZoneAreas, + "ForbiddenZones": dpi.ZoneTypeClass.ForbiddenZones, + } + return dictionary[csv_zone_type] + + def read_locations_csv(self, location_csv_name: str, scenario_csv_name: str): + # read csv file using pandas + location_csv = pd.read_csv(location_csv_name, delimiter=";") + scenario_csv = pd.read_csv(scenario_csv_name, delimiter=";") + merged_csv = pd.merge(location_csv, scenario_csv, on="location_id") + # initialize location list that will be used for the dam input + locations_list = [] + # Loop through all unique profile names of the csv + for index, row in merged_csv.iterrows(): + # initialize DesignScenario + design_scenario = dpi.DesignScenario( + Id=str(index), + PolderLevel=row["polderlevel"], + HeadPl2=row["head_pl2"], + HeadPl3=row["head_pl3"], + PlLineOffsetBelowDikeTopAtRiver=row["PLLineOffsetBelowDikeTopAtRiver"], + PlLineOffsetBelowDikeTopAtPolder=row[ + "PLLineOffsetBelowDikeTopAtPolder" + ], + PlLineOffsetBelowShoulderBaseInside=row[ + "PLLineOffsetBelowShoulderBaseInside" + ], + PlLineOffsetBelowDikeToeAtPolder=row[ + "PLLIneOffsetBelowDikeToeAtPolder" + ], + RiverLevel=row["water_height"], + RiverLevelLow=row["water_height_low"], + DikeTableHeight=row["dike_table_height"], + RequiredSafetyFactorStabilityInnerSlope=row[ + "safety_factor_stability_inner_slope" + ], + RequiredSafetyFactorStabilityOuterSlope=row[ + "safety_factor_stability_outer_slope" + ], + UpliftCriterionPiping=row["uplift_criterion_piping"], + UpliftCriterionStability=row["uplift_criterion_stability"], + RequiredSafetyFactorPiping=row["safety_factor_piping"], + ) + # initialize design options class + design_options = dpi.DesignOptions( + ShoulderEmbankmentMaterial=row["ophoogmateriaalberm"], + StabilityShoulderGrowSlope=row["StabilityShoulderGrowSlope"], + StabilityShoulderGrowDeltaX=row["StabilityShoulderGrowDeltaX"], + StabilitySlopeAdaptionDeltaX=row["StabilitySlopeAdaptionDeltaX"], + NewDikeTopWidth=row["NewDikeTopWidth"], + NewDikeSlopeInside=row["NewDikeSlopeInside"], + NewDikeSlopeOutside=row["NewDikeSlopeOutside"], + NewShoulderTopSlope=row["NewShoulderTopSlope"], + NewShoulderBaseSlope=row["NewShoulderBaseSlope"], + NewMaxHeightShoulderAsFraction=row["NewMaxHeightShoulderAsFraction"], + NewMinDistanceDikeToeStartDitch=row["NewMinDistanceDikeToeStartDitch"], + UseNewDitchDefinition=row["UseNewDitchDefinition"], + NewWidthDitchBottom=row["NewWidthDitchBottom"], + NewDepthDitch=row["NewDepthDitch"], + NewSlopeAngleDitch=row["NewSlopeAngleDitch"], + RedesignDikeHeight=0, + RedesignDikeShoulder=0, + SlopeAdaptionStartCotangent=1, + SlopeAdaptionEndCotangent=1, + SlopeAdaptionStepCotangent=1, + StabilityDesignMethod=dpi.StabilityDesignMethodType.OptimizedSlopeAndShoulderAdaption, + ) + # Initialize stability options + stability_options = dpi.StabilityOptions( + MinimumCircleDepth=row["minimal_circle_depth"], + TrafficLoad=row["trafficload"], + ZoneType=self.get_zone_type(row["ZoneType"]), + ) + # initialize waternet options + waternet_options = dpi.WaternetOptions( + DampingFactorPl3=row["dempingsfactor_pl3"], + DampingFactorPl4=row["dempingsfactor_pl4"], + PhreaticLineCreationMethod=row["PLLineCreationMethod"], + PenetrationLength=0, + SlopeDampingFactor=1, + IntrusionVerticalWaterPressure=dpi.IntrusionVerticalWaterPressureType.Standard, + DikeSoilScenario=dpi.DikeSoilScenarioType.ClayDikeOnClay, + ) + # initialize location class + location = dpi.Location( + XSoilGeometry2DOrigin=row["x_soilgeometry2D_origin"], + SurfaceLineName=row["surfaceline_id"], + SegmentName=row["segment_id"], + DikeEmbankmentMaterial=row["ophoogmateriaaldijk"], + Name=row["location_id"], + DesignOptions=design_options, + DesignScenarios=[design_scenario], + StabilityOptions=stability_options, + WaternetOptions=waternet_options, + ) + locations_list.append(location) + return locations_list + + def read_soilmaterials_csv(self, soilmaterials_csv_name: str): + # read csv file using pandas + soils_csv = pd.read_csv(soilmaterials_csv_name, delimiter=";") + # initialize soil list that will be used for the dam input + soils_list = [] + # Loop through all unique soil names of the csv + for index, row in soils_csv.iterrows(): + soil = dpi.Soil( + Name=row["Name"], + BelowPhreaticLevel=row["Saturatedunitweight"], + AbovePhreaticLevel=row["Unsaturatedunitweight"], + Cohesion=row["Cohesion"], + FrictionAngle=row["Frictionangle"], + DiameterD70=row["DiameterD70"], + PermeabKx=row["PermeabilityX"], + ShearStrengthModel=dpi.ShearStrengthModelType.CPhi, + StrengthIncreaseExponent=row["Strengthincreaseexp(m)"], + RatioCuPc=row["RatioSu/Pc"], + ) + soils_list.append(soil) + return soils_list + + @pytest.mark.acceptance + def test_tutorial_read_csv_files(self): + segments_list = self.read_segment_csv_file( + "trunk\\src\\tests\\test_data\\csvfiles\\segments.csv" + ) + surface_lines = self.read_surface_lines_csv( + "trunk\\src\\tests\\test_data\\csvfiles\\surfacelines.csv" + ) + characteristic_points = self.read_characteristic_points( + "trunk\\src\\tests\\test_data\\csvfiles\\characteristicpoints.csv" + ) + # merge the two dictionaries to create surface line + for key, value in surface_lines.items(): + surface_lines[key] += characteristic_points.get(key, []) + # sort points based on X coordinate + surface_lines[key].sort(key=self.get_X) + # all the points of the surface lines are merged so the surface lines list can be created + surface_lines_dam_input = [] + for location, points in surface_lines.items(): + surface_lines_dam_input.append( + dpi.SurfaceLine(Name=location, Points=points) + ) + # the soil profile should also be created + # In this case only 1D segments will be inputted + profiles = self.read_profile( + "trunk\\src\\tests\\test_data\\csvfiles\\soilprofiles.csv" + ) + # read location csv + locations = self.read_locations_csv( + "trunk\\src\\tests\\test_data\\csvfiles\\locations.csv", + "trunk\\src\\tests\\test_data\\csvfiles\\scenarios.csv", + ) + # read soil materials csv + soils = self.read_soilmaterials_csv( + "trunk\\src\\tests\\test_data\\csvfiles\\soilmaterials.csv" + ) + # create stability parameters + stability_parameter = dpi.StabilityParameters( + SearchMethod=dpi.SearchMethodType.Calculationgrid, + GridDetermination=dpi.GridDeterminationType.Automatic, + BishopTangentLinesDefinition=dpi.BishopTangentLinesDefinitionType.OnBoundaryLines, + ) + # create dam input + dam_input = dpi.DamInput( + SurfaceLines=surface_lines_dam_input, + Soils=soils, + SoilProfiles1D=profiles, + Segments=segments_list, + Locations=locations, + FailureMechanismSystemType=dpi.FailureMechanismSystem.StabilityInside, + StabilityModelType=dpi.StabilityType.UpliftVan, + ProjectPath=Path(""), + StabilityParameters=stability_parameter, + ) + + xml_dam_input = Path( + TestUtils.get_output_test_data_dir(""), "InputTutorialFile.xml" + ) + dam_input.ExportToXml(xml_file=xml_dam_input) + xml_dam_output = Path( + TestUtils.get_output_test_data_dir(""), "OutputTutorialxml.xml" + ) + result_code = dam_input.execute( + xml_input_file=xml_dam_input, xml_output_file=xml_dam_output + )