API
Method reference for the vendored Notion connector module.
The connector ships as vendored code inside your Unrag install directory at <installDir>/connectors/notion/**. In application code you typically import from your alias base:
import { notionConnector } from "@unrag/connectors/notion";

Primary API
notionConnector.streamPages(input)
This is the main entry point for syncing Notion pages. It returns an async iterable that yields connector events—upserts, warnings, progress updates, and checkpoints. You consume this stream via engine.runConnectorStream(...).
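Because the stream is a plain async iterable, you can also iterate it yourself to inspect events before handing control to the runner. A minimal sketch (note that manual iteration alone does not apply anything to your engine):

for await (const event of notionConnector.streamPages({
  token: process.env.NOTION_TOKEN!,
  pageIds: ["b5f3e3e9c6ea4ce5a1c3e0d6a9d2f1ab"],
})) {
  // Each event carries a type: "upsert", "delete", "warning", "progress", or "checkpoint".
  console.log(event.type);
}

In practice you will hand the stream to the runner, as in the example below.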
const stream = notionConnector.streamPages({
token: process.env.NOTION_TOKEN!,
pageIds: ["b5f3e3e9c6ea4ce5a1c3e0d6a9d2f1ab"],
});
const result = await engine.runConnectorStream({ stream });

The runner applies each event to your engine and returns a summary:

- upserts (number): documents upserted into the index.
- deletes (number): documents deleted from the index.
- warnings (number): warnings emitted during the run.
streamPages input

- token (string, required): your Notion integration token.
- pageIds (string[], required): the page IDs or URLs to sync.
- sourceIdPrefix (string, optional): a namespace prepended to every sourceId.
- deleteOnNotFound (boolean, optional): emit a delete event when a page is not found or inaccessible.
sourceIdPrefix prepends a namespace to every sourceId. This is useful for multi-tenant apps where you want to partition content by tenant:
const stream = notionConnector.streamPages({
token: process.env.NOTION_TOKEN!,
pageIds: ["b5f3e3e9c6ea4ce5a1c3e0d6a9d2f1ab"],
sourceIdPrefix: `tenant:${tenantId}:`,
});
await engine.runConnectorStream({ stream });

With a prefix, the resulting source IDs look like tenant:acme:notion:page:<pageId>. You can then retrieve with scope: { sourceId: "tenant:acme:" } to search only that tenant's content.
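For example, a retrieval scoped to a single tenant might look like this (a sketch using the scope shape shown above):

const { chunks } = await engine.retrieve({
  query: "What's our refund policy?",
  topK: 5,
  // Only search content whose sourceId starts with this tenant's prefix.
  scope: { sourceId: `tenant:${tenantId}:` },
});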
deleteOnNotFound tells the connector to emit a delete event if a page is not found or inaccessible. This is useful when you keep a static list of page IDs and want your index to reflect reality after permissions change or a page is deleted:
const stream = notionConnector.streamPages({
token: process.env.NOTION_TOKEN!,
pageIds: ["b5f3e3e9c6ea4ce5a1c3e0d6a9d2f1ab"],
deleteOnNotFound: true,
});
await engine.runConnectorStream({ stream });

Consuming the stream
The recommended way to consume a connector stream is via engine.runConnectorStream(...), which handles all event types automatically:
const result = await engine.runConnectorStream({
stream,
onEvent: (event) => {
// Called for every event (progress, warning, upsert, delete, checkpoint)
console.log(event.type, event);
},
onCheckpoint: async (checkpoint) => {
// Called specifically for checkpoint events
await persistCheckpoint(checkpoint);
},
signal: abortController.signal, // Optional: abort early
});

runConnectorStream options:

- stream (required): the connector event stream to consume.
- onEvent (optional): called for every event (progress, warning, upsert, delete, checkpoint).
- onCheckpoint (optional): called for checkpoint events, e.g. to persist resume state.
- signal (optional): an AbortSignal that lets you abort the run early.
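The persistCheckpoint helper in the example above is yours to supply. A minimal sketch that writes the latest checkpoint to disk (the file path and JSON-on-disk storage are assumptions; a database row or KV entry works equally well):

import { writeFile } from "node:fs/promises";

// Hypothetical helper: store the most recent checkpoint so a later run can resume.
async function persistCheckpoint(checkpoint: unknown) {
  await writeFile(
    "notion-sync.checkpoint.json",
    JSON.stringify(checkpoint, null, 2),
  );
}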
loadNotionPageDocument(args)
This lower-level helper loads a single page and returns a normalized document shape with sourceId, content, metadata, and assets. Use it when you want to add custom metadata, control chunking, or decide exactly how ingestion happens (batching, retries, conditional writes).
loadNotionPageDocument args:

- notion (required): a client created with createNotionClient.
- pageIdOrUrl (string, required): the page ID or URL to load.
- maxDepth (number, optional): how deeply to recurse into nested blocks.
- sourceIdPrefix (string, optional): a namespace prepended to the resulting sourceId.
Notion file URLs are signed and can expire. For production ingestion (especially with PDF extraction), prefer a background job pattern so you can retry safely. See the Next.js Production Recipe.
Here's an example that adds custom metadata before ingesting:
import { createUnragEngine } from "@unrag/config";
import { createNotionClient, loadNotionPageDocument } from "@unrag/connectors/notion";
export async function ingestWithCustomMetadata() {
const engine = createUnragEngine();
const notion = createNotionClient({ token: process.env.NOTION_TOKEN! });
const doc = await loadNotionPageDocument({
notion,
pageIdOrUrl: "b5f3e3e9c6ea4ce5a1c3e0d6a9d2f1ab",
maxDepth: 6,
sourceIdPrefix: "docs:",
});
const result = await engine.ingest({
sourceId: doc.sourceId,
content: doc.content,
assets: doc.assets,
metadata: {
...doc.metadata,
importedBy: "notion-sync",
visibility: "internal",
},
chunking: { chunkSize: 300, chunkOverlap: 50 },
});
// If assets are skipped (unsupported kinds, PDF extraction disabled, etc.),
// Unrag emits structured warnings so you don't silently miss content.
if (result.warnings.length > 0) {
console.warn("unrag ingest warnings", result.warnings);
}
}

The maxDepth parameter controls how deeply the connector recurses into nested blocks. The default is conservative to keep sync fast; increase it if your pages have heavily nested content.
Listing accessible pages
The connector doesn't include a built-in "list all pages" helper because Notion integrations can only access pages that have been explicitly shared with them. However, you can use the underlying Notion client's Search API to discover all accessible pages.
Listing all pages shared with your integration
Use createNotionClient() and call the Notion SDK's search method with pagination:
import { createNotionClient } from "@unrag/connectors/notion";
export async function listAccessiblePages() {
const notion = createNotionClient({ token: process.env.NOTION_TOKEN! });
const pages: any[] = [];
let cursor: string | undefined;
while (true) {
const res = await notion.search({
start_cursor: cursor,
page_size: 100,
filter: { property: "object", value: "page" },
});
pages.push(...res.results);
if (!res.has_more) break;
cursor = res.next_cursor ?? undefined;
}
return pages;
}

Each result includes id, url, properties, last_edited_time, and other Notion metadata. See the Page object reference for the full schema.
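Because each result carries last_edited_time, you can narrow a sync to recently edited pages, for instance (the 24-hour window is an arbitrary choice):

const pages = await listAccessiblePages();
const cutoff = Date.now() - 24 * 60 * 60 * 1000;
// Keep only pages edited in the last 24 hours.
const recentIds = pages
  .filter((p) => new Date(p.last_edited_time).getTime() >= cutoff)
  .map((p) => p.id);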
Syncing all accessible pages
Combine the listing with the connector stream to ingest everything your integration can see:
import { createUnragEngine } from "@unrag/config";
import { createNotionClient, notionConnector } from "@unrag/connectors/notion";
export async function syncAllAccessiblePages() {
const engine = createUnragEngine();
const notion = createNotionClient({ token: process.env.NOTION_TOKEN! });
// 1. Discover all accessible pages
const pages: any[] = [];
let cursor: string | undefined;
while (true) {
const res = await notion.search({
start_cursor: cursor,
page_size: 100,
filter: { property: "object", value: "page" },
});
pages.push(...res.results);
if (!res.has_more) break;
cursor = res.next_cursor ?? undefined;
}
const pageIds = pages.map((p) => p.id);
console.log(`Found ${pageIds.length} accessible pages`);
// 2. Stream sync them all
const stream = notionConnector.streamPages({
token: process.env.NOTION_TOKEN!,
pageIds,
});
return await engine.runConnectorStream({
stream,
onEvent: (event) => {
if (event.type === "progress" && event.message === "page:success") {
console.log(`✓ Synced ${event.sourceId}`);
}
},
});
}

Listing pages from a specific data source
If you want pages from a particular Notion data source rather than all accessible pages, use dataSources.query:
import { createNotionClient } from "@unrag/connectors/notion";
export async function listPagesFromDataSource(dataSourceId: string) {
const notion = createNotionClient({ token: process.env.NOTION_TOKEN! });
const pages: any[] = [];
let cursor: string | undefined;
while (true) {
const res = await notion.dataSources.query({
data_source_id: dataSourceId,
start_cursor: cursor,
page_size: 100,
});
pages.push(...res.results);
if (!res.has_more) break;
cursor = res.next_cursor ?? undefined;
}
return pages;
}

This is useful when your content lives in a structured database (e.g., a "Docs" or "Knowledge Base" database) and you want to sync only those entries.
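To sync only those entries, feed the discovered IDs straight into the connector, mirroring the all-pages pattern above (a sketch assuming engine and dataSourceId are already in scope):

const pages = await listPagesFromDataSource(dataSourceId);
const stream = notionConnector.streamPages({
  token: process.env.NOTION_TOKEN!,
  pageIds: pages.map((p) => p.id),
});
await engine.runConnectorStream({ stream });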
Utilities
createNotionClient({ token, timeoutMs? })
Creates a Notion API client using the official @notionhq/client SDK. The returned client exposes the full Notion API, so you can call any endpoint directly (e.g., notion.pages.retrieve(), notion.blocks.children.list()). Most users don't need this unless they want to build custom fetch logic on top of the connector.
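As an example, fetching a page's raw metadata directly through the SDK:

import { createNotionClient } from "@unrag/connectors/notion";

const notion = createNotionClient({ token: process.env.NOTION_TOKEN! });
// pages.retrieve is part of the official @notionhq/client SDK.
const page = await notion.pages.retrieve({
  page_id: "b5f3e3e9-c6ea-4ce5-a1c3-e0d6a9d2f1ab",
});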
ID helpers
The connector accepts both raw page IDs and URLs. If you want to normalize IDs yourself, two helpers are available:
normalizeNotionPageId32(pageIdOrUrl) extracts and normalizes a page ID to 32-hex form.
toUuidHyphenated(id32) converts the 32-hex form to hyphenated UUID format, which is what the Notion API expects.
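A quick sketch of both helpers together (the URL is a made-up example):

import { normalizeNotionPageId32, toUuidHyphenated } from "@unrag/connectors/notion";

const id32 = normalizeNotionPageId32(
  "https://www.notion.so/acme/Product-FAQ-b5f3e3e9c6ea4ce5a1c3e0d6a9d2f1ab",
);
// => "b5f3e3e9c6ea4ce5a1c3e0d6a9d2f1ab"
const uuid = toUuidHyphenated(id32);
// => "b5f3e3e9-c6ea-4ce5-a1c3-e0d6a9d2f1ab"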
Rendering
renderNotionBlocksToText(nodes) converts a Notion block tree into the text representation used for ingestion. The v1 renderer supports common block types (paragraphs, headings, lists, todos, quotes, callouts, code, dividers). Unsupported blocks are skipped.
Because the connector is vendored, you can extend the renderer if your team depends on specific block types. Open lib/unrag/connectors/notion/render.ts and add cases for the blocks you need.
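For example, toggle blocks are not in the v1 list above. A small helper you might wire into render.ts could extract their text like this (the renderer's internal structure is an assumption, so treat this as a sketch and adapt it to the vendored code):

// Hypothetical helper: Notion's API puts toggle text under block.toggle.rich_text,
// and each rich text item exposes plain_text.
function renderToggleBlock(block: any): string {
  return block.toggle.rich_text
    .map((rt: { plain_text: string }) => rt.plain_text)
    .join("");
}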
Stable source IDs
The connector uses a stable scheme for sourceId values:
- Without a prefix: notion:page:<pageId>
- With sourceIdPrefix: <prefix>notion:page:<pageId> (the prefix is normalized to include a trailing :)
This enables safe re-runs (idempotent ingest), scoped retrieval via scope.sourceId prefixes, and deletion per page or per tenant namespace.
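For instance, because the full sourceId is itself a valid prefix, the same delete call used for a tenant wipe (shown later) can also target a single page. A sketch, assuming engine.delete accepts sourceIdPrefix as in the wipe example below:

// Delete one page's content: the full sourceId is also a valid prefix.
await engine.delete({ sourceIdPrefix: "notion:page:b5f3e3e9c6ea4ce5a1c3e0d6a9d2f1ab" });
// Delete everything under a tenant namespace.
await engine.delete({ sourceIdPrefix: "tenant:acme:" });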
Examples
The examples below cover common integration patterns. They assume you've already set up your Notion integration and have NOTION_TOKEN available as an environment variable.
Logging progress with onEvent
The onEvent callback fires for each event as the stream progresses. This is useful for logging, progress indicators, or instrumenting failures:
import { createUnragEngine } from "@unrag/config";
import { notionConnector } from "@unrag/connectors/notion";
const engine = createUnragEngine();
const stream = notionConnector.streamPages({
token: process.env.NOTION_TOKEN!,
pageIds: [
"b5f3e3e9c6ea4ce5a1c3e0d6a9d2f1ab",
"a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6",
"11112222333344445555666677778888",
],
});
const result = await engine.runConnectorStream({
stream,
onEvent: (event) => {
if (event.type === "progress" && event.message === "page:success") {
console.log(`✓ Synced ${event.sourceId}`);
} else if (event.type === "warning" && event.code === "page_not_found") {
console.warn(`⊘ Page not found: ${event.data?.pageId}`);
} else if (event.type === "warning") {
console.error(`✗ Failed: ${event.message}`);
}
},
});
console.log(`Done: ${result.upserts} succeeded, ${result.warnings} warnings`);

Inspecting results and handling warnings
The runner returns a summary. Use it to detect issues and decide whether to retry or alert:
import { createUnragEngine } from "@unrag/config";
import { notionConnector } from "@unrag/connectors/notion";
const engine = createUnragEngine();
const warnings: Array<{ code: string; message: string; data?: unknown }> = [];
const stream = notionConnector.streamPages({
token: process.env.NOTION_TOKEN!,
pageIds: getPageIdsFromConfig(),
});
const result = await engine.runConnectorStream({
stream,
onEvent: (event) => {
if (event.type === "warning") {
warnings.push({ code: event.code, message: event.message, data: event.data });
}
},
});
if (warnings.length > 0) {
console.error(`Sync completed with ${warnings.length} warnings:`);
for (const w of warnings) {
console.error(` - [${w.code}] ${w.message}`);
}
}
console.log(`Synced ${result.upserts} documents, deleted ${result.deletes}`);

End-to-end: sync, retrieve, and use in a prompt
Here's a complete flow that syncs Notion content, then uses it to answer a question:
import { createUnragEngine } from "@unrag/config";
import { notionConnector } from "@unrag/connectors/notion";
import { generateText } from "ai";
import { openai } from "@ai-sdk/openai";
const engine = createUnragEngine();
// 1. Sync your knowledge base pages
const stream = notionConnector.streamPages({
token: process.env.NOTION_TOKEN!,
pageIds: [
"b5f3e3e9c6ea4ce5a1c3e0d6a9d2f1ab", // Product FAQ
"a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6", // Pricing docs
],
});
await engine.runConnectorStream({ stream });
// 2. Retrieve relevant chunks for a user question
const question = "What's the pricing for the Pro plan?";
const { chunks } = await engine.retrieve({
query: question,
topK: 5,
});
// 3. Build context and generate an answer
const context = chunks.map((c) => c.content).join("\n\n---\n\n");
const { text } = await generateText({
model: openai("gpt-4o"),
system: `Answer questions using only the provided context. If the answer isn't in the context, say so.`,
prompt: `Context:\n${context}\n\nQuestion: ${question}`,
});
console.log(text);

Next.js server action
In a Next.js app, run sync from a server action. This keeps the token server-side and lets you trigger sync from an admin UI:
// app/actions/sync-notion.ts
"use server";
import { createUnragEngine } from "@/lib/unrag/config";
import { notionConnector } from "@/lib/unrag/connectors/notion";
export async function syncNotionAction(pageIds: string[]) {
const engine = createUnragEngine();
const warnings: string[] = [];
const stream = notionConnector.streamPages({
token: process.env.NOTION_TOKEN!,
pageIds,
});
const result = await engine.runConnectorStream({
stream,
onEvent: (event) => {
if (event.type === "warning") {
warnings.push(`[${event.code}] ${event.message}`);
}
},
});
return {
upserts: result.upserts,
deletes: result.deletes,
warnings,
};
}

Then call it from a client component:
// app/admin/sync-button.tsx
"use client";
import { syncNotionAction } from "@/app/actions/sync-notion";
export function SyncButton() {
const handleSync = async () => {
const result = await syncNotionAction([
"b5f3e3e9c6ea4ce5a1c3e0d6a9d2f1ab",
]);
if (result.warnings.length > 0) {
alert(`Sync had ${result.warnings.length} warnings`);
} else {
alert(`Synced ${result.upserts} pages`);
}
};
return <button onClick={handleSync}>Sync Notion</button>;
}

Batch syncing with rate limit pauses
For larger page lists, batch your calls and add pauses to stay within Notion's rate limits:
import { createUnragEngine } from "@unrag/config";
import { notionConnector } from "@unrag/connectors/notion";
const allPageIds = getPageIdsFromDatabase(); // e.g., 200 pages
const BATCH_SIZE = 20;
const PAUSE_MS = 2000;
const engine = createUnragEngine();
let totalUpserts = 0;
let totalWarnings = 0;
for (let i = 0; i < allPageIds.length; i += BATCH_SIZE) {
const batch = allPageIds.slice(i, i + BATCH_SIZE);
const stream = notionConnector.streamPages({
token: process.env.NOTION_TOKEN!,
pageIds: batch,
});
const result = await engine.runConnectorStream({ stream });
totalUpserts += result.upserts;
totalWarnings += result.warnings;
console.log(`Batch ${Math.floor(i / BATCH_SIZE) + 1}: ${result.upserts} synced, ${result.warnings} warnings`);
// Pause before next batch (skip on last batch)
if (i + BATCH_SIZE < allPageIds.length) {
await new Promise((r) => setTimeout(r, PAUSE_MS));
}
}
console.log(`Total: ${totalUpserts} synced, ${totalWarnings} warnings`);

Wiping a namespace before re-sync
If you want a clean slate—ensuring that pages no longer in your list are removed from the index—wipe the namespace first:
import { createUnragEngine } from "@unrag/config";
import { notionConnector } from "@unrag/connectors/notion";
const engine = createUnragEngine();
const tenantId = "acme";
const prefix = `tenant:${tenantId}:`;
// 1. Wipe all existing content for this tenant
await engine.delete({ sourceIdPrefix: prefix });
// 2. Re-sync the current set of pages
const stream = notionConnector.streamPages({
token: process.env.NOTION_TOKEN!,
pageIds: getCurrentPageIdsForTenant(tenantId),
sourceIdPrefix: prefix,
});
await engine.runConnectorStream({ stream });

This pattern is useful when tenants can remove pages from their list; without the wipe, those old pages would remain in the index.
Scheduled sync with a cron job
For content that changes over time, run sync on a schedule. Here's a simple Node script you can trigger with cron or a job runner:
// scripts/sync-notion-cron.ts
import { createUnragEngine } from "../lib/unrag/config";
import { notionConnector } from "../lib/unrag/connectors/notion";
async function main() {
console.log(`[${new Date().toISOString()}] Starting Notion sync...`);
const engine = createUnragEngine();
const pageIds = await fetchPageIdsFromConfig();
const stream = notionConnector.streamPages({
token: process.env.NOTION_TOKEN!,
pageIds,
deleteOnNotFound: true, // Clean up pages that were removed
});
const result = await engine.runConnectorStream({ stream });
console.log(`Completed: ${result.upserts} synced, ${result.deletes} deleted, ${result.warnings} warnings`);
if (result.warnings > 0) {
process.exitCode = 1; // Signal failure to cron/scheduler
}
}
main().catch((err) => {
console.error("Sync failed:", err);
process.exitCode = 1;
});

Run it with cron (e.g., every night at 2 AM):
0 2 * * * cd /path/to/project && npx tsx scripts/sync-notion-cron.ts >> /var/log/notion-sync.log 2>&1

Or use a job runner like BullMQ, Inngest, or Trigger.dev for more sophisticated scheduling and retry logic.
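As one example, an Inngest function can run the same sync on a cron schedule with retries handled for you (a sketch; the app and function IDs are placeholders):

import { Inngest } from "inngest";
import { createUnragEngine } from "../lib/unrag/config";
import { notionConnector } from "../lib/unrag/connectors/notion";

const inngest = new Inngest({ id: "my-app" });

// Scheduled function: Inngest re-runs it on failure and records each run.
export const notionSync = inngest.createFunction(
  { id: "notion-nightly-sync" },
  { cron: "0 2 * * *" },
  async () => {
    const engine = createUnragEngine();
    const stream = notionConnector.streamPages({
      token: process.env.NOTION_TOKEN!,
      pageIds: await fetchPageIdsFromConfig(), // same helper as the cron script above
      deleteOnNotFound: true,
    });
    return engine.runConnectorStream({ stream });
  },
);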
