By Appropri8 Team

Composable Domain-Driven Architecture for AI-Driven Systems

architecture, ai, domain-driven-design, microservices, composable-architecture

AI is changing how we build software. Large language models and AI services are everywhere now. But most teams still use old architecture patterns that don’t work well with AI.

Traditional layered or monolithic architectures struggle with this new reality. They’re too rigid. Teams can’t move fast enough. AI services get tangled up with business logic. Domain ownership becomes unclear.

The solution? Move to a composable, domain-driven architecture that treats AI services as first-class components. This approach gives teams the agility they need while keeping AI services organized and maintainable.

Domain-Driven Architecture Recap

Domain-Driven Design (DDD) isn’t new, but it’s more important than ever with AI systems. The core idea is simple: organize your code around business domains, not technical layers.

DDD gives us bounded contexts. These are clear boundaries around different parts of your business. Each context has its own models, its own language, its own rules. The customer service team talks about “tickets” and “resolutions.” The order team talks about “carts” and “fulfillment.” They’re different worlds.

Aggregates keep related data together. They ensure consistency within a domain. The ubiquitous language helps everyone - developers, product managers, business users - speak the same way about the same things.

This aligns perfectly with architectural modularity. Each domain can own its own services. Teams can work independently. Changes in one domain don’t break others.
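
As a quick sketch of bounded contexts in code, the same business can model related concepts completely differently per context. These pydantic models are illustrative assumptions, not a prescribed schema:

from pydantic import BaseModel
from typing import Optional, List

# Customer service context: the language is "tickets" and "resolutions".
class SupportTicket(BaseModel):
    ticket_id: str
    customer_id: str
    message: str
    resolution: Optional[str] = None

# Order context: the language is "carts" and "fulfillment".
class Cart(BaseModel):
    cart_id: str
    customer_id: str
    item_skus: List[str]
    fulfillment_status: str = "pending"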

What “Composable” Means for AI Systems

Composable architecture means services and components that you can assemble or swap with minimal coupling. Think of it like building blocks. You can mix and match them to create different solutions.

In the AI era, this becomes crucial. AI services come in many forms:

  • LLMs for natural language processing
  • Inference services for real-time predictions
  • Model management for versioning and deployment
  • Feature services for data preparation

Each domain might need different AI capabilities. A customer service domain might call an AI agent for sentiment analysis. The order domain might use a recommendation engine. The content domain might need text generation.

The key is that each domain owns its AI services. They’re not shared across domains. This keeps things simple and maintainable.

The Architecture Pattern

Here’s how it works in practice. Each domain owns three things:

  1. Domain logic - the core business rules
  2. Domain-specific AI services - the AI capabilities it needs
  3. API/facade - how other domains interact with it

Shared services handle cross-cutting concerns:

  • Model registry - tracks AI model versions and metadata
  • Observability - monitoring, logging, tracing
  • Orchestration - coordinates between domains

The boundaries are clear. Domains communicate through well-defined APIs, event buses, or message brokers. Some flows are synchronous (when you need immediate results). Others are asynchronous (when you can handle things later).
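
For instance, here is a minimal sketch of an asynchronous boundary: another domain (say, analytics) consumes the "ticket-events" topic that the customer service domain publishes to later in this article. The event schema and consumer code are assumptions, not a fixed contract:

import json
from kafka import KafkaConsumer
from pydantic import BaseModel

# Assumed event contract published by the customer service domain
class TicketCreatedEvent(BaseModel):
    ticket_id: str
    customer_id: str
    sentiment: str
    priority: str

def consume_ticket_events():
    """React to ticket events in a separate domain, asynchronously."""
    consumer = KafkaConsumer(
        'ticket-events',
        bootstrap_servers=['kafka:9092'],
        value_deserializer=lambda v: json.loads(v.decode('utf-8'))
    )
    for message in consumer:
        event = TicketCreatedEvent(**message.value)
        # Only the event contract couples the two domains - no shared code.
        print(f"Recording sentiment '{event.sentiment}' for ticket {event.ticket_id}")

The only thing the two domains share is the event schema, which is exactly the kind of narrow, explicit contract this pattern calls for.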

Code Example: Domain Service with AI Integration

Let’s look at a customer service domain that uses AI for sentiment analysis. Here’s how the domain service might work:

from fastapi import FastAPI, Header
from pydantic import BaseModel
from typing import Optional
import httpx
import uuid
from kafka import KafkaProducer
import json
import logging

app = FastAPI(title="Customer Service Domain")

class TicketRequest(BaseModel):
    customer_id: str
    message: str
    priority: str

class TicketResponse(BaseModel):
    ticket_id: str
    sentiment: str
    confidence: float
    suggested_action: str

class SentimentAIRequest(BaseModel):
    text: str
    model_version: str = "v2.1"

class SentimentAIResponse(BaseModel):
    sentiment: str
    confidence: float
    model_version: str

@app.post("/tickets", response_model=TicketResponse)
async def create_ticket(
    request: TicketRequest,
    ai_model_version: Optional[str] = Header(None, alias="X-AI-Model-Version")
):
    """Create a new support ticket with AI-powered sentiment analysis."""
    
    # Call AI service for sentiment analysis
    try:
        ai_response = await call_sentiment_ai(
            request.message, 
            ai_model_version or "v2.1"
        )
    except Exception as e:
        logging.error(f"AI service failed: {e}")
        # Fallback to default sentiment
        ai_response = SentimentAIResponse(
            sentiment="neutral",
            confidence=0.5,
            model_version="fallback"
        )
    
    # Generate ticket ID and determine action
    ticket_id = f"TKT-{request.customer_id}-{int(asyncio.get_event_loop().time())}"
    suggested_action = determine_action(ai_response.sentiment, request.priority)
    
    # Publish event for other domains
    await publish_ticket_created_event({
        "ticket_id": ticket_id,
        "customer_id": request.customer_id,
        "sentiment": ai_response.sentiment,
        "priority": request.priority
    })
    
    return TicketResponse(
        ticket_id=ticket_id,
        sentiment=ai_response.sentiment,
        confidence=ai_response.confidence,
        suggested_action=suggested_action
    )

async def call_sentiment_ai(text: str, model_version: str) -> SentimentAIResponse:
    """Call the sentiment analysis AI service."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://sentiment-ai-service:8000/analyze",
            json={"text": text, "model_version": model_version},
            headers={
                "Content-Type": "application/json",
                # The AI service requires a request ID header for tracing
                "X-Request-ID": str(uuid.uuid4()),
            },
            timeout=5.0
        )
        response.raise_for_status()
        return SentimentAIResponse(**response.json())

def determine_action(sentiment: str, priority: str) -> str:
    """Determine suggested action based on sentiment and priority."""
    if sentiment == "negative" and priority == "high":
        return "escalate_to_manager"
    elif sentiment == "negative":
        return "assign_senior_agent"
    else:
        return "standard_processing"

async def publish_ticket_created_event(event_data: dict):
    """Publish ticket creation event to message bus."""
    # kafka-python's KafkaProducer is synchronous; it is created per call here
    # for brevity. In production, reuse a single producer (or an async client
    # such as aiokafka) so the event loop isn't blocked on every publish.
    producer = KafkaProducer(
        bootstrap_servers=['kafka:9092'],
        value_serializer=lambda x: json.dumps(x).encode('utf-8')
    )
    producer.send('ticket-events', value=event_data)
    producer.flush()

Now let’s look at the AI service itself:

from fastapi import FastAPI, Header
from pydantic import BaseModel
import logging
import time

app = FastAPI(title="Sentiment Analysis AI Service")

class AnalyzeRequest(BaseModel):
    text: str
    model_version: str

class AnalyzeResponse(BaseModel):
    sentiment: str
    confidence: float
    model_version: str
    processing_time_ms: int

# Mock model registry - in real implementation, this would be external
MODEL_REGISTRY = {
    "v2.1": {"path": "/models/sentiment_v2.1", "accuracy": 0.94},
    "v2.0": {"path": "/models/sentiment_v2.0", "accuracy": 0.91},
    "fallback": {"path": "/models/sentiment_fallback", "accuracy": 0.85}
}

@app.post("/analyze", response_model=AnalyzeResponse)
async def analyze_sentiment(
    request: AnalyzeRequest,
    request_id: str = Header(..., alias="X-Request-ID")
):
    """Analyze sentiment of input text using specified model version."""
    
    start_time = time.time()
    
    # Validate model version
    if request.model_version not in MODEL_REGISTRY:
        logging.warning(f"Unknown model version: {request.model_version}")
        request.model_version = "fallback"
    
    # Log telemetry
    logging.info(f"Processing request {request_id} with model {request.model_version}")
    
    # Mock sentiment analysis - replace with actual model inference
    sentiment, confidence = mock_sentiment_analysis(request.text, request.model_version)
    
    processing_time = int((time.time() - start_time) * 1000)
    
    # Log performance metrics
    logging.info(f"Request {request_id} completed in {processing_time}ms")
    
    return AnalyzeResponse(
        sentiment=sentiment,
        confidence=confidence,
        model_version=request.model_version,
        processing_time_ms=processing_time
    )

def mock_sentiment_analysis(text: str, model_version: str) -> tuple[str, float]:
    """Mock sentiment analysis - replace with actual model inference."""
    # Simple keyword-based sentiment for demo
    negative_words = ["bad", "terrible", "awful", "hate", "angry", "frustrated"]
    positive_words = ["good", "great", "excellent", "love", "happy", "satisfied"]
    
    text_lower = text.lower()
    negative_count = sum(1 for word in negative_words if word in text_lower)
    positive_count = sum(1 for word in positive_words if word in text_lower)
    
    if negative_count > positive_count:
        return "negative", 0.8
    elif positive_count > negative_count:
        return "positive", 0.8
    else:
        return "neutral", 0.6

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "service": "sentiment-ai"}

Testing the Architecture

Here’s how you might test this setup:

import pytest
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient
from your_domain_service import app, SentimentAIResponse

@pytest.fixture
def client():
    return TestClient(app)

def test_ticket_creation_with_ai_sentiment(client):
    """Test ticket creation with AI sentiment analysis."""

    # Mock the async AI call with AsyncMock so the awaited result is the response object
    with patch('your_domain_service.call_sentiment_ai', new_callable=AsyncMock) as mock_ai:
        mock_ai.return_value = SentimentAIResponse(
            sentiment="negative",
            confidence=0.85,
            model_version="v2.1"
        )
        
        # Mock the async event publishing so no Kafka broker is needed
        with patch('your_domain_service.publish_ticket_created_event', new_callable=AsyncMock) as mock_publish:
            response = client.post(
                "/tickets",
                json={
                    "customer_id": "CUST-123",
                    "message": "This product is terrible and I hate it",
                    "priority": "high"
                },
                headers={"X-AI-Model-Version": "v2.1"}
            )
            
            assert response.status_code == 200
            data = response.json()
            
            # Verify response structure
            assert "ticket_id" in data
            assert data["sentiment"] == "negative"
            assert data["confidence"] == 0.85
            assert data["suggested_action"] == "escalate_to_manager"
            
            # Verify AI service was called correctly
            mock_ai.assert_called_once_with("This product is terrible and I hate it", "v2.1")
            
            # Verify event was published
            mock_publish.assert_called_once()
            event_data = mock_publish.call_args[0][0]
            assert event_data["sentiment"] == "negative"
            assert event_data["customer_id"] == "CUST-123"

def test_ai_service_fallback(client):
    """Test fallback behavior when AI service fails."""

    with patch('your_domain_service.call_sentiment_ai', new_callable=AsyncMock) as mock_ai, \
         patch('your_domain_service.publish_ticket_created_event', new_callable=AsyncMock):
        mock_ai.side_effect = Exception("AI service unavailable")
        
        response = client.post(
            "/tickets",
            json={
                "customer_id": "CUST-456",
                "message": "I need help with my order",
                "priority": "medium"
            }
        )
        
        assert response.status_code == 200
        data = response.json()
        
        # Should use fallback sentiment
        assert data["sentiment"] == "neutral"
        assert data["confidence"] == 0.5

Best Practices and Common Pitfalls

Best Practices

Clear domain boundaries. Each domain should own its data, its logic, and its AI services. Don’t share AI logic across domains. It creates coupling and makes changes harder.

Version your AI services. Models change. APIs change. Use version headers. Keep old versions running during transitions. This gives you time to test and roll back if needed.

Shared platform services. Don’t reinvent observability, model management, or orchestration in every domain. Build these once and share them. But keep domain-specific logic in the domains.

Bake in telemetry. Log everything. Track model performance. Monitor drift. Set up alerts. You can’t fix what you can’t see.
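
As one example of baking telemetry in rather than bolting it on, a small FastAPI middleware can time every request and record which model version handled it. This is a minimal sketch; the header name and log fields are assumptions:

import time
import logging
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def log_request_latency(request: Request, call_next):
    """Log latency and the AI model version header for every request."""
    start = time.time()
    response = await call_next(request)
    elapsed_ms = int((time.time() - start) * 1000)
    model_version = request.headers.get("X-AI-Model-Version", "unset")
    logging.info(
        "path=%s status=%s latency_ms=%s model_version=%s",
        request.url.path, response.status_code, elapsed_ms, model_version
    )
    return response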

Use asynchronous flows where appropriate. Not everything needs to be synchronous. Events and message queues can decouple domains and improve resilience.

Common Pitfalls

Coupling domains through shared AI logic. This is the biggest mistake. If multiple domains use the same AI service, changes become painful. Each domain should own its AI capabilities.

Over-centralized model management. Having one team manage all AI models creates bottlenecks. Let domain teams own their models. Use shared services for common concerns like deployment and monitoring.

Ignoring domain ownership. When AI services are shared, it’s unclear who owns what. Bugs become someone else’s problem. Performance issues get ignored. Keep ownership clear.

No fallback for AI failures. AI services fail. Networks fail. Models return unexpected results. Always have fallback logic. Your system should work even when AI doesn’t.

Not monitoring model drift. Models degrade over time. Data changes. Performance drops. You need to track this and retrain when necessary.

Trade-offs to Consider

Orchestration vs choreography. Orchestration means one service coordinates everything. Choreography means services work together through events. Orchestration is simpler but creates coupling. Choreography is more flexible but harder to understand.

Synchronous vs asynchronous AI calls. Synchronous calls are simpler but can hurt latency. Asynchronous calls are more complex but more resilient. Choose based on your requirements.
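
To make that trade-off concrete, here is a sketch of the asynchronous variant of the ticket flow from earlier: accept the ticket immediately and request sentiment analysis via an event instead of an in-request call. It reuses TicketRequest and publish_ticket_created_event from the domain service above; the "pending" placeholder and payload shape are assumptions:

import uuid

async def create_ticket_async_variant(request: TicketRequest) -> dict:
    """Asynchronous variant: accept the ticket now, enrich with sentiment later."""
    ticket_id = f"TKT-{request.customer_id}-{uuid.uuid4().hex[:8]}"

    # Instead of awaiting the AI service, publish a request for analysis.
    # A sentiment worker consumes this event and updates the ticket when done.
    await publish_ticket_created_event({
        "ticket_id": ticket_id,
        "customer_id": request.customer_id,
        "message": request.message,
        "sentiment": "pending",  # filled in later by the worker
        "priority": request.priority
    })

    # Lower latency for the caller, but sentiment is eventually consistent.
    return {"ticket_id": ticket_id, "sentiment": "pending"}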

Real-World Adoption

Getting Started

Identify domain boundaries. Look at your current system. What are the natural business domains? Where do teams already have ownership? Start there.

Inventory existing services and AI components. What do you have now? What AI capabilities exist? Where are they used? This gives you a baseline.

Define service contracts. How will domains communicate? What APIs do they need? What events will they publish? Get this clear before you start building.

Design shared services. What do all domains need? Model management? Observability? Event handling? Build these first.

Cultural Changes

Domain teams own their services and AI. This is a big shift. Teams need to understand AI. They need to monitor model performance. They need to handle failures. Invest in training.

Platform teams own shared services. Someone needs to build and maintain the shared infrastructure. This is usually a platform or infrastructure team.

Metrics to Track

AI latency. How fast are your AI services? Are they getting slower? This affects user experience.

Model usage. Which models are being used? How often? This helps with capacity planning and cost optimization.

Domain service latency. How fast are your domain services? Are they getting slower because of AI calls?

Event backlog. Are events being processed? Is the message queue backing up? This indicates system health.

Domain decoupling. How often do changes in one domain affect others? This measures how well your boundaries are working.
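
A minimal sketch of how some of these metrics could be exported with the prometheus_client library (the metric names and port are assumptions):

from prometheus_client import Counter, Histogram, start_http_server

# AI latency per model version
AI_LATENCY = Histogram("ai_call_latency_seconds", "Latency of AI service calls", ["model_version"])

# Model usage: which versions are actually being exercised
MODEL_USAGE = Counter("ai_model_requests_total", "AI requests per model version", ["model_version"])

def record_ai_call(model_version: str, latency_seconds: float):
    """Record one AI call for the dashboards described above."""
    MODEL_USAGE.labels(model_version=model_version).inc()
    AI_LATENCY.labels(model_version=model_version).observe(latency_seconds)

# Expose metrics on :9100/metrics for Prometheus to scrape
start_http_server(9100)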

Conclusion

AI is here to stay. It’s becoming integral to most systems. Your architecture needs to adapt.

Treat AI services as domain components. Give each domain the AI capabilities it needs. Keep them separate from other domains. This gives you the agility to move fast and the clarity to maintain systems.

Invest in platform services for model management, observability, and orchestration. Don’t reinvent these in every domain. But keep domain-specific logic where it belongs.

The result? Teams can work independently. AI services are organized and maintainable. Changes don’t break everything. You can evolve quickly as AI capabilities improve.

Next Steps

Look at your current system. Where are AI services used? How are they organized? What domains exist? Start mapping AI dependencies to domains.

Then restructure toward domain-driven architecture. It’s not easy, but it’s necessary. The alternative is technical debt that gets worse over time.

Start small. Pick one domain. Move its AI services into the domain. See how it works. Learn from the experience. Then expand to other domains.

The future belongs to teams that can adapt quickly. Domain-driven, composable architecture with AI as a first-class citizen gives you that ability.

Additional Resources

  • Domain-Driven Design by Eric Evans - the foundational book on DDD
  • Building Microservices by Sam Newman - practical guidance on service architecture
  • AI Engineering practices from leading tech companies
  • Event-driven architecture patterns for better decoupling

Remember: architecture is about people as much as technology. Good architecture makes teams more productive. Bad architecture slows everyone down. Choose wisely.
