@lquerel lquerel commented Feb 12, 2026

Change Summary

Sorry in advance: this is a fairly large PR, but for good reason. It aims to stabilize our configuration model, as discussed during our SIG meetings.

  • Reworked node identity to use type: NodeUrn and removed the old kind/plugin_urn split.
  • Evolved NodeUrn from a type alias to a concrete parsed type (namespace, id, kind) with zero-cost part access and canonical URN reconstruction.
  • Moved URN normalization/parsing logic into the node_urn module and cleaned up obsolete URN plumbing.
  • Fully removed node-level out_ports wiring from NodeUserConfig.
  • Externalized graph wiring into top-level connections in PipelineConfig.
  • Simplified connection syntax:
    • removed out_port field from connections
    • default source output is implicit (default)
    • multi-output selection stays explicit via from: node["output"]
  • Standardized naming around output ports:
    • config fields use outputs and default_output
    • default output name is default
    • outputs/default_output are optional for single-output nodes
  • Replaced connection fanout schema with policy-oriented schema:
    • policies.dispatch with one_of (default) and broadcast. I believe one_of better reflects the underlying implementation (it was never really a round-robin strategy, since the channel receivers compete with each other for messages).
    • broadcast is currently parsed but rejected for multi-destination edges (reserved for future support)
    • single-destination edges treat dispatch as no-op
  • Refactored PipelineConfigBuilder API for readability in tests:
    • one_of(src, targets) and broadcast(src, targets) for default output
    • one_of_output(src, output, targets) and broadcast_output(...) for explicit output
    • added to(src, dst) and to_output(src, output, dst) aliases
  • Updated engine wiring internals and channel identity labeling to use dispatch policy terminology (one_of/broadcast) consistently.
  • Updated docs and examples to the new model.
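
To make the connection model in the list above more concrete, here is a minimal sketch, not the PR's actual code. The names `one_of`, `broadcast`, `one_of_output`, `broadcast_output`, `to`, `to_output`, and the `node["output"]` selector come from the summary; `PipelineBuilder`, `Connection`, `DispatchPolicy`, `parse_source`, `connect`, `build`, and all method bodies are assumptions made for illustration.

```rust
// Hypothetical, simplified model of the connection schema; the types and
// method bodies are assumptions, only the names mirror the PR summary.

/// Output used when a connection does not select one explicitly.
const DEFAULT_OUTPUT: &str = "default";

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum DispatchPolicy {
    /// Each message is delivered to exactly one destination.
    #[default]
    OneOf,
    /// Each message is delivered to every destination.
    Broadcast,
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct Connection {
    from: String,
    output: String,
    to: Vec<String>,
    dispatch: DispatchPolicy,
}

/// Splits a source selector such as `router["errors"]` into (node, output).
/// A bare node name selects the implicit default output.
fn parse_source(selector: &str) -> Option<(String, String)> {
    let Some((node, rest)) = selector.split_once('[') else {
        if selector.is_empty() {
            return None;
        }
        return Some((selector.to_string(), DEFAULT_OUTPUT.to_string()));
    };
    let output = rest.strip_suffix(']')?.strip_prefix('"')?.strip_suffix('"')?;
    if node.is_empty() || output.is_empty() {
        return None;
    }
    Some((node.to_string(), output.to_string()))
}

#[derive(Debug, Default)]
struct PipelineBuilder {
    connections: Vec<Connection>,
}

impl PipelineBuilder {
    fn connect(mut self, src: &str, output: &str, targets: &[&str], dispatch: DispatchPolicy) -> Self {
        self.connections.push(Connection {
            from: src.to_string(),
            output: output.to_string(),
            to: targets.iter().map(|t| t.to_string()).collect(),
            dispatch,
        });
        self
    }

    fn one_of(self, src: &str, targets: &[&str]) -> Self {
        self.connect(src, DEFAULT_OUTPUT, targets, DispatchPolicy::OneOf)
    }

    fn broadcast(self, src: &str, targets: &[&str]) -> Self {
        self.connect(src, DEFAULT_OUTPUT, targets, DispatchPolicy::Broadcast)
    }

    fn one_of_output(self, src: &str, output: &str, targets: &[&str]) -> Self {
        self.connect(src, output, targets, DispatchPolicy::OneOf)
    }

    fn broadcast_output(self, src: &str, output: &str, targets: &[&str]) -> Self {
        self.connect(src, output, targets, DispatchPolicy::Broadcast)
    }

    /// Alias for a single-destination edge from the default output.
    fn to(self, src: &str, dst: &str) -> Self {
        self.one_of(src, &[dst])
    }

    /// Alias for a single-destination edge from an explicit output.
    fn to_output(self, src: &str, output: &str, dst: &str) -> Self {
        self.one_of_output(src, output, &[dst])
    }

    /// Rejects empty edges and multi-destination broadcast; with a single
    /// destination the dispatch policy makes no difference.
    fn build(self) -> Result<Vec<Connection>, String> {
        for c in &self.connections {
            if c.to.is_empty() {
                return Err(format!("connection from {} has no destination", c.from));
            }
            if c.dispatch == DispatchPolicy::Broadcast && c.to.len() > 1 {
                return Err(format!(
                    "broadcast from {}[{:?}] to {} destinations is not supported yet",
                    c.from, c.output, c.to.len()
                ));
            }
        }
        Ok(self.connections)
    }
}
```

With this sketch, `PipelineBuilder::default().to("receiver", "batch").one_of("batch", &["exp_a", "exp_b"]).build()` succeeds, while a `broadcast` to two targets returns an error, matching the "parsed but rejected" behavior described above.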

To do: update the configuration of our continuous benchmarks.

What issue does this PR close?

How are these changes tested?

All unit tests passed

Are there any user-facing changes?

The structure of the configuration files has changed.

@github-actions github-actions bot added the rust (Pull requests that update Rust code) label Feb 12, 2026
@lquerel lquerel self-assigned this Feb 12, 2026
@lquerel lquerel removed this from OTel-Arrow Feb 12, 2026
@lquerel lquerel added the engine-capability (Internal engine features) label Feb 12, 2026

/// The URN of a node type.
- pub type NodeUrn = Cow<'static, str>;
+ pub use node_urn::NodeUrn;
Contributor Author

NodeUrn is now a concrete type

kind: NodeKind,
}

impl NodeUrn {
Contributor Author

Most of the methods here were present in other files before. I just group them under this new NodeUrn type.
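
As a rough illustration of what a concrete parsed URN type with cheap part access could look like, here is a sketch under stated assumptions. The `urn:<namespace>:<id>:<kind>` layout, the three `NodeKind` variants, the field names, and the `parse` and `as_str` methods are all assumptions for this example and may differ from the real `node_urn` module; `urn:otel:otlp:receiver` is only an illustrative value.

```rust
use std::fmt;

/// Assumed set of node kinds; the real enum may have other variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NodeKind {
    Receiver,
    Processor,
    Exporter,
}

/// Parsed node URN, assumed here to look like `urn:<namespace>:<id>:<kind>`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct NodeUrn {
    /// Canonical string; namespace and id are slices into it.
    raw: String,
    /// Byte offset in `raw` where the namespace ends.
    namespace_end: usize,
    /// Byte offset in `raw` where the id ends.
    id_end: usize,
    kind: NodeKind,
}

const PREFIX: &str = "urn:";

impl NodeUrn {
    fn parse(input: &str) -> Result<Self, String> {
        let rest = input
            .strip_prefix(PREFIX)
            .ok_or_else(|| format!("missing `urn:` prefix: {input}"))?;
        let mut parts = rest.split(':');
        let (Some(ns), Some(id), Some(kind), None) =
            (parts.next(), parts.next(), parts.next(), parts.next())
        else {
            return Err(format!("expected urn:<namespace>:<id>:<kind>, got {input}"));
        };
        if ns.is_empty() || id.is_empty() {
            return Err(format!("empty namespace or id in {input}"));
        }
        let kind = match kind {
            "receiver" => NodeKind::Receiver,
            "processor" => NodeKind::Processor,
            "exporter" => NodeKind::Exporter,
            other => return Err(format!("unknown node kind: {other}")),
        };
        let namespace_end = PREFIX.len() + ns.len();
        let id_end = namespace_end + 1 + id.len();
        Ok(Self { raw: input.to_string(), namespace_end, id_end, kind })
    }

    /// Part accessors only slice the stored string; nothing is re-parsed.
    fn namespace(&self) -> &str {
        &self.raw[PREFIX.len()..self.namespace_end]
    }

    fn id(&self) -> &str {
        &self.raw[self.namespace_end + 1..self.id_end]
    }

    fn kind(&self) -> NodeKind {
        self.kind
    }

    /// Canonical URN string.
    fn as_str(&self) -> &str {
        &self.raw
    }
}

impl fmt::Display for NodeUrn {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
```

Storing offsets next to the canonical string is one way to get allocation-free accessors; keeping three separate strings would also work but would require rebuilding the URN on every display.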

Comment on lines +421 to +423
OneOf,
/// Send each message to every destination.
Broadcast,
Contributor

Looks good.
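
For readers unfamiliar with the distinction, the following sketch contrasts the two dispatch semantics. It is not the engine's implementation: it uses standard-library threads and channels as a stand-in, and `run_one_of` and `run_broadcast` are hypothetical helper names. Under `one_of`, destinations compete for messages on a shared receiver, so each message is consumed exactly once but the split between consumers is not deterministic; under `broadcast`, every destination sees every message.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// `one_of` stand-in: workers share one receiver and compete for messages.
/// Returns how many messages each worker consumed.
fn run_one_of(messages: Vec<u32>, workers: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::channel::<u32>();
    let rx = Arc::new(Mutex::new(rx));

    let handles: Vec<thread::JoinHandle<usize>> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut count = 0usize;
                loop {
                    // The lock guard is dropped at the end of this statement.
                    let next = rx.lock().unwrap().recv();
                    match next {
                        Ok(_) => count += 1,
                        Err(_) => break, // sender dropped and queue drained
                    }
                }
                count
            })
        })
        .collect();

    for m in messages {
        tx.send(m).unwrap();
    }
    drop(tx); // close the channel so the workers terminate

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

/// `broadcast` stand-in: every destination receives a copy of every message.
fn run_broadcast(messages: &[u32], destinations: usize) -> Vec<Vec<u32>> {
    let mut inboxes = vec![Vec::new(); destinations];
    for m in messages {
        for inbox in &mut inboxes {
            inbox.push(*m);
        }
    }
    inboxes
}
```

The per-worker counts returned by `run_one_of` always sum to the number of messages sent, but nothing guarantees an even split, which is why "round robin" would be a misleading name for this behavior.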

Comment on lines 915 to 916
// The fanout processor expects each destination port to map to a single downstream node.
// Multiple targets on the same source port would introduce load-balancing semantics.
Contributor

It looks like the fanout processor is a special case --

Contributor Author

Not great. Need to find a better approach.

Contributor Author

Introducing a WiringContract directly in the node factory. By default, there would be no additional wiring restrictions beyond those already in place for orphan nodes, cycle detection, and similar safeguards. The fanout processor would be able to declare a constraint such as: "pipeline builder, do not accept more than one connection per output".
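
One possible shape for such a contract is sketched below. Only the name `WiringContract` and the "at most one connection per output" constraint come from the comment above; the field `max_destinations_per_output`, the `Edge` struct, and `check_wiring` are hypothetical and exist only to show how a pipeline builder might enforce the constraint.

```rust
use std::collections::HashMap;

/// Hypothetical contract a node factory could declare.
/// The default imposes no extra wiring restriction.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
struct WiringContract {
    /// `None` means any number of destinations per output is accepted.
    max_destinations_per_output: Option<usize>,
}

impl WiringContract {
    /// Constraint a fanout-like processor might declare.
    const fn one_destination_per_output() -> Self {
        Self { max_destinations_per_output: Some(1) }
    }
}

/// One connection: a source node, one of its outputs, and its destinations.
struct Edge<'a> {
    from: &'a str,
    output: &'a str,
    to: Vec<&'a str>,
}

/// Checks every (node, output) pair against the contract of its node type.
/// `contracts` maps a node name to its contract; missing entries fall back
/// to the unrestricted default.
fn check_wiring(contracts: &HashMap<&str, WiringContract>, edges: &[Edge<'_>]) -> Result<(), String> {
    // Count destinations per (node, output) across all edges.
    let mut fan_out: HashMap<(&str, &str), usize> = HashMap::new();
    for e in edges {
        *fan_out.entry((e.from, e.output)).or_insert(0) += e.to.len();
    }

    for ((node, output), count) in &fan_out {
        let contract = contracts.get(*node).copied().unwrap_or_default();
        if let Some(max) = contract.max_destinations_per_output {
            if *count > max {
                return Err(format!(
                    "node `{node}` output `{output}` has {count} destinations, at most {max} allowed"
                ));
            }
        }
    }
    Ok(())
}
```

Keeping the rule in a declarative contract would let the builder stay generic: it would not need a special case for the fanout processor, only a lookup of whatever contract each factory declares.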

Comment on lines +1474 to +1475
nuc.add_output("main_output");
nuc.set_default_output("main_output");
Contributor

This is great, see!
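
To show how `add_output` and `set_default_output` could interact with the implicit `default` output, here is a small sketch. The two method names come from the diff above; the `NodeOutputs` struct, the `resolve_default` method, and its resolution rules are assumptions for illustration, not the behavior of the real `NodeUserConfig`.

```rust
/// Output name assumed when a node declares nothing.
const DEFAULT_OUTPUT: &str = "default";

/// Hypothetical stand-in for the output-related part of a node config.
#[derive(Debug, Default)]
struct NodeOutputs {
    outputs: Vec<String>,
    default_output: Option<String>,
}

impl NodeOutputs {
    /// Declares an output; repeated declarations are ignored.
    fn add_output(&mut self, name: &str) {
        if !self.outputs.iter().any(|o| o == name) {
            self.outputs.push(name.to_string());
        }
    }

    fn set_default_output(&mut self, name: &str) {
        self.default_output = Some(name.to_string());
    }

    /// Resolves the output used by a connection that names no output.
    /// Assumed rules: an explicit default must be a declared output; a node
    /// with no declared outputs uses `default`; a node with exactly one
    /// declared output uses it; otherwise the default must be set.
    fn resolve_default(&self) -> Result<&str, String> {
        match (&self.default_output, self.outputs.as_slice()) {
            (Some(name), outs) if outs.iter().any(|o| o == name) => Ok(name.as_str()),
            (Some(name), _) => Err(format!("default_output `{name}` is not a declared output")),
            (None, []) => Ok(DEFAULT_OUTPUT),
            (None, [only]) => Ok(only.as_str()),
            (None, _) => Err("several outputs declared but no default_output set".to_string()),
        }
    }
}
```

Under these assumed rules, a single-output node needs no `outputs` or `default_output` entry at all, which is consistent with the summary's statement that both fields are optional for single-output nodes.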

codecov bot commented Feb 13, 2026

Codecov Report

❌ Patch coverage is 88.80937% with 172 lines in your changes missing coverage. Please review.
✅ Project coverage is 86.63%. Comparing base (75a2f71) to head (ecaa3ff).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2031      +/-   ##
==========================================
+ Coverage   86.62%   86.63%   +0.01%     
==========================================
  Files         527      528       +1     
  Lines      168879   169481     +602     
==========================================
+ Hits       146283   146823     +540     
- Misses      22062    22124      +62     
  Partials      534      534              

Components             Coverage Δ
otap-dataflow          88.69% <88.80%> (+<0.01%) ⬆️
query_abstraction      80.61% <ø> (ø)
query_engine           90.29% <ø> (ø)
syslog_cef_receivers   ∅ <ø> (∅)
otel-arrow-go          53.50% <ø> (ø)
quiver                 91.84% <ø> (ø)

@lquerel lquerel marked this pull request as ready for review February 13, 2026 02:18
@lquerel lquerel requested a review from a team as a code owner February 13, 2026 02:18
@lquerel lquerel changed the title from "[WIP] Dataflow Engine Pipeline configuration stabilization" to "Dataflow Engine Pipeline configuration stabilization" Feb 13, 2026