diff --git a/Cargo.lock b/Cargo.lock index 0ef3ed475..0d45889b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2919,6 +2919,7 @@ dependencies = [ "expect-test", "hex", "ironrdp-acceptor", + "ironrdp-bulk", "ironrdp-cfg", "ironrdp-cliprdr", "ironrdp-cliprdr-format", diff --git a/crates/ironrdp-bulk/benches/bulk_compression.rs b/crates/ironrdp-bulk/benches/bulk_compression.rs index 107d37dbb..ed4d0f2aa 100644 --- a/crates/ironrdp-bulk/benches/bulk_compression.rs +++ b/crates/ironrdp-bulk/benches/bulk_compression.rs @@ -71,7 +71,7 @@ fn bench_compress_decompress(c: &mut Criterion, ct: CompressionType, data: &[u8] let name = algo_name(ct); // Verify data actually compresses with this algorithm - let mut test_comp = BulkCompressor::new(ct).expect("bulk compressor should initialize"); + let mut test_comp = BulkCompressor::new(ct); let (test_size, test_flags) = test_comp.compress(data).expect("bulk compression should succeed"); let is_compressed = test_flags & flags::PACKET_COMPRESSED != 0; @@ -85,7 +85,7 @@ fn bench_compress_decompress(c: &mut Criterion, ct: CompressionType, data: &[u8] group.bench_function(BenchmarkId::new("compress", data.len()), |b| { b.iter_batched( - || BulkCompressor::new(ct).expect("bulk compressor should initialize"), + || BulkCompressor::new(ct), |mut compressor| { black_box( compressor @@ -107,7 +107,7 @@ fn bench_compress_decompress(c: &mut Criterion, ct: CompressionType, data: &[u8] group.bench_function(BenchmarkId::new("decompress", data.len()), |b| { b.iter_batched( - || BulkCompressor::new(ct).expect("bulk compressor should initialize"), + || BulkCompressor::new(ct), |mut decompressor| { black_box( decompressor diff --git a/crates/ironrdp-bulk/src/bulk.rs b/crates/ironrdp-bulk/src/bulk.rs index 2a49852be..f680a38bd 100644 --- a/crates/ironrdp-bulk/src/bulk.rs +++ b/crates/ironrdp-bulk/src/bulk.rs @@ -72,12 +72,12 @@ impl BulkCompressor { /// - `Rdp6` (0x02): NCRUSH (Huffman-based) /// - `Rdp61` (0x03): XCRUSH (two-level: chunk matching + MPPC) /// - pub fn new(compression_level: CompressionType) -> Result { + pub fn new(compression_level: CompressionType) -> Self { // MPPC contexts are created with level 1 by default and adjusted dynamically. let mppc_send = MppcContext::new(1); let mppc_recv = MppcContext::new(1); - let ncrush_send = NCrushContext::new()?; - let ncrush_recv = NCrushContext::new()?; + let ncrush_send = NCrushContext::new(); + let ncrush_recv = NCrushContext::new(); let xcrush_send = XCrushContext::new(); let xcrush_recv = XCrushContext::new(); @@ -88,7 +88,7 @@ impl BulkCompressor { v.into_boxed_slice().try_into().unwrap_or_else(|_| unreachable!()) }; - Ok(Self { + Self { compression_level, mppc_send, mppc_recv, @@ -99,7 +99,7 @@ impl BulkCompressor { output_buffer, total_uncompressed_bytes: 0, total_compressed_bytes: 0, - }) + } } /// Returns the configured compression level. @@ -271,25 +271,25 @@ mod tests { #[test] fn test_bulk_compressor_new_rdp4() { - let bulk = BulkCompressor::new(CompressionType::Rdp4).unwrap(); + let bulk = BulkCompressor::new(CompressionType::Rdp4); assert_eq!(bulk.compression_level(), CompressionType::Rdp4); } #[test] fn test_bulk_compressor_new_rdp5() { - let bulk = BulkCompressor::new(CompressionType::Rdp5).unwrap(); + let bulk = BulkCompressor::new(CompressionType::Rdp5); assert_eq!(bulk.compression_level(), CompressionType::Rdp5); } #[test] fn test_bulk_compressor_new_rdp6() { - let bulk = BulkCompressor::new(CompressionType::Rdp6).unwrap(); + let bulk = BulkCompressor::new(CompressionType::Rdp6); assert_eq!(bulk.compression_level(), CompressionType::Rdp6); } #[test] fn test_bulk_compressor_new_rdp61() { - let bulk = BulkCompressor::new(CompressionType::Rdp61).unwrap(); + let bulk = BulkCompressor::new(CompressionType::Rdp61); assert_eq!(bulk.compression_level(), CompressionType::Rdp61); } @@ -314,14 +314,14 @@ mod tests { #[test] fn test_bulk_compressor_reset() { - let mut bulk = BulkCompressor::new(CompressionType::Rdp61).unwrap(); + let mut bulk = BulkCompressor::new(CompressionType::Rdp61); // Should not panic bulk.reset(); } #[test] fn test_bulk_compressor_contexts_independent() { - let bulk = BulkCompressor::new(CompressionType::Rdp6).unwrap(); + let bulk = BulkCompressor::new(CompressionType::Rdp6); // Send and receive NCRUSH contexts should be separate instances // (we can only verify they exist and the struct was created) assert_eq!(bulk.compression_level(), CompressionType::Rdp6); @@ -333,7 +333,7 @@ mod tests { #[test] fn test_bulk_compress_skip_small_input() { - let mut bulk = BulkCompressor::new(CompressionType::Rdp5).unwrap(); + let mut bulk = BulkCompressor::new(CompressionType::Rdp5); let data = b"tiny"; // 4 bytes, below threshold let (size, flags) = bulk.compress(data).unwrap(); assert_eq!(size, data.len()); @@ -342,7 +342,7 @@ mod tests { #[test] fn test_bulk_compress_skip_empty() { - let mut bulk = BulkCompressor::new(CompressionType::Rdp5).unwrap(); + let mut bulk = BulkCompressor::new(CompressionType::Rdp5); let data = b""; let (size, flags) = bulk.compress(data).unwrap(); assert_eq!(size, 0); @@ -355,7 +355,7 @@ mod tests { #[test] fn test_bulk_decompress_no_flags() { - let mut bulk = BulkCompressor::new(CompressionType::Rdp5).unwrap(); + let mut bulk = BulkCompressor::new(CompressionType::Rdp5); let data = b"uncompressed data"; let result = bulk.decompress(data, 0x00).unwrap(); assert_eq!(result, data); @@ -363,7 +363,7 @@ mod tests { #[test] fn test_bulk_decompress_unsupported_type() { - let mut bulk = BulkCompressor::new(CompressionType::Rdp5).unwrap(); + let mut bulk = BulkCompressor::new(CompressionType::Rdp5); // flags = PACKET_COMPRESSED | type 0x0F (invalid) let result = bulk.decompress(b"data", 0x2F); assert!(result.is_err()); @@ -376,8 +376,8 @@ mod tests { /// Helper: compress with one BulkCompressor (sender) and decompress /// with another (receiver). Returns the decompressed data as a Vec. fn bulk_roundtrip(compression_level: CompressionType, input: &[u8]) -> Vec { - let mut sender = BulkCompressor::new(compression_level).unwrap(); - let mut receiver = BulkCompressor::new(compression_level).unwrap(); + let mut sender = BulkCompressor::new(compression_level); + let mut receiver = BulkCompressor::new(compression_level); let (comp_size, flags) = sender.compress(input).unwrap(); @@ -451,7 +451,7 @@ mod tests { #[test] fn test_bulk_compress_rdp5_sets_type_bits() { - let mut bulk = BulkCompressor::new(CompressionType::Rdp5).unwrap(); + let mut bulk = BulkCompressor::new(CompressionType::Rdp5); let input = b"Some data that should compress with MPPC level 1 algorithm!!"; let (_size, flags) = bulk.compress(input).unwrap(); @@ -464,7 +464,7 @@ mod tests { #[test] fn test_bulk_compress_rdp6_sets_type_bits() { - let mut bulk = BulkCompressor::new(CompressionType::Rdp6).unwrap(); + let mut bulk = BulkCompressor::new(CompressionType::Rdp6); let input = b"for.whom.the.bell.tolls,.the.bell.tolls.for.thee!xx"; let (_size, flags) = bulk.compress(input).unwrap(); @@ -481,7 +481,7 @@ mod tests { #[test] fn test_metrics_start_at_zero() { - let bulk = BulkCompressor::new(CompressionType::Rdp5).unwrap(); + let bulk = BulkCompressor::new(CompressionType::Rdp5); assert_eq!(bulk.total_compressed_bytes(), 0); assert_eq!(bulk.total_uncompressed_bytes(), 0); assert!(bulk.compression_ratio().abs() < f64::EPSILON); @@ -489,7 +489,7 @@ mod tests { #[test] fn test_metrics_accumulate_on_compress() { - let mut bulk = BulkCompressor::new(CompressionType::Rdp5).unwrap(); + let mut bulk = BulkCompressor::new(CompressionType::Rdp5); let input = b"Hello world! Hello world! Hello world! Hello world! x"; let (comp_size, flags) = bulk.compress(input).unwrap(); @@ -509,8 +509,8 @@ mod tests { #[test] fn test_metrics_accumulate_on_decompress() { - let mut sender = BulkCompressor::new(CompressionType::Rdp5).unwrap(); - let mut receiver = BulkCompressor::new(CompressionType::Rdp5).unwrap(); + let mut sender = BulkCompressor::new(CompressionType::Rdp5); + let mut receiver = BulkCompressor::new(CompressionType::Rdp5); let input = b"Hello world! Hello world! Hello world! Hello world! x"; let (comp_size, flags) = sender.compress(input).unwrap(); @@ -533,8 +533,8 @@ mod tests { #[test] fn test_metrics_accumulate_across_multiple_calls() { - let mut sender = BulkCompressor::new(CompressionType::Rdp5).unwrap(); - let mut receiver = BulkCompressor::new(CompressionType::Rdp5).unwrap(); + let mut sender = BulkCompressor::new(CompressionType::Rdp5); + let mut receiver = BulkCompressor::new(CompressionType::Rdp5); let inputs: &[&[u8]] = &[ b"Hello world! Hello world! Hello world! Hello world! x", @@ -557,7 +557,7 @@ mod tests { #[test] fn test_metrics_not_reset_by_context_reset() { - let mut bulk = BulkCompressor::new(CompressionType::Rdp5).unwrap(); + let mut bulk = BulkCompressor::new(CompressionType::Rdp5); let input = b"Hello world! Hello world! Hello world! Hello world! x"; let _ = bulk.compress(input).unwrap(); let before = bulk.total_uncompressed_bytes(); @@ -569,7 +569,7 @@ mod tests { #[test] fn test_bulk_compress_rdp61_sets_type_bits() { - let mut bulk = BulkCompressor::new(CompressionType::Rdp61).unwrap(); + let mut bulk = BulkCompressor::new(CompressionType::Rdp61); let input = b"XCRUSH test data with repeated XCRUSH patterns for compression!!"; let (_size, flags) = bulk.compress(input).unwrap(); diff --git a/crates/ironrdp-bulk/src/lib.rs b/crates/ironrdp-bulk/src/lib.rs index ba93fd852..557ad8af8 100644 --- a/crates/ironrdp-bulk/src/lib.rs +++ b/crates/ironrdp-bulk/src/lib.rs @@ -17,8 +17,8 @@ //! use ironrdp_bulk::{BulkCompressor, CompressionType, flags}; //! //! // Create sender (compressor) and receiver (decompressor) -//! let mut sender = BulkCompressor::new(CompressionType::Rdp5).unwrap(); -//! let mut receiver = BulkCompressor::new(CompressionType::Rdp5).unwrap(); +//! let mut sender = BulkCompressor::new(CompressionType::Rdp5); +//! let mut receiver = BulkCompressor::new(CompressionType::Rdp5); //! //! let input = b"Hello world! Hello world! Hello world! Hello world! x"; //! diff --git a/crates/ironrdp-bulk/src/ncrush/mod.rs b/crates/ironrdp-bulk/src/ncrush/mod.rs index e2f0f8b8d..f8b14d2dc 100644 --- a/crates/ironrdp-bulk/src/ncrush/mod.rs +++ b/crates/ironrdp-bulk/src/ncrush/mod.rs @@ -175,7 +175,7 @@ impl NCrushContext { /// Allocates the history, hash, and match buffers on the heap, generates /// the runtime Huffman tables, and calls `reset(false)`. /// - pub(crate) fn new() -> Result { + pub(crate) fn new() -> Self { let mut ctx = Self { history_offset: 0, history_end_offset: HISTORY_BUFFER_SIZE - 1, @@ -189,10 +189,10 @@ impl NCrushContext { huff_table_lom: heap_zeroed_array::(), }; - ctx.generate_tables()?; + ctx.generate_tables(); ctx.reset(false); - Ok(ctx) + ctx } /// Generates the runtime Huffman lookup tables for CopyOffset and @@ -206,7 +206,7 @@ impl NCrushContext { clippy::cast_possible_truncation, reason = "table generation: k (usize ≤4096) safely cast to u32 for verification" )] - fn generate_tables(&mut self) -> Result<(), BulkError> { + fn generate_tables(&mut self) { // --- Generate HuffTableLOM --- // For each LOM index i (0..28), fill entries for all values that // map to that index (based on LOMBitsLUT). @@ -232,20 +232,18 @@ impl NCrushContext { 28usize }; - if i >= tables::LOMBitsLUT.len() || i >= tables::LOMBaseLUT.len() { - return Err(BulkError::InvalidCompressedData( - "NCRUSH: generate_tables LOM index out of range", - )); - } + debug_assert!( + i < tables::LOMBitsLUT.len() && i < tables::LOMBaseLUT.len(), + "NCRUSH: generate_tables LOM index out of range (static-table invariant)" + ); let mask = (1u32 << tables::LOMBitsLUT[i]) - 1; let base = tables::LOMBaseLUT[i]; let reconstructed = (mask & (k as u32 - 2)) + base; - if reconstructed != k as u32 { - return Err(BulkError::InvalidCompressedData( - "NCRUSH: generate_tables LOM verification failed", - )); - } + debug_assert_eq!( + reconstructed, k as u32, + "NCRUSH: generate_tables LOM verification failed (static-table invariant)" + ); } // --- Generate HuffTableCopyOffset --- @@ -279,13 +277,10 @@ impl NCrushContext { } } - if (k + 256) > HUFF_TABLE_COPY_OFFSET_SIZE { - return Err(BulkError::InvalidCompressedData( - "NCRUSH: generate_tables CopyOffset overflow", - )); - } - - Ok(()) + debug_assert!( + (k + 256) <= HUFF_TABLE_COPY_OFFSET_SIZE, + "NCRUSH: generate_tables CopyOffset overflow (static-table invariant)" + ); } /// Refills the bit accumulator from the source data. @@ -1456,7 +1451,7 @@ mod tests { #[test] fn test_ncrush_context_new() { - let ctx = NCrushContext::new().unwrap(); + let ctx = NCrushContext::new(); assert_eq!(ctx.history_buffer_size, HISTORY_BUFFER_SIZE); assert_eq!(ctx.history_end_offset, HISTORY_BUFFER_SIZE - 1); assert_eq!(ctx.history_offset, 0); @@ -1466,7 +1461,7 @@ mod tests { #[test] fn test_ncrush_context_reset_no_flush() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); ctx.history_offset = 12345; ctx.offset_cache[0] = 42; ctx.offset_cache[1] = 99; @@ -1481,7 +1476,7 @@ mod tests { #[test] fn test_ncrush_context_reset_flush() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); ctx.reset(true); assert_eq!(ctx.history_offset, HISTORY_BUFFER_SIZE + 1); @@ -1489,7 +1484,7 @@ mod tests { #[test] fn test_ncrush_generate_tables_lom() { - let ctx = NCrushContext::new().unwrap(); + let ctx = NCrushContext::new(); // First entry at index 2 should be 0 (LOM index 0) assert_eq!(ctx.huff_table_lom[2], 0); @@ -1503,7 +1498,7 @@ mod tests { #[test] fn test_ncrush_generate_tables_copy_offset() { - let ctx = NCrushContext::new().unwrap(); + let ctx = NCrushContext::new(); // First entry at index 2 should be 0 assert_eq!(ctx.huff_table_copy_offset[2], 0); @@ -1521,7 +1516,7 @@ mod tests { fn test_ncrush_decompress_uncompressed_passthrough() { use crate::flags; - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); let data = b"hello world"; // No PACKET_COMPRESSED flag → should return source data directly @@ -1535,7 +1530,7 @@ mod tests { fn test_ncrush_decompress_flushed_clears_state() { use crate::flags; - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); ctx.history_offset = 1000; ctx.offset_cache[0] = 42; ctx.history_buffer[500] = 0xFF; @@ -1553,7 +1548,7 @@ mod tests { fn test_ncrush_decompress_compressed_too_short() { use crate::flags; - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); let data = [0u8; 3]; // less than 4 bytes let result = ctx.decompress(&data, flags::PACKET_FLUSHED | flags::PACKET_COMPRESSED); @@ -1635,7 +1630,7 @@ mod tests { fn test_ncrush_decompress_bells() { use crate::flags; - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); // flags: PACKET_COMPRESSED | 2 (compression type NCRUSH) let flags_value = flags::PACKET_COMPRESSED | 0x02; @@ -1661,7 +1656,7 @@ mod tests { #[test] fn test_ncrush_hash_table_add_basic() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); // Write "ABABAB..." into history at offset 100 let data = b"ABABABABAB"; // 10 bytes @@ -1698,7 +1693,7 @@ mod tests { #[test] fn test_ncrush_hash_table_add_chain() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); // Insert two blocks with the same starting bytes to create a chain let data1 = b"XYXYXYXYXY"; // 10 bytes at offset 50 @@ -1718,7 +1713,7 @@ mod tests { #[test] fn test_ncrush_find_match_length_basic() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); // Write identical data at two positions ctx.history_buffer[10] = b'A'; @@ -1742,7 +1737,7 @@ mod tests { #[test] fn test_ncrush_find_match_length_limit() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); // Write identical data at two positions for i in 0..10 { @@ -1759,7 +1754,7 @@ mod tests { #[test] fn test_ncrush_find_match_length_immediate_limit() { - let ctx = NCrushContext::new().unwrap(); + let ctx = NCrushContext::new(); // offset1 > limit immediately → returns -1 let len = ctx.find_match_length(10, 20, 5); @@ -1768,7 +1763,7 @@ mod tests { #[test] fn test_ncrush_find_best_match_no_chain() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); // match_table[100] = 0 → no chain let result = ctx.find_best_match(100).unwrap(); @@ -1777,7 +1772,7 @@ mod tests { #[test] fn test_ncrush_find_best_match_simple() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); // Set up: write "ABCDEF" at position 50 and "ABCDXY" at position 100 let pattern1 = b"ABCDEF"; @@ -1807,7 +1802,7 @@ mod tests { #[test] fn test_ncrush_move_encoder_windows_basic() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); // Write some data in the second half of the buffer for i in 0..100 { @@ -1837,7 +1832,7 @@ mod tests { #[test] fn test_ncrush_move_encoder_windows_clamps_negative() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); // Entry pointing before the offset should be clamped to 0 ctx.hash_table[42] = 50; // 50 < offset (say, 100) @@ -1993,7 +1988,7 @@ mod tests { #[test] fn test_ncrush_encode_length_of_match_simple() { - let ctx = NCrushContext::new().unwrap(); + let ctx = NCrushContext::new(); let mut buf = [0u8; 16]; let mut writer = NCrushBitWriter::new(&mut buf); @@ -2009,7 +2004,7 @@ mod tests { #[test] fn test_ncrush_encode_copy_offset_small() { - let ctx = NCrushContext::new().unwrap(); + let ctx = NCrushContext::new(); let mut buf = [0u8; 16]; let mut writer = NCrushBitWriter::new(&mut buf); @@ -2038,7 +2033,7 @@ mod tests { #[test] fn test_ncrush_compress_basic() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); let data = b"hello world"; let mut dst = vec![0u8; 256]; @@ -2053,7 +2048,7 @@ mod tests { #[test] fn test_ncrush_compress_with_repeats() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); // Repetitive data should compress well let data = b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"; let mut dst = vec![0u8; 256]; @@ -2069,7 +2064,7 @@ mod tests { #[test] fn test_ncrush_compress_empty() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); let data = b""; let mut dst = vec![0u8; 256]; @@ -2080,7 +2075,7 @@ mod tests { #[test] fn test_ncrush_compress_updates_history_offset() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); let data = b"some test data for ncrush compression"; let mut dst = vec![0u8; 256]; @@ -2095,7 +2090,7 @@ mod tests { #[test] fn test_ncrush_compress_offset_cache_updated() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); // Use data with a repeated pattern to trigger back-references let data = b"for.whom.the.bell.tolls,.the.bell.tolls.for.thee!"; let mut dst = vec![0u8; 256]; @@ -2116,7 +2111,7 @@ mod tests { /// and verifies the output matches the expected compressed bytes exactly. #[test] fn test_ncrush_compress_bells() { - let mut ctx = NCrushContext::new().unwrap(); + let mut ctx = NCrushContext::new(); let mut dst = vec![0u8; 65536]; let (size, flags_out) = ctx.compress(test_data::TEST_BELLS_DATA, &mut dst).unwrap(); @@ -2152,8 +2147,8 @@ mod tests { /// Compress → decompress → verify output matches original input. #[test] fn test_ncrush_roundtrip_bells() { - let mut compressor = NCrushContext::new().unwrap(); - let mut decompressor = NCrushContext::new().unwrap(); + let mut compressor = NCrushContext::new(); + let mut decompressor = NCrushContext::new(); let input = test_data::TEST_BELLS_DATA; let mut compressed = vec![0u8; 65536]; @@ -2179,8 +2174,8 @@ mod tests { /// Round-trip test with a short repetitive pattern. #[test] fn test_ncrush_roundtrip_repetitive() { - let mut compressor = NCrushContext::new().unwrap(); - let mut decompressor = NCrushContext::new().unwrap(); + let mut compressor = NCrushContext::new(); + let mut decompressor = NCrushContext::new(); let input = b"ABCABCABCABCABCABCABCABCABCABCABCABC"; let mut compressed = vec![0u8; 65536]; @@ -2196,8 +2191,8 @@ mod tests { /// Round-trip test with a longer text block containing varied content. #[test] fn test_ncrush_roundtrip_prose() { - let mut compressor = NCrushContext::new().unwrap(); - let mut decompressor = NCrushContext::new().unwrap(); + let mut compressor = NCrushContext::new(); + let mut decompressor = NCrushContext::new(); let input = b"The quick brown fox jumps over the lazy dog. \ The quick brown fox jumps over the lazy dog again. \ @@ -2215,8 +2210,8 @@ mod tests { /// Round-trip test with binary-like data (all byte values 0-255). #[test] fn test_ncrush_roundtrip_binary() { - let mut compressor = NCrushContext::new().unwrap(); - let mut decompressor = NCrushContext::new().unwrap(); + let mut compressor = NCrushContext::new(); + let mut decompressor = NCrushContext::new(); // Create a pattern with all 256 byte values repeated let mut input = Vec::new(); @@ -2239,8 +2234,8 @@ mod tests { /// (tests that history buffer state carries across calls). #[test] fn test_ncrush_roundtrip_sequential() { - let mut compressor = NCrushContext::new().unwrap(); - let mut decompressor = NCrushContext::new().unwrap(); + let mut compressor = NCrushContext::new(); + let mut decompressor = NCrushContext::new(); let inputs: &[&[u8]] = &[ b"first.message.to.compress", diff --git a/crates/ironrdp-client/src/rdp.rs b/crates/ironrdp-client/src/rdp.rs index 073ddfe50..1089511b8 100644 --- a/crates/ironrdp-client/src/rdp.rs +++ b/crates/ironrdp-client/src/rdp.rs @@ -793,7 +793,6 @@ async fn active_session( share_id, enable_server_pointer, pointer_software_rendering, - bulk_decompressor: None, } .build(), ); diff --git a/crates/ironrdp-fuzzing/src/oracles/mod.rs b/crates/ironrdp-fuzzing/src/oracles/mod.rs index 35891c895..7553bfc54 100644 --- a/crates/ironrdp-fuzzing/src/oracles/mod.rs +++ b/crates/ironrdp-fuzzing/src/oracles/mod.rs @@ -33,27 +33,21 @@ pub fn bulk_decompress_mppc(data: &[u8]) { } else { (CompressionType::Rdp5, 0x01) }; - let Ok(mut bulk) = BulkCompressor::new(comp_type) else { - return; - }; + let mut bulk = BulkCompressor::new(comp_type); let _ = bulk.decompress(payload, flags::PACKET_COMPRESSED | algo_bits); } pub fn bulk_decompress_ncrush(data: &[u8]) { use ironrdp_bulk::{BulkCompressor, CompressionType, flags}; - let Ok(mut bulk) = BulkCompressor::new(CompressionType::Rdp6) else { - return; - }; + let mut bulk = BulkCompressor::new(CompressionType::Rdp6); let _ = bulk.decompress(data, flags::PACKET_COMPRESSED | 0x02); } pub fn bulk_decompress_xcrush(data: &[u8]) { use ironrdp_bulk::{BulkCompressor, CompressionType, flags}; - let Ok(mut bulk) = BulkCompressor::new(CompressionType::Rdp61) else { - return; - }; + let mut bulk = BulkCompressor::new(CompressionType::Rdp61); let _ = bulk.decompress(data, flags::PACKET_COMPRESSED | 0x03); } @@ -84,9 +78,7 @@ pub fn bulk_round_trip(data: &[u8]) { _ => CompressionType::Rdp61, }; - let Ok(mut sender) = BulkCompressor::new(algo) else { - return; - }; + let mut sender = BulkCompressor::new(algo); let Ok((compressed_size, compress_flags)) = sender.compress(src) else { return; }; @@ -101,9 +93,7 @@ pub fn bulk_round_trip(data: &[u8]) { sender.compressed_data(compressed_size) }; - let Ok(mut receiver) = BulkCompressor::new(algo) else { - return; - }; + let mut receiver = BulkCompressor::new(algo); let decompressed = receiver .decompress(payload, compress_flags) .unwrap_or_else(|e| panic!("bulk round-trip decompress failed for {algo:?}: {e:?}")); diff --git a/crates/ironrdp-session/src/active_stage.rs b/crates/ironrdp-session/src/active_stage.rs index b7bb82446..0fc571fca 100644 --- a/crates/ironrdp-session/src/active_stage.rs +++ b/crates/ironrdp-session/src/active_stage.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use ironrdp_bulk::BulkCompressor; use ironrdp_connector::ConnectionResult; use ironrdp_connector::connection_activation::ConnectionActivationSequence; use ironrdp_core::{ReadCursor, WriteBuf}; @@ -10,28 +9,17 @@ use ironrdp_graphics::pointer::DecodedPointer; use ironrdp_pdu::geometry::InclusiveRectangle; use ironrdp_pdu::input::fast_path::{FastPathInput, FastPathInputEvent}; use ironrdp_pdu::rdp::autodetect::AutoDetectRequest; -use ironrdp_pdu::rdp::client_info::CompressionType as PduCompressionType; use ironrdp_pdu::rdp::headers::ShareDataPdu; use ironrdp_pdu::rdp::multitransport::MultitransportRequestPdu; use ironrdp_pdu::slow_path::{self, GraphicsUpdateType}; use ironrdp_pdu::{Action, mcs}; use ironrdp_svc::{SvcMessage, SvcProcessor, SvcProcessorMessages}; -use tracing::{debug, info, warn}; +use tracing::{debug, warn}; use crate::fast_path::UpdateKind; use crate::image::DecodedImage; use crate::{SessionError, SessionErrorExt as _, SessionResult, fast_path, x224}; -/// Converts the PDU-layer compression type to the bulk crate's compression type. -fn to_bulk_compression_type(ct: PduCompressionType) -> ironrdp_bulk::CompressionType { - match ct { - PduCompressionType::K8 => ironrdp_bulk::CompressionType::Rdp4, - PduCompressionType::K64 => ironrdp_bulk::CompressionType::Rdp5, - PduCompressionType::Rdp6 => ironrdp_bulk::CompressionType::Rdp6, - PduCompressionType::Rdp61 => ironrdp_bulk::CompressionType::Rdp61, - } -} - pub struct ActiveStage { x224_processor: x224::Processor, fast_path_processor: fast_path::Processor, @@ -48,28 +36,12 @@ impl ActiveStage { connection_result.connection_activation, ); - // Create bulk decompressor if compression was negotiated - let bulk_decompressor = connection_result.compression_type.and_then(|ct| { - let bulk_ct = to_bulk_compression_type(ct); - match BulkCompressor::new(bulk_ct) { - Ok(compressor) => { - info!(compression_type = %bulk_ct, "Bulk decompressor initialized for FastPath"); - Some(compressor) - } - Err(e) => { - tracing::error!(error = %e, "Failed to create bulk decompressor, compression disabled"); - None - } - } - }); - let fast_path_processor = fast_path::ProcessorBuilder { io_channel_id: connection_result.io_channel_id, user_channel_id: connection_result.user_channel_id, share_id: connection_result.share_id, enable_server_pointer: connection_result.enable_server_pointer, pointer_software_rendering: connection_result.pointer_software_rendering, - bulk_decompressor, } .build(); diff --git a/crates/ironrdp-session/src/fast_path.rs b/crates/ironrdp-session/src/fast_path.rs index 2a05d1a23..2046b5014 100644 --- a/crates/ironrdp-session/src/fast_path.rs +++ b/crates/ironrdp-session/src/fast_path.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use ironrdp_bulk::BulkCompressor; +use ironrdp_bulk::{BulkCompressor, CompressionType}; use ironrdp_core::{DecodeErrorKind, ReadCursor, WriteBuf, decode_cursor}; use ironrdp_graphics::image_processing::PixelFormat; use ironrdp_graphics::pointer::{DecodedPointer, PointerBitmapTarget}; @@ -41,9 +41,12 @@ pub struct Processor { mouse_pos_update: Option<(u16, u16)>, enable_server_pointer: bool, pointer_software_rendering: bool, - /// Bulk decompressor for server-to-client compressed PDUs. - /// `None` when compression was not negotiated. - bulk_decompressor: Option, + /// Bulk decompressor for server-to-client compressed PDUs. Always present: + /// the library constructs it in `ProcessorBuilder::build`, so compressed + /// FastPath updates are always decodable. The construction-time type is + /// irrelevant on receive; `decompress` routes per update on the packet's + /// type bits. + bulk_decompressor: BulkCompressor, /// Current 8bpp color palette. Updated by Palette fast-path updates. palette: Palette, #[cfg(feature = "qoiz")] @@ -104,26 +107,22 @@ impl Processor { let bulk_flags = u32::from(flags.bits()) | u32::from(update_pdu.compression_type.map_or(0, |ct| ct.as_u8())); - if let Some(ref mut decompressor) = self.bulk_decompressor { - let decompressed = decompressor - .decompress(update_pdu.data, bulk_flags) - .map_err(|e| reason_err!("FastPath", "bulk decompression failed: {}", e))?; - // Copy decompressed data before accessing metrics (releases the mutable borrow). - decompressed_data = decompressed.to_vec(); - debug!( - compressed_size = update_pdu.data.len(), - decompressed_size = decompressed_data.len(), - compression_type = ?update_pdu.compression_type, - compression_ratio = format_args!("{:.2}x", decompressor.compression_ratio()), - total_compressed = decompressor.total_compressed_bytes(), - total_uncompressed = decompressor.total_uncompressed_bytes(), - "Decompressed FastPath update" - ); - decompressed_data.as_slice() - } else { - warn!("Received compressed FastPath data but no decompressor is configured"); - update_pdu.data - } + let decompressor = &mut self.bulk_decompressor; + let decompressed = decompressor + .decompress(update_pdu.data, bulk_flags) + .map_err(|e| reason_err!("FastPath", "bulk decompression failed: {}", e))?; + // Copy decompressed data before accessing metrics (releases the mutable borrow). + decompressed_data = decompressed.to_vec(); + debug!( + compressed_size = update_pdu.data.len(), + decompressed_size = decompressed_data.len(), + compression_type = ?update_pdu.compression_type, + compression_ratio = format_args!("{:.2}x", decompressor.compression_ratio()), + total_compressed = decompressor.total_compressed_bytes(), + total_uncompressed = decompressor.total_uncompressed_bytes(), + "Decompressed FastPath update" + ); + decompressed_data.as_slice() } else { // Compression flags present but COMPRESSED bit not set — pass data through. // Still need to inform the decompressor of FLUSHED/AT_FRONT flags even @@ -598,9 +597,6 @@ pub struct ProcessorBuilder { /// `UpdateKind::PointerBitmap` will not be generated. Remote pointer will be drawn /// via software rendering on top of the output image. pub pointer_software_rendering: bool, - /// Bulk decompressor for server-to-client compressed PDUs. - /// `None` when compression was not negotiated. - pub bulk_decompressor: Option, } impl ProcessorBuilder { @@ -615,7 +611,9 @@ impl ProcessorBuilder { mouse_pos_update: None, enable_server_pointer: self.enable_server_pointer, pointer_software_rendering: self.pointer_software_rendering, - bulk_decompressor: self.bulk_decompressor, + // Always present; Rdp61 is an arbitrary default because `decompress` + // routes per update on the packet's type bits, not on this value. + bulk_decompressor: BulkCompressor::new(CompressionType::Rdp61), palette: Palette::system_default(), #[cfg(feature = "qoiz")] zdctx: zstd_safe::DCtx::default(), diff --git a/crates/ironrdp-testsuite-core/Cargo.toml b/crates/ironrdp-testsuite-core/Cargo.toml index 68cbd45fd..7f74032d3 100644 --- a/crates/ironrdp-testsuite-core/Cargo.toml +++ b/crates/ironrdp-testsuite-core/Cargo.toml @@ -40,6 +40,7 @@ hex = "0.4" ironrdp-cliprdr-format.path = "../ironrdp-cliprdr-format" ironrdp-cliprdr = { path = "../ironrdp-cliprdr", features = ["__test"] } ironrdp-acceptor.path = "../ironrdp-acceptor" +ironrdp-bulk.path = "../ironrdp-bulk" ironrdp-connector.path = "../ironrdp-connector" ironrdp-displaycontrol.path = "../ironrdp-displaycontrol" ironrdp-dvc.path = "../ironrdp-dvc" diff --git a/crates/ironrdp-testsuite-core/tests/session/fast_path.rs b/crates/ironrdp-testsuite-core/tests/session/fast_path.rs new file mode 100644 index 000000000..20e89d77d --- /dev/null +++ b/crates/ironrdp-testsuite-core/tests/session/fast_path.rs @@ -0,0 +1,146 @@ +//! Regression tests for bulk-compressed FastPath updates. +//! +//! A FastPath `Processor` always owns a bulk decompressor: the library builds +//! one in `ProcessorBuilder::build`. A server can send a compressed FastPath +//! update (for example a full-frame redraw after a resize) regardless of what +//! the client advertised, so the decompressor must always be present. These +//! tests pin that a compressed update decodes to exactly the same pixels as its +//! uncompressed twin rather than being dropped or aborting the session. + +use ironrdp_bulk::{BulkCompressor, CompressionType as BulkCompressionType, flags as bulk_flags}; +use ironrdp_core::{WriteBuf, encode_vec}; +use ironrdp_graphics::image_processing::PixelFormat; +use ironrdp_pdu::bitmap::{BitmapData, BitmapUpdateData, Compression}; +use ironrdp_pdu::fast_path::{ + Compression as FpCompression, EncryptionFlags, FastPathHeader, FastPathUpdatePdu, Fragmentation, UpdateCode, +}; +use ironrdp_pdu::geometry::InclusiveRectangle; +use ironrdp_pdu::rdp::client_info::CompressionType as PduCompressionType; +use ironrdp_pdu::rdp::headers::CompressionFlags; +use ironrdp_session::fast_path::{Processor, ProcessorBuilder}; +use ironrdp_session::image::DecodedImage; + +const IMAGE_DIM: u16 = 64; +const RECT_DIM: u16 = 8; + +fn processor() -> Processor { + ProcessorBuilder { + io_channel_id: 0, + user_channel_id: 0, + share_id: 0, + enable_server_pointer: false, + pointer_software_rendering: false, + } + .build() +} + +/// Encodes the inner FastPath Bitmap update: a single uncompressed 8x8 32bpp +/// rectangle filled with a repetitive (and therefore compressible) pattern. +fn bitmap_update_payload() -> Vec { + let pixels: Vec = (0..RECT_DIM * RECT_DIM) + .flat_map(|_| [0x11u8, 0x22, 0x33, 0xff]) + .collect(); + + let update = BitmapUpdateData { + rectangles: vec![BitmapData { + rectangle: InclusiveRectangle { + left: 0, + top: 0, + right: RECT_DIM - 1, + bottom: RECT_DIM - 1, + }, + width: RECT_DIM, + height: RECT_DIM, + bits_per_pixel: 32, + compression_flags: Compression::empty(), + compressed_data_header: None, + bitmap_data: &pixels, + }], + }; + + encode_vec(&update).expect("encode bitmap update") +} + +/// Wraps an encoded `FastPathUpdatePdu` in a FastPath output frame (the +/// `FastPathHeader` that `Processor::process` expects to read first). +fn fastpath_frame(update_pdu: &[u8]) -> Vec { + let header = FastPathHeader::new(EncryptionFlags::empty(), update_pdu.len()); + let mut frame = encode_vec(&header).expect("encode FastPath header"); + frame.extend_from_slice(update_pdu); + frame +} + +/// Runs one FastPath frame through a fresh processor and returns the resulting +/// framebuffer. +fn render(frame: &[u8]) -> DecodedImage { + let mut image = DecodedImage::new(PixelFormat::RgbA32, IMAGE_DIM, IMAGE_DIM); + let mut output = WriteBuf::new(); + processor() + .process(&mut image, frame, &mut output) + .expect("process FastPath frame"); + image +} + +#[test] +fn compressed_fastpath_update_decompresses_like_its_uncompressed_twin() { + let payload = bitmap_update_payload(); + + let uncompressed = fastpath_frame( + &encode_vec(&FastPathUpdatePdu { + fragmentation: Fragmentation::Single, + update_code: UpdateCode::Bitmap, + compression_flags: None, + compression_type: None, + data: &payload, + }) + .expect("encode uncompressed FastPath update"), + ); + + // Bulk-compress the same payload (RDP5 / MPPC) and wrap it as a compressed + // FastPath update. + let mut compressor = BulkCompressor::new(BulkCompressionType::Rdp5); + let (size, flags) = compressor.compress(&payload).expect("bulk compress payload"); + assert_ne!( + flags & bulk_flags::PACKET_COMPRESSED, + 0, + "test payload must actually compress; adjust it if MPPC declines to compress" + ); + let compressed_data = compressor.compressed_data(size).to_vec(); + + let mut compressed_pdu = encode_vec(&FastPathUpdatePdu { + fragmentation: Fragmentation::Single, + update_code: UpdateCode::Bitmap, + // Carry the compressor's control bits (notably PACKET_AT_FRONT) so the + // decompressor resets its history at the start of the stream. + compression_flags: Some(CompressionFlags::from_bits_retain( + u8::try_from(flags & 0xe0).expect("control-flag byte fits in u8"), + )), + compression_type: Some(PduCompressionType::K64), + data: &compressed_data, + }) + .expect("encode compressed FastPath update"); + // `FastPathUpdatePdu::encode` writes the update header byte before it sets the + // COMPRESSION_USED bit, so the encoded header does not flag the trailing + // compression byte. Set it here so the PDU decodes as compressed (idempotent + // if the encoder is corrected). + compressed_pdu[0] |= FpCompression::COMPRESSION_USED.bits() << 6; + let compressed = fastpath_frame(&compressed_pdu); + + let from_uncompressed = render(&uncompressed); + let from_compressed = render(&compressed); + + assert_eq!( + from_uncompressed.data(), + from_compressed.data(), + "compressed FastPath update rendered differently from its uncompressed twin" + ); + + // Sanity check: the update actually drew something, so the equality above + // compares real pixels rather than two blank frames. + let blank = DecodedImage::new(PixelFormat::RgbA32, IMAGE_DIM, IMAGE_DIM); + assert_ne!( + from_uncompressed.data(), + blank.data(), + "the bitmap update should have modified the framebuffer" + ); +} diff --git a/crates/ironrdp-testsuite-core/tests/session/mod.rs b/crates/ironrdp-testsuite-core/tests/session/mod.rs index e9126d910..fd2dd100f 100644 --- a/crates/ironrdp-testsuite-core/tests/session/mod.rs +++ b/crates/ironrdp-testsuite-core/tests/session/mod.rs @@ -1,5 +1,6 @@ mod autodetect; mod connection_activation; +mod fast_path; mod rfx; #[cfg(test)] diff --git a/crates/ironrdp-testsuite-extra/tests/main.rs b/crates/ironrdp-testsuite-extra/tests/main.rs index 8c6f66dc3..814486c04 100644 --- a/crates/ironrdp-testsuite-extra/tests/main.rs +++ b/crates/ironrdp-testsuite-extra/tests/main.rs @@ -106,7 +106,6 @@ async fn test_deactivation_reactivation() { share_id, enable_server_pointer, pointer_software_rendering, - bulk_decompressor: None, } .build(), ); diff --git a/crates/ironrdp-web/src/session.rs b/crates/ironrdp-web/src/session.rs index ba1c31e5c..18f70b3d6 100644 --- a/crates/ironrdp-web/src/session.rs +++ b/crates/ironrdp-web/src/session.rs @@ -1025,7 +1025,6 @@ impl iron_remote_desktop::Session for Session { share_id, enable_server_pointer, pointer_software_rendering, - bulk_decompressor: None, } .build(), ); diff --git a/ffi/src/session/mod.rs b/ffi/src/session/mod.rs index 37728024e..cf33be624 100644 --- a/ffi/src/session/mod.rs +++ b/ffi/src/session/mod.rs @@ -175,7 +175,6 @@ pub mod ffi { share_id, enable_server_pointer, pointer_software_rendering, - bulk_decompressor: None, } .build(), );