|
| 1 | +""" |
| 2 | +The configuration file would look like this (sans those // comments): |
| 3 | +{ |
| 4 | + "tenant": "your_tenant_name", |
| 5 | + // Your target tenant, DNS name |
| 6 | + "client_id": "your_client_id", |
| 7 | + // Target app ID in Azure AD |
| 8 | + "scope": ["https://graph.microsoft.com/.default"], |
| 9 | + // Specific to Client Credentials Grant i.e. acquire_token_for_client(), |
| 10 | + // you don't specify, in the code, the individual scopes you want to access. |
| 11 | + // Instead, you statically declared them when registering your application. |
| 12 | + // Therefore the only possible scope is "resource/.default" |
| 13 | + // (here "https://graph.microsoft.com/.default") |
| 14 | + // which means "the static permissions defined in the application". |
| 15 | + "vault_tenant": "your_vault_tenant_name", |
| 16 | + // Your Vault tenant may be different to your target tenant |
| 17 | + // If that's not the case, you can set this to the same |
| 18 | + // as "tenant" |
| 19 | + "vault_clientid": "your_vault_client_id", |
| 20 | + // Client ID of your vault app in your vault tenant |
| 21 | + "vault_clientsecret": "your_vault_client_secret", |
| 22 | + // Secret for your vault app |
| 23 | + "vault_url": "your_vault_url", |
| 24 | + // URL of your vault app |
| 25 | + "cert": "your_cert_name", |
| 26 | + // Name of your certificate in your vault |
| 27 | + "cert_thumb": "your_cert_thumbprint", |
| 28 | + // Thumbprint of your certificate |
| 29 | + "endpoint": "https://graph.microsoft.com/v1.0/users" |
| 30 | + // For this resource to work, you need to visit Application Permissions |
| 31 | + // page in portal, declare scope User.Read.All, which needs admin consent |
| 32 | + // https://github.com/Azure-Samples/ms-identity-python-daemon/blob/master/2-Call-MsGraph-WithCertificate/README.md |
| 33 | +} |
| 34 | +You can then run this sample with a JSON configuration file: |
| 35 | + python sample.py parameters.json |
| 36 | +""" |
| 37 | + |
| 38 | +import base64 |
| 39 | +import json |
| 40 | +import logging |
| 41 | +import requests |
| 42 | +import sys |
| 43 | +import time |
| 44 | +import uuid |
| 45 | +import msal |
| 46 | + |
| 47 | +# Optional logging |
| 48 | +# logging.basicConfig(level=logging.DEBUG) # Enable DEBUG log for entire script |
| 49 | +# logging.getLogger("msal").setLevel(logging.INFO) # Optionally disable MSAL DEBUG logs |
| 50 | + |
| 51 | +from azure.keyvault import KeyVaultClient, KeyVaultAuthentication |
| 52 | +from azure.common.credentials import ServicePrincipalCredentials |
| 53 | +from cryptography.hazmat.backends import default_backend |
| 54 | +from cryptography.hazmat.primitives import hashes |
| 55 | + |
| 56 | +config = json.load(open(sys.argv[1])) |
| 57 | + |
| 58 | +def auth_vault_callback(server, resource, scope): |
| 59 | + credentials = ServicePrincipalCredentials( |
| 60 | + client_id=config['vault_clientid'], |
| 61 | + secret=config['vault_clientsecret'], |
| 62 | + tenant=config['vault_tenant'], |
| 63 | + resource='https://vault.azure.net' |
| 64 | + ) |
| 65 | + token = credentials.token |
| 66 | + return token['token_type'], token['access_token'] |
| 67 | + |
| 68 | + |
| 69 | +def make_vault_jwt(): |
| 70 | + |
| 71 | + header = { |
| 72 | + 'alg': 'RS256', |
| 73 | + 'typ': 'JWT', |
| 74 | + 'x5t': base64.b64encode( |
| 75 | + config['cert_thumb'].decode('hex')) |
| 76 | + } |
| 77 | + header_b64 = base64.b64encode(json.dumps(header).encode('utf-8')) |
| 78 | + |
| 79 | + body = { |
| 80 | + 'aud': "https://login.microsoftonline.com/%s/oauth2/token" % |
| 81 | + config['tenant'], |
| 82 | + 'exp': (int(time.time()) + 600), |
| 83 | + 'iss': config['client_id'], |
| 84 | + 'jti': str(uuid.uuid4()), |
| 85 | + 'nbf': int(time.time()), |
| 86 | + 'sub': config['client_id'] |
| 87 | + } |
| 88 | + body_b64 = base64.b64encode(json.dumps(body).encode('utf-8')) |
| 89 | + |
| 90 | + full_b64 = b'.'.join([header_b64, body_b64]) |
| 91 | + |
| 92 | + client = KeyVaultClient(KeyVaultAuthentication(auth_vault_callback)) |
| 93 | + chosen_hash = hashes.SHA256() |
| 94 | + hasher = hashes.Hash(chosen_hash, default_backend()) |
| 95 | + hasher.update(full_b64) |
| 96 | + digest = hasher.finalize() |
| 97 | + signed_digest = client.sign(config['vault_url'], |
| 98 | + config['cert'], '', 'RS256', |
| 99 | + digest).result |
| 100 | + |
| 101 | + full_token = b'.'.join([full_b64, base64.b64encode(signed_digest)]) |
| 102 | + |
| 103 | + return full_token |
| 104 | + |
| 105 | + |
| 106 | +authority = "https://login.microsoftonline.com/%s" % config['tenant'] |
| 107 | + |
| 108 | +app = msal.ConfidentialClientApplication( |
| 109 | + config['client_id'], authority=authority, client_credential={"client_assertion": make_vault_jwt()} |
| 110 | + ) |
| 111 | + |
| 112 | +# The pattern to acquire a token looks like this. |
| 113 | +result = None |
| 114 | + |
| 115 | +# Firstly, looks up a token from cache |
| 116 | +# Since we are looking for token for the current app, NOT for an end user, |
| 117 | +# notice we give account parameter as None. |
| 118 | +result = app.acquire_token_silent(config["scope"], account=None) |
| 119 | + |
| 120 | +if not result: |
| 121 | + logging.info("No suitable token exists in cache. Let's get a new one from AAD.") |
| 122 | + result = app.acquire_token_for_client(scopes=config["scope"]) |
| 123 | + |
| 124 | +if "access_token" in result: |
| 125 | + # Calling graph using the access token |
| 126 | + graph_data = requests.get( # Use token to call downstream service |
| 127 | + config["endpoint"], |
| 128 | + headers={'Authorization': 'Bearer ' + result['access_token']},).json() |
| 129 | + print("Graph API call result: %s" % json.dumps(graph_data, indent=2)) |
| 130 | +else: |
| 131 | + print(result.get("error")) |
| 132 | + print(result.get("error_description")) |
| 133 | + print(result.get("correlation_id")) # You may need this when reporting a bug |
| 134 | + |
0 commit comments