Custom Store Adapter
Build your own adapter for different schemas, databases, or query patterns.
The shipped adapters cover the common case of Postgres with pgvector, but your requirements might be different. Maybe you need additional filtering capabilities, a different schema structure, support for a different vector database, or custom query logic. Building a custom store adapter is straightforward because the interface is intentionally minimal.
The VectorStore interface
A store adapter is an object with two methods:
import type { VectorStore, Chunk } from "@unrag/core/types";
export const myStore: VectorStore = {
upsert: async (chunks: Chunk[]) => {
// Write documents, chunks, and embeddings to your database
},
query: async ({ embedding, topK, scope }) => {
// Find and return the most similar chunks
return []; // Array<Chunk & { score: number }>
},
};
That's the entire contract. UnRAG calls upsert() during ingestion and query() during retrieval. Everything else is up to you: how you connect to your database, how you structure your tables, and what distance function you use.
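To make the contract concrete, here is a minimal in-memory adapter that keeps chunks in an array and scores them by cosine distance (lower = more similar, matching the pgvector adapters in this guide). It is only a sketch for experiments and tests, not a production store:
import type { Chunk, VectorStore } from "@unrag/core/types";

const rows: Chunk[] = [];

// Cosine similarity between two vectors of equal length
const cosineSimilarity = (a: number[], b: number[]) => {
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB) || 1);
};

export const memoryStore: VectorStore = {
  upsert: async (chunks) => {
    for (const chunk of chunks) {
      // Replace any existing row with the same id, then append
      const existing = rows.findIndex((row) => row.id === chunk.id);
      if (existing >= 0) rows.splice(existing, 1);
      rows.push(chunk);
    }
  },
  query: async ({ embedding, topK, scope }) => {
    return rows
      .filter((row) => row.embedding)
      .filter((row) => !scope?.sourceId || row.sourceId.startsWith(scope.sourceId))
      // Convert similarity to a distance so lower scores mean "more similar"
      .map((row) => ({ ...row, score: 1 - cosineSimilarity(row.embedding!, embedding) }))
      .sort((a, b) => a.score - b.score)
      .slice(0, topK);
  },
};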
Understanding the Chunk type
The Chunk type that flows through the system looks like this:
type Chunk = {
id: string; // UUID for this chunk
documentId: string; // UUID of the parent document
sourceId: string; // Your logical identifier (e.g., "docs:getting-started")
index: number; // Position in the original document (0, 1, 2, ...)
content: string; // The chunk's text
tokenCount: number; // Approximate token count
metadata: Metadata; // JSON metadata from ingestion
embedding?: number[]; // The vector (present during upsert)
documentContent?: string; // Full original document text
};
During upsert(), chunks arrive with their embeddings attached. During query(), you return chunks with a score field added.
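For example, a chunk might look like this on its way into upsert(), and come back from query() with only a score added (every value below is illustrative):
import type { Chunk } from "@unrag/core/types";

// Shape received by upsert(): embedding and documentContent are present
const incoming: Chunk = {
  id: "a1b2c3d4-0000-0000-0000-000000000002",
  documentId: "a1b2c3d4-0000-0000-0000-000000000001",
  sourceId: "docs:getting-started",
  index: 1, // second chunk of the document
  content: "Run the init command to scaffold the config file.",
  tokenCount: 11,
  metadata: { section: "setup" },
  embedding: [0.012, -0.034 /* ... */],
  documentContent: "Full text of the original document...",
};

// Shape returned by query(): the same fields plus a score
const scored: Chunk & { score: number } = { ...incoming, score: 0.13 };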
Example: Adding tenant filtering
Let's say you need to filter by tenant ID stored in metadata, not just by sourceId prefix. Here's how you might modify the Raw SQL adapter:
import type { Pool } from "pg";
import type { Chunk, VectorStore } from "@unrag/core/types";
type ExtendedScope = {
sourceId?: string;
tenantId?: string; // New: filter by tenant
};
export const createTenantAwareStore = (pool: Pool): VectorStore => ({
upsert: async (chunks) => {
// Same as the standard adapter - insert documents, chunks, embeddings
// The tenant ID is already in metadata, so no changes needed here
if (chunks.length === 0) return;
const client = await pool.connect();
try {
await client.query("BEGIN");
const head = chunks[0];
await client.query(`
INSERT INTO documents (id, source_id, content, metadata)
VALUES ($1, $2, $3, $4::jsonb)
ON CONFLICT (id) DO UPDATE SET
source_id = excluded.source_id,
content = excluded.content,
metadata = excluded.metadata
`, [
head.documentId,
head.sourceId,
head.documentContent ?? "",
JSON.stringify(head.metadata)
]);
for (const chunk of chunks) {
await client.query(`
INSERT INTO chunks (id, document_id, source_id, idx, content, token_count, metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)
ON CONFLICT (id) DO UPDATE SET
content = excluded.content,
token_count = excluded.token_count,
metadata = excluded.metadata
`, [
chunk.id,
chunk.documentId,
chunk.sourceId,
chunk.index,
chunk.content,
chunk.tokenCount,
JSON.stringify(chunk.metadata)
]);
if (chunk.embedding) {
const vectorLiteral = `[${chunk.embedding.join(",")}]`;
await client.query(`
INSERT INTO embeddings (chunk_id, embedding, embedding_dimension)
VALUES ($1, $2::vector, $3)
ON CONFLICT (chunk_id) DO UPDATE SET
embedding = excluded.embedding,
embedding_dimension = excluded.embedding_dimension
`, [chunk.id, vectorLiteral, chunk.embedding.length]);
}
}
await client.query("COMMIT");
} catch (err) {
await client.query("ROLLBACK");
throw err;
} finally {
client.release();
}
},
query: async ({ embedding, topK, scope = {} }) => {
const extendedScope = scope as ExtendedScope;
const vectorLiteral = `[${embedding.join(",")}]`;
// Build WHERE clause dynamically
const conditions: string[] = [];
const values: unknown[] = [vectorLiteral, topK];
let paramIndex = 3;
if (extendedScope.sourceId) {
conditions.push(`c.source_id LIKE $${paramIndex}`);
values.push(extendedScope.sourceId + '%');
paramIndex++;
}
// New: tenant filtering via metadata JSONB
if (extendedScope.tenantId) {
conditions.push(`c.metadata->>'tenantId' = $${paramIndex}`);
values.push(extendedScope.tenantId);
paramIndex++;
}
const whereClause = conditions.length > 0
? `WHERE ${conditions.join(" AND ")}`
: "";
const res = await pool.query(`
SELECT
c.id, c.document_id, c.source_id, c.idx,
c.content, c.token_count, c.metadata,
(e.embedding <=> $1::vector) as score
FROM chunks c
JOIN embeddings e ON e.chunk_id = c.id
${whereClause}
ORDER BY score ASC
LIMIT $2
`, values);
return res.rows.map((row) => ({
id: String(row.id),
documentId: String(row.document_id),
sourceId: String(row.source_id),
index: Number(row.idx),
content: String(row.content),
tokenCount: Number(row.token_count),
metadata: row.metadata ?? {},
score: Number(row.score),
}));
},
});
Now you can retrieve with tenant filtering:
const result = await engine.retrieve({
query: "how do I reset my password?",
topK: 10,
scope: { tenantId: "acme-corp" } as any, // Cast needed for extended scope
});
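If you would rather not repeat the as any cast at every call site, one option is to hide it behind a small typed wrapper around the engine instance from the example above. This is only a sketch: TenantScope and retrieveForTenant are names invented here, and it assumes the engine forwards scope to the store unchanged, which is exactly what the tenant-aware adapter relies on:
type TenantScope = {
  sourceId?: string;
  tenantId?: string;
};

// Centralizes the cast so callers pass a typed scope
export const retrieveForTenant = (args: {
  query: string;
  topK: number;
  scope?: TenantScope;
}) =>
  engine.retrieve({
    ...args,
    // The extended scope flows through the engine and is interpreted
    // by the tenant-aware adapter's query() method
    scope: args.scope as any,
  });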
Example: Different distance function
The default adapters use cosine distance (<=>). If you want Euclidean distance (<->), modify the query:
query: async ({ embedding, topK, scope }) => {
const vectorLiteral = `[${embedding.join(",")}]`;
const res = await pool.query(`
SELECT c.*, (e.embedding <-> $1::vector) as score
FROM chunks c
JOIN embeddings e ON e.chunk_id = c.id
ORDER BY score ASC
LIMIT $2
`, [vectorLiteral, topK]);
// ... rest of mapping
};
Or inner product (<#>) for normalized vectors:
// Note: pgvector's <#> operator returns the negative inner product,
// so more negative = more similar. Negate it (-(e.embedding <#> $1::vector))
// if you want a more intuitive score where higher = more similar
(e.embedding <#> $1::vector) as score
Example: Alternative schema
Maybe you want a simpler schema without a separate embeddings table:
CREATE TABLE documents (
id UUID PRIMARY KEY,
source_id TEXT NOT NULL,
content TEXT NOT NULL,
embedding VECTOR(1536), -- Embed the whole document
metadata JSONB,
created_at TIMESTAMP DEFAULT NOW()
);
Your adapter would skip chunking entirely:
export const simpleStore: VectorStore = {
upsert: async (chunks) => {
// Ignore chunking and store a single document-level embedding
// Assumes a pg Pool named `pool` is in scope, as in the earlier adapters
const head = chunks[0];
if (!head?.embedding) return;
const vectorLiteral = `[${head.embedding.join(",")}]`; // Use the first chunk's embedding
await pool.query(`
INSERT INTO documents (id, source_id, content, embedding, metadata)
VALUES ($1, $2, $3, $4::vector, $5::jsonb)
ON CONFLICT (id) DO UPDATE SET
content = excluded.content, embedding = excluded.embedding, metadata = excluded.metadata
`, [head.documentId, head.sourceId, head.documentContent ?? "", vectorLiteral, JSON.stringify(head.metadata)]);
},
query: async ({ embedding, topK }) => {
const res = await pool.query(`
SELECT *, (embedding <=> $1::vector) as score
FROM documents
ORDER BY score ASC
LIMIT $2
`, [`[${embedding.join(",")}]`, topK]);
// Map rows to Chunk format for compatibility
return res.rows.map((row) => ({
id: row.id,
documentId: row.id,
sourceId: row.source_id,
index: 0,
content: row.content,
tokenCount: 0,
metadata: row.metadata,
score: row.score,
}));
},
};
This loses the precision benefits of chunking but simplifies the schema significantly.
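If one chunk's embedding is too lossy a proxy for the whole document, a common alternative is to average the chunk embeddings before storing. A small sketch; averageEmbedding is a helper invented here, not part of UnRAG:
import type { Chunk } from "@unrag/core/types";

// Element-wise mean of the chunk embeddings; assumes all chunks
// share the same embedding dimension
const averageEmbedding = (chunks: Chunk[]): number[] => {
  const vectors = chunks
    .map((chunk) => chunk.embedding)
    .filter((v): v is number[] => Array.isArray(v));
  if (vectors.length === 0) return [];
  const sum = new Array<number>(vectors[0].length).fill(0);
  for (const vector of vectors) {
    for (let i = 0; i < vector.length; i++) sum[i] += vector[i];
  }
  return sum.map((value) => value / vectors.length);
};

// In simpleStore.upsert, you would then build the vector literal from
// averageEmbedding(chunks) instead of the first chunk's embedding.
Averaging blurs distinct topics within a long document, so treat it as a trade-off rather than a strict improvement.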
Example: Non-Postgres vector stores
You can adapt UnRAG to work with other vector databases. Here's a sketch for Pinecone:
import { Pinecone } from "@pinecone-database/pinecone";
import type { VectorStore, Chunk } from "@unrag/core/types";
const pinecone = new Pinecone();
const index = pinecone.index("my-index");
export const pineconeStore: VectorStore = {
upsert: async (chunks) => {
const vectors = chunks.map((chunk) => ({
id: chunk.id,
values: chunk.embedding!,
metadata: {
documentId: chunk.documentId,
sourceId: chunk.sourceId,
content: chunk.content,
...chunk.metadata,
},
}));
await index.upsert(vectors);
},
query: async ({ embedding, topK, scope }) => {
// Pinecone metadata filters don't support prefix matching, so the
// sourceId scope is treated here as an exact match rather than a prefix
const filter = scope?.sourceId
? { sourceId: { $eq: scope.sourceId } }
: undefined;
const results = await index.query({
vector: embedding,
topK,
filter,
includeMetadata: true,
});
return results.matches.map((match) => ({
id: match.id,
documentId: match.metadata?.documentId as string,
sourceId: match.metadata?.sourceId as string,
index: 0,
content: match.metadata?.content as string,
tokenCount: 0,
metadata: match.metadata ?? {},
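// Note: for cosine and dot-product indexes, Pinecone's score is a
// similarity (higher = more similar), the opposite convention from the
// distance scores returned by the pgvector adapters above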
score: match.score ?? 0,
}));
},
};
The exact implementation depends on your chosen database's API, but the pattern is the same: map UnRAG's simple interface to whatever your storage requires.
Testing your adapter
Before deploying a custom adapter, test both methods thoroughly:
import { describe, it, expect } from "vitest";
import { myCustomStore } from "./my-store";
describe("custom store", () => {
it("upserts and queries chunks", async () => {
const testChunk = {
id: "test-chunk-1",
documentId: "test-doc-1",
sourceId: "test:doc",
index: 0,
content: "This is test content",
tokenCount: 4,
metadata: { test: true },
embedding: [0.1, 0.2, 0.3, /* ... */],
documentContent: "This is test content",
};
await myCustomStore.upsert([testChunk]);
const results = await myCustomStore.query({
embedding: [0.1, 0.2, 0.3, /* ... */],
topK: 5,
scope: { sourceId: "test:" },
});
expect(results.length).toBeGreaterThan(0);
expect(results[0].content).toBe("This is test content");
});
});
The adapter is vendored code, so you can test it alongside your application code using your normal test setup.
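For the Postgres-backed adapters, it usually pays to run these tests against a throwaway database and reset the tables between cases. A sketch using the documents, chunks, and embeddings tables from the examples above; the TEST_DATABASE_URL variable and table names are assumptions to adjust for your setup:
import { Pool } from "pg";
import { afterAll, beforeEach } from "vitest";

const pool = new Pool({ connectionString: process.env.TEST_DATABASE_URL });

beforeEach(async () => {
  // Wipe the adapter's tables so each test starts from a clean slate
  await pool.query("TRUNCATE embeddings, chunks, documents CASCADE");
});

afterAll(async () => {
  await pool.end();
});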