using System;
using System.Collections.Generic;
using System.Threading;
namespace DelftTools.Utils.Threading
{
///
/// QueueState can be one of the following: Idle, Running, Pausing
/// Paused, Stopping.
///
public enum QueueState
{
///
/// Nothing to do.
///
Idle,
///
/// Running items in the queue.
///
Running,
///
/// Processing of items in the queue is Paused,
/// Item(s) currently running will finish their run.
///
Pausing,
///
/// Processing of items in the queue is Paused.
///
Paused,
///
/// Unprocessed items are removed from the queue
/// Running itmes will finish their run.
///
Stopping,
Aborting
} ;
///
/// Subscribe to this if you want to be notified in case items are added/removed
/// from the queue.
///
///
///
public delegate void QueueOperationHandler(T item);
///
/// http://www.codeproject.com/useritems/NotifyingThreadQueue.asp
///
/// Questions:
///
/// Why to store running activities as KeyValuePair
/// It is better to look for some workflow library or use WPF when it will be in Mono (Seems to be a task for Google Summer of Code 2007)
/// Overall class looks tooooooo complicated.
///
///
public class NotifyingThreadQueue //:IDisposable
{
#region my events
///
/// Subscribe to this in case you want to monitor the state of the queue.
///
public event GenericEventHandler QueueStateChanged;
///
/// Subscribe to this in case you want to know when the threadqueue
/// finishes on of the items in the queue.
///
public event GenericEventHandler ThreadFinished;
///
/// Subscribe to this to be informed about errors taking place
/// while calculating one of the items in the queue.
///
public event GenericEventHandler ThreadError;
///
/// Subscribe to this in case you want to know when the threadqueue
/// starts running one of the items in the queue
///
public event GenericEventHandler ThreadStarted;
#endregion
#region my vars
private object syncobj;
private int maxthreads;
private int currentthreads;
private QueueState qs;
private Queue>> queue;
private readonly QueueOperationHandler defaultop;
//private AsyncOperation asyncOperation;
private Dictionary>, Thread> threadDictionary =
new Dictionary>, Thread>();
#endregion
#region constructors
///
/// Constructs the NotifyingThreadQueue. sets the state to QueueState.Idle.
///
/// The default operation to perform on an enqueued item.
/// defaultoperation is null
public NotifyingThreadQueue(QueueOperationHandler defaultoperation)
: this(int.MaxValue, defaultoperation)
{
}
///
/// Constructs the NotifyingThreadQueue. sets the state to QueueState.Idle.
///
/// Sets the maximum number of simultaneous operations
/// The default operation to perform on an enqueued item
/// maxthreads is less than or equal to 0
/// defaultoperation is null
public NotifyingThreadQueue(int maxthreads, QueueOperationHandler defaultoperation)
{
if (maxthreads <= 0)
throw new ArgumentException("maxthreads can not be <= 0");
if (defaultoperation == null)
throw new ArgumentNullException("defaultoperation", "defaultoperation can not be null");
qs = QueueState.Idle;
syncobj = new object();
currentthreads = 0;
this.maxthreads = maxthreads;
queue = new Queue>>();
defaultop = defaultoperation;
}
#endregion
#region control ops
///
/// Pauses the execution of future operations. the current operations are allowed to finish.
///
public void Pause()
{
lock (syncobj)
{
if (qs == QueueState.Idle)
{
/// this is a judgment call if you pause this when you
/// don’t have any elements in it then you can go directly
/// to paused and this means that you basically want to
/// keep queuing until something happens
qs = QueueState.Paused;
QueueStateChangedInternal(qs);
}
else if (qs == QueueState.Running)
{
qs = QueueState.Pausing;
QueueStateChangedInternal(qs);
/// running means you had some active threads so you couldn’t
/// get to paused right away
}
else if (qs == QueueState.Stopping || qs == QueueState.Aborting)
{
ThreadErrorInternal(default(T),
new ThreadStateException("Once the queue is stopping its done processing"));
}
/// if we are already paused or pausing we dont need to do anything
}
}
///
/// Stops the execution of future operations. clears out all pending operations.
/// No further operations are allowed to be enqueued. the current operations are
/// allowed to finish.
///
public void Stop()
{
lock (syncobj)
{
if ((qs == QueueState.Idle) || (qs == QueueState.Stopping) || (qs == QueueState.Aborting))
{
/// do nothing idle has nothing to stop and stopping
/// is already working on stopping
return;
}
else if (qs == QueueState.Paused)
{
qs = QueueState.Stopping;
QueueStateChangedInternal(qs);
/// if we are already paused then we have no threads running
/// so just drop all the extra items in the queue
while (queue.Count != 0)
ThreadErrorInternal(queue.Dequeue().Key,
new ThreadStateException("the Queue is stopping . no processing done"));
/// ensure proper event flow paused-> stopping -> idle
qs = QueueState.Idle;
QueueStateChangedInternal(qs);
}
else
{
qs = QueueState.Stopping;
QueueStateChangedInternal(qs);
/// why are we not dequeuing everything? that’s b/c if we have threads
/// left they have to finish in their own good time so they can go
/// through the process of getting rid of all the others. both ways work
if (currentthreads == 0)
{
qs = QueueState.Idle;
QueueStateChangedInternal(qs);
}
}
}
}
///
/// Abort running item and empty queue.
///
public void Abort()
{
lock (syncobj)
{
if ((qs == QueueState.Idle) || (qs == QueueState.Stopping) || (qs == QueueState.Aborting))
{
/// do nothing idle has nothing to stop and stopping
/// is already working on stopping
return;
}
else if (qs == QueueState.Paused)
{
qs = QueueState.Aborting;
QueueStateChangedInternal(qs);
// if we are already paused then we have no threads running
/// so just drop all the extra items in the queue
while (queue.Count != 0)
{
queue.Dequeue();
//ThreadErrorInternal(queue.Dequeue().Key, new ThreadStateException("the Queue is stopping . no processing done"));
}
// ensure proper event flow paused-> stopping -> idle
qs = QueueState.Idle;
QueueStateChangedInternal(qs);
}
else
{
qs = QueueState.Aborting;
QueueStateChangedInternal(qs);
/// why are we not dequeuing everything? that’s b/c if we have threads
/// left they have to finish in their own good time so they can go
/// through the process of getting rid of all the others. both ways work
//abort running threads
KeyValuePair>[] kvpArr =
new KeyValuePair>[threadDictionary.Keys.Count];
threadDictionary.Keys.CopyTo(kvpArr, 0);
int kvpCount = threadDictionary.Keys.Count;
for (int i = 0; i < kvpCount; i++)
{
if (threadDictionary.ContainsKey(kvpArr[i]))
{
threadDictionary[kvpArr[i]].Abort();
//ThreadErrorInternal(kvpArr[i].Key,new Exception("Aborted thread"));
}
}
//call to ensure the list is cleared
threadDictionary.Clear();
if (currentthreads == 0)
{
qs = QueueState.Idle;
QueueStateChangedInternal(qs);
}
}
}
}
///
/// Abort thread for a specific item / default operation.
///
///
public void Abort(T Item)
{
KeyValuePair> kvp =
new KeyValuePair>(Item, defaultop);
Abort(kvp);
}
///
/// Abort thread for a specific item/operation combination.
///
///
///
public void Abort(T Item, QueueOperationHandler operation)
{
KeyValuePair> kvp =
new KeyValuePair>(Item, operation);
Abort(kvp);
}
///
/// Abort thread for key-value pair.
///
///
private void Abort(KeyValuePair> kvp)
{
lock (syncobj)
{
if (qs == QueueState.Idle)
{
return;
}
else if (qs == QueueState.Paused)
{
/// if we are already paused then we have no threads running
/// remove item from the queue
RemoveQueueKvp(kvp);
if (queue.Count == 0)
{
qs = QueueState.Stopping;
QueueStateChangedInternal(qs);
/// ensure proper event flow paused-> stopping -> idle
qs = QueueState.Idle;
QueueStateChangedInternal(qs);
}
return;
}
else //running, pausing or stopping
//check first if item is in the queue
{
RemoveQueueKvp(kvp);
if (threadDictionary.ContainsKey(kvp))
{
threadDictionary[kvp].Abort();
threadDictionary.Remove(kvp);
ThreadErrorInternal(kvp.Key, new Exception("Aborted process"));
}
}
}
}
///
/// Remove specific item from the queue.
///
///
///
private void RemoveQueueKvp(KeyValuePair> kvp)
{
lock (syncobj)
{
if (!queue.Contains(kvp))
{
return;
}
//QueueState presentState = qs;
//qs = QueueState.Pausing;
Queue>> copyOfQueue =
new Queue>>();
//enqueue all items of the present queue in a copy
while (queue.Count != 0)
{
KeyValuePair> item = queue.Dequeue();
if (!item.Equals(kvp))
{
copyOfQueue.Enqueue(queue.Dequeue());
}
else
{
ThreadErrorInternal(kvp.Key, new Exception("Item removed from queue"));
}
}
while (copyOfQueue.Count != 0)
{
queue.Enqueue(copyOfQueue.Dequeue());
}
//qs = presentState;
return;
}
}
///
/// Continues the execution of enqueued operations after a pause.
///
public void Continue()
{
lock (syncobj)
{
if ((qs == QueueState.Pausing) || (qs == QueueState.Paused))
{
qs = QueueState.Running;
QueueStateChangedInternal(qs);
while (currentthreads < maxthreads)
TryNewThread();
}
else if ((qs == QueueState.Idle) || (qs == QueueState.Running))
{
/// Continuing to process while the queue is idle is meaning
/// less just ignore the command
return;
}
else if (qs == QueueState.Stopping)
{
ThreadErrorInternal(default(T),
new ThreadStateException("Once the queue is stopping its done processing"));
}
}
}
#endregion
#region data accessors
///
/// Gets the current QueueState.
///
public QueueState QueueState
{
get { return qs; }
}
///
/// Gets the maximum number of operations that can be executed at once.
///
public int MaxThreads
{
get { return maxthreads; }
}
///
/// Gets the current number of current ongoing operations.
///
public int CurrentRunningThreads
{
get
{
lock (syncobj)
{
return currentthreads;
}
}
}
///
/// Return all running items and all items queued for running.
///
public int Count
{
get { return queue.Count + threadDictionary.Count; }
}
#endregion
#region enque ops
///
/// Adds the item to the queue to process asynchronously.
///
/// the item to enqueue
public void EnQueue(T item)
{
EnQueue(item, defaultop);
}
///
/// Adds the item to the queue to process asynchronously and
/// uses the different operation instead of the default.
///
/// the item to enqueue
/// the new operation that overrides the default
/// opp is null
public void EnQueue(T item, QueueOperationHandler opp)
{
if (opp == null)
throw new ArgumentNullException("opp", "operation can not be null");
lock (syncobj)
{
if (qs == QueueState.Idle)
{
#region idle
qs = QueueState.Running;
QueueStateChangedInternal(qs);
/// the problem with generics is that sometimes the fully
/// descriptive name goes on for a while
KeyValuePair> kvp =
new KeyValuePair>(item, opp);
/// thread demands that its ParameterizedThreadStart take an object not a generic type
/// one might have resonably thought that there would be a generic constructor that
/// took a strongly typed value but there is not one
currentthreads++;
ParameterizedThreadStart threadStart = RunOpp;
Thread thread = new Thread(threadStart);
//store reference for this thread
threadDictionary.Add(kvp, thread);
//trigger event : started proces for this item
ThreadStartedInternal(kvp.Key);
thread.Start(kvp);
#endregion
}
else if ((qs == QueueState.Paused) || (qs == QueueState.Pausing))
{
#region pause
KeyValuePair> kvp =
new KeyValuePair>(item, opp);
if (queue.Contains(kvp))
{
ThreadErrorInternal(kvp.Key, new Exception("This item is in the queue already"));
return;
}
/// in the case that we are pausing or currently paused we just add the value to the
/// queue we dont try to run the process
queue.Enqueue(kvp);
#endregion
}
else if (qs == QueueState.Running)
{
#region running
KeyValuePair> kvp =
new KeyValuePair>(item, opp);
if ((threadDictionary.ContainsKey(kvp)) || queue.Contains(kvp))
{
ThreadErrorInternal(kvp.Key, new Exception("This item is in the queue already"));
return;
}
/// you have to enqueue the item then try to execute the first item in the process
/// always enqueue first as this ensures that you get the oldest item first since
/// that is what you wanted to do you did not want a stack
queue.Enqueue(kvp);
TryNewThread();
#endregion
}
else if (qs == QueueState.Stopping)
{
#region stopping
/// when you are stopping the queue i assume that you wanted to stop it not pause it this
/// means that if you try to enqueue something it will throw an exception since you
/// shouldnt be enqueueing anything since when the queue gets done all its current
/// threads it clears the rest out so why bother enqueueing it. at this point we have
/// a choice we can make the notifyer die or we can use the error event we already
/// have built in to tell the sender. i chose the later. also try to pick an appropriate
/// exception not just the base
ThreadErrorInternal(item, new ThreadStateException("the Queue is stopping . no processing done"));
#endregion
}
}
}
///
/// If the queue contains a specific item this method will return true.
///
///
///
public bool Contains(T obj)
{
foreach (KeyValuePair> kvp in threadDictionary.Keys)
{
if (kvp.Key.Equals(obj))
{
return true;
}
}
foreach (KeyValuePair> kvp in queue)
{
if (kvp.Key.Equals(obj))
{
return true;
}
}
return false;
}
#region tools
private void RunOpp(object o)
{
KeyValuePair> kvp = (KeyValuePair>) o;
try
{
kvp.Value(kvp.Key);
ThreadFinishedInternal(kvp);
}
catch (Exception ex)
{
ThreadErrorInternal(kvp.Key, new ThreadStateException("error processing. partial processing done.", ex));
}
finally
{
lock (syncobj)
{
currentthreads--;
}
TryNewThread();
}
}
private void TryNewThread()
{
lock (syncobj)
{
if (qs == QueueState.Running)
{
#region Running
if (queue.Count != 0)
{
if (currentthreads < maxthreads)
{
currentthreads++;
//trigger event: started process for item
ThreadStartedInternal(queue.Peek().Key);
ParameterizedThreadStart threadStart = RunOpp;
Thread thread = new Thread(threadStart);
//store a reference for this thread in the dictionary
threadDictionary.Add(queue.Peek(), thread);
thread.Start(queue.Dequeue());
}
}
else
{
if (currentthreads == 0)
{
qs = QueueState.Idle;
QueueStateChangedInternal(qs);
}
}
#endregion
}
else if (qs == QueueState.Stopping || qs == QueueState.Aborting)
{
#region stopping
/// normally when we stop a queue we can just clear out the remaining
/// values and let the threads peter out. however, we made the decision
/// to throw an exception by way of our exception handler. it is therefore
/// important to keep with that and get rid of all the queue items in
/// that same way
while (queue.Count != 0)
ThreadErrorInternal(queue.Dequeue().Key,
new ThreadStateException("the Queue is stopping . no processing done"));
/// all threads come through here so its up to us to single the change
/// from stopping to idle
if (currentthreads == 0)
{
qs = QueueState.Idle;
QueueStateChangedInternal(qs);
}
#endregion
}
else if (qs == QueueState.Pausing)
{
#region Pausing
if (currentthreads == 0)
{
qs = QueueState.Paused;
QueueStateChangedInternal(qs);
}
#endregion
}
else
{
#region Idle / Paused
/// there should be no way to got in here while your idle or paused
/// this is just an error check
ThreadErrorInternal(default(T), new Exception("internal state bad"));
#endregion
}
}
}
#endregion
#endregion
#region event forwarders
private void QueueStateChangedInternal(QueueState qs)
{
EventsHelper.UnsafeFire(QueueStateChanged, qs);
}
private void ThreadFinishedInternal(KeyValuePair> finisheditem)
{
{
threadDictionary.Remove(finisheditem);
}
EventsHelper.UnsafeFire(ThreadFinished, finisheditem.Key);
}
private void ThreadErrorInternal(T unfinisheditem, Exception ex)
{
lock (syncobj)
{
foreach (KeyValuePair> item in threadDictionary.Keys)
{
if (item.Key.Equals(unfinisheditem))
{
threadDictionary.Remove(item);
break;
}
}
}
EventsHelper.UnsafeFire(ThreadError, unfinisheditem, ex);
}
private void ThreadStartedInternal(T startedItem)
{
EventsHelper.UnsafeFire(ThreadStarted, startedItem);
}
#endregion
}
}