Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plottr/apps/inspectr.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,8 @@ def loadDB(self) -> None:
overview: Optional[Dict[int, Any]] = None
if self.use_fast_sql:
try:
overview = get_db_overview(self.path)
overview = get_db_overview(self.path,
extra_columns=['inspectr_tag'])
except Exception as e:
LOGGER.warning(f"Fast SQL overview failed, falling back to "
f"qcodes API: {e}")
Expand Down
226 changes: 119 additions & 107 deletions plottr/data/qcodes_db_overview.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,57 +6,34 @@
QCoDeS database schema, avoiding the expensive experiments()/data_sets()
enumeration.

**Originally intended for contribution to QCoDeS**, the implementation has
since been contributed: when a QCoDeS version that exposes
``qcodes.dataset.get_db_overview`` is installed, that implementation is used.
For older QCoDeS versions the equivalent local implementation below is used as
a fallback. The queries rely on the stable QCoDeS database schema (runs +
experiments tables) which has not changed across many QCoDeS versions.
"""
import datetime
import json
import sys
import time
import logging
from contextlib import closing
from typing import Dict, Optional, Tuple
import sqlite3
from contextlib import closing, nullcontext
from typing import Dict, Optional, Sequence, Tuple

from typing_extensions import TypedDict

from qcodes.dataset.sqlite.database import conn_from_dbpath_or_conn
from qcodes.dataset.sqlite.query_helpers import is_column_in_table
Comment thread
astafan8 marked this conversation as resolved.
Outdated

logger = logging.getLogger(__name__)


def _records_from_run_description(run_description_json: Optional[str]) -> int:
"""Extract record count from run_description shapes field.
class RunOverviewDict(TypedDict):
"""Lightweight run overview — no snapshot, no data, no full DataSet.

QCoDeS run_description may contain a ``shapes`` dict mapping dependent
parameter names to their shape tuples. The total data-point count is the
product of shape dimensions summed across all parameter trees — matching
the semantics of ``DataSet.number_of_results``.
Extra ad-hoc metadata columns requested via the ``extra_columns`` argument
of :func:`get_db_overview` are added under their column name in addition to
the keys documented here (e.g. ``inspectr_tag``).
"""
if not run_description_json:
return 0
try:
desc = json.loads(run_description_json)
shapes = desc.get('shapes')
if not shapes:
return 0
total = 0
for shape in shapes.values():
if isinstance(shape, (list, tuple)) and len(shape) > 0:
n = 1
for dim in shape:
n *= dim
# Each parameter tree contributes n_values * n_params_in_tree
# But shapes only has dependent params, and number_of_results
# counts all values including axes. For display purposes,
# the product of the shape is the most useful number.
total += n
return total
except (json.JSONDecodeError, TypeError, KeyError):
return 0


class RunOverviewDict(TypedDict):
"""Lightweight run overview — no snapshot, no data, no full DataSet."""
run_id: int
experiment: str
sample: str
Expand All @@ -67,22 +44,50 @@ class RunOverviewDict(TypedDict):
completed_time: str
records: int
guid: str
inspectr_tag: str


def _format_timestamp(ts: Optional[float]) -> Tuple[str, str]:
"""Convert a unix timestamp float to (date, time) strings."""
"""Convert a unix timestamp float to (date, time) strings in local time."""
if ts is None or ts == 0:
return '', ''
try:
t = time.localtime(ts)
return time.strftime('%Y-%m-%d', t), time.strftime('%H:%M:%S', t)
dt = datetime.datetime.fromtimestamp(ts)
except (OSError, ValueError, OverflowError):
return '', ''
return dt.strftime('%Y-%m-%d'), dt.strftime('%H:%M:%S')


def _records_from_run_description(run_description_json: Optional[str]) -> int:
"""Extract record count from run_description shapes field.

def get_db_overview(db_path: str,
QCoDeS run_description may contain a ``shapes`` dict mapping dependent
parameter names to their shape tuples. The total data-point count is the
product of shape dimensions summed across all parameter trees.
"""
if not run_description_json:
return 0
try:
desc = json.loads(run_description_json)
except (json.JSONDecodeError, TypeError):
return 0
shapes = desc.get('shapes') if isinstance(desc, dict) else None
if not shapes:
return 0
total = 0
for shape in shapes.values():
if isinstance(shape, (list, tuple)) and len(shape) > 0:
n = 1
for dim in shape:
n *= dim
total += n
return total


def get_db_overview(path_to_db: Optional[str] = None,
*,
conn: Optional[object] = None,
start_run_id: int = 0,
extra_columns: Optional[Sequence[str]] = None,
) -> Dict[int, RunOverviewDict]:
"""Get a lightweight overview of all runs in a QCoDeS database.

Expand All @@ -93,39 +98,40 @@ def get_db_overview(db_path: str,
For a database with 1500 runs, this completes in ~10ms vs 15+ minutes
with the standard QCoDeS API.

:param db_path: path to the .db file.
:param path_to_db: path to the .db file. Opened read-only if given.
:param conn: an existing connection to use instead of ``path_to_db``. It is
left open by this function.
:param start_run_id: only return runs with run_id > start_run_id.
Use 0 to get all runs. Pass the last known run_id for incremental
refresh.
:param extra_columns: names of additional ``runs``-table columns to include
in each overview dict (e.g. ad-hoc metadata columns such as
``inspectr_tag``). Columns not present in the ``runs`` table are
silently skipped.
:returns: dict mapping run_id to RunOverviewDict.
"""
overview: Dict[int, RunOverviewDict] = {}

if sys.version_info >= (3, 11):
conn = conn_from_dbpath_or_conn(conn=None, path_to_db=db_path, read_only=True)
else:
conn = conn_from_dbpath_or_conn(conn=None, path_to_db=db_path)

with closing(conn) as c:
# Check which ad-hoc metadata columns exist in the runs table.
# QCoDeS stores metadata added via ds.add_metadata() as extra columns.
try:
col_info = c.execute('PRAGMA table_info(runs)').fetchall()
col_names = {col[1] for col in col_info}
except Exception:
col_names = set()
created_conn = conn is None
connection = conn_from_dbpath_or_conn(
conn=conn, path_to_db=path_to_db, read_only=True # type: ignore[arg-type]
)
manager = closing(connection) if created_conn else nullcontext(connection)

has_inspectr_tag = 'inspectr_tag' in col_names
with manager as c:
valid_extra_columns = [
col for col in (extra_columns or [])
if is_column_in_table(c, 'runs', col)
]
extra_select = ''.join(f", r.{col}" for col in valid_extra_columns)

# Build query: include inspectr_tag column if it exists.
# Includes run_description to extract shape info for record count.
# Deliberately excludes snapshot (large blob).
tag_col = ", r.inspectr_tag" if has_inspectr_tag else ""
query = f"""
SELECT r.run_id, e.name, e.sample_name, r.name,
r.run_timestamp, r.completed_timestamp,
r.result_counter, r.guid, r.result_table_name,
r.run_description{tag_col}
r.guid, r.result_table_name,
r.run_description{extra_select}
FROM runs r
JOIN experiments e ON r.exp_id = e.exp_id
WHERE r.run_id > ?
Expand All @@ -134,67 +140,73 @@ def get_db_overview(db_path: str,

try:
rows = c.execute(query, (start_run_id,)).fetchall()
except Exception as e:
except sqlite3.Error as e:
logger.warning(f"Could not query database overview: {e}")
return overview

# Build a map of actual row counts from each results table.
# result_counter in the runs table counts INSERT calls, not data points.
# For array paramtype one INSERT can contain thousands of data points,
# so result_counter can be much smaller than the real data point count.
results_tables: set[str] = set()
for row in rows:
tbl = row[8] # result_table_name
if tbl:
results_tables.add(tbl)
row_counts: dict[str, int] = {}
for tbl in results_tables:
# result_counter in the runs table is the run's ordinal within its
# experiment, not a data-point count, so it is not usable here. For
# array paramtype one INSERT can also contain thousands of data points,
# so query the real row count of each results table separately.
result_tables = {row[7] for row in rows if row[7]}
row_counts: Dict[str, int] = {}
for table in result_tables:
try:
cnt = c.execute(
f'SELECT COUNT(*) FROM "{tbl}"'
).fetchone()
row_counts[tbl] = cnt[0] if cnt else 0
except Exception:
pass # table may not exist

tag_col_idx = 10 if has_inspectr_tag else -1
(count,) = c.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()
except sqlite3.Error:
continue # results table may not exist (yet)
row_counts[table] = count

n_fixed = 9 # number of columns selected before extra_columns
for row in rows:
run_id = row[0]
started_date, started_time = _format_timestamp(row[4])
completed_date, completed_time = _format_timestamp(row[5])
tag = row[tag_col_idx] if tag_col_idx > 0 and len(row) > tag_col_idx and row[tag_col_idx] else ''
result_table = row[8] or ''
result_table = row[7] or ''
is_completed = row[5] is not None and row[5] != 0

# Determine record count.
# For completed datasets: prefer shape metadata (authoritative
# final count) over results table rows.
# For active (incomplete) datasets: prefer results table rows
# (live count that grows as data is added).
# Fall back to result_counter if nothing else is available.
# final count) over results table rows. For active datasets: prefer
# results table rows (live count that grows as data is added).
# 0 means "unknown".
if is_completed:
records = _records_from_run_description(row[9])
records = _records_from_run_description(row[8])
if records == 0:
records = row_counts.get(result_table, 0)
else:
records = row_counts.get(result_table, 0)
if records == 0:
records = _records_from_run_description(row[9])
if records == 0:
records = row[6] or 0

overview[run_id] = RunOverviewDict(
run_id=run_id,
experiment=row[1] or '',
sample=row[2] or '',
name=row[3] or '',
started_date=started_date,
started_time=started_time,
completed_date=completed_date,
completed_time=completed_time,
records=records,
guid=row[7] or '',
inspectr_tag=tag,
)
records = _records_from_run_description(row[8])

entry: RunOverviewDict = {
'run_id': run_id,
'experiment': row[1] or '',
'sample': row[2] or '',
'name': row[3] or '',
'started_date': started_date,
'started_time': started_time,
'completed_date': completed_date,
'completed_time': completed_time,
'records': records,
'guid': row[6] or '',
}
if valid_extra_columns:
extra = {col: row[n_fixed + i]
for i, col in enumerate(valid_extra_columns)}
# The keys of ``extra`` are only known at runtime (the
# user-supplied ``extra_columns``), so they cannot be part of
# the closed ``RunOverviewDict`` definition.
entry.update(extra) # type: ignore[typeddict-item]

overview[run_id] = entry

return overview


# Optional override: if the installed QCoDeS exports ``get_db_overview`` on
# ``qcodes.dataset``, the import below rebinds the module-level name and
# replaces the local definition (hence the no-redef / F811 suppressions).
# NOTE(review): callers pass keyword arguments such as ``extra_columns``;
# confirm the upstream signature accepts the same arguments.
try:
# Prefer the upstream QCoDeS implementation when it is available; it is
# exported on ``qcodes.dataset`` from the QCoDeS version that upstreamed
# this function. Fall back to the local implementation above otherwise.
from qcodes.dataset import get_db_overview # type: ignore[no-redef] # noqa: F811
except ImportError:
# The name is not importable from qcodes.dataset: keep the local
# implementation defined earlier in this module.
pass
Comment thread
astafan8 marked this conversation as resolved.
Outdated
Loading