PydanticAI Agent System Tutorial: A Comprehensive Guide
Introduction
PydanticAI is a Python agent framework designed for building production-grade applications with Generative AI. It offers a streamlined approach compared to more abstract frameworks like LangChain, making it easier to develop, deploy, and maintain agent-based systems.
This tutorial will guide you through setting up PydanticAI, creating basic agents, and implementing more complex agent systems, along with best practices for debugging and monitoring.
1. Setting Up Your Environment
There are two options for setting up a virtual environment: Conda and venv. Unless you know you are going to use only Python, we suggest Conda, since it can also manage non-Python languages and dependencies.
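If you choose Conda, a minimal setup looks something like this (the environment name and Python version are illustrative):

# Create and activate a Conda environment
conda create -n pydantic_env python=3.11
conda activate pydantic_env

The pip commands below work the same way inside an activated Conda environment.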
Alternatively, you can install PydanticAI and its dependencies in a Python venv:
# Create a virtual environment (recommended)
python -m venv pydantic_env
source pydantic_env/bin/activate  # On Windows: pydantic_env\Scripts\activate
Pip
pip install pydantic-ai pydantic
You can also install client libraries for the LLM providers you plan to use:
pip install ollama openai
Sample Test Code
from pydantic_ai import Agent

# Create an agent using the local DeepSeek-R1 model from Ollama
agent = Agent(
    'ollama:deepseek-r1',  # Use the Ollama model name format
    system_prompt='Be concise, reply with one sentence.',
    model_kwargs={
        # Additional parameters for the Ollama API
        'base_url': 'http://localhost:11434',  # Default Ollama API URL
        'temperature': 0.7,
    }
)

# Run a simple query
result = agent.run_sync('Where does "hello world" come from?')
print(result.data)
"""
Expected output similar to:
The first known use of "hello, world" was in a 1974 textbook about the C programming language.
"""

# You can also try a more complex query
code_query = agent.run_sync('Write a Python function to calculate the Fibonacci sequence')
print("\nCode generation example:")
print(code_query.data)
For local development with Ollama, you'll need to have Ollama installed and a compatible model like DeepSeek R1:
# Download and install Ollama from https://ollama.ai/
# Then pull the DeepSeek R1 model
ollama pull deepseek-r1:7b
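To confirm that Ollama is running and the model downloaded correctly, you can list the local models and send a quick one-off prompt (the exact output will vary):

# Verify the model is available and responding
ollama list
ollama run deepseek-r1:7b "Reply with one word: ready"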
2. Core Concepts
Before diving into code, let's understand the key PydanticAI concepts:
Agents: Autonomous entities that use LLMs to make decisions and perform actions
Tools: Functions that agents can call to interact with external systems
Pydantic Models: Used to define structured inputs and outputs for tools
Prompts: Templates that guide the LLM's behavior within the agent
PydanticAI uses Pydantic's data validation to ensure type safety and proper documentation of tools, making it easier for LLMs to understand and use them correctly.
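As a small illustration of that point, here is a minimal input model for a hypothetical weather-lookup tool; the class, fields, and descriptions are examples only, but they show the kind of schema and documentation an LLM sees when deciding how to call a tool:

from pydantic import BaseModel, Field
from typing import Optional

class CityWeatherQuery(BaseModel):
    """Hypothetical structured input for a weather-lookup tool."""
    city: str = Field(..., description="Name of the city to look up")
    units: str = Field("celsius", description="Either 'celsius' or 'fahrenheit'")
    day_offset: Optional[int] = Field(None, description="Days from today (0 = today)")

# Validation happens at construction time, so bad arguments fail fast
query = CityWeatherQuery(city="Paris")
print(query.model_dump())  # {'city': 'Paris', 'units': 'celsius', 'day_offset': None}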
3. Creating Your First Agent
Let's create a simple agent that can respond to user queries:
from pydantic_ai import Agent
from pydantic import BaseModel
import logging

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("pydantic_agent")

# Define our agent
class SimpleAgent(Agent):
    """A simple agent that can respond to user queries."""

    def get_system_prompt(self):
        return """You are a helpful assistant that provides information about AI and programming.
        Answer questions concisely and accurately."""

# Initialize the agent
agent = SimpleAgent(
    model="openai/gpt-3.5-turbo",  # For OpenAI
    # Alternatively for local Ollama model:
    # model="ollama/deepseek-r1:7b",
)

# Run the agent
response = agent.run("What is an intelligent software agent?")
print(response)
4. Tool Creation and Integration
Tools extend an agent's capabilities. Let's create some tools and integrate them:
from pydantic_ai import Agent, tool
from pydantic import BaseModel
from typing import List, Optional
import datetime
import logging

# Reuse the logger configured in the previous section
logger = logging.getLogger("pydantic_agent")

# Define input/output models for our tools
class WeatherRequest(BaseModel):
    location: str
    date: Optional[datetime.date] = None

class WeatherResponse(BaseModel):
    temperature: float
    conditions: str
    humidity: Optional[float] = None

class SearchRequest(BaseModel):
    query: str
    max_results: int = 5

class SearchResult(BaseModel):
    title: str
    url: str
    snippet: str

class SearchResponse(BaseModel):
    results: List[SearchResult]
    total_found: int

# Create tool functions
@tool
def get_weather(request: WeatherRequest) -> WeatherResponse:
    """
    Get current weather information for a specific location.

    Args:
        request: Contains the location and optional date for weather information

    Returns:
        Weather data including temperature and conditions
    """
    # In a real implementation, you would call a weather API here
    logger.info(f"Getting weather for {request.location}")
    return WeatherResponse(
        temperature=72.5,
        conditions="Sunny",
        humidity=45.0
    )

@tool
def search_information(request: SearchRequest) -> SearchResponse:
    """
    Search for information on a specific topic.

    Args:
        request: Contains the search query and maximum number of results to return

    Returns:
        A list of search results with titles, URLs, and snippets
    """
    # In a real implementation, you would call a search API
    logger.info(f"Searching for: {request.query} (max: {request.max_results})")
    return SearchResponse(
        results=[
            SearchResult(
                title="Example search result",
                url="https://example.com/result1",
                snippet="This is an example search result snippet"
            )
        ],
        total_found=1
    )

# Create an agent with tools
class AssistantAgent(Agent):
    """An assistant agent with access to tools."""

    def get_system_prompt(self):
        return """You are a helpful assistant with access to tools.
        Use the appropriate tool when needed to answer questions accurately."""

    # Register tools with the agent
    tools = [get_weather, search_information]

# Initialize and use the agent
assistant = AssistantAgent(model="openai/gpt-3.5-turbo")
response = assistant.run("What's the weather like in New York?")
print(response)
5. Advanced Agent Patterns
Now let's implement more advanced patterns like memory and multi-step reasoning:
from pydantic_ai import Agent, tool, AgentExecutor
from pydantic import BaseModel
from typing import List, Dict, Any
import logging

# Enhanced logging for debugging
logger = logging.getLogger("advanced_agent")
handler = logging.FileHandler("agent_debug.log")
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

# Memory model
class ConversationMemory(BaseModel):
    history: List[Dict[str, str]] = []

    def add_interaction(self, user_message: str, agent_response: str):
        self.history.append({"user": user_message, "agent": agent_response})
        logger.debug(f"Added to memory: User: {user_message[:50]}... Agent: {agent_response[:50]}...")

    def get_recent_history(self, limit: int = 5) -> List[Dict[str, str]]:
        return self.history[-limit:] if self.history else []

# ReAct pattern implementation
class ReActAgent(Agent):
    """An agent that uses the ReAct framework to interleave reasoning and action."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.memory = ConversationMemory()

    def get_system_prompt(self):
        recent_history = self.memory.get_recent_history()
        history_text = "\n".join([
            f"User: {interaction['user']}\nAgent: {interaction['agent']}"
            for interaction in recent_history
        ])
        return f"""You are an assistant that carefully thinks through problems step by step.
        When faced with a complex task, break it down into smaller steps and solve each one.

        Recent conversation history:
        {history_text}

        Follow this format when using tools:
        Thought: I need to figure out what to do
        Action: Choose which tool to use
        Observation: Note the result
        ... (repeat as needed)
        Answer: Provide the final answer to the user
        """

    # Register tools and track execution
    def run(self, user_input: str) -> str:
        logger.info(f"Processing user input: {user_input}")
        try:
            # Execute the agent with detailed logging
            response = super().run(user_input)
            # Store interaction in memory
            self.memory.add_interaction(user_input, response)
            return response
        except Exception as e:
            logger.error(f"Error in agent execution: {str(e)}", exc_info=True)
            return f"I encountered an error: {str(e)}"

# Define tools for our ReAct agent
@tool
def calculate(expression: str) -> float:
    """
    Calculate the result of a mathematical expression.

    Args:
        expression: A string containing a mathematical expression

    Returns:
        The calculated result
    """
    logger.debug(f"Calculating: {expression}")
    try:
        # Warning: eval can be dangerous in production, use a safer alternative
        result = eval(expression)
        logger.debug(f"Calculation result: {result}")
        return result
    except Exception as e:
        logger.error(f"Calculation error: {str(e)}")
        raise ValueError(f"Error calculating expression: {str(e)}")

# Initialize and use the ReAct agent
react_agent = ReActAgent(
    model="ollama/deepseek-r1:7b",
    temperature=0.2,  # Lower temperature for more deterministic responses
    max_tokens=1000
)

# Example multi-step reasoning task
response = react_agent.run(
    "If I have 5 apples and give 2 to my friend, then buy 3 more, how many do I have in total?"
)
print(response)
6. Debugging Strategies
Effective debugging is crucial for agent development. Here's a comprehensive approach:
from pydantic_ai import Agent, tool
from pydantic import BaseModel
import logging
import json
import time
from typing import Any, Dict, List, Optional

# Create a custom logger for agent debugging
class AgentDebugLogger:
    def __init__(self, log_file="agent_debug.log", console_level=logging.INFO, file_level=logging.DEBUG):
        self.logger = logging.getLogger("agent_debugger")
        self.logger.setLevel(logging.DEBUG)

        # Clear existing handlers to avoid duplication
        self.logger.handlers = []

        # Console handler
        console = logging.StreamHandler()
        console.setLevel(console_level)
        console.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
        self.logger.addHandler(console)

        # File handler for detailed logs
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(file_level)
        file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
        self.logger.addHandler(file_handler)

    def log_tool_call(self, tool_name: str, inputs: Dict[str, Any], outputs: Any, duration: float):
        """Log details of a tool call"""
        self.logger.debug(f"TOOL CALL: {tool_name}")
        self.logger.debug(f"INPUTS: {json.dumps(inputs, default=str)}")
        self.logger.debug(f"OUTPUTS: {json.dumps(outputs, default=str)}")
        self.logger.debug(f"DURATION: {duration:.4f}s")

    def log_llm_call(self, prompt: str, response: str, duration: float):
        """Log details of an LLM call"""
        self.logger.debug("LLM CALL:")
        self.logger.debug(f"PROMPT: {prompt[:200]}... (truncated)")
        self.logger.debug(f"RESPONSE: {response[:200]}... (truncated)")
        self.logger.debug(f"DURATION: {duration:.4f}s")

        # Save full prompt and response to files for detailed analysis
        timestamp = int(time.time())
        with open(f"debug_prompt_{timestamp}.txt", "w") as f:
            f.write(prompt)
        with open(f"debug_response_{timestamp}.txt", "w") as f:
            f.write(response)

    def log_error(self, error_type: str, message: str, details: Optional[Dict] = None):
        """Log error information"""
        self.logger.error(f"ERROR - {error_type}: {message}")
        if details:
            self.logger.error(f"DETAILS: {json.dumps(details, default=str)}")

# Create a debuggable agent wrapper
class DebuggableAgent(Agent):
    """A wrapper for agents that adds detailed debugging capabilities"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.debug_logger = AgentDebugLogger()
        self.execution_stats = {
            "tool_calls": 0,
            "llm_calls": 0,
            "errors": 0,
            "total_duration": 0
        }

    # Override the run method to add debugging
    def run(self, user_input: str) -> str:
        self.debug_logger.logger.info(f"AGENT RUN: Processing user input: {user_input}")
        start_time = time.time()

        try:
            # Capture the original tool calling mechanism
            original_tool_call = self._call_tool

            # Define a wrapper to log tool calls
            def logged_tool_call(tool_name, **kwargs):
                self.execution_stats["tool_calls"] += 1
                tool_start = time.time()
                try:
                    result = original_tool_call(tool_name, **kwargs)
                    tool_duration = time.time() - tool_start
                    self.debug_logger.log_tool_call(tool_name, kwargs, result, tool_duration)
                    return result
                except Exception as e:
                    self.execution_stats["errors"] += 1
                    self.debug_logger.log_error("Tool Error", f"Error in {tool_name}: {str(e)}", kwargs)
                    raise

            # Replace the tool call method with our logged version
            self._call_tool = logged_tool_call

            # Run the agent
            response = super().run(user_input)

            # Restore original method
            self._call_tool = original_tool_call

            # Update statistics
            self.execution_stats["total_duration"] = time.time() - start_time

            # Log completion
            self.debug_logger.logger.info(
                f"AGENT COMPLETE: Duration={self.execution_stats['total_duration']:.2f}s, "
                f"Tool calls={self.execution_stats['tool_calls']}, "
                f"Errors={self.execution_stats['errors']}"
            )
            return response
        except Exception as e:
            self.execution_stats["errors"] += 1
            self.debug_logger.log_error("Agent Error", str(e))
            return f"I encountered an error: {str(e)}"

# Example usage of the debuggable agent
debuggable_agent = DebuggableAgent(
    model="ollama/deepseek-r1:7b",
    temperature=0.2
)

# Add tools to the agent
@tool
def fetch_data(url: str) -> Dict[str, Any]:
    """Fetch data from a URL"""
    # Simulation of fetching data
    if "error" in url:
        raise ValueError("Failed to fetch data: connection error")
    return {"status": "success", "data": {"sample": "value"}}

debuggable_agent.tools = [fetch_data]

# Test the debuggable agent
response = debuggable_agent.run("Can you fetch data from https://example.com/api/data?")
print(f"Agent response: {response}")

# Intentionally trigger an error for demonstration
error_response = debuggable_agent.run("Can you fetch data from https://error.example.com?")
print(f"Error response: {error_response}")

# Print execution stats
print(f"Execution stats: {json.dumps(debuggable_agent.execution_stats, indent=2)}")
7. Performance Optimization
When working with local models like DeepSeek R1 through Ollama, performance optimization becomes crucial:
from pydantic_ai import Agent, tool
import time
import psutil
import logging
import json
from typing import Dict, Any

# Set up performance monitoring logger
performance_logger = logging.getLogger("agent_performance")
performance_logger.setLevel(logging.INFO)
file_handler = logging.FileHandler("performance_metrics.log")
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
performance_logger.addHandler(file_handler)

class PerformanceMonitor:
    """Monitor and optimize agent performance"""

    def __init__(self):
        self.metrics = {
            "start_time": None,
            "end_time": None,
            "cpu_percent": [],
            "memory_percent": [],
            "response_times": []
        }

    def start_monitoring(self):
        """Start monitoring system resources"""
        self.metrics["start_time"] = time.time()
        self._monitor_resources_thread = self._start_resource_monitoring()

    def _start_resource_monitoring(self):
        """Start a thread to monitor CPU and memory usage"""
        import threading

        def monitor():
            while time.time() - self.metrics["start_time"] < 120:  # Monitor for 2 minutes max
                self.metrics["cpu_percent"].append(psutil.cpu_percent())
                self.metrics["memory_percent"].append(psutil.virtual_memory().percent)
                time.sleep(0.5)

        thread = threading.Thread(target=monitor)
        thread.daemon = True
        thread.start()
        return thread

    def record_response_time(self, duration: float):
        """Record an individual response time"""
        self.metrics["response_times"].append(duration)

    def stop_monitoring(self):
        """Stop monitoring and save metrics"""
        self.metrics["end_time"] = time.time()
        total_duration = self.metrics["end_time"] - self.metrics["start_time"]

        # Summarize the metrics
        summary = {
            "total_duration": total_duration,
            "avg_response_time": sum(self.metrics["response_times"]) / len(self.metrics["response_times"]) if self.metrics["response_times"] else 0,
            "max_response_time": max(self.metrics["response_times"]) if self.metrics["response_times"] else 0,
            "avg_cpu_percent": sum(self.metrics["cpu_percent"]) / len(self.metrics["cpu_percent"]) if self.metrics["cpu_percent"] else 0,
            "max_cpu_percent": max(self.metrics["cpu_percent"]) if self.metrics["cpu_percent"] else 0,
            "avg_memory_percent": sum(self.metrics["memory_percent"]) / len(self.metrics["memory_percent"]) if self.metrics["memory_percent"] else 0,
            "request_count": len(self.metrics["response_times"])
        }

        # Log the summary
        performance_logger.info(f"Performance Summary: {json.dumps(summary, indent=2)}")
        return summary

class OptimizedAgent(Agent):
    """An agent optimized for performance with local LLMs"""

    def __init__(self, *args, **kwargs):
        # Set optimal parameters for local LLM usage
        kwargs.setdefault("temperature", 0.2)  # Lower temperature for faster, more consistent responses
        kwargs.setdefault("max_tokens", 500)   # Limit token generation for speed
        super().__init__(*args, **kwargs)
        self.performance_monitor = PerformanceMonitor()
        self.performance_monitor.start_monitoring()

    def get_system_prompt(self):
        # Optimized prompt - shorter prompts process faster
        return """You are a helpful, efficient assistant. Keep responses concise and focused.
        Answer questions directly or use tools when appropriate."""

    def run(self, user_input: str) -> str:
        start_time = time.time()
        try:
            response = super().run(user_input)
            duration = time.time() - start_time
            self.performance_monitor.record_response_time(duration)

            # Log performance for this request
            performance_logger.info(f"Request processed in {duration:.2f}s")
            return response
        except Exception as e:
            duration = time.time() - start_time
            self.performance_monitor.record_response_time(duration)
            performance_logger.error(f"Error processing request: {str(e)}, duration: {duration:.2f}s")
            return f"Error: {str(e)}"

    def finalize(self):
        """Clean up and display final performance metrics"""
        summary = self.performance_monitor.stop_monitoring()
        print(f"Agent Performance Summary:")
        print(f"  Total duration: {summary['total_duration']:.2f}s")
        print(f"  Average response time: {summary['avg_response_time']:.2f}s")
        print(f"  Requests processed: {summary['request_count']}")
        print(f"  Avg CPU usage: {summary['avg_cpu_percent']:.1f}%")
        print(f"  Avg memory usage: {summary['avg_memory_percent']:.1f}%")

# Example usage with DeepSeek through Ollama
optimized_agent = OptimizedAgent(
    model="ollama/deepseek-r1:7b",
    # Additional optimization parameters for Ollama
    model_kwargs={
        "num_ctx": 2048,       # Smaller context window for faster processing
        "repeat_penalty": 1.1  # Slight penalty to reduce token repetition
    }
)

# Run a series of queries to test performance
test_queries = [
    "What is the capital of France?",
    "Explain how a binary search algorithm works",
    "What are the main features of Python?",
    "Define what an intelligent software agent is",
    "What's the difference between supervised and unsupervised learning?"
]

for query in test_queries:
    print(f"\nQuery: {query}")
    response = optimized_agent.run(query)
    print(f"Response: {response[:100]}...")  # Truncated for brevity

# Get final performance metrics
optimized_agent.finalize()
8. Real-World Example: Building a Code Agent
Now let's build a more complex, practical agent for a real-world use case: a code assistant built with PydanticAI that can help with Python programming tasks:
from pydantic_ai import Agent, tool
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
import logging
import os
import json
import subprocess
import time

# Advanced logging configuration
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("code_agent.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("code_agent")

# Tool input/output models
class CodeGenerationRequest(BaseModel):
    """Request for generating code based on a description"""
    description: str = Field(..., description="Description of what the code should do")
    language: str = Field("python", description="Programming language to use")
    libraries: List[str] = Field(default_factory=list, description="List of libraries to use")

class CodeGenerationResponse(BaseModel):
    """Response containing the generated code"""
    code: str = Field(..., description="The generated code")
    explanation: str = Field(..., description="Explanation of how the code works")

class CodeExecutionRequest(BaseModel):
    """Request for executing code"""
    code: str = Field(..., description="Code to execute")
    input_data: Optional[Dict[str, Any]] = Field(None, description="Input data for code execution")

class CodeExecutionResponse(BaseModel):
    """Response from code execution"""
    output: str = Field(..., description="Output from code execution")
    success: bool = Field(..., description="Whether execution was successful")
    error: Optional[str] = Field(None, description="Error message if execution failed")
    execution_time: float = Field(..., description="Time taken to execute the code in seconds")

class CodeReviewRequest(BaseModel):
    """Request for reviewing code"""
    code: str = Field(..., description="Code to review")
    focus_areas: Optional[List[str]] = Field(None, description="Areas to focus on in the review")

class CodeReviewResponse(BaseModel):
    """Response from code review"""
    feedback: List[str] = Field(..., description="List of feedback points")
    suggestions: List[str] = Field(..., description="List of suggestions for improvement")
    overall_rating: int = Field(..., description="Rating from 1-10")

# Define tools for our code agent
@tool
def generate_code(request: CodeGenerationRequest) -> CodeGenerationResponse:
    """
    Generate code based on a description.

    Args:
        request: Contains the description of what the code should do, language, and libraries

    Returns:
        Generated code and explanation
    """
    logger.info(f"Generating {request.language} code for: {request.description}")
    start_time = time.time()

    # In a real implementation, you might use a code-specialized model here
    # This is a simplified example
    code = f"# {request.language} code for: {request.description}\n"

    if request.language.lower() == "python":
        # Add imports
        if request.libraries:
            for lib in request.libraries:
                code += f"import {lib}\n"
            code += "\n"

        # Add a simple function template
        code += f"def main():\n"
        code += f"    # TODO: Implement {request.description}\n"
        code += f"    pass\n\n"
        code += f"if __name__ == '__main__':\n"
        code += f"    main()\n"

    duration = time.time() - start_time
    logger.debug(f"Code generation took {duration:.2f}s")

    return CodeGenerationResponse(
        code=code,
        explanation=f"This is a basic template for {request.description}. It includes the necessary imports and a main function structure."
    )

@tool
def execute_code(request: CodeExecutionRequest) -> CodeExecutionResponse:
    """
    Execute Python code in a safe environment and return the output.

    Args:
        request: Contains the code to execute and optional input data

    Returns:
        Execution output, success status, any error messages, and execution time
    """
    logger.info("Executing code")
    logger.debug(f"Code to execute: {request.code[:100]}...")

    # Create a temporary Python file
    temp_file = "temp_execution.py"
    with open(temp_file, "w") as f:
        f.write(request.code)

    start_time = time.time()
    try:
        # Execute in a controlled environment
        # Note: In production, you should use a proper sandbox
        result = subprocess.run(
            ["python", temp_file],
            capture_output=True,
            text=True,
            timeout=10  # Limit execution time
        )
        success = result.returncode == 0
        output = result.stdout if success else result.stderr
        error = None if success else result.stderr
    except subprocess.TimeoutExpired:
        success = False
        output = "Execution timed out after 10 seconds"
        error = "Timeout error"
    except Exception as e:
        success = False
        output = f"Error executing code: {str(e)}"
        error = str(e)
    finally:
        # Clean up
        if os.path.exists(temp_file):
            os.remove(temp_file)

    execution_time = time.time() - start_time
    logger.info(f"Code execution completed in {execution_time:.2f}s with success={success}")

    return CodeExecutionResponse(
        output=output,
        success=success,
        error=error,
        execution_time=execution_time
    )

@tool
def review_code(request: CodeReviewRequest) -> CodeReviewResponse:
    """
    Review code and provide feedback for improvement.

    Args:
        request: Contains the code to review and optional focus areas

    Returns:
        Feedback, suggestions, and overall rating
    """
    logger.info("Reviewing code")

    # In a real implementation, you would analyze the code
    # This is a simplified example
    feedback = ["Code structure is clean", "Good use of comments"]
    suggestions = ["Consider adding error handling", "Add type hints for better readability"]
    rating = 7

    focus_areas = request.focus_areas or []
    if "performance" in focus_areas:
        feedback.append("No obvious performance issues detected")
        suggestions.append("Consider using list comprehensions for better performance")
    if "security" in focus_areas:
        feedback.append("No obvious security vulnerabilities detected")
        suggestions.append("Validate user inputs to prevent injection attacks")

    return CodeReviewResponse(
        feedback=feedback,
        suggestions=suggestions,
        overall_rating=rating
    )

# Create our code assistant agent
class CodeAssistantAgent(Agent):
    """An agent specialized in code generation, execution, and review"""

    def get_system_prompt(self):
        return """You are a Python programming assistant with expertise in software development.
        You can help users by generating code, executing it, and providing code reviews.

        When helping users with code:
        1. Understand their requirements clearly
        2. Generate appropriate code using the generate_code tool
        3. Test the code using the execute_code tool if needed
        4. Provide explanations and improvements using the review_code tool

        Keep your explanations clear and focused on the user's needs.
        """

    # Register tools
    tools = [generate_code, execute_code, review_code]

# Initialize the agent
code_assistant = CodeAssistantAgent(
    model="ollama/deepseek-r1:7b",
    temperature=0.3,
    max_tokens=1000
)

# Example usage
test_requests = [
    "Can you write a Python function to calculate the Fibonacci sequence?",
    "I need a script to read a CSV file and calculate the average of a column",
    "Can you help me understand how decorators work in Python?"
]

for request in test_requests:
    print(f"\n\n===== PROCESSING REQUEST: {request} =====\n")
    response = code_assistant.run(request)
    print(response)
Troubleshooting Model Connections
A common source of problems is the connection between the agent and its model backend. The script below enables verbose logging and tests several model configurations in turn:

# Troubleshooting model connection issues
from pydantic_ai import Agent
import logging

# Enable verbose logging
logging.basicConfig(level=logging.DEBUG)

def test_model_connection(model_name):
    """Test connection to an LLM model"""
    try:
        agent = Agent(model=model_name)
        test_response = agent.run("Hello, are you working correctly?")
        print(f"Response from {model_name}: {test_response}")
        return True
    except Exception as e:
        print(f"Error connecting to {model_name}: {str(e)}")
        return False

# Test different model configurations
models_to_test = [
    "openai/gpt-3.5-turbo",      # OpenAI model
    "ollama/deepseek-r1:7b",     # Local Ollama model
    "anthropic/claude-3-sonnet"  # Anthropic model
]

for model in models_to_test:
    print(f"\nTesting connection to {model}...")
    success = test_model_connection(model)
    print(f"Connection test {'succeeded' if success else 'failed'}")
Another frequent issue is running out of model context. The helper below tracks an estimated token count for the conversation and summarizes older messages before the limit is reached:

from pydantic_ai import Agent
from typing import List, Dict, Any
import logging
import json

# Set up logging
logging.basicConfig(level=logging.INFO)
context_logger = logging.getLogger("context_management")

class ContextManager:
    """Helper class to manage LLM context size and debug context issues"""

    def __init__(self, max_tokens=4000):
        self.max_tokens = max_tokens
        self.conversation_history: List[Dict[str, Any]] = []
        self.estimated_token_count = 0

    def add_message(self, role: str, content: str):
        """Add a message to the conversation history"""
        # Rough estimation: 1 token ≈ 4 characters
        estimated_tokens = len(content) // 4
        message = {"role": role, "content": content}
        self.conversation_history.append(message)
        self.estimated_token_count += estimated_tokens
        context_logger.info(f"Added {role} message with ~{estimated_tokens} tokens")
        context_logger.info(f"Current estimated token count: {self.estimated_token_count}/{self.max_tokens}")

        # If we're approaching the limit, summarize older messages
        if self.estimated_token_count > self.max_tokens * 0.8:
            context_logger.warning(f"Approaching token limit, summarizing conversation")
            self._summarize_history()

    def _summarize_history(self):
        """Summarize older conversation messages to reduce token count"""
        if len(self.conversation_history) <= 4:
            return  # Keep at least the last few messages

        # Extract messages to summarize (all but the last 4)
        to_summarize = self.conversation_history[:-4]

        # Create a summary message
        summary_content = f"[Summary of {len(to_summarize)} previous messages]"
        summary_message = {"role": "system", "content": summary_content}

        # Replace old messages with summary
        self.conversation_history = [summary_message] + self.conversation_history[-4:]

        # Recalculate token count
        self.estimated_token_count = sum(len(msg["content"]) // 4 for msg in self.conversation_history)
        context_logger.info(f"Summarized conversation history. New token count: {self.estimated_token_count}")

    def get_formatted_history(self) -> str:
        """Get the conversation history formatted for the LLM prompt"""
        formatted = ""
        for msg in self.conversation_history:
            formatted += f"{msg['role'].upper()}: {msg['content']}\n\n"
        return formatted

    def debug_token_usage(self):
        """Print detailed token usage for debugging"""
        context_logger.debug("==== Token Usage Breakdown ====")
        for i, msg in enumerate(self.conversation_history):
            tokens = len(msg["content"]) // 4
            percent = (tokens / self.estimated_token_count) * 100 if self.estimated_token_count > 0 else 0
            context_logger.debug(f"Message {i+1} ({msg['role']}): ~{tokens} tokens ({percent:.1f}%)")
        context_logger.debug(f"Total: ~{self.estimated_token_count} tokens")

# Example usage with an agent
class ContextAwareAgent(Agent):
    """An agent that is aware of context limitations"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.context_manager = ContextManager(max_tokens=4000)

    def get_system_prompt(self):
        # Include conversation history in the prompt
        history = self.context_manager.get_formatted_history()
        return f"""You are a helpful assistant with memory of our conversation.

        Previous conversation:
        {history}

        Respond to the user's latest message.
        """

    def run(self, user_input: str) -> str:
        # Add the user message to history
        self.context_manager.add_message("user", user_input)
        # Run the agent
        response = super().run(user_input)
        # Add the agent response to history
        self.context_manager.add_message("assistant", response)
        # Debug token usage
        self.context_manager.debug_token_usage()
        return response

# Test the context-aware agent
context_agent = ContextAwareAgent(model="ollama/deepseek-r1:7b")

# Simulate a conversation
responses = []
for i in range(5):
    user_message = f"This is test message {i+1}. Tell me something interesting about Python programming."
    print(f"\n>> USER: {user_message}")
    response = context_agent.run(user_message)
    responses.append(response)
    print(f"<< ASSISTANT: {response[:100]}...")
Deploying in Production
For production deployment of your PydanticAI agents, consider these best practices:
from pydantic_ai import Agent, tool
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
import logging
import logging.config  # Required for dictConfig below
import time
import uuid
import json
from typing import Dict, Any, List, Optional

# Set up production-ready logging
logging.config.dictConfig({
    'version': 1,
    'formatters': {
        'default': {
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'INFO',
            'formatter': 'default',
        },
        'file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'level': 'DEBUG',
            'formatter': 'default',
            'filename': 'production.log',
            'maxBytes': 10485760,  # 10MB
            'backupCount': 5,
        }
    },
    'root': {
        'level': 'INFO',
        'handlers': ['console', 'file']
    }
})
logger = logging.getLogger("production_agent")

# Define API models
class AgentRequest(BaseModel):
    message: str
    session_id: Optional[str] = None

class AgentResponse(BaseModel):
    response: str
    session_id: str
    request_id: str
    processing_time: float

# Create a production agent with monitoring and error handling
class ProductionAgent(Agent):
    """Production-ready agent with error handling and monitoring"""

    def __init__(self, *args, fallback_response="I'm sorry, I'm having trouble processing that request right now. Please try again in a moment.", **kwargs):
        super().__init__(*args, **kwargs)
        self.fallback_response = fallback_response
        self.sessions: Dict[str, Dict[str, Any]] = {}

    def get_system_prompt(self, session_id=None):
        # Get session-specific prompt if available
        if session_id and session_id in self.sessions:
            return self.sessions[session_id].get("system_prompt", self._default_system_prompt())
        return self._default_system_prompt()

    def _default_system_prompt(self):
        return """You are a helpful, accurate, and professional assistant.
        Provide clear and concise responses to user inquiries."""

    def run_with_monitoring(self, user_input: str, session_id: Optional[str] = None) -> Dict[str, Any]:
        """Run the agent with comprehensive monitoring and error handling"""
        start_time = time.time()
        request_id = str(uuid.uuid4())

        # Create session if it doesn't exist
        if session_id and session_id not in self.sessions:
            self.sessions[session_id] = {"created_at": time.time(), "requests": 0}

        # Update session stats
        if session_id:
            self.sessions[session_id]["requests"] = self.sessions[session_id].get("requests", 0) + 1
            self.sessions[session_id]["last_activity"] = time.time()

        logger.info(f"Processing request {request_id} for session {session_id}")

        try:
            # Run the agent with the session-specific prompt
            # Bind session_id through the captured original method to avoid recursion
            original_get_system_prompt = self.get_system_prompt
            self.get_system_prompt = lambda: original_get_system_prompt(session_id)
            response = super().run(user_input)
            # Restore original method
            self.get_system_prompt = original_get_system_prompt

            processing_time = time.time() - start_time
            logger.info(f"Request {request_id} processed in {processing_time:.2f}s")

            # Log performance metrics (could be sent to monitoring system)
            if processing_time > 5.0:
                logger.warning(f"Slow response detected: {processing_time:.2f}s for request {request_id}")

            return {
                "response": response,
                "session_id": session_id or request_id,
                "request_id": request_id,
                "processing_time": processing_time,
                "status": "success"
            }
        except Exception as e:
            processing_time = time.time() - start_time
            logger.error(f"Error processing request {request_id}: {str(e)}", exc_info=True)
            return {
                "response": self.fallback_response,
                "session_id": session_id or request_id,
                "request_id": request_id,
                "processing_time": processing_time,
                "status": "error",
                "error": str(e)
            }

# Initialize FastAPI
app = FastAPI(title="PydanticAI Agent API")

# Initialize the production agent
production_agent = ProductionAgent(
    model="ollama/deepseek-r1:7b",
    # For high-volume production, consider using a cloud model with higher throughput
    # model="openai/gpt-4-turbo",
    temperature=0.2,
    max_tokens=1000
)

# Add tools for the production agent
@tool
def search_knowledge_base(query: str) -> List[Dict[str, str]]:
    """Search the knowledge base for information"""
    # In production, this would connect to your actual knowledge base
    return [{"title": "Sample result", "content": "This is a sample search result"}]

production_agent.tools = [search_knowledge_base]

# API endpoints
@app.post("/api/agent", response_model=AgentResponse)
async def query_agent(request: AgentRequest, background_tasks: BackgroundTasks):
    """Process an agent request synchronously"""
    result = production_agent.run_with_monitoring(request.message, request.session_id)

    # Schedule cleanup in the background
    background_tasks.add_task(cleanup_old_sessions)

    if result["status"] == "error":
        logger.error(f"Error in request {result['request_id']}: {result.get('error')}")

    return AgentResponse(
        response=result["response"],
        session_id=result["session_id"],
        request_id=result["request_id"],
        processing_time=result["processing_time"]
    )

async def cleanup_old_sessions():
    """Clean up inactive sessions"""
    current_time = time.time()
    inactive_threshold = 3600  # 1 hour
    inactive_sessions = [
        session_id for session_id, data in production_agent.sessions.items()
        if current_time - data.get("last_activity", 0) > inactive_threshold
    ]
    for session_id in inactive_sessions:
        del production_agent.sessions[session_id]
        logger.info(f"Cleaned up inactive session {session_id}")

# Health check endpoint
@app.get("/health")
async def health_check():
    """Health check endpoint for monitoring"""
    return {"status": "healthy", "timestamp": time.time()}
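Assuming the application above is saved as main.py (a placeholder module name), it can be served with uvicorn; the host, port, and worker count here are illustrative:

# Install the web server dependencies and run the API
pip install fastapi "uvicorn[standard]"
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 2

In production you would typically run this behind a reverse proxy and point your monitoring at the /health endpoint defined above.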
Conclusion
PydanticAI offers a well-structured, efficient framework for building AI agents that balances simplicity with power. Key takeaways from this tutorial:
Start Simple: Begin with basic agents and gradually add complexity as needed.
Documentation is Key: Well-documented tools with clear Pydantic models help LLMs understand how to use them correctly.
Debugging is Essential: Implement comprehensive logging and monitoring from the start to identify and fix issues quickly.
Performance Matters: Optimize your agents for local LLM deployment when running models like DeepSeek R1 with Ollama.
Test Thoroughly: Verify agent behavior across a range of inputs before deploying to production.
Remember that agent development is an iterative process. Start with a minimal viable agent, test it thoroughly, and progressively enhance its capabilities based on real-world performance and user feedback.
As you build more complex agent systems, consider implementing the ReAct framework to interleave reasoning and action, and explore multi-agent architectures for tasks that benefit from specialized capabilities working together.