Worker API Documentation

This document describes how workers interact with the MicroHub server to process jobs.

Authentication

Workers authenticate using Bearer tokens obtained during registration:

Authorization: Bearer <worker_token>
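A minimal setup, assuming the `requests` library: a `Session` attaches the header to every call so individual requests don't repeat it. The URL and token below are placeholder values.

```python
import requests

# Placeholder values; substitute your real server URL and worker token.
server_url = "http://localhost:8000"
worker_token = "example-token"

# A Session sends these headers with every request made through it.
session = requests.Session()
session.headers.update({
    "Authorization": f"Bearer {worker_token}",
    "Content-Type": "application/json",
})
```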

API Base URL

All worker API endpoints use the /api/v1/workers prefix:

  • Production: https://api.microdc.ai/api/v1/workers
  • Development: http://localhost:8000/api/v1/workers

Job Assignment System

The MicroHub platform uses a claim-based job assignment system to prevent multiple workers from competing for the same job. This ensures:

  • No race conditions with hundreds of workers
  • Jobs are only shown to workers that can handle them
  • Fair distribution of work
  • Atomic job claiming (only one worker gets each job)

How It Works

  1. Check Available Jobs - Worker requests list of jobs matching its capabilities
  2. Claim a Job - Worker atomically claims a specific job (first claim wins)
  3. Process - Only the successful claimer processes the job
  4. Complete/Fail - Submit results or report failure
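The four steps above can be sketched as one polling cycle. This is a control-flow sketch only: the five callables stand in for the HTTP calls documented in the endpoint sections below, and the first successful claim wins.

```python
def worker_cycle(get_available, claim, process, complete, fail):
    """One pass of the claim-based cycle: list, claim, process, report.

    get_available/claim/process/complete/fail are stand-ins for the
    worker's HTTP calls; only the successful claimer processes a job.
    """
    for job in get_available():
        job_data = claim(job["job_id"])
        if job_data is None:
            continue  # another worker claimed it; try the next job
        try:
            return complete(job_data["job_id"], process(job_data))
        except Exception as exc:
            return fail(job_data["job_id"], str(exc))
    return None  # nothing claimable this cycle
```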

API Endpoints

1. Get Available Jobs

Endpoint: GET /api/v1/workers/jobs/available

Query Parameters:

  • limit (optional, default=5): Maximum number of jobs to return

Description: Returns jobs that match the worker's capabilities and are available to claim.

import requests

response = requests.get(
    f"{server_url}/api/v1/workers/jobs/available",
    headers={"Authorization": f"Bearer {worker_token}"},
    params={"limit": 5}
)

data = response.json()
print(f"Found {data['count']} available jobs")

Response:

{
    "jobs": [
        {
            "job_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
            "model": "llama2-7b",
            "type": "llm",
            "priority": "high",
            "estimated_reward": 0.0125,
            "created_at": "2024-01-01T12:00:00Z",
            "retry_count": 0,
            "expires_in": 30
        }
    ],
    "count": 1,
    "message": "Found 1 available jobs"
}

Important Notes:

  • Jobs returned are filtered by worker's supported models
  • Jobs with retry_count >= 3 are not shown
  • Jobs are ordered by priority (descending) then creation time (ascending)
  • These jobs are NOT reserved - you must claim them atomically

2. Claim a Job

Endpoint: POST /api/v1/workers/jobs/{job_id}/claim

Description: Atomically claim a job. Uses database row-level locking to ensure only one worker can claim each job.

response = requests.post(
    f"{server_url}/api/v1/workers/jobs/{job_id}/claim",
    headers={"Authorization": f"Bearer {worker_token}"}
)

result = response.json()
if result["status"] == "claimed":
    job_data = result["job_data"]
    print(f"Successfully claimed job {job_data['job_id']}")
    # Start processing the job...
else:
    print(f"Could not claim job: {result['status']}")

Success Response (status=claimed):

{
    "status": "claimed",
    "message": "Job successfully claimed",
    "job_data": {
        "job_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
        "assignment_id": "7b3d5f64-1234-4562-b3fc-2c963f66afa6",
        "type": "llm",
        "model": "llama2-7b",
        "payload": {
            "prompt": "Hello, how are you?",
            "max_tokens": 512,
            "temperature": 0.7
        },
        "llm_interaction_type": "generation",  // "generation" or "chat"
        "input_modalities": ["text"],           // Input types: text, image, audio, video
        "output_modalities": ["text"],          // Expected output types
        "priority": "high",
        "timeout": 300,
        "started_at": "2024-01-01T12:00:00Z",
        "must_complete_by": "2024-01-01T12:05:00Z"
    }
}

Failure Response Statuses:

  • already_claimed - Another worker claimed the job
  • incompatible - Worker doesn't support the required model
  • worker_busy - Worker is already processing another job
  • 404 - Job not found
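A sketch of claim handling that maps each documented status to an action. `try_claim` is an illustrative helper name, not part of the API; a real worker would fold this into its claim method.

```python
import requests

def try_claim(server_url, headers, job_id):
    """Claim a job and handle each documented failure status.

    Returns the job_data dict on success, or None otherwise.
    """
    response = requests.post(
        f"{server_url}/api/v1/workers/jobs/{job_id}/claim",
        headers=headers,
    )
    if response.status_code == 404:
        return None  # job no longer exists; refresh the available list
    result = response.json()
    if result["status"] == "claimed":
        return result["job_data"]
    if result["status"] == "already_claimed":
        return None  # lost the race; try the next job in the list
    if result["status"] == "worker_busy":
        return None  # finish the current job before claiming another
    return None  # "incompatible" or any future status: skip this job
```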

3. Complete Job

Endpoint: POST /api/v1/workers/jobs/{job_id}/complete

Description: Submit successful job completion with results.

import time

start_time = time.time()
result = process_job(job_data)  # Your processing logic
execution_time = time.time() - start_time

response = requests.post(
    f"{server_url}/api/v1/workers/jobs/{job_id}/complete",
    headers={"Authorization": f"Bearer {worker_token}"},
    json={
        "output": result,
        "execution_time": execution_time,
        "tokens_used": 150  # Optional
    }
)

Request Body:

{
    "output": {
        "text": "Generated response...",
        "finish_reason": "stop"
    },
    "execution_time": 2.5,
    "tokens_used": 150
}

Response:

{
    "status": "completed",
    "earnings": 0.0125,
    "next_poll": 60
}

4. Report Job Failure

Endpoint: POST /api/v1/workers/jobs/{job_id}/fail

Description: Report that a job could not be completed.

import traceback

response = requests.post(
    f"{server_url}/api/v1/workers/jobs/{job_id}/fail",
    headers={"Authorization": f"Bearer {worker_token}"},
    json={
        "reason": "Model loading failed: Out of memory",
        "error_details": traceback.format_exc()  # Optional
    }
)

Request Body:

{
    "reason": "Out of memory",
    "error_details": "Detailed error traceback (optional)"
}

5. Worker Status Heartbeat

Endpoint: POST /api/v1/workers/heartbeat

Description: Send periodic heartbeat to update worker status, resource usage, and system metrics. This should be sent every 60 seconds to maintain active status.

IMPORTANT FOR LONG-RUNNING JOBS: If a worker is processing a job that may exceed the timeout (default 300 seconds / 5 minutes), it MUST include job_progress in the heartbeat to reset the timeout and prevent the job from being reassigned. This allows jobs to run indefinitely as long as the worker reports progress.
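One way to satisfy this requirement, sketched as a background thread that posts a heartbeat with job_progress at a fixed interval while the main thread processes the job. The helper name and the zeroed resource figures are placeholders.

```python
import threading
import requests

def progress_heartbeats(server_url, headers, job_id, assignment_id,
                        stop_event, interval=60):
    """Post heartbeats with job_progress while a long job runs.

    Run in a daemon thread; set stop_event when the job finishes.
    Each heartbeat resets the job's timeout window on the server.
    Resource figures are zeroed placeholders here.
    """
    while not stop_event.wait(interval):
        requests.post(
            f"{server_url}/api/v1/workers/heartbeat",
            headers=headers,
            json={
                "status": "busy",
                "cpu_usage": 0.0,
                "memory_usage": 0.0,
                "active_jobs": 1,
                "completed_jobs_session": 0,
                "job_progress": {
                    "job_id": job_id,
                    "assignment_id": assignment_id,
                    "message": "Processing",
                },
            },
        )

# Usage while processing a claimed job:
#   stop = threading.Event()
#   t = threading.Thread(target=progress_heartbeats,
#                        args=(server_url, headers, job_id, assignment_id, stop),
#                        daemon=True)
#   t.start()
#   ... process the job ...
#   stop.set()
```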

Request Body:

{
    "status": "idle",  // or "busy", "maintenance"
    "cpu_usage": 25.5,
    "memory_usage": 45.2,
    "gpu_usage": 0.0,  // Optional, null if no GPU
    "active_jobs": 0,
    "completed_jobs_session": 5,
    "worker_version": "1.0.0",  // Optional: Worker software version
    "current_models": ["llama2-7b", "mistral-7b"],  // Optional: Currently available models
    "job_progress": {  // IMPORTANT: Include this when processing a job to reset timeout
        "job_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
        "assignment_id": "7b3d5f64-1234-4562-b3fc-2c963f66afa6",
        "progress_percentage": 45.0,  // Optional: 0-100
        "message": "Loading model weights",  // Optional: Human-readable status
        "estimated_remaining_seconds": 120  // Optional: Estimated time to completion
    },
    "system_metrics": {  // Optional: Detailed system metrics
        "load_average": [1.5, 2.0, 1.8],  // 1, 5, and 15 minute load averages
        "cpu_count": 16,
        "memory_total_gb": 64.0,
        "memory_available_gb": 32.5,
        "memory_percent": 49.2,
        "disk_total_gb": 1000.0,
        "disk_available_gb": 450.0,
        "disk_percent": 55.0,
        "gpu_memory_total_gb": 24.0,  // Optional
        "gpu_memory_used_gb": 8.5,     // Optional
        "gpu_temperature_c": 65.0,      // Optional: GPU temperature in Celsius
        "cpu_temperature_c": 58.0,      // Optional: CPU temperature in Celsius
        "network_sent_mb": 1024.5,      // Optional: Network bytes sent in MB
        "network_recv_mb": 2048.3,      // Optional: Network bytes received in MB
        "uptime_seconds": 86400
    },
    "supported_models": ["llama2-7b", "mistral-7b"]  // Legacy: Use current_models instead
}

Response:

{
    "status": "acknowledged",
    "server_time": "2024-01-01T12:00:00Z",
    "next_heartbeat": 60,
    "model_sync": {
        "added": ["new-model"],
        "removed": ["old-model"],
        "updated": ["existing-model"]
    },
    "metrics_received": true,
    "progress_recorded": true  // true if job_progress was successfully recorded
}

Notes:

  • The current_models field should list all models currently available on the worker
  • System metrics help the server monitor worker health and resource availability
  • All temperature values should be in Celsius
  • Load average follows Unix convention: [1-minute, 5-minute, 15-minute]
  • Network metrics are cumulative since worker start
  • Job Progress: When processing a job, include job_progress to extend the timeout. Each progress update resets the timeout window, allowing jobs to run longer than the initial timeout limit.
  • Timeout Handling: If no progress is reported, jobs will timeout based on timeout_seconds (default 300s). Including progress updates prevents timeout and job reassignment.

6. Optional Job Endpoints

Send Job Heartbeat

Endpoint: POST /api/v1/workers/jobs/{job_id}/heartbeat

Keep job alive and report progress (not currently implemented in server).

Release Job

Endpoint: POST /api/v1/workers/jobs/{job_id}/release

Release a claimed job back to queue if unable to process (not currently implemented in server).

Job Processing

Execute the job based on its type, interaction type, and modalities:

def process_job(job):
    job_type = job["type"]
    model = job["model"]
    payload = job["payload"]
    llm_interaction_type = job.get("llm_interaction_type")
    input_modalities = job.get("input_modalities", ["text"])
    output_modalities = job.get("output_modalities", ["text"])

    if job_type == "llm":
        # Route based on LLM interaction type
        if llm_interaction_type == "chat":
            # Chat-based interaction (messages format)
            return run_chat_inference(
                model=model,
                messages=payload["messages"],
                max_tokens=payload.get("max_tokens", 512),
                temperature=payload.get("temperature", 0.7),
                input_modalities=input_modalities,
                output_modalities=output_modalities
            )
        elif llm_interaction_type == "generation":
            # Text generation (completion format)
            return run_llm_inference(
                model=model,
                prompt=payload["prompt"],
                max_tokens=payload.get("max_tokens", 512),
                temperature=payload.get("temperature", 0.7),
                input_modalities=input_modalities,
                output_modalities=output_modalities
            )
        else:
            raise ValueError(f"Unknown llm_interaction_type: {llm_interaction_type}")

    elif job_type == "embed":
        # Generate embeddings
        result = generate_embeddings(
            model=model,
            texts=payload["texts"]
        )
        return result

    # Add more job types as needed

Multimodal Job Examples

Text-to-Image Generation

if job_type == "llm" and "image" in output_modalities:
    # Input: text, Output: image
    result = generate_image(
        model=model,
        prompt=payload["prompt"],
        size=payload.get("size", "1024x1024")
    )
    return {
        "image_url": result.url,
        "image_data": result.base64  # or URL
    }

Vision Model (Image + Text → Text)

if "image" in input_modalities and "text" in input_modalities:
    # Multi-modal input processing
    result = process_vision_model(
        model=model,
        text=payload["prompt"],
        images=payload["images"],  # Base64 or URLs
        max_tokens=payload.get("max_tokens", 512)
    )
    return {
        "text": result.text,
        "finish_reason": "stop"
    }

Audio Transcription

if "audio" in input_modalities:
    # Audio to text
    result = transcribe_audio(
        model=model,
        audio_data=payload["audio"],  # Base64 or URL
        language=payload.get("language", "en")
    )
    return {
        "text": result.transcription,
        "language": result.detected_language
    }

Video Analysis

if "video" in input_modalities:
    # Video to text/image analysis
    result = analyze_video(
        model=model,
        video_url=payload["video_url"],
        analysis_type=payload.get("analysis_type", "description")
    )
    return {
        "analysis": result.text,
        "keyframes": result.keyframes if "image" in output_modalities else None
    }

Complete Python Worker Example

import requests
import time
import logging
import traceback
from typing import Dict, Any, Optional, List

class MicroHubWorker:
    def __init__(self, server_url: str, worker_token: str):
        self.server_url = server_url.rstrip('/')
        self.worker_token = worker_token
        self.headers = {
            "Authorization": f"Bearer {worker_token}",
            "Content-Type": "application/json"
        }
        self.logger = logging.getLogger(__name__)

    def get_available_jobs(self, limit: int = 5) -> List[Dict[str, Any]]:
        """Get list of jobs this worker can handle"""
        try:
            response = requests.get(
                f"{self.server_url}/api/v1/workers/jobs/available",
                headers=self.headers,
                params={"limit": limit}
            )

            if response.status_code == 200:
                data = response.json()
                return data.get("jobs", [])

            return []

        except Exception as e:
            self.logger.error(f"Error getting available jobs: {e}")
            return []

    def claim_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Attempt to claim a specific job"""
        try:
            response = requests.post(
                f"{self.server_url}/api/v1/workers/jobs/{job_id}/claim",
                headers=self.headers
            )

            if response.status_code == 200:
                data = response.json()
                if data["status"] == "claimed":
                    self.logger.info(f"Successfully claimed job {job_id}")
                    return data["job_data"]
                else:
                    self.logger.info(f"Could not claim job {job_id}: {data['status']}")

            return None

        except Exception as e:
            self.logger.error(f"Error claiming job: {e}")
            return None

    def send_worker_heartbeat(self, status: str = "idle", active_jobs: int = 0,
                             cpu_usage: float = 0, memory_usage: float = 0,
                             gpu_usage: Optional[float] = None,
                             completed_jobs_session: int = 0,
                             supported_models: Optional[List[str]] = None,
                             job_progress: Optional[Dict[str, Any]] = None) -> bool:
        """Send worker status heartbeat (every 60 seconds)

        Args:
            job_progress: Optional dict with job_id, assignment_id, and optional
                         progress_percentage, message, estimated_remaining_seconds.
                         Including this resets the job timeout.
        """
        try:
            payload = {
                "status": status,  # "idle", "busy", or "maintenance"
                "cpu_usage": cpu_usage,
                "memory_usage": memory_usage,
                "gpu_usage": gpu_usage,
                "active_jobs": active_jobs,
                "completed_jobs_session": completed_jobs_session
            }

            if supported_models:
                payload["supported_models"] = supported_models

            if job_progress:
                payload["job_progress"] = job_progress

            response = requests.post(
                f"{self.server_url}/api/v1/workers/heartbeat",
                headers=self.headers,
                json=payload
            )

            if response.status_code == 200:
                self.logger.debug("Worker heartbeat sent successfully")
                return True
            else:
                self.logger.error(f"Worker heartbeat failed: {response.text}")
                return False

        except Exception as e:
            self.logger.error(f"Error sending worker heartbeat: {e}")
            return False

    def send_job_heartbeat(self, job_id: str, progress: Optional[float] = None,
                           message: Optional[str] = None) -> bool:
        """Send heartbeat to keep job alive (not yet implemented in server)"""
        try:
            payload = {}
            if progress is not None:
                payload["progress"] = progress
            if message:
                payload["message"] = message

            response = requests.post(
                f"{self.server_url}/api/v1/workers/jobs/{job_id}/heartbeat",
                headers=self.headers,
                json=payload
            )
            return response.status_code == 200

        except Exception as e:
            self.logger.error(f"Error sending job heartbeat: {e}")
            return False

    def complete_job(self, job_id: str, result: Dict[str, Any],
                    execution_time: float, tokens_used: Optional[int] = None) -> bool:
        """Submit successful job completion"""
        try:
            payload = {
                "output": result,
                "execution_time": execution_time
            }

            if tokens_used is not None:
                payload["tokens_used"] = tokens_used

            response = requests.post(
                f"{self.server_url}/api/v1/workers/jobs/{job_id}/complete",
                headers=self.headers,
                json=payload
            )

            if response.status_code == 200:
                data = response.json()
                self.logger.info(f"Job completed! Earned: ${data.get('earnings', 0)}")
                return True
            else:
                self.logger.error(f"Failed to complete job: {response.text}")
                return False

        except Exception as e:
            self.logger.error(f"Error completing job: {e}")
            return False

    def fail_job(self, job_id: str, reason: str, details: Optional[str] = None) -> bool:
        """Report job failure"""
        try:
            payload = {"reason": reason}
            if details:
                payload["error_details"] = details

            response = requests.post(
                f"{self.server_url}/api/v1/workers/jobs/{job_id}/fail",
                headers=self.headers,
                json=payload
            )

            return response.status_code == 200

        except Exception as e:
            self.logger.error(f"Error reporting job failure: {e}")
            return False

    def process_job(self, job: Dict[str, Any]) -> Dict[str, Any]:
        """Process a job based on its type, interaction type, and modalities - Override this method"""
        job_type = job["type"]
        model = job["model"]
        payload = job["payload"]
        llm_interaction_type = job.get("llm_interaction_type")
        input_modalities = job.get("input_modalities", ["text"])
        output_modalities = job.get("output_modalities", ["text"])

        # This is where you implement your actual processing logic
        if job_type == "llm":
            # Route based on LLM interaction type
            if llm_interaction_type == "chat":
                # Chat-based interaction
                return {
                    "text": f"Chat response to: {payload.get('messages', [])}",
                    "finish_reason": "stop"
                }
            elif llm_interaction_type == "generation":
                # Text generation
                return {
                    "text": f"Generated from: {payload.get('prompt', '')}",
                    "finish_reason": "stop"
                }

            # Handle multimodal cases
            if "image" in output_modalities:
                # Text-to-image generation
                return {"image_url": "https://example.com/generated.png"}

            raise ValueError(f"Unhandled llm_interaction_type: {llm_interaction_type}")

        elif job_type == "embed":
            # Example embedding processing
            return {
                "embeddings": [[0.1] * 768]  # Replace with actual embeddings
            }
        else:
            raise ValueError(f"Unsupported job type: {job_type}")

    def run(self, poll_interval: int = 10, heartbeat_interval: int = 60):
        """Main worker loop with claim-based assignment"""
        self.logger.info("Worker started (claim-based system)")
        last_heartbeat = 0
        current_job = None

        while True:
            try:
                # Send periodic heartbeat (every 60s)
                if time.time() - last_heartbeat > heartbeat_interval:
                    job_progress = None
                    if current_job:
                        # Include job progress to reset timeout for long-running jobs
                        job_progress = {
                            "job_id": current_job["job_id"],
                            "assignment_id": current_job["assignment_id"],
                            "progress_percentage": current_job.get("progress", 0),
                            "message": current_job.get("status_message", "Processing"),
                        }

                    self.send_worker_heartbeat(
                        status="busy" if current_job else "idle",
                        active_jobs=1 if current_job else 0,
                        cpu_usage=0,  # TODO: Get actual metrics
                        memory_usage=0,
                        job_progress=job_progress
                    )
                    last_heartbeat = time.time()

                # Get available jobs (not assigned)
                available_jobs = self.get_available_jobs(limit=5)

                if available_jobs:
                    self.logger.info(f"Found {len(available_jobs)} available jobs")

                    # Try to claim jobs in priority order
                    for job_info in available_jobs:
                        job_id = job_info["job_id"]

                        # Attempt to claim this job
                        job_data = self.claim_job(job_id)

                        if job_data:
                            # We got the job!
                            self.logger.info(f"Processing job {job_id}")
                            current_job = job_data
                            current_job["progress"] = 0

                            # Process the job
                            start_time = time.time()
                            try:
                                result = self.process_job(job_data)
                                execution_time = time.time() - start_time

                                # Submit result
                                if self.complete_job(job_id, result, execution_time):
                                    self.logger.info(f"Successfully completed job: {job_id}")
                                    current_job = None
                                    # Short delay before looking for next job
                                    time.sleep(2)
                                else:
                                    self.logger.error(f"Failed to submit job result: {job_id}")

                            except Exception as e:
                                # Report failure
                                self.logger.error(f"Job processing failed: {e}")
                                self.fail_job(job_id, str(e), traceback.format_exc())
                                current_job = None

                            # After processing one job, check for more
                            break
                        else:
                            # Someone else claimed this job, try next
                            continue
                else:
                    self.logger.debug("No available jobs")

                # Wait before next poll
                time.sleep(poll_interval)

            except KeyboardInterrupt:
                self.logger.info("Worker stopped by user")
                break
            except Exception as e:
                self.logger.error(f"Unexpected error: {e}")
                time.sleep(poll_interval)

# Usage
if __name__ == "__main__":
    import os

    logging.basicConfig(level=logging.INFO)

    # Import traceback for error reporting
    import traceback

    worker = MicroHubWorker(
        server_url=os.getenv("MICROHUB_SERVER_URL", "http://localhost:8000"),
        worker_token=os.environ["WORKER_TOKEN"]  # fail fast if the token is unset
    )

    # Use claim-based system with fast polling
    worker.run(poll_interval=10)

Important Notes

  1. Execution Time: Always track and report execution time accurately for billing purposes

  2. Error Handling: Always wrap processing in try/except blocks and report failures properly to avoid jobs getting stuck

  3. Polling Interval: Respect the next_poll value in responses to avoid overloading the server

  4. Token Management: Keep your worker token secure and never share it

  5. Result Format: Ensure your output matches the expected format for the job type

  6. Timeouts: Respect the timeout value provided with each job

  7. Idempotency: Jobs may be retried; ensure your processing is idempotent where possible
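The idempotency note can be sketched as a thin wrapper that caches results by job_id, so a redelivered job re-submits the same output instead of recomputing it. `IdempotentProcessor` and its in-memory dict are illustrative; a real worker might persist this cache.

```python
class IdempotentProcessor:
    """Cache results by job_id so a retried job yields the same output.

    process_fn is your real processing logic; repeated deliveries of
    the same job_id reuse the cached result instead of recomputing
    (and possibly double-billing) the work.
    """

    def __init__(self, process_fn):
        self.process_fn = process_fn
        self._results = {}

    def process(self, job):
        job_id = job["job_id"]
        if job_id not in self._results:
            self._results[job_id] = self.process_fn(job)
        return self._results[job_id]
```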

HTTP Status Codes

  • 200: Success
  • 401: Authentication failed (invalid or expired token)
  • 404: Job not found or not assigned to worker
  • 400: Bad request (invalid data format)
  • 429: Rate limited (too many requests)
  • 500: Server error

Rate Limits

  • Poll requests: Max 60 per minute
  • Job completions: No limit
  • Other endpoints: 100 requests per minute
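A sketch of staying within these limits: back off exponentially on a 429 response and never poll faster than once per second, which keeps a worker under the 60-polls-per-minute budget. `poll_with_backoff` is an illustrative helper name.

```python
import time
import requests

def poll_with_backoff(server_url, headers, max_delay=60):
    """Fetch available jobs, backing off on 429 rate-limit responses.

    Doubles the retry delay on each 429 up to max_delay seconds,
    starting from one second between attempts.
    """
    delay = 1.0
    while True:
        response = requests.get(
            f"{server_url}/api/v1/workers/jobs/available",
            headers=headers,
            params={"limit": 5},
        )
        if response.status_code == 429:
            time.sleep(delay)
            delay = min(delay * 2, max_delay)
            continue
        response.raise_for_status()
        return response.json().get("jobs", [])
```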

Model Management

Model Registration and Approval Process

Workers can register support for models, which go through an approval process:

  1. Worker Registration: When registering, workers can specify models they support
  2. Model Request: Models start with status REQUESTED
  3. Admin Review: Admins review model requests in the /model-requests UI
  4. Approval Process:
       • Admin clicks "Approve" on a model request
       • Fills in configuration (size, requirements, pricing)
       • Can use quick presets (Small/Medium/Large/XL)
       • Model becomes AVAILABLE and appears on /models page
  5. Worker Support: Once approved, workers can process jobs for that model

Model Status Values

  • REQUESTED: Initial request from user/worker
  • PENDING: Awaiting admin review
  • IN_REVIEW: Admin is actively reviewing
  • APPROVED: Approved with configuration (transitions to AVAILABLE)
  • AVAILABLE: Active and available for use
  • REJECTED: Request was rejected with reason
  • UNAVAILABLE: Model is offline/not currently hosted
  • DEPRECATED: No longer supported
  • BETA: In beta testing
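For workers that track these states in code, the list above maps naturally onto a string enum. The exact casing the API returns is assumed to match the names shown, and treating BETA as processable alongside AVAILABLE is an assumption to adjust to your deployment.

```python
from enum import Enum

class ModelStatus(str, Enum):
    """The documented model lifecycle states as a string enum."""
    REQUESTED = "REQUESTED"
    PENDING = "PENDING"
    IN_REVIEW = "IN_REVIEW"
    APPROVED = "APPROVED"
    AVAILABLE = "AVAILABLE"
    REJECTED = "REJECTED"
    UNAVAILABLE = "UNAVAILABLE"
    DEPRECATED = "DEPRECATED"
    BETA = "BETA"

# Assumption: workers process jobs only for active (or beta) models.
ACTIVE_STATUSES = {ModelStatus.AVAILABLE, ModelStatus.BETA}
```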

Support

For issues or questions, contact support@microdc.ai or visit the documentation at https://docs.microdc.ai