diff --git a/docs/configuration/sinks/telegram.rst b/docs/configuration/sinks/telegram.rst index c25f54625..5d697ec97 100644 --- a/docs/configuration/sinks/telegram.rst +++ b/docs/configuration/sinks/telegram.rst @@ -52,10 +52,15 @@ Now we're ready to configure the Telegram sink. bot_token: chat_id: thread_id: # Optional thread (topic) ID + parse_mode: MarkdownV2 # Optional. "MarkdownV2" (default) or null for plain text .. note:: If you don't want Robusta to send file attachments, set ``send_files`` to ``False`` under your Telegram sink. (True by default) +.. note:: + + By default, Robusta formats Telegram messages using `MarkdownV2 `_, escaping special characters in your alert content so resource names containing characters like ``_`` (e.g. ``crowdsec-agent_k8vkt``) don't break message formatting. If you'd rather receive unformatted messages, set ``parse_mode`` to ``null`` to send plain text. + Save the file and run .. code-block:: bash diff --git a/src/robusta/core/sinks/telegram/telegram_client.py b/src/robusta/core/sinks/telegram/telegram_client.py index fb23d34a4..242964583 100644 --- a/src/robusta/core/sinks/telegram/telegram_client.py +++ b/src/robusta/core/sinks/telegram/telegram_client.py @@ -1,19 +1,24 @@ import logging import os -from typing import Union +from typing import Optional, Union import requests from robusta.core.reporting.utils import PNG_SUFFIX, SVG_SUFFIX, convert_svg_to_png, is_image TELEGRAM_BASE_URL = os.environ.get("TELEGRAM_BASE_URL", "https://api.telegram.org") +# guard the notification path against a slow/unreachable Telegram API hanging the sink +TELEGRAM_REQUEST_TIMEOUT_SECONDS = 30 class TelegramClient: - def __init__(self, chat_id: Union[int, str], thread_id: int, bot_token: str): + def __init__( + self, chat_id: Union[int, str], thread_id: int, bot_token: str, parse_mode: Optional[str] = "MarkdownV2" + ): self.chat_id = int(chat_id) self.thread_id = thread_id self.bot_token = bot_token + self.parse_mode = parse_mode def send_message(self, message: str, disable_links_preview: bool = True): url = f"{TELEGRAM_BASE_URL}/bot{self.bot_token}/sendMessage" @@ -21,10 +26,11 @@ def send_message(self, message: str, disable_links_preview: bool = True): "chat_id": self.chat_id, "message_thread_id": self.thread_id, "disable_web_page_preview": disable_links_preview, - "parse_mode": "Markdown", "text": message, } - response = requests.post(url, json=message_json) + if self.parse_mode is not None: + message_json["parse_mode"] = self.parse_mode + response = requests.post(url, json=message_json, timeout=TELEGRAM_REQUEST_TIMEOUT_SECONDS) if response.status_code != 200: logging.error( @@ -39,7 +45,7 @@ def send_file(self, file_name: str, contents: bytes): file_name = file_name.replace(SVG_SUFFIX, PNG_SUFFIX) files = {file_type.lower(): (file_name, contents)} - response = requests.post(url, files=files) + response = requests.post(url, files=files, timeout=TELEGRAM_REQUEST_TIMEOUT_SECONDS) if response.status_code != 200: logging.error( diff --git a/src/robusta/core/sinks/telegram/telegram_sink.py b/src/robusta/core/sinks/telegram/telegram_sink.py index fd5a10c1e..e3c6ce64c 100644 --- a/src/robusta/core/sinks/telegram/telegram_sink.py +++ b/src/robusta/core/sinks/telegram/telegram_sink.py @@ -7,7 +7,7 @@ from robusta.core.sinks.sink_base import SinkBase from robusta.core.sinks.telegram.telegram_client import TelegramClient from robusta.core.sinks.telegram.telegram_sink_params import TelegramSinkConfigWrapper -from robusta.core.sinks.transformer import Transformer +from robusta.core.sinks.telegram.telegram_transformer import TelegramTransformer SEVERITY_EMOJI_MAP = { FindingSeverity.INFO: "\U0001F7E2", @@ -23,10 +23,10 @@ class TelegramSink(SinkBase): def __init__(self, sink_config: TelegramSinkConfigWrapper, registry): super().__init__(sink_config.telegram_sink, registry) - self.client = TelegramClient( - sink_config.telegram_sink.chat_id, sink_config.telegram_sink.thread_id, sink_config.telegram_sink.bot_token - ) - self.send_files = sink_config.telegram_sink.send_files + params = sink_config.telegram_sink + self.client = TelegramClient(params.chat_id, params.thread_id, params.bot_token, params.parse_mode) + self.send_files = params.send_files + self.transformer = TelegramTransformer(params.parse_mode) def write_finding(self, finding: Finding, platform_enabled: bool): self.__send_telegram_message(finding, platform_enabled) @@ -56,8 +56,7 @@ def __get_message_text(self, finding: Finding, platform_enabled: bool): if actions_content: message_content += actions_content - blocks = [MarkdownBlock(text=f"*Source:* `{self.cluster_name}`\n\n")] - + blocks = [] # first add finding description block if finding.description: blocks.append(MarkdownBlock(finding.description)) @@ -65,8 +64,13 @@ def __get_message_text(self, finding: Finding, platform_enabled: bool): for enrichment in finding.enrichments: blocks.extend([block for block in enrichment.blocks if self.__is_telegram_text_block(block)]) + source_line = f"{self.transformer.bold('Source:')} {self.transformer.code(self.cluster_name)}\n\n" + message_content += source_line + for block in blocks: - block_text = Transformer.to_standard_markdown([block]) + block_text = self.transformer.block_to_markdownv2(block) + if not block_text: + continue if len(block_text) + len(message_content) >= 4096: # telegram message size limit break message_content += block_text + "\n" @@ -77,13 +81,20 @@ def _get_actions_block(self, finding: Finding, platform_enabled: bool): actions_content = "" if platform_enabled: actions_content += ( - f"[{INVESTIGATE_ICON} Investigate]({finding.get_investigate_uri(self.account_id, self.cluster_name)}) " + self.transformer.link( + f"{INVESTIGATE_ICON} Investigate", + finding.get_investigate_uri(self.account_id, self.cluster_name), + ) + + " " ) if finding.add_silence_url: - actions_content += f"[{SILENCE_ICON} Silence]({finding.get_prometheus_silence_url(self.account_id, self.cluster_name)})" + actions_content += self.transformer.link( + f"{SILENCE_ICON} Silence", + finding.get_prometheus_silence_url(self.account_id, self.cluster_name), + ) for link in finding.links: - actions_content = f"[{link.link_text}]({link.url})" + actions_content += self.transformer.link(link.link_text, link.url) + " " if actions_content: actions_content += "\n\n" @@ -95,10 +106,9 @@ def __is_telegram_text_block(cls, block: BaseBlock) -> bool: # enrichments text tables are too big for mobile device return not (isinstance(block, FileBlock) or isinstance(block, TableBlock)) - @classmethod def __build_telegram_title( - cls, title: str, status: FindingStatus, severity: FindingSeverity, add_silence_url: bool + self, title: str, status: FindingStatus, severity: FindingSeverity, add_silence_url: bool ) -> str: icon = SEVERITY_EMOJI_MAP.get(severity, "") status_str: str = f"{status.to_emoji()} {status.name.lower()} - " if add_silence_url else "" - return f"{status_str}{icon} {severity.name} - *{title}*\n\n" + return f"{status_str}{icon} {severity.name} - {self.transformer.bold(title)}\n\n" diff --git a/src/robusta/core/sinks/telegram/telegram_sink_params.py b/src/robusta/core/sinks/telegram/telegram_sink_params.py index cf1d002b0..936299992 100644 --- a/src/robusta/core/sinks/telegram/telegram_sink_params.py +++ b/src/robusta/core/sinks/telegram/telegram_sink_params.py @@ -1,4 +1,6 @@ -from typing import Union +from typing import Optional, Union + +from pydantic import validator from robusta.core.sinks.sink_base_params import SinkBaseParams from robusta.core.sinks.sink_config import SinkConfigBase @@ -9,11 +11,19 @@ class TelegramSinkParams(SinkBaseParams): chat_id: Union[int, str] thread_id: int = None send_files: bool = True # Change to False, to omit file attachments + parse_mode: Optional[str] = "MarkdownV2" # "MarkdownV2" or None (plain text) @classmethod def _get_sink_type(cls): return "telegram" + @validator("parse_mode") + def validate_parse_mode(cls, value): + allowed = {"MarkdownV2", None} + if value not in allowed: + raise ValueError(f"telegram parse_mode must be one of {allowed}, got {value!r}") + return value + class TelegramSinkConfigWrapper(SinkConfigBase): telegram_sink: TelegramSinkParams diff --git a/src/robusta/core/sinks/telegram/telegram_transformer.py b/src/robusta/core/sinks/telegram/telegram_transformer.py new file mode 100644 index 000000000..434422452 --- /dev/null +++ b/src/robusta/core/sinks/telegram/telegram_transformer.py @@ -0,0 +1,114 @@ +import logging +import re +from typing import Optional + +from robusta.core.reporting.base import BaseBlock +from robusta.core.reporting.blocks import DividerBlock, HeaderBlock, JsonBlock, KubernetesDiffBlock, MarkdownBlock + +MARKDOWNV2 = "MarkdownV2" +_DIVIDER = "-------------------" + +_MARKDOWNV2_SPECIAL_CHARS = r"_*[]()~`>#+-=|{}.!" +_MARKDOWNV2_ESCAPE_RE = re.compile("([" + re.escape(_MARKDOWNV2_SPECIAL_CHARS) + "])") + +# inline token: slack link | github link [text](url) | code `...` | bold *...* +# Extension point: to preserve more MarkdownV2 constructs in body text (italic _..._, +# strikethrough ~...~, spoiler ||...||), add an alternative group here and a matching +# branch in _inline_to_markdownv2. The escape char set itself is fixed by Telegram's +# MarkdownV2 spec and lives in _MARKDOWNV2_SPECIAL_CHARS. +_INLINE_RE = re.compile( + r"<(?P[^|>]+)\|(?P[^>]+)>" + r"|\[(?P[^\]]+)\]\((?P[^)]+)\)" + r"|`(?P[^`]+)`" + r"|\*\*(?P[^*]+)\*\*" # github-style **bold** — must precede the single-* alternative + r"|\*(?P[^*]+)\*" +) + + +def escape_markdownv2(text: str) -> str: + """Escape all Telegram MarkdownV2 special characters in arbitrary text.""" + return _MARKDOWNV2_ESCAPE_RE.sub(r"\\\1", text) + + +def escape_markdownv2_code(text: str) -> str: + """Escape content placed inside a MarkdownV2 code/pre span (only ` and \\).""" + return text.replace("\\", "\\\\").replace("`", "\\`") + + +def escape_markdownv2_url(url: str) -> str: + """Escape content placed inside the (...) of a MarkdownV2 inline link (only ) and \\).""" + return url.replace("\\", "\\\\").replace(")", "\\)") + + +class TelegramTransformer: + """Render Robusta blocks/values as Telegram MarkdownV2 (or plain text when parse_mode is None).""" + + def __init__(self, parse_mode: Optional[str] = MARKDOWNV2): + self.parse_mode = parse_mode + self.markdown = parse_mode == MARKDOWNV2 + + def escape(self, text: str) -> str: + return escape_markdownv2(text) if self.markdown else text + + def bold(self, text: str) -> str: + return f"*{escape_markdownv2(text)}*" if self.markdown else text + + def code(self, text: str) -> str: + return f"`{escape_markdownv2_code(text)}`" if self.markdown else text + + def link(self, text: str, url: str) -> str: + if self.markdown: + return f"[{escape_markdownv2(text)}]({escape_markdownv2_url(url)})" + return f"{text} ({url})" + + def block_to_markdownv2(self, block: BaseBlock) -> str: + if isinstance(block, MarkdownBlock): + return self._inline_to_markdownv2(block.text) if block.text else "" + elif isinstance(block, HeaderBlock): + return self.bold(block.text) + elif isinstance(block, DividerBlock): + return self.escape(_DIVIDER) + elif isinstance(block, JsonBlock): + if self.markdown: + return f"```\n{escape_markdownv2_code(block.json_str)}\n```" + return block.json_str + elif isinstance(block, KubernetesDiffBlock): + lines = [] + for diff in block.diffs: + path = ".".join(diff.path) + lines.append( + f"{self.bold(path)}: {self.escape(str(diff.other_value))} " + f"{self.escape('==>')} {self.escape(str(diff.value))}" + ) + return "\n".join(lines) + else: + logging.debug(f"Unsupported block type ({type(block)}) for telegram MarkdownV2 rendering") + return "" + + def _inline_to_markdownv2(self, text: str) -> str: + if not self.markdown: + # plain text: strip the markers, keep readable link text + text = re.sub(r"<([^|>]+)\|([^>]+)>", r"\2 (\1)", text) + text = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", r"\1 (\2)", text) + text = re.sub(r"`([^`]+)`", r"\1", text) + text = re.sub(r"\*\*([^*]+)\*\*", r"\1", text) + text = re.sub(r"\*([^*]+)\*", r"\1", text) + return text + + out = [] + last = 0 + for m in _INLINE_RE.finditer(text): + out.append(escape_markdownv2(text[last : m.start()])) # plain run before token + if m.group("slack_url") is not None: + out.append(self.link(m.group("slack_text"), m.group("slack_url"))) + elif m.group("gh_text") is not None: + out.append(self.link(m.group("gh_text"), m.group("gh_url"))) + elif m.group("code") is not None: + out.append(self.code(m.group("code"))) + elif m.group("bold2") is not None: + out.append(self.bold(m.group("bold2"))) + elif m.group("bold") is not None: + out.append(self.bold(m.group("bold"))) + last = m.end() + out.append(escape_markdownv2(text[last:])) # trailing plain run + return "".join(out) diff --git a/tests/test_telegram_transformer.py b/tests/test_telegram_transformer.py new file mode 100644 index 000000000..a9810f5e6 --- /dev/null +++ b/tests/test_telegram_transformer.py @@ -0,0 +1,198 @@ +from unittest.mock import patch + +import pytest +from pydantic import ValidationError + +from robusta.core.reporting.blocks import DividerBlock, HeaderBlock, JsonBlock, MarkdownBlock +from robusta.core.sinks.telegram.telegram_sink_params import TelegramSinkParams +from robusta.core.sinks.telegram.telegram_transformer import ( + TelegramTransformer, + escape_markdownv2, + escape_markdownv2_code, +) + + +def _params(**kw): + base = dict(name="tg", bot_token="t", chat_id=123) + base.update(kw) + return TelegramSinkParams(**base) + + +def test_escape_markdownv2_underscore_pod_name(): + # the exact repro from issue #1982 + assert escape_markdownv2("crowdsec-agent_k8vkt") == r"crowdsec\-agent\_k8vkt" + + +def test_escape_markdownv2_all_special_chars(): + assert escape_markdownv2("_*[]()~`>#+-=|{}.!") == (r"\_\*\[\]\(\)\~\`\>\#\+\-\=\|\{\}\.\!") + + +def test_escape_markdownv2_plain_text_unchanged(): + assert escape_markdownv2("hello world 123") == "hello world 123" + + +def test_escape_markdownv2_code_only_backtick_and_backslash(): + assert escape_markdownv2_code(r"a`b\c_d*e") == r"a\`b\\c_d*e" + + +def test_header_block_bold_and_escaped(): + t = TelegramTransformer("MarkdownV2") + assert t.block_to_markdownv2(HeaderBlock("pod_x crashed!")) == r"*pod\_x crashed\!*" + + +def test_divider_block_is_safe_literal(): + t = TelegramTransformer("MarkdownV2") + out = t.block_to_markdownv2(DividerBlock()) + assert out # non-empty + # every dash must be backslash-escaped; the only chars in the output are "\" and "-" + assert set(out) == {"\\", "-"} + assert "-" not in out.replace("\\-", "") # no unescaped dash remains + + +def test_json_block_wrapped_in_code_fence(): + t = TelegramTransformer("MarkdownV2") + out = t.block_to_markdownv2(JsonBlock('{"a": 1}')) + assert out.startswith("```") and out.rstrip().endswith("```") + assert '{"a": 1}' in out # inner JSON has no ` or \, so it is unchanged + + +def test_plain_mode_header_no_markers(): + t = TelegramTransformer(None) + assert t.block_to_markdownv2(HeaderBlock("pod_x")) == "pod_x" + + +def test_markdown_block_preserves_bold_escapes_content(): + t = TelegramTransformer("MarkdownV2") + # underscore in surrounding text is escaped; *bold* preserved with escaped inner text + out = t.block_to_markdownv2(MarkdownBlock("pod_x is *down_now*")) + assert out == r"pod\_x is *down\_now*" + + +def test_markdown_block_preserves_code(): + t = TelegramTransformer("MarkdownV2") + out = t.block_to_markdownv2(MarkdownBlock("see `value_1` here")) + assert out == r"see `value_1` here" # inside code, _ is not escaped + + +def test_markdown_block_slack_link(): + t = TelegramTransformer("MarkdownV2") + out = t.block_to_markdownv2(MarkdownBlock("")) + assert out == r"[click\_here](https://x.io/a_b)" + + +def test_markdown_block_github_link(): + t = TelegramTransformer("MarkdownV2") + out = t.block_to_markdownv2(MarkdownBlock("[click_here](https://x.io/a_b)")) + assert out == r"[click\_here](https://x.io/a_b)" + + +def test_link_escapes_closing_paren_in_url(): + t = TelegramTransformer("MarkdownV2") + # a ) inside the URL must be escaped, or it would terminate the link early + assert t.link("docs", "https://x.io/foo(bar)") == r"[docs](https://x.io/foo(bar\))" + + +def test_markdown_block_double_asterisk_bold(): + t = TelegramTransformer("MarkdownV2") + # github-style **bold** becomes MarkdownV2 *bold* with no stray asterisks + assert t.block_to_markdownv2(MarkdownBlock("see **down_now** ok")) == r"see *down\_now* ok" + + +def test_markdown_block_double_asterisk_plain_mode(): + t = TelegramTransformer(None) + assert t.block_to_markdownv2(MarkdownBlock("see **bold** ok")) == "see bold ok" + + +def test_markdown_block_unbalanced_asterisk_does_not_crash(): + t = TelegramTransformer("MarkdownV2") + out = t.block_to_markdownv2(MarkdownBlock("weird * lonely _ marks")) + # lonely markers are escaped, never emitted raw + assert out == r"weird \* lonely \_ marks" + + +def test_markdown_block_plain_mode_strips_markers(): + t = TelegramTransformer(None) + out = t.block_to_markdownv2(MarkdownBlock("pod_x is *down* see ")) + assert out == "pod_x is down see here (https://x.io)" + + +def test_parse_mode_defaults_to_markdownv2(): + assert _params().parse_mode == "MarkdownV2" + + +def test_parse_mode_accepts_none(): + assert _params(parse_mode=None).parse_mode is None + + +def test_parse_mode_rejects_unsupported(): + with pytest.raises(ValidationError): + _params(parse_mode="HTML") + + +def test_client_sends_parse_mode_when_set(): + from robusta.core.sinks.telegram.telegram_client import TelegramClient + + client = TelegramClient(chat_id=1, thread_id=None, bot_token="x", parse_mode="MarkdownV2") + with patch("robusta.core.sinks.telegram.telegram_client.requests.post") as post: + post.return_value.status_code = 200 + client.send_message("hi") + body = post.call_args.kwargs["json"] + assert body["parse_mode"] == "MarkdownV2" + + +def test_client_omits_parse_mode_when_none(): + from robusta.core.sinks.telegram.telegram_client import TelegramClient + + client = TelegramClient(chat_id=1, thread_id=None, bot_token="x", parse_mode=None) + with patch("robusta.core.sinks.telegram.telegram_client.requests.post") as post: + post.return_value.status_code = 200 + client.send_message("hi") + body = post.call_args.kwargs["json"] + assert "parse_mode" not in body + + +def _render_sink_text(parse_mode="MarkdownV2", links=None): + from robusta.core.reporting.base import Finding + from robusta.core.sinks.telegram.telegram_sink import TelegramSink + + with patch.object(TelegramSink, "__init__", lambda self, *a, **k: None): + sink = TelegramSink.__new__(TelegramSink) + sink.transformer = TelegramTransformer(parse_mode) + sink.cluster_name = "prod_cluster" + sink.account_id = "acc" + finding = Finding(title="Pod crowdsec-agent_k8vkt OOMKilled", aggregation_key="OOM") + if links: + finding.links = links + return sink._TelegramSink__get_message_text(finding, platform_enabled=False) + + +def test_sink_escapes_underscore_in_title(): + text = _render_sink_text("MarkdownV2") + assert r"crowdsec\-agent\_k8vkt" in text + assert "crowdsec-agent_k8vkt" not in text # raw form must not leak through + + +def test_sink_plain_mode_has_no_escapes(): + text = _render_sink_text(None) + assert "crowdsec-agent_k8vkt" in text # raw, but plain text never crashes telegram + assert "\\" not in text + + +def test_sink_aggregates_multiple_action_links(): + from robusta.core.reporting.base import Link + + links = [Link(url="https://a.io/1", name="first"), Link(url="https://b.io/2", name="second")] + text = _render_sink_text("MarkdownV2", links=links) + # both links must survive; the old code overwrote and kept only the last one + assert "first" in text and "https://a.io/1" in text + assert "second" in text and "https://b.io/2" in text + + +def test_client_send_message_passes_timeout(): + from robusta.core.sinks.telegram.telegram_client import TelegramClient + + client = TelegramClient(chat_id=1, thread_id=None, bot_token="x") + with patch("robusta.core.sinks.telegram.telegram_client.requests.post") as post: + post.return_value.status_code = 200 + client.send_message("hi") + assert post.call_args.kwargs["timeout"] == 30