Refactor resource discovery

This commit is contained in:
Yann Hamon 2020-11-01 20:09:48 +01:00
parent 2391d82281
commit 0a7f885768
4 changed files with 78 additions and 139 deletions

View file

@ -67,7 +67,7 @@
@test "Return relevant error for non-existent file" { @test "Return relevant error for non-existent file" {
run bin/kubeconform fixtures/not-here run bin/kubeconform fixtures/not-here
[ "$status" -eq 1 ] [ "$status" -eq 1 ]
[ "$output" = "fixtures/not-here - failed validation: open fixtures/not-here: no such file or directory" ] [ "$output" = "fixtures/not-here - failed validation: lstat fixtures/not-here: no such file or directory" ]
} }
@test "Pass when parsing a blank config file" { @test "Pass when parsing a blank config file" {

View file

@ -8,7 +8,6 @@ import (
"github.com/yannh/kubeconform/pkg/cache" "github.com/yannh/kubeconform/pkg/cache"
"github.com/yannh/kubeconform/pkg/config" "github.com/yannh/kubeconform/pkg/config"
"github.com/yannh/kubeconform/pkg/fsutils"
"github.com/yannh/kubeconform/pkg/output" "github.com/yannh/kubeconform/pkg/output"
"github.com/yannh/kubeconform/pkg/registry" "github.com/yannh/kubeconform/pkg/registry"
"github.com/yannh/kubeconform/pkg/resource" "github.com/yannh/kubeconform/pkg/resource"
@ -36,60 +35,57 @@ func downloadSchema(registries []registry.Registry, kind, version, k8sVersion st
return nil, nil // No schema found - we don't consider it an error, resource will be skipped return nil, nil // No schema found - we don't consider it an error, resource will be skipped
} }
func ValidateResources(resources <-chan []resource.Resource, validationResults chan<- validator.Result, regs []registry.Registry, k8sVersion string, c *cache.SchemaCache, skip func(signature resource.Signature) bool, ignoreMissingSchemas bool) { func ValidateResources(resources <-chan resource.Resource, validationResults chan<- validator.Result, regs []registry.Registry, k8sVersion string, c *cache.SchemaCache, skip func(signature resource.Signature) bool, ignoreMissingSchemas bool) {
for resBatch := range resources { for res := range resources {
for _, res := range resBatch { sig, err := res.Signature()
sig, err := res.Signature() if err != nil {
if err != nil { validationResults <- validator.Result{Resource: res, Err: fmt.Errorf("error while parsing: %s", err), Status: validator.Error}
validationResults <- validator.Result{Resource: res, Err: fmt.Errorf("error while parsing: %s", err), Status: validator.Error} continue
}
if sig.Kind == "" {
validationResults <- validator.Result{Resource: res, Err: nil, Status: validator.Empty}
continue // We skip resoures that don't have a Kind defined
}
if skip(*sig) {
validationResults <- validator.Result{Resource: res, Err: nil, Status: validator.Skipped}
continue
}
cached := false
var schema *gojsonschema.Schema
cacheKey := ""
if c != nil {
cacheKey = cache.Key(sig.Kind, sig.Version, k8sVersion)
schema, cached = c.Get(cacheKey)
}
if !cached {
if schema, err = downloadSchema(regs, sig.Kind, sig.Version, k8sVersion); err != nil {
validationResults <- validator.Result{Resource: res, Err: err, Status: validator.Error}
continue continue
} }
if sig.Kind == "" {
validationResults <- validator.Result{Resource: res, Err: nil, Status: validator.Empty}
continue // We skip resoures that don't have a Kind defined
}
if skip(*sig) {
validationResults <- validator.Result{Resource: res, Err: nil, Status: validator.Skipped}
continue
}
cached := false
var schema *gojsonschema.Schema
cacheKey := ""
if c != nil { if c != nil {
cacheKey = cache.Key(sig.Kind, sig.Version, k8sVersion) c.Set(cacheKey, schema)
schema, cached = c.Get(cacheKey)
} }
if !cached {
if schema, err = downloadSchema(regs, sig.Kind, sig.Version, k8sVersion); err != nil {
validationResults <- validator.Result{Resource: res, Err: err, Status: validator.Error}
continue
}
if c != nil {
c.Set(cacheKey, schema)
}
}
if schema == nil {
if ignoreMissingSchemas {
validationResults <- validator.Result{Resource: res, Err: nil, Status: validator.Skipped}
} else {
validationResults <- validator.Result{Resource: res, Err: fmt.Errorf("could not find schema for %s", sig.Kind), Status: validator.Error}
}
continue
}
validationResults <- validator.Validate(res, schema)
} }
if schema == nil {
if ignoreMissingSchemas {
validationResults <- validator.Result{Resource: res, Err: nil, Status: validator.Skipped}
} else {
validationResults <- validator.Result{Resource: res, Err: fmt.Errorf("could not find schema for %s", sig.Kind), Status: validator.Error}
}
}
validationResults <- validator.Validate(res, schema)
} }
} }
func processResults(o output.Output, validationResults chan validator.Result, result chan<- bool) { func processResults(o output.Output, validationResults <-chan validator.Result, result chan<- bool) {
success := true success := true
for res := range validationResults { for res := range validationResults {
if res.Err != nil { if res.Err != nil {
@ -103,31 +99,6 @@ func processResults(o output.Output, validationResults chan validator.Result, re
result <- success result <- success
} }
func getFiles(files []string, filesChan chan<- string, validationResults chan validator.Result) {
for _, filename := range files {
file, err := os.Open(filename)
if err != nil {
validationResults <- validator.NewError(filename, err)
continue
}
defer file.Close()
fi, err := file.Stat()
switch {
case err != nil:
validationResults <- validator.NewError(filename, err)
case fi.IsDir():
if err := fsutils.FindYamlInDir(filename, filesChan); err != nil {
validationResults <- validator.NewError(filename, err)
}
default:
filesChan <- filename
}
}
}
func realMain() int { func realMain() int {
var err error var err error
@ -159,17 +130,19 @@ func realMain() int {
return 1 return 1
} }
var resourcesChan <-chan resource.Resource
var errors <-chan error
validationResults := make(chan validator.Result) validationResults := make(chan validator.Result)
res := make(chan bool) res := make(chan bool)
go processResults(o, validationResults, res) go processResults(o, validationResults, res)
files := make(chan string) if isStdin {
go func() { resourcesChan, errors = resource.FromStream("stdin", os.Stdin)
getFiles(cfg.Files, files, validationResults) } else {
close(files) resourcesChan, errors = resource.FromFiles(cfg.Files...)
}() }
resourcesChan := make(chan []resource.Resource)
c := cache.New() c := cache.New()
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for i := 0; i < cfg.NumberOfWorkers; i++ { for i := 0; i < cfg.NumberOfWorkers; i++ {
@ -180,34 +153,20 @@ func realMain() int {
}() }()
} }
if isStdin { wg.Add(1)
resources, err := resource.FromStream("stdin", os.Stdin) go func() {
if err != nil { for err := range errors {
validationResults <- validator.NewError("stdin", err)
} else {
resourcesChan <- resources
}
} else {
for filename := range files {
f, err := os.Open(filename)
if err != nil { if err != nil {
validationResults <- validator.NewError(filename, err) if err, ok := err.(resource.DiscoveryError); ok {
continue validationResults <- validator.NewError(err.Path, err.Err)
}
} }
resources, err := resource.FromStream(filename, f)
if err != nil {
validationResults <- validator.NewError(filename, err)
continue
}
resourcesChan <- resources
f.Close()
} }
} wg.Done()
}()
close(resourcesChan)
wg.Wait() wg.Wait()
close(validationResults) close(validationResults)
success := <-res success := <-res
o.Flush() o.Flush()

View file

@ -1,27 +0,0 @@
package fsutils
import (
"os"
"path/filepath"
"strings"
)
func isYaml(info os.FileInfo) bool {
return !info.IsDir() && (strings.HasSuffix(strings.ToLower(info.Name()), ".yaml") || strings.HasSuffix(strings.ToLower(info.Name()), ".yml"))
}
// FindYamlInDir will find yaml files in folder dir, and send their filenames in batches
// of size batchSize to channel fileBatches
func FindYamlInDir(dir string, files chan<- string) error {
return filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if isYaml(info) {
files <- path
}
return nil
})
}

View file

@ -6,17 +6,24 @@ import (
"io/ioutil" "io/ioutil"
) )
func FromStream(path string, r io.Reader) ([]Resource, error) { func FromStream(path string, r io.Reader) (<-chan Resource, <-chan error) {
data, err := ioutil.ReadAll(r) resources := make(chan Resource)
if err != nil { errors := make(chan error)
return []Resource{}, err
}
resources := []Resource{} go func() {
rawResources := bytes.Split(data, []byte("---\n")) data, err := ioutil.ReadAll(r)
for _, rawResource := range rawResources { if err != nil {
resources = append(resources, Resource{Path: path, Bytes: rawResource}) errors <- DiscoveryError{path, err}
} }
return resources, nil rawResources := bytes.Split(data, []byte("---\n"))
for _, rawResource := range rawResources {
resources <- Resource{Path: path, Bytes: rawResource}
}
close(resources)
close(errors)
}()
return resources, errors
} }