"""MCP server exposing simple RAG "notebooks" backed by Qdrant and a TEI embedding service.

Each notebook is one Qdrant collection. Text (or text extracted from a PDF) is
split into overlapping character windows, embedded through the TEI ``/embed``
endpoint, and stored with its source name so it can be retrieved by similarity.
"""

import io
import logging
import os
import uuid
from typing import Optional

import httpx
from mcp.server.fastmcp import FastMCP
from pypdf import PdfReader
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, PointStruct, VectorParams

# Configuration
QDRANT_URL = os.getenv("QDRANT_URL", "http://qdrant.openshift-gitops.svc:6333")
TEI_URL = os.getenv("TEI_URL", "http://text-embeddings.tei.svc.cluster.local:8080")
EMBEDDING_DIM = 768  # BAAI/bge-base-en-v1.5

# Initialize
# NOTE(review): FastMCP is understood to surface each tool's docstring as the
# tool description shown to clients -- keep the tool docstrings client-readable.
mcp = FastMCP("knowledge-mcp")
qdrant = QdrantClient(url=QDRANT_URL)


def get_embedding(text: str) -> list[float]:
    """Get embedding from TEI.

    Posts ``text`` to ``{TEI_URL}/embed`` and returns the first element of the
    JSON response as the embedding vector.

    Raises:
        Exception: any HTTP, transport or decoding error is logged and re-raised
            unchanged so the caller decides how to recover.
    """
    url = f"{TEI_URL}/embed"
    try:
        response = httpx.post(url, json={"inputs": text}, timeout=10.0)
        response.raise_for_status()
        return response.json()[0]
    except Exception as e:
        logging.error("Embedding failed: %s", e)
        raise


def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split ``text`` into character windows using a sliding window.

    Consecutive windows start ``chunk_size - overlap`` characters apart, so each
    window repeats the last ``overlap`` characters of the previous one.

    Args:
        text: The text to split.
        chunk_size: Maximum number of characters per chunk.
        overlap: Number of characters shared between neighbouring chunks.

    Returns:
        ``[text]`` when the text fits in a single chunk, otherwise the list of
        windows in order. The last window always ends at the end of ``text``.

    Raises:
        ValueError: if the text needs more than one chunk and ``overlap`` is not
            smaller than ``chunk_size`` (the window could never advance).
    """
    if len(text) <= chunk_size:
        return [text]

    step = chunk_size - overlap
    if step <= 0:
        # A non-positive step would make the window stand still or move
        # backwards, i.e. loop forever.
        raise ValueError("overlap must be smaller than chunk_size")

    chunks = []
    for start in range(0, len(text), step):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= len(text):
            # This window already reaches the end of the text; a further
            # window would only repeat characters that are stored already.
            break
    return chunks


def _normalize_name(name: str) -> str:
    """Return the collection-name form of ``name``: lower-case, spaces as dashes."""
    return name.lower().replace(" ", "-")


def _resolve_notebook(notebook: str) -> Optional[str]:
    """Return the existing collection name matching ``notebook``, or ``None``.

    The name is tried exactly as given first, then in the normalized form that
    ``create_notebook`` stores, so a notebook created as "My Notes" can be
    addressed by that same name afterwards.
    """
    if qdrant.collection_exists(notebook):
        return notebook
    normalized = _normalize_name(notebook)
    if normalized != notebook and qdrant.collection_exists(normalized):
        return normalized
    return None


def _read_pdf_text(path: str) -> str:
    """Return the extracted text of every page of the PDF at ``path``, one page per line group."""
    reader = PdfReader(path)
    # `or ""` guards against a page yielding no text object at all.
    return "".join((page.extract_text() or "") + "\n" for page in reader.pages)


@mcp.tool()
def create_notebook(name: str) -> str:
    """Create a new RAG notebook (Qdrant collection).

    The name is lower-cased and spaces are replaced by dashes before use.
    Returns a message saying whether the notebook was created or already existed.
    """
    clean_name = _normalize_name(name)

    # Check if exists
    if qdrant.collection_exists(clean_name):
        return f"Notebook '{clean_name}' already exists."

    qdrant.create_collection(
        collection_name=clean_name,
        vectors_config=VectorParams(size=EMBEDDING_DIM, distance=Distance.COSINE),
    )
    return f"Notebook '{clean_name}' created successfully."


@mcp.tool()
def add_source(notebook: str, content: str, source_name: str, format: str = "text") -> str:
    """
    Add content to a notebook.
    format: 'text' or 'pdf_path' (local path inside container)

    With format 'text', `content` is the text itself; with 'pdf_path', `content`
    is the path of a PDF file whose extracted text is added. The text is split
    into overlapping chunks, each chunk is embedded and stored together with
    `source_name`. Returns a human-readable status message.
    """
    collection = _resolve_notebook(notebook)
    if collection is None:
        return f"Error: Notebook '{notebook}' does not exist."

    if format == "pdf_path":
        # NOTE(review): this opens whatever path the caller supplies; restrict it
        # to an allow-listed directory if tool callers are not fully trusted.
        try:
            text_to_process = _read_pdf_text(content)
        except Exception as e:
            return f"Error reading PDF: {e}"
    else:
        text_to_process = content

    # Nothing meaningful to embed: skip the round-trip to the embedding service.
    if not text_to_process.strip():
        return "No content added (empty or failed)."

    chunks = chunk_text(text_to_process)
    points = []

    for i, chunk in enumerate(chunks):
        # Best effort: one failing chunk must not discard the whole source.
        try:
            vector = get_embedding(chunk)
        except Exception as e:
            logging.error("Failed to embed chunk %d: %s", i, e)
            continue
        points.append(
            PointStruct(
                id=str(uuid.uuid4()),
                vector=vector,
                payload={
                    "source": source_name,
                    "text": chunk,
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                },
            )
        )

    if not points:
        return "No content added (empty or failed)."

    qdrant.upsert(collection_name=collection, points=points)

    message = f"Added {len(points)} chunks from '{source_name}' to '{collection}'."
    skipped = len(chunks) - len(points)
    if skipped:
        # Make partial ingestion visible to the caller, not only in the logs.
        message += f" Warning: {skipped} chunk(s) could not be embedded and were skipped."
    return message


@mcp.tool()
def query_notebook(notebook: str, query: str, limit: int = 5) -> str:
    """Query the notebook for relevant context.

    Embeds `query`, retrieves up to `limit` nearest chunks and returns them one
    per line as "[score] source: text...". Returns an error message string when
    the notebook does not exist or the lookup fails.
    """
    collection = _resolve_notebook(notebook)
    if collection is None:
        return f"Error: Notebook '{notebook}' does not exist."

    try:
        vector = get_embedding(query)
        # NOTE(review): recent qdrant-client releases deprecate `search` in favour
        # of `query_points` -- confirm against the pinned client version.
        hits = qdrant.search(
            collection_name=collection,
            query_vector=vector,
            limit=limit,
        )

        results = []
        for hit in hits:
            # A point stored without payload must not fail the whole query.
            payload = hit.payload or {}
            text = payload.get("text", "").replace("\n", " ")
            source = payload.get("source", "unknown")
            results.append(f"[{hit.score:.2f}] {source}: {text}...")

        return "\n".join(results)
    except Exception as e:
        return f"Query failed: {e}"


if __name__ == "__main__":
    mcp.run()