By Appropri8 Team

Designing Hybrid Cloud-Edge Architectures: Consistency Models for the Real World

system-design, architecture, distributed-systems, edge-computing, cloud-computing, consistency, crdts, cqrs, event-sourcing, iot, python, cap-theorem

Hybrid Cloud-Edge Architecture

Your drone needs to make decisions in milliseconds. But it’s flying over a remote area with spotty internet. The central cloud has the latest weather data and flight restrictions. The edge device has sensors and needs to avoid obstacles. How do you keep them in sync?

This is the problem with hybrid cloud-edge architectures. You need data in two places at once. The cloud has global context and powerful compute. The edge has real-time sensors and low latency. But they can’t always talk to each other.

Network partitions happen. Connections drop. Data drifts between edge and cloud. Without the right consistency model, your system breaks. A drone might try to fly through restricted airspace because it hasn’t received the latest updates. Or a retail store might sell items that are already out of stock at the warehouse.

This article covers how to design systems that work when edge and cloud disagree. We’ll look at consistency models that fit real-world constraints. Then we’ll explore patterns and code examples that actually work.

Introduction: The rise of hybrid execution layers

Systems are moving away from cloud-only architectures. The old model was simple: clients connect to a central cloud, everything processes there, results come back. That works for most web apps. But it breaks for applications that need low latency or work offline.

The shift happened because of three trends:

AI inference at the edge: Running models on-device can cut latency from hundreds of milliseconds to single-digit milliseconds. Your phone can translate speech without waiting for a cloud API.

IoT deployments: Factories, farms, and cities deploy thousands of sensors. Sending all data to the cloud is expensive and slow. Processing locally is cheaper and faster.

Offline-first applications: Retail stores need to work when the internet is down. Medical devices can’t depend on network connectivity. Autonomous vehicles can’t wait for cloud responses.

The architectural shift is from:

Client → Cloud → Database

To:

Edge Device → Local Processing + Local Cache
    ↕
Sync Layer
    ↕
Cloud → Global Database

This creates a consistency challenge. The edge device has its own view of the world. The cloud has a different view. They need to converge eventually, but they can disagree temporarily.

The architectural shift from cloud-only to cloud+edge

Let’s look at real examples of where this matters.

Drone control systems

A delivery drone flies a route. It needs to:

  • Avoid obstacles in real-time (edge processing)
  • Know about airspace restrictions (cloud sync)
  • Navigate around weather (cloud sync)
  • Make landing decisions (edge processing)

The drone can’t wait 200ms for cloud responses when avoiding a tree. But it also can’t ignore airspace restrictions that change daily.

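In practice, the edge runs obstacle avoidance locally. Airspace and weather data sync from the cloud periodically. If the connection drops, the drone keeps using its cached restrictions and errs on the side of safety: it will not enter any airspace the cache marks as restricted, even when that cache is stale. A stale cache can still miss newly added restrictions, so the age of the cached data matters.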

Industrial IoT

A factory floor has hundreds of sensors monitoring equipment. Each sensor sends data every second. Sending everything to the cloud costs too much bandwidth. Processing locally makes sense.

But the cloud needs aggregated data for analytics. Managers want dashboards showing production trends across multiple factories. The cloud also coordinates between factories — if Factory A is down, Factory B might need to increase production.

The edge processes sensor data and detects anomalies. Critical alerts sync to cloud immediately. Normal readings batch and sync every few minutes.
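A minimal sketch of that routing policy follows. It assumes a hypothetical `send` callable for the uplink and a caller-supplied `is_critical` predicate; the 120-second flush interval and the temperature threshold in the usage are illustrative values, not taken from any real deployment. The injectable `clock` exists only so the example can simulate time passing.

```python
import time
from typing import Callable, Dict, List


class TelemetryRouter:
    """Send critical readings immediately; batch normal readings.

    `send` is a hypothetical uplink that accepts a list of readings.
    """

    def __init__(
        self,
        send: Callable[[List[Dict]], None],
        is_critical: Callable[[Dict], bool],
        flush_interval: float = 120.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.send = send
        self.is_critical = is_critical
        self.flush_interval = flush_interval
        self.clock = clock
        self.batch: List[Dict] = []
        self.last_flush = clock()

    def ingest(self, reading: Dict) -> None:
        if self.is_critical(reading):
            # Critical alerts bypass the batch and go out at once.
            self.send([reading])
            return
        self.batch.append(reading)
        # Normal readings are flushed once the interval has elapsed.
        if self.clock() - self.last_flush >= self.flush_interval:
            self.flush()

    def flush(self) -> None:
        if self.batch:
            self.send(self.batch)
            self.batch = []
        self.last_flush = self.clock()


sent: List[List[Dict]] = []
now = [0.0]  # simulated clock
router = TelemetryRouter(
    send=sent.append,
    is_critical=lambda r: r["temp_c"] > 90,
    flush_interval=120,
    clock=lambda: now[0],
)
router.ingest({"sensor": "s1", "temp_c": 40})  # batched
router.ingest({"sensor": "s2", "temp_c": 95})  # sent immediately
now[0] = 130
router.ingest({"sensor": "s1", "temp_c": 41})  # interval elapsed: batch flushed
print(len(sent))  # 2
```

A production version would also flush on a timer rather than only when a new reading arrives, so a quiet sensor does not hold data indefinitely.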

On-device AI inference

Your phone’s camera app uses on-device models to detect faces and suggest edits. The model runs on the phone (edge) for instant results. But the cloud has newer models, better training data, and can do more expensive processing.

The architecture needs to:

  • Run inference on-device for speed
  • Sync model updates from cloud periodically
  • Upload interesting images to cloud for training
  • Balance user privacy with model improvement

The edge needs to work offline. The cloud needs to aggregate data for improvement.

Consistency challenges in hybrid systems

When data lives in two places, they can disagree. That’s the fundamental challenge.

CAP theorem in the edge era

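The CAP theorem says a distributed system cannot guarantee all three of these properties at once; when the network partitions, it has to give up either consistency or availability:

  • Consistency: Every read sees the most recent write, whichever node serves it
  • Availability: Every request to a working node gets a response
  • Partition tolerance: The system keeps operating when messages between nodes are lost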

For edge systems, partitions are common. Your drone loses connection. Your IoT sensor can’t reach the cloud. Your phone goes into airplane mode.

If you choose consistency and partition tolerance, you lose availability. The edge device can’t update data without the cloud. That doesn’t work for real-time systems.

If you choose availability and partition tolerance, you lose consistency. The edge and cloud can have different data temporarily. That’s usually acceptable, but you need to handle conflicts.

Most edge systems choose availability and partition tolerance. They accept eventual consistency — the edge and cloud will agree eventually, but not always immediately.
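To make the trade-off concrete, here is a toy edge replica that can run in either mode. It is a sketch under simplifying assumptions: the `cloud_reachable` flag stands in for real partition detection, a plain dict stands in for the cloud database, and `EdgeReplica` and `PartitionError` are illustrative names.

```python
from typing import Dict, List, Tuple


class PartitionError(Exception):
    """Raised when a CP replica refuses a write it cannot coordinate."""


class EdgeReplica:
    """Toy edge replica that behaves as CP or AP during a partition."""

    def __init__(self, mode: str, cloud: Dict[str, int]):
        if mode not in ("CP", "AP"):
            raise ValueError("mode must be 'CP' or 'AP'")
        self.mode = mode
        self.cloud = cloud  # stand-in for the cloud database
        self.local: Dict[str, int] = {}
        self.outbox: List[Tuple[str, int]] = []
        self.cloud_reachable = True  # stand-in for partition detection

    def write(self, key: str, value: int) -> None:
        if self.cloud_reachable:
            # Connected: write through to the cloud, then update the local copy.
            self.cloud[key] = value
            self.local[key] = value
        elif self.mode == "CP":
            # Consistency first: refuse rather than let edge and cloud diverge.
            raise PartitionError(f"cannot write {key!r} while partitioned")
        else:
            # Availability first: accept locally and reconcile later.
            self.local[key] = value
            self.outbox.append((key, value))

    def heal(self) -> None:
        """Reconnect and replay writes buffered during the partition."""
        self.cloud_reachable = True
        for key, value in self.outbox:
            # Naive replay: a real system must detect conflicts here.
            self.cloud[key] = value
        self.outbox.clear()


cp = EdgeReplica("CP", cloud={})
cp.cloud_reachable = False
try:
    cp.write("sku-42", 3)
except PartitionError as exc:
    print("CP:", exc)

ap_cloud: Dict[str, int] = {}
ap = EdgeReplica("AP", cloud=ap_cloud)
ap.cloud_reachable = False
ap.write("sku-42", 3)
print(ap.local, ap_cloud)  # {'sku-42': 3} {}
ap.heal()
print(ap_cloud)  # {'sku-42': 3}
```

The CP replica stays correct but unavailable for writes; the AP replica stays available, and the naive replay in `heal` is exactly where the conflict-resolution strategies discussed later come in.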

Network partitions and data drift

When the network partitions, data drifts. The edge device continues operating with cached data. The cloud continues operating with its own data. They diverge.

Example: A retail store system. The store has a local inventory database. The warehouse has a central inventory system. They sync periodically.

A customer buys the last item in the store. The local system updates inventory to zero. But before it syncs to the cloud, another store tries to transfer that item from your store to theirs. The cloud still thinks you have inventory.

Or the reverse: The warehouse marks an item as out of stock. The store hasn’t synced yet, so it still shows the item as available. A customer buys it, and the sale is only rejected later, when the store finally syncs.

These conflicts are inevitable in distributed systems. The question is how to handle them.

Modern Consistency Models

Different consistency models fit different use cases. Let’s look at the main ones.

Strong consistency

Strong consistency means all nodes see the same data at the same time. When you write to one node, all other nodes see the change immediately.

This requires coordination. For edge systems, it usually means:

  • All writes go through the cloud
  • Edge devices read from local cache
  • Cache invalidates when data changes

Problem: If the edge loses connection, it can’t write. The system becomes unavailable.

Use strong consistency when correctness is more important than availability. Financial transactions, medical records, flight control systems. These can’t tolerate inconsistencies, even temporarily.

Eventual consistency

Eventual consistency means nodes can disagree temporarily, but they’ll converge eventually. The edge device can write locally, then sync to cloud later. Both can have different views for a while.

This is the most common model for edge systems. It provides availability during partitions. The edge keeps working even when disconnected.

The challenge is conflict resolution. When the edge and cloud sync, they might have conflicting updates. You need rules to merge them.

Use eventual consistency when availability matters more than immediate correctness. Most IoT, retail, and mobile apps fit here.

Causal consistency

Causal consistency preserves cause-and-effect relationships. If event A causes event B, all nodes see A before B. But unrelated events can appear in different orders.

Example: A document editing system. If user A edits paragraph 1, then user B edits paragraph 2 based on A’s changes, all nodes see A’s edit before B’s. But if user C edits paragraph 3 independently, nodes might see C’s edit in different orders relative to A and B.

This is weaker than strong consistency but stronger than eventual consistency. It rules out anomalies such as seeing a reply before the edit it responds to, while still letting nodes keep working during partitions.
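One standard way to enforce causal order is causal broadcast with vector clocks: a replica buffers an incoming update until it has applied everything the sender had seen when it made that update. The sketch below implements that delivery rule; the message format `(sender, vector_clock, payload)` and the `CausalReplica` class are illustrative assumptions, and only remote updates are tracked.

```python
from typing import Dict, List, Tuple

Clock = Dict[str, int]
Message = Tuple[str, Clock, str]  # (sender, sender's vector clock, payload)


class CausalReplica:
    """Applies remote updates only once their causal dependencies are applied."""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.clock: Clock = {}  # updates applied so far, per sender
        self.pending: List[Message] = []
        self.applied: List[str] = []

    def _deliverable(self, sender: str, vc: Clock) -> bool:
        # Must be the sender's next update...
        if vc.get(sender, 0) != self.clock.get(sender, 0) + 1:
            return False
        # ...and we must already have applied everything the sender had seen.
        return all(
            count <= self.clock.get(node, 0)
            for node, count in vc.items()
            if node != sender
        )

    def receive(self, msg: Message) -> None:
        self.pending.append(msg)
        progress = True
        while progress:  # applying one update may unblock buffered ones
            progress = False
            for m in list(self.pending):
                sender, vc, payload = m
                if self._deliverable(sender, vc):
                    self.pending.remove(m)
                    self.applied.append(payload)
                    self.clock[sender] = vc[sender]
                    progress = True


replica = CausalReplica("C")
edit_a = ("A", {"A": 1}, "A edits paragraph 1")
edit_b = ("B", {"A": 1, "B": 1}, "B edits paragraph 2")  # B had seen A's edit

replica.receive(edit_b)  # buffered: depends on A's edit
print(replica.applied)   # []
replica.receive(edit_a)  # applies A, which unblocks B
print(replica.applied)   # ['A edits paragraph 1', 'B edits paragraph 2']
```

This mirrors the document-editing example: replica C receives B's edit first, but holds it until A's edit, which B depended on, has been applied.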

Tunable consistency

Tunable consistency lets you choose per-operation. Read the latest inventory count with strong consistency. But read historical analytics with eventual consistency.

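Some databases let you choose a consistency level per query (Apache Cassandra’s per-request consistency levels are one example). The calls below are illustrative; `db.read` is not a specific client API: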

# Strong consistency - wait for latest data
inventory = await db.read("inventory", consistency="strong")

# Eventual consistency - fast, might be stale
analytics = await db.read("analytics", consistency="eventual")

This gives you control. Use strong consistency when you need it, eventual when you don’t.
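Dynamo-style databases such as Cassandra commonly implement tunable consistency with quorums: with N replicas, a write waits for W acknowledgements and a read queries R replicas. If R + W > N, every read quorum overlaps the most recent write quorum, so the read sees the latest version. The sketch below simulates that rule in memory; `QuorumStore` and its `(version, value)` records are illustrative, not a real client API, and it deliberately picks the worst-case replicas for each operation.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[int, str]  # (version, value)


class QuorumStore:
    """In-memory simulation of N replicas with tunable R and W."""

    def __init__(self, n: int):
        self.replicas: List[Dict[str, Record]] = [{} for _ in range(n)]
        self.version = 0  # one global counter stands in for per-key versioning

    def write(self, key: str, value: str, w: int) -> None:
        # Worst case: only the first W replicas acknowledge the write.
        self.version += 1
        for replica in self.replicas[:w]:
            replica[key] = (self.version, value)

    def read(self, key: str, r: int) -> Optional[str]:
        # Worst case: the read reaches the last R replicas; newest version wins.
        records = [replica[key] for replica in self.replicas[-r:] if key in replica]
        return max(records)[1] if records else None


store = QuorumStore(n=3)
store.write("sku-42", "stock=7", w=3)
store.write("sku-42", "stock=5", w=2)  # the third replica still holds stock=7

print(store.read("sku-42", r=1))  # stock=7  (R + W = 3, not > N: stale read possible)
print(store.read("sku-42", r=2))  # stock=5  (R + W = 4 > N: quorums overlap)
```

Lowering R or W trades freshness for latency and availability, which is the per-operation choice described above.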

CRDTs and delta-state sync in edge scenarios

CRDTs (Conflict-free Replicated Data Types) are data structures designed for eventual consistency. They guarantee that any two replicas that have received the same set of updates converge to the same state, without manual conflict resolution.

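The key insight: Design data structures where operations commute. If two nodes apply operations in different orders, they get the same result. For state-based CRDTs, which sync by exchanging whole states, the merge function must also be associative and idempotent, so merging the same state twice changes nothing.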

CRDT counter example

A simple CRDT is a grow-only counter (G-Counter). Instead of storing a single number, each node stores a map of node IDs to counts. When you increment, you increment only your own entry. When you merge, you take the maximum of each node’s entry. The counter’s value is the sum of all entries.

from dataclasses import dataclass, field
from typing import Dict
from uuid import uuid4

@dataclass
class CRDTCounter:
    """A grow-only, conflict-free replicated counter (G-Counter).
    
    Each node only ever increases its own entry. Merging takes the
    per-node maximum, and the value is the sum of all entries.
    There is no decrement: a lowered entry would be undone by the
    max-based merge. Use a PN-Counter if you need to count down.
    """
    node_id: str = field(default_factory=lambda: str(uuid4()))
    counters: Dict[str, int] = field(default_factory=dict)
    
    def increment(self, delta: int = 1):
        """Increment this node's counter. Deltas must be non-negative."""
        if delta < 0:
            raise ValueError("G-Counter entries can only grow")
        current = self.counters.get(self.node_id, 0)
        self.counters[self.node_id] = current + delta
    
    def value(self) -> int:
        """Get the total count across all nodes."""
        return sum(self.counters.values())
    
    def merge(self, other: 'CRDTCounter') -> 'CRDTCounter':
        """Merge another counter into a copy of this one.
        
        Per-node max is commutative, associative, and idempotent,
        so merge order and repeated merges don't change the result.
        """
        merged = CRDTCounter(node_id=self.node_id)  # keep this replica's identity
        merged.counters = self.counters.copy()
        
        for node_id, count in other.counters.items():
            merged.counters[node_id] = max(
                merged.counters.get(node_id, 0),
                count
            )
        
        return merged
    
    def to_dict(self) -> dict:
        """Serialize for network transmission."""
        return {
            "node_id": self.node_id,
            "counters": self.counters
        }
    
    @classmethod
    def from_dict(cls, data: dict) -> 'CRDTCounter':
        """Deserialize from network data."""
        counter = cls(node_id=data["node_id"])
        counter.counters = data["counters"]
        return counter

# Usage example
edge_counter = CRDTCounter()
edge_counter.increment(5)
edge_counter.increment(3)
print(f"Edge value: {edge_counter.value()}")  # 8

cloud_counter = CRDTCounter()
cloud_counter.increment(2)

# Merge
merged = edge_counter.merge(cloud_counter)
print(f"Merged value: {merged.value()}")  # 10

# Merging is commutative - order doesn't matter
merged2 = cloud_counter.merge(edge_counter)
print(f"Merged value (reverse): {merged2.value()}")  # 10

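The counter converges because each node only increases its own entry, and taking the per-node maximum is commutative, associative, and idempotent. It doesn’t matter whether the edge’s increments reach the cloud before or after the cloud’s own, or whether the same state is merged twice: the result is the same.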
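A G-Counter’s max-based merge only works if entries never decrease, so counting down needs a different structure. The standard answer is a PN-Counter: two grow-only maps, one for increments and one for decrements, with the value defined as their difference. A minimal sketch in the same dataclass style; the `PNCounter` name and `_merge_max` helper are illustrative.

```python
from dataclasses import dataclass, field
from typing import Dict


def _merge_max(a: Dict[str, int], b: Dict[str, int]) -> Dict[str, int]:
    """Element-wise max of two per-node count maps."""
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in a.keys() | b.keys()}


@dataclass
class PNCounter:
    node_id: str
    increments: Dict[str, int] = field(default_factory=dict)
    decrements: Dict[str, int] = field(default_factory=dict)

    def increment(self, delta: int = 1) -> None:
        if delta < 0:
            raise ValueError("use decrement() for negative changes")
        self.increments[self.node_id] = self.increments.get(self.node_id, 0) + delta

    def decrement(self, delta: int = 1) -> None:
        if delta < 0:
            raise ValueError("use increment() for negative changes")
        # Decrements grow their own map, so max-based merging never loses them.
        self.decrements[self.node_id] = self.decrements.get(self.node_id, 0) + delta

    def value(self) -> int:
        return sum(self.increments.values()) - sum(self.decrements.values())

    def merge(self, other: "PNCounter") -> "PNCounter":
        return PNCounter(
            node_id=self.node_id,
            increments=_merge_max(self.increments, other.increments),
            decrements=_merge_max(self.decrements, other.decrements),
        )


edge = PNCounter("edge")
cloud = PNCounter("cloud")
edge.increment(5)
edge.decrement(2)
cloud.increment(4)

a = edge.merge(cloud)
b = cloud.merge(edge)
print(a.value(), b.value())  # 7 7
print(a.merge(a).value())    # 7 (merging is idempotent)
```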

More complex CRDTs

CRDTs exist for sets, maps, lists, and text editing. The idea is the same: design operations that commute.

For inventory systems, you might use a CRDT map where each store owns its own per-item entry and only that store ever writes it. A plain maximum is not enough here: stock goes down as well as up, so merging by max would let a stale, higher count overwrite a sale. The version below tags each entry with a per-node version number and keeps the newest write from each store, which converges for the same reason the counter does.

from typing import Tuple

@dataclass
class CRDTInventory:
    """Inventory tracking with CRDT semantics.
    
    items[item_id][node_id] = (version, count). Only node_id writes
    its own entry, so the higher version is always the newer value.
    """
    node_id: str = field(default_factory=lambda: str(uuid4()))
    items: Dict[str, Dict[str, Tuple[int, int]]] = field(default_factory=dict)
    
    def set_stock(self, item_id: str, count: int):
        """Set stock level for this node, bumping its version."""
        entries = self.items.setdefault(item_id, {})
        version, _ = entries.get(self.node_id, (0, 0))
        entries[self.node_id] = (version + 1, max(0, count))
    
    def get_stock(self, item_id: str) -> int:
        """Get total stock across all nodes."""
        return sum(count for _, count in self.items.get(item_id, {}).values())
    
    def merge(self, other: 'CRDTInventory') -> 'CRDTInventory':
        """Merge another inventory into a copy of this one."""
        merged = CRDTInventory(node_id=self.node_id)
        
        # Collect every item either replica knows about
        all_items = set(self.items.keys()) | set(other.items.keys())
        
        for item_id in all_items:
            mine = self.items.get(item_id, {})
            theirs = other.items.get(item_id, {})
            # For each node's entry, keep the one with the higher version
            merged.items[item_id] = {
                node_id: max(
                    mine.get(node_id, (0, 0)),
                    theirs.get(node_id, (0, 0))
                )
                for node_id in set(mine) | set(theirs)
            }
        
        return merged

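CRDTs work well for some use cases, but they have limitations. They require careful design. Not all data structures can be made conflict-free, and invariants that span replicas (such as “total stock never goes below zero” across all stores) cannot be enforced without coordination. They can also use more storage, since each replica carries per-node metadata that grows with the number of nodes.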

Design Patterns

Beyond consistency models, there are patterns that help manage edge-cloud sync.

Command Query Responsibility Segregation (CQRS)

CQRS separates reads from writes. You have one model for writing (commands) and another for reading (queries). This lets you optimize each independently.

For edge systems, this means:

  • Edge devices write to local command store
  • Commands sync to cloud
  • Cloud processes commands and updates read models
  • Edge devices read from local read model (which syncs from cloud)
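
import logging
import time
from uuid import uuid4

logger = logging.getLogger(__name__)


class EdgeCQRSStore:
    """CQRS implementation for edge devices.
    
    send_to_cloud and fetch_from_cloud are transport placeholders;
    validate_command is a minimal stand-in for real validation.
    """
    
    def __init__(self):
        self.commands = []  # Write model: append-only command log
        self.read_model = {}  # Read model (synced from cloud)
        self.pending_sync = []
    
    async def execute_command(self, command: dict):
        """Execute a command locally and queue for sync."""
        # Validate command
        if not self.validate_command(command):
            raise ValueError("Invalid command")
        
        # Store command locally
        command["timestamp"] = time.time()
        command["command_id"] = str(uuid4())
        self.commands.append(command)
        
        # Queue for sync to cloud
        self.pending_sync.append(command)
        
        # Optimistically update local read model
        # (Cloud will send authoritative update later)
        self.apply_command_locally(command)
        
        return command["command_id"]
    
    def validate_command(self, command: dict) -> bool:
        """Placeholder check; replace with real validation."""
        return "type" in command
    
    def apply_command_locally(self, command: dict):
        """Optimistically apply command to local read model."""
        # This is a simplified example
        # Real implementation would have proper command handlers
        if command["type"] == "update_inventory":
            item_id = command["item_id"]
            delta = command["delta"]
            self.read_model[item_id] = self.read_model.get(item_id, 0) + delta
    
    async def sync_to_cloud(self):
        """Send pending commands to cloud, keeping failures for retry."""
        if not self.pending_sync:
            return
        
        # Snapshot: commands added while we await the network stay queued
        batch = list(self.pending_sync)
        still_pending = []
        # In practice, you'd batch these into one request
        for command in batch:
            try:
                await self.send_to_cloud(command)
            except Exception as e:
                # Keep the command so the next sync retries it
                logger.error(f"Failed to sync command {command['command_id']}: {e}")
                still_pending.append(command)
        
        # Drop only the commands that were sent successfully
        self.pending_sync = still_pending + self.pending_sync[len(batch):]
    
    async def sync_from_cloud(self):
        """Sync read model from cloud."""
        # Cloud sends authoritative read model updates. These overwrite
        # optimistic local values, including for commands not yet synced.
        updates = await self.fetch_from_cloud()
        self.read_model.update(updates)
    
    def read(self, key: str):
        """Read from local read model."""
        return self.read_model.get(key)
    
    async def send_to_cloud(self, command: dict):
        """Transport placeholder: send one command to the cloud."""
        raise NotImplementedError
    
    async def fetch_from_cloud(self) -> dict:
        """Transport placeholder: fetch authoritative read-model updates."""
        raise NotImplementedError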

CQRS works well for edge systems because:

  • Edge devices can write immediately (no network delay)
  • Reads are fast (local read model)
  • Cloud has full command history for auditing
  • Read models can be optimized separately (different indexes, denormalization)
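Because every command is retained, a read model can be rebuilt from scratch by replaying the command log in order; this is the core idea behind event sourcing. A minimal sketch, reusing the `update_inventory` command shape from `EdgeCQRSStore`; the `rebuild_read_model` function is illustrative, and it assumes the log is already in the order the cloud accepted the commands.

```python
from typing import Dict, Iterable


def rebuild_read_model(commands: Iterable[dict]) -> Dict[str, int]:
    """Fold an ordered command log into an inventory read model."""
    read_model: Dict[str, int] = {}
    for command in commands:  # replay in log order
        if command["type"] == "update_inventory":
            item_id = command["item_id"]
            read_model[item_id] = read_model.get(item_id, 0) + command["delta"]
        # Other command types are ignored in this sketch.
    return read_model


log = [
    {"type": "update_inventory", "item_id": "sku-42", "delta": 10},
    {"type": "update_inventory", "item_id": "sku-42", "delta": -3},
    {"type": "update_inventory", "item_id": "sku-7", "delta": 4},
]
print(rebuild_read_model(log))  # {'sku-42': 7, 'sku-7': 4}
```

The same replay lets you add a new read model later (say, per-store totals) without changing how commands are written.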

Edge-first write buffering

Edge devices write locally first, then sync to cloud. This provides low latency for users but requires conflict resolution.

The pattern:

  1. User action triggers write
  2. Write to local store immediately
  3. Return success to user
  4. Sync to cloud in background
  5. If cloud rejects (conflict), reconcile

import asyncio
import logging
import time
from typing import Any, Optional

logger = logging.getLogger(__name__)


class EdgeWriteBuffer:
    """Buffer writes locally, sync to cloud asynchronously.
    
    send_to_cloud is a transport placeholder. conflict_resolver is any
    object with an async resolve(local_write, cloud_value) method.
    """
    
    def __init__(self, conflict_resolver=None):
        self.local_store = {}
        self.pending_writes = []
        self.conflict_resolver = conflict_resolver
        self._sync_task: Optional[asyncio.Task] = None
    
    async def write(self, key: str, value: Any) -> bool:
        """Write locally, queue for cloud sync."""
        now = time.time()
        
        # Write to local store immediately
        self.local_store[key] = {"value": value, "timestamp": now, "synced": False}
        
        # Queue for cloud sync
        self.pending_writes.append({"key": key, "value": value, "timestamp": now})
        
        # Start a background sync if one isn't already running (don't wait).
        # A single task keeps writes in order and avoids duplicate sends.
        if self._sync_task is None or self._sync_task.done():
            self._sync_task = asyncio.create_task(self.sync_pending_writes())
        
        return True  # Optimistic - assume it will work
    
    async def read(self, key: str) -> Any:
        """Read from local store."""
        if key in self.local_store:
            return self.local_store[key]["value"]
        return None
    
    async def sync_pending_writes(self):
        """Sync pending writes to cloud, oldest first."""
        while self.pending_writes:
            write = self.pending_writes[0]  # Peek; remove only once sent
            
            try:
                response = await self.send_to_cloud(write)
            except Exception as e:
                # Network error: keep the write at the head, retry after a delay
                logger.error(f"Sync failed: {e}")
                await asyncio.sleep(5)
                continue
            
            self.pending_writes.pop(0)
            if response.get("conflict"):
                # Cloud detected conflict, need to reconcile
                await self.handle_conflict(write, response)
            else:
                # Success: mark synced unless a newer local write replaced it
                entry = self.local_store.get(write["key"])
                if entry and entry["timestamp"] == write["timestamp"]:
                    entry["synced"] = True
    
    async def send_to_cloud(self, write: dict) -> dict:
        """Transport placeholder: returns the cloud's response dict."""
        raise NotImplementedError
    
    async def handle_conflict(self, local_write: dict, cloud_response: dict):
        """Resolve conflict between local write and cloud state."""
        key = local_write["key"]
        cloud_value = cloud_response["cloud_value"]
        
        if self.conflict_resolver:
            resolved = await self.conflict_resolver.resolve(local_write, cloud_value)
        else:
            # Default: cloud wins and the local write is discarded
            resolved = cloud_value
        
        now = time.time()
        in_cloud = resolved == cloud_value
        self.local_store[key] = {"value": resolved, "timestamp": now, "synced": in_cloud}
        if not in_cloud:
            # The resolved value differs from the cloud's: push it so both sides converge
            self.pending_writes.append({"key": key, "value": resolved, "timestamp": now})

This pattern gives users instant feedback. But you need to handle conflicts when they occur.

Conflict resolution strategies

When edge and cloud have conflicting updates, you need to resolve them. Here are common strategies.

Last-write-wins (LWW): The update with the latest timestamp wins. Simple, but it silently discards one of any two concurrent updates, and it relies on timestamps from clocks that can drift between edge and cloud.

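from typing import Any

def resolve_lww(local: dict, cloud: dict) -> Any:
    """Last-write-wins conflict resolution.
    
    Ties are broken by node_id (assumed present on each write) so that
    every replica picks the same winner and they converge.
    """
    local_key = (local["timestamp"], local.get("node_id", ""))
    cloud_key = (cloud["timestamp"], cloud.get("node_id", ""))
    if cloud_key > local_key:
        return cloud["value"]
    return local["value"]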

Vector clocks: Each node maintains a vector of logical clocks. When nodes sync, they compare vectors to determine causality.

from collections import defaultdict

class VectorClock:
    """Vector clock for causal ordering."""
    
    def __init__(self, node_id: str):
        self.node_id = node_id
        self.clock = defaultdict(int)
        self.clock[node_id] = 0
    
    def tick(self):
        """Increment this node's clock (call on every local event)."""
        self.clock[self.node_id] += 1
        return dict(self.clock)
    
    def update(self, other_clock: dict):
        """Merge another clock into this one (take element-wise max)."""
        for node_id, count in other_clock.items():
            self.clock[node_id] = max(self.clock[node_id], count)
    
    def happens_before(self, other: 'VectorClock') -> bool:
        """Check if this clock happens before other."""
        # This happens before other if:
        # - All elements <= other's elements
        # - At least one element < other's element
        # .get() avoids inserting missing keys into either defaultdict
        nodes = set(self.clock) | set(other.clock)
        all_le = all(self.clock.get(n, 0) <= other.clock.get(n, 0) for n in nodes)
        some_lt = any(self.clock.get(n, 0) < other.clock.get(n, 0) for n in nodes)
        return all_le and some_lt
    
    def concurrent(self, other: 'VectorClock') -> bool:
        """Check if clocks are concurrent (not equal, no causal relation)."""
        nodes = set(self.clock) | set(other.clock)
        equal = all(self.clock.get(n, 0) == other.clock.get(n, 0) for n in nodes)
        return not (equal or self.happens_before(other) or other.happens_before(self))

# Usage
edge_clock = VectorClock("edge")
edge_clock.tick()  # edge: 1

cloud_clock = VectorClock("cloud")
cloud_clock.tick()  # cloud: 1

# Edge and cloud are concurrent (no sync yet)
print(edge_clock.concurrent(cloud_clock))  # True

# After edge syncs to cloud
cloud_clock.update(edge_clock.clock)
print(dict(cloud_clock.clock))  # {'cloud': 1, 'edge': 1}

# Cloud has now seen everything edge has seen
print(edge_clock.happens_before(cloud_clock))  # True

# Edge ticks again, without having seen cloud's event
edge_clock.tick()  # edge: 2
print(edge_clock.happens_before(cloud_clock))  # False
print(edge_clock.concurrent(cloud_clock))  # True: neither has seen all of the other's events

Vector clocks help determine causality. If update A happens before update B, A should be applied first. If they’re concurrent, you need other resolution strategies.
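Put together, a resolver can compare version vectors first and fall back to a policy only for truly concurrent updates. A sketch using plain dicts as clocks; `compare` and `resolve` are illustrative names, and `fallback` is a hypothetical hook for whatever concurrent-update policy you choose (LWW, an application merge, or flagging for review).

```python
from typing import Any, Callable, Dict

Clock = Dict[str, int]


def compare(a: Clock, b: Clock) -> str:
    """Return 'before', 'after', 'equal', or 'concurrent' for clock a vs. clock b."""
    nodes = a.keys() | b.keys()
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"
    if b_le_a:
        return "after"
    return "concurrent"


def resolve(local: Any, local_clock: Clock, remote: Any, remote_clock: Clock,
            fallback: Callable[[Any, Any], Any]) -> Any:
    order = compare(local_clock, remote_clock)
    if order in ("before", "equal"):
        return remote  # remote already reflects our update
    if order == "after":
        return local   # we already reflect the remote update
    return fallback(local, remote)  # concurrent: a policy has to decide


# Cloud saw edge's write, then wrote again: no real conflict.
print(resolve("v1", {"edge": 1}, "v2", {"edge": 1, "cloud": 1}, fallback=max))  # v2
# Neither saw the other's write: the fallback decides.
print(resolve("red", {"edge": 2}, "blue", {"edge": 1, "cloud": 1}, fallback=max))  # red
```

Using `max` as the fallback is only a deterministic placeholder; what matters is that every replica applies the same policy so they converge.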

Application-specific merge: Sometimes conflicts need domain-specific logic. For a document editor, you might merge text changes. For inventory, if each side records the change it made since the last sync (a delta rather than an absolute count), you can add the deltas together.

async def resolve_inventory_conflict(local: dict, cloud: dict) -> dict:
    """Merge inventory deltas from edge and cloud.
    
    Both dicts map item_id -> change since the last sync (e.g. -2 for
    two units sold). Addition commutes, so order doesn't matter.
    Summing absolute counts instead of deltas would double-count stock.
    """
    merged = cloud.copy()
    
    for item_id, local_delta in local.items():
        # Apply the local change on top of the cloud's change (if any)
        merged[item_id] = merged.get(item_id, 0) + local_delta
    
    return merged

The right strategy depends on your use case. CRDTs handle merging automatically. Vector clocks help with ordering. Application-specific logic gives you control.
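`resolve_inventory_conflict` assumes each side sends the change it made. If replicas store absolute counts instead, a common technique is a three-way merge: keep the value both sides agreed on at the last successful sync (the base) and add each side’s change relative to it. A sketch, assuming the edge keeps that base snapshot, which none of the earlier classes track; `three_way_merge` is an illustrative name.

```python
from typing import Dict


def three_way_merge(base: Dict[str, int], local: Dict[str, int],
                    cloud: Dict[str, int]) -> Dict[str, int]:
    """Merge absolute counts using the last agreed snapshot as the common base."""
    merged: Dict[str, int] = {}
    for item in base.keys() | local.keys() | cloud.keys():
        b = base.get(item, 0)
        # A side that doesn't mention an item is treated as not having changed it.
        local_change = local.get(item, b) - b
        cloud_change = cloud.get(item, b) - b
        # A negative result means the two sides oversold; flag it, don't hide it.
        merged[item] = b + local_change + cloud_change
    return merged


base = {"sku-42": 10}
local = {"sku-42": 8}               # store sold 2 while offline
cloud = {"sku-42": 7, "sku-7": 4}   # cloud side removed 3 and added a new item
merged = three_way_merge(base, local, cloud)
print(merged["sku-42"], merged["sku-7"])  # 5 4
```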

Code Samples

Let’s look at practical examples.

Python example of CRDT counter using dataclasses

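We showed a basic version earlier; here is a fuller example that simulates sync between two edge nodes and the cloud in a single process, with JSON helpers for sending counter state over a real network: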

import asyncio
import json
from dataclasses import dataclass, field
from typing import Dict
from uuid import uuid4
import time

@dataclass
class CRDTCounter:
    node_id: str = field(default_factory=lambda: str(uuid4()))
    counters: Dict[str, int] = field(default_factory=dict)
    last_sync: float = field(default_factory=time.time)
    
    def increment(self, delta: int = 1):
        current = self.counters.get(self.node_id, 0)
        self.counters[self.node_id] = current + delta
    
    def value(self) -> int:
        return sum(self.counters.values())
    
    def merge(self, other: 'CRDTCounter') -> 'CRDTCounter':
        merged = CRDTCounter(node_id=self.node_id)
        merged.counters = self.counters.copy()
        
        for node_id, count in other.counters.items():
            merged.counters[node_id] = max(
                merged.counters.get(node_id, 0),
                count
            )
        
        merged.last_sync = max(self.last_sync, other.last_sync)
        return merged
    
    def to_json(self) -> str:
        return json.dumps({
            "node_id": self.node_id,
            "counters": self.counters,
            "last_sync": self.last_sync
        })
    
    @classmethod
    def from_json(cls, data: str) -> 'CRDTCounter':
        obj = json.loads(data)
        counter = cls(node_id=obj["node_id"])
        counter.counters = obj["counters"]
        counter.last_sync = obj["last_sync"]
        return counter

# Edge node
class EdgeNode:
    def __init__(self, node_id: str):
        self.node_id = node_id
        self.counter = CRDTCounter(node_id=node_id)
        self.sync_interval = 30  # Sync every 30 seconds
    
    async def increment(self):
        self.counter.increment()
        print(f"[{self.node_id}] Incremented. Value: {self.counter.value()}")
    
    async def sync_with_cloud(self, cloud_counter: CRDTCounter):
        """Sync with cloud counter."""
        print(f"[{self.node_id}] Syncing with cloud...")
        
        # Merge counters
        self.counter = self.counter.merge(cloud_counter)
        cloud_counter = cloud_counter.merge(self.counter)
        
        print(f"[{self.node_id}] After sync. Value: {self.counter.value()}")
        return cloud_counter

# Simulate edge-cloud sync
async def main():
    edge1 = EdgeNode("edge-1")
    edge2 = EdgeNode("edge-2")
    cloud = CRDTCounter(node_id="cloud")
    
    # Edge nodes increment independently
    await edge1.increment()
    await edge1.increment()
    await edge2.increment()
    
    # Sync edge1 with cloud
    cloud = await edge1.sync_with_cloud(cloud)
    
    # Sync edge2 with cloud
    cloud = await edge2.sync_with_cloud(cloud)
    
    # Sync edge1 again (should see edge2's increment)
    cloud = await edge1.sync_with_cloud(cloud)
    
    print(f"Final cloud value: {cloud.value()}")
    print(f"Final edge1 value: {edge1.counter.value()}")
    print(f"Final edge2 value: {edge2.counter.value()}")

if __name__ == "__main__":
    asyncio.run(main())

This shows CRDTs in action. Nodes increment independently, then sync. After syncing, all nodes see the same total, even though they applied operations in different orders.

Edge synchronization pseudocode with eventual convergence

Here’s a more complete sync algorithm:

import asyncio
import logging
import time
from typing import Any

logger = logging.getLogger(__name__)


class EdgeSyncManager:
    """Manages synchronization between edge and cloud.
    
    Assumed interfaces (hypothetical; adapt to your storage and transport):
      edge_store.set(key, value, pending) stores a value and, if pending
        is True, queues it for the next push.
      cloud_client.push_changes(changes) returns {"conflicts": [...]}.
      cloud_client.pull_changes(since) returns [{"key", "value", "timestamp"}].
    """
    
    def __init__(self, edge_store, cloud_client):
        self.edge_store = edge_store
        self.cloud_client = cloud_client
        self.sync_lock = asyncio.Lock()
        self.last_sync_time = 0
        self.sync_interval = 60  # Sync every 60 seconds
        self.min_interval = 10   # Never sync more often than this
    
    async def start_sync_loop(self):
        """Background task that syncs periodically."""
        while True:
            try:
                await self.sync()
                delay = self.sync_interval
            except Exception as e:
                # Network or cloud error: retry sooner than the normal interval
                logger.error(f"Sync failed: {e}")
                delay = 5
            await asyncio.sleep(delay)
    
    async def sync(self):
        """Perform bidirectional sync with cloud. Raises on network errors."""
        async with self.sync_lock:
            # Don't sync too frequently
            if time.time() - self.last_sync_time < self.min_interval:
                return
            
            # 1. Send local changes to cloud
            local_changes = self.edge_store.get_pending_changes()
            if local_changes:
                cloud_response = await self.cloud_client.push_changes(local_changes)
                # Mark as synced before resolving, so values re-queued by
                # conflict resolution stay pending for the next push
                self.edge_store.mark_synced(local_changes)
                if cloud_response.get("conflicts"):
                    await self.resolve_conflicts(cloud_response["conflicts"])
            
            # 2. Pull cloud changes
            cloud_changes = await self.cloud_client.pull_changes(
                since=self.edge_store.get_last_sync_timestamp()
            )
            if cloud_changes:
                await self.apply_cloud_changes(cloud_changes)
                # Advance the cursor with the cloud's timestamps, not the
                # edge clock, so clock skew can't cause skipped changes
                self.edge_store.update_sync_timestamp(
                    max(change["timestamp"] for change in cloud_changes)
                )
            
            self.last_sync_time = time.time()
    
    async def resolve_conflicts(self, conflicts: list):
        """Resolve conflicts between local and cloud changes."""
        for conflict in conflicts:
            key = conflict["key"]
            cloud_value = conflict["cloud_value"]
            
            # Use application-specific resolution
            resolved = await self.resolve_conflict(
                key,
                self.edge_store.get(key),
                cloud_value,
                conflict["local_timestamp"],
                conflict["cloud_timestamp"]
            )
            
            # If the result differs from the cloud's value, keep it pending
            # so it is pushed next time and the cloud converges too
            self.edge_store.set(key, resolved, pending=(resolved != cloud_value))
    
    async def resolve_conflict(self, key: str, local: Any, cloud: Any,
                               local_ts: float, cloud_ts: float) -> Any:
        """Resolve a single conflict. Override for custom logic."""
        # Default: last-write-wins; ties go to the cloud so the outcome is deterministic
        if cloud_ts >= local_ts:
            return cloud
        return local
    
    async def apply_cloud_changes(self, changes: list):
        """Apply changes from cloud to local store."""
        for change in changes:
            key = change["key"]
            # Check if we have a local uncommitted change
            if self.edge_store.has_uncommitted_change(key):
                # Conflict - needs resolution
                await self.resolve_conflicts([{
                    "key": key,
                    "cloud_value": change["value"],
                    "local_timestamp": self.edge_store.get_timestamp(key),
                    "cloud_timestamp": change["timestamp"]
                }])
            else:
                # No conflict: apply directly without re-queuing it for push
                self.edge_store.set(key, change["value"], pending=False)

This handles the common case: periodic sync, conflict resolution, and eventual convergence.

Local-first write reconciliation demo

A complete example of local-first writes with reconciliation:

import asyncio
import logging
import time
from typing import Any

logger = logging.getLogger(__name__)


class LocalFirstStore:
    """Local-first store with automatic reconciliation."""
    
    def __init__(self, resolver=None):
        self.data = {}
        self.write_log = []  # Log of all writes
        self.pending_sync = []
        self.resolver = resolver or self.default_resolver
        self._sync_lock = asyncio.Lock()  # At most one sync in flight
    
    async def write(self, key: str, value: Any):
        """Write locally, queue for sync."""
        write_entry = {
            "key": key,
            "value": value,
            "timestamp": time.time(),
            "node_id": self.get_node_id(),
            "synced": False
        }
        
        # Write locally immediately
        self.data[key] = value
        self.write_log.append(write_entry)
        self.pending_sync.append(write_entry)
        
        # Trigger async sync
        asyncio.create_task(self.sync_async())
        
        return write_entry
    
    async def read(self, key: str) -> Any:
        """Read from local store."""
        return self.data.get(key)
    
    async def sync_async(self):
        """Async sync with cloud."""
        async with self._sync_lock:
            if not self.pending_sync:
                return
            
            # Snapshot the batch: writes made while we await the cloud are
            # appended after it and must not be dropped
            batch = list(self.pending_sync)
            try:
                # Send pending writes to cloud
                response = await self.send_to_cloud(batch)
                
                # Process cloud response
                await self.process_sync_response(response)
                
                # Mark as synced and remove only what was sent
                for write in batch:
                    write["synced"] = True
                self.pending_sync = self.pending_sync[len(batch):]
                
            except Exception as e:
                # Will retry on the next sync
                logger.error(f"Sync failed: {e}")
    
    async def process_sync_response(self, response: dict):
        """Process response from cloud sync."""
        # Apply cloud updates
        for update in response.get("updates", []):
            key = update["key"]
            
            # Check for conflict
            if key in self.data and not self.is_synced(key):
                # Local uncommitted change exists
                resolved = await self.resolver(
                    key,
                    self.data[key],
                    update["value"],
                    self.get_timestamp(key),
                    update["timestamp"]
                )
                self.data[key] = resolved
            else:
                # No conflict, apply cloud value
                self.data[key] = update["value"]
        
        # Handle conflicts
        for conflict in response.get("conflicts", []):
            resolved = await self.resolver(
                conflict["key"],
                conflict["local_value"],
                conflict["cloud_value"],
                conflict["local_timestamp"],
                conflict["cloud_timestamp"]
            )
            self.data[conflict["key"]] = resolved
    
    def default_resolver(self, key: str, local: any, cloud: any, 
                        local_ts: float, cloud_ts: float) -> any:
        """Default conflict resolution: last-write-wins."""
        return cloud if cloud_ts > local_ts else local
    
    def is_synced(self, key: str) -> bool:
        """Check if key has uncommitted local changes."""
        for write in self.write_log:
            if write["key"] == key and not write["synced"]:
                return False
        return True
    
    def get_timestamp(self, key: str) -> float:
        """Get timestamp of local value for key."""
        for write in reversed(self.write_log):
            if write["key"] == key:
                return write["timestamp"]
        return 0
    
    async def send_to_cloud(self, writes: list) -> dict:
        """Send writes to cloud. Mock implementation."""
        # In practice, this would be an HTTP call or message queue
        await asyncio.sleep(0.1)  # Simulate network delay
        
        # Mock response
        return {
            "updates": [],
            "conflicts": []
        }
    
    def get_node_id(self) -> str:
        """Get this node's ID."""
        return "edge-1"

This demonstrates the local-first pattern: write locally, sync async, reconcile conflicts.
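To see the local-first behaviour in isolation, here is a stripped-down, self-contained sketch. The `FlakyCloud` stub and `MiniLocalFirst` class are hypothetical, not part of the demo above: local writes always succeed, and a failed send leaves them queued until connectivity returns.

```python
import asyncio


class FlakyCloud:
    """Hypothetical cloud stub: every send fails while `online` is False."""

    def __init__(self):
        self.online = False
        self.received = []

    async def send(self, batch):
        await asyncio.sleep(0)  # yield control, as a real network call would
        if not self.online:
            raise ConnectionError("cloud unreachable")
        self.received.extend(batch)


class MiniLocalFirst:
    def __init__(self, cloud):
        self.cloud = cloud
        self.data = {}
        self.pending = []

    def write(self, key, value):
        # The local write always succeeds, regardless of connectivity
        self.data[key] = value
        self.pending.append((key, value))

    async def flush(self):
        batch, self.pending = self.pending, []
        try:
            await self.cloud.send(batch)
        except ConnectionError:
            # Re-queue ahead of anything written during the attempt
            self.pending = batch + self.pending


async def demo():
    cloud = FlakyCloud()
    store = MiniLocalFirst(cloud)
    store.write("sku-1", 5)
    await store.flush()          # offline: the write stays queued
    queued_while_offline = list(store.pending)
    cloud.online = True
    await store.flush()          # back online: the queue drains
    return store.data, queued_while_offline, store.pending, cloud.received


print(asyncio.run(demo()))
# → ({'sku-1': 5}, [('sku-1', 5)], [], [('sku-1', 5)])
```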

Case Study: Retail edge system with offline mode

Let’s build a complete example: a retail store system that works offline.

Requirements

  • Store has local inventory database
  • Store syncs with central warehouse system
  • Store must work when internet is down
  • Multiple stores can update the same items
  • Need to handle conflicts (e.g., two stores try to transfer the same item)

Architecture

Store Edge Device
  ├── Local SQLite database (inventory, sales)
  ├── Sync manager (periodic sync with cloud)
  └── Conflict resolver (merge conflicting updates)

Cloud Warehouse System
  ├── Central PostgreSQL database
  ├── Sync API (receives and sends updates)
  └── Conflict detection (identifies conflicts)

Implementation

import asyncio
import logging
import sqlite3
import time
from typing import Dict, List, Optional
from uuid import uuid4

logger = logging.getLogger(__name__)

class StoreInventorySystem:
    """Retail store inventory with offline support."""
    
    def __init__(self, db_path: str = "store.db"):
        self.db_path = db_path
        self.init_database()
        self.sync_manager = None
    
    def init_database(self):
        """Initialize local SQLite database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Inventory table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS inventory (
                item_id TEXT PRIMARY KEY,
                quantity INTEGER NOT NULL,
                last_updated REAL NOT NULL,
                synced BOOLEAN DEFAULT 0
            )
        """)
        
        # Sales transactions
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS sales (
                transaction_id TEXT PRIMARY KEY,
                item_id TEXT NOT NULL,
                quantity INTEGER NOT NULL,
                timestamp REAL NOT NULL,
                synced BOOLEAN DEFAULT 0
            )
        """)
        
        # Sync log for conflict resolution
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS sync_log (
                log_id INTEGER PRIMARY KEY AUTOINCREMENT,
                item_id TEXT NOT NULL,
                operation TEXT NOT NULL,
                quantity INTEGER,
                timestamp REAL NOT NULL,
                synced BOOLEAN DEFAULT 0
            )
        """)
        
        conn.commit()
        conn.close()
    
    def sell_item(self, item_id: str, quantity: int) -> bool:
        """Sell item (decrease inventory)."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Check current inventory
        cursor.execute(
            "SELECT quantity FROM inventory WHERE item_id = ?",
            (item_id,)
        )
        row = cursor.fetchone()
        
        if not row:
            conn.close()
            return False  # Item doesn't exist
        
        current_qty = row[0]
        if current_qty < quantity:
            conn.close()
            return False  # Not enough inventory
        
        # Update inventory
        new_qty = current_qty - quantity
        cursor.execute("""
            UPDATE inventory 
            SET quantity = ?, last_updated = ?, synced = 0
            WHERE item_id = ?
        """, (new_qty, time.time(), item_id))
        
        # Log sale
        transaction_id = str(uuid4())
        cursor.execute("""
            INSERT INTO sales (transaction_id, item_id, quantity, timestamp, synced)
            VALUES (?, ?, ?, ?, 0)
        """, (transaction_id, item_id, quantity, time.time()))
        
        # Log sync operation
        cursor.execute("""
            INSERT INTO sync_log (item_id, operation, quantity, timestamp, synced)
            VALUES (?, 'sell', ?, ?, 0)
        """, (item_id, -quantity, time.time()))
        
        conn.commit()
        conn.close()
        
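        # Trigger sync (async, don't wait). Scheduling a task needs a running
        # event loop; without one, leave it to the periodic sync loop.
        if self.sync_manager:
            try:
                asyncio.get_running_loop().create_task(self.sync_manager.sync())
            except RuntimeError:
                pass  # No running event loop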
        
        return True
    
    def receive_item(self, item_id: str, quantity: int):
        """Receive item from warehouse (increase inventory)."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Check if item exists
        cursor.execute(
            "SELECT quantity FROM inventory WHERE item_id = ?",
            (item_id,)
        )
        row = cursor.fetchone()
        
        if row:
            # Update existing
            new_qty = row[0] + quantity
            cursor.execute("""
                UPDATE inventory 
                SET quantity = ?, last_updated = ?, synced = 0
                WHERE item_id = ?
            """, (new_qty, time.time(), item_id))
        else:
            # Insert new
            cursor.execute("""
                INSERT INTO inventory (item_id, quantity, last_updated, synced)
                VALUES (?, ?, ?, 0)
            """, (item_id, quantity, time.time()))
        
        # Log sync operation
        cursor.execute("""
            INSERT INTO sync_log (item_id, operation, quantity, timestamp, synced)
            VALUES (?, 'receive', ?, ?, 0)
        """, (item_id, quantity, time.time()))
        
        conn.commit()
        conn.close()
        
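        # Trigger sync (needs a running event loop; otherwise the periodic
        # sync loop picks the change up)
        if self.sync_manager:
            try:
                asyncio.get_running_loop().create_task(self.sync_manager.sync())
            except RuntimeError:
                pass  # No running event loop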
    
    def get_inventory(self, item_id: str) -> Optional[int]:
        """Get current inventory level."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute(
            "SELECT quantity FROM inventory WHERE item_id = ?",
            (item_id,)
        )
        row = cursor.fetchone()
        conn.close()
        
        return row[0] if row else None
    
    def get_pending_changes(self) -> List[dict]:
        """Get all unsynced changes."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            SELECT item_id, operation, quantity, timestamp
            FROM sync_log
            WHERE synced = 0
            ORDER BY timestamp
        """)
        
        changes = []
        for row in cursor.fetchall():
            changes.append({
                "item_id": row[0],
                "operation": row[1],
                "quantity": row[2],
                "timestamp": row[3]
            })
        
        conn.close()
        return changes
    
    async def apply_cloud_updates(self, updates: List[dict]):
        """Apply updates from cloud warehouse."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        for update in updates:
            item_id = update["item_id"]
            cloud_qty = update["quantity"]
            cloud_timestamp = update["timestamp"]
            
            # Check for conflicts
            cursor.execute("""
                SELECT quantity, last_updated 
                FROM inventory 
                WHERE item_id = ? AND synced = 0
            """, (item_id,))
            
            conflict = cursor.fetchone()
            
            if conflict:
                # Local unsynced change exists - resolve conflict
                local_qty, local_timestamp = conflict
                
                # Resolution: if cloud timestamp is newer, trust cloud
                # Otherwise, merge changes (additive operations)
                if cloud_timestamp > local_timestamp:
                    # Cloud wins for timestamp, but we need to account for local changes
                    # This is simplified - real system would be more sophisticated
                    cursor.execute("""
                        UPDATE inventory 
                        SET quantity = ?, last_updated = ?, synced = 1
                        WHERE item_id = ?
                    """, (cloud_qty, cloud_timestamp, item_id))
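                else:
                    # Local change is newer: rebase it onto the cloud value.
                    # This assumes operations are additive (sell = -n,
                    # receive = +n) and that the cloud quantity does not yet
                    # include them, so re-apply the net unsynced delta.
                    cursor.execute("""
                        SELECT COALESCE(SUM(quantity), 0)
                        FROM sync_log
                        WHERE item_id = ? AND synced = 0
                    """, (item_id,))
                    pending_delta = cursor.fetchone()[0]
                    merged_qty = cloud_qty + pending_delta
                    cursor.execute("""
                        UPDATE inventory 
                        SET quantity = ?, last_updated = ?, synced = 0
                        WHERE item_id = ?
                    """, (merged_qty, max(local_timestamp, cloud_timestamp), item_id))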
            else:
                # No conflict, apply directly
                cursor.execute("""
                    INSERT OR REPLACE INTO inventory (item_id, quantity, last_updated, synced)
                    VALUES (?, ?, ?, 1)
                """, (item_id, cloud_qty, cloud_timestamp))
        
        conn.commit()
        conn.close()
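`sell_item` reads the quantity and then writes it back, which is fine with a single writer. If several processes on the store device (say, multiple tills) share the SQLite file, one alternative is to fold the stock check into the UPDATE itself and inspect `cursor.rowcount`. The sketch below uses an in-memory database and a `try_sell` helper that is hypothetical, not part of the system above:

```python
import sqlite3
import time


def try_sell(conn: sqlite3.Connection, item_id: str, quantity: int) -> bool:
    """Decrement stock only if enough is on hand, in a single statement."""
    cursor = conn.execute(
        """
        UPDATE inventory
        SET quantity = quantity - ?, last_updated = ?, synced = 0
        WHERE item_id = ? AND quantity >= ?
        """,
        (quantity, time.time(), item_id, quantity),
    )
    conn.commit()
    # rowcount is 0 when the item is missing or stock is insufficient
    return cursor.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE inventory (item_id TEXT PRIMARY KEY, quantity INTEGER NOT NULL,"
    " last_updated REAL NOT NULL, synced BOOLEAN DEFAULT 0)"
)
conn.execute("INSERT INTO inventory VALUES ('sku-1', 3, 0, 1)")
conn.commit()

print(try_sell(conn, "sku-1", 2))  # → True
print(try_sell(conn, "sku-1", 2))  # → False (only 1 left)
print(conn.execute("SELECT quantity FROM inventory").fetchone()[0])  # → 1
```

In the full `sell_item`, the `sales` and `sync_log` inserts would join the same transaction before the single commit.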

Edge store syncing with central warehouse system

The sync manager handles periodic synchronization:

class WarehouseSyncManager:
    """Manages sync between store and warehouse."""
    
    def __init__(self, store_system: StoreInventorySystem, warehouse_api):
        self.store = store_system
        self.warehouse_api = warehouse_api
        self.sync_interval = 300  # 5 minutes
        self.last_sync = 0
    
    async def start(self):
        """Start periodic sync loop."""
        while True:
            try:
                await asyncio.sleep(self.sync_interval)
                await self.sync()
            except Exception as e:
                logger.error(f"Sync error: {e}")
                await asyncio.sleep(60)  # Retry in 1 minute on error
    
    async def sync(self):
        """Perform bidirectional sync."""
        # 1. Send local changes to warehouse
        local_changes = self.store.get_pending_changes()
        
        if local_changes:
            try:
                response = await self.warehouse_api.push_changes(local_changes)
                
                # Handle conflicts if any
                if response.get("conflicts"):
                    await self.resolve_conflicts(response["conflicts"])
                
                # Mark changes as synced
                await self.mark_synced(local_changes)
                
            except Exception as e:
                # Network error - will retry next cycle
                logger.error(f"Failed to push changes: {e}")
        
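        # 2. Pull updates from warehouse. Record the cursor before the request
        # so updates committed while it is in flight aren't skipped next time
        # (this still assumes edge and warehouse clocks are roughly in sync).
        pull_started = time.time()
        try:
            updates = await self.warehouse_api.pull_updates(
                since=self.last_sync
            )
            
            if updates:
                await self.store.apply_cloud_updates(updates)
            self.last_sync = pull_started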
                
        except Exception as e:
            logger.error(f"Failed to pull updates: {e}")
    
    async def resolve_conflicts(self, conflicts: List[dict]):
        """Resolve conflicts between local and warehouse."""
        for conflict in conflicts:
            # Application-specific resolution
            # For inventory, we might merge quantities or use last-write-wins
            resolved = await self.resolve_inventory_conflict(conflict)
            
            # Apply resolved value
            conn = sqlite3.connect(self.store.db_path)
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE inventory 
                SET quantity = ?, last_updated = ?, synced = 1
                WHERE item_id = ?
            """, (resolved["quantity"], resolved["timestamp"], conflict["item_id"]))
            conn.commit()
            conn.close()
    
    async def resolve_inventory_conflict(self, conflict: dict) -> dict:
        """Resolve single inventory conflict."""
        # Simplified: last-write-wins based on timestamp
        if conflict["warehouse_timestamp"] > conflict["local_timestamp"]:
            return {
                "quantity": conflict["warehouse_quantity"],
                "timestamp": conflict["warehouse_timestamp"]
            }
        else:
            return {
                "quantity": conflict["local_quantity"],
                "timestamp": conflict["local_timestamp"]
            }
    
    async def mark_synced(self, changes: List[dict]):
        """Mark changes as synced."""
        conn = sqlite3.connect(self.store.db_path)
        cursor = conn.cursor()
        
        for change in changes:
            cursor.execute("""
                UPDATE sync_log 
                SET synced = 1 
                WHERE item_id = ? AND operation = ? AND timestamp = ?
            """, (change["item_id"], change["operation"], change["timestamp"]))
        
        conn.commit()
        conn.close()
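`WarehouseSyncManager` needs only two awaitable methods from `warehouse_api`: `push_changes(changes)`, returning a dict that may contain `"conflicts"`, and `pull_updates(since=...)`, returning a list of `item_id`/`quantity`/`timestamp` dicts. For local testing, a hypothetical in-memory stand-in with that shape might look like this. It applies each change as a signed delta and never reports conflicts:

```python
import asyncio
import time
from typing import Dict, List


class InMemoryWarehouseAPI:
    """Hypothetical test double for the warehouse_api used by WarehouseSyncManager."""

    def __init__(self, stock: Dict[str, int]):
        now = time.time()
        self.stock = dict(stock)
        self.updated_at = {item_id: now for item_id in stock}

    async def push_changes(self, changes: List[dict]) -> dict:
        for change in changes:
            # sync_log quantities are signed: sells negative, receives positive
            item_id = change["item_id"]
            self.stock[item_id] = self.stock.get(item_id, 0) + change["quantity"]
            self.updated_at[item_id] = time.time()
        return {"conflicts": []}

    async def pull_updates(self, since: float) -> List[dict]:
        # Return the current quantity of every item touched after `since`
        return [
            {"item_id": item_id, "quantity": self.stock[item_id], "timestamp": ts}
            for item_id, ts in self.updated_at.items()
            if ts > since
        ]


async def demo():
    api = InMemoryWarehouseAPI({"sku-1": 10})
    await api.push_changes([{"item_id": "sku-1", "operation": "sell",
                             "quantity": -3, "timestamp": time.time()}])
    return await api.pull_updates(since=0)


print(asyncio.run(demo()))
```

To exercise the sync loop against it, you would construct `WarehouseSyncManager(store, InMemoryWarehouseAPI({...}))` and assign it to `store.sync_manager` so sales trigger a sync.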

Auto-reconciliation after reconnection

When the store reconnects after being offline, it needs to reconcile. This is a method on WarehouseSyncManager, shown on its own:

async def reconcile_after_reconnect(self):
    """Reconcile state after network reconnection."""
    logger.info("Reconciling after reconnection...")
    
    # Get all local changes made while offline
    local_changes = self.store.get_pending_changes()
    
    if not local_changes:
        return
    
    # Send to warehouse
    try:
        response = await self.warehouse_api.push_changes(local_changes)
        
        # Warehouse will detect conflicts
        if response.get("conflicts"):
            logger.warning(f"Found {len(response['conflicts'])} conflicts")
            
            # Resolve each conflict and write the result to the local store
            await self.resolve_conflicts(response["conflicts"])
        
        # Mark all as synced
        await self.mark_synced(local_changes)
        
        # Pull latest from warehouse
        updates = await self.warehouse_api.pull_updates(since=0)
        if updates:
            await self.store.apply_cloud_updates(updates)
        
        logger.info("Reconciliation complete")
        
    except Exception as e:
        logger.error(f"Reconciliation failed: {e}")
        # Will retry on next sync cycle

This system works offline. Sales continue even without internet. When connection returns, it reconciles automatically.

Best Practices

Here are lessons learned from building these systems.

Versioned data replication

Version all data. Every update gets a version number or timestamp. This lets you:

  • Detect conflicts (same version updated differently)
  • Order updates (newer versions win)
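  • Audit changes (see history)

from dataclasses import dataclass
from typing import Any


@dataclass
class VersionedValue:
    """Value with version for conflict detection."""
    value: Any
    version: int
    timestamp: float
    node_id: str
    
    def merge(self, other: 'VersionedValue') -> 'VersionedValue':
        """Merge two versions so replicas converge regardless of merge order."""
        if other.version > self.version:
            return other
        elif other.version == self.version:
            # Same version from different nodes: concurrent writes. Break the
            # tie on (timestamp, node_id) so every replica picks the same winner.
            if (other.timestamp, other.node_id) > (self.timestamp, self.node_id):
                return other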
        return self
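A single version number can say two writes collided, but not whether one of them had already seen the other. Vector clocks are the usual next step when you need causal ordering. A minimal sketch, assuming each node tracks a dict from node ID to a counter (the helper names are illustrative, not from any library):

```python
from typing import Dict

VectorClock = Dict[str, int]


def increment(clock: VectorClock, node_id: str) -> VectorClock:
    """Return a copy of clock with node_id's counter bumped (a local write)."""
    updated = dict(clock)
    updated[node_id] = updated.get(node_id, 0) + 1
    return updated


def merge(a: VectorClock, b: VectorClock) -> VectorClock:
    """Element-wise max: the clock after seeing both histories."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in a.keys() | b.keys()}


def compare(a: VectorClock, b: VectorClock) -> str:
    """Return 'before', 'after', 'equal', or 'concurrent'."""
    nodes = a.keys() | b.keys()
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"
    if b_le_a:
        return "after"
    return "concurrent"  # neither saw the other: a real conflict


base = increment({}, "cloud")      # {'cloud': 1}
edge = increment(base, "edge-1")   # edge wrote after seeing base
cloud = increment(base, "cloud")   # cloud wrote independently
print(compare(base, edge))                 # → before
print(compare(edge, cloud))                # → concurrent
print(compare(merge(edge, cloud), cloud))  # → after
```

Only the `concurrent` case needs a resolver; `before` and `after` can be applied in causal order without one.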

Hybrid event sourcing

Event sourcing stores all changes as events. Instead of updating state, you append events. To get current state, you replay events.

For edge systems, this works well:

  • Edge stores events locally
  • Events sync to cloud
  • Cloud replays events to build state
  • Conflicts show up as diverging event streams

import time
from uuid import uuid4


class EventStore:
    """Simple event store for edge systems."""
    
    def __init__(self):
        self.events = []
    
    def append(self, event: dict):
        """Append an event."""
        event["event_id"] = str(uuid4())
        event["timestamp"] = time.time()
        self.events.append(event)
    
    def get_state(self) -> dict:
        """Replay events to get current state."""
        state = {}
        for event in self.events:
            self.apply_event(state, event)
        return state
    
    def apply_event(self, state: dict, event: dict):
        """Apply a single event to state."""
        # Application-specific logic
        if event["type"] == "inventory_update":
            state[event["item_id"]] = event["quantity"]
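`EventStore` above records absolute quantities, so two stores' concurrent updates would still overwrite each other on replay. Recording deltas instead lets the cloud merge streams from several edges: deduplicate by `event_id`, order deterministically, then replay. A sketch, assuming a hypothetical `inventory_delta` event type that carries a signed `delta`:

```python
from typing import Dict, List


def merge_streams(*streams: List[dict]) -> List[dict]:
    """Union of events from several nodes, deduplicated and deterministically ordered."""
    by_id = {}
    for stream in streams:
        for event in stream:
            by_id[event["event_id"]] = event  # an event synced twice is kept once
    # Sort by timestamp, then event_id, so every replica replays the same order
    return sorted(by_id.values(), key=lambda e: (e["timestamp"], e["event_id"]))


def replay(events: List[dict]) -> Dict[str, int]:
    state: Dict[str, int] = {}
    for event in events:
        if event["type"] == "inventory_delta":
            state[event["item_id"]] = state.get(event["item_id"], 0) + event["delta"]
    return state


warehouse = [{"event_id": "w1", "timestamp": 1.0, "type": "inventory_delta",
              "item_id": "sku-1", "delta": 10}]
store_a = warehouse + [{"event_id": "a1", "timestamp": 2.0, "type": "inventory_delta",
                        "item_id": "sku-1", "delta": -2}]
store_b = warehouse + [{"event_id": "b1", "timestamp": 2.5, "type": "inventory_delta",
                        "item_id": "sku-1", "delta": -3}]

print(replay(merge_streams(store_a, store_b)))  # → {'sku-1': 5}
```

Because deltas commute, the sum comes out the same in any order; the deterministic sort matters for event types that overwrite state, like `inventory_update`.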

Designing for sync failure

Assume sync will fail. Network will drop. Cloud will be down. Design for it.

  • Local-first: Always work locally, sync is secondary
  • Retry logic: Exponential backoff, don’t spam cloud
  • Conflict handling: Expect conflicts, have resolution strategies
  • Partial sync: If full sync fails, sync what you can
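  • Monitoring: Track sync status, alert on failures

import asyncio
import logging

logger = logging.getLogger(__name__)


class ResilientSyncManager:
    """Sync manager with failure handling. Subclasses must implement sync()."""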
    
    async def sync_with_retry(self, max_retries=3):
        """Sync with exponential backoff retry."""
        for attempt in range(max_retries):
            try:
                await self.sync()
                return  # Success
            except Exception as e:
                if attempt == max_retries - 1:
                    raise  # Final attempt failed
                
                # Exponential backoff
                wait_time = 2 ** attempt
                logger.warning(f"Sync failed, retrying in {wait_time}s: {e}")
                await asyncio.sleep(wait_time)

Sync failures are normal. Your system should handle them gracefully.
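The partial-sync bullet can be sketched as pushing in small batches and stopping at the first failure, so whatever made it through stays synced and only the remainder is retried. `push_batch` and the default batch size of 50 are assumptions for illustration:

```python
import asyncio
from typing import Awaitable, Callable, List


async def partial_sync(pending: List[dict],
                       push_batch: Callable[[List[dict]], Awaitable[None]],
                       batch_size: int = 50) -> List[dict]:
    """Push pending changes in batches; return the ones still unsynced."""
    for start in range(0, len(pending), batch_size):
        batch = pending[start:start + batch_size]
        try:
            await push_batch(batch)
        except Exception:
            # Keep this batch and everything after it for the next attempt
            return pending[start:]
    return []


async def demo():
    sent = []

    async def flaky_push(batch):
        if len(sent) >= 2:  # hypothetical: the link drops after two batches
            raise ConnectionError("link lost")
        sent.append(batch)

    changes = [{"seq": i} for i in range(5)]
    remaining = await partial_sync(changes, flaky_push, batch_size=2)
    return len(sent), [c["seq"] for c in remaining]


print(asyncio.run(demo()))  # → (2, [4])
```

Keeping order matters here: stopping at the first failed batch means the warehouse never sees a later change before an earlier one.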

Conclusion

Hybrid cloud-edge architectures are becoming the default. Applications need low latency and offline support. That means data lives in multiple places. Consistency becomes a design challenge, not a given.

The future is about balancing latency, cost, and correctness. You can’t have all three perfectly. But you can choose the right tradeoffs for your use case.

For real-time systems like drones, choose availability over consistency. Let the edge make decisions locally, sync when possible.

For financial systems, choose consistency over availability. Don’t process transactions offline if conflicts could cause problems.

For most applications, eventual consistency works. Accept temporary disagreements, resolve conflicts when syncing.

The patterns we covered (CRDTs, CQRS, event sourcing, conflict resolution) are tools. Use them where they fit. Don’t force a pattern because it’s popular. Choose based on your requirements.

The key is designing for partitions from the start. Assume the network will fail. Assume edge and cloud will disagree. Build reconciliation into your architecture, not as an afterthought.

Start simple. Use last-write-wins if it works for your case. Add vector clocks if you need causal ordering. Use CRDTs if your data structures support it. Build application-specific resolvers when you need fine control.

The systems that succeed are the ones that handle real-world constraints. Network partitions happen. Data drifts. Connections drop. Your architecture needs to work despite these challenges, not pretend they don’t exist.
