Building a Lumos Connector: Step-by-Step Tutorial
This tutorial guides you through the entire process of building a custom Lumos connector for your application. By the end, you'll have:
- A working test server that simulates your application's API
- A fully functional Lumos connector that integrates with this API
- The knowledge to adapt this connector for your real application
Setup Overview
We'll be working with two components:
- Mock API Server: A FastAPI application that simulates your application's API endpoints
- Lumos Connector: A Python package that translates between your API and Lumos
Part 1: Setting Up the Test Environment
Step 1: Install Dependencies
First, create a new directory for your project and set up a virtual environment:
# Create a project directory
mkdir lumos-connector-project
cd lumos-connector-project
# Create a virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install required packages
pip install connector-py[dev] fastapi uvicorn httpx pydantic
Step 2: Set Up the Mock API Server
Create a file named mock_api_server.py
with the code from the test server artifact.
Run the server:
uvicorn mock_api_server:app --reload
The server should now be running at http://localhost:8000. You can access the API documentation at http://localhost:8000/docs.
Step 3: Test the API Server
Let's verify that the server is working correctly:
# Check the health endpoint
curl http://localhost:8000/api/health
# List users (requires authentication)
curl -H "Authorization: Bearer valid-token" http://localhost:8000/api/users
The first request should work without authentication, while the second should return user data if you provide the correct token.
Part 2: Creating the Lumos Connector
Step 1: Scaffold a New Connector
Now we'll create a new Lumos connector that will connect to our mock API:
# Create a new connector (from project root)
connector scaffold mockapp-connector mockapp_connector
# Navigate to the connector directory
cd mockapp_connector
# Install the connector dependencies
pip install ".[all]"
Step 2: Configure the Connector Settings
Edit mockapp_connector/settings.py
to define the configuration settings users will provide:
from pydantic import BaseModel, Field
class MockappSettings(BaseModel):
"""Settings for MockApp connector."""
api_url: str = Field(
default="http://localhost:8000",
description="The base URL of the MockApp API"
)
Step 3: Update Constants
Edit mockapp_connector/constants.py
to set the API base URL:
# API endpoints
API_BASE_PATH = "/api"
Step 4: Define Resource and Entitlement Types
Edit mockapp_connector/enums.py
to match the types in our mock API:
from enum import Enum
from connector.generated import EntitlementType, ResourceType
class MockappResourceTypes(str, Enum):
PROJECT = "project"
TEAM = "team"
GLOBAL = "global"
class MockappEntitlementTypes(str, Enum):
PROJECT_ROLE = "project_role"
TEAM_MEMBER = "team_member"
ADMIN_ROLE = "admin_role"
resource_types: list[ResourceType] = [
ResourceType(
type_id=MockappResourceTypes.PROJECT,
type_label="Project",
),
ResourceType(
type_id=MockappResourceTypes.TEAM,
type_label="Team",
),
ResourceType(
type_id=MockappResourceTypes.GLOBAL,
type_label="Global Resource",
)
]
entitlement_types: list[EntitlementType] = [
EntitlementType(
type_id=MockappEntitlementTypes.PROJECT_ROLE,
type_label="Project Role",
resource_type_id=MockappResourceTypes.PROJECT,
min=0,
),
EntitlementType(
type_id=MockappEntitlementTypes.TEAM_MEMBER,
type_label="Team Member",
resource_type_id=MockappResourceTypes.TEAM,
min=0,
),
EntitlementType(
type_id=MockappEntitlementTypes.ADMIN_ROLE,
type_label="Admin Role",
resource_type_id=MockappResourceTypes.GLOBAL,
min=0,
max=1, # Users can have at most one admin role
)
]
Step 5: Create API Client
Edit mockapp_connector/client.py
to create a client for the mock API:
import typing as t
from urllib.parse import urljoin
from connector.oai.base_clients import BaseIntegrationClient
from connector.oai.capability import Request, get_oauth, get_settings
from connector.utils.httpx_auth import BearerAuth
from connector.utils.client_utils import create_client_response
from mockapp_connector.constants import API_BASE_PATH
from mockapp_connector.settings import MockappSettings
class MockappClient(BaseIntegrationClient):
@classmethod
def prepare_client_args(cls, args: Request) -> dict[str, t.Any]:
settings = get_settings(args, MockappSettings)
return {
"auth": BearerAuth(token=get_oauth(args).access_token),
"base_url": settings.api_url,
}
async def get_users(self, limit: int | None = None, offset: int | None = None):
"""Fetch users from the API"""
params = {}
if limit:
params["limit"] = limit
if offset:
params["offset"] = offset
response = await self._http_client.get(f"{API_BASE_PATH}/users", params=params)
return response.json()
async def get_resources(self):
"""Fetch resources from the API"""
response = await self._http_client.get(f"{API_BASE_PATH}/resources")
return response.json()
async def get_entitlements(self):
"""Fetch entitlements from the API"""
response = await self._http_client.get(f"{API_BASE_PATH}/entitlements")
return response.json()
async def get_associations(self, user_id=None, entitlement_id=None, resource_id=None):
"""Fetch associations from the API"""
params = {}
if user_id:
params["user_id"] = user_id
if entitlement_id:
params["entitlement_id"] = entitlement_id
if resource_id:
params["resource_id"] = resource_id
response = await self._http_client.get(f"{API_BASE_PATH}/associations", params=params)
return response.json()
async def create_user(self, user_data):
"""Create a new user"""
response = await self._http_client.post(f"{API_BASE_PATH}/users", json=user_data)
return response.json()
async def update_user(self, user_id, user_data):
"""Update a user"""
response = await self._http_client.put(f"{API_BASE_PATH}/users/{user_id}", json=user_data)
return response.json()
async def delete_user(self, user_id):
"""Delete a user"""
await self._http_client.delete(f"{API_BASE_PATH}/users/{user_id}")
async def assign_entitlement(self, user_id, entitlement_id, resource_id):
"""Assign an entitlement to a user"""
data = {
"entitlement_id": entitlement_id,
"resource_id": resource_id
}
response = await self._http_client.post(f"{API_BASE_PATH}/users/{user_id}/entitlements", json=data)
return response.json()
async def unassign_entitlement(self, user_id, entitlement_id, resource_id):
"""Unassign an entitlement from a user"""
params = {"resource_id": resource_id}
await self._http_client.delete(
f"{API_BASE_PATH}/users/{user_id}/entitlements/{entitlement_id}",
params=params
)
Step 6: Implement Read Capabilities
Edit mockapp_connector/capabilities_read.py
to implement the required read capabilities:
from connector.generated import (
AccountStatus,
FindEntitlementAssociationsRequest,
FindEntitlementAssociationsResponse,
FoundAccountData,
FoundEntitlementAssociation,
FoundEntitlementData,
FoundResourceData,
GetLastActivityRequest,
GetLastActivityResponse,
LastActivityData,
ListAccountsRequest,
ListAccountsResponse,
ListEntitlementsRequest,
ListEntitlementsResponse,
ListResourcesRequest,
ListResourcesResponse,
Page,
ValidateCredentialsRequest,
ValidateCredentialsResponse,
ValidatedCredentials,
ActivityEventType,
)
from connector.oai.capability import get_page, get_settings
from datetime import datetime
from mockapp_connector.pagination import DEFAULT_PAGE_SIZE, NextPageToken, Pagination
from mockapp_connector.settings import MockappSettings
from mockapp_connector.client import MockappClient
from mockapp_connector.enums import MockappResourceTypes, MockappEntitlementTypes
async def validate_credentials(
args: ValidateCredentialsRequest,
) -> ValidateCredentialsResponse:
"""Validate credentials for MockApp API."""
async with MockappClient(args) as client:
try:
# Test the API with a simple call
await client.get_users(limit=1)
return ValidateCredentialsResponse(
response=ValidatedCredentials(
unique_tenant_id="mockapp-tenant-id",
valid=True,
),
)
except Exception:
return ValidateCredentialsResponse(
response=ValidatedCredentials(
valid=False,
),
)
async def list_accounts(args: ListAccountsRequest) -> ListAccountsResponse:
"""List user accounts from MockApp."""
endpoint = "/api/users"
try:
current_pagination = NextPageToken(get_page(args).token).paginations()[0]
except IndexError:
current_pagination = Pagination.default(endpoint)
page_size = get_page(args).size or DEFAULT_PAGE_SIZE
async with MockappClient(args) as client:
response = await client.get_users(
limit=page_size,
offset=current_pagination.offset,
)
# Transform API response to Lumos account format
accounts = []
for user in response.get("items", []):
status = AccountStatus.ACTIVE
if user.get("status") == "inactive":
status = AccountStatus.INACTIVE
elif user.get("status") == "deleted":
status = AccountStatus.DEACTIVATED
accounts.append(FoundAccountData(
account_id=user["id"],
email=user.get("email", ""),
display_name=user.get("name", ""),
status=status,
))
# Handle pagination
has_more = response.get("has_more", False)
next_pagination = []
if has_more:
next_pagination.append(
Pagination(
endpoint=endpoint,
offset=current_pagination.offset + len(accounts),
)
)
next_page_token = NextPageToken.from_paginations(next_pagination).token if next_pagination else None
return ListAccountsResponse(
response=accounts,
page=Page(
token=next_page_token,
size=page_size,
) if next_page_token else None,
)
async def list_resources(args: ListResourcesRequest) -> ListResourcesResponse:
"""List resources from MockApp."""
async with MockappClient(args) as client:
resources_data = await client.get_resources()
# Transform API response to Lumos resource format
resources = []
for resource in resources_data:
resources.append(FoundResourceData(
resource_id=resource["id"],
resource_type=resource["type"],
display_name=resource["name"],
description=resource.get("description", ""),
))
return ListResourcesResponse(
response=resources,
)
async def list_entitlements(args: ListEntitlementsRequest) -> ListEntitlementsResponse:
"""List entitlements from MockApp."""
async with MockappClient(args) as client:
entitlements_data = await client.get_entitlements()
# Transform API response to Lumos entitlement format
entitlements = []
for entitlement in entitlements_data:
entitlements.append(FoundEntitlementData(
entitlement_id=entitlement["id"],
entitlement_type=entitlement["type"],
resource_id=entitlement["resource_id"],
display_name=entitlement["name"],
description=entitlement.get("description", ""),
))
return ListEntitlementsResponse(
response=entitlements,
)
async def find_entitlement_associations(
args: FindEntitlementAssociationsRequest,
) -> FindEntitlementAssociationsResponse:
"""Find entitlement associations in MockApp."""
async with MockappClient(args) as client:
associations_data = await client.get_associations()
# Transform API response to Lumos association format
associations = []
for assoc in associations_data:
associations.append(FoundEntitlementAssociation(
account_id=assoc["user_id"],
entitlement_id=assoc["entitlement_id"],
resource_id=assoc["resource_id"],
))
return FindEntitlementAssociationsResponse(
response=associations,
)
async def get_last_activity(args: GetLastActivityRequest) -> GetLastActivityResponse:
"""Get last activity for users in MockApp."""
account_ids = args.request.account_ids
async with MockappClient(args) as client:
# For each requested user, get their details to check last login
activities = []
for account_id in account_ids:
try:
user_data = await client._http_client.get(f"/api/users/{account_id}")
user = user_data.json()
if user.get("last_login"):
activities.append(LastActivityData(
account_id=account_id,
event_type=ActivityEventType.LAST_LOGIN,
happened_at=user["last_login"],
))
except Exception:
# Skip users that can't be found
pass
return GetLastActivityResponse(
response=activities,
)
Step 7: Implement Write Capabilities
Edit mockapp_connector/capabilities_write.py
to implement write capabilities:
from connector.generated import (
AccountStatus,
ActivateAccountRequest,
ActivateAccountResponse,
AssignEntitlementRequest,
AssignEntitlementResponse,
CreateAccountRequest,
CreateAccountResponse,
DeactivateAccountRequest,
DeactivateAccountResponse,
DeleteAccountRequest,
DeleteAccountResponse,
FoundAccountData,
UnassignEntitlementRequest,
UnassignEntitlementResponse,
)
from mockapp_connector.client import MockappClient
async def create_account(args: CreateAccountRequest) -> CreateAccountResponse:
"""Create a new user account in MockApp."""
req = args.request
# Prepare user data for the API
user_data = {
"email": req.email,
"name": req.display_name,
"status": "active",
}
async with MockappClient(args) as client:
# Create the user
new_user = await client.create_user(user_data)
# Return the created account in Lumos format
return CreateAccountResponse(
response=FoundAccountData(
account_id=new_user["id"],
email=new_user["email"],
display_name=new_user["name"],
status=AccountStatus.ACTIVE,
)
)
async def activate_account(args: ActivateAccountRequest) -> ActivateAccountResponse:
"""Activate a user account in MockApp."""
account_id = args.request.account_id
# Prepare update data
update_data = {
"status": "active"
}
async with MockappClient(args) as client:
# Update the user
updated_user = await client.update_user(account_id, update_data)
# Return the updated account in Lumos format
return ActivateAccountResponse(
response=FoundAccountData(
account_id=updated_user["id"],
email=updated_user["email"],
display_name=updated_user["name"],
status=AccountStatus.ACTIVE,
)
)
async def deactivate_account(args: DeactivateAccountRequest) -> DeactivateAccountResponse:
"""Deactivate a user account in MockApp."""
account_id = args.request.account_id
# Prepare update data
update_data = {
"status": "inactive"
}
async with MockappClient(args) as client:
# Update the user
updated_user = await client.update_user(account_id, update_data)
# Return the updated account in Lumos format
return DeactivateAccountResponse(
response=FoundAccountData(
account_id=updated_user["id"],
email=updated_user["email"],
display_name=updated_user["name"],
status=AccountStatus.INACTIVE,
)
)
async def delete_account(args: DeleteAccountRequest) -> DeleteAccountResponse:
"""Delete a user account in MockApp."""
account_id = args.request.account_id
async with MockappClient(args) as client:
# Delete the user
await client.delete_user(account_id)
# Return empty response as the user has been deleted
return DeleteAccountResponse(
response=None,
)
async def assign_entitlement(args: AssignEntitlementRequest) -> AssignEntitlementResponse:
"""Assign an entitlement to a user in MockApp."""
req = args.request
async with MockappClient(args) as client:
# Assign the entitlement
await client.assign_entitlement(
req.account_id,
req.entitlement_id,
req.resource_id
)
# Return empty response on success
return AssignEntitlementResponse(
response=None,
)
async def unassign_entitlement(args: UnassignEntitlementRequest) -> UnassignEntitlementResponse:
"""Unassign an entitlement from a user in MockApp."""
req = args.request
async with MockappClient(args) as client:
# Unassign the entitlement
await client.unassign_entitlement(
req.account_id,
req.entitlement_id,
req.resource_id
)
# Return empty response on success
return UnassignEntitlementResponse(
response=None,
)
Step 8: Configure the Integration
Finally, edit mockapp_connector/integration.py
to register all capabilities:
import httpx
from connector.generated import TokenCredential
from connector.oai.capability import StandardCapabilityName
from connector.oai.errors import HTTPHandler
from connector.oai.integration import DescriptionData, Integration, AppCategory
from mockapp_connector.__about__ import __version__
from mockapp_connector.enums import entitlement_types, resource_types
from mockapp_connector.settings import MockappSettings
from mockapp_connector import capabilities_read, capabilities_write
integration = Integration(
app_id="mockapp",
version=__version__,
auth=TokenCredential, # Using token auth for our mock API
exception_handlers=[
(httpx.HTTPStatusError, HTTPHandler, None),
],
description_data=DescriptionData(
logo_url="https://logo.clearbit.com/example.com",
user_friendly_name="MockApp",
description="MockApp is a test application for demonstrating Lumos connectors",
categories=[AppCategory.DEVELOPERS, AppCategory.IT_AND_SECURITY],
),
settings_model=MockappSettings,
resource_types=resource_types,
entitlement_types=entitlement_types,
)
# Register all capabilities
integration.register_capabilities(
{
# Required capability
StandardCapabilityName.VALIDATE_CREDENTIALS: capabilities_read.validate_credentials,
# Read capabilities
StandardCapabilityName.LIST_ACCOUNTS: capabilities_read.list_accounts,
StandardCapabilityName.LIST_RESOURCES: capabilities_read.list_resources,
StandardCapabilityName.LIST_ENTITLEMENTS: capabilities_read.list_entitlements,
StandardCapabilityName.FIND_ENTITLEMENT_ASSOCIATIONS: capabilities_read.find_entitlement_associations,
StandardCapabilityName.GET_LAST_ACTIVITY: capabilities_read.get_last_activity,
# Write capabilities
StandardCapabilityName.CREATE_ACCOUNT: capabilities_write.create_account,
StandardCapabilityName.ACTIVATE_ACCOUNT: capabilities_write.activate_account,
StandardCapabilityName.DEACTIVATE_ACCOUNT: capabilities_write.deactivate_account,
StandardCapabilityName.DELETE_ACCOUNT: capabilities_write.delete_account,
StandardCapabilityName.ASSIGN_ENTITLEMENT: capabilities_write.assign_entitlement,
StandardCapabilityName.UNASSIGN_ENTITLEMENT: capabilities_write.unassign_entitlement,
}
)
Part 3: Testing Your Connector
Step 1: Run Basic Tests
First, let's verify the connector compiles correctly:
# Check for type errors
mypy .
# Run the included tests (which use mocked responses)
pytest
Step 2: Test with the Mock API
Now, with our mock API server running, let's test the connector against it:
# Validate credentials
mockapp-connector validate_credentials --json '{"auth":{"token":{"token":"valid-token"}},"request":{},"settings":{"api_url":"http://localhost:8000"}}'
# List accounts
mockapp-connector list_accounts --json '{"auth":{"token":{"token":"valid-token"}},"request":{},"settings":{"api_url":"http://localhost:8000"}}'
If everything is configured correctly, you should get successful responses from both commands.
Step 3: Test Write Operations
Let's also test a write operation:
# Create a new user
mockapp-connector create_account --json '{"auth":{"token":{"token":"valid-token"}},"request":{"email":"[email protected]","display_name":"New User"},"settings":{"api_url":"http://localhost:8000"}}'
Part 4: Adapting for Your Real Application
To adapt this connector for your real application, you'll need to:
- Update the settings: Modify settings.py to include any configuration specific to your application
- Adjust the client: Update client.py to match your API's endpoints and request/response formats
- Map data models: Update the transformation logic in capabilities_read.py and capabilities_write.py
- Change authentication: Update integration.py to use the correct auth type for your application
Key Files and Their Purpose
Here's a summary of the key files and what they're responsible for:
File | Purpose |
---|---|
integration.py | Configures the connector and registers capabilities |
settings.py | Defines the configuration settings |
constants.py | Contains API URLs and other constants |
enums.py | Defines resource and entitlement types |
client.py | Implements API communication |
capabilities_read.py | Implements read operations |
capabilities_write.py | Implements write operations |
pagination.py | Handles pagination for large data sets |
Common Patterns and Best Practices
- Error Handling: Use try/except blocks around API calls to handle errors gracefully
- Data Transformation: Create helper functions for transforming between API and Lumos data models
- Pagination: Follow the pagination pattern shown in list_accounts
- Authentication: Keep authentication logic in the client's prepare_client_args method
- Testing: Use the provided test framework for unit tests
Troubleshooting
- Authentication Errors: Verify your token is valid and being passed correctly
- Data Mapping Issues: Check that field names match between your API and Lumos models
- API Errors: Use httpx.codes to handle different response codes appropriately
- Pagination Problems: Make sure your pagination logic correctly handles empty results
Next Steps
- Add Tests: Create more comprehensive tests for your connector
- Add Documentation: Document any unique behavior or requirements
- Deploy: Use the packaging tools to deploy your connector
- Monitor: Add logging to help troubleshoot issues in production
Conclusion
You now have a working Lumos connector that can integrate with your application. By following the patterns in this tutorial, you can extend it to support additional capabilities or adapt it for other applications.
Updated 2 months ago