Trois couches strictement séparées avec un sens unique de dépendance. Les routers sont des couches HTTP minces — ils valident, appliquent le contrôle de rôle par dépendance FastAPI, et délèguent. Toute la logique réelle vit dans les services, qui orchestrent les modules d'infrastructure.
Cette séparation permet de tester les services indépendamment du protocole HTTP, et de remplacer n'importe quelle brique technique sans toucher aux routers.
Le user store illustre la même rigueur : une classe abstraite UserStore définit le contrat, FirestoreUserStore l'implémente, et une façade permet d'injecter un faux store en test.
def _validate_startup_config() -> None: errors: list[str] = [] if not config.SECRET_KEY_API or len(config.SECRET_KEY_API) < 32: errors.append("SECRET_KEY_API doit contenir au moins 32 caractères.") if not config.GPT_API_KEY: errors.append("GPT_API_KEY est requise.") if errors: for err in errors: logger.critical("Configuration invalide : %s", err) sys.exit(1) # Swagger désactivé en production app = FastAPI( docs_url="/docs" if config.DEBUG_MODE else None, redoc_url="/redoc" if config.DEBUG_MODE else None, on_startup=[verify_redis_connection], )
class UserStore(ABC): @abstractmethod async def authenticate_user(self, username: str, password: str) -> dict | None: ... @abstractmethod async def create_user_with_config(self, *, username: str, ...) -> dict: ... @abstractmethod async def store_refresh_token(self, *, username: str, ...) -> None: ... # Façade publique — injection via set_user_store() dans les tests _USER_STORE: UserStore = FirestoreUserStore.from_env() def set_user_store(store: UserStore) -> None: global _USER_STORE _USER_STORE = store
Avant de générer le moindre token, la requête traverse une chaîne de validations qui échouent vite et clairement. Chaque étape a son propre code d'erreur HTTP.
token + user actif
→ 401
config complète
→ 403
→ 400
user_owns_base
→ 404
→ 404
RAG + OpenAI
token/token
user_config = await get_user_config_from_store(username) if user_config is None: raise HTTPException(403, "Utilisateur non enregistre") index_name = user_config.get("index_name") if not index_name: raise HTTPException(400, "Aucune base vectorielle configuree") if current_user.get("role") == "rh" and not await user_owns_vector_base( int(current_user["id"]), str(index_name)): raise HTTPException(404, "La collection n'existe pas dans Qdrant.") if not await collection_exists(index_name): raise HTTPException(404, "La collection n'existe pas dans Qdrant.")
text/plain au fil de l'eau. L'enregistrement de l'échange se fait dans une tâche de fond, après la fin du stream, pour ne pas retarder l'affichage côté client.- Ingestion & parsing. Chargement du PDF via PyMuPDF, page par page.
- Nettoyage. Normalisation Unicode, suppression des numéros de page et du boilerplate répété sur ≥60% des pages.
- Chunking. Découpage récursif avec des séparateurs adaptés au juridique (
Article,Chapitre), chevauchement contrôlé (200 chars sur 1000). - Déduplication. Hash SHA-1 des chunks normalisés + hash SHA-256 du fichier complet pour refuser un document déjà présent (409).
- Vectorisation. Embeddings OpenAI stockés dans la collection Qdrant du propriétaire.
- Récupération. Recherche de similarité, regroupée par priorité de document puis triée par score décroissant.
def _query_qdrant_sync(store, query, *, top_k=5): results = store.similarity_search_with_score(query, k=top_k) grouped = {} for doc, score in results: prio = doc.metadata.get("priority", float("inf")) grouped.setdefault(prio, []).append((doc, score)) ordered = [] for prio in sorted(grouped): ordered.extend( sorted(grouped[prio], key=lambda t: t[1], reverse=True) ) return ordered
# ligne vue sur >= 60% des pages = bruit répété min_occurrences = max(3, ceil( len(documents) * BOILERPLATE_RATIO_THRESHOLD )) repeated = { line_key for line_key, count in page_level_counter.items() if count >= min_occurrences }
Le prompt système impose au modèle de répondre uniquement à partir du contexte fourni, de citer l'article et la source, et d'admettre l'absence de réponse plutôt que d'inventer.
rag_chain = (
{"context": lambda x: x["context"], "question": lambda x: x["question"],
"history": lambda x: x["history"]}
| prompt | llm | StrOutputParser()
)
async for chunk in rag_chain.astream(chain_input):
yield chunk # → StreamingResponse text/plain
Le multi-tenant ne repose pas sur une convention de nommage mais sur un registre Firestore : chaque base vectorielle est associée à un owner_user_id. Toute opération vérifie cette appartenance avant d'agir.
rh crée, liste et gère ses propres bases et documents. Un agent consomme une base assignée sans pouvoir modifier sa configuration.user_owns_vector_base. La clé Firestore est le SHA-256 du nom de base — stable sur tout le cycle de vie.await create_new_vector_store(collection_name, model_name) try: await register_vector_base(base_name=collection_name, ...) except Exception: logger.exception("Firestore KO — rollback Qdrant") try: await delete_vector_store(collection_name) except Exception: logger.exception("Rollback Qdrant échoué — orphelin possible") raise
verify_password lève ValueError si le hash stocké est un plaintext legacy — migration obligatoire avant déploiement.username, user_id, jti (UUID hex 16 bits, non-rejouable), type (access/refresh). À chaque requête : décodage JWT + vérification user_id contre Firestore. Protège contre les tokens orphelins.(?=.*[a-z])(?=.*[A-Z])(?=.*\d). Message chat : max 4000 chars. Filename DELETE : pattern ^[\w.\- ]+$, max 255. Tout input validé avant la logique métier.user_config_audit avec actor_user_id, target_user_id, old_config, new_config, timestamp UTC.def _create_token(*, username, user_id, token_type, expires_delta): now = datetime.now(timezone.utc) payload = { "username": username, "user_id": int(user_id), "type": token_type, "jti": secrets.token_hex(16), # non-rejouable "iat": int(now.timestamp()), "exp": int((now + expires_delta).timestamp()), } return jwt.encode(payload, _get_secret_key(), algorithm="HS256") def hash_token(token: str) -> str: # SHA-256 du refresh token — jamais le brut en base return hashlib.sha256(token.encode("utf-8")).hexdigest() pwd_context = CryptContext(schemes=["bcrypt_sha256", "bcrypt"], deprecated="auto")
DEBUG_MODE=False : les routes /docs et /redoc disparaissent et les stack traces ne sont jamais renvoyées au client — uniquement journalisées côté serveur.Chaque message est stocké comme un élément JSON dans une Redis List (history:{username}). Fenêtre glissante maintenue atomiquement : rpush + ltrim(-max, -1). Le TTL est rafraîchi à chaque push — il glisse avec l'activité.
Migration automatique depuis l'ancien format (JSON string) vers le nouveau format liste via ensure_history_is_list — rétrocompatibilité sans downtime.
Les derniers documents RAG récupérés sont stockés séparément (retrieved_docs:{username}) avec TTL de 15 minutes, exposés via GET /chat/retrieved_documents.
async def push_user_history( username: str, new_message: dict, max_length: int | None ) -> None: key = f"history:{username}" if max_length is None: max_length = DEFAULT_MAX_HISTORY_LENGTH # 5 await _redis_call(redis_client.rpush, key, json.dumps(new_message)) await _redis_call(redis_client.ltrim, key, -max_length, -1) if HISTORY_TTL_SECONDS: await _redis_call(redis_client.expire, key, HISTORY_TTL_SECONDS)
model.embeddings ada-002
1536 dims
2×H100 self-hosted
API OpenAI compat.
ChatVertexAI
GCP natif
localhost / network
zero cloud
compliance entreprise
région EU possible
# Registre extensible — 1 ligne par provider EMBEDDINGS_REGISTRY: dict[str, Any] = { "openai": OpenAIEmbeddings(openai_api_key=GPT_API_KEY), # "vertex": VertexAIEmbeddings(model="text-embedding-004"), # "ollama": OllamaEmbeddings(model="nomic-embed-text"), } MODEL_DIMENSIONS: dict[ModelName, int] = { "openai": 1536, # "vertex": 768, }
def get_llm(provider: str, model: str) -> BaseChatModel: return { "openai": lambda: ChatOpenAI(model=model, streaming=True), "vertex": lambda: ChatVertexAI(model=model), "ollama": lambda: ChatOllama(model=model), "vllm": lambda: ChatOpenAI(model=model, base_url=VLLM_URL), "azure": lambda: AzureChatOpenAI(deployment_name=model), }[provider]()
| VAR | REQUIS | DÉFAUT |
|---|---|---|
| SECRET_KEY_API | — (min 32 chars) | |
| GPT_API_KEY | — | |
| DEBUG_MODE | non | False |
| RATE_LIMIT_CHAT | non | 30/hour |
| HISTORY_TTL_DAYS | non | 30 |
| MAX_UPLOAD_MB | non | 20 |
| CORS_ALLOWED_ORIGINS | localhost | |
| REDIS_SSL | prod oui | false |
| CHUNK_SIZE | non | 1000 |
| RAG_TOP_K | non | 5 |
| PACKAGE | RÔLE |
|---|---|
| fastapi | Framework API async — routing, validation Pydantic, streaming |
| langchain | Orchestration RAG, LCEL chain, prompt templates multi-tours |
| langchain-openai | ChatOpenAI, OpenAIEmbeddings |
| langchain-qdrant | QdrantVectorStore, similarity_search_with_score |
| qdrant-client | scroll, set_payload, delete, count, create_collection |
| redis | Client async — list ops (rpush, ltrim, lrange), scan, TTL |
| google-cloud-firestore | User store, refresh tokens, audit trail, ownership registry |
| PyMuPDF | Extraction texte PDF page par page (PyMuPDFLoader) |
| passlib[bcrypt] | bcrypt_sha256 — hachage mots de passe sans limite 72 octets |
| PyJWT | Création et décodage JWT HS256 avec claims typés |
| slowapi | Rate limiting par IP basé sur limits |
| uvicorn | Serveur ASGI — production sans --reload |
| aiofiles | Lecture async fichiers — hash SHA-256 PDF |
| langchain-text-splitters | RecursiveCharacterTextSplitter avec séparateurs juridiques |