// Copyright (C) Stichting Deltares 2025. All rights reserved.
//
// This file is part of the Dam Engine.
//
// The Dam Engine is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
//
// All names, logos, and references to "Deltares" are registered trademarks of
// Stichting Deltares and remain full property of Stichting Deltares at all times.
// All rights reserved.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using Deltares.DamEngine.Data.Standard.Calculation;
using ThreadState = System.Threading.ThreadState;

namespace Deltares.DamEngine.Calculators.General;

/// <summary>
/// Utility to run tasks in parallel.
/// </summary>
/// <remarks>
/// All bookkeeping (work queue, thread list, flags) is kept in static fields, so the
/// state is shared by every caller of <see cref="Run"/> and <see cref="Abort"/>.
/// </remarks>
public class Parallel
{
    // Pending task arguments; also used as the lock object guarding the queue and the progress counter.
    private static readonly Queue queue = new Queue();
    private static TaskDelegate task;
    private static ProgressDelegate progress;
    private static int total;
    private static int executed;
    private static readonly List<Thread> threads = new List<Thread>();
    private static readonly List<Process> processes = new List<Process>();
    private static bool stopPressed;
    private static bool errorOccured;
    private static readonly bool limitToAvailableProcessors = true;

    /// <summary>
    /// Runs a number of tasks in parallel. Returns when all tasks have been executed.
    /// </summary>
    /// <param name="list">The objects on which the task is to be performed</param>
    /// <param name="parallelTask">The task to be executed</param>
    /// <param name="progressDelegate">Optional delegate indicating the progress of the tasks;
    /// it is called with the fraction of tasks completed so far</param>
    /// <param name="maxProcesses">The maximum number of worker threads; a value of zero or less
    /// means no explicit limit. The effective number never exceeds the number of items in
    /// <paramref name="list"/> nor (when limiting to available processors is enabled) the
    /// processor count minus one, and is always at least one.</param>
    /// <exception cref="ParallelException">Thrown when one of the tasks executed on a worker
    /// thread raised an exception. When the tasks are executed sequentially (a single worker),
    /// an exception raised by <paramref name="parallelTask"/> propagates unchanged.</exception>
    public static void Run(IList list, TaskDelegate parallelTask, ProgressDelegate progressDelegate, int maxProcesses)
    {
        if (list.Count == 0)
        {
            return;
        }

        stopPressed = false;
        // Reset the failure flag of a previous run; otherwise one failed run would make
        // every subsequent run throw, even when all of its tasks succeed.
        errorOccured = false;
        processes.Clear();

        if (maxProcesses <= 0)
        {
            maxProcesses = Int32.MaxValue;
        }

        maxProcesses = Math.Min(list.Count, maxProcesses);
        if (limitToAvailableProcessors)
        {
            // Keep one processor free, but never drop below a single worker: on a
            // single-processor machine "ProcessorCount - 1" is zero, which would start
            // no threads at all and silently skip every task.
            maxProcesses = Math.Max(1, Math.Min(Environment.ProcessorCount - 1, maxProcesses));
        }

        if (progressDelegate != null)
        {
            progressDelegate(0);
        }

        if (maxProcesses == 1)
        {
            total = list.Count;
            RunSequential(list, parallelTask, progressDelegate);
        }
        else
        {
            try
            {
                queue.Clear();
                task = parallelTask;
                progress = progressDelegate;

                foreach (object argument in list)
                {
                    queue.Enqueue(argument);
                }

                total = queue.Count;
                executed = 0;
                threads.Clear();

                for (var i = 0; i < maxProcesses; i++)
                {
                    var thread = new Thread(RunTask);
                    threads.Add(thread);
                    thread.Start(Context.CurrentContext); // Pass the current context as task context
                }

                // Block until every worker has drained the queue (or stopped).
                foreach (Thread thread in threads)
                {
                    thread.Join();
                }
            }
            catch (ThreadAbortException)
            {
                Abort();
            }
        }

        if (errorOccured)
        {
            throw new ParallelException("Error occured in one of the parallel runs.");
        }
    }

    /// <summary>
    /// Kills all running and queued tasks
    /// </summary>
    public static void Abort()
    {
        // Signals the worker threads not to pick up any further queued task.
        stopPressed = true;

        foreach (Process process in processes.ToArray())
        {
            if (!process.HasExited)
            {
                process.Kill();
            }
        }

        foreach (Thread thread in threads)
        {
            switch (thread.ThreadState)
            {
                case ThreadState.Running:
                    try
                    {
#pragma warning disable SYSLIB0006
                        thread.Abort();
#pragma warning restore SYSLIB0006
                    }
                    catch (ThreadAbortException)
                    {
                        // ignore
                    }
                    catch (PlatformNotSupportedException)
                    {
                        // Thread.Abort is not supported on .NET Core / .NET 5+; the worker
                        // stops by itself after its current task because stopPressed is set.
                    }

                    break;
                case ThreadState.WaitSleepJoin:
                    thread.Interrupt();
                    break;
            }
        }

        threads.Clear();
        processes.Clear();
    }

    /// <summary>
    /// Executes the task for every item of the list on the calling thread, in list order,
    /// reporting the completed fraction after each item.
    /// </summary>
    /// <param name="list">The objects on which the task is to be performed</param>
    /// <param name="parallelTask">The task to be executed</param>
    /// <param name="progressDelegate">Optional delegate indicating the progress of the tasks</param>
    private static void RunSequential(IList list, TaskDelegate parallelTask, ProgressDelegate progressDelegate)
    {
        for (var i = 0; i < list.Count; i++)
        {
            parallelTask(list[i]);
            if (progressDelegate != null)
            {
                progressDelegate(Convert.ToDouble(i + 1) / Convert.ToDouble(total));
            }
        }
    }

    /// <summary>
    /// Worker thread body: repeatedly takes an argument from the queue and executes the
    /// task on it, until the queue is empty or a stop has been requested.
    /// </summary>
    /// <param name="context">The context to install as current context for this thread</param>
    private static void RunTask(object context)
    {
        // Set the context of the task thread
        Context.CurrentContext = context as IContext;

        while (true)
        {
            object argument;
            lock (queue)
            {
                if (stopPressed || queue.Count == 0)
                {
                    return;
                }

                argument = queue.Dequeue();
            }

            try
            {
                if (!stopPressed)
                {
                    task(argument);
                }
            }
            catch (ThreadAbortException)
            {
                stopPressed = true;
            }
            catch (Exception)
            {
                // Most errors are already reported to the logfile by the task itself.
                // TODO: trace the exception chain (message and stack trace of the exception
                // and its inner exceptions) once a logging mechanism is available here.
                errorOccured = true;
            }
            finally
            {
                if (progress != null && !stopPressed)
                {
                    // The lock keeps the counter increment and the progress report atomic.
                    lock (queue)
                    {
                        progress(Convert.ToDouble(++executed) / Convert.ToDouble(total));
                    }
                }
            }
        }
    }
}