SKAZY · KEVIN · Directeur Technique
Mındly
API
FastAPILangChainQdrant RedisFirestoreOpenAI Docker~192 tests
DESCRIPTION API RAG multi-tenant construite avec FastAPI. Permet de déployer des assistants conversationnels spécialisés par domaine à partir de corpus documentaires PDF, avec streaming token-par-token, historique persistant et isolation complète des données par utilisateur.
CONTEXTE MÉTIER Initialement conçu pour l'assistance droit du travail en Nouvelle-Calédonie (conventions collectives, secteurs banque / commerce / manutention portuaire). Architecture agnostique du contenu — adaptable à tout corpus documentaire métier.
192
TESTS UNITAIRES
RAG
RETRIEVAL-AUGMENTED GEN.
JWT
AUTH + REFRESH ROTATION
vLLM
MULTI-PROVIDER PRÊT
01
ARCHTECTURE ROUTER → SERVICE → MODULE
PRINCIPE

Trois couches strictement séparées avec un sens unique de dépendance. Les routers sont des couches HTTP minces — ils valident, appliquent le contrôle de rôle par dépendance FastAPI, et délèguent. Toute la logique réelle vit dans les services, qui orchestrent les modules d'infrastructure.

Cette séparation permet de tester les services indépendamment du protocole HTTP, et de remplacer n'importe quelle brique technique sans toucher aux routers.

Le user store illustre la même rigueur : une classe abstraite UserStore définit le contrat, FirestoreUserStore l'implémente, et une façade permet d'injecter un faux store en test.

COUCHES
ROUTERS
auth.py
chat.py
documents.py
user.py
histo.py
vectorstore.py
SERVICES
auth_service
chat_service
document_service
user_service
vectorstore_service
MODULES
security
rag
qdrant_service
history_service
user_store
vector_base_registry
INFRA
Qdrant
Redis
Firestore
OpenAI
DÉMARRAGE FAIL-FAST (main.py)
PYTHONsrc/main.py
def _validate_startup_config() -> None:
    errors: list[str] = []
    if not config.SECRET_KEY_API or len(config.SECRET_KEY_API) < 32:
        errors.append("SECRET_KEY_API doit contenir au moins 32 caractères.")
    if not config.GPT_API_KEY:
        errors.append("GPT_API_KEY est requise.")
    if errors:
        for err in errors:
            logger.critical("Configuration invalide : %s", err)
        sys.exit(1)

# Swagger désactivé en production
app = FastAPI(
    docs_url="/docs" if config.DEBUG_MODE else None,
    redoc_url="/redoc" if config.DEBUG_MODE else None,
    on_startup=[verify_redis_connection],
)
ABSTRACT USER STORE — INJECTABLE POUR LES TESTS
PYTHONsrc/modules/user_store_base.py
class UserStore(ABC):
    @abstractmethod
    async def authenticate_user(self, username: str, password: str) -> dict | None: ...
    @abstractmethod
    async def create_user_with_config(self, *, username: str, ...) -> dict: ...
    @abstractmethod
    async def store_refresh_token(self, *, username: str, ...) -> None: ...

# Façade publique — injection via set_user_store() dans les tests
_USER_STORE: UserStore = FirestoreUserStore.from_env()

def set_user_store(store: UserStore) -> None:
    global _USER_STORE
    _USER_STORE = store
02
CYCLE D'UNE REQUÊTE POST /CHAT/STREAM

Avant de générer le moindre token, la requête traverse une chaîne de validations qui échouent vite et clairement. Chaque étape a son propre code d'erreur HTTP.

01
JWT auth
get_current_user
token + user actif
→ 401
02
Config user
Firestore
config complète
→ 403
03
Base config.
index_name défini
→ 400
04
Ownership
rôle rh seulement
user_owns_base
→ 404
05
Collection
Qdrant exists
→ 404
06
Stream
Redis history
RAG + OpenAI
token/token
BUILD_CHAT_CONTEXT (services/chat_service.py)
PYTHONservices/chat_service.py
user_config = await get_user_config_from_store(username)
if user_config is None:
    raise HTTPException(403, "Utilisateur non enregistre")

index_name = user_config.get("index_name")
if not index_name:
    raise HTTPException(400, "Aucune base vectorielle configuree")

if current_user.get("role") == "rh" and not await user_owns_vector_base(
        int(current_user["id"]), str(index_name)):
    raise HTTPException(404, "La collection n'existe pas dans Qdrant.")

if not await collection_exists(index_name):
    raise HTTPException(404, "La collection n'existe pas dans Qdrant.")
La réponse est diffusée en text/plain au fil de l'eau. L'enregistrement de l'échange se fait dans une tâche de fond, après la fin du stream, pour ne pas retarder l'affichage côté client.
03
PIPELINE RAG RETRIEVAL-AUGMENTED GENERATION
  • Ingestion & parsing. Chargement du PDF via PyMuPDF, page par page.
  • Nettoyage. Normalisation Unicode, suppression des numéros de page et du boilerplate répété sur ≥60% des pages.
  • Chunking. Découpage récursif avec des séparateurs adaptés au juridique (Article, Chapitre), chevauchement contrôlé (200 chars sur 1000).
  • Déduplication. Hash SHA-1 des chunks normalisés + hash SHA-256 du fichier complet pour refuser un document déjà présent (409).
  • Vectorisation. Embeddings OpenAI stockés dans la collection Qdrant du propriétaire.
  • Récupération. Recherche de similarité, regroupée par priorité de document puis triée par score décroissant.
TRI PRIORITÉ + SCORE (modules/rag.py)
PYTHONmodules/rag.py
def _query_qdrant_sync(store, query, *, top_k=5):
    results = store.similarity_search_with_score(query, k=top_k)
    grouped = {}
    for doc, score in results:
        prio = doc.metadata.get("priority", float("inf"))
        grouped.setdefault(prio, []).append((doc, score))
    ordered = []
    for prio in sorted(grouped):
        ordered.extend(
            sorted(grouped[prio], key=lambda t: t[1], reverse=True)
        )
    return ordered
DÉTECTION BOILERPLATE (modules/text_preprocessing.py)
PYTHONmodules/text_preprocessing.py
# ligne vue sur >= 60% des pages = bruit répété
min_occurrences = max(3, ceil(
    len(documents) * BOILERPLATE_RATIO_THRESHOLD
))
repeated = {
    line_key
    for line_key, count in page_level_counter.items()
    if count >= min_occurrences
}

Le prompt système impose au modèle de répondre uniquement à partir du contexte fourni, de citer l'article et la source, et d'admettre l'absence de réponse plutôt que d'inventer.

STREAMING LCEL (modules/rag.py)
PYTHONmodules/rag.py — get_response_async_openai
rag_chain = (
    {"context": lambda x: x["context"], "question": lambda x: x["question"],
     "history": lambda x: x["history"]}
    | prompt | llm | StrOutputParser()
)
async for chunk in rag_chain.astream(chain_input):
    yield chunk  # → StreamingResponse text/plain
TERMINAL — PIPELINE EN ACTION
MINDLY-API — PIPELINE RAG TRACE
04
CLOISONNEMENT OWNERSHIP · ROLLBACK ATOMIQUE

Le multi-tenant ne repose pas sur une convention de nommage mais sur un registre Firestore : chaque base vectorielle est associée à un owner_user_id. Toute opération vérifie cette appartenance avant d'agir.

RÔLES
rh vs agent
Un rh crée, liste et gère ses propres bases et documents. Un agent consomme une base assignée sans pouvoir modifier sa configuration.
OWNERSHIP
Vérifié partout
Chat, upload, suppression, priorités : chaque chemin passe par user_owns_vector_base. La clé Firestore est le SHA-256 du nom de base — stable sur tout le cycle de vie.
CRÉATION
Rollback atomique
Création Qdrant puis enregistrement Firestore. Si Firestore échoue, la collection Qdrant est supprimée immédiatement. Pas de collection orpheline possible.
ÉTATS
Incohérences gérées
Si Qdrant est absent mais Firestore toujours enregistré (crash partiel), la suppression nettoie l'orphelin Firestore sans erreur HTTP.
VECTORSTORE_SERVICE.PY — ROLLBACK
PYTHONservices/vectorstore_service.py
await create_new_vector_store(collection_name, model_name)
try:
    await register_vector_base(base_name=collection_name, ...)
except Exception:
    logger.exception("Firestore KO — rollback Qdrant")
    try:
        await delete_vector_store(collection_name)
    except Exception:
        logger.exception("Rollback Qdrant échoué — orphelin possible")
    raise
05
SÉCURITÉ BCRYPT · JWT · RATE LIMIT · AUDIT
bcrypt_sha256
Contourne la limite de 72 octets de bcrypt natif. verify_password lève ValueError si le hash stocké est un plaintext legacy — migration obligatoire avant déploiement.
JWT double vérification
Claims : username, user_id, jti (UUID hex 16 bits, non-rejouable), type (access/refresh). À chaque requête : décodage JWT + vérification user_id contre Firestore. Protège contre les tokens orphelins.
Refresh token rotation
À chaque refresh : ancien token révoqué, tous les tokens actifs précédents aussi (pas de multi-session). Tokens expirés ou révoqués >30j purgés. Tout en un batch Firestore atomique. Hash SHA-256 stocké — jamais le token brut.
Rate limiting par IP
slowapi sur 4 routes critiques : login (5/min), refresh (10/min), upload (10/h), chat (30/h). Configurable via env vars. Retourne HTTP 429.
Validation Pydantic stricte
StrongPassword : min 8, regex (?=.*[a-z])(?=.*[A-Z])(?=.*\d). Message chat : max 4000 chars. Filename DELETE : pattern ^[\w.\- ]+$, max 255. Tout input validé avant la logique métier.
Audit trail Firestore
Toute création / modification / suppression génère un document dans user_config_audit avec actor_user_id, target_user_id, old_config, new_config, timestamp UTC.
SECURITY.PY — JWT + HASH TOKEN
PYTHONsrc/modules/security.py
def _create_token(*, username, user_id, token_type, expires_delta):
    now = datetime.now(timezone.utc)
    payload = {
        "username": username, "user_id": int(user_id),
        "type": token_type,
        "jti": secrets.token_hex(16),  # non-rejouable
        "iat": int(now.timestamp()),
        "exp": int((now + expires_delta).timestamp()),
    }
    return jwt.encode(payload, _get_secret_key(), algorithm="HS256")

def hash_token(token: str) -> str:
    # SHA-256 du refresh token — jamais le brut en base
    return hashlib.sha256(token.encode("utf-8")).hexdigest()

pwd_context = CryptContext(schemes=["bcrypt_sha256", "bcrypt"], deprecated="auto")
En production, DEBUG_MODE=False : les routes /docs et /redoc disparaissent et les stack traces ne sont jamais renvoyées au client — uniquement journalisées côté serveur.
06
HISTORIQUE REDIS FENÊTRE GLISSANTE · TTL 30J
PRINCIPE

Chaque message est stocké comme un élément JSON dans une Redis List (history:{username}). Fenêtre glissante maintenue atomiquement : rpush + ltrim(-max, -1). Le TTL est rafraîchi à chaque push — il glisse avec l'activité.

Migration automatique depuis l'ancien format (JSON string) vers le nouveau format liste via ensure_history_is_list — rétrocompatibilité sans downtime.

Les derniers documents RAG récupérés sont stockés séparément (retrieved_docs:{username}) avec TTL de 15 minutes, exposés via GET /chat/retrieved_documents.

HISTORY_SERVICE.PY
PYTHONmodules/history_service.py
async def push_user_history(
    username: str, new_message: dict, max_length: int | None
) -> None:
    key = f"history:{username}"
    if max_length is None:
        max_length = DEFAULT_MAX_HISTORY_LENGTH  # 5

    await _redis_call(redis_client.rpush, key, json.dumps(new_message))
    await _redis_call(redis_client.ltrim, key, -max_length, -1)

    if HISTORY_TTL_SECONDS:
        await _redis_call(redis_client.expire, key, HISTORY_TTL_SECONDS)
07
MULTI-PROVIDER AGNOSTIQUE AU MODÈLE
Deux axes distincts, souvent confondus. L'embedding fixe la dimension du vecteur et conditionne la collection — il est figé par base, sous peine de comparer des vecteurs incomparables. La génération est interchangeable par requête. Un design multi-provider sérieux sépare ces deux axes plutôt que de les piloter avec un seul champ model.
PROVIDERS CIBLES
OpenAI
gpt-4o-2024-08-06
embeddings ada-002
1536 dims
vLLM
Qwen3 / DeepSeek
2×H100 self-hosted
API OpenAI compat.
Vertex AI
Gemini Pro / Flash
ChatVertexAI
GCP natif
Ollama
ChatOllama
localhost / network
zero cloud
Azure OpenAI
AzureChatOpenAI
compliance entreprise
région EU possible
REGISTRE ACTUEL — EMBEDDINGS (qdrant_service.py)
PYTHONmodules/qdrant_service.py
# Registre extensible — 1 ligne par provider
EMBEDDINGS_REGISTRY: dict[str, Any] = {
    "openai": OpenAIEmbeddings(openai_api_key=GPT_API_KEY),
    # "vertex": VertexAIEmbeddings(model="text-embedding-004"),
    # "ollama": OllamaEmbeddings(model="nomic-embed-text"),
}
MODEL_DIMENSIONS: dict[ModelName, int] = {
    "openai": 1536,
    # "vertex": 768,
}
FACTORY LLM — CIBLE (modules/rag.py)
PYTHONmodules/rag.pyCIBLE
def get_llm(provider: str, model: str) -> BaseChatModel:
    return {
        "openai": lambda: ChatOpenAI(model=model, streaming=True),
        "vertex": lambda: ChatVertexAI(model=model),
        "ollama": lambda: ChatOllama(model=model),
        "vllm":   lambda: ChatOpenAI(model=model, base_url=VLLM_URL),
        "azure":  lambda: AzureChatOpenAI(deployment_name=model),
    }[provider]()
08
DÉPLOIEMENT DOCKER COMPOSE · 4 SERVICES
SERVICES
API
chatbot_fastapi
python:3.13-slim · port 8000 · volume pkgs (cache pip) · uvicorn prod
VECTOR DB
qdrant
qdrant/qdrant:latest · ports 6333/6334 · volume ./qdrant_data persisté
CACHE
redis
redis:latest · port 6379 · password + SSL recommandés en prod
FIRESTORE
firestore-emulator
google-cloud-cli:emulators · port 8080 · UI optionnelle via profil dev-ui
VARIABLES D'ENVIRONNEMENT
VARREQUISDÉFAUT
SECRET_KEY_API— (min 32 chars)
GPT_API_KEY
DEBUG_MODEnonFalse
RATE_LIMIT_CHATnon30/hour
HISTORY_TTL_DAYSnon30
MAX_UPLOAD_MBnon20
CORS_ALLOWED_ORIGINSlocalhost
REDIS_SSLprod ouifalse
CHUNK_SIZEnon1000
RAG_TOP_Knon5
09
TESTS ~192 TESTS UNITAIRES
SÉCURITÉ
test_security.py
Hachage bcrypt_sha256, génération/validation JWT, vérification type token, cas limites (payload invalide, user_id ≤ 0).
HISTORIQUE
test_history_service.py
Push, get, reset, trim, list_keys, retrieved_docs. Migration legacy JSON string → Redis list. TTL refresh.
CHAT
test_chat_service.py
Validation contexte : config manquante, base vide, ownership, collection Qdrant absente.
INDEXATION
test_process_pdf.py
Pipeline complet : loader, splitter, dédup chunks, insertion Qdrant. Cas vide, cas doublon SHA-256.
QDRANT
test_qdrant_service.py
List, create, delete collections. Filtres payload nested vs direct. Priority update, reset.
INTÉGRATION
test_integration_api.py
Login end-to-end, validation mot de passe fort, validation filename, auth 401/403.
10
RÉFÉRENCE API JWT BEARER · RATE LIMITED
AUTHENTIFICATION
POST
/login
Credentials → paire access + refresh tokenPUBLIC · 5/min
POST
/refresh
Rotation tokens — révoque l'ancien, émet un nouveau pairPUBLIC · 10/min
POST
/logout
Révoque le refresh_token courant en FirestoreAUTH
CHAT
POST
/chat/stream
Streaming RAG token/token — StreamingResponse text/plainAUTH · 30/h
GET
/chat/retrieved_documents
Documents RAG du dernier appel (TTL 15 min)AUTH
USERS
GET
/get_user_config
Configuration de l'utilisateur connectéAUTH
GET
/list_users
Liste des usernames actifsRH
POST
/create_new_user
Crée user + hash bcrypt_sha256 + audit FirestoreRH
PUT
/users/{username}/config
Modifie config utilisateur cible — tracé en auditRH
DELETE
/users/{username}
Supprime user + tokens + historique Redis — batch atomiqueRH
DOCUMENTS
GET
/document_list
PDFs indexés avec priorité et document_idRH
POST
/upload_document
Upload PDF → nettoyage → chunking → dédupliqué → QdrantRH · 10/h
DELETE
/delete_document
Supprime tous les chunks d'un document par nomRH
PUT
/set_new_documents_priority
Réordonne les priorités — impacte le tri RAGRH
PUT
/reset_document_priority
Remet toutes les priorités à nullRH
VECTORSTORE
GET
/get_vectorbase_list
Collections Qdrant possédées par l'utilisateurRH
POST
/create_new_vector_base
Crée collection Qdrant + enregistrement Firestore ownershipRH
DELETE
/delete_vector_base
Supprime Qdrant + registre Firestore — gère les orphelinsRH
GET
/show_history
Historique de conversation de l'utilisateurAUTH
DELETE
/clean_history
Réinitialise l'historique RedisAUTH
PUT
/modify_history_length
Modifie la fenêtre max de l'historiqueRH
11
STACK TECHNIQUE FASTAPI
PACKAGERÔLE
fastapiFramework API async — routing, validation Pydantic, streaming
langchainOrchestration RAG, LCEL chain, prompt templates multi-tours
langchain-openaiChatOpenAI, OpenAIEmbeddings
langchain-qdrantQdrantVectorStore, similarity_search_with_score
qdrant-clientscroll, set_payload, delete, count, create_collection
redisClient async — list ops (rpush, ltrim, lrange), scan, TTL
google-cloud-firestoreUser store, refresh tokens, audit trail, ownership registry
PyMuPDFExtraction texte PDF page par page (PyMuPDFLoader)
passlib[bcrypt]bcrypt_sha256 — hachage mots de passe sans limite 72 octets
PyJWTCréation et décodage JWT HS256 avec claims typés
slowapiRate limiting par IP basé sur limits
uvicornServeur ASGI — production sans --reload
aiofilesLecture async fichiers — hash SHA-256 PDF
langchain-text-splittersRecursiveCharacterTextSplitter avec séparateurs juridiques