Opbox

Connect to external services and sync data into Opbox tables. Multiple connectors for CRM, accounting, project management, scheduling, document signing, meetings, support, messaging, registry, and development services with scheduled syncs, webhook triggers, concurrent execution, variant routing, and dynamic progress notifications.

Endpoints

MethodEndpointDescription
GET/api/pipelines/connector-typesList available connector types
GET/api/pipelines/connectionsList connections in the active workspace
POST/api/pipelines/connectionsCreate a connection
GET/api/pipelines/connections/:idGet connection details
PUT/api/pipelines/connections/:idUpdate a connection
PATCH/api/pipelines/connections/:idReset sync cursor
DELETE/api/pipelines/connections/:idDelete a connection
POST/api/pipelines/connections/testTest connection credentials
POST/api/pipelines/connections/:id/checkCheck connection health
POST/api/pipelines/connections/:id/discoverDiscover available data streams
GET/api/pipelines/connections/:id/runsList sync runs
POST/api/pipelines/connections/:id/runsTrigger a sync run
POST/api/pipelines/connections/:id/runs/:runIdStop a running sync
POST/api/pipelines/connections/:id/variantsCreate a variant connection
POST/api/pipelines/connections/:id/webhook-urlReveal webhook URL (audit-logged)
GET/api/pipelines/cronTrigger scheduled syncs (cron endpoint)
POST/api/pipelines/run-allTrigger sync for all enabled connections
POST/api/pipelines/oauth/:connector/authorizeInitiate OAuth flow (creates connection, returns authorize URL)
GET/api/pipelines/oauth/:connector/callbackOAuth callback (validates state, exchanges code for tokens)
POST/api/pipelines/oauth/:connector/exchangeExchange authorization code for tokens (manual code paste flow)

Connector Types

List all available connector types and their required credentials.

GET /api/pipelines/connector-types

Available Connectors

TypeNameCategory
CRM_PLATFORMCRM PlatformCRM
ACCOUNTINGAccounting SoftwareAccounting
PROJECT_MGMTProject ManagementProject Management
SCHEDULINGScheduling PlatformScheduling
DOC_SIGNINGDocument SigningDocuments
MEETINGSMeeting TranscriptionMeetings
SUPPORTCustomer SupportSupport
MESSAGINGWhatsApp MessagingWhatsApp
REGISTRYRegistry ServiceRegistry

Create a Connection

Creates a new pipeline connection with encrypted credentials. Requires ADMIN or OWNER role.

POST /api/pipelines/connections
Content-Type: application/json

{
  "name": "Production CRM",
  "type": "CRM_PLATFORM",
  "credentials": {
    "apiKey": "your-api-key-here"
  },
  "schedule": {
    "enabled": true,
    "frequency": "daily"
  },
  "notifyUserIds": ["cm...user1", "cm...user2"]
}

Body Parameters

FieldTypeRequiredDescription
namestringYesConnection display name
typestringYesConnector type (see table above)
credentialsobjectYesService-specific credentials (encrypted at rest)
configobjectNoCustom configuration
scheduleobjectNoAuto-sync schedule: hourly, daily, weekly, or monthly
notifyUserIdsstring[]NoUser IDs to notify on sync completion or failure. If empty, all org members are notified.

Test Connection

Validates credentials without saving the connection.

POST /api/pipelines/connections/test
Content-Type: application/json

{
  "type": "CRM_PLATFORM",
  "credentials": { "apiKey": "your-api-key-here" }
}
{ "success": true, "message": "Connection successful" }

Sync Operations

Trigger data syncs, discover available data streams, and monitor run history.

Discover Streams

POST /api/pipelines/connections/:id/discover
{
  "streams": [
    { "name": "contacts", "fields": [], "supportedSyncModes": ["full_refresh"] },
    { "name": "deals", "fields": [], "supportedSyncModes": ["full_refresh"] }
  ]
}

Trigger Sync

POST /api/pipelines/connections/:id/runs
Content-Type: application/json

{
  "streams": ["contacts", "deals"]
}

The sync runs asynchronously. Multiple connections can sync concurrently. Synced data is written to Opbox tables with automatic column type inference and schema evolution.

Stop a Sync

POST /api/pipelines/connections/:id/runs/:runId

List Runs

GET /api/pipelines/connections/:id/runs?limit=50&offset=0
{
  "runs": [
    {
      "id": "cm...",
      "status": "COMPLETED",
      "trigger": "MANUAL",
      "startedAt": "2026-02-11T10:00:00.000Z",
      "completedAt": "2026-02-11T10:02:30.000Z",
      "rowsSynced": 150,
      "rowsCreated": 30,
      "rowsUpdated": 120
    }
  ],
  "pagination": { "limit": 50, "offset": 0, "total": 12, "hasMore": false }
}

Reset Sync Cursor

Reset or adjust the sync cursor for a connection. Incremental syncs use the cursor to determine the starting point. Resetting the cursor allows you to re-sync historical data or force a full resync. Requires ADMIN or OWNER role.

Set cursor to a specific date

PATCH /api/pipelines/connections/:id
Content-Type: application/json

{
  "stream": "transcripts",
  "cursor": "2026-01-01T00:00:00.000Z"
}
{
  "success": true,
  "syncState": { "version": 1, "streams": { "transcripts": { "cursor": "2026-01-01T00:00:00.000Z" } } },
  "message": "Sync cursor set to 2026-01-01T00:00:00.000Z for stream \"transcripts\""
}

Full resync (clear all cursors)

PATCH /api/pipelines/connections/:id
Content-Type: application/json

{
  "stream": "transcripts",
  "cursor": null
}
{
  "success": true,
  "syncState": { "version": 1, "streams": {} },
  "message": "Sync cursor cleared - next sync will be a full resync"
}

Body Parameters

FieldTypeRequiredDescription
streamstringNoStream name to reset (defaults to "transcripts")
cursorstring | nullYesISO 8601 date string to set the cursor to, or null to clear all cursors for a full resync

Webhook Trigger

Each connection has a unique webhook URL that can trigger syncs from external services. No authentication required - the webhook token acts as the credential.

POST /api/pipelines/webhook/:token
{ "success": true, "message": "Sync triggered" }

Returns 202 Accepted.

Reveal Webhook URL

Retrieve the full webhook URL for a connection. This is an audit-logged operation (VIEW_SECRET) that requires ADMIN or OWNER role. The response includes a Cache-Control: no-store header to prevent caching.

POST /api/pipelines/connections/:id/webhook-url
{
  "webhookUrl": "https://app.example.com/api/pipelines/webhook/abc123...",
  "tokenPrefix": "abc1...",
  "curlExample": "curl -X POST https://app.example.com/api/pipelines/webhook/abc123..."
}

Legacy webhook tokens are automatically upgraded to the current encoding format when this endpoint is called. No action is required from the caller.

Notification Targeting

Control who receives sync completion and failure notifications per connection. Set notifyUserIds to an array of user IDs when creating or updating a connection.

PUT /api/pipelines/connections/:id
Content-Type: application/json

{
  "notifyUserIds": ["cm...user1", "cm...user2"]
}

To notify all org members (default behavior), pass an empty array:

{ "notifyUserIds": [] }

When notifyUserIds is empty or not set, all org members receive pipeline notifications (filtered by their notification preferences). When set, only the specified users receive notifications. The user who triggered the sync is excluded from notifications.

Connection Variants

Create variant connections from an existing connection for routing to different endpoints or configurations. Variants share the same credentials and are linked via a variantGroupId. Requires ADMIN or OWNER role.

POST /api/pipelines/connections/:id/variants
Content-Type: application/json

{
  "name": "CRM Platform - Europe Region",
  "variantLabel": "EU"
}
{
  "id": "cm...",
  "name": "CRM Platform - Europe Region",
  "type": "CRM_PLATFORM",
  "variantGroupId": "cm...",
  "variantLabel": "EU",
  "status": "DISCONNECTED",
  "webhookToken": "new-unique-token"
}

Returns 201 Created. The source connection is automatically backfilled with variant metadata if it doesn't already have a group ID. Sequential labels are auto-assigned if not provided.

Shared Table Storage

By default, all variants in a group share the same destination tables. Synced data from every variant is merged into one table per stream, with table names derived from the base connection name (variant labels like "(2024)" or "(EU)" are stripped). To give a variant its own separate tables, set createOwnTables: true in the variant's config.

PUT /api/pipelines/connections/:id
Content-Type: application/json

// Use shared tables (default - merges data with other variants)
{
  "config": { "createOwnTables": false }
}

// Use separate tables for this variant only
{
  "config": { "createOwnTables": true }
}
Config FieldTypeDefaultDescription
variantGroupIdstringautoLinks variants to a group (set automatically when creating a variant)
createOwnTablesbooleanfalseWhen true, this variant creates its own tables instead of sharing with the group

Scheduled Sync

System endpoint for triggering scheduled pipeline syncs (internal use only).

GET /api/pipelines/cron
{
  "triggered": 3,
  "skipped": 7,
  "connections": ["cm...conn1", "cm...conn2", "cm...conn3"]
}

Checks all enabled connections against their schedule frequency (hourly, daily, weekly, monthly). Already-running syncs are deduplicated.

Run All Connections

Manually trigger a sync for all enabled pipeline connections in your workspace. Requires ADMIN or OWNER role.

POST /api/pipelines/run-all
{
  "runIds": ["cm...run1", "cm...run2"],
  "count": 2,
  "message": "Enqueued 2 sync(s), 1 already in progress"
}

Returns 201 Created. Connections with an existing RUNNING or PENDING run are skipped to prevent duplicate syncs.

OAuth Authentication Flow

Connectors that require OAuth use a 3-step flow: authorize, redirect, exchange. The authorize endpoint creates a DISCONNECTED connection, stores encrypted credentials, and returns a signed authorization URL. The callback/exchange endpoints validate the state parameter and exchange the authorization code for access/refresh tokens.

1. Initiate OAuth

POST /api/pipelines/oauth/connector/authorize
Content-Type: application/json

{
  "client_id": "your-client-id",
  "client_secret": "your-client-secret",
  "connectionName": "Accounting Connection"
}

Returns { authorizeUrl, connectionId }. Redirect the user to authorizeUrl.

2. Exchange Code

POST /api/pipelines/oauth/connector/exchange
Content-Type: application/json

{
  "connectionId": "clxyz...",
  "code": "authorization-code-from-redirect"
}

Exchanges the authorization code for access + refresh tokens, updates the connection to CONNECTED status, and triggers initial sync discovery.

Security

Pipeline credentials are encrypted at rest. Credentials are never returned in API responses.