
Ingestors

Ingestors are services that pull data from external sources and submit it to the RAG server for indexing. Each ingestor connects to a specific data source, transforms the data into documents or graph entities, and manages its own sync schedule.

For implementation details and creating custom ingestors, see the Ingestors README.

How Ingestors Work

All ingestors follow a common pattern:

  1. Connect to an external data source (API, database, file system)
  2. Fetch data with pagination and incremental sync where supported
  3. Transform data into documents or graph entities with metadata
  4. Submit to the RAG server via REST API (POST /v1/ingest)
  5. Track job progress and handle errors
  6. Schedule periodic syncs to keep data fresh
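The transform and submit steps above can be sketched roughly as follows. This is an illustration only, not the project's client: the POST /v1/ingest path and the default server URL come from this page, while the payload shape (a JSON object with a "documents" list), the record fields, and the helper names are assumptions.

```python
import json
import urllib.request
from typing import Iterator


def batched(items: list[dict], size: int) -> Iterator[list[dict]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def transform(record: dict) -> dict:
    """Map a raw source record to a document with metadata (hypothetical fields)."""
    return {"text": record["body"], "metadata": {"source_id": record["id"]}}


def build_ingest_request(base_url: str, documents: list[dict]) -> urllib.request.Request:
    """Build (but do not send) a POST /v1/ingest request.

    The {"documents": [...]} body is an assumed payload shape.
    """
    body = json.dumps({"documents": documents}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/v1/ingest",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Five fake source records, submitted in batches of two.
raw_records = [{"id": i, "body": f"page {i}"} for i in range(5)]
documents = [transform(record) for record in raw_records]
requests_to_send = [
    build_ingest_request("http://localhost:9446", batch)
    for batch in batched(documents, size=2)
]
```

Each request would then be sent with urllib.request.urlopen against a running RAG server. Building the requests separately from sending them keeps the batching logic easy to check without network access.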

Data Types

Ingestors can produce two types of data:

| Type | Storage | Use Case |
| --- | --- | --- |
| Documents | Milvus (vectors) | Unstructured text like web pages, chat messages, wiki pages |
| Graph Entities | Milvus + Neo4j | Structured data with relationships like infrastructure resources |

Available Ingestors

Web Loader

Crawls sitemaps and web pages. Built into the RAG system and triggered via the Web UI.

| Feature | Description |
| --- | --- |
| Input | Sitemap URLs or individual page URLs |
| Output | Documents |
| Trigger | On-demand via Web UI |

AWS

Discovers and ingests AWS resources across all regions.

| Feature | Description |
| --- | --- |
| Input | AWS API (via boto3) |
| Output | Graph Entities |
| Entity Types | EC2, S3, RDS, Lambda, EKS, DynamoDB, VPC, IAM, and more |
| Documentation | AWS README |

Kubernetes

Ingests Kubernetes resources including custom resources.

| Feature | Description |
| --- | --- |
| Input | Kubernetes API (via kubeconfig) |
| Output | Graph Entities |
| Entity Types | Pods, Deployments, Services, ConfigMaps, Secrets, CRDs |
| Documentation | Kubernetes README |

Backstage

Ingests entities from a Backstage service catalog.

| Feature | Description |
| --- | --- |
| Input | Backstage Catalog API |
| Output | Graph Entities |
| Entity Types | Components, APIs, Systems, Domains, Groups, Users |
| Documentation | Backstage README |

ArgoCD

Ingests GitOps resources from ArgoCD.

| Feature | Description |
| --- | --- |
| Input | ArgoCD API |
| Output | Graph Entities |
| Entity Types | Applications, Projects, Clusters, Repositories, ApplicationSets |
| Documentation | ArgoCD README |

GitHub

Ingests organizational data from GitHub.

| Feature | Description |
| --- | --- |
| Input | GitHub API |
| Output | Graph Entities |
| Entity Types | Organizations, Repositories, Teams, Users |
| Documentation | GitHub README |

Confluence

Ingests pages from Confluence spaces with incremental sync support.

| Feature | Description |
| --- | --- |
| Input | Confluence REST API |
| Output | Documents |
| Features | Incremental sync, space filtering |
| Documentation | Confluence README |
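Incremental sync generally means fetching only what changed since the previous run. The sketch below shows one common way to do that with a timestamp watermark. It is not taken from the Confluence ingestor: the last_modified field and the select_changed helper are hypothetical, and a real ingestor would more likely ask the API to filter server-side.

```python
from datetime import datetime, timezone
from typing import Optional


def select_changed(
    pages: list[dict], last_sync: Optional[datetime]
) -> tuple[list[dict], Optional[datetime]]:
    """Return the pages modified after `last_sync` and the new watermark."""
    if last_sync is None:
        changed = list(pages)  # First run: nothing synced yet, take everything.
    else:
        changed = [p for p in pages if p["last_modified"] > last_sync]
    # Advance the watermark only if something changed; otherwise keep the old one.
    watermark = max((p["last_modified"] for p in changed), default=last_sync)
    return changed, watermark


pages = [
    {"id": "1", "last_modified": datetime(2024, 1, 1, tzinfo=timezone.utc)},
    {"id": "2", "last_modified": datetime(2024, 3, 1, tzinfo=timezone.utc)},
]
changed, watermark = select_changed(pages, datetime(2024, 2, 1, tzinfo=timezone.utc))
```

Persisting the returned watermark between runs lets the next sync skip pages that have not changed.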

Slack

Ingests conversations from Slack channels.

| Feature | Description |
| --- | --- |
| Input | Slack API |
| Output | Documents |
| Features | Threads grouped as single documents, channel filtering |
| Documentation | Slack README |
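To illustrate what grouping threads into single documents can look like, here is a minimal sketch. It relies on the ts and thread_ts fields that Slack attaches to messages (replies carry the parent's ts as thread_ts); the group_threads helper and the output document shape are assumptions, not the Slack ingestor's actual code.

```python
from collections import defaultdict


def group_threads(messages: list[dict]) -> list[dict]:
    """Combine each thread (or standalone message) into one document."""
    threads: dict[str, list[dict]] = defaultdict(list)
    for message in messages:
        # Messages outside a thread have no thread_ts, so they form their own group.
        root_ts = message.get("thread_ts", message["ts"])
        threads[root_ts].append(message)

    documents = []
    for root_ts in sorted(threads, key=float):
        ordered = sorted(threads[root_ts], key=lambda m: float(m["ts"]))
        documents.append({
            "text": "\n".join(f'{m["user"]}: {m["text"]}' for m in ordered),
            "metadata": {"thread_ts": root_ts, "message_count": len(ordered)},
        })
    return documents


messages = [
    {"ts": "1700000000.000100", "thread_ts": "1700000000.000100", "user": "U1", "text": "deploy failed"},
    {"ts": "1700000050.000300", "user": "U2", "text": "standalone note"},
    {"ts": "1700000010.000200", "thread_ts": "1700000000.000100", "user": "U3", "text": "looking into it"},
]
thread_documents = group_threads(messages)
```

Keeping a thread together as one document preserves the question-and-answer context that would be lost if each message were indexed separately.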

Webex

Ingests messages from Webex spaces.

| Feature | Description |
| --- | --- |
| Input | Webex API |
| Output | Documents |
| Features | Space filtering, message threading |
| Documentation | Webex README |

Common Configuration

All ingestors share common configuration options:

| Variable | Default | Description |
| --- | --- | --- |
| RAG_SERVER_URL | http://localhost:9446 | RAG server endpoint |
| SYNC_INTERVAL | 86400 (24h) | Seconds between syncs |
| INIT_DELAY_SECONDS | 0 | Delay before first sync |
| MAX_DOCUMENTS_PER_INGEST | 1000 | Batch size for ingestion |
| EXIT_AFTER_FIRST_SYNC | false | Exit after one sync (for batch jobs) |
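As a rough sketch of how these settings might fit together, the example below reads them from the environment with the defaults listed above and drives a simple sync loop. The variable names and defaults are from this page; the IngestorConfig class, load_config, run_schedule, the accepted boolean spellings, and the choice to measure the interval from the end of each sync are all assumptions.

```python
import asyncio
import os
from dataclasses import dataclass
from typing import Awaitable, Callable, Mapping


@dataclass(frozen=True)
class IngestorConfig:
    rag_server_url: str
    sync_interval: int
    init_delay_seconds: int
    max_documents_per_ingest: int
    exit_after_first_sync: bool


def load_config(env: Mapping[str, str] = os.environ) -> IngestorConfig:
    """Read the common settings, falling back to the documented defaults."""
    return IngestorConfig(
        rag_server_url=env.get("RAG_SERVER_URL", "http://localhost:9446"),
        sync_interval=int(env.get("SYNC_INTERVAL", "86400")),
        init_delay_seconds=int(env.get("INIT_DELAY_SECONDS", "0")),
        max_documents_per_ingest=int(env.get("MAX_DOCUMENTS_PER_INGEST", "1000")),
        # Accepted truthy spellings are an assumption.
        exit_after_first_sync=env.get("EXIT_AFTER_FIRST_SYNC", "false").strip().lower()
        in {"1", "true", "yes"},
    )


async def run_schedule(
    sync: Callable[[], Awaitable[None]],
    config: IngestorConfig,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> None:
    """Wait the initial delay, then sync repeatedly (or once for batch jobs)."""
    await sleep(config.init_delay_seconds)
    while True:
        await sync()
        if config.exit_after_first_sync:
            return
        await sleep(config.sync_interval)
```

With EXIT_AFTER_FIRST_SYNC=true, the loop returns after a single sync, which suits one-shot batch jobs such as a Kubernetes CronJob.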

Authentication

Ingestors authenticate with the RAG server using one of two methods:

Development: Trusted Network

When ALLOW_TRUSTED_NETWORK=true is set on the server, ingestors connecting from localhost or from a configured CIDR range are accepted without authentication.
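A trusted-network check of this kind can be expressed with Python's standard ipaddress module. The sketch below is only an illustration of the idea: the is_trusted helper and the way the CIDR list is passed in are hypothetical, and the server's real implementation may differ.

```python
import ipaddress


def is_trusted(client_ip: str, trusted_cidrs: list[str]) -> bool:
    """Return True for loopback addresses or addresses inside a trusted CIDR."""
    address = ipaddress.ip_address(client_ip)
    if address.is_loopback:
        return True
    return any(
        address in ipaddress.ip_network(cidr, strict=False)
        for cidr in trusted_cidrs
    )


trusted = ["10.0.0.0/8", "fd00::/8"]
internal_ok = is_trusted("10.1.2.3", trusted)
external_ok = is_trusted("203.0.113.7", trusted)
```

Because this mode skips authentication entirely, it fits development setups; the CIDR list should stay as narrow as possible.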

Production: OAuth2 Client Credentials

Ingestors obtain access tokens via the OAuth2 client credentials flow:

INGESTOR_OIDC_ISSUER=https://your-keycloak.com/realms/production
INGESTOR_OIDC_CLIENT_ID=rag-ingestor
INGESTOR_OIDC_CLIENT_SECRET=xxx

The ingestor client automatically:

  • Fetches tokens via client credentials grant
  • Caches tokens and refreshes before expiry
  • Includes Bearer token in all API calls
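The caching behavior described above can be sketched as a small class that refetches a token shortly before it expires. This is not the ingestor client's actual code: TokenCache, the refresh margin, and the injected fetch callable are assumptions. The access_token and expires_in fields are the standard OAuth2 token response fields; in practice the fetch callable would POST grant_type=client_credentials to the issuer's token endpoint.

```python
import time
from typing import Callable, Optional


class TokenCache:
    """Cache an access token and refresh it shortly before expiry."""

    def __init__(
        self,
        fetch: Callable[[], dict],
        refresh_margin: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch            # Performs the client credentials grant.
        self._margin = refresh_margin  # Seconds before expiry at which to refresh.
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return a valid token, fetching a new one only when needed."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            response = self._fetch()
            self._token = response["access_token"]
            self._expires_at = now + float(response["expires_in"])
        return self._token

    def auth_header(self) -> dict:
        """Build the Authorization header to attach to each API call."""
        return {"Authorization": f"Bearer {self.get()}"}
```

Injecting the fetch function and the clock keeps the cache independent of any particular HTTP library and makes the expiry logic easy to test.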

Creating Custom Ingestors

The IngestorBuilder class provides a simple framework for creating ingestors:

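from common.ingestor import IngestorBuilder


async def sync_data(client):
    # Your sync logic here
    pass


# Parentheses allow the chained calls to span lines and carry inline comments;
# a backslash continuation followed by a comment is a syntax error.
(
    IngestorBuilder()
    .name("my-ingestor")
    .type("custom")
    .sync_with_fn(sync_data)
    .every(3600)  # Sync every hour
    .run()
)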

See the Ingestors README for a complete example with job management and error handling.

Further Reading