05 - MCP Server Implementation

MCP Server Implementation

🎯 What This Lab Covers

This hands-on lab guides you through implementing a production-ready MCP server using the FastMCP framework.

You'll build the core server structure, implement database integration, create tools for data access, and establish the foundation for AI-powered retail analytics.

Overview

The MCP server is the heart of our retail analytics solution. It acts as a bridge between AI assistants and the PostgreSQL database, providing secure, intelligent access to business data through a standardized protocol.

This lab teaches you to build a robust, scalable MCP server following enterprise patterns and best practices.

Learning Objectives

By the end of this lab, you will be able to:

  • Build a FastMCP server with proper architecture and organization
  • Implement database integration with connection pooling and error handling
  • Create MCP tools for database schema introspection and query execution
  • Configure Row Level Security context management
  • Add health monitoring and observability features
  • Test your MCP server implementation locally and with VS Code

📁 Project Structure

Let's examine the MCP server organization:

    
    mcp_server/
    ├── __init__.py                 # Package initialization
    ├── config.py                   # Configuration management
    ├── health_check.py             # Health monitoring endpoints
    ├── sales_analysis.py           # Main MCP server implementation
    ├── sales_analysis_postgres.py  # Database integration layer
    └── sales_analysis_text_embeddings.py  # AI/semantic search integration
    
    

🔧 Configuration Management

Environment Configuration (config.py)

First, let's create a robust configuration system:

    
    # mcp_server/config.py
    """
    Configuration management for the MCP server.
    Handles environment variables, validation, and defaults.
    """
    import os
    import logging
    from typing import Optional, Dict, Any
    from dataclasses import dataclass

    from dotenv import load_dotenv

    # Load environment variables from .env file
    load_dotenv()

    logger = logging.getLogger(__name__)
    
    
    @dataclass
    class DatabaseConfig:
        """Database connection configuration."""
        host: str
        port: int
        database: str
        user: str
        password: str
        min_connections: int = 2
        max_connections: int = 10
        command_timeout: int = 30

        @classmethod
        def from_env(cls) -> 'DatabaseConfig':
            """Create configuration from environment variables."""
            return cls(
                host=os.getenv('POSTGRES_HOST', 'localhost'),
                port=int(os.getenv('POSTGRES_PORT', '5432')),
                database=os.getenv('POSTGRES_DB', 'zava'),
                user=os.getenv('POSTGRES_USER', 'postgres'),
                password=os.getenv('POSTGRES_PASSWORD', ''),
                min_connections=int(os.getenv('POSTGRES_MIN_CONNECTIONS', '2')),
                max_connections=int(os.getenv('POSTGRES_MAX_CONNECTIONS', '10')),
                command_timeout=int(os.getenv('POSTGRES_COMMAND_TIMEOUT', '30'))
            )

        def to_asyncpg_params(self) -> Dict[str, Any]:
            """Convert to asyncpg connection parameters."""
            return {
                'host': self.host,
                'port': self.port,
                'database': self.database,
                'user': self.user,
                'password': self.password,
                'command_timeout': self.command_timeout,
                'server_settings': {
                    'application_name': 'zava-mcp-server',
                    'jit': 'off',  # Disable JIT for stability
                    'work_mem': '4MB',
                    'statement_timeout': f'{self.command_timeout}s'
                }
            }
    
    
    @dataclass
    class AzureConfig:
        """Azure AI services configuration."""
        project_endpoint: str
        openai_endpoint: str
        embedding_model_deployment: str
        client_id: str
        client_secret: str
        tenant_id: str

        @classmethod
        def from_env(cls) -> 'AzureConfig':
            """Create configuration from environment variables."""
            return cls(
                project_endpoint=os.getenv('PROJECT_ENDPOINT', ''),
                openai_endpoint=os.getenv('AZURE_OPENAI_ENDPOINT', ''),
                embedding_model_deployment=os.getenv('EMBEDDING_MODEL_DEPLOYMENT_NAME', 'text-embedding-3-small'),
                client_id=os.getenv('AZURE_CLIENT_ID', ''),
                client_secret=os.getenv('AZURE_CLIENT_SECRET', ''),
                tenant_id=os.getenv('AZURE_TENANT_ID', '')
            )

        def is_configured(self) -> bool:
            """Check if all required Azure configuration is present."""
            return all([
                self.project_endpoint,
                self.openai_endpoint,
                self.client_id,
                self.client_secret,
                self.tenant_id
            ])
    
    
    @dataclass
    class ServerConfig:
        """MCP server configuration."""
        host: str = '0.0.0.0'
        port: int = 8000
        log_level: str = 'INFO'
        enable_cors: bool = True
        enable_health_check: bool = True
        applicationinsights_connection_string: Optional[str] = None

        @classmethod
        def from_env(cls) -> 'ServerConfig':
            """Create configuration from environment variables."""
            return cls(
                host=os.getenv('MCP_SERVER_HOST', '0.0.0.0'),
                port=int(os.getenv('MCP_SERVER_PORT', '8000')),
                log_level=os.getenv('LOG_LEVEL', 'INFO').upper(),
                enable_cors=os.getenv('ENABLE_CORS', 'true').lower() == 'true',
                enable_health_check=os.getenv('ENABLE_HEALTH_CHECK', 'true').lower() == 'true',
                applicationinsights_connection_string=os.getenv('APPLICATIONINSIGHTS_CONNECTION_STRING')
            )
    
    
    class MCPServerConfig:
        """Main configuration class for the MCP server."""

        def __init__(self):
            self.database = DatabaseConfig.from_env()
            self.azure = AzureConfig.from_env()
            self.server = ServerConfig.from_env()

            # Validate configuration
            self._validate_config()

        def _validate_config(self):
            """Validate configuration and log warnings for missing values."""
            if not self.database.password:
                logger.warning("Database password is empty. This may cause connection issues.")

            if not self.azure.is_configured():
                logger.warning("Azure configuration is incomplete. AI features may not work.")

            logger.info(f"Configuration loaded - Database: {self.database.host}:{self.database.port}")
            logger.info(f"Server will run on {self.server.host}:{self.server.port}")


    # Global configuration instance
    config = MCPServerConfig()
    
    

Key Configuration Features

  • Environment Variable Loading: Automatic .env file support via python-dotenv
  • Type Safety: Dataclasses with type hints (the annotations document intent; they are not enforced at runtime)
  • Flexible Defaults: Sensible defaults for development
  • Validation: Startup checks that log warnings for missing values
  • Security: Sensitive values only from environment variables
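
The `== 'true'` comparison in `ServerConfig.from_env` treats values such as `1` or `yes` as false, and `int(os.getenv(...))` accepts any integer, including an out-of-range port. A minimal sketch of stricter parsing follows. It assumes two hypothetical helpers (`env_bool`, `env_int`) and an illustrative `DemoServerConfig` class, none of which are part of the lab code:

```python
import os
from dataclasses import dataclass


def env_bool(name: str, default: bool) -> bool:
    """Parse a boolean environment variable (hypothetical helper)."""
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def env_int(name: str, default: int, minimum: int, maximum: int) -> int:
    """Parse an integer environment variable and range-check it (hypothetical helper)."""
    raw = os.getenv(name)
    value = default if raw is None else int(raw)  # int() raises ValueError on bad input
    if not minimum <= value <= maximum:
        raise ValueError(f"{name}={value} is outside {minimum}..{maximum}")
    return value


@dataclass(frozen=True)
class DemoServerConfig:
    """Illustrative stand-in for ServerConfig; not the lab's class."""
    port: int
    enable_cors: bool

    @classmethod
    def from_env(cls) -> "DemoServerConfig":
        return cls(
            port=env_int("MCP_SERVER_PORT", 8000, 1, 65535),
            enable_cors=env_bool("ENABLE_CORS", True),
        )
```

Raising on an invalid value makes a misconfigured deployment fail at startup instead of at the first request; whether that is preferable to logging a warning depends on how the server is deployed.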

🗄️ Database Integration Layer

PostgreSQL Provider (sales_analysis_postgres.py)

Let's implement the database integration layer:

    
    # mcp_server/sales_analysis_postgres.py
    """
    PostgreSQL database integration for MCP server.
    Handles connections, queries, and schema introspection.
    """
    import asyncio
    import asyncpg
    import logging
    from typing import Dict, Any, List, Optional, Tuple
    from contextlib import asynccontextmanager
    from datetime import datetime
    import json

    from .config import config

    logger = logging.getLogger(__name__)
    
    
    class PostgreSQLSchemaProvider:
        """Provides PostgreSQL database access and schema information."""

        def __init__(self):
            self.connection_pool: Optional[asyncpg.Pool] = None
            self.postgres_config = config.database.to_asyncpg_params()

        async def create_pool(self) -> None:
            """Create connection pool for database operations."""
            if self.connection_pool is None:
                try:
                    self.connection_pool = await asyncpg.create_pool(
                        **self.postgres_config,
                        min_size=config.database.min_connections,
                        max_size=config.database.max_connections,
                        max_inactive_connection_lifetime=300  # 5 minutes
                    )
                    logger.info("Database connection pool created successfully")
                except Exception as e:
                    logger.error(f"Failed to create database connection pool: {e}")
                    raise

        async def close_pool(self) -> None:
            """Close the connection pool."""
            if self.connection_pool:
                await self.connection_pool.close()
                self.connection_pool = None
                logger.info("Database connection pool closed")

        @asynccontextmanager
        async def get_connection(self):
            """Get a database connection from the pool."""
            if not self.connection_pool:
                await self.create_pool()

            async with self.connection_pool.acquire() as connection:
                yield connection
    
        async def set_rls_context(self, connection: asyncpg.Connection, rls_user_id: str) -> None:
            """Set Row Level Security context for the connection."""
            try:
                # The third argument (is_local=false) makes the setting session-scoped,
                # so it persists on a pooled connection and must be set again before
                # every use of that connection.
                await connection.execute(
                    "SELECT set_config('app.current_rls_user_id', $1, false)",
                    rls_user_id
                )
                logger.debug(f"RLS context set for user: {rls_user_id}")
            except Exception as e:
                logger.error(f"Failed to set RLS context: {e}")
                raise
    
        async def get_table_schema(self, table_name: str, rls_user_id: str) -> Dict[str, Any]:
            """Get detailed schema information for a specific table."""
            async with self.get_connection() as conn:
                await self.set_rls_context(conn, rls_user_id)

                # Parse schema and table name
                if '.' in table_name:
                    schema_name, table_name = table_name.split('.', 1)
                else:
                    schema_name = 'retail'  # Default schema

                # Get column information
                columns_query = """
                    SELECT
                        column_name,
                        data_type,
                        is_nullable,
                        column_default,
                        character_maximum_length,
                        numeric_precision,
                        numeric_scale,
                        ordinal_position
                    FROM information_schema.columns
                    WHERE table_schema = $1 AND table_name = $2
                    ORDER BY ordinal_position
                """

                columns = await conn.fetch(columns_query, schema_name, table_name)

                if not columns:
                    raise ValueError(f"Table {schema_name}.{table_name} not found or not accessible")

                # Get foreign key relationships.
                # Constraint names are only unique within a schema, so the joins
                # match on constraint_schema as well as constraint_name.
                fk_query = """
                    SELECT
                        kcu.column_name,
                        ccu.table_schema AS foreign_table_schema,
                        ccu.table_name AS foreign_table_name,
                        ccu.column_name AS foreign_column_name
                    FROM information_schema.table_constraints tc
                    JOIN information_schema.key_column_usage kcu
                        ON tc.constraint_name = kcu.constraint_name
                        AND tc.constraint_schema = kcu.constraint_schema
                    JOIN information_schema.constraint_column_usage ccu
                        ON ccu.constraint_name = tc.constraint_name
                        AND ccu.constraint_schema = tc.constraint_schema
                    WHERE tc.constraint_type = 'FOREIGN KEY'
                        AND tc.table_schema = $1
                        AND tc.table_name = $2
                """

                foreign_keys = await conn.fetch(fk_query, schema_name, table_name)

                # Get indexes
                index_query = """
                    SELECT
                        indexname,
                        indexdef
                    FROM pg_indexes
                    WHERE schemaname = $1 AND tablename = $2
                """

                indexes = await conn.fetch(index_query, schema_name, table_name)

                # Format schema information
                schema_info = {
                    "table_name": f"{schema_name}.{table_name}",
                    "columns": [
                        {
                            "name": col["column_name"],
                            "type": col["data_type"],
                            "nullable": col["is_nullable"] == "YES",
                            "default": col["column_default"],
                            "max_length": col["character_maximum_length"],
                            "precision": col["numeric_precision"],
                            "scale": col["numeric_scale"],
                            "position": col["ordinal_position"]
                        }
                        for col in columns
                    ],
                    "foreign_keys": [
                        {
                            "column": fk["column_name"],
                            "references": f"{fk['foreign_table_schema']}.{fk['foreign_table_name']}.{fk['foreign_column_name']}"
                        }
                        for fk in foreign_keys
                    ],
                    "indexes": [
                        {
                            "name": idx["indexname"],
                            "definition": idx["indexdef"]
                        }
                        for idx in indexes
                    ]
                }

                return schema_info
    
        async def get_multiple_table_schemas(
            self,
            table_names: List[str],
            rls_user_id: str
        ) -> str:
            """Get schema information for multiple tables."""
            schemas = []

            for table_name in table_names:
                try:
                    schema = await self.get_table_schema(table_name, rls_user_id)
                    schemas.append(self._format_schema_for_ai(schema))
                except Exception as e:
                    logger.warning(f"Failed to get schema for {table_name}: {e}")
                    schemas.append(f"Error retrieving schema for {table_name}: {str(e)}")

            return "\n\n".join(schemas)
    
        def _format_schema_for_ai(self, schema: Dict[str, Any]) -> str:
            """Format schema information for AI consumption."""
            table_name = schema["table_name"]
            columns = schema["columns"]
            foreign_keys = schema["foreign_keys"]

            # Create column definitions
            column_lines = []
            for col in columns:
                nullable = "NULL" if col["nullable"] else "NOT NULL"
                type_info = col["type"]

                if col["max_length"]:
                    type_info += f"({col['max_length']})"
                elif col["precision"] and col["scale"]:
                    type_info += f"({col['precision']},{col['scale']})"

                default_info = f" DEFAULT {col['default']}" if col["default"] else ""

                column_lines.append(f"  {col['name']} {type_info} {nullable}{default_info}")

            # Create foreign key information
            fk_lines = []
            for fk in foreign_keys:
                fk_lines.append(f"  {fk['column']} -> {fk['references']}")

            # Combine into readable format
            schema_text = f"Table: {table_name}\n"
            schema_text += "Columns:\n" + "\n".join(column_lines)

            if fk_lines:
                schema_text += "\n\nForeign Keys:\n" + "\n".join(fk_lines)

            return schema_text
    
        async def execute_query(
            self,
            sql_query: str,
            rls_user_id: str,
            max_rows: int = 20
        ) -> str:
            """Execute a SQL query with Row Level Security context."""
            async with self.get_connection() as conn:
                await self.set_rls_context(conn, rls_user_id)

                try:
                    # Set a query timeout
                    rows = await asyncio.wait_for(
                        conn.fetch(sql_query),
                        timeout=config.database.command_timeout
                    )

                    if not rows:
                        return "Query executed successfully. No rows returned."

                    # Limit result set size. Note that fetch() has already loaded
                    # every row into memory; only the formatted output is truncated.
                    limited_rows = rows[:max_rows]

                    # Format results
                    result = self._format_query_results(limited_rows, len(rows), max_rows)

                    logger.info(f"Query executed successfully. Returned {len(limited_rows)} rows.")
                    return result

                except asyncio.TimeoutError:
                    error_msg = f"Query timeout after {config.database.command_timeout} seconds"
                    logger.error(error_msg)
                    # Raise a specific exception type instead of a bare Exception
                    raise TimeoutError(error_msg) from None
                except Exception as e:
                    logger.error(f"Query execution failed: {e}")
                    raise
    
        def _format_query_results(
            self,
            rows: List[asyncpg.Record],
            total_rows: int,
            max_rows: int
        ) -> str:
            """Format query results for AI consumption."""
            if not rows:
                return "No results found."

            # Get column names
            columns = list(rows[0].keys())

            # Create header
            result_lines = [f"Results ({len(rows)} of {total_rows} rows):"]
            result_lines.append("=" * 50)

            # Add column headers
            header = " | ".join(columns)
            result_lines.append(header)
            result_lines.append("-" * len(header))

            # Add data rows
            for row in rows:
                formatted_values = []
                for col in columns:
                    value = row[col]
                    if value is None:
                        formatted_values.append("NULL")
                    elif isinstance(value, datetime):
                        formatted_values.append(value.strftime("%Y-%m-%d %H:%M:%S"))
                    elif isinstance(value, (dict, list)):
                        formatted_values.append(json.dumps(value))
                    else:
                        formatted_values.append(str(value))

                result_lines.append(" | ".join(formatted_values))

            # Add truncation notice if needed
            if total_rows > max_rows:
                result_lines.append(f"\n... and {total_rows - max_rows} more rows (truncated for display)")

            return "\n".join(result_lines)
    
        async def get_current_utc_date(self) -> str:
            """Get current UTC date/time."""
            async with self.get_connection() as conn:
                result = await conn.fetchval("SELECT NOW() AT TIME ZONE 'UTC'")
                return result.isoformat() + "Z"
    
        async def health_check(self) -> Dict[str, Any]:
            """Perform database health check."""
            try:
                async with self.get_connection() as conn:
                    # Simple connectivity test
                    result = await conn.fetchval("SELECT 1")

                    # Check pool status through asyncpg's public accessors
                    # (avoids the private _minsize/_maxsize attributes).
                    # get_connection() guarantees the pool exists at this point.
                    pool = self.connection_pool
                    pool_info = {
                        "min_size": pool.get_min_size(),
                        "max_size": pool.get_max_size(),
                        "current_size": pool.get_size(),
                        "idle_size": pool.get_idle_size()
                    }

                    return {
                        "status": "healthy",
                        "database_responsive": result == 1,
                        "pool_info": pool_info
                    }

            except Exception as e:
                return {
                    "status": "unhealthy",
                    "error": str(e)
                }


    # Global database provider instance
    db_provider = PostgreSQLSchemaProvider()
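
To see what `_format_query_results` hands back to the AI assistant without a database, the formatting logic can be tried on plain dictionaries. The following is a simplified sketch, not the provider's method: `format_rows` is a hypothetical standalone function, and the sample rows are made-up illustration data.

```python
from datetime import datetime
from typing import Any, Dict, List


def format_rows(rows: List[Dict[str, Any]], max_rows: int = 20) -> str:
    """Render rows as a pipe-separated text table, truncating after max_rows."""
    if not rows:
        return "No results found."
    shown = rows[:max_rows]
    columns = list(shown[0].keys())
    header = " | ".join(columns)
    lines = [f"Results ({len(shown)} of {len(rows)} rows):", header, "-" * len(header)]
    for row in shown:
        cells = []
        for col in columns:
            value = row[col]
            if value is None:
                cells.append("NULL")  # mirror SQL's NULL instead of Python's None
            elif isinstance(value, datetime):
                cells.append(value.strftime("%Y-%m-%d %H:%M:%S"))
            else:
                cells.append(str(value))
        lines.append(" | ".join(cells))
    if len(rows) > max_rows:
        lines.append(f"... and {len(rows) - max_rows} more rows (truncated for display)")
    return "\n".join(lines)


# Made-up sample data, standing in for asyncpg.Record objects
sample = [
    {"store": "Seattle", "total": 1200, "closed_at": None},
    {"store": "Online", "total": 3400, "closed_at": datetime(2024, 1, 31, 17, 0)},
    {"store": "Redmond", "total": 800, "closed_at": None},
]
print(format_rows(sample, max_rows=2))
```

The truncation notice matters for the assistant: it signals that the visible rows are a subset, so an aggregate should be computed in SQL instead of from the displayed rows.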
    
    

Key Database Layer Features

  • Connection Pooling: Efficient resource management with asyncpg
  • RLS Integration: Automatic Row Level Security context setting
  • Schema Introspection: Dynamic table schema discovery
  • Error Handling: Comprehensive error management and logging
  • Query Formatting: AI-friendly result formatting
  • Health Monitoring: Database connectivity and pool status checks
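
Because pooled connections are reused across requests, the RLS context has to be set on the same connection immediately before each query. The sketch below illustrates only that ordering. `FakeConnection` is a hypothetical stand-in that records calls instead of talking to PostgreSQL, and the user IDs are placeholders:

```python
import asyncio
from typing import Any, List, Optional, Tuple


class FakeConnection:
    """Hypothetical stand-in for asyncpg.Connection that only records calls."""

    def __init__(self) -> None:
        self.rls_user_id: Optional[str] = None
        self.log: List[Tuple[str, Optional[str]]] = []

    async def execute(self, sql: str, *args: Any) -> None:
        # Emulate set_config('app.current_rls_user_id', $1, false)
        if "set_config" in sql:
            self.rls_user_id = args[0]

    async def fetch(self, sql: str) -> List[dict]:
        # Record which RLS identity was active when the query ran
        self.log.append((sql, self.rls_user_id))
        return []


async def run_as(conn: FakeConnection, rls_user_id: str, sql: str) -> List[dict]:
    """Set the RLS context, then run the query on the same connection."""
    await conn.execute(
        "SELECT set_config('app.current_rls_user_id', $1, false)", rls_user_id
    )
    return await conn.fetch(sql)


async def main() -> FakeConnection:
    conn = FakeConnection()  # imagine this connection being reused from a pool
    await run_as(conn, "user-a", "SELECT * FROM retail.orders")
    await run_as(conn, "user-b", "SELECT * FROM retail.orders")
    return conn


conn = asyncio.run(main())
for sql, user in conn.log:
    print(user, "->", sql)
```

If the second call skipped the `set_config` step, the query would run with the previous caller's identity, which is why `execute_query` and `get_table_schema` both call `set_rls_context` first.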

🔧 Main MCP Server Implementation

FastMCP Server (sales_analysis.py)

Now let's implement the main MCP server:

    
    # mcp_server/sales_analysis.py
    """
    Main MCP server implementation for Zava Retail Sales Analysis.
    Provides AI assistants with secure access to retail database.
    """
    import logging
    import asyncio
    from typing import Dict, Any, List, Annotated
    from contextlib import asynccontextmanager

    from fastmcp import FastMCP, Context
    from pydantic import Field

    from .config import config
    from .sales_analysis_postgres import db_provider
    from .health_check import setup_health_endpoints

    # Configure logging (fall back to INFO if LOG_LEVEL is not a valid level name)
    logging.basicConfig(
        level=getattr(logging, config.server.log_level, logging.INFO),
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    logger = logging.getLogger(__name__)
    
    
    # Create FastMCP server instance
    mcp = FastMCP("Zava Retail Sales Analysis")

    # List of valid tables for schema access
    VALID_TABLES = [
        "retail.stores",
        "retail.customers",
        "retail.categories",
        "retail.product_types",
        "retail.products",
        "retail.orders",
        "retail.order_items",
        "retail.inventory"
    ]
    
    
    def get_rls_user_id(ctx: Context) -> str:
        """Extract Row Level Security User ID from request context."""
        # In HTTP mode, get from headers.
        # NOTE: this assumes the Context object exposes request headers as
        # `ctx.headers`. If your FastMCP version does not, the hasattr() guard
        # skips this branch and the development default below is used.
        if hasattr(ctx, 'headers') and ctx.headers:
            rls_user_id = ctx.headers.get("x-rls-user-id")
            if rls_user_id:
                logger.debug(f"RLS User ID from headers: {rls_user_id}")
                return rls_user_id

        # Default fallback for development/testing
        default_id = "00000000-0000-0000-0000-000000000000"
        logger.warning(f"No RLS User ID found, using default: {default_id}")
        return default_id
    
    
    @mcp.tool()
    async def get_multiple_table_schemas(
        ctx: Context,
        table_names: Annotated[List[str], Field(description="List of table names to retrieve schemas for. Valid tables: " + ", ".join(VALID_TABLES))]
    ) -> str:
        """
        Retrieve database schemas for multiple tables in a single request.

        This tool provides comprehensive schema information including:
        - Column names, types, and constraints
        - Foreign key relationships
        - Index information
        - Table structure for AI query planning

        Args:
            table_names: List of valid table names from the retail schema

        Returns:
            Formatted schema information for all requested tables
        """
        rls_user_id = get_rls_user_id(ctx)

        # Validate table names
        invalid_tables = [table for table in table_names if table not in VALID_TABLES]
        if invalid_tables:
            logger.warning(f"Invalid table names requested: {invalid_tables}")
            return f"Error: Invalid table names: {', '.join(invalid_tables)}. Valid tables are: {', '.join(VALID_TABLES)}"

        try:
            logger.info(f"Retrieving schemas for tables: {table_names} (User: {rls_user_id})")
            result = await db_provider.get_multiple_table_schemas(table_names, rls_user_id)
            return result
        except Exception as e:
            logger.error(f"Error retrieving table schemas: {e}")
            return f"Error retrieving table schemas: {e!s}"
    
    
    @mcp.tool()
    async def execute_sales_query(
        ctx: Context,
        postgresql_query: Annotated[str, Field(description="A well-formed PostgreSQL query to execute against the retail database. Always get table schemas first before writing queries.")]
    ) -> str:
        """
        Execute PostgreSQL queries against the retail sales database with Row Level Security.

        This tool allows AI assistants to run analytical queries on retail data including:
        - Sales performance analysis
        - Customer behavior insights
        - Inventory management queries
        - Product performance metrics
        - Store-specific reporting

        Important: Row Level Security ensures users only see data they're authorized to access.

        Args:
            postgresql_query: SQL query to execute (automatically filtered by RLS)

        Returns:
            Query results formatted for AI analysis (limited to 20 rows for readability)
        """
        rls_user_id = get_rls_user_id(ctx)

        try:
            logger.info(f"Executing query for user: {rls_user_id}")
            logger.debug(f"Query: {postgresql_query[:100]}...")

            result = await db_provider.execute_query(postgresql_query, rls_user_id)
            return result
        except Exception as e:
            logger.error(f"Error executing database query: {e}")
            return f"Error executing database query: {e!s}"
    
    
    @mcp.tool()
    async def get_current_utc_date(ctx: Context) -> str:
        """
        Get the current UTC date and time in ISO format.

        Useful for time-sensitive queries and date-based analysis.

        Returns:
            Current UTC date/time in ISO format (YYYY-MM-DDTHH:MM:SS.ffffffZ)
        """
        try:
            result = await db_provider.get_current_utc_date()
            logger.debug(f"Current UTC date retrieved: {result}")
            return result
        except Exception as e:
            logger.error(f"Error getting current UTC date: {e}")
            return f"Error getting current UTC date: {e!s}"
    
    
    # Application lifecycle management
    @asynccontextmanager
    async def lifespan(app):
        """Manage application startup and shutdown."""
        logger.info("Starting Zava Retail MCP Server...")

        try:
            # Initialize database connection pool
            await db_provider.create_pool()
            logger.info("Database connection pool initialized")

            # Test database connectivity
            health_status = await db_provider.health_check()
            if health_status["status"] != "healthy":
                logger.error(f"Database health check failed: {health_status}")
                raise RuntimeError("Database not healthy")

            logger.info("MCP Server startup complete")
            yield

        except Exception as e:
            # Also reached when an exception propagates out of the running app
            logger.error(f"Server error during startup or runtime: {e}")
            raise
        finally:
            # Cleanup
            logger.info("Shutting down MCP Server...")
            await db_provider.close_pool()
            logger.info("MCP Server shutdown complete")
    
    
    # Configure server application
    
    def create_app():
    
        """Create and configure the MCP server application."""
    
        
    
        # Get the FastMCP app instance
    
        app = mcp.sse_app()
    
        
    
        # Set up lifecycle management
    
        app.router.lifespan_context = lifespan
    
        
    
        # Add health check endpoints if enabled
    
        if config.server.enable_health_check:
    
            setup_health_endpoints(app, db_provider)
    
        
    
        # Configure CORS if enabled
    
        if config.server.enable_cors:
    
            from fastapi.middleware.cors import CORSMiddleware
    
            app.add_middleware(
    
                CORSMiddleware,
    
                allow_origins=["*"],  # Configure appropriately for production
    
                allow_credentials=True,
    
                allow_methods=["*"],
    
                allow_headers=["*"],
    
            )
    
        
    
        logger.info(f"MCP Server configured - CORS: {config.server.enable_cors}, Health: {config.server.enable_health_check}")
    
        
    
        return app
    
    
    
    # Create the application instance
    
    app = create_app()
    
    
    
    # Main entry point for development
    
    if __name__ == "__main__":
    
        import uvicorn
    
        
    
        logger.info(f"Starting development server on {config.server.host}:{config.server.port}")
    
        
    
        uvicorn.run(
    
            "sales_analysis:app",
    
            host=config.server.host,
    
            port=config.server.port,
    
            reload=True,
    
            log_level=config.server.log_level.lower()
    
        )
    
    

    Key MCP Server Features

  • Tool Registration: Declarative tool definitions with type safety
  • RLS Context Management: Automatic user identity extraction and context setting
  • Error Handling: Comprehensive error management with user-friendly messages
  • Lifecycle Management: Proper startup/shutdown with resource cleanup
  • Health Monitoring: Built-in health check endpoints
  • Development Support: Hot reload and debugging capabilities
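
    To make the lifecycle bullet concrete, here is a minimal, self-contained sketch of the startup/shutdown pattern. `FakePool` and `demo` are hypothetical stand-ins (not part of the lab code or of any library) so the example runs without a database; the real server manages the asyncpg pool instead.

```python
import asyncio
from contextlib import asynccontextmanager


class FakePool:
    """Hypothetical stand-in for a database pool; records open/close calls."""

    def __init__(self):
        self.events = []

    async def open(self):
        self.events.append("open")

    async def close(self):
        self.events.append("close")


@asynccontextmanager
async def lifespan(pool):
    """Open the pool on entry and close it on exit, even if serving fails."""
    await pool.open()
    try:
        yield pool
    finally:
        await pool.close()


async def demo(fail=False):
    """Run one startup/serve/shutdown cycle and return the recorded events."""
    pool = FakePool()
    try:
        async with lifespan(pool):
            pool.events.append("serving")
            if fail:
                raise RuntimeError("simulated crash while serving")
    except RuntimeError:
        pool.events.append("error seen by caller")
    return pool.events
```

    In this sketch the `try`/`finally` starts only after `open()` succeeds, so cleanup runs exactly once for every successful startup, including when the serving phase raises.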

    🏥 Health Monitoring

    Health Check Implementation (health_check.py)

    
    # mcp_server/health_check.py
    
    """
    
    Health check endpoints for monitoring MCP server status.
    
    """
    
    import logging
    
    from typing import Dict, Any
    
    from fastapi import FastAPI, HTTPException
    
    from fastapi.responses import JSONResponse
    
    
    
    logger = logging.getLogger(__name__)
    
    
    
    def setup_health_endpoints(app: FastAPI, db_provider) -> None:
    
        """Add health check endpoints to the FastAPI application."""
    
        
    
        @app.get("/health")
    
        async def health_check() -> JSONResponse:
    
            """Basic health check endpoint."""
    
            return JSONResponse(
    
                status_code=200,
    
                content={
    
                    "status": "healthy",
    
                    "service": "zava-retail-mcp-server",
    
                    "timestamp": await db_provider.get_current_utc_date()
    
                }
    
            )
    
        
    
        @app.get("/health/detailed")
    
        async def detailed_health_check() -> JSONResponse:
    
            """Detailed health check including database connectivity."""
    
            health_status = {
    
                "service": "zava-retail-mcp-server",
    
                "status": "healthy",
    
                "components": {}
    
            }
    
            
    
            overall_healthy = True
    
            
    
            # Check database
    
            try:
    
                db_health = await db_provider.health_check()
    
                health_status["components"]["database"] = db_health
    
                
    
                if db_health["status"] != "healthy":
    
                    overall_healthy = False
    
                    
    
            except Exception as e:
    
                health_status["components"]["database"] = {
    
                    "status": "unhealthy",
    
                    "error": str(e)
    
                }
    
                overall_healthy = False
    
            
    
            # Update overall status
    
            if not overall_healthy:
    
                health_status["status"] = "unhealthy"
    
            
    
            status_code = 200 if overall_healthy else 503
    
            
    
            return JSONResponse(
    
                status_code=status_code,
    
                content=health_status
    
            )
    
        
    
        @app.get("/health/ready")
    
        async def readiness_check() -> JSONResponse:
    
            """Kubernetes readiness probe endpoint."""
    
            try:
    
                # Test critical functionality
    
                db_health = await db_provider.health_check()
    
                
    
                if db_health["status"] != "healthy":
    
                    raise HTTPException(status_code=503, detail="Database not ready")
    
                
    
                return JSONResponse(
    
                    status_code=200,
    
                    content={"status": "ready"}
    
                )
    
                
    
            except HTTPException:
    
                # Propagate the specific 503 raised above instead of masking it below
    
                raise
    
            except Exception as e:
    
                logger.error(f"Readiness check failed: {e}")
    
                raise HTTPException(status_code=503, detail="Service not ready")
    
        
    
        @app.get("/health/live")
    
        async def liveness_check() -> JSONResponse:
    
            """Kubernetes liveness probe endpoint."""
    
            return JSONResponse(
    
                status_code=200,
    
                content={"status": "alive"}
    
            )
    
        
    
        logger.info("Health check endpoints configured")
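
    The aggregation logic of the detailed endpoint can be exercised without FastAPI. The sketch below is a framework-free approximation: `aggregate_health`, `healthy_db`, and `broken_db` are hypothetical names introduced for illustration. It folds named component checks into one response body plus an HTTP status code (200 or 503), following the same rule the endpoint above applies to the database.

```python
import asyncio


async def aggregate_health(checks):
    """Run named async checks; return (status_code, body) like /health/detailed."""
    body = {"service": "zava-retail-mcp-server", "status": "healthy", "components": {}}
    for name, check in checks.items():
        try:
            result = await check()
        except Exception as exc:  # a crashing check counts as unhealthy
            result = {"status": "unhealthy", "error": str(exc)}
        body["components"][name] = result
        if result.get("status") != "healthy":
            body["status"] = "unhealthy"
    return (200 if body["status"] == "healthy" else 503), body


async def healthy_db():
    """Stub check that always succeeds."""
    return {"status": "healthy"}


async def broken_db():
    """Stub check that simulates an unreachable database."""
    raise ConnectionError("connection refused")
```

    Keeping this logic in a plain function makes it straightforward to add further components (for example an embeddings service) without touching the route handlers.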
    
    

    🧪 Testing Your MCP Server

    Local Testing

    1. Start the MCP Server:

    ```bash
    # Activate virtual environment
    source mcp-env/bin/activate   # macOS/Linux
    # mcp-env\Scripts\activate    # Windows

    # Start server
    cd mcp_server
    python sales_analysis.py
    ```

    2. Test Health Endpoints:

    ```bash
    # Basic health check
    curl http://localhost:8000/health

    # Detailed health check
    curl http://localhost:8000/health/detailed
    ```

    3. Test MCP Tools:

    ```bash
    # List available tools
    curl -X POST http://localhost:8000/mcp \
      -H "Content-Type: application/json" \
      -H "Accept: application/json, text/event-stream" \
      -H "x-rls-user-id: 00000000-0000-0000-0000-000000000000" \
      -d '{"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}'

    # Get table schemas
    curl -X POST http://localhost:8000/mcp \
      -H "Content-Type: application/json" \
      -H "Accept: application/json, text/event-stream" \
      -H "x-rls-user-id: 00000000-0000-0000-0000-000000000000" \
      -d '{
        "jsonrpc": "2.0",
        "id": 2,
        "method": "tools/call",
        "params": {
          "name": "get_multiple_table_schemas",
          "arguments": {
            "table_names": ["retail.stores", "retail.products"]
          }
        }
      }'
    ```
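
    The same requests can be assembled programmatically. This is a sketch using only the standard library; `build_request` is a hypothetical helper, and the URL and `x-rls-user-id` value simply mirror the curl commands above. It constructs the requests without sending them, so it runs without a server. MCP messages are JSON-RPC 2.0, which is why each body carries `jsonrpc` and `id` fields; depending on the transport configuration, a server may also expect an `initialize` exchange before it accepts tool calls.

```python
import json
import urllib.request

MCP_URL = "http://localhost:8000/mcp"  # same endpoint as the curl examples
RLS_USER_ID = "00000000-0000-0000-0000-000000000000"


def build_request(method, params=None, request_id=1):
    """Build (but do not send) a JSON-RPC 2.0 POST request for the MCP endpoint."""
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": method,
        "params": params or {},
    }
    return urllib.request.Request(
        MCP_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            "x-rls-user-id": RLS_USER_ID,
        },
        method="POST",
    )


list_tools = build_request("tools/list")
call_tool = build_request(
    "tools/call",
    {
        "name": "get_multiple_table_schemas",
        "arguments": {"table_names": ["retail.stores", "retail.products"]},
    },
    request_id=2,
)
# To send a request, pass it to urllib.request.urlopen() while the server is running.
```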

    VS Code Integration Testing

    1. Configure VS Code MCP:

    ```json
    // .vscode/mcp.json
    {
      "servers": {
        "zava-retail-test": {
          "url": "http://127.0.0.1:8000/mcp",
          "type": "http",
          "headers": {"x-rls-user-id": "00000000-0000-0000-0000-000000000000"}
        }
      }
    }
    ```

    2. Test in AI Chat:

    - Open VS Code AI Chat

    - Type #zava and select your server

    - Ask: "What tables are available?"

    - Ask: "Show me the top 5 stores by number of orders"

    Unit Testing

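    Create tests for the database layer. They rely on the pytest-asyncio plugin (for @pytest.mark.asyncio) and need a reachable PostgreSQL instance with the retail schema loaded: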

    
    # tests/test_mcp_server.py
    
    import pytest
    
    import asyncio
    
    from mcp_server.sales_analysis_postgres import PostgreSQLSchemaProvider
    
    from mcp_server.config import config
    
    
    
    @pytest.mark.asyncio
    
    async def test_database_connection():
    
        """Test database connectivity."""
    
        db = PostgreSQLSchemaProvider()
    
        
    
        try:
    
            await db.create_pool()
    
            health = await db.health_check()
    
            assert health["status"] == "healthy"
    
        finally:
    
            await db.close_pool()
    
    
    
    @pytest.mark.asyncio
    
    async def test_table_schema_retrieval():
    
        """Test table schema retrieval."""
    
        db = PostgreSQLSchemaProvider()
    
        
    
        try:
    
            await db.create_pool()
    
            schema = await db.get_table_schema("retail.stores", "00000000-0000-0000-0000-000000000000")
    
            
    
            assert schema["table_name"] == "retail.stores"
    
            assert len(schema["columns"]) > 0
    
            
    
        finally:
    
            await db.close_pool()
    
    
    
    @pytest.mark.asyncio
    
    async def test_query_execution():
    
        """Test query execution with RLS."""
    
        db = PostgreSQLSchemaProvider()
    
        
    
        try:
    
            await db.create_pool()
    
            result = await db.execute_query(
    
                "SELECT COUNT(*) as store_count FROM retail.stores",
    
                "00000000-0000-0000-0000-000000000000"
    
            )
    
            
    
            assert "store_count" in result
    
            
    
        finally:
    
            await db.close_pool()
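
    The tests above need a live database. For fast checks of the error-handling path, a stub can stand in for the provider. The following sketch rests on assumptions: `StubProvider` and `run_query_tool` are hypothetical names that imitate the pattern of the MCP tools (call the provider, then return either the result or an error string); they are not the lab's actual functions.

```python
import asyncio


class StubProvider:
    """Hypothetical in-memory replacement for the database provider."""

    def __init__(self, fail=False):
        self.fail = fail
        self.calls = []

    async def execute_query(self, sql_query, rls_user_id):
        # Record the call so tests can verify what was passed through.
        self.calls.append((sql_query, rls_user_id))
        if self.fail:
            raise RuntimeError("simulated database failure")
        return "store_count\n-----------\n8"


async def run_query_tool(provider, sql_query, rls_user_id):
    """Mirror the tools' pattern: return the result, or an error message as text."""
    try:
        return await provider.execute_query(sql_query, rls_user_id)
    except Exception as e:
        return f"Error executing database query: {e!s}"


def test_query_tool_success():
    provider = StubProvider()
    result = asyncio.run(run_query_tool(provider, "SELECT 1", "user-a"))
    assert "store_count" in result
    assert provider.calls == [("SELECT 1", "user-a")]


def test_query_tool_reports_errors():
    result = asyncio.run(run_query_tool(StubProvider(fail=True), "SELECT 1", "user-a"))
    assert result == "Error executing database query: simulated database failure"
```

    These tests use `asyncio.run` directly, so they run under plain pytest without an async plugin.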
    
    

    🎯 Key Takeaways

    After completing this lab, you should have:

    ✅ Working MCP Server: FastMCP server with database integration

    ✅ Configuration Management: Robust environment-based configuration

    ✅ Database Layer: PostgreSQL integration with connection pooling

    ✅ MCP Tools: Schema introspection and query execution tools

    ✅ RLS Integration: Row Level Security context management

    ✅ Health Monitoring: Comprehensive health check endpoints

    ✅ Testing Strategy: Local testing and VS Code integration

    🚀 What's Next

    Continue with Lab 06: Tool Development to:

  • Expand your MCP tool collection
  • Implement advanced query patterns
  • Add data validation and transformation
  • Create specialized analytics tools

    📚 Additional Resources

    FastMCP Framework

  • FastMCP Documentation - Official FastMCP guide
  • MCP Specification - Protocol specification
  • Tool Development Guide - Creating MCP tools

    Database Integration

  • asyncpg Documentation - PostgreSQL async driver
  • Connection Pooling Best Practices - PostgreSQL tuning
  • Row Level Security Guide - RLS implementation

    FastAPI Patterns

  • FastAPI Documentation - Web framework reference
  • Dependency Injection - FastAPI patterns
  • Background Tasks - Async task management

    ---

    Next: Ready to expand your tools? Continue with Lab 06: Tool Development

    MCP Server Implementation

    🎯 What This Lab Covers

    This hands-on lab guides you through implementing a production-ready MCP server using the FastMCP framework. You'll build the core server structure, implement database integration, create tools for data access, and establish the foundation for AI-powered retail analytics.

    Overview

    The MCP server is the heart of the retail analytics solution. It acts as a bridge between AI assistants and the PostgreSQL database, providing secure, intelligent access to business data through a standardized protocol.

    This lab teaches you to build a robust, scalable MCP server following enterprise patterns and best practices.

    Learning Objectives

    By the end of this lab, you will be able to:

  • Build a FastMCP server with proper architecture and organization
  • Implement database integration with connection pooling and error handling
  • Create MCP tools for database schema introspection and query execution
  • Configure Row Level Security (RLS) context management
  • Add health monitoring and observability features
  • Test your MCP server implementation locally and with VS Code

    📁 Project Structure

    Let's examine the MCP server organization:

    mcp_server/
    ├── __init__.py                 # Package initialization
    ├── config.py                   # Configuration management
    ├── health_check.py             # Health monitoring endpoints
    ├── sales_analysis.py           # Main MCP server implementation
    ├── sales_analysis_postgres.py  # Database integration layer
    └── sales_analysis_text_embeddings.py  # AI/semantic search integration

    🔧 Configuration Management

    Environment Configuration (config.py)

    First, let's create a robust configuration system:

    
    # mcp_server/config.py
    
    """
    
    Configuration management for the MCP server.
    
    Handles environment variables, validation, and defaults.
    
    """
    
    import os
    
    import logging
    
    from typing import Optional, Dict, Any
    
    from dataclasses import dataclass
    
    from dotenv import load_dotenv
    
    
    
    # Load environment variables from .env file
    
    load_dotenv()
    
    
    
    logger = logging.getLogger(__name__)
    
    
    
    @dataclass
    
    class DatabaseConfig:
    
        """Database connection configuration."""
    
        host: str
    
        port: int
    
        database: str
    
        user: str
    
        password: str
    
        min_connections: int = 2
    
        max_connections: int = 10
    
        command_timeout: int = 30
    
        
    
        @classmethod
    
        def from_env(cls) -> 'DatabaseConfig':
    
            """Create configuration from environment variables."""
    
            return cls(
    
                host=os.getenv('POSTGRES_HOST', 'localhost'),
    
                port=int(os.getenv('POSTGRES_PORT', '5432')),
    
                database=os.getenv('POSTGRES_DB', 'zava'),
    
                user=os.getenv('POSTGRES_USER', 'postgres'),
    
                password=os.getenv('POSTGRES_PASSWORD', ''),
    
                min_connections=int(os.getenv('POSTGRES_MIN_CONNECTIONS', '2')),
    
                max_connections=int(os.getenv('POSTGRES_MAX_CONNECTIONS', '10')),
    
                command_timeout=int(os.getenv('POSTGRES_COMMAND_TIMEOUT', '30'))
    
            )
    
        
    
        def to_asyncpg_params(self) -> Dict[str, Any]:
    
            """Convert to asyncpg connection parameters."""
    
            return {
    
                'host': self.host,
    
                'port': self.port,
    
                'database': self.database,
    
                'user': self.user,
    
                'password': self.password,
    
                'command_timeout': self.command_timeout,
    
                'server_settings': {
    
                    'application_name': 'zava-mcp-server',
    
                    'jit': 'off',  # Disable JIT for stability
    
                    'work_mem': '4MB',
    
                    'statement_timeout': f'{self.command_timeout}s'
    
                }
    
            }
    
    
    
    @dataclass
    
    class AzureConfig:
    
        """Azure AI services configuration."""
    
        project_endpoint: str
    
        openai_endpoint: str
    
        embedding_model_deployment: str
    
        client_id: str
    
        client_secret: str
    
        tenant_id: str
    
        
    
        @classmethod
    
        def from_env(cls) -> 'AzureConfig':
    
            """Create configuration from environment variables."""
    
            return cls(
    
                project_endpoint=os.getenv('PROJECT_ENDPOINT', ''),
    
                openai_endpoint=os.getenv('AZURE_OPENAI_ENDPOINT', ''),
    
                embedding_model_deployment=os.getenv('EMBEDDING_MODEL_DEPLOYMENT_NAME', 'text-embedding-3-small'),
    
                client_id=os.getenv('AZURE_CLIENT_ID', ''),
    
                client_secret=os.getenv('AZURE_CLIENT_SECRET', ''),
    
                tenant_id=os.getenv('AZURE_TENANT_ID', '')
    
            )
    
        
    
        def is_configured(self) -> bool:
    
            """Check if all required Azure configuration is present."""
    
            return all([
    
                self.project_endpoint,
    
                self.openai_endpoint,
    
                self.client_id,
    
                self.client_secret,
    
                self.tenant_id
    
            ])
    
    
    
    @dataclass
    
    class ServerConfig:
    
        """MCP server configuration."""
    
        host: str = '0.0.0.0'
    
        port: int = 8000
    
        log_level: str = 'INFO'
    
        enable_cors: bool = True
    
        enable_health_check: bool = True
    
        applicationinsights_connection_string: Optional[str] = None
    
        
    
        @classmethod
    
        def from_env(cls) -> 'ServerConfig':
    
            """Create configuration from environment variables."""
    
            return cls(
    
                host=os.getenv('MCP_SERVER_HOST', '0.0.0.0'),
    
                port=int(os.getenv('MCP_SERVER_PORT', '8000')),
    
                log_level=os.getenv('LOG_LEVEL', 'INFO').upper(),
    
                enable_cors=os.getenv('ENABLE_CORS', 'true').lower() == 'true',
    
                enable_health_check=os.getenv('ENABLE_HEALTH_CHECK', 'true').lower() == 'true',
    
                applicationinsights_connection_string=os.getenv('APPLICATIONINSIGHTS_CONNECTION_STRING')
    
            )
    
    
    
    class MCPServerConfig:
    
        """Main configuration class for the MCP server."""
    
        
    
        def __init__(self):
    
            self.database = DatabaseConfig.from_env()
    
            self.azure = AzureConfig.from_env()
    
            self.server = ServerConfig.from_env()
    
            
    
            # Validate configuration
    
            self._validate_config()
    
        
    
        def _validate_config(self):
    
            """Validate configuration and log warnings for missing values."""
    
            if not self.database.password:
    
                logger.warning("Database password is empty. This may cause connection issues.")
    
            
    
            if not self.azure.is_configured():
    
                logger.warning("Azure configuration is incomplete. AI features may not work.")
    
            
    
            logger.info(f"Configuration loaded - Database: {self.database.host}:{self.database.port}")
    
            logger.info(f"Server will run on {self.server.host}:{self.server.port}")
    
    
    
    # Global configuration instance
    
    config = MCPServerConfig()
    
    

    Key Configuration Features

  • Environment Variable Loading: Automatic .env file support
  • Type Safety: Dataclasses with type hints
  • Flexible Defaults: Sensible defaults for development
  • Validation: Configuration checks that log warnings for missing values
  • Security: Sensitive values loaded only from environment variables
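
    As an illustration of environment-driven defaults, the sketch below loads a small settings object from a mapping. `DemoServerConfig` and `parse_bool` are hypothetical names, and accepting values such as `1` or `yes` for booleans, as well as the port range check, are assumptions that go beyond the `== 'true'` comparison used in config.py above.

```python
import os
from dataclasses import dataclass

TRUE_VALUES = {"1", "true", "yes", "on"}


def parse_bool(value, default):
    """Interpret common truthy strings; fall back to default when unset."""
    if value is None:
        return default
    return value.strip().lower() in TRUE_VALUES


@dataclass(frozen=True)
class DemoServerConfig:
    """Hypothetical, trimmed-down counterpart of ServerConfig."""

    host: str = "0.0.0.0"
    port: int = 8000
    enable_cors: bool = True

    @classmethod
    def from_env(cls, env=None):
        """Build the config from a mapping (defaults to os.environ)."""
        env = os.environ if env is None else env
        port = int(env.get("MCP_SERVER_PORT", "8000"))
        if not 1 <= port <= 65535:
            raise ValueError(f"MCP_SERVER_PORT out of range: {port}")
        return cls(
            host=env.get("MCP_SERVER_HOST", "0.0.0.0"),
            port=port,
            enable_cors=parse_bool(env.get("ENABLE_CORS"), True),
        )
```

    Passing the mapping explicitly keeps the loader testable without mutating the process environment.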

    🗄️ Database Integration Layer

    PostgreSQL Provider (sales_analysis_postgres.py)

    Let's implement the database integration layer:

    
    # mcp_server/sales_analysis_postgres.py
    
    """
    
    PostgreSQL database integration for MCP server.
    
    Handles connections, queries, and schema introspection.
    
    """
    
    import asyncio
    
    import asyncpg
    
    import logging
    
    from typing import Dict, Any, List, Optional, Tuple
    
    from contextlib import asynccontextmanager
    
    from datetime import datetime
    
    import json
    
    
    
    from .config import config
    
    
    
    logger = logging.getLogger(__name__)
    
    
    
    class PostgreSQLSchemaProvider:
    
        """Provides PostgreSQL database access and schema information."""
    
        
    
        def __init__(self):
    
            self.connection_pool: Optional[asyncpg.Pool] = None
    
            self.postgres_config = config.database.to_asyncpg_params()
    
            
    
        async def create_pool(self) -> None:
    
            """Create connection pool for database operations."""
    
            if self.connection_pool is None:
    
                try:
    
                    self.connection_pool = await asyncpg.create_pool(
    
                        **self.postgres_config,
    
                        min_size=config.database.min_connections,
    
                        max_size=config.database.max_connections,
    
                        max_inactive_connection_lifetime=300  # 5 minutes
    
                    )
    
                    logger.info("Database connection pool created successfully")
    
                except Exception as e:
    
                    logger.error(f"Failed to create database connection pool: {e}")
    
                    raise
    
        
    
        async def close_pool(self) -> None:
    
            """Close the connection pool."""
    
            if self.connection_pool:
    
                await self.connection_pool.close()
    
                self.connection_pool = None
    
                logger.info("Database connection pool closed")
    
        
    
        @asynccontextmanager
    
        async def get_connection(self):
    
            """Get a database connection from the pool."""
    
            if not self.connection_pool:
    
                await self.create_pool()
    
            
    
            async with self.connection_pool.acquire() as connection:
    
                yield connection
    
        
    
        async def set_rls_context(self, connection: asyncpg.Connection, rls_user_id: str) -> None:
    
            """Set Row Level Security context for the connection."""
    
            try:
    
                await connection.execute(
    
                    "SELECT set_config('app.current_rls_user_id', $1, false)",
    
                    rls_user_id
    
                )
    
                logger.debug(f"RLS context set for user: {rls_user_id}")
    
            except Exception as e:
    
                logger.error(f"Failed to set RLS context: {e}")
    
                raise
    
        
    
        async def get_table_schema(self, table_name: str, rls_user_id: str) -> Dict[str, Any]:
    
            """Get detailed schema information for a specific table."""
    
            async with self.get_connection() as conn:
    
                await self.set_rls_context(conn, rls_user_id)
    
                
    
                # Parse schema and table name
    
                if '.' in table_name:
    
                    schema_name, table_name = table_name.split('.', 1)
    
                else:
    
                    schema_name = 'retail'  # Default schema
    
                
    
                # Get column information
    
                columns_query = """
    
                    SELECT 
    
                        column_name,
    
                        data_type,
    
                        is_nullable,
    
                        column_default,
    
                        character_maximum_length,
    
                        numeric_precision,
    
                        numeric_scale,
    
                        ordinal_position
    
                    FROM information_schema.columns 
    
                    WHERE table_schema = $1 AND table_name = $2
    
                    ORDER BY ordinal_position
    
                """
    
                
    
                columns = await conn.fetch(columns_query, schema_name, table_name)
    
                
    
                if not columns:
    
                    raise ValueError(f"Table {schema_name}.{table_name} not found or not accessible")
    
                
    
                # Get foreign key relationships
    
                fk_query = """
    
                    SELECT 
    
                        kcu.column_name,
    
                        ccu.table_schema AS foreign_table_schema,
    
                        ccu.table_name AS foreign_table_name,
    
                        ccu.column_name AS foreign_column_name
    
                    FROM information_schema.table_constraints tc
    
                    JOIN information_schema.key_column_usage kcu 
    
                        ON tc.constraint_name = kcu.constraint_name
    
                        AND tc.constraint_schema = kcu.constraint_schema
    
                    JOIN information_schema.constraint_column_usage ccu 
    
                        ON ccu.constraint_name = tc.constraint_name
    
                        AND ccu.constraint_schema = tc.constraint_schema
    
                    WHERE tc.constraint_type = 'FOREIGN KEY' 
    
                        AND tc.table_schema = $1 
    
                        AND tc.table_name = $2
    
                """
    
                
    
                foreign_keys = await conn.fetch(fk_query, schema_name, table_name)
    
                
    
                # Get indexes
    
                index_query = """
    
                    SELECT 
    
                        indexname,
    
                        indexdef
    
                    FROM pg_indexes 
    
                    WHERE schemaname = $1 AND tablename = $2
    
                """
    
                
    
                indexes = await conn.fetch(index_query, schema_name, table_name)
    
                
    
                # Format schema information
    
                schema_info = {
    
                    "table_name": f"{schema_name}.{table_name}",
    
                    "columns": [
    
                        {
    
                            "name": col["column_name"],
    
                            "type": col["data_type"],
    
                            "nullable": col["is_nullable"] == "YES",
    
                            "default": col["column_default"],
    
                            "max_length": col["character_maximum_length"],
    
                            "precision": col["numeric_precision"],
    
                            "scale": col["numeric_scale"],
    
                            "position": col["ordinal_position"]
    
                        }
    
                        for col in columns
    
                    ],
    
                    "foreign_keys": [
    
                        {
    
                            "column": fk["column_name"],
    
                            "references": f"{fk['foreign_table_schema']}.{fk['foreign_table_name']}.{fk['foreign_column_name']}"
    
                        }
    
                        for fk in foreign_keys
    
                    ],
    
                    "indexes": [
    
                        {
    
                            "name": idx["indexname"],
    
                            "definition": idx["indexdef"]
    
                        }
    
                        for idx in indexes
    
                    ]
    
                }
    
                
    
                return schema_info
    
        
    
        async def get_multiple_table_schemas(
    
            self, 
    
            table_names: List[str], 
    
            rls_user_id: str
    
        ) -> str:
    
            """Get schema information for multiple tables."""
    
            schemas = []
    
            
    
            for table_name in table_names:
    
                try:
    
                    schema = await self.get_table_schema(table_name, rls_user_id)
    
                    schemas.append(self._format_schema_for_ai(schema))
    
                except Exception as e:
    
                    logger.warning(f"Failed to get schema for {table_name}: {e}")
    
                    schemas.append(f"Error retrieving schema for {table_name}: {str(e)}")
    
            
    
            return "\n\n".join(schemas)
    
        
    
        def _format_schema_for_ai(self, schema: Dict[str, Any]) -> str:
    
            """Format schema information for AI consumption."""
    
            table_name = schema["table_name"]
    
            columns = schema["columns"]
    
            foreign_keys = schema["foreign_keys"]
    
            
    
            # Create column definitions
    
            column_lines = []
    
            for col in columns:
    
                nullable = "NULL" if col["nullable"] else "NOT NULL"
    
                type_info = col["type"]
    
                
    
                if col["max_length"]:
    
                    type_info += f"({col['max_length']})"
    
                elif col["precision"] and col["scale"]:
    
                    type_info += f"({col['precision']},{col['scale']})"
    
                
    
                default_info = f" DEFAULT {col['default']}" if col["default"] else ""
    
                
    
                column_lines.append(f"  {col['name']} {type_info} {nullable}{default_info}")
    
            
    
            # Create foreign key information
    
            fk_lines = []
    
            for fk in foreign_keys:
    
                fk_lines.append(f"  {fk['column']} -> {fk['references']}")
    
            
    
            # Combine into readable format
    
            schema_text = f"Table: {table_name}\n"
    
            schema_text += "Columns:\n" + "\n".join(column_lines)
    
            
    
            if fk_lines:
    
                schema_text += "\n\nForeign Keys:\n" + "\n".join(fk_lines)
    
            
    
            return schema_text
    
        
    
        async def execute_query(
    
            self, 
    
            sql_query: str, 
    
            rls_user_id: str,
    
            max_rows: int = 20
    
        ) -> str:
    
            """Execute a SQL query with Row Level Security context."""
    
            async with self.get_connection() as conn:
    
                await self.set_rls_context(conn, rls_user_id)
    
                
    
                try:
    
                    # Set a query timeout
    
                    rows = await asyncio.wait_for(
    
                        conn.fetch(sql_query),
    
                        timeout=config.database.command_timeout
    
                    )
    
                    
    
                    if not rows:
    
                        return "Query executed successfully. No rows returned."
    
                    
    
                    # Limit result set size
    
                    limited_rows = rows[:max_rows]
    
                    
    
                    # Format results
    
                    result = self._format_query_results(limited_rows, len(rows), max_rows)
    
                    
    
                    logger.info(f"Query executed successfully. Returned {len(limited_rows)} rows.")
    
                    return result
    
                    
    
                except asyncio.TimeoutError:
    
                    error_msg = f"Query timeout after {config.database.command_timeout} seconds"
    
                    logger.error(error_msg)
    
                    raise Exception(error_msg)
    
                except Exception as e:
    
                    logger.error(f"Query execution failed: {e}")
    
                    raise
    
        
    
        def _format_query_results(
    
            self, 
    
            rows: List[asyncpg.Record], 
    
            total_rows: int,
    
            max_rows: int
    
        ) -> str:
    
            """Format query results for AI consumption."""
    
            if not rows:
    
                return "No results found."
    
            
    
            # Get column names
    
            columns = list(rows[0].keys())
    
            
    
            # Create header
    
            result_lines = [f"Results ({len(rows)} of {total_rows} rows):"]
    
            result_lines.append("=" * 50)
    
            
    
            # Add column headers
    
            header = " | ".join(columns)
    
            result_lines.append(header)
    
            result_lines.append("-" * len(header))
    
            
    
            # Add data rows
    
            for row in rows:
    
                formatted_values = []
    
                for col in columns:
    
                    value = row[col]
    
                    if value is None:
    
                        formatted_values.append("NULL")
    
                    elif isinstance(value, datetime):
    
                        formatted_values.append(value.strftime("%Y-%m-%d %H:%M:%S"))
    
                    elif isinstance(value, (dict, list)):
    
                        formatted_values.append(json.dumps(value))
    
                    else:
    
                        formatted_values.append(str(value))
    
                
    
                result_lines.append(" | ".join(formatted_values))
    
            
    
            # Add truncation notice if needed
    
            if total_rows > max_rows:
    
                result_lines.append(f"\n... and {total_rows - max_rows} more rows (truncated for display)")
    
            
    
            return "\n".join(result_lines)
    
        
    
        async def get_current_utc_date(self) -> str:
    
            """Get current UTC date/time."""
    
            async with self.get_connection() as conn:
    
                result = await conn.fetchval("SELECT NOW() AT TIME ZONE 'UTC'")
    
                return result.isoformat() + "Z"
    
        
    
        async def health_check(self) -> Dict[str, Any]:
            """Perform database health check."""
            try:
                async with self.get_connection() as conn:
                    # Simple connectivity test
                    result = await conn.fetchval("SELECT 1")

                    # Read pool status through asyncpg's public accessors
                    # rather than the private _minsize/_maxsize attributes
                    pool = self.connection_pool
                    pool_info = {
                        "min_size": pool.get_min_size() if pool else 0,
                        "max_size": pool.get_max_size() if pool else 0,
                        "current_size": pool.get_size() if pool else 0,
                        "idle_size": pool.get_idle_size() if pool else 0
                    }

                    return {
                        "status": "healthy",
                        "database_responsive": result == 1,
                        "pool_info": pool_info
                    }

            except Exception as e:
                # Any failure (no pool, refused connection, timeout) is reported, not raised
                return {
                    "status": "unhealthy",
                    "error": str(e)
                }
    
    
    
    # Global database provider instance
    
    db_provider = PostgreSQLSchemaProvider()
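
The timeout handling in `execute_query` relies on `asyncio.wait_for`. The following is a minimal, database-free sketch of that pattern. `slow_query` and `run_with_timeout` are hypothetical stand-ins and are not part of the lab's code.

```python
import asyncio


async def slow_query(delay: float) -> str:
    """Hypothetical stand-in for conn.fetch(); sleeps instead of querying."""
    await asyncio.sleep(delay)
    return "rows"


async def run_with_timeout(delay: float, timeout: float) -> str:
    """Await the fake query and turn a timeout into a readable message."""
    try:
        return await asyncio.wait_for(slow_query(delay), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the inner task before raising the timeout
        return f"Query timeout after {timeout} seconds"


if __name__ == "__main__":
    print(asyncio.run(run_with_timeout(delay=0.01, timeout=1.0)))
    print(asyncio.run(run_with_timeout(delay=1.0, timeout=0.05)))
```

Returning a message keeps the sketch short; the provider above raises instead and lets the tool layer decide how to report the failure.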
    
    

    Key Database Layer Features

  • Connection Pooling: Efficient resource management with asyncpg
  • RLS Integration: Automatic Row Level Security context setting
  • Schema Introspection: Dynamic table schema discovery
  • Error Handling: Comprehensive error management and logging
  • Query Formatting: AI-friendly result formatting
  • Health Monitoring: Database connection and pool status checks
    🔧 Main MCP Server Implementation

    FastMCP Server (sales_analysis.py)

    Now let's implement the main MCP server:

    
    # mcp_server/sales_analysis.py
    
    """
    
    Main MCP server implementation for Zava Retail Sales Analysis.
    
    Provides AI assistants with secure access to retail database.
    
    """
    
    import logging
    
    import asyncio
    
    from typing import Dict, Any, List, Annotated
    
    from contextlib import asynccontextmanager
    
    
    
    from fastmcp import FastMCP, Context
    
    from pydantic import Field
    
    
    
    from .config import config
    
    from .sales_analysis_postgres import db_provider
    
    from .health_check import setup_health_endpoints
    
    
    
    # Configure logging
    
    logging.basicConfig(
    
        level=getattr(logging, config.server.log_level),
    
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    
    )
    
    logger = logging.getLogger(__name__)
    
    
    
    # Create FastMCP server instance
    
    mcp = FastMCP("Zava Retail Sales Analysis")
    
    
    
    # List of valid tables for schema access
    
    VALID_TABLES = [
    
        "retail.stores",
    
        "retail.customers", 
    
        "retail.categories",
    
        "retail.product_types",
    
        "retail.products",
    
        "retail.orders",
    
        "retail.order_items",
    
        "retail.inventory"
    
    ]
    
    
    
    def get_rls_user_id(ctx: Context) -> str:
    
        """Extract Row Level Security User ID from request context."""
    
        # In HTTP mode, get from headers
    
        if hasattr(ctx, 'headers') and ctx.headers:
    
            rls_user_id = ctx.headers.get("x-rls-user-id")
    
            if rls_user_id:
    
                logger.debug(f"RLS User ID from headers: {rls_user_id}")
    
                return rls_user_id
    
        
    
        # Default fallback for development/testing
    
        default_id = "00000000-0000-0000-0000-000000000000"
    
        logger.warning(f"No RLS User ID found, using default: {default_id}")
    
        return default_id
    
    
    
    @mcp.tool()
    
    async def get_multiple_table_schemas(
    
        ctx: Context,
    
        table_names: Annotated[List[str], Field(description="List of table names to retrieve schemas for. Valid tables: " + ", ".join(VALID_TABLES))]
    
    ) -> str:
    
        """
    
        Retrieve database schemas for multiple tables in a single request.
    
        
    
        This tool provides comprehensive schema information including:
    
        - Column names, types, and constraints
    
        - Foreign key relationships
    
        - Index information
    
        - Table structure for AI query planning
    
        
    
        Args:
    
            table_names: List of valid table names from the retail schema
    
            
    
        Returns:
    
            Formatted schema information for all requested tables
    
        """
    
        rls_user_id = get_rls_user_id(ctx)
    
        
    
        # Validate table names
    
        invalid_tables = [table for table in table_names if table not in VALID_TABLES]
    
        if invalid_tables:
    
            logger.warning(f"Invalid table names requested: {invalid_tables}")
    
            return f"Error: Invalid table names: {', '.join(invalid_tables)}. Valid tables are: {', '.join(VALID_TABLES)}"
    
        
    
        try:
    
            logger.info(f"Retrieving schemas for tables: {table_names} (User: {rls_user_id})")
    
            result = await db_provider.get_multiple_table_schemas(table_names, rls_user_id)
    
            return result
    
        except Exception as e:
    
            logger.error(f"Error retrieving table schemas: {e}")
    
            return f"Error retrieving table schemas: {e!s}"
    
    
    
    @mcp.tool()
    
    async def execute_sales_query(
    
        ctx: Context,
    
        postgresql_query: Annotated[str, Field(description="A well-formed PostgreSQL query to execute against the retail database. Always get table schemas first before writing queries.")]
    
    ) -> str:
    
        """
    
        Execute PostgreSQL queries against the retail sales database with Row Level Security.
    
        
    
        This tool allows AI assistants to run analytical queries on retail data including:
    
        - Sales performance analysis
    
        - Customer behavior insights  
    
        - Inventory management queries
    
        - Product performance metrics
    
        - Store-specific reporting
    
        
    
        Important: Row Level Security ensures users only see data they're authorized to access.
    
        
    
        Args:
    
            postgresql_query: SQL query to execute (automatically filtered by RLS)
    
            
    
        Returns:
    
            Query results formatted for AI analysis (limited to 20 rows for readability)
    
        """
    
        rls_user_id = get_rls_user_id(ctx)
    
        
    
        try:
    
            logger.info(f"Executing query for user: {rls_user_id}")
    
            logger.debug(f"Query: {postgresql_query[:100]}...")
    
            
    
            result = await db_provider.execute_query(postgresql_query, rls_user_id)
    
            return result
    
        except Exception as e:
    
            logger.error(f"Error executing database query: {e}")
    
            return f"Error executing database query: {e!s}"
    
    
    
    @mcp.tool()
    
    async def get_current_utc_date(ctx: Context) -> str:
    
        """
    
        Get the current UTC date and time in ISO format.
    
        
    
        Useful for time-sensitive queries and date-based analysis.
    
        
    
        Returns:
    
            Current UTC date/time in ISO format (YYYY-MM-DDTHH:MM:SS.ffffffZ)
    
        """
    
        try:
    
            result = await db_provider.get_current_utc_date()
    
            logger.debug(f"Current UTC date retrieved: {result}")
    
            return result
    
        except Exception as e:
    
            logger.error(f"Error getting current UTC date: {e}")
    
            return f"Error getting current UTC date: {e!s}"
    
    
    
    # Application lifecycle management
    
    @asynccontextmanager
    
    async def lifespan(app):
    
        """Manage application startup and shutdown."""
    
        logger.info("Starting Zava Retail MCP Server...")
    
        
    
        try:
    
            # Initialize database connection pool
    
            await db_provider.create_pool()
    
            logger.info("Database connection pool initialized")
    
            
    
            # Test database connectivity
    
            health_status = await db_provider.health_check()
    
            if health_status["status"] != "healthy":
    
                logger.error(f"Database health check failed: {health_status}")
    
                raise Exception("Database not healthy")
    
            
    
            logger.info("MCP Server startup complete")
    
            yield
    
            
    
        except Exception as e:
    
            logger.error(f"Startup failed: {e}")
    
            raise
    
        finally:
    
            # Cleanup
    
            logger.info("Shutting down MCP Server...")
    
            await db_provider.close_pool()
    
            logger.info("MCP Server shutdown complete")
    
    
    
    # Configure server application
    
    def create_app():
    
        """Create and configure the MCP server application."""
    
        
    
        # Get the FastMCP app instance
    
        app = mcp.sse_app()
    
        
    
        # Set up lifecycle management
    
        app.router.lifespan_context = lifespan
    
        
    
        # Add health check endpoints if enabled
    
        if config.server.enable_health_check:
    
            setup_health_endpoints(app, db_provider)
    
        
    
        # Configure CORS if enabled
    
        if config.server.enable_cors:
    
            from fastapi.middleware.cors import CORSMiddleware
    
            app.add_middleware(
    
                CORSMiddleware,
    
                allow_origins=["*"],  # Configure appropriately for production
    
                allow_credentials=True,
    
                allow_methods=["*"],
    
                allow_headers=["*"],
    
            )
    
        
    
        logger.info(f"MCP Server configured - CORS: {config.server.enable_cors}, Health: {config.server.enable_health_check}")
    
        
    
        return app
    
    
    
    # Create the application instance
    
    app = create_app()
    
    
    
    # Main entry point for development
    
    if __name__ == "__main__":
    
        import uvicorn
    
        
    
        logger.info(f"Starting development server on {config.server.host}:{config.server.port}")
    
        
    
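        uvicorn.run(

            # Package-qualified import string so the relative imports at the top of this
            # module resolve; assumes a launch from the project root, for example with
            # `python -m mcp_server.sales_analysis`

            "mcp_server.sales_analysis:app",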
    
            host=config.server.host,
    
            port=config.server.port,
    
            reload=True,
    
            log_level=config.server.log_level.lower()
    
        )
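
Each tool above repeats the same try/except block to turn an exception into a readable error string. One possible way to factor that out is a small decorator. This is a sketch only: `tool_errors` and `flaky_tool` are hypothetical names, and the decorator has not been verified against FastMCP's `@mcp.tool()` registration.

```python
import asyncio
import functools
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)


def tool_errors(action: str) -> Callable:
    """Wrap an async tool so failures come back as 'Error <action>: <reason>'."""

    def decorator(func: Callable[..., Awaitable[str]]) -> Callable[..., Awaitable[str]]:
        @functools.wraps(func)  # preserve the tool's name and docstring
        async def wrapper(*args, **kwargs) -> str:
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.error(f"Error {action}: {e}")
                return f"Error {action}: {e!s}"

        return wrapper

    return decorator


@tool_errors("executing database query")
async def flaky_tool(fail: bool) -> str:
    """Hypothetical tool body used only for this demonstration."""
    if fail:
        raise RuntimeError("connection refused")
    return "ok"


if __name__ == "__main__":
    print(asyncio.run(flaky_tool(False)))
    print(asyncio.run(flaky_tool(True)))
```

Returning the error as text, as the tools above do, lets the AI assistant read the failure and retry with a corrected query.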
    
    

    Key MCP Server Features

  • Tool Registration: Declarative tool definitions with type safety
  • RLS Context Management: Automatic user identity extraction and context setting
  • Error Handling: Comprehensive error management with user-friendly messages
  • Lifecycle Management: Proper startup and shutdown with resource cleanup
  • Health Monitoring: Built-in health check endpoints
  • Development Support: Hot reload and debugging capabilities
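
RLS context management starts with extracting the user identity from the request. Here is a minimal sketch of that step, using a plain dictionary in place of real request headers. `extract_rls_user_id` is a hypothetical helper, and rejecting malformed IDs is an added assumption that `get_rls_user_id` above does not make.

```python
import uuid
from typing import Mapping

# Same placeholder ID the lab uses for development and testing
DEFAULT_RLS_USER_ID = "00000000-0000-0000-0000-000000000000"


def extract_rls_user_id(headers: Mapping[str, str]) -> str:
    """Return the x-rls-user-id header value, or the default placeholder ID."""
    # HTTP header names are case-insensitive, so normalize before the lookup.
    normalized = {name.lower(): value for name, value in headers.items()}
    candidate = normalized.get("x-rls-user-id", "").strip()
    if not candidate:
        return DEFAULT_RLS_USER_ID
    try:
        # str(UUID) yields the canonical lowercase, hyphenated form
        return str(uuid.UUID(candidate))
    except ValueError:
        # Assumption for this sketch: malformed IDs fall back to the default
        return DEFAULT_RLS_USER_ID
```

In production you would more likely reject a missing or malformed ID outright instead of falling back, since the fallback decides which rows the caller can see.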
    🏥 Health Monitoring

    Health Check Implementation (health_check.py)

    
    # mcp_server/health_check.py
    
    """
    
    Health check endpoints for monitoring MCP server status.
    
    """
    
    import logging
    
    from typing import Dict, Any
    
    from fastapi import FastAPI, HTTPException
    
    from fastapi.responses import JSONResponse
    
    
    
    logger = logging.getLogger(__name__)
    
    
    
    def setup_health_endpoints(app: FastAPI, db_provider) -> None:
    
        """Add health check endpoints to the FastAPI application."""
    
        
    
        @app.get("/health")
    
        async def health_check() -> JSONResponse:
    
            """Basic health check endpoint."""
    
            return JSONResponse(
    
                status_code=200,
    
                content={
    
                    "status": "healthy",
    
                    "service": "zava-retail-mcp-server",
    
                    "timestamp": await db_provider.get_current_utc_date()
    
                }
    
            )
    
        
    
        @app.get("/health/detailed")
    
        async def detailed_health_check() -> JSONResponse:
    
            """Detailed health check including database connectivity."""
    
            health_status = {
    
                "service": "zava-retail-mcp-server",
    
                "status": "healthy",
    
                "components": {}
    
            }
    
            
    
            overall_healthy = True
    
            
    
            # Check database
    
            try:
    
                db_health = await db_provider.health_check()
    
                health_status["components"]["database"] = db_health
    
                
    
                if db_health["status"] != "healthy":
    
                    overall_healthy = False
    
                    
    
            except Exception as e:
    
                health_status["components"]["database"] = {
    
                    "status": "unhealthy",
    
                    "error": str(e)
    
                }
    
                overall_healthy = False
    
            
    
            # Update overall status
    
            if not overall_healthy:
    
                health_status["status"] = "unhealthy"
    
            
    
            status_code = 200 if overall_healthy else 503
    
            
    
            return JSONResponse(
    
                status_code=status_code,
    
                content=health_status
    
            )
    
        
    
        @app.get("/health/ready")
        async def readiness_check() -> JSONResponse:
            """Kubernetes readiness probe endpoint."""
            try:
                # Test critical functionality
                db_health = await db_provider.health_check()
            except Exception as e:
                logger.error(f"Readiness check failed: {e}")
                raise HTTPException(status_code=503, detail="Service not ready") from e

            # Raised outside the try block so the generic handler above does not
            # swallow this more specific detail message
            if db_health["status"] != "healthy":
                raise HTTPException(status_code=503, detail="Database not ready")

            return JSONResponse(
                status_code=200,
                content={"status": "ready"}
            )
    
        
    
        @app.get("/health/live")
    
        async def liveness_check() -> JSONResponse:
    
            """Kubernetes liveness probe endpoint."""
    
            return JSONResponse(
    
                status_code=200,
    
                content={"status": "alive"}
    
            )
    
        
    
        logger.info("Health check endpoints configured")
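
The detailed endpoint folds per-component results into one overall status and HTTP status code. That aggregation step can be sketched as a pure function, which makes it testable without a web framework or a database. `aggregate_health` is a hypothetical name used only for this illustration.

```python
from typing import Any, Dict, Tuple


def aggregate_health(components: Dict[str, Dict[str, Any]]) -> Tuple[int, Dict[str, Any]]:
    """Combine component health dicts into (http_status_code, response_body)."""
    overall_healthy = all(
        component.get("status") == "healthy" for component in components.values()
    )
    body = {
        "service": "zava-retail-mcp-server",
        "status": "healthy" if overall_healthy else "unhealthy",
        "components": components,
    }
    # 503 signals to probes and load balancers that the service should not receive traffic
    return (200 if overall_healthy else 503), body
```

Adding another component, such as an embeddings service, then only requires one more key in `components`.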
    
    

    🧪 Testing Your MCP Server

    Local Testing

    1. Start the MCP server:

    ```bash

    # Activate virtual environment

    source mcp-env/bin/activate # macOS/Linux

    # mcp-env\Scripts\activate # Windows

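    # Start the server from the project root; the module uses package-relative

    # imports, so it has to run as part of the mcp_server package

    python -m mcp_server.sales_analysis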

    ```

    2. Test the health endpoints:

    ```bash

    # Basic health check

    curl http://localhost:8000/health

    # Detailed health check

    curl http://localhost:8000/health/detailed

    ```

    3. Test the MCP tools:

    ```bash

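    # List available tools. MCP messages are JSON-RPC 2.0, so "jsonrpc" and "id" are required.
    # A stateful server may first require an "initialize" request and expect the
    # Mcp-Session-Id header it returns on every later call.
    curl -X POST http://localhost:8000/mcp \
      -H "Content-Type: application/json" \
      -H "Accept: application/json, text/event-stream" \
      -H "x-rls-user-id: 00000000-0000-0000-0000-000000000000" \
      -d '{"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}'

    # Get table schemas
    curl -X POST http://localhost:8000/mcp \
      -H "Content-Type: application/json" \
      -H "Accept: application/json, text/event-stream" \
      -H "x-rls-user-id: 00000000-0000-0000-0000-000000000000" \
      -d '{
        "jsonrpc": "2.0",
        "id": 2,
        "method": "tools/call",
        "params": {
          "name": "get_multiple_table_schemas",
          "arguments": {
            "table_names": ["retail.stores", "retail.products"]
          }
        }
      }'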

    ```
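
The same requests can be prepared from Python. MCP messages follow JSON-RPC 2.0, so each request carries `jsonrpc`, `id`, `method`, and `params` fields. The sketch below only builds and serializes the payload. `build_tools_call` is a hypothetical helper, and sending the result is left to whichever HTTP client you prefer.

```python
import json
from typing import Any, Dict


def build_tools_call(name: str, arguments: Dict[str, Any], request_id: int = 1) -> str:
    """Serialize a JSON-RPC 2.0 'tools/call' request for an MCP server."""
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,  # lets the client match the response to this request
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }
    return json.dumps(payload)


if __name__ == "__main__":
    print(build_tools_call(
        "get_multiple_table_schemas",
        {"table_names": ["retail.stores", "retail.products"]},
    ))
```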

    VS Code Integration Testing

    1. Configure VS Code MCP:

    ```json

    // .vscode/mcp.json
    {
      "servers": {
        "zava-retail-test": {
          "url": "http://127.0.0.1:8000/mcp",
          "type": "http",
          "headers": {"x-rls-user-id": "00000000-0000-0000-0000-000000000000"}
        }
      }
    }

    ```

    2. Test in AI Chat:

    - Open VS Code AI Chat

    - Type #zava and select your server

    - Ask: "What tables are available?"

    - Ask: "Show me the top 5 stores by order count."

    Unit Testing

    Write comprehensive unit tests:

    
    # tests/test_mcp_server.py
    
    import pytest
    
    import asyncio
    
    from mcp_server.sales_analysis_postgres import PostgreSQLSchemaProvider
    
    from mcp_server.config import config
    
    
    
    @pytest.mark.asyncio
    
    async def test_database_connection():
    
        """Test database connectivity."""
    
        db = PostgreSQLSchemaProvider()
    
        
    
        try:
    
            await db.create_pool()
    
            health = await db.health_check()
    
            assert health["status"] == "healthy"
    
        finally:
    
            await db.close_pool()
    
    
    
    @pytest.mark.asyncio
    
    async def test_table_schema_retrieval():
    
        """Test table schema retrieval."""
    
        db = PostgreSQLSchemaProvider()
    
        
    
        try:
    
            await db.create_pool()
    
            schema = await db.get_table_schema("retail.stores", "00000000-0000-0000-0000-000000000000")
    
            
    
            assert schema["table_name"] == "retail.stores"
    
            assert len(schema["columns"]) > 0
    
            
    
        finally:
    
            await db.close_pool()
    
    
    
    @pytest.mark.asyncio
    
    async def test_query_execution():
    
        """Test query execution with RLS."""
    
        db = PostgreSQLSchemaProvider()
    
        
    
        try:
    
            await db.create_pool()
    
            result = await db.execute_query(
    
                "SELECT COUNT(*) as store_count FROM retail.stores",
    
                "00000000-0000-0000-0000-000000000000"
    
            )
    
            
    
            assert "store_count" in result
    
            
    
        finally:
    
            await db.close_pool()
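
The tests above need a reachable PostgreSQL instance. Logic that depends only on the provider's return values can be tested without one by stubbing the provider. Here is a minimal sketch using `unittest.mock.AsyncMock` from the standard library. `startup_check` is a hypothetical function that mirrors the checks in `lifespan`; it is not code from the lab. The tests call `asyncio.run` directly so the sketch runs without pytest plugins.

```python
import asyncio
from unittest.mock import AsyncMock


async def startup_check(db) -> None:
    """Hypothetical helper: create the pool and fail fast if the database is unhealthy."""
    await db.create_pool()
    health = await db.health_check()
    if health["status"] != "healthy":
        raise RuntimeError(f"Database not healthy: {health}")


def test_startup_check_rejects_unhealthy_database() -> None:
    db = AsyncMock()  # every attribute behaves as an awaitable mock method
    db.health_check.return_value = {"status": "unhealthy", "error": "refused"}
    try:
        asyncio.run(startup_check(db))
    except RuntimeError:
        pass
    else:
        raise AssertionError("expected RuntimeError for an unhealthy database")
    db.create_pool.assert_awaited_once()


def test_startup_check_accepts_healthy_database() -> None:
    db = AsyncMock()
    db.health_check.return_value = {"status": "healthy"}
    asyncio.run(startup_check(db))
    db.health_check.assert_awaited_once()
```

Tests like these run in milliseconds and complement the database-backed tests; they do not replace them.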
    
    

    🎯 Key Takeaways

    After completing this lab, you should have:

    ✅ Working MCP Server: A FastMCP server with database integration

    ✅ Configuration Management: Robust environment-based configuration

    ✅ Database Layer: PostgreSQL integration with connection pooling

    ✅ MCP Tools: Schema introspection and query execution tools

    ✅ RLS Integration: Row Level Security context management

    ✅ Health Monitoring: Comprehensive health check endpoints

    ✅ Testing Strategy: Local testing and VS Code integration

    🚀 Next Steps

    Continue with Lab 06: Tool Development to:

  • Expand the MCP tool collection
  • Implement advanced query patterns
  • Add data validation and transformation
  • Create specialized analytics tools
    📚 Additional Resources

    FastMCP Framework

  • FastMCP Documentation - Official FastMCP guide
  • MCP Specification - Protocol specification
  • Tool Development Guide - Creating MCP tools

    Database Integration

  • asyncpg Documentation - PostgreSQL async driver
  • Connection Pooling Best Practices - PostgreSQL tuning
  • Row Level Security Guide - RLS implementation

    FastAPI Patterns

  • FastAPI Documentation - Web framework reference
  • Dependency Injection - FastAPI patterns
  • Background Tasks - Async task management

    ---

    Next: Ready to expand your tools? Continue with Lab 06: Tool Development.

    ---

    Disclaimer:

    This document was translated using the AI translation service Co-op Translator.

    While we strive for accuracy, automated translations may contain errors or inaccuracies.

    The original document in its native language should be considered the authoritative source.

    For critical information, professional human translation is recommended.

    We are not liable for any misunderstandings or misinterpretations arising from the use of this translation.

    MCP Academy — microsoft/mcp-for-beginners