# Worker API Documentation
This document describes how workers interact with the MicroHub server to process jobs.
## Authentication
Workers authenticate using Bearer tokens obtained during registration:
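A minimal sketch, assuming the registration step returned a token stored in the `WORKER_TOKEN` environment variable:

```python
import os

import requests

# Token issued during worker registration (assumed stored in the environment)
worker_token = os.environ["WORKER_TOKEN"]

# Every worker API call carries the token as a Bearer credential
headers = {"Authorization": f"Bearer {worker_token}"}

response = requests.get(
    "http://localhost:8000/api/v1/workers/jobs/available",
    headers=headers,
)
response.raise_for_status()
```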
## API Base URL

All worker API endpoints use the `/api/v1/workers` prefix:

- Production: `https://api.microdc.ai/api/v1/workers`
- Development: `http://localhost:8000/api/v1/workers`
## Job Assignment System
The MicroHub platform uses a claim-based job assignment system to prevent multiple workers from competing for the same job. This ensures:
- No race conditions with hundreds of workers
- Jobs are only shown to workers that can handle them
- Fair distribution of work
- Atomic job claiming (only one worker gets each job)
### How It Works

1. **Check Available Jobs** - Worker requests a list of jobs matching its capabilities
2. **Claim a Job** - Worker atomically claims a specific job (first claim wins)
3. **Process** - Only the successful claimer processes the job
4. **Complete/Fail** - Submit results or report failure (one full pass through this cycle is sketched below)
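Concretely, a single pass through the cycle looks roughly like this. This is a sketch, not the full client: `process_job` stands in for your own processing logic, and a more complete worker appears at the end of this document.

```python
import time

import requests

def run_one_cycle(server_url: str, headers: dict) -> None:
    # 1. Check available jobs matching this worker's capabilities
    jobs = requests.get(
        f"{server_url}/api/v1/workers/jobs/available",
        headers=headers,
        params={"limit": 5},
    ).json()["jobs"]

    for job in jobs:
        # 2. Claim atomically - losing the race is normal, just try the next job
        claim = requests.post(
            f"{server_url}/api/v1/workers/jobs/{job['job_id']}/claim",
            headers=headers,
        ).json()
        if claim.get("status") != "claimed":
            continue

        # 3. Process only after a successful claim
        started = time.time()
        try:
            output = process_job(claim["job_data"])  # your own logic
        except Exception as exc:
            # 4b. Report failure so the job is not stuck on this worker
            requests.post(
                f"{server_url}/api/v1/workers/jobs/{job['job_id']}/fail",
                headers=headers,
                json={"reason": str(exc)},
            )
        else:
            # 4a. Submit the result
            requests.post(
                f"{server_url}/api/v1/workers/jobs/{job['job_id']}/complete",
                headers=headers,
                json={"output": output, "execution_time": time.time() - started},
            )
        break  # one job per cycle
```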
## API Endpoints

### 1. Get Available Jobs

**Endpoint:** `GET /api/v1/workers/jobs/available`

**Query Parameters:**

- `limit` (optional, default=5): Maximum number of jobs to return

**Description:** Returns jobs that match the worker's capabilities and are available to claim.
```python
import requests

response = requests.get(
    f"{server_url}/api/v1/workers/jobs/available",
    headers={"Authorization": f"Bearer {worker_token}"},
    params={"limit": 5},
)
data = response.json()
print(f"Found {data['count']} available jobs")
```
**Response:**

```json
{
  "jobs": [
    {
      "job_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
      "model": "llama2-7b",
      "type": "llm",
      "priority": "high",
      "estimated_reward": 0.0125,
      "created_at": "2024-01-01T12:00:00Z",
      "retry_count": 0,
      "expires_in": 30
    }
  ],
  "count": 1,
  "message": "Found 1 available jobs"
}
```
**Important Notes:**

- Jobs returned are filtered by the worker's supported models
- Jobs with `retry_count >= 3` are not shown
- Jobs are ordered by priority (descending), then creation time (ascending)
- These jobs are NOT reserved - you must claim them atomically
### 2. Claim a Job

**Endpoint:** `POST /api/v1/workers/jobs/{job_id}/claim`

**Description:** Atomically claim a job. Uses database row-level locking to ensure only one worker can claim each job.
```python
import requests

response = requests.post(
    f"{server_url}/api/v1/workers/jobs/{job_id}/claim",
    headers={"Authorization": f"Bearer {worker_token}"},
)
result = response.json()

if result["status"] == "claimed":
    job_data = result["job_data"]
    print(f"Successfully claimed job {job_data['job_id']}")
    # Start processing the job...
else:
    print(f"Could not claim job: {result['status']}")
```
**Success Response** (status=claimed):

```json
{
  "status": "claimed",
  "message": "Job successfully claimed",
  "job_data": {
    "job_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
    "assignment_id": "7b3d5f64-1234-4562-b3fc-2c963f66afa6",
    "type": "llm",
    "model": "llama2-7b",
    "payload": {
      "prompt": "Hello, how are you?",
      "max_tokens": 512,
      "temperature": 0.7
    },
    "llm_interaction_type": "generation",  // "generation" or "chat"
    "input_modalities": ["text"],          // Input types: text, image, audio, video
    "output_modalities": ["text"],         // Expected output types
    "priority": "high",
    "timeout": 300,
    "started_at": "2024-01-01T12:00:00Z",
    "must_complete_by": "2024-01-01T12:05:00Z"
  }
}
```
**Failure Response Statuses:**

- `already_claimed` - Another worker claimed the job
- `incompatible` - Worker doesn't support the required model
- `worker_busy` - Worker is already processing another job
- `404` - Job not found
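Losing a claim race is routine rather than an error. One way to branch on these outcomes, as a sketch reusing the `response` from the claim call above (`handle_job` is a hypothetical entry point into your own processing):

```python
if response.status_code == 404:
    pass  # Job disappeared (completed, expired, or cancelled) - move on
else:
    result = response.json()
    status = result.get("status")

    if status == "claimed":
        handle_job(result["job_data"])  # hypothetical processing entry point
    elif status == "already_claimed":
        pass  # Lost the race - try the next job in the list
    elif status == "incompatible":
        pass  # Skip: this worker does not serve the required model
    elif status == "worker_busy":
        pass  # Finish or fail the current job before claiming another
```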
### 3. Complete Job

**Endpoint:** `POST /api/v1/workers/jobs/{job_id}/complete`

**Description:** Submit successful job completion with results.
```python
import time

import requests

start_time = time.time()
result = process_job(job_data)  # Your processing logic
execution_time = time.time() - start_time

response = requests.post(
    f"{server_url}/api/v1/workers/jobs/{job_id}/complete",
    headers={"Authorization": f"Bearer {worker_token}"},
    json={
        "output": result,
        "execution_time": execution_time,
        "tokens_used": 150,  # Optional
    },
)
```
**Request Body:**

```json
{
  "output": {
    "text": "Generated response...",
    "finish_reason": "stop"
  },
  "execution_time": 2.5,
  "tokens_used": 150
}
```
**Response:**
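The response body for this endpoint is not shown here. The worker example later in this document reads an `earnings` field from it, so a shape roughly like the following is a reasonable assumption rather than an authoritative schema:

```json
{
  "status": "completed",  // assumed field, not confirmed by this document
  "earnings": 0.0125      // field read by the example worker below
}
```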
### 4. Report Job Failure

**Endpoint:** `POST /api/v1/workers/jobs/{job_id}/fail`

**Description:** Report that a job could not be completed.
```python
import traceback

import requests

response = requests.post(
    f"{server_url}/api/v1/workers/jobs/{job_id}/fail",
    headers={"Authorization": f"Bearer {worker_token}"},
    json={
        "reason": "Model loading failed: Out of memory",
        "error_details": traceback.format_exc(),  # Optional
    },
)
```
**Request Body:**
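The body carries the same fields shown in the snippet above; `error_details` is optional:

```json
{
  "reason": "Model loading failed: Out of memory",
  "error_details": "Traceback (most recent call last): ..."
}
```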
### 5. Worker Status Heartbeat

**Endpoint:** `POST /api/v1/workers/heartbeat`

**Description:** Send a periodic heartbeat to update worker status, resource usage, and system metrics. This should be sent every 60 seconds to maintain active status.

**IMPORTANT FOR LONG-RUNNING JOBS:** If a worker is processing a job that may exceed the timeout (default 300 seconds / 5 minutes), it MUST include `job_progress` in the heartbeat to reset the timeout and prevent the job from being reassigned. This allows jobs to run indefinitely as long as the worker reports progress (see the background-thread sketch after the notes below).

**Request Body:**
```json
{
  "status": "idle",                       // or "busy", "maintenance"
  "cpu_usage": 25.5,
  "memory_usage": 45.2,
  "gpu_usage": 0.0,                       // Optional, null if no GPU
  "active_jobs": 0,
  "completed_jobs_session": 5,
  "worker_version": "1.0.0",              // Optional: Worker software version
  "current_models": ["llama2-7b", "mistral-7b"],  // Optional: Currently available models
  "job_progress": {                       // IMPORTANT: Include this when processing a job to reset timeout
    "job_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
    "assignment_id": "7b3d5f64-1234-4562-b3fc-2c963f66afa6",
    "progress_percentage": 45.0,          // Optional: 0-100
    "message": "Loading model weights",   // Optional: Human-readable status
    "estimated_remaining_seconds": 120    // Optional: Estimated time to completion
  },
  "system_metrics": {                     // Optional: Detailed system metrics
    "load_average": [1.5, 2.0, 1.8],      // 1, 5, and 15 minute load averages
    "cpu_count": 16,
    "memory_total_gb": 64.0,
    "memory_available_gb": 32.5,
    "memory_percent": 49.2,
    "disk_total_gb": 1000.0,
    "disk_available_gb": 450.0,
    "disk_percent": 55.0,
    "gpu_memory_total_gb": 24.0,          // Optional
    "gpu_memory_used_gb": 8.5,            // Optional
    "gpu_temperature_c": 65.0,            // Optional: GPU temperature in Celsius
    "cpu_temperature_c": 58.0,            // Optional: CPU temperature in Celsius
    "network_sent_mb": 1024.5,            // Optional: Network data sent, in MB
    "network_recv_mb": 2048.3,            // Optional: Network data received, in MB
    "uptime_seconds": 86400
  },
  "supported_models": ["llama2-7b", "mistral-7b"]  // Legacy: Use current_models instead
}
```
**Response:**

```json
{
  "status": "acknowledged",
  "server_time": "2024-01-01T12:00:00Z",
  "next_heartbeat": 60,
  "model_sync": {
    "added": ["new-model"],
    "removed": ["old-model"],
    "updated": ["existing-model"]
  },
  "metrics_received": true,
  "progress_recorded": true  // true if job_progress was successfully recorded
}
```
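The `model_sync` block appears to describe how the worker's model list should change relative to the server's view. One plausible way to act on it, as a sketch assuming hypothetical `load_model`/`unload_model` helpers on the worker side:

```python
def apply_model_sync(model_sync: dict) -> None:
    # Pull models the server expects this worker to start serving
    for name in model_sync.get("added", []):
        load_model(name)       # hypothetical helper

    # Drop models the server no longer routes to this worker
    for name in model_sync.get("removed", []):
        unload_model(name)     # hypothetical helper

    # Reload models whose configuration changed
    for name in model_sync.get("updated", []):
        unload_model(name)
        load_model(name)
```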
**Notes:**

- The `current_models` field should list all models currently available on the worker
- System metrics help the server monitor worker health and resource availability
- All temperature values should be in Celsius
- Load average follows Unix convention: [1-minute, 5-minute, 15-minute]
- Network metrics are cumulative since worker start
- **Job Progress:** When processing a job, include `job_progress` to extend the timeout. Each progress update resets the timeout window, allowing jobs to run longer than the initial timeout limit (a background-thread sketch follows this list).
- **Timeout Handling:** If no progress is reported, jobs will time out based on `timeout_seconds` (default 300s). Including progress updates prevents timeout and job reassignment.
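For long-running jobs, the timeout-reset behaviour is easiest to get right with a background thread that heartbeats with `job_progress` while the main thread processes. A minimal sketch, assuming `server_url`, `worker_token`, `process_job`, and the claimed `job_data` from section 2:

```python
import threading
import time

import requests

def progress_heartbeat(server_url: str, worker_token: str,
                       job_data: dict, stop: threading.Event) -> None:
    """Heartbeat every 60s with job_progress so the job is not reassigned."""
    headers = {"Authorization": f"Bearer {worker_token}"}
    while not stop.wait(timeout=60):
        requests.post(
            f"{server_url}/api/v1/workers/heartbeat",
            headers=headers,
            json={
                "status": "busy",
                "cpu_usage": 0.0,    # plug in real metrics in production
                "memory_usage": 0.0,
                "active_jobs": 1,
                "completed_jobs_session": 0,
                "job_progress": {
                    "job_id": job_data["job_id"],
                    "assignment_id": job_data["assignment_id"],
                    "message": "Processing",
                },
            },
        )

# Usage around a slow job:
stop = threading.Event()
t = threading.Thread(
    target=progress_heartbeat,
    args=(server_url, worker_token, job_data, stop),
    daemon=True,
)
t.start()
try:
    result = process_job(job_data)  # may run far longer than the 300s default
finally:
    stop.set()
    t.join()
```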
### 6. Optional Job Endpoints

#### Send Job Heartbeat

**Endpoint:** `POST /api/v1/workers/jobs/{job_id}/heartbeat`

Keep a job alive and report progress (not currently implemented in the server).

#### Release Job

**Endpoint:** `POST /api/v1/workers/jobs/{job_id}/release`

Release a claimed job back to the queue if unable to process it (not currently implemented in the server).
## Job Processing

Execute the job based on its type, interaction type, and modalities:

```python
def process_job(job):
    job_type = job["type"]
    model = job["model"]
    payload = job["payload"]
    llm_interaction_type = job.get("llm_interaction_type")
    input_modalities = job.get("input_modalities", ["text"])
    output_modalities = job.get("output_modalities", ["text"])

    if job_type == "llm":
        # Route based on LLM interaction type
        if llm_interaction_type == "chat":
            # Chat-based interaction (messages format)
            result = run_chat_inference(
                model=model,
                messages=payload["messages"],
                max_tokens=payload.get("max_tokens", 512),
                temperature=payload.get("temperature", 0.7),
                input_modalities=input_modalities,
                output_modalities=output_modalities,
            )
        elif llm_interaction_type == "generation":
            # Text generation (completion format)
            result = run_llm_inference(
                model=model,
                prompt=payload["prompt"],
                max_tokens=payload.get("max_tokens", 512),
                temperature=payload.get("temperature", 0.7),
                input_modalities=input_modalities,
                output_modalities=output_modalities,
            )
        else:
            raise ValueError(f"Unknown LLM interaction type: {llm_interaction_type}")
        return result
    elif job_type == "embed":
        # Generate embeddings
        result = generate_embeddings(
            model=model,
            texts=payload["texts"],
        )
        return result
    # Add more job types as needed
```
## Multimodal Job Examples

### Text-to-Image Generation

```python
if job_type == "llm" and "image" in output_modalities:
    # Input: text, Output: image
    result = generate_image(
        model=model,
        prompt=payload["prompt"],
        size=payload.get("size", "1024x1024"),
    )
    return {
        "image_url": result.url,
        "image_data": result.base64,  # or URL
    }
```
### Vision Model (Image + Text → Text)

```python
if "image" in input_modalities and "text" in input_modalities:
    # Multimodal input processing
    result = process_vision_model(
        model=model,
        text=payload["prompt"],
        images=payload["images"],  # Base64 or URLs
        max_tokens=payload.get("max_tokens", 512),
    )
    return {
        "text": result.text,
        "finish_reason": "stop",
    }
```
### Audio Transcription

```python
if "audio" in input_modalities:
    # Audio to text
    result = transcribe_audio(
        model=model,
        audio_data=payload["audio"],  # Base64 or URL
        language=payload.get("language", "en"),
    )
    return {
        "text": result.transcription,
        "language": result.detected_language,
    }
```
### Video Analysis

```python
if "video" in input_modalities:
    # Video to text/image analysis
    result = analyze_video(
        model=model,
        video_url=payload["video_url"],
        analysis_type=payload.get("analysis_type", "description"),
    )
    return {
        "analysis": result.text,
        "keyframes": result.keyframes if "image" in output_modalities else None,
    }
```
## Complete Python Worker Example

```python
import logging
import time
import traceback
from typing import Any, Dict, List, Optional

import requests


class MicroHubWorker:
    def __init__(self, server_url: str, worker_token: str):
        self.server_url = server_url.rstrip('/')
        self.worker_token = worker_token
        self.headers = {
            "Authorization": f"Bearer {worker_token}",
            "Content-Type": "application/json"
        }
        self.logger = logging.getLogger(__name__)

    def get_available_jobs(self, limit: int = 5) -> List[Dict[str, Any]]:
        """Get list of jobs this worker can handle"""
        try:
            response = requests.get(
                f"{self.server_url}/api/v1/workers/jobs/available",
                headers=self.headers,
                params={"limit": limit}
            )
            if response.status_code == 200:
                data = response.json()
                return data.get("jobs", [])
            return []
        except Exception as e:
            self.logger.error(f"Error getting available jobs: {e}")
            return []

    def claim_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Attempt to claim a specific job"""
        try:
            response = requests.post(
                f"{self.server_url}/api/v1/workers/jobs/{job_id}/claim",
                headers=self.headers
            )
            if response.status_code == 200:
                data = response.json()
                if data["status"] == "claimed":
                    self.logger.info(f"Successfully claimed job {job_id}")
                    return data["job_data"]
                else:
                    self.logger.info(f"Could not claim job {job_id}: {data['status']}")
            return None
        except Exception as e:
            self.logger.error(f"Error claiming job: {e}")
            return None

    def send_worker_heartbeat(self, status: str = "idle", active_jobs: int = 0,
                              cpu_usage: float = 0, memory_usage: float = 0,
                              gpu_usage: Optional[float] = None,
                              completed_jobs_session: int = 0,
                              supported_models: Optional[List[str]] = None,
                              job_progress: Optional[Dict[str, Any]] = None) -> bool:
        """Send worker status heartbeat (every 60 seconds)

        Args:
            job_progress: Optional dict with job_id, assignment_id, and optional
                progress_percentage, message, estimated_remaining_seconds.
                Including this resets the job timeout.
        """
        try:
            payload = {
                "status": status,  # "idle", "busy", or "maintenance"
                "cpu_usage": cpu_usage,
                "memory_usage": memory_usage,
                "gpu_usage": gpu_usage,
                "active_jobs": active_jobs,
                "completed_jobs_session": completed_jobs_session
            }
            if supported_models:
                payload["supported_models"] = supported_models
            if job_progress:
                payload["job_progress"] = job_progress

            response = requests.post(
                f"{self.server_url}/api/v1/workers/heartbeat",
                headers=self.headers,
                json=payload
            )
            if response.status_code == 200:
                self.logger.debug("Worker heartbeat sent successfully")
                return True
            else:
                self.logger.error(f"Worker heartbeat failed: {response.text}")
                return False
        except Exception as e:
            self.logger.error(f"Error sending worker heartbeat: {e}")
            return False

    def send_job_heartbeat(self, job_id: str, progress: Optional[float] = None,
                           message: Optional[str] = None) -> bool:
        """Send heartbeat to keep job alive (not yet implemented in server)"""
        try:
            payload = {}
            if progress is not None:
                payload["progress"] = progress
            if message:
                payload["message"] = message
            response = requests.post(
                f"{self.server_url}/api/v1/workers/jobs/{job_id}/heartbeat",
                headers=self.headers,
                json=payload
            )
            return response.status_code == 200
        except Exception as e:
            self.logger.error(f"Error sending job heartbeat: {e}")
            return False

    def complete_job(self, job_id: str, result: Dict[str, Any],
                     execution_time: float, tokens_used: Optional[int] = None) -> bool:
        """Submit successful job completion"""
        try:
            payload = {
                "output": result,
                "execution_time": execution_time
            }
            if tokens_used is not None:
                payload["tokens_used"] = tokens_used
            response = requests.post(
                f"{self.server_url}/api/v1/workers/jobs/{job_id}/complete",
                headers=self.headers,
                json=payload
            )
            if response.status_code == 200:
                data = response.json()
                self.logger.info(f"Job completed! Earned: ${data.get('earnings', 0)}")
                return True
            else:
                self.logger.error(f"Failed to complete job: {response.text}")
                return False
        except Exception as e:
            self.logger.error(f"Error completing job: {e}")
            return False

    def fail_job(self, job_id: str, reason: str, details: Optional[str] = None) -> bool:
        """Report job failure"""
        try:
            payload = {"reason": reason}
            if details:
                payload["error_details"] = details
            response = requests.post(
                f"{self.server_url}/api/v1/workers/jobs/{job_id}/fail",
                headers=self.headers,
                json=payload
            )
            return response.status_code == 200
        except Exception as e:
            self.logger.error(f"Error reporting job failure: {e}")
            return False

    def process_job(self, job: Dict[str, Any]) -> Dict[str, Any]:
        """Process a job based on its type, interaction type, and modalities.

        Override this method with your actual processing logic.
        """
        job_type = job["type"]
        model = job["model"]
        payload = job["payload"]
        llm_interaction_type = job.get("llm_interaction_type")
        input_modalities = job.get("input_modalities", ["text"])
        output_modalities = job.get("output_modalities", ["text"])

        # This is where you implement your actual processing logic
        if job_type == "llm":
            # Route based on LLM interaction type
            if llm_interaction_type == "chat":
                # Chat-based interaction
                return {
                    "text": f"Chat response to: {payload.get('messages', [])}",
                    "finish_reason": "stop"
                }
            elif llm_interaction_type == "generation":
                # Text generation
                return {
                    "text": f"Generated from: {payload.get('prompt', '')}",
                    "finish_reason": "stop"
                }
            # Handle multimodal cases
            if "image" in output_modalities:
                # Text-to-image generation
                return {"image_url": "https://example.com/generated.png"}
            raise ValueError(f"Unsupported LLM interaction type: {llm_interaction_type}")
        elif job_type == "embed":
            # Example embedding processing
            return {
                "embeddings": [[0.1] * 768]  # Replace with actual embeddings
            }
        else:
            raise ValueError(f"Unsupported job type: {job_type}")

    def run(self, poll_interval: int = 10, heartbeat_interval: int = 60):
        """Main worker loop with claim-based assignment"""
        self.logger.info("Worker started (claim-based system)")
        last_heartbeat = 0
        current_job = None

        while True:
            try:
                # Send periodic heartbeat (every 60s)
                if time.time() - last_heartbeat > heartbeat_interval:
                    job_progress = None
                    if current_job:
                        # Include job progress to reset timeout for long-running jobs
                        job_progress = {
                            "job_id": current_job["job_id"],
                            "assignment_id": current_job["assignment_id"],
                            "progress_percentage": current_job.get("progress", 0),
                            "message": current_job.get("status_message", "Processing"),
                        }
                    self.send_worker_heartbeat(
                        status="busy" if current_job else "idle",
                        active_jobs=1 if current_job else 0,
                        cpu_usage=0,  # TODO: Get actual metrics
                        memory_usage=0,
                        job_progress=job_progress
                    )
                    last_heartbeat = time.time()

                # Get available jobs (not assigned)
                available_jobs = self.get_available_jobs(limit=5)
                if available_jobs:
                    self.logger.info(f"Found {len(available_jobs)} available jobs")

                    # Try to claim jobs in priority order
                    for job_info in available_jobs:
                        job_id = job_info["job_id"]

                        # Attempt to claim this job
                        job_data = self.claim_job(job_id)
                        if job_data:
                            # We got the job!
                            self.logger.info(f"Processing job {job_id}")
                            current_job = job_data
                            current_job["progress"] = 0

                            # Process the job
                            start_time = time.time()
                            try:
                                result = self.process_job(job_data)
                                execution_time = time.time() - start_time

                                # Submit result
                                if self.complete_job(job_id, result, execution_time):
                                    self.logger.info(f"Successfully completed job: {job_id}")
                                    current_job = None
                                    # Short delay before looking for the next job
                                    time.sleep(2)
                                else:
                                    self.logger.error(f"Failed to submit job result: {job_id}")
                            except Exception as e:
                                # Report failure
                                self.logger.error(f"Job processing failed: {e}")
                                self.fail_job(job_id, str(e), traceback.format_exc())
                                current_job = None

                            # After processing one job, check for more
                            break
                        else:
                            # Someone else claimed this job, try the next one
                            continue
                else:
                    self.logger.debug("No available jobs")

                # Wait before next poll
                time.sleep(poll_interval)

            except KeyboardInterrupt:
                self.logger.info("Worker stopped by user")
                break
            except Exception as e:
                self.logger.error(f"Unexpected error: {e}")
                time.sleep(poll_interval)


# Usage
if __name__ == "__main__":
    import os

    logging.basicConfig(level=logging.INFO)

    worker = MicroHubWorker(
        server_url=os.getenv("MICROHUB_SERVER_URL", "http://localhost:8000"),
        worker_token=os.getenv("WORKER_TOKEN")
    )

    # Use claim-based system with fast polling
    worker.run(poll_interval=10)
```
## Important Notes

- **Execution Time:** Always track and report execution time accurately for billing purposes
- **Error Handling:** Always wrap processing in try/except blocks and report failures properly so jobs don't get stuck
- **Polling Interval:** Respect the `next_poll` value in responses to avoid overloading the server
- **Token Management:** Keep your worker token secure and never share it
- **Result Format:** Ensure your output matches the expected format for the job type
- **Timeouts:** Respect the timeout value provided with each job
- **Idempotency:** Jobs may be retried; ensure your processing is idempotent where possible (a caching sketch follows this list)
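One way to approximate idempotency is to key any cached or persisted output by `job_id`, so a retried job re-submits the previously computed result instead of recomputing it. A sketch using a local on-disk cache (an illustration, not part of the API; `process_job` is your own logic):

```python
import json
from pathlib import Path

CACHE_DIR = Path("/tmp/microhub-results")  # illustrative location

def process_job_idempotent(job_data: dict) -> dict:
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    cache_file = CACHE_DIR / f"{job_data['job_id']}.json"

    # A retried job with the same job_id returns the previously computed output
    if cache_file.exists():
        return json.loads(cache_file.read_text())

    output = process_job(job_data)  # your own logic
    cache_file.write_text(json.dumps(output))
    return output
```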
## HTTP Status Codes

- `200`: Success
- `401`: Authentication failed (invalid or expired token)
- `404`: Job not found or not assigned to worker
- `400`: Bad request (invalid data format)
- `429`: Rate limited (too many requests)
- `500`: Server error
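A small wrapper that reacts to these codes might look like the following sketch; the backoff durations are illustrative, not prescribed by the API:

```python
import time

import requests

def worker_request(method: str, url: str, headers: dict, **kwargs) -> requests.Response:
    response = requests.request(method, url, headers=headers, **kwargs)
    if response.status_code == 401:
        # Token invalid or expired - re-register rather than retrying blindly
        raise RuntimeError("Worker token rejected; re-registration required")
    if response.status_code == 429:
        # Rate limited - back off before the caller retries
        time.sleep(10)
    elif response.status_code >= 500:
        # Transient server error - a brief pause before retrying is usually enough
        time.sleep(5)
    return response
```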
## Rate Limits
- Poll requests: Max 60 per minute
- Job completions: No limit
- Other endpoints: 100 requests per minute
## Model Management

### Model Registration and Approval Process

Workers can register support for models, which then go through an approval process:

1. **Worker Registration:** When registering, workers can specify models they support
2. **Model Request:** Models start with status `REQUESTED`
3. **Admin Review:** Admins review model requests in the `/model-requests` UI
4. **Approval Process:**
    - Admin clicks "Approve" on a model request
    - Fills in configuration (size, requirements, pricing)
    - Can use quick presets (Small/Medium/Large/XL)
    - Model becomes `AVAILABLE` and appears on the `/models` page
5. **Worker Support:** Once approved, workers can process jobs for that model
### Model Status Values

- `REQUESTED`: Initial request from user/worker
- `PENDING`: Awaiting admin review
- `IN_REVIEW`: Admin is actively reviewing
- `APPROVED`: Approved with configuration (transitions to AVAILABLE)
- `AVAILABLE`: Active and available for use
- `REJECTED`: Request was rejected with reason
- `UNAVAILABLE`: Model is offline/not currently hosted
- `DEPRECATED`: No longer supported
- `BETA`: In beta testing
## Support

For issues or questions, contact support@microdc.ai or visit the documentation at https://docs.microdc.ai.