// Copyright (C) 2024, The Duplicati Team
// https://duplicati.com, hello@duplicati.com
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

// TODO: Delete this class.
// It is essentially a queue that is processed by a worker thread, and can be implemented using a BlockingCollection or similar.
namespace Duplicati.Library.Utility
{
    /// <summary>
    /// Class to encapsulate a thread that runs a list of queued operations.
    /// Tasks are handed, one at a time and in queue order, to the callback
    /// supplied to the constructor, on a dedicated background thread.
    /// </summary>
    /// <typeparam name="Tx">The type to operate on</typeparam>
    public class WorkerThread<Tx> where Tx : class
    {
        /// <summary>
        /// How long a paused worker sleeps between re-checks of the state, in milliseconds
        /// </summary>
        private const int PausedWaitMilliseconds = 1000 * 60 * 5;

        /// <summary>
        /// Locking object for shared data
        /// </summary>
        private readonly object m_lock = new object();

        /// <summary>
        /// The wait event
        /// </summary>
        private readonly AutoResetEvent m_event;

        /// <summary>
        /// The internal list of tasks to perform
        /// </summary>
        private Queue<Tx> m_tasks;

        /// <summary>
        /// A flag used to terminate the thread
        /// </summary>
        private volatile bool m_terminate;

        /// <summary>
        /// The coordinating thread
        /// </summary>
        private Thread m_thread;

        /// <summary>
        /// The thread most recently replaced by <see cref="ClearQueue"/>; a runner
        /// that finds itself here stops picking up work and exits
        /// </summary>
        private volatile Thread m_retiredThread;

        /// <summary>
        /// A value indicating if the coordinating thread is running
        /// </summary>
        private volatile bool m_active;

        /// <summary>
        /// The current task being processed
        /// </summary>
        private Tx m_currentTask;

        /// <summary>
        /// A callback that performs the actual work on the item
        /// </summary>
        private readonly Action<Tx> m_delegate;

        /// <summary>
        /// An event that is raised when the runner state changes
        /// </summary>
        public event Action<WorkerThread<Tx>, RunState> WorkerStateChanged;

        /// <summary>
        /// Event that occurs when a new operation is being processed
        /// </summary>
        public event Action<WorkerThread<Tx>, Tx> StartingWork;

        /// <summary>
        /// Event that occurs when an operation has completed
        /// </summary>
        public event Action<WorkerThread<Tx>, Tx> CompletedWork;

        /// <summary>
        /// Event that occurs when an error is detected
        /// </summary>
        public event Action<WorkerThread<Tx>, Tx, Exception> OnError;

        /// <summary>
        /// An event that occurs when a new task is added to the queue or an existing one is removed
        /// </summary>
        public event Action<WorkerThread<Tx>> WorkQueueChanged;

        /// <summary>
        /// The internal state
        /// </summary>
        private volatile RunState m_state;

        /// <summary>
        /// The states the scheduler can take
        /// </summary>
        public enum RunState
        {
            /// <summary>
            /// The program is running as normal
            /// </summary>
            Run,

            /// <summary>
            /// The program is suspended by the user
            /// </summary>
            Paused
        }

        /// <summary>
        /// Constructs a new WorkerThread and starts its background thread
        /// </summary>
        /// <param name="item">The callback that performs the work</param>
        /// <param name="paused">True to start in the <see cref="RunState.Paused"/> state, false to start in <see cref="RunState.Run"/></param>
        public WorkerThread(Action<Tx> item, bool paused)
        {
            m_delegate = item;
            m_event = new AutoResetEvent(paused);
            m_terminate = false;
            m_tasks = new Queue<Tx>();
            m_state = paused ? RunState.Paused : RunState.Run;

            StartWorkerThread();
        }

        /// <summary>
        /// Gets a copy of the current queue
        /// </summary>
        public List<Tx> CurrentTasks
        {
            get
            {
                lock (m_lock)
                    return new List<Tx>(m_tasks);
            }
        }

        /// <summary>
        /// Gets a value indicating if the worker is running
        /// </summary>
        public bool Active
        {
            get { return m_active; }
        }

        /// <summary>
        /// Adds a task to the queue
        /// </summary>
        /// <param name="task">The task to add</param>
        public void AddTask(Tx task)
        {
            lock (m_lock)
            {
                m_tasks.Enqueue(task);
                m_event.Set();
            }

            RaiseWorkQueueChanged();
        }

        /// <summary>
        /// An overloaded AddTask method that allows a task to skip to the front of a queue.
        /// It does this by creating a new queue, adding the new task first, and then adding
        /// all the old tasks to the new queue. It's cleaner to use a linked list,
        /// but the performance difference is negligible on such a small queue.
        /// </summary>
        /// <param name="task">The task to add</param>
        /// <param name="skipQueue">If set to <c>true</c> the task is placed at the front of the queue</param>
        public void AddTask(Tx task, bool skipQueue)
        {
            if (!skipQueue)
            {
                // Fall back to default AddTask method
                AddTask(task);
                return;
            }

            lock (m_lock)
            {
                Queue<Tx> newQueue = new Queue<Tx>();
                newQueue.Enqueue(task);
                while (m_tasks.Count > 0)
                    newQueue.Enqueue(m_tasks.Dequeue());

                m_tasks = newQueue;
                m_event.Set();
            }

            RaiseWorkQueueChanged();
        }

        /// <summary>
        /// Removes a task from the queue, does not remove the task if it is currently running.
        /// Tasks are matched by reference.
        /// </summary>
        /// <param name="task">The task to remove</param>
        public void RemoveTask(Tx task)
        {
            lock (m_lock)
            {
                Queue<Tx> remaining = new Queue<Tx>();
                while (m_tasks.Count > 0)
                {
                    Tx candidate = m_tasks.Dequeue();
                    if (candidate != task)
                        remaining.Enqueue(candidate);
                }

                m_tasks = remaining;
            }

            RaiseWorkQueueChanged();
        }

        /// <summary>
        /// This will clear the pending queue
        /// </summary>
        /// <param name="abortThread">
        /// True if the current running thread should be aborted. The current worker is
        /// interrupted and given up to 500 ms to exit, after which a replacement worker
        /// is started whether or not the old one has stopped.
        /// </param>
        public void ClearQueue(bool abortThread)
        {
            lock (m_lock)
                m_tasks.Clear();

            if (!abortThread)
                return;

            // Mark the worker as retired *before* interrupting it, so that a runner
            // which survives the interrupt (e.g. because the task swallowed it, or
            // never blocked) still stops after its current task instead of competing
            // with the replacement forever.
            Thread retired = m_thread;
            m_retiredThread = retired;

            try
            {
                retired.Interrupt();
                retired.Join(500);
            }
            catch
            {
                // Best effort only: the replacement below is started regardless
            }

            StartWorkerThread();
        }

        /// <summary>
        /// Gets a reference to the currently executing task.
        /// BEWARE: This is not protected by a mutex, DO NOT MODIFY IT!!!!
        /// </summary>
        public Tx CurrentTask
        {
            get { return m_currentTask; }
        }

        /// <summary>
        /// Terminates the thread. Items still in the queue are left unprocessed;
        /// a task that is already executing is allowed to finish.
        /// </summary>
        /// <param name="wait">True if the call should block until the thread has exited, false otherwise</param>
        public void Terminate(bool wait)
        {
            m_terminate = true;
            m_event.Set();

            if (wait)
                m_thread.Join();
        }

        /// <summary>
        /// Gets the current run state
        /// </summary>
        public RunState State
        {
            get { return m_state; }
        }

        /// <summary>
        /// Switches the worker to the <see cref="RunState.Run"/> state and wakes the worker thread
        /// </summary>
        public void Resume()
        {
            m_state = RunState.Run;
            m_event.Set();
        }

        /// <summary>
        /// Switches the worker to the <see cref="RunState.Paused"/> state and wakes the worker thread
        /// so it can observe the change; a task that is already executing is not stopped
        /// </summary>
        public void Pause()
        {
            m_state = RunState.Paused;
            m_event.Set();
        }

        /// <summary>
        /// Waits the specified number of milliseconds for the thread to terminate
        /// </summary>
        /// <param name="millisecondTimeout">The number of milliseconds to wait</param>
        /// <returns>True if the thread is terminated, false if a timeout occured</returns>
        public bool Join(int millisecondTimeout)
        {
            if (m_thread != null)
                return m_thread.Join(millisecondTimeout);

            return true;
        }

        /// <summary>
        /// Gets a value indicating if the calling thread has been replaced by <see cref="ClearQueue"/>
        /// </summary>
        private bool IsRetired
        {
            get { return ReferenceEquals(m_retiredThread, Thread.CurrentThread); }
        }

        /// <summary>
        /// Creates, configures and starts the worker thread. Shared by the constructor
        /// and <see cref="ClearQueue"/> so a replacement worker is a named background
        /// thread exactly like the initial one.
        /// </summary>
        private void StartWorkerThread()
        {
            Thread worker = new Thread(new ThreadStart(Runner));
            worker.IsBackground = true;
            worker.Name = "WorkerThread<" + typeof(Tx).Name + ">";

            // Publish the thread before starting it, so Terminate/Join target the new worker
            m_thread = worker;
            worker.Start();
        }

        /// <summary>
        /// This is the thread entry point
        /// </summary>
        private void Runner()
        {
            try
            {
                ProcessQueue();
            }
            catch (ThreadInterruptedException)
            {
                // ClearQueue(true) interrupts the worker; if that happens while the
                // worker is blocked outside of a task, exit quietly so the interrupt
                // does not escape the thread as an unhandled exception.
                // Interrupts from any other source keep their original behavior.
                if (!IsRetired)
                    throw;
            }
        }

        /// <summary>
        /// The worker loop: takes tasks off the queue one at a time and runs the callback
        /// on them, sleeping while the queue is empty or the worker is paused. Returns
        /// when the worker is terminated or has been replaced by <see cref="ClearQueue"/>.
        /// </summary>
        private void ProcessQueue()
        {
            while (!m_terminate && !IsRetired)
            {
                // The task is tracked in a local so the callbacks below always receive
                // the task this runner dequeued, even if the shared field is touched
                // by a replacement runner.
                Tx task = null;
                m_currentTask = null;

                lock (m_lock)
                    if (m_state == RunState.Run && m_tasks.Count > 0)
                        m_currentTask = task = m_tasks.Dequeue();

                if (task == null && !m_terminate)
                {
                    if (m_state == RunState.Run)
                    {
                        //Sleep until signaled
                        m_event.WaitOne();
                    }
                    else
                    {
                        RaiseWorkerStateChanged();

                        //Sleep for brief periods, until signaled
                        while (!m_terminate && m_state != RunState.Run)
                            m_event.WaitOne(PausedWaitMilliseconds, false);

                        //If we were not terminated, we are now ready to run
                        if (!m_terminate)
                        {
                            m_state = RunState.Run;
                            RaiseWorkerStateChanged();
                        }
                    }
                }

                if (m_terminate)
                    return;

                if (task == null && m_state == RunState.Run)
                    lock (m_lock)
                        if (m_tasks.Count > 0)
                            m_currentTask = task = m_tasks.Dequeue();

                if (task == null)
                    continue;

                Action<WorkerThread<Tx>, Tx> starting = StartingWork;
                if (starting != null)
                    starting(this, task);

                try
                {
                    m_active = true;
                    m_delegate(task);
                }
                catch (Exception ex)
                {
                    // Clears a pending abort where the runtime supports it; failures are ignored
                    try { System.Threading.Thread.ResetAbort(); }
                    catch { }

                    RaiseOnError(task, ex);
                }
                finally
                {
                    try { System.Threading.Thread.ResetAbort(); }
                    catch { }

                    m_active = false;
                }

                m_currentTask = null;

                Action<WorkerThread<Tx>, Tx> completed = CompletedWork;
                if (completed != null)
                {
                    try { completed(this, task); }
                    catch (Exception ex) { RaiseOnError(task, ex); }
                }
            }
        }

        /// <summary>
        /// Raises <see cref="WorkQueueChanged"/> if anyone is subscribed
        /// </summary>
        private void RaiseWorkQueueChanged()
        {
            // Copy to a local so an unsubscribe on another thread cannot null the delegate mid-call
            Action<WorkerThread<Tx>> handler = WorkQueueChanged;
            if (handler != null)
                handler(this);
        }

        /// <summary>
        /// Raises <see cref="WorkerStateChanged"/> with the current state if anyone is subscribed
        /// </summary>
        private void RaiseWorkerStateChanged()
        {
            Action<WorkerThread<Tx>, RunState> handler = WorkerStateChanged;
            if (handler != null)
                handler(this, m_state);
        }

        /// <summary>
        /// Raises <see cref="OnError"/> if anyone is subscribed; exceptions thrown by the
        /// handlers are ignored so error reporting can never stop the worker
        /// </summary>
        /// <param name="task">The task that was being processed</param>
        /// <param name="ex">The error to report</param>
        private void RaiseOnError(Tx task, Exception ex)
        {
            Action<WorkerThread<Tx>, Tx, Exception> handler = OnError;
            if (handler == null)
                return;

            try { handler(this, task, ex); }
            catch { }
        }
    }
}