02 - Security and Multi-Tenancy

Module
PostgreSQL
Progress
23%

Security and Multi-Tenancy

🎯 What This Lab Covers

This lab provides comprehensive guidance on implementing enterprise-grade security and multi-tenancy for MCP servers.

You'll learn to design secure, compliant systems that protect sensitive retail data while enabling flexible access patterns across multiple tenants.

Overview

Security is paramount in retail applications that handle customer data, payment information, and business intelligence.

This lab covers the complete security architecture from authentication and authorization to data isolation and compliance monitoring.

We implement a defense-in-depth strategy combining Azure identity services, PostgreSQL Row Level Security, application-level controls, and comprehensive audit logging to create a robust, compliant platform.

Learning Objectives

By the end of this lab, you will be able to:

  • Implement enterprise-grade Row Level Security for multi-tenant data isolation
  • Design secure authentication and authorization patterns with Azure
  • Configure comprehensive audit logging for compliance requirements
  • Apply defense-in-depth security strategies across all application layers
  • Validate security implementations through systematic testing
  • Monitor security events and respond to potential threats
  • πŸ” Multi-Tenant Security Architecture

    Security Layers Overview

    
    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    
    β”‚               Azure Front Door                  β”‚ ← WAF, DDoS Protection
    
    β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
    
    β”‚              Application Gateway                β”‚ ← SSL Termination, Rate Limiting
    
    β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
    
    β”‚                MCP Server                       β”‚ ← Authentication, Authorization
    
    β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
    
    β”‚  β”‚           Connection Layer                  β”‚ ← Connection Pooling, Circuit Breakers
    
    β”‚  β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
    
    β”‚  β”‚         Business Logic Layer               β”‚ ← Input Validation, Business Rules
    
    β”‚  β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
    
    β”‚  β”‚           Data Access Layer                β”‚ ← Query Sanitization, RLS Context
    
    β”‚  └──────────────────────────────────────────────
    
    β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
    
    β”‚              PostgreSQL RLS                    β”‚ ← Row Level Security, Audit Triggers
    
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
    
    

    Multi-Tenancy Models

    Our implementation uses the Shared Database, Shared Schema model with Row Level Security:

    Benefits:

  • Cost-effective resource utilization
  • Simplified maintenance and updates
  • Strong data isolation through RLS
  • Compliance-friendly audit trails
  • Trade-offs:

  • Requires careful RLS policy design
  • Schema changes affect all tenants
  • Need robust backup/restore procedures
  • πŸ›‘οΈ Row Level Security Implementation

    RLS Foundation

    
    -- Enable RLS on all multi-tenant tables
    
    ALTER TABLE retail.customers ENABLE ROW LEVEL SECURITY;
    
    ALTER TABLE retail.products ENABLE ROW LEVEL SECURITY;
    
    ALTER TABLE retail.sales_transactions ENABLE ROW LEVEL SECURITY;
    
    ALTER TABLE retail.sales_transaction_items ENABLE ROW LEVEL SECURITY;
    
    ALTER TABLE retail.product_embeddings ENABLE ROW LEVEL SECURITY;
    
    
    
    -- Create application role for MCP server
    
    CREATE ROLE mcp_user LOGIN;
    
    GRANT USAGE ON SCHEMA retail TO mcp_user;
    
    GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA retail TO mcp_user;
    
    

    Store Context Management

    
    -- Function to securely set store context
    
    CREATE OR REPLACE FUNCTION retail.set_store_context(store_id_param VARCHAR(50))
    
    RETURNS void
    
    LANGUAGE plpgsql
    
    SECURITY DEFINER
    
    SET search_path = retail, pg_temp
    
    AS $$
    
    DECLARE
    
        user_info RECORD;
    
    BEGIN
    
        -- Validate store exists and is active
    
        SELECT store_id, store_name, is_active 
    
        INTO user_info
    
        FROM retail.stores 
    
        WHERE store_id = store_id_param;
    
        
    
        IF NOT FOUND THEN
    
            RAISE EXCEPTION 'Store not found: %', store_id_param
    
                USING ERRCODE = 'invalid_parameter_value',
    
                      HINT = 'Verify store ID and ensure it exists in the system';
    
        END IF;
    
        
    
        IF NOT user_info.is_active THEN
    
            RAISE EXCEPTION 'Store is inactive: %', store_id_param
    
                USING ERRCODE = 'insufficient_privilege',
    
                      HINT = 'Contact administrator to activate store';
    
        END IF;
    
        
    
        -- Set the secure context
    
        PERFORM set_config('app.current_store_id', store_id_param, false);
    
        PERFORM set_config('app.store_name', user_info.store_name, false);
    
        PERFORM set_config('app.context_set_at', extract(epoch from current_timestamp)::text, false);
    
        
    
        -- Log context change for audit
    
        INSERT INTO retail.security_audit_log (
    
            event_type,
    
            user_name,
    
            store_id,
    
            ip_address,
    
            user_agent,
    
            details,
    
            severity
    
        ) VALUES (
    
            'store_context_set',
    
            current_user,
    
            store_id_param,
    
            inet_client_addr()::text,
    
            current_setting('application_name', true),
    
            jsonb_build_object(
    
                'store_name', user_info.store_name,
    
                'timestamp', current_timestamp,
    
                'session_id', pg_backend_pid()
    
            ),
    
            'INFO'
    
        );
    
    END;
    
    $$;
    
    
    
    -- Grant execute to MCP user
    
    GRANT EXECUTE ON FUNCTION retail.set_store_context TO mcp_user;
    
    

    RLS Policies

    
    -- Customers RLS Policy
    
    CREATE POLICY customers_store_isolation ON retail.customers
    
        FOR ALL
    
        TO mcp_user
    
        USING (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
        )
    
        WITH CHECK (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
        );
    
    
    
    -- Products RLS Policy with additional business rules
    
    CREATE POLICY products_store_isolation ON retail.products
    
        FOR ALL
    
        TO mcp_user
    
        USING (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
            AND is_active = TRUE  -- Additional business rule
    
        )
    
        WITH CHECK (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
        );
    
    
    
    -- Sales Transactions RLS Policy
    
    CREATE POLICY sales_transactions_store_isolation ON retail.sales_transactions
    
        FOR ALL
    
        TO mcp_user
    
        USING (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
        )
    
        WITH CHECK (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
        );
    
    
    
    -- Transaction Items RLS Policy (via join)
    
    CREATE POLICY sales_transaction_items_store_isolation ON retail.sales_transaction_items
    
        FOR ALL
    
        TO mcp_user
    
        USING (
    
            transaction_id IN (
    
                SELECT transaction_id 
    
                FROM retail.sales_transactions 
    
                WHERE store_id = current_setting('app.current_store_id', true)
    
            )
    
        )
    
        WITH CHECK (
    
            transaction_id IN (
    
                SELECT transaction_id 
    
                FROM retail.sales_transactions 
    
                WHERE store_id = current_setting('app.current_store_id', true)
    
            )
    
        );
    
    
    
    -- Product Embeddings RLS Policy
    
    CREATE POLICY product_embeddings_store_isolation ON retail.product_embeddings
    
        FOR ALL
    
        TO mcp_user
    
        USING (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
        )
    
        WITH CHECK (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
        );
    
    

    RLS Testing and Validation

    
    -- Test RLS policies with different store contexts
    
    DO $$
    
    DECLARE
    
        test_result RECORD;
    
        customer_count INTEGER;
    
        product_count INTEGER;
    
    BEGIN
    
        -- Test Seattle store context
    
        PERFORM retail.set_store_context('seattle');
    
        
    
        SELECT COUNT(*) INTO customer_count FROM retail.customers;
    
        SELECT COUNT(*) INTO product_count FROM retail.products;
    
        
    
        RAISE NOTICE 'Seattle store - Customers: %, Products: %', customer_count, product_count;
    
        
    
        -- Test Redmond store context
    
        PERFORM retail.set_store_context('redmond');
    
        
    
        SELECT COUNT(*) INTO customer_count FROM retail.customers;
    
        SELECT COUNT(*) INTO product_count FROM retail.products;
    
        
    
        RAISE NOTICE 'Redmond store - Customers: %, Products: %', customer_count, product_count;
    
        
    
        -- Verify data isolation
    
        IF customer_count > 0 AND product_count > 0 THEN
    
            RAISE NOTICE 'RLS policies are working correctly';
    
        ELSE
    
            RAISE WARNING 'RLS policies may not be configured correctly';
    
        END IF;
    
    END;
    
    $$;
    
    

    πŸ”‘ Authentication and Authorization

    Azure Entra ID Integration

    
    # mcp_server/security/authentication.py
    
    """
    
    Azure Entra ID authentication for MCP server.
    
    """
    
    import os
    
    import jwt
    
    import aiohttp
    
    import asyncio
    
    from typing import Dict, Optional, List
    
    from datetime import datetime, timezone
    
    from azure.identity.aio import DefaultAzureCredential
    
    from azure.keyvault.secrets.aio import SecretClient
    
    import logging
    
    
    
    logger = logging.getLogger(__name__)
    
    
    
    class AzureAuthenticator:
    
        """Handle Azure Entra ID authentication and token validation."""
    
        
    
        def __init__(self):
    
            self.tenant_id = os.getenv('AZURE_TENANT_ID')
    
            self.client_id = os.getenv('AZURE_CLIENT_ID')
    
            self.audience = os.getenv('AZURE_AUDIENCE', self.client_id)
    
            self.issuer = f"https://login.microsoftonline.com/{self.tenant_id}/v2.0"
    
            
    
            # Cache for JWKS (JSON Web Key Set)
    
            self._jwks_cache = None
    
            self._jwks_cache_expiry = None
    
            
    
            # Key Vault for secrets
    
            self.key_vault_url = os.getenv('AZURE_KEY_VAULT_URL')
    
            self.credential = DefaultAzureCredential()
    
            
    
            if self.key_vault_url:
    
                self.secret_client = SecretClient(
    
                    vault_url=self.key_vault_url,
    
                    credential=self.credential
    
                )
    
        
    
        async def validate_token(self, token: str) -> Dict:
    
            """Validate JWT token from Azure Entra ID."""
    
            
    
            try:
    
                # Get signing keys
    
                signing_keys = await self._get_signing_keys()
    
                
    
                # Decode token header to get key ID
    
                unverified_header = jwt.get_unverified_header(token)
    
                key_id = unverified_header.get('kid')
    
                
    
                if not key_id:
    
                    raise ValueError("Token missing key ID")
    
                
    
                # Find the corresponding key
    
                signing_key = None
    
                for key in signing_keys:
    
                    if key['kid'] == key_id:
    
                        signing_key = jwt.algorithms.RSAAlgorithm.from_jwk(key)
    
                        break
    
                
    
                if not signing_key:
    
                    raise ValueError(f"Unable to find signing key for kid: {key_id}")
    
                
    
                # Validate and decode token
    
                payload = jwt.decode(
    
                    token,
    
                    signing_key,
    
                    algorithms=['RS256'],
    
                    audience=self.audience,
    
                    issuer=self.issuer,
    
                    options={
    
                        'verify_exp': True,
    
                        'verify_aud': True,
    
                        'verify_iss': True
    
                    }
    
                )
    
                
    
                # Extract user information
    
                user_info = self._extract_user_info(payload)
    
                
    
                # Log successful authentication
    
                logger.info(
    
                    "User authenticated successfully",
    
                    extra={
    
                        'user_id': user_info['user_id'],
    
                        'email': user_info.get('email'),
    
                        'tenant_id': payload.get('tid')
    
                    }
    
                )
    
                
    
                return user_info
    
                
    
            except jwt.ExpiredSignatureError:
    
                logger.warning("Token has expired")
    
                raise ValueError("Token has expired")
    
            except jwt.InvalidAudienceError:
    
                logger.warning(f"Invalid audience in token. Expected: {self.audience}")
    
                raise ValueError("Invalid token audience")
    
            except jwt.InvalidIssuerError:
    
                logger.warning(f"Invalid issuer in token. Expected: {self.issuer}")
    
                raise ValueError("Invalid token issuer")
    
            except Exception as e:
    
                logger.error(f"Token validation failed: {str(e)}")
    
                raise ValueError(f"Token validation failed: {str(e)}")
    
        
    
        async def _get_signing_keys(self) -> List[Dict]:
    
            """Get JWKS from Azure Entra ID with caching."""
    
            
    
            current_time = datetime.now(timezone.utc)
    
            
    
            # Check if cache is valid
    
            if (self._jwks_cache and self._jwks_cache_expiry and 
    
                current_time < self._jwks_cache_expiry):
    
                return self._jwks_cache
    
            
    
            # Fetch new JWKS
    
            jwks_url = f"{self.issuer}/keys"
    
            
    
            async with aiohttp.ClientSession() as session:
    
                async with session.get(jwks_url) as response:
    
                    if response.status != 200:
    
                        raise Exception(f"Failed to fetch JWKS: {response.status}")
    
                    
    
                    jwks_data = await response.json()
    
                    
    
            # Cache for 1 hour
    
            self._jwks_cache = jwks_data['keys']
    
            self._jwks_cache_expiry = current_time.replace(
    
                hour=current_time.hour + 1
    
            )
    
            
    
            return self._jwks_cache
    
        
    
        def _extract_user_info(self, payload: Dict) -> Dict:
    
            """Extract user information from JWT payload."""
    
            
    
            return {
    
                'user_id': payload.get('oid') or payload.get('sub'),
    
                'email': payload.get('email') or payload.get('preferred_username'),
    
                'name': payload.get('name'),
    
                'tenant_id': payload.get('tid'),
    
                'roles': payload.get('roles', []),
    
                'groups': payload.get('groups', []),
    
                'app_roles': payload.get('app_roles', []),
    
                'scope': payload.get('scp', '').split() if payload.get('scp') else [],
    
                'expires_at': datetime.fromtimestamp(payload['exp'], timezone.utc),
    
                'issued_at': datetime.fromtimestamp(payload['iat'], timezone.utc)
    
            }
    
        
    
        async def get_user_store_access(self, user_id: str) -> List[str]:
    
            """Get list of stores the user has access to."""
    
            
    
            try:
    
                # This would typically query your user/store mapping
    
                # For demo, we'll use a simple Key Vault secret
    
                secret_name = f"user-{user_id}-stores"
    
                
    
                if self.secret_client:
    
                    secret = await self.secret_client.get_secret(secret_name)
    
                    store_list = secret.value.split(',')
    
                    return [store.strip() for store in store_list if store.strip()]
    
                
    
                # Fallback: return default store access
    
                logger.warning(f"No store mapping found for user {user_id}, using default")
    
                return ['seattle']  # Default store access
    
                
    
            except Exception as e:
    
                logger.error(f"Failed to get store access for user {user_id}: {e}")
    
                return []  # No access if we can't determine stores
    
    
    
    # Global authenticator instance
    
    azure_authenticator = AzureAuthenticator()
    
    

    Authorization Middleware

    
    # mcp_server/security/authorization.py
    
    """
    
    Authorization middleware and decorators for MCP server.
    
    """
    
    import functools
    
    from typing import Dict, List, Optional, Callable, Any
    
    from fastapi import HTTPException, status, Request
    
    from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
    
    import logging
    
    
    
    logger = logging.getLogger(__name__)
    
    
    
    security = HTTPBearer()
    
    
    
    class AuthorizationError(Exception):
    
        """Custom authorization error."""
    
        pass
    
    
    
    class RoleBasedAuth:
    
        """Role-based access control implementation."""
    
        
    
        # Define role hierarchy
    
        ROLE_HIERARCHY = {
    
            'store_admin': ['store_manager', 'store_user', 'store_readonly'],
    
            'store_manager': ['store_user', 'store_readonly'],
    
            'store_user': ['store_readonly'],
    
            'store_readonly': []
    
        }
    
        
    
        # Define permissions for each role
    
        ROLE_PERMISSIONS = {
    
            'store_admin': [
    
                'read_all', 'write_all', 'delete_all', 'manage_users'
    
            ],
    
            'store_manager': [
    
                'read_all', 'write_transactions', 'write_inventory', 'read_reports'
    
            ],
    
            'store_user': [
    
                'read_products', 'read_customers', 'write_transactions'
    
            ],
    
            'store_readonly': [
    
                'read_products', 'read_basic_reports'
    
            ]
    
        }
    
        
    
        @classmethod
    
        def has_permission(cls, user_roles: List[str], required_permission: str) -> bool:
    
            """Check if user has required permission."""
    
            
    
            user_permissions = set()
    
            
    
            for role in user_roles:
    
                # Add direct permissions
    
                user_permissions.update(cls.ROLE_PERMISSIONS.get(role, []))
    
                
    
                # Add inherited permissions
    
                inherited_roles = cls.ROLE_HIERARCHY.get(role, [])
    
                for inherited_role in inherited_roles:
    
                    user_permissions.update(cls.ROLE_PERMISSIONS.get(inherited_role, []))
    
            
    
            return required_permission in user_permissions
    
        
    
        @classmethod
    
        def get_user_stores(cls, user_info: Dict) -> List[str]:
    
            """Extract stores user has access to from user info."""
    
            
    
            # This would typically come from your user management system
    
            # For demo, we'll extract from custom claims or groups
    
            
    
            stores = []
    
            
    
            # Check for direct store assignments in groups
    
            for group in user_info.get('groups', []):
    
                if group.startswith('store_'):
    
                    store_id = group.replace('store_', '')
    
                    stores.append(store_id)
    
            
    
            # Check for app-specific roles
    
            for role in user_info.get('app_roles', []):
    
                if 'store:' in role:
    
                    _, store_id = role.split('store:', 1)
    
                    stores.append(store_id)
    
            
    
            return list(set(stores))  # Remove duplicates
    
    
    
    def require_auth(required_permission: str = None, require_store_access: bool = True):
    
        """Decorator to require authentication and authorization."""
    
        
    
        def decorator(func: Callable) -> Callable:
    
            @functools.wraps(func)
    
            async def wrapper(*args, **kwargs):
    
                # Extract request from args (FastAPI dependency injection)
    
                request = None
    
                for arg in args:
    
                    if isinstance(arg, Request):
    
                        request = arg
    
                        break
    
                
    
                if not request:
    
                    raise HTTPException(
    
                        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
    
                        detail="Request object not found"
    
                    )
    
                
    
                # Get authorization header
    
                auth_header = request.headers.get('Authorization')
    
                if not auth_header or not auth_header.startswith('Bearer '):
    
                    raise HTTPException(
    
                        status_code=status.HTTP_401_UNAUTHORIZED,
    
                        detail="Missing or invalid authorization header",
    
                        headers={"WWW-Authenticate": "Bearer"}
    
                    )
    
                
    
                token = auth_header.split(' ')[1]
    
                
    
                try:
    
                    # Validate token
    
                    user_info = await azure_authenticator.validate_token(token)
    
                    
    
                    # Check required permission
    
                    if required_permission:
    
                        user_roles = user_info.get('roles', [])
    
                        if not RoleBasedAuth.has_permission(user_roles, required_permission):
    
                            raise HTTPException(
    
                                status_code=status.HTTP_403_FORBIDDEN,
    
                                detail=f"Insufficient permissions. Required: {required_permission}"
    
                            )
    
                    
    
                    # Check store access
    
                    if require_store_access:
    
                        user_stores = RoleBasedAuth.get_user_stores(user_info)
    
                        if not user_stores:
    
                            raise HTTPException(
    
                                status_code=status.HTTP_403_FORBIDDEN,
    
                                detail="No store access configured for user"
    
                            )
    
                        
    
                        # Set default store context (first accessible store)
    
                        request.state.current_store = user_stores[0]
    
                        request.state.accessible_stores = user_stores
    
                    
    
                    # Add user info to request state
    
                    request.state.user_info = user_info
    
                    request.state.user_id = user_info['user_id']
    
                    
    
                    # Call the original function
    
                    return await func(*args, **kwargs)
    
                    
    
                except ValueError as e:
    
                    raise HTTPException(
    
                        status_code=status.HTTP_401_UNAUTHORIZED,
    
                        detail=str(e),
    
                        headers={"WWW-Authenticate": "Bearer"}
    
                    )
    
                except AuthorizationError as e:
    
                    raise HTTPException(
    
                        status_code=status.HTTP_403_FORBIDDEN,
    
                        detail=str(e)
    
                    )
    
            
    
            return wrapper
    
        return decorator
    
    
    
    def require_store_context(store_param: str = 'store_id'):
    
        """Decorator to validate and set store context."""
    
        
    
        def decorator(func: Callable) -> Callable:
    
            @functools.wraps(func)
    
            async def wrapper(*args, **kwargs):
    
                # Get store_id from kwargs
    
                store_id = kwargs.get(store_param)
    
                
    
                if not store_id:
    
                    raise HTTPException(
    
                        status_code=status.HTTP_400_BAD_REQUEST,
    
                        detail=f"Missing required parameter: {store_param}"
    
                    )
    
                
    
                # Extract request from args
    
                request = None
    
                for arg in args:
    
                    if isinstance(arg, Request):
    
                        request = arg
    
                        break
    
                
    
                if not request or not hasattr(request.state, 'accessible_stores'):
    
                    raise HTTPException(
    
                        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
    
                        detail="Authentication required before store context validation"
    
                    )
    
                
    
                # Validate user has access to requested store
    
                if store_id not in request.state.accessible_stores:
    
                    raise HTTPException(
    
                        status_code=status.HTTP_403_FORBIDDEN,
    
                        detail=f"Access denied to store: {store_id}"
    
                    )
    
                
    
                # Set store context in request state
    
                request.state.current_store = store_id
    
                
    
                return await func(*args, **kwargs)
    
            
    
            return wrapper
    
        return decorator
    
    

    πŸ” Security Audit and Compliance

    Comprehensive Audit Logging

    
    -- Security audit log table
    
    CREATE TABLE retail.security_audit_log (
    
        log_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    
        event_type VARCHAR(100) NOT NULL,
    
        user_name VARCHAR(100) NOT NULL,
    
        user_id VARCHAR(100),
    
        store_id VARCHAR(50),
    
        ip_address INET,
    
        user_agent TEXT,
    
        request_id VARCHAR(100),
    
        session_id VARCHAR(100),
    
        resource_type VARCHAR(100),
    
        resource_id VARCHAR(100),
    
        action VARCHAR(50) NOT NULL,
    
        success BOOLEAN NOT NULL DEFAULT TRUE,
    
        failure_reason TEXT,
    
        details JSONB,
    
        severity VARCHAR(20) DEFAULT 'INFO',
    
        created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    
        
    
        -- Ensure proper indexing for security queries
    
        CONSTRAINT valid_severity CHECK (severity IN ('DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL'))
    
    );
    
    
    
    -- Indexes for security audit queries
    
    CREATE INDEX idx_security_audit_event_type ON retail.security_audit_log(event_type);
    
    CREATE INDEX idx_security_audit_user_name ON retail.security_audit_log(user_name);
    
    CREATE INDEX idx_security_audit_store_id ON retail.security_audit_log(store_id);
    
    CREATE INDEX idx_security_audit_created_at ON retail.security_audit_log(created_at);
    
    CREATE INDEX idx_security_audit_success ON retail.security_audit_log(success);
    
    CREATE INDEX idx_security_audit_severity ON retail.security_audit_log(severity);
    
    CREATE INDEX idx_security_audit_details ON retail.security_audit_log USING GIN(details);
    
    
    
    -- Function to log security events
    
    CREATE OR REPLACE FUNCTION retail.log_security_event(
    
        p_event_type VARCHAR(100),
    
        p_user_name VARCHAR(100),
    
        p_user_id VARCHAR(100) DEFAULT NULL,
    
        p_store_id VARCHAR(50) DEFAULT NULL,
    
        p_ip_address TEXT DEFAULT NULL,
    
        p_action VARCHAR(50) DEFAULT 'unknown',
    
        p_success BOOLEAN DEFAULT TRUE,
    
        p_failure_reason TEXT DEFAULT NULL,
    
        p_details JSONB DEFAULT NULL,
    
        p_severity VARCHAR(20) DEFAULT 'INFO'
    
    )
    
    RETURNS UUID
    
    LANGUAGE plpgsql
    
    SECURITY DEFINER
    
    AS $$
    
    DECLARE
    
        log_id UUID;
    
    BEGIN
    
        INSERT INTO retail.security_audit_log (
    
            event_type,
    
            user_name,
    
            user_id,
    
            store_id,
    
            ip_address,
    
            action,
    
            success,
    
            failure_reason,
    
            details,
    
            severity
    
        ) VALUES (
    
            p_event_type,
    
            p_user_name,
    
            p_user_id,
    
            p_store_id,
    
            p_ip_address::INET,
    
            p_action,
    
            p_success,
    
            p_failure_reason,
    
            p_details,
    
            p_severity
    
        ) RETURNING log_id INTO log_id;
    
        
    
        RETURN log_id;
    
    END;
    
    $$;
    
    
    
    -- Grant execute to MCP user
    
    GRANT EXECUTE ON FUNCTION retail.log_security_event TO mcp_user;
    
    

    Security Monitoring Views

    
    -- Failed authentication attempts
    
    CREATE VIEW retail.security_failed_auth AS
    
    SELECT 
    
        event_type,
    
        user_name,
    
        ip_address,
    
        COUNT(*) as attempt_count,
    
        MIN(created_at) as first_attempt,
    
        MAX(created_at) as last_attempt,
    
        ARRAY_AGG(DISTINCT failure_reason) as failure_reasons
    
    FROM retail.security_audit_log
    
    WHERE success = FALSE 
    
      AND event_type IN ('authentication_failed', 'token_validation_failed')
    
      AND created_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
    
    GROUP BY event_type, user_name, ip_address
    
    HAVING COUNT(*) >= 3  -- 3 or more failures
    
    ORDER BY attempt_count DESC, last_attempt DESC;
    
    
    
    -- Suspicious access patterns
    
    CREATE VIEW retail.security_suspicious_access AS
    
    SELECT 
    
        user_name,
    
        user_id,
    
        COUNT(DISTINCT ip_address) as ip_count,
    
        COUNT(DISTINCT store_id) as store_count,
    
        ARRAY_AGG(DISTINCT ip_address::TEXT) as ip_addresses,
    
        ARRAY_AGG(DISTINCT store_id) as stores_accessed,
    
        MIN(created_at) as first_access,
    
        MAX(created_at) as last_access
    
    FROM retail.security_audit_log
    
    WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
    
      AND success = TRUE
    
    GROUP BY user_name, user_id
    
    HAVING COUNT(DISTINCT ip_address) > 3  -- Access from multiple IPs
    
       OR COUNT(DISTINCT store_id) > 2     -- Access to multiple stores
    
    ORDER BY ip_count DESC, store_count DESC;
    
    
    
    -- Data access patterns
    
    CREATE VIEW retail.security_data_access_summary AS
    
    SELECT 
    
        DATE_TRUNC('hour', created_at) as access_hour,
    
        store_id,
    
        resource_type,
    
        action,
    
        COUNT(*) as access_count,
    
        COUNT(DISTINCT user_id) as unique_users
    
    FROM retail.security_audit_log
    
    WHERE resource_type IS NOT NULL
    
      AND created_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
    
    GROUP BY DATE_TRUNC('hour', created_at), store_id, resource_type, action
    
    ORDER BY access_hour DESC, access_count DESC;
    
    

    Security Event Monitoring

    
    # mcp_server/security/monitoring.py
    
    """
    
    Security monitoring and alerting for MCP server.
    
    """
    
    import asyncio
    
    import asyncpg
    
    from typing import Dict, List, Any
    
    from datetime import datetime, timedelta
    
    from dataclasses import dataclass
    
    import logging
    
    
    
    logger = logging.getLogger(__name__)
    
    
    
    @dataclass
    
    class SecurityAlert:
    
        """Security alert data structure."""
    
        alert_type: str
    
        severity: str
    
        message: str
    
        details: Dict[str, Any]
    
        timestamp: datetime
    
    
    
    class SecurityMonitor:
    
        """Monitor security events and generate alerts."""
    
        
    
        def __init__(self, db_connection_string: str):
    
            self.db_connection_string = db_connection_string
    
            self.alert_handlers = []
    
            
    
            # Alert thresholds
    
            self.thresholds = {
    
                'failed_auth_attempts': 5,      # per user per hour
    
                'multiple_ip_access': 3,        # different IPs per user per hour
    
                'excessive_data_access': 1000,  # queries per user per hour
    
                'privilege_escalation': 1,      # any attempt
    
                'unauthorized_store_access': 1  # any attempt
    
            }
    
        
    
        async def start_monitoring(self):
    
            """Start security monitoring loop."""
    
            logger.info("Starting security monitoring")
    
            
    
            while True:
    
                try:
    
                    await self._check_security_events()
    
                    await asyncio.sleep(300)  # Check every 5 minutes
    
                except Exception as e:
    
                    logger.error(f"Security monitoring error: {e}")
    
                    await asyncio.sleep(60)  # Short retry on error
    
        
    
        async def _check_security_events(self):
    
            """Check for security events and generate alerts."""
    
            
    
            conn = await asyncpg.connect(self.db_connection_string)
    
            
    
            try:
    
                # Check failed authentication attempts
    
                await self._check_failed_auth(conn)
    
                
    
                # Check suspicious access patterns
    
                await self._check_suspicious_access(conn)
    
                
    
                # Check data access anomalies
    
                await self._check_data_access_anomalies(conn)
    
                
    
                # Check unauthorized access attempts
    
                await self._check_unauthorized_access(conn)
    
                
    
            finally:
    
                await conn.close()
    
        
    
        async def _check_failed_auth(self, conn):
    
            """Check for excessive failed authentication attempts."""
    
            
    
            query = """
    
            SELECT 
    
                user_name,
    
                ip_address,
    
                COUNT(*) as attempt_count,
    
                MAX(created_at) as last_attempt
    
            FROM retail.security_audit_log
    
            WHERE success = FALSE 
    
              AND event_type IN ('authentication_failed', 'token_validation_failed')
    
              AND created_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
    
            GROUP BY user_name, ip_address
    
            HAVING COUNT(*) >= $1
    
            """
    
            
    
            results = await conn.fetch(query, self.thresholds['failed_auth_attempts'])
    
            
    
            for record in results:
    
                alert = SecurityAlert(
    
                    alert_type='failed_authentication',
    
                    severity='HIGH',
    
                    message=f"Excessive failed login attempts for user {record['user_name']}",
    
                    details={
    
                        'user_name': record['user_name'],
    
                        'ip_address': str(record['ip_address']),
    
                        'attempt_count': record['attempt_count'],
    
                        'last_attempt': record['last_attempt'].isoformat()
    
                    },
    
                    timestamp=datetime.now()
    
                )
    
                
    
                await self._send_alert(alert)
    
        
    
        async def _check_suspicious_access(self, conn):
    
            """Check for suspicious access patterns."""
    
            
    
            query = """
    
            SELECT 
    
                user_name,
    
                user_id,
    
                COUNT(DISTINCT ip_address) as ip_count,
    
                ARRAY_AGG(DISTINCT ip_address::TEXT) as ip_addresses
    
            FROM retail.security_audit_log
    
            WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
    
              AND success = TRUE
    
            GROUP BY user_name, user_id
    
            HAVING COUNT(DISTINCT ip_address) >= $1
    
            """
    
            
    
            results = await conn.fetch(query, self.thresholds['multiple_ip_access'])
    
            
    
            for record in results:
    
                alert = SecurityAlert(
    
                    alert_type='suspicious_access',
    
                    severity='MEDIUM',
    
                    message=f"User {record['user_name']} accessed from multiple IP addresses",
    
                    details={
    
                        'user_name': record['user_name'],
    
                        'user_id': record['user_id'],
    
                        'ip_count': record['ip_count'],
    
                        'ip_addresses': record['ip_addresses']
    
                    },
    
                    timestamp=datetime.now()
    
                )
    
                
    
                await self._send_alert(alert)
    
        
    
        async def _check_unauthorized_access(self, conn):
    
            """Check for unauthorized store access attempts."""
    
            
    
            query = """
    
            SELECT 
    
                user_name,
    
                user_id,
    
                store_id,
    
                failure_reason,
    
                created_at
    
            FROM retail.security_audit_log
    
            WHERE success = FALSE 
    
              AND event_type = 'unauthorized_store_access'
    
              AND created_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
    
            """
    
            
    
            results = await conn.fetch(query)
    
            
    
            for record in results:
    
                alert = SecurityAlert(
    
                    alert_type='unauthorized_access',
    
                    severity='HIGH',
    
                    message=f"Unauthorized store access attempt by {record['user_name']}",
    
                    details={
    
                        'user_name': record['user_name'],
    
                        'user_id': record['user_id'],
    
                        'store_id': record['store_id'],
    
                        'failure_reason': record['failure_reason'],
    
                        'timestamp': record['created_at'].isoformat()
    
                    },
    
                    timestamp=datetime.now()
    
                )
    
                
    
                await self._send_alert(alert)
    
        
    
        async def _send_alert(self, alert: SecurityAlert):
    
            """Send security alert to all configured handlers."""
    
            
    
            logger.warning(
    
                f"Security Alert: {alert.alert_type} - {alert.message}",
    
                extra={'alert_details': alert.details}
    
            )
    
            
    
            # Send to configured alert handlers
    
            for handler in self.alert_handlers:
    
                try:
    
                    await handler.send_alert(alert)
    
                except Exception as e:
    
                    logger.error(f"Failed to send alert via {handler.__class__.__name__}: {e}")
    
        
    
        def add_alert_handler(self, handler):
    
            """Add alert handler."""
    
            self.alert_handlers.append(handler)
    
    

    πŸ§ͺ Security Testing and Validation

    Automated Security Tests

    
    # tests/security/test_security.py
    
    """
    
    Comprehensive security tests for MCP server.
    
    """
    
    import pytest
    
    import asyncio
    
    import asyncpg
    
    from datetime import datetime, timezone
    
    import jwt
    
    from unittest.mock import Mock, patch
    
    
    
    class TestRowLevelSecurity:
    
        """Test Row Level Security implementation."""
    
        
    
        @pytest.fixture
    
        async def db_connection(self):
    
            """Database connection for testing."""
    
            conn = await asyncpg.connect(
    
                "postgresql://mcp_user:password@localhost:5432/retail_test"
    
            )
    
            yield conn
    
            await conn.close()
    
        
    
        async def test_store_context_isolation(self, db_connection):
    
            """Test that RLS properly isolates data by store."""
    
            
    
            # Set Seattle store context
    
            await db_connection.execute("SELECT retail.set_store_context('seattle')")
    
            
    
            # Get customer count
    
            seattle_customers = await db_connection.fetchval(
    
                "SELECT COUNT(*) FROM retail.customers"
    
            )
    
            
    
            # Set Redmond store context
    
            await db_connection.execute("SELECT retail.set_store_context('redmond')")
    
            
    
            # Get customer count
    
            redmond_customers = await db_connection.fetchval(
    
                "SELECT COUNT(*) FROM retail.customers"
    
            )
    
            
    
            # Verify isolation (counts should be different)
    
            assert seattle_customers != redmond_customers or (
    
                seattle_customers == 0 and redmond_customers == 0
    
            )
    
        
    
        async def test_unauthorized_store_access(self, db_connection):
    
            """Test that invalid store access is blocked."""
    
            
    
            with pytest.raises(Exception) as exc_info:
    
                await db_connection.execute("SELECT retail.set_store_context('invalid_store')")
    
            
    
            assert "Store not found" in str(exc_info.value)
    
        
    
        async def test_cross_store_data_leakage(self, db_connection):
    
            """Test that users cannot access data from other stores."""
    
            
    
            # Set context to one store
    
            await db_connection.execute("SELECT retail.set_store_context('seattle')")
    
            
    
            # Try to insert data with different store_id
    
            with pytest.raises(Exception):
    
                await db_connection.execute("""
    
                    INSERT INTO retail.customers (store_id, first_name, last_name, email)
    
                    VALUES ('redmond', 'Test', 'User', 'test@example.com')
    
                """)
    
    
    
    class TestAuthentication:
    
        """Test authentication and authorization."""
    
        
    
        def test_valid_jwt_token(self):
    
            """Test valid JWT token validation."""
    
            
    
            # Mock valid token
    
            token_payload = {
    
                'oid': 'user-123',
    
                'email': 'test@example.com',
    
                'name': 'Test User',
    
                'tid': 'tenant-123',
    
                'aud': 'app-client-id',
    
                'iss': 'https://login.microsoftonline.com/tenant-123/v2.0',
    
                'exp': int((datetime.now(timezone.utc)).timestamp()) + 3600,
    
                'iat': int((datetime.now(timezone.utc)).timestamp()),
    
                'roles': ['store_user']
    
            }
    
            
    
            # This would require mocking the JWKS endpoint
    
            # In real implementation, use proper test JWT tokens
    
            
    
        def test_expired_token_rejection(self):
    
            """Test that expired tokens are rejected."""
    
            
    
            token_payload = {
    
                'oid': 'user-123',
    
                'exp': int((datetime.now(timezone.utc)).timestamp()) - 3600,  # Expired
    
                'iat': int((datetime.now(timezone.utc)).timestamp()) - 7200
    
            }
    
            
    
            # Test would verify that expired tokens are rejected
    
            
    
        def test_invalid_audience_rejection(self):
    
            """Test that tokens with wrong audience are rejected."""
    
            
    
            token_payload = {
    
                'oid': 'user-123',
    
                'aud': 'wrong-audience',  # Invalid audience
    
                'exp': int((datetime.now(timezone.utc)).timestamp()) + 3600,
    
                'iat': int((datetime.now(timezone.utc)).timestamp())
    
            }
    
            
    
            # Test would verify that wrong audience tokens are rejected
    
    
    
    class TestAuthorization:
    
        """Test role-based authorization."""
    
        
    
        def test_role_hierarchy(self):
    
            """Test that role hierarchy works correctly."""
    
            
    
            from mcp_server.security.authorization import RoleBasedAuth
    
            
    
            # Store admin should have all permissions
    
            assert RoleBasedAuth.has_permission(['store_admin'], 'read_all')
    
            assert RoleBasedAuth.has_permission(['store_admin'], 'write_all')
    
            assert RoleBasedAuth.has_permission(['store_admin'], 'delete_all')
    
            
    
            # Store user should have limited permissions
    
            assert RoleBasedAuth.has_permission(['store_user'], 'read_products')
    
            assert not RoleBasedAuth.has_permission(['store_user'], 'delete_all')
    
            
    
            # Store readonly should have minimal permissions
    
            assert RoleBasedAuth.has_permission(['store_readonly'], 'read_products')
    
            assert not RoleBasedAuth.has_permission(['store_readonly'], 'write_transactions')
    
        
    
        def test_permission_inheritance(self):
    
            """Test that permissions are properly inherited."""
    
            
    
            from mcp_server.security.authorization import RoleBasedAuth
    
            
    
            # Manager should inherit user permissions
    
            assert RoleBasedAuth.has_permission(['store_manager'], 'read_products')
    
            assert RoleBasedAuth.has_permission(['store_manager'], 'write_transactions')
    
    
    
    # Security test runner
    
    if __name__ == "__main__":
    
        pytest.main([__file__, "-v"])
    
    

    Penetration Testing Checklist

    
    # security-test-checklist.yml
    
    penetration_testing:
    
      
    
      authentication_bypass:
    
        - name: "Test authentication bypass attempts"
    
          tests:
    
            - "Missing Authorization header"
    
            - "Malformed JWT tokens"
    
            - "Replay attack with expired tokens"
    
            - "Token signature manipulation"
    
            - "Audience/issuer manipulation"
    
        
    
      authorization_escalation:
    
        - name: "Test privilege escalation attempts"
    
          tests:
    
            - "Role manipulation in token"
    
            - "Store access boundary testing"
    
            - "Cross-tenant data access attempts"
    
            - "Administrative function access"
    
        
    
      sql_injection:
    
        - name: "Test SQL injection vulnerabilities"
    
          tests:
    
            - "Parameter injection in search queries"
    
            - "Store ID manipulation"
    
            - "JSON parameter injection"
    
            - "Union-based injection attempts"
    
        
    
      data_exposure:
    
        - name: "Test for data exposure vulnerabilities"
    
          tests:
    
            - "Error message information disclosure"
    
            - "Timing attack possibilities"
    
            - "Cross-store data leakage"
    
            - "Audit log exposure"
    
        
    
      rate_limiting:
    
        - name: "Test rate limiting and DoS protection"
    
          tests:
    
            - "Authentication endpoint flooding"
    
            - "API endpoint rate limits"
    
            - "Resource exhaustion attempts"
    
            - "Connection pool exhaustion"
    
    

    🎯 Key Takeaways

    After completing this lab, you should have:

    βœ… Multi-Tenant Security: Implemented Row Level Security for complete data isolation

    βœ… Azure Authentication: Integrated Azure Entra ID with JWT validation

    βœ… Role-Based Authorization: Configured hierarchical role and permission system

    βœ… Comprehensive Audit Logging: Established security event tracking and monitoring

    βœ… Security Testing: Implemented automated security validation tests

    βœ… Threat Monitoring: Created real-time security event detection and alerting

    πŸš€ What's Next

    Continue with Lab 03: Environment Setup to:

  • Configure development environments with security best practices
  • Set up Azure services for authentication and monitoring
  • Implement secure database connections and secrets management
  • Validate security configurations in development environments
  • πŸ“š Additional Resources

    Azure Security

  • Azure Entra ID Documentation - Complete identity platform guide
  • Azure Key Vault - Secrets management service
  • Azure Security Best Practices - Security guidance
  • Database Security

  • PostgreSQL Row Level Security - Official RLS documentation
  • Database Security Checklist - PostgreSQL security guide
  • Multi-Tenant Database Patterns - Architecture patterns
  • Security Testing

  • OWASP Testing Guide - Comprehensive security testing
  • JWT Security Best Practices - JWT security considerations
  • API Security Testing - API-specific security testing
  • ---

    Previous: Lab 01: Core Architecture Concepts

    Next: Lab 03: Environment Setup

    λ³΄μ•ˆ 및 λ©€ν‹° ν…Œλ„Œμ‹œ

    🎯 이 μ‹€μŠ΅μ—μ„œ λ‹€λ£¨λŠ” λ‚΄μš©

    이 μ‹€μŠ΅μ€ MCP μ„œλ²„μ— λŒ€ν•œ μ—”ν„°ν”„λΌμ΄μ¦ˆκΈ‰ λ³΄μ•ˆ 및 λ©€ν‹° ν…Œλ„Œμ‹œ κ΅¬ν˜„μ— λŒ€ν•œ 포괄적인 지침을 μ œκ³΅ν•©λ‹ˆλ‹€. λ―Όκ°ν•œ μ†Œλ§€ 데이터λ₯Ό λ³΄ν˜Έν•˜λ©΄μ„œ μ—¬λŸ¬ ν…Œλ„ŒνŠΈ 간에 μœ μ—°ν•œ μ ‘κ·Ό νŒ¨ν„΄μ„ κ°€λŠ₯ν•˜κ²Œ ν•˜λŠ” μ•ˆμ „ν•˜κ³  μ€€μˆ˜ν•œ μ‹œμŠ€ν…œμ„ μ„€κ³„ν•˜λŠ” 방법을 배우게 λ©λ‹ˆλ‹€.

    κ°œμš”

    고객 데이터, 결제 정보, λΉ„μ¦ˆλ‹ˆμŠ€ μΈν…”λ¦¬μ „μŠ€λ₯Ό μ²˜λ¦¬ν•˜λŠ” μ†Œλ§€ μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ—μ„œ λ³΄μ•ˆμ€ 맀우 μ€‘μš”ν•©λ‹ˆλ‹€. 이 μ‹€μŠ΅μ—μ„œλŠ” 인증 및 κΆŒν•œ λΆ€μ—¬λΆ€ν„° 데이터 격리 및 μ€€μˆ˜ λͺ¨λ‹ˆν„°λ§κΉŒμ§€ μ™„μ „ν•œ λ³΄μ•ˆ μ•„ν‚€ν…μ²˜λ₯Ό λ‹€λ£Ήλ‹ˆλ‹€.

    Azure ID μ„œλΉ„μŠ€, PostgreSQL ν–‰ μˆ˜μ€€ λ³΄μ•ˆ, μ• ν”Œλ¦¬μΌ€μ΄μ…˜ μˆ˜μ€€ μ œμ–΄, 포괄적인 감사 둜그λ₯Ό κ²°ν•©ν•œ 심측 λ°©μ–΄ μ „λž΅μ„ κ΅¬ν˜„ν•˜μ—¬ κ°•λ ₯ν•˜κ³  μ€€μˆ˜ν•œ ν”Œλž«νΌμ„ λ§Œλ“­λ‹ˆλ‹€.

    ν•™μŠ΅ λͺ©ν‘œ

    이 μ‹€μŠ΅μ„ μ™„λ£Œν•˜λ©΄ λ‹€μŒμ„ μˆ˜ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€:

  • κ΅¬ν˜„: λ©€ν‹° ν…Œλ„ŒνŠΈ 데이터 격리λ₯Ό μœ„ν•œ μ—”ν„°ν”„λΌμ΄μ¦ˆκΈ‰ ν–‰ μˆ˜μ€€ λ³΄μ•ˆ
  • 섀계: Azureλ₯Ό ν™œμš©ν•œ μ•ˆμ „ν•œ 인증 및 κΆŒν•œ λΆ€μ—¬ νŒ¨ν„΄
  • ꡬ성: μ€€μˆ˜ μš”κ΅¬ 사항을 μœ„ν•œ 포괄적인 감사 둜그
  • 적용: λͺ¨λ“  μ• ν”Œλ¦¬μΌ€μ΄μ…˜ κ³„μΈ΅μ—μ„œ 심측 λ°©μ–΄ λ³΄μ•ˆ μ „λž΅
  • 검증: 체계적인 ν…ŒμŠ€νŠΈλ₯Ό 톡해 λ³΄μ•ˆ κ΅¬ν˜„ 확인
  • λͺ¨λ‹ˆν„°λ§: λ³΄μ•ˆ 이벀트λ₯Ό κ°μ‹œν•˜κ³  잠재적 μœ„ν˜‘μ— λŒ€μ‘
  • πŸ” λ©€ν‹° ν…Œλ„ŒνŠΈ λ³΄μ•ˆ μ•„ν‚€ν…μ²˜

    λ³΄μ•ˆ 계측 κ°œμš”

    
    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    
    β”‚               Azure Front Door                  β”‚ ← WAF, DDoS Protection
    
    β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
    
    β”‚              Application Gateway                β”‚ ← SSL Termination, Rate Limiting
    
    β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
    
    β”‚                MCP Server                       β”‚ ← Authentication, Authorization
    
    β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
    
    β”‚  β”‚           Connection Layer                  β”‚ ← Connection Pooling, Circuit Breakers
    
    β”‚  β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
    
    β”‚  β”‚         Business Logic Layer               β”‚ ← Input Validation, Business Rules
    
    β”‚  β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
    
    β”‚  β”‚           Data Access Layer                β”‚ ← Query Sanitization, RLS Context
    
    β”‚  └──────────────────────────────────────────────
    
    β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
    
    β”‚              PostgreSQL RLS                    β”‚ ← Row Level Security, Audit Triggers
    
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
    
    

    λ©€ν‹° ν…Œλ„Œμ‹œ λͺ¨λΈ

    우리의 κ΅¬ν˜„μ€ 곡유 λ°μ΄ν„°λ² μ΄μŠ€, 곡유 μŠ€ν‚€λ§ˆ λͺ¨λΈκ³Ό ν–‰ μˆ˜μ€€ λ³΄μ•ˆμ„ μ‚¬μš©ν•©λ‹ˆλ‹€:

    μž₯점:

  • λΉ„μš© 효율적인 μžμ› ν™œμš©
  • κ°„μ†Œν™”λœ μœ μ§€λ³΄μˆ˜ 및 μ—…λ°μ΄νŠΈ
  • RLSλ₯Ό ν†΅ν•œ κ°•λ ₯ν•œ 데이터 격리
  • μ€€μˆ˜ μΉœν™”μ μΈ 감사 기둝
  • 단점:

  • μ‹ μ€‘ν•œ RLS μ •μ±… 섀계 ν•„μš”
  • μŠ€ν‚€λ§ˆ 변경이 λͺ¨λ“  ν…Œλ„ŒνŠΈμ— 영ν–₯을 λ―ΈμΉ¨
  • κ²¬κ³ ν•œ λ°±μ—…/볡원 절차 ν•„μš”
  • πŸ›‘οΈ ν–‰ μˆ˜μ€€ λ³΄μ•ˆ κ΅¬ν˜„

    RLS 기초

    
    -- Enable RLS on all multi-tenant tables
    
    ALTER TABLE retail.customers ENABLE ROW LEVEL SECURITY;
    
    ALTER TABLE retail.products ENABLE ROW LEVEL SECURITY;
    
    ALTER TABLE retail.sales_transactions ENABLE ROW LEVEL SECURITY;
    
    ALTER TABLE retail.sales_transaction_items ENABLE ROW LEVEL SECURITY;
    
    ALTER TABLE retail.product_embeddings ENABLE ROW LEVEL SECURITY;
    
    
    
    -- Create application role for MCP server
    
    CREATE ROLE mcp_user LOGIN;
    
    GRANT USAGE ON SCHEMA retail TO mcp_user;
    
    GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA retail TO mcp_user;
    
    

    μŠ€ν† μ–΄ μ»¨ν…μŠ€νŠΈ 관리

    
    -- Function to securely set store context
    
    CREATE OR REPLACE FUNCTION retail.set_store_context(store_id_param VARCHAR(50))
    
    RETURNS void
    
    LANGUAGE plpgsql
    
    SECURITY DEFINER
    
    SET search_path = retail, pg_temp
    
    AS $$
    
    DECLARE
    
        user_info RECORD;
    
    BEGIN
    
        -- Validate store exists and is active
    
        SELECT store_id, store_name, is_active 
    
        INTO user_info
    
        FROM retail.stores 
    
        WHERE store_id = store_id_param;
    
        
    
        IF NOT FOUND THEN
    
            RAISE EXCEPTION 'Store not found: %', store_id_param
    
                USING ERRCODE = 'invalid_parameter_value',
    
                      HINT = 'Verify store ID and ensure it exists in the system';
    
        END IF;
    
        
    
        IF NOT user_info.is_active THEN
    
            RAISE EXCEPTION 'Store is inactive: %', store_id_param
    
                USING ERRCODE = 'insufficient_privilege',
    
                      HINT = 'Contact administrator to activate store';
    
        END IF;
    
        
    
        -- Set the secure context
    
        PERFORM set_config('app.current_store_id', store_id_param, false);
    
        PERFORM set_config('app.store_name', user_info.store_name, false);
    
        PERFORM set_config('app.context_set_at', extract(epoch from current_timestamp)::text, false);
    
        
    
        -- Log context change for audit
    
        INSERT INTO retail.security_audit_log (
    
            event_type,
    
            user_name,
    
            store_id,
    
            ip_address,
    
            user_agent,
    
            details,
    
            severity
    
        ) VALUES (
    
            'store_context_set',
    
            current_user,
    
            store_id_param,
    
            inet_client_addr()::text,
    
            current_setting('application_name', true),
    
            jsonb_build_object(
    
                'store_name', user_info.store_name,
    
                'timestamp', current_timestamp,
    
                'session_id', pg_backend_pid()
    
            ),
    
            'INFO'
    
        );
    
    END;
    
    $$;
    
    
    
    -- Grant execute to MCP user
    
    GRANT EXECUTE ON FUNCTION retail.set_store_context TO mcp_user;
    
    

    RLS μ •μ±…

    
    -- Customers RLS Policy
    
    CREATE POLICY customers_store_isolation ON retail.customers
    
        FOR ALL
    
        TO mcp_user
    
        USING (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
        )
    
        WITH CHECK (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
        );
    
    
    
    -- Products RLS Policy with additional business rules
    
    CREATE POLICY products_store_isolation ON retail.products
    
        FOR ALL
    
        TO mcp_user
    
        USING (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
            AND is_active = TRUE  -- Additional business rule
    
        )
    
        WITH CHECK (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
        );
    
    
    
    -- Sales Transactions RLS Policy
    
    CREATE POLICY sales_transactions_store_isolation ON retail.sales_transactions
    
        FOR ALL
    
        TO mcp_user
    
        USING (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
        )
    
        WITH CHECK (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
        );
    
    
    
    -- Transaction Items RLS Policy (via join)
    
    CREATE POLICY sales_transaction_items_store_isolation ON retail.sales_transaction_items
    
        FOR ALL
    
        TO mcp_user
    
        USING (
    
            transaction_id IN (
    
                SELECT transaction_id 
    
                FROM retail.sales_transactions 
    
                WHERE store_id = current_setting('app.current_store_id', true)
    
            )
    
        )
    
        WITH CHECK (
    
            transaction_id IN (
    
                SELECT transaction_id 
    
                FROM retail.sales_transactions 
    
                WHERE store_id = current_setting('app.current_store_id', true)
    
            )
    
        );
    
    
    
    -- Product Embeddings RLS Policy
    
    CREATE POLICY product_embeddings_store_isolation ON retail.product_embeddings
    
        FOR ALL
    
        TO mcp_user
    
        USING (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
        )
    
        WITH CHECK (
    
            store_id = current_setting('app.current_store_id', true)
    
            AND current_setting('app.current_store_id', true) IS NOT NULL
    
            AND current_setting('app.current_store_id', true) != ''
    
        );
    
    

    RLS ν…ŒμŠ€νŠΈ 및 검증

    
    -- Test RLS policies with different store contexts
    
    DO $$
    
    DECLARE
    
        test_result RECORD;
    
        customer_count INTEGER;
    
        product_count INTEGER;
    
    BEGIN
    
        -- Test Seattle store context
    
        PERFORM retail.set_store_context('seattle');
    
        
    
        SELECT COUNT(*) INTO customer_count FROM retail.customers;
    
        SELECT COUNT(*) INTO product_count FROM retail.products;
    
        
    
        RAISE NOTICE 'Seattle store - Customers: %, Products: %', customer_count, product_count;
    
        
    
        -- Test Redmond store context
    
        PERFORM retail.set_store_context('redmond');
    
        
    
        SELECT COUNT(*) INTO customer_count FROM retail.customers;
    
        SELECT COUNT(*) INTO product_count FROM retail.products;
    
        
    
        RAISE NOTICE 'Redmond store - Customers: %, Products: %', customer_count, product_count;
    
        
    
        -- Verify data isolation
    
        IF customer_count > 0 AND product_count > 0 THEN
    
            RAISE NOTICE 'RLS policies are working correctly';
    
        ELSE
    
            RAISE WARNING 'RLS policies may not be configured correctly';
    
        END IF;
    
    END;
    
    $$;
    
    

    πŸ”‘ 인증 및 κΆŒν•œ λΆ€μ—¬

    Azure Entra ID 톡합

    
    # mcp_server/security/authentication.py
    
    """
    
    Azure Entra ID authentication for MCP server.
    
    """
    
    import os
    
    import jwt
    
    import aiohttp
    
    import asyncio
    
    from typing import Dict, Optional, List
    
    from datetime import datetime, timezone
    
    from azure.identity.aio import DefaultAzureCredential
    
    from azure.keyvault.secrets.aio import SecretClient
    
    import logging
    
    
    
    logger = logging.getLogger(__name__)
    
    
    
    class AzureAuthenticator:
    
        """Handle Azure Entra ID authentication and token validation."""
    
        
    
        def __init__(self):
    
            self.tenant_id = os.getenv('AZURE_TENANT_ID')
    
            self.client_id = os.getenv('AZURE_CLIENT_ID')
    
            self.audience = os.getenv('AZURE_AUDIENCE', self.client_id)
    
            self.issuer = f"https://login.microsoftonline.com/{self.tenant_id}/v2.0"
    
            
    
            # Cache for JWKS (JSON Web Key Set)
    
            self._jwks_cache = None
    
            self._jwks_cache_expiry = None
    
            
    
            # Key Vault for secrets
    
            self.key_vault_url = os.getenv('AZURE_KEY_VAULT_URL')
    
            self.credential = DefaultAzureCredential()
    
            
    
            if self.key_vault_url:
    
                self.secret_client = SecretClient(
    
                    vault_url=self.key_vault_url,
    
                    credential=self.credential
    
                )
    
        
    
        async def validate_token(self, token: str) -> Dict:
    
            """Validate JWT token from Azure Entra ID."""
    
            
    
            try:
    
                # Get signing keys
    
                signing_keys = await self._get_signing_keys()
    
                
    
                # Decode token header to get key ID
    
                unverified_header = jwt.get_unverified_header(token)
    
                key_id = unverified_header.get('kid')
    
                
    
                if not key_id:
    
                    raise ValueError("Token missing key ID")
    
                
    
                # Find the corresponding key
    
                signing_key = None
    
                for key in signing_keys:
    
                    if key['kid'] == key_id:
    
                        signing_key = jwt.algorithms.RSAAlgorithm.from_jwk(key)
    
                        break
    
                
    
                if not signing_key:
    
                    raise ValueError(f"Unable to find signing key for kid: {key_id}")
    
                
    
                # Validate and decode token
    
                payload = jwt.decode(
    
                    token,
    
                    signing_key,
    
                    algorithms=['RS256'],
    
                    audience=self.audience,
    
                    issuer=self.issuer,
    
                    options={
    
                        'verify_exp': True,
    
                        'verify_aud': True,
    
                        'verify_iss': True
    
                    }
    
                )
    
                
    
                # Extract user information
    
                user_info = self._extract_user_info(payload)
    
                
    
                # Log successful authentication
    
                logger.info(
    
                    "User authenticated successfully",
    
                    extra={
    
                        'user_id': user_info['user_id'],
    
                        'email': user_info.get('email'),
    
                        'tenant_id': payload.get('tid')
    
                    }
    
                )
    
                
    
                return user_info
    
                
    
            except jwt.ExpiredSignatureError:
    
                logger.warning("Token has expired")
    
                raise ValueError("Token has expired")
    
            except jwt.InvalidAudienceError:
    
                logger.warning(f"Invalid audience in token. Expected: {self.audience}")
    
                raise ValueError("Invalid token audience")
    
            except jwt.InvalidIssuerError:
    
                logger.warning(f"Invalid issuer in token. Expected: {self.issuer}")
    
                raise ValueError("Invalid token issuer")
    
            except Exception as e:
    
                logger.error(f"Token validation failed: {str(e)}")
    
                raise ValueError(f"Token validation failed: {str(e)}")
    
        
    
        async def _get_signing_keys(self) -> List[Dict]:
    
            """Get JWKS from Azure Entra ID with caching."""
    
            
    
            current_time = datetime.now(timezone.utc)
    
            
    
            # Check if cache is valid
    
            if (self._jwks_cache and self._jwks_cache_expiry and 
    
                current_time < self._jwks_cache_expiry):
    
                return self._jwks_cache
    
            
    
            # Fetch new JWKS
    
            jwks_url = f"{self.issuer}/keys"
    
            
    
            async with aiohttp.ClientSession() as session:
    
                async with session.get(jwks_url) as response:
    
                    if response.status != 200:
    
                        raise Exception(f"Failed to fetch JWKS: {response.status}")
    
                    
    
                    jwks_data = await response.json()
    
                    
    
            # Cache for 1 hour
    
            self._jwks_cache = jwks_data['keys']
    
            self._jwks_cache_expiry = current_time.replace(
    
                hour=current_time.hour + 1
    
            )
    
            
    
            return self._jwks_cache
    
        
    
        def _extract_user_info(self, payload: Dict) -> Dict:
    
            """Extract user information from JWT payload."""
    
            
    
            return {
    
                'user_id': payload.get('oid') or payload.get('sub'),
    
                'email': payload.get('email') or payload.get('preferred_username'),
    
                'name': payload.get('name'),
    
                'tenant_id': payload.get('tid'),
    
                'roles': payload.get('roles', []),
    
                'groups': payload.get('groups', []),
    
                'app_roles': payload.get('app_roles', []),
    
                'scope': payload.get('scp', '').split() if payload.get('scp') else [],
    
                'expires_at': datetime.fromtimestamp(payload['exp'], timezone.utc),
    
                'issued_at': datetime.fromtimestamp(payload['iat'], timezone.utc)
    
            }
    
        
    
        async def get_user_store_access(self, user_id: str) -> List[str]:
    
            """Get list of stores the user has access to."""
    
            
    
            try:
    
                # This would typically query your user/store mapping
    
                # For demo, we'll use a simple Key Vault secret
    
                secret_name = f"user-{user_id}-stores"
    
                
    
                if self.secret_client:
    
                    secret = await self.secret_client.get_secret(secret_name)
    
                    store_list = secret.value.split(',')
    
                    return [store.strip() for store in store_list if store.strip()]
    
                
    
                # Fallback: return default store access
    
                logger.warning(f"No store mapping found for user {user_id}, using default")
    
                return ['seattle']  # Default store access
    
                
    
            except Exception as e:
    
                logger.error(f"Failed to get store access for user {user_id}: {e}")
    
                return []  # No access if we can't determine stores
    
    
    
    # Global authenticator instance
    
    azure_authenticator = AzureAuthenticator()
    
    

    κΆŒν•œ λΆ€μ—¬ 미듀웨어

    
    # mcp_server/security/authorization.py
    
    """
    
    Authorization middleware and decorators for MCP server.
    
    """
    
    import functools
    
    from typing import Dict, List, Optional, Callable, Any
    
    from fastapi import HTTPException, status, Request
    
    from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
    
    import logging
    
    
    
    logger = logging.getLogger(__name__)
    
    
    
    security = HTTPBearer()
    
    
    
    class AuthorizationError(Exception):
    
        """Custom authorization error."""
    
        pass
    
    
    
    class RoleBasedAuth:
    
        """Role-based access control implementation."""
    
        
    
        # Define role hierarchy
    
        ROLE_HIERARCHY = {
    
            'store_admin': ['store_manager', 'store_user', 'store_readonly'],
    
            'store_manager': ['store_user', 'store_readonly'],
    
            'store_user': ['store_readonly'],
    
            'store_readonly': []
    
        }
    
        
    
        # Define permissions for each role
    
        ROLE_PERMISSIONS = {
    
            'store_admin': [
    
                'read_all', 'write_all', 'delete_all', 'manage_users'
    
            ],
    
            'store_manager': [
    
                'read_all', 'write_transactions', 'write_inventory', 'read_reports'
    
            ],
    
            'store_user': [
    
                'read_products', 'read_customers', 'write_transactions'
    
            ],
    
            'store_readonly': [
    
                'read_products', 'read_basic_reports'
    
            ]
    
        }
    
        
    
        @classmethod
    
        def has_permission(cls, user_roles: List[str], required_permission: str) -> bool:
    
            """Check if user has required permission."""
    
            
    
            user_permissions = set()
    
            
    
            for role in user_roles:
    
                # Add direct permissions
    
                user_permissions.update(cls.ROLE_PERMISSIONS.get(role, []))
    
                
    
                # Add inherited permissions
    
                inherited_roles = cls.ROLE_HIERARCHY.get(role, [])
    
                for inherited_role in inherited_roles:
    
                    user_permissions.update(cls.ROLE_PERMISSIONS.get(inherited_role, []))
    
            
    
            return required_permission in user_permissions
    
        
    
        @classmethod
    
        def get_user_stores(cls, user_info: Dict) -> List[str]:
    
            """Extract stores user has access to from user info."""
    
            
    
            # This would typically come from your user management system
    
            # For demo, we'll extract from custom claims or groups
    
            
    
            stores = []
    
            
    
            # Check for direct store assignments in groups
    
            for group in user_info.get('groups', []):
    
                if group.startswith('store_'):
    
                    store_id = group.replace('store_', '')
    
                    stores.append(store_id)
    
            
    
            # Check for app-specific roles
    
            for role in user_info.get('app_roles', []):
    
                if 'store:' in role:
    
                    _, store_id = role.split('store:', 1)
    
                    stores.append(store_id)
    
            
    
            return list(set(stores))  # Remove duplicates
    
    
    
    def require_auth(required_permission: str = None, require_store_access: bool = True):
    
        """Decorator to require authentication and authorization."""
    
        
    
        def decorator(func: Callable) -> Callable:
    
            @functools.wraps(func)
    
            async def wrapper(*args, **kwargs):
    
                # Extract request from args (FastAPI dependency injection)
    
                request = None
    
                for arg in args:
    
                    if isinstance(arg, Request):
    
                        request = arg
    
                        break
    
                
    
                if not request:
    
                    raise HTTPException(
    
                        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
    
                        detail="Request object not found"
    
                    )
    
                
    
                # Get authorization header
    
                auth_header = request.headers.get('Authorization')
    
                if not auth_header or not auth_header.startswith('Bearer '):
    
                    raise HTTPException(
    
                        status_code=status.HTTP_401_UNAUTHORIZED,
    
                        detail="Missing or invalid authorization header",
    
                        headers={"WWW-Authenticate": "Bearer"}
    
                    )
    
                
    
                token = auth_header.split(' ')[1]
    
                
    
                try:
    
                    # Validate token
    
                    user_info = await azure_authenticator.validate_token(token)
    
                    
    
                    # Check required permission
    
                    if required_permission:
    
                        user_roles = user_info.get('roles', [])
    
                        if not RoleBasedAuth.has_permission(user_roles, required_permission):
    
                            raise HTTPException(
    
                                status_code=status.HTTP_403_FORBIDDEN,
    
                                detail=f"Insufficient permissions. Required: {required_permission}"
    
                            )
    
                    
    
                    # Check store access
    
                    if require_store_access:
    
                        user_stores = RoleBasedAuth.get_user_stores(user_info)
    
                        if not user_stores:
    
                            raise HTTPException(
    
                                status_code=status.HTTP_403_FORBIDDEN,
    
                                detail="No store access configured for user"
    
                            )
    
                        
    
                        # Set default store context (first accessible store)
    
                        request.state.current_store = user_stores[0]
    
                        request.state.accessible_stores = user_stores
    
                    
    
                    # Add user info to request state
    
                    request.state.user_info = user_info
    
                    request.state.user_id = user_info['user_id']
    
                    
    
                    # Call the original function
    
                    return await func(*args, **kwargs)
    
                    
    
                except ValueError as e:
    
                    raise HTTPException(
    
                        status_code=status.HTTP_401_UNAUTHORIZED,
    
                        detail=str(e),
    
                        headers={"WWW-Authenticate": "Bearer"}
    
                    )
    
                except AuthorizationError as e:
    
                    raise HTTPException(
    
                        status_code=status.HTTP_403_FORBIDDEN,
    
                        detail=str(e)
    
                    )
    
            
    
            return wrapper
    
        return decorator
    
    
    
    def require_store_context(store_param: str = 'store_id'):
    
        """Decorator to validate and set store context."""
    
        
    
        def decorator(func: Callable) -> Callable:
    
            @functools.wraps(func)
    
            async def wrapper(*args, **kwargs):
    
                # Get store_id from kwargs
    
                store_id = kwargs.get(store_param)
    
                
    
                if not store_id:
    
                    raise HTTPException(
    
                        status_code=status.HTTP_400_BAD_REQUEST,
    
                        detail=f"Missing required parameter: {store_param}"
    
                    )
    
                
    
                # Extract request from args
    
                request = None
    
                for arg in args:
    
                    if isinstance(arg, Request):
    
                        request = arg
    
                        break
    
                
    
                if not request or not hasattr(request.state, 'accessible_stores'):
    
                    raise HTTPException(
    
                        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
    
                        detail="Authentication required before store context validation"
    
                    )
    
                
    
                # Validate user has access to requested store
    
                if store_id not in request.state.accessible_stores:
    
                    raise HTTPException(
    
                        status_code=status.HTTP_403_FORBIDDEN,
    
                        detail=f"Access denied to store: {store_id}"
    
                    )
    
                
    
                # Set store context in request state
    
                request.state.current_store = store_id
    
                
    
                return await func(*args, **kwargs)
    
            
    
            return wrapper
    
        return decorator
    
    

    πŸ” λ³΄μ•ˆ 감사 및 μ€€μˆ˜

    포괄적인 감사 둜그

    
    -- Security audit log table
    
    CREATE TABLE retail.security_audit_log (
    
        log_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    
        event_type VARCHAR(100) NOT NULL,
    
        user_name VARCHAR(100) NOT NULL,
    
        user_id VARCHAR(100),
    
        store_id VARCHAR(50),
    
        ip_address INET,
    
        user_agent TEXT,
    
        request_id VARCHAR(100),
    
        session_id VARCHAR(100),
    
        resource_type VARCHAR(100),
    
        resource_id VARCHAR(100),
    
        action VARCHAR(50) NOT NULL,
    
        success BOOLEAN NOT NULL DEFAULT TRUE,
    
        failure_reason TEXT,
    
        details JSONB,
    
        severity VARCHAR(20) DEFAULT 'INFO',
    
        created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    
        
    
        -- Ensure proper indexing for security queries
    
        CONSTRAINT valid_severity CHECK (severity IN ('DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL'))
    
    );
    
    
    
    -- Indexes for security audit queries
    
    CREATE INDEX idx_security_audit_event_type ON retail.security_audit_log(event_type);
    
    CREATE INDEX idx_security_audit_user_name ON retail.security_audit_log(user_name);
    
    CREATE INDEX idx_security_audit_store_id ON retail.security_audit_log(store_id);
    
    CREATE INDEX idx_security_audit_created_at ON retail.security_audit_log(created_at);
    
    CREATE INDEX idx_security_audit_success ON retail.security_audit_log(success);
    
    CREATE INDEX idx_security_audit_severity ON retail.security_audit_log(severity);
    
    CREATE INDEX idx_security_audit_details ON retail.security_audit_log USING GIN(details);
    
    
    
    -- Function to log security events
    
    CREATE OR REPLACE FUNCTION retail.log_security_event(
    
        p_event_type VARCHAR(100),
    
        p_user_name VARCHAR(100),
    
        p_user_id VARCHAR(100) DEFAULT NULL,
    
        p_store_id VARCHAR(50) DEFAULT NULL,
    
        p_ip_address TEXT DEFAULT NULL,
    
        p_action VARCHAR(50) DEFAULT 'unknown',
    
        p_success BOOLEAN DEFAULT TRUE,
    
        p_failure_reason TEXT DEFAULT NULL,
    
        p_details JSONB DEFAULT NULL,
    
        p_severity VARCHAR(20) DEFAULT 'INFO'
    
    )
    
    RETURNS UUID
    
    LANGUAGE plpgsql
    
    SECURITY DEFINER
    
    AS $$
    
    DECLARE
    
        log_id UUID;
    
    BEGIN
    
        INSERT INTO retail.security_audit_log (
    
            event_type,
    
            user_name,
    
            user_id,
    
            store_id,
    
            ip_address,
    
            action,
    
            success,
    
            failure_reason,
    
            details,
    
            severity
    
        ) VALUES (
    
            p_event_type,
    
            p_user_name,
    
            p_user_id,
    
            p_store_id,
    
            p_ip_address::INET,
    
            p_action,
    
            p_success,
    
            p_failure_reason,
    
            p_details,
    
            p_severity
    
        ) RETURNING log_id INTO log_id;
    
        
    
        RETURN log_id;
    
    END;
    
    $$;
    
    
    
    -- Grant execute to MCP user
    
    GRANT EXECUTE ON FUNCTION retail.log_security_event TO mcp_user;
    
    

    λ³΄μ•ˆ λͺ¨λ‹ˆν„°λ§ λ·°

    
    -- Failed authentication attempts
    
    CREATE VIEW retail.security_failed_auth AS
    
    SELECT 
    
        event_type,
    
        user_name,
    
        ip_address,
    
        COUNT(*) as attempt_count,
    
        MIN(created_at) as first_attempt,
    
        MAX(created_at) as last_attempt,
    
        ARRAY_AGG(DISTINCT failure_reason) as failure_reasons
    
    FROM retail.security_audit_log
    
    WHERE success = FALSE 
    
      AND event_type IN ('authentication_failed', 'token_validation_failed')
    
      AND created_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
    
    GROUP BY event_type, user_name, ip_address
    
    HAVING COUNT(*) >= 3  -- 3 or more failures
    
    ORDER BY attempt_count DESC, last_attempt DESC;
    
    
    
    -- Suspicious access patterns
    
    CREATE VIEW retail.security_suspicious_access AS
    
    SELECT 
    
        user_name,
    
        user_id,
    
        COUNT(DISTINCT ip_address) as ip_count,
    
        COUNT(DISTINCT store_id) as store_count,
    
        ARRAY_AGG(DISTINCT ip_address::TEXT) as ip_addresses,
    
        ARRAY_AGG(DISTINCT store_id) as stores_accessed,
    
        MIN(created_at) as first_access,
    
        MAX(created_at) as last_access
    
    FROM retail.security_audit_log
    
    WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
    
      AND success = TRUE
    
    GROUP BY user_name, user_id
    
    HAVING COUNT(DISTINCT ip_address) > 3  -- Access from multiple IPs
    
       OR COUNT(DISTINCT store_id) > 2     -- Access to multiple stores
    
    ORDER BY ip_count DESC, store_count DESC;
    
    
    
    -- Data access patterns
    
    CREATE VIEW retail.security_data_access_summary AS
    
    SELECT 
    
        DATE_TRUNC('hour', created_at) as access_hour,
    
        store_id,
    
        resource_type,
    
        action,
    
        COUNT(*) as access_count,
    
        COUNT(DISTINCT user_id) as unique_users
    
    FROM retail.security_audit_log
    
    WHERE resource_type IS NOT NULL
    
      AND created_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
    
    GROUP BY DATE_TRUNC('hour', created_at), store_id, resource_type, action
    
    ORDER BY access_hour DESC, access_count DESC;
    
    

    λ³΄μ•ˆ 이벀트 λͺ¨λ‹ˆν„°λ§

    
    # mcp_server/security/monitoring.py
    
    """
    
    Security monitoring and alerting for MCP server.
    
    """
    
    import asyncio
    
    import asyncpg
    
    from typing import Dict, List, Any
    
    from datetime import datetime, timedelta
    
    from dataclasses import dataclass
    
    import logging
    
    
    
    logger = logging.getLogger(__name__)
    
    
    
    @dataclass
    
    class SecurityAlert:
    
        """Security alert data structure."""
    
        alert_type: str
    
        severity: str
    
        message: str
    
        details: Dict[str, Any]
    
        timestamp: datetime
    
    
    
    class SecurityMonitor:
    
        """Monitor security events and generate alerts."""
    
        
    
        def __init__(self, db_connection_string: str):
    
            self.db_connection_string = db_connection_string
    
            self.alert_handlers = []
    
            
    
            # Alert thresholds
    
            self.thresholds = {
    
                'failed_auth_attempts': 5,      # per user per hour
    
                'multiple_ip_access': 3,        # different IPs per user per hour
    
                'excessive_data_access': 1000,  # queries per user per hour
    
                'privilege_escalation': 1,      # any attempt
    
                'unauthorized_store_access': 1  # any attempt
    
            }
    
        
    
        async def start_monitoring(self):
    
            """Start security monitoring loop."""
    
            logger.info("Starting security monitoring")
    
            
    
            while True:
    
                try:
    
                    await self._check_security_events()
    
                    await asyncio.sleep(300)  # Check every 5 minutes
    
                except Exception as e:
    
                    logger.error(f"Security monitoring error: {e}")
    
                    await asyncio.sleep(60)  # Short retry on error
    
        
    
        async def _check_security_events(self):
    
            """Check for security events and generate alerts."""
    
            
    
            conn = await asyncpg.connect(self.db_connection_string)
    
            
    
            try:
    
                # Check failed authentication attempts
    
                await self._check_failed_auth(conn)
    
                
    
                # Check suspicious access patterns
    
                await self._check_suspicious_access(conn)
    
                
    
                # Check data access anomalies
    
                await self._check_data_access_anomalies(conn)
    
                
    
                # Check unauthorized access attempts
    
                await self._check_unauthorized_access(conn)
    
                
    
            finally:
    
                await conn.close()
    
        
    
        async def _check_failed_auth(self, conn):
    
            """Check for excessive failed authentication attempts."""
    
            
    
            query = """
    
            SELECT 
    
                user_name,
    
                ip_address,
    
                COUNT(*) as attempt_count,
    
                MAX(created_at) as last_attempt
    
            FROM retail.security_audit_log
    
            WHERE success = FALSE 
    
              AND event_type IN ('authentication_failed', 'token_validation_failed')
    
              AND created_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
    
            GROUP BY user_name, ip_address
    
            HAVING COUNT(*) >= $1
    
            """
    
            
    
            results = await conn.fetch(query, self.thresholds['failed_auth_attempts'])
    
            
    
            for record in results:
    
                alert = SecurityAlert(
    
                    alert_type='failed_authentication',
    
                    severity='HIGH',
    
                    message=f"Excessive failed login attempts for user {record['user_name']}",
    
                    details={
    
                        'user_name': record['user_name'],
    
                        'ip_address': str(record['ip_address']),
    
                        'attempt_count': record['attempt_count'],
    
                        'last_attempt': record['last_attempt'].isoformat()
    
                    },
    
                    timestamp=datetime.now()
    
                )
    
                
    
                await self._send_alert(alert)
    
        
    
        async def _check_suspicious_access(self, conn):
    
            """Check for suspicious access patterns."""
    
            
    
            query = """
    
            SELECT 
    
                user_name,
    
                user_id,
    
                COUNT(DISTINCT ip_address) as ip_count,
    
                ARRAY_AGG(DISTINCT ip_address::TEXT) as ip_addresses
    
            FROM retail.security_audit_log
    
            WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
    
              AND success = TRUE
    
            GROUP BY user_name, user_id
    
            HAVING COUNT(DISTINCT ip_address) >= $1
    
            """
    
            
    
            results = await conn.fetch(query, self.thresholds['multiple_ip_access'])
    
            
    
            for record in results:
    
                alert = SecurityAlert(
    
                    alert_type='suspicious_access',
    
                    severity='MEDIUM',
    
                    message=f"User {record['user_name']} accessed from multiple IP addresses",
    
                    details={
    
                        'user_name': record['user_name'],
    
                        'user_id': record['user_id'],
    
                        'ip_count': record['ip_count'],
    
                        'ip_addresses': record['ip_addresses']
    
                    },
    
                    timestamp=datetime.now()
    
                )
    
                
    
                await self._send_alert(alert)
    
        
    
        async def _check_unauthorized_access(self, conn):
    
            """Check for unauthorized store access attempts."""
    
            
    
            query = """
    
            SELECT 
    
                user_name,
    
                user_id,
    
                store_id,
    
                failure_reason,
    
                created_at
    
            FROM retail.security_audit_log
    
            WHERE success = FALSE 
    
              AND event_type = 'unauthorized_store_access'
    
              AND created_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
    
            """
    
            
    
            results = await conn.fetch(query)
    
            
    
            for record in results:
    
                alert = SecurityAlert(
    
                    alert_type='unauthorized_access',
    
                    severity='HIGH',
    
                    message=f"Unauthorized store access attempt by {record['user_name']}",
    
                    details={
    
                        'user_name': record['user_name'],
    
                        'user_id': record['user_id'],
    
                        'store_id': record['store_id'],
    
                        'failure_reason': record['failure_reason'],
    
                        'timestamp': record['created_at'].isoformat()
    
                    },
    
                    timestamp=datetime.now()
    
                )
    
                
    
                await self._send_alert(alert)
    
        
    
        async def _send_alert(self, alert: SecurityAlert):
    
            """Send security alert to all configured handlers."""
    
            
    
            logger.warning(
    
                f"Security Alert: {alert.alert_type} - {alert.message}",
    
                extra={'alert_details': alert.details}
    
            )
    
            
    
            # Send to configured alert handlers
    
            for handler in self.alert_handlers:
    
                try:
    
                    await handler.send_alert(alert)
    
                except Exception as e:
    
                    logger.error(f"Failed to send alert via {handler.__class__.__name__}: {e}")
    
        
    
        def add_alert_handler(self, handler):
    
            """Add alert handler."""
    
            self.alert_handlers.append(handler)
    
    

    πŸ§ͺ λ³΄μ•ˆ ν…ŒμŠ€νŠΈ 및 검증

    μžλ™ν™”λœ λ³΄μ•ˆ ν…ŒμŠ€νŠΈ

    
    # tests/security/test_security.py
    
    """
    
    Comprehensive security tests for MCP server.
    
    """
    
    import pytest
    
    import asyncio
    
    import asyncpg
    
    from datetime import datetime, timezone
    
    import jwt
    
    from unittest.mock import Mock, patch
    
    
    
    class TestRowLevelSecurity:
    
        """Test Row Level Security implementation."""
    
        
    
        @pytest.fixture
    
        async def db_connection(self):
    
            """Database connection for testing."""
    
            conn = await asyncpg.connect(
    
                "postgresql://mcp_user:password@localhost:5432/retail_test"
    
            )
    
            yield conn
    
            await conn.close()
    
        
    
        async def test_store_context_isolation(self, db_connection):
    
            """Test that RLS properly isolates data by store."""
    
            
    
            # Set Seattle store context
    
            await db_connection.execute("SELECT retail.set_store_context('seattle')")
    
            
    
            # Get customer count
    
            seattle_customers = await db_connection.fetchval(
    
                "SELECT COUNT(*) FROM retail.customers"
    
            )
    
            
    
            # Set Redmond store context
    
            await db_connection.execute("SELECT retail.set_store_context('redmond')")
    
            
    
            # Get customer count
    
            redmond_customers = await db_connection.fetchval(
    
                "SELECT COUNT(*) FROM retail.customers"
    
            )
    
            
    
            # Verify isolation (counts should be different)
    
            assert seattle_customers != redmond_customers or (
    
                seattle_customers == 0 and redmond_customers == 0
    
            )
    
        
    
        async def test_unauthorized_store_access(self, db_connection):
    
            """Test that invalid store access is blocked."""
    
            
    
            with pytest.raises(Exception) as exc_info:
    
                await db_connection.execute("SELECT retail.set_store_context('invalid_store')")
    
            
    
            assert "Store not found" in str(exc_info.value)
    
        
    
        async def test_cross_store_data_leakage(self, db_connection):
    
            """Test that users cannot access data from other stores."""
    
            
    
            # Set context to one store
    
            await db_connection.execute("SELECT retail.set_store_context('seattle')")
    
            
    
            # Try to insert data with different store_id
    
            with pytest.raises(Exception):
    
                await db_connection.execute("""
    
                    INSERT INTO retail.customers (store_id, first_name, last_name, email)
    
                    VALUES ('redmond', 'Test', 'User', 'test@example.com')
    
                """)
    
    
    
    class TestAuthentication:
    
        """Test authentication and authorization."""
    
        
    
        def test_valid_jwt_token(self):
    
            """Test valid JWT token validation."""
    
            
    
            # Mock valid token
    
            token_payload = {
    
                'oid': 'user-123',
    
                'email': 'test@example.com',
    
                'name': 'Test User',
    
                'tid': 'tenant-123',
    
                'aud': 'app-client-id',
    
                'iss': 'https://login.microsoftonline.com/tenant-123/v2.0',
    
                'exp': int((datetime.now(timezone.utc)).timestamp()) + 3600,
    
                'iat': int((datetime.now(timezone.utc)).timestamp()),
    
                'roles': ['store_user']
    
            }
    
            
    
            # This would require mocking the JWKS endpoint
    
            # In real implementation, use proper test JWT tokens
    
            
    
        def test_expired_token_rejection(self):
    
            """Test that expired tokens are rejected."""
    
            
    
            token_payload = {
    
                'oid': 'user-123',
    
                'exp': int((datetime.now(timezone.utc)).timestamp()) - 3600,  # Expired
    
                'iat': int((datetime.now(timezone.utc)).timestamp()) - 7200
    
            }
    
            
    
            # Test would verify that expired tokens are rejected
    
            
    
        def test_invalid_audience_rejection(self):
    
            """Test that tokens with wrong audience are rejected."""
    
            
    
            token_payload = {
    
                'oid': 'user-123',
    
                'aud': 'wrong-audience',  # Invalid audience
    
                'exp': int((datetime.now(timezone.utc)).timestamp()) + 3600,
    
                'iat': int((datetime.now(timezone.utc)).timestamp())
    
            }
    
            
    
            # Test would verify that wrong audience tokens are rejected
    
    
    
    class TestAuthorization:
    
        """Test role-based authorization."""
    
        
    
        def test_role_hierarchy(self):
    
            """Test that role hierarchy works correctly."""
    
            
    
            from mcp_server.security.authorization import RoleBasedAuth
    
            
    
            # Store admin should have all permissions
    
            assert RoleBasedAuth.has_permission(['store_admin'], 'read_all')
    
            assert RoleBasedAuth.has_permission(['store_admin'], 'write_all')
    
            assert RoleBasedAuth.has_permission(['store_admin'], 'delete_all')
    
            
    
            # Store user should have limited permissions
    
            assert RoleBasedAuth.has_permission(['store_user'], 'read_products')
    
            assert not RoleBasedAuth.has_permission(['store_user'], 'delete_all')
    
            
    
            # Store readonly should have minimal permissions
    
            assert RoleBasedAuth.has_permission(['store_readonly'], 'read_products')
    
            assert not RoleBasedAuth.has_permission(['store_readonly'], 'write_transactions')
    
        
    
        def test_permission_inheritance(self):
    
            """Test that permissions are properly inherited."""
    
            
    
            from mcp_server.security.authorization import RoleBasedAuth
    
            
    
            # Manager should inherit user permissions
    
            assert RoleBasedAuth.has_permission(['store_manager'], 'read_products')
    
            assert RoleBasedAuth.has_permission(['store_manager'], 'write_transactions')
    
    
    
    # Security test runner
    
    if __name__ == "__main__":
    
        pytest.main([__file__, "-v"])
    
    

    침투 ν…ŒμŠ€νŠΈ 체크리슀트

    
    # security-test-checklist.yml
    
    penetration_testing:
    
      
    
      authentication_bypass:
    
        - name: "Test authentication bypass attempts"
    
          tests:
    
            - "Missing Authorization header"
    
            - "Malformed JWT tokens"
    
            - "Replay attack with expired tokens"
    
            - "Token signature manipulation"
    
            - "Audience/issuer manipulation"
    
        
    
      authorization_escalation:
    
        - name: "Test privilege escalation attempts"
    
          tests:
    
            - "Role manipulation in token"
    
            - "Store access boundary testing"
    
            - "Cross-tenant data access attempts"
    
            - "Administrative function access"
    
        
    
      sql_injection:
    
        - name: "Test SQL injection vulnerabilities"
    
          tests:
    
            - "Parameter injection in search queries"
    
            - "Store ID manipulation"
    
            - "JSON parameter injection"
    
            - "Union-based injection attempts"
    
        
    
      data_exposure:
    
        - name: "Test for data exposure vulnerabilities"
    
          tests:
    
            - "Error message information disclosure"
    
            - "Timing attack possibilities"
    
            - "Cross-store data leakage"
    
            - "Audit log exposure"
    
        
    
      rate_limiting:
    
        - name: "Test rate limiting and DoS protection"
    
          tests:
    
            - "Authentication endpoint flooding"
    
            - "API endpoint rate limits"
    
            - "Resource exhaustion attempts"
    
            - "Connection pool exhaustion"
    
    

    🎯 μ£Όμš” μš”μ 

    이 μ‹€μŠ΅μ„ μ™„λ£Œν•œ ν›„, λ‹€μŒμ„ μˆ˜ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€:

    βœ… λ©€ν‹° ν…Œλ„ŒνŠΈ λ³΄μ•ˆ: μ™„μ „ν•œ 데이터 격리λ₯Ό μœ„ν•œ ν–‰ μˆ˜μ€€ λ³΄μ•ˆ κ΅¬ν˜„

    βœ… Azure 인증: Azure Entra ID와 JWT 검증 톡합

    βœ… μ—­ν•  기반 κΆŒν•œ λΆ€μ—¬: 계측적 μ—­ν•  및 κΆŒν•œ μ‹œμŠ€ν…œ ꡬ성

    βœ… 포괄적인 감사 둜그: λ³΄μ•ˆ 이벀트 좔적 및 λͺ¨λ‹ˆν„°λ§ μ„€μ •

    βœ… λ³΄μ•ˆ ν…ŒμŠ€νŠΈ: μžλ™ν™”λœ λ³΄μ•ˆ 검증 ν…ŒμŠ€νŠΈ κ΅¬ν˜„

    βœ… μœ„ν˜‘ λͺ¨λ‹ˆν„°λ§: μ‹€μ‹œκ°„ λ³΄μ•ˆ 이벀트 감지 및 κ²½κ³  생성

    πŸš€ λ‹€μŒ 단계

    Lab 03: ν™˜κ²½ 섀정을 계속 μ§„ν–‰ν•˜μ—¬:

  • λ³΄μ•ˆ λͺ¨λ²” 사둀λ₯Ό ν™œμš©ν•œ 개발 ν™˜κ²½ ꡬ성
  • 인증 및 λͺ¨λ‹ˆν„°λ§μ„ μœ„ν•œ Azure μ„œλΉ„μŠ€ μ„€μ •
  • μ•ˆμ „ν•œ λ°μ΄ν„°λ² μ΄μŠ€ μ—°κ²° 및 λΉ„λ°€ 관리 κ΅¬ν˜„
  • 개발 ν™˜κ²½μ—μ„œ λ³΄μ•ˆ ꡬ성 검증
  • πŸ“š μΆ”κ°€ 자료

    Azure λ³΄μ•ˆ

  • Azure Entra ID λ¬Έμ„œ - μ™„μ „ν•œ ID ν”Œλž«νΌ κ°€μ΄λ“œ
  • Azure Key Vault - λΉ„λ°€ 관리 μ„œλΉ„μŠ€
  • Azure λ³΄μ•ˆ λͺ¨λ²” 사둀 - λ³΄μ•ˆ μ§€μΉ¨
  • λ°μ΄ν„°λ² μ΄μŠ€ λ³΄μ•ˆ

  • PostgreSQL ν–‰ μˆ˜μ€€ λ³΄μ•ˆ - 곡식 RLS λ¬Έμ„œ
  • λ°μ΄ν„°λ² μ΄μŠ€ λ³΄μ•ˆ 체크리슀트 - PostgreSQL λ³΄μ•ˆ κ°€μ΄λ“œ
  • λ©€ν‹° ν…Œλ„ŒνŠΈ λ°μ΄ν„°λ² μ΄μŠ€ νŒ¨ν„΄ - μ•„ν‚€ν…μ²˜ νŒ¨ν„΄
  • λ³΄μ•ˆ ν…ŒμŠ€νŠΈ

  • OWASP ν…ŒμŠ€νŠΈ κ°€μ΄λ“œ - 포괄적인 λ³΄μ•ˆ ν…ŒμŠ€νŠΈ
  • JWT λ³΄μ•ˆ λͺ¨λ²” 사둀 - JWT λ³΄μ•ˆ 고렀사항
  • API λ³΄μ•ˆ ν…ŒμŠ€νŠΈ - API μ „μš© λ³΄μ•ˆ ν…ŒμŠ€νŠΈ
  • ---

    이전: Lab 01: 핡심 μ•„ν‚€ν…μ²˜ κ°œλ…

    λ‹€μŒ: Lab 03: ν™˜κ²½ μ„€μ •

    ---

    λ©΄μ±… μ‘°ν•­:

    이 λ¬Έμ„œλŠ” AI λ²ˆμ—­ μ„œλΉ„μŠ€ Co-op Translatorλ₯Ό μ‚¬μš©ν•˜μ—¬ λ²ˆμ—­λ˜μ—ˆμŠ΅λ‹ˆλ‹€.

    정확성을 μœ„ν•΄ μ΅œμ„ μ„ λ‹€ν•˜κ³  μžˆμœΌλ‚˜, μžλ™ λ²ˆμ—­μ—λŠ” 였λ₯˜λ‚˜ 뢀정확성이 포함될 수 μžˆμŠ΅λ‹ˆλ‹€.

    원본 λ¬Έμ„œλ₯Ό ν•΄λ‹Ή μ–Έμ–΄λ‘œ μž‘μ„±λœ μƒνƒœμ—μ„œ κΆŒμœ„ μžˆλŠ” 자료둜 κ°„μ£Όν•΄μ•Ό ν•©λ‹ˆλ‹€.

    μ€‘μš”ν•œ μ •λ³΄μ˜ 경우, 전문적인 인간 λ²ˆμ—­μ„ ꢌμž₯ν•©λ‹ˆλ‹€.

    이 λ²ˆμ—­ μ‚¬μš©μœΌλ‘œ 인해 λ°œμƒν•˜λŠ” μ˜€ν•΄λ‚˜ 잘λͺ»λœ 해석에 λŒ€ν•΄ λ‹Ήμ‚¬λŠ” μ±…μž„μ„ μ§€μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.

    MCP Academy — microsoft/mcp-for-beginners