Skip to content

Eventing Model

To implement a write-through model where events are triggered on each change to your entities (such as when a board, lane, or card is created, updated, or deleted), you can use an event-driven architecture. Here’s a step-by-step approach:


1. Define the Event Model

Create an Event table to log changes. This can be a generic model that stores details about what entity was changed, the type of change, and the timestamp.

import uuid
from datetime import datetime

from sqlalchemy import Boolean, Column, String, DateTime, JSON, ForeignKey
from sqlalchemy.dialects.postgresql import UUID

from app.database import Base

class Event(Base):
    """Audit-log row describing a single change to a domain entity.

    Each row records which entity changed, what kind of change it was, an
    optional JSON payload, when the row was created, and whether a consumer
    has already handled it.
    """

    __tablename__ = "events"

    # Surrogate primary key, generated client-side via uuid4.
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    entity_type = Column(String, nullable=False)  # e.g., "Board", "Lane", "Card"
    entity_id = Column(UUID(as_uuid=True), nullable=False)  # The ID of the changed entity
    event_type = Column(String, nullable=False)  # e.g., "CREATE", "UPDATE", "DELETE"
    data = Column(JSON, nullable=True)  # Optional: the new or old state of the entity
    # Naive UTC timestamp assigned when the row is inserted.
    created_at = Column(DateTime, default=datetime.utcnow)
    # Consumption flag: new events start unprocessed so a consumer can query
    # for pending rows and flip the flag once it has handled them.
    processed = Column(Boolean, nullable=False, default=False)

2. Emit Events on Changes

You can use SQLAlchemy's ORM hooks or manually trigger event creation within your CRUD operations.

Example: Add Events in CRUD Operations

Update your CRUD methods to log changes:

from sqlalchemy.orm import Session
from app.models import Event, Lane, Board, Card

# Emit an event
def emit_event(db: Session, entity_type: str, entity_id: str, event_type: str, data: dict = None):
    """Persist one Event row and commit the session.

    Args:
        db: Session the event is added to and committed on.
        entity_type: Name of the changed entity kind (e.g. "Lane").
        entity_id: Identifier of the changed entity.
        event_type: Kind of change (e.g. "CREATE", "UPDATE").
        data: Optional payload stored alongside the event.

    Note:
        The commit also persists anything else pending on ``db``.
    """
    db.add(
        Event(
            entity_type=entity_type,
            entity_id=entity_id,
            event_type=event_type,
            data=data,
        )
    )
    db.commit()

# Example: Create Lane with Event Emission
def create_lane(db: Session, board_id: str, lane_type: str, lane_data: dict):
    """Create a lane and record its CREATE event in a single transaction.

    Args:
        db: Session used for both the lane and the event.
        board_id: Identifier of the board the lane belongs to.
        lane_type: Value stored in the lane's ``type`` attribute.
        lane_data: Value stored in the lane's ``data`` attribute.

    Returns:
        The newly created lane, refreshed from the database.

    Raises:
        Exception: Any error from the flush or the event write is re-raised
            after the session has been rolled back.
    """
    lane = Lane(board_id=board_id, type=lane_type, data=lane_data)
    db.add(lane)

    try:
        # Flush instead of commit: the INSERT runs (so lane.id is available)
        # but the transaction stays open. The commit inside emit_event then
        # persists the lane and its event together, so a failed event write
        # can no longer leave a lane without an audit row.
        db.flush()

        # Emit CREATE event for the lane
        emit_event(
            db=db,
            entity_type="Lane",
            entity_id=str(lane.id),
            event_type="CREATE",
            data={"type": lane_type, "data": lane_data, "board_id": board_id}
        )
    except Exception:
        # Discard the half-written lane/event pair before propagating.
        db.rollback()
        raise

    db.refresh(lane)
    return lane

# Example: Update Lane
def update_lane(db: Session, lane_id: str, lane_type: str, lane_data: dict):
    """Update a lane and record its UPDATE event in a single transaction.

    Args:
        db: Session used for both the lane and the event.
        lane_id: Identifier of the lane to update.
        lane_type: New value for the lane's ``type`` attribute.
        lane_data: New value for the lane's ``data`` attribute.

    Returns:
        The updated lane, refreshed from the database.

    Raises:
        ValueError: If no lane with ``lane_id`` exists.
        Exception: Any error from the event write is re-raised after the
            session has been rolled back.
    """
    lane = db.query(Lane).filter(Lane.id == lane_id).first()
    if lane is None:
        raise ValueError(f"Lane with ID {lane_id} not found")

    # Capture the previous state before it is overwritten.
    old_data = {"type": lane.type, "data": lane.data}
    lane.type = lane_type
    lane.data = lane_data

    try:
        # No separate commit for the lane: the commit inside emit_event
        # flushes the pending lane changes together with the event row, so
        # the update and its audit record succeed or fail as one unit.
        # Emit UPDATE event for the lane
        emit_event(
            db=db,
            entity_type="Lane",
            entity_id=str(lane.id),
            event_type="UPDATE",
            data={"old_data": old_data, "new_data": {"type": lane_type, "data": lane_data}}
        )
    except Exception:
        db.rollback()
        raise

    db.refresh(lane)
    return lane

Example: Create Generic Utility

If you have many entities (e.g., Board, Lane, Card), you can generalise the event logging using a decorator.

from functools import wraps

def log_event(entity_type):
    """Build a decorator that emits an event after a CRUD function returns.

    The decorated function must accept a parameter named ``db`` (passed
    positionally or by keyword) and return an object with an ``id``
    attribute. The event type is derived from the function name:
    ``create*`` -> "CREATE", ``delete*`` -> "DELETE", anything else ->
    "UPDATE".

    Args:
        entity_type: Entity name recorded on every emitted event.

    Returns:
        A decorator that wraps a CRUD function with event emission.

    Raises:
        TypeError: From the wrapper, when the call supplies no ``db``.
    """
    # Local import keeps this snippet self-contained.
    import inspect

    def _json_safe(value):
        """Return ``value`` in a JSON-serializable form (str() as fallback)."""
        if value is None or isinstance(value, (str, int, float, bool)):
            return value
        if isinstance(value, dict):
            return {str(key): _json_safe(item) for key, item in value.items()}
        if isinstance(value, (list, tuple)):
            return [_json_safe(item) for item in value]
        # UUIDs, datetimes and similar are stored by their string form.
        return str(value)

    def decorator(func):
        signature = inspect.signature(func)

        if func.__name__.startswith("create"):
            event_type = "CREATE"
        elif func.__name__.startswith("delete"):
            event_type = "DELETE"
        else:
            event_type = "UPDATE"

        @wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)

            # Resolve ``db`` by parameter name so positional calls such as
            # ``create_lane(db, ...)`` work as well as ``db=...``.
            db = signature.bind(*args, **kwargs).arguments.get("db")
            if db is None:
                raise TypeError(
                    f"{func.__name__}() was called without a 'db' session; "
                    "log_event cannot emit an event"
                )

            # Snapshot only the public attributes: the raw instance dict
            # also carries private ORM bookkeeping entries that cannot be
            # stored in a JSON column.
            snapshot = {
                key: _json_safe(value)
                for key, value in vars(result).items()
                if not key.startswith("_")
            }

            emit_event(
                db=db,
                entity_type=entity_type,
                entity_id=str(result.id),
                event_type=event_type,
                data=snapshot  # Log the full entity state
            )

            return result
        return wrapper
    return decorator

@log_event("Lane")
def create_lane(db: Session, board_id: str, lane_type: str, lane_data: dict):
    """Insert a new lane, commit it, and return the refreshed instance.

    Event emission is left to the ``log_event`` decorator applied above.
    """
    new_lane = Lane(
        board_id=board_id,
        type=lane_type,
        data=lane_data,
    )
    db.add(new_lane)
    db.commit()
    # Reload the row so the returned object reflects the stored state.
    db.refresh(new_lane)
    return new_lane

3. Use SQLAlchemy Events (Optional)

You can also use SQLAlchemy ORM events to log changes automatically for all models.

Attach Events to SQLAlchemy Mapper

Attach event listeners to the after_insert and after_update mapper events, as shown below; an after_delete listener can be added in the same way to log DELETE events.

from sqlalchemy.event import listens_for
from app.models import Lane, Board, Card

@listens_for(Lane, "after_insert")
def log_lane_creation(mapper, connection, target):
    """Insert a CREATE event row after a Lane has been inserted.

    Args:
        mapper: Mapper for the Lane class (unused).
        connection: Connection the Lane INSERT ran on; the event row is
            written through it so both share the same transaction.
        target: The Lane instance that was just inserted.
    """
    board_id = target.board_id

    # Log a CREATE event for Lane
    # Column values are passed explicitly: the instance dict of an ORM object
    # also holds ORM-internal state that is not a column of the events table.
    connection.execute(
        Event.__table__.insert().values(
            entity_type="Lane",
            entity_id=str(target.id),
            event_type="CREATE",
            data={
                "type": target.type,
                "data": target.data,
                # Stringified so a UUID-typed key can be stored as JSON.
                "board_id": str(board_id) if board_id is not None else None,
            },
            created_at=datetime.utcnow(),
        )
    )

@listens_for(Lane, "after_update")
def log_lane_update(mapper, connection, target):
    """Insert an UPDATE event row after a Lane has been updated.

    Args:
        mapper: Mapper for the Lane class (unused).
        connection: Connection the Lane UPDATE ran on; the event row is
            written through it so both share the same transaction.
        target: The Lane instance that was just updated.
    """
    board_id = target.board_id

    # Log an UPDATE event for Lane
    # Column values are passed explicitly: the instance dict of an ORM object
    # also holds ORM-internal state that is not a column of the events table.
    connection.execute(
        Event.__table__.insert().values(
            entity_type="Lane",
            entity_id=str(target.id),
            event_type="UPDATE",
            data={
                "type": target.type,
                "data": target.data,
                # Stringified so a UUID-typed key can be stored as JSON.
                "board_id": str(board_id) if board_id is not None else None,
            },
            created_at=datetime.utcnow(),
        )
    )

4. Consume Events

You can use the Event table for auditing, or export events to a message broker (e.g., RabbitMQ, Kafka) for real-time processing.

Example: Event Processor

def process_events(db: Session):
    """Handle every unprocessed event, oldest first, and mark it processed.

    Relies on the Event model exposing a boolean ``processed`` attribute.
    All flag updates are committed together at the end.

    Args:
        db: Session used to load the pending events and commit the flags.
    """
    pending = (
        db.query(Event)
        # Comparison with False is deliberate: it builds a SQL expression.
        .filter(Event.processed == False)  # noqa: E712
        # Replay in creation order so consumers see changes chronologically.
        .order_by(Event.created_at)
        .all()
    )
    for event in pending:
        print(f"Processing {event.event_type} event for {event.entity_type} with ID {event.entity_id}")
        # Mark event as processed
        event.processed = True
    db.commit()

5. Test the Implementation

Write tests to verify the event log entries. For example:

def test_create_lane_emits_event(db):
    """Creating a lane must leave a matching CREATE row in the event log."""
    created = create_lane(
        db,
        board_id="test-board-id",
        lane_type="review",
        lane_data={"reviewers": ["Alice"]},
    )

    # Look the event up by the new lane's ID and the expected event type.
    lookup = {"entity_id": str(created.id), "event_type": "CREATE"}
    logged = db.query(Event).filter_by(**lookup).first()

    assert logged is not None
    assert logged.entity_type == "Lane"
    assert logged.data["type"] == "review"

Summary

Using the write-through model, you can:

1. Log Events: Emit events in CRUD operations or via SQLAlchemy hooks.
2. Event Table: Use a dedicated table to store events for auditing or processing.
3. Process Events: Integrate with external systems for real-time updates.

This approach ensures that every change in your system is logged and traceable.