面包屑图标 当前位置: 首页
AI资讯
热点详情

LangGraph知识图谱驱动供应链AI智能体智能升级

AI热点日报
AI热点日报时间:2026-06-29
热点解读

检索增强生成(RAG)正在改变大模型的应用方式,通过将外部知识注入大语言模型,使其不再依赖“死记硬背”来回答问题。典型的RAG流程是将文档或数据切分后生成向量嵌入,再利用相似度搜索获取相关信息,最后将其整合到LLM的上下文中。这种方法在处理非结构化文本时确实表现出色。 然而,现实世界中的企业数据远不

检索增强生成(RAG)正在改变大模型的应用方式,通过将外部知识注入大语言模型,使其不再依赖“死记硬背”来回答问题。典型的RAG流程是将文档或数据切分后生成向量嵌入,再利用相似度搜索获取相关信息,最后将其整合到LLM的上下文中。这种方法在处理非结构化文本时确实表现出色。

然而,现实世界中的企业数据远不止文本那么简单。数值属性、聚合计算、关系连接等任务,文本嵌入往往难以胜任。那么,应该如何应对呢?一个自然的思路是将知识图谱与结构化工具整合进来。下面,我们以供应商管理系统为例,详细讲解这套组合拳的具体应用。

数据链接:数据集

创建本文中使用的Neo4j实例的链接:图实例

理解RAG应用

一个典型的RAG应用主要包含三个核心步骤:

  • 向量存储/嵌入:将非结构化文本转换为向量表示。

  • 检索:通过相似性搜索找到最相关的文本片段。

  • 生成:让LLM根据这些检索到的片段生成回答。

但纯嵌入方法存在明显的局限性:一旦遇到结构化或关系型数据,例如数值过滤、排序、聚合等操作,其效果就会大打折扣。而这正是知识图谱的优势所在。

文本嵌入的局限性

文本嵌入在RAG中应用广泛,但并非万能。尤其在处理结构化数据时,其短板十分突出,直接影响查询的准确性和有效性。

过滤和排序

文本嵌入擅长理解语义,但天生不支持“过滤”或“排序”操作。例如,您想找出供应能力大于40,000的供应商,文本嵌入就无能为力——因为它不是为处理数值条件设计的,无法对某个字段直接应用“大于”或“小于”这样的操作。而在许多数据驱动的场景中,这些操作恰恰是刚需。

聚合

聚合操作同样面临挑战。比如询问“欧洲有多少供应商?”,这涉及计数或分组。文本嵌入本身不具备这种能力。它在语义搜索方面表现出色,但面对计数、求和、分组这类结构化任务,确实难以应对。

数据复杂性

生产环境中的数据关系远比“A和B有关联”复杂得多。嵌入关注的是语义含义,但很容易忽略实体之间更深层次的关系连接。例如,在知识图谱中,供应商、位置、产品等实体相互交织,需要同时理解实体本身及其关系。嵌入对这些复杂性往往力不从心,难以提取出真正基于关系的精准洞察。

为什么需要知识图谱

知识图谱是一种高效的数据建模方法——它并非使用孤立的表格或列表来呈现数据,而是通过节点(代表实体)和边(代表关系)来建模。这种方式非常适合还原现实世界中数据天然的互联特性。

例如,在商业场景中,供应商、位置、产品等实体可以分别建模为节点,并通过“供应”或“位于”这类关系将它们连接起来。这样一来,理解数据中不同元素之间的关系就变得直观多了。尤其是在需要理清复杂数据连接的任务中,这种结构更具优势。

存储结构化数据

知识图谱中的每个节点都可以拥有自己的属性,例如供应能力、名称、位置,用于描述这个实体。节点之间的关系也可以带有属性,从而支持更丰富的上下文。例如,一个供应商节点通过“位于”关系与一个位置节点相连,关系本身还可以携带城市、国家等属性。

使用Cypher查询

Neo4j是目前广受欢迎的图数据库,它使用Cypher——一种专为图设计的声明式查询语言——来操作图。用Cypher编写涉及过滤、聚合甚至路径匹配的复杂查询,会非常方便。例如,您想找到满足特定供应能力的供应商,或者查找两个地点之间的最短路径,都能轻松实现。

结合非结构化和结构化数据

知识图谱的一大优势是能够将结构化和非结构化数据融合在一起。您可以给节点添加嵌入属性,这样向量嵌入(代表非结构化数据,如文本描述)就能与图中的结构化数据共存。这意味着,您既可以进行语义搜索,例如“查找与原材料相关的供应商”,同时也能执行传统的结构化查询,例如“查找供应能力超过40,000的供应商”。这种将两类数据整合到一个数据库中的做法,为更全面的查询和更明智的决策打开了空间。

将知识图谱与语言模型结合

将知识图谱与大型语言模型结合,关键在于发挥各自的长处。LLM擅长理解复杂语言、解释上下文、生成连贯的类人回答。但在处理结构化查询,如过滤、排序、聚合时,它并非最优选择。此时,像Neo4j这样的知识图谱就能体现其价值——它专门处理结构化数据查询,既快又准。

将生成数据库查询(例如Neo4j的Cypher)这类任务交给专门设计的工具函数,整个应用会变得更加稳健、可靠。这种分工很合理:LLM专注于它擅长的自然语言理解和生成,知识图谱则高效处理结构化和基于关系的查询。最终,您将得到一个系统,能够无缝应对非结构化语言数据和结构化关系查询,兼得两者优势。

项目概述:供应商管理

我们将实现一个系统,用于存储、搜索和检索来自全球各地的供应商。每个供应商包含以下属性:

  • • 名称

  • • 位置

  • • 供应能力(数值)

  • • 简短描述

系统必须满足以下需求:

  • • 基于供应能力的过滤。

  • • 计数和分组。

  • • 基于描述的向量相似性搜索。

  • • 聚合(例如按位置分组)。

安装所需库

在笔记本或终端中,安装以下库:

!pip install --quiet neo4j pyvis langchain-community langchain-openai langgraph

简要说明每个库的用途:

  • neo4j:Neo4j的Python驱动,用于连接图数据库。

  • pyvis:用于可视化图结构,非必需但很方便。

  • langchain-community:LangChain相关的社区工具包。

  • langchain-openai:用于调用OpenAI的LLM和嵌入模型。

  • langgraph:用于构建带图逻辑的状态机,例如ReAct智能体。

配置环境变量

为了安全地连接Neo4j和OpenAI,建议将凭据存储在环境变量中。这样,代码可以访问敏感信息,而无需硬编码。设置方式如下:

import os

NEO4J_URI = "neo4j+s://.databases.neo4j.io"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = ""
OPENAI_API_KEY = ""

os.environ["NEO4J_URI"] = NEO4J_URI
os.environ["NEO4J_USERNAME"] = NEO4J_USER
os.environ["NEO4J_PASSWORD"] = NEO4J_PASSWORD
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

连接到Neo4j

需要连接到一个Neo4j实例,才能进行数据导入和查询。以下是实现连接的代码:

from langchain_community.graphs import Neo4jGraph

graph = Neo4jGraph(refresh_schema=False)

Neo4jGraph负责建立连接,并可根据需要选择是否刷新模式。

在Neo4j中构建知识图谱

使用CSV进行数据导入

准备两个CSV文件:

  1. 1. nodes.csv,包含id、location、name、description等列。

  2. 2. relationships.csv,包含START_ID、END_ID、TYPE等列。

以下是数据导入脚本:

import csv
import numpy as np
from neo4j import GraphDatabase

NODES_CSV = "nodes.csv"
RELATIONSHIPS_CSV = "relationships.csv"
def get_label_for_type(node_type):
    mapping = {
        "Supplier": "Supplier",
        "Manufacturer": "Manufacturer",
        "Distributor": "Distributor",
        "Retailer": "Retailer",
        "Product": "Product"
    }
    return mapping.get(node_type, "Entity")


def ingest_nodes(driver):
    with driver.session() as session:
        with open(NODES_CSV, mode='r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                node_id = row['id:ID']
                name = row['name']
                node_type = row['type']
                location = row['location']
                supply_capacity = np.random.randint(1000, 50001)
                description = row['description']
                label = get_label_for_type(node_type)
                if location.strip():
                    query = f"""
                    MERGE (n:{label} {{id:$id}})
                    SET n.name = $name, n.location = $location, 
                        n.description = $description, n.supply_capacity = $supply_capacity
                    """
                    params = {
                        "id": node_id,
                        "name": name,
                        "location": location,
                        "description": description,
                        "supply_capacity": supply_capacity
                    }
                else:
                    query = f"""
                    MERGE (n:{label} {{id:$id}})
                    SET n.name = $name
                    """
                    params = {"id": node_id, "name": name}
                session.run(query, params)


def ingest_relationships(driver):
    with driver.session() as session:
        with open(RELATIONSHIPS_CSV, mode='r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                start_id = row[':START_ID']
                end_id = row[':END_ID']
                rel_type = row[':TYPE']
                product = row['product']
                if product.strip():
                    query = f"""
                    MATCH (start {{id:$start_id}})
                    MATCH (end {{id:$end_id}})
                    MERGE (start)-[r:{rel_type} {{product:$product}}]->(end)
                    """
                    params = {
                        "start_id": start_id,
                        "end_id": end_id,
                        "product": product
                    }
                else:
                    query = f"""
                    MATCH (start {{id:$start_id}})
                    MATCH (end {{id:$end_id}})
                    MERGE (start)-[r:{rel_type}]->(end)
                    """
                    params = {
                        "start_id": start_id,
                        "end_id": end_id
                    }
                session.run(query, params)
  • ingest_nodes函数从CSV中读取供应商数据,并在Neo4j中创建节点。

  • • 每个节点代表一个供应商,包含名称、位置、供应能力等属性。

  • MERGE命令确保节点要么新建,要么更新,避免重复。

  • ingest_relationships函数从CSV中读取关系数据,并在节点之间建立连接。

  • • 在供应商和产品节点之间创建关系,关系可携带产品等属性。

  • MERGE命令同样保证关系的唯一性。

创建索引

在Neo4j中创建索引或约束,对性能和数据唯一性至关重要:

def create_indexes(driver):
    with driver.session() as session:
        for label in ["Supplier", "Manufacturer", "Distributor", "Retailer", "Product"]:
            session.run(f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{label}) REQUIRE n.id IS UNIQUE")
  • • 这段代码为每种节点类型的id属性创建唯一约束,确保每个节点都有唯一标识符。

  • CREATE CONSTRAINT语句能优化查询性能,同时避免数据重复。

运行数据导入

下面将所有定义好的功能串联起来,启动数据导入流程:

driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
create_indexes(driver)
ingest_nodes(driver)
ingest_relationships(driver)
print("Data ingestion complete.")
driver.close()

导入完成后,您可以在Neo4j AuraDB实例中查看图的结构。

在Neo4j中存储向量嵌入

结构化数据已经处理完毕,但还需要一步:为描述字段添加文本嵌入。将向量嵌入存储到Neo4j中,就能实现语义查询——例如“查找与某文本描述最相似的供应商”。

from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Neo4jVector

embedding = OpenAIEmbeddings(model="text-embedding-3-small")
neo4j_vector = Neo4jVector.from_existing_graph(
    embedding=embedding,
    index_name="supply_chain",
    node_label="Supplier",
    text_node_properties=["description"],
    embedding_node_property="embedding",
)

这段配置的含义是:

  • node_label="Supplier":嵌入仅存储到供应商节点上。

  • text_node_properties=["description"]:对“描述”属性进行嵌入。

  • embedding_node_property="embedding":生成的向量存储在该属性中。

现在,在Neo4j中,您既可以利用向量进行语义搜索,也能执行结构化查询,两者完美打通。

处理结构化查询的工具

为了处理计数、按数值过滤供应商等结构化查询,需要定义一些工具。每个工具本质上是一个函数,LLM可以调用它。使用Pydantic来指定每个工具的输入格式,既清晰又便于类型检查。

供应商计数工具

需要一个工具,根据可选过滤条件统计供应商数量,例如最小供应能力、最大供应能力,或者按某个属性进行分组。下面定义了输入结构,以及查询Neo4j并返回计数的函数。

输入架构

from pydantic import BaseModel, Field
from typing import Optional, Dict, List

class SupplierCountInput(BaseModel):
    min_supply_amount: Optional[int] = Field(
        description="Minimum supply amount of the suppliers"
    )
    max_supply_amount: Optional[int] = Field(
        description="Maximum supply amount of the suppliers"
    )
    grouping_key: Optional[str] = Field(
        description="The key to group by the aggregation", 
        enum=["supply_capacity", "location"]
    )

SupplierCountInput定义了工具的输入结构,包括用于按供应能力过滤的可选字段,以及按属性(例如位置)分组的能力。

函数实现

import re
from langchain_core.tools import tool

def extract_param_name(filter: str) -> str:
    pattern = r'$w+'
    match = re.search(pattern, filter)
    if match:
        return match.group()[1:]
    return None

@tool("supplier-count", args_schema=SupplierCountInput)
def supplier_count(
    min_supply_amount: Optional[int],
    max_supply_amount: Optional[int],
    grouping_key: Optional[str],
) -> List[Dict]:
    """Calculate the count of Suppliers based on particular filters"""
    filters = [
        ("t.supply_capacity >= $min_supply_amount", min_supply_amount),
        ("t.supply_capacity <= $max_supply_amount", max_supply_amount)
    ]
    params = {
        extract_param_name(condition): value
        for condition, value in filters
        if value is not None
    }
    where_clause = " AND ".join(
        [condition for condition, value in filters if value is not None]
    )
    cypher_statement = "MATCH (t:Supplier) "
    if where_clause:
        cypher_statement += f"WHERE {where_clause} "
    return_clause = (
        f"t.{grouping_key}, count(t) AS supplier_count" 
        if grouping_key 
        else "count(t) AS supplier_count"
    )
    cypher_statement += f"RETURN {return_clause}"
    print(cypher_statement)  # Debugging output
    return graph.query(cypher_statement, params=params)

工作原理

  • extract_param_name:辅助函数,从过滤条件中提取参数名(例如$min_supply_amount)。

  • supplier_count:核心工具函数,根据给定条件动态构建Cypher查询,统计供应商数量,并支持按属性分组。

  • • 查询是动态拼接的,按需添加筛选条件,然后在Neo4j中执行。

供应商列表工具

还需要一个工具,用于列出供应商。可以选择按供应能力排序,按能力过滤,如果提供了description,甚至可以进行向量搜索。

输入架构

class SupplierListInput(BaseModel):
    sort_by: str = Field(description="How to sort Suppliers by supply capacity", enum=['supply_capacity'])
    k: Optional[int] = Field(description="Number of Suppliers to return")
    description: Optional[str] = Field(description="Description of the Suppliers")
    min_supply_amount: Optional[int] = Field(description="Minimum supply amount of the suppliers")
    max_supply_amount: Optional[int] = Field(description="Maximum supply amount of the suppliers")

SupplierListInput定义了工具输入,包含按供应能力过滤、按指定键排序,以及可选的description字段用于语义搜索。

函数实现

@tool("supplier-list", args_schema=SupplierListInput)
def supplier_list(
    sort_by: str = "supply_capacity",
    k : int = 4,
    description: Optional[str] = None,
    min_supply_amount: Optional[int] = None,
    max_supply_amount: Optional[int] = None,
) -> List[Dict]:
    """List suppliers based on particular filters"""

    # Handle vector-only search when no prefiltering is applied
    if description and not min_supply_amount and not max_supply_amount:
        return neo4j_vector.similarity_search(description, k=k)
    filters = [
        ("t.supply_capacity >= $min_supply_amount", min_supply_amount),
        ("t.supply_capacity <= $max_supply_amount", max_supply_amount)
    ]
    params = {
        key.split("$")[1]: value for key, value in filters if value is not None
    }
    where_clause = " AND ".join([condition for condition, value in filters if value is not None])
    cypher_statement = "MATCH (t:Supplier) "
    if where_clause:
        cypher_statement += f"WHERE {where_clause} "
    # Sorting and returning
    cypher_statement += " RETURN t.name AS name, t.location AS location, t.description as description, t.supply_capacity AS supply_capacity ORDER BY "
    if description:
        cypher_statement += (
            "vector.similarity.cosine(t.embedding, $embedding) DESC "
        )
        params["embedding"] = embedding.embed_query(description)
    elif sort_by == "supply_capacity":
        cypher_statement += "t.supply_capacity DESC "
    else:
        # Fallback or other possible sorting
        cypher_statement += "t.year DESC "
    cypher_statement += " LIMIT toInteger($limit)"
    params["limit"] = k or 4
    print(cypher_statement)  # Debugging output
    data = graph.query(cypher_statement, params=params)
    return data

supplier_list工具会列出供应商,可以按供应能力过滤,也能按供应能力或其他标准排序。如果提供了描述,它会利用Neo4j中的向量存储进行相似性搜索。函数同样动态构建查询,然后在Neo4j中执行。

关键点

  • 向量搜索:如果只提供描述,则完全依赖Neo4j的向量存储进行搜索。

  • 结构化+向量结合:如果同时提供了其他过滤条件,则会构建一个Cypher查询,按向量相似性或供应能力排序。

  • 鲁棒性:查询构建逻辑经过严格编码,减少了出错的可能性,保持清晰。

集成LangChain和LangGraph

接下来,使用LangGraph创建一个ReAct风格的智能体。这个智能体能够自行判断何时调用supplier-countsupplier-list等工具。LangChain负责管理LLM的接口,LangGraph则定义智能体的整体流程。

构建智能体

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage


llm = ChatOpenAI(model='gpt-4-turbo')
tools = [supplier_count, supplier_list]
llm_with_tools = llm.bind_tools(tools)
sys_msg = SystemMessage(content="You are a helpful assistant tasked with finding and explaining relevant information about Supply chain")
  • llm_with_tools:将ChatOpenAI模型与工具绑定,使LLM在执行业务时能按需调用这些工具。

  • sys_msg:设置智能体的初始系统消息,为其明确角色。

使用LangGraph定义流程

定义了三个核心部分:

  1. 1. assistant:由LLM解读消息,决定是否需要调用工具。

  2. 2. tools:专门执行工具请求的节点。

  3. 3. 条件边:检查LLM输出的最后一条消息中是否包含工具调用。如果有,则跳转到工具节点;否则终止。

from langgraph.graph import StateGraph, START, MessagesState
from langgraph.prebuilt import tools_condition, ToolNode
from IPython.display import Image, display

def assistant(state: MessagesState):
   return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}


builder = StateGraph(MessagesState)
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
# Define edges: 
builder.add_edge(START, "assistant")
# If there's a tool call, go to 'tools'; else finish
builder.add_conditional_edges("assistant", tools_condition)
builder.add_edge("tools", "assistant")
react_graph = builder.compile()
display(Image(react_graph.get_graph(xray=True).draw_mermaid_png()))

演示与测试

下面是见证成果的时刻。向智能体发送几种不同类型的查询,观察它如何决定调用哪种工具。

统计供应商

当LLM识别到用户的问题是在统计供应商数量时,就会调用supplier-count工具,并将结果返回给用户。

from langchain_core.messages import HumanMessage

messages = [
    HumanMessage(
        content="How many suppliers ha ve supply capacity more than 20000 and is located in Oslo?"
    )
]
result = react_graph.invoke({"messages": messages})
for m in result["messages"]:
    m.pretty_print()
  1. 1. LLM识别到需要统计供应商数量 → 调用supplier-count

  2. 2. 工具返回供应商数量。

  3. 3. LLM将这个数字告知用户。

列出供应商

当用户要求列出符合某种条件的供应商时,调用的是supplier-list工具。

messages = [
    HumanMessage(
        content="What are the suppliers ha ving capacity above 40000?"
    )
]
result = react_graph.invoke({"messages": messages})
for m in result["messages"]:
    m.pretty_print()

在这里,LLM调用supplier-list,从Neo4j中按条件筛选出供应能力大于40,000的供应商列表。

组合查询

如果用户的问题同时包含描述和数值条件,智能体就会结合向量相似性和结构化查询。

messages = [
    HumanMessage(
        content="Find suppliers that deal with steel and ha ve at least 20000 supply capacity."
    )
]
result = react_graph.invoke({"messages": messages})
for m in result["messages"]:
    m.pretty_print()

工具会为供应能力≥20000构建一个WHERE子句,同时利用向量属性对描述中“与钢铁相关”的内容进行语义匹配。

结论

跳出纯嵌入式的查询,将知识图谱和结构化工具引入RAG体系,释放出了一种强大的协同效应:

  • 精确性:数值过滤、计数、分组等操作交给Neo4j的Cypher查询处理,结果非常精确。

  • 语义相关性:文本嵌入在语义匹配方面依然表现出色,两者互补。

  • 稳定性:将生成式查询的风险降到最低,改由确定性的函数调用来完成,大大减少了生产环境中的错误。

  • 可扩展性:随着数据模式变得复杂,可以通过增加更专业的工具来扩展能力。LLM的角色转变为工具调度者,而不是自己去生成复杂查询。

这样一套系统,本质上是对两类能力的协同调度——语义搜索由文本嵌入来保障,精确的结构化查询则交给Neo4j的知识图谱。两全其美,各得其所。

热点追踪提示词
你是一名 AI 行业编辑,请围绕下面这条热点输出一份资讯解读:
热点:LangGraph知识图谱驱动供应链AI智能体智能升级要求:
1. 先用一句话解释这条热点在讲什么
2. 再总结它为什么重要
3. 说明会影响哪些 AI 产品或内容方向
4. 最后给出 3 个适合资讯站使用的标题
来源:https://www.53ai.com/news/knowledgegraph/2025011876539.html
ai 人工智能

游乐网为非赢利性网站,所展示的游戏/软件/文章内容均来自于互联网或第三方用户上传分享,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系youleyoucom@outlook.com。

相关热点
AI热点2026-07-04 19:00
Daetama数据科学完整准备工作系统指南与精选学习资源汇总

Daetama是面向数据科学面试和SQL能力提升的练习平台,已收录超100个覆盖基础到进阶的SQL题目,求职板块与课程模块在开发中,团队保持每周更新节奏,提供系统性刷题与模拟面试场景。

AI热点2026-07-04 19:00
AI驱动配音平台 Speakmulti

SpeakMulti是一款AI驱动的配音平台,可将YouTube视频翻译成多种语言,保留原始说话者的音色和语调,降低本地化成本。用户提交视频并选择目标语言后,AI自动完成配音,并由专家团队审核,确保准确自然。

AI热点2026-07-04 18:59
Umi-OCR图片转文字识别软件

需求人群 如果你经常需要从图片中提取文字——例如整理截图内容、翻译图片里的外语文本、识别带有水印的图片信息——那么 Umi-OCR 无疑是一款相当实用的工具。它完全在本地运行,无需联网,对隐私保护极为友好。 产品特色 这款工具的核心亮点都集中在实用性上。截屏识别操作非常顺手,按下快捷键即可框选区域,

AI热点2026-07-04 18:59
用AI生成你最爱的画家或艺术运动风格绘画

艺术创作与人工智能的融合,正在开启一个全新的创作时代。moonlightai 正是这样一款AI绘画工具,能够帮助用户通过人工智能快速生成不同风格的绘画作品——无论你想复刻文艺复兴时期的古典优雅,还是为画作注入梵高般炽热的笔触,甚至从艾沃佐夫斯基的海浪星空中汲取灵感,它都能轻松实现。 需求人群 简单来

延伸阅读