#================================================================================#
# Cartonnage has been tested on the following database versions:
#
# SQLite3 version 3.51.0
# Oracle 19c, 23ai, and 26ai
# MySQL 8.0.35
# PostgreSQL 17.7
# MicrosoftAzureSQL NA
#================================================================================#
import sqlite3
# Create database connection
connection = sqlite3.connect('hr.db', check_same_thread=True)
# Set the database adapter for Record class
# SQLite3 can be replaced by Oracle, MySQL, Postgres, or MicrosoftSQL
Record.database__ = SQLite3(connection)
# Transaction control - commit or rollback manually
Record.database__.commit() # Commit changes
Record.database__.rollback() # Rollback changes
Record.database__.close() # Close connection
# No SQL generated - connection setup only
# Supported databases:
# - SQLite3(connection)
# - Oracle(connection)
# - MySQL(connection)
# - Postgres(connection)
# - MicrosoftSQL(connection)
# No parameters - connection setup
# Database connection established
# Transaction control available via:
# - Record.database__.commit()
# - Record.database__.rollback()
# - Record.database__.close()
# Table Record classes
class Regions(Record): pass
class Countries(Record): pass
class Departments(Record): pass
class Jobs(Record): pass
class Job_History(Record): pass
class Locations(Record): pass
class Employees(Record): pass
class Dependents(Record): pass
# Alias classes for self-joins and subqueries
class RegionsAlias(Regions): pass
class CountriesAlias(Countries): pass
class DepartmentsAlias(Departments): pass
class JobsAlias(Jobs): pass
class Job_HistoryAlias(Job_History): pass
class LocationsAlias(Locations): pass
class EmployeesAlias(Employees): pass
class DependentsAlias(Dependents): pass
# Self-join alias for employee-manager relationship
class Managers(Employees): pass
# No SQL generated - class definitions only
# Classes map to tables:
# Regions -> regions
# Countries -> countries
# Departments -> departments
# Jobs -> jobs
# Job_History -> job_history
# Locations -> locations
# Employees -> employees
# Dependents -> dependents
# Managers -> employees (alias)
# No parameters - class definitions
# Classes are now available for ORM operations:
# - Direct table access (Employees, Jobs, etc.)
# - Alias classes for subqueries (EmployeesAlias, etc.)
# - Self-join support (Managers extends Employees)
# filter using Record.value(field=value)
employee = Employees().value(employee_id=100).select()
assert employee.data == {'employee_id': 100, 'first_name': 'Steven', ...}
SELECT * FROM Employees WHERE employee_id = ?
[100]
employee.data == {
'employee_id': 100,
'first_name': 'Steven',
'last_name': 'King',
'email': 'steven.king@sqltutorial.org',
'phone_number': '515.123.4567',
'hire_date': '1987-06-17',
'job_id': 4,
'salary': 24000,
'commission_pct': None,
'manager_id': None,
'department_id': 9
}
# filter using Record.where(expression)
employee = Employees().where(Employees.employee_id == 100).select()
assert employee.data == {'employee_id': 100, 'first_name': 'Steven', ...}
SELECT * FROM Employees WHERE employee_id = ?
[100]
employee.data == {'employee_id': 100, 'first_name': 'Steven', ...}
# join two tables
employee = (
Employees()
.join(Dependents, (Employees.employee_id == Dependents.employee_id))
.where(Dependents.first_name == "Jennifer")
.select(selected="Employees.*")
)
assert employee.data == {'employee_id': 100, 'first_name': 'Steven', ...}
SELECT Employees.* FROM Employees
JOIN Dependents ON Employees.employee_id = Dependents.employee_id
WHERE Dependents.first_name = ?
['Jennifer']
employee.data == {'employee_id': 100, 'first_name': 'Steven', ...}
# join same table (self-join)
employee = (
Employees()
.join(Managers, (Employees.manager_id == Managers.employee_id))
.where(Managers.first_name == 'Lex')
.select(selected='Employees.*, Managers.employee_id AS "manager_employee_id", Managers.email AS "manager_email"')
)
assert employee.data == {'employee_id': 103, 'first_name': 'Alexander', ...}
SELECT Employees.*, Managers.employee_id AS "manager_employee_id", Managers.email AS "manager_email"
FROM Employees
JOIN Employees AS Managers ON Employees.manager_id = Managers.employee_id
WHERE Managers.first_name = ?
['Lex']
employee.data == {
'employee_id': 103,
'first_name': 'Alexander',
'manager_employee_id': 102,
'manager_email': 'lex.de haan@sqltutorial.org'
}
# filter using like() - two different filteration methods
employee = (
Employees()
.like(first_name = 'Stev%') # calls internally Record.filter_.like()
.where(Employees.first_name.like('Stev%')) # calls internally Record.filter_.where()
.select()
)
assert employee.data == {'employee_id': 100, 'first_name': 'Steven', ...}
SELECT * FROM Employees
WHERE first_name LIKE ?
AND first_name LIKE ?
['Stev%', 'Stev%']
employee.data == {'employee_id': 100, 'first_name': 'Steven', ...}
# filter using is_null() - two different filteration methods
employee = (
Employees()
.is_null(manager_id = None) # calls internally Record.filter_.is_null()
.where(Employees.manager_id.is_null()) # calls internally Record.filter_.where()
.select()
)
assert employee.data == {'employee_id': 100, 'first_name': 'Steven', ...}
SELECT * FROM Employees
WHERE manager_id IS NULL
AND manager_id IS NULL
[]
employee.data == {'employee_id': 100, 'first_name': 'Steven', 'manager_id': None, ...}
# filter using is_not_null() - two different filteration methods
employee = (
Employees()
.where(employee_id = 101) # kwargs filter
.is_not_null(manager_id = None) # calls internally Record.filter_.is_not_null()
.where(Employees.manager_id.is_not_null()) # calls internally Record.filter_.where()
.select()
)
assert employee.data == {'employee_id': 101, 'first_name': 'Neena', ...}
SELECT * FROM Employees
WHERE employee_id = ?
AND manager_id IS NOT NULL
AND manager_id IS NOT NULL
[101]
employee.data == {'employee_id': 101, 'first_name': 'Neena', 'manager_id': 100, ...}
# filter using in_() - two different filteration methods
employee = (
Employees()
.in_(employee_id = [100]) # calls internally Record.filter_.in_()
.where(Employees.employee_id.in_([100])) # calls internally Record.filter_.where()
.select()
)
assert employee.data == {'employee_id': 100, 'first_name': 'Steven', ...}
SELECT * FROM Employees
WHERE employee_id IN (?)
AND employee_id IN (?)
[100, 100]
employee.data == {'employee_id': 100, 'first_name': 'Steven', ...}
# filter using not_in() - two different filteration methods
employee = (
Employees()
.where(Employees.first_name.like('Alex%'))
.not_in(employee_id = [115]) # calls internally Record.filter_.not_in()
.where(Employees.employee_id.not_in([115])) # calls internally Record.filter_.where()
.select()
)
assert employee.data == {'employee_id': 103, 'first_name': 'Alexander', ...}
SELECT * FROM Employees
WHERE first_name LIKE ?
AND employee_id NOT IN (?)
AND employee_id NOT IN (?)
['Alex%', 115, 115]
employee.data == {'employee_id': 103, 'first_name': 'Alexander', ...}
# filter using between() - two different filteration methods
employee = (
Employees()
.between(employee_id = (100, 101)) # calls internally Record.filter_.between()
.where(Employees.employee_id.between(100, 101)) # calls internally Record.filter_.where()
.select()
)
assert employee.recordset.data == [{'employee_id': 100, ...}, {'employee_id': 101, ...}]
SELECT * FROM Employees
WHERE employee_id BETWEEN ? AND ?
AND employee_id BETWEEN ? AND ?
[100, 101, 100, 101]
employee.recordset.data == [
{'employee_id': 100, 'first_name': 'Steven', ...},
{'employee_id': 101, 'first_name': 'Neena', ...}
]
# filter using gt(), ge(), le(), and lt() - two different filteration methods
# use filter chaining and & expressions
employee = (
Employees()
.gt(employee_id = 99).ge(employee_id = 100).le(employee_id = 101).lt(employee_id = 102)
.where(
(Employees.employee_id > 99) &
(Employees.employee_id >= 100) &
(Employees.employee_id <= 101) &
(Employees.employee_id < 102)
) # calls internally Record.filter_.where()
.select()
)
assert employee.recordset.data == [{'employee_id': 100, ...}, {'employee_id': 101, ...}]
SELECT * FROM Employees
WHERE employee_id > ?
AND employee_id >= ?
AND employee_id <= ?
AND employee_id < ?
AND employee_id > ?
AND employee_id >= ?
AND employee_id <= ?
AND employee_id < ?
[99, 100, 101, 102, 99, 100, 101, 102]
employee.recordset.data == [
{'employee_id': 100, 'first_name': 'Steven', ...},
{'employee_id': 101, 'first_name': 'Neena', ...}
]
# & | filters and expressions
f1 = (
((Employees.employee_id == 100) & (Employees.first_name == "Steven")) |
((Employees.employee_id == 101) & (Employees.first_name == "Neena"))
)
f2 = (
((Employees.employee_id == 102) & (Employees.first_name == "Lex")) |
((Employees.employee_id == 103) & (Employees.first_name == "Alexander"))
)
employee = (
Employees()
.where(f1 | f2)
.select()
)
assert len(employee.recordset.data) == 4
SELECT * FROM Employees
WHERE (
((employee_id = ? AND first_name = ?) OR (employee_id = ? AND first_name = ?))
OR
((employee_id = ? AND first_name = ?) OR (employee_id = ? AND first_name = ?))
)
[100, 'Steven', 101, 'Neena', 102, 'Lex', 103, 'Alexander']
employee.recordset.data == [
{'employee_id': 100, 'first_name': 'Steven', ...},
{'employee_id': 101, 'first_name': 'Neena', ...},
{'employee_id': 102, 'first_name': 'Lex', ...},
{'employee_id': 103, 'first_name': 'Alexander', ...}
]
# filter using multiple kwargs
employee = (
Employees()
.like(first_name = 'Stev%', last_name = 'Ki%') # calls internally Record.filter_.like()
.select()
)
assert employee.data == {'employee_id': 100, 'first_name': 'Steven', 'last_name': 'King', ...}
SELECT * FROM Employees
WHERE first_name LIKE ?
AND last_name LIKE ?
['Stev%', 'Ki%']
employee.data == {'employee_id': 100, 'first_name': 'Steven', 'last_name': 'King', ...}
# select all and get recordset [all records] count, convert them to list of Dictionaries/lists
reg = Regions().all()
assert reg.recordset.count() == 4
assert reg.columns == ['region_id', 'region_name']
assert reg.recordset.toDicts() == [{'region_id': 1, 'region_name': 'Europe'}, ...]
assert reg.recordset.toLists() == [[1, 'Europe'], [2, 'Americas'], [3, 'Asia'], [4, 'Middle East and Africa']]
SELECT * FROM Regions
[]
reg.recordset.count() == 4
reg.columns == ['region_id', 'region_name']
reg.recordset.toLists() == [
[1, 'Europe'],
[2, 'Americas'],
[3, 'Asia'],
[4, 'Middle East and Africa']
]
# insert single record
emp1 = Employees()
emp1.data = {'employee_id': 19950519, 'first_name': 'William', 'last_name': 'Wallace', 'email': None, 'phone_number': '555.666.777'}
emp1.insert()
# assert emp1.rowsCount() == 1 # confirm number of inserted rows
INSERT INTO Employees (employee_id, first_name, last_name, email, phone_number) VALUES (?, ?, ?, ?, ?)
[19950519, 'William', 'Wallace', None, '555.666.777']
# Record inserted successfully
# update single record
emp1 = (
Employees()
.value(employee_id=19950519)
.set(email='william.wallace@sqltutorial.org', phone_number=None)
.update()
)
# assert emp1.rowsCount() == 1 # confirm number of updated rows
UPDATE Employees SET email = ?, phone_number = ? WHERE employee_id = ?
['william.wallace@sqltutorial.org', None, 19950519]
# Record updated successfully
# check updated record
emp1 = Employees()
emp1.value(employee_id=19950519).select()
assert emp1.data == {
'employee_id': 19950519,
'first_name': 'William',
'last_name': 'Wallace',
'email': 'william.wallace@sqltutorial.org',
'phone_number': None,
...
}
SELECT * FROM Employees WHERE employee_id = ?
[19950519]
emp1.data == {
'employee_id': 19950519,
'first_name': 'William',
'last_name': 'Wallace',
'email': 'william.wallace@sqltutorial.org',
'phone_number': None,
'hire_date': None,
'job_id': None,
'salary': None,
'commission_pct': None,
'manager_id': None,
'department_id': None
}
# delete by exists and subquery
emp1 = (
EmployeesAlias()
.where(
(Dependents.employee_id == EmployeesAlias.employee_id) &
(EmployeesAlias.email == 'william.gietz@sqltutorial.org') &
(EmployeesAlias.employee_id.in_([206]))
)
)
emp2 = Employees().where(Employees.manager_id == 205)
dep1 = (
Dependents()
.exists(_ = emp1)
.in_subquery(employee_id = emp2, selected="employee_id")
.where(Dependents._.exists(emp1))
.where(Dependents.employee_id.in_subquery(emp2, selected="employee_id"))
.delete()
)
assert dep1.rowsCount() == 1
# Verify deletion
dep1 = Dependents()
dep1.where(Dependents.employee_id == 206).select()
assert dep1.data == {}
DELETE FROM Dependents
WHERE EXISTS (
SELECT 1 FROM Employees
WHERE Dependents.employee_id = Employees.employee_id
AND email = ?
AND employee_id IN (?)
)
AND employee_id IN (
SELECT employee_id FROM Employees WHERE manager_id = ?
)
['william.gietz@sqltutorial.org', 206, 205]
dep1.rowsCount() == 1
# Verify deletion
dep1 = Dependents()
dep1.where(Dependents.employee_id == 206).select()
assert dep1.data == {}
# filter and fetchmany records into recordset
jobs = Jobs().where(Jobs.job_title.like('%Accountant%')).select()
assert jobs.recordset.count() == 2
assert jobs.columns == ['job_id', 'job_title', 'min_salary', 'max_salary']
assert jobs.recordset.toLists() == [[1, 'Public Accountant', 4200, 9000], [6, 'Accountant', 4200, 9000]]
# UPDATE: Iterate and set new min_salary for each job (not recommended if you can update with one predicate)
for job in jobs:
job.set(min_salary=4500)
jobs.recordset.set(min_salary=5000) # set on all records in recordset
for job in jobs.recordset: # iterate over recordset
assert job.job_id in [1, 6], job.job_id
jobs.recordset.update()
recordsetLen = len(jobs.recordset) # get len() of a Recordset
assert recordsetLen == 2
# If your recordset's records have a Null value, set the onColumns parameter:
# jobs.recordset.update(onColumns=['job_id'])
# Verify update
jobs = Jobs().where(Jobs.job_title.like('%Accountant%')).select()
assert jobs.recordset.toLists() == [[1, 'Public Accountant', 5000, 9000], [6, 'Accountant', 5000, 9000]]
secondJobInRecordset = jobs.recordset[1] # access record instance by index
assert secondJobInRecordset.job_id == 6
# DELETE all filtered records
jobs.recordset.delete()
# jobs.recordset.delete(onColumns=['job_id']) # if unsure about Null values
# Verify deletion
jobs = Jobs().where(Jobs.job_title.like('%Accountant%')).select()
assert jobs.recordset.toLists() == []
# SELECT
SELECT * FROM Jobs WHERE job_title LIKE ?
# UPDATE (for each record)
UPDATE Jobs SET min_salary = ? WHERE job_id = ? AND job_title = ? AND min_salary = ? AND max_salary = ?
# DELETE (for each record)
DELETE FROM Jobs WHERE job_id = ? AND job_title = ? AND min_salary = ? AND max_salary = ?
# SELECT
['%Accountant%']
# UPDATE
[5000, 1, 'Public Accountant', 4200, 9000], [5000, 6, 'Accountant', 4200, 9000]
# DELETE
[1, 'Public Accountant', 5000, 9000], [6, 'Accountant', 5000, 9000]
# After SELECT
jobs.recordset.count() == 2
jobs.recordset.toLists() == [[1, 'Public Accountant', 4200, 9000], [6, 'Accountant', 4200, 9000]]
# After UPDATE
jobs.recordset.toLists() == [[1, 'Public Accountant', 5000, 9000], [6, 'Accountant', 5000, 9000]]
# After DELETE
jobs.recordset.toLists() == []
# recordset from list of dicts
recordset = Recordset.fromDicts(Employees,
[
{'employee_id': 5, 'first_name': "Mickey", 'last_name': "Mouse"},
{'employee_id': 6, 'first_name': "Donald", 'last_name': "Duck"}
]
)
assert recordset.data == [{'employee_id': 5, 'first_name': 'Mickey', 'last_name': 'Mouse'}, ...]
# OR: instantiate recordset manually
recordset = Recordset()
# create employee(s)/record(s) instances using value()
e1 = Employees().value(employee_id=5, first_name="Mickey", last_name="Mouse")
e2 = Employees().value(employee_id=6, first_name="Donald", last_name="Duck")
# add employee(s)/record(s) instances to the previously instantiated Recordset
recordset.add(e1, e2)
# Recordset INSERT:
recordset.insert()
assert recordset.rowsCount() == 2 # not work for Azure/MicrosoftSQL
# Set update values
e1.set(manager_id=77)
e2.set(manager_id=88)
e1.where(Employees.employee_id > 4) # add general condition to all records
# Recordset UPDATE:
recordset.update()
assert recordset.rowsCount() == 2
# Recordset DELETE:
e1.where(Employees.manager_id < 100) # add general condition
recordset.delete()
assert recordset.rowsCount() == 2
# Batch INSERT
INSERT INTO Employees (employee_id, first_name, last_name) VALUES (?, ?, ?)
INSERT INTO Employees (employee_id, first_name, last_name) VALUES (?, ?, ?)
# Batch UPDATE (with general condition)
UPDATE Employees SET manager_id = ? WHERE employee_id = ? AND employee_id > ?
UPDATE Employees SET manager_id = ? WHERE employee_id = ? AND employee_id > ?
# Batch DELETE (with general condition)
DELETE FROM Employees WHERE employee_id = ? AND manager_id < ?
DELETE FROM Employees WHERE employee_id = ? AND manager_id < ?
# INSERT
[5, 'Mickey', 'Mouse'], [6, 'Donald', 'Duck']
# UPDATE
[77, 5, 4], [88, 6, 4]
# DELETE
[5, 100], [6, 100]
# After INSERT
recordset.rowsCount() == 2
# After UPDATE
recordset.rowsCount() == 2
# Verify: employees with manager_id 77, 88 exist
# After DELETE
recordset.rowsCount() == 2
# Verify: employees with manager_id 77, 88 no longer exist
# Execute raw sql statement and get recordset of the returned rows
records = Record(statement="SELECT * FROM Employees WHERE employee_id IN(100, 101, 102) ", operation=Database.select)
assert records.recordset.count() == 3
for record in records:
assert record.employee_id in [100, 101, 102]
SELECT * FROM Employees WHERE employee_id IN(100, 101, 102)
# No parameters - raw SQL
records.recordset.count() == 3
# Each record has employee_id in [100, 101, 102]
# Execute parameterized sql statement and get recordset of the returned rows
placeholder = Record.database__.placeholder()
records = Record(
statement=f"SELECT * FROM Employees WHERE employee_id IN({placeholder}, {placeholder}, {placeholder})",
parameters=(100, 101, 102),
operation=Database.select
)
assert records.recordset.count() == 3
for record in records:
assert record.employee_id in [100, 101, 102]
SELECT * FROM Employees WHERE employee_id IN(?, ?, ?)
[100, 101, 102]
records.recordset.count() == 3
# Each record has employee_id in [100, 101, 102]
# instantiating Class's instance with fields' values
employee = Employees(statement=None, parameters=None, employee_id=1000, first_name="Super", last_name="Man", operation=Database.select)
employee.insert()
# Verify insertion
employee = Employees().value(employee_id=1000).select()
assert employee.data == {
'employee_id': 1000,
'first_name': 'Super',
'last_name': 'Man',
'email': None,
...
}
INSERT INTO Employees (employee_id, first_name, last_name) VALUES (?, ?, ?)
[1000, 'Super', 'Man']
employee.data == {
'employee_id': 1000,
'first_name': 'Super',
'last_name': 'Man',
'email': None,
'phone_number': None,
'hire_date': None,
'job_id': None,
'salary': None,
'commission_pct': None,
'manager_id': None,
'department_id': None
}
# group_by with HAVING clause
employees = Employees().select(
selected='manager_id, count(1) AS "count"',
group_by='manager_id HAVING count(1) > 4',
order_by='manager_id ASC'
)
assert employees.recordset.data == [
{'manager_id': 100, 'count': 14},
{'manager_id': 101, 'count': 5},
{'manager_id': 108, 'count': 5},
{'manager_id': 114, 'count': 5}
]
SELECT manager_id, count(1) AS "count"
FROM Employees
GROUP BY manager_id HAVING count(1) > 4
ORDER BY manager_id ASC
[]
employees.recordset.data == [
{'manager_id': 100, 'count': 14},
{'manager_id': 101, 'count': 5},
{'manager_id': 108, 'count': 5},
{'manager_id': 114, 'count': 5}
]
emp = (
Employees()
.set(
first_name=Expression("UPPER(first_name)"),
last_name=Expression("LOWER(last_name)"),
# UPPER and LOWER are used because:
# SQLite3, Oracle, and MySQL use new values uppered and lowered before concat.
# Postgres and MySQL uses original value before UPPER and LOWER.
email=Expression("UPPER(first_name) || '.' || LOWER(last_name) || '@test.com'"),
salary=Employees.salary + 100, # Arithmetic using field reference
commission_pct=Expression('COALESCE(commission_pct, 1)')
)
.value(employee_id=100)
.where(
(Employees.salary == Expression('salary')) &
(Employees.salary == Expression('salary + 0'))
)
.update()
.select(selected="first_name, last_name, email, salary, commission_pct")
)
emp.data['salary'] = int(emp.data['salary'])
assert emp.data == {
'first_name': 'STEVEN',
'last_name': 'king',
'email': 'STEVEN.king@test.com',
'salary': 24100,
'commission_pct': 1
}
UPDATE Employees
SET first_name = UPPER(first_name),
last_name = LOWER(last_name),
email = UPPER(first_name) || '.' || LOWER(last_name) || '@test.com',
salary = salary + ?,
commission_pct = COALESCE(commission_pct, 1)
WHERE employee_id = ?
AND salary = salary
AND salary = salary + 0
[100, 100]
emp.data == {
'first_name': 'STEVEN',
'last_name': 'king',
'email': 'STEVEN.king@test.com',
'salary': 24100,
'commission_pct': 1
}
# Create employees to upsert
emp1 = Employees().set(**{'employee_id': 100, 'first_name': 'Ahmed', 'salary': 4000})
emp2 = Employees().set(**{'employee_id': 101, 'first_name': 'Kamal', 'salary': 5000})
# Create recordset
rs = Recordset()
rs.add(emp1, emp2)
# Add conditional filter for upsert (SQLite3/Postgres syntax)
# Only update if new salary is less than existing salary
emp = Employees().where(Employees.first_name.in_(['Steven', 'Neena']))
emp1.where(
(EXCLUDED.salary < Employees.salary) &
(Employees.last_name.in_subquery(emp, selected='last_name'))
)
# Perform upsert on employee_id conflict
rs.upsert(onColumns='employee_id')
# Verify results
emp = Employees().where(Employees.employee_id.in_([100, 101])).select()
assert emp.recordset.data[0]['first_name'] == 'Ahmed'
assert emp.recordset.data[1]['first_name'] == 'Kamal'
# SQLite3/Postgres:
INSERT INTO Employees (employee_id, first_name, salary)
VALUES (?, ?, ?)
ON CONFLICT (employee_id) DO UPDATE SET
first_name = EXCLUDED.first_name,
salary = EXCLUDED.salary
WHERE EXCLUDED.salary < Employees.salary
AND Employees.last_name IN (SELECT last_name FROM Employees WHERE first_name IN (?, ?))
# Oracle/MicrosoftSQL uses MERGE statement:
MERGE INTO Employees T
USING (SELECT ? AS employee_id, ? AS first_name, ? AS salary) S
ON (T.employee_id = S.employee_id)
WHEN MATCHED AND S.salary < T.salary AND T.last_name IN (...) THEN
UPDATE SET first_name = S.first_name, salary = S.salary
WHEN NOT MATCHED THEN
INSERT (employee_id, first_name, salary) VALUES (S.employee_id, S.first_name, S.salary)
[100, 'Ahmed', 4000, 'Steven', 'Neena']
# Employees updated with conditional logic:
# - employee_id 100: first_name = 'Ahmed', salary = 4000
# - employee_id 101: first_name = 'Kamal', salary = 5000
# Only updated where condition was met
# Create recordset with employees
recordset = Recordset.fromDicts(Employees,
[
{'employee_id': 5, 'first_name': "Mickey", 'last_name': "Mouse"},
{'employee_id': 6, 'first_name': "Donald", 'last_name': "Duck"}
]
)
# Instantiate a session with a database instance
session = Session(Record.database__)
# Set Recordset insert query to current Session
session.set(recordset.insert_())
# Update all the Recordset's records with the same phone number
recordset.set(phone_number='+201011223344')
# Set Recordset update query to current Session
session.set(recordset.update_())
# Commit current session
session.commit()
# Verify inserted and updated records
insertedEmployeesAfterUpdate = (
Employees().where(Employees.employee_id.in_([5, 6]))
.select(selected="employee_id, first_name, last_name, phone_number")
)
assert insertedEmployeesAfterUpdate.recordset.data == [
{'employee_id': 5, 'first_name': 'Mickey', 'last_name': 'Mouse', 'phone_number': '+201011223344'},
{'employee_id': 6, 'first_name': 'Donald', 'last_name': 'Duck', 'phone_number': '+201011223344'}
]
# Set Recordset delete query to current Session
session.set(recordset.delete_(onColumns=["employee_id"]))
# Commit the session
session.commit()
# Savepoint operations
session.savepoint("sp1")
session.set(recordset.insert_())
session.rollbackTo('sp1') # Rollback to savepoint
session.releaseSavepoint('sp1') # Release the savepoint
# INSERT (via session)
INSERT INTO Employees (employee_id, first_name, last_name) VALUES (?, ?, ?)
INSERT INTO Employees (employee_id, first_name, last_name) VALUES (?, ?, ?)
# UPDATE (via session)
UPDATE Employees SET phone_number = ? WHERE employee_id = ?
UPDATE Employees SET phone_number = ? WHERE employee_id = ?
# COMMIT
COMMIT
# DELETE (via session)
DELETE FROM Employees WHERE employee_id = ?
DELETE FROM Employees WHERE employee_id = ?
# SAVEPOINT operations
SAVEPOINT sp1
ROLLBACK TO sp1
RELEASE SAVEPOINT sp1
# INSERT
[5, 'Mickey', 'Mouse'], [6, 'Donald', 'Duck']
# UPDATE
['+201011223344', 5], ['+201011223344', 6]
# DELETE
[5], [6]
# After INSERT + UPDATE + COMMIT:
insertedEmployeesAfterUpdate.recordset.data == [
{'employee_id': 5, 'first_name': 'Mickey', 'last_name': 'Mouse', 'phone_number': '+201011223344'},
{'employee_id': 6, 'first_name': 'Donald', 'last_name': 'Duck', 'phone_number': '+201011223344'}
]
# Savepoint allows partial rollback within transaction
(
Employees()
.from_(Departments)
.where(
(Employees.employee_id == 100) &
(Employees.department_id == Departments.department_id)
)
.select(selected="Employees.*, Departments.department_name")
)
# Generates: SELECT ... FROM Employees, Departments WHERE ...
SELECT Employees.*, Departments.department_name
FROM Employees Employees, Departments Departments
WHERE Employees.employee_id = ?
AND Employees.department_id = Departments.department_id
[100]
# Returns employee with department name joined via WHERE clause
# Define alias class for the subquery
class EmployeesGt1000(Employees): pass
# Create subquery for employees with salary > 1000
sub = Employees().where(Employees.salary > 1000).select_().alias('EmployeesGt1000')
# Query using subquery as table source
(
Employees()
.from_(sub)
.where(Employees.employee_id == EmployeesGt1000.employee_id)
.select(selected="Employees.*, EmployeesGt1000.salary")
)
# Generates: SELECT ... FROM Employees, (SELECT * FROM Employees WHERE salary > 1000) AS EmployeesGt1000 WHERE ...
SELECT Employees.*, EmployeesGt1000.salary
FROM Employees Employees,
(SELECT * FROM Employees WHERE salary > ?) AS EmployeesGt1000
WHERE Employees.employee_id = EmployeesGt1000.employee_id
[1000]
# Returns employees matching the subquery condition
# Define alias classes for CTE
class Hierarchy(Record): pass # for recursive CTE
class P(Employees): pass
class C(Employees): pass
class ExecutivesDepartment(Departments): pass
class AdministrationJobs(Jobs): pass
hierarchy = Hierarchy() # used as subquery reference
# Non-recursive CTEs with materialization hints
cte1 = (
P()
.where(P.manager_id.is_null())
.cte(selected='/*+ INLINE */ employee_id, manager_id, first_name', materialization=None)
)
cte2 = (
C()
.join(Hierarchy, (C.manager_id == Hierarchy.employee_id))
.where(C.first_name == 'Neena')
.cte(selected='/*+ MATERIALIZE */ C.employee_id, C.manager_id, C.first_name', materialization=None)
)
cte3 = (
ExecutivesDepartment()
.where(ExecutivesDepartment.department_name == 'Executive')
.cte(selected='/*+ INLINE */ ExecutivesDepartment.*', materialization=None)
)
cte4 = (
AdministrationJobs()
.where(AdministrationJobs.job_title.like('Administration%'))
.cte(selected='/*+ MATERIALIZE */ AdministrationJobs.*', materialization=None)
)
# For SQLite3/Postgres, set materialization explicitly
cte1.materialization = False # NOT MATERIALIZED
cte2.materialization = True # MATERIALIZED
cte3.materialization = False
cte4.materialization = True
# Create recursive CTE using + (UNION ALL) or ^ (UNION)
recursive_cte = (cte1 + cte2) # + for UNION ALL
recursive_cte.alias = "Hierarchy" # Required alias for recursive CTE
recursive_cte.columnsAliases = "employee_id, manager_id, first_name"
# Build WITH clause
# RECURSIVE keyword required for MySQL/Postgres, optional for SQLite3
with_cte = WithCTE((cte3 >> cte4 >> recursive_cte), recursive=True)
# For Postgres 14+, add CYCLE or SEARCH options
# with_cte = WithCTE(..., recursive=True, options='SEARCH DEPTH FIRST BY employee_id SET ordercol')
# SQLite3/Postgres:
WITH RECURSIVE
ExecutivesDepartment AS NOT MATERIALIZED (
SELECT ExecutivesDepartment.* FROM Departments
WHERE department_name = ?
),
AdministrationJobs AS MATERIALIZED (
SELECT AdministrationJobs.* FROM Jobs
WHERE job_title LIKE ?
),
Hierarchy (employee_id, manager_id, first_name) AS NOT MATERIALIZED (
SELECT employee_id, manager_id, first_name FROM Employees
WHERE manager_id IS NULL
UNION ALL
SELECT C.employee_id, C.manager_id, C.first_name
FROM Employees C
JOIN Hierarchy ON C.manager_id = Hierarchy.employee_id
WHERE C.first_name = ?
)
['Executive', 'Administration%', 'Neena']
# CTE definitions ready for use with SELECT, UPDATE, or DELETE
emp = (
Employees()
.with_cte(with_cte)
.join(Hierarchy, (Employees.employee_id == Hierarchy.employee_id))
.join(ExecutivesDepartment, (Employees.department_id == ExecutivesDepartment.department_id))
.join(AdministrationJobs, (Employees.job_id == AdministrationJobs.job_id))
)
# For MicrosoftSQL, add MAXRECURSION option
if Record.database__.name == 'MicrosoftSQL':
emp.select(option='OPTION (MAXRECURSION 100)')
else:
emp.select()
for r in emp:
print(r.data)
WITH RECURSIVE
ExecutivesDepartment AS (...),
AdministrationJobs AS (...),
Hierarchy (employee_id, manager_id, first_name) AS (...)
SELECT *
FROM Employees
JOIN Hierarchy ON Employees.employee_id = Hierarchy.employee_id
JOIN ExecutivesDepartment ON Employees.department_id = ExecutivesDepartment.department_id
JOIN AdministrationJobs ON Employees.job_id = AdministrationJobs.job_id
# MicrosoftSQL:
... OPTION (MAXRECURSION 100)
[...]
# Returns employees matching the CTE hierarchy with department and job info
emp = (
Employees()
.with_cte(with_cte)
.where(Employees.employee_id.in_subquery(hierarchy, selected='employee_id'))
.set(salary=2000)
.update()
)
# SQLite3 returns -1 for complex operations
if Record.database__.name == 'SQLite3':
assert emp.rowsCount() == -1
else:
assert emp.rowsCount() == 2
WITH RECURSIVE
ExecutivesDepartment AS (...),
AdministrationJobs AS (...),
Hierarchy (employee_id, manager_id, first_name) AS (...)
UPDATE Employees
SET salary = ?
WHERE employee_id IN (SELECT employee_id FROM Hierarchy)
[2000]
# Updated 2 employees matching the hierarchy
# SQLite3 returns -1, other databases return actual count
emp = (
Employees()
.with_cte(with_cte)
.where(Employees.employee_id.in_subquery(hierarchy, selected='employee_id'))
.delete()
)
# SQLite3 returns -1 for complex operations
if Record.database__.name == 'SQLite3':
assert emp.rowsCount() == -1
else:
assert emp.rowsCount() == 2
WITH RECURSIVE
ExecutivesDepartment AS (...),
AdministrationJobs AS (...),
Hierarchy (employee_id, manager_id, first_name) AS (...)
DELETE FROM Employees
WHERE employee_id IN (SELECT employee_id FROM Hierarchy)
[]
# Deleted 2 employees matching the hierarchy
# SQLite3 returns -1, other databases return actual count
# Create recordset with selected employees
employees = Recordset()
emp1 = (
Employees()
.value(employee_id=100)
.select()
).set(last_name='Ahmed')
emp2 = (
Employees()
.value(employee_id=101)
.select()
).set(last_name='kamal')
employees.add(emp1, emp2)
# Add with_cte and filters to Recordset through its first Record instance
emp1.with_cte(with_cte).where(
Employees.employee_id.in_subquery(hierarchy, selected='employee_id')
)
# Update using onColumns for null-safe matching
employees.update(onColumns=["employee_id"])
# Check affected rows
if Record.database__.name in ["SQLite3", "MicrosoftSQL"]:
assert employees.rowsCount() == -1 # Complex operations not tracked
else:
assert employees.rowsCount() == 2
WITH RECURSIVE ... AS (...)
UPDATE Employees
SET last_name = ?
WHERE employee_id = ?
AND employee_id IN (SELECT employee_id FROM Hierarchy)
WITH RECURSIVE ... AS (...)
UPDATE Employees
SET last_name = ?
WHERE employee_id = ?
AND employee_id IN (SELECT employee_id FROM Hierarchy)
['Ahmed', 100], ['kamal', 101]
# Updated 2 records with CTE filter
# SQLite3/MicrosoftSQL return -1 for complex operations
# Delete using CTE filter
# Note: Database evaluates CTE after each deletion!
# After deleting employee 100 (parent with manager_id=null),
# no parent is qualified, so 101 won't match in subsequent iteration
employees.delete(onColumns=["employee_id"])
if Record.database__.name in ["SQLite3", "MicrosoftSQL"]:
assert employees.rowsCount() == -1
else:
assert employees.rowsCount() == 1 # Only 1 deleted due to CTE re-evaluation
# Verify remaining records
availableEmployeesAfterDeletion = (
Employees()
.where(Employees.employee_id.in_([100, 101]))
.select(selected="employee_id")
)
assert availableEmployeesAfterDeletion.recordset.data == [{'employee_id': 101}]
WITH RECURSIVE ... AS (...)
DELETE FROM Employees
WHERE employee_id = ?
AND employee_id IN (SELECT employee_id FROM Hierarchy)
# Note: After first DELETE, CTE is re-evaluated.
# If hierarchy root is deleted, child records no longer match.
[100], [101]
# Only 1 record deleted due to CTE re-evaluation after each DELETE
# Employee 101 remains because hierarchy root (100) was deleted first
availableEmployeesAfterDeletion.recordset.data == [{'employee_id': 101}]
# Delete remaining employee for clean insert test
availableEmployeesAfterDeletion.delete()
# Insert with CTE (not supported by Oracle/MySQL)
if Record.database__.name not in ["Oracle", "MySQL"]:
employees.insert()
if Record.database__.name in ["SQLite3", "MicrosoftSQL"]:
assert employees.rowsCount() == -1
else:
assert employees.rowsCount() == 2
WITH RECURSIVE ... AS (...)
INSERT INTO Employees (employee_id, first_name, last_name, ...)
VALUES (?, ?, ?, ...)
# Note: INSERT with CTE not supported by Oracle and MySQL
[100, 'Steven', 'Ahmed', ...], [101, 'Neena', 'kamal', ...]
# 2 records inserted with CTE
# Not supported on Oracle/MySQL
# SQLite3/MicrosoftSQL return -1 for complex operations