Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 87 additions & 11 deletions skills/slack-channel-monitor/scripts/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,15 @@ def _state_file_path() -> str:
def load_state(path: str) -> dict:
if os.path.exists(path):
with open(path) as f:
return json.load(f)
state = json.load(f)
print(
f"[state] loaded {path}: "
f"{len(state.get('bot_message_ts', []))} bot_message_ts, "
f"{len(state.get('conversations', {}))} conversations, "
f"{len(state.get('last_poll', {}))} last_poll entries"
)
return state
print(f"[state] no existing state at {path} — starting fresh")
return {
"version": 1,
"bot_user_id": None,
Expand All @@ -143,6 +151,11 @@ def load_state(path: str) -> dict:


def save_state(path: str, state: dict) -> None:
print(
f"[state] saving {path}: "
f"{len(state.get('bot_message_ts', []))} bot_message_ts, "
f"{len(state.get('conversations', {}))} conversations"
)
with open(path, "w") as f:
json.dump(state, f, indent=2)

Expand Down Expand Up @@ -217,7 +230,16 @@ def post_message(token: str, channel: str, text: str, thread_ts: str | None = No
body: dict = {"channel": channel, "text": text}
if thread_ts:
body["thread_ts"] = thread_ts
return slack_post(token, "chat.postMessage", body).get("ts", "")
preview = text.replace("\n", " ")[:80]
thread_info = f"thread_ts={thread_ts}" if thread_ts else "top-level"
print(f"[slack-post] → channel={channel} {thread_info} len={len(text)} preview={preview!r}")
try:
ts = slack_post(token, "chat.postMessage", body).get("ts", "")
except RuntimeError as exc:
print(f"[slack-post] ✗ chat.postMessage failed: {exc}")
raise
print(f"[slack-post] ✓ posted ts={ts}")
return ts


def channel_history(token: str, channel: str, oldest: str, limit: int = 100) -> list[dict]:
Expand Down Expand Up @@ -364,14 +386,23 @@ def conversation_final_response(agent_url: str, api_key: str, conv_id: str) -> s
# ── Message filtering ──────────────────────────────────────────────────────────

def _is_human_message(msg: dict, bot_user_id: str, bot_message_ts: list[str]) -> bool:
"""Return True if the message was posted by a human and not by this bot."""
"""Return True if the message was posted by a human and not by this bot.

Logs a `[dedupe]` line whenever a message is skipped so it's easy to trace
why an apparently-matching message did not trigger a conversation.
"""
msg_ts = msg.get("ts", "?")
if msg.get("bot_id"):
print(f"[dedupe] skip ts={msg_ts} reason=bot_id={msg.get('bot_id')}")
return False
if msg.get("subtype"):
print(f"[dedupe] skip ts={msg_ts} reason=subtype={msg.get('subtype')}")
return False
if msg.get("user") == bot_user_id:
print(f"[dedupe] skip ts={msg_ts} reason=self_user={bot_user_id}")
return False
if msg.get("ts") in bot_message_ts:
print(f"[dedupe] skip ts={msg_ts} reason=previously_posted_by_bot")
return False
return True

Expand Down Expand Up @@ -552,6 +583,10 @@ def _process_trigger_message(
ts_back = post_message(slack_token, channel_id, link_text, thread_ts=thread_root)
if ts_back:
bot_message_ts.append(ts_back)
print(
f"[dedupe] recorded bot reply ts={ts_back} "
f"(total tracked: {len(bot_message_ts)})"
)

print(f" Created conversation {conv_id} ({conv_url})")
except Exception as exc:
Expand All @@ -568,41 +603,57 @@ def _check_conversation_completion(
) -> None:
"""Post the agent's final response to the Slack thread when the conversation finishes."""
last_activity: float = rec.get("last_activity", 0.0)
if (time.time() - last_activity) < DONE_DEBOUNCE:
idle_for = time.time() - last_activity
if idle_for < DONE_DEBOUNCE:
print(
f"[reply-back] {conv_key} idle for {idle_for:.1f}s "
f"(< {DONE_DEBOUNCE}s debounce) — skipping status check"
)
return

conv_id = rec["conversation_id"]
channel_id = rec["channel_id"]
thread_ts = rec["thread_ts"]

print(f"[reply-back] {conv_key} idle for {idle_for:.1f}s — fetching status of conv={conv_id}")
try:
status = conversation_status(agent_url, api_key, conv_id)
except Exception as exc:
print(f" Warning: could not get status for {conv_id}: {exc}")
print(f"[reply-back] ✗ status fetch failed for conv={conv_id}: {exc}")
return

print(f" {conv_key} status={status}")
print(f"[reply-back] {conv_key} status={status}")

if status in ("idle", "finished", "error", "stuck"):
try:
final = conversation_final_response(agent_url, api_key, conv_id)
except Exception:
print(f"[reply-back] retrieved final response len={len(final)} chars")
except Exception as exc:
print(f"[reply-back] ✗ could not fetch final response: {exc}")
final = ""

if status in ("error", "stuck"):
summary = (
f"⚠️ The agent encountered a problem (status: *{status}*)."
+ (f"\n\n{final}" if final else "")
)
print(f"[reply-back] posting ERROR summary to {channel_id} thread={thread_ts}")
else:
summary = f"✅ Done!\n\n{final}" if final else "✅ Task complete (no summary available)."
print(f"[reply-back] posting SUCCESS summary to {channel_id} thread={thread_ts}")

ts_back = post_message(slack_token, channel_id, summary, thread_ts=thread_ts)
if ts_back:
bot_message_ts.append(ts_back)
print(
f"[dedupe] recorded bot reply ts={ts_back} "
f"(total tracked: {len(bot_message_ts)})"
)

rec["status"] = "closed"
print(f" Posted summary for {conv_key}")
print(f"[reply-back] ✓ {conv_key} marked closed")
else:
print(f"[reply-back] {conv_key} still in-progress (status={status}) — will retry next poll")


# ── Main ───────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -650,10 +701,17 @@ def main() -> None:
all_incoming = _poll_new_messages(
slack_token, use_search, oldest_by_channel, global_oldest, active_convs
)
print(
f"[poll] fetched {len(all_incoming)} candidate message(s); "
f"currently tracking {len(active_convs)} conversation(s) "
f"and {len(bot_message_ts)} bot-posted ts(s) for dedupe"
)

for cid in CHANNEL_IDS:
state["last_poll"][cid] = now_ts

triggers_fired = 0
replies_forwarded = 0
for channel_id, msg in all_incoming:
if not _is_human_message(msg, bot_user_id, bot_message_ts):
continue
Expand All @@ -675,43 +733,61 @@ def main() -> None:
and active_convs[conv_key].get("status") != "closed"
)

print(
f"[match] ts={msg_ts} channel={channel_id} conv_key={conv_key} "
f"has_trigger={has_trigger} is_reply_in_tracked={is_reply_in_tracked}"
)

# ── Case A: reply in a thread that has an active conversation ──────────
if is_reply_in_tracked:
rec = active_convs[conv_key]
print(f" Forwarding reply {msg_ts} → conversation {rec['conversation_id']}")
print(f"[forward] reply ts={msg_ts} → conversation {rec['conversation_id']}")
try:
send_to_conversation(agent_url, api_key, rec["conversation_id"],
f"User replied in Slack thread: {text}")
rec["status"] = "active"
rec["last_activity"] = time.time()
replies_forwarded += 1
except Exception as exc:
print(f" Warning: failed to forward reply: {exc}")
print(f"[forward] ✗ failed to forward reply: {exc}")
if has_trigger and can_react:
add_reaction(slack_token, channel_id, msg_ts)
continue

# ── Case B: message contains trigger phrase → create a new conversation ─
if has_trigger:
triggers_fired += 1
_process_trigger_message(
slack_token, agent_url, api_key, openhands_url,
channel_id, msg_ts, text, thread_root, conv_key,
active_convs, bot_message_ts, bot_user_id, can_react,
)

print(
f"[poll] processed: {triggers_fired} new conversation(s), "
f"{replies_forwarded} reply/replies forwarded"
)

print(f"[reply-back] sweeping {len(active_convs)} tracked conversation(s) for completion")
for conv_key, rec in list(active_convs.items()):
if rec.get("status") != "closed":
_check_conversation_completion(
conv_key, rec, agent_url, api_key, slack_token, bot_message_ts,
)
else:
print(f"[reply-back] {conv_key} already closed — skip")

if len(bot_message_ts) > MAX_BOT_TS:
print(
f"[dedupe] bot_message_ts size {len(bot_message_ts)} exceeds MAX_BOT_TS={MAX_BOT_TS}; "
f"trimming to last {MAX_BOT_TS}"
)
state["bot_message_ts"] = bot_message_ts[-MAX_BOT_TS:]
else:
state["bot_message_ts"] = bot_message_ts

state["conversations"] = active_convs
save_state(state_path, state)
print(f"State saved to {state_path}")


try:
Expand Down
Loading