Source code for agentstr.nostr_rag

import json
from typing import Literal

from pynostr.event import Event
from pydantic import BaseModel

from agentstr.logger import get_logger
from agentstr.nostr_client import NostrClient

try:
    from langchain_community.embeddings import FakeEmbeddings
    from langchain_core.documents import Document
    from langchain_core.messages import HumanMessage
    from langchain_core.vectorstores import InMemoryVectorStore
    from langchain_openai import ChatOpenAI
    langchain_installed = True
except ImportError:
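    # LangChain is optional: bind the names to string placeholders so the
    # type annotations below still resolve when it isn't installed.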
    FakeEmbeddings = "FakeEmbeddings"
    InMemoryVectorStore = "InMemoryVectorStore"
    Document = "Document"
    HumanMessage = "HumanMessage"
    ChatOpenAI = "ChatOpenAI"
    langchain_installed = False

logger = get_logger(__name__)


class Author(BaseModel):
    pubkey: str
    name: str | None = None

class NostrRAG:
    """Retrieval-Augmented Generation (RAG) system for Nostr events.

    This class fetches Nostr events, builds a vector store knowledge base,
    and enables semantic search and question answering over the indexed content.

    Examples
    --------
    Simple question answering over recent posts::

        import asyncio
        from langchain_openai import ChatOpenAI
        from agentstr import NostrRAG

        relays = ["wss://relay.damus.io"]

        rag = NostrRAG(relays=relays,
                       llm=ChatOpenAI(model_name="gpt-3.5-turbo"))

        async def main():
            answer = await rag.query(question="What's new with Bitcoin?", limit=8)
            print(answer)

        asyncio.run(main())

    Full runnable script: `rag.py <https://github.com/agentstr/agentstr-sdk/tree/main/examples/rag.py>`_
    """
    def __init__(self,
                 nostr_client: NostrClient | None = None,
                 vector_store=None,
                 relays: list[str] | None = None,
                 private_key: str | None = None,
                 nwc_str: str | None = None,
                 embeddings=None,
                 llm=None,
                 llm_model_name=None,
                 llm_base_url=None,
                 llm_api_key=None,
                 known_authors: list[Author] | None = None):
        """Initialize the NostrRAG system.

        Args:
            nostr_client: An existing NostrClient instance (optional).
            vector_store: An existing vector store instance (optional).
            relays: List of Nostr relay URLs (if no client provided).
            private_key: Nostr private key in 'nsec' format (if no client provided).
            nwc_str: Nostr Wallet Connect string for payments (optional).
            embeddings: Embedding model for vectorizing documents (defaults to FakeEmbeddings with size 256).
            llm: Language model (optional).
            llm_model_name: Name of the language model to use (optional).
            llm_base_url: Base URL for the language model (optional).
            llm_api_key: API key for the language model (optional).
            known_authors: Authors that can be resolved by name for author-based queries (optional).

        Raises:
            ImportError: If LangChain is not installed.
            ValueError: If neither ``llm`` nor ``llm_model_name`` is provided.
        """
        if not langchain_installed:
            logger.error("Langchain not found. Please install it to use NostrRAG. `pip install agentstr-sdk[rag]`")
            raise ImportError("Langchain not found. Please install it to use NostrRAG. `pip install agentstr-sdk[rag]`")
        self.nostr_client = nostr_client or NostrClient(relays=relays, private_key=private_key, nwc_str=nwc_str)
        self.embeddings = embeddings or FakeEmbeddings(size=256)
        self.vector_store = vector_store or InMemoryVectorStore(self.embeddings)
        self.known_authors = known_authors or []
        self.known_authors_name_to_pubkey = {author.name: author.pubkey for author in self.known_authors}
        if llm is None and llm_model_name is None:
            raise ValueError("llm or llm_model_name must be provided")
        self.llm = llm or ChatOpenAI(model_name=llm_model_name, base_url=llm_base_url, api_key=llm_api_key, temperature=0)
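    # Construction sketch. The relay URL is real; the nsec/npub values and the
    # model name are placeholders for illustration, not verified identifiers:
    #
    #     rag = NostrRAG(
    #         relays=["wss://relay.damus.io"],
    #         private_key="nsec...",
    #         llm_model_name="gpt-4o-mini",
    #         known_authors=[Author(pubkey="npub1...", name="Lyn Alden")],
    #     )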
    async def _select_author(self, question: str) -> tuple[str | None, str | None]:
        """Select the most relevant known author for the given question.

        Args:
            question: The question to find a relevant user in

        Returns:
            A ``(name, pubkey)`` tuple, or ``(None, None)`` if no known user matches.
        """
        template = """
        You are a user selector for Nostr. Given a question, suggest the relevant user mentioned in the question.
        You must select from the list of known users.
        Return ONLY the users in a JSON array format, like: ["Lyn Alden"] or ["Saifedean Ammous"]
        Only respond with 1 user. If no user is mentioned, return an empty array.

        Question: {question}

        Known users: {users}
        """
        prompt = template.format(question=question, users=json.dumps([author.name for author in self.known_authors]))
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        try:
            authors = json.loads(response.content)
            if len(authors) == 0:
                return None, None
            elif authors[0] in self.known_authors_name_to_pubkey:
                return authors[0], self.known_authors_name_to_pubkey[authors[0]]
            else:
                logger.warning(f"Selected author not found in known authors: {authors[0]}")
                return None, None
        except json.JSONDecodeError:
            # If the response isn't valid JSON, give up on author selection
            logger.warning(f"Failed to parse author selection response. Response: {response.content}")
            return None, None

    async def _select_hashtags(self, question: str, previous_hashtags: list[str] | None = None) -> list[str]:
        """Select relevant hashtags for the given question.

        Args:
            question: The user's question
            previous_hashtags: Previously used hashtags for this conversation

        Returns:
            List of relevant hashtags
        """
        template = """
        You are a hashtag selector for Nostr. Given a question, suggest relevant hashtags that would help find relevant content.
        Return ONLY the hashtags in a JSON array format, like: ["#hashtag1", "#hashtag2"]
        Use at most 5 hashtags.

        Question: {question}

        Previous hashtags: {history}
        """
        history = json.dumps(previous_hashtags or [])
        prompt = template.format(question=question, history=history)
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        try:
            hashtags = json.loads(response.content)
            return hashtags
        except json.JSONDecodeError:
            # If the response isn't valid JSON, try to extract hashtags
            text = response.content
            hashtags = []
            # Find hashtags in the text
            for word in text.split():
                if word.startswith("#"):
                    hashtags.append(word)
            return hashtags[:5]  # Return at most 5 hashtags

    def _process_event(self, event: Event) -> Document:
        """Process a Nostr event into a LangChain Document.

        Args:
            event: A Nostr event.

        Returns:
            Document: A LangChain Document with the event's content and ID.
        """
        content = event.content
        metadata = event.to_dict()
        metadata.pop("content")
        return Document(page_content=content, id=event.id, metadata=metadata)
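    # Illustration of the JSON-decode fallback in _select_hashtags
    # (the LLM output below is hypothetical, not a recorded response):
    #
    #     response.content == "Sure, try these: #bitcoin #lightning"
    #     # json.loads raises JSONDecodeError, so the word scan recovers
    #     # ["#bitcoin", "#lightning"] and truncates to at most 5 tags.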
    async def build_knowledge_base(self, question: str, limit: int = 10, query_type: Literal["hashtags", "authors"] = "hashtags") -> list[Event]:
        """Build a knowledge base from Nostr events relevant to the question.

        Args:
            question: The user's question to guide hashtag or author selection
            limit: Maximum number of posts to retrieve
            query_type: Type of query to use (hashtags or authors)

        Returns:
            List of retrieved events

        Raises:
            ValueError: If ``query_type`` is not ``"hashtags"`` or ``"authors"``.
        """
        if query_type == "hashtags":
            # Select relevant hashtags for the question
            hashtags = await self._select_hashtags(question)
            hashtags = [hashtag.lstrip("#") for hashtag in hashtags]
            logger.info(f"Selected hashtags: {hashtags}")

            # Fetch events matching the hashtags
            events = await self.nostr_client.read_posts_by_tag(tags=hashtags, limit=limit)
        elif query_type == "authors":
            name, pubkey = await self._select_author(question)
            if pubkey is None:
                return []
            events = await self.nostr_client.read_posts_by_author(pubkey=pubkey, limit=limit)
            for event in events:
                event.content = f"Posted by {name}:\n\n{event.content}"
        else:
            raise ValueError(f"Invalid query type: {query_type}")

        # Process events into documents and index their text in the vector store
        documents = [self._process_event(event) for event in events]
        await self.vector_store.aadd_texts([doc.page_content for doc in documents])
        return events
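    # Usage sketch (assumes a configured `rag` instance as in the constructor
    # sketch above, called from inside an async function):
    #
    #     events = await rag.build_knowledge_base(
    #         "What's new with Bitcoin?", limit=10, query_type="hashtags"
    #     )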
    async def retrieve(self, question: str, limit: int = 5, query_type: Literal["hashtags", "authors"] = "hashtags") -> list[Document]:
        """Retrieve relevant documents from the knowledge base.

        Args:
            question: The user's question
            limit: Maximum number of documents to retrieve
            query_type: Type of query to use (hashtags or authors)

        Returns:
            List of retrieved documents
        """
        await self.build_knowledge_base(question, limit=limit, query_type=query_type)
        return await self.vector_store.asimilarity_search(question, k=limit)
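    # Usage sketch: retrieve() rebuilds the knowledge base for the question,
    # then runs a similarity search over the indexed posts:
    #
    #     docs = await rag.retrieve("What's new with Bitcoin?", limit=5)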
    async def query(self, question: str, limit: int = 5, query_type: Literal["hashtags", "authors"] = "hashtags") -> str:
        """Ask a question using the knowledge base.

        Args:
            question: The user's question
            limit: Number of documents to retrieve for context
            query_type: Type of query to use (hashtags or authors)

        Returns:
            The generated response
        """
        # Get relevant documents
        relevant_docs = await self.retrieve(question, limit, query_type)

        # Generate response using the LLM
        template = """
        You are an expert assistant. Answer the following question based on the provided context.

        Question: {question}

        Context: {context}

        Answer:"""
        prompt = template.format(
            question=question,
            context="\n\n".join([doc.page_content for doc in relevant_docs]),
        )
        logger.info(f"Using prompt:\n{prompt}")
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        return response.content
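
# Author-mode usage sketch. The pubkey and model name are placeholders, and
# author-based queries only resolve names listed in `known_authors`:
#
#     rag = NostrRAG(
#         relays=["wss://relay.damus.io"],
#         llm_model_name="gpt-4o-mini",
#         known_authors=[Author(pubkey="npub1...", name="Lyn Alden")],
#     )
#     answer = await rag.query(
#         "What has Lyn Alden posted lately?", limit=5, query_type="authors"
#     )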