Skip to main content

Database Connections

You can register your PostgreSQL database with Oryonix for each environment. Make sure your database is accessible via the Internet and you have the right credentials. Both standard PostgreSQL and NeonDB are supported.

Register Your Database

Use onix environment database add to connect a database to your environment.

To register a database, you need to provide:

  • A name for the connection (used in your code to reference this database)
  • The database type (e.g. postgres or neondb)
  • Authentication details (credentials or token)
  • The host where the database is running
  • The name of the database on the host

Example: PostgreSQL

onix environment database add \
--name <connection-name> \
--database-type postgres \
--auth-type credentials \
--host <host> \
--database <database-name> \
--username <username> \
--password <password>

Example: NeonDB

onix environment database add \
--name <connection-name> \
--database-type neondb \
--auth-type token \
--host <host> \
--database <database-name> \
--authorization-token <token> \
--project-id <project-id> \
--role-id <role-id>

Use the Database in Python

Access your database via the onix.db module inside @onix.action functions. The module follows PEP 249 (DB-API 2.0).

Always open connections inside actions, not at module level or in global variables. Each action should open, use, and close its own connection. Oryonix manages connection lifecycle for you; there is no connection pool to configure.

import onix

@onix.action
def get_user(user_id: str):
with onix.db.connect("<connection-name>") as conn:
with conn.cursor() as cur:
cur.execute("SELECT id, name, email FROM users WHERE id = $1", [int(user_id)])
row = cur.fetchone()
return {"id": row[0], "name": row[1], "email": row[2]} if row else None

The string passed to onix.db.connect() is the name you gave the database when registering it.

API Reference

onix.db.connect(db_name) → Connection

Opens a connection to the named database. Returns a Connection object.

Connection

MethodDescription
cursor()Returns a new Cursor
close()Closes the connection

Supports use as a context manager (with onix.db.connect(...) as conn:).

Cursor

MethodDescription
execute(sql, params=None)Execute a query. Use $1, $2, ... placeholders for parameters
executemany(sql, seq_of_params)Execute a query once for each parameter set
fetchone()Fetch the next row as a tuple, or None if no more rows
fetchmany(size=arraysize)Fetch up to size rows as a list of tuples
fetchall()Fetch all remaining rows as a list of tuples
close()Close the cursor

cursor.description returns column metadata after a query is executed. Each entry is a 7-tuple (name, type_code, display_size, internal_size, precision, scale, null_ok) per PEP 249. name and type_code are populated — type_code is one of the PEP 249 type objects (STRING, BINARY, NUMBER, DATETIME). The remaining five fields are None.

cur.execute("SELECT name, occurred_on, payload FROM events LIMIT 1")
cur.fetchone()
for col in cur.description:
print(col[0], col[1])
# name <class 'str'>
# occurred_on <class 'datetime.date'>
# payload <class 'bytes'>

Supports use as a context manager (with conn.cursor() as cur:).

Parameters

Parameters are passed as a list and referenced positionally with $1, $2, etc. The types are inferred automatically from the prepared statement — you do not need to cast values manually.

cur.execute("SELECT * FROM orders WHERE user_id = $1 AND status = $2", [user_id, "pending"])

Supported Types

Postgres typePython value inPython value out
BOOLboolbool
INT2intint
INT4intint
INT8intint
FLOAT4floatfloat
FLOAT8floatfloat
TEXT, VARCHAR, UUID, NUMERIC, etc.strstr
DATEdatetime.datedatetime.date
TIMEdatetime.timedatetime.time
TIMESTAMPdatetime.datetimedatetime.datetime
TIMESTAMPTZdatetime.datetime (with tzinfo)datetime.datetime (with tzinfo)
BYTEAbytesbytes
JSON, JSONBstr (JSON-encoded)str
NULLNoneNone

Type Helpers

onix.db.Date, onix.db.Time, onix.db.Timestamp, and onix.db.Binary are aliases for the Python stdlib types datetime.date, datetime.time, datetime.datetime, and bytes respectively. You can use either form.

Date(year, month, day)

cur.execute(
"INSERT INTO events (name, occurred_on) VALUES ($1, $2)",
["launch", onix.db.Date(2024, 1, 15)],
)

Rows returned from a DATE column come back as a datetime.date object.

Time(hour, minute, second)

cur.execute("SELECT * FROM slots WHERE start_time > $1", [onix.db.Time(9, 0, 0)])

Rows returned from a TIME column come back as a datetime.time object.

Timestamp(year, month, day, hour, minute, second)

cur.execute(
"INSERT INTO logs (created_at) VALUES ($1)",
[onix.db.Timestamp(2024, 1, 15, 12, 30, 0)],
)

To insert the current time into a TIMESTAMPTZ column, use datetime.datetime.now(datetime.timezone.utc):

import datetime

cur.execute(
"INSERT INTO logs (created_at) VALUES ($1)",
[datetime.datetime.now(datetime.timezone.utc)],
)
warning

Always pass timezone-aware datetimes when writing to TIMESTAMPTZ columns. Workers can run on machines in any timezone, so datetime.datetime.now() (without a timezone) and time.localtime() will produce different results depending on where the worker runs. Use datetime.datetime.now(datetime.timezone.utc) or another explicit timezone.

Rows returned from a TIMESTAMP column come back as a datetime.datetime object. Rows from a TIMESTAMPTZ column come back as a timezone-aware datetime.datetime (UTC).

Binary(data)

Converts bytes or bytearray to bytes for writing to a BYTEA column.

cur.execute(
"INSERT INTO files (name, content) VALUES ($1, $2)",
["hello.txt", onix.db.Binary(b"hello world")],
)

Rows returned from a BYTEA column come back as bytes directly.

cur.execute("SELECT content FROM files WHERE name = $1", ["hello.txt"])
row = cur.fetchone()
print(row[0]) # bytes: b"hello world"

Tick Constructors

DateFromTicks(ticks), TimeFromTicks(ticks), and TimestampFromTicks(ticks) construct Date, Time, and Timestamp objects from a Unix timestamp (seconds since epoch), interpreted as UTC.

Iterate with a Cursor

Cursor supports the iterator protocol, so you can loop directly over results without calling fetchall():

cur.execute("SELECT id, name FROM users")
for row in cur:
print(row[0], row[1])

Large Result Sets

For queries that return many rows, fetch in batches using fetchmany() to avoid loading everything into memory at once:

cur.execute("SELECT * FROM events ORDER BY created_at DESC")
while True:
batch = cur.fetchmany(100)
if not batch:
break
for row in batch:
process(row)

Best Practices

Do One Query per Action

As described in Flow and Action Functions, actions are retried automatically on transient failures and must be idempotent. This applies directly to database work: keep each action focused on a single operation so that a retry cannot double-apply a write.

For writes, make operations idempotent where possible:

@onix.action
def create_user(name: str, email: str) -> str | None:
with onix.db.connect("<connection-name>") as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO users (name, email) VALUES ($1, $2) ON CONFLICT (email) DO NOTHING RETURNING id",
[name, email],
)
row = cur.fetchone()
return str(row[0]) if row else None

Use Flows for Multi-Step Operations

If you need multiple writes to happen reliably in sequence, model each step as its own action and compose them in a flow. Each action is independently retried on failure, which is the safe alternative to a single long-running transaction.

@onix.action
def create_order(user_id: str, items: list) -> str:
with onix.db.connect("<connection-name>") as conn:
with conn.cursor() as cur:
cur.execute("INSERT INTO orders (user_id) VALUES ($1) RETURNING id", [user_id])
row = cur.fetchone()
return str(row[0])

@onix.action
def add_order_items(order_id: str, items: list):
with onix.db.connect("<connection-name>") as conn:
with conn.cursor() as cur:
cur.executemany(
"INSERT INTO order_items (order_id, product_id, qty) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
[[order_id, item["product_id"], item["qty"]] for item in items],
)

@onix.flow
def place_order(user_id: str, items: list):
order_id = create_order(user_id, items)
add_order_items(order_id, items)

If add_order_items is retried, the ON CONFLICT DO NOTHING ensures items are not duplicated.

warning

Decomposing operations into separate actions does not provide transactional isolation. Between create_order and add_order_items completing, other database clients can observe the intermediate state — an order row with no items. If your application requires true atomicity or isolation (e.g. financial writes, inventory changes where partial state must never be visible), perform all related writes within a single action instead.

@onix.action
def place_order(user_id: str, items: list) -> str:
with onix.db.connect("<connection-name>") as conn:
with conn.cursor() as cur:
cur.execute("INSERT INTO orders (user_id) VALUES ($1) RETURNING id", [user_id])
order_id = str(cur.fetchone()[0])
cur.executemany(
"INSERT INTO order_items (order_id, product_id, qty) VALUES ($1, $2, $3)",
[[order_id, item["product_id"], item["qty"]] for item in items],
)
return order_id

The tradeoff is that if this action is retried after a partial write, you need to handle idempotency yourself (e.g. with ON CONFLICT DO NOTHING or a client-supplied idempotency key).

Error Handling

onix.db uses the standard PEP 249 exception hierarchy:

Warning
Error
├── InterfaceError # Misuse of the API (e.g. using a closed connection or cursor)
└── DatabaseError
├── OperationalError # Connection failures, timeouts, server unavailable
├── ProgrammingError # Bad SQL, wrong number of parameters, unknown table/column
├── IntegrityError # Constraint violations (unique, foreign key, not null, etc.)
├── DataError # Invalid values for the column type
└── InternalError # Cursor or transaction in an invalid state

Transient Failures

OperationalError (connection failures, timeouts, server unavailable) is treated as a transient failure — Oryonix will retry the action automatically. You do not need to write retry logic for these.

Non-transient Failures

Catch errors that represent conditions your code can respond to meaningfully:

@onix.action
def create_user(name: str, email: str) -> str | None:
with onix.db.connect("<connection-name>") as conn:
with conn.cursor() as cur:
try:
cur.execute(
"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
[name, email],
)
row = cur.fetchone()
except onix.db.IntegrityError:
# email already exists — return None or raise a domain error
return None
return str(row[0])

IntegrityError — constraint violations (duplicate unique values, foreign key mismatches, not-null violations). These will not resolve on retry and should be handled in your action.

ProgrammingError — bad SQL, wrong parameter count, unknown table or column. Indicates a bug in your code.

InterfaceError — API misuse such as operating on a closed connection or cursor. Also a bug, not a runtime condition.

Do not catch OperationalError to implement your own retry loop — let Oryonix handle it.

To catch any database error regardless of cause, use the top-level onix.db.Error:

try:
cur.execute(...)
except onix.db.Error as e:
logger.error("database error: %s", e)
raise

Full Example

import onix

@onix.action
def create_user(name: str, email: str):
with onix.db.connect("<connection-name>") as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
[name, email],
)

@onix.action
def list_users() -> list:
with onix.db.connect("<connection-name>") as conn:
with conn.cursor() as cur:
cur.execute("SELECT id, name, email FROM users ORDER BY name")
return [
{"id": row[0], "name": row[1], "email": row[2]}
for row in cur
]