Index: src/Common/DelftTools.Utils/Threading/NotifyingThreadQueue.cs =================================================================== diff -u -r8f6ae890fed8e8eae3a32f9c0498a10f82e0ddf9 -r5fc71a385897af92ccb092f2f969b5709afab85a --- src/Common/DelftTools.Utils/Threading/NotifyingThreadQueue.cs (.../NotifyingThreadQueue.cs) (revision 8f6ae890fed8e8eae3a32f9c0498a10f82e0ddf9) +++ src/Common/DelftTools.Utils/Threading/NotifyingThreadQueue.cs (.../NotifyingThreadQueue.cs) (revision 5fc71a385897af92ccb092f2f969b5709afab85a) @@ -14,29 +14,32 @@ /// Nothing to do. /// Idle, + /// /// Running items in the queue. /// Running, + /// /// Processing of items in the queue is Paused, /// Item(s) currently running will finish their run. /// Pausing, + /// /// Processing of items in the queue is Paused. /// Paused, + /// /// Unprocessed items are removed from the queue /// Running itmes will finish their run. /// Stopping, - + Aborting - } ; + }; - /// /// Subscribe to this if you want to be notified in case items are added/removed /// from the queue. @@ -86,14 +89,12 @@ #region my vars - private object syncobj; - private int maxthreads; + private readonly object syncobj; private int currentthreads; - private QueueState qs; - private Queue>> queue; + private readonly Queue>> queue; private readonly QueueOperationHandler defaultop; //private AsyncOperation asyncOperation; - private Dictionary>, Thread> threadDictionary = + private readonly Dictionary>, Thread> threadDictionary = new Dictionary>, Thread>(); #endregion @@ -106,9 +107,7 @@ /// The default operation to perform on an enqueued item. /// defaultoperation is null public NotifyingThreadQueue(QueueOperationHandler defaultoperation) - : this(int.MaxValue, defaultoperation) - { - } + : this(int.MaxValue, defaultoperation) {} /// /// Constructs the NotifyingThreadQueue. sets the state to QueueState.Idle. @@ -120,14 +119,18 @@ public NotifyingThreadQueue(int maxthreads, QueueOperationHandler defaultoperation) { if (maxthreads <= 0) + { throw new ArgumentException("maxthreads can not be <= 0"); + } if (defaultoperation == null) + { throw new ArgumentNullException("defaultoperation", "defaultoperation can not be null"); + } - qs = QueueState.Idle; + QueueState = QueueState.Idle; syncobj = new object(); currentthreads = 0; - this.maxthreads = maxthreads; + MaxThreads = maxthreads; queue = new Queue>>(); defaultop = defaultoperation; } @@ -143,24 +146,24 @@ { lock (syncobj) { - if (qs == QueueState.Idle) + if (QueueState == QueueState.Idle) { /// this is a judgment call if you pause this when you /// don�t have any elements in it then you can go directly /// to paused and this means that you basically want to /// keep queuing until something happens - qs = QueueState.Paused; - QueueStateChangedInternal(qs); + QueueState = QueueState.Paused; + QueueStateChangedInternal(QueueState); } - else if (qs == QueueState.Running) + else if (QueueState == QueueState.Running) { - qs = QueueState.Pausing; - QueueStateChangedInternal(qs); + QueueState = QueueState.Pausing; + QueueStateChangedInternal(QueueState); /// running means you had some active threads so you couldn�t /// get to paused right away } - else if (qs == QueueState.Stopping || qs == QueueState.Aborting) + else if (QueueState == QueueState.Stopping || QueueState == QueueState.Aborting) { ThreadErrorInternal(default(T), new ThreadStateException("Once the queue is stopping its done processing")); @@ -178,39 +181,41 @@ { lock (syncobj) { - if ((qs == QueueState.Idle) || (qs == QueueState.Stopping) || (qs == QueueState.Aborting)) + if ((QueueState == QueueState.Idle) || (QueueState == QueueState.Stopping) || (QueueState == QueueState.Aborting)) { /// do nothing idle has nothing to stop and stopping /// is already working on stopping return; } - else if (qs == QueueState.Paused) + else if (QueueState == QueueState.Paused) { - qs = QueueState.Stopping; - QueueStateChangedInternal(qs); + QueueState = QueueState.Stopping; + QueueStateChangedInternal(QueueState); /// if we are already paused then we have no threads running /// so just drop all the extra items in the queue while (queue.Count != 0) + { ThreadErrorInternal(queue.Dequeue().Key, new ThreadStateException("the Queue is stopping . no processing done")); + } /// ensure proper event flow paused-> stopping -> idle - qs = QueueState.Idle; - QueueStateChangedInternal(qs); + QueueState = QueueState.Idle; + QueueStateChangedInternal(QueueState); } else { - qs = QueueState.Stopping; - QueueStateChangedInternal(qs); + QueueState = QueueState.Stopping; + QueueStateChangedInternal(QueueState); /// why are we not dequeuing everything? that�s b/c if we have threads /// left they have to finish in their own good time so they can go /// through the process of getting rid of all the others. both ways work if (currentthreads == 0) { - qs = QueueState.Idle; - QueueStateChangedInternal(qs); + QueueState = QueueState.Idle; + QueueStateChangedInternal(QueueState); } } } @@ -223,16 +228,16 @@ { lock (syncobj) { - if ((qs == QueueState.Idle) || (qs == QueueState.Stopping) || (qs == QueueState.Aborting)) + if ((QueueState == QueueState.Idle) || (QueueState == QueueState.Stopping) || (QueueState == QueueState.Aborting)) { /// do nothing idle has nothing to stop and stopping /// is already working on stopping return; } - else if (qs == QueueState.Paused) + else if (QueueState == QueueState.Paused) { - qs = QueueState.Aborting; - QueueStateChangedInternal(qs); + QueueState = QueueState.Aborting; + QueueStateChangedInternal(QueueState); // if we are already paused then we have no threads running /// so just drop all the extra items in the queue @@ -243,13 +248,13 @@ } // ensure proper event flow paused-> stopping -> idle - qs = QueueState.Idle; - QueueStateChangedInternal(qs); + QueueState = QueueState.Idle; + QueueStateChangedInternal(QueueState); } else { - qs = QueueState.Aborting; - QueueStateChangedInternal(qs); + QueueState = QueueState.Aborting; + QueueStateChangedInternal(QueueState); /// why are we not dequeuing everything? that�s b/c if we have threads /// left they have to finish in their own good time so they can go @@ -274,8 +279,8 @@ if (currentthreads == 0) { - qs = QueueState.Idle; - QueueStateChangedInternal(qs); + QueueState = QueueState.Idle; + QueueStateChangedInternal(QueueState); } } } @@ -312,28 +317,28 @@ { lock (syncobj) { - if (qs == QueueState.Idle) + if (QueueState == QueueState.Idle) { return; } - else if (qs == QueueState.Paused) + else if (QueueState == QueueState.Paused) { /// if we are already paused then we have no threads running /// remove item from the queue RemoveQueueKvp(kvp); if (queue.Count == 0) { - qs = QueueState.Stopping; - QueueStateChangedInternal(qs); + QueueState = QueueState.Stopping; + QueueStateChangedInternal(QueueState); /// ensure proper event flow paused-> stopping -> idle - qs = QueueState.Idle; - QueueStateChangedInternal(qs); + QueueState = QueueState.Idle; + QueueStateChangedInternal(QueueState); } return; } else //running, pausing or stopping - //check first if item is in the queue + //check first if item is in the queue { RemoveQueueKvp(kvp); @@ -395,21 +400,23 @@ { lock (syncobj) { - if ((qs == QueueState.Pausing) || (qs == QueueState.Paused)) + if ((QueueState == QueueState.Pausing) || (QueueState == QueueState.Paused)) { - qs = QueueState.Running; - QueueStateChangedInternal(qs); + QueueState = QueueState.Running; + QueueStateChangedInternal(QueueState); - while (currentthreads < maxthreads) + while (currentthreads < MaxThreads) + { TryNewThread(); + } } - else if ((qs == QueueState.Idle) || (qs == QueueState.Running)) + else if ((QueueState == QueueState.Idle) || (QueueState == QueueState.Running)) { /// Continuing to process while the queue is idle is meaning /// less just ignore the command return; } - else if (qs == QueueState.Stopping) + else if (QueueState == QueueState.Stopping) { ThreadErrorInternal(default(T), new ThreadStateException("Once the queue is stopping its done processing")); @@ -424,18 +431,12 @@ /// /// Gets the current QueueState. /// - public QueueState QueueState - { - get { return qs; } - } + public QueueState QueueState { get; private set; } /// /// Gets the maximum number of operations that can be executed at once. /// - public int MaxThreads - { - get { return maxthreads; } - } + public int MaxThreads { get; private set; } /// /// Gets the current number of current ongoing operations. @@ -456,7 +457,10 @@ /// public int Count { - get { return queue.Count + threadDictionary.Count; } + get + { + return queue.Count + threadDictionary.Count; + } } #endregion @@ -482,16 +486,18 @@ public void EnQueue(T item, QueueOperationHandler opp) { if (opp == null) + { throw new ArgumentNullException("opp", "operation can not be null"); + } lock (syncobj) { - if (qs == QueueState.Idle) + if (QueueState == QueueState.Idle) { #region idle - qs = QueueState.Running; - QueueStateChangedInternal(qs); + QueueState = QueueState.Running; + QueueStateChangedInternal(QueueState); /// the problem with generics is that sometimes the fully /// descriptive name goes on for a while @@ -516,7 +522,7 @@ #endregion } - else if ((qs == QueueState.Paused) || (qs == QueueState.Pausing)) + else if ((QueueState == QueueState.Paused) || (QueueState == QueueState.Pausing)) { #region pause @@ -533,7 +539,7 @@ #endregion } - else if (qs == QueueState.Running) + else if (QueueState == QueueState.Running) { #region running @@ -553,7 +559,7 @@ #endregion } - else if (qs == QueueState.Stopping) + else if (QueueState == QueueState.Stopping) { #region stopping @@ -625,13 +631,13 @@ { lock (syncobj) { - if (qs == QueueState.Running) + if (QueueState == QueueState.Running) { #region Running if (queue.Count != 0) { - if (currentthreads < maxthreads) + if (currentthreads < MaxThreads) { currentthreads++; @@ -648,14 +654,14 @@ { if (currentthreads == 0) { - qs = QueueState.Idle; - QueueStateChangedInternal(qs); + QueueState = QueueState.Idle; + QueueStateChangedInternal(QueueState); } } #endregion } - else if (qs == QueueState.Stopping || qs == QueueState.Aborting) + else if (QueueState == QueueState.Stopping || QueueState == QueueState.Aborting) { #region stopping @@ -665,27 +671,29 @@ /// important to keep with that and get rid of all the queue items in /// that same way while (queue.Count != 0) + { ThreadErrorInternal(queue.Dequeue().Key, new ThreadStateException("the Queue is stopping . no processing done")); + } /// all threads come through here so its up to us to single the change /// from stopping to idle if (currentthreads == 0) { - qs = QueueState.Idle; - QueueStateChangedInternal(qs); + QueueState = QueueState.Idle; + QueueStateChangedInternal(QueueState); } #endregion } - else if (qs == QueueState.Pausing) + else if (QueueState == QueueState.Pausing) { #region Pausing if (currentthreads == 0) { - qs = QueueState.Paused; - QueueStateChangedInternal(qs); + QueueState = QueueState.Paused; + QueueStateChangedInternal(QueueState); } #endregion @@ -714,7 +722,6 @@ EventsHelper.UnsafeFire(QueueStateChanged, qs); } - private void ThreadFinishedInternal(KeyValuePair> finisheditem) { {