Document Processing Integration Guide

This guide outlines the changes needed across the MicroDC platform to support document processing using Docling.

Overview

Docling enables processing of various document formats (PDF, DOCX, PPTX, XLSX, HTML, images, audio, etc.) to extract meaningful text and markdown. This integration uses model-based routing to direct jobs to the appropriate processing engine.

Architecture: Model-Based Routing

Document processing has been implemented using a scalable engine architecture that routes jobs based on model_id rather than job_type. This enables multiple engines of the same type (e.g., multiple OCR engines) to coexist.

Engine Hierarchy

Engine (base class)
├── InferenceEngine (LLM inference)
│   └── OllamaEngine, vLLMEngine, etc.
└── ProcessingEngine (non-inference tasks)
    ├── DoclingEngine (provides model_id="docling")
    ├── TesseractEngine (provides model_id="tesseract")
    └── EasyOCREngine (provides model_id="easyocr")

Key Benefits

  • Model-Based Routing: Jobs specify model_id to select specific processor (e.g., model_id="docling")
  • Multiple Engines Per Type: Support multiple OCR engines, document engines, etc.
  • Extensible: Easy to add new engine types (OCR, image, audio, video)
  • Clean Separation: Different engine types have appropriate interfaces
  • Type Safety: Engine capabilities are declared via enums
  • Lazy Registration: Engine models registered on-demand for better performance
  • Capability Registration: Workers report supported_job_types to server
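
The hierarchy and enum-declared capabilities described above could look roughly like the following. This is a minimal sketch, not the code in src/engines/engine_base.py: the JobType enum, the provided_model_ids attribute, and the provides helper are assumptions made for illustration. Only the class names and get_supported_job_types appear in this guide.

```python
from enum import Enum
from typing import List


class JobType(str, Enum):
    """Job types an engine can declare (hypothetical enum for this sketch)."""
    LLM = "llm"
    EMBED = "embed"
    DOCUMENT = "document"
    OCR = "ocr"


class Engine:
    """Base class: every engine declares the job types it supports."""
    supported_job_types: List[JobType] = []

    def get_supported_job_types(self) -> List[str]:
        # Report plain strings, as the worker registration payload expects
        return [job_type.value for job_type in self.supported_job_types]


class ProcessingEngine(Engine):
    """Non-inference engine that serves one or more model_ids."""
    provided_model_ids: List[str] = []  # assumed attribute name

    def provides(self, model_id: str) -> bool:
        return model_id in self.provided_model_ids


class DoclingEngine(ProcessingEngine):
    supported_job_types = [JobType.DOCUMENT]
    provided_model_ids = ["docling"]


class TesseractEngine(ProcessingEngine):
    supported_job_types = [JobType.OCR]
    provided_model_ids = ["tesseract"]
```

Keeping the job type (what kind of work) separate from the model_id (which engine does it) is what lets two OCR engines share job_type="ocr" while still being individually addressable.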

Routing Example

Jobs are routed by model_id:

# Document processing job
job = Job(
    model_id="docling",  # Routes to DoclingEngine
    job_type="document",
    input_data={"source": "https://example.com/doc.pdf"}
)

# OCR job with Tesseract
job = Job(
    model_id="tesseract",  # Routes to TesseractEngine
    job_type="ocr",
    input_data={"source": "image.png"}
)

# OCR job with EasyOCR
job = Job(
    model_id="easyocr",  # Routes to EasyOCREngine
    job_type="ocr",
    input_data={"source": "image.png"}
)

Architecture

The integration uses the existing job system with minimal changes:

  • Worker: Processes documents using Docling (primary implementation)
  • Server: Routes document jobs to capable workers (minimal changes)
  • Python Client: Provides API for submitting document processing jobs (new methods)

1. Worker Changes (This Repository)

Status: ✅ Complete

The worker now supports document processing through the engine architecture.

Key Components:

  • Engine / ProcessingEngine base classes (src/engines/engine_base.py)
  • DoclingEngine implementing document processing (src/engines/docling_engine.py)
  • DocumentProcessor class wrapping Docling (src/processors/document_processor.py)
  • Extended JobExecutor to route to processing engines
  • Capability registration in WorkerRegistration and WorkerHeartbeat
  • Configuration under engine.processing.docling in config/default.yaml

Worker Capability Registration

Workers automatically register their supported job types during:

1. Initial Registration (WorkerRegistration):

# Worker collects job types from all engines
supported_job_types = []
supported_job_types.extend(inference_engine.get_supported_job_types())  # ["llm", "embed"]

# Iterate over processing engines (now a list)
for engine in processing_engines:
    supported_job_types.extend(engine.get_supported_job_types())  # ["document"]

# Deduplicate and sort, resulting in: ["document", "embed", "llm"]
registration = WorkerRegistration(
    ...,
    supported_job_types=sorted(set(supported_job_types))
)

2. Heartbeat Updates (WorkerHeartbeat):

# Capabilities reported on every heartbeat (can change dynamically)
heartbeat = WorkerHeartbeat(
    ...,
    supported_job_types=["document", "embed", "llm"]
)

3. Model-Based Routing (JobExecutor):

# JobExecutor builds model_id -> engine mapping on initialization
# and performs lazy lookup for processing engine models
processing_engine = await self._get_processing_engine_for_model(job.model_id)
if processing_engine:
    # Route to processing engine (docling, tesseract, etc.)
    result = await processing_engine.process(
        model_id=job.model_id,
        input_data=job.input_data,
        params=job.parameters
    )
else:
    # Route to inference engine
    result = await self.inference_engine.generate(...)

This allows the server to know which workers can handle document processing jobs and enables workers to route jobs to the correct engine based on model_id.
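
The lazy lookup behind _get_processing_engine_for_model is not shown in this guide. The sketch below illustrates one way such a lookup could work, under stated assumptions: StubProcessingEngine, its list_models method, and the ModelRouter class are hypothetical stand-ins, not the worker's actual classes.

```python
import asyncio
from typing import Dict, List, Optional, Tuple


class StubProcessingEngine:
    """Stand-in for a ProcessingEngine; the real interface may differ."""

    def __init__(self, model_ids: List[str]):
        self._model_ids = model_ids

    async def list_models(self) -> List[str]:  # hypothetical method
        return list(self._model_ids)


class ModelRouter:
    """Caches model_id -> engine and fills the cache on first lookup."""

    def __init__(self, engines: List[StubProcessingEngine]):
        self._engines = engines
        self._cache: Dict[str, StubProcessingEngine] = {}

    async def get_processing_engine_for_model(
        self, model_id: str
    ) -> Optional[StubProcessingEngine]:
        if model_id in self._cache:
            return self._cache[model_id]
        # Cache miss: ask each engine which models it serves (lazy registration)
        for engine in self._engines:
            if model_id in await engine.list_models():
                self._cache[model_id] = engine
                return engine
        return None  # Caller falls back to the inference engine


async def demo() -> Tuple[bool, bool]:
    docling = StubProcessingEngine(["docling"])
    router = ModelRouter([docling, StubProcessingEngine(["tesseract"])])
    hit = await router.get_processing_engine_for_model("docling")
    miss = await router.get_processing_engine_for_model("some-llm")
    return hit is docling, miss is None
```

Returning None for unknown model_ids keeps the fallback to the inference engine in one place, the caller, rather than spreading it across engines.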


2. Server Changes Required

The worker already reports supported_job_types in registration and heartbeat. The server should store and use this information for intelligent job routing.

2.1 Store Worker Capabilities

File: server/models/worker.py (or database schema)

Change: Add field to store supported job types

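from sqlalchemy.dialects.postgresql import JSONB

class Worker(Base):
    # ... existing fields ...
    # JSONB (not JSON) so the containment filter in 2.2 works on PostgreSQL;
    # a callable default avoids sharing one list object across rows
    supported_job_types = Column(JSONB, default=lambda: ["llm", "embed"])  # Array of strings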

Update on Registration:

worker.supported_job_types = registration.supported_job_types

Update on Heartbeat:

# Update if capabilities changed (e.g., new engine added)
if heartbeat.supported_job_types:
    worker.supported_job_types = heartbeat.supported_job_types

2.2 Job Routing by Capability

File: server/api/jobs.py (job assignment endpoint)

Change: Filter workers by supported job types

def get_available_workers_for_job(job: Job):
    """Get workers that can handle this job type."""
    return (
        db.query(Worker)
        .filter(Worker.status == "idle")
        .filter(Worker.supported_job_types.contains([job.job_type]))  # PostgreSQL JSONB containment (@>); requires a JSONB column
        .order_by(Worker.priority.desc())
        .all()
    )

Rationale: Only route document jobs to workers with Docling installed
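
For deployments where the database cannot express the containment filter, the same selection can be done in application code. The following is a minimal sketch under that assumption. WorkerRecord is a simplified stand-in for the Worker model, and its field names mirror the ones used above.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class WorkerRecord:
    """Simplified stand-in for the Worker model (fields assumed)."""
    name: str
    status: str
    priority: int
    supported_job_types: List[str] = field(
        default_factory=lambda: ["llm", "embed"]
    )


def workers_for_job_type(
    workers: List[WorkerRecord], job_type: str
) -> List[WorkerRecord]:
    """Return idle workers that declare job_type, highest priority first."""
    capable = [
        w for w in workers
        if w.status == "idle" and job_type in w.supported_job_types
    ]
    return sorted(capable, key=lambda w: w.priority, reverse=True)


workers = [
    WorkerRecord("a", "idle", 1),  # default capabilities, no "document"
    WorkerRecord("b", "idle", 2, ["llm", "embed", "document"]),
    WorkerRecord("c", "busy", 3, ["document"]),  # capable but not idle
    WorkerRecord("d", "idle", 5, ["document"]),
]
selected = [w.name for w in workers_for_job_type(workers, "document")]
```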

2.3 API Schema Documentation

File: server/api/schemas.py (or equivalent)

Change: Add documentation for document job type

from enum import Enum

class JobType(str, Enum):
    """Supported job types.

    - llm: Language model inference (chat/generation)
    - embed: Generate text embeddings
    - document: Document processing and extraction
    """
    LLM = "llm"
    EMBED = "embed"
    DOCUMENT = "document"  # Add this

Rationale: Makes the API self-documenting and appears in OpenAPI/Swagger docs

2.2 Job Validation (Optional)

File: server/api/validators.py (or job creation endpoint)

Current Behavior: Server accepts any string for job_type

Recommended Addition: Add validation with clear error messages

SUPPORTED_JOB_TYPES = {"llm", "embed", "document"}

def validate_job_type(job_type: str) -> str:
    """Validate job type is supported."""
    if job_type not in SUPPORTED_JOB_TYPES:
        raise ValueError(
            f"Unsupported job_type: {job_type}. "
            f"Supported types: {', '.join(sorted(SUPPORTED_JOB_TYPES))}"  # sorted for a stable message
        )
    return job_type

Rationale: Provides better error messages to users submitting invalid job types

File: server/api/jobs.py (job availability endpoint)

Current Behavior: Filters jobs by model compatibility

Recommended Addition: Add job type filtering

def get_available_jobs(worker_id: str, limit: int = 5):
    """Get jobs available for this worker."""
    worker = get_worker(worker_id)
    worker_capabilities = worker.supported_job_types  # New field

    jobs = (
        db.query(Job)
        .filter(Job.status == "pending")
        .filter(Job.job_type.in_(worker_capabilities))  # Filter by capability
        .order_by(Job.priority.desc(), Job.created_at.asc())
        .limit(limit)
        .all()
    )
    return jobs

Rationale: Prevents routing document jobs to workers without Docling support

2.4 Worker Registration Enhancement

File: server/api/workers.py (registration endpoint)

Add Field: Allow workers to declare supported job types

class WorkerRegistration(BaseModel):
    # ... existing fields ...
    supported_job_types: List[str] = ["llm", "embed"]  # New field

class WorkerHeartbeat(BaseModel):
    # ... existing fields ...
    supported_job_types: Optional[List[str]] = None  # Allow updates via heartbeat

Worker Side Change:

# In worker registration
registration = WorkerRegistration(
    # ... existing fields ...
    supported_job_types=["llm", "embed", "document"]  # Declare capability
)

Rationale: Enables intelligent job routing based on worker capabilities

2.5 Database Migration (If Adding Job Type Enum)

Only needed if you want database-level validation (not recommended due to reduced flexibility)

-- Option 1: Add CHECK constraint (PostgreSQL)
ALTER TABLE jobs
ADD CONSTRAINT valid_job_type
CHECK (job_type IN ('llm', 'embed', 'document'));

-- Option 2: Use ENUM type (PostgreSQL)
CREATE TYPE job_type_enum AS ENUM ('llm', 'embed', 'document');
ALTER TABLE jobs ALTER COLUMN job_type TYPE job_type_enum USING job_type::job_type_enum;

⚠️ Not Recommended: Reduces flexibility for adding new job types in the future

2.6 API Documentation

File: server/docs/API.md or OpenAPI schema

Add Section:

### Document Processing Jobs

Process documents and extract text/markdown using Docling.

**Job Type:** `document`

**Supported Formats:**
- PDF, DOCX, PPTX, XLSX
- HTML, Markdown
- Images (PNG, JPEG, TIFF)
- Audio (WAV, MP3)
- Video subtitles (VTT)

**Input Format:**
```json
{
  "job_type": "document",
  "model_id": "docling",
  "input_data": {
    "source": "https://example.com/document.pdf",
    "source_type": "url",
    "filename": "document.pdf"
  },
  "parameters": {
    "doc_format": "markdown",
    "enable_ocr": true,
    "extract_tables": true,
    "extract_images": false
  }
}
```

**Output Format:**

```json
{
  "content": "# Extracted markdown content...",
  "format": "markdown",
  "metadata": {
    "pages": 10,
    "tables_detected": 3,
    "figures_detected": 2,
    "processing_time_ms": 1234
  }
}
```

### Summary of Server Changes

| Change | Priority | Breaking Change? | Effort |
|--------|----------|------------------|--------|
| API Schema Documentation | High | No | Low |
| Job Type Validation | Medium | No | Low |
| Model Filtering | High | No | Medium |
| Worker Registration Enhancement | High | No | Medium |
| Database Migration | Low | Possibly | High |
| API Documentation | High | No | Low |

**Estimated Total Effort:** 2-4 hours

---

## 3. Python Client Changes Required

### Status: 🔧 Required for User-Facing API

The Python client needs new methods to make document processing easy for users.

### 3.1 Add Document Processing Method

**File:** `client/microdc/client.py` (or equivalent)

**Add Class:**

```python
from typing import Any, Dict, Optional


class DocumentProcessor:
    """Document processing API client."""

    def __init__(self, api_client):
        self.client = api_client

    def process_document(
        self,
        source: str,
        source_type: str = "url",
        output_format: str = "markdown",
        enable_ocr: bool = True,
        extract_tables: bool = True,
        extract_images: bool = False,
        model_id: str = "docling",  # Required for routing
        timeout: Optional[int] = 300,
        **kwargs
    ) -> Dict[str, Any]:
        """Process a document and extract text/markdown.

        Args:
            source: URL, file path, or base64-encoded document
            source_type: "url", "path", or "base64"
            output_format: "markdown", "html", "json", or "doctags"
            enable_ocr: Enable OCR for scanned documents
            extract_tables: Extract table structures
            extract_images: Include extracted images in output
            model_id: Processing engine to use ("docling", "tesseract", etc.)
            timeout: Job timeout in seconds
            **kwargs: Additional processing options

        Returns:
            Dictionary with 'content' and 'metadata' keys

        Example:
            >>> processor = client.documents
            >>> result = processor.process_document(
            ...     source="https://example.com/doc.pdf",
            ...     source_type="url",
            ...     output_format="markdown",
            ...     model_id="docling"  # Routes to DoclingEngine
            ... )
            >>> print(result['content'])
            # Document Title...
            >>> print(result['metadata']['pages'])
            10
        """
        job_data = {
            "job_type": "document",
            "model_id": model_id,  # Required for model-based routing
            "input_data": {
                "source": source,
                "source_type": source_type,
            },
            "parameters": {
                "doc_format": output_format,
                "enable_ocr": enable_ocr,
                "extract_tables": extract_tables,
                "extract_images": extract_images,
                **kwargs
            },
            "timeout": timeout
        }

        # Submit job and wait for result
        job = self.client.jobs.create(**job_data)
        result = self.client.jobs.wait_for_completion(job.id, timeout=timeout)

        if result.status == "completed":
            return result.output
        else:
            raise RuntimeError(f"Document processing failed: {result.error}")

    def process_document_async(
        self,
        source: str,
        source_type: str = "url",
        model_id: str = "docling",  # Required for routing
        **kwargs
    ) -> str:
        """Submit document processing job asynchronously.

        Returns job_id for later polling.

        Example:
            >>> job_id = processor.process_document_async(
            ...     source="https://example.com/doc.pdf"
            ... )
            >>> # ... do other work ...
            >>> result = client.jobs.get_result(job_id)
        """
        job_data = {
            "job_type": "document",
            "model_id": model_id,  # Required for model-based routing
            "input_data": {
                "source": source,
                "source_type": source_type,
            },
            "parameters": kwargs
        }

        job = self.client.jobs.create(**job_data)
        return job.id

    def process_file(
        self,
        file_path: str,
        **kwargs
    ) -> Dict[str, Any]:
        """Process a local file.

        Args:
            file_path: Path to local document file
            **kwargs: Additional options (see process_document)

        Example:
            >>> result = processor.process_file("./document.pdf")
        """
        # Read file and encode as base64
        import base64
        with open(file_path, "rb") as f:
            content = base64.b64encode(f.read()).decode("utf-8")

        return self.process_document(
            source=content,
            source_type="base64",
            **kwargs
        )
```

3.2 Integrate into Main Client

File: client/microdc/client.py

class MicroDCClient:
    def __init__(self, api_key: str, base_url: str = "https://api.microdc.ai"):
        self.api_key = api_key
        self.base_url = base_url
        # ... existing initialization ...

        # Add document processor
        self.documents = DocumentProcessor(self)  # New property

3.3 Add Convenience Methods

File: client/microdc/client.py

class MicroDCClient:
    # ... existing methods ...

    def process_document(self, source: str, **kwargs) -> Dict[str, Any]:
        """Shorthand for document processing.

        Example:
            >>> client = MicroDCClient(api_key="...")
            >>> result = client.process_document("https://example.com/doc.pdf")
        """
        return self.documents.process_document(source, **kwargs)
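
The synchronous methods rely on client.jobs.wait_for_completion, which this guide does not define. A polling helper along the following lines is one plausible shape for it. This is a sketch under assumptions: the fetch_job callable and the "running" and "failed" status strings are hypothetical, while "pending" and "completed" appear elsewhere in this guide.

```python
import time
from typing import Any, Callable, Dict, Optional


def wait_for_completion(
    fetch_job: Callable[[], Dict[str, Any]],
    timeout: Optional[float] = 300,
    poll_interval: float = 1.0,
) -> Dict[str, Any]:
    """Poll fetch_job() until the job reaches a terminal status.

    fetch_job is a hypothetical callable returning a dict with a "status" key.
    Raises TimeoutError if the job is still unfinished after `timeout` seconds.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        job = fetch_job()
        if job["status"] in ("completed", "failed"):
            return job
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"Job did not finish within {timeout} seconds")
        time.sleep(poll_interval)


# Usage with a fake job that completes on the third poll
_states = iter(["pending", "running", "completed"])
final = wait_for_completion(
    lambda: {"status": next(_states)}, timeout=5, poll_interval=0.01
)
```

Using time.monotonic rather than wall-clock time keeps the deadline correct if the system clock is adjusted while waiting.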

3.4 Update Client Documentation

File: client/README.md

Add section:

## Document Processing

Process documents and extract text using Docling:

### Basic Usage

```python
from microdc import MicroDCClient

client = MicroDCClient(api_key="your-api-key")

# Process a document from URL
result = client.process_document("https://example.com/document.pdf")
print(result['content'])  # Extracted markdown

# Process local file
result = client.documents.process_file("./presentation.pptx")

# Advanced options
result = client.documents.process_document(
    source="https://example.com/scan.pdf",
    output_format="markdown",
    enable_ocr=True,
    extract_tables=True,
    extract_images=False
)

# Async processing for large documents
job_id = client.documents.process_document_async("https://example.com/large.pdf")
# ... do other work ...
result = client.jobs.get_result(job_id)
```

Supported Formats

  • Documents: PDF, DOCX, PPTX, XLSX
  • Web: HTML, Markdown
  • Images: PNG, JPEG, TIFF
  • Audio: WAV, MP3
  • Video: VTT subtitles

Parameters

  • output_format: Output format ("markdown", "html", "json", "doctags")
  • enable_ocr: Enable OCR for scanned documents (default: True)
  • extract_tables: Extract table structures (default: True)
  • extract_images: Include extracted images (default: False)

### Summary of Client Changes

| Change | Priority | Breaking Change? | Effort |
|--------|----------|------------------|--------|
| DocumentProcessor class | High | No | Medium |
| Integration into main client | High | No | Low |
| Convenience methods | Medium | No | Low |
| Documentation | High | No | Low |
| Example scripts | Medium | No | Low |
| Unit tests | High | No | Medium |

**Estimated Total Effort:** 3-5 hours

---

## 4. Implementation Sequence

### Phase 1: Worker Implementation (Week 1)
1. ✅ Review and finalize plan
2. Add Docling integration to worker
3. Test with manual job submissions
4. Update worker documentation

### Phase 2: Server Updates (Week 1-2)
1. Add API schema documentation
2. Implement job type validation
3. Add worker capability filtering
4. Update API documentation
5. Deploy server changes

### Phase 3: Client SDK (Week 2)
1. Implement DocumentProcessor class
2. Add convenience methods
3. Write unit tests
4. Update client documentation
5. Publish new client version

### Phase 4: Testing & Rollout (Week 2-3)
1. End-to-end integration testing
2. Performance testing with large documents
3. Beta testing with select users
4. Full production rollout

---

## 5. Backward Compatibility

All changes are **backward compatible**:
- ✅ Existing LLM and embed jobs continue working unchanged
- ✅ Workers without Docling support can ignore document jobs
- ✅ Clients without document methods can still use raw API
- ✅ No breaking database schema changes: the new `supported_job_types` worker column (2.1) has a default, and job-type constraints (2.5) are optional

---

## 6. Testing Strategy

### Worker Testing
```python
# Test document job processing
job = {
    "job_type": "document",
    "model_id": "docling",  # Required for model-based routing
    "input_data": {
        "source": "https://arxiv.org/pdf/example.pdf",
        "source_type": "url"
    },
    "parameters": {
        "doc_format": "markdown"
    }
}
result = worker.process_job(job)
assert "content" in result
assert "metadata" in result
```

Server Testing

# Test job routing to capable workers
job = create_job(job_type="document")
workers = get_available_workers_for_job(job)
assert all("document" in w.supported_job_types for w in workers)

Client Testing

# Test client API
result = client.process_document("https://example.com/doc.pdf")
assert result["content"]
assert result["metadata"]["pages"] > 0

7. Monitoring & Metrics

Key Metrics to Track

  • Document processing success rate
  • Average processing time by format
  • OCR usage percentage
  • Error rates by document type
  • Worker utilization for document jobs
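
Several of these metrics can be derived from per-job records. The sketch below shows one way to aggregate them. The JobRecord fields are assumptions for illustration and do not reflect an existing MicroDC schema.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class JobRecord:
    """One finished document job (field names are assumptions)."""
    doc_format: str
    succeeded: bool
    processing_time_ms: int
    used_ocr: bool


def summarize(records: List[JobRecord]) -> Dict[str, Any]:
    """Compute success rate, OCR usage share, and mean time per format."""
    total = len(records)
    if total == 0:
        return {"success_rate": None, "ocr_usage": None, "avg_ms_by_format": {}}
    times: Dict[str, List[int]] = defaultdict(list)
    for record in records:
        times[record.doc_format].append(record.processing_time_ms)
    return {
        "success_rate": sum(r.succeeded for r in records) / total,
        "ocr_usage": sum(r.used_ocr for r in records) / total,
        "avg_ms_by_format": {
            fmt: sum(values) / len(values) for fmt, values in times.items()
        },
    }
```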

Logging

  • Log all document processing attempts
  • Track Docling version and configuration
  • Monitor temp file cleanup
  • Alert on repeated failures

8. Security Considerations

Input Validation

  • Validate URL sources (prevent SSRF)
  • Limit file sizes (prevent DoS)
  • Sanitize filenames
  • Validate MIME types
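
As an illustration of the SSRF check, a URL source can be rejected unless it uses http(s) and its host resolves only to public addresses. This is a minimal sketch using the standard library. The function name is hypothetical, and a production check would also need to pin the connection to the validated address to guard against DNS rebinding.

```python
import ipaddress
import socket
from urllib.parse import urlparse


def is_safe_source_url(url: str) -> bool:
    """Return True only for http(s) URLs whose host resolves solely to public IPs."""
    try:
        parsed = urlparse(url)
        hostname = parsed.hostname
    except ValueError:
        return False  # Malformed URL, e.g. a broken IPv6 literal
    if parsed.scheme not in ("http", "https") or not hostname:
        return False
    try:
        # IP literals are parsed locally; hostnames trigger a DNS lookup
        infos = socket.getaddrinfo(hostname, None)
    except (socket.gaierror, UnicodeError):
        return False
    for info in infos:
        raw_ip = info[4][0].split("%", 1)[0]  # Drop any IPv6 zone id
        try:
            address = ipaddress.ip_address(raw_ip)
        except ValueError:
            return False
        if not address.is_global:
            return False  # Loopback, private, link-local, reserved, etc.
    return bool(infos)
```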

Output Sanitization

  • Escape HTML in markdown output
  • Strip malicious content
  • Limit output size

Resource Limits

  • Max file size: 50MB (configurable)
  • Timeout: 5 minutes default
  • Rate limiting on document endpoints
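
For base64 sources, the size limit can be enforced before decoding, so an oversized payload is rejected without allocating the decoded bytes. The sketch below assumes standard padded base64 and treats 1MB as 1024 * 1024 bytes. The function and constant names are hypothetical.

```python
import base64
import binascii

MAX_FILE_SIZE_BYTES = 50 * 1024 * 1024  # 50MB limit (1MB = 1024 * 1024 assumed)


def decoded_size(b64_text: str) -> int:
    """Byte length a padded base64 string decodes to, computed without decoding."""
    padding = len(b64_text) - len(b64_text.rstrip("="))
    return len(b64_text) * 3 // 4 - padding


def decode_with_limit(b64_text: str, max_bytes: int = MAX_FILE_SIZE_BYTES) -> bytes:
    """Decode base64 input, rejecting oversized or malformed payloads."""
    if decoded_size(b64_text) > max_bytes:
        raise ValueError(f"Document exceeds the {max_bytes}-byte limit")
    try:
        # validate=True rejects characters outside the base64 alphabet
        return base64.b64decode(b64_text, validate=True)
    except binascii.Error as exc:
        raise ValueError("Source is not valid base64") from exc
```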

9. Cost Considerations

Resource Usage

  • Document processing is more CPU-intensive than text generation
  • Large PDFs may require significant memory
  • OCR processing adds substantial overhead

Pricing Recommendations

  • Charge based on:
      • Number of pages processed
      • Document size (MB)
      • Features enabled (OCR, table extraction)
      • Output format complexity

10. Future Enhancements

Potential Additions

  1. Batch Processing: Process multiple documents in one job
  2. Document Comparison: Compare two documents for differences
  3. Language Detection: Auto-detect document language
  4. Summary Generation: Combine with LLM for document summarization
  5. Entity Extraction: Extract structured data (dates, names, etc.)
  6. Document Classification: Classify document types
  7. Custom Parsers: Allow users to register custom document processors

Questions?

For implementation questions or clarifications, refer to: