Connect to external services and sync data into Opbox tables. Connectors are available for CRM, accounting, project management, scheduling, document signing, meetings, support, messaging, registry, and development services. Pipelines support scheduled syncs, webhook triggers, concurrent execution, variant routing, and dynamic progress notifications.
Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/pipelines/connector-types | List available connector types |
| GET | /api/pipelines/connections | List connections in the active workspace |
| POST | /api/pipelines/connections | Create a connection |
| GET | /api/pipelines/connections/:id | Get connection details |
| PUT | /api/pipelines/connections/:id | Update a connection |
| PATCH | /api/pipelines/connections/:id | Reset sync cursor |
| DELETE | /api/pipelines/connections/:id | Delete a connection |
| POST | /api/pipelines/connections/test | Test connection credentials |
| POST | /api/pipelines/connections/:id/check | Check connection health |
| POST | /api/pipelines/connections/:id/discover | Discover available data streams |
| GET | /api/pipelines/connections/:id/runs | List sync runs |
| POST | /api/pipelines/connections/:id/runs | Trigger a sync run |
| POST | /api/pipelines/connections/:id/runs/:runId | Stop a running sync |
| POST | /api/pipelines/connections/:id/variants | Create a variant connection |
| POST | /api/pipelines/connections/:id/webhook-url | Reveal webhook URL (audit-logged) |
| GET | /api/pipelines/cron | Trigger scheduled syncs (cron endpoint) |
| POST | /api/pipelines/run-all | Trigger sync for all enabled connections |
| POST | /api/pipelines/oauth/:connector/authorize | Initiate OAuth flow (creates connection, returns authorize URL) |
| GET | /api/pipelines/oauth/:connector/callback | OAuth callback (validates state, exchanges code for tokens) |
| POST | /api/pipelines/oauth/:connector/exchange | Exchange authorization code for tokens (manual code paste flow) |
Connector Types
List all available connector types and their required credentials.
GET /api/pipelines/connector-types
Available Connectors
| Type | Name | Category |
|---|---|---|
| CRM_PLATFORM | CRM Platform | CRM |
| ACCOUNTING | Accounting Software | Accounting |
| PROJECT_MGMT | Project Management | Project Management |
| SCHEDULING | Scheduling Platform | Scheduling |
| DOC_SIGNING | Document Signing | Documents |
| MEETINGS | Meeting Transcription | Meetings |
| SUPPORT | Customer Support | Support |
| MESSAGING | WhatsApp Messaging | |
| REGISTRY | Registry Service | Registry |
Create a Connection
Creates a new pipeline connection with encrypted credentials. Requires ADMIN or OWNER role.
POST /api/pipelines/connections
Content-Type: application/json
{
"name": "Production CRM",
"type": "CRM_PLATFORM",
"credentials": {
"apiKey": "your-api-key-here"
},
"schedule": {
"enabled": true,
"frequency": "daily"
},
"notifyUserIds": ["cm...user1", "cm...user2"]
}
Body Parameters
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Connection display name |
| type | string | Yes | Connector type (see table above) |
| credentials | object | Yes | Service-specific credentials (encrypted at rest) |
| config | object | No | Custom configuration |
| schedule | object | No | Auto-sync schedule: an enabled flag plus a frequency of hourly, daily, weekly, or monthly |
| notifyUserIds | string[] | No | User IDs to notify on sync completion or failure. If empty, all org members are notified. |
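As a rough illustration of the request above, the following Python sketch builds (but does not send) the create-connection call using only the standard library. The host `https://app.example.com`, the bearer-token header, and the function name are assumptions; this section does not say how requests are authenticated.

```python
import json
import urllib.request

BASE_URL = "https://app.example.com"  # assumption: replace with your Opbox host
FREQUENCIES = {"hourly", "daily", "weekly", "monthly"}


def build_create_connection_request(name, conn_type, credentials,
                                    frequency=None, notify_user_ids=None,
                                    token="YOUR_API_TOKEN"):
    """Build a POST /api/pipelines/connections request without sending it."""
    body = {"name": name, "type": conn_type, "credentials": credentials}
    if frequency is not None:
        # Only the four documented frequencies are accepted here.
        if frequency not in FREQUENCIES:
            raise ValueError(f"unsupported frequency: {frequency}")
        body["schedule"] = {"enabled": True, "frequency": frequency}
    if notify_user_ids is not None:
        body["notifyUserIds"] = list(notify_user_ids)
    return urllib.request.Request(
        f"{BASE_URL}/api/pipelines/connections",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # Assumption: bearer-token auth; the scheme is not documented here.
            "Authorization": f"Bearer {token}",
        },
        method="POST",
    )
```

Passing the returned object to `urllib.request.urlopen` would send it. The caller needs the ADMIN or OWNER role for the call to succeed.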
Test Connection
Validates credentials without saving the connection.
POST /api/pipelines/connections/test
Content-Type: application/json
{
"type": "CRM_PLATFORM",
"credentials": { "apiKey": "your-api-key-here" }
}
{ "success": true, "message": "Connection successful" }
Sync Operations
Trigger data syncs, discover available data streams, and monitor run history.
Discover Streams
POST /api/pipelines/connections/:id/discover
{
"streams": [
{ "name": "contacts", "fields": [], "supportedSyncModes": ["full_refresh"] },
{ "name": "deals", "fields": [], "supportedSyncModes": ["full_refresh"] }
]
}
Trigger Sync
POST /api/pipelines/connections/:id/runs
Content-Type: application/json
{
"streams": ["contacts", "deals"]
}
The sync runs asynchronously. Multiple connections can sync concurrently. Synced data is written to Opbox tables with automatic column type inference and schema evolution.
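One way to chain discovery and triggering is to filter the discover response by sync mode and pass the resulting names as the streams body. This is a minimal sketch; the helper names are hypothetical and it assumes the response shape shown under Discover Streams.

```python
import json


def streams_supporting(discover_response, mode="full_refresh"):
    """Return names of discovered streams that list the given sync mode."""
    return [
        stream["name"]
        for stream in discover_response.get("streams", [])
        if mode in stream.get("supportedSyncModes", [])
    ]


def build_trigger_body(stream_names):
    """Serialize the JSON body for POST /api/pipelines/connections/:id/runs."""
    return json.dumps({"streams": list(stream_names)})
```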
Stop a Sync
POST /api/pipelines/connections/:id/runs/:runId
List Runs
GET /api/pipelines/connections/:id/runs?limit=50&offset=0
{
"runs": [
{
"id": "cm...",
"status": "COMPLETED",
"trigger": "MANUAL",
"startedAt": "2026-02-11T10:00:00.000Z",
"completedAt": "2026-02-11T10:02:30.000Z",
"rowsSynced": 150,
"rowsCreated": 30,
"rowsUpdated": 120
}
],
"pagination": { "limit": 50, "offset": 0, "total": 12, "hasMore": false }
}
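The pagination object can drive a simple loop that advances offset until hasMore is false. The sketch below takes a caller-supplied `fetch_page(limit, offset)` function (hypothetical) that performs the GET and returns the parsed JSON, so the paging logic stays independent of the HTTP client.

```python
from typing import Callable, Dict, Iterator


def iter_runs(fetch_page: Callable[[int, int], Dict], limit: int = 50) -> Iterator[Dict]:
    """Yield every run by following limit/offset pagination."""
    offset = 0
    while True:
        page = fetch_page(limit, offset)
        runs = page.get("runs", [])
        yield from runs
        # Stop when the server reports no more pages, or on an empty page
        # as a guard against looping forever.
        if not runs or not page.get("pagination", {}).get("hasMore"):
            break
        offset += limit
```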
Reset Sync Cursor
Reset or adjust the sync cursor for a connection. Incremental syncs use the cursor to determine the starting point. Resetting the cursor allows you to re-sync historical data or force a full resync. Requires ADMIN or OWNER role.
Set cursor to a specific date
PATCH /api/pipelines/connections/:id
Content-Type: application/json
{
"stream": "transcripts",
"cursor": "2026-01-01T00:00:00.000Z"
}
{
"success": true,
"syncState": { "version": 1, "streams": { "transcripts": { "cursor": "2026-01-01T00:00:00.000Z" } } },
"message": "Sync cursor set to 2026-01-01T00:00:00.000Z for stream \"transcripts\""
}
Full resync (clear all cursors)
PATCH /api/pipelines/connections/:id
Content-Type: application/json
{
"stream": "transcripts",
"cursor": null
}
{
"success": true,
"syncState": { "version": 1, "streams": {} },
"message": "Sync cursor cleared - next sync will be a full resync"
}
Body Parameters
| Field | Type | Required | Description |
|---|---|---|---|
| stream | string | No | Stream name to reset (defaults to "transcripts") |
| cursor | string \| null | Yes | ISO 8601 date string to set the cursor to, or null to clear all cursors for a full resync |
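A small helper can produce the PATCH body in the timestamp format used in the examples above. This is a sketch under assumptions: the function name is hypothetical, and it assumes the API expects UTC timestamps with millisecond precision, as the sample values suggest.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def build_cursor_reset_body(cursor: Optional[datetime],
                            stream: Optional[str] = None) -> str:
    """Serialize the body for PATCH /api/pipelines/connections/:id."""
    body = {}
    if stream is not None:
        body["stream"] = stream  # omitted: the server default applies
    if cursor is None:
        body["cursor"] = None  # null clears cursors and forces a full resync
    else:
        if cursor.tzinfo is None:
            raise ValueError("cursor must be timezone-aware")
        utc = cursor.astimezone(timezone.utc)
        millis = utc.microsecond // 1000
        body["cursor"] = utc.strftime("%Y-%m-%dT%H:%M:%S.") + f"{millis:03d}Z"
    return json.dumps(body)
```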
Webhook Trigger
Each connection has a unique webhook URL that external services can call to trigger a sync. No separate authentication is required: the token in the URL acts as the credential, so treat the URL as a secret.
POST /api/pipelines/webhook/:token
{ "success": true, "message": "Sync triggered" }
Returns 202 Accepted.
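An external service might call the webhook along these lines. The sketch uses Python's standard `urllib.request`; the `opener` parameter is an addition of this example so the call can be exercised without a network, and the function name is hypothetical.

```python
import urllib.request


def trigger_webhook(webhook_url, opener=urllib.request.urlopen, timeout=10.0):
    """POST to the webhook URL and report whether the sync was accepted."""
    # An empty body is sent; the token in the URL is the only credential.
    request = urllib.request.Request(webhook_url, data=b"", method="POST")
    with opener(request, timeout=timeout) as response:
        return response.status == 202
```

With the default opener, HTTP error statuses raise `urllib.error.HTTPError` rather than returning False, so callers may want to catch that exception.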
Reveal Webhook URL
Retrieve the full webhook URL for a connection. This is an audit-logged operation (VIEW_SECRET) that requires ADMIN or OWNER role. The response includes a Cache-Control: no-store header to prevent caching.
POST /api/pipelines/connections/:id/webhook-url
{
"webhookUrl": "https://app.example.com/api/pipelines/webhook/abc123...",
"tokenPrefix": "abc1...",
"curlExample": "curl -X POST https://app.example.com/api/pipelines/webhook/abc123..."
}
Legacy webhook tokens are automatically upgraded to the current encoding format when this endpoint is called. No action is required from the caller.
Notification Targeting
Control who receives sync completion and failure notifications per connection. Set notifyUserIds to an array of user IDs when creating or updating a connection.
PUT /api/pipelines/connections/:id
Content-Type: application/json
{
"notifyUserIds": ["cm...user1", "cm...user2"]
}
To notify all org members (default behavior), pass an empty array:
{ "notifyUserIds": [] }
When notifyUserIds is empty or not set, all org members receive pipeline notifications (filtered by their notification preferences). When set, only the specified users receive notifications. The user who triggered the sync is excluded from notifications.
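The targeting rule above can be sketched as a small function. This is an illustration of the described behavior, not the service's implementation; the function and parameter names are hypothetical, and per-user notification preferences are left out.

```python
def resolve_recipients(notify_user_ids, org_member_ids, triggered_by=None):
    """Pick notification recipients for a sync run.

    An empty or missing notify_user_ids falls back to all org members.
    The user who triggered the sync is always excluded.
    """
    targets = list(notify_user_ids) if notify_user_ids else list(org_member_ids)
    return [user_id for user_id in targets if user_id != triggered_by]
```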
Connection Variants
Create variant connections from an existing connection for routing to different endpoints or configurations. Variants share the same credentials and are linked via a variantGroupId. Requires ADMIN or OWNER role.
POST /api/pipelines/connections/:id/variants
Content-Type: application/json
{
"name": "CRM Platform - Europe Region",
"variantLabel": "EU"
}
{
"id": "cm...",
"name": "CRM Platform - Europe Region",
"type": "CRM_PLATFORM",
"variantGroupId": "cm...",
"variantLabel": "EU",
"status": "DISCONNECTED",
"webhookToken": "new-unique-token"
}
Returns 201 Created. If the source connection does not already have a group ID, it is automatically backfilled with variant metadata. If variantLabel is omitted, a sequential label is assigned automatically.
Shared Table Storage
By default, all variants in a group share the same destination tables. Synced data from every variant is merged into one table per stream, with table names derived from the base connection name (variant labels like "(2024)" or "(EU)" are stripped). To give a variant its own separate tables, set createOwnTables: true in the variant's config.
PUT /api/pipelines/connections/:id
Content-Type: application/json
// Use shared tables (default - merges data with other variants)
{
"config": { "createOwnTables": false }
}
// Use separate tables for this variant only
{
"config": { "createOwnTables": true }
}
| Config Field | Type | Default | Description |
|---|---|---|---|
| variantGroupId | string | auto | Links variants to a group (set automatically when creating a variant) |
| createOwnTables | boolean | false | When true, this variant creates its own tables instead of sharing with the group |
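To make the shared-table naming concrete, the sketch below strips a trailing parenthesized label such as "(EU)" or "(2024)" from a connection name. The exact stripping rule and the final table-name format are not documented here, so the regular expression and the `(base name, stream)` pair it returns are assumptions for illustration only.

```python
import re

# Assumption: a variant label is a single trailing "(...)" group.
_VARIANT_LABEL = re.compile(r"\s*\([^()]*\)\s*$")


def base_connection_name(name):
    """Remove a trailing variant label like "(EU)" from a connection name."""
    return _VARIANT_LABEL.sub("", name).strip()


def destination_key(connection_name, stream, create_own_tables=False):
    """Return the (name, stream) pair that identifies a destination table."""
    if create_own_tables:
        # Separate tables: keep the full variant name.
        return (connection_name, stream)
    # Shared tables: every variant in the group maps to the base name.
    return (base_connection_name(connection_name), stream)
```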
Scheduled Sync
System endpoint for triggering scheduled pipeline syncs (internal use only).
GET /api/pipelines/cron
{
"triggered": 3,
"skipped": 7,
"connections": ["cm...conn1", "cm...conn2", "cm...conn3"]
}
Checks all enabled connections against their schedule frequency (hourly, daily, weekly, monthly). Already-running syncs are deduplicated.
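The schedule check can be pictured as comparing the time since the last run against the interval for the configured frequency. The sketch below is an assumption about how such a check might work, not the service's logic; in particular, treating monthly as 30 days is a simplification.

```python
from datetime import datetime, timedelta, timezone

# Assumption: fixed intervals; "monthly" is approximated as 30 days.
INTERVALS = {
    "hourly": timedelta(hours=1),
    "daily": timedelta(days=1),
    "weekly": timedelta(weeks=1),
    "monthly": timedelta(days=30),
}


def is_due(frequency, last_run_at, now):
    """Return True when a connection's next scheduled sync should start."""
    if last_run_at is None:
        return True  # never synced: run on the first check
    return now - last_run_at >= INTERVALS[frequency]
```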
Run All Connections
Manually trigger a sync for all enabled pipeline connections in your workspace. Requires ADMIN or OWNER role.
POST /api/pipelines/run-all
{
"runIds": ["cm...run1", "cm...run2"],
"count": 2,
"message": "Enqueued 2 sync(s), 1 already in progress"
}
Returns 201 Created. Connections with an existing RUNNING or PENDING run are skipped to prevent duplicate syncs.
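The skip rule can be sketched as a planning step that separates connections to enqueue from those already in progress. The field names `enabled` and `latestRunStatus` are hypothetical; only the RUNNING and PENDING statuses and the message format come from this section.

```python
ACTIVE_STATUSES = {"RUNNING", "PENDING"}


def plan_run_all(connections):
    """Split enabled connections into those to enqueue and those to skip."""
    to_enqueue, in_progress = [], []
    for conn in connections:
        if not conn.get("enabled", False):
            continue  # disabled connections are ignored entirely
        if conn.get("latestRunStatus") in ACTIVE_STATUSES:
            in_progress.append(conn["id"])
        else:
            to_enqueue.append(conn["id"])
    message = (
        f"Enqueued {len(to_enqueue)} sync(s), "
        f"{len(in_progress)} already in progress"
    )
    return {"enqueue": to_enqueue, "skipped": in_progress, "message": message}
```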
OAuth Authentication Flow
Connectors that require OAuth use a 3-step flow: authorize, redirect, exchange. The authorize endpoint creates a DISCONNECTED connection, stores encrypted credentials, and returns a signed authorization URL. The callback/exchange endpoints validate the state parameter and exchange the authorization code for access/refresh tokens.
1. Initiate OAuth
POST /api/pipelines/oauth/:connector/authorize
Content-Type: application/json
{
"client_id": "your-client-id",
"client_secret": "your-client-secret",
"connectionName": "Accounting Connection"
}
Returns { authorizeUrl, connectionId }. Redirect the user to authorizeUrl.
2. Exchange Code
POST /api/pipelines/oauth/:connector/exchange
Content-Type: application/json
{
"connectionId": "clxyz...",
"code": "authorization-code-from-redirect"
}
Exchanges the authorization code for access + refresh tokens, updates the connection to CONNECTED status, and triggers initial sync discovery.
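The two client-side calls in this flow can be prepared as follows. This is a sketch: the helper names are hypothetical, and the connector path segment (for example "accounting") is an assumed value, since the accepted connector identifiers are not listed here.

```python
import json
from urllib.parse import quote

OAUTH_STEPS = {"authorize", "callback", "exchange"}


def oauth_path(connector, step):
    """Build /api/pipelines/oauth/:connector/<step> for a given connector."""
    if step not in OAUTH_STEPS:
        raise ValueError(f"unknown OAuth step: {step}")
    return f"/api/pipelines/oauth/{quote(connector, safe='')}/{step}"


def authorize_body(client_id, client_secret, connection_name):
    """JSON body for step 1; the response carries authorizeUrl and connectionId."""
    return json.dumps({
        "client_id": client_id,
        "client_secret": client_secret,
        "connectionName": connection_name,
    })


def exchange_body(connection_id, code):
    """JSON body for step 2, sent after the user returns with a code."""
    return json.dumps({"connectionId": connection_id, "code": code})
```

In the manual code paste flow, the connectionId returned by the authorize call is the value to pass to `exchange_body` together with the pasted code.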
Security
Pipeline credentials are encrypted at rest. Credentials are never returned in API responses.