
Agent Canvas Go Port — Design Document

Last cross-checked against code: 2026-06-17
Source of truth: internal/agent/ (canvas, component, tool, runtime, workflowx, dsl, audio, sandbox, observability/otel/) + internal/service/ (agent.go, canvas_decode.go) + internal/handler/ (agent.go) + tools/ (gen-component-parity, migrate-canvas)


0. How to read this document

  • Sections 1–13 describe the current design.
  • Section 14 lists the actionable backlog for the next iteration.
  • Appendices preserve the per-component / per-tool inventories and corner-case catalogues.

1. Overview

1.1 Goals

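Port RAGFlow's Agent Canvas (the DSL executor that orchestrates 22 components + 21 tools) from Python to Go. On the Python side it lives in agent/canvas.py (Graph / Canvas) + agent/component/base.py (ComponentBase / ComponentParamBase) + agent/tools/. The Go side is an independent implementation under internal/agent/ that stays compatible with Python through a shared DSL JSON schema (the former v1↔v2 bidirectional converters in internal/agent/dsl/ have been removed now that the DSL has converged on a single wire shape; see §5).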

1.2 Core Architectural Decisions

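State + Workflow hybrid: eino's compose.Workflow provides a declarative topology (nodes + exec edges) plus concurrent scheduling; compose.WithGenLocalState + WithStatePreHandler/WithStatePostHandler provide "state variables" so that any node can read any other node's output. State solves arbitrary {{cpn_id@param}} cross-references; Workflow handles execution topology, cancellation, and checkpointing.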

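5-tier porting strategy: T1 (reuse eino built-ins directly) → T2 (thin wrapper) → T3 (Lambda + State) → T4 (nested Workflow subgraph) → T5 (heavy I/O + third-party libs). Rule: when functionality is equivalent, prefer the eino built-in; do not copy the Python side's black magic (_feeded_deprecated_params, the partial hack, thread_pool_exec fake async, etc.).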

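Checkpoints in Redis: eino's compose.CheckPointStore is a pure KV interface, so Redis String + EXPIRE is a natural fit. Business metadata (status / canvas_id / parent_run_id) goes into a separate Redis Hash, written explicitly by the application layer rather than relying on eino to write it.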

Observability via OpenTelemetry: an OTLP HTTP exporter plus an eino callbacks.Handler that injects spans.

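Zero tolerance for AGPL-3: an exhaustive survey of DOCX libraries for T5 found only AGPL-3 or unmaintained options, so we ship our own OOXML writer (archive/zip stdlib + text/template); PDF uses signintech/gopdf (MIT); Excel uses xuri/excelize/v2 (BSD-3); Markdown uses yuin/goldmark (MIT).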

Wait-for-user via native eino interrupts: the home-grown sentinel chain (__wait_for_user__ / _user_input_provided / synthetic Loop / cycle_wrap.go / wait_for_user.go) is gone; the first-class compose.Interrupt + compose.ResumeWithData APIs replace it, and nodes read user input with compose.GetResumeContext[T](ctx).

Real Compile/Invoke in the production chain: buildRunFunc drives the real canvas.Compile → CompiledCanvas (cc.Workflow.Invoke) flow.

1.3 Reuse-First Principle

Before adding any new component, runtime abstraction, or third-party dependency, every phase must check whether the capability already exists elsewhere in the codebase or its declared dependencies.

Decision order (apply in sequence; first match wins):

  1. Reuse the existing RAGFlow model/service capability as-is. If internal/entity/models/anthropic.go, internal/handler/chat_session.go, or similar already has the capability, just wire it through — don't reimplement.
  2. Wrap an existing eino / workflowx / MCP-client primitive. If eino's compose.NewGraphMultiBranch or workflowx.AddLoopNode or internal/utility/mcp_client.go already provides the mechanism, build a thin adapter.
  3. Promote an already-declared-but-indirect dependency. If the dependency is already in go.mod (even as // indirect), the work is to import it directly and use it.
  4. Add a registry alias only (no new body) when an existing engine-level mechanism already handles the semantics.
  5. Only as a last resort — add a new component, a new interface method, or a new third-party dependency. Each such addition must come with a written justification explaining why steps 1-4 don't apply.

Anti-patterns explicitly rejected: ❌ Adding InvokeAsync to the Component interface (would compete with eino compose.Parallel); ❌ Registering LoopItem / ExitLoop as components; ❌ Reimplementing Python's runtime path extension in Go; ❌ Building a new MCP subsystem; ❌ "Introducing" gonja (it's already a declared dep).


2. Module Layout

internal/agent/
├── canvas/ # canvas executor (eino compile, state scheduling, checkpoint, cancel, stream, interrupt)
│ ├── canvas.go # Canvas struct, BuildWorkflow, Run/Stream
│ ├── runner.go # canvas.Runner; SSE event emission + interrupt catch
│ ├── scheduler.go # State pre/post handler + node lambda + legacyNoOpNames
│ ├── node_body.go # single-node lambda body (per-class timeout via resolveTimeout)
│ ├── timeout.go # componentDefaults map; 4-level resolver (per-class env → per-class table → uniform env → 600s fallback)
│ ├── loop_subgraph.go # Loop macro expansion (buildSubWorkflow + translateLoopCondition)
│ ├── interrupt_resume.go # eino interrupt wrappers: UserFillUpNodeBody / IsInterruptError / ExtractInterruptContexts
│ ├── multibranch.go # eino MultiBranch integration for Switch / Categorize routing
│ ├── cancel.go # Redis cancel protocol (watchCancel goroutine)
│ ├── stream.go # SSE channel
│ ├── compile.go # eino compile + WithCheckPointStore + checkPointAdapter (does not override InternalSerializer)
│ ├── checkpoint_store.go # RedisCheckPointStore (Get/Set/Delete) — the interface includes Delete
│ ├── run_tracker.go # RunTracker (Start/MarkSucceeded/MarkFailed/MarkCancelled/AttachCheckpoint)
│ ├── state_serializer.go # CanvasStateSerializer (encoding/json)
│ └── state_export.go # thin re-exports of WithState / GetStateFromContext

├── component/ # 19 components + 6 helpers (including fixture_stubs.go + universe_a_wrappers.go)
│ ├── base.go # Component interface + ParamError + ErrNotImplemented
│ ├── registry.go # name → factory mapping (auto-init)
│ ├── runtime_wire.go # bridge between components and the runtime package
│ ├── io_init.go # T5 component initialization
│ ├── fixture_stubs.go # IterationStub / IterationItemStub / RetrievalStub / SearchMyDataset alias / ExeSQLStub
│ ├── universe_a_wrappers.go # newRetrievalComponent / newExeSQLComponent / newTavilySearchComponent — Universe A → Universe B delegation
│ ├── production_chain_fixes_test.go # production-chain regression pin tests
│ ├── agent.go # T1 — react.NewAgent + tool artifact capture + maybeAppendCitation + Reset() interface-assert
│ ├── llm.go # T1 — EinoChatModel thin wrapper; VisualFiles / Cite / MessageHistoryWindowSize / ChatTemplateKwargs / OutputStructure / JSONOutput / TopP / MaxRetries / DelayAfterError
│ ├── llm_retry.go # retryInvoker + Unwrap(); unwrapChatInvoker helper
│ ├── switch.go # T2 — 12 of 12 operators (==/!=/contains/not contains/start with/end with/empty/not empty/>/</>=/<=)
│ ├── begin.go / message.go / categorize.go / invoke.go / browser.go
│ ├── data_operations.go / list_operations.go / string_transform.go
│ ├── variable_aggregator.go / variable_assigner.go
│ ├── fillup.go / userfillup.go
│ ├── loop.go # T4 — no-op marker; the actual work is taken over by loop_subgraph
│ ├── parallel.go # T4 — workflowx.AddParallelNode wrapper
│ ├── docs_generator.go / excel_processor.go # T5
│ └── render.go # output_format HTML/Markdown/plain renderer

├── tool/ # 21 tools (all implement eino tool.InvokableTool)
│ ├── registry.go # BuildAll / BuildByName (alias: exesql=execute_sql, retrieval=search_my_dateset=search_my_dataset)
│ ├── http_helper.go # shared HTTP client (context + retry + backoff)
│ ├── ssrf.go # SSRF protection
│ ├── mcp.go # MCPToolAdapter — InvokableRun calls mcpclient.CallTool over streamable-HTTP
│ ├── retrieval.go / retrieval_service.go / retrieval_nlp.go / retrieval_kg.go # RetrievalService dual registry: nlp + kg
│ ├── sandbox_bridge.go # bridge to the CodeExec sandbox providers
│ └── akshare.go / arxiv.go / code_exec.go / code_exec_client.go / crawler.go / deepl.go
│ / duckduckgo.go / email.go / exesql.go / github.go / google.go
│ / google_scholar.go / jin10.go / pubmed.go / qweather.go
│ / searxng.go / tavily.go / tushare.go / wencai.go / wikipedia.go / yahoo_finance.go

├── runtime/ # runtime contract shared by canvas + component (no import cycle)
│ ├── component.go # Component interface (extracted from component/base.go)
│ ├── context.go # GetStateFromContext / withState
│ ├── state.go # CanvasState + NewCanvasState + GetVar/SetVar/ReadVars + MarshalJSON/UnmarshalJSON + compose.RegisterSerializableType
│ ├── template.go # ResolveTemplate (regex fast path)
│ ├── template_jinja.go # gonja fallback
│ ├── selector.go # component selector helpers
│ └── metrics.go # runtime metrics + Prometheus counters

├── workflowx/ # eino extensions (zero intrusion: external helpers only)
│ ├── loop.go # AddLoopNode[T]: generic do-while loop node
│ ├── parallel.go # AddParallelNode[I,O]: generic bounded-concurrency node
│ └── *_test.go # unit + integration tests

├── sandbox/ # CodeExec sandbox providers
│ ├── provider.go / manager.go / http.go / result_protocol.go / artifacts.go
│ ├── self_managed.go / aliyun.go / e2b.go / local.go / ssh.go
│ └── e2b_test.go / local_test.go / manager_test.go / result_protocol_test.go / self_managed_test.go / ssh_test.go

├── audio/ # TTS
│ ├── tts.go # Synthesizer interface + error sentinels + default stub
│ ├── model_provider_synthesizer.go # calls models.BaseModel.AudioSpeech (60+ driver impls)
│ ├── tts_dispatch.go # TTSDispatcher interface + NewTTSDispatchFunc
│ └── *_test.go

├── observability/otel/ # OTel SDK + eino callbacks.Handler
│ ├── provider.go # TracerProvider factory
│ └── handler.go # eino callbacks.Handler → OTel span

└── dsl/ # DSL normalize
├── normalize.go # NormalizeForCanvas (enforceHandleIds / buildGraphFromComponents / foldLegacyLoopVariants)
├── normalize_test.go
└── testdata/ # 7 fixtures (all / browser / dfx_picture_parser / questions_category / resume / subaget / switch)

internal/handler/
├── agent.go # HTTP API (RunAgent SSE with RunEvent.Type dispatch)
├── agent_wait_for_user_test.go # 4 e2e tests pinning wait-for-user orchestrator side
└── admin_runtime.go # POST /api/v1/admin/canvas-runtime/:tenant_id

internal/service/
├── agent.go # AgentService.RunAgent / buildRunFunc / NewAgentService[WithOptions] / option injection
├── canvas_decode.go # decodeCanvasFromDSL
├── canvas_decode_test.go
├── agent_run_e2e_test.go # 4 e2e tests
└── agent_sessions.go # session CRUD

cmd/server_main.go # Redis CheckPointStore + RunTracker + TTS service wire-up

internal/observability/otel/
├── provider.go # TracerProvider factory (reads OTEL_EXPORTER_OTLP_ENDPOINT)
├── handler.go # eino callbacks.Handler → OTel span
└── handler_test.go # tracetest.SpanRecorder

Actual file counts

  • Components: 19 implementation files (see §4.2; the registry exposes more names, including aliases and compat stubs, see §4.6)
  • Tools: 21 (see §4.5)
  • Sandbox providers: 5 (self_managed, aliyun, e2b, local, ssh)
  • Test files: 130+ (canvas 17, component 50+, tool 30+, runtime 4, workflowx 8, sandbox 6, audio 3, service 8+, handler 10+)

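3. Architecture

3.1 State + Workflow Hybrid

eino's compose.Workflow on its own supports only DAGs (data between nodes flows through the outputs of declared predecessors); there is no ready-made API for "any node reads any node's output". The RAGFlow Python side implements arbitrary {{cpn_id@param}} cross-references via self._canvas.get_variable_value("cpn_id@param").

Go-side approach

  1. State carries the variables: each canvas run creates a *CanvasState and attaches it to the context (context.Value). Every node reads and writes it via runtime.GetStateFromContext(ctx).
  2. State pre-handler: wf.AddLambdaNode(...) attaches compose.WithStatePreHandler[map[string]any, *runtime.CanvasState](canvasPre), which extracts the node's input from State.
  3. State post-handler: compose.WithStatePostHandler is attached as well and writes the node's output back to State.
  4. Workflow carries the topology: nodes get exec edges following downstream / upstream; data flows through State, not through the edges.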
// internal/agent/canvas/scheduler.go
node := wf.AddLambdaNode(cpnID, nodeBody,
	compose.WithStatePreHandler[map[string]any, *runtime.CanvasState](canvasPre),
	compose.WithStatePostHandler[map[string]any, *runtime.CanvasState](canvasPost),
)
for _, upID := range comp.Upstream {
	node.AddInput(upID)
}
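A minimal, eino-free sketch of the idea behind State-based cross-references: every node's outputs are written into a shared, lock-protected state (the post-handler role), and any later node can resolve {{cpn_id@param}} against it (the pre-handler role). The type miniState and the regex are illustrative assumptions; runtime.ResolveTemplate's real fast path may differ.

```go
package main

import (
	"fmt"
	"regexp"
	"sync"
)

// miniState is a stand-in for runtime.CanvasState: outputs keyed by "cpn_id@param".
type miniState struct {
	mu   sync.RWMutex
	vars map[string]any
}

func newMiniState() *miniState { return &miniState{vars: map[string]any{}} }

// SetOutputs plays the post-handler role: write a node's outputs back to State.
func (s *miniState) SetOutputs(cpnID string, out map[string]any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for k, v := range out {
		s.vars[cpnID+"@"+k] = v
	}
}

// refPattern matches {{cpn_id@param}}, allowing spaces inside the braces.
var refPattern = regexp.MustCompile(`\{\{\s*([^{}@\s]+@[^{}\s]+)\s*\}\}`)

// Resolve plays the pre-handler role: substitute references from State.
// Unknown references are left untouched.
func (s *miniState) Resolve(tmpl string) string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return refPattern.ReplaceAllStringFunc(tmpl, func(m string) string {
		key := refPattern.FindStringSubmatch(m)[1]
		if v, ok := s.vars[key]; ok {
			return fmt.Sprint(v)
		}
		return m
	})
}

func main() {
	st := newMiniState()
	st.SetOutputs("LLM:abc", map[string]any{"content": "42"})
	fmt.Println(st.Resolve("answer={{LLM:abc@content}}")) // answer=42
}
```

Because reads go through State rather than through edges, a node can reference any component that has already run, not just its declared predecessors.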

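CanvasState serialization

The CanvasState struct contains a sync.RWMutex, so it cannot simply be handed to encoding/json: the mutex must never be copied or serialized, and the maps must be read under the lock. Two mechanisms handle this:

  • MarshalJSON / UnmarshalJSON methods: write/read the internal canvasStateJSON struct (the mutex is not exposed)
  • compose.RegisterSerializableType[CanvasState]: lets eino's StatePre/PostHandler marshal/unmarshal the state on the interrupt path

eino's InternalSerializer is a separate serialization mechanism (for eino's internal checkpoint payload), shared by WithStateSerializer / compose.Serializer. Production code wires only WithCheckPointStore (keeping eino's default InternalSerializer) plus CanvasState's own MarshalJSON.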

3.2 The runtime package: breaking the canvas <-> component cycle

Problem: many files in component/ (Begin/Message/Switch/Browser/...) need canvas.CanvasState / canvas.GetStateFromContext / canvas.ResolveTemplate / canvas.SetDefaultFactory, while canvas itself depends on component's concrete implementations through ComponentFactory. Importing canvas from component would therefore create a Go import cycle.

Solution: extract the shared runtime contract into internal/agent/runtime/. Both canvas and component depend on runtime, but not on each other.

| Extracted to runtime | Stays in canvas | Stays in component |
|---|---|---|
| Component interface | DSL graph types (Canvas, CanvasComponent, CanvasComponentObj) | component registry + factory |
| CanvasState + GetVar/SetVar/ReadVars + MarshalJSON | topology construction (BuildWorkflow, buildLoopExpansion, scheduler wiring) | concrete component implementations |
| GetStateFromContext / withState / WithState | checkpoint / workflow compile orchestration | NewBeginComponent, NewMessageComponent, ... |
| ResolveTemplate + template_jinja (gonja fallback) | Loop macro-expansion logic | |
| ParamError, ErrNotImplemented | | |

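3.3 The eino interrupt path

  1. UserFillUp node → compose.Interrupt(ctx, inputSpec)
  2. Interrupt returns *InterruptSignal (which implements the error interface)
  3. The graph engine catches it → automatic checkpoint → propagates upward
  4. Runner.Run catches it → SSE "waiting_for_user" + saves the interrupt id
  5. The user submits → Runner.Run injects __resume_interrupt_id__ + __resume_data__
  6. buildRunFunc consumes them → compose.ResumeWithData(ctx2, id, data)
  7. The node re-enters → compose.GetResumeContext[T](ctx) at the top of the body returns the user input

Core implementation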

// internal/agent/canvas/interrupt_resume.go
func UserFillUpNodeBody(cpnID string, params map[string]any) func(ctx context.Context, input map[string]any) (map[string]any, error) {
	inputSpec := buildInputSpec(params)
	return func(ctx context.Context, input map[string]any) (map[string]any, error) {
		// Resume path: when the node re-enters, check the resume context first.
		if isResume, hasData, data := compose.GetResumeContext[any](ctx); isResume && hasData {
			return map[string]any{
				"user_input": data,
				cpnID:        data,
			}, nil
		}
		// First execution: call Interrupt to pause the graph.
		if err := compose.Interrupt(ctx, inputSpec); err != nil {
			return nil, err
		}
		return nil, errors.New("UserFillUp: interrupt did not halt execution")
	}
}

Runner.Run interrupt catch (internal/agent/canvas/runner.go):

if info, ok := compose.ExtractInterruptInfo(runErr); ok {
	ctxs := info.InterruptContexts // []*compose.InterruptCtx
	if len(ctxs) > 0 {
		d.saveInterruptID(canvasID, sessionID, ctxs[0].ID)
		payload, _ := json.Marshal(WaitingForUserEvent{CpnID: ctxs[0].ID})
		push(out, RunEvent{Type: "waiting_for_user", Data: string(payload)})
		return
	}
}

Passing resume data (buildRunFunc):

if resumeID, ok := root["__resume_interrupt_id__"].(string); ok && resumeID != "" {
	resumeData := root["__resume_data__"]
	delete(root, "__resume_interrupt_id__")
	delete(root, "__resume_data__")
	ctx2 = compose.ResumeWithData(ctx2, resumeID, resumeData)
}

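Cycle handling: the frontend contract guarantees that production canvases are acyclic (hasCanvasCycle blocks saving), and eino's DAG check rejects cyclic graphs automatically at Compile() time, so no extra defense is needed.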
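For reference, the kind of check hasCanvasCycle performs before save can be sketched in Go as a three-color DFS over the components' downstream lists. The function name hasCycle and the map[string][]string input are assumptions for illustration; in production the authoritative guards remain the frontend save check and eino's Compile().

```go
package main

import "fmt"

// hasCycle reports whether the downstream adjacency map contains a cycle,
// using white/gray/black DFS coloring.
func hasCycle(downstream map[string][]string) bool {
	const (
		white = iota // unvisited
		gray         // on the current DFS path
		black        // fully explored
	)
	color := make(map[string]int, len(downstream))
	var visit func(n string) bool
	visit = func(n string) bool {
		color[n] = gray
		for _, next := range downstream[n] {
			switch color[next] {
			case gray:
				return true // back edge: next is an ancestor on the current path
			case white:
				if visit(next) {
					return true
				}
			}
		}
		color[n] = black
		return false
	}
	for n := range downstream {
		if color[n] == white && visit(n) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(hasCycle(map[string][]string{"begin": {"llm"}, "llm": {"message"}})) // false
}
```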

3.4 Real Compile/Invoke in the production chain

// internal/service/agent.go — buildRunFunc

func (s *AgentService) buildRunFunc(canvasID string, versionRow *entity.UserCanvasVersion, dsl map[string]any) canvas.RunFunc {
	return func(ctx context.Context, root map[string]any) (*canvas.CanvasState, error) {
		if err := ctx.Err(); err != nil {
			return nil, err
		}

		taskID := ""
		if versionRow != nil {
			taskID = versionRow.ID
		}

		c, err := decodeCanvasFromDSL(dsl)
		if err != nil {
			return nil, err
		}

		runID := canvasID
		if sessionID, ok := root["session_id"].(string); ok && sessionID != "" {
			runID = runID + "-" + sessionID
		}
		state := canvas.NewCanvasState(runID, taskID)

		userInput, _ := root["user_input"].(string)
		state.Sys["query"] = userInput
		ctx2 := runtime.WithState(ctx, state)

		if resumeID, ok := root["__resume_interrupt_id__"].(string); ok && resumeID != "" {
			resumeData := root["__resume_data__"]
			delete(root, "__resume_interrupt_id__")
			delete(root, "__resume_data__")
			ctx2 = compose.ResumeWithData(ctx2, resumeID, resumeData)
		}

		if s.runTracker != nil {
			_ = s.runTracker.Start(ctx2, runID, canvasID, tenantIDFromRoot(root), userInput)
		}

		var cc *canvas.CompiledCanvas
		if s.checkpointStore != nil && s.stateSerializer != nil {
			cc, err = canvas.Compile(ctx2, c,
				canvas.WithCheckPointStore(s.checkpointStore),
				canvas.WithStateSerializer(s.stateSerializer),
			)
		} else {
			cc, err = canvas.Compile(ctx2, c)
		}
		if err != nil {
			s.markRunFailed(ctx2, runID, "compile: "+err.Error())
			return nil, fmt.Errorf("canvas compile: %w: %w", err, ErrAgentStorageError)
		}

		if s.runTracker != nil {
			_ = s.runTracker.AttachCheckpoint(ctx2, runID, runID)
		}

		_, err = cc.Workflow.Invoke(ctx2, map[string]any{"query": userInput})
		if err != nil {
			if canvas.IsInterruptError(err) || canvas.ExtractInterruptContexts(err) != nil {
				s.markRunFailed(ctx2, runID, "interrupt: "+err.Error())
				return state, err
			}
			s.markRunFailed(ctx2, runID, "invoke: "+err.Error())
			return nil, fmt.Errorf("canvas invoke: %w: %w", err, ErrAgentStorageError)
		}

		s.markRunSucceeded(ctx2, runID)
		return state, nil
	}
}

AgentService option injection (internal/service/agent.go):

type AgentService struct {
	// ... existing fields
	checkpointStore canvas.CheckPointStore // nil = in-memory (test path)
	stateSerializer canvas.StateSerializer // nil = eino default
	runTracker      *canvas.RunTracker     // nil = best-effort no-tracking
	runner          *canvas.Runner
}

func NewAgentService() *AgentService {
	return NewAgentServiceWithOptions(nil, nil, nil)
}

func NewAgentServiceWithOptions(
	cp canvas.CheckPointStore,
	ser canvas.StateSerializer,
	rt *canvas.RunTracker,
) *AgentService {
	return &AgentService{...} // field wiring elided
}

Production boot wiring (cmd/server_main.go):

// SetRedisCheckPointStore + CanvasStateSerializer + RunTracker → NewAgentServiceWithOptions
// + configureTTSSynthesizer (audio.SetModelProviderSynthesizer)
// Graceful degradation when Redis is unreachable: fall back to in-memory (nil options)

DSL decoder (internal/service/canvas_decode.go):

decodeCanvasFromDSL accepts two shapes:

  1. IMPORT shape: obj.component_name / obj.params (written directly by the Python v1 DSL)
  2. NormalizeForCanvas output shape: flat name / params (the production path goes through NormalizeForCanvas)

No JSON round-trip is used: walking the map directly is clearer, because the production path has already been flattened by NormalizeForCanvas. Every failure mode wraps ErrAgentStorageError.


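4. Component Library

4.1 5-tier porting strategy

| Tier | Meaning | Acceptance |
|---|---|---|
| T1 | Use existing eino types/interfaces directly; zero code | covered by eino unit tests |
| T2 | Thin wrapper: 1 struct + factory, aligned with Python behavior | parameter mapping across the eino/RAGFlow boundary + 1 e2e |
| T3 | compose.Lambda + StatePre/PostHandler | 1 unit test + 1 e2e |
| T4 | Nested compose.Workflow + getState[CanvasState](ctx) | subgraph unit tests + full e2e |
| T5 | Heavy I/O + third-party libs | unit tests + e2e + fault injection |

Selection rule: evaluate T1 > T2 > T3 > T4 > T5 in order; do not skip to a higher tier while a lower one fits.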

4.2 Component status (19 .go files)

| Component | Python behavior | Tier | Go implementation | Status |
|---|---|---|---|---|
| LLM | LLMBundle single-turn chat + JSON output + cite + stream | T1 | thin EinoChatModel wrapper over internal/entity/models/<provider>.go; implements model.ToolCallingChatModel; retryInvoker.Unwrap() + unwrapChatInvoker implement normal-absolute-count retry semantics | |
| Agent | ReAct + tool/MCP + multi-turn stream | T1 | react.NewAgent + compose.ToolsNodeConfig{Tools: tools} + all 22 tools registered; citation middleware + tool artifact collection implemented; Reset() via an interface{ Reset() } type assertion | |
| Switch | multiple conditions (and/or) → multiple downstreams + ELSE | T2 | compose.NewGraphMultiBranch routing; 12 of 12 operators (==/!=/contains/not contains/start with/end with/empty/not empty/>/</>=/<=) + case-insensitive string ops | |
| Categorize | LLM classification + routing | T3 | Lambda calling the LLM + compose.NewGraphMultiBranch | |
| Begin | DSL entry + injected inputs + file inputs | T3 | Lambda + StatePreHandler; files go through internal/service/file_service.go | |
| UserFillUp / Fillup | Jinja2 + file inputs + wait-for-user interrupt | T3 | text/template in place of Jinja2 + eino interrupt via interrupt_resume.go | |
| Message | final output (jinja2 + stream + downloads + filegen + TTS + memory) | T3 | Lambda + schema.StreamReader + text/template + MinIO + TTS dispatch + MemorySaver | 🟡 real TTS binary + MemorySaver completion deferred |
| Invoke | HTTP client + HTML cleanup + JSON | T3 | net/http + golang.org/x/net/html | |
| Browser | LLM + HTTP + file download + MinIO | T3 | reuses Invoke + LLM + storage | |
| DataOperations | 7 kinds of dict operations | T3 | Lambda + encoding/json + go/ast | |
| ListOperations | 6 kinds of slice operations | T3 | Lambda + slices (Go 1.21+ stdlib) | |
| StringTransform | split/merge + Jinja2 | T3 | Lambda + strings.Split + text/template | |
| VariableAggregator | multiple groups, first non-empty wins | T3 | Lambda + State read | |
| VariableAssigner | 11 operators that modify State in place | T3 | Lambda + State write | |
| Loop | conditional loop + loop_variables initialization + termination evaluation | T4 | compose.NewWorkflow + workflowx.AddLoopNode (loop.go itself becomes a no-op marker; the actual work is taken over by the canvas/loop_subgraph.go macro expansion) | |
| Parallel | parallel array processing | T4 | workflowx.AddParallelNode wrapper | |
| DocsGenerator | pdf/docx/txt/md/html generation | T5 | signintech/gopdf (PDF) + own OOXML writer (DOCX) + yuin/goldmark (MD); render.go provides HTML/Markdown/plain rendering | 🟡 txt/md/html writers partially missing |
| ExcelProcessor | pandas-based Excel read/merge/transform | T5 | xuri/excelize/v2 (BSD-3) | |
| Retrieval (Universe A) | canvas DAG node | T2 | newRetrievalComponent, delegating to the Universe B RetrievalTool | |
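A sketch of how the 12 Switch operators with case-folded string comparison could be evaluated. The function name evalOperator is hypothetical, and three behaviors are assumptions rather than documented semantics: == and != also fold case, empty/not empty ignore surrounding whitespace, and the ordering operators require both sides to parse as float64.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// evalOperator evaluates one Switch condition.
func evalOperator(op, left, right string) (bool, error) {
	l, r := strings.ToLower(left), strings.ToLower(right) // case-folded string ops
	switch op {
	case "==":
		return l == r, nil
	case "!=":
		return l != r, nil
	case "contains":
		return strings.Contains(l, r), nil
	case "not contains":
		return !strings.Contains(l, r), nil
	case "start with":
		return strings.HasPrefix(l, r), nil
	case "end with":
		return strings.HasSuffix(l, r), nil
	case "empty":
		return strings.TrimSpace(left) == "", nil
	case "not empty":
		return strings.TrimSpace(left) != "", nil
	case ">", "<", ">=", "<=":
		a, errA := strconv.ParseFloat(strings.TrimSpace(left), 64)
		b, errB := strconv.ParseFloat(strings.TrimSpace(right), 64)
		if errA != nil || errB != nil {
			return false, fmt.Errorf("operator %q needs numeric operands: %q, %q", op, left, right)
		}
		switch op {
		case ">":
			return a > b, nil
		case "<":
			return a < b, nil
		case ">=":
			return a >= b, nil
		default:
			return a <= b, nil
		}
	}
	return false, fmt.Errorf("unknown operator %q", op)
}

func main() {
	ok, err := evalOperator("contains", "Hello World", "WORLD")
	fmt.Println(ok, err) // true <nil>
}
```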

4.3 Python-side "legacy" that is not ported / reclassifying Iteration, LoopItem, ExitLoop

| Python side | Why it is not ported |
|---|---|
| _feeded_deprecated_params / _deprecated_params / _user_feeded_params three-layer decoration | removed in DSL v2; not introduced into a Go ComponentParamBase |
| ComponentParamBase.validate() + 96 param_validation/*.json files | replaced by Go struct tags + go-playground/validator/v10 |
| ComponentBase.thread_limiter = asyncio.Semaphore(...) | Go errgroup.SetLimit(MAX_CONCURRENT_CHATS) (golang.org/x/sync/errgroup, not stdlib) |
| partial streaming hack | native eino schema.StreamReader streaming |
| thread_pool_exec(self._invoke, **kwargs) fake async | plain goroutines throughout in Go |
| set_output("_ERROR", ...) + set_exception_default_value() dual track | a single Go error return + eino OnError callback |
| ExitLoop no-op node | DSL v1 compat is absorbed at the canvas layer via legacyNoOpNames; not registered as a component |
| LoopItem component | its role is taken over by the internal machinery of workflowx.AddLoopNode; not registered as a component (TestLoop_Registered enforces absence) |
| Iteration / IterationItem components | IterationStub + IterationItemStub are registered as compat stubs (DSL round-trip) |

4.4 Two Registry Universes (Universe A vs Universe B)

┌──────────────────────────────────────────────────────────────┐
│ Universe A — Canvas DAG Components │
│ Registry: internal/agent/component/registry.go (auto-init) │
│ Interface: Component { Invoke, Stream, Inputs, Outputs } │
│ Output: map[string]any │
│ Names: PascalCase — Retrieval, TavilySearch, ExeSQL, │
│ Answer, Generate, Begin, LLM, Switch, … │
│ Used by: Canvas DAG nodes (placed on the canvas directly) │
├──────────────────────────────────────────────────────────────┤
│ Universe B — Agent ReAct Tools │
│ Registry: internal/agent/tool/registry.go │
│ Interface: einotool.BaseTool { Info, InvokableRun } │
│ Output: JSON string (envelope) │
│ Names: snake_case — retrieval, tavily, execute_sql, … │
│ Used by: Agent component's tools=["…"] list, called via │
│ eino ReAct loop │
└──────────────────────────────────────────────────────────────┘

Mapping table

| Universe A (PascalCase) | Universe B (snake_case) | Current status |
|---|---|---|
| Retrieval | retrieval / search_my_dateset / search_my_dataset | delegates to the real Universe B tool (nlp + kg dual backend) |
| ExeSQL | execute_sql / exesql | delegates to the real Universe B tool (mysql/pg/mssql/oceanbase/trino) |
| TavilySearch | tavily | delegates to the real Universe B tool |
| Answer | | needs orchestrator-side pause/resume (now implemented via eino interrupt) |
| Generate | | alias of the LLM component |
| SearchMyDataset | | registered as a Retrieval alias (4 spellings: PascalCase + snake_case + Python typo) |

4.5 Unified tool implementation pattern

// internal/agent/tool/registry.go
type Tool interface {
	einotool.InvokableTool // eino contract: Info() / InvokableRun(ctx, args, opts)
}

func BuildAll(names []string, params map[string]map[string]any) ([]einotool.BaseTool, error)
func BuildByName(name string, params map[string]any) (einotool.BaseTool, error)

The 21 tools (aliases are not counted as separate tools): akshare, arxiv, code_exec, crawler, deepl, duckduckgo, email, exesql (=execute_sql), github, google, google_scholar, jin10, pubmed, qweather, retrieval (=search_my_dateset=search_my_dataset), searxng, tavily, tushare, wencai, wikipedia, yahoo_finance.

Retrieval dual registry

  • internal/agent/tool/retrieval_nlp.go: NLPRetrievalAdapter bridges nlp.RetrievalService
  • internal/agent/tool/retrieval_kg.go: KGRetrievalAdapter bridges kg.Retrieval(...) (GraphRAG, use_kg=true)
  • internal/agent/tool/retrieval_service.go: two independent registries, SetRetrievalService / SetKGRetrievalService; when un-wired they return ErrRetrievalServiceMissing / ErrKGRetrievalServiceMissing

MCP tools (internal/agent/tool/mcp.go): MCPToolAdapter.InvokableRun dispatches through mcpclient.CallTool over streamable HTTP.

Common tool patterns: HTTP tools go through http_helper.go (context + retry + exponential backoff); ExeSQL uses stdlib database/sql plus per-database drivers (mysql / pg / mssql / oceanbase / trino); CodeExec uses the 5 providers in internal/agent/sandbox/ (self_managed / aliyun / e2b / local / ssh), bridged by tool/sandbox_bridge.go; Retrieval runs in-process via internal/service/nlp/retrieval.go (the Dealer backend has been ported to Go).

4.6 Component & Tool Inventory

Parity legend: ✅ implemented & tested · 🟡 scaffolded (loud-fail sentinel) · ⚠️ implemented with a known gap vs Python.

Universe A — Canvas DAG components (26)

| Name | Source | Status |
|---|---|---|
| Agent | internal/agent/component/agent.go | |
| Begin | internal/agent/component/begin.go | |
| Browser | internal/agent/component/browser.go | |
| Categorize | internal/agent/component/categorize.go | |
| DataOperations | internal/agent/component/data_operations.go | |
| DocsGenerator | internal/agent/component/docs_generator.go | |
| ExcelProcessor | internal/agent/component/excel_processor.go | |
| ExeSQL | internal/agent/component/universe_a_wrappers.go | ⚠️ Wrapper exists; registry primary still stub |
| Fillup | internal/agent/component/fillup.go | |
| Generate | internal/agent/component/fixture_stubs.go | ✅ Legacy alias for DSL round-trip |
| Invoke | internal/agent/component/invoke.go | |
| Iteration | internal/agent/component/fixture_stubs.go | ✅ Legacy alias; compat stub |
| IterationItem | internal/agent/component/fixture_stubs.go | ✅ Legacy alias; compat stub |
| ListOperations | internal/agent/component/list_operations.go | |
| LLM | internal/agent/component/llm.go | |
| Loop | internal/agent/component/loop.go | ✅ Engine-level macro (LoopItem/ExitLoop deliberately not registered) |
| Message | internal/agent/component/message.go | 🟡 TTS real engine + MemorySaver completion still deferred |
| Parallel | internal/agent/component/parallel.go | |
| Retrieval | internal/agent/component/universe_a_wrappers.go | ⚠️ Wrapper exists; registry primary still stub (also covers SearchMyDataset alias) |
| StringTransform | internal/agent/component/string_transform.go | |
| Switch | internal/agent/component/switch.go | ✅ All 12 operators with case-folded string ops |
| TavilySearch | internal/agent/component/universe_a_wrappers.go | ⚠️ Wrapper exists; registry primary still stub |
| UserFillUp | internal/agent/component/userfillup.go | |
| VariableAggregator | internal/agent/component/variable_aggregator.go | |
| VariableAssigner | internal/agent/component/variable_assigner.go | |
| Answer | internal/agent/component/fixture_stubs.go | 🟡 Compat stub; canvas pause/resume is real but the Answer node is still a placeholder |

Stub vs wrapper: Retrieval / TavilySearch / ExeSQL have real delegation wrappers in universe_a_wrappers.go; the registry still maps them to stubs in fixture_stubs.go. Tracked in §14.

Universe B — eino ReAct tools (25 = 22 standalone + 3 aliases)

| Name | Source | Status |
|---|---|---|
| akshare | internal/agent/tool/akshare.go | |
| arxiv | internal/agent/tool/arxiv.go | |
| code_exec | internal/agent/tool/code_exec.go + code_exec_client.go | ✅ All 5 sandbox providers |
| crawler | internal/agent/tool/crawler.go | |
| deepl | internal/agent/tool/deepl.go | |
| duckduckgo | internal/agent/tool/duckduckgo.go | |
| email | internal/agent/tool/email.go | |
| execute_sql | internal/agent/tool/exesql.go | ⚠️ SELECT-only; rejects Trino/DB2 (ErrExeSQLUnsupportedDB) |
| exesql | internal/agent/tool/exesql.go | ⚠️ Alias of execute_sql |
| github | internal/agent/tool/github.go | |
| google | internal/agent/tool/google.go | |
| google_scholar | internal/agent/tool/google_scholar.go | |
| jin10 | internal/agent/tool/jin10.go | |
| mcp | internal/agent/tool/mcp.go | 🟡 MCPToolAdapter wraps mcpclient.Tool; InvokableRun returns "not yet implemented" until mcpclient.CallTools lands |
| pubmed | internal/agent/tool/pubmed.go | |
| qweather | internal/agent/tool/qweather.go | |
| retrieval | internal/agent/tool/retrieval.go | ✅ Adapter + boot wiring (cmd/server_main.go) |
| search_my_dataset | internal/agent/tool/registry.go | ✅ Alias of retrieval |
| search_my_dateset | internal/agent/tool/registry.go | ✅ Python-typo alias of retrieval |
| searxng | internal/agent/tool/searxng.go | |
| tavily | internal/agent/tool/tavily.go | |
| tushare | internal/agent/tool/tushare.go | |
| wencai | internal/agent/tool/wencai.go | |
| wikipedia | internal/agent/tool/wikipedia.go | |
| yahoo_finance | internal/agent/tool/yahoo_finance.go | |

Total: 51 named entities (26 components + 25 tools).


5. Single DSL Shape

The RAGFlow agent DSL now has exactly one wire shape (the former v1/v2 dual track has been removed):

{
  "globals": {...},                                   // sys.query / sys.user_id / ...
  "graph": { "nodes": [...], "edges": [...] },        // React-Flow layout
  "variables": {...},                                 // user-level variables
  "components": { "<Name>:<UUID>": {                  // execution topology
    "downstream": [...], "upstream": [...],
    "obj": { "component_name": "Name", "params": {...} }
  }},
  "path": [...], "retrieval": {...}, "history": [...] // runtime state
}

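Hard guarantees of the single wire shape

  1. Every DSL the backend receives on GET/PUT contains both graph and components. The frontend's use-build-dsl.ts fills both blocks on PUT, and the backend does not depend on graph.
  2. The only Go entry point is dsl.NormalizeForCanvas (internal/handler/agent.go:226, internal/service/agent.go:217,273). Every DSL on a Python ↔ Go path passes through it once at the decoding boundary.
  3. The internal/agent/dsl/ package currently contains only normalize.go + normalize_test.go + testdata/ (the v1↔v2 converters, i.e. v2.go / loader.go / converter_v1_to_v2.go / converter_v2_to_v1.go, have been git rm'd).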

5.1 NormalizeForCanvas: the three-step pipeline at the decoding boundary

internal/agent/dsl/normalize.go: NormalizeForCanvas(dsl map[string]any) map[string]any

  1. enforceHandleIds(dsl): normalizes graph.edges[*].sourceHandle / targetHandle to React-Flow conventions.
  2. buildGraphFromComponents(components): if graph.nodes is missing, derives a default layout from components.
  3. foldLegacyLoopVariants(dsl): folds Loop+LoopItem / Iteration+IterationItem into a single Loop / Parallel node.
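The three-step pipeline can be sketched as plain function composition over map[string]any. All names here are hypothetical stand-ins: only the graph-derivation step is filled in, with an assumed left-to-right layout (sorted component IDs, fixed x spacing), while the handle-ID and loop-folding steps are identity placeholders.

```go
package main

import (
	"fmt"
	"sort"
)

type step func(dsl map[string]any) map[string]any

// normalize applies the steps in the fixed order used at the decoding boundary.
func normalize(dsl map[string]any) map[string]any {
	for _, s := range []step{enforceHandleIDs, buildGraphIfMissing, foldLegacyLoops} {
		dsl = s(dsl)
	}
	return dsl
}

func enforceHandleIDs(dsl map[string]any) map[string]any { return dsl } // placeholder
func foldLegacyLoops(dsl map[string]any) map[string]any  { return dsl } // placeholder

// buildGraphIfMissing derives graph.nodes from components when they are absent.
func buildGraphIfMissing(dsl map[string]any) map[string]any {
	graph, _ := dsl["graph"].(map[string]any)
	if graph == nil {
		graph = map[string]any{}
		dsl["graph"] = graph
	}
	if nodes, ok := graph["nodes"].([]any); ok && len(nodes) > 0 {
		return dsl // keep the user's layout
	}
	comps, _ := dsl["components"].(map[string]any)
	ids := make([]string, 0, len(comps))
	for id := range comps {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	nodes := make([]any, 0, len(ids))
	for i, id := range ids {
		nodes = append(nodes, map[string]any{
			"id":       id,
			"position": map[string]any{"x": float64(i * 250), "y": 0.0}, // assumed layout
		})
	}
	graph["nodes"] = nodes
	return dsl
}

func main() {
	out := normalize(map[string]any{"components": map[string]any{"Begin:1": map[string]any{}}})
	fmt.Println(out["graph"])
}
```

Running every DSL through one ordered function list is what makes "passes through the decoding boundary exactly once" easy to enforce and test.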

5.2 Loop / Iteration folding semantics

  • The Python side keeps the old Loop+LoopItem / Iteration+IterationItem class names (stable server; not touched in this iteration).
  • On the Go side, Loop is already a single node (internal/agent/component/loop.go), and so is Parallel. Iteration / IterationItem survive only as aliases in internal/agent/component/fixture_stubs.go, whose stub bodies delegate to the Parallel factory.
  • The frontend Operator enum keeps Iteration / IterationStart / LoopStart.

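5.3 Compatibility fallback at the Compile entry point

The *Canvas passed to canvas.Compile(ctx, c *Canvas, opts...) is expected to have gone through NormalizeForCanvas already. If a path unmarshals the DSL directly and hands it to Compile without going through the decoder, Compile logs a one-line warning to stderr via log.Printf.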

5.4 Top-level structure of the 7 testdata fixtures

The top level of every internal/agent/dsl/testdata/{all, browser, dfx_picture_parser, questions_category, resume, subaget, switch}.json is {globals, graph, variables} (with complete graph.nodes / graph.edges); there is no top-level components key. This is the shape of import / export files.

5.5 Frontend dsl-bridge: a single import path

web/src/pages/agent/utils/dsl-bridge.ts was rewritten around a single mode:

  • The DSL_MODE / DslMode / if (DSL_MODE === 'v1') / if (DSL_MODE === 'v2') compile-time branches were deleted
  • importDsl(rawParsed, isAgent) has a single precedence rule: if raw.graph.nodes is non-empty, use it; otherwise fall back to an empty seed
  • dslToGraph(dsl) likewise reads only dsl.graph.nodes

6. workflowx Extensions

internal/agent/workflowx/ provides zero-intrusion eino extensions: it does not modify eino's source and only offers external helpers.

6.1 AddLoopNode[T]: generic loop node

API

func AddLoopNode[T any](
	ctx context.Context,
	wf *compose.Workflow[T, T],
	key string,
	sub *compose.Workflow[T, T],
	shouldQuit LoopCondition[T],
	opts ...LoopOption,
) (*compose.WorkflowNode, error)

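Execution model (do-while semantics):

  1. Receive current.
  2. Run the sub-workflow once to get next.
  3. Evaluate shouldQuit(ctx, iteration, current, next); iteration starts at 1.
  4. If the quit condition holds, return next; otherwise set current = next and continue.
  5. The body always executes at least once.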

Implementation notes

  • compose.AnyLambda[T, T, struct{}](...) wraps both the invoke and stream paths
  • WithLoopMaxIterations(n) is strongly recommended (guards against accidental infinite loops)
  • WithLoopStream(mode): LoopStreamFinalOnly (default) / LoopStreamEveryIteration
  • Error handling: ErrLoopMaxIterationsExceeded / ErrLoopSubGraphInterrupted / ErrLoopResumeStateInvalid / ErrLoopQuitConditionFailed
  • The nested sub-workflow runs as a compose.Runnable[T,T]; its sub-checkpoints go through a loop-owned bridge store

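Checkpoint/Resume contract

  • Invoke path: a nested interrupt propagates upward via compose.CompositeInterrupt; resume continues from the interrupted iteration instead of starting over
  • Stream path: follows an iteration-granular recovery contract; iterations already fully emitted downstream are not replayed
  • Stable child checkpoint IDs come from WithLoopCheckpointIDBuilder(nodeKey, iteration); the default namespace is workflowx-loop:<nodeKey>:<iteration>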

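How Loop is used in the canvas

  • On the Go side Loop is a single node: it has a registry entry and a factory, but LoopComponent.Invoke is a no-op
  • When BuildWorkflow encounters a component named Loop, it calls expandLoopSubgraph to collect the downstream nodes, builds a sub compose.Workflow[map[string]any, map[string]any], and calls workflowx.AddLoopNode to insert the result into the outer graph as a single node
  • LoopItem / ExitLoop have been removed (v1 compat is absorbed at the canvas layer via legacyNoOpNames)

6.2 AddParallelNode[I, O]: generic concurrency node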

API

func AddParallelNode[I, O any](
	ctx context.Context,
	wf *compose.Workflow[[]I, []O],
	key string,
	sub Compilable[I, O],
	opts ...ParallelOption,
) (*compose.WorkflowNode, error)

Implementation notes

  • The outer node is invoke-only; the inner sub-workflow may be stream-capable
  • WithParallelMaxConcurrency(n int): 0 or 1 = sequential execution; > 1 = semaphore-bounded concurrency
  • Order-preservation invariant: outputs[i] always corresponds to inputs[i]
  • Error handling: ErrParallelCompileFailed / ErrParallelResumeStateInvalid; per-item errors are wrapped with fmt.Errorf("item %d: %w", idx, err)
  • Nested interrupts: accumulated into compose.CompositeInterrupt(ctx, nil, state, interruptErrs...)
  • Resume invariant: CompletedResults ∪ InterruptedIndices = 0..TotalCount-1 (a complete partition)

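How Parallel is used in the canvas

  • The Parallel component is a T4 thin wrapper: registration passes agenttool.BuildByName("parallel", params) (in practice the ParallelComponent in internal/agent/component/parallel.go), which internally uses workflowx.AddParallelNode to insert the sub-workflow into the outer graph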

6.3 Canvas parallel batch (eino intrinsic, NOT workflowx parallel)

Key finding: Phase 4.1 "Canvas parallel batch execution" needs no extra implementation, because eino's compose.Workflow.Run already spawns one go t.execute() per ready node within each topological wave.

  • canvas/parallel_batch_test.go::TestBuildWorkflow_ParallelBatchStructure pins compilation of a 4-node sibling fan-out
  • canvas/parallel_timing_test.go::TestCanvas_ParallelExecution_StaticAnalysis pins static analysis of a compiled 5-node DAG

workflowx/parallel.go still exists, but it serves only the Parallel component (Loop/Iteration-style array parallelism). It is not the canvas-level ready-node scheduler.


7. Checkpoint + Run Tracker / Persistence

7.1 Two-key design

Key 1: agent:cp:{check_point_id} (eino payload storage)

  • Type: String. The []byte is stored directly, not as JSON, because eino's Serializer already handles serialization
  • TTL: 30 days, set once on Set via EXPIRE 30*24*3600
  • eino's CheckPointStore is a pure KV interface: Get(ctx, id) ([]byte, bool, error) / Set(ctx, id, []byte) error
  • eino does not automatically write business fields such as status / canvas_id / tenant_id / run_id / parent_id / expires_at

Key 2: agent:run:{run_id} (business metadata storage, Redis Hash)

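| Field | Type | Meaning |
|---|---|---|
| canvas_id | string | user_canvas.id |
| tenant_id | string | from the user-tenant lookup |
| checkpoint_id | string | latest checkpoint of the current run (points to key 1) |
| parent_run_id | string | resume_from source run (resume chain); may be empty |
| status | int (0/1/2/3) | 0=running 1=succeeded 2=failed 3=cancelled |
| failure_reason | string | failure reason (err.Error()) |
| cancel_requested | int (0/1) | 1 = cancel requested by user/admin |
| started_at | int (epoch ms) | |
| finished_at | int (epoch ms) | filled in on exit |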
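  • TTL: 30 days (kept in sync with key 1)
  • Written explicitly via RunTracker.Start / MarkSucceeded / MarkFailed / MarkCancelled / AttachCheckpoint
  • No reliance on eino auto-writes: the application layer itself writes the terminal status after a cancel or failure (cancelled / failed)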

7.2 Four eino payload write triggers (writes to agent:cp:*)

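| # | Trigger | eino source | Purpose |
|---|---|---|---|
| W1 | A node explicitly calls compose.Interrupt(ctx, info) / StatefulInterrupt(ctx, info, state) | compose/interrupt.go | human-in-the-loop, external API callbacks, rate-limit pauses |
| W2 | Compile-time intercept points compose.WithInterruptBeforeNodes([]string) / WithInterruptAfterNodes([]string) | compose/interrupt.go | on a hit, persists the checkpoint and terminates the run (shares the handleInterrupt path with W1); none enabled by default |
| W3 | A subgraph interrupt propagates upward | subGraphInterruptError | when a nested subgraph / ToolsNode / agentic node raises an interrupt, the parent graph persists its checkpoint synchronously |
| W4 | Run exit | WithCheckPointID + WithWriteToCheckPointID | final persist when the run exits |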

7.3 Five business-metadata writes + one resume trigger

| # | Trigger | Write function |
|---|---|---|
| B1 | Canvas run starts | RunTracker.Start(runID, canvasID, tenantID, parentRunID) |
| B2 | Run completes normally | RunTracker.MarkSucceeded(runID) |
| B3 | Run fails | RunTracker.MarkFailed(runID, err.Error()) |
| B4 | Run is cancelled | RunTracker.MarkCancelled(runID) |
| B5 | After a successful Compile | RunTracker.AttachCheckpoint(runID, cpID) |
| R1 | HTTP POST /run?resume_from=run_xxx | handler: HGetAll("agent:run:run_xxx") → checkpoint_id → WithCheckPointID(cpID) + WithWriteToCheckPointID(newCP) + RunTracker.Start(newRunID, canvas, tenant, "run_xxx") |

7.4 CheckPointStore / StateSerializer 接口设计

internal/agent/canvas/checkpoint_store.go

type CheckPointStore interface {
	Get(ctx context.Context, id string) ([]byte, bool, error)
	Set(ctx context.Context, id string, data []byte) error
	Delete(ctx context.Context, id string) error // custom extension; eino's compose.CheckPointStore has no such method
}

internal/agent/canvas/state_serializer.go

type StateSerializer interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
}

// CanvasStateSerializer implements StateSerializer with encoding/json.
type CanvasStateSerializer struct{}

func (CanvasStateSerializer) Marshal(v any) ([]byte, error)   { return json.Marshal(v) }
func (CanvasStateSerializer) Unmarshal(b []byte, v any) error { return json.Unmarshal(b, v) }

internal/agent/canvas/compile.go (key fix):

// Note: do NOT override eino's InternalSerializer with compose.WithSerializer!
// eino's compose.Serializer controls both (a) serialization of user-provided
// state AND (b) serialization of eino's internal graph state. Overriding it
// breaks eino's internal graph marshal/unmarshal logic.
//
// Correct approach: wire only WithCheckPointStore (custom KV interface) and
// leave eino's InternalSerializer at its default. CanvasState carries its own
// MarshalJSON so that eino's StatePre/PostHandler can serialize the state.
func Compile(ctx context.Context, c *Canvas, opts ...CompileOption) (*CompiledCanvas, error) {
	cfg := CompileOptions{}
	for _, o := range opts {
		o(&cfg)
	}

	wf, err := BuildWorkflow(ctx, c) // builds the compose.Workflow from the canvas (signature illustrative)
	if err != nil {
		return nil, err
	}

	compileOpts := []compose.GraphCompileOption{
		compose.WithCheckPointStore(checkPointAdapter{cfg.Store}), // adapter hides Delete
	}
	// Deliberately NOT calling compose.WithSerializer.
	r, err := wf.Compile(ctx, compileOpts...)
	if err != nil {
		return nil, err
	}
	return &CompiledCanvas{Workflow: r}, nil
}

// checkPointAdapter drops the Delete method that compose.CheckPointStore does not declare.
type checkPointAdapter struct{ inner CheckPointStore }

func (a checkPointAdapter) Get(ctx context.Context, id string) ([]byte, bool, error) {
	return a.inner.Get(ctx, id)
}

func (a checkPointAdapter) Set(ctx context.Context, id string, data []byte) error {
	return a.inner.Set(ctx, id, data)
}

CompiledCanvas struct

type CompiledCanvas struct {
	Workflow     compose.Runnable[map[string]any, map[string]any] // I/O type parameters illustrative
	CheckPointID string                                           // empty for now; V2.1 will surface it from the eino Runnable
}

7.5 Cancel protocol (two-phase)

// internal/agent/canvas/cancel.go
func Run[I, O any](ctx context.Context, taskID string, compiled compose.Runnable[I, O], input I) (O, error) {
	einoCtx, interrupt := compose.WithGraphInterrupt(ctx)
	stopCh := make(chan struct{})
	defer close(stopCh) // stops the watcher goroutine when the run returns

	go watchCancel(taskID, stopCh, func() {
		// Interrupt the running graph, passing a 30s timeout.
		interrupt(compose.WithGraphInterruptTimeout(30 * time.Second))
	})

	return compiled.Invoke(einoCtx, input,
		compose.WithCheckPointID(genID(taskID)),
		compose.WithWriteToCheckPointID(genID(taskID)),
	)
}

func watchCancel(taskID string, stopCh <-chan struct{}, onCancel func()) {
	ticker := time.NewTicker(500 * time.Millisecond) // 500ms polling
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
			// redis.Get stands for the application's Redis helper (value, error).
			v, _ := redis.Get(context.Background(), fmt.Sprintf("%s-cancel", taskID))
			if v != "" {
				onCancel()
				return
			}
		}
	}
}

Python compatibility: the {task_id}-cancel Redis key naming matches the protocol in Python's task_service.py exactly.


8. OpenTelemetry Observability

8.1 Overall design

Canvas run goroutine (Go)
  ↓
eino Graph Engine
  ↓ (OnStart / OnEnd / OnError auto-injected)
callbacks.Handler (application-implemented)
  ├─ OTelHandler
  │    ├─ start span → inject attributes → end span
  │    └─ OTLP HTTP exporter → OTel Collector (external)
  │         ├─ Jaeger / Tempo (trace UI)
  │         ├─ Langfuse (LLM-specific)
  │         └─ Prometheus / Grafana
  └─ SSEHandler (business event stream) → admin UI

8.2 Two separate channels

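| Channel | Purpose | Protocol | Consumer |
|---|---|---|---|
| SSE | business events ("node start/end/message") | text/event-stream over HTTP | admin UI |
| OTel span | system observability (node latency/errors/tokens) | OTLP HTTP | ops / APM |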

8.3 eino callback → OTel mapping

| eino hook | OTel action | Span attributes |
|---|---|---|
| OnStart(ctx, info, input) | tracer.Start(ctx, info.Name) → span stored in the returned ctx | eino.component.name, eino.component.type, eino.input.size |
| OnEnd(ctx, info, output) | span.End() | eino.output.size |
| OnError(ctx, info, err) | span.RecordError(err) + span.SetStatus(codes.Error, ...) | eino.error.message |

8.4 Startup configuration

export OTEL_EXPORTER_OTLP_ENDPOINT="http://otel-collector:4318"
export OTEL_SERVICE_NAME="ragflow-agent"
export OTEL_RESOURCE_ATTRIBUTES="service.namespace=ragflow,deployment.environment=production"
export OTEL_TRACES_SAMPLER="parentbased_traceidratio"
export OTEL_TRACES_SAMPLER_ARG="0.1" # 10% sampling

Degradation: if OTEL_EXPORTER_OTLP_ENDPOINT is not set, the handler degrades to a noop and business logic is unaffected.
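The noop decision can be sketched as a pure function of the environment. In practice the OTel Go SDK and exporter may read some of these variables themselves, so this hypothetical loadOTelConfig only makes the enable/noop rule explicit. The fallback ratio of 1.0 for a missing or invalid OTEL_TRACES_SAMPLER_ARG is an assumption. It matches the spec's default for ratio samplers.

```go
package main

import (
	"strconv"
	"strings"
)

// otelConfig (hypothetical) summarizes the startup decision.
type otelConfig struct {
	Enabled     bool
	Endpoint    string
	SampleRatio float64
}

// loadOTelConfig enables tracing only when an OTLP endpoint is configured;
// otherwise it returns the zero value, meaning "install a noop handler".
func loadOTelConfig(getenv func(string) string) otelConfig {
	endpoint := strings.TrimSpace(getenv("OTEL_EXPORTER_OTLP_ENDPOINT"))
	if endpoint == "" {
		return otelConfig{} // noop: no exporter, no spans, business path unaffected
	}
	ratio := 1.0 // assumption: sample everything when no valid ratio is given
	if r, err := strconv.ParseFloat(getenv("OTEL_TRACES_SAMPLER_ARG"), 64); err == nil && r >= 0 && r <= 1 {
		ratio = r
	}
	return otelConfig{Enabled: true, Endpoint: endpoint, SampleRatio: ratio}
}
```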


9. Multi-version Agents

The Go side supports multiple coexisting versions; published versions are never overwritten:

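| Scenario | Behavior |
|---|---|
| Editor saves a draft | UPDATE user_canvas SET dsl=? WHERE id=? (no version is created) |
| User clicks "Publish" | INSERT a new user_canvas_version(...) row; UPDATE user_canvas SET release=true, dsl=?, update_at=NOW() |
| Run without version | fetch the latest user_canvas_version (create_time DESC LIMIT 1) |
| Run ?version=v_xxx | fetch the specified user_canvas_version |
| Run ?version=draft | fetch user_canvas.dsl (the editor's unpublished state) |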

API endpoints

  • GET /api/v1/agents/{id}/versions: list versions
  • POST /api/v1/agents/{id}/versions: publish explicitly
  • DELETE /api/v1/agents/{id}/versions/{version_id}: delete a version
  • POST /api/v1/agents/{id}/run?version=xxx: run a specific version

10. Third-party Libraries (License Gate)

10.1 Decisions

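| Purpose | Library | License | Notes |
|---|---|---|---|
| PDF generation | signintech/gopdf | MIT | primary choice; built-in TTF font registration + CJK + header/footer |
| DOCX generation | in-house OOXML writer | Go (stdlib) | archive/zip + text/template + //go:embed |
| Excel read/write | xuri/excelize/v2 | BSD-3 | no license risk |
| Markdown parsing | yuin/goldmark | MIT | CommonMark-compliant |
| HTML parsing | golang.org/x/net/html | BSD-3 | quasi-stdlib (golang.org/x) |
| OpenTelemetry SDK | go.opentelemetry.io/otel v1.44.0 | Apache-2.0 | |
| MySQL driver | go-sql-driver/mysql | MPL-2.0 | ExeSQL goes through stdlib database/sql |
| PG driver | lib/pq | MIT | |
| MSSQL driver | denisenkom/go-mssqldb | BSD-3 | |
| Trino driver | trinodb/trino-go-client v0.333.0 | Apache-2.0 | ExeSQL Trino dialect |
| Jinja2 templating | nikolalohinski/gonja v1.5.3 | MIT | Phase 8a: promoted from indirect to direct import |
| Test SQL mock | DATA-DOG/go-sqlmock | MIT | ExeSQL injection tests |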

10.2 Zero tolerance for AGPL-3

RAGFlow is Apache-2.0. AGPL-3's strong copyleft would force the entire RAGFlow Go binary to become AGPL-3, so every AGPL-3 candidate library (unipdf / unioffice / fumiama-go-docx / baliance-gooxml) is excluded.

AGPL-3 pre-screening rules

  • README header contains "AGPL" or "Affero" → reject outright
  • First line of the LICENSE file contains "Affero General Public License" → reject
  • GitHub license badge shows AGPL-3.0 / SSPL-1.0 → reject
  • go-licenses check in CI flags AGPL → the build fails
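The first two rules are mechanical text checks, sketched below. This hypothetical helper compares case-insensitively, because AGPL license files typically begin with the upper-case title "GNU AFFERO GENERAL PUBLIC LICENSE". It inspects the first non-empty line rather than the literal first line. Both choices are assumptions, and the helper complements the CI go-licenses check rather than replacing it.

```go
package main

import (
	"bufio"
	"strings"
)

// prescreenLicense (hypothetical) applies the README-header and LICENSE
// first-line rules. It returns ok=false plus a reason when a rule fires.
func prescreenLicense(readmeHeader, licenseText string) (ok bool, reason string) {
	h := strings.ToLower(readmeHeader)
	if strings.Contains(h, "agpl") || strings.Contains(h, "affero") {
		return false, "README header mentions AGPL/Affero"
	}
	sc := bufio.NewScanner(strings.NewReader(licenseText))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" {
			continue // skip leading blank lines
		}
		if strings.Contains(strings.ToLower(line), "affero general public license") {
			return false, "LICENSE first line is AGPL"
		}
		break // only the first non-empty line is inspected
	}
	return true, ""
}
```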

11. HTTP API

| Method | Path | Handler | Notes |
|---|---|---|---|
| GET | /api/v1/agents | ListAgents | already exists |
| POST | /api/v1/agents | CreateAgent | |
| GET | /api/v1/agents/{id} | GetAgent | |
| PATCH | /api/v1/agents/{id} | UpdateAgent | |
| DELETE | /api/v1/agents/{id} | DeleteAgent | cascades to delete all versions |
| POST | /api/v1/agents/{id}/run | RunAgent | synchronous; ?version=v_xxx, defaults to the latest |
| POST | /api/v1/agents/{id}/stream | StreamAgent | SSE; emits message / waiting_for_user / error / done events |
| POST | /api/v1/agents/{id}/cancel | CancelAgent | writes the Redis cancel key |
| GET | /api/v1/agents/{id}/versions | ListVersions | |
| POST | /api/v1/agents/{id}/versions | PublishVersion | |
| GET | /api/v1/agents/{id}/versions/{vid} | GetVersion | |
| DELETE | /api/v1/agents/{id}/versions/{vid} | DeleteVersion | |
| POST | /api/v1/admin/canvas-runtime/:tenant_id | AdminRuntime | toggles the per-tenant runtime override |

SSE event payloads

event: message
data: {"answer": "...", "reference": [...]}

event: waiting_for_user
data: {"cpn_id": "node:userfillup_1"}

event: error
data: {"error": "..."}

event: done
data: [DONE]

12. Acceptance Criteria

| Category | Criterion |
|---|---|
| Functionality | 19 components × ≥3 unit tests = ≥57 component unit tests |
| Functionality | 21 tools × ≥2 unit tests = ≥42 tool unit tests |
| eino reuse | T1 component (LLM/Agent) regression: eino's own react_test.go / chatmodel_test.go / compose_test.go show no regression |
| Functionality | {{cpn_id@param}}: any node can read any node's output, covered by unit tests |
| Functionality | SSE event sequence matches Python agent_api.py: message / waiting_for_user / error / done |
| wait-for-user | Canvas containing a UserFillUp node → first run pauses at UserFillUp → SSE waiting_for_user → run resumes after the user submits → final message + done events |
| RunAgent e2e | 4 e2e sub-tests: TestRunAgent_RealCanvas_BeginMessage / _CompileFails / _InvokeFails / _WaitForUserResume |
| RunTracker | miniredis-backed e2e pinning the Start → AttachCheckpoint → MarkSucceeded sequence |
| TTS dispatch | model-provider integration wired (audio.NewTTSDispatchFunc) |
| per-class timeout | ExeSQL→3s, TavilySearch→12s, uniform fallback, env override |
| LLM retry | MaxRetries=5 → exactly 6 invoker calls (absolute count) |
| Reliability | Redis cancel protocol: after cancel, nodes stop within 5s (with 500ms polling, p99 ≤ 500ms) |
| Observability | OTel handler P99 overhead < 2% (100 nodes) |
| Checkpoint | Redis: RedisCheckPointStore Get/Set/Delete pass eino integration tests |
| Code quality | 100% godoc comments on public APIs; ≥80% test coverage on internal/agent/canvas |

13. Risks & Mitigations

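| Risk | Severity | Mitigation |
|---|---|---|
| eino State mutex contention under high concurrency | | benchmark at the end of Phase 1; if scheduling overhead exceeds 5%, introduce sharded mutexes |
| 100% v1 DSL compatibility is impossible | | incompatible legacy DSL goes through an "auto-convert + prompt" path |
| External HTTP failures in tools | | reuse the retry logic in http_helper.go |
| Frontend DSL editor only understands v1 | | Phase 5 keeps the ability to write v1 |
| No LLM key in test environments | | all LLM component tests use a mock provider driver |
| LLM retry multiplicative stacking | | retryInvoker.Unwrap() + unwrapChatInvoker make MaxRetries an absolute count |
| CodeExec feature gap vs Python | | 5 sandbox providers already ported; docs/develop/sandbox-python-go-diff.md records per-provider diffs |
| Real TTS binary shape TBD | | routed through 60+ model-provider drivers; the actual binary is determined by the model provider |
| Real MemorySaver port is partial | | partial port; user-deferred |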

14. Future Work

Actionable follow-ups for the next iteration (by priority):

  1. Compile LRU cache: cache compiled artifacts in an LRU keyed by (canvasID, versionID, DSL-hash). Start only if profiling shows Compile dominating the hot path. 1-2 weeks.
  2. Browser Playwright parity: Python browser.py is 29.4K vs 8.9K in Go, a 3.3× gap. Needs a scope decision: full Playwright port vs trimming to core scenarios. 1 week.
  3. ExcelProcessor pandas-fidelity audit: Python side is 15.5K, while Go currently covers the happy path only. 1-day audit + fixes.
  4. Phase 8b real MemorySaver completion: port the full implementation of internal/service/memory_message_service.go. 1-2 weeks, user-deferred.
  5. Phase 5c DB2 support: CGO + github.com/ibmdb/go_ibmdb + native client lib. Start only when e2e demand emerges. 0.5-1 week.
  6. Phase 5d CodeExec full parity: 5 sandbox providers + artifacts/args/timeout/per-language base image are ported; file output collection paths and the GraphRAG adapter remain. 1-2 weeks.
  7. Phase 6 gray rollout + Phase 7 cleanup: per-tenant runtime gray-release switching; mark agent_api.py @deprecated + compatibility proxy shim. 2-4 weeks.
  8. DSL v3: typed expressions (compile-time validation of {{cpn_id@param}}).
  9. eino ecosystem alignment: AddAgenticModelNode replaces the LLM component; AddRetrieverNode replaces the Retrieval component.
  10. GraphRAG component in Go (scheduled as a separate project).
  11. WebSocket streaming support (pending demand).
  12. Checkpoint enhancements: reuse across canvas runs; incremental checkpoints (write only the changed channels).
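The proposed cache key is easy to state in code. The DSL hash is the part that matters: an edited draft saved under the same canvas and version IDs misses the cache instead of returning a stale graph. The generic LRU below is a hypothetical sketch built on container/list. In practice V would be something like *CompiledCanvas.

```go
package main

import (
	"container/list"
	"crypto/sha256"
	"encoding/hex"
	"sync"
)

// compileKey follows the proposed key (canvasID, versionID, DSL-hash).
type compileKey struct{ CanvasID, VersionID, DSLHash string }

func newCompileKey(canvasID, versionID string, dsl []byte) compileKey {
	sum := sha256.Sum256(dsl)
	return compileKey{canvasID, versionID, hex.EncodeToString(sum[:])}
}

// lruCache (hypothetical) is a mutex-guarded LRU; the list front is most recent.
type lruCache[V any] struct {
	mu       sync.Mutex
	capacity int
	ll       *list.List
	items    map[compileKey]*list.Element
}

type lruEntry[V any] struct {
	key compileKey
	val V
}

func newLRU[V any](capacity int) *lruCache[V] {
	return &lruCache[V]{capacity: capacity, ll: list.New(), items: map[compileKey]*list.Element{}}
}

func (c *lruCache[V]) Get(k compileKey) (V, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if e, ok := c.items[k]; ok {
		c.ll.MoveToFront(e)
		return e.Value.(*lruEntry[V]).val, true
	}
	var zero V
	return zero, false
}

func (c *lruCache[V]) Put(k compileKey, v V) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if e, ok := c.items[k]; ok {
		e.Value.(*lruEntry[V]).val = v
		c.ll.MoveToFront(e)
		return
	}
	c.items[k] = c.ll.PushFront(&lruEntry[V]{key: k, val: v})
	if c.ll.Len() > c.capacity { // evict the least recently used entry
		oldest := c.ll.Back()
		c.ll.Remove(oldest)
		delete(c.items, oldest.Value.(*lruEntry[V]).key)
	}
}
```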

Sandbox provider gaps (consolidated from the port diff)

Four of the five Python sandbox providers (self_managed, aliyun, local, ssh) are ported to Go with functional parity. The fifth, e2b, is a strict superset in Go: the Go provider is real, while the Python one is a stub. The admin-panel settings reader lands in ProviderManager.LoadFromSettings (see commit history). The remaining 7 gaps (the Aliyun item below covers two) are either intentional or blocked upstream, and are tracked here:

  • Aliyun Go SDK gaps (v1.1.0) — ⏸️ blocked on upstream aliyun SDK. Two related gaps to revisit when the SDK catches up: (1) TemplateName not sent on CreateCodeInterpreter (operators must pre-create non-default templates via Python or the aliyun console, then reference by name in metadata); (2) execute uses raw HTTP because the SDK has no execute method (the wire format was reverse-engineered from the Python SDK). Swap to the SDK calls when both APIs land. (1-2 days once the SDK releases; no in-house workaround)
  • LocalProvider rlimits not applied — Go os/exec has no portable pre-start hook; rlimits (RLIMIT_AS/CPU/FSIZE/NOFILE) are not enforced. The Go LocalProvider is not a security boundary — for adversarial code, use SelfManagedProvider (executor_manager + gVisor) or AliyunCodeInterpreterProvider (cloud microVM). This matches the Python note that "local" is "for development / trusted environments". (no fix planned — by design)
  • SSHProvider uses SSH exec, not SFTP — avoids the github.com/pkg/sftp dependency. For workloads with many large artifacts, switch to pkg/sftp if profiling shows exec overhead. (1 day, deferred until profiling shows it matters)
  • Windows build of LocalProvidersyscall.Setpgid is POSIX-only. The Go side is //go:build !windows; the Python side runs on Windows via process.kill(). Tracked; not blocking because RAGFlow production is Linux. (1-2 days, deferred)
  • e2b community Go SDK is a single-maintainer portgithub.com/eric642/e2b-go-sdk v0.1.3 (Apache-2.0). Re-evaluate quarterly; fork to github.com/infiniflow/e2b-go-sdk if maintenance lags. (1 day fork if needed)
  • OTel spans on provider ops — providers are log-free; OTel span propagation is on the HTTP client only (via otelhttp.NewTransport). Providers themselves do not emit OTel spans. (1 day)

15. Operations Guide

15.1 Boot wiring

cmd/server_main.go registers the runtime in three layers:

  1. ProviderManager (internal/agent/sandbox/manager.go) — chooses which sandbox provider backs CodeExec. Default self_managed; override via SANDBOX_PROVIDER_TYPE. Falls back to env-driven init when the admin-panel settings table is empty/malformed.
  2. RetrievalService (internal/agent/tool/retrieval_service.go) — nlp.NewRetrievalService(docEngine, docDAO) and kg.NewRetrieval(...) are wired via tool.SetRetrievalService(...) / tool.SetKGRetrievalService(...) at boot. The first backs use_kg=false; the second backs use_kg=true.
  3. AgentService (internal/service/agent.go) — accepts optional Redis-backed CheckPointStore / StateSerializer / RunTracker via NewAgentServiceWithOptions(...). Boot installs these when Redis is up; otherwise the fields stay nil and the service falls back to in-memory mode (transparent to callers).

Any layer that is not wired at boot produces a loud-fail sentinel (see §15.3) — stubs never silently return empty results.

15.2 Feature flags

| Env var | Default | Effect |
|---|---|---|
| SANDBOX_PROVIDER_TYPE | self_managed | one of self_managed / aliyun_codeinterpreter / e2b / local / ssh |
| SANDBOX_EXECUTOR_MANAGER_URL | http://sandbox-executor-manager:9385 | self-managed endpoint |
| SANDBOX_EXECUTOR_MANAGER_TIMEOUT | 30 (s) | self-managed per-call timeout |
| AGENTRUN_* (5 vars) | n/a | aliyun code interpreter |
| E2B_API_KEY / E2B_ACCESS_TOKEN | n/a | e2b (one required) |
| E2B_TEMPLATE | base | e2b sandbox template |
| LOCAL_* (8 vars) | n/a | local subprocess |
| SSH_HOST / SSH_PORT / SSH_USERNAME / SSH_PASSWORD / SSH_PRIVATE_KEY / SSH_PRIVATE_KEY_PATH | n/a | SSH provider |
| COMPONENT_EXEC_TIMEOUT | 600 (s) | canvas-level per-invocation timeout; per-class overrides via an env-derived map (see canvas/timeout.go) |

15.3 Known deferred items (loud-fail sentinels)

| Sentinel | Cause | Fix |
|---|---|---|
| ErrRetrievalServiceMissing | tool.SetRetrievalService(...) not called at boot | wire nlp.NewRetrievalService at boot (default in cmd/server_main.go) |
| ErrKGRetrievalServiceMissing | canvas uses use_kg=true and tool.SetKGRetrievalService(...) was not called | wire kg.NewRetrieval(...) at boot (default in cmd/server_main.go) |
| ErrMemoryServiceMissing | component.SetMemorySaver(...) not called at boot | wire NewMemoryMessageService(...) (default in cmd/server_main.go) |
| ErrEmbedderNotWired | MemorySaver reached but no embedder configured | port the embedding model (see §14) |
| ErrSandboxNotConfigured | SANDBOX_PROVIDER_TYPE set to an unknown value | set it to one of the 5 supported values |
| ErrE2BProviderNotImplemented | SANDBOX_PROVIDER_TYPE=e2b and neither E2B_API_KEY nor E2B_ACCESS_TOKEN set | provide one of the two env vars |
| ErrTTSEngineNotConfigured | Message runs with auto_play=true and no audio.SetSynthesizer(...) | wire a TTS engine at boot (see §14) |
| ErrExeSQLUnsupportedDB | db_type is trino or ibm db2 | add the driver registration (see §14) |

15.4 Canvas migration (Python → Go)

tools/migrate-canvas cross-validates Python's normalize_chunker_dsl against Go's NormalizeForCanvas. The equivalent manual procedure is:

  1. Export canvas JSON from Python: GET /api/v1/canvas/<id>/export.
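  2. Validate the Python normalizer: uv run python -c "import json; from agent.canvas import normalize_chunker_dsl; print(json.dumps(normalize_chunker_dsl(json.load(open('canvas.json')))))". The json import is required, and json.dumps emits JSON rather than a Python repr, so the output can be diffed.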
  3. Validate Go normalizer: go test ./internal/agent/dsl/ -run TestNormalize -v (uses fixtures in internal/agent/dsl/testdata/).
  4. Diff the two normalized forms. If structurally identical, the canvas is Go-portable.
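"Structurally identical" can be checked mechanically once both normalized forms are dumped as JSON. The hypothetical helper below decodes both outputs into generic values and compares them deeply. Key order and whitespace are ignored, but a type change such as a number versus a string counts as a mismatch. It is an illustration only, not part of tools/migrate-canvas.

```go
package main

import (
	"encoding/json"
	"reflect"
)

// sameStructure (hypothetical) reports whether two JSON documents decode to
// deeply equal values.
func sameStructure(pythonOut, goOut []byte) (bool, error) {
	var a, b any
	if err := json.Unmarshal(pythonOut, &a); err != nil {
		return false, err
	}
	if err := json.Unmarshal(goOut, &b); err != nil {
		return false, err
	}
	return reflect.DeepEqual(a, b), nil
}
```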

15.5 Testing

go test -count=1 ./internal/agent/...          # all agent tests
go test -count=1 ./internal/agent/component/   # component tests
go test -count=1 ./internal/agent/tool/        # tool tests + retrieval + sandbox providers
go test -count=1 ./internal/agent/sandbox/     # 5 sandbox providers + manager
go test -count=1 ./internal/agent/canvas/      # canvas engine, parallel, interrupt/resume
go test -count=1 ./internal/agent/runtime/     # state, template, history window

Fixtures: internal/agent/dsl/testdata/ (7 JSONs) drive the e2e suite and match the input corpus Python's normalize_chunker_dsl accepts.


Appendix A · Key Files

| Design point | Key files |
|---|---|
| State pattern | internal/agent/canvas/{state.go, scheduler.go} + internal/agent/runtime/{state.go, context.go, template.go, template_jinja.go} |
| CanvasState MarshalJSON | internal/agent/runtime/state.go |
| runtime extraction | internal/agent/runtime/*.go (8 files) + internal/agent/canvas/state_export.go |
| Loop macro expansion | internal/agent/canvas/loop_subgraph.go + internal/agent/component/loop.go (no-op marker) |
| Parallel | internal/agent/component/parallel.go + internal/agent/workflowx/parallel.go |
| Generic Loop node | internal/agent/workflowx/loop.go + loop_*_test.go |
| Interrupt path | internal/agent/canvas/interrupt_resume.go + internal/agent/canvas/runner.go |
| Checkpoint | internal/agent/canvas/{checkpoint_store.go, run_tracker.go, state_serializer.go, compile.go} |
| Compile adapter | internal/agent/canvas/compile.go (checkPointAdapter) |
| Per-class timeout | internal/agent/canvas/timeout.go + node_body.go |
| Cancel protocol | internal/agent/canvas/cancel.go |
| OTel | internal/observability/otel/{provider.go, handler.go, handler_test.go} |
| DSL normalize | internal/agent/dsl/{normalize.go, normalize_test.go} + testdata/ |
| Tool registry | internal/agent/tool/{registry.go, http_helper.go, ssrf.go, mcp.go, retrieval*.go} |
| Component 5-tier | internal/agent/component/{base.go, registry.go, runtime_wire.go, fixture_stubs.go, universe_a_wrappers.go} + 19 component .go files |
| AgentService V2 | internal/service/agent.go (buildRunFunc) + internal/service/canvas_decode.go + internal/service/agent_run_e2e_test.go |
| Sandbox providers | internal/agent/sandbox/{self_managed.go, aliyun.go, e2b.go, local.go, ssh.go, manager.go} + tool/sandbox_bridge.go |
| TTS dispatch | internal/agent/audio/{tts.go, tts_dispatch.go, model_provider_synthesizer.go} |

Appendix B · Test Coverage

| Test directory | Files | Coverage |
|---|---|---|
| internal/agent/canvas | 19 | canvas_test.go, scheduler_test.go, state_test.go, variable_test.go, state_bench_test.go, state_serializer_test.go, checkpoint_store_test.go, run_tracker_test.go, cancel_test.go, stream_test.go, loop_subgraph_test.go, loop_semantics_test.go, dsl_examples_e2e_test.go, interrupt_resume_test.go, multibranch_test.go, node_body_timeout_test.go, node_body_per_class_timeout_integration_test.go, parallel_batch_test.go, parallel_timing_test.go |
| internal/agent/component | 50+ | per-component _test.go files + verify_p1_test.go + production_chain_fixes_test.go |
| internal/agent/tool | 30+ | per-tool _test.go files + registry_test.go + retrieval_nlp_test.go + retrieval_kg_test.go + exesql_trino_test.go + exesql_unsupported_test.go + http_helper_test.go + ssrf_test.go + mcp_test.go |
| internal/agent/runtime | 4 | metrics_test.go, selector_test.go, state_test.go, template_jinja_test.go |
| internal/agent/workflowx | 8 | loop_test.go, loop_options_test.go, loop_integration_test.go, loop_example_test.go, parallel_test.go, parallel_options_test.go, parallel_integration_test.go, parallel_helpers_test.go |
| internal/agent/dsl | 1 | normalize_test.go |
| internal/agent/audio | 3 | model_provider_synthesizer_test.go, tts_dispatch_test.go, tts_test.go |
| internal/agent/sandbox | 6 | e2b_test.go, local_test.go, manager_test.go, result_protocol_test.go, self_managed_test.go, ssh_test.go |
| internal/observability/otel | 1 | handler_test.go (tracetest.SpanRecorder) |
| internal/service | 8+ | canvas_decode_test.go, agent_run_e2e_test.go, agent_test.go, agent_sessions_test.go, chat_session_test.go, ... |
| internal/handler | 10+ | agent_test.go, agent_wait_for_user_test.go, admin_runtime_test.go, ... |

Appendix C · Deepdoc Service Endpoints (DLA/OCR/TSR)

C.1 Endpoint summary

| Endpoint | URL | Status | Go port need |
|---|---|---|---|
| DLA (Document Layout Analysis) | POST {DEEPDOC_URL}/predict | remote HTTP (via dla_cli.py) | Go client with 3 retries + 18s timeout |
| OCR | no remote endpoint | local ONNX only | none (ErrNotImplemented stub) |
| TSR (Table Structure Recognition) | no remote endpoint | local ONNX only | none (ErrNotImplemented stub) |

Single toggle: DEEPDOC_URL (preferred) or TENSORRT_DLA_SVR (legacy).

C.2 DLA HTTP contract

  • Method: POST {DEEPDOC_URL}/predict
  • Body: multipart/form-data, field name request, raw JPEG bytes
  • Response: {"bboxes": [[left, top, right, bottom, score, type_idx], ...]}
  • Timeout: 18s per request; 3 retries per image
  • Failure sentinel: empty list []

Appendix D · DSL v1 Corner Cases Inventory

D.1 Top-level DSL shape

{
  "components": {
    "<cpn_id>": {
      "obj": {"component_name": "Retrieval", "params": {...}},
      "downstream": ["generate_0"],
      "upstream": ["answer_0"]
    }
  },
  "path": ["begin"],
  "history": [],
  "retrieval": {"chunks": [], "doc_aggs": []},
  "globals": {"sys.query": "", "sys.user_id": "...", "sys.conversation_turns": 0,
              "sys.files": [], "sys.history": [], "sys.date": "..."},
  "variables": {},
  "memory": []
}

D.2 Variable reference syntax

Two regexes:

variable_ref_patt    = r"\{* *\{([a-zA-Z:0-9]+@[A-Za-z0-9_.-]+|sys\.[A-Za-z0-9_.]+|env\.[A-Za-z0-9_.]+)\} *\}*"
iteration_alias_patt = r"\{* *\{(item|index|result)\} *\}*"

Key behaviors the Go port must mirror:

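  • Brace tolerance: {{var}}, {{{var}}} and { {var} } all match, because extra outer braces and spaces between the outer and inner brace are tolerated. Spaces inside the innermost braces are not: per variable_ref_patt, {{ var }} does not match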
  • sys.*/env.*: namespace-only (no @), read from State flat namespace
  • cpn_id@param.nested.path: dot-path traversal with json.loads on strings, dict.get, list[int] index
  • Empty {{...}}: resolves to "", never crashes
  • is_reff: returns True only if cpn_id@param resolves to a known component
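Go's RE2 engine accepts variable_ref_patt unchanged, so the resolution rules above can be ported directly. The sketch below is hypothetical and covers only some of them. It looks up component IDs case-insensitively, decodes string values as JSON before traversing further, indexes into lists by integer, and renders unresolved references as "". The flat outputs/globals maps are simplifications of the real State.

```go
package main

import (
	"encoding/json"
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// Same pattern as Python's variable_ref_patt.
var variableRefPatt = regexp.MustCompile(`\{* *\{([a-zA-Z:0-9]+@[A-Za-z0-9_.-]+|sys\.[A-Za-z0-9_.]+|env\.[A-Za-z0-9_.]+)\} *\}*`)

// resolve (hypothetical) substitutes every reference in tmpl. outputs is keyed
// by lower-cased cpn_id; globals holds the flat sys.* / env.* namespace.
func resolve(tmpl string, outputs map[string]map[string]any, globals map[string]any) string {
	return variableRefPatt.ReplaceAllStringFunc(tmpl, func(m string) string {
		ref := variableRefPatt.FindStringSubmatch(m)[1]
		v, ok := lookup(ref, outputs, globals)
		if !ok || v == nil {
			return "" // unresolved references never crash
		}
		return stringify(v)
	})
}

func lookup(ref string, outputs map[string]map[string]any, globals map[string]any) (any, bool) {
	if strings.HasPrefix(ref, "sys.") || strings.HasPrefix(ref, "env.") {
		v, ok := globals[ref]
		return v, ok
	}
	cpnID, path, _ := strings.Cut(ref, "@")
	params, ok := outputs[strings.ToLower(cpnID)] // case-insensitive cpn_id (D.3)
	if !ok {
		return nil, false
	}
	parts := strings.Split(path, ".")
	cur, ok := params[parts[0]]
	for _, p := range parts[1:] {
		if !ok {
			break
		}
		cur, ok = step(cur, p)
	}
	return cur, ok
}

// step descends one path segment: strings are JSON-decoded first (like
// json.loads), maps use key lookup, lists use an integer index.
func step(cur any, key string) (any, bool) {
	if s, isStr := cur.(string); isStr {
		var decoded any
		if err := json.Unmarshal([]byte(s), &decoded); err != nil {
			return nil, false
		}
		cur = decoded
	}
	switch c := cur.(type) {
	case map[string]any:
		v, ok := c[key]
		return v, ok
	case []any:
		i, err := strconv.Atoi(key)
		if err != nil || i < 0 || i >= len(c) {
			return nil, false
		}
		return c[i], true
	}
	return nil, false
}

func stringify(v any) string {
	if s, ok := v.(string); ok {
		return s
	}
	if b, err := json.Marshal(v); err == nil {
		return string(b)
	}
	return fmt.Sprint(v)
}
```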

D.3 Component-name case-insensitivity

All comparisons use .lower(). Stored cpn_ids may be any case. Go port must NOT key component map by case-sensitive cpn_id.

Appendix E · Component & Tool Interface Inventory

E.1 Component inventory (22 → 19 active)

| # | Component | File | component_name | Tier | Key behavior |
|---|---|---|---|---|---|
| 1 | Begin | begin.py | Begin | T3 | Consumes kwargs["inputs"], resolves file inputs via FileService |
| 2 | UserFillUp | fillup.py | UserFillUp | T3 | Renders tips with variable interpolation; eino interrupt |
| 3 | Fillup | (alias) | Fillup | T3 | Thin alias of UserFillUp (disable enable_tips) |
| 4 | Message | message.py | Message | T3 | jinja2 prompt + stream + TTS + filegen + memory save |
| 5 | LLM | llm.py | LLM | T1 | Sync + async paths; chatModel.Generate / Stream; structured JSON output |
| 6 | Categorize | categorize.py | Categorize | T3 | LLM one-shot classification → _next (routing list) + category_name |
| 7 | Switch | switch.py | Switch | T2 | 12 operators; _next = matching downstream(s) |
| 8 | Agent | agent_with_tools.py | Agent | T1 | ReAct loop with LLMBundle + tool binding + citations |
| 9 | Iteration | iteration.py | Iteration | T4 | Compat stub → Parallel (Go) |
| 10 | IterationItem | iterationitem.py | IterationItem | T4 | Compat stub |
| 11 | Loop | loop.py | Loop | T4 | workflowx.AddLoopNode (Go) |
| 12 | LoopItem | loopitem.py | LoopItem | (none) | Engine-handled, not registered |
| 13 | ExitLoop | exit_loop.py | ExitLoop | (none) | legacyNoOpNames (Go) |
| 14 | Invoke | invoke.py | Invoke | T3 | HTTP GET/POST/PUT/PATCH/DELETE + headers/proxy/timeout |
| 15 | Browser | browser.py | Browser | T3 | LLM-driven browsing |
| 16 | DataOperations | data_operations.py | DataOperations | T3 | 7 ops: select_keys/literal_eval/combine/filter/append_or_update/remove/rename |
| 17 | ListOperations | list_operations.py | ListOperations | T3 | 6 ops: nth/head/tail/filter/sort/drop_duplicates |
| 18 | StringTransform | string_transform.py | StringTransform | T3 | split/merge/jinja2 template ops |
| 19 | VariableAggregator | variable_aggregator.py | VariableAggregator | T3 | Returns the first non-empty value in each variable group |
| 20 | VariableAssigner | variable_assigner.py | VariableAssigner | T3 | 11 ops |
| 21 | DocsGenerator | docs_generator.py | DocGenerator | T5 | MD → PDF/DOCX/TXT/MD/HTML |
| 22 | ExcelProcessor | excel_processor.py | ExcelProcessor | T5 | pandas read/write/merge/convert |

E.2 Tool inventory (21)

All tools extend ToolBase, expose get_meta() (OpenAI function-call schema), _invoke/_invoke_async.

| # | Tool | component_name | Behavior |
|---|---|---|---|
| 1 | AkShare | AkShare | Chinese financial data (HTTP) |
| 2 | ArXiv | ArXiv | export.arxiv.org/api/query search |
| 3 | CodeExec | CodeExec | gRPC client to Python sandbox; 5 sandbox providers in internal/agent/sandbox/ |
| 4 | Crawler | Crawler | Generic HTML scraper |
| 5 | DeepL | DeepL | DeepL Translate API (HTTP) |
| 6 | DuckDuckGo | DuckDuckGo | html.duckduckgo.com/html search |
| 7 | Email | Email | SMTP send via smtplib |
| 8 | ExeSQL | ExeSQL | MySQL/PG/MSSQL/Trino/OceanBase via stdlib database/sql |
| 9 | GitHub | GitHub | GitHub REST API search |
| 10 | Google | Google | SerpAPI / Google CSE search |
| 11 | GoogleScholar | GoogleScholar | Scholar via SerpAPI |
| 12 | Jin10 | Jin10 | Chinese financial news feed |
| 13 | PubMed | PubMed | NCBI E-utilities |
| 14 | QWeather | QWeather | HeFeng weather API |
| 15 | Retrieval | Retrieval | nlp.Dealer + kg.Retrieval (Go dual-registry) |
| 16 | SearXNG | SearXNG | Meta-search |
| 17 | TavilySearch | TavilySearch | Tavily search API |
| 18 | TavilyExtract | TavilyExtract | Tavily extract API |
| 19 | TuShare | TuShare | Tushare Chinese financial data |
| 20 | WenCai | WenCai | 同花顺 问财 (iWenCai) stock Q&A |
| 21 | Wikipedia | Wikipedia | Wikipedia REST API |
| 22 | YahooFinance | YahooFinance | Yahoo Finance unofficial API |
| n/a | MCP | (server_id) | MCPToolAdapter over streamable HTTP |

Appendix F · Open Questions (actionable)

| ID | Question | Action | Effort |
|---|---|---|---|
| OQ #1 | Iteration semantic preservation | ✅ Done: engine design | |
| OQ #2 | MCP tool priority | ✅ Done: thin wrapper | |
| OQ #3 | DSL normalization | ✅ Done: Go-side + tools/migrate-canvas built | |
| OQ #4 | History window behavior | ✅ Done: canvas-level session | |
| OQ #5 | Citation injection scope | ✅ Done: LLM + Agent | |
| OQ #6 | Component timeout granularity | ✅ Done: per-class table is a Go enhancement over Python's uniform 600s | |
| OQ #7 | Universe A/B naming asymmetry | ✅ Done: keep dual-naming convention | |
| OQ #8 | GraphRAG scope | ✅ Done: KGRetrievalAdapter wired | |
| OQ #9 | generate legacy alias | ⏸️ Deferred | |
| OQ #10 | Phase 5a vs 5b ordering | ✅ Done: single Retrieval milestone | |
| OQ #11 | Per-component env-driven timeout | ✅ Done: canvas-level uniform 600s | |
| OQ #12 | Embedding model port | ✅ Done: model provider architecture | |
| OQ #13 | Switch operator coverage | ✅ Done: 12/12 | |
| OQ #14 | Universe A SearchMyDataset alias | ✅ Done: 4 spellings | |
| OQ #15 | LLM max_retries / delay_after_error | ✅ Done: retryInvoker.Unwrap() normal-absolute-count | |
| OQ #16 | Phase 4.4 orchestrator side | ✅ Done: Runner.Run catches interrupt | |
| OQ #17 | Phase 5d CodeExec full feature parity | ⏸️ Partial: 5 providers + artifacts/args/timeout/per-language base image done; GraphRAG adapter remains | 1-2 weeks |
| OQ #18 | Phase 8b real TTS engine | ✅ Done: dispatcher routes through 60+ model drivers, no shell-out needed | |
| OQ #19 | Phase 8b real MemorySaver completion | ⏸️ Open | 1-2 weeks |
| OQ #20 | Phase 5c DB2 e2e demand | ⏸️ Open (CGO + native lib) | 0.5-1 week if needed |
| OQ #21 | Compile LRU cache | ⏸️ Open: defer until profiling | 1-2 weeks |
| OQ #22 | Phase 6 component hardening | ⏸️ Open: Browser Playwright parity + ExcelProcessor audit | 1-2 weeks |
| OQ #23 | tools/gen-component-parity script | ✅ Done | |

Last verified: 2026-06-17
