AI Agents in LangGraph

A summary of the DeepLearning.AI course “AI Agents in LangGraph”.

Introduction

Agent tooling improved significantly over the last year. First, function-calling LLMs have made tool use considerably more predictable and consistent. Second, particular tools such as search have been tailored for agentic use. When you enter a query into a search engine, it returns a list of links that you have to follow to find the answer. An agent, however, needs the answer itself rather than links to websites, and it needs its results in a predictable format. This is what agentic search offers.

When multiple agents collaborate on a bigger task, this is known as an agentic workflow. Such a workflow draws on several design patterns. Planning means thinking through the steps of the procedure. Tool use is a crucial aspect: the LLM needs to know which tools are available and how to use them, such as our search tool. Reflection refers to iteratively improving a result, which may involve several LLMs critiquing the output and suggesting edits. Multi-agent communication means that each agent plays a role and has a distinct prompt that describes that role. Finally, memory keeps track of progress and results across multiple steps. Some of these capabilities are provided by the LLM itself, such as function calling for tool use, but many of them are implemented outside the LLM by the framework in which it operates.

LangGraph supports Cyclic Graphs.

Three pieces of prior work illustrate this. ReAct is an early paper on AI agents. The Self-Refine paper shows how an LLM can iteratively improve its own output. The AlphaCodium paper demonstrates how to create a coding agent using flow engineering. What they have in common is that they all operate in a cyclic manner. To support these structures, LangChain created LangGraph. In this course, you'll first create an agent from scratch using only an LLM and Python.

Then we'll learn about LangGraph components by constructing the same agent with them. Because search tools are an essential component of many agent applications, you will learn about agentic search capabilities and how to use them.

Two more capabilities are useful for creating agents. The first is the ability to receive human input, which lets you guide an agent at critical points. The second is persistence: the capacity to save the current state so that you can return to it later. This is valuable both for debugging agents and for running them in production.

Building an Agent from Scratch

Agents can do relatively complex tasks. A basic agent is not difficult to construct. We must distinguish between jobs assigned to the LLM and those managed by the code surrounding the LLM (known as runtime).

ReAct pattern

The ReAct pattern will serve as the foundation for the agent that we will create from scratch. ReAct stands for Reasoning + Acting. The LLM first reasons about what to do and then decides on an action to take. That action is carried out in an environment, and an observation is returned. Given that observation, the LLM repeats the cycle: it reasons again and chooses another action, continuing until it decides that it is done.

import openai
import re
import httpx
import os
from dotenv import load_dotenv

_ = load_dotenv()
from openai import OpenAI

client = OpenAI()

chat_completion = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hello world"}]
)

print(chat_completion.choices[0].message.content) # Hello! How can I assist you today?

class Agent:
    def __init__(self, system=""):
        self.system = system
        self.messages = []
        if self.system:
            self.messages.append({"role": "system", "content": system})

    def __call__(self, message):
        self.messages.append({"role": "user", "content": message})
        result = self.execute()
        self.messages.append({"role": "assistant", "content": result})
        return result

    def execute(self):
        completion = client.chat.completions.create(
            model="gpt-4o",
            temperature=0,
            messages=self.messages)
        return completion.choices[0].message.content

prompt = """
You run in a loop of Thought, Action, PAUSE, Observation.
At the end of the loop you output an Answer
Use Thought to describe your thoughts about the question you have been asked.
Use Action to run one of the actions available to you - then return PAUSE.
Observation will be the result of running those actions.

Your available actions are:

calculate:
e.g. calculate: 4 * 7 / 3
Runs a calculation and returns the number - uses Python so be sure to use floating point syntax if necessary

average_dog_weight:
e.g. average_dog_weight: Collie
returns average weight of a dog when given the breed

Example session:

Question: How much does a Bulldog weigh?
Thought: I should look the dogs weight using average_dog_weight
Action: average_dog_weight: Bulldog
PAUSE

You will be called again with this:

Observation: A Bulldog weights 51 lbs

You then output:

Answer: A bulldog weights 51 lbs
""".strip()

def calculate(what):
    return eval(what)

def average_dog_weight(name):
    if name in "Scottish Terrier":
        return("Scottish Terriers average 20 lbs")
    elif name in "Border Collie":
        return("a Border Collies average weight is 37 lbs")
    elif name in "Toy Poodle":
        return("a toy poodles average weight is 7 lbs")
    else:
        return("An average dog weights 50 lbs")

known_actions = {
    "calculate": calculate,
    "average_dog_weight": average_dog_weight
}

abot = Agent(prompt)

result = abot("How much does a toy poodle weigh?")
print(result)
result = average_dog_weight("Toy Poodle")

print(result) # a toy poodles average weight is 7 lbs

next_prompt = "Observation: {}".format(result)

print(abot(next_prompt)) # Answer: A Toy Poodle weighs 7 lbs.

print(abot.messages)
abot = Agent(prompt) # Reinitializing to clear memory
question = """I have 2 dogs, a border collie and a scottish terrier. \
What is their combined weight"""

print(abot(question))
# Thought: To find the combined weight of a Border Collie and a Scottish
# Terrier, I need to first find the average weight of each breed and then add those weights together.
# I'll start by finding the average weight of a Border Collie.\n\nAction: average_dog_weight: Border Collie\nPAUSE

next_prompt = "Observation: {}".format(average_dog_weight("Border Collie"))
print(next_prompt) # Observation: a Border Collies average weight is 37 lbs
print(abot(next_prompt))
# Thought: Now that I know a Border Collie's average weight is 37 lbs, I need to find the average weight of a
# Scottish Terrier to calculate the combined weight. \n\nAction: average_dog_weight: Scottish Terrier\nPAUSE

next_prompt = "Observation: {}".format(average_dog_weight("Scottish Terrier"))
print(next_prompt) # Observation: Scottish Terriers average 20 lbs
print(abot(next_prompt))
# Thought: With the average weight of a Border Collie being 37 lbs and a
# Scottish Terrier being 20 lbs, I can now calculate their combined weight.\n\nAction: calculate: 37+20\nPAUSE

next_prompt = "Observation: {}".format(eval("37 + 20"))
print(next_prompt) # Observation: 57
print(abot(next_prompt)) # Answer: The combined weight of a Border Collie and a Scottish Terrier is 57 lbs.
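The calculate action above passes model-generated text straight to eval, which will run any Python expression it is given. The following is a minimal sketch of a more restrictive alternative, assuming the agent only ever needs basic arithmetic. The name safe_calculate and the set of allowed operators are choices made for this illustration and are not part of the course code.

```python
import ast
import operator

# Arithmetic operators this sketch allows (an assumption: extend as needed).
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_calculate(expression):
    """Evaluate a purely arithmetic expression and reject everything else."""

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if (isinstance(node, ast.Constant)
                and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        # Names, calls, attribute access and so on all end up here.
        raise ValueError(f"Unsupported expression: {expression!r}")

    return _eval(ast.parse(expression.strip(), mode="eval"))


print(safe_calculate("37 + 20"))  # 57
```

Swapping this function in for calculate in known_actions would leave the rest of the agent unchanged, at the cost of rejecting anything beyond plain arithmetic.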

Add Loop

To automate this, we wrap the steps above in a loop. We write a regular expression that looks for the action string. This lets us parse the LLM's response and decide whether to run an action or accept the final answer.

action_re = re.compile(r'^Action: (\w+): (.*)$')   # Python regular expression to select the action

def query(question, max_turns=5):
    i = 0
    bot = Agent(prompt)
    next_prompt = question
    while i < max_turns:
        i += 1
        result = bot(next_prompt)
        print(result)
        actions = [
            action_re.match(a)
            for a in result.split('\n')
            if action_re.match(a)
        ]
        if actions:
            # There is an action to run
            action, action_input = actions[0].groups()
            if action not in known_actions:
                raise Exception("Unknown action: {}: {}".format(action, action_input))
            print(" -- running {} {}".format(action, action_input))
            observation = known_actions[action](action_input)
            print("Observation:", observation)
            next_prompt = "Observation: {}".format(observation)
        else:
            return

question = """I have 2 dogs, a border collie and a scottish terrier. \
What is their combined weight"""

query(question)
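To make the parsing step concrete without calling an LLM, here is a small sketch that applies the same action pattern to a hand-written response. The sample text and the helper name parse_action are made up for this illustration.

```python
import re

# Same pattern as in the text: "Action: <tool name>: <tool input>" on its own line.
action_re = re.compile(r"^Action: (\w+): (.*)$")

# A hand-written stand-in for an LLM response (not real model output).
sample_response = (
    "Thought: I should look up the weight of a Border Collie.\n"
    "Action: average_dog_weight: Border Collie\n"
    "PAUSE"
)


def parse_action(response):
    """Return (action, action_input) for the first Action line, or None."""
    for line in response.split("\n"):
        match = action_re.match(line)
        if match:
            return match.groups()
    return None


print(parse_action(sample_response))                          # ('average_dog_weight', 'Border Collie')
print(parse_action("Answer: A Border Collie weighs 37 lbs"))  # None
```

A response with no Action line yields None, which is the case where the loop stops and the last response is taken as the final answer.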

LangGraph Components

We received a user message and had the system message, so we called the LLM with both. The LLM produced a thought and an action. Based on that output, we decided either to return or to call a tool. We placed all of this into a loop. We have two tools: calculate() and average_dog_weight(). If we called a tool, we received an observation. We then fed that observation back to the LLM and repeated the loop.

Let’s break this down into LangChain components.

LangChain Prompts

Prompt templates allow reusable prompts.

from langchain.prompts import PromptTemplate
prompt_template = PromptTemplate.from_template("Tell me a {adjective} joke about {content}.")

There are also prompts for agents available in the hub:

from langchain import hub
prompt = hub.pull("hwchase17/react")

LangChain Tools

# get a tool from the library
from langchain_community.tools.tavily_search import TavilySearchResults
tool = TavilySearchResults(max_results=2)

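# inside the agent's __init__ (the full class appears later):
# index the tools by name and bind them to the model
self.tools = {t.name: t for t in tools}
self.model = model.bind_tools([tool])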

New Structures in LangGraph

LangGraph allows you to describe and orchestrate the control flow. In particular, it allows you to create cyclic graphs. It also includes built-in persistence, which is useful for holding many conversations at the same time or for recalling past iterations and steps. Persistence also enables useful human-in-the-loop capabilities.

LangGraph is a LangChain extension that supports graphs. It allows for highly controlled "flows". Graphs are used to describe and visualize both single-agent and multi-agent flows.

Graphs consist of nodes that represent agents or functions, edges that connect nodes, and conditional edges that represent decisions.
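The three building blocks can be pictured with a toy runner in plain Python. This is only a sketch of the idea and not how LangGraph is implemented. The class MiniGraph and the END sentinel are stand-ins invented for this example, with method names chosen to resemble the LangGraph calls used later in this article.

```python
END = "__end__"  # stand-in sentinel for this sketch


class MiniGraph:
    """Toy graph: nodes update a state dict, edges choose the next node."""

    def __init__(self):
        self.nodes = {}        # name -> function(state) -> partial state update
        self.edges = {}        # name -> next node name
        self.conditional = {}  # name -> (decision function, {decision: next node})
        self.entry = None

    def add_node(self, name, fn):
        self.nodes[name] = fn

    def add_edge(self, src, dst):
        self.edges[src] = dst

    def add_conditional_edges(self, src, decide, mapping):
        self.conditional[src] = (decide, mapping)

    def set_entry_point(self, name):
        self.entry = name

    def run(self, state, max_steps=20):
        current, steps = self.entry, 0
        while current != END:
            if steps >= max_steps:
                raise RuntimeError("max_steps exceeded")
            state = {**state, **self.nodes[current](state)}
            if current in self.conditional:
                decide, mapping = self.conditional[current]
                current = mapping[decide(state)]
            else:
                current = self.edges.get(current, END)
            steps += 1
        return state


# A cycle: "check" decides whether to loop through "work" again or stop.
graph = MiniGraph()
graph.add_node("check", lambda s: {"visits": s["visits"] + 1})
graph.add_node("work", lambda s: {"count": s["count"] + 1})
graph.add_conditional_edges("check", lambda s: s["count"] < 3, {True: "work", False: END})
graph.add_edge("work", "check")
graph.set_entry_point("check")

final_state = graph.run({"count": 0, "visits": 0})
print(final_state)  # {'count': 3, 'visits': 4}
```

The edge from "work" back to "check" is what makes the graph cyclic, and the conditional edge is the only place where the loop can exit.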

Data/State

State is one of the most important concepts in LangGraph: it is the data that is tracked over time as the graph runs. This agent state is accessible to all parts of the graph (every node and edge), is local to the graph, and can be stored in a persistence layer.

Simple State

A simple agent state consists of a single sequence of BaseMessage objects. Annotating the field with operator.add means that new messages are appended to the existing ones rather than overwriting them.

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]

Complex State

A more complex state has several fields. Fields without an annotation are overwritten whenever a new value is pushed to them, whereas intermediate_steps is annotated with operator.add, so new updates are appended to the existing list.

class AgentState(TypedDict):
    input: str
    chat_history: list[BaseMessage]
    agent_outcome: Union[AgentAction, AgentFinish, None]
    intermediate_steps: Annotated[list[tuple[AgentAction, str]], operator.add]
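The difference between overwritten and appended fields can be sketched with the standard library alone. The helper apply_update and the DemoState type are hypothetical and exist only for this illustration. They imitate the behaviour described above and are not LangGraph's actual update code.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class DemoState(TypedDict):
    input: str                                          # no reducer: overwritten
    intermediate_steps: Annotated[list, operator.add]   # reducer: appended


def apply_update(state_type, state, update):
    """Merge `update` into `state`, using a field's reducer when it has one."""
    hints = get_type_hints(state_type, include_extras=True)
    new_state = dict(state)
    for key, value in update.items():
        # Annotated[...] types carry their extra arguments in __metadata__.
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata and key in new_state:
            reducer = metadata[0]
            new_state[key] = reducer(new_state[key], value)
        else:
            new_state[key] = value
    return new_state


before = {"input": "first", "intermediate_steps": ["a"]}
after = apply_update(DemoState, before, {"input": "second", "intermediate_steps": ["b"]})
print(after)  # {'input': 'second', 'intermediate_steps': ['a', 'b']}
```

Because operator.add on two lists concatenates them, a node only has to return the new items and the existing history is preserved.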

Code


The graph has three parts: an llm node (call_openai), a conditional edge (exists_action), and an action node (take_action).

State

class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

Let’s implement:

Tavily is a search engine that we will use as a tool.

from dotenv import load_dotenv
_ = load_dotenv()

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

tool = TavilySearchResults(max_results=4) #increased number of results
print(type(tool))
print(tool.name) # <class 'langchain_community.tools.tavily_search.tool.TavilySearchResults'> tavily_search_results_json

class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

Note: in take_action below, some logic was added to cover the case where the LLM returns a non-existent tool name. Even with function calling, LLMs can still occasionally hallucinate. All the code does in that case is instruct the LLM to try again, which is one advantage of an agentic setup.

class Agent:

    def __init__(self, model, tools, system=""):
        self.system = system  # system message
        graph = StateGraph(AgentState)  # empty graph to start from
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges(
            "llm",               # node where the edge starts
            self.exists_action,  # function that tells us where to go next
            {True: "action", False: END}  # maps the function's result to the next node
        )
        graph.add_edge("action", "llm")  # regular edge
        graph.set_entry_point("llm")
        self.graph = graph.compile()
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            if t['name'] not in self.tools:  # check for bad tool name from LLM
                print("\n ....bad tool name....")
                result = "bad tool name, retry"  # instruct LLM to retry if bad
            else:
                result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""


model = ChatOpenAI(model="gpt-3.5-turbo") #reduce inference cost
abot = Agent(model, [tool], system=prompt)

messages = [HumanMessage(content="What is the weather in sf?")]
result = abot.graph.invoke({"messages": messages})

print(result)
print(result['messages'][-1].content)
messages = [HumanMessage(content="What is the weather in SF and LA?")]
result = abot.graph.invoke({"messages": messages})
print(result['messages'][-1].content)
# Note, the query was modified to produce more consistent results. 
# Results may vary per run and over time as search information and models change.

query = "Who won the super bowl in 2024? In what state is the winning team headquarters located? \
What is the GDP of that state? Answer each question."

messages = [HumanMessage(content=query)]

model = ChatOpenAI(model="gpt-4o") # requires more advanced model
abot = Agent(model, [tool], system=prompt)
result = abot.graph.invoke({"messages": messages})
print(result['messages'][-1].content)

Agentic Search Tools

Let's see how agentic search differs from conventional search and how to use it.

Why search tool

The agent receives the prompt and decides to call the search tool. The collected information is then returned to the agent.

Inside a search tool

A search tool does several things internally. If the agent decides to send a query to the search tool, the first step is to understand the question and, where appropriate, divide it into sub-questions. This step is critical because it allows the tool to handle more complex queries. The search tool then has to choose the best source for each sub-query by selecting from numerous integrations. The job does not end with locating the right source: the tool also needs to extract only the information relevant to the sub-query. A basic implementation of this involves chunking the source and running a quick vector search to retrieve the top-k chunks.
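The chunk-and-retrieve step can be sketched as follows. This is a toy version under strong assumptions: word counts stand in for real embeddings, chunks are fixed-size word windows, and every function name and the sample document are invented for this illustration. A production search tool would typically use a learned embedding model and a vector index instead.

```python
import math
import re
from collections import Counter


def chunk_text(text, chunk_size=12):
    """Split text into consecutive chunks of `chunk_size` words."""
    words = text.split()
    return [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]


def embed(text):
    """Toy 'embedding': a bag-of-words count vector."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[token] * b[token] for token in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def top_k_chunks(query, text, k=2, chunk_size=12):
    """Return the k chunks most similar to the query, best first."""
    query_vector = embed(query)
    chunks = chunk_text(text, chunk_size)
    ranked = sorted(chunks, key=lambda c: cosine(query_vector, embed(c)), reverse=True)
    return ranked[:k]


document = (
    "The museum opens at nine and closes at five every day. "
    "Current weather in San Francisco is foggy with a high of 61 degrees. "
    "Parking near the pier costs four dollars per hour on weekends."
)
best = top_k_chunks("weather in San Francisco", document, k=2, chunk_size=11)
print(best[0])
```

Only the highest-ranked chunks would be handed back to the agent, which keeps the observation short and focused on the sub-query.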

# libraries
from dotenv import load_dotenv
import os
from tavily import TavilyClient

# load environment variables from .env file
_ = load_dotenv()

# connect
client = TavilyClient(api_key=os.environ.get("TAVILY_API_KEY"))

# run search
result = client.search("What is in Nvidia's new Blackwell GPU?",
include_answer=True)

# print the answer
result["answer"]
'The new Nvidia Blackwell GPU architecture, specifically the Blackwell B200 GPU, is designed to power the next generation of AI supercomputers. It is expected to deliver up to 20 petaflops of compute performance and potentially more than quadruple the performance of its predecessor. Additionally, the Blackwell platform is aimed at enabling organizations to build and run real-time generative AI on trillion-parameter large language models at a significantly lower cost and energy consumption compared to its predecessor.'

Regular Search

# choose location (try to change to your own city!)

city = "San Francisco"

query = f"""
what is the current weather in {city}?
Should I travel there today?
"weather.com"
"""

Note: The search has been updated to return expected results in the event of an exception. High levels of student traffic can result in rate limit exceptions.

import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
import re

ddg = DDGS()

def search(query, max_results=6):
    try:
        results = ddg.text(query, max_results=max_results)
        return [i["href"] for i in results]
    except Exception:
        print("returning previous results due to exception reaching ddg.")
        results = [  # cover case where DDG rate limits due to high deeplearning.ai volume
            "https://weather.com/weather/today/l/USCA0987:1:US",
            "https://weather.com/weather/hourbyhour/l/54f9d8baac32496f6b5497b4bf7a277c3e2e6cc5625de69680e6169e7e38e9a8",
        ]
        return results


for i in search(query):
    print(i)
# https://weather.com/weather/tenday/l/San Francisco CA USCA0987:1:US
# https://weather.com/storms/hurricane/news/2024-07-05-hurricane-beryl-forecast-mexico-texas

def scrape_weather_info(url):
    """Scrape content from the given URL"""
    if not url:
        return "Weather information could not be found."

    # fetch data
    headers = {'User-Agent': 'Mozilla/5.0'}
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        return "Failed to retrieve the webpage."

    # parse result
    soup = BeautifulSoup(response.text, 'html.parser')
    return soup

# use DuckDuckGo to find websites and take the first result
url = search(query)[0]

# scrape first website
soup = scrape_weather_info(url)

print(f"Website: {url}\n\n") # Website: https://weather.com/weather/today/l/San+Francisco+CA+USCA0987:1:US
print(str(soup.body)[:50000]) # limit long outputs

recents Specialty Forecasts San Francisco, CA Today's Forecast for San Francisco, CA Morning Afternoon Evening Overnight Weather Today in San Francisco, CA 5:58 am 8:32 pm Don't Miss Hourly Forecast Now 1 pm 2 pm 3 pm 4 pm Outside That's Not What Was Expected Daily Forecast Today Sun 14 Mon 15 Tue 16 Wed 17 Radar We Love Our Critters Summer Skin Essentials Home, Garage & Garden The Real Toll Of Extreme Heat Health News For You Weather in your inbox Your local forecast, plus daily trivia, stunning photos and our meteorologists’ top picks. All in one place, every weekday morning. By signing up, you're opting in to receive the Morning Brief email newsletter. To manage your data, visit Data Rights . Terms of Use | Privacy Policy Stay Safe Fact Versus Fiction Air Quality Index Air quality is considered satisfactory, and air pollution poses little or no risk. Health & Activities Seasonal Allergies and Pollen Count Forecast Grass pollen is low in your area Cold & Flu Forecast Flu risk is low in your area We recognize our responsibility to use data and technology for good. We may use or share your data with our data vendors. Take control of your data. © The Weather Company, LLC 2024

Agentic Search

# run search
result = client.search(query, max_results=1)

# print first result
data = result["results"][0]["content"]

print(data) # Weather.com brings you the most accurate monthly weather forecast for San Francisco, ... 13 69 ° 54 ° 14. 72 ° 54 ° 15 ... July: 66 ° 54 ° 0: August: 68 ° 56 ...

import json
from pygments import highlight, lexers, formatters

# parse JSON
# (this step assumes the returned content is a JSON-like string, for example
# from a weather API source; plain page text will not parse)
parsed_json = json.loads(data.replace("'", '"'))

# pretty print JSON with syntax highlighting
formatted_json = json.dumps(parsed_json, indent=4)
colorful_json = highlight(formatted_json,
                          lexers.JsonLexer(),
                          formatters.TerminalFormatter())

print(colorful_json)

Persistence and Streaming

Agents typically work on longer-running tasks. Persistence and streaming are two capabilities that matter a great deal for such tasks.

Persistence lets you keep track of an agent's state at a specific point in time. This allows you to return to that state and resume from it in later interactions, which is critical for long-running applications.

Similarly, streaming lets you emit a series of signals describing what is happening at any given moment, so for long-running applications you can see exactly what the agent is doing. We might stream individual messages: the AI message that determines which action to take, and the observation message that holds the result of that action. We might also stream tokens, that is, each token of the LLM's output as it is generated.

A checkpointer saves (checkpoints) the state after and between each node. To add persistence to this agent, we will use SqliteSaver. You can also connect to an external database, such as Redis or Postgres.

A thread configuration is used to keep track of the different threads inside the persistent checkpointer. This allows us to hold many conversations concurrently.
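The role of the thread configuration can be pictured with a toy in-memory checkpointer keyed by thread_id. This is a sketch of the idea only. The class InMemoryCheckpointer and the helper add_message are invented for this illustration and do not reflect how SqliteSaver stores its data.

```python
import copy
from collections import defaultdict


class InMemoryCheckpointer:
    """Toy checkpointer: one list of state snapshots per thread_id."""

    def __init__(self):
        self._history = defaultdict(list)

    def put(self, config, state):
        """Save a copy of `state` as the newest snapshot of the thread."""
        thread_id = config["configurable"]["thread_id"]
        self._history[thread_id].append(copy.deepcopy(state))

    def get(self, config):
        """Return a copy of the newest snapshot of the thread, or None."""
        thread_id = config["configurable"]["thread_id"]
        snapshots = self._history.get(thread_id)
        return copy.deepcopy(snapshots[-1]) if snapshots else None


def add_message(checkpointer, config, message):
    """Load the thread's state, append a message, and checkpoint the result."""
    state = checkpointer.get(config) or {"messages": []}
    state["messages"].append(message)
    checkpointer.put(config, state)
    return state


demo_memory = InMemoryCheckpointer()
thread_1 = {"configurable": {"thread_id": "1"}}
thread_2 = {"configurable": {"thread_id": "2"}}

add_message(demo_memory, thread_1, "What is the weather in sf?")
add_message(demo_memory, thread_1, "What about in la?")
add_message(demo_memory, thread_2, "Which one is warmer?")

print(demo_memory.get(thread_1)["messages"])  # both messages from thread 1
print(demo_memory.get(thread_2)["messages"])  # only the message from thread 2
```

Each thread sees only its own history, which is why a follow-up question asked on a fresh thread has no earlier context to draw on.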

from dotenv import load_dotenv

_ = load_dotenv()

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

tool = TavilySearchResults(max_results=2)

class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

from langgraph.checkpoint.sqlite import SqliteSaver
memory = SqliteSaver.from_conn_string(":memory:")

class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""

model = ChatOpenAI(model="gpt-4o")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

messages = [HumanMessage(content="What is the weather in sf?")]

thread = {"configurable": {"thread_id": "1"}}

for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

# follow-up question on the same thread: the earlier context is available
messages = [HumanMessage(content="What about in la?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

# same question on a different thread: no earlier context is available
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

Streaming Tokens

You can also use async checkpoints and events.

from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

memory = AsyncSqliteSaver.from_conn_string(":memory:")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
# top-level `async for` works in a notebook; in a script, wrap it in an async function
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")

Human in the Loop

In many circumstances, we want to keep a human in the loop to oversee what the agent is doing. This is simple to do with LangGraph.

Note: in recent LangGraph versions, more state information is stored in memory and displayed when you call get_state() or get_state_history().

State is now saved at every state transition, rather than only at an interrupt or at the end. This changes the command output noticeably but adds useful information.

from dotenv import load_dotenv
_ = load_dotenv()

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.checkpoint.sqlite import SqliteSaver

memory = SqliteSaver.from_conn_string(":memory:")

from uuid import uuid4
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, AIMessage

"""
In previous examples we've annotated the `messages` state key
with the default `operator.add` or `+` reducer, which always
appends new messages to the end of the existing messages array.

Now, to support replacing existing messages, we annotate the
`messages` key with a custom reducer function, which replaces
messages with the same `id`, and appends them otherwise.
"""

def reduce_messages(left: list[AnyMessage], right: list[AnyMessage]) -> list[AnyMessage]:
    # assign ids to messages that don't have them
    for message in right:
        if not message.id:
            message.id = str(uuid4())
    # merge the new messages with the existing messages
    merged = left.copy()
    for message in right:
        for i, existing in enumerate(merged):
            # replace any existing messages with the same id
            if existing.id == message.id:
                merged[i] = message
                break
        else:
            # append any new messages to the end
            merged.append(message)
    return merged

class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], reduce_messages]

tool = TavilySearchResults(max_results=2)
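The replace-or-append behaviour of the custom reducer can be tried out without LangChain. In this sketch, Msg is a stand-in dataclass invented for the illustration (real messages are LangChain message objects), and the reducer is repeated with the type annotations removed so that the block runs on its own.

```python
from dataclasses import dataclass
from typing import Optional
from uuid import uuid4


@dataclass
class Msg:
    """Stand-in for a chat message: just content plus an optional id."""
    content: str
    id: Optional[str] = None


def reduce_messages(left, right):
    # assign ids to messages that don't have them
    for message in right:
        if not message.id:
            message.id = str(uuid4())
    # replace messages whose id already exists, append the rest
    merged = left.copy()
    for message in right:
        for i, existing in enumerate(merged):
            if existing.id == message.id:
                merged[i] = message
                break
        else:
            merged.append(message)
    return merged


history = [Msg("weather in LA?", id="m1"), Msg("search: Los Angeles", id="m2")]
history = reduce_messages(history, [Msg("search: Louisiana", id="m2")])  # same id: replaced
history = reduce_messages(history, [Msg("a new message")])               # no id: appended
print([m.content for m in history])
# ['weather in LA?', 'search: Louisiana', 'a new message']
```

Replacing by id is what makes it possible to edit an existing message in the state, which plain operator.add cannot do because it only appends.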

Manual human approval

When we compile the graph, in addition to passing the checkpointer, we also pass the argument interrupt_before=["action"]. This adds an interrupt before the action node is called. The action node is where the tools are called, so this gives us a point for manual approval before any tool runs. Sometimes you may want to interrupt only when a specific tool is invoked.

class Agent:
    def __init__(self, model, tools, system="", checkpointer=None):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(
            checkpointer=checkpointer,
            interrupt_before=["action"]
        )
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        print(state)
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""

model = ChatOpenAI(model="gpt-3.5-turbo")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

messages = [HumanMessage(content="Whats the weather in SF?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

abot.graph.get_state(thread)

# The next node
abot.graph.get_state(thread).next # ('action', )

Continuing after interrupt

To proceed, we call stream with the same thread configuration but pass None as the input. This streams the remaining results, and we receive the tool message once the tool has been invoked.

for event in abot.graph.stream(None, thread):
    for v in event.values():
        print(v)

abot.graph.get_state(thread)

abot.graph.get_state(thread).next

messages = [HumanMessage("Whats the weather in LA?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)
# keep asking for approval for as long as there is a next node to run
while abot.graph.get_state(thread).next:
    print("\n", abot.graph.get_state(thread), "\n")
    _input = input("proceed?")
    if _input != "y":
        print("aborting")
        break
    for event in abot.graph.stream(None, thread):
        for v in event.values():
            print(v)
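The pause-and-resume behaviour used in this section can be pictured with a toy runner. This is a rough sketch under simplifying assumptions: steps run in a fixed order instead of following graph edges, and the class InterruptibleRunner with its stream method and next property is invented here to resemble the calls above. It is not LangGraph's implementation.

```python
class InterruptibleRunner:
    """Toy runner: executes named steps in order and pauses before chosen ones."""

    def __init__(self, steps, interrupt_before=()):
        self.steps = steps  # list of (name, function) pairs
        self.interrupt_before = set(interrupt_before)
        self.state = None
        self.position = 0

    @property
    def next(self):
        """Name of the step that would run next, or None when finished."""
        if self.position < len(self.steps):
            return self.steps[self.position][0]
        return None

    def stream(self, state=None):
        """Start from a new state, or resume after a pause when state is None."""
        resuming = state is None
        if not resuming:
            self.state, self.position = state, 0
        while self.position < len(self.steps):
            name, fn = self.steps[self.position]
            if name in self.interrupt_before and not resuming:
                return  # pause before this step and wait for approval
            resuming = False
            self.state = fn(self.state)
            self.position += 1
            yield name, self.state


steps = [
    ("llm", lambda s: s + ["tool call: search"]),
    ("action", lambda s: s + ["tool result"]),
    ("answer", lambda s: s + ["final answer"]),
]
runner = InterruptibleRunner(steps, interrupt_before=["action"])

first_run = [name for name, _ in runner.stream(["Whats the weather in SF?"])]
print(first_run, runner.next)   # ['llm'] action

resumed = [name for name, _ in runner.stream(None)]
print(resumed, runner.next)     # ['action', 'answer'] None
```

Passing None means "carry on from where you stopped", so the approval step is simply the gap between the two stream calls.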

State Memory

As a graph runs, a snapshot of each state is saved in memory. Each snapshot contains the AgentState along with other useful information, such as the thread and a unique identifier for the snapshot. You can access the snapshots using these identifiers.

StateSnapshot: {AgentState, useful_things}

Several commands access this memory, such as g.get_state(thread). If you pass only the thread ID, without a snapshot identifier, it returns the current state. There is also g.get_state_history(thread), which returns an iterator over all of the state snapshots, from which you can retrieve the unique identifier of each state. Given the thread and that unique identifier (thread_ts), you can, for example, retrieve the first state and use it in an invoke command. This makes that state the current state, that is, the starting point for the rest of the graph run. This is essentially time travel.

g.invoke(None, {thread, thread_ts})
g.stream(None, {thread, thread_ts})
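The idea of snapshots with unique identifiers, and of resuming from an older one, can be sketched with a toy store. SnapshotStore and its methods are invented for this illustration. The integer ids play the role of thread_ts, and returning the history newest first is an assumption made here to match the negative offsets used later in this article.

```python
import copy
import itertools


class SnapshotStore:
    """Toy snapshot log: every saved state gets its own identifier."""

    def __init__(self):
        self._snapshots = {}                 # snapshot id -> state
        self._next_id = itertools.count(1)
        self.current_id = None

    def save(self, state):
        """Store a copy of the state and make it the current snapshot."""
        snapshot_id = next(self._next_id)
        self._snapshots[snapshot_id] = copy.deepcopy(state)
        self.current_id = snapshot_id
        return snapshot_id

    def get_state(self, snapshot_id=None):
        """Return the current state, or a specific snapshot when an id is given."""
        if snapshot_id is None:
            snapshot_id = self.current_id
        return copy.deepcopy(self._snapshots[snapshot_id])

    def get_state_history(self):
        """Iterate over (id, state) pairs, newest first."""
        for snapshot_id in sorted(self._snapshots, reverse=True):
            yield snapshot_id, copy.deepcopy(self._snapshots[snapshot_id])


store = SnapshotStore()
first_id = store.save({"messages": ["Whats the weather in LA?"]})
store.save({"messages": ["Whats the weather in LA?", "search: Los Angeles"]})
store.save({"messages": ["Whats the weather in LA?", "search: Los Angeles", "sunny"]})

# Time travel: load the first snapshot, change course, and save a new branch.
branch = store.get_state(first_id)
branch["messages"].append("search: Louisiana")
branch_id = store.save(branch)

print(store.get_state()["messages"])                               # the new branch
print([snapshot_id for snapshot_id, _ in store.get_state_history()])  # [4, 3, 2, 1]
```

The older snapshots are left untouched, so branching from the past adds a new line of history rather than rewriting the existing one.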

Modify State

Run until the interrupt and then modify the state.

messages = [HumanMessage("Whats the weather in LA?")]
thread = {"configurable": {"thread_id": "3"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

abot.graph.get_state(thread)

current_values = abot.graph.get_state(thread)

current_values.values['messages'][-1]

current_values.values['messages'][-1].tool_calls

# Edit the tool calls on the last message. The edit has no effect until we update the state on the graph.
_id = current_values.values['messages'][-1].tool_calls[0]['id']
current_values.values['messages'][-1].tool_calls = [
    {'name': 'tavily_search_results_json',
     'args': {'query': 'current weather in Louisiana'},
     'id': _id}
]

abot.graph.update_state(thread, current_values.values)

abot.graph.get_state(thread)

for event in abot.graph.stream(None, thread):
    for v in event.values():
        print(v)

Time Travel

states = []
for state in abot.graph.get_state_history(thread):
    print(state)
    print('--')
    states.append(state)

To get the same state as shown in the course video, change the offset below from -1 to -3. This accounts for the initial __start__ state and the first state, both of which are now saved in state memory by the most recent software version.

to_replay = states[-3]

to_replay

for event in abot.graph.stream(None, to_replay.config):
    for k, v in event.items():
        print(v)

Go back in time and edit

to_replay

_id = to_replay.values['messages'][-1].tool_calls[0]['id']
to_replay.values['messages'][-1].tool_calls = [
    {'name': 'tavily_search_results_json',
     'args': {'query': 'current weather in LA, accuweather'},
     'id': _id}
]

branch_state = abot.graph.update_state(to_replay.config, to_replay.values)

for event in abot.graph.stream(None, branch_state):
    for k, v in event.items():
        if k != "__end__":
            print(v)

Add a message to a state at a given time

to_replay
_id = to_replay.values['messages'][-1].tool_calls[0]['id']
state_update = {"messages": [ToolMessage(
    tool_call_id=_id,
    name="tavily_search_results_json",
    content="54 degree celsius",
)]}
branch_and_add = abot.graph.update_state(
    to_replay.config,
    state_update,
    as_node="action")
for event in abot.graph.stream(None, branch_and_add):
    for k, v in event.items():
        print(v)

Extra Practice

Build a small graph

This is a small, simple graph that you may experiment with to gain greater insight into manipulating state memory.

from dotenv import load_dotenv
_ = load_dotenv()

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langgraph.checkpoint.sqlite import SqliteSaver

Create a simple two-node graph with the following state:
- lnode: the last node that ran.
- scratch: a scratchpad location.
- count: a counter that is incremented on each step.

class AgentState(TypedDict):
    lnode: str
    scratch: str
    count: Annotated[int, operator.add]

def node1(state: AgentState):
    print(f"node1, count:{state['count']}")
    return {"lnode": "node_1",
            "count": 1,
            }

def node2(state: AgentState):
    print(f"node2, count:{state['count']}")
    return {"lnode": "node_2",
            "count": 1,
            }

The graph goes N1->N2->N1… but breaks after count reaches 3.

def should_continue(state):
    return state["count"] < 3

builder = StateGraph(AgentState)
builder.add_node("Node1", node1)
builder.add_node("Node2", node2)

builder.add_edge("Node1", "Node2")
builder.add_conditional_edges("Node2",
                              should_continue,
                              {True: "Node1", False: END})
builder.set_entry_point("Node1")

memory = SqliteSaver.from_conn_string(":memory:")
graph = builder.compile(checkpointer=memory)

Now we can set the thread and run:

thread = {"configurable": {"thread_id": str(1)}}
graph.invoke({"count":0, "scratch":"hi"},thread)
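
To see what this run does to count without LangGraph installed, the loop can be simulated in plain Python. This is a sketch under stated assumptions: ToyState mirrors the AgentState above, while reducers_for, apply_update, and run are hypothetical helpers that imitate how an Annotated reducer such as operator.add merges a node's return value into the state.

```python
import operator
from typing import Annotated, TypedDict, get_args, get_origin, get_type_hints


class ToyState(TypedDict):
    lnode: str
    scratch: str
    count: Annotated[int, operator.add]


LNODE = {"Node1": "node_1", "Node2": "node_2"}


def reducers_for(schema):
    """Collect the reducer attached to each Annotated field, if any."""
    found = {}
    for key, hint in get_type_hints(schema, include_extras=True).items():
        if get_origin(hint) is Annotated:
            found[key] = get_args(hint)[1]
    return found


def apply_update(state, update, reducers):
    """Merge a node's return value: reduce annotated keys, overwrite the rest."""
    merged = dict(state)
    for key, value in update.items():
        if key in reducers and key in merged:
            merged[key] = reducers[key](merged[key], value)
        else:
            merged[key] = value
    return merged


def run(state):
    reducers = reducers_for(ToyState)
    trace = []
    node = "Node1"
    while node is not None:
        state = apply_update(state, {"lnode": LNODE[node], "count": 1}, reducers)
        trace.append((node, state["count"]))
        if node == "Node1":
            node = "Node2"  # fixed edge Node1 -> Node2
        else:
            node = "Node1" if state["count"] < 3 else None  # conditional edge
    return state, trace


final_state, trace = run({"count": 0, "scratch": "hi"})
print(trace)
```

In this simulation the condition is only checked after Node2, so the loop stops with count at 4 rather than 3, and scratch is left unchanged because no node writes to it.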

Looking at the current state

Get the current state. Note the values, which hold the AgentState, and note the config along with its thread_ts. You will use these to refer to the snapshots below.

graph.get_state(thread)

View all of the state snapshots in memory. You can use the displayed count value from the agent state to keep track of what you are seeing. Notice that the iterator returns the most recent snapshots first. Also note that the metadata includes a useful step variable that counts the steps taken during graph execution. It is a bit detailed, but you can also see that parent_config is the configuration of the previous node. At initial startup, extra states are inserted into memory to create a parent. Keep this in mind when you branch or time travel below.

Look at state history

for state in graph.get_state_history(thread):
    print(state, "\n")

Store only the configuration in a list. Take note of the sequence of counts on the right. get_state_history returns the most recent snapshots first.

states = []
for state in graph.get_state_history(thread):
    states.append(state.config)
    print(state.config, state.values['count'])

Grab an early state.

states[-3]

This is the state after Node1 has completed for the first time. Note that next is Node2 and count is 1.

graph.get_state(states[-3])

Go Back in Time

Use that state in invoke to travel back in time. Notice that it uses states[-3] as the current state and proceeds to Node2.

graph.invoke(None, states[-3])

Notice the new states are now in state history. Notice the counts on the far right.

thread = {"configurable": {"thread_id": str(1)}}
for state in graph.get_state_history(thread):
    print(state.config, state.values['count'])

The details are below. There is a lot of text, but try to find the node that starts the new branch. Notice that its parent config is not the previous entry in the stack, but the entry from states[-3].

thread = {"configurable": {"thread_id": str(1)}}
for state in graph.get_state_history(thread):
    print(state, "\n")

Modify State

Let's begin with a fresh thread and run the graph once, so that we start from a clean history.

thread2 = {"configurable": {"thread_id": str(2)}}
graph.invoke({"count":0, "scratch":"hi"},thread2)

from IPython.display import Image
Image(graph.get_graph().draw_png())

states2 = []
for state in graph.get_state_history(thread2):
    states2.append(state.config)
    print(state.config, state.values['count'])

Start by grabbing a state.

save_state = graph.get_state(states2[-3])
save_state

Now modify the values. One subtle point: when AgentState was defined, count was annotated with operator.add, which means that written values are added to the current value rather than replacing it. So instead of replacing the current count, -3 is added to it.

save_state.values["count"] = -3
save_state.values["scratch"] = "hello"
save_state

Now update the state. This creates a new entry at the top or the latest entry in memory. This will become the current state.

graph.update_state(thread2,save_state.values)

The current state is at the top; you can match its thread_ts. Notice the parent_config of the new entry: its thread_ts is that of the previous entry.

for i, state in enumerate(graph.get_state_history(thread2)):
    if i >= 3:  # print latest 3
        break
    print(state, '\n')

Try again with as_node

When writing with update_state(), you want to tell the graph logic which node should be treated as the writer. This lets the graph logic locate that node in the graph. After the data is written, the next value is computed by traversing the graph from that node using the new state. In this case, the state we saved was written by Node1, so the graph can compute that the next node is Node2. Note that in some graphs this may involve going through conditional edges. Let's try it out.

graph.update_state(thread2,save_state.values, as_node="Node1")

for i, state in enumerate(graph.get_state_history(thread2)):
    if i >= 3:  # print latest 3
        break
    print(state, '\n')
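
The way the next node follows from as_node can be sketched with two lookup tables for the small graph above. This is an illustration only, not LangGraph's internal logic: END, EDGES, CONDITIONAL, and next_after are names made up for the sketch.

```python
END = "__end__"  # placeholder sentinel for this sketch

# Fixed edges and conditional edges of the two-node graph above.
EDGES = {"Node1": "Node2"}
CONDITIONAL = {"Node2": lambda state: "Node1" if state["count"] < 3 else END}


def next_after(as_node, state):
    """Return the node that would run after as_node wrote state."""
    if as_node in CONDITIONAL:
        # A conditional edge has to be evaluated against the new state.
        return CONDITIONAL[as_node](state)
    return EDGES[as_node]


print(next_after("Node1", {"count": -2}))
print(next_after("Node2", {"count": 1}))
print(next_after("Node2", {"count": 5}))
```

Writing as Node1 always leads to Node2, while writing as Node2 depends on the count in the state that was just written.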

invoke will run from the current state if not given a particular thread_ts. This is now the entry that was just added.

graph.invoke(None,thread2)

Print out the state history and notice the change in the scratch value on the latest entries.

for state in graph.get_state_history(thread2):
    print(state, "\n")

Continue to experiment.

Essay Writer

We will create a tiny version of an AI researcher, specifically an essay writer.

plan: You are an expert writer tasked with writing a high level outline of an essay. Write such an outline for the user provided topic. Give an outline of the essay along with any relevant notes or instructions for the sections.
research_plan: You are a researcher charged with providing information that can be used when writing the following essay. Generate a list of search queries that will gather any relevant information. Only generate 3 queries max.
generate: You are an essay assistant tasked with writing excellent 5-paragraph essays. Generate … if the user provides critique, respond with a revised version of your previous attempts. Utilize information:

from dotenv import load_dotenv
_ = load_dotenv()

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, AIMessage, ChatMessage
memory = SqliteSaver.from_conn_string(":memory:")

class AgentState(TypedDict):
    task: str
    plan: str
    draft: str
    critique: str
    content: List[str]
    revision_number: int
    max_revisions: int

from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

PLAN_PROMPT = """You are an expert writer tasked with writing a high level outline of an essay. \
Write such an outline for the user provided topic. Give an outline of the essay along with any relevant notes \
or instructions for the sections."""


WRITER_PROMPT = """You are an essay assistant tasked with writing excellent 5-paragraph essays. \
Generate the best essay possible for the user's request and the initial outline. \
If the user provides critique, respond with a revised version of your previous attempts. \
Utilize all the information below as needed:
------
{content}"""


REFLECTION_PROMPT = """You are a teacher grading an essay submission. \
Generate critique and recommendations for the user's submission. \
Provide detailed recommendations, including requests for length, depth, style, etc."""


RESEARCH_PLAN_PROMPT = """You are a researcher charged with providing information that can \
be used when writing the following essay. Generate a list of search queries that will gather \
any relevant information. Only generate 3 queries max."""


RESEARCH_CRITIQUE_PROMPT = """You are a researcher charged with providing information that can \
be used when making any requested revisions (as outlined below). \
Generate a list of search queries that will gather any relevant information. Only generate 3 queries max."""


from langchain_core.pydantic_v1 import BaseModel

class Queries(BaseModel):
    queries: List[str]

from tavily import TavilyClient
import os
tavily = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])

def plan_node(state: AgentState):
    messages = [
        SystemMessage(content=PLAN_PROMPT),
        HumanMessage(content=state['task'])
    ]
    response = model.invoke(messages)
    return {"plan": response.content}

def research_plan_node(state: AgentState):
    queries = model.with_structured_output(Queries).invoke([
        SystemMessage(content=RESEARCH_PLAN_PROMPT),
        HumanMessage(content=state['task'])
    ])
    # 'content' may not be set yet on the first pass, so fall back to an empty list.
    content = state.get('content') or []
    for q in queries.queries:
        response = tavily.search(query=q, max_results=2)
        for r in response['results']:
            content.append(r['content'])
    return {"content": content}

def generation_node(state: AgentState):
    content = "\n\n".join(state['content'] or [])
    user_message = HumanMessage(
        content=f"{state['task']}\n\nHere is my plan:\n\n{state['plan']}")
    messages = [
        SystemMessage(
            content=WRITER_PROMPT.format(content=content)
        ),
        user_message
    ]
    response = model.invoke(messages)
    return {
        "draft": response.content,
        "revision_number": state.get("revision_number", 1) + 1
    }

def reflection_node(state: AgentState):
    messages = [
        SystemMessage(content=REFLECTION_PROMPT),
        HumanMessage(content=state['draft'])
    ]
    response = model.invoke(messages)
    return {"critique": response.content}

def research_critique_node(state: AgentState):
    queries = model.with_structured_output(Queries).invoke([
        SystemMessage(content=RESEARCH_CRITIQUE_PROMPT),
        HumanMessage(content=state['critique'])
    ])
    content = state['content'] or []
    for q in queries.queries:
        response = tavily.search(query=q, max_results=2)
        for r in response['results']:
            content.append(r['content'])
    return {"content": content}

def should_continue(state):
    if state["revision_number"] > state["max_revisions"]:
        return END
    return "reflect"

builder = StateGraph(AgentState)

builder.add_node("planner", plan_node)
builder.add_node("generate", generation_node)
builder.add_node("reflect", reflection_node)
builder.add_node("research_plan", research_plan_node)
builder.add_node("research_critique", research_critique_node)

builder.set_entry_point("planner")

builder.add_conditional_edges(
    "generate",
    should_continue,
    {END: END, "reflect": "reflect"}
)

builder.add_edge("planner", "research_plan")
builder.add_edge("research_plan", "generate")
builder.add_edge("reflect", "research_critique")
builder.add_edge("research_critique", "generate")

graph = builder.compile(checkpointer=memory)

from IPython.display import Image
Image(graph.get_graph().draw_png())

thread = {"configurable": {"thread_id": "1"}}
for s in graph.stream({
    'task': "what is the difference between langchain and langsmith",
    "max_revisions": 2,
    "revision_number": 1,
}, thread):
    print(s)
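
How many drafts a run produces follows from the interplay between generation_node, which increments revision_number, and should_continue, which compares it with max_revisions. The sketch below replays only that counting logic; count_drafts is a hypothetical helper and makes no LLM or search calls.

```python
def count_drafts(revision_number, max_revisions):
    """Count how many times the generate step runs before the graph ends."""
    drafts = 0
    while True:
        drafts += 1            # generation_node writes a draft ...
        revision_number += 1   # ... and increments the counter
        if revision_number > max_revisions:  # same test as should_continue
            return drafts
        # otherwise: reflect -> research_critique -> generate again


print(count_drafts(revision_number=1, max_revisions=2))
```

With the inputs used above (revision_number 1 and max_revisions 2), this counting logic gives two drafts: the initial one and one revision after a single round of critique.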

Essay Writer Interface

import warnings
warnings.filterwarnings("ignore")
from helper import ewriter, writer_gui

MultiAgent = ewriter()
app = writer_gui(MultiAgent.graph)
app.launch()

LangChain Resources

LangChain documentation: https://python.langchain.com/v0.2/docs/introduction/

LangServe: https://python.langchain.com/v0.1/docs/langserve/

LangSmith: https://docs.smith.langchain.com/

LangGraph: https://langchain-ai.github.io/langgraph/

Conclusion

Let's look at some agent architectures that we did not build out fully in this course but that you should be familiar with.

The Multi-agent Architecture

We built an example of this with the essay writer. A multi-agent architecture has multiple agents working on the same shared state. These agents can simply be different prompts and language models, as they were in the writer, or each can have its own combination of prompt, language model, and tools. They can also have loops inside them. The key point is that all agents operate on the same shared state, which is passed from one agent to the next.

The Supervisor Architecture

In this design, a supervisor communicates with a set of sub-agents and determines what input each sub-agent receives. Those agents can themselves be graphs with their own internal state, so the notion of a single shared state is weaker here. Other than that, the supervisor design is very similar to the multi-agent architecture; the main distinction is that one supervisor is in control of the worker agents. You may also want to use a more capable model as the supervisor, because planning demands a high level of intelligence.
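
The control flow of a supervisor can be sketched in plain Python. Everything here is hypothetical: in a real system route would be an LLM call and the workers would be agents or sub-graphs, whereas this sketch uses fixed rules and string-returning functions to show who decides what.

```python
def researcher(worker_input):
    return {"notes": f"notes on {worker_input}"}


def writer(worker_input):
    return {"draft": f"draft using {worker_input}"}


WORKERS = {"researcher": researcher, "writer": writer}


def route(state):
    """Supervisor decision: which worker runs next, or FINISH."""
    if "notes" not in state:
        return "researcher"
    if "draft" not in state:
        return "writer"
    return "FINISH"


def supervise(task):
    state = {"task": task}
    order = []
    while (choice := route(state)) != "FINISH":
        # The supervisor also chooses what input the worker receives.
        worker_input = state["task"] if choice == "researcher" else state["notes"]
        state.update(WORKERS[choice](worker_input))
        order.append(choice)
    return state, order


state, order = supervise("LangGraph")
print(order)
print(state["draft"])
```

The workers never see the full state; each one receives only the input the supervisor selects for it.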

Flow Engineering Architecture

This design is described in the AlphaCodium paper, in which the researchers achieve state-of-the-art coding performance with a graph-like approach: a pipeline-like structure with a few loops.

The flow engineering architecture includes loops around the initial code solution, the iteration on public tests, and the iteration on AI-generated tests. It is an interesting graph because the flow is very directed up to a certain point, after which it loops. This is a bespoke architecture for a specific coding problem; more generally, flow engineering refers to working out the right flow of information for your agents to think and act.
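
One of the loops described above, iterating on tests, can be sketched as follows. This is not the AlphaCodium implementation: iterate_on_tests is a hypothetical helper, and the list of candidate functions stands in for successive code solutions that a model would generate after seeing the failures.

```python
def iterate_on_tests(candidates, tests, max_attempts=3):
    """Try successive candidate solutions until one passes every test."""
    attempts = 0
    for solve in candidates[:max_attempts]:
        attempts += 1
        failures = [(args, expected) for args, expected in tests
                    if solve(*args) != expected]
        if not failures:
            return solve, attempts
        # In a real flow, the failures would be fed back to the model here.
    return None, attempts


public_tests = [((1, 2), 3), ((0, 0), 0)]
candidates = [
    lambda a, b: a - b,  # first attempt: fails the first test
    lambda a, b: a + b,  # revised attempt: passes both tests
]
solution, attempts = iterate_on_tests(candidates, public_tests)
print(attempts)
```

The loop is bounded by max_attempts, which reflects the directed nature of the flow: iteration happens only at this one point and only a limited number of times.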

Plan and Execute

In a similar vein, a common paradigm is the plan-and-execute flow, in which you run an explicit planning phase before executing anything. The planner comes up with a list of steps for a sub-agent to carry out. The sub-agent performs a step and returns, and the plan may be updated to reflect the result. It then moves on to the next step and returns again, until the plan is finished. At that point you check whether the task is complete or whether you need to replan.
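
The plan, execute, and replan cycle can be sketched with three small functions. All of them are hypothetical placeholders: in practice make_plan and replan would be LLM calls and execute would be a sub-agent, while here they are deterministic so the loop itself is easy to follow.

```python
def make_plan(goal):
    return [f"research {goal}", f"summarize {goal}"]


def execute(step):
    return f"done: {step}"


def replan(goal, remaining, results):
    """Keep the remaining steps; add a verification step once at the end."""
    already_verified = any("verify" in result for result in results)
    if not remaining and not already_verified:
        return [f"verify {goal}"]
    return remaining


def plan_and_execute(goal, max_rounds=5):
    plan = make_plan(goal)
    results = []
    for _ in range(max_rounds):
        if not plan:
            break  # plan finished and no replanning was needed
        step, plan = plan[0], plan[1:]
        results.append(execute(step))
        plan = replan(goal, plan, results)
    return results


results = plan_and_execute("LangGraph")
print(results)
```

The max_rounds bound is a design choice of this sketch: it prevents a planner that keeps adding steps from looping forever.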

Language Agent Tree Search Architecture

This architecture performs a tree search over the space of possible actions. LATS first generates an action, reflects on it, and then descends from that action by generating sub-actions and reflecting on those as well. From all of these reflections, it decides where in the tree of action states to return to. It can then backpropagate, updating parent nodes with new information that may inform future directions based on past states. Persistence is particularly important here, since it is what allows you to return to past states.
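
The select, expand, reflect, and backpropagate cycle can be illustrated with a toy tree search. This is a simplified sketch, not the LATS algorithm as published: the UCB-style selection rule is an assumption of the sketch, and propose and reflect are hypothetical stand-ins for the LLM calls that would generate and score actions.

```python
import math


class Node:
    def __init__(self, action, parent=None):
        self.action = action
        self.parent = parent
        self.children = []
        self.visits = 0
        self.total = 0.0

    def path(self):
        """Actions from the root down to this node."""
        node, actions = self, []
        while node.parent is not None:
            actions.append(node.action)
            node = node.parent
        return actions[::-1]


def mean(node):
    return node.total / node.visits


def ucb(node, c=1.0):
    if node.visits == 0:
        return math.inf
    return mean(node) + c * math.sqrt(math.log(node.parent.visits) / node.visits)


def select(root):
    node = root
    while node.children:
        node = max(node.children, key=ucb)
    return node


def backpropagate(node, reward):
    # Update the node and every ancestor with the new reflection score.
    while node is not None:
        node.visits += 1
        node.total += reward
        node = node.parent


def search(root, propose, reflect, iterations):
    for _ in range(iterations):
        leaf = select(root)
        actions = propose(leaf.path())
        if not actions:  # nothing to expand: re-score the leaf itself
            backpropagate(leaf, reflect(leaf.path()))
            continue
        for action in actions:
            child = Node(action, parent=leaf)
            leaf.children.append(child)
            backpropagate(child, reflect(child.path()))
    return root


def best_path(root):
    node = root
    while node.children:
        node = max(node.children, key=mean)
    return node.path()


def propose(path):  # hypothetical: two possible actions, depth limit 2
    return ["a", "b"] if len(path) < 2 else []


def reflect(path):  # hypothetical score that favors action "b"
    return path.count("b") / 2


root = search(Node(action=None), propose, reflect, iterations=3)
print(best_path(root))
```

Because every score is pushed up to the ancestors, the statistics at the root's children summarize what was learned deeper in the tree, which is what lets the search decide where to return to.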

LangGraph is designed to be very controllable, allowing you to build both cyclical and non-cyclical flows. This controllability is what sets LangGraph apart from other frameworks.

Resource

[1] Deeplearning.ai (2024). AI Agents in LangGraph. https://learn.deeplearning.ai/courses/ai-agents-in-langgraph/
