From db13ba6316e678d077fb4fd1bbe1c1dcd015ecae Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 24 Apr 2026 14:11:24 +0300 Subject: [PATCH] feat: add quic transport Adds a QUIC transport based on the upcoming Node.js PR [here](https://github.com/nodejs/node/pull/62876). Currently you need to build a custom Node.js binary for the tests to pass using the instructions on the linked PR. --- packages/integration-tests/package.json | 1 + .../test/compliance/transport/quic.spec.ts | 74 +++++ packages/integration-tests/test/interop.ts | 6 + packages/interop/src/connect/index.ts | 2 +- packages/interop/src/index.ts | 2 +- packages/transport-quic/.aegir.js | 7 + packages/transport-quic/CODE_OF_CONDUCT.md | 3 + packages/transport-quic/LICENSE-APACHE | 201 ++++++++++++ packages/transport-quic/LICENSE-MIT | 19 ++ packages/transport-quic/README.md | 71 ++++ packages/transport-quic/package.json | 82 +++++ packages/transport-quic/src/constants.ts | 10 + packages/transport-quic/src/index.ts | 130 ++++++++ packages/transport-quic/src/listener.ts | 264 +++++++++++++++ packages/transport-quic/src/muxer.ts | 70 ++++ packages/transport-quic/src/quic.browser.ts | 34 ++ packages/transport-quic/src/quic.ts | 171 ++++++++++ .../transport-quic/src/session-to-conn.ts | 91 ++++++ packages/transport-quic/src/stream.ts | 122 +++++++ .../src/utils/get-remote-certificate.ts | 14 + packages/transport-quic/test/browser.ts | 22 ++ .../transport-quic/test/connection.spec.ts | 97 ++++++ packages/transport-quic/test/filter.spec.ts | 45 +++ .../transport-quic/test/listen-dial.spec.ts | 302 ++++++++++++++++++ packages/transport-quic/tsconfig.json | 30 ++ packages/transport-quic/typedoc.json | 6 + 26 files changed, 1874 insertions(+), 2 deletions(-) create mode 100644 packages/integration-tests/test/compliance/transport/quic.spec.ts create mode 100644 packages/transport-quic/.aegir.js create mode 100644 packages/transport-quic/CODE_OF_CONDUCT.md create mode 100644 packages/transport-quic/LICENSE-APACHE create mode 100644 packages/transport-quic/LICENSE-MIT create mode 100644 packages/transport-quic/README.md create mode 100644 packages/transport-quic/package.json create mode 100644 packages/transport-quic/src/constants.ts create mode 100644 packages/transport-quic/src/index.ts create mode 100644 packages/transport-quic/src/listener.ts create mode 100644 packages/transport-quic/src/muxer.ts create mode 100644 packages/transport-quic/src/quic.browser.ts create mode 100644 packages/transport-quic/src/quic.ts create mode 100644 packages/transport-quic/src/session-to-conn.ts create mode 100644 packages/transport-quic/src/stream.ts create mode 100644 packages/transport-quic/src/utils/get-remote-certificate.ts create mode 100644 packages/transport-quic/test/browser.ts create mode 100644 packages/transport-quic/test/connection.spec.ts create mode 100644 packages/transport-quic/test/filter.spec.ts create mode 100644 packages/transport-quic/test/listen-dial.spec.ts create mode 100644 packages/transport-quic/tsconfig.json create mode 100644 packages/transport-quic/typedoc.json diff --git a/packages/integration-tests/package.json b/packages/integration-tests/package.json index c393a2d619..a7c1b4836f 100644 --- a/packages/integration-tests/package.json +++ b/packages/integration-tests/package.json @@ -38,6 +38,7 @@ "@libp2p/mdns": "^12.0.18", "@libp2p/memory": "^2.0.17", "@libp2p/mplex": "^12.0.18", + "@libp2p/quic": "^0.0.0", "@chainsafe/libp2p-noise": "^17.0.0", "@libp2p/peer-collections": "^7.0.17", "@libp2p/peer-id": "^6.0.8", diff --git a/packages/integration-tests/test/compliance/transport/quic.spec.ts b/packages/integration-tests/test/compliance/transport/quic.spec.ts new file mode 100644 index 0000000000..914a7c96ff --- /dev/null +++ b/packages/integration-tests/test/compliance/transport/quic.spec.ts @@ -0,0 +1,74 @@ +import tests from '@libp2p/interface-compliance-tests/transport' +import { quic } from '@libp2p/quic' +import { QUIC_V1 } from '@multiformats/multiaddr-matcher' +import { isBrowser, isElectron, isWebWorker } from 'wherearewe' + +describe('quic transport interface compliance IPv4', () => { + if (isBrowser || isWebWorker || isElectron) { + return + } + + tests({ + async setup () { + const dialer = { + transports: [ + quic() + ], + connectionMonitor: { + enabled: false + } + } + + return { + dialer, + listener: { + addresses: { + listen: [ + '/ip4/127.0.0.1/udp/0/quic-v1', + '/ip4/127.0.0.1/udp/0/quic-v1' + ] + }, + ...dialer + }, + dialMultiaddrMatcher: QUIC_V1, + listenMultiaddrMatcher: QUIC_V1 + } + }, + async teardown () {} + }) +}) + +describe('quic transport interface compliance IPv6', () => { + if (isBrowser || isWebWorker || isElectron) { + return + } + + tests({ + async setup () { + const dialer = { + transports: [ + quic() + ], + connectionMonitor: { + enabled: false + } + } + + return { + dialer, + listener: { + addresses: { + listen: [ + '/ip6/::/udp/0/quic-v1', + '/ip6/::/udp/0/quic-v1' + ] + }, + ...dialer + }, + dialMultiaddrMatcher: QUIC_V1, + listenMultiaddrMatcher: QUIC_V1 + } + }, + async teardown () {} + }) +}) diff --git a/packages/integration-tests/test/interop.ts b/packages/integration-tests/test/interop.ts index b0cf025c2c..9abf835ccb 100644 --- a/packages/integration-tests/test/interop.ts +++ b/packages/integration-tests/test/interop.ts @@ -28,6 +28,7 @@ import type { SpawnOptions, Daemon, DaemonFactory } from '@libp2p/interop' import type { Ping } from '@libp2p/ping' import type { Multiaddr } from '@multiformats/multiaddr' import type { Libp2pOptions, ServiceFactoryMap } from 'libp2p' +import { quic } from '@libp2p/quic' /** * @packageDocumentation @@ -53,6 +54,8 @@ async function createGoPeer (options: SpawnOptions): Promise { } else { if (options.transport == null || options.transport === 'tcp') { opts.push('-hostAddrs=/ip4/127.0.0.1/tcp/0') + } else if (options.transport === 'quic') { + opts.push('-hostAddrs=/ip4/127.0.0.1/udp/0/quic-v1') } else if (options.transport === 'webtransport') { opts.push('-hostAddrs=/ip4/127.0.0.1/udp/0/quic-v1/webtransport') } else if (options.transport === 'webrtc-direct') { @@ -152,6 +155,7 @@ async function createJsPeer (options: SpawnOptions): Promise { listen: [] }, transports: [ + quic(), tcp(), circuitRelayTransport(), webRTCDirect() @@ -163,6 +167,8 @@ async function createJsPeer (options: SpawnOptions): Promise { if (options.noListen !== true) { if (options.transport == null || options.transport === 'tcp') { opts.addresses?.listen?.push('/ip4/127.0.0.1/tcp/0') + } else if (options.transport === 'quic') { + opts.addresses?.listen?.push('/ip4/127.0.0.1/udp/0/quic-v1') } else if (options.transport === 'webrtc-direct') { opts.addresses?.listen?.push('/ip4/127.0.0.1/udp/0/webrtc-direct') } else { diff --git a/packages/interop/src/connect/index.ts b/packages/interop/src/connect/index.ts index 4558b8cb04..1e8df4561e 100644 --- a/packages/interop/src/connect/index.ts +++ b/packages/interop/src/connect/index.ts @@ -3,7 +3,7 @@ import type { Daemon, DaemonFactory, NodeType, SpawnOptions, TransportType } fro export function connectTests (factory: DaemonFactory): void { const nodeTypes: NodeType[] = ['js', 'go'] - const transportTypes: TransportType[] = ['tcp', 'webtransport', 'webrtc-direct'] + const transportTypes: TransportType[] = ['quic', 'tcp', 'webtransport', 'webrtc-direct'] for (const typeA of nodeTypes) { for (const typeB of nodeTypes) { diff --git a/packages/interop/src/index.ts b/packages/interop/src/index.ts index 7536e26bda..c76c6d5903 100644 --- a/packages/interop/src/index.ts +++ b/packages/interop/src/index.ts @@ -59,7 +59,7 @@ export type PeerIdType = 'rsa' | 'ed25519' | 'secp256k1' export type PubSubRouter = 'gossipsub' | 'floodsub' export type Muxer = 'mplex' | 'yamux' export type Encryption = 'noise' | 'tls' | 'plaintext' -export type TransportType = 'tcp' | 'webtransport' | 'webrtc-direct' +export type TransportType = 'quic' | 'tcp' | 'webtransport' | 'webrtc-direct' export interface SpawnOptions { type: NodeType diff --git a/packages/transport-quic/.aegir.js b/packages/transport-quic/.aegir.js new file mode 100644 index 0000000000..b2624c33ba --- /dev/null +++ b/packages/transport-quic/.aegir.js @@ -0,0 +1,7 @@ + +/** @type {import('aegir').PartialOptions} */ +export default { + build: { + bundlesizeMax: '1KB' + } +} diff --git a/packages/transport-quic/CODE_OF_CONDUCT.md b/packages/transport-quic/CODE_OF_CONDUCT.md new file mode 100644 index 0000000000..6b0fa54c54 --- /dev/null +++ b/packages/transport-quic/CODE_OF_CONDUCT.md @@ -0,0 +1,3 @@ +# Contributor Code of Conduct + +This project follows the [`IPFS Community Code of Conduct`](https://github.com/ipfs/community/blob/master/code-of-conduct.md) diff --git a/packages/transport-quic/LICENSE-APACHE b/packages/transport-quic/LICENSE-APACHE new file mode 100644 index 0000000000..b09cd7856d --- /dev/null +++ b/packages/transport-quic/LICENSE-APACHE @@ -0,0 +1,201 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/packages/transport-quic/LICENSE-MIT b/packages/transport-quic/LICENSE-MIT new file mode 100644 index 0000000000..72dc60d84b --- /dev/null +++ b/packages/transport-quic/LICENSE-MIT @@ -0,0 +1,19 @@ +The MIT License (MIT) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/packages/transport-quic/README.md b/packages/transport-quic/README.md new file mode 100644 index 0000000000..c3c67e7028 --- /dev/null +++ b/packages/transport-quic/README.md @@ -0,0 +1,71 @@ +# @libp2p/quic + +[![libp2p.io](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) +[![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io) +[![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p) +[![CI](https://img.shields.io/github/actions/workflow/status/libp2p/js-libp2p/main.yml?branch=main\&style=flat-square)](https://github.com/libp2p/js-libp2p/actions/workflows/main.yml?query=branch%3Amain) + +> A QUIC transport for libp2p + +# About + + + +A [libp2p transport](https://docs.libp2p.io/concepts/transports/overview/) based on the QUIC networking stack. + +## Example + +```TypeScript +import { createLibp2p } from 'libp2p' +import { quic } from '@libp2p/quic' +import { multiaddr } from '@multiformats/multiaddr' + +const node = await createLibp2p({ + transports: [ + quic() + ] +}) + +const ma = multiaddr('/ip4/123.123.123.123/udp/1234/quic-v1') + +// dial a QUIC connection, timing out after 10 seconds +const connection = await node.dial(ma, { + signal: AbortSignal.timeout(10_000) +}) + +// use connection... +``` + +# Install + +```console +$ npm i @libp2p/quic +``` + +# API Docs + +- + +# License + +Licensed under either of + +- Apache 2.0, ([LICENSE-APACHE](https://github.com/libp2p/js-libp2p/blob/main/packages/transport-quic/LICENSE-APACHE) / ) +- MIT ([LICENSE-MIT](https://github.com/libp2p/js-libp2p/blob/main/packages/transport-quic/LICENSE-MIT) / ) + +# Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions. diff --git a/packages/transport-quic/package.json b/packages/transport-quic/package.json new file mode 100644 index 0000000000..1d64393fcb --- /dev/null +++ b/packages/transport-quic/package.json @@ -0,0 +1,82 @@ +{ + "name": "@libp2p/quic", + "version": "0.0.0", + "description": "A QUIC transport for libp2p", + "license": "Apache-2.0 OR MIT", + "homepage": "https://github.com/libp2p/js-libp2p/tree/main/packages/transport-quic#readme", + "repository": { + "type": "git", + "url": "git+https://github.com/libp2p/js-libp2p.git" + }, + "bugs": { + "url": "https://github.com/libp2p/js-libp2p/issues" + }, + "publishConfig": { + "access": "public", + "provenance": true + }, + "keywords": [ + "IPFS", + "QUIC", + "libp2p", + "network", + "p2p", + "peer", + "peer-to-peer" + ], + "type": "module", + "types": "./dist/src/index.d.ts", + "files": [ + "src", + "dist", + "!dist/test", + "!**/*.tsbuildinfo" + ], + "exports": { + ".": { + "types": "./dist/src/index.d.ts", + "import": "./dist/src/index.js", + "module-sync": "./dist/src/index.js" + } + }, + "scripts": { + "clean": "aegir clean", + "lint": "aegir lint", + "dep-check": "aegir dep-check", + "doc-check": "aegir doc-check", + "build": "aegir build", + "test": "aegir test -t node", + "test:chrome": "aegir test -t browser -f ./dist/test/browser.js --cov", + "test:chrome-webworker": "aegir test -t webworker -f ./dist/test/browser.js", + "test:firefox": "aegir test -t browser -f ./dist/test/browser.js -- --browser firefox", + "test:firefox-webworker": "aegir test -t webworker -f ./dist/test/browser.js -- --browser firefox", + "test:node": "aegir test -t node --cov" + }, + "dependencies": { + "@libp2p/interface": "^3.1.1", + "@libp2p/peer-id": "^6.0.8", + "@libp2p/tls": "^3.0.17", + "@libp2p/utils": "^7.0.14", + "@multiformats/multiaddr": "^13.0.1", + "@multiformats/multiaddr-matcher": "^3.0.2", + "@types/sinon": "^20.0.0", + "progress-events": "^1.1.0", + "race-signal": "^2.0.0", + "uint8arraylist": "^2.4.9", + "uint8arrays": "^5.1.1" + }, + "devDependencies": { + "@libp2p/crypto": "^5.1.17", + "@libp2p/logger": "^6.2.3", + "aegir": "^47.0.26", + "p-defer": "^4.0.1", + "p-wait-for": "^6.0.0", + "sinon": "^21.0.3", + "sinon-ts": "^2.0.0", + "wherearewe": "^2.0.1" + }, + "browser": { + "./dist/src/quic.js": "./dist/src/quic.browser.js" + }, + "sideEffects": false +} diff --git a/packages/transport-quic/src/constants.ts b/packages/transport-quic/src/constants.ts new file mode 100644 index 0000000000..91477850d0 --- /dev/null +++ b/packages/transport-quic/src/constants.ts @@ -0,0 +1,10 @@ +// p2p multi-address code +export const CODE_P2P = 421 +export const CODE_CIRCUIT = 290 +export const CODE_UNIX = 400 + +// Time to wait for a connection to close gracefully before destroying it manually +export const CLOSE_TIMEOUT = 500 + +// Close the socket if there is no activity after this long in ms +export const SOCKET_TIMEOUT = 2 * 60000 // 2 mins diff --git a/packages/transport-quic/src/index.ts b/packages/transport-quic/src/index.ts new file mode 100644 index 0000000000..415f32cc5d --- /dev/null +++ b/packages/transport-quic/src/index.ts @@ -0,0 +1,130 @@ +/** + * @packageDocumentation + * + * A [libp2p transport](https://docs.libp2p.io/concepts/transports/overview/) based on the QUIC networking stack. + * + * @example + * + * ```TypeScript + * import { createLibp2p } from 'libp2p' + * import { quic } from '@libp2p/quic' + * import { multiaddr } from '@multiformats/multiaddr' + * + * const node = await createLibp2p({ + * transports: [ + * quic() + * ] + * }) + * + * const ma = multiaddr('/ip4/123.123.123.123/udp/1234/quic-v1') + * + * // dial a QUIC connection, timing out after 10 seconds + * const connection = await node.dial(ma, { + * signal: AbortSignal.timeout(10_000) + * }) + * + * // use connection... + * ``` + */ + +import { QUIC } from './quic.ts' +import type { ComponentLogger, CounterGroup, Metrics, CreateListenerOptions, DialTransportOptions, Transport, OutboundConnectionUpgradeEvents, PrivateKey } from '@libp2p/interface' +import type { ProgressEvent } from 'progress-events' + +export interface QUICOptions { + /** + * An optional number in ms that is used as an inactivity timeout after which the socket will be closed + */ + inboundSocketInactivityTimeout?: number + + /** + * An optional number in ms that is used as an inactivity timeout after which the socket will be closed + */ + outboundSocketInactivityTimeout?: number + + /** + * When closing a socket, wait this long for it to close gracefully before it is closed more forcibly + */ + socketCloseTimeout?: number + + /** + * Set this property to reject connections when the server's connection count gets high. + * https://nodejs.org/api/net.html#servermaxconnections + */ + maxConnections?: number + + /** + * Parameter to specify the maximum length of the queue of pending connections + * https://nodejs.org/dist/latest-v18.x/docs/api/net.html#serverlisten + */ + backlog?: number + + /** + * Options passed to `net.connect` for every opened TCP socket + */ + dialOpts?: QUICSocketOptions + + /** + * Options passed to every `net.createServer` for every TCP server + */ + listenOpts?: QUICSocketOptions + + /** + * How many concurrent streams are allowed on outbound connections + */ + maxOutboundStreams?: number +} + +/** + * Expose a subset of net.connect options + */ +export interface QUICSocketOptions { + /** + * @see https://nodejs.org/api/net.html#socketconnectoptions-connectlistener + */ + noDelay?: boolean + + /** + * @see https://nodejs.org/api/net.html#socketconnectoptions-connectlistener + */ + keepAlive?: boolean + + /** + * @see https://nodejs.org/api/net.html#socketconnectoptions-connectlistener + */ + keepAliveInitialDelay?: number + + /** + * @see https://nodejs.org/api/net.html#new-netsocketoptions + */ + allowHalfOpen?: boolean +} + +export type QUICDialEvents = + OutboundConnectionUpgradeEvents | + ProgressEvent<'quic:open-connection'> + +export interface QUICDialOptions extends DialTransportOptions, QUICSocketOptions { + +} + +export interface QUICCreateListenerOptions extends CreateListenerOptions, QUICSocketOptions { + +} + +export interface QUICComponents { + privateKey: PrivateKey + metrics?: Metrics + logger: ComponentLogger +} + +export interface QUICMetrics { + events: CounterGroup<'error' | 'timeout' | 'connect' | 'abort'> + errors: CounterGroup<'outbound_verify_peer' | 'outbound_to_connection' | 'outbound_upgrade'> +} + +export function quic (init: QUICOptions = {}): (components: QUICComponents) => Transport { + return (components: QUICComponents) => { + return new QUIC(components, init) + } +} diff --git a/packages/transport-quic/src/listener.ts b/packages/transport-quic/src/listener.ts new file mode 100644 index 0000000000..e90bc0a286 --- /dev/null +++ b/packages/transport-quic/src/listener.ts @@ -0,0 +1,264 @@ +import * as crypto from 'node:crypto' +import * as os from 'node:os' +import { setMaxListeners, TypedEventEmitter } from '@libp2p/interface' +import { generateCertificate, verifyPeerCertificate } from '@libp2p/tls/utils' +import { getNetConfig } from '@libp2p/utils' +import { multiaddr } from '@multiformats/multiaddr' +import * as net from 'node:quic' +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { quicMuxer } from './muxer.ts' +import { toMultiaddrConnection } from './session-to-conn.ts' +import { getRemoteCertificate } from './utils/get-remote-certificate.ts' +import type { Upgrader, Listener, ListenerEvents, CreateListenerOptions, ComponentLogger, Metrics, Logger, MetricGroup, CounterGroup, PrivateKey } from '@libp2p/interface' +import type { Multiaddr } from '@multiformats/multiaddr' +import type { QuicEndpoint, QuicSession } from 'node:quic' + +const networks = os.networkInterfaces() + +function isAnyAddr (ip: string): boolean { + return ['0.0.0.0', '::'].includes(ip) +} + +function getNetworkAddrs (family: string): string[] { + const addresses: string[] = [] + + for (const [, netAddrs] of Object.entries(networks)) { + if (netAddrs != null) { + for (const netAddr of netAddrs) { + if (netAddr.family === family) { + addresses.push(netAddr.address) + } + } + } + } + + return addresses +} + +const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' } + +export interface QUICListenerComponents { + privateKey: PrivateKey + metrics?: Metrics +} + +export interface QUICListenerInit extends CreateListenerOptions { + upgrader: Upgrader + socketInactivityTimeout?: number + socketCloseTimeout?: number + maxConnections?: number + maxInboundStreams?: number + metrics?: Metrics + logger: ComponentLogger +} + +export interface QUICListenerMetrics { + status?: MetricGroup + errors?: CounterGroup + events?: CounterGroup +} + +export class QUICListener extends TypedEventEmitter implements Listener { + private endpoint?: QuicEndpoint + private readonly upgrader: Upgrader + private readonly sessions: Set + private readonly components: QUICListenerComponents + private readonly log: Logger + private readonly logger: ComponentLogger + private readonly maxInboundStreams: number + private readonly socketInactivityTimeout: number + private readonly socketCloseTimeout: number + private readonly metrics: QUICListenerMetrics + private addr?: string + private readonly shutdownController: AbortController + + constructor (components: QUICListenerComponents, init: QUICListenerInit) { + super() + + this.components = components + this.upgrader = init.upgrader + this.sessions = new Set() + this.log = init.logger.forComponent('libp2p:quic:listener') + this.logger = init.logger + this.maxInboundStreams = init.maxInboundStreams ?? 1000 + this.socketInactivityTimeout = init.socketInactivityTimeout ?? 5000 + this.socketCloseTimeout = init.socketCloseTimeout ?? 5000 + + this.shutdownController = new AbortController() + setMaxListeners(Infinity, this.shutdownController.signal) + + init.metrics?.registerMetricGroup('libp2p_quic_inbound_sessions_total', { + label: 'address', + help: 'Current active sessions in QUIC listener', + calculate: () => { + if (this.addr == null) { + return {} + } + + return { + [this.addr]: this.sessions.size + } + } + }) + + this.metrics = { + status: init.metrics?.registerMetricGroup('libp2p_quic_listener_status_info', { + label: 'address', + help: 'Current status of the QUIC listener socket' + }), + errors: init.metrics?.registerMetricGroup('libp2p_quic_listener_errors_total', { + label: 'address', + help: 'Total count of QUIC listener errors by type' + }), + events: init.metrics?.registerMetricGroup('libp2p_quic_listener_events_total', { + label: 'address', + help: 'Total count of QUIC listener events by type' + }) + } + } + + async onSession (session: QuicSession): Promise { + try { + // TODO: incoming dial timeout + const cert = await getRemoteCertificate(session, { + signal: this.shutdownController.signal + }) + + // read one stream to do authentication + this.log('secure inbound stream') + const remotePeer = await verifyPeerCertificate(cert, undefined, this.log) + this.shutdownController.signal?.throwIfAborted() + + this.log('incoming peer %p', remotePeer) + + const path = session.path + + if (path == null) { + throw new Error('Session did not have path') + } + + // upgrade it + const maConn = toMultiaddrConnection(session, { + remoteAddr: multiaddr(`/ip${path.remote.family === 'ipv4' ? '4' : '6'}/${path.remote.address}/udp/${path.remote.port}/quic-v1`), + log: this.log, + inactivityTimeout: this.socketInactivityTimeout, + closeTimeout: this.socketCloseTimeout, + metrics: this.metrics?.events, + metricPrefix: `${this.addr} `, + logger: this.logger, + direction: 'inbound' + }) + + this.log('upgrading inbound connection') + await this.upgrader.upgradeInbound(maConn, { + skipEncryption: true, + skipProtection: true, + remotePeer, + signal: this.shutdownController.signal, + muxerFactory: quicMuxer(session, this.logger, { + maxInboundStreams: this.maxInboundStreams + }) + }) + + this.log('inbound connection upgrade complete') + // TODO: remove from sessions set + this.sessions.add(session) + } catch (err: any) { + this.log('inbound connection failed to upgrade - %e', err) + try { + // @ts-expect-error not in types + await session.close({ + type: 'application', + reason: err.message + }) + } catch (err) { + session.destroy(err) + } + } + } + + getAddrs (): Multiaddr[] { + if (this.endpoint == null) { + return [] + } + + const address = this.endpoint.address + + if (address == null) { + return [] + } + + const proto = address.family === 'ipv4' ? 'ip4' : 'ip6' + const toMa = (ip: string): Multiaddr => multiaddr(`/${proto}/${ip}/udp/${address.port}/quic-v1`) + + return (isAnyAddr(address.address) ? getNetworkAddrs(ProtoFamily[proto]) : [address.address]).map(toMa) + } + + updateAnnounceAddrs (addrs: Multiaddr[]): void { + + } + + async listen (ma: Multiaddr): Promise { + if (this.endpoint?.address != null) { + return + } + + const netConfig = getNetConfig(ma) + const pem = await generateCertificate(this.components.privateKey) + this.shutdownController.signal.throwIfAborted() + + try { + this.endpoint = await net.listen(this.onSession.bind(this), { + // @ts-expect-error types are wrong + sni: { + '*': { + certs: uint8ArrayFromString(pem.cert), + keys: crypto.createPrivateKey(pem.key) + } + }, + endpoint: { + address: `${netConfig.host}:${netConfig.port}` + }, + alpn: 'libp2p', + verifyClient: true, + rejectUnauthorized: false + }) + this.shutdownController.signal.throwIfAborted() + } catch (err: any) { + this.metrics.errors?.increment({ [`${this.addr} listen_error`]: true }) + this.safeDispatchEvent('error', { detail: err }) + + throw err + } + + const address = this.endpoint.address + + if (address != null) { + this.addr = `${address.address}:${address.port}` + } + + this.endpoint.closed + .then(() => { + this.safeDispatchEvent('close') + }) + .catch(err => { + this.metrics.errors?.increment({ [`${this.addr} close_error`]: true }) + this.safeDispatchEvent('error', { detail: err }) + }) + + this.safeDispatchEvent('listening') + + this.log('listening on %s', this.addr) + } + + async close (): Promise { + // TODO: this leaves sessions open until they time out thought the docs say + // they should be closed immediately + this.endpoint?.destroy?.() + // @ts-expect-error endpoint.destroy returns a promise - https://github.com/jasnell/node/blob/bbd0da0ae8862a882144dbcb6efa115b1068223c/lib/internal/quic/quic.js#L3740 + .catch(() => {}) + + // stop any in-progress connection upgrades + this.shutdownController.abort() + } +} diff --git a/packages/transport-quic/src/muxer.ts b/packages/transport-quic/src/muxer.ts new file mode 100644 index 0000000000..7f55f898c8 --- /dev/null +++ b/packages/transport-quic/src/muxer.ts @@ -0,0 +1,70 @@ +import { AbstractStreamMuxer } from '@libp2p/utils' +import { quicBiDiStreamToStream } from './stream.ts' +import type { QUICStream } from './stream.ts' +import type { ComponentLogger, CreateStreamOptions, MultiaddrConnection, StreamMuxer, StreamMuxerFactory } from '@libp2p/interface' +import type { QuicSession, QuicStream } from 'node:quic' + +const PROTOCOL = '/quic-v1' + +export interface QUICTransportStreamMuxerInit { + maxInboundStreams?: number + maxOutboundStreams?: number +} + +class QUICTransportStreamMuxer extends AbstractStreamMuxer { + private session: QuicSession + private streamIDCounter: number + + constructor (session: QuicSession, maConn: MultiaddrConnection, init: QUICTransportStreamMuxerInit = {}) { + super(maConn, { + protocol: PROTOCOL, + name: 'muxer' + }) + + this.session = session + this.streamIDCounter = 0 + + //! TODO unclear how to add backpressure here? + this.session.onstream = (stream: QuicStream) => { + this.onRemoteStream( + quicBiDiStreamToStream( + stream, + String(this.streamIDCounter++), + 'inbound', + this.log, + this.streamOptions + ) + ) + } + } + + async onCreateStream (options: CreateStreamOptions): Promise { + const quicStream = await this.session.createBidirectionalStream() + options?.signal?.throwIfAborted() + + return quicBiDiStreamToStream( + quicStream, + String(this.streamIDCounter++), + 'outbound', + this.log, + options + ) + } + + onData (): void { + + } + + sendReset (): void { + this.session.close() + } +} + +export function quicMuxer (session: QuicSession, log: ComponentLogger, init: QUICTransportStreamMuxerInit): StreamMuxerFactory { + return { + protocol: PROTOCOL, + createStreamMuxer (maConn: MultiaddrConnection): StreamMuxer { + return new QUICTransportStreamMuxer(session, maConn, init) + } + } +} diff --git a/packages/transport-quic/src/quic.browser.ts b/packages/transport-quic/src/quic.browser.ts new file mode 100644 index 0000000000..87e56a2c44 --- /dev/null +++ b/packages/transport-quic/src/quic.browser.ts @@ -0,0 +1,34 @@ +import { serviceCapabilities, transportSymbol } from '@libp2p/interface' +import type { QUICDialEvents } from './index.ts' +import type { Connection, Transport, Listener } from '@libp2p/interface' +import type { Multiaddr } from '@multiformats/multiaddr' + +export class QUIC implements Transport { + constructor () { + throw new Error('QUIC connections are not possible in browsers') + } + + readonly [transportSymbol] = true + + readonly [Symbol.toStringTag] = '@libp2p/quic' + + readonly [serviceCapabilities]: string[] = [ + '@libp2p/transport' + ] + + async dial (): Promise { + throw new Error('QUIC connections are not possible in browsers') + } + + createListener (): Listener { + throw new Error('QUIC connections are not possible in browsers') + } + + listenFilter (): Multiaddr[] { + return [] + } + + dialFilter (): Multiaddr[] { + return [] + } +} diff --git a/packages/transport-quic/src/quic.ts b/packages/transport-quic/src/quic.ts new file mode 100644 index 0000000000..b1b9fa5e13 --- /dev/null +++ b/packages/transport-quic/src/quic.ts @@ -0,0 +1,171 @@ +import * as crypto from 'node:crypto' +import { serviceCapabilities, transportSymbol } from '@libp2p/interface' +import { peerIdFromString } from '@libp2p/peer-id' +import { generateCertificate, verifyPeerCertificate } from '@libp2p/tls/utils' +import { getNetConfig } from '@libp2p/utils' +import { multiaddr } from '@multiformats/multiaddr' +import { QUIC_V1 } from '@multiformats/multiaddr-matcher' +import net from 'node:quic' +import { CustomProgressEvent } from 'progress-events' +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { QUICListener } from './listener.ts' +import { quicMuxer } from './muxer.ts' +import { toMultiaddrConnection } from './session-to-conn.ts' +import { getRemoteCertificate } from './utils/get-remote-certificate.ts' +import type { QUICComponents, QUICCreateListenerOptions, QUICDialEvents, QUICDialOptions, QUICMetrics, QUICOptions } from './index.ts' +import type { Logger, Connection, Transport, Listener, MultiaddrConnection, PeerId } from '@libp2p/interface' +import type { Multiaddr } from '@multiformats/multiaddr' + +export class QUIC implements Transport { + private readonly opts: QUICOptions + private readonly metrics?: QUICMetrics + private readonly components: QUICComponents + private readonly log: Logger + + constructor (components: QUICComponents, options: QUICOptions = {}) { + this.log = components.logger.forComponent('libp2p:quic') + this.opts = options + this.components = components + + if (components.metrics != null) { + this.metrics = { + events: components.metrics.registerCounterGroup('libp2p_quic_dialer_events_total', { + label: 'event', + help: 'Total count of TCP dialer events by type' + }), + errors: components.metrics.registerCounterGroup('libp2p_quic_dialer_errors_total', { + label: 'event', + help: 'Total count of TCP dialer events by type' + }) + } + } + } + + readonly [transportSymbol] = true + + readonly [Symbol.toStringTag] = '@libp2p/quic' + + readonly [serviceCapabilities]: string[] = [ + '@libp2p/transport' + ] + + async dial (ma: Multiaddr, options: QUICDialOptions): Promise { + options.signal?.throwIfAborted() + options.keepAlive = options.keepAlive ?? true + options.noDelay = options.noDelay ?? true + options.onProgress?.(new CustomProgressEvent('quic:open-connection')) + + const config = getNetConfig(ma) + const pem = await generateCertificate(this.components.privateKey) + + let addr = `${config.host}:${config.port}` + + if (config.type === 'ip6') { + addr = `[${config.host}]:${config.port}` + } + + this.log('dialing %s', addr) + const session = await net.connect(addr, { + alpn: 'libp2p', + certs: uint8ArrayFromString(pem.cert), + keys: crypto.createPrivateKey(pem.key) + // endpoint: // TODO: pass listen endpoint to multiplex to single port + }) + + const cert = await getRemoteCertificate(session, options) + + let remotePeer: PeerId + + try { + let expectedPeer: PeerId | undefined + const maPeerString = ma.getComponents().findLast(c => c.name === 'p2p')?.value + + if (maPeerString != null) { + expectedPeer = peerIdFromString(maPeerString) + } + + this.log('secure outbound stream %p', expectedPeer) + remotePeer = await verifyPeerCertificate(cert, expectedPeer) + } catch (err) { + this.metrics?.errors.increment({ outbound_verify_peer: true }) + session.destroy(err) + throw err + } + + let maConn: MultiaddrConnection + + try { + const path = session.path + + if (path == null) { + throw new Error('Session did not have path') + } + + maConn = toMultiaddrConnection(session, { + remoteAddr: multiaddr(`/ip${path.remote.family === 'ipv4' ? '4' : '6'}/${path.remote.address}/udp/${path.remote.port}/quic-v1`), + localAddr: multiaddr(`/ip${path.local.family === 'ipv4' ? '4' : '6'}/${path.local.address}/udp/${path.local.port}/quic-v1`), + inactivityTimeout: this.opts.outboundSocketInactivityTimeout, + closeTimeout: this.opts.socketCloseTimeout, + metrics: this.metrics?.events, + logger: this.components.logger, + log: this.log, + direction: 'outbound' + }) + } catch (err: any) { + this.metrics?.errors.increment({ outbound_to_connection: true }) + session.destroy(err) + throw err + } + + try { + this.log('new outbound connection %s to %p', maConn.remoteAddr, remotePeer) + return await options.upgrader.upgradeOutbound(maConn, { + ...options, + skipProtection: true, + skipEncryption: true, + remotePeer, + muxerFactory: quicMuxer(session, this.components.logger, { + maxOutboundStreams: this.opts.maxOutboundStreams + }) + }) + } catch (err: any) { + this.metrics?.errors.increment({ outbound_upgrade: true }) + this.log.error('error upgrading outbound connection', err) + maConn.abort(err) + throw err + } + } + + /** + * Creates a TCP listener. The provided `handler` function will be called + * anytime a new incoming Connection has been successfully upgraded via + * `upgrader.upgradeInbound`. + */ + createListener (options: QUICCreateListenerOptions): Listener { + return new QUICListener(this.components, { + ...(this.opts.listenOpts ?? {}), + ...options, + maxConnections: this.opts.maxConnections, + socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout, + socketCloseTimeout: this.opts.socketCloseTimeout, + metrics: this.components.metrics, + logger: this.components.logger + }) + } + + /** + * Takes a list of `Multiaddr`s and returns only valid QUIC addresses + */ + listenFilter (multiaddrs: Multiaddr[]): Multiaddr[] { + multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] + + return multiaddrs.filter(ma => QUIC_V1.exactMatch(ma)) + } + + /** + * Filter check for all Multiaddrs that this transport can dial + */ + dialFilter (multiaddrs: Multiaddr[]): Multiaddr[] { + return this.listenFilter(multiaddrs) + } +} diff --git a/packages/transport-quic/src/session-to-conn.ts b/packages/transport-quic/src/session-to-conn.ts new file mode 100644 index 0000000000..a943499c6f --- /dev/null +++ b/packages/transport-quic/src/session-to-conn.ts @@ -0,0 +1,91 @@ +import { AbstractMultiaddrConnection } from '@libp2p/utils' +import { raceSignal } from 'race-signal' +import type { AbortOptions, ComponentLogger, MultiaddrConnection } from '@libp2p/interface' +import type { AbstractMultiaddrConnectionInit, SendResult } from '@libp2p/utils' +import type { QuicSession } from 'node:quic' +import type { Uint8ArrayList } from 'uint8arraylist' + +export interface QUICSessionMultiaddrConnectionInit extends Omit { + closeTimeout?: number + logger: ComponentLogger + direction: 'inbound' | 'outbound' +} + +class QUICSessionMultiaddrConnection extends AbstractMultiaddrConnection { + private session: QuicSession + + constructor (session: QuicSession, init: QUICSessionMultiaddrConnectionInit) { + super(init) + + this.session = session + /* + // @ts-expect-error not in types + this.session.onerror = (err) => { + this.log.error('QUIC onerror - %e', err) + this.abort(err) + } +*/ + // @ts-expect-error not in types + this.session.onearlyrejected = (err) => { + this.log.error('QUIC onearlyrejected - %e', err) + this.onRemoteReset() + } + + // @ts-expect-error not in types + this.session.ongoaway = (err) => { + this.log.error('QUIC ongoaway - %e', err) + this.onTransportClosed() + } + + // @ts-expect-error not in types + session.opened.then(() => { + this.log('QUIC session opened') + }, (err: Error) => { + this.log('QUIC session failed to open - %e', err) + this.onTransportClosed() + }) + + session.closed.then(() => { + this.log('QUIC session closed gracefully') + }, (err: Error) => { + this.log.error('QUIC session closed with error - %e', err) + this.abort(err) + }).finally(() => { + this.log('QUIC session closed') + this.onTransportClosed() + // This is how we specify the connection is closed and shouldn't be used. + this.timeline.close = Date.now() + }) + } + + sendData (data: Uint8ArrayList): SendResult { + return { + sentBytes: data.byteLength, + canSendMore: true + } + } + + sendReset (): void { + this.session.destroy() + } + + async sendClose (options?: AbortOptions): Promise { + await raceSignal(this.session.close(), options?.signal) + } + + sendPause (): void { + // TODO: backpressure? + } + + sendResume (): void { + // TODO: backpressure? + } +} + +/** + * Convert a QUIC session into a MultiaddrConnection + * https://github.com/libp2p/interface-transport#multiaddrconnection + */ +export const toMultiaddrConnection = (session: QuicSession, init: QUICSessionMultiaddrConnectionInit): MultiaddrConnection => { + return new QUICSessionMultiaddrConnection(session, init) +} diff --git a/packages/transport-quic/src/stream.ts b/packages/transport-quic/src/stream.ts new file mode 100644 index 0000000000..41724d61a3 --- /dev/null +++ b/packages/transport-quic/src/stream.ts @@ -0,0 +1,122 @@ +import { AbstractStream } from '@libp2p/utils' +import { raceSignal } from 'race-signal' +import type { AbortOptions, MessageStreamDirection, Logger, StreamOptions } from '@libp2p/interface' +import type { AbstractStreamInit, SendResult } from '@libp2p/utils' +import type { QuicStream } from 'node:quic' +import type { Uint8ArrayList } from 'uint8arraylist' + +interface QUICStreamInit extends AbstractStreamInit { + stream: QuicStream +} + +export class QUICStream extends AbstractStream { + private readonly writer: WritableStreamDefaultWriter + + constructor (init: QUICStreamInit) { + super(init) + + // @ts-expect-error this comes from https://github.com/nodejs/node/pull/62876 + // and is missing from the types + this.writer = init.stream.writer + + Promise.resolve() + .then(async () => { + // @ts-expect-error not in types + // eslint-disable-next-line @typescript-eslint/await-thenable + for await (const bufs of init.stream) { + bufs.forEach((buf: Uint8Array) => { + this.onData(buf) + }) + } + + this.onRemoteCloseWrite() + }) + .catch(err => { + this.abort(err) + }) + } + + sendData (data: Uint8ArrayList): SendResult { + let sentBytes = 0 + + while (data.byteLength > 0) { + const toWrite = Math.min(data.byteLength, this.writer.desiredSize ?? 0) + + if (toWrite === 0) { + break + } + + // TODO: have to copy before write otherwise error is thrown: + // TypeError: Provided key doesn't match [[ArrayBufferDetachKey]] + const bytes = data.subarray(0, toWrite).slice() + data.consume(toWrite) + // @ts-expect-error not in types + this.writer.writeSync(bytes) + sentBytes += toWrite + } + + this.log.trace('desired size after sending %d bytes is %d bytes', sentBytes, this.writer.desiredSize) + + // null means the stream has errored - https://streams.spec.whatwg.org/#writable-stream-default-writer-get-desired-size + if (this.writer.desiredSize == null) { + return { + sentBytes, + canSendMore: false + } + } + + const canSendMore = this.writer.desiredSize > 0 + + if (!canSendMore) { + // @ts-expect-error not in types + // wait for drain + this.writer[Symbol.for('Stream.drainableProtocol')]?.()?.then(() => { + this.safeDispatchEvent('drain') + }, (err: any) => { + this.abort(err) + }) + } + + return { + sentBytes, + canSendMore + } + } + + sendReset (err: Error): void { + // @ts-expect-error not in types + this.writer.fail(err) + } + + async sendCloseWrite (options?: AbortOptions): Promise { + this.log('sendCloseWrite closing writer') + // @ts-expect-error not in types + await raceSignal(this.writer.end().catch(() => {}), options?.signal) + this.log('sendCloseWrite closed writer') + } + + async sendCloseRead (options?: AbortOptions): Promise { + this.log('sendCloseRead cancelling reader') + // await raceSignal(this.reader.cancel(), options?.signal) + this.log('sendCloseRead cancelled reader') + } + + sendPause (): void { + + } + + sendResume (): void { + // this.readData() + } +} + +export function quicBiDiStreamToStream (stream: QuicStream, streamId: string, direction: MessageStreamDirection, log: Logger, options?: StreamOptions): QUICStream { + return new QUICStream({ + ...options, + stream, + id: streamId, + direction, + log: log.newScope(`${direction}:${streamId}`), + protocol: '' + }) +} diff --git a/packages/transport-quic/src/utils/get-remote-certificate.ts b/packages/transport-quic/src/utils/get-remote-certificate.ts new file mode 100644 index 0000000000..1512ab2ba7 --- /dev/null +++ b/packages/transport-quic/src/utils/get-remote-certificate.ts @@ -0,0 +1,14 @@ +import { raceSignal } from 'race-signal' +import type { AbortOptions } from '@libp2p/interface' +import type { QuicSession } from 'node:quic' + +export async function getRemoteCertificate (session: QuicSession, options?: AbortOptions): Promise { + const p = new Promise((resolve) => { + session.onhandshake = () => { + // @ts-expect-error missing from types + resolve(session.peerCertificate.raw()) + } + }) + + return raceSignal(p, options?.signal) +} diff --git a/packages/transport-quic/test/browser.ts b/packages/transport-quic/test/browser.ts new file mode 100644 index 0000000000..f3fa088cff --- /dev/null +++ b/packages/transport-quic/test/browser.ts @@ -0,0 +1,22 @@ +import { generateKeyPair } from '@libp2p/crypto/keys' +import { defaultLogger } from '@libp2p/logger' +import { expect } from 'aegir/chai' +import { isBrowser, isWebWorker } from 'wherearewe' +import { quic } from '../src/index.ts' + +describe('browser non-support', () => { + it('should throw in browsers', function () { + if (!isBrowser && !isWebWorker) { + return this.skip() + } + + expect(async () => { + const privateKey = await generateKeyPair('Ed25519') + + quic()({ + privateKey, + logger: defaultLogger() + }) + }).to.throw() + }) +}) diff --git a/packages/transport-quic/test/connection.spec.ts b/packages/transport-quic/test/connection.spec.ts new file mode 100644 index 0000000000..242a177326 --- /dev/null +++ b/packages/transport-quic/test/connection.spec.ts @@ -0,0 +1,97 @@ +import { generateKeyPair } from '@libp2p/crypto/keys' +import { defaultLogger } from '@libp2p/logger' +import { multiaddr } from '@multiformats/multiaddr' +import { expect } from 'aegir/chai' +import pWaitFor from 'p-wait-for' +import { raceSignal } from 'race-signal' +import Sinon from 'sinon' +import { stubInterface } from 'sinon-ts' +import { quic } from '../src/index.ts' +import type { Connection, Transport, Upgrader } from '@libp2p/interface' +import type { StubbedInstance } from 'sinon-ts' + +describe('valid localAddr and remoteAddr', () => { + let transport: Transport + let upgrader: StubbedInstance + + beforeEach(async () => { + const privateKey = await generateKeyPair('Ed25519') + + transport = quic()({ + privateKey, + logger: defaultLogger() + }) + upgrader = stubInterface({ + upgradeInbound: Sinon.stub().resolves(), + upgradeOutbound: async (maConn) => { + return stubInterface({ + remoteAddr: maConn.remoteAddr + }) + } + }) + }) + + const ma = multiaddr('/ip4/127.0.0.1/tcp/0') + + it('should resolve port 0', async () => { + // Create a listener + const listener = transport.createListener({ + upgrader + }) + + // Listen on the multi-address + await listener.listen(ma) + + const localAddrs = listener.getAddrs() + expect(localAddrs.length).to.equal(1) + + // Dial to that address + await transport.dial(localAddrs[0], { + signal: AbortSignal.timeout(5_000), + upgrader + }) + + // Wait for the incoming dial to be handled + await pWaitFor(() => { + return upgrader.upgradeInbound.callCount === 1 + }) + + // Close the listener + await listener.close() + }) + + it('should handle multiple simultaneous closes', async () => { + // Create a listener + const listener = transport.createListener({ + upgrader + }) + + // Listen on the multi-address + await listener.listen(ma) + + const localAddrs = listener.getAddrs() + expect(localAddrs.length).to.equal(1) + + // Dial to that address + const dialerConn = await transport.dial(localAddrs[0], { + signal: AbortSignal.timeout(5_000), + upgrader + }) + + // Wait for the incoming dial to be handled + await pWaitFor(() => { + return upgrader.upgradeInbound.callCount === 1 + }) + + // Close the dialer with two simultaneous calls to `close` + await raceSignal( + Promise.all([ + dialerConn.close(), + dialerConn.close() + ]), + AbortSignal.timeout(500) + ) + + await listener.close() + }) +}) diff --git a/packages/transport-quic/test/filter.spec.ts b/packages/transport-quic/test/filter.spec.ts new file mode 100644 index 0000000000..6e746d4222 --- /dev/null +++ b/packages/transport-quic/test/filter.spec.ts @@ -0,0 +1,45 @@ +import { generateKeyPair } from '@libp2p/crypto/keys' +import { defaultLogger } from '@libp2p/logger' +import { multiaddr } from '@multiformats/multiaddr' +import { expect } from 'aegir/chai' +import { quic } from '../src/index.ts' +import type { Transport } from '@libp2p/interface' + +describe('filter addrs', () => { + const base = '/ip4/127.0.0.1' + const ipfs = '/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw' + + let transport: Transport + + beforeEach(async () => { + const privateKey = await generateKeyPair('Ed25519') + + transport = quic()({ + privateKey, + logger: defaultLogger() + }) + }) + + it('filter valid addrs for this transport', () => { + const ma1 = multiaddr(base + '/udp/9090/quic-v1') + const ma2 = multiaddr(base + '/udp/9090') + const ma3 = multiaddr(base + '/tcp/9090/http') + const ma4 = multiaddr(base + '/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma5 = multiaddr(base + '/tcp/9090/http' + ipfs) + const ma6 = multiaddr('/ip4/127.0.0.1/tcp/9090/p2p-circuit' + ipfs) + const ma7 = multiaddr('/dns4/libp2p.io/tcp/9090') + const ma8 = multiaddr('/dnsaddr/libp2p.io/tcp/9090') + + const valid = transport.dialFilter([ma1, ma2, ma3, ma4, ma5, ma6, ma7, ma8]) + expect(valid.length).to.equal(1) + expect(valid[0]).to.deep.equal(ma1) + }) + + it('filter a single addr for this transport', () => { + const ma1 = multiaddr(base + '/udp/9090/quic-v1') + + const valid = transport.dialFilter([ma1]) + expect(valid.length).to.equal(1) + expect(valid[0]).to.eql(ma1) + }) +}) diff --git a/packages/transport-quic/test/listen-dial.spec.ts b/packages/transport-quic/test/listen-dial.spec.ts new file mode 100644 index 0000000000..1171b3e07e --- /dev/null +++ b/packages/transport-quic/test/listen-dial.spec.ts @@ -0,0 +1,302 @@ +import { generateKeyPair } from '@libp2p/crypto/keys' +import { defaultLogger } from '@libp2p/logger' +import { peerIdFromPrivateKey } from '@libp2p/peer-id' +import { getNetConfig } from '@libp2p/utils' +import { multiaddr } from '@multiformats/multiaddr' +import { expect } from 'aegir/chai' +import pDefer from 'p-defer' +import Sinon from 'sinon' +import { stubInterface } from 'sinon-ts' +import { quic } from '../src/index.ts' +import type { Connection, Listener, PeerId, Transport, Upgrader } from '@libp2p/interface' + +const isCI = process.env.CI + +describe('listen', () => { + let transport: Transport + let listener: Listener | undefined + let upgrader: Upgrader + + beforeEach(async () => { + const privateKey = await generateKeyPair('Ed25519') + + transport = quic()({ + privateKey, + logger: defaultLogger() + }) + upgrader = stubInterface({ + upgradeInbound: Sinon.stub().resolves(), + upgradeOutbound: async (maConn) => { + return stubInterface({ + remoteAddr: maConn.remoteAddr + }) + } + }) + }) + + afterEach(async () => { + try { + if (listener != null) { + await listener.close() + } + } catch { + // some tests close the listener so ignore errors + } + }) + + it('listen on port 0', async () => { + const mh = multiaddr('/ip4/127.0.0.1/udp/0/quic-v1') + listener = transport.createListener({ + upgrader + }) + await listener.listen(mh) + }) + + it('listen on IPv6 addr', async () => { + if (isCI != null) { + return + } + const mh = multiaddr('/ip6/::/udp/9090/quic-v1') + listener = transport.createListener({ + upgrader + }) + await listener.listen(mh) + }) + + it('listen on any Interface', async () => { + const mh = multiaddr('/ip4/0.0.0.0/udp/9090/quic-v1') + listener = transport.createListener({ + upgrader + }) + await listener.listen(mh) + }) + + it('getAddrs', async () => { + const mh = multiaddr('/ip4/127.0.0.1/udp/9090/quic-v1') + listener = transport.createListener({ + upgrader + }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length).to.equal(1) + expect(multiaddrs[0]).to.deep.equal(mh) + }) + + it('getAddrs on port 0 listen', async () => { + const mh = multiaddr('/ip4/127.0.0.1/udp/0/quic-v1') + listener = transport.createListener({ + upgrader + }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length).to.equal(1) + }) + + it('getAddrs from listening on 0.0.0.0', async () => { + const mh = multiaddr('/ip4/0.0.0.0/udp/9090/quic-v1') + listener = transport.createListener({ + upgrader + }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length > 0).to.equal(true) + expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1) + }) + + it('getAddrs from listening on 0.0.0.0 and port 0', async () => { + const mh = multiaddr('/ip4/0.0.0.0/udp/0/quic-v1') + listener = transport.createListener({ + upgrader + }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length > 0).to.equal(true) + expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1) + }) + + it('getAddrs from listening on ip6 \'::\'', async () => { + const mh = multiaddr('/ip6/::/udp/9090/quic-v1') + listener = transport.createListener({ + upgrader + }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length > 0).to.equal(true) + expect(getNetConfig(multiaddrs[0]).host).to.not.equal('::') + }) + + it('getAddrs preserves IPFS Id', async () => { + const mh = multiaddr('/ip4/127.0.0.1/udp/9090/quic-v1/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + listener = transport.createListener({ + upgrader + }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length).to.equal(1) + expect(multiaddrs[0]).to.deep.equal(mh) + }) +}) + +describe('dial', () => { + let transport: Transport + let upgrader: Upgrader + let peerId: PeerId + + beforeEach(async () => { + const privateKey = await generateKeyPair('Ed25519') + peerId = peerIdFromPrivateKey(privateKey) + + upgrader = stubInterface({ + upgradeInbound: Sinon.stub().resolves(), + upgradeOutbound: async (maConn) => { + return stubInterface({ + remoteAddr: maConn.remoteAddr + }) + } + }) + + transport = quic()({ + privateKey, + logger: defaultLogger() + }) + }) + + it('dial IPv4', async () => { + const ma = multiaddr('/ip4/127.0.0.1/udp/9090/quic-v1') + const listener = transport.createListener({ + upgrader + }) + await listener.listen(ma) + + await expect(transport.dial(ma, { + signal: AbortSignal.timeout(5_000), + upgrader + })).to.eventually.be.ok() + + await listener.close() + }) + + it.skip('dial IPv6', async () => { + if (isCI != null) { + return + } + + const ma = multiaddr('/ip6/::/udp/9090/quic-v1') + const listener = transport.createListener({ + upgrader + }) + await listener.listen(ma) + + await expect(transport.dial(ma, { + signal: AbortSignal.timeout(5_000), + upgrader + })).to.eventually.be.ok() + + await listener.close() + }) + + it('dials IPv4 with IPFS Id', async () => { + const ma = multiaddr(`/ip4/127.0.0.1/udp/9090/quic-v1/ipfs/${peerId}`) + const listener = transport.createListener({ + upgrader + }) + await listener.listen(ma) + + await expect(transport.dial(ma, { + signal: AbortSignal.timeout(5_000), + upgrader + })).to.eventually.be.ok() + + await listener.close() + }) + + it('should close before connection upgrade is completed', async () => { + // create a Promise that resolves when the upgrade starts + const upgradeStarted = pDefer() + + // create a listener with the handler + const listener = transport.createListener({ + upgrader: stubInterface({ + async upgradeInbound () { + upgradeStarted.resolve() + + return new Promise(() => {}) + }, + async upgradeOutbound () { + return new Promise(() => {}) + } + }) + }) + + // listen on a multiaddr + await listener.listen(multiaddr('/ip4/127.0.0.1/udp/0/quic-v1')) + + const localAddrs = listener.getAddrs() + expect(localAddrs.length).to.equal(1) + + // dial the listener address + transport.dial(localAddrs[0], { + signal: AbortSignal.timeout(5_000), + upgrader + }).catch(() => {}) + + // wait for the upgrade to start + await upgradeStarted.promise + + // close the listener, process should exit normally + await listener.close() + }) + + it('should abort inbound upgrade on close', async () => { + // create a Promise that resolves when the upgrade starts + const upgradeStarted = pDefer() + const abortedUpgrade = pDefer() + + // create a listener with the handler + const listener = transport.createListener({ + upgrader: stubInterface({ + async upgradeInbound (maConn, opts) { + upgradeStarted.resolve() + + opts?.signal?.addEventListener('abort', () => { + abortedUpgrade.resolve() + }, { + once: true + }) + + return new Promise(() => {}) + }, + async upgradeOutbound () { + return new Promise(() => {}) + } + }) + }) + + // listen on a multiaddr + await listener.listen(multiaddr('/ip4/127.0.0.1/udp/0/quic-v1')) + + const localAddrs = listener.getAddrs() + expect(localAddrs.length).to.equal(1) + + // dial the listener address + transport.dial(localAddrs[0], { + signal: AbortSignal.timeout(5_000), + upgrader + }).catch(() => {}) + + // wait for the upgrade to start + await upgradeStarted.promise + + // close the listener + await listener.close() + + // should abort the upgrade + await abortedUpgrade.promise + }) +}) diff --git a/packages/transport-quic/tsconfig.json b/packages/transport-quic/tsconfig.json new file mode 100644 index 0000000000..f120099a5c --- /dev/null +++ b/packages/transport-quic/tsconfig.json @@ -0,0 +1,30 @@ +{ + "extends": "aegir/src/config/tsconfig.aegir.json", + "compilerOptions": { + "outDir": "dist" + }, + "include": [ + "src", + "test" + ], + "references": [ + { + "path": "../crypto" + }, + { + "path": "../interface" + }, + { + "path": "../logger" + }, + { + "path": "../peer-id" + }, + { + "path": "../connection-encrypter-tls" + }, + { + "path": "../utils" + } + ] +} diff --git a/packages/transport-quic/typedoc.json b/packages/transport-quic/typedoc.json new file mode 100644 index 0000000000..db0b0747ef --- /dev/null +++ b/packages/transport-quic/typedoc.json @@ -0,0 +1,6 @@ +{ + "readme": "none", + "entryPoints": [ + "./src/index.ts" + ] +}