forked from cardosofelipe/fast-next-template
Add user session tracking, schemas, utilities, and per-device session management
- Introduced `user_sessions` table with support for per-device authentication sessions. - Added `UserSession` model, including fields for device metadata, IP, and session state. - Created schemas (`SessionBase`, `SessionCreate`, `SessionResponse`) to manage session data and responses. - Implemented utilities for extracting and parsing device information from HTTP requests. - Added Alembic migration to define `user_sessions` table with indexes for performance and cleanup.
This commit is contained in:
@@ -0,0 +1,102 @@
|
|||||||
|
"""add_user_sessions_table
|
||||||
|
|
||||||
|
Revision ID: 549b50ea888d
|
||||||
|
Revises: b76c725fc3cf
|
||||||
|
Create Date: 2025-10-31 07:41:18.729544
|
||||||
|
|
||||||
|
"""
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = '549b50ea888d'
|
||||||
|
down_revision: Union[str, None] = 'b76c725fc3cf'
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
# Create user_sessions table for per-device session management
|
||||||
|
op.create_table(
|
||||||
|
'user_sessions',
|
||||||
|
sa.Column('id', sa.UUID(), nullable=False),
|
||||||
|
sa.Column('user_id', sa.UUID(), nullable=False),
|
||||||
|
sa.Column('refresh_token_jti', sa.String(length=255), nullable=False),
|
||||||
|
sa.Column('device_name', sa.String(length=255), nullable=True),
|
||||||
|
sa.Column('device_id', sa.String(length=255), nullable=True),
|
||||||
|
sa.Column('ip_address', sa.String(length=45), nullable=True),
|
||||||
|
sa.Column('user_agent', sa.String(length=500), nullable=True),
|
||||||
|
sa.Column('last_used_at', sa.DateTime(timezone=True), nullable=False),
|
||||||
|
sa.Column('expires_at', sa.DateTime(timezone=True), nullable=False),
|
||||||
|
sa.Column('is_active', sa.Boolean(), nullable=False, server_default='true'),
|
||||||
|
sa.Column('location_city', sa.String(length=100), nullable=True),
|
||||||
|
sa.Column('location_country', sa.String(length=100), nullable=True),
|
||||||
|
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
|
||||||
|
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
|
||||||
|
sa.PrimaryKeyConstraint('id')
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create foreign key to users table
|
||||||
|
op.create_foreign_key(
|
||||||
|
'fk_user_sessions_user_id',
|
||||||
|
'user_sessions',
|
||||||
|
'users',
|
||||||
|
['user_id'],
|
||||||
|
['id'],
|
||||||
|
ondelete='CASCADE'
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create indexes for performance
|
||||||
|
# 1. Lookup session by refresh token JTI (most common query)
|
||||||
|
op.create_index(
|
||||||
|
'ix_user_sessions_jti',
|
||||||
|
'user_sessions',
|
||||||
|
['refresh_token_jti'],
|
||||||
|
unique=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# 2. Lookup sessions by user ID
|
||||||
|
op.create_index(
|
||||||
|
'ix_user_sessions_user_id',
|
||||||
|
'user_sessions',
|
||||||
|
['user_id']
|
||||||
|
)
|
||||||
|
|
||||||
|
# 3. Composite index for active sessions by user
|
||||||
|
op.create_index(
|
||||||
|
'ix_user_sessions_user_active',
|
||||||
|
'user_sessions',
|
||||||
|
['user_id', 'is_active']
|
||||||
|
)
|
||||||
|
|
||||||
|
# 4. Index on expires_at for cleanup job
|
||||||
|
op.create_index(
|
||||||
|
'ix_user_sessions_expires_at',
|
||||||
|
'user_sessions',
|
||||||
|
['expires_at']
|
||||||
|
)
|
||||||
|
|
||||||
|
# 5. Composite index for active session lookup by JTI
|
||||||
|
op.create_index(
|
||||||
|
'ix_user_sessions_jti_active',
|
||||||
|
'user_sessions',
|
||||||
|
['refresh_token_jti', 'is_active']
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
# Drop indexes first
|
||||||
|
op.drop_index('ix_user_sessions_jti_active', table_name='user_sessions')
|
||||||
|
op.drop_index('ix_user_sessions_expires_at', table_name='user_sessions')
|
||||||
|
op.drop_index('ix_user_sessions_user_active', table_name='user_sessions')
|
||||||
|
op.drop_index('ix_user_sessions_user_id', table_name='user_sessions')
|
||||||
|
op.drop_index('ix_user_sessions_jti', table_name='user_sessions')
|
||||||
|
|
||||||
|
# Drop foreign key
|
||||||
|
op.drop_constraint('fk_user_sessions_user_id', 'user_sessions', type_='foreignkey')
|
||||||
|
|
||||||
|
# Drop table
|
||||||
|
op.drop_table('user_sessions')
|
||||||
@@ -6,9 +6,11 @@ Imports all models to ensure they're registered with SQLAlchemy.
|
|||||||
from app.core.database import Base
|
from app.core.database import Base
|
||||||
from .base import TimestampMixin, UUIDMixin
|
from .base import TimestampMixin, UUIDMixin
|
||||||
|
|
||||||
# Import user model
|
# Import models
|
||||||
from .user import User
|
from .user import User
|
||||||
|
from .user_session import UserSession
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
'Base', 'TimestampMixin', 'UUIDMixin',
|
'Base', 'TimestampMixin', 'UUIDMixin',
|
||||||
'User',
|
'User', 'UserSession',
|
||||||
]
|
]
|
||||||
80
backend/app/models/user_session.py
Normal file
80
backend/app/models/user_session.py
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
"""
|
||||||
|
User session model for tracking per-device authentication sessions.
|
||||||
|
|
||||||
|
This allows users to:
|
||||||
|
- See where they're logged in
|
||||||
|
- Logout from specific devices
|
||||||
|
- Manage their active sessions
|
||||||
|
"""
|
||||||
|
from sqlalchemy import Column, String, Boolean, DateTime, ForeignKey, Index
|
||||||
|
from sqlalchemy.dialects.postgresql import UUID
|
||||||
|
from sqlalchemy.orm import relationship
|
||||||
|
|
||||||
|
from .base import Base, TimestampMixin, UUIDMixin
|
||||||
|
|
||||||
|
|
||||||
|
class UserSession(Base, UUIDMixin, TimestampMixin):
|
||||||
|
"""
|
||||||
|
Tracks individual user sessions (per-device).
|
||||||
|
|
||||||
|
Each time a user logs in from a device, a new session is created.
|
||||||
|
Sessions are identified by the refresh token JTI (JWT ID).
|
||||||
|
"""
|
||||||
|
__tablename__ = 'user_sessions'
|
||||||
|
|
||||||
|
# Foreign key to user
|
||||||
|
user_id = Column(UUID(as_uuid=True), ForeignKey('users.id', ondelete='CASCADE'), nullable=False, index=True)
|
||||||
|
|
||||||
|
# Refresh token identifier (JWT ID from the refresh token)
|
||||||
|
refresh_token_jti = Column(String(255), unique=True, nullable=False, index=True)
|
||||||
|
|
||||||
|
# Device information
|
||||||
|
device_name = Column(String(255), nullable=True) # "iPhone 14", "Chrome on MacBook"
|
||||||
|
device_id = Column(String(255), nullable=True) # Persistent device identifier (from client)
|
||||||
|
ip_address = Column(String(45), nullable=True) # IPv4 (15 chars) or IPv6 (45 chars)
|
||||||
|
user_agent = Column(String(500), nullable=True) # Browser/app user agent
|
||||||
|
|
||||||
|
# Session timing
|
||||||
|
last_used_at = Column(DateTime(timezone=True), nullable=False)
|
||||||
|
expires_at = Column(DateTime(timezone=True), nullable=False)
|
||||||
|
|
||||||
|
# Session state
|
||||||
|
is_active = Column(Boolean, default=True, nullable=False, index=True)
|
||||||
|
|
||||||
|
# Geographic information (optional, can be populated from IP)
|
||||||
|
location_city = Column(String(100), nullable=True)
|
||||||
|
location_country = Column(String(100), nullable=True)
|
||||||
|
|
||||||
|
# Relationship to user
|
||||||
|
user = relationship("User", backref="sessions")
|
||||||
|
|
||||||
|
# Composite indexes for performance (defined in migration)
|
||||||
|
__table_args__ = (
|
||||||
|
Index('ix_user_sessions_user_active', 'user_id', 'is_active'),
|
||||||
|
Index('ix_user_sessions_jti_active', 'refresh_token_jti', 'is_active'),
|
||||||
|
)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"<UserSession {self.device_name} ({self.ip_address})>"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_expired(self) -> bool:
|
||||||
|
"""Check if session has expired."""
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
return self.expires_at < datetime.now(timezone.utc)
|
||||||
|
|
||||||
|
def to_dict(self):
|
||||||
|
"""Convert session to dictionary for serialization."""
|
||||||
|
return {
|
||||||
|
'id': str(self.id),
|
||||||
|
'user_id': str(self.user_id),
|
||||||
|
'device_name': self.device_name,
|
||||||
|
'device_id': self.device_id,
|
||||||
|
'ip_address': self.ip_address,
|
||||||
|
'last_used_at': self.last_used_at.isoformat() if self.last_used_at else None,
|
||||||
|
'expires_at': self.expires_at.isoformat() if self.expires_at else None,
|
||||||
|
'is_active': self.is_active,
|
||||||
|
'location_city': self.location_city,
|
||||||
|
'location_country': self.location_country,
|
||||||
|
'created_at': self.created_at.isoformat() if self.created_at else None,
|
||||||
|
}
|
||||||
133
backend/app/schemas/sessions.py
Normal file
133
backend/app/schemas/sessions.py
Normal file
@@ -0,0 +1,133 @@
|
|||||||
|
"""
|
||||||
|
Pydantic schemas for user session management.
|
||||||
|
"""
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Optional
|
||||||
|
from uuid import UUID
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field, ConfigDict
|
||||||
|
|
||||||
|
|
||||||
|
class SessionBase(BaseModel):
|
||||||
|
"""Base schema for user sessions."""
|
||||||
|
device_name: Optional[str] = Field(None, max_length=255, description="Friendly device name")
|
||||||
|
device_id: Optional[str] = Field(None, max_length=255, description="Persistent device identifier")
|
||||||
|
|
||||||
|
|
||||||
|
class SessionCreate(SessionBase):
|
||||||
|
"""Schema for creating a new session (internal use)."""
|
||||||
|
user_id: UUID
|
||||||
|
refresh_token_jti: str = Field(..., max_length=255)
|
||||||
|
ip_address: Optional[str] = Field(None, max_length=45)
|
||||||
|
user_agent: Optional[str] = Field(None, max_length=500)
|
||||||
|
last_used_at: datetime
|
||||||
|
expires_at: datetime
|
||||||
|
location_city: Optional[str] = Field(None, max_length=100)
|
||||||
|
location_country: Optional[str] = Field(None, max_length=100)
|
||||||
|
|
||||||
|
|
||||||
|
class SessionUpdate(BaseModel):
|
||||||
|
"""Schema for updating a session (internal use)."""
|
||||||
|
last_used_at: Optional[datetime] = None
|
||||||
|
is_active: Optional[bool] = None
|
||||||
|
refresh_token_jti: Optional[str] = None
|
||||||
|
expires_at: Optional[datetime] = None
|
||||||
|
|
||||||
|
|
||||||
|
class SessionResponse(SessionBase):
|
||||||
|
"""
|
||||||
|
Schema for session responses to clients.
|
||||||
|
|
||||||
|
This is what users see when they list their active sessions.
|
||||||
|
"""
|
||||||
|
id: UUID
|
||||||
|
ip_address: Optional[str] = None
|
||||||
|
location_city: Optional[str] = None
|
||||||
|
location_country: Optional[str] = None
|
||||||
|
last_used_at: datetime
|
||||||
|
created_at: datetime
|
||||||
|
expires_at: datetime
|
||||||
|
is_current: bool = Field(default=False, description="Whether this is the current session")
|
||||||
|
|
||||||
|
model_config = ConfigDict(
|
||||||
|
from_attributes=True,
|
||||||
|
json_schema_extra={
|
||||||
|
"example": {
|
||||||
|
"id": "123e4567-e89b-12d3-a456-426614174000",
|
||||||
|
"device_name": "iPhone 14",
|
||||||
|
"device_id": "device-abc-123",
|
||||||
|
"ip_address": "192.168.1.100",
|
||||||
|
"location_city": "San Francisco",
|
||||||
|
"location_country": "United States",
|
||||||
|
"last_used_at": "2025-10-31T12:00:00Z",
|
||||||
|
"created_at": "2025-10-30T09:00:00Z",
|
||||||
|
"expires_at": "2025-11-06T09:00:00Z",
|
||||||
|
"is_current": True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class SessionListResponse(BaseModel):
|
||||||
|
"""Response containing list of sessions."""
|
||||||
|
sessions: list[SessionResponse]
|
||||||
|
total: int = Field(..., description="Total number of active sessions")
|
||||||
|
|
||||||
|
model_config = ConfigDict(
|
||||||
|
json_schema_extra={
|
||||||
|
"example": {
|
||||||
|
"sessions": [
|
||||||
|
{
|
||||||
|
"id": "123e4567-e89b-12d3-a456-426614174000",
|
||||||
|
"device_name": "iPhone 14",
|
||||||
|
"ip_address": "192.168.1.100",
|
||||||
|
"last_used_at": "2025-10-31T12:00:00Z",
|
||||||
|
"created_at": "2025-10-30T09:00:00Z",
|
||||||
|
"expires_at": "2025-11-06T09:00:00Z",
|
||||||
|
"is_current": True
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"total": 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class LogoutRequest(BaseModel):
|
||||||
|
"""Request schema for logout endpoint."""
|
||||||
|
refresh_token: str = Field(
|
||||||
|
...,
|
||||||
|
description="Refresh token for the session to logout from",
|
||||||
|
min_length=10
|
||||||
|
)
|
||||||
|
|
||||||
|
model_config = ConfigDict(
|
||||||
|
json_schema_extra={
|
||||||
|
"example": {
|
||||||
|
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class DeviceInfo(BaseModel):
|
||||||
|
"""Device information extracted from request."""
|
||||||
|
device_name: Optional[str] = None
|
||||||
|
device_id: Optional[str] = None
|
||||||
|
ip_address: Optional[str] = None
|
||||||
|
user_agent: Optional[str] = None
|
||||||
|
location_city: Optional[str] = None
|
||||||
|
location_country: Optional[str] = None
|
||||||
|
|
||||||
|
model_config = ConfigDict(
|
||||||
|
json_schema_extra={
|
||||||
|
"example": {
|
||||||
|
"device_name": "Chrome on MacBook",
|
||||||
|
"device_id": "device-xyz-789",
|
||||||
|
"ip_address": "192.168.1.50",
|
||||||
|
"user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)...",
|
||||||
|
"location_city": "San Francisco",
|
||||||
|
"location_country": "United States"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
233
backend/app/utils/device.py
Normal file
233
backend/app/utils/device.py
Normal file
@@ -0,0 +1,233 @@
|
|||||||
|
"""
|
||||||
|
Utility functions for extracting and parsing device information from HTTP requests.
|
||||||
|
"""
|
||||||
|
import re
|
||||||
|
from typing import Optional
|
||||||
|
from fastapi import Request
|
||||||
|
|
||||||
|
from app.schemas.sessions import DeviceInfo
|
||||||
|
|
||||||
|
|
||||||
|
def extract_device_info(request: Request) -> DeviceInfo:
|
||||||
|
"""
|
||||||
|
Extract device information from the HTTP request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
request: FastAPI Request object
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
DeviceInfo object with parsed device information
|
||||||
|
"""
|
||||||
|
user_agent = request.headers.get('user-agent', '')
|
||||||
|
|
||||||
|
device_info = DeviceInfo(
|
||||||
|
device_name=parse_device_name(user_agent),
|
||||||
|
device_id=request.headers.get('x-device-id'), # Client must send this header
|
||||||
|
ip_address=get_client_ip(request),
|
||||||
|
user_agent=user_agent[:500] if user_agent else None, # Truncate to max length
|
||||||
|
location_city=None, # Can be populated via IP geolocation service
|
||||||
|
location_country=None, # Can be populated via IP geolocation service
|
||||||
|
)
|
||||||
|
|
||||||
|
return device_info
|
||||||
|
|
||||||
|
|
||||||
|
def parse_device_name(user_agent: str) -> Optional[str]:
|
||||||
|
"""
|
||||||
|
Parse user agent string to extract a friendly device name.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_agent: User-Agent header string
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Friendly device name string or None
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
"Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X)" -> "iPhone"
|
||||||
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)" -> "Mac"
|
||||||
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64)" -> "Windows PC"
|
||||||
|
"""
|
||||||
|
if not user_agent:
|
||||||
|
return "Unknown device"
|
||||||
|
|
||||||
|
user_agent_lower = user_agent.lower()
|
||||||
|
|
||||||
|
# Mobile devices (check first, as they can contain desktop patterns too)
|
||||||
|
if 'iphone' in user_agent_lower:
|
||||||
|
return "iPhone"
|
||||||
|
elif 'ipad' in user_agent_lower:
|
||||||
|
return "iPad"
|
||||||
|
elif 'android' in user_agent_lower:
|
||||||
|
# Try to extract device model
|
||||||
|
android_match = re.search(r'android.*;\s*([^)]+)\s*build', user_agent_lower)
|
||||||
|
if android_match:
|
||||||
|
device_model = android_match.group(1).strip()
|
||||||
|
return f"Android ({device_model.title()})"
|
||||||
|
return "Android device"
|
||||||
|
elif 'windows phone' in user_agent_lower:
|
||||||
|
return "Windows Phone"
|
||||||
|
|
||||||
|
# Desktop operating systems
|
||||||
|
elif 'macintosh' in user_agent_lower or 'mac os x' in user_agent_lower:
|
||||||
|
# Try to extract browser
|
||||||
|
browser = extract_browser(user_agent)
|
||||||
|
return f"{browser} on Mac" if browser else "Mac"
|
||||||
|
elif 'windows' in user_agent_lower:
|
||||||
|
browser = extract_browser(user_agent)
|
||||||
|
return f"{browser} on Windows" if browser else "Windows PC"
|
||||||
|
elif 'linux' in user_agent_lower and 'android' not in user_agent_lower:
|
||||||
|
browser = extract_browser(user_agent)
|
||||||
|
return f"{browser} on Linux" if browser else "Linux"
|
||||||
|
elif 'cros' in user_agent_lower:
|
||||||
|
return "Chromebook"
|
||||||
|
|
||||||
|
# Tablets (not already caught)
|
||||||
|
elif 'tablet' in user_agent_lower:
|
||||||
|
return "Tablet"
|
||||||
|
|
||||||
|
# Smart TVs
|
||||||
|
elif any(tv in user_agent_lower for tv in ['smart-tv', 'smarttv', 'tv']):
|
||||||
|
return "Smart TV"
|
||||||
|
|
||||||
|
# Game consoles
|
||||||
|
elif 'playstation' in user_agent_lower:
|
||||||
|
return "PlayStation"
|
||||||
|
elif 'xbox' in user_agent_lower:
|
||||||
|
return "Xbox"
|
||||||
|
elif 'nintendo' in user_agent_lower:
|
||||||
|
return "Nintendo"
|
||||||
|
|
||||||
|
# Fallback: just return browser name if detected
|
||||||
|
browser = extract_browser(user_agent)
|
||||||
|
if browser:
|
||||||
|
return browser
|
||||||
|
|
||||||
|
return "Unknown device"
|
||||||
|
|
||||||
|
|
||||||
|
def extract_browser(user_agent: str) -> Optional[str]:
|
||||||
|
"""
|
||||||
|
Extract browser name from user agent string.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_agent: User-Agent header string
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Browser name or None
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
"Mozilla/5.0 ... Chrome/96.0" -> "Chrome"
|
||||||
|
"Mozilla/5.0 ... Firefox/94.0" -> "Firefox"
|
||||||
|
"""
|
||||||
|
if not user_agent:
|
||||||
|
return None
|
||||||
|
|
||||||
|
user_agent_lower = user_agent.lower()
|
||||||
|
|
||||||
|
# Check specific browsers (order matters - check Edge before Chrome!)
|
||||||
|
if 'edg/' in user_agent_lower or 'edge/' in user_agent_lower:
|
||||||
|
return "Edge"
|
||||||
|
elif 'opr/' in user_agent_lower or 'opera' in user_agent_lower:
|
||||||
|
return "Opera"
|
||||||
|
elif 'chrome/' in user_agent_lower:
|
||||||
|
return "Chrome"
|
||||||
|
elif 'safari/' in user_agent_lower:
|
||||||
|
# Make sure it's actually Safari, not Chrome (which also contains "Safari")
|
||||||
|
if 'chrome' not in user_agent_lower:
|
||||||
|
return "Safari"
|
||||||
|
return None
|
||||||
|
elif 'firefox/' in user_agent_lower:
|
||||||
|
return "Firefox"
|
||||||
|
elif 'msie' in user_agent_lower or 'trident/' in user_agent_lower:
|
||||||
|
return "Internet Explorer"
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_client_ip(request: Request) -> Optional[str]:
|
||||||
|
"""
|
||||||
|
Extract client IP address from request, considering proxy headers.
|
||||||
|
|
||||||
|
Checks X-Forwarded-For and X-Real-IP headers for proxy scenarios.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
request: FastAPI Request object
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Client IP address string or None
|
||||||
|
|
||||||
|
Notes:
|
||||||
|
- In production behind a proxy/load balancer, X-Forwarded-For is often set
|
||||||
|
- The first IP in X-Forwarded-For is typically the real client IP
|
||||||
|
- request.client.host is fallback for direct connections
|
||||||
|
"""
|
||||||
|
# Check X-Forwarded-For (common in proxied environments)
|
||||||
|
x_forwarded_for = request.headers.get('x-forwarded-for')
|
||||||
|
if x_forwarded_for:
|
||||||
|
# Get the first IP (original client)
|
||||||
|
client_ip = x_forwarded_for.split(',')[0].strip()
|
||||||
|
return client_ip
|
||||||
|
|
||||||
|
# Check X-Real-IP (used by some proxies like nginx)
|
||||||
|
x_real_ip = request.headers.get('x-real-ip')
|
||||||
|
if x_real_ip:
|
||||||
|
return x_real_ip.strip()
|
||||||
|
|
||||||
|
# Fallback to direct connection IP
|
||||||
|
if request.client and request.client.host:
|
||||||
|
return request.client.host
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def is_mobile_device(user_agent: str) -> bool:
|
||||||
|
"""
|
||||||
|
Check if the device is a mobile device based on user agent.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_agent: User-Agent header string
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if mobile device, False otherwise
|
||||||
|
"""
|
||||||
|
if not user_agent:
|
||||||
|
return False
|
||||||
|
|
||||||
|
mobile_patterns = [
|
||||||
|
'mobile', 'android', 'iphone', 'ipad', 'ipod',
|
||||||
|
'blackberry', 'windows phone', 'webos', 'opera mini',
|
||||||
|
'iemobile', 'mobile safari'
|
||||||
|
]
|
||||||
|
|
||||||
|
user_agent_lower = user_agent.lower()
|
||||||
|
return any(pattern in user_agent_lower for pattern in mobile_patterns)
|
||||||
|
|
||||||
|
|
||||||
|
def get_device_type(user_agent: str) -> str:
|
||||||
|
"""
|
||||||
|
Determine the general device type.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_agent: User-Agent header string
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Device type: "mobile", "tablet", "desktop", or "other"
|
||||||
|
"""
|
||||||
|
if not user_agent:
|
||||||
|
return "other"
|
||||||
|
|
||||||
|
user_agent_lower = user_agent.lower()
|
||||||
|
|
||||||
|
# Check for tablets first (they can contain "mobile" too)
|
||||||
|
if 'ipad' in user_agent_lower or 'tablet' in user_agent_lower:
|
||||||
|
return "tablet"
|
||||||
|
|
||||||
|
# Check for mobile
|
||||||
|
if is_mobile_device(user_agent):
|
||||||
|
return "mobile"
|
||||||
|
|
||||||
|
# Check for desktop OS patterns
|
||||||
|
if any(os in user_agent_lower for os in ['windows', 'macintosh', 'linux', 'cros']):
|
||||||
|
return "desktop"
|
||||||
|
|
||||||
|
return "other"
|
||||||
Reference in New Issue
Block a user