01 - Core Architecture Concepts

Module
PostgreSQL
Progress
15%

Core Architecture Concepts

🎯 What This Lab Covers

This lab provides an in-depth exploration of MCP server architecture patterns, database design principles, and the technical implementation strategies that power robust, scalable database-integrated AI applications.

Overview

Building a production-ready MCP server with database integration requires careful architectural decisions.

This lab breaks down the key components, design patterns, and technical considerations that make our Zava Retail analytics solution robust, secure, and scalable.

You'll understand how each layer interacts, why specific technologies were chosen, and how to apply these patterns to your own MCP implementations.

Learning Objectives

By the end of this lab, you will be able to:

  • Analyze the layered architecture of an MCP server with database integration
  • Understand the role and responsibilities of each architectural component
  • Design database schemas that support multi-tenant MCP applications
  • Implement connection pooling and resource management strategies
  • Apply error handling and logging patterns for production systems
  • Evaluate trade-offs between different architectural approaches
  • πŸ—οΈ MCP Server Architecture Layers

    Our MCP server implements a layered architecture that separates concerns and promotes maintainability:

    Layer 1: Protocol Layer (FastMCP)

    Responsibility: Handle MCP protocol communication and message routing

    
    # FastMCP server setup
    
    from fastmcp import FastMCP
    
    
    
    mcp = FastMCP("Zava Retail Analytics")
    
    
    
    # Tool registration with type safety
    
    @mcp.tool()
    
    async def execute_sales_query(
    
        ctx: Context,
    
        postgresql_query: Annotated[str, Field(description="Well-formed PostgreSQL query")]
    
    ) -> str:
    
        """Execute PostgreSQL queries with Row Level Security."""
    
        return await query_executor.execute(postgresql_query, ctx)
    
    

    Key Features:

  • Protocol Compliance: Full MCP specification support
  • Type Safety: Pydantic models for request/response validation
  • Async Support: Non-blocking I/O for high concurrency
  • Error Handling: Standardized error responses
  • Layer 2: Business Logic Layer

    Responsibility: Implement business rules and coordinate between protocol and data layers

    
    class SalesAnalyticsService:
    
        """Business logic for retail analytics operations."""
    
        
    
        async def get_store_performance(
    
            self, 
    
            store_id: str, 
    
            time_period: str
    
        ) -> Dict[str, Any]:
    
            """Calculate store performance metrics."""
    
            
    
            # Validate business rules
    
            if not self._validate_store_access(store_id):
    
                raise UnauthorizedError("Access denied for store")
    
            
    
            # Coordinate data retrieval
    
            sales_data = await self.db_provider.get_sales_data(store_id, time_period)
    
            metrics = self._calculate_metrics(sales_data)
    
            
    
            return {
    
                "store_id": store_id,
    
                "period": time_period,
    
                "metrics": metrics,
    
                "insights": self._generate_insights(metrics)
    
            }
    
    

    Key Features:

  • Business Rule Enforcement: Store access validation and data integrity
  • Service Coordination: Orchestration between database and AI services
  • Data Transformation: Converting raw data to business insights
  • Caching Strategy: Performance optimization for frequent queries
  • Layer 3: Data Access Layer

    Responsibility: Manage database connections, query execution, and data mapping

    
    class PostgreSQLProvider:
    
        """Data access layer for PostgreSQL operations."""
    
        
    
        def __init__(self, connection_config: Dict[str, Any]):
    
            self.connection_pool: Optional[Pool] = None
    
            self.config = connection_config
    
        
    
        async def execute_query(
    
            self, 
    
            query: str, 
    
            rls_user_id: str
    
        ) -> List[Dict[str, Any]]:
    
            """Execute query with RLS context."""
    
            
    
            async with self.connection_pool.acquire() as conn:
    
                # Set RLS context
    
                await conn.execute(
    
                    "SELECT set_config('app.current_rls_user_id', $1, false)",
    
                    rls_user_id
    
                )
    
                
    
                # Execute query with timeout
    
                try:
    
                    rows = await asyncio.wait_for(
    
                        conn.fetch(query),
    
                        timeout=30.0
    
                    )
    
                    return [dict(row) for row in rows]
    
                except asyncio.TimeoutError:
    
                    raise QueryTimeoutError("Query execution exceeded timeout")
    
    

    Key Features:

  • Connection Pooling: Efficient resource management
  • Transaction Management: ACID compliance and rollback handling
  • Query Optimization: Performance monitoring and optimization
  • RLS Integration: Row-level security context management
  • Layer 4: Infrastructure Layer

    Responsibility: Handle cross-cutting concerns like logging, monitoring, and configuration

    
    class InfrastructureManager:
    
        """Infrastructure concerns management."""
    
        
    
        def __init__(self):
    
            self.logger = self._setup_logging()
    
            self.metrics = self._setup_metrics()
    
            self.config = self._load_configuration()
    
        
    
        def _setup_logging(self) -> Logger:
    
            """Configure structured logging."""
    
            logging.basicConfig(
    
                level=logging.INFO,
    
                format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    
                handlers=[
    
                    logging.StreamHandler(),
    
                    logging.FileHandler('mcp_server.log')
    
                ]
    
            )
    
            return logging.getLogger(__name__)
    
        
    
        async def track_query_execution(
    
            self, 
    
            query_type: str, 
    
            duration: float, 
    
            success: bool
    
        ):
    
            """Track query performance metrics."""
    
            self.metrics.counter('query_total').labels(
    
                type=query_type,
    
                status='success' if success else 'error'
    
            ).inc()
    
            
    
            self.metrics.histogram('query_duration').labels(
    
                type=query_type
    
            ).observe(duration)
    
    

    πŸ—„οΈ Database Design Patterns

    Our PostgreSQL schema implements several key patterns for multi-tenant MCP applications:

    1. Multi-Tenant Schema Design

    
    -- Core retail entities with store-based partitioning
    
    CREATE TABLE retail.stores (
    
        store_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    
        name VARCHAR(100) NOT NULL,
    
        location VARCHAR(200) NOT NULL,
    
        manager_id UUID NOT NULL,
    
        created_at TIMESTAMP DEFAULT NOW()
    
    );
    
    
    
    CREATE TABLE retail.customers (
    
        customer_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    
        store_id UUID REFERENCES retail.stores(store_id),
    
        first_name VARCHAR(50) NOT NULL,
    
        last_name VARCHAR(50) NOT NULL,
    
        email VARCHAR(100) UNIQUE,
    
        created_at TIMESTAMP DEFAULT NOW()
    
    );
    
    
    
    CREATE TABLE retail.orders (
    
        order_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    
        customer_id UUID REFERENCES retail.customers(customer_id),
    
        store_id UUID REFERENCES retail.stores(store_id),
    
        order_date TIMESTAMP DEFAULT NOW(),
    
        total_amount DECIMAL(10,2) NOT NULL,
    
        status VARCHAR(20) DEFAULT 'pending'
    
    );
    
    

    Design Principles:

  • Foreign Key Consistency: Ensure data integrity across tables
  • Store ID Propagation: Every transactional table includes store_id
  • UUID Primary Keys: Globally unique identifiers for distributed systems
  • Timestamp Tracking: Audit trail for all data changes
  • 2. Row Level Security Implementation

    
    -- Enable RLS on multi-tenant tables
    
    ALTER TABLE retail.customers ENABLE ROW LEVEL SECURITY;
    
    ALTER TABLE retail.orders ENABLE ROW LEVEL SECURITY;
    
    ALTER TABLE retail.order_items ENABLE ROW LEVEL SECURITY;
    
    
    
    -- Store manager can only see their store's data
    
    CREATE POLICY store_manager_customers ON retail.customers
    
        FOR ALL TO store_managers
    
        USING (store_id = get_current_user_store());
    
    
    
    CREATE POLICY store_manager_orders ON retail.orders
    
        FOR ALL TO store_managers
    
        USING (store_id = get_current_user_store());
    
    
    
    -- Regional managers see multiple stores
    
    CREATE POLICY regional_manager_orders ON retail.orders
    
        FOR ALL TO regional_managers
    
        USING (store_id = ANY(get_user_store_list()));
    
    
    
    -- Support function for RLS context
    
    CREATE OR REPLACE FUNCTION get_current_user_store()
    
    RETURNS UUID AS $$
    
    BEGIN
    
        RETURN current_setting('app.current_rls_user_id')::UUID;
    
    EXCEPTION WHEN OTHERS THEN
    
        RETURN '00000000-0000-0000-0000-000000000000'::UUID;
    
    END;
    
    $$ LANGUAGE plpgsql SECURITY DEFINER;
    
    

    RLS Benefits:

  • Automatic Filtering: Database enforces data isolation
  • Application Simplicity: No complex WHERE clauses needed
  • Security by Default: Impossible to accidentally access wrong data
  • Audit Compliance: Clear data access boundaries
  • 3. Vector Search Schema

    
    -- Product embeddings for semantic search
    
    CREATE TABLE retail.product_description_embeddings (
    
        product_id UUID PRIMARY KEY REFERENCES retail.products(product_id),
    
        description_embedding vector(1536),
    
        last_updated TIMESTAMP DEFAULT NOW()
    
    );
    
    
    
    -- Optimize vector similarity search
    
    CREATE INDEX idx_product_embeddings_vector 
    
    ON retail.product_description_embeddings 
    
    USING ivfflat (description_embedding vector_cosine_ops);
    
    
    
    -- Semantic search function
    
    CREATE OR REPLACE FUNCTION search_products_by_description(
    
        query_embedding vector(1536),
    
        similarity_threshold FLOAT DEFAULT 0.7,
    
        max_results INTEGER DEFAULT 20
    
    )
    
    RETURNS TABLE(
    
        product_id UUID,
    
        name VARCHAR,
    
        description TEXT,
    
        similarity_score FLOAT
    
    ) AS $$
    
    BEGIN
    
        RETURN QUERY
    
        SELECT 
    
            p.product_id,
    
            p.name,
    
            p.description,
    
            (1 - (pde.description_embedding <=> query_embedding)) AS similarity_score
    
        FROM retail.products p
    
        JOIN retail.product_description_embeddings pde ON p.product_id = pde.product_id
    
        WHERE (pde.description_embedding <=> query_embedding) <= (1 - similarity_threshold)
    
        ORDER BY similarity_score DESC
    
        LIMIT max_results;
    
    END;
    
    $$ LANGUAGE plpgsql;
    
    

    πŸ”Œ Connection Management Patterns

    Efficient database connection management is critical for MCP server performance:

    Connection Pool Configuration

    
    class ConnectionPoolManager:
    
        """Manages PostgreSQL connection pools."""
    
        
    
        async def create_pool(self) -> Pool:
    
            """Create optimized connection pool."""
    
            return await asyncpg.create_pool(
    
                host=self.config.db_host,
    
                port=self.config.db_port,
    
                database=self.config.db_name,
    
                user=self.config.db_user,
    
                password=self.config.db_password,
    
                
    
                # Pool configuration
    
                min_size=2,          # Minimum connections
    
                max_size=10,         # Maximum connections
    
                max_inactive_connection_lifetime=300,  # 5 minutes
    
                
    
                # Query configuration
    
                command_timeout=30,   # Query timeout
    
                server_settings={
    
                    "application_name": "zava-mcp-server",
    
                    "jit": "off",          # Disable JIT for stability
    
                    "work_mem": "4MB",     # Limit work memory
    
                    "statement_timeout": "30s"
    
                }
    
            )
    
        
    
        async def execute_with_retry(
    
            self, 
    
            query: str, 
    
            params: Tuple = None,
    
            max_retries: int = 3
    
        ) -> List[Dict[str, Any]]:
    
            """Execute query with automatic retry logic."""
    
            
    
            for attempt in range(max_retries):
    
                try:
    
                    async with self.pool.acquire() as conn:
    
                        if params:
    
                            rows = await conn.fetch(query, *params)
    
                        else:
    
                            rows = await conn.fetch(query)
    
                        return [dict(row) for row in rows]
    
                        
    
                except (ConnectionError, InterfaceError) as e:
    
                    if attempt == max_retries - 1:
    
                        raise
    
                    
    
                    # Exponential backoff
    
                    await asyncio.sleep(2 ** attempt)
    
                    logger.warning(f"Database connection failed, retrying ({attempt + 1}/{max_retries})")
    
    

    Resource Lifecycle Management

    
    class MCPServerManager:
    
        """Manages MCP server lifecycle and resources."""
    
        
    
        async def startup(self):
    
            """Initialize server resources."""
    
            # Create database connection pool
    
            self.db_pool = await self.pool_manager.create_pool()
    
            
    
            # Initialize AI services
    
            self.ai_client = await self.create_ai_client()
    
            
    
            # Setup monitoring
    
            self.metrics_collector = MetricsCollector()
    
            
    
            logger.info("MCP server startup complete")
    
        
    
        async def shutdown(self):
    
            """Cleanup server resources."""
    
            try:
    
                # Close database connections
    
                if self.db_pool:
    
                    await self.db_pool.close()
    
                
    
                # Cleanup AI client
    
                if self.ai_client:
    
                    await self.ai_client.close()
    
                
    
                # Flush metrics
    
                await self.metrics_collector.flush()
    
                
    
                logger.info("MCP server shutdown complete")
    
                
    
            except Exception as e:
    
                logger.error(f"Error during shutdown: {e}")
    
        
    
        async def health_check(self) -> Dict[str, str]:
    
            """Verify server health status."""
    
            status = {}
    
            
    
            # Check database connection
    
            try:
    
                async with self.db_pool.acquire() as conn:
    
                    await conn.fetchval("SELECT 1")
    
                status["database"] = "healthy"
    
            except Exception as e:
    
                status["database"] = f"unhealthy: {e}"
    
            
    
            # Check AI service
    
            try:
    
                await self.ai_client.health_check()
    
                status["ai_service"] = "healthy"
    
            except Exception as e:
    
                status["ai_service"] = f"unhealthy: {e}"
    
            
    
            return status
    
    

    πŸ›‘οΈ Error Handling and Resilience Patterns

    Robust error handling ensures reliable MCP server operation:

    Hierarchical Error Types

    
    class MCPError(Exception):
    
        """Base MCP server error."""
    
        def __init__(self, message: str, error_code: str = "MCP_ERROR"):
    
            self.message = message
    
            self.error_code = error_code
    
            super().__init__(message)
    
    
    
    class DatabaseError(MCPError):
    
        """Database operation errors."""
    
        def __init__(self, message: str, query: str = None):
    
            super().__init__(message, "DATABASE_ERROR")
    
            self.query = query
    
    
    
    class AuthorizationError(MCPError):
    
        """Access control errors."""
    
        def __init__(self, message: str, user_id: str = None):
    
            super().__init__(message, "AUTHORIZATION_ERROR")
    
            self.user_id = user_id
    
    
    
    class QueryTimeoutError(DatabaseError):
    
        """Query execution timeout."""
    
        def __init__(self, query: str):
    
            super().__init__(f"Query timeout: {query[:100]}...", query)
    
            self.error_code = "QUERY_TIMEOUT"
    
    
    
    class ValidationError(MCPError):
    
        """Input validation errors."""
    
        def __init__(self, field: str, value: Any, constraint: str):
    
            message = f"Validation failed for {field}: {constraint}"
    
            super().__init__(message, "VALIDATION_ERROR")
    
            self.field = field
    
            self.value = value
    
    

    Error Handling Middleware

    
    @contextmanager
    
    async def error_handling_context(operation_name: str, user_id: str = None):
    
        """Centralized error handling for operations."""
    
        start_time = time.time()
    
        
    
        try:
    
            yield
    
            
    
            # Success metrics
    
            duration = time.time() - start_time
    
            metrics.operation_success.labels(operation=operation_name).inc()
    
            metrics.operation_duration.labels(operation=operation_name).observe(duration)
    
            
    
        except ValidationError as e:
    
            logger.warning(f"Validation error in {operation_name}: {e.message}", extra={
    
                "operation": operation_name,
    
                "user_id": user_id,
    
                "error_type": "validation",
    
                "field": e.field
    
            })
    
            metrics.operation_error.labels(operation=operation_name, type="validation").inc()
    
            raise
    
            
    
        except AuthorizationError as e:
    
            logger.warning(f"Authorization error in {operation_name}: {e.message}", extra={
    
                "operation": operation_name,
    
                "user_id": user_id,
    
                "error_type": "authorization"
    
            })
    
            metrics.operation_error.labels(operation=operation_name, type="authorization").inc()
    
            raise
    
            
    
        except DatabaseError as e:
    
            logger.error(f"Database error in {operation_name}: {e.message}", extra={
    
                "operation": operation_name,
    
                "user_id": user_id,
    
                "error_type": "database",
    
                "query": e.query[:100] if e.query else None
    
            })
    
            metrics.operation_error.labels(operation=operation_name, type="database").inc()
    
            raise
    
            
    
        except Exception as e:
    
            logger.error(f"Unexpected error in {operation_name}: {str(e)}", extra={
    
                "operation": operation_name,
    
                "user_id": user_id,
    
                "error_type": "unexpected"
    
            }, exc_info=True)
    
            metrics.operation_error.labels(operation=operation_name, type="unexpected").inc()
    
            raise MCPError(f"Internal server error in {operation_name}")
    
    

    πŸ“Š Performance Optimization Strategies

    Query Performance Monitoring

    
    class QueryPerformanceMonitor:
    
        """Monitor and optimize query performance."""
    
        
    
        def __init__(self):
    
            self.slow_query_threshold = 1.0  # seconds
    
            self.query_stats = defaultdict(list)
    
        
    
        @contextmanager
    
        async def monitor_query(self, query: str, operation_type: str = "unknown"):
    
            """Monitor query execution time and performance."""
    
            start_time = time.time()
    
            query_hash = hashlib.md5(query.encode()).hexdigest()[:8]
    
            
    
            try:
    
                yield
    
                
    
                duration = time.time() - start_time
    
                
    
                # Record performance metrics
    
                self.query_stats[operation_type].append(duration)
    
                
    
                # Log slow queries
    
                if duration > self.slow_query_threshold:
    
                    logger.warning(f"Slow query detected", extra={
    
                        "query_hash": query_hash,
    
                        "duration": duration,
    
                        "operation_type": operation_type,
    
                        "query": query[:200]
    
                    })
    
                
    
                # Update metrics
    
                metrics.query_duration.labels(type=operation_type).observe(duration)
    
                
    
            except Exception as e:
    
                duration = time.time() - start_time
    
                logger.error(f"Query failed", extra={
    
                    "query_hash": query_hash,
    
                    "duration": duration,
    
                    "operation_type": operation_type,
    
                    "error": str(e)
    
                })
    
                raise
    
        
    
        def get_performance_summary(self) -> Dict[str, Any]:
    
            """Generate performance summary report."""
    
            summary = {}
    
            
    
            for operation_type, durations in self.query_stats.items():
    
                if durations:
    
                    summary[operation_type] = {
    
                        "count": len(durations),
    
                        "avg_duration": sum(durations) / len(durations),
    
                        "max_duration": max(durations),
    
                        "min_duration": min(durations),
    
                        "slow_queries": len([d for d in durations if d > self.slow_query_threshold])
    
                    }
    
            
    
            return summary
    
    

    Caching Strategy

    
    class QueryCache:
    
        """Intelligent query result caching."""
    
        
    
        def __init__(self, redis_url: str = None):
    
            self.cache = {}  # In-memory fallback
    
            self.redis_client = redis.Redis.from_url(redis_url) if redis_url else None
    
            self.cache_ttl = 300  # 5 minutes default
    
        
    
        async def get_cached_result(
    
            self, 
    
            cache_key: str, 
    
            query_func: Callable,
    
            ttl: int = None
    
        ) -> Any:
    
            """Get result from cache or execute query."""
    
            ttl = ttl or self.cache_ttl
    
            
    
            # Try cache first
    
            cached_result = await self._get_from_cache(cache_key)
    
            if cached_result is not None:
    
                metrics.cache_hit.labels(type="query").inc()
    
                return cached_result
    
            
    
            # Execute query
    
            metrics.cache_miss.labels(type="query").inc()
    
            result = await query_func()
    
            
    
            # Cache result
    
            await self._set_in_cache(cache_key, result, ttl)
    
            
    
            return result
    
        
    
        def _generate_cache_key(self, query: str, user_context: str) -> str:
    
            """Generate consistent cache key."""
    
            key_data = f"{query}:{user_context}"
    
            return hashlib.sha256(key_data.encode()).hexdigest()
    
    

    🎯 Key Takeaways

    After completing this lab, you should understand:

    βœ… Layered Architecture: How to separate concerns in MCP server design

    βœ… Database Patterns: Multi-tenant schema design and RLS implementation

    βœ… Connection Management: Efficient pooling and resource lifecycle

    βœ… Error Handling: Hierarchical error types and resilience patterns

    βœ… Performance Optimization: Monitoring, caching, and query optimization

    βœ… Production Readiness: Infrastructure concerns and operational patterns

    πŸš€ What's Next

    Continue with Lab 02: Security and Multi-Tenancy to dive deep into:

  • Row Level Security implementation details
  • Authentication and authorization patterns
  • Multi-tenant data isolation strategies
  • Security audit and compliance considerations
  • πŸ“š Additional Resources

    Architecture Patterns

  • Clean Architecture in Python - Architectural patterns for Python applications
  • Database Design Patterns - Relational database design principles
  • Microservices Patterns - Service architecture patterns
  • PostgreSQL Advanced Topics

  • PostgreSQL Performance Tuning - Database optimization guide
  • Connection Pooling Best Practices - Connection management
  • Query Planning and Optimization - Query performance
  • Python Async Patterns

  • AsyncIO Best Practices - Async programming patterns
  • FastAPI Architecture - Modern Python web architecture
  • Pydantic Models - Data validation and serialization
  • ---

    Next: Ready to explore security patterns? Continue with Lab 02: Security and Multi-Tenancy

    핡심 μ•„ν‚€ν…μ²˜ κ°œλ…

    🎯 이 μ‹€μŠ΅μ—μ„œ λ‹€λ£¨λŠ” λ‚΄μš©

    이 μ‹€μŠ΅μ€ MCP μ„œλ²„ μ•„ν‚€ν…μ²˜ νŒ¨ν„΄, λ°μ΄ν„°λ² μ΄μŠ€ 섀계 원칙, 그리고 κ²¬κ³ ν•˜κ³  ν™•μž₯ κ°€λŠ₯ν•œ λ°μ΄ν„°λ² μ΄μŠ€ 톡합 AI μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ„ κ΅¬ν˜„ν•˜λŠ” 기술적 μ „λž΅μ— λŒ€ν•œ 심측적인 탐ꡬλ₯Ό μ œκ³΅ν•©λ‹ˆλ‹€.

    κ°œμš”

    λ°μ΄ν„°λ² μ΄μŠ€ 톡합을 ν¬ν•¨ν•œ ν”„λ‘œλ•μ…˜ μ€€λΉ„ MCP μ„œλ²„λ₯Ό κ΅¬μΆ•ν•˜λ €λ©΄ μ‹ μ€‘ν•œ μ•„ν‚€ν…μ²˜ 결정이 ν•„μš”ν•©λ‹ˆλ‹€. 이 μ‹€μŠ΅μ—μ„œλŠ” Zava Retail 뢄석 μ†”λ£¨μ…˜μ„ κ²¬κ³ ν•˜κ³  μ•ˆμ „ν•˜λ©° ν™•μž₯ κ°€λŠ₯ν•˜κ²Œ λ§Œλ“œλŠ” 핡심 ꡬ성 μš”μ†Œ, 섀계 νŒ¨ν„΄, 기술적 κ³ λ € 사항을 λΆ„ν•΄ν•˜μ—¬ μ„€λͺ…ν•©λ‹ˆλ‹€.

    각 계측이 μ–΄λ–»κ²Œ μƒν˜Έμž‘μš©ν•˜λŠ”μ§€, νŠΉμ • 기술이 μ™œ μ„ νƒλ˜μ—ˆλŠ”μ§€, 그리고 μ΄λŸ¬ν•œ νŒ¨ν„΄μ„ μžμ‹ μ˜ MCP κ΅¬ν˜„μ— μ–΄λ–»κ²Œ μ μš©ν•  수 μžˆλŠ”μ§€λ₯Ό μ΄ν•΄ν•˜κ²Œ 될 κ²ƒμž…λ‹ˆλ‹€.

    ν•™μŠ΅ λͺ©ν‘œ

    이 μ‹€μŠ΅μ„ μ™„λ£Œν•˜λ©΄ λ‹€μŒμ„ μˆ˜ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€:

  • 뢄석: λ°μ΄ν„°λ² μ΄μŠ€ 톡합 MCP μ„œλ²„μ˜ κ³„μΈ΅ν™”λœ μ•„ν‚€ν…μ²˜ 뢄석
  • 이해: 각 μ•„ν‚€ν…μ²˜ ꡬ성 μš”μ†Œμ˜ μ—­ν• κ³Ό μ±…μž„
  • 섀계: 닀쀑 ν…Œλ„ŒνŠΈ MCP μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ„ μ§€μ›ν•˜λŠ” λ°μ΄ν„°λ² μ΄μŠ€ μŠ€ν‚€λ§ˆ 섀계
  • κ΅¬ν˜„: μ—°κ²° 풀링 및 λ¦¬μ†ŒμŠ€ 관리 μ „λž΅
  • 적용: ν”„λ‘œλ•μ…˜ μ‹œμŠ€ν…œμ„ μœ„ν•œ 였λ₯˜ 처리 및 λ‘œκΉ… νŒ¨ν„΄
  • 평가: λ‹€μ–‘ν•œ μ•„ν‚€ν…μ²˜ μ ‘κ·Ό 방식 κ°„μ˜ νŠΈλ ˆμ΄λ“œμ˜€ν”„ 평가
  • πŸ—οΈ MCP μ„œλ²„ μ•„ν‚€ν…μ²˜ 계측

    우리의 MCP μ„œλ²„λŠ” κ³„μΈ΅ν™”λœ μ•„ν‚€ν…μ²˜λ₯Ό κ΅¬ν˜„ν•˜μ—¬ 관심사λ₯Ό λΆ„λ¦¬ν•˜κ³  μœ μ§€λ³΄μˆ˜λ₯Ό μ΄‰μ§„ν•©λ‹ˆλ‹€:

    계측 1: ν”„λ‘œν† μ½œ 계측 (FastMCP)

    μ±…μž„: MCP ν”„λ‘œν† μ½œ 톡신 및 λ©”μ‹œμ§€ λΌμš°νŒ… 처리

    
    # FastMCP server setup
    
    from fastmcp import FastMCP
    
    
    
    mcp = FastMCP("Zava Retail Analytics")
    
    
    
    # Tool registration with type safety
    
    @mcp.tool()
    
    async def execute_sales_query(
    
        ctx: Context,
    
        postgresql_query: Annotated[str, Field(description="Well-formed PostgreSQL query")]
    
    ) -> str:
    
        """Execute PostgreSQL queries with Row Level Security."""
    
        return await query_executor.execute(postgresql_query, ctx)
    
    

    μ£Όμš” κΈ°λŠ₯:

  • ν”„λ‘œν† μ½œ μ€€μˆ˜: MCP 사양 μ™„λ²½ 지원
  • νƒ€μž… μ•ˆμ „μ„±: μš”μ²­/응닡 μœ νš¨μ„± 검사λ₯Ό μœ„ν•œ Pydantic λͺ¨λΈ
  • 비동기 지원: 높은 λ™μ‹œμ„±μ„ μœ„ν•œ 비차단 I/O
  • 였λ₯˜ 처리: ν‘œμ€€ν™”λœ 였λ₯˜ 응닡
  • 계측 2: λΉ„μ¦ˆλ‹ˆμŠ€ 둜직 계측

    μ±…μž„: λΉ„μ¦ˆλ‹ˆμŠ€ κ·œμΉ™ κ΅¬ν˜„ 및 ν”„λ‘œν† μ½œκ³Ό 데이터 계측 κ°„ μ‘°μ •

    
    class SalesAnalyticsService:
    
        """Business logic for retail analytics operations."""
    
        
    
        async def get_store_performance(
    
            self, 
    
            store_id: str, 
    
            time_period: str
    
        ) -> Dict[str, Any]:
    
            """Calculate store performance metrics."""
    
            
    
            # Validate business rules
    
            if not self._validate_store_access(store_id):
    
                raise UnauthorizedError("Access denied for store")
    
            
    
            # Coordinate data retrieval
    
            sales_data = await self.db_provider.get_sales_data(store_id, time_period)
    
            metrics = self._calculate_metrics(sales_data)
    
            
    
            return {
    
                "store_id": store_id,
    
                "period": time_period,
    
                "metrics": metrics,
    
                "insights": self._generate_insights(metrics)
    
            }
    
    

    μ£Όμš” κΈ°λŠ₯:

  • λΉ„μ¦ˆλ‹ˆμŠ€ κ·œμΉ™ μ€€μˆ˜: μ €μž₯μ†Œ μ ‘κ·Ό 검증 및 데이터 무결성
  • μ„œλΉ„μŠ€ μ‘°μ •: λ°μ΄ν„°λ² μ΄μŠ€μ™€ AI μ„œλΉ„μŠ€ κ°„ μ˜€μΌ€μŠ€νŠΈλ ˆμ΄μ…˜
  • 데이터 λ³€ν™˜: μ›μ‹œ 데이터λ₯Ό λΉ„μ¦ˆλ‹ˆμŠ€ μΈμ‚¬μ΄νŠΈλ‘œ λ³€ν™˜
  • 캐싱 μ „λž΅: λΉˆλ²ˆν•œ 쿼리에 λŒ€ν•œ μ„±λŠ₯ μ΅œμ ν™”
  • 계측 3: 데이터 μ ‘κ·Ό 계측

    μ±…μž„: λ°μ΄ν„°λ² μ΄μŠ€ μ—°κ²° 관리, 쿼리 μ‹€ν–‰ 및 데이터 λ§€ν•‘

    
    class PostgreSQLProvider:
    
        """Data access layer for PostgreSQL operations."""
    
        
    
        def __init__(self, connection_config: Dict[str, Any]):
    
            self.connection_pool: Optional[Pool] = None
    
            self.config = connection_config
    
        
    
        async def execute_query(
    
            self, 
    
            query: str, 
    
            rls_user_id: str
    
        ) -> List[Dict[str, Any]]:
    
            """Execute query with RLS context."""
    
            
    
            async with self.connection_pool.acquire() as conn:
    
                # Set RLS context
    
                await conn.execute(
    
                    "SELECT set_config('app.current_rls_user_id', $1, false)",
    
                    rls_user_id
    
                )
    
                
    
                # Execute query with timeout
    
                try:
    
                    rows = await asyncio.wait_for(
    
                        conn.fetch(query),
    
                        timeout=30.0
    
                    )
    
                    return [dict(row) for row in rows]
    
                except asyncio.TimeoutError:
    
                    raise QueryTimeoutError("Query execution exceeded timeout")
    
    

    μ£Όμš” κΈ°λŠ₯:

  • μ—°κ²° 풀링: 효율적인 λ¦¬μ†ŒμŠ€ 관리
  • νŠΈλžœμž­μ…˜ 관리: ACID μ€€μˆ˜ 및 λ‘€λ°± 처리
  • 쿼리 μ΅œμ ν™”: μ„±λŠ₯ λͺ¨λ‹ˆν„°λ§ 및 μ΅œμ ν™”
  • RLS 톡합: ν–‰ μˆ˜μ€€ λ³΄μ•ˆ μ»¨ν…μŠ€νŠΈ 관리
  • 계측 4: 인프라 계측

    μ±…μž„: λ‘œκΉ…, λͺ¨λ‹ˆν„°λ§, ꡬ성과 같은 νš‘λ‹¨ 관심사 처리

    
    class InfrastructureManager:
    
        """Infrastructure concerns management."""
    
        
    
        def __init__(self):
    
            self.logger = self._setup_logging()
    
            self.metrics = self._setup_metrics()
    
            self.config = self._load_configuration()
    
        
    
        def _setup_logging(self) -> Logger:
    
            """Configure structured logging."""
    
            logging.basicConfig(
    
                level=logging.INFO,
    
                format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    
                handlers=[
    
                    logging.StreamHandler(),
    
                    logging.FileHandler('mcp_server.log')
    
                ]
    
            )
    
            return logging.getLogger(__name__)
    
        
    
        async def track_query_execution(
    
            self, 
    
            query_type: str, 
    
            duration: float, 
    
            success: bool
    
        ):
    
            """Track query performance metrics."""
    
            self.metrics.counter('query_total').labels(
    
                type=query_type,
    
                status='success' if success else 'error'
    
            ).inc()
    
            
    
            self.metrics.histogram('query_duration').labels(
    
                type=query_type
    
            ).observe(duration)
    
    

    πŸ—„οΈ λ°μ΄ν„°λ² μ΄μŠ€ 섀계 νŒ¨ν„΄

    우리의 PostgreSQL μŠ€ν‚€λ§ˆλŠ” 닀쀑 ν…Œλ„ŒνŠΈ MCP μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ„ μœ„ν•œ λͺ‡ κ°€μ§€ μ£Όμš” νŒ¨ν„΄μ„ κ΅¬ν˜„ν•©λ‹ˆλ‹€:

    1. 닀쀑 ν…Œλ„ŒνŠΈ μŠ€ν‚€λ§ˆ 섀계

    
    -- Core retail entities with store-based partitioning
    
    CREATE TABLE retail.stores (
    
        store_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    
        name VARCHAR(100) NOT NULL,
    
        location VARCHAR(200) NOT NULL,
    
        manager_id UUID NOT NULL,
    
        created_at TIMESTAMP DEFAULT NOW()
    
    );
    
    
    
    CREATE TABLE retail.customers (
    
        customer_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    
        store_id UUID REFERENCES retail.stores(store_id),
    
        first_name VARCHAR(50) NOT NULL,
    
        last_name VARCHAR(50) NOT NULL,
    
        email VARCHAR(100) UNIQUE,
    
        created_at TIMESTAMP DEFAULT NOW()
    
    );
    
    
    
    CREATE TABLE retail.orders (
    
        order_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    
        customer_id UUID REFERENCES retail.customers(customer_id),
    
        store_id UUID REFERENCES retail.stores(store_id),
    
        order_date TIMESTAMP DEFAULT NOW(),
    
        total_amount DECIMAL(10,2) NOT NULL,
    
        status VARCHAR(20) DEFAULT 'pending'
    
    );
    
    

    섀계 원칙:

  • μ™Έλž˜ ν‚€ 일관성: ν…Œμ΄λΈ” κ°„ 데이터 무결성 보μž₯
  • μ €μž₯μ†Œ ID μ „νŒŒ: λͺ¨λ“  νŠΈλžœμž­μ…˜ ν…Œμ΄λΈ”μ— store_id 포함
  • UUID κΈ°λ³Έ ν‚€: λΆ„μ‚° μ‹œμŠ€ν…œμ„ μœ„ν•œ μ „μ—­μ μœΌλ‘œ κ³ μœ ν•œ μ‹λ³„μž
  • νƒ€μž„μŠ€νƒ¬ν”„ 좔적: λͺ¨λ“  데이터 변경에 λŒ€ν•œ 감사 기둝
  • 2. ν–‰ μˆ˜μ€€ λ³΄μ•ˆ κ΅¬ν˜„

    
    -- Enable RLS on multi-tenant tables
    
    ALTER TABLE retail.customers ENABLE ROW LEVEL SECURITY;
    
    ALTER TABLE retail.orders ENABLE ROW LEVEL SECURITY;
    
    ALTER TABLE retail.order_items ENABLE ROW LEVEL SECURITY;
    
    
    
    -- Store manager can only see their store's data
    
    CREATE POLICY store_manager_customers ON retail.customers
    
        FOR ALL TO store_managers
    
        USING (store_id = get_current_user_store());
    
    
    
    CREATE POLICY store_manager_orders ON retail.orders
    
        FOR ALL TO store_managers
    
        USING (store_id = get_current_user_store());
    
    
    
    -- Regional managers see multiple stores
    
    CREATE POLICY regional_manager_orders ON retail.orders
    
        FOR ALL TO regional_managers
    
        USING (store_id = ANY(get_user_store_list()));
    
    
    
    -- Support function for RLS context
    
    CREATE OR REPLACE FUNCTION get_current_user_store()
    
    RETURNS UUID AS $$
    
    BEGIN
    
        RETURN current_setting('app.current_rls_user_id')::UUID;
    
    EXCEPTION WHEN OTHERS THEN
    
        RETURN '00000000-0000-0000-0000-000000000000'::UUID;
    
    END;
    
    $$ LANGUAGE plpgsql SECURITY DEFINER;
    
    

    RLS의 이점:

  • μžλ™ 필터링: λ°μ΄ν„°λ² μ΄μŠ€κ°€ 데이터 격리λ₯Ό κ°•μ œ
  • μ• ν”Œλ¦¬μΌ€μ΄μ…˜ λ‹¨μˆœν™”: λ³΅μž‘ν•œ WHERE 절 λΆˆν•„μš”
  • κΈ°λ³Έ λ³΄μ•ˆ: 잘λͺ»λœ 데이터 접근이 λΆˆκ°€λŠ₯
  • 감사 μ€€μˆ˜: λͺ…ν™•ν•œ 데이터 μ ‘κ·Ό 경계
  • 3. 벑터 검색 μŠ€ν‚€λ§ˆ

    
    -- Product embeddings for semantic search
    
    CREATE TABLE retail.product_description_embeddings (
    
        product_id UUID PRIMARY KEY REFERENCES retail.products(product_id),
    
        description_embedding vector(1536),
    
        last_updated TIMESTAMP DEFAULT NOW()
    
    );
    
    
    
    -- Optimize vector similarity search
    
    CREATE INDEX idx_product_embeddings_vector 
    
    ON retail.product_description_embeddings 
    
    USING ivfflat (description_embedding vector_cosine_ops);
    
    
    
    -- Semantic search function
    
    CREATE OR REPLACE FUNCTION search_products_by_description(
    
        query_embedding vector(1536),
    
        similarity_threshold FLOAT DEFAULT 0.7,
    
        max_results INTEGER DEFAULT 20
    
    )
    
    RETURNS TABLE(
    
        product_id UUID,
    
        name VARCHAR,
    
        description TEXT,
    
        similarity_score FLOAT
    
    ) AS $$
    
    BEGIN
    
        RETURN QUERY
    
        SELECT 
    
            p.product_id,
    
            p.name,
    
            p.description,
    
            (1 - (pde.description_embedding <=> query_embedding)) AS similarity_score
    
        FROM retail.products p
    
        JOIN retail.product_description_embeddings pde ON p.product_id = pde.product_id
    
        WHERE (pde.description_embedding <=> query_embedding) <= (1 - similarity_threshold)
    
        ORDER BY similarity_score DESC
    
        LIMIT max_results;
    
    END;
    
    $$ LANGUAGE plpgsql;
    
    

    πŸ”Œ μ—°κ²° 관리 νŒ¨ν„΄

    효율적인 λ°μ΄ν„°λ² μ΄μŠ€ μ—°κ²° κ΄€λ¦¬λŠ” MCP μ„œλ²„ μ„±λŠ₯에 μ€‘μš”ν•©λ‹ˆλ‹€:

    μ—°κ²° ν’€ ꡬ성

    
    class ConnectionPoolManager:
    
        """Manages PostgreSQL connection pools."""
    
        
    
        async def create_pool(self) -> Pool:
    
            """Create optimized connection pool."""
    
            return await asyncpg.create_pool(
    
                host=self.config.db_host,
    
                port=self.config.db_port,
    
                database=self.config.db_name,
    
                user=self.config.db_user,
    
                password=self.config.db_password,
    
                
    
                # Pool configuration
    
                min_size=2,          # Minimum connections
    
                max_size=10,         # Maximum connections
    
                max_inactive_connection_lifetime=300,  # 5 minutes
    
                
    
                # Query configuration
    
                command_timeout=30,   # Query timeout
    
                server_settings={
    
                    "application_name": "zava-mcp-server",
    
                    "jit": "off",          # Disable JIT for stability
    
                    "work_mem": "4MB",     # Limit work memory
    
                    "statement_timeout": "30s"
    
                }
    
            )
    
        
    
        async def execute_with_retry(
    
            self, 
    
            query: str, 
    
            params: Tuple = None,
    
            max_retries: int = 3
    
        ) -> List[Dict[str, Any]]:
    
            """Execute query with automatic retry logic."""
    
            
    
            for attempt in range(max_retries):
    
                try:
    
                    async with self.pool.acquire() as conn:
    
                        if params:
    
                            rows = await conn.fetch(query, *params)
    
                        else:
    
                            rows = await conn.fetch(query)
    
                        return [dict(row) for row in rows]
    
                        
    
                except (ConnectionError, InterfaceError) as e:
    
                    if attempt == max_retries - 1:
    
                        raise
    
                    
    
                    # Exponential backoff
    
                    await asyncio.sleep(2 ** attempt)
    
                    logger.warning(f"Database connection failed, retrying ({attempt + 1}/{max_retries})")
    
    

    λ¦¬μ†ŒμŠ€ 라이프사이클 관리

    
    class MCPServerManager:
    
        """Manages MCP server lifecycle and resources."""
    
        
    
        async def startup(self):
    
            """Initialize server resources."""
    
            # Create database connection pool
    
            self.db_pool = await self.pool_manager.create_pool()
    
            
    
            # Initialize AI services
    
            self.ai_client = await self.create_ai_client()
    
            
    
            # Setup monitoring
    
            self.metrics_collector = MetricsCollector()
    
            
    
            logger.info("MCP server startup complete")
    
        
    
        async def shutdown(self):
    
            """Cleanup server resources."""
    
            try:
    
                # Close database connections
    
                if self.db_pool:
    
                    await self.db_pool.close()
    
                
    
                # Cleanup AI client
    
                if self.ai_client:
    
                    await self.ai_client.close()
    
                
    
                # Flush metrics
    
                await self.metrics_collector.flush()
    
                
    
                logger.info("MCP server shutdown complete")
    
                
    
            except Exception as e:
    
                logger.error(f"Error during shutdown: {e}")
    
        
    
        async def health_check(self) -> Dict[str, str]:
    
            """Verify server health status."""
    
            status = {}
    
            
    
            # Check database connection
    
            try:
    
                async with self.db_pool.acquire() as conn:
    
                    await conn.fetchval("SELECT 1")
    
                status["database"] = "healthy"
    
            except Exception as e:
    
                status["database"] = f"unhealthy: {e}"
    
            
    
            # Check AI service
    
            try:
    
                await self.ai_client.health_check()
    
                status["ai_service"] = "healthy"
    
            except Exception as e:
    
                status["ai_service"] = f"unhealthy: {e}"
    
            
    
            return status
    
    

    πŸ›‘οΈ 였λ₯˜ 처리 및 볡원λ ₯ νŒ¨ν„΄

    κ²¬κ³ ν•œ 였λ₯˜ μ²˜λ¦¬λŠ” MCP μ„œλ²„μ˜ μ•ˆμ •μ μΈ μš΄μ˜μ„ 보μž₯ν•©λ‹ˆλ‹€:

    계측적 였λ₯˜ μœ ν˜•

    
    class MCPError(Exception):
    
        """Base MCP server error."""
    
        def __init__(self, message: str, error_code: str = "MCP_ERROR"):
    
            self.message = message
    
            self.error_code = error_code
    
            super().__init__(message)
    
    
    
    class DatabaseError(MCPError):
    
        """Database operation errors."""
    
        def __init__(self, message: str, query: str = None):
    
            super().__init__(message, "DATABASE_ERROR")
    
            self.query = query
    
    
    
    class AuthorizationError(MCPError):
    
        """Access control errors."""
    
        def __init__(self, message: str, user_id: str = None):
    
            super().__init__(message, "AUTHORIZATION_ERROR")
    
            self.user_id = user_id
    
    
    
    class QueryTimeoutError(DatabaseError):
    
        """Query execution timeout."""
    
        def __init__(self, query: str):
    
            super().__init__(f"Query timeout: {query[:100]}...", query)
    
            self.error_code = "QUERY_TIMEOUT"
    
    
    
    class ValidationError(MCPError):
    
        """Input validation errors."""
    
        def __init__(self, field: str, value: Any, constraint: str):
    
            message = f"Validation failed for {field}: {constraint}"
    
            super().__init__(message, "VALIDATION_ERROR")
    
            self.field = field
    
            self.value = value
    
    

    였λ₯˜ 처리 미듀웨어

    
    @contextmanager
    
    async def error_handling_context(operation_name: str, user_id: str = None):
    
        """Centralized error handling for operations."""
    
        start_time = time.time()
    
        
    
        try:
    
            yield
    
            
    
            # Success metrics
    
            duration = time.time() - start_time
    
            metrics.operation_success.labels(operation=operation_name).inc()
    
            metrics.operation_duration.labels(operation=operation_name).observe(duration)
    
            
    
        except ValidationError as e:
    
            logger.warning(f"Validation error in {operation_name}: {e.message}", extra={
    
                "operation": operation_name,
    
                "user_id": user_id,
    
                "error_type": "validation",
    
                "field": e.field
    
            })
    
            metrics.operation_error.labels(operation=operation_name, type="validation").inc()
    
            raise
    
            
    
        except AuthorizationError as e:
    
            logger.warning(f"Authorization error in {operation_name}: {e.message}", extra={
    
                "operation": operation_name,
    
                "user_id": user_id,
    
                "error_type": "authorization"
    
            })
    
            metrics.operation_error.labels(operation=operation_name, type="authorization").inc()
    
            raise
    
            
    
        except DatabaseError as e:
    
            logger.error(f"Database error in {operation_name}: {e.message}", extra={
    
                "operation": operation_name,
    
                "user_id": user_id,
    
                "error_type": "database",
    
                "query": e.query[:100] if e.query else None
    
            })
    
            metrics.operation_error.labels(operation=operation_name, type="database").inc()
    
            raise
    
            
    
        except Exception as e:
    
            logger.error(f"Unexpected error in {operation_name}: {str(e)}", extra={
    
                "operation": operation_name,
    
                "user_id": user_id,
    
                "error_type": "unexpected"
    
            }, exc_info=True)
    
            metrics.operation_error.labels(operation=operation_name, type="unexpected").inc()
    
            raise MCPError(f"Internal server error in {operation_name}")
    
    

    πŸ“Š μ„±λŠ₯ μ΅œμ ν™” μ „λž΅

    쿼리 μ„±λŠ₯ λͺ¨λ‹ˆν„°λ§

    
    class QueryPerformanceMonitor:
    
        """Monitor and optimize query performance."""
    
        
    
        def __init__(self):
    
            self.slow_query_threshold = 1.0  # seconds
    
            self.query_stats = defaultdict(list)
    
        
    
        @contextmanager
    
        async def monitor_query(self, query: str, operation_type: str = "unknown"):
    
            """Monitor query execution time and performance."""
    
            start_time = time.time()
    
            query_hash = hashlib.md5(query.encode()).hexdigest()[:8]
    
            
    
            try:
    
                yield
    
                
    
                duration = time.time() - start_time
    
                
    
                # Record performance metrics
    
                self.query_stats[operation_type].append(duration)
    
                
    
                # Log slow queries
    
                if duration > self.slow_query_threshold:
    
                    logger.warning(f"Slow query detected", extra={
    
                        "query_hash": query_hash,
    
                        "duration": duration,
    
                        "operation_type": operation_type,
    
                        "query": query[:200]
    
                    })
    
                
    
                # Update metrics
    
                metrics.query_duration.labels(type=operation_type).observe(duration)
    
                
    
            except Exception as e:
    
                duration = time.time() - start_time
    
                logger.error(f"Query failed", extra={
    
                    "query_hash": query_hash,
    
                    "duration": duration,
    
                    "operation_type": operation_type,
    
                    "error": str(e)
    
                })
    
                raise
    
        
    
        def get_performance_summary(self) -> Dict[str, Any]:
    
            """Generate performance summary report."""
    
            summary = {}
    
            
    
            for operation_type, durations in self.query_stats.items():
    
                if durations:
    
                    summary[operation_type] = {
    
                        "count": len(durations),
    
                        "avg_duration": sum(durations) / len(durations),
    
                        "max_duration": max(durations),
    
                        "min_duration": min(durations),
    
                        "slow_queries": len([d for d in durations if d > self.slow_query_threshold])
    
                    }
    
            
    
            return summary
    
    

    캐싱 μ „λž΅

    
    class QueryCache:
    
        """Intelligent query result caching."""
    
        
    
        def __init__(self, redis_url: str = None):
    
            self.cache = {}  # In-memory fallback
    
            self.redis_client = redis.Redis.from_url(redis_url) if redis_url else None
    
            self.cache_ttl = 300  # 5 minutes default
    
        
    
        async def get_cached_result(
    
            self, 
    
            cache_key: str, 
    
            query_func: Callable,
    
            ttl: int = None
    
        ) -> Any:
    
            """Get result from cache or execute query."""
    
            ttl = ttl or self.cache_ttl
    
            
    
            # Try cache first
    
            cached_result = await self._get_from_cache(cache_key)
    
            if cached_result is not None:
    
                metrics.cache_hit.labels(type="query").inc()
    
                return cached_result
    
            
    
            # Execute query
    
            metrics.cache_miss.labels(type="query").inc()
    
            result = await query_func()
    
            
    
            # Cache result
    
            await self._set_in_cache(cache_key, result, ttl)
    
            
    
            return result
    
        
    
        def _generate_cache_key(self, query: str, user_context: str) -> str:
    
            """Generate consistent cache key."""
    
            key_data = f"{query}:{user_context}"
    
            return hashlib.sha256(key_data.encode()).hexdigest()
    
    

    🎯 μ£Όμš” μš”μ 

    이 μ‹€μŠ΅μ„ μ™„λ£Œν•œ ν›„, λ‹€μŒμ„ 이해할 수 μžˆμ–΄μ•Ό ν•©λ‹ˆλ‹€:

    βœ… κ³„μΈ΅ν™”λœ μ•„ν‚€ν…μ²˜: MCP μ„œλ²„ μ„€κ³„μ—μ„œ 관심사λ₯Ό λΆ„λ¦¬ν•˜λŠ” 방법

    βœ… λ°μ΄ν„°λ² μ΄μŠ€ νŒ¨ν„΄: 닀쀑 ν…Œλ„ŒνŠΈ μŠ€ν‚€λ§ˆ 섀계 및 RLS κ΅¬ν˜„

    βœ… μ—°κ²° 관리: 효율적인 풀링 및 λ¦¬μ†ŒμŠ€ 라이프사이클

    βœ… 였λ₯˜ 처리: 계측적 였λ₯˜ μœ ν˜• 및 볡원λ ₯ νŒ¨ν„΄

    βœ… μ„±λŠ₯ μ΅œμ ν™”: λͺ¨λ‹ˆν„°λ§, 캐싱 및 쿼리 μ΅œμ ν™”

    βœ… ν”„λ‘œλ•μ…˜ μ€€λΉ„: 인프라 관심사 및 운영 νŒ¨ν„΄

    πŸš€ λ‹€μŒ 단계

    Lab 02: λ³΄μ•ˆ 및 닀쀑 ν…Œλ„ŒνŠΈλ‘œ 계속 μ§„ν–‰ν•˜μ—¬ λ‹€μŒμ„ μ‹¬μΈ΅μ μœΌλ‘œ νƒκ΅¬ν•˜μ„Έμš”:

  • ν–‰ μˆ˜μ€€ λ³΄μ•ˆ κ΅¬ν˜„ 세뢀사항
  • 인증 및 κΆŒν•œ λΆ€μ—¬ νŒ¨ν„΄
  • 닀쀑 ν…Œλ„ŒνŠΈ 데이터 격리 μ „λž΅
  • λ³΄μ•ˆ 감사 및 μ€€μˆ˜ κ³ λ € 사항
  • πŸ“š μΆ”κ°€ 자료

    μ•„ν‚€ν…μ²˜ νŒ¨ν„΄

  • Pythonμ—μ„œμ˜ 클린 μ•„ν‚€ν…μ²˜ - Python μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ„ μœ„ν•œ μ•„ν‚€ν…μ²˜ νŒ¨ν„΄
  • λ°μ΄ν„°λ² μ΄μŠ€ 섀계 νŒ¨ν„΄ - κ΄€κ³„ν˜• λ°μ΄ν„°λ² μ΄μŠ€ 섀계 원칙
  • λ§ˆμ΄ν¬λ‘œμ„œλΉ„μŠ€ νŒ¨ν„΄ - μ„œλΉ„μŠ€ μ•„ν‚€ν…μ²˜ νŒ¨ν„΄
  • PostgreSQL κ³ κΈ‰ 주제

  • PostgreSQL μ„±λŠ₯ νŠœλ‹ - λ°μ΄ν„°λ² μ΄μŠ€ μ΅œμ ν™” κ°€μ΄λ“œ
  • μ—°κ²° 풀링 λͺ¨λ²” 사둀 - μ—°κ²° 관리
  • 쿼리 κ³„νš 및 μ΅œμ ν™” - 쿼리 μ„±λŠ₯
  • Python 비동기 νŒ¨ν„΄

  • AsyncIO λͺ¨λ²” 사둀 - 비동기 ν”„λ‘œκ·Έλž˜λ° νŒ¨ν„΄
  • FastAPI μ•„ν‚€ν…μ²˜ - ν˜„λŒ€μ μΈ Python μ›Ή μ•„ν‚€ν…μ²˜
  • Pydantic λͺ¨λΈ - 데이터 μœ νš¨μ„± 검사 및 직렬화
  • ---

    λ‹€μŒ: λ³΄μ•ˆ νŒ¨ν„΄μ„ 탐ꡬ할 μ€€λΉ„κ°€ λ˜μ…¨λ‚˜μš”? Lab 02: λ³΄μ•ˆ 및 닀쀑 ν…Œλ„ŒνŠΈλ‘œ 계속 μ§„ν–‰ν•˜μ„Έμš”.

    ---

    λ©΄μ±… μ‘°ν•­:

    이 λ¬Έμ„œλŠ” AI λ²ˆμ—­ μ„œλΉ„μŠ€ Co-op Translatorλ₯Ό μ‚¬μš©ν•˜μ—¬ λ²ˆμ—­λ˜μ—ˆμŠ΅λ‹ˆλ‹€.

    정확성을 μœ„ν•΄ μ΅œμ„ μ„ λ‹€ν•˜κ³  μžˆμœΌλ‚˜, μžλ™ λ²ˆμ—­μ—λŠ” 였λ₯˜λ‚˜ 뢀정확성이 포함될 수 μžˆμŒμ„ μœ μ˜ν•˜μ‹œκΈ° λ°”λžλ‹ˆλ‹€.

    원본 λ¬Έμ„œμ˜ 원어 버전이 κΆŒμœ„ μžˆλŠ” 좜처둜 κ°„μ£Όλ˜μ–΄μ•Ό ν•©λ‹ˆλ‹€.

    μ€‘μš”ν•œ μ •λ³΄μ˜ 경우, 전문적인 인간 λ²ˆμ—­μ„ ꢌμž₯ν•©λ‹ˆλ‹€.

    이 λ²ˆμ—­ μ‚¬μš©μœΌλ‘œ 인해 λ°œμƒν•˜λŠ” μ˜€ν•΄λ‚˜ 잘λͺ»λœ 해석에 λŒ€ν•΄ λ‹Ήμ‚¬λŠ” μ±…μž„μ„ μ§€μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.

    MCP Academy — microsoft/mcp-for-beginners