diff --git a/pkg/resource/stream.go b/pkg/resource/stream.go index 8186d53..7b97b67 100644 --- a/pkg/resource/stream.go +++ b/pkg/resource/stream.go @@ -62,7 +62,13 @@ func FromStream(ctx context.Context, path string, r io.Reader) (<-chan Resource, break SCAN default: } - res := Resource{Path: path, Bytes: scanner.Bytes()} + // scanner.Bytes() aliases the scanner's internal buffer, which is + // overwritten when the scanner refills for later documents. Resources are + // sent on the channel below and retained/parsed by the consumer (e.g. + // ValidateWithContext) concurrently with this producer goroutine, so + // without a copy the consumer reads corrupted bytes once the buffer is + // reused. Clone so each Resource owns its bytes. + res := Resource{Path: path, Bytes: bytes.Clone(scanner.Bytes())} for _, subres := range res.Resources() { resources <- subres } diff --git a/pkg/resource/stream_test.go b/pkg/resource/stream_test.go index 6c78eb5..545a0fa 100644 --- a/pkg/resource/stream_test.go +++ b/pkg/resource/stream_test.go @@ -3,6 +3,7 @@ package resource_test import ( "bytes" "context" + "fmt" "io" "reflect" "strings" @@ -173,3 +174,61 @@ kind: Deployment wg.Wait() } } + +// TestFromStreamResourceBytesNotAliased is a regression test for a data race in +// FromStream: each Resource was created with Bytes set to scanner.Bytes(), which +// aliases the bufio.Scanner's internal buffer. Resources are emitted on a channel +// and consumed concurrently with the producer goroutine; the scanner refills that +// buffer for later documents, overwriting the slices earlier Resources still point +// at. Callers that retain or parse resources as they arrive (e.g. +// ValidateWithContext) then read corrupted bytes — a different document spliced +// into the one being parsed — failing a different resource on every run. +// +// The stream is deliberately larger than the scanner's 4MB initial buffer so it +// must shift/refill mid-stream (the condition that clobbers earlier tokens). Each +// document carries a unique marker; after draining every resource, all markers +// must still be intact. Run with -race to also catch the underlying data race. +func TestFromStreamResourceBytesNotAliased(t *testing.T) { + const ( + docs = 400 + fillerSize = 16 * 1024 // docs*fillerSize (~6.4MB) exceeds the 4MB scanner buffer + ) + + var stream strings.Builder + for i := 0; i < docs; i++ { + fmt.Fprintf( + &stream, + "---\napiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm-%d\ndata:\n filler: %q\n", + i, strings.Repeat("x", fillerSize), + ) + } + + resChan, errChan := resource.FromStream(context.Background(), "stream", strings.NewReader(stream.String())) + + go func() { + for range errChan { //nolint:revive // drain the error channel + } + }() + + // Drain every resource; the scanner keeps refilling its buffer as we do, + // clobbering any earlier Resource.Bytes that still alias it. + var all []resource.Resource + for res := range resChan { + all = append(all, res) + } + + if len(all) != docs { + t.Fatalf("expected %d resources, got %d", docs, len(all)) + } + + for i, res := range all { + marker := fmt.Sprintf("name: cm-%d\n", i) + if !bytes.Contains(res.Bytes, []byte(marker)) { + t.Fatalf( + "resource %d: bytes were overwritten (Resource.Bytes aliased the scanner buffer, "+ + "clobbered when the scanner refilled); expected them to contain %q, got first 120 bytes:\n%s", + i, marker, string(res.Bytes[:min(120, len(res.Bytes))]), + ) + } + } +}