|
16 | 16 |
|
17 | 17 | import json
|
18 | 18 | import logging
|
19 |
| -from os import path |
| 19 | +from os import environ, path |
20 | 20 | import re
|
21 | 21 | import subprocess
|
22 | 22 |
|
23 | 23 | from google.auth import exceptions
|
24 | 24 |
|
25 | 25 | CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"
|
| 26 | +_CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json" |
| 27 | +_CERTIFICATE_CONFIGURATION_ENV = "GOOGLE_API_CERTIFICATE_CONFIG" |
26 | 28 | _CERT_PROVIDER_COMMAND = "cert_provider_command"
|
27 | 29 | _CERT_REGEX = re.compile(
|
28 | 30 | b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL
|
@@ -63,26 +65,150 @@ def _check_dca_metadata_path(metadata_path):
|
63 | 65 | return metadata_path
|
64 | 66 |
|
65 | 67 |
|
66 |
| -def _read_dca_metadata_file(metadata_path): |
67 |
| - """Loads context aware metadata from the given path. |
| 68 | +def _load_json_file(path): |
| 69 | + """Reads and loads JSON from the given path. Used to read both X509 workload certificate and |
| 70 | + secure connect configurations. |
68 | 71 |
|
69 | 72 | Args:
|
70 |
| - metadata_path (str): context aware metadata path. |
| 73 | + path (str): the path to read from. |
71 | 74 |
|
72 | 75 | Returns:
|
73 |
| - Dict[str, str]: The metadata. |
| 76 | + Dict[str, str]: The JSON stored at the file. |
74 | 77 |
|
75 | 78 | Raises:
|
76 |
| - google.auth.exceptions.ClientCertError: If failed to parse metadata as JSON. |
| 79 | + google.auth.exceptions.ClientCertError: If failed to parse the file as JSON. |
77 | 80 | """
|
78 | 81 | try:
|
79 |
| - with open(metadata_path) as f: |
80 |
| - metadata = json.load(f) |
| 82 | + with open(path) as f: |
| 83 | + json_data = json.load(f) |
81 | 84 | except ValueError as caught_exc:
|
82 | 85 | new_exc = exceptions.ClientCertError(caught_exc)
|
83 | 86 | raise new_exc from caught_exc
|
84 | 87 |
|
85 |
| - return metadata |
| 88 | + return json_data |
| 89 | + |
| 90 | + |
| 91 | +def _get_workload_cert_and_key(certificate_config_path=None): |
| 92 | + """Read the workload identity cert and key files specified in the certificate config provided. |
| 93 | + If no config path is provided, check the environment variable: "GOOGLE_API_CERTIFICATE_CONFIG" |
| 94 | + first, then the well known gcloud location: "~/.config/gcloud/certificate_config.json". |
| 95 | +
|
| 96 | + Args: |
| 97 | + certificate_config_path (string): The certificate config path. If no path is provided, |
| 98 | + the environment variable will be checked first, then the well known gcloud location. |
| 99 | +
|
| 100 | + Returns: |
| 101 | + Tuple[Optional[bytes], Optional[bytes]]: client certificate bytes in PEM format and key |
| 102 | + bytes in PEM format. |
| 103 | +
|
| 104 | + Raises: |
| 105 | + google.auth.exceptions.ClientCertError: if problems occurs when retrieving |
| 106 | + the certificate or key information. |
| 107 | + """ |
| 108 | + absolute_path = _get_cert_config_path(certificate_config_path) |
| 109 | + if absolute_path is None: |
| 110 | + return None, None |
| 111 | + data = _load_json_file(absolute_path) |
| 112 | + |
| 113 | + if "cert_configs" not in data: |
| 114 | + raise exceptions.ClientCertError( |
| 115 | + 'Certificate config file {} is in an invalid format, a "cert configs" object is expected'.format( |
| 116 | + absolute_path |
| 117 | + ) |
| 118 | + ) |
| 119 | + cert_configs = data["cert_configs"] |
| 120 | + |
| 121 | + if "workload" not in cert_configs: |
| 122 | + raise exceptions.ClientCertError( |
| 123 | + 'Certificate config file {} is in an invalid format, a "workload" cert config is expected'.format( |
| 124 | + absolute_path |
| 125 | + ) |
| 126 | + ) |
| 127 | + workload = cert_configs["workload"] |
| 128 | + |
| 129 | + if "cert_path" not in workload: |
| 130 | + raise exceptions.ClientCertError( |
| 131 | + 'Certificate config file {} is in an invalid format, a "cert_path" is expected in the workload cert config'.format( |
| 132 | + absolute_path |
| 133 | + ) |
| 134 | + ) |
| 135 | + cert_path = workload["cert_path"] |
| 136 | + |
| 137 | + if "key_path" not in workload: |
| 138 | + raise exceptions.ClientCertError( |
| 139 | + 'Certificate config file {} is in an invalid format, a "key_path" is expected in the workload cert config'.format( |
| 140 | + absolute_path |
| 141 | + ) |
| 142 | + ) |
| 143 | + key_path = workload["key_path"] |
| 144 | + |
| 145 | + return _read_cert_and_key_files(cert_path, key_path) |
| 146 | + |
| 147 | + |
| 148 | +def _get_cert_config_path(certificate_config_path=None): |
| 149 | + """Gets the certificate configuration full path using the following order of precedence: |
| 150 | +
|
| 151 | + 1: Explicit override, if set |
| 152 | + 2: Environment variable, if set |
| 153 | + 3: Well-known location |
| 154 | +
|
| 155 | + Returns "None" if the selected config file does not exist. |
| 156 | +
|
| 157 | + Args: |
| 158 | + certificate_config_path (string): The certificate config path. If provided, the well known |
| 159 | + location and environment variable will be ignored. |
| 160 | +
|
| 161 | + Returns: |
| 162 | + The absolute path of the certificate config file, and None if the file does not exist. |
| 163 | + """ |
| 164 | + |
| 165 | + if certificate_config_path is None: |
| 166 | + env_path = environ.get(_CERTIFICATE_CONFIGURATION_ENV, None) |
| 167 | + if env_path is not None and env_path != "": |
| 168 | + certificate_config_path = env_path |
| 169 | + else: |
| 170 | + certificate_config_path = _CERTIFICATE_CONFIGURATION_DEFAULT_PATH |
| 171 | + |
| 172 | + certificate_config_path = path.expanduser(certificate_config_path) |
| 173 | + if not path.exists(certificate_config_path): |
| 174 | + return None |
| 175 | + return certificate_config_path |
| 176 | + |
| 177 | + |
| 178 | +def _read_cert_and_key_files(cert_path, key_path): |
| 179 | + cert_data = _read_cert_file(cert_path) |
| 180 | + key_data = _read_key_file(key_path) |
| 181 | + |
| 182 | + return cert_data, key_data |
| 183 | + |
| 184 | + |
| 185 | +def _read_cert_file(cert_path): |
| 186 | + with open(cert_path, "rb") as cert_file: |
| 187 | + cert_data = cert_file.read() |
| 188 | + |
| 189 | + cert_match = re.findall(_CERT_REGEX, cert_data) |
| 190 | + if len(cert_match) != 1: |
| 191 | + raise exceptions.ClientCertError( |
| 192 | + "Certificate file {} is in an invalid format, a single PEM formatted certificate is expected".format( |
| 193 | + cert_path |
| 194 | + ) |
| 195 | + ) |
| 196 | + return cert_match[0] |
| 197 | + |
| 198 | + |
| 199 | +def _read_key_file(key_path): |
| 200 | + with open(key_path, "rb") as key_file: |
| 201 | + key_data = key_file.read() |
| 202 | + |
| 203 | + key_match = re.findall(_KEY_REGEX, key_data) |
| 204 | + if len(key_match) != 1: |
| 205 | + raise exceptions.ClientCertError( |
| 206 | + "Private key file {} is in an invalid format, a single PEM formatted private key is expected".format( |
| 207 | + key_path |
| 208 | + ) |
| 209 | + ) |
| 210 | + |
| 211 | + return key_match[0] |
86 | 212 |
|
87 | 213 |
|
88 | 214 | def _run_cert_provider_command(command, expect_encrypted_key=False):
|
@@ -163,7 +289,7 @@ def get_client_ssl_credentials(
|
163 | 289 | metadata_path = _check_dca_metadata_path(context_aware_metadata_path)
|
164 | 290 |
|
165 | 291 | if metadata_path:
|
166 |
| - metadata_json = _read_dca_metadata_file(metadata_path) |
| 292 | + metadata_json = _load_json_file(metadata_path) |
167 | 293 |
|
168 | 294 | if _CERT_PROVIDER_COMMAND not in metadata_json:
|
169 | 295 | raise exceptions.ClientCertError("Cert provider command is not found")
|
|
0 commit comments