Enhance OAuth security, PKCE, and state validation

- Enforced stricter PKCE requirements by rejecting insecure 'plain' method for public clients.
- Transitioned client secret hashing to bcrypt for improved security and migration compatibility.
- Added constant-time comparison for state parameter validation to prevent timing attacks.
- Improved error handling and logging for OAuth workflows, including malformed headers and invalid scopes.
- Upgraded Google OIDC token validation to verify both signature and nonce.
- Refactored OAuth service methods and schemas for better readability, consistency, and compliance with RFC specifications.
This commit is contained in:
Felipe Cardoso
2025-11-26 00:14:26 +01:00
parent 0ea428b718
commit dc875c5c95
6 changed files with 284 additions and 159 deletions

View File

@@ -196,28 +196,27 @@ async def authorize(
valid_scopes = provider_service.validate_scopes(client, requested_scopes)
except provider_service.InvalidScopeError as e:
# Redirect with error
error_params = {
"error": e.error,
"error_description": e.error_description,
}
scope_error_params: dict[str, str] = {"error": e.error}
if e.error_description:
scope_error_params["error_description"] = e.error_description
if state:
error_params["state"] = state
scope_error_params["state"] = state
return RedirectResponse(
url=f"{redirect_uri}?{urlencode(error_params)}",
url=f"{redirect_uri}?{urlencode(scope_error_params)}",
status_code=status.HTTP_302_FOUND,
)
# Public clients MUST use PKCE
if client.client_type == "public":
if not code_challenge or code_challenge_method != "S256":
error_params = {
pkce_error_params: dict[str, str] = {
"error": "invalid_request",
"error_description": "PKCE with S256 is required for public clients",
}
if state:
error_params["state"] = state
pkce_error_params["state"] = state
return RedirectResponse(
url=f"{redirect_uri}?{urlencode(error_params)}",
url=f"{redirect_uri}?{urlencode(pkce_error_params)}",
status_code=status.HTTP_302_FOUND,
)
@@ -226,16 +225,18 @@ async def authorize(
# Store authorization request in session and redirect to login
# The frontend will handle the return URL
login_url = f"{settings.FRONTEND_URL}/login"
return_params = urlencode({
"oauth_authorize": "true",
"client_id": client_id,
"redirect_uri": redirect_uri,
"scope": " ".join(valid_scopes),
"state": state,
"code_challenge": code_challenge or "",
"code_challenge_method": code_challenge_method or "",
"nonce": nonce or "",
})
return_params = urlencode(
{
"oauth_authorize": "true",
"client_id": client_id,
"redirect_uri": redirect_uri,
"scope": " ".join(valid_scopes),
"state": state,
"code_challenge": code_challenge or "",
"code_challenge_method": code_challenge_method or "",
"nonce": nonce or "",
}
)
return RedirectResponse(
url=f"{login_url}?return_to=/auth/consent?{return_params}",
status_code=status.HTTP_302_FOUND,
@@ -248,16 +249,18 @@ async def authorize(
if not has_consent:
# Redirect to consent page
consent_params = urlencode({
"client_id": client_id,
"client_name": client.client_name,
"redirect_uri": redirect_uri,
"scope": " ".join(valid_scopes),
"state": state,
"code_challenge": code_challenge or "",
"code_challenge_method": code_challenge_method or "",
"nonce": nonce or "",
})
consent_params = urlencode(
{
"client_id": client_id,
"client_name": client.client_name,
"redirect_uri": redirect_uri,
"scope": " ".join(valid_scopes),
"state": state,
"code_challenge": code_challenge or "",
"code_challenge_method": code_challenge_method or "",
"nonce": nonce or "",
}
)
return RedirectResponse(
url=f"{settings.FRONTEND_URL}/auth/consent?{consent_params}",
status_code=status.HTTP_302_FOUND,
@@ -277,10 +280,9 @@ async def authorize(
nonce=nonce,
)
except provider_service.OAuthProviderError as e:
error_params = {
"error": e.error,
"error_description": e.error_description,
}
error_params: dict[str, str] = {"error": e.error}
if e.error_description:
error_params["error_description"] = e.error_description
if state:
error_params["state"] = state
return RedirectResponse(
@@ -340,14 +342,14 @@ async def submit_consent(
# If user denied, redirect with error
if not approved:
error_params = {
denied_params: dict[str, str] = {
"error": "access_denied",
"error_description": "User denied authorization",
}
if state:
error_params["state"] = state
denied_params["state"] = state
return RedirectResponse(
url=f"{redirect_uri}?{urlencode(error_params)}",
url=f"{redirect_uri}?{urlencode(denied_params)}",
status_code=status.HTTP_302_FOUND,
)
@@ -356,9 +358,7 @@ async def submit_consent(
valid_scopes = provider_service.validate_scopes(client, granted_scopes)
# Record consent
await provider_service.grant_consent(
db, current_user.id, client_id, valid_scopes
)
await provider_service.grant_consent(db, current_user.id, client_id, valid_scopes)
# Generate authorization code
try:
@@ -374,10 +374,9 @@ async def submit_consent(
nonce=nonce,
)
except provider_service.OAuthProviderError as e:
error_params = {
"error": e.error,
"error_description": e.error_description,
}
error_params: dict[str, str] = {"error": e.error}
if e.error_description:
error_params["error_description"] = e.error_description
if state:
error_params["state"] = state
return RedirectResponse(
@@ -439,6 +438,7 @@ async def token(
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Basic "):
import base64
try:
decoded = base64.b64decode(auth_header[6:]).decode()
client_id, client_secret = decoded.split(":", 1)
@@ -549,6 +549,7 @@ async def revoke(
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Basic "):
import base64
try:
decoded = base64.b64decode(auth_header[6:]).decode()
client_id, client_secret = decoded.split(":", 1)
@@ -619,6 +620,7 @@ async def introspect(
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Basic "):
import base64
try:
decoded = base64.b64decode(auth_header[6:]).decode()
client_id, client_secret = decoded.split(":", 1)
@@ -804,7 +806,9 @@ async def list_my_consents(
"client_id": consent.client_id,
"client_name": client.client_name,
"client_description": client.client_description,
"granted_scopes": consent.granted_scopes.split() if consent.granted_scopes else [],
"granted_scopes": consent.granted_scopes.split()
if consent.granted_scopes
else [],
"granted_at": consent.created_at.isoformat(),
}
for consent, client in rows