MicroDC.ai - Worker Client Component

Overview

The Worker Client is the compute node component of the MicroDC.ai platform that:

  • Registers with the central server as an available worker
  • Reports available models and compute capabilities
  • Polls for and executes inference jobs
  • Returns results to the server
  • Manages local resources and model loading

This document outlines the architecture and requirements for building a flexible, extensible client that can work with various inference engines.

Core Architecture

Design Principles

  1. Pluggable Inference Engine: Abstract interface allowing different backends (Ollama, vLLM, Hugging Face, etc.)
  2. Model Discovery: Automatic detection and reporting of available models
  3. Resource Management: Efficient GPU/CPU utilization and memory management
  4. Fault Tolerance: Graceful handling of failures with automatic recovery
  5. Security: Secure communication with server using API keys and TLS
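The pluggable-engine principle can be sketched as a small registry/factory keyed by the `engine.type` configuration value. This is an illustrative sketch only; the names `ENGINE_REGISTRY`, `register_engine`, and `create_engine` are assumptions, not part of the spec.

```python
# Hypothetical sketch: engines register under a string key matching the
# `engine.type` config value, and the client instantiates whichever one
# the configuration names.
from typing import Callable, Dict

ENGINE_REGISTRY: Dict[str, Callable[..., object]] = {}

def register_engine(name: str):
    """Class decorator that adds an engine class to the registry."""
    def decorator(cls):
        ENGINE_REGISTRY[name] = cls
        return cls
    return decorator

@register_engine("ollama")
class OllamaEngineStub:
    """Stand-in for the real OllamaEngine, for illustration."""
    def __init__(self, base_url: str = "http://localhost:11434"):
        self.base_url = base_url

def create_engine(engine_type: str, **config):
    """Look up and instantiate the configured engine."""
    try:
        return ENGINE_REGISTRY[engine_type](**config)
    except KeyError:
        raise ValueError(f"Unknown engine type: {engine_type}")

engine = create_engine("ollama")
```

Adding a new backend then means registering one more class, with no changes to the client core.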

Component Structure

microdc-client/
├── src/
│   ├── core/
│   │   ├── __init__.py
│   │   ├── config.py           # Configuration management
│   │   ├── client.py           # Main client orchestrator
│   │   └── exceptions.py       # Custom exceptions
│   ├── api/
│   │   ├── __init__.py
│   │   ├── server_client.py    # Server API communication
│   │   ├── models.py           # Pydantic models for API
│   │   └── auth.py            # Authentication handling
│   ├── engines/
│   │   ├── __init__.py
│   │   ├── base.py            # Abstract inference engine interface
│   │   ├── ollama.py          # Ollama implementation
│   │   ├── vllm.py            # vLLM implementation (future)
│   │   └── transformers.py    # HuggingFace implementation (future)
│   ├── jobs/
│   │   ├── __init__.py
│   │   ├── executor.py        # Job execution orchestrator
│   │   ├── queue.py           # Local job queue management
│   │   └── monitor.py         # Job progress monitoring
│   ├── models/
│   │   ├── __init__.py
│   │   ├── registry.py        # Local model registry
│   │   ├── capabilities.py    # Model capability detection
│   │   └── loader.py          # Model loading/unloading
│   └── utils/
│       ├── __init__.py
│       ├── system.py          # System resource monitoring
│       ├── logging.py         # Logging configuration
│       └── health.py          # Health checks
├── tests/
├── config/
│   └── default.yaml           # Default configuration
├── requirements.txt
└── setup.py

Registration Flow

1. Initial Registration

# Pseudocode for worker registration
async def register_worker():
    """Register this worker with the MicroDC server."""

    # Step 1: Gather system information
    system_info = {
        "hostname": get_hostname(),
        "organization": config.ORGANIZATION,
        "capabilities": {
            "gpu": detect_gpu_info(),  # GPU model, memory, count
            "cpu": get_cpu_info(),      # CPU cores, model
            "memory": get_memory_info(), # Available RAM
            "storage": get_storage_info() # Available disk space
        }
    }

    # Step 2: Discover available models
    models = await discover_models()

    # Step 3: Register with server
    registration = await server_api.register_worker(
        worker_id=config.WORKER_ID or generate_worker_id(),
        system_info=system_info,
        available_models=models,
        api_key=config.API_KEY
    )

    # Step 4: Store registration details
    save_registration(registration)

    return registration

2. Model Discovery and Reporting

from dataclasses import dataclass
from typing import Dict, List

@dataclass
class ModelCapability:
    """Represents a model's capabilities."""
    model_id: str           # e.g., "llama3.1:8b"
    model_name: str         # e.g., "Meta Llama 3.1 8B"
    model_size: int         # Size in bytes
    context_length: int     # Max context window
    quantization: str       # e.g., "q4_0", "q8_0", "fp16"
    capabilities: List[str] # ["chat", "completion", "embedding"]
    requirements: Dict      # {"vram": 5000, "ram": 8000}

async def discover_models() -> List[ModelCapability]:
    """Discover all available models from the inference engine."""
    engine = get_inference_engine()
    models = []

    for model in await engine.list_models():
        capability = ModelCapability(
            model_id=model.id,
            model_name=model.name,
            model_size=model.size,
            context_length=model.context_length,
            quantization=model.quantization,
            capabilities=model.get_capabilities(),
            requirements=model.get_requirements()
        )
        models.append(capability)

    return models

3. Heartbeat and Status Updates

async def heartbeat_loop():
    """Send periodic heartbeats to the server."""
    while running:
        try:
            # Gather current status
            status = {
                "worker_id": worker_id,
                "status": get_worker_status(),  # IDLE, PROCESSING, ERROR
                "current_job": current_job_id,
                "resource_usage": {
                    "gpu_utilization": get_gpu_usage(),
                    "memory_used": get_memory_usage(),
                    "jobs_in_queue": job_queue.size()
                },
                "available_models": get_model_changes()  # Report any changes
            }

            # Send heartbeat
            response = await server_api.heartbeat(status)

            # Process server commands (if any)
            if response.commands:
                await process_commands(response.commands)

        except Exception as e:
            logger.error(f"Heartbeat failed: {e}")

        await asyncio.sleep(config.HEARTBEAT_INTERVAL)  # Default: 30 seconds

Inference Engine Interface

Abstract Base Class

from abc import ABC, abstractmethod
from typing import Dict, List, AsyncGenerator, Optional

class InferenceEngine(ABC):
    """Abstract interface for inference engines."""

    @abstractmethod
    async def initialize(self, config: Dict) -> None:
        """Initialize the inference engine."""
        pass

    @abstractmethod
    async def list_models(self) -> List[ModelInfo]:
        """List all available models."""
        pass

    @abstractmethod
    async def load_model(self, model_id: str) -> None:
        """Load a model into memory."""
        pass

    @abstractmethod
    async def unload_model(self, model_id: str) -> None:
        """Unload a model from memory."""
        pass

    @abstractmethod
    async def generate(
        self,
        model_id: str,
        prompt: str,
        params: Dict
    ) -> str:
        """Generate a completion (non-streaming)."""
        pass

    @abstractmethod
    async def generate_stream(
        self,
        model_id: str,
        prompt: str,
        params: Dict
    ) -> AsyncGenerator[str, None]:
        """Generate a completion (streaming)."""
        pass

    @abstractmethod
    async def get_model_info(self, model_id: str) -> ModelInfo:
        """Get detailed information about a model."""
        pass

    @abstractmethod
    async def health_check(self) -> bool:
        """Check if the engine is healthy."""
        pass

Ollama Implementation

import json

import httpx
from typing import Dict, List, AsyncGenerator

class OllamaEngine(InferenceEngine):
    """Ollama inference engine implementation."""

    def __init__(self, base_url: str = "http://localhost:11434"):
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=300.0)

    async def initialize(self, config: Dict) -> None:
        """Initialize Ollama connection."""
        self.base_url = config.get("base_url", self.base_url)
        # Verify Ollama is running
        if not await self.health_check():
            raise ConnectionError("Cannot connect to Ollama")

    async def list_models(self) -> List[ModelInfo]:
        """List models available in Ollama."""
        response = await self.client.get(f"{self.base_url}/api/tags")
        response.raise_for_status()

        models = []
        for model in response.json()["models"]:
            models.append(ModelInfo(
                id=model["name"],
                name=model["name"],
                size=model["size"],
                context_length=self._get_context_length(model),
                quantization=self._parse_quantization(model["name"])
            ))
        return models

    async def generate(
        self,
        model_id: str,
        prompt: str,
        params: Dict
    ) -> str:
        """Generate completion using Ollama."""
        payload = {
            "model": model_id,
            "prompt": prompt,
            "stream": False,
            **params  # temperature, max_tokens, etc.
        }

        response = await self.client.post(
            f"{self.base_url}/api/generate",
            json=payload
        )
        response.raise_for_status()
        return response.json()["response"]

    async def generate_stream(
        self,
        model_id: str,
        prompt: str,
        params: Dict
    ) -> AsyncGenerator[str, None]:
        """Stream generation from Ollama."""
        payload = {
            "model": model_id,
            "prompt": prompt,
            "stream": True,
            **params
        }

        async with self.client.stream(
            "POST",
            f"{self.base_url}/api/generate",
            json=payload
        ) as response:
            async for line in response.aiter_lines():
                if line:
                    data = json.loads(line)
                    if "response" in data:
                        yield data["response"]

    async def health_check(self) -> bool:
        """Check if Ollama is running."""
        try:
            response = await self.client.get(f"{self.base_url}/api/tags")
            return response.status_code == 200
        except httpx.HTTPError:
            return False

Job Execution

Job Processor

class JobExecutor:
    """Executes jobs received from the server."""

    def __init__(self, engine: InferenceEngine, server_api: ServerAPI, worker_id: str):
        self.engine = engine
        self.server_api = server_api
        self.worker_id = worker_id
        self.current_job = None

    async def execute_job(self, job: Job) -> JobResult:
        """Execute a single job."""
        self.current_job = job
        result = JobResult(job_id=job.id, worker_id=self.worker_id)

        try:
            # Update job status
            await self.server_api.update_job_status(
                job.id,
                status="PROCESSING",
                worker_id=self.worker_id
            )

            # Ensure model is loaded
            if not await self.engine.is_model_loaded(job.model_id):
                await self.engine.load_model(job.model_id)

            # Execute based on job type
            if job.stream:
                result.output = await self._execute_streaming(job)
            else:
                result.output = await self._execute_batch(job)

            result.status = "COMPLETED"
            result.completed_at = datetime.now(timezone.utc)

        except Exception as e:
            result.status = "FAILED"
            result.error = str(e)
            logger.error(f"Job {job.id} failed: {e}")

        finally:
            self.current_job = None
            # Report result to server
            await self.server_api.submit_result(result)

        return result

    async def _execute_batch(self, job: Job) -> str:
        """Execute non-streaming inference."""
        return await self.engine.generate(
            model_id=job.model_id,
            prompt=job.input_data,
            params=job.parameters
        )

    async def _execute_streaming(self, job: Job) -> str:
        """Execute streaming inference with progress updates."""
        chunks = []
        async for chunk in self.engine.generate_stream(
            model_id=job.model_id,
            prompt=job.input_data,
            params=job.parameters
        ):
            chunks.append(chunk)
            # Optionally send progress updates (chunk count as a rough token proxy)
            if len(chunks) % 10 == 0:
                await self.server_api.update_job_progress(
                    job.id,
                    tokens_generated=len(chunks)
                )

        return "".join(chunks)

Configuration

Default Configuration (config/default.yaml)

# Server connection
server:
  url: "https://api.microdc.ai"
  api_key: "${MICRODC_API_KEY}"
  timeout: 30

# Worker identity
worker:
  id: null  # Auto-generated if not set
  name: "${HOSTNAME}"
  organization: "default"
  tags:
    - "gpu"
    - "llama"

# Inference engine
engine:
  type: "ollama"  # ollama, vllm, transformers
  config:
    ollama:
      base_url: "http://localhost:11434"
      timeout: 300
    vllm:
      base_url: "http://localhost:8000"
    transformers:
      device: "cuda"
      cache_dir: "~/.cache/huggingface"

# Job processing
jobs:
  poll_interval: 5  # seconds
  max_concurrent: 1
  timeout: 3600  # 1 hour max per job
  retry_attempts: 3

# Resource limits
resources:
  max_gpu_memory: 0.9  # Use up to 90% of GPU memory
  max_cpu_percent: 80
  max_ram_gb: null  # No limit if null

# Monitoring
monitoring:
  heartbeat_interval: 30  # seconds
  metrics_interval: 60
  log_level: "INFO"
  log_file: "worker.log"

# Storage
storage:
  cache_dir: "~/.microdc/cache"
  temp_dir: "/tmp/microdc"
  max_cache_size_gb: 50
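Placeholders like `${MICRODC_API_KEY}` and `${HOSTNAME}` must be expanded after the YAML is parsed. A minimal sketch of that expansion step, assuming the file has already been loaded (the `expand_env` name is illustrative, and the dict below stands in for the parsed `server` block):

```python
# Sketch: recursively expand ${ENV_VAR} placeholders in a parsed config tree.
import os
import re

_ENV_PATTERN = re.compile(r"\$\{([^}]+)\}")

def expand_env(value):
    """Replace ${VAR} with the environment value ('' if unset), recursing into containers."""
    if isinstance(value, str):
        return _ENV_PATTERN.sub(lambda m: os.environ.get(m.group(1), ""), value)
    if isinstance(value, dict):
        return {k: expand_env(v) for k, v in value.items()}
    if isinstance(value, list):
        return [expand_env(v) for v in value]
    return value

# Example: the `server` block as yaml.safe_load would return it
os.environ["MICRODC_API_KEY"] = "demo-key"  # normally set by the operator
raw = {"server": {"url": "https://api.microdc.ai",
                  "api_key": "${MICRODC_API_KEY}",
                  "timeout": 30}}
config = expand_env(raw)
```

Non-string values (timeouts, limits) pass through untouched, so the expansion can run over the whole parsed tree.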

Installation and Setup

Requirements

# Core dependencies
httpx>=0.24.0
pydantic>=2.0.0
pyyaml>=6.0
aiofiles>=23.0.0

# Monitoring
psutil>=5.9.0
nvidia-ml-py>=12.0.0  # For GPU monitoring
prometheus-client>=0.16.0

# Logging
structlog>=23.0.0
rich>=13.0.0  # For pretty console output

# Testing
pytest>=7.0.0
pytest-asyncio>=0.21.0
pytest-mock>=3.10.0

# Optional engines
ollama>=0.2.0  # If using the Ollama Python client
vllm>=0.4.0  # If using vLLM
transformers>=4.30.0  # If using HuggingFace

Quick Start

# 1. Install the client
pip install microdc-client

# 2. Configure your API key
export MICRODC_API_KEY="your-api-key-here"

# 3. Start Ollama (or your preferred engine)
ollama serve

# 4. Pull a model
ollama pull llama3.1:8b

# 5. Start the worker
microdc-worker start --config config.yaml

CLI Commands

# Main commands
microdc-worker start          # Start the worker client
microdc-worker stop           # Stop the worker client
microdc-worker status         # Check worker status

# Registration
microdc-worker register       # Register with server
microdc-worker deregister     # Deregister from server

# Model management
microdc-worker models list    # List available models
microdc-worker models pull    # Pull a new model
microdc-worker models remove  # Remove a model

# Configuration
microdc-worker config show    # Show current configuration
microdc-worker config set     # Set configuration value
microdc-worker config validate # Validate configuration

# Monitoring
microdc-worker logs          # Show worker logs
microdc-worker metrics       # Show performance metrics
microdc-worker health        # Health check

Security Considerations

  1. API Key Management: Store API keys securely, never in code
  2. TLS Communication: Always use HTTPS for server communication
  3. Input Validation: Validate all job inputs before execution
  4. Resource Limits: Enforce strict resource limits to prevent abuse
  5. Sandboxing: Consider running inference in isolated environments
  6. Audit Logging: Log all job executions for accountability
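Point 1 can be enforced at startup: read the key from the environment and refuse to run without it. A minimal sketch, assuming the env var name from the Quick Start (`MissingAPIKeyError` and `load_api_key` are illustrative names):

```python
# Sketch: fail fast at startup if no API key is configured.
import os

class MissingAPIKeyError(RuntimeError):
    """Raised when no API key is available in the environment."""

def load_api_key(env_var: str = "MICRODC_API_KEY") -> str:
    """Read the API key from the environment, never from source code."""
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise MissingAPIKeyError(
            f"Set {env_var} before starting the worker; refusing to run without it."
        )
    return key
```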

Performance Optimization

  1. Model Caching: Keep frequently used models loaded in memory
  2. Batch Processing: Group similar requests for efficiency
  3. Connection Pooling: Reuse HTTP connections to the server
  4. Async I/O: Use async operations throughout for concurrency
  5. Resource Monitoring: Track and optimize resource usage
  6. Queue Management: Implement smart job queuing strategies
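Point 1 (model caching) could be backed by a simple LRU policy over loaded models. A hedged sketch: `ModelCache` and its callbacks are illustrative stand-ins for calls to `engine.load_model` / `engine.unload_model`, not the project's actual cache.

```python
# Sketch: keep up to `capacity` models loaded, evicting the least recently used.
from collections import OrderedDict

class ModelCache:
    def __init__(self, capacity: int, load, unload):
        self.capacity = capacity
        self.load = load        # stand-in for engine.load_model
        self.unload = unload    # stand-in for engine.unload_model
        self._loaded: "OrderedDict[str, None]" = OrderedDict()

    def ensure_loaded(self, model_id: str) -> None:
        if model_id in self._loaded:
            self._loaded.move_to_end(model_id)  # mark as recently used
            return
        if len(self._loaded) >= self.capacity:
            victim, _ = self._loaded.popitem(last=False)  # evict LRU model
            self.unload(victim)
        self.load(model_id)
        self._loaded[model_id] = None

events = []
cache = ModelCache(2,
                   load=lambda m: events.append(("load", m)),
                   unload=lambda m: events.append(("unload", m)))
cache.ensure_loaded("llama3.1:8b")
cache.ensure_loaded("mistral:7b")
cache.ensure_loaded("llama3.1:8b")  # refresh, no eviction
cache.ensure_loaded("phi3:mini")    # evicts mistral:7b
```

A real implementation would also weigh eviction by VRAM footprint rather than pure recency.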

Error Handling

class WorkerError(Exception):
    """Base exception for worker errors."""
    pass

class RegistrationError(WorkerError):
    """Failed to register with server."""
    pass

class ModelNotFoundError(WorkerError):
    """Requested model not available."""
    pass

class InferenceError(WorkerError):
    """Inference execution failed."""
    pass

class ResourceExhaustedError(WorkerError):
    """Insufficient resources to execute job."""
    pass

# Retry logic for transient failures (sketched with the tenacity library)
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

@retry(
    retry=retry_if_exception_type((httpx.TimeoutException, httpx.ConnectError)),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def resilient_job_execution(job):
    """Execute job with automatic retry on transient network failures."""
    try:
        return await execute_job(job)
    except (httpx.TimeoutException, httpx.ConnectError) as e:
        logger.warning(f"Transient error, retrying: {e}")
        raise
    except ResourceExhaustedError:
        # Not listed in retry_if_exception_type, so tenacity will not retry it
        raise

Testing Strategy

  1. Unit Tests: Test each component in isolation
  2. Integration Tests: Test engine integrations
  3. Mock Server Tests: Test against a mock MicroDC server
  4. Load Tests: Verify performance under load
  5. Failure Tests: Test error handling and recovery
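The mock-engine approach (points 2 and 3) can be as simple as a fake that records calls and returns canned text. This is a sketch, not the project's actual test suite; `FakeEngine` and `run_job` are illustrative names, with `run_job` standing in for `JobExecutor._execute_batch`.

```python
# Sketch: unit-test job execution against a fake engine, no server required.
import asyncio

class FakeEngine:
    """Stands in for InferenceEngine in tests: records calls, returns canned text."""
    def __init__(self):
        self.calls = []

    async def generate(self, model_id, prompt, params):
        self.calls.append((model_id, prompt))
        return f"echo:{prompt}"

async def run_job(engine, model_id, prompt):
    # Minimal stand-in for JobExecutor._execute_batch
    return await engine.generate(model_id, prompt, {})

engine = FakeEngine()
output = asyncio.run(run_job(engine, "llama3.1:8b", "hello"))
```

The same fake slots into pytest-asyncio tests, and a mock MicroDC server can be built the same way around the ServerAPI surface.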

Development Roadmap

Phase 1: Core Implementation (Current)

  • [x] Define architecture and interfaces
  • [ ] Implement Ollama engine integration
  • [ ] Basic job execution
  • [ ] Server registration and heartbeat

Phase 2: Robustness

  • [ ] Error handling and retry logic
  • [ ] Resource monitoring and limits
  • [ ] Graceful shutdown
  • [ ] Persistent job queue

Phase 3: Additional Engines

  • [ ] vLLM integration
  • [ ] HuggingFace Transformers integration
  • [ ] Custom engine plugin system

Phase 4: Advanced Features

  • [ ] Model auto-pulling on demand
  • [ ] Multi-GPU support
  • [ ] Job prioritization
  • [ ] Result caching
  • [ ] Distributed inference

Contributing

Guidelines for adding new inference engines:

  1. Implement the InferenceEngine abstract class
  2. Add engine-specific configuration options
  3. Write comprehensive tests
  4. Document any special requirements
  5. Submit PR with examples
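Steps 1 and 2 above might start like this. The two-method base here is a condensed stub of the full InferenceEngine ABC for illustration, and `MyCustomEngine` is a hypothetical example, not a shipped engine.

```python
# Sketch: skeleton of a new engine implementing a condensed version of the ABC.
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, List

class InferenceEngineStub(ABC):
    """Condensed stand-in for the full InferenceEngine interface shown earlier."""
    @abstractmethod
    async def initialize(self, config: Dict) -> None: ...
    @abstractmethod
    async def list_models(self) -> List[str]: ...

class MyCustomEngine(InferenceEngineStub):
    """Step 1: implement every abstract method of the interface."""
    def __init__(self):
        self.ready = False

    async def initialize(self, config: Dict) -> None:
        # Step 2: read engine-specific options from the config block
        self.endpoint = config.get("base_url", "http://localhost:9000")
        self.ready = True

    async def list_models(self) -> List[str]:
        return ["example-model"]

engine = MyCustomEngine()
asyncio.run(engine.initialize({}))
```

With the real ABC, every abstract method (generate, generate_stream, load/unload, health_check) must be implemented before the class can be instantiated.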