mirror of
https://github.com/duplicati/duplicati.git
synced 2026-05-09 16:49:35 -04:00
Fixed channels to use type-safe initializations.
Updated logging channel to use a wrapper for later early-filter optimizations. Various bug fixes and optimizations.
This commit is contained in:
@@ -133,6 +133,9 @@
|
||||
<Compile Include="Operation\Backup\BackendUploader.cs" />
|
||||
<Compile Include="Operation\Common\IndexVolumeCreator.cs" />
|
||||
<Compile Include="Operation\Backup\UploadSyntheticFilelist.cs" />
|
||||
<Compile Include="Operation\Backup\Channels.cs" />
|
||||
<Compile Include="Operation\Common\Channels.cs" />
|
||||
<Compile Include="Operation\Common\LogWrapper.cs" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Utility\Duplicati.Library.Utility.csproj">
|
||||
|
||||
@@ -23,57 +23,124 @@ using System.Collections.Generic;
|
||||
|
||||
namespace Duplicati.Library.Main.Operation.Backup
|
||||
{
|
||||
internal class UploadRequest
|
||||
internal interface IUploadRequest
|
||||
{
|
||||
}
|
||||
|
||||
internal class FlushRequest : IUploadRequest
|
||||
{
|
||||
public Task<long> LastWriteSizeAync { get { return m_tcs.Task; } }
|
||||
private TaskCompletionSource<long> m_tcs = new TaskCompletionSource<long>();
|
||||
public void SetFlushed(long size)
|
||||
{
|
||||
m_tcs.TrySetResult(size);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
internal class IndexVolumeUploadRequest : IUploadRequest
|
||||
{
|
||||
public IndexVolumeWriter IndexVolume { get; private set; }
|
||||
|
||||
public IndexVolumeUploadRequest(IndexVolumeWriter indexVolume)
|
||||
{
|
||||
IndexVolume = indexVolume;
|
||||
}
|
||||
}
|
||||
|
||||
internal class FilesetUploadRequest : IUploadRequest
|
||||
{
|
||||
public FilesetVolumeWriter Fileset { get; private set; }
|
||||
|
||||
public FilesetUploadRequest(FilesetVolumeWriter fileset)
|
||||
{
|
||||
Fileset = fileset;
|
||||
}
|
||||
}
|
||||
|
||||
internal class VolumeUploadRequest : IUploadRequest
|
||||
{
|
||||
public BlockVolumeWriter BlockVolume { get; private set; }
|
||||
public IndexVolumeWriter IndexVolume { get; private set; }
|
||||
|
||||
public UploadRequest(BlockVolumeWriter blockvolume, IndexVolumeWriter indexvolume)
|
||||
public VolumeUploadRequest(BlockVolumeWriter blockvolume, IndexVolumeWriter indexvolume)
|
||||
{
|
||||
BlockVolume = blockvolume;
|
||||
IndexVolume = indexvolume;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
internal static class BackendUploader
|
||||
{
|
||||
public static Task Run(Common.BackendHandler backend, Options options, Common.DatabaseCommon database, BackupResults results)
|
||||
{
|
||||
return AutomationExtensions.RunTask(new
|
||||
{
|
||||
Input = ChannelMarker.ForRead<UploadRequest>("BackendRequests"),
|
||||
},
|
||||
{
|
||||
Input = Channels.BackendRequest.ForRead,
|
||||
},
|
||||
|
||||
async self =>
|
||||
async self =>
|
||||
{
|
||||
var inProgress = new Queue<KeyValuePair<int, Task>>();
|
||||
var max_pending = options.AsynchronousUploadLimit == 0 ? long.MaxValue : options.AsynchronousUploadLimit;
|
||||
var noIndexFiles = options.IndexfilePolicy == Options.IndexFileStrategy.None;
|
||||
var active = 0;
|
||||
var lastSize = -1L;
|
||||
|
||||
while(!self.Input.IsRetired)
|
||||
{
|
||||
var inProgress = new Queue<Task>();
|
||||
var max_pending = options.AsynchronousUploadLimit == 0 ? long.MaxValue : options.AsynchronousUploadLimit;
|
||||
if (options.IndexfilePolicy != Options.IndexFileStrategy.None)
|
||||
max_pending = max_pending / 2;
|
||||
|
||||
while(!self.Input.IsRetired)
|
||||
try
|
||||
{
|
||||
try
|
||||
var req = await self.Input.ReadAsync();
|
||||
KeyValuePair<int, Task> task = default(KeyValuePair<int, Task>);
|
||||
if (req is VolumeUploadRequest)
|
||||
{
|
||||
var req = await self.Input.ReadAsync();
|
||||
inProgress.Enqueue(backend.UploadFileAsync(req.BlockVolume, name => IndexVolumeCreator.CreateIndexVolume(name, options, database)));
|
||||
lastSize = ((VolumeUploadRequest)req).BlockVolume.SourceSize;
|
||||
|
||||
if (noIndexFiles)
|
||||
task = new KeyValuePair<int, Task>(1, backend.UploadFileAsync(((VolumeUploadRequest)req).BlockVolume, null));
|
||||
else
|
||||
task = new KeyValuePair<int, Task>(2, backend.UploadFileAsync(((VolumeUploadRequest)req).BlockVolume, name => IndexVolumeCreator.CreateIndexVolume(name, options, database)));
|
||||
}
|
||||
catch(Exception ex)
|
||||
else if (req is FilesetUploadRequest)
|
||||
task = new KeyValuePair<int, Task>(1, backend.UploadFileAsync(((FilesetUploadRequest)req).Fileset));
|
||||
else if (req is IndexVolumeUploadRequest)
|
||||
task = new KeyValuePair<int, Task>(1, backend.UploadFileAsync(((IndexVolumeUploadRequest)req).IndexVolume));
|
||||
else if (req is FlushRequest)
|
||||
{
|
||||
if (!ex.IsRetiredException())
|
||||
throw;
|
||||
while(inProgress.Count > 0)
|
||||
await inProgress.Dequeue().Value;
|
||||
active = 0;
|
||||
|
||||
((FlushRequest)req).SetFlushed(lastSize);
|
||||
}
|
||||
|
||||
if (task.Value != null)
|
||||
{
|
||||
inProgress.Enqueue(task);
|
||||
active += task.Key;
|
||||
}
|
||||
|
||||
while(inProgress.Count >= max_pending)
|
||||
await inProgress.Dequeue();
|
||||
}
|
||||
|
||||
results.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_WaitForUpload);
|
||||
|
||||
while(inProgress.Count > 0)
|
||||
await inProgress.Dequeue();
|
||||
catch(Exception ex)
|
||||
{
|
||||
if (!ex.IsRetiredException())
|
||||
throw;
|
||||
}
|
||||
|
||||
while(active >= max_pending)
|
||||
{
|
||||
var top = inProgress.Dequeue();
|
||||
await top.Value;
|
||||
active -= top.Key;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
results.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_WaitForUpload);
|
||||
|
||||
while(inProgress.Count > 0)
|
||||
await inProgress.Dequeue().Value;
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
// Copyright (C) 2015, The Duplicati Team
|
||||
// http://www.duplicati.com, info@duplicati.com
|
||||
//
|
||||
// This library is free software; you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as
|
||||
// published by the Free Software Foundation; either version 2.1 of the
|
||||
// License, or (at your option) any later version.
|
||||
//
|
||||
// This library is distributed in the hope that it will be useful, but
|
||||
// WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
// Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public
|
||||
// License along with this library; if not, write to the Free Software
|
||||
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
using System;
|
||||
using CoCoL;
|
||||
|
||||
namespace Duplicati.Library.Main.Operation.Backup
|
||||
{
|
||||
internal static class Channels
|
||||
{
|
||||
public static readonly ChannelMarkerWrapper<IUploadRequest> BackendRequest = new ChannelMarkerWrapper<IUploadRequest>("BackendRequests");
|
||||
public static readonly ChannelMarkerWrapper<VolumeUploadRequest> SpillPickup = new ChannelMarkerWrapper<VolumeUploadRequest>("SpillPickup");
|
||||
public static readonly ChannelMarkerWrapper<DataBlock> OutputBlocks = new ChannelMarkerWrapper<DataBlock>("OutputBlocks");
|
||||
public static readonly ChannelMarkerWrapper<MetadataPreProcess.FileEntry> AcceptedChangedFile = new ChannelMarkerWrapper<MetadataPreProcess.FileEntry>("AcceptedChangedFile");
|
||||
public static readonly ChannelMarkerWrapper<MetadataPreProcess.FileEntry> ProcessedFiles = new ChannelMarkerWrapper<MetadataPreProcess.FileEntry>("ProcessedFiles");
|
||||
public static readonly ChannelMarkerWrapper<string> SourcePaths = new ChannelMarkerWrapper<string>("SourcePaths");
|
||||
public static readonly ChannelMarkerWrapper<Common.ProgressEvent> ProgressEvents = new ChannelMarkerWrapper<Common.ProgressEvent>("ProgressEvents");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ using CoCoL;
|
||||
|
||||
namespace Duplicati.Library.Main.Operation.Backup
|
||||
{
|
||||
public struct DataBlock
|
||||
internal struct DataBlock
|
||||
{
|
||||
public string HashKey;
|
||||
public byte[] Data;
|
||||
|
||||
@@ -33,91 +33,91 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
public static Task Run(BackupDatabase database, Options options)
|
||||
{
|
||||
return AutomationExtensions.RunTask(
|
||||
new
|
||||
{
|
||||
LogChannel = ChannelMarker.ForWrite<LogMessage>("LogChannel"),
|
||||
Input = ChannelMarker.ForRead<DataBlock>("OutputBlocks"),
|
||||
Output = ChannelMarker.ForWrite<UploadRequest>("BackendRequests"),
|
||||
SpillPickup = ChannelMarker.ForWrite<UploadRequest>("SpillPickup"),
|
||||
},
|
||||
new
|
||||
{
|
||||
LogChannel = Common.Channels.LogChannel.ForWrite,
|
||||
Input = Channels.OutputBlocks.ForRead,
|
||||
Output = Channels.BackendRequest.ForWrite,
|
||||
SpillPickup = Channels.SpillPickup.ForWrite,
|
||||
},
|
||||
|
||||
async self =>
|
||||
{
|
||||
BlockVolumeWriter blockvolume = null;
|
||||
async self =>
|
||||
{
|
||||
BlockVolumeWriter blockvolume = null;
|
||||
var log = new LogWrapper(self.LogChannel);
|
||||
|
||||
try
|
||||
try
|
||||
{
|
||||
while(true)
|
||||
{
|
||||
while(true)
|
||||
var b = await self.Input.ReadAsync();
|
||||
|
||||
// Lazy-start a new block volume
|
||||
if (blockvolume == null)
|
||||
{
|
||||
var b = await self.Input.ReadAsync();
|
||||
|
||||
// Lazy-start a new block volume
|
||||
if (blockvolume == null)
|
||||
// Before we start a new volume, probe to see if it exists
|
||||
// This will delay creation of volumes for differential backups
|
||||
// There can be a race, such that two workers determine that
|
||||
// the block is missing, but this will be solved by the AddBlock call
|
||||
// which runs atomically
|
||||
if (await database.FindBlockIDAsync(b.HashKey, b.Size) >= 0)
|
||||
{
|
||||
// Before we start a new volume, probe to see if it exists
|
||||
// This will delay creation of volumes for differential backups
|
||||
// There can be a race, such that two workers determine that
|
||||
// the block is missing, but this will be solved by the AddBlock call
|
||||
// which runs atomically
|
||||
if (await database.FindBlockIDAsync(b.HashKey, b.Size) >= 0)
|
||||
{
|
||||
b.TaskCompletion.TrySetResult(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
blockvolume = new BlockVolumeWriter(options);
|
||||
blockvolume.VolumeID = await database.RegisterRemoteVolumeAsync(blockvolume.RemoteFilename, RemoteVolumeType.Blocks, RemoteVolumeState.Temporary);
|
||||
b.TaskCompletion.TrySetResult(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
var newBlock = await database.AddBlockAsync(b.HashKey, b.Size, blockvolume.VolumeID);
|
||||
b.TaskCompletion.TrySetResult(newBlock);
|
||||
|
||||
if (newBlock)
|
||||
{
|
||||
|
||||
blockvolume.AddBlock(b.HashKey, b.Data, b.Offset, (int)b.Size, b.Hint);
|
||||
|
||||
if (blockvolume.Filesize > options.VolumeSize - options.Blocksize)
|
||||
{
|
||||
if (options.Dryrun)
|
||||
{
|
||||
blockvolume.Close();
|
||||
await self.LogChannel.WriteAsync(LogMessage.DryRun("Would upload block volume: {0}, size: {1}", blockvolume.RemoteFilename, Library.Utility.Utility.FormatSizeString(new FileInfo(blockvolume.LocalFilename).Length)));
|
||||
|
||||
blockvolume.Dispose();
|
||||
blockvolume = null;
|
||||
}
|
||||
else
|
||||
{
|
||||
//When uploading a new volume, we register the volumes and then flush the transaction
|
||||
// this ensures that the local database and remote storage are as closely related as possible
|
||||
await database.UpdateRemoteVolumeAsync(blockvolume.RemoteFilename, RemoteVolumeState.Uploading, -1, null);
|
||||
|
||||
blockvolume.Close();
|
||||
|
||||
await database.CommitTransactionAsync("CommitAddBlockToOutputFlush");
|
||||
|
||||
await self.Output.WriteAsync(new UploadRequest(blockvolume, null));
|
||||
blockvolume = null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
blockvolume = new BlockVolumeWriter(options);
|
||||
blockvolume.VolumeID = await database.RegisterRemoteVolumeAsync(blockvolume.RemoteFilename, RemoteVolumeType.Blocks, RemoteVolumeState.Temporary);
|
||||
}
|
||||
}
|
||||
catch(Exception ex)
|
||||
{
|
||||
if (ex.IsRetiredException())
|
||||
|
||||
var newBlock = await database.AddBlockAsync(b.HashKey, b.Size, blockvolume.VolumeID);
|
||||
b.TaskCompletion.TrySetResult(newBlock);
|
||||
|
||||
if (newBlock)
|
||||
{
|
||||
// If we have collected data, merge all pending volumes into a single volume
|
||||
if (blockvolume != null && blockvolume.SourceSize > 0)
|
||||
await self.SpillPickup.WriteAsync(new UploadRequest(blockvolume, null));
|
||||
}
|
||||
|
||||
throw;
|
||||
blockvolume.AddBlock(b.HashKey, b.Data, b.Offset, (int)b.Size, b.Hint);
|
||||
|
||||
if (blockvolume.Filesize > options.VolumeSize - options.Blocksize)
|
||||
{
|
||||
if (options.Dryrun)
|
||||
{
|
||||
blockvolume.Close();
|
||||
await log.WriteDryRunAsync("Would upload block volume: {0}, size: {1}", blockvolume.RemoteFilename, Library.Utility.Utility.FormatSizeString(new FileInfo(blockvolume.LocalFilename).Length));
|
||||
|
||||
blockvolume.Dispose();
|
||||
blockvolume = null;
|
||||
}
|
||||
else
|
||||
{
|
||||
//When uploading a new volume, we register the volumes and then flush the transaction
|
||||
// this ensures that the local database and remote storage are as closely related as possible
|
||||
await database.UpdateRemoteVolumeAsync(blockvolume.RemoteFilename, RemoteVolumeState.Uploading, -1, null);
|
||||
|
||||
blockvolume.Close();
|
||||
|
||||
await database.CommitTransactionAsync("CommitAddBlockToOutputFlush");
|
||||
|
||||
await self.Output.WriteAsync(new VolumeUploadRequest(blockvolume, null));
|
||||
blockvolume = null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
catch(Exception ex)
|
||||
{
|
||||
if (ex.IsRetiredException())
|
||||
{
|
||||
// If we have collected data, merge all pending volumes into a single volume
|
||||
if (blockvolume != null && blockvolume.SourceSize > 0)
|
||||
await self.SpillPickup.WriteAsync(new VolumeUploadRequest(blockvolume, null));
|
||||
}
|
||||
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -36,14 +36,15 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
return AutomationExtensions.RunTask(
|
||||
new
|
||||
{
|
||||
Input = ChannelMarker.ForRead<MetadataPreProcess.FileEntry>("AcceptedChangedFile"),
|
||||
LogChannel = ChannelMarker.ForWrite<LogMessage>("LogChannel"),
|
||||
ProgressChannel = ChannelMarker.ForWrite<ProgressEvent>("ProgressChannel"),
|
||||
BlockOutput = ChannelMarker.ForWrite<DataBlock>("OutputBlocks")
|
||||
Input = Channels.AcceptedChangedFile.ForRead,
|
||||
LogChannel = Common.Channels.LogChannel.ForWrite,
|
||||
ProgressChannel = Channels.ProgressEvents.ForWrite,
|
||||
BlockOutput = Channels.OutputBlocks.ForWrite
|
||||
},
|
||||
|
||||
async self =>
|
||||
{
|
||||
var log = new LogWrapper(self.LogChannel);
|
||||
var blocksize = options.Blocksize;
|
||||
var filehasher = System.Security.Cryptography.HashAlgorithm.Create(options.FileHashAlgorithm);
|
||||
var blockhasher = System.Security.Cryptography.HashAlgorithm.Create(options.BlockHashAlgorithm);
|
||||
@@ -69,7 +70,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
{
|
||||
long fslen = -1;
|
||||
try { fslen = fs.Length; }
|
||||
catch (Exception ex) { await self.LogChannel.WriteAsync(LogMessage.Warning(string.Format("Failed to read file length for file {0}", e.Path), ex)); }
|
||||
catch (Exception ex) { await log.WriteWarningAsync(string.Format("Failed to read file length for file {0}", e.Path), ex); }
|
||||
|
||||
await self.ProgressChannel.WriteAsync(new ProgressEvent() { Filepath = e.Path, Length = fslen, Type = EventType.FileStarted });
|
||||
send_close = true;
|
||||
@@ -133,36 +134,36 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
if (oldHash != filekey)
|
||||
{
|
||||
if (oldHash == null)
|
||||
await self.LogChannel.WriteAsync(LogMessage.Verbose("New file {0}", e.Path));
|
||||
await log.WriteVerboseAsync("New file {0}", e.Path);
|
||||
else
|
||||
await self.LogChannel.WriteAsync(LogMessage.Verbose("File has changed {0}", e.Path));
|
||||
await log.WriteVerboseAsync("File has changed {0}", e.Path);
|
||||
|
||||
if (e.OldId < 0)
|
||||
{
|
||||
await stats.AddAddedFile(filesize);
|
||||
|
||||
if (options.Dryrun)
|
||||
await self.LogChannel.WriteAsync(LogMessage.DryRun("Would add new file {0}, size {1}", e.Path, Library.Utility.Utility.FormatSizeString(filesize)));
|
||||
await log.WriteDryRunAsync("Would add new file {0}, size {1}", e.Path, Library.Utility.Utility.FormatSizeString(filesize));
|
||||
}
|
||||
else
|
||||
{
|
||||
await stats.AddModifiedFile(filesize);
|
||||
|
||||
if (options.Dryrun)
|
||||
await self.LogChannel.WriteAsync(LogMessage.DryRun("Would add changed file {0}, size {1}", e.Path, Library.Utility.Utility.FormatSizeString(filesize)));
|
||||
await log.WriteDryRunAsync("Would add changed file {0}, size {1}", e.Path, Library.Utility.Utility.FormatSizeString(filesize));
|
||||
}
|
||||
|
||||
await AddFileToOutputAsync(e.Path, filesize, e.LastWrite, e.MetaHashAndSize, hashcollector, filekey, blocklisthashes, self.BlockOutput, blocksize, database);
|
||||
}
|
||||
else if (e.MetadataChanged)
|
||||
{
|
||||
await self.LogChannel.WriteAsync(LogMessage.Verbose("File has only metadata changes {0}", e.Path));
|
||||
await log.WriteVerboseAsync("File has only metadata changes {0}", e.Path);
|
||||
await AddFileToOutputAsync(e.Path, filesize, e.LastWrite, e.MetaHashAndSize, hashcollector, filekey, blocklisthashes, self.BlockOutput, blocksize, database);
|
||||
}
|
||||
else
|
||||
{
|
||||
// When we write the file to output, update the last modified time
|
||||
await self.LogChannel.WriteAsync(LogMessage.Verbose("File has not changed {0}", e.Path));
|
||||
await log.WriteVerboseAsync("File has not changed {0}", e.Path);
|
||||
await database.AddUnmodifiedAsync(e.OldId, e.LastWrite);
|
||||
}
|
||||
}
|
||||
@@ -172,7 +173,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
if (ex.IsRetiredException())
|
||||
return;
|
||||
else
|
||||
await self.LogChannel.WriteAsync(LogMessage.Warning(string.Format("Failed to process file {0}", e.Path), ex));
|
||||
await log.WriteWarningAsync(string.Format("Failed to process file {0}", e.Path), ex);
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
||||
@@ -42,11 +42,10 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
private Queue<string> m_mixinqueue;
|
||||
private string[] m_changedfilelist;
|
||||
|
||||
[ChannelName("LogChannel")]
|
||||
private IWriteChannel<LogMessage> m_logchannel;
|
||||
|
||||
[ChannelName("SourcePaths")]
|
||||
private IWriteChannel<string> m_output;
|
||||
private IWriteChannel<LogMessage> m_logchannel = Common.Channels.LogChannel.ForWrite;
|
||||
private IWriteChannel<string> m_output = Backup.Channels.SourcePaths.ForWrite;
|
||||
private LogWrapper m_log;
|
||||
|
||||
|
||||
public FileEnumerationProcess(Snapshots.ISnapshotService snapshot, FileAttributes attributeFilter, Duplicati.Library.Utility.IFilter sourcefilter, Duplicati.Library.Utility.IFilter filter, Options.SymlinkStrategy symlinkPolicy, Options.HardlinkStrategy hardlinkPolicy, string[] changedfilelist)
|
||||
@@ -61,6 +60,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
m_hardlinkmap = new Dictionary<string, string>();
|
||||
m_mixinqueue = new Queue<string>();
|
||||
m_changedfilelist = changedfilelist;
|
||||
m_log = new LogWrapper(m_logchannel);
|
||||
|
||||
bool includes;
|
||||
bool excludes;
|
||||
@@ -93,13 +93,13 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
{
|
||||
if (m_snapshot.IsBlockDevice(path))
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Verbose("Excluding block device: {0}", path));
|
||||
await m_log.WriteVerboseAsync("Excluding block device: {0}", path);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to process path: {0}", path), ex));
|
||||
await m_log.WriteWarningAsync(string.Format("Failed to process path: {0}", path), ex);
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -108,7 +108,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
bool sourcematches;
|
||||
if (m_sourcefilter.Matches(path, out sourcematches, out sourcematch) && sourcematches)
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Verbose("Including source path: {0}", path));
|
||||
await m_log.WriteVerboseAsync("Including source path: {0}", path);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -122,7 +122,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
{
|
||||
if (m_hardlinkPolicy == Options.HardlinkStrategy.None)
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Verbose("Excluding hardlink: {0} ({1})", path, id));
|
||||
await m_log.WriteVerboseAsync("Excluding hardlink: {0} ({1})", path, id);
|
||||
return false;
|
||||
}
|
||||
else if (m_hardlinkPolicy == Options.HardlinkStrategy.First)
|
||||
@@ -130,7 +130,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
string prevPath;
|
||||
if (m_hardlinkmap.TryGetValue(id, out prevPath))
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Verbose("Excluding hardlink ({1}) for: {0}, previous hardlink: {2}", path, id, prevPath));
|
||||
await m_log.WriteVerboseAsync("Excluding hardlink ({1}) for: {0}, previous hardlink: {2}", path, id, prevPath);
|
||||
return false;
|
||||
}
|
||||
else
|
||||
@@ -142,7 +142,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to process path: {0}", path), ex));
|
||||
await m_log.WriteWarningAsync(string.Format("Failed to process path: {0}", path), ex);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -150,7 +150,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
// If we exclude files based on attributes, filter that
|
||||
if ((m_attributeFilter & attributes) != 0)
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Verbose("Excluding path due to attribute filter: {0}", path));
|
||||
await m_log.WriteVerboseAsync("Excluding path due to attribute filter: {0}", path);
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -158,25 +158,25 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
Library.Utility.IFilter match;
|
||||
if (!Library.Utility.FilterExpression.Matches(m_enumeratefilter, path, out match))
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Verbose("Excluding path due to filter: {0} => {1}", path, match == null ? "null" : match.ToString()));
|
||||
await m_log.WriteVerboseAsync("Excluding path due to filter: {0} => {1}", path, match == null ? "null" : match.ToString());
|
||||
return false;
|
||||
}
|
||||
else if (match != null)
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Verbose("Including path due to filter: {0} => {1}", path, match.ToString()));
|
||||
await m_log.WriteVerboseAsync("Including path due to filter: {0} => {1}", path, match.ToString());
|
||||
}
|
||||
|
||||
// If the file is a symlink, apply special handling
|
||||
var isSymlink = (attributes & FileAttributes.ReparsePoint) == FileAttributes.ReparsePoint;
|
||||
if (isSymlink && m_symlinkPolicy == Options.SymlinkStrategy.Ignore)
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Verbose("Excluding symlink: {0}", path));
|
||||
await m_log.WriteVerboseAsync("Excluding symlink: {0}", path);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (isSymlink && m_symlinkPolicy == Options.SymlinkStrategy.Store)
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Verbose("Storing symlink: {0}", path));
|
||||
await m_log.WriteVerboseAsync("Storing symlink: {0}", path);
|
||||
|
||||
// We return false because we do not want to recurse into the path,
|
||||
// but we add the symlink to the mixin so we process the symlink itself
|
||||
|
||||
@@ -28,58 +28,58 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
public static Task Start(Snapshots.ISnapshotService snapshot, Options options, BackupStatsCollector stats, BackupDatabase database)
|
||||
{
|
||||
return AutomationExtensions.RunTask(
|
||||
new
|
||||
new
|
||||
{
|
||||
LogChannel = Common.Channels.LogChannel.ForWrite,
|
||||
Input = Channels.ProcessedFiles.ForRead,
|
||||
Output = Channels.AcceptedChangedFile.ForWrite
|
||||
},
|
||||
|
||||
async self =>
|
||||
{
|
||||
|
||||
var EMPTY_METADATA = Utility.WrapMetadata(new Dictionary<string, string>(), options);
|
||||
var blocksize = options.Blocksize;
|
||||
var log = new LogWrapper(self.LogChannel);
|
||||
|
||||
while (true)
|
||||
{
|
||||
LogChannel = ChannelMarker.ForWrite<LogMessage>("LogChannel"),
|
||||
Input = ChannelMarker.ForRead<MetadataPreProcess.FileEntry>("ProcessedFiles"),
|
||||
Output = ChannelMarker.ForWrite<MetadataPreProcess.FileEntry>("AcceptedChangedFile")
|
||||
},
|
||||
var e = await self.Input.ReadAsync();
|
||||
|
||||
async self =>
|
||||
{
|
||||
|
||||
var EMPTY_METADATA = Utility.WrapMetadata(new Dictionary<string, string>(), options);
|
||||
var blocksize = options.Blocksize;
|
||||
|
||||
while (true)
|
||||
long filestatsize = -1;
|
||||
try
|
||||
{
|
||||
var e = await self.Input.ReadAsync();
|
||||
filestatsize = snapshot.GetFileSize(e.Path);
|
||||
}
|
||||
catch
|
||||
{
|
||||
}
|
||||
|
||||
long filestatsize = -1;
|
||||
try
|
||||
{
|
||||
filestatsize = snapshot.GetFileSize(e.Path);
|
||||
}
|
||||
catch
|
||||
{
|
||||
}
|
||||
await stats.AddExaminedFile(filestatsize);
|
||||
|
||||
await stats.AddExaminedFile(filestatsize);
|
||||
e.MetaHashAndSize = options.StoreMetadata ? Utility.WrapMetadata(await MetadataGenerator.GenerateMetadataAsync(e.Path, e.Attributes, options, snapshot, log), options) : EMPTY_METADATA;
|
||||
|
||||
e.MetaHashAndSize = options.StoreMetadata ? Utility.WrapMetadata(await MetadataGenerator.GenerateMetadataAsync(e.Path, e.Attributes, options, snapshot, self.LogChannel), options) : EMPTY_METADATA;
|
||||
var timestampChanged = e.LastWrite != e.OldModified || e.LastWrite.Ticks == 0 || e.OldModified.Ticks == 0;
|
||||
var filesizeChanged = filestatsize < 0 || e.LastFileSize < 0 || filestatsize != e.LastFileSize;
|
||||
var tooLargeFile = options.SkipFilesLargerThan != long.MaxValue && options.SkipFilesLargerThan != 0 && filestatsize >= 0 && filestatsize > options.SkipFilesLargerThan;
|
||||
e.MetadataChanged = !options.SkipMetadata && (e.MetaHashAndSize.Size != e.OldMetaSize || e.MetaHashAndSize.Hash != e.OldMetaHash);
|
||||
|
||||
var timestampChanged = e.LastWrite != e.OldModified || e.LastWrite.Ticks == 0 || e.OldModified.Ticks == 0;
|
||||
var filesizeChanged = filestatsize < 0 || e.LastFileSize < 0 || filestatsize != e.LastFileSize;
|
||||
var tooLargeFile = options.SkipFilesLargerThan != long.MaxValue && options.SkipFilesLargerThan != 0 && filestatsize >= 0 && filestatsize > options.SkipFilesLargerThan;
|
||||
e.MetadataChanged = !options.SkipMetadata && (e.MetaHashAndSize.Size != e.OldMetaSize || e.MetaHashAndSize.Hash != e.OldMetaHash);
|
||||
|
||||
if ((e.OldId < 0 || options.DisableFiletimeCheck || timestampChanged || filesizeChanged || e.MetadataChanged) && !tooLargeFile)
|
||||
{
|
||||
await self.LogChannel.WriteAsync(LogMessage.Verbose("Checking file for changes {0}, new: {1}, timestamp changed: {2}, size changed: {3}, metadatachanged: {4}, {5} vs {6}", e.Path, e.OldId <= 0, timestampChanged, filesizeChanged, e.MetadataChanged, e.LastWrite, e.OldModified));
|
||||
await self.Output.WriteAsync(e);
|
||||
}
|
||||
if ((e.OldId < 0 || options.DisableFiletimeCheck || timestampChanged || filesizeChanged || e.MetadataChanged) && !tooLargeFile)
|
||||
{
|
||||
await log.WriteVerboseAsync("Checking file for changes {0}, new: {1}, timestamp changed: {2}, size changed: {3}, metadatachanged: {4}, {5} vs {6}", e.Path, e.OldId <= 0, timestampChanged, filesizeChanged, e.MetadataChanged, e.LastWrite, e.OldModified);
|
||||
await self.Output.WriteAsync(e);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (options.SkipFilesLargerThan == long.MaxValue || options.SkipFilesLargerThan == 0 || snapshot.GetFileSize(e.Path) < options.SkipFilesLargerThan)
|
||||
await log.WriteVerboseAsync("Skipped checking file, because timestamp was not updated {0}", e.Path);
|
||||
else
|
||||
{
|
||||
if (options.SkipFilesLargerThan == long.MaxValue || options.SkipFilesLargerThan == 0 || snapshot.GetFileSize(e.Path) < options.SkipFilesLargerThan)
|
||||
await self.LogChannel.WriteAsync(LogMessage.Verbose("Skipped checking file, because timestamp was not updated {0}", e.Path));
|
||||
else
|
||||
await self.LogChannel.WriteAsync(LogMessage.Verbose("Skipped checking file, because the size exceeds limit {0}", e.Path));
|
||||
await log.WriteVerboseAsync("Skipped checking file, because the size exceeds limit {0}", e.Path);
|
||||
|
||||
await database.AddUnmodifiedAsync(e.OldId, e.LastWrite);
|
||||
}
|
||||
await database.AddUnmodifiedAsync(e.OldId, e.LastWrite);
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
{
|
||||
internal static class MetadataGenerator
|
||||
{
|
||||
public static async Task<Dictionary<string, string>> GenerateMetadataAsync(string path, System.IO.FileAttributes attributes, Options options, Snapshots.ISnapshotService snapshot, IWriteChannel<LogMessage> logchannel)
|
||||
public static async Task<Dictionary<string, string>> GenerateMetadataAsync(string path, System.IO.FileAttributes attributes, Options options, Snapshots.ISnapshotService snapshot, LogWrapper log)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -47,7 +47,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
await logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to read timestamp on \"{0}\"", path), ex));
|
||||
await log.WriteWarningAsync(string.Format("Failed to read timestamp on \"{0}\"", path), ex);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
await logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to read timestamp on \"{0}\"", path), ex));
|
||||
await log.WriteWarningAsync(string.Format("Failed to read timestamp on \"{0}\"", path), ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -72,7 +72,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
}
|
||||
catch(Exception ex)
|
||||
{
|
||||
await logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to process metadata for \"{0}\", storing empty metadata", path), ex));
|
||||
await log.WriteWarningAsync(string.Format("Failed to process metadata for \"{0}\", storing empty metadata", path), ex);
|
||||
return new Dictionary<string, string>();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,18 +47,11 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
public bool MetadataChanged;
|
||||
}
|
||||
|
||||
[ChannelName("SourcePaths")]
|
||||
private IReadChannel<string> m_input;
|
||||
|
||||
[ChannelName("LogChannel")]
|
||||
private IWriteChannel<LogMessage> m_logchannel;
|
||||
|
||||
[ChannelName("ProcessedFiles")]
|
||||
private IWriteChannel<FileEntry> m_output;
|
||||
|
||||
[ChannelName("OutputBlocks")]
|
||||
private IWriteChannel<DataBlock> m_blockoutput;
|
||||
|
||||
private IReadChannel<string> m_input = Backup.Channels.SourcePaths.ForRead;
|
||||
private IWriteChannel<LogMessage> m_logchannel = Common.Channels.LogChannel.ForWrite;
|
||||
private IWriteChannel<FileEntry> m_output = Backup.Channels.ProcessedFiles.ForWrite;
|
||||
private IWriteChannel<DataBlock> m_blockoutput = Backup.Channels.OutputBlocks.ForWrite;
|
||||
private LogWrapper m_log;
|
||||
|
||||
private Snapshots.ISnapshotService m_snapshot;
|
||||
private Options m_options;
|
||||
@@ -72,6 +65,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
m_snapshot = snapshot;
|
||||
m_options = options;
|
||||
m_database = database;
|
||||
m_log = new LogWrapper(m_logchannel);
|
||||
m_blocksize = m_options.Blocksize;
|
||||
EMPTY_METADATA = Utility.WrapMetadata(new Dictionary<string, string>(), options);
|
||||
}
|
||||
@@ -82,13 +76,13 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
{
|
||||
if (m_options.SymlinkPolicy == Options.SymlinkStrategy.Ignore)
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Verbose("Ignoring symlink {0}", path));
|
||||
await m_log.WriteVerboseAsync("Ignoring symlink {0}", path);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (m_options.SymlinkPolicy == Options.SymlinkStrategy.Store)
|
||||
{
|
||||
var metadata = await MetadataGenerator.GenerateMetadataAsync(path, attributes, m_options, m_snapshot, m_logchannel);
|
||||
var metadata = await MetadataGenerator.GenerateMetadataAsync(path, attributes, m_options, m_snapshot, m_log);
|
||||
|
||||
if (!metadata.ContainsKey("CoreSymlinkTarget"))
|
||||
metadata["CoreSymlinkTarget"] = m_snapshot.GetSymlinkTarget(path);
|
||||
@@ -96,7 +90,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
var metahash = Utility.WrapMetadata(metadata, m_options);
|
||||
await AddSymlinkToOutputAsync(path, DateTime.UtcNow, metahash);
|
||||
|
||||
await m_logchannel.WriteAsync(LogMessage.Verbose("Stored symlink {0}", path));
|
||||
await m_log.WriteVerboseAsync("Stored symlink {0}", path);
|
||||
// Don't process further
|
||||
return false;
|
||||
}
|
||||
@@ -108,14 +102,14 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
|
||||
if (m_options.StoreMetadata)
|
||||
{
|
||||
metahash = Utility.WrapMetadata(await MetadataGenerator.GenerateMetadataAsync(path, attributes, m_options, m_snapshot, m_logchannel), m_options);
|
||||
metahash = Utility.WrapMetadata(await MetadataGenerator.GenerateMetadataAsync(path, attributes, m_options, m_snapshot, m_log), m_options);
|
||||
}
|
||||
else
|
||||
{
|
||||
metahash = EMPTY_METADATA;
|
||||
}
|
||||
|
||||
await m_logchannel.WriteAsync(LogMessage.Verbose("Adding directory {0}", path));
|
||||
await m_log.WriteVerboseAsync("Adding directory {0}", path);
|
||||
await AddFolderToOutputAsync(path, lastwrite, metahash);
|
||||
return false;
|
||||
}
|
||||
@@ -178,7 +172,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to read timestamp on \"{0}\"", path), ex));
|
||||
await m_log.WriteWarningAsync(string.Format("Failed to read timestamp on \"{0}\"", path), ex);
|
||||
}
|
||||
|
||||
try
|
||||
@@ -187,7 +181,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to read attributes on \"{0}\"", path), ex));
|
||||
await m_log.WriteWarningAsync(string.Format("Failed to read attributes on \"{0}\"", path), ex);
|
||||
}
|
||||
|
||||
// If we only have metadata, stop here
|
||||
@@ -210,7 +204,7 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
}
|
||||
catch(Exception ex)
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Error(string.Format("Failed to process entry, path: {0}", path), ex));
|
||||
await m_log.WriteErrorAsync(string.Format("Failed to process entry, path: {0}", path), ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,55 +28,55 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
public static Task Run(BackupResults stat)
|
||||
{
|
||||
return AutomationExtensions.RunTask(new
|
||||
{
|
||||
Input = ChannelMarker.ForRead<ProgressEvent>("ProgressChannel")
|
||||
},
|
||||
|
||||
async self =>
|
||||
{
|
||||
var filesStarted = new Dictionary<string, long>();
|
||||
var fileProgress = new Dictionary<string, long>();
|
||||
string current = null;
|
||||
{
|
||||
Input = Channels.ProgressEvents.ForRead
|
||||
},
|
||||
|
||||
async self =>
|
||||
{
|
||||
var filesStarted = new Dictionary<string, long>();
|
||||
var fileProgress = new Dictionary<string, long>();
|
||||
string current = null;
|
||||
|
||||
while(true)
|
||||
while(true)
|
||||
{
|
||||
var t = await self.Input.ReadAsync();
|
||||
switch(t.Type)
|
||||
{
|
||||
var t = await self.Input.ReadAsync();
|
||||
switch(t.Type)
|
||||
{
|
||||
case EventType.FileStarted:
|
||||
filesStarted[t.Filepath] = t.Length;
|
||||
fileProgress[t.Filepath] = 0;
|
||||
break;
|
||||
case EventType.FileProgressUpdate:
|
||||
if (t.Filepath == current)
|
||||
stat.OperationProgressUpdater.UpdateFileProgress(t.Length);
|
||||
break;
|
||||
case EventType.FileClosed:
|
||||
if (fileProgress.ContainsKey(t.Filepath))
|
||||
fileProgress[t.Filepath] = t.Length;
|
||||
|
||||
if (t.Filepath == current)
|
||||
{
|
||||
stat.OperationProgressUpdater.UpdateFileProgress(t.Length);
|
||||
current = null;
|
||||
}
|
||||
filesStarted.Remove(t.Filepath);
|
||||
fileProgress.Remove(t.Filepath);
|
||||
break;
|
||||
}
|
||||
|
||||
if (current == null)
|
||||
{
|
||||
current = filesStarted.OrderByDescending(x => x.Value).Select(x => x.Key).FirstOrDefault();
|
||||
if (current != null)
|
||||
case EventType.FileStarted:
|
||||
filesStarted[t.Filepath] = t.Length;
|
||||
fileProgress[t.Filepath] = 0;
|
||||
break;
|
||||
case EventType.FileProgressUpdate:
|
||||
if (t.Filepath == current)
|
||||
stat.OperationProgressUpdater.UpdateFileProgress(t.Length);
|
||||
break;
|
||||
case EventType.FileClosed:
|
||||
if (fileProgress.ContainsKey(t.Filepath))
|
||||
fileProgress[t.Filepath] = t.Length;
|
||||
|
||||
if (t.Filepath == current)
|
||||
{
|
||||
stat.OperationProgressUpdater.StartFile(current, filesStarted[current]);
|
||||
if (fileProgress.ContainsKey(current) && fileProgress[current] > 0)
|
||||
stat.OperationProgressUpdater.UpdateFileProgress(fileProgress[current]);
|
||||
stat.OperationProgressUpdater.UpdateFileProgress(t.Length);
|
||||
current = null;
|
||||
}
|
||||
filesStarted.Remove(t.Filepath);
|
||||
fileProgress.Remove(t.Filepath);
|
||||
break;
|
||||
}
|
||||
|
||||
if (current == null)
|
||||
{
|
||||
current = filesStarted.OrderByDescending(x => x.Value).Select(x => x.Key).FirstOrDefault();
|
||||
if (current != null)
|
||||
{
|
||||
stat.OperationProgressUpdater.StartFile(current, filesStarted[current]);
|
||||
if (fileProgress.ContainsKey(current) && fileProgress[current] > 0)
|
||||
stat.OperationProgressUpdater.UpdateFileProgress(fileProgress[current]);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -36,95 +36,94 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
public static Task Run(Options options, BackupDatabase database)
|
||||
{
|
||||
return AutomationExtensions.RunTask(
|
||||
new
|
||||
{
|
||||
Input = ChannelMarker.ForRead<UploadRequest>("SpillPickup"),
|
||||
Output = ChannelMarker.ForWrite<UploadRequest>("BackendRequests"),
|
||||
},
|
||||
new
|
||||
{
|
||||
Input = Channels.SpillPickup.ForRead,
|
||||
Output = Channels.BackendRequest.ForWrite,
|
||||
},
|
||||
|
||||
async self =>
|
||||
{
|
||||
var lst = new List<UploadRequest>();
|
||||
async self =>
|
||||
{
|
||||
var lst = new List<VolumeUploadRequest>();
|
||||
|
||||
while(!self.Input.IsRetired)
|
||||
try
|
||||
{
|
||||
lst.Add((UploadRequest)await self.Input.ReadAsync());
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
if (ex.IsRetiredException())
|
||||
break;
|
||||
throw;
|
||||
}
|
||||
|
||||
|
||||
while(lst.Count > 1)
|
||||
while(!self.Input.IsRetired)
|
||||
try
|
||||
{
|
||||
lst.Add((VolumeUploadRequest)await self.Input.ReadAsync());
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
if (ex.IsRetiredException())
|
||||
break;
|
||||
throw;
|
||||
}
|
||||
|
||||
UploadRequest target = null;
|
||||
var source = lst[0];
|
||||
|
||||
// Finalize the current work
|
||||
source.BlockVolume.Close();
|
||||
while(lst.Count > 1)
|
||||
{
|
||||
|
||||
// Remove it from the list of active operations
|
||||
lst.RemoveAt(0);
|
||||
VolumeUploadRequest target = null;
|
||||
var source = lst[0];
|
||||
|
||||
var buffer = new byte[options.Blocksize];
|
||||
// Finalize the current work
|
||||
source.BlockVolume.Close();
|
||||
|
||||
using(var rd = new BlockVolumeReader(options.CompressionModule, source.BlockVolume.LocalFilename, options))
|
||||
// Remove it from the list of active operations
|
||||
lst.RemoveAt(0);
|
||||
|
||||
var buffer = new byte[options.Blocksize];
|
||||
|
||||
using(var rd = new BlockVolumeReader(options.CompressionModule, source.BlockVolume.LocalFilename, options))
|
||||
{
|
||||
foreach(var file in rd.Blocks)
|
||||
{
|
||||
foreach(var file in rd.Blocks)
|
||||
// Grab a target
|
||||
if (target == null)
|
||||
{
|
||||
// Grab a target
|
||||
if (target == null)
|
||||
if (lst.Count == 0)
|
||||
{
|
||||
if (lst.Count == 0)
|
||||
{
|
||||
// No more targets, make one
|
||||
target = new UploadRequest(new BlockVolumeWriter(options), null);
|
||||
target.BlockVolume.VolumeID = await database.RegisterRemoteVolumeAsync(target.BlockVolume.RemoteFilename, RemoteVolumeType.Blocks, RemoteVolumeState.Temporary);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Grab the next target
|
||||
target = lst[0];
|
||||
lst.RemoveAt(0);
|
||||
}
|
||||
// No more targets, make one
|
||||
target = new VolumeUploadRequest(new BlockVolumeWriter(options), null);
|
||||
target.BlockVolume.VolumeID = await database.RegisterRemoteVolumeAsync(target.BlockVolume.RemoteFilename, RemoteVolumeType.Blocks, RemoteVolumeState.Temporary);
|
||||
}
|
||||
|
||||
|
||||
var len = rd.ReadBlock(file.Key, buffer);
|
||||
target.BlockVolume.AddBlock(file.Key, buffer, 0, len, Duplicati.Library.Interface.CompressionHint.Default);
|
||||
await database.MoveBlockToVolumeAsync(file.Key, len, source.BlockVolume.VolumeID, target.BlockVolume.VolumeID);
|
||||
|
||||
if (target.BlockVolume.Filesize > options.VolumeSize - options.Blocksize)
|
||||
else
|
||||
{
|
||||
await self.Output.WriteAsync(target);
|
||||
target = null;
|
||||
// Grab the next target
|
||||
target = lst[0];
|
||||
lst.RemoveAt(0);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
var len = rd.ReadBlock(file.Key, buffer);
|
||||
target.BlockVolume.AddBlock(file.Key, buffer, 0, len, Duplicati.Library.Interface.CompressionHint.Default);
|
||||
await database.MoveBlockToVolumeAsync(file.Key, len, source.BlockVolume.VolumeID, target.BlockVolume.VolumeID);
|
||||
|
||||
if (target.BlockVolume.Filesize > options.VolumeSize - options.Blocksize)
|
||||
{
|
||||
await self.Output.WriteAsync(target);
|
||||
target = null;
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure they are out of the database
|
||||
System.IO.File.Delete(source.BlockVolume.LocalFilename);
|
||||
await database.SafeDeleteRemoteVolumeAsync(source.BlockVolume.RemoteFilename);
|
||||
|
||||
// Re-inject the target if it has content
|
||||
if (target != null)
|
||||
lst.Insert(lst.Count == 0 ? 0 : 1, target);
|
||||
|
||||
}
|
||||
|
||||
foreach(var n in lst)
|
||||
{
|
||||
n.BlockVolume.Close();
|
||||
await self.Output.WriteAsync(n);
|
||||
}
|
||||
// Make sure they are out of the database
|
||||
System.IO.File.Delete(source.BlockVolume.LocalFilename);
|
||||
await database.SafeDeleteRemoteVolumeAsync(source.BlockVolume.RemoteFilename);
|
||||
|
||||
// Re-inject the target if it has content
|
||||
if (target != null)
|
||||
lst.Insert(lst.Count == 0 ? 0 : 1, target);
|
||||
|
||||
}
|
||||
);
|
||||
|
||||
foreach(var n in lst)
|
||||
{
|
||||
n.BlockVolume.Close();
|
||||
await self.Output.WriteAsync(n);
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,113 +26,110 @@ namespace Duplicati.Library.Main.Operation.Backup
|
||||
{
|
||||
internal static class UploadSyntheticFilelist
|
||||
{
|
||||
public static Task Run(Common.BackendHandler backend, BackupDatabase database, Options options, BackupResults result)
|
||||
public static Task Run(BackupDatabase database, Options options, BackupResults result)
|
||||
{
|
||||
return AutomationExtensions.RunTask(new
|
||||
{
|
||||
LogChannel = ChannelMarker.ForWrite<LogMessage>("LogChannel"),
|
||||
},
|
||||
{
|
||||
LogChannel = Common.Channels.LogChannel.ForWrite,
|
||||
UploadChannel = Channels.BackendRequest.ForWrite
|
||||
},
|
||||
|
||||
async self =>
|
||||
async self =>
|
||||
{
|
||||
var log = new LogWrapper(self.LogChannel);
|
||||
var incompleteFilesets = await database.GetIncompleteFilesetsAsync();
|
||||
if (incompleteFilesets.Length != 0)
|
||||
{
|
||||
await database.CommitTransactionAsync("PreSyntheticFilelist");
|
||||
|
||||
var incompleteFilesets = await database.GetIncompleteFilesetsAsync();
|
||||
if (incompleteFilesets.Length != 0)
|
||||
result.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_PreviousBackupFinalize);
|
||||
await log.WriteInformationAsync("Uploading filelist from previous interrupted backup");
|
||||
|
||||
var incompleteSet = incompleteFilesets.Last();
|
||||
var badIds = from n in incompleteFilesets select n.Key;
|
||||
|
||||
var prevs = (from n in await database.GetFilesetTimesAsync()
|
||||
where
|
||||
n.Key < incompleteSet.Key
|
||||
&&
|
||||
!badIds.Contains(n.Key)
|
||||
orderby n.Key
|
||||
select n.Key).ToArray();
|
||||
|
||||
var prevId = prevs.Length == 0 ? -1 : prevs.Last();
|
||||
|
||||
FilesetVolumeWriter fsw = null;
|
||||
try
|
||||
{
|
||||
await database.CommitTransactionAsync("PreSyntheticFilelist");
|
||||
var s = 1;
|
||||
var fileTime = incompleteSet.Value + TimeSpan.FromSeconds(s);
|
||||
var oldFilesetID = incompleteSet.Key;
|
||||
|
||||
result.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_PreviousBackupFinalize);
|
||||
await self.LogChannel.WriteAsync(LogMessage.Information("Uploading filelist from previous interrupted backup"));
|
||||
|
||||
var incompleteSet = incompleteFilesets.Last();
|
||||
var badIds = from n in incompleteFilesets select n.Key;
|
||||
|
||||
var prevs = (from n in await database.GetFilesetTimesAsync()
|
||||
where
|
||||
n.Key < incompleteSet.Key
|
||||
&&
|
||||
!badIds.Contains(n.Key)
|
||||
orderby n.Key
|
||||
select n.Key).ToArray();
|
||||
|
||||
var prevId = prevs.Length == 0 ? -1 : prevs.Last();
|
||||
|
||||
FilesetVolumeWriter fsw = null;
|
||||
try
|
||||
// Probe for an unused filename
|
||||
while (s < 60)
|
||||
{
|
||||
var s = 1;
|
||||
var fileTime = incompleteSet.Value + TimeSpan.FromSeconds(s);
|
||||
var oldFilesetID = incompleteSet.Key;
|
||||
var id = await database.GetRemoteVolumeIDAsync(VolumeBase.GenerateFilename(RemoteVolumeType.Files, options, null, fileTime));
|
||||
if (id < 0)
|
||||
break;
|
||||
|
||||
// Probe for an unused filename
|
||||
while (s < 60)
|
||||
{
|
||||
var id = await database.GetRemoteVolumeIDAsync(VolumeBase.GenerateFilename(RemoteVolumeType.Files, options, null, fileTime));
|
||||
if (id < 0)
|
||||
break;
|
||||
|
||||
fileTime = incompleteSet.Value + TimeSpan.FromSeconds(++s);
|
||||
}
|
||||
|
||||
fsw = new FilesetVolumeWriter(options, fileTime);
|
||||
fsw.VolumeID = await database.RegisterRemoteVolumeAsync(fsw.RemoteFilename, RemoteVolumeType.Files, RemoteVolumeState.Temporary);
|
||||
|
||||
if (!string.IsNullOrEmpty(options.ControlFiles))
|
||||
foreach(var p in options.ControlFiles.Split(new char[] { System.IO.Path.PathSeparator }, StringSplitOptions.RemoveEmptyEntries))
|
||||
fsw.AddControlFile(p, options.GetCompressionHintFromFilename(p));
|
||||
|
||||
var newFilesetID = await database.CreateFilesetAsync(fsw.VolumeID, fileTime);
|
||||
await database.LinkFilesetToVolumeAsync(newFilesetID, fsw.VolumeID);
|
||||
await database.AppendFilesFromPreviousSetAsync(null, newFilesetID, prevId, fileTime);
|
||||
|
||||
await database.WriteFilesetAsync(fsw, newFilesetID);
|
||||
|
||||
if (options.Dryrun)
|
||||
{
|
||||
await self.LogChannel.WriteAsync(LogMessage.DryRun("Would upload fileset: {0}, size: {1}", fsw.RemoteFilename, Library.Utility.Utility.FormatSizeString(new FileInfo(fsw.LocalFilename).Length)));
|
||||
}
|
||||
else
|
||||
{
|
||||
await database.UpdateRemoteVolumeAsync(fsw.RemoteFilename, RemoteVolumeState.Uploading, -1, null);
|
||||
await database.CommitTransactionAsync("CommitUpdateFilelistVolume");
|
||||
await backend.UploadFileAsync(fsw);
|
||||
fsw = null;
|
||||
}
|
||||
fileTime = incompleteSet.Value + TimeSpan.FromSeconds(++s);
|
||||
}
|
||||
catch
|
||||
|
||||
fsw = new FilesetVolumeWriter(options, fileTime);
|
||||
fsw.VolumeID = await database.RegisterRemoteVolumeAsync(fsw.RemoteFilename, RemoteVolumeType.Files, RemoteVolumeState.Temporary);
|
||||
|
||||
if (!string.IsNullOrEmpty(options.ControlFiles))
|
||||
foreach(var p in options.ControlFiles.Split(new char[] { System.IO.Path.PathSeparator }, StringSplitOptions.RemoveEmptyEntries))
|
||||
fsw.AddControlFile(p, options.GetCompressionHintFromFilename(p));
|
||||
|
||||
var newFilesetID = await database.CreateFilesetAsync(fsw.VolumeID, fileTime);
|
||||
await database.LinkFilesetToVolumeAsync(newFilesetID, fsw.VolumeID);
|
||||
await database.AppendFilesFromPreviousSetAsync(null, newFilesetID, prevId, fileTime);
|
||||
|
||||
await database.WriteFilesetAsync(fsw, newFilesetID);
|
||||
|
||||
if (options.Dryrun)
|
||||
{
|
||||
await database.RollbackTransactionAsync();
|
||||
throw;
|
||||
await log.WriteDryRunAsync("Would upload fileset: {0}, size: {1}", fsw.RemoteFilename, Library.Utility.Utility.FormatSizeString(new FileInfo(fsw.LocalFilename).Length));
|
||||
}
|
||||
finally
|
||||
else
|
||||
{
|
||||
if (fsw != null)
|
||||
try { fsw.Dispose(); }
|
||||
catch { fsw = null; }
|
||||
}
|
||||
await database.UpdateRemoteVolumeAsync(fsw.RemoteFilename, RemoteVolumeState.Uploading, -1, null);
|
||||
await database.CommitTransactionAsync("CommitUpdateFilelistVolume");
|
||||
await self.UploadChannel.WriteAsync(new FilesetUploadRequest(fsw));
|
||||
fsw = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (options.IndexfilePolicy != Options.IndexFileStrategy.None)
|
||||
catch
|
||||
{
|
||||
var blockhasher = System.Security.Cryptography.HashAlgorithm.Create(options.BlockHashAlgorithm);
|
||||
var hashsize = blockhasher.HashSize / 8;
|
||||
await database.RollbackTransactionAsync();
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (fsw != null)
|
||||
try { fsw.Dispose(); }
|
||||
catch { fsw = null; }
|
||||
}
|
||||
}
|
||||
|
||||
foreach(var blockfile in await database.GetMissingIndexFilesAsync())
|
||||
if (options.IndexfilePolicy != Options.IndexFileStrategy.None)
|
||||
{
|
||||
foreach(var blockfile in await database.GetMissingIndexFilesAsync())
|
||||
{
|
||||
await log.WriteInformationAsync(string.Format("Re-creating missing index file for {0}", blockfile));
|
||||
var w = await Common.IndexVolumeCreator.CreateIndexVolume(blockfile, options, database);
|
||||
|
||||
if (options.Dryrun)
|
||||
await log.WriteDryRunAsync("would upload new index file {0}, with size {1}, previous size {2}", w.RemoteFilename, Library.Utility.Utility.FormatSizeString(new System.IO.FileInfo(w.LocalFilename).Length), Library.Utility.Utility.FormatSizeString(w.Filesize));
|
||||
else
|
||||
{
|
||||
await self.LogChannel.WriteAsync(LogMessage.Information(string.Format("Re-creating missing index file for {0}", blockfile)));
|
||||
var w = await Common.IndexVolumeCreator.CreateIndexVolume(blockfile, options, database);
|
||||
|
||||
if (options.Dryrun)
|
||||
await self.LogChannel.WriteAsync(LogMessage.DryRun("would upload new index file {0}, with size {1}, previous size {2}", w.RemoteFilename, Library.Utility.Utility.FormatSizeString(new System.IO.FileInfo(w.LocalFilename).Length), Library.Utility.Utility.FormatSizeString(w.Filesize)));
|
||||
else
|
||||
{
|
||||
await database.UpdateRemoteVolumeAsync(w.RemoteFilename, RemoteVolumeState.Uploading, -1, null);
|
||||
await backend.UploadFileAsync(w);
|
||||
}
|
||||
await database.UpdateRemoteVolumeAsync(w.RemoteFilename, RemoteVolumeState.Uploading, -1, null);
|
||||
await self.UploadChannel.WriteAsync(new IndexVolumeUploadRequest(w));
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,30 +18,18 @@ namespace Duplicati.Library.Main.Operation
|
||||
private readonly Options m_options;
|
||||
private string m_backendurl;
|
||||
|
||||
private byte[] m_blockbuffer;
|
||||
private byte[] m_blocklistbuffer;
|
||||
private System.Security.Cryptography.HashAlgorithm m_blockhasher;
|
||||
private System.Security.Cryptography.HashAlgorithm m_filehasher;
|
||||
|
||||
private LocalBackupDatabase m_database;
|
||||
private System.Data.IDbTransaction m_transaction;
|
||||
private BlockVolumeWriter m_blockvolume;
|
||||
private IndexVolumeWriter m_indexvolume;
|
||||
|
||||
private readonly IMetahash EMPTY_METADATA;
|
||||
|
||||
private Library.Utility.IFilter m_filter;
|
||||
private Library.Utility.IFilter m_sourceFilter;
|
||||
|
||||
private BackupResults m_result;
|
||||
|
||||
// Speed up things by caching this
|
||||
private int m_blocksize;
|
||||
|
||||
public BackupHandler(string backendurl, Options options, BackupResults results)
|
||||
{
|
||||
EMPTY_METADATA = Utility.WrapMetadata(new Dictionary<string, string>(), options);
|
||||
|
||||
m_options = options;
|
||||
m_result = results;
|
||||
m_backendurl = backendurl;
|
||||
@@ -79,39 +67,38 @@ namespace Duplicati.Library.Main.Operation
|
||||
var enumeratorTask = new Backup.FileEnumerationProcess(snapshot, options.FileAttributeFilter, sourcefilter, filter, options.SymlinkPolicy, options.HardlinkPolicy, options.ChangedFilelist).RunAsync();
|
||||
|
||||
var counterTask = AutomationExtensions.RunTask(new
|
||||
{
|
||||
Input = ChannelMarker.ForRead<string>("SourcePaths")
|
||||
},
|
||||
async self =>
|
||||
{
|
||||
var count = 0L;
|
||||
var size = 0L;
|
||||
{
|
||||
Input = Backup.Channels.SourcePaths.ForRead
|
||||
},
|
||||
async self =>
|
||||
{
|
||||
var count = 0L;
|
||||
var size = 0L;
|
||||
|
||||
try
|
||||
try
|
||||
{
|
||||
while (!token.IsCancellationRequested)
|
||||
{
|
||||
while (!token.IsCancellationRequested)
|
||||
var path = await self.Input.ReadAsync();
|
||||
|
||||
count++;
|
||||
|
||||
try
|
||||
{
|
||||
var path = await self.Input.ReadAsync();
|
||||
|
||||
count++;
|
||||
|
||||
try
|
||||
{
|
||||
size += snapshot.GetFileSize(path);
|
||||
}
|
||||
catch
|
||||
{
|
||||
}
|
||||
|
||||
result.OperationProgressUpdater.UpdatefileCount(count, size, false);
|
||||
size += snapshot.GetFileSize(path);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
result.OperationProgressUpdater.UpdatefileCount(count, size, true);
|
||||
catch
|
||||
{
|
||||
}
|
||||
|
||||
result.OperationProgressUpdater.UpdatefileCount(count, size, false);
|
||||
}
|
||||
}
|
||||
);
|
||||
finally
|
||||
{
|
||||
result.OperationProgressUpdater.UpdatefileCount(count, size, true);
|
||||
}
|
||||
});
|
||||
|
||||
return Task.WhenAll(enumeratorTask, counterTask);
|
||||
}
|
||||
@@ -146,7 +133,7 @@ namespace Duplicati.Library.Main.Operation
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task RunMainOperation(Snapshots.ISnapshotService snapshot, Backup.BackupDatabase database, Common.BackendHandler backend, Backup.BackupStatsCollector stats, Options options, IFilter sourcefilter, IFilter filter, BackupResults result)
|
||||
private static async Task RunMainOperation(Snapshots.ISnapshotService snapshot, Backup.BackupDatabase database, Backup.BackupStatsCollector stats, Options options, IFilter sourcefilter, IFilter filter, BackupResults result)
|
||||
{
|
||||
using(new Logging.Timer("BackupMainOperation"))
|
||||
{
|
||||
@@ -160,8 +147,7 @@ namespace Duplicati.Library.Main.Operation
|
||||
Backup.FilePreFilterProcess.Start(snapshot, options, stats, database),
|
||||
new Backup.MetadataPreProcess(snapshot, options, database).RunAsync(),
|
||||
Backup.SpillCollectorProcess.Run(options, database),
|
||||
Backup.ProgressHandler.Run(result),
|
||||
Backup.BackendUploader.Run(backend, options, database, result)
|
||||
Backup.ProgressHandler.Run(result)
|
||||
);
|
||||
}
|
||||
|
||||
@@ -271,6 +257,35 @@ namespace Duplicati.Library.Main.Operation
|
||||
}
|
||||
|
||||
public void Run(string[] sources, Library.Utility.IFilter filter)
|
||||
{
|
||||
RunAsync(sources, filter).WaitForTaskOrThrow();
|
||||
}
|
||||
|
||||
private static Exception BuildException(Exception source, params Task[] tasks)
|
||||
{
|
||||
if (tasks == null || tasks.Length == 0)
|
||||
return source;
|
||||
|
||||
var ex = new List<Exception>();
|
||||
ex.Add(source);
|
||||
|
||||
foreach(var t in tasks)
|
||||
if (t != null)
|
||||
{
|
||||
if (!t.IsCompleted && !t.IsFaulted && !t.IsCanceled)
|
||||
t.Wait(500);
|
||||
|
||||
if (t.IsFaulted && t.Exception != null)
|
||||
ex.Add(t.Exception);
|
||||
}
|
||||
|
||||
if (ex.Count == 1)
|
||||
return ex.First();
|
||||
else
|
||||
return new AggregateException(ex.First().Message, ex);
|
||||
}
|
||||
|
||||
private async Task RunAsync(string[] sources, Library.Utility.IFilter filter)
|
||||
{
|
||||
m_result.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_Begin);
|
||||
|
||||
@@ -278,101 +293,97 @@ namespace Duplicati.Library.Main.Operation
|
||||
using(new ChannelScope(true))
|
||||
using(m_database = new LocalBackupDatabase(m_options.Dbpath, m_options))
|
||||
{
|
||||
// Start the log handler
|
||||
var lh = Common.LogHandler.Run(m_result);
|
||||
|
||||
m_result.SetDatabase(m_database);
|
||||
m_result.Dryrun = m_options.Dryrun;
|
||||
|
||||
// Check the database integrity
|
||||
Utility.UpdateOptionsFromDb(m_database, m_options);
|
||||
Utility.VerifyParameters(m_database, m_options);
|
||||
|
||||
m_blocksize = m_options.Blocksize;
|
||||
|
||||
m_blockbuffer = new byte[m_options.Blocksize * Math.Max(1, m_options.FileReadBufferSize / m_options.Blocksize)];
|
||||
m_blocklistbuffer = new byte[m_options.Blocksize];
|
||||
|
||||
m_blockhasher = System.Security.Cryptography.HashAlgorithm.Create(m_options.BlockHashAlgorithm);
|
||||
m_filehasher = System.Security.Cryptography.HashAlgorithm.Create(m_options.FileHashAlgorithm);
|
||||
|
||||
if (m_blockhasher == null)
|
||||
throw new Exception(Strings.Foresthash.InvalidHashAlgorithm(m_options.BlockHashAlgorithm));
|
||||
if (m_filehasher == null)
|
||||
throw new Exception(Strings.Foresthash.InvalidHashAlgorithm(m_options.FileHashAlgorithm));
|
||||
|
||||
if (!m_blockhasher.CanReuseTransform)
|
||||
throw new Exception(Strings.Foresthash.InvalidCryptoSystem(m_options.BlockHashAlgorithm));
|
||||
if (!m_filehasher.CanReuseTransform)
|
||||
throw new Exception(Strings.Foresthash.InvalidCryptoSystem(m_options.FileHashAlgorithm));
|
||||
|
||||
m_database.VerifyConsistency(null, m_options.Blocksize, m_options.BlockhashSize);
|
||||
|
||||
// If there is no filter, we set an empty filter to simplify the code
|
||||
// If there is a filter, we make sure that the sources are included
|
||||
m_filter = filter ?? new Library.Utility.FilterExpression();
|
||||
m_sourceFilter = new Library.Utility.FilterExpression(sources, true);
|
||||
|
||||
|
||||
Task parallelScanner = null;
|
||||
Task uploader = null;
|
||||
try
|
||||
{
|
||||
var flushReq = new Backup.FlushRequest();
|
||||
|
||||
// Setup runners and instances here
|
||||
using(var db = new Backup.BackupDatabase(m_database))
|
||||
using(var backend = new BackendManager(m_backendurl, m_options, m_result.BackendWriter, m_database))
|
||||
using(var filesetvolume = new FilesetVolumeWriter(m_options, m_database.OperationTimestamp))
|
||||
using(var logtarget = ChannelManager.GetChannel<Common.LogMessage>("LogChannel").AsWriteOnly())
|
||||
using(var stats = new Backup.BackupStatsCollector(m_result))
|
||||
using(var bk = new Common.BackendHandler(m_options, m_backendurl, db, stats))
|
||||
using(var logtarget = ChannelManager.GetChannel(Common.Channels.LogChannel.ForWrite))
|
||||
{
|
||||
var lh = Common.LogHandler.Run(m_result);
|
||||
long lastVolumeSize = -1L;
|
||||
|
||||
using(var snapshot = GetSnapshot(sources, m_options, m_result))
|
||||
using(var uploadtarget = ChannelManager.GetChannel(Backup.Channels.BackendRequest.ForWrite))
|
||||
{
|
||||
var counterToken = new CancellationTokenSource();
|
||||
|
||||
try
|
||||
using(var snapshot = GetSnapshot(sources, m_options, m_result))
|
||||
{
|
||||
// Start the parallel scanner
|
||||
parallelScanner = CountFilesHandler(snapshot, m_result, m_options, m_sourceFilter, m_filter, counterToken.Token);
|
||||
var counterToken = new CancellationTokenSource();
|
||||
|
||||
// Do a remote verification, unless disabled
|
||||
PreBackupVerify(backend);
|
||||
|
||||
using(var db = new Backup.BackupDatabase(m_database))
|
||||
using(var stats = new Backup.BackupStatsCollector(m_result))
|
||||
using(var bk = new Common.BackendHandler(m_options, m_backendurl, db, stats))
|
||||
try
|
||||
{
|
||||
// Start the parallel scanner
|
||||
parallelScanner = CountFilesHandler(snapshot, m_result, m_options, m_sourceFilter, m_filter, counterToken.Token);
|
||||
|
||||
// Do a remote verification, unless disabled
|
||||
PreBackupVerify(backend);
|
||||
|
||||
// Verify before uploading a synthetic list
|
||||
m_database.VerifyConsistency(null, m_options.Blocksize, m_options.BlockhashSize);
|
||||
|
||||
var res = Backup.UploadSyntheticFilelist.Run(bk, db, m_options, m_result).WaitForTask();
|
||||
if (res.IsFaulted)
|
||||
{
|
||||
if (res.Exception.Flatten().InnerExceptions.Count == 1)
|
||||
throw res.Exception.Flatten().InnerExceptions.First();
|
||||
else
|
||||
throw res.Exception;
|
||||
}
|
||||
// Start the uploader process
|
||||
uploader = Backup.BackendUploader.Run(bk, m_options, db, m_result);
|
||||
|
||||
// If the previous backup was interrupted, send a synthetic list
|
||||
await Backup.UploadSyntheticFilelist.Run(db, m_options, m_result);
|
||||
|
||||
// This should be removed
|
||||
m_database.BuildLookupTable(m_options);
|
||||
|
||||
// Prepare the operation
|
||||
m_result.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_ProcessingFiles);
|
||||
await CreateFilesetAsync(db, filesetvolume.RemoteFilename);
|
||||
|
||||
var filesetvolumeid = CreateFilesetAsync(db, filesetvolume.RemoteFilename).Result;
|
||||
|
||||
res = RunMainOperation(snapshot, db, bk, stats, m_options, m_sourceFilter, m_filter, m_result).WaitForTask();
|
||||
if (res.IsFaulted)
|
||||
{
|
||||
if (res.Exception.Flatten().InnerExceptions.Count == 1)
|
||||
throw res.Exception.Flatten().InnerExceptions.First();
|
||||
else
|
||||
throw res.Exception;
|
||||
}
|
||||
// Run the backup operation
|
||||
await RunMainOperation(snapshot, db, stats, m_options, m_sourceFilter, m_filter, m_result);
|
||||
}
|
||||
finally
|
||||
{
|
||||
//If the scanner is still running for some reason, make sure we kill it now
|
||||
counterToken.Cancel();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
//If the scanner is still running for some reason, make sure we kill it now
|
||||
counterToken.Cancel();
|
||||
}
|
||||
|
||||
// Wait for upload completion
|
||||
m_result.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_WaitForUpload);
|
||||
await uploadtarget.WriteAsync(flushReq);
|
||||
|
||||
// TODO: do not close the uploader once the other functions are ported
|
||||
}
|
||||
|
||||
// TODO: Implement this
|
||||
//var lastVolumeSize = FinalizeRemoteVolumes(backend);
|
||||
// In case the uploader crashes, we grab the exception here
|
||||
if (await Task.WhenAny(uploader, flushReq.LastWriteSizeAync) == uploader)
|
||||
await uploader;
|
||||
|
||||
// Grab the size of the last uploaded volume
|
||||
var lastVolumeSize = await flushReq.LastWriteSizeAync;
|
||||
|
||||
// Make sure we have the database up-to-date
|
||||
await db.CommitTransactionAsync("CommitAfterUpload", false);
|
||||
|
||||
// TODO: Remove this
|
||||
await uploader;
|
||||
|
||||
// TODO: Remove this later
|
||||
m_transaction = m_database.BeginTransaction();
|
||||
|
||||
using(new Logging.Timer("UpdateChangeStatistics"))
|
||||
@@ -423,8 +434,12 @@ namespace Duplicati.Library.Main.Operation
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
m_result.AddError("Fatal error", ex);
|
||||
throw;
|
||||
var aex = BuildException(ex, uploader, parallelScanner);
|
||||
m_result.AddError("Fatal error", aex);
|
||||
if (aex == ex)
|
||||
throw;
|
||||
|
||||
throw aex;
|
||||
}
|
||||
finally
|
||||
{
|
||||
@@ -435,6 +450,8 @@ namespace Duplicati.Library.Main.Operation
|
||||
if (m_transaction != null)
|
||||
try { m_transaction.Rollback(); }
|
||||
catch (Exception ex) { m_result.AddError(string.Format("Rollback error: {0}", ex.Message), ex); }
|
||||
|
||||
lh.Wait(500);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,10 +67,6 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
/// </summary>
|
||||
public long Size;
|
||||
/// <summary>
|
||||
/// A flag indicating if the final hash and size of the block volume has been written to the index file
|
||||
/// </summary>
|
||||
public bool IndexfileUpdated;
|
||||
/// <summary>
|
||||
/// A flag indicating if the file is a extra metadata file
|
||||
/// that has no entry in the database
|
||||
/// </summary>
|
||||
@@ -104,13 +100,13 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
this.LocalTempfile.Protected = true;
|
||||
}
|
||||
|
||||
public async Task Encrypt(Library.Interface.IEncryption encryption, IWriteChannel<LogMessage> logchannel)
|
||||
public async Task Encrypt(Library.Interface.IEncryption encryption, LogWrapper log)
|
||||
{
|
||||
if (encryption != null && !this.Encrypted)
|
||||
{
|
||||
var tempfile = new Library.Utility.TempFile();
|
||||
encryption.Encrypt(this.LocalFilename, tempfile);
|
||||
await this.DeleteLocalFile(logchannel);
|
||||
await this.DeleteLocalFile(log);
|
||||
this.LocalTempfile = tempfile;
|
||||
this.Hash = null;
|
||||
this.Size = 0;
|
||||
@@ -138,18 +134,18 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
return false;
|
||||
}
|
||||
|
||||
public async Task DeleteLocalFile(IWriteChannel<LogMessage> logchannel)
|
||||
public async Task DeleteLocalFile(LogWrapper log)
|
||||
{
|
||||
if (this.LocalTempfile != null)
|
||||
try { this.LocalTempfile.Dispose(); }
|
||||
catch (Exception ex) { await logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to dispose temporary file: {0}", this.LocalTempfile), ex)); }
|
||||
catch (Exception ex) { await log.WriteWarningAsync(string.Format("Failed to dispose temporary file: {0}", this.LocalTempfile), ex); }
|
||||
finally { this.LocalTempfile = null; }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
[ChannelName("LogChannel")]
|
||||
private IWriteChannel<LogMessage> m_logchannel;
|
||||
private IWriteChannel<LogMessage> m_logchannel = Channels.LogChannel.ForWrite;
|
||||
private LogWrapper m_log;
|
||||
|
||||
private DatabaseCommon m_database;
|
||||
private IEncryption m_encryption;
|
||||
@@ -167,6 +163,7 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
m_options = options;
|
||||
m_backendurl = backendUrl;
|
||||
m_stats = stats;
|
||||
m_log = new LogWrapper(m_logchannel);
|
||||
if (!options.NoEncryption)
|
||||
m_encryption = DynamicLoader.EncryptionLoader.GetModule(options.EncryptionModule, options.Passphrase, options.RawOptions);
|
||||
}
|
||||
@@ -201,6 +198,14 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
|
||||
var tcs = new TaskCompletionSource<bool>();
|
||||
|
||||
var backgroundhashAndEncrypt = Task.Run(async () =>
|
||||
{
|
||||
if (m_encryption != null)
|
||||
await fe.Encrypt(m_encryption, m_log).ConfigureAwait(false);
|
||||
|
||||
return fe.UpdateHashAndSize(m_options);
|
||||
});
|
||||
|
||||
await RunOnMain(async () =>
|
||||
{
|
||||
try
|
||||
@@ -209,6 +214,9 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
if (fe.IsRetry)
|
||||
await RenameFileAfterErrorAsync(fe);
|
||||
|
||||
// Make sure the encryption and hashing has completed
|
||||
await backgroundhashAndEncrypt;
|
||||
|
||||
return await DoPut(fe);
|
||||
});
|
||||
|
||||
@@ -306,7 +314,7 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
}
|
||||
catch (Exception dex)
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Warning(LC.L("Failed to dispose backend instance: {0}", (ex ?? dex).Message), dex));
|
||||
await m_log.WriteWarningAsync(LC.L("Failed to dispose backend instance: {0}", (ex ?? dex).Message), dex);
|
||||
}
|
||||
|
||||
m_backend = null;
|
||||
@@ -337,7 +345,7 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
{
|
||||
item.IsRetry = true;
|
||||
lastException = ex;
|
||||
await m_logchannel.WriteAsync(LogMessage.RetryAttempt(string.Format("Operation {0} with file {1} attempt {2} of {3} failed with message: {4}", item.Operation, item.RemoteFilename, i + 1, m_options.NumberOfRetries, ex.Message), ex));
|
||||
await m_log.WriteRetryAttemptAsync(string.Format("Operation {0} with file {1} attempt {2} of {3} failed with message: {4}", item.Operation, item.RemoteFilename, i + 1, m_options.NumberOfRetries, ex.Message), ex);
|
||||
|
||||
// If the thread is aborted, we exit here
|
||||
if (ex is System.Threading.ThreadAbortException)
|
||||
@@ -356,7 +364,7 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
}
|
||||
catch (Exception dex)
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to create folder: {0}", ex.Message), dex));
|
||||
await m_log.WriteWarningAsync(string.Format("Failed to create folder: {0}", ex.Message), dex);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -384,20 +392,21 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
|
||||
await m_stats.SendEventAsync(item.Operation, BackendEventType.Rename, oldname, item.Size);
|
||||
await m_stats.SendEventAsync(item.Operation, BackendEventType.Rename, newname, item.Size);
|
||||
await m_logchannel.WriteAsync(LogMessage.Information(string.Format("Renaming \"{0}\" to \"{1}\"", oldname, newname)));
|
||||
await m_log.WriteInformationAsync(string.Format("Renaming \"{0}\" to \"{1}\"", oldname, newname));
|
||||
await m_database.RenameRemoteFileAsync(oldname, newname);
|
||||
item.RemoteFilename = newname;
|
||||
}
|
||||
|
||||
private async Task<bool> DoPut(FileEntryItem item)
|
||||
private async Task<bool> DoPut(FileEntryItem item, bool updatedHash = false)
|
||||
{
|
||||
if (m_encryption != null)
|
||||
await item.Encrypt(m_encryption, m_logchannel);
|
||||
if (m_encryption != null && !item.Encrypted)
|
||||
await item.Encrypt(m_encryption, m_log);
|
||||
|
||||
if (item.UpdateHashAndSize(m_options) && item.TrackedInDb)
|
||||
updatedHash |= item.UpdateHashAndSize(m_options);
|
||||
|
||||
if (updatedHash && item.TrackedInDb)
|
||||
await m_database.UpdateRemoteVolumeAsync(item.RemoteFilename, RemoteVolumeState.Uploading, item.Size, item.Hash);
|
||||
|
||||
|
||||
await m_database.LogRemoteOperationAsync("put", item.RemoteFilename, JsonConvert.SerializeObject(new { Size = item.Size, Hash = item.Hash }));
|
||||
await m_stats.SendEventAsync(BackendActionType.Put, BackendEventType.Started, item.RemoteFilename, item.Size);
|
||||
|
||||
@@ -414,7 +423,7 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
m_backend.Put(item.RemoteFilename, item.LocalFilename);
|
||||
|
||||
var duration = DateTime.Now - begin;
|
||||
Logging.Log.WriteMessage(string.Format("Uploaded {0} in {1}, {2}/s", Library.Utility.Utility.FormatSizeString(item.Size), duration, Library.Utility.Utility.FormatSizeString((long)(item.Size / duration.TotalSeconds))), Duplicati.Library.Logging.LogMessageType.Profiling);
|
||||
await m_log.WriteProfilingAsync(string.Format("Uploaded {0} in {1}, {2}/s", Library.Utility.Utility.FormatSizeString(item.Size), duration, Library.Utility.Utility.FormatSizeString((long)(item.Size / duration.TotalSeconds))), null);
|
||||
|
||||
if (item.TrackedInDb)
|
||||
await m_database.UpdateRemoteVolumeAsync(item.RemoteFilename, RemoteVolumeState.Uploaded, item.Size, item.Hash);
|
||||
@@ -430,7 +439,7 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
throw new Exception(string.Format("List verify failed for file: {0}, size was {1} but expected to be {2}", f.Name, f.Size, item.Size));
|
||||
}
|
||||
|
||||
await item.DeleteLocalFile(m_logchannel);
|
||||
await item.DeleteLocalFile(m_log);
|
||||
await m_database.CommitTransactionAsync("CommitAfterUpload");
|
||||
|
||||
return true;
|
||||
@@ -478,7 +487,7 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
|
||||
if (isFileMissingException || (wr != null && wr.StatusCode == System.Net.HttpStatusCode.NotFound))
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Warning(LC.L("Delete operation failed for {0} with FileNotFound, listing contents", item.RemoteFilename), ex));
|
||||
await m_log.WriteWarningAsync(LC.L("Delete operation failed for {0} with FileNotFound, listing contents", item.RemoteFilename), ex);
|
||||
bool success = false;
|
||||
|
||||
try
|
||||
@@ -491,7 +500,7 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
|
||||
if (success)
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Information(LC.L("Listing indicates file {0} is deleted correctly", item.RemoteFilename)));
|
||||
await m_log.WriteInformationAsync(LC.L("Listing indicates file {0} is deleted correctly", item.RemoteFilename));
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -556,7 +565,7 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
|
||||
var duration = DateTime.Now - begin;
|
||||
var filehash = FileEntryItem.CalculateFileHash(tmpfile);
|
||||
Logging.Log.WriteMessage(string.Format("Downloaded {0} in {1}, {2}/s", Library.Utility.Utility.FormatSizeString(item.Size), duration, Library.Utility.Utility.FormatSizeString((long)(item.Size / duration.TotalSeconds))), Duplicati.Library.Logging.LogMessageType.Profiling);
|
||||
await m_log.WriteProfilingAsync(string.Format("Downloaded {0} in {1}, {2}/s", Library.Utility.Utility.FormatSizeString(item.Size), duration, Library.Utility.Utility.FormatSizeString((long)(item.Size / duration.TotalSeconds))), null);
|
||||
|
||||
await m_database.LogRemoteOperationAsync("get", item.RemoteFilename, JsonConvert.SerializeObject(new { Size = new System.IO.FileInfo(tmpfile).Length, Hash = filehash }));
|
||||
await m_stats.SendEventAsync(BackendActionType.Get, BackendEventType.Completed, item.RemoteFilename, new System.IO.FileInfo(tmpfile).Length);
|
||||
@@ -601,19 +610,19 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
// Check if the file is encrypted with something else
|
||||
if (DynamicLoader.EncryptionLoader.Keys.Contains(ext, StringComparer.InvariantCultureIgnoreCase))
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Verbose("Filename extension \"{0}\" does not match encryption module \"{1}\", using matching encryption module", ext, m_options.EncryptionModule));
|
||||
await m_log.WriteVerboseAsync("Filename extension \"{0}\" does not match encryption module \"{1}\", using matching encryption module", ext, m_options.EncryptionModule);
|
||||
using(var encmodule = DynamicLoader.EncryptionLoader.GetModule(ext, m_options.Passphrase, m_options.RawOptions))
|
||||
(encmodule ?? m_encryption).Decrypt(tmpfile2, tmpfile);
|
||||
}
|
||||
// Check if the file is not encrypted
|
||||
else if (DynamicLoader.CompressionLoader.Keys.Contains(ext, StringComparer.InvariantCultureIgnoreCase))
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Verbose("Filename extension \"{0}\" does not match encryption module \"{1}\", guessing that it is not encrypted", ext, m_options.EncryptionModule));
|
||||
await m_log.WriteVerboseAsync("Filename extension \"{0}\" does not match encryption module \"{1}\", guessing that it is not encrypted", ext, m_options.EncryptionModule);
|
||||
}
|
||||
// Fallback, lets see what happens...
|
||||
else
|
||||
{
|
||||
await m_logchannel.WriteAsync(LogMessage.Verbose("Filename extension \"{0}\" does not match encryption module \"{1}\", attempting to use specified encryption module as no others match", ext, m_options.EncryptionModule));
|
||||
await m_log.WriteVerboseAsync("Filename extension \"{0}\" does not match encryption module \"{1}\", attempting to use specified encryption module as no others match", ext, m_options.EncryptionModule);
|
||||
m_encryption.Decrypt(tmpfile2, tmpfile);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,14 +15,12 @@
|
||||
// License along with this library; if not, write to the Free Software
|
||||
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
using System;
|
||||
using CoCoL;
|
||||
|
||||
namespace Duplicati.Library.Main.Operation.Common
|
||||
{
|
||||
public class Channels
|
||||
internal static class Channels
|
||||
{
|
||||
public Channels()
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
public static readonly ChannelMarkerWrapper<LogMessage> LogChannel = new ChannelMarkerWrapper<LogMessage>("LogChannel");
|
||||
}}
|
||||
|
||||
|
||||
@@ -15,6 +15,8 @@
|
||||
// License along with this library; if not, write to the Free Software
|
||||
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
using System;
|
||||
using CoCoL;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Duplicati.Library.Main.Operation.Common
|
||||
{
|
||||
@@ -26,70 +28,6 @@ namespace Duplicati.Library.Main.Operation.Common
|
||||
public bool IsVerbose;
|
||||
public bool IsDryRun;
|
||||
public bool IsRetry;
|
||||
|
||||
public static LogMessage Warning(string message, Exception ex)
|
||||
{
|
||||
return new LogMessage() {
|
||||
Level = Duplicati.Library.Logging.LogMessageType.Warning,
|
||||
Message = message,
|
||||
Exception = ex
|
||||
};
|
||||
}
|
||||
|
||||
public static LogMessage Error(string message, Exception ex)
|
||||
{
|
||||
return new LogMessage() {
|
||||
Level = Duplicati.Library.Logging.LogMessageType.Error,
|
||||
Message = message,
|
||||
Exception = ex
|
||||
};
|
||||
}
|
||||
|
||||
public static LogMessage Profiling(string message, Exception ex)
|
||||
{
|
||||
return new LogMessage() {
|
||||
Level = Duplicati.Library.Logging.LogMessageType.Profiling,
|
||||
Message = message,
|
||||
Exception = ex
|
||||
};
|
||||
}
|
||||
|
||||
public static LogMessage Information(string message, Exception ex = null)
|
||||
{
|
||||
return new LogMessage() {
|
||||
Level = Duplicati.Library.Logging.LogMessageType.Information,
|
||||
Message = message,
|
||||
Exception = ex
|
||||
};
|
||||
}
|
||||
|
||||
public static LogMessage Verbose(string message, params object[] args)
|
||||
{
|
||||
return new LogMessage() {
|
||||
Level = Duplicati.Library.Logging.LogMessageType.Information,
|
||||
Message = string.Format(message, args),
|
||||
IsVerbose = true
|
||||
};
|
||||
}
|
||||
|
||||
public static LogMessage DryRun(string message, params object[] args)
|
||||
{
|
||||
return new LogMessage() {
|
||||
Level = Duplicati.Library.Logging.LogMessageType.Information,
|
||||
Message = string.Format(message, args),
|
||||
IsDryRun = true
|
||||
};
|
||||
}
|
||||
|
||||
public static LogMessage RetryAttempt(string message, params object[] args)
|
||||
{
|
||||
return new LogMessage() {
|
||||
Level = Duplicati.Library.Logging.LogMessageType.Information,
|
||||
Message = string.Format(message, args),
|
||||
IsRetry = true
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,117 @@
|
||||
// Copyright (C) 2015, The Duplicati Team
|
||||
// http://www.duplicati.com, info@duplicati.com
|
||||
//
|
||||
// This library is free software; you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as
|
||||
// published by the Free Software Foundation; either version 2.1 of the
|
||||
// License, or (at your option) any later version.
|
||||
//
|
||||
// This library is distributed in the hope that it will be useful, but
|
||||
// WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
// Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public
|
||||
// License along with this library; if not, write to the Free Software
|
||||
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
using System;
|
||||
using CoCoL;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Duplicati.Library.Main.Operation.Common
|
||||
{
|
||||
public class LogWrapper : IDisposable
|
||||
{
|
||||
private bool m_autoLeave;
|
||||
private IWriteChannel<LogMessage> m_channel;
|
||||
|
||||
//TODO: Fix this so the verbose messages are not sent unless required
|
||||
|
||||
public LogWrapper()
|
||||
{
|
||||
m_channel = ChannelManager.GetChannel(Channels.LogChannel.ForWrite);
|
||||
m_autoLeave = true;
|
||||
}
|
||||
|
||||
public LogWrapper(IWriteChannel<LogMessage> channel)
|
||||
{
|
||||
m_channel = channel;
|
||||
m_autoLeave = false;
|
||||
}
|
||||
|
||||
public Task WriteWarningAsync(string message, Exception ex)
|
||||
{
|
||||
return m_channel.WriteAsync(new LogMessage() {
|
||||
Level = Duplicati.Library.Logging.LogMessageType.Warning,
|
||||
Message = message,
|
||||
Exception = ex
|
||||
});
|
||||
}
|
||||
|
||||
public Task WriteErrorAsync(string message, Exception ex)
|
||||
{
|
||||
return m_channel.WriteAsync(new LogMessage() {
|
||||
Level = Duplicati.Library.Logging.LogMessageType.Error,
|
||||
Message = message,
|
||||
Exception = ex
|
||||
});
|
||||
}
|
||||
|
||||
public Task WriteProfilingAsync(string message, Exception ex)
|
||||
{
|
||||
return m_channel.WriteAsync(new LogMessage() {
|
||||
Level = Duplicati.Library.Logging.LogMessageType.Profiling,
|
||||
Message = message,
|
||||
Exception = ex
|
||||
});
|
||||
}
|
||||
|
||||
public Task WriteInformationAsync(string message, Exception ex = null)
|
||||
{
|
||||
return m_channel.WriteAsync(new LogMessage() {
|
||||
Level = Duplicati.Library.Logging.LogMessageType.Information,
|
||||
Message = message,
|
||||
Exception = ex
|
||||
});
|
||||
}
|
||||
|
||||
public Task WriteVerboseAsync(string message, params object[] args)
|
||||
{
|
||||
return m_channel.WriteAsync(new LogMessage() {
|
||||
Level = Duplicati.Library.Logging.LogMessageType.Information,
|
||||
Message = string.Format(message, args),
|
||||
IsVerbose = true
|
||||
});
|
||||
}
|
||||
|
||||
public Task WriteDryRunAsync(string message, params object[] args)
|
||||
{
|
||||
return m_channel.WriteAsync(new LogMessage() {
|
||||
Level = Duplicati.Library.Logging.LogMessageType.Information,
|
||||
Message = string.Format(message, args),
|
||||
IsDryRun = true
|
||||
});
|
||||
}
|
||||
|
||||
public Task WriteRetryAttemptAsync(string message, params object[] args)
|
||||
{
|
||||
return m_channel.WriteAsync(new LogMessage() {
|
||||
Level = Duplicati.Library.Logging.LogMessageType.Information,
|
||||
Message = string.Format(message, args),
|
||||
IsRetry = true
|
||||
});
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (m_autoLeave && m_channel != null)
|
||||
{
|
||||
m_autoLeave = false;
|
||||
if (m_channel is IRetireAbleChannel)
|
||||
((IRetireAbleChannel)m_channel).Retire();
|
||||
}
|
||||
m_channel = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Vendored
BIN
Binary file not shown.
Reference in New Issue
Block a user