From a881246801e052b51f6425d4270e1441bd3c8448 Mon Sep 17 00:00:00 2001 From: yim_rei <65397527+yimtheppariyapol@users.noreply.github.com> Date: Wed, 10 Jun 2026 19:49:36 +0700 Subject: [PATCH] fix(falkordb): route single-group reads onto the group's graph MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FalkorDB stores each group_id in its own graph, so a read must clone the driver onto that graph. `handle_multiple_group_ids` only did this when `len(group_ids) > 1`, so a single-group read fell through to the base driver (pinned at default_db) and returned nothing — even though the data was physically present. The write path mutates `self.driver` onto the group graph, which masked the bug in same-process write-then-read tests. Replace the `len > 1` gate with a blank-filtered `routable_group_ids` (mirrors search()'s `!= ['']` convention), so a single real group routes and blank entries (`''`/`None`) are never routed onto an empty graph name — a lone `['']`/`[None]` still falls through to the base driver, and blanks inside a longer list are now skipped. Also memoize `FalkorDriver.clone()` per database. `__init__` schedules build_indices_and_constraints() on every construction; that is load-bearing exactly once per group but pure churn on every subsequent read now that single-group reads clone. Caching keeps the once-per-group index build while removing the per-call constructor cost. Follow-up (not in this PR): the write-path `self.driver` mutation in graphiti.py could be made non-mutating so reads and writes share one routing mechanism. 
--- graphiti_core/decorators.py | 13 ++++++++---- graphiti_core/driver/falkordb_driver.py | 27 ++++++++++++++++--------- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/graphiti_core/decorators.py b/graphiti_core/decorators.py index 9a4fd903fc..389e644ccd 100644 --- a/graphiti_core/decorators.py +++ b/graphiti_core/decorators.py @@ -44,13 +44,18 @@ async def wrapper(self, *args, **kwargs): if group_ids is None and group_ids_pos is not None and len(args) > group_ids_pos: group_ids = args[group_ids_pos] - # Only handle FalkorDB with multiple group_ids + # FalkorDB stores each group_id in its own graph, so reads must clone the + # driver onto the group's graph. This applies to a single group_id too: the + # base driver points at default_db, so an unrouted single-group read finds + # nothing (writes mutate self.driver and masked this in same-process tests). + # Blank/None entries mean "unspecified" (mirrors search()'s `!= ['']` rule), + # so filter them out rather than routing onto an empty graph name. 
+ routable_group_ids = [gid for gid in group_ids if gid] if group_ids else [] if ( hasattr(self, 'clients') and hasattr(self.clients, 'driver') and self.clients.driver.provider == GraphProvider.FALKORDB - and group_ids - and len(group_ids) > 1 + and routable_group_ids ): # Execute for each group_id concurrently driver = self.clients.driver @@ -68,7 +73,7 @@ async def execute_for_group(gid: str): ) results = await semaphore_gather( - *[execute_for_group(gid) for gid in group_ids], + *[execute_for_group(gid) for gid in routable_group_ids], max_coroutines=getattr(self, 'max_coroutines', None), ) diff --git a/graphiti_core/driver/falkordb_driver.py b/graphiti_core/driver/falkordb_driver.py index 2268b3dd43..96c0b2a11a 100644 --- a/graphiti_core/driver/falkordb_driver.py +++ b/graphiti_core/driver/falkordb_driver.py @@ -322,17 +322,24 @@ async def build_indices_and_constraints(self, delete_existing=False): def clone(self, database: str) -> 'GraphDriver': """ - Returns a shallow copy of this driver with a different default database. - Reuses the same connection (e.g. FalkorDB, Neo4j). + Returns a driver bound to ``database``, reusing this driver's connection. + + Constructed drivers are memoized per database. FalkorDriver.__init__ + schedules build_indices_and_constraints() on every construction, which is + load-bearing exactly once per group (a new group's graph needs its indices + built) but pure churn on every subsequent read. Caching keeps the + once-per-group index build while removing the per-call constructor cost + that single-group reads would otherwise incur on every query. 
""" - if database == self._database: - cloned = self - elif database == self.default_group_id: - cloned = FalkorDriver(falkor_db=self.client) - else: - # Create a new instance of FalkorDriver with the same connection but a different database - cloned = FalkorDriver(falkor_db=self.client, database=database) - + target = 'default_db' if database == self.default_group_id else database + if target == self._database: + return self + + cache = self.__dict__.setdefault('_clone_cache', {}) + cloned = cache.get(target) + if cloned is None: + cloned = FalkorDriver(falkor_db=self.client, database=target) + cache[target] = cloned return cloned async def health_check(self) -> None: