From d3ae574bf21ac98fe2dcf03a164f27c2e71f370e Mon Sep 17 00:00:00 2001 From: Nikolai Emil Damm Date: Thu, 25 Jun 2026 02:50:07 +0200 Subject: [PATCH] fix: copy scanner bytes in FromStream to prevent aliasing data race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FromStream set each Resource's Bytes to scanner.Bytes(), which aliases the bufio.Scanner's internal buffer. Resources are emitted on a channel and retained/parsed by the consumer concurrently with the producer goroutine; when the scanner refills its buffer for later documents it overwrites the slices earlier Resources still point at. Consumers that parse resources as they arrive (e.g. ValidateWithContext) then read corrupted bytes — a later document spliced into the one being parsed — surfacing as non-deterministic YAML parse errors on large multi-document streams (a different resource fails on each run). Clone the scanner bytes so each Resource owns its slice. Adds a regression test that streams >4MB (forcing the scanner to refill) and asserts every resource's bytes are intact after draining. Co-Authored-By: Claude Opus 4.8 --- pkg/resource/stream.go | 8 ++++- pkg/resource/stream_test.go | 59 +++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/pkg/resource/stream.go b/pkg/resource/stream.go index 8186d53..7b97b67 100644 --- a/pkg/resource/stream.go +++ b/pkg/resource/stream.go @@ -62,7 +62,13 @@ func FromStream(ctx context.Context, path string, r io.Reader) (<-chan Resource, break SCAN default: } - res := Resource{Path: path, Bytes: scanner.Bytes()} + // scanner.Bytes() aliases the scanner's internal buffer, which is + // overwritten when the scanner refills for later documents. Resources are + // sent on the channel below and retained/parsed by the consumer (e.g. + // ValidateWithContext) concurrently with this producer goroutine, so + // without a copy the consumer reads corrupted bytes once the buffer is + // reused. Clone so each Resource owns its bytes. + res := Resource{Path: path, Bytes: bytes.Clone(scanner.Bytes())} for _, subres := range res.Resources() { resources <- subres } diff --git a/pkg/resource/stream_test.go b/pkg/resource/stream_test.go index 6c78eb5..545a0fa 100644 --- a/pkg/resource/stream_test.go +++ b/pkg/resource/stream_test.go @@ -3,6 +3,7 @@ package resource_test import ( "bytes" "context" + "fmt" "io" "reflect" "strings" @@ -173,3 +174,61 @@ kind: Deployment wg.Wait() } } + +// TestFromStreamResourceBytesNotAliased is a regression test for a data race in +// FromStream: each Resource was created with Bytes set to scanner.Bytes(), which +// aliases the bufio.Scanner's internal buffer. Resources are emitted on a channel +// and consumed concurrently with the producer goroutine; the scanner refills that +// buffer for later documents, overwriting the slices earlier Resources still point +// at. Callers that retain or parse resources as they arrive (e.g. +// ValidateWithContext) then read corrupted bytes — a different document spliced +// into the one being parsed — failing a different resource on every run. +// +// The stream is deliberately larger than the scanner's 4MB initial buffer so it +// must shift/refill mid-stream (the condition that clobbers earlier tokens). Each +// document carries a unique marker; after draining every resource, all markers +// must still be intact. Run with -race to also catch the underlying data race. +func TestFromStreamResourceBytesNotAliased(t *testing.T) { + const ( + docs = 400 + fillerSize = 16 * 1024 // docs*fillerSize (~6.4MB) exceeds the 4MB scanner buffer + ) + + var stream strings.Builder + for i := 0; i < docs; i++ { + fmt.Fprintf( + &stream, + "---\napiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm-%d\ndata:\n filler: %q\n", + i, strings.Repeat("x", fillerSize), + ) + } + + resChan, errChan := resource.FromStream(context.Background(), "stream", strings.NewReader(stream.String())) + + go func() { + for range errChan { //nolint:revive // drain the error channel + } + }() + + // Drain every resource; the scanner keeps refilling its buffer as we do, + // clobbering any earlier Resource.Bytes that still alias it. + var all []resource.Resource + for res := range resChan { + all = append(all, res) + } + + if len(all) != docs { + t.Fatalf("expected %d resources, got %d", docs, len(all)) + } + + for i, res := range all { + marker := fmt.Sprintf("name: cm-%d\n", i) + if !bytes.Contains(res.Bytes, []byte(marker)) { + t.Fatalf( + "resource %d: bytes were overwritten (Resource.Bytes aliased the scanner buffer, "+ + "clobbered when the scanner refilled); expected them to contain %q, got first 120 bytes:\n%s", + i, marker, string(res.Bytes[:min(120, len(res.Bytes))]), + ) + } + } +}