#!/usr/bin/env python3

# Derived from https://github.com/instrumenta/openapi2jsonschema
import yaml
import json
import sys
import os
import urllib.request
import re

# Opt-in escape hatch: when DISABLE_SSL_CERT_VALIDATION is present in the
# environment (with any value), ssl's default HTTPS context factory is
# replaced by the unverified one.
if 'DISABLE_SSL_CERT_VALIDATION' in os.environ:
    import ssl
    ssl._create_default_https_context = ssl._create_unverified_context


def test_additional_properties():
    """Check additional_properties() against fixed input/expected pairs."""
    cases = [
        {
            "input": {"something": {"properties": {}}},
            "expect": {'something': {'properties': {}, "additionalProperties": False}},
        },
        {
            "input": {"something": {"somethingelse": {}}},
            "expect": {'something': {'somethingelse': {}}},
        },
    ]
    for test in cases:
        assert additional_properties(test["input"]) == test["expect"]


def additional_properties(data, skip=False):
    """Set ``additionalProperties: False`` on every dict that has "properties".

    This recreates the behaviour of kubectl at
    https://github.com/kubernetes/kubernetes/blob/225b9119d6a8f03fcbe3cc3d590c261965d928d0/pkg/kubectl/validation/schema.go#L312

    The structure is modified in place and also returned. Only dict values
    are descended into; a dict that already carries an
    "additionalProperties" key keeps its existing value.

    Args:
        data: The (sub)schema to process. Anything that is not a dict is
            returned untouched.
        skip: When true, ``data`` itself is left without the key. The flag
            is not forwarded to nested dicts, so it only affects the level
            it is passed for.

    Returns:
        The same ``data`` object.
    """
    if isinstance(data, dict):
        if not skip and "properties" in data:
            # setdefault leaves an explicit existing value alone.
            data.setdefault("additionalProperties", False)
        # The key (if any) was added before iteration starts, so the dict
        # does not change size while it is being walked.
        for value in data.values():
            additional_properties(value)
    return data


def test_replace_int_or_string():
    """Check replace_int_or_string() against fixed input/expected pairs."""
    cases = [
        {
            "input": {"something": {"format": "int-or-string"}},
            "expect": {'something': {'oneOf': [{'type': 'string'}, {'type': 'integer'}]}},
        },
        {
            "input": {"something": {"format": "string"}},
            "expect": {"something": {"format": "string"}},
        },
    ]
    for test in cases:
        assert replace_int_or_string(test["input"]) == test["expect"]


def replace_int_or_string(data):
    """Return a copy of ``data`` with int-or-string schemas expanded.

    Every dict *value* whose "format" entry equals "int-or-string" is
    replaced wholesale by ``{"oneOf": [{"type": "string"},
    {"type": "integer"}]}``. Other dict values are processed recursively,
    list values are processed element by element, and everything else is
    carried over unchanged.

    Args:
        data: The (sub)schema to process.

    Returns:
        A new dict when ``data`` provides ``.items()``; otherwise ``data``
        itself, unchanged.
    """
    try:
        items = data.items()
    except AttributeError:
        # Not a mapping (scalar, list, None, ...): nothing to rewrite.
        return data

    new = {}
    for k, v in items:
        if isinstance(v, dict):
            if v.get("format") == "int-or-string":
                new[k] = {"oneOf": [{"type": "string"}, {"type": "integer"}]}
            else:
                new[k] = replace_int_or_string(v)
        elif isinstance(v, list):
            new[k] = [replace_int_or_string(x) for x in v]
        else:
            new[k] = v
    return new


def allow_null_optional_fields(data, parent=None, grand_parent=None, key=None):
    """Return a copy of ``data`` in which optional "type" entries accept null.

    A string stored under the key "type" (other than "null" itself) is
    replaced by ``[value, "null"]`` unless ``grand_parent`` is truthy and
    contains "required". Dict values are processed recursively and list
    values element by element.

    Args:
        data: The (sub)schema to process.
        parent: The container ``data`` was taken from, as tracked by the
            recursive calls (``None`` at the top level).
        grand_parent: The container two levels up, as tracked by the
            recursive calls; only consulted for the "required" check.
        key: The key ``data`` was found under. Accepted for interface
            compatibility; the body does not read it.

    Returns:
        A new dict when ``data`` provides ``.items()``; otherwise ``data``
        itself, unchanged.
    """
    try:
        items = data.items()
    except AttributeError:
        # Not a mapping (scalar, list, None, ...): nothing to rewrite.
        return data

    new = {}
    for k, v in items:
        new_v = v
        if isinstance(v, dict):
            new_v = allow_null_optional_fields(v, data, parent, k)
        elif isinstance(v, list):
            new_v = [allow_null_optional_fields(x, v, parent, k) for x in v]
        elif isinstance(v, str):
            is_non_null_type = k == "type" and v != "null"
            has_required_fields = grand_parent and "required" in grand_parent
            if is_non_null_type and not has_required_fields:
                new_v = [v, "null"]
        new[k] = new_v
    return new


def append_no_duplicates(obj, key, value):
    """
    Given a dictionary, lookup the given key, if it doesn't exist create a new array.
    Then check if the given value already exists in the array, if it doesn't add it.
    """
    if key not in obj:
        obj[key] = []
    if value not in obj[key]:
        obj[key].append(value)


def uescape_decode(match):
    """Decode the text of a regex match with the "unicode_escape" codec.

    Args:
        match: A match object; its whole matched text is decoded.

    Returns:
        The decoded string.
    """
    return match.group().encode().decode("unicode_escape")


def write_schema_file(schema, filename):
    """Post-process ``schema`` and serialise it as indented JSON.

    The root level only receives ``additionalProperties: False`` when the
    DENY_ROOT_ADDITIONAL_PROPERTIES environment variable is set to a
    non-empty value.

    NOTE(review): the source available for this review ends in the middle
    of this function (inside the pattern passed to ``re.compile``). The
    statements below are the ones that were visible; everything after them
    has to be restored from the upstream file.
    """
    schemaJSON = ""

    schema = additional_properties(schema, skip=not os.getenv("DENY_ROOT_ADDITIONAL_PROPERTIES"))
    schema = replace_int_or_string(schema)
    schemaJSON = json.dumps(schema, indent=2)

    # Convert Unicode escape sequences to their actual Unicode character
    # NOTE(review): the original statement is cut off mid-pattern here:
    #     uescapes = re.compile(r"(?
    # The pattern and the code that follows it are deliberately not guessed.
    # Fail loudly instead of returning as if the schema had been written.
    raise NotImplementedError(
        "write_schema_file is incomplete: the remainder of its body was not recoverable"
    )