fix race in flush_piece_impl(): support the case where another thread may already be flushing the piece

This commit is contained in:
Arvid Norberg
2026-04-06 07:30:16 +02:00
parent f0012e1197
commit 704bd8af4f
2 changed files with 32 additions and 17 deletions
+7
View File
@@ -11,6 +11,7 @@ see LICENSE file.
#define TORRENT_DISK_CACHE
#include <unordered_map>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <algorithm>
@@ -219,6 +220,11 @@ struct cached_piece_entry
// single wakeup.
static constexpr cached_piece_flags needs_hasher_kick_flag = 6_bit;
// set by a thread about to wait on m_flushing_cv for flushing_flag to
// clear on this piece. flush_piece_impl only calls notify_all() when this
// flag is set, avoiding the cost of signalling.
static constexpr cached_piece_flags notify_flushed_flag = 7_bit;
// flags are protected by the main disk cache mutex and may only be
// accessed while holding it
cached_piece_flags flags{};
@@ -568,6 +574,7 @@ private:
, std::function<void(jobqueue_t, disk_job*)> clear_piece_fun);
mutable std::mutex m_mutex;
std::condition_variable m_flushing_cv;
piece_container m_pieces;
// allocator used for all disk buffers in this cache. Set lazily on the
+25 -17
View File
@@ -624,12 +624,17 @@ Iter disk_cache::flush_piece_impl(View& view
{
auto se = scope_end([&] {
l.lock();
view.modify(piece_iter, [](cached_piece_entry& e) {
bool notify = false;
view.modify(piece_iter, [&notify](cached_piece_entry& e) {
TORRENT_ASSERT(bool(e.flags & cached_piece_entry::flushing_flag));
notify = bool(e.flags & cached_piece_entry::notify_flushed_flag);
// leave notify_flushed_flag set — it pins the piece against eviction
// until flush_storage clears it after waking up
e.flags &= ~cached_piece_entry::flushing_flag;
});
TORRENT_ASSERT(m_flushing_blocks >= num_blocks);
m_flushing_blocks -= num_blocks;
if (notify) m_flushing_cv.notify_all();
});
flushed_blocks.resize(int(blocks.size()));
flushed_blocks.clear_all();
@@ -805,7 +810,8 @@ void disk_cache::flush_to_disk(
if (piece_iter->flushed_cursor == piece_iter->blocks_in_piece()
&& bool(piece_iter->flags & cached_piece_entry::piece_hash_returned_flag)
&& !(piece_iter->flags & cached_piece_entry::flushing_flag))
&& !(piece_iter->flags & cached_piece_entry::flushing_flag)
&& !(piece_iter->flags & cached_piece_entry::notify_flushed_flag))
{
TORRENT_ASSERT(!(piece_iter->flags & cached_piece_entry::hashing_flag));
free_piece(*piece_iter);
@@ -895,36 +901,38 @@ void disk_cache::flush_storage(std::function<int(bitfield&, span<cached_block_en
for (auto i = begin; i != end; ++i)
pieces.push_back(i->piece.piece);
bitfield flushed_blocks;
for (auto piece : pieces)
{
auto piece_iter = view.find(piece_location{storage, piece});
auto const piece_iter = view.find(piece_location{storage, piece});
if (piece_iter == view.end())
continue;
// There's a risk that some other thread is flushing this piece, but
// won't force-flush it completely. In that case parts of the piece
// may not be flushed
// TODO: maybe we should track these pieces and synchronize with
// them later. maybe wait for them to be flushed or hang our job on
// them, but that would really only work if there's only one piece
// left
// A background flush thread may have this piece mid-flight (flushing_flag
// set, lock released). Set notify_flushed_flag so flush_piece_impl will
// signal us, then wait for it to finish before flushing remaining blocks.
if (piece_iter->flags & cached_piece_entry::flushing_flag)
continue;
{
view.modify(piece_iter, [](cached_piece_entry& e)
{ e.flags |= cached_piece_entry::notify_flushed_flag; });
m_flushing_cv.wait(l, [&]
{ return !(piece_iter->flags & cached_piece_entry::flushing_flag); });
// clear the pin now that we hold the lock and piece_iter is safe
view.modify(piece_iter, [](cached_piece_entry& e)
{ e.flags &= ~cached_piece_entry::notify_flushed_flag; });
}
int const num_blocks = piece_iter->num_jobs;
TORRENT_ASSERT(count_jobs(piece_iter->get_blocks()) == num_blocks);
if (num_blocks == 0) continue;
span<cached_block_entry> const blocks = piece_iter->get_blocks();
flush_piece_impl(view, piece_iter, f, l
, blocks, clear_piece_fun);
flush_piece_impl(view, piece_iter, f, l, blocks, clear_piece_fun);
TORRENT_ASSERT(l.owns_lock());
TORRENT_ASSERT(!(piece_iter->flags & cached_piece_entry::flushing_flag));
TORRENT_ASSERT(!(piece_iter->flags & cached_piece_entry::hashing_flag));
TORRENT_ASSERT(!(piece_iter->flags & cached_piece_entry::notify_flushed_flag));
free_piece(*piece_iter);
piece_iter = view.erase(piece_iter);
view.erase(piece_iter);
}
}