# Copyright 2025 Softwell S.r.l.
# Licensed under the Apache License, Version 2.0
"""Authentication mixin for AsgiServer.
Provides server.authenticate(scope) as the single extension point for auth.
Supports bearer, basic, JWT backends with O(1) lookup at request time.
Usage:
class MyServer(BasicAuthMixin, RoutingClass):
def __init__(self):
BasicAuthMixin.__init__(self, bearer={...}, basic={...})
...
# In middleware or dispatcher:
auth = server.authenticate(scope)
"""
from __future__ import annotations
import base64
from collections import defaultdict
from typing import TYPE_CHECKING, Any
from ..exceptions import HTTPException
from ..utils import split_and_strip
# pyjwt is an optional dependency: HAS_JWT records whether it could be
# imported, and `jwt` is bound to None when it could not.
try:
    import jwt

    HAS_JWT = True
except ImportError:
    jwt = None  # type: ignore[assignment]
    HAS_JWT = False

if TYPE_CHECKING:
    # Needed for annotations only; never imported at runtime.
    from ..types import Scope
__all__ = ["BasicAuthMixin"]
class BasicAuthMixin:
    """Auth mixin: bearer, basic and JWT backends.

    Bearer tokens and basic credentials are stored in dicts keyed by the exact
    value expected in the ``Authorization`` header, so each is resolved with a
    single dict lookup. JWT tokens are checked against each configured
    verifier in turn.

    The class declares ``__slots__ = ()`` but ``__init__`` assigns
    ``_auth_config`` on the instance, so the concrete class must provide a
    ``__dict__`` or a ``_auth_config`` slot.

    Attributes:
        _auth_config: Dict mapping auth type to credentials dict.
    """

    __slots__ = ()

    def __init__(self, **entries: Any) -> None:
        """Initialize auth config from keyword arguments.

        Each keyword is dispatched to ``_configure_<name>``; names without a
        matching method go to ``_configure_default``.

        Args:
            **entries: Auth configuration by type (bearer, basic, jwt).
        """
        self._auth_config: defaultdict[str, dict[str, Any]] = defaultdict(dict)  # type: ignore[misc]
        for auth_type, credentials in entries.items():
            method = getattr(self, f"_configure_{auth_type}", self._configure_default)
            method(credentials=credentials)

    def _configure_bearer(self, *, credentials: dict[str, Any]) -> None:
        """Configure bearer token authentication.

        Entries are stored keyed by the token value itself.

        Args:
            credentials: Dict of {name: {token: "...", tags: "..."}}.

        Raises:
            ValueError: If a token entry is missing 'token' value.
        """
        for cred_name, config in credentials.items():
            token_value = config.get("token")
            if not token_value:
                raise ValueError(f"Bearer token '{cred_name}' missing 'token' value")
            self._auth_config["bearer"][token_value] = {
                "tags": split_and_strip(config.get("tags", [])),
                "identity": cred_name,
            }

    def _configure_basic(self, *, credentials: dict[str, Any]) -> None:
        """Configure HTTP Basic authentication.

        Entries are stored keyed by base64("username:password"), i.e. the
        exact credentials string a client sends after "Basic ".

        Args:
            credentials: Dict of {username: {password: "...", tags: "..."}}.

        Raises:
            ValueError: If a user entry is missing 'password' value.
        """
        for username, config in credentials.items():
            password = config.get("password")
            if not password:
                raise ValueError(f"Basic auth user '{username}' missing 'password'")
            b64_key = base64.b64encode(f"{username}:{password}".encode()).decode()
            self._auth_config["basic"][b64_key] = {
                "tags": split_and_strip(config.get("tags", [])),
                "identity": username,
            }

    def _configure_jwt(self, *, credentials: dict[str, Any]) -> None:
        """Configure JWT token verification.

        Args:
            credentials: Dict of {name: {secret: "...", algorithm: "...", tags: "..."}}.
                ``public_key`` and ``exp`` are also read; ``algorithm``
                defaults to "HS256" and ``exp`` to 3600.

        Raises:
            ImportError: If pyjwt is not installed.
        """
        if not HAS_JWT:
            raise ImportError("JWT config requires pyjwt. Install: pip install pyjwt")
        for config_name, config in credentials.items():
            self._auth_config["jwt"][config_name] = {
                "secret": config.get("secret"),
                "public_key": config.get("public_key"),
                "algorithm": config.get("algorithm", "HS256"),
                "default_tags": split_and_strip(config.get("tags", [])),
                "default_exp": config.get("exp", 3600),
            }

    def _configure_default(self, *, credentials: dict[str, Any]) -> None:
        """Fallback for unknown auth types. Silently ignored."""

    def _get_auth(self, scope: Scope) -> tuple[str | None, str | None]:
        """Extract auth type and credentials from Authorization header.

        Args:
            scope: ASGI scope with _headers dict.

        Returns:
            Tuple of (auth_type, credentials) with the type lower-cased, or
            (None, None) if the header is missing or contains no space.
        """
        auth_header = scope["_headers"].get("authorization")
        if not auth_header:
            return None, None
        auth_type, separator, credentials = auth_header.partition(" ")
        if not separator:
            return None, None
        # The scheme may be followed by more than one space; strip the
        # credentials so "Bearer   tok" still resolves to the stored "tok".
        return auth_type.lower(), credentials.strip()

    def _resolve_auth_handler(self, auth_type: str) -> Any:
        """Return the ``_auth_<type>`` method for a header scheme.

        The scheme comes from the request, so the looked-up attribute is only
        used when it is callable. Without that check a scheme such as
        "config" would resolve to the ``_auth_config`` dict and calling it
        would raise TypeError instead of rejecting the request.

        Args:
            auth_type: Lower-cased scheme from the Authorization header.

        Returns:
            The matching handler, or ``_auth_default`` if there is none.
        """
        handler = getattr(self, f"_auth_{auth_type}", None)
        if callable(handler):
            return handler
        return self._auth_default

    def authenticate(self, scope: Scope) -> dict[str, Any] | None:
        """Authenticate request using configured backends.

        Args:
            scope: ASGI scope with _headers dict (populated by @headers_dict).

        Returns:
            Auth dict with tags/identity/backend if valid, None if the header
            is missing or carries no scheme or credentials.

        Raises:
            HTTPException: 401 if credentials present but invalid.
        """
        auth_type, credentials = self._get_auth(scope)
        if not auth_type or not credentials:
            return None
        handler = self._resolve_auth_handler(auth_type)
        result = handler(auth_type=auth_type, credentials=credentials)
        if result is None:
            raise HTTPException(
                401,
                detail="Invalid or expired credentials",
                headers={"WWW-Authenticate": f'{auth_type.title()} realm="api"'},
            )
        return result

    def _auth_bearer(self, *, credentials: str, **kw: Any) -> dict[str, Any] | None:
        """Authenticate bearer token. Falls back to JWT if not found.

        Args:
            credentials: Token string from Authorization header.
            **kw: Additional args from dynamic dispatch (unused).

        Returns:
            Auth dict if valid, None if not found.
        """
        entry = self._auth_config.get("bearer", {}).get(credentials)
        if entry:
            return {"tags": entry["tags"], "identity": entry["identity"], "backend": "bearer"}
        return self._auth_jwt(credentials=credentials)

    def _auth_basic(self, *, credentials: str, **kw: Any) -> dict[str, Any] | None:
        """Authenticate HTTP Basic credentials.

        Args:
            credentials: Base64-encoded "username:password" from header.
            **kw: Additional args from dynamic dispatch (unused).

        Returns:
            Auth dict if valid, None if credentials not found.
        """
        entry = self._auth_config.get("basic", {}).get(credentials)
        if entry:
            return {"tags": entry["tags"], "identity": entry["identity"], "backend": "basic"}
        return None

    def _auth_jwt(self, *, credentials: str, **kw: Any) -> dict[str, Any] | None:
        """Authenticate JWT token by trying all configured verifiers.

        Args:
            credentials: JWT token string.
            **kw: Additional args from dynamic dispatch (unused).

        Returns:
            Auth dict if valid with any verifier (its "backend" is set to
            "jwt:<verifier name>"), None otherwise.
        """
        for name, jwt_config in self._auth_config.get("jwt", {}).items():
            result = self._verify_jwt(credentials, jwt_config)
            if result:
                result["backend"] = f"jwt:{name}"
                return result
        return None

    def _auth_default(self, *, auth_type: str, **kw: Any) -> dict[str, Any] | None:
        """Fallback for unknown authentication types.

        Args:
            auth_type: The unrecognized auth type from header.
            **kw: Additional args from dynamic dispatch (unused).

        Returns:
            Always None — unknown types are rejected.
        """
        return None

    def _verify_jwt(self, credentials: str, jwt_config: dict[str, Any]) -> dict[str, Any] | None:
        """Verify JWT token and extract identity/tags from payload.

        Args:
            credentials: Raw JWT string (without "Bearer " prefix).
            jwt_config: JWT verifier config (secret/public_key, algorithm).

        Returns:
            Auth dict ``{"identity": ..., "tags": ...}`` on success, None if
            pyjwt is unavailable, no key is configured, or the token is
            rejected.
        """
        if not HAS_JWT or jwt is None:
            return None
        secret: str | bytes | None = jwt_config.get("secret") or jwt_config.get("public_key")
        if not secret:
            return None
        algorithm: str = jwt_config.get("algorithm", "HS256")
        try:
            payload = jwt.decode(credentials, secret, algorithms=[algorithm])
        except jwt.InvalidTokenError:
            # Also covers ExpiredSignatureError, which subclasses InvalidTokenError.
            return None
        return {
            "identity": payload.get("sub"),
            "tags": payload.get("tags", []),
        }

    def verify_credentials(self, username: str, password: str) -> dict[str, Any] | None:
        """Verify username/password against basic auth config.

        Args:
            username: The username to verify.
            password: The password to verify.

        Returns:
            Dict with tags and identity if valid, None otherwise.
        """
        b64_key = base64.b64encode(f"{username}:{password}".encode()).decode()
        entry = self._auth_config.get("basic", {}).get(b64_key)
        # "a" + "b:c" and "a:b" + "c" encode to the same key, so a key hit is
        # only accepted when the configured identity is the given username.
        if entry and entry["identity"] == username:
            return {"tags": entry["tags"], "identity": entry["identity"]}
        return None
# Script entry guard; running this module directly does nothing.
if __name__ == "__main__":
    pass