MicroDC.ai - Worker Client Component¶
Overview¶
The Worker Client is the compute node component of the MicroDC.ai platform that:
- Registers with the central server as an available worker
- Reports available models and compute capabilities
- Polls for and executes inference jobs
- Returns results to the server
- Manages local resources and model loading
This document outlines the architecture and requirements for building a flexible, extensible client that can work with various inference engines.
Core Architecture¶
Design Principles¶
- Pluggable Inference Engine: Abstract interface allowing different backends (Ollama, vLLM, Hugging Face, etc.); see the factory sketch after this list
- Model Discovery: Automatic detection and reporting of available models
- Resource Management: Efficient GPU/CPU utilization and memory management
- Fault Tolerance: Graceful handling of failures with automatic recovery
- Security: Secure communication with server using API keys and TLS
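As a concrete illustration of the pluggable-engine principle, engine selection can hang off the engine.type configuration key. The factory below is only a sketch; the microdc_client import path is hypothetical, and only the Ollama backend exists today.

def get_inference_engine(engine_type: str = "ollama") -> "InferenceEngine":
    """Return an engine implementation for the configured backend."""
    if engine_type == "ollama":
        from microdc_client.engines.ollama import OllamaEngine  # hypothetical package path
        return OllamaEngine()
    if engine_type in ("vllm", "transformers"):
        # Planned backends; see the roadmap below.
        raise NotImplementedError(f"{engine_type} engine is not implemented yet")
    raise ValueError(f"Unknown engine type: {engine_type}")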
Component Structure¶
microdc-client/
├── src/
│   ├── core/
│   │   ├── __init__.py
│   │   ├── config.py            # Configuration management
│   │   ├── client.py            # Main client orchestrator
│   │   └── exceptions.py        # Custom exceptions
│   ├── api/
│   │   ├── __init__.py
│   │   ├── server_client.py     # Server API communication
│   │   ├── models.py            # Pydantic models for API
│   │   └── auth.py              # Authentication handling
│   ├── engines/
│   │   ├── __init__.py
│   │   ├── base.py              # Abstract inference engine interface
│   │   ├── ollama.py            # Ollama implementation
│   │   ├── vllm.py              # vLLM implementation (future)
│   │   └── transformers.py      # HuggingFace implementation (future)
│   ├── jobs/
│   │   ├── __init__.py
│   │   ├── executor.py          # Job execution orchestrator
│   │   ├── queue.py             # Local job queue management
│   │   └── monitor.py           # Job progress monitoring
│   ├── models/
│   │   ├── __init__.py
│   │   ├── registry.py          # Local model registry
│   │   ├── capabilities.py      # Model capability detection
│   │   └── loader.py            # Model loading/unloading
│   └── utils/
│       ├── __init__.py
│       ├── system.py            # System resource monitoring
│       ├── logging.py           # Logging configuration
│       └── health.py            # Health checks
├── tests/
├── config/
│   └── default.yaml             # Default configuration
├── requirements.txt
└── setup.py
Registration Flow¶
1. Initial Registration¶
# Pseudocode for worker registration
async def register_worker():
    """Register this worker with the MicroDC server."""
    # Step 1: Gather system information
    system_info = {
        "hostname": get_hostname(),
        "organization": config.ORGANIZATION,
        "capabilities": {
            "gpu": detect_gpu_info(),      # GPU model, memory, count
            "cpu": get_cpu_info(),         # CPU cores, model
            "memory": get_memory_info(),   # Available RAM
            "storage": get_storage_info()  # Available disk space
        }
    }

    # Step 2: Discover available models
    models = await discover_models()

    # Step 3: Register with server
    registration = await server_api.register_worker(
        worker_id=config.WORKER_ID or generate_worker_id(),
        system_info=system_info,
        available_models=models,
        api_key=config.API_KEY
    )

    # Step 4: Store registration details
    save_registration(registration)
    return registration
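For reference, the server_api object used above could be a thin httpx wrapper living in src/api/server_client.py. The sketch below is illustrative only: the /api/workers/register route, the bearer-token header, and the payload shape are assumptions, not the finalized server API.

import httpx


class ServerAPI:
    """Minimal async wrapper around the MicroDC server's worker endpoints."""

    def __init__(self, base_url: str, api_key: str, timeout: float = 30.0):
        self.client = httpx.AsyncClient(
            base_url=base_url,
            headers={"Authorization": f"Bearer {api_key}"},  # assumed auth scheme
            timeout=timeout,
        )

    async def register_worker(self, worker_id: str, system_info: dict,
                              available_models: list) -> dict:
        # Hypothetical endpoint; adjust to the real server route.
        response = await self.client.post("/api/workers/register", json={
            "worker_id": worker_id,
            "system_info": system_info,
            "available_models": available_models,
        })
        response.raise_for_status()
        return response.json()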
2. Model Discovery and Reporting¶
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ModelCapability:
    """Represents a model's capabilities."""
    model_id: str              # e.g., "llama3.1:8b"
    model_name: str            # e.g., "Meta Llama 3.1 8B"
    model_size: int            # Size in bytes
    context_length: int        # Max context window
    quantization: str          # e.g., "q4_0", "q8_0", "fp16"
    capabilities: List[str]    # ["chat", "completion", "embedding"]
    requirements: Dict         # {"vram": 5000, "ram": 8000}


async def discover_models() -> List[ModelCapability]:
    """Discover all available models from the inference engine."""
    engine = get_inference_engine()
    models = []
    for model in await engine.list_models():
        capability = ModelCapability(
            model_id=model.id,
            model_name=model.name,
            model_size=model.size,
            context_length=model.context_length,
            quantization=model.quantization,
            capabilities=model.get_capabilities(),
            requirements=model.get_requirements()
        )
        models.append(capability)
    return models
3. Heartbeat and Status Updates¶
async def heartbeat_loop():
    """Send periodic heartbeats to the server."""
    while running:
        try:
            # Gather current status
            status = {
                "worker_id": worker_id,
                "status": get_worker_status(),  # IDLE, PROCESSING, ERROR
                "current_job": current_job_id,
                "resource_usage": {
                    "gpu_utilization": get_gpu_usage(),
                    "memory_used": get_memory_usage(),
                    "jobs_in_queue": job_queue.size()
                },
                "available_models": get_model_changes()  # Report any changes
            }

            # Send heartbeat
            response = await server_api.heartbeat(status)

            # Process server commands (if any)
            if response.commands:
                await process_commands(response.commands)
        except Exception as e:
            logger.error(f"Heartbeat failed: {e}")

        await asyncio.sleep(config.HEARTBEAT_INTERVAL)  # Default: 30 seconds
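The heartbeat response may carry commands issued by the server. A minimal dispatcher could look like the sketch below; the command names and the job_queue.cancel / shutdown_gracefully helpers are hypothetical placeholders, not a finalized protocol.

async def process_commands(commands: list) -> None:
    """Handle commands piggybacked on the heartbeat response (illustrative)."""
    for command in commands:
        action = command.get("action")
        if action == "pull_model":
            await engine.load_model(command["model_id"])
        elif action == "unload_model":
            await engine.unload_model(command["model_id"])
        elif action == "cancel_job":
            await job_queue.cancel(command["job_id"])   # hypothetical helper
        elif action == "shutdown":
            await shutdown_gracefully()                 # hypothetical helper
        else:
            logger.warning(f"Unknown command: {action}")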
Inference Engine Interface¶
Abstract Base Class¶
from abc import ABC, abstractmethod
from typing import Dict, List, AsyncGenerator, Optional


class InferenceEngine(ABC):
    """Abstract interface for inference engines."""

    @abstractmethod
    async def initialize(self, config: Dict) -> None:
        """Initialize the inference engine."""
        pass

    @abstractmethod
    async def list_models(self) -> List[ModelInfo]:
        """List all available models."""
        pass

    @abstractmethod
    async def load_model(self, model_id: str) -> None:
        """Load a model into memory."""
        pass

    @abstractmethod
    async def unload_model(self, model_id: str) -> None:
        """Unload a model from memory."""
        pass

    @abstractmethod
    async def is_model_loaded(self, model_id: str) -> bool:
        """Check whether a model is currently loaded (used by the job executor)."""
        pass

    @abstractmethod
    async def generate(
        self,
        model_id: str,
        prompt: str,
        params: Dict
    ) -> str:
        """Generate a completion (non-streaming)."""
        pass

    @abstractmethod
    async def generate_stream(
        self,
        model_id: str,
        prompt: str,
        params: Dict
    ) -> AsyncGenerator[str, None]:
        """Generate a completion (streaming)."""
        pass

    @abstractmethod
    async def get_model_info(self, model_id: str) -> ModelInfo:
        """Get detailed information about a model."""
        pass

    @abstractmethod
    async def health_check(self) -> bool:
        """Check if the engine is healthy."""
        pass
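The interface above returns ModelInfo objects that this document does not define. A minimal sketch of what such a type might carry is shown below; the field names, the capability defaults, and the VRAM estimate are assumptions.

from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ModelInfo:
    """Engine-level description of a model (illustrative fields)."""
    id: str                          # Engine-specific identifier, e.g. "llama3.1:8b"
    name: str                        # Human-readable name
    size: int                        # On-disk size in bytes
    context_length: int              # Maximum context window
    quantization: Optional[str] = None
    extra: Dict = field(default_factory=dict)  # Raw engine metadata

    def get_capabilities(self) -> List[str]:
        # Placeholder: a real implementation would inspect engine metadata.
        return ["chat", "completion"]

    def get_requirements(self) -> Dict:
        # Rough assumption: VRAM (MB) ~= on-disk size plus ~20% headroom.
        return {"vram": int(self.size * 1.2 // (1024 * 1024)), "ram": 8000}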
Ollama Implementation¶
import json
from typing import Dict, List, AsyncGenerator

import httpx


class OllamaEngine(InferenceEngine):
    """Ollama inference engine implementation."""

    def __init__(self, base_url: str = "http://localhost:11434"):
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=300.0)

    async def initialize(self, config: Dict) -> None:
        """Initialize Ollama connection."""
        self.base_url = config.get("base_url", self.base_url)
        # Verify Ollama is running
        if not await self.health_check():
            raise ConnectionError("Cannot connect to Ollama")
    async def list_models(self) -> List[ModelInfo]:
        """List models available in Ollama."""
        response = await self.client.get(f"{self.base_url}/api/tags")
        response.raise_for_status()

        models = []
        for model in response.json()["models"]:
            models.append(ModelInfo(
                id=model["name"],
                name=model["name"],
                size=model["size"],
                context_length=self._get_context_length(model),
                quantization=self._parse_quantization(model["name"])
            ))
        return models

    async def generate(
        self,
        model_id: str,
        prompt: str,
        params: Dict
    ) -> str:
        """Generate completion using Ollama."""
        payload = {
            "model": model_id,
            "prompt": prompt,
            "stream": False,
            **params  # temperature, max_tokens, etc.
        }

        response = await self.client.post(
            f"{self.base_url}/api/generate",
            json=payload
        )
        response.raise_for_status()
        return response.json()["response"]
    async def generate_stream(
        self,
        model_id: str,
        prompt: str,
        params: Dict
    ) -> AsyncGenerator[str, None]:
        """Stream generation from Ollama."""
        payload = {
            "model": model_id,
            "prompt": prompt,
            "stream": True,
            **params
        }

        async with self.client.stream(
            "POST",
            f"{self.base_url}/api/generate",
            json=payload
        ) as response:
            async for line in response.aiter_lines():
                if line:
                    data = json.loads(line)
                    if "response" in data:
                        yield data["response"]

    async def health_check(self) -> bool:
        """Check if Ollama is running."""
        try:
            response = await self.client.get(f"{self.base_url}/api/tags")
            return response.status_code == 200
        except httpx.HTTPError:
            return False
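The _get_context_length and _parse_quantization helpers referenced in list_models are not shown above. One plausible implementation, to be added inside OllamaEngine, is sketched below; the parsing heuristics and the 4096-token fallback are assumptions rather than Ollama guarantees.

    def _parse_quantization(self, model_name: str) -> str:
        """Best-effort extraction of a quantization tag (e.g. 'q4_0') from the model name."""
        tag = model_name.split(":")[-1].lower()
        for token in tag.split("-"):
            if token.startswith("q") or token in ("fp16", "fp32"):
                return token
        return "unknown"

    def _get_context_length(self, model: Dict) -> int:
        """Best-effort context window lookup; Ollama's tag listing may not expose it."""
        details = model.get("details", {})
        return int(details.get("context_length", 4096))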
Job Execution¶
Job Processor¶
class JobExecutor:
    """Executes jobs received from the server."""

    def __init__(self, engine: InferenceEngine, server_api: ServerAPI, worker_id: str):
        self.engine = engine
        self.server_api = server_api
        self.worker_id = worker_id
        self.current_job = None

    async def execute_job(self, job: Job) -> JobResult:
        """Execute a single job."""
        self.current_job = job
        result = JobResult(job_id=job.id, worker_id=self.worker_id)

        try:
            # Update job status
            await self.server_api.update_job_status(
                job.id,
                status="PROCESSING",
                worker_id=self.worker_id
            )

            # Ensure model is loaded
            if not await self.engine.is_model_loaded(job.model_id):
                await self.engine.load_model(job.model_id)

            # Execute based on job type
            if job.stream:
                result.output = await self._execute_streaming(job)
            else:
                result.output = await self._execute_batch(job)

            result.status = "COMPLETED"
            result.completed_at = datetime.utcnow()
        except Exception as e:
            result.status = "FAILED"
            result.error = str(e)
            logger.error(f"Job {job.id} failed: {e}")
        finally:
            self.current_job = None

        # Report result to server
        await self.server_api.submit_result(result)
        return result
    async def _execute_batch(self, job: Job) -> str:
        """Execute non-streaming inference."""
        return await self.engine.generate(
            model_id=job.model_id,
            prompt=job.input_data,
            params=job.parameters
        )

    async def _execute_streaming(self, job: Job) -> str:
        """Execute streaming inference with progress updates."""
        chunks = []
        async for chunk in self.engine.generate_stream(
            model_id=job.model_id,
            prompt=job.input_data,
            params=job.parameters
        ):
            chunks.append(chunk)

            # Optionally send progress updates
            if len(chunks) % 10 == 0:
                await self.server_api.update_job_progress(
                    job.id,
                    tokens_generated=len(chunks)
                )

        return "".join(chunks)
Configuration¶
Default Configuration (config/default.yaml)¶
# Server connection
server:
  url: "https://api.microdc.ai"
  api_key: "${MICRODC_API_KEY}"
  timeout: 30

# Worker identity
worker:
  id: null                  # Auto-generated if not set
  name: "${HOSTNAME}"
  organization: "default"
  tags:
    - "gpu"
    - "llama"

# Inference engine
engine:
  type: "ollama"            # ollama, vllm, transformers
  config:
    ollama:
      base_url: "http://localhost:11434"
      timeout: 300
    vllm:
      base_url: "http://localhost:8000"
    transformers:
      device: "cuda"
      cache_dir: "~/.cache/huggingface"

# Job processing
jobs:
  poll_interval: 5          # seconds
  max_concurrent: 1
  timeout: 3600             # 1 hour max per job
  retry_attempts: 3

# Resource limits
resources:
  max_gpu_memory: 0.9       # Use up to 90% of GPU memory
  max_cpu_percent: 80
  max_ram_gb: null          # No limit if null

# Monitoring
monitoring:
  heartbeat_interval: 30    # seconds
  metrics_interval: 60
  log_level: "INFO"
  log_file: "worker.log"

# Storage
storage:
  cache_dir: "~/.microdc/cache"
  temp_dir: "/tmp/microdc"
  max_cache_size_gb: 50
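Values such as ${MICRODC_API_KEY} and ${HOSTNAME} imply environment-variable substitution when the file is loaded. A minimal loader sketch is shown below; the expansion helper is an assumption about how src/core/config.py could behave, not its actual implementation.

import os
import re
from pathlib import Path

import yaml

_ENV_PATTERN = re.compile(r"\$\{([^}]+)\}")


def _expand_env(value):
    """Recursively replace ${VAR} placeholders with environment values."""
    if isinstance(value, str):
        return _ENV_PATTERN.sub(lambda m: os.environ.get(m.group(1), ""), value)
    if isinstance(value, dict):
        return {k: _expand_env(v) for k, v in value.items()}
    if isinstance(value, list):
        return [_expand_env(v) for v in value]
    return value


def load_config(path: str = "config/default.yaml") -> dict:
    """Load the YAML configuration and expand ${VAR} references."""
    raw = yaml.safe_load(Path(path).expanduser().read_text())
    return _expand_env(raw)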
Installation and Setup¶
Requirements¶
# Core dependencies
httpx>=0.24.0
pydantic>=2.0.0
pyyaml>=6.0
aiofiles>=23.0.0
# Monitoring
psutil>=5.9.0
nvidia-ml-py>=12.0.0 # For GPU monitoring
prometheus-client>=0.16.0
# Logging
structlog>=23.0.0
rich>=13.0.0 # For pretty console output
# Testing
pytest>=7.0.0
pytest-asyncio>=0.21.0
pytest-mock>=3.10.0
# Optional engines
ollama>=0.2.0 # If using Ollama
vllm>=0.4.0 # If using vLLM
transformers>=4.30.0 # If using HuggingFace
Quick Start¶
# 1. Install the client
pip install microdc-client
# 2. Configure your API key
export MICRODC_API_KEY="your-api-key-here"
# 3. Start Ollama (or your preferred engine)
ollama serve
# 4. Pull a model
ollama pull llama3.1:8b
# 5. Start the worker
microdc-worker start --config config.yaml
CLI Commands¶
# Main commands
microdc-worker start # Start the worker client
microdc-worker stop # Stop the worker client
microdc-worker status # Check worker status
# Registration
microdc-worker register # Register with server
microdc-worker deregister # Deregister from server
# Model management
microdc-worker models list # List available models
microdc-worker models pull # Pull a new model
microdc-worker models remove # Remove a model
# Configuration
microdc-worker config show # Show current configuration
microdc-worker config set # Set configuration value
microdc-worker config validate # Validate configuration
# Monitoring
microdc-worker logs # Show worker logs
microdc-worker metrics # Show performance metrics
microdc-worker health # Health check
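The commands above could be wired to a single microdc-worker entry point. The sketch below uses the standard-library argparse and covers only start and health; the WorkerClient class and the import paths are hypothetical.

import argparse
import asyncio

from microdc_client.core.config import load_config   # hypothetical path
from microdc_client.core.client import WorkerClient  # hypothetical path


def main() -> None:
    parser = argparse.ArgumentParser(prog="microdc-worker")
    subparsers = parser.add_subparsers(dest="command", required=True)

    start_parser = subparsers.add_parser("start", help="Start the worker client")
    start_parser.add_argument("--config", default="config/default.yaml",
                              help="Path to the worker configuration file")
    subparsers.add_parser("health", help="Run a local health check")

    args = parser.parse_args()
    if args.command == "start":
        worker = WorkerClient(load_config(args.config))
        asyncio.run(worker.run())                     # assumed WorkerClient API
    elif args.command == "health":
        print("ok")


if __name__ == "__main__":
    main()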
Security Considerations¶
- API Key Management: Store API keys securely, never in code
- TLS Communication: Always use HTTPS for server communication
- Input Validation: Validate all job inputs before execution
- Resource Limits: Enforce strict resource limits to prevent abuse
- Sandboxing: Consider running inference in isolated environments
- Audit Logging: Log all job executions for accountability
Performance Optimization¶
- Model Caching: Keep frequently used models loaded in memory (see the LRU sketch after this list)
- Batch Processing: Group similar requests for efficiency
- Connection Pooling: Reuse HTTP connections to the server
- Async I/O: Use async operations throughout for concurrency
- Resource Monitoring: Track and optimize resource usage
- Queue Management: Implement smart job queuing strategies
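As an example of the model-caching point above, loaded models can be tracked with a small LRU policy so that the least recently used model is unloaded when capacity is reached. The ModelCache class below is a sketch; the max_loaded default is an arbitrary assumption.

from collections import OrderedDict


class ModelCache:
    """Keep up to max_loaded models resident, evicting the least recently used."""

    def __init__(self, engine: InferenceEngine, max_loaded: int = 2):
        self.engine = engine
        self.max_loaded = max_loaded
        self._loaded: "OrderedDict[str, None]" = OrderedDict()

    async def ensure_loaded(self, model_id: str) -> None:
        if model_id in self._loaded:
            self._loaded.move_to_end(model_id)            # Mark as most recently used
            return
        if len(self._loaded) >= self.max_loaded:
            victim, _ = self._loaded.popitem(last=False)  # Evict least recently used
            await self.engine.unload_model(victim)
        await self.engine.load_model(model_id)
        self._loaded[model_id] = None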
Error Handling¶
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential


class WorkerError(Exception):
    """Base exception for worker errors."""
    pass


class RegistrationError(WorkerError):
    """Failed to register with server."""
    pass


class ModelNotFoundError(WorkerError):
    """Requested model not available."""
    pass


class InferenceError(WorkerError):
    """Inference execution failed."""
    pass


class ResourceExhaustedError(WorkerError):
    """Insufficient resources to execute job."""
    pass


# Retry logic for transient failures
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def resilient_job_execution(job):
    """Execute job with automatic retry on failure."""
    try:
        return await execute_job(job)
    except (httpx.TimeoutException, httpx.ConnectError) as e:
        logger.warning(f"Transient error, retrying: {e}")
        raise
    except ResourceExhaustedError:
        # Don't retry resource errors
        raise
Testing Strategy¶
- Unit Tests: Test each component in isolation
- Integration Tests: Test engine integrations
- Mock Server Tests: Test against a mock MicroDC server
- Load Tests: Verify performance under load
- Failure Tests: Test error handling and recovery
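As an illustration of the mock-based approach, a unit test for the job executor might look like the sketch below (pytest-asyncio style). The FakeEngine, the import path, and the asserted behavior are examples, not the project's actual test suite.

from types import SimpleNamespace
from unittest.mock import AsyncMock

import pytest

from microdc_client.jobs.executor import JobExecutor  # hypothetical path


class FakeEngine:
    async def is_model_loaded(self, model_id):
        return True

    async def generate(self, model_id, prompt, params):
        return "hello"


@pytest.mark.asyncio
async def test_batch_job_completes_and_reports_result():
    server_api = AsyncMock()
    executor = JobExecutor(engine=FakeEngine(), server_api=server_api,
                           worker_id="worker-1")
    job = SimpleNamespace(id="job-1", model_id="llama3.1:8b", stream=False,
                          input_data="Hi", parameters={})

    result = await executor.execute_job(job)

    assert result.status == "COMPLETED"
    assert result.output == "hello"
    server_api.submit_result.assert_awaited_once_with(result)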
Development Roadmap¶
Phase 1: Core Implementation (Current)¶
- [x] Define architecture and interfaces
- [ ] Implement Ollama engine integration
- [ ] Basic job execution
- [ ] Server registration and heartbeat
Phase 2: Robustness¶
- [ ] Error handling and retry logic
- [ ] Resource monitoring and limits
- [ ] Graceful shutdown
- [ ] Persistent job queue
Phase 3: Additional Engines¶
- [ ] vLLM integration
- [ ] HuggingFace Transformers integration
- [ ] Custom engine plugin system
Phase 4: Advanced Features¶
- [ ] Model auto-pulling on demand
- [ ] Multi-GPU support
- [ ] Job prioritization
- [ ] Result caching
- [ ] Distributed inference
Contributing¶
Guidelines for adding new inference engines:
- Implement the InferenceEngine abstract class (a skeleton sketch follows this list)
- Add engine-specific configuration options
- Write comprehensive tests
- Document any special requirements
- Submit PR with examples
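For orientation, a new engine plugin only needs to fill in the InferenceEngine surface. The skeleton below is a placeholder sketch, not a real backend; the class name and config key are illustrative.

class MyCustomEngine(InferenceEngine):
    """Placeholder backend showing the methods a new engine must provide."""

    async def initialize(self, config: Dict) -> None:
        self.endpoint = config.get("base_url", "http://localhost:9000")  # assumed config key

    async def list_models(self) -> List[ModelInfo]:
        return []  # Query your backend here

    async def load_model(self, model_id: str) -> None:
        pass

    async def unload_model(self, model_id: str) -> None:
        pass

    async def is_model_loaded(self, model_id: str) -> bool:
        return True

    async def generate(self, model_id: str, prompt: str, params: Dict) -> str:
        raise NotImplementedError

    async def generate_stream(self, model_id: str, prompt: str,
                              params: Dict) -> AsyncGenerator[str, None]:
        yield ""  # Stream chunks from your backend here

    async def get_model_info(self, model_id: str) -> ModelInfo:
        raise ModelNotFoundError(model_id)

    async def health_check(self) -> bool:
        return True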