|
22 | 22 | teardown_function as default_teardown_function,
|
23 | 23 | set_key_value_in_config,
|
24 | 24 | st_init_common_args,
|
| 25 | + reset, |
25 | 26 | )
|
26 | 27 |
|
27 | 28 | from supertokens_python.recipe.session.jwks import (
|
@@ -623,3 +624,78 @@ def callback():
|
623 | 624 | # With cache lifetime being 2s, we expect the cache to be missed 5 times
|
624 | 625 | assert next(not_returned_from_cache_count) == 1 + 5 # 1 original + 5 misses
|
625 | 626 | JWKSConfig.update(original_jwks_config)
|
| 627 | + |
| 628 | + |
| 629 | +from pytest import fixture |
| 630 | +from fastapi import FastAPI, Request, Depends |
| 631 | +from fastapi.testclient import TestClient |
| 632 | +from supertokens_python.framework.fastapi import get_middleware |
| 633 | +from supertokens_python.recipe.session.framework.fastapi import verify_session |
| 634 | +from supertokens_python.recipe.session.asyncio import create_new_session |
| 635 | +from supertokens_python.recipe.session import SessionContainer |
| 636 | + |
| 637 | + |
| 638 | +@fixture(scope="function") |
| 639 | +async def client(): |
| 640 | + app = FastAPI() |
| 641 | + app.add_middleware(get_middleware()) |
| 642 | + |
| 643 | + @app.get("/login") |
| 644 | + async def login(request: Request): # type: ignore |
| 645 | + user_id = "test" |
| 646 | + s = await create_new_session(request, user_id, {}, {}) |
| 647 | + return {"jwt": s.get_access_token()} |
| 648 | + |
| 649 | + @app.get("/sessioninfo") |
| 650 | + async def info(s: SessionContainer = Depends(verify_session())): # type: ignore |
| 651 | + user_id = s.get_user_id() |
| 652 | + return {"user_id": user_id} |
| 653 | + |
| 654 | + return TestClient(app) |
| 655 | + |
| 656 | + |
| 657 | +async def test_session_verification_of_jwt_with_dynamic_signing_key_mode_works_as_expected( |
| 658 | + client: TestClient, |
| 659 | +): |
| 660 | + args = get_st_init_args( |
| 661 | + recipe_list=[session.init(use_dynamic_access_token_signing_key=False)] |
| 662 | + ) |
| 663 | + init(**args) # type: ignore |
| 664 | + start_st() |
| 665 | + |
| 666 | + # Create a session: |
| 667 | + res = client.get("/login") |
| 668 | + assert res.status_code == 200 |
| 669 | + |
| 670 | + jwt_with_static_key = res.json()["jwt"] |
| 671 | + |
| 672 | + res = client.get( |
| 673 | + "/sessioninfo", headers={"Authorization": f"Bearer {jwt_with_static_key}"} |
| 674 | + ) |
| 675 | + assert res.status_code == 200 |
| 676 | + assert res.json()["user_id"] == "test" |
| 677 | + |
| 678 | + reset(stop_core=False) |
| 679 | + |
| 680 | + # initalize again with use_dynamic_access_token_signing_key=True |
| 681 | + args = get_st_init_args( |
| 682 | + recipe_list=[session.init(use_dynamic_access_token_signing_key=True)] |
| 683 | + ) |
| 684 | + init(**args) # type: ignore |
| 685 | + |
| 686 | + from supertokens_python.recipe.session.exceptions import TryRefreshTokenError |
| 687 | + |
| 688 | + res = client.get( |
| 689 | + "/sessioninfo", headers={"Authorization": f"Bearer {jwt_with_static_key}"} |
| 690 | + ) |
| 691 | + assert res.status_code == 401 |
| 692 | + assert res.json() == {"message": "try refresh token"} |
| 693 | + |
| 694 | + try: |
| 695 | + res = await get_session_without_request_response(jwt_with_static_key) |
| 696 | + assert False |
| 697 | + except TryRefreshTokenError as e: |
| 698 | + assert ( |
| 699 | + str(e) |
| 700 | + == "The access token doesn't match the useDynamicAccessTokenSigningKey setting" |
| 701 | + ) |
0 commit comments