Postgres Database Backend

This module provides the PostgreSQL implementation of the BaseDatabase interface. It is a robust, production-ready database backend.

Overview

The PostgresDatabase class can be used to create a database connection to a PostgreSQL server.

Typical usage:

import asyncio
from agentstr.database.postgres import PostgresDatabase

# Note: To run this example, you need a running PostgreSQL server
# and the 'asyncpg' driver installed.
# pip install asyncpg

# Create a database instance with the connection string
db = PostgresDatabase(conn_str="postgresql://user:password@host/dbname")

async def main():
    await db.async_init()
    print("Connection to PostgreSQL successful.")
    # ... perform database operations
    await db.close()
    print("Connection closed.")

# To run this, you would typically use:
# if __name__ == "__main__":
#     asyncio.run(main())

Reference

class agentstr.database.postgres.PostgresDatabase(conn_str: str, *, agent_name: str | None = None)[source]

Bases: BaseDatabase

PostgreSQL implementation using asyncpg.

USER_TABLE_NAME = 'agentstr_users'
MESSAGE_TABLE_NAME = 'agentstr_messages'
__init__(conn_str: str, *, agent_name: str | None = None)[source]
async async_init() Self[source]

Perform any asynchronous initialisation required for the backend.

async close() None[source]

Close the underlying connection (if any).

async add_message(thread_id: str, user_id: str, role: Literal['user', 'agent', 'tool'], message: str = '', content: str = '', kind: str = 'request', satoshis: int | None = None, extra_inputs: dict[str, Any] = {}, extra_outputs: dict[str, Any] = {}) Message[source]

Append a message to a thread and return the stored model.

async get_messages(thread_id: str, user_id: str, *, limit: int | None = None, before_idx: int | None = None, after_idx: int | None = None, reverse: bool = False) List[Message][source]

Retrieve messages for thread_id ordered by idx.

async get_current_thread_id(user_id: str) str | None[source]

Return the current thread id for user_id within this agent scope.

async set_current_thread_id(user_id: str, thread_id: str | None) None[source]

Persist thread_id as the current thread for user_id.

async get_user(user_id: str) User[source]

Fetch a User by user_id. Non-existent users yield a default model with a zero balance.

async upsert_user(user: User) None[source]

Create or update user in storage atomically.

See Also