Skip to content

Node节点是否线程安全? #764

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
HY-love-sleep opened this issue Apr 30, 2025 · 5 comments
Open

Node节点是否线程安全? #764

HY-love-sleep opened this issue Apr 30, 2025 · 5 comments

Comments

@HY-love-sleep
Copy link
Contributor

类似OverAllState, com.alibaba.cloud.ai.graph.node下的node, 在多线程同时调用同一个 Builder 实例时, 是否也有线程安全问题?

@disaster1-tesk
Copy link
Collaborator

类似OverAllState, com.alibaba.cloud.ai.graph.node下的node, 在多线程同时调用同一个 Builder 实例时, 是否也有线程安全问题?

贴具体的代码看一下,没太懂你的意思

@HY-love-sleep
Copy link
Contributor Author

类似OverAllState, com.alibaba.cloud.ai.graph.node下的node, 在多线程同时调用同一个 Builder 实例时, 是否也有线程安全问题?

贴具体的代码看一下,没太懂你的意思

例如com.alibaba.cloud.ai.graph.node.QuestionClassifierNode

public static class Builder {

	private String inputTextKey;

	private ChatClient chatClient;

	private List<String> categories;

	private List<String> classificationInstructions;

	public Builder inputTextKey(String input) {
		this.inputTextKey = input;
		return this;
	}

	public Builder chatClient(ChatClient chatClient) {
		this.chatClient = chatClient;
		return this;
	}

	public Builder categories(List<String> categories) {
		this.categories = categories;
		return this;
	}

	public Builder classificationInstructions(List<String> classificationInstructions) {
		this.classificationInstructions = classificationInstructions;
		return this;
	}

	public QuestionClassifierNode build() {
		return new QuestionClassifierNode(chatClient, inputTextKey, categories, classificationInstructions);
	}

}

内部字段可变且无同步, 如果两个线程同时调用同一个 Builder 实例的链式方法并紧接着 build(),我理解可能拿到部分被另一个线程覆盖的字段值

@inlines10
Copy link
Contributor

不同线程都是new 的QuestionClassifierNode

@HY-love-sleep
Copy link
Contributor Author

我理解不是new出来的, 而是共享一个QuestionClassifierNode, node节点是这么创建的: QuestionClassifierNode feedbackClassifier = QuestionClassifierNode.builder()…build(); QuestionClassifierNode specificQuestionClassifier = QuestionClassifierNode.builder()…build(); 然后把这两个实例注册到同一个 StateGraph 单例里: StateGraph stateGraph = new StateGraph(…) .addNode("feedback_classifier", node_async(feedbackClassifier)) .addNode("specific_question_classifier", node_async(specificQuestionClassifier)) …; 后续每个 HTTP 请求线程调用 compiledGraph.invoke(...),都会复用同一套节点实例,并发地在它们身上执行 apply()。

OverAllState非线程安全的, 在多线程并发调用CompiledGraph.invoke(...)时, 会首先调用 public Optional invoke(Map<String, Object> inputs) { return this.invoke(stateGraph.getOverAllState().input(inputs), RunnableConfig.builder().build()); }, 获取同一个 OverAllState 实例, 并调用 .input(inputs) 直接在该实例上修改 data,最终,会把这个同一个 OverAllState 传给整个图的执行流程, 在node的apply方法中: if (StringUtils.hasLength(inputTextKey)) { this.inputText = (String) state.value(inputTextKey).orElse(this.inputText); }, 会把这次输入保存在节点自身的实例字段。 这就有两个共享可变点: 1.同一个 OverAllState 被所有并发请求线程的 invoke() 修改和读取──它的 data Map 会被多次 .input(...)/.updateState(...) 并发写入。 2.同一个 QuestionClassifierNode 实例 的 inputText 字段(以及可能的其他中间字段)被多个线程同时 overwrite。

我理解即使将OverAllState修改为线程安全了, 这里node的设计还是有点问题, 因为QuestionClassifierNode的字段是可变的, 即使this.inputText = (String) state.value(inputTextKey).orElse(this.inputText);这里的state.value(...)不冲突了, 多个线程同时调用同一node的apply方法还是有上述问题。

@inlines10 @disaster1-tesk

@disaster1-tesk
Copy link
Collaborator

我理解不是new出来的, 而是共享一个QuestionClassifierNode, node节点是这么创建的: QuestionClassifierNode feedbackClassifier = QuestionClassifierNode.builder()…build(); QuestionClassifierNode specificQuestionClassifier = QuestionClassifierNode.builder()…build(); 然后把这两个实例注册到同一个 StateGraph 单例里: StateGraph stateGraph = new StateGraph(…) .addNode("feedback_classifier", node_async(feedbackClassifier)) .addNode("specific_question_classifier", node_async(specificQuestionClassifier)) …; 后续每个 HTTP 请求线程调用 compiledGraph.invoke(...),都会复用同一套节点实例,并发地在它们身上执行 apply()。
OverAllState非线程安全的, 在多线程并发调用CompiledGraph.invoke(...)时, 会首先调用 public Optional invoke(Map<String, Object> inputs) { return this.invoke(stateGraph.getOverAllState().input(inputs), RunnableConfig.builder().build()); }, 获取同一个 OverAllState 实例, 并调用 .input(inputs) 直接在该实例上修改 data,最终,会把这个同一个 OverAllState 传给整个图的执行流程, 在node的apply方法中: if (StringUtils.hasLength(inputTextKey)) { this.inputText = (String) state.value(inputTextKey).orElse(this.inputText); }, 会把这次输入保存在节点自身的实例字段。 这就有两个共享可变点: 1.同一个 OverAllState 被所有并发请求线程的 invoke() 修改和读取──它的 data Map 会被多次 .input(...)/.updateState(...) 并发写入。 2.同一个 QuestionClassifierNode 实例 的 inputText 字段(以及可能的其他中间字段)被多个线程同时 overwrite。
我理解即使将OverAllState修改为线程安全了, 这里node的设计还是有点问题, 因为QuestionClassifierNode的字段是可变的, 即使this.inputText = (String) state.value(inputTextKey).orElse(this.inputText);这里的state.value(...)不冲突了, 多个线程同时调用同一node的apply方法还是有上述问题。

@inlines10 @disaster1-tesk

你这说的其实还是stategraph创建时共用一个overallstate的问题,针对这个情况,你可以在http每次调用new新的stategraoh跟overallstate就可以解决这个问题,这个跟#727 里面说的是一样的,727issue里面说的其实也可以通过我这种方式解决,不过这种解决方案会频繁创建对象,因此不太推荐

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants