forked from cardosofelipe/fast-next-template
- Enforced stricter PKCE requirements by rejecting insecure 'plain' method for public clients. - Transitioned client secret hashing to bcrypt for improved security and migration compatibility. - Added constant-time comparison for state parameter validation to prevent timing attacks. - Improved error handling and logging for OAuth workflows, including malformed headers and invalid scopes. - Upgraded Google OIDC token validation to verify both signature and nonce. - Refactored OAuth service methods and schemas for better readability, consistency, and compliance with RFC specifications.
396 lines
14 KiB
Python
396 lines
14 KiB
Python
"""
|
|
Pydantic schemas for OAuth authentication.
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from uuid import UUID
|
|
|
|
from pydantic import BaseModel, ConfigDict, Field
|
|
|
|
# ============================================================================
|
|
# OAuth Provider Info (for frontend to display available providers)
|
|
# ============================================================================
|
|
|
|
|
|
class OAuthProviderInfo(BaseModel):
    """Describe one OAuth provider that can be offered to the user.

    ``provider`` and ``name`` are required; ``icon`` is optional.
    """

    # Machine-readable provider key.
    provider: str = Field(
        default=..., description="Provider identifier (google, github)"
    )
    # Label suitable for display.
    name: str = Field(default=..., description="Human-readable provider name")
    # Left as None when no icon is supplied.
    icon: str | None = Field(default=None, description="Icon identifier for frontend")
|
|
|
|
|
|
class OAuthProvidersResponse(BaseModel):
    """Response body listing the enabled OAuth providers.

    ``enabled`` is required; ``providers`` defaults to an empty list.
    """

    enabled: bool = Field(
        default=..., description="Whether OAuth is globally enabled"
    )
    # default_factory gives each instance its own fresh list.
    providers: list[OAuthProviderInfo] = Field(
        default_factory=list,
        description="List of enabled providers",
    )

    # Example payload attached to the generated JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "enabled": True,
                "providers": [
                    {"provider": "google", "name": "Google", "icon": "google"},
                    {"provider": "github", "name": "GitHub", "icon": "github"},
                ],
            }
        }
    )
|
|
|
|
|
|
# ============================================================================
|
|
# OAuth Account (linked provider accounts)
|
|
# ============================================================================
|
|
|
|
|
|
class OAuthAccountBase(BaseModel):
    """Fields shared by every OAuth account schema.

    Carries the provider name (required, at most 50 characters) and the
    optional e-mail address reported by that provider (at most 255 characters).
    """

    provider: str = Field(
        default=..., max_length=50, description="OAuth provider name"
    )
    provider_email: str | None = Field(
        default=None,
        max_length=255,
        description="Email from OAuth provider",
    )
|
|
|
|
|
|
class OAuthAccountCreate(OAuthAccountBase):
    """Input schema for linking an OAuth account to a user (internal use).

    Extends :class:`OAuthAccountBase` with the owning user, the provider-side
    user identifier and optional token material.
    """

    # Required: the user the provider account is attached to.
    user_id: UUID
    # Required: identifier of the account at the provider (max 255 chars).
    provider_user_id: str = Field(default=..., max_length=255)

    # Token fields are all optional and default to None.
    access_token_encrypted: str | None = None
    refresh_token_encrypted: str | None = None
    token_expires_at: datetime | None = None
|
|
|
|
|
|
class OAuthAccountResponse(OAuthAccountBase):
    """Client-facing view of a linked OAuth account.

    Adds the record ``id`` and ``created_at`` timestamp to the base fields.
    """

    id: UUID
    created_at: datetime

    model_config = ConfigDict(
        # Allow building the model from attribute-bearing objects.
        from_attributes=True,
        # Example payload attached to the generated JSON schema.
        json_schema_extra={
            "example": {
                "id": "123e4567-e89b-12d3-a456-426614174000",
                "provider": "google",
                "provider_email": "user@gmail.com",
                "created_at": "2025-11-24T12:00:00Z",
            }
        },
    )
|
|
|
|
|
|
class OAuthAccountsListResponse(BaseModel):
    """Response body wrapping the list of linked OAuth accounts."""

    # Required; there is no default, so callers must pass a list explicitly.
    accounts: list[OAuthAccountResponse]

    # Example payload attached to the generated JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "accounts": [
                    {
                        "id": "123e4567-e89b-12d3-a456-426614174000",
                        "provider": "google",
                        "provider_email": "user@gmail.com",
                        "created_at": "2025-11-24T12:00:00Z",
                    }
                ]
            }
        }
    )
|
|
|
|
|
|
# ============================================================================
|
|
# OAuth Flow (authorization, callback, etc.)
|
|
# ============================================================================
|
|
|
|
|
|
class OAuthAuthorizeRequest(BaseModel):
    """Parameters accepted when starting an OAuth authorization.

    ``provider`` is required, ``redirect_uri`` is optional, and ``mode``
    defaults to ``"login"``.
    """

    provider: str = Field(
        default=..., description="OAuth provider (google, github)"
    )
    redirect_uri: str | None = Field(
        default=None,
        description="Frontend callback URL after OAuth",
    )
    # The pattern restricts mode to exactly: login, register or link.
    mode: str = Field(
        default="login",
        description="OAuth mode: login, register, or link",
        pattern="^(login|register|link)$",
    )
|
|
|
|
|
|
class OAuthCallbackRequest(BaseModel):
    """Parameters received on the OAuth callback; both are required."""

    code: str = Field(
        default=..., description="Authorization code from provider"
    )
    state: str = Field(
        default=..., description="State parameter for CSRF protection"
    )
|
|
|
|
|
|
class OAuthCallbackResponse(BaseModel):
    """Token payload returned once OAuth authentication has succeeded.

    ``access_token``, ``refresh_token`` and ``expires_in`` are required.
    ``token_type`` defaults to ``"bearer"`` and ``is_new_user`` to ``False``.
    """

    access_token: str = Field(default=..., description="JWT access token")
    refresh_token: str = Field(default=..., description="JWT refresh token")
    token_type: str = Field(default="bearer")
    expires_in: int = Field(default=..., description="Token expiration in seconds")
    is_new_user: bool = Field(
        default=False,
        description="Whether a new user was created",
    )

    # Example payload attached to the generated JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
                "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
                "token_type": "bearer",
                "expires_in": 900,
                "is_new_user": False,
            }
        }
    )
|
|
|
|
|
|
class OAuthUnlinkResponse(BaseModel):
    """Result returned after an OAuth account unlink; both fields are required."""

    success: bool = Field(
        default=..., description="Whether the unlink was successful"
    )
    message: str = Field(default=..., description="Status message")

    # Example payload attached to the generated JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {"success": True, "message": "Google account unlinked"}
        }
    )
|
|
|
|
|
|
# ============================================================================
|
|
# OAuth State (CSRF protection - internal use)
|
|
# ============================================================================
|
|
|
|
|
|
class OAuthStateCreate(BaseModel):
    """Input schema for persisting an OAuth state record (internal use).

    ``state``, ``provider`` and ``expires_at`` are required; the remaining
    fields are optional and default to ``None``.
    """

    state: str = Field(default=..., max_length=255)
    # Optional values stored with the state, each length-capped.
    code_verifier: str | None = Field(default=None, max_length=128)
    nonce: str | None = Field(default=None, max_length=255)
    provider: str = Field(default=..., max_length=50)
    redirect_uri: str | None = Field(default=None, max_length=500)
    # Optional user reference; None when no user is associated.
    user_id: UUID | None = None
    # Required expiry timestamp for the state record.
    expires_at: datetime
|
|
|
|
|
|
# ============================================================================
|
|
# OAuth Client (Provider Mode - MCP clients)
|
|
# ============================================================================
|
|
|
|
|
|
class OAuthClientBase(BaseModel):
    """Fields shared by every OAuth client schema.

    Only ``client_name`` is required. The description defaults to ``None``
    and both list fields default to a fresh empty list.
    """

    client_name: str = Field(
        default=..., max_length=255, description="Client application name"
    )
    client_description: str | None = Field(
        default=None,
        max_length=1000,
        description="Client description",
    )
    redirect_uris: list[str] = Field(
        default_factory=list,
        description="Allowed redirect URIs",
    )
    allowed_scopes: list[str] = Field(
        default_factory=list,
        description="Allowed OAuth scopes",
    )
|
|
|
|
|
|
class OAuthClientCreate(OAuthClientBase):
    """Input schema for registering a new OAuth client.

    Adds ``client_type`` to the base fields; it defaults to ``"public"``.
    """

    # The pattern restricts the value to exactly: public or confidential.
    client_type: str = Field(
        default="public",
        description="Client type: public or confidential",
        pattern="^(public|confidential)$",
    )
|
|
|
|
|
|
class OAuthClientResponse(OAuthClientBase):
    """Client-facing view of a registered OAuth client.

    Extends :class:`OAuthClientBase` with the record id, the public client
    id, the client type, the active flag and the creation timestamp. All of
    the added fields are required.
    """

    id: UUID
    client_id: str = Field(default=..., description="OAuth client ID")
    # Unlike OAuthClientCreate, no pattern constraint is applied here.
    client_type: str
    is_active: bool
    created_at: datetime

    model_config = ConfigDict(
        # Allow building the model from attribute-bearing objects.
        from_attributes=True,
        # Example payload attached to the generated JSON schema.
        json_schema_extra={
            "example": {
                "id": "123e4567-e89b-12d3-a456-426614174000",
                "client_id": "abc123def456",
                "client_name": "My MCP App",
                "client_description": "My application that uses MCP",
                "client_type": "public",
                "redirect_uris": ["http://localhost:3000/callback"],
                "allowed_scopes": ["read:users", "write:users"],
                "is_active": True,
                "created_at": "2025-11-24T12:00:00Z",
            }
        },
    )
|
|
|
|
|
|
class OAuthClientWithSecret(OAuthClientResponse):
    """OAuth client response that additionally carries the client secret.

    Per the field description the secret is only shown once, for
    confidential clients; it defaults to ``None``.
    """

    client_secret: str | None = Field(
        default=None,
        description="Client secret (only shown once for confidential clients)",
    )

    model_config = ConfigDict(
        # Allow building the model from attribute-bearing objects.
        from_attributes=True,
        # Example payload attached to the generated JSON schema.
        json_schema_extra={
            "example": {
                "id": "123e4567-e89b-12d3-a456-426614174000",
                "client_id": "abc123def456",
                "client_secret": "secret_xyz789",
                "client_name": "My MCP App",
                "client_type": "confidential",
                "redirect_uris": ["http://localhost:3000/callback"],
                "allowed_scopes": ["read:users"],
                "is_active": True,
                "created_at": "2025-11-24T12:00:00Z",
            }
        },
    )
|
|
|
|
|
|
# ============================================================================
|
|
# OAuth Provider Discovery (RFC 8414 - skeleton)
|
|
# ============================================================================
|
|
|
|
|
|
class OAuthServerMetadata(BaseModel):
    """Authorization server metadata document (OAuth 2.0, RFC 8414).

    ``issuer``, ``authorization_endpoint`` and ``token_endpoint`` are
    required. The remaining endpoints default to ``None`` and every
    ``*_supported`` list has a default supplied by a factory.
    """

    # --- Required endpoints -------------------------------------------------
    issuer: str = Field(default=..., description="Authorization server issuer URL")
    authorization_endpoint: str = Field(
        default=..., description="Authorization endpoint URL"
    )
    token_endpoint: str = Field(default=..., description="Token endpoint URL")

    # --- Optional endpoints -------------------------------------------------
    registration_endpoint: str | None = Field(
        default=None,
        description="Dynamic client registration endpoint",
    )
    revocation_endpoint: str | None = Field(
        default=None,
        description="Token revocation endpoint",
    )
    introspection_endpoint: str | None = Field(
        default=None,
        description="Token introspection endpoint (RFC 7662)",
    )

    # --- Capability lists (factories return a new list per instance) --------
    scopes_supported: list[str] = Field(
        default_factory=list,
        description="Supported scopes",
    )
    response_types_supported: list[str] = Field(
        default_factory=lambda: ["code"],
        description="Supported response types",
    )
    grant_types_supported: list[str] = Field(
        default_factory=lambda: ["authorization_code", "refresh_token"],
        description="Supported grant types",
    )
    # Only S256 is listed by default.
    code_challenge_methods_supported: list[str] = Field(
        default_factory=lambda: ["S256"],
        description="Supported PKCE methods",
    )
    token_endpoint_auth_methods_supported: list[str] = Field(
        default_factory=lambda: ["client_secret_basic", "client_secret_post", "none"],
        description="Supported client authentication methods",
    )

    # Example payload attached to the generated JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "issuer": "https://api.example.com",
                "authorization_endpoint": "https://api.example.com/oauth/authorize",
                "token_endpoint": "https://api.example.com/oauth/token",
                "revocation_endpoint": "https://api.example.com/oauth/revoke",
                "introspection_endpoint": "https://api.example.com/oauth/introspect",
                "scopes_supported": ["openid", "profile", "email", "read:users"],
                "response_types_supported": ["code"],
                "grant_types_supported": ["authorization_code", "refresh_token"],
                "code_challenge_methods_supported": ["S256"],
                "token_endpoint_auth_methods_supported": [
                    "client_secret_basic",
                    "client_secret_post",
                    "none",
                ],
            }
        }
    )
|
|
|
|
|
|
# ============================================================================
|
|
# OAuth Token Responses (RFC 6749)
|
|
# ============================================================================
|
|
|
|
|
|
class OAuthTokenResponse(BaseModel):
    """Successful token response (OAuth 2.0, RFC 6749 Section 5.1).

    Only ``access_token`` is required. ``token_type`` defaults to
    ``"Bearer"`` and the remaining fields default to ``None``.
    """

    access_token: str = Field(
        default=..., description="The access token issued by the server"
    )
    token_type: str = Field(
        default="Bearer",
        description="The type of token (typically 'Bearer')",
    )
    expires_in: int | None = Field(
        default=None, description="Token lifetime in seconds"
    )
    refresh_token: str | None = Field(
        default=None,
        description="Refresh token for obtaining new access tokens",
    )
    # A single space-separated string, not a list.
    scope: str | None = Field(
        default=None,
        description="Space-separated list of granted scopes",
    )

    # Example payload attached to the generated JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
                "token_type": "Bearer",
                "expires_in": 3600,
                "refresh_token": "dGhpcyBpcyBhIHJlZnJlc2ggdG9rZW4...",
                "scope": "openid profile email",
            }
        }
    )
|
|
|
|
|
|
class OAuthTokenIntrospectionResponse(BaseModel):
    """OAuth 2.0 Token Introspection Response (RFC 7662).

    Only ``active`` is required. Every other member is optional and defaults
    to ``None``.

    ``aud`` accepts either a single string or a list of strings, because
    RFC 7662 Section 2.2 defines the audience as a string identifier or a
    list of string identifiers.
    """

    active: bool = Field(..., description="Whether the token is currently active")
    # A single space-separated string, not a list.
    scope: str | None = Field(None, description="Space-separated list of scopes")
    client_id: str | None = Field(None, description="Client identifier for the token")
    username: str | None = Field(
        None, description="Human-readable identifier for the resource owner"
    )
    token_type: str | None = Field(
        None, description="Type of the token (e.g., 'Bearer')"
    )
    # Time-based claims are integer Unix timestamps.
    exp: int | None = Field(None, description="Token expiration time (Unix timestamp)")
    iat: int | None = Field(None, description="Token issue time (Unix timestamp)")
    nbf: int | None = Field(None, description="Token not-before time (Unix timestamp)")
    sub: str | None = Field(None, description="Subject of the token (user ID)")
    # Widened from ``str | None`` so that a multi-valued audience validates
    # instead of raising a validation error.
    aud: str | list[str] | None = Field(
        None, description="Intended audience of the token"
    )
    iss: str | None = Field(None, description="Issuer of the token")

    # Example payload attached to the generated JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "active": True,
                "scope": "openid profile",
                "client_id": "client123",
                "username": "user@example.com",
                "token_type": "Bearer",
                "exp": 1735689600,
                "iat": 1735686000,
                "sub": "user-uuid-here",
            }
        }
    )
|