13
0
Fork 0
mirror of https://github.com/yannh/kubeconform.git synced 2026-07-02 17:39:30 +00:00
kubeconform/pkg/resource/stream.go
Nikolai Emil Damm d3ae574bf2
fix: copy scanner bytes in FromStream to prevent aliasing data race
FromStream set each Resource's Bytes to scanner.Bytes(), which aliases
the bufio.Scanner's internal buffer. Resources are emitted on a channel
and retained/parsed by the consumer concurrently with the producer
goroutine; when the scanner refills its buffer for later documents it
overwrites the slices earlier Resources still point at. Consumers that
parse resources as they arrive (e.g. ValidateWithContext) then read
corrupted bytes — a later document spliced into the one being parsed —
surfacing as non-deterministic YAML parse errors on large multi-document
streams (a different resource fails on each run).

Clone the scanner bytes so each Resource owns its slice. Adds a
regression test that streams >4MB (forcing the scanner to refill) and
asserts every resource's bytes are intact after draining.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 02:50:27 +02:00

82 lines
2.3 KiB
Go

package resource
import (
"bufio"
"bytes"
"context"
"io"
)
// SplitYAMLDocument is a bufio.SplitFunc that cuts a multi-document YAML
// stream into its individual documents.
//
// A document ends at the first "\n---" found in data; everything after that
// marker up to and including the next newline is consumed as part of the
// separator line and is not included in the returned token. When no further
// separator exists and atEOF is set, the remaining data is returned as the
// final document.
//
// The logic mirrors Kubernetes' unexported 'pkg/util/yaml'.splitYAMLDocument.
func SplitYAMLDocument(data []byte, atEOF bool) (advance int, token []byte, err error) {
	separator := []byte("\n---")

	// Nothing left to tokenize.
	if atEOF && len(data) == 0 {
		return 0, nil, nil
	}

	start := bytes.Index(data, separator)
	if start < 0 {
		// No separator in sight: at EOF the remainder is the last document,
		// otherwise ask the scanner for more data.
		if atEOF {
			return len(data), data, nil
		}
		return 0, nil, nil
	}

	// A potential document terminator begins at start.
	end := start + len(separator)
	rest := data[end:]

	if len(rest) == 0 {
		// The buffer stops right after the separator, so the remainder of the
		// separator line cannot be inspected yet.
		if atEOF {
			return len(data), data[:start], nil
		}
		return 0, nil, nil
	}

	// The separator line must be terminated by a newline before the document
	// in front of it is emitted.
	newline := bytes.IndexByte(rest, '\n')
	if newline < 0 {
		return 0, nil, nil
	}
	return end + newline + 1, data[:start], nil
}
// FromStream reads resources from a byte stream, usually here stdin.
//
// The stream r is split into YAML documents with SplitYAMLDocument. Each
// document is wrapped in a Resource carrying path, and every value returned
// by its Resources method is sent on the returned resource channel.
//
// Both channels are closed (resources first, then errors) once the stream is
// exhausted, the scanner fails, or ctx is cancelled. If the scanner stopped
// because of an error — a read failure from r, or bufio.ErrTooLong for a
// document exceeding the maximum buffer size — that error is delivered on the
// error channel before it is closed. At most one error is ever sent.
func FromStream(ctx context.Context, path string, r io.Reader) (<-chan Resource, <-chan error) {
	resources := make(chan Resource)
	// Buffered with capacity 1 so the single possible scanner error can be
	// queued without blocking, even when the caller never drains this channel.
	errors := make(chan error, 1)

	go func() {
		// Deferred calls run last-in first-out: resources is closed before errors.
		defer close(errors)
		defer close(resources)

		const (
			initialBufSize = 4 * 1024 * 1024   // start with a 4MB buffer
			maxBufSize     = 256 * 1024 * 1024 // let the buffer grow up to 256MB
		)

		scanner := bufio.NewScanner(r)
		scanner.Buffer(make([]byte, initialBufSize), maxBufSize)
		scanner.Split(SplitYAMLDocument)

		for scanner.Scan() {
			// Stop reading as soon as the caller has given up.
			if ctx.Err() != nil {
				return
			}

			// scanner.Bytes() aliases the scanner's internal buffer, which is
			// overwritten when the scanner refills for later documents. Resources
			// are handed to a consumer running concurrently with this goroutine,
			// so clone the bytes to let each Resource own its data.
			res := Resource{Path: path, Bytes: bytes.Clone(scanner.Bytes())}
			for _, subres := range res.Resources() {
				// Do not block forever on a consumer that stopped receiving
				// after cancellation; that would leak this goroutine.
				select {
				case resources <- subres:
				case <-ctx.Done():
					return
				}
			}
		}

		// Scan returning false means either a clean EOF or a failure; surface
		// the latter instead of silently truncating the stream.
		if err := scanner.Err(); err != nil {
			errors <- err
		}
	}()

	return resources, errors
}