Connector Contract
Understanding the event-stream connector model and building custom connectors.
All Unrag connectors follow a simple contract: they produce a stream of events that the engine consumes and applies. This page explains how the contract works, what event types are available, and how to build your own connectors.
Why a streaming contract?
Connectors could have been simple functions that return results. Instead, they're async generators that yield events over time. This design choice enables several important capabilities:
Incremental progress. When syncing hundreds of pages or files, you don't want to wait until the end to know if something went wrong. The streaming model lets you see progress as it happens—each document is processed independently.
Checkpointing. For long-running syncs, especially in serverless environments with timeout limits, you need a way to resume where you left off. Connectors emit checkpoint events that you can persist. If a sync times out, the next invocation picks up from the last checkpoint.
Backpressure. The engine processes events one at a time, in order. If a downstream system is slow—your database, the embedding API—the generator simply isn't asked for its next event until the current one has been handled, so the connector naturally pauses. This prevents overwhelming external services.
Observability. Every event is visible to your code via callbacks. You can log progress, update a UI, send metrics, or decide to abort early based on the events you're seeing.
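To make the contract concrete before diving into the event types, here is a minimal sketch of a stream and its consumer. The event shapes are abbreviated and handleEvent is a hypothetical placeholder rather than the engine's real internals:
async function* exampleStream() {
  yield { type: "progress", message: "item:start" };
  yield { type: "upsert", input: { sourceId: "example:item:1", content: "..." } };
  yield { type: "checkpoint", checkpoint: { index: 1 } };
}
async function consume(handleEvent: (event: unknown) => Promise<void>) {
  for await (const event of exampleStream()) {
    // The generator is not asked for its next event until this await resolves,
    // which is where the natural backpressure comes from.
    await handleEvent(event);
  }
}
In practice you don't write this loop yourself; engine.runConnectorStream(...) does it for you. But this for await loop is the whole mechanism behind incremental progress, checkpointing, and backpressure.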
The event types
Connectors yield events that fall into five categories. Here's what each one does:
upsert
An upsert event tells the engine to ingest a document. It contains the same payload you'd pass to engine.ingest() directly—sourceId, content, metadata, and assets.
yield {
type: "upsert",
input: {
sourceId: "notion:page:abc123",
content: "Page title\n\nPage content here...",
metadata: { connector: "notion", kind: "page", pageId: "abc123" },
assets: [],
},
};
When the runner receives this event, it calls engine.ingest(event.input) and waits for completion before processing the next event.
delete
A delete event tells the engine to remove a document. This is used when deleteOnNotFound is enabled and a source item (page, file) is no longer accessible.
yield {
type: "delete",
input: { sourceId: "notion:page:abc123" },
};
The runner calls engine.delete(event.input). You can delete by exact sourceId or by sourceId prefix.
progress
A progress event signals that something is happening. It doesn't cause any action—it's purely informational. Use it for logging, updating a progress bar, or tracking which items are being processed.
yield {
type: "progress",
message: "page:start",
current: 5,
total: 100,
sourceId: "notion:page:abc123",
entityId: "abc123",
};
The fields are all optional. The message field typically follows a convention like "page:start", "page:success", "file:start", etc., but you can use any string that makes sense for your connector.
warning
A warning event indicates that something went wrong, but the sync can continue. Common examples: a page wasn't found, a file was too large, or an API returned an unexpected error for one item.
yield {
type: "warning",
code: "page_not_found",
message: "Notion page not found or inaccessible.",
data: { pageId: "abc123", sourceId: "notion:page:abc123" },
};
Warnings don't stop the sync. The runner counts them and includes the count in the final result. If you want to track individual warnings, use the onEvent callback:
const warnings: string[] = [];
await engine.runConnectorStream({
stream,
onEvent: (event) => {
if (event.type === "warning") {
warnings.push(`[${event.code}] ${event.message}`);
}
},
});
checkpoint
A checkpoint event provides a resumption point. The payload is an opaque, JSON-serializable object that means something to the connector—typically containing an index or cursor.
yield {
type: "checkpoint",
checkpoint: { index: 5, pageId: "abc123" },
};
The runner makes the checkpoint available via onCheckpoint, and includes the last checkpoint in the final result. Persist checkpoints to your database so you can pass them back when resuming:
// Save checkpoints as they arrive
await engine.runConnectorStream({
stream,
onCheckpoint: async (checkpoint) => {
await db.checkpoints.upsert({
where: { tenantId },
data: { checkpoint: JSON.stringify(checkpoint) },
});
},
});
// Later, resume from the saved checkpoint
const saved = await db.checkpoints.findUnique({ where: { tenantId } });
const checkpoint = saved ? JSON.parse(saved.checkpoint) : undefined;
const stream = notionConnector.streamPages({
token: process.env.NOTION_TOKEN!,
pageIds,
checkpoint,
});
Consuming connector streams
The recommended way to consume a connector stream is via engine.runConnectorStream(...). This method handles all event types and returns a summary:
const result = await engine.runConnectorStream({
stream,
onEvent: (event) => {
// Called for every event
},
onCheckpoint: async (checkpoint) => {
// Called specifically for checkpoint events
await saveCheckpoint(checkpoint);
},
signal: abortController.signal, // Optional: abort early
});
console.log(result);
// { upserts: 47, deletes: 2, warnings: 3, lastCheckpoint: { index: 50 } }
The runner processes events sequentially, preserving order. If you need to abort early (timeout, user cancellation), pass an AbortSignal and the runner will stop after the current event completes.
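For example, in a serverless handler you might give the sync a fixed time budget and persist checkpoints so the next invocation can resume. A sketch, where the 55-second budget and the saveCheckpoint helper are illustrative:
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), 55_000); // stop before the platform timeout
try {
  const result = await engine.runConnectorStream({
    stream,
    signal: controller.signal,
    onCheckpoint: async (checkpoint) => {
      await saveCheckpoint(checkpoint); // persist for the next invocation
    },
  });
  console.log(`run ended with ${result.upserts} upserts`, result.lastCheckpoint);
} finally {
  clearTimeout(timer);
}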
Building a custom connector
If you need to ingest from a source we don't support, you can build a connector that follows the same pattern. A connector is just an async generator function that yields events.
Here's a minimal example:
import type { ConnectorStream } from "@unrag/core";
type MySourceCheckpoint = {
index: number;
itemId?: string;
};
type MySourceInput = {
apiKey: string;
itemIds: string[];
sourceIdPrefix?: string;
checkpoint?: MySourceCheckpoint;
};
async function* streamMySourceItems(
input: MySourceInput
): ConnectorStream<MySourceCheckpoint> {
const { apiKey, itemIds, sourceIdPrefix, checkpoint } = input;
const startIndex = checkpoint?.index ?? 0;
for (let i = startIndex; i < itemIds.length; i++) {
const itemId = itemIds[i];
const sourceId = sourceIdPrefix
? `${sourceIdPrefix}my-source:item:${itemId}`
: `my-source:item:${itemId}`;
// Progress: starting
yield {
type: "progress",
message: "item:start",
current: i + 1,
total: itemIds.length,
sourceId,
entityId: itemId,
};
try {
// Fetch the item from your source
const item = await fetchItemFromMySource(apiKey, itemId);
// Upsert the document
yield {
type: "upsert",
input: {
sourceId,
content: item.content,
metadata: {
connector: "my-source",
kind: "item",
itemId,
title: item.title,
updatedAt: item.updatedAt,
},
assets: item.assets ?? [],
},
};
// Progress: success
yield {
type: "progress",
message: "item:success",
current: i + 1,
total: itemIds.length,
sourceId,
entityId: itemId,
};
} catch (err) {
// Warning: failed but continue
yield {
type: "warning",
code: "item_error",
message: err instanceof Error ? err.message : String(err),
data: { itemId, sourceId },
};
}
// Checkpoint: save progress
yield {
type: "checkpoint",
checkpoint: { index: i + 1, itemId },
};
}
}
Then use it like the built-in connectors:
const stream = streamMySourceItems({
apiKey: process.env.MY_SOURCE_API_KEY!,
itemIds: ["item1", "item2", "item3"],
sourceIdPrefix: "tenant:acme:",
});
const result = await engine.runConnectorStream({ stream });
Best practices for custom connectors
Yield checkpoints after each item. This enables fine-grained resumption. If you checkpoint only every 10 items, a timeout at item 15 would restart from item 10.
Use consistent sourceId schemes. Pick a pattern like <prefix><connector>:<kind>:<id> and stick with it. This makes scoped retrieval and deletion predictable.
Emit progress events liberally. They cost nothing and make debugging much easier. Include current and total when you know them.
Keep checkpoints small and JSON-serializable. They'll be persisted to databases and passed between function invocations. Don't include large objects or non-serializable data.
Handle errors at the item level. Yield warnings for individual failures rather than throwing exceptions that abort the entire sync. Reserve exceptions for truly unrecoverable situations (invalid credentials, network failure).
Test with small batches first. Run your connector on 2-3 items before scaling up. This catches schema mismatches, credential issues, and unexpected API behavior early.
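For example, you can smoke-test the streamMySourceItems connector from above on a slice of items and inspect the summary before running a full sync (allItemIds stands in for however you enumerate items in your source):
const smokeStream = streamMySourceItems({
  apiKey: process.env.MY_SOURCE_API_KEY!,
  itemIds: allItemIds.slice(0, 3),
});
const smokeResult = await engine.runConnectorStream({ stream: smokeStream });
if (smokeResult.warnings > 0) {
  console.warn(`smoke test produced ${smokeResult.warnings} warning(s); investigate before a full sync`);
}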
TypeScript types
The core types are exported from @unrag/core:
import type {
ConnectorEvent,
ConnectorStream,
RunConnectorStreamOptions,
RunConnectorStreamResult,
} from "@unrag/core";ConnectorEvent<TCheckpoint> is a union of all five event types. The TCheckpoint generic allows you to type your checkpoint payload.
ConnectorStream<TCheckpoint> is AsyncIterable<ConnectorEvent<TCheckpoint>>—what your generator function should return.
RunConnectorStreamOptions<TCheckpoint> is the options object for engine.runConnectorStream(...).
RunConnectorStreamResult<TCheckpoint> is the return type: { upserts, deletes, warnings, lastCheckpoint? }.
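As a rough sketch of how the generic threads through, using the imports above and the MySourceCheckpoint and streamMySourceItems definitions from earlier (apiKey and itemIds are assumed to be in scope), narrowing on the type discriminant exposes the event-specific fields, and the checkpoint payload comes back typed:
const result: RunConnectorStreamResult<MySourceCheckpoint> =
  await engine.runConnectorStream({
    stream: streamMySourceItems({ apiKey, itemIds }),
    onEvent: (event: ConnectorEvent<MySourceCheckpoint>) => {
      if (event.type === "warning") {
        console.warn(`[${event.code}] ${event.message}`);
      }
      if (event.type === "checkpoint") {
        console.log("checkpointed at index", event.checkpoint.index);
      }
    },
  });
console.log(result.lastCheckpoint?.index);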
Example: GitHub connector sketch
Here's a sketch of what a GitHub connector might look like. This isn't production-ready, but shows the pattern:
import type { ConnectorStream } from "@unrag/core";
import { Octokit } from "@octokit/rest";
type GitHubCheckpoint = {
index: number;
path?: string;
};
type GitHubConnectorInput = {
token: string;
owner: string;
repo: string;
paths?: string[]; // specific files, or all if omitted
branch?: string;
sourceIdPrefix?: string;
checkpoint?: GitHubCheckpoint;
};
async function* streamGitHubFiles(
input: GitHubConnectorInput
): ConnectorStream<GitHubCheckpoint> {
const octokit = new Octokit({ auth: input.token });
const branch = input.branch ?? "main";
// Get file list (simplified: a full-repo sync needs recursive tree traversal; one possible listRepoFiles helper is sketched after this example)
const paths = input.paths ?? (await listRepoFiles(octokit, input.owner, input.repo, branch));
const startIndex = input.checkpoint?.index ?? 0;
for (let i = startIndex; i < paths.length; i++) {
const path = paths[i];
const sourceId = input.sourceIdPrefix
? `${input.sourceIdPrefix}github:${input.owner}/${input.repo}:${path}`
: `github:${input.owner}/${input.repo}:${path}`;
yield { type: "progress", message: "file:start", current: i + 1, total: paths.length, sourceId };
try {
const { data } = await octokit.repos.getContent({
owner: input.owner,
repo: input.repo,
path,
ref: branch,
});
if ("content" in data && data.type === "file") {
const content = Buffer.from(data.content, "base64").toString("utf-8");
yield {
type: "upsert",
input: {
sourceId,
content,
metadata: {
connector: "github",
kind: "file",
owner: input.owner,
repo: input.repo,
path,
sha: data.sha,
},
},
};
yield { type: "progress", message: "file:success", current: i + 1, total: paths.length, sourceId };
} else {
yield { type: "warning", code: "not_a_file", message: `${path} is not a file`, data: { path } };
}
} catch (err) {
yield {
type: "warning",
code: "file_error",
message: err instanceof Error ? err.message : String(err),
data: { path },
};
}
yield { type: "checkpoint", checkpoint: { index: i + 1, path } };
}
}
This sketch shows the key patterns: iterating through items, yielding progress/upsert/warning/checkpoint events, and using consistent sourceId formatting. A production connector would add more sophistication—handling rate limits, supporting different file types as assets, recursive directory traversal, etc.
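The listRepoFiles helper above is referenced but never defined. One possible implementation (an assumption, not part of Unrag) uses GitHub's Git Trees API, which accepts a branch name and can return the whole tree in a single recursive call:
// Possible listRepoFiles helper: list every blob path on a branch via the Git Trees API.
async function listRepoFiles(
  octokit: Octokit,
  owner: string,
  repo: string,
  branch: string
): Promise<string[]> {
  const { data } = await octokit.git.getTree({
    owner,
    repo,
    tree_sha: branch, // the Trees API accepts a branch or tag name here
    recursive: "true",
  });
  // data.truncated is true for very large repositories; a production connector
  // would then fall back to walking subtrees individually.
  return data.tree.flatMap((entry) =>
    entry.type === "blob" && entry.path ? [entry.path] : []
  );
}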
