Eventing Model
To implement a write-through model where events are triggered on each change to your entities (such as when a board, lane, or card is created, updated, or deleted), you can use an event-driven architecture. Here’s a step-by-step approach:
1. Define the Event Model
Create an Event table to log changes. This can be a generic model that stores details about what entity was changed, the type of change, and the timestamp.
import uuid
from datetime import datetime
from typing import Optional

from sqlalchemy import JSON, Boolean, Column, DateTime, ForeignKey, String
from sqlalchemy.dialects.postgresql import UUID

from app.database import Base
class Event(Base):
    """Audit-log row describing a single change to a domain entity.

    Each row records which entity changed, what kind of change it was, an
    optional JSON payload, when the row was created, and whether the event
    has already been consumed.
    """

    __tablename__ = "events"

    # Primary key generated client-side via uuid.uuid4.
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    entity_type = Column(String, nullable=False)  # e.g., "Board", "Lane", "Card"
    entity_id = Column(UUID(as_uuid=True), nullable=False)  # The ID of the changed entity
    event_type = Column(String, nullable=False)  # e.g., "CREATE", "UPDATE", "DELETE"
    data = Column(JSON, nullable=True)  # Optional: the new or old state of the entity
    created_at = Column(DateTime, default=datetime.utcnow)
    # Consumption flag that process_events() filters on and sets to True;
    # new events start out unprocessed.
    processed = Column(Boolean, nullable=False, default=False)
2. Emit Events on Changes
You can use SQLAlchemy's ORM hooks or manually trigger event creation within your CRUD operations.
Example: Add Events in CRUD Operations
Update your CRUD methods to log changes:
from sqlalchemy.orm import Session
from app.models import Event, Lane, Board, Card
# Emit an event
def emit_event(
    db: Session,
    entity_type: str,
    entity_id: str,
    event_type: str,
    data: Optional[dict] = None,
    *,
    commit: bool = True,
):
    """Record a change event for an entity in the ``events`` table.

    Args:
        db: SQLAlchemy session the event is added to.
        entity_type: Kind of entity that changed, e.g. "Lane".
        entity_id: Identifier of the changed entity.
        event_type: Kind of change, e.g. "CREATE" or "UPDATE".
        data: Optional payload stored in the event's JSON column.
        commit: When true (the default, matching the previous behaviour)
            the session is committed immediately. Pass ``False`` to leave
            the event pending so the caller can commit it together with
            other work in a single transaction.
    """
    event = Event(
        entity_type=entity_type,
        entity_id=entity_id,
        event_type=event_type,
        data=data,
    )
    db.add(event)
    if commit:
        # Committing persists everything pending on the session, not only
        # the event that was just added.
        db.commit()
# Example: Create Lane with Event Emission
def create_lane(db: Session, board_id: str, lane_type: str, lane_data: dict):
    """Create a lane and log its CREATE event in the same transaction.

    Args:
        db: SQLAlchemy session used for both the lane and the event.
        board_id: Identifier of the board the lane belongs to.
        lane_type: Value stored in the lane's ``type`` attribute.
        lane_data: Value stored in the lane's ``data`` attribute.

    Returns:
        The newly created, refreshed ``Lane`` instance.
    """
    lane = Lane(board_id=board_id, type=lane_type, data=lane_data)
    db.add(lane)
    # Flush instead of commit: this sends the INSERT so lane.id is available,
    # but keeps the transaction open. emit_event() performs the commit, so the
    # lane and its event are persisted together and a failure while emitting
    # cannot leave a committed lane without a matching event.
    db.flush()

    # Emit CREATE event for the lane
    emit_event(
        db=db,
        entity_type="Lane",
        entity_id=str(lane.id),
        event_type="CREATE",
        data={"type": lane_type, "data": lane_data, "board_id": board_id},
    )

    db.refresh(lane)
    return lane
# Example: Update Lane
def update_lane(db: Session, lane_id: str, lane_type: str, lane_data: dict):
    """Update a lane and log its UPDATE event in the same transaction.

    Args:
        db: SQLAlchemy session used for both the lane and the event.
        lane_id: Identifier of the lane to update.
        lane_type: New value for the lane's ``type`` attribute.
        lane_data: New value for the lane's ``data`` attribute.

    Returns:
        The updated, refreshed ``Lane`` instance.

    Raises:
        ValueError: If no lane with ``lane_id`` exists.
    """
    lane = db.query(Lane).filter(Lane.id == lane_id).first()
    if lane is None:
        raise ValueError(f"Lane with ID {lane_id} not found")

    # Capture the previous state before it is overwritten.
    old_data = {"type": lane.type, "data": lane.data}
    lane.type = lane_type
    lane.data = lane_data

    # No commit here: emit_event() commits the session, which writes the lane
    # change and the event together, so an update cannot be persisted without
    # its event.
    # Emit UPDATE event for the lane
    emit_event(
        db=db,
        entity_type="Lane",
        entity_id=str(lane.id),
        event_type="UPDATE",
        data={"old_data": old_data, "new_data": {"type": lane_type, "data": lane_data}},
    )

    db.refresh(lane)
    return lane
Example: Create Generic Utility
If you have many entities (e.g., Board, Lane, Card), you can generalise the event logging using a decorator.
from functools import wraps
def _json_safe(value):
    """Convert *value* into something a JSON column can serialise."""
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, dict):
        return {str(key): _json_safe(item) for key, item in value.items()}
    if isinstance(value, (list, tuple, set)):
        return [_json_safe(item) for item in value]
    # Objects offering isoformat() (dates, datetimes) use it; anything else,
    # such as a UUID, falls back to its string form.
    to_iso = getattr(value, "isoformat", None)
    if callable(to_iso):
        return to_iso()
    return str(value)


def _entity_snapshot(entity):
    """Return the entity's public attributes as a JSON-serialisable dict."""
    return {
        name: _json_safe(value)
        for name, value in vars(entity).items()
        # Underscore-prefixed entries (e.g. SQLAlchemy's _sa_instance_state)
        # are bookkeeping, not entity state, and are not JSON-serialisable.
        if not name.startswith("_")
    }


def log_event(entity_type):
    """Build a decorator that emits an event after a CRUD function returns.

    The wrapped function must receive the session either as the ``db``
    keyword argument or as its first positional argument, and must return
    an object with an ``id`` attribute.

    Args:
        entity_type: Label stored as the event's ``entity_type``, e.g. "Lane".

    Returns:
        A decorator. The decorated function returns whatever the original
        returns, after an event describing the result has been emitted.

    Raises:
        TypeError: From the wrapper, before the wrapped function runs, if
            no session argument can be found.
    """

    def decorator(func):
        # The event type is derived from the function name once, at
        # decoration time: create* -> CREATE, delete* -> DELETE, else UPDATE.
        func_name = func.__name__
        if func_name.startswith("create"):
            event_type = "CREATE"
        elif func_name.startswith("delete"):
            event_type = "DELETE"
        else:
            event_type = "UPDATE"

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Accept the session by keyword or as the first positional
            # argument; looking only at kwargs loses it for f(db, ...) calls.
            db = kwargs.get("db")
            if db is None and args:
                db = args[0]
            if db is None:
                # Fail before any change is made rather than after it.
                raise TypeError(
                    f"{func_name}() must be called with a 'db' session to log events"
                )

            result = func(*args, **kwargs)

            emit_event(
                db=db,
                entity_type=entity_type,
                entity_id=str(result.id),
                event_type=event_type,
                data=_entity_snapshot(result),  # Log the full entity state
            )
            return result

        return wrapper

    return decorator
@log_event("Lane")
def create_lane(db: Session, board_id: str, lane_type: str, lane_data: dict):
    """Persist a new lane and return it.

    Event emission is not done here; it is left to the ``log_event``
    decorator applied to this function.
    """
    new_lane = Lane(board_id=board_id, type=lane_type, data=lane_data)

    db.add(new_lane)
    db.commit()
    # Reload the instance from the database after the commit.
    db.refresh(new_lane)

    return new_lane
3. Use SQLAlchemy Events (Optional)
You can also use SQLAlchemy ORM events to log changes automatically for all models.
Attach Events to SQLAlchemy Mapper
Attach event listeners to the mapper-level after_insert and after_update events, as shown below; deletions can be logged the same way with an after_delete listener.
from sqlalchemy.event import listens_for
from app.models import Lane, Board, Card
@listens_for(Lane, "after_insert")
def log_lane_creation(mapper, connection, target):
    """Insert a CREATE event row after a Lane has been inserted.

    Args:
        mapper: Mapper of the inserted object (unused).
        connection: Connection the lane INSERT was emitted on; the event
            row is written through it rather than through the Session.
        target: The ``Lane`` instance that was inserted.
    """
    # Log a CREATE event for Lane
    # The column values are passed explicitly. Building an Event instance and
    # passing its __dict__ would also pass SQLAlchemy's _sa_instance_state
    # entry, which is not a column of the events table.
    # NOTE(review): assumes target.board_id is JSON-serialisable (e.g. a
    # string) - confirm against the Lane model.
    connection.execute(
        Event.__table__.insert().values(
            entity_type="Lane",
            entity_id=str(target.id),
            event_type="CREATE",
            data={"type": target.type, "data": target.data, "board_id": target.board_id},
            created_at=datetime.utcnow(),
        )
    )
@listens_for(Lane, "after_update")
def log_lane_update(mapper, connection, target):
    """Insert an UPDATE event row after a Lane has been updated.

    Args:
        mapper: Mapper of the updated object (unused).
        connection: Connection the lane UPDATE was emitted on; the event
            row is written through it rather than through the Session.
        target: The ``Lane`` instance that was updated.
    """
    # Log an UPDATE event for Lane
    # The column values are passed explicitly. Building an Event instance and
    # passing its __dict__ would also pass SQLAlchemy's _sa_instance_state
    # entry, which is not a column of the events table.
    # NOTE(review): assumes target.board_id is JSON-serialisable (e.g. a
    # string) - confirm against the Lane model.
    connection.execute(
        Event.__table__.insert().values(
            entity_type="Lane",
            entity_id=str(target.id),
            event_type="UPDATE",
            data={"type": target.type, "data": target.data, "board_id": target.board_id},
            created_at=datetime.utcnow(),
        )
    )
4. Consume Events
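You can use the Event table for auditing, or export events to a message broker (e.g., RabbitMQ, Kafka) for real-time processing. The processor below tracks which events have been handled through a boolean processed column, so the Event model must define that column.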
Example: Event Processor
def process_events(db: Session):
    """Handle every unprocessed event, oldest first, and mark it processed.

    NOTE(review): relies on a boolean ``processed`` column on ``Event``.

    Args:
        db: SQLAlchemy session used to load and update the events.
    """
    pending = (
        db.query(Event)
        # is_(False) instead of "== False", which linters flag.
        .filter(Event.processed.is_(False))
        # Without an explicit ordering the database may return rows in any
        # order; events should be handled in the order they were created.
        .order_by(Event.created_at)
        .all()
    )
    for event in pending:
        print(f"Processing {event.event_type} event for {event.entity_type} with ID {event.entity_id}")
        # Mark event as processed
        event.processed = True
    # Persist all processed flags with one commit once the batch is done.
    db.commit()
5. Test the Implementation
Write tests to verify the event log entries. For example:
def test_create_lane_emits_event(db):
    """Creating a lane must leave a matching CREATE event in the event log."""
    created = create_lane(
        db,
        board_id="test-board-id",
        lane_type="review",
        lane_data={"reviewers": ["Alice"]},
    )

    # Look up the event recorded for the lane that was just created.
    logged = (
        db.query(Event)
        .filter_by(entity_id=str(created.id), event_type="CREATE")
        .first()
    )

    assert logged is not None
    assert logged.entity_type == "Lane"
    assert logged.data["type"] == "review"
Summary
Using the write-through model, you can: 1. Log Events: Emit events in CRUD operations or via SQLAlchemy hooks. 2. Event Table: Use a dedicated table to store events for auditing or processing. 3. Process Events: Integrate with external systems for real-time updates.
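This approach makes every change that goes through these code paths logged and traceable. For a strict guarantee, write the entity change and its event in the same database transaction, so that one cannot be committed without the other.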