From fc65e34e4a22c9d17905664cfcd6f095698ed5e7 Mon Sep 17 00:00:00 2001 From: zhouyusd <1848611319@qq.com> Date: Wed, 15 Apr 2026 03:13:01 +0800 Subject: [PATCH 1/3] fix: improve SSE stream response handling and add fasthttp support (#888) --- sse/sse.go | 113 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 65 insertions(+), 48 deletions(-) diff --git a/sse/sse.go b/sse/sse.go index 39b98646..5552be66 100644 --- a/sse/sse.go +++ b/sse/sse.go @@ -2,9 +2,11 @@ package sse import ( + "bufio" "context" "encoding/json" "fmt" + "io" "net/http" "os" "reflect" @@ -14,6 +16,7 @@ import ( "time" "github.com/danielgtaylor/huma/v2" + "github.com/valyala/fasthttp" ) // WriteTimeout is the timeout for writing to the client. @@ -54,6 +57,12 @@ func (s Sender) Data(data any) error { return s(Message{Data: data}) } +type flusherFunc func() error + +func (f flusherFunc) Flush() { + f() +} + // Register a new SSE operation. The `eventTypeMap` maps from event name to // the type of the data that will be sent. The `f` function is called with // the context, input, and a `send` function that can be used to send messages @@ -133,6 +142,61 @@ func Register[I any](api huma.API, op huma.Operation, eventTypeMap map[string]an Body: func(ctx huma.Context) { ctx.SetHeader("Content-Type", "text/event-stream") bw := ctx.BodyWriter() + send := func(deadliner writeDeadliner, flusher http.Flusher, encoder *json.Encoder, writer io.Writer) Sender { + return func(msg Message) error { + if deadliner != nil { + if err := deadliner.SetWriteDeadline(time.Now().Add(WriteTimeout)); err != nil { + fmt.Fprintf(os.Stderr, "warning: unable to set write deadline: %v\n", err) + } + } else { + fmt.Fprintln(os.Stderr, "write deadline not supported by underlying writer") + } + + // Write optional fields + if msg.ID > 0 { + writer.Write(fmt.Appendf(nil, "id: %d\n", msg.ID)) + } + if msg.Retry > 0 { + writer.Write(fmt.Appendf(nil, "retry: %d\n", msg.Retry)) + } + + event, ok := typeToEvent[deref(reflect.TypeOf(msg.Data))] + if !ok { + fmt.Fprintf(os.Stderr, "error: unknown event type %v\n", reflect.TypeOf(msg.Data)) + debug.PrintStack() + } + if event != "" && event != "message" { + // `message` is the default, so no need to transmit it. + writer.Write([]byte("event: " + event + "\n")) + } + + // Write the message data. + if _, err := writer.Write([]byte("data: ")); err != nil { + return err + } + if err := encoder.Encode(msg.Data); err != nil { + writer.Write([]byte(`{"error": "encode error: `)) + writer.Write([]byte(err.Error())) + writer.Write([]byte("\"}\n\n")) + return err + } + writer.Write([]byte("\n")) + if flusher != nil { + flusher.Flush() + } else { + fmt.Fprintln(os.Stderr, "error: unable to flush") + return fmt.Errorf("unable to flush: %w", http.ErrNotSupported) + } + return nil + } + } + if fastCtx, ok := bw.(*fasthttp.RequestCtx); ok { + fastCtx.SetBodyStreamWriter(func(bfw *bufio.Writer) { + encoder := json.NewEncoder(bfw) + f(ctx.Context(), input, send(fastCtx.Conn(), flusherFunc(bfw.Flush), encoder, bfw)) + }) + return + } encoder := json.NewEncoder(bw) // Get the flusher/deadliner from the response writer if possible. @@ -164,55 +228,8 @@ func Register[I any](api huma.API, op huma.Operation, eventTypeMap map[string]an } } - send := func(msg Message) error { - if deadliner != nil { - if err := deadliner.SetWriteDeadline(time.Now().Add(WriteTimeout)); err != nil { - fmt.Fprintf(os.Stderr, "warning: unable to set write deadline: %v\n", err) - } - } else { - fmt.Fprintln(os.Stderr, "write deadline not supported by underlying writer") - } - - // Write optional fields - if msg.ID > 0 { - bw.Write(fmt.Appendf(nil, "id: %d\n", msg.ID)) - } - if msg.Retry > 0 { - bw.Write(fmt.Appendf(nil, "retry: %d\n", msg.Retry)) - } - - event, ok := typeToEvent[deref(reflect.TypeOf(msg.Data))] - if !ok { - fmt.Fprintf(os.Stderr, "error: unknown event type %v\n", reflect.TypeOf(msg.Data)) - debug.PrintStack() - } - if event != "" && event != "message" { - // `message` is the default, so no need to transmit it. - bw.Write([]byte("event: " + event + "\n")) - } - - // Write the message data. - if _, err := bw.Write([]byte("data: ")); err != nil { - return err - } - if err := encoder.Encode(msg.Data); err != nil { - bw.Write([]byte(`{"error": "encode error: `)) - bw.Write([]byte(err.Error())) - bw.Write([]byte("\"}\n\n")) - return err - } - bw.Write([]byte("\n")) - if flusher != nil { - flusher.Flush() - } else { - fmt.Fprintln(os.Stderr, "error: unable to flush") - return fmt.Errorf("unable to flush: %w", http.ErrNotSupported) - } - return nil - } - // Call the user-provided SSE handler. - f(ctx.Context(), input, send) + f(ctx.Context(), input, send(deadliner, flusher, encoder, bw)) }, }, nil }) From dfa71053856fbfbc15f9b045ffbc0e04268f3f21 Mon Sep 17 00:00:00 2001 From: zhouyusd <1848611319@qq.com> Date: Wed, 15 Apr 2026 03:24:40 +0800 Subject: [PATCH 2/3] fix: handle flush error --- sse/sse.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sse/sse.go b/sse/sse.go index 5552be66..a61537ba 100644 --- a/sse/sse.go +++ b/sse/sse.go @@ -60,7 +60,9 @@ func (s Sender) Data(data any) error { type flusherFunc func() error func (f flusherFunc) Flush() { - f() + if err := f(); err != nil { + fmt.Fprintf(os.Stderr, "warning: flush failed: %v\n", err) + } } // Register a new SSE operation. The `eventTypeMap` maps from event name to From 70aa0abc62848088581736531591f7344d6c0cc1 Mon Sep 17 00:00:00 2001 From: zhouyusd <1848611319@qq.com> Date: Wed, 15 Apr 2026 03:36:46 +0800 Subject: [PATCH 3/3] chore: simplify encoder creation --- sse/sse.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sse/sse.go b/sse/sse.go index a61537ba..697ebc7f 100644 --- a/sse/sse.go +++ b/sse/sse.go @@ -144,7 +144,8 @@ func Register[I any](api huma.API, op huma.Operation, eventTypeMap map[string]an Body: func(ctx huma.Context) { ctx.SetHeader("Content-Type", "text/event-stream") bw := ctx.BodyWriter() - send := func(deadliner writeDeadliner, flusher http.Flusher, encoder *json.Encoder, writer io.Writer) Sender { + send := func(deadliner writeDeadliner, flusher http.Flusher, writer io.Writer) Sender { + encoder := json.NewEncoder(writer) return func(msg Message) error { if deadliner != nil { if err := deadliner.SetWriteDeadline(time.Now().Add(WriteTimeout)); err != nil { @@ -194,13 +195,10 @@ func Register[I any](api huma.API, op huma.Operation, eventTypeMap map[string]an } if fastCtx, ok := bw.(*fasthttp.RequestCtx); ok { fastCtx.SetBodyStreamWriter(func(bfw *bufio.Writer) { - encoder := json.NewEncoder(bfw) - f(ctx.Context(), input, send(fastCtx.Conn(), flusherFunc(bfw.Flush), encoder, bfw)) + f(ctx.Context(), input, send(fastCtx.Conn(), flusherFunc(bfw.Flush), bfw)) }) return } - encoder := json.NewEncoder(bw) - // Get the flusher/deadliner from the response writer if possible. var flusher http.Flusher flushCheck := bw @@ -231,7 +229,7 @@ func Register[I any](api huma.API, op huma.Operation, eventTypeMap map[string]an } // Call the user-provided SSE handler. - f(ctx.Context(), input, send(deadliner, flusher, encoder, bw)) + f(ctx.Context(), input, send(deadliner, flusher, bw)) }, }, nil })