Output Mapping¶
Learn how to extract and map workflow outputs to meaningful results.
Overview¶
Output mapping is essential for:

- Result extraction: getting generated images from workflow outputs
- Multi-output handling: managing workflows with multiple outputs
- Metadata preservation: maintaining generation parameters
- Batch result processing: handling results from batch operations
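All of these tasks follow the same basic pattern: run the workflow, then walk the raw result into the pieces you care about. A minimal preview, using helpers defined later on this page:

# End-to-end preview: submit, wait, then map the result
submission = wf.submit_sync()
result = wf.wait_for_completion_sync(submission)

images = map_outputs_to_images(result)       # see "Basic Output Mapping"
meta = extract_generation_metadata(result)   # see "Metadata Extraction"
print(f"Generated {len(images)} image(s), seed={meta['seed']}")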
Understanding Workflow Outputs¶
Output Structure¶
# Workflow output structure
result = {
'status': 'COMPLETED',
'session_id': 'abc123',
'invocation_outputs': [
{
'invocation_id': 'node_123',
'invocation_type': 'l2i', # Latents to image
'outputs': {
'image': {
'image_name': 'output_xyz.png',
'width': 1024,
'height': 1024,
'board_id': 'results_board'
}
}
},
# More invocation outputs...
]
}
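For quick, ad-hoc inspection you can walk this structure directly; the mapping helpers later on this page wrap the same traversal with error handling. A minimal sketch that pulls the first image name from a result shaped like the example above:

# Grab the first image name (minimal sketch; prefer the mapping
# helpers below for anything beyond quick inspection)
first_image = None
for inv in result.get('invocation_outputs', []):
    image = inv.get('outputs', {}).get('image')
    if image:
        first_image = image['image_name']
        break
print(f"First image: {first_image}")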
Output Types¶
# Common output types
output_types = {
'l2i': 'Latents to Image - final image output',
'nsfw_checker': 'NSFW detection results',
'metadata': 'Generation metadata',
'img_resize': 'Resized image output',
'img_watermark': 'Watermarked image',
'canvas_paste': 'Canvas composite output'
}
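These labels are handy for summarizing what a run produced. A small sketch that tallies a result's outputs by invocation type, assuming the result structure shown earlier:

# Count outputs by invocation type and print a readable summary
from collections import Counter

type_counts = Counter(
    inv['invocation_type'] for inv in result.get('invocation_outputs', [])
)
for inv_type, count in type_counts.items():
    label = output_types.get(inv_type, 'Unrecognized output type')
    print(f"{inv_type}: {count} output(s) - {label}")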
Basic Output Mapping¶
Map to Images¶
def map_outputs_to_images(result):
"""Extract all images from workflow result."""
images = []
if result.get('status') != 'COMPLETED':
return images
for output in result.get('invocation_outputs', []):
outputs = output.get('outputs', {})
# Check for image output
if 'image' in outputs:
image_info = outputs['image']
images.append({
'name': image_info['image_name'],
'width': image_info.get('width'),
'height': image_info.get('height'),
'board_id': image_info.get('board_id'),
'node_id': output['invocation_id'],
'type': output['invocation_type']
})
return images
# Use the mapping
result = wf.wait_for_completion_sync(submission)
images = map_outputs_to_images(result)
for img in images:
print(f"Generated: {img['name']} ({img['width']}x{img['height']})")
WorkflowHandle Helper¶
# Built-in output mapping
images = wf.map_outputs_to_images(result)
for image_name in images:
print(f"Image: {image_name}")
# Download the image
board = client.board_repo.get_board_handle("results")
image_data = board.download_image(image_name)
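To persist a downloaded image, write the returned data to disk. The sketch below assumes download_image() returns raw image bytes; adjust it if your client version returns a different type (e.g. a PIL image or a file path).

# Save the downloaded image next to other outputs
# (assumes image_data holds raw bytes, as returned above)
from pathlib import Path

output_dir = Path("outputs")
output_dir.mkdir(parents=True, exist_ok=True)
(output_dir / image_name).write_bytes(image_data)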
Advanced Output Mapping¶
Filter by Node Type¶
def get_outputs_by_type(result, node_type):
"""Get outputs from specific node types."""
outputs = []
for output in result.get('invocation_outputs', []):
if output['invocation_type'] == node_type:
outputs.append(output['outputs'])
return outputs
# Get only final images (l2i nodes)
final_images = get_outputs_by_type(result, 'l2i')
# Get metadata outputs
metadata = get_outputs_by_type(result, 'metadata')
Map by Node ID¶
def get_output_by_node_id(result, node_id):
"""Get output from specific node."""
for output in result.get('invocation_outputs', []):
if output['invocation_id'] == node_id:
return output['outputs']
return None
# Get output from specific node
node_output = get_output_by_node_id(result, 'main_output_node')
if node_output and 'image' in node_output:
image_name = node_output['image']['image_name']
Multi-Output Workflows¶
Handle Multiple Outputs¶
def map_multi_output_workflow(result):
"""Map outputs from workflow with multiple output nodes."""
outputs = {
'main_image': None,
'refined_image': None,
'thumbnail': None,
'metadata': {}
}
for output in result.get('invocation_outputs', []):
node_id = output['invocation_id']
node_outputs = output['outputs']
# Map based on node ID or type
if 'main_l2i' in node_id:
outputs['main_image'] = node_outputs.get('image')
elif 'refiner_l2i' in node_id:
outputs['refined_image'] = node_outputs.get('image')
elif 'thumbnail' in node_id:
outputs['thumbnail'] = node_outputs.get('image')
elif output['invocation_type'] == 'metadata':
outputs['metadata'].update(node_outputs)
return outputs
# Process multi-output result
mapped = map_multi_output_workflow(result)
if mapped['main_image']:
print(f"Main: {mapped['main_image']['image_name']}")
if mapped['refined_image']:
print(f"Refined: {mapped['refined_image']['image_name']}")
Output Collections¶
def map_collection_outputs(result):
"""Map outputs that produce collections."""
collections = {}
for output in result.get('invocation_outputs', []):
outputs = output['outputs']
# Check for collection outputs
if 'collection' in outputs:
collection_name = output['invocation_id']
collections[collection_name] = outputs['collection']
# Check for image collection
if 'images' in outputs:
collections[f"{output['invocation_id']}_images"] = outputs['images']
return collections
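Usage, assuming the workflow contains collection-producing nodes:

# Report how many items each collection-producing node returned
collections = map_collection_outputs(result)
for name, items in collections.items():
    print(f"{name}: {len(items)} item(s)")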
Metadata Extraction¶
Extract Generation Parameters¶
def extract_generation_metadata(result):
"""Extract generation parameters from result."""
metadata = {
'prompt': None,
'negative_prompt': None,
'seed': None,
'steps': None,
'cfg_scale': None,
'model': None,
'dimensions': None
}
for output in result.get('invocation_outputs', []):
node_type = output['invocation_type']
outputs = output['outputs']
# Extract from different node types
if node_type == 'compel' and 'prompt' in outputs:
if 'positive' in output['invocation_id']:
metadata['prompt'] = outputs['prompt']
else:
metadata['negative_prompt'] = outputs['prompt']
elif node_type == 'noise' and 'seed' in outputs:
metadata['seed'] = outputs['seed']
elif node_type == 'denoise' and 'steps' in outputs:
metadata['steps'] = outputs['steps']
metadata['cfg_scale'] = outputs.get('cfg_scale')
elif node_type == 'l2i' and 'image' in outputs:
img = outputs['image']
metadata['dimensions'] = f"{img['width']}x{img['height']}"
return metadata
# Extract and display metadata
meta = extract_generation_metadata(result)
print(f"Generated with: {meta['prompt'][:50]}...")
print(f"Seed: {meta['seed']}, Steps: {meta['steps']}")
Preserve Workflow Context¶
from datetime import datetime

def create_result_package(wf, submission, result):
"""Create comprehensive result package."""
images = wf.map_outputs_to_images(result)
package = {
'session_id': submission['session_id'],
'workflow_name': wf.definition.meta.get('name', 'unnamed'),
'timestamp': datetime.now().isoformat(),
'status': result['status'],
'images': images,
'metadata': extract_generation_metadata(result),
'input_values': {}
}
# Include input values
for inp in wf.list_inputs():
field = wf.get_input_value(inp.input_index)
if hasattr(field, 'value'):
package['input_values'][inp.label] = field.value
return package
# Create full result package
package = create_result_package(wf, submission, result)
save_json(package, f"results/{package['session_id']}.json")
Batch Processing Results¶
Map Batch Results¶
def process_batch_results(results):
"""Process results from batch execution."""
batch_output = {
'successful': [],
'failed': [],
'statistics': {
'total': len(results),
'completed': 0,
'failed': 0,
'images_generated': 0
}
}
for i, result in enumerate(results):
if result['status'] == 'COMPLETED':
images = map_outputs_to_images(result)
batch_output['successful'].append({
'index': i,
'images': images,
'count': len(images)
})
batch_output['statistics']['completed'] += 1
batch_output['statistics']['images_generated'] += len(images)
else:
batch_output['failed'].append({
'index': i,
'status': result['status'],
'error': result.get('error')
})
batch_output['statistics']['failed'] += 1
return batch_output
# Process a batch (batch_params is assumed to be a list of parameter dicts
# prepared elsewhere, e.g. one dict per prompt/seed combination)
batch_results = []
for params in batch_params:
wf.apply_params(params)
submission = wf.submit_sync()
result = wf.wait_for_completion_sync(submission)
batch_results.append(result)
batch_output = process_batch_results(batch_results)
print(f"Batch complete: {batch_output['statistics']}")
Aggregate Outputs¶
def aggregate_batch_outputs(results, output_board):
    """Aggregate all generated images onto a single board."""
    all_images = []
    for result in results:
        if result['status'] == 'COMPLETED':
            images = map_outputs_to_images(result)
            all_images.extend(images)
    # Move everything to the aggregate board. move_image() is an assumed
    # helper that re-files an image onto another board via the client API;
    # substitute the board-move call your client actually exposes.
    for img_info in all_images:
        if img_info['board_id'] != output_board:
            move_image(client, img_info['name'],
                       img_info['board_id'], output_board)
    return len(all_images)
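Usage, continuing from the batch_results collected above ('final_results' is a placeholder board name):

# Consolidate every generated image onto a single board
moved = aggregate_batch_outputs(batch_results, "final_results")
print(f"Aggregated {moved} image(s) onto 'final_results'")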
Error Handling¶
Safe Output Mapping¶
def safe_map_outputs(result):
"""Safely map outputs with error handling."""
try:
if not result:
return {'error': 'No result provided', 'images': []}
if result.get('status') != 'COMPLETED':
return {
'error': f"Workflow {result.get('status', 'UNKNOWN')}",
'images': []
}
images = map_outputs_to_images(result)
if not images:
return {'warning': 'No images generated', 'images': []}
return {'success': True, 'images': images}
except Exception as e:
return {'error': str(e), 'images': []}
# Use safe mapping
mapped = safe_map_outputs(result)
if mapped.get('success'):
for img in mapped['images']:
print(f"Success: {img['name']}")
else:
print(f"Issue: {mapped.get('error') or mapped.get('warning')}")
Partial Results¶
def handle_partial_results(result):
"""Handle workflows that partially complete."""
all_outputs = []
errors = []
for output in result.get('invocation_outputs', []):
if 'error' in output:
errors.append({
'node': output['invocation_id'],
'error': output['error']
})
elif 'outputs' in output:
all_outputs.append(output)
# Extract what we can
images = []
for output in all_outputs:
if 'image' in output.get('outputs', {}):
images.append(output['outputs']['image']['image_name'])
    total = len(all_outputs) + len(errors)
    return {
        'partial': True,
        'images': images,
        'errors': errors,
        'completion_rate': len(all_outputs) / total if total else 0.0
    }
Output Formats¶
JSON Export¶
import json
from datetime import datetime

def export_results_json(result, filepath):
"""Export results to JSON format."""
export_data = {
'session_id': result.get('session_id'),
'status': result.get('status'),
'images': map_outputs_to_images(result),
'metadata': extract_generation_metadata(result),
'timestamp': datetime.now().isoformat()
}
with open(filepath, 'w') as f:
json.dump(export_data, f, indent=2)
return filepath
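Usage (the filename is a placeholder):

# Export a single completed run alongside its metadata
export_results_json(result, "result_export.json")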
CSV Export¶
import csv
def export_results_csv(results, filepath):
"""Export batch results to CSV."""
with open(filepath, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['session_id', 'image_name', 'width',
'height', 'prompt', 'seed', 'status'])
for result in results:
meta = extract_generation_metadata(result)
images = map_outputs_to_images(result)
for img in images:
writer.writerow([
result.get('session_id'),
img['name'],
img.get('width'),
img.get('height'),
                    (meta.get('prompt') or '')[:100],
meta.get('seed'),
result.get('status')
])
return filepath
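Usage, continuing from the batch_results collected earlier (the filename is a placeholder):

# Summarize an entire batch in one spreadsheet-friendly file
export_results_csv(batch_results, "batch_summary.csv")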
Performance Optimization¶
Lazy Output Loading¶
class LazyOutputMapper:
"""Lazy load outputs only when accessed."""
def __init__(self, result):
self.result = result
self._images = None
self._metadata = None
@property
def images(self):
if self._images is None:
self._images = map_outputs_to_images(self.result)
return self._images
@property
def metadata(self):
if self._metadata is None:
self._metadata = extract_generation_metadata(self.result)
return self._metadata
def get_image(self, index=0):
"""Get specific image by index."""
if index < len(self.images):
return self.images[index]
return None
# Use lazy mapper
mapper = LazyOutputMapper(result)
# Images only extracted when accessed
if mapper.images:
print(f"Generated {len(mapper.images)} images")
Selective Mapping¶
def map_selected_outputs(result, output_filter):
"""Map only selected outputs."""
selected = []
for output in result.get('invocation_outputs', []):
if output_filter(output):
selected.append(output['outputs'])
return selected
# Map only l2i outputs
l2i_only = map_selected_outputs(
result,
lambda o: o['invocation_type'] == 'l2i'
)
# Map outputs from specific nodes
specific_nodes = map_selected_outputs(
result,
lambda o: o['invocation_id'] in ['node1', 'node2']
)
Best Practices¶
1. Always Check Status¶
# Always verify completion before mapping
if result.get('status') == 'COMPLETED':
images = wf.map_outputs_to_images(result)
else:
print(f"Workflow failed: {result.get('status')}")
images = []
2. Handle Missing Outputs¶
# Defensive output access
for output in result.get('invocation_outputs', []):
outputs = output.get('outputs', {})
if image := outputs.get('image'):
# Process image
pass
3. Document Expected Outputs¶
"""
Workflow: SDXL Text-to-Image
Expected Outputs:
- l2i node: Final generated image
- nsfw_checker: Safety check results
- metadata node: Generation parameters
"""
Next Steps¶
- Learn about Boards for organizing outputs
- Explore Image Operations for handling results
- Master Execution Modes for different patterns
- Review Workflow Basics for complete examples