Index: DamEngine/trunk/src/Deltares.DamEngine.Calculators/General/Parallel.cs =================================================================== diff -u -r4031 -r4052 --- DamEngine/trunk/src/Deltares.DamEngine.Calculators/General/Parallel.cs (.../Parallel.cs) (revision 4031) +++ DamEngine/trunk/src/Deltares.DamEngine.Calculators/General/Parallel.cs (.../Parallel.cs) (revision 4052) @@ -27,228 +27,227 @@ using Deltares.DamEngine.Data.Standard.Calculation; using ThreadState = System.Threading.ThreadState; -namespace Deltares.DamEngine.Calculators.General +namespace Deltares.DamEngine.Calculators.General; + +/// +/// Utility to run tasks in parallel +/// +public class Parallel { + private static readonly Queue queue = new Queue(); + private static TaskDelegate task; + private static ProgressDelegate progress; + private static int total; + private static int executed; + private static readonly List threads = new List(); + private static readonly List processes = new List(); + private static bool stopPressed; + private static bool errorOccured; + private static readonly bool limitToAvailableProcessors = true; + /// - /// Utility to run tasks in parallel + /// Runs a number of tasks in parallel. Returns when all tasks have been executed. /// - public class Parallel + /// The objects on which the task is to be performed + /// The task to be executed + /// The maximum number of processes, will be equal to the number of cores if omitted + /// Optional delegate indicating the progress of the tasks + public static void Run(IList list, TaskDelegate parallelTask, ProgressDelegate progressDelegate, int maxProcesses) { - private static readonly Queue queue = new Queue(); - private static TaskDelegate task; - private static ProgressDelegate progress; - private static int total; - private static int executed; - private static readonly List threads = new List(); - private static readonly List processes = new List(); - private static bool stopPressed; - private static bool errorOccured; - private static readonly bool limitToAvailableProcessors = true; - - /// - /// Runs a number of tasks in parallel. Returns when all tasks have been executed. - /// - /// The objects on which the task is to be performed - /// The task to be executed - /// The maximum number of processes, will be equal to the number of cores if omitted - /// Optional delegate indicating the progress of the tasks - public static void Run(IList list, TaskDelegate parallelTask, ProgressDelegate progressDelegate, int maxProcesses) + if (list.Count == 0) { - if (list.Count == 0) - { - return; - } + return; + } - stopPressed = false; - processes.Clear(); + stopPressed = false; + processes.Clear(); - if (maxProcesses <= 0) - { - maxProcesses = Int32.MaxValue; - } + if (maxProcesses <= 0) + { + maxProcesses = Int32.MaxValue; + } - maxProcesses = Math.Min(list.Count, maxProcesses); + maxProcesses = Math.Min(list.Count, maxProcesses); - if (limitToAvailableProcessors) - { - maxProcesses = Math.Min(Environment.ProcessorCount, maxProcesses); - } + if (limitToAvailableProcessors) + { + maxProcesses = Math.Min(Environment.ProcessorCount, maxProcesses); + } - //DateTime start = DateTime.Now; + //DateTime start = DateTime.Now; - if (progressDelegate != null) - { - progressDelegate(0); - } + if (progressDelegate != null) + { + progressDelegate(0); + } - if (maxProcesses == 1) + if (maxProcesses == 1) + { + total = list.Count; + RunSequential(list, parallelTask, progressDelegate); + } + else + { + try { - total = list.Count; - RunSequential(list, parallelTask, progressDelegate); - } - else - { - try + queue.Clear(); + task = parallelTask; + progress = progressDelegate; + + foreach (object argument in list) { - queue.Clear(); - task = parallelTask; - progress = progressDelegate; + queue.Enqueue(argument); + } - foreach (object argument in list) - { - queue.Enqueue(argument); - } + total = queue.Count; + executed = 0; - total = queue.Count; - executed = 0; + threads.Clear(); + for (var i = 0; i < maxProcesses; i++) + { + var thread = new Thread(RunTask); + threads.Add(thread); - threads.Clear(); - for (var i = 0; i < maxProcesses; i++) + thread.Start(Context.CurrentContext); // Pass the current context as task context + if (i == 0) { - var thread = new Thread(RunTask); - threads.Add(thread); - - thread.Start(Context.CurrentContext); // Pass the current context as task context - if (i == 0) - { - // Delay was there to prevent problems with DGSMStabDam.dll initilializing muliple times after the first thread - // If this was not included, DGSMStabDam.dll crashed. So this Sleep could probably be removed now that - // DGSMStabDam.dll is no longer used. Keep it here for now, later halve it first then try without. - Thread.Sleep(1000); - } + // Delay was there to prevent problems with DGSMStabDam.dll initilializing muliple times after the first thread + // If this was not included, DGSMStabDam.dll crashed. So this Sleep could probably be removed now that + // DGSMStabDam.dll is no longer used. Keep it here for now, later halve it first then try without. + Thread.Sleep(1000); } - - foreach (Thread thread in threads) - { - thread.Join(); - } } - catch (ThreadAbortException) + + foreach (Thread thread in threads) { - Abort(); + thread.Join(); } } - - if (errorOccured) + catch (ThreadAbortException) { - throw new ParallelException("Error occured in one of the parallel runs."); + Abort(); } - //DateTime end = DateTime.Now; - //TimeSpan period = end - start; - - //if (period.TotalMilliseconds > 0) - //{ - // string msg = (maxProcesses == 1 ? "Duration of tasks: " : "Parallel duration: "); - // Console.WriteLine(msg + period.TotalMilliseconds + " ms"); - //} } - /// - /// Kills all running and queued tasks - /// - public static void Abort() + if (errorOccured) { - stopPressed = true; + throw new ParallelException("Error occured in one of the parallel runs."); + } + //DateTime end = DateTime.Now; + //TimeSpan period = end - start; - foreach (Process process in processes.ToArray()) + //if (period.TotalMilliseconds > 0) + //{ + // string msg = (maxProcesses == 1 ? "Duration of tasks: " : "Parallel duration: "); + // Console.WriteLine(msg + period.TotalMilliseconds + " ms"); + //} + } + + /// + /// Kills all running and queued tasks + /// + public static void Abort() + { + stopPressed = true; + + foreach (Process process in processes.ToArray()) + { + if (!process.HasExited) { - if (!process.HasExited) - { - process.Kill(); - } + process.Kill(); } + } - foreach (Thread thread in threads) + foreach (Thread thread in threads) + { + switch (thread.ThreadState) { - switch (thread.ThreadState) - { - case ThreadState.Running: - try - { - thread.Abort(); - } - catch (ThreadAbortException) - { - // ignore - } + case ThreadState.Running: + try + { + thread.Abort(); + } + catch (ThreadAbortException) + { + // ignore + } - break; - case ThreadState.WaitSleepJoin: - thread.Interrupt(); - break; - } + break; + case ThreadState.WaitSleepJoin: + thread.Interrupt(); + break; } - - threads.Clear(); - processes.Clear(); } - private static void RunSequential(IList list, TaskDelegate parallelTask, ProgressDelegate progressDelegate) + threads.Clear(); + processes.Clear(); + } + + private static void RunSequential(IList list, TaskDelegate parallelTask, ProgressDelegate progressDelegate) + { + for (var i = 0; i < list.Count; i++) { - for (var i = 0; i < list.Count; i++) - { - parallelTask(list[i]); + parallelTask(list[i]); - if (progressDelegate != null) - { - progressDelegate(Convert.ToDouble(i + 1) / Convert.ToDouble(total)); - } + if (progressDelegate != null) + { + progressDelegate(Convert.ToDouble(i + 1) / Convert.ToDouble(total)); } } + } - private static void RunTask(object context) - { - object argument; + private static void RunTask(object context) + { + object argument; - // Set the context of the task thread - Context.CurrentContext = context as IContext; + // Set the context of the task thread + Context.CurrentContext = context as IContext; - while (true) + while (true) + { + lock (queue) { - lock (queue) + if (!stopPressed && queue.Count > 0) { - if (!stopPressed && queue.Count > 0) - { - argument = queue.Dequeue(); - } - else - { - return; - } + argument = queue.Dequeue(); } - - try + else { - if (!stopPressed) - { - task(argument); - } + return; } - catch (ThreadAbortException) + } + + try + { + if (!stopPressed) { - stopPressed = true; + task(argument); } - catch (Exception e) + } + catch (ThreadAbortException) + { + stopPressed = true; + } + catch (Exception e) + { + // most errors already reported to logfile + while (e != null) // Extra instrumentation to trace errors occurring during parallel executions { - // most errors already reported to logfile - while (e != null) // Extra instrumentation to trace errors occurring during parallel executions - { // LogManager.Add(new LogMessage(LogMessageType.Trace, task, "Exception occurred in parallel run: " + e.Message + Environment.NewLine + // "Stacktrace: " + Environment.NewLine // + e.StackTrace)); ##BKA: replace with some other mechanism!? - e = e.InnerException; - } - - errorOccured = true; + e = e.InnerException; } - finally + + errorOccured = true; + } + finally + { + if (progress != null && !stopPressed) { - if (progress != null && !stopPressed) + lock (queue) { - lock (queue) - { - progress(Convert.ToDouble(++executed) / Convert.ToDouble(total)); - } + progress(Convert.ToDouble(++executed) / Convert.ToDouble(total)); } } }