diff --git a/manifests/deployment.yaml b/manifests/deployment.yaml
index 2822646..807029f 100644
--- a/manifests/deployment.yaml
+++ b/manifests/deployment.yaml
@@ -23,8 +23,8 @@ spec:
         - containerPort: 8000
           name: http
         env:
-        - name: QDRANT_URL
-          value: "http://qdrant.knowledge-mcp.svc:6333"
+        - name: DATABASE_URL
+          value: "postgresql://postgres:password@postgres.knowledge-mcp.svc:5432/knowledge"
         - name: TEI_URL
           value: "http://text-embeddings.tei.svc.cluster.local:8080"
         resources:
diff --git a/manifests/postgres.yaml b/manifests/postgres.yaml
new file mode 100644
index 0000000..03ecbfe
--- /dev/null
+++ b/manifests/postgres.yaml
@@ -0,0 +1,66 @@
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: postgres
+  namespace: knowledge-mcp
+  labels:
+    app: postgres
+spec:
+  serviceName: postgres
+  replicas: 1
+  selector:
+    matchLabels:
+      app: postgres
+  template:
+    metadata:
+      labels:
+        app: postgres
+    spec:
+      containers:
+      - name: postgres
+        image: ankane/pgvector:v0.5.1
+        ports:
+        - containerPort: 5432
+          name: postgres
+        env:
+        - name: POSTGRES_USER
+          value: "postgres"
+        - name: POSTGRES_PASSWORD
+          value: "password"
+        - name: POSTGRES_DB
+          value: "knowledge"
+        - name: PGDATA
+          value: "/var/lib/postgresql/data/pgdata"
+        volumeMounts:
+        - name: storage
+          mountPath: /var/lib/postgresql/data
+        resources:
+          requests:
+            cpu: "100m"
+            memory: "256Mi"
+          limits:
+            cpu: "500m"
+            memory: "512Mi"
+  volumeClaimTemplates:
+  - metadata:
+      name: storage
+    spec:
+      accessModes: [ "ReadWriteOnce" ]
+      resources:
+        requests:
+          storage: 5Gi
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: postgres
+  namespace: knowledge-mcp
+  labels:
+    app: postgres
+spec:
+  ports:
+  - port: 5432
+    targetPort: 5432
+    name: postgres
+  selector:
+    app: postgres
diff --git a/manifests/qdrant.yaml b/manifests/qdrant.yaml
deleted file mode 100644
index 44a2d47..0000000
--- a/manifests/qdrant.yaml
+++ /dev/null
@@ -1,65 +0,0 @@
-apiVersion: apps/v1
-kind: StatefulSet
-metadata:
-  name: qdrant
-  namespace: knowledge-mcp
-  labels:
-    app: qdrant
-spec:
-  serviceName: qdrant
-  replicas: 1
-  selector:
-    matchLabels:
-      app: qdrant
-  template:
-    metadata:
-      labels:
-        app: qdrant
-    spec:
-      containers:
-      - name: qdrant
-        image: qdrant/qdrant:v1.13.0
-        ports:
-        - containerPort: 6333
-          name: http
-        - containerPort: 6334
-          name: grpc
-        env:
-        - name: QDRANT__STORAGE__STORAGE_PATH
-          value: /qdrant/storage
-        volumeMounts:
-        - name: storage
-          mountPath: /qdrant/storage
-        resources:
-          requests:
-            cpu: "200m"
-            memory: "512Mi"
-          limits:
-            cpu: "1000m"
-            memory: "1Gi"
-  volumeClaimTemplates:
-  - metadata:
-      name: storage
-    spec:
-      accessModes: [ "ReadWriteOnce" ]
-      resources:
-        requests:
-          storage: 10Gi
----
-apiVersion: v1
-kind: Service
-metadata:
-  name: qdrant
-  namespace: knowledge-mcp
-  labels:
-    app: qdrant
-spec:
-  ports:
-  - port: 6333
-    targetPort: 6333
-    name: http
-  - port: 6334
-    targetPort: 6334
-    name: grpc
-  selector:
-    app: qdrant
diff --git a/requirements.txt b/requirements.txt
index 061e6f1..4dc0c4d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,7 @@
 mcp
 httpx
-qdrant-client
+psycopg[binary]
+pgvector
 beautifulsoup4
 pypdf
 python-dotenv
diff --git a/server.py b/server.py
index c33a446..58f36de 100644
--- a/server.py
+++ b/server.py
@@ -1,21 +1,84 @@
 import os
 import httpx
 from mcp.server.fastmcp import FastMCP
-from qdrant_client import QdrantClient
-from qdrant_client.models import Distance, VectorParams, PointStruct
+import psycopg
+from pgvector.psycopg import register_vector
 import uuid
 import logging
-import io
+import json
 from pypdf import PdfReader
+from contextlib import contextmanager
 
 # Configuration
-QDRANT_URL = os.getenv("QDRANT_URL", "http://qdrant.openshift-gitops.svc:6333")
+DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:password@postgres.knowledge-mcp.svc:5432/knowledge")
 TEI_URL = os.getenv("TEI_URL", "http://text-embeddings.tei.svc.cluster.local:8080")
 EMBEDDING_DIM = 768 # BAAI/bge-base-en-v1.5
 
 # Initialize
 mcp = FastMCP("knowledge-mcp")
-qdrant = QdrantClient(url=QDRANT_URL)
+
+@contextmanager
+def get_db(register: bool = True):
+    """Provide a database connection.
+
+    Opens an autocommit connection to DATABASE_URL and always closes it on
+    exit. With register=True the pgvector type adapters are registered, which
+    raises if the "vector" extension does not exist in the database yet, so
+    schema bootstrap must pass register=False.
+    """
+    conn = psycopg.connect(DATABASE_URL, autocommit=True)
+    try:
+        if register:
+            # Register vector type handler
+            register_vector(conn)
+        yield conn
+    finally:
+        conn.close()
+
+def init_db():
+    """Initialize database schema."""
+    try:
+        # register=False: the vector type does not exist until the
+        # CREATE EXTENSION below has run, and no vectors are sent here.
+        with get_db(register=False) as conn:
+            conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
+
+            # Notebooks table (simple registry)
+            conn.execute("""
+                CREATE TABLE IF NOT EXISTS notebooks (
+                    name TEXT PRIMARY KEY,
+                    created_at TIMESTAMP DEFAULT NOW()
+                )
+            """)
+
+            # Chunks table
+            conn.execute(f"""
+                CREATE TABLE IF NOT EXISTS chunks (
+                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+                    notebook TEXT REFERENCES notebooks(name) ON DELETE CASCADE,
+                    content TEXT NOT NULL,
+                    embedding VECTOR({EMBEDDING_DIM}),
+                    source TEXT,
+                    metadata JSONB,
+                    created_at TIMESTAMP DEFAULT NOW()
+                )
+            """)
+
+            # Index for fast search
+            conn.execute("""
+                CREATE INDEX IF NOT EXISTS chunks_embedding_idx ON chunks
+                USING hnsw (embedding vector_cosine_ops)
+            """)
+            logging.info("Database initialized successfully.")
+    except Exception as e:
+        logging.error(f"Database initialization failed: {e}")
+
+# Run init on import (or startup)
+# In a real app, this might be a separate migration step, but for MCP self-contained:
+try:
+    init_db()
+except Exception as e:
+    logging.warning(f"Could not initialize DB immediately (might be waiting for connection): {e}")
 
 def get_embedding(text: str) -> list[float]:
     """Get embedding from TEI."""
@@ -43,18 +106,20 @@ def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]
 
 @mcp.tool()
 def create_notebook(name: str) -> str:
-    """Create a new RAG notebook (Qdrant collection)."""
+    """Create a new RAG notebook."""
     clean_name = name.lower().replace(" ", "-")
 
-    # Check if exists
-    if qdrant.collection_exists(clean_name):
-        return f"Notebook '{clean_name}' already exists."
-
-    qdrant.create_collection(
-        collection_name=clean_name,
-        vectors_config=VectorParams(size=EMBEDDING_DIM, distance=Distance.COSINE),
-    )
-    return f"Notebook '{clean_name}' created successfully."
+    try:
+        with get_db() as conn:
+            # Check existence
+            res = conn.execute("SELECT 1 FROM notebooks WHERE name = %s", (clean_name,)).fetchone()
+            if res:
+                return f"Notebook '{clean_name}' already exists."
+
+            conn.execute("INSERT INTO notebooks (name) VALUES (%s)", (clean_name,))
+            return f"Notebook '{clean_name}' created successfully."
+    except Exception as e:
+        return f"Error creating notebook: {e}"
 
 @mcp.tool()
 def add_source(notebook: str, content: str, source_name: str, format: str = "text") -> str:
@@ -62,8 +127,14 @@ def add_source(notebook: str, content: str, source_name: str, format: str = "tex
     Add content to a notebook.
     format: 'text' or 'pdf_path' (local path inside container)
     """
-    if not qdrant.collection_exists(notebook):
-        return f"Error: Notebook '{notebook}' does not exist."
+    clean_name = notebook.lower().replace(" ", "-")
+
+    try:
+        with get_db() as conn:
+            if not conn.execute("SELECT 1 FROM notebooks WHERE name = %s", (clean_name,)).fetchone():
+                return f"Error: Notebook '{clean_name}' does not exist."
+    except Exception as e:
+        return f"Database error: {e}"
 
     text_to_process = ""
 
@@ -78,52 +149,63 @@ def add_source(notebook: str, content: str, source_name: str, format: str = "tex
         text_to_process = content
 
     chunks = chunk_text(text_to_process)
-    points = []
+    count = 0
 
-    for i, chunk in enumerate(chunks):
-        try:
-            vector = get_embedding(chunk)
-            points.append(PointStruct(
-                id=str(uuid.uuid4()),
-                vector=vector,
-                payload={
-                    "source": source_name,
-                    "text": chunk,
-                    "chunk_index": i,
-                    "total_chunks": len(chunks)
-                }
-            ))
-        except Exception as e:
-            logging.error(f"Failed to embed chunk {i}: {e}")
-            continue
-
-    if points:
-        qdrant.upsert(collection_name=notebook, points=points)
-        return f"Added {len(points)} chunks from '{source_name}' to '{notebook}'."
-    return "No content added (empty or failed)."
+    try:
+        with get_db() as conn:
+            for i, chunk in enumerate(chunks):
+                try:
+                    vector = get_embedding(chunk)
+                    meta = {
+                        "chunk_index": i,
+                        "total_chunks": len(chunks)
+                    }
+
+                    conn.execute("""
+                        INSERT INTO chunks (notebook, content, embedding, source, metadata)
+                        VALUES (%s, %s, %s, %s, %s)
+                    """, (clean_name, chunk, vector, source_name, json.dumps(meta)))
+                    count += 1
+                except Exception as e:
+                    logging.error(f"Failed to process chunk {i}: {e}")
+                    continue
+            return f"Added {count} chunks from '{source_name}' to '{clean_name}'."
+
+    except Exception as e:
+        return f"Failed to add source: {e}"
 
 @mcp.tool()
 def query_notebook(notebook: str, query: str, limit: int = 5) -> str:
     """Query the notebook for relevant context."""
-    if not qdrant.collection_exists(notebook):
-        return f"Error: Notebook '{notebook}' does not exist."
-
+    clean_name = notebook.lower().replace(" ", "-")
+
     try:
         vector = get_embedding(query)
-        hits = qdrant.search(
-            collection_name=notebook,
-            query_vector=vector,
-            limit=limit
-        )
+
+        with get_db() as conn:
+            # Check notebook
+            if not conn.execute("SELECT 1 FROM notebooks WHERE name = %s", (clean_name,)).fetchone():
+                return f"Error: Notebook '{clean_name}' does not exist."
 
-        results = []
-        for hit in hits:
-            score = hit.score
-            text = hit.payload.get('text', '').replace('\n', ' ')
-            source = hit.payload.get('source', 'unknown')
-            results.append(f"[{score:.2f}] {source}: {text}...")
+            # Cosine distance (<=>) sort ASC (closest first)
+            results = conn.execute("""
+                SELECT content, source, (1 - (embedding <=> %s::vector)) as score
+                FROM chunks
+                WHERE notebook = %s
+                ORDER BY embedding <=> %s::vector ASC
+                LIMIT %s
+            """, (vector, clean_name, vector, limit)).fetchall()
+
+            output = []
+            for row in results:
+                content, source, score = row
+                output.append(f"[{score:.2f}] {source}: {content.replace(chr(10), ' ')}...")
+
+            if not output:
+                return "No relevant matches found."
+
+            return "\n".join(output)
 
-        return "\n".join(results)
     except Exception as e:
         return f"Query failed: {e}"
 