refactor: migrate from qdrant to postgres+pgvector

This commit is contained in:
Clawdbot
2026-02-12 09:46:05 +11:00
parent f906453039
commit aab18d3a4f
5 changed files with 197 additions and 122 deletions

View File

@@ -23,8 +23,8 @@ spec:
- containerPort: 8000
name: http
env:
- name: QDRANT_URL
value: "http://qdrant.knowledge-mcp.svc:6333"
- name: DATABASE_URL
value: "postgresql://postgres:password@postgres.knowledge-mcp.svc:5432/knowledge"
- name: TEI_URL
value: "http://text-embeddings.tei.svc.cluster.local:8080"
resources:

66
manifests/postgres.yaml Normal file
View File

@@ -0,0 +1,66 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: postgres
namespace: knowledge-mcp
labels:
app: postgres
spec:
serviceName: postgres
replicas: 1
selector:
matchLabels:
app: postgres
template:
metadata:
labels:
app: postgres
spec:
containers:
- name: postgres
image: ankane/pgvector:v0.5.1
ports:
- containerPort: 5432
name: postgres
env:
- name: POSTGRES_USER
value: "postgres"
- name: POSTGRES_PASSWORD
value: "password"
- name: POSTGRES_DB
value: "knowledge"
- name: PGDATA
value: "/var/lib/postgresql/data/pgdata"
volumeMounts:
- name: storage
mountPath: /var/lib/postgresql/data
resources:
requests:
cpu: "100m"
memory: "256Mi"
limits:
cpu: "500m"
memory: "512Mi"
volumeClaimTemplates:
- metadata:
name: storage
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 5Gi
---
apiVersion: v1
kind: Service
metadata:
name: postgres
namespace: knowledge-mcp
labels:
app: postgres
spec:
ports:
- port: 5432
targetPort: 5432
name: postgres
selector:
app: postgres

View File

@@ -1,65 +0,0 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: qdrant
namespace: knowledge-mcp
labels:
app: qdrant
spec:
serviceName: qdrant
replicas: 1
selector:
matchLabels:
app: qdrant
template:
metadata:
labels:
app: qdrant
spec:
containers:
- name: qdrant
image: qdrant/qdrant:v1.13.0
ports:
- containerPort: 6333
name: http
- containerPort: 6334
name: grpc
env:
- name: QDRANT__STORAGE__STORAGE_PATH
value: /qdrant/storage
volumeMounts:
- name: storage
mountPath: /qdrant/storage
resources:
requests:
cpu: "200m"
memory: "512Mi"
limits:
cpu: "1000m"
memory: "1Gi"
volumeClaimTemplates:
- metadata:
name: storage
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 10Gi
---
apiVersion: v1
kind: Service
metadata:
name: qdrant
namespace: knowledge-mcp
labels:
app: qdrant
spec:
ports:
- port: 6333
targetPort: 6333
name: http
- port: 6334
targetPort: 6334
name: grpc
selector:
app: qdrant

View File

@@ -1,6 +1,7 @@
mcp
httpx
qdrant-client
psycopg[binary]
pgvector
beautifulsoup4
pypdf
python-dotenv

181
server.py
View File

@@ -1,21 +1,75 @@
import os
import httpx
from mcp.server.fastmcp import FastMCP
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import psycopg
from pgvector.psycopg import register_vector
import uuid
import logging
import io
import json
from pypdf import PdfReader
from contextlib import contextmanager
# Configuration
QDRANT_URL = os.getenv("QDRANT_URL", "http://qdrant.openshift-gitops.svc:6333")
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:password@postgres.knowledge-mcp.svc:5432/knowledge")
TEI_URL = os.getenv("TEI_URL", "http://text-embeddings.tei.svc.cluster.local:8080")
EMBEDDING_DIM = 768 # BAAI/bge-base-en-v1.5
# Initialize
mcp = FastMCP("knowledge-mcp")
qdrant = QdrantClient(url=QDRANT_URL)
@contextmanager
def get_db():
"""Provide a database connection."""
conn = psycopg.connect(DATABASE_URL, autocommit=True)
# Register vector type handler
register_vector(conn)
try:
yield conn
finally:
conn.close()
def init_db():
"""Initialize database schema."""
try:
with get_db() as conn:
conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
# Notebooks table (simple registry)
conn.execute("""
CREATE TABLE IF NOT EXISTS notebooks (
name TEXT PRIMARY KEY,
created_at TIMESTAMP DEFAULT NOW()
)
""")
# Chunks table
conn.execute(f"""
CREATE TABLE IF NOT EXISTS chunks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
notebook TEXT REFERENCES notebooks(name) ON DELETE CASCADE,
content TEXT NOT NULL,
embedding VECTOR({EMBEDDING_DIM}),
source TEXT,
metadata JSONB,
created_at TIMESTAMP DEFAULT NOW()
)
""")
# Index for fast search
conn.execute("""
CREATE INDEX IF NOT EXISTS chunks_embedding_idx ON chunks
USING hnsw (embedding vector_cosine_ops)
""")
logging.info("Database initialized successfully.")
except Exception as e:
logging.error(f"Database initialization failed: {e}")
# Run init on import (or startup)
# In a real app, this might be a separate migration step, but for MCP self-contained:
try:
init_db()
except Exception as e:
logging.warning(f"Could not initialize DB immediately (might be waiting for connection): {e}")
def get_embedding(text: str) -> list[float]:
"""Get embedding from TEI."""
@@ -43,18 +97,20 @@ def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]
@mcp.tool()
def create_notebook(name: str) -> str:
"""Create a new RAG notebook (Qdrant collection)."""
"""Create a new RAG notebook."""
clean_name = name.lower().replace(" ", "-")
# Check if exists
if qdrant.collection_exists(clean_name):
return f"Notebook '{clean_name}' already exists."
qdrant.create_collection(
collection_name=clean_name,
vectors_config=VectorParams(size=EMBEDDING_DIM, distance=Distance.COSINE),
)
return f"Notebook '{clean_name}' created successfully."
try:
with get_db() as conn:
# Check existence
res = conn.execute("SELECT 1 FROM notebooks WHERE name = %s", (clean_name,)).fetchone()
if res:
return f"Notebook '{clean_name}' already exists."
conn.execute("INSERT INTO notebooks (name) VALUES (%s)", (clean_name,))
return f"Notebook '{clean_name}' created successfully."
except Exception as e:
return f"Error creating notebook: {e}"
@mcp.tool()
def add_source(notebook: str, content: str, source_name: str, format: str = "text") -> str:
@@ -62,8 +118,14 @@ def add_source(notebook: str, content: str, source_name: str, format: str = "tex
Add content to a notebook.
format: 'text' or 'pdf_path' (local path inside container)
"""
if not qdrant.collection_exists(notebook):
return f"Error: Notebook '{notebook}' does not exist."
clean_name = notebook.lower().replace(" ", "-")
try:
with get_db() as conn:
if not conn.execute("SELECT 1 FROM notebooks WHERE name = %s", (clean_name,)).fetchone():
return f"Error: Notebook '{clean_name}' does not exist."
except Exception as e:
return f"Database error: {e}"
text_to_process = ""
@@ -78,52 +140,63 @@ def add_source(notebook: str, content: str, source_name: str, format: str = "tex
text_to_process = content
chunks = chunk_text(text_to_process)
points = []
count = 0
for i, chunk in enumerate(chunks):
try:
vector = get_embedding(chunk)
points.append(PointStruct(
id=str(uuid.uuid4()),
vector=vector,
payload={
"source": source_name,
"text": chunk,
"chunk_index": i,
"total_chunks": len(chunks)
}
))
except Exception as e:
logging.error(f"Failed to embed chunk {i}: {e}")
continue
if points:
qdrant.upsert(collection_name=notebook, points=points)
return f"Added {len(points)} chunks from '{source_name}' to '{notebook}'."
return "No content added (empty or failed)."
try:
with get_db() as conn:
for i, chunk in enumerate(chunks):
try:
vector = get_embedding(chunk)
meta = {
"chunk_index": i,
"total_chunks": len(chunks)
}
conn.execute("""
INSERT INTO chunks (notebook, content, embedding, source, metadata)
VALUES (%s, %s, %s, %s, %s)
""", (clean_name, chunk, vector, source_name, json.dumps(meta)))
count += 1
except Exception as e:
logging.error(f"Failed to process chunk {i}: {e}")
continue
return f"Added {count} chunks from '{source_name}' to '{clean_name}'."
except Exception as e:
return f"Failed to add source: {e}"
@mcp.tool()
def query_notebook(notebook: str, query: str, limit: int = 5) -> str:
"""Query the notebook for relevant context."""
if not qdrant.collection_exists(notebook):
return f"Error: Notebook '{notebook}' does not exist."
clean_name = notebook.lower().replace(" ", "-")
try:
vector = get_embedding(query)
hits = qdrant.search(
collection_name=notebook,
query_vector=vector,
limit=limit
)
with get_db() as conn:
# Check notebook
if not conn.execute("SELECT 1 FROM notebooks WHERE name = %s", (clean_name,)).fetchone():
return f"Error: Notebook '{clean_name}' does not exist."
results = []
for hit in hits:
score = hit.score
text = hit.payload.get('text', '').replace('\n', ' ')
source = hit.payload.get('source', 'unknown')
results.append(f"[{score:.2f}] {source}: {text}...")
# Cosine distance (<=>) sort ASC (closest first)
results = conn.execute("""
SELECT content, source, (1 - (embedding <=> %s::vector)) as score
FROM chunks
WHERE notebook = %s
ORDER BY embedding <=> %s::vector ASC
LIMIT %s
""", (vector, clean_name, vector, limit)).fetchall()
output = []
for row in results:
content, source, score = row
output.append(f"[{score:.2f}] {source}: {content.replace(chr(10), ' ')}...")
if not output:
return "No relevant matches found."
return "\n".join(output)
return "\n".join(results)
except Exception as e:
return f"Query failed: {e}"