// Copyright (C) 2024, The Duplicati Team
// https://duplicati.com, hello@duplicati.com
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
// TODO: Delete this class.
// It is essentially a queue that is processed by a worker thread, and can be implemented using a BlockingCollection or similar.
namespace Duplicati.Library.Utility
{
/// <summary>
/// Class to encapsulate a thread that runs a list of queued operations
/// </summary>
/// <typeparam name="Tx">The type to operate on</typeparam>
public class WorkerThread<Tx> where Tx : class
{
    /// <summary>
    /// The number of milliseconds the runner sleeps between state checks while paused
    /// </summary>
    private const int PausedPollIntervalMs = 1000 * 60 * 5;

    /// <summary>
    /// Locking object for shared data
    /// </summary>
    private readonly object m_lock = new object();
    /// <summary>
    /// The wait event
    /// </summary>
    private readonly AutoResetEvent m_event;
    /// <summary>
    /// The internal list of tasks to perform
    /// </summary>
    private Queue<Tx> m_tasks;
    /// <summary>
    /// A flag used to terminate the thread
    /// </summary>
    private volatile bool m_terminate;
    /// <summary>
    /// The coordinating thread
    /// </summary>
    private Thread m_thread;
    /// <summary>
    /// The token owned by the runner that is currently allowed to process tasks.
    /// A runner whose token no longer matches this field has been replaced and must exit.
    /// </summary>
    private volatile object m_runnerToken;
    /// <summary>
    /// A value indicating if the coordinating thread is running
    /// </summary>
    private volatile bool m_active;
    /// <summary>
    /// The current task being processed
    /// </summary>
    private Tx m_currentTask;
    /// <summary>
    /// A callback that performs the actual work on the item
    /// </summary>
    private readonly Action<Tx> m_delegate;
    /// <summary>
    /// An event that is raised when the runner state changes
    /// </summary>
    public event Action<WorkerThread<Tx>, RunState> WorkerStateChanged;
    /// <summary>
    /// Event that occurs when a new operation is being processed
    /// </summary>
    public event Action<WorkerThread<Tx>, Tx> StartingWork;
    /// <summary>
    /// Event that occurs when an operation has completed
    /// </summary>
    public event Action<WorkerThread<Tx>, Tx> CompletedWork;
    /// <summary>
    /// Event that occurs when an error is detected
    /// </summary>
    public event Action<WorkerThread<Tx>, Tx, Exception> OnError;
    /// <summary>
    /// An event that occurs when a new task is added to the queue or an existing one is removed
    /// </summary>
    public event Action<WorkerThread<Tx>> WorkQueueChanged;
    /// <summary>
    /// The internal state
    /// </summary>
    private volatile RunState m_state;

    /// <summary>
    /// The states the scheduler can take
    /// </summary>
    public enum RunState
    {
        /// <summary>
        /// The program is running as normal
        /// </summary>
        Run,
        /// <summary>
        /// The program is suspended by the user
        /// </summary>
        Paused
    }

    /// <summary>
    /// Constructs a new WorkerThread and starts its background runner thread
    /// </summary>
    /// <param name="item">The callback that performs the work</param>
    /// <param name="paused">True to start in the paused state, false to start processing right away</param>
    public WorkerThread(Action<Tx> item, bool paused)
    {
        m_delegate = item;
        m_event = new AutoResetEvent(paused);
        m_terminate = false;
        m_tasks = new Queue<Tx>();
        m_state = paused ? RunState.Paused : RunState.Run;
        StartRunnerThread();
    }

    /// <summary>
    /// Gets a copy of the current queue
    /// </summary>
    public List<Tx> CurrentTasks
    {
        get
        {
            lock (m_lock)
                return new List<Tx>(m_tasks);
        }
    }

    /// <summary>
    /// Gets a value indicating if the worker is running
    /// </summary>
    public bool Active
    {
        get { return m_active; }
    }

    /// <summary>
    /// Adds a task to the queue
    /// </summary>
    /// <param name="task">The task to add</param>
    public void AddTask(Tx task)
    {
        lock (m_lock)
        {
            m_tasks.Enqueue(task);
            m_event.Set();
        }

        RaiseWorkQueueChanged();
    }

    /// <summary>
    /// An overloaded AddTask method that allows a task to skip to the front of a queue.
    /// It does this by creating a new queue, adding the new task first, and then adding
    /// all the old tasks to the new queue. It's cleaner to use a linked list,
    /// but the performance difference is negligible on such a small queue.
    /// </summary>
    /// <param name="task">The task to add</param>
    /// <param name="skipQueue">If set to true, the task is placed at the front of the queue</param>
    public void AddTask(Tx task, bool skipQueue)
    {
        if (!skipQueue)
        {
            // Fall back to default AddTask method
            AddTask(task);
            return;
        }

        lock (m_lock)
        {
            Queue<Tx> reordered = new Queue<Tx>(m_tasks.Count + 1);
            reordered.Enqueue(task);
            foreach (Tx queued in m_tasks)
                reordered.Enqueue(queued);

            m_tasks = reordered;
            m_event.Set();
        }

        RaiseWorkQueueChanged();
    }

    /// <summary>
    /// Removes a task from the queue, does not remove the task if it is currently running
    /// </summary>
    /// <param name="task">The task to remove; matched by reference</param>
    public void RemoveTask(Tx task)
    {
        lock (m_lock)
        {
            Queue<Tx> remaining = new Queue<Tx>(m_tasks.Count);
            foreach (Tx queued in m_tasks)
                if (!ReferenceEquals(queued, task))
                    remaining.Enqueue(queued);

            m_tasks = remaining;
        }

        RaiseWorkQueueChanged();
    }

    /// <summary>
    /// This will clear the pending queue
    /// </summary>
    /// <param name="abortThread">True if the current running thread should be interrupted and replaced by a new runner thread</param>
    public void ClearQueue(bool abortThread)
    {
        lock (m_lock)
        {
            m_tasks.Clear();

            // Retire the current runner under the same lock that guards dequeuing,
            // so it cannot pick up tasks that are added after this point
            if (abortThread)
                m_runnerToken = null;
        }

        if (!abortThread)
            return;

        try
        {
            m_thread.Interrupt();
            m_thread.Join(500);
        }
        catch
        {
            // Best effort; a replacement runner is started regardless
        }

        // The interrupted task is no longer tracked; a retired runner does not touch these fields
        m_currentTask = null;
        m_active = false;

        StartRunnerThread();
    }

    /// <summary>
    /// Gets a reference to the currently executing task.
    /// BEWARE: This is not protected by a mutex, DO NOT MODIFY IT!!!!
    /// </summary>
    public Tx CurrentTask
    {
        get
        {
            return m_currentTask;
        }
    }

    /// <summary>
    /// Signals the runner thread to exit. Items still in the queue are not processed
    /// (the queue itself is left untouched).
    /// </summary>
    /// <param name="wait">True if the call should block until the thread has exited, false otherwise</param>
    public void Terminate(bool wait)
    {
        m_terminate = true;
        m_event.Set();
        if (wait)
            m_thread.Join();
    }

    /// <summary>
    /// Creates, configures and starts a runner thread with a fresh runner token
    /// </summary>
    private void StartRunnerThread()
    {
        object token = new object();
        Thread thread = new Thread(() => Runner(token));
        thread.IsBackground = true;
        thread.Name = "WorkerThread<" + typeof(Tx).Name + ">";

        // Publish the token and the thread before starting, so the runner sees itself as current
        m_runnerToken = token;
        m_thread = thread;
        thread.Start();
    }

    /// <summary>
    /// Gets a value indicating if the runner owning the token has been replaced
    /// </summary>
    /// <param name="token">The token handed to the runner when it was started</param>
    /// <returns>True if the runner must stop processing, false otherwise</returns>
    private bool IsRetired(object token)
    {
        return !ReferenceEquals(token, m_runnerToken);
    }

    /// <summary>
    /// Takes the next task from the queue, if the runner is current, the state is Run and a task is queued
    /// </summary>
    /// <param name="token">The token handed to the runner when it was started</param>
    /// <returns>The dequeued task, or null if nothing should be processed</returns>
    private Tx TryDequeue(object token)
    {
        lock (m_lock)
        {
            if (IsRetired(token) || m_state != RunState.Run || m_tasks.Count == 0)
                return null;

            return m_tasks.Dequeue();
        }
    }

    /// <summary>
    /// Blocks the runner until it is signaled (Run state) or until it is resumed or terminated (Paused state)
    /// </summary>
    private void WaitForWork()
    {
        if (m_state == RunState.Run)
        {
            m_event.WaitOne(); //Sleep until signaled
            return;
        }

        RaiseWorkerStateChanged(m_state);

        //Sleep for brief periods, until signaled
        while (!m_terminate && m_state != RunState.Run)
            m_event.WaitOne(PausedPollIntervalMs);

        // The loop only exits un-terminated when the state is Run. The state is not written
        // back here, as that could overwrite a Pause() request issued in the meantime.
        if (!m_terminate)
            RaiseWorkerStateChanged(RunState.Run);
    }

    /// <summary>
    /// This is the thread entry point
    /// </summary>
    /// <param name="token">The token identifying this runner</param>
    private void Runner(object token)
    {
        try
        {
            while (!m_terminate && !IsRetired(token))
            {
                Tx task = TryDequeue(token);
                m_currentTask = task;

                if (task == null && !m_terminate)
                    WaitForWork();

                if (m_terminate || IsRetired(token))
                    return;

                if (task == null)
                {
                    task = TryDequeue(token);
                    if (task == null)
                        continue;

                    m_currentTask = task;
                }

                ProcessTask(task, token);
            }
        }
        catch (ThreadInterruptedException)
        {
            // ClearQueue(true) retires the runner before interrupting it and starts a replacement,
            // so a retired runner just exits. Any other interruption is not expected and is rethrown.
            if (!IsRetired(token))
                throw;
        }
    }

    /// <summary>
    /// Runs the work callback on a single task and raises the related events
    /// </summary>
    /// <param name="task">The task to process</param>
    /// <param name="token">The token identifying the runner that processes the task</param>
    private void ProcessTask(Tx task, object token)
    {
        Action<WorkerThread<Tx>, Tx> starting = StartingWork;
        if (starting != null)
            starting(this, task);

        try
        {
            m_active = true;
            m_delegate(task);
        }
        catch (Exception ex)
        {
            ReportError(task, ex);
        }
        finally
        {
            // A retired runner must not clobber the state of its replacement
            if (!IsRetired(token))
                m_active = false;
        }

        if (!IsRetired(token))
            m_currentTask = null;

        Action<WorkerThread<Tx>, Tx> completed = CompletedWork;
        if (completed != null)
        {
            try { completed(this, task); }
            catch (Exception ex) { ReportError(task, ex); }
        }
    }

    /// <summary>
    /// Raises the OnError event, ignoring any exception thrown by the handlers
    /// </summary>
    /// <param name="task">The task that was being processed</param>
    /// <param name="ex">The exception to report</param>
    private void ReportError(Tx task, Exception ex)
    {
        Action<WorkerThread<Tx>, Tx, Exception> handler = OnError;
        if (handler == null)
            return;

        try { handler(this, task, ex); }
        catch
        {
            // A failing error handler must not take down the runner thread
        }
    }

    /// <summary>
    /// Raises the WorkQueueChanged event
    /// </summary>
    private void RaiseWorkQueueChanged()
    {
        // Copy the delegate so a concurrent unsubscribe cannot null it between the check and the call
        Action<WorkerThread<Tx>> handler = WorkQueueChanged;
        if (handler != null)
            handler(this);
    }

    /// <summary>
    /// Raises the WorkerStateChanged event
    /// </summary>
    /// <param name="state">The state to report</param>
    private void RaiseWorkerStateChanged(RunState state)
    {
        Action<WorkerThread<Tx>, RunState> handler = WorkerStateChanged;
        if (handler != null)
            handler(this, state);
    }

    /// <summary>
    /// Gets the current run state
    /// </summary>
    public RunState State { get { return m_state; } }

    /// <summary>
    /// Sets the state to Run and wakes the runner thread
    /// </summary>
    public void Resume()
    {
        m_state = RunState.Run;
        m_event.Set();
    }

    /// <summary>
    /// Sets the state to Paused and wakes the runner thread so it observes the change
    /// </summary>
    public void Pause()
    {
        m_state = RunState.Paused;
        m_event.Set();
    }

    /// <summary>
    /// Waits the specified number of milliseconds for the thread to terminate
    /// </summary>
    /// <param name="millisecondTimeout">The number of milliseconds to wait</param>
    /// <returns>True if the thread is terminated, false if a timeout occured</returns>
    public bool Join(int millisecondTimeout)
    {
        if (m_thread != null)
            return m_thread.Join(millisecondTimeout);
        return true;
    }
}
}