// Copyright (C) Stichting Deltares 2025. All rights reserved.
//
// This file is part of the Dam Engine.
//
// The Dam Engine is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//
// All names, logos, and references to "Deltares" are registered trademarks of
// Stichting Deltares and remain full property of Stichting Deltares at all times.
// All rights reserved.
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using Deltares.DamEngine.Data.Standard.Calculation;
using ThreadState = System.Threading.ThreadState;
namespace Deltares.DamEngine.Calculators.General;
/// <summary>
/// Utility to run tasks in parallel on dedicated worker threads.
/// </summary>
/// <remarks>
/// All state is kept in static fields that <see cref="Run"/> overwrites on every call,
/// so overlapping calls to <see cref="Run"/> would interfere with each other.
/// </remarks>
public class Parallel
{
    // Work items still waiting to be executed; also used as the lock object for the shared counters.
    private static readonly Queue<object> queue = new Queue<object>();
    private static readonly List<Thread> threads = new List<Thread>();
    private static readonly List<Process> processes = new List<Process>();
    private static readonly bool limitToAvailableProcessors = true;
    private static TaskDelegate task;
    private static ProgressDelegate progress;
    private static int total;
    private static int executed;

    // Flags written by one thread and polled by others, hence volatile.
    private static volatile bool stopPressed;
    private static volatile bool errorOccured;

    /// <summary>
    /// Runs a number of tasks in parallel. Returns when all tasks have been executed.
    /// </summary>
    /// <param name="list">The objects on which the task is to be performed</param>
    /// <param name="parallelTask">The task to be executed</param>
    /// <param name="progressDelegate">Optional delegate indicating the progress of the tasks (fraction between 0 and 1); may be null</param>
    /// <param name="maxProcesses">
    /// The maximum number of worker threads; a value of 0 or less means "no explicit limit".
    /// The effective number is further limited to the number of items and to the number of
    /// processors minus one, but is never less than one.
    /// </param>
    /// <exception cref="ParallelException">
    /// Thrown after all workers have finished when at least one task threw an exception in the
    /// multi-threaded path. When the work is executed sequentially (one effective worker),
    /// exceptions thrown by the task propagate to the caller unchanged.
    /// </exception>
    public static void Run(IList list, TaskDelegate parallelTask, ProgressDelegate progressDelegate, int maxProcesses)
    {
        if (list.Count == 0)
        {
            return;
        }

        stopPressed = false;
        // Reset the error flag: without this a failure in an earlier run would make every later run throw.
        errorOccured = false;
        processes.Clear();

        if (maxProcesses <= 0)
        {
            maxProcesses = int.MaxValue;
        }

        maxProcesses = Math.Min(list.Count, maxProcesses);
        if (limitToAvailableProcessors)
        {
            // Keep one processor free, but never go below one worker: on a single-core machine
            // ProcessorCount - 1 is 0, which would start no threads and silently execute nothing.
            maxProcesses = Math.Max(1, Math.Min(Environment.ProcessorCount - 1, maxProcesses));
        }

        progressDelegate?.Invoke(0);

        if (maxProcesses == 1)
        {
            total = list.Count;
            RunSequential(list, parallelTask, progressDelegate);
        }
        else
        {
            try
            {
                queue.Clear();
                task = parallelTask;
                progress = progressDelegate;
                foreach (object argument in list)
                {
                    queue.Enqueue(argument);
                }

                total = queue.Count;
                executed = 0;
                threads.Clear();
                for (var i = 0; i < maxProcesses; i++)
                {
                    var thread = new Thread(RunTask);
                    threads.Add(thread);
                    thread.Start(Context.CurrentContext); // Pass the current context as task context
                }

                // Join on a snapshot: Abort() clears the shared list from another thread, which would
                // otherwise invalidate the enumerator used here.
                foreach (Thread worker in threads.ToArray())
                {
                    worker.Join();
                }
            }
            catch (ThreadAbortException)
            {
                Abort();
            }
        }

        if (errorOccured)
        {
            throw new ParallelException("Error occured in one of the parallel runs.");
        }
    }

    /// <summary>
    /// Kills all running and queued tasks.
    /// </summary>
    /// <remarks>
    /// Sets the stop flag (workers check it before taking the next item from the queue), kills the
    /// registered processes that have not exited yet and then tries to abort or interrupt the
    /// worker threads.
    /// </remarks>
    public static void Abort()
    {
        stopPressed = true;

        foreach (Process process in processes.ToArray())
        {
            if (!process.HasExited)
            {
                process.Kill();
            }
        }

        // Iterate over a snapshot, consistent with the process loop above.
        foreach (Thread thread in threads.ToArray())
        {
            switch (thread.ThreadState)
            {
                case ThreadState.Running:
                    try
                    {
#pragma warning disable SYSLIB0006
                        thread.Abort();
#pragma warning restore SYSLIB0006
                    }
                    catch (ThreadAbortException)
                    {
                        // ignore
                    }
                    catch (PlatformNotSupportedException)
                    {
                        // Thread.Abort is not supported on .NET Core / .NET 5+. The worker stops
                        // cooperatively via the stop flag once its current task has finished.
                    }

                    break;
                case ThreadState.WaitSleepJoin:
                    thread.Interrupt();
                    break;
            }
        }

        threads.Clear();
        processes.Clear();
    }

    /// <summary>
    /// Executes the task for every item on the calling thread, in list order, reporting progress
    /// after each item. Exceptions thrown by the task are not caught here.
    /// </summary>
    /// <param name="list">The objects on which the task is to be performed</param>
    /// <param name="parallelTask">The task to be executed</param>
    /// <param name="progressDelegate">Optional progress callback; may be null</param>
    private static void RunSequential(IList list, TaskDelegate parallelTask, ProgressDelegate progressDelegate)
    {
        for (var i = 0; i < list.Count; i++)
        {
            parallelTask(list[i]);
            progressDelegate?.Invoke(Convert.ToDouble(i + 1) / Convert.ToDouble(total));
        }
    }

    /// <summary>
    /// Worker thread body: takes items from the shared queue and executes the task on them until
    /// the queue is empty or a stop has been requested.
    /// </summary>
    /// <param name="context">The context of the thread that started the run; installed as the context of this worker</param>
    private static void RunTask(object context)
    {
        // Set the context of the task thread
        Context.CurrentContext = context as IContext;

        try
        {
            while (true)
            {
                object argument;
                lock (queue)
                {
                    if (stopPressed || queue.Count == 0)
                    {
                        return;
                    }

                    argument = queue.Dequeue();
                }

                try
                {
                    if (!stopPressed)
                    {
                        task(argument);
                    }
                }
                catch (ThreadAbortException)
                {
                    stopPressed = true;
                }
                catch (Exception)
                {
                    // Only the fact that something failed is recorded; Run() turns it into a
                    // ParallelException. Per the original note most errors are already reported to
                    // the logfile by the task itself.
                    // TODO: add tracing of the exception (and its inner exceptions) once a logging
                    // mechanism is available here.
                    errorOccured = true;
                }
                finally
                {
                    if (progress != null && !stopPressed)
                    {
                        // The lock keeps the counter increment and the callback together.
                        lock (queue)
                        {
                            progress(Convert.ToDouble(++executed) / Convert.ToDouble(total));
                        }
                    }
                }
            }
        }
        catch (ThreadInterruptedException)
        {
            // Abort() interrupts workers that are blocked, which includes waiting for the queue
            // lock outside the task itself. End this worker quietly instead of letting the
            // exception escape the thread.
        }
    }
}