from typing import Optional, List
class DataProcessor:
    """
    Processes data for agent consumption.

    Attributes:
        batch_size: Size of data batches to process
        max_items: Maximum number of items to process
    """

    def __init__(self, batch_size: int = 100, max_items: Optional[int] = None):
        """
        Store the batching configuration.

        Args:
            batch_size: Size of data batches to process (defaults to 100)
            max_items: Maximum number of items to process (defaults to None)
        """
        # Both values are only stored here; nothing in this excerpt reads
        # them back.
        self.batch_size = batch_size
        self.max_items = max_items

    def process_batch(self, items: List[dict]) -> List[dict]:
        """
        Process a batch of items.

        Args:
            items: List of items to process

        Returns:
            List of processed items, one per input item, in input order
        """
        # NOTE(review): _process_item is not defined in this excerpt;
        # presumably it is provided elsewhere in the class or by a
        # subclass -- confirm.
        return [self._process_item(item) for item in items]
from agents.exceptions import AgentError
class AgentService:
    """Service-layer operations for agents."""

    async def create_agent(self, data: AgentCreate) -> AgentModel:
        """
        Create an agent from the supplied payload.

        Args:
            data: Creation payload; the result of ``data.dict()`` is passed
                to ``AgentModel.create`` as keyword arguments

        Returns:
            The object returned by ``AgentModel.create``

        Raises:
            AgentError: If creation raises any ``Exception``; the original
                exception is attached as ``__cause__``
        """
        try:
            # Attempt to create agent
            return await AgentModel.create(**data.dict())
        except Exception as e:
            # Chain explicitly so the original error is recorded as the
            # direct cause of the AgentError.
            raise AgentError(f"Failed to create agent: {e}") from e
Database Management
Migrations
Use Alembic for database migrations:
# Create a new migration script; --autogenerate asks Alembic to draft it by
# comparing the models against the current database schema (review the
# generated script before applying it)
alembic revision --autogenerate -m "Add agent table"
# Run migrations: upgrade the database to the latest revision ("head")
alembic upgrade head
Models
Define SQLAlchemy models:
from sqlalchemy import Column, String, Integer
from agents.models.base import Base
class Agent(Base):
    """SQLAlchemy model mapped to the ``agents`` table."""

    __tablename__ = "agents"

    # String-typed primary key.
    id = Column(String, primary_key=True)
    # Required: declared with nullable=False.
    name = Column(String, nullable=False)
    # No nullable flag is given for this column.
    description = Column(String)
    # Required: declared with nullable=False.
    mode = Column(String, nullable=False)
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
# GET /agent/{agent_id}: delegates the lookup to agent_service.get_agent and
# wraps the handler with cache(expire=60).
# NOTE(review): `cache` is not among the imports shown with this snippet;
# fastapi-cache exposes the decorator as `fastapi_cache.decorator.cache` --
# confirm it is imported where this handler lives.
@router.get("/agent/{agent_id}")
@cache(expire=60)
async def get_agent(agent_id: str):
    return await agent_service.get_agent(agent_id)
Security Best Practices
Input validation:
from pydantic import BaseModel, validator
# Input model with a minimal sanity check on the `email` field.
class UserInput(BaseModel):
    name: str
    email: str

    @validator('email')
    def validate_email(cls, v):
        """
        Reject values that do not contain '@'.

        Args:
            v: Candidate value for ``email``

        Returns:
            The value unchanged when it contains '@'

        Raises:
            ValueError: If '@' is absent from the value
        """
        # Presence check only; this is not full address validation.
        if '@' not in v:
            raise ValueError('Invalid email')
        return v
Rate limiting:
from fastapi import Depends
from agents.middleware.rate_limit import rate_limit
# POST /agent/create: forwards the payload to agent_service.create_agent.
# The handler is wrapped with rate_limit(max_requests=10, window_seconds=60);
# presumably that allows 10 requests per 60-second window -- confirm against
# agents.middleware.rate_limit.
@router.post("/agent/create")
@rate_limit(max_requests=10, window_seconds=60)
async def create_agent(data: AgentCreate):
    return await agent_service.create_agent(data)