Three strictly separated layers with a single dependency direction. Routers are thin HTTP layers: they validate input, enforce role-based access control through FastAPI dependencies, and delegate. All real logic lives in services, which orchestrate the infrastructure modules.
This separation makes it possible to test services without going through HTTP, and to replace any technical component without touching the routers.
The user store follows the same principle: an abstract `UserStore` class defines the contract, `FirestoreUserStore` implements it, and a facade allows a mock store to be injected in tests.
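```python
import logging
import sys

from fastapi import FastAPI

# `config` and `verify_redis_connection` are project modules (not shown here)
logger = logging.getLogger(__name__)


def _validate_startup_config() -> None:
    """Refuse to start if mandatory secrets are missing or too weak."""
    errors: list[str] = []
    if not config.SECRET_KEY_API or len(config.SECRET_KEY_API) < 32:
        errors.append("SECRET_KEY_API must be at least 32 characters.")
    if not config.GPT_API_KEY:
        errors.append("GPT_API_KEY is required.")
    if errors:
        for err in errors:
            logger.critical("Invalid configuration: %s", err)
        sys.exit(1)


# Swagger UI and ReDoc are disabled in production (DEBUG_MODE=False)
app = FastAPI(
    docs_url="/docs" if config.DEBUG_MODE else None,
    redoc_url="/redoc" if config.DEBUG_MODE else None,
    on_startup=[verify_redis_connection],
)
```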
```python
from abc import ABC, abstractmethod


class UserStore(ABC):
    """Contract that every user-store backend must implement."""

    @abstractmethod
    async def authenticate_user(self, username: str, password: str) -> dict | None: ...

    @abstractmethod
    async def create_user_with_config(self, *, username: str, ...) -> dict: ...

    @abstractmethod
    async def store_refresh_token(self, *, username: str, ...) -> None: ...


# Public facade: production uses Firestore; tests inject a fake via set_user_store()
_USER_STORE: UserStore = FirestoreUserStore.from_env()


def set_user_store(store: UserStore) -> None:
    global _USER_STORE
    _USER_STORE = store
```
Before a single token is generated, the request goes through a chain of validations that fail fast and clearly, each failure mapped to a specific HTTP status code.
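Validation chain: valid token + active user (401) → full user config (403) → vector base configured (400) → ownership check (`user_owns_vector_base`, `rh` users only) (404) → collection exists in Qdrant (404) → RAG + OpenAI, streamed token by token.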
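```python
# 403: the user has no configuration in the store
user_config = await get_user_config_from_store(username)
if user_config is None:
    raise HTTPException(403, "User not registered")

# 400: the user is registered but no vector base is assigned
index_name = user_config.get("index_name")
if not index_name:
    raise HTTPException(400, "No vector base configured")

# 404: an rh user may only query bases they own; the message is identical to the
# "missing collection" case, so the response does not reveal another tenant's base
if current_user.get("role") == "rh" and not await user_owns_vector_base(
    int(current_user["id"]), str(index_name)
):
    raise HTTPException(404, "Collection does not exist in Qdrant.")

# 404: the collection must actually exist in Qdrant
if not await collection_exists(index_name):
    raise HTTPException(404, "Collection does not exist in Qdrant.")
```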
The answer is streamed token by token as `text/plain` in real time. The exchange is saved in a background task once the stream ends, so persistence never delays what the client sees.
Document pipeline:
- Ingestion & parsing. PDF loading via PyMuPDF, page by page.
- Cleaning. Unicode normalization, removal of page numbers and boilerplate repeated on ≥60% of pages.
- Chunking. Recursive splitting with legal-specific separators (`Article`, `Chapter`) and controlled overlap (200 characters on 1000-character chunks).
- Deduplication. SHA-1 hash of normalized chunks, plus a SHA-256 hash of the full file to reject already-indexed documents with HTTP 409 Conflict.
- Vectorization. OpenAI embeddings stored in the owner's Qdrant collection.
- Retrieval. Similarity search, grouped by document priority then sorted by descending score.
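A simplified, standard-library sketch of the chunking and deduplication steps. It is not LangChain's `RecursiveCharacterTextSplitter`: it only splits before lines starting with `Article` or `Chapter`, then cuts long sections into 1000-character windows overlapping by 200, so each new window advances 1000 - 200 = 800 characters. The normalization applied before SHA-1 hashing (NFKC, case-folding, collapsed whitespace) and all helper names are assumptions.

```python
import hashlib
import re
import unicodedata

CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

# Zero-width split point right before a line starting with "Article" or "Chapter"
_LEGAL_BOUNDARY = re.compile(r"(?m)^(?=(?:Article|Chapter)\b)")


def split_legal_text(text: str, size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
    """Simplified splitter: legal boundaries first, then fixed windows with overlap."""
    if not 0 <= overlap < size:
        raise ValueError("overlap must be in [0, size)")
    step = size - overlap  # 800 new characters per window with the defaults
    chunks: list[str] = []
    for section in _LEGAL_BOUNDARY.split(text):
        section = section.strip()
        if not section:
            continue
        if len(section) <= size:
            chunks.append(section)
            continue
        for start in range(0, len(section), step):
            chunks.append(section[start:start + size])
            if start + size >= len(section):  # this window already reaches the end
                break
    return chunks


def _normalize(chunk: str) -> str:
    # Assumed normalization: NFKC, case-folding, collapsed whitespace
    return " ".join(unicodedata.normalize("NFKC", chunk).casefold().split())


def dedupe_chunks(chunks: list[str]) -> list[str]:
    """Keep the first chunk for each SHA-1 of its normalized form."""
    seen: set[str] = set()
    unique: list[str] = []
    for chunk in chunks:
        digest = hashlib.sha1(_normalize(chunk).encode("utf-8")).hexdigest()
        if digest not in seen:
            seen.add(digest)
            unique.append(chunk)
    return unique


def is_already_indexed(pdf_bytes: bytes, indexed_hashes: set[str]) -> bool:
    # The caller would answer 409 Conflict when this returns True
    return hashlib.sha256(pdf_bytes).hexdigest() in indexed_hashes
```

Splitting on legal markers first keeps an article together whenever it fits in one chunk; the overlap only comes into play for articles longer than the chunk size.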
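```python
def _query_qdrant_sync(store, query, *, top_k=5):
    # (Document, score) pairs; assumes higher score = more similar
    # (true for cosine and dot-product collections in Qdrant)
    results = store.similarity_search_with_score(query, k=top_k)

    # Bucket results by document priority; a missing priority sorts last
    grouped = {}
    for doc, score in results:
        prio = doc.metadata.get("priority", float("inf"))
        grouped.setdefault(prio, []).append((doc, score))

    # Lowest priority value first, then descending score within each bucket
    ordered = []
    for prio in sorted(grouped):
        ordered.extend(sorted(grouped[prio], key=lambda t: t[1], reverse=True))
    return ordered
```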
```python
from math import ceil

# A line seen on >= 60% of pages (and on at least 3 pages) is treated as repeated noise
min_occurrences = max(3, ceil(len(documents) * BOILERPLATE_RATIO_THRESHOLD))
repeated = {
    line_key
    for line_key, count in page_level_counter.items()
    if count >= min_occurrences
}
```
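A fuller sketch of the cleaning step around that threshold. The 60% ratio and the minimum of 3 occurrences come from the snippet above; the page-number pattern, the line key used for comparison and the `clean_pages` name are assumptions.

```python
import re
import unicodedata
from collections import Counter
from math import ceil

BOILERPLATE_RATIO_THRESHOLD = 0.6
# Assumed page-number shapes: "12", "- 12 -", "Page 12", "Page 12 of 40"
_PAGE_NUMBER = re.compile(r"^\s*(?:page\s+)?-?\s*\d+\s*-?(?:\s+of\s+\d+)?\s*$", re.IGNORECASE)


def _line_key(line: str) -> str:
    # Comparison key: case-folded, whitespace-collapsed
    return " ".join(line.casefold().split())


def clean_pages(pages: list[str]) -> list[str]:
    pages = [unicodedata.normalize("NFKC", page) for page in pages]

    # Count each distinct line at most once per page
    page_level_counter: Counter[str] = Counter()
    for page in pages:
        page_level_counter.update({_line_key(line) for line in page.splitlines() if line.strip()})

    min_occurrences = max(3, ceil(len(pages) * BOILERPLATE_RATIO_THRESHOLD))
    repeated = {key for key, count in page_level_counter.items() if count >= min_occurrences}

    cleaned = []
    for page in pages:
        kept = [
            line
            for line in page.splitlines()
            if line.strip()
            and not _PAGE_NUMBER.match(line)
            and _line_key(line) not in repeated
        ]
        cleaned.append("\n".join(kept))
    return cleaned
```

The floor of 3 occurrences matters for short documents: on a 2-page PDF a 60% ratio alone would flag any line that appears twice, including legitimate repeated headings.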
The system prompt instructs the model to answer only from the provided context, cite the article and source, and admit when an answer is not available rather than hallucinating.
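The chain's `prompt` receives three variables: `context`, `question` and `history`. A framework-free sketch of how they could be laid out as chat messages; the `SYSTEM_PROMPT` wording and the `build_messages` helper are hypothetical, and only the three rules (answer from context, cite article and source, admit a missing answer) come from the text.

```python
# Hypothetical wording; the project's actual system prompt is not shown
SYSTEM_PROMPT = (
    "You are a legal assistant. Answer ONLY from the context below.\n"
    "Cite the article and the source document for every claim.\n"
    "If the context does not contain the answer, say so explicitly "
    "instead of guessing.\n\n"
    "Context:\n{context}"
)


def build_messages(context: str, question: str, history: list[dict]) -> list[dict]:
    """System prompt first, then prior turns in order, then the new question."""
    messages = [{"role": "system", "content": SYSTEM_PROMPT.format(context=context)}]
    # Prior turns are assumed to be {"role": ..., "content": ...} dicts
    messages.extend({"role": m["role"], "content": m["content"]} for m in history)
    messages.append({"role": "user", "content": question})
    return messages
```

Putting the retrieved context in the system message keeps the grounding rule and the evidence in one place, while the history stays in ordinary user/assistant turns.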
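```python
from langchain_core.output_parsers import StrOutputParser

# The dict is coerced into a RunnableParallel that maps the input to prompt variables
rag_chain = (
    {
        "context": lambda x: x["context"],
        "question": lambda x: x["question"],
        "history": lambda x: x["history"],
    }
    | prompt
    | llm
    | StrOutputParser()
)

# Inside the async generator handed to StreamingResponse (media_type="text/plain")
async for chunk in rag_chain.astream(chain_input):
    yield chunk
```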
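The "stream first, save afterwards" behaviour can be sketched with plain asyncio. This assumes a hypothetical `save` coroutine standing in for the history write; `stream_and_record` and `_fake_llm` are illustrative names, not project code. Persistence is scheduled with `asyncio.create_task` only after the last chunk has been yielded, so it never sits between two chunks.

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable

# Strong references to pending save tasks, so they are not garbage-collected early
_background_tasks: set[asyncio.Task] = set()


async def stream_and_record(
    chunks: AsyncIterator[str],
    save: Callable[[str], Awaitable[None]],
) -> AsyncIterator[str]:
    """Forward chunks immediately; schedule persistence of the full answer at the end."""
    parts: list[str] = []
    async for chunk in chunks:
        parts.append(chunk)
        yield chunk  # the client receives each chunk as soon as it exists
    # Reached only when the upstream stream is exhausted
    task = asyncio.create_task(save("".join(parts)))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


async def _fake_llm() -> AsyncIterator[str]:
    for token in ["Article ", "12 ", "applies."]:
        await asyncio.sleep(0)
        yield token


async def _demo() -> tuple[list[str], list[str]]:
    saved: list[str] = []

    async def save(text: str) -> None:
        saved.append(text)

    received = [c async for c in stream_and_record(_fake_llm(), save)]
    await asyncio.gather(*_background_tasks)  # let the scheduled save finish
    return received, saved


received, saved = asyncio.run(_demo())
```

One consequence of this design: if the client disconnects mid-stream, the code after the loop never runs and the exchange is not saved. In FastAPI, a comparable effect is often obtained by passing `background=BackgroundTask(...)` to `StreamingResponse`, which runs after the response has been sent.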
Multi-tenancy does not rely on naming conventions but on a Firestore registry: each vector base is linked to an owner_user_id. Every operation verifies ownership before acting.
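An `rh` user creates, lists and manages their own bases and documents. An agent user consumes an assigned base without being able to modify its configuration.
Ownership is checked with `user_owns_vector_base`. The Firestore key is the SHA-256 of the base name, so it stays stable throughout the base's lifecycle.
Creating a base touches two systems. If the Firestore registration fails after the Qdrant collection was created, the collection is deleted as a compensating rollback:
```python
await create_new_vector_store(collection_name, model_name)
try:
    await register_vector_base(base_name=collection_name, ...)
except Exception:
    logger.exception("Firestore registration failed, rolling back Qdrant")
    try:
        await delete_vector_store(collection_name)
    except Exception:
        # The rollback itself failed: the Qdrant collection may be left orphaned
        logger.exception("Qdrant rollback failed, possible orphan collection")
    raise  # re-raise the original Firestore error
```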
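A sketch of the ownership registry, with a plain dict standing in for Firestore (an assumption for illustration; `VectorBaseRegistry`, `base_key` and its methods are hypothetical names). Keying documents by the SHA-256 of the base name gives a fixed-length, deterministic ID; one practical benefit is that Firestore document IDs may not contain `/`, which a hex digest never does.

```python
import hashlib


def base_key(base_name: str) -> str:
    # Deterministic document ID: same name, same key, for the base's whole lifecycle
    return hashlib.sha256(base_name.encode("utf-8")).hexdigest()


class VectorBaseRegistry:
    """Hypothetical in-memory stand-in for the Firestore ownership registry."""

    def __init__(self) -> None:
        self._docs: dict[str, dict] = {}

    def register(self, base_name: str, owner_user_id: int) -> None:
        key = base_key(base_name)
        if key in self._docs:
            raise ValueError(f"vector base {base_name!r} already registered")
        self._docs[key] = {"base_name": base_name, "owner_user_id": owner_user_id}

    def user_owns(self, user_id: int, base_name: str) -> bool:
        # Unknown base and foreign base both return False
        doc = self._docs.get(base_key(base_name))
        return doc is not None and doc["owner_user_id"] == user_id

    def list_for(self, user_id: int) -> list[str]:
        return sorted(
            doc["base_name"] for doc in self._docs.values() if doc["owner_user_id"] == user_id
        )
```

Returning the same `False` for "does not exist" and "belongs to someone else" is what lets the router answer 404 in both cases.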
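- Password hashing. Passwords are hashed with `bcrypt_sha256` via passlib (no 72-byte limit); plain `bcrypt` hashes are still accepted but marked deprecated. `verify_password` raises `ValueError` if the stored hash is legacy plaintext, so those accounts must be migrated before deployment.
- JWT claims. `username`, `user_id`, `jti` (128-bit random hex identifier, unique per token), `type` (`access`/`refresh`). On each request the JWT is decoded and its `user_id` is checked against Firestore, which rejects orphaned tokens whose user no longer exists.
- Input validation. Passwords must contain a lowercase letter, an uppercase letter and a digit (`(?=.*[a-z])(?=.*[A-Z])(?=.*\d)`). Chat messages are capped at 4000 characters. DELETE filenames must match `^[\w.\- ]+$` and are capped at 255 characters. All input is validated before any business logic runs.
- Audit trail. Configuration changes are recorded in `user_config_audit` with `actor_user_id`, `target_user_id`, `old_config`, `new_config` and a UTC timestamp.
```python
import hashlib
import secrets
from datetime import datetime, timezone

import jwt  # PyJWT
from passlib.context import CryptContext


def _create_token(*, username, user_id, token_type, expires_delta):
    now = datetime.now(timezone.utc)
    payload = {
        "username": username,
        "user_id": int(user_id),
        "type": token_type,  # "access" or "refresh"
        "jti": secrets.token_hex(16),  # 16 random bytes (32 hex chars), unique per token
        "iat": int(now.timestamp()),
        "exp": int((now + expires_delta).timestamp()),
    }
    return jwt.encode(payload, _get_secret_key(), algorithm="HS256")


def hash_token(token: str) -> str:
    # Only the SHA-256 of a refresh token is stored, never the raw token
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


# New hashes use bcrypt_sha256; existing plain bcrypt hashes verify but are flagged deprecated
pwd_context = CryptContext(schemes=["bcrypt_sha256", "bcrypt"], deprecated="auto")
```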
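A sketch of the input rules listed above, using the same regexes and limits. The function names are hypothetical, the non-empty check on chat messages is an assumption, and the rejection of dot-only filenames such as `..` is an extra guard beyond the stated pattern (which would otherwise accept them).

```python
import re

PASSWORD_RULE = re.compile(r"(?=.*[a-z])(?=.*[A-Z])(?=.*\d)")
FILENAME_RULE = re.compile(r"^[\w.\- ]+$")
MAX_CHAT_CHARS = 4000
MAX_FILENAME_CHARS = 255


def password_ok(password: str) -> bool:
    # Three lookaheads from position 0: a lowercase letter, an uppercase letter, a digit
    return PASSWORD_RULE.match(password) is not None


def chat_message_ok(message: str) -> bool:
    # Non-empty is an assumption; the 4000-character cap comes from the text
    return 0 < len(message) <= MAX_CHAT_CHARS


def filename_ok(name: str) -> bool:
    # fullmatch matters: with match(), "$" would also accept a trailing newline.
    # "/" and "\" are outside the character class, which blocks "../secret.pdf".
    if len(name) > MAX_FILENAME_CHARS or FILENAME_RULE.fullmatch(name) is None:
        return False
    return name.strip(".") != ""  # extra guard: reject ".", ".." and similar
```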
With `DEBUG_MODE=False`, the `/docs` and `/redoc` routes are disabled and stack traces are never returned to the client; they are only logged server-side.
Each message is stored as a JSON element in a Redis list (`history:{username}`). The sliding window is kept with `rpush` followed by `ltrim(-max, -1)`; each command is atomic on its own, but the pair is issued as two separate calls rather than a MULTI/EXEC transaction. The TTL is refreshed on each push, so it slides with activity.
`ensure_history_is_list` automatically migrates histories stored in the old format (a single JSON string) to the list format, preserving backward compatibility without downtime.
The last retrieved RAG documents are stored separately (retrieved_docs:{username}) with a 15-minute TTL, exposed via GET /chat/retrieved_documents.
```python
import json


async def push_user_history(
    username: str, new_message: dict, max_length: int | None = None
) -> None:
    key = f"history:{username}"
    if max_length is None:
        max_length = DEFAULT_MAX_HISTORY_LENGTH  # 5
    # Append the newest message, then keep only the last `max_length` entries
    await _redis_call(redis_client.rpush, key, json.dumps(new_message))
    await _redis_call(redis_client.ltrim, key, -max_length, -1)
    # Sliding expiry: every push moves the deadline forward
    if HISTORY_TTL_SECONDS:
        await _redis_call(redis_client.expire, key, HISTORY_TTL_SECONDS)
```
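Because the history logic depends on only a few list commands, its behaviour can be checked without a Redis server. The sketch below uses a hypothetical in-memory `FakeRedisLists` that mimics `RPUSH` and `LTRIM` semantics (negative indexes count from the end, `stop` is inclusive), plus a migration in the spirit of `ensure_history_is_list`, assuming the old format is one JSON string holding the whole message array and that the migration runs before each write. `push_history` is a hypothetical synchronous counterpart of `push_user_history`.

```python
import json


class FakeRedisLists:
    """Hypothetical in-memory stand-in; only mimics the commands used here."""

    def __init__(self) -> None:
        self.data: dict[str, str | list[str]] = {}

    def rpush(self, key: str, value: str) -> int:
        items = self.data.setdefault(key, [])
        items.append(value)
        return len(items)

    def ltrim(self, key: str, start: int, stop: int) -> None:
        items = self.data.get(key, [])
        n = len(items)
        # Redis semantics: negative indexes count from the end, stop is inclusive
        start = max(start + n if start < 0 else start, 0)
        stop = stop + n if stop < 0 else stop
        self.data[key] = items[start:stop + 1] if stop >= start else []


def ensure_history_is_list(store: FakeRedisLists, key: str) -> None:
    # Assumed old format: a single JSON string containing the whole message array
    value = store.data.get(key)
    if isinstance(value, str):
        store.data[key] = [json.dumps(m) for m in json.loads(value)]


def push_history(store: FakeRedisLists, username: str, message: dict, max_length: int = 5) -> None:
    key = f"history:{username}"
    ensure_history_is_list(store, key)  # lazy migration before the first list write
    store.rpush(key, json.dumps(message))
    store.ltrim(key, -max_length, -1)  # keep only the newest max_length entries
```

Against a real server, the migration would typically inspect the key's type before rewriting it, and the `rpush`/`ltrim`/`expire` calls could be grouped in a MULTI/EXEC pipeline if the list must never be observed over-length.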
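The provider is selected through the `model` field.

| PROVIDER | NOTES |
|---|---|
| OpenAI | embeddings ada-002, 1536 dims |
| vLLM | 2×H100 self-hosted, OpenAI-compatible API |
| Vertex AI | ChatVertexAI, GCP native |
| Ollama | localhost / network, zero cloud |
| Azure OpenAI | enterprise compliance, EU region available |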
```python
from typing import Any

from langchain_openai import OpenAIEmbeddings

# Extensible registry: one line per provider
EMBEDDINGS_REGISTRY: dict[str, Any] = {
    "openai": OpenAIEmbeddings(openai_api_key=GPT_API_KEY),
    # "vertex": VertexAIEmbeddings(model="text-embedding-004"),
    # "ollama": OllamaEmbeddings(model="nomic-embed-text"),
}

# Vector size per provider; must match the Qdrant collection's configured size
MODEL_DIMENSIONS: dict[ModelName, int] = {
    "openai": 1536,
    # "vertex": 768,
}
```
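```python
from langchain_core.language_models import BaseChatModel
from langchain_google_vertexai import ChatVertexAI  # optional provider package
from langchain_ollama import ChatOllama  # optional provider package
from langchain_openai import AzureChatOpenAI, ChatOpenAI


def get_llm(provider: str, model: str) -> BaseChatModel:
    # Lambdas defer construction: only the selected provider's client is built.
    # An unknown provider raises KeyError.
    return {
        "openai": lambda: ChatOpenAI(model=model, streaming=True),
        "vertex": lambda: ChatVertexAI(model=model),
        "ollama": lambda: ChatOllama(model=model),
        # vLLM serves an OpenAI-compatible API, so ChatOpenAI is reused with another base_url
        "vllm": lambda: ChatOpenAI(model=model, base_url=VLLM_URL),
        "azure": lambda: AzureChatOpenAI(deployment_name=model),
    }[provider]()
```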
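`MODEL_DIMENSIONS` matters because a Qdrant collection is created with a fixed vector size, and points of a different length are rejected. A minimal sketch of the lookup and an early length check, assuming hypothetical helpers `vector_size_for` and `check_vector` (not project code) and reusing only the `openai` entry that is active in the registry.

```python
# Mirrors the active entry of MODEL_DIMENSIONS; "vertex" is still commented out there
MODEL_DIMENSIONS: dict[str, int] = {"openai": 1536}


def vector_size_for(model_name: str) -> int:
    """Size to use when creating the Qdrant collection for this provider."""
    try:
        return MODEL_DIMENSIONS[model_name]
    except KeyError:
        supported = ", ".join(sorted(MODEL_DIMENSIONS))
        raise ValueError(
            f"unsupported embedding model {model_name!r} (supported: {supported})"
        ) from None


def check_vector(vector: list[float], collection_size: int) -> None:
    # Failing here gives a clearer message than the error Qdrant returns on upsert
    if len(vector) != collection_size:
        raise ValueError(
            f"embedding has {len(vector)} dims, collection expects {collection_size}"
        )
```

Because a collection's size is fixed at creation, switching a base to a provider with a different dimension (1536 vs 768, for example) means re-creating and re-indexing the collection rather than just changing the model name.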
| VAR | REQUIRED | DEFAULT |
|---|---|---|
| SECRET_KEY_API | yes (min. 32 chars) | none |
| GPT_API_KEY | yes | none |
| DEBUG_MODE | no | False |
| RATE_LIMIT_CHAT | no | 30/hour |
| HISTORY_TTL_DAYS | no | 30 (days) |
| MAX_UPLOAD_MB | no | 20 (MB) |
| CORS_ALLOWED_ORIGINS | no | localhost |
| REDIS_SSL | yes in production | false |
| CHUNK_SIZE | no | 1000 (chars) |
| RAG_TOP_K | no | 5 |
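A sketch of loading these variables with the defaults from the table, assuming a comma-separated `CORS_ALLOWED_ORIGINS`, the usual truthy strings for booleans, and a hypothetical `Settings` dataclass and `load_settings` function. It also derives a TTL in seconds from `HISTORY_TTL_DAYS`, presumably how `HISTORY_TTL_SECONDS` is obtained in the history code.

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    secret_key_api: str
    gpt_api_key: str
    debug_mode: bool = False
    rate_limit_chat: str = "30/hour"
    history_ttl_days: int = 30
    max_upload_mb: int = 20
    cors_allowed_origins: tuple[str, ...] = ("localhost",)
    redis_ssl: bool = False
    chunk_size: int = 1000
    rag_top_k: int = 5

    @property
    def history_ttl_seconds(self) -> int:
        return self.history_ttl_days * 24 * 3600


def _flag(value: str) -> bool:
    # Assumed truthy spellings
    return value.strip().lower() in {"1", "true", "yes", "on"}


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    errors = []
    secret = env.get("SECRET_KEY_API", "")
    if len(secret) < 32:
        errors.append("SECRET_KEY_API must be at least 32 characters.")
    gpt_key = env.get("GPT_API_KEY", "")
    if not gpt_key:
        errors.append("GPT_API_KEY is required.")
    if errors:
        raise ValueError("; ".join(errors))  # report every problem at once
    origins = env.get("CORS_ALLOWED_ORIGINS", "localhost")
    return Settings(
        secret_key_api=secret,
        gpt_api_key=gpt_key,
        debug_mode=_flag(env.get("DEBUG_MODE", "false")),
        rate_limit_chat=env.get("RATE_LIMIT_CHAT", "30/hour"),
        history_ttl_days=int(env.get("HISTORY_TTL_DAYS", "30")),
        max_upload_mb=int(env.get("MAX_UPLOAD_MB", "20")),
        cors_allowed_origins=tuple(o.strip() for o in origins.split(",") if o.strip()),
        redis_ssl=_flag(env.get("REDIS_SSL", "false")),
        chunk_size=int(env.get("CHUNK_SIZE", "1000")),
        rag_top_k=int(env.get("RAG_TOP_K", "5")),
    )
```

Taking `env` as a parameter keeps the loader testable with a plain dict instead of mutating `os.environ`.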
| PACKAGE | ROLE |
|---|---|
| fastapi | Async API framework: routing, Pydantic validation, streaming |
| langchain | RAG orchestration, LCEL chain, multi-turn prompt templates |
| langchain-openai | ChatOpenAI, OpenAIEmbeddings |
| langchain-qdrant | QdrantVectorStore, similarity_search_with_score |
| qdrant-client | scroll, set_payload, delete, count, create_collection |
| redis | Async client — list ops (rpush, ltrim, lrange), scan, TTL |
| google-cloud-firestore | User store, refresh tokens, audit trail, ownership registry |
| PyMuPDF | PDF text extraction page by page (PyMuPDFLoader) |
| passlib[bcrypt] | bcrypt_sha256 — password hashing without 72-byte limit |
| PyJWT | JWT HS256 creation and decoding with typed claims |
| slowapi | IP-based rate limiting, built on `limits` |
| uvicorn | ASGI server — production without --reload |
| aiofiles | Async file reading — SHA-256 PDF hash |
| langchain-text-splitters | RecursiveCharacterTextSplitter with legal separators |