diff --git a/SUPPORT_MATRIX.md b/SUPPORT_MATRIX.md index 6faffe13ea..f00bf5bd47 100644 --- a/SUPPORT_MATRIX.md +++ b/SUPPORT_MATRIX.md @@ -65,6 +65,7 @@ through language-specific library instrumentation documented later in this file. | gRPC | `1.0+` | All | Yes | No | Long-lived connections started before OBI may use `*` for method names | | MySQL | All | All | Yes | No | Prepared statements created before OBI started may miss query text | | PostgreSQL | All | All | Yes | No | Prepared statements created before OBI started may miss query text | +| MSSQL | All | All | Yes | No | Prepared statements created before OBI started may miss query text | | Redis | All | All | Yes | No | Existing connections may miss database number and `db.namespace` | | MongoDB | `5.0+` | `insert`, `update`, `find`, `delete`, `findAndModify`, `aggregate`, `count`, `distinct`, `mapReduce` | Yes | No | No support for compressed payloads | | Couchbase | All | All | Yes | No | Bucket or collection may be unknown if negotiation happened before OBI started | diff --git a/bpf/common/connection_info.h b/bpf/common/connection_info.h index 25f57d90e7..1fb58577a5 100644 --- a/bpf/common/connection_info.h +++ b/bpf/common/connection_info.h @@ -21,6 +21,7 @@ enum protocol_type : u8 { k_protocol_type_http = 3, k_protocol_type_kafka = 4, k_protocol_type_mqtt = 5, + k_protocol_type_mssql = 6, }; // Struct to keep information on the connections in flight diff --git a/bpf/common/large_buffers.h b/bpf/common/large_buffers.h index 778f25ceb6..eaaf4efce4 100644 --- a/bpf/common/large_buffers.h +++ b/bpf/common/large_buffers.h @@ -9,6 +9,7 @@ volatile const u32 http_max_captured_bytes = 0; volatile const u32 mysql_max_captured_bytes = 0; volatile const u32 postgres_max_captured_bytes = 0; volatile const u32 kafka_max_captured_bytes = 0; +volatile const u32 mssql_max_captured_bytes = 0; enum { // Maximum payload size per ring buffer chunk. @@ -26,6 +27,7 @@ enum { k_large_buf_max_mysql_captured_bytes = 1 << 16, k_large_buf_max_postgres_captured_bytes = 1 << 16, k_large_buf_max_kafka_captured_bytes = 1 << 16, + k_large_buf_max_mssql_captured_bytes = 1 << 16, }; SCRATCH_MEM_SIZED(tcp_large_buffers, k_large_buf_max_size); diff --git a/bpf/generictracer/k_tracer.c b/bpf/generictracer/k_tracer.c index 9dae7b4c5e..b715838f86 100644 --- a/bpf/generictracer/k_tracer.c +++ b/bpf/generictracer/k_tracer.c @@ -37,6 +37,7 @@ #include #include #include +#include #include #include diff --git a/bpf/generictracer/protocol_handler.c b/bpf/generictracer/protocol_handler.c index 6c054db05c..a0979c497b 100644 --- a/bpf/generictracer/protocol_handler.c +++ b/bpf/generictracer/protocol_handler.c @@ -74,6 +74,12 @@ int obi_handle_buf_with_args(void *ctx) { &args->protocol_type)) { bpf_dbg_printk("Found postgres connection"); bpf_tail_call(ctx, &jump_table, k_tail_protocol_tcp); + } else if (args->protocols.tcp && is_mssql(&args->pid_conn.conn, + (const unsigned char *)args->u_buf, + args->bytes_len, + &args->protocol_type)) { + bpf_dbg_printk("Found mssql connection"); + bpf_tail_call(ctx, &jump_table, k_tail_protocol_tcp); } else if (args->protocols.tcp && is_kafka(&args->pid_conn.conn, (const unsigned char *)args->u_buf, args->bytes_len, diff --git a/bpf/generictracer/protocol_mssql.h b/bpf/generictracer/protocol_mssql.h new file mode 100644 index 0000000000..3de11f5ea8 --- /dev/null +++ b/bpf/generictracer/protocol_mssql.h @@ -0,0 +1,219 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +// TDS Packet Header +// https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-tds/7af53667-1b72-4703-8258-7984e838f746 +struct mssql_hdr { + u8 type; + u8 status; + u16 length; // big-endian + u16 spid; // big-endian + u8 packet_id; + u8 window; +}; + +enum { + // TDS header + k_mssql_hdr_size = 8, + k_mssql_messages_in_packet_max = 10, + + // TDS status bits + k_mssql_status_eom = 0x01, // End Of Message + + // TDS message types + k_mssql_msg_sql_batch = 0x01, + k_mssql_msg_rpc = 0x03, + k_mssql_msg_response = 0x04, + k_mssql_msg_login7 = 0x10, + k_mssql_msg_prelogin = 0x12, +}; + +static __always_inline struct mssql_hdr mssql_parse_hdr(const unsigned char *data) { + struct mssql_hdr hdr = {}; + + bpf_probe_read(&hdr, sizeof(hdr), (const void *)data); + + // Length and SPID are big-endian + hdr.length = bpf_ntohs(hdr.length); + hdr.spid = bpf_ntohs(hdr.spid); + + return hdr; +} + +static __always_inline u8 is_mssql(connection_info_t *conn_info, + const unsigned char *data, + u32 data_len, + enum protocol_type *protocol_type) { + if (*protocol_type != k_protocol_type_mssql && *protocol_type != k_protocol_type_unknown) { + // Already classified, not mssql. + return 0; + } + + if (data_len < k_mssql_hdr_size) { + return 0; + } + + size_t offset = 0; + bool includes_known_command = false; + + for (u8 i = 0; i < k_mssql_messages_in_packet_max; i++) { + if (offset + k_mssql_hdr_size > data_len) { + break; + } + + struct mssql_hdr hdr = mssql_parse_hdr(data + offset); + + if (hdr.length < k_mssql_hdr_size || hdr.length > data_len - offset) { + return 0; + } + + switch (hdr.type) { + case k_mssql_msg_sql_batch: + case k_mssql_msg_rpc: + case k_mssql_msg_response: + case k_mssql_msg_login7: + case k_mssql_msg_prelogin: + includes_known_command = true; + break; + default: + break; + } + + offset += hdr.length; + } + + if (offset != data_len || !includes_known_command) { + return 0; + } + + *protocol_type = k_protocol_type_mssql; + bpf_map_update_elem(&protocol_cache, conn_info, protocol_type, BPF_ANY); + + return 1; +} + +// Emit a large buffer event for MSSQL protocol. +// The return value is used to control the flow for this specific protocol. +// -1: wait additional data; 0: continue, regardless of errors. +static __always_inline int mssql_send_large_buffer(tcp_req_t *req, + const void *u_buf, + u32 bytes_len, + u8 packet_type, + u8 direction, + enum large_buf_action action) { + if (mssql_max_captured_bytes > k_large_buf_max_mssql_captured_bytes) { + bpf_dbg_printk("BUG: mssql_max_captured_bytes exceeds maximum allowed value."); + } + + if (packet_type == PACKET_TYPE_RESPONSE && req->resp_len == 0 && + bytes_len >= k_mssql_hdr_size) { + bpf_probe_read(req->rbuf, k_mssql_hdr_size, u_buf); + } + + const u32 bytes_sent = + packet_type == PACKET_TYPE_REQUEST ? req->lb_req_bytes : req->lb_res_bytes; + + if (mssql_max_captured_bytes > 0 && bytes_sent < mssql_max_captured_bytes && bytes_len > 0) { + tcp_large_buffer_t *large_buf = (tcp_large_buffer_t *)tcp_large_buffers_mem(); + if (!large_buf) { + bpf_dbg_printk( + "mssql_send_large_buffer: failed to reserve space for MSSQL large buffer"); + } else { + large_buf->type = EVENT_TCP_LARGE_BUFFER; + large_buf->packet_type = packet_type; + large_buf->action = action; + large_buf->direction = direction; + large_buf->conn_info = req->conn_info; + large_buf->tp = req->tp; + + u32 max_available_bytes = mssql_max_captured_bytes - bytes_sent; + bpf_clamp_umax(max_available_bytes, k_large_buf_max_mssql_captured_bytes); + + const u32 available_bytes = min(bytes_len, max_available_bytes); + const u32 consumed_bytes = large_buf_emit_chunks(large_buf, u_buf, available_bytes); + + if (packet_type == PACKET_TYPE_REQUEST) { + req->lb_req_bytes += consumed_bytes; + } else { + req->lb_res_bytes += consumed_bytes; + } + + if (consumed_bytes > 0) { + req->has_large_buffers = true; + } + } + } + + if (packet_type == PACKET_TYPE_RESPONSE) { + req->resp_len += bytes_len; + + // Scan complete TDS packets in the current recv buffer for the EOM bit. + // A response may arrive as: (a) one or more complete packets in a single + // recv, or (b) a single packet split across multiple recvs (header first, + // then payload). Handle both by falling back to accumulated-length tracking + // when the current buffer does not start at a TDS packet boundary. + bool eom = false; + bool found_complete_packet = false; + u32 offset = 0; + for (u8 i = 0; i < k_mssql_messages_in_packet_max; i++) { + if (offset + k_mssql_hdr_size > bytes_len) { + break; + } + const struct mssql_hdr hdr = mssql_parse_hdr((const unsigned char *)u_buf + offset); + if (hdr.length < k_mssql_hdr_size || offset + hdr.length > bytes_len) { + break; + } + found_complete_packet = true; + if (hdr.status & k_mssql_status_eom) { + eom = true; + } + offset += hdr.length; + } + + if (eom) { + // EOM found: all response packets received. + } else if (found_complete_packet) { + // Complete packets present but no EOM: more packets expected. + bpf_dbg_printk("mssql response: waiting for EOM, acc=%d", req->resp_len); + return -1; + } else { + // Could not parse a complete TDS packet (partial recv or payload + // continuation). Fall back to length tracking using the saved header. + if (req->resp_len < k_mssql_hdr_size) { + return -1; + } + const struct mssql_hdr first_hdr = mssql_parse_hdr(req->rbuf); + if (first_hdr.length >= k_mssql_hdr_size) { + const u32 prev_resp_len = req->resp_len - bytes_len; + // Wait while packet 1 is still completing, or if it just finished + // in this recv with no EOM (more packets follow; the next recv + // will start at a TDS boundary where the main loop detects EOM). + if (prev_resp_len < first_hdr.length && + (req->resp_len < first_hdr.length || + !(first_hdr.status & k_mssql_status_eom))) { + bpf_dbg_printk( + "mssql response: partial, acc=%d exp=%d", req->resp_len, first_hdr.length); + return -1; + } + } + } + } + + return 0; +} diff --git a/bpf/generictracer/protocol_tcp.h b/bpf/generictracer/protocol_tcp.h index e7d8999e31..81a9cf06a6 100644 --- a/bpf/generictracer/protocol_tcp.h +++ b/bpf/generictracer/protocol_tcp.h @@ -23,6 +23,7 @@ #include #include #include +#include #include @@ -161,6 +162,8 @@ static __always_inline int tcp_send_large_buffer(tcp_req_t *req, return postgres_send_large_buffer(req, u_buf, bytes_len, packet_type, direction, action); case k_protocol_type_kafka: return kafka_send_large_buffer(req, pid_conn, u_buf, bytes_len, direction, action); + case k_protocol_type_mssql: + return mssql_send_large_buffer(req, u_buf, bytes_len, packet_type, direction, action); case k_protocol_type_http: case k_protocol_type_mqtt: case k_protocol_type_unknown: @@ -289,13 +292,11 @@ static __always_inline void handle_unknown_tcp_connection(pid_connection_info_t } } else if (existing->direction != direction) { existing->is_server = is_server; - if (tcp_send_large_buffer(existing, - pid_conn, - u_buf, - bytes_len, - direction, - protocol_type, - k_large_buf_action_init) < 0) { + const enum large_buf_action response_action = + (existing->lb_res_bytes > 0) ? k_large_buf_action_append : k_large_buf_action_init; + if (tcp_send_large_buffer( + existing, pid_conn, u_buf, bytes_len, direction, protocol_type, response_action) < + 0) { bpf_dbg_printk("waiting additional response data"); return; } diff --git a/devdocs/features.md b/devdocs/features.md index 2380dc25fb..c074005653 100644 --- a/devdocs/features.md +++ b/devdocs/features.md @@ -10,6 +10,7 @@ through language-specific library instrumentation documented later in this file. | gRPC | All | 1.0+ | All | Yes | No | Can't get method for long living connections before OBI started, will mark method with `*` | MySQL | All | All | All | Yes | No | In the case of prepared statements, if the statement was prepared before OBI started then the query might be missed | PostgreSQL | All | All | All | Yes | No | In the case of prepared statements, if the statement was prepared before OBI started then the query might be missed +| MSSQL | All | All | All | Yes | No | In the case of prepared statements, if the statement was prepared before OBI started then the query might be missed | Redis | All | All | All | Yes | No | For already started connections, can't infer the number of the database, and won't add the `db.namespace` attribute | MongoDB | All | 5.0+ | insert, update, find, delete, findAndModify, aggregate, count, distinct, mapReduce | Yes | No | no support for compressed payloads | Couchbase | All | All | All | Yes | No | Bucket unknown if SELECT_BUCKET occurred before OBI started; Collection unknown if GET_COLLECTION_ID occurred before OBI started @@ -66,8 +67,9 @@ Large payloads are streamed to userspace across multiple ring-buffer events and | `OTEL_EBPF_BPF_BUFFER_SIZE_MYSQL` | MySQL | 65535 | 0 (disabled) | | `OTEL_EBPF_BPF_BUFFER_SIZE_KAFKA` | Kafka | 65535 | 0 (disabled) | | `OTEL_EBPF_BPF_BUFFER_SIZE_POSTGRES` | PostgreSQL | 65535 | 0 (disabled) | +| `OTEL_EBPF_BPF_BUFFER_SIZE_MSSQL` | MSSQL | 65535 | 0 (disabled) | -Equivalent YAML keys live under `ebpf.buffer_sizes.{http,mysql,kafka,postgres}`. +Equivalent YAML keys live under `ebpf.buffer_sizes.{http,mysql,kafka,postgres,mssql}`. ## GPU Instrumentation diff --git a/docs/config-schema.json b/docs/config-schema.json index f11de799b5..da44ad7fc7 100644 --- a/docs/config-schema.json +++ b/docs/config-schema.json @@ -341,6 +341,10 @@ "type": "integer", "x-env-var": "OTEL_EBPF_BPF_BUFFER_SIZE_KAFKA" }, + "mssql": { + "type": "integer", + "x-env-var": "OTEL_EBPF_BPF_BUFFER_SIZE_MSSQL" + }, "mysql": { "type": "integer", "x-env-var": "OTEL_EBPF_BPF_BUFFER_SIZE_MYSQL" @@ -475,6 +479,11 @@ "description": "MongoDB requests cache size.", "x-env-var": "OTEL_EBPF_BPF_MONGO_REQUESTS_CACHE_SIZE" }, + "mssql_prepared_statements_cache_size": { + "type": "integer", + "description": "MSSQL prepared statements cache size.", + "x-env-var": "OTEL_EBPF_BPF_MSSQL_PREPARED_STATEMENTS_CACHE_SIZE" + }, "mysql_prepared_statements_cache_size": { "type": "integer", "description": "MySQL prepared statements cache size.", diff --git a/internal/test/integration/components/mssqldb/Dockerfile b/internal/test/integration/components/mssqldb/Dockerfile new file mode 100644 index 0000000000..62434fcc12 --- /dev/null +++ b/internal/test/integration/components/mssqldb/Dockerfile @@ -0,0 +1,24 @@ +FROM mcr.microsoft.com/mssql/server@sha256:49b45a911dc535e9345fbfd7101a1bd8a1e190a5f29b877ef75387a061e5fcf0 + +USER root + +# Create a directory for our initialization +COPY setup.sql /setup.sql +RUN chown mssql /setup.sql + +# Set required environment variables for the build-time run +ENV ACCEPT_EULA=Y +ENV MSSQL_SA_PASSWORD=p_ssW0rd + +# Switch to mssql user for the initialization run +USER mssql + +# Start SQL Server, wait for it to be ready, run the setup script, and then shut down. +RUN /opt/mssql/bin/sqlservr & \ + bash -c "until /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P \"$MSSQL_SA_PASSWORD\" -C -Q 'SELECT 1' &> /dev/null; do sleep 2; echo 'Waiting for SQL Server...'; done" && \ + /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P "$MSSQL_SA_PASSWORD" -C -i /setup.sql && \ + pkill sqlservr && \ + sleep 5 + +# Use the default entrypoint +ENTRYPOINT ["/opt/mssql/bin/sqlservr"] \ No newline at end of file diff --git a/internal/test/integration/components/mssqldb/setup.sql b/internal/test/integration/components/mssqldb/setup.sql new file mode 100644 index 0000000000..e423d0d3b4 --- /dev/null +++ b/internal/test/integration/components/mssqldb/setup.sql @@ -0,0 +1,31 @@ +CREATE DATABASE testdb; +GO +USE testdb; +GO +CREATE TABLE actor ( + actor_id INT PRIMARY KEY, + first_name VARCHAR(50), + last_name VARCHAR(50) +); +GO +INSERT INTO actor (actor_id, first_name, last_name) VALUES (1, 'TOM', 'CRUISE'); +GO + +-- bulk_actor has 50 rows with padded names so the SELECT * response exceeds the +-- default 4096-byte TDS packet boundary and forces a multi-packet server response. +CREATE TABLE bulk_actor ( + actor_id INT PRIMARY KEY, + first_name VARCHAR(50), + last_name VARCHAR(50) +); +GO +DECLARE @i INT = 1; +WHILE @i <= 50 +BEGIN + INSERT INTO bulk_actor (actor_id, first_name, last_name) + VALUES (@i, + LEFT(REPLICATE('A', 48), 48), + LEFT(REPLICATE('B', 48), 48)); + SET @i = @i + 1; +END; +GO diff --git a/internal/test/integration/components/pythonsql/Dockerfile_mssql b/internal/test/integration/components/pythonsql/Dockerfile_mssql new file mode 100644 index 0000000000..d21b3fe5e9 --- /dev/null +++ b/internal/test/integration/components/pythonsql/Dockerfile_mssql @@ -0,0 +1,7 @@ +# Dockerfile that will build a container that runs python with FastAPI and uvicorn on port 8080 +FROM python:3.14@sha256:61346539f7b26521a230e72c11da5ebd872924745074b19736e7d65ba748c366 +EXPOSE 8080 +COPY requirements.txt /requirements.txt +RUN pip install --require-hashes -r /requirements.txt +COPY main_mssql.py /main.py +CMD ["uvicorn", "--port", "8080", "--host", "0.0.0.0", "main:app"] diff --git a/internal/test/integration/components/pythonsql/main_mssql.py b/internal/test/integration/components/pythonsql/main_mssql.py new file mode 100644 index 0000000000..bac885509e --- /dev/null +++ b/internal/test/integration/components/pythonsql/main_mssql.py @@ -0,0 +1,78 @@ +from fastapi import FastAPI +import os +import uvicorn +import pymssql + +app = FastAPI() + +conn = None + +def get_conn(): + global conn + if conn is None: + conn = pymssql.connect( + server="sqlserver", + user="sa", + password="p_ssW0rd", + database="testdb", + autocommit=True + ) + return conn + +@app.get("/query") +async def root(): + c = get_conn() + cur = c.cursor() + cur.execute("SELECT * FROM actor WHERE actor_id=1") + row = cur.fetchone() + cur.close() + return row + +@app.get("/argquery") +async def argquery(): + c = get_conn() + cur = c.cursor() + # pymssql uses %s or %d for placeholders + cur.execute("SELECT * FROM actor WHERE actor_id=%d", (1,)) + row = cur.fetchone() + cur.close() + return row + +@app.get("/prepquery") +async def prepquery(): + c = get_conn() + cur = c.cursor() + # Parameterized query intended to exercise prepared statement handling + # Uses the same pattern as /argquery + sql = "SELECT * FROM actor WHERE actor_id = %d" + cur.execute(sql, (1,)) + row = cur.fetchone() + cur.close() + return row + +@app.get("/largeresult") +async def largeresult(): + c = get_conn() + cur = c.cursor() + # Returns 50 rows with 48-char padded names. The total response exceeds the + # default 4096-byte TDS packet size, forcing a multi-packet server response. + cur.execute("SELECT * FROM bulk_actor") + rows = cur.fetchall() + cur.close() + return {"count": len(rows)} + +@app.get("/error") +async def error(): + c = get_conn() + cur = c.cursor() + try: + cur.execute("SELECT * FROM obi.nonexisting") + except Exception as e: + pass + finally: + cur.close() + return "" + +if __name__ == "__main__": + print(f"Server running: port={8080} process_id={os.getpid()}") + uvicorn.run(app, host="0.0.0.0", port=8080) diff --git a/internal/test/integration/components/pythonsql/requirements.in b/internal/test/integration/components/pythonsql/requirements.in index a5da60047b..705b916fdf 100644 --- a/internal/test/integration/components/pythonsql/requirements.in +++ b/internal/test/integration/components/pythonsql/requirements.in @@ -3,3 +3,4 @@ uvicorn==0.41.0 gunicorn==25.1.0 psycopg==3.2.9 mysql-connector-python==9.5.0 +pymssql==2.3.13 \ No newline at end of file diff --git a/internal/test/integration/components/pythonsql/requirements.txt b/internal/test/integration/components/pythonsql/requirements.txt index ddb462dd98..d40ba8b999 100644 --- a/internal/test/integration/components/pythonsql/requirements.txt +++ b/internal/test/integration/components/pythonsql/requirements.txt @@ -196,6 +196,51 @@ pydantic-core==2.41.5 \ --hash=sha256:f41eb9797986d6ebac5e8edff36d5cef9de40def462311b3eb3eeded1431e425 \ --hash=sha256:f547144f2966e1e16ae626d8ce72b4cfa0caedc7fa28052001c94fb2fcaa1c52 # via pydantic +pymssql==2.3.13 \ + --hash=sha256:0a7e6431925572bc75fb47929ae8ca5b0aac26abfe8b98d4c08daf117b5657f1 \ + --hash=sha256:0fddd24efe9d18bbf174fab7c6745b0927773718387f5517cf8082241f721a68 \ + --hash=sha256:123c55ee41bc7a82c76db12e2eb189b50d0d7a11222b4f8789206d1cda3b33b9 \ + --hash=sha256:1493f63d213607f708a5722aa230776ada726ccdb94097fab090a1717a2534e0 \ + --hash=sha256:152be40c0d7f5e4b1323f7728b0a01f3ee0082190cfbadf84b2c2e930d57e00e \ + --hash=sha256:16c5957a3c9e51a03276bfd76a22431e2bc4c565e2e95f2cbb3559312edda230 \ + --hash=sha256:17942dc9474693ab2229a8a6013e5b9cb1312a5251207552141bb85fcce8c131 \ + --hash=sha256:1c6d0b2d7961f159a07e4f0d8cc81f70ceab83f5e7fd1e832a2d069e1d67ee4e \ + --hash=sha256:2137e904b1a65546be4ccb96730a391fcd5a85aab8a0632721feb5d7e39cfbce \ + --hash=sha256:2b056eb175955f7fb715b60dc1c0c624969f4d24dbdcf804b41ab1e640a2b131 \ + --hash=sha256:30918bb044242865c01838909777ef5e0f1b9ecd7f5882346aefa57f4414b29c \ + --hash=sha256:319810b89aa64b99d9c5c01518752c813938df230496fa2c4c6dda0603f04c4c \ + --hash=sha256:476f6f06b2ae5dfbfa0b169a6ecdd0d9ddfedb07f2d6dc97d2dd630ff2d6789a \ + --hash=sha256:48631c7b9fd14a1bd5675c521b6082590bf700b7961c65638d237817b3fde735 \ + --hash=sha256:4aa18944a121f996178e26cadc598abdbf73759f03dc3cd74263fdab1b28cd96 \ + --hash=sha256:4b834c34e7600369eee7bc877948b53eb0fe6f3689f0888d005ae47dd53c0a66 \ + --hash=sha256:4d87237500def5f743a52e415cd369d632907212154fcc7b4e13f264b4e30021 \ + --hash=sha256:51e42c5defc3667f0803c7ade85db0e6f24b9a1c5a18fcdfa2d09c36bff9b065 \ + --hash=sha256:5c045c0f1977a679cc30d5acd9da3f8aeb2dc6e744895b26444b4a2f20dad9a0 \ + --hash=sha256:5c2e55b6513f9c5a2f58543233ed40baaa7f91c79e64a5f961ea3fc57a700b80 \ + --hash=sha256:612ac062027d2118879f11a5986e9d9d82d07ca3545bb98c93200b68826ea687 \ + --hash=sha256:6a6c0783d97f57133573a03aad3017917dbdf7831a65e0d84ccf2a85e183ca66 \ + --hash=sha256:79c759db6e991eeae473b000c2e0a7fb8da799b2da469fe5a10d30916315e0b5 \ + --hash=sha256:7d7037d2b5b907acc7906d0479924db2935a70c720450c41339146a4ada2b93d \ + --hash=sha256:7f9b1d5aef2b5f47a7f9d9733caee4d66772681e8f798a0f5e4739a8bdab408c \ + --hash=sha256:8d66ce0a249d2e3b57369048d71e1f00d08dfb90a758d134da0250ae7bc739c1 \ + --hash=sha256:910404e0ec85c4cc7c633ec3df9b04a35f23bb74a844dd377a387026ae635e3a \ + --hash=sha256:a930adda87bdd8351a5637cf73d6491936f34e525a5e513068a6eac742f69cdb \ + --hash=sha256:aa5e07eff7e6e8bd4ba22c30e4cb8dd073e138cd272090603609a15cc5dbc75b \ + --hash=sha256:b0af51904764811da0bfe4b057b1d72dee11a399ce9ed5770875162772740c8a \ + --hash=sha256:c0ea72641cb0f8bce7ad8565dbdbda4a7437aa58bce045f2a3a788d71af2e4be \ + --hash=sha256:c690f1869dadbf4201b7f51317fceff6e5d8f5175cec6a4a813e06b0dca2d6ed \ + --hash=sha256:cf4f32b4a05b66f02cb7d55a0f3bcb0574a6f8cf0bee4bea6f7b104038364733 \ + --hash=sha256:d663c908414a6a032f04d17628138b1782af916afc0df9fefac4751fa394c3ac \ + --hash=sha256:d94da3a55545c5b6926cb4d1c6469396f0ae32ad5d6932c513f7a0bf569b4799 \ + --hash=sha256:db77da1a3fc9b5b5c5400639d79d7658ba7ad620957100c5b025be608b562193 \ + --hash=sha256:e053b443e842f9e1698fcb2b23a4bff1ff3d410894d880064e754ad823d541e5 \ + --hash=sha256:e7c31f192da9d30f0e03ad99e548120a8740a675302e2f04fa8c929f7cbee771 \ + --hash=sha256:eb3275985c23479e952d6462ae6c8b2b6993ab6b99a92805a9c17942cf3d5b3d \ + --hash=sha256:f1897c1b767cc143e77d285123ae5fd4fa7379a1bfec5c515d38826caf084eb6 \ + --hash=sha256:f5d995a80996235ed32102a93067ce6a7143cce3bfd4e5042bf600020fc08456 \ + --hash=sha256:fc5482969c813b0a45ce51c41844ae5bfa8044ad5ef8b4820ef6de7d4545b7f2 \ + --hash=sha256:ff5be7ab1d643dbce2ee3424d2ef9ae8e4146cf75bd20946bc7a6108e3ad1e47 + # via -r requirements.in starlette==0.52.1 \ --hash=sha256:0029d43eb3d273bc4f83a08720b4912ea4b071087a3b48db01b7c839f7954d74 \ --hash=sha256:834edd1b0a23167694292e94f597773bc3f89f362be6effee198165a35d62933 diff --git a/internal/test/integration/docker-compose-python-mssql.yml b/internal/test/integration/docker-compose-python-mssql.yml new file mode 100644 index 0000000000..e519548198 --- /dev/null +++ b/internal/test/integration/docker-compose-python-mssql.yml @@ -0,0 +1,107 @@ +services: + sqlserver: + build: + context: ../../../internal/test/integration/components/mssqldb + dockerfile: Dockerfile + image: mssql + environment: + ACCEPT_EULA: "Y" + MSSQL_SA_PASSWORD: "p_ssW0rd" + ports: + - "1433:1433" + + testserver: + build: + context: ../../../internal/test/integration/components/pythonsql/ + dockerfile: Dockerfile_mssql + image: hatest-testserver-python-mssql + ports: + - "${TEST_SERVICE_PORTS}" + depends_on: + otelcol: + condition: service_started + sqlserver: + condition: service_started + + obi: + build: + context: ../../.. + dockerfile: ./internal/test/integration/components/obi/Dockerfile + volumes: + - ./configs/:/configs + - ./system/sys/kernel/security:/sys/kernel/security + - ../../../testoutput:/coverage + - ../../../testoutput/run-python-mssql:/var/run/obi + image: hatest-obi + privileged: true # in some environments (not GH Pull Requests) you can set it to false and then cap_add: [ SYS_ADMIN ] + network_mode: "service:testserver" + pid: "service:testserver" + environment: + OTEL_EBPF_CONFIG_PATH: "/configs/obi-config.yml" + GOCOVERDIR: "/coverage" + OTEL_EBPF_TRACE_PRINTER: "text" + OTEL_EBPF_OPEN_PORT: "${OTEL_EBPF_OPEN_PORT}" + OTEL_EBPF_DISCOVERY_POLL_INTERVAL: 500ms + OTEL_EBPF_EXECUTABLE_PATH: "${OTEL_EBPF_EXECUTABLE_PATH}" + OTEL_EBPF_SERVICE_NAMESPACE: "integration-test" + OTEL_EBPF_OTLP_TRACES_BATCH_TIMEOUT: "1ms" + OTEL_EBPF_METRICS_INTERVAL: "10ms" + OTEL_EBPF_BPF_BATCH_TIMEOUT: "10ms" + OTEL_EBPF_LOG_LEVEL: "DEBUG" + OTEL_EBPF_BPF_DEBUG: "TRUE" + OTEL_EBPF_HOSTNAME: "obi" + OTEL_EBPF_BPF_HTTP_REQUEST_TIMEOUT: "5s" + OTEL_EBPF_PROCESSES_INTERVAL: "100ms" + OTEL_EBPF_TRACES_INSTRUMENTATIONS: "sql" + OTEL_EBPF_METRICS_INSTRUMENTATIONS: "sql" + OTEL_EBPF_METRICS_FEATURES: "application" + OTEL_EBPF_BPF_BUFFER_SIZE_MSSQL: 8192 + depends_on: + testserver: + condition: service_started + + # OpenTelemetry Collector + otelcol: + image: otel/opentelemetry-collector-contrib:0.149.0@sha256:0fba96233274f6d665ac8831ad99dfe6479a9a20459f6e2719c0d20945773b46 + container_name: otel-col + deploy: + resources: + limits: + memory: 125M + restart: unless-stopped + command: ["--config=/etc/otelcol-config/otelcol-config.yml"] + volumes: + - ./configs/:/etc/otelcol-config + ports: + - "4317" # OTLP over gRPC receiver + - "4318:4318" # OTLP over HTTP receiver + - "9464" # Prometheus exporter + - "8888" # metrics endpoint + depends_on: + prometheus: + condition: service_started + + # Prometheus + prometheus: + image: quay.io/prometheus/prometheus:v3.10.0@sha256:7571a304e67fbd794be02422b13627dc7de822152f74e99e2bef95d29eceecde + container_name: prometheus + command: + - --config.file=/etc/prometheus/prometheus-config.yml + - --web.enable-lifecycle + - --web.route-prefix=/ + volumes: + - ./configs/:/etc/prometheus + ports: + - "9090:9090" + + jaeger: + image: jaegertracing/all-in-one:1.60@sha256:4fd2d70fa347d6a47e79fcb06b1c177e6079f92cba88b083153d56263082135e + ports: + - "16686:16686" # Query frontend + - "4317" # OTEL GRPC traces collector + - "4318" # OTEL HTTP traces collector + environment: + - COLLECTOR_OTLP_ENABLED=true + - LOG_LEVEL=debug +# curl http://localhost:16686/api/services +# curl http://localhost:16686/api/traces?service=testserver diff --git a/internal/test/integration/red_test_python_sql.go b/internal/test/integration/red_test_python_sql.go index 652d0aa593..8eaa05ea47 100644 --- a/internal/test/integration/red_test_python_sql.go +++ b/internal/test/integration/red_test_python_sql.go @@ -76,7 +76,7 @@ func assertSQLOperation(t *testing.T, comm, op, table, db string) { var tq jaeger.TracesQuery require.NoError(ct, json.NewDecoder(resp.Body).Decode(&tq)) traces := tq.FindBySpan(jaeger.Tag{Key: "db.operation.name", Type: "string", Value: op}) - assert.GreaterOrEqual(ct, len(traces), 1) + require.NotEmpty(ct, traces) lastTrace := traces[len(traces)-1] span := lastTrace.Spans[0] @@ -115,6 +115,11 @@ func assertSQLOperationErrored(t *testing.T, comm, op, table, db string) { "error.type": "42P01", "otel.status_description": "SQL Server errored for command 'COM_QUERY': error_code=NA sql_state=42P01 message=relation \"obi.nonexisting\" does not exist", }, + "microsoft.sql_server": { + "db.response.status_code": "208", + "error.type": "1", + "otel.status_description": "SQL Server errored for command 'COM_SQL_BATCH': error_code=208 sql_state=1 message=Invalid object name 'obi.nonexisting'.", + }, } params := neturl.Values{} @@ -341,3 +346,26 @@ func testREDMetricsPythonSQLSSL(t *testing.T) { }) } } + +func testPythonSQLMultiPacketResponse(t *testing.T, comm, url, table, db string) { + t.Helper() + + ti.DoHTTPGet(t, url+"/largeresult", 200) + + assertSQLOperation(t, comm, "SELECT", table, db) +} + +func testPythonMSSQL(t *testing.T) { + testCaseURL := "http://localhost:8381" + comm := "python3.14" + table := "actor" + db := "microsoft.sql_server" + + waitForSQLTestComponentsWithDB(t, testCaseURL, "/query", db) + + assertHTTPRequests(t, comm, "/query") + testPythonSQLQuery(t, comm, testCaseURL, table, db) + testPythonSQLPreparedStatements(t, comm, testCaseURL, table, db) + testPythonSQLError(t, comm, testCaseURL, db) + testPythonSQLMultiPacketResponse(t, comm, testCaseURL, "bulk_actor", db) +} diff --git a/internal/test/integration/suites_test.go b/internal/test/integration/suites_test.go index 74a6c6a618..5a475a212a 100644 --- a/internal/test/integration/suites_test.go +++ b/internal/test/integration/suites_test.go @@ -472,6 +472,16 @@ func TestSuite_PythonMySQL(t *testing.T) { require.NoError(t, compose.Close()) } +func TestSuite_PythonMSSQL(t *testing.T) { + compose, err := docker.ComposeSuite("docker-compose-python-mssql.yml", path.Join(pathOutput, "test-suite-python-mssql.log")) + require.NoError(t, err) + + compose.Env = append(compose.Env, `OTEL_EBPF_OPEN_PORT=8080`, `OTEL_EBPF_EXECUTABLE_PATH=`, `TEST_SERVICE_PORTS=8381:8080`) + require.NoError(t, compose.Up()) + t.Run("Python MSSQL tests", testPythonMSSQL) + require.NoError(t, compose.Close()) +} + func TestSuite_PythonKafka(t *testing.T) { compose, err := docker.ComposeSuite("docker-compose-python-kafka.yml", path.Join(pathOutput, "test-suite-python-kafka.log")) compose.Env = append(compose.Env, `OTEL_EBPF_OPEN_PORT=8080`, `OTEL_EBPF_EXECUTABLE_PATH=`, `TEST_SERVICE_PORTS=8381:8080`) diff --git a/pkg/appolly/app/request/span.go b/pkg/appolly/app/request/span.go index d7ccb1263b..46edaccf86 100644 --- a/pkg/appolly/app/request/span.go +++ b/pkg/appolly/app/request/span.go @@ -83,6 +83,7 @@ const ( DBGeneric SQLKind = iota + 1 DBPostgres DBMySQL + DBMSSQL ) const ( @@ -1462,6 +1463,8 @@ func (s *Span) DBSystemName() attribute.KeyValue { return semconv.DBSystemNamePostgreSQL case int(DBMySQL): return semconv.DBSystemNameMySQL + case int(DBMSSQL): + return semconv.DBSystemNameMicrosoftSQLServer } } diff --git a/pkg/config/ebpf_tracer.go b/pkg/config/ebpf_tracer.go index 0d65227a71..3ce6f36b03 100644 --- a/pkg/config/ebpf_tracer.go +++ b/pkg/config/ebpf_tracer.go @@ -109,6 +109,9 @@ type EBPFTracer struct { // Postgres prepared statements cache size. PostgresPreparedStatementsCacheSize int `yaml:"postgres_prepared_statements_cache_size" env:"OTEL_EBPF_BPF_POSTGRES_PREPARED_STATEMENTS_CACHE_SIZE" validate:"gt=0"` + // MSSQL prepared statements cache size. + MSSQLPreparedStatementsCacheSize int `yaml:"mssql_prepared_statements_cache_size" env:"OTEL_EBPF_BPF_MSSQL_PREPARED_STATEMENTS_CACHE_SIZE" validate:"gt=0"` + // Kafka Topic UUID to Name cache size. KafkaTopicUUIDCacheSize int `yaml:"kafka_topic_uuid_cache_size" env:"OTEL_KAFKA_TOPIC_UUID_CACHE_SIZE" validate:"gt=0"` @@ -180,6 +183,7 @@ type EBPFBufferSizes struct { MySQL uint32 `yaml:"mysql" env:"OTEL_EBPF_BPF_BUFFER_SIZE_MYSQL" validate:"lte=65536"` Kafka uint32 `yaml:"kafka" env:"OTEL_EBPF_BPF_BUFFER_SIZE_KAFKA" validate:"lte=65536"` Postgres uint32 `yaml:"postgres" env:"OTEL_EBPF_BPF_BUFFER_SIZE_POSTGRES" validate:"lte=65536"` + MSSQL uint32 `yaml:"mssql" env:"OTEL_EBPF_BPF_BUFFER_SIZE_MSSQL" validate:"lte=65536"` } // HasHeaders returns true if HTTP headers context propagation is enabled diff --git a/pkg/ebpf/common/common.go b/pkg/ebpf/common/common.go index f63c931577..0c1fdf130c 100644 --- a/pkg/ebpf/common/common.go +++ b/pkg/ebpf/common/common.go @@ -83,6 +83,7 @@ const ( ProtocolTypeHTTP // not used, written for consistency ProtocolTypeKafka ProtocolTypeMQTT // placeholder for future kernel-space detection + ProtocolTypeMSSQL ) const ( @@ -193,6 +194,7 @@ type EBPFParseContext struct { mysqlPreparedStatements *simplelru.LRU[mysqlPreparedStatementsKey, string] postgresPreparedStatements *simplelru.LRU[postgresPreparedStatementsKey, string] postgresPortals *simplelru.LRU[postgresPortalsKey, string] + mssqlPreparedStatements *simplelru.LRU[mssqlPreparedStatementsKey, string] kafkaTopicUUIDToName *simplelru.LRU[kafkaparser.UUID, string] payloadExtraction config.PayloadExtraction httpEnricher *ebpfhttp.HTTPEnricher @@ -231,6 +233,7 @@ func NewEBPFParseContext(cfg *config.EBPFTracer, spansChan *msg.Queue[[]request. mysqlPreparedStatements *simplelru.LRU[mysqlPreparedStatementsKey, string] postgresPreparedStatements *simplelru.LRU[postgresPreparedStatementsKey, string] postgresPortals *simplelru.LRU[postgresPortalsKey, string] + mssqlPreparedStatements *simplelru.LRU[mssqlPreparedStatementsKey, string] kafkaTopicUUIDToName *simplelru.LRU[kafkaparser.UUID, string] mongoRequestCache PendingMongoDBRequests payloadExtraction config.PayloadExtraction @@ -285,6 +288,11 @@ func NewEBPFParseContext(cfg *config.EBPFTracer, spansChan *msg.Queue[[]request. ptlog().Error("failed to create Postgres portals cache", "error", err) } + mssqlPreparedStatements, err = simplelru.NewLRU[mssqlPreparedStatementsKey, string](cfg.MSSQLPreparedStatementsCacheSize, nil) + if err != nil { + ptlog().Error("failed to create MSSQL prepared statements cache", "error", err) + } + kafkaTopicUUIDToName, err = simplelru.NewLRU[kafkaparser.UUID, string](cfg.KafkaTopicUUIDCacheSize, nil) if err != nil { ptlog().Error("failed to create Kafka topic UUID to name cache", "error", err) @@ -312,6 +320,7 @@ func NewEBPFParseContext(cfg *config.EBPFTracer, spansChan *msg.Queue[[]request. mysqlPreparedStatements: mysqlPreparedStatements, postgresPreparedStatements: postgresPreparedStatements, postgresPortals: postgresPortals, + mssqlPreparedStatements: mssqlPreparedStatements, kafkaTopicUUIDToName: kafkaTopicUUIDToName, payloadExtraction: payloadExtraction, httpEnricher: httpEnricher, diff --git a/pkg/ebpf/common/sql_detect_mssql.go b/pkg/ebpf/common/sql_detect_mssql.go new file mode 100644 index 0000000000..2bba7fde64 --- /dev/null +++ b/pkg/ebpf/common/sql_detect_mssql.go @@ -0,0 +1,363 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ebpfcommon // import "go.opentelemetry.io/obi/pkg/ebpf/common" + +import ( + "encoding/binary" + "log/slog" + "unicode/utf16" + "unicode/utf8" + + "go.opentelemetry.io/obi/pkg/appolly/app/request" + "go.opentelemetry.io/obi/pkg/internal/largebuf" + "go.opentelemetry.io/obi/pkg/internal/sqlprune" +) + +type mssqlPreparedStatementsKey struct { + connInfo BpfConnectionInfoT + stmtID uint32 +} + +const ( + kMSSQLHeaderLen = 8 + kMSSQLBatch = 1 + kMSSQLRPC = 3 + kMSSQLResponse = 4 + + kMSSQLProcIDPrepare = 11 + kMSSQLProcIDExecute = 12 + kMSSQLProcIDPrepExec = 13 + + // TDS Token types + // https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-tds/7091f6f6-b83d-4ed2-afeb-ba5013dfb18f + kMSSQLTokenReturnValue = 0xAC + + // TDS TypeInfo type identifiers + // https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-tds/d2ed21d6-527b-46ac-8035-94f6f68eb9a8 + kMSSQLTypeInt4 = 0x26 // fixed-length 4-byte integer + kMSSQLTypeIntN = 0x38 // variable-length integer (length byte precedes value) + + // Fixed lengths for TDS RETURNVALUE (0xAC) token fields that follow the name + kMSSQLStatusLen = 1 + kMSSQLUserTypeLen = 4 + kMSSQLFlagsLen = 2 + // Sum of Status, UserType, and Flags fields + kMSSQLReturnValueMetadataLen = kMSSQLStatusLen + kMSSQLUserTypeLen + kMSSQLFlagsLen + + // Maximum size of a single TDS packet as defined by the protocol. + // Defaults to 4096, but can be negotiated up to 32767. + kMSSQLMaxPacketSize = 32767 +) + +func isMSSQL(b *largebuf.LargeBuffer) bool { + if b.Len() < kMSSQLHeaderLen { + return false + } + + pktType, err := b.U8At(0) + if err != nil { + return false + } + if pktType != kMSSQLBatch && pktType != kMSSQLRPC && pktType != kMSSQLResponse { + return false + } + + // Status byte check: upper 4 bits are reserved and should be 0. + // This helps filter out random binary data that might match the packet type. + status, err := b.U8At(1) + if err != nil { + return false + } + if (status & 0xF0) != 0 { + return false + } + + // Length is big-endian in TDS. It's the total number of bytes in the Packet + // including the 8-byte header. + length, err := b.U16BEAt(2) + if err != nil { + return false + } + + // Check the length: + // 1. MUST be at least 8 bytes (the size of the header itself). + // 2. MUST be less than or equal to the maximum allowable negotiated packet + // size (32,767 bytes). + // Note: While the *negotiated* packet size must be between 512 and 32,767, + // individual packets can be much smaller (e.g., a simple SELECT batch). + if length < uint16(kMSSQLHeaderLen) || length > kMSSQLMaxPacketSize { + return false + } + + // The Window byte (at offset 7) is currently unused and should be 0. + window, err := b.U8At(7) + if err != nil { + return false + } + return window == 0 +} + +func ucs2ToUTF8(b []byte) []byte { + if len(b)%2 != 0 { + b = b[:len(b)-1] + } + + out := make([]byte, 0, len(b)) + + for i := 0; i < len(b); i += 2 { + u1 := binary.LittleEndian.Uint16(b[i:]) + + if utf16.IsSurrogate(rune(u1)) && i+2 < len(b) { + u2 := binary.LittleEndian.Uint16(b[i+2:]) + if r := utf16.DecodeRune(rune(u1), rune(u2)); r != utf8.RuneError { + out = utf8.AppendRune(out, r) + i += 2 + continue + } + } + + out = utf8.AppendRune(out, rune(u1)) + } + + return out +} + +// extractTDSPayloads iterates over all TDS packets in b and returns their +// concatenated payload bytes with every 8-byte packet header stripped out. +// This is necessary because a single TDS message may span multiple packets, +// and naively treating the whole buffer as one payload would corrupt decoding +// wherever an embedded packet header appears. +func extractTDSPayloads(b *largebuf.LargeBuffer) []byte { + total := b.Len() + var payload []byte + + for offset := 0; offset+kMSSQLHeaderLen <= total; { + pktLen, err := b.U16BEAt(offset + 2) + if err != nil || int(pktLen) < kMSSQLHeaderLen || offset+int(pktLen) > total { + break + } + payloadLen := int(pktLen) - kMSSQLHeaderLen + if payloadLen > 0 { + chunk, err := b.UnsafeViewAt(offset+kMSSQLHeaderLen, payloadLen) + if err != nil { + break + } + payload = append(payload, chunk...) + } + offset += int(pktLen) + } + + return payload +} + +func mssqlPreparedStatements(b *largebuf.LargeBuffer) (string, string, string) { + if b.Len() <= kMSSQLHeaderLen { + return "", "", "" + } + + pktType, _ := b.U8At(0) + if pktType == kMSSQLBatch { + stmt := ucs2ToUTF8(extractTDSPayloads(b)) + return detectSQL(stmt) + } + + return "", "", "" +} + +func handleMSSQL(parseCtx *EBPFParseContext, event *TCPRequestInfo, requestBuffer, responseBuffer *largebuf.LargeBuffer) (request.Span, error) { + var ( + op, table, stmt string + span request.Span + ) + + if requestBuffer.Len() < kMSSQLHeaderLen { + slog.Debug("MSSQL request too short") + return span, errFallback + } + + reqRaw := requestBuffer.UnsafeView() + respRaw := responseBuffer.UnsafeView() + + sqlCommand := sqlprune.SQLParseCommandID(request.DBMSSQL, reqRaw) + sqlError := sqlprune.SQLParseError(request.DBMSSQL, respRaw) + + switch sqlCommand { + case "SQL_BATCH": + op, table, stmt = mssqlPreparedStatements(requestBuffer) + case "RPC": + procID, r, err := parseMSSQLRPC(requestBuffer) + if err == nil { + payload := r.Bytes() + switch procID { + case kMSSQLProcIDPrepExec: + text := ucs2ToUTF8(payload) + op, table, stmt = detectSQL(text) + case kMSSQLProcIDPrepare: + text := ucs2ToUTF8(payload) + _, _, stmt = detectSQL(text) + handle := parseHandleFromPrepareResponse(responseBuffer) + if handle != 0 && stmt != "" { + parseCtx.mssqlPreparedStatements.Add(mssqlPreparedStatementsKey{ + connInfo: event.ConnInfo, + stmtID: handle, + }, stmt) + return span, errIgnore + } + case kMSSQLProcIDExecute: + handle := parseHandleFromExecute(r) + if handle != 0 { + var found bool + stmt, found = parseCtx.mssqlPreparedStatements.Get(mssqlPreparedStatementsKey{ + connInfo: event.ConnInfo, + stmtID: handle, + }) + if found { + op, table = sqlprune.SQLParseOperationAndTable(stmt) + } + } + } + } + } + + if !validSQL(op, table, request.DBMSSQL) { + slog.Debug("MSSQL operation and/or table are invalid", "stmt", stmt) + return span, errFallback + } + + return TCPToSQLToSpan(event, op, table, stmt, request.DBMSSQL, sqlCommand, sqlError), nil +} + +func parseMSSQLRPC(b *largebuf.LargeBuffer) (uint16, largebuf.LargeBufferReader, error) { + if b.Len() < kMSSQLHeaderLen+2 { + return 0, largebuf.LargeBufferReader{}, errFallback + } + + firstPktLen, err := b.U16BEAt(2) + if err != nil || int(firstPktLen) < kMSSQLHeaderLen || int(firstPktLen) > b.Len() { + return 0, largebuf.LargeBufferReader{}, errFallback + } + + // Parse ProcID from the first packet's payload. The RPC header fields + // (NameLen, ProcID/Name, OptionFlags) are always in the first TDS packet. + r, err := b.NewLimitedReader(kMSSQLHeaderLen, int(firstPktLen)) + if err != nil { + return 0, largebuf.LargeBufferReader{}, err + } + + nameLen, err := r.ReadU16LE() + if err != nil { + return 0, largebuf.LargeBufferReader{}, err + } + + var procID uint16 + if nameLen == 0xFFFF { + // ProcID follows NameLen when it is 0xFFFF + procID, err = r.ReadU16LE() + } else { + // Skip the name string (UCS-2, so 2 bytes per char) + err = r.Skip(int(nameLen) * 2) + } + + if err != nil { + return 0, largebuf.LargeBufferReader{}, err + } + + if err := r.Skip(2); err != nil { // OptionFlags + return procID, largebuf.LargeBufferReader{}, err + } + + // headerConsumed is the number of bytes at the start of the TDS payload + // used by the RPC header (NameLen + ProcID/Name + OptionFlags). + headerConsumed := r.ReadOffset() - kMSSQLHeaderLen + + // Extract parameters from all TDS packets so that multi-packet RPC + // requests are handled correctly. Strip the RPC header from the front. + allPayloads := extractTDSPayloads(b) + if headerConsumed > len(allPayloads) { + return procID, largebuf.LargeBufferReader{}, errFallback + } + + return procID, largebuf.NewLargeBufferFrom(allPayloads[headerConsumed:]).NewReader(), nil +} + +func parseHandleFromExecute(r largebuf.LargeBufferReader) uint32 { + nameLen, err := r.ReadU8() + if err != nil { + return 0 + } + + if err := r.Skip(int(nameLen) * 2); err != nil { // name (UCS-2) + return 0 + } + + if err := r.Skip(1); err != nil { // status + return 0 + } + + typ, err := r.ReadU8() + if err != nil { + return 0 + } + + switch typ { + case kMSSQLTypeInt4: + val, _ := r.ReadU32LE() + return val + case kMSSQLTypeIntN: + length, err := r.ReadU8() + if err == nil && length == 4 { + val, _ := r.ReadU32LE() + return val + } + } + return 0 +} + +func parseHandleFromPrepareResponse(b *largebuf.LargeBuffer) uint32 { + payload := extractTDSPayloads(b) + if len(payload) == 0 { + return 0 + } + + r := largebuf.NewLargeBufferFrom(payload).NewReader() + + for { + idx := r.IndexByte(kMSSQLTokenReturnValue) + if idx < 0 { + break + } + + _ = r.Skip(idx + 1) + + if r.Remaining() < 3 { + continue + } + + _ = r.Skip(2) // Ordinal + nameLen, _ := r.ReadU8() + + metadataLen := int(nameLen)*2 + kMSSQLReturnValueMetadataLen + if r.Remaining() < metadataLen+1 { + continue + } + + _ = r.Skip(metadataLen) + typ, _ := r.ReadU8() + + switch typ { + case kMSSQLTypeInt4: + if r.Remaining() >= 4 { + val, _ := r.ReadU32LE() + return val + } + case kMSSQLTypeIntN: + length, _ := r.ReadU8() + if length == 4 && r.Remaining() >= 4 { + val, _ := r.ReadU32LE() + return val + } + } + } + return 0 +} diff --git a/pkg/ebpf/common/sql_detect_mssql_test.go b/pkg/ebpf/common/sql_detect_mssql_test.go new file mode 100644 index 0000000000..d905fd88ef --- /dev/null +++ b/pkg/ebpf/common/sql_detect_mssql_test.go @@ -0,0 +1,330 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ebpfcommon + +import ( + "encoding/binary" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/obi/pkg/internal/largebuf" +) + +func TestIsMSSQL(t *testing.T) { + tests := []struct { + name string + buf []byte + want bool + }{ + { + name: "valid batch packet", + buf: []byte{0x01, 0x01, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00}, + want: true, + }, + { + name: "valid rpc packet", + buf: []byte{0x03, 0x01, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00}, + want: true, + }, + { + name: "valid response packet", + buf: []byte{0x04, 0x01, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00}, + want: true, + }, + { + name: "too short", + buf: []byte{0x01, 0x01, 0x00, 0x07, 0x00, 0x00, 0x00}, + want: false, + }, + { + name: "invalid type", + buf: []byte{0x05, 0x01, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00}, + want: false, + }, + { + name: "invalid status", + buf: []byte{0x01, 0x10, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00}, + want: false, + }, + { + name: "invalid length too small", + buf: []byte{0x01, 0x01, 0x00, 0x07, 0x00, 0x00, 0x00, 0x00}, + want: false, + }, + { + name: "invalid length too large", + buf: []byte{0x01, 0x01, 0x80, 0x01, 0x00, 0x00, 0x00, 0x00}, + want: false, + }, + { + name: "invalid window byte", + buf: []byte{0x01, 0x01, 0x00, 0x08, 0x00, 0x00, 0x00, 0x01}, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, isMSSQL(largebuf.NewLargeBufferFrom(tt.buf))) + }) + } +} + +func TestUCS2ToUTF8(t *testing.T) { + tests := []struct { + name string + buf []byte + want []byte + }{ + { + name: "simple ascii", + buf: []byte{'S', 0, 'E', 0, 'L', 0, 'E', 0, 'C', 0, 'T', 0}, + want: []byte{'S', 'E', 'L', 'E', 'C', 'T'}, + }, + { + name: "with special chars", + buf: []byte{'S', 0, 'E', 0, 'L', 0, 'E', 0, 'C', 0, 'T', 0, ' ', 0, '*', 0, ' ', 0, 'F', 0, 'R', 0, 'O', 0, 'M', 0}, + want: []byte{'S', 'E', 'L', 'E', 'C', 'T', ' ', '*', ' ', 'F', 'R', 'O', 'M'}, + }, + { + name: "odd length", + buf: []byte{'S', 0, 'E', 0, 'L', 0, 'E', 0, 'C', 0, 'T', 0, 'X'}, + want: []byte{'S', 'E', 'L', 'E', 'C', 'T'}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, ucs2ToUTF8(tt.buf)) + }) + } +} + +func makeTDSPacket(pktType, status byte, payload []byte) []byte { + totalLen := kMSSQLHeaderLen + len(payload) + hdr := []byte{ + pktType, status, + byte(totalLen >> 8), byte(totalLen), + 0x00, 0x00, // SPID + 0x01, // PacketID + 0x00, // Window + } + return append(hdr, payload...) +} + +func TestExtractTDSPayloads(t *testing.T) { + sqlPart1 := []byte{'S', 0, 'E', 0, 'L', 0, 'E', 0, 'C', 0, 'T', 0} + sqlPart2 := []byte{' ', 0, '1', 0} + + tests := []struct { + name string + buf []byte + want []byte + }{ + { + name: "single packet", + buf: makeTDSPacket(kMSSQLBatch, 0x01, sqlPart1), + want: sqlPart1, + }, + { + name: "two packets concatenated", + buf: append( + makeTDSPacket(kMSSQLBatch, 0x00, sqlPart1), + makeTDSPacket(kMSSQLBatch, 0x01, sqlPart2)..., + ), + want: append(sqlPart1, sqlPart2...), + }, + { + name: "truncated second packet ignored", + buf: append( + makeTDSPacket(kMSSQLBatch, 0x00, sqlPart1), + []byte{kMSSQLBatch, 0x01, 0x00, 0x0A}..., // header claims 10 bytes but nothing follows + ), + want: sqlPart1, + }, + { + name: "empty payload packet", + buf: makeTDSPacket(kMSSQLBatch, 0x01, nil), + want: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := extractTDSPayloads(largebuf.NewLargeBufferFrom(tt.buf)) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestMSSQLBatchParsing(t *testing.T) { + selectSQL := []byte{'S', 0, 'E', 0, 'L', 0, 'E', 0, 'C', 0, 'T', 0, ' ', 0, '*', 0, ' ', 0, 'F', 0, 'R', 0, 'O', 0, 'M', 0, ' ', 0, 't', 0} + tests := []struct { + name string + buf []byte + wantOp string + wantTable string + wantStmt string + }{ + { + name: "valid single-packet batch", + buf: makeTDSPacket(kMSSQLBatch, 0x01, []byte{'S', 0, 'E', 0, 'L', 0, 'E', 0, 'C', 0, 'T', 0}), + wantOp: "SELECT", wantTable: "", wantStmt: "SELECT", + }, + { + name: "sql split across two TDS packets", + buf: append( + makeTDSPacket(kMSSQLBatch, 0x00, selectSQL[:len(selectSQL)/2]), + makeTDSPacket(kMSSQLBatch, 0x01, selectSQL[len(selectSQL)/2:])..., + ), + wantOp: "SELECT", wantTable: "t", wantStmt: "SELECT * FROM t", + }, + { + name: "too short", + buf: makeTDSPacket(kMSSQLBatch, 0x01, nil), + wantOp: "", + wantTable: "", + wantStmt: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + op, table, stmt := mssqlPreparedStatements(largebuf.NewLargeBufferFrom(tt.buf)) + assert.Equal(t, tt.wantOp, op) + assert.Equal(t, tt.wantTable, table) + assert.Equal(t, tt.wantStmt, stmt) + }) + } +} + +func TestParseMSSQLRPC(t *testing.T) { + tests := []struct { + name string + buf []byte + wantProcID uint16 + wantErr bool + }{ + { + name: "proc id 13", + buf: makeTDSPacket(kMSSQLRPC, 0x01, []byte{0xFF, 0xFF, 0x0D, 0x00, 0x00, 0x00}), + wantProcID: 13, + }, + { + name: "named proc", + buf: makeTDSPacket(kMSSQLRPC, 0x01, []byte{0x02, 0x00, 's', 0, 'p', 0, 0x00, 0x00}), + wantProcID: 0, + }, + { + // A second TDS packet appended after the first must not confuse header parsing. + name: "second packet ignored — proc id still parsed from first", + buf: append( + makeTDSPacket(kMSSQLRPC, 0x00, []byte{0xFF, 0xFF, 0x0D, 0x00, 0x00, 0x00}), + makeTDSPacket(kMSSQLRPC, 0x01, []byte{0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00})..., + ), + wantProcID: 13, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + procID, _, err := parseMSSQLRPC(largebuf.NewLargeBufferFrom(tt.buf)) + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + assert.Equal(t, tt.wantProcID, procID) + }) + } +} + +func TestParseHandleFromExecute(t *testing.T) { + tests := []struct { + name string + payload []byte + wantHandle uint32 + }{ + { + name: "valid TI_INT4 handle", + payload: func() []byte { + // nameLen=0, status=0, type=0x26 (TI_INT4), value=123 + p := []byte{0, 0, 0x26} + v := make([]byte, 4) + binary.LittleEndian.PutUint32(v, 123) + return append(p, v...) + }(), + wantHandle: 123, + }, + { + name: "valid TI_INTN handle", + payload: func() []byte { + // nameLen=0, status=0, type=0x38 (TI_INTN), length=4, value=456 + p := []byte{0, 0, 0x38, 4} + v := make([]byte, 4) + binary.LittleEndian.PutUint32(v, 456) + return append(p, v...) + }(), + wantHandle: 456, + }, + { + name: "too short", + payload: []byte{0, 0, 0x26, 1, 2, 3}, + wantHandle: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := largebuf.NewLargeBufferFrom(tt.payload).NewReader() + handle := parseHandleFromExecute(r) + assert.Equal(t, tt.wantHandle, handle) + }) + } +} + +func TestParseHandleFromPrepareResponse(t *testing.T) { + tests := []struct { + name string + buf []byte + wantHandle uint32 + }{ + { + name: "valid prepare response TI_INT4", + buf: func() []byte { + // 0xAC (RETURNVALUE), ordinal=1 (2 bytes), nameLen=0 (1 byte), status=0 (1 byte), userType=0 (4 bytes), flags=0 (2 bytes), type=0x26 (1 byte), value=789 (4 bytes) + payload := []byte{0xAC, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x26} + v := make([]byte, 4) + binary.LittleEndian.PutUint32(v, 789) + return makeTDSPacket(kMSSQLResponse, 0x01, append(payload, v...)) + }(), + wantHandle: 789, + }, + { + name: "valid prepare response TI_INTN", + buf: func() []byte { + // 0xAC (RETURNVALUE), ordinal=1, nameLen=0, status=0, userType=0, flags=0, type=0x38 (TI_INTN), length=4, value=1011 + payload := []byte{0xAC, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x38, 4} + v := make([]byte, 4) + binary.LittleEndian.PutUint32(v, 1011) + return makeTDSPacket(kMSSQLResponse, 0x01, append(payload, v...)) + }(), + wantHandle: 1011, + }, + { + name: "no return value token", + buf: []byte{0x04, 0x01, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00}, + wantHandle: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handle := parseHandleFromPrepareResponse(largebuf.NewLargeBufferFrom(tt.buf)) + assert.Equal(t, tt.wantHandle, handle) + }) + } +} diff --git a/pkg/ebpf/common/sql_detect_transform.go b/pkg/ebpf/common/sql_detect_transform.go index 65abc5ddff..16cbeb4921 100644 --- a/pkg/ebpf/common/sql_detect_transform.go +++ b/pkg/ebpf/common/sql_detect_transform.go @@ -13,10 +13,13 @@ import ( func sqlKind(b *largebuf.LargeBuffer) request.SQLKind { if isPostgres(b) { return request.DBPostgres - } else if isMySQL(b) { + } + if isMySQL(b) { return request.DBMySQL } - + if isMSSQL(b) { + return request.DBMSSQL + } return request.DBGeneric } @@ -102,6 +105,8 @@ func detectSQLPayload(useHeuristics bool, b *largebuf.LargeBuffer) (string, stri op, table, sql = postgresPreparedStatements(b) case request.DBMySQL: op, table, sql = mysqlPreparedStatements(view) + case request.DBMSSQL: + op, table, sql = mssqlPreparedStatements(b) } } diff --git a/pkg/ebpf/common/tcp_detect_transform.go b/pkg/ebpf/common/tcp_detect_transform.go index f583bbf3ff..20a788b19d 100644 --- a/pkg/ebpf/common/tcp_detect_transform.go +++ b/pkg/ebpf/common/tcp_detect_transform.go @@ -87,6 +87,8 @@ func dispatchKernelAssignedProtocol(parseCtx *EBPFParseContext, event *TCPReques return dispatchMySQL(parseCtx, event, requestBuffer, responseBuffer) case ProtocolTypePostgres: return dispatchPostgres(parseCtx, event, requestBuffer, responseBuffer) + case ProtocolTypeMSSQL: + return dispatchMSSQL(parseCtx, event, requestBuffer, responseBuffer) } return request.Span{}, false, false, nil @@ -147,6 +149,11 @@ func dispatchPostgres(parseCtx *EBPFParseContext, event *TCPRequestInfo, request return handleError(span, err, "Postgres") } +func dispatchMSSQL(parseCtx *EBPFParseContext, event *TCPRequestInfo, requestBuffer, responseBuffer *largebuf.LargeBuffer) (request.Span, bool, bool, error) { + span, err := handleMSSQL(parseCtx, event, requestBuffer, responseBuffer) + return handleError(span, err, "MSSQL") +} + // detectGenericProtocol runs deterministic protocol detection for unclassified events: // SQL, FastCGI, MongoDB, Couchbase, and Memcached noreply. func detectGenericProtocol(parseCtx *EBPFParseContext, cfg *config.EBPFTracer, event *TCPRequestInfo, requestBuffer, responseBuffer *largebuf.LargeBuffer) (request.Span, bool, bool, error) { diff --git a/pkg/internal/ebpf/generictracer/generictracer.go b/pkg/internal/ebpf/generictracer/generictracer.go index 102d83735e..298108580c 100644 --- a/pkg/internal/ebpf/generictracer/generictracer.go +++ b/pkg/internal/ebpf/generictracer/generictracer.go @@ -203,6 +203,8 @@ func (p *Tracer) constants() map[string]any { m["mysql_max_captured_bytes"] = p.cfg.EBPF.BufferSizes.MySQL m["kafka_max_captured_bytes"] = p.cfg.EBPF.BufferSizes.Kafka m["postgres_max_captured_bytes"] = p.cfg.EBPF.BufferSizes.Postgres + m["mssql_max_captured_bytes"] = p.cfg.EBPF.BufferSizes.MSSQL + m["max_transaction_time"] = uint64(p.cfg.EBPF.MaxTransactionTime.Nanoseconds()) m["g_bpf_debug"] = p.cfg.EBPF.BpfDebug diff --git a/pkg/internal/sqlprune/mssql.go b/pkg/internal/sqlprune/mssql.go new file mode 100644 index 0000000000..c28364b05d --- /dev/null +++ b/pkg/internal/sqlprune/mssql.go @@ -0,0 +1,116 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sqlprune // import "go.opentelemetry.io/obi/pkg/internal/sqlprune" + +import ( + "encoding/binary" + "strconv" + "unicode/utf16" + + "go.opentelemetry.io/obi/pkg/appolly/app/request" +) + +const ( + mssqlHdrSize = 8 + mssqlErrToken = 0xAA + mssqlPktResponse = 0x04 +) + +func parseMSSQLCommandID(buf []uint8) uint8 { + if len(buf) < 1 { + return 0 + } + // The first byte is the packet type + return buf[0] +} + +func mssqlCommandIDToString(commandID uint8) string { + switch commandID { + case 0x01: + return "SQL_BATCH" + case 0x03: + return "RPC" + case 0x04: + return "RESPONSE" + default: + return "" + } +} + +func parseMSSQLError(buf []uint8) *request.SQLError { + if len(buf) < mssqlHdrSize+1 { + return nil + } + + // Check if it is a response packet + if buf[0] != mssqlPktResponse { + return nil + } + + offset := mssqlHdrSize + if offset >= len(buf) { + return nil + } + + // We only check the first token for now to avoid complex parsing + token := buf[offset] + if token == mssqlErrToken { + offset++ // skip token + if offset+2 > len(buf) { + return nil + } + // Skip the 2-byte length of the error token stream + offset += 2 + + // Number (4 bytes) + if offset+4 > len(buf) { + return nil + } + code := binary.LittleEndian.Uint32(buf[offset : offset+4]) + offset += 4 + + // State (1 byte) + if offset+1 > len(buf) { + return nil + } + state := buf[offset] + offset++ + + // Class (1 byte) + if offset+1 > len(buf) { + return nil + } + offset++ + + // MsgText (US_VARCHAR) + if offset+2 > len(buf) { + return nil + } + msgLen := int(binary.LittleEndian.Uint16(buf[offset : offset+2])) + offset += 2 + + if offset+msgLen*2 > len(buf) { + return nil + } + msgBytes := buf[offset : offset+msgLen*2] + + u16s := make([]uint16, msgLen) + for i := 0; i < msgLen; i++ { + u16s[i] = binary.LittleEndian.Uint16(msgBytes[i*2:]) + } + message := string(utf16.Decode(u16s)) + + sqlErr := &request.SQLError{ + Message: message, + SQLState: strconv.Itoa(int(state)), + } + // MSSQL error numbers are 4 bytes; only assign Code when it fits in 16 bits + if code <= 0xFFFF { + sqlErr.Code = uint16(code) + } + return sqlErr + } + + return nil +} diff --git a/pkg/internal/sqlprune/mssql_test.go b/pkg/internal/sqlprune/mssql_test.go new file mode 100644 index 0000000000..79b52ee762 --- /dev/null +++ b/pkg/internal/sqlprune/mssql_test.go @@ -0,0 +1,99 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sqlprune + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/obi/pkg/appolly/app/request" +) + +// makeMSSQLErrorPacket builds a minimal TDS response packet containing a single ERROR token. +// msg must contain only ASCII characters. +func makeMSSQLErrorPacket(errNumber uint32, state uint8, msg string) []uint8 { + msgLen := len(msg) + + b := []uint8{ + mssqlPktResponse, 0x01, // type, status + 0x00, 0x00, // length (not validated by parser) + 0x00, 0x00, // spid + 0x01, 0x00, // packet_id, window + mssqlErrToken, // ERROR token + 0x00, 0x00, // token length (not used by parser) + } + + b = append(b, uint8(errNumber), uint8(errNumber>>8), uint8(errNumber>>16), uint8(errNumber>>24)) + b = append(b, state) + b = append(b, 0x10) // class + + b = append(b, uint8(msgLen), uint8(msgLen>>8)) + + for _, c := range msg { + b = append(b, uint8(c), 0x00) + } + + return b +} + +func TestParseMSSQLError(t *testing.T) { + tests := []struct { + name string + buf []uint8 + expected *request.SQLError + }{ + { + name: "valid error with code and message", + buf: makeMSSQLErrorPacket(208, 1, "Invalid object name 'nonexistent_table'"), + expected: &request.SQLError{ + Code: 208, + SQLState: "1", + Message: "Invalid object name 'nonexistent_table'", + }, + }, + { + name: "error number exceeds 16 bits clears Code", + buf: makeMSSQLErrorPacket(0x10000, 2, "some error"), + expected: &request.SQLError{ + SQLState: "2", + Message: "some error", + }, + }, + { + name: "not a response packet", + buf: func() []uint8 { b := makeMSSQLErrorPacket(208, 1, "err"); b[0] = 0x01; return b }(), + expected: nil, + }, + { + name: "no error token at offset 8", + buf: func() []uint8 { + b := makeMSSQLErrorPacket(208, 1, "err") + b[mssqlHdrSize] = 0x79 // DONE token, not ERROR + return b + }(), + expected: nil, + }, + { + name: "too short buffer", + buf: []uint8{mssqlPktResponse, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00}, + expected: nil, + }, + { + name: "message truncated", + buf: func() []uint8 { + b := makeMSSQLErrorPacket(208, 1, "some long message") + return b[:len(b)-4] // cut before message ends + }(), + expected: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := parseMSSQLError(tt.buf) + assert.Equal(t, tt.expected, got) + }) + } +} diff --git a/pkg/internal/sqlprune/sqlparser.go b/pkg/internal/sqlprune/sqlparser.go index 22d893988f..922fcf7b6e 100644 --- a/pkg/internal/sqlprune/sqlparser.go +++ b/pkg/internal/sqlprune/sqlparser.go @@ -101,6 +101,8 @@ func SQLParseError(kind request.SQLKind, buf []uint8) *request.SQLError { sqlErr = parseMySQLError(buf) case request.DBPostgres: sqlErr = parsePostgresError(buf) + case request.DBMSSQL: + sqlErr = parseMSSQLError(buf) default: return nil // unsupported SQL kind } @@ -114,6 +116,8 @@ func SQLParseCommandID(kind request.SQLKind, buf []byte) string { return mysqlCommandIDToString(parseMySQLCommandID(buf)) case request.DBPostgres: return postgresMessageTypeToString(parsePostgresMessageType(buf)) + case request.DBMSSQL: + return mssqlCommandIDToString(parseMSSQLCommandID(buf)) default: return "" } diff --git a/pkg/internal/sqlprune/sqlparser_test.go b/pkg/internal/sqlprune/sqlparser_test.go index b403a5c992..941c02f3bb 100644 --- a/pkg/internal/sqlprune/sqlparser_test.go +++ b/pkg/internal/sqlprune/sqlparser_test.go @@ -233,6 +233,73 @@ func TestSQLParseError(t *testing.T) { buf: []uint8{0x00, 0x00, 0x00, 0x00}, expected: nil, }, + { + name: "Valid MSSQL error", + dbKind: request.DBMSSQL, + buf: []uint8{ + 0x04, // Packet Type: Response + 0x01, // Status: EOM + 0x00, 0x24, // Length: 36 bytes (8 header + 28 payload) + 0x00, 0x00, // SPID + 0x00, // PacketID + 0x00, // Window + 0xAA, // Token: ERROR + 0x12, 0x00, // Token length: 18 bytes (excluding token byte and length itself) + 0x39, 0x30, 0x00, 0x00, // Number: 12345 (0x3039) + 0x01, // State + 0x02, // Class + 0x05, 0x00, // MsgLen: 5 characters + 'H', 0x00, 'e', 0x00, 'l', 0x00, 'l', 0x00, 'o', 0x00, // Message: "Hello" (UTF-16LE) + }, + expected: &request.SQLError{ + Code: 12345, + Message: "Hello", + SQLState: "1", + }, + }, + { + name: "MSSQL error with large code (exceeds uint16)", + dbKind: request.DBMSSQL, + buf: []uint8{ + 0x04, // Packet Type: Response + 0x01, // Status: EOM + 0x00, 0x24, // Length + 0x00, 0x00, // SPID + 0x00, // PacketID + 0x00, // Window + 0xAA, // Token: ERROR + 0x12, 0x00, // Token length + 0x00, 0x00, 0x01, 0x00, // Number: 65536 (0x10000) + 0x01, // State + 0x02, // Class + 0x05, 0x00, // MsgLen: 5 characters + 'H', 0x00, 'e', 0x00, 'l', 0x00, 'l', 0x00, 'o', 0x00, // Message: "Hello" (UTF-16LE) + }, + expected: &request.SQLError{ + Code: 0, // Should be 0 because 65536 > 0xFFFF + Message: "Hello", + SQLState: "1", + }, + }, + { + name: "MSSQL non-response packet", + dbKind: request.DBMSSQL, + buf: []uint8{ + 0x01, // Packet Type: SQL Batch (not Response) + 0x01, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, + 0xAA, + }, + expected: nil, + }, + { + name: "MSSQL truncated error token", + dbKind: request.DBMSSQL, + buf: []uint8{ + 0x04, 0x01, 0x00, 0x0A, 0x00, 0x00, 0x00, 0x00, + 0xAA, 0x01, // Missing length, etc. + }, + expected: nil, + }, } for _, tt := range tests { diff --git a/pkg/obi/config.go b/pkg/obi/config.go index 23e36ef446..bd06011e4f 100644 --- a/pkg/obi/config.go +++ b/pkg/obi/config.go @@ -138,9 +138,11 @@ var DefaultConfig = Config{ MySQL: 0, Postgres: 0, Kafka: 0, + MSSQL: 0, }, MySQLPreparedStatementsCacheSize: 1024, PostgresPreparedStatementsCacheSize: 1024, + MSSQLPreparedStatementsCacheSize: 1024, MongoRequestsCacheSize: 1024, KafkaTopicUUIDCacheSize: 1024, CouchbaseDBCacheSize: 1024, diff --git a/pkg/obi/config_test.go b/pkg/obi/config_test.go index 906227dfa4..1bd3a4eb4d 100644 --- a/pkg/obi/config_test.go +++ b/pkg/obi/config_test.go @@ -160,6 +160,7 @@ discovery: }, MySQLPreparedStatementsCacheSize: 1024, PostgresPreparedStatementsCacheSize: 1024, + MSSQLPreparedStatementsCacheSize: 1024, MongoRequestsCacheSize: 1024, KafkaTopicUUIDCacheSize: 1024, CouchbaseDBCacheSize: 1024,