diff --git a/test/src/unit-vfs.cc b/test/src/unit-vfs.cc
index d856713e1a7..a800b82b2fd 100644
--- a/test/src/unit-vfs.cc
+++ b/test/src/unit-vfs.cc
@@ -926,6 +926,69 @@ TEST_CASE("VFS: Test remove_dir_if_empty", "[vfs][remove-dir-if-empty]") {
   REQUIRE_NOTHROW(vfs.remove_dir(URI(path)));
 }
 
+// Basic smoke test for persistent file handle caching in Posix::write().
+// The fd is kept open across consecutive writes and only closed on flush().
+TEST_CASE(
+    "VFS: Persistent file handles for local writes",
+    "[vfs][persistent-handles]") {
+  ThreadPool compute_tp(4);
+  ThreadPool io_tp(4);
+  VFS vfs{
+      &g_helper_stats, g_helper_logger().get(), &compute_tp, &io_tp, Config{}};
+  std::string base = local_path();
+
+  SECTION("Multiple writes accumulate before flush") {
+    URI testfile(base + "multi_write_file");
+
+    std::string part1 = "Hello, ";
+    std::string part2 = "persistent ";
+    std::string part3 = "handles!";
+    REQUIRE_NOTHROW(vfs.write(testfile, part1.data(), part1.size()));
+    REQUIRE_NOTHROW(vfs.write(testfile, part2.data(), part2.size()));
+    REQUIRE_NOTHROW(vfs.write(testfile, part3.data(), part3.size()));
+
+    require_tiledb_ok(vfs.close_file(testfile));
+
+    std::string expected = "Hello, persistent handles!";
+    CHECK(vfs.file_size(testfile) == expected.size());
+
+    std::vector<char> buf(expected.size());
+    require_tiledb_ok(vfs.read_exactly(testfile, 0, buf.data(), buf.size()));
+    CHECK(std::string(buf.data(), buf.size()) == expected);
+
+    REQUIRE_NOTHROW(vfs.remove_file(testfile));
+  }
+
+  SECTION("Append across separate open cycles") {
+    URI testfile(base + "reopen_append_file");
+
+    // First write cycle.
+    std::string data1 = "first";
+    REQUIRE_NOTHROW(vfs.write(testfile, data1.data(), data1.size()));
+    require_tiledb_ok(vfs.close_file(testfile));
+    CHECK(vfs.file_size(testfile) == data1.size());
+
+    // Second write cycle -- file already exists on disk.
+    std::string data2 = "second";
+    REQUIRE_NOTHROW(vfs.write(testfile, data2.data(), data2.size()));
+    require_tiledb_ok(vfs.close_file(testfile));
+
+    std::string expected = "firstsecond";
+    CHECK(vfs.file_size(testfile) == expected.size());
+
+    std::vector<char> buf(expected.size());
+    require_tiledb_ok(vfs.read_exactly(testfile, 0, buf.data(), buf.size()));
+    CHECK(std::string(buf.data(), buf.size()) == expected);
+
+    REQUIRE_NOTHROW(vfs.remove_file(testfile));
+  }
+
+  // Clean up the test directory.
+  if (vfs.is_dir(URI(base))) {
+    REQUIRE_NOTHROW(vfs.remove_dir(URI(base)));
+  }
+}
+
 #ifdef HAVE_AZURE
 TEST_CASE("VFS: Construct Azure Blob Storage endpoint URIs", "[azure][uri]") {
   // Test the construction of Azure Blob Storage URIs from account name and SAS
diff --git a/tiledb/sm/filesystem/posix.cc b/tiledb/sm/filesystem/posix.cc
index f2bba81bc7d..40a6beb8b9a 100644
--- a/tiledb/sm/filesystem/posix.cc
+++ b/tiledb/sm/filesystem/posix.cc
@@ -123,6 +123,14 @@ class PosixDIR {
   optional dir_;
 };
 
+Posix::~Posix() {
+  std::lock_guard lock(open_files_mtx_);
+  for (auto& [path, of] : open_files_) {
+    ::close(of.fd);
+  }
+  open_files_.clear();
+}
+
 Posix::Posix(const Config& config) {
   // Initialize member variables with posix config parameters.
 
@@ -199,8 +207,30 @@ bool Posix::is_file(const URI& uri) const {
   return (stat(uri.to_path().c_str(), &st) == 0) && !S_ISDIR(st.st_mode);
 }
 
+void Posix::evict_cached_fds(const std::string& path_prefix) const {
+  // Build the directory form of the prefix once. The prefix may already end
+  // in '/', in which case appending another one would never match a cached
+  // child path.
+  std::string dir_prefix = path_prefix;
+  if (dir_prefix.empty() || dir_prefix.back() != '/') {
+    dir_prefix += '/';
+  }
+
+  std::lock_guard lock(open_files_mtx_);
+  for (auto it = open_files_.begin(); it != open_files_.end();) {
+    if (it->first == path_prefix ||
+        it->first.compare(0, dir_prefix.size(), dir_prefix) == 0) {
+      ::close(it->second.fd);
+      it = open_files_.erase(it);
+    } else {
+      ++it;
+    }
+  }
+}
+
 void Posix::remove_dir(const URI& uri) const {
   auto path = uri.to_path();
+  evict_cached_fds(path);
   int rc = nftw(path.c_str(), unlink_cb, 64, FTW_DEPTH | FTW_PHYS);
   if (rc) {
     throw IOError(
@@ -223,6 +253,7 @@ bool Posix::remove_dir_if_empty(const std::string& path) const {
 
 void Posix::remove_file(const URI& uri) const {
   auto path = uri.to_path();
+  evict_cached_fds(path);
   if (remove(path.c_str()) != 0) {
     throw IOError(
         std::string("Cannot delete file '") + path + "'; " + strerror(errno));
@@ -245,6 +276,7 @@ uint64_t Posix::file_size(const URI& uri) const {
 }
 
 void Posix::move_file(const URI& old_path, const URI& new_path) const {
+  evict_cached_fds(old_path.to_path());
   auto new_uri_path = new_path.to_path();
   throw_if_not_ok(ensure_directory(new_uri_path));
   if (rename(old_path.to_path().c_str(), new_path.to_path().c_str()) != 0) {
@@ -296,7 +328,34 @@ uint64_t Posix::read(
 }
 
 void Posix::flush(const URI& uri, bool) {
-  sync(uri);
+  auto path = uri.to_path();
+  int fd = -1;
+
+  {
+    std::lock_guard lock(open_files_mtx_);
+    auto it = open_files_.find(path);
+    if (it != open_files_.end()) {
+      fd = it->second.fd;
+      open_files_.erase(it);
+    }
+  }
+
+  if (fd != -1) {
+    // fsync and close the cached file descriptor.
+    if (::fsync(fd) != 0) {
+      auto err = errno;
+      ::close(fd);
+      throw IOError(
+          std::string("Cannot sync file '") + path + "'; " + strerror(err));
+    }
+    if (::close(fd) != 0) {
+      throw IOError(
+          std::string("Cannot close file '") + path + "'; " + strerror(errno));
+    }
+  } else {
+    // No cached fd (e.g. directory sync or file not written through us).
+    sync(uri);
+  }
 }
 
 void Posix::sync(const URI& uri) const {
@@ -350,32 +409,48 @@ void Posix::write(
     }
   }
 
-  // Get file offset (equal to file size)
-  Status st;
-  uint64_t file_offset = 0;
-  if (is_file(URI(path))) {
-    file_offset = file_size(URI(path));
-  } else {
-    throw_if_not_ok(ensure_directory(path));
-  }
+  int fd;
+  uint64_t file_offset;
 
-  // Open or create file.
-  int fd = open(path.c_str(), O_WRONLY | O_CREAT, file_permissions_);
-  if (fd == -1) {
-    throw IOError(
-        std::string("Cannot open file '") + path + "'; " + strerror(errno));
+  {
+    std::lock_guard lock(open_files_mtx_);
+    auto it = open_files_.find(path);
+    if (it != open_files_.end()) {
+      // Reuse the cached file descriptor.
+      fd = it->second.fd;
+      file_offset = it->second.offset;
+    } else {
+      // First write to this path: open fd and determine current size.
+      if (is_file(URI(path))) {
+        file_offset = file_size(URI(path));
+      } else {
+        throw_if_not_ok(ensure_directory(path));
+        file_offset = 0;
+      }
+      fd = ::open(path.c_str(), O_WRONLY | O_CREAT, file_permissions_);
+      if (fd == -1) {
+        throw IOError(
+            std::string("Cannot open file '") + path + "'; " + strerror(errno));
+      }
+      open_files_.emplace(path, OpenFile{fd, file_offset});
+    }
   }
 
-  st = write_at(fd, file_offset, buffer, buffer_size);
+  auto st = write_at(fd, file_offset, buffer, buffer_size);
   if (!st.ok()) {
-    close(fd);
-    std::stringstream errmsg;
-    errmsg << "Cannot write to file '" << path << "'; " << st.message();
-    throw IOError(errmsg.str());
-  }
-  if (close(fd) != 0) {
     throw IOError(
-        std::string("Cannot close file '") + path + "'; " + strerror(errno));
+        std::string("Cannot write to file '") + path + "'; " + st.message());
+  }
+
+  {
+    std::lock_guard lock(open_files_mtx_);
+    // Look the entry up again instead of using operator[]: if the fd was
+    // flushed or evicted while we were writing, operator[] would insert a
+    // bogus entry that a later flush() would fsync and close.
+    auto it = open_files_.find(path);
+    if (it != open_files_.end() && it->second.fd == fd) {
+      it->second.offset = file_offset + buffer_size;
+    }
   }
 }
 
diff --git a/tiledb/sm/filesystem/posix.h b/tiledb/sm/filesystem/posix.h
index fdcdcd1451a..a0224fa64ea 100644
--- a/tiledb/sm/filesystem/posix.h
+++ b/tiledb/sm/filesystem/posix.h
@@ -41,7 +41,9 @@
 #include
 #include
+#include <mutex>
 #include
+#include <unordered_map>
 
 #include "tiledb/common/status.h"
@@ -79,8 +81,8 @@ class Posix : public LocalFilesystem {
   /** Constructor. */
   explicit Posix(const Config& config);
 
-  /** Destructor. */
-  ~Posix() override = default;
+  /** Destructor. Closes any cached file descriptors. */
+  ~Posix() override;
 
   /* ********************************* */
   /*                API                */
@@ -299,6 +301,25 @@ class Posix : public LocalFilesystem {
 
  private:
   uint32_t file_permissions_, directory_permissions_;
+
+  /**
+   * Closes and removes cached fds whose path equals or is under the given
+   * prefix. Called from const removal/move methods to keep the cache
+   * consistent.
+   */
+  void evict_cached_fds(const std::string& path_prefix) const;
+
+  /** State for a file descriptor kept open between write() and flush(). */
+  struct OpenFile {
+    int fd = -1;
+    uint64_t offset = 0;
+  };
+
+  /** Maps path -> cached file descriptor and current write offset. */
+  mutable std::unordered_map<std::string, OpenFile> open_files_;
+
+  /** Protects open_files_. */
+  mutable std::mutex open_files_mtx_;
 };
 
 }  // namespace sm
diff --git a/tiledb/sm/filesystem/win.cc b/tiledb/sm/filesystem/win.cc
index 61123cdff43..f46a05deb13 100644
--- a/tiledb/sm/filesystem/win.cc
+++ b/tiledb/sm/filesystem/win.cc
@@ -66,6 +66,14 @@ using tiledb::common::filesystem::directory_entry;
 
 namespace tiledb::sm {
 
+Win::~Win() {
+  std::lock_guard lock(open_files_mtx_);
+  for (auto& [path, of] : open_files_) {
+    CloseHandle(of.handle);
+  }
+  open_files_.clear();
+}
+
 namespace {
 /** Returns the last Windows error message string. */
 std::string get_last_error_msg_desc(decltype(GetLastError()) gle) {
@@ -279,9 +287,31 @@ Status Win::recursively_remove_directory(const std::string& path) const {
       get_last_error_msg(gle, offender.c_str()))));
 }
 
+void Win::evict_cached_handles(const std::string& path_prefix) const {
+  // Build the directory form of the prefix once. The prefix may already end
+  // in '\', in which case appending another one would never match a cached
+  // child path.
+  std::string dir_prefix = path_prefix;
+  if (dir_prefix.empty() || dir_prefix.back() != '\\') {
+    dir_prefix += '\\';
+  }
+
+  std::lock_guard lock(open_files_mtx_);
+  for (auto it = open_files_.begin(); it != open_files_.end();) {
+    if (it->first == path_prefix ||
+        it->first.compare(0, dir_prefix.size(), dir_prefix) == 0) {
+      CloseHandle(it->second.handle);
+      it = open_files_.erase(it);
+    } else {
+      ++it;
+    }
+  }
+}
+
 void Win::remove_dir(const URI& uri) const {
   auto path = uri.to_path();
   if (is_dir(uri)) {
+    evict_cached_handles(path);
     throw_if_not_ok(recursively_remove_directory(path));
   } else {
     throw WindowsException(
@@ -304,6 +334,7 @@ bool Win::remove_dir_if_empty(const std::string& path) const {
 
 void Win::remove_file(const URI& uri) const {
   auto path = uri.to_path();
+  evict_cached_handles(path);
   if (!DeleteFile(path.c_str())) {
     throw WindowsException(std::string(
         "Failed to delete file '" + path + "' " +
@@ -418,6 +449,7 @@ std::vector Win::ls_with_sizes(
 
 void Win::move_path(const URI& old_uri, const URI& new_uri) const {
   auto old_path = old_uri.to_path();
+  evict_cached_handles(old_path);
   auto new_path = new_uri.to_path();
   if (MoveFileEx(
           old_path.c_str(), new_path.c_str(), MOVEFILE_REPLACE_EXISTING) == 0) {
@@ -503,7 +535,36 @@ uint64_t Win::read(
 }
 
 void Win::flush(const URI& uri, bool) {
-  sync(uri);
+  auto path = uri.to_path();
+  HANDLE file_h = INVALID_HANDLE_VALUE;
+
+  {
+    std::lock_guard lock(open_files_mtx_);
+    auto it = open_files_.find(path);
+    if (it != open_files_.end()) {
+      file_h = it->second.handle;
+      open_files_.erase(it);
+    }
+  }
+
+  if (file_h != INVALID_HANDLE_VALUE) {
+    // Flush and close the cached HANDLE.
+    if (FlushFileBuffers(file_h) == 0) {
+      auto gle = GetLastError();
+      CloseHandle(file_h);
+      throw WindowsException(
+          "Cannot sync file '" + path + "'; " +
+          get_last_error_msg(gle, "FlushFileBuffers"));
+    }
+    if (CloseHandle(file_h) == 0) {
+      throw WindowsException(
+          "Cannot close file '" + path + "'; " +
+          get_last_error_msg("CloseHandle"));
+    }
+  } else {
+    // No cached handle (e.g. directory sync or file not written through us).
+    sync(uri);
+  }
 }
 
 void Win::sync(const URI& uri) const {
@@ -546,39 +607,62 @@ void Win::sync(const URI& uri) const {
 void Win::write(
     const URI& uri, const void* buffer, uint64_t buffer_size, bool) {
   auto path = uri.to_path();
-  throw_if_not_ok(ensure_directory(path));
-  // Open the file for appending, creating it if it doesn't exist.
-  HANDLE file_h = CreateFile(
-      path.c_str(),
-      GENERIC_WRITE,
-      0,
-      NULL,
-      OPEN_ALWAYS,
-      FILE_ATTRIBUTE_NORMAL,
-      NULL);
-  if (file_h == INVALID_HANDLE_VALUE) {
-    throw WindowsException(
-        "Cannot write to file '" + path + "'; File opening error " +
-        get_last_error_msg("CreateFile"));
+
+  HANDLE file_h;
+  uint64_t file_offset;
+
+  {
+    std::lock_guard lock(open_files_mtx_);
+    auto it = open_files_.find(path);
+    if (it != open_files_.end()) {
+      // Reuse the cached HANDLE.
+      file_h = it->second.handle;
+      file_offset = it->second.offset;
+    } else {
+      // First write to this path: open HANDLE and cache it.
+      throw_if_not_ok(ensure_directory(path));
+      file_h = CreateFile(
+          path.c_str(),
+          GENERIC_WRITE,
+          FILE_SHARE_READ | FILE_SHARE_DELETE,
+          NULL,
+          OPEN_ALWAYS,
+          FILE_ATTRIBUTE_NORMAL,
+          NULL);
+      if (file_h == INVALID_HANDLE_VALUE) {
+        throw WindowsException(
+            "Cannot write to file '" + path + "'; File opening error " +
+            get_last_error_msg("CreateFile"));
+      }
+      // Determine current file size for append semantics.
+      LARGE_INTEGER file_size_lg_int;
+      if (!GetFileSizeEx(file_h, &file_size_lg_int)) {
+        auto gle = GetLastError();
+        CloseHandle(file_h);
+        throw WindowsException(
+            "Cannot write to file '" + path + "'; File size error " +
+            get_last_error_msg(gle, "GetFileSizeEx"));
+      }
+      file_offset = file_size_lg_int.QuadPart;
+      open_files_.emplace(path, OpenFile{file_h, file_offset});
+    }
   }
-  // Get the current file size.
-  LARGE_INTEGER file_size_lg_int;
-  if (!GetFileSizeEx(file_h, &file_size_lg_int)) {
-    auto gle = GetLastError();
-    CloseHandle(file_h);
+
+  auto st = write_at(file_h, file_offset, buffer, buffer_size);
+  if (!st.ok()) {
     throw WindowsException(
-        "Cannot write to file '" + path + "'; File size error " +
-        get_last_error_msg(gle, "GetFileSizeEx"));
+        "Cannot write to file '" + path + "'; " + st.message());
   }
-  uint64_t file_offset = file_size_lg_int.QuadPart;
-  if (!write_at(file_h, file_offset, buffer, buffer_size).ok()) {
-    CloseHandle(file_h);
-    throw WindowsException("Cannot write to file '" + path);
-  }
-  // Always close the handle.
-  if (CloseHandle(file_h) == 0) {
-    throw WindowsException(
-        "Cannot write to file '" + path + "'; File closing error");
+
+  {
+    std::lock_guard lock(open_files_mtx_);
+    // Look the entry up again instead of using operator[]: if the HANDLE was
+    // flushed or evicted while we were writing, operator[] would insert a
+    // bogus entry that a later flush() would try to flush and close.
+    auto it = open_files_.find(path);
+    if (it != open_files_.end() && it->second.handle == file_h) {
+      it->second.offset = file_offset + buffer_size;
+    }
   }
 }
 
diff --git a/tiledb/sm/filesystem/win.h b/tiledb/sm/filesystem/win.h
index 5c2f186b9cf..90363cd54ee 100644
--- a/tiledb/sm/filesystem/win.h
+++ b/tiledb/sm/filesystem/win.h
@@ -36,7 +36,9 @@
 #ifdef _WIN32
 
 #include
+#include <mutex>
 #include
+#include <unordered_map>
 
 #include "tiledb/common/status.h"
@@ -87,6 +89,9 @@ class Win : public LocalFilesystem {
   Win(const Config&) {
   }
 
+  /** Destructor. Cleans up any HANDLEs still in the cache. */
+  ~Win() override;
+
   /* ********************************* */
   /*                API                */
   /* ********************************* */
@@ -280,6 +285,22 @@ class Win : public LocalFilesystem {
       uint64_t file_offset,
       const void* buffer,
       uint64_t buffer_size);
+
+  /**
+   * Closes and removes cached HANDLEs whose path equals or is under the given
+   * prefix. Called from const removal/move methods to keep the cache
+   * consistent.
+   */
+  void evict_cached_handles(const std::string& path_prefix) const;
+
+  /** Cached HANDLE + write offset for a file that's still being written to. */
+  struct OpenFile {
+    HANDLE handle = INVALID_HANDLE_VALUE;
+    uint64_t offset = 0;
+  };
+
+  mutable std::unordered_map<std::string, OpenFile> open_files_;
+  mutable std::mutex open_files_mtx_;
 };
 
 }  // namespace tiledb::sm