Getting Started
This guide will walk us through installing Panopticon and running our first data pipeline. By the end, we will understand the basic pattern for building pipelines and be ready to explore more advanced features.
Installation
Add panopticon-core to your project's Cargo.toml:
```toml
[dependencies]
panopticon-core = "0.2"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
anyhow = "1"
```
Panopticon uses Tokio as its async runtime and anyhow for error handling. Both are required dependencies for running pipelines.
Hello Pipeline
Let us build a minimal pipeline that loads a CSV file and prints some basic information about it. This demonstrates the core pattern we will use throughout Panopticon.
```rust
use panopticon_core::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 1. Create a new pipeline
    let mut pipeline = Pipeline::new();

    // 2. Define file loading attributes
    let file_attrs = ObjectBuilder::new()
        .insert(
            "files",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "products")
                    .insert("file", "/path/to/products.csv")
                    .insert("format", "csv")
                    .build_scalar(),
            ]),
        )
        .build_hashmap();

    // 3. Add a namespace and command to the pipeline
    pipeline
        .add_namespace(NamespaceBuilder::new("data"))
        .await?
        .add_command::<FileCommand>("load", &file_attrs)
        .await?;

    // 4. Compile and execute
    let completed = pipeline.compile().await?.execute().await?;

    // 5. Access results
    let results = completed.results(ResultSettings::default()).await?;
    println!("Pipeline completed with {} command(s)", results.len());

    Ok(())
}
```
Let us break down what is happening:
- `Pipeline::new()` creates a draft pipeline ready to accept namespaces and commands.
- `ObjectBuilder` constructs the attribute map that configures our command. Here we define a file to load with its name, path, and format.
- `add_namespace()` creates a logical grouping, and `add_command()` adds a command to that namespace. The command is identified by `namespace.command` (e.g., `data.load`).
- `compile()` validates the pipeline and resolves dependencies. `execute()` runs all commands in the correct order.
- `results()` returns a `ResultStore` containing all command outputs, which we can query by path.
Running Examples
The Panopticon repository includes several examples demonstrating different features. To run them:
```sh
# Clone the repository
git clone https://github.com/dolly-parseton/panopticon.git
cd panopticon

# Run the multi-format loading example
cargo run --example multi_format_load

# Run the aggregation example
cargo run --example aggregate_and_export

# Run the conditional execution example
cargo run --example when_conditional
```
Each example is self-contained and includes comments explaining what it demonstrates.
Available Examples
| Example | Description |
|---|---|
| `multi_format_load` | Loading CSV, JSON, and Parquet files |
| `aggregate_and_export` | Aggregation operations and result export |
| `when_conditional` | Conditional command execution |
| `template_inheritance` | Tera template inheritance patterns |
| `iterate_object_keys` | Iterating over dynamic data |
| `pipeline_reuse` | Reusing pipeline definitions |
| `custom_command` | Building your own commands |
| `command_spec_safety` | Command specification validation |
Project Structure
A typical Panopticon project follows this structure:
```
my-pipeline/
├── Cargo.toml
├── src/
│   └── main.rs        # Pipeline definition and execution
├── templates/         # Tera templates (if using TemplateCommand)
│   └── report.html
└── data/              # Input data files
    ├── input.csv
    └── config.json
```
For larger projects, we recommend organizing pipelines into modules:
```
my-pipeline/
├── Cargo.toml
├── src/
│   ├── main.rs
│   ├── pipelines/
│   │   ├── mod.rs
│   │   ├── etl.rs     # ETL pipeline
│   │   └── reports.rs # Reporting pipeline
│   └── commands/      # Custom commands (if extending)
│       └── mod.rs
└── ...
```
Next Steps
Now that we have a basic pipeline running, we are ready to explore the core concepts:
- Core Concepts - Understand the pipeline state machine, namespaces, and data stores
- Commands Overview - Learn about the built-in commands available
- Working with Data - Master store paths and data access patterns
If you want to build custom commands for your specific use case, see the Extending Panopticon guide.
Core Concepts
Panopticon is built around a small set of interlocking concepts that, once understood, make the library predictable and composable. This section walks through the mental model we use when designing pipelines.
The Big Picture
A Panopticon pipeline is a declarative data processing workflow. Rather than writing imperative code that fetches data, transforms it, and stores results, we describe what we want to happen and let the library figure out the execution details.
```
┌─────────────────────────────────────────────────────────────────┐
│                           Pipeline                              │
│  ┌───────────────────────────────────────────────────────────┐ │
│  │                      Namespaces                           │ │
│  │   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐      │ │
│  │   │   "data"    │  │   "query"   │  │   "stats"   │      │ │
│  │   │   (Once)    │  │   (Once)    │  │ (Iterative) │      │ │
│  │   │ ┌───────┐   │  │ ┌───────┐   │  │ ┌───────┐   │      │ │
│  │   │ │Command│   │  │ │Command│   │  │ │Command│   │      │ │
│  │   │ └───────┘   │  │ └───────┘   │  │ └───────┘   │      │ │
│  │   └─────────────┘  └─────────────┘  └─────────────┘      │ │
│  └───────────────────────────────────────────────────────────┘ │
│                               │                                 │
│                               ▼                                 │
│  ┌───────────────────────────────────────────────────────────┐ │
│  │                      Data Stores                          │ │
│  │     ScalarStore (JSON-like values)                        │ │
│  │     TabularStore (Polars DataFrames)                      │ │
│  └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
Key Concepts
Pipeline State Machine
Pipelines progress through three states: Draft, Ready, and Completed. This state machine prevents common errors like modifying a running pipeline or executing an incomplete one. We use Rust's type system to enforce these transitions at compile time.
Namespaces
Namespaces group related commands and control how they execute. The three namespace types - Once, Iterative, and Static - let us express single operations, loops over data, and constant configuration respectively.
Commands and Attributes
Commands are the units of work in a pipeline. Each command has a type (like FileCommand or SqlCommand), a name, and attributes that configure its behavior. We configure commands using the ObjectBuilder pattern for type-safe attribute construction.
Data Stores
All data flows through two stores: the ScalarStore for JSON-like values (strings, numbers, objects, arrays) and the TabularStore for Polars DataFrames. Commands read from and write to these stores using StorePaths - dot-separated addresses like data.load.users.data.
How They Fit Together
Here is the typical flow of a Panopticon pipeline:
- Build - We create a `Pipeline<Draft>`, add namespaces, and configure commands with attributes
- Compile - We call `.compile()` to validate the pipeline and transition to `Pipeline<Ready>`
- Execute - We call `.execute()` to run all commands, transitioning to `Pipeline<Completed>`
- Collect - We call `.results()` to gather outputs from the data stores
- Iterate (optional) - We call `.edit()` to return to Draft state and add more commands
This lifecycle ensures that pipelines are valid before execution and that results are only accessible after completion. The following sections dive deeper into each concept.
Pipeline State Machine
The Pipeline type in Panopticon uses a compile-time state machine to enforce correct usage. This pattern prevents entire categories of bugs - we cannot accidentally execute an incomplete pipeline, modify one that is running, or access results that do not yet exist.
The Three States
```
┌─────────┐    compile()    ┌─────────┐   execute()    ┌───────────┐
│  Draft  │ ──────────────▶ │  Ready  │ ─────────────▶ │ Completed │
└─────────┘                 └─────────┘                └───────────┘
     ▲                           │                          │
     │ edit()                    │ edit()                   │
     └───────────────────────────┴──────────────────────────┘
```
Draft
A Pipeline<Draft> is under construction. In this state we can:
- Add namespaces with `add_namespace()`
- Add commands to namespaces via the returned `NamespaceHandle`
- Configure services and options
We cannot execute a Draft pipeline. The .execute() method simply does not exist on this type.
Ready
A Pipeline<Ready> has passed validation and is prepared for execution. The transition from Draft to Ready happens via .compile(), which performs several checks:
- Namespace names are unique and not reserved
- Command names are unique within their namespace
- Iterative namespaces have valid store paths
- Command attributes pass schema validation
- The execution plan is valid (no circular dependencies)
From Ready, we can either:
- Call `.execute()` to run the pipeline
- Call `.edit()` to return to Draft state for modifications
Completed
A Pipeline<Completed> has finished executing all commands. The execution context containing all results is stored in the Completed state. From here we can:
- Call `.results()` to collect outputs into a `ResultStore`
- Call `.restart()` to return to Ready state and re-execute
- Call `.edit()` to return to Draft state and add more commands
Why a State Machine?
This design is intentional. Consider what could go wrong without it:
Without state machine:
```rust
// Hypothetical bad API - don't do this
let pipeline = Pipeline::new();
pipeline.add_command(...);
let results = pipeline.execute();      // What if add_command() failed?
pipeline.add_command(...);             // Modifying during execution?
let more_results = pipeline.results(); // Which execution?
```
With state machine:
```rust
// The actual API - compile-time guarantees
let mut pipeline = Pipeline::new();          // Draft
pipeline.add_namespace(...).await?;          // Still Draft
let ready = pipeline.compile().await?;       // Ready - validated!
let completed = ready.execute().await?;      // Completed - all commands ran
let results = completed.results(...).await?; // Results available
let pipeline = completed.edit();             // Back to Draft
```
The type system prevents us from calling methods that do not make sense in the current state. If we try to call .execute() on a Draft pipeline, we get a compile error - not a runtime panic.
Practical Example: Pipeline Reuse
One powerful pattern enabled by the state machine is incremental pipeline building. We can execute a pipeline, inspect results, then add more processing steps:
```rust
use panopticon_core::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // ===== Pass 1: Load data and query =====
    println!("=== Pass 1: Load + Query ===\n");

    let mut pipeline = Pipeline::new();

    // Load users
    let file_attrs = ObjectBuilder::new()
        .insert(
            "files",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "users")
                    .insert("file", "fixtures/users.csv")
                    .insert("format", "csv")
                    .build_scalar(),
            ]),
        )
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("data"))
        .await?
        .add_command::<FileCommand>("load", &file_attrs)
        .await?;

    // Query: all users sorted by age
    let sql_attrs = ObjectBuilder::new()
        .insert(
            "tables",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "users")
                    .insert("source", "data.load.users.data")
                    .build_scalar(),
            ]),
        )
        .insert("query", "SELECT name, age FROM users ORDER BY age DESC")
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("query"))
        .await?
        .add_command::<SqlCommand>("sorted", &sql_attrs)
        .await?;

    // Execute pass 1: Draft -> Ready -> Completed
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;
    println!("  Namespaces in pass 1: data, query");

    // ===== Pass 2: Edit pipeline, add aggregation, re-execute =====
    println!("\n=== Pass 2: Edit + Aggregate ===\n");

    // Return to Draft state - all previous namespaces and commands preserved
    let mut pipeline = completed.edit();

    // Add an aggregation namespace to the existing pipeline
    let agg_attrs = ObjectBuilder::new()
        .insert("source", "data.load.users.data")
        .insert(
            "aggregations",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "user_count")
                    .insert("op", "count")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "avg_age")
                    .insert("column", "age")
                    .insert("op", "mean")
                    .build_scalar(),
            ]),
        )
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("stats"))
        .await?
        .add_command::<AggregateCommand>("users", &agg_attrs)
        .await?;

    // Re-compile and execute
    let completed = pipeline.compile().await?.execute().await?;
    println!("  Namespaces in pass 2: data, query, stats");

    println!("\nPipeline successfully edited and re-executed.");
    Ok(())
}
```
The key insight is that calling .edit() on a Completed pipeline returns us to Draft while preserving all existing namespaces and commands. We can then add more processing steps and re-execute.
State Transitions Summary
| From | To | Method | What Happens |
|---|---|---|---|
| Draft | Ready | .compile() | Validates pipeline configuration |
| Ready | Completed | .execute() | Runs all commands |
| Ready | Draft | .edit() | Returns to editing mode |
| Completed | Draft | .edit() | Returns to editing mode |
| Completed | Ready | .restart() | Clears results, ready to re-execute |
Implementation Details
For those curious about the implementation, Panopticon uses Rust's type system to encode states:
```rust
// Marker types for states
pub struct Draft;
pub struct Ready;
pub struct Completed {
    context: ExecutionContext,
}

// Generic pipeline parameterized by state
pub struct Pipeline<T = Draft> {
    pub(crate) services: PipelineServices,
    pub(crate) namespaces: Vec<Namespace>,
    pub(crate) commands: Vec<CommandSpec>,
    state: T,
}
```
Each state has its own impl Pipeline<State> block defining only the methods valid for that state. The Completed state holds the ExecutionContext containing all results, which is why .results() is only available on Pipeline<Completed>.
This pattern is sometimes called the "typestate" pattern in Rust. It moves invariant checking from runtime to compile time, resulting in APIs that are impossible to misuse.
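The pattern is easy to reproduce outside Panopticon. Here is a minimal, self-contained typestate sketch; the toy `Machine` type and its method names are ours for illustration, not the library's actual types:

```rust
// Marker types standing in for pipeline states (toy example).
struct Draft;
struct Ready;
struct Completed {
    results: Vec<String>,
}

// A machine parameterized by its state; each state exposes only the
// methods that are valid in that state.
struct Machine<S> {
    steps: Vec<String>,
    state: S,
}

impl Machine<Draft> {
    fn new() -> Self {
        Machine { steps: Vec::new(), state: Draft }
    }
    fn add_step(mut self, name: &str) -> Self {
        self.steps.push(name.to_string());
        self
    }
    // Consumes the Draft machine; the old value can no longer be used.
    fn compile(self) -> Machine<Ready> {
        Machine { steps: self.steps, state: Ready }
    }
}

impl Machine<Ready> {
    fn execute(self) -> Machine<Completed> {
        let results = self.steps.iter().map(|s| format!("ran {s}")).collect();
        Machine { steps: self.steps, state: Completed { results } }
    }
}

impl Machine<Completed> {
    // Only a Completed machine has results to hand out.
    fn results(&self) -> &[String] {
        &self.state.results
    }
}

fn main() {
    let machine = Machine::new().add_step("load").add_step("query");
    // machine.execute(); // compile error: no `execute` on Machine<Draft>
    let completed = machine.compile().execute();
    assert_eq!(completed.results().join(", "), "ran load, ran query");
    println!("{:?}", completed.results());
}
```

Because each transition consumes `self`, the previous state's value is moved away and cannot be used again - exactly the guarantee the pipeline relies on.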
Namespaces
Namespaces are the organizational unit in Panopticon. Every command belongs to exactly one namespace, and the namespace type determines how those commands execute.
The Three Namespace Types
```
┌─────────────────────────────────────────────────────────────────┐
│                        Namespace Types                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Once       Execute commands once, in order                     │
│             ─────────────────────────────────────────────       │
│             [cmd1] → [cmd2] → [cmd3]                            │
│                                                                 │
│  Iterative  Execute commands once per item in a collection      │
│             ─────────────────────────────────────────────       │
│             for item in collection:                             │
│                 [cmd1] → [cmd2] → [cmd3]                        │
│                                                                 │
│  Static     No commands, just provides constant values          │
│             ─────────────────────────────────────────────       │
│             { key1: value1, key2: value2 }                      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
Once Namespaces
The Once namespace is the default. Commands in a Once namespace execute exactly once, in the order they were added.
```rust
// Create a Once namespace (the default)
pipeline
    .add_namespace(NamespaceBuilder::new("data"))
    .await?
    .add_command::<FileCommand>("load", &file_attrs)
    .await?;
```
This is the most common namespace type. Use it for:
- Loading data from files or APIs
- Running SQL queries
- Performing one-time transformations
Iterative Namespaces
Iterative namespaces execute their commands once for each item in a collection. The collection can come from:
- An array in the scalar store
- Object keys from a JSON object
- A column in a DataFrame
- A string split by a delimiter
```rust
// Create an Iterative namespace that loops over object keys
let mut handle = pipeline
    .add_namespace(
        NamespaceBuilder::new("classify")
            .iterative()
            .store_path(StorePath::from_segments(["config", "regions"]))
            .scalar_object_keys(None, false)
            .iter_var("region")
            .index_var("idx"),
    )
    .await?;

handle
    .add_command::<ConditionCommand>("check", &condition_attrs)
    .await?;
```
During execution, Panopticon:
- Resolves the collection from the store path
- For each item, sets the iteration variables (`region` and `idx` in this example)
- Executes all commands in the namespace
- Cleans up the iteration variables
Results from iterative commands are indexed. Instead of storing at classify.check.result, we store at classify.check[0].result, classify.check[1].result, etc.
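That indexing scheme amounts to simple path formatting. A standalone sketch (a hypothetical helper for illustration; the real StorePath type handles this internally):

```rust
// Builds the dotted, indexed result paths described above.
fn indexed_result_path(namespace: &str, command: &str, index: usize, field: &str) -> String {
    format!("{namespace}.{command}[{index}].{field}")
}

fn main() {
    assert_eq!(
        indexed_result_path("classify", "check", 0, "result"),
        "classify.check[0].result"
    );
    assert_eq!(
        indexed_result_path("classify", "check", 1, "result"),
        "classify.check[1].result"
    );
    println!("{}", indexed_result_path("classify", "check", 2, "result"));
}
```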
Static Namespaces
Static namespaces contain no commands - they exist purely to provide constant values to the data stores. Think of them as configuration namespaces.
```rust
// Create a Static namespace with configuration values
pipeline
    .add_namespace(
        NamespaceBuilder::new("config")
            .static_ns()
            .insert("api_version", ScalarValue::String("v2".into()))
            .insert(
                "regions",
                ObjectBuilder::new()
                    .insert("us-east", "Virginia")
                    .insert("us-west", "Oregon")
                    .insert("eu-west", "Ireland")
                    .build_scalar(),
            ),
    )
    .await?;
```
Values from static namespaces are available to all subsequent commands via Tera templating:
```rust
// In a later command's attributes
.insert("endpoint", "https://api.example.com/{{ config.api_version }}/data")
```
Iteration Sources
Iterative namespaces support several source types for determining what to iterate over:
ScalarArray
Iterate over elements in a JSON array:
```rust
NamespaceBuilder::new("process")
    .iterative()
    .store_path(StorePath::from_segments(["data", "items"]))
    .scalar_array(None) // None = all items, Some((start, end)) = range
    .iter_var("item")
```
ScalarObjectKeys
Iterate over keys of a JSON object:
```rust
NamespaceBuilder::new("classify")
    .iterative()
    .store_path(StorePath::from_segments(["config", "regions"]))
    .scalar_object_keys(None, false) // None = all keys, Some(vec) = filter
    .iter_var("region")
```
The second parameter controls exclusion - true means "iterate over all keys except those listed".
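The include/exclude behavior can be sketched with plain collections. This `select_keys` helper is illustrative only, not Panopticon code:

```rust
use std::collections::BTreeSet;

// Pick the keys to iterate, given an optional filter list and the
// exclusion flag described above.
fn select_keys(all_keys: &[&str], filter: Option<&[&str]>, exclude: bool) -> Vec<String> {
    match filter {
        // No filter: iterate over every key.
        None => all_keys.iter().map(|k| k.to_string()).collect(),
        Some(list) => {
            let listed: BTreeSet<&str> = list.iter().copied().collect();
            all_keys
                .iter()
                // exclude = false keeps listed keys; exclude = true keeps the rest.
                .filter(|k| listed.contains(*k) != exclude)
                .map(|k| k.to_string())
                .collect()
        }
    }
}

fn main() {
    let keys = ["us-east", "us-west", "eu-west"];
    assert_eq!(select_keys(&keys, None, false), ["us-east", "us-west", "eu-west"]);
    assert_eq!(select_keys(&keys, Some(&["eu-west"][..]), false), ["eu-west"]);
    assert_eq!(select_keys(&keys, Some(&["eu-west"][..]), true), ["us-east", "us-west"]);
    println!("ok");
}
```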
ScalarStringSplit
Iterate over parts of a delimited string:
```rust
NamespaceBuilder::new("tags")
    .iterative()
    .store_path(StorePath::from_segments(["data", "tag_list"]))
    .string_split(",")
    .iter_var("tag")
```
TabularColumn
Iterate over unique values in a DataFrame column:
```rust
NamespaceBuilder::new("by_category")
    .iterative()
    .store_path(StorePath::from_segments(["data", "products", "data"]))
    .tabular_column("category", None)
    .iter_var("category")
```
Complete Example: Object Key Iteration
Here is a full example showing how to iterate over object keys:
```rust
use panopticon_core::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut pipeline = Pipeline::new();

    // --- Static namespace: an object whose keys we will iterate ---
    pipeline
        .add_namespace(
            NamespaceBuilder::new("config").static_ns().insert(
                "regions",
                ObjectBuilder::new()
                    .insert("us-east", "Virginia")
                    .insert("us-west", "Oregon")
                    .insert("eu-west", "Ireland")
                    .build_scalar(),
            ),
        )
        .await?;

    // --- Iterative namespace: loop over each region key ---
    let condition_attrs = ObjectBuilder::new()
        .insert(
            "branches",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "is_us")
                    .insert("if", "region is starting_with(\"us-\")")
                    .insert("then", "Region {{ region }} is in the US")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "is_eu")
                    .insert("if", "region is starting_with(\"eu-\")")
                    .insert("then", "Region {{ region }} is in the EU")
                    .build_scalar(),
            ]),
        )
        .insert("default", "Region {{ region }} is in an unknown area")
        .build_hashmap();

    let mut handle = pipeline
        .add_namespace(
            NamespaceBuilder::new("classify")
                .iterative()
                .store_path(StorePath::from_segments(["config", "regions"]))
                .scalar_object_keys(None, false)
                .iter_var("region")
                .index_var("idx"),
        )
        .await?;

    handle
        .add_command::<ConditionCommand>("region", &condition_attrs)
        .await?;

    // --- Execute ---
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;

    // --- Print results per iteration ---
    println!("=== Iterating over region keys ===\n");
    let mut idx = 0;
    loop {
        let source = StorePath::from_segments(["classify", "region"]).with_index(idx);
        let Some(cmd_results) = results.get_by_source(&source) else {
            break;
        };
        let result = cmd_results
            .data_get(&source.with_segment("result"))
            .and_then(|r| r.as_scalar())
            .expect("Expected result");
        println!("  [{}] {}", idx, result.1);
        idx += 1;
    }

    println!("\nProcessed {} region(s)", idx);
    Ok(())
}
```
Output:
```
=== Iterating over region keys ===

  [0] Region us-east is in the US
  [1] Region us-west is in the US
  [2] Region eu-west is in the EU

Processed 3 region(s)
```
Reserved Names
Two namespace names are reserved and cannot be used:
- `item` - Default iteration variable name
- `index` - Default index variable name
If you try to create a namespace with a reserved name, the builder will return an error.
Namespace Execution Order
Namespaces execute in the order they were added to the pipeline. This is important because later namespaces can reference data produced by earlier ones.
```rust
// 1. Load data (executes first)
pipeline.add_namespace(NamespaceBuilder::new("data")).await?;

// 2. Query the loaded data (executes second)
pipeline.add_namespace(NamespaceBuilder::new("query")).await?;

// 3. Aggregate the query results (executes third)
pipeline.add_namespace(NamespaceBuilder::new("stats")).await?;
```
Within a namespace, commands also execute in order. The combination of namespace ordering and command ordering gives us predictable, deterministic pipeline execution.
Commands and Attributes
Commands are the workhorses of Panopticon. Each command performs a specific operation - loading files, running SQL queries, evaluating conditions, and so on. We configure commands through attributes, which are key-value pairs that control the command's behavior.
The Command Trait
Under the hood, a command is any type that implements three traits:
```rust
pub trait Command: FromAttributes + Descriptor + Executable {}
```
- FromAttributes - Constructs the command from an attribute map
- Descriptor - Provides metadata (type name, attribute schema, result schema)
- Executable - Performs the actual work during pipeline execution
Panopticon provides a blanket implementation, so any type implementing the three base traits automatically implements Command.
Adding Commands to a Pipeline
Commands are always added to a namespace. The pattern looks like this:
```rust
let mut handle = pipeline
    .add_namespace(NamespaceBuilder::new("data"))
    .await?;

handle
    .add_command::<FileCommand>("load", &file_attrs)
    .await?;
```
The turbofish syntax <FileCommand> tells Panopticon which command type to use. The string "load" is the command's name within this namespace, which becomes part of the store path for its results (data.load.*).
Building Attributes with ObjectBuilder
Attributes are a HashMap<String, ScalarValue>. While we could construct this manually, ObjectBuilder provides a more ergonomic interface:
```rust
let file_attrs = ObjectBuilder::new()
    .insert(
        "files",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "users")
                .insert("file", "data/users.csv")
                .insert("format", "csv")
                .build_scalar(),
        ]),
    )
    .build_hashmap();
```
ObjectBuilder Methods
| Method | Description |
|---|---|
| `new()` | Create a new empty builder |
| `insert(key, value)` | Add a key-value pair |
| `object(key, nested)` | Add a nested ObjectBuilder |
| `build_scalar()` | Convert to a `ScalarValue::Object` |
| `build_hashmap()` | Convert to `HashMap<String, ScalarValue>` |
Nested Objects
For complex attribute structures, we can nest ObjectBuilders:
```rust
let attrs = ObjectBuilder::new()
    .insert("name", "report")
    .object(
        "options",
        ObjectBuilder::new()
            .insert("format", "json")
            .insert("pretty", true),
    )
    .build_hashmap();
```
This produces the equivalent of:
```json
{
  "name": "report",
  "options": {
    "format": "json",
    "pretty": true
  }
}
```
Arrays of Objects
Many commands accept arrays of configuration objects:
```rust
let attrs = ObjectBuilder::new()
    .insert(
        "aggregations",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "total")
                .insert("op", "sum")
                .insert("column", "amount")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "average")
                .insert("op", "mean")
                .insert("column", "amount")
                .build_scalar(),
        ]),
    )
    .build_hashmap();
```
Common Attributes
All commands support a when attribute for conditional execution:
```rust
let attrs = ObjectBuilder::new()
    .insert("when", "data.load.status == \"success\"")
    // ... other attributes
    .build_hashmap();
```
The when attribute is evaluated as a Tera expression. If it evaluates to a falsy value, the command is skipped and its status is set to "skipped".
Attribute Validation
When a pipeline compiles, Panopticon validates all command attributes against their schemas. Each command type declares:
- Required attributes - Must be present
- Optional attributes - May be omitted
- Type constraints - Values must match expected types
If validation fails, .compile() returns an error describing what is wrong.
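Conceptually, the required-attribute part of that check looks like the following simplified standalone sketch (a hypothetical helper; the real validation also covers optional attributes and type constraints):

```rust
use std::collections::HashMap;

// Checks that every required key is present in the attribute map,
// producing a descriptive error for the first missing one.
fn validate(attrs: &HashMap<&str, &str>, required: &[&str]) -> Result<(), String> {
    for key in required {
        if !attrs.contains_key(key) {
            return Err(format!("missing required attribute `{key}`"));
        }
    }
    Ok(())
}

fn main() {
    let mut attrs = HashMap::new();
    attrs.insert("files", "[...]");

    // FileCommand-style check: "files" is required.
    assert!(validate(&attrs, &["files"]).is_ok());

    // A missing required attribute produces a descriptive error.
    let err = validate(&attrs, &["files", "query"]).unwrap_err();
    assert_eq!(err, "missing required attribute `query`");
    println!("{err}");
}
```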
Command Results
Every command produces results, which are stored at paths derived from the namespace and command name. All commands automatically produce:
| Result | Type | Description |
|---|---|---|
| `status` | String | "success", "skipped", "error", or "cancelled" |
| `duration_ms` | Number | Execution time in milliseconds |
Commands also produce their own specific results. For example, ConditionCommand produces:
| Result | Type | Description |
|---|---|---|
| `result` | String | The value from the matched branch or default |
| `matched` | Bool | Whether a branch condition matched |
| `branch_index` | Number | Index of matched branch, or -1 for default |
Result Kinds: Meta vs Data
Results are categorized as either Meta or Data:
- Meta results describe the execution (status, duration, row counts)
- Data results contain the actual output (query results, computed values)
This distinction matters when collecting results - we might want to include all data but only summary metadata.
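One way to picture that filtering, as a standalone sketch with hypothetical names (the real ResultSettings API may differ):

```rust
// Hypothetical mirror of the Meta/Data distinction (names are ours,
// not the library's).
#[derive(Debug, Clone, Copy, PartialEq)]
enum ResultKind {
    Meta, // execution metadata: status, duration, row counts
    Data, // actual outputs: query results, computed values
}

struct ResultEntry {
    path: String,
    kind: ResultKind,
}

// Keep every Data result, but only the Meta results whose final
// segment is explicitly requested.
fn collect(entries: &[ResultEntry], keep_meta: &[&str]) -> Vec<String> {
    entries
        .iter()
        .filter(|e| e.kind == ResultKind::Data || keep_meta.iter().any(|m| e.path.ends_with(*m)))
        .map(|e| e.path.clone())
        .collect()
}

fn main() {
    let entries = vec![
        ResultEntry { path: "query.sorted.status".to_string(), kind: ResultKind::Meta },
        ResultEntry { path: "query.sorted.duration_ms".to_string(), kind: ResultKind::Meta },
        ResultEntry { path: "query.sorted.data".to_string(), kind: ResultKind::Data },
    ];

    // All data, plus only the "status" metadata.
    let collected = collect(&entries, &["status"]);
    assert_eq!(collected, ["query.sorted.status", "query.sorted.data"]);
    println!("{collected:?}");
}
```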
Example: ConditionCommand
Let us walk through a complete example using ConditionCommand:
```rust
use panopticon_core::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut pipeline = Pipeline::new();

    // Static config providing a value to check
    pipeline
        .add_namespace(
            NamespaceBuilder::new("input")
                .static_ns()
                .insert("score", ScalarValue::Number(85.into())),
        )
        .await?;

    // Condition command to classify the score
    let condition_attrs = ObjectBuilder::new()
        .insert(
            "branches",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "excellent")
                    .insert("if", "input.score >= 90")
                    .insert("then", "Excellent work!")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "good")
                    .insert("if", "input.score >= 70")
                    .insert("then", "Good job!")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "needs_work")
                    .insert("if", "input.score >= 50")
                    .insert("then", "Keep practicing!")
                    .build_scalar(),
            ]),
        )
        .insert("default", "Please try again.")
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("classify"))
        .await?
        .add_command::<ConditionCommand>("score", &condition_attrs)
        .await?;

    // Execute
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;

    // Get the result
    let source = StorePath::from_segments(["classify", "score"]);
    let cmd_results = results.get_by_source(&source).expect("Expected results");
    let result = cmd_results
        .data_get(&source.with_segment("result"))
        .and_then(|r| r.as_scalar())
        .expect("Expected result");

    println!("Classification: {}", result.1); // "Good job!"
    Ok(())
}
```
Built-in Commands
Panopticon provides several built-in commands:
| Command | Purpose |
|---|---|
| `FileCommand` | Load data from CSV, JSON, or Parquet files |
| `SqlCommand` | Run SQL queries against loaded DataFrames |
| `AggregateCommand` | Compute aggregations (sum, mean, count, etc.) |
| `ConditionCommand` | Evaluate conditional logic with branches |
| `TemplateCommand` | Generate text using Tera templates |
Each command has its own attribute schema documented in the Commands section.
Tera Substitution in Attributes
String attributes support Tera template syntax. Before a command executes, Panopticon substitutes any {{ ... }} expressions with values from the scalar store:
```rust
let attrs = ObjectBuilder::new()
    .insert("query", "SELECT * FROM users WHERE region = '{{ config.region }}'")
    .build_hashmap();
```
This enables dynamic configuration based on earlier command results or static namespace values.
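The substitution itself behaves like a find-and-replace over `{{ ... }}` placeholders. A deliberately tiny standalone sketch (real Tera rendering is far more capable, with filters, conditionals, and nested lookups):

```rust
use std::collections::HashMap;

// A minimal `{{ key }}` replacement standing in for the Tera rendering
// Panopticon applies to string attributes before execution.
fn substitute(template: &str, values: &HashMap<&str, &str>) -> String {
    let mut out = template.to_string();
    for (key, value) in values {
        out = out.replace(&format!("{{{{ {key} }}}}"), value);
    }
    out
}

fn main() {
    let mut values = HashMap::new();
    values.insert("config.region", "eu-west");

    let query = substitute(
        "SELECT * FROM users WHERE region = '{{ config.region }}'",
        &values,
    );
    assert_eq!(query, "SELECT * FROM users WHERE region = 'eu-west'");
    println!("{query}");
}
```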
Summary
The command system in Panopticon follows a consistent pattern:
- Commands implement `FromAttributes`, `Descriptor`, and `Executable`
- We add commands to namespaces using `add_command::<T>(name, &attrs)`
- Attributes are built using `ObjectBuilder` for type safety
- All commands support the `when` attribute for conditional execution
- Results are stored at `namespace.command.field` paths
- String attributes support Tera templating for dynamic values
This design keeps command configuration declarative while enabling powerful dynamic behavior through templating and conditional execution.
Data Stores
All data in a Panopticon pipeline flows through two stores: the ScalarStore for JSON-like values and the TabularStore for Polars DataFrames. Understanding these stores is essential for working with command inputs and outputs.
Two Stores, Two Data Models
```
┌─────────────────────────────────────────────────────────────────┐
│                       ExecutionContext                          │
│                                                                 │
│  ┌─────────────────────────────┐  ┌─────────────────────────┐   │
│  │        ScalarStore          │  │      TabularStore       │   │
│  │                             │  │                         │   │
│  │  • Strings                  │  │  • Polars DataFrames    │   │
│  │  • Numbers                  │  │  • Columnar data        │   │
│  │  • Booleans                 │  │  • SQL-queryable        │   │
│  │  • Arrays                   │  │                         │   │
│  │  • Objects (nested)         │  │                         │   │
│  │  • Null                     │  │                         │   │
│  │                             │  │                         │   │
│  │  Backed by Tera Context     │  │  HashMap<String, DF>    │   │
│  │  (enables templating)       │  │                         │   │
│  └─────────────────────────────┘  └─────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
ScalarStore
The ScalarStore holds JSON-like values. Internally, it wraps a Tera context, which means all scalar values are automatically available for template substitution.
```rust
pub type ScalarValue = tera::Value; // Re-export of serde_json::Value
```
Scalar values can be:
- Null - Absence of a value
- Bool - `true` or `false`
- Number - Integers or floating-point
- String - Text data
- Array - Ordered list of values
- Object - Key-value map (nested structure)
TabularStore
The TabularStore holds Polars DataFrames - efficient columnar data structures ideal for analytical queries.
```rust
pub type TabularValue = polars::prelude::DataFrame;
```
DataFrames are stored by their full store path (as a dotted string key). Commands like SqlCommand can register these as tables and query across them.
Store Paths
Values in both stores are addressed using StorePath - a structured path that typically follows the pattern namespace.command.field:
```rust
// Create a store path
let path = StorePath::from_segments(["data", "load", "users"]);

// Access with dotted notation
let dotted = path.to_dotted(); // "data.load.users"

// Add segments
let status_path = path.with_segment("status"); // "data.load.users.status"

// Add iteration index
let indexed = path.with_index(0); // "data.load.users[0]"
```
Store paths provide a consistent way to reference data throughout the pipeline.
ScalarValue Operations
Creating Values
Panopticon provides helper functions and the ObjectBuilder for creating scalar values:
```rust
use panopticon_core::prelude::*;

// Primitives convert automatically via Into<ScalarValue>
let string_val: ScalarValue = "hello".into();
let number_val: ScalarValue = 42.into();
let bool_val: ScalarValue = true.into();

// Arrays
let array_val = ScalarValue::Array(vec![
    "a".into(),
    "b".into(),
    "c".into(),
]);

// Objects using ObjectBuilder
let object_val = ObjectBuilder::new()
    .insert("name", "Alice")
    .insert("age", 30)
    .insert("active", true)
    .build_scalar();
```
Type Checking and Extraction
ScalarValue provides methods for type checking and extraction:
```rust
// Standard serde_json methods
if let Some(s) = value.as_str() { /* use string */ }
if let Some(n) = value.as_i64() { /* use integer */ }
if let Some(b) = value.as_bool() { /* use boolean */ }
if let Some(arr) = value.as_array() { /* use array */ }
if let Some(obj) = value.as_object() { /* use object */ }

// Extension trait with error handling
use panopticon_core::prelude::ScalarAsExt;

let name = value.as_str_or_err("name")?; // Returns Result
let count = value.as_i64_or_err("count")?;
let items = value.as_array_or_err("items")?;
```
Map Extension Trait
For working with object maps, the ScalarMapExt trait provides convenient accessors:
```rust
use panopticon_core::prelude::ScalarMapExt;

let obj = value.as_object().unwrap();

// Required fields (returns Result)
let name = obj.get_required_string("name")?;
let count = obj.get_required_i64("count")?;
let enabled = obj.get_required_bool("enabled")?;

// Optional fields (returns Option)
let description = obj.get_optional_string("description");
let limit = obj.get_optional_i64("limit");
```
How Commands Use Stores
Commands read inputs from and write outputs to the stores. Here is a typical pattern:
```text
┌──────────────┐         ┌─────────────┐         ┌──────────────┐
│ ScalarStore  │────────▶│   Command   │────────▶│ ScalarStore  │
│  (inputs)    │         │             │         │  (outputs)   │
└──────────────┘         │  - Read     │         └──────────────┘
                         │  - Process  │
┌──────────────┐         │  - Write    │         ┌──────────────┐
│ TabularStore │────────▶│             │────────▶│ TabularStore │
│  (inputs)    │         └─────────────┘         │  (outputs)   │
└──────────────┘                                 └──────────────┘
```
FileCommand Example
When FileCommand loads a CSV file:
- Reads the file from disk (not from stores)
- Writes the DataFrame to TabularStore at `namespace.command.name.data`
- Writes metadata to ScalarStore:
  - `namespace.command.name.rows` - Row count
  - `namespace.command.name.columns` - Column count
  - `namespace.command.status` - `"success"`
  - `namespace.command.duration_ms` - Execution time
SqlCommand Example
When SqlCommand runs a query:
- Reads DataFrames from TabularStore based on the `tables` attribute
- Registers them as SQL tables
- Executes the query
- Writes result DataFrame to TabularStore
- Writes metadata to ScalarStore
Template Substitution
Because ScalarStore wraps a Tera context, all values are available for template substitution in command attributes:
```rust
// Static namespace provides config
pipeline
    .add_namespace(
        NamespaceBuilder::new("config")
            .static_ns()
            .insert("region", "us-east")
            .insert("limit", ScalarValue::Number(100.into())),
    )
    .await?;

// SQL command uses a templated query
let sql_attrs = ObjectBuilder::new()
    .insert(
        "query",
        "SELECT * FROM users WHERE region = '{{ config.region }}' LIMIT {{ config.limit }}",
    )
    .build_hashmap();
```
During execution, Panopticon substitutes {{ config.region }} with "us-east" and {{ config.limit }} with 100 before the command runs.
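Conceptually, the substitution step is a lookup-and-replace over `{{ key }}` placeholders with the scalar store as the lookup table. The std-only sketch below is a toy stand-in for what Tera does (real Tera also supports filters, conditionals, loops, and inheritance); `substitute` is a hypothetical helper, not a panopticon-core API:

```rust
use std::collections::HashMap;

/// Naive `{{ key }}` substitution -- a toy stand-in for Tera rendering
/// against the ScalarStore context (illustrative only).
fn substitute(template: &str, values: &HashMap<&str, String>) -> String {
    let mut out = template.to_string();
    for (key, value) in values {
        // Build the literal placeholder "{{ key }}" and replace it
        out = out.replace(&format!("{{{{ {} }}}}", key), value);
    }
    out
}

fn main() {
    let mut values = HashMap::new();
    values.insert("config.region", "us-east".to_string());
    values.insert("config.limit", "100".to_string());

    let query = substitute(
        "SELECT * FROM users WHERE region = '{{ config.region }}' LIMIT {{ config.limit }}",
        &values,
    );
    assert_eq!(query, "SELECT * FROM users WHERE region = 'us-east' LIMIT 100");
    println!("{}", query);
}
```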
Accessing Results
After pipeline execution, we retrieve results through the ResultStore:
```rust
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

// Get results for a specific command
let source = StorePath::from_segments(["data", "load"]);
let cmd_results = results.get_by_source(&source).expect("Expected results");

// Access metadata
let rows = cmd_results.meta_get(&source.with_segment("rows"));

// Access data
let status = cmd_results.data_get(&source.with_segment("result"));
```
ResultValue Types
Results come in two forms:
```rust
pub enum ResultValue {
    Scalar {
        ty: ScalarType,
        value: ScalarValue,
    },
    Tabular {
        path: PathBuf, // File path where the DataFrame was written
        format: OutputFormat,
        rows_count: usize,
        columns_count: usize,
    },
}
```
Tabular results are written to disk (CSV, JSON, or Parquet) and the ResultValue contains the path and summary statistics.
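Consuming code typically matches on the variant to decide how to handle a result. The sketch below uses local stand-in types mirroring the shapes shown above (the real enum lives in panopticon-core; its `Scalar` variant carries a `ScalarType` and `ScalarValue`, simplified here to a string):

```rust
use std::path::PathBuf;

// Local stand-ins mirroring the documented shapes (illustrative only).
#[allow(dead_code)]
#[derive(Debug)]
enum OutputFormat { Csv, Json, Parquet }

#[derive(Debug)]
enum ResultValue {
    Scalar { value: String }, // simplified: real variant also carries a ScalarType
    Tabular { path: PathBuf, format: OutputFormat, rows_count: usize, columns_count: usize },
}

/// Summarize a result, branching on whether it is scalar or tabular.
fn describe(result: &ResultValue) -> String {
    match result {
        ResultValue::Scalar { value } => format!("scalar: {}", value),
        ResultValue::Tabular { path, format, rows_count, columns_count } => format!(
            "{} rows x {} cols written to {} as {:?}",
            rows_count, columns_count, path.display(), format
        ),
    }
}

fn main() {
    let tabular = ResultValue::Tabular {
        path: PathBuf::from("/tmp/out.parquet"),
        format: OutputFormat::Parquet,
        rows_count: 1200,
        columns_count: 8,
    };
    println!("{}", describe(&tabular));
}
```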
ScalarType Enum
For type introspection, Panopticon provides a ScalarType enum:
```rust
pub enum ScalarType {
    Null,
    Bool,
    Number,
    String,
    Array,
    Object,
}
```
This is useful for schema validation and result type checking.
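One way such an enum supports validation is checking a value's runtime type against an expected type before use. A std-only sketch, with a truncated local `Value` enum standing in for `ScalarValue` (both types here are illustrative mirrors, not the library's):

```rust
// Local mirrors of the documented types, to sketch a schema check.
#[derive(Debug, PartialEq)]
enum ScalarType { Null, Bool, Number, String }

#[derive(Debug)]
enum Value { Null, Bool(bool), Number(f64), Str(String) } // Array/Object omitted

fn type_of(v: &Value) -> ScalarType {
    match v {
        Value::Null => ScalarType::Null,
        Value::Bool(_) => ScalarType::Bool,
        Value::Number(_) => ScalarType::Number,
        Value::Str(_) => ScalarType::String,
    }
}

/// Validate that a value has the expected type before using it.
fn expect_type(v: &Value, want: ScalarType) -> Result<(), String> {
    let got = type_of(v);
    if got == want { Ok(()) } else { Err(format!("expected {:?}, got {:?}", want, got)) }
}

fn main() {
    assert!(expect_type(&Value::Number(42.0), ScalarType::Number).is_ok());
    assert!(expect_type(&Value::Str("hi".into()), ScalarType::Number).is_err());
    println!("ok");
}
```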
Practical Example
Here is a complete example showing data flow through the stores:
```rust
use panopticon_core::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut pipeline = Pipeline::new();

    // 1. Static namespace adds values to ScalarStore
    pipeline
        .add_namespace(
            NamespaceBuilder::new("config")
                .static_ns()
                .insert("threshold", ScalarValue::Number(50.into())),
        )
        .await?;

    // 2. FileCommand writes a DataFrame to TabularStore
    let file_attrs = ObjectBuilder::new()
        .insert(
            "files",
            ScalarValue::Array(vec![ObjectBuilder::new()
                .insert("name", "scores")
                .insert("file", "data/scores.csv")
                .insert("format", "csv")
                .build_scalar()]),
        )
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("data"))
        .await?
        .add_command::<FileCommand>("load", &file_attrs)
        .await?;

    // 3. SqlCommand reads from TabularStore, uses ScalarStore for templating
    let sql_attrs = ObjectBuilder::new()
        .insert(
            "tables",
            ScalarValue::Array(vec![ObjectBuilder::new()
                .insert("name", "scores")
                .insert("source", "data.load.scores.data")
                .build_scalar()]),
        )
        .insert("query", "SELECT * FROM scores WHERE value > {{ config.threshold }}")
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("filtered"))
        .await?
        .add_command::<SqlCommand>("high_scores", &sql_attrs)
        .await?;

    // Execute and collect results
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;

    // 4. Results contain both scalar metadata and tabular file paths
    let source = StorePath::from_segments(["filtered", "high_scores"]);
    if let Some(cmd_results) = results.get_by_source(&source) {
        // Metadata from ScalarStore
        if let Some(rows) = cmd_results.meta_get(&source.with_segment("rows")) {
            println!("Filtered rows: {}", rows);
        }
        // Tabular data was written to a file
        if let Some(result_value) = cmd_results.data_get(&source.with_segment("data")) {
            if let Some((path, _format)) = result_value.as_tabular() {
                println!("Data written to: {}", path.display());
            }
        }
    }

    Ok(())
}
```
Summary
The dual-store architecture in Panopticon separates concerns:
- ScalarStore handles configuration, metadata, and template substitution
- TabularStore handles large datasets efficiently with Polars
This separation allows us to:
- Use JSON-like values for flexible configuration
- Leverage Tera templating for dynamic attribute values
- Process large datasets with columnar efficiency
- Query across DataFrames using SQL
Understanding how data flows through these stores is key to building effective Panopticon pipelines.
Built-in Commands
Panopticon includes a set of built-in commands that cover common data pipeline operations. These commands work together to load, transform, analyze, and export data.
Command Overview
| Command | Purpose | Key Use Cases |
|---|---|---|
| FileCommand | Load data files | Read CSV, JSON, and Parquet files into the tabular store |
| SqlCommand | Query tabular data | Filter, join, transform data using SQL syntax |
| AggregateCommand | Compute statistics | Calculate count, sum, mean, max, min, median, and more |
| ConditionCommand | Branch logic | Evaluate Tera expressions to produce conditional outputs |
| TemplateCommand | Render templates | Generate files using Tera templates with inheritance |
Common Patterns
Data Loading and Analysis
A typical pipeline starts by loading data with FileCommand, optionally transforms it with SqlCommand, and computes metrics with AggregateCommand:
```rust
// Load CSV data
pipeline
    .add_namespace(NamespaceBuilder::new("data"))
    .await?
    .add_command::<FileCommand>("load", &file_attrs)
    .await?;

// Query the loaded data
pipeline
    .add_namespace(NamespaceBuilder::new("query"))
    .await?
    .add_command::<SqlCommand>("filtered", &query_attrs)
    .await?;

// Aggregate results
pipeline
    .add_namespace(NamespaceBuilder::new("stats"))
    .await?
    .add_command::<AggregateCommand>("summary", &agg_attrs)
    .await?;
```
Conditional Execution
All commands support the optional when attribute, which is a Tera expression evaluated at runtime. If it resolves to a falsy value, the command is skipped:
```rust
let attrs = ObjectBuilder::new()
    .insert("when", "inputs.feature_enabled") // Skip if false
    .insert("source", "data.load.users.data")
    // ... other attributes
    .build_hashmap();
```
When a command is skipped:
- Its `status` meta result is set to `"skipped"`
- Data results are absent from the ResultStore
- Dependent commands that reference its outputs will fail unless handled
Store Path References
Commands that consume data from earlier pipeline stages use store paths to reference results. Store paths are dot-separated strings that identify locations in the data stores:
`namespace.command.result_key`
For example:
- `data.load.users.data` - The tabular data loaded from the users file
- `stats.summary.row_count` - A scalar aggregation result
See Store Paths for more details.
Tera Template Substitution
Many command attributes support Tera template syntax for dynamic values. This is indicated by "supports Tera substitution" in the attribute documentation:
```rust
.insert("file", "{{ config.data_dir }}/users.csv")
.insert("query", "SELECT * FROM users WHERE status = '{{ inputs.status }}'")
```
The execution context automatically substitutes these expressions using values from the scalar store.
Result Types
Commands produce two types of results:
Meta Results
Metadata about the command execution (row counts, sizes, column lists). These are accessed via meta_get() on the command results:
```rust
let row_count = cmd_results
    .meta_get(&source.with_segment("rows"))
    .expect("Expected rows");
```
Data Results
The primary outputs of the command (DataFrames, computed values). These are accessed via data_get():
```rust
let df = cmd_results
    .data_get(&source.with_segment("data"))
    .and_then(|r| r.as_tabular());
```
Each command's documentation lists which results are meta vs. data.
FileCommand
FileCommand loads data files from disk into the tabular store. It supports CSV, JSON, and Parquet formats.
When to Use
Use FileCommand when you need to:
- Load one or more data files into a pipeline
- Ingest data in different formats (CSV, JSON, Parquet)
- Make tabular data available for SQL queries or aggregations
Attributes
| Attribute | Type | Required | Description |
|---|---|---|---|
files | Array of objects | Yes | Array of file specifications to load |
File Object Fields
Each object in the files array has the following fields:
| Field | Type | Required | Description |
|---|---|---|---|
name | String | Yes | Identifier for this file in the tabular store |
file | String | Yes | Path to the file (supports Tera substitution) |
format | String | Yes | File format: csv, json, or parquet (supports Tera substitution) |
Results
Meta Results
| Result | Type | Description |
|---|---|---|
count | Number | Total number of files loaded |
total_rows | Number | Sum of rows across all loaded files |
total_size | Number | Sum of file sizes in bytes |
Data Results (Per File)
For each file in the files array, the following results are produced under {output_prefix}.{name}:
| Result | Type | Description |
|---|---|---|
data | Tabular (DataFrame) | The loaded data |
rows | Number | Row count for this file |
size | Number | File size in bytes |
columns | Array | Column names in the loaded data |
Examples
Loading a Single CSV File
```rust
use panopticon_core::prelude::*;

let attrs = ObjectBuilder::new()
    .insert(
        "files",
        ScalarValue::Array(vec![ObjectBuilder::new()
            .insert("name", "users")
            .insert("file", "/path/to/users.csv")
            .insert("format", "csv")
            .build_scalar()]),
    )
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("data"))
    .await?
    .add_command::<FileCommand>("load", &attrs)
    .await?;

// After execution, the data is available at:
// - data.load.users.data (the DataFrame)
// - data.load.users.rows (row count)
// - data.load.users.columns (column names)
```
Loading Multiple Formats
Load CSV, JSON, and Parquet files in a single command:
```rust
let attrs = ObjectBuilder::new()
    .insert(
        "files",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "users")
                .insert("file", "fixtures/users.csv")
                .insert("format", "csv")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "events")
                .insert("file", "fixtures/events.json")
                .insert("format", "json")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "metrics")
                .insert("file", "fixtures/metrics.parquet")
                .insert("format", "parquet")
                .build_scalar(),
        ]),
    )
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("data"))
    .await?
    .add_command::<FileCommand>("load", &attrs)
    .await?;
```
After execution:
- `data.load.users.data` - DataFrame from users.csv
- `data.load.events.data` - DataFrame from events.json
- `data.load.metrics.data` - DataFrame from metrics.parquet
- `data.load.count` - 3 (number of files loaded)
- `data.load.total_rows` - Combined row count
Using Tera Substitution for Dynamic Paths
```rust
// First, set up a static namespace with configuration
pipeline
    .add_namespace(
        NamespaceBuilder::new("config")
            .static_ns()
            .insert("data_dir", ScalarValue::String("/var/data".to_string()))
            .insert("file_format", ScalarValue::String("csv".to_string())),
    )
    .await?;

// Then reference those values in FileCommand
let attrs = ObjectBuilder::new()
    .insert(
        "files",
        ScalarValue::Array(vec![ObjectBuilder::new()
            .insert("name", "daily_report")
            .insert("file", "{{ config.data_dir }}/report.{{ config.file_format }}")
            .insert("format", "{{ config.file_format }}")
            .build_scalar()]),
    )
    .build_hashmap();
```
Accessing Results
```rust
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

let source = StorePath::from_segments(["data", "load"]);
let cmd_results = results.get_by_source(&source).expect("Expected results");

// Access meta results
let file_count = cmd_results
    .meta_get(&source.with_segment("count"))
    .expect("Expected count");
let total_rows = cmd_results
    .meta_get(&source.with_segment("total_rows"))
    .expect("Expected total_rows");

println!("Loaded {} files with {} total rows", file_count, total_rows);

// Access per-file meta
let users_rows = cmd_results
    .meta_get(&StorePath::from_dotted("data.load.users.rows"))
    .expect("Expected users rows");
```
Common Patterns
FileCommand + SqlCommand
Load data with FileCommand, then query it with SqlCommand:
```rust
// Load
let file_attrs = ObjectBuilder::new()
    .insert(
        "files",
        ScalarValue::Array(vec![ObjectBuilder::new()
            .insert("name", "orders")
            .insert("file", "orders.csv")
            .insert("format", "csv")
            .build_scalar()]),
    )
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("data"))
    .await?
    .add_command::<FileCommand>("load", &file_attrs)
    .await?;

// Query - reference the loaded data by store path
let query_attrs = ObjectBuilder::new()
    .insert(
        "tables",
        ScalarValue::Array(vec![ObjectBuilder::new()
            .insert("name", "orders")
            .insert("source", "data.load.orders.data") // Store path reference
            .build_scalar()]),
    )
    .insert("query", "SELECT * FROM orders WHERE status = 'completed'")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("query"))
    .await?
    .add_command::<SqlCommand>("completed", &query_attrs)
    .await?;
```
Error Handling
FileCommand will return an error if:
- The file does not exist
- The path points to a directory instead of a file
- The file format is not one of `csv`, `json`, or `parquet`
- The file content cannot be parsed as the specified format
Format Notes
CSV
- Assumes the first row contains headers
- Uses default CSV parsing options from Polars
JSON
- Expects newline-delimited JSON (NDJSON) or JSON array format
- Uses Polars' `JsonReader`
Parquet
- Reads standard Apache Parquet files
- Efficient for large datasets with columnar storage
SqlCommand
SqlCommand executes SQL queries against tabular data stored in the pipeline. It uses Polars' SQL context to provide full SQL query capabilities on DataFrames.
When to Use
Use SqlCommand when you need to:
- Filter rows based on conditions
- Select specific columns
- Join multiple tables together
- Group and aggregate data
- Transform data using SQL expressions
- Order and limit results
Attributes
| Attribute | Type | Required | Description |
|---|---|---|---|
tables | Array of objects | Yes | Table mappings from store paths to SQL table names |
query | String | Yes | SQL query to execute (supports Tera substitution) |
Table Object Fields
Each object in the tables array maps a store path to a table name for use in SQL:
| Field | Type | Required | Description |
|---|---|---|---|
name | String | Yes | Table name to use in the SQL query |
source | String | Yes | Store path to tabular data (e.g., data.load.users.data) |
Results
Data Results
| Result | Type | Description |
|---|---|---|
data | Tabular (DataFrame) | The query result |
Meta Results
| Result | Type | Description |
|---|---|---|
rows | Number | Number of rows in the result |
columns | Array | Column names in the result |
Examples
Basic Query
```rust
use panopticon_core::prelude::*;

// Assume data was loaded with FileCommand at data.load.users.data
let attrs = ObjectBuilder::new()
    .insert(
        "tables",
        ScalarValue::Array(vec![ObjectBuilder::new()
            .insert("name", "users")
            .insert("source", "data.load.users.data")
            .build_scalar()]),
    )
    .insert("query", "SELECT * FROM users WHERE active = true")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("query"))
    .await?
    .add_command::<SqlCommand>("active_users", &attrs)
    .await?;

// Result available at: query.active_users.data
```
Joining Multiple Tables
```rust
let attrs = ObjectBuilder::new()
    .insert(
        "tables",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "users")
                .insert("source", "data.load.users.data")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "orders")
                .insert("source", "data.load.orders.data")
                .build_scalar(),
        ]),
    )
    .insert(
        "query",
        "SELECT u.name, u.email, o.order_id, o.total \
         FROM users u \
         INNER JOIN orders o ON u.id = o.user_id \
         ORDER BY o.total DESC",
    )
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("query"))
    .await?
    .add_command::<SqlCommand>("user_orders", &attrs)
    .await?;
```
Cross Join
Combine every row from one table with every row from another:
```rust
let attrs = ObjectBuilder::new()
    .insert(
        "tables",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "users")
                .insert("source", "data.load.users.data")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "events")
                .insert("source", "data.load.events.data")
                .build_scalar(),
        ]),
    )
    .insert(
        "query",
        "SELECT u.name, u.email, e.type AS event_type, e.timestamp \
         FROM users u CROSS JOIN events e \
         ORDER BY u.name, e.timestamp",
    )
    .build_hashmap();
```
Dynamic Query with Tera Substitution
Use Tera expressions to parameterize queries:
```rust
// Static namespace with filter values
pipeline
    .add_namespace(
        NamespaceBuilder::new("inputs")
            .static_ns()
            .insert("status", ScalarValue::String("active".to_string()))
            .insert("min_age", ScalarValue::from(18)),
    )
    .await?;

// Query with Tera substitution
let attrs = ObjectBuilder::new()
    .insert(
        "tables",
        ScalarValue::Array(vec![ObjectBuilder::new()
            .insert("name", "users")
            .insert("source", "data.load.users.data")
            .build_scalar()]),
    )
    .insert(
        "query",
        "SELECT * FROM users WHERE status = '{{ inputs.status }}' AND age >= {{ inputs.min_age }}",
    )
    .build_hashmap();
```
Aggregation in SQL
While AggregateCommand is available for simple aggregations, SQL can perform grouped aggregations:
```rust
let attrs = ObjectBuilder::new()
    .insert(
        "tables",
        ScalarValue::Array(vec![ObjectBuilder::new()
            .insert("name", "orders")
            .insert("source", "data.load.orders.data")
            .build_scalar()]),
    )
    .insert(
        "query",
        "SELECT category, COUNT(*) as order_count, SUM(total) as revenue \
         FROM orders \
         GROUP BY category \
         ORDER BY revenue DESC",
    )
    .build_hashmap();
```
Accessing Results
```rust
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

let source = StorePath::from_segments(["query", "active_users"]);
let cmd_results = results.get_by_source(&source).expect("Expected results");

// Access meta results
let row_count = cmd_results
    .meta_get(&source.with_segment("rows"))
    .expect("Expected rows");
let columns = cmd_results
    .meta_get(&source.with_segment("columns"))
    .expect("Expected columns");

println!("Query returned {} rows with columns: {}", row_count, columns);

// Access the DataFrame
let data_result = cmd_results
    .data_get(&source.with_segment("data"))
    .expect("Expected data");
```
Common Patterns
Chaining SQL Queries
Use the output of one SQL query as input to another:
```rust
// First query: filter data
let filter_attrs = ObjectBuilder::new()
    .insert(
        "tables",
        ScalarValue::Array(vec![ObjectBuilder::new()
            .insert("name", "orders")
            .insert("source", "data.load.orders.data")
            .build_scalar()]),
    )
    .insert("query", "SELECT * FROM orders WHERE status = 'completed'")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("step1"))
    .await?
    .add_command::<SqlCommand>("filtered", &filter_attrs)
    .await?;

// Second query: aggregate the filtered data
let agg_attrs = ObjectBuilder::new()
    .insert(
        "tables",
        ScalarValue::Array(vec![ObjectBuilder::new()
            .insert("name", "orders")
            .insert("source", "step1.filtered.data") // Reference previous result
            .build_scalar()]),
    )
    .insert(
        "query",
        "SELECT category, SUM(total) as revenue FROM orders GROUP BY category",
    )
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("step2"))
    .await?
    .add_command::<SqlCommand>("by_category", &agg_attrs)
    .await?;
```
SqlCommand + AggregateCommand
Query with SQL, then compute scalar statistics with AggregateCommand:
```rust
// SQL query
let query_attrs = ObjectBuilder::new()
    .insert(
        "tables",
        ScalarValue::Array(vec![ObjectBuilder::new()
            .insert("name", "products")
            .insert("source", "data.load.products.data")
            .build_scalar()]),
    )
    .insert("query", "SELECT * FROM products WHERE in_stock = true")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("query"))
    .await?
    .add_command::<SqlCommand>("in_stock", &query_attrs)
    .await?;

// Aggregate the query result
let agg_attrs = ObjectBuilder::new()
    .insert("source", "query.in_stock.data")
    .insert(
        "aggregations",
        ScalarValue::Array(vec![ObjectBuilder::new()
            .insert("name", "avg_price")
            .insert("column", "price")
            .insert("op", "mean")
            .build_scalar()]),
    )
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("stats"))
    .await?
    .add_command::<AggregateCommand>("summary", &agg_attrs)
    .await?;
```
SQL Dialect
SqlCommand uses Polars' SQL context, which supports a subset of standard SQL:
SELECT,FROM,WHERE,ORDER BY,LIMITJOIN(INNER, LEFT, RIGHT, FULL, CROSS)GROUP BY,HAVING- Common aggregate functions:
COUNT,SUM,AVG,MIN,MAX - String functions, date functions, and more
Refer to the Polars SQL documentation for the complete list of supported features.
Error Handling
SqlCommand will return an error if:
- A table source store path does not exist
- The SQL query syntax is invalid
- A referenced column does not exist in the table
- The query execution fails for any reason
AggregateCommand
AggregateCommand computes scalar statistics from tabular data. It supports a variety of aggregation operations including count, sum, mean, min, max, median, and more.
When to Use
Use AggregateCommand when you need to:
- Compute summary statistics from a DataFrame
- Extract scalar values (counts, sums, averages) for use in templates or conditions
- Calculate multiple aggregations in a single command
- Get values like row count, unique counts, or null counts
Attributes
| Attribute | Type | Required | Description |
|---|---|---|---|
source | String | Yes | Store path to tabular data (e.g., data.load.products.data) |
aggregations | Array of objects | Yes | Array of aggregation specifications |
Aggregation Object Fields
Each object in the aggregations array specifies one aggregation:
| Field | Type | Required | Description |
|---|---|---|---|
name | String | Yes | Output scalar name for this aggregation |
column | String | No | Column to aggregate (not required for count) |
op | String | Yes | Aggregation operation to perform |
Supported Operations
| Operation | Aliases | Column Required | Description |
|---|---|---|---|
sum | - | Yes | Sum of values in the column |
mean | avg, average | Yes | Arithmetic mean of values |
min | - | Yes | Minimum value |
max | - | Yes | Maximum value |
count | len | No | Number of rows in the DataFrame |
first | - | Yes | First value in the column |
last | - | Yes | Last value in the column |
std | stddev | Yes | Standard deviation |
median | - | Yes | Median value |
n_unique | nunique, distinct | Yes | Count of unique values |
null_count | nulls | Yes | Count of null values in the column |
Results
Data Results (Per Aggregation)
For each aggregation in the aggregations array, a scalar result is produced:
| Result | Type | Description |
|---|---|---|
{name} | Scalar (Number) | The computed aggregation value |
The result path is {output_prefix}.{name}, where {name} is the name field from the aggregation object.
Examples
Basic Aggregations
```rust
use panopticon_core::prelude::*;

let attrs = ObjectBuilder::new()
    .insert("source", "data.load.products.data")
    .insert(
        "aggregations",
        ScalarValue::Array(vec![
            // Count doesn't need a column
            ObjectBuilder::new()
                .insert("name", "row_count")
                .insert("op", "count")
                .build_scalar(),
            // Sum requires a column
            ObjectBuilder::new()
                .insert("name", "total_price")
                .insert("column", "price")
                .insert("op", "sum")
                .build_scalar(),
            // Mean/average
            ObjectBuilder::new()
                .insert("name", "avg_price")
                .insert("column", "price")
                .insert("op", "mean")
                .build_scalar(),
        ]),
    )
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("stats"))
    .await?
    .add_command::<AggregateCommand>("summary", &attrs)
    .await?;

// Results available at:
// - stats.summary.row_count
// - stats.summary.total_price
// - stats.summary.avg_price
```
Full Statistical Summary
Compute comprehensive statistics for a dataset:
```rust
let attrs = ObjectBuilder::new()
    .insert("source", "data.load.products.data")
    .insert(
        "aggregations",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "row_count")
                .insert("op", "count")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "total_price")
                .insert("column", "price")
                .insert("op", "sum")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "avg_price")
                .insert("column", "price")
                .insert("op", "mean")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "max_quantity")
                .insert("column", "quantity")
                .insert("op", "max")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "min_quantity")
                .insert("column", "quantity")
                .insert("op", "min")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "median_price")
                .insert("column", "price")
                .insert("op", "median")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "price_stddev")
                .insert("column", "price")
                .insert("op", "std")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "unique_categories")
                .insert("column", "category")
                .insert("op", "n_unique")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "missing_descriptions")
                .insert("column", "description")
                .insert("op", "null_count")
                .build_scalar(),
        ]),
    )
    .build_hashmap();
```
First and Last Values
Extract the first or last value from a column (useful for time-series data):
```rust
let attrs = ObjectBuilder::new()
    .insert("source", "data.load.events.data")
    .insert(
        "aggregations",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "first_event")
                .insert("column", "event_type")
                .insert("op", "first")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "last_event")
                .insert("column", "event_type")
                .insert("op", "last")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "earliest_timestamp")
                .insert("column", "timestamp")
                .insert("op", "first")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "latest_timestamp")
                .insert("column", "timestamp")
                .insert("op", "last")
                .build_scalar(),
        ]),
    )
    .build_hashmap();
```
Accessing Results
```rust
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

let source = StorePath::from_segments(["stats", "products"]);
let cmd_results = results.get_by_source(&source).expect("Expected results");

// Access aggregation results
let row_count = cmd_results
    .data_get(&source.with_segment("row_count"))
    .and_then(|r| r.as_scalar())
    .expect("Expected row_count");
let avg_price = cmd_results
    .data_get(&source.with_segment("avg_price"))
    .and_then(|r| r.as_scalar())
    .expect("Expected avg_price");

println!("Products: {} rows, average price: {}", row_count.1, avg_price.1);
```
Common Patterns
Using Aggregates in Templates
Aggregation results are stored in the scalar store and can be referenced in Tera templates:
```rust
// First: aggregate data
let agg_attrs = ObjectBuilder::new()
    .insert("source", "data.load.products.data")
    .insert(
        "aggregations",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "total_count")
                .insert("op", "count")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "total_value")
                .insert("column", "price")
                .insert("op", "sum")
                .build_scalar(),
        ]),
    )
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("stats"))
    .await?
    .add_command::<AggregateCommand>("summary", &agg_attrs)
    .await?;

// Then: use in a template
let template_attrs = ObjectBuilder::new()
    .insert(
        "templates",
        ScalarValue::Array(vec![ObjectBuilder::new()
            .insert("name", "report")
            .insert(
                "content",
                "Total products: {{ stats.summary.total_count }}\nTotal value: ${{ stats.summary.total_value }}",
            )
            .build_scalar()]),
    )
    .insert("render", "report")
    .insert("output", "/tmp/report.txt")
    .build_hashmap();
```
Using Aggregates in Conditions
Branch logic based on aggregation results:
```rust
// Aggregate first
let agg_attrs = ObjectBuilder::new()
    .insert("source", "data.load.orders.data")
    .insert(
        "aggregations",
        ScalarValue::Array(vec![ObjectBuilder::new()
            .insert("name", "order_count")
            .insert("op", "count")
            .build_scalar()]),
    )
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("metrics"))
    .await?
    .add_command::<AggregateCommand>("orders", &agg_attrs)
    .await?;

// Condition based on the aggregate
let condition_attrs = ObjectBuilder::new()
    .insert(
        "branches",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "high_volume")
                .insert("if", "metrics.orders.order_count > 1000")
                .insert("then", "High order volume detected")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "normal")
                .insert("if", "true")
                .insert("then", "Normal order volume")
                .build_scalar(),
        ]),
    )
    .build_hashmap();
```
Aggregating SQL Query Results
Chain with SqlCommand to aggregate filtered data:
```rust
// Query
let query_attrs = ObjectBuilder::new()
    .insert(
        "tables",
        ScalarValue::Array(vec![ObjectBuilder::new()
            .insert("name", "orders")
            .insert("source", "data.load.orders.data")
            .build_scalar()]),
    )
    .insert("query", "SELECT * FROM orders WHERE status = 'completed'")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("query"))
    .await?
    .add_command::<SqlCommand>("completed", &query_attrs)
    .await?;

// Aggregate the filtered results
let agg_attrs = ObjectBuilder::new()
    .insert("source", "query.completed.data") // Reference SQL output
    .insert(
        "aggregations",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "completed_count")
                .insert("op", "count")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "total_revenue")
                .insert("column", "total")
                .insert("op", "sum")
                .build_scalar(),
        ]),
    )
    .build_hashmap();
```
Error Handling
AggregateCommand will return an error if:
- The source store path does not exist
- A specified column does not exist in the DataFrame
- An operation that requires a column (anything except `count`) is missing the `column` field
- The operation name is not recognized
Type Handling
- Numeric columns return appropriate numeric types (integer or float)
- `first` and `last` operations work on both numeric and string columns
- Non-finite values (NaN, infinity) are converted to null
- Integer results are preserved when possible (e.g., count, sum of integers)
ConditionCommand
ConditionCommand evaluates Tera expressions to select between multiple branches. It provides if/then branching logic for pipelines, producing a result based on the first matching condition.
When to Use
Use ConditionCommand when you need to:
- Choose between different values based on runtime conditions
- Implement feature flags or configuration-based branching
- Generate different outputs based on data characteristics
- Create conditional messages or labels
Attributes
| Attribute | Type | Required | Description |
|---|---|---|---|
| `branches` | Array of objects | Yes | Array of condition branches evaluated in order |
| `default` | String | No | Default value if no branch matches (supports Tera substitution) |
Branch Object Fields
Each object in the branches array defines one condition:
| Field | Type | Required | Description |
|---|---|---|---|
| `name` | String | Yes | Unique identifier for this branch |
| `if` | String | Yes | Tera expression to evaluate as the condition |
| `then` | String | Yes | Value if condition is true (supports Tera substitution) |
Results
Data Results (Fixed)
| Result | Type | Description |
|---|---|---|
| `result` | String | The value from the matched branch or default |
| `matched` | Boolean | Whether a branch condition matched (false if default was used) |
| `branch_index` | Number | Index of the matched branch (0-based), or -1 if default was used |
Data Results (Per Branch)
For each branch in the branches array, an object is stored:
| Result | Type | Description |
|---|---|---|
| `{name}` | Object | Contains `matched` (bool) and `value` (string) for this branch |
Condition Evaluation
The `if` expression is wrapped in `{{ }}` and evaluated as a Tera expression. The result is considered truthy if it is:
- A non-empty string (except `"false"`, `"0"`, `"null"`, and `"undefined"`)
- A non-zero number
- Boolean `true`
Branches are evaluated in order. The first truthy branch wins, and subsequent branches are not evaluated.
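The selection rules above can be modeled in plain Rust. This is an illustrative sketch of the documented semantics only, not Panopticon's actual implementation; the names `Branch`, `is_truthy`, and `select_branch` are hypothetical:

```rust
// Illustrative model of branch selection; `Branch`, `is_truthy`, and
// `select_branch` are hypothetical names, not part of the Panopticon API.
struct Branch<'a> {
    name: &'a str,
    // The already-evaluated result of the branch's `if` expression.
    if_result: &'a str,
    then: &'a str,
}

// Truthiness rules as documented: a non-empty string (with a few named
// exceptions), a non-zero number, or boolean true.
fn is_truthy(value: &str) -> bool {
    if value.is_empty() {
        return false;
    }
    if matches!(value, "false" | "0" | "null" | "undefined") {
        return false;
    }
    // A string that parses as a number is truthy only when non-zero.
    if let Ok(n) = value.parse::<f64>() {
        return n != 0.0;
    }
    true // "true" and any other non-empty string
}

// First truthy branch wins; returns (result, matched, branch_index).
fn select_branch<'a>(branches: &[Branch<'a>], default: &'a str) -> (&'a str, bool, i64) {
    for (i, b) in branches.iter().enumerate() {
        if is_truthy(b.if_result) {
            return (b.then, true, i as i64);
        }
    }
    (default, false, -1)
}

fn main() {
    let branches = [
        Branch { name: "high", if_result: "false", then: "HIGH" },
        Branch { name: "normal", if_result: "true", then: "NORMAL" },
    ];
    let (result, matched, index) = select_branch(&branches, "DEFAULT");
    println!("{} (matched: {}, branch_index: {})", result, matched, index);
}
```

Note how the fallback case mirrors the fixed data results: `matched` is false and `branch_index` is -1 only when the default is used.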
Examples
Basic Conditional
#![allow(unused)] fn main() { use panopticon_core::prelude::*; // Set up inputs pipeline .add_namespace( NamespaceBuilder::new("inputs") .static_ns() .insert("user_role", ScalarValue::String("admin".to_string())), ) .await?; let attrs = ObjectBuilder::new() .insert( "branches", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "admin_greeting") .insert("if", "inputs.user_role == 'admin'") .insert("then", "Welcome, Administrator!") .build_scalar(), ObjectBuilder::new() .insert("name", "user_greeting") .insert("if", "inputs.user_role == 'user'") .insert("then", "Welcome, User!") .build_scalar(), ]), ) .insert("default", "Welcome, Guest!") .build_hashmap(); pipeline .add_namespace(NamespaceBuilder::new("greeting")) .await? .add_command::<ConditionCommand>("message", &attrs) .await?; // Result: "Welcome, Administrator!" }
Feature Flag Pattern
// Static namespace with feature flags
pipeline
    .add_namespace(
        NamespaceBuilder::new("features")
            .static_ns()
            .insert("new_dashboard", ScalarValue::Bool(true))
            .insert("beta_api", ScalarValue::Bool(false)),
    )
    .await?;

let attrs = ObjectBuilder::new()
    .insert(
        "branches",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "new_dash")
                .insert("if", "features.new_dashboard")
                .insert("then", "/v2/dashboard")
                .build_scalar(),
        ]),
    )
    .insert("default", "/v1/dashboard")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("routing"))
    .await?
    .add_command::<ConditionCommand>("dashboard_path", &attrs)
    .await?;
Data-Driven Conditions
Use values from earlier pipeline stages (like aggregations):
#![allow(unused)] fn main() { // Aggregate order data let agg_attrs = ObjectBuilder::new() .insert("source", "data.load.orders.data") .insert("aggregations", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "order_count") .insert("op", "count") .build_scalar(), ObjectBuilder::new() .insert("name", "total_revenue") .insert("column", "total") .insert("op", "sum") .build_scalar(), ])) .build_hashmap(); pipeline .add_namespace(NamespaceBuilder::new("metrics")) .await? .add_command::<AggregateCommand>("orders", &agg_attrs) .await?; // Branch based on metrics let condition_attrs = ObjectBuilder::new() .insert( "branches", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "high_volume") .insert("if", "metrics.orders.order_count > 1000") .insert("then", "HIGH_VOLUME") .build_scalar(), ObjectBuilder::new() .insert("name", "high_revenue") .insert("if", "metrics.orders.total_revenue > 50000") .insert("then", "HIGH_REVENUE") .build_scalar(), ObjectBuilder::new() .insert("name", "normal") .insert("if", "true") // Always matches as fallback .insert("then", "NORMAL") .build_scalar(), ]), ) .build_hashmap(); pipeline .add_namespace(NamespaceBuilder::new("classification")) .await? .add_command::<ConditionCommand>("tier", &condition_attrs) .await?; }
Using Tera Filters and Functions
The if expression supports full Tera syntax:
let attrs = ObjectBuilder::new()
    .insert(
        "branches",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "long_name")
                .insert("if", "inputs.name | length > 10")
                .insert("then", "Name is long")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "uppercase")
                .insert("if", "inputs.name == inputs.name | upper")
                .insert("then", "Name is all uppercase")
                .build_scalar(),
        ]),
    )
    .insert("default", "Name is normal")
    .build_hashmap();
Accessing Results
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

let source = StorePath::from_segments(["greeting", "message"]);
let cmd_results = results.get_by_source(&source).expect("Expected results");

// Main result
let result = cmd_results
    .data_get(&source.with_segment("result"))
    .and_then(|r| r.as_scalar())
    .expect("Expected result");
println!("Result: {}", result.1);

// Did any branch match?
let matched = cmd_results
    .data_get(&source.with_segment("matched"))
    .and_then(|r| r.as_scalar())
    .expect("Expected matched");
println!("Matched: {}", matched.1);

// Which branch matched?
let index = cmd_results
    .data_get(&source.with_segment("branch_index"))
    .and_then(|r| r.as_scalar())
    .expect("Expected branch_index");
println!("Branch index: {}", index.1); // 0, 1, 2... or -1 for default
Common Patterns
Conditional with when Attribute
Combine ConditionCommand with the when attribute to skip the entire command:
let attrs = ObjectBuilder::new()
    .insert("when", "inputs.feature_enabled") // Skip if false
    .insert(
        "branches",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "greeting")
                .insert("if", "true")
                .insert("then", "Hello, {{ inputs.user_name }}! Feature is active.")
                .build_scalar(),
        ]),
    )
    .insert("default", "Fallback message")
    .build_hashmap();
When `when` is false:
- The command status is `"skipped"`
- No data results are produced
- `result`, `matched`, and `branch_index` are absent
Multiple Independent Conditions
Evaluate multiple conditions that don't depend on each other:
let attrs = ObjectBuilder::new()
    .insert(
        "branches",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "is_admin")
                .insert("if", "inputs.role == 'admin'")
                .insert("then", "true")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "is_premium")
                .insert("if", "inputs.subscription == 'premium'")
                .insert("then", "true")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "has_access")
                .insert("if", "inputs.access_granted")
                .insert("then", "true")
                .build_scalar(),
        ]),
    )
    .build_hashmap();

// Access individual branch results:
//   condition.check.is_admin.matched
//   condition.check.is_premium.matched
//   condition.check.has_access.matched
Cascading If/Else
Use true as the final condition for an else clause:
let attrs = ObjectBuilder::new()
    .insert(
        "branches",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "premium")
                .insert("if", "metrics.score > 90")
                .insert("then", "PREMIUM")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "standard")
                .insert("if", "metrics.score > 50")
                .insert("then", "STANDARD")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "basic")
                .insert("if", "true") // Else clause
                .insert("then", "BASIC")
                .build_scalar(),
        ]),
    )
    .build_hashmap();
Using Results in Templates
Condition results are stored in the scalar store and can be used in templates:
#![allow(unused)] fn main() { // Condition command let condition_attrs = ObjectBuilder::new() .insert("branches", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "status") .insert("if", "metrics.health > 80") .insert("then", "healthy") .build_scalar(), ])) .insert("default", "degraded") .build_hashmap(); pipeline .add_namespace(NamespaceBuilder::new("check")) .await? .add_command::<ConditionCommand>("health", &condition_attrs) .await?; // Template using the result let template_attrs = ObjectBuilder::new() .insert("templates", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "report") .insert("content", "System status: {{ check.health.result }}") .build_scalar(), ])) .insert("render", "report") .insert("output", "/tmp/status.txt") .build_hashmap(); }
Error Handling
ConditionCommand will return an error if:
- A branch is missing required fields (`name`, `if`, or `then`)
- A Tera expression in `if` or `then` cannot be evaluated
- Referenced variables in expressions do not exist in the scalar store
TemplateCommand
TemplateCommand renders Tera templates and writes the output to a file. It supports template inheritance, includes, and loading templates from files or globs.
When to Use
Use TemplateCommand when you need to:
- Generate reports, configuration files, or other text output
- Use template inheritance with base templates and blocks
- Render dynamic content using data from the pipeline
- Create multiple output files from templates
Attributes
| Attribute | Type | Required | Description |
|---|---|---|---|
| `templates` | Array of objects | No | Inline template definitions (can be combined with `template_glob`) |
| `template_glob` | String | No | Glob pattern to load templates from disk (e.g., `templates/**/*.tera`) |
| `render` | String | Yes | Name of the template to render (supports Tera substitution) |
| `output` | String | Yes | File path to write the rendered output (supports Tera substitution) |
| `capture` | Boolean | No | If true, store the rendered content in the `content` result (default: false) |
The template named in `render` must be provided by either `templates` or `template_glob` (or both).
Template Object Fields
Each object in the templates array defines one template:
| Field | Type | Required | Description |
|---|---|---|---|
| `name` | String | Yes | Name to register the template under |
| `content` | String | No | Raw template content (mutually exclusive with `file`) |
| `file` | String | No | Path to template file (mutually exclusive with `content`) |
You must specify either content or file, but not both.
Results
Meta Results
| Result | Type | Description |
|---|---|---|
| `line_count` | Number | Number of lines in the rendered output |
| `size` | Number | Size in bytes of the rendered output |
Data Results
| Result | Type | Description |
|---|---|---|
| `content` | String | The rendered content (only populated when `capture` is true, otherwise empty) |
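To make the meta results concrete, here is one plausible way to compute them from the rendered string. This is an illustrative sketch, not the command's actual code, and it assumes `line_count` counts newline-delimited lines and `size` is the UTF-8 byte length:

```rust
// Hypothetical helper mirroring the two meta results; not part of the
// Panopticon API.
fn render_stats(rendered: &str) -> (usize, usize) {
    let line_count = rendered.lines().count(); // "line_count" meta result
    let size = rendered.len();                 // "size" meta result, in bytes
    (line_count, size)
}

fn main() {
    let rendered = "# Monthly Report\n\nGenerated on: 2024-01-15\n";
    let (lines, bytes) = render_stats(rendered);
    println!("{} lines, {} bytes", lines, bytes);
}
```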
Examples
Simple Inline Template
#![allow(unused)] fn main() { use panopticon_core::prelude::*; // Set up data for the template pipeline .add_namespace( NamespaceBuilder::new("inputs") .static_ns() .insert("title", ScalarValue::String("Monthly Report".to_string())) .insert("date", ScalarValue::String("2024-01-15".to_string())), ) .await?; let attrs = ObjectBuilder::new() .insert( "templates", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "report") .insert("content", "# {{ inputs.title }}\n\nGenerated on: {{ inputs.date }}") .build_scalar(), ]), ) .insert("render", "report") .insert("output", "/tmp/report.md") .build_hashmap(); pipeline .add_namespace(NamespaceBuilder::new("generate")) .await? .add_command::<TemplateCommand>("report", &attrs) .await?; }
Template Inheritance with Glob Loading
Load templates from disk using a glob pattern:
// Directory structure:
//   templates/
//     base.tera   - {% block content %}{% endblock %}
//     header.tera - Navigation HTML
//     page.tera   - {% extends "base.tera" %}{% block content %}...{% endblock %}

let attrs = ObjectBuilder::new()
    .insert("template_glob", "templates/**/*.tera")
    .insert("render", "page.tera")
    .insert("output", "/tmp/page.html")
    .insert("capture", true)
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("render"))
    .await?
    .add_command::<TemplateCommand>("page", &attrs)
    .await?;
Template Inheritance with Inline Templates
Define a base template and a child template inline:
#![allow(unused)] fn main() { let attrs = ObjectBuilder::new() .insert( "templates", ScalarValue::Array(vec![ // Base template with blocks ObjectBuilder::new() .insert("name", "base") .insert("content", r#"<!DOCTYPE html> <html> <head><title>{% block title %}Default Title{% endblock %}</title></head> <body> {% block content %}{% endblock %} </body> </html>"#) .build_scalar(), // Child template that extends base ObjectBuilder::new() .insert("name", "page") .insert("content", r#"{% extends "base" %} {% block title %}{{ inputs.page_title }}{% endblock %} {% block content %} <h1>{{ inputs.page_title }}</h1> <p>{{ inputs.page_content }}</p> {% endblock %}"#) .build_scalar(), ]), ) .insert("render", "page") .insert("output", "/tmp/page.html") .build_hashmap(); }
Loading Templates from Files
Reference template files instead of inline content:
let attrs = ObjectBuilder::new()
    .insert(
        "templates",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "base")
                .insert("file", "templates/base.tera")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "page")
                .insert("file", "templates/page.tera")
                .build_scalar(),
        ]),
    )
    .insert("render", "page")
    .insert("output", "/tmp/output.html")
    .build_hashmap();
Using Pipeline Data in Templates
Templates have access to all scalar values in the store:
#![allow(unused)] fn main() { // Load and aggregate data let agg_attrs = ObjectBuilder::new() .insert("source", "data.load.products.data") .insert("aggregations", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "total_count") .insert("op", "count") .build_scalar(), ObjectBuilder::new() .insert("name", "total_value") .insert("column", "price") .insert("op", "sum") .build_scalar(), ])) .build_hashmap(); pipeline .add_namespace(NamespaceBuilder::new("stats")) .await? .add_command::<AggregateCommand>("summary", &agg_attrs) .await?; // Use aggregation results in template let template_attrs = ObjectBuilder::new() .insert( "templates", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "summary") .insert("content", r#"Product Summary =============== Total products: {{ stats.summary.total_count }} Total value: ${{ stats.summary.total_value }} {% if stats.summary.total_count > 100 %} Note: High product count! {% endif %}"#) .build_scalar(), ]), ) .insert("render", "summary") .insert("output", "/tmp/summary.txt") .insert("capture", true) .build_hashmap(); }
Dynamic Output Path
Use Tera substitution for the output path:
pipeline
    .add_namespace(
        NamespaceBuilder::new("config")
            .static_ns()
            .insert("output_dir", ScalarValue::String("/var/reports".to_string()))
            .insert("report_name", ScalarValue::String("monthly".to_string())),
    )
    .await?;

let attrs = ObjectBuilder::new()
    .insert(
        "templates",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "report")
                .insert("content", "Report content here...")
                .build_scalar(),
        ]),
    )
    .insert("render", "report")
    .insert("output", "{{ config.output_dir }}/{{ config.report_name }}.txt")
    .build_hashmap();
Accessing Results
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

let source = StorePath::from_segments(["render", "page"]);
let cmd_results = results.get_by_source(&source).expect("Expected results");

// Meta results
let size = cmd_results
    .meta_get(&source.with_segment("size"))
    .expect("Expected size");
let lines = cmd_results
    .meta_get(&source.with_segment("line_count"))
    .expect("Expected line_count");
println!("Rendered {} bytes, {} lines", size, lines);

// Content (only if capture=true)
if let Some(content) = cmd_results
    .data_get(&source.with_segment("content"))
    .and_then(|r| r.as_scalar())
{
    println!("Content: {}", content.1);
}
Common Patterns
Combining Glob and Inline Templates
Load base templates from disk, add custom templates inline:
#![allow(unused)] fn main() { let attrs = ObjectBuilder::new() .insert("template_glob", "templates/**/*.tera") // Load from disk .insert( "templates", ScalarValue::Array(vec![ // Add or override templates ObjectBuilder::new() .insert("name", "custom_page") .insert("content", r#"{% extends "base.tera" %} {% block content %}Custom content here{% endblock %}"#) .build_scalar(), ]), ) .insert("render", "custom_page") .insert("output", "/tmp/custom.html") .build_hashmap(); }
Iterating Over Data with Tera
Use Tera's for loop to iterate over arrays:
#![allow(unused)] fn main() { pipeline .add_namespace( NamespaceBuilder::new("inputs") .static_ns() .insert( "items", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "Item 1") .insert("price", 10.0) .build_scalar(), ObjectBuilder::new() .insert("name", "Item 2") .insert("price", 20.0) .build_scalar(), ]), ), ) .await?; let attrs = ObjectBuilder::new() .insert("templates", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "list") .insert("content", r#"Items: {% for item in inputs.items %} - {{ item.name }}: ${{ item.price }} {% endfor %}"#) .build_scalar(), ])) .insert("render", "list") .insert("output", "/tmp/items.txt") .build_hashmap(); }
Using Tera Includes
Include templates within other templates:
#![allow(unused)] fn main() { let attrs = ObjectBuilder::new() .insert( "templates", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "header") .insert("content", "<header>{{ inputs.site_name }}</header>") .build_scalar(), ObjectBuilder::new() .insert("name", "footer") .insert("content", "<footer>Copyright 2024</footer>") .build_scalar(), ObjectBuilder::new() .insert("name", "page") .insert("content", r#"{% include "header" %} <main>Page content</main> {% include "footer" %}"#) .build_scalar(), ]), ) .insert("render", "page") .insert("output", "/tmp/page.html") .build_hashmap(); }
Using Condition Results in Templates
Reference ConditionCommand results:
#![allow(unused)] fn main() { // Condition first let condition_attrs = ObjectBuilder::new() .insert("branches", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "status") .insert("if", "metrics.health > 80") .insert("then", "healthy") .build_scalar(), ])) .insert("default", "degraded") .build_hashmap(); pipeline .add_namespace(NamespaceBuilder::new("check")) .await? .add_command::<ConditionCommand>("health", &condition_attrs) .await?; // Template using condition result let template_attrs = ObjectBuilder::new() .insert("templates", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "status") .insert("content", r#"System Status: {{ check.health.result }} {% if check.health.matched %} (condition matched at branch {{ check.health.branch_index }}) {% else %} (using default value) {% endif %}"#) .build_scalar(), ])) .insert("render", "status") .insert("output", "/tmp/status.txt") .build_hashmap(); }
Tera Features
TemplateCommand uses the Tera templating engine, which supports:
- Variable substitution: `{{ variable }}`
- Conditionals: `{% if condition %}...{% endif %}`
- Loops: `{% for item in items %}...{% endfor %}`
- Template inheritance: `{% extends "base" %}` and `{% block name %}...{% endblock %}`
- Includes: `{% include "partial" %}`
- Filters: `{{ value | upper }}`, `{{ value | length }}`, etc.
- Built-in functions and operators
See the Tera documentation for complete syntax reference.
Error Handling
TemplateCommand will return an error if:
- A template file specified in `file` does not exist
- Both `content` and `file` are specified for a template (mutually exclusive)
- Neither `content` nor `file` is specified for a template
- The `template_glob` pattern is invalid or cannot be read
- The template specified in `render` does not exist
- The template contains syntax errors (unclosed blocks, invalid expressions)
- Referenced variables do not exist in the scalar store
- The output directory cannot be created
- The output file cannot be written
Directory Creation
TemplateCommand automatically creates parent directories for the output file if they do not exist.
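The effect is the same as calling `std::fs::create_dir_all` on the output path's parent before writing. A minimal std-only sketch of this behavior (the `write_output` helper is illustrative, not the command's actual implementation):

```rust
use std::fs;
use std::io;
use std::path::Path;

// Illustrative sketch: create parent directories if needed, then write
// the file, mirroring TemplateCommand's documented behavior.
fn write_output(path: &Path, contents: &str) -> io::Result<()> {
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?; // no-op if the directories already exist
    }
    fs::write(path, contents)
}

fn main() -> io::Result<()> {
    let out = std::env::temp_dir().join("panopticon_demo/reports/status.txt");
    write_output(&out, "System status: healthy\n")?;
    println!("wrote {}", out.display());
    Ok(())
}
```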
Working with Data
This section covers how data flows through Panopticon pipelines and the tools available for manipulating and accessing that data.
Data Flow Overview
In Panopticon, data flows between commands through two parallel storage systems:
+------------------+ +------------------+ +------------------+
| Command A | | Command B | | Command C |
+------------------+ +------------------+ +------------------+
| | |
v v v
+------------------------------------------------------------------------+
| ExecutionContext |
| +----------------------------+ +----------------------------+ |
| | ScalarStore | | TabularStore | |
| | (JSON-like values) | | (Polars DataFrames) | |
| +----------------------------+ +----------------------------+ |
+------------------------------------------------------------------------+
^ ^ ^
| | |
StorePath refs StorePath refs StorePath refs
The Two Stores
Panopticon maintains two separate data stores during pipeline execution:
| Store | Type Alias | Contents | Use Case |
|---|---|---|---|
| ScalarStore | ScalarValue | JSON-compatible values (strings, numbers, booleans, arrays, objects) | Configuration, metadata, single values, template variables |
| TabularStore | TabularValue | Polars DataFrames | Structured data, CSV/JSON/Parquet files, SQL query results |
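Conceptually, the two stores behave like maps keyed by dotted paths. The following std-only sketch uses simplified stand-ins (the `Scalar`, `Table`, and `Stores` types here are hypothetical; the real stores hold `ScalarValue` and Polars DataFrames behind the `ExecutionContext`):

```rust
use std::collections::HashMap;

// Simplified stand-in for ScalarValue.
#[derive(Debug, PartialEq)]
enum Scalar {
    String(String),
    Number(f64),
    Bool(bool),
}

// Placeholder for a DataFrame: column names plus rows of strings.
struct Table {
    columns: Vec<String>,
    rows: Vec<Vec<String>>,
}

// The two parallel stores, both addressed by dotted paths.
struct Stores {
    scalar: HashMap<String, Scalar>,  // ScalarStore: JSON-like values
    tabular: HashMap<String, Table>,  // TabularStore: tabular data
}

fn main() {
    let mut stores = Stores { scalar: HashMap::new(), tabular: HashMap::new() };

    // A FileCommand-like producer writes under its StorePath...
    stores.tabular.insert(
        "data.load.products.data".to_string(),
        Table { columns: vec!["price".into()], rows: vec![vec!["9.99".into()]] },
    );
    stores.scalar.insert("data.load.products.row_count".to_string(), Scalar::Number(1.0));

    // ...and a downstream command reads by the same dotted path.
    let df = &stores.tabular["data.load.products.data"];
    println!("{} row(s), {} column(s)", df.rows.len(), df.columns.len());
}
```

The key point the sketch illustrates: the same dotted path is both the storage key and, for scalar values, the template variable name.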
StorePath: The Universal Reference
All data in both stores is addressed using StorePath - a dot-separated path that uniquely identifies values:
// Creating paths
let path = StorePath::from_segments(["namespace", "command", "field"]);
let child = path.with_segment("subfield"); // namespace.command.field.subfield
let indexed = path.with_index(0);          // namespace.command.field.0
StorePaths serve as:
- Storage keys: Where commands write their outputs
- Dependency references: How commands declare what data they need
- Template variables: Accessed via `{{ namespace.command.field }}` syntax
- Result accessors: How you retrieve data after pipeline execution
Data Flow Example
Consider a pipeline that loads a CSV file and performs aggregations:
Pipeline Definition:
====================
Namespace: data Namespace: stats
+------------------+ +------------------+
| FileCommand | | AggregateCommand |
| name: "load" | -----------> | source: "data. |
| | | load.products. |
| | | data" |
+------------------+ +------------------+
Data Flow:
==========
1. FileCommand executes:
- Reads products.csv
- Stores DataFrame at: data.load.products.data
- Stores row count at: data.load.products.row_count
2. AggregateCommand executes:
- Retrieves DataFrame from: data.load.products.data
- Computes aggregations
- Stores results at: stats.products.*
Chapter Overview
This section contains three detailed chapters:
Store Paths
Learn the StorePath API for creating, manipulating, and navigating data paths:
- `from_segments()` - Build paths from components
- `with_segment()` - Extend paths with new segments
- `with_index()` - Add numeric indices for iteration
Tera Templating
Master the Tera templating syntax used throughout Panopticon:
- Variable interpolation with `{{ path.to.value }}`
- Filters for transforming values
- Control structures for conditional content
- Template inheritance for complex outputs
Polars DataFrames
Work with tabular data using Polars:
- Understanding `TabularValue` (the DataFrame type alias)
- Accessing DataFrame results from the `TabularStore`
- Export formats: CSV, JSON, Parquet
Quick Reference
Accessing Data During Execution
Commands receive an ExecutionContext that provides access to both stores:
// In a command's execute method:
async fn execute(&self, ctx: &ExecutionContext, source: &StorePath) -> Result<()> {
    // Read scalar values
    let value = ctx.scalar().get(&some_path).await?;

    // Read tabular values
    let df = ctx.tabular().get(&some_path).await?;

    // Template substitution (uses ScalarStore)
    let rendered = ctx.substitute("Hello, {{ user.name }}!").await?;

    // Write results
    ctx.scalar().insert(&source.with_segment("output"), my_value).await?;
    ctx.tabular().insert(&source.with_segment("data"), my_df).await?;

    Ok(())
}
Accessing Data After Execution
After a pipeline completes, use ResultStore to access outputs:
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

// Get results for a specific command
let source = StorePath::from_segments(["namespace", "command"]);
if let Some(cmd_results) = results.get_by_source(&source) {
    // Access scalar results
    if let Some(value) = cmd_results.data_get(&source.with_segment("field")) {
        if let Some((ty, scalar)) = value.as_scalar() {
            println!("Scalar: {:?} = {}", ty, scalar);
        }
    }

    // Access tabular results (exported to disk)
    if let Some(value) = cmd_results.data_get(&source.with_segment("data")) {
        if let Some((path, format, rows, cols)) = value.as_tabular() {
            println!("Table: {} ({} rows x {} cols)", path.display(), rows, cols);
        }
    }
}
Next Steps
Continue to Store Paths to learn the details of the StorePath API.
Store Paths
StorePath is the fundamental addressing mechanism in Panopticon. Every piece of data - whether scalar values or tabular DataFrames - is stored and retrieved using a StorePath.
What is a StorePath?
A StorePath is a sequence of string segments that form a hierarchical path, similar to filesystem paths but using dots as separators:
namespace.command.field.subfield
^ ^ ^ ^
| | | +-- Nested field
| | +--------- Result field
| +---------------- Command name
+------------------------- Namespace name
The path `data.load.products.row_count` represents:
- Namespace: `data`
- Command: `load`
- Field: `products`
- Subfield: `row_count`
Creating StorePaths
from_segments()
Build a StorePath from an iterator of segments:
use panopticon_core::prelude::*;

// From a slice of string literals
let path = StorePath::from_segments(["namespace", "command", "field"]);
assert_eq!(path.to_dotted(), "namespace.command.field");

// From a Vec
let segments = vec!["data", "load", "products"];
let path = StorePath::from_segments(segments);
assert_eq!(path.to_dotted(), "data.load.products");

// From any iterator of Into<String>
let path = StorePath::from_segments(["a", "b", "c"].into_iter());
from_dotted()
Parse a dotted string into a StorePath:
let path = StorePath::from_dotted("config.regions.us-east");
assert_eq!(path.segments(), &["config", "regions", "us-east"]);
Extending StorePaths
StorePaths are immutable by default. Extension methods return new paths:
with_segment()
Add a named segment to create a child path:
let base = StorePath::from_segments(["namespace", "command"]);

// Add a field
let field = base.with_segment("output");
assert_eq!(field.to_dotted(), "namespace.command.output");

// Chain multiple segments
let nested = base
    .with_segment("results")
    .with_segment("summary");
assert_eq!(nested.to_dotted(), "namespace.command.results.summary");
with_index()
Add a numeric index segment (useful for iteration):
let base = StorePath::from_segments(["classify", "region"]);

// First iteration
let iter0 = base.with_index(0);
assert_eq!(iter0.to_dotted(), "classify.region.0");

// Access a field within an iteration
let result = iter0.with_segment("result");
assert_eq!(result.to_dotted(), "classify.region.0.result");
add_segment() (Mutable)
Mutate a path in place:
let mut path = StorePath::from_segments(["namespace"]);
path.add_segment("command");
path.add_segment("field");
assert_eq!(path.to_dotted(), "namespace.command.field");
Inspecting StorePaths
segments()
Get the path segments as a slice:
let path = StorePath::from_segments(["a", "b", "c"]);
assert_eq!(path.segments(), &["a", "b", "c"]);
namespace()
Get the first segment (typically the namespace name):
let path = StorePath::from_segments(["data", "load", "file"]);
assert_eq!(path.namespace(), Some(&"data".to_string()));

let empty = StorePath::default();
assert_eq!(empty.namespace(), None);
to_dotted()
Convert to a dot-separated string:
let path = StorePath::from_segments(["config", "database", "host"]);
assert_eq!(path.to_dotted(), "config.database.host");
starts_with()
Check if a path is a prefix of another:
let parent = StorePath::from_segments(["data", "load"]);
let child = StorePath::from_segments(["data", "load", "products", "data"]);
let other = StorePath::from_segments(["stats", "aggregate"]);

assert!(child.starts_with(&parent));
assert!(!other.starts_with(&parent));
contains()
Check if any segment matches a value:
let path = StorePath::from_segments(["data", "load", "products"]);
assert!(path.contains("load"));
assert!(!path.contains("stats"));
Practical Examples
Example: Iterating Over Object Keys
This example from iterate_object_keys.rs shows how StorePaths work with iterative namespaces:
use panopticon_core::prelude::*; #[tokio::main] async fn main() -> anyhow::Result<()> { let mut pipeline = Pipeline::new(); // Static namespace with region data pipeline.add_namespace( NamespaceBuilder::new("config") .static_ns() .insert("regions", ObjectBuilder::new() .insert("us-east", "Virginia") .insert("us-west", "Oregon") .insert("eu-west", "Ireland") .build_scalar() ), ).await?; // Iterative namespace that loops over region keys let mut handle = pipeline.add_namespace( NamespaceBuilder::new("classify") .iterative() .store_path(StorePath::from_segments(["config", "regions"])) .scalar_object_keys(None, false) .iter_var("region") .index_var("idx"), ).await?; // Add condition command for each iteration handle.add_command::<ConditionCommand>("region", &condition_attrs).await?; // Execute pipeline let completed = pipeline.compile().await?.execute().await?; let results = completed.results(ResultSettings::default()).await?; // Access results per iteration using indexed StorePaths let mut idx = 0; loop { // Build path with iteration index let source = StorePath::from_segments(["classify", "region"]) .with_index(idx); // Try to get results for this iteration let Some(cmd_results) = results.get_by_source(&source) else { break; // No more iterations }; // Access specific fields let result = cmd_results .data_get(&source.with_segment("result")) .and_then(|r| r.as_scalar()) .expect("Expected result"); let matched = cmd_results .data_get(&source.with_segment("matched")) .and_then(|r| r.as_scalar()) .expect("Expected matched"); println!("[{}] {} (matched: {})", idx, result.1, matched.1); idx += 1; } Ok(()) }
Example: Accessing Aggregation Results
#![allow(unused)] fn main() { use panopticon_core::prelude::*; // After pipeline execution... let results = completed.results(ResultSettings::default()).await?; // Build path to the stats command let stats_source = StorePath::from_segments(["stats", "products"]); if let Some(cmd_results) = results.get_by_source(&stats_source) { // Access individual aggregation results let fields = ["row_count", "total_price", "avg_price", "max_quantity"]; for field in fields { let field_path = stats_source.with_segment(field); if let Some(value) = cmd_results.data_get(&field_path) { if let Some((ty, scalar)) = value.as_scalar() { println!("{}: {} ({:?})", field, scalar, ty); } } } } }
StorePath in Templates
StorePaths directly correspond to Tera template variables. The path config.database.host is accessed in templates as:
{{ config.database.host }}
See Tera Templating for more details on template syntax.
StorePath Diagram
StorePath Structure:
====================
StorePath::from_segments(["data", "load", "products", "row_count"])
                             |       |        |           |
                             v       v        v           v
                          +------+------+----------+-----------+
   segments: Vec<String>  |"data"|"load"|"products"|"row_count"|
                          +------+------+----------+-----------+
                              0      1       2           3

to_dotted()  -> "data.load.products.row_count"
namespace()  -> Some("data")
segments()   -> &["data", "load", "products", "row_count"]
Path Operations:
================
base = ["data", "load"]
  |
  +-- with_segment("file")     -> ["data", "load", "file"]
  |
  +-- with_index(0)            -> ["data", "load", "0"]
  |
  +-- with_segment("products")
        |
        +-- with_segment("data") -> ["data", "load", "products", "data"]
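The path operations above can be made concrete with a minimal, hypothetical stand-in built only on the standard library. This `Path` type is not the real `StorePath` API, but `with_segment`, `with_index`, and `to_dotted` behave as the diagram describes: each builder call returns a new path and leaves the base untouched.

```rust
// Minimal stand-in for a StorePath-like type (illustration only, not the real API).
#[derive(Clone, Debug, PartialEq)]
struct Path(Vec<String>);

impl Path {
    fn from_segments<'a, I: IntoIterator<Item = &'a str>>(segs: I) -> Self {
        Path(segs.into_iter().map(String::from).collect())
    }

    // Append a named segment, returning a new path; the base is unchanged.
    fn with_segment(&self, seg: &str) -> Self {
        let mut v = self.0.clone();
        v.push(seg.to_string());
        Path(v)
    }

    // Append an iteration index as a segment.
    fn with_index(&self, idx: usize) -> Self {
        self.with_segment(&idx.to_string())
    }

    // Dotted form used in templates and logs.
    fn to_dotted(&self) -> String {
        self.0.join(".")
    }
}

fn main() {
    let base = Path::from_segments(["data", "load"]);
    assert_eq!(base.with_segment("file").to_dotted(), "data.load.file");
    assert_eq!(base.with_index(0).to_dotted(), "data.load.0");
    let nested = base.with_segment("products").with_segment("data");
    assert_eq!(nested.to_dotted(), "data.load.products.data");
}
```

Because each call clones, a base path can be reused freely when fanning out to many child paths.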
Best Practices
- Use meaningful segment names: paths should be self-documenting.
#![allow(unused)] fn main() { // Good StorePath::from_segments(["users", "fetch", "active_count"]) // Less clear StorePath::from_segments(["u", "f", "ac"]) }
- Build paths incrementally: use with_segment() to build on base paths.
#![allow(unused)] fn main() { let base = StorePath::from_segments(["namespace", "command"]); let output = base.with_segment("output"); let data = base.with_segment("data"); }
- Use with_index() for iteration: makes iteration patterns clear.
#![allow(unused)] fn main() { for i in 0..count { let iter_path = base.with_index(i); // Process iteration... } }
- Store base paths as constants: avoid typos in repeated path construction.
#![allow(unused)] fn main() { const DATA_NS: &[&str] = &["data", "load"]; let base = StorePath::from_segments(DATA_NS.iter().copied()); }
Next Steps
Continue to Tera Templating to learn how StorePaths integrate with template syntax.
Tera Templating
Panopticon uses Tera as its templating engine. Tera provides a powerful, Jinja2-inspired syntax for variable interpolation, filters, and control structures.
How Templates Connect to Data
The ScalarStore holds all scalar values (strings, numbers, booleans, arrays, objects) and serves as the template context. Any value stored via a StorePath becomes available in templates using dot notation:
ScalarStore Contents: Template Access:
===================== ================
inputs.site_name = "My Site" -> {{ inputs.site_name }}
inputs.page_title = "Home" -> {{ inputs.page_title }}
config.debug = true -> {{ config.debug }}
data.load.row_count = 42 -> {{ data.load.row_count }}
Basic Variable Interpolation
Simple Values
Access scalar values using double curly braces:
<h1>{{ inputs.site_name }}</h1>
<p>Processing {{ data.load.row_count }} records</p>
Nested Objects
Navigate into nested structures with dot notation:
#![allow(unused)] fn main() { // Stored as: ObjectBuilder::new() .object("database", ObjectBuilder::new() .insert("host", "localhost") .insert("port", 5432)) .build_scalar() }
<!-- Template -->
Connecting to {{ config.database.host }}:{{ config.database.port }}
Array Access
Access array elements by index:
First item: {{ items.0 }}
Third item: {{ items.2 }}
Filters
Filters transform values using the pipe (|) syntax:
Built-in Filters
<!-- String manipulation -->
{{ name | upper }} <!-- JOHN -->
{{ name | lower }} <!-- john -->
{{ name | capitalize }} <!-- John -->
{{ name | title }} <!-- John Doe -->
{{ text | trim }} <!-- removes whitespace -->
{{ text | truncate(length=20) }}
<!-- Number formatting -->
{{ price | round }} <!-- 10 -->
{{ price | round(precision=2) }}<!-- 9.99 -->
<!-- Collections -->
{{ items | length }} <!-- array length -->
{{ items | first }} <!-- first element -->
{{ items | last }} <!-- last element -->
{{ items | reverse }} <!-- reversed array -->
{{ items | join(sep=", ") }} <!-- comma-separated -->
<!-- Default values -->
{{ maybe_null | default(value="N/A") }}
<!-- JSON encoding -->
{{ object | json_encode() }}
{{ object | json_encode(pretty=true) }}
<!-- Escaping -->
{{ html_content | safe }} <!-- no escaping -->
{{ user_input | escape }} <!-- HTML escape -->
Filter Chaining
Chain multiple filters together:
{{ name | trim | upper | truncate(length=10) }}
Control Structures
Conditionals
{% if config.debug %}
<div class="debug-panel">Debug mode enabled</div>
{% endif %}
{% if user.role == "admin" %}
<a href="/admin">Admin Panel</a>
{% elif user.role == "editor" %}
<a href="/edit">Edit Content</a>
{% else %}
<span>Welcome, guest</span>
{% endif %}
Loops
Iterate over arrays:
<ul>
{% for item in inputs.nav_items %}
<li><a href="{{ item.url }}">{{ item.label }}</a></li>
{% endfor %}
</ul>
Loop variables:
{% for item in items %}
{{ loop.index }} <!-- 1-indexed position -->
{{ loop.index0 }} <!-- 0-indexed position -->
{{ loop.first }} <!-- true if first iteration -->
{{ loop.last }} <!-- true if last iteration -->
{% endfor %}
Iterate over object key-value pairs:
{% for key, value in config.settings %}
{{ key }}: {{ value }}
{% endfor %}
Template Inheritance
Tera supports template inheritance for building complex layouts:
Base Template (base.tera)
<!DOCTYPE html>
<html>
<head>
<title>{% block title %}Default Title{% endblock %}</title>
</head>
<body>
{% block header %}{% endblock %}
<main>
{% block content %}{% endblock %}
</main>
<footer>
<p>Generated by Panopticon</p>
</footer>
</body>
</html>
Child Template (page.tera)
{% extends "base.tera" %}
{% block title %}{{ inputs.page_title }} - {{ inputs.site_name }}{% endblock %}
{% block header %}
{% include "header.tera" %}
{% endblock %}
{% block content %}
<article>
<h2>{{ inputs.page_title }}</h2>
<p>{{ inputs.page_content }}</p>
</article>
{% endblock %}
Include Template (header.tera)
<header>
<h1>{{ inputs.site_name }}</h1>
<nav>
{% for item in inputs.nav_items %}
<a href="{{ item.url }}">{{ item.label }}</a>
{% endfor %}
</nav>
</header>
Using Templates in Panopticon
TemplateCommand
The TemplateCommand renders Tera templates:
#![allow(unused)] fn main() { use panopticon_core::prelude::*; let mut pipeline = Pipeline::new(); // Add input data pipeline.add_namespace( NamespaceBuilder::new("inputs") .static_ns() .insert("site_name", ScalarValue::String("Panopticon Demo".into())) .insert("page_title", ScalarValue::String("Getting Started".into())) .insert("page_content", ScalarValue::String("Welcome!".into())) .insert("nav_items", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("label", "Home") .insert("url", "/") .build_scalar(), ObjectBuilder::new() .insert("label", "Docs") .insert("url", "/docs") .build_scalar(), ])), ).await?; // Configure template command let template_attrs = ObjectBuilder::new() .insert("template_glob", "/path/to/templates/**/*.tera") .insert("render", "page.tera") .insert("output", "/output/page.html") .insert("capture", true) // Also store rendered content in ScalarStore .build_hashmap(); pipeline .add_namespace(NamespaceBuilder::new("render")) .await? .add_command::<TemplateCommand>("page", &template_attrs) .await?; }
Inline Template Substitution
Use ctx.substitute() for inline template rendering:
#![allow(unused)] fn main() { // In command execution let greeting = ctx.substitute("Hello, {{ user.name }}!").await?; }
Condition Expressions
The ConditionCommand uses Tera expressions for branching:
#![allow(unused)] fn main() { let condition_attrs = ObjectBuilder::new() .insert("branches", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "is_us") .insert("if", "region is starting_with(\"us-\")") .insert("then", "Region {{ region }} is in the US") .build_scalar(), ObjectBuilder::new() .insert("name", "is_eu") .insert("if", "region is starting_with(\"eu-\")") .insert("then", "Region {{ region }} is in the EU") .build_scalar(), ])) .insert("default", "Region {{ region }} is in an unknown area") .build_hashmap(); }
Tera Tests
Tera "tests" check conditions on values (used with is keyword):
{% if value is defined %}...{% endif %}
{% if value is undefined %}...{% endif %}
{% if value is odd %}...{% endif %}
{% if value is even %}...{% endif %}
{% if text is containing("needle") %}...{% endif %}
{% if text is starting_with("prefix") %}...{% endif %}
{% if text is ending_with("suffix") %}...{% endif %}
{% if text is matching("regex") %}...{% endif %}
Data Flow Diagram
Template Rendering Flow:
========================
+-------------------+
| ScalarStore |
| |
| inputs.site_name |
| inputs.page_title |
| inputs.nav_items |
| config.debug |
+--------+----------+
|
v
+-------------------+
| Tera Context | <- ScalarStore becomes the template context
+--------+----------+
|
v
+-------------------+
| Template File |
| |
| {{ inputs.xxx }} |
| {% for item %} |
| {% if config %} |
+--------+----------+
|
v
+-------------------+
| Rendered Output |
| |
| <h1>My Site</h1> |
| <p>Welcome!</p> |
+-------------------+
Common Patterns
Conditional CSS Classes
<div class="alert {% if level == "error" %}alert-danger{% elif level == "warning" %}alert-warning{% else %}alert-info{% endif %}">
{{ message }}
</div>
Building URLs with Parameters
<a href="/users/{{ user.id }}?tab={{ tab | default(value="overview") }}">
View Profile
</a>
JSON Data Embedding
<script>
const config = {{ config | json_encode(pretty=true) | safe }};
</script>
Iteration with Separators
{% for tag in tags %}{{ tag }}{% if not loop.last %}, {% endif %}{% endfor %}
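The separator loop above is the template-level analogue of a join. When the same value is assembled in host code rather than in a template, std's join produces the identical output:

```rust
fn main() {
    let tags = ["rust", "tera", "polars"];
    // Equivalent of the loop-with-separator template pattern.
    let joined = tags.join(", ");
    assert_eq!(joined, "rust, tera, polars");
    println!("{joined}");
}
```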
Troubleshooting
Common Errors
| Error | Cause | Solution |
|---|---|---|
| Variable not found | Path doesn't exist in ScalarStore | Check the StorePath and ensure data is stored before the template renders |
| Cannot iterate over | Value is not an array | Verify the value type with {% if items is iterable %} |
| Filter not found | Typo in the filter name | Check the Tera documentation for correct filter names |
Debugging Tips
- Check available data: use {{ __tera_context }} to dump all available variables (if enabled)
- Use default values: prevent errors from missing data
{{ maybe_missing | default(value="fallback") }}
- Test variable existence:
{% if my_var is defined %} {{ my_var }} {% else %} Variable not set {% endif %}
Next Steps
Continue to Polars DataFrames to learn about working with tabular data.
Polars DataFrames
Panopticon uses Polars for tabular data operations. Polars is a DataFrame library written in Rust that delivers high performance for data manipulation tasks.
TabularValue and TabularStore
Type Definitions
#![allow(unused)] fn main() { // TabularValue is a type alias for Polars DataFrame pub type TabularValue = polars::prelude::DataFrame; // TabularStore manages DataFrames during pipeline execution pub struct TabularStore { store: Arc<RwLock<HashMap<String, TabularValue>>>, } }
Store Operations
The TabularStore provides async methods for managing DataFrames:
#![allow(unused)] fn main() { // Insert a DataFrame ctx.tabular().insert(&path, dataframe).await?; // Retrieve a DataFrame let df: Option<TabularValue> = ctx.tabular().get(&path).await?; // Remove a DataFrame let removed: Option<TabularValue> = ctx.tabular().remove(&path).await?; // List all stored paths let keys: Vec<String> = ctx.tabular().keys().await; }
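The shape of this store can be mimicked with std primitives alone. The sketch below is synchronous and uses String payloads in place of DataFrames; the real TabularStore wraps the same Arc<RwLock<HashMap>> structure behind async methods, but the insert/get/remove/keys semantics are the same.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Synchronous, simplified stand-in for TabularStore (illustration only).
#[derive(Clone)]
struct Store {
    inner: Arc<RwLock<HashMap<String, String>>>,
}

impl Store {
    fn new() -> Self {
        Store { inner: Arc::new(RwLock::new(HashMap::new())) }
    }

    // Insert a value under a dotted path, replacing any previous entry.
    fn insert(&self, path: &str, value: String) {
        self.inner.write().unwrap().insert(path.to_string(), value);
    }

    // Retrieve a clone of the stored value, if present.
    fn get(&self, path: &str) -> Option<String> {
        self.inner.read().unwrap().get(path).cloned()
    }

    // Remove and return the stored value, if present.
    fn remove(&self, path: &str) -> Option<String> {
        self.inner.write().unwrap().remove(path)
    }

    // List all stored paths.
    fn keys(&self) -> Vec<String> {
        self.inner.read().unwrap().keys().cloned().collect()
    }
}

fn main() {
    let store = Store::new();
    store.insert("data.load.products.data", "df-placeholder".to_string());
    assert_eq!(store.get("data.load.products.data").as_deref(), Some("df-placeholder"));
    assert_eq!(store.remove("data.load.products.data").as_deref(), Some("df-placeholder"));
    assert!(store.keys().is_empty());
}
```

Cloning the Store clones only the Arc, so several tasks can share one underlying map.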
Loading Tabular Data
FileCommand
Load data from CSV, JSON, or Parquet files:
#![allow(unused)] fn main() { let file_attrs = ObjectBuilder::new() .insert("files", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "products") .insert("file", "/path/to/products.csv") .insert("format", "csv") .build_scalar(), ObjectBuilder::new() .insert("name", "orders") .insert("file", "/path/to/orders.parquet") .insert("format", "parquet") .build_scalar(), ])) .build_hashmap(); pipeline .add_namespace(NamespaceBuilder::new("data")) .await? .add_command::<FileCommand>("load", &file_attrs) .await?; }
After execution, DataFrames are stored at paths like:
- data.load.products.data - The DataFrame
- data.load.products.row_count - Number of rows (scalar)
- data.load.orders.data - Another DataFrame
SqlCommand
Query data using SQL:
#![allow(unused)] fn main() { let sql_attrs = ObjectBuilder::new() .insert("query", "SELECT * FROM products WHERE price > 100") .insert("sources", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "products") .insert("path", "data.load.products.data") .build_scalar(), ])) .build_hashmap(); }
Aggregating Data
AggregateCommand
Perform statistical aggregations on DataFrames:
#![allow(unused)] fn main() { let agg_attrs = ObjectBuilder::new() .insert("source", "data.load.products.data") .insert("aggregations", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "row_count") .insert("op", "count") .build_scalar(), ObjectBuilder::new() .insert("name", "total_price") .insert("column", "price") .insert("op", "sum") .build_scalar(), ObjectBuilder::new() .insert("name", "avg_price") .insert("column", "price") .insert("op", "mean") .build_scalar(), ObjectBuilder::new() .insert("name", "max_quantity") .insert("column", "quantity") .insert("op", "max") .build_scalar(), ObjectBuilder::new() .insert("name", "min_quantity") .insert("column", "quantity") .insert("op", "min") .build_scalar(), ObjectBuilder::new() .insert("name", "median_price") .insert("column", "price") .insert("op", "median") .build_scalar(), ])) .build_hashmap(); pipeline .add_namespace(NamespaceBuilder::new("stats")) .await? .add_command::<AggregateCommand>("products", &agg_attrs) .await?; }
Supported aggregation operations:
- count - Row count (no column required)
- sum - Sum of column values
- mean - Average of column values
- min - Minimum value
- max - Maximum value
- median - Median value
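The semantics of these ops can be illustrated over a plain slice of values rather than a DataFrame column. This is a sketch of what each op computes, not the AggregateCommand implementation:

```rust
// Sketch of the per-op semantics over a plain column of f64 values.
fn aggregate(op: &str, column: &[f64]) -> Option<f64> {
    let mut sorted = column.to_vec();
    sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
    match op {
        "count" => Some(column.len() as f64),
        "sum" => Some(column.iter().sum()),
        "mean" if !column.is_empty() => {
            Some(column.iter().sum::<f64>() / column.len() as f64)
        }
        "min" => sorted.first().copied(),
        "max" => sorted.last().copied(),
        "median" if !sorted.is_empty() => {
            // Even-length columns take the midpoint of the two middle values.
            let mid = sorted.len() / 2;
            if sorted.len() % 2 == 0 {
                Some((sorted[mid - 1] + sorted[mid]) / 2.0)
            } else {
                Some(sorted[mid])
            }
        }
        _ => None, // Unknown op, or mean/median over an empty column.
    }
}

fn main() {
    let prices = [9.5, 19.0, 3.5, 12.0];
    assert_eq!(aggregate("count", &prices), Some(4.0));
    assert_eq!(aggregate("sum", &prices), Some(44.0));
    assert_eq!(aggregate("mean", &prices), Some(11.0));
    assert_eq!(aggregate("min", &prices), Some(3.5));
    assert_eq!(aggregate("max", &prices), Some(19.0));
    assert_eq!(aggregate("median", &prices), Some(10.75));
}
```

In the real command, each named aggregation lands in the ScalarStore under the command's path (e.g., stats.products.total_price).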
Accessing DataFrame Results
During Execution
Commands can access DataFrames from the execution context:
#![allow(unused)] fn main() { async fn execute(&self, ctx: &ExecutionContext, source: &StorePath) -> Result<()> { // Get DataFrame from TabularStore let df_path = StorePath::from_segments(["data", "load", "products", "data"]); let df = ctx.tabular() .get(&df_path) .await? .context("DataFrame not found")?; // Work with the DataFrame println!("Columns: {:?}", df.get_column_names()); println!("Shape: {:?}", df.shape()); Ok(()) } }
After Execution
The ResultStore provides access to both scalar and tabular results:
#![allow(unused)] fn main() { let completed = pipeline.compile().await?.execute().await?; let results = completed.results(ResultSettings::default()).await?; for cmd_results in results.iter() { println!("Source: {}", cmd_results.source().to_dotted()); // Iterate over metadata for (path, value) in cmd_results.meta_iter() { println!(" [meta] {} = {}", path.to_dotted(), value); } // Iterate over data results for (path, value) in cmd_results.data_iter() { match value.as_scalar() { Some((ty, val)) => { println!(" [data] {} = {} ({:?})", path.to_dotted(), val, ty); } None => { // Tabular results are exported to disk let (file_path, format, rows, cols) = value.as_tabular().expect("Expected tabular result"); println!( " [data] {} => {} ({} rows x {} cols)", path.to_dotted(), file_path.display(), rows, cols ); } } } } }
Export Formats
DataFrames are automatically exported to disk when accessing results. Configure the format via ResultSettings:
#![allow(unused)] fn main() { // Export as CSV let settings = ResultSettings::new() .with_format(TabularFormat::Csv) .with_output_path(PathBuf::from("/output/directory")); // Export as JSON let settings = ResultSettings::new() .with_format(TabularFormat::Json); // Export as Parquet (default) let settings = ResultSettings::new() .with_format(TabularFormat::Parquet); let results = completed.results(settings).await?; }
TabularFormat Options
| Format | Extension | Use Case |
|---|---|---|
| TabularFormat::Csv | .csv | Human-readable, spreadsheet compatible |
| TabularFormat::Json | .json | Web applications, APIs |
| TabularFormat::Parquet | .parquet | Efficient storage, data pipelines |
Data Flow Diagram
Tabular Data Flow:
==================
+------------------+
| Input Files |
| - products.csv |
| - orders.json |
+--------+---------+
|
v
+------------------+
| FileCommand |
+--------+---------+
|
v
+------------------+
| TabularStore |
| |
| data.load. |
| products.data | <-- DataFrame
| orders.data | <-- DataFrame
+--------+---------+
|
+----+----+
| |
v v
+-------+ +------------+
| SQL | | Aggregate |
+---+---+ +-----+------+
| |
v v
+------------------+
| TabularStore |
| (updated) |
+--------+---------+
|
v
+------------------+
| ResultStore |
+--------+---------+
|
v
+------------------+
| Output Files |
| - .csv |
| - .json |
| - .parquet |
+------------------+
Working with Polars Directly
When building custom commands, you can use Polars DataFrame operations:
#![allow(unused)] fn main() { use polars::prelude::*; // Create a DataFrame let df = df!( "name" => &["Alice", "Bob", "Charlie"], "age" => &[30, 25, 35], "city" => &["NYC", "LA", "Chicago"] )?; // Filter rows let filtered = df.clone().lazy() .filter(col("age").gt(lit(28))) .collect()?; // Select columns let selected = df.clone().lazy() .select([col("name"), col("city")]) .collect()?; // Group and aggregate let grouped = df.clone().lazy() .group_by([col("city")]) .agg([ col("age").mean().alias("avg_age"), col("name").count().alias("count"), ]) .collect()?; // Store in TabularStore ctx.tabular().insert(&source.with_segment("result"), filtered).await?; }
Complete Example
use panopticon_core::prelude::*; use std::path::PathBuf; #[tokio::main] async fn main() -> anyhow::Result<()> { let output_dir = tempfile::tempdir()?; let mut pipeline = Pipeline::new(); // --- Load product data --- let file_attrs = ObjectBuilder::new() .insert("files", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "products") .insert("file", "/path/to/products.csv") .insert("format", "csv") .build_scalar(), ])) .build_hashmap(); pipeline .add_namespace(NamespaceBuilder::new("data")) .await? .add_command::<FileCommand>("load", &file_attrs) .await?; // --- Aggregate: compute statistics --- let agg_attrs = ObjectBuilder::new() .insert("source", "data.load.products.data") .insert("aggregations", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "row_count") .insert("op", "count") .build_scalar(), ObjectBuilder::new() .insert("name", "total_price") .insert("column", "price") .insert("op", "sum") .build_scalar(), ])) .build_hashmap(); pipeline .add_namespace(NamespaceBuilder::new("stats")) .await? .add_command::<AggregateCommand>("products", &agg_attrs) .await?; // --- Execute with custom output path --- let completed = pipeline.compile().await?.execute().await?; let settings = ResultSettings::new() .with_output_path(output_dir.path().to_path_buf()) .with_format(TabularFormat::Json); let results = completed.results(settings).await?; // --- Access results --- println!("=== Result Store ({} command(s)) ===\n", results.len()); for cmd_results in results.iter() { println!("Source: {}", cmd_results.source().to_dotted()); for (path, value) in cmd_results.data_iter() { match value.as_scalar() { Some((ty, val)) => { println!(" {} = {} ({:?})", path.to_dotted(), val, ty); } None => { let (file_path, fmt, rows, cols) = value.as_tabular().expect("Expected tabular"); println!( " {} => {} ({} rows x {} cols)", path.to_dotted(), file_path.display(), rows, cols ); } } } } Ok(()) }
Next Steps
Explore Pipeline Patterns to learn about iteration, conditional execution, and advanced pipeline techniques.
Pipeline Patterns
This section covers common patterns and recipes for building effective Panopticon pipelines. Each pattern addresses a specific problem with a concrete solution you can adapt to your use case.
Pattern Overview
| Pattern | Problem | Solution |
|---|---|---|
| Iteration | Process each item in a collection | Use iterative namespaces with iter_var and index_var |
| Conditional Execution | Skip commands based on runtime conditions | Use the when attribute with Tera expressions |
| Pipeline Editing | Add stages to an already-executed pipeline | Use .edit() to return to Draft state |
| Result Access | Retrieve and export pipeline outputs | Configure ResultSettings and iterate ResultStore |
When to Use These Patterns
Iteration
Use iterative namespaces when you need to:
- Process each key in a configuration object
- Apply the same operation to every item in an array
- Loop over unique values in a DataFrame column
- Split a string and handle each segment
Example scenario: You have a configuration object with region keys (us-east, us-west, eu-west) and need to run a classification command for each region.
Conditional Execution
Use the when attribute when you need to:
- Feature-flag parts of your pipeline
- Skip expensive operations when their inputs are empty
- Create debug-only or production-only commands
- Short-circuit processing based on earlier results
Example scenario: You have a feature flag in your configuration, and certain commands should only execute when that flag is enabled.
Pipeline Editing
Use .edit() when you need to:
- Add follow-up analysis after initial exploration
- Build pipelines incrementally based on intermediate results
- Implement REPL-style workflows
- Reuse expensive data loading across multiple analyses
Example scenario: You loaded a large dataset and ran an initial query. Now you want to add aggregation commands without re-loading the data.
Result Access
Configure ResultSettings when you need to:
- Export tabular results to a specific directory
- Choose output format (CSV, JSON, Parquet)
- Exclude certain commands from result collection
- Iterate over all results programmatically
Example scenario: After pipeline execution, you want to write all DataFrames to a temporary directory as Parquet files and print a summary of all computed metrics.
Combining Patterns
These patterns compose naturally. A real-world pipeline might:
- Load data into a static namespace for configuration
- Use an iterative namespace to process each configuration key
- Apply when conditions to skip processing for disabled regions
- Call .edit() to add aggregation after reviewing initial results
- Export all results with custom ResultSettings
#![allow(unused)] fn main() { // Pseudocode showing pattern composition let mut pipeline = Pipeline::new(); // Static config (Pattern: static namespaces) pipeline.add_namespace(NamespaceBuilder::new("config").static_ns() .insert("regions", regions_object) .insert("debug_mode", ScalarValue::Bool(false)) ).await?; // Iterative processing (Pattern: iteration) let mut handle = pipeline.add_namespace( NamespaceBuilder::new("process") .iterative() .store_path(StorePath::from_segments(["config", "regions"])) .scalar_object_keys(None, false) .iter_var("region") ).await?; // Conditional command (Pattern: when) handle.add_command::<ExpensiveCommand>("analyze", &ObjectBuilder::new() .insert("when", "config.debug_mode == false") // Skip in debug mode .build_hashmap() ).await?; // Execute first pass let completed = pipeline.compile().await?.execute().await?; // Add more analysis (Pattern: pipeline editing) let mut pipeline = completed.edit(); pipeline.add_namespace(NamespaceBuilder::new("summary")).await? .add_command::<AggregateCommand>("totals", &agg_attrs).await?; // Re-execute and collect results (Pattern: result access) let completed = pipeline.compile().await?.execute().await?; let results = completed.results( ResultSettings::new() .with_output_path(output_dir) .with_format(TabularFormat::Parquet) ).await?; }
The following pages dive into each pattern with complete, runnable examples.
Iteration
Problem: You have a collection of items (object keys, array elements, or DataFrame column values) and need to run the same commands for each item.
Solution: Create an iterative namespace that loops over the collection, exposing each item via iter_var and its position via index_var.
How Iterative Namespaces Work
When you create an iterative namespace, Panopticon:
- Resolves the source collection from the data store
- Executes all commands in the namespace once per item
- Stores results with an index suffix (e.g., classify.region[0], classify.region[1])
- Exposes the current item and index as template variables
Iterator Types
Panopticon supports four iterator types:
| Type | Source | Iterates Over |
|---|---|---|
| scalar_object_keys | JSON object | Keys of the object |
| scalar_array | JSON array | Elements of the array |
| string_split | String value | Segments split by delimiter |
| tabular_column | DataFrame column | Unique values in the column |
Basic Pattern: Object Keys
The most common pattern is iterating over the keys of a configuration object.
use panopticon_core::prelude::*; #[tokio::main] async fn main() -> anyhow::Result<()> { let mut pipeline = Pipeline::new(); // Create a static namespace with an object to iterate over pipeline .add_namespace( NamespaceBuilder::new("config").static_ns().insert( "regions", ObjectBuilder::new() .insert("us-east", "Virginia") .insert("us-west", "Oregon") .insert("eu-west", "Ireland") .build_scalar(), ), ) .await?; // Create an iterative namespace that loops over region keys let mut handle = pipeline .add_namespace( NamespaceBuilder::new("classify") .iterative() .store_path(StorePath::from_segments(["config", "regions"])) .scalar_object_keys(None, false) // All keys, no exclusions .iter_var("region") // Current key available as {{ region }} .index_var("idx"), // Current index available as {{ idx }} ) .await?; // This command runs once per region key let attrs = ObjectBuilder::new() .insert( "branches", ScalarValue::Array(vec![ ObjectBuilder::new() .insert("name", "is_us") .insert("if", "region is starting_with(\"us-\")") .insert("then", "Region {{ region }} is in the US") .build_scalar(), ObjectBuilder::new() .insert("name", "is_eu") .insert("if", "region is starting_with(\"eu-\")") .insert("then", "Region {{ region }} is in the EU") .build_scalar(), ]), ) .insert("default", "Region {{ region }} is in an unknown area") .build_hashmap(); handle .add_command::<ConditionCommand>("check", &attrs) .await?; // Execute the pipeline let completed = pipeline.compile().await?.execute().await?; let results = completed.results(ResultSettings::default()).await?; // Access results by index for idx in 0.. { let source = StorePath::from_segments(["classify", "check"]).with_index(idx); let Some(cmd_results) = results.get_by_source(&source) else { break; // No more iterations }; let result = cmd_results .data_get(&source.with_segment("result")) .and_then(|r| r.as_scalar()) .expect("Expected result"); println!("[{}] {}", idx, result.1); } Ok(()) }
Output:
[0] Region us-east is in the US
[1] Region us-west is in the US
[2] Region eu-west is in the EU
Key Concepts
iter_var and index_var
These methods define the template variable names used during iteration:
#![allow(unused)] fn main() { .iter_var("region") // {{ region }} contains the current item .index_var("idx") // {{ idx }} contains 0, 1, 2, ... }
If not specified, the defaults are:
- iter_var: "item"
- index_var: "index"
These variables are available in:
- Tera template expressions in command attributes
- The when condition for conditional execution
- Any attribute that supports Tera substitution
store_path
The store_path points to the collection in the scalar or tabular store:
#![allow(unused)] fn main() { .store_path(StorePath::from_segments(["config", "regions"])) }
This path must exist when the pipeline executes. If it does not, execution fails with an error.
Result Indexing
Iterative namespace results are indexed by iteration number. To access them:
#![allow(unused)] fn main() { // Build the base path let base = StorePath::from_segments(["namespace", "command"]); // Access specific iteration let iteration_0 = base.with_index(0); // namespace.command[0] let iteration_1 = base.with_index(1); // namespace.command[1] // Get results let results_0 = results.get_by_source(&iteration_0); }
Filtering Object Keys
You can filter which keys to iterate over:
#![allow(unused)] fn main() { // Only iterate over specific keys .scalar_object_keys(Some(vec!["us-east".to_string(), "eu-west".to_string()]), false) // Exclude specific keys (iterate over all except these) .scalar_object_keys(Some(vec!["us-west".to_string()]), true) // Iterate over all keys .scalar_object_keys(None, false) }
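The (keys, exclude) pair behaves like an include/exclude filter over the object's keys. The sketch below illustrates that selection logic in plain Rust; select_keys is a hypothetical helper, not part of the Panopticon API:

```rust
// Hypothetical illustration of (filter, exclude) key selection (not the real API).
fn select_keys<'a>(all: &[&'a str], filter: Option<&[&str]>, exclude: bool) -> Vec<&'a str> {
    match filter {
        // None: iterate over every key.
        None => all.to_vec(),
        // Some(list): keep keys in the list (exclude = false),
        // or keys NOT in the list (exclude = true).
        Some(list) => all
            .iter()
            .copied()
            .filter(|k| list.contains(k) != exclude)
            .collect(),
    }
}

fn main() {
    let keys = ["us-east", "us-west", "eu-west"];
    assert_eq!(select_keys(&keys, None, false), ["us-east", "us-west", "eu-west"]);
    assert_eq!(select_keys(&keys, Some(&["us-east", "eu-west"]), false), ["us-east", "eu-west"]);
    assert_eq!(select_keys(&keys, Some(&["us-west"]), true), ["us-east", "eu-west"]);
}
```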
Iterating Over Arrays
To iterate over array elements instead of object keys:
#![allow(unused)] fn main() { pipeline .add_namespace( NamespaceBuilder::new("config").static_ns().insert( "items", ScalarValue::Array(vec![ ScalarValue::String("apple".to_string()), ScalarValue::String("banana".to_string()), ScalarValue::String("cherry".to_string()), ]), ), ) .await?; let mut handle = pipeline .add_namespace( NamespaceBuilder::new("process") .iterative() .store_path(StorePath::from_segments(["config", "items"])) .scalar_array(None) // All elements .iter_var("fruit"), ) .await?; }
With a range to limit iterations:
#![allow(unused)] fn main() { .scalar_array(Some((0, 2))) // Only first two elements (indices 0 and 1) }
Iterating Over String Segments
Split a string and iterate over the parts:
#![allow(unused)] fn main() { pipeline .add_namespace( NamespaceBuilder::new("config").static_ns() .insert("path", ScalarValue::String("/usr/local/bin".to_string())), ) .await?; let mut handle = pipeline .add_namespace( NamespaceBuilder::new("segments") .iterative() .store_path(StorePath::from_segments(["config", "path"])) .string_split("/") // Split on "/" .iter_var("segment"), ) .await?; }
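For comparison, a plain std split on the same string yields a leading empty segment for an absolute path; whether Panopticon's string_split skips empty segments is an implementation detail worth checking against your own run. The sketch filters empties out, which matches the segments you would expect to iterate over:

```rust
fn main() {
    // A plain split('/') on an absolute path yields a leading empty segment.
    let raw: Vec<&str> = "/usr/local/bin".split('/').collect();
    assert_eq!(raw, ["", "usr", "local", "bin"]);

    // Filtering empties gives the meaningful path segments.
    let segments: Vec<&str> = "/usr/local/bin"
        .split('/')
        .filter(|s| !s.is_empty())
        .collect();
    assert_eq!(segments, ["usr", "local", "bin"]);
}
```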
Iterating Over DataFrame Columns
Extract unique values from a DataFrame column:
#![allow(unused)] fn main() { // Assuming data.load.users.data contains a DataFrame with a "department" column let mut handle = pipeline .add_namespace( NamespaceBuilder::new("by_dept") .iterative() .store_path(StorePath::from_segments(["data", "load", "users", "data"])) .tabular_column("department", None) // Unique values from "department" .iter_var("dept"), ) .await?; }
This iterates over the unique, non-null values in the specified column. Use a range to limit:
#![allow(unused)] fn main() { .tabular_column("department", Some((0, 5))) // First 5 unique values }
Best Practices
Use Descriptive Variable Names
Choose iter_var names that reflect what you are iterating over:
#![allow(unused)] fn main() { // Good: clear what we're iterating .iter_var("region") .iter_var("user_id") .iter_var("filename") // Avoid: generic names .iter_var("item") .iter_var("x") }
Keep Iteration Counts Reasonable
Each iteration creates separate command executions and results. For large collections, consider:
- Filtering with scalar_object_keys(Some(keys), false)
- Using ranges with scalar_array(Some((start, end)))
- Pre-filtering data with SqlCommand before iteration
Access Results Systematically
When iterating over results, use a loop that terminates when get_by_source returns None:
#![allow(unused)] fn main() { let mut idx = 0; loop { let source = StorePath::from_segments(["ns", "cmd"]).with_index(idx); let Some(results) = store.get_by_source(&source) else { break; }; // Process results... idx += 1; } }
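The same terminate-on-None shape, shown self-contained against a plain HashMap standing in for the result store (keys here are dotted paths with an index segment; the real store is queried by StorePath):

```rust
use std::collections::HashMap;

fn main() {
    // Stand-in for the result store: dotted paths with a trailing index.
    let mut store = HashMap::new();
    store.insert("ns.cmd.0".to_string(), "first");
    store.insert("ns.cmd.1".to_string(), "second");

    let mut collected = Vec::new();
    let mut idx = 0;
    loop {
        let key = format!("ns.cmd.{idx}");
        // Stop as soon as an index has no results.
        let Some(value) = store.get(&key) else { break };
        collected.push(*value);
        idx += 1;
    }
    assert_eq!(collected, ["first", "second"]);
}
```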
This pattern handles any number of iterations without hardcoding the count.
Conditional Execution
Problem: You want certain commands to execute only when specific conditions are met at runtime.
Solution: Use the when attribute with a Tera expression. If it evaluates to a falsy value, the command is skipped.
How the when Attribute Works
Every command in Panopticon supports an optional when attribute:
- Before executing the command, the runtime evaluates the `when` expression
- If the result is falsy (`false`, `null`, empty string, `0`), the command is skipped
- Skipped commands have `status = "skipped"` in their metadata
- Data results are absent for skipped commands
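The skip decision reduces to a small truthiness check. A self-contained sketch of those falsiness rules (using a hypothetical `Scalar` enum, not Panopticon's actual `ScalarValue` type):

```rust
// Hypothetical stand-in for a scalar value, used only to illustrate
// the falsiness rules the `when` attribute applies.
enum Scalar {
    Bool(bool),
    Int(i64),
    Str(String),
    Null,
}

/// Returns true when a `when` result would cause the command to be
/// skipped: false, null, empty string, or 0 are all falsy.
fn is_falsy(v: &Scalar) -> bool {
    match v {
        Scalar::Bool(b) => !b,
        Scalar::Int(n) => *n == 0,
        Scalar::Str(s) => s.is_empty(),
        Scalar::Null => true,
    }
}

fn main() {
    assert!(is_falsy(&Scalar::Null));
    assert!(is_falsy(&Scalar::Int(0)));
    assert!(!is_falsy(&Scalar::Str("yes".to_string())));
    println!("falsy checks hold");
}
```

Any non-empty string, non-zero number, or `true` lets the command run.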
Basic Pattern: Feature Flags
The most common use case is feature-flagging parts of your pipeline.
```rust
use panopticon_core::prelude::*;

async fn run_with_feature_flag(enabled: bool) -> anyhow::Result<()> {
    let mut pipeline = Pipeline::with_services(PipelineServices::defaults());

    // Static namespace with configuration
    pipeline
        .add_namespace(
            NamespaceBuilder::new("inputs")
                .static_ns()
                .insert("feature_enabled", ScalarValue::Bool(enabled))
                .insert("user_name", ScalarValue::String("Alice".to_string())),
        )
        .await?;

    // Command with `when` guard
    let attrs = ObjectBuilder::new()
        .insert("when", "inputs.feature_enabled") // <-- The condition
        .insert(
            "branches",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "greeting")
                    .insert("if", "true")
                    .insert("then", "Hello, {{ inputs.user_name }}! Feature is active.")
                    .build_scalar(),
            ]),
        )
        .insert("default", "Fallback message")
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("example"))
        .await?
        .add_command::<ConditionCommand>("greeting", &attrs)
        .await?;

    // Execute
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;

    // Check the status
    let source = StorePath::from_segments(["example", "greeting"]);
    let cmd_results = results.get_by_source(&source).expect("Expected results");
    let status = cmd_results
        .meta_get(&source.with_segment("status"))
        .expect("Expected status");
    println!("status = {}", status);

    // Data is only present when the command executed
    if let Some(result) = cmd_results
        .data_get(&source.with_segment("result"))
        .and_then(|r| r.as_scalar())
    {
        println!("result = {}", result.1);
    } else {
        println!("(no data - command was skipped)");
    }

    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    println!("=== Feature flag: TRUE ===");
    run_with_feature_flag(true).await?;

    println!("\n=== Feature flag: FALSE ===");
    run_with_feature_flag(false).await?;

    Ok(())
}
```
Output:
=== Feature flag: TRUE ===
status = "completed"
result = Hello, Alice! Feature is active.
=== Feature flag: FALSE ===
status = "skipped"
(no data - command was skipped)
Tera Expression Syntax
The `when` value is a Tera expression that has access to the entire scalar store. Common patterns:
Boolean Checks
```rust
// Direct boolean value
.insert("when", "config.debug_mode")

// Negation
.insert("when", "not config.production")

// Comparison
.insert("when", "config.log_level == \"debug\"")
```
Numeric Comparisons
```rust
// Greater than
.insert("when", "stats.row_count > 0")

// Range check
.insert("when", "inputs.threshold >= 10 and inputs.threshold <= 100")
```
String Tests
```rust
// Equality
.insert("when", "config.environment == \"production\"")

// Prefix check
.insert("when", "region is starting_with(\"us-\")")

// Contains
.insert("when", "config.tags is containing(\"important\")")
```
Existence Checks
```rust
// Check if a value is defined
.insert("when", "optional_config is defined")

// Check for null
.insert("when", "maybe_value is not none")
```
Combined Conditions
```rust
// AND
.insert("when", "config.enabled and stats.count > 0")

// OR
.insert("when", "config.mode == \"full\" or config.force")

// Complex
.insert("when", "(env == \"prod\" or env == \"staging\") and feature_flags.new_flow")
```
Handling Skipped Commands
When a command is skipped, you need to handle its absence in downstream commands.
Check Status in Results
```rust
let status = cmd_results
    .meta_get(&source.with_segment("status"))
    .expect("Expected status");

match status.as_str() {
    Some("completed") => {
        // Process data results
    }
    Some("skipped") => {
        // Handle skipped case
    }
    _ => {
        // Unexpected status
    }
}
```
Use Tera Defaults in Templates
When referencing potentially-skipped command outputs in templates:
```rust
// Use default filter to handle missing values
.insert("message", "Count: {{ stats.count | default(value=0) }}")
```
Chain Conditions
If command B depends on command A's output, and A might be skipped:
```rust
// Command A: might be skipped
let a_attrs = ObjectBuilder::new()
    .insert("when", "config.run_expensive_query")
    // ...
    .build_hashmap();
handle.add_command::<SqlCommand>("query_a", &a_attrs).await?;

// Command B: only runs if A ran successfully
let b_attrs = ObjectBuilder::new()
    .insert("when", "ns.query_a.status == \"completed\"")
    // ...
    .build_hashmap();
handle.add_command::<AggregateCommand>("aggregate_b", &b_attrs).await?;
```
Use Cases
Debug-Only Commands
Skip verbose logging in production:
```rust
.insert("when", "config.debug")
```
Empty Data Guards
Skip processing when there is no data:
```rust
.insert("when", "data.load.rows > 0")
```
Environment-Specific Logic
Run different commands per environment:
```rust
// Production-only
.insert("when", "config.env == \"production\"")

// Development-only
.insert("when", "config.env == \"development\"")
```
Conditional Exports
Only export when there are results worth saving:
```rust
.insert("when", "stats.significant_findings > 0")
```
Iteration Guards
Within an iterative namespace, skip certain items:
```rust
// Skip disabled regions
.insert("when", "region is not starting_with(\"disabled-\")")

// Only process items meeting criteria
.insert("when", "item.status == \"active\"")
```
Best Practices
Keep Conditions Simple
Complex conditions are hard to debug. Prefer:
```rust
// Good: simple, readable
.insert("when", "config.feature_enabled")

// Avoid: complex nested logic
.insert("when", "((a and b) or (c and not d)) and (e or f)")
```
If you need complex logic, compute a boolean in an earlier command and reference it.
Document Skip Behavior
When a command might be skipped, document what happens:
```rust
// This command is skipped when feature_x is disabled.
// Downstream commands that reference its output use default values.
let attrs = ObjectBuilder::new()
    .insert("when", "config.feature_x")
    // ...
```
Test Both Paths
Always test your pipeline with conditions evaluating to both true and false to ensure proper handling.
Use Status Checks for Dependencies
Instead of duplicating conditions, check the upstream command's status:
```rust
// Instead of repeating the condition:
// .insert("when", "config.feature_x")

// Check if the dependency actually ran:
.insert("when", "upstream.cmd.status == \"completed\"")
```
This ensures consistency even if the original condition logic changes.
Pipeline Editing
Problem: You have executed a pipeline and want to add more commands without re-running everything from scratch.
Solution: Call `.edit()` on a completed pipeline to return to the Draft state, add new namespaces and commands, then re-compile and execute.
How Pipeline Editing Works
The Pipeline type has a state machine with three states:
```text
Draft --compile--> Ready --execute--> Completed
  ^                                       |
  |                                       |
  +----------------- edit() --------------+
```
When you call `.edit()` on a `Pipeline<Completed>`:
- The pipeline returns to the `Pipeline<Draft>` state
- All existing namespaces and commands are preserved
- All data in the stores is preserved
- You can add new namespaces and commands
- Re-compilation and execution runs only the new additions (existing results remain)
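These transitions follow Rust's typestate pattern: each state is a separate type, and each transition consumes `self`, so reusing a stale handle is a compile-time error. A minimal, self-contained sketch of the same shape (illustrative types, not Panopticon's actual `Pipeline` API):

```rust
// Minimal typestate sketch: each state is a distinct type, and each
// transition consumes the previous state so it cannot be reused.
struct Draft { commands: Vec<String> }
struct Ready { commands: Vec<String> }
struct Completed { commands: Vec<String> }

impl Draft {
    fn new() -> Self { Draft { commands: Vec::new() } }
    fn add_command(&mut self, name: &str) { self.commands.push(name.to_string()); }
    fn compile(self) -> Ready { Ready { commands: self.commands } }
}

impl Ready {
    fn execute(self) -> Completed { Completed { commands: self.commands } }
    // A Ready pipeline can also return to Draft before executing.
    fn edit(self) -> Draft { Draft { commands: self.commands } }
}

impl Completed {
    // Existing commands (and, in Panopticon, store data) are carried over.
    fn edit(self) -> Draft { Draft { commands: self.commands } }
    fn len(&self) -> usize { self.commands.len() }
}

fn main() {
    let mut p = Draft::new();
    p.add_command("load");
    let completed = p.compile().execute();

    // Return to Draft, add more, run again: earlier commands survive.
    let mut p = completed.edit();
    p.add_command("stats");
    let completed = p.compile().execute();
    assert_eq!(completed.len(), 2);
}
```

Because each method takes `self` by value, the compiler rejects code that, say, calls `execute()` twice on the same `Ready` value.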
Basic Pattern: Two-Pass Pipeline
A common workflow: load data, run initial analysis, review, then add more analysis.
```rust
use panopticon_core::prelude::*;
use std::path::PathBuf;

fn fixtures_dir() -> PathBuf {
    PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("fixtures")
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // ===== Pass 1: Load data and run initial query =====
    println!("=== Pass 1: Load + Query ===\n");

    let mut pipeline = Pipeline::new();

    // Load users from CSV
    let file_attrs = ObjectBuilder::new()
        .insert(
            "files",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "users")
                    .insert("file", fixtures_dir().join("users.csv").to_string_lossy().to_string())
                    .insert("format", "csv")
                    .build_scalar(),
            ]),
        )
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("data"))
        .await?
        .add_command::<FileCommand>("load", &file_attrs)
        .await?;

    // Run a SQL query
    let sql_attrs = ObjectBuilder::new()
        .insert(
            "tables",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "users")
                    .insert("source", "data.load.users.data")
                    .build_scalar(),
            ]),
        )
        .insert("query", "SELECT name, age FROM users ORDER BY age DESC")
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("query"))
        .await?
        .add_command::<SqlCommand>("sorted", &sql_attrs)
        .await?;

    // Execute pass 1
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;

    // Inspect pass 1 results
    let query_source = StorePath::from_segments(["query", "sorted"]);
    let query_results = results.get_by_source(&query_source).expect("Expected query results");
    let rows = query_results.meta_get(&query_source.with_segment("rows")).expect("Expected rows");
    println!("  query.sorted: {} rows", rows);
    println!("  Namespaces: data, query");

    // ===== Pass 2: Add aggregation to existing pipeline =====
    println!("\n=== Pass 2: Edit + Aggregate ===\n");

    // Return to Draft state - this is the key step!
    let mut pipeline = completed.edit();

    // Add aggregation namespace (data from pass 1 is still available)
    let agg_attrs = ObjectBuilder::new()
        .insert("source", "data.load.users.data")
        .insert(
            "aggregations",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "user_count")
                    .insert("op", "count")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "avg_age")
                    .insert("column", "age")
                    .insert("op", "mean")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "oldest")
                    .insert("column", "age")
                    .insert("op", "max")
                    .build_scalar(),
            ]),
        )
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("stats"))
        .await?
        .add_command::<AggregateCommand>("users", &agg_attrs)
        .await?;

    // Re-compile and execute
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;

    // Original results are still present
    let query_source = StorePath::from_segments(["query", "sorted"]);
    let query_results = results.get_by_source(&query_source).expect("query.sorted should still exist");
    let rows = query_results.meta_get(&query_source.with_segment("rows")).expect("Expected rows");
    println!("  query.sorted: {} rows (preserved from pass 1)", rows);

    // New aggregation results are available
    let stats_source = StorePath::from_segments(["stats", "users"]);
    let stats_results = results.get_by_source(&stats_source).expect("Expected stats results");
    for name in ["user_count", "avg_age", "oldest"] {
        let value = stats_results
            .data_get(&stats_source.with_segment(name))
            .and_then(|r| r.as_scalar())
            .expect(&format!("Expected {}", name));
        println!("  stats.users.{} = {}", name, value.1);
    }

    println!("  Namespaces: data, query, stats");
    println!("\nPipeline successfully edited and re-executed.");
    Ok(())
}
```
Output:
=== Pass 1: Load + Query ===
query.sorted: 5 rows
Namespaces: data, query
=== Pass 2: Edit + Aggregate ===
query.sorted: 5 rows (preserved from pass 1)
stats.users.user_count = 5
stats.users.avg_age = 32.4
stats.users.oldest = 45
Namespaces: data, query, stats
Pipeline successfully edited and re-executed.
Key Points
State Preservation
When you call .edit():
- Preserved: Namespaces, commands, store data, execution results
- Reset: Pipeline state returns to Draft
This means pass 2 can reference data created in pass 1 without re-executing the original commands.
Ownership Transfer
The `.edit()` method consumes the `Pipeline<Completed>`:
```rust
// completed is consumed here
let mut pipeline = completed.edit();

// This would fail - completed no longer exists:
// let _ = completed.results(...); // ERROR: use of moved value
```
Re-compilation Required
After editing, you must call `.compile()` before `.execute()`:
```rust
let mut pipeline = completed.edit();
pipeline.add_namespace(...).await?;

// Must compile before executing
let completed = pipeline.compile().await?.execute().await?;
```
Use Cases
Exploratory Data Analysis
Build your analysis incrementally:
```rust
// Step 1: Load and explore
let completed = load_and_preview().await?;
// ... review results ...

// Step 2: Add filtering based on what you saw
let mut pipeline = completed.edit();
add_filters(&mut pipeline).await?;
let completed = pipeline.compile().await?.execute().await?;
// ... review filtered results ...

// Step 3: Add aggregation
let mut pipeline = completed.edit();
add_aggregations(&mut pipeline).await?;
let completed = pipeline.compile().await?.execute().await?;
```
Conditional Follow-Up
Add analysis based on intermediate results:
```rust
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

// Check if we need additional analysis
let row_count = get_row_count(&results);
if row_count > 1000 {
    let mut pipeline = completed.edit();
    add_sampling_namespace(&mut pipeline).await?;
    let completed = pipeline.compile().await?.execute().await?;
}
```
REPL-Style Workflows
In an interactive environment, let users add commands incrementally:
```rust
let mut completed = initial_pipeline.compile().await?.execute().await?;

loop {
    // Show current results
    display_results(&completed).await?;

    // Get user input
    let command = get_user_command()?;
    if command == "quit" {
        break;
    }

    // Add the new command
    let mut pipeline = completed.edit();
    add_user_command(&mut pipeline, &command).await?;
    completed = pipeline.compile().await?.execute().await?;
}
```
Expensive Data Reuse
Load expensive data once, analyze multiple ways:
```rust
// Load large dataset (expensive)
let completed = load_big_dataset().await?;

// Analysis 1
let mut p1 = completed.edit();
add_analysis_1(&mut p1).await?;
let c1 = p1.compile().await?.execute().await?;
save_results(&c1, "analysis_1").await?;

// Analysis 2 (starts fresh from after load, not from c1)
let mut p2 = completed.edit();
add_analysis_2(&mut p2).await?;
let c2 = p2.compile().await?.execute().await?;
save_results(&c2, "analysis_2").await?;
```
Note: In this pattern, each `.edit()` call forks from the same point, so changes in `p1` do not affect `p2`. Because `.edit()` consumes the pipeline, forking twice from the same `completed` value requires duplicating it before each fork (for example by cloning, if `Pipeline<Completed>` supports it).
Best Practices
Name Namespaces for Phases
Use namespace names that indicate which pass they belong to:
```rust
// Pass 1
.add_namespace(NamespaceBuilder::new("load"))
.add_namespace(NamespaceBuilder::new("initial_query"))

// Pass 2
.add_namespace(NamespaceBuilder::new("refined_query"))
.add_namespace(NamespaceBuilder::new("aggregations"))
```
Document the Multi-Pass Structure
When your pipeline has multiple passes, document the intended flow:
```rust
// Pipeline structure:
//   Pass 1: Load data, run initial validation
//   Pass 2: Apply transformations based on validation results
//   Pass 3: Generate final reports
```
Avoid Namespace Name Conflicts
Each namespace must have a unique name. Adding a namespace with a duplicate name will fail:
```rust
// Pass 1
pipeline.add_namespace(NamespaceBuilder::new("data")).await?;

// Pass 2 - this fails!
let mut pipeline = completed.edit();
pipeline.add_namespace(NamespaceBuilder::new("data")).await?; // ERROR: duplicate name
```
Consider Memory Usage
All store data is preserved across edits. For very large datasets, this can increase memory usage. If you need to release memory:
```rust
// Create a fresh pipeline instead of editing
let mut new_pipeline = Pipeline::new();
// Copy only the data you need
```
Ready State Editing
You can also call `.edit()` on a `Pipeline<Ready>` (after compile, before execute):
```rust
let ready = pipeline.compile().await?;

// Oops, forgot something
let mut pipeline = ready.edit();
pipeline.add_namespace(...).await?;

let ready = pipeline.compile().await?;
let completed = ready.execute().await?;
```
This is useful when you realize you need to add something after compilation but before execution.
Result Access
Problem: After pipeline execution, you need to retrieve results, export tabular data, and iterate over outputs programmatically.
Solution: Use `ResultSettings` to configure output behavior and `ResultStore` to access and iterate over all command results.
How Result Access Works
After calling `.execute()`, you have a `Pipeline<Completed>`. To access results:
- Create `ResultSettings` to configure the output path and format
- Call `.results(settings)` to get a `ResultStore`
- Use `ResultStore` methods to access individual command results
- Each `CommandResults` contains metadata and data, accessible via iterators or direct lookup
Basic Pattern: Configure and Collect
```rust
use panopticon_core::prelude::*;
use std::path::PathBuf;

fn fixtures_dir() -> PathBuf {
    PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("fixtures")
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let output_dir = tempfile::tempdir()?;
    let mut pipeline = Pipeline::new();

    // Load product data
    let file_attrs = ObjectBuilder::new()
        .insert(
            "files",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "products")
                    .insert("file", fixtures_dir().join("products.csv").to_string_lossy().to_string())
                    .insert("format", "csv")
                    .build_scalar(),
            ]),
        )
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("data"))
        .await?
        .add_command::<FileCommand>("load", &file_attrs)
        .await?;

    // Compute aggregations
    let agg_attrs = ObjectBuilder::new()
        .insert("source", "data.load.products.data")
        .insert(
            "aggregations",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "row_count")
                    .insert("op", "count")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "total_price")
                    .insert("column", "price")
                    .insert("op", "sum")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "avg_price")
                    .insert("column", "price")
                    .insert("op", "mean")
                    .build_scalar(),
            ]),
        )
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("stats"))
        .await?
        .add_command::<AggregateCommand>("products", &agg_attrs)
        .await?;

    // Execute pipeline
    let completed = pipeline.compile().await?.execute().await?;

    // Configure result settings
    let settings = ResultSettings::new()
        .with_output_path(output_dir.path().to_path_buf())
        .with_format(TabularFormat::Json);

    // Collect results
    let results = completed.results(settings).await?;

    // Iterate over all results
    println!("=== Result Store ({} command(s)) ===\n", results.len());
    for cmd_results in results.iter() {
        println!("Source: {}", cmd_results.source().to_dotted());

        // Print metadata
        for (path, value) in cmd_results.meta_iter() {
            println!("  [meta] {} = {}", path.to_dotted(), value);
        }

        // Print data
        for (path, value) in cmd_results.data_iter() {
            match value.as_scalar() {
                Some((ty, val)) => {
                    println!("  [data] {} = {} ({:?})", path.to_dotted(), val, ty);
                }
                None => {
                    let (file_path, _fmt, rows, cols) =
                        value.as_tabular().expect("Expected tabular result");
                    println!(
                        "  [data] {} => {} ({} rows x {} cols)",
                        path.to_dotted(),
                        file_path.display(),
                        rows,
                        cols
                    );
                }
            }
        }
        println!();
    }

    Ok(())
}
```
Output:
=== Result Store (2 command(s)) ===
Source: data.load
[meta] data.load.products.rows = 10
[meta] data.load.products.columns = ["name", "price", "quantity"]
[data] data.load.products.data => /tmp/xxx/data_load_products.json (10 rows x 3 cols)
Source: stats.products
[meta] stats.products.status = "completed"
[data] stats.products.row_count = 10 (Int)
[data] stats.products.total_price = 1250.50 (Float)
[data] stats.products.avg_price = 125.05 (Float)
ResultSettings
ResultSettings configures how results are collected and exported.
Creating Settings
```rust
// Default settings
let settings = ResultSettings::default();

// Or use the builder
let settings = ResultSettings::new();
```
Output Path
Specify where tabular data files are written:
```rust
let settings = ResultSettings::new()
    .with_output_path(PathBuf::from("/path/to/output"));
```
Default: `./panopticon_results` in the current working directory.
Output Format
Choose the format for tabular data exports:
```rust
// JSON (default)
let settings = ResultSettings::new().with_format(TabularFormat::Json);

// CSV
let settings = ResultSettings::new().with_format(TabularFormat::Csv);

// Parquet (efficient binary format)
let settings = ResultSettings::new().with_format(TabularFormat::Parquet);
```
Excluded Commands
Skip specific commands when collecting results:
```rust
let settings = ResultSettings::new()
    .with_excluded_commands(vec![
        StorePath::from_segments(["debug", "verbose_log"]),
        StorePath::from_segments(["temp", "intermediate"]),
    ]);
```
Excluded commands do not appear in the `ResultStore`, and their tabular data is not exported.
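Conceptually, exclusion is a set-based filter over dotted source paths. A self-contained sketch of that idea (a hypothetical helper, not the real implementation):

```rust
use std::collections::HashSet;

// Hypothetical sketch of result exclusion: drop any command whose
// dotted source path appears in the excluded set.
fn filter_results(sources: Vec<String>, excluded: &HashSet<String>) -> Vec<String> {
    sources
        .into_iter()
        .filter(|s| !excluded.contains(s))
        .collect()
}

fn main() {
    let excluded: HashSet<String> =
        ["debug.verbose_log".to_string()].into_iter().collect();
    let kept = filter_results(
        vec!["data.load".to_string(), "debug.verbose_log".to_string()],
        &excluded,
    );
    assert_eq!(kept, vec!["data.load".to_string()]);
}
```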
ResultStore
The ResultStore contains all command results from the pipeline execution.
Basic Access
```rust
let results = completed.results(settings).await?;

// Number of commands with results
let count = results.len();

// Check if empty
if results.is_empty() {
    println!("No results");
}
```
Lookup by Source
Access a specific command's results by its store path:
```rust
let source = StorePath::from_segments(["namespace", "command"]);
if let Some(cmd_results) = results.get_by_source(&source) {
    // Process this command's results
}
```
Iteration
Iterate over all command results:
```rust
for cmd_results in results.iter() {
    println!("Command: {}", cmd_results.source().to_dotted());
}
```
CommandResults
Each CommandResults contains the outputs from a single command.
Source Path
The path identifying this command:
```rust
let source: &StorePath = cmd_results.source();
println!("Command at: {}", source.to_dotted());
```
Metadata Access
Metadata includes execution information like row counts, column names, and status:
```rust
// Direct lookup
let source = StorePath::from_segments(["data", "load"]);
let rows = cmd_results
    .meta_get(&source.with_segment("products").with_segment("rows"))
    .expect("Expected rows");

// Iterate all metadata
for (path, value) in cmd_results.meta_iter() {
    println!("{} = {}", path.to_dotted(), value);
}

// Get all metadata keys
for key in cmd_results.meta_keys() {
    println!("Meta key: {}", key.to_dotted());
}
```
Data Access
Data includes the actual outputs (scalar values or tabular data references):
```rust
// Direct lookup
let source = StorePath::from_segments(["stats", "products"]);
let avg = cmd_results
    .data_get(&source.with_segment("avg_price"))
    .and_then(|r| r.as_scalar());

// Iterate all data
for (path, value) in cmd_results.data_iter() {
    match value.as_scalar() {
        Some((_ty, val)) => println!("Scalar: {} = {}", path.to_dotted(), val),
        None => {
            let (file, _fmt, _rows, _cols) = value.as_tabular().unwrap();
            println!("Tabular: {} -> {}", path.to_dotted(), file.display());
        }
    }
}

// Get all data keys
for key in cmd_results.data_keys() {
    println!("Data key: {}", key.to_dotted());
}
```
ResultValue
Each data value is either scalar or tabular.
Scalar Values
```rust
if let Some((ty, value)) = result_value.as_scalar() {
    // ty: &ScalarType (Int, Float, String, Bool, etc.)
    // value: &ScalarValue
    println!("Type: {:?}, Value: {}", ty, value);
}

// Type check
if result_value.is_scalar() {
    // ...
}
```
Tabular Values
```rust
if let Some((path, format, rows, cols)) = result_value.as_tabular() {
    // path: &PathBuf - file location on disk
    // format: &TabularFormat - Csv, Json, or Parquet
    // rows: usize - row count
    // cols: usize - column count
    println!("File: {}, Format: {}, Shape: {}x{}", path.display(), format, rows, cols);
}

// Type check
if result_value.is_tabular() {
    // ...
}
```
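The scalar/tabular split maps naturally onto a two-variant enum with `Option`-returning accessors. A simplified, self-contained sketch of that shape (hypothetical types, not the real `ResultValue`):

```rust
use std::path::PathBuf;

// Hypothetical simplification of a result value: either an in-memory
// scalar or a reference to a tabular file on disk.
enum Value {
    Scalar(String), // simplified: type information omitted
    Tabular { path: PathBuf, rows: usize, cols: usize },
}

impl Value {
    fn as_scalar(&self) -> Option<&str> {
        match self {
            Value::Scalar(s) => Some(s.as_str()),
            _ => None,
        }
    }
    fn as_tabular(&self) -> Option<(&PathBuf, usize, usize)> {
        match self {
            Value::Tabular { path, rows, cols } => Some((path, *rows, *cols)),
            _ => None,
        }
    }
    fn is_scalar(&self) -> bool { self.as_scalar().is_some() }
    fn is_tabular(&self) -> bool { self.as_tabular().is_some() }
}

fn main() {
    let v = Value::Scalar("42".to_string());
    assert!(v.is_scalar());

    let t = Value::Tabular { path: PathBuf::from("out.parquet"), rows: 10, cols: 3 };
    assert!(t.is_tabular());
    assert_eq!(t.as_tabular().map(|(_, r, c)| (r, c)), Some((10, 3)));
}
```

Each accessor returns `None` for the other variant, which is why the match-on-`as_scalar` pattern in the examples above can fall through to the tabular case.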
Patterns for Common Tasks
Print Summary Report
```rust
println!("Pipeline Results Summary");
println!("========================");

for cmd_results in results.iter() {
    let source = cmd_results.source();
    print!("{}: ", source.to_dotted());

    // Check status
    if let Some(status) = cmd_results.meta_get(&source.with_segment("status")) {
        if status.as_str() == Some("skipped") {
            println!("SKIPPED");
            continue;
        }
    }

    // Count outputs
    let scalar_count = cmd_results.data_iter().filter(|(_, v)| v.is_scalar()).count();
    let tabular_count = cmd_results.data_iter().filter(|(_, v)| v.is_tabular()).count();
    println!("{} scalars, {} tables", scalar_count, tabular_count);
}
```
Export All Tables to Directory
```rust
let settings = ResultSettings::new()
    .with_output_path(export_dir.to_path_buf())
    .with_format(TabularFormat::Parquet);

let results = completed.results(settings).await?;

// List exported files
println!("Exported files:");
for entry in std::fs::read_dir(&export_dir)? {
    let entry = entry?;
    let meta = entry.metadata()?;
    println!("  {} ({} bytes)", entry.file_name().to_string_lossy(), meta.len());
}
```
Collect Scalar Metrics
```rust
use std::collections::HashMap;

let mut metrics: HashMap<String, f64> = HashMap::new();

for cmd_results in results.iter() {
    for (path, value) in cmd_results.data_iter() {
        if let Some((ScalarType::Float, scalar)) = value.as_scalar() {
            if let Some(f) = scalar.as_f64() {
                metrics.insert(path.to_dotted(), f);
            }
        }
    }
}

for (name, value) in &metrics {
    println!("{}: {:.2}", name, value);
}
```
Handle Iterative Results
For iterative namespaces, results are indexed:
```rust
let mut iteration = 0;
loop {
    let source = StorePath::from_segments(["iterative_ns", "command"]).with_index(iteration);
    let Some(cmd_results) = results.get_by_source(&source) else {
        break; // No more iterations
    };
    println!("Iteration {}: {:?}", iteration, cmd_results.source().to_dotted());
    // Process this iteration's results...
    iteration += 1;
}
println!("Total iterations: {}", iteration);
```
Best Practices
Choose the Right Format
- JSON: Human-readable, good for debugging and small datasets
- CSV: Widely compatible, good for sharing with other tools
- Parquet: Efficient storage, good for large datasets and further processing
Clean Up Output Directories
```rust
// Use tempdir for automatic cleanup
let output_dir = tempfile::tempdir()?;
let settings = ResultSettings::new().with_output_path(output_dir.path().to_path_buf());
// output_dir is deleted when it goes out of scope
```
Handle Missing Results Gracefully
```rust
let source = StorePath::from_segments(["maybe", "exists"]);
match results.get_by_source(&source) {
    Some(cmd_results) => {
        // Process results
    }
    None => {
        println!(
            "Command {} not in results (possibly excluded or skipped)",
            source.to_dotted()
        );
    }
}
```
Use Type-Safe Path Construction
Build paths systematically to avoid typos:
```rust
// Define base paths once
let stats_base = StorePath::from_segments(["stats", "products"]);

// Build specific paths from the base
let row_count = stats_base.with_segment("row_count");
let avg_price = stats_base.with_segment("avg_price");
let total = stats_base.with_segment("total_price");
```
Services & IO
Panopticon pipelines can interact with the outside world through services. The PipelineServices struct bundles two categories of functionality:
- PipelineIO - Send notifications and prompt for user input
- EventHooks - React to pipeline lifecycle events (compilation, execution, completion)
Services are optional. A pipeline without services runs silently, which is often what you want for batch processing or automated workflows. When you need feedback or interactivity, services provide a clean abstraction.
Attaching Services to a Pipeline
Use Pipeline::with_services() to attach a PipelineServices instance:
```rust
use panopticon_core::prelude::*;

let services = PipelineServices::defaults();
let mut pipeline = Pipeline::with_services(services);
```
Using PipelineIO
The PipelineIO trait provides two methods for interacting with users:
- `notify(message)` - Display a message (fire-and-forget)
- `prompt(message)` - Display a message and wait for a response
Commands access these through the ExecutionContext:
```rust
// Inside a command's execute method
ctx.services().notify("Processing started...").await?;

if let Some(answer) = ctx.services().prompt("Continue? (y/n)").await? {
    if answer.to_lowercase() == "y" {
        // proceed
    }
}
```
Multiple IO services can be registered. When you call `notify`, all registered services receive the message; when you call `prompt`, services are tried in order until one returns a response.
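That dispatch rule - broadcast on `notify`, first responder wins on `prompt` - can be sketched with plain trait objects (a hypothetical synchronous `Io` trait, not Panopticon's actual `PipelineIO`):

```rust
// Hypothetical synchronous stand-in for PipelineIO.
trait Io {
    fn notify(&self, message: &str);
    fn prompt(&self, message: &str) -> Option<String>;
}

struct Logger;
impl Io for Logger {
    fn notify(&self, message: &str) { println!("[log] {message}"); }
    fn prompt(&self, _message: &str) -> Option<String> { None } // cannot answer
}

struct Console { canned_answer: String }
impl Io for Console {
    fn notify(&self, message: &str) { println!("[console] {message}"); }
    fn prompt(&self, _message: &str) -> Option<String> { Some(self.canned_answer.clone()) }
}

struct Services { ios: Vec<Box<dyn Io>> }
impl Services {
    // notify: every registered service receives the message.
    fn notify(&self, message: &str) {
        for io in &self.ios { io.notify(message); }
    }
    // prompt: try services in order until one returns a response.
    fn prompt(&self, message: &str) -> Option<String> {
        self.ios.iter().find_map(|io| io.prompt(message))
    }
}

fn main() {
    let services = Services {
        ios: vec![Box::new(Logger), Box::new(Console { canned_answer: "y".into() })],
    };
    services.notify("Processing started...");
    assert_eq!(services.prompt("Continue? (y/n)"), Some("y".to_string()));
}
```

A service that cannot answer a prompt simply returns `None`, and the dispatcher falls through to the next one.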
Event Hooks
Event hooks let you observe pipeline lifecycle events without modifying command logic. Hooks fire at key moments:
- Draft phase: After namespaces/commands are added, before/after compilation
- Ready phase: Before/after pipeline, namespace, and command execution
- Completed phase: When results are being collected
This is useful for logging, metrics, progress reporting, or custom debugging.
Default Services
PipelineServices::defaults() behaves differently based on build mode:
| Build Mode | IO Service | Event Hooks |
|---|---|---|
| Debug (`cfg(debug_assertions)`) | `StdoutInteraction` | `DebugEventHooks` |
| Release | None | None |
In debug builds, you get console output and lifecycle logging out of the box. In release builds, services start empty for maximum control.
To start with no services regardless of build mode:
```rust
let services = PipelineServices::new();
```
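The build-mode switch above typically hinges on `cfg!(debug_assertions)`. An illustrative, self-contained sketch of how such defaults could be selected (not Panopticon's actual implementation):

```rust
// Illustrative: pick a default IO service label based on build mode,
// mirroring the table above.
fn default_io_service() -> Option<&'static str> {
    if cfg!(debug_assertions) {
        Some("StdoutInteraction") // debug builds get console output
    } else {
        None // release builds start with no services
    }
}

fn main() {
    match default_io_service() {
        Some(name) => println!("debug build: IO defaults to {name}"),
        None => println!("release build: no default IO service"),
    }
}
```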
Implementing Custom Services
This guide covers using services. For implementing your own PipelineIO or EventHooks:
See the Extend guide: Implementing Services
The Extend guide covers:
- Implementing the `PipelineIO` trait for custom notification channels
- Implementing `EventHooks` for lifecycle callbacks
- Available hook events and their payloads
- Registering multiple services with `add_io()` and `add_hook()`