Relay

This module provides the core relay functionality for the Agentstr SDK.

Overview

The EventRelay class handles communication with a single Nostr relay. It can be used to fetch events, send events, and listen for direct messages.

Typical usage:

import asyncio
from pynostr.key import PrivateKey
from pynostr.filters import Filters
from agentstr.relays.relay import EventRelay

async def main():
    # Use a real relay
    relay_url = "wss://relay.damus.io"

    # Generate a new private key for demonstration
    private_key = PrivateKey()

    # Initialize the EventRelay
    event_relay = EventRelay(relay_url, private_key)

    # Create filters to fetch recent text notes
    filters = Filters(kinds=[1], limit=5)

    # Fetch events
    events = await event_relay.get_events(filters)

    if events:
        print(f"Fetched {len(events)} events:")
        for event in events:
            print(f" - Event content: {event.content}")
    else:
        print("No events found.")

if __name__ == "__main__":
    asyncio.run(main())

Reference

pydantic model agentstr.relays.relay.DecryptedMessage[source]

Bases: BaseModel

A decrypted message from a Nostr relay.

Show JSON schema
{
   "title": "DecryptedMessage",
   "description": "A decrypted message from a Nostr relay.",
   "type": "object",
   "properties": {
      "event": {
         "$ref": "#/$defs/Event"
      },
      "message": {
         "title": "Message",
         "type": "string"
      }
   },
   "$defs": {
      "Event": {
         "properties": {
            "content": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Content"
            },
            "pubkey": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Pubkey"
            },
            "created_at": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Created At"
            },
            "kind": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": 1,
               "title": "Kind"
            },
            "tags": {
               "default": null,
               "items": {
                  "items": {
                     "type": "string"
                  },
                  "type": "array"
               },
               "title": "Tags",
               "type": "array"
            },
            "id": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Id"
            },
            "sig": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Sig"
            }
         },
         "title": "Event",
         "type": "object"
      }
   },
   "required": [
      "event",
      "message"
   ]
}

Fields:
field event: Event [Required]

The Nostr event containing the message.

field message: str [Required]

The decrypted message content.

agentstr.relays.relay.create_subscription(filters: Filters) list[str][source]

Create a subscription for the given filters.

Parameters:

filters – The filters to apply to the subscription.

Returns:

A list containing the subscription request components.

class agentstr.relays.relay.EventRelay(relay: str, private_key: PrivateKey | None = None, public_key: PublicKey | None = None)[source]

Bases: object

Handles communication with a single Nostr relay.

Parameters:
  • relay – WebSocket URL of the Nostr relay.

  • private_key – Private key for signing events.

  • public_key – Optional public key (derived from private_key if not provided).

__init__(relay: str, private_key: PrivateKey | None = None, public_key: PublicKey | None = None)[source]
async get_events(filters: Filters, limit: int = 10, timeout: int = 30, close_on_eose: bool = True) list[Event][source]

Fetch events matching the given filters from this relay.

Parameters:
  • filters – The filters to apply when fetching events.

  • limit – Maximum number of events to return. Defaults to 10.

  • timeout – Maximum time to wait for events in seconds. Defaults to 30.

  • close_on_eose – Whether to close the subscription after EOSE. Defaults to True.

Returns:

A list of up to limit events that match the filters, or an empty list if none found.

Note

Times out after timeout seconds if no matching events are found.

async get_event(filters: Filters, timeout: int = 30, close_on_eose: bool = True) Event | None[source]

Get a single event matching the filters or None if not found.

async send_event(event: Event)[source]

Publish an event to this relay.

decrypt_message(event: Event) DecryptedMessage | None[source]
async send_message(message: str | dict, recipient_pubkey: str) Event[source]
async receive_message(author_pubkey: str, timestamp: int | None = None, timeout: int = 30) DecryptedMessage | None[source]

Wait for and return the next direct message from the specified author.

async send_receive_message(message: str | dict, recipient_pubkey: str, timeout: int = 3) DecryptedMessage | None[source]
async event_listener(filters: Filters, callback: Callable[[Event], None], event_cache: ExpiringDict, lock: Lock)[source]

Continuously listen for events matching filters and call the callback for each one.

async direct_message_listener(filters: Filters, callback: Callable[[Event, str], None], event_cache: ExpiringDict, lock: Lock)[source]

Listen for direct messages and call the callback with decrypted content.