Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
fd6a3cb
spec: identities, collections, and deduplication
jesserobbins Apr 20, 2026
f60ab4c
feat: add deduplication engine with identity discovery
jesserobbins Apr 21, 2026
7a33020
feat: add collections and multi-source query filtering
jesserobbins Apr 21, 2026
68c6d34
test: add coverage for dedup, collections, identities
jesserobbins Apr 21, 2026
3080a0f
fix: DuckDB soft-delete filtering, collection auto-init, TOML escaping
jesserobbins Apr 22, 2026
b2aa98c
fix: zero-source scope, batchID path traversal, empty SourceIDs filter
jesserobbins Apr 22, 2026
52f1cd4
fix: list-identities empty-scope guard, DuckDB soft-delete in GetGmai…
jesserobbins Apr 22, 2026
b5c3b86
fix: dry-run is read-only, require scoped AccountSourceIDs in Engine.…
jesserobbins Apr 22, 2026
5086043
fix: skip blank entries in IdentityAddressSet
jesserobbins Apr 22, 2026
6d2801f
fix: normalizeRawMIME uses earliest header/body boundary
jesserobbins Apr 22, 2026
ef35ac0
fix: DuckDB GetGmailIDsByFilter fallback uses appendSourceFilter for …
jesserobbins Apr 22, 2026
19caff5
fix: document UndoDedup scope — restores rows, not merge side effects
jesserobbins Apr 22, 2026
29e34d0
logging: warn with resolved log file path
jesserobbins Apr 23, 2026
23f6b61
test: cover empty-scope inputs for source filter and dedup engine
jesserobbins Apr 23, 2026
8c8fa4a
list-identities: robust parsing and dialect portability note
jesserobbins Apr 23, 2026
cfdaab4
store: atomic collection membership updates
jesserobbins Apr 23, 2026
22fbf66
store: dialect-portable SQL and NullTime scanning for dedup
jesserobbins Apr 23, 2026
0dd9c53
dedup: report accuracy, manifest ID uniqueness, undo best-effort
jesserobbins Apr 23, 2026
3575629
dedup cli: VACUUM INTO backup, multi-undo, partial-success reporting
jesserobbins Apr 23, 2026
4c5ff50
store: don't skip EnsureDefaultCollection when FTS5 unavailable
jesserobbins Apr 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions cmd/msgvault/cmd/account_scope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package cmd

import (
"errors"
"fmt"

"github.com/wesm/msgvault/internal/store"
)

// AccountScope is the result of resolving a user-supplied --account
// flag against the store.
type AccountScope struct {
Input string
Source *store.Source
Collection *store.CollectionWithSources
}

// IsEmpty reports whether the scope resolved to nothing.
func (s AccountScope) IsEmpty() bool {
return s.Source == nil && s.Collection == nil
}

// IsCollection reports whether the scope refers to a collection.
func (s AccountScope) IsCollection() bool {
return s.Collection != nil
}

// SourceIDs returns the source IDs that this scope expands to.
func (s AccountScope) SourceIDs() []int64 {
switch {
case s.Collection != nil:
return append([]int64(nil), s.Collection.SourceIDs...)
case s.Source != nil:
return []int64{s.Source.ID}
}
return nil
}

// DisplayName returns a human-readable label for the scope.
func (s AccountScope) DisplayName() string {
switch {
case s.Collection != nil:
return s.Collection.Name
case s.Source != nil:
return s.Source.Identifier
}
return ""
}

// ResolveAccount resolves a user-supplied --account string against
// the store. Collections are checked first, then sources.
func ResolveAccount(
st *store.Store, input string,
) (AccountScope, error) {
scope := AccountScope{Input: input}
if input == "" {
return scope, nil
}

// Try collection first.
coll, err := st.GetCollectionByName(input)
switch {
case err == nil:
scope.Collection = coll
return scope, nil
case errors.Is(err, store.ErrCollectionNotFound):
// Fall through to source lookup.
default:
return scope, fmt.Errorf(
"look up collection %q: %w", input, err,
)
}

// Source lookup.
sources, err := st.GetSourcesByIdentifierOrDisplayName(input)
if err != nil {
return scope, fmt.Errorf(
"look up source for %q: %w", input, err,
)
}
if len(sources) == 0 {
return scope, fmt.Errorf(
"no collection or source found for %q "+
"(try 'msgvault collections list' or "+
"'msgvault list-accounts')",
input,
)
}
if len(sources) > 1 {
names := make([]string, 0, len(sources))
for _, s := range sources {
names = append(names, fmt.Sprintf(
"%s (%s, id=%d)",
s.Identifier, s.SourceType, s.ID,
))
}
return scope, fmt.Errorf(
"ambiguous account %q matches multiple sources: %v",
input, names,
)
}
scope.Source = sources[0]
return scope, nil
}
2 changes: 1 addition & 1 deletion cmd/msgvault/cmd/build_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func setupSQLiteSource(duckDB *sql.DB, dbPath string) (cleanup func(), err error
query string
typeOverrides string // DuckDB types parameter for read_csv_auto (empty = infer all)
}{
{"messages", "SELECT id, source_id, source_message_id, conversation_id, subject, snippet, sent_at, size_estimate, has_attachments, attachment_count, deleted_from_source_at, sender_id, message_type FROM messages WHERE sent_at IS NOT NULL",
{"messages", "SELECT id, source_id, source_message_id, conversation_id, subject, snippet, sent_at, size_estimate, has_attachments, attachment_count, deleted_from_source_at, sender_id, message_type FROM messages WHERE sent_at IS NOT NULL AND deleted_at IS NULL",
"types={'sent_at': 'TIMESTAMP', 'deleted_from_source_at': 'TIMESTAMP'}"},
{"message_recipients", "SELECT message_id, participant_id, recipient_type, display_name FROM message_recipients", ""},
{"message_labels", "SELECT message_id, label_id FROM message_labels", ""},
Expand Down
8 changes: 5 additions & 3 deletions cmd/msgvault/cmd/build_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func setupTestSQLite(t *testing.T) (string, func()) {
deleted_from_source_at TIMESTAMP,
sender_id INTEGER,
message_type TEXT NOT NULL DEFAULT 'email',
deleted_at DATETIME,
UNIQUE(source_id, source_message_id)
);

Expand Down Expand Up @@ -1133,7 +1134,7 @@ func TestBuildCache_EmptyDatabase(t *testing.T) {
db, _ := sql.Open("sqlite3", dbPath)
_, _ = db.Exec(`
CREATE TABLE sources (id INTEGER PRIMARY KEY, identifier TEXT);
CREATE TABLE messages (id INTEGER PRIMARY KEY, source_id INTEGER, source_message_id TEXT, sent_at TIMESTAMP, size_estimate INTEGER, has_attachments BOOLEAN, subject TEXT, snippet TEXT, conversation_id INTEGER, deleted_from_source_at TIMESTAMP, attachment_count INTEGER DEFAULT 0, sender_id INTEGER, message_type TEXT NOT NULL DEFAULT 'email');
CREATE TABLE messages (id INTEGER PRIMARY KEY, source_id INTEGER, source_message_id TEXT, sent_at TIMESTAMP, size_estimate INTEGER, has_attachments BOOLEAN, subject TEXT, snippet TEXT, conversation_id INTEGER, deleted_from_source_at TIMESTAMP, attachment_count INTEGER DEFAULT 0, sender_id INTEGER, message_type TEXT NOT NULL DEFAULT 'email', deleted_at DATETIME);
CREATE TABLE participants (id INTEGER PRIMARY KEY, email_address TEXT, domain TEXT, display_name TEXT, phone_number TEXT);
CREATE TABLE message_recipients (message_id INTEGER, participant_id INTEGER, recipient_type TEXT, display_name TEXT);
CREATE TABLE labels (id INTEGER PRIMARY KEY, name TEXT);
Expand Down Expand Up @@ -1333,7 +1334,7 @@ func BenchmarkBuildCache(b *testing.B) {
// Create schema
_, _ = db.Exec(`
CREATE TABLE sources (id INTEGER PRIMARY KEY, identifier TEXT);
CREATE TABLE messages (id INTEGER PRIMARY KEY, source_id INTEGER, source_message_id TEXT, sent_at TIMESTAMP, size_estimate INTEGER, has_attachments BOOLEAN, subject TEXT, snippet TEXT, conversation_id INTEGER, deleted_from_source_at TIMESTAMP, attachment_count INTEGER DEFAULT 0, sender_id INTEGER, message_type TEXT NOT NULL DEFAULT 'email');
CREATE TABLE messages (id INTEGER PRIMARY KEY, source_id INTEGER, source_message_id TEXT, sent_at TIMESTAMP, size_estimate INTEGER, has_attachments BOOLEAN, subject TEXT, snippet TEXT, conversation_id INTEGER, deleted_from_source_at TIMESTAMP, attachment_count INTEGER DEFAULT 0, sender_id INTEGER, message_type TEXT NOT NULL DEFAULT 'email', deleted_at DATETIME);
CREATE TABLE participants (id INTEGER PRIMARY KEY, email_address TEXT UNIQUE, domain TEXT, display_name TEXT, phone_number TEXT);
CREATE TABLE message_recipients (message_id INTEGER, participant_id INTEGER, recipient_type TEXT, display_name TEXT);
CREATE TABLE labels (id INTEGER PRIMARY KEY, name TEXT);
Expand Down Expand Up @@ -1427,6 +1428,7 @@ func setupTestSQLiteEmpty(t *testing.T) (string, func()) {
deleted_from_source_at TIMESTAMP,
sender_id INTEGER,
message_type TEXT NOT NULL DEFAULT 'email',
deleted_at DATETIME,
UNIQUE(source_id, source_message_id)
);
CREATE TABLE participants (
Expand Down Expand Up @@ -1991,7 +1993,7 @@ func BenchmarkBuildCacheIncremental(b *testing.B) {
// Create schema and initial data (10000 messages)
_, _ = db.Exec(`
CREATE TABLE sources (id INTEGER PRIMARY KEY, identifier TEXT);
CREATE TABLE messages (id INTEGER PRIMARY KEY, source_id INTEGER, source_message_id TEXT, sent_at TIMESTAMP, size_estimate INTEGER, has_attachments BOOLEAN, subject TEXT, snippet TEXT, conversation_id INTEGER, deleted_from_source_at TIMESTAMP, attachment_count INTEGER DEFAULT 0, sender_id INTEGER, message_type TEXT NOT NULL DEFAULT 'email');
CREATE TABLE messages (id INTEGER PRIMARY KEY, source_id INTEGER, source_message_id TEXT, sent_at TIMESTAMP, size_estimate INTEGER, has_attachments BOOLEAN, subject TEXT, snippet TEXT, conversation_id INTEGER, deleted_from_source_at TIMESTAMP, attachment_count INTEGER DEFAULT 0, sender_id INTEGER, message_type TEXT NOT NULL DEFAULT 'email', deleted_at DATETIME);
CREATE TABLE participants (id INTEGER PRIMARY KEY, email_address TEXT UNIQUE, domain TEXT, display_name TEXT, phone_number TEXT);
CREATE TABLE message_recipients (message_id INTEGER, participant_id INTEGER, recipient_type TEXT, display_name TEXT);
CREATE TABLE labels (id INTEGER PRIMARY KEY, name TEXT);
Expand Down
238 changes: 238 additions & 0 deletions cmd/msgvault/cmd/collections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package cmd

import (
"fmt"
"os"
"strconv"
"strings"
"text/tabwriter"

"github.com/spf13/cobra"
"github.com/wesm/msgvault/internal/store"
)

var collectionsCmd = &cobra.Command{
Use: "collections",
Short: "Manage named groups of accounts",
Long: `Collections are named groupings of accounts that let you view and
deduplicate across multiple sources as one unified archive.

A default "All" collection is created automatically and includes
every account.`,
}

var collectionsCreateCmd = &cobra.Command{
Use: "create <name> --accounts <email1,email2,...>",
Short: "Create a new collection",
Args: cobra.ExactArgs(1),
RunE: runCollectionsCreate,
}

var collectionsListCmd = &cobra.Command{
Use: "list",
Short: "List all collections",
RunE: runCollectionsList,
}

var collectionsShowCmd = &cobra.Command{
Use: "show <name>",
Short: "Show collection details",
Args: cobra.ExactArgs(1),
RunE: runCollectionsShow,
}

var collectionsAddCmd = &cobra.Command{
Use: "add <name> --accounts <email1,email2,...>",
Short: "Add accounts to a collection",
Args: cobra.ExactArgs(1),
RunE: runCollectionsAdd,
}

var collectionsRemoveCmd = &cobra.Command{
Use: "remove <name> --accounts <email1,email2,...>",
Short: "Remove accounts from a collection",
Args: cobra.ExactArgs(1),
RunE: runCollectionsRemove,
}

var collectionsDeleteCmd = &cobra.Command{
Use: "delete <name>",
Short: "Delete a collection (sources and messages are untouched)",
Args: cobra.ExactArgs(1),
RunE: runCollectionsDelete,
}

var collectionsAccounts string

func runCollectionsCreate(_ *cobra.Command, args []string) error {
st, err := openStoreAndInit()
if err != nil {
return err
}
defer func() { _ = st.Close() }()

name := args[0]
sourceIDs, err := resolveAccountList(st, collectionsAccounts)
if err != nil {
return err
}

coll, err := st.CreateCollection(name, "", sourceIDs)
if err != nil {
return err
}
fmt.Printf("Created collection %q with %d source(s).\n",
coll.Name, len(sourceIDs))
return nil
}

func runCollectionsList(_ *cobra.Command, _ []string) error {
st, err := openStoreAndInit()
if err != nil {
return err
}
defer func() { _ = st.Close() }()

collections, err := st.ListCollections()
if err != nil {
return err
}
if len(collections) == 0 {
fmt.Println("No collections.")
return nil
}

w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
_, _ = fmt.Fprintln(w, "NAME\tSOURCES\tMESSAGES")
for _, c := range collections {
_, _ = fmt.Fprintf(w, "%s\t%d\t%s\n",
c.Name, len(c.SourceIDs),
formatCount(c.MessageCount))
}
_ = w.Flush()
return nil
}

func runCollectionsShow(_ *cobra.Command, args []string) error {
st, err := openStoreAndInit()
if err != nil {
return err
}
defer func() { _ = st.Close() }()

coll, err := st.GetCollectionByName(args[0])
if err != nil {
return err
}

fmt.Printf("Collection: %s\n", coll.Name)
if coll.Description != "" {
fmt.Printf("Description: %s\n", coll.Description)
}
fmt.Printf("Sources: %d\n", len(coll.SourceIDs))
fmt.Printf("Messages: %s\n", formatCount(coll.MessageCount))
fmt.Printf("Created: %s\n", coll.CreatedAt.Format("2006-01-02 15:04"))

if len(coll.SourceIDs) > 0 {
fmt.Println("\nMember source IDs:", coll.SourceIDs)
}
return nil
}

func runCollectionsAdd(_ *cobra.Command, args []string) error {
st, err := openStoreAndInit()
if err != nil {
return err
}
defer func() { _ = st.Close() }()

sourceIDs, err := resolveAccountList(st, collectionsAccounts)
if err != nil {
return err
}

if err := st.AddSourcesToCollection(args[0], sourceIDs); err != nil {
return err
}
fmt.Printf("Added %d source(s) to %q.\n", len(sourceIDs), args[0])
return nil
}

func runCollectionsRemove(_ *cobra.Command, args []string) error {
st, err := openStoreAndInit()
if err != nil {
return err
}
defer func() { _ = st.Close() }()

sourceIDs, err := resolveAccountList(st, collectionsAccounts)
if err != nil {
return err
}

if err := st.RemoveSourcesFromCollection(args[0], sourceIDs); err != nil {
return err
}
fmt.Printf("Removed %d source(s) from %q.\n", len(sourceIDs), args[0])
return nil
}

func runCollectionsDelete(_ *cobra.Command, args []string) error {
st, err := openStoreAndInit()
if err != nil {
return err
}
defer func() { _ = st.Close() }()

if err := st.DeleteCollection(args[0]); err != nil {
return err
}
fmt.Printf("Deleted collection %q.\n", args[0])
return nil
}

func resolveAccountList(st *store.Store, accounts string) ([]int64, error) {
if accounts == "" {
return nil, fmt.Errorf("--accounts is required")
}
parts := strings.Split(accounts, ",")
var ids []int64
for _, p := range parts {
p = strings.TrimSpace(p)
if p == "" {
continue
}
// Try as numeric ID first
if id, err := strconv.ParseInt(p, 10, 64); err == nil {
ids = append(ids, id)
continue
}
// Resolve by identifier
scope, err := ResolveAccount(st, p)
if err != nil {
return nil, err
}
ids = append(ids, scope.SourceIDs()...)
}
if len(ids) == 0 {
return nil, fmt.Errorf("no valid accounts in --accounts")
}
return ids, nil
}

func init() {
rootCmd.AddCommand(collectionsCmd)
collectionsCmd.AddCommand(collectionsCreateCmd)
collectionsCmd.AddCommand(collectionsListCmd)
collectionsCmd.AddCommand(collectionsShowCmd)
collectionsCmd.AddCommand(collectionsAddCmd)
collectionsCmd.AddCommand(collectionsRemoveCmd)
collectionsCmd.AddCommand(collectionsDeleteCmd)

collectionsCreateCmd.Flags().StringVar(&collectionsAccounts,
"accounts", "", "Comma-separated account emails or source IDs")
collectionsAddCmd.Flags().StringVar(&collectionsAccounts,
"accounts", "", "Comma-separated account emails or source IDs")
collectionsRemoveCmd.Flags().StringVar(&collectionsAccounts,
"accounts", "", "Comma-separated account emails or source IDs")
}
Loading