kubeconform/scripts/helm/plugin_wrapper.py
2022-12-09 18:04:48 +00:00

552 lines
15 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import logging
import os
import subprocess
import sys
import yaml
from glob import glob
def parse_args(add_chart=True, add_files=False, add_path=False, add_incl_excl=False):
    """Parse the command line and split it into per-tool option lists.

    The parser reads the process arguments (``parser.parse_args()`` is called
    without an explicit list).

    Args:
        add_chart: register the positional ``CHART`` argument and append its
            value as the last item of the helm option lists.
        add_files: register the positional ``FILES`` argument (one or more).
        add_path: register the ``--charts-path`` option.
        add_incl_excl: register the ``--include-charts`` and
            ``--exclude-charts`` options.

    Returns:
        A dict with the keys ``"helm_tmpl"``, ``"helm_build"`` and
        ``"kubeconform"`` (lists of command line arguments for the respective
        command) and ``"wrapper"`` (the complete ``argparse.Namespace``).
    """
    # Command line options for helm, kubeconform and the plugin itself
    args = {
        "helm_tmpl": [],
        "helm_build": [],
        "kubeconform": [],
        "wrapper": [],
    }

    # Define parser
    parser = argparse.ArgumentParser(
        description="Wrapper to run kubeconform for a Helm chart."
    )

    if add_path:
        parser.add_argument(
            "--charts-path",
            metavar="PATH",
            help="path to the directory with charts (default: charts)",
            default="charts",
        )

    if add_incl_excl:
        parser.add_argument(
            "--include-charts",
            metavar="LIST",
            help="comma-separated list of chart names to include in the testing",
        )
        parser.add_argument(
            "--exclude-charts",
            metavar="LIST",
            help="comma-separated list of chart names to exclude from the testing",
        )

    parser.add_argument(
        "--cache", help="whether to use kubeconform cache", action="store_true"
    )
    parser.add_argument(
        "--cache-dir",
        metavar="DIR",
        help="path to the cache directory (default: ~/.cache/kubeconform)",
        default="~/.cache/kubeconform",
    )
    parser.add_argument(
        "--config",
        metavar="FILE",
        help="config file name (default: .kubeconform)",
        default=".kubeconform",
    )
    parser.add_argument(
        "--values-dir",
        metavar="DIR",
        help="directory with optional values files for the tests (default: ci)",
        default="ci",
    )
    parser.add_argument(
        "--values-pattern",
        metavar="PATTERN",
        help="pattern to select the values files (default: *-values.yaml)",
        default="*-values.yaml",
    )
    parser.add_argument(
        "--debug",
        help="debug output",
        action="store_true",
    )

    group_helm_build = parser.add_argument_group(
        "helm build", "Options passed to the 'helm build' command"
    )
    group_helm_build.add_argument(
        "--skip-refresh",
        help="do not refresh the local repository cache",
        action="store_true",
    )
    group_helm_build.add_argument(
        "--verify", help="verify the packages against signatures", action="store_true"
    )

    group_helm_tmpl = parser.add_argument_group(
        "helm template", "Options passed to the 'helm template' command"
    )
    group_helm_tmpl.add_argument(
        "-f",
        "--values",
        metavar="FILE",
        help="values YAML file or URL (can specified multiple)",
        action="append",
    )
    group_helm_tmpl.add_argument(
        "-n",
        "--namespace",
        metavar="NAME",
        help="namespace",
    )
    group_helm_tmpl.add_argument(
        "-r",
        "--release",
        metavar="NAME",
        help="release name",
    )
    if add_chart:
        group_helm_tmpl.add_argument(
            "CHART",
            help="chart path (e.g. '.')",
        )

    group_kubeconform = parser.add_argument_group(
        "kubeconform", "Options passsed to the 'kubeconform' command"
    )
    group_kubeconform.add_argument(
        "--ignore-missing-schemas",
        help="skip files with missing schemas instead of failing",
        action="store_true",
    )
    group_kubeconform.add_argument(
        "--insecure-skip-tls-verify",
        help="disable verification of the server's SSL certificate",
        action="store_true",
    )
    group_kubeconform.add_argument(
        "--kubernetes-version",
        metavar="VERSION",
        help="version of Kubernetes to validate against, e.g. 1.18.0 (default: master)",
    )
    group_kubeconform.add_argument(
        "--goroutines",
        metavar="NUMBER",
        help="number of goroutines to run concurrently (default: 4)",
    )
    group_kubeconform.add_argument(
        "--output",
        help="output format (default: text)",
        choices=["json", "junit", "tap", "text"],
    )
    group_kubeconform.add_argument(
        "--reject",
        metavar="LIST",
        help="comma-separated list of kinds or GVKs to reject",
    )
    group_kubeconform.add_argument(
        "--schema-location",
        metavar="LOCATION",
        help="override schemas location search path (can specified multiple)",
        action="append",
    )
    group_kubeconform.add_argument(
        "--skip",
        metavar="LIST",
        help="comma-separated list of kinds or GVKs to ignore",
    )
    group_kubeconform.add_argument(
        "--strict",
        help="disallow additional properties not in schema or duplicated keys",
        action="store_true",
    )
    group_kubeconform.add_argument(
        "--summary",
        help="print a summary at the end (ignored for junit output)",
        action="store_true",
    )
    group_kubeconform.add_argument(
        "--verbose",
        help="print results for all resources (ignored for tap and junit output)",
        action="store_true",
    )

    if add_files:
        parser.add_argument(
            "FILES",
            help="files that have changed",
            nargs="+",
        )

    # Parse the args
    a = parser.parse_args()

    # ### Populate the helm build options
    # Extend (not assign) so that --skip-refresh and --verify can be combined
    if a.skip_refresh:
        args["helm_build"] += ["--skip-refresh"]
    if a.verify:
        args["helm_build"] += ["--verify"]
    # This must stay the last item from 'helm_build'!
    if add_chart:
        args["helm_build"] += [a.CHART]

    # ### Populate the helm template options
    for v in a.values or []:
        args["helm_tmpl"] += ["--values", v]
    if a.namespace is not None:
        args["helm_tmpl"] += ["--namespace", a.namespace]
    # The release name is a positional argument placed right before the chart
    if a.release is not None:
        args["helm_tmpl"] += [a.release]
    # This must stay the last item from 'helm_tmpl'!
    if add_chart:
        args["helm_tmpl"] += [a.CHART]

    # ### Populate the kubeconform options
    if a.cache:
        args["kubeconform"] += ["-cache", os.path.expanduser(a.cache_dir)]
    if a.ignore_missing_schemas:
        args["kubeconform"] += ["-ignore-missing-schemas"]
    if a.insecure_skip_tls_verify:
        args["kubeconform"] += ["-insecure-skip-tls-verify"]
    if a.kubernetes_version is not None:
        args["kubeconform"] += ["-kubernetes-version", a.kubernetes_version]
    if a.goroutines is not None:
        args["kubeconform"] += ["-n", a.goroutines]
    if a.output is not None:
        args["kubeconform"] += ["-output", a.output]
    # --reject and --skip carry a value (a list of kinds), they are not
    # boolean switches, so the value has to be forwarded as well
    if a.reject is not None:
        args["kubeconform"] += ["-reject", a.reject]
    for v in a.schema_location or []:
        args["kubeconform"] += ["-schema-location", v]
    if a.skip is not None:
        args["kubeconform"] += ["-skip", a.skip]
    if a.strict:
        args["kubeconform"] += ["-strict"]
    if a.summary:
        args["kubeconform"] += ["-summary"]
    if a.verbose:
        args["kubeconform"] += ["-verbose"]

    # ### All args are wrapper options
    args["wrapper"] = a

    return args
def get_logger(debug):
    """Configure the root logging setup and return this module's logger.

    Args:
        debug: when truthy the level passed to ``logging.basicConfig`` is
            ``DEBUG``, otherwise ``ERROR``.

    Returns:
        The logger obtained via ``logging.getLogger(__name__)``.
    """
    log_format = "%(levelname)s: %(message)s"
    log_level = logging.DEBUG if debug else logging.ERROR

    logging.basicConfig(level=log_level, format=log_format)

    return logging.getLogger(__name__)
def parse_config(filename):
    """Turn a YAML config file into extra kubeconform command line flags.

    Every top-level key of the YAML mapping becomes a ``-key=value`` flag.
    A list value produces one flag per item; a nested mapping is skipped.

    Args:
        filename: path to the config file.

    Returns:
        A list of ``-key=value`` strings. The list is empty when the file
        does not exist or its content is not a mapping.

    Raises:
        Exception: when the file cannot be opened or parsed as YAML.
    """
    args = []

    # Check if file exists
    if not os.path.isfile(filename):
        return args

    # Read and parse the file
    try:
        with open(filename, "r") as stream:
            try:
                # SECURITY: the config file lives next to the chart under test
                # and may be untrusted, so use the safe loader (plain data
                # only) rather than yaml.load() with the full Loader, which
                # can construct arbitrary Python objects.
                data = yaml.safe_load(stream)
            except yaml.YAMLError as e:
                raise Exception(
                    "cannot parse YAML file '%s': %s" % (filename, e)
                ) from e
    except IOError as e:
        raise Exception("cannot open file '%s': %s" % (filename, e)) from e

    # Anything but a mapping (e.g. an empty file) yields no extra args
    if not isinstance(data, dict):
        return args

    # Produce extra args out of the config file
    for key, val in data.items():
        if isinstance(val, dict):
            # No deep dicts allowed in the config
            continue

        if isinstance(val, list):
            args.extend("-%s=%s" % (key, v) for v in val)
        else:
            args.append("-%s=%s" % (key, val))

    return args
def get_values_files(values_dir, values_pattern):
    """Collect the values files matching a glob pattern inside a directory.

    Args:
        values_dir: directory expected to hold the values files.
        values_pattern: glob pattern joined onto ``values_dir``.

    Returns:
        The list of matching paths as returned by ``glob``; an empty list
        when ``values_dir`` is not a directory.
    """
    # Nothing to look for if the directory is not there
    if not os.path.isdir(values_dir):
        return []

    search_pattern = os.path.join(values_dir, values_pattern)

    return glob(search_pattern)
def run_helm_dependecy_build(args):
    """Fetch the chart dependencies via ``helm dependency`` when needed.

    Nothing is run when the chart path is not a local chart (no
    ``Chart.yaml``) or when every dependency listed in ``Chart.yaml`` already
    has a matching ``<name>-<version>.tgz`` in the ``charts`` directory.

    Args:
        args: options for the helm command; the LAST item must be the chart
            path.

    Raises:
        Exception: when ``Chart.yaml`` cannot be parsed or the helm command
            exits with a non-zero return code.
    """
    chart_path = args[-1]
    chart_yaml = os.path.join(chart_path, "Chart.yaml")

    # Check if it's local chart
    if not os.path.isfile(chart_yaml):
        return

    charts_dir = os.path.join(chart_path, "charts")

    # Check if the dependency charts are already there
    if os.path.isdir(charts_dir):
        with open(chart_yaml, "r") as f:
            try:
                data = yaml.safe_load(f)
            except yaml.YAMLError as e:
                raise Exception("failed to parse Chart.yaml: %s" % e) from e

        # An empty Chart.yaml parses to None and 'dependencies:' without a
        # value parses to None as well; neither may crash the check
        if isinstance(data, dict) and "dependencies" in data:
            for d in data["dependencies"] or []:
                if not (
                    isinstance(d, dict)
                    and "name" in d
                    and "version" in d
                    and os.path.isfile(
                        os.path.join(
                            charts_dir, "%s-%s.tgz" % (d["name"], d["version"])
                        )
                    )
                ):
                    # Dependency missing, let's get it
                    break
            else:
                # All dependencies seem to be there so don't run anything
                return

    # Check if there is Chart.lock
    # NOTE(review): this used to test Chart.yaml, which always exists at this
    # point, so 'build' was never selected. The update/build mapping is kept
    # as written; per the Helm docs 'dependency build' without a lock file
    # mirrors 'dependency update' - confirm this mapping is the intended one.
    if os.path.isfile(os.path.join(chart_path, "Chart.lock")):
        action = "update"
    else:
        action = "build"

    # Run process
    result = subprocess.run(
        [
            os.getenv("HELM_BIN", "helm"),
            "dependency",
            action,
        ]
        + args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )

    # Check for errors
    if result.returncode != 0:
        raise Exception(
            "failed to run helm dependency %s: rc=%d %s"
            % (action, result.returncode, result.stderr)
        )
def run_helm_template(args):
    """Render the chart by running ``helm template`` with the given options.

    The helm executable is taken from the ``HELM_BIN`` environment variable
    and defaults to ``helm``.

    Args:
        args: list of arguments appended after ``template``.

    Returns:
        The ``subprocess.CompletedProcess`` of the run, with stdout and
        stderr captured as text.

    Raises:
        Exception: when the command exits with a non-zero return code.
    """
    helm_bin = os.getenv("HELM_BIN", "helm")
    command = [helm_bin, "template"] + args

    # Run process
    completed = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )

    # Success is the short path, anything else is reported with stderr
    if completed.returncode == 0:
        return completed

    raise Exception(
        "failed to run helm template: rc=%d %s"
        % (completed.returncode, completed.stderr)
    )
def run_kubeconform(args, input):
    """Run kubeconform on the given manifests and return the finished process.

    The binary is looked up in ``$HELM_PLUGIN_DIR/bin`` first, then in
    ``<HELM_PLUGINS>/kubeconform/bin`` as reported by ``helm env``; if
    neither file exists the bare name ``kubeconform`` is executed.

    Args:
        args: list of kubeconform command line arguments. When it contains
            ``-cache`` followed by a directory, that directory is created.
        input: text passed to kubeconform on stdin.

    Returns:
        The ``subprocess.CompletedProcess`` of the run, with stdout and
        stderr captured as text.

    Raises:
        Exception: when the cache directory cannot be created or kubeconform
            exits with a non-zero return code.
    """
    bin_name = "kubeconform"
    bin_file = bin_name

    # Try to use `HELM_PLUGIN_DIR` env var to determine location of kubeconform
    helm_plugin_dir = os.getenv("HELM_PLUGIN_DIR", "")
    helm_plugin_bin = os.path.join(helm_plugin_dir, "bin", bin_name)

    if os.path.isfile(helm_plugin_bin):
        bin_file = helm_plugin_bin
    else:
        # Try to use `helm env` to determine location of kubeconform.
        # Honour HELM_BIN like the other helm invocations in this file.
        try:
            helm_env = subprocess.run(
                [
                    os.getenv("HELM_BIN", "helm"),
                    "env",
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            ).stdout
        except (OSError, ValueError):
            # Best effort only: helm may be missing or not runnable
            helm_env = ""

        for line in helm_env.split("\n"):
            # Split on the first '=' only so a path containing '=' is kept
            name, sep, plugins_path = line.partition("=")

            if not sep or name.strip() != "HELM_PLUGINS":
                continue

            helm_plugin_bin = os.path.join(
                plugins_path.strip().strip('"'), bin_name, "bin", bin_name
            )

            if os.path.isfile(helm_plugin_bin):
                bin_file = helm_plugin_bin

            break

    # Create the cache dir
    if "-cache" in args:
        c_idx = args.index("-cache")

        # Skip when '-cache' is the last item and no directory follows it
        if c_idx + 1 < len(args):
            c_dir = args[c_idx + 1]

            try:
                # makedirs also creates missing parents (e.g. ~/.cache)
                os.makedirs(c_dir, mode=0o755, exist_ok=True)
            except OSError as e:
                raise Exception("failed to create cache directory: %s" % e) from e

    # Run process
    result = subprocess.run(
        [
            bin_file,
        ]
        + args,
        input=input,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )

    # Check for errors
    if result.returncode != 0:
        raise Exception(
            "failed to run kubeconform: rc=%d\n%s%s"
            % (result.returncode, result.stderr, result.stdout)
        )

    return result
def run_test(args, values_file=None):
    """Build dependencies, render the chart and validate it with kubeconform.

    Args:
        args: dict with the ``"helm_build"``, ``"helm_tmpl"`` and
            ``"kubeconform"`` argument lists.
        values_file: optional values file appended to the ``helm template``
            arguments as ``--values <file>``.

    Raises:
        Exception: when any of the three steps fails; the message names the
            failed step and carries the underlying error text.
    """
    # Extra values parameter, only if a file is specified
    extra_values = ["--values", values_file] if values_file else []

    # Step 1: Helm dependencies
    try:
        run_helm_dependecy_build(args["helm_build"])
    except Exception as err:
        raise Exception("dependency build failed: %s" % err)

    # Step 2: templated output
    try:
        rendered = run_helm_template(args["helm_tmpl"] + extra_values)
    except Exception as err:
        raise Exception("templating failed: %s" % err)

    # Step 3: kubeconform output
    try:
        validated = run_kubeconform(args["kubeconform"], rendered.stdout)
    except Exception as err:
        raise Exception("kubeconform failed: %s" % err)

    # Print results, if there are any
    report = validated.stdout

    if report:
        print(report.rstrip())
def main():
    """Entry point: parse the options and test the chart.

    The chart is tested once per values file found in the values directory,
    or once without any extra values file when none is found. The process
    exits with status 1 when a test run raises.
    """
    # Parse args
    args = parse_args()
    wrapper_opts = args["wrapper"]

    # Get logger
    log = get_logger(wrapper_opts.debug)

    # Args from the config file go in front of the command line ones
    config_args = parse_config(wrapper_opts.config)

    if config_args:
        args["kubeconform"] = config_args + args["kubeconform"]

    # Get list of values files
    values_files = get_values_files(
        wrapper_opts.values_dir,
        wrapper_opts.values_pattern,
    )

    # Run tests
    try:
        if not values_files:
            log.debug("Testing without CI values files")
            run_test(args)
        else:
            for values_file in values_files:
                log.debug("Testing with CI values file %s" % values_file)
                run_test(args, values_file)
    except Exception as e:
        log.error("Testing failed: %s" % e)
        sys.exit(1)


# Run the wrapper only when the file is executed as a script
if __name__ == "__main__":
    main()