【开源、教程】RAG全流程实现(java+完整代码):第二弹

【开源、教程】RAG全流程实现(java+完整代码):第二弹
【开源、教程】RAG全流程实现(java+完整代码):第二弹
本帖使用社区开源推广,符合推广要求。我申明并遵循社区要求的以下内容:
  • 我的帖子已经打上 开源推广 标签:
  • 我的开源项目完整开源,无未开源部分:
  • 我的开源项目已链接认可 LINUX DO 社区:
  • 我帖子内的项目介绍,AI生成、润色内容部分已截图发出:
  • 以上选择我承诺是永久有效的,接受社区和佬友监督:

以下为项目介绍正文内容,AI生成、润色内容已使用截图方式发出


前言

本教程的环境基于 jdk8 + langchain4j 0.35

教程源码放在这里了:

github.com

GitHub - worenbudaoni/rag-study-helper: 一个学习检索增强生成的全流程助手

一个学习检索增强生成的全流程助手

文章内容

因为内容比较多,我会从下面三个文章进行讲解,后续发布后会贴出来,这节讲:接入飞书WIKI文档

强烈建议先看完第一弹,不然后面代码有可能看不懂

实现逻辑

后面会有图文讲解的,这里就相当于大概介绍一下,看个大概就好,有不了解的不要先去搜,我后文都会讲,如果讲漏了麻烦评论一下,我改正

飞书开发者平台
1、去飞书开发者平台创建一个应用
2、给应用赋予权限(权限管理 菜单)
3、给应用赋予机器人能力(添加应用能力 菜单)
4、发布(版本管理与发布 菜单)
5、获取应用的app-id、app-secret(凭证和基础信息 菜单)
飞书app
1、创建一个群聊
2、把机器人给拉进去
3、点击左边菜单的更多找到知识库,新建知识库(下面统一称为 WIKI)
4、添加群聊(机器人)为管理员:点 WIKI 进去会打开一个网站,左下角有个设置点进去,在成员设置->角色与权限->管理员,添加管理员,搜索我们刚才创建的有机器人的群聊并添加
5、在页面的连接处找到space-id,如:https://kcnvw23rzo5r.feishu.cn/wiki/settings/666666(666666就是我们要的space-id)
项目
1、创建一个job,用来定时获取文档(下面为job启动后的流程)
2、通过app-id、app-secret获取tenant_access_token和expire
3、通过tenant_access_token和space-id获取文档信息(名字、更新时间、documentToken 等等)
4、通过documentToken去获取文档的内容(字符串)
5、走【开源、教程】RAG全流程实现(java+完整代码):第一弹的入库流程

一、飞书开发者平台

app-id、app-secret 是啥

app-id:应用的唯一标识
app-secret:应用的密钥,在创建应用时由平台生成,可用于获取app_access_token

1、飞书开放平台创建企业应用开发者后台 - 飞书开放平台

这里注意的是创建好应用后需要审核、启用,所以企业级的应用权限在领导手上,我们可以创建一个个人版的飞书账号来做实验

image

2、给应用赋予权限 (权限管理菜单)

直接复制我的也行

{
  "scopes": {
    "tenant": [
      "bitable:app:readonly",
      "docx:document:readonly",
      "drive:drive:readonly",
      "drive:file:readonly",
      "wiki:wiki:readonly"
    ],
    "user": []
  }
}

image

3、给应用赋予机器人能力 (添加应用能力菜单)

image

4、发布 (版本管理与发布菜单)

创建好后发布就行

image

5、获取应用的app-id、app-secret(凭证和基础信息菜单)

image

二、飞书app

space-id是啥

就是飞书知识库(WIKI)所对应的空间ID,我们找到这个空间就可以找到下面的文档
打个比方就是图书馆的书架,书架有个唯一标识(小说),我们根据这个书架ID(小说)去找下面所有的书

1、创建一个群聊

image

2、把机器人给拉进去

image

3、点击左边菜单的更多找到知识库,新建知识库(下面统一称为 WIKI)

image

4、添加群聊(机器人)为管理员

点 WIKI 进去会打开一个网站,左下角有个设置点进去,在成员设置->角色与权限->管理员,添加管理员,搜索我们刚才创建的有机器人的群聊并添加

image

5、在页面的连接处找到space-id

如:https://kcnvw23rzo5r.feishu.cn/wiki/settings/666666(666666就是我们要的space-id)

image

三、项目

这里就不按照上面实现逻辑写的走了,我就按照代码里的讲解

1、job总览(步骤拆解在后面)FeishuSyncService.java

// 没想加重框架,如果用xxl-job什么的,自己搬一下就行了
@Scheduled(cron = "${app.feishu.cron}")  
public void syncWiki() {  
    log.info("Starting Feishu wiki sync for space: {}", spaceId);  
    try {  
	    // 获取全部文档信息(通过app-id、app-secret、space-id)
	    // 这里逻辑后面会讲
        List<WikiNode> nodes = feishuClient.getWikiNodeTree(spaceId);  
        log.info("Found {} nodes in wiki", nodes.size());  
		// 成功数量,跳过数量(如果数据存在了关系型数据库且没有更新就跳过),失败数量
        int synced = 0, skipped = 0, failed = 0;  
  
        for (WikiNode node : nodes) {  
	        // 后缀
            String objType = node.getObjType();
            // 文档令牌 用来获取 文档内容
            String nodeToken = node.getNodeToken();
            // 更新时间判断是否需要跳过
            long updateTime = node.getUpdateTime();  
			// 是否入库
            Documents doc = documentsMapper.selectOne(  
                    Wrappers.<Documents>lambdaQuery()  
                            .eq(Documents::getFeishuNodeToken, nodeToken)  
            );  
            // 文档是否更新是否需要跳过,这个更新时间不在where条件里面是因为后续要继续用到这个数据
            if (doc != null && doc.getFeishuUpdateTime() != null  
                    && doc.getFeishuUpdateTime() == updateTime) {  
                skipped++;  
                continue;            }  
  
            try { 
	            // 文档内容
                String content;
                // 获取文件名
                String fileName;  
                switch (objType) {  
                    case "doc":  
                    case "docx":  
	                    // 获取文档内容
	                    // 这里逻辑就不讲了,我后面扔给飞书的文档,照着对接或者看我源码就好
                        content = feishuClient.getDocumentContent(node.getObjToken());  
                        fileName = node.getNodeTitle() + "_文档";  
                        break;                    case "sheet":  
                        content = feishuClient.getSheetContent(node.getObjToken());  
                        fileName = node.getNodeTitle() + "_表格";  
                        break;                    case "bitable":  
                        content = feishuClient.getBitableContent(node.getObjToken());  
                        fileName = node.getNodeTitle() + "_多维表格";  
                        break;                    default:  
                        skipped++;  
                        continue;                }  
  
                // 如果是更新,先删旧向量和映射记录  
                if (doc != null) {  
                    // 查询旧文档相关的向量映射  
                    List<DocumentChunks> oldChunks = documentChunksMapper.selectList(  
                            Wrappers.<DocumentChunks>lambdaQuery()  
                                    .eq(DocumentChunks::getDocumentId, doc.getId())  
                    );  
                    // 有两张表
                    // 第一张为文档库:记录文档标题、更新时间、创建人等信息
                    // 第二张为分片库:记录向量数据库插入后的向量ID
                    // 向量ID
                    List<String> vectorIds = oldChunks.stream()  
                            .map(DocumentChunks::getVectorId)  
                            .collect(Collectors.toList());  
                    // 删除向量  
                    embeddingStore.removeAll(vectorIds);  
  
                    // 删除映射记录  
                    documentChunksMapper.delete(  
                            Wrappers.<DocumentChunks>lambdaQuery()  
                                    .eq(DocumentChunks::getDocumentId, doc.getId())  
                    );  
                    // 删除文档  
                    documentsMapper.deleteById(doc.getId());  
                }  
  
                // RAG 入库流程  (第一篇文章中亦有记载(跟第一章代码有些许出入,看完第一章后,直接看源码更佳))
                ingestionService.ingestFeishuDocument(fileName, content, nodeToken, updateTime, objType);  
                synced++;  
                log.info("  Synced: {} ({})", node.getNodeTitle(), nodeToken);  
            } catch (Exception e) {  
                log.error("  Failed to sync node: {} ({})", node.getNodeTitle(), nodeToken, e);  
                failed++;  
            }  
        }  
  
        // 清理远程已删除的文档  
        // 这里的逻辑是
        // 第一次job执行:查询飞书wiki给了 A、B、C 三个文档入库
        // 后面有人在wiki中删了 C 文档
        // 第二次job执行:只有查询出 A、B 两个文档
        // 这时就要去数据库中和向量库中删除多余的 C 文档 
        List<String> remoteTokens = nodes.stream()  
                .map(WikiNode::getNodeToken)  
                .collect(Collectors.toList());  
        if (!remoteTokens.isEmpty()) {  
	        // MySQL 查出本地多出的记录,只遍历需要删除的  
            List<Documents> toRemove = documentsMapper.selectList(  
                    Wrappers.<Documents>lambdaQuery()  
                            .isNotNull(Documents::getFeishuNodeToken)  
                            .notIn(Documents::getFeishuNodeToken, remoteTokens)  
            );  
            for (Documents removed : toRemove) {  
                log.info("Document removed remotely, cleaning up: {} ({})", removed.getDocumentName(), removed.getFeishuNodeToken());  
                List<DocumentChunks> chunks = documentChunksMapper.selectList(  
                        Wrappers.<DocumentChunks>lambdaQuery()  
                                .eq(DocumentChunks::getDocumentId, removed.getId())  
                );  
                List<String> vectorIds = chunks.stream()  
                        .map(DocumentChunks::getVectorId)  
                        .collect(Collectors.toList());  
                // 向量数据库 删
                embeddingStore.removeAll(vectorIds);  
				// 关系型数据库 分片库 删
                documentChunksMapper.delete(  
                        Wrappers.<DocumentChunks>lambdaQuery()  
                                .eq(DocumentChunks::getDocumentId, removed.getId())  
                );  
                // 关系型数据库 文档库 删
                documentsMapper.deleteById(removed.getId());  
            }  
        }  
  
        log.info("Feishu wiki sync complete: synced={}, skipped={}, failed={}",  
                synced, skipped, failed);  
    } catch (Exception e) {  
        log.error("Feishu wiki sync failed", e);  
    }  
}

2、递归获取知识库所有文档节点FeishuClient.java

这里其实没什么特别好讲的点,就是参考飞书文档,然后请求并解析
我在源码里也标记了文档的地址,所以这里放一个总体的查询地址
开发文档 - 飞书开放平台

/**  
 * 获取 tenant_access_token(内部自动缓存和刷新)  
 */  
public synchronized String getAccessToken() throws IOException {  
    if (cachedToken != null && System.currentTimeMillis() < tokenExpireAt) {  
        return cachedToken;  
    }  
    String json = "{\"app_id\":\"" + appId + "\",\"app_secret\":\"" + appSecret + "\"}";  
    // https://open.feishu.cn/document/server-docs/authentication-management/access-token/tenant_access_token_internal  
    Request request = new Request.Builder()  
            .url(baseUrl + "/open-apis/auth/v3/tenant_access_token/internal")  
            .post(RequestBody.create(JSON, json))  
            .build();  
    try (Response resp = httpClient.newCall(request).execute()) {  
        JsonNode body = objectMapper.readTree(resp.body().string());  
        if (body.get("code").asInt() != 0) {  
            throw new IOException("Failed to get access token: " + body);  
        }  
        cachedToken = body.get("tenant_access_token").asText();  
        // tenant_access_token 的最大有效期是 2 小时  
        // 7200 是秒  
        int expire = body.get("expire").asInt(7200);  
        // 防御性编程 免得刚好过期 由于网络延时 造成接口调用失败  
        tokenExpireAt = System.currentTimeMillis() + (expire - 60) * 1000L;  
        return cachedToken;  
    }  
}  
  
/**  
 * 递归获取知识库所有文档节点。  
 */  
public List<WikiNode> getWikiNodeTree(String spaceId) throws IOException {  
    List<WikiNode> allNodes = new ArrayList<>();  
    collectNodes(spaceId, null, allNodes);  
    return allNodes;  
}  
  
private void collectNodes(String spaceId, String parentNodeToken, List<WikiNode> result) throws IOException {  
    List<WikiNode> currentLevelNodes = new ArrayList<>();  
    String pageToken = null;  
    do {  
        // https://open.feishu.cn/document/server-docs/docs/wiki-v2/space-node/create  
        StringBuilder url = new StringBuilder(baseUrl + "/open-apis/wiki/v2/spaces/" + spaceId + "/nodes");  
        if (parentNodeToken != null) {  
            url.append("/").append(parentNodeToken).append("/children");  
        }  
        url.append("?page_size=50");  
        if (pageToken != null) {  
            url.append("&page_token=").append(pageToken);  
        }  
  
        Request request = new Request.Builder()  
                .url(url.toString())  
                .header("Authorization", "Bearer " + getAccessToken())  
                .get()  
                .build();  
  
        try (Response resp = httpClient.newCall(request).execute()) {  
            JsonNode body = objectMapper.readTree(resp.body().string());  
            if (body.get("code").asInt() != 0) {  
                log.error("Wiki API error for URL [{}]: {}", url, body);  
                break;            }  
  
            JsonNode items = body.path("data").path("items");  
            for (JsonNode item : items) {  
                WikiNode node = new WikiNode();  
                // 节点token  
                node.setNodeToken(item.path("node_token").asText());  
                // 对应文档类型的token,可根据 obj_type 判断属于哪种文档类型。  
                node.setObjToken(item.path("obj_token").asText());  
                // 文档类型,对于快捷方式,该字段是对应的实体的obj_type。  
                // 可选值有:  
                // doc:旧版文档 sheet:表格 mindnote:思维导图 bitable:多维表格 file:文件 docx:新版文档 slides:幻灯片  
                node.setObjType(item.path("obj_type").asText());  
                // 文档标题  
                node.setNodeTitle(item.path("title").asText());  
                node.setParentNodeToken(parentNodeToken);  
                // 是否有子节点  
                node.setHasChild(item.path("has_child").asBoolean(false));  
                // 文档最近编辑时间  
                String editTime = item.path("obj_edit_time").asText();  
                node.setUpdateTime(Long.parseLong(editTime.isEmpty() ? "0" : editTime));  
                currentLevelNodes.add(node);  
            }  
  
            pageToken = body.path("data").path("page_token").asText(null);  
        }  
    } while (pageToken != null && !pageToken.isEmpty());  
  
    // Add all nodes from this level, then recurse into children  
    result.addAll(currentLevelNodes);  
    for (WikiNode node : currentLevelNodes) {  
        if (node.isHasChild()) {  
            collectNodes(spaceId, node.getNodeToken(), result);  
        }  
    }  
}

四、测试

1、导入文档

还是拿这个 补鸡稻 作为测试案例

image

2、配置 app-id、app-secret、space-id和sync-enable

我这里通过 jvm 运行参数注入,免得又把 apikey 给上传到 github 了
sync-enable 记得为 true,不然不注册 spring bean

image

3、运行时发现文档可以查询到,并且入库了

image

4、提问

image

5、测试结束,完结撒花:cherry_blossom:

2 个帖子 - 2 位参与者

阅读完整话题

来源: LinuxDo 最新话题查看原文