By Appropri8 Team

Task Graph Intelligence: Using DAG-Based Planning for Autonomous AI Agents


Most AI agents work like a to-do list. They pick a task, complete it, then move to the next one. It’s simple, but it’s slow. And when a task fails, everything stops.

There’s a better way. Instead of a linear queue, use a graph. Tasks become nodes. Dependencies become edges. The agent can see which tasks can run in parallel. It can handle failures by retrying or finding alternatives. It can adapt the plan as it learns.

This is task graph intelligence. It’s a way for autonomous agents to handle complex, multi-step workflows. They don’t just execute tasks. They reason about task relationships, optimize execution order, and recover from failures.

This article shows you how DAG-based planning works, why it matters, and how to implement it in your agents.

Introduction: From Linear Task Execution to Graph-Based Reasoning

Traditional task execution is linear. The agent has a list of tasks. It processes them one by one. Task 1, then task 2, then task 3. If task 2 needs something from task 1, you wait. If task 2 fails, you’re stuck.

This works for simple workflows. But real work isn’t linear. Tasks have dependencies. Some tasks can run in parallel. Some tasks might fail and need retries. Some tasks become unnecessary if other tasks fail.

Consider a research assistant agent. It needs to:

  1. Search for papers on a topic
  2. Download the papers
  3. Extract key information
  4. Summarize findings
  5. Generate citations

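In a linear system, this takes five sequential steps, repeated for every paper. But the dependencies are narrower than that. Downloads can start for papers the search has already found while it keeps looking for more, and several downloads can run at once. Task 3 depends on task 2 only for the same paper, so multiple papers can be processed simultaneously. Task 4 depends on task 3, but again, you can summarize multiple papers in parallel. Task 5 depends on task 4.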

A DAG captures these relationships. The agent sees that it can download multiple papers while searching for more. It sees that extraction can happen in parallel for different papers. It sees that summarization can happen as soon as extraction completes for each paper.

The agent also sees failure points. If downloading one paper fails, it doesn’t block the others. If extraction fails for one paper, the agent can retry or skip it without stopping everything.

This is why DAGs fit modern multi-step workflows. They model reality better than linear queues. They enable parallelism. They handle dependencies. They support failure recovery.

Understanding Task Graph Intelligence

Task graph intelligence means the agent understands task relationships and uses that understanding to optimize execution. It’s not just executing tasks. It’s reasoning about the graph structure.

Definition and Purpose

A task graph is a Directed Acyclic Graph where:

  • Nodes represent tasks
  • Edges represent dependencies (task A must complete before task B)
  • The graph is acyclic (no circular dependencies)
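As a concrete illustration, here is a minimal sketch of the research workflow from the introduction as a networkx DiGraph, assuming one search that returns two papers. The task IDs are hypothetical. nx.is_directed_acyclic_graph confirms the acyclic property, and a single back edge is enough to break it.

```python
import networkx as nx


def build_research_dag() -> nx.DiGraph:
    """Research workflow as a DAG: one search, two papers (hypothetical IDs)."""
    g = nx.DiGraph()
    # Each edge (a, b) means task b depends on task a
    g.add_edges_from([
        ("search", "download_p1"), ("search", "download_p2"),
        ("download_p1", "extract_p1"), ("download_p2", "extract_p2"),
        ("extract_p1", "summarize_p1"), ("extract_p2", "summarize_p2"),
        ("summarize_p1", "citations"), ("summarize_p2", "citations"),
    ])
    return g


dag = build_research_dag()
print(nx.is_directed_acyclic_graph(dag))  # True

# A circular dependency (citations -> search) means it is no longer a DAG
cyclic = build_research_dag()
cyclic.add_edge("citations", "search")
print(nx.is_directed_acyclic_graph(cyclic))  # False
```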

Task graph intelligence adds reasoning on top of this structure. The agent can:

  • Generate the graph dynamically based on context
  • Determine a valid execution order using topological sorting
  • Detect and handle failures
  • Reorder tasks when dependencies change
  • Parallelize independent tasks

The purpose is to make agents more efficient and resilient. Instead of waiting for tasks to complete sequentially, the agent runs what it can in parallel. Instead of failing completely when one task fails, the agent adapts.

Relationship to Planning and Reasoning

Task graphs connect to planning and reasoning in a few ways.

First, they’re a planning structure. The agent plans by building a graph of tasks. It reasons about dependencies. It decides what to do next based on what’s ready.

Second, they support dynamic planning. The graph can change as the agent learns. New tasks can be added. Dependencies can be updated. Failed tasks can be replaced with alternatives.

Third, they enable reasoning about execution. The agent can reason about which tasks are critical. It can reason about failure impact. It can reason about optimization opportunities.

A reasoning loop might work like this:

  1. Agent receives a goal
  2. Agent generates a task graph to achieve the goal
  3. Agent executes tasks in a valid dependency order (topological sort)
  4. Agent monitors execution and updates the graph as needed
  5. Agent handles failures by modifying the graph
  6. Agent completes when all critical tasks are done
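Steps 3 and 4 of this loop can be sketched with Python’s standard-library graphlib.TopologicalSorter, which hands out tasks whose dependencies are done and unlocks dependents as each one is marked finished. This is a swapped-in stdlib tool, not part of the planner built later in this article; run_plan and the task names are hypothetical, and tasks run one after another to keep the example short. TopologicalSorter does not accept new nodes after prepare(), so it only suits fixed plans; steps 4 and 5, which modify the graph, need a mutable structure.

```python
from graphlib import TopologicalSorter
from typing import Callable


def run_plan(dependencies: dict[str, set[str]],
             execute: Callable[[str], None]) -> list[list[str]]:
    """Run a fixed plan batch by batch and return the batches in execution order.

    dependencies maps each task to the set of tasks it depends on.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the plan has a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose dependencies are all done
        batches.append(ready)
        for task in ready:
            execute(task)      # a real agent would run this batch concurrently
            sorter.done(task)  # makes dependents eligible for the next get_ready()
    return batches


plan = {
    "download_a": {"search"},
    "download_b": {"search"},
    "summary": {"download_a", "download_b"},
}
log = []
print(run_plan(plan, log.append))
# [['search'], ['download_a', 'download_b'], ['summary']]
```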

The graph is both the plan and the execution state. It evolves as the agent works.

Architectural Blueprint

The architecture has three main components: task nodes, dependency edges, and execution state.

Task Nodes

Each task node contains:

  • Task ID (unique identifier)
  • Task description (what needs to be done)
  • Task function (the code that executes the task)
  • Status (pending, running, completed, failed)
  • Result (output from execution)
  • Metadata (timing, retry count, etc.)

Tasks can be simple (execute a function) or complex (spawn sub-graphs). A complex task might generate its own task graph, creating a hierarchical structure.
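One way to realize that hierarchy, sketched here with a hypothetical expand_task helper, is to splice a sub-graph in place of the complex task: the sub-graph’s entry tasks inherit the original task’s dependencies, and its exit tasks feed the original task’s dependents. The sketch assumes sub-graph task IDs don’t collide with existing ones.

```python
import networkx as nx


def expand_task(graph: nx.DiGraph, task_id: str, subgraph: nx.DiGraph) -> None:
    """Replace task_id in graph with subgraph, rewiring edges in place."""
    preds = list(graph.predecessors(task_id))
    succs = list(graph.successors(task_id))
    graph.remove_node(task_id)  # also removes its edges
    graph.add_nodes_from(subgraph.nodes(data=True))
    graph.add_edges_from(subgraph.edges(data=True))
    # Entry tasks (no internal dependencies) inherit the original dependencies
    entries = [n for n in subgraph if subgraph.in_degree(n) == 0]
    # Exit tasks (nothing internal depends on them) gate the original dependents
    exits = [n for n in subgraph if subgraph.out_degree(n) == 0]
    graph.add_edges_from((p, e) for p in preds for e in entries)
    graph.add_edges_from((x, s) for x in exits for s in succs)


plan = nx.DiGraph([("search", "review"), ("review", "report")])
review = nx.DiGraph([("fetch", "read_a"), ("fetch", "read_b"),
                     ("read_a", "synthesize"), ("read_b", "synthesize")])
expand_task(plan, "review", review)
print(sorted(plan.edges()))
# [('fetch', 'read_a'), ('fetch', 'read_b'), ('read_a', 'synthesize'),
#  ('read_b', 'synthesize'), ('search', 'fetch'), ('synthesize', 'report')]
```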

Dependency Edges

Edges represent dependencies. An edge from task A to task B means B depends on A. B can’t start until A completes.

Edges can be:

  • Hard dependencies (B must wait for A)
  • Soft dependencies (B can start but might need A’s result)
  • Conditional dependencies (B depends on A only if A succeeds)
  • Time-based dependencies (B depends on A completing within a time window)

The graph structure determines execution order. Tasks with no incoming edges can start immediately. As tasks complete, their dependent tasks become ready.
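Here is one way to encode those edge types, as a sketch: store a hypothetical kind attribute on each networkx edge and check it when deciding readiness. The semantics are an assumption, since the list above leaves them open: a hard edge blocks until the predecessor completes, a soft edge never blocks, and a conditional edge blocks while the predecessor is unfinished but is dropped if the predecessor fails. Time-based edges are left out. A task with no incoming edges passes the check immediately.

```python
import networkx as nx


def is_ready(graph: nx.DiGraph, task: str, status: dict[str, str]) -> bool:
    """Return True if every blocking dependency of task is satisfied.

    status maps task IDs to "pending", "running", "completed", or "failed".
    """
    for pred in graph.predecessors(task):
        kind = graph.edges[pred, task].get("kind", "hard")  # default: hard dependency
        state = status.get(pred, "pending")
        if kind == "soft":
            continue  # never blocks; the result is used if it exists
        if kind == "conditional" and state == "failed":
            continue  # dependency dropped because the predecessor failed
        if state != "completed":
            return False
    return True


g = nx.DiGraph()
g.add_edge("fetch", "summarize", kind="hard")
g.add_edge("fetch_cache", "summarize", kind="soft")
g.add_edge("translate", "summarize", kind="conditional")
print(is_ready(g, "summarize", {"fetch": "completed", "translate": "failed"}))  # True
print(is_ready(g, "summarize", {"fetch": "running"}))  # False
print(is_ready(g, "fetch", {}))  # True: no incoming edges
```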

Execution State

The execution state tracks:

  • Which tasks are ready to run (no unmet dependencies)
  • Which tasks are currently running
  • Which tasks have completed
  • Which tasks have failed
  • The overall graph state

The state updates as tasks execute. When a task completes, the agent checks if any dependent tasks are now ready. When a task fails, the agent decides how to handle it (retry, skip, find alternative).
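A sketch of that bookkeeping, using a hypothetical ExecutionState class: instead of rescanning every task after each completion, it keeps a count of unfinished dependencies per task (the idea behind Kahn’s topological sort algorithm) and only updates the direct dependents of the task that just finished. Failure handling is reduced to recording the failure here.

```python
import networkx as nx


class ExecutionState:
    """Ready/running/completed/failed bookkeeping for a DAG of task IDs."""

    def __init__(self, graph: nx.DiGraph):
        self.graph = graph
        # Number of dependencies each task is still waiting on
        self.waiting_on = {n: graph.in_degree(n) for n in graph}
        self.ready = {n for n, count in self.waiting_on.items() if count == 0}
        self.running: set[str] = set()
        self.completed: set[str] = set()
        self.failed: set[str] = set()

    def start(self, task: str) -> None:
        self.ready.remove(task)
        self.running.add(task)

    def complete(self, task: str) -> set[str]:
        """Mark task completed and return the dependents that just became ready."""
        self.running.remove(task)
        self.completed.add(task)
        newly_ready = set()
        for dependent in self.graph.successors(task):
            self.waiting_on[dependent] -= 1
            if self.waiting_on[dependent] == 0:
                newly_ready.add(dependent)
        self.ready |= newly_ready
        return newly_ready

    def fail(self, task: str) -> None:
        """Record a failure; dependents stay blocked until the agent decides what to do."""
        self.running.remove(task)
        self.failed.add(task)


state = ExecutionState(nx.DiGraph([("download_a", "compare"), ("download_b", "compare")]))
for task in sorted(state.ready):
    state.start(task)
print(state.complete("download_a"))  # set(): compare still waits on download_b
print(state.complete("download_b"))  # {'compare'}
```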

Integrating with a Reasoning Loop

The task graph integrates with the agent’s reasoning loop:

Reasoning Loop:
1. Observe current state
2. Generate/update task graph
3. Select ready tasks
4. Execute tasks (in parallel if possible)
5. Update graph state
6. Reflect on progress
7. Repeat until goal achieved

The graph generation happens in step 2. The agent reasons about what tasks are needed. It identifies dependencies. It builds the graph structure.

Task selection happens in step 3. The agent picks ready tasks, meaning tasks whose dependencies have all completed (the current frontier of a topological sort). It might prioritize certain tasks based on heuristics.

Execution happens in step 4. The agent runs ready tasks, potentially in parallel. It monitors execution and handles failures.

Graph updates happen in step 5. As tasks complete or fail, the graph state changes. New tasks might become ready. The agent might need to add new tasks or modify dependencies.

Reflection happens in step 6. The agent evaluates progress. It might modify the graph based on what it learned. It might add new tasks or remove unnecessary ones.
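The update and reflection steps are what set this loop apart from a fixed plan. The following sketch, with hypothetical search and download stubs, shows the simplest case: a task’s result causes new tasks to be added while the graph is executing. Each pass runs every ready task concurrently with asyncio.gather, and the hard-coded rule after the search stands in for the agent’s reasoning step.

```python
import asyncio

import networkx as nx


async def search(topic: str) -> list[str]:
    await asyncio.sleep(0)  # stand-in for a real search API call
    return [f"{topic}_paper_1", f"{topic}_paper_2"]


async def download(paper_id: str) -> str:
    await asyncio.sleep(0)  # stand-in for a real download
    return f"content of {paper_id}"


async def run_dynamic(topic: str) -> dict[str, object]:
    graph = nx.DiGraph()
    graph.add_node("search", func=search, args=(topic,))
    results: dict[str, object] = {}
    while len(results) < graph.number_of_nodes():
        # Ready = not yet run and every dependency already has a result
        ready = [n for n in graph if n not in results
                 and all(p in results for p in graph.predecessors(n))]
        if not ready:
            break  # cannot happen in a DAG where every task succeeds
        outputs = await asyncio.gather(
            *(graph.nodes[n]["func"](*graph.nodes[n]["args"]) for n in ready))
        for node, output in zip(ready, outputs):
            results[node] = output
            # Update step: the search result determines which tasks come next
            if node == "search":
                for paper in output:
                    graph.add_node(f"download_{paper}", func=download, args=(paper,))
                    graph.add_edge("search", f"download_{paper}")
    return results


print(asyncio.run(run_dynamic("dags")))
```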

Building a DAG Planner

Let’s build a DAG planner in Python. We’ll use networkx for graph operations and asyncio to run independent tasks concurrently.

Core DAG Structure

First, we need a task graph class:

import networkx as nx
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable, Any
from enum import Enum
from datetime import datetime
import asyncio
from collections import deque

class TaskStatus(Enum):
    PENDING = "pending"
    READY = "ready"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class Task:
    """Represents a single task in the graph"""
    task_id: str
    description: str
    func: Callable
    args: tuple = field(default_factory=tuple)
    kwargs: dict = field(default_factory=dict)
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: Optional[Exception] = None
    retry_count: int = 0
    max_retries: int = 3
    metadata: dict = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

class TaskGraph:
    """Manages a DAG of tasks with dependency resolution and execution"""
    
    def __init__(self, graph_id: str = "default"):
        self.graph_id = graph_id
        self.graph = nx.DiGraph()
        self.tasks: Dict[str, Task] = {}
        self.execution_history: List[Dict] = []
        
    def add_task(
        self,
        task_id: str,
        description: str,
        func: Callable,
        dependencies: Optional[List[str]] = None,
        args: tuple = (),
        kwargs: Optional[dict] = None,
        max_retries: int = 3
    ) -> str:
        """Add a task to the graph"""
        if task_id in self.tasks:
            raise ValueError(f"Task {task_id} already exists")
        
        task = Task(
            task_id=task_id,
            description=description,
            func=func,
            args=args,
            kwargs=kwargs or {},
            max_retries=max_retries
        )
        
        self.tasks[task_id] = task
        self.graph.add_node(task_id)
        
        # Add dependencies
        if dependencies:
            for dep_id in dependencies:
                if dep_id not in self.tasks:
                    raise ValueError(f"Dependency {dep_id} does not exist")
                self.graph.add_edge(dep_id, task_id)
        
        return task_id
    
    def validate_graph(self) -> bool:
        """Return True if the graph is a valid DAG (contains no cycles)"""
        return nx.is_directed_acyclic_graph(self.graph)
    
    def get_ready_tasks(self) -> List[str]:
        """Get tasks that are ready to execute (no unmet dependencies)"""
        ready = []
        
        for task_id, task in self.tasks.items():
            # execute() marks ready tasks as READY, so accept both states here
            if task.status not in (TaskStatus.PENDING, TaskStatus.READY):
                continue
            
            # Check if all dependencies are completed
            dependencies = list(self.graph.predecessors(task_id))
            if not dependencies:
                # No dependencies, ready to run
                ready.append(task_id)
            else:
                # Check if all dependencies are completed
                all_completed = all(
                    self.tasks[dep_id].status == TaskStatus.COMPLETED
                    for dep_id in dependencies
                )
                if all_completed:
                    ready.append(task_id)
        
        return ready
    
    def get_topological_order(self) -> List[str]:
        """Get tasks in topological order (dependency order)"""
        try:
            return list(nx.topological_sort(self.graph))
        except nx.NetworkXError as e:
            raise ValueError(f"Graph is not a valid DAG: {e}")
    
    async def execute_task(self, task_id: str) -> Any:
        """Execute a single task"""
        task = self.tasks[task_id]
        
        if task.status != TaskStatus.PENDING and task.status != TaskStatus.READY:
            raise ValueError(f"Task {task_id} is not ready to execute")
        
        task.status = TaskStatus.RUNNING
        task.started_at = datetime.now()
        
        try:
            # Execute the task function
            if asyncio.iscoroutinefunction(task.func):
                result = await task.func(*task.args, **task.kwargs)
            else:
                result = task.func(*task.args, **task.kwargs)
            
            task.result = result
            task.status = TaskStatus.COMPLETED
            task.completed_at = datetime.now()
            
            self.execution_history.append({
                'task_id': task_id,
                'status': 'completed',
                'timestamp': datetime.now().isoformat(),
                'duration': (task.completed_at - task.started_at).total_seconds()
            })
            
            return result
            
        except Exception as e:
            task.error = e
            task.retry_count += 1
            
            if task.retry_count <= task.max_retries:
                # Retry the task
                task.status = TaskStatus.PENDING
                self.execution_history.append({
                    'task_id': task_id,
                    'status': 'retrying',
                    'retry_count': task.retry_count,
                    'error': str(e),
                    'timestamp': datetime.now().isoformat()
                })
                # Will be retried in next execution cycle
            else:
                # Max retries exceeded
                task.status = TaskStatus.FAILED
                task.completed_at = datetime.now()
                self.execution_history.append({
                    'task_id': task_id,
                    'status': 'failed',
                    'error': str(e),
                    'retry_count': task.retry_count,
                    'timestamp': datetime.now().isoformat()
                })
            
            raise
    
    async def execute(self, max_parallel: int = 5) -> Dict[str, Any]:
        """Execute all tasks in the graph"""
        # Validate graph
        if not self.validate_graph():
            raise ValueError("Graph validation failed")
        
        completed_count = 0
        failed_count = 0
        total_tasks = len(self.tasks)
        
        # Mark initial ready tasks
        for task_id in self.get_ready_tasks():
            self.tasks[task_id].status = TaskStatus.READY
        
        # Execution loop
        while completed_count + failed_count < total_tasks:
            # Get ready tasks
            ready_tasks = self.get_ready_tasks()
            
            if not ready_tasks:
                # Nothing can run right now. Skip tasks blocked by a failed or
                # skipped dependency (skips cascade downstream on later passes).
                remaining = [
                    tid for tid, task in self.tasks.items()
                    if task.status not in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.SKIPPED]
                ]
                
                if not remaining:
                    break
                
                newly_skipped = 0
                for task_id in remaining:
                    deps = list(self.graph.predecessors(task_id))
                    failed_deps = [
                        dep for dep in deps
                        if self.tasks[dep].status in (TaskStatus.FAILED, TaskStatus.SKIPPED)
                    ]
                    if failed_deps:
                        self.tasks[task_id].status = TaskStatus.SKIPPED
                        self.execution_history.append({
                            'task_id': task_id,
                            'status': 'skipped',
                            'reason': f"Failed dependencies: {failed_deps}",
                            'timestamp': datetime.now().isoformat()
                        })
                        failed_count += 1  # counted as finished so the loop can end
                        newly_skipped += 1
                
                if newly_skipped == 0:
                    # No task can make progress; stop instead of looping forever
                    break
                continue
            
            # Execute up to max_parallel ready tasks concurrently; the whole batch
            # must finish before any newly ready tasks are started
            tasks_to_run = ready_tasks[:max_parallel]
            execution_tasks = [
                self.execute_task(task_id) for task_id in tasks_to_run
            ]
            
            # Wait for batch to complete
            results = await asyncio.gather(*execution_tasks, return_exceptions=True)
            
            # Update counts
            for i, result in enumerate(results):
                task_id = tasks_to_run[i]
                task = self.tasks[task_id]
                
                if task.status == TaskStatus.COMPLETED:
                    completed_count += 1
                elif task.status == TaskStatus.FAILED:
                    failed_count += 1
            
            # Mark newly ready tasks
            for task_id in self.get_ready_tasks():
                if self.tasks[task_id].status == TaskStatus.PENDING:
                    self.tasks[task_id].status = TaskStatus.READY
        
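        # Return execution summary, counted from the final task states
        return {
            'total_tasks': total_tasks,
            'completed': sum(1 for t in self.tasks.values() if t.status == TaskStatus.COMPLETED),
            'failed': sum(1 for t in self.tasks.values() if t.status == TaskStatus.FAILED),
            'skipped': sum(1 for t in self.tasks.values() if t.status == TaskStatus.SKIPPED),
            'execution_history': self.execution_history
        }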
    
    def get_execution_summary(self) -> Dict[str, Any]:
        """Get a summary of execution state"""
        status_counts = {}
        for status in TaskStatus:
            status_counts[status.value] = sum(
                1 for task in self.tasks.values() if task.status == status
            )
        
        return {
            'graph_id': self.graph_id,
            'total_tasks': len(self.tasks),
            'status_counts': status_counts,
            'graph_structure': {
                'nodes': list(self.graph.nodes()),
                'edges': list(self.graph.edges()),
                'is_dag': nx.is_directed_acyclic_graph(self.graph)
            }
        }

Topological Sorting for Dependency Resolution

Topological sorting ensures tasks execute in the correct order. NetworkX provides the sort itself (nx.topological_sort); this helper builds on it to group tasks into levels:

def get_execution_order(graph: TaskGraph) -> List[List[str]]:
    """
    Get tasks grouped by execution level.
    Tasks in the same level can run in parallel.
    """
    if not graph.validate_graph():
        raise ValueError("Graph is not a valid DAG")
    
    # Get topological order
    topo_order = graph.get_topological_order()
    
    # Group by level (tasks with same dependency depth)
    levels = []
    completed = set()
    
    while len(completed) < len(topo_order):
        # A task joins this level only if all its dependencies were placed in
        # earlier levels; the level is marked completed after the scan
        current_level = [
            task_id for task_id in topo_order
            if task_id not in completed
            and all(dep in completed for dep in graph.graph.predecessors(task_id))
        ]
        
        if not current_level:
            break  # Should not happen in a valid DAG
        
        levels.append(current_level)
        completed.update(current_level)
    
    return levels

This groups tasks into levels. Tasks in level 0 have no dependencies. Tasks in level 1 depend only on level 0 tasks, and in general, a task in level k depends only on tasks in earlier levels. Tasks within the same level can run in parallel.
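NetworkX (version 2.6 and later) also ships this level grouping as nx.topological_generations, which yields one list of nodes per level. A sketch on a plain DiGraph with hypothetical task IDs; sorting within each level is only for stable output, since order inside a generation carries no meaning:

```python
import networkx as nx

g = nx.DiGraph([
    ("search_a", "download_a"), ("search_b", "download_b"),
    ("download_a", "compare"), ("download_b", "compare"),
    ("compare", "report"),
])

# Each generation holds tasks whose dependencies all sit in earlier generations
levels = [sorted(generation) for generation in nx.topological_generations(g)]
print(levels)
# [['search_a', 'search_b'], ['download_a', 'download_b'], ['compare'], ['report']]
```

For a TaskGraph, the same call works on its graph attribute: nx.topological_generations(task_graph.graph).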

Case Study: Research Assistant Agent

Let’s build a research assistant that uses task graphs for literature review workflows.

Literature Review Workflow

The agent needs to:

  1. Search for papers on multiple topics
  2. Download papers (can be parallel)
  3. Extract metadata (depends on download)
  4. Extract key sections (depends on download, can be parallel with metadata)
  5. Summarize each paper (depends on extraction)
  6. Compare papers (depends on all summaries)
  7. Generate final report (depends on comparison)

Here’s how we build this:

import asyncio
from typing import Any, Dict, List

# Assumes TaskGraph and get_execution_order from the previous listings are in scope

# Simulated functions for the research workflow
async def search_papers(topic: str) -> List[str]:
    """Search for papers on a topic"""
    await asyncio.sleep(0.5)  # Simulate API call
    return [f"paper_{topic}_1", f"paper_{topic}_2"]

async def download_paper(paper_id: str) -> Dict:
    """Download a paper"""
    await asyncio.sleep(0.3)  # Simulate download
    return {"id": paper_id, "content": f"Content of {paper_id}"}

async def extract_metadata(paper_data: Dict) -> Dict:
    """Extract metadata from paper"""
    await asyncio.sleep(0.2)
    return {"title": f"Title of {paper_data['id']}", "authors": ["Author1"]}

async def extract_key_sections(paper_data: Dict) -> List[str]:
    """Extract key sections from paper"""
    await asyncio.sleep(0.4)
    return [f"Section 1 of {paper_data['id']}", f"Section 2 of {paper_data['id']}"]

async def summarize_paper(metadata: Dict, sections: List[str]) -> str:
    """Summarize a paper"""
    await asyncio.sleep(0.3)
    return f"Summary of {metadata['title']}"

async def compare_papers(summaries: List[str]) -> str:
    """Compare multiple papers"""
    await asyncio.sleep(0.5)
    return f"Comparison of {len(summaries)} papers"

async def generate_report(comparison: str) -> str:
    """Generate final report"""
    await asyncio.sleep(0.4)
    return f"Final Report: {comparison}"

class ResearchAssistant:
    """Research assistant using task graph planning"""
    
    def __init__(self):
        self.graph = TaskGraph(graph_id="research_workflow")
    
    async def build_literature_review_graph(self, topics: List[str]) -> TaskGraph:
        """Build a task graph for literature review"""
        graph = TaskGraph(graph_id="literature_review")
        
        # Step 1: Search for papers (can be parallel for different topics)
        search_task_ids = []
        for topic in topics:
            task_id = graph.add_task(
                f"search_{topic}",
                f"Search papers on {topic}",
                search_papers,
                args=(topic,)
            )
            search_task_ids.append(task_id)
        
        # Step 2: Download papers (depends on search, can be parallel)
        download_task_ids = []
        paper_mapping = {}  # Maps download task to paper ID
        
        for search_task_id in search_task_ids:
            # We'll need to get papers from search result
            # For simplicity, we'll create download tasks that depend on search
            # In practice, you'd dynamically create tasks based on search results
            download_id = graph.add_task(
                f"download_{search_task_id}",
                f"Download papers from {search_task_id}",
                download_paper,
                dependencies=[search_task_id],
                args=("paper_1",)  # Simplified
            )
            download_task_ids.append(download_id)
            paper_mapping[download_id] = "paper_1"
        
        # Step 3: Extract metadata and key sections (depend on download, can be parallel)
        metadata_task_ids = []
        section_task_ids = []
        
        for download_id in download_task_ids:
            meta_id = graph.add_task(
                f"metadata_{download_id}",
                f"Extract metadata from {download_id}",
                extract_metadata,
                dependencies=[download_id],
                kwargs={"paper_data": {"id": paper_mapping[download_id]}}
            )
            metadata_task_ids.append(meta_id)
            
            section_id = graph.add_task(
                f"sections_{download_id}",
                f"Extract sections from {download_id}",
                extract_key_sections,
                dependencies=[download_id],
                kwargs={"paper_data": {"id": paper_mapping[download_id]}}
            )
            section_task_ids.append(section_id)
        
        # Step 4: Summarize papers (depends on metadata and sections)
        summary_task_ids = []
        for meta_id, section_id in zip(metadata_task_ids, section_task_ids):
            summary_id = graph.add_task(
                f"summary_{meta_id}",
                f"Summarize paper from {meta_id}",
                summarize_paper,
                dependencies=[meta_id, section_id],
                args=({"title": meta_id}, [])  # Simplified: placeholder metadata
            )
            summary_task_ids.append(summary_id)
        
        # Step 5: Compare papers (depends on all summaries)
        compare_id = graph.add_task(
            "compare_papers",
            "Compare all papers",
            compare_papers,
            dependencies=summary_task_ids,
            args=([],)  # Simplified
        )
        
        # Step 6: Generate report (depends on comparison)
        report_id = graph.add_task(
            "generate_report",
            "Generate final report",
            generate_report,
            dependencies=[compare_id],
            args=("",)  # Simplified
        )
        
        return graph
    
    async def run_literature_review(self, topics: List[str]) -> Dict[str, Any]:
        """Run a complete literature review workflow"""
        graph = await self.build_literature_review_graph(topics)
        
        print(f"Built graph with {len(graph.tasks)} tasks")
        print(f"Execution levels: {get_execution_order(graph)}")
        
        # Execute the graph
        result = await graph.execute(max_parallel=5)
        
        return {
            'execution_result': result,
            'summary': graph.get_execution_summary()
        }

# Example usage
async def main():
    assistant = ResearchAssistant()
    result = await assistant.run_literature_review(["quantum computing", "machine learning"])
    print("\nExecution Summary:")
    print(result['summary'])

if __name__ == "__main__":
    asyncio.run(main())

This workflow shows how task graphs handle complex dependencies. Search tasks run in parallel. Download tasks run in parallel after searches complete. Extraction tasks run in parallel after downloads. Summarization waits for both metadata and sections. Comparison waits for all summaries. The report waits for comparison.

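The graph structure makes these relationships explicit. The execution engine handles the scheduling. One caveat: this simplified version does not pass a task’s result to its dependents; the placeholder arguments marked “Simplified” stand in for that wiring.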
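The case-study graph uses placeholder arguments marked “Simplified” instead of passing each task’s result to its dependents. One way to add that wiring, sketched here outside the TaskGraph class with a hypothetical run_with_results helper, is to give every task function a single argument: a dict mapping each dependency’s ID to its result. It runs one topological generation at a time, so a task may wait on unrelated tasks in its level; that is a simplification, not a requirement.

```python
import asyncio
from typing import Any, Awaitable, Callable

import networkx as nx

TaskFn = Callable[[dict[str, Any]], Awaitable[Any]]


async def run_with_results(graph: nx.DiGraph, funcs: dict[str, TaskFn]) -> dict[str, Any]:
    """Run tasks level by level, passing each one its dependencies' results."""
    results: dict[str, Any] = {}
    for generation in nx.topological_generations(graph):
        outputs = await asyncio.gather(*(
            funcs[task]({dep: results[dep] for dep in graph.predecessors(task)})
            for task in generation
        ))
        results.update(zip(generation, outputs))
    return results


async def search(inputs):
    return ["paper_1", "paper_2"]

async def download(inputs):
    return [f"pdf of {p}" for p in inputs["search"]]

async def summarize(inputs):
    return f"summarized {len(inputs['download'])} papers"


pipeline = nx.DiGraph([("search", "download"), ("download", "summarize")])
funcs = {"search": search, "download": download, "summarize": summarize}
print(asyncio.run(run_with_results(pipeline, funcs))["summarize"])  # summarized 2 papers
```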

Optimizing for Failure Recovery

Failure recovery is where task graphs really shine. When a task fails, the graph structure helps the agent decide what to do.
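The graph answers the first question directly: which tasks does a failure affect? In this sketch (hypothetical helpers and task IDs, with every edge treated as a hard dependency), nx.descendants gives every task downstream of the failure, and everything else is unaffected.

```python
import networkx as nx


def blocked_by(graph: nx.DiGraph, failed: str) -> set[str]:
    """Tasks that cannot run if failed is not recovered (all downstream tasks)."""
    return nx.descendants(graph, failed)


def unaffected_by(graph: nx.DiGraph, failed: str) -> set[str]:
    """Tasks that do not depend on failed, directly or indirectly."""
    return set(graph) - blocked_by(graph, failed) - {failed}


g = nx.DiGraph([
    ("search", "download_1"), ("search", "download_2"),
    ("download_1", "summary_1"), ("download_2", "summary_2"),
    ("summary_1", "compare"), ("summary_2", "compare"),
])
print(sorted(blocked_by(g, "download_1")))     # ['compare', 'summary_1']
print(sorted(unaffected_by(g, "download_1")))  # ['download_2', 'search', 'summary_2']
```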

Self-Healing Edges

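Self-healing means the graph can recover when a task fails instead of stopping. Once a task has exhausted its retries, the class below tries three strategies in order: a registered failure handler, a registered alternative task, and finally skipping the task if it has no dependents or all of them can handle missing input.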

class SelfHealingTaskGraph(TaskGraph):
    """Task graph with self-healing capabilities"""
    
    def __init__(self, graph_id: str = "default"):
        super().__init__(graph_id)
        self.failure_handlers: Dict[str, Callable] = {}
        self.alternative_tasks: Dict[str, List[str]] = {}
    
    def register_failure_handler(self, task_id: str, handler: Callable):
        """Register a handler for when a task fails"""
        self.failure_handlers[task_id] = handler
    
    def add_alternative_task(self, original_task_id: str, alternative_task_id: str):
        """Register an alternative task if original fails"""
        if original_task_id not in self.alternative_tasks:
            self.alternative_tasks[original_task_id] = []
        self.alternative_tasks[original_task_id].append(alternative_task_id)
    
    async def handle_task_failure(self, task_id: str) -> bool:
        """Handle a task failure, return True if handled"""
        task = self.tasks[task_id]
        
        # Try failure handler
        if task_id in self.failure_handlers:
            handler = self.failure_handlers[task_id]
            try:
                result = await handler(task)
                if result:
                    # Handler successfully recovered
                    task.status = TaskStatus.COMPLETED
                    task.result = result
                    return True
            except Exception as e:
                print(f"Failure handler for {task_id} failed: {e}")
        
        # Try alternative tasks
        if task_id in self.alternative_tasks:
            alternatives = self.alternative_tasks[task_id]
            for alt_id in alternatives:
                if alt_id in self.tasks:
                    alt_task = self.tasks[alt_id]
                    # Check if alternative is already completed
                    if alt_task.status == TaskStatus.COMPLETED:
                        # Use alternative's result
                        task.status = TaskStatus.COMPLETED
                        task.result = alt_task.result
                        return True
                    elif alt_task.status == TaskStatus.PENDING:
                        # Try to execute alternative
                        try:
                            result = await self.execute_task(alt_id)
                            task.status = TaskStatus.COMPLETED
                            task.result = result
                            return True
                        except Exception:
                            continue
        
        # Try to skip and continue
        dependent_tasks = list(self.graph.successors(task_id))
        if not dependent_tasks:
            # No dependent tasks, safe to skip
            task.status = TaskStatus.SKIPPED
            return True
        
        # Check if dependents can handle missing input
        can_skip = True
        for dep_id in dependent_tasks:
            dep_task = self.tasks[dep_id]
            # Check if task can handle None/missing input
            if not self._can_handle_missing_input(dep_task):
                can_skip = False
                break
        
        if can_skip:
            task.status = TaskStatus.SKIPPED
            # Update dependent tasks to handle missing input
            for dep_id in dependent_tasks:
                self.tasks[dep_id].kwargs['skip_missing'] = True
            return True
        
        return False
    
    def _can_handle_missing_input(self, task: Task) -> bool:
        """Check if a task can handle missing input"""
        # In practice, this would check task metadata or function signature
        return task.kwargs.get('allow_missing', False)
    
    async def execute_task(self, task_id: str) -> Any:
        """Execute a task; once its retries are exhausted, attempt recovery.

        The base execute() collects task exceptions with asyncio.gather and
        never raises them itself, so recovery hooks in here rather than
        around execute().
        """
        try:
            return await super().execute_task(task_id)
        except Exception:
            task = self.tasks[task_id]
            # PENDING means the base class will retry; only recover on final failure
            if task.status == TaskStatus.FAILED and await self.handle_task_failure(task_id):
                self.execution_history.append({
                    'task_id': task_id,
                    'status': f"recovered_{task.status.value}",
                    'timestamp': datetime.now().isoformat()
                })
                return task.result
            raise

Dynamic Edge Reordering

Sometimes dependencies change during execution. The graph should adapt:

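def reorder_dependencies(graph: TaskGraph, task_id: str, new_dependencies: List[str]):
    """Replace a task's dependencies, rolling back if the change creates a cycle"""
    for dep_id in new_dependencies:
        if dep_id not in graph.tasks:
            raise ValueError(f"Dependency {dep_id} does not exist")
    
    # Swap the old edges for the new ones
    old_deps = list(graph.graph.predecessors(task_id))
    graph.graph.remove_edges_from([(dep_id, task_id) for dep_id in old_deps])
    graph.graph.add_edges_from([(dep_id, task_id) for dep_id in new_dependencies])
    
    # Validate graph is still acyclic; restore the old edges if it is not
    if not nx.is_directed_acyclic_graph(graph.graph):
        graph.graph.remove_edges_from([(dep_id, task_id) for dep_id in new_dependencies])
        graph.graph.add_edges_from([(dep_id, task_id) for dep_id in old_deps])
        raise ValueError("New dependencies create a cycle")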
    
    # If task was ready but now has new dependencies, check if still ready
    task = graph.tasks[task_id]
    if task.status == TaskStatus.READY:
        new_deps_status = [
            graph.tasks[dep_id].status == TaskStatus.COMPLETED
            for dep_id in new_dependencies
        ]
        if not all(new_deps_status):
            task.status = TaskStatus.PENDING

This lets the agent adapt the plan as it learns. If a task discovers it needs different inputs, it can update its dependencies.
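reorder_dependencies changes the graph first and validates afterwards. An alternative, sketched here with a hypothetical would_create_cycle helper, checks a proposed edge before touching the graph: adding dep -> task closes a cycle exactly when task can already reach dep, which nx.has_path answers without mutating anything.

```python
import networkx as nx


def would_create_cycle(graph: nx.DiGraph, dep: str, task: str) -> bool:
    """True if adding the edge dep -> task (task depends on dep) would form a cycle."""
    if dep == task:
        return True  # a task cannot depend on itself
    if dep not in graph or task not in graph:
        return False  # a brand-new node cannot already be on a path
    return nx.has_path(graph, task, dep)


g = nx.DiGraph([("search", "download"), ("download", "summarize")])
print(would_create_cycle(g, "summarize", "search"))  # True: search already reaches summarize
print(would_create_cycle(g, "search", "summarize"))  # False: summarize cannot reach search
```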

Monitoring and Visualization

Visualizing the task graph helps understand execution and debug issues.
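When matplotlib isn’t available, a text export can be enough for debugging. This sketch swaps in a different technique from the matplotlib approach in this section: it renders the graph as Mermaid flowchart text (graph TD, A --> B), which some Markdown renderers, GitHub’s for example, can display as a diagram. The to_mermaid helper is hypothetical and assumes task IDs are simple identifiers without spaces.

```python
import networkx as nx


def to_mermaid(graph: nx.DiGraph, status: dict[str, str]) -> str:
    """Render a task graph as Mermaid flowchart text, labeling nodes with status."""
    lines = ["graph TD"]
    for node in graph.nodes:
        # Node declaration with a quoted label, e.g. search["search: completed"]
        lines.append(f'    {node}["{node}: {status.get(node, "pending")}"]')
    for upstream, downstream in graph.edges:
        lines.append(f"    {upstream} --> {downstream}")
    return "\n".join(lines)


g = nx.DiGraph([("search", "download"), ("download", "summarize")])
print(to_mermaid(g, {"search": "completed", "download": "running"}))
```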

DAG State Visualization

Here’s how to visualize the graph using networkx and matplotlib:

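import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import networkx as nx

def visualize_task_graph(graph: TaskGraph, show_status: bool = True):
    """Visualize the task graph with status colors"""
    plt.figure(figsize=(12, 8))
    # A fixed seed keeps node positions stable across repeated calls
    pos = nx.spring_layout(graph.graph, k=1, iterations=50, seed=42)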
    
    # Color nodes by status
    node_colors = []
    for node_id in graph.graph.nodes():
        status = graph.tasks[node_id].status
        if status == TaskStatus.COMPLETED:
            node_colors.append('green')
        elif status == TaskStatus.RUNNING:
            node_colors.append('blue')
        elif status == TaskStatus.FAILED:
            node_colors.append('red')
        elif status == TaskStatus.READY:
            node_colors.append('yellow')
        else:
            node_colors.append('gray')
    
    # Draw nodes
    nx.draw_networkx_nodes(
        graph.graph, pos,
        node_color=node_colors,
        node_size=2000,
        alpha=0.9
    )
    
    # Draw edges
    nx.draw_networkx_edges(
        graph.graph, pos,
        edge_color='gray',
        arrows=True,
        arrowsize=20,
        alpha=0.6
    )
    
    # Draw labels
    labels = {}
    for node_id in graph.graph.nodes():
        task = graph.tasks[node_id]
        if show_status:
            labels[node_id] = f"{node_id}\n{task.status.value}"
        else:
            labels[node_id] = node_id
    
    nx.draw_networkx_labels(graph.graph, pos, labels, font_size=8)
    
    # Add legend
    if show_status:
        patches = [
            mpatches.Patch(color='green', label='Completed'),
            mpatches.Patch(color='blue', label='Running'),
            mpatches.Patch(color='yellow', label='Ready'),
            mpatches.Patch(color='red', label='Failed'),
            mpatches.Patch(color='gray', label='Pending')
        ]
        plt.legend(handles=patches, loc='upper right')
    
    plt.title(f"Task Graph: {graph.graph_id}")
    plt.axis('off')
    plt.tight_layout()
    plt.show()

def print_execution_timeline(graph: TaskGraph):
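    """Print a timeline of task execution, started tasks first"""
    # Started tasks in start order, followed by tasks that never started
    tasks_by_time = sorted(
        graph.tasks.values(),
        key=lambda t: (t.started_at is None, t.started_at or t.created_at)
    )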
    
    print("\nExecution Timeline:")
    print("-" * 80)
    for task in tasks_by_time:
        if task.started_at:
            duration = ""
            if task.completed_at:
                duration = f" ({(task.completed_at - task.started_at).total_seconds():.2f}s)"
            print(f"{task.started_at.strftime('%H:%M:%S')} - {task.task_id}: {task.status.value}{duration}")
        else:
            print(f"Not started - {task.task_id}: {task.status.value}")
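A spring layout scatters nodes, so dependency direction becomes hard to read once a graph grows past a few dozen tasks. Another option is to export Graphviz DOT text and render it with the `dot` tool, which draws layered (hierarchical) layouts. So that it stands alone, the sketch below takes plain dicts rather than a `TaskGraph`. `to_dot` and `STATUS_COLORS` are hypothetical names.

```python
from typing import Dict, List

# Fill colors matching the matplotlib legend; unknown statuses fall back to gray
STATUS_COLORS = {
    "completed": "green",
    "running": "blue",
    "ready": "yellow",
    "failed": "red",
    "pending": "gray",
}

def to_dot(statuses: Dict[str, str], dependencies: Dict[str, List[str]]) -> str:
    """Render tasks as a Graphviz DOT digraph.

    statuses maps task_id -> status string; dependencies maps task_id -> ids
    it depends on. Edges point from dependency to dependent. Assumes task
    ids contain no double quotes.
    """
    lines = ["digraph tasks {", "  rankdir=LR;", "  node [style=filled];"]
    for task_id, status in statuses.items():
        color = STATUS_COLORS.get(status, "gray")
        # \\n in the source becomes a DOT line break inside the label
        lines.append(f'  "{task_id}" [label="{task_id}\\n{status}", fillcolor={color}];')
    for task_id, deps in dependencies.items():
        for dep_id in deps:
            lines.append(f'  "{dep_id}" -> "{task_id}";')
    lines.append("}")
    return "\n".join(lines)
```

To render the output, save the string to `tasks.dot` and run `dot -Tpng tasks.dot -o tasks.png`. With the article's `TaskGraph`, the two dicts could be built from `graph.tasks` and `graph.graph.predecessors(task_id)`.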

Best Practices

Here are some things to watch out for when building task graphs.

Avoiding Cyclic Reasoning

Cycles break task graphs. If A waits on B and B waits on A, neither can ever start, and no topological order exists. The agent must keep the graph acyclic.

Common cycle sources:

  • Task A depends on B, B depends on A (direct cycle)
  • Task A depends on B, B depends on C, C depends on A (indirect cycle)
  • Dynamic task creation that accidentally creates cycles

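Solution: Validate the graph before execution. Check for cycles whenever a task or dependency is added, so a bad edge is rejected immediately. If a cycle does get through, report the tasks that form it so the plan can be fixed.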

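import itertools
from typing import List

import networkx as nx

def detect_cycles(graph: TaskGraph, limit: int = 10) -> List[List[str]]:
    """Return up to `limit` cycles in the graph (an empty list for a DAG)"""
    # Fast path: an acyclic graph has nothing to enumerate
    if nx.is_directed_acyclic_graph(graph.graph):
        return []
    # simple_cycles yields cycles lazily, and a graph can contain exponentially
    # many of them, so take only the first few for error reporting
    return list(itertools.islice(nx.simple_cycles(graph.graph), limit))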
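Checking for cycles as tasks are added can be cheaper than re-validating the whole graph. Adding the edge `dep -> task` closes a cycle exactly when `task` can already reach `dep`. In networkx that check is `nx.has_path(graph.graph, task_id, dep_id)`. Below is a stdlib-only sketch of the same idea over a plain adjacency dict. `would_create_cycle` and `add_dependency` are hypothetical helpers.

```python
from collections import deque
from typing import Dict, Set

def would_create_cycle(successors: Dict[str, Set[str]], dep_id: str, task_id: str) -> bool:
    """True if adding the edge dep_id -> task_id would close a cycle.

    successors maps each task to the tasks that depend on it. The new edge
    creates a cycle exactly when task_id can already reach dep_id.
    """
    if dep_id == task_id:
        return True  # a task depending on itself is a cycle of length one
    seen = {task_id}
    queue = deque([task_id])
    while queue:  # breadth-first search from task_id along existing edges
        node = queue.popleft()
        for nxt in successors.get(node, ()):
            if nxt == dep_id:
                return True
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return False

def add_dependency(successors: Dict[str, Set[str]], dep_id: str, task_id: str) -> None:
    """Add dep_id -> task_id, refusing edges that would make the graph cyclic."""
    if would_create_cycle(successors, dep_id, task_id):
        raise ValueError(f"{dep_id} -> {task_id} would create a cycle")
    successors.setdefault(dep_id, set()).add(task_id)
```

Each check costs at most one traversal, O(V + E). Because the graph is guaranteed acyclic after every insert, the scheduler never has to deal with a cycle.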

Managing Graph Complexity

Large graphs become hard to manage. Too many tasks, too many dependencies, and the system slows down.

Strategies:

  • Hierarchical graphs: Break large graphs into sub-graphs. Each sub-graph is a task in the parent graph.
  • Lazy task creation: Don’t create all tasks upfront. Create them as dependencies become available.
  • Task grouping: Group related tasks. Execute groups instead of individual tasks.
  • Graph pruning: Remove unnecessary tasks. If a task becomes irrelevant, remove it and its dependents.

import networkx as nx

def prune_unnecessary_tasks(graph: TaskGraph, completed_goal: str):
    """Remove pending tasks that did not contribute to a completed goal"""
    # Only prune once the goal task has actually completed
    goal_task = graph.tasks.get(completed_goal)
    if not goal_task or goal_task.status != TaskStatus.COMPLETED:
        return
    
    # Ancestors of the goal are the tasks that contributed to it
    ancestors = set(nx.ancestors(graph.graph, completed_goal))
    ancestors.add(completed_goal)
    
    # Remove pending tasks outside the ancestor set
    to_remove = [
        tid for tid in graph.tasks.keys()
        if tid not in ancestors and graph.tasks[tid].status == TaskStatus.PENDING
    ]
    
    for tid in to_remove:
        graph.graph.remove_node(tid)
        del graph.tasks[tid]
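The task-grouping strategy in the list above can be as simple as batching tasks by dependency depth. Every task in a batch has all of its dependencies in earlier batches, so each batch can run fully in parallel. networkx provides this as `nx.topological_generations`. The stdlib sketch below computes the same batches with Kahn's algorithm. `group_into_batches` is a hypothetical name.

```python
from typing import Dict, List

def group_into_batches(dependencies: Dict[str, List[str]]) -> List[List[str]]:
    """Split tasks into batches that can each run fully in parallel.

    dependencies maps task_id -> ids it depends on; every task must appear
    as a key. Raises ValueError if the dependencies contain a cycle.
    """
    remaining = {tid: len(deps) for tid, deps in dependencies.items()}
    dependents: Dict[str, List[str]] = {tid: [] for tid in dependencies}
    for tid, deps in dependencies.items():
        for dep in deps:
            dependents[dep].append(tid)

    batch = sorted(tid for tid, n in remaining.items() if n == 0)
    batches: List[List[str]] = []
    while batch:
        batches.append(batch)
        next_batch = []
        for tid in batch:  # completing this batch unlocks its dependents
            for child in dependents[tid]:
                remaining[child] -= 1
                if remaining[child] == 0:
                    next_batch.append(child)
        batch = sorted(next_batch)

    # Tasks stuck on a cycle never reach zero remaining dependencies
    if sum(len(b) for b in batches) != len(dependencies):
        raise ValueError("dependencies contain a cycle")
    return batches
```

Grouping trades some parallelism for simplicity. A task in batch 3 waits for all of batch 2, even if its own dependencies finished early. A scheduler that starts each task as soon as it becomes ready avoids that wait.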

Long-Running Agents

Agents that run for hours or days need special considerations:

  • Checkpointing: Save graph state periodically. Resume from checkpoints if the agent crashes.
  • Resource limits: Set limits on parallel execution, memory usage, execution time.
  • Progress tracking: Monitor progress. Detect if the agent is stuck.
  • Graceful degradation: If resources are limited, prioritize critical tasks.
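Checkpointing needs little code when task state is serializable. The sketch below uses a hypothetical minimal `TaskRecord` dataclass instead of the article's task class. It writes JSON to a temporary file and then swaps it in with `os.replace`, so a crash mid-write leaves the previous checkpoint intact. On restore, tasks that were `running` when the agent died are reset to `pending`. That reset assumes the tasks are safe to re-run.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass, field
from typing import Dict, List, Optional

@dataclass
class TaskRecord:
    """Hypothetical serializable snapshot of one task."""
    task_id: str
    status: str  # e.g. "pending", "running", "completed", "failed"
    dependencies: List[str] = field(default_factory=list)
    result: Optional[str] = None

def save_checkpoint(tasks: Dict[str, TaskRecord], path: str) -> None:
    """Write all task records to path without exposing a half-written file."""
    data = [asdict(t) for t in tasks.values()]
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f)
        os.replace(tmp_path, path)  # atomic on POSIX within one filesystem
    except BaseException:
        os.unlink(tmp_path)
        raise

def load_checkpoint(path: str) -> Dict[str, TaskRecord]:
    """Read task records back; interrupted tasks become pending again."""
    with open(path) as f:
        records = [TaskRecord(**item) for item in json.load(f)]
    for record in records:
        if record.status == "running":
            record.status = "pending"  # assumes the task is safe to re-run
    return {r.task_id: r for r in records}
```

Calling `save_checkpoint` after each task finishes, or on a timer, limits lost work to whatever was in flight at the time of the crash.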

Future Outlook

Task graph intelligence is still evolving. Here are a few directions it is heading in.

DAG Reasoning + Memory Graphs Integration

Task graphs model execution. Memory graphs model knowledge. Combining them creates agents that reason about both what to do and what they know.

A memory graph connects related facts, experiences, and concepts. A task graph connects related actions. When integrated, the agent can:

  • Generate tasks based on memory patterns
  • Update memory based on task results
  • Reason about which memories are relevant to which tasks

This creates more intelligent planning. The agent doesn’t just plan actions. It plans actions informed by its knowledge structure.

Towards Autonomous Coordination Layers

Multiple agents can share task graphs. One agent generates a graph. Others contribute tasks. The graph evolves as agents collaborate.

This requires:

  • Distributed graph management
  • Conflict resolution (when agents have conflicting tasks)
  • Consensus mechanisms (deciding which tasks to execute)
  • Graph merging (combining graphs from different agents)

The result is a coordination layer where agents work together through shared task graphs. They don’t need central control. The graph structure coordinates them.
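The distributed parts are hard, but graph merging, the last requirement in the list, has a simple core. Take the union of both graphs' edges and reject the merge if the result contains a cycle, because two graphs that are each acyclic can still form a cycle together. The stdlib sketch below (Python 3.9+ for `graphlib`) assumes both agents use the same task IDs for the same work. `merge_task_graphs` is hypothetical, and it does no conflict resolution beyond the cycle check. With networkx, the equivalent is `nx.compose` followed by `nx.is_directed_acyclic_graph`.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, Set

def merge_task_graphs(a: Dict[str, Set[str]], b: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
    """Union two dependency maps (task_id -> set of dependency ids).

    Raises ValueError if the combined graph has a cycle.
    """
    merged: Dict[str, Set[str]] = {}
    for graph in (a, b):
        for task_id, deps in graph.items():
            merged.setdefault(task_id, set()).update(deps)
    try:
        TopologicalSorter(merged).prepare()  # prepare() raises CycleError on a cycle
    except CycleError as exc:
        # args[1] holds the list of nodes forming the detected cycle
        raise ValueError(f"merged graph has a cycle: {exc.args[1]}") from exc
    return merged
```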

Conclusion

Task graph intelligence changes how agents plan and execute. Instead of linear queues, agents use graphs that model task relationships. This enables parallelism, handles dependencies, and supports failure recovery.

The core idea is simple: represent tasks as nodes and dependencies as edges, then execute in topological order. But the implementation details matter. Failure handling, dynamic updates, visualization, and complexity management are all important.

If you’re building autonomous agents, consider task graphs. Start simple — create a graph, add tasks with dependencies, execute in topological order. Then add failure recovery. Add dynamic updates. Add visualization. Build complexity as you learn what works.
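A first version can be small. This sketch uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) instead of networkx, and uses asyncio to run every task whose dependencies are done, up to `max_parallel` at a time. The `run_graph` function and the `handlers` mapping are hypothetical. Failure handling is deliberately left out: the first exception propagates to the caller.

```python
import asyncio
from graphlib import TopologicalSorter
from typing import Awaitable, Callable, Dict, List, Set

async def run_graph(
    dependencies: Dict[str, Set[str]],
    handlers: Dict[str, Callable[[], Awaitable[object]]],
    max_parallel: int = 4,
) -> List[str]:
    """Run each task once all its dependencies finish; return completion order.

    handlers needs an entry for every task, including tasks that only
    appear as dependencies.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not a DAG
    limit = asyncio.Semaphore(max_parallel)
    finished: List[str] = []
    running: Dict[asyncio.Task, str] = {}

    async def run_one(task_id: str) -> None:
        async with limit:  # at most max_parallel handlers run at once
            await handlers[task_id]()

    while sorter.is_active():
        # Start every task whose dependencies have all completed
        for task_id in sorter.get_ready():
            running[asyncio.create_task(run_one(task_id))] = task_id
        done, _ = await asyncio.wait(set(running), return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            task_id = running.pop(fut)
            fut.result()  # re-raise the handler's exception, if any
            sorter.done(task_id)  # may unlock dependents for the next loop
            finished.append(task_id)
    return finished
```

Tasks start as soon as their own dependencies finish, not in fixed layers. Independent branches, such as downloading one paper while citing another, therefore overlap.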

The field is still evolving. Integration with memory graphs, distributed coordination, and learned task generation are active areas. But the foundation is solid. Task graphs make agents more efficient and resilient.

Give it a try. You might find that your agents work better when they can see the big picture.
