This commit is contained in:
Yann Hamon 2020-11-01 13:00:02 +01:00
parent 939b44e3ca
commit 1bc9283240
15 changed files with 241 additions and 458 deletions

View file

@ -1,17 +1,13 @@
package main
import (
"bytes"
"fmt"
"github.com/xeipuuv/gojsonschema"
"github.com/yannh/kubeconform/pkg/config"
"io"
"io/ioutil"
"os"
"strings"
"sync"
"github.com/yannh/kubeconform/pkg/cache"
"github.com/yannh/kubeconform/pkg/config"
"github.com/yannh/kubeconform/pkg/fsutils"
"github.com/yannh/kubeconform/pkg/output"
"github.com/yannh/kubeconform/pkg/registry"
@ -19,23 +15,6 @@ import (
"github.com/yannh/kubeconform/pkg/validator"
)
type validationResult struct {
filename, kind, version, Name string
err error
skipped bool
}
func resourcesFromReader(r io.Reader) ([][]byte, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return [][]byte{}, err
}
resources := bytes.Split(data, []byte("---\n"))
return resources, nil
}
func downloadSchema(registries []registry.Registry, kind, version, k8sVersion string) (*gojsonschema.Schema, error) {
var err error
var schemaBytes []byte
@ -57,99 +36,82 @@ func downloadSchema(registries []registry.Registry, kind, version, k8sVersion st
return nil, nil // No schema found - we don't consider it an error, resource will be skipped
}
// filter returns true if the file should be skipped
// Returning an array, this Reader might container multiple resources
func ValidateStream(r io.Reader, regs []registry.Registry, k8sVersion string, c *cache.SchemaCache, skip func(signature resource.Signature) bool, ignoreMissingSchemas bool) []validationResult {
rawResources, err := resourcesFromReader(r)
if err != nil {
return []validationResult{{err: fmt.Errorf("failed reading file: %s", err)}}
}
validationResults := []validationResult{}
if len(rawResources) == 0 {
// In case a file has no resources, we want to capture that the file was parsed - and therefore send a message with an empty resource and no error
validationResults = append(validationResults, validationResult{kind: "", version: "", Name: "", err: nil, skipped: false})
}
for _, rawResource := range rawResources {
var sig resource.Signature
if sig, err = resource.SignatureFromBytes(rawResource); err != nil {
validationResults = append(validationResults, validationResult{kind: sig.Kind, version: sig.Version, Name: sig.Name, err: fmt.Errorf("error while parsing: %s", err)})
continue
}
if sig.Kind == "" {
validationResults = append(validationResults, validationResult{kind: "", version: "", Name: "", err: nil, skipped: false})
continue // We skip resoures that don't have a Kind defined
}
if skip(sig) {
validationResults = append(validationResults, validationResult{kind: sig.Kind, version: sig.Version, Name: sig.Name, err: nil, skipped: true})
continue
}
ok := false
var schema *gojsonschema.Schema
cacheKey := ""
if c != nil {
cacheKey = cache.Key(sig.Kind, sig.Version, k8sVersion)
schema, ok = c.Get(cacheKey)
}
if !ok {
schema, err = downloadSchema(regs, sig.Kind, sig.Version, k8sVersion)
func ValidateResources(resources <-chan []resource.Resource, validationResults chan<- validator.Result, regs []registry.Registry, k8sVersion string, c *cache.SchemaCache, skip func(signature resource.Signature) bool, ignoreMissingSchemas bool) {
for resBatch := range resources {
for _, res := range resBatch {
sig, err := res.Signature()
if err != nil {
validationResults = append(validationResults, validationResult{kind: sig.Kind, version: sig.Version, Name: sig.Name, err: err, skipped: false})
validationResults <- validator.Result{Resource: res, Err: fmt.Errorf("error while parsing: %s", err), Status: validator.Error}
continue
}
if sig.Kind == "" {
validationResults <- validator.Result{Resource: res, Err: nil, Status: validator.Empty}
continue // We skip resoures that don't have a Kind defined
}
if skip(*sig) {
validationResults <- validator.Result{Resource: res, Err: nil, Status: validator.Skipped}
continue
}
ok := false
var schema *gojsonschema.Schema
cacheKey := ""
if c != nil {
c.Set(cacheKey, schema)
cacheKey = cache.Key(sig.Kind, sig.Version, k8sVersion)
schema, ok = c.Get(cacheKey)
}
}
if schema == nil {
if ignoreMissingSchemas {
validationResults = append(validationResults, validationResult{kind: sig.Kind, version: sig.Version, Name: sig.Name, err: nil, skipped: true})
} else {
validationResults = append(validationResults, validationResult{kind: sig.Kind, version: sig.Version, Name: sig.Name, err: fmt.Errorf("could not find schema for %s", sig.Kind), skipped: false})
if !ok {
schema, err = downloadSchema(regs, sig.Kind, sig.Version, k8sVersion)
if err != nil {
validationResults <- validator.Result{Resource: res, Err: err, Status: validator.Error}
continue
}
if c != nil {
c.Set(cacheKey, schema)
}
}
}
err = validator.Validate(rawResource, schema)
validationResults = append(validationResults, validationResult{kind: sig.Kind, version: sig.Version, Name: sig.Name, err: err})
if schema == nil {
if ignoreMissingSchemas {
validationResults <- validator.Result{Resource: res, Err: nil, Status: validator.Skipped}
} else {
validationResults <- validator.Result{Resource: res, Err: fmt.Errorf("could not find schema for %s", sig.Kind), Status: validator.Error}
}
}
validationResults <- validator.Validate(res, schema)
}
}
return validationResults
}
func processResults(o output.Output, validationResults chan []validationResult, result chan<- bool) {
func processResults(o output.Output, validationResults chan validator.Result, result chan<- bool) {
success := true
for results := range validationResults {
for _, result := range results {
if result.err != nil {
success = false
}
if err := o.Write(result.filename, result.kind, result.Name, result.version, result.err, result.skipped); err != nil {
fmt.Fprint(os.Stderr, "failed writing log\n")
}
for res := range validationResults {
if res.Err != nil {
success = false
}
if err := o.Write(res); err != nil {
fmt.Fprint(os.Stderr, "failed writing log\n")
}
}
result <- success
}
func getFiles(files []string, fileBatches chan []string, validationResults chan []validationResult) {
func getFiles(files []string, filesChan chan<- string, validationResults chan validator.Result) {
for _, filename := range files {
file, err := os.Open(filename)
if err != nil {
validationResults <- []validationResult{{
filename: filename,
err: err,
skipped: false,
}}
validationResults <- validator.Result{
Resource: resource.Resource{Path: filename},
Err: err,
Status: validator.Error,
}
continue
}
defer file.Close()
@ -157,23 +119,23 @@ func getFiles(files []string, fileBatches chan []string, validationResults chan
fi, err := file.Stat()
switch {
case err != nil:
validationResults <- []validationResult{{
filename: filename,
err: err,
skipped: false,
}}
validationResults <- validator.Result{
Resource: resource.Resource{Path: filename},
Err: err,
Status: validator.Error,
}
case fi.IsDir():
if err := fsutils.FindYamlInDir(filename, fileBatches, 10); err != nil {
validationResults <- []validationResult{{
filename: filename,
err: err,
skipped: false,
}}
if err := fsutils.FindYamlInDir(filename, filesChan); err != nil {
validationResults <- validator.Result{
Resource: resource.Resource{Path: filename},
Err: err,
Status: validator.Error,
}
}
default:
fileBatches <- []string{filename}
filesChan <- filename
}
}
}
@ -189,7 +151,6 @@ func realMain() int {
// Detect whether we have data being piped through stdin
stat, _ := os.Stdin.Stat()
isStdin := (stat.Mode() & os.ModeCharDevice) == 0
if len(cfg.Files) == 1 && cfg.Files[0] == "-" {
isStdin = true
}
@ -201,75 +162,76 @@ func realMain() int {
registries := []registry.Registry{}
for _, schemaLocation := range cfg.SchemaLocations {
if !strings.HasSuffix(schemaLocation, "json") { // If we dont specify a full templated path, we assume the paths of kubernetesjsonschema.dev
schemaLocation += "/{{ .NormalizedVersion }}-standalone{{ .StrictSuffix }}/{{ .ResourceKind }}{{ .KindSuffix }}.json"
}
if strings.HasPrefix(schemaLocation, "http") {
registries = append(registries, registry.NewHTTPRegistry(schemaLocation, cfg.Strict))
} else {
registries = append(registries, registry.NewLocalRegistry(schemaLocation, cfg.Strict))
}
registries = append(registries, registry.New(schemaLocation, cfg.Strict))
}
validationResults := make(chan []validationResult)
c := cache.New()
fileBatches := make(chan []string)
go func() {
getFiles(cfg.Files, fileBatches, validationResults)
close(fileBatches)
}()
var o output.Output
if o, err = output.New(cfg.OutputFormat, cfg.Summary, isStdin, cfg.Verbose); err != nil {
fmt.Fprintln(os.Stderr, err)
return 1
}
validationResults := make(chan validator.Result)
res := make(chan bool)
go processResults(o, validationResults, res)
if isStdin {
res := ValidateStream(os.Stdin, registries, cfg.KubernetesVersion, c, filter, cfg.IgnoreMissingSchemas)
for i := range res {
res[i].filename = "stdin"
}
validationResults <- res
} else {
var wg sync.WaitGroup
for i := 0; i < cfg.NumberOfWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
files := make(chan string)
go func() {
getFiles(cfg.Files, files, validationResults)
close(files)
}()
for fileBatch := range fileBatches {
for _, filename := range fileBatch {
f, err := os.Open(filename)
if err != nil {
validationResults <- []validationResult{{
filename: filename,
err: err,
skipped: true,
}}
continue
}
res := ValidateStream(f, registries, cfg.KubernetesVersion, c, filter, cfg.IgnoreMissingSchemas)
f.Close()
for i := range res {
res[i].filename = filename
}
validationResults <- res
}
}
}()
}
wg.Wait()
resourcesChan := make(chan []resource.Resource)
c := cache.New()
wg := sync.WaitGroup{}
for i := 0; i < cfg.NumberOfWorkers; i++ {
wg.Add(1)
go func() {
ValidateResources(resourcesChan, validationResults, registries, cfg.KubernetesVersion, c, filter, cfg.IgnoreMissingSchemas)
wg.Done()
}()
}
if isStdin {
resources, err := resource.FromStream("stdin", os.Stdin)
if err != nil {
validationResults <- validator.Result{
Resource: resource.Resource{Path: "stdin"},
Err: err,
Status: validator.Error,
}
} else {
resourcesChan <- resources
}
} else {
for filename := range files {
f, err := os.Open(filename)
if err != nil {
validationResults <- validator.Result{
Resource: resource.Resource{Path: filename},
Err: err,
Status: validator.Error,
}
continue
}
resources, err := resource.FromStream(filename, f)
if err != nil {
validationResults <- validator.Result{
Resource: resource.Resource{Path: filename},
Err: err,
Status: validator.Error,
}
continue
}
resourcesChan <- resources
f.Close()
}
}
close(resourcesChan)
wg.Wait()
close(validationResults)
success := <-res
o.Flush()