diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 8b9158d8f67a7d..10baa11ca25af2 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1596,13 +1596,13 @@ DEFINE_Int64(segment_prefetch_thread_pool_thread_num_max, "2000"); DEFINE_mInt32(segment_file_cache_consume_rowids_batch_size, "8000"); // Enable segment file cache block prefetch for query -DEFINE_mBool(enable_query_segment_file_cache_prefetch, "false"); +DEFINE_mBool(enable_query_segment_file_cache_prefetch, "true"); // Number of blocks to prefetch ahead in segment iterator for query -DEFINE_mInt32(query_segment_file_cache_prefetch_block_size, "2"); +DEFINE_mInt32(query_segment_file_cache_prefetch_block_size, "4"); // Enable segment file cache block prefetch for compaction -DEFINE_mBool(enable_compaction_segment_file_cache_prefetch, "false"); +DEFINE_mBool(enable_compaction_segment_file_cache_prefetch, "true"); // Number of blocks to prefetch ahead in segment iterator for compaction -DEFINE_mInt32(compaction_segment_file_cache_prefetch_block_size, "2"); +DEFINE_mInt32(compaction_segment_file_cache_prefetch_block_size, "4"); // The min thread num for S3FileUploadThreadPool DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16"); // The max thread num for S3FileUploadThreadPool diff --git a/be/src/exec/operator/olap_scan_operator.cpp b/be/src/exec/operator/olap_scan_operator.cpp index c1113ba9a3e519..643f8e8aa0a319 100644 --- a/be/src/exec/operator/olap_scan_operator.cpp +++ b/be/src/exec/operator/olap_scan_operator.cpp @@ -313,8 +313,8 @@ Status OlapScanLocalState::_init_profile() { "SegmentIteratorInitTimer"); _segment_iterator_init_index_iterators_timer = ADD_CHILD_TIMER( _scanner_profile, "SegmentIteratorInitIndexIteratorsTimer", "SegmentIteratorInitTimer"); - _segment_iterator_init_segment_prefetchers_timer = - ADD_CHILD_TIMER(_scanner_profile, "SegmentIteratorInitSegmentPrefetchersTimer", + _segment_iterator_init_cache_block_prefetch_timer = + ADD_CHILD_TIMER(_scanner_profile, "SegmentIteratorInitCacheBlockPrefetchTimer", "SegmentIteratorInitTimer"); // These two timers span both iterator init and later lazy segment init paths, diff --git a/be/src/exec/operator/olap_scan_operator.h b/be/src/exec/operator/olap_scan_operator.h index cd82c7ab08e207..a0f1538d5d2ea4 100644 --- a/be/src/exec/operator/olap_scan_operator.h +++ b/be/src/exec/operator/olap_scan_operator.h @@ -290,7 +290,7 @@ class OlapScanLocalState final : public ScanLocalState { RuntimeProfile::Counter* _segment_iterator_init_timer = nullptr; RuntimeProfile::Counter* _segment_iterator_init_return_column_iterators_timer = nullptr; RuntimeProfile::Counter* _segment_iterator_init_index_iterators_timer = nullptr; - RuntimeProfile::Counter* _segment_iterator_init_segment_prefetchers_timer = nullptr; + RuntimeProfile::Counter* _segment_iterator_init_cache_block_prefetch_timer = nullptr; RuntimeProfile::Counter* _segment_create_column_readers_timer = nullptr; RuntimeProfile::Counter* _segment_load_index_timer = nullptr; diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp index a7949535e37755..73c8c5142e15aa 100644 --- a/be/src/exec/scan/olap_scanner.cpp +++ b/be/src/exec/scan/olap_scanner.cpp @@ -881,8 +881,8 @@ void OlapScanner::_collect_profile_before_close() { stats.segment_iterator_init_return_column_iterators_timer_ns); COUNTER_UPDATE(local_state->_segment_iterator_init_index_iterators_timer, stats.segment_iterator_init_index_iterators_timer_ns); - COUNTER_UPDATE(local_state->_segment_iterator_init_segment_prefetchers_timer, - stats.segment_iterator_init_segment_prefetchers_timer_ns); + COUNTER_UPDATE(local_state->_segment_iterator_init_cache_block_prefetch_timer, + stats.segment_iterator_init_cache_block_prefetch_timer_ns); COUNTER_UPDATE(local_state->_segment_create_column_readers_timer, stats.segment_create_column_readers_timer_ns); diff --git a/be/src/io/cache/cache_block_aware_prefetch_remote_reader.cpp b/be/src/io/cache/cache_block_aware_prefetch_remote_reader.cpp new file mode 100644 index 00000000000000..cc3530d5ec6025 --- /dev/null +++ b/be/src/io/cache/cache_block_aware_prefetch_remote_reader.cpp @@ -0,0 +1,246 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "io/cache/cache_block_aware_prefetch_remote_reader.h" + +#include +#include +#include +#include + +#include "common/logging.h" + +namespace doris::io { + +namespace { + +CacheBlockRange cache_block_range(size_t block_id, size_t cache_block_size) { + return {block_id * cache_block_size, cache_block_size}; +} + +bool trigger_file_range_is_before_current_offset(const FileAccessRange& range, + size_t current_file_offset) { + if (current_file_offset <= range.offset) { + return false; + } + return current_file_offset - range.offset >= range.size; +} + +bool same_trigger_file_range(const FileAccessRange& lhs, const FileAccessRange& rhs) { + return lhs.offset == rhs.offset && lhs.size == rhs.size; +} + +} // namespace + +CacheBlockAwarePrefetchRemoteReader::CacheBlockAwarePrefetchRemoteReader( + FileReaderSPtr remote_file_reader, const FileReaderOptions& opts) + : CachedRemoteFileReader(std::move(remote_file_reader), opts) {} + +Status CacheBlockAwarePrefetchRemoteReader::set_read_pattern( + CacheBlockReadPattern pattern, const CacheBlockPrefetchPolicy& policy) { + if (policy.max_prefetch_blocks == 0 || policy.cache_block_size == 0) { + return Status::InvalidArgument( + "cache block prefetch policy requires positive window and block size"); + } + + auto plan = detail::CacheBlockPrefetchPlan::from_read_pattern(std::move(pattern), + policy.cache_block_size); + + std::lock_guard lock(_pattern_mutex); + if (plan.empty()) { + _prefetch_cursor.reset(); + return Status::OK(); + } + _prefetch_cursor.emplace(std::move(plan), policy.max_prefetch_blocks); + return Status::OK(); +} + +void CacheBlockAwarePrefetchRemoteReader::clear_read_pattern() { + std::lock_guard lock(_pattern_mutex); + _prefetch_cursor.reset(); +} + +void CacheBlockAwarePrefetchRemoteReader::async_touch_initial_window(const IOContext* io_ctx) { + std::vector ranges; + { + std::lock_guard lock(_pattern_mutex); + if (!_prefetch_cursor.has_value()) { + return; + } + ranges = _prefetch_cursor->next_initial_touch_ranges(); + } + _async_touch_ranges(std::move(ranges), io_ctx); +} + +bool CacheBlockAwarePrefetchRemoteReader::has_read_pattern() const { + std::lock_guard lock(_pattern_mutex); + return _prefetch_cursor.has_value(); +} + +Status CacheBlockAwarePrefetchRemoteReader::read_at_impl(size_t offset, Slice result, + size_t* bytes_read, + const IOContext* io_ctx) { + // Normal foreground reads drive the prefetch window by the real file offset + // that PageIO is about to read. Dry-run reads are submitted by + // CachedRemoteFileReader::async_touch_local_cache() to warm the file cache; + // they must not recursively schedule more prefetch work. + if (io_ctx == nullptr || !io_ctx->is_dryrun) { + _prefetch(offset, io_ctx); + } + return CachedRemoteFileReader::read_at_impl(offset, result, bytes_read, io_ctx); +} + +void CacheBlockAwarePrefetchRemoteReader::_prefetch(size_t current_file_offset, + const IOContext* io_ctx) { + std::vector ranges; + { + std::lock_guard lock(_pattern_mutex); + if (!_prefetch_cursor.has_value()) { + return; + } + ranges = _prefetch_cursor->next_touch_ranges(current_file_offset); + } + + _async_touch_ranges(std::move(ranges), io_ctx); +} + +void CacheBlockAwarePrefetchRemoteReader::_async_touch_ranges(std::vector ranges, + const IOContext* io_ctx) { + for (const auto& range : ranges) { + async_touch_local_cache(range.offset, range.size, io_ctx); + } +} + +namespace detail { + +CacheBlockPrefetchCursor::CacheBlockPrefetchCursor(CacheBlockPrefetchPlan plan, + size_t max_prefetch_blocks) + : _plan(std::move(plan)), _max_prefetch_blocks(max_prefetch_blocks) { + DCHECK(_max_prefetch_blocks > 0); +} + +std::vector CacheBlockPrefetchCursor::next_touch_ranges( + size_t current_file_offset) { + std::vector ranges; + const auto entries = _plan.entries(); + if (entries.empty() || _next_touch_index >= entries.size()) { + return ranges; + } + + _advance_current_index(current_file_offset); + if (_current_index >= entries.size()) { + return ranges; + } + + _next_touch_index = std::max(_next_touch_index, _current_index); + while (_next_touch_index < entries.size()) { + const bool has_window_capacity = _prefetched_window_size() < _max_prefetch_blocks; + if (!has_window_capacity && !_next_range_continues_current_file_range()) { + break; + } + ranges.push_back(entries[_next_touch_index++].cache_block_range); + } + return ranges; +} + +std::vector CacheBlockPrefetchCursor::next_initial_touch_ranges() { + const auto entries = _plan.entries(); + if (entries.empty() || _current_index >= entries.size()) { + return {}; + } + return next_touch_ranges(entries[_current_index].trigger_file_range.offset); +} + +void CacheBlockPrefetchCursor::_advance_current_index(size_t current_file_offset) { + auto current_range_is_behind = [this, + current_file_offset](const CacheBlockPrefetchPlanEntry& entry) { + if (_plan.direction() == CacheBlockReadDirection::FORWARD) { + return trigger_file_range_is_before_current_offset(entry.trigger_file_range, + current_file_offset); + } + return entry.trigger_file_range.offset > current_file_offset; + }; + const auto entries = _plan.entries(); + while (_current_index < entries.size() && current_range_is_behind(entries[_current_index])) { + ++_current_index; + } +} + +bool CacheBlockPrefetchCursor::_next_range_continues_current_file_range() const { + const auto entries = _plan.entries(); + return _next_touch_index > _current_index && _next_touch_index < entries.size() && + same_trigger_file_range(entries[_next_touch_index].trigger_file_range, + entries[_next_touch_index - 1].trigger_file_range); +} + +CacheBlockPrefetchPlan CacheBlockPrefetchPlan::from_read_pattern(CacheBlockReadPattern pattern, + size_t cache_block_size) { + const auto direction = pattern.direction; + if (pattern.direction == CacheBlockReadDirection::FORWARD) { + std::stable_sort(pattern.ranges.begin(), pattern.ranges.end(), + [](const FileAccessRange& lhs, const FileAccessRange& rhs) { + return lhs.offset < rhs.offset; + }); + } else { + std::stable_sort(pattern.ranges.begin(), pattern.ranges.end(), + [](const FileAccessRange& lhs, const FileAccessRange& rhs) { + return lhs.offset > rhs.offset; + }); + } + + std::vector entries; + std::unordered_set added_blocks; + for (const auto& range : pattern.ranges) { + if (range.size == 0) { + continue; + } + DORIS_CHECK(range.size - 1 <= std::numeric_limits::max() - range.offset); + + size_t start_block = range.offset / cache_block_size; + size_t end_block = (range.offset + range.size - 1) / cache_block_size; + if (pattern.direction == CacheBlockReadDirection::FORWARD) { + for (size_t block_id = start_block;; ++block_id) { + if (added_blocks.emplace(block_id).second) { + entries.push_back({ + .cache_block_range = cache_block_range(block_id, cache_block_size), + .trigger_file_range = range, + }); + } + if (block_id == end_block) { + break; + } + } + } else { + for (size_t block_id = end_block;; --block_id) { + if (added_blocks.emplace(block_id).second) { + entries.push_back({ + .cache_block_range = cache_block_range(block_id, cache_block_size), + .trigger_file_range = range, + }); + } + if (block_id == start_block) { + break; + } + } + } + } + return CacheBlockPrefetchPlan(direction, std::move(entries)); +} + +} // namespace detail + +} // namespace doris::io diff --git a/be/src/io/cache/cache_block_aware_prefetch_remote_reader.h b/be/src/io/cache/cache_block_aware_prefetch_remote_reader.h new file mode 100644 index 00000000000000..c189e143728623 --- /dev/null +++ b/be/src/io/cache/cache_block_aware_prefetch_remote_reader.h @@ -0,0 +1,198 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "io/cache/cached_remote_file_reader.h" +#include "io/fs/file_reader.h" +#include "io/fs/file_reader_writer_fwd.h" + +namespace doris::io { + +struct IOContext; + +enum class CacheBlockReadDirection : uint8_t { + FORWARD = 0, + BACKWARD = 1, +}; + +// One caller-visible data range in the underlying remote file. +// +// Segment readers build these ranges from data page pointers in ordinal indexes. +// The reader uses the file range itself to advance prefetch by read_at() file +// offsets, so callers do not need to pass segment ordinals into the IO layer. +struct FileAccessRange { + size_t offset = 0; + size_t size = 0; +}; + +struct CacheBlockReadPattern { + CacheBlockReadDirection direction = CacheBlockReadDirection::FORWARD; + std::vector ranges; +}; + +struct CacheBlockPrefetchPolicy { + // Target number of file cache blocks kept in the prefetch window, including + // the block that contains the current read position. This is a soft cap: a + // single file access range is never split even when it spans more blocks. + size_t max_prefetch_blocks = 0; + + // The block size used to convert access ranges to file cache blocks. This + // should normally be config::file_cache_each_block_size. + size_t cache_block_size = 0; +}; + +struct CacheBlockRange { + size_t offset = 0; + size_t size = 0; +}; + +namespace detail { + +struct CacheBlockPrefetchPlanEntry { + CacheBlockRange cache_block_range; + FileAccessRange trigger_file_range; +}; + +// Immutable conversion result from high-level file access ranges to file-cache +// block ranges. Each entry keeps the original file range as the trigger so +// read_at() can advance prefetch by observed file offsets, even when a read +// starts inside a known data-page range rather than exactly at its first byte. +class CacheBlockPrefetchPlan { +public: + CacheBlockPrefetchPlan() = default; + + static CacheBlockPrefetchPlan from_read_pattern(CacheBlockReadPattern pattern, + size_t cache_block_size); + + [[nodiscard]] bool empty() const { return _entries.empty(); } + [[nodiscard]] CacheBlockReadDirection direction() const { return _direction; } + [[nodiscard]] std::span entries() const { return _entries; } + +private: + CacheBlockPrefetchPlan(CacheBlockReadDirection direction, + std::vector entries) + : _direction(direction), _entries(std::move(entries)) {} + + CacheBlockReadDirection _direction = CacheBlockReadDirection::FORWARD; + std::vector _entries; +}; + +// Mutable sliding-window cursor over a CacheBlockPrefetchPlan. It contains only +// scan progress; rebuilding the read pattern creates a fresh cursor. +class CacheBlockPrefetchCursor { +public: + CacheBlockPrefetchCursor() = default; + CacheBlockPrefetchCursor(CacheBlockPrefetchPlan plan, size_t max_prefetch_blocks); + + [[nodiscard]] bool empty() const { return _plan.empty(); } + std::vector next_touch_ranges(size_t current_file_offset); + std::vector next_initial_touch_ranges(); + +private: + void _advance_current_index(size_t current_file_offset); + [[nodiscard]] size_t _prefetched_window_size() const { + return _next_touch_index - _current_index; + } + [[nodiscard]] bool _next_range_continues_current_file_range() const; + + CacheBlockPrefetchPlan _plan; + size_t _max_prefetch_blocks = 0; + size_t _current_index = 0; + size_t _next_touch_index = 0; +}; + +} // namespace detail + +// Cached remote reader with cache-block-aware prefetch scheduling. +// +// Purpose: +// CachedRemoteFileReader already knows how to warm a file cache block by +// reading it in dry-run mode. This class adds an explicit read-pattern layer +// above that primitive: callers describe the future file access ranges and a +// prefetch policy. The reader translates the pattern to file cache blocks, +// keeps a sliding window of blocks ahead of the current read_at() offset, and +// submits one async local-cache touch task per cache block through +// CachedRemoteFileReader::async_touch_local_cache. +// +// Interface usage: +// 1. Build a CacheBlockReadPattern from higher-level metadata. For segment +// scans, the segment layer converts selected row ids through the ordinal +// index into FileAccessRange entries for data pages. +// 2. Give each physical column iterator its own +// CacheBlockAwarePrefetchRemoteReader. A reader owns at most one pattern, +// so multiple independently monotonic scan streams should not share this +// object. +// 3. Call set_read_pattern() with a CacheBlockPrefetchPolicy before scanning. +// Afterwards callers use the normal FileReader::read_at() API. Each read +// advances the pattern by file offset and warms cache blocks until the +// configured window is full. If one file range spans more cache blocks than +// the window, the whole range is still prefetched so large data pages and +// pages that cross block boundaries are not split. +// 4. If the caller knows the first file ranges are definitely going to be read +// (for example segment predicate columns after index pruning), it may call +// async_touch_initial_window() immediately after set_read_pattern(). This +// submits the first prefetch window before the first foreground read_at(). +// Optional or batch-local streams should skip it and let read_at() trigger +// prefetch from the actual file offset. +// +// Usage example: +// See BlockFileCacheTest.usage_example_read_at_automatically_prefetches_single_pattern in +// be/test/io/cache/cache_block_aware_prefetch_remote_reader_test.cpp. +// +// This optimization intentionally spends more object-storage IOPS to expose more +// parallelism and therefore more bandwidth: instead of a scanner fetching a large +// cold segment serially, many independent S3 range reads are issued at the file +// cache block granularity. On cold scans where bandwidth is the bottleneck and +// IOPS headroom exists, this trades S3 IOPS for higher aggregate throughput. +class CacheBlockAwarePrefetchRemoteReader final : public CachedRemoteFileReader { +public: + CacheBlockAwarePrefetchRemoteReader(FileReaderSPtr remote_file_reader, + const FileReaderOptions& opts); + + ~CacheBlockAwarePrefetchRemoteReader() override = default; + + Status set_read_pattern(CacheBlockReadPattern pattern, const CacheBlockPrefetchPolicy& policy); + + void async_touch_initial_window(const IOContext* io_ctx = nullptr); + + void clear_read_pattern(); + + bool has_read_pattern() const; + +protected: + Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, + const IOContext* io_ctx) override; + +private: + void _prefetch(size_t current_file_offset, const IOContext* io_ctx); + void _async_touch_ranges(std::vector ranges, const IOContext* io_ctx); + + mutable std::mutex _pattern_mutex; + std::optional _prefetch_cursor; +}; + +} // namespace doris::io diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp index ee5f4ceadbd79d..a82eec158f6770 100644 --- a/be/src/io/cache/cached_remote_file_reader.cpp +++ b/be/src/io/cache/cached_remote_file_reader.cpp @@ -633,7 +633,8 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats, g_skip_cache_sum << read_stats.skip_cache; } -void CachedRemoteFileReader::prefetch_range(size_t offset, size_t size, const IOContext* io_ctx) { +void CachedRemoteFileReader::async_touch_local_cache(size_t offset, size_t size, + const IOContext* io_ctx) { if (offset >= this->size() || size == 0) { return; } @@ -654,9 +655,10 @@ void CachedRemoteFileReader::prefetch_range(size_t offset, size_t size, const IO dryrun_ctx.file_cache_stats = nullptr; dryrun_ctx.file_reader_stats = nullptr; - LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) - << fmt::format("[verbose] Submitting prefetch task for offset={} size={}, file={}", - offset, size, path().filename().native()); + LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( + "[verbose] Submitting local cache touch task for offset={} size={}, " + "file={}", + offset, size, path().filename().native()); std::weak_ptr weak_this = shared_from_this(); auto st = pool->submit_func([weak_this, offset, size, dryrun_ctx]() { auto self = weak_this.lock(); @@ -666,14 +668,15 @@ void CachedRemoteFileReader::prefetch_range(size_t offset, size_t size, const IO size_t bytes_read; Slice dummy_buffer((char*)nullptr, size); (void)self->read_at_impl(offset, dummy_buffer, &bytes_read, &dryrun_ctx); - LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) - << fmt::format("[verbose] Prefetch task completed for offset={} size={}, file={}", - offset, size, self->path().filename().native()); + LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( + "[verbose] Local cache touch task completed for offset={} size={}, " + "file={}", + offset, size, self->path().filename().native()); }); if (!st.ok()) { - VLOG_DEBUG << "Failed to submit prefetch task for offset=" << offset << " size=" << size - << " error=" << st.to_string(); + VLOG_DEBUG << "Failed to submit local cache touch task for offset=" << offset + << " size=" << size << " error=" << st.to_string(); } } diff --git a/be/src/io/cache/cached_remote_file_reader.h b/be/src/io/cache/cached_remote_file_reader.h index 3f2e1ceb2e1395..c4b9f86c80373e 100644 --- a/be/src/io/cache/cached_remote_file_reader.h +++ b/be/src/io/cache/cached_remote_file_reader.h @@ -37,8 +37,8 @@ namespace doris::io { struct IOContext; struct FileCacheStatistics; -class CachedRemoteFileReader final : public FileReader, - public std::enable_shared_from_this { +class CachedRemoteFileReader : public FileReader, + public std::enable_shared_from_this { public: CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const FileReaderOptions& opts); @@ -58,17 +58,17 @@ class CachedRemoteFileReader final : public FileReader, int64_t mtime() const override { return _remote_file_reader->mtime(); } - // Asynchronously prefetch a range of file cache blocks. - // This method triggers read file cache in dryrun mode to warm up the cache - // without actually reading the data into user buffers. + // Asynchronously touch a range into the local file cache. + // This method schedules a dry-run read, which downloads missing bytes into + // local file cache blocks without copying data into the caller's buffer. // // Parameters: // offset: Starting offset in the file - // size: Number of bytes to prefetch + // size: Number of bytes to touch into local cache // io_ctx: IO context (can be nullptr, will create a dryrun context internally) // // Note: This is a best-effort operation. Errors are logged but not returned. - void prefetch_range(size_t offset, size_t size, const IOContext* io_ctx = nullptr); + void async_touch_local_cache(size_t offset, size_t size, const IOContext* io_ctx = nullptr); protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, diff --git a/be/src/io/fs/file_reader.cpp b/be/src/io/fs/file_reader.cpp index 86596fd88f7020..b9f13cd24f7086 100644 --- a/be/src/io/fs/file_reader.cpp +++ b/be/src/io/fs/file_reader.cpp @@ -20,6 +20,7 @@ #include #include +#include "io/cache/cache_block_aware_prefetch_remote_reader.h" #include "io/cache/cached_remote_file_reader.h" #include "io/fs/file_system.h" #include "util/async_io.h" @@ -44,6 +45,10 @@ Result create_cached_file_reader(FileReaderSPtr raw_reader, case io::FileCachePolicy::NO_CACHE: return raw_reader; case FileCachePolicy::FILE_BLOCK_CACHE: + if (opts.enable_cache_block_prefetch) { + return std::make_shared(std::move(raw_reader), + opts); + } return std::make_shared(std::move(raw_reader), opts); default: return ResultError(Status::InternalError("Unknown cache type: {}", opts.cache_type)); diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h index 3df912cbad4af9..b860411f4811d2 100644 --- a/be/src/io/fs/file_reader.h +++ b/be/src/io/fs/file_reader.h @@ -59,6 +59,9 @@ struct FileReaderOptions { int64_t mtime = 0; // Used to query the location of the file cache int64_t tablet_id = -1; + // If true, FILE_BLOCK_CACHE readers are created as CacheBlockAwarePrefetchRemoteReader. + // Callers can install one file access pattern and let read_at() prefetch file cache blocks. + bool enable_cache_block_prefetch = false; static const FileReaderOptions DEFAULT; }; diff --git a/be/src/storage/index/ordinal_page_index.h b/be/src/storage/index/ordinal_page_index.h index f17d80c20ed120..0ba84ae8ed33b1 100644 --- a/be/src/storage/index/ordinal_page_index.h +++ b/be/src/storage/index/ordinal_page_index.h @@ -99,7 +99,7 @@ class OrdinalIndexReader : public MetadataAdder { private: friend class OrdinalPageIndexIterator; - friend class SegmentPrefetcher; + friend class SegmentFileAccessRangeBuilder; io::FileReaderSPtr _file_reader; DorisCallOnce _load_once; diff --git a/be/src/storage/olap_common.h b/be/src/storage/olap_common.h index 2c31e92b115ed3..ebdceb494d4d92 100644 --- a/be/src/storage/olap_common.h +++ b/be/src/storage/olap_common.h @@ -438,7 +438,7 @@ struct OlapReaderStatistics { int64_t segment_iterator_init_timer_ns = 0; int64_t segment_iterator_init_return_column_iterators_timer_ns = 0; int64_t segment_iterator_init_index_iterators_timer_ns = 0; - int64_t segment_iterator_init_segment_prefetchers_timer_ns = 0; + int64_t segment_iterator_init_cache_block_prefetch_timer_ns = 0; int64_t segment_create_column_readers_timer_ns = 0; int64_t segment_load_index_timer_ns = 0; diff --git a/be/src/storage/rowset/beta_rowset.cpp b/be/src/storage/rowset/beta_rowset.cpp index 70950dfe065634..dab3a24c8be79d 100644 --- a/be/src/storage/rowset/beta_rowset.cpp +++ b/be/src/storage/rowset/beta_rowset.cpp @@ -273,6 +273,9 @@ Status BetaRowset::load_segment(int64_t seg_id, OlapReaderStatistics* stats, .cache_base_path = "", .file_size = _rowset_meta->segment_file_size(static_cast(seg_id)), .tablet_id = _rowset_meta->tablet_id(), + .enable_cache_block_prefetch = config::is_cloud_mode() && config::enable_file_cache && + (config::enable_query_segment_file_cache_prefetch || + config::enable_compaction_segment_file_cache_prefetch), }; auto s = segment_v2::Segment::open( @@ -633,6 +636,10 @@ Status BetaRowset::check_current_rowset_segment() { .cache_base_path {}, .file_size = _rowset_meta->segment_file_size(seg_id), .tablet_id = _rowset_meta->tablet_id(), + .enable_cache_block_prefetch = + config::is_cloud_mode() && config::enable_file_cache && + (config::enable_query_segment_file_cache_prefetch || + config::enable_compaction_segment_file_cache_prefetch), }; auto s = segment_v2::Segment::open(fs, seg_path, _rowset_meta->tablet_id(), seg_id, diff --git a/be/src/storage/segment/column_reader.cpp b/be/src/storage/segment/column_reader.cpp index 20e20879d087b8..9ce4f249db9cb1 100644 --- a/be/src/storage/segment/column_reader.cpp +++ b/be/src/storage/segment/column_reader.cpp @@ -70,7 +70,7 @@ #include "storage/segment/page_pointer.h" // for PagePointer #include "storage/segment/row_ranges.h" #include "storage/segment/segment.h" -#include "storage/segment/segment_prefetcher.h" +#include "storage/segment/segment_file_access_range_builder.h" #include "storage/segment/variant/variant_column_reader.h" #include "storage/tablet/tablet_schema.h" #include "storage/types.h" // for TypeInfo @@ -278,7 +278,13 @@ ColumnReader::ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& _opts(opts), _num_rows(num_rows), _file_reader(std::move(file_reader)), + _file_reader_factory(opts.file_reader_factory), _dict_encoding_type(UNKNOWN_DICT_ENCODING) { + // The factory is stored separately from _opts because _opts is copied into + // page-read options and metadata helpers. Keeping the factory out of _opts + // avoids retaining a ColumnReaderCache-capturing lambda in code paths that + // only need immutable column metadata. + _opts.file_reader_factory = nullptr; _meta_length = meta.length(); _meta_type = (FieldType)meta.type(); if (_meta_type == FieldType::OLAP_FIELD_TYPE_ARRAY) { @@ -290,6 +296,19 @@ ColumnReader::ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& _meta_compression = meta.compression(); } +Result ColumnReader::_new_data_file_reader() const { + // This is intentionally called by FileColumnIterator::init() instead of by + // ColumnReader::create(). A cached ColumnReader can be shared by many scan + // iterators, while every FileColumnIterator needs its own data reader when + // cache-block prefetch is enabled. Returning the shared reader in the + // fallback path keeps the old behavior for local files and for scans that do + // not request cache-aware prefetch. + if (_file_reader_factory) { + return _file_reader_factory(); + } + return _file_reader; +} + ColumnReader::~ColumnReader() = default; int64_t ColumnReader::get_metadata_size() const { @@ -937,27 +956,30 @@ Status MapFileColumnIterator::seek_to_ordinal(ordinal_t ord) { return Status::OK(); } -Status MapFileColumnIterator::init_prefetcher(const SegmentPrefetchParams& params) { - RETURN_IF_ERROR(_offsets_iterator->init_prefetcher(params)); +Status MapFileColumnIterator::init_cache_block_prefetch( + const SegmentCacheBlockPrefetchParams& params) { + RETURN_IF_ERROR(_offsets_iterator->init_cache_block_prefetch(params)); if (_map_reader->is_nullable()) { - RETURN_IF_ERROR(_null_iterator->init_prefetcher(params)); + RETURN_IF_ERROR(_null_iterator->init_cache_block_prefetch(params)); } - RETURN_IF_ERROR(_key_iterator->init_prefetcher(params)); - RETURN_IF_ERROR(_val_iterator->init_prefetcher(params)); + RETURN_IF_ERROR(_key_iterator->init_cache_block_prefetch(params)); + RETURN_IF_ERROR(_val_iterator->init_cache_block_prefetch(params)); return Status::OK(); } -void MapFileColumnIterator::collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) { - _offsets_iterator->collect_prefetchers(prefetchers, init_method); +void MapFileColumnIterator::collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) { + _offsets_iterator->collect_cache_block_prefetch_iterators(iterators, init_method); if (_map_reader->is_nullable()) { - _null_iterator->collect_prefetchers(prefetchers, init_method); + _null_iterator->collect_cache_block_prefetch_iterators(iterators, init_method); } // the actual data pages to read of key/value column depends on the read result of offset column, // so we can't init prefetch blocks according to rowids, just prefetch all data blocks here. - _key_iterator->collect_prefetchers(prefetchers, PrefetcherInitMethod::ALL_DATA_BLOCKS); - _val_iterator->collect_prefetchers(prefetchers, PrefetcherInitMethod::ALL_DATA_BLOCKS); + _key_iterator->collect_cache_block_prefetch_iterators( + iterators, FileAccessRangeBuildMethod::ALL_DATA_PAGES); + _val_iterator->collect_cache_block_prefetch_iterators( + iterators, FileAccessRangeBuildMethod::ALL_DATA_PAGES); } Status MapFileColumnIterator::next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) { @@ -1462,24 +1484,25 @@ Status StructFileColumnIterator::seek_to_ordinal(ordinal_t ord) { return Status::OK(); } -Status StructFileColumnIterator::init_prefetcher(const SegmentPrefetchParams& params) { +Status StructFileColumnIterator::init_cache_block_prefetch( + const SegmentCacheBlockPrefetchParams& params) { for (auto& column_iterator : _sub_column_iterators) { - RETURN_IF_ERROR(column_iterator->init_prefetcher(params)); + RETURN_IF_ERROR(column_iterator->init_cache_block_prefetch(params)); } if (_struct_reader->is_nullable()) { - RETURN_IF_ERROR(_null_iterator->init_prefetcher(params)); + RETURN_IF_ERROR(_null_iterator->init_cache_block_prefetch(params)); } return Status::OK(); } -void StructFileColumnIterator::collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) { +void StructFileColumnIterator::collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) { for (auto& column_iterator : _sub_column_iterators) { - column_iterator->collect_prefetchers(prefetchers, init_method); + column_iterator->collect_cache_block_prefetch_iterators(iterators, init_method); } if (_struct_reader->is_nullable()) { - _null_iterator->collect_prefetchers(prefetchers, init_method); + _null_iterator->collect_cache_block_prefetch_iterators(iterators, init_method); } } @@ -1638,14 +1661,15 @@ Status OffsetFileColumnIterator::_peek_one_offset(ordinal_t* offset) { return Status::OK(); } -Status OffsetFileColumnIterator::init_prefetcher(const SegmentPrefetchParams& params) { - return _offset_iterator->init_prefetcher(params); +Status OffsetFileColumnIterator::init_cache_block_prefetch( + const SegmentCacheBlockPrefetchParams& params) { + return _offset_iterator->init_cache_block_prefetch(params); } -void OffsetFileColumnIterator::collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) { - _offset_iterator->collect_prefetchers(prefetchers, init_method); +void OffsetFileColumnIterator::collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) { + _offset_iterator->collect_cache_block_prefetch_iterators(iterators, init_method); } /** @@ -1819,24 +1843,26 @@ Status ArrayFileColumnIterator::next_batch(size_t* n, MutableColumnPtr& dst, boo return Status::OK(); } -Status ArrayFileColumnIterator::init_prefetcher(const SegmentPrefetchParams& params) { - RETURN_IF_ERROR(_offset_iterator->init_prefetcher(params)); - RETURN_IF_ERROR(_item_iterator->init_prefetcher(params)); +Status ArrayFileColumnIterator::init_cache_block_prefetch( + const SegmentCacheBlockPrefetchParams& params) { + RETURN_IF_ERROR(_offset_iterator->init_cache_block_prefetch(params)); + RETURN_IF_ERROR(_item_iterator->init_cache_block_prefetch(params)); if (_array_reader->is_nullable()) { - RETURN_IF_ERROR(_null_iterator->init_prefetcher(params)); + RETURN_IF_ERROR(_null_iterator->init_cache_block_prefetch(params)); } return Status::OK(); } -void ArrayFileColumnIterator::collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) { - _offset_iterator->collect_prefetchers(prefetchers, init_method); +void ArrayFileColumnIterator::collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) { + _offset_iterator->collect_cache_block_prefetch_iterators(iterators, init_method); // the actual data pages to read of item column depends on the read result of offset column, // so we can't init prefetch blocks according to rowids, just prefetch all data blocks here. - _item_iterator->collect_prefetchers(prefetchers, PrefetcherInitMethod::ALL_DATA_BLOCKS); + _item_iterator->collect_cache_block_prefetch_iterators( + iterators, FileAccessRangeBuildMethod::ALL_DATA_PAGES); if (_array_reader->is_nullable()) { - _null_iterator->collect_prefetchers(prefetchers, init_method); + _null_iterator->collect_cache_block_prefetch_iterators(iterators, init_method); } } @@ -2014,6 +2040,15 @@ Status FileColumnIterator::init(const ColumnIteratorOptions& opts) { } _opts = opts; + // Install the physical data reader for this iterator before any page read. + // When cache-block prefetch is enabled, this call opens a dedicated + // CacheBlockAwarePrefetchRemoteReader for this iterator. All subsequent + // PageIO reads use _opts.file_reader, so data-page and dict-page read_at() + // calls advance only this iterator's single prefetch pattern. Other columns + // or another scan over the same column have their own FileColumnIterator and + // therefore their own reader/pattern state. + _data_file_reader = DORIS_TRY(_reader->_new_data_file_reader()); + _opts.file_reader = _data_file_reader.get(); if (!_opts.use_page_cache) { _reader->disable_index_meta_cache(); } @@ -2043,28 +2078,12 @@ Status FileColumnIterator::init(const ColumnIteratorOptions& opts) { FileColumnIterator::~FileColumnIterator() = default; -void FileColumnIterator::_trigger_prefetch_if_eligible(ordinal_t ord) { - std::vector ranges; - if (_prefetcher->need_prefetch(cast_set(ord), &ranges)) { - for (const auto& range : ranges) { - _cached_remote_file_reader->prefetch_range(range.offset, range.size, &_opts.io_ctx); - } - } -} - Status FileColumnIterator::seek_to_ordinal(ordinal_t ord) { if (_reading_flag == ReadingFlag::SKIP_READING) { DLOG(INFO) << "File column iterator column " << _column_name << " skip reading."; return Status::OK(); } - LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( - "[verbose] FileColumnIterator::seek_to_ordinal seek to ordinal {}, enable_prefetch={}", - ord, _enable_prefetch); - if (_enable_prefetch) { - _trigger_prefetch_if_eligible(ord); - } - // if current page contains this row, we don't need to seek if (!_page || !_page.contains(ord) || !_page_iter.valid()) { RETURN_IF_ERROR(_reader->seek_at_or_before(ord, &_page_iter, _opts)); @@ -2386,6 +2405,12 @@ Status FileColumnIterator::_load_next_page(bool* eos) { } Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter) { + LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( + "[verbose] FileColumnIterator::_read_data_page page_offset={}, has_prefetch_pattern={}", + iter.page().offset, + _cache_block_prefetch_reader != nullptr && + _cache_block_prefetch_reader->has_read_pattern()); + PageHandle handle; Slice page_body; PageFooterPB footer; @@ -2483,26 +2508,59 @@ Status FileColumnIterator::get_row_ranges_by_dict(const AndBlockColumnPredicate* return Status::OK(); } -Status FileColumnIterator::init_prefetcher(const SegmentPrefetchParams& params) { - if (_cached_remote_file_reader = - std::dynamic_pointer_cast(_reader->_file_reader); - !_cached_remote_file_reader) { +Status FileColumnIterator::init_cache_block_prefetch( + const SegmentCacheBlockPrefetchParams& params) { + // _data_file_reader is iterator-local. If it is cache-block aware, the + // pattern installed below belongs only to this iterator. That is why + // FileColumnIterator no longer has to keep a pattern handle or call prefetch + // before every page read: CacheBlockAwarePrefetchRemoteReader::read_at() + // observes the actual file offset used by PageIO and advances the single + // pattern itself. + _cache_block_prefetch_reader = + std::dynamic_pointer_cast(_data_file_reader); + if (!_cache_block_prefetch_reader) { return Status::OK(); } - _enable_prefetch = true; - _prefetcher = std::make_unique(params.config); - RETURN_IF_ERROR(_prefetcher->init(_reader, params.read_options)); + + OrdinalIndexReader* ordinal_index = nullptr; + RETURN_IF_ERROR(_reader->get_ordinal_index_reader(ordinal_index, params.read_options.stats)); + _cache_block_prefetch_policy = params.policy; + _cache_block_read_direction = params.read_options.read_orderby_key_reverse + ? io::CacheBlockReadDirection::BACKWARD + : io::CacheBlockReadDirection::FORWARD; + _access_range_builder = std::make_unique( + ordinal_index, _cache_block_read_direction); return Status::OK(); } -void FileColumnIterator::collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) { - if (_prefetcher) { - prefetchers[init_method].emplace_back(_prefetcher.get()); +void FileColumnIterator::collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) { + if (_access_range_builder) { + iterators[init_method].emplace_back(this); } } +Status FileColumnIterator::install_cache_block_prefetch_pattern( + std::vector ranges) { + DCHECK(_cache_block_prefetch_reader != nullptr); + // SegmentIterator builds these ranges once after index pruning. The ranges + // describe the file offsets this physical iterator will read later. Because + // the cache-aware reader is not shared, replacing its one pattern here cannot + // disturb any sibling column iterator or another scan iterator. + io::CacheBlockReadPattern pattern { + .direction = _cache_block_read_direction, + .ranges = std::move(ranges), + }; + return _cache_block_prefetch_reader->set_read_pattern(std::move(pattern), + _cache_block_prefetch_policy); +} + +void FileColumnIterator::async_touch_cache_block_prefetch_initial_window() { + DCHECK(_cache_block_prefetch_reader != nullptr); + _cache_block_prefetch_reader->async_touch_initial_window(&_opts.io_ctx); +} + Status DefaultValueColumnIterator::init(const ColumnIteratorOptions& opts) { _opts = opts; // be consistent with segment v1 diff --git a/be/src/storage/segment/column_reader.h b/be/src/storage/segment/column_reader.h index 281b6b429944f4..3c07f137ab110a 100644 --- a/be/src/storage/segment/column_reader.h +++ b/be/src/storage/segment/column_reader.h @@ -23,7 +23,9 @@ #include // for size_t #include // for uint32_t -#include // for unique_ptr +#include +#include +#include // for unique_ptr #include #include #include @@ -33,7 +35,6 @@ #include "common/status.h" // for Status #include "core/column/column_array.h" // ColumnArray #include "core/data_type/data_type.h" -#include "io/cache/cached_remote_file_reader.h" #include "io/fs/file_reader_writer_fwd.h" #include "io/io_common.h" #include "storage/index/index_reader.h" @@ -45,7 +46,7 @@ #include "storage/segment/page_handle.h" // for PageHandle #include "storage/segment/page_pointer.h" #include "storage/segment/parsed_page.h" // for ParsedPage -#include "storage/segment/segment_prefetcher.h" +#include "storage/segment/segment_file_access_range_builder.h" #include "storage/segment/stream_reader.h" #include "storage/tablet/tablet_schema.h" #include "storage/types.h" @@ -90,6 +91,20 @@ struct ColumnReaderOptions { int be_exec_version = -1; TabletSchemaSPtr tablet_schema = nullptr; + + // Optional factory for creating the data-page FileReader used by one + // physical FileColumnIterator. + // + // ColumnReader instances are cached at segment scope and may be reused by + // different SegmentIterators or by different physical subcolumns. They must + // not own mutable scan progress for cache-block prefetch. When + // enable_cache_block_prefetch is false, the factory returns the shared + // segment reader to keep the historical resource-sharing behavior. When it + // is true, the factory opens a fresh CacheBlockAwarePrefetchRemoteReader for + // each FileColumnIterator::init(), so that reader owns exactly one monotonic + // read pattern and read_at() can advance it without any outer manual trigger + // or cross-column/cross-scan interference. + std::function()> file_reader_factory; }; struct ColumnIteratorOptions { @@ -237,11 +252,11 @@ class ColumnReader : public MetadataAdder, private: friend class VariantColumnReader; friend class FileColumnIterator; - friend class SegmentPrefetcher; ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows, io::FileReaderSPtr file_reader); Status init(const ColumnMetaPB* meta); + Result _new_data_file_reader() const; [[nodiscard]] Status _load_zone_map_index(bool use_page_cache, bool kept_in_memory, const ColumnIteratorOptions& iter_opts); @@ -278,6 +293,7 @@ class ColumnReader : public MetadataAdder, uint64_t _num_rows; io::FileReaderSPtr _file_reader; + std::function()> _file_reader_factory; DictEncodingType _dict_encoding_type; @@ -402,11 +418,21 @@ class ColumnIterator { virtual void remove_pruned_sub_iterators() {}; - virtual Status init_prefetcher(const SegmentPrefetchParams& params) { return Status::OK(); } + virtual Status init_cache_block_prefetch(const SegmentCacheBlockPrefetchParams& params) { + return Status::OK(); + } + + virtual void collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) {} - virtual void collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) {} + virtual SegmentFileAccessRangeBuilder* cache_block_prefetch_range_builder() { return nullptr; } + + virtual Status install_cache_block_prefetch_pattern(std::vector ranges) { + return Status::OK(); + } + + virtual void async_touch_cache_block_prefetch_initial_window() {} static constexpr const char* ACCESS_OFFSET = "OFFSET"; static constexpr const char* ACCESS_ALL = "*"; @@ -479,10 +505,15 @@ class FileColumnIterator : public ColumnIterator { bool is_all_dict_encoding() const override { return _is_all_dict_encoding; } - Status init_prefetcher(const SegmentPrefetchParams& params) override; - void collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) override; + Status init_cache_block_prefetch(const SegmentCacheBlockPrefetchParams& params) override; + void collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) override; + SegmentFileAccessRangeBuilder* cache_block_prefetch_range_builder() override { + return _access_range_builder.get(); + } + Status install_cache_block_prefetch_pattern(std::vector ranges) override; + void async_touch_cache_block_prefetch_initial_window() override; protected: // Exposed to derived iterators (e.g. StringFileColumnIterator) so they can @@ -494,7 +525,6 @@ class FileColumnIterator : public ColumnIterator { Status _load_next_page(bool* eos); Status _read_data_page(const OrdinalPageIndexIterator& iter); Status _read_dict_data(); - void _trigger_prefetch_if_eligible(ordinal_t ord); std::shared_ptr _reader = nullptr; @@ -522,9 +552,15 @@ class FileColumnIterator : public ColumnIterator { std::unique_ptr _dict_word_info; - bool _enable_prefetch {false}; - std::unique_ptr _prefetcher; - std::shared_ptr _cached_remote_file_reader {nullptr}; + std::unique_ptr _access_range_builder; + // Owned by this iterator and used for all data/dict page reads through + // _opts.file_reader. With cache-block prefetch enabled this is a fresh + // CacheBlockAwarePrefetchRemoteReader, not the segment-level shared reader. + // Therefore the reader's single read pattern is iterator-local. + io::FileReaderSPtr _data_file_reader; + std::shared_ptr _cache_block_prefetch_reader; + io::CacheBlockPrefetchPolicy _cache_block_prefetch_policy; + io::CacheBlockReadDirection _cache_block_read_direction = io::CacheBlockReadDirection::FORWARD; }; class EmptyFileColumnIterator final : public ColumnIterator { @@ -583,10 +619,10 @@ class OffsetFileColumnIterator final : public ColumnIterator { return _offset_iterator->read_by_rowids(rowids, count, dst); } - Status init_prefetcher(const SegmentPrefetchParams& params) override; - void collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) override; + Status init_cache_block_prefetch(const SegmentCacheBlockPrefetchParams& params) override; + void collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) override; private: std::unique_ptr _offset_iterator; @@ -620,10 +656,10 @@ class MapFileColumnIterator final : public ColumnIterator { } return _offsets_iterator->get_current_ordinal(); } - Status init_prefetcher(const SegmentPrefetchParams& params) override; - void collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) override; + Status init_cache_block_prefetch(const SegmentCacheBlockPrefetchParams& params) override; + void collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) override; Status set_access_paths(const TColumnAccessPaths& all_access_paths, const TColumnAccessPaths& predicate_access_paths) override; @@ -676,10 +712,10 @@ class StructFileColumnIterator final : public ColumnIterator { void remove_pruned_sub_iterators() override; - Status init_prefetcher(const SegmentPrefetchParams& params) override; - void collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) override; + Status init_cache_block_prefetch(const SegmentCacheBlockPrefetchParams& params) override; + void collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) override; private: std::shared_ptr _struct_reader = nullptr; @@ -723,10 +759,10 @@ class ArrayFileColumnIterator final : public ColumnIterator { void remove_pruned_sub_iterators() override; - Status init_prefetcher(const SegmentPrefetchParams& params) override; - void collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) override; + Status init_cache_block_prefetch(const SegmentCacheBlockPrefetchParams& params) override; + void collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) override; private: std::shared_ptr _array_reader = nullptr; diff --git a/be/src/storage/segment/column_reader_cache.cpp b/be/src/storage/segment/column_reader_cache.cpp index 1c8ed7728ada81..ad5b90fc8993f7 100644 --- a/be/src/storage/segment/column_reader_cache.cpp +++ b/be/src/storage/segment/column_reader_cache.cpp @@ -17,6 +17,8 @@ #include "storage/segment/column_reader_cache.h" +#include + #include "storage/segment/column_meta_accessor.h" #include "storage/segment/segment.h" #include "storage/segment/variant/variant_column_reader.h" @@ -30,15 +32,59 @@ namespace doris::segment_v2 { ColumnReaderCache::ColumnReaderCache( ColumnMetaAccessor* accessor, TabletSchemaSPtr tablet_schema, - io::FileReaderSPtr file_reader, uint64_t num_rows, + io::FileReaderSPtr file_reader, io::FileSystemSPtr fs, io::Path path, + io::FileReaderOptions reader_options, uint64_t num_rows, std::function&, OlapReaderStatistics*)> get_footer_cb) : _accessor(accessor), _tablet_schema(std::move(tablet_schema)), - _file_reader(std::move(file_reader)), + _file_reader(file_reader), + _file_reader_factory( + [shared_reader = std::move(file_reader), fs = std::move(fs), + path = std::move(path), + reader_options = std::move(reader_options)]() -> Result { + // This lambda is copied into each cached ColumnReader and is invoked from + // FileColumnIterator::init(), not from ColumnReader construction. That + // placement is deliberate: + // + // * ColumnReader is shared metadata. It is cached by (column uid, path) and + // can outlive or be reused across many scan iterators. + // * FileColumnIterator is the physical scan stream that has a locally + // monotonic page-read sequence. It is the right owner for a + // CacheBlockAwarePrefetchRemoteReader's single mutable read pattern. + // * Keeping this as a factory preserves the old shared-reader behavior when + // prefetch is disabled, while letting every iterator get a fresh + // cache-aware reader when prefetch is enabled. + if (!reader_options.enable_cache_block_prefetch) { + return shared_reader; + } + + // Segment::_open records the fs/path/options used to open the segment data + // file. Reopening with the same options here lets FileSystem::open_file() + // build the same reader stack, except it is no longer shared by sibling + // column iterators. In cloud mode this means each iterator has an independent + // CacheBlockAwarePrefetchRemoteReader around the remote reader and can + // spend extra object-storage IOPS to warm file-cache blocks concurrently. + DORIS_CHECK(fs != nullptr); + io::FileReaderSPtr file_reader; + Status st = fs->open_file(path, &file_reader, &reader_options); + if (!st.ok()) { + return ResultError(st); + } + return file_reader; + }), _num_rows(num_rows), _get_footer_cb(std::move(get_footer_cb)) {} +ColumnReaderCache::ColumnReaderCache( + ColumnMetaAccessor* accessor, TabletSchemaSPtr tablet_schema, + io::FileReaderSPtr file_reader, uint64_t num_rows, + std::function&, OlapReaderStatistics*)> + get_footer_cb) + : ColumnReaderCache(accessor, std::move(tablet_schema), std::move(file_reader), nullptr, + io::Path(), io::FileReaderOptions(), num_rows, + std::move(get_footer_cb)) {} + ColumnReaderCache::~ColumnReaderCache() { g_segment_column_reader_cache_count << -_cache_map.size(); } @@ -118,7 +164,8 @@ Status ColumnReaderCache::get_column_reader(int32_t col_uid, ColumnReaderOptions opts {.kept_in_memory = _tablet_schema->is_in_memory(), .be_exec_version = _be_exec_version, - .tablet_schema = _tablet_schema}; + .tablet_schema = _tablet_schema, + .file_reader_factory = _file_reader_factory}; std::shared_ptr reader; if ((FieldType)meta.type() == FieldType::OLAP_FIELD_TYPE_VARIANT) { @@ -166,7 +213,8 @@ Status ColumnReaderCache::get_path_column_reader(int32_t col_uid, PathInData rel // Ensure variant root reader is available in cache. ColumnReaderOptions opts {.kept_in_memory = _tablet_schema->is_in_memory(), .be_exec_version = _be_exec_version, - .tablet_schema = _tablet_schema}; + .tablet_schema = _tablet_schema, + .file_reader_factory = _file_reader_factory}; std::shared_ptr variant_column_reader; RETURN_IF_ERROR(get_column_reader(col_uid, &variant_column_reader, stats)); @@ -193,4 +241,4 @@ Status ColumnReaderCache::get_path_column_reader(int32_t col_uid, PathInData rel return Status::OK(); } -} // namespace doris::segment_v2 \ No newline at end of file +} // namespace doris::segment_v2 diff --git a/be/src/storage/segment/column_reader_cache.h b/be/src/storage/segment/column_reader_cache.h index bc672c7b408d61..21573f548bc625 100644 --- a/be/src/storage/segment/column_reader_cache.h +++ b/be/src/storage/segment/column_reader_cache.h @@ -16,8 +16,11 @@ // under the License. #pragma once +#include + #include "agent/be_exec_version_manager.h" #include "io/fs/file_reader.h" +#include "io/fs/file_system.h" #include "storage/segment/stream_reader.h" #include "storage/tablet/tablet_fwd.h" #include "util/json/path_in_data.h" @@ -45,6 +48,12 @@ class ColumnReaderCache { // Main constructor used in production: cache is bound to a specific segment's // ColumnMetaAccessor, TabletSchema, file reader and row count, plus a footer // getter callback (Segment::_get_segment_footer). + ColumnReaderCache( + ColumnMetaAccessor* accessor, TabletSchemaSPtr tablet_schema, + io::FileReaderSPtr file_reader, io::FileSystemSPtr fs, io::Path path, + io::FileReaderOptions reader_options, uint64_t num_rows, + std::function&, OlapReaderStatistics*)> + get_footer_cb); ColumnReaderCache( ColumnMetaAccessor* accessor, TabletSchemaSPtr tablet_schema, io::FileReaderSPtr file_reader, uint64_t num_rows, @@ -76,6 +85,7 @@ class ColumnReaderCache { // Insert an already-created reader directly into cache void _insert_direct(const ColumnReaderCacheKey& key, const std::shared_ptr& column_reader); + // keep _lru_list and _cache_map thread safe std::mutex _cache_mutex; // Doubly-linked list to maintain LRU order @@ -89,9 +99,10 @@ class ColumnReaderCache { // Segment-level context needed to construct ColumnReader. TabletSchemaSPtr _tablet_schema; io::FileReaderSPtr _file_reader; + std::function()> _file_reader_factory; uint64_t _num_rows = 0; // Callback to get footer, usually bound to Segment::_get_segment_footer. std::function&, OlapReaderStatistics*)> _get_footer_cb; }; -} // namespace doris::segment_v2 \ No newline at end of file +} // namespace doris::segment_v2 diff --git a/be/src/storage/segment/segment.cpp b/be/src/storage/segment/segment.cpp index 103df3f482fd19..4595606276102d 100644 --- a/be/src/storage/segment/segment.cpp +++ b/be/src/storage/segment/segment.cpp @@ -122,6 +122,10 @@ Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t s if (st) { segment->_fs = fs; segment->_file_reader = std::move(file_reader); + // Keep the exact reader options that produced the opened segment reader. + // ColumnReaderCache uses them later to reopen equivalent per-iterator + // data readers when cache-block prefetch is enabled. + segment->_reader_options = reader_options; st = segment->_open(stats); } @@ -141,6 +145,10 @@ Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t s if (st) { segment->_fs = fs; segment->_file_reader = std::move(file_reader); + // See the first open path above. The retry still uses the original + // cache-enabled options, so per-iterator readers should use the same + // stack after the corrupt cache blocks are removed. + segment->_reader_options = reader_options; st = segment->_open(stats); } TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption1", &st); @@ -157,6 +165,10 @@ Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t s RETURN_IF_ERROR(fs->open_file(path, &file_reader, &opt)); segment->_fs = fs; segment->_file_reader = std::move(file_reader); + // The segment finally opened without file cache. Preserve that + // choice so later ColumnReaderCache factories do not recreate + // cache-aware per-iterator readers for this segment. + segment->_reader_options = opt; st = segment->_open(stats); if (!st.ok()) { // Tier 3: Remote source itself is corrupt. @@ -640,8 +652,14 @@ Status Segment::_create_column_meta(const SegmentFooterPB& footer) { } } + // ColumnReaderCache keeps ColumnReaders as shared metadata, but it also gets + // fs/path/options so it can provide a FileReader factory to FileColumnIterator. + // That factory returns the shared _file_reader when cache-block prefetch is + // off, and opens a fresh cache-aware reader per physical iterator when it is + // on. _column_reader_cache = std::make_unique( - _column_meta_accessor.get(), _tablet_schema, _file_reader, _num_rows, + _column_meta_accessor.get(), _tablet_schema, _file_reader, _fs, _file_reader->path(), + _reader_options, _num_rows, [this](std::shared_ptr& footer_pb, OlapReaderStatistics* stats) { return _get_segment_footer(footer_pb, stats); }); diff --git a/be/src/storage/segment/segment.h b/be/src/storage/segment/segment.h index 8806505d1e14bd..07f4dd91ea9531 100644 --- a/be/src/storage/segment/segment.h +++ b/be/src/storage/segment/segment.h @@ -260,6 +260,7 @@ class Segment : public std::enable_shared_from_this, public MetadataAdd io::FileSystemSPtr _fs; io::FileReaderSPtr _file_reader; + io::FileReaderOptions _reader_options; uint32_t _segment_id; uint32_t _num_rows; AtomicStatus _healthy_status; diff --git a/be/src/storage/segment/segment_file_access_range_builder.cpp b/be/src/storage/segment/segment_file_access_range_builder.cpp new file mode 100644 index 00000000000000..8b509474cf57b8 --- /dev/null +++ b/be/src/storage/segment/segment_file_access_range_builder.cpp @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "storage/segment/segment_file_access_range_builder.h" + +#include +#include +#include + +#include "common/config.h" +#include "common/logging.h" +#include "storage/index/ordinal_page_index.h" + +namespace doris::segment_v2 { + +SegmentFileAccessRangeBuilder::SegmentFileAccessRangeBuilder(OrdinalIndexReader* ordinal_index, + io::CacheBlockReadDirection direction) + : _ordinal_index(ordinal_index), _direction(direction) { + DCHECK(_ordinal_index != nullptr); +} + +void SegmentFileAccessRangeBuilder::reset() { + _access_ranges.clear(); + _next_page_hint = 0; + _pending_page_index.reset(); +} + +void SegmentFileAccessRangeBuilder::add_ascending_rowids(std::span rowids) { + DCHECK(_ordinal_index != nullptr); + if (rowids.empty()) { + return; + } + DCHECK(std::ranges::is_sorted(rowids)); + const auto& ordinals = _ordinal_index->_ordinals; + const int num_pages = _ordinal_index->_num_pages; + DORIS_CHECK(num_pages > 0); + for (const rowid_t rowid : rowids) { + while (_next_page_hint < num_pages - 1 && ordinals[_next_page_hint + 1] <= rowid) { + _next_page_hint++; + } + + if (!_pending_page_index.has_value() || _next_page_hint != *_pending_page_index) { + if (_pending_page_index.has_value()) { + _append_page_access_range(*_pending_page_index); + } + _pending_page_index = _next_page_hint; + } + } +} + +std::vector SegmentFileAccessRangeBuilder::finish_by_rowids() { + DCHECK(_ordinal_index != nullptr); + if (_pending_page_index.has_value()) { + _append_page_access_range(*_pending_page_index); + } + _reverse_if_backward(); + auto output = std::move(_access_ranges); + reset(); + return output; +} + +std::vector SegmentFileAccessRangeBuilder::build_all_data_page_ranges() { + DCHECK(_ordinal_index != nullptr); + reset(); + const int num_pages = _ordinal_index->_num_pages; + + for (int page_index = 0; page_index < num_pages; ++page_index) { + _append_page_access_range(page_index); + } + + _reverse_if_backward(); + auto output = std::move(_access_ranges); + reset(); + return output; +} + +void SegmentFileAccessRangeBuilder::add_rowids_from_bitmap( + const roaring::Roaring& row_bitmap, + std::span builders) { + for (auto* builder : builders) { + builder->reset(); + } + + int batch_size = config::segment_file_cache_consume_rowids_batch_size; + DORIS_CHECK(batch_size > 0); + std::vector rowids(batch_size); + roaring::api::roaring_uint32_iterator_t iter; + roaring::api::roaring_init_iterator(&row_bitmap.roaring, &iter); + uint32_t num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), batch_size); + + for (; num > 0; + num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), batch_size)) { + for (auto* builder : builders) { + builder->add_ascending_rowids(std::span(rowids.data(), num)); + } + } +} + +void SegmentFileAccessRangeBuilder::_append_page_access_range(int page_index) { + const auto& page = _ordinal_index->_pages[page_index]; + _access_ranges.push_back(io::FileAccessRange { + .offset = page.offset, + .size = page.size, + }); +} + +void SegmentFileAccessRangeBuilder::_reverse_if_backward() { + if (!_is_forward() && !_access_ranges.empty()) { + std::ranges::reverse(_access_ranges); + } +} + +} // namespace doris::segment_v2 diff --git a/be/src/storage/segment/segment_file_access_range_builder.h b/be/src/storage/segment/segment_file_access_range_builder.h new file mode 100644 index 00000000000000..2ba695ef94ef98 --- /dev/null +++ b/be/src/storage/segment/segment_file_access_range_builder.h @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include "io/cache/cache_block_aware_prefetch_remote_reader.h" +#include "storage/segment/common.h" + +namespace doris { +class StorageReadOptions; +} + +namespace doris::segment_v2 { + +class OrdinalIndexReader; + +enum class FileAccessRangeBuildMethod : int { FROM_ROWIDS = 0, ALL_DATA_PAGES = 1 }; + +struct SegmentCacheBlockPrefetchParams { + io::CacheBlockPrefetchPolicy policy; + const StorageReadOptions& read_options; +}; + +// Builds file access ranges from segment rowids and an ordinal index. +// +// The segment layer knows rowids, while CacheBlockAwarePrefetchRemoteReader only +// understands file ranges. This helper owns the ordinal-index walk that bridges +// the two representations: +// - add_ascending_rowids()/finish_by_rowids() consumes selected rowids and emits +// each touched data page as a FileAccessRange. +// - build_all_data_page_ranges() emits every data page for full-segment readers +// such as compaction. +// +// The builder deliberately returns page file ranges, not file-cache block ids. +// A data page may be larger than a file-cache block or may straddle multiple +// blocks. CacheBlockAwarePrefetchRemoteReader expands each [offset, offset+size) +// range into every covered file-cache block and deduplicates them there. Prefetch +// progress is triggered by the file offset of the page being read, not by row +// ordinal, so this helper keeps rowid handling entirely inside the segment layer. +class SegmentFileAccessRangeBuilder { +public: + SegmentFileAccessRangeBuilder(OrdinalIndexReader* ordinal_index, + io::CacheBlockReadDirection direction); + + void reset(); + // Rowids must be ascending by segment ordinal. Reverse scans still feed + // ascending rowids from the bitmap and only reverse the produced file ranges + // when finish_by_rowids() is called. + void add_ascending_rowids(std::span rowids); + std::vector finish_by_rowids(); + std::vector build_all_data_page_ranges(); + + static void add_rowids_from_bitmap(const roaring::Roaring& row_bitmap, + std::span builders); + +private: + void _append_page_access_range(int page_index); + void _reverse_if_backward(); + bool _is_forward() const { return _direction == io::CacheBlockReadDirection::FORWARD; } + + OrdinalIndexReader* _ordinal_index = nullptr; + io::CacheBlockReadDirection _direction = io::CacheBlockReadDirection::FORWARD; + std::vector _access_ranges; + + int _next_page_hint = 0; + std::optional _pending_page_index; +}; + +} // namespace doris::segment_v2 diff --git a/be/src/storage/segment/segment_iterator.cpp b/be/src/storage/segment/segment_iterator.cpp index 47992c592da387..46b78d5389e6f4 100644 --- a/be/src/storage/segment/segment_iterator.cpp +++ b/be/src/storage/segment/segment_iterator.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -65,7 +66,6 @@ #include "exprs/virtual_slot_ref.h" #include "exprs/vliteral.h" #include "exprs/vslot_ref.h" -#include "io/cache/cached_remote_file_reader.h" #include "io/fs/file_reader.h" #include "io/io_common.h" #include "runtime/query_context.h" @@ -99,7 +99,7 @@ #include "storage/segment/condition_cache.h" #include "storage/segment/row_ranges.h" #include "storage/segment/segment.h" -#include "storage/segment/segment_prefetcher.h" +#include "storage/segment/segment_file_access_range_builder.h" #include "storage/segment/variant/variant_column_reader.h" #include "storage/segment/virtual_column_iterator.h" #include "storage/tablet/tablet_schema.h" @@ -589,13 +589,13 @@ Status SegmentIterator::_lazy_init(Block* block) { _lazy_inited = true; - _init_segment_prefetchers(); + _init_cache_block_prefetch(); return Status::OK(); } -void SegmentIterator::_init_segment_prefetchers() { - SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_segment_prefetchers_timer_ns); +void SegmentIterator::_init_cache_block_prefetch() { + SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_cache_block_prefetch_timer_ns); if (!config::is_cloud_mode()) { return; } @@ -606,12 +606,12 @@ void SegmentIterator::_init_segment_prefetchers() { [&](ReaderType t) { return _opts.io_ctx.reader_type == t; })) { return; } - // Initialize segment prefetcher for predicate and non-predicate columns bool is_query = (_opts.io_ctx.reader_type == ReaderType::READER_QUERY); bool enable_prefetch = is_query ? config::enable_query_segment_file_cache_prefetch : config::enable_compaction_segment_file_cache_prefetch; LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( - "[verbose] SegmentIterator _init_segment_prefetchers, is_query={}, enable_prefetch={}, " + "[verbose] SegmentIterator _init_cache_block_prefetch, is_query={}, " + "enable_prefetch={}, " "_row_bitmap.isEmpty()={}, row_bitmap.cardinality()={}, tablet={}, rowset={}, " "segment={}, predicate_column_ids={}, common_expr_column_ids={}", is_query, enable_prefetch, _row_bitmap.isEmpty(), _row_bitmap.cardinality(), @@ -625,27 +625,36 @@ void SegmentIterator::_init_segment_prefetchers() { "[verbose] SegmentIterator prefetch config: window_size={}", window_size); if (window_size > 0 && !_column_iterators.empty()) { // ensure init_iterators has been called - SegmentPrefetcherConfig prefetch_config(window_size, - config::file_cache_each_block_size); + // This runs after segment open, index pruning, and iterator initialization. At this + // point _row_bitmap describes the candidate rows left by segment-level indexes. For + // predicate columns, those candidates are definitely read to evaluate predicates; for + // non-predicate/common-expression columns under lazy materialization, the final rowids + // are produced batch by batch after predicate filtering. Each physical column iterator + // owns an independent CacheBlockAwarePrefetchRemoteReader, so the installed pattern is + // consumed automatically by read_at() using the current file offset. + io::CacheBlockPrefetchPolicy prefetch_policy { + .max_prefetch_blocks = cast_set(window_size), + .cache_block_size = cast_set(config::file_cache_each_block_size), + }; for (auto cid : _schema->column_ids()) { auto& column_iter = _column_iterators[cid]; if (column_iter == nullptr) { continue; } const auto* tablet_column = _schema->column(cid); - SegmentPrefetchParams params { - .config = prefetch_config, + SegmentCacheBlockPrefetchParams params { + .policy = prefetch_policy, .read_options = _opts, }; LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( - "[verbose] SegmentIterator init_segment_prefetchers, " + "[verbose] SegmentIterator init_cache_block_prefetch, " "tablet={}, rowset={}, segment={}, column_id={}, col_name={}, type={}", _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(), cid, tablet_column->name(), tablet_column->type()); - Status st = column_iter->init_prefetcher(params); + Status st = column_iter->init_cache_block_prefetch(params); if (!st.ok()) { LOG_IF(WARNING, config::enable_segment_prefetch_verbose_log) << fmt::format( - "[verbose] failed to init prefetcher for column_id={}, " + "[verbose] failed to init cache block prefetch for column_id={}, " "tablet={}, rowset={}, segment={}, error={}", cid, _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(), st.to_string()); @@ -653,22 +662,84 @@ void SegmentIterator::_init_segment_prefetchers() { } // for compaction, it's guaranteed that all rows are read, so we can prefetch all data blocks - PrefetcherInitMethod init_method = (is_query && _row_bitmap.cardinality() < num_rows()) - ? PrefetcherInitMethod::FROM_ROWIDS - : PrefetcherInitMethod::ALL_DATA_BLOCKS; - std::map> prefetchers; - for (const auto& column_iter : _column_iterators) { - if (column_iter != nullptr) { - column_iter->collect_prefetchers(prefetchers, init_method); + FileAccessRangeBuildMethod init_method = + (is_query && _row_bitmap.cardinality() < num_rows()) + ? FileAccessRangeBuildMethod::FROM_ROWIDS + : FileAccessRangeBuildMethod::ALL_DATA_PAGES; + std::map> prefetch_iterators; + // Different columns in the same segment have independently predictable and monotonic + // data-page sequences. With prefetch enabled they no longer share the same + // CacheBlockAwarePrefetchRemoteReader; installing one pattern per physical iterator + // is enough, and the IO layer advances that pattern from subsequent read_at() calls. + // + // Predicate columns are special: the row ranges installed here are exactly the + // candidates that must be read to evaluate predicates for this segment. After their + // patterns are installed, we can immediately touch the first prefetch window before + // PageIO issues the first foreground read. Non-predicate/common-expression columns + // are different under lazy materialization: their exact rowids are produced batch by + // batch after predicate evaluation, so they keep the normal read_at()-triggered path. + auto is_predicate_column = [&](ColumnId cid) { + return std::ranges::find(_predicate_column_ids, cid) != _predicate_column_ids.end(); + }; + std::unordered_set initial_touch_iterators; + for (auto cid : _schema->column_ids()) { + auto& column_iter = _column_iterators[cid]; + if (column_iter == nullptr) { + continue; + } + std::map> + column_prefetch_iterators; + column_iter->collect_cache_block_prefetch_iterators(column_prefetch_iterators, + init_method); + for (auto& [method, iterators] : column_prefetch_iterators) { + if (is_predicate_column(cid)) { + initial_touch_iterators.insert(iterators.begin(), iterators.end()); + } + prefetch_iterators[method].insert(prefetch_iterators[method].end(), + iterators.begin(), iterators.end()); } } - for (auto& [method, prefetcher_vec] : prefetchers) { - if (method == PrefetcherInitMethod::ALL_DATA_BLOCKS) { - for (auto* prefetcher : prefetcher_vec) { - prefetcher->build_all_data_blocks(); + + for (auto& [method, iterators] : prefetch_iterators) { + if (method == FileAccessRangeBuildMethod::ALL_DATA_PAGES) { + for (auto* iterator : iterators) { + auto* builder = iterator->cache_block_prefetch_range_builder(); + DCHECK(builder != nullptr); + Status st = iterator->install_cache_block_prefetch_pattern( + builder->build_all_data_page_ranges()); + LOG_IF(WARNING, !st.ok()) << fmt::format( + "failed to install cache block prefetch pattern, tablet={}, " + "rowset={}, segment={}, error={}", + _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(), + st.to_string()); + if (st.ok() && initial_touch_iterators.contains(iterator)) { + iterator->async_touch_cache_block_prefetch_initial_window(); + } + } + } else if (method == FileAccessRangeBuildMethod::FROM_ROWIDS && + !iterators.empty()) { + std::vector builders; + builders.reserve(iterators.size()); + for (auto* iterator : iterators) { + auto* builder = iterator->cache_block_prefetch_range_builder(); + DCHECK(builder != nullptr); + builders.emplace_back(builder); + } + SegmentFileAccessRangeBuilder::add_rowids_from_bitmap(_row_bitmap, builders); + for (auto* iterator : iterators) { + auto* builder = iterator->cache_block_prefetch_range_builder(); + DCHECK(builder != nullptr); + Status st = iterator->install_cache_block_prefetch_pattern( + builder->finish_by_rowids()); + LOG_IF(WARNING, !st.ok()) << fmt::format( + "failed to install cache block prefetch pattern, tablet={}, " + "rowset={}, segment={}, error={}", + _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(), + st.to_string()); + if (st.ok() && initial_touch_iterators.contains(iterator)) { + iterator->async_touch_cache_block_prefetch_initial_window(); + } } - } else if (method == PrefetcherInitMethod::FROM_ROWIDS && !prefetcher_vec.empty()) { - SegmentPrefetcher::build_blocks_by_rowids(_row_bitmap, prefetcher_vec); } } } diff --git a/be/src/storage/segment/segment_iterator.h b/be/src/storage/segment/segment_iterator.h index 3852cf8743d25b..4756ca53b54f9d 100644 --- a/be/src/storage/segment/segment_iterator.h +++ b/be/src/storage/segment/segment_iterator.h @@ -344,7 +344,7 @@ class SegmentIterator : public RowwiseIterator { void _init_row_bitmap_by_condition_cache(); - void _init_segment_prefetchers(); + void _init_cache_block_prefetch(); class BitmapRangeIterator; class BackwardBitmapRangeIterator; diff --git a/be/src/storage/segment/segment_prefetcher.cpp b/be/src/storage/segment/segment_prefetcher.cpp deleted file mode 100644 index 2405066d2e5e1d..00000000000000 --- a/be/src/storage/segment/segment_prefetcher.cpp +++ /dev/null @@ -1,262 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "storage/segment/segment_prefetcher.h" - -#include -#include - -#include "common/config.h" -#include "common/logging.h" -#include "storage/index/ordinal_page_index.h" -#include "storage/iterators.h" -#include "storage/segment/column_reader.h" - -namespace doris::segment_v2 { - -void SegmentPrefetcher::add_rowids(const rowid_t* rowids, uint32_t num) { - if (ordinal_index == nullptr) { - return; - } - const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i - const auto& pages = ordinal_index->_pages; // pages[i] = page pointer of page i - const int num_pages = ordinal_index->_num_pages; - for (uint32_t i = 0; i < num; ++i) { - rowid_t rowid = rowids[i]; - - if (_is_forward) { - while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) { - page_idx++; - } - - const auto& page = pages[page_idx]; - size_t page_start_block = _offset_to_block_id(page.offset); - size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1); - - // If page spans two blocks, assign it to the next block (page_end_block) - size_t block_id = - (page_start_block != page_end_block) ? page_end_block : page_start_block; - - if (block_id != last_block_id) { - if (last_block_id != static_cast(-1)) { - _block_sequence.emplace_back(last_block_id, current_block_first_rowid); - } - last_block_id = block_id; - current_block_first_rowid = rowid; - } - } else { - // Backward reading: we need the last rowid in each block as the "first" rowid - // (because when reading backwards, we encounter the largest rowid first) - // - // Strategy: iterate forward through bitmap, but for each block, - // keep updating current_block_first_rowid to the latest (largest) rowid in that block - while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) { - page_idx++; - } - size_t block_id = _offset_to_block_id(pages[page_idx].offset); - - if (block_id != last_block_id) { - if (last_block_id != static_cast(-1)) { - _block_sequence.emplace_back(last_block_id, current_block_first_rowid); - } - last_block_id = block_id; - } - current_block_first_rowid = rowid; - } - } -} - -void SegmentPrefetcher::build_all_data_blocks() { - if (ordinal_index == nullptr) { - return; - } - reset_blocks(); - const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i - const auto& pages = ordinal_index->_pages; // pages[i] = page pointer of page i - const int num_pages = ordinal_index->_num_pages; - - last_block_id = static_cast(-1); - current_block_first_rowid = 0; - - for (page_idx = 0; page_idx < num_pages; ++page_idx) { - const auto& page = pages[page_idx]; - - if (_is_forward) { - size_t page_start_block = _offset_to_block_id(page.offset); - size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1); - - // If page spans two blocks, assign it to the next block (page_end_block) - size_t block_id = - (page_start_block != page_end_block) ? page_end_block : page_start_block; - - if (block_id != last_block_id) { - if (last_block_id != static_cast(-1)) { - _block_sequence.emplace_back(last_block_id, current_block_first_rowid); - } - last_block_id = block_id; - current_block_first_rowid = static_cast(ordinals[page_idx]); - } - } else { - // Backward: use the last ordinal in each block as first_rowid - size_t block_id = _offset_to_block_id(page.offset); - if (block_id != last_block_id) { - if (last_block_id != static_cast(-1)) { - _block_sequence.emplace_back(last_block_id, current_block_first_rowid); - } - last_block_id = block_id; - } - current_block_first_rowid = static_cast(ordinals[page_idx]); - } - } - - // Add the last block - if (last_block_id != static_cast(-1)) { - _block_sequence.emplace_back(last_block_id, current_block_first_rowid); - } - - // Reverse for backward reading - if (!_is_forward && !_block_sequence.empty()) { - std::ranges::reverse(_block_sequence); - } -} - -void SegmentPrefetcher::build_blocks_by_rowids(const roaring::Roaring& row_bitmap, - const std::vector& prefetchers) { - for (auto* prefetcher : prefetchers) { - prefetcher->begin_build_blocks_by_rowids(); - } - - int batch_size = config::segment_file_cache_consume_rowids_batch_size; - std::vector rowids(batch_size); - roaring::api::roaring_uint32_iterator_t iter; - roaring::api::roaring_init_iterator(&row_bitmap.roaring, &iter); - uint32_t num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), batch_size); - - for (; num > 0; - num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), batch_size)) { - for (auto* prefetcher : prefetchers) { - prefetcher->add_rowids(rowids.data(), num); - } - } - - for (auto* prefetcher : prefetchers) { - prefetcher->finish_build_blocks_by_rowids(); - } -} - -void SegmentPrefetcher::begin_build_blocks_by_rowids() { - reset_blocks(); - page_idx = 0; -} - -void SegmentPrefetcher::finish_build_blocks_by_rowids() { - if (ordinal_index == nullptr) { - return; - } - if (last_block_id != static_cast(-1)) { - _block_sequence.emplace_back(last_block_id, current_block_first_rowid); - } - - if (!_is_forward && !_block_sequence.empty()) { - std::ranges::reverse(_block_sequence); - } - - LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( - "[verbose] SegmentPrefetcher initialized with block count={}, is_forward={}, " - "num_pages={}, path={}, blocks: (block_id, first_rowid)=[{}]", - _block_sequence.size(), _is_forward, ordinal_index->_num_pages, _path, - fmt::join(_block_sequence | std::views::transform([](const auto& b) { - return fmt::format("({}, {})", b.block_id, b.first_rowid); - }), - ",")); -} - -void SegmentPrefetcher::reset_blocks() { - _block_sequence.clear(); - _current_block_index = 0; - _prefetched_index = -1; -} - -Status SegmentPrefetcher::init(std::shared_ptr column_reader, - const StorageReadOptions& read_options) { - DCHECK(column_reader != nullptr); - - reset_blocks(); - _is_forward = !read_options.read_orderby_key_reverse; - _path = column_reader->_file_reader->path().filename().native(); - - RETURN_IF_ERROR(column_reader->get_ordinal_index_reader(ordinal_index, read_options.stats)); - return Status::OK(); -} - -bool SegmentPrefetcher::need_prefetch(rowid_t current_rowid, std::vector* out_ranges) { - DCHECK(out_ranges != nullptr); - LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) - << fmt::format("[verbose] SegmentPrefetcher need_prefetch enter current_rowid={}, {}", - current_rowid, debug_string()); - if (_block_sequence.empty() || - _prefetched_index >= static_cast(_block_sequence.size()) - 1) { - return false; - } - - LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( - "[verbose] SegmentPrefetcher need_prefetch called with current_rowid={}, {}, " - "block=(id={}, first_rowid={})", - current_rowid, debug_string(), _block_sequence[_current_block_index].block_id, - _block_sequence[_current_block_index].first_rowid); - if (_is_forward) { - while (_current_block_index + 1 < _block_sequence.size() && - _block_sequence[_current_block_index + 1].first_rowid <= current_rowid) { - _current_block_index++; - } - } else { - while (_current_block_index + 1 < _block_sequence.size() && - _block_sequence[_current_block_index + 1].first_rowid >= current_rowid) { - _current_block_index++; - } - } - - out_ranges->clear(); - // for non-predicate column, some rowids in row_bitmap may be filtered out after vec evaluation of predicate columns, - // so we should not prefetch for these rows - _prefetched_index = std::max(_prefetched_index, _current_block_index - 1); - while (_prefetched_index + 1 < _block_sequence.size() && - window_size() < _config.prefetch_window_size) { - out_ranges->push_back(_block_id_to_range(_block_sequence[++_prefetched_index].block_id)); - } - - LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( - "[verbose] SegmentPrefetcher need_prefetch after calc with current_rowid={}, {}, " - "block=(id={}, first_rowid={})", - current_rowid, debug_string(), _block_sequence[_current_block_index].block_id, - _block_sequence[_current_block_index].first_rowid); - - bool triggered = !out_ranges->empty(); - if (triggered) { - LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( - "[verbose] SegmentPrefetcher prefetch triggered at rowid={}, {}, prefetch {} " - "blocks: (offset, size)=[{}]", - current_rowid, debug_string(), out_ranges->size(), - fmt::join(*out_ranges | std::views::transform([](const auto& b) { - return fmt::format("({}, {})", b.offset, b.size); - }), - ",")); - } - return triggered; -} - -} // namespace doris::segment_v2 diff --git a/be/src/storage/segment/segment_prefetcher.h b/be/src/storage/segment/segment_prefetcher.h deleted file mode 100644 index 659f80c3b0d40d..00000000000000 --- a/be/src/storage/segment/segment_prefetcher.h +++ /dev/null @@ -1,154 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include -#include - -#include "common/status.h" -#include "storage/segment/common.h" - -namespace doris { -namespace io { -class FileReader; -} // namespace io -class StorageReadOptions; - -namespace segment_v2 { -class OrdinalIndexReader; -class ColumnReader; - -enum class PrefetcherInitMethod : int { FROM_ROWIDS = 0, ALL_DATA_BLOCKS = 1 }; - -// Configuration for segment prefetcher -struct SegmentPrefetcherConfig { - // Number of file cache blocks to prefetch ahead - size_t prefetch_window_size = 4; - - // File cache block size in bytes (default 1MB) - size_t block_size = 1024 * 1024; - - SegmentPrefetcherConfig(size_t window_size, size_t blk_size) - : prefetch_window_size(window_size), block_size(blk_size) {} -}; - -// Block range representing [offset, offset + size) in the segment file -struct BlockRange { - uint64_t offset; - uint64_t size; - - BlockRange(uint64_t off, uint64_t sz) : offset(off), size(sz) {} - - bool operator==(const BlockRange& other) const { - return offset == other.offset && size == other.size; - } -}; - -// Represents a block with its first rowid for reading -struct BlockInfo { - size_t block_id; - rowid_t first_rowid; - - BlockInfo(size_t bid, rowid_t rid) : block_id(bid), first_rowid(rid) {} -}; - -struct SegmentPrefetchParams { - SegmentPrefetcherConfig config; - const StorageReadOptions& read_options; -}; - -// SegmentPrefetcher maintains block sequence and triggers prefetch to keep -// N blocks ahead of current reading position. -// -// Key design: -// - Monotonic reading: rowids are read in order (forward or backward) -// - Trigger condition: when current_rowid reaches a block boundary, prefetch next N blocks -// - No deduplication needed: reading is monotonic, blocks are naturally processed in order -class SegmentPrefetcher { -public: - explicit SegmentPrefetcher(const SegmentPrefetcherConfig& config) : _config(config) {} - - ~SegmentPrefetcher() = default; - - Status init(std::shared_ptr column_reader, - const StorageReadOptions& read_options); - - bool need_prefetch(rowid_t current_rowid, std::vector* out_ranges); - - static void build_blocks_by_rowids(const roaring::Roaring& row_bitmap, - const std::vector& prefetchers); - void begin_build_blocks_by_rowids(); - void add_rowids(const rowid_t* rowids, uint32_t num); - void finish_build_blocks_by_rowids(); - - void build_all_data_blocks(); - -private: - // Parameters: - // row_bitmap: The complete bitmap of rowids to scan - // ordinal_index: Ordinal index reader (must be loaded) - // - // For forward reading: first_rowid is the first rowid we need to read in each block - // For backward reading: first_rowid is the last rowid we need to read in each block - // (since we read backwards, this is the first one we'll encounter) - void _build_block_sequence_from_bitmap(const roaring::Roaring& row_bitmap, - OrdinalIndexReader* ordinal_index); - size_t _offset_to_block_id(uint64_t offset) const { return offset / _config.block_size; } - - BlockRange _block_id_to_range(size_t block_id) const { - return {block_id * _config.block_size, _config.block_size}; - } - - int window_size() const { return _prefetched_index - _current_block_index + 1; } - - std::string debug_string() const { - return fmt::format( - "[internal state] _is_forward={}, _prefetched_index={}, _current_block_index={}, " - "window_size={}, block.size()={}, path={}", - _is_forward, _prefetched_index, _current_block_index, window_size(), - _block_sequence.size(), _path); - } - - void reset_blocks(); - -private: - SegmentPrefetcherConfig _config; - std::string _path; - - // Sequence of blocks with their first rowid (in reading order) - std::vector _block_sequence; - - bool _is_forward = true; - - int _prefetched_index = -1; - int _current_block_index = 0; - - int page_idx = 0; - // For each page, track the first rowid we need to read - // For forward: the smallest rowid in this page - // For backward: the largest rowid in this page (first one we'll encounter when reading backwards) - size_t last_block_id = static_cast(-1); - rowid_t current_block_first_rowid = 0; - - OrdinalIndexReader* ordinal_index = nullptr; -}; - -} // namespace segment_v2 -} // namespace doris diff --git a/be/src/storage/segment/variant/hierarchical_data_iterator.cpp b/be/src/storage/segment/variant/hierarchical_data_iterator.cpp index 052f231b27e68c..df708c4bfc1f72 100644 --- a/be/src/storage/segment/variant/hierarchical_data_iterator.cpp +++ b/be/src/storage/segment/variant/hierarchical_data_iterator.cpp @@ -166,36 +166,38 @@ ordinal_t HierarchicalDataIterator::get_current_ordinal() const { return (*_substream_reader.begin())->data.iterator->get_current_ordinal(); } -Status HierarchicalDataIterator::init_prefetcher(const SegmentPrefetchParams& params) { +Status HierarchicalDataIterator::init_cache_block_prefetch( + const SegmentCacheBlockPrefetchParams& params) { RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) { - RETURN_IF_ERROR(node.data.iterator->init_prefetcher(params)); + RETURN_IF_ERROR(node.data.iterator->init_cache_block_prefetch(params)); return Status::OK(); })); if (_root_reader) { DCHECK(_root_reader->inited); - RETURN_IF_ERROR(_root_reader->iterator->init_prefetcher(params)); + RETURN_IF_ERROR(_root_reader->iterator->init_cache_block_prefetch(params)); } if (_binary_column_reader) { DCHECK(_binary_column_reader->inited); - RETURN_IF_ERROR(_binary_column_reader->iterator->init_prefetcher(params)); + RETURN_IF_ERROR(_binary_column_reader->iterator->init_cache_block_prefetch(params)); } return Status::OK(); } -void HierarchicalDataIterator::collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) { +void HierarchicalDataIterator::collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) { static_cast(tranverse([&](SubstreamReaderTree::Node& node) { - node.data.iterator->collect_prefetchers(prefetchers, init_method); + node.data.iterator->collect_cache_block_prefetch_iterators(iterators, init_method); return Status::OK(); })); if (_root_reader) { DCHECK(_root_reader->inited); - _root_reader->iterator->collect_prefetchers(prefetchers, init_method); + _root_reader->iterator->collect_cache_block_prefetch_iterators(iterators, init_method); } if (_binary_column_reader) { DCHECK(_binary_column_reader->inited); - _binary_column_reader->iterator->collect_prefetchers(prefetchers, init_method); + _binary_column_reader->iterator->collect_cache_block_prefetch_iterators(iterators, + init_method); } } diff --git a/be/src/storage/segment/variant/hierarchical_data_iterator.h b/be/src/storage/segment/variant/hierarchical_data_iterator.h index 3e3816736a4851..d514886af454a7 100644 --- a/be/src/storage/segment/variant/hierarchical_data_iterator.h +++ b/be/src/storage/segment/variant/hierarchical_data_iterator.h @@ -89,10 +89,10 @@ class HierarchicalDataIterator : public ColumnIterator { Status add_stream(int32_t col_uid, const SubcolumnColumnMetaInfo::Node* node, ColumnReaderCache* column_reader_cache, OlapReaderStatistics* stats); - Status init_prefetcher(const SegmentPrefetchParams& params) override; - void collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) override; + Status init_cache_block_prefetch(const SegmentCacheBlockPrefetchParams& params) override; + void collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) override; private: SubstreamReaderTree _substream_reader; diff --git a/be/src/storage/segment/variant/variant_column_reader.cpp b/be/src/storage/segment/variant/variant_column_reader.cpp index d41775581bf2e5..6eaec06df02a5b 100644 --- a/be/src/storage/segment/variant/variant_column_reader.cpp +++ b/be/src/storage/segment/variant/variant_column_reader.cpp @@ -129,14 +129,26 @@ class ReaderOwnedColumnIterator final : public ColumnIterator { void remove_pruned_sub_iterators() override { _inner->remove_pruned_sub_iterators(); } - Status init_prefetcher(const SegmentPrefetchParams& params) override { - return _inner->init_prefetcher(params); + Status init_cache_block_prefetch(const SegmentCacheBlockPrefetchParams& params) override { + return _inner->init_cache_block_prefetch(params); } - void collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) override { - _inner->collect_prefetchers(prefetchers, init_method); + void collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) override { + _inner->collect_cache_block_prefetch_iterators(iterators, init_method); + } + + SegmentFileAccessRangeBuilder* cache_block_prefetch_range_builder() override { + return _inner->cache_block_prefetch_range_builder(); + } + + Status install_cache_block_prefetch_pattern(std::vector ranges) override { + return _inner->install_cache_block_prefetch_pattern(std::move(ranges)); + } + + void async_touch_cache_block_prefetch_initial_window() override { + _inner->async_touch_cache_block_prefetch_initial_window(); } private: @@ -1578,14 +1590,15 @@ Status VariantRootColumnIterator::read_by_rowids(const rowid_t* rowids, const si return _process_root_column(dst, root_column, most_common_type); } -Status VariantRootColumnIterator::init_prefetcher(const SegmentPrefetchParams& params) { - return _inner_iter->init_prefetcher(params); +Status VariantRootColumnIterator::init_cache_block_prefetch( + const SegmentCacheBlockPrefetchParams& params) { + return _inner_iter->init_cache_block_prefetch(params); } -void VariantRootColumnIterator::collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) { - _inner_iter->collect_prefetchers(prefetchers, init_method); +void VariantRootColumnIterator::collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) { + _inner_iter->collect_cache_block_prefetch_iterators(iterators, init_method); } static void fill_nested_with_defaults(MutableColumnPtr& dst, MutableColumnPtr& sibling_column, diff --git a/be/src/storage/segment/variant/variant_column_reader.h b/be/src/storage/segment/variant/variant_column_reader.h index d692050e2b7478..2a671626ccad83 100644 --- a/be/src/storage/segment/variant/variant_column_reader.h +++ b/be/src/storage/segment/variant/variant_column_reader.h @@ -460,10 +460,10 @@ class VariantRootColumnIterator : public ColumnIterator { ordinal_t get_current_ordinal() const override { return _inner_iter->get_current_ordinal(); } - Status init_prefetcher(const SegmentPrefetchParams& params) override; - void collect_prefetchers( - std::map>& prefetchers, - PrefetcherInitMethod init_method) override; + Status init_cache_block_prefetch(const SegmentCacheBlockPrefetchParams& params) override; + void collect_cache_block_prefetch_iterators( + std::map>& iterators, + FileAccessRangeBuildMethod init_method) override; private: Status _process_root_column(MutableColumnPtr& dst, MutableColumnPtr& root_column, diff --git a/be/test/io/cache/cache_block_aware_prefetch_remote_reader_test.cpp b/be/test/io/cache/cache_block_aware_prefetch_remote_reader_test.cpp new file mode 100644 index 00000000000000..0aaa09507919f4 --- /dev/null +++ b/be/test/io/cache/cache_block_aware_prefetch_remote_reader_test.cpp @@ -0,0 +1,1154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "io/cache/cache_block_aware_prefetch_remote_reader.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "io/cache/block_file_cache_test_common.h" +#include "runtime/exec_env.h" +#include "storage/index/ordinal_page_index.h" +#include "storage/segment/page_pointer.h" +#include "storage/segment/segment_file_access_range_builder.h" +#include "util/defer_op.h" +#include "util/threadpool.h" + +namespace doris::io { + +using segment_v2::rowid_t; + +class CountingRemoteFileReader final : public FileReader { +public: + CountingRemoteFileReader(Path path, size_t size) : _path(std::move(path)), _size(size) {} + + Status close() override { + _closed = true; + return Status::OK(); + } + + const Path& path() const override { return _path; } + + size_t size() const override { return _size; } + + bool closed() const override { return _closed; } + + int64_t mtime() const override { return 0; } + + std::vector read_ranges() const { + std::lock_guard lock(_mutex); + return _read_ranges; + } + +protected: + Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, + const IOContext* io_ctx) override { + std::lock_guard lock(_mutex); + _read_ranges.push_back({offset, result.size}); + if (offset >= _size) { + *bytes_read = 0; + return Status::OK(); + } + *bytes_read = std::min(result.size, _size - offset); + for (size_t i = 0; i < *bytes_read; ++i) { + result.data[i] = static_cast('a' + ((offset + i) % 26)); + } + return Status::OK(); + } + +private: + Path _path; + size_t _size; + bool _closed = false; + mutable std::mutex _mutex; + std::vector _read_ranges; +}; + +bool wait_until(const std::function& pred) { + for (int i = 0; i < 200; ++i) { + if (pred()) { + return true; + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + return false; +} + +std::unique_ptr make_ordinal_index_for_prefetch_test() { + auto reader = std::make_shared(Path("/tmp/ordinal_index_test"), 512); + auto ordinal_index = std::make_unique( + reader, 400, segment_v2::OrdinalIndexPB()); + ordinal_index->_num_pages = 4; + ordinal_index->_ordinals = {0, 100, 200, 300, 400}; + ordinal_index->_pages = { + segment_v2::PagePointer(0, 50), + segment_v2::PagePointer(100, 50), + segment_v2::PagePointer(200, 50), + segment_v2::PagePointer(300, 50), + }; + return ordinal_index; +} + +void expect_plan_entry(std::span entries, size_t index, + size_t cache_block_offset, size_t trigger_offset, size_t trigger_size) { + ASSERT_LT(index, entries.size()); + const auto& entry = entries[index]; + EXPECT_EQ(entry.cache_block_range.offset, cache_block_offset); + EXPECT_EQ(entry.trigger_file_range.offset, trigger_offset); + EXPECT_EQ(entry.trigger_file_range.size, trigger_size); +} + +void expect_plan_cache_block_offsets(std::span entries, + std::initializer_list expected_offsets) { + ASSERT_EQ(entries.size(), expected_offsets.size()); + size_t index = 0; + for (const size_t expected_offset : expected_offsets) { + EXPECT_EQ(entries[index].cache_block_range.offset, expected_offset) << "index=" << index; + ++index; + } +} + +void expect_cache_block_offsets(const std::vector& ranges, + std::initializer_list expected_offsets) { + ASSERT_EQ(ranges.size(), expected_offsets.size()); + size_t index = 0; + for (const size_t expected_offset : expected_offsets) { + EXPECT_EQ(ranges[index].offset, expected_offset) << "index=" << index; + ++index; + } +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, segment_file_access_range_builder_from_rowids) { + auto ordinal_index = make_ordinal_index_for_prefetch_test(); + segment_v2::SegmentFileAccessRangeBuilder builder(ordinal_index.get(), + CacheBlockReadDirection::FORWARD); + std::vector rowids {5, 30, 115, 250}; + builder.add_ascending_rowids(rowids); + + auto ranges = builder.finish_by_rowids(); + + ASSERT_EQ(ranges.size(), 3); + EXPECT_EQ(ranges[0].offset, 0); + EXPECT_EQ(ranges[0].size, 50); + EXPECT_EQ(ranges[1].offset, 100); + EXPECT_EQ(ranges[2].offset, 200); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, + segment_file_access_range_builder_from_rowids_backward) { + auto ordinal_index = make_ordinal_index_for_prefetch_test(); + segment_v2::SegmentFileAccessRangeBuilder builder(ordinal_index.get(), + CacheBlockReadDirection::BACKWARD); + std::vector rowids {5, 30, 115, 250}; + builder.add_ascending_rowids(rowids); + + auto ranges = builder.finish_by_rowids(); + + ASSERT_EQ(ranges.size(), 3); + EXPECT_EQ(ranges[0].offset, 200); + EXPECT_EQ(ranges[1].offset, 100); + EXPECT_EQ(ranges[2].offset, 0); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, segment_file_access_range_builder_all_data) { + auto ordinal_index = make_ordinal_index_for_prefetch_test(); + segment_v2::SegmentFileAccessRangeBuilder builder(ordinal_index.get(), + CacheBlockReadDirection::BACKWARD); + + auto ranges = builder.build_all_data_page_ranges(); + + ASSERT_EQ(ranges.size(), 4); + EXPECT_EQ(ranges[0].offset, 300); + EXPECT_EQ(ranges[1].offset, 200); + EXPECT_EQ(ranges[2].offset, 100); + EXPECT_EQ(ranges[3].offset, 0); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, + segment_file_access_range_builder_preserves_large_page_range) { + auto reader = + std::make_shared(Path("/tmp/large_page_ordinal_index"), 512); + auto ordinal_index = std::make_unique( + reader, 200, segment_v2::OrdinalIndexPB()); + ordinal_index->_num_pages = 2; + ordinal_index->_ordinals = {0, 100, 200}; + ordinal_index->_pages = { + segment_v2::PagePointer(50, 260), + segment_v2::PagePointer(400, 40), + }; + segment_v2::SegmentFileAccessRangeBuilder builder(ordinal_index.get(), + CacheBlockReadDirection::FORWARD); + std::vector rowids {10, 120}; + builder.add_ascending_rowids(rowids); + + auto ranges = builder.finish_by_rowids(); + + ASSERT_EQ(ranges.size(), 2); + EXPECT_EQ(ranges[0].offset, 50); + EXPECT_EQ(ranges[0].size, 260); + EXPECT_EQ(ranges[1].offset, 400); + EXPECT_EQ(ranges[1].size, 40); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, + segment_file_access_range_builder_keeps_many_sparse_pages_across_large_span) { + auto reader = std::make_shared(Path("/tmp/sparse_page_ordinal_index"), + 128_mb); + auto ordinal_index = std::make_unique( + reader, 120, segment_v2::OrdinalIndexPB()); + ordinal_index->_num_pages = 12; + for (rowid_t ordinal = 0; ordinal <= 120; ordinal += 10) { + ordinal_index->_ordinals.push_back(ordinal); + } + ordinal_index->_pages = { + segment_v2::PagePointer(64_kb, 32_kb), + segment_v2::PagePointer(1_mb + 128_kb, 64_kb), + segment_v2::PagePointer(4_mb + 900_kb, 300_kb), + segment_v2::PagePointer(9_mb + 16_kb, 1_mb + 128_kb), + segment_v2::PagePointer(16_mb + 512_kb, 64_kb), + segment_v2::PagePointer(31_mb + 768_kb, 2_mb + 512_kb), + segment_v2::PagePointer(48_mb + 128_kb, 32_kb), + segment_v2::PagePointer(64_mb + 900_kb, 300_kb), + segment_v2::PagePointer(80_mb + 32_kb, 64_kb), + segment_v2::PagePointer(96_mb + 512_kb, 1_mb + 1), + segment_v2::PagePointer(112_mb + 128_kb, 128_kb), + segment_v2::PagePointer(127_mb, 512_kb), + }; + std::vector rowids {0, 1, 22, 35, 36, 58, 77, 78, 94, 119}; + + segment_v2::SegmentFileAccessRangeBuilder forward_builder(ordinal_index.get(), + CacheBlockReadDirection::FORWARD); + forward_builder.add_ascending_rowids(rowids); + auto forward_ranges = forward_builder.finish_by_rowids(); + ASSERT_EQ(forward_ranges.size(), 7); + EXPECT_EQ(forward_ranges[0].offset, 64_kb); + EXPECT_EQ(forward_ranges[0].size, 32_kb); + EXPECT_EQ(forward_ranges[1].offset, 4_mb + 900_kb); + EXPECT_EQ(forward_ranges[1].size, 300_kb); + EXPECT_EQ(forward_ranges[2].offset, 9_mb + 16_kb); + EXPECT_EQ(forward_ranges[2].size, 1_mb + 128_kb); + EXPECT_EQ(forward_ranges[3].offset, 31_mb + 768_kb); + EXPECT_EQ(forward_ranges[3].size, 2_mb + 512_kb); + EXPECT_EQ(forward_ranges[4].offset, 64_mb + 900_kb); + EXPECT_EQ(forward_ranges[4].size, 300_kb); + EXPECT_EQ(forward_ranges[5].offset, 96_mb + 512_kb); + EXPECT_EQ(forward_ranges[5].size, 1_mb + 1); + EXPECT_EQ(forward_ranges[6].offset, 127_mb); + EXPECT_EQ(forward_ranges[6].size, 512_kb); + + segment_v2::SegmentFileAccessRangeBuilder backward_builder(ordinal_index.get(), + CacheBlockReadDirection::BACKWARD); + backward_builder.add_ascending_rowids(rowids); + auto backward_ranges = backward_builder.finish_by_rowids(); + ASSERT_EQ(backward_ranges.size(), forward_ranges.size()); + for (size_t i = 0; i < backward_ranges.size(); ++i) { + EXPECT_EQ(backward_ranges[i].offset, forward_ranges[forward_ranges.size() - i - 1].offset); + EXPECT_EQ(backward_ranges[i].size, forward_ranges[forward_ranges.size() - i - 1].size); + } +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, + segment_file_access_range_builder_consumes_bitmap_for_multiple_builders) { + auto first_ordinal_index = make_ordinal_index_for_prefetch_test(); + segment_v2::SegmentFileAccessRangeBuilder first_builder(first_ordinal_index.get(), + CacheBlockReadDirection::FORWARD); + std::vector stale_rowids {5}; + first_builder.add_ascending_rowids(stale_rowids); + + auto reader = + std::make_shared(Path("/tmp/second_ordinal_index"), 4096); + auto second_ordinal_index = std::make_unique( + reader, 400, segment_v2::OrdinalIndexPB()); + second_ordinal_index->_num_pages = 3; + second_ordinal_index->_ordinals = {0, 150, 260, 400}; + second_ordinal_index->_pages = { + segment_v2::PagePointer(1000, 50), + segment_v2::PagePointer(2000, 50), + segment_v2::PagePointer(3000, 50), + }; + segment_v2::SegmentFileAccessRangeBuilder second_builder(second_ordinal_index.get(), + CacheBlockReadDirection::FORWARD); + + roaring::Roaring row_bitmap; + row_bitmap.add(5); + row_bitmap.add(160); + row_bitmap.add(270); + row_bitmap.add(350); + std::vector builders { + &first_builder, + &second_builder, + }; + segment_v2::SegmentFileAccessRangeBuilder::add_rowids_from_bitmap(row_bitmap, builders); + + auto first_ranges = first_builder.finish_by_rowids(); + ASSERT_EQ(first_ranges.size(), 4); + EXPECT_EQ(first_ranges[0].offset, 0); + EXPECT_EQ(first_ranges[1].offset, 100); + EXPECT_EQ(first_ranges[2].offset, 200); + EXPECT_EQ(first_ranges[3].offset, 300); + + auto second_ranges = second_builder.finish_by_rowids(); + ASSERT_EQ(second_ranges.size(), 3); + EXPECT_EQ(second_ranges[0].offset, 1000); + EXPECT_EQ(second_ranges[1].offset, 2000); + EXPECT_EQ(second_ranges[2].offset, 3000); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, + segment_file_access_ranges_drive_prefetch_plan_and_cursor) { + auto reader = std::make_shared( + Path("/tmp/prefetch_chain_ordinal_index"), 512); + auto ordinal_index = std::make_unique( + reader, 200, segment_v2::OrdinalIndexPB()); + ordinal_index->_num_pages = 2; + ordinal_index->_ordinals = {0, 100, 200}; + ordinal_index->_pages = { + segment_v2::PagePointer(50, 260), + segment_v2::PagePointer(400, 40), + }; + segment_v2::SegmentFileAccessRangeBuilder builder(ordinal_index.get(), + CacheBlockReadDirection::FORWARD); + std::vector rowids {10, 120}; + builder.add_ascending_rowids(rowids); + + auto file_ranges = builder.finish_by_rowids(); + detail::CacheBlockPrefetchCursor cursor { + detail::CacheBlockPrefetchPlan::from_read_pattern( + CacheBlockReadPattern { + .direction = CacheBlockReadDirection::FORWARD, + .ranges = std::move(file_ranges), + }, + 100), + 2}; + + auto cache_block_ranges = cursor.next_touch_ranges(50); + ASSERT_EQ(cache_block_ranges.size(), 4); + EXPECT_EQ(cache_block_ranges[0].offset, 0); + EXPECT_EQ(cache_block_ranges[1].offset, 100); + EXPECT_EQ(cache_block_ranges[2].offset, 200); + EXPECT_EQ(cache_block_ranges[3].offset, 300); + + cache_block_ranges = cursor.next_touch_ranges(400); + ASSERT_EQ(cache_block_ranges.size(), 1); + EXPECT_EQ(cache_block_ranges[0].offset, 400); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, build_prefetch_plan_forward) { + CacheBlockReadPattern pattern { + .direction = CacheBlockReadDirection::FORWARD, + .ranges = + { + {.offset = 205, .size = 30}, + {.offset = 0, .size = 90}, + {.offset = 90, .size = 40}, + {.offset = 310, .size = 10}, + }, + }; + + auto plan = detail::CacheBlockPrefetchPlan::from_read_pattern(pattern, 100); + const auto entries = plan.entries(); + + ASSERT_EQ(entries.size(), 4); + expect_plan_entry(entries, 0, 0, 0, 90); + expect_plan_entry(entries, 1, 100, 90, 40); + expect_plan_entry(entries, 2, 200, 205, 30); + expect_plan_entry(entries, 3, 300, 310, 10); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, build_prefetch_plan_backward) { + CacheBlockReadPattern pattern { + .direction = CacheBlockReadDirection::BACKWARD, + .ranges = + { + {.offset = 0, .size = 20}, + {.offset = 250, .size = 80}, + {.offset = 90, .size = 120}, + }, + }; + + auto plan = detail::CacheBlockPrefetchPlan::from_read_pattern(pattern, 100); + const auto entries = plan.entries(); + + ASSERT_EQ(entries.size(), 4); + expect_plan_entry(entries, 0, 300, 250, 80); + expect_plan_entry(entries, 1, 200, 250, 80); + expect_plan_entry(entries, 2, 100, 90, 120); + expect_plan_entry(entries, 3, 0, 90, 120); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, + build_prefetch_plan_expands_cross_block_ranges_and_ignores_duplicates) { + CacheBlockReadPattern pattern { + .direction = CacheBlockReadDirection::FORWARD, + .ranges = + { + {.offset = 50, .size = 260}, + {.offset = 100, .size = 100}, + {.offset = 400, .size = 100}, + {.offset = 500, .size = 0}, + }, + }; + + auto plan = detail::CacheBlockPrefetchPlan::from_read_pattern(pattern, 100); + const auto entries = plan.entries(); + + ASSERT_EQ(entries.size(), 5); + expect_plan_entry(entries, 0, 0, 50, 260); + expect_plan_entry(entries, 1, 100, 50, 260); + expect_plan_entry(entries, 2, 200, 50, 260); + expect_plan_entry(entries, 3, 300, 50, 260); + expect_plan_entry(entries, 4, 400, 400, 100); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, + build_prefetch_plan_handles_many_sparse_pages_across_many_cache_blocks) { + CacheBlockReadPattern pattern { + .direction = CacheBlockReadDirection::FORWARD, + .ranges = + { + {.offset = 96_mb + 256_kb, .size = 128_kb}, + {.offset = 64_kb, .size = 128_kb}, + {.offset = 2_mb + 900_kb, .size = 300_kb}, + {.offset = 7_mb + 16_kb, .size = 64_kb}, + {.offset = 7_mb + 512_kb, .size = 128_kb}, + {.offset = 32_mb + 768_kb, .size = 8_mb + 512_kb}, + {.offset = 48_mb + 64_kb, .size = 0}, + }, + }; + + auto plan = + detail::CacheBlockPrefetchPlan::from_read_pattern(pattern, static_cast(1_mb)); + const auto entries = plan.entries(); + + expect_plan_cache_block_offsets(entries, { + 0, + 2_mb, + 3_mb, + 7_mb, + 32_mb, + 33_mb, + 34_mb, + 35_mb, + 36_mb, + 37_mb, + 38_mb, + 39_mb, + 40_mb, + 41_mb, + 96_mb, + }); + expect_plan_entry(entries, 0, 0, 64_kb, 128_kb); + expect_plan_entry(entries, 1, 2_mb, 2_mb + 900_kb, 300_kb); + expect_plan_entry(entries, 2, 3_mb, 2_mb + 900_kb, 300_kb); + expect_plan_entry(entries, 3, 7_mb, 7_mb + 16_kb, 64_kb); + expect_plan_entry(entries, 4, 32_mb, 32_mb + 768_kb, 8_mb + 512_kb); + expect_plan_entry(entries, 13, 41_mb, 32_mb + 768_kb, 8_mb + 512_kb); + expect_plan_entry(entries, 14, 96_mb, 96_mb + 256_kb, 128_kb); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, + build_backward_prefetch_plan_handles_many_sparse_pages_across_many_cache_blocks) { + CacheBlockReadPattern pattern { + .direction = CacheBlockReadDirection::BACKWARD, + .ranges = + { + {.offset = 96_mb + 256_kb, .size = 128_kb}, + {.offset = 64_kb, .size = 128_kb}, + {.offset = 2_mb + 900_kb, .size = 300_kb}, + {.offset = 7_mb + 16_kb, .size = 64_kb}, + {.offset = 7_mb + 512_kb, .size = 128_kb}, + {.offset = 32_mb + 768_kb, .size = 8_mb + 512_kb}, + }, + }; + + auto plan = + detail::CacheBlockPrefetchPlan::from_read_pattern(pattern, static_cast(1_mb)); + const auto entries = plan.entries(); + + expect_plan_cache_block_offsets(entries, { + 96_mb, + 41_mb, + 40_mb, + 39_mb, + 38_mb, + 37_mb, + 36_mb, + 35_mb, + 34_mb, + 33_mb, + 32_mb, + 7_mb, + 3_mb, + 2_mb, + 0, + }); + expect_plan_entry(entries, 0, 96_mb, 96_mb + 256_kb, 128_kb); + expect_plan_entry(entries, 1, 41_mb, 32_mb + 768_kb, 8_mb + 512_kb); + expect_plan_entry(entries, 10, 32_mb, 32_mb + 768_kb, 8_mb + 512_kb); + expect_plan_entry(entries, 11, 7_mb, 7_mb + 512_kb, 128_kb); + expect_plan_entry(entries, 12, 3_mb, 2_mb + 900_kb, 300_kb); + expect_plan_entry(entries, 13, 2_mb, 2_mb + 900_kb, 300_kb); + expect_plan_entry(entries, 14, 0, 64_kb, 128_kb); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, build_prefetch_plan_backward_cross_block_range) { + CacheBlockReadPattern pattern { + .direction = CacheBlockReadDirection::BACKWARD, + .ranges = + { + {.offset = 50, .size = 260}, + {.offset = 0, .size = 50}, + }, + }; + + auto plan = detail::CacheBlockPrefetchPlan::from_read_pattern(pattern, 100); + const auto entries = plan.entries(); + + ASSERT_EQ(entries.size(), 4); + expect_plan_entry(entries, 0, 300, 50, 260); + expect_plan_entry(entries, 1, 200, 50, 260); + expect_plan_entry(entries, 2, 100, 50, 260); + expect_plan_entry(entries, 3, 0, 50, 260); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, prefetch_window_advances_without_duplicates) { + detail::CacheBlockPrefetchCursor cursor { + detail::CacheBlockPrefetchPlan::from_read_pattern( + CacheBlockReadPattern { + .direction = CacheBlockReadDirection::FORWARD, + .ranges = + { + {.offset = 0, .size = 1}, + {.offset = 100, .size = 1}, + {.offset = 200, .size = 1}, + {.offset = 300, .size = 1}, + }, + }, + 100), + 2}; + + auto ranges = cursor.next_touch_ranges(0); + ASSERT_EQ(ranges.size(), 2); + EXPECT_EQ(ranges[0].offset, 0); + EXPECT_EQ(ranges[1].offset, 100); + + ranges = cursor.next_touch_ranges(0); + EXPECT_TRUE(ranges.empty()); + + ranges = cursor.next_touch_ranges(100); + ASSERT_EQ(ranges.size(), 1); + EXPECT_EQ(ranges[0].offset, 200); + + ranges = cursor.next_touch_ranges(300); + ASSERT_EQ(ranges.size(), 1); + EXPECT_EQ(ranges[0].offset, 300); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, initial_touch_window_advances_before_read_at) { + detail::CacheBlockPrefetchCursor cursor { + detail::CacheBlockPrefetchPlan::from_read_pattern( + CacheBlockReadPattern { + .direction = CacheBlockReadDirection::FORWARD, + .ranges = + { + {.offset = 0, .size = 1}, + {.offset = 100, .size = 1}, + {.offset = 200, .size = 1}, + }, + }, + 100), + 2}; + + auto ranges = cursor.next_initial_touch_ranges(); + ASSERT_EQ(ranges.size(), 2); + EXPECT_EQ(ranges[0].offset, 0); + EXPECT_EQ(ranges[1].offset, 100); + + ranges = cursor.next_touch_ranges(0); + EXPECT_TRUE(ranges.empty()); + + ranges = cursor.next_touch_ranges(100); + ASSERT_EQ(ranges.size(), 1); + EXPECT_EQ(ranges[0].offset, 200); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, prefetch_window_does_not_split_large_file_range) { + detail::CacheBlockPrefetchCursor cursor { + detail::CacheBlockPrefetchPlan::from_read_pattern( + CacheBlockReadPattern { + .direction = CacheBlockReadDirection::FORWARD, + .ranges = + { + {.offset = 50, .size = 260}, + {.offset = 400, .size = 100}, + }, + }, + 100), + 2}; + + auto ranges = cursor.next_touch_ranges(50); + ASSERT_EQ(ranges.size(), 4); + EXPECT_EQ(ranges[0].offset, 0); + EXPECT_EQ(ranges[1].offset, 100); + EXPECT_EQ(ranges[2].offset, 200); + EXPECT_EQ(ranges[3].offset, 300); + + ranges = cursor.next_touch_ranges(400); + ASSERT_EQ(ranges.size(), 1); + EXPECT_EQ(ranges[0].offset, 400); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, + prefetch_cursor_handles_sparse_large_span_and_many_cache_blocks) { + detail::CacheBlockPrefetchCursor cursor { + detail::CacheBlockPrefetchPlan::from_read_pattern( + CacheBlockReadPattern { + .direction = CacheBlockReadDirection::FORWARD, + .ranges = + { + {.offset = 0, .size = 1}, + {.offset = 5_mb + 64_kb, .size = 64_kb}, + {.offset = 20_mb + 256_kb, .size = 7_mb + 512_kb}, + {.offset = 40_mb + 32_kb, .size = 64_kb}, + {.offset = 41_mb + 64_kb, .size = 64_kb}, + {.offset = 88_mb + 128_kb, .size = 64_kb}, + }, + }, + static_cast(1_mb)), + 3}; + + auto ranges = cursor.next_touch_ranges(0); + expect_cache_block_offsets(ranges, { + 0, + 5_mb, + 20_mb, + 21_mb, + 22_mb, + 23_mb, + 24_mb, + 25_mb, + 26_mb, + 27_mb, + }); + + ranges = cursor.next_touch_ranges(20_mb + 256_kb); + EXPECT_TRUE(ranges.empty()); + + ranges = cursor.next_touch_ranges(40_mb + 32_kb); + expect_cache_block_offsets(ranges, { + 40_mb, + 41_mb, + 88_mb, + }); + + ranges = cursor.next_touch_ranges(88_mb + 128_kb); + EXPECT_TRUE(ranges.empty()); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, + prefetch_window_triggers_when_read_starts_inside_file_range) { + detail::CacheBlockPrefetchCursor forward_cursor { + detail::CacheBlockPrefetchPlan::from_read_pattern( + CacheBlockReadPattern { + .direction = CacheBlockReadDirection::FORWARD, + .ranges = {{.offset = 50, .size = 260}}, + }, + 100), + 2}; + + auto ranges = forward_cursor.next_touch_ranges(180); + ASSERT_EQ(ranges.size(), 4); + EXPECT_EQ(ranges[0].offset, 0); + EXPECT_EQ(ranges[1].offset, 100); + EXPECT_EQ(ranges[2].offset, 200); + EXPECT_EQ(ranges[3].offset, 300); + + detail::CacheBlockPrefetchCursor backward_cursor { + detail::CacheBlockPrefetchPlan::from_read_pattern( + CacheBlockReadPattern { + .direction = CacheBlockReadDirection::BACKWARD, + .ranges = {{.offset = 50, .size = 260}}, + }, + 100), + 2}; + + ranges = backward_cursor.next_touch_ranges(180); + ASSERT_EQ(ranges.size(), 4); + EXPECT_EQ(ranges[0].offset, 300); + EXPECT_EQ(ranges[1].offset, 200); + EXPECT_EQ(ranges[2].offset, 100); + EXPECT_EQ(ranges[3].offset, 0); +} + +TEST(CacheBlockAwarePrefetchRemoteReaderTest, + backward_prefetch_window_advances_without_duplicates) { + detail::CacheBlockPrefetchCursor cursor { + detail::CacheBlockPrefetchPlan::from_read_pattern( + CacheBlockReadPattern { + .direction = CacheBlockReadDirection::BACKWARD, + .ranges = + { + {.offset = 300, .size = 1}, + {.offset = 200, .size = 1}, + {.offset = 100, .size = 1}, + {.offset = 0, .size = 1}, + }, + }, + 100), + 2}; + + auto ranges = cursor.next_touch_ranges(300); + ASSERT_EQ(ranges.size(), 2); + EXPECT_EQ(ranges[0].offset, 300); + EXPECT_EQ(ranges[1].offset, 200); + + ranges = cursor.next_touch_ranges(200); + ASSERT_EQ(ranges.size(), 1); + EXPECT_EQ(ranges[0].offset, 100); + + ranges = cursor.next_touch_ranges(0); + ASSERT_EQ(ranges.size(), 1); + EXPECT_EQ(ranges[0].offset, 0); +} + +TEST_F(BlockFileCacheTest, usage_example_read_at_automatically_prefetches_single_pattern) { + // This is the intended integration pattern for segment readers: + // create one cache-aware reader per physical iterator, install one monotonic + // file-offset pattern, then let ordinary read_at() calls advance prefetch + // automatically. No outer code needs to keep a pattern id or manually + // trigger cache-block prefetch before each page read. + std::string test_cache_base_path = caches_dir / "cache_block_prefetch_usage_example" / ""; + auto cleanup = [&] { + ExecEnv::GetInstance()->_segment_prefetch_thread_pool.reset(); + if (fs::exists(test_cache_base_path)) { + fs::remove_all(test_cache_base_path); + } + FileCacheFactory::instance()->_caches.clear(); + FileCacheFactory::instance()->_path_to_cache.clear(); + FileCacheFactory::instance()->_capacity = 0; + }; + cleanup(); + Defer defer {cleanup}; + + fs::create_directories(test_cache_base_path); + FileCacheSettings settings; + settings.query_queue_size = 8_mb; + settings.query_queue_elements = 8; + settings.index_queue_size = 1_mb; + settings.index_queue_elements = 1; + settings.disposable_queue_size = 1_mb; + settings.disposable_queue_elements = 1; + settings.capacity = 10_mb; + settings.max_file_block_size = config::file_cache_each_block_size; + settings.max_query_cache_size = 0; + ASSERT_TRUE( + FileCacheFactory::instance()->create_file_cache(test_cache_base_path, settings).ok()); + auto cache = FileCacheFactory::instance()->_path_to_cache[test_cache_base_path]; + ASSERT_TRUE(wait_until([&] { return cache->get_async_open_success(); })); + + std::unique_ptr pool; + ASSERT_TRUE(ThreadPoolBuilder("CacheBlockAwarePrefetchRemoteReaderUsageTest") + .set_min_threads(2) + .set_max_threads(4) + .build(&pool) + .ok()); + ExecEnv::GetInstance()->_segment_prefetch_thread_pool = std::move(pool); + + FileReaderOptions opts; + opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE; + opts.is_doris_table = true; + opts.tablet_id = 10086; + + auto first_remote_reader = std::make_shared( + Path("/tmp/cache_block_prefetch_usage_example_first"), 5_mb); + auto first_reader = + std::make_shared(first_remote_reader, opts); + auto second_remote_reader = std::make_shared( + Path("/tmp/cache_block_prefetch_usage_example_second"), 5_mb); + auto second_reader = + std::make_shared(second_remote_reader, opts); + CacheBlockPrefetchPolicy policy { + .max_prefetch_blocks = 2, + .cache_block_size = static_cast(config::file_cache_each_block_size), + }; + + // The two readers stand for two independent column iterators. Their + // patterns are intentionally different to show that progress is isolated by + // the reader object instead of by an external read-pattern handle. + ASSERT_TRUE(first_reader + ->set_read_pattern( + CacheBlockReadPattern { + .direction = CacheBlockReadDirection::FORWARD, + .ranges = + { + {.offset = 0, .size = 1}, + {.offset = 1_mb, .size = 1}, + {.offset = 2_mb, .size = 1}, + }, + }, + policy) + .ok()); + ASSERT_TRUE(second_reader + ->set_read_pattern( + CacheBlockReadPattern { + .direction = CacheBlockReadDirection::FORWARD, + .ranges = + { + {.offset = 3_mb, .size = 1}, + {.offset = 4_mb, .size = 1}, + }, + }, + policy) + .ok()); + ASSERT_TRUE(first_reader->has_read_pattern()); + ASSERT_TRUE(second_reader->has_read_pattern()); + + char buf; + size_t bytes_read = 0; + IOContext io_ctx; + auto read_offsets = [](const std::shared_ptr& reader) { + std::set offsets; + for (const auto& range : reader->read_ranges()) { + offsets.emplace(range.offset); + } + return offsets; + }; + + // A normal read on the first reader touches the first reader's window only. + // The second reader remains idle until its own read_at() observes file + // offset 3 MiB. + ASSERT_TRUE(first_reader->read_at(0, Slice(&buf, 1), &bytes_read, &io_ctx).ok()); + EXPECT_EQ(bytes_read, 1); + ASSERT_TRUE(wait_until([&] { + auto offsets = read_offsets(first_remote_reader); + return offsets.contains(0) && offsets.contains(1_mb); + })); + EXPECT_TRUE(second_remote_reader->read_ranges().empty()); + + // Reading the second iterator advances only the second pattern. This mirrors + // segment scans where each physical column iterator owns its own + // CacheBlockAwarePrefetchRemoteReader. + ASSERT_TRUE(second_reader->read_at(3_mb, Slice(&buf, 1), &bytes_read, &io_ctx).ok()); + EXPECT_EQ(bytes_read, 1); + ASSERT_TRUE(wait_until([&] { + auto offsets = read_offsets(second_remote_reader); + return offsets.contains(3_mb) && offsets.contains(4_mb); + })); + + first_reader->clear_read_pattern(); + EXPECT_FALSE(first_reader->has_read_pattern()); + EXPECT_TRUE(second_reader->has_read_pattern()); +} + +TEST_F(BlockFileCacheTest, + cache_block_aware_prefetch_remote_reader_touches_initial_window_before_read_at) { + std::string test_cache_base_path = caches_dir / "initial_prefetch_window" / ""; + auto cleanup = [&] { + ExecEnv::GetInstance()->_segment_prefetch_thread_pool.reset(); + if (fs::exists(test_cache_base_path)) { + fs::remove_all(test_cache_base_path); + } + FileCacheFactory::instance()->_caches.clear(); + FileCacheFactory::instance()->_path_to_cache.clear(); + FileCacheFactory::instance()->_capacity = 0; + }; + cleanup(); + Defer defer {cleanup}; + + fs::create_directories(test_cache_base_path); + FileCacheSettings settings; + settings.query_queue_size = 8_mb; + settings.query_queue_elements = 8; + settings.index_queue_size = 1_mb; + settings.index_queue_elements = 1; + settings.disposable_queue_size = 1_mb; + settings.disposable_queue_elements = 1; + settings.capacity = 10_mb; + settings.max_file_block_size = config::file_cache_each_block_size; + settings.max_query_cache_size = 0; + ASSERT_TRUE( + FileCacheFactory::instance()->create_file_cache(test_cache_base_path, settings).ok()); + auto cache = FileCacheFactory::instance()->_path_to_cache[test_cache_base_path]; + ASSERT_TRUE(wait_until([&] { return cache->get_async_open_success(); })); + + std::unique_ptr pool; + ASSERT_TRUE(ThreadPoolBuilder("CacheBlockAwarePrefetchRemoteReaderInitialWindowTest") + .set_min_threads(2) + .set_max_threads(4) + .build(&pool) + .ok()); + ExecEnv::GetInstance()->_segment_prefetch_thread_pool = std::move(pool); + + FileReaderOptions opts; + opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE; + opts.is_doris_table = true; + opts.tablet_id = 10086; + + auto remote_reader = std::make_shared( + Path("/tmp/cache_block_prefetch_initial_window"), 4_mb); + auto reader = std::make_shared(remote_reader, opts); + CacheBlockPrefetchPolicy policy { + .max_prefetch_blocks = 2, + .cache_block_size = static_cast(config::file_cache_each_block_size), + }; + ASSERT_TRUE(reader->set_read_pattern( + CacheBlockReadPattern { + .direction = CacheBlockReadDirection::FORWARD, + .ranges = + { + {.offset = 0, .size = 1}, + {.offset = 1_mb, .size = 1}, + {.offset = 2_mb, .size = 1}, + }, + }, + policy) + .ok()); + + // Predicate columns in SegmentIterator use this path after their read pattern is installed: + // the first window is touched before PageIO issues the first foreground read_at(), because + // those ranges are guaranteed to be consumed by predicate evaluation. + IOContext io_ctx; + reader->async_touch_initial_window(&io_ctx); + ASSERT_TRUE(wait_until([&] { + std::set offsets; + for (const auto& range : remote_reader->read_ranges()) { + offsets.emplace(range.offset); + } + return offsets.contains(0) && offsets.contains(1_mb); + })); + + auto key = BlockFileCache::hash("cache_block_prefetch_initial_window"); + CacheContext context; + ReadStatistics stats; + context.stats = &stats; + context.cache_type = FileCacheType::NORMAL; + ASSERT_TRUE(wait_until([&] { + auto holder = cache->get_or_set(key, 0, 2_mb, context); + auto blocks = fromHolder(holder); + return blocks.size() == 2 && blocks[0]->state() == FileBlock::State::DOWNLOADED && + blocks[1]->state() == FileBlock::State::DOWNLOADED; + })); + + // The initial touch advances the same cursor used by read_at()-triggered prefetch, so calling + // it again without scan progress is idempotent and does not resubmit the already touched + // cache blocks. + const auto read_count = remote_reader->read_ranges().size(); + reader->async_touch_initial_window(&io_ctx); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_EQ(remote_reader->read_ranges().size(), read_count); +} + +TEST_F(BlockFileCacheTest, cached_remote_file_reader_async_touch_local_cache_downloads_range) { + std::string test_cache_base_path = caches_dir / "async_touch_local_cache" / ""; + auto cleanup = [&] { + ExecEnv::GetInstance()->_segment_prefetch_thread_pool.reset(); + if (fs::exists(test_cache_base_path)) { + fs::remove_all(test_cache_base_path); + } + FileCacheFactory::instance()->_caches.clear(); + FileCacheFactory::instance()->_path_to_cache.clear(); + FileCacheFactory::instance()->_capacity = 0; + }; + cleanup(); + Defer defer {cleanup}; + + fs::create_directories(test_cache_base_path); + FileCacheSettings settings; + settings.query_queue_size = 8_mb; + settings.query_queue_elements = 8; + settings.index_queue_size = 1_mb; + settings.index_queue_elements = 1; + settings.disposable_queue_size = 1_mb; + settings.disposable_queue_elements = 1; + settings.capacity = 10_mb; + settings.max_file_block_size = config::file_cache_each_block_size; + settings.max_query_cache_size = 0; + ASSERT_TRUE( + FileCacheFactory::instance()->create_file_cache(test_cache_base_path, settings).ok()); + auto cache = FileCacheFactory::instance()->_path_to_cache[test_cache_base_path]; + ASSERT_TRUE(wait_until([&] { return cache->get_async_open_success(); })); + + std::unique_ptr pool; + ASSERT_TRUE(ThreadPoolBuilder("CachedRemoteFileReaderAsyncTouchTest") + .set_min_threads(2) + .set_max_threads(4) + .build(&pool) + .ok()); + ExecEnv::GetInstance()->_segment_prefetch_thread_pool = std::move(pool); + + FileReaderOptions opts; + opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE; + opts.is_doris_table = true; + opts.tablet_id = 10086; + + auto remote_reader = std::make_shared( + Path("/tmp/cached_remote_async_touch_local_cache"), 2_mb); + auto reader = std::make_shared(remote_reader, opts); + + reader->async_touch_local_cache(0, 1_mb); + ASSERT_TRUE(wait_until([&] { return !remote_reader->read_ranges().empty(); })); + auto read_ranges = remote_reader->read_ranges(); + ASSERT_EQ(read_ranges[0].offset, 0); + + auto key = BlockFileCache::hash("cached_remote_async_touch_local_cache"); + CacheContext context; + ReadStatistics stats; + context.stats = &stats; + context.cache_type = FileCacheType::NORMAL; + ASSERT_TRUE(wait_until([&] { + auto holder = cache->get_or_set(key, 0, 1_mb, context); + auto blocks = fromHolder(holder); + return blocks.size() == 1 && blocks[0]->state() == FileBlock::State::DOWNLOADED; + })); +} + +TEST_F(BlockFileCacheTest, cache_block_aware_prefetch_remote_reader_prefetches_cache_blocks) { + std::string test_cache_base_path = caches_dir / "cache_block_aware_prefetch_remote_reader" / ""; + auto cleanup = [&] { + ExecEnv::GetInstance()->_segment_prefetch_thread_pool.reset(); + if (fs::exists(test_cache_base_path)) { + fs::remove_all(test_cache_base_path); + } + FileCacheFactory::instance()->_caches.clear(); + FileCacheFactory::instance()->_path_to_cache.clear(); + FileCacheFactory::instance()->_capacity = 0; + }; + cleanup(); + Defer defer {cleanup}; + + fs::create_directories(test_cache_base_path); + FileCacheSettings settings; + settings.query_queue_size = 8_mb; + settings.query_queue_elements = 8; + settings.index_queue_size = 1_mb; + settings.index_queue_elements = 1; + settings.disposable_queue_size = 1_mb; + settings.disposable_queue_elements = 1; + settings.capacity = 10_mb; + settings.max_file_block_size = config::file_cache_each_block_size; + settings.max_query_cache_size = 0; + ASSERT_TRUE( + FileCacheFactory::instance()->create_file_cache(test_cache_base_path, settings).ok()); + auto cache = FileCacheFactory::instance()->_path_to_cache[test_cache_base_path]; + ASSERT_TRUE(wait_until([&] { return cache->get_async_open_success(); })); + + auto raw_reader = + std::make_shared(Path("/tmp/create_cached_reader"), 1024); + FileReaderOptions cached_reader_opts; + cached_reader_opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE; + cached_reader_opts.is_doris_table = true; + cached_reader_opts.tablet_id = 10086; + + auto cached_reader = create_cached_file_reader(raw_reader, cached_reader_opts); + ASSERT_TRUE(cached_reader.has_value()); + EXPECT_NE(dynamic_cast(cached_reader.value().get()), nullptr); + EXPECT_EQ(dynamic_cast(cached_reader.value().get()), + nullptr); + + cached_reader_opts.enable_cache_block_prefetch = true; + auto prefetch_raw_reader = + std::make_shared(Path("/tmp/create_prefetch_reader"), 1024); + cached_reader = create_cached_file_reader(prefetch_raw_reader, cached_reader_opts); + ASSERT_TRUE(cached_reader.has_value()); + EXPECT_NE(dynamic_cast(cached_reader.value().get()), + nullptr); + + std::unique_ptr pool; + ASSERT_TRUE(ThreadPoolBuilder("CacheBlockAwarePrefetchRemoteReaderTest") + .set_min_threads(2) + .set_max_threads(4) + .build(&pool) + .ok()); + ExecEnv::GetInstance()->_segment_prefetch_thread_pool = std::move(pool); + + auto remote_reader = std::make_shared( + Path("/tmp/cache_block_aware_prefetch_remote_reader_file"), 4_mb); + FileReaderOptions opts; + opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE; + opts.is_doris_table = true; + opts.tablet_id = 10086; + auto reader = std::make_shared(remote_reader, opts); + + CacheBlockReadPattern pattern { + .direction = CacheBlockReadDirection::FORWARD, + .ranges = + { + {.offset = 0, .size = 1}, + {.offset = 1_mb, .size = 1}, + {.offset = 2_mb, .size = 1}, + {.offset = 3_mb, .size = 1}, + }, + }; + CacheBlockPrefetchPolicy policy { + .max_prefetch_blocks = 2, + .cache_block_size = static_cast(config::file_cache_each_block_size), + }; + ASSERT_TRUE(reader->set_read_pattern(std::move(pattern), policy).ok()); + ASSERT_TRUE(reader->has_read_pattern()); + + char buf; + size_t bytes_read = 0; + IOContext io_ctx; + ASSERT_TRUE(reader->read_at(0, Slice(&buf, 1), &bytes_read, &io_ctx).ok()); + EXPECT_EQ(bytes_read, 1); + ASSERT_TRUE(wait_until([&] { return remote_reader->read_ranges().size() >= 2; })); + auto read_ranges = remote_reader->read_ranges(); + std::set offsets; + for (const auto& range : read_ranges) { + offsets.emplace(range.offset); + } + EXPECT_TRUE(offsets.contains(0)); + EXPECT_TRUE(offsets.contains(1_mb)); + + ASSERT_TRUE(reader->read_at(1_mb, Slice(&buf, 1), &bytes_read, &io_ctx).ok()); + EXPECT_EQ(bytes_read, 1); + ASSERT_TRUE(wait_until([&] { return remote_reader->read_ranges().size() >= 3; })); + read_ranges = remote_reader->read_ranges(); + offsets.clear(); + for (const auto& range : read_ranges) { + offsets.emplace(range.offset); + } + EXPECT_TRUE(offsets.contains(2_mb)); + + const auto read_count = remote_reader->read_ranges().size(); + ASSERT_TRUE(reader->read_at(1_mb, Slice(&buf, 1), &bytes_read, &io_ctx).ok()); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_EQ(remote_reader->read_ranges().size(), read_count); + + auto key = BlockFileCache::hash("cache_block_aware_prefetch_remote_reader_file"); + CacheContext context; + ReadStatistics stats; + context.stats = &stats; + context.cache_type = FileCacheType::NORMAL; + auto holder = cache->get_or_set(key, 0, 2_mb, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 2); + EXPECT_EQ(blocks[0]->state(), FileBlock::State::DOWNLOADED); + EXPECT_EQ(blocks[1]->state(), FileBlock::State::DOWNLOADED); +} + +} // namespace doris::io