Execution Modes

Learn about different ways to execute workflows: synchronous, asynchronous, and streaming modes.

Overview

The InvokeAI Python Client supports multiple execution modes:

Mode         | Use Case                               | Blocking | Events
Synchronous  | Simple scripts, sequential processing  | Yes      | No
Asynchronous | Concurrent operations, web apps        | No       | Yes
Streaming    | Real-time monitoring, progress UI      | No       | Yes
Hybrid       | Blocking with progress updates         | Yes      | Yes

Synchronous Execution

Basic Sync Pattern

The simplest way to execute workflows:

# Submit and wait sequentially
submission = wf.submit_sync()
result = wf.wait_for_completion_sync(timeout=120)

if result.get('status') == 'completed':
    print("Success!")
    mappings = wf.map_outputs_to_images(result)

With Progress Callback

def on_progress(queue_item):
    """Called periodically during execution."""
    status = queue_item.get('status')
    progress = queue_item.get('progress_percentage', 0)
    current = queue_item.get('current_step', 0)
    total = queue_item.get('total_steps', 0)

    print(f"[{progress:3.0f}%] Status: {status} ({current}/{total})")

# Submit with progress monitoring
submission = wf.submit_sync()
result = wf.wait_for_completion_sync(
    timeout=180,
    poll_interval=2.0,
    progress_callback=on_progress
)

Complete Sync Example

def run_workflow_sync(client, workflow_path, inputs):
    """Complete synchronous workflow execution."""
    # Load workflow
    wf = client.workflow_repo.create_workflow(
        WorkflowDefinition.from_file(workflow_path)
    )

    # Sync models
    wf.sync_dnn_model(by_name=True, by_base=True)

    # Set inputs
    for index, value in inputs.items():
        field = wf.get_input_value(index)
        if hasattr(field, 'value'):
            field.value = value

    # Submit and wait
    print("Submitting workflow...")
    submission = wf.submit_sync()

    print("Waiting for completion...")
    result = wf.wait_for_completion_sync(
        timeout=300,
        progress_callback=lambda q: print(f"Progress: {q.get('progress_percentage', 0)}%")
    )

    # Check result
    if result.get('status') == 'completed':
        print(" Workflow completed successfully")
        return wf.map_outputs_to_images(result)
    else:
        print(f" Workflow failed: {result.get('status')}")
        return None
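
A possible invocation, assuming a running InvokeAI instance at the default URL and that InvokeAIClient and WorkflowDefinition are imported from the client library; the workflow path and input mapping below are placeholders for your own workflow:

client = InvokeAIClient.from_url("http://localhost:9090")

mappings = run_workflow_sync(
    client,
    "workflows/text-to-image.json",        # hypothetical exported workflow
    {0: "A misty mountain lake at dawn"},  # input index -> value
)
if mappings:
    print(f"Generated {len(mappings)} output mapping(s)")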

Asynchronous Execution

Basic Async Pattern

For concurrent operations:

import asyncio

async def run_workflow_async(wf):
    """Async workflow execution."""
    # Submit asynchronously
    submission = await wf.submit()
    print(f"Submitted: {submission['batch_id']}")

    # Wait for completion
    result = await wf.wait_for_completion(timeout=180)

    # Map outputs
    if result.get('status') == 'completed':
        mappings = wf.map_outputs_to_images(result)
        return mappings

    return None

# Run async
async def main():
    wf = client.workflow_repo.create_workflow(workflow_def)
    result = await run_workflow_async(wf)
    print(f"Result: {result}")

asyncio.run(main())

Concurrent Workflows

async def process_batch_async(client, workflow_def, prompts):
    """Process multiple prompts concurrently."""
    tasks = []

    for prompt in prompts:
        # Create separate workflow instance
        wf = client.workflow_repo.create_workflow(workflow_def)
        wf.get_input_value(0).value = prompt

        # Create task
        task = run_workflow_async(wf)
        tasks.append(task)

    # Wait for all to complete
    results = await asyncio.gather(*tasks)

    return results

# Process batch
async def main():
    prompts = ["Sunset", "Mountains", "Ocean", "Forest"]
    results = await process_batch_async(client, workflow_def, prompts)

    for prompt, result in zip(prompts, results):
        print(f"{prompt}: {len(result) if result else 0} images")

asyncio.run(main())

With Event Handling

async def run_with_events(wf):
    """Execute with event subscription."""
    # Submit with event subscription
    submission = await wf.submit(subscribe_events=True)

    # Event handler
    async def handle_event(event):
        event_type = event.get('type')

        if event_type == 'generator_progress':
            step = event.get('step', 0)
            total = event.get('total_steps', 0)
            print(f"Progress: {step}/{total}")

        elif event_type == 'invocation_complete':
            node = event.get('node_id')
            print(f"Node completed: {node}")

        elif event_type == 'session_complete':
            print("Session finished!")

    # Subscribe to events
    wf.on_event(handle_event)

    # Wait for completion
    result = await wf.wait_for_completion()
    return result
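
A minimal way to drive this coroutine, assuming client and workflow_def have been set up as in the earlier examples:

async def main():
    wf = client.workflow_repo.create_workflow(workflow_def)
    result = await run_with_events(wf)
    print(f"Final status: {result.get('status')}")

asyncio.run(main())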

Streaming Execution

Hybrid Streaming

Monitor progress while blocking:

async def submit_sync_monitor_async(wf):
    """Submit synchronously but monitor asynchronously."""
    # Submit
    submission = wf.submit_sync()

    # Stream events
    async for event in wf.stream_events(submission['session_id']):
        event_type = event.get('type')

        if event_type == 'generator_progress':
            progress = event.get('progress_percentage', 0)
            yield {'type': 'progress', 'value': progress}

        elif event_type == 'invocation_complete':
            node = event.get('node', {}).get('type')
            yield {'type': 'node_complete', 'node': node}

        elif event_type == 'session_complete':
            yield {'type': 'complete', 'status': event.get('status')}
            break

# Use streaming
async def monitor_workflow():
    async for update in submit_sync_monitor_async(wf):
        if update['type'] == 'progress':
            print(f"Progress: {update['value']}%")
        elif update['type'] == 'complete':
            print(f"Finished: {update['status']}")

WebSocket Streaming

import socketio

class WorkflowMonitor:
    """Real-time workflow monitoring via WebSocket."""

    def __init__(self, base_url):
        self.sio = socketio.AsyncClient()
        self.base_url = base_url
        self.events = []

        # Register handlers
        self.sio.on('generator_progress', self.on_progress)
        self.sio.on('invocation_complete', self.on_node_complete)
        self.sio.on('session_complete', self.on_complete)

    async def connect(self):
        await self.sio.connect(self.base_url)

    async def monitor_session(self, session_id):
        """Monitor a specific session."""
        await self.sio.emit('subscribe_session', {'session_id': session_id})

    async def on_progress(self, data):
        step = data.get('step', 0)
        total = data.get('total_steps', 0)
        print(f"Progress: {step}/{total}")
        self.events.append(('progress', data))

    async def on_node_complete(self, data):
        node_id = data.get('node_id')
        print(f"Node complete: {node_id}")
        self.events.append(('node', data))

    async def on_complete(self, data):
        status = data.get('status')
        print(f"Session complete: {status}")
        self.events.append(('complete', data))
        await self.sio.disconnect()

# Use monitor
import asyncio

async def main():
    monitor = WorkflowMonitor("http://localhost:9090")
    await monitor.connect()
    await monitor.monitor_session(submission['session_id'])
    await monitor.sio.wait()  # keep listening until on_complete disconnects

asyncio.run(main())

Execution Control

Timeouts

# Short timeout for fast models
result = wf.wait_for_completion_sync(timeout=30)

# Long timeout for complex workflows
result = wf.wait_for_completion_sync(timeout=600)

# No timeout (wait indefinitely)
result = wf.wait_for_completion_sync(timeout=None)

# Handle timeout
try:
    submission = wf.submit_sync()
    result = wf.wait_for_completion_sync(timeout=60)
except TimeoutError:
    print("Workflow timed out after 60 seconds")
    # Optionally cancel the queued item
    client.cancel_workflow(submission['item_id'])

Polling Intervals

# Fast polling for real-time feel
result = wf.wait_for_completion_sync(
    timeout=120,
    poll_interval=0.5  # Check every 500ms
)

# Slow polling to reduce load
result = wf.wait_for_completion_sync(
    timeout=300,
    poll_interval=5.0  # Check every 5 seconds
)

# Adaptive polling
def adaptive_poll_interval(attempt):
    """Increase interval over time."""
    if attempt < 10:
        return 1.0  # Fast initially
    elif attempt < 30:
        return 2.0  # Medium
    else:
        return 5.0  # Slow for long runs
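
wait_for_completion_sync takes a fixed poll_interval, so using the helper above requires a manual polling loop. A sketch of such a loop, assuming a hypothetical fetch_queue_item(client, item_id) helper that returns the current queue item dict:

import time

def wait_with_adaptive_polling(client, item_id, timeout=300):
    """Poll a queue item, backing off as the run gets longer."""
    start = time.time()
    attempt = 0
    while time.time() - start < timeout:
        item = fetch_queue_item(client, item_id)  # hypothetical status helper
        if item.get('status') in ('completed', 'failed', 'canceled'):
            return item
        time.sleep(adaptive_poll_interval(attempt))
        attempt += 1
    raise TimeoutError(f"Queue item {item_id} did not finish within {timeout}s")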

Cancellation

def run_with_cancellation(wf, cancel_event, timeout=120):
    """Run workflow with cancellation support."""

    def check_cancelled(_queue_item):
        """Raise if cancellation was requested from another thread."""
        if cancel_event.is_set():
            raise InterruptedError("Workflow cancelled")

    submission = None
    try:
        # Submit
        submission = wf.submit_sync()

        # Wait, checking for cancellation on every poll
        result = wf.wait_for_completion_sync(
            timeout=timeout,
            progress_callback=check_cancelled
        )

        return result

    except InterruptedError:
        # Cancel on server
        if submission:
            client.cancel_workflow(submission['item_id'])
        return None

# Cancel from another thread
import threading
import time

def cancel_after(seconds, event):
    time.sleep(seconds)
    event.set()

cancelled = threading.Event()
threading.Thread(target=cancel_after, args=(30, cancelled)).start()
result = run_with_cancellation(wf, cancelled)

Queue Management

Queue Status

def get_queue_status(client, queue_id="default"):
    """Get queue statistics."""
    response = client._make_request("GET", f"/queue/{queue_id}/status")
    status = response.json()

    print(f"Queue: {queue_id}")
    print(f"  Pending: {status.get('queue_pending_count', 0)}")
    print(f"  In Progress: {status.get('queue_in_progress_count', 0)}")
    print(f"  Completed: {status.get('queue_completed_count', 0)}")
    print(f"  Failed: {status.get('queue_failed_count', 0)}")

    return status

Queue Position

def get_queue_position(client, item_id, queue_id="default"):
    """Get position in queue."""
    response = client._make_request(
        "GET", 
        f"/queue/{queue_id}/i/{item_id}/position"
    )
    position = response.json()

    print(f"Position: {position.get('position', 'unknown')}")
    print(f"Ahead: {position.get('ahead_count', 0)}")
    print(f"ETA: {position.get('estimated_time', 'unknown')}")

    return position
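
Usage sketch, assuming the submission dict returned by submit_sync() carries the queue item_id (as in the cancellation example above):

submission = wf.submit_sync()
position = get_queue_position(client, submission['item_id'])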

Priority Submission

def submit_with_priority(wf, priority="normal"):
    """Submit with queue priority."""
    # Priority levels: low, normal, high, urgent

    submission_data = wf._build_submission()
    submission_data['priority'] = priority

    response = client._make_request(
        "POST",
        "/queue/default/enqueue_batch",
        json=submission_data
    )

    return response.json()

# High priority submission
submission = submit_with_priority(wf, priority="high")

Performance Patterns

Batch Processing

def process_batch(client, workflow_def, items, batch_size=5):
    """Process items in batches."""
    from itertools import islice

    def chunks(iterable, size):
        it = iter(iterable)
        while True:
            chunk = list(islice(it, size))
            if not chunk:
                break
            yield chunk

    all_results = []

    for batch_num, batch in enumerate(chunks(items, batch_size)):
        print(f"Processing batch {batch_num + 1}")

        # Process batch concurrently
        batch_results = []
        for item in batch:
            wf = client.workflow_repo.create_workflow(workflow_def)
            # Set inputs from item
            wf.get_input_value(0).value = item['prompt']

            # Submit
            submission = wf.submit_sync()
            batch_results.append((wf, submission))

        # Wait for batch completion
        for wf, submission in batch_results:
            result = wf.wait_for_completion_sync(timeout=180)
            all_results.append(result)

    return all_results
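
Example usage with placeholder prompts; each item dict carries the prompt key that the loop above reads:

items = [{'prompt': p} for p in ["Sunset", "Mountains", "Ocean", "Forest", "Desert"]]
results = process_batch(client, workflow_def, items, batch_size=3)

completed = sum(1 for r in results if r and r.get('status') == 'completed')
print(f"Completed {completed}/{len(results)} workflows")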

Pipeline Pattern

class WorkflowPipeline:
    """Chain multiple workflows together."""

    def __init__(self, client):
        self.client = client
        self.stages = []

    def add_stage(self, workflow_def, input_mapper):
        """Add pipeline stage."""
        self.stages.append({
            'workflow': workflow_def,
            'mapper': input_mapper
        })
        return self

    def execute(self, initial_input):
        """Execute pipeline."""
        current_output = initial_input

        for i, stage in enumerate(self.stages):
            print(f"Stage {i + 1}/{len(self.stages)}")

            # Create workflow
            wf = self.client.workflow_repo.create_workflow(
                stage['workflow']
            )

            # Map inputs
            stage['mapper'](wf, current_output)

            # Execute
            submission = wf.submit_sync()
            result = wf.wait_for_completion_sync()

            # Get output for next stage
            current_output = wf.map_outputs_to_images(result)

        return current_output

# Use pipeline (assignment is not allowed inside a lambda, so use setattr)
pipeline = WorkflowPipeline(client)
pipeline.add_stage(
    workflow1,
    lambda wf, inp: setattr(wf.get_input_value(0), 'value', inp)
)
pipeline.add_stage(
    workflow2,
    lambda wf, inp: setattr(wf.get_input_value(1), 'value', inp[0]['image_names'][0])
)
result = pipeline.execute("Initial prompt")

Error Handling

Retry Logic

def execute_with_retry(wf, max_retries=3, backoff=2.0):
    """Execute with automatic retry."""
    import time

    for attempt in range(max_retries):
        try:
            submission = wf.submit_sync()
            result = wf.wait_for_completion_sync(timeout=180)

            if result.get('status') == 'completed':
                return result
            elif result.get('status') == 'failed':
                error = result.get('error', 'Unknown error')
                print(f"Attempt {attempt + 1} failed: {error}")

                if attempt < max_retries - 1:
                    wait_time = backoff ** attempt
                    print(f"Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                else:
                    raise Exception(f"Failed after {max_retries} attempts")

        except TimeoutError:
            print(f"Attempt {attempt + 1} timed out")
            if attempt == max_retries - 1:
                raise

    return None

Error Recovery

def execute_with_recovery(wf):
    """Execute with error recovery."""
    import time

    try:
        submission = wf.submit_sync()
        result = wf.wait_for_completion_sync(timeout=120)
        return result

    except TimeoutError:
        print("Timeout - attempting recovery")
        # Try with reduced quality
        wf.get_input_value(STEPS_IDX).value = 10  # Reduce steps
        wf.get_input_value(WIDTH_IDX).value = 512  # Reduce size
        wf.get_input_value(HEIGHT_IDX).value = 512

        # Retry with lower settings
        submission = wf.submit_sync()
        return wf.wait_for_completion_sync(timeout=60)

    except ConnectionError:
        print("Connection lost - waiting for reconnection")
        time.sleep(10)
        # Recreate client and retry
        client = InvokeAIClient.from_url("http://localhost:9090")
        wf = client.workflow_repo.create_workflow(workflow_def)
        return execute_with_recovery(wf)

Monitoring & Logging

Execution Logger

import logging
from datetime import datetime

class ExecutionLogger:
    """Log workflow execution details."""

    def __init__(self, log_file="workflow.log"):
        self.logger = logging.getLogger("workflow")
        handler = logging.FileHandler(log_file)
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_submission(self, submission):
        self.logger.info(f"Submitted: {submission.get('batch_id')}")

    def log_progress(self, queue_item):
        progress = queue_item.get('progress_percentage', 0)
        self.logger.info(f"Progress: {progress}%")

    def log_completion(self, result):
        status = result.get('status')
        if status == 'completed':
            self.logger.info("Workflow completed successfully")
        else:
            self.logger.error(f"Workflow failed: {status}")

# Use logger
logger = ExecutionLogger()
submission = wf.submit_sync()
logger.log_submission(submission)

result = wf.wait_for_completion_sync(
    progress_callback=logger.log_progress
)
logger.log_completion(result)

Best Practices

1. Choose the Right Mode

# Simple, one-off generation
wf.submit_sync()
result = wf.wait_for_completion_sync()

# Multiple concurrent generations (one workflow instance per run)
results = await asyncio.gather(*[
    run_workflow_async(client.workflow_repo.create_workflow(workflow_def))
    for _ in range(10)
])

# Real-time UI updates
async for event in wf.stream_events(submission['session_id']):
    update_ui(event)

2. Handle All Status Types

status_handlers = {
    'completed': lambda r: process_results(r),
    'failed': lambda r: log_error(r),
    'canceled': lambda r: cleanup(r),
    'pending': lambda r: wait_more(r)
}

status = result.get('status')
handler = status_handlers.get(status, lambda r: print(f"Unknown: {status}"))
handler(result)

3. Resource Cleanup

try:
    result = wf.wait_for_completion_sync(timeout=120)
finally:
    # Always cleanup
    if submission:
        # Delete uploaded images
        for img in uploaded_images:
            client.delete_image(img)

Next Steps