Mock API Server for Testing Lumos Connectors

This script creates a FastAPI server that simulates a target system withusers, groups, and entitlements endpoints to test a Lumos connector against.

To use:

  1. Install dependencies: pip install fastapi uvicorn pydantic
  2. Run: uvicorn mock_api_server:app --reload
  3. Server will run on http://localhost:8000

API endpoints:

* GET /api/users - List users (supports pagination)
* GET /api/users/{user_id} - Get user details
* POST /api/users - Create user
* PUT /api/users/{user_id} - Update user
* DELETE /api/users/{user_id} - Delete user
* GET /api/resources - List resources
* GET /api/entitlements - List entitlements
* GET /api/associations - List entitlement associations
* POST /api/users/{user_id}/entitlements - Assign entitlement
* DELETE /api/users/{user_id}/entitlements/{entitlement_id} - Unassign entitlement

import time
import uuid
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Set, Any

from fastapi import FastAPI, HTTPException, Query, Depends, Header, Response, status
from pydantic import BaseModel, Field, EmailStr

app = FastAPI(title="Mock API for Lumos Connector Testing")

# =========== Models ===========

class UserStatus(str, Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"
    DELETED = "deleted"

class ResourceType(str, Enum):
    PROJECT = "project"
    TEAM = "team"
    GLOBAL = "global"

class EntitlementType(str, Enum):
    PROJECT_ROLE = "project_role"
    TEAM_MEMBER = "team_member"
    ADMIN_ROLE = "admin_role"

class User(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    email: EmailStr
    name: str
    status: UserStatus = UserStatus.ACTIVE
    created_at: str = Field(default_factory=lambda: datetime.now().isoformat())
    last_login: Optional[str] = None

class Resource(BaseModel):
    id: str
    name: str
    type: ResourceType
    description: Optional[str] = None

class Entitlement(BaseModel):
    id: str
    name: str
    type: EntitlementType
    resource_id: str
    description: Optional[str] = None

class Association(BaseModel):
    user_id: str
    entitlement_id: str
    resource_id: str
    assigned_at: str = Field(default_factory=lambda: datetime.now().isoformat())

class UserCreate(BaseModel):
    email: EmailStr
    name: str
    status: UserStatus = UserStatus.ACTIVE

class UserUpdate(BaseModel):
    email: Optional[EmailStr] = None
    name: Optional[str] = None
    status: Optional[UserStatus] = None

class EntitlementAssign(BaseModel):
    entitlement_id: str
    resource_id: str

class PaginatedResponse(BaseModel):
    items: List[Any]
    total: int
    has_more: bool

# =========== In-memory database ===========

# Sample users
db_users: Dict[str, User] = {}

# Sample resources
db_resources: Dict[str, Resource] = {
    "project-1": Resource(id="project-1", name="Marketing Project", type=ResourceType.PROJECT, 
                          description="Marketing campaigns and assets"),
    "project-2": Resource(id="project-2", name="Engineering Project", type=ResourceType.PROJECT,
                          description="Product development"),
    "team-1": Resource(id="team-1", name="Marketing Team", type=ResourceType.TEAM,
                       description="Marketing department"),
    "team-2": Resource(id="team-2", name="Engineering Team", type=ResourceType.TEAM,
                       description="Engineering department"),
    "global": Resource(id="global", name="Global", type=ResourceType.GLOBAL,
                      description="Global resources")
}

# Sample entitlements
db_entitlements: Dict[str, Entitlement] = {
    "project-admin": Entitlement(id="project-admin", name="Project Admin", 
                               type=EntitlementType.PROJECT_ROLE,
                               resource_id="project-1",
                               description="Admin access to projects"),
    "project-viewer": Entitlement(id="project-viewer", name="Project Viewer", 
                                type=EntitlementType.PROJECT_ROLE,
                                resource_id="project-1",
                                description="View-only access to projects"),
    "team-member": Entitlement(id="team-member", name="Team Member", 
                             type=EntitlementType.TEAM_MEMBER,
                             resource_id="team-1",
                             description="Member of a team"),
    "team-lead": Entitlement(id="team-lead", name="Team Lead", 
                           type=EntitlementType.TEAM_MEMBER,
                           resource_id="team-1",
                           description="Team leadership role"),
    "system-admin": Entitlement(id="system-admin", name="System Administrator", 
                              type=EntitlementType.ADMIN_ROLE,
                              resource_id="global",
                              description="Global administrator"),
}

# User-entitlement associations
db_associations: List[Association] = []

# =========== Authentication middleware ===========

api_tokens = {
    "valid-token": "mock-tenant-id"
}

def verify_token(authorization: str = Header(None)):
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing token",
        )
    
    token = authorization.replace("Bearer ", "")
    if token not in api_tokens:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        )
    
    return token

# =========== Helper functions ===========

def create_sample_users(count: int = 10):
    """Create sample users if none exist"""
    if not db_users:
        domains = ["example.com", "test.org", "mockdata.net"]
        departments = ["Marketing", "Engineering", "Sales", "HR", "Finance"]
        
        for i in range(1, count + 1):
            domain = domains[i % len(domains)]
            dept = departments[i % len(departments)]
            
            user_id = f"user-{i}"
            user = User(
                id=user_id,
                email=f"user{i}@{domain}",
                name=f"{dept} User {i}",
                status=UserStatus.ACTIVE,
                created_at=(datetime.now() - timedelta(days=i*10)).isoformat()
            )
            
            # Add some recent login times
            if i % 3 != 0:  # Some users have logged in
                days_ago = i % 30
                user.last_login = (datetime.now() - timedelta(days=days_ago)).isoformat()
            
            db_users[user_id] = user
            
            # Create some associations
            if i <= 3:  # First 3 users are admins
                db_associations.append(Association(
                    user_id=user_id,
                    entitlement_id="system-admin",
                    resource_id="global"
                ))
            
            # Add project roles
            if dept == "Marketing":
                db_associations.append(Association(
                    user_id=user_id,
                    entitlement_id="project-admin" if i % 5 == 0 else "project-viewer",
                    resource_id="project-1"
                ))
            
            # Add team memberships
            team_id = "team-1" if dept in ["Marketing", "Sales"] else "team-2"
            entitlement_id = "team-lead" if i % 7 == 0 else "team-member"
            db_associations.append(Association(
                user_id=user_id,
                entitlement_id=entitlement_id,
                resource_id=team_id
            ))

# =========== API Endpoints ===========

@app.get("/")
async def root():
    """Health check endpoint"""
    return {"status": "healthy", "service": "Mock API Server for Lumos Connector Testing"}

@app.get("/api/users", response_model=PaginatedResponse)
async def list_users(
    limit: int = Query(10, ge=1, le=100),
    offset: int = Query(0, ge=0),
    token: str = Depends(verify_token)
):
    """List users with pagination"""
    create_sample_users()
    
    users = list(db_users.values())
    total = len(users)
    
    # Apply pagination
    paginated_users = users[offset:offset+limit]
    
    # Add a small delay to simulate network latency
    time.sleep(0.1)
    
    return {
        "items": paginated_users,
        "total": total,
        "has_more": offset + limit < total
    }

@app.get("/api/users/{user_id}", response_model=User)
async def get_user(user_id: str, token: str = Depends(verify_token)):
    """Get a user by ID"""
    create_sample_users()
    
    if user_id not in db_users:
        raise HTTPException(status_code=404, detail="User not found")
    
    return db_users[user_id]

@app.post("/api/users", response_model=User, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate, token: str = Depends(verify_token)):
    """Create a new user"""
    user_id = str(uuid.uuid4())
    
    new_user = User(
        id=user_id,
        email=user.email,
        name=user.name,
        status=user.status
    )
    
    db_users[user_id] = new_user
    return new_user

@app.put("/api/users/{user_id}", response_model=User)
async def update_user(user_id: str, user_update: UserUpdate, token: str = Depends(verify_token)):
    """Update a user"""
    create_sample_users()
    
    if user_id not in db_users:
        raise HTTPException(status_code=404, detail="User not found")
    
    current_user = db_users[user_id]
    
    # Update fields if provided
    if user_update.email is not None:
        current_user.email = user_update.email
    if user_update.name is not None:
        current_user.name = user_update.name
    if user_update.status is not None:
        current_user.status = user_update.status
    
    db_users[user_id] = current_user
    return current_user

@app.delete("/api/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: str, token: str = Depends(verify_token)):
    """Delete a user"""
    create_sample_users()
    
    if user_id not in db_users:
        raise HTTPException(status_code=404, detail="User not found")
    
    # Soft delete - change status to deleted
    db_users[user_id].status = UserStatus.DELETED
    
    return Response(status_code=status.HTTP_204_NO_CONTENT)

@app.get("/api/resources", response_model=List[Resource])
async def list_resources(token: str = Depends(verify_token)):
    """List all resources"""
    return list(db_resources.values())

@app.get("/api/entitlements", response_model=List[Entitlement])
async def list_entitlements(token: str = Depends(verify_token)):
    """List all entitlements"""
    return list(db_entitlements.values())

@app.get("/api/associations", response_model=List[Association])
async def list_associations(
    user_id: Optional[str] = None,
    entitlement_id: Optional[str] = None,
    resource_id: Optional[str] = None,
    token: str = Depends(verify_token)
):
    """List entitlement associations with optional filtering"""
    create_sample_users()  # Ensure we have sample data
    
    result = db_associations
    
    # Apply filters
    if user_id:
        result = [a for a in result if a.user_id == user_id]
    if entitlement_id:
        result = [a for a in result if a.entitlement_id == entitlement_id]
    if resource_id:
        result = [a for a in result if a.resource_id == resource_id]
    
    return result

@app.post("/api/users/{user_id}/entitlements", status_code=status.HTTP_201_CREATED)
async def assign_entitlement(
    user_id: str, 
    assignment: EntitlementAssign,
    token: str = Depends(verify_token)
):
    """Assign an entitlement to a user"""
    create_sample_users()
    
    if user_id not in db_users:
        raise HTTPException(status_code=404, detail="User not found")
    
    if assignment.entitlement_id not in db_entitlements:
        raise HTTPException(status_code=404, detail="Entitlement not found")
    
    if assignment.resource_id not in db_resources:
        raise HTTPException(status_code=404, detail="Resource not found")
    
    # Check if assignment already exists
    for assoc in db_associations:
        if (assoc.user_id == user_id and 
            assoc.entitlement_id == assignment.entitlement_id and
            assoc.resource_id == assignment.resource_id):
            return {"message": "Assignment already exists"}
    
    # Create new association
    new_assoc = Association(
        user_id=user_id,
        entitlement_id=assignment.entitlement_id,
        resource_id=assignment.resource_id
    )
    db_associations.append(new_assoc)
    
    return {"message": "Entitlement assigned successfully"}

@app.delete("/api/users/{user_id}/entitlements/{entitlement_id}", status_code=status.HTTP_204_NO_CONTENT)
async def unassign_entitlement(
    user_id: str, 
    entitlement_id: str,
    resource_id: str = Query(...),
    token: str = Depends(verify_token)
):
    """Unassign an entitlement from a user"""
    create_sample_users()
    
    if user_id not in db_users:
        raise HTTPException(status_code=404, detail="User not found")
    
    # Find and remove the association
    for i, assoc in enumerate(db_associations):
        if (assoc.user_id == user_id and 
            assoc.entitlement_id == entitlement_id and
            assoc.resource_id == resource_id):
            db_associations.pop(i)
            return Response(status_code=status.HTTP_204_NO_CONTENT)
    
    # If we get here, the association wasn't found
    raise HTTPException(status_code=404, detail="Association not found")

@app.get("/api/health")
async def health_check():
    """Health check endpoint that doesn't require auth"""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "users_count": len(db_users),
        "resources_count": len(db_resources),
        "entitlements_count": len(db_entitlements),
        "associations_count": len(db_associations)
    }

# =========== OAuth simulation endpoints ===========

class TokenRequest(BaseModel):
    grant_type: str
    client_id: str
    client_secret: Optional[str] = None
    code: Optional[str] = None
    redirect_uri: Optional[str] = None
    refresh_token: Optional[str] = None

class TokenResponse(BaseModel):
    access_token: str
    token_type: str = "Bearer"
    expires_in: int = 3600
    refresh_token: Optional[str] = None

@app.post("/oauth/token", response_model=TokenResponse)
async def get_token(request: TokenRequest):
    """Simulate OAuth token endpoint"""
    # Validate client_id (in a real system, you'd check against registered clients)
    if request.client_id != "test-client-id":
        raise HTTPException(status_code=400, detail="Invalid client_id")
    
    # Handle different grant types
    if request.grant_type == "authorization_code":
        # Validate code (in a real system, this would be a temporary code from auth flow)
        if not request.code or request.code != "valid-auth-code":
            raise HTTPException(status_code=400, detail="Invalid authorization code")
        
        # Return new tokens
        return TokenResponse(
            access_token="valid-token",
            refresh_token="valid-refresh-token"
        )
        
    elif request.grant_type == "refresh_token":
        # Validate refresh token
        if not request.refresh_token or request.refresh_token != "valid-refresh-token":
            raise HTTPException(status_code=400, detail="Invalid refresh token")
        
        # Return new access token (optionally new refresh token too)
        return TokenResponse(
            access_token="refreshed-valid-token",
            refresh_token="refreshed-valid-refresh-token"
        )
        
    elif request.grant_type == "client_credentials":
        # Validate client secret
        if not request.client_secret or request.client_secret != "test-client-secret":
            raise HTTPException(status_code=400, detail="Invalid client credentials")
        
        # Return access token
        return TokenResponse(
            access_token="valid-token",
            # Note: refresh tokens are typically not issued for client credentials
        )
        
    else:
        raise HTTPException(status_code=400, detail=f"Unsupported grant type: {request.grant_type}")