Apr 15, 2025

Justin Trugman
Cofounder & Head of Technology
BetterFutureLabs

Multi-threading with open-source LLMs was a game changer for us. We've been able to run independent tasks simultaneously, instead of waiting for agents sequentially, slashing turnaround times on document processing. Our team can now process 5x more cases without proportional headcount increases.
— Justin Trugman, Cofounder & Head of Technology, BetterFutureLabs
Consider a real-world scenario where you're building a medical diagnosis support system. Your system needs multiple specialized multi-agent teams simultaneously analyzing different aspects of a patient case: symptom pattern analysis, literature review, drug interaction checking, imaging analysis, and treatment planning.
Each team must reach internal consensus through multi-agent discussions before contributing to the final diagnostic assessment.
Sequential execution: 40-58 minutes total (each team waits for the previous to complete)
Parallel execution: 10-15 minutes total (limited by the longest-running team)
This 70-75% reduction in processing time transforms diagnosis from a lengthy batch process into a near real-time clinical decision support tool, potentially improving patient outcomes through faster, more comprehensive analysis.
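The shape of that speedup is easy to demonstrate with a toy sketch: simulated "teams" (plain sleeps standing in for agent discussions, seconds standing in for minutes) run through a thread pool, and the wall-clock time tracks the slowest team rather than the sum. The names and durations here are illustrative:

```python
import time
import concurrent.futures

def run_team(name, duration):
    """Simulated diagnostic team: the sleep stands in for multi-agent discussion."""
    time.sleep(duration)
    return f"{name}: consensus reached"

# Illustrative teams; seconds stand in for minutes
teams = {"symptom_analysis": 0.3, "literature_review": 0.2, "imaging_analysis": 0.4}

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(run_team, n, d) for n, d in teams.items()]
    results = [f.result() for f in futures]
elapsed = time.perf_counter() - start

# Wall-clock time is bounded by the slowest team (~0.4s), not the sum (~0.9s)
assert elapsed < sum(teams.values())
```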
We've found success using Python's concurrent.futures.ThreadPoolExecutor as the foundation for parallel agent execution. This approach has several key advantages: it's part of the standard library, it's well suited to I/O-bound work like LLM API calls, and it handles thread lifecycle management for you.
How it works: ThreadPoolExecutor creates a pool of worker threads that can execute functions concurrently. When you submit a task, it gets assigned to an available worker thread. The key insight is that while one thread waits for an API response, other threads can continue processing their own tasks.
import concurrent.futures

# Define independent diagnostic analysis tasks
diagnostic_tasks = [
    "symptom_pattern_analysis",
    "literature_review",
    "drug_interaction_check",
    "imaging_analysis",
    "treatment_planning",
]

# Execute diagnostic teams in parallel
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [
        executor.submit(run_diagnostic_team, team_id, analysis_type, patient_data)
        for team_id, analysis_type in enumerate(diagnostic_tasks)
    ]
    concurrent.futures.wait(futures)  # Wait for all teams to complete

Each run_diagnostic_team call creates an independent set of AG2 agents (medical specialist, clinical reviewer, supervisor) that collaborate on their specific analysis without interfering with other parallel teams. The concurrent.futures.wait() function blocks until all teams have completed their work, ensuring synchronization before proceeding to further diagnostic synthesis.
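The synchronization behavior of concurrent.futures.wait() can be checked in isolation with a stub team function (fake_team and its sleep are placeholders for real AG2 teams):

```python
import time
import concurrent.futures

def fake_team(analysis_type):
    """Stub team: the sleep stands in for API-bound agent discussion."""
    time.sleep(0.1)
    return analysis_type.upper()

tasks = ["symptom_pattern_analysis", "literature_review"]

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(fake_team, t) for t in tasks]
    done, not_done = concurrent.futures.wait(futures)

# wait() blocks until every future finishes, so nothing is left pending
assert not not_done
results = sorted(f.result() for f in done)
```

Because wait() returns both a done and a not_done set, it also supports partial waits via its timeout and return_when parameters when full synchronization isn't required.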
For this example, we're using an Agent Factory Pattern to showcase the power of parallel execution. Rather than sharing agent instances across parallel tasks, each parallel execution creates completely fresh agent instances. This prevents state contamination and ensures true independence.
def run_diagnostic_team(team_id, analysis_type, patient_data):
    """
    Factory function that creates a complete independent agent team
    for a specific medical analysis task
    """
    # Create fresh user proxy for this diagnostic team
    user_proxy = UserProxyAgent(
        name=f"coordinator_{analysis_type}_{team_id}",
        is_termination_msg=lambda msg: "TERMINATE" in msg["content"],
        human_input_mode="NEVER",
        max_consecutive_auto_reply=1,
        code_execution_config=False,
    )

    # Create specialized medical analyst with unique configuration
    medical_specialist = GPTAssistantAgent(
        name=f"specialist_{analysis_type}_{team_id}",
        instructions=medical_instructions[analysis_type],
        overwrite_instructions=True,  # Ensure clean state
        overwrite_tools=True,
        llm_config={
            "config_list": config_list,
            "tools": diagnostic_tools[analysis_type],
            "assistant_id": specialist_assistant_ids[analysis_type],
        },
    )

    # Register analysis-specific functions
    medical_specialist.register_function(
        function_map=diagnostic_functions[analysis_type]
    )

    # Create complete diagnostic team and execute
    # (clinical_reviewer and supervisor are constructed the same way; omitted here)
    team = [user_proxy, medical_specialist, clinical_reviewer, supervisor]
    groupchat = ag2.GroupChat(agents=team, messages=[], max_round=15)
    chat_manager = ag2.GroupChatManager(groupchat=groupchat, llm_config={"config_list": config_list})

    # Execute diagnostic analysis with patient context
    user_proxy.initiate_chat(chat_manager, message=f"Perform {analysis_type} for patient: {patient_data}")

Setting overwrite_instructions=True ensures no state leakage between executions.

The most dangerous trap in parallel execution is sharing mutable state between agents. When multiple threads modify the same data structure simultaneously, you get unpredictable behavior and data corruption. This is especially problematic with global dictionaries, shared configuration objects, or any state that gets modified during execution.
# ❌ BAD: Shared mutable state
shared_patient_analysis = {}

def bad_diagnostic_process(analysis_type):
    shared_patient_analysis[analysis_type] = perform_analysis(...)  # Race condition!

# ✅ GOOD: Independent storage
def good_diagnostic_process(analysis_type, patient_id):
    result = perform_analysis(...)
    save_to_file(f"diagnosis_{patient_id}_{analysis_type}.json", result)

Creating unlimited threads will overwhelm your system and degrade performance for all tasks. Each thread consumes memory and system resources. Too many threads also increase context switching overhead, making everything slower rather than faster.
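A quick, self-contained way to see that a bounded pool caps concurrency is to track the peak number of simultaneously running tasks; the counter, lock, and tracked_task below are illustrative:

```python
import threading
import time
import concurrent.futures

active = 0   # tasks currently running
peak = 0     # highest concurrency observed
lock = threading.Lock()

def tracked_task(task_id):
    """Illustrative task that records how many copies run at once."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)  # simulated I/O wait
    with lock:
        active -= 1
    return task_id

tasks = list(range(10))
max_workers = min(len(tasks), 5)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    results = list(executor.map(tracked_task, tasks))

# The pool never runs more than max_workers tasks simultaneously
assert peak <= max_workers
```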
# ❌ BAD: Unlimited workers
with ThreadPoolExecutor() as executor:  # Could create too many threads
    ...

# ✅ GOOD: Controlled resource usage
max_workers = min(len(tasks), 5)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    ...

In parallel execution, individual tasks will fail. Network timeouts, API rate limits, and processing errors are inevitable. If you don't handle these gracefully, one failed task can crash your entire parallel workflow, wasting all the work completed by successful tasks.
# ❌ BAD: No error handling
futures = [executor.submit(task) for task in tasks]
results = [f.result() for f in futures]  # Will crash on any failure

# ✅ GOOD: Graceful error handling
for future in concurrent.futures.as_completed(futures):
    try:
        result = future.result(timeout=300)
        handle_success(result)
    except Exception as e:
        handle_error(e)

Robust error handling is essential for production parallel execution. You need to capture and log failures without stopping the entire workflow. The key is to collect both successful results and error information, then decide how to handle partial failures based on your business requirements.
import logging

def execute_parallel_tasks(doc_id, tasks, document_data):
    results = {}
    errors = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_task = {
            executor.submit(process_document, doc_id, task, document_data): task
            for task in tasks
        }
        for future in concurrent.futures.as_completed(future_to_task):
            task = future_to_task[future]
            try:
                results[task] = future.result(timeout=300)
                logging.info(f"Task {task} completed successfully")
            except Exception as e:
                errors[task] = str(e)
                logging.error(f"Task {task} failed: {e}")
    return results, errors

The default ThreadPoolExecutor worker count is min(32, (os.process_cpu_count() or 1) + 4). This formula preserves at least 5 workers while avoiding excessive resource usage on many-core machines. For more details, see the Python ThreadPoolExecutor documentation.
# Calculate optimal worker count based on Python's official recommendation
import os

def get_optimal_workers(task_count):
    # Follow Python's default formula, but never exceed the number of tasks
    default_workers = min(32, (os.process_cpu_count() or 1) + 4)
    return min(task_count, default_workers)

After implementing the core patterns, follow these production-ready guidelines:
Use the default of min(32, (os.process_cpu_count() or 1) + 4) workers for I/O-bound agent operations.

Whatever monitoring tool or logging system you're using, track key metrics for parallel agent execution, such as per-task duration and success or failure status.
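As one concrete illustration, here is a minimal sketch that records per-task duration and outcome around a thread pool, which could feed whatever monitoring backend you use; the timed wrapper, analyze task, and metric field names are all illustrative:

```python
import logging
import time
import concurrent.futures

logging.basicConfig(level=logging.INFO)

def timed(fn, task):
    """Record duration and outcome for one task as a metrics dict."""
    start = time.perf_counter()
    try:
        result = fn(task)
        status = "success"
    except Exception:
        result, status = None, "error"
    return {"task": task, "status": status,
            "duration_s": time.perf_counter() - start, "result": result}

def analyze(task):
    time.sleep(0.01)  # stand-in for agent work
    return task.upper()

tasks = ["labs", "imaging"]
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    metrics = list(executor.map(lambda t: timed(analyze, t), tasks))

for m in metrics:
    logging.info("task=%s status=%s duration=%.3fs",
                 m["task"], m["status"], m["duration_s"])
```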
Parallel agent execution with AG2 transforms research and analysis applications from sequential batch operations into concurrent workflows. The patterns outlined in this guide provide a production-ready foundation for scaling multi-agent systems.
By applying proper resource management patterns and designing for independence, you can build systems that scale horizontally while maintaining the collaborative intelligence that makes multi-agent systems effective.
Parallel execution fundamentally changes how users interact with multi-agent systems. Instead of waiting for sequential processing, users get concurrent analysis across multiple specialized domains. Start with these patterns, measure your results, and iterate based on your specific use case requirements.