Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions graphiti_core/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,18 @@ async def wrapper(self, *args, **kwargs):
if group_ids is None and group_ids_pos is not None and len(args) > group_ids_pos:
group_ids = args[group_ids_pos]

# Only handle FalkorDB with multiple group_ids
# FalkorDB stores each group_id in its own graph, so reads must clone the
# driver onto the group's graph. This applies to a single group_id too: the
# base driver points at default_db, so an unrouted single-group read finds
# nothing (writes mutate self.driver and masked this in same-process tests).
# Blank/None entries mean "unspecified" (mirrors search()'s `!= ['']` rule),
# so filter them out rather than routing onto an empty graph name.
routable_group_ids = [gid for gid in group_ids if gid] if group_ids else []
if (
hasattr(self, 'clients')
and hasattr(self.clients, 'driver')
and self.clients.driver.provider == GraphProvider.FALKORDB
and group_ids
and len(group_ids) > 1
and routable_group_ids
):
# Execute for each group_id concurrently
driver = self.clients.driver
Expand All @@ -68,7 +73,7 @@ async def execute_for_group(gid: str):
)

results = await semaphore_gather(
*[execute_for_group(gid) for gid in group_ids],
*[execute_for_group(gid) for gid in routable_group_ids],
max_coroutines=getattr(self, 'max_coroutines', None),
)

Expand Down
27 changes: 17 additions & 10 deletions graphiti_core/driver/falkordb_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,17 +322,24 @@ async def build_indices_and_constraints(self, delete_existing=False):

def clone(self, database: str) -> 'GraphDriver':
"""
Returns a shallow copy of this driver with a different default database.
Reuses the same connection (e.g. FalkorDB, Neo4j).
Returns a driver bound to ``database``, reusing this driver's connection.

Constructed drivers are memoized per database. FalkorDriver.__init__
schedules build_indices_and_constraints() on every construction, which is
load-bearing exactly once per group (a new group's graph needs its indices
built) but pure churn on every subsequent read. Caching keeps the
once-per-group index build while removing the per-call constructor cost
that single-group reads would otherwise incur on every query.
"""
if database == self._database:
cloned = self
elif database == self.default_group_id:
cloned = FalkorDriver(falkor_db=self.client)
else:
# Create a new instance of FalkorDriver with the same connection but a different database
cloned = FalkorDriver(falkor_db=self.client, database=database)

target = 'default_db' if database == self.default_group_id else database
if target == self._database:
return self

cache = self.__dict__.setdefault('_clone_cache', {})
cloned = cache.get(target)
if cloned is None:
cloned = FalkorDriver(falkor_db=self.client, database=target)
cache[target] = cloned
return cloned

async def health_check(self) -> None:
Expand Down
Loading