Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/configuration/sinks/telegram.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,15 @@ Now we're ready to configure the Telegram sink.
bot_token: <YOUR BOT TOKEN>
chat_id: <CHAT ID>
thread_id: <THREAD ID> # Optional thread (topic) ID
parse_mode: MarkdownV2 # Optional. "MarkdownV2" (default) or null for plain text
.. note::

If you don't want Robusta to send file attachments, set ``send_files`` to ``False`` under your Telegram sink. (True by default)

.. note::

By default, Robusta formats Telegram messages using `MarkdownV2 <https://core.telegram.org/bots/api#markdownv2-style>`_, escaping special characters in your alert content so resource names containing characters like ``_`` (e.g. ``crowdsec-agent_k8vkt``) don't break message formatting. If you'd rather receive unformatted messages, set ``parse_mode`` to ``null`` to send plain text.

Save the file and run

.. code-block:: bash
Expand Down
16 changes: 11 additions & 5 deletions src/robusta/core/sinks/telegram/telegram_client.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
import logging
import os
from typing import Union
from typing import Optional, Union

import requests

from robusta.core.reporting.utils import PNG_SUFFIX, SVG_SUFFIX, convert_svg_to_png, is_image

TELEGRAM_BASE_URL = os.environ.get("TELEGRAM_BASE_URL", "https://api.telegram.org")
# guard the notification path against a slow/unreachable Telegram API hanging the sink
TELEGRAM_REQUEST_TIMEOUT_SECONDS = 30


class TelegramClient:
def __init__(self, chat_id: Union[int, str], thread_id: int, bot_token: str):
def __init__(
self, chat_id: Union[int, str], thread_id: int, bot_token: str, parse_mode: Optional[str] = "MarkdownV2"
):
self.chat_id = int(chat_id)
self.thread_id = thread_id
self.bot_token = bot_token
self.parse_mode = parse_mode

def send_message(self, message: str, disable_links_preview: bool = True):
url = f"{TELEGRAM_BASE_URL}/bot{self.bot_token}/sendMessage"
message_json = {
"chat_id": self.chat_id,
"message_thread_id": self.thread_id,
"disable_web_page_preview": disable_links_preview,
"parse_mode": "Markdown",
"text": message,
}
response = requests.post(url, json=message_json)
if self.parse_mode is not None:
message_json["parse_mode"] = self.parse_mode
response = requests.post(url, json=message_json, timeout=TELEGRAM_REQUEST_TIMEOUT_SECONDS)

if response.status_code != 200:
logging.error(
Expand All @@ -39,7 +45,7 @@ def send_file(self, file_name: str, contents: bytes):
file_name = file_name.replace(SVG_SUFFIX, PNG_SUFFIX)

files = {file_type.lower(): (file_name, contents)}
response = requests.post(url, files=files)
response = requests.post(url, files=files, timeout=TELEGRAM_REQUEST_TIMEOUT_SECONDS)

if response.status_code != 200:
logging.error(
Expand Down
38 changes: 24 additions & 14 deletions src/robusta/core/sinks/telegram/telegram_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from robusta.core.sinks.sink_base import SinkBase
from robusta.core.sinks.telegram.telegram_client import TelegramClient
from robusta.core.sinks.telegram.telegram_sink_params import TelegramSinkConfigWrapper
from robusta.core.sinks.transformer import Transformer
from robusta.core.sinks.telegram.telegram_transformer import TelegramTransformer

SEVERITY_EMOJI_MAP = {
FindingSeverity.INFO: "\U0001F7E2",
Expand All @@ -23,10 +23,10 @@ class TelegramSink(SinkBase):
def __init__(self, sink_config: TelegramSinkConfigWrapper, registry):
super().__init__(sink_config.telegram_sink, registry)

self.client = TelegramClient(
sink_config.telegram_sink.chat_id, sink_config.telegram_sink.thread_id, sink_config.telegram_sink.bot_token
)
self.send_files = sink_config.telegram_sink.send_files
params = sink_config.telegram_sink
self.client = TelegramClient(params.chat_id, params.thread_id, params.bot_token, params.parse_mode)
self.send_files = params.send_files
self.transformer = TelegramTransformer(params.parse_mode)

def write_finding(self, finding: Finding, platform_enabled: bool):
self.__send_telegram_message(finding, platform_enabled)
Expand Down Expand Up @@ -56,17 +56,21 @@ def __get_message_text(self, finding: Finding, platform_enabled: bool):
if actions_content:
message_content += actions_content

blocks = [MarkdownBlock(text=f"*Source:* `{self.cluster_name}`\n\n")]

blocks = []
# first add finding description block
if finding.description:
blocks.append(MarkdownBlock(finding.description))

for enrichment in finding.enrichments:
blocks.extend([block for block in enrichment.blocks if self.__is_telegram_text_block(block)])

source_line = f"{self.transformer.bold('Source:')} {self.transformer.code(self.cluster_name)}\n\n"
message_content += source_line

for block in blocks:
block_text = Transformer.to_standard_markdown([block])
block_text = self.transformer.block_to_markdownv2(block)
if not block_text:
continue
if len(block_text) + len(message_content) >= 4096: # telegram message size limit
break
message_content += block_text + "\n"
Expand All @@ -77,13 +81,20 @@ def _get_actions_block(self, finding: Finding, platform_enabled: bool):
actions_content = ""
if platform_enabled:
actions_content += (
f"[{INVESTIGATE_ICON} Investigate]({finding.get_investigate_uri(self.account_id, self.cluster_name)}) "
self.transformer.link(
f"{INVESTIGATE_ICON} Investigate",
finding.get_investigate_uri(self.account_id, self.cluster_name),
)
+ " "
)
if finding.add_silence_url:
actions_content += f"[{SILENCE_ICON} Silence]({finding.get_prometheus_silence_url(self.account_id, self.cluster_name)})"
actions_content += self.transformer.link(
f"{SILENCE_ICON} Silence",
finding.get_prometheus_silence_url(self.account_id, self.cluster_name),
)

for link in finding.links:
actions_content = f"[{link.link_text}]({link.url})"
actions_content += self.transformer.link(link.link_text, link.url) + " "

if actions_content:
actions_content += "\n\n"
Expand All @@ -95,10 +106,9 @@ def __is_telegram_text_block(cls, block: BaseBlock) -> bool:
# enrichments text tables are too big for mobile device
return not (isinstance(block, FileBlock) or isinstance(block, TableBlock))

@classmethod
def __build_telegram_title(
cls, title: str, status: FindingStatus, severity: FindingSeverity, add_silence_url: bool
self, title: str, status: FindingStatus, severity: FindingSeverity, add_silence_url: bool
) -> str:
icon = SEVERITY_EMOJI_MAP.get(severity, "")
status_str: str = f"{status.to_emoji()} {status.name.lower()} - " if add_silence_url else ""
return f"{status_str}{icon} {severity.name} - *{title}*\n\n"
return f"{status_str}{icon} {severity.name} - {self.transformer.bold(title)}\n\n"
12 changes: 11 additions & 1 deletion src/robusta/core/sinks/telegram/telegram_sink_params.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import Union
from typing import Optional, Union

from pydantic import validator

from robusta.core.sinks.sink_base_params import SinkBaseParams
from robusta.core.sinks.sink_config import SinkConfigBase
Expand All @@ -9,11 +11,19 @@ class TelegramSinkParams(SinkBaseParams):
chat_id: Union[int, str]
thread_id: int = None
send_files: bool = True # Change to False, to omit file attachments
parse_mode: Optional[str] = "MarkdownV2" # "MarkdownV2" or None (plain text)

@classmethod
def _get_sink_type(cls):
return "telegram"

@validator("parse_mode")
def validate_parse_mode(cls, value):
allowed = {"MarkdownV2", None}
if value not in allowed:
raise ValueError(f"telegram parse_mode must be one of {allowed}, got {value!r}")
return value


class TelegramSinkConfigWrapper(SinkConfigBase):
telegram_sink: TelegramSinkParams
Expand Down
114 changes: 114 additions & 0 deletions src/robusta/core/sinks/telegram/telegram_transformer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import logging
import re
from typing import Optional

from robusta.core.reporting.base import BaseBlock
from robusta.core.reporting.blocks import DividerBlock, HeaderBlock, JsonBlock, KubernetesDiffBlock, MarkdownBlock

MARKDOWNV2 = "MarkdownV2"
_DIVIDER = "-------------------"

_MARKDOWNV2_SPECIAL_CHARS = r"_*[]()~`>#+-=|{}.!"
_MARKDOWNV2_ESCAPE_RE = re.compile("([" + re.escape(_MARKDOWNV2_SPECIAL_CHARS) + "])")

# inline token: slack link <url|text> | github link [text](url) | code `...` | bold *...*
# Extension point: to preserve more MarkdownV2 constructs in body text (italic _..._,
# strikethrough ~...~, spoiler ||...||), add an alternative group here and a matching
# branch in _inline_to_markdownv2. The escape char set itself is fixed by Telegram's
# MarkdownV2 spec and lives in _MARKDOWNV2_SPECIAL_CHARS.
_INLINE_RE = re.compile(
r"<(?P<slack_url>[^|>]+)\|(?P<slack_text>[^>]+)>"
r"|\[(?P<gh_text>[^\]]+)\]\((?P<gh_url>[^)]+)\)"
r"|`(?P<code>[^`]+)`"
r"|\*\*(?P<bold2>[^*]+)\*\*" # github-style **bold** — must precede the single-* alternative
r"|\*(?P<bold>[^*]+)\*"
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.


def escape_markdownv2(text: str) -> str:
"""Escape all Telegram MarkdownV2 special characters in arbitrary text."""
return _MARKDOWNV2_ESCAPE_RE.sub(r"\\\1", text)


def escape_markdownv2_code(text: str) -> str:
"""Escape content placed inside a MarkdownV2 code/pre span (only ` and \\)."""
return text.replace("\\", "\\\\").replace("`", "\\`")


def escape_markdownv2_url(url: str) -> str:
"""Escape content placed inside the (...) of a MarkdownV2 inline link (only ) and \\)."""
return url.replace("\\", "\\\\").replace(")", "\\)")


class TelegramTransformer:
"""Render Robusta blocks/values as Telegram MarkdownV2 (or plain text when parse_mode is None)."""

def __init__(self, parse_mode: Optional[str] = MARKDOWNV2):
self.parse_mode = parse_mode
self.markdown = parse_mode == MARKDOWNV2

def escape(self, text: str) -> str:
return escape_markdownv2(text) if self.markdown else text

def bold(self, text: str) -> str:
return f"*{escape_markdownv2(text)}*" if self.markdown else text

def code(self, text: str) -> str:
return f"`{escape_markdownv2_code(text)}`" if self.markdown else text

def link(self, text: str, url: str) -> str:
if self.markdown:
return f"[{escape_markdownv2(text)}]({escape_markdownv2_url(url)})"
return f"{text} ({url})"

def block_to_markdownv2(self, block: BaseBlock) -> str:
if isinstance(block, MarkdownBlock):
return self._inline_to_markdownv2(block.text) if block.text else ""
elif isinstance(block, HeaderBlock):
return self.bold(block.text)
elif isinstance(block, DividerBlock):
return self.escape(_DIVIDER)
elif isinstance(block, JsonBlock):
if self.markdown:
return f"```\n{escape_markdownv2_code(block.json_str)}\n```"
return block.json_str
elif isinstance(block, KubernetesDiffBlock):
lines = []
for diff in block.diffs:
path = ".".join(diff.path)
lines.append(
f"{self.bold(path)}: {self.escape(str(diff.other_value))} "
f"{self.escape('==>')} {self.escape(str(diff.value))}"
)
return "\n".join(lines)
else:
logging.debug(f"Unsupported block type ({type(block)}) for telegram MarkdownV2 rendering")
return ""

def _inline_to_markdownv2(self, text: str) -> str:
if not self.markdown:
# plain text: strip the markers, keep readable link text
text = re.sub(r"<([^|>]+)\|([^>]+)>", r"\2 (\1)", text)
text = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", r"\1 (\2)", text)
text = re.sub(r"`([^`]+)`", r"\1", text)
text = re.sub(r"\*\*([^*]+)\*\*", r"\1", text)
text = re.sub(r"\*([^*]+)\*", r"\1", text)
return text

out = []
last = 0
for m in _INLINE_RE.finditer(text):
out.append(escape_markdownv2(text[last : m.start()])) # plain run before token
if m.group("slack_url") is not None:
out.append(self.link(m.group("slack_text"), m.group("slack_url")))
elif m.group("gh_text") is not None:
out.append(self.link(m.group("gh_text"), m.group("gh_url")))
elif m.group("code") is not None:
out.append(self.code(m.group("code")))
elif m.group("bold2") is not None:
out.append(self.bold(m.group("bold2")))
elif m.group("bold") is not None:
out.append(self.bold(m.group("bold")))
last = m.end()
out.append(escape_markdownv2(text[last:])) # trailing plain run
return "".join(out)
Loading