Building a Lumos Connector: Step-by-Step Tutorial

This tutorial guides you through the entire process of building a custom Lumos connector for your application. By the end, you'll have:

  1. A working test server that simulates your application's API
  2. A fully functional Lumos connector that integrates with this API
  3. The knowledge to adapt this connector for your real application

Setup Overview

We'll be working with two components:

  1. Mock API Server: A FastAPI application that simulates your application's API endpoints
  2. Lumos Connector: A Python package that translates between your API and Lumos

Part 1: Setting Up the Test Environment

Step 1: Install Dependencies

First, create a new directory for your project and set up a virtual environment:

# Create a project directory
mkdir lumos-connector-project
cd lumos-connector-project

# Create a virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install required packages
pip install connector-py[dev] fastapi uvicorn httpx pydantic

Step 2: Set Up the Mock API Server

Create a file named mock_api_server.py with the code from the test server artifact.

Run the server:

uvicorn mock_api_server:app --reload

The server should now be running at http://localhost:8000. You can access the API documentation at http://localhost:8000/docs.

Step 3: Test the API Server

Let's verify that the server is working correctly:

# Check the health endpoint
curl http://localhost:8000/api/health

# List users (requires authentication)
curl -H "Authorization: Bearer valid-token" http://localhost:8000/api/users

The first request should work without authentication, while the second should return user data if you provide the correct token.

Part 2: Creating the Lumos Connector

Step 1: Scaffold a New Connector

Now we'll create a new Lumos connector that will connect to our mock API:

# Create a new connector (from project root)
connector scaffold mockapp-connector mockapp_connector

# Navigate to the connector directory
cd mockapp_connector

# Install the connector dependencies
pip install ".[all]"

Step 2: Configure the Connector Settings

Edit mockapp_connector/settings.py to define the configuration settings users will provide:

from pydantic import BaseModel, Field

class MockappSettings(BaseModel):
    """Settings for MockApp connector."""
    api_url: str = Field(
        default="http://localhost:8000",
        description="The base URL of the MockApp API"
    )

Step 3: Update Constants

Edit mockapp_connector/constants.py to set the API base URL:

# API endpoints
API_BASE_PATH = "/api"

Step 4: Define Resource and Entitlement Types

Edit mockapp_connector/enums.py to match the types in our mock API:

from enum import Enum
from connector.generated import EntitlementType, ResourceType

class MockappResourceTypes(str, Enum):
    PROJECT = "project"
    TEAM = "team"
    GLOBAL = "global"

class MockappEntitlementTypes(str, Enum):
    PROJECT_ROLE = "project_role"
    TEAM_MEMBER = "team_member"
    ADMIN_ROLE = "admin_role"

resource_types: list[ResourceType] = [
    ResourceType(
        type_id=MockappResourceTypes.PROJECT,
        type_label="Project",
    ),
    ResourceType(
        type_id=MockappResourceTypes.TEAM,
        type_label="Team",
    ),
    ResourceType(
        type_id=MockappResourceTypes.GLOBAL,
        type_label="Global Resource",
    )
]

entitlement_types: list[EntitlementType] = [
    EntitlementType(
        type_id=MockappEntitlementTypes.PROJECT_ROLE,
        type_label="Project Role",
        resource_type_id=MockappResourceTypes.PROJECT,
        min=0,
    ),
    EntitlementType(
        type_id=MockappEntitlementTypes.TEAM_MEMBER,
        type_label="Team Member",
        resource_type_id=MockappResourceTypes.TEAM,
        min=0,
    ),
    EntitlementType(
        type_id=MockappEntitlementTypes.ADMIN_ROLE,
        type_label="Admin Role",
        resource_type_id=MockappResourceTypes.GLOBAL,
        min=0,
        max=1,  # Users can have at most one admin role
    )
]

Step 5: Create API Client

Edit mockapp_connector/client.py to create a client for the mock API:

import typing as t
from urllib.parse import urljoin

from connector.oai.base_clients import BaseIntegrationClient
from connector.oai.capability import Request, get_oauth, get_settings
from connector.utils.httpx_auth import BearerAuth
from connector.utils.client_utils import create_client_response

from mockapp_connector.constants import API_BASE_PATH
from mockapp_connector.settings import MockappSettings

class MockappClient(BaseIntegrationClient):
    @classmethod
    def prepare_client_args(cls, args: Request) -> dict[str, t.Any]:
        settings = get_settings(args, MockappSettings)
        return {
            "auth": BearerAuth(token=get_oauth(args).access_token),
            "base_url": settings.api_url,
        }
    
    async def get_users(self, limit: int | None = None, offset: int | None = None):
        """Fetch users from the API"""
        params = {}
        if limit:
            params["limit"] = limit
        if offset:
            params["offset"] = offset
            
        response = await self._http_client.get(f"{API_BASE_PATH}/users", params=params)
        return response.json()
    
    async def get_resources(self):
        """Fetch resources from the API"""
        response = await self._http_client.get(f"{API_BASE_PATH}/resources")
        return response.json()
    
    async def get_entitlements(self):
        """Fetch entitlements from the API"""
        response = await self._http_client.get(f"{API_BASE_PATH}/entitlements")
        return response.json()
    
    async def get_associations(self, user_id=None, entitlement_id=None, resource_id=None):
        """Fetch associations from the API"""
        params = {}
        if user_id:
            params["user_id"] = user_id
        if entitlement_id:
            params["entitlement_id"] = entitlement_id
        if resource_id:
            params["resource_id"] = resource_id
            
        response = await self._http_client.get(f"{API_BASE_PATH}/associations", params=params)
        return response.json()
    
    async def create_user(self, user_data):
        """Create a new user"""
        response = await self._http_client.post(f"{API_BASE_PATH}/users", json=user_data)
        return response.json()
    
    async def update_user(self, user_id, user_data):
        """Update a user"""
        response = await self._http_client.put(f"{API_BASE_PATH}/users/{user_id}", json=user_data)
        return response.json()
    
    async def delete_user(self, user_id):
        """Delete a user"""
        await self._http_client.delete(f"{API_BASE_PATH}/users/{user_id}")
    
    async def assign_entitlement(self, user_id, entitlement_id, resource_id):
        """Assign an entitlement to a user"""
        data = {
            "entitlement_id": entitlement_id,
            "resource_id": resource_id
        }
        response = await self._http_client.post(f"{API_BASE_PATH}/users/{user_id}/entitlements", json=data)
        return response.json()
    
    async def unassign_entitlement(self, user_id, entitlement_id, resource_id):
        """Unassign an entitlement from a user"""
        params = {"resource_id": resource_id}
        await self._http_client.delete(
            f"{API_BASE_PATH}/users/{user_id}/entitlements/{entitlement_id}", 
            params=params
        )

Step 6: Implement Read Capabilities

Edit mockapp_connector/capabilities_read.py to implement the required read capabilities:

from connector.generated import (
    AccountStatus,
    FindEntitlementAssociationsRequest,
    FindEntitlementAssociationsResponse,
    FoundAccountData,
    FoundEntitlementAssociation,
    FoundEntitlementData,
    FoundResourceData,
    GetLastActivityRequest,
    GetLastActivityResponse,
    LastActivityData,
    ListAccountsRequest,
    ListAccountsResponse,
    ListEntitlementsRequest,
    ListEntitlementsResponse,
    ListResourcesRequest,
    ListResourcesResponse,
    Page,
    ValidateCredentialsRequest,
    ValidateCredentialsResponse,
    ValidatedCredentials,
    ActivityEventType,
)
from connector.oai.capability import get_page, get_settings
from datetime import datetime

from mockapp_connector.pagination import DEFAULT_PAGE_SIZE, NextPageToken, Pagination
from mockapp_connector.settings import MockappSettings
from mockapp_connector.client import MockappClient
from mockapp_connector.enums import MockappResourceTypes, MockappEntitlementTypes

async def validate_credentials(
    args: ValidateCredentialsRequest,
) -> ValidateCredentialsResponse:
    """Validate credentials for MockApp API."""
    async with MockappClient(args) as client:
        try:
            # Test the API with a simple call
            await client.get_users(limit=1)
            
            return ValidateCredentialsResponse(
                response=ValidatedCredentials(
                    unique_tenant_id="mockapp-tenant-id",
                    valid=True,
                ),
            )
        except Exception:
            return ValidateCredentialsResponse(
                response=ValidatedCredentials(
                    valid=False,
                ),
            )

async def list_accounts(args: ListAccountsRequest) -> ListAccountsResponse:
    """List user accounts from MockApp."""
    endpoint = "/api/users"
    try:
        current_pagination = NextPageToken(get_page(args).token).paginations()[0]
    except IndexError:
        current_pagination = Pagination.default(endpoint)

    page_size = get_page(args).size or DEFAULT_PAGE_SIZE
    
    async with MockappClient(args) as client:
        response = await client.get_users(
            limit=page_size, 
            offset=current_pagination.offset,
        )
        
        # Transform API response to Lumos account format
        accounts = []
        for user in response.get("items", []):
            status = AccountStatus.ACTIVE
            if user.get("status") == "inactive":
                status = AccountStatus.INACTIVE
            elif user.get("status") == "deleted":
                status = AccountStatus.DEACTIVATED
                
            accounts.append(FoundAccountData(
                account_id=user["id"],
                email=user.get("email", ""),
                display_name=user.get("name", ""),
                status=status,
            ))
            
        # Handle pagination
        has_more = response.get("has_more", False)
        next_pagination = []
        if has_more:
            next_pagination.append(
                Pagination(
                    endpoint=endpoint,
                    offset=current_pagination.offset + len(accounts),
                )
            )

        next_page_token = NextPageToken.from_paginations(next_pagination).token if next_pagination else None

    return ListAccountsResponse(
        response=accounts,
        page=Page(
            token=next_page_token,
            size=page_size,
        ) if next_page_token else None,
    )

async def list_resources(args: ListResourcesRequest) -> ListResourcesResponse:
    """List resources from MockApp."""
    async with MockappClient(args) as client:
        resources_data = await client.get_resources()
        
        # Transform API response to Lumos resource format
        resources = []
        for resource in resources_data:
            resources.append(FoundResourceData(
                resource_id=resource["id"],
                resource_type=resource["type"],
                display_name=resource["name"],
                description=resource.get("description", ""),
            ))
            
    return ListResourcesResponse(
        response=resources,
    )

async def list_entitlements(args: ListEntitlementsRequest) -> ListEntitlementsResponse:
    """List entitlements from MockApp."""
    async with MockappClient(args) as client:
        entitlements_data = await client.get_entitlements()
        
        # Transform API response to Lumos entitlement format
        entitlements = []
        for entitlement in entitlements_data:
            entitlements.append(FoundEntitlementData(
                entitlement_id=entitlement["id"],
                entitlement_type=entitlement["type"],
                resource_id=entitlement["resource_id"],
                display_name=entitlement["name"],
                description=entitlement.get("description", ""),
            ))
            
    return ListEntitlementsResponse(
        response=entitlements,
    )

async def find_entitlement_associations(
    args: FindEntitlementAssociationsRequest,
) -> FindEntitlementAssociationsResponse:
    """Find entitlement associations in MockApp."""
    async with MockappClient(args) as client:
        associations_data = await client.get_associations()
        
        # Transform API response to Lumos association format
        associations = []
        for assoc in associations_data:
            associations.append(FoundEntitlementAssociation(
                account_id=assoc["user_id"],
                entitlement_id=assoc["entitlement_id"],
                resource_id=assoc["resource_id"],
            ))
            
    return FindEntitlementAssociationsResponse(
        response=associations,
    )

async def get_last_activity(args: GetLastActivityRequest) -> GetLastActivityResponse:
    """Get last activity for users in MockApp."""
    account_ids = args.request.account_ids
    
    async with MockappClient(args) as client:
        # For each requested user, get their details to check last login
        activities = []
        
        for account_id in account_ids:
            try:
                user_data = await client._http_client.get(f"/api/users/{account_id}")
                user = user_data.json()
                
                if user.get("last_login"):
                    activities.append(LastActivityData(
                        account_id=account_id,
                        event_type=ActivityEventType.LAST_LOGIN,
                        happened_at=user["last_login"],
                    ))
            except Exception:
                # Skip users that can't be found
                pass
                
    return GetLastActivityResponse(
        response=activities,
    )

Step 7: Implement Write Capabilities

Edit mockapp_connector/capabilities_write.py to implement write capabilities:

from connector.generated import (
    AccountStatus,
    ActivateAccountRequest,
    ActivateAccountResponse,
    AssignEntitlementRequest,
    AssignEntitlementResponse,
    CreateAccountRequest,
    CreateAccountResponse,
    DeactivateAccountRequest,
    DeactivateAccountResponse,
    DeleteAccountRequest,
    DeleteAccountResponse,
    FoundAccountData,
    UnassignEntitlementRequest,
    UnassignEntitlementResponse,
)

from mockapp_connector.client import MockappClient

async def create_account(args: CreateAccountRequest) -> CreateAccountResponse:
    """Create a new user account in MockApp."""
    req = args.request
    
    # Prepare user data for the API
    user_data = {
        "email": req.email,
        "name": req.display_name,
        "status": "active",
    }
    
    async with MockappClient(args) as client:
        # Create the user
        new_user = await client.create_user(user_data)
        
        # Return the created account in Lumos format
        return CreateAccountResponse(
            response=FoundAccountData(
                account_id=new_user["id"],
                email=new_user["email"],
                display_name=new_user["name"],
                status=AccountStatus.ACTIVE,
            )
        )

async def activate_account(args: ActivateAccountRequest) -> ActivateAccountResponse:
    """Activate a user account in MockApp."""
    account_id = args.request.account_id
    
    # Prepare update data
    update_data = {
        "status": "active"
    }
    
    async with MockappClient(args) as client:
        # Update the user
        updated_user = await client.update_user(account_id, update_data)
        
        # Return the updated account in Lumos format
        return ActivateAccountResponse(
            response=FoundAccountData(
                account_id=updated_user["id"],
                email=updated_user["email"],
                display_name=updated_user["name"],
                status=AccountStatus.ACTIVE,
            )
        )

async def deactivate_account(args: DeactivateAccountRequest) -> DeactivateAccountResponse:
    """Deactivate a user account in MockApp."""
    account_id = args.request.account_id
    
    # Prepare update data
    update_data = {
        "status": "inactive"
    }
    
    async with MockappClient(args) as client:
        # Update the user
        updated_user = await client.update_user(account_id, update_data)
        
        # Return the updated account in Lumos format
        return DeactivateAccountResponse(
            response=FoundAccountData(
                account_id=updated_user["id"],
                email=updated_user["email"],
                display_name=updated_user["name"],
                status=AccountStatus.INACTIVE,
            )
        )

async def delete_account(args: DeleteAccountRequest) -> DeleteAccountResponse:
    """Delete a user account in MockApp."""
    account_id = args.request.account_id
    
    async with MockappClient(args) as client:
        # Delete the user
        await client.delete_user(account_id)
        
        # Return empty response as the user has been deleted
        return DeleteAccountResponse(
            response=None,
        )

async def assign_entitlement(args: AssignEntitlementRequest) -> AssignEntitlementResponse:
    """Assign an entitlement to a user in MockApp."""
    req = args.request
    
    async with MockappClient(args) as client:
        # Assign the entitlement
        await client.assign_entitlement(
            req.account_id, 
            req.entitlement_id, 
            req.resource_id
        )
        
        # Return empty response on success
        return AssignEntitlementResponse(
            response=None,
        )

async def unassign_entitlement(args: UnassignEntitlementRequest) -> UnassignEntitlementResponse:
    """Unassign an entitlement from a user in MockApp."""
    req = args.request
    
    async with MockappClient(args) as client:
        # Unassign the entitlement
        await client.unassign_entitlement(
            req.account_id, 
            req.entitlement_id, 
            req.resource_id
        )
        
        # Return empty response on success
        return UnassignEntitlementResponse(
            response=None,
        )

Step 8: Configure the Integration

Finally, edit mockapp_connector/integration.py to register all capabilities:

import httpx
from connector.generated import TokenCredential
from connector.oai.capability import StandardCapabilityName
from connector.oai.errors import HTTPHandler
from connector.oai.integration import DescriptionData, Integration, AppCategory

from mockapp_connector.__about__ import __version__
from mockapp_connector.enums import entitlement_types, resource_types
from mockapp_connector.settings import MockappSettings
from mockapp_connector import capabilities_read, capabilities_write

integration = Integration(
    app_id="mockapp",
    version=__version__,
    auth=TokenCredential,  # Using token auth for our mock API
    exception_handlers=[
        (httpx.HTTPStatusError, HTTPHandler, None),
    ],
    description_data=DescriptionData(
        logo_url="https://logo.clearbit.com/example.com",
        user_friendly_name="MockApp",
        description="MockApp is a test application for demonstrating Lumos connectors",
        categories=[AppCategory.DEVELOPERS, AppCategory.IT_AND_SECURITY],
    ),
    settings_model=MockappSettings,
    resource_types=resource_types,
    entitlement_types=entitlement_types,
)

# Register all capabilities
integration.register_capabilities(
    {
        # Required capability
        StandardCapabilityName.VALIDATE_CREDENTIALS: capabilities_read.validate_credentials,
        
        # Read capabilities
        StandardCapabilityName.LIST_ACCOUNTS: capabilities_read.list_accounts,
        StandardCapabilityName.LIST_RESOURCES: capabilities_read.list_resources,
        StandardCapabilityName.LIST_ENTITLEMENTS: capabilities_read.list_entitlements,
        StandardCapabilityName.FIND_ENTITLEMENT_ASSOCIATIONS: capabilities_read.find_entitlement_associations,
        StandardCapabilityName.GET_LAST_ACTIVITY: capabilities_read.get_last_activity,
        
        # Write capabilities
        StandardCapabilityName.CREATE_ACCOUNT: capabilities_write.create_account,
        StandardCapabilityName.ACTIVATE_ACCOUNT: capabilities_write.activate_account,
        StandardCapabilityName.DEACTIVATE_ACCOUNT: capabilities_write.deactivate_account,
        StandardCapabilityName.DELETE_ACCOUNT: capabilities_write.delete_account,
        StandardCapabilityName.ASSIGN_ENTITLEMENT: capabilities_write.assign_entitlement,
        StandardCapabilityName.UNASSIGN_ENTITLEMENT: capabilities_write.unassign_entitlement,
    }
)

Part 3: Testing Your Connector

Step 1: Run Basic Tests

First, let's verify the connector compiles correctly:

# Check for type errors
mypy .

# Run the included tests (which use mocked responses)
pytest

Step 2: Test with the Mock API

Now, with our mock API server running, let's test the connector against it:

# Validate credentials
mockapp-connector validate_credentials --json '{"auth":{"token":{"token":"valid-token"}},"request":{},"settings":{"api_url":"http://localhost:8000"}}'

# List accounts
mockapp-connector list_accounts --json '{"auth":{"token":{"token":"valid-token"}},"request":{},"settings":{"api_url":"http://localhost:8000"}}'

If everything is configured correctly, you should get successful responses from both commands.

Step 3: Test Write Operations

Let's also test a write operation:

# Create a new user
mockapp-connector create_account --json '{"auth":{"token":{"token":"valid-token"}},"request":{"email":"[email protected]","display_name":"New User"},"settings":{"api_url":"http://localhost:8000"}}'

Part 4: Adapting for Your Real Application

To adapt this connector for your real application, you'll need to:

  1. Update the settings: Modify settings.py to include any configuration specific to your application
  2. Adjust the client: Update client.py to match your API's endpoints and request/response formats
  3. Map data models: Update the transformation logic in capabilities_read.py and capabilities_write.py
  4. Change authentication: Update integration.py to use the correct auth type for your application

Key Files and Their Purpose

Here's a summary of the key files and what they're responsible for:

FilePurpose
integration.pyConfigures the connector and registers capabilities
settings.pyDefines the configuration settings
constants.pyContains API URLs and other constants
enums.pyDefines resource and entitlement types
client.pyImplements API communication
capabilities_read.pyImplements read operations
capabilities_write.pyImplements write operations
pagination.pyHandles pagination for large data sets

Common Patterns and Best Practices

  1. Error Handling: Use try/except blocks around API calls to handle errors gracefully
  2. Data Transformation: Create helper functions for transforming between API and Lumos data models
  3. Pagination: Follow the pagination pattern shown in list_accounts
  4. Authentication: Keep authentication logic in the client's prepare_client_args method
  5. Testing: Use the provided test framework for unit tests

Troubleshooting

  • Authentication Errors: Verify your token is valid and being passed correctly
  • Data Mapping Issues: Check that field names match between your API and Lumos models
  • API Errors: Use httpx.codes to handle different response codes appropriately
  • Pagination Problems: Make sure your pagination logic correctly handles empty results

Next Steps

  1. Add Tests: Create more comprehensive tests for your connector
  2. Add Documentation: Document any unique behavior or requirements
  3. Deploy: Use the packaging tools to deploy your connector
  4. Monitor: Add logging to help troubleshoot issues in production

Conclusion

You now have a working Lumos connector that can integrate with your application. By following the patterns in this tutorial, you can extend it to support additional capabilities or adapt it for other applications.