Fixed channels to use type-safe initializations.

Updated logging channel to use a wrapper for later early-filter optimizations.
Various bug fixes and optimizations.
This commit is contained in:
Kenneth Skovhede
2016-02-24 23:48:42 +01:00
parent 0eba4bf098
commit 20f7b42945
19 changed files with 767 additions and 594 deletions
@@ -133,6 +133,9 @@
<Compile Include="Operation\Backup\BackendUploader.cs" />
<Compile Include="Operation\Common\IndexVolumeCreator.cs" />
<Compile Include="Operation\Backup\UploadSyntheticFilelist.cs" />
<Compile Include="Operation\Backup\Channels.cs" />
<Compile Include="Operation\Common\Channels.cs" />
<Compile Include="Operation\Common\LogWrapper.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Utility\Duplicati.Library.Utility.csproj">
@@ -23,57 +23,124 @@ using System.Collections.Generic;
namespace Duplicati.Library.Main.Operation.Backup
{
/// <summary>
/// Marker interface for messages that can be sent to the backend uploader channel.
/// Implemented by the volume, fileset, index and flush request types below.
/// </summary>
internal interface IUploadRequest
{
}
internal class FlushRequest : IUploadRequest
{
    /// <summary>The completion source signalled once the uploader has drained its pending queue</summary>
    private readonly TaskCompletionSource<long> m_completion = new TaskCompletionSource<long>();

    /// <summary>Gets a task that yields the size of the last written volume once the flush completes</summary>
    public Task<long> LastWriteSizeAync
    {
        get { return m_completion.Task; }
    }

    /// <summary>Signals that all pending uploads have completed</summary>
    /// <param name="size">The source size of the last uploaded block volume, or -1 if none was uploaded</param>
    public void SetFlushed(long size)
    {
        m_completion.TrySetResult(size);
    }
}
internal class IndexVolumeUploadRequest : IUploadRequest
{
    /// <summary>The backing store for the index volume being uploaded</summary>
    private readonly IndexVolumeWriter m_indexVolume;

    /// <summary>Gets the index volume to upload</summary>
    public IndexVolumeWriter IndexVolume
    {
        get { return m_indexVolume; }
    }

    /// <summary>Creates a request to upload a stand-alone index volume</summary>
    /// <param name="indexVolume">The index volume to upload</param>
    public IndexVolumeUploadRequest(IndexVolumeWriter indexVolume)
    {
        m_indexVolume = indexVolume;
    }
}
internal class FilesetUploadRequest : IUploadRequest
{
    /// <summary>The backing store for the fileset volume being uploaded</summary>
    private readonly FilesetVolumeWriter m_fileset;

    /// <summary>Gets the fileset volume to upload</summary>
    public FilesetVolumeWriter Fileset
    {
        get { return m_fileset; }
    }

    /// <summary>Creates a request to upload a fileset volume</summary>
    /// <param name="fileset">The fileset volume to upload</param>
    public FilesetUploadRequest(FilesetVolumeWriter fileset)
    {
        m_fileset = fileset;
    }
}
internal class VolumeUploadRequest : IUploadRequest
{
    /// <summary>The block volume to upload</summary>
    public BlockVolumeWriter BlockVolume { get; private set; }
    /// <summary>The index volume accompanying the block volume; may be null if no index file is produced</summary>
    public IndexVolumeWriter IndexVolume { get; private set; }

    /// <summary>Creates a request to upload a block volume and its optional index volume</summary>
    /// <param name="blockvolume">The block volume to upload</param>
    /// <param name="indexvolume">The accompanying index volume, or null</param>
    public VolumeUploadRequest(BlockVolumeWriter blockvolume, IndexVolumeWriter indexvolume)
    {
        BlockVolume = blockvolume;
        IndexVolume = indexvolume;
    }
}
internal static class BackendUploader
{
public static Task Run(Common.BackendHandler backend, Options options, Common.DatabaseCommon database, BackupResults results)
{
return AutomationExtensions.RunTask(new
{
Input = ChannelMarker.ForRead<UploadRequest>("BackendRequests"),
},
{
Input = Channels.BackendRequest.ForRead,
},
async self =>
async self =>
{
var inProgress = new Queue<KeyValuePair<int, Task>>();
var max_pending = options.AsynchronousUploadLimit == 0 ? long.MaxValue : options.AsynchronousUploadLimit;
var noIndexFiles = options.IndexfilePolicy == Options.IndexFileStrategy.None;
var active = 0;
var lastSize = -1L;
while(!self.Input.IsRetired)
{
var inProgress = new Queue<Task>();
var max_pending = options.AsynchronousUploadLimit == 0 ? long.MaxValue : options.AsynchronousUploadLimit;
if (options.IndexfilePolicy != Options.IndexFileStrategy.None)
max_pending = max_pending / 2;
while(!self.Input.IsRetired)
try
{
try
var req = await self.Input.ReadAsync();
KeyValuePair<int, Task> task = default(KeyValuePair<int, Task>);
if (req is VolumeUploadRequest)
{
var req = await self.Input.ReadAsync();
inProgress.Enqueue(backend.UploadFileAsync(req.BlockVolume, name => IndexVolumeCreator.CreateIndexVolume(name, options, database)));
lastSize = ((VolumeUploadRequest)req).BlockVolume.SourceSize;
if (noIndexFiles)
task = new KeyValuePair<int, Task>(1, backend.UploadFileAsync(((VolumeUploadRequest)req).BlockVolume, null));
else
task = new KeyValuePair<int, Task>(2, backend.UploadFileAsync(((VolumeUploadRequest)req).BlockVolume, name => IndexVolumeCreator.CreateIndexVolume(name, options, database)));
}
catch(Exception ex)
else if (req is FilesetUploadRequest)
task = new KeyValuePair<int, Task>(1, backend.UploadFileAsync(((FilesetUploadRequest)req).Fileset));
else if (req is IndexVolumeUploadRequest)
task = new KeyValuePair<int, Task>(1, backend.UploadFileAsync(((IndexVolumeUploadRequest)req).IndexVolume));
else if (req is FlushRequest)
{
if (!ex.IsRetiredException())
throw;
while(inProgress.Count > 0)
await inProgress.Dequeue().Value;
active = 0;
((FlushRequest)req).SetFlushed(lastSize);
}
if (task.Value != null)
{
inProgress.Enqueue(task);
active += task.Key;
}
while(inProgress.Count >= max_pending)
await inProgress.Dequeue();
}
results.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_WaitForUpload);
while(inProgress.Count > 0)
await inProgress.Dequeue();
catch(Exception ex)
{
if (!ex.IsRetiredException())
throw;
}
while(active >= max_pending)
{
var top = inProgress.Dequeue();
await top.Value;
active -= top.Key;
}
}
);
results.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_WaitForUpload);
while(inProgress.Count > 0)
await inProgress.Dequeue().Value;
});
}
}
@@ -0,0 +1,33 @@
// Copyright (C) 2015, The Duplicati Team
// http://www.duplicati.com, info@duplicati.com
//
// This library is free software; you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation; either version 2.1 of the
// License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
using System;
using CoCoL;
namespace Duplicati.Library.Main.Operation.Backup
{
/// <summary>
/// Static, type-safe channel markers used to wire up the tasks of the backup process.
/// Each marker pairs a payload type with a well-known channel name, so readers and
/// writers attach to the same named channel without repeating string literals.
/// </summary>
internal static class Channels
{
    /// <summary>Carries upload requests (volumes, filesets, index files, flushes) to the backend uploader</summary>
    public static readonly ChannelMarkerWrapper<IUploadRequest> BackendRequest = new ChannelMarkerWrapper<IUploadRequest>("BackendRequests");
    /// <summary>Carries partially filled block volumes to the spill collector for merging</summary>
    public static readonly ChannelMarkerWrapper<VolumeUploadRequest> SpillPickup = new ChannelMarkerWrapper<VolumeUploadRequest>("SpillPickup");
    /// <summary>Carries produced data blocks to the block volume writers</summary>
    public static readonly ChannelMarkerWrapper<DataBlock> OutputBlocks = new ChannelMarkerWrapper<DataBlock>("OutputBlocks");
    /// <summary>Carries file entries that were detected as changed and accepted for processing</summary>
    public static readonly ChannelMarkerWrapper<MetadataPreProcess.FileEntry> AcceptedChangedFile = new ChannelMarkerWrapper<MetadataPreProcess.FileEntry>("AcceptedChangedFile");
    /// <summary>Carries file entries that have completed metadata pre-processing</summary>
    public static readonly ChannelMarkerWrapper<MetadataPreProcess.FileEntry> ProcessedFiles = new ChannelMarkerWrapper<MetadataPreProcess.FileEntry>("ProcessedFiles");
    /// <summary>Carries enumerated source paths from the file enumeration process</summary>
    public static readonly ChannelMarkerWrapper<string> SourcePaths = new ChannelMarkerWrapper<string>("SourcePaths");
    /// <summary>Carries progress events for reporting operation progress</summary>
    public static readonly ChannelMarkerWrapper<Common.ProgressEvent> ProgressEvents = new ChannelMarkerWrapper<Common.ProgressEvent>("ProgressEvents");
}
}
@@ -21,7 +21,7 @@ using CoCoL;
namespace Duplicati.Library.Main.Operation.Backup
{
public struct DataBlock
internal struct DataBlock
{
public string HashKey;
public byte[] Data;
@@ -33,91 +33,91 @@ namespace Duplicati.Library.Main.Operation.Backup
public static Task Run(BackupDatabase database, Options options)
{
return AutomationExtensions.RunTask(
new
{
LogChannel = ChannelMarker.ForWrite<LogMessage>("LogChannel"),
Input = ChannelMarker.ForRead<DataBlock>("OutputBlocks"),
Output = ChannelMarker.ForWrite<UploadRequest>("BackendRequests"),
SpillPickup = ChannelMarker.ForWrite<UploadRequest>("SpillPickup"),
},
new
{
LogChannel = Common.Channels.LogChannel.ForWrite,
Input = Channels.OutputBlocks.ForRead,
Output = Channels.BackendRequest.ForWrite,
SpillPickup = Channels.SpillPickup.ForWrite,
},
async self =>
{
BlockVolumeWriter blockvolume = null;
async self =>
{
BlockVolumeWriter blockvolume = null;
var log = new LogWrapper(self.LogChannel);
try
try
{
while(true)
{
while(true)
var b = await self.Input.ReadAsync();
// Lazy-start a new block volume
if (blockvolume == null)
{
var b = await self.Input.ReadAsync();
// Lazy-start a new block volume
if (blockvolume == null)
// Before we start a new volume, probe to see if it exists
// This will delay creation of volumes for differential backups
// There can be a race, such that two workers determine that
// the block is missing, but this will be solved by the AddBlock call
// which runs atomically
if (await database.FindBlockIDAsync(b.HashKey, b.Size) >= 0)
{
// Before we start a new volume, probe to see if it exists
// This will delay creation of volumes for differential backups
// There can be a race, such that two workers determine that
// the block is missing, but this will be solved by the AddBlock call
// which runs atomically
if (await database.FindBlockIDAsync(b.HashKey, b.Size) >= 0)
{
b.TaskCompletion.TrySetResult(false);
continue;
}
blockvolume = new BlockVolumeWriter(options);
blockvolume.VolumeID = await database.RegisterRemoteVolumeAsync(blockvolume.RemoteFilename, RemoteVolumeType.Blocks, RemoteVolumeState.Temporary);
b.TaskCompletion.TrySetResult(false);
continue;
}
var newBlock = await database.AddBlockAsync(b.HashKey, b.Size, blockvolume.VolumeID);
b.TaskCompletion.TrySetResult(newBlock);
if (newBlock)
{
blockvolume.AddBlock(b.HashKey, b.Data, b.Offset, (int)b.Size, b.Hint);
if (blockvolume.Filesize > options.VolumeSize - options.Blocksize)
{
if (options.Dryrun)
{
blockvolume.Close();
await self.LogChannel.WriteAsync(LogMessage.DryRun("Would upload block volume: {0}, size: {1}", blockvolume.RemoteFilename, Library.Utility.Utility.FormatSizeString(new FileInfo(blockvolume.LocalFilename).Length)));
blockvolume.Dispose();
blockvolume = null;
}
else
{
//When uploading a new volume, we register the volumes and then flush the transaction
// this ensures that the local database and remote storage are as closely related as possible
await database.UpdateRemoteVolumeAsync(blockvolume.RemoteFilename, RemoteVolumeState.Uploading, -1, null);
blockvolume.Close();
await database.CommitTransactionAsync("CommitAddBlockToOutputFlush");
await self.Output.WriteAsync(new UploadRequest(blockvolume, null));
blockvolume = null;
}
}
}
blockvolume = new BlockVolumeWriter(options);
blockvolume.VolumeID = await database.RegisterRemoteVolumeAsync(blockvolume.RemoteFilename, RemoteVolumeType.Blocks, RemoteVolumeState.Temporary);
}
}
catch(Exception ex)
{
if (ex.IsRetiredException())
var newBlock = await database.AddBlockAsync(b.HashKey, b.Size, blockvolume.VolumeID);
b.TaskCompletion.TrySetResult(newBlock);
if (newBlock)
{
// If we have collected data, merge all pending volumes into a single volume
if (blockvolume != null && blockvolume.SourceSize > 0)
await self.SpillPickup.WriteAsync(new UploadRequest(blockvolume, null));
}
throw;
blockvolume.AddBlock(b.HashKey, b.Data, b.Offset, (int)b.Size, b.Hint);
if (blockvolume.Filesize > options.VolumeSize - options.Blocksize)
{
if (options.Dryrun)
{
blockvolume.Close();
await log.WriteDryRunAsync("Would upload block volume: {0}, size: {1}", blockvolume.RemoteFilename, Library.Utility.Utility.FormatSizeString(new FileInfo(blockvolume.LocalFilename).Length));
blockvolume.Dispose();
blockvolume = null;
}
else
{
//When uploading a new volume, we register the volumes and then flush the transaction
// this ensures that the local database and remote storage are as closely related as possible
await database.UpdateRemoteVolumeAsync(blockvolume.RemoteFilename, RemoteVolumeState.Uploading, -1, null);
blockvolume.Close();
await database.CommitTransactionAsync("CommitAddBlockToOutputFlush");
await self.Output.WriteAsync(new VolumeUploadRequest(blockvolume, null));
blockvolume = null;
}
}
}
}
}
);
catch(Exception ex)
{
if (ex.IsRetiredException())
{
// If we have collected data, merge all pending volumes into a single volume
if (blockvolume != null && blockvolume.SourceSize > 0)
await self.SpillPickup.WriteAsync(new VolumeUploadRequest(blockvolume, null));
}
throw;
}
});
}
@@ -36,14 +36,15 @@ namespace Duplicati.Library.Main.Operation.Backup
return AutomationExtensions.RunTask(
new
{
Input = ChannelMarker.ForRead<MetadataPreProcess.FileEntry>("AcceptedChangedFile"),
LogChannel = ChannelMarker.ForWrite<LogMessage>("LogChannel"),
ProgressChannel = ChannelMarker.ForWrite<ProgressEvent>("ProgressChannel"),
BlockOutput = ChannelMarker.ForWrite<DataBlock>("OutputBlocks")
Input = Channels.AcceptedChangedFile.ForRead,
LogChannel = Common.Channels.LogChannel.ForWrite,
ProgressChannel = Channels.ProgressEvents.ForWrite,
BlockOutput = Channels.OutputBlocks.ForWrite
},
async self =>
{
var log = new LogWrapper(self.LogChannel);
var blocksize = options.Blocksize;
var filehasher = System.Security.Cryptography.HashAlgorithm.Create(options.FileHashAlgorithm);
var blockhasher = System.Security.Cryptography.HashAlgorithm.Create(options.BlockHashAlgorithm);
@@ -69,7 +70,7 @@ namespace Duplicati.Library.Main.Operation.Backup
{
long fslen = -1;
try { fslen = fs.Length; }
catch (Exception ex) { await self.LogChannel.WriteAsync(LogMessage.Warning(string.Format("Failed to read file length for file {0}", e.Path), ex)); }
catch (Exception ex) { await log.WriteWarningAsync(string.Format("Failed to read file length for file {0}", e.Path), ex); }
await self.ProgressChannel.WriteAsync(new ProgressEvent() { Filepath = e.Path, Length = fslen, Type = EventType.FileStarted });
send_close = true;
@@ -133,36 +134,36 @@ namespace Duplicati.Library.Main.Operation.Backup
if (oldHash != filekey)
{
if (oldHash == null)
await self.LogChannel.WriteAsync(LogMessage.Verbose("New file {0}", e.Path));
await log.WriteVerboseAsync("New file {0}", e.Path);
else
await self.LogChannel.WriteAsync(LogMessage.Verbose("File has changed {0}", e.Path));
await log.WriteVerboseAsync("File has changed {0}", e.Path);
if (e.OldId < 0)
{
await stats.AddAddedFile(filesize);
if (options.Dryrun)
await self.LogChannel.WriteAsync(LogMessage.DryRun("Would add new file {0}, size {1}", e.Path, Library.Utility.Utility.FormatSizeString(filesize)));
await log.WriteDryRunAsync("Would add new file {0}, size {1}", e.Path, Library.Utility.Utility.FormatSizeString(filesize));
}
else
{
await stats.AddModifiedFile(filesize);
if (options.Dryrun)
await self.LogChannel.WriteAsync(LogMessage.DryRun("Would add changed file {0}, size {1}", e.Path, Library.Utility.Utility.FormatSizeString(filesize)));
await log.WriteDryRunAsync("Would add changed file {0}, size {1}", e.Path, Library.Utility.Utility.FormatSizeString(filesize));
}
await AddFileToOutputAsync(e.Path, filesize, e.LastWrite, e.MetaHashAndSize, hashcollector, filekey, blocklisthashes, self.BlockOutput, blocksize, database);
}
else if (e.MetadataChanged)
{
await self.LogChannel.WriteAsync(LogMessage.Verbose("File has only metadata changes {0}", e.Path));
await log.WriteVerboseAsync("File has only metadata changes {0}", e.Path);
await AddFileToOutputAsync(e.Path, filesize, e.LastWrite, e.MetaHashAndSize, hashcollector, filekey, blocklisthashes, self.BlockOutput, blocksize, database);
}
else
{
// When we write the file to output, update the last modified time
await self.LogChannel.WriteAsync(LogMessage.Verbose("File has not changed {0}", e.Path));
await log.WriteVerboseAsync("File has not changed {0}", e.Path);
await database.AddUnmodifiedAsync(e.OldId, e.LastWrite);
}
}
@@ -172,7 +173,7 @@ namespace Duplicati.Library.Main.Operation.Backup
if (ex.IsRetiredException())
return;
else
await self.LogChannel.WriteAsync(LogMessage.Warning(string.Format("Failed to process file {0}", e.Path), ex));
await log.WriteWarningAsync(string.Format("Failed to process file {0}", e.Path), ex);
}
finally
{
@@ -42,11 +42,10 @@ namespace Duplicati.Library.Main.Operation.Backup
private Queue<string> m_mixinqueue;
private string[] m_changedfilelist;
[ChannelName("LogChannel")]
private IWriteChannel<LogMessage> m_logchannel;
[ChannelName("SourcePaths")]
private IWriteChannel<string> m_output;
private IWriteChannel<LogMessage> m_logchannel = Common.Channels.LogChannel.ForWrite;
private IWriteChannel<string> m_output = Backup.Channels.SourcePaths.ForWrite;
private LogWrapper m_log;
public FileEnumerationProcess(Snapshots.ISnapshotService snapshot, FileAttributes attributeFilter, Duplicati.Library.Utility.IFilter sourcefilter, Duplicati.Library.Utility.IFilter filter, Options.SymlinkStrategy symlinkPolicy, Options.HardlinkStrategy hardlinkPolicy, string[] changedfilelist)
@@ -61,6 +60,7 @@ namespace Duplicati.Library.Main.Operation.Backup
m_hardlinkmap = new Dictionary<string, string>();
m_mixinqueue = new Queue<string>();
m_changedfilelist = changedfilelist;
m_log = new LogWrapper(m_logchannel);
bool includes;
bool excludes;
@@ -93,13 +93,13 @@ namespace Duplicati.Library.Main.Operation.Backup
{
if (m_snapshot.IsBlockDevice(path))
{
await m_logchannel.WriteAsync(LogMessage.Verbose("Excluding block device: {0}", path));
await m_log.WriteVerboseAsync("Excluding block device: {0}", path);
return false;
}
}
catch (Exception ex)
{
await m_logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to process path: {0}", path), ex));
await m_log.WriteWarningAsync(string.Format("Failed to process path: {0}", path), ex);
return false;
}
@@ -108,7 +108,7 @@ namespace Duplicati.Library.Main.Operation.Backup
bool sourcematches;
if (m_sourcefilter.Matches(path, out sourcematches, out sourcematch) && sourcematches)
{
await m_logchannel.WriteAsync(LogMessage.Verbose("Including source path: {0}", path));
await m_log.WriteVerboseAsync("Including source path: {0}", path);
return true;
}
@@ -122,7 +122,7 @@ namespace Duplicati.Library.Main.Operation.Backup
{
if (m_hardlinkPolicy == Options.HardlinkStrategy.None)
{
await m_logchannel.WriteAsync(LogMessage.Verbose("Excluding hardlink: {0} ({1})", path, id));
await m_log.WriteVerboseAsync("Excluding hardlink: {0} ({1})", path, id);
return false;
}
else if (m_hardlinkPolicy == Options.HardlinkStrategy.First)
@@ -130,7 +130,7 @@ namespace Duplicati.Library.Main.Operation.Backup
string prevPath;
if (m_hardlinkmap.TryGetValue(id, out prevPath))
{
await m_logchannel.WriteAsync(LogMessage.Verbose("Excluding hardlink ({1}) for: {0}, previous hardlink: {2}", path, id, prevPath));
await m_log.WriteVerboseAsync("Excluding hardlink ({1}) for: {0}, previous hardlink: {2}", path, id, prevPath);
return false;
}
else
@@ -142,7 +142,7 @@ namespace Duplicati.Library.Main.Operation.Backup
}
catch (Exception ex)
{
await m_logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to process path: {0}", path), ex));
await m_log.WriteWarningAsync(string.Format("Failed to process path: {0}", path), ex);
return false;
}
}
@@ -150,7 +150,7 @@ namespace Duplicati.Library.Main.Operation.Backup
// If we exclude files based on attributes, filter that
if ((m_attributeFilter & attributes) != 0)
{
await m_logchannel.WriteAsync(LogMessage.Verbose("Excluding path due to attribute filter: {0}", path));
await m_log.WriteVerboseAsync("Excluding path due to attribute filter: {0}", path);
return false;
}
@@ -158,25 +158,25 @@ namespace Duplicati.Library.Main.Operation.Backup
Library.Utility.IFilter match;
if (!Library.Utility.FilterExpression.Matches(m_enumeratefilter, path, out match))
{
await m_logchannel.WriteAsync(LogMessage.Verbose("Excluding path due to filter: {0} => {1}", path, match == null ? "null" : match.ToString()));
await m_log.WriteVerboseAsync("Excluding path due to filter: {0} => {1}", path, match == null ? "null" : match.ToString());
return false;
}
else if (match != null)
{
await m_logchannel.WriteAsync(LogMessage.Verbose("Including path due to filter: {0} => {1}", path, match.ToString()));
await m_log.WriteVerboseAsync("Including path due to filter: {0} => {1}", path, match.ToString());
}
// If the file is a symlink, apply special handling
var isSymlink = (attributes & FileAttributes.ReparsePoint) == FileAttributes.ReparsePoint;
if (isSymlink && m_symlinkPolicy == Options.SymlinkStrategy.Ignore)
{
await m_logchannel.WriteAsync(LogMessage.Verbose("Excluding symlink: {0}", path));
await m_log.WriteVerboseAsync("Excluding symlink: {0}", path);
return false;
}
if (isSymlink && m_symlinkPolicy == Options.SymlinkStrategy.Store)
{
await m_logchannel.WriteAsync(LogMessage.Verbose("Storing symlink: {0}", path));
await m_log.WriteVerboseAsync("Storing symlink: {0}", path);
// We return false because we do not want to recurse into the path,
// but we add the symlink to the mixin so we process the symlink itself
@@ -28,58 +28,58 @@ namespace Duplicati.Library.Main.Operation.Backup
public static Task Start(Snapshots.ISnapshotService snapshot, Options options, BackupStatsCollector stats, BackupDatabase database)
{
return AutomationExtensions.RunTask(
new
new
{
LogChannel = Common.Channels.LogChannel.ForWrite,
Input = Channels.ProcessedFiles.ForRead,
Output = Channels.AcceptedChangedFile.ForWrite
},
async self =>
{
var EMPTY_METADATA = Utility.WrapMetadata(new Dictionary<string, string>(), options);
var blocksize = options.Blocksize;
var log = new LogWrapper(self.LogChannel);
while (true)
{
LogChannel = ChannelMarker.ForWrite<LogMessage>("LogChannel"),
Input = ChannelMarker.ForRead<MetadataPreProcess.FileEntry>("ProcessedFiles"),
Output = ChannelMarker.ForWrite<MetadataPreProcess.FileEntry>("AcceptedChangedFile")
},
var e = await self.Input.ReadAsync();
async self =>
{
var EMPTY_METADATA = Utility.WrapMetadata(new Dictionary<string, string>(), options);
var blocksize = options.Blocksize;
while (true)
long filestatsize = -1;
try
{
var e = await self.Input.ReadAsync();
filestatsize = snapshot.GetFileSize(e.Path);
}
catch
{
}
long filestatsize = -1;
try
{
filestatsize = snapshot.GetFileSize(e.Path);
}
catch
{
}
await stats.AddExaminedFile(filestatsize);
await stats.AddExaminedFile(filestatsize);
e.MetaHashAndSize = options.StoreMetadata ? Utility.WrapMetadata(await MetadataGenerator.GenerateMetadataAsync(e.Path, e.Attributes, options, snapshot, log), options) : EMPTY_METADATA;
e.MetaHashAndSize = options.StoreMetadata ? Utility.WrapMetadata(await MetadataGenerator.GenerateMetadataAsync(e.Path, e.Attributes, options, snapshot, self.LogChannel), options) : EMPTY_METADATA;
var timestampChanged = e.LastWrite != e.OldModified || e.LastWrite.Ticks == 0 || e.OldModified.Ticks == 0;
var filesizeChanged = filestatsize < 0 || e.LastFileSize < 0 || filestatsize != e.LastFileSize;
var tooLargeFile = options.SkipFilesLargerThan != long.MaxValue && options.SkipFilesLargerThan != 0 && filestatsize >= 0 && filestatsize > options.SkipFilesLargerThan;
e.MetadataChanged = !options.SkipMetadata && (e.MetaHashAndSize.Size != e.OldMetaSize || e.MetaHashAndSize.Hash != e.OldMetaHash);
var timestampChanged = e.LastWrite != e.OldModified || e.LastWrite.Ticks == 0 || e.OldModified.Ticks == 0;
var filesizeChanged = filestatsize < 0 || e.LastFileSize < 0 || filestatsize != e.LastFileSize;
var tooLargeFile = options.SkipFilesLargerThan != long.MaxValue && options.SkipFilesLargerThan != 0 && filestatsize >= 0 && filestatsize > options.SkipFilesLargerThan;
e.MetadataChanged = !options.SkipMetadata && (e.MetaHashAndSize.Size != e.OldMetaSize || e.MetaHashAndSize.Hash != e.OldMetaHash);
if ((e.OldId < 0 || options.DisableFiletimeCheck || timestampChanged || filesizeChanged || e.MetadataChanged) && !tooLargeFile)
{
await self.LogChannel.WriteAsync(LogMessage.Verbose("Checking file for changes {0}, new: {1}, timestamp changed: {2}, size changed: {3}, metadatachanged: {4}, {5} vs {6}", e.Path, e.OldId <= 0, timestampChanged, filesizeChanged, e.MetadataChanged, e.LastWrite, e.OldModified));
await self.Output.WriteAsync(e);
}
if ((e.OldId < 0 || options.DisableFiletimeCheck || timestampChanged || filesizeChanged || e.MetadataChanged) && !tooLargeFile)
{
await log.WriteVerboseAsync("Checking file for changes {0}, new: {1}, timestamp changed: {2}, size changed: {3}, metadatachanged: {4}, {5} vs {6}", e.Path, e.OldId <= 0, timestampChanged, filesizeChanged, e.MetadataChanged, e.LastWrite, e.OldModified);
await self.Output.WriteAsync(e);
}
else
{
if (options.SkipFilesLargerThan == long.MaxValue || options.SkipFilesLargerThan == 0 || snapshot.GetFileSize(e.Path) < options.SkipFilesLargerThan)
await log.WriteVerboseAsync("Skipped checking file, because timestamp was not updated {0}", e.Path);
else
{
if (options.SkipFilesLargerThan == long.MaxValue || options.SkipFilesLargerThan == 0 || snapshot.GetFileSize(e.Path) < options.SkipFilesLargerThan)
await self.LogChannel.WriteAsync(LogMessage.Verbose("Skipped checking file, because timestamp was not updated {0}", e.Path));
else
await self.LogChannel.WriteAsync(LogMessage.Verbose("Skipped checking file, because the size exceeds limit {0}", e.Path));
await log.WriteVerboseAsync("Skipped checking file, because the size exceeds limit {0}", e.Path);
await database.AddUnmodifiedAsync(e.OldId, e.LastWrite);
}
await database.AddUnmodifiedAsync(e.OldId, e.LastWrite);
}
}
);
});
}
}
}
@@ -24,7 +24,7 @@ namespace Duplicati.Library.Main.Operation.Backup
{
internal static class MetadataGenerator
{
public static async Task<Dictionary<string, string>> GenerateMetadataAsync(string path, System.IO.FileAttributes attributes, Options options, Snapshots.ISnapshotService snapshot, IWriteChannel<LogMessage> logchannel)
public static async Task<Dictionary<string, string>> GenerateMetadataAsync(string path, System.IO.FileAttributes attributes, Options options, Snapshots.ISnapshotService snapshot, LogWrapper log)
{
try
{
@@ -47,7 +47,7 @@ namespace Duplicati.Library.Main.Operation.Backup
}
catch (Exception ex)
{
await logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to read timestamp on \"{0}\"", path), ex));
await log.WriteWarningAsync(string.Format("Failed to read timestamp on \"{0}\"", path), ex);
}
}
@@ -59,7 +59,7 @@ namespace Duplicati.Library.Main.Operation.Backup
}
catch (Exception ex)
{
await logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to read timestamp on \"{0}\"", path), ex));
await log.WriteWarningAsync(string.Format("Failed to read timestamp on \"{0}\"", path), ex);
}
}
}
@@ -72,7 +72,7 @@ namespace Duplicati.Library.Main.Operation.Backup
}
catch(Exception ex)
{
await logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to process metadata for \"{0}\", storing empty metadata", path), ex));
await log.WriteWarningAsync(string.Format("Failed to process metadata for \"{0}\", storing empty metadata", path), ex);
return new Dictionary<string, string>();
}
}
@@ -47,18 +47,11 @@ namespace Duplicati.Library.Main.Operation.Backup
public bool MetadataChanged;
}
[ChannelName("SourcePaths")]
private IReadChannel<string> m_input;
[ChannelName("LogChannel")]
private IWriteChannel<LogMessage> m_logchannel;
[ChannelName("ProcessedFiles")]
private IWriteChannel<FileEntry> m_output;
[ChannelName("OutputBlocks")]
private IWriteChannel<DataBlock> m_blockoutput;
private IReadChannel<string> m_input = Backup.Channels.SourcePaths.ForRead;
private IWriteChannel<LogMessage> m_logchannel = Common.Channels.LogChannel.ForWrite;
private IWriteChannel<FileEntry> m_output = Backup.Channels.ProcessedFiles.ForWrite;
private IWriteChannel<DataBlock> m_blockoutput = Backup.Channels.OutputBlocks.ForWrite;
private LogWrapper m_log;
private Snapshots.ISnapshotService m_snapshot;
private Options m_options;
@@ -72,6 +65,7 @@ namespace Duplicati.Library.Main.Operation.Backup
m_snapshot = snapshot;
m_options = options;
m_database = database;
m_log = new LogWrapper(m_logchannel);
m_blocksize = m_options.Blocksize;
EMPTY_METADATA = Utility.WrapMetadata(new Dictionary<string, string>(), options);
}
@@ -82,13 +76,13 @@ namespace Duplicati.Library.Main.Operation.Backup
{
if (m_options.SymlinkPolicy == Options.SymlinkStrategy.Ignore)
{
await m_logchannel.WriteAsync(LogMessage.Verbose("Ignoring symlink {0}", path));
await m_log.WriteVerboseAsync("Ignoring symlink {0}", path);
return false;
}
if (m_options.SymlinkPolicy == Options.SymlinkStrategy.Store)
{
var metadata = await MetadataGenerator.GenerateMetadataAsync(path, attributes, m_options, m_snapshot, m_logchannel);
var metadata = await MetadataGenerator.GenerateMetadataAsync(path, attributes, m_options, m_snapshot, m_log);
if (!metadata.ContainsKey("CoreSymlinkTarget"))
metadata["CoreSymlinkTarget"] = m_snapshot.GetSymlinkTarget(path);
@@ -96,7 +90,7 @@ namespace Duplicati.Library.Main.Operation.Backup
var metahash = Utility.WrapMetadata(metadata, m_options);
await AddSymlinkToOutputAsync(path, DateTime.UtcNow, metahash);
await m_logchannel.WriteAsync(LogMessage.Verbose("Stored symlink {0}", path));
await m_log.WriteVerboseAsync("Stored symlink {0}", path);
// Don't process further
return false;
}
@@ -108,14 +102,14 @@ namespace Duplicati.Library.Main.Operation.Backup
if (m_options.StoreMetadata)
{
metahash = Utility.WrapMetadata(await MetadataGenerator.GenerateMetadataAsync(path, attributes, m_options, m_snapshot, m_logchannel), m_options);
metahash = Utility.WrapMetadata(await MetadataGenerator.GenerateMetadataAsync(path, attributes, m_options, m_snapshot, m_log), m_options);
}
else
{
metahash = EMPTY_METADATA;
}
await m_logchannel.WriteAsync(LogMessage.Verbose("Adding directory {0}", path));
await m_log.WriteVerboseAsync("Adding directory {0}", path);
await AddFolderToOutputAsync(path, lastwrite, metahash);
return false;
}
@@ -178,7 +172,7 @@ namespace Duplicati.Library.Main.Operation.Backup
}
catch (Exception ex)
{
await m_logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to read timestamp on \"{0}\"", path), ex));
await m_log.WriteWarningAsync(string.Format("Failed to read timestamp on \"{0}\"", path), ex);
}
try
@@ -187,7 +181,7 @@ namespace Duplicati.Library.Main.Operation.Backup
}
catch (Exception ex)
{
await m_logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to read attributes on \"{0}\"", path), ex));
await m_log.WriteWarningAsync(string.Format("Failed to read attributes on \"{0}\"", path), ex);
}
// If we only have metadata, stop here
@@ -210,7 +204,7 @@ namespace Duplicati.Library.Main.Operation.Backup
}
catch(Exception ex)
{
await m_logchannel.WriteAsync(LogMessage.Error(string.Format("Failed to process entry, path: {0}", path), ex));
await m_log.WriteErrorAsync(string.Format("Failed to process entry, path: {0}", path), ex);
}
}
}
@@ -28,55 +28,55 @@ namespace Duplicati.Library.Main.Operation.Backup
public static Task Run(BackupResults stat)
{
return AutomationExtensions.RunTask(new
{
Input = ChannelMarker.ForRead<ProgressEvent>("ProgressChannel")
},
async self =>
{
var filesStarted = new Dictionary<string, long>();
var fileProgress = new Dictionary<string, long>();
string current = null;
{
Input = Channels.ProgressEvents.ForRead
},
async self =>
{
var filesStarted = new Dictionary<string, long>();
var fileProgress = new Dictionary<string, long>();
string current = null;
while(true)
while(true)
{
var t = await self.Input.ReadAsync();
switch(t.Type)
{
var t = await self.Input.ReadAsync();
switch(t.Type)
{
case EventType.FileStarted:
filesStarted[t.Filepath] = t.Length;
fileProgress[t.Filepath] = 0;
break;
case EventType.FileProgressUpdate:
if (t.Filepath == current)
stat.OperationProgressUpdater.UpdateFileProgress(t.Length);
break;
case EventType.FileClosed:
if (fileProgress.ContainsKey(t.Filepath))
fileProgress[t.Filepath] = t.Length;
if (t.Filepath == current)
{
stat.OperationProgressUpdater.UpdateFileProgress(t.Length);
current = null;
}
filesStarted.Remove(t.Filepath);
fileProgress.Remove(t.Filepath);
break;
}
if (current == null)
{
current = filesStarted.OrderByDescending(x => x.Value).Select(x => x.Key).FirstOrDefault();
if (current != null)
case EventType.FileStarted:
filesStarted[t.Filepath] = t.Length;
fileProgress[t.Filepath] = 0;
break;
case EventType.FileProgressUpdate:
if (t.Filepath == current)
stat.OperationProgressUpdater.UpdateFileProgress(t.Length);
break;
case EventType.FileClosed:
if (fileProgress.ContainsKey(t.Filepath))
fileProgress[t.Filepath] = t.Length;
if (t.Filepath == current)
{
stat.OperationProgressUpdater.StartFile(current, filesStarted[current]);
if (fileProgress.ContainsKey(current) && fileProgress[current] > 0)
stat.OperationProgressUpdater.UpdateFileProgress(fileProgress[current]);
stat.OperationProgressUpdater.UpdateFileProgress(t.Length);
current = null;
}
filesStarted.Remove(t.Filepath);
fileProgress.Remove(t.Filepath);
break;
}
if (current == null)
{
current = filesStarted.OrderByDescending(x => x.Value).Select(x => x.Key).FirstOrDefault();
if (current != null)
{
stat.OperationProgressUpdater.StartFile(current, filesStarted[current]);
if (fileProgress.ContainsKey(current) && fileProgress[current] > 0)
stat.OperationProgressUpdater.UpdateFileProgress(fileProgress[current]);
}
}
});
}
});
}
@@ -36,95 +36,94 @@ namespace Duplicati.Library.Main.Operation.Backup
public static Task Run(Options options, BackupDatabase database)
{
return AutomationExtensions.RunTask(
new
{
Input = ChannelMarker.ForRead<UploadRequest>("SpillPickup"),
Output = ChannelMarker.ForWrite<UploadRequest>("BackendRequests"),
},
new
{
Input = Channels.SpillPickup.ForRead,
Output = Channels.BackendRequest.ForWrite,
},
async self =>
{
var lst = new List<UploadRequest>();
async self =>
{
var lst = new List<VolumeUploadRequest>();
while(!self.Input.IsRetired)
try
{
lst.Add((UploadRequest)await self.Input.ReadAsync());
}
catch (Exception ex)
{
if (ex.IsRetiredException())
break;
throw;
}
while(lst.Count > 1)
while(!self.Input.IsRetired)
try
{
lst.Add((VolumeUploadRequest)await self.Input.ReadAsync());
}
catch (Exception ex)
{
if (ex.IsRetiredException())
break;
throw;
}
UploadRequest target = null;
var source = lst[0];
// Finalize the current work
source.BlockVolume.Close();
while(lst.Count > 1)
{
// Remove it from the list of active operations
lst.RemoveAt(0);
VolumeUploadRequest target = null;
var source = lst[0];
var buffer = new byte[options.Blocksize];
// Finalize the current work
source.BlockVolume.Close();
using(var rd = new BlockVolumeReader(options.CompressionModule, source.BlockVolume.LocalFilename, options))
// Remove it from the list of active operations
lst.RemoveAt(0);
var buffer = new byte[options.Blocksize];
using(var rd = new BlockVolumeReader(options.CompressionModule, source.BlockVolume.LocalFilename, options))
{
foreach(var file in rd.Blocks)
{
foreach(var file in rd.Blocks)
// Grab a target
if (target == null)
{
// Grab a target
if (target == null)
if (lst.Count == 0)
{
if (lst.Count == 0)
{
// No more targets, make one
target = new UploadRequest(new BlockVolumeWriter(options), null);
target.BlockVolume.VolumeID = await database.RegisterRemoteVolumeAsync(target.BlockVolume.RemoteFilename, RemoteVolumeType.Blocks, RemoteVolumeState.Temporary);
}
else
{
// Grab the next target
target = lst[0];
lst.RemoveAt(0);
}
// No more targets, make one
target = new VolumeUploadRequest(new BlockVolumeWriter(options), null);
target.BlockVolume.VolumeID = await database.RegisterRemoteVolumeAsync(target.BlockVolume.RemoteFilename, RemoteVolumeType.Blocks, RemoteVolumeState.Temporary);
}
var len = rd.ReadBlock(file.Key, buffer);
target.BlockVolume.AddBlock(file.Key, buffer, 0, len, Duplicati.Library.Interface.CompressionHint.Default);
await database.MoveBlockToVolumeAsync(file.Key, len, source.BlockVolume.VolumeID, target.BlockVolume.VolumeID);
if (target.BlockVolume.Filesize > options.VolumeSize - options.Blocksize)
else
{
await self.Output.WriteAsync(target);
target = null;
// Grab the next target
target = lst[0];
lst.RemoveAt(0);
}
}
var len = rd.ReadBlock(file.Key, buffer);
target.BlockVolume.AddBlock(file.Key, buffer, 0, len, Duplicati.Library.Interface.CompressionHint.Default);
await database.MoveBlockToVolumeAsync(file.Key, len, source.BlockVolume.VolumeID, target.BlockVolume.VolumeID);
if (target.BlockVolume.Filesize > options.VolumeSize - options.Blocksize)
{
await self.Output.WriteAsync(target);
target = null;
}
}
// Make sure they are out of the database
System.IO.File.Delete(source.BlockVolume.LocalFilename);
await database.SafeDeleteRemoteVolumeAsync(source.BlockVolume.RemoteFilename);
// Re-inject the target if it has content
if (target != null)
lst.Insert(lst.Count == 0 ? 0 : 1, target);
}
foreach(var n in lst)
{
n.BlockVolume.Close();
await self.Output.WriteAsync(n);
}
// Make sure they are out of the database
System.IO.File.Delete(source.BlockVolume.LocalFilename);
await database.SafeDeleteRemoteVolumeAsync(source.BlockVolume.RemoteFilename);
// Re-inject the target if it has content
if (target != null)
lst.Insert(lst.Count == 0 ? 0 : 1, target);
}
);
foreach(var n in lst)
{
n.BlockVolume.Close();
await self.Output.WriteAsync(n);
}
});
}
}
}
@@ -26,113 +26,110 @@ namespace Duplicati.Library.Main.Operation.Backup
{
internal static class UploadSyntheticFilelist
{
public static Task Run(Common.BackendHandler backend, BackupDatabase database, Options options, BackupResults result)
public static Task Run(BackupDatabase database, Options options, BackupResults result)
{
return AutomationExtensions.RunTask(new
{
LogChannel = ChannelMarker.ForWrite<LogMessage>("LogChannel"),
},
{
LogChannel = Common.Channels.LogChannel.ForWrite,
UploadChannel = Channels.BackendRequest.ForWrite
},
async self =>
async self =>
{
var log = new LogWrapper(self.LogChannel);
var incompleteFilesets = await database.GetIncompleteFilesetsAsync();
if (incompleteFilesets.Length != 0)
{
await database.CommitTransactionAsync("PreSyntheticFilelist");
var incompleteFilesets = await database.GetIncompleteFilesetsAsync();
if (incompleteFilesets.Length != 0)
result.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_PreviousBackupFinalize);
await log.WriteInformationAsync("Uploading filelist from previous interrupted backup");
var incompleteSet = incompleteFilesets.Last();
var badIds = from n in incompleteFilesets select n.Key;
var prevs = (from n in await database.GetFilesetTimesAsync()
where
n.Key < incompleteSet.Key
&&
!badIds.Contains(n.Key)
orderby n.Key
select n.Key).ToArray();
var prevId = prevs.Length == 0 ? -1 : prevs.Last();
FilesetVolumeWriter fsw = null;
try
{
await database.CommitTransactionAsync("PreSyntheticFilelist");
var s = 1;
var fileTime = incompleteSet.Value + TimeSpan.FromSeconds(s);
var oldFilesetID = incompleteSet.Key;
result.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_PreviousBackupFinalize);
await self.LogChannel.WriteAsync(LogMessage.Information("Uploading filelist from previous interrupted backup"));
var incompleteSet = incompleteFilesets.Last();
var badIds = from n in incompleteFilesets select n.Key;
var prevs = (from n in await database.GetFilesetTimesAsync()
where
n.Key < incompleteSet.Key
&&
!badIds.Contains(n.Key)
orderby n.Key
select n.Key).ToArray();
var prevId = prevs.Length == 0 ? -1 : prevs.Last();
FilesetVolumeWriter fsw = null;
try
// Probe for an unused filename
while (s < 60)
{
var s = 1;
var fileTime = incompleteSet.Value + TimeSpan.FromSeconds(s);
var oldFilesetID = incompleteSet.Key;
var id = await database.GetRemoteVolumeIDAsync(VolumeBase.GenerateFilename(RemoteVolumeType.Files, options, null, fileTime));
if (id < 0)
break;
// Probe for an unused filename
while (s < 60)
{
var id = await database.GetRemoteVolumeIDAsync(VolumeBase.GenerateFilename(RemoteVolumeType.Files, options, null, fileTime));
if (id < 0)
break;
fileTime = incompleteSet.Value + TimeSpan.FromSeconds(++s);
}
fsw = new FilesetVolumeWriter(options, fileTime);
fsw.VolumeID = await database.RegisterRemoteVolumeAsync(fsw.RemoteFilename, RemoteVolumeType.Files, RemoteVolumeState.Temporary);
if (!string.IsNullOrEmpty(options.ControlFiles))
foreach(var p in options.ControlFiles.Split(new char[] { System.IO.Path.PathSeparator }, StringSplitOptions.RemoveEmptyEntries))
fsw.AddControlFile(p, options.GetCompressionHintFromFilename(p));
var newFilesetID = await database.CreateFilesetAsync(fsw.VolumeID, fileTime);
await database.LinkFilesetToVolumeAsync(newFilesetID, fsw.VolumeID);
await database.AppendFilesFromPreviousSetAsync(null, newFilesetID, prevId, fileTime);
await database.WriteFilesetAsync(fsw, newFilesetID);
if (options.Dryrun)
{
await self.LogChannel.WriteAsync(LogMessage.DryRun("Would upload fileset: {0}, size: {1}", fsw.RemoteFilename, Library.Utility.Utility.FormatSizeString(new FileInfo(fsw.LocalFilename).Length)));
}
else
{
await database.UpdateRemoteVolumeAsync(fsw.RemoteFilename, RemoteVolumeState.Uploading, -1, null);
await database.CommitTransactionAsync("CommitUpdateFilelistVolume");
await backend.UploadFileAsync(fsw);
fsw = null;
}
fileTime = incompleteSet.Value + TimeSpan.FromSeconds(++s);
}
catch
fsw = new FilesetVolumeWriter(options, fileTime);
fsw.VolumeID = await database.RegisterRemoteVolumeAsync(fsw.RemoteFilename, RemoteVolumeType.Files, RemoteVolumeState.Temporary);
if (!string.IsNullOrEmpty(options.ControlFiles))
foreach(var p in options.ControlFiles.Split(new char[] { System.IO.Path.PathSeparator }, StringSplitOptions.RemoveEmptyEntries))
fsw.AddControlFile(p, options.GetCompressionHintFromFilename(p));
var newFilesetID = await database.CreateFilesetAsync(fsw.VolumeID, fileTime);
await database.LinkFilesetToVolumeAsync(newFilesetID, fsw.VolumeID);
await database.AppendFilesFromPreviousSetAsync(null, newFilesetID, prevId, fileTime);
await database.WriteFilesetAsync(fsw, newFilesetID);
if (options.Dryrun)
{
await database.RollbackTransactionAsync();
throw;
await log.WriteDryRunAsync("Would upload fileset: {0}, size: {1}", fsw.RemoteFilename, Library.Utility.Utility.FormatSizeString(new FileInfo(fsw.LocalFilename).Length));
}
finally
else
{
if (fsw != null)
try { fsw.Dispose(); }
catch { fsw = null; }
}
await database.UpdateRemoteVolumeAsync(fsw.RemoteFilename, RemoteVolumeState.Uploading, -1, null);
await database.CommitTransactionAsync("CommitUpdateFilelistVolume");
await self.UploadChannel.WriteAsync(new FilesetUploadRequest(fsw));
fsw = null;
}
}
if (options.IndexfilePolicy != Options.IndexFileStrategy.None)
catch
{
var blockhasher = System.Security.Cryptography.HashAlgorithm.Create(options.BlockHashAlgorithm);
var hashsize = blockhasher.HashSize / 8;
await database.RollbackTransactionAsync();
throw;
}
finally
{
if (fsw != null)
try { fsw.Dispose(); }
catch { fsw = null; }
}
}
foreach(var blockfile in await database.GetMissingIndexFilesAsync())
if (options.IndexfilePolicy != Options.IndexFileStrategy.None)
{
foreach(var blockfile in await database.GetMissingIndexFilesAsync())
{
await log.WriteInformationAsync(string.Format("Re-creating missing index file for {0}", blockfile));
var w = await Common.IndexVolumeCreator.CreateIndexVolume(blockfile, options, database);
if (options.Dryrun)
await log.WriteDryRunAsync("would upload new index file {0}, with size {1}, previous size {2}", w.RemoteFilename, Library.Utility.Utility.FormatSizeString(new System.IO.FileInfo(w.LocalFilename).Length), Library.Utility.Utility.FormatSizeString(w.Filesize));
else
{
await self.LogChannel.WriteAsync(LogMessage.Information(string.Format("Re-creating missing index file for {0}", blockfile)));
var w = await Common.IndexVolumeCreator.CreateIndexVolume(blockfile, options, database);
if (options.Dryrun)
await self.LogChannel.WriteAsync(LogMessage.DryRun("would upload new index file {0}, with size {1}, previous size {2}", w.RemoteFilename, Library.Utility.Utility.FormatSizeString(new System.IO.FileInfo(w.LocalFilename).Length), Library.Utility.Utility.FormatSizeString(w.Filesize)));
else
{
await database.UpdateRemoteVolumeAsync(w.RemoteFilename, RemoteVolumeState.Uploading, -1, null);
await backend.UploadFileAsync(w);
}
await database.UpdateRemoteVolumeAsync(w.RemoteFilename, RemoteVolumeState.Uploading, -1, null);
await self.UploadChannel.WriteAsync(new IndexVolumeUploadRequest(w));
}
}
}
);
});
}
}
}
+122 -105
View File
@@ -18,30 +18,18 @@ namespace Duplicati.Library.Main.Operation
private readonly Options m_options;
private string m_backendurl;
private byte[] m_blockbuffer;
private byte[] m_blocklistbuffer;
private System.Security.Cryptography.HashAlgorithm m_blockhasher;
private System.Security.Cryptography.HashAlgorithm m_filehasher;
private LocalBackupDatabase m_database;
private System.Data.IDbTransaction m_transaction;
private BlockVolumeWriter m_blockvolume;
private IndexVolumeWriter m_indexvolume;
private readonly IMetahash EMPTY_METADATA;
private Library.Utility.IFilter m_filter;
private Library.Utility.IFilter m_sourceFilter;
private BackupResults m_result;
// Speed up things by caching this
private int m_blocksize;
public BackupHandler(string backendurl, Options options, BackupResults results)
{
EMPTY_METADATA = Utility.WrapMetadata(new Dictionary<string, string>(), options);
m_options = options;
m_result = results;
m_backendurl = backendurl;
@@ -79,39 +67,38 @@ namespace Duplicati.Library.Main.Operation
var enumeratorTask = new Backup.FileEnumerationProcess(snapshot, options.FileAttributeFilter, sourcefilter, filter, options.SymlinkPolicy, options.HardlinkPolicy, options.ChangedFilelist).RunAsync();
var counterTask = AutomationExtensions.RunTask(new
{
Input = ChannelMarker.ForRead<string>("SourcePaths")
},
async self =>
{
var count = 0L;
var size = 0L;
{
Input = Backup.Channels.SourcePaths.ForRead
},
async self =>
{
var count = 0L;
var size = 0L;
try
try
{
while (!token.IsCancellationRequested)
{
while (!token.IsCancellationRequested)
var path = await self.Input.ReadAsync();
count++;
try
{
var path = await self.Input.ReadAsync();
count++;
try
{
size += snapshot.GetFileSize(path);
}
catch
{
}
result.OperationProgressUpdater.UpdatefileCount(count, size, false);
size += snapshot.GetFileSize(path);
}
}
finally
{
result.OperationProgressUpdater.UpdatefileCount(count, size, true);
catch
{
}
result.OperationProgressUpdater.UpdatefileCount(count, size, false);
}
}
);
finally
{
result.OperationProgressUpdater.UpdatefileCount(count, size, true);
}
});
return Task.WhenAll(enumeratorTask, counterTask);
}
@@ -146,7 +133,7 @@ namespace Duplicati.Library.Main.Operation
}
}
private static async Task RunMainOperation(Snapshots.ISnapshotService snapshot, Backup.BackupDatabase database, Common.BackendHandler backend, Backup.BackupStatsCollector stats, Options options, IFilter sourcefilter, IFilter filter, BackupResults result)
private static async Task RunMainOperation(Snapshots.ISnapshotService snapshot, Backup.BackupDatabase database, Backup.BackupStatsCollector stats, Options options, IFilter sourcefilter, IFilter filter, BackupResults result)
{
using(new Logging.Timer("BackupMainOperation"))
{
@@ -160,8 +147,7 @@ namespace Duplicati.Library.Main.Operation
Backup.FilePreFilterProcess.Start(snapshot, options, stats, database),
new Backup.MetadataPreProcess(snapshot, options, database).RunAsync(),
Backup.SpillCollectorProcess.Run(options, database),
Backup.ProgressHandler.Run(result),
Backup.BackendUploader.Run(backend, options, database, result)
Backup.ProgressHandler.Run(result)
);
}
@@ -271,6 +257,35 @@ namespace Duplicati.Library.Main.Operation
}
public void Run(string[] sources, Library.Utility.IFilter filter)
{
RunAsync(sources, filter).WaitForTaskOrThrow();
}
private static Exception BuildException(Exception source, params Task[] tasks)
{
if (tasks == null || tasks.Length == 0)
return source;
var ex = new List<Exception>();
ex.Add(source);
foreach(var t in tasks)
if (t != null)
{
if (!t.IsCompleted && !t.IsFaulted && !t.IsCanceled)
t.Wait(500);
if (t.IsFaulted && t.Exception != null)
ex.Add(t.Exception);
}
if (ex.Count == 1)
return ex.First();
else
return new AggregateException(ex.First().Message, ex);
}
private async Task RunAsync(string[] sources, Library.Utility.IFilter filter)
{
m_result.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_Begin);
@@ -278,101 +293,97 @@ namespace Duplicati.Library.Main.Operation
using(new ChannelScope(true))
using(m_database = new LocalBackupDatabase(m_options.Dbpath, m_options))
{
// Start the log handler
var lh = Common.LogHandler.Run(m_result);
m_result.SetDatabase(m_database);
m_result.Dryrun = m_options.Dryrun;
// Check the database integrity
Utility.UpdateOptionsFromDb(m_database, m_options);
Utility.VerifyParameters(m_database, m_options);
m_blocksize = m_options.Blocksize;
m_blockbuffer = new byte[m_options.Blocksize * Math.Max(1, m_options.FileReadBufferSize / m_options.Blocksize)];
m_blocklistbuffer = new byte[m_options.Blocksize];
m_blockhasher = System.Security.Cryptography.HashAlgorithm.Create(m_options.BlockHashAlgorithm);
m_filehasher = System.Security.Cryptography.HashAlgorithm.Create(m_options.FileHashAlgorithm);
if (m_blockhasher == null)
throw new Exception(Strings.Foresthash.InvalidHashAlgorithm(m_options.BlockHashAlgorithm));
if (m_filehasher == null)
throw new Exception(Strings.Foresthash.InvalidHashAlgorithm(m_options.FileHashAlgorithm));
if (!m_blockhasher.CanReuseTransform)
throw new Exception(Strings.Foresthash.InvalidCryptoSystem(m_options.BlockHashAlgorithm));
if (!m_filehasher.CanReuseTransform)
throw new Exception(Strings.Foresthash.InvalidCryptoSystem(m_options.FileHashAlgorithm));
m_database.VerifyConsistency(null, m_options.Blocksize, m_options.BlockhashSize);
// If there is no filter, we set an empty filter to simplify the code
// If there is a filter, we make sure that the sources are included
m_filter = filter ?? new Library.Utility.FilterExpression();
m_sourceFilter = new Library.Utility.FilterExpression(sources, true);
Task parallelScanner = null;
Task uploader = null;
try
{
var flushReq = new Backup.FlushRequest();
// Setup runners and instances here
using(var db = new Backup.BackupDatabase(m_database))
using(var backend = new BackendManager(m_backendurl, m_options, m_result.BackendWriter, m_database))
using(var filesetvolume = new FilesetVolumeWriter(m_options, m_database.OperationTimestamp))
using(var logtarget = ChannelManager.GetChannel<Common.LogMessage>("LogChannel").AsWriteOnly())
using(var stats = new Backup.BackupStatsCollector(m_result))
using(var bk = new Common.BackendHandler(m_options, m_backendurl, db, stats))
using(var logtarget = ChannelManager.GetChannel(Common.Channels.LogChannel.ForWrite))
{
var lh = Common.LogHandler.Run(m_result);
long lastVolumeSize = -1L;
using(var snapshot = GetSnapshot(sources, m_options, m_result))
using(var uploadtarget = ChannelManager.GetChannel(Backup.Channels.BackendRequest.ForWrite))
{
var counterToken = new CancellationTokenSource();
try
using(var snapshot = GetSnapshot(sources, m_options, m_result))
{
// Start the parallel scanner
parallelScanner = CountFilesHandler(snapshot, m_result, m_options, m_sourceFilter, m_filter, counterToken.Token);
var counterToken = new CancellationTokenSource();
// Do a remote verification, unless disabled
PreBackupVerify(backend);
using(var db = new Backup.BackupDatabase(m_database))
using(var stats = new Backup.BackupStatsCollector(m_result))
using(var bk = new Common.BackendHandler(m_options, m_backendurl, db, stats))
try
{
// Start the parallel scanner
parallelScanner = CountFilesHandler(snapshot, m_result, m_options, m_sourceFilter, m_filter, counterToken.Token);
// Do a remote verification, unless disabled
PreBackupVerify(backend);
// Verify before uploading a synthetic list
m_database.VerifyConsistency(null, m_options.Blocksize, m_options.BlockhashSize);
var res = Backup.UploadSyntheticFilelist.Run(bk, db, m_options, m_result).WaitForTask();
if (res.IsFaulted)
{
if (res.Exception.Flatten().InnerExceptions.Count == 1)
throw res.Exception.Flatten().InnerExceptions.First();
else
throw res.Exception;
}
// Start the uploader process
uploader = Backup.BackendUploader.Run(bk, m_options, db, m_result);
// If the previous backup was interrupted, send a synthetic list
await Backup.UploadSyntheticFilelist.Run(db, m_options, m_result);
// This should be removed
m_database.BuildLookupTable(m_options);
// Prepare the operation
m_result.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_ProcessingFiles);
await CreateFilesetAsync(db, filesetvolume.RemoteFilename);
var filesetvolumeid = CreateFilesetAsync(db, filesetvolume.RemoteFilename).Result;
res = RunMainOperation(snapshot, db, bk, stats, m_options, m_sourceFilter, m_filter, m_result).WaitForTask();
if (res.IsFaulted)
{
if (res.Exception.Flatten().InnerExceptions.Count == 1)
throw res.Exception.Flatten().InnerExceptions.First();
else
throw res.Exception;
}
// Run the backup operation
await RunMainOperation(snapshot, db, stats, m_options, m_sourceFilter, m_filter, m_result);
}
finally
{
//If the scanner is still running for some reason, make sure we kill it now
counterToken.Cancel();
}
}
finally
{
//If the scanner is still running for some reason, make sure we kill it now
counterToken.Cancel();
}
// Wait for upload completion
m_result.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_WaitForUpload);
await uploadtarget.WriteAsync(flushReq);
// TODO: do not close the uploader once the other functions are ported
}
// TODO: Implement this
//var lastVolumeSize = FinalizeRemoteVolumes(backend);
// In case the uploader crashes, we grab the exception here
if (await Task.WhenAny(uploader, flushReq.LastWriteSizeAync) == uploader)
await uploader;
// Grab the size of the last uploaded volume
var lastVolumeSize = await flushReq.LastWriteSizeAync;
// Make sure we have the database up-to-date
await db.CommitTransactionAsync("CommitAfterUpload", false);
// TODO: Remove this
await uploader;
// TODO: Remove this later
m_transaction = m_database.BeginTransaction();
using(new Logging.Timer("UpdateChangeStatistics"))
@@ -423,8 +434,12 @@ namespace Duplicati.Library.Main.Operation
}
catch (Exception ex)
{
m_result.AddError("Fatal error", ex);
throw;
var aex = BuildException(ex, uploader, parallelScanner);
m_result.AddError("Fatal error", aex);
if (aex == ex)
throw;
throw aex;
}
finally
{
@@ -435,6 +450,8 @@ namespace Duplicati.Library.Main.Operation
if (m_transaction != null)
try { m_transaction.Rollback(); }
catch (Exception ex) { m_result.AddError(string.Format("Rollback error: {0}", ex.Message), ex); }
lh.Wait(500);
}
}
}
@@ -67,10 +67,6 @@ namespace Duplicati.Library.Main.Operation.Common
/// </summary>
public long Size;
/// <summary>
/// A flag indicating if the final hash and size of the block volume has been written to the index file
/// </summary>
public bool IndexfileUpdated;
/// <summary>
/// A flag indicating if the file is a extra metadata file
/// that has no entry in the database
/// </summary>
@@ -104,13 +100,13 @@ namespace Duplicati.Library.Main.Operation.Common
this.LocalTempfile.Protected = true;
}
public async Task Encrypt(Library.Interface.IEncryption encryption, IWriteChannel<LogMessage> logchannel)
public async Task Encrypt(Library.Interface.IEncryption encryption, LogWrapper log)
{
if (encryption != null && !this.Encrypted)
{
var tempfile = new Library.Utility.TempFile();
encryption.Encrypt(this.LocalFilename, tempfile);
await this.DeleteLocalFile(logchannel);
await this.DeleteLocalFile(log);
this.LocalTempfile = tempfile;
this.Hash = null;
this.Size = 0;
@@ -138,18 +134,18 @@ namespace Duplicati.Library.Main.Operation.Common
return false;
}
public async Task DeleteLocalFile(IWriteChannel<LogMessage> logchannel)
public async Task DeleteLocalFile(LogWrapper log)
{
if (this.LocalTempfile != null)
try { this.LocalTempfile.Dispose(); }
catch (Exception ex) { await logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to dispose temporary file: {0}", this.LocalTempfile), ex)); }
catch (Exception ex) { await log.WriteWarningAsync(string.Format("Failed to dispose temporary file: {0}", this.LocalTempfile), ex); }
finally { this.LocalTempfile = null; }
}
}
[ChannelName("LogChannel")]
private IWriteChannel<LogMessage> m_logchannel;
private IWriteChannel<LogMessage> m_logchannel = Channels.LogChannel.ForWrite;
private LogWrapper m_log;
private DatabaseCommon m_database;
private IEncryption m_encryption;
@@ -167,6 +163,7 @@ namespace Duplicati.Library.Main.Operation.Common
m_options = options;
m_backendurl = backendUrl;
m_stats = stats;
m_log = new LogWrapper(m_logchannel);
if (!options.NoEncryption)
m_encryption = DynamicLoader.EncryptionLoader.GetModule(options.EncryptionModule, options.Passphrase, options.RawOptions);
}
@@ -201,6 +198,14 @@ namespace Duplicati.Library.Main.Operation.Common
var tcs = new TaskCompletionSource<bool>();
var backgroundhashAndEncrypt = Task.Run(async () =>
{
if (m_encryption != null)
await fe.Encrypt(m_encryption, m_log).ConfigureAwait(false);
return fe.UpdateHashAndSize(m_options);
});
await RunOnMain(async () =>
{
try
@@ -209,6 +214,9 @@ namespace Duplicati.Library.Main.Operation.Common
if (fe.IsRetry)
await RenameFileAfterErrorAsync(fe);
// Make sure the encryption and hashing has completed
await backgroundhashAndEncrypt;
return await DoPut(fe);
});
@@ -306,7 +314,7 @@ namespace Duplicati.Library.Main.Operation.Common
}
catch (Exception dex)
{
await m_logchannel.WriteAsync(LogMessage.Warning(LC.L("Failed to dispose backend instance: {0}", (ex ?? dex).Message), dex));
await m_log.WriteWarningAsync(LC.L("Failed to dispose backend instance: {0}", (ex ?? dex).Message), dex);
}
m_backend = null;
@@ -337,7 +345,7 @@ namespace Duplicati.Library.Main.Operation.Common
{
item.IsRetry = true;
lastException = ex;
await m_logchannel.WriteAsync(LogMessage.RetryAttempt(string.Format("Operation {0} with file {1} attempt {2} of {3} failed with message: {4}", item.Operation, item.RemoteFilename, i + 1, m_options.NumberOfRetries, ex.Message), ex));
await m_log.WriteRetryAttemptAsync(string.Format("Operation {0} with file {1} attempt {2} of {3} failed with message: {4}", item.Operation, item.RemoteFilename, i + 1, m_options.NumberOfRetries, ex.Message), ex);
// If the thread is aborted, we exit here
if (ex is System.Threading.ThreadAbortException)
@@ -356,7 +364,7 @@ namespace Duplicati.Library.Main.Operation.Common
}
catch (Exception dex)
{
await m_logchannel.WriteAsync(LogMessage.Warning(string.Format("Failed to create folder: {0}", ex.Message), dex));
await m_log.WriteWarningAsync(string.Format("Failed to create folder: {0}", ex.Message), dex);
}
}
@@ -384,20 +392,21 @@ namespace Duplicati.Library.Main.Operation.Common
await m_stats.SendEventAsync(item.Operation, BackendEventType.Rename, oldname, item.Size);
await m_stats.SendEventAsync(item.Operation, BackendEventType.Rename, newname, item.Size);
await m_logchannel.WriteAsync(LogMessage.Information(string.Format("Renaming \"{0}\" to \"{1}\"", oldname, newname)));
await m_log.WriteInformationAsync(string.Format("Renaming \"{0}\" to \"{1}\"", oldname, newname));
await m_database.RenameRemoteFileAsync(oldname, newname);
item.RemoteFilename = newname;
}
private async Task<bool> DoPut(FileEntryItem item)
private async Task<bool> DoPut(FileEntryItem item, bool updatedHash = false)
{
if (m_encryption != null)
await item.Encrypt(m_encryption, m_logchannel);
if (m_encryption != null && !item.Encrypted)
await item.Encrypt(m_encryption, m_log);
if (item.UpdateHashAndSize(m_options) && item.TrackedInDb)
updatedHash |= item.UpdateHashAndSize(m_options);
if (updatedHash && item.TrackedInDb)
await m_database.UpdateRemoteVolumeAsync(item.RemoteFilename, RemoteVolumeState.Uploading, item.Size, item.Hash);
await m_database.LogRemoteOperationAsync("put", item.RemoteFilename, JsonConvert.SerializeObject(new { Size = item.Size, Hash = item.Hash }));
await m_stats.SendEventAsync(BackendActionType.Put, BackendEventType.Started, item.RemoteFilename, item.Size);
@@ -414,7 +423,7 @@ namespace Duplicati.Library.Main.Operation.Common
m_backend.Put(item.RemoteFilename, item.LocalFilename);
var duration = DateTime.Now - begin;
Logging.Log.WriteMessage(string.Format("Uploaded {0} in {1}, {2}/s", Library.Utility.Utility.FormatSizeString(item.Size), duration, Library.Utility.Utility.FormatSizeString((long)(item.Size / duration.TotalSeconds))), Duplicati.Library.Logging.LogMessageType.Profiling);
await m_log.WriteProfilingAsync(string.Format("Uploaded {0} in {1}, {2}/s", Library.Utility.Utility.FormatSizeString(item.Size), duration, Library.Utility.Utility.FormatSizeString((long)(item.Size / duration.TotalSeconds))), null);
if (item.TrackedInDb)
await m_database.UpdateRemoteVolumeAsync(item.RemoteFilename, RemoteVolumeState.Uploaded, item.Size, item.Hash);
@@ -430,7 +439,7 @@ namespace Duplicati.Library.Main.Operation.Common
throw new Exception(string.Format("List verify failed for file: {0}, size was {1} but expected to be {2}", f.Name, f.Size, item.Size));
}
await item.DeleteLocalFile(m_logchannel);
await item.DeleteLocalFile(m_log);
await m_database.CommitTransactionAsync("CommitAfterUpload");
return true;
@@ -478,7 +487,7 @@ namespace Duplicati.Library.Main.Operation.Common
if (isFileMissingException || (wr != null && wr.StatusCode == System.Net.HttpStatusCode.NotFound))
{
await m_logchannel.WriteAsync(LogMessage.Warning(LC.L("Delete operation failed for {0} with FileNotFound, listing contents", item.RemoteFilename), ex));
await m_log.WriteWarningAsync(LC.L("Delete operation failed for {0} with FileNotFound, listing contents", item.RemoteFilename), ex);
bool success = false;
try
@@ -491,7 +500,7 @@ namespace Duplicati.Library.Main.Operation.Common
if (success)
{
await m_logchannel.WriteAsync(LogMessage.Information(LC.L("Listing indicates file {0} is deleted correctly", item.RemoteFilename)));
await m_log.WriteInformationAsync(LC.L("Listing indicates file {0} is deleted correctly", item.RemoteFilename));
return true;
}
@@ -556,7 +565,7 @@ namespace Duplicati.Library.Main.Operation.Common
var duration = DateTime.Now - begin;
var filehash = FileEntryItem.CalculateFileHash(tmpfile);
Logging.Log.WriteMessage(string.Format("Downloaded {0} in {1}, {2}/s", Library.Utility.Utility.FormatSizeString(item.Size), duration, Library.Utility.Utility.FormatSizeString((long)(item.Size / duration.TotalSeconds))), Duplicati.Library.Logging.LogMessageType.Profiling);
await m_log.WriteProfilingAsync(string.Format("Downloaded {0} in {1}, {2}/s", Library.Utility.Utility.FormatSizeString(item.Size), duration, Library.Utility.Utility.FormatSizeString((long)(item.Size / duration.TotalSeconds))), null);
await m_database.LogRemoteOperationAsync("get", item.RemoteFilename, JsonConvert.SerializeObject(new { Size = new System.IO.FileInfo(tmpfile).Length, Hash = filehash }));
await m_stats.SendEventAsync(BackendActionType.Get, BackendEventType.Completed, item.RemoteFilename, new System.IO.FileInfo(tmpfile).Length);
@@ -601,19 +610,19 @@ namespace Duplicati.Library.Main.Operation.Common
// Check if the file is encrypted with something else
if (DynamicLoader.EncryptionLoader.Keys.Contains(ext, StringComparer.InvariantCultureIgnoreCase))
{
await m_logchannel.WriteAsync(LogMessage.Verbose("Filename extension \"{0}\" does not match encryption module \"{1}\", using matching encryption module", ext, m_options.EncryptionModule));
await m_log.WriteVerboseAsync("Filename extension \"{0}\" does not match encryption module \"{1}\", using matching encryption module", ext, m_options.EncryptionModule);
using(var encmodule = DynamicLoader.EncryptionLoader.GetModule(ext, m_options.Passphrase, m_options.RawOptions))
(encmodule ?? m_encryption).Decrypt(tmpfile2, tmpfile);
}
// Check if the file is not encrypted
else if (DynamicLoader.CompressionLoader.Keys.Contains(ext, StringComparer.InvariantCultureIgnoreCase))
{
await m_logchannel.WriteAsync(LogMessage.Verbose("Filename extension \"{0}\" does not match encryption module \"{1}\", guessing that it is not encrypted", ext, m_options.EncryptionModule));
await m_log.WriteVerboseAsync("Filename extension \"{0}\" does not match encryption module \"{1}\", guessing that it is not encrypted", ext, m_options.EncryptionModule);
}
// Fallback, lets see what happens...
else
{
await m_logchannel.WriteAsync(LogMessage.Verbose("Filename extension \"{0}\" does not match encryption module \"{1}\", attempting to use specified encryption module as no others match", ext, m_options.EncryptionModule));
await m_log.WriteVerboseAsync("Filename extension \"{0}\" does not match encryption module \"{1}\", attempting to use specified encryption module as no others match", ext, m_options.EncryptionModule);
m_encryption.Decrypt(tmpfile2, tmpfile);
}
}
@@ -15,14 +15,12 @@
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
using System;
using CoCoL;
namespace Duplicati.Library.Main.Operation.Common
{
public class Channels
internal static class Channels
{
public Channels()
{
}
}
}
public static readonly ChannelMarkerWrapper<LogMessage> LogChannel = new ChannelMarkerWrapper<LogMessage>("LogChannel");
}}
@@ -15,6 +15,8 @@
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
using System;
using CoCoL;
using System.Threading.Tasks;
namespace Duplicati.Library.Main.Operation.Common
{
@@ -26,70 +28,6 @@ namespace Duplicati.Library.Main.Operation.Common
// Set when this is a verbose/diagnostic message (produced by Verbose())
public bool IsVerbose;
// Set when this message reports a simulated action from a dry-run (produced by DryRun())
public bool IsDryRun;
// Set when this message reports a retry attempt (produced by RetryAttempt())
public bool IsRetry;
/// <summary>
/// Constructs a warning-level log message.
/// </summary>
/// <param name="message">The text of the message.</param>
/// <param name="ex">The exception that caused the warning, if any.</param>
public static LogMessage Warning(string message, Exception ex)
{
    var msg = new LogMessage();
    msg.Level = Duplicati.Library.Logging.LogMessageType.Warning;
    msg.Message = message;
    msg.Exception = ex;
    return msg;
}
/// <summary>
/// Constructs an error-level log message.
/// </summary>
/// <param name="message">The text of the message.</param>
/// <param name="ex">The exception that caused the error, if any.</param>
public static LogMessage Error(string message, Exception ex)
{
    var msg = new LogMessage();
    msg.Level = Duplicati.Library.Logging.LogMessageType.Error;
    msg.Message = message;
    msg.Exception = ex;
    return msg;
}
/// <summary>
/// Constructs a profiling-level log message.
/// </summary>
/// <param name="message">The text of the message.</param>
/// <param name="ex">The exception associated with the message, if any.</param>
public static LogMessage Profiling(string message, Exception ex)
{
    var msg = new LogMessage();
    msg.Level = Duplicati.Library.Logging.LogMessageType.Profiling;
    msg.Message = message;
    msg.Exception = ex;
    return msg;
}
/// <summary>
/// Constructs an information-level log message.
/// </summary>
/// <param name="message">The text of the message.</param>
/// <param name="ex">An optional exception associated with the message.</param>
public static LogMessage Information(string message, Exception ex = null)
{
    var msg = new LogMessage();
    msg.Level = Duplicati.Library.Logging.LogMessageType.Information;
    msg.Message = message;
    msg.Exception = ex;
    return msg;
}
/// <summary>
/// Constructs a verbose (diagnostic) log message.
/// </summary>
/// <param name="message">The message, treated as a composite format string when <paramref name="args"/> are supplied.</param>
/// <param name="args">Optional arguments for the format string.</param>
public static LogMessage Verbose(string message, params object[] args)
{
    return new LogMessage() {
        Level = Duplicati.Library.Logging.LogMessageType.Information,
        // Only format when arguments are supplied, so a message containing
        // literal braces cannot throw a FormatException
        Message = (args == null || args.Length == 0) ? message : string.Format(message, args),
        IsVerbose = true
    };
}
/// <summary>
/// Constructs a log message reporting a simulated action from a dry-run.
/// </summary>
/// <param name="message">The message, treated as a composite format string when <paramref name="args"/> are supplied.</param>
/// <param name="args">Optional arguments for the format string.</param>
public static LogMessage DryRun(string message, params object[] args)
{
    return new LogMessage() {
        Level = Duplicati.Library.Logging.LogMessageType.Information,
        // Only format when arguments are supplied, so a message containing
        // literal braces cannot throw a FormatException
        Message = (args == null || args.Length == 0) ? message : string.Format(message, args),
        IsDryRun = true
    };
}
/// <summary>
/// Constructs a log message reporting a retry attempt.
/// </summary>
/// <param name="message">The message, treated as a composite format string when <paramref name="args"/> are supplied.</param>
/// <param name="args">Optional arguments for the format string.</param>
public static LogMessage RetryAttempt(string message, params object[] args)
{
    return new LogMessage() {
        Level = Duplicati.Library.Logging.LogMessageType.Information,
        // Only format when arguments are supplied, so a message containing
        // literal braces cannot throw a FormatException
        Message = (args == null || args.Length == 0) ? message : string.Format(message, args),
        IsRetry = true
    };
}
}
}
@@ -0,0 +1,117 @@
// Copyright (C) 2015, The Duplicati Team
// http://www.duplicati.com, info@duplicati.com
//
// This library is free software; you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation; either version 2.1 of the
// License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
using System;
using CoCoL;
using System.Threading.Tasks;
namespace Duplicati.Library.Main.Operation.Common
{
/// <summary>
/// Convenience wrapper around a log-message write channel, exposing
/// level-specific async write methods. When constructed without an
/// explicit channel, it binds to the shared named log channel and
/// retires it on dispose.
/// </summary>
public class LogWrapper : IDisposable
{
    // True when this instance obtained the channel itself and is
    // responsible for retiring it on Dispose
    private bool m_autoLeave;
    // The channel log messages are written to; null after Dispose
    private IWriteChannel<LogMessage> m_channel;

    //TODO: Fix this so the verbose messages are not sent unless required

    /// <summary>
    /// Constructs a wrapper bound to the shared named log channel;
    /// the channel is retired when this instance is disposed.
    /// </summary>
    public LogWrapper()
    {
        m_channel = ChannelManager.GetChannel(Channels.LogChannel.ForWrite);
        m_autoLeave = true;
    }

    /// <summary>
    /// Constructs a wrapper around an externally owned channel;
    /// the caller remains responsible for the channel's lifetime.
    /// </summary>
    /// <param name="channel">The channel to write log messages to.</param>
    public LogWrapper(IWriteChannel<LogMessage> channel)
    {
        // Fail fast instead of throwing NullReferenceException on first write
        if (channel == null)
            throw new ArgumentNullException("channel");

        m_channel = channel;
        m_autoLeave = false;
    }

    // Shared helper that builds the message record and writes it to the channel
    private Task WriteAsync(Duplicati.Library.Logging.LogMessageType level, string message, Exception ex, bool isVerbose = false, bool isDryRun = false, bool isRetry = false)
    {
        return m_channel.WriteAsync(new LogMessage() {
            Level = level,
            Message = message,
            Exception = ex,
            IsVerbose = isVerbose,
            IsDryRun = isDryRun,
            IsRetry = isRetry
        });
    }

    // Only formats when arguments are supplied, so a message containing
    // literal braces cannot throw a FormatException
    private static string SafeFormat(string message, object[] args)
    {
        return (args == null || args.Length == 0) ? message : string.Format(message, args);
    }

    /// <summary>Writes a warning-level message.</summary>
    /// <param name="message">The text of the message.</param>
    /// <param name="ex">The exception that caused the warning, if any.</param>
    public Task WriteWarningAsync(string message, Exception ex)
    {
        return WriteAsync(Duplicati.Library.Logging.LogMessageType.Warning, message, ex);
    }

    /// <summary>Writes an error-level message.</summary>
    /// <param name="message">The text of the message.</param>
    /// <param name="ex">The exception that caused the error, if any.</param>
    public Task WriteErrorAsync(string message, Exception ex)
    {
        return WriteAsync(Duplicati.Library.Logging.LogMessageType.Error, message, ex);
    }

    /// <summary>Writes a profiling-level message.</summary>
    /// <param name="message">The text of the message.</param>
    /// <param name="ex">The exception associated with the message, if any.</param>
    public Task WriteProfilingAsync(string message, Exception ex)
    {
        return WriteAsync(Duplicati.Library.Logging.LogMessageType.Profiling, message, ex);
    }

    /// <summary>Writes an information-level message.</summary>
    /// <param name="message">The text of the message.</param>
    /// <param name="ex">An optional exception associated with the message.</param>
    public Task WriteInformationAsync(string message, Exception ex = null)
    {
        return WriteAsync(Duplicati.Library.Logging.LogMessageType.Information, message, ex);
    }

    /// <summary>Writes a verbose (diagnostic) message.</summary>
    /// <param name="message">The message, treated as a composite format string when <paramref name="args"/> are supplied.</param>
    /// <param name="args">Optional arguments for the format string.</param>
    public Task WriteVerboseAsync(string message, params object[] args)
    {
        return WriteAsync(Duplicati.Library.Logging.LogMessageType.Information, SafeFormat(message, args), null, isVerbose: true);
    }

    /// <summary>Writes a message reporting a simulated action from a dry-run.</summary>
    /// <param name="message">The message, treated as a composite format string when <paramref name="args"/> are supplied.</param>
    /// <param name="args">Optional arguments for the format string.</param>
    public Task WriteDryRunAsync(string message, params object[] args)
    {
        return WriteAsync(Duplicati.Library.Logging.LogMessageType.Information, SafeFormat(message, args), null, isDryRun: true);
    }

    /// <summary>Writes a message reporting a retry attempt.</summary>
    /// <param name="message">The message, treated as a composite format string when <paramref name="args"/> are supplied.</param>
    /// <param name="args">Optional arguments for the format string.</param>
    public Task WriteRetryAttemptAsync(string message, params object[] args)
    {
        return WriteAsync(Duplicati.Library.Logging.LogMessageType.Information, SafeFormat(message, args), null, isRetry: true);
    }

    /// <summary>
    /// Retires the channel if this instance owns it; safe to call multiple times.
    /// </summary>
    public void Dispose()
    {
        if (m_autoLeave && m_channel != null)
        {
            m_autoLeave = false;
            if (m_channel is IRetireAbleChannel)
                ((IRetireAbleChannel)m_channel).Retire();
        }
        m_channel = null;
    }
}
}
BIN
View File
Binary file not shown.