rework caching - less leak of goschema everywhere

This commit is contained in:
Yann Hamon 2020-05-31 03:12:28 +02:00
parent 8eb297d4c4
commit b918da9c59
5 changed files with 69 additions and 65 deletions

37
main.go
View file

@ -26,7 +26,7 @@ type validationResult struct {
// filter returns true if the file should be skipped
// Returning an array, this Reader might container multiple resources
func validateFile(f io.Reader, regs []registry.Registry, k8sVersion string, skip func(signature resource.Signature) bool) []validationResult {
func validateFile(f io.Reader, regs []registry.Registry, k8sVersion string, c *cache.SchemaCache, skip func(signature resource.Signature) bool) []validationResult {
rawResource, err := ioutil.ReadAll(f)
if err != nil {
return []validationResult{{err: fmt.Errorf("failed reading file: %s", err)}}
@ -42,15 +42,33 @@ func validateFile(f io.Reader, regs []registry.Registry, k8sVersion string, skip
}
var schema *gojsonschema.Schema
for _, reg := range regs {
downloadSchema := cache.WithCache(reg.DownloadSchema)
schema, err = downloadSchema(sig.Kind, sig.Version, k8sVersion)
if err == nil {
break
var schemaBytes []byte
var ok bool
cacheKey := cache.Key(sig.Kind, sig.Version, k8sVersion)
schema, ok = c.Get(cacheKey)
if !ok {
for _, reg := range regs {
schemaBytes, err = reg.DownloadSchema(sig.Kind, sig.Version, k8sVersion)
if err == nil {
schema, err = gojsonschema.NewSchema(gojsonschema.NewBytesLoader(schemaBytes))
if err != nil {
return []validationResult{{err: err, skipped: false}} // skip if no schema found
}
break
}
// If we get a 404, we keep trying, but we exit if we get a real failure
if er, retryable := err.(registry.Retryable); !(retryable && !er.IsRetryable()) {
return []validationResult{{err: fmt.Errorf("error while downloading schema for resource: %s", err)}}
}
}
}
if err != nil {
return []validationResult{{err: fmt.Errorf("error while downloading schema for resource: %s", err)}}
c.Set(cacheKey, schema)
if err != nil { // Not found
return []validationResult{{err: nil, skipped: true}} // skip if no schema found
}
if err = validator.Validate(rawResource, schema); err != nil {
@ -134,6 +152,7 @@ func realMain() int {
close(fileBatches)
}()
c := cache.NewSchemaCache()
var wg sync.WaitGroup
for i := 0; i < nWorkers; i++ {
wg.Add(1)
@ -148,7 +167,7 @@ func realMain() int {
continue
}
res := validateFile(f, registries, k8sVersion, filter)
res := validateFile(f, registries, k8sVersion, c, filter)
f.Close()
for _, resourceValidation := range res {

53
pkg/cache/main.go vendored
View file

@ -3,40 +3,33 @@ package cache
import (
"fmt"
"github.com/xeipuuv/gojsonschema"
"github.com/yannh/kubeconform/pkg/registry"
"sync"
)
var mu sync.Mutex
var schemas map[string]*gojsonschema.Schema
func init() {
schemas = map[string]*gojsonschema.Schema{}
type SchemaCache struct {
sync.RWMutex
schemas map[string]*gojsonschema.Schema
}
func WithCache(downloadSchema func(string, string, string) (*gojsonschema.Schema, error)) func(string, string, string) (*gojsonschema.Schema, error) {
return func(resourceKind, resourceAPIVersion, k8sVersion string) (*gojsonschema.Schema, error) {
cacheKey := fmt.Sprintf("%s-%s-%s", resourceKind, resourceAPIVersion, k8sVersion)
mu.Lock()
cachedSchema, ok := schemas[cacheKey]
mu.Unlock()
if ok {
return cachedSchema, nil
}
schema, err := downloadSchema(resourceKind, resourceAPIVersion, k8sVersion)
if err != nil {
// will try to download the schema later, except if the error implements Retryable
// and returns false on IsRetryable
if er, retryable := err.(registry.Retryable); !(retryable && !er.IsRetryable()) {
return schema, err
}
}
mu.Lock()
schemas[cacheKey] = schema
mu.Unlock()
return schema, err
func NewSchemaCache() *SchemaCache {
return &SchemaCache{
schemas: map[string]*gojsonschema.Schema{},
}
}
func Key(resourceKind, resourceAPIVersion, k8sVersion string) string {
return fmt.Sprintf("%s-%s-%s", resourceKind, resourceAPIVersion, k8sVersion)
}
func (c *SchemaCache) Get(key string) (*gojsonschema.Schema, bool) {
c.RLock()
defer c.RUnlock()
schema, ok := c.schemas[key]
return schema, ok
}
func (c *SchemaCache) Set(key string, schema *gojsonschema.Schema) {
c.Lock()
defer c.Unlock()
c.schemas[key] = schema
}

View file

@ -2,7 +2,6 @@ package registry
import (
"fmt"
"github.com/xeipuuv/gojsonschema"
"io/ioutil"
"net/http"
"strings"
@ -53,7 +52,7 @@ func (r KubernetesRegistry) schemaURL(resourceKind, resourceAPIVersion, k8sVersi
return fmt.Sprintf("%s/%s-standalone%s/%s%s.json", r.baseURL, normalisedVersion, strictSuffix, strings.ToLower(resourceKind), kindSuffix)
}
func (r KubernetesRegistry) DownloadSchema(resourceKind, resourceAPIVersion, k8sVersion string) (*gojsonschema.Schema, error) {
func (r KubernetesRegistry) DownloadSchema(resourceKind, resourceAPIVersion, k8sVersion string) ([]byte, error) {
url := r.schemaURL(resourceKind, resourceAPIVersion, k8sVersion)
resp, err := http.Get(url)
@ -75,5 +74,5 @@ func (r KubernetesRegistry) DownloadSchema(resourceKind, resourceAPIVersion, k8s
return nil, fmt.Errorf("failed downloading schema at %s: %s", url, err)
}
return gojsonschema.NewSchema(gojsonschema.NewBytesLoader(body))
return body, nil
}

View file

@ -2,20 +2,18 @@ package registry
import (
"fmt"
"github.com/xeipuuv/gojsonschema"
"io/ioutil"
"os"
"sigs.k8s.io/yaml"
"strings"
)
type LocalSchemas struct {
schemas map[string]*gojsonschema.Schema
schemas map[string]string
}
func NewLocalSchemas(schemaFiles []string) (*LocalSchemas, error) {
schemas := &LocalSchemas{
schemas: map[string]*gojsonschema.Schema{},
schemas: map[string]string{},
}
for _, schemaFile := range schemaFiles {
@ -36,32 +34,29 @@ func NewLocalSchemas(schemaFiles []string) (*LocalSchemas, error) {
} `json:"Names"`
} `json:"Spec"`
}
err = yaml.Unmarshal(content, &parsedSchema)
err = yaml.Unmarshal(content, &parsedSchema) // Index Schemas by kind
if err != nil {
return nil, fmt.Errorf("failed parsing schema %s", schemaFile)
}
if strings.HasSuffix(schemaFile, ".yml") || strings.HasSuffix(schemaFile, ".yaml") {
asJSON, err := yaml.YAMLToJSON(content)
if err != nil {
return nil, fmt.Errorf("error converting manifest %s to JSON: %s", schemaFile, err)
}
schema, err := gojsonschema.NewSchema(gojsonschema.NewBytesLoader(asJSON))
if err != nil {
return nil, err
}
schemas.schemas[parsedSchema.Spec.Names.Kind] = schema
}
schemas.schemas[parsedSchema.Spec.Names.Kind] = schemaFile
}
return schemas, nil
}
func (r LocalSchemas) DownloadSchema(resourceKind, resourceAPIVersion, k8sVersion string) (*gojsonschema.Schema, error) {
schema, ok := r.schemas[resourceKind]
func (r LocalSchemas) DownloadSchema(resourceKind, resourceAPIVersion, k8sVersion string) ([]byte, error) {
schemaFile, ok := r.schemas[resourceKind]
if !ok {
return nil, fmt.Errorf("no local schema for Kind %s", resourceKind)
}
return schema, nil
f, err := os.Open(schemaFile)
if err != nil {
return nil, fmt.Errorf("failed to open schema %s", schemaFile)
}
defer f.Close()
content, err := ioutil.ReadAll(f)
return content, nil
}

View file

@ -1,13 +1,11 @@
package registry
import "github.com/xeipuuv/gojsonschema"
type Manifest struct {
Kind, Version string
}
type Registry interface {
DownloadSchema(resourceKind, resourceAPIVersion, k8sVersion string) (*gojsonschema.Schema, error)
DownloadSchema(resourceKind, resourceAPIVersion, k8sVersion string) ([]byte, error)
}
type Retryable interface {