Index: src/Common/DelftTools.Utils/Threading/NotifyingThreadQueue.cs
===================================================================
diff -u -r8f6ae890fed8e8eae3a32f9c0498a10f82e0ddf9 -r5fc71a385897af92ccb092f2f969b5709afab85a
--- src/Common/DelftTools.Utils/Threading/NotifyingThreadQueue.cs (.../NotifyingThreadQueue.cs) (revision 8f6ae890fed8e8eae3a32f9c0498a10f82e0ddf9)
+++ src/Common/DelftTools.Utils/Threading/NotifyingThreadQueue.cs (.../NotifyingThreadQueue.cs) (revision 5fc71a385897af92ccb092f2f969b5709afab85a)
@@ -14,29 +14,32 @@
/// Nothing to do.
///
Idle,
+
///
/// Running items in the queue.
///
Running,
+
///
/// Processing of items in the queue is Paused,
/// Item(s) currently running will finish their run.
///
Pausing,
+
///
/// Processing of items in the queue is Paused.
///
Paused,
+
///
/// Unprocessed items are removed from the queue
/// Running itmes will finish their run.
///
Stopping,
-
+
Aborting
- } ;
+ };
-
///
/// Subscribe to this if you want to be notified in case items are added/removed
/// from the queue.
@@ -86,14 +89,12 @@
#region my vars
- private object syncobj;
- private int maxthreads;
+ private readonly object syncobj;
private int currentthreads;
- private QueueState qs;
- private Queue>> queue;
+ private readonly Queue>> queue;
private readonly QueueOperationHandler defaultop;
//private AsyncOperation asyncOperation;
- private Dictionary>, Thread> threadDictionary =
+ private readonly Dictionary>, Thread> threadDictionary =
new Dictionary>, Thread>();
#endregion
@@ -106,9 +107,7 @@
/// The default operation to perform on an enqueued item.
/// defaultoperation is null
public NotifyingThreadQueue(QueueOperationHandler defaultoperation)
- : this(int.MaxValue, defaultoperation)
- {
- }
+ : this(int.MaxValue, defaultoperation) {}
///
/// Constructs the NotifyingThreadQueue. sets the state to QueueState.Idle.
@@ -120,14 +119,18 @@
public NotifyingThreadQueue(int maxthreads, QueueOperationHandler defaultoperation)
{
if (maxthreads <= 0)
+ {
throw new ArgumentException("maxthreads can not be <= 0");
+ }
if (defaultoperation == null)
+ {
throw new ArgumentNullException("defaultoperation", "defaultoperation can not be null");
+ }
- qs = QueueState.Idle;
+ QueueState = QueueState.Idle;
syncobj = new object();
currentthreads = 0;
- this.maxthreads = maxthreads;
+ MaxThreads = maxthreads;
queue = new Queue>>();
defaultop = defaultoperation;
}
@@ -143,24 +146,24 @@
{
lock (syncobj)
{
- if (qs == QueueState.Idle)
+ if (QueueState == QueueState.Idle)
{
/// this is a judgment call if you pause this when you
/// don�t have any elements in it then you can go directly
/// to paused and this means that you basically want to
/// keep queuing until something happens
- qs = QueueState.Paused;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Paused;
+ QueueStateChangedInternal(QueueState);
}
- else if (qs == QueueState.Running)
+ else if (QueueState == QueueState.Running)
{
- qs = QueueState.Pausing;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Pausing;
+ QueueStateChangedInternal(QueueState);
/// running means you had some active threads so you couldn�t
/// get to paused right away
}
- else if (qs == QueueState.Stopping || qs == QueueState.Aborting)
+ else if (QueueState == QueueState.Stopping || QueueState == QueueState.Aborting)
{
ThreadErrorInternal(default(T),
new ThreadStateException("Once the queue is stopping its done processing"));
@@ -178,39 +181,41 @@
{
lock (syncobj)
{
- if ((qs == QueueState.Idle) || (qs == QueueState.Stopping) || (qs == QueueState.Aborting))
+ if ((QueueState == QueueState.Idle) || (QueueState == QueueState.Stopping) || (QueueState == QueueState.Aborting))
{
/// do nothing idle has nothing to stop and stopping
/// is already working on stopping
return;
}
- else if (qs == QueueState.Paused)
+ else if (QueueState == QueueState.Paused)
{
- qs = QueueState.Stopping;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Stopping;
+ QueueStateChangedInternal(QueueState);
/// if we are already paused then we have no threads running
/// so just drop all the extra items in the queue
while (queue.Count != 0)
+ {
ThreadErrorInternal(queue.Dequeue().Key,
new ThreadStateException("the Queue is stopping . no processing done"));
+ }
/// ensure proper event flow paused-> stopping -> idle
- qs = QueueState.Idle;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Idle;
+ QueueStateChangedInternal(QueueState);
}
else
{
- qs = QueueState.Stopping;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Stopping;
+ QueueStateChangedInternal(QueueState);
/// why are we not dequeuing everything? that�s b/c if we have threads
/// left they have to finish in their own good time so they can go
/// through the process of getting rid of all the others. both ways work
if (currentthreads == 0)
{
- qs = QueueState.Idle;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Idle;
+ QueueStateChangedInternal(QueueState);
}
}
}
@@ -223,16 +228,16 @@
{
lock (syncobj)
{
- if ((qs == QueueState.Idle) || (qs == QueueState.Stopping) || (qs == QueueState.Aborting))
+ if ((QueueState == QueueState.Idle) || (QueueState == QueueState.Stopping) || (QueueState == QueueState.Aborting))
{
/// do nothing idle has nothing to stop and stopping
/// is already working on stopping
return;
}
- else if (qs == QueueState.Paused)
+ else if (QueueState == QueueState.Paused)
{
- qs = QueueState.Aborting;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Aborting;
+ QueueStateChangedInternal(QueueState);
// if we are already paused then we have no threads running
/// so just drop all the extra items in the queue
@@ -243,13 +248,13 @@
}
// ensure proper event flow paused-> stopping -> idle
- qs = QueueState.Idle;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Idle;
+ QueueStateChangedInternal(QueueState);
}
else
{
- qs = QueueState.Aborting;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Aborting;
+ QueueStateChangedInternal(QueueState);
/// why are we not dequeuing everything? that�s b/c if we have threads
/// left they have to finish in their own good time so they can go
@@ -274,8 +279,8 @@
if (currentthreads == 0)
{
- qs = QueueState.Idle;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Idle;
+ QueueStateChangedInternal(QueueState);
}
}
}
@@ -312,28 +317,28 @@
{
lock (syncobj)
{
- if (qs == QueueState.Idle)
+ if (QueueState == QueueState.Idle)
{
return;
}
- else if (qs == QueueState.Paused)
+ else if (QueueState == QueueState.Paused)
{
/// if we are already paused then we have no threads running
/// remove item from the queue
RemoveQueueKvp(kvp);
if (queue.Count == 0)
{
- qs = QueueState.Stopping;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Stopping;
+ QueueStateChangedInternal(QueueState);
/// ensure proper event flow paused-> stopping -> idle
- qs = QueueState.Idle;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Idle;
+ QueueStateChangedInternal(QueueState);
}
return;
}
else //running, pausing or stopping
- //check first if item is in the queue
+ //check first if item is in the queue
{
RemoveQueueKvp(kvp);
@@ -395,21 +400,23 @@
{
lock (syncobj)
{
- if ((qs == QueueState.Pausing) || (qs == QueueState.Paused))
+ if ((QueueState == QueueState.Pausing) || (QueueState == QueueState.Paused))
{
- qs = QueueState.Running;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Running;
+ QueueStateChangedInternal(QueueState);
- while (currentthreads < maxthreads)
+ while (currentthreads < MaxThreads)
+ {
TryNewThread();
+ }
}
- else if ((qs == QueueState.Idle) || (qs == QueueState.Running))
+ else if ((QueueState == QueueState.Idle) || (QueueState == QueueState.Running))
{
/// Continuing to process while the queue is idle is meaning
/// less just ignore the command
return;
}
- else if (qs == QueueState.Stopping)
+ else if (QueueState == QueueState.Stopping)
{
ThreadErrorInternal(default(T),
new ThreadStateException("Once the queue is stopping its done processing"));
@@ -424,18 +431,12 @@
///
/// Gets the current QueueState.
///
- public QueueState QueueState
- {
- get { return qs; }
- }
+ public QueueState QueueState { get; private set; }
///
/// Gets the maximum number of operations that can be executed at once.
///
- public int MaxThreads
- {
- get { return maxthreads; }
- }
+ public int MaxThreads { get; private set; }
///
/// Gets the current number of current ongoing operations.
@@ -456,7 +457,10 @@
///
public int Count
{
- get { return queue.Count + threadDictionary.Count; }
+ get
+ {
+ return queue.Count + threadDictionary.Count;
+ }
}
#endregion
@@ -482,16 +486,18 @@
public void EnQueue(T item, QueueOperationHandler opp)
{
if (opp == null)
+ {
throw new ArgumentNullException("opp", "operation can not be null");
+ }
lock (syncobj)
{
- if (qs == QueueState.Idle)
+ if (QueueState == QueueState.Idle)
{
#region idle
- qs = QueueState.Running;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Running;
+ QueueStateChangedInternal(QueueState);
/// the problem with generics is that sometimes the fully
/// descriptive name goes on for a while
@@ -516,7 +522,7 @@
#endregion
}
- else if ((qs == QueueState.Paused) || (qs == QueueState.Pausing))
+ else if ((QueueState == QueueState.Paused) || (QueueState == QueueState.Pausing))
{
#region pause
@@ -533,7 +539,7 @@
#endregion
}
- else if (qs == QueueState.Running)
+ else if (QueueState == QueueState.Running)
{
#region running
@@ -553,7 +559,7 @@
#endregion
}
- else if (qs == QueueState.Stopping)
+ else if (QueueState == QueueState.Stopping)
{
#region stopping
@@ -625,13 +631,13 @@
{
lock (syncobj)
{
- if (qs == QueueState.Running)
+ if (QueueState == QueueState.Running)
{
#region Running
if (queue.Count != 0)
{
- if (currentthreads < maxthreads)
+ if (currentthreads < MaxThreads)
{
currentthreads++;
@@ -648,14 +654,14 @@
{
if (currentthreads == 0)
{
- qs = QueueState.Idle;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Idle;
+ QueueStateChangedInternal(QueueState);
}
}
#endregion
}
- else if (qs == QueueState.Stopping || qs == QueueState.Aborting)
+ else if (QueueState == QueueState.Stopping || QueueState == QueueState.Aborting)
{
#region stopping
@@ -665,27 +671,29 @@
/// important to keep with that and get rid of all the queue items in
/// that same way
while (queue.Count != 0)
+ {
ThreadErrorInternal(queue.Dequeue().Key,
new ThreadStateException("the Queue is stopping . no processing done"));
+ }
/// all threads come through here so its up to us to single the change
/// from stopping to idle
if (currentthreads == 0)
{
- qs = QueueState.Idle;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Idle;
+ QueueStateChangedInternal(QueueState);
}
#endregion
}
- else if (qs == QueueState.Pausing)
+ else if (QueueState == QueueState.Pausing)
{
#region Pausing
if (currentthreads == 0)
{
- qs = QueueState.Paused;
- QueueStateChangedInternal(qs);
+ QueueState = QueueState.Paused;
+ QueueStateChangedInternal(QueueState);
}
#endregion
@@ -714,7 +722,6 @@
EventsHelper.UnsafeFire(QueueStateChanged, qs);
}
-
private void ThreadFinishedInternal(KeyValuePair> finisheditem)
{
{