SKAZY · KEVIN · CTO
Mindly
API
FastAPI · LangChain · Qdrant · Redis · Firestore · OpenAI · Docker · ~192 tests
DESCRIPTION Multi-tenant RAG API built with FastAPI. Deploy domain-specific conversational assistants from PDF document corpora, with token-by-token streaming, persistent history and full per-user data isolation.
BUSINESS CONTEXT Originally built for labor law assistance in New Caledonia (collective agreements, banking / retail / port handling sectors). Content-agnostic architecture — adaptable to any professional document corpus.
192
UNIT TESTS
RAG
RETRIEVAL-AUGMENTED GEN.
JWT
AUTH + REFRESH ROTATION
vLLM
MULTI-PROVIDER READY
01
ARCHITECTURE ROUTER → SERVICE → MODULE
PRINCIPLE

Three strictly separated layers with a single dependency direction. Routers are thin HTTP layers — they validate, apply role-based access control via FastAPI dependencies, and delegate. All real logic lives in services, which orchestrate infrastructure modules.

This separation allows testing services independently of the HTTP protocol, and replacing any technical component without touching the routers.

The user store follows the same principle: an abstract UserStore class defines the contract, FirestoreUserStore implements it, and a facade allows injecting a mock store in tests.

LAYERS
ROUTERS
auth.py
chat.py
documents.py
user.py
histo.py
vectorstore.py
SERVICES
auth_service
chat_service
document_service
user_service
vectorstore_service
MODULES
security
rag
qdrant_service
history_service
user_store
vector_base_registry
INFRA
Qdrant
Redis
Firestore
OpenAI
FAIL-FAST STARTUP (main.py)
PYTHON · src/main.py
def _validate_startup_config() -> None:
    errors: list[str] = []
    if not config.SECRET_KEY_API or len(config.SECRET_KEY_API) < 32:
        errors.append("SECRET_KEY_API must be at least 32 characters.")
    if not config.GPT_API_KEY:
        errors.append("GPT_API_KEY is required.")
    if errors:
        for err in errors:
            logger.critical("Invalid configuration: %s", err)
        sys.exit(1)

# Swagger disabled in production
app = FastAPI(
    docs_url="/docs" if config.DEBUG_MODE else None,
    redoc_url="/redoc" if config.DEBUG_MODE else None,
    on_startup=[verify_redis_connection],
)
ABSTRACT USER STORE — INJECTABLE FOR TESTING
PYTHON · src/modules/user_store_base.py
class UserStore(ABC):
    @abstractmethod
    async def authenticate_user(self, username: str, password: str) -> dict | None: ...
    @abstractmethod
    async def create_user_with_config(self, *, username: str, ...) -> dict: ...
    @abstractmethod
    async def store_refresh_token(self, *, username: str, ...) -> None: ...

# Public facade — inject via set_user_store() in tests
_USER_STORE: UserStore = FirestoreUserStore.from_env()

def set_user_store(store: UserStore) -> None:
    global _USER_STORE
    _USER_STORE = store
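The injection mechanism can be illustrated with a small test double. This is a minimal sketch, not the project's test code: `InMemoryUserStore`, the reduced one-method contract and the module-level `authenticate_user` facade function are hypothetical names introduced for the example, and the plaintext password comparison stands in for the real hash verification.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Optional


class UserStore(ABC):
    """Reduced contract: only the method exercised in this sketch."""

    @abstractmethod
    async def authenticate_user(self, username: str, password: str) -> Optional[dict]: ...


class InMemoryUserStore(UserStore):
    """Hypothetical test double: users live in a dict instead of Firestore."""

    def __init__(self, users: dict) -> None:
        self._users = users

    async def authenticate_user(self, username: str, password: str) -> Optional[dict]:
        user = self._users.get(username)
        # Plaintext comparison is for the sketch only; a real store verifies a hash.
        if user is None or user["password"] != password:
            return None
        return {"username": username, "role": user["role"]}


_USER_STORE: Optional[UserStore] = None


def set_user_store(store: UserStore) -> None:
    """Swap the active store, as a test fixture would."""
    global _USER_STORE
    _USER_STORE = store


async def authenticate_user(username: str, password: str) -> Optional[dict]:
    """Facade function (assumed shape): delegates to the injected store."""
    if _USER_STORE is None:
        raise RuntimeError("call set_user_store() first")
    return await _USER_STORE.authenticate_user(username, password)


set_user_store(InMemoryUserStore({"alice": {"password": "s3cret", "role": "rh"}}))
print(asyncio.run(authenticate_user("alice", "s3cret")))
```

Because services only see the facade, a test can exercise login logic without a Firestore emulator.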
02
REQUEST LIFECYCLE POST /CHAT/STREAM

Before generating a single token, the request goes through a chain of validations that fail fast and clearly. Each step has its own HTTP error code.

01 · JWT auth · get_current_user: token + active user → 401
02 · User config · Firestore: full config → 403
03 · Base config · index_name defined → 400
04 · Ownership · rh role only: user_owns_vector_base → 404
05 · Collection · exists in Qdrant → 404
06 · Stream · Redis history, RAG + OpenAI, token by token
BUILD_CHAT_CONTEXT (services/chat_service.py)
PYTHON · services/chat_service.py
user_config = await get_user_config_from_store(username)
if user_config is None:
    raise HTTPException(403, "User not registered")

index_name = user_config.get("index_name")
if not index_name:
    raise HTTPException(400, "No vector base configured")

if current_user.get("role") == "rh" and not await user_owns_vector_base(
        int(current_user["id"]), str(index_name)):
    raise HTTPException(404, "Collection does not exist in Qdrant.")

if not await collection_exists(index_name):
    raise HTTPException(404, "Collection does not exist in Qdrant.")
The response is streamed as text/plain in real time. The exchange is saved in a background task, after the stream ends, to avoid delaying the client display.
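The "stream first, persist afterwards" idea can be sketched without any web framework. The example below is an approximation under stated assumptions: `fake_llm_stream`, `stream_then_save` and `save_exchange` are hypothetical names, and the save is simply awaited once the last token has been forwarded, whereas the API described here hands it to a background task.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List


async def fake_llm_stream() -> AsyncIterator[str]:
    """Stand-in for the LLM token stream (hypothetical)."""
    for token in ["Article ", "12 ", "applies."]:
        await asyncio.sleep(0)  # yield control, as a network call would
        yield token


async def stream_then_save(
    tokens: AsyncIterator[str],
    save_exchange: Callable[[str], Awaitable[None]],
) -> AsyncIterator[str]:
    """Forward each token immediately, persist the full answer afterwards."""
    parts: List[str] = []
    async for token in tokens:
        parts.append(token)
        yield token  # the client sees this token before anything is saved
    # Reached only once the stream is exhausted.
    await save_exchange("".join(parts))


saved: List[str] = []


async def save_exchange(answer: str) -> None:
    """Stand-in for the history write."""
    saved.append(answer)


async def main() -> List[str]:
    return [t async for t in stream_then_save(fake_llm_stream(), save_exchange)]


received = asyncio.run(main())
```

The point of the ordering is that no storage latency sits between two tokens; the write happens only after the client already has the complete answer.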
03
RAG PIPELINE RETRIEVAL-AUGMENTED GENERATION
  • Ingestion & parsing. PDF loading via PyMuPDF, page by page.
  • Cleaning. Unicode normalization, removal of page numbers and boilerplate repeated on ≥60% of pages.
  • Chunking. Recursive splitting with legal-specific separators (Article, Chapter) and a controlled overlap (1000-character chunks with a 200-character overlap).
  • Deduplication. SHA-1 hash of normalized chunks + SHA-256 hash of the full file to reject already-indexed documents (409).
  • Vectorization. OpenAI embeddings stored in the owner's Qdrant collection.
  • Retrieval. Similarity search, grouped by document priority then sorted by descending score.
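The deduplication step listed above can be sketched with the standard library. This is an illustration only: the exact normalization (NFKC, lowercasing, whitespace collapsing) and the helper names are assumptions, and the in-memory `indexed_files` set stands in for whatever store tracks indexed documents.

```python
import hashlib
import unicodedata
from typing import List, Set


def normalize(text: str) -> str:
    """Assumed normalization: NFKC, lowercase, collapsed whitespace."""
    return " ".join(unicodedata.normalize("NFKC", text).lower().split())


def chunk_key(chunk: str) -> str:
    """SHA-1 of the normalized chunk, used to drop repeated chunks."""
    return hashlib.sha1(normalize(chunk).encode("utf-8")).hexdigest()


def dedupe_chunks(chunks: List[str]) -> List[str]:
    """Keep the first occurrence of each normalized chunk, in order."""
    seen: Set[str] = set()
    unique: List[str] = []
    for chunk in chunks:
        key = chunk_key(chunk)
        if key not in seen:
            seen.add(key)
            unique.append(chunk)
    return unique


indexed_files: Set[str] = set()  # stand-in for the persisted file hashes


def is_duplicate_file(data: bytes) -> bool:
    """True if this exact file was already indexed (the API answers 409)."""
    key = hashlib.sha256(data).hexdigest()
    if key in indexed_files:
        return True
    indexed_files.add(key)
    return False
```

Hashing the normalized text rather than the raw chunk means two chunks that differ only by spacing or case count as the same chunk.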
PRIORITY + SCORE SORT (modules/rag.py)
PYTHON · modules/rag.py
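def _query_qdrant_sync(store, query, *, top_k=5):
    """Return (doc, score) pairs grouped by ascending priority, best score first."""
    results = store.similarity_search_with_score(query, k=top_k)
    grouped = {}
    for doc, score in results:
        # A missing or null priority (e.g. after a reset) sorts last.
        prio = doc.metadata.get("priority")
        if prio is None:
            prio = float("inf")
        grouped.setdefault(prio, []).append((doc, score))
    ordered = []
    for prio in sorted(grouped):
        # Within one priority level, highest similarity score first.
        ordered.extend(
            sorted(grouped[prio], key=lambda t: t[1], reverse=True)
        )
    return ordered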
BOILERPLATE DETECTION (modules/text_preprocessing.py)
PYTHON · modules/text_preprocessing.py
# line seen on >= 60% of pages = repeated noise
min_occurrences = max(3, ceil(
    len(documents) * BOILERPLATE_RATIO_THRESHOLD
))
repeated = {
    line_key
    for line_key, count in page_level_counter.items()
    if count >= min_occurrences
}
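For context, the excerpt above can be expanded into a self-contained sketch. The per-page counting and the `strip_boilerplate` helper are assumptions about how the surrounding code might look; only the threshold formula comes from the excerpt.

```python
from collections import Counter
from math import ceil
from typing import List, Set

BOILERPLATE_RATIO_THRESHOLD = 0.6  # the 60% figure quoted in the pipeline description


def find_repeated_lines(pages: List[str]) -> Set[str]:
    """Lines present on at least 60% of pages (and on at least 3 pages)."""
    counter: Counter = Counter()
    for page in pages:
        # A set counts each distinct line once per page.
        counter.update({line.strip() for line in page.splitlines() if line.strip()})
    min_occurrences = max(3, ceil(len(pages) * BOILERPLATE_RATIO_THRESHOLD))
    return {line for line, count in counter.items() if count >= min_occurrences}


def strip_boilerplate(pages: List[str]) -> List[str]:
    """Remove the repeated lines from every page (hypothetical helper)."""
    repeated = find_repeated_lines(pages)
    return [
        "\n".join(line for line in page.splitlines() if line.strip() not in repeated)
        for page in pages
    ]
```

The floor of 3 occurrences keeps very short documents from losing legitimate lines that happen to appear on two pages.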

The system prompt instructs the model to answer only from the provided context, cite the article and source, and admit when an answer is not available rather than hallucinating.

LCEL STREAMING (modules/rag.py)
PYTHON · modules/rag.py · get_response_async_openai
rag_chain = (
    {"context": lambda x: x["context"], "question": lambda x: x["question"],
     "history": lambda x: x["history"]}
    | prompt | llm | StrOutputParser()
)
async for chunk in rag_chain.astream(chain_input):
    yield chunk  # → StreamingResponse text/plain
04
DATA ISOLATION OWNERSHIP · ATOMIC ROLLBACK

Multi-tenancy does not rely on naming conventions but on a Firestore registry: each vector base is linked to an owner_user_id. Every operation verifies ownership before acting.

ROLES
rh vs agent
An rh user creates, lists and manages their own bases and documents. An agent consumes an assigned base without being able to modify its configuration.
OWNERSHIP
Verified everywhere
Chat, upload, deletion, priorities: every path goes through user_owns_vector_base. The Firestore key is the SHA-256 of the base name — stable throughout the lifecycle.
CREATION
Atomic rollback
Qdrant creation, then Firestore registration. If Firestore fails, the Qdrant collection is deleted immediately. An orphaned collection can remain only if that rollback itself fails, in which case the failure is logged.
STATES
Inconsistencies handled
If Qdrant is absent but Firestore still registered (partial crash), deletion cleans up the Firestore orphan without an HTTP error.
VECTORSTORE_SERVICE.PY — ROLLBACK
PYTHON · services/vectorstore_service.py
await create_new_vector_store(collection_name, model_name)
try:
    await register_vector_base(base_name=collection_name, ...)
except Exception:
    logger.exception("Firestore registration failed, rolling back Qdrant")
    try:
        await delete_vector_store(collection_name)
    except Exception:
        logger.exception("Qdrant rollback failed, orphaned collection possible")
    raise
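The opposite inconsistency, a registry entry whose Qdrant collection is gone, can be sketched with in-memory stand-ins. Everything here is hypothetical scaffolding (the `qdrant_collections` set, the `firestore_registry` dict, the function names and the synchronous style); it only illustrates the SHA-256 registry key and the orphan-tolerant deletion described in this section.

```python
import hashlib
from typing import Dict, Set

qdrant_collections: Set[str] = {"labor_law"}  # stand-in for Qdrant
firestore_registry: Dict[str, dict] = {}      # stand-in for the Firestore registry


def registry_key(base_name: str) -> str:
    """SHA-256 of the base name, used as the registry document key."""
    return hashlib.sha256(base_name.encode("utf-8")).hexdigest()


def register_vector_base(base_name: str, owner_user_id: int) -> None:
    firestore_registry[registry_key(base_name)] = {
        "base_name": base_name,
        "owner_user_id": owner_user_id,
    }


def delete_vector_base(base_name: str, user_id: int) -> str:
    """Delete a base, tolerating a collection that is already missing."""
    key = registry_key(base_name)
    entry = firestore_registry.get(key)
    if entry is None or entry["owner_user_id"] != user_id:
        raise PermissionError("base not found for this user")
    if base_name in qdrant_collections:
        qdrant_collections.discard(base_name)
        outcome = "deleted"
    else:
        # Partial crash earlier: only the registry entry is left to clean up.
        outcome = "orphan_cleaned"
    del firestore_registry[key]
    return outcome
```

Ownership is checked before anything is deleted, so the orphan cleanup path cannot be used to remove another user's registry entry.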
05
SECURITY BCRYPT · JWT · RATE LIMIT · AUDIT
bcrypt_sha256
Bypasses the 72-byte limit of native bcrypt. verify_password raises ValueError if the stored hash is legacy plaintext — migration required before deployment.
JWT double verification
Claims: username, user_id, jti (16 random bytes, hex-encoded, unique per token), type (access/refresh). Each request: JWT decode + user_id check against Firestore. Protects against orphaned tokens.
Refresh token rotation
On each refresh: old token revoked, all previous active tokens too (no multi-session). Expired or revoked tokens >30d purged. All in one atomic Firestore batch. SHA-256 hash stored — never the raw token.
Rate limiting by IP
slowapi on 4 critical routes: login (5/min), refresh (10/min), upload (10/h), chat (30/h). Configurable via env vars. Returns HTTP 429.
Strict Pydantic validation
StrongPassword: min 8, regex (?=.*[a-z])(?=.*[A-Z])(?=.*\d). Chat message: max 4000 chars. DELETE filename: pattern ^[\w.\- ]+$, max 255. All input validated before business logic.
Firestore audit trail
Every creation / update / deletion generates a document in user_config_audit with actor_user_id, target_user_id, old_config, new_config, UTC timestamp.
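The validation rules quoted above can be checked in isolation with the standard `re` module. This is a sketch, not the project's Pydantic models: the constant and function names are hypothetical, and only the patterns and limits come from the text.

```python
import re

# Lookaheads from the StrongPassword rule, plus the 8-character minimum.
PASSWORD_RE = re.compile(r"(?=.*[a-z])(?=.*[A-Z])(?=.*\d).{8,}")
# Filename pattern for DELETE requests: word characters, dot, dash, space.
FILENAME_RE = re.compile(r"[\w.\- ]+")
MAX_FILENAME_CHARS = 255
MAX_MESSAGE_CHARS = 4000


def is_strong_password(password: str) -> bool:
    return PASSWORD_RE.fullmatch(password) is not None


def is_valid_filename(name: str) -> bool:
    # fullmatch avoids the trailing-newline loophole of "$" with re.match.
    return len(name) <= MAX_FILENAME_CHARS and FILENAME_RE.fullmatch(name) is not None


def is_valid_message(message: str) -> bool:
    return len(message) <= MAX_MESSAGE_CHARS
```

Since `/` is not in the allowed character class, a path such as `../etc/passwd` is rejected before it reaches any deletion logic.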
SECURITY.PY — JWT + HASH TOKEN
PYTHON · src/modules/security.py
def _create_token(*, username, user_id, token_type, expires_delta):
    now = datetime.now(timezone.utc)
    payload = {
        "username": username, "user_id": int(user_id),
        "type": token_type,
        "jti": secrets.token_hex(16),  # non-replayable
        "iat": int(now.timestamp()),
        "exp": int((now + expires_delta).timestamp()),
    }
    return jwt.encode(payload, _get_secret_key(), algorithm="HS256")

def hash_token(token: str) -> str:
    # SHA-256 of refresh token — never store raw
    return hashlib.sha256(token.encode("utf-8")).hexdigest()

pwd_context = CryptContext(schemes=["bcrypt_sha256", "bcrypt"], deprecated="auto")
In production, DEBUG_MODE=False: /docs and /redoc routes disappear and stack traces are never returned to the client — only logged server-side.
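The refresh token rotation described in this section can be modeled in a few lines. The sketch below makes several assumptions: an in-memory dict replaces Firestore (so the atomic batch and the purge of old tokens are not modeled), opaque random strings replace signed JWTs, and `issue_refresh_token`, `rotate_refresh_token` and `REFRESH_TTL_SECONDS` are hypothetical names.

```python
import hashlib
import secrets
import time
from typing import Dict, Optional

REFRESH_TTL_SECONDS = 30 * 24 * 3600  # assumed lifetime for the sketch

token_store: Dict[str, dict] = {}  # token hash -> record (stand-in for Firestore)


def hash_token(token: str) -> str:
    """SHA-256 of the refresh token; the raw value is never stored."""
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


def issue_refresh_token(username: str, now: Optional[float] = None) -> str:
    now = time.time() if now is None else now
    raw = secrets.token_urlsafe(32)  # opaque stand-in for the signed JWT
    token_store[hash_token(raw)] = {
        "username": username,
        "revoked": False,
        "expires_at": now + REFRESH_TTL_SECONDS,
    }
    return raw


def rotate_refresh_token(raw: str, now: Optional[float] = None) -> str:
    """Validate the presented token, revoke all of the user's tokens, issue a new one."""
    now = time.time() if now is None else now
    record = token_store.get(hash_token(raw))
    if record is None or record["revoked"] or record["expires_at"] <= now:
        raise PermissionError("invalid refresh token")
    # No multi-session: every token of this user is revoked, including this one.
    for other in token_store.values():
        if other["username"] == record["username"]:
            other["revoked"] = True
    return issue_refresh_token(record["username"], now)
```

A stolen refresh token therefore stops working as soon as the legitimate client refreshes, and a leaked copy of the store does not expose usable tokens.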
06
REDIS HISTORY SLIDING WINDOW · TTL 30D
PRINCIPLE

Each message is stored as a JSON element in a Redis list (history:{username}). The sliding window is maintained by an rpush followed by ltrim(-max, -1), issued as two consecutive commands. The TTL is refreshed on each push, so it slides with activity.

Automatic migration from the old format (JSON string) to the new list format via ensure_history_is_list — backward compatibility without downtime.

The last retrieved RAG documents are stored separately (retrieved_docs:{username}) with a 15-minute TTL, exposed via GET /chat/retrieved_documents.

HISTORY_SERVICE.PY
PYTHON · modules/history_service.py
async def push_user_history(
    username: str, new_message: dict, max_length: int | None
) -> None:
    key = f"history:{username}"
    if max_length is None:
        max_length = DEFAULT_MAX_HISTORY_LENGTH  # 5

    await _redis_call(redis_client.rpush, key, json.dumps(new_message))
    await _redis_call(redis_client.ltrim, key, -max_length, -1)

    if HISTORY_TTL_SECONDS:
        await _redis_call(redis_client.expire, key, HISTORY_TTL_SECONDS)
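The effect of the rpush + ltrim pair can be reproduced on a plain Python list. This is a model of the list semantics only, not Redis client code: `ltrim` below is a hypothetical re-implementation of the inclusive, negative-index behavior of the Redis command, and `push_with_window` is an illustrative name.

```python
from typing import List


def ltrim(items: List, start: int, stop: int) -> List:
    """Keep items[start..stop] inclusive; negative indexes count from the end."""
    n = len(items)
    if start < 0:
        start = max(n + start, 0)  # an out-of-range negative start clamps to 0
    if stop < 0:
        stop = n + stop
    if start > stop or start >= n:
        return []
    return items[start : stop + 1]


def push_with_window(history: List, message, max_length: int = 5) -> List:
    """Model of RPUSH followed by LTRIM(-max_length, -1)."""
    appended = history + [message]             # RPUSH: append at the tail
    return ltrim(appended, -max_length, -1)    # LTRIM: keep the newest max_length


history: List[int] = []
for i in range(7):
    history = push_with_window(history, i)
```

With the default window of 5, the two oldest of seven messages are dropped and the rest keep their order; a list shorter than the window is left untouched.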
07
MULTI-PROVIDER MODEL-AGNOSTIC
Two distinct axes, often confused. The embedding model fixes the vector dimension and therefore the collection layout: it is locked per base, because vectors produced by different models cannot be compared. Generation is swappable per request. A serious multi-provider design separates these two axes rather than controlling both with a single model field.
TARGET PROVIDERS
OpenAI
gpt-4o-2024-08-06
embeddings ada-002
1536 dims
vLLM
Qwen3 / DeepSeek
2×H100 self-hosted
OpenAI-compatible API
Vertex AI
Gemini Pro / Flash
ChatVertexAI
GCP native
Ollama
ChatOllama
localhost / network
zero cloud
Azure OpenAI
AzureChatOpenAI
enterprise compliance
EU region available
CURRENT REGISTRY — EMBEDDINGS (qdrant_service.py)
PYTHON · modules/qdrant_service.py
# Extensible registry — 1 line per provider
EMBEDDINGS_REGISTRY: dict[str, Any] = {
    "openai": OpenAIEmbeddings(openai_api_key=GPT_API_KEY),
    # "vertex": VertexAIEmbeddings(model="text-embedding-004"),
    # "ollama": OllamaEmbeddings(model="nomic-embed-text"),
}
MODEL_DIMENSIONS: dict[ModelName, int] = {
    "openai": 1536,
    # "vertex": 768,
}
LLM FACTORY — TARGET (modules/rag.py)
PYTHON · modules/rag.py · TARGET
def get_llm(provider: str, model: str) -> BaseChatModel:
    return {
        "openai": lambda: ChatOpenAI(model=model, streaming=True),
        "vertex": lambda: ChatVertexAI(model=model),
        "ollama": lambda: ChatOllama(model=model),
        "vllm":   lambda: ChatOpenAI(model=model, base_url=VLLM_URL),
        "azure":  lambda: AzureChatOpenAI(deployment_name=model),
    }[provider]()
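One way to keep the two axes apart is to store the embedding provider on the base and take the generation provider from the request. The sketch below is a possible shape, not the project's design: `VectorBase`, `ChatRequest` and `resolve_providers` are hypothetical, and the 768 dimension for "vertex" is taken from the commented-out registry entry.

```python
from dataclasses import dataclass
from typing import Tuple

EMBEDDING_DIMENSIONS = {"openai": 1536, "vertex": 768}


@dataclass(frozen=True)
class VectorBase:
    """Embedding axis: fixed when the collection is created."""
    name: str
    embedding_provider: str

    @property
    def dimension(self) -> int:
        return EMBEDDING_DIMENSIONS[self.embedding_provider]


@dataclass(frozen=True)
class ChatRequest:
    """Generation axis: may change on every request."""
    question: str
    llm_provider: str = "openai"
    llm_model: str = "gpt-4o-2024-08-06"


def resolve_providers(base: VectorBase, request: ChatRequest) -> Tuple[str, str, str]:
    """Embedding always comes from the base; generation from the request."""
    return (base.embedding_provider, request.llm_provider, request.llm_model)


base = VectorBase(name="labor_law", embedding_provider="openai")
request = ChatRequest(question="Notice period?", llm_provider="vllm", llm_model="Qwen3")
```

Because the request has no embedding field at all, a caller cannot accidentally query a 1536-dimension collection with vectors from another model.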
08
DEPLOYMENT DOCKER COMPOSE · 4 SERVICES
SERVICES
API
chatbot_fastapi
python:3.13-slim · port 8000 · volume pkgs (pip cache) · uvicorn prod
VECTOR DB
qdrant
qdrant/qdrant:latest · ports 6333/6334 · volume ./qdrant_data persisted
CACHE
redis
redis:latest · port 6379 · password + SSL recommended in prod
FIRESTORE
firestore-emulator
google-cloud-cli:emulators · port 8080 · optional UI via dev-ui profile
ENVIRONMENT VARIABLES
VAR                     REQUIRED    DEFAULT
SECRET_KEY_API          yes         - (min 32 chars)
GPT_API_KEY             yes         -
DEBUG_MODE              no          False
RATE_LIMIT_CHAT         no          30/hour
HISTORY_TTL_DAYS        no          30
MAX_UPLOAD_MB           no          20
CORS_ALLOWED_ORIGINS                localhost
REDIS_SSL               prod: yes   false
CHUNK_SIZE              no          1000
RAG_TOP_K               no          5
09
TESTS ~192 UNIT TESTS
SECURITY
test_security.py
bcrypt_sha256 hashing, JWT generation/validation, token type verification, edge cases (invalid payload, user_id ≤ 0).
HISTORY
test_history_service.py
Push, get, reset, trim, list_keys, retrieved_docs. Legacy JSON string → Redis list migration. TTL refresh.
CHAT
test_chat_service.py
Context validation: missing config, empty base, ownership, absent Qdrant collection.
INDEXING
test_process_pdf.py
Full pipeline: loader, splitter, chunk dedup, Qdrant insertion. Empty case, SHA-256 duplicate case.
QDRANT
test_qdrant_service.py
List, create, delete collections. Nested vs direct payload filters. Priority update, reset.
INTEGRATION
test_integration_api.py
End-to-end login, strong password validation, filename validation, auth 401/403.
10
API REFERENCE JWT BEARER · RATE LIMITED
AUTHENTICATION
POST    /login      Credentials → access + refresh token pair       PUBLIC · 5/min
POST    /refresh    Token rotation: revokes old, issues new pair    PUBLIC · 10/min
POST    /logout     Revokes current refresh_token in Firestore      AUTH
CHAT
POST    /chat/stream                RAG streaming token-by-token, StreamingResponse text/plain    AUTH · 30/h
GET     /chat/retrieved_documents   RAG documents from last call (TTL 15 min)                     AUTH
USERS
GET     /get_user_config           Connected user configuration                          AUTH
GET     /list_users                List active usernames                                 RH
POST    /create_new_user           Create user + bcrypt_sha256 hash + Firestore audit    RH
PUT     /users/{username}/config   Modify target user config, traced in audit            RH
DELETE  /users/{username}          Delete user + tokens + Redis history, atomic batch    RH
DOCUMENTS
GET     /document_list                Indexed PDFs with priority and document_id                   RH
POST    /upload_document              Upload PDF → cleaning → chunking → deduplication → Qdrant    RH · 10/h
DELETE  /delete_document              Delete all chunks of a document by name                      RH
PUT     /set_new_documents_priority   Reorder priorities, impacts RAG sorting                      RH
PUT     /reset_document_priority      Reset all priorities to null                                 RH
VECTORSTORE
GET     /get_vectorbase_list      Qdrant collections owned by the user                           RH
POST    /create_new_vector_base   Create Qdrant collection + Firestore ownership registration    RH
DELETE  /delete_vector_base       Delete Qdrant + Firestore registry, handles orphans            RH
GET     /show_history             User conversation history                                      AUTH
DELETE  /clean_history            Reset Redis history                                            AUTH
PUT     /modify_history_length    Modify maximum history window                                  RH
11
TECH STACK FASTAPI
PACKAGE                    ROLE
fastapi                    Async API framework: routing, Pydantic validation, streaming
langchain                  RAG orchestration, LCEL chain, multi-turn prompt templates
langchain-openai           ChatOpenAI, OpenAIEmbeddings
langchain-qdrant           QdrantVectorStore, similarity_search_with_score
qdrant-client              scroll, set_payload, delete, count, create_collection
redis                      Async client: list ops (rpush, ltrim, lrange), scan, TTL
google-cloud-firestore     User store, refresh tokens, audit trail, ownership registry
PyMuPDF                    PDF text extraction page by page (PyMuPDFLoader)
passlib[bcrypt]            bcrypt_sha256: password hashing without the 72-byte limit
PyJWT                      JWT HS256 creation and decoding with typed claims
slowapi                    IP-based rate limiting, built on the limits library
uvicorn                    ASGI server: production without --reload
aiofiles                   Async file reading: SHA-256 PDF hash
langchain-text-splitters   RecursiveCharacterTextSplitter with legal separators