[ISS-1258][DC] updating the detect_aws_credentials to support temp json files

This commit is contained in:
DanielConnelly 2026-06-02 15:14:09 +01:00
parent fa6b006f0e
commit 10ce480836
No known key found for this signature in database
3 changed files with 70 additions and 7 deletions

View file

@ -5,13 +5,12 @@ import configparser
import os import os
from collections.abc import Sequence from collections.abc import Sequence
from typing import NamedTuple from typing import NamedTuple
import json
class BadFile(NamedTuple): class BadFile(NamedTuple):
filename: str filename: str
key: str key: str
def get_aws_cred_files_from_env() -> set[str]: def get_aws_cred_files_from_env() -> set[str]:
"""Extract credential file paths from environment variables.""" """Extract credential file paths from environment variables."""
return { return {
@ -23,17 +22,38 @@ def get_aws_cred_files_from_env() -> set[str]:
if env_var in os.environ if env_var in os.environ
} }
def get_aws_secrets_from_env() -> set[str]: def get_aws_secrets_from_env() -> set[str]:
"""Extract AWS secrets from environment variables.""" """Extract AWS secrets from environment variables."""
keys = set() keys = set()
for env_var in ( for env_var in (
'AWS_SECRET_ACCESS_KEY', 'AWS_SECURITY_TOKEN', 'AWS_SESSION_TOKEN', 'AWS_SECRET_ACCESS_KEY', 'AWS_SECURITY_TOKEN', 'AWS_SESSION_TOKEN',
): ):
if os.environ.get(env_var): if os.environ.get(env_var):
keys.add(os.environ[env_var]) keys.add(os.environ[env_var])
return keys return keys
def get_aws_secrets_from_json_file(json_credentials_file: str) -> set[str]:
"""Extract AWS secrets from JSON configuration files.
Read a JSON-style configuration file and return a set with all found AWS
secret access keys.
"""
aws_credentials_file_path = os.path.expanduser(json_credentials_file)
if not os.path.exists(aws_credentials_file_path):
return set()
with open(aws_credentials_file_path, 'r') as f:
try:
data = json.load(f)
except json.JSONDecodeError:
return set()
keys = set()
for var in ('AccessKeyId', 'SecretAccessKey', 'SessionToken', 'aws_secret_access_key', 'aws_security_token', 'aws_session_token'):
if var in data.get('Credentials', {}):
keys.add(data['Credentials'][var])
return keys
def get_aws_secrets_from_file(credentials_file: str) -> set[str]: def get_aws_secrets_from_file(credentials_file: str) -> set[str]:
"""Extract AWS secrets from configuration files. """Extract AWS secrets from configuration files.
@ -54,8 +74,8 @@ def get_aws_secrets_from_file(credentials_file: str) -> set[str]:
keys = set() keys = set()
for section in parser.sections(): for section in parser.sections():
for var in ( for var in (
'aws_secret_access_key', 'aws_security_token', 'aws_secret_access_key', 'aws_security_token',
'aws_session_token', 'aws_session_token',
): ):
try: try:
key = parser.get(section, var).strip() key = parser.get(section, var).strip()
@ -104,6 +124,16 @@ def main(argv: Sequence[str] | None = None) -> int:
'secret keys. Can be passed multiple times.' 'secret keys. Can be passed multiple times.'
), ),
) )
parser.add_argument(
'--json-credentials-file',
dest='json_credential_file_locations',
action='append',
default=['~/.aws/cli/cache/', '~/.aws/login/cache/'],
help=(
'Location of additional AWS JSON credential file from which to get '
'secret keys. Can be passed multiple times.'
),
)
parser.add_argument( parser.add_argument(
'--allow-missing-credentials', '--allow-missing-credentials',
dest='allow_missing_credentials', dest='allow_missing_credentials',
@ -113,6 +143,13 @@ def main(argv: Sequence[str] | None = None) -> int:
args = parser.parse_args(argv) args = parser.parse_args(argv)
credential_files = set(args.credentials_file) credential_files = set(args.credentials_file)
json_credential_file_locations = set(args.json_credential_file_locations)
json_credential_files = set()
for json_credential_file_location in json_credential_file_locations:
if os.path.isdir(os.path.expanduser(json_credential_file_location)):
for filename in os.listdir(os.path.expanduser(json_credential_file_location)):
if filename.endswith('.json'):
json_credential_files.add(os.path.join(json_credential_file_location, filename))
# Add the credentials files configured via environment variables to the set # Add the credentials files configured via environment variables to the set
# of files to to gather AWS secrets from. # of files to to gather AWS secrets from.
@ -122,6 +159,8 @@ def main(argv: Sequence[str] | None = None) -> int:
for credential_file in credential_files: for credential_file in credential_files:
keys |= get_aws_secrets_from_file(credential_file) keys |= get_aws_secrets_from_file(credential_file)
for json_credential_file in json_credential_files:
keys |= get_aws_secrets_from_json_file(json_credential_file)
# Secrets might be part of environment variables, so add such secrets to # Secrets might be part of environment variables, so add such secrets to
# the set of keys. # the set of keys.
keys |= get_aws_secrets_from_env() keys |= get_aws_secrets_from_env()

View file

@ -0,0 +1,7 @@
{
"accessToken": {
"accessKeyId": "tempAccessKeyId",
"secretAccessKey": "tempSecretAccessKey",
"sessionToken": "tempSessionToken"
}
}

View file

@ -7,6 +7,7 @@ import pytest
from pre_commit_hooks.detect_aws_credentials import get_aws_cred_files_from_env from pre_commit_hooks.detect_aws_credentials import get_aws_cred_files_from_env
from pre_commit_hooks.detect_aws_credentials import get_aws_secrets_from_env from pre_commit_hooks.detect_aws_credentials import get_aws_secrets_from_env
from pre_commit_hooks.detect_aws_credentials import get_aws_secrets_from_file from pre_commit_hooks.detect_aws_credentials import get_aws_secrets_from_file
from pre_commit_hooks.detect_aws_credentials import get_aws_secrets_from_json_file
from pre_commit_hooks.detect_aws_credentials import main from pre_commit_hooks.detect_aws_credentials import main
from testing.util import get_resource_path from testing.util import get_resource_path
@ -67,6 +68,22 @@ def test_get_aws_secrets_from_env(env_vars, values):
with patch.dict('os.environ', env_vars, clear=True): with patch.dict('os.environ', env_vars, clear=True):
assert get_aws_secrets_from_env() == values assert get_aws_secrets_from_env() == values
@pytest.mark.parametrize(
('filename', 'expected_keys'),
(
(
'aws_temp_secrets_file.json',
{"tempAccessKeyId", "tempSecretAccessKey", "tempSessionToken"},
),
('nonsense.txt', set()),
('ok_json.json', set()),
),
)
def test_get_aws_secrets_from_json_file(filename, expected_keys):
"""Test that reading secrets from files works."""
keys = get_aws_secrets_from_json_file(get_resource_path(filename))
assert keys == expected_keys
@pytest.mark.parametrize( @pytest.mark.parametrize(
('filename', 'expected_keys'), ('filename', 'expected_keys'),