TanKnimeCTL

首先介绍一下KnimeTANGO,Knime是一款开源的数据分析和可视化软件,TANGO是一种分布式控制系统,更具体的介绍请查看两者的官网和源码仓库。

TanKnimeCTL 即是Knime节点又是TANGO设备服务,通过该节点可以在TANGO中控制Knime工作流的启停,查询工作流的状态和连接信息等。

项目设计的架构图如下所示:

TanKnimeCTL 节点通过TANGO protocol与控制系统其它服务进行通信,通过Knime节点与Knime工作流进行通信。主要依赖于jtango和knime-sdk的相关库,knime-sdk相关的库主要使用workflowManager、nodeContext、nodeContainer部分暴露的API来操作Knime工作流。

功能概述

TanKnimeCTL 这一设备服务提供了五种Attribute和十五种Command:

版本更新说明

相比于上一个版本,主要增加一个command和修补了一些潜在bug:

  1. 增加bbWriteExt这个命令,写入黑板的同时会将黑板以流变量的形式发出,注意使用该命令会导致控制节点重置并重新执行
  2. 删除某些命令中desc的中文字符
  3. 解决knime view无法正常显示的问题(jacorb配置了java.awt.headless

Attributes

workflowState会展示工作流中的节点状态信息,对于component和metanode会嵌套显示其内部节点信息。json中的节点ID均为其父工作流下的ID而非全局ID。

需要注意的是在构建工作流时,如果对component或者metanode的内部节点有改动,例如增加新内部节点,或者在内部嵌套创建component或者metanode,请及时保存工作流以便节点获取最新状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
[Attribute]
name: workflowState
type: String
desc: 以JSON的形式展示工作流和其中各节点的状态
example:
{
"workflowState" : {
"name" : "RELEASE",
"state" : "CONFIGURED",
"ID" : "4"
},
"nodeState" : {
"11" : {
"name" : "Component",
"state" : "CONFIGURED",
"subNodes" : {
"1" : {
"name" : "Component Input",
"state" : "CONFIGURED",
"message" : "",
"type" : "nativeNode"
},
"2" : {
"name" : "Component Output",
"state" : "CONFIGURED",
"message" : "",
"type" : "nativeNode"
},
"9" : {
"name" : "Histogram (JavaScript)",
"state" : "CONFIGURED",
"message" : "",
"type" : "nativeNode"
},
"10" : {
"name" : "Data Generator",
"state" : "CONFIGURED",
"message" : "",
"type" : "nativeNode"
}
},
"type" : "componentNode",
"message" : ""
},
"3" : {
"name" : "Data Generator",
"state" : "EXECUTED",
"type" : "nativeNode",
"message" : ""
},
"14" : {
"name" : "Metanode",
"state" : "CONFIGURED",
"subNodes" : {
"12" : {
"name" : "Histogram (JavaScript)",
"state" : "CONFIGURED",
"message" : "",
"type" : "nativeNode"
},
"13" : {
"name" : "Data Generator",
"state" : "CONFIGURED",
"message" : "",
"type" : "nativeNode"
}
},
"type" : "metaNode",
"message" : ""
},
"15" : {
"name" : "TangoControlNode",
"state" : "CONFIGURED",
"type" : "nativeNode",
"message" : ""
},
"7" : {
"name" : "Histogram (JavaScript)",
"state" : "CONFIGURED",
"type" : "nativeNode",
"message" : ""
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
[Attribute]
name: workflowTopoGraph
type: String
desc: 以JSON的形式展示工作流的拓扑图(连接关系),标识每个节点的出边
example:
{
"workflowTopoGraph" : {
"11" : {
"ownConnections" : [ ],
"subConnections" : {
"1" : [ ],
"2" : [ ],
"9" : [ ],
"10" : [ {
"destNodeId" : "9",
"sourcePort" : 1,
"destPort" : 1
} ]
}
},
"3" : [ {
"destNodeId" : "7",
"sourcePort" : 1,
"destPort" : 1
} ],
"14" : {
"ownConnections" : [ ],
"subConnections" : {
"12" : [ ],
"13" : [ {
"destNodeId" : "12",
"sourcePort" : 1,
"destPort" : 1
} ]
}
},
"15" : [ ],
"7" : [ ]
}
}
1
2
3
4
[Attribute]
name: workflowId
type: String
desc: 工作流的ID,例如:4
1
2
3
4
[Attribute]
name: workflowName
type: String
desc: 工作流的名字,例如:RELEASE
1
2
3
4
5
6
7
8
9
[Attribute]
name: experimentKey
type: String
desc: 传入参数
example: {
"experiment_no": "20220101080000",
"experiment_stage": "A",
"stage_run_no": 1
}

Commands

1
2
3
4
5
[Command]
name: executeAll
inArgs: null
desc: 执行工作流中所有的节点,已执行或配置错误的节点不会执行
note: 如果当前工作流没有可执行的节点会抛出异常
1
2
3
4
5
[Command]
name: executeUptoHere
inArgs: String (NodeId,例如: "14""14:12"
desc: 执行指定节点和其所有前缀节点
note: 如果节点已经被执行了或者其处于不可执行的状态或者节点不存在会抛出异常
1
2
3
4
5
[Command]
name: resetAll
inArgs: null
desc: 重置所有节点
note: 如果当前工作流没有可重置的节点会抛出异常
1
2
3
4
5
[Command]
name: resetNodeAndSuccessors
inArgs: String (NodeId,例如: "14""14:12"
desc: 重置指定节点和其所有后缀节点
note: 如果节点处于不可重置的状态或者节点不存在会抛出异常
1
2
3
4
5
[Command]
name: stopExecution
inArgs: null
desc: 暂停工作流的执行,没有执行完的节点会变为CONFIGURED状态
note: 如果所有节点均没有在运行中会抛出异常
1
2
3
4
5
[Command]
name: nodeReset
inArgs: String (NodeId,例如: "14""14:12"
desc: 重置指定节点
note: 如果后缀节点还有没有重置的,则会抛出异常
1
2
3
4
5
[Command]
name: nodeStart
inArgs: String (NodeId,例如: "14""14:12"
desc: 启动指定节点
note: 如果前缀节点还有没有执行完成的,则会抛出异常
1
2
3
4
5
6
[Command]
name: nodeState
inArgs: String (NodeId,例如: "14""14:12"
outArgs: short (例如READY状态,则为0)
desc: 获取节点状态
note: 如果节点不存在则会抛出异常
1
2
3
4
5
6
[Command]
name: nodeStatus
inArgs: String (NodeId,例如: "14""14:12"
outArgs: Status, if failure return error string
desc: 当节点存在错误时可以通过该command获取错误信息
note: 如果节点不存在则会抛出异常
1
2
3
4
5
[Command]
name: nodeStop
inArgs: String (NodeId,例如: "14""14:12"
desc: 暂停指定节点的执行
note: 如果节点没有处于运行中或者节点不存在会抛出异常
1
2
3
4
5
[Command]
name: setExperimentKey
inArgs: String[] ([0] experiment_no [1] experiment_stage [2] stage_run_no)
desc: 暂停指定节点的执行
note: 如果节点没有处于运行中或者节点不存在会抛出异常
1
2
3
4
5
[Command]
name: bbRead
inArgs: 黑板的键Key
outArgs: 黑板的值Value
desc: 读取黑板缓存
1
2
3
4
[Command]
name: bbWrite
inArgs: json String
desc: 写黑板缓存
1
2
3
4
5
[Command]
name: nodeTransfer
inArgs: String[] ([0] target node id [1] the json string you want to transfer)
desc: 传递给指定节点流变量
note: 目标节点会被reset
1
2
3
4
[Command]
name: bbWriteExt
inArgs: json String
desc: 写入黑板的同时会将黑板以流变量的形式发给所有后续节点,注意使用该命令会导致控制节点重置并重新执行

工作流状态与TanKnimeCTL的状态映射:

1
2
3
4
5
WorkFlow State         TanKnimeCTL State
IDLE ------> FAULT
CONFIGURED ------> STANDBY
EXECUTED ------> ON
EXECUTING ------> RUNNING

使用说明

1. 向Knime导入TanKnimeCTL节点

  • 解压org.knime.ww.tangocontrolnode.update.7z到任意目录
  • 打开Knime,选择左上角的File->Preferences->Install/Update->Available Software Sites
  • 点击Add…
  • 任意取一个名字,点击Local选择刚才解压的文件夹,随后Apply
  • 点击File-> Install Knime Extensions
  • 选择tanknimectl对应的节点导入并重启knime即可

2. 设备服务注册

在工作流中创建TangoControlNode节点之前需要在TANGO中注册相应的设备服务。以工作流test_wf为例,在Jive中注册时,其三段式为:

1
2
3
Server:TanKnimeCTL/test_wf
Class:TanKnimeCTL
Devices:test_wf/TanKnimeCTL/1

3. 具体使用方式

在为工作流命好名并在Jive中注册相应的TANGO设备服务后,即可开始工作流的编写,拖拽右侧节点仓库中的TangoControlNode到流程图中即可,相应的状态信息和连接信息会自动更新,在重启Knime或者相应工作流时,对应的设备服务会自动启动并读取工作流状态,无需用户进行任何配置操作。