# Document Processing Integration Guide

This guide outlines the changes needed across the MicroDC platform to support document processing using Docling.

## Overview

Docling enables processing of various document formats (PDF, DOCX, PPTX, XLSX, HTML, images, audio, etc.) to extract meaningful text and markdown. This integration uses model-based routing to direct jobs to the appropriate processing engine.

## Architecture: Model-Based Routing

Document processing has been implemented using a scalable engine architecture that routes jobs based on `model_id` rather than `job_type`. This enables multiple engines of the same type (e.g., multiple OCR engines) to coexist.

### Engine Hierarchy
```
Engine (base class)
├── InferenceEngine (LLM inference)
│   └── OllamaEngine, vLLMEngine, etc.
└── ProcessingEngine (non-inference tasks)
    ├── DoclingEngine (provides model_id="docling")
    ├── TesseractEngine (provides model_id="tesseract")
    └── EasyOCREngine (provides model_id="easyocr")
```
### Key Benefits

- **Model-Based Routing**: Jobs specify `model_id` to select a specific processor (e.g., `model_id="docling"`)
- **Multiple Engines Per Type**: Support multiple OCR engines, document engines, etc.
- **Extensible**: Easy to add new engine types (OCR, image, audio, video)
- **Clean Separation**: Different engine types have appropriate interfaces
- **Type Safety**: Engine capabilities are declared via enums
- **Lazy Registration**: Engine models are registered on demand for better performance
- **Capability Registration**: Workers report `supported_job_types` to the server
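The lazy-registration behavior listed above can be sketched as a small `model_id` → engine registry. The class and method names here are illustrative, not the repository's actual API:

```python
class EngineRegistry:
    """Maps model_id -> engine, constructing each engine only on first use."""

    def __init__(self, factories):
        # factories: model_id -> zero-argument callable that builds the engine
        self._factories = dict(factories)
        self._engines = {}

    def get(self, model_id):
        """Return the engine for model_id, building it lazily; None if unknown."""
        if model_id not in self._engines:
            factory = self._factories.get(model_id)
            if factory is None:
                return None  # caller falls through to the inference engine
            self._engines[model_id] = factory()  # lazy construction
        return self._engines[model_id]
```

Unknown model IDs return `None`, which mirrors the fall-through to the inference engine shown in the `JobExecutor` routing below.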
### Routing Example

Jobs are routed by `model_id`:

```python
# Document processing job
job = Job(
    model_id="docling",    # Routes to DoclingEngine
    job_type="document",
    input_data={"source": "https://example.com/doc.pdf"}
)

# OCR job with Tesseract
job = Job(
    model_id="tesseract",  # Routes to TesseractEngine
    job_type="ocr",
    input_data={"source": "image.png"}
)

# OCR job with EasyOCR
job = Job(
    model_id="easyocr",    # Routes to EasyOCREngine
    job_type="ocr",
    input_data={"source": "image.png"}
)
```
## Architecture

The integration uses the existing job system with minimal changes:

- **Worker**: Processes documents using Docling (primary implementation)
- **Server**: Routes document jobs to capable workers (minimal changes)
- **Python Client**: Provides an API for submitting document processing jobs (new methods)
## 1. Worker Changes (This Repository)

### Status: ✅ Complete

The worker now supports document processing through the engine architecture.

**Key Components:**

- `Engine`/`ProcessingEngine` base classes (`src/engines/engine_base.py`)
- `DoclingEngine` implementing document processing (`src/engines/docling_engine.py`)
- `DocumentProcessor` class wrapping Docling (`src/processors/document_processor.py`)
- Extended `JobExecutor` to route to processing engines
- Capability registration in `WorkerRegistration` and `WorkerHeartbeat`
- Configuration under `engine.processing.docling` in `config/default.yaml`
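The guide does not show the configuration section itself. The keys below are assumptions that mirror the job parameters used later in this guide, not the repository's actual schema; only the `engine.processing.docling` path is taken from the component list above:

```yaml
# config/default.yaml (illustrative sketch)
engine:
  processing:
    docling:
      enabled: true
      default_format: markdown   # doc_format used when a job omits it
      enable_ocr: true
      extract_tables: true
      max_file_size_mb: 50       # matches the resource limit suggested below
      timeout_seconds: 300
```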
### Worker Capability Registration

Workers automatically register their supported job types during:

**1. Initial Registration (`WorkerRegistration`):**

```python
# Worker collects job types from all engines
supported_job_types = []
supported_job_types.extend(inference_engine.get_supported_job_types())  # ["llm", "embed"]

# Iterate over processing engines (now a list)
for engine in processing_engines:
    supported_job_types.extend(engine.get_supported_job_types())  # ["document"]

# Results in (sorted): ["document", "embed", "llm"]
registration = WorkerRegistration(
    ...,
    supported_job_types=["document", "embed", "llm"]
)
```
**2. Heartbeat Updates (`WorkerHeartbeat`):**

```python
# Capabilities are reported on every heartbeat (they can change dynamically)
heartbeat = WorkerHeartbeat(
    ...,
    supported_job_types=["document", "embed", "llm"]
)
```
**3. Model-Based Routing (`JobExecutor`):**

```python
# JobExecutor builds a model_id -> engine mapping on initialization
# and performs a lazy lookup for processing engine models
processing_engine = await self._get_processing_engine_for_model(job.model_id)
if processing_engine:
    # Route to processing engine (docling, tesseract, etc.)
    result = await processing_engine.process(
        model_id=job.model_id,
        input_data=job.input_data,
        params=job.parameters
    )
else:
    # Route to inference engine
    result = await self.inference_engine.generate(...)
```

This allows the server to know which workers can handle document processing jobs, and enables workers to route jobs to the correct engine based on `model_id`.
## 2. Server Changes Required

### Status: ⚠️ Recommended (Minor Changes)

The worker already reports `supported_job_types` in registration and heartbeat. The server should store and use this information for intelligent job routing.
### 2.1 Store Worker Capabilities

**File:** `server/models/worker.py` (or database schema)

**Change:** Add a field to store supported job types

```python
class Worker(Base):
    # ... existing fields ...
    supported_job_types = Column(JSON, default=["llm", "embed"])  # Array of strings
```
**Update on Registration:**
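The registration-side snippet is not shown in the original; assuming the server stores `supported_job_types` as in 2.1 and the registration payload may omit the field, a minimal helper could be:

```python
DEFAULT_JOB_TYPES = ["llm", "embed"]

def registration_job_types(declared):
    """Return the job types to store for a worker, falling back to the
    legacy default when the registration omitted the field.
    (Helper name is illustrative, not from the codebase.)"""
    return list(declared) if declared else list(DEFAULT_JOB_TYPES)

# At the registration endpoint, roughly:
#   worker.supported_job_types = registration_job_types(registration.supported_job_types)
```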
**Update on Heartbeat:**

```python
# Update if capabilities changed (e.g., a new engine was added)
if heartbeat.supported_job_types:
    worker.supported_job_types = heartbeat.supported_job_types
```
### 2.2 Job Routing by Capability

**File:** `server/api/jobs.py` (job assignment endpoint)

**Change:** Filter workers by supported job types

```python
def get_available_workers_for_job(job: Job):
    """Get workers that can handle this job type."""
    return (
        db.query(Worker)
        .filter(Worker.status == "idle")
        .filter(Worker.supported_job_types.contains([job.job_type]))  # PostgreSQL JSON containment
        .order_by(Worker.priority.desc())
        .all()
    )
```

**Rationale:** Only route document jobs to workers with Docling installed.
### 2.3 API Schema Documentation

**File:** `server/api/schemas.py` (or equivalent)

**Change:** Add documentation for the document job type

```python
class JobType(str):
    """Supported job types.

    - llm: Language model inference (chat/generation)
    - embed: Generate text embeddings
    - document: Document processing and extraction
    """
    LLM = "llm"
    EMBED = "embed"
    DOCUMENT = "document"  # Add this
```

**Rationale:** Makes the API self-documenting; the types appear in OpenAPI/Swagger docs.
### 2.4 Job Validation (Optional)

**File:** `server/api/validators.py` (or job creation endpoint)

**Current Behavior:** The server accepts any string for `job_type`

**Recommended Addition:** Add validation with clear error messages

```python
SUPPORTED_JOB_TYPES = {"llm", "embed", "document"}

def validate_job_type(job_type: str) -> str:
    """Validate that the job type is supported."""
    if job_type not in SUPPORTED_JOB_TYPES:
        raise ValueError(
            f"Unsupported job_type: {job_type}. "
            f"Supported types: {', '.join(sorted(SUPPORTED_JOB_TYPES))}"
        )
    return job_type
```

**Rationale:** Provides better error messages to users submitting invalid job types.
### 2.5 Model Filtering (Recommended)

**File:** `server/api/jobs.py` (job availability endpoint)

**Current Behavior:** Filters jobs by model compatibility

**Recommended Addition:** Add job type filtering

```python
def get_available_jobs(worker_id: str, limit: int = 5):
    """Get jobs available for this worker."""
    worker = get_worker(worker_id)
    worker_capabilities = worker.supported_job_types  # New field

    jobs = (
        db.query(Job)
        .filter(Job.status == "pending")
        .filter(Job.job_type.in_(worker_capabilities))  # Filter by capability
        .order_by(Job.priority.desc(), Job.created_at.asc())
        .limit(limit)
        .all()
    )
    return jobs
```

**Rationale:** Prevents routing document jobs to workers without Docling support.
### 2.6 Worker Registration Enhancement

**File:** `server/api/workers.py` (registration endpoint)

**Add Field:** Allow workers to declare supported job types

```python
class WorkerRegistration(BaseModel):
    # ... existing fields ...
    supported_job_types: List[str] = ["llm", "embed"]  # New field

class WorkerHeartbeat(BaseModel):
    # ... existing fields ...
    supported_job_types: Optional[List[str]] = None  # Allow updates via heartbeat
```

**Worker Side Change:**

```python
# In worker registration
registration = WorkerRegistration(
    # ... existing fields ...
    supported_job_types=["llm", "embed", "document"]  # Declare capability
)
```

**Rationale:** Enables intelligent job routing based on worker capabilities.
### 2.7 Database Migration (If Adding a Job Type Enum)

Only needed if you want database-level validation (not recommended due to reduced flexibility):

```sql
-- Option 1: Add CHECK constraint (PostgreSQL)
ALTER TABLE jobs
ADD CONSTRAINT valid_job_type
CHECK (job_type IN ('llm', 'embed', 'document'));

-- Option 2: Use ENUM type (PostgreSQL)
CREATE TYPE job_type_enum AS ENUM ('llm', 'embed', 'document');
ALTER TABLE jobs ALTER COLUMN job_type TYPE job_type_enum USING job_type::job_type_enum;
```

⚠️ **Not Recommended:** Reduces flexibility for adding new job types in the future.
### 2.8 API Documentation

**File:** `server/docs/API.md` or OpenAPI schema

**Add Section:**

### Document Processing Jobs

Process documents and extract text/markdown using Docling.

**Job Type:** `document`

**Supported Formats:**

- PDF, DOCX, PPTX, XLSX
- HTML, Markdown
- Images (PNG, JPEG, TIFF)
- Audio (WAV, MP3)
- Video (VTT)

**Input Format:**

```json
{
  "job_type": "document",
  "model_id": "docling",
  "input_data": {
    "source": "https://example.com/document.pdf",
    "source_type": "url",
    "filename": "document.pdf"
  },
  "parameters": {
    "doc_format": "markdown",
    "enable_ocr": true,
    "extract_tables": true,
    "extract_images": false
  }
}
```

**Output Format:**

```json
{
  "content": "# Extracted markdown content...",
  "format": "markdown",
  "metadata": {
    "pages": 10,
    "tables_detected": 3,
    "figures_detected": 2,
    "processing_time_ms": 1234
  }
}
```
### Summary of Server Changes
| Change | Priority | Breaking Change? | Effort |
|--------|----------|------------------|--------|
| API Schema Documentation | High | No | Low |
| Job Type Validation | Medium | No | Low |
| Model Filtering | High | No | Medium |
| Worker Registration Enhancement | High | No | Medium |
| Database Migration | Low | Possibly | High |
| API Documentation | High | No | Low |
**Estimated Total Effort:** 2-4 hours
---
## 3. Python Client Changes Required
### Status: 🔧 Required for User-Facing API
The Python client needs new methods to make document processing easy for users.
### 3.1 Add Document Processing Method
**File:** `client/microdc/client.py` (or equivalent)
**Add Class:**
```python
from typing import Any, Dict, Optional

class DocumentProcessor:
    """Document processing API client."""

    def __init__(self, api_client):
        self.client = api_client

    def process_document(
        self,
        source: str,
        source_type: str = "url",
        output_format: str = "markdown",
        enable_ocr: bool = True,
        extract_tables: bool = True,
        extract_images: bool = False,
        model_id: str = "docling",  # Required for routing
        timeout: Optional[int] = 300,
        **kwargs
    ) -> Dict[str, Any]:
        """Process a document and extract text/markdown.

        Args:
            source: URL, file path, or base64-encoded document
            source_type: "url", "path", or "base64"
            output_format: "markdown", "html", "json", or "doctags"
            enable_ocr: Enable OCR for scanned documents
            extract_tables: Extract table structures
            extract_images: Include extracted images in output
            model_id: Processing engine to use ("docling", "tesseract", etc.)
            timeout: Job timeout in seconds
            **kwargs: Additional processing options

        Returns:
            Dictionary with 'content' and 'metadata' keys

        Example:
            >>> processor = client.documents
            >>> result = processor.process_document(
            ...     source="https://example.com/doc.pdf",
            ...     source_type="url",
            ...     output_format="markdown",
            ...     model_id="docling"  # Routes to DoclingEngine
            ... )
            >>> print(result['content'])
            # Document Title...
            >>> print(result['metadata']['pages'])
            10
        """
        job_data = {
            "job_type": "document",
            "model_id": model_id,  # Required for model-based routing
            "input_data": {
                "source": source,
                "source_type": source_type,
            },
            "parameters": {
                "doc_format": output_format,
                "enable_ocr": enable_ocr,
                "extract_tables": extract_tables,
                "extract_images": extract_images,
                **kwargs
            },
            "timeout": timeout
        }

        # Submit the job and wait for the result
        job = self.client.jobs.create(**job_data)
        result = self.client.jobs.wait_for_completion(job.id, timeout=timeout)

        if result.status == "completed":
            return result.output
        else:
            raise RuntimeError(f"Document processing failed: {result.error}")

    def process_document_async(
        self,
        source: str,
        source_type: str = "url",
        model_id: str = "docling",
        **kwargs
    ) -> str:
        """Submit a document processing job asynchronously.

        Returns the job_id for later polling.

        Example:
            >>> job_id = processor.process_document_async(
            ...     source="https://example.com/doc.pdf"
            ... )
            >>> # ... do other work ...
            >>> result = client.jobs.get_result(job_id)
        """
        job_data = {
            "job_type": "document",
            "model_id": model_id,  # Required for model-based routing
            "input_data": {
                "source": source,
                "source_type": source_type,
            },
            "parameters": kwargs
        }
        job = self.client.jobs.create(**job_data)
        return job.id

    def process_file(
        self,
        file_path: str,
        **kwargs
    ) -> Dict[str, Any]:
        """Process a local file.

        Args:
            file_path: Path to a local document file
            **kwargs: Additional options (see process_document)

        Example:
            >>> result = processor.process_file("./document.pdf")
        """
        # Read the file and encode it as base64
        import base64
        with open(file_path, "rb") as f:
            content = base64.b64encode(f.read()).decode("utf-8")

        return self.process_document(
            source=content,
            source_type="base64",
            **kwargs
        )
```
### 3.2 Integrate into Main Client

**File:** `client/microdc/client.py`

```python
class MicroDCClient:
    def __init__(self, api_key: str, base_url: str = "https://api.microdc.ai"):
        self.api_key = api_key
        self.base_url = base_url
        # ... existing initialization ...

        # Add document processor
        self.documents = DocumentProcessor(self)  # New property
```
### 3.3 Add Convenience Methods

**File:** `client/microdc/client.py`

```python
class MicroDCClient:
    # ... existing methods ...

    def process_document(self, source: str, **kwargs) -> Dict[str, Any]:
        """Shorthand for document processing.

        Example:
            >>> client = MicroDCClient(api_key="...")
            >>> result = client.process_document("https://example.com/doc.pdf")
        """
        return self.documents.process_document(source, **kwargs)
```
### 3.4 Update Client Documentation

**File:** `client/README.md`

**Add section:**

## Document Processing

Process documents and extract text using Docling:

### Basic Usage

```python
from microdc import MicroDCClient

client = MicroDCClient(api_key="your-api-key")

# Process a document from a URL
result = client.process_document("https://example.com/document.pdf")
print(result['content'])  # Extracted markdown

# Process a local file
result = client.documents.process_file("./presentation.pptx")

# Advanced options
result = client.documents.process_document(
    source="https://example.com/scan.pdf",
    output_format="markdown",
    enable_ocr=True,
    extract_tables=True,
    extract_images=False
)

# Async processing for large documents
job_id = client.documents.process_document_async("https://example.com/large.pdf")
# ... do other work ...
result = client.jobs.get_result(job_id)
```

### Supported Formats

- **Documents**: PDF, DOCX, PPTX, XLSX
- **Web**: HTML, Markdown
- **Images**: PNG, JPEG, TIFF
- **Audio**: WAV, MP3
- **Video**: VTT subtitles

### Parameters

- `output_format`: Output format ("markdown", "html", "json", "doctags")
- `enable_ocr`: Enable OCR for scanned documents (default: True)
- `extract_tables`: Extract table structures (default: True)
- `extract_images`: Include extracted images (default: False)
### Summary of Client Changes
| Change | Priority | Breaking Change? | Effort |
|--------|----------|------------------|--------|
| DocumentProcessor class | High | No | Medium |
| Integration into main client | High | No | Low |
| Convenience methods | Medium | No | Low |
| Documentation | High | No | Low |
| Example scripts | Medium | No | Low |
| Unit tests | High | No | Medium |
**Estimated Total Effort:** 3-5 hours
---
## 4. Implementation Sequence
### Phase 1: Worker Implementation (Week 1)
1. ✅ Review and finalize plan
2. Add Docling integration to worker
3. Test with manual job submissions
4. Update worker documentation
### Phase 2: Server Updates (Week 1-2)
1. Add API schema documentation
2. Implement job type validation
3. Add worker capability filtering
4. Update API documentation
5. Deploy server changes
### Phase 3: Client SDK (Week 2)
1. Implement DocumentProcessor class
2. Add convenience methods
3. Write unit tests
4. Update client documentation
5. Publish new client version
### Phase 4: Testing & Rollout (Week 2-3)
1. End-to-end integration testing
2. Performance testing with large documents
3. Beta testing with select users
4. Full production rollout
---
## 5. Backward Compatibility
All changes are **backward compatible**:
- ✅ Existing LLM and embed jobs continue working unchanged
- ✅ Workers without Docling support can ignore document jobs
- ✅ Clients without document methods can still use raw API
- ✅ No database schema changes required (optional enhancements only)
---
## 6. Testing Strategy
### Worker Testing
```python
# Test document job processing
job = {
    "job_type": "document",
    "input_data": {
        "source": "https://arxiv.org/pdf/example.pdf",
        "source_type": "url"
    },
    "parameters": {
        "doc_format": "markdown"
    }
}
result = worker.process_job(job)
assert "content" in result
assert "metadata" in result
```
### Server Testing

```python
# Test job routing to capable workers
job = create_job(job_type="document")
workers = get_available_workers_for_job(job)
assert all(w.supports_job_type("document") for w in workers)
```
### Client Testing

```python
# Test the client API
result = client.process_document("https://example.com/doc.pdf")
assert result["content"]
assert result["metadata"]["pages"] > 0
```
## 7. Monitoring & Metrics

### Key Metrics to Track
- Document processing success rate
- Average processing time by format
- OCR usage percentage
- Error rates by document type
- Worker utilization for document jobs
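The first two metrics above can be tracked with a small in-memory aggregator. This is an illustrative sketch only; a real deployment would export these values to a metrics backend such as Prometheus:

```python
from collections import defaultdict

class DocMetrics:
    """Minimal aggregator for document-processing success rate and
    average processing time, keyed by document format."""

    def __init__(self):
        self.attempts = defaultdict(int)
        self.successes = defaultdict(int)
        self.total_ms = defaultdict(float)

    def record(self, fmt: str, ok: bool, elapsed_ms: float) -> None:
        """Record one processing attempt for a given format."""
        self.attempts[fmt] += 1
        if ok:
            self.successes[fmt] += 1
            self.total_ms[fmt] += elapsed_ms

    def success_rate(self, fmt: str) -> float:
        return self.successes[fmt] / self.attempts[fmt] if self.attempts[fmt] else 0.0

    def avg_time_ms(self, fmt: str) -> float:
        return self.total_ms[fmt] / self.successes[fmt] if self.successes[fmt] else 0.0
```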
### Logging
- Log all document processing attempts
- Track Docling version and configuration
- Monitor temp file cleanup
- Alert on repeated failures
## 8. Security Considerations

### Input Validation
- Validate URL sources (prevent SSRF)
- Limit file sizes (prevent DoS)
- Sanitize filenames
- Validate MIME types
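The first two checks above can be sketched as follows. Function names are illustrative; this only catches IP-literal SSRF attempts, and a production version should also resolve DNS and re-check the resulting address:

```python
import ipaddress
from urllib.parse import urlparse

MAX_FILE_SIZE_BYTES = 50 * 1024 * 1024  # 50 MB, matching the limit suggested below

def _is_blocked_host(host: str) -> bool:
    """True if host is an IP literal in a private/loopback/link-local range."""
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        return False  # plain hostname; DNS resolution check omitted in this sketch
    return ip.is_private or ip.is_loopback or ip.is_link_local

def validate_source_url(url: str) -> None:
    """Reject URL schemes and hosts that could be used for SSRF."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"Unsupported URL scheme: {parsed.scheme!r}")
    if _is_blocked_host(parsed.hostname or ""):
        raise ValueError("URL points at a non-public address")

def validate_file_size(num_bytes: int) -> None:
    """Enforce the maximum document size to prevent DoS."""
    if num_bytes > MAX_FILE_SIZE_BYTES:
        raise ValueError(f"File too large: {num_bytes} bytes")
```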
### Output Sanitization
- Escape HTML in markdown output
- Strip malicious content
- Limit output size
### Resource Limits
- Max file size: 50MB (configurable)
- Timeout: 5 minutes default
- Rate limiting on document endpoints
## 9. Cost Considerations

### Resource Usage
- Document processing is more CPU-intensive than text generation
- Large PDFs may require significant memory
- OCR processing adds substantial overhead
### Pricing Recommendations

- Charge based on:
  - Number of pages processed
  - Document size (MB)
  - Features enabled (OCR, table extraction)
  - Output format complexity
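A cost model over those dimensions might look like the toy function below. All rates are hypothetical placeholders for illustration, not MicroDC pricing:

```python
def estimate_cost(pages: int, size_mb: float, ocr: bool, tables: bool) -> float:
    """Toy cost estimate combining per-page, per-MB, and feature charges.
    Every rate here is a made-up placeholder."""
    cost = pages * 0.002 + size_mb * 0.01  # per-page and per-MB components
    if ocr:
        cost *= 1.5                        # OCR adds substantial overhead
    if tables:
        cost += pages * 0.001              # table extraction surcharge
    return round(cost, 4)
```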
## 10. Future Enhancements

### Potential Additions
- Batch Processing: Process multiple documents in one job
- Document Comparison: Compare two documents for differences
- Language Detection: Auto-detect document language
- Summary Generation: Combine with LLM for document summarization
- Entity Extraction: Extract structured data (dates, names, etc.)
- Document Classification: Classify document types
- Custom Parsers: Allow users to register custom document processors
## Questions?

For implementation questions or clarifications, refer to:

- **Worker Implementation:** See the main Docling integration plan
- **Server API:** `docs/WORKER_SERVER_API.md`
- **Docling Documentation:** https://github.com/docling-project/docling