Skip to content

fix: Anti csrf should happen only when access token is passed while session is optional #369

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [unreleased]


## Fixes

- Anti csrf check should happen only when access token is passed while session is optional

## [0.14.6] - 2023-06-22

### Changes and fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ async def get_session_from_request(
do_anti_csrf_check = normalise_http_method(request.method()) != "get"
if request_transfer_method == "header":
do_anti_csrf_check = False
if request_access_token is None:
do_anti_csrf_check = False

if do_anti_csrf_check and config.anti_csrf == "VIA_CUSTOM_HEADER":
if config.anti_csrf == "VIA_CUSTOM_HEADER":
Expand Down
63 changes: 60 additions & 3 deletions tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
# under the License.

from datetime import datetime, timedelta
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
from unittest.mock import MagicMock

from fastapi import FastAPI
from fastapi import FastAPI, Depends
from fastapi.requests import Request
from fastapi.responses import JSONResponse
from fastapi.testclient import TestClient
from pytest import fixture, mark

Expand All @@ -41,7 +42,10 @@
merge_into_access_token_payload,
update_session_data_in_database,
)
from supertokens_python.recipe.session.interfaces import RecipeInterface
from supertokens_python.recipe.session.interfaces import (
RecipeInterface,
SessionContainer,
)
from supertokens_python.recipe.session.jwt import (
parse_jwt_without_signature_verification,
)
Expand All @@ -52,6 +56,7 @@
refresh_session,
revoke_session,
)
from supertokens_python.recipe.session.framework.fastapi import verify_session
from tests.utils import clean_st, reset, setup_st, start_st

pytestmark = mark.asyncio
Expand Down Expand Up @@ -251,6 +256,12 @@ async def create_api(request: Request): # type: ignore
await async_create_new_session(request, "test-user", {}, {})
return ""

@app.post("/sessioninfo-optional")
async def _session_info(s: Optional[SessionContainer] = Depends(verify_session(session_required=False))): # type: ignore
if s is not None:
return JSONResponse({"session": s.get_handle(), "user_id": s.get_user_id()})
return JSONResponse({"message": "no session"})

return TestClient(app)


Expand Down Expand Up @@ -724,3 +735,49 @@ async def test_that_verify_session_doesnt_always_call_core():
AllowedProcessStates.CALLING_SERVICE_IN_VERIFY
in ProcessState.get_instance().history
) # Core got called this time


async def test_anti_csrf_header_via_custom_header_check_happens_only_when_access_token_is_provided(
driver_config_client: TestClient,
):
args = get_st_init_args([session.init(anti_csrf="VIA_CUSTOM_HEADER", get_token_transfer_method=lambda *_: "cookie")]) # type: ignore
init(**args) # type: ignore
start_st()

response = driver_config_client.post("/create")
assert response.status_code == 200

# With access token:
# without RID:
response = driver_config_client.post("/sessioninfo-optional")
assert response.status_code == 401
assert response.json() == {"message": "try refresh token"}

# with RID:
response = driver_config_client.post(
"/sessioninfo-optional",
headers={
"rid": "session",
},
)
assert response.status_code == 200
assert list(response.json()) == ["session", "user_id"]

# Clear access tokens:
driver_config_client.cookies.clear()

# Without access tokens:
# without RID:
response = driver_config_client.post("/sessioninfo-optional")
assert response.status_code == 200
assert response.json() == {"message": "no session"}

# with RID:
response = driver_config_client.post(
"/sessioninfo-optional",
headers={
"rid": "session",
},
)
assert response.status_code == 200
assert response.json() == {"message": "no session"}