v2.0.0
Loading...
Searching...
No Matches
mna_graph_executor.cpp
Go to the documentation of this file.
1//=============================================================================================================
34
35//=============================================================================================================
36// INCLUDES
37//=============================================================================================================
38
39#include "mna_graph_executor.h"
40#include "mna_graph.h"
41#include "mna_op_registry.h"
42
43#include <QDir>
44#include <QTemporaryFile>
45#ifndef WASMBUILD
46#include <QProcess>
47#endif
48
49//=============================================================================================================
50// USED NAMESPACES
51//=============================================================================================================
52
53using namespace MNALIB;
54
55//=============================================================================================================
56// STATIC INITIALIZATION
57//=============================================================================================================
58
59MnaGraphExecutor::ProgressCallback MnaGraphExecutor::s_progressCallback;
60
61//=============================================================================================================
62// DEFINE MEMBER METHODS
63//=============================================================================================================
64
66 const QVariantMap& graphInputs)
67{
68 Context ctx;
69 ctx.graphInputs = graphInputs;
70
71 // Populate context with graph-level inputs keyed as "graph::portName"
72 for (auto it = graphInputs.constBegin(); it != graphInputs.constEnd(); ++it) {
73 ctx.results.insert(QStringLiteral("graph::") + it.key(), it.value());
74 }
75
76 // Evaluate parameter tree bindings before execution
77 graph.paramTree.evaluate(ctx.results);
78
79 // Apply current parameter tree values to node attributes
80 for (MnaNode& n : graph.nodes()) {
81 for (const QString& path : graph.paramTree.allPaths()) {
82 // Path format: "nodeId/attrKey"
83 int sep = path.indexOf(QLatin1Char('/'));
84 if (sep > 0) {
85 QString nodeId = path.left(sep);
86 QString attrKey = path.mid(sep + 1);
87 if (nodeId == n.id) {
88 n.attributes.insert(attrKey, graph.paramTree.param(path));
89 }
90 }
91 }
92 }
93
94 const QStringList order = graph.topologicalSort();
95 const int total = order.size();
96
97 for (int i = 0; i < total; ++i) {
98 const QString& nodeId = order[i];
99
100 if (s_progressCallback) {
101 s_progressCallback(nodeId, i + 1, total);
102 }
103
104 MnaNode& n = graph.node(nodeId);
105
106 // Gather inputs from upstream results
107 QVariantMap inputs;
108 for (const MnaPort& p : n.inputs) {
109 if (!p.sourceNodeId.isEmpty()) {
110 QString key = p.sourceNodeId + QStringLiteral("::") + p.sourcePortName;
111 inputs.insert(p.name, ctx.results.value(key));
112 }
113 }
114
115 // Execute the node
116 QVariantMap outputs = executeNode(n, inputs);
117
118 // Store outputs in context
119 for (auto it = outputs.constBegin(); it != outputs.constEnd(); ++it) {
120 ctx.results.insert(nodeId + QStringLiteral("::") + it.key(), it.value());
121 }
122
123 n.dirty = false;
124 n.executedAt = QDateTime::currentDateTimeUtc();
125 }
126
127 // Re-evaluate parameter tree after execution (for on_change bindings)
128 graph.paramTree.evaluate(ctx.results);
129
130 return ctx;
131}
132
133//=============================================================================================================
134
136 Context& existing)
137{
138 // Find dirty nodes and all their downstream dependents
139 QStringList dirty = graph.dirtyNodes();
140 QSet<QString> toExecute;
141 for (const QString& nodeId : dirty) {
142 toExecute.insert(nodeId);
143 const QStringList downstream = graph.downstreamNodes(nodeId);
144 for (const QString& d : downstream) {
145 toExecute.insert(d);
146 }
147 }
148
149 // Get topological order, filter to only those that need execution
150 const QStringList fullOrder = graph.topologicalSort();
151 QStringList order;
152 for (const QString& nodeId : fullOrder) {
153 if (toExecute.contains(nodeId)) {
154 order.append(nodeId);
155 }
156 }
157
158 const int total = order.size();
159
160 for (int i = 0; i < total; ++i) {
161 const QString& nodeId = order[i];
162
163 if (s_progressCallback) {
164 s_progressCallback(nodeId, i + 1, total);
165 }
166
167 MnaNode& n = graph.node(nodeId);
168
169 QVariantMap inputs;
170 for (const MnaPort& p : n.inputs) {
171 if (!p.sourceNodeId.isEmpty()) {
172 QString key = p.sourceNodeId + QStringLiteral("::") + p.sourcePortName;
173 inputs.insert(p.name, existing.results.value(key));
174 }
175 }
176
177 QVariantMap outputs = executeNode(n, inputs);
178
179 for (auto it = outputs.constBegin(); it != outputs.constEnd(); ++it) {
180 existing.results.insert(nodeId + QStringLiteral("::") + it.key(), it.value());
181 }
182
183 n.dirty = false;
184 n.executedAt = QDateTime::currentDateTimeUtc();
185 }
186
187 graph.paramTree.evaluate(existing.results);
188
189 return existing;
190}
191
192//=============================================================================================================
193
195 const QVariantMap& inputs)
196{
197 // Script execution — inline code via interpreter
198 if (node.execMode == MnaNodeExecMode::Script) {
199#ifdef WASMBUILD
200 QVariantMap outputs;
201 outputs.insert(QStringLiteral("stderr"), QStringLiteral("Script execution not supported in WebAssembly build (QProcess unavailable)"));
202 outputs.insert(QStringLiteral("exit_code"), -1);
203 return outputs;
204#else
205 const MnaScript& script = node.script;
206
207 // Determine file extension from language
208 QString ext = QStringLiteral(".txt");
209 if (script.language == QLatin1String("python")) ext = QStringLiteral(".py");
210 else if (script.language == QLatin1String("shell")) ext = QStringLiteral(".sh");
211 else if (script.language == QLatin1String("r")) ext = QStringLiteral(".R");
212 else if (script.language == QLatin1String("matlab")) ext = QStringLiteral(".m");
213 else if (script.language == QLatin1String("octave")) ext = QStringLiteral(".m");
214 else if (script.language == QLatin1String("julia")) ext = QStringLiteral(".jl");
215
216 // Substitute {{placeholder}} tokens in the code
217 QString code = script.code;
218 for (auto it = inputs.constBegin(); it != inputs.constEnd(); ++it) {
219 code.replace(QStringLiteral("{{") + it.key() + QStringLiteral("}}"),
220 it.value().toString());
221 }
222 for (auto it = node.attributes.constBegin(); it != node.attributes.constEnd(); ++it) {
223 code.replace(QStringLiteral("{{") + it.key() + QStringLiteral("}}"),
224 it.value().toString());
225 }
226
227 // Write code to temporary file
228 QTemporaryFile tempFile(QDir::tempPath() + QStringLiteral("/mna_script_XXXXXX") + ext);
229 tempFile.setAutoRemove(!script.keepTempFile);
230 if (!tempFile.open()) {
231 QVariantMap outputs;
232 outputs.insert(QStringLiteral("stderr"), QStringLiteral("Failed to create temporary script file"));
233 outputs.insert(QStringLiteral("exit_code"), -1);
234 return outputs;
235 }
236 tempFile.write(code.toUtf8());
237 tempFile.close();
238
239 // Determine interpreter
240 QString interpreter = script.interpreter;
241 if (interpreter.isEmpty()) {
242 if (script.language == QLatin1String("python")) interpreter = QStringLiteral("python3");
243 else if (script.language == QLatin1String("shell")) interpreter = QStringLiteral("/bin/bash");
244 else if (script.language == QLatin1String("r")) interpreter = QStringLiteral("Rscript");
245 else if (script.language == QLatin1String("matlab")) interpreter = QStringLiteral("matlab");
246 else if (script.language == QLatin1String("octave")) interpreter = QStringLiteral("octave");
247 else if (script.language == QLatin1String("julia")) interpreter = QStringLiteral("julia");
248 }
249
250 QStringList args = script.interpreterArgs;
251 args.append(tempFile.fileName());
252
253 QProcess process;
254 process.start(interpreter, args);
255 process.waitForFinished(-1);
256
257 QVariantMap outputs;
258 outputs.insert(QStringLiteral("stdout"), QString::fromUtf8(process.readAllStandardOutput()));
259 outputs.insert(QStringLiteral("stderr"), QString::fromUtf8(process.readAllStandardError()));
260 outputs.insert(QStringLiteral("exit_code"), process.exitCode());
261
262 return outputs;
263#endif
264 }
265
266 // IPC execution
267 if (node.execMode == MnaNodeExecMode::Ipc) {
268#ifdef WASMBUILD
269 QVariantMap outputs;
270 outputs.insert(QStringLiteral("stderr"), QStringLiteral("IPC execution not supported in WebAssembly build (QProcess unavailable)"));
271 outputs.insert(QStringLiteral("exit_code"), -1);
272 return outputs;
273#else
274 QProcess process;
275 if (!node.ipcWorkDir.isEmpty()) {
276 process.setWorkingDirectory(node.ipcWorkDir);
277 }
278
279 // Substitute {{placeholder}} tokens in arguments
280 QStringList resolvedArgs;
281 for (const QString& arg : node.ipcArgs) {
282 QString resolved = arg;
283 for (auto it = inputs.constBegin(); it != inputs.constEnd(); ++it) {
284 resolved.replace(QStringLiteral("{{") + it.key() + QStringLiteral("}}"),
285 it.value().toString());
286 }
287 // Also substitute from attributes
288 for (auto it = node.attributes.constBegin(); it != node.attributes.constEnd(); ++it) {
289 resolved.replace(QStringLiteral("{{") + it.key() + QStringLiteral("}}"),
290 it.value().toString());
291 }
292 resolvedArgs.append(resolved);
293 }
294
295 process.start(node.ipcCommand, resolvedArgs);
296 process.waitForFinished(-1);
297
298 QVariantMap outputs;
299 outputs.insert(QStringLiteral("stdout"), QString::fromUtf8(process.readAllStandardOutput()));
300 outputs.insert(QStringLiteral("stderr"), QString::fromUtf8(process.readAllStandardError()));
301 outputs.insert(QStringLiteral("exit_code"), process.exitCode());
302
303 // Populate outputs from cached results if specified
304 for (const MnaPort& p : node.outputs) {
305 if (!p.cachedResultPath.isEmpty()) {
306 outputs.insert(p.name, p.cachedResultPath);
307 }
308 }
309
310 return outputs;
311#endif
312 }
313
314 // Look up registered op function
315 const MnaOpRegistry& registry = MnaOpRegistry::instance();
316 MnaOpRegistry::OpFunc func = registry.opFunc(node.opType);
317
318 if (func) {
319 return func(inputs, node.attributes);
320 }
321
322 // No implementation registered — return empty
323 return {};
324}
325
326//=============================================================================================================
327
329{
330 s_progressCallback = cb;
331}
332
333//=============================================================================================================
334// Stream-mode execution
335//=============================================================================================================
336
338 PluginFactory factory)
339{
340 StreamContext ctx;
341 ctx.graph = &graph;
342
343 // 1. Validate the graph
344 QStringList errors;
345 if (!graph.validate(&errors)) {
346 qWarning() << "MnaGraphExecutor::startStream - graph validation failed:" << errors;
347 return ctx;
348 }
349
350 // 2. Topological sort
351 ctx.executionOrder = graph.topologicalSort();
352
353 // 3. Apply current parameter tree values to node attributes
354 for (MnaNode& n : graph.nodes()) {
355 for (const QString& path : graph.paramTree.allPaths()) {
356 int sep = path.indexOf(QLatin1Char('/'));
357 if (sep > 0) {
358 QString nodeId = path.left(sep);
359 QString attrKey = path.mid(sep + 1);
360 if (nodeId == n.id) {
361 n.attributes.insert(attrKey, graph.paramTree.param(path));
362 }
363 }
364 }
365 }
366
367 // 4. Instantiate live plugins via factory
368 for (const QString& nodeId : ctx.executionOrder) {
369 const MnaNode& n = graph.node(nodeId);
370 QObject* plugin = factory(n.opType);
371 if (!plugin) {
372 qWarning() << "MnaGraphExecutor::startStream - factory returned nullptr for opType:" << n.opType;
373 // Clean up already-created plugins
374 for (QObject* p : ctx.livePlugins) {
375 delete p;
376 }
377 ctx.livePlugins.clear();
378 return ctx;
379 }
380 ctx.livePlugins.insert(nodeId, plugin);
381 }
382
383 // 5. Wiring: the host application is responsible for connecting
384 // Qt signals/slots between the QObject* instances based on
385 // the port connections encoded in each node's input ports.
386 // The mna library provides the graph topology; the host app
387 // knows the concrete signal/slot signatures.
388
389 ctx.running = true;
390 return ctx;
391}
392
393//=============================================================================================================
394
396{
397 if (!ctx.running) {
398 return;
399 }
400
401 ctx.running = false;
402
403 // Stop in reverse topological order
404 for (int i = ctx.executionOrder.size() - 1; i >= 0; --i) {
405 const QString& nodeId = ctx.executionOrder[i];
406 QObject* plugin = ctx.livePlugins.value(nodeId);
407 if (plugin) {
408 delete plugin;
409 }
410 }
411
412 ctx.livePlugins.clear();
413 ctx.executionOrder.clear();
414 ctx.graph = nullptr;
415}
MnaOpRegistry class declaration — singleton catalog of operation schemas.
MnaGraph class declaration — directed acyclic graph of processing nodes.
MnaGraphExecutor class declaration — executes a computational graph.
MNE Analysis Container Format (mna/mnx).
@ Ipc
Delegates to an external process via inter-process communication.
Definition mna_types.h:126
@ Script
Inline code executed via interpreter (Python, shell, R, …).
Definition mna_types.h:127
MNA computational graph.
Definition mna_graph.h:71
MnaNode & node(const QString &nodeId)
Definition mna_graph.cpp:84
bool validate(QStringList *errors=nullptr) const
QStringList dirtyNodes() const
QStringList downstreamNodes(const QString &nodeId) const
MnaParamTree paramTree
Hierarchical parameter store with formula-driven bindings.
Definition mna_graph.h:98
QList< MnaNode > & nodes()
QStringList topologicalSort() const
static void setProgressCallback(ProgressCallback cb)
static Context executeIncremental(MnaGraph &graph, Context &existing)
static StreamContext startStream(MnaGraph &graph, PluginFactory factory)
static QVariantMap executeNode(const MnaNode &node, const QVariantMap &inputs)
std::function< QObject *(const QString &opType)> PluginFactory
static void stopStream(StreamContext &ctx)
static Context execute(MnaGraph &graph, const QVariantMap &graphInputs)
std::function< void(const QString &nodeId, int current, int total)> ProgressCallback
Progress callback type.
QMap< QString, QVariant > results
nodeId::portName → data (QVariant wrapping domain objects or file paths)
QVariantMap graphInputs
Graph-level inputs (populated before execution).
QStringList executionOrder
Topological order used for startup/shutdown.
bool running
Whether the stream is active.
MnaGraph * graph
The pipeline graph (owned externally).
QMap< QString, QObject * > livePlugins
nodeId → live plugin instance (QObject* avoids scan dependency)
Graph node representing a processing step.
Definition mna_node.h:75
MnaNodeExecMode execMode
Execution mode.
Definition mna_node.h:83
QString ipcCommand
External executable command.
Definition mna_node.h:86
QStringList ipcArgs
Command-line arguments (supports {{placeholder}} tokens).
Definition mna_node.h:87
bool dirty
Whether node needs re-execution.
Definition mna_node.h:100
MnaScript script
Inline source code, interpreter, language.
Definition mna_node.h:92
QList< MnaPort > inputs
Input ports.
Definition mna_node.h:80
QVariantMap attributes
Operation parameters.
Definition mna_node.h:78
QDateTime executedAt
Timestamp of last execution.
Definition mna_node.h:99
QString opType
Operation type (looked up in MnaOpRegistry).
Definition mna_node.h:77
QString ipcWorkDir
Working directory for external process.
Definition mna_node.h:88
QList< MnaPort > outputs
Output ports.
Definition mna_node.h:81
Operation registry for the MNA graph model.
static MnaOpRegistry & instance()
OpFunc opFunc(const QString &opType) const
std::function< QVariantMap(const QVariantMap &inputs, const QVariantMap &attributes)> OpFunc
Operation implementation callback type.
QStringList evaluate(const QMap< QString, QVariant > &results)
QStringList allPaths() const
QVariant param(const QString &path) const
Graph port descriptor.
Definition mna_port.h:68
QString name
Port name (unique within a node).
Definition mna_port.h:69
QString sourcePortName
Which output port on that node?
Definition mna_port.h:75
QString cachedResultPath
Relative path to cached result.
Definition mna_port.h:91
QString sourceNodeId
Which node produces this input? (empty → graph-level input).
Definition mna_port.h:74
Inline code for script-type graph nodes.
Definition mna_script.h:69
bool keepTempFile
true → preserve temp script file after execution (debug aid)
Definition mna_script.h:78
QString code
The inline source code (resolved at save-time if sourceUri is set).
Definition mna_script.h:74
QString interpreter
Definition mna_script.h:71
QString language
"python", "shell", "r", "matlab", "octave", "julia"
Definition mna_script.h:70
QStringList interpreterArgs
Extra args before the script file (e.g. ["-u"] for unbuffered Python).
Definition mna_script.h:73