"""Mock API Server for Testing Lumos Connectors

This script creates a FastAPI server that simulates a target system with users,
resources (projects/teams), and entitlements endpoints to test a Lumos connector against.

To use:
- Install dependencies: pip install fastapi uvicorn "pydantic[email]"
  (the `email` extra installs email-validator, which pydantic's EmailStr requires)
- Run: uvicorn mock_api_server:app --reload
- Server will run on http://localhost:8000

All /api endpoints except /api/health require the header
`Authorization: Bearer valid-token`.

API endpoints:
* GET /api/users - List users (supports pagination)
* GET /api/users/{user_id} - Get user details
* POST /api/users - Create user
* PUT /api/users/{user_id} - Update user
* DELETE /api/users/{user_id} - Delete user
* GET /api/resources - List resources
* GET /api/entitlements - List entitlements
* GET /api/associations - List entitlement associations
* POST /api/users/{user_id}/entitlements - Assign entitlement
* DELETE /api/users/{user_id}/entitlements/{entitlement_id} - Unassign entitlement
* GET /api/health - Health check (no auth required)
* POST /oauth/token - Simulated OAuth token endpoint
"""
import asyncio
import time
import uuid
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Set, Any

from fastapi import FastAPI, HTTPException, Query, Depends, Header, Response, status
from pydantic import BaseModel, Field, EmailStr
# ASGI application object; every route below is registered on it.
app = FastAPI(
    title="Mock API for Lumos Connector Testing",
)
# =========== Models ===========
class UserStatus(str, Enum):
    """Lifecycle state of a user; members are plain strings when serialized."""

    ACTIVE = "active"
    INACTIVE = "inactive"
    # Set by DELETE /api/users/{user_id}, which soft-deletes instead of removing.
    DELETED = "deleted"
class ResourceType(str, Enum):
    """Kind of resource an entitlement can be scoped to."""

    PROJECT = "project"
    TEAM = "team"
    GLOBAL = "global"
class EntitlementType(str, Enum):
    """Category of an entitlement (project role, team membership, admin role)."""

    PROJECT_ROLE = "project_role"
    TEAM_MEMBER = "team_member"
    ADMIN_ROLE = "admin_role"
class User(BaseModel):
    """A user account as stored and returned by the mock API."""

    # Random UUID4 string unless the caller passes an explicit id.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    email: EmailStr
    name: str
    status: UserStatus = UserStatus.ACTIVE
    # ISO-8601 string taken from datetime.now() when the model is built.
    created_at: str = Field(default_factory=lambda: datetime.now().isoformat())
    # ISO-8601 string of the most recent login; None when never set.
    last_login: Optional[str] = None
class Resource(BaseModel):
    """Something access can be granted on: a project, a team, or the global scope."""

    id: str
    name: str
    type: ResourceType
    description: Optional[str] = None
class Entitlement(BaseModel):
    """A grantable role or membership, declared against one resource."""

    id: str
    name: str
    type: EntitlementType
    # Id of the resource this entitlement is declared on.
    resource_id: str
    description: Optional[str] = None
class Association(BaseModel):
    """Link recording that a user holds an entitlement on a resource."""

    user_id: str
    entitlement_id: str
    resource_id: str
    # ISO-8601 string taken from datetime.now() when the link is created.
    assigned_at: str = Field(default_factory=lambda: datetime.now().isoformat())
class UserCreate(BaseModel):
    """Request body for POST /api/users."""

    email: EmailStr
    name: str
    # New users are active unless the request says otherwise.
    status: UserStatus = UserStatus.ACTIVE
class UserUpdate(BaseModel):
    """Request body for PUT /api/users/{user_id}; every field is optional.

    A field left as None is treated as "do not change" by the update endpoint.
    """

    email: Optional[EmailStr] = None
    name: Optional[str] = None
    status: Optional[UserStatus] = None
class EntitlementAssign(BaseModel):
    """Request body for POST /api/users/{user_id}/entitlements."""

    entitlement_id: str
    resource_id: str
class PaginatedResponse(BaseModel):
    """Envelope returned by paginated list endpoints."""

    # The current page of results.
    items: List[Any]
    # Number of records across all pages, not just this one.
    total: int
    # True when records remain beyond offset + limit.
    has_more: bool
# =========== In-memory database ===========
# User store keyed by user id. Starts empty; create_sample_users() fills it
# when called while the store is still empty.
db_users: Dict[str, User] = {}
# Fixed resource catalogue, keyed by each resource's own id.
db_resources: Dict[str, Resource] = {
    resource.id: resource
    for resource in (
        Resource(
            id="project-1",
            name="Marketing Project",
            type=ResourceType.PROJECT,
            description="Marketing campaigns and assets",
        ),
        Resource(
            id="project-2",
            name="Engineering Project",
            type=ResourceType.PROJECT,
            description="Product development",
        ),
        Resource(
            id="team-1",
            name="Marketing Team",
            type=ResourceType.TEAM,
            description="Marketing department",
        ),
        Resource(
            id="team-2",
            name="Engineering Team",
            type=ResourceType.TEAM,
            description="Engineering department",
        ),
        Resource(
            id="global",
            name="Global",
            type=ResourceType.GLOBAL,
            description="Global resources",
        ),
    )
}
# Fixed entitlement catalogue, keyed by each entitlement's own id.
db_entitlements: Dict[str, Entitlement] = {
    entitlement.id: entitlement
    for entitlement in (
        Entitlement(
            id="project-admin",
            name="Project Admin",
            type=EntitlementType.PROJECT_ROLE,
            resource_id="project-1",
            description="Admin access to projects",
        ),
        Entitlement(
            id="project-viewer",
            name="Project Viewer",
            type=EntitlementType.PROJECT_ROLE,
            resource_id="project-1",
            description="View-only access to projects",
        ),
        Entitlement(
            id="team-member",
            name="Team Member",
            type=EntitlementType.TEAM_MEMBER,
            resource_id="team-1",
            description="Member of a team",
        ),
        Entitlement(
            id="team-lead",
            name="Team Lead",
            type=EntitlementType.TEAM_MEMBER,
            resource_id="team-1",
            description="Team leadership role",
        ),
        Entitlement(
            id="system-admin",
            name="System Administrator",
            type=EntitlementType.ADMIN_ROLE,
            resource_id="global",
            description="Global administrator",
        ),
    )
}
# User-entitlement associations. Kept as a flat list; seeded by
# create_sample_users() and modified by the assign/unassign endpoints.
db_associations: List[Association] = []
# =========== Authentication dependency ===========
# Bearer tokens the API accepts, mapped to the tenant they belong to.
# Every access token that /oauth/token can hand out must appear here:
# "valid-token" comes from the authorization_code and client_credentials
# grants, "refreshed-valid-token" from the refresh_token grant. Without the
# second entry a client would get 401 on every call made after a refresh.
api_tokens: Dict[str, str] = {
    "valid-token": "mock-tenant-id",
    "refreshed-valid-token": "mock-tenant-id",
}
def verify_token(authorization: Optional[str] = Header(None)):
    """Validate the ``Authorization`` header and return the bearer token.

    Used as a FastAPI dependency on the protected endpoints.

    Args:
        authorization: Raw ``Authorization`` header value, or None if absent.

    Returns:
        The token string that follows the ``Bearer `` scheme prefix.

    Raises:
        HTTPException: 401 when the header is missing, does not start with
            ``Bearer ``, or carries a token that is not in ``api_tokens``.
    """
    scheme_prefix = "Bearer "
    if not authorization or not authorization.startswith(scheme_prefix):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing token",
        )

    # Drop only the leading scheme. str.replace() would remove every
    # "Bearer " occurrence, so "Bearer Bearer valid-token" would collapse to
    # "valid-token" and be accepted.
    token = authorization[len(scheme_prefix):]
    if token not in api_tokens:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        )
    return token
# =========== Helper functions ===========
def create_sample_users(count: int = 10):
    """Seed the in-memory stores with ``count`` users and their associations.

    Does nothing when ``db_users`` already holds at least one user, so it is
    safe to call at the top of every endpoint.

    Args:
        count: Number of users to generate, with ids ``user-1`` .. ``user-<count>``.
    """
    if db_users:
        return

    domains = ["example.com", "test.org", "mockdata.net"]
    departments = ["Marketing", "Engineering", "Sales", "HR", "Finance"]

    for n in range(1, count + 1):
        department = departments[n % len(departments)]
        uid = f"user-{n}"

        def grant(entitlement: str, resource: str) -> None:
            # Record that the user being built holds `entitlement` on `resource`.
            db_associations.append(
                Association(user_id=uid, entitlement_id=entitlement, resource_id=resource)
            )

        created = (datetime.now() - timedelta(days=n * 10)).isoformat()
        # Every third user has never logged in; the rest logged in n % 30 days ago.
        logged_in = (
            (datetime.now() - timedelta(days=n % 30)).isoformat() if n % 3 else None
        )
        db_users[uid] = User(
            id=uid,
            email=f"user{n}@{domains[n % len(domains)]}",
            name=f"{department} User {n}",
            status=UserStatus.ACTIVE,
            created_at=created,
            last_login=logged_in,
        )

        # The first three users are system administrators.
        if n <= 3:
            grant("system-admin", "global")

        # Marketing users get a role on project-1; every fifth user is an admin.
        if department == "Marketing":
            grant("project-viewer" if n % 5 else "project-admin", "project-1")

        # Everyone joins a team; every seventh user leads it.
        team = "team-1" if department in ("Marketing", "Sales") else "team-2"
        grant("team-member" if n % 7 else "team-lead", team)
# =========== API Endpoints ===========
@app.get("/")
async def root():
    """Return a static liveness payload; no authentication required."""
    return dict(
        status="healthy",
        service="Mock API Server for Lumos Connector Testing",
    )
@app.get("/api/users", response_model=PaginatedResponse)
async def list_users(
    limit: int = Query(10, ge=1, le=100),
    offset: int = Query(0, ge=0),
    token: str = Depends(verify_token),
):
    """List users with offset/limit pagination.

    Args:
        limit: Page size, 1-100 (default 10).
        offset: Number of users to skip from the start (default 0).
        token: Bearer token validated by ``verify_token``.

    Returns:
        A dict with ``items`` (the page), ``total`` (all users) and
        ``has_more`` (True when users remain past this page).
    """
    create_sample_users()

    users = list(db_users.values())
    total = len(users)
    page = users[offset:offset + limit]

    # Simulate network latency without blocking the event loop: time.sleep()
    # inside an async handler would stall every other in-flight request.
    await asyncio.sleep(0.1)

    return {
        "items": page,
        "total": total,
        "has_more": offset + limit < total,
    }
@app.get("/api/users/{user_id}", response_model=User)
async def get_user(user_id: str, token: str = Depends(verify_token)):
    """Return the user stored under ``user_id``.

    Raises:
        HTTPException: 404 when no such user exists.
    """
    create_sample_users()

    found = db_users.get(user_id)
    if found is None:
        raise HTTPException(status_code=404, detail="User not found")
    return found
@app.post("/api/users", response_model=User, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate, token: str = Depends(verify_token)):
    """Create a new user with a freshly generated UUID id.

    Args:
        user: Email, name and optional status for the new user.
        token: Bearer token validated by ``verify_token``.

    Returns:
        The stored ``User``, including its generated ``id`` and ``created_at``.
    """
    # Seed first, like every other user endpoint. create_sample_users() only
    # seeds while db_users is empty, so inserting before it has run would
    # prevent the sample users and associations from ever being created.
    create_sample_users()

    user_id = str(uuid.uuid4())
    new_user = User(
        id=user_id,
        email=user.email,
        name=user.name,
        status=user.status,
    )
    db_users[user_id] = new_user
    return new_user
@app.put("/api/users/{user_id}", response_model=User)
async def update_user(user_id: str, user_update: UserUpdate, token: str = Depends(verify_token)):
    """Apply the non-None fields of ``user_update`` to an existing user.

    The stored user object is modified in place and returned.

    Raises:
        HTTPException: 404 when no such user exists.
    """
    create_sample_users()

    target = db_users.get(user_id)
    if target is None:
        raise HTTPException(status_code=404, detail="User not found")

    # None means "leave this field unchanged".
    for field_name in ("email", "name", "status"):
        new_value = getattr(user_update, field_name)
        if new_value is not None:
            setattr(target, field_name, new_value)

    return target
@app.delete("/api/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: str, token: str = Depends(verify_token)):
    """Soft-delete a user by setting its status to ``deleted``.

    The record stays in ``db_users``; only its status changes.

    Raises:
        HTTPException: 404 when no such user exists.
    """
    create_sample_users()

    doomed = db_users.get(user_id)
    if doomed is None:
        raise HTTPException(status_code=404, detail="User not found")

    doomed.status = UserStatus.DELETED
    return Response(status_code=status.HTTP_204_NO_CONTENT)
@app.get("/api/resources", response_model=List[Resource])
async def list_resources(token: str = Depends(verify_token)):
    """Return every resource in the catalogue (unpaginated)."""
    return [*db_resources.values()]
@app.get("/api/entitlements", response_model=List[Entitlement])
async def list_entitlements(token: str = Depends(verify_token)):
    """Return every entitlement in the catalogue (unpaginated)."""
    return [*db_entitlements.values()]
@app.get("/api/associations", response_model=List[Association])
async def list_associations(
    user_id: Optional[str] = None,
    entitlement_id: Optional[str] = None,
    resource_id: Optional[str] = None,
    token: str = Depends(verify_token),
):
    """List associations, narrowed by whichever filters are supplied.

    Filters combine with AND. A filter that is missing or empty is ignored.
    """
    create_sample_users()  # make sure the sample data exists

    requested = {
        "user_id": user_id,
        "entitlement_id": entitlement_id,
        "resource_id": resource_id,
    }
    # Keep only filters with a truthy value; an empty string means "no filter".
    active = {field: value for field, value in requested.items() if value}

    return [
        link
        for link in db_associations
        if all(getattr(link, field) == value for field, value in active.items())
    ]
@app.post("/api/users/{user_id}/entitlements", status_code=status.HTTP_201_CREATED)
async def assign_entitlement(
    user_id: str,
    assignment: EntitlementAssign,
    token: str = Depends(verify_token),
):
    """Grant a user an entitlement on a resource.

    Re-sending an existing assignment adds nothing and returns an
    "already exists" message.

    Raises:
        HTTPException: 404 when the user, entitlement or resource is unknown
            (checked in that order).
    """
    create_sample_users()

    lookups = (
        ("User", user_id, db_users),
        ("Entitlement", assignment.entitlement_id, db_entitlements),
        ("Resource", assignment.resource_id, db_resources),
    )
    for label, key, store in lookups:
        if key not in store:
            raise HTTPException(status_code=404, detail=f"{label} not found")

    wanted = (user_id, assignment.entitlement_id, assignment.resource_id)
    already_assigned = any(
        (link.user_id, link.entitlement_id, link.resource_id) == wanted
        for link in db_associations
    )
    if already_assigned:
        return {"message": "Assignment already exists"}

    db_associations.append(
        Association(
            user_id=user_id,
            entitlement_id=assignment.entitlement_id,
            resource_id=assignment.resource_id,
        )
    )
    return {"message": "Entitlement assigned successfully"}
@app.delete("/api/users/{user_id}/entitlements/{entitlement_id}", status_code=status.HTTP_204_NO_CONTENT)
async def unassign_entitlement(
    user_id: str,
    entitlement_id: str,
    resource_id: str = Query(...),
    token: str = Depends(verify_token),
):
    """Remove one user/entitlement/resource association.

    ``resource_id`` is a required query parameter. Only the first matching
    association is removed.

    Raises:
        HTTPException: 404 when the user is unknown or no association matches.
    """
    create_sample_users()

    if user_id not in db_users:
        raise HTTPException(status_code=404, detail="User not found")

    wanted = (user_id, entitlement_id, resource_id)
    position = next(
        (
            index
            for index, link in enumerate(db_associations)
            if (link.user_id, link.entitlement_id, link.resource_id) == wanted
        ),
        None,
    )
    if position is None:
        raise HTTPException(status_code=404, detail="Association not found")

    del db_associations[position]
    return Response(status_code=status.HTTP_204_NO_CONTENT)
@app.get("/api/health")
async def health_check():
    """Report service health and the size of each in-memory store (no auth)."""
    stores = {
        "users": db_users,
        "resources": db_resources,
        "entitlements": db_entitlements,
        "associations": db_associations,
    }
    report = {"status": "healthy", "timestamp": datetime.now().isoformat()}
    report.update({f"{name}_count": len(store) for name, store in stores.items()})
    return report
# =========== OAuth simulation endpoints ===========
class TokenRequest(BaseModel):
    """Request body for POST /oauth/token.

    Which optional fields matter depends on ``grant_type``: ``code`` for
    authorization_code, ``refresh_token`` for refresh_token, and
    ``client_secret`` for client_credentials.
    """

    grant_type: str
    client_id: str
    client_secret: Optional[str] = None
    code: Optional[str] = None
    redirect_uri: Optional[str] = None
    refresh_token: Optional[str] = None
class TokenResponse(BaseModel):
    """Response body for POST /oauth/token."""

    access_token: str
    token_type: str = "Bearer"
    # Advertised lifetime in seconds.
    expires_in: int = 3600
    # Left as None when the grant does not issue a refresh token.
    refresh_token: Optional[str] = None
@app.post("/oauth/token", response_model=TokenResponse)
async def get_token(request: TokenRequest):
    """Simulate an OAuth 2.0 token endpoint with fixed credentials.

    Supported grants and the values they accept:
      * ``authorization_code`` - ``code`` must be ``valid-auth-code``.
      * ``refresh_token`` - ``refresh_token`` must be ``valid-refresh-token``
        or the rotated ``refreshed-valid-refresh-token`` this endpoint issues.
      * ``client_credentials`` - ``client_secret`` must be ``test-client-secret``.

    Args:
        request: Parsed token request; ``client_id`` must be ``test-client-id``.

    Returns:
        A ``TokenResponse``; client_credentials responses carry no refresh token.

    Raises:
        HTTPException: 400 for an unknown client id, a bad credential for the
            chosen grant, or an unsupported grant type.
    """
    # Only one client is registered in this mock.
    if request.client_id != "test-client-id":
        raise HTTPException(status_code=400, detail="Invalid client_id")

    if request.grant_type == "authorization_code":
        if request.code != "valid-auth-code":
            raise HTTPException(status_code=400, detail="Invalid authorization code")
        return TokenResponse(
            access_token="valid-token",
            refresh_token="valid-refresh-token",
        )

    if request.grant_type == "refresh_token":
        # The refresh response rotates the refresh token, so the rotated value
        # must be accepted too; otherwise a client that stores the new token
        # could refresh exactly once and then be rejected.
        accepted_refresh_tokens = (
            "valid-refresh-token",
            "refreshed-valid-refresh-token",
        )
        if request.refresh_token not in accepted_refresh_tokens:
            raise HTTPException(status_code=400, detail="Invalid refresh token")
        return TokenResponse(
            access_token="refreshed-valid-token",
            refresh_token="refreshed-valid-refresh-token",
        )

    if request.grant_type == "client_credentials":
        if request.client_secret != "test-client-secret":
            raise HTTPException(status_code=400, detail="Invalid client credentials")
        # Refresh tokens are typically not issued for client credentials.
        return TokenResponse(access_token="valid-token")

    raise HTTPException(
        status_code=400,
        detail=f"Unsupported grant type: {request.grant_type}",
    )
# Updated 18 days ago