4
4
# ------------------------------------
5
5
import os
6
6
import asyncio
7
+ import argparse
7
8
8
9
from azure .identity import ManagedIdentityCredential
9
10
from azure .identity .aio import ManagedIdentityCredential as AsyncManagedIdentityCredential
10
11
from azure .storage .blob import BlobServiceClient
11
12
from azure .storage .blob .aio import BlobServiceClient as AsyncBlobServiceClient
12
13
13
14
14
- def run_sync ():
15
- credential = ManagedIdentityCredential ()
15
+ def run_sync (identity_type = "system" ):
16
+ """Run synchronous authentication using the specified identity type.
17
+
18
+ :param str identity_type: The type of managed identity to use ("system" or "user")
19
+ """
20
+ if identity_type == "user" and os .environ .get ("IDENTITY_USER_DEFINED_IDENTITY_CLIENT_ID" ):
21
+ credential = ManagedIdentityCredential (client_id = os .environ .get ("IDENTITY_USER_DEFINED_IDENTITY_CLIENT_ID" ))
22
+ storage_name = os .environ .get ("IDENTITY_STORAGE_NAME_USER_ASSIGNED" , os .environ .get ("IDENTITY_STORAGE_NAME" ))
23
+ else :
24
+ # Default to system-assigned identity
25
+ credential = ManagedIdentityCredential ()
26
+ storage_name = os .environ .get ("IDENTITY_STORAGE_NAME" )
27
+
28
+ if not storage_name :
29
+ print ("Storage account name not found in environment variables" )
30
+ return False
16
31
17
32
client = BlobServiceClient (
18
- account_url = f"https://{ os . environ [ 'IDENTITY_STORAGE_NAME' ] } .blob.core.windows.net" ,
33
+ account_url = f"https://{ storage_name } .blob.core.windows.net" ,
19
34
credential = credential ,
20
35
)
21
36
22
37
containers = client .list_containers ()
23
38
for container in containers :
24
39
print (container ["name" ])
25
40
26
- if os .environ .get ("IDENTITY_USER_DEFINED_IDENTITY_CLIENT_ID" ) and os .environ .get (
27
- "IDENTITY_STORAGE_NAME_USER_ASSIGNED"
28
- ):
29
- credential = ManagedIdentityCredential (client_id = os .environ .get ("IDENTITY_USER_DEFINED_IDENTITY_CLIENT_ID" ))
30
-
31
- client = BlobServiceClient (
32
- account_url = f"https://{ os .environ ['IDENTITY_STORAGE_NAME_USER_ASSIGNED' ]} .blob.core.windows.net" ,
33
- credential = credential ,
34
- )
41
+ print (f"Successfully acquired token with ManagedIdentityCredential (identity_type={ identity_type } )" )
42
+ return True
35
43
36
- containers = client .list_containers ()
37
- for container in containers :
38
- print (container ["name" ])
39
44
40
- print (f"Successfully acquired token with ManagedIdentityCredential" )
45
+ async def run_async (identity_type = "system" ):
46
+ """Run asynchronous authentication using the specified identity type.
41
47
48
+ :param str identity_type: The type of managed identity to use ("system" or "user")
49
+ """
50
+ if identity_type == "user" and os .environ .get ("IDENTITY_USER_DEFINED_IDENTITY_CLIENT_ID" ):
51
+ credential = AsyncManagedIdentityCredential (
52
+ client_id = os .environ .get ("IDENTITY_USER_DEFINED_IDENTITY_CLIENT_ID" )
53
+ )
54
+ storage_name = os .environ .get ("IDENTITY_STORAGE_NAME_USER_ASSIGNED" , os .environ .get ("IDENTITY_STORAGE_NAME" ))
55
+ else :
56
+ # Default to system-assigned identity
57
+ credential = AsyncManagedIdentityCredential ()
58
+ storage_name = os .environ .get ("IDENTITY_STORAGE_NAME" )
42
59
43
- async def run_async ():
44
- credential = AsyncManagedIdentityCredential ()
60
+ if not storage_name :
61
+ print ("Storage account name not found in environment variables" )
62
+ return False
45
63
46
64
client = AsyncBlobServiceClient (
47
- account_url = f"https://{ os . environ [ 'IDENTITY_STORAGE_NAME' ] } .blob.core.windows.net" ,
65
+ account_url = f"https://{ storage_name } .blob.core.windows.net" ,
48
66
credential = credential ,
49
67
)
50
68
@@ -54,29 +72,44 @@ async def run_async():
54
72
await client .close ()
55
73
await credential .close ()
56
74
57
- if os .environ .get ("IDENTITY_USER_DEFINED_IDENTITY_CLIENT_ID" ) and os .environ .get (
58
- "IDENTITY_STORAGE_NAME_USER_ASSIGNED"
59
- ):
60
- credential = AsyncManagedIdentityCredential (
61
- client_id = os .environ .get ("IDENTITY_USER_DEFINED_IDENTITY_CLIENT_ID" )
62
- )
63
-
64
- client = AsyncBlobServiceClient (
65
- account_url = f"https://{ os .environ ['IDENTITY_STORAGE_NAME_USER_ASSIGNED' ]} .blob.core.windows.net" ,
66
- credential = credential ,
67
- )
68
-
69
- async for container in client .list_containers ():
70
- print (container ["name" ])
71
-
72
- await client .close ()
73
- await credential .close ()
74
-
75
- print ("Successfully acquired token with async ManagedIdentityCredential" )
75
+ print (f"Successfully acquired token with async ManagedIdentityCredential (identity_type={ identity_type } )" )
76
+ return True
76
77
77
78
78
79
if __name__ == "__main__" :
79
- run_sync ()
80
- asyncio .run (run_async ())
81
-
82
- print ("Passed!" )
80
+ parser = argparse .ArgumentParser (description = "Test managed identity authentication in AKS" )
81
+ parser .add_argument (
82
+ "--identity-type" ,
83
+ choices = ["system" , "user" , "both" ],
84
+ default = "both" ,
85
+ help = "Type of managed identity to use (system, user, or both)" ,
86
+ )
87
+ args = parser .parse_args ()
88
+
89
+ success = True
90
+
91
+ if args .identity_type in ["system" , "both" ]:
92
+ print ("Testing with system-assigned managed identity:" )
93
+ if not run_sync ("system" ):
94
+ success = False
95
+
96
+ if args .identity_type in ["user" , "both" ] and os .environ .get ("IDENTITY_USER_DEFINED_IDENTITY_CLIENT_ID" ):
97
+ print ("\n Testing with user-assigned managed identity:" )
98
+ if not run_sync ("user" ):
99
+ success = False
100
+
101
+ if args .identity_type in ["system" , "both" ]:
102
+ print ("\n Testing with async system-assigned managed identity:" )
103
+ if not asyncio .run (run_async ("system" )):
104
+ success = False
105
+
106
+ if args .identity_type in ["user" , "both" ] and os .environ .get ("IDENTITY_USER_DEFINED_IDENTITY_CLIENT_ID" ):
107
+ print ("\n Testing with async user-assigned managed identity:" )
108
+ if not asyncio .run (run_async ("user" )):
109
+ success = False
110
+
111
+ if success :
112
+ print ("\n Passed!" )
113
+ else :
114
+ print ("\n Failed!" )
115
+ exit (1 )
0 commit comments