From a1c32a1f52ce6c0b4d4381bb931013fdcd31df1b Mon Sep 17 00:00:00 2001 From: rtribotte Date: Tue, 20 Jan 2026 12:02:37 +0100 Subject: [PATCH 1/3] feat: one side buffering --- buffer/buffer.go | 92 ++++++++++++++++++++++++++----------------- buffer/buffer_test.go | 87 ++++++++++++++++++++++++++++++++++++++++ buffer/options.go | 18 +++++++++ 3 files changed, 161 insertions(+), 36 deletions(-) diff --git a/buffer/buffer.go b/buffer/buffer.go index 2b1165c5..0bfea382 100644 --- a/buffer/buffer.go +++ b/buffer/buffer.go @@ -60,9 +60,11 @@ var errHandler utils.ErrorHandler = &SizeErrHandler{} // Buffer is responsible for buffering requests and responses // It buffers large requests and responses to disk,. type Buffer struct { + disableRequest bool maxRequestBodyBytes int64 memRequestBodyBytes int64 + disableResponse bool maxResponseBodyBytes int64 memResponseBodyBytes int64 @@ -109,6 +111,12 @@ func (b *Buffer) Wrap(next http.Handler) error { } func (b *Buffer) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if b.disableRequest && b.disableResponse { + b.next.ServeHTTP(w, req) + + return + } + if b.verbose { dump := utils.DumpHTTPRequest(req) @@ -123,53 +131,65 @@ func (b *Buffer) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - // Read the body while keeping limits in mind. This reader controls the maximum bytes - // to read into memory and disk. This reader returns an error if the total request size exceeds the - // predefined MaxSizeBytes. This can occur if we got chunked request, in this case ContentLength would be set to -1 - // and the reader would be unbounded bufio in the http.Server - body, err := multibuf.New(req.Body, multibuf.MaxBytes(b.maxRequestBodyBytes), multibuf.MemBytes(b.memRequestBodyBytes)) - if err != nil || body == nil { - if req.Context().Err() != nil { - b.log.Error("vulcand/oxy/buffer: error when reading request body, err: %v", req.Context().Err()) - b.errHandler.ServeHTTP(w, req, req.Context().Err()) + var body multibuf.MultiReader + var totalSize int64 + outReq := req + + if !b.disableRequest { + // Read the body while keeping limits in mind. This reader controls the maximum bytes + // to read into memory and disk. This reader returns an error if the total request size exceeds the + // predefined MaxSizeBytes. This can occur if we got chunked request, in this case ContentLength would be set to -1 + // and the reader would be unbounded bufio in the http.Server + var err error + body, err = multibuf.New(req.Body, multibuf.MaxBytes(b.maxRequestBodyBytes), multibuf.MemBytes(b.memRequestBodyBytes)) + if err != nil || body == nil { + if req.Context().Err() != nil { + b.log.Error("vulcand/oxy/buffer: error when reading request body, err: %v", req.Context().Err()) + b.errHandler.ServeHTTP(w, req, req.Context().Err()) + + return + } + + b.log.Error("vulcand/oxy/buffer: error when reading request body, err: %v", err) + b.errHandler.ServeHTTP(w, req, err) return } - b.log.Error("vulcand/oxy/buffer: error when reading request body, err: %v", err) - b.errHandler.ServeHTTP(w, req, err) + // Set request body to buffered reader that can replay the read and execute Seek + // Note that we don't change the original request body as it's handled by the http server + // and we don't want to mess with standard library + defer func() { + if body != nil { + errClose := body.Close() + if errClose != nil { + b.log.Error("vulcand/oxy/buffer: failed to close body, err: %v", errClose) + } + } + }() - return - } + // We need to set ContentLength based on known request size. The incoming request may have been + // set without content length or using chunked TransferEncoding + totalSize, err = body.Size() + if err != nil { + b.log.Error("vulcand/oxy/buffer: failed to get request size, err: %v", err) + b.errHandler.ServeHTTP(w, req, err) - // Set request body to buffered reader that can replay the read and execute Seek - // Note that we don't change the original request body as it's handled by the http server - // and we don't want to mess with standard library - defer func() { - if body != nil { - errClose := body.Close() - if errClose != nil { - b.log.Error("vulcand/oxy/buffer: failed to close body, err: %v", errClose) - } + return } - }() - // We need to set ContentLength based on known request size. The incoming request may have been - // set without content length or using chunked TransferEncoding - totalSize, err := body.Size() - if err != nil { - b.log.Error("vulcand/oxy/buffer: failed to get request size, err: %v", err) - b.errHandler.ServeHTTP(w, req, err) + if totalSize == 0 { + body = nil + } - return + outReq = b.copyRequest(req, body, totalSize) } - if totalSize == 0 { - body = nil + if b.disableResponse { + b.next.ServeHTTP(w, outReq) + return } - outReq := b.copyRequest(req, body, totalSize) - attempt := 1 for { @@ -220,7 +240,7 @@ func (b *Buffer) ServeHTTP(w http.ResponseWriter, req *http.Request) { reader = rdr } - if (b.retryPredicate == nil || attempt > DefaultMaxRetryAttempts) || + if body == nil || (b.retryPredicate == nil || attempt > DefaultMaxRetryAttempts) || !b.retryPredicate(&context{r: req, attempt: attempt, responseCode: bw.code}) { utils.CopyHeaders(w.Header(), bw.Header()) w.WriteHeader(bw.code) @@ -236,7 +256,7 @@ func (b *Buffer) ServeHTTP(w http.ResponseWriter, req *http.Request) { if body != nil { if _, err := body.Seek(0, 0); err != nil { - b.log.Error("vulcand/oxy/buffer: failed to rewind response body, err: %v", err) + b.log.Error("vulcand/oxy/buffer: failed to rewind request body, err: %v", err) b.errHandler.ServeHTTP(w, req, err) return diff --git a/buffer/buffer_test.go b/buffer/buffer_test.go index e172e7e1..9c332b17 100644 --- a/buffer/buffer_test.go +++ b/buffer/buffer_test.go @@ -496,3 +496,90 @@ func TestBuffer_GRPC_OKResponse(t *testing.T) { assert.Equal(t, http.StatusOK, re.StatusCode) assert.Equal(t, "grpc-body", string(body)) } + +func TestBuffer_disableRequestBuffer(t *testing.T) { + var ( + reqBody string + contentLength int64 + actuallyBuffered bool + ) + + srv := testutils.NewHandler(func(w http.ResponseWriter, req *http.Request) { + body, err := io.ReadAll(req.Body) + require.NoError(t, err) + + reqBody = string(body) + contentLength = req.ContentLength + // When buffering is disabled, chunked requests should preserve their transfer encoding, and have no content-length. + actuallyBuffered = contentLength > 0 || len(req.TransferEncoding) == 0 + _, _ = w.Write([]byte("response")) + }) + t.Cleanup(srv.Close) + + fwd := forward.New(false) + rdr := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + req.URL = testutils.MustParseRequestURI(srv.URL) + fwd.ServeHTTP(w, req) + }) + + // buffer with disabled request buffering. + st, err := New(rdr, DisableRequestBuffer()) + require.NoError(t, err) + + proxy := httptest.NewServer(st) + t.Cleanup(proxy.Close) + + // Send a chunked request - when buffering is disabled, it should remain chunked. + conn, err := net.Dial("tcp", testutils.MustParseRequestURI(proxy.URL).Host) + require.NoError(t, err) + defer conn.Close() + + _, _ = fmt.Fprintf(conn, "POST / HTTP/1.1\r\nHost: %s\r\nTransfer-Encoding: chunked\r\n\r\n4\r\ntest\r\n0\r\n\r\n", testutils.MustParseRequestURI(proxy.URL).Host) + status, err := bufio.NewReader(conn).ReadString('\n') + require.NoError(t, err) + + assert.Equal(t, "HTTP/1.1 200 OK\r\n", status) + assert.Equal(t, "test", reqBody) + // When buffering is disabled, chunked encoding should be preserved (not converted to Content-Length). + assert.False(t, actuallyBuffered, "Request should not have been buffered") + assert.Equal(t, int64(-1), contentLength, "Content-Length should be -1 for chunked requests when not buffered") +} + +func TestBuffer_disableResponseBuffer(t *testing.T) { + largeResponseBody := strings.Repeat("A", 1000) + srv := testutils.NewHandler(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte(largeResponseBody)) + }) + t.Cleanup(srv.Close) + + fwd := forward.New(false) + rdr := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + req.URL = testutils.MustParseRequestURI(srv.URL) + fwd.ServeHTTP(w, req) + }) + + // buffer with a small max response size. + st, err := New(rdr, MaxResponseBodyBytes(4)) + require.NoError(t, err) + + proxy := httptest.NewServer(st) + t.Cleanup(proxy.Close) + + re, body, err := testutils.Get(proxy.URL) + require.NoError(t, err) + // Response should not pass through as it exceeds the limit. + assert.Equal(t, http.StatusInternalServerError, re.StatusCode) + + // buffer with disabled response buffering and a small max response size. + st, err = New(rdr, DisableResponseBuffer(), MaxResponseBodyBytes(4)) + require.NoError(t, err) + + proxy2 := httptest.NewServer(st) + t.Cleanup(proxy2.Close) + + re, body, err = testutils.Get(proxy2.URL) + require.NoError(t, err) + // Response should pass through even though it exceeds the limit, because buffering has been disabled. + assert.Equal(t, http.StatusOK, re.StatusCode) + assert.Equal(t, largeResponseBody, string(body)) +} diff --git a/buffer/options.go b/buffer/options.go index 0ba9d83a..1a16747f 100644 --- a/buffer/options.go +++ b/buffer/options.go @@ -69,6 +69,24 @@ func ErrorHandler(h utils.ErrorHandler) Option { } } +// DisableRequestBuffer disables request buffering. +func DisableRequestBuffer() Option { + return func(b *Buffer) error { + b.disableRequest = true + + return nil + } +} + +// DisableResponseBuffer disables response buffering. +func DisableResponseBuffer() Option { + return func(b *Buffer) error { + b.disableResponse = true + + return nil + } +} + // MaxRequestBodyBytes sets the maximum request body size in bytes. func MaxRequestBodyBytes(m int64) Option { return func(b *Buffer) error { From 766c0679f2b73ee4694dec00f72b607c8e798b7a Mon Sep 17 00:00:00 2001 From: rtribotte Date: Tue, 20 Jan 2026 12:16:04 +0100 Subject: [PATCH 2/3] linting --- buffer/buffer.go | 3 +++ buffer/buffer_test.go | 12 +++++++----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/buffer/buffer.go b/buffer/buffer.go index 0bfea382..1c162a09 100644 --- a/buffer/buffer.go +++ b/buffer/buffer.go @@ -132,7 +132,9 @@ func (b *Buffer) ServeHTTP(w http.ResponseWriter, req *http.Request) { } var body multibuf.MultiReader + var totalSize int64 + outReq := req if !b.disableRequest { @@ -141,6 +143,7 @@ func (b *Buffer) ServeHTTP(w http.ResponseWriter, req *http.Request) { // predefined MaxSizeBytes. This can occur if we got chunked request, in this case ContentLength would be set to -1 // and the reader would be unbounded bufio in the http.Server var err error + body, err = multibuf.New(req.Body, multibuf.MaxBytes(b.maxRequestBodyBytes), multibuf.MemBytes(b.memRequestBodyBytes)) if err != nil || body == nil { if req.Context().Err() != nil { diff --git a/buffer/buffer_test.go b/buffer/buffer_test.go index 9c332b17..2e0d1569 100644 --- a/buffer/buffer_test.go +++ b/buffer/buffer_test.go @@ -532,7 +532,9 @@ func TestBuffer_disableRequestBuffer(t *testing.T) { // Send a chunked request - when buffering is disabled, it should remain chunked. conn, err := net.Dial("tcp", testutils.MustParseRequestURI(proxy.URL).Host) require.NoError(t, err) - defer conn.Close() + t.Cleanup(func() { + _ = conn.Close() + }) _, _ = fmt.Fprintf(conn, "POST / HTTP/1.1\r\nHost: %s\r\nTransfer-Encoding: chunked\r\n\r\n4\r\ntest\r\n0\r\n\r\n", testutils.MustParseRequestURI(proxy.URL).Host) status, err := bufio.NewReader(conn).ReadString('\n') @@ -565,10 +567,10 @@ func TestBuffer_disableResponseBuffer(t *testing.T) { proxy := httptest.NewServer(st) t.Cleanup(proxy.Close) - re, body, err := testutils.Get(proxy.URL) + resp, _, err := testutils.Get(proxy.URL) require.NoError(t, err) // Response should not pass through as it exceeds the limit. - assert.Equal(t, http.StatusInternalServerError, re.StatusCode) + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) // buffer with disabled response buffering and a small max response size. st, err = New(rdr, DisableResponseBuffer(), MaxResponseBodyBytes(4)) @@ -577,9 +579,9 @@ func TestBuffer_disableResponseBuffer(t *testing.T) { proxy2 := httptest.NewServer(st) t.Cleanup(proxy2.Close) - re, body, err = testutils.Get(proxy2.URL) + resp2, body, err := testutils.Get(proxy2.URL) require.NoError(t, err) // Response should pass through even though it exceeds the limit, because buffering has been disabled. - assert.Equal(t, http.StatusOK, re.StatusCode) + assert.Equal(t, http.StatusOK, resp2.StatusCode) assert.Equal(t, largeResponseBody, string(body)) } From 71080ec7beffcba5ad4f0b1d3329e7ae5ec2023e Mon Sep 17 00:00:00 2001 From: rtribotte Date: Tue, 20 Jan 2026 14:46:56 +0100 Subject: [PATCH 3/3] review --- buffer/buffer.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/buffer/buffer.go b/buffer/buffer.go index 1c162a09..30ff1f03 100644 --- a/buffer/buffer.go +++ b/buffer/buffer.go @@ -124,13 +124,6 @@ func (b *Buffer) ServeHTTP(w http.ResponseWriter, req *http.Request) { defer b.log.Debug("vulcand/oxy/buffer: completed ServeHttp on request: %s", dump) } - if err := b.checkLimit(req); err != nil { - b.log.Error("vulcand/oxy/buffer: request body over limit, err: %v", err) - b.errHandler.ServeHTTP(w, req, err) - - return - } - var body multibuf.MultiReader var totalSize int64 @@ -138,6 +131,13 @@ func (b *Buffer) ServeHTTP(w http.ResponseWriter, req *http.Request) { outReq := req if !b.disableRequest { + if err := b.checkLimit(req); err != nil { + b.log.Error("vulcand/oxy/buffer: request body over limit, err: %v", err) + b.errHandler.ServeHTTP(w, req, err) + + return + } + // Read the body while keeping limits in mind. This reader controls the maximum bytes // to read into memory and disk. This reader returns an error if the total request size exceeds the // predefined MaxSizeBytes. This can occur if we got chunked request, in this case ContentLength would be set to -1