This lab provides an in-depth exploration of MCP server architecture patterns, database design principles, and the technical implementation strategies that power robust, scalable database-integrated AI applications.
Building a production-ready MCP server with database integration requires careful architectural decisions.
This lab breaks down the key components, design patterns, and technical considerations that make our Zava Retail analytics solution robust, secure, and scalable.
You'll understand how each layer interacts, why specific technologies were chosen, and how to apply these patterns to your own MCP implementations.
By the end of this lab, you will be able to:
Our MCP server implements a layered architecture that separates concerns and promotes maintainability:
Responsibility: Handle MCP protocol communication and message routing
# FastMCP server setup
from fastmcp import FastMCP

# One server object; tools attach to it through the decorator below.
mcp = FastMCP("Zava Retail Analytics")


# Tool registration with type safety
@mcp.tool()
async def execute_sales_query(
    ctx: Context,
    postgresql_query: Annotated[
        str,
        Field(description="Well-formed PostgreSQL query"),
    ],
) -> str:
    """Execute PostgreSQL queries with Row Level Security."""
    # The docstring above is kept verbatim. NOTE(review): FastMCP presumably
    # exposes it as the tool description shown to clients - confirm before
    # rewording it.
    # The query text and the request context are forwarded unchanged to
    # ``query_executor``, which is defined outside this snippet.
    outcome = await query_executor.execute(postgresql_query, ctx)
    return outcome
Key Features:
Responsibility: Implement business rules and coordinate between protocol and data layers
class SalesAnalyticsService:
    """Business logic for retail analytics operations.

    Relies on ``self.db_provider`` and on the ``_validate_store_access``,
    ``_calculate_metrics`` and ``_generate_insights`` helpers, none of which
    are defined in this snippet.
    """

    async def get_store_performance(
        self,
        store_id: str,
        time_period: str
    ) -> Dict[str, Any]:
        """Calculate store performance metrics.

        Args:
            store_id: Identifier of the store to report on.
            time_period: Period selector, passed through to the data layer.

        Returns:
            A dict with the keys ``store_id``, ``period``, ``metrics`` and
            ``insights``.

        Raises:
            AuthorizationError: If ``_validate_store_access`` rejects the
                store.
        """
        # Validate business rules.
        # AuthorizationError is the access-control type of the MCP error
        # hierarchy and the one the centralized error handler recognizes.
        if not self._validate_store_access(store_id):
            raise AuthorizationError("Access denied for store")

        # Coordinate data retrieval
        sales_data = await self.db_provider.get_sales_data(store_id, time_period)
        metrics = self._calculate_metrics(sales_data)

        return {
            "store_id": store_id,
            "period": time_period,
            "metrics": metrics,
            "insights": self._generate_insights(metrics),
        }
Key Features:
Responsibility: Manage database connections, query execution, and data mapping
class PostgreSQLProvider:
    """Data access layer for PostgreSQL operations.

    Runs caller-supplied SQL on a pooled connection after publishing the
    caller's identity in the ``app.current_rls_user_id`` setting.
    """

    def __init__(self, connection_config: Dict[str, Any]):
        """Store the configuration; no pool is opened here."""
        # Must be assigned before ``execute_query`` is called; nothing in
        # this class creates the pool.
        self.connection_pool: Optional[Pool] = None
        self.config = connection_config

    async def execute_query(
        self,
        query: str,
        rls_user_id: str
    ) -> List[Dict[str, Any]]:
        """Execute query with RLS context.

        Args:
            query: SQL text to run.
            rls_user_id: Value stored in ``app.current_rls_user_id`` for the
                duration of the query.

        Returns:
            One dict per result row.

        Raises:
            QueryTimeoutError: If the query does not finish within 30 seconds.
        """
        async with self.connection_pool.acquire() as conn:
            # The identity is set transaction-locally (third argument
            # ``true``) inside an explicit transaction. A session-level
            # setting would survive on the pooled connection and be seen by
            # whichever caller borrows it next.
            async with conn.transaction():
                # Set RLS context
                await conn.execute(
                    "SELECT set_config('app.current_rls_user_id', $1, true)",
                    rls_user_id,
                )

                # Execute query with timeout
                try:
                    rows = await asyncio.wait_for(
                        conn.fetch(query),
                        timeout=30.0,
                    )
                except asyncio.TimeoutError as exc:
                    # QueryTimeoutError takes the offending query, not a
                    # message, and builds its own text from it.
                    raise QueryTimeoutError(query) from exc

                return [dict(row) for row in rows]
Key Features:
Responsibility: Handle cross-cutting concerns like logging, monitoring, and configuration
class InfrastructureManager:
    """Infrastructure concerns management.

    Holds the logger, the metrics registry and the loaded configuration.
    ``_setup_metrics`` and ``_load_configuration`` are defined outside this
    snippet.
    """

    def __init__(self):
        """Initialize logging first, then metrics, then configuration."""
        self.logger = self._setup_logging()
        self.metrics = self._setup_metrics()
        self.config = self._load_configuration()

    def _setup_logging(self) -> Logger:
        """Send INFO-and-above records to the console and to a log file.

        Returns:
            The logger named after this module.
        """
        sinks = [
            logging.StreamHandler(),
            logging.FileHandler('mcp_server.log'),
        ]
        line_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(
            level=logging.INFO,
            format=line_format,
            handlers=sinks,
        )
        return logging.getLogger(__name__)

    async def track_query_execution(
        self,
        query_type: str,
        duration: float,
        success: bool
    ):
        """Record one query in the total counter and the duration histogram.

        Args:
            query_type: Value of the ``type`` label on both metrics.
            duration: Observed duration, recorded as given.
            success: Selects the ``status`` label: 'success' or 'error'.
        """
        outcome = 'success' if success else 'error'

        total = self.metrics.counter('query_total')
        total.labels(type=query_type, status=outcome).inc()

        latency = self.metrics.histogram('query_duration')
        latency.labels(type=query_type).observe(duration)
Our PostgreSQL schema implements several key patterns for multi-tenant MCP applications:
-- Core retail entities with store-based partitioning
-- NOTE(review): no PARTITION BY clause appears in this DDL; "partitioning"
-- presumably means logical per-store separation through store_id - confirm.

-- One row per store. store_id is the key the two tables below reference.
CREATE TABLE retail.stores (
    store_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(100) NOT NULL,
    location VARCHAR(200) NOT NULL,
    manager_id UUID NOT NULL,  -- no foreign key is declared for this column
    created_at TIMESTAMP DEFAULT NOW()
);

-- Customers, each optionally linked to a store.
-- NOTE(review): store_id is nullable here, yet the row-level-security
-- policies on this table compare store_id for equality, so rows with a NULL
-- store_id would match no policy - confirm whether NOT NULL is intended.
CREATE TABLE retail.customers (
    customer_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    store_id UUID REFERENCES retail.stores(store_id),
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    email VARCHAR(100) UNIQUE,  -- unique across all stores, not per store
    created_at TIMESTAMP DEFAULT NOW()
);

-- Orders carry their own store_id in addition to the customer reference.
-- NOTE(review): no index on store_id is declared in this snippet, although
-- the policies filter on it - verify that indexes are created elsewhere.
CREATE TABLE retail.orders (
    order_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_id UUID REFERENCES retail.customers(customer_id),
    store_id UUID REFERENCES retail.stores(store_id),
    order_date TIMESTAMP DEFAULT NOW(),
    total_amount DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) DEFAULT 'pending'
);
Design Principles:
-- Enable RLS on multi-tenant tables
ALTER TABLE retail.customers ENABLE ROW LEVEL SECURITY;
ALTER TABLE retail.orders ENABLE ROW LEVEL SECURITY;
-- NOTE(review): no policy for retail.order_items appears in this snippet.
-- With RLS enabled and no policy, non-owner roles see no rows - confirm the
-- policy is created elsewhere.
ALTER TABLE retail.order_items ENABLE ROW LEVEL SECURITY;

-- Support function for RLS context.
-- Created before the policies because CREATE POLICY resolves the functions
-- used in its USING expression when the policy is created.
-- STABLE: the result cannot change within a statement, so the planner does
-- not have to re-evaluate it for every row.
-- SET search_path: a SECURITY DEFINER function must pin its search_path so
-- objects in a caller-controlled schema cannot shadow what it references.
CREATE OR REPLACE FUNCTION get_current_user_store()
RETURNS UUID AS $$
BEGIN
    RETURN current_setting('app.current_rls_user_id')::UUID;
EXCEPTION WHEN OTHERS THEN
    -- Fail closed: an unset or malformed setting yields the nil UUID, which
    -- presumably matches no real store_id.
    RETURN '00000000-0000-0000-0000-000000000000'::UUID;
END;
$$ LANGUAGE plpgsql STABLE SECURITY DEFINER
   SET search_path = pg_catalog, pg_temp;

-- Store manager can only see their store's data
CREATE POLICY store_manager_customers ON retail.customers
    FOR ALL TO store_managers
    USING (store_id = get_current_user_store());

CREATE POLICY store_manager_orders ON retail.orders
    FOR ALL TO store_managers
    USING (store_id = get_current_user_store());

-- Regional managers see multiple stores
-- NOTE(review): get_user_store_list() is not defined in this snippet; it
-- must exist before this statement runs.
CREATE POLICY regional_manager_orders ON retail.orders
    FOR ALL TO regional_managers
    USING (store_id = ANY(get_user_store_list()));
RLS Benefits:
-- Product embeddings for semantic search
CREATE TABLE retail.product_description_embeddings (
    product_id UUID PRIMARY KEY REFERENCES retail.products(product_id),
    description_embedding vector(1536),
    last_updated TIMESTAMP DEFAULT NOW()
);

-- Optimize vector similarity search
-- Approximate index for cosine distance (the <=> operator).
CREATE INDEX idx_product_embeddings_vector
ON retail.product_description_embeddings
USING ivfflat (description_embedding vector_cosine_ops);

-- Semantic search function
-- Returns up to max_results products whose cosine similarity to
-- query_embedding is at least similarity_threshold, most similar first.
CREATE OR REPLACE FUNCTION search_products_by_description(
    query_embedding vector(1536),
    similarity_threshold FLOAT DEFAULT 0.7,
    max_results INTEGER DEFAULT 20
)
RETURNS TABLE(
    product_id UUID,
    name VARCHAR,
    description TEXT,
    similarity_score FLOAT
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        p.product_id,
        p.name,
        p.description,
        -- similarity = 1 - cosine distance
        (1 - (pde.description_embedding <=> query_embedding)) AS similarity_score
    FROM retail.products p
    JOIN retail.product_description_embeddings pde ON p.product_id = pde.product_id
    WHERE (pde.description_embedding <=> query_embedding) <= (1 - similarity_threshold)
    -- Order by the raw distance, ascending. This is the same ranking as
    -- similarity descending, but it is the form the vector index can serve;
    -- ordering by the derived score forces a full scan and sort.
    ORDER BY pde.description_embedding <=> query_embedding
    LIMIT max_results;
END;
$$ LANGUAGE plpgsql STABLE;
Efficient database connection management is critical for MCP server performance:
class ConnectionPoolManager:
    """Manages PostgreSQL connection pools."""

    async def create_pool(self) -> Pool:
        """Create optimized connection pool.

        Connection parameters are read from ``self.config``. The pool is
        returned to the caller and is not stored on this object.
        """
        return await asyncpg.create_pool(
            host=self.config.db_host,
            port=self.config.db_port,
            database=self.config.db_name,
            user=self.config.db_user,
            password=self.config.db_password,

            # Pool configuration
            min_size=2,  # Minimum connections
            max_size=10,  # Maximum connections
            max_inactive_connection_lifetime=300,  # 5 minutes

            # Query configuration
            command_timeout=30,  # Query timeout
            server_settings={
                "application_name": "zava-mcp-server",
                "jit": "off",  # Disable JIT for stability
                "work_mem": "4MB",  # Limit work memory
                "statement_timeout": "30s"
            }
        )

    async def execute_with_retry(
        self,
        query: str,
        params: Optional[Tuple] = None,
        max_retries: int = 3
    ) -> List[Dict[str, Any]]:
        """Execute query with automatic retry logic.

        Args:
            query: SQL text to run.
            params: Positional query arguments, if any.
            max_retries: Total number of attempts. Values below 1 are treated
                as 1, so the query always runs at least once instead of the
                method silently returning ``None``.

        Returns:
            One dict per result row.

        Raises:
            ConnectionError, InterfaceError: Re-raised once the last attempt
                has failed.

        NOTE(review): the whole statement is re-run after a connection
        failure; confirm callers only pass statements that are safe to
        repeat. ``self.pool`` is read here but assigned nowhere in this
        snippet.
        """
        attempts = max(1, max_retries)
        for attempt in range(attempts):
            try:
                async with self.pool.acquire() as conn:
                    rows = await conn.fetch(query, *(params or ()))
                return [dict(row) for row in rows]

            except (ConnectionError, InterfaceError):
                if attempt == attempts - 1:
                    raise

                # Log before waiting so the failure is visible immediately.
                logger.warning(f"Database connection failed, retrying ({attempt + 1}/{max_retries})")
                # Exponential backoff
                await asyncio.sleep(2 ** attempt)
class MCPServerManager:
    """Manages MCP server lifecycle and resources.

    ``self.pool_manager``, ``create_ai_client`` and ``MetricsCollector`` are
    defined outside this snippet.
    """

    async def startup(self):
        """Initialize server resources: pool, AI client, metrics collector."""
        # Create database connection pool
        self.db_pool = await self.pool_manager.create_pool()

        # Initialize AI services
        self.ai_client = await self.create_ai_client()

        # Setup monitoring
        self.metrics_collector = MetricsCollector()

        logger.info("MCP server startup complete")

    async def shutdown(self):
        """Cleanup server resources.

        Every resource is released independently: a failure while closing
        one is logged and the remaining ones are still cleaned up. Resources
        that were never created (for example after a failed startup) are
        skipped. Errors are logged, never raised.
        """
        # (attribute holding the resource, name of its async cleanup method)
        steps = (
            ("db_pool", "close"),             # Close database connections
            ("ai_client", "close"),           # Cleanup AI client
            ("metrics_collector", "flush"),   # Flush metrics
        )

        clean = True
        for attr, method in steps:
            resource = getattr(self, attr, None)
            if resource is None:
                continue
            try:
                await getattr(resource, method)()
            except Exception as e:
                clean = False
                logger.error(f"Error during shutdown: {e}")

        if clean:
            logger.info("MCP server shutdown complete")

    async def health_check(self) -> Dict[str, str]:
        """Verify server health status.

        Returns:
            A dict with the keys ``database`` and ``ai_service``; each value
            is "healthy" or "unhealthy: <error>".
        """
        status = {}

        # Check database connection
        try:
            async with self.db_pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
            status["database"] = "healthy"
        except Exception as e:
            status["database"] = f"unhealthy: {e}"

        # Check AI service
        try:
            await self.ai_client.health_check()
            status["ai_service"] = "healthy"
        except Exception as e:
            status["ai_service"] = f"unhealthy: {e}"

        return status
Robust error handling ensures reliable MCP server operation:
class MCPError(Exception):
    """Base MCP server error.

    Attributes set: ``message`` (also the exception text) and ``error_code``.
    """

    def __init__(self, message: str, error_code: str = "MCP_ERROR"):
        self.message = message
        self.error_code = error_code
        super().__init__(message)


class DatabaseError(MCPError):
    """Database operation errors; ``query`` holds the SQL involved, if given."""

    def __init__(self, message: str, query: Optional[str] = None):
        super().__init__(message, "DATABASE_ERROR")
        self.query = query


class AuthorizationError(MCPError):
    """Access control errors; ``user_id`` identifies the caller, if given."""

    def __init__(self, message: str, user_id: Optional[str] = None):
        super().__init__(message, "AUTHORIZATION_ERROR")
        self.user_id = user_id


class QueryTimeoutError(DatabaseError):
    """Query execution timeout.

    The argument is the query itself, not a message; the message is built
    from the first 100 characters of the query.
    """

    def __init__(self, query: str):
        super().__init__(f"Query timeout: {query[:100]}...", query)
        # Overrides the "DATABASE_ERROR" code set by the parent constructor.
        self.error_code = "QUERY_TIMEOUT"


class ValidationError(MCPError):
    """Input validation errors.

    Keeps the offending ``field`` and ``value`` and the violated
    ``constraint`` so handlers can report them without parsing the message.
    """

    def __init__(self, field: str, value: Any, constraint: str):
        message = f"Validation failed for {field}: {constraint}"
        super().__init__(message, "VALIDATION_ERROR")
        self.field = field
        self.value = value
        self.constraint = constraint
from contextlib import asynccontextmanager


# An ``async def`` containing ``yield`` is an async generator, so it needs
# ``asynccontextmanager`` and is used with ``async with``. The synchronous
# ``contextmanager`` decorator cannot drive an async generator.
@asynccontextmanager
async def error_handling_context(operation_name: str, user_id: str = None):
    """Centralized error handling for operations.

    Use as ``async with error_handling_context("name"):``. On success the
    operation's success counter and duration are recorded. On failure the
    error is logged and counted by category.

    Args:
        operation_name: Label used in log records and metrics.
        user_id: Optional caller identifier added to log records.

    Raises:
        ValidationError, AuthorizationError, DatabaseError: Re-raised
            unchanged after logging.
        MCPError: Raised in place of any other exception, with the original
            exception attached as its cause.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments.
    start_time = time.perf_counter()

    try:
        yield

        # Success metrics
        duration = time.perf_counter() - start_time
        metrics.operation_success.labels(operation=operation_name).inc()
        metrics.operation_duration.labels(operation=operation_name).observe(duration)

    except ValidationError as e:
        logger.warning(f"Validation error in {operation_name}: {e.message}", extra={
            "operation": operation_name,
            "user_id": user_id,
            "error_type": "validation",
            "field": e.field
        })
        metrics.operation_error.labels(operation=operation_name, type="validation").inc()
        raise

    except AuthorizationError as e:
        logger.warning(f"Authorization error in {operation_name}: {e.message}", extra={
            "operation": operation_name,
            "user_id": user_id,
            "error_type": "authorization"
        })
        metrics.operation_error.labels(operation=operation_name, type="authorization").inc()
        raise

    except DatabaseError as e:
        logger.error(f"Database error in {operation_name}: {e.message}", extra={
            "operation": operation_name,
            "user_id": user_id,
            "error_type": "database",
            "query": e.query[:100] if e.query else None
        })
        metrics.operation_error.labels(operation=operation_name, type="database").inc()
        raise

    except Exception as e:
        logger.error(f"Unexpected error in {operation_name}: {str(e)}", extra={
            "operation": operation_name,
            "user_id": user_id,
            "error_type": "unexpected"
        }, exc_info=True)
        metrics.operation_error.labels(operation=operation_name, type="unexpected").inc()
        # The caller gets a generic message; the original error stays
        # attached as the cause for debugging.
        raise MCPError(f"Internal server error in {operation_name}") from e
from contextlib import asynccontextmanager


class QueryPerformanceMonitor:
    """Monitor and optimize query performance.

    Keeps every observed duration in memory, grouped by operation type.
    NOTE(review): ``query_stats`` is never trimmed in this snippet; confirm
    something bounds it in a long-running server.
    """

    def __init__(self):
        self.slow_query_threshold = 1.0  # seconds
        self.query_stats = defaultdict(list)

    # ``monitor_query`` is an async generator, so it must be wrapped with
    # ``asynccontextmanager`` and entered with ``async with``.
    @asynccontextmanager
    async def monitor_query(self, query: str, operation_type: str = "unknown"):
        """Monitor query execution time and performance.

        Times the body of the ``async with`` block. Successful runs are
        appended to ``query_stats`` and reported to the duration metric; runs
        slower than ``slow_query_threshold`` are also logged. Failures are
        logged and re-raised and are not added to ``query_stats``.

        Args:
            query: SQL text; used for a short hash and a truncated log field.
            operation_type: Grouping key for statistics and metrics.
        """
        start_time = time.perf_counter()
        # Short, non-cryptographic fingerprint to correlate log lines.
        query_hash = hashlib.md5(query.encode()).hexdigest()[:8]

        try:
            yield

            duration = time.perf_counter() - start_time

            # Record performance metrics
            self.query_stats[operation_type].append(duration)

            # Log slow queries
            if duration > self.slow_query_threshold:
                logger.warning("Slow query detected", extra={
                    "query_hash": query_hash,
                    "duration": duration,
                    "operation_type": operation_type,
                    "query": query[:200]
                })

            # Update metrics
            metrics.query_duration.labels(type=operation_type).observe(duration)

        except Exception as e:
            duration = time.perf_counter() - start_time
            logger.error("Query failed", extra={
                "query_hash": query_hash,
                "duration": duration,
                "operation_type": operation_type,
                "error": str(e)
            })
            raise

    def get_performance_summary(self) -> Dict[str, Any]:
        """Generate performance summary report.

        Returns:
            For each operation type with at least one recorded duration, a
            dict with ``count``, ``avg_duration``, ``max_duration``,
            ``min_duration`` and ``slow_queries``.
        """
        summary = {}

        for operation_type, durations in self.query_stats.items():
            if not durations:
                continue
            summary[operation_type] = {
                "count": len(durations),
                "avg_duration": sum(durations) / len(durations),
                "max_duration": max(durations),
                "min_duration": min(durations),
                "slow_queries": sum(
                    1 for d in durations if d > self.slow_query_threshold
                ),
            }

        return summary
class QueryCache:
    """Intelligent query result caching.

    ``_get_from_cache`` and ``_set_in_cache`` are defined outside this
    snippet.
    """

    def __init__(self, redis_url: Optional[str] = None):
        """Create the cache; a Redis client is built only if a URL is given."""
        self.cache = {}  # In-memory fallback
        self.redis_client = redis.Redis.from_url(redis_url) if redis_url else None
        self.cache_ttl = 300  # 5 minutes default

    async def get_cached_result(
        self,
        cache_key: str,
        query_func: Callable,
        ttl: Optional[int] = None
    ) -> Any:
        """Get result from cache or execute query.

        Args:
            cache_key: Key to look up and store under.
            query_func: Zero-argument coroutine function run on a cache miss.
            ttl: Lifetime handed to the cache backend; a falsy value (None or
                0) selects ``self.cache_ttl``.

        Returns:
            The cached value, or the fresh result of ``query_func``.
        """
        ttl = ttl or self.cache_ttl

        # Try cache first
        # A stored value of None is indistinguishable from a miss here.
        cached_result = await self._get_from_cache(cache_key)
        if cached_result is not None:
            metrics.cache_hit.labels(type="query").inc()
            return cached_result

        # Execute query
        metrics.cache_miss.labels(type="query").inc()
        result = await query_func()

        # Cache result
        await self._set_in_cache(cache_key, result, ttl)

        return result

    def _generate_cache_key(self, query: str, user_context: str) -> str:
        """Generate consistent cache key.

        The query length is prefixed so the boundary between query and user
        context is unambiguous. With a bare ``query:context`` join, the pairs
        ("a:b", "c") and ("a", "b:c") hash to the same key, which could serve
        one user's cached rows to another.

        Returns:
            A 64-character hexadecimal SHA-256 digest.
        """
        key_data = f"{len(query)}:{query}:{user_context}"
        return hashlib.sha256(key_data.encode()).hexdigest()
After completing this lab, you should understand:
✅ Layered Architecture: How to separate concerns in MCP server design
✅ Database Patterns: Multi-tenant schema design and RLS implementation
✅ Connection Management: Efficient pooling and resource lifecycle
✅ Error Handling: Hierarchical error types and resilience patterns
✅ Performance Optimization: Monitoring, caching, and query optimization
✅ Production Readiness: Infrastructure concerns and operational patterns
Continue with Lab 02: Security and Multi-Tenancy to dive deep into:
---
Next: Ready to explore security patterns? Continue with Lab 02: Security and Multi-Tenancy
이 실습은 MCP 서버 아키텍처 패턴, 데이터베이스 설계 원칙, 그리고 견고하고 확장 가능한 데이터베이스 통합 AI 애플리케이션을 구현하는 기술적 전략에 대한 심층적인 탐구를 제공합니다.
데이터베이스 통합을 포함한 프로덕션 준비 MCP 서버를 구축하려면 신중한 아키텍처 결정이 필요합니다. 이 실습에서는 Zava Retail 분석 솔루션을 견고하고 안전하며 확장 가능하게 만드는 핵심 구성 요소, 설계 패턴, 기술적 고려 사항을 분해하여 설명합니다.
각 계층이 어떻게 상호작용하는지, 특정 기술이 왜 선택되었는지, 그리고 이러한 패턴을 자신의 MCP 구현에 어떻게 적용할 수 있는지를 이해하게 될 것입니다.
이 실습을 완료하면 다음을 수행할 수 있습니다:
우리의 MCP 서버는 계층화된 아키텍처를 구현하여 관심사를 분리하고 유지보수를 촉진합니다:
책임: MCP 프로토콜 통신 및 메시지 라우팅 처리
# FastMCP server setup
from fastmcp import FastMCP

# One server object; tools attach to it through the decorator below.
mcp = FastMCP("Zava Retail Analytics")


# Tool registration with type safety
@mcp.tool()
async def execute_sales_query(
    ctx: Context,
    postgresql_query: Annotated[
        str,
        Field(description="Well-formed PostgreSQL query"),
    ],
) -> str:
    """Execute PostgreSQL queries with Row Level Security."""
    # The docstring above is kept verbatim. NOTE(review): FastMCP presumably
    # exposes it as the tool description shown to clients - confirm before
    # rewording it.
    # The query text and the request context are forwarded unchanged to
    # ``query_executor``, which is defined outside this snippet.
    outcome = await query_executor.execute(postgresql_query, ctx)
    return outcome
주요 기능:
책임: 비즈니스 규칙 구현 및 프로토콜과 데이터 계층 간 조정
class SalesAnalyticsService:
    """Business logic for retail analytics operations.

    Relies on ``self.db_provider`` and on the ``_validate_store_access``,
    ``_calculate_metrics`` and ``_generate_insights`` helpers, none of which
    are defined in this snippet.
    """

    async def get_store_performance(
        self,
        store_id: str,
        time_period: str
    ) -> Dict[str, Any]:
        """Calculate store performance metrics.

        Args:
            store_id: Identifier of the store to report on.
            time_period: Period selector, passed through to the data layer.

        Returns:
            A dict with the keys ``store_id``, ``period``, ``metrics`` and
            ``insights``.

        Raises:
            AuthorizationError: If ``_validate_store_access`` rejects the
                store.
        """
        # Validate business rules.
        # AuthorizationError is the access-control type of the MCP error
        # hierarchy and the one the centralized error handler recognizes.
        if not self._validate_store_access(store_id):
            raise AuthorizationError("Access denied for store")

        # Coordinate data retrieval
        sales_data = await self.db_provider.get_sales_data(store_id, time_period)
        metrics = self._calculate_metrics(sales_data)

        return {
            "store_id": store_id,
            "period": time_period,
            "metrics": metrics,
            "insights": self._generate_insights(metrics),
        }
주요 기능:
책임: 데이터베이스 연결 관리, 쿼리 실행 및 데이터 매핑
class PostgreSQLProvider:
    """Data access layer for PostgreSQL operations.

    Runs caller-supplied SQL on a pooled connection after publishing the
    caller's identity in the ``app.current_rls_user_id`` setting.
    """

    def __init__(self, connection_config: Dict[str, Any]):
        """Store the configuration; no pool is opened here."""
        # Must be assigned before ``execute_query`` is called; nothing in
        # this class creates the pool.
        self.connection_pool: Optional[Pool] = None
        self.config = connection_config

    async def execute_query(
        self,
        query: str,
        rls_user_id: str
    ) -> List[Dict[str, Any]]:
        """Execute query with RLS context.

        Args:
            query: SQL text to run.
            rls_user_id: Value stored in ``app.current_rls_user_id`` for the
                duration of the query.

        Returns:
            One dict per result row.

        Raises:
            QueryTimeoutError: If the query does not finish within 30 seconds.
        """
        async with self.connection_pool.acquire() as conn:
            # The identity is set transaction-locally (third argument
            # ``true``) inside an explicit transaction. A session-level
            # setting would survive on the pooled connection and be seen by
            # whichever caller borrows it next.
            async with conn.transaction():
                # Set RLS context
                await conn.execute(
                    "SELECT set_config('app.current_rls_user_id', $1, true)",
                    rls_user_id,
                )

                # Execute query with timeout
                try:
                    rows = await asyncio.wait_for(
                        conn.fetch(query),
                        timeout=30.0,
                    )
                except asyncio.TimeoutError as exc:
                    # QueryTimeoutError takes the offending query, not a
                    # message, and builds its own text from it.
                    raise QueryTimeoutError(query) from exc

                return [dict(row) for row in rows]
주요 기능:
책임: 로깅, 모니터링, 구성과 같은 횡단 관심사 처리
class InfrastructureManager:
    """Infrastructure concerns management.

    Holds the logger, the metrics registry and the loaded configuration.
    ``_setup_metrics`` and ``_load_configuration`` are defined outside this
    snippet.
    """

    def __init__(self):
        """Initialize logging first, then metrics, then configuration."""
        self.logger = self._setup_logging()
        self.metrics = self._setup_metrics()
        self.config = self._load_configuration()

    def _setup_logging(self) -> Logger:
        """Send INFO-and-above records to the console and to a log file.

        Returns:
            The logger named after this module.
        """
        sinks = [
            logging.StreamHandler(),
            logging.FileHandler('mcp_server.log'),
        ]
        line_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(
            level=logging.INFO,
            format=line_format,
            handlers=sinks,
        )
        return logging.getLogger(__name__)

    async def track_query_execution(
        self,
        query_type: str,
        duration: float,
        success: bool
    ):
        """Record one query in the total counter and the duration histogram.

        Args:
            query_type: Value of the ``type`` label on both metrics.
            duration: Observed duration, recorded as given.
            success: Selects the ``status`` label: 'success' or 'error'.
        """
        outcome = 'success' if success else 'error'

        total = self.metrics.counter('query_total')
        total.labels(type=query_type, status=outcome).inc()

        latency = self.metrics.histogram('query_duration')
        latency.labels(type=query_type).observe(duration)
우리의 PostgreSQL 스키마는 다중 테넌트 MCP 애플리케이션을 위한 몇 가지 주요 패턴을 구현합니다:
-- Core retail entities with store-based partitioning
-- NOTE(review): no PARTITION BY clause appears in this DDL; "partitioning"
-- presumably means logical per-store separation through store_id - confirm.

-- One row per store. store_id is the key the two tables below reference.
CREATE TABLE retail.stores (
    store_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(100) NOT NULL,
    location VARCHAR(200) NOT NULL,
    manager_id UUID NOT NULL,  -- no foreign key is declared for this column
    created_at TIMESTAMP DEFAULT NOW()
);

-- Customers, each optionally linked to a store.
-- NOTE(review): store_id is nullable here, yet the row-level-security
-- policies on this table compare store_id for equality, so rows with a NULL
-- store_id would match no policy - confirm whether NOT NULL is intended.
CREATE TABLE retail.customers (
    customer_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    store_id UUID REFERENCES retail.stores(store_id),
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    email VARCHAR(100) UNIQUE,  -- unique across all stores, not per store
    created_at TIMESTAMP DEFAULT NOW()
);

-- Orders carry their own store_id in addition to the customer reference.
-- NOTE(review): no index on store_id is declared in this snippet, although
-- the policies filter on it - verify that indexes are created elsewhere.
CREATE TABLE retail.orders (
    order_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_id UUID REFERENCES retail.customers(customer_id),
    store_id UUID REFERENCES retail.stores(store_id),
    order_date TIMESTAMP DEFAULT NOW(),
    total_amount DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) DEFAULT 'pending'
);
설계 원칙:
-- Enable RLS on multi-tenant tables
ALTER TABLE retail.customers ENABLE ROW LEVEL SECURITY;
ALTER TABLE retail.orders ENABLE ROW LEVEL SECURITY;
-- NOTE(review): no policy for retail.order_items appears in this snippet.
-- With RLS enabled and no policy, non-owner roles see no rows - confirm the
-- policy is created elsewhere.
ALTER TABLE retail.order_items ENABLE ROW LEVEL SECURITY;

-- Support function for RLS context.
-- Created before the policies because CREATE POLICY resolves the functions
-- used in its USING expression when the policy is created.
-- STABLE: the result cannot change within a statement, so the planner does
-- not have to re-evaluate it for every row.
-- SET search_path: a SECURITY DEFINER function must pin its search_path so
-- objects in a caller-controlled schema cannot shadow what it references.
CREATE OR REPLACE FUNCTION get_current_user_store()
RETURNS UUID AS $$
BEGIN
    RETURN current_setting('app.current_rls_user_id')::UUID;
EXCEPTION WHEN OTHERS THEN
    -- Fail closed: an unset or malformed setting yields the nil UUID, which
    -- presumably matches no real store_id.
    RETURN '00000000-0000-0000-0000-000000000000'::UUID;
END;
$$ LANGUAGE plpgsql STABLE SECURITY DEFINER
   SET search_path = pg_catalog, pg_temp;

-- Store manager can only see their store's data
CREATE POLICY store_manager_customers ON retail.customers
    FOR ALL TO store_managers
    USING (store_id = get_current_user_store());

CREATE POLICY store_manager_orders ON retail.orders
    FOR ALL TO store_managers
    USING (store_id = get_current_user_store());

-- Regional managers see multiple stores
-- NOTE(review): get_user_store_list() is not defined in this snippet; it
-- must exist before this statement runs.
CREATE POLICY regional_manager_orders ON retail.orders
    FOR ALL TO regional_managers
    USING (store_id = ANY(get_user_store_list()));
RLS의 이점:
-- Product embeddings for semantic search
CREATE TABLE retail.product_description_embeddings (
    product_id UUID PRIMARY KEY REFERENCES retail.products(product_id),
    description_embedding vector(1536),
    last_updated TIMESTAMP DEFAULT NOW()
);

-- Optimize vector similarity search
-- Approximate index for cosine distance (the <=> operator).
CREATE INDEX idx_product_embeddings_vector
ON retail.product_description_embeddings
USING ivfflat (description_embedding vector_cosine_ops);

-- Semantic search function
-- Returns up to max_results products whose cosine similarity to
-- query_embedding is at least similarity_threshold, most similar first.
CREATE OR REPLACE FUNCTION search_products_by_description(
    query_embedding vector(1536),
    similarity_threshold FLOAT DEFAULT 0.7,
    max_results INTEGER DEFAULT 20
)
RETURNS TABLE(
    product_id UUID,
    name VARCHAR,
    description TEXT,
    similarity_score FLOAT
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        p.product_id,
        p.name,
        p.description,
        -- similarity = 1 - cosine distance
        (1 - (pde.description_embedding <=> query_embedding)) AS similarity_score
    FROM retail.products p
    JOIN retail.product_description_embeddings pde ON p.product_id = pde.product_id
    WHERE (pde.description_embedding <=> query_embedding) <= (1 - similarity_threshold)
    -- Order by the raw distance, ascending. This is the same ranking as
    -- similarity descending, but it is the form the vector index can serve;
    -- ordering by the derived score forces a full scan and sort.
    ORDER BY pde.description_embedding <=> query_embedding
    LIMIT max_results;
END;
$$ LANGUAGE plpgsql STABLE;
효율적인 데이터베이스 연결 관리는 MCP 서버 성능에 중요합니다:
class ConnectionPoolManager:
    """Manages PostgreSQL connection pools."""

    async def create_pool(self) -> Pool:
        """Create optimized connection pool.

        Connection parameters are read from ``self.config``. The pool is
        returned to the caller and is not stored on this object.
        """
        return await asyncpg.create_pool(
            host=self.config.db_host,
            port=self.config.db_port,
            database=self.config.db_name,
            user=self.config.db_user,
            password=self.config.db_password,

            # Pool configuration
            min_size=2,  # Minimum connections
            max_size=10,  # Maximum connections
            max_inactive_connection_lifetime=300,  # 5 minutes

            # Query configuration
            command_timeout=30,  # Query timeout
            server_settings={
                "application_name": "zava-mcp-server",
                "jit": "off",  # Disable JIT for stability
                "work_mem": "4MB",  # Limit work memory
                "statement_timeout": "30s"
            }
        )

    async def execute_with_retry(
        self,
        query: str,
        params: Optional[Tuple] = None,
        max_retries: int = 3
    ) -> List[Dict[str, Any]]:
        """Execute query with automatic retry logic.

        Args:
            query: SQL text to run.
            params: Positional query arguments, if any.
            max_retries: Total number of attempts. Values below 1 are treated
                as 1, so the query always runs at least once instead of the
                method silently returning ``None``.

        Returns:
            One dict per result row.

        Raises:
            ConnectionError, InterfaceError: Re-raised once the last attempt
                has failed.

        NOTE(review): the whole statement is re-run after a connection
        failure; confirm callers only pass statements that are safe to
        repeat. ``self.pool`` is read here but assigned nowhere in this
        snippet.
        """
        attempts = max(1, max_retries)
        for attempt in range(attempts):
            try:
                async with self.pool.acquire() as conn:
                    rows = await conn.fetch(query, *(params or ()))
                return [dict(row) for row in rows]

            except (ConnectionError, InterfaceError):
                if attempt == attempts - 1:
                    raise

                # Log before waiting so the failure is visible immediately.
                logger.warning(f"Database connection failed, retrying ({attempt + 1}/{max_retries})")
                # Exponential backoff
                await asyncio.sleep(2 ** attempt)
class MCPServerManager:
    """Manages MCP server lifecycle and resources.

    ``self.pool_manager``, ``create_ai_client`` and ``MetricsCollector`` are
    defined outside this snippet.
    """

    async def startup(self):
        """Initialize server resources: pool, AI client, metrics collector."""
        # Create database connection pool
        self.db_pool = await self.pool_manager.create_pool()

        # Initialize AI services
        self.ai_client = await self.create_ai_client()

        # Setup monitoring
        self.metrics_collector = MetricsCollector()

        logger.info("MCP server startup complete")

    async def shutdown(self):
        """Cleanup server resources.

        Every resource is released independently: a failure while closing
        one is logged and the remaining ones are still cleaned up. Resources
        that were never created (for example after a failed startup) are
        skipped. Errors are logged, never raised.
        """
        # (attribute holding the resource, name of its async cleanup method)
        steps = (
            ("db_pool", "close"),             # Close database connections
            ("ai_client", "close"),           # Cleanup AI client
            ("metrics_collector", "flush"),   # Flush metrics
        )

        clean = True
        for attr, method in steps:
            resource = getattr(self, attr, None)
            if resource is None:
                continue
            try:
                await getattr(resource, method)()
            except Exception as e:
                clean = False
                logger.error(f"Error during shutdown: {e}")

        if clean:
            logger.info("MCP server shutdown complete")

    async def health_check(self) -> Dict[str, str]:
        """Verify server health status.

        Returns:
            A dict with the keys ``database`` and ``ai_service``; each value
            is "healthy" or "unhealthy: <error>".
        """
        status = {}

        # Check database connection
        try:
            async with self.db_pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
            status["database"] = "healthy"
        except Exception as e:
            status["database"] = f"unhealthy: {e}"

        # Check AI service
        try:
            await self.ai_client.health_check()
            status["ai_service"] = "healthy"
        except Exception as e:
            status["ai_service"] = f"unhealthy: {e}"

        return status
견고한 오류 처리는 MCP 서버의 안정적인 운영을 보장합니다:
class MCPError(Exception):
    """Base MCP server error.

    Attributes set: ``message`` (also the exception text) and ``error_code``.
    """

    def __init__(self, message: str, error_code: str = "MCP_ERROR"):
        self.message = message
        self.error_code = error_code
        super().__init__(message)


class DatabaseError(MCPError):
    """Database operation errors; ``query`` holds the SQL involved, if given."""

    def __init__(self, message: str, query: Optional[str] = None):
        super().__init__(message, "DATABASE_ERROR")
        self.query = query


class AuthorizationError(MCPError):
    """Access control errors; ``user_id`` identifies the caller, if given."""

    def __init__(self, message: str, user_id: Optional[str] = None):
        super().__init__(message, "AUTHORIZATION_ERROR")
        self.user_id = user_id


class QueryTimeoutError(DatabaseError):
    """Query execution timeout.

    The argument is the query itself, not a message; the message is built
    from the first 100 characters of the query.
    """

    def __init__(self, query: str):
        super().__init__(f"Query timeout: {query[:100]}...", query)
        # Overrides the "DATABASE_ERROR" code set by the parent constructor.
        self.error_code = "QUERY_TIMEOUT"


class ValidationError(MCPError):
    """Input validation errors.

    Keeps the offending ``field`` and ``value`` and the violated
    ``constraint`` so handlers can report them without parsing the message.
    """

    def __init__(self, field: str, value: Any, constraint: str):
        message = f"Validation failed for {field}: {constraint}"
        super().__init__(message, "VALIDATION_ERROR")
        self.field = field
        self.value = value
        self.constraint = constraint
from contextlib import asynccontextmanager


# An ``async def`` containing ``yield`` is an async generator, so it needs
# ``asynccontextmanager`` and is used with ``async with``. The synchronous
# ``contextmanager`` decorator cannot drive an async generator.
@asynccontextmanager
async def error_handling_context(operation_name: str, user_id: str = None):
    """Centralized error handling for operations.

    Use as ``async with error_handling_context("name"):``. On success the
    operation's success counter and duration are recorded. On failure the
    error is logged and counted by category.

    Args:
        operation_name: Label used in log records and metrics.
        user_id: Optional caller identifier added to log records.

    Raises:
        ValidationError, AuthorizationError, DatabaseError: Re-raised
            unchanged after logging.
        MCPError: Raised in place of any other exception, with the original
            exception attached as its cause.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments.
    start_time = time.perf_counter()

    try:
        yield

        # Success metrics
        duration = time.perf_counter() - start_time
        metrics.operation_success.labels(operation=operation_name).inc()
        metrics.operation_duration.labels(operation=operation_name).observe(duration)

    except ValidationError as e:
        logger.warning(f"Validation error in {operation_name}: {e.message}", extra={
            "operation": operation_name,
            "user_id": user_id,
            "error_type": "validation",
            "field": e.field
        })
        metrics.operation_error.labels(operation=operation_name, type="validation").inc()
        raise

    except AuthorizationError as e:
        logger.warning(f"Authorization error in {operation_name}: {e.message}", extra={
            "operation": operation_name,
            "user_id": user_id,
            "error_type": "authorization"
        })
        metrics.operation_error.labels(operation=operation_name, type="authorization").inc()
        raise

    except DatabaseError as e:
        logger.error(f"Database error in {operation_name}: {e.message}", extra={
            "operation": operation_name,
            "user_id": user_id,
            "error_type": "database",
            "query": e.query[:100] if e.query else None
        })
        metrics.operation_error.labels(operation=operation_name, type="database").inc()
        raise

    except Exception as e:
        logger.error(f"Unexpected error in {operation_name}: {str(e)}", extra={
            "operation": operation_name,
            "user_id": user_id,
            "error_type": "unexpected"
        }, exc_info=True)
        metrics.operation_error.labels(operation=operation_name, type="unexpected").inc()
        # The caller gets a generic message; the original error stays
        # attached as the cause for debugging.
        raise MCPError(f"Internal server error in {operation_name}") from e
from contextlib import asynccontextmanager


class QueryPerformanceMonitor:
    """Monitor and optimize query performance.

    Keeps every observed duration in memory, grouped by operation type.
    NOTE(review): ``query_stats`` is never trimmed in this snippet; confirm
    something bounds it in a long-running server.
    """

    def __init__(self):
        self.slow_query_threshold = 1.0  # seconds
        self.query_stats = defaultdict(list)

    # ``monitor_query`` is an async generator, so it must be wrapped with
    # ``asynccontextmanager`` and entered with ``async with``.
    @asynccontextmanager
    async def monitor_query(self, query: str, operation_type: str = "unknown"):
        """Monitor query execution time and performance.

        Times the body of the ``async with`` block. Successful runs are
        appended to ``query_stats`` and reported to the duration metric; runs
        slower than ``slow_query_threshold`` are also logged. Failures are
        logged and re-raised and are not added to ``query_stats``.

        Args:
            query: SQL text; used for a short hash and a truncated log field.
            operation_type: Grouping key for statistics and metrics.
        """
        start_time = time.perf_counter()
        # Short, non-cryptographic fingerprint to correlate log lines.
        query_hash = hashlib.md5(query.encode()).hexdigest()[:8]

        try:
            yield

            duration = time.perf_counter() - start_time

            # Record performance metrics
            self.query_stats[operation_type].append(duration)

            # Log slow queries
            if duration > self.slow_query_threshold:
                logger.warning("Slow query detected", extra={
                    "query_hash": query_hash,
                    "duration": duration,
                    "operation_type": operation_type,
                    "query": query[:200]
                })

            # Update metrics
            metrics.query_duration.labels(type=operation_type).observe(duration)

        except Exception as e:
            duration = time.perf_counter() - start_time
            logger.error("Query failed", extra={
                "query_hash": query_hash,
                "duration": duration,
                "operation_type": operation_type,
                "error": str(e)
            })
            raise

    def get_performance_summary(self) -> Dict[str, Any]:
        """Generate performance summary report.

        Returns:
            For each operation type with at least one recorded duration, a
            dict with ``count``, ``avg_duration``, ``max_duration``,
            ``min_duration`` and ``slow_queries``.
        """
        summary = {}

        for operation_type, durations in self.query_stats.items():
            if not durations:
                continue
            summary[operation_type] = {
                "count": len(durations),
                "avg_duration": sum(durations) / len(durations),
                "max_duration": max(durations),
                "min_duration": min(durations),
                "slow_queries": sum(
                    1 for d in durations if d > self.slow_query_threshold
                ),
            }

        return summary
class QueryCache:
    """Intelligent query result caching.

    ``_get_from_cache`` and ``_set_in_cache`` are defined outside this
    snippet.
    """

    def __init__(self, redis_url: Optional[str] = None):
        """Create the cache; a Redis client is built only if a URL is given."""
        self.cache = {}  # In-memory fallback
        self.redis_client = redis.Redis.from_url(redis_url) if redis_url else None
        self.cache_ttl = 300  # 5 minutes default

    async def get_cached_result(
        self,
        cache_key: str,
        query_func: Callable,
        ttl: Optional[int] = None
    ) -> Any:
        """Get result from cache or execute query.

        Args:
            cache_key: Key to look up and store under.
            query_func: Zero-argument coroutine function run on a cache miss.
            ttl: Lifetime handed to the cache backend; a falsy value (None or
                0) selects ``self.cache_ttl``.

        Returns:
            The cached value, or the fresh result of ``query_func``.
        """
        ttl = ttl or self.cache_ttl

        # Try cache first
        # A stored value of None is indistinguishable from a miss here.
        cached_result = await self._get_from_cache(cache_key)
        if cached_result is not None:
            metrics.cache_hit.labels(type="query").inc()
            return cached_result

        # Execute query
        metrics.cache_miss.labels(type="query").inc()
        result = await query_func()

        # Cache result
        await self._set_in_cache(cache_key, result, ttl)

        return result

    def _generate_cache_key(self, query: str, user_context: str) -> str:
        """Generate consistent cache key.

        The query length is prefixed so the boundary between query and user
        context is unambiguous. With a bare ``query:context`` join, the pairs
        ("a:b", "c") and ("a", "b:c") hash to the same key, which could serve
        one user's cached rows to another.

        Returns:
            A 64-character hexadecimal SHA-256 digest.
        """
        key_data = f"{len(query)}:{query}:{user_context}"
        return hashlib.sha256(key_data.encode()).hexdigest()
이 실습을 완료한 후, 다음을 이해할 수 있어야 합니다:
✅ 계층화된 아키텍처: MCP 서버 설계에서 관심사를 분리하는 방법
✅ 데이터베이스 패턴: 다중 테넌트 스키마 설계 및 RLS 구현
✅ 연결 관리: 효율적인 풀링 및 리소스 라이프사이클
✅ 오류 처리: 계층적 오류 유형 및 복원력 패턴
✅ 성능 최적화: 모니터링, 캐싱 및 쿼리 최적화
✅ 프로덕션 준비: 인프라 관심사 및 운영 패턴
Lab 02: 보안 및 다중 테넌트로 계속 진행하여 다음을 심층적으로 탐구하세요:
---
다음: 보안 패턴을 탐구할 준비가 되셨나요? Lab 02: 보안 및 다중 테넌트로 계속 진행하세요.
---
면책 조항:
이 문서는 AI 번역 서비스 Co-op Translator를 사용하여 번역되었습니다.
정확성을 위해 최선을 다하고 있으나, 자동 번역에는 오류나 부정확성이 포함될 수 있음을 유의하시기 바랍니다.
원본 문서의 원어 버전이 권위 있는 출처로 간주되어야 합니다.
중요한 정보의 경우, 전문적인 인간 번역을 권장합니다.
이 번역 사용으로 인해 발생하는 오해나 잘못된 해석에 대해 당사는 책임을 지지 않습니다.