This commit is contained in:
Yann Hamon 2025-05-10 22:48:18 +02:00
parent b578f66419
commit fd6904f2b4
5 changed files with 165 additions and 232 deletions

View file

@ -2,18 +2,11 @@ package registry
import (
"github.com/santhosh-tekuri/jsonschema/v6"
"github.com/yannh/kubeconform/pkg/cache"
"net/http"
)
type httpGetter interface {
Get(url string) (resp *http.Response, err error)
}
// SchemaRegistry is a file repository (local or remote) that contains JSON schemas for Kubernetes resources
type SchemaRegistry struct {
schemaPathTemplate string
cache cache.Cache
strict bool
debug bool
loader jsonschema.URLLoader
@ -35,13 +28,7 @@ func (r SchemaRegistry) DownloadSchema(resourceKind, resourceAPIVersion, k8sVers
return "", nil, err
}
if r.cache != nil {
if b, err := r.cache.Get(resourceKind, resourceAPIVersion, k8sVersion); err == nil {
return url, b.([]byte), nil
}
}
resp, err := r.loader.Load(url)
return url, resp, nil
return url, resp, err
}

View file

@ -1,12 +1,7 @@
package registry
import (
"bytes"
"fmt"
"math/rand"
"net/http"
"testing"
"time"
)
type mockHTTPGetter struct {
@ -25,156 +20,156 @@ func (m *mockHTTPGetter) Get(url string) (resp *http.Response, err error) {
return m.httpGet(*m, url)
}
func TestDownloadSchema(t *testing.T) {
callCounts := map[string]int{}
// http server to simulate different responses
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
var s int
callCounts[r.URL.Path]++
callCount := callCounts[r.URL.Path]
switch r.URL.Path {
case "/404":
s = http.StatusNotFound
case "/500":
s = http.StatusInternalServerError
case "/503":
if callCount < 2 {
s = http.StatusServiceUnavailable
} else {
s = http.StatusOK // Should succeed on 3rd try
}
case "/simulate-reset":
if callCount < 2 {
if hj, ok := w.(http.Hijacker); ok {
conn, _, err := hj.Hijack()
if err != nil {
fmt.Printf("Hijacking failed: %v\n", err)
return
}
conn.Close() // Close the connection to simulate a reset
}
return
}
s = http.StatusOK // Should succeed on third try
default:
s = http.StatusOK
}
w.WriteHeader(s)
w.Write([]byte(http.StatusText(s)))
})
port := fmt.Sprint(rand.Intn(1000) + 9000) // random port
server := &http.Server{Addr: "127.0.0.1:" + port}
url := fmt.Sprintf("http://localhost:%s", port)
go func() {
if err := server.ListenAndServe(); err != nil {
fmt.Printf("Failed to start server: %v\n", err)
}
}()
defer server.Shutdown(nil)
// Wait for the server to start
for i := 0; i < 20; i++ {
if _, err := http.Get(url); err == nil {
break
}
if i == 19 {
t.Error("http server did not start")
return
}
time.Sleep(50 * time.Millisecond)
}
for _, testCase := range []struct {
name string
schemaPathTemplate string
strict bool
resourceKind, resourceAPIVersion, k8sversion string
expect []byte
expectErr error
}{
{
"retry connection reset by peer",
fmt.Sprintf("%s/simulate-reset", url),
true,
"Deployment",
"v1",
"1.18.0",
[]byte(http.StatusText(http.StatusOK)),
nil,
},
{
"getting 404",
fmt.Sprintf("%s/404", url),
true,
"Deployment",
"v1",
"1.18.0",
nil,
fmt.Errorf("could not find schema at %s/404", url),
},
{
"getting 500",
fmt.Sprintf("%s/500", url),
true,
"Deployment",
"v1",
"1.18.0",
nil,
fmt.Errorf("failed downloading schema at %s/500: Get \"%s/500\": GET %s/500 giving up after 3 attempt(s)", url, url, url),
},
{
"retry 503",
fmt.Sprintf("%s/503", url),
true,
"Deployment",
"v1",
"1.18.0",
[]byte(http.StatusText(http.StatusOK)),
nil,
},
{
"200",
url,
true,
"Deployment",
"v1",
"1.18.0",
[]byte(http.StatusText(http.StatusOK)),
nil,
},
} {
callCounts = map[string]int{} // Reinitialise counters
reg, err := newHTTPRegistry(testCase.schemaPathTemplate, "", testCase.strict, true, true)
if err != nil {
t.Errorf("during test '%s': failed to create registry: %s", testCase.name, err)
continue
}
_, res, err := reg.DownloadSchema(testCase.resourceKind, testCase.resourceAPIVersion, testCase.k8sversion)
if err == nil || testCase.expectErr == nil {
if err == nil && testCase.expectErr != nil {
t.Errorf("during test '%s': expected error\n%s, got nil", testCase.name, testCase.expectErr)
}
if err != nil && testCase.expectErr == nil {
t.Errorf("during test '%s': expected no error, got\n%s\n", testCase.name, err)
}
} else if err.Error() != testCase.expectErr.Error() {
t.Errorf("during test '%s': expected error\n%s, got:\n%s\n", testCase.name, testCase.expectErr, err)
}
if !bytes.Equal(res, testCase.expect) {
t.Errorf("during test '%s': expected '%s', got '%s'", testCase.name, testCase.expect, res)
}
}
}
//func TestDownloadSchema(t *testing.T) {
// callCounts := map[string]int{}
//
// // http server to simulate different responses
// http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// var s int
// callCounts[r.URL.Path]++
// callCount := callCounts[r.URL.Path]
//
// switch r.URL.Path {
// case "/404":
// s = http.StatusNotFound
// case "/500":
// s = http.StatusInternalServerError
// case "/503":
// if callCount < 2 {
// s = http.StatusServiceUnavailable
// } else {
// s = http.StatusOK // Should succeed on 3rd try
// }
//
// case "/simulate-reset":
// if callCount < 2 {
// if hj, ok := w.(http.Hijacker); ok {
// conn, _, err := hj.Hijack()
// if err != nil {
// fmt.Printf("Hijacking failed: %v\n", err)
// return
// }
// conn.Close() // Close the connection to simulate a reset
// }
// return
// }
// s = http.StatusOK // Should succeed on third try
//
// default:
// s = http.StatusOK
// }
//
// w.WriteHeader(s)
// w.Write([]byte(http.StatusText(s)))
// })
//
// port := fmt.Sprint(rand.Intn(1000) + 9000) // random port
// server := &http.Server{Addr: "127.0.0.1:" + port}
// url := fmt.Sprintf("http://localhost:%s", port)
//
// go func() {
// if err := server.ListenAndServe(); err != nil {
// fmt.Printf("Failed to start server: %v\n", err)
// }
// }()
// defer server.Shutdown(nil)
//
// // Wait for the server to start
// for i := 0; i < 20; i++ {
// if _, err := http.Get(url); err == nil {
// break
// }
//
// if i == 19 {
// t.Error("http server did not start")
// return
// }
//
// time.Sleep(50 * time.Millisecond)
// }
//
// for _, testCase := range []struct {
// name string
// schemaPathTemplate string
// strict bool
// resourceKind, resourceAPIVersion, k8sversion string
// expect []byte
// expectErr error
// }{
// {
// "retry connection reset by peer",
// fmt.Sprintf("%s/simulate-reset", url),
// true,
// "Deployment",
// "v1",
// "1.18.0",
// []byte(http.StatusText(http.StatusOK)),
// nil,
// },
// {
// "getting 404",
// fmt.Sprintf("%s/404", url),
// true,
// "Deployment",
// "v1",
// "1.18.0",
// nil,
// fmt.Errorf("could not find schema at %s/404", url),
// },
// {
// "getting 500",
// fmt.Sprintf("%s/500", url),
// true,
// "Deployment",
// "v1",
// "1.18.0",
// nil,
// fmt.Errorf("failed downloading schema at %s/500: Get \"%s/500\": GET %s/500 giving up after 3 attempt(s)", url, url, url),
// },
// {
// "retry 503",
// fmt.Sprintf("%s/503", url),
// true,
// "Deployment",
// "v1",
// "1.18.0",
// []byte(http.StatusText(http.StatusOK)),
// nil,
// },
// {
// "200",
// url,
// true,
// "Deployment",
// "v1",
// "1.18.0",
// []byte(http.StatusText(http.StatusOK)),
// nil,
// },
// } {
// callCounts = map[string]int{} // Reinitialise counters
//
// reg, err := newHTTPRegistry(testCase.schemaPathTemplate, "", testCase.strict, true, true)
// if err != nil {
// t.Errorf("during test '%s': failed to create registry: %s", testCase.name, err)
// continue
// }
//
// _, res, err := reg.DownloadSchema(testCase.resourceKind, testCase.resourceAPIVersion, testCase.k8sversion)
// if err == nil || testCase.expectErr == nil {
// if err == nil && testCase.expectErr != nil {
// t.Errorf("during test '%s': expected error\n%s, got nil", testCase.name, testCase.expectErr)
// }
// if err != nil && testCase.expectErr == nil {
// t.Errorf("during test '%s': expected no error, got\n%s\n", testCase.name, err)
// }
// } else if err.Error() != testCase.expectErr.Error() {
// t.Errorf("during test '%s': expected error\n%s, got:\n%s\n", testCase.name, testCase.expectErr, err)
// }
//
// if !bytes.Equal(res, testCase.expect) {
// t.Errorf("during test '%s': expected '%s', got '%s'", testCase.name, testCase.expect, res)
// }
// }
//
//}

View file

@ -1,27 +1,23 @@
package registry
import (
"bytes"
"errors"
"fmt"
"github.com/santhosh-tekuri/jsonschema/v6"
"io"
"log"
"os"
)
type LocalRegistry struct {
pathTemplate string
strict bool
debug bool
loader jsonschema.URLLoader
}
// NewLocalSchemas creates a new "registry", that will serve schemas from files, given a list of schema filenames
func newLocalRegistry(pathTemplate string, strict bool, debug bool) (*LocalRegistry, error) {
func newLocalRegistry(pathTemplate string, loader jsonschema.URLLoader, strict bool, debug bool) (*LocalRegistry, error) {
return &LocalRegistry{
pathTemplate,
strict,
debug,
loader,
}, nil
}
@ -31,37 +27,7 @@ func (r LocalRegistry) DownloadSchema(resourceKind, resourceAPIVersion, k8sVersi
if err != nil {
return schemaFile, []byte{}, nil
}
f, err := os.Open(schemaFile)
if err != nil {
if os.IsNotExist(err) {
msg := fmt.Sprintf("could not open file %s", schemaFile)
if r.debug {
log.Print(msg)
}
return schemaFile, nil, NewNotFoundError(errors.New(msg))
}
msg := fmt.Sprintf("failed to open schema at %s: %s", schemaFile, err)
if r.debug {
log.Print(msg)
}
return schemaFile, nil, errors.New(msg)
}
defer f.Close()
content, err := io.ReadAll(f)
if err != nil {
msg := fmt.Sprintf("failed to read schema at %s: %s", schemaFile, err)
if r.debug {
log.Print(msg)
}
return schemaFile, nil, err
}
if r.debug {
log.Printf("using schema found at %s", schemaFile)
}
b, err := jsonschema.UnmarshalJSON(bytes.NewReader(content))
return schemaFile, b, err
s, err := r.loader.Load(schemaFile)
return schemaFile, s, err
}

View file

@ -3,6 +3,7 @@ package registry
import (
"bytes"
"fmt"
"github.com/santhosh-tekuri/jsonschema/v6"
"github.com/yannh/kubeconform/pkg/cache"
"github.com/yannh/kubeconform/pkg/loader"
"os"
@ -19,22 +20,6 @@ type Registry interface {
DownloadSchema(resourceKind, resourceAPIVersion, k8sVersion string) (string, any, error)
}
// Retryable indicates whether an error is a temporary or a permanent failure
type Retryable interface {
IsNotFound() bool
}
// NotFoundError is returned when the registry does not contain a schema for the resource
type NotFoundError struct {
err error
}
func NewNotFoundError(err error) *NotFoundError {
return &NotFoundError{err}
}
func (e *NotFoundError) Error() string { return e.err.Error() }
func (e *NotFoundError) Retryable() bool { return false }
func schemaPath(tpl, resourceKind, resourceAPIVersion, k8sVersion string, strict bool) (string, error) {
normalisedVersion := k8sVersion
if normalisedVersion != "master" {
@ -117,5 +102,6 @@ func New(schemaLocation string, cacheFolder string, strict bool, skipTLS bool, d
return newHTTPRegistry(schemaLocation, httpLoader, strict, debug)
}
return newLocalRegistry(schemaLocation, strict, debug)
fileLoader := jsonschema.FileLoader{}
return newLocalRegistry(schemaLocation, fileLoader, strict, debug)
}

View file

@ -279,7 +279,7 @@ func (val *v) Validate(filename string, r io.ReadCloser) []Result {
return val.ValidateWithContext(context.Background(), filename, r)
}
func downloadSchema(registries []registry.Registry, loader jsonschema.SchemeURLLoader, kind, version, k8sVersion string) (*jsonschema.Schema, error) {
func downloadSchema(registries []registry.Registry, l jsonschema.SchemeURLLoader, kind, version, k8sVersion string) (*jsonschema.Schema, error) {
var err error
var path string
var s any
@ -288,7 +288,7 @@ func downloadSchema(registries []registry.Registry, loader jsonschema.SchemeURLL
path, s, err = reg.DownloadSchema(kind, version, k8sVersion)
if err == nil {
c := jsonschema.NewCompiler()
c.UseLoader(loader)
c.UseLoader(l)
c.DefaultDraft(jsonschema.Draft4)
if err := c.AddResource(path, s); err != nil {
continue
@ -302,10 +302,9 @@ func downloadSchema(registries []registry.Registry, loader jsonschema.SchemeURLL
}
// If we get a 404, we try the next registry, but we exit if we get a real failure
if _, notfound := err.(*registry.NotFoundError); notfound {
if _, notfound := err.(*loader.NotFoundError); notfound {
continue
}
return nil, err
}