Index: DamEngine/trunk/src/Deltares.DamEngine.Calculators/General/Parallel.cs
===================================================================
diff -u -r4031 -r4052
--- DamEngine/trunk/src/Deltares.DamEngine.Calculators/General/Parallel.cs (.../Parallel.cs) (revision 4031)
+++ DamEngine/trunk/src/Deltares.DamEngine.Calculators/General/Parallel.cs (.../Parallel.cs) (revision 4052)
@@ -27,228 +27,227 @@
using Deltares.DamEngine.Data.Standard.Calculation;
using ThreadState = System.Threading.ThreadState;
-namespace Deltares.DamEngine.Calculators.General
+namespace Deltares.DamEngine.Calculators.General;
+
+///
+/// Utility to run tasks in parallel
+///
+public class Parallel
{
+ private static readonly Queue queue = new Queue();
+ private static TaskDelegate task;
+ private static ProgressDelegate progress;
+ private static int total;
+ private static int executed;
+ private static readonly List threads = new List();
+ private static readonly List processes = new List();
+ private static bool stopPressed;
+ private static bool errorOccured;
+ private static readonly bool limitToAvailableProcessors = true;
+
///
- /// Utility to run tasks in parallel
+ /// Runs a number of tasks in parallel. Returns when all tasks have been executed.
///
- public class Parallel
+ /// The objects on which the task is to be performed
+ /// The task to be executed
+ /// The maximum number of processes, will be equal to the number of cores if omitted
+ /// Optional delegate indicating the progress of the tasks
+ public static void Run(IList list, TaskDelegate parallelTask, ProgressDelegate progressDelegate, int maxProcesses)
{
- private static readonly Queue queue = new Queue();
- private static TaskDelegate task;
- private static ProgressDelegate progress;
- private static int total;
- private static int executed;
- private static readonly List threads = new List();
- private static readonly List processes = new List();
- private static bool stopPressed;
- private static bool errorOccured;
- private static readonly bool limitToAvailableProcessors = true;
-
- ///
- /// Runs a number of tasks in parallel. Returns when all tasks have been executed.
- ///
- /// The objects on which the task is to be performed
- /// The task to be executed
- /// The maximum number of processes, will be equal to the number of cores if omitted
- /// Optional delegate indicating the progress of the tasks
- public static void Run(IList list, TaskDelegate parallelTask, ProgressDelegate progressDelegate, int maxProcesses)
+ if (list.Count == 0)
{
- if (list.Count == 0)
- {
- return;
- }
+ return;
+ }
- stopPressed = false;
- processes.Clear();
+ stopPressed = false;
+ processes.Clear();
- if (maxProcesses <= 0)
- {
- maxProcesses = Int32.MaxValue;
- }
+ if (maxProcesses <= 0)
+ {
+ maxProcesses = Int32.MaxValue;
+ }
- maxProcesses = Math.Min(list.Count, maxProcesses);
+ maxProcesses = Math.Min(list.Count, maxProcesses);
- if (limitToAvailableProcessors)
- {
- maxProcesses = Math.Min(Environment.ProcessorCount, maxProcesses);
- }
+ if (limitToAvailableProcessors)
+ {
+ maxProcesses = Math.Min(Environment.ProcessorCount, maxProcesses);
+ }
- //DateTime start = DateTime.Now;
+ //DateTime start = DateTime.Now;
- if (progressDelegate != null)
- {
- progressDelegate(0);
- }
+ if (progressDelegate != null)
+ {
+ progressDelegate(0);
+ }
- if (maxProcesses == 1)
+ if (maxProcesses == 1)
+ {
+ total = list.Count;
+ RunSequential(list, parallelTask, progressDelegate);
+ }
+ else
+ {
+ try
{
- total = list.Count;
- RunSequential(list, parallelTask, progressDelegate);
- }
- else
- {
- try
+ queue.Clear();
+ task = parallelTask;
+ progress = progressDelegate;
+
+ foreach (object argument in list)
{
- queue.Clear();
- task = parallelTask;
- progress = progressDelegate;
+ queue.Enqueue(argument);
+ }
- foreach (object argument in list)
- {
- queue.Enqueue(argument);
- }
+ total = queue.Count;
+ executed = 0;
- total = queue.Count;
- executed = 0;
+ threads.Clear();
+ for (var i = 0; i < maxProcesses; i++)
+ {
+ var thread = new Thread(RunTask);
+ threads.Add(thread);
- threads.Clear();
- for (var i = 0; i < maxProcesses; i++)
+ thread.Start(Context.CurrentContext); // Pass the current context as task context
+ if (i == 0)
{
- var thread = new Thread(RunTask);
- threads.Add(thread);
-
- thread.Start(Context.CurrentContext); // Pass the current context as task context
- if (i == 0)
- {
- // Delay was there to prevent problems with DGSMStabDam.dll initilializing muliple times after the first thread
- // If this was not included, DGSMStabDam.dll crashed. So this Sleep could probably be removed now that
- // DGSMStabDam.dll is no longer used. Keep it here for now, later halve it first then try without.
- Thread.Sleep(1000);
- }
+ // Delay was there to prevent problems with DGSMStabDam.dll initilializing muliple times after the first thread
+ // If this was not included, DGSMStabDam.dll crashed. So this Sleep could probably be removed now that
+ // DGSMStabDam.dll is no longer used. Keep it here for now, later halve it first then try without.
+ Thread.Sleep(1000);
}
-
- foreach (Thread thread in threads)
- {
- thread.Join();
- }
}
- catch (ThreadAbortException)
+
+ foreach (Thread thread in threads)
{
- Abort();
+ thread.Join();
}
}
-
- if (errorOccured)
+ catch (ThreadAbortException)
{
- throw new ParallelException("Error occured in one of the parallel runs.");
+ Abort();
}
- //DateTime end = DateTime.Now;
- //TimeSpan period = end - start;
-
- //if (period.TotalMilliseconds > 0)
- //{
- // string msg = (maxProcesses == 1 ? "Duration of tasks: " : "Parallel duration: ");
- // Console.WriteLine(msg + period.TotalMilliseconds + " ms");
- //}
}
- ///
- /// Kills all running and queued tasks
- ///
- public static void Abort()
+ if (errorOccured)
{
- stopPressed = true;
+ throw new ParallelException("Error occured in one of the parallel runs.");
+ }
+ //DateTime end = DateTime.Now;
+ //TimeSpan period = end - start;
- foreach (Process process in processes.ToArray())
+ //if (period.TotalMilliseconds > 0)
+ //{
+ // string msg = (maxProcesses == 1 ? "Duration of tasks: " : "Parallel duration: ");
+ // Console.WriteLine(msg + period.TotalMilliseconds + " ms");
+ //}
+ }
+
+ ///
+ /// Kills all running and queued tasks
+ ///
+ public static void Abort()
+ {
+ stopPressed = true;
+
+ foreach (Process process in processes.ToArray())
+ {
+ if (!process.HasExited)
{
- if (!process.HasExited)
- {
- process.Kill();
- }
+ process.Kill();
}
+ }
- foreach (Thread thread in threads)
+ foreach (Thread thread in threads)
+ {
+ switch (thread.ThreadState)
{
- switch (thread.ThreadState)
- {
- case ThreadState.Running:
- try
- {
- thread.Abort();
- }
- catch (ThreadAbortException)
- {
- // ignore
- }
+ case ThreadState.Running:
+ try
+ {
+ thread.Abort();
+ }
+ catch (ThreadAbortException)
+ {
+ // ignore
+ }
- break;
- case ThreadState.WaitSleepJoin:
- thread.Interrupt();
- break;
- }
+ break;
+ case ThreadState.WaitSleepJoin:
+ thread.Interrupt();
+ break;
}
-
- threads.Clear();
- processes.Clear();
}
- private static void RunSequential(IList list, TaskDelegate parallelTask, ProgressDelegate progressDelegate)
+ threads.Clear();
+ processes.Clear();
+ }
+
+ private static void RunSequential(IList list, TaskDelegate parallelTask, ProgressDelegate progressDelegate)
+ {
+ for (var i = 0; i < list.Count; i++)
{
- for (var i = 0; i < list.Count; i++)
- {
- parallelTask(list[i]);
+ parallelTask(list[i]);
- if (progressDelegate != null)
- {
- progressDelegate(Convert.ToDouble(i + 1) / Convert.ToDouble(total));
- }
+ if (progressDelegate != null)
+ {
+ progressDelegate(Convert.ToDouble(i + 1) / Convert.ToDouble(total));
}
}
+ }
- private static void RunTask(object context)
- {
- object argument;
+ private static void RunTask(object context)
+ {
+ object argument;
- // Set the context of the task thread
- Context.CurrentContext = context as IContext;
+ // Set the context of the task thread
+ Context.CurrentContext = context as IContext;
- while (true)
+ while (true)
+ {
+ lock (queue)
{
- lock (queue)
+ if (!stopPressed && queue.Count > 0)
{
- if (!stopPressed && queue.Count > 0)
- {
- argument = queue.Dequeue();
- }
- else
- {
- return;
- }
+ argument = queue.Dequeue();
}
-
- try
+ else
{
- if (!stopPressed)
- {
- task(argument);
- }
+ return;
}
- catch (ThreadAbortException)
+ }
+
+ try
+ {
+ if (!stopPressed)
{
- stopPressed = true;
+ task(argument);
}
- catch (Exception e)
+ }
+ catch (ThreadAbortException)
+ {
+ stopPressed = true;
+ }
+ catch (Exception e)
+ {
+ // most errors already reported to logfile
+ while (e != null) // Extra instrumentation to trace errors occurring during parallel executions
{
- // most errors already reported to logfile
- while (e != null) // Extra instrumentation to trace errors occurring during parallel executions
- {
// LogManager.Add(new LogMessage(LogMessageType.Trace, task, "Exception occurred in parallel run: " + e.Message + Environment.NewLine +
// "Stacktrace: " + Environment.NewLine
// + e.StackTrace)); ##BKA: replace with some other mechanism!?
- e = e.InnerException;
- }
-
- errorOccured = true;
+ e = e.InnerException;
}
- finally
+
+ errorOccured = true;
+ }
+ finally
+ {
+ if (progress != null && !stopPressed)
{
- if (progress != null && !stopPressed)
+ lock (queue)
{
- lock (queue)
- {
- progress(Convert.ToDouble(++executed) / Convert.ToDouble(total));
- }
+ progress(Convert.ToDouble(++executed) / Convert.ToDouble(total));
}
}
}