run across multiple workers

Yann Hamon 2020-05-30 20:17:04 +02:00
parent 928d694e66
commit 87f1a0531e
2 changed files with 48 additions and 26 deletions


@@ -2,6 +2,10 @@
A Kubernetes manifests validation tool, inspired by & similar to [Kubeval](https://github.com/instrumenta/kubeval)
Notable features:
* high performance: will validate & download manifests over multiple routines
* support for Kubernetes CRDs
```
$ ./bin/kubeconform -h
Usage of ./bin/kubeconform:
@@ -13,8 +17,12 @@ Usage of ./bin/kubeconform:
version of Kubernetes to test against (default "1.18.0")
-output string
output format - text, json (default "text")
-printsummary
print a summary at the end
-schema value
file containing an additional Schema (can be specified multiple times)
-skipKinds string
comma-separated list of kinds to ignore
-workers int
number of routines to run in parallel (default 4)
```
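
For example, the new `-workers` and `-printsummary` flags can be combined with the file and directory flags registered in main.go below; a hypothetical invocation (the manifest path is a placeholder) might look like:
```
$ ./bin/kubeconform -workers 8 -printsummary -dir my-manifests/
```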

main.go

@@ -9,6 +9,7 @@ import (
"log"
"os"
"strings"
"sync"
"github.com/yannh/kubeconform/pkg/cache"
"github.com/yannh/kubeconform/pkg/fsutils"
@@ -75,10 +76,13 @@ func realMain() int {
var files, dirs, schemas arrayFiles
var skipKinds, k8sVersion, outputFormat string
var printSummary bool
var nWorkers int
flag.BoolVar(&printSummary, "printsummary", false, "print a summary at the end")
flag.Var(&files, "file", "file to validate (can be specified multiple times)")
flag.Var(&dirs, "dir", "directory to validate (can be specified multiple times)")
flag.Var(&schemas, "schema", "file containing an additional Schema")
flag.Var(&schemas, "schema", "file containing an additional Schema (can be specified multiple times)")
flag.IntVar(&nWorkers, "workers", 4, "number of routines to run in parallel")
flag.StringVar(&k8sVersion, "k8sversion", "1.18.0", "version of Kubernetes to test against")
flag.StringVar(&skipKinds, "skipKinds", "", "comma-separated list of kinds to ignore")
flag.StringVar(&outputFormat, "output", "text", "output format - text, json")
@@ -104,6 +108,16 @@ func realMain() int {
return false
}
registries := []registry.Registry{}
registries = append(registries, registry.NewKubernetesRegistry(false))
if len(schemas) > 0 {
localRegistry, err := registry.NewLocalSchemas(schemas)
if err != nil {
log.Fatalf("%s", err)
}
registries = append(registries, localRegistry)
}
fileBatches := make(chan []string)
go func() {
@@ -120,33 +134,33 @@
close(fileBatches)
}()
registries := []registry.Registry{}
registries = append(registries, registry.NewKubernetesRegistry(false))
if len(schemas) > 0 {
localRegistry, err := registry.NewLocalSchemas(schemas)
if err != nil {
log.Fatalf("%s", err)
}
registries = append(registries, localRegistry)
var wg sync.WaitGroup
for i := 0; i < nWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for fileBatch := range fileBatches {
for _, filename := range fileBatch {
f, err := os.Open(filename)
if err != nil {
log.Printf("failed opening %s\n", filename)
continue
}
res := validateFile(f, registries, k8sVersion, filter)
f.Close()
for _, resourceValidation := range res {
o.Write(filename, resourceValidation.err, resourceValidation.skipped)
}
}
}
}()
}
for fileBatch := range fileBatches {
for _, filename := range fileBatch {
f, err := os.Open(filename)
if err != nil {
log.Printf("failed opening %s\n", filename)
continue
}
res := validateFile(f, registries, k8sVersion, filter)
f.Close()
for _, resourceValidation := range res {
o.Write(filename, resourceValidation.err, resourceValidation.skipped)
}
}
}
wg.Wait()
o.Flush()
return 0
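
The change above follows the standard Go worker-pool pattern: a producer goroutine pushes batches of filenames onto a channel, `-workers` goroutines consume from it, and a `sync.WaitGroup` blocks until every worker has drained the channel. A minimal, self-contained sketch of that pattern (filenames are placeholders, and `validate` is a stand-in for kubeconform's `validateFile`):
```
package main

import (
	"fmt"
	"sync"
)

// validate is a stand-in for kubeconform's validateFile: it takes one
// filename and returns a result string.
func validate(filename string) string {
	return "validated " + filename
}

func main() {
	const nWorkers = 4

	// Producer: send batches of filenames, then close the channel so the
	// workers' range loops terminate.
	fileBatches := make(chan []string)
	go func() {
		fileBatches <- []string{"deployment.yaml", "service.yaml"}
		fileBatches <- []string{"ingress.yaml"}
		close(fileBatches)
	}()

	// Consumers: nWorkers goroutines drain the channel concurrently.
	var wg sync.WaitGroup
	for i := 0; i < nWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for batch := range fileBatches {
				for _, filename := range batch {
					fmt.Println(validate(filename))
				}
			}
		}()
	}

	// Wait for every worker to finish before flushing output / exiting.
	wg.Wait()
}
```
Closing `fileBatches` in the producer is what lets each worker's `range` loop exit; without it, `wg.Wait()` would block indefinitely.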