Building a Lumos Connector: Step-by-Step Tutorial
This tutorial guides you through the entire process of building a custom Lumos connector for your application. By the end, you'll have:
- A working test server that simulates your application's API
- A fully functional Lumos connector that integrates with this API
- The knowledge to adapt this connector for your real application
Setup Overview
We'll be working with two components:
- Mock API Server: A FastAPI application that simulates your application's API endpoints
- Lumos Connector: A Python package that translates between your API and Lumos
Part 1: Setting Up the Test Environment
Step 1: Install Dependencies
# Create a project directory
mkdir lumos-connector-project
cd lumos-connector-project
# Create a virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install required packages
pip install "connector-py[dev]" fastapi uvicorn httpx pydantic
Step 2: Set Up the Mock API Server
First, create the mock API server file by following the instructions in Mock API Server for Testing Lumos Connectors. The command below expects the file to be named mock_api_server.py.
From a new terminal window, we'll start the mock API server:
source venv/bin/activate
uvicorn mock_api_server:app --reload
The server should now be running at http://localhost:8000, with the API documentation available at http://localhost:8000/docs. Keep this terminal session open.
Step 3: Test the API Server
Let's verify that the server is working correctly. From your previous terminal window, run:
# Check the health endpoint
curl http://localhost:8000/api/health
# List users (requires authentication)
curl -H "Authorization: Bearer valid-token" http://localhost:8000/api/users
Part 2: Creating the Lumos Connector
Step 1: Scaffold a New Connector
Now we'll create a new Lumos connector that will connect to our mock API:
# Create a new connector (from project root)
connector scaffold mockapp-connector mockapp_connector
# Navigate to the connector directory
cd mockapp_connector
# Install the connector dependencies
pip install ".[all]"
Step 2: Configure the Connector Settings
Edit mockapp_connector/settings.py to define the configuration settings users will provide:
from pydantic import BaseModel, Field


class MockappConnectorSettings(BaseModel):
    """Settings for MockApp connector."""

    api_url: str = Field(
        default="http://localhost:8000",
        description="The base URL of the MockApp API",
    )
    # Arbitrary default field, can be removed once unreferenced in tests
    account_id: str = Field(
        default="",
        description="The account ID for the MockApp API",
    )
Step 3: Update Constants
Edit mockapp_connector/constants.py to set the API base URL:
# API endpoints (the mock server speaks plain HTTP, matching the api_url default in settings.py)
BASE_URL = "http://localhost:8000"
API_BASE_PATH = "/api"
Step 4: Define Resource and Entitlement Types
Edit mockapp_connector/enums.py to match the types in our mock API:
from enum import Enum

from connector.generated import EntitlementType, ResourceType


class MockappResourceTypes(str, Enum):
    PROJECT = "project"
    TEAM = "team"
    GLOBAL = "global"


class MockappEntitlementTypes(str, Enum):
    PROJECT_ROLE = "project_role"
    TEAM_MEMBER = "team_member"
    ADMIN_ROLE = "admin_role"


resource_types: list[ResourceType] = [
    ResourceType(
        type_id=MockappResourceTypes.PROJECT,
        type_label="Project",
    ),
    ResourceType(
        type_id=MockappResourceTypes.TEAM,
        type_label="Team",
    ),
    ResourceType(
        type_id=MockappResourceTypes.GLOBAL,
        type_label="Global Resource",
    ),
]

entitlement_types: list[EntitlementType] = [
    EntitlementType(
        type_id=MockappEntitlementTypes.PROJECT_ROLE,
        type_label="Project Role",
        resource_type_id=MockappResourceTypes.PROJECT,
        min=0,
    ),
    EntitlementType(
        type_id=MockappEntitlementTypes.TEAM_MEMBER,
        type_label="Team Member",
        resource_type_id=MockappResourceTypes.TEAM,
        min=0,
    ),
    EntitlementType(
        type_id=MockappEntitlementTypes.ADMIN_ROLE,
        type_label="Admin Role",
        resource_type_id=MockappResourceTypes.GLOBAL,
        min=0,
        max=1,  # Users can have at most one admin role
    ),
]
Step 5: Create API Client
Edit mockapp_connector/client.py to create a client for the mock API:
import typing as t

from connector.oai.base_clients import BaseIntegrationClient
from connector.oai.capability import Request, get_token_auth, get_settings
from connector.utils.httpx_auth import BearerAuth

from mockapp_connector.constants import API_BASE_PATH
from mockapp_connector.settings import MockappConnectorSettings


class MockappClient(BaseIntegrationClient):
    @classmethod
    def prepare_client_args(cls, args: Request) -> dict[str, t.Any]:
        settings = get_settings(args, MockappConnectorSettings)
        return {
            "auth": BearerAuth(token=get_token_auth(args).token),
            "base_url": settings.api_url,
        }

    async def get_users(self, limit: int | None = None, offset: int | None = None):
        """Fetch users from the API"""
        params = {}
        # Compare against None so that an explicit 0 is still sent to the API
        if limit is not None:
            params["limit"] = limit
        if offset is not None:
            params["offset"] = offset
        response = await self._http_client.get(f"{API_BASE_PATH}/users", params=params)
        return response.json()

    async def get_resources(self):
        """Fetch resources from the API"""
        response = await self._http_client.get(f"{API_BASE_PATH}/resources")
        return response.json()

    async def get_entitlements(self):
        """Fetch entitlements from the API"""
        response = await self._http_client.get(f"{API_BASE_PATH}/entitlements")
        return response.json()

    async def get_associations(self, user_id=None, entitlement_id=None, resource_id=None):
        """Fetch associations from the API"""
        params = {}
        if user_id:
            params["user_id"] = user_id
        if entitlement_id:
            params["entitlement_id"] = entitlement_id
        if resource_id:
            params["resource_id"] = resource_id
        response = await self._http_client.get(f"{API_BASE_PATH}/associations", params=params)
        return response.json()

    async def create_user(self, user_data):
        """Create a new user"""
        response = await self._http_client.post(f"{API_BASE_PATH}/users", json=user_data)
        return response.json()

    async def update_user(self, user_id, user_data):
        """Update a user"""
        # Interpolate the user ID into the path
        response = await self._http_client.put(
            f"{API_BASE_PATH}/users/{user_id}", json=user_data
        )
        return response.json()

    async def delete_user(self, user_id):
        """Delete a user"""
        await self._http_client.delete(f"{API_BASE_PATH}/users/{user_id}")

    async def assign_entitlement(self, user_id, entitlement_id, resource_id):
        """Assign an entitlement to a user"""
        data = {
            "entitlement_id": entitlement_id,
            "resource_id": resource_id,
        }
        response = await self._http_client.post(
            f"{API_BASE_PATH}/users/{user_id}/entitlements", json=data
        )
        return response.json()

    async def unassign_entitlement(self, user_id, entitlement_id, resource_id):
        """Unassign an entitlement from a user"""
        params = {"resource_id": resource_id}
        await self._http_client.delete(
            f"{API_BASE_PATH}/users/{user_id}/entitlements/{entitlement_id}",
            params=params,
        )
Step 6: Implement Read Capabilities
Edit mockapp_connector/capabilities_read.py to implement the required read capabilities:
from connector.generated import (
    AccountStatus,
    FindEntitlementAssociationsRequest,
    FindEntitlementAssociationsResponse,
    FoundAccountData,
    FoundEntitlementAssociation,
    FoundEntitlementData,
    FoundResourceData,
    GetLastActivityRequest,
    GetLastActivityResponse,
    LastActivityData,
    ListAccountsRequest,
    ListAccountsResponse,
    ListEntitlementsRequest,
    ListEntitlementsResponse,
    ListResourcesRequest,
    ListResourcesResponse,
    Page,
    ValidateCredentialsRequest,
    ValidateCredentialsResponse,
    ValidatedCredentials,
    ActivityEventType,
)
from connector.oai.capability import get_page
from connector.oai.errors import ConnectorError, ErrorCode

from mockapp_connector.pagination import DEFAULT_PAGE_SIZE, NextPageToken, Pagination
from mockapp_connector.client import MockappClient


async def validate_credentials(
    args: ValidateCredentialsRequest,
) -> ValidateCredentialsResponse:
    """Validate credentials for MockApp API."""
    async with MockappClient(args) as client:
        try:
            # Test the API with a simple call
            await client.get_users(limit=1)
            return ValidateCredentialsResponse(
                response=ValidatedCredentials(
                    unique_tenant_id="mockapp-tenant-id",
                    valid=True,
                ),
            )
        except Exception as e:
            raise ConnectorError(
                message=f"Exception occurred: {str(e)}",
                error_code=ErrorCode.API_ERROR,
            ) from e


async def list_accounts(args: ListAccountsRequest) -> ListAccountsResponse:
    """List user accounts from MockApp."""
    endpoint = "/api/users"
    try:
        current_pagination = NextPageToken(get_page(args).token).paginations()[0]
    except IndexError:
        # No page token yet: start from the first page
        current_pagination = Pagination.default(endpoint)
    page_size = get_page(args).size or DEFAULT_PAGE_SIZE

    async with MockappClient(args) as client:
        response = await client.get_users(
            limit=page_size,
            offset=current_pagination.offset,
        )

    # Transform API response to Lumos account format
    accounts = []
    for user in response.get("items", []):
        status = AccountStatus.ACTIVE
        if user.get("status") == "inactive":
            status = AccountStatus.INACTIVE
        elif user.get("status") == "deleted":
            status = AccountStatus.SUSPENDED
        accounts.append(FoundAccountData(
            integration_specific_id=user["id"],
            email=user.get("email", ""),
            username=user.get("name", ""),
            user_status=status,
        ))

    # Handle pagination
    has_more = response.get("has_more", False)
    next_pagination = []
    if has_more:
        next_pagination.append(
            Pagination(
                endpoint=endpoint,
                offset=current_pagination.offset + len(accounts),
            )
        )
    next_page_token = (
        NextPageToken.from_paginations(next_pagination).token if next_pagination else None
    )

    return ListAccountsResponse(
        response=accounts,
        page=Page(
            token=next_page_token,
            size=page_size,
        ) if next_page_token else None,
    )


async def list_resources(args: ListResourcesRequest) -> ListResourcesResponse:
    """List resources from MockApp."""
    async with MockappClient(args) as client:
        resources_data = await client.get_resources()

    # Transform API response to Lumos resource format
    resources = []
    for resource in resources_data:
        resources.append(FoundResourceData(
            integration_specific_id=resource["id"],
            resource_type=resource["type"],
            label=resource["name"],
        ))

    return ListResourcesResponse(
        response=resources,
    )


async def list_entitlements(args: ListEntitlementsRequest) -> ListEntitlementsResponse:
    """List entitlements from MockApp."""
    async with MockappClient(args) as client:
        entitlements_data = await client.get_entitlements()

    # Transform API response to Lumos entitlement format
    entitlements = []
    for entitlement in entitlements_data:
        entitlements.append(FoundEntitlementData(
            integration_specific_id=entitlement["id"],
            entitlement_type=entitlement["type"],
            integration_specific_resource_id=entitlement["resource_id"],
            label=entitlement["name"],
        ))

    return ListEntitlementsResponse(
        response=entitlements,
    )


async def find_entitlement_associations(
    args: FindEntitlementAssociationsRequest,
) -> FindEntitlementAssociationsResponse:
    """Find entitlement associations in MockApp."""
    async with MockappClient(args) as client:
        associations_data = await client.get_associations()

    # Transform API response to Lumos association format
    associations = []
    for assoc in associations_data:
        associations.append(FoundEntitlementAssociation(
            account_id=assoc["user_id"],
            integration_specific_entitlement_id=assoc["entitlement_id"],
            integration_specific_resource_id=assoc["resource_id"],
        ))

    return FindEntitlementAssociationsResponse(
        response=associations,
    )


async def get_last_activity(args: GetLastActivityRequest) -> GetLastActivityResponse:
    """Get last activity for users in MockApp."""
    account_ids = args.request.account_ids

    async with MockappClient(args) as client:
        # For each requested user, get their details to check last login
        activities = []
        for account_id in account_ids:
            try:
                user_data = await client._http_client.get(f"/api/users/{account_id}")
                user = user_data.json()
                if user.get("last_login"):
                    activities.append(LastActivityData(
                        account_id=account_id,
                        event_type=ActivityEventType.LAST_LOGIN,
                        happened_at=user["last_login"],
                    ))
            except Exception:
                # Skip users that can't be found
                pass

    return GetLastActivityResponse(
        response=activities,
    )
Step 7: Implement Write Capabilities
Edit mockapp_connector/capabilities_write.py to implement write capabilities:
from connector.generated import (
    AccountStatus,
    ActivateAccountRequest,
    ActivateAccountResponse,
    ActivatedAccount,
    AssignEntitlementRequest,
    AssignEntitlementResponse,
    AssignedEntitlement,
    CreateAccountResponse,
    CreatedAccount,
    DeactivateAccountRequest,
    DeactivateAccountResponse,
    DeactivatedAccount,
    DeleteAccountRequest,
    DeleteAccountResponse,
    DeletedAccount,
    UnassignedEntitlement,
    UnassignEntitlementRequest,
    UnassignEntitlementResponse,
)
from connector.oai.capability import CustomRequest

from mockapp_connector.dto.user import CreateAccount
from mockapp_connector.client import MockappClient


async def create_account(args: CustomRequest[CreateAccount]) -> CreateAccountResponse:
    """Create a new user account in MockApp."""
    req = args.request

    # Prepare user data for the API
    user_data = {
        "email": req.email,
        "name": req.display_name,
        "status": "active",
    }

    async with MockappClient(args) as client:
        # Create the user
        new_user = await client.create_user(user_data)

    # Return the created account in Lumos format
    return CreateAccountResponse(
        response=CreatedAccount(
            id=new_user["id"],
            status=AccountStatus.ACTIVE,
            created=True,
        )
    )


async def activate_account(args: ActivateAccountRequest) -> ActivateAccountResponse:
    """Activate a user account in MockApp."""
    account_id = args.request.account_id

    # Prepare update data
    update_data = {
        "status": "active"
    }

    async with MockappClient(args) as client:
        # Update the user
        await client.update_user(account_id, update_data)

    # Return the new status
    return ActivateAccountResponse(
        response=ActivatedAccount(
            activated=True,
            status=AccountStatus.ACTIVE,
        )
    )


async def deactivate_account(args: DeactivateAccountRequest) -> DeactivateAccountResponse:
    """Deactivate a user account in MockApp."""
    account_id = args.request.account_id

    # Prepare update data
    update_data = {
        "status": "inactive"
    }

    async with MockappClient(args) as client:
        # Update the user
        await client.update_user(account_id, update_data)

    # Return the new status
    return DeactivateAccountResponse(
        response=DeactivatedAccount(
            deactivated=True,
            status=AccountStatus.INACTIVE,
        )
    )


async def delete_account(args: DeleteAccountRequest) -> DeleteAccountResponse:
    """Delete a user account in MockApp."""
    account_id = args.request.account_id

    async with MockappClient(args) as client:
        # Delete the user
        await client.delete_user(account_id)

    # Return the new status
    return DeleteAccountResponse(
        response=DeletedAccount(
            deleted=True,
            status=AccountStatus.INACTIVE,
        )
    )


async def assign_entitlement(args: AssignEntitlementRequest) -> AssignEntitlementResponse:
    """Assign an entitlement to a user in MockApp."""
    req = args.request

    async with MockappClient(args) as client:
        # Assign the entitlement
        await client.assign_entitlement(
            req.account_integration_specific_id,
            req.entitlement_integration_specific_id,
            req.resource_integration_specific_id,
        )

    # Report the successful assignment
    return AssignEntitlementResponse(
        response=AssignedEntitlement(
            assigned=True,
        )
    )


async def unassign_entitlement(args: UnassignEntitlementRequest) -> UnassignEntitlementResponse:
    """Unassign an entitlement from a user in MockApp."""
    req = args.request

    async with MockappClient(args) as client:
        # Unassign the entitlement
        await client.unassign_entitlement(
            req.account_integration_specific_id,
            req.entitlement_integration_specific_id,
            req.resource_integration_specific_id,
        )

    # Report the successful removal
    return UnassignEntitlementResponse(
        response=UnassignedEntitlement(
            unassigned=True,
        )
    )
Step 8: Update the User DTO
Edit mockapp_connector/dto/user.py to add the required API field display_name to the CreateAccount model:
from connector.generated import CreatableAccount
from pydantic import Field


# TODO: add/remove fields required to create account
class CreateAccount(CreatableAccount):
    email: str = Field(default=None, description="The email address for the new account")
    display_name: str = Field(default=None, description="The display name for the new account")
Step 9: Configure the Integration
Finally, edit mockapp_connector/integration.py to register all capabilities:
import httpx
from connector.generated import TokenCredential
from connector.oai.capability import StandardCapabilityName
from connector.oai.errors import HTTPHandler
from connector.oai.integration import DescriptionData, Integration, AppCategory

from mockapp_connector.__about__ import __version__
from mockapp_connector.enums import entitlement_types, resource_types
from mockapp_connector.settings import MockappConnectorSettings
from mockapp_connector import capabilities_read, capabilities_write

integration = Integration(
    app_id="mockapp",
    version=__version__,
    auth=TokenCredential,  # Using token auth for our mock API
    exception_handlers=[
        (httpx.HTTPStatusError, HTTPHandler, None),
    ],
    description_data=DescriptionData(
        logo_url="https://logo.clearbit.com/example.com",
        user_friendly_name="MockApp",
        description="MockApp is a test application for demonstrating Lumos connectors",
        categories=[AppCategory.DEVELOPERS, AppCategory.IT_AND_SECURITY],
    ),
    settings_model=MockappConnectorSettings,
    resource_types=resource_types,
    entitlement_types=entitlement_types,
)

# Register all capabilities
integration.register_capabilities(
    {
        # Required capability
        StandardCapabilityName.VALIDATE_CREDENTIALS: capabilities_read.validate_credentials,
        # Read capabilities
        StandardCapabilityName.LIST_ACCOUNTS: capabilities_read.list_accounts,
        StandardCapabilityName.LIST_RESOURCES: capabilities_read.list_resources,
        StandardCapabilityName.LIST_ENTITLEMENTS: capabilities_read.list_entitlements,
        StandardCapabilityName.FIND_ENTITLEMENT_ASSOCIATIONS: capabilities_read.find_entitlement_associations,
        StandardCapabilityName.GET_LAST_ACTIVITY: capabilities_read.get_last_activity,
        # Write capabilities
        StandardCapabilityName.CREATE_ACCOUNT: capabilities_write.create_account,
        StandardCapabilityName.ACTIVATE_ACCOUNT: capabilities_write.activate_account,
        StandardCapabilityName.DEACTIVATE_ACCOUNT: capabilities_write.deactivate_account,
        StandardCapabilityName.DELETE_ACCOUNT: capabilities_write.delete_account,
        StandardCapabilityName.ASSIGN_ENTITLEMENT: capabilities_write.assign_entitlement,
        StandardCapabilityName.UNASSIGN_ENTITLEMENT: capabilities_write.unassign_entitlement,
    }
)
Part 3: Testing Your Connector
Step 1: Run Basic Tests
First, let's verify the connector compiles correctly:
# First re-install the module
cd mockapp_connector
pip install -e .
# Run the type checker
mypy .
# (Optional) Run the tests; they only pass once you have implemented the test cases
pytest mockapp_connector/tests
Step 2: Test with the Mock API
Now, with our mock API server running, let's test the connector against it:
# Validate credentials
mockapp-connector validate_credentials --json '{"auth":{"token":{"token":"valid-token"}},"request":{},"settings":{"api_url":"http://localhost:8000"}}'
# List accounts
mockapp-connector list_accounts --json '{"auth":{"token":{"token":"valid-token"}},"request":{},"settings":{"api_url":"http://localhost:8000"}}'
If everything is configured correctly, you should get successful responses from both commands.
Step 3: Test Write Operations
Let's also test a write operation:
# Create a new user
mockapp-connector create_account --json '{"auth":{"token":{"token":"valid-token"}},"request":{"email":"[email protected]","display_name":"New User", "entitlements": []},"settings":{"api_url":"http://localhost:8000"}}'
Part 4: Adapting for Your Real Application
To adapt this connector for your real application, you'll need to:
- Update the settings: Modify settings.py to include any configuration specific to your application
- Adjust the client: Update client.py to match your API's endpoints and request/response formats
- Map data models: Update the transformation logic in capabilities_read.py and capabilities_write.py
- Change authentication: Update integration.py to use the correct auth type for your application
Key Files and Their Purpose
Here's a summary of the key files and what they're responsible for:
| File | Purpose |
|---|---|
| integration.py | Configures the connector and registers capabilities |
| settings.py | Defines the configuration settings |
| constants.py | Contains API URLs and other constants |
| enums.py | Defines resource and entitlement types |
| client.py | Implements API communication |
| capabilities_read.py | Implements read operations |
| capabilities_write.py | Implements write operations |
| pagination.py | Handles pagination for large data sets |
| dto/*.py | Declares the necessary inputs/outputs for the API objects |
Common Patterns and Best Practices
- Error Handling: Use try/except blocks around API calls to handle errors gracefully
- Data Transformation: Create helper functions for transforming between API and Lumos data models
- Pagination: Follow the pagination pattern shown in list_accounts
- Authentication: Keep authentication logic in the client's prepare_client_args method
- Testing: Use the provided test framework for unit tests
Troubleshooting
- Authentication Errors: Verify your token is valid and being passed correctly
- Data Mapping Issues: Check that field names match between your API and Lumos models
- API Errors: Use httpx.codes to handle different response codes appropriately
- Pagination Problems: Make sure your pagination logic correctly handles empty results
Next Steps
- Implement Tests: Leverage the cases structure and properly test your implementation
- Add Documentation: Document any unique behavior or requirements
- Deploy: Use the packaging tools to deploy your connector
- Monitor: Add logging to help troubleshoot issues in production
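For the Monitor item above, here is a minimal sketch using Python's standard logging module. The logger name, the redact helper, and the message format are assumptions for illustration; the point is that credentials should be masked before they reach a log line.

```python
import logging

logger = logging.getLogger("mockapp_connector")


def redact(token: str, visible: int = 4) -> str:
    """Mask all but the last `visible` characters of a secret."""
    if len(token) <= visible:
        return "*" * len(token)
    return "*" * (len(token) - visible) + token[-visible:]


def describe_call(capability: str, token: str, offset: int) -> str:
    """Build a log line that does not expose the full token."""
    return f"capability={capability} offset={offset} token={redact(token)}"


logging.basicConfig(level=logging.INFO)
logger.info(describe_call("list_accounts", "valid-token", 0))
```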
Conclusion
You now have a working Lumos connector that can integrate with your application. By following the patterns in this tutorial, you can extend it to support additional capabilities or adapt it for other applications.