Sample Application Database
Discover and manage application accounts and their permissions
Introduction
This collection of scripts creates and populates a sample application database in Microsoft SQL Server. It demonstrates how the Lumos JDBC Connector can model authorization for a sample application.
The schema implements an identity management data model covering user and service accounts, resources, entitlements, and activity logs. The generator fills it with realistic sample data, drawing user statuses and account types from fixed weighted distributions (for example, 80% of accounts are ACTIVE and 90% are of type USER) and assigning entitlements to accounts at random. The number of accounts, resources, and entitlements can be adjusted as needed.
The included JDBC queries cover the connector's read and write capabilities: listing accounts, resources, and entitlements, and managing accounts and their permissions. This sample database can be used for testing, development, and demonstration of the Lumos JDBC Connector in a controlled environment.
In production, custom applications have their own schemas and column names, which can be mapped in the same way so that they are managed through the Lumos JDBC Connector.
Database and Schema Setup
Follow the steps below to set up the sample database and schema:
- Launch SQL Server Management Studio and connect to your SQL Server instance
- Create a new database:
  - In Object Explorer, right-click on "Databases"
  - Select "New Database"
  - Enter a name (e.g., "LumosDemo")
  - Click "OK"
- Create a new query window:
  - Right-click on your new database
  - Select "New Query"
- Execute the schema creation:
  - Copy the entire schema creation script below
  - Paste it into the query window
  - Click "Execute" or press F5
-- Create Accounts table
CREATE TABLE Accounts (
integration_specific_id UNIQUEIDENTIFIER PRIMARY KEY DEFAULT NEWID(),
email NVARCHAR(255) NOT NULL,
given_name NVARCHAR(100),
family_name NVARCHAR(100),
username NVARCHAR(100) NOT NULL,
user_status NVARCHAR(20) CHECK (user_status IN ('ACTIVE', 'INACTIVE', 'SUSPENDED', 'DEPROVISIONED', 'PENDING', 'DELETED')),
account_type NVARCHAR(20) CHECK (account_type IN ('SERVICE', 'USER')),
created_at DATETIME2 DEFAULT GETDATE(),
CONSTRAINT UQ_Email UNIQUE (email),
CONSTRAINT UQ_Username UNIQUE (username)
);
-- Create Resources table
CREATE TABLE Resources (
integration_specific_id UNIQUEIDENTIFIER PRIMARY KEY DEFAULT NEWID(),
label NVARCHAR(255) NOT NULL,
resource_type NVARCHAR(50) CHECK (resource_type IN ('Site', 'Workspace')),
created_at DATETIME2 DEFAULT GETDATE()
);
-- Create Entitlements table
CREATE TABLE Entitlements (
integration_specific_id UNIQUEIDENTIFIER PRIMARY KEY DEFAULT NEWID(),
integration_specific_resource_id UNIQUEIDENTIFIER,
entitlement_type NVARCHAR(50) CHECK (entitlement_type IN ('Group', 'Role')),
is_assignable BIT DEFAULT 1,
label NVARCHAR(255) NOT NULL,
created_at DATETIME2 DEFAULT GETDATE(),
CONSTRAINT FK_Entitlements_Resources FOREIGN KEY (integration_specific_resource_id)
REFERENCES Resources(integration_specific_id)
);
-- Create EntitlementAssociations table
CREATE TABLE EntitlementAssociations (
id BIGINT IDENTITY(1,1) PRIMARY KEY,
account_id UNIQUEIDENTIFIER,
integration_specific_entitlement_id UNIQUEIDENTIFIER,
integration_specific_resource_id UNIQUEIDENTIFIER,
assigned_at DATETIME2 DEFAULT GETDATE(),
CONSTRAINT FK_EntitlementAssociations_Accounts FOREIGN KEY (account_id)
REFERENCES Accounts(integration_specific_id),
CONSTRAINT FK_EntitlementAssociations_Entitlements FOREIGN KEY (integration_specific_entitlement_id)
REFERENCES Entitlements(integration_specific_id),
CONSTRAINT FK_EntitlementAssociations_Resources FOREIGN KEY (integration_specific_resource_id)
REFERENCES Resources(integration_specific_id)
);
-- Create ActivityLog table
CREATE TABLE ActivityLog (
id BIGINT IDENTITY(1,1) PRIMARY KEY,
account_id UNIQUEIDENTIFIER,
event_type NVARCHAR(100) NOT NULL,
happened_at DATETIME2 DEFAULT GETDATE(),
CONSTRAINT FK_ActivityLog_Accounts FOREIGN KEY (account_id)
REFERENCES Accounts(integration_specific_id)
);
-- Create indices for better query performance
CREATE INDEX IX_Accounts_Status ON Accounts(user_status);
CREATE INDEX IX_Resources_Type ON Resources(resource_type);
CREATE INDEX IX_Entitlements_Type ON Entitlements(entitlement_type);
CREATE INDEX IX_ActivityLog_AccountId ON ActivityLog(account_id);
CREATE INDEX IX_EntitlementAssociations_AccountId ON EntitlementAssociations(account_id);
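To verify the setup, execute the queries below. The account distribution and entitlement association queries return rows only after the sample data generator has been run: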
-- Verify table creation
SELECT TABLE_NAME
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'BASE TABLE';
-- Check account distribution
SELECT user_status, account_type, COUNT(*) as count
FROM Accounts
GROUP BY user_status, account_type;
-- Verify entitlement associations
SELECT
a.username,
r.label as resource_name,
e.label as entitlement_name
FROM EntitlementAssociations ea
JOIN Accounts a ON ea.account_id = a.integration_specific_id
JOIN Resources r ON ea.integration_specific_resource_id = r.integration_specific_id
JOIN Entitlements e ON ea.integration_specific_entitlement_id = e.integration_specific_id;
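To get a feel for what the entitlement association query returns before running it against SQL Server, the same join can be tried on a throwaway in-memory database. The following is only a sketch: it assumes SQLite as a stand-in, simplifies every column to TEXT, and uses made-up rows, so it illustrates the join shape rather than the real schema.

```python
import sqlite3

# The same three-way join as the verification query above.
VERIFY_SQL = """
SELECT a.username, r.label AS resource_name, e.label AS entitlement_name
FROM EntitlementAssociations ea
JOIN Accounts a ON ea.account_id = a.integration_specific_id
JOIN Resources r ON ea.integration_specific_resource_id = r.integration_specific_id
JOIN Entitlements e ON ea.integration_specific_entitlement_id = e.integration_specific_id
"""


def build_demo_db():
    """Create an in-memory SQLite stand-in with one made-up row per table."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE Accounts (integration_specific_id TEXT PRIMARY KEY, username TEXT);
        CREATE TABLE Resources (integration_specific_id TEXT PRIMARY KEY, label TEXT);
        CREATE TABLE Entitlements (
            integration_specific_id TEXT PRIMARY KEY,
            integration_specific_resource_id TEXT,
            label TEXT
        );
        CREATE TABLE EntitlementAssociations (
            account_id TEXT,
            integration_specific_entitlement_id TEXT,
            integration_specific_resource_id TEXT
        );
        INSERT INTO Accounts VALUES ('a1', 'jdoe0001');
        INSERT INTO Resources VALUES ('r1', 'Engineering Team Workspace');
        INSERT INTO Entitlements VALUES ('e1', 'r1', 'Role Admin');
        INSERT INTO EntitlementAssociations VALUES ('a1', 'e1', 'r1');
    """)
    return conn


def verify_associations(conn):
    """Return (username, resource_name, entitlement_name) tuples."""
    return conn.execute(VERIFY_SQL).fetchall()


if __name__ == "__main__":
    for row in verify_associations(build_demo_db()):
        print(row)
```

Each result row pairs one account with one entitlement and the resource that entitlement belongs to, which is the shape the real query produces once the generator has populated the tables.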
Tips
If you run into any errors during execution:
- Make sure you're connected with appropriate permissions (sysadmin or db_owner)
- Check that no objects already exist (if they do, you might need to drop them first)
- Ensure you're executing against the correct database (you can use USE [LumosDemo]; at the start of your scripts)
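If objects already exist, they have to be dropped in an order that respects the foreign keys in the schema. The sketch below builds such a cleanup script as text. The helper name `build_drop_script` is hypothetical, and the `DROP TABLE IF EXISTS` syntax it emits assumes SQL Server 2016 or later.

```python
# Child tables come first so that no foreign key still references
# a table at the moment it is dropped.
DROP_ORDER = [
    "ActivityLog",              # references Accounts
    "EntitlementAssociations",  # references Accounts, Entitlements, Resources
    "Entitlements",             # references Resources
    "Resources",
    "Accounts",
]


def build_drop_script(database="LumosDemo"):
    """Return a T-SQL cleanup script for the sample tables (hypothetical helper)."""
    lines = [f"USE [{database}];"]
    lines += [f"DROP TABLE IF EXISTS {table};" for table in DROP_ORDER]
    return "\n".join(lines)


if __name__ == "__main__":
    print(build_drop_script())
```

The printed statements can be pasted into a query window and executed before re-running the schema creation script.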
Sample Data Generator
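The Python script below generates sample accounts, resources, entitlements, entitlement associations, and activity logs in the database. It deletes any existing rows in these tables before inserting new data.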
Install Python Dependencies
pip install pyodbc faker tqdm
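Save the script below as lumos_generator.py and run it, passing the --server, --database, --uid, and --pwd values that match your environment. Omit --uid and --pwd to use Windows authentication, or pass --use-env to read the credentials from the LUMOS_DB_UID and LUMOS_DB_PWD environment variables.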
python lumos_generator.py --server sql.example.com --database LumosDemo --uid your_username --pwd your_password
import pyodbc
import uuid
import random
from datetime import datetime, timedelta
import faker
import tqdm
import argparse
import os
from typing import Optional


class LumosDataGenerator:
    def __init__(self, connection_string, num_users=200, num_resources=10, num_entitlements=20):
        self.conn = pyodbc.connect(connection_string)
        self.cursor = self.conn.cursor()
        self.fake = faker.Faker()

        # Configuration
        self.num_users = num_users
        self.num_resources = num_resources
        self.num_entitlements = num_entitlements

        # Distributions
        self.user_status_dist = {
            'ACTIVE': 0.80,
            'INACTIVE': 0.10,
            'SUSPENDED': 0.07,
            'PENDING': 0.03
        }
        self.account_type_dist = {
            'USER': 0.90,
            'SERVICE': 0.10
        }
        self.resource_type_dist = {
            'Site': 0.40,
            'Workspace': 0.60
        }
        self.entitlement_type_dist = {
            'Group': 0.60,
            'Role': 0.40
        }

    def create_tables(self):
        """Create all necessary tables if they don't exist"""
        print("Creating/verifying tables...")

        # Accounts table
        self.cursor.execute("""
            IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'Accounts')
            CREATE TABLE Accounts (
                integration_specific_id UNIQUEIDENTIFIER PRIMARY KEY,
                email NVARCHAR(255) NOT NULL,
                given_name NVARCHAR(100),
                family_name NVARCHAR(100),
                username NVARCHAR(100) NOT NULL,
                user_status NVARCHAR(50) NOT NULL,
                account_type NVARCHAR(50) NOT NULL,
                created_at DATETIME DEFAULT GETDATE()
            )
        """)

        # Resources table
        self.cursor.execute("""
            IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'Resources')
            CREATE TABLE Resources (
                integration_specific_id UNIQUEIDENTIFIER PRIMARY KEY,
                label NVARCHAR(255) NOT NULL,
                resource_type NVARCHAR(50) NOT NULL
            )
        """)

        # Entitlements table
        self.cursor.execute("""
            IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'Entitlements')
            CREATE TABLE Entitlements (
                integration_specific_id UNIQUEIDENTIFIER PRIMARY KEY,
                integration_specific_resource_id UNIQUEIDENTIFIER,
                entitlement_type NVARCHAR(50) NOT NULL,
                is_assignable BIT NOT NULL,
                label NVARCHAR(255) NOT NULL,
                FOREIGN KEY (integration_specific_resource_id) REFERENCES Resources(integration_specific_id)
            )
        """)

        # EntitlementAssociations table
        self.cursor.execute("""
            IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'EntitlementAssociations')
            CREATE TABLE EntitlementAssociations (
                account_id UNIQUEIDENTIFIER,
                integration_specific_entitlement_id UNIQUEIDENTIFIER,
                integration_specific_resource_id UNIQUEIDENTIFIER,
                FOREIGN KEY (account_id) REFERENCES Accounts(integration_specific_id),
                FOREIGN KEY (integration_specific_entitlement_id) REFERENCES Entitlements(integration_specific_id),
                FOREIGN KEY (integration_specific_resource_id) REFERENCES Resources(integration_specific_id),
                PRIMARY KEY (account_id, integration_specific_entitlement_id, integration_specific_resource_id)
            )
        """)

        # ActivityLog table
        self.cursor.execute("""
            IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'ActivityLog')
            CREATE TABLE ActivityLog (
                id BIGINT IDENTITY(1,1) PRIMARY KEY,
                account_id UNIQUEIDENTIFIER,
                event_type NVARCHAR(50) NOT NULL,
                happened_at DATETIME NOT NULL,
                FOREIGN KEY (account_id) REFERENCES Accounts(integration_specific_id)
            )
        """)
        self.conn.commit()

    def generate_resources(self):
        print("Generating resources...")
        resources = []
        departments = ['Engineering', 'HR', 'Finance', 'Marketing', 'Sales', 'IT', 'Operations']
        locations = ['US', 'EU', 'APAC', 'LATAM']

        for _ in tqdm.tqdm(range(self.num_resources)):
            resource_id = str(uuid.uuid4())
            resource_type = random.choices(list(self.resource_type_dist.keys()),
                                           weights=list(self.resource_type_dist.values()))[0]
            if resource_type == 'Site':
                label = f"{random.choice(locations)} {random.choice(['Production', 'Development', 'Staging', 'Test'])} Site"
            else:
                label = f"{random.choice(departments)} {random.choice(['Team', 'Project', 'Division'])} Workspace"
            resources.append((resource_id, label, resource_type))

        self.cursor.executemany("""
            INSERT INTO Resources (integration_specific_id, label, resource_type)
            VALUES (?, ?, ?)
        """, resources)
        self.conn.commit()
        return resources

    def generate_entitlements(self, resources):
        print("Generating entitlements...")
        entitlements = []
        resource_ids = [r[0] for r in resources]

        for _ in tqdm.tqdm(range(self.num_entitlements)):
            entitlement_id = str(uuid.uuid4())
            resource_id = random.choice(resource_ids)
            entitlement_type = random.choices(list(self.entitlement_type_dist.keys()),
                                              weights=list(self.entitlement_type_dist.values()))[0]
            label = f"{entitlement_type} {self.fake.job()}"
            entitlements.append((entitlement_id, resource_id, entitlement_type, 1, label))

        self.cursor.executemany("""
            INSERT INTO Entitlements (
                integration_specific_id, integration_specific_resource_id,
                entitlement_type, is_assignable, label
            )
            VALUES (?, ?, ?, ?, ?)
        """, entitlements)
        self.conn.commit()
        return entitlements

    def generate_accounts(self):
        print("Generating accounts...")
        accounts = []

        for i in tqdm.tqdm(range(self.num_users)):
            account_id = str(uuid.uuid4())
            account_type = random.choices(list(self.account_type_dist.keys()),
                                          weights=list(self.account_type_dist.values()))[0]
            if account_type == 'SERVICE':
                service_name = f"svc_{self.fake.slug()}_{i}"
                email = f"{service_name}@example.com"
                status = random.choices(list(self.user_status_dist.keys()),
                                        weights=list(self.user_status_dist.values()))[0]
                accounts.append((
                    account_id, email, 'Service', 'Account',
                    service_name, status, account_type
                ))
            else:
                first_name = self.fake.first_name()
                last_name = self.fake.last_name()
                # The zero-padded index keeps usernames and emails unique
                username = f"{first_name[0].lower()}{last_name.lower()}{i:04d}"
                email = f"{username}@example.com"
                status = random.choices(list(self.user_status_dist.keys()),
                                        weights=list(self.user_status_dist.values()))[0]
                accounts.append((
                    account_id, email, first_name, last_name,
                    username, status, account_type
                ))

        self.cursor.executemany("""
            INSERT INTO Accounts (
                integration_specific_id, email, given_name, family_name,
                username, user_status, account_type
            )
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, accounts)
        self.conn.commit()
        return accounts

    def generate_entitlement_associations(self, accounts, entitlements):
        print("Generating entitlement associations...")
        associations = []
        active_account_ids = [a[0] for a in accounts if a[5] in ['ACTIVE', 'INACTIVE', 'SUSPENDED', 'PENDING']]

        for account_id in tqdm.tqdm(active_account_ids):
            num_entitlements = random.randint(2, 5)
            # random.sample picks distinct entitlements, so no pair is inserted twice
            selected_entitlements = random.sample(entitlements, min(num_entitlements, len(entitlements)))
            for entitlement in selected_entitlements:
                associations.append((
                    account_id,
                    entitlement[0],  # entitlement_id
                    entitlement[1]   # resource_id
                ))

        self.cursor.executemany("""
            INSERT INTO EntitlementAssociations (
                account_id, integration_specific_entitlement_id,
                integration_specific_resource_id
            )
            VALUES (?, ?, ?)
        """, associations)
        self.conn.commit()
        return associations

    def generate_activity_logs(self, accounts):
        print("Generating activity logs...")
        activity_logs = []
        event_types = ['LOGIN', 'LOGOUT', 'PASSWORD_CHANGE', 'RESOURCE_ACCESS',
                       'FAILED_LOGIN_ATTEMPT', 'PROFILE_UPDATE', 'API_ACCESS']
        start_date = datetime.now() - timedelta(days=30)

        for account in tqdm.tqdm(accounts):
            if account[5] != 'DELETED':  # Only generate logs for non-deleted accounts
                account_id = account[0]
                num_events = random.randint(3, 10)
                for _ in range(num_events):
                    # Cap the day offset at 29 so that, with the hour and minute
                    # offsets added, no event lands in the future
                    event_time = start_date + timedelta(
                        days=random.randint(0, 29),
                        hours=random.randint(0, 23),
                        minutes=random.randint(0, 59)
                    )
                    event_type = random.choice(event_types)
                    activity_logs.append((account_id, event_type, event_time))

        self.cursor.executemany("""
            INSERT INTO ActivityLog (account_id, event_type, happened_at)
            VALUES (?, ?, ?)
        """, activity_logs)
        self.conn.commit()

    def generate_all(self):
        """Generate all data in the correct order"""
        try:
            # Create tables first
            self.create_tables()

            # Clear existing data (child tables first to satisfy foreign keys)
            tables = ['ActivityLog', 'EntitlementAssociations', 'Entitlements', 'Resources', 'Accounts']
            for table in tables:
                self.cursor.execute(f"DELETE FROM {table}")
            self.conn.commit()

            # Generate new data
            resources = self.generate_resources()
            entitlements = self.generate_entitlements(resources)
            accounts = self.generate_accounts()
            self.generate_entitlement_associations(accounts, entitlements)
            self.generate_activity_logs(accounts)
            print("\nData generation complete!")

            # Print summary
            for table in reversed(tables):
                self.cursor.execute(f"SELECT COUNT(*) FROM {table}")
                print(f"Total {table}: {self.cursor.fetchone()[0]}")
        except Exception as e:
            self.conn.rollback()
            print(f"Error during data generation: {str(e)}")
            raise
        finally:
            self.cursor.close()
            self.conn.close()


def create_connection_string(server: str, database: str, uid: Optional[str] = None,
                             pwd: Optional[str] = None) -> str:
    """Create a connection string from parameters"""
    conn_str = [
        "DRIVER={SQL Server}",
        f"SERVER={server}",
        f"DATABASE={database}",
        "TrustServerCertificate=yes"
    ]
    if uid and pwd:
        conn_str.extend([f"UID={uid}", f"PWD={pwd}"])
    else:
        conn_str.append("Trusted_Connection=yes")
    return ";".join(conn_str)


def parse_args():
    parser = argparse.ArgumentParser(description='Generate sample data for Lumos database')

    # Database connection parameters
    parser.add_argument('--server', required=True, help='SQL Server instance name')
    parser.add_argument('--database', required=True, help='Database name')
    parser.add_argument('--uid', help='Database user ID (optional if using Windows auth)')
    parser.add_argument('--pwd', help='Database password (optional if using Windows auth)')

    # Alternative: use environment variables for credentials
    parser.add_argument('--use-env', action='store_true',
                        help='Use LUMOS_DB_UID and LUMOS_DB_PWD environment variables for credentials')

    # Data generation parameters
    parser.add_argument('--num-users', type=int, default=200,
                        help='Number of users to generate (default: 200)')
    parser.add_argument('--num-resources', type=int, default=10,
                        help='Number of resources to generate (default: 10)')
    parser.add_argument('--num-entitlements', type=int, default=20,
                        help='Number of entitlements to generate (default: 20)')
    return parser.parse_args()


def main():
    args = parse_args()

    # Handle credentials
    uid = None
    pwd = None
    if args.use_env:
        uid = os.environ.get('LUMOS_DB_UID')
        pwd = os.environ.get('LUMOS_DB_PWD')
        if not (uid and pwd):
            raise ValueError("When using --use-env, LUMOS_DB_UID and LUMOS_DB_PWD "
                             "environment variables must be set")
    else:
        uid = args.uid
        pwd = args.pwd

    try:
        # Create connection string
        connection_string = create_connection_string(
            server=args.server,
            database=args.database,
            uid=uid,
            pwd=pwd
        )

        # Initialize and run generator
        generator = LumosDataGenerator(
            connection_string=connection_string,
            num_users=args.num_users,
            num_resources=args.num_resources,
            num_entitlements=args.num_entitlements
        )
        generator.generate_all()
    except pyodbc.Error as e:
        print(f"Database error occurred: {e}")
        exit(1)
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        exit(1)


if __name__ == "__main__":
    main()
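The generator picks each status with `random.choices` and the weights in `user_status_dist`, so the resulting counts approximate the configured shares rather than matching them exactly. The standalone sketch below shows the effect. The function name `sample_statuses` and the `seed` parameter are illustrative only and are not part of the script above.

```python
import random
from collections import Counter

# Same weights as user_status_dist in the generator script.
USER_STATUS_DIST = {
    "ACTIVE": 0.80,
    "INACTIVE": 0.10,
    "SUSPENDED": 0.07,
    "PENDING": 0.03,
}


def sample_statuses(n, dist=USER_STATUS_DIST, seed=None):
    """Draw n statuses with the given weights and count each outcome."""
    rng = random.Random(seed)  # local RNG so a seed gives repeatable draws
    draws = rng.choices(list(dist.keys()), weights=list(dist.values()), k=n)
    return Counter(draws)


if __name__ == "__main__":
    counts = sample_statuses(200, seed=1)
    for status, count in counts.most_common():
        print(f"{status}: {count} ({count / 200:.1%})")
```

With the default of 200 accounts, rare statuses such as PENDING may appear only a handful of times, so small deviations from the configured weights in the "Check account distribution" query are expected.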
Lumos JDBC Connector Setup
Set up a new JDBC integration in Lumos for SQL Server with the following configuration and queries for read/write capabilities:
JDBC Connection String
jdbc:sqlserver://sql.example.com:1433;databaseName=LumosDemo;encrypt=true;trustServerCertificate=true
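The connection string is the host and port followed by semicolon-separated properties. As a rough sketch, the helper below (a hypothetical `build_jdbc_url`, not part of Lumos or the JDBC driver) assembles the same string from its parts so that the host, port, and database name can be swapped for your environment.

```python
def build_jdbc_url(host, database, port=1433, encrypt=True, trust_server_certificate=True):
    """Assemble a SQL Server JDBC URL in the same shape as the example above."""
    props = {
        "databaseName": database,
        "encrypt": str(encrypt).lower(),  # JDBC expects lowercase true/false
        "trustServerCertificate": str(trust_server_certificate).lower(),
    }
    suffix = ";".join(f"{key}={value}" for key, value in props.items())
    return f"jdbc:sqlserver://{host}:{port};{suffix}"


if __name__ == "__main__":
    print(build_jdbc_url("sql.example.com", "LumosDemo"))
```

Setting trustServerCertificate to true makes the driver skip validation of the server certificate. That is convenient for a demo instance with a self-signed certificate, but for anything beyond a test environment it is usually better to set it to false and rely on a trusted certificate.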
Read Capabilities
Validate Credentials
SELECT TOP 1 integration_specific_id as integration_specific_id, email as email, given_name as given_name, family_name as family_name, username as username, user_status as user_status, account_type as account_type, created_at as custom_created_date FROM Accounts WHERE user_status != 'DELETED';
List Accounts
SELECT integration_specific_id as integration_specific_id, email as email, given_name as given_name, family_name as family_name, username as username, user_status as user_status, account_type as account_type, created_at as custom_created_date FROM Accounts WHERE user_status IN ('ACTIVE', 'INACTIVE', 'SUSPENDED', 'PENDING') ORDER BY email;
List Entitlements
SELECT entitlement_type AS entitlement_type, integration_specific_id AS integration_specific_id, integration_specific_resource_id AS integration_specific_resource_id, is_assignable AS is_assignable, label AS label FROM Entitlements WHERE entitlement_type IS NOT NULL AND entitlement_type <> '' AND label IS NOT NULL AND label <> '' AND integration_specific_id IS NOT NULL AND integration_specific_resource_id IS NOT NULL AND is_assignable IS NOT NULL;
Find Entitlement Associations
SELECT a.integration_specific_id as account_id, e.integration_specific_id as integration_specific_entitlement_id, ea.integration_specific_resource_id as integration_specific_resource_id FROM EntitlementAssociations ea JOIN Accounts a ON ea.account_id = a.integration_specific_id JOIN Entitlements e ON ea.integration_specific_entitlement_id = e.integration_specific_id WHERE a.user_status != 'DELETED';
Get Last Activity
SELECT account_id as account_id, event_type as event_type, happened_at as happened_at FROM ActivityLog al JOIN Accounts a ON al.account_id = a.integration_specific_id WHERE a.user_status != 'DELETED' ORDER BY happened_at DESC;
List Resources
SELECT integration_specific_id as integration_specific_id, label as label, resource_type as resource_type FROM Resources;
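The Get Last Activity query returns every logged event, newest first, rather than a single row per account. How the connector consumes these rows is not described here. The sketch below only illustrates what "last activity" per account would be if it were derived from that result set, using made-up rows and a hypothetical helper name.

```python
from datetime import datetime


def latest_activity(rows):
    """Reduce (account_id, event_type, happened_at) rows to the newest event per account."""
    latest = {}
    for account_id, event_type, happened_at in rows:
        current = latest.get(account_id)
        # Keep the row only if it is newer than what is stored so far.
        if current is None or happened_at > current[1]:
            latest[account_id] = (event_type, happened_at)
    return latest


if __name__ == "__main__":
    sample_rows = [
        ("a1", "LOGOUT", datetime(2024, 1, 3, 9, 30)),
        ("a1", "LOGIN", datetime(2024, 1, 2, 8, 0)),
        ("a2", "API_ACCESS", datetime(2024, 1, 1, 12, 0)),
    ]
    for account_id, (event_type, happened_at) in latest_activity(sample_rows).items():
        print(account_id, event_type, happened_at.isoformat())
```

The comparison does not rely on the rows being sorted, so the result is the same whether or not the ORDER BY clause is kept.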
Write Capabilities
Create Account
INSERT INTO Accounts (integration_specific_id, email, username, given_name, family_name, user_status, account_type) VALUES (NEWID(), {email}, {username}, {given_name}, {family_name}, {user_status}, 'USER');
Activate Account
UPDATE Accounts SET user_status = 'ACTIVE' WHERE integration_specific_id = {account_id};
Deactivate Account
UPDATE Accounts SET user_status = 'INACTIVE' WHERE integration_specific_id = {account_id};
Delete Account
UPDATE Accounts SET user_status = 'DELETED' WHERE integration_specific_id = {account_id};
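Delete Account is a soft delete: the row stays in the table and only its status changes, so the account drops out of List Accounts because that query filters on status. The sketch below illustrates this under simplifying assumptions: SQLite stands in for SQL Server, the table is reduced to four TEXT columns, the rows are made up, and a `?` marker replaces the `{account_id}` placeholder.

```python
import sqlite3

LIST_ACCOUNTS = (
    "SELECT username FROM Accounts "
    "WHERE user_status IN ('ACTIVE', 'INACTIVE', 'SUSPENDED', 'PENDING') "
    "ORDER BY email"
)
SOFT_DELETE = "UPDATE Accounts SET user_status = 'DELETED' WHERE integration_specific_id = ?"


def demo_soft_delete():
    """Return (listed before, listed after, total rows kept) for a two-account table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE Accounts ("
        "integration_specific_id TEXT PRIMARY KEY, email TEXT, username TEXT, user_status TEXT)"
    )
    conn.executemany(
        "INSERT INTO Accounts VALUES (?, ?, ?, ?)",
        [
            ("a1", "alice@example.com", "alice", "ACTIVE"),
            ("a2", "bob@example.com", "bob", "ACTIVE"),
        ],
    )
    before = [row[0] for row in conn.execute(LIST_ACCOUNTS)]
    conn.execute(SOFT_DELETE, ("a2",))
    after = [row[0] for row in conn.execute(LIST_ACCOUNTS)]
    total = conn.execute("SELECT COUNT(*) FROM Accounts").fetchone()[0]
    return before, after, total


if __name__ == "__main__":
    print(demo_soft_delete())
```

Because the row is kept, its entitlement associations and activity log entries remain in place and the foreign keys are never violated.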
Assign Entitlement
INSERT INTO EntitlementAssociations (account_id, integration_specific_entitlement_id, integration_specific_resource_id) VALUES ({account_integration_specific_id}, {entitlement_integration_specific_id}, {resource_integration_specific_id});
Unassign Entitlement
DELETE FROM EntitlementAssociations WHERE account_id = {account_integration_specific_id} AND integration_specific_entitlement_id = {entitlement_integration_specific_id} AND integration_specific_resource_id = {resource_integration_specific_id};
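The write queries use `{name}` placeholders that Lumos fills in at run time. How that substitution is performed is not described here. To try a template locally (for example through pyodbc, which uses `?` parameter markers), one option is to convert the placeholders into bound parameters instead of formatting values into the SQL text. The helper below is a hypothetical sketch of that idea, not the connector's implementation.

```python
import re

# Matches placeholders such as {account_id} or {email}.
PLACEHOLDER = re.compile(r"\{(\w+)\}")


def bind_placeholders(template, values):
    """Replace each {name} with '?' and return (sql, params) in placeholder order."""
    names = PLACEHOLDER.findall(template)
    missing = [name for name in names if name not in values]
    if missing:
        raise KeyError(f"missing values for: {', '.join(missing)}")
    sql = PLACEHOLDER.sub("?", template)
    params = [values[name] for name in names]
    return sql, params


if __name__ == "__main__":
    template = (
        "DELETE FROM EntitlementAssociations "
        "WHERE account_id = {account_integration_specific_id} "
        "AND integration_specific_entitlement_id = {entitlement_integration_specific_id} "
        "AND integration_specific_resource_id = {resource_integration_specific_id};"
    )
    sql, params = bind_placeholders(template, {
        "account_integration_specific_id": "account-guid",
        "entitlement_integration_specific_id": "entitlement-guid",
        "resource_integration_specific_id": "resource-guid",
    })
    print(sql)
    print(params)
```

The returned pair can be passed to a cursor as `cursor.execute(sql, params)`, which leaves quoting and escaping to the database driver.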
Conclusion
After configuring the integration, wait for a synchronization to complete. Once the data has been ingested, use cases such as Access Review automatic entitlement removal can be tested.