Getting Started

This guide will walk us through installing Panopticon and running our first data pipeline. By the end, we will understand the basic pattern for building pipelines and be ready to explore more advanced features.

Installation

Add panopticon-core to your project's Cargo.toml:

[dependencies]
panopticon-core = "0.2"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
anyhow = "1"

Panopticon uses Tokio as its async runtime and anyhow for error handling; both are required dependencies for running pipelines.

Hello Pipeline

Let us build a minimal pipeline that loads a CSV file and prints some basic information about it. This demonstrates the core pattern we will use throughout Panopticon.

use panopticon_core::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 1. Create a new pipeline
    let mut pipeline = Pipeline::new();

    // 2. Define file loading attributes
    let file_attrs = ObjectBuilder::new()
        .insert(
            "files",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "products")
                    .insert("file", "/path/to/products.csv")
                    .insert("format", "csv")
                    .build_scalar(),
            ]),
        )
        .build_hashmap();

    // 3. Add a namespace and command to the pipeline
    pipeline
        .add_namespace(NamespaceBuilder::new("data"))
        .await?
        .add_command::<FileCommand>("load", &file_attrs)
        .await?;

    // 4. Compile and execute
    let completed = pipeline.compile().await?.execute().await?;

    // 5. Access results
    let results = completed.results(ResultSettings::default()).await?;

    println!("Pipeline completed with {} command(s)", results.len());

    Ok(())
}

Let us break down what is happening:

  1. Pipeline::new() creates a draft pipeline ready to accept namespaces and commands.

  2. ObjectBuilder constructs the attribute map that configures our command. Here we define a file to load with its name, path, and format.

  3. add_namespace() creates a logical grouping, and add_command() adds a command to that namespace. The command is identified by namespace.command (e.g., data.load).

  4. compile() validates the pipeline and resolves dependencies. execute() runs all commands in the correct order.

  5. results() returns a ResultStore containing all command outputs, which we can query by path.

Running Examples

The Panopticon repository includes several examples demonstrating different features. To run them:

# Clone the repository
git clone https://github.com/dolly-parseton/panopticon.git
cd panopticon

# Run the multi-format loading example
cargo run --example multi_format_load

# Run the aggregation example
cargo run --example aggregate_and_export

# Run the conditional execution example
cargo run --example when_conditional

Each example is self-contained and includes comments explaining what it demonstrates.

Available Examples

Example               Description
multi_format_load     Loading CSV, JSON, and Parquet files
aggregate_and_export  Aggregation operations and result export
when_conditional      Conditional command execution
template_inheritance  Tera template inheritance patterns
iterate_object_keys   Iterating over dynamic data
pipeline_reuse        Reusing pipeline definitions
custom_command        Building your own commands
command_spec_safety   Command specification validation

Project Structure

A typical Panopticon project follows this structure:

my-pipeline/
├── Cargo.toml
├── src/
│   └── main.rs          # Pipeline definition and execution
├── templates/           # Tera templates (if using TemplateCommand)
│   └── report.html
└── data/                # Input data files
    ├── input.csv
    └── config.json

For larger projects, we recommend organizing pipelines into modules:

my-pipeline/
├── Cargo.toml
├── src/
│   ├── main.rs
│   ├── pipelines/
│   │   ├── mod.rs
│   │   ├── etl.rs       # ETL pipeline
│   │   └── reports.rs   # Reporting pipeline
│   └── commands/        # Custom commands (if extending)
│       └── mod.rs
└── ...

Next Steps

Now that we have a basic pipeline running, we are ready to explore the core concepts covered in the next section.

If you want to build custom commands for your specific use case, see the Extending Panopticon guide.

Core Concepts

Panopticon is built around a small set of interlocking concepts that, once understood, make the library predictable and composable. This section walks through the mental model we use when designing pipelines.

The Big Picture

A Panopticon pipeline is a declarative data processing workflow. Rather than writing imperative code that fetches data, transforms it, and stores results, we describe what we want to happen and let the library figure out the execution details.

┌─────────────────────────────────────────────────────────────────┐
│                         Pipeline                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                      Namespaces                           │  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        │  │
│  │  │   "data"    │  │   "query"   │  │   "stats"   │        │  │
│  │  │   (Once)    │  │   (Once)    │  │ (Iterative) │        │  │
│  │  │  ┌───────┐  │  │  ┌───────┐  │  │  ┌───────┐  │        │  │
│  │  │  │Command│  │  │  │Command│  │  │  │Command│  │        │  │
│  │  │  └───────┘  │  │  └───────┘  │  │  └───────┘  │        │  │
│  │  └─────────────┘  └─────────────┘  └─────────────┘        │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              │                                  │
│                              ▼                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                     Data Stores                           │  │
│  │     ScalarStore (JSON-like values)                        │  │
│  │     TabularStore (Polars DataFrames)                      │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘

Key Concepts

Pipeline State Machine

Pipelines progress through three states: Draft, Ready, and Completed. This state machine prevents common errors like modifying a running pipeline or executing an incomplete one. We use Rust's type system to enforce these transitions at compile time.

Namespaces

Namespaces group related commands and control how they execute. The three namespace types - Once, Iterative, and Static - let us express single operations, loops over data, and constant configuration respectively.

Commands and Attributes

Commands are the units of work in a pipeline. Each command has a type (like FileCommand or SqlCommand), a name, and attributes that configure its behavior. We configure commands using the ObjectBuilder pattern for type-safe attribute construction.

Data Stores

All data flows through two stores: the ScalarStore for JSON-like values (strings, numbers, objects, arrays) and the TabularStore for Polars DataFrames. Commands read from and write to these stores using StorePaths - dot-separated addresses like data.load.users.data.
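The addressing scheme is easy to picture with a tiny sketch. This is illustrative only: the split_path helper below is a stand-in invented for this example, not the library's actual StorePath type.

```rust
// Illustrative sketch only: a minimal dot-separated path splitter,
// not Panopticon's actual StorePath implementation.
fn split_path(path: &str) -> Vec<&str> {
    path.split('.').collect()
}

fn main() {
    let segments = split_path("data.load.users.data");
    // By convention, the first segment is the namespace, the second the
    // command, and the remaining segments address a field in its results.
    assert_eq!(segments, ["data", "load", "users", "data"]);
    println!("namespace = {}, command = {}", segments[0], segments[1]);
}
```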

How They Fit Together

Here is the typical flow of a Panopticon pipeline:

  1. Build - We create a Pipeline<Draft>, add namespaces, and configure commands with attributes
  2. Compile - We call .compile() to validate the pipeline and transition to Pipeline<Ready>
  3. Execute - We call .execute() to run all commands, transitioning to Pipeline<Completed>
  4. Collect - We call .results() to gather outputs from the data stores
  5. Iterate (optional) - We call .edit() to return to Draft state and add more commands

This lifecycle ensures that pipelines are valid before execution and that results are only accessible after completion. The following sections dive deeper into each concept.

Pipeline State Machine

The Pipeline type in Panopticon uses a compile-time state machine to enforce correct usage. This pattern prevents entire categories of bugs - we cannot accidentally execute an incomplete pipeline, modify one that is running, or access results that do not yet exist.

The Three States

┌─────────┐    compile()    ┌─────────┐    execute()    ┌───────────┐
│  Draft  │ ──────────────▶ │  Ready  │ ──────────────▶ │ Completed │
└─────────┘                 └─────────┘                 └───────────┘
     ▲                           │                           │
     │         edit()            │          edit()           │
     └───────────────────────────┴───────────────────────────┘

Draft

A Pipeline<Draft> is under construction. In this state we can:

  • Add namespaces with add_namespace()
  • Add commands to namespaces via the returned NamespaceHandle
  • Configure services and options

We cannot execute a Draft pipeline. The .execute() method simply does not exist on this type.

Ready

A Pipeline<Ready> has passed validation and is prepared for execution. The transition from Draft to Ready happens via .compile(), which performs several checks:

  • Namespace names are unique and not reserved
  • Command names are unique within their namespace
  • Iterative namespaces have valid store paths
  • Command attributes pass schema validation
  • The execution plan is valid (no circular dependencies)

From Ready, we can either:

  • Call .execute() to run the pipeline
  • Call .edit() to return to Draft state for modifications

Completed

A Pipeline<Completed> has finished executing all commands. The execution context containing all results is stored in the Completed state. From here we can:

  • Call .results() to collect outputs into a ResultStore
  • Call .restart() to return to Ready state and re-execute
  • Call .edit() to return to Draft state and add more commands

Why a State Machine?

This design is intentional. Consider what could go wrong without it:

Without state machine:

#![allow(unused)]
fn main() {
// Hypothetical bad API - don't do this
let pipeline = Pipeline::new();
pipeline.add_command(...);
let results = pipeline.execute();  // What if add_command() failed?
pipeline.add_command(...);         // Modifying during execution?
let more_results = pipeline.results();  // Which execution?
}

With state machine:

#![allow(unused)]
fn main() {
// The actual API - compile-time guarantees
let mut pipeline = Pipeline::new();           // Draft
pipeline.add_namespace(...).await?;           // Still Draft

let ready = pipeline.compile().await?;        // Ready - validated!
let completed = ready.execute().await?;       // Completed - all commands ran

let results = completed.results(...).await?;  // Results available
let pipeline = completed.edit();              // Back to Draft
}

The type system prevents us from calling methods that do not make sense in the current state. If we try to call .execute() on a Draft pipeline, we get a compile error - not a runtime panic.

Practical Example: Pipeline Reuse

One powerful pattern enabled by the state machine is incremental pipeline building. We can execute a pipeline, inspect results, then add more processing steps:

use panopticon_core::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // ===== Pass 1: Load data and query =====
    println!("=== Pass 1: Load + Query ===\n");

    let mut pipeline = Pipeline::new();

    // Load users
    let file_attrs = ObjectBuilder::new()
        .insert(
            "files",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "users")
                    .insert("file", "fixtures/users.csv")
                    .insert("format", "csv")
                    .build_scalar(),
            ]),
        )
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("data"))
        .await?
        .add_command::<FileCommand>("load", &file_attrs)
        .await?;

    // Query: all users sorted by age
    let sql_attrs = ObjectBuilder::new()
        .insert(
            "tables",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "users")
                    .insert("source", "data.load.users.data")
                    .build_scalar(),
            ]),
        )
        .insert("query", "SELECT name, age FROM users ORDER BY age DESC")
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("query"))
        .await?
        .add_command::<SqlCommand>("sorted", &sql_attrs)
        .await?;

    // Execute pass 1: Draft -> Ready -> Completed
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;
    println!("  Namespaces in pass 1: data, query");

    // ===== Pass 2: Edit pipeline, add aggregation, re-execute =====
    println!("\n=== Pass 2: Edit + Aggregate ===\n");

    // Return to Draft state - all previous namespaces and commands preserved
    let mut pipeline = completed.edit();

    // Add an aggregation namespace to the existing pipeline
    let agg_attrs = ObjectBuilder::new()
        .insert("source", "data.load.users.data")
        .insert(
            "aggregations",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "user_count")
                    .insert("op", "count")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "avg_age")
                    .insert("column", "age")
                    .insert("op", "mean")
                    .build_scalar(),
            ]),
        )
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("stats"))
        .await?
        .add_command::<AggregateCommand>("users", &agg_attrs)
        .await?;

    // Re-compile and execute
    let completed = pipeline.compile().await?.execute().await?;
    println!("  Namespaces in pass 2: data, query, stats");
    println!("\nPipeline successfully edited and re-executed.");

    Ok(())
}

The key insight is that calling .edit() on a Completed pipeline returns us to Draft while preserving all existing namespaces and commands. We can then add more processing steps and re-execute.

State Transitions Summary

From       To         Method       What Happens
Draft      Ready      .compile()   Validates pipeline configuration
Ready      Completed  .execute()   Runs all commands
Ready      Draft      .edit()      Returns to editing mode
Completed  Draft      .edit()      Returns to editing mode
Completed  Ready      .restart()   Clears results, ready to re-execute

Implementation Details

For those curious about the implementation, Panopticon uses Rust's type system to encode states:

#![allow(unused)]
fn main() {
// Marker types for states
pub struct Draft;
pub struct Ready;
pub struct Completed {
    context: ExecutionContext,
}

// Generic pipeline parameterized by state
pub struct Pipeline<T = Draft> {
    pub(crate) services: PipelineServices,
    pub(crate) namespaces: Vec<Namespace>,
    pub(crate) commands: Vec<CommandSpec>,
    state: T,
}
}

Each state has its own impl Pipeline<State> block defining only the methods valid for that state. The Completed state holds the ExecutionContext containing all results, which is why .results() is only available on Pipeline<Completed>.

This pattern is sometimes called the "typestate" pattern in Rust. It moves invariant checking from runtime to compile time, resulting in APIs that are impossible to misuse.
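The typestate pattern is easy to reproduce in miniature. The sketch below uses invented names (Machine, add_step, run) to show the mechanics independently of Panopticon's real types: each state gets its own impl block, and consuming self on transitions makes stale handles unusable.

```rust
// Self-contained typestate sketch; names are illustrative, not Panopticon's.
struct Draft;
struct Ready;

struct Machine<S> {
    steps: Vec<String>,
    _state: S,
}

impl Machine<Draft> {
    fn new() -> Self {
        Machine { steps: Vec::new(), _state: Draft }
    }
    fn add_step(mut self, s: &str) -> Self {
        self.steps.push(s.to_string());
        self
    }
    // Consuming `self` means the Draft value cannot be used after compiling.
    fn compile(self) -> Machine<Ready> {
        Machine { steps: self.steps, _state: Ready }
    }
}

impl Machine<Ready> {
    // `run` exists only on Machine<Ready>; calling it on a Draft
    // is a compile error, not a runtime panic.
    fn run(&self) -> usize {
        self.steps.len()
    }
}

fn main() {
    let ready = Machine::new().add_step("load").add_step("query").compile();
    assert_eq!(ready.run(), 2);
}
```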

Namespaces

Namespaces are the organizational unit in Panopticon. Every command belongs to exactly one namespace, and the namespace type determines how those commands execute.

The Three Namespace Types

┌─────────────────────────────────────────────────────────────────┐
│                        Namespace Types                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   Once        Execute commands once, in order                   │
│   ─────────────────────────────────────────────                 │
│   [cmd1] → [cmd2] → [cmd3]                                      │
│                                                                 │
│   Iterative   Execute commands once per item in a collection    │
│   ─────────────────────────────────────────────                 │
│   for item in collection:                                       │
│       [cmd1] → [cmd2] → [cmd3]                                  │
│                                                                 │
│   Static      No commands, just provides constant values        │
│   ─────────────────────────────────────────────                 │
│   { key1: value1, key2: value2 }                                │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Once Namespaces

The Once namespace is the default. Commands in a Once namespace execute exactly once, in the order they were added.

#![allow(unused)]
fn main() {
// Create a Once namespace (the default)
pipeline
    .add_namespace(NamespaceBuilder::new("data"))
    .await?
    .add_command::<FileCommand>("load", &file_attrs)
    .await?;
}

This is the most common namespace type. Use it for:

  • Loading data from files or APIs
  • Running SQL queries
  • Performing one-time transformations

Iterative Namespaces

Iterative namespaces execute their commands once for each item in a collection. The collection can come from:

  • An array in the scalar store
  • Object keys from a JSON object
  • A column in a DataFrame
  • A string split by a delimiter

#![allow(unused)]
fn main() {
// Create an Iterative namespace that loops over object keys
let mut handle = pipeline
    .add_namespace(
        NamespaceBuilder::new("classify")
            .iterative()
            .store_path(StorePath::from_segments(["config", "regions"]))
            .scalar_object_keys(None, false)
            .iter_var("region")
            .index_var("idx"),
    )
    .await?;

handle
    .add_command::<ConditionCommand>("check", &condition_attrs)
    .await?;
}

During execution, Panopticon:

  1. Resolves the collection from the store path
  2. For each item, sets the iteration variables (region and idx in this example)
  3. Executes all commands in the namespace
  4. Cleans up the iteration variables

Results from iterative commands are indexed. Instead of storing at classify.check.result, we store at classify.check[0].result, classify.check[1].result, etc.
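As a rough illustration of that indexing scheme, the following sketch renders indexed paths as strings; the indexed_path helper is hypothetical, not part of the library's API.

```rust
// Hypothetical helper showing how indexed result addresses are shaped;
// not Panopticon's actual StorePath formatting.
fn indexed_path(namespace: &str, command: &str, index: usize, field: &str) -> String {
    format!("{namespace}.{command}[{index}].{field}")
}

fn main() {
    // Each iteration of the namespace gets its own slot.
    assert_eq!(indexed_path("classify", "check", 0, "result"),
               "classify.check[0].result");
    assert_eq!(indexed_path("classify", "check", 1, "result"),
               "classify.check[1].result");
}
```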

Static Namespaces

Static namespaces contain no commands - they exist purely to provide constant values to the data stores. Think of them as configuration namespaces.

#![allow(unused)]
fn main() {
// Create a Static namespace with configuration values
pipeline
    .add_namespace(
        NamespaceBuilder::new("config")
            .static_ns()
            .insert("api_version", ScalarValue::String("v2".into()))
            .insert(
                "regions",
                ObjectBuilder::new()
                    .insert("us-east", "Virginia")
                    .insert("us-west", "Oregon")
                    .insert("eu-west", "Ireland")
                    .build_scalar(),
            ),
    )
    .await?;
}

Values from static namespaces are available to all subsequent commands via Tera templating:

#![allow(unused)]
fn main() {
// In a later command's attributes
.insert("endpoint", "https://api.example.com/{{ config.api_version }}/data")
}

Iteration Sources

Iterative namespaces support several source types for determining what to iterate over:

ScalarArray

Iterate over elements in a JSON array:

#![allow(unused)]
fn main() {
NamespaceBuilder::new("process")
    .iterative()
    .store_path(StorePath::from_segments(["data", "items"]))
    .scalar_array(None)  // None = all items, Some((start, end)) = range
    .iter_var("item")
}

ScalarObjectKeys

Iterate over keys of a JSON object:

#![allow(unused)]
fn main() {
NamespaceBuilder::new("classify")
    .iterative()
    .store_path(StorePath::from_segments(["config", "regions"]))
    .scalar_object_keys(None, false)  // None = all keys, Some(vec) = filter
    .iter_var("region")
}

The second parameter controls exclusion - true means "iterate over all keys except those listed".
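The filter and exclusion semantics can be sketched in plain Rust. The filter_keys function below is an assumption about the behaviour just described, not Panopticon's implementation.

```rust
// Sketch of the key-filter semantics (illustrative names, not the real API):
// with `exclude = true`, keys in the filter list are skipped;
// with `exclude = false`, only keys in the filter list are kept.
fn filter_keys<'a>(
    keys: &[&'a str],
    filter: Option<&[&str]>,
    exclude: bool,
) -> Vec<&'a str> {
    keys.iter()
        .copied()
        .filter(|k| match filter {
            None => true, // no filter: iterate over all keys
            Some(list) => list.contains(k) != exclude,
        })
        .collect()
}

fn main() {
    let keys = ["us-east", "us-west", "eu-west"];
    // Exclude "eu-west": iterate over everything else.
    let kept = filter_keys(&keys, Some(&["eu-west"]), true);
    assert_eq!(kept, ["us-east", "us-west"]);
}
```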

ScalarStringSplit

Iterate over parts of a delimited string:

#![allow(unused)]
fn main() {
NamespaceBuilder::new("tags")
    .iterative()
    .store_path(StorePath::from_segments(["data", "tag_list"]))
    .string_split(",")
    .iter_var("tag")
}

TabularColumn

Iterate over unique values in a DataFrame column:

#![allow(unused)]
fn main() {
NamespaceBuilder::new("by_category")
    .iterative()
    .store_path(StorePath::from_segments(["data", "products", "data"]))
    .tabular_column("category", None)
    .iter_var("category")
}

Complete Example: Object Key Iteration

Here is a full example showing how to iterate over object keys:

use panopticon_core::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut pipeline = Pipeline::new();

    // --- Static namespace: an object whose keys we will iterate ---
    pipeline
        .add_namespace(
            NamespaceBuilder::new("config").static_ns().insert(
                "regions",
                ObjectBuilder::new()
                    .insert("us-east", "Virginia")
                    .insert("us-west", "Oregon")
                    .insert("eu-west", "Ireland")
                    .build_scalar(),
            ),
        )
        .await?;

    // --- Iterative namespace: loop over each region key ---
    let condition_attrs = ObjectBuilder::new()
        .insert(
            "branches",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "is_us")
                    .insert("if", "region is starting_with(\"us-\")")
                    .insert("then", "Region {{ region }} is in the US")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "is_eu")
                    .insert("if", "region is starting_with(\"eu-\")")
                    .insert("then", "Region {{ region }} is in the EU")
                    .build_scalar(),
            ]),
        )
        .insert("default", "Region {{ region }} is in an unknown area")
        .build_hashmap();

    let mut handle = pipeline
        .add_namespace(
            NamespaceBuilder::new("classify")
                .iterative()
                .store_path(StorePath::from_segments(["config", "regions"]))
                .scalar_object_keys(None, false)
                .iter_var("region")
                .index_var("idx"),
        )
        .await?;
    handle
        .add_command::<ConditionCommand>("region", &condition_attrs)
        .await?;

    // --- Execute ---
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;

    // --- Print results per iteration ---
    println!("=== Iterating over region keys ===\n");

    let mut idx = 0;
    loop {
        let source = StorePath::from_segments(["classify", "region"]).with_index(idx);
        let Some(cmd_results) = results.get_by_source(&source) else {
            break;
        };

        let result = cmd_results
            .data_get(&source.with_segment("result"))
            .and_then(|r| r.as_scalar())
            .expect("Expected result");
        println!("  [{}] {}", idx, result.1);

        idx += 1;
    }

    println!("\nProcessed {} region(s)", idx);

    Ok(())
}

Output:

=== Iterating over region keys ===

  [0] Region us-east is in the US
  [1] Region us-west is in the US
  [2] Region eu-west is in the EU

Processed 3 region(s)

Reserved Names

Two namespace names are reserved and cannot be used:

  • item - Default iteration variable name
  • index - Default index variable name

If you try to create a namespace with a reserved name, the builder will return an error.

Namespace Execution Order

Namespaces execute in the order they were added to the pipeline. This is important because later namespaces can reference data produced by earlier ones.

#![allow(unused)]
fn main() {
// 1. Load data (executes first)
pipeline.add_namespace(NamespaceBuilder::new("data")).await?;

// 2. Query the loaded data (executes second)
pipeline.add_namespace(NamespaceBuilder::new("query")).await?;

// 3. Aggregate the query results (executes third)
pipeline.add_namespace(NamespaceBuilder::new("stats")).await?;
}

Within a namespace, commands also execute in order. The combination of namespace ordering and command ordering gives us predictable, deterministic pipeline execution.

Commands and Attributes

Commands are the workhorses of Panopticon. Each command performs a specific operation - loading files, running SQL queries, evaluating conditions, and so on. We configure commands through attributes, which are key-value pairs that control the command's behavior.

The Command Trait

Under the hood, a command is any type that implements three traits:

#![allow(unused)]
fn main() {
pub trait Command: FromAttributes + Descriptor + Executable {}
}

  • FromAttributes - Constructs the command from an attribute map
  • Descriptor - Provides metadata (type name, attribute schema, result schema)
  • Executable - Performs the actual work during pipeline execution

Panopticon provides a blanket implementation, so any type implementing the three base traits automatically implements Command.

Adding Commands to a Pipeline

Commands are always added to a namespace. The pattern looks like this:

#![allow(unused)]
fn main() {
let mut handle = pipeline
    .add_namespace(NamespaceBuilder::new("data"))
    .await?;

handle
    .add_command::<FileCommand>("load", &file_attrs)
    .await?;
}

The turbofish syntax <FileCommand> tells Panopticon which command type to use. The string "load" is the command's name within this namespace, which becomes part of the store path for its results (data.load.*).

Building Attributes with ObjectBuilder

Attributes are a HashMap<String, ScalarValue>. While we could construct this manually, ObjectBuilder provides a more ergonomic interface:

#![allow(unused)]
fn main() {
let file_attrs = ObjectBuilder::new()
    .insert(
        "files",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "users")
                .insert("file", "data/users.csv")
                .insert("format", "csv")
                .build_scalar(),
        ]),
    )
    .build_hashmap();
}

ObjectBuilder Methods

Method               Description
new()                Create a new empty builder
insert(key, value)   Add a key-value pair
object(key, nested)  Add a nested ObjectBuilder
build_scalar()       Convert to a ScalarValue::Object
build_hashmap()      Convert to HashMap<String, ScalarValue>

Nested Objects

For complex attribute structures, we can nest ObjectBuilders:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert("name", "report")
    .object("options",
        ObjectBuilder::new()
            .insert("format", "json")
            .insert("pretty", true)
    )
    .build_hashmap();
}

This produces the equivalent of:

{
    "name": "report",
    "options": {
        "format": "json",
        "pretty": true
    }
}

Arrays of Objects

Many commands accept arrays of configuration objects:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert(
        "aggregations",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "total")
                .insert("op", "sum")
                .insert("column", "amount")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "average")
                .insert("op", "mean")
                .insert("column", "amount")
                .build_scalar(),
        ]),
    )
    .build_hashmap();
}

Common Attributes

All commands support a when attribute for conditional execution:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert("when", "data.load.status == \"success\"")
    // ... other attributes
    .build_hashmap();
}

The when attribute is evaluated as a Tera expression. If it evaluates to a falsy value, the command is skipped and its status is set to "skipped".
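That skip behaviour can be sketched as follows. The is_truthy function here is a stand-in that only approximates Tera's actual truthiness rules, and status_for is a hypothetical helper, not part of the library.

```rust
// Stand-in for the skip-on-falsy behaviour; Tera's real truthiness
// rules are richer than this approximation.
fn is_truthy(value: &str) -> bool {
    !(value.is_empty() || value == "false" || value == "0")
}

fn status_for(when_result: &str) -> &'static str {
    // A falsy `when` result means the command does not run.
    if is_truthy(when_result) { "success" } else { "skipped" }
}

fn main() {
    assert_eq!(status_for("true"), "success");
    assert_eq!(status_for("false"), "skipped");
}
```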

Attribute Validation

When a pipeline compiles, Panopticon validates all command attributes against their schemas. Each command type declares:

  • Required attributes - Must be present
  • Optional attributes - May be omitted
  • Type constraints - Values must match expected types

If validation fails, .compile() returns an error describing what is wrong.

Command Results

Every command produces results, which are stored at paths derived from the namespace and command name. All commands automatically produce:

Result       Type    Description
status       String  "success", "skipped", "error", or "cancelled"
duration_ms  Number  Execution time in milliseconds

Commands also produce their own specific results. For example, ConditionCommand produces:

Result        Type    Description
result        String  The value from the matched branch or default
matched       Bool    Whether a branch condition matched
branch_index  Number  Index of matched branch, or -1 for default

Result Kinds: Meta vs Data

Results are categorized as either Meta or Data:

  • Meta results describe the execution (status, duration, row counts)
  • Data results contain the actual output (query results, computed values)

This distinction matters when collecting results - we might want to include all data but only summary metadata.

Example: ConditionCommand

Let us walk through a complete example using ConditionCommand:

use panopticon_core::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut pipeline = Pipeline::new();

    // Static config providing a value to check
    pipeline
        .add_namespace(
            NamespaceBuilder::new("input")
                .static_ns()
                .insert("score", ScalarValue::Number(85.into())),
        )
        .await?;

    // Condition command to classify the score
    let condition_attrs = ObjectBuilder::new()
        .insert(
            "branches",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "excellent")
                    .insert("if", "input.score >= 90")
                    .insert("then", "Excellent work!")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "good")
                    .insert("if", "input.score >= 70")
                    .insert("then", "Good job!")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "needs_work")
                    .insert("if", "input.score >= 50")
                    .insert("then", "Keep practicing!")
                    .build_scalar(),
            ]),
        )
        .insert("default", "Please try again.")
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("classify"))
        .await?
        .add_command::<ConditionCommand>("score", &condition_attrs)
        .await?;

    // Execute
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;

    // Get the result
    let source = StorePath::from_segments(["classify", "score"]);
    let cmd_results = results.get_by_source(&source).expect("Expected results");

    let result = cmd_results
        .data_get(&source.with_segment("result"))
        .and_then(|r| r.as_scalar())
        .expect("Expected result");

    println!("Classification: {}", result.1);  // "Good job!"

    Ok(())
}

Built-in Commands

Panopticon provides several built-in commands:

Command           Purpose
FileCommand       Load data from CSV, JSON, or Parquet files
SqlCommand        Run SQL queries against loaded DataFrames
AggregateCommand  Compute aggregations (sum, mean, count, etc.)
ConditionCommand  Evaluate conditional logic with branches
TemplateCommand   Generate text using Tera templates

Each command has its own attribute schema documented in the Commands section.

Tera Substitution in Attributes

String attributes support Tera template syntax. Before a command executes, Panopticon substitutes any {{ ... }} expressions with values from the scalar store:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert("query", "SELECT * FROM users WHERE region = '{{ config.region }}'")
    .build_hashmap();
}

This enables dynamic configuration based on earlier command results or static namespace values.
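
The substitution step can be pictured as a search-and-replace over dotted keys. The sketch below is illustrative only: Panopticon delegates this to a full Tera engine (which also supports filters and conditionals), and the `substitute` helper with its flat key map is an assumption for the demo.

```rust
use std::collections::HashMap;

// Naive stand-in for the Tera substitution step: replace each
// "{{ key }}" expression with the value stored under that dotted key.
fn substitute(template: &str, values: &HashMap<&str, &str>) -> String {
    let mut out = template.to_string();
    for (key, value) in values {
        out = out.replace(&format!("{{{{ {key} }}}}"), value);
    }
    out
}

fn main() {
    let mut values = HashMap::new();
    values.insert("config.region", "us-east");

    let query = substitute(
        "SELECT * FROM users WHERE region = '{{ config.region }}'",
        &values,
    );
    assert_eq!(query, "SELECT * FROM users WHERE region = 'us-east'");
}
```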

Summary

The command system in Panopticon follows a consistent pattern:

  1. Commands implement FromAttributes, Descriptor, and Executable
  2. We add commands to namespaces using add_command::<T>(name, &attrs)
  3. Attributes are built using ObjectBuilder for type safety
  4. All commands support the when attribute for conditional execution
  5. Results are stored at namespace.command.field paths
  6. String attributes support Tera templating for dynamic values

This design keeps command configuration declarative while enabling powerful dynamic behavior through templating and conditional execution.

Data Stores

All data in a Panopticon pipeline flows through two stores: the ScalarStore for JSON-like values and the TabularStore for Polars DataFrames. Understanding these stores is essential for working with command inputs and outputs.

Two Stores, Two Data Models

┌─────────────────────────────────────────────────────────────────┐
│                      ExecutionContext                           │
│                                                                 │
│  ┌─────────────────────────────┐  ┌─────────────────────────┐   │
│  │        ScalarStore          │  │      TabularStore       │   │
│  │                             │  │                         │   │
│  │  • Strings                  │  │  • Polars DataFrames    │   │
│  │  • Numbers                  │  │  • Columnar data        │   │
│  │  • Booleans                 │  │  • SQL-queryable        │   │
│  │  • Arrays                   │  │                         │   │
│  │  • Objects (nested)         │  │                         │   │
│  │  • Null                     │  │                         │   │
│  │                             │  │                         │   │
│  │  Backed by Tera Context     │  │  HashMap<String, DF>    │   │
│  │  (enables templating)       │  │                         │   │
│  └─────────────────────────────┘  └─────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

ScalarStore

The ScalarStore holds JSON-like values. Internally, it wraps a Tera context, which means all scalar values are automatically available for template substitution.

#![allow(unused)]
fn main() {
pub type ScalarValue = tera::Value;  // Re-export of serde_json::Value
}

Scalar values can be:

  • Null - Absence of a value
  • Bool - true or false
  • Number - Integers or floating-point
  • String - Text data
  • Array - Ordered list of values
  • Object - Key-value map (nested structure)

TabularStore

The TabularStore holds Polars DataFrames - efficient columnar data structures ideal for analytical queries.

#![allow(unused)]
fn main() {
pub type TabularValue = polars::prelude::DataFrame;
}

DataFrames are stored by their full store path (as a dotted string key). Commands like SqlCommand can register these as tables and query across them.
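
The keying scheme can be sketched with a plain map, using a hypothetical `Frame` struct as a stand-in for a Polars DataFrame (both names here are assumptions, not Panopticon's actual types):

```rust
use std::collections::HashMap;

// Placeholder for a Polars DataFrame.
#[derive(Debug, PartialEq)]
struct Frame {
    rows: usize,
}

// Frames are stored under their full store path rendered as a dotted string.
struct TabularStore {
    frames: HashMap<String, Frame>,
}

impl TabularStore {
    fn new() -> Self {
        Self { frames: HashMap::new() }
    }

    fn insert(&mut self, dotted_path: &str, frame: Frame) {
        self.frames.insert(dotted_path.to_string(), frame);
    }

    fn get(&self, dotted_path: &str) -> Option<&Frame> {
        self.frames.get(dotted_path)
    }
}

fn main() {
    let mut store = TabularStore::new();
    store.insert("data.load.users.data", Frame { rows: 120 });

    // Commands like SqlCommand look frames up by the same dotted key.
    assert_eq!(store.get("data.load.users.data"), Some(&Frame { rows: 120 }));
    assert!(store.get("data.load.missing.data").is_none());
}
```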

Store Paths

Values in both stores are addressed using StorePath - a structured path that typically follows the pattern namespace.command.field:

#![allow(unused)]
fn main() {
// Create a store path
let path = StorePath::from_segments(["data", "load", "users", "data"]);

// Access with dotted notation
let dotted = path.to_dotted();  // "data.load.users.data"

// Add segments
let status_path = path.with_segment("status");  // "data.load.users.data.status"

// Add iteration index
let indexed = path.with_index(0);  // "data.load.users.data[0]"
}

Store paths provide a consistent way to reference data throughout the pipeline.
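
The rendering rules described above can be mirrored with a minimal path type. This `Path` struct is an illustrative sketch, not Panopticon's `StorePath`:

```rust
// Minimal path type mirroring the documented StorePath rendering.
#[derive(Clone)]
struct Path {
    segments: Vec<String>,
}

impl Path {
    fn from_segments<'a>(segs: impl IntoIterator<Item = &'a str>) -> Self {
        Self { segments: segs.into_iter().map(String::from).collect() }
    }

    // Join segments with dots: ["a", "b"] -> "a.b"
    fn to_dotted(&self) -> String {
        self.segments.join(".")
    }

    // Append one segment, returning a new path.
    fn with_segment(&self, seg: &str) -> Self {
        let mut p = self.clone();
        p.segments.push(seg.to_string());
        p
    }

    // Render with an iteration index suffix.
    fn with_index(&self, idx: usize) -> String {
        format!("{}[{idx}]", self.to_dotted())
    }
}

fn main() {
    let path = Path::from_segments(["data", "load", "users"]);
    assert_eq!(path.to_dotted(), "data.load.users");
    assert_eq!(path.with_segment("status").to_dotted(), "data.load.users.status");
    assert_eq!(path.with_index(0), "data.load.users[0]");
}
```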

ScalarValue Operations

Creating Values

Panopticon provides helper functions and the ObjectBuilder for creating scalar values:

#![allow(unused)]
fn main() {
use panopticon_core::prelude::*;

// Primitives convert automatically via Into<ScalarValue>
let string_val: ScalarValue = "hello".into();
let number_val: ScalarValue = 42.into();
let bool_val: ScalarValue = true.into();

// Arrays
let array_val = ScalarValue::Array(vec![
    "a".into(),
    "b".into(),
    "c".into(),
]);

// Objects using ObjectBuilder
let object_val = ObjectBuilder::new()
    .insert("name", "Alice")
    .insert("age", 30)
    .insert("active", true)
    .build_scalar();
}

Type Checking and Extraction

ScalarValue provides methods for type checking and extraction:

#![allow(unused)]
fn main() {
// Standard serde_json methods
if let Some(s) = value.as_str() { /* use string */ }
if let Some(n) = value.as_i64() { /* use integer */ }
if let Some(b) = value.as_bool() { /* use boolean */ }
if let Some(arr) = value.as_array() { /* use array */ }
if let Some(obj) = value.as_object() { /* use object */ }

// Extension trait with error handling
use panopticon_core::prelude::ScalarAsExt;

let name = value.as_str_or_err("name")?;      // Returns Result
let count = value.as_i64_or_err("count")?;
let items = value.as_array_or_err("items")?;
}

Map Extension Trait

For working with object maps, the ScalarMapExt trait provides convenient accessors:

#![allow(unused)]
fn main() {
use panopticon_core::prelude::ScalarMapExt;

let obj = value.as_object().unwrap();

// Required fields (returns Result)
let name = obj.get_required_string("name")?;
let count = obj.get_required_i64("count")?;
let enabled = obj.get_required_bool("enabled")?;

// Optional fields (returns Option)
let description = obj.get_optional_string("description");
let limit = obj.get_optional_i64("limit");
}

How Commands Use Stores

Commands read inputs from and write outputs to the stores. Here is a typical pattern:

┌──────────────┐         ┌─────────────┐         ┌──────────────┐
│ ScalarStore  │────────▶│   Command   │────────▶│ ScalarStore  │
│ (inputs)     │         │             │         │ (outputs)    │
└──────────────┘         │  - Read     │         └──────────────┘
                         │  - Process  │
┌──────────────┐         │  - Write    │         ┌──────────────┐
│ TabularStore │────────▶│             │────────▶│ TabularStore │
│ (inputs)     │         └─────────────┘         │ (outputs)    │
└──────────────┘                                 └──────────────┘

FileCommand Example

When FileCommand loads a CSV file:

  1. Reads the file from disk (not from stores)
  2. Writes DataFrame to TabularStore at namespace.command.name.data
  3. Writes metadata to ScalarStore:
    • namespace.command.name.rows - Row count
    • namespace.command.name.columns - Column count
    • namespace.command.status - "success"
    • namespace.command.duration_ms - Execution time

SqlCommand Example

When SqlCommand runs a query:

  1. Reads DataFrames from TabularStore based on tables attribute
  2. Registers them as SQL tables
  3. Executes the query
  4. Writes result DataFrame to TabularStore
  5. Writes metadata to ScalarStore

Template Substitution

Because ScalarStore wraps a Tera context, all values are available for template substitution in command attributes:

#![allow(unused)]
fn main() {
// Static namespace provides config
pipeline.add_namespace(
    NamespaceBuilder::new("config")
        .static_ns()
        .insert("region", "us-east")
        .insert("limit", ScalarValue::Number(100.into()))
).await?;

// SQL command uses templated query
let sql_attrs = ObjectBuilder::new()
    .insert("query", "SELECT * FROM users WHERE region = '{{ config.region }}' LIMIT {{ config.limit }}")
    .build_hashmap();
}

During execution, Panopticon substitutes {{ config.region }} with "us-east" and {{ config.limit }} with 100 before the command runs.

Accessing Results

After pipeline execution, we retrieve results through the ResultStore:

#![allow(unused)]
fn main() {
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

// Get results for a specific command
let source = StorePath::from_segments(["data", "load"]);
let cmd_results = results.get_by_source(&source).expect("Expected results");

// Access metadata
let rows = cmd_results.meta_get(&source.with_segment("rows"));

// Access data
let status = cmd_results.data_get(&source.with_segment("result"));
}

ResultValue Types

Results come in two forms:

#![allow(unused)]
fn main() {
pub enum ResultValue {
    Scalar {
        ty: ScalarType,
        value: ScalarValue,
    },
    Tabular {
        path: PathBuf,      // File path where DataFrame was written
        format: OutputFormat,
        rows_count: usize,
        columns_count: usize,
    },
}
}

Tabular results are written to disk (CSV, JSON, or Parquet) and the ResultValue contains the path and summary statistics.

ScalarType Enum

For type introspection, Panopticon provides a ScalarType enum:

#![allow(unused)]
fn main() {
pub enum ScalarType {
    Null,
    Bool,
    Number,
    String,
    Array,
    Object,
}
}

This is useful for schema validation and result type checking.
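
A minimal sketch of this kind of type introspection, pairing a `ScalarType`-like enum with a hand-rolled `Value` enum. The real `ScalarValue` is `tera::Value`, so this stand-in type is an assumption for the demo:

```rust
#[derive(Debug, PartialEq)]
enum ScalarType {
    Null,
    Bool,
    Number,
    String,
    Array,
    Object,
}

// Hand-rolled stand-in for the JSON data model behind ScalarValue.
enum Value {
    Null,
    Bool(bool),
    Number(f64),
    Text(String),
    Array(Vec<Value>),
    Object(Vec<(String, Value)>),
}

// Classify a value into its ScalarType for schema validation.
fn scalar_type(v: &Value) -> ScalarType {
    match v {
        Value::Null => ScalarType::Null,
        Value::Bool(_) => ScalarType::Bool,
        Value::Number(_) => ScalarType::Number,
        Value::Text(_) => ScalarType::String,
        Value::Array(_) => ScalarType::Array,
        Value::Object(_) => ScalarType::Object,
    }
}

fn main() {
    assert_eq!(scalar_type(&Value::Number(42.0)), ScalarType::Number);
    assert_eq!(scalar_type(&Value::Array(vec![])), ScalarType::Array);
}
```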

Practical Example

Here is a complete example showing data flow through the stores:

use panopticon_core::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut pipeline = Pipeline::new();

    // 1. Static namespace adds values to ScalarStore
    pipeline
        .add_namespace(
            NamespaceBuilder::new("config")
                .static_ns()
                .insert("threshold", ScalarValue::Number(50.into()))
        )
        .await?;

    // 2. FileCommand writes DataFrame to TabularStore
    let file_attrs = ObjectBuilder::new()
        .insert("files", ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "scores")
                .insert("file", "data/scores.csv")
                .insert("format", "csv")
                .build_scalar(),
        ]))
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("data"))
        .await?
        .add_command::<FileCommand>("load", &file_attrs)
        .await?;

    // 3. SqlCommand reads from TabularStore, uses ScalarStore for templating
    let sql_attrs = ObjectBuilder::new()
        .insert("tables", ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "scores")
                .insert("source", "data.load.scores.data")
                .build_scalar(),
        ]))
        .insert("query", "SELECT * FROM scores WHERE value > {{ config.threshold }}")
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("filtered"))
        .await?
        .add_command::<SqlCommand>("high_scores", &sql_attrs)
        .await?;

    // Execute and collect results
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;

    // 4. Results contain both scalar metadata and tabular file paths
    let source = StorePath::from_segments(["filtered", "high_scores"]);
    if let Some(cmd_results) = results.get_by_source(&source) {
        // Metadata from ScalarStore
        if let Some(rows) = cmd_results.meta_get(&source.with_segment("rows")) {
            println!("Filtered rows: {}", rows);
        }

        // Tabular data was written to file
        if let Some(result_value) = cmd_results.data_get(&source.with_segment("data")) {
            if let Some((path, _format)) = result_value.as_tabular() {
                println!("Data written to: {}", path.display());
            }
        }
    }

    Ok(())
}

Summary

The dual-store architecture in Panopticon separates concerns:

  • ScalarStore handles configuration, metadata, and template substitution
  • TabularStore handles large datasets efficiently with Polars

This separation allows us to:

  • Use JSON-like values for flexible configuration
  • Leverage Tera templating for dynamic attribute values
  • Process large datasets with columnar efficiency
  • Query across DataFrames using SQL

Understanding how data flows through these stores is key to building effective Panopticon pipelines.

Built-in Commands

Panopticon includes a set of built-in commands that cover common data pipeline operations. These commands work together to load, transform, analyze, and export data.

Command Overview

Command           Purpose             Key Use Cases
----------------  ------------------  --------------------------------------------------------
FileCommand       Load data files     Read CSV, JSON, and Parquet files into the tabular store
SqlCommand        Query tabular data  Filter, join, transform data using SQL syntax
AggregateCommand  Compute statistics  Calculate count, sum, mean, max, min, median, and more
ConditionCommand  Branch logic        Evaluate Tera expressions to produce conditional outputs
TemplateCommand   Render templates    Generate files using Tera templates with inheritance

Common Patterns

Data Loading and Analysis

A typical pipeline starts by loading data with FileCommand, optionally transforms it with SqlCommand, and computes metrics with AggregateCommand:

#![allow(unused)]
fn main() {
// Load CSV data
pipeline
    .add_namespace(NamespaceBuilder::new("data"))
    .await?
    .add_command::<FileCommand>("load", &file_attrs)
    .await?;

// Query the loaded data
pipeline
    .add_namespace(NamespaceBuilder::new("query"))
    .await?
    .add_command::<SqlCommand>("filtered", &query_attrs)
    .await?;

// Aggregate results
pipeline
    .add_namespace(NamespaceBuilder::new("stats"))
    .await?
    .add_command::<AggregateCommand>("summary", &agg_attrs)
    .await?;
}

Conditional Execution

All commands support the optional when attribute, which is a Tera expression evaluated at runtime. If it resolves to a falsy value, the command is skipped:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert("when", "inputs.feature_enabled")  // Skip if false
    .insert("source", "data.load.users.data")
    // ... other attributes
    .build_hashmap();
}

When a command is skipped:

  • Its status meta result is set to "skipped"
  • Data results are absent from the ResultStore
  • Dependent commands that reference its outputs will fail unless handled
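
Rather than calling `expect()`, downstream code can guard against a skipped command's absent results. Below is a sketch using a plain map as a stand-in for the `ResultStore`; the `message_for` helper is hypothetical:

```rust
use std::collections::HashMap;

// Guarded result access: a skipped command leaves only its "skipped" status
// meta result, so data lookups return None and must be handled.
fn message_for(results: &HashMap<&str, &str>) -> String {
    match results.get("classify.score.result") {
        Some(value) => format!("Classification: {value}"),
        None if results.get("classify.score.status") == Some(&"skipped") => {
            "Command was skipped; using fallback".to_string()
        }
        None => "No result found".to_string(),
    }
}

fn main() {
    let mut results: HashMap<&str, &str> = HashMap::new();
    // Only the status meta result is present after a skip.
    results.insert("classify.score.status", "skipped");
    assert_eq!(message_for(&results), "Command was skipped; using fallback");

    // Once the command actually runs, the data result appears.
    results.insert("classify.score.result", "Good job!");
    assert_eq!(message_for(&results), "Classification: Good job!");
}
```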

Store Path References

Commands that consume data from earlier pipeline stages use store paths to reference results. Store paths are dot-separated strings that identify locations in the data stores:

namespace.command.result_key

For example:

  • data.load.users.data - The tabular data loaded from the users file
  • stats.summary.row_count - A scalar aggregation result

See Store Paths for more details.

Tera Template Substitution

Many command attributes support Tera template syntax for dynamic values. This is indicated by "supports Tera substitution" in the attribute documentation:

#![allow(unused)]
fn main() {
.insert("file", "{{ config.data_dir }}/users.csv")
.insert("query", "SELECT * FROM users WHERE status = '{{ inputs.status }}'")
}

The execution context automatically substitutes these expressions using values from the scalar store.

Result Types

Commands produce two types of results:

Meta Results

Metadata about the command execution (row counts, sizes, column lists). These are accessed via meta_get() on the command results:

#![allow(unused)]
fn main() {
let row_count = cmd_results
    .meta_get(&source.with_segment("rows"))
    .expect("Expected rows");
}

Data Results

The primary outputs of the command (DataFrames, computed values). These are accessed via data_get():

#![allow(unused)]
fn main() {
let df = cmd_results
    .data_get(&source.with_segment("data"))
    .and_then(|r| r.as_tabular());
}

Each command's documentation lists which results are meta vs. data.

FileCommand

FileCommand loads data files from disk into the tabular store. It supports CSV, JSON, and Parquet formats.

When to Use

Use FileCommand when you need to:

  • Load one or more data files into a pipeline
  • Ingest data in different formats (CSV, JSON, Parquet)
  • Make tabular data available for SQL queries or aggregations

Attributes

Attribute  Type              Required  Description
---------  ----------------  --------  ------------------------------------
files      Array of objects  Yes       Array of file specifications to load

File Object Fields

Each object in the files array has the following fields:

Field   Type    Required  Description
------  ------  --------  ----------------------------------------------------------------
name    String  Yes       Identifier for this file in the tabular store
file    String  Yes       Path to the file (supports Tera substitution)
format  String  Yes       File format: csv, json, or parquet (supports Tera substitution)

Results

Meta Results

Result      Type    Description
----------  ------  -----------------------------------
count       Number  Total number of files loaded
total_rows  Number  Sum of rows across all loaded files
total_size  Number  Sum of file sizes in bytes

Data Results (Per File)

For each file in the files array, the following results are produced under {output_prefix}.{name}:

Result   Type                 Description
-------  -------------------  -------------------------------
data     Tabular (DataFrame)  The loaded data
rows     Number               Row count for this file
size     Number               File size in bytes
columns  Array                Column names in the loaded data

Examples

Loading a Single CSV File

#![allow(unused)]
fn main() {
use panopticon_core::prelude::*;

let attrs = ObjectBuilder::new()
    .insert(
        "files",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "users")
                .insert("file", "/path/to/users.csv")
                .insert("format", "csv")
                .build_scalar(),
        ]),
    )
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("data"))
    .await?
    .add_command::<FileCommand>("load", &attrs)
    .await?;

// After execution, the data is available at:
// - data.load.users.data      (the DataFrame)
// - data.load.users.rows      (row count)
// - data.load.users.columns   (column names)
}

Loading Multiple Formats

Load CSV, JSON, and Parquet files in a single command:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert(
        "files",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "users")
                .insert("file", "fixtures/users.csv")
                .insert("format", "csv")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "events")
                .insert("file", "fixtures/events.json")
                .insert("format", "json")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "metrics")
                .insert("file", "fixtures/metrics.parquet")
                .insert("format", "parquet")
                .build_scalar(),
        ]),
    )
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("data"))
    .await?
    .add_command::<FileCommand>("load", &attrs)
    .await?;
}

After execution:

  • data.load.users.data - DataFrame from users.csv
  • data.load.events.data - DataFrame from events.json
  • data.load.metrics.data - DataFrame from metrics.parquet
  • data.load.count - 3 (number of files loaded)
  • data.load.total_rows - Combined row count

Using Tera Substitution for Dynamic Paths

#![allow(unused)]
fn main() {
// First, set up a static namespace with configuration
pipeline
    .add_namespace(
        NamespaceBuilder::new("config")
            .static_ns()
            .insert("data_dir", ScalarValue::String("/var/data".to_string()))
            .insert("file_format", ScalarValue::String("csv".to_string())),
    )
    .await?;

// Then reference those values in FileCommand
let attrs = ObjectBuilder::new()
    .insert(
        "files",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "daily_report")
                .insert("file", "{{ config.data_dir }}/report.{{ config.file_format }}")
                .insert("format", "{{ config.file_format }}")
                .build_scalar(),
        ]),
    )
    .build_hashmap();
}

Accessing Results

#![allow(unused)]
fn main() {
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

let source = StorePath::from_segments(["data", "load"]);
let cmd_results = results.get_by_source(&source).expect("Expected results");

// Access meta results
let file_count = cmd_results
    .meta_get(&source.with_segment("count"))
    .expect("Expected count");
let total_rows = cmd_results
    .meta_get(&source.with_segment("total_rows"))
    .expect("Expected total_rows");

println!("Loaded {} files with {} total rows", file_count, total_rows);

// Access per-file meta
let users_rows = cmd_results
    .meta_get(&StorePath::from_dotted("data.load.users.rows"))
    .expect("Expected users rows");
}

Common Patterns

FileCommand + SqlCommand

Load data with FileCommand, then query it with SqlCommand:

#![allow(unused)]
fn main() {
// Load
let file_attrs = ObjectBuilder::new()
    .insert("files", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "orders")
            .insert("file", "orders.csv")
            .insert("format", "csv")
            .build_scalar(),
    ]))
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("data"))
    .await?
    .add_command::<FileCommand>("load", &file_attrs)
    .await?;

// Query - reference the loaded data by store path
let query_attrs = ObjectBuilder::new()
    .insert("tables", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "orders")
            .insert("source", "data.load.orders.data")  // Store path reference
            .build_scalar(),
    ]))
    .insert("query", "SELECT * FROM orders WHERE status = 'completed'")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("query"))
    .await?
    .add_command::<SqlCommand>("completed", &query_attrs)
    .await?;
}

Error Handling

FileCommand will return an error if:

  • The file does not exist
  • The path points to a directory instead of a file
  • The file format is not one of csv, json, or parquet
  • The file content cannot be parsed as the specified format

Format Notes

CSV

  • Assumes the first row contains headers
  • Uses default CSV parsing options from Polars

JSON

  • Expects newline-delimited JSON (NDJSON) or JSON array format
  • Uses Polars' JsonReader

Parquet

  • Reads standard Apache Parquet files
  • Efficient for large datasets with columnar storage

SqlCommand

SqlCommand executes SQL queries against tabular data stored in the pipeline. It uses Polars' SQL context to provide full SQL query capabilities on DataFrames.

When to Use

Use SqlCommand when you need to:

  • Filter rows based on conditions
  • Select specific columns
  • Join multiple tables together
  • Group and aggregate data
  • Transform data using SQL expressions
  • Order and limit results

Attributes

Attribute  Type              Required  Description
---------  ----------------  --------  --------------------------------------------------
tables     Array of objects  Yes       Table mappings from store paths to SQL table names
query      String            Yes       SQL query to execute (supports Tera substitution)

Table Object Fields

Each object in the tables array maps a store path to a table name for use in SQL:

Field   Type    Required  Description
------  ------  --------  -------------------------------------------------------
name    String  Yes       Table name to use in the SQL query
source  String  Yes       Store path to tabular data (e.g., data.load.users.data)

Results

Data Results

Result  Type                 Description
------  -------------------  ----------------
data    Tabular (DataFrame)  The query result

Meta Results

Result   Type    Description
-------  ------  ----------------------------
rows     Number  Number of rows in the result
columns  Array   Column names in the result

Examples

Basic Query

#![allow(unused)]
fn main() {
use panopticon_core::prelude::*;

// Assume data was loaded with FileCommand at data.load.users.data
let attrs = ObjectBuilder::new()
    .insert(
        "tables",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "users")
                .insert("source", "data.load.users.data")
                .build_scalar(),
        ]),
    )
    .insert("query", "SELECT * FROM users WHERE active = true")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("query"))
    .await?
    .add_command::<SqlCommand>("active_users", &attrs)
    .await?;

// Result available at: query.active_users.data
}

Joining Multiple Tables

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert(
        "tables",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "users")
                .insert("source", "data.load.users.data")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "orders")
                .insert("source", "data.load.orders.data")
                .build_scalar(),
        ]),
    )
    .insert(
        "query",
        "SELECT u.name, u.email, o.order_id, o.total \
         FROM users u \
         INNER JOIN orders o ON u.id = o.user_id \
         ORDER BY o.total DESC",
    )
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("query"))
    .await?
    .add_command::<SqlCommand>("user_orders", &attrs)
    .await?;
}

Cross Join

Combine every row from one table with every row from another:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert(
        "tables",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "users")
                .insert("source", "data.load.users.data")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "events")
                .insert("source", "data.load.events.data")
                .build_scalar(),
        ]),
    )
    .insert(
        "query",
        "SELECT u.name, u.email, e.type AS event_type, e.timestamp \
         FROM users u CROSS JOIN events e \
         ORDER BY u.name, e.timestamp",
    )
    .build_hashmap();
}

Dynamic Query with Tera Substitution

Use Tera expressions to parameterize queries:

#![allow(unused)]
fn main() {
// Static namespace with filter values
pipeline
    .add_namespace(
        NamespaceBuilder::new("inputs")
            .static_ns()
            .insert("status", ScalarValue::String("active".to_string()))
            .insert("min_age", ScalarValue::from(18)),
    )
    .await?;

// Query with Tera substitution
let attrs = ObjectBuilder::new()
    .insert(
        "tables",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "users")
                .insert("source", "data.load.users.data")
                .build_scalar(),
        ]),
    )
    .insert(
        "query",
        "SELECT * FROM users WHERE status = '{{ inputs.status }}' AND age >= {{ inputs.min_age }}",
    )
    .build_hashmap();
}

Aggregation in SQL

While AggregateCommand handles simple scalar aggregations, SQL is the better fit for grouped aggregations:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert(
        "tables",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "orders")
                .insert("source", "data.load.orders.data")
                .build_scalar(),
        ]),
    )
    .insert(
        "query",
        "SELECT category, COUNT(*) as order_count, SUM(total) as revenue \
         FROM orders \
         GROUP BY category \
         ORDER BY revenue DESC",
    )
    .build_hashmap();
}

Accessing Results

#![allow(unused)]
fn main() {
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

let source = StorePath::from_segments(["query", "active_users"]);
let cmd_results = results.get_by_source(&source).expect("Expected results");

// Access meta results
let row_count = cmd_results
    .meta_get(&source.with_segment("rows"))
    .expect("Expected rows");
let columns = cmd_results
    .meta_get(&source.with_segment("columns"))
    .expect("Expected columns");

println!("Query returned {} rows with columns: {}", row_count, columns);

// Access the DataFrame
let data_result = cmd_results
    .data_get(&source.with_segment("data"))
    .expect("Expected data");
}

Common Patterns

Chaining SQL Queries

Use the output of one SQL query as input to another:

#![allow(unused)]
fn main() {
// First query: filter data
let filter_attrs = ObjectBuilder::new()
    .insert("tables", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "orders")
            .insert("source", "data.load.orders.data")
            .build_scalar(),
    ]))
    .insert("query", "SELECT * FROM orders WHERE status = 'completed'")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("step1"))
    .await?
    .add_command::<SqlCommand>("filtered", &filter_attrs)
    .await?;

// Second query: aggregate the filtered data
let agg_attrs = ObjectBuilder::new()
    .insert("tables", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "orders")
            .insert("source", "step1.filtered.data")  // Reference previous result
            .build_scalar(),
    ]))
    .insert("query", "SELECT category, SUM(total) as revenue FROM orders GROUP BY category")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("step2"))
    .await?
    .add_command::<SqlCommand>("by_category", &agg_attrs)
    .await?;
}

SqlCommand + AggregateCommand

Query with SQL, then compute scalar statistics with AggregateCommand:

#![allow(unused)]
fn main() {
// SQL query
let query_attrs = ObjectBuilder::new()
    .insert("tables", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "products")
            .insert("source", "data.load.products.data")
            .build_scalar(),
    ]))
    .insert("query", "SELECT * FROM products WHERE in_stock = true")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("query"))
    .await?
    .add_command::<SqlCommand>("in_stock", &query_attrs)
    .await?;

// Aggregate the query result
let agg_attrs = ObjectBuilder::new()
    .insert("source", "query.in_stock.data")
    .insert("aggregations", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "avg_price")
            .insert("column", "price")
            .insert("op", "mean")
            .build_scalar(),
    ]))
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("stats"))
    .await?
    .add_command::<AggregateCommand>("summary", &agg_attrs)
    .await?;
}

SQL Dialect

SqlCommand uses Polars' SQL context, which supports a subset of standard SQL:

  • SELECT, FROM, WHERE, ORDER BY, LIMIT
  • JOIN (INNER, LEFT, RIGHT, FULL, CROSS)
  • GROUP BY, HAVING
  • Common aggregate functions: COUNT, SUM, AVG, MIN, MAX
  • String functions, date functions, and more

Refer to the Polars SQL documentation for the complete list of supported features.

Error Handling

SqlCommand will return an error if:

  • A table source store path does not exist
  • The SQL query syntax is invalid
  • A referenced column does not exist in the table
  • The query execution fails for any reason

AggregateCommand

AggregateCommand computes scalar statistics from tabular data. It supports a variety of aggregation operations including count, sum, mean, min, max, median, and more.

When to Use

Use AggregateCommand when you need to:

  • Compute summary statistics from a DataFrame
  • Extract scalar values (counts, sums, averages) for use in templates or conditions
  • Calculate multiple aggregations in a single command
  • Get values like row count, unique counts, or null counts

Attributes

Attribute     Type              Required  Description
------------  ----------------  --------  ----------------------------------------------------------
source        String            Yes       Store path to tabular data (e.g., data.load.products.data)
aggregations  Array of objects  Yes       Array of aggregation specifications

Aggregation Object Fields

Each object in the aggregations array specifies one aggregation:

Field  | Type   | Required | Description
-------|--------|----------|------------
name   | String | Yes      | Output scalar name for this aggregation
column | String | No       | Column to aggregate (not required for count)
op     | String | Yes      | Aggregation operation to perform

Supported Operations

Operation  | Aliases           | Column Required | Description
-----------|-------------------|-----------------|------------
sum        | -                 | Yes             | Sum of values in the column
mean       | avg, average      | Yes             | Arithmetic mean of values
min        | -                 | Yes             | Minimum value
max        | -                 | Yes             | Maximum value
count      | len               | No              | Number of rows in the DataFrame
first      | -                 | Yes             | First value in the column
last       | -                 | Yes             | Last value in the column
std        | stddev            | Yes             | Standard deviation
median     | -                 | Yes             | Median value
n_unique   | nunique, distinct | Yes             | Count of unique values
null_count | nulls             | Yes             | Count of null values in the column
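The alias resolution above can be sketched as a simple lookup. This is a hypothetical helper that mirrors the table, not part of the panopticon-core API:

```rust
/// Resolve an aggregation operation name or alias to its canonical form.
/// Hypothetical helper mirroring the operations table; illustrative only.
fn resolve_op(name: &str) -> Option<&'static str> {
    match name {
        "sum" => Some("sum"),
        "mean" | "avg" | "average" => Some("mean"),
        "min" => Some("min"),
        "max" => Some("max"),
        "count" | "len" => Some("count"),
        "first" => Some("first"),
        "last" => Some("last"),
        "std" | "stddev" => Some("std"),
        "median" => Some("median"),
        "n_unique" | "nunique" | "distinct" => Some("n_unique"),
        "null_count" | "nulls" => Some("null_count"),
        _ => None,
    }
}

fn main() {
    assert_eq!(resolve_op("avg"), Some("mean"));
    assert_eq!(resolve_op("len"), Some("count"));
    // Unrecognized operations resolve to None and produce an error.
    assert_eq!(resolve_op("mode"), None);
}
```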

Results

Data Results (Per Aggregation)

For each aggregation in the aggregations array, a scalar result is produced:

Result | Type            | Description
-------|-----------------|------------
{name} | Scalar (Number) | The computed aggregation value

The result path is {output_prefix}.{name}, where {name} is the name field from the aggregation object.

Examples

Basic Aggregations

#![allow(unused)]
fn main() {
use panopticon_core::prelude::*;

let attrs = ObjectBuilder::new()
    .insert("source", "data.load.products.data")
    .insert(
        "aggregations",
        ScalarValue::Array(vec![
            // Count doesn't need a column
            ObjectBuilder::new()
                .insert("name", "row_count")
                .insert("op", "count")
                .build_scalar(),
            // Sum requires a column
            ObjectBuilder::new()
                .insert("name", "total_price")
                .insert("column", "price")
                .insert("op", "sum")
                .build_scalar(),
            // Mean/average
            ObjectBuilder::new()
                .insert("name", "avg_price")
                .insert("column", "price")
                .insert("op", "mean")
                .build_scalar(),
        ]),
    )
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("stats"))
    .await?
    .add_command::<AggregateCommand>("summary", &attrs)
    .await?;

// Results available at:
// - stats.summary.row_count
// - stats.summary.total_price
// - stats.summary.avg_price
}

Full Statistical Summary

Compute comprehensive statistics for a dataset:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert("source", "data.load.products.data")
    .insert(
        "aggregations",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "row_count")
                .insert("op", "count")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "total_price")
                .insert("column", "price")
                .insert("op", "sum")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "avg_price")
                .insert("column", "price")
                .insert("op", "mean")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "max_quantity")
                .insert("column", "quantity")
                .insert("op", "max")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "min_quantity")
                .insert("column", "quantity")
                .insert("op", "min")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "median_price")
                .insert("column", "price")
                .insert("op", "median")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "price_stddev")
                .insert("column", "price")
                .insert("op", "std")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "unique_categories")
                .insert("column", "category")
                .insert("op", "n_unique")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "missing_descriptions")
                .insert("column", "description")
                .insert("op", "null_count")
                .build_scalar(),
        ]),
    )
    .build_hashmap();
}

First and Last Values

Extract the first or last value from a column (useful for time-series data):

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert("source", "data.load.events.data")
    .insert(
        "aggregations",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "first_event")
                .insert("column", "event_type")
                .insert("op", "first")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "last_event")
                .insert("column", "event_type")
                .insert("op", "last")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "earliest_timestamp")
                .insert("column", "timestamp")
                .insert("op", "first")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "latest_timestamp")
                .insert("column", "timestamp")
                .insert("op", "last")
                .build_scalar(),
        ]),
    )
    .build_hashmap();
}

Accessing Results

#![allow(unused)]
fn main() {
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

let source = StorePath::from_segments(["stats", "summary"]);
let cmd_results = results.get_by_source(&source).expect("Expected results");

// Access aggregation results
let row_count = cmd_results
    .data_get(&source.with_segment("row_count"))
    .and_then(|r| r.as_scalar())
    .expect("Expected row_count");

let avg_price = cmd_results
    .data_get(&source.with_segment("avg_price"))
    .and_then(|r| r.as_scalar())
    .expect("Expected avg_price");

println!("Products: {} rows, average price: {}", row_count.1, avg_price.1);
}

Common Patterns

Using Aggregates in Templates

Aggregation results are stored in the scalar store and can be referenced in Tera templates:

#![allow(unused)]
fn main() {
// First: aggregate data
let agg_attrs = ObjectBuilder::new()
    .insert("source", "data.load.products.data")
    .insert("aggregations", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "total_count")
            .insert("op", "count")
            .build_scalar(),
        ObjectBuilder::new()
            .insert("name", "total_value")
            .insert("column", "price")
            .insert("op", "sum")
            .build_scalar(),
    ]))
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("stats"))
    .await?
    .add_command::<AggregateCommand>("summary", &agg_attrs)
    .await?;

// Then: use in a template
let template_attrs = ObjectBuilder::new()
    .insert("templates", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "report")
            .insert("content", "Total products: {{ stats.summary.total_count }}\nTotal value: ${{ stats.summary.total_value }}")
            .build_scalar(),
    ]))
    .insert("render", "report")
    .insert("output", "/tmp/report.txt")
    .build_hashmap();
}

Using Aggregates in Conditions

Branch logic based on aggregation results:

#![allow(unused)]
fn main() {
// Aggregate first
let agg_attrs = ObjectBuilder::new()
    .insert("source", "data.load.orders.data")
    .insert("aggregations", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "order_count")
            .insert("op", "count")
            .build_scalar(),
    ]))
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("metrics"))
    .await?
    .add_command::<AggregateCommand>("orders", &agg_attrs)
    .await?;

// Condition based on aggregate
let condition_attrs = ObjectBuilder::new()
    .insert("branches", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "high_volume")
            .insert("if", "metrics.orders.order_count > 1000")
            .insert("then", "High order volume detected")
            .build_scalar(),
        ObjectBuilder::new()
            .insert("name", "normal")
            .insert("if", "true")
            .insert("then", "Normal order volume")
            .build_scalar(),
    ]))
    .build_hashmap();
}

Aggregating SQL Query Results

Chain with SqlCommand to aggregate filtered data:

#![allow(unused)]
fn main() {
// Query
let query_attrs = ObjectBuilder::new()
    .insert("tables", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "orders")
            .insert("source", "data.load.orders.data")
            .build_scalar(),
    ]))
    .insert("query", "SELECT * FROM orders WHERE status = 'completed'")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("query"))
    .await?
    .add_command::<SqlCommand>("completed", &query_attrs)
    .await?;

// Aggregate the filtered results
let agg_attrs = ObjectBuilder::new()
    .insert("source", "query.completed.data")  // Reference SQL output
    .insert("aggregations", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "completed_count")
            .insert("op", "count")
            .build_scalar(),
        ObjectBuilder::new()
            .insert("name", "total_revenue")
            .insert("column", "total")
            .insert("op", "sum")
            .build_scalar(),
    ]))
    .build_hashmap();
}

Error Handling

AggregateCommand will return an error if:

  • The source store path does not exist
  • A specified column does not exist in the DataFrame
  • An operation that requires a column (anything except count) is missing the column field
  • The operation name is not recognized

Type Handling

  • Numeric columns return appropriate numeric types (integer or float)
  • first and last operations work on both numeric and string columns
  • Non-finite values (NaN, infinity) are converted to null
  • Integer results are preserved when possible (e.g., count, sum of integers)
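The non-finite rule can be illustrated with a small sketch. This is assumed behavior based on the description above, not the actual implementation:

```rust
/// Map a float to an optional scalar, converting NaN and infinity to None
/// (i.e., null), as described in the type-handling rules. Illustrative only.
fn to_scalar(value: f64) -> Option<f64> {
    if value.is_finite() { Some(value) } else { None }
}

fn main() {
    assert_eq!(to_scalar(3.5), Some(3.5));
    // Non-finite values become null rather than propagating.
    assert_eq!(to_scalar(f64::NAN), None);
    assert_eq!(to_scalar(f64::INFINITY), None);
}
```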

ConditionCommand

ConditionCommand evaluates Tera expressions to select between multiple branches. It provides if/then branching logic for pipelines, producing a result based on the first matching condition.

When to Use

Use ConditionCommand when you need to:

  • Choose between different values based on runtime conditions
  • Implement feature flags or configuration-based branching
  • Generate different outputs based on data characteristics
  • Create conditional messages or labels

Attributes

Attribute | Type             | Required | Description
----------|------------------|----------|------------
branches  | Array of objects | Yes      | Array of condition branches evaluated in order
default   | String           | No       | Default value if no branch matches (supports Tera substitution)

Branch Object Fields

Each object in the branches array defines one condition:

Field | Type   | Required | Description
------|--------|----------|------------
name  | String | Yes      | Unique identifier for this branch
if    | String | Yes      | Tera expression to evaluate as the condition
then  | String | Yes      | Value if condition is true (supports Tera substitution)

Results

Data Results (Fixed)

Result       | Type    | Description
-------------|---------|------------
result       | String  | The value from the matched branch or default
matched      | Boolean | Whether a branch condition matched (false if default was used)
branch_index | Number  | Index of the matched branch (0-based), or -1 if default was used

Data Results (Per Branch)

For each branch in the branches array, an object is stored:

Result | Type   | Description
-------|--------|------------
{name} | Object | Contains matched (bool) and value (string) for this branch

Condition Evaluation

The if expression is wrapped in {{ }} and evaluated as a Tera expression. The result is considered truthy if it is:

  • A non-empty string (except "false", "0", "null", "undefined")
  • A non-zero number
  • Boolean true

Branches are evaluated in order. The first truthy branch wins, and subsequent branches are not evaluated.
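The truthiness rules and first-match semantics can be sketched as follows. These are hypothetical helpers approximating the behavior described above, operating on the already-rendered string output of each if expression:

```rust
/// Approximate the truthiness rules described above. Illustrative only.
fn is_truthy(rendered: &str) -> bool {
    !matches!(rendered, "" | "false" | "0" | "null" | "undefined")
}

/// First truthy branch wins; returns the 0-based index, or -1 when no
/// branch matches (the default case).
fn select_branch(rendered: &[&str]) -> i32 {
    rendered
        .iter()
        .position(|r| is_truthy(r))
        .map(|i| i as i32)
        .unwrap_or(-1)
}

fn main() {
    assert!(is_truthy("true"));
    assert!(!is_truthy("0"));
    // The second branch is the first truthy one, so index 1 is reported.
    assert_eq!(select_branch(&["false", "true", "true"]), 1);
    // No truthy branch: branch_index is -1 and the default is used.
    assert_eq!(select_branch(&["false", "0"]), -1);
}
```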

Examples

Basic Conditional

#![allow(unused)]
fn main() {
use panopticon_core::prelude::*;

// Set up inputs
pipeline
    .add_namespace(
        NamespaceBuilder::new("inputs")
            .static_ns()
            .insert("user_role", ScalarValue::String("admin".to_string())),
    )
    .await?;

let attrs = ObjectBuilder::new()
    .insert(
        "branches",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "admin_greeting")
                .insert("if", "inputs.user_role == 'admin'")
                .insert("then", "Welcome, Administrator!")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "user_greeting")
                .insert("if", "inputs.user_role == 'user'")
                .insert("then", "Welcome, User!")
                .build_scalar(),
        ]),
    )
    .insert("default", "Welcome, Guest!")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("greeting"))
    .await?
    .add_command::<ConditionCommand>("message", &attrs)
    .await?;

// Result: "Welcome, Administrator!"
}

Feature Flag Pattern

#![allow(unused)]
fn main() {
// Static namespace with feature flags
pipeline
    .add_namespace(
        NamespaceBuilder::new("features")
            .static_ns()
            .insert("new_dashboard", ScalarValue::Bool(true))
            .insert("beta_api", ScalarValue::Bool(false)),
    )
    .await?;

let attrs = ObjectBuilder::new()
    .insert(
        "branches",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "new_dash")
                .insert("if", "features.new_dashboard")
                .insert("then", "/v2/dashboard")
                .build_scalar(),
        ]),
    )
    .insert("default", "/v1/dashboard")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("routing"))
    .await?
    .add_command::<ConditionCommand>("dashboard_path", &attrs)
    .await?;
}

Data-Driven Conditions

Use values from earlier pipeline stages (like aggregations):

#![allow(unused)]
fn main() {
// Aggregate order data
let agg_attrs = ObjectBuilder::new()
    .insert("source", "data.load.orders.data")
    .insert("aggregations", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "order_count")
            .insert("op", "count")
            .build_scalar(),
        ObjectBuilder::new()
            .insert("name", "total_revenue")
            .insert("column", "total")
            .insert("op", "sum")
            .build_scalar(),
    ]))
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("metrics"))
    .await?
    .add_command::<AggregateCommand>("orders", &agg_attrs)
    .await?;

// Branch based on metrics
let condition_attrs = ObjectBuilder::new()
    .insert(
        "branches",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "high_volume")
                .insert("if", "metrics.orders.order_count > 1000")
                .insert("then", "HIGH_VOLUME")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "high_revenue")
                .insert("if", "metrics.orders.total_revenue > 50000")
                .insert("then", "HIGH_REVENUE")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "normal")
                .insert("if", "true")  // Always matches as fallback
                .insert("then", "NORMAL")
                .build_scalar(),
        ]),
    )
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("classification"))
    .await?
    .add_command::<ConditionCommand>("tier", &condition_attrs)
    .await?;
}

Using Tera Filters and Functions

The if expression supports full Tera syntax:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert(
        "branches",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "long_name")
                .insert("if", "inputs.name | length > 10")
                .insert("then", "Name is long")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "uppercase")
                .insert("if", "inputs.name == inputs.name | upper")
                .insert("then", "Name is all uppercase")
                .build_scalar(),
        ]),
    )
    .insert("default", "Name is normal")
    .build_hashmap();
}

Accessing Results

#![allow(unused)]
fn main() {
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

let source = StorePath::from_segments(["greeting", "message"]);
let cmd_results = results.get_by_source(&source).expect("Expected results");

// Main result
let result = cmd_results
    .data_get(&source.with_segment("result"))
    .and_then(|r| r.as_scalar())
    .expect("Expected result");
println!("Result: {}", result.1);

// Did any branch match?
let matched = cmd_results
    .data_get(&source.with_segment("matched"))
    .and_then(|r| r.as_scalar())
    .expect("Expected matched");
println!("Matched: {}", matched.1);

// Which branch matched?
let index = cmd_results
    .data_get(&source.with_segment("branch_index"))
    .and_then(|r| r.as_scalar())
    .expect("Expected branch_index");
println!("Branch index: {}", index.1);  // 0, 1, 2... or -1 for default
}

Common Patterns

Conditional with when Attribute

Combine ConditionCommand with the when attribute to skip the entire command:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert("when", "inputs.feature_enabled")  // Skip if false
    .insert(
        "branches",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "greeting")
                .insert("if", "true")
                .insert("then", "Hello, {{ inputs.user_name }}! Feature is active.")
                .build_scalar(),
        ]),
    )
    .insert("default", "Fallback message")
    .build_hashmap();
}

When when is false:

  • The command status is "skipped"
  • No data results are produced
  • result, matched, and branch_index are absent

Multiple Independent Conditions

Evaluate multiple conditions that don't depend on each other:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert(
        "branches",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "is_admin")
                .insert("if", "inputs.role == 'admin'")
                .insert("then", "true")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "is_premium")
                .insert("if", "inputs.subscription == 'premium'")
                .insert("then", "true")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "has_access")
                .insert("if", "inputs.access_granted")
                .insert("then", "true")
                .build_scalar(),
        ]),
    )
    .build_hashmap();

// Access individual branch results:
// condition.check.is_admin.matched
// condition.check.is_premium.matched
// condition.check.has_access.matched
}

Cascading If/Else

Use true as the final condition for an else clause:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert(
        "branches",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "premium")
                .insert("if", "metrics.score > 90")
                .insert("then", "PREMIUM")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "standard")
                .insert("if", "metrics.score > 50")
                .insert("then", "STANDARD")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "basic")
                .insert("if", "true")  // Else clause
                .insert("then", "BASIC")
                .build_scalar(),
        ]),
    )
    .build_hashmap();
}

Using Results in Templates

Condition results are stored in the scalar store and can be used in templates:

#![allow(unused)]
fn main() {
// Condition command
let condition_attrs = ObjectBuilder::new()
    .insert("branches", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "status")
            .insert("if", "metrics.health > 80")
            .insert("then", "healthy")
            .build_scalar(),
    ]))
    .insert("default", "degraded")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("check"))
    .await?
    .add_command::<ConditionCommand>("health", &condition_attrs)
    .await?;

// Template using the result
let template_attrs = ObjectBuilder::new()
    .insert("templates", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "report")
            .insert("content", "System status: {{ check.health.result }}")
            .build_scalar(),
    ]))
    .insert("render", "report")
    .insert("output", "/tmp/status.txt")
    .build_hashmap();
}

Error Handling

ConditionCommand will return an error if:

  • A branch is missing required fields (name, if, or then)
  • A Tera expression in if or then cannot be evaluated
  • Referenced variables in expressions do not exist in the scalar store

TemplateCommand

TemplateCommand renders Tera templates and writes the output to a file. It supports template inheritance, includes, and loading templates from files or globs.

When to Use

Use TemplateCommand when you need to:

  • Generate reports, configuration files, or other text output
  • Use template inheritance with base templates and blocks
  • Render dynamic content using data from the pipeline
  • Create multiple output files from templates

Attributes

Attribute     | Type             | Required | Description
--------------|------------------|----------|------------
templates     | Array of objects | No       | Inline template definitions (can be combined with template_glob)
template_glob | String           | No       | Glob pattern to load templates from disk (e.g., templates/**/*.tera)
render        | String           | Yes      | Name of the template to render (supports Tera substitution)
output        | String           | Yes      | File path to write the rendered output (supports Tera substitution)
capture       | Boolean          | No       | If true, store the rendered content in the content result (default: false)

At least one of templates or template_glob must provide the template to render.

Template Object Fields

Each object in the templates array defines one template:

Field   | Type   | Required | Description
--------|--------|----------|------------
name    | String | Yes      | Name to register the template under
content | String | No       | Raw template content (mutually exclusive with file)
file    | String | No       | Path to template file (mutually exclusive with content)

You must specify either content or file, but not both.
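The exclusivity rule can be sketched as a small validation. This is a hypothetical helper illustrating the constraint, not part of the panopticon-core API:

```rust
/// Validate the content/file rule for a template object: exactly one of
/// the two must be present. Hypothetical helper; illustrative only.
fn validate_template(content: Option<&str>, file: Option<&str>) -> Result<(), String> {
    match (content, file) {
        (Some(_), Some(_)) => Err("content and file are mutually exclusive".into()),
        (None, None) => Err("one of content or file is required".into()),
        _ => Ok(()),
    }
}

fn main() {
    assert!(validate_template(Some("{{ x }}"), None).is_ok());
    assert!(validate_template(None, Some("templates/base.tera")).is_ok());
    // Both or neither is an error.
    assert!(validate_template(Some("x"), Some("y")).is_err());
    assert!(validate_template(None, None).is_err());
}
```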

Results

Meta Results

Result     | Type   | Description
-----------|--------|------------
line_count | Number | Number of lines in the rendered output
size       | Number | Size in bytes of the rendered output

Data Results

Result  | Type   | Description
--------|--------|------------
content | String | The rendered content (only populated when capture is true, otherwise empty)

Examples

Simple Inline Template

#![allow(unused)]
fn main() {
use panopticon_core::prelude::*;

// Set up data for the template
pipeline
    .add_namespace(
        NamespaceBuilder::new("inputs")
            .static_ns()
            .insert("title", ScalarValue::String("Monthly Report".to_string()))
            .insert("date", ScalarValue::String("2024-01-15".to_string())),
    )
    .await?;

let attrs = ObjectBuilder::new()
    .insert(
        "templates",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "report")
                .insert("content", "# {{ inputs.title }}\n\nGenerated on: {{ inputs.date }}")
                .build_scalar(),
        ]),
    )
    .insert("render", "report")
    .insert("output", "/tmp/report.md")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("generate"))
    .await?
    .add_command::<TemplateCommand>("report", &attrs)
    .await?;
}

Template Inheritance with Glob Loading

Load templates from disk using a glob pattern:

#![allow(unused)]
fn main() {
// Directory structure:
// templates/
//   base.tera       - {% block content %}{% endblock %}
//   header.tera     - Navigation HTML
//   page.tera       - {% extends "base.tera" %}{% block content %}...{% endblock %}

let attrs = ObjectBuilder::new()
    .insert("template_glob", "templates/**/*.tera")
    .insert("render", "page.tera")
    .insert("output", "/tmp/page.html")
    .insert("capture", true)
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("render"))
    .await?
    .add_command::<TemplateCommand>("page", &attrs)
    .await?;
}

Template Inheritance with Inline Templates

Define a base template and a child template inline:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert(
        "templates",
        ScalarValue::Array(vec![
            // Base template with blocks
            ObjectBuilder::new()
                .insert("name", "base")
                .insert("content", r#"<!DOCTYPE html>
<html>
<head><title>{% block title %}Default Title{% endblock %}</title></head>
<body>
{% block content %}{% endblock %}
</body>
</html>"#)
                .build_scalar(),
            // Child template that extends base
            ObjectBuilder::new()
                .insert("name", "page")
                .insert("content", r#"{% extends "base" %}
{% block title %}{{ inputs.page_title }}{% endblock %}
{% block content %}
<h1>{{ inputs.page_title }}</h1>
<p>{{ inputs.page_content }}</p>
{% endblock %}"#)
                .build_scalar(),
        ]),
    )
    .insert("render", "page")
    .insert("output", "/tmp/page.html")
    .build_hashmap();
}

Loading Templates from Files

Reference template files instead of inline content:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert(
        "templates",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "base")
                .insert("file", "templates/base.tera")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "page")
                .insert("file", "templates/page.tera")
                .build_scalar(),
        ]),
    )
    .insert("render", "page")
    .insert("output", "/tmp/output.html")
    .build_hashmap();
}

Using Pipeline Data in Templates

Templates have access to all scalar values in the store:

#![allow(unused)]
fn main() {
// Load and aggregate data
let agg_attrs = ObjectBuilder::new()
    .insert("source", "data.load.products.data")
    .insert("aggregations", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "total_count")
            .insert("op", "count")
            .build_scalar(),
        ObjectBuilder::new()
            .insert("name", "total_value")
            .insert("column", "price")
            .insert("op", "sum")
            .build_scalar(),
    ]))
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("stats"))
    .await?
    .add_command::<AggregateCommand>("summary", &agg_attrs)
    .await?;

// Use aggregation results in template
let template_attrs = ObjectBuilder::new()
    .insert(
        "templates",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "summary")
                .insert("content", r#"Product Summary
===============
Total products: {{ stats.summary.total_count }}
Total value: ${{ stats.summary.total_value }}

{% if stats.summary.total_count > 100 %}
Note: High product count!
{% endif %}"#)
                .build_scalar(),
        ]),
    )
    .insert("render", "summary")
    .insert("output", "/tmp/summary.txt")
    .insert("capture", true)
    .build_hashmap();
}

Dynamic Output Path

Use Tera substitution for the output path:

#![allow(unused)]
fn main() {
pipeline
    .add_namespace(
        NamespaceBuilder::new("config")
            .static_ns()
            .insert("output_dir", ScalarValue::String("/var/reports".to_string()))
            .insert("report_name", ScalarValue::String("monthly".to_string())),
    )
    .await?;

let attrs = ObjectBuilder::new()
    .insert("templates", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "report")
            .insert("content", "Report content here...")
            .build_scalar(),
    ]))
    .insert("render", "report")
    .insert("output", "{{ config.output_dir }}/{{ config.report_name }}.txt")
    .build_hashmap();
}

Accessing Results

#![allow(unused)]
fn main() {
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

let source = StorePath::from_segments(["render", "page"]);
let cmd_results = results.get_by_source(&source).expect("Expected results");

// Meta results
let size = cmd_results
    .meta_get(&source.with_segment("size"))
    .expect("Expected size");
let lines = cmd_results
    .meta_get(&source.with_segment("line_count"))
    .expect("Expected line_count");

println!("Rendered {} bytes, {} lines", size, lines);

// Content (only if capture=true)
if let Some(content) = cmd_results
    .data_get(&source.with_segment("content"))
    .and_then(|r| r.as_scalar())
{
    println!("Content: {}", content.1);
}
}

Common Patterns

Combining Glob and Inline Templates

Load base templates from disk, add custom templates inline:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert("template_glob", "templates/**/*.tera")  // Load from disk
    .insert(
        "templates",
        ScalarValue::Array(vec![
            // Add or override templates
            ObjectBuilder::new()
                .insert("name", "custom_page")
                .insert("content", r#"{% extends "base.tera" %}
{% block content %}Custom content here{% endblock %}"#)
                .build_scalar(),
        ]),
    )
    .insert("render", "custom_page")
    .insert("output", "/tmp/custom.html")
    .build_hashmap();
}

Iterating Over Data with Tera

Use Tera's for loop to iterate over arrays:

#![allow(unused)]
fn main() {
pipeline
    .add_namespace(
        NamespaceBuilder::new("inputs")
            .static_ns()
            .insert(
                "items",
                ScalarValue::Array(vec![
                    ObjectBuilder::new()
                        .insert("name", "Item 1")
                        .insert("price", 10.0)
                        .build_scalar(),
                    ObjectBuilder::new()
                        .insert("name", "Item 2")
                        .insert("price", 20.0)
                        .build_scalar(),
                ]),
            ),
    )
    .await?;

let attrs = ObjectBuilder::new()
    .insert("templates", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "list")
            .insert("content", r#"Items:
{% for item in inputs.items %}
- {{ item.name }}: ${{ item.price }}
{% endfor %}"#)
            .build_scalar(),
    ]))
    .insert("render", "list")
    .insert("output", "/tmp/items.txt")
    .build_hashmap();
}

Using Tera Includes

Include templates within other templates:

#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert(
        "templates",
        ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "header")
                .insert("content", "<header>{{ inputs.site_name }}</header>")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "footer")
                .insert("content", "<footer>Copyright 2024</footer>")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "page")
                .insert("content", r#"{% include "header" %}
<main>Page content</main>
{% include "footer" %}"#)
                .build_scalar(),
        ]),
    )
    .insert("render", "page")
    .insert("output", "/tmp/page.html")
    .build_hashmap();
}

Using Condition Results in Templates

Reference ConditionCommand results:

#![allow(unused)]
fn main() {
// Condition first
let condition_attrs = ObjectBuilder::new()
    .insert("branches", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "status")
            .insert("if", "metrics.health > 80")
            .insert("then", "healthy")
            .build_scalar(),
    ]))
    .insert("default", "degraded")
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("check"))
    .await?
    .add_command::<ConditionCommand>("health", &condition_attrs)
    .await?;

// Template using condition result
let template_attrs = ObjectBuilder::new()
    .insert("templates", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "status")
            .insert("content", r#"System Status: {{ check.health.result }}
{% if check.health.matched %}
(condition matched at branch {{ check.health.branch_index }})
{% else %}
(using default value)
{% endif %}"#)
            .build_scalar(),
    ]))
    .insert("render", "status")
    .insert("output", "/tmp/status.txt")
    .build_hashmap();
}

Tera Features

TemplateCommand uses the Tera templating engine, which supports:

  • Variable substitution: {{ variable }}
  • Conditionals: {% if condition %}...{% endif %}
  • Loops: {% for item in items %}...{% endfor %}
  • Template inheritance: {% extends "base" %} and {% block name %}...{% endblock %}
  • Includes: {% include "partial" %}
  • Filters: {{ value | upper }}, {{ value | length }}, etc.
  • Built-in functions and operators

See the Tera documentation for a complete syntax reference.

Error Handling

TemplateCommand will return an error if:

  • A template file specified in file does not exist
  • Both content and file are specified for a template (mutually exclusive)
  • Neither content nor file is specified for a template
  • The template_glob pattern is invalid or cannot be read
  • The template specified in render does not exist
  • The template contains syntax errors (unclosed blocks, invalid expressions)
  • Referenced variables do not exist in the scalar store
  • The output directory cannot be created
  • The output file cannot be written

Directory Creation

TemplateCommand automatically creates parent directories for the output file if they do not exist.
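
So an output path several levels deep does not need to exist before the pipeline runs. A minimal sketch (the paths and template names here are illustrative):

```rust
#![allow(unused)]
fn main() {
let attrs = ObjectBuilder::new()
    .insert("templates", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "report")
            .insert("content", "Total: {{ stats.total }}")
            .build_scalar(),
    ]))
    .insert("render", "report")
    // /tmp/reports/2024 is created automatically if it does not exist
    .insert("output", "/tmp/reports/2024/summary.txt")
    .build_hashmap();
}
```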

Working with Data

This section covers how data flows through Panopticon pipelines and the tools available for manipulating and accessing that data.

Data Flow Overview

In Panopticon, data flows between commands through two parallel storage systems:

+------------------+     +------------------+     +------------------+
|    Command A     |     |    Command B     |     |    Command C     |
+------------------+     +------------------+     +------------------+
         |                        |                        |
         v                        v                        v
+------------------------------------------------------------------------+
|                         ExecutionContext                               |
|  +----------------------------+  +----------------------------+        |
|  |       ScalarStore          |  |      TabularStore          |        |
|  |  (JSON-like values)        |  |  (Polars DataFrames)       |        |
|  +----------------------------+  +----------------------------+        |
+------------------------------------------------------------------------+
         ^                        ^                        ^
         |                        |                        |
   StorePath refs           StorePath refs           StorePath refs

The Two Stores

Panopticon maintains two separate data stores during pipeline execution:

  • ScalarStore - stores ScalarValue entries: JSON-compatible values (strings, numbers, booleans, arrays, objects). Used for configuration, metadata, single values, and template variables.
  • TabularStore - stores TabularValue entries: Polars DataFrames. Used for structured data, CSV/JSON/Parquet files, and SQL query results.

StorePath: The Universal Reference

All data in both stores is addressed using StorePath - a dot-separated path that uniquely identifies values:

#![allow(unused)]
fn main() {
// Creating paths
let path = StorePath::from_segments(["namespace", "command", "field"]);
let child = path.with_segment("subfield");    // namespace.command.field.subfield
let indexed = path.with_index(0);             // namespace.command.field.0
}

StorePaths serve as:

  • Storage keys: Where commands write their outputs
  • Dependency references: How commands declare what data they need
  • Template variables: Accessed via {{ namespace.command.field }} syntax
  • Result accessors: How you retrieve data after pipeline execution

Data Flow Example

Consider a pipeline that loads a CSV file and performs aggregations:

Pipeline Definition:
====================

Namespace: data                    Namespace: stats
+------------------+              +------------------+
| FileCommand      |              | AggregateCommand |
| name: "load"     | -----------> | source: "data.   |
|                  |              |   load.products. |
|                  |              |   data"          |
+------------------+              +------------------+

Data Flow:
==========

1. FileCommand executes:
   - Reads products.csv
   - Stores DataFrame at: data.load.products.data
   - Stores row count at: data.load.products.row_count

2. AggregateCommand executes:
   - Retrieves DataFrame from: data.load.products.data
   - Computes aggregations
   - Stores results at: stats.products.*

Chapter Overview

This section contains three detailed chapters:

Store Paths

Learn the StorePath API for creating, manipulating, and navigating data paths:

  • from_segments() - Build paths from components
  • with_segment() - Extend paths with new segments
  • with_index() - Add numeric indices for iteration

Tera Templating

Master the Tera templating syntax used throughout Panopticon:

  • Variable interpolation with {{ path.to.value }}
  • Filters for transforming values
  • Control structures for conditional content
  • Template inheritance for complex outputs

Polars DataFrames

Work with tabular data using Polars:

  • Understanding TabularValue (the DataFrame type alias)
  • Accessing DataFrame results from the TabularStore
  • Export formats: CSV, JSON, Parquet

Quick Reference

Accessing Data During Execution

Commands receive an ExecutionContext that provides access to both stores:

#![allow(unused)]
fn main() {
// In a command's execute method:
async fn execute(&self, ctx: &ExecutionContext, source: &StorePath) -> Result<()> {
    // Read scalar values
    let value = ctx.scalar().get(&some_path).await?;

    // Read tabular values
    let df = ctx.tabular().get(&some_path).await?;

    // Template substitution (uses ScalarStore)
    let rendered = ctx.substitute("Hello, {{ user.name }}!").await?;

    // Write results
    ctx.scalar().insert(&source.with_segment("output"), my_value).await?;
    ctx.tabular().insert(&source.with_segment("data"), my_df).await?;

    Ok(())
}
}

Accessing Data After Execution

After a pipeline completes, use ResultStore to access outputs:

#![allow(unused)]
fn main() {
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

// Get results for a specific command
let source = StorePath::from_segments(["namespace", "command"]);
if let Some(cmd_results) = results.get_by_source(&source) {
    // Access scalar results
    if let Some(value) = cmd_results.data_get(&source.with_segment("field")) {
        if let Some((ty, scalar)) = value.as_scalar() {
            println!("Scalar: {:?} = {}", ty, scalar);
        }
    }

    // Access tabular results (exported to disk)
    if let Some(value) = cmd_results.data_get(&source.with_segment("data")) {
        if let Some((path, format, rows, cols)) = value.as_tabular() {
            println!("Table: {} ({} rows x {} cols)", path.display(), rows, cols);
        }
    }
}
}

Next Steps

Continue to Store Paths to learn the details of the StorePath API.

Store Paths

StorePath is the fundamental addressing mechanism in Panopticon. Every piece of data - whether a scalar value or a tabular DataFrame - is stored and retrieved using a StorePath.

What is a StorePath?

A StorePath is a sequence of string segments that form a hierarchical path, similar to filesystem paths but using dots as separators:

namespace.command.field.subfield
    ^        ^      ^      ^
    |        |      |      +-- Nested field
    |        |      +--------- Result field
    |        +---------------- Command name
    +------------------------- Namespace name

The path data.load.products.row_count represents:

  • Namespace: data
  • Command: load
  • Field: products
  • Subfield: row_count

Creating StorePaths

from_segments()

Build a StorePath from an iterator of segments:

#![allow(unused)]
fn main() {
use panopticon_core::prelude::*;

// From a slice of string literals
let path = StorePath::from_segments(["namespace", "command", "field"]);
assert_eq!(path.to_dotted(), "namespace.command.field");

// From a Vec
let segments = vec!["data", "load", "products"];
let path = StorePath::from_segments(segments);
assert_eq!(path.to_dotted(), "data.load.products");

// From any iterator of Into<String>
let path = StorePath::from_segments(["a", "b", "c"].into_iter());
}

from_dotted()

Parse a dotted string into a StorePath:

#![allow(unused)]
fn main() {
let path = StorePath::from_dotted("config.regions.us-east");
assert_eq!(path.segments(), &["config", "regions", "us-east"]);
}

Extending StorePaths

StorePath extension methods are non-destructive: they leave the original path untouched and return a new path:

with_segment()

Add a named segment to create a child path:

#![allow(unused)]
fn main() {
let base = StorePath::from_segments(["namespace", "command"]);

// Add a field
let field = base.with_segment("output");
assert_eq!(field.to_dotted(), "namespace.command.output");

// Chain multiple segments
let nested = base
    .with_segment("results")
    .with_segment("summary");
assert_eq!(nested.to_dotted(), "namespace.command.results.summary");
}

with_index()

Add a numeric index segment (useful for iteration):

#![allow(unused)]
fn main() {
let base = StorePath::from_segments(["classify", "region"]);

// First iteration
let iter0 = base.with_index(0);
assert_eq!(iter0.to_dotted(), "classify.region.0");

// Access a field within an iteration
let result = iter0.with_segment("result");
assert_eq!(result.to_dotted(), "classify.region.0.result");
}

add_segment() (Mutable)

Mutate a path in place:

#![allow(unused)]
fn main() {
let mut path = StorePath::from_segments(["namespace"]);
path.add_segment("command");
path.add_segment("field");
assert_eq!(path.to_dotted(), "namespace.command.field");
}

Inspecting StorePaths

segments()

Get the path segments as a slice:

#![allow(unused)]
fn main() {
let path = StorePath::from_segments(["a", "b", "c"]);
assert_eq!(path.segments(), &["a", "b", "c"]);
}

namespace()

Get the first segment (typically the namespace name):

#![allow(unused)]
fn main() {
let path = StorePath::from_segments(["data", "load", "file"]);
assert_eq!(path.namespace(), Some(&"data".to_string()));

let empty = StorePath::default();
assert_eq!(empty.namespace(), None);
}

to_dotted()

Convert to a dot-separated string:

#![allow(unused)]
fn main() {
let path = StorePath::from_segments(["config", "database", "host"]);
assert_eq!(path.to_dotted(), "config.database.host");
}

starts_with()

Check if a path is a prefix of another:

#![allow(unused)]
fn main() {
let parent = StorePath::from_segments(["data", "load"]);
let child = StorePath::from_segments(["data", "load", "products", "data"]);
let other = StorePath::from_segments(["stats", "aggregate"]);

assert!(child.starts_with(&parent));
assert!(!other.starts_with(&parent));
}

contains()

Check if any segment matches a value:

#![allow(unused)]
fn main() {
let path = StorePath::from_segments(["data", "load", "products"]);
assert!(path.contains("load"));
assert!(!path.contains("stats"));
}
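
A common use of starts_with() is filtering command results by namespace after execution. A sketch, assuming a results value obtained via completed.results(...) as shown earlier in this guide:

```rust
#![allow(unused)]
fn main() {
// Keep only results whose source path lives under the "data" namespace
let prefix = StorePath::from_segments(["data"]);
for cmd_results in results.iter() {
    if cmd_results.source().starts_with(&prefix) {
        println!("data result: {}", cmd_results.source().to_dotted());
    }
}
}
```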

Practical Examples

Example: Iterating Over Object Keys

This example from iterate_object_keys.rs shows how StorePaths work with iterative namespaces:

use panopticon_core::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut pipeline = Pipeline::new();

    // Static namespace with region data
    pipeline.add_namespace(
        NamespaceBuilder::new("config")
            .static_ns()
            .insert("regions", ObjectBuilder::new()
                .insert("us-east", "Virginia")
                .insert("us-west", "Oregon")
                .insert("eu-west", "Ireland")
                .build_scalar()
            ),
    ).await?;

    // Iterative namespace that loops over region keys
    let mut handle = pipeline.add_namespace(
        NamespaceBuilder::new("classify")
            .iterative()
            .store_path(StorePath::from_segments(["config", "regions"]))
            .scalar_object_keys(None, false)
            .iter_var("region")
            .index_var("idx"),
    ).await?;

    // Add condition command for each iteration
    handle.add_command::<ConditionCommand>("region", &condition_attrs).await?;

    // Execute pipeline
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;

    // Access results per iteration using indexed StorePaths
    let mut idx = 0;
    loop {
        // Build path with iteration index
        let source = StorePath::from_segments(["classify", "region"])
            .with_index(idx);

        // Try to get results for this iteration
        let Some(cmd_results) = results.get_by_source(&source) else {
            break; // No more iterations
        };

        // Access specific fields
        let result = cmd_results
            .data_get(&source.with_segment("result"))
            .and_then(|r| r.as_scalar())
            .expect("Expected result");

        let matched = cmd_results
            .data_get(&source.with_segment("matched"))
            .and_then(|r| r.as_scalar())
            .expect("Expected matched");

        println!("[{}] {} (matched: {})", idx, result.1, matched.1);
        idx += 1;
    }

    Ok(())
}

Example: Accessing Aggregation Results

#![allow(unused)]
fn main() {
use panopticon_core::prelude::*;

// After pipeline execution...
let results = completed.results(ResultSettings::default()).await?;

// Build path to the stats command
let stats_source = StorePath::from_segments(["stats", "products"]);

if let Some(cmd_results) = results.get_by_source(&stats_source) {
    // Access individual aggregation results
    let fields = ["row_count", "total_price", "avg_price", "max_quantity"];

    for field in fields {
        let field_path = stats_source.with_segment(field);
        if let Some(value) = cmd_results.data_get(&field_path) {
            if let Some((ty, scalar)) = value.as_scalar() {
                println!("{}: {} ({:?})", field, scalar, ty);
            }
        }
    }
}
}

StorePath in Templates

StorePaths directly correspond to Tera template variables. The path config.database.host is accessed in templates as:

{{ config.database.host }}

See Tera Templating for more details on template syntax.

StorePath Diagram

StorePath Structure:
====================

StorePath::from_segments(["data", "load", "products", "row_count"])
                            |      |        |           |
                            v      v        v           v
                         +------+------+---------+-----------+
    segments: Vec<String>|"data"|"load"|"products"|"row_count"|
                         +------+------+---------+-----------+
                            0      1        2           3

    to_dotted() -> "data.load.products.row_count"
    namespace() -> Some("data")
    segments()  -> &["data", "load", "products", "row_count"]


Path Operations:
================

    base = ["data", "load"]
              |
              +-- with_segment("file")    -> ["data", "load", "file"]
              |
              +-- with_index(0)           -> ["data", "load", "0"]
              |
              +-- with_segment("products")
                    |
                    +-- with_segment("data") -> ["data", "load", "products", "data"]

Best Practices

  1. Use meaningful segment names: Paths should be self-documenting

    #![allow(unused)]
    fn main() {
    // Good
    StorePath::from_segments(["users", "fetch", "active_count"])
    
    // Less clear
    StorePath::from_segments(["u", "f", "ac"])
    }
  2. Build paths incrementally: Use with_segment() to build on base paths

    #![allow(unused)]
    fn main() {
    let base = StorePath::from_segments(["namespace", "command"]);
    let output = base.with_segment("output");
    let data = base.with_segment("data");
    }
  3. Use with_index() for iteration: Makes iteration patterns clear

    #![allow(unused)]
    fn main() {
    for i in 0..count {
        let iter_path = base.with_index(i);
        // Process iteration...
    }
    }
  4. Store base paths as constants: Avoid typos in repeated path construction

    #![allow(unused)]
    fn main() {
    const DATA_NS: &[&str] = &["data", "load"];
    let base = StorePath::from_segments(DATA_NS.iter().copied());
    }

Next Steps

Continue to Tera Templating to learn how StorePaths integrate with template syntax.

Tera Templating

Panopticon uses Tera as its templating engine. Tera provides a powerful, Jinja2-inspired syntax for variable interpolation, filters, and control structures.

How Templates Connect to Data

The ScalarStore holds all scalar values (strings, numbers, booleans, arrays, objects) and serves as the template context. Any value stored via a StorePath becomes available in templates using dot notation:

ScalarStore Contents:               Template Access:
=====================               ================

inputs.site_name = "My Site"    ->  {{ inputs.site_name }}
inputs.page_title = "Home"      ->  {{ inputs.page_title }}
config.debug = true             ->  {{ config.debug }}
data.load.row_count = 42        ->  {{ data.load.row_count }}
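
The left-hand values are stored like any other scalar data. A sketch showing how the inputs entries above might be populated with a static namespace (the names are illustrative):

```rust
#![allow(unused)]
fn main() {
pipeline.add_namespace(
    NamespaceBuilder::new("inputs")
        .static_ns()
        .insert("site_name", ScalarValue::String("My Site".into()))
        .insert("page_title", ScalarValue::String("Home".into())),
).await?;
// Templates can now reference {{ inputs.site_name }} and {{ inputs.page_title }}
}
```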

Basic Variable Interpolation

Simple Values

Access scalar values using double curly braces:

<h1>{{ inputs.site_name }}</h1>
<p>Processing {{ data.load.row_count }} records</p>

Nested Objects

Navigate into nested structures with dot notation:

#![allow(unused)]
fn main() {
// Stored under the "config" namespace as:
ObjectBuilder::new()
    .object("database", ObjectBuilder::new()
        .insert("host", "localhost")
        .insert("port", 5432))
    .build_scalar()
}
<!-- Template -->
Connecting to {{ config.database.host }}:{{ config.database.port }}

Array Access

Access array elements by index:

First item: {{ items.0 }}
Third item: {{ items.2 }}

Filters

Filters transform values using the pipe (|) syntax:

Built-in Filters

<!-- String manipulation -->
{{ name | upper }}              <!-- JOHN -->
{{ name | lower }}              <!-- john -->
{{ name | capitalize }}         <!-- John -->
{{ name | title }}              <!-- John Doe -->
{{ text | trim }}               <!-- removes whitespace -->
{{ text | truncate(length=20) }}

<!-- Number formatting -->
{{ price | round }}             <!-- 10 -->
{{ price | round(precision=2) }}<!-- 9.99 -->

<!-- Collections -->
{{ items | length }}            <!-- array length -->
{{ items | first }}             <!-- first element -->
{{ items | last }}              <!-- last element -->
{{ items | reverse }}           <!-- reversed array -->
{{ items | join(sep=", ") }}    <!-- comma-separated -->

<!-- Default values -->
{{ maybe_null | default(value="N/A") }}

<!-- JSON encoding -->
{{ object | json_encode() }}
{{ object | json_encode(pretty=true) }}

<!-- Escaping -->
{{ html_content | safe }}       <!-- no escaping -->
{{ user_input | escape }}       <!-- HTML escape -->

Filter Chaining

Chain multiple filters together:

{{ name | trim | upper | truncate(length=10) }}

Control Structures

Conditionals

{% if config.debug %}
    <div class="debug-panel">Debug mode enabled</div>
{% endif %}

{% if user.role == "admin" %}
    <a href="/admin">Admin Panel</a>
{% elif user.role == "editor" %}
    <a href="/edit">Edit Content</a>
{% else %}
    <span>Welcome, guest</span>
{% endif %}

Loops

Iterate over arrays:

<ul>
{% for item in inputs.nav_items %}
    <li><a href="{{ item.url }}">{{ item.label }}</a></li>
{% endfor %}
</ul>

Loop variables:

{% for item in items %}
    {{ loop.index }}      <!-- 1-indexed position -->
    {{ loop.index0 }}     <!-- 0-indexed position -->
    {{ loop.first }}      <!-- true if first iteration -->
    {{ loop.last }}       <!-- true if last iteration -->
{% endfor %}

Iterate over object key-value pairs:

{% for key, value in config.settings %}
    {{ key }}: {{ value }}
{% endfor %}

Template Inheritance

Tera supports template inheritance for building complex layouts:

Base Template (base.tera)

<!DOCTYPE html>
<html>
<head>
    <title>{% block title %}Default Title{% endblock %}</title>
</head>
<body>
{% block header %}{% endblock %}
<main>
{% block content %}{% endblock %}
</main>
<footer>
    <p>Generated by Panopticon</p>
</footer>
</body>
</html>

Child Template (page.tera)

{% extends "base.tera" %}

{% block title %}{{ inputs.page_title }} - {{ inputs.site_name }}{% endblock %}

{% block header %}
{% include "header.tera" %}
{% endblock %}

{% block content %}
<article>
    <h2>{{ inputs.page_title }}</h2>
    <p>{{ inputs.page_content }}</p>
</article>
{% endblock %}

Include Template (header.tera)

<header>
    <h1>{{ inputs.site_name }}</h1>
    <nav>
        {% for item in inputs.nav_items %}
        <a href="{{ item.url }}">{{ item.label }}</a>
        {% endfor %}
    </nav>
</header>

Using Templates in Panopticon

TemplateCommand

The TemplateCommand renders Tera templates:

#![allow(unused)]
fn main() {
use panopticon_core::prelude::*;

let mut pipeline = Pipeline::new();

// Add input data
pipeline.add_namespace(
    NamespaceBuilder::new("inputs")
        .static_ns()
        .insert("site_name", ScalarValue::String("Panopticon Demo".into()))
        .insert("page_title", ScalarValue::String("Getting Started".into()))
        .insert("page_content", ScalarValue::String("Welcome!".into()))
        .insert("nav_items", ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("label", "Home")
                .insert("url", "/")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("label", "Docs")
                .insert("url", "/docs")
                .build_scalar(),
        ])),
).await?;

// Configure template command
let template_attrs = ObjectBuilder::new()
    .insert("template_glob", "/path/to/templates/**/*.tera")
    .insert("render", "page.tera")
    .insert("output", "/output/page.html")
    .insert("capture", true)  // Also store rendered content in ScalarStore
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("render"))
    .await?
    .add_command::<TemplateCommand>("page", &template_attrs)
    .await?;
}

Inline Template Substitution

Use ctx.substitute() for inline template rendering:

#![allow(unused)]
fn main() {
// In command execution
let greeting = ctx.substitute("Hello, {{ user.name }}!").await?;
}

Condition Expressions

The ConditionCommand uses Tera expressions for branching:

#![allow(unused)]
fn main() {
let condition_attrs = ObjectBuilder::new()
    .insert("branches", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "is_us")
            .insert("if", "region is starting_with(\"us-\")")
            .insert("then", "Region {{ region }} is in the US")
            .build_scalar(),
        ObjectBuilder::new()
            .insert("name", "is_eu")
            .insert("if", "region is starting_with(\"eu-\")")
            .insert("then", "Region {{ region }} is in the EU")
            .build_scalar(),
    ]))
    .insert("default", "Region {{ region }} is in an unknown area")
    .build_hashmap();
}

Tera Tests

Tera "tests" check conditions on values and are used with the is keyword:

{% if value is defined %}...{% endif %}
{% if value is undefined %}...{% endif %}
{% if value is odd %}...{% endif %}
{% if value is even %}...{% endif %}
{% if text is containing("needle") %}...{% endif %}
{% if text is starting_with("prefix") %}...{% endif %}
{% if text is ending_with("suffix") %}...{% endif %}
{% if text is matching("regex") %}...{% endif %}

Data Flow Diagram

Template Rendering Flow:
========================

+-------------------+
|   ScalarStore     |
|                   |
| inputs.site_name  |
| inputs.page_title |
| inputs.nav_items  |
| config.debug      |
+--------+----------+
         |
         v
+-------------------+
|   Tera Context    |  <- ScalarStore becomes the template context
+--------+----------+
         |
         v
+-------------------+
|  Template File    |
|                   |
| {{ inputs.xxx }}  |
| {% for item %}    |
| {% if config %}   |
+--------+----------+
         |
         v
+-------------------+
|  Rendered Output  |
|                   |
| <h1>My Site</h1>  |
| <p>Welcome!</p>   |
+-------------------+

Common Patterns

Conditional CSS Classes

<div class="alert {% if level == 'error' %}alert-danger{% elif level == 'warning' %}alert-warning{% else %}alert-info{% endif %}">
    {{ message }}
</div>

Building URLs with Parameters

<a href="/users/{{ user.id }}?tab={{ tab | default(value='overview') }}">
    View Profile
</a>

JSON Data Embedding

<script>
    const config = {{ config | json_encode(pretty=true) | safe }};
</script>

Iteration with Separators

{% for tag in tags %}{{ tag }}{% if not loop.last %}, {% endif %}{% endfor %}

Troubleshooting

Common Errors

  • "Variable not found" - the path does not exist in the ScalarStore. Check the StorePath and ensure the data is stored before the template renders.
  • "Cannot iterate over" - the value is not an array. Verify the value type with {% if items is iterable %}.
  • "Filter not found" - typo in the filter name. Check the Tera documentation for correct filter names.

Debugging Tips

  1. Check available data: Use {{ __tera_context }} to dump all available variables (if enabled)

  2. Use default values: Prevent errors from missing data

    {{ maybe_missing | default(value="fallback") }}
    
  3. Test variable existence:

    {% if my_var is defined %}
        {{ my_var }}
    {% else %}
        Variable not set
    {% endif %}
    

Next Steps

Continue to Polars DataFrames to learn about working with tabular data.

Polars DataFrames

Panopticon uses Polars for tabular data operations. Polars is a DataFrame library written in Rust that delivers high performance for data manipulation tasks.

TabularValue and TabularStore

Type Definitions

#![allow(unused)]
fn main() {
// TabularValue is a type alias for Polars DataFrame
pub type TabularValue = polars::prelude::DataFrame;

// TabularStore manages DataFrames during pipeline execution
pub struct TabularStore {
    store: Arc<RwLock<HashMap<String, TabularValue>>>,
}
}

Store Operations

The TabularStore provides async methods for managing DataFrames:

#![allow(unused)]
fn main() {
// Insert a DataFrame
ctx.tabular().insert(&path, dataframe).await?;

// Retrieve a DataFrame
let df: Option<TabularValue> = ctx.tabular().get(&path).await?;

// Remove a DataFrame
let removed: Option<TabularValue> = ctx.tabular().remove(&path).await?;

// List all stored paths
let keys: Vec<String> = ctx.tabular().keys().await;
}

Loading Tabular Data

FileCommand

Load data from CSV, JSON, or Parquet files:

#![allow(unused)]
fn main() {
let file_attrs = ObjectBuilder::new()
    .insert("files", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "products")
            .insert("file", "/path/to/products.csv")
            .insert("format", "csv")
            .build_scalar(),
        ObjectBuilder::new()
            .insert("name", "orders")
            .insert("file", "/path/to/orders.parquet")
            .insert("format", "parquet")
            .build_scalar(),
    ]))
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("data"))
    .await?
    .add_command::<FileCommand>("load", &file_attrs)
    .await?;
}

After execution, DataFrames are stored at paths like:

  • data.load.products.data - The DataFrame
  • data.load.products.row_count - Number of rows (scalar)
  • data.load.orders.data - Another DataFrame
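
The scalar outputs can be read back after execution. A sketch following the result-access pattern from the Store Paths chapter (the namespace and file names match the example above):

```rust
#![allow(unused)]
fn main() {
let source = StorePath::from_segments(["data", "load"]);
if let Some(cmd_results) = results.get_by_source(&source) {
    // Path to data.load.products.row_count
    let count_path = source.with_segment("products").with_segment("row_count");
    if let Some((ty, scalar)) = cmd_results
        .data_get(&count_path)
        .and_then(|v| v.as_scalar())
    {
        println!("products row count: {} ({:?})", scalar, ty);
    }
}
}
```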

SqlCommand

Query data using SQL:

#![allow(unused)]
fn main() {
let sql_attrs = ObjectBuilder::new()
    .insert("query", "SELECT * FROM products WHERE price > 100")
    .insert("sources", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "products")
            .insert("path", "data.load.products.data")
            .build_scalar(),
    ]))
    .build_hashmap();
}
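
As with the other commands, the attribute map is then attached to a namespace and command. A sketch (the namespace and command names here are illustrative):

```rust
#![allow(unused)]
fn main() {
pipeline
    .add_namespace(NamespaceBuilder::new("queries"))
    .await?
    .add_command::<SqlCommand>("expensive", &sql_attrs)
    .await?;
// Following the namespace.command.* pattern, results land under queries.expensive.*
}
```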

Aggregating Data

AggregateCommand

Perform statistical aggregations on DataFrames:

#![allow(unused)]
fn main() {
let agg_attrs = ObjectBuilder::new()
    .insert("source", "data.load.products.data")
    .insert("aggregations", ScalarValue::Array(vec![
        ObjectBuilder::new()
            .insert("name", "row_count")
            .insert("op", "count")
            .build_scalar(),
        ObjectBuilder::new()
            .insert("name", "total_price")
            .insert("column", "price")
            .insert("op", "sum")
            .build_scalar(),
        ObjectBuilder::new()
            .insert("name", "avg_price")
            .insert("column", "price")
            .insert("op", "mean")
            .build_scalar(),
        ObjectBuilder::new()
            .insert("name", "max_quantity")
            .insert("column", "quantity")
            .insert("op", "max")
            .build_scalar(),
        ObjectBuilder::new()
            .insert("name", "min_quantity")
            .insert("column", "quantity")
            .insert("op", "min")
            .build_scalar(),
        ObjectBuilder::new()
            .insert("name", "median_price")
            .insert("column", "price")
            .insert("op", "median")
            .build_scalar(),
    ]))
    .build_hashmap();

pipeline
    .add_namespace(NamespaceBuilder::new("stats"))
    .await?
    .add_command::<AggregateCommand>("products", &agg_attrs)
    .await?;
}

Supported aggregation operations:

  • count - Row count (no column required)
  • sum - Sum of column values
  • mean - Average of column values
  • min - Minimum value
  • max - Maximum value
  • median - Median value

Accessing DataFrame Results

During Execution

Commands can access DataFrames from the execution context:

#![allow(unused)]
fn main() {
async fn execute(&self, ctx: &ExecutionContext, source: &StorePath) -> Result<()> {
    // Get DataFrame from TabularStore
    let df_path = StorePath::from_segments(["data", "load", "products", "data"]);
    let df = ctx.tabular()
        .get(&df_path)
        .await?
        .context("DataFrame not found")?;

    // Work with the DataFrame
    println!("Columns: {:?}", df.get_column_names());
    println!("Shape: {:?}", df.shape());

    Ok(())
}
}

After Execution

The ResultStore provides access to both scalar and tabular results:

#![allow(unused)]
fn main() {
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

for cmd_results in results.iter() {
    println!("Source: {}", cmd_results.source().to_dotted());

    // Iterate over metadata
    for (path, value) in cmd_results.meta_iter() {
        println!("  [meta] {} = {}", path.to_dotted(), value);
    }

    // Iterate over data results
    for (path, value) in cmd_results.data_iter() {
        match value.as_scalar() {
            Some((ty, val)) => {
                println!("  [data] {} = {} ({:?})", path.to_dotted(), val, ty);
            }
            None => {
                // Tabular results are exported to disk
                let (file_path, format, rows, cols) =
                    value.as_tabular().expect("Expected tabular result");
                println!(
                    "  [data] {} => {} ({} rows x {} cols)",
                    path.to_dotted(),
                    file_path.display(),
                    rows,
                    cols
                );
            }
        }
    }
}
}

Export Formats

DataFrames are automatically exported to disk when results are accessed. Configure the export format via ResultSettings:

#![allow(unused)]
fn main() {
// Export as CSV
let settings = ResultSettings::new()
    .with_format(TabularFormat::Csv)
    .with_output_path(PathBuf::from("/output/directory"));

// Export as JSON
let settings = ResultSettings::new()
    .with_format(TabularFormat::Json);

// Export as Parquet (default)
let settings = ResultSettings::new()
    .with_format(TabularFormat::Parquet);

let results = completed.results(settings).await?;
}

TabularFormat Options

Format                    Extension   Use Case
TabularFormat::Csv        .csv        Human-readable, spreadsheet compatible
TabularFormat::Json       .json       Web applications, APIs
TabularFormat::Parquet    .parquet    Efficient storage, data pipelines

Data Flow Diagram

Tabular Data Flow:
==================

+------------------+
|  Input Files     |
|  - products.csv  |
|  - orders.json   |
+--------+---------+
         |
         v
+------------------+
|   FileCommand    |
+--------+---------+
         |
         v
+------------------+
|  TabularStore    |
|                  |
| data.load.       |
|   products.data  |  <-- DataFrame
|   orders.data    |  <-- DataFrame
+--------+---------+
         |
    +----+----+
    |         |
    v         v
+-------+  +------------+
| SQL   |  | Aggregate  |
+---+---+  +-----+------+
    |            |
    v            v
+------------------+
|  TabularStore    |
| (updated)        |
+--------+---------+
         |
         v
+------------------+
|   ResultStore    |
+--------+---------+
         |
         v
+------------------+
|  Output Files    |
|  - .csv          |
|  - .json         |
|  - .parquet      |
+------------------+

Working with Polars Directly

When building custom commands, you can use Polars DataFrame operations:

#![allow(unused)]
fn main() {
use polars::prelude::*;

// Create a DataFrame
let df = df!(
    "name" => &["Alice", "Bob", "Charlie"],
    "age" => &[30, 25, 35],
    "city" => &["NYC", "LA", "Chicago"]
)?;

// Filter rows
let filtered = df.clone().lazy()
    .filter(col("age").gt(lit(28)))
    .collect()?;

// Select columns
let selected = df.clone().lazy()
    .select([col("name"), col("city")])
    .collect()?;

// Group and aggregate
let grouped = df.clone().lazy()
    .group_by([col("city")])
    .agg([
        col("age").mean().alias("avg_age"),
        col("name").count().alias("count"),
    ])
    .collect()?;

// Store in TabularStore
ctx.tabular().insert(&source.with_segment("result"), filtered).await?;
}

Complete Example

use panopticon_core::prelude::*;
use std::path::PathBuf;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let output_dir = tempfile::tempdir()?;
    let mut pipeline = Pipeline::new();

    // --- Load product data ---
    let file_attrs = ObjectBuilder::new()
        .insert("files", ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "products")
                .insert("file", "/path/to/products.csv")
                .insert("format", "csv")
                .build_scalar(),
        ]))
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("data"))
        .await?
        .add_command::<FileCommand>("load", &file_attrs)
        .await?;

    // --- Aggregate: compute statistics ---
    let agg_attrs = ObjectBuilder::new()
        .insert("source", "data.load.products.data")
        .insert("aggregations", ScalarValue::Array(vec![
            ObjectBuilder::new()
                .insert("name", "row_count")
                .insert("op", "count")
                .build_scalar(),
            ObjectBuilder::new()
                .insert("name", "total_price")
                .insert("column", "price")
                .insert("op", "sum")
                .build_scalar(),
        ]))
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("stats"))
        .await?
        .add_command::<AggregateCommand>("products", &agg_attrs)
        .await?;

    // --- Execute with custom output path ---
    let completed = pipeline.compile().await?.execute().await?;
    let settings = ResultSettings::new()
        .with_output_path(output_dir.path().to_path_buf())
        .with_format(TabularFormat::Json);
    let results = completed.results(settings).await?;

    // --- Access results ---
    println!("=== Result Store ({} command(s)) ===\n", results.len());

    for cmd_results in results.iter() {
        println!("Source: {}", cmd_results.source().to_dotted());

        for (path, value) in cmd_results.data_iter() {
            match value.as_scalar() {
                Some((ty, val)) => {
                    println!("  {} = {} ({:?})", path.to_dotted(), val, ty);
                }
                None => {
                    let (file_path, fmt, rows, cols) =
                        value.as_tabular().expect("Expected tabular");
                    println!(
                        "  {} => {} ({} rows x {} cols)",
                        path.to_dotted(),
                        file_path.display(),
                        rows,
                        cols
                    );
                }
            }
        }
    }

    Ok(())
}

Next Steps

Explore Pipeline Patterns to learn about iteration, conditional execution, and advanced pipeline techniques.

Pipeline Patterns

This section covers common patterns and recipes for building effective Panopticon pipelines. Each pattern addresses a specific problem with a concrete solution you can adapt to your use case.

Pattern Overview

Pattern                  Problem                                       Solution
Iteration                Process each item in a collection             Use iterative namespaces with iter_var and index_var
Conditional Execution    Skip commands based on runtime conditions     Use the when attribute with Tera expressions
Pipeline Editing         Add stages to an already-executed pipeline    Use .edit() to return to Draft state
Result Access            Retrieve and export pipeline outputs          Configure ResultSettings and iterate ResultStore

When to Use These Patterns

Iteration

Use iterative namespaces when you need to:

  • Process each key in a configuration object
  • Apply the same operation to every item in an array
  • Loop over unique values in a DataFrame column
  • Split a string and handle each segment

Example scenario: You have a configuration object with region keys (us-east, us-west, eu-west) and need to run a classification command for each region.

Conditional Execution

Use the when attribute when you need to:

  • Feature-flag parts of your pipeline
  • Skip expensive operations when their inputs are empty
  • Create debug-only or production-only commands
  • Short-circuit processing based on earlier results

Example scenario: You have a feature flag in your configuration, and certain commands should only execute when that flag is enabled.

Pipeline Editing

Use .edit() when you need to:

  • Add follow-up analysis after initial exploration
  • Build pipelines incrementally based on intermediate results
  • Implement REPL-style workflows
  • Reuse expensive data loading across multiple analyses

Example scenario: You loaded a large dataset and ran an initial query. Now you want to add aggregation commands without re-loading the data.

Result Access

Configure ResultSettings when you need to:

  • Export tabular results to a specific directory
  • Choose output format (CSV, JSON, Parquet)
  • Exclude certain commands from result collection
  • Iterate over all results programmatically

Example scenario: After pipeline execution, you want to write all DataFrames to a temporary directory as Parquet files and print a summary of all computed metrics.

Combining Patterns

These patterns compose naturally. A real-world pipeline might:

  1. Load data into a static namespace for configuration
  2. Use an iterative namespace to process each configuration key
  3. Apply when conditions to skip processing for disabled regions
  4. Call .edit() to add aggregation after reviewing initial results
  5. Export all results with custom ResultSettings

#![allow(unused)]
fn main() {
// Pseudocode showing pattern composition
let mut pipeline = Pipeline::new();

// Static config (Pattern: static namespaces)
pipeline.add_namespace(NamespaceBuilder::new("config").static_ns()
    .insert("regions", regions_object)
    .insert("debug_mode", ScalarValue::Bool(false))
).await?;

// Iterative processing (Pattern: iteration)
let mut handle = pipeline.add_namespace(
    NamespaceBuilder::new("process")
        .iterative()
        .store_path(StorePath::from_segments(["config", "regions"]))
        .scalar_object_keys(None, false)
        .iter_var("region")
).await?;

// Conditional command (Pattern: when)
handle.add_command::<ExpensiveCommand>("analyze", &ObjectBuilder::new()
    .insert("when", "config.debug_mode == false")  // Skip in debug mode
    .build_hashmap()
).await?;

// Execute first pass
let completed = pipeline.compile().await?.execute().await?;

// Add more analysis (Pattern: pipeline editing)
let mut pipeline = completed.edit();
pipeline.add_namespace(NamespaceBuilder::new("summary")).await?
    .add_command::<AggregateCommand>("totals", &agg_attrs).await?;

// Re-execute and collect results (Pattern: result access)
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(
    ResultSettings::new()
        .with_output_path(output_dir)
        .with_format(TabularFormat::Parquet)
).await?;
}

The following pages dive into each pattern with complete, runnable examples.

Iteration

Problem: You have a collection of items (object keys, array elements, or DataFrame column values) and need to run the same commands for each item.

Solution: Create an iterative namespace that loops over the collection, exposing each item via iter_var and its position via index_var.

How Iterative Namespaces Work

When you create an iterative namespace, Panopticon:

  1. Resolves the source collection from the data store
  2. Executes all commands in the namespace once per item
  3. Stores results with an index suffix (e.g., classify.region[0], classify.region[1])
  4. Exposes the current item and index as template variables

Iterator Types

Panopticon supports four iterator types:

Type                  Source             Iterates Over
scalar_object_keys    JSON object        Keys of the object
scalar_array          JSON array         Elements of the array
string_split          String value       Segments split by delimiter
tabular_column        DataFrame column   Unique values in the column

Basic Pattern: Object Keys

The most common pattern is iterating over the keys of a configuration object.

use panopticon_core::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut pipeline = Pipeline::new();

    // Create a static namespace with an object to iterate over
    pipeline
        .add_namespace(
            NamespaceBuilder::new("config").static_ns().insert(
                "regions",
                ObjectBuilder::new()
                    .insert("us-east", "Virginia")
                    .insert("us-west", "Oregon")
                    .insert("eu-west", "Ireland")
                    .build_scalar(),
            ),
        )
        .await?;

    // Create an iterative namespace that loops over region keys
    let mut handle = pipeline
        .add_namespace(
            NamespaceBuilder::new("classify")
                .iterative()
                .store_path(StorePath::from_segments(["config", "regions"]))
                .scalar_object_keys(None, false)  // All keys, no exclusions
                .iter_var("region")               // Current key available as {{ region }}
                .index_var("idx"),                // Current index available as {{ idx }}
        )
        .await?;

    // This command runs once per region key
    let attrs = ObjectBuilder::new()
        .insert(
            "branches",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "is_us")
                    .insert("if", "region is starting_with(\"us-\")")
                    .insert("then", "Region {{ region }} is in the US")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "is_eu")
                    .insert("if", "region is starting_with(\"eu-\")")
                    .insert("then", "Region {{ region }} is in the EU")
                    .build_scalar(),
            ]),
        )
        .insert("default", "Region {{ region }} is in an unknown area")
        .build_hashmap();

    handle
        .add_command::<ConditionCommand>("check", &attrs)
        .await?;

    // Execute the pipeline
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;

    // Access results by index
    for idx in 0.. {
        let source = StorePath::from_segments(["classify", "check"]).with_index(idx);
        let Some(cmd_results) = results.get_by_source(&source) else {
            break;  // No more iterations
        };

        let result = cmd_results
            .data_get(&source.with_segment("result"))
            .and_then(|r| r.as_scalar())
            .expect("Expected result");

        println!("[{}] {}", idx, result.1);
    }

    Ok(())
}

Output:

[0] Region us-east is in the US
[1] Region us-west is in the US
[2] Region eu-west is in the EU

Key Concepts

iter_var and index_var

These methods define the template variable names used during iteration:

#![allow(unused)]
fn main() {
.iter_var("region")   // {{ region }} contains the current item
.index_var("idx")     // {{ idx }} contains 0, 1, 2, ...
}

If not specified, the defaults are:

  • iter_var: "item"
  • index_var: "index"

These variables are available in:

  • Tera template expressions in command attributes
  • The when condition for conditional execution
  • Any attribute that supports Tera substitution

store_path

The store_path points to the collection in the scalar or tabular store:

#![allow(unused)]
fn main() {
.store_path(StorePath::from_segments(["config", "regions"]))
}

This path must exist when the pipeline executes. If it does not, execution fails with an error.

Result Indexing

Iterative namespace results are indexed by iteration number. To access them:

#![allow(unused)]
fn main() {
// Build the base path
let base = StorePath::from_segments(["namespace", "command"]);

// Access specific iteration
let iteration_0 = base.with_index(0);  // namespace.command[0]
let iteration_1 = base.with_index(1);  // namespace.command[1]

// Get results
let results_0 = results.get_by_source(&iteration_0);
}

Filtering Object Keys

You can filter which keys to iterate over:

#![allow(unused)]
fn main() {
// Only iterate over specific keys
.scalar_object_keys(Some(vec!["us-east".to_string(), "eu-west".to_string()]), false)

// Exclude specific keys (iterate over all except these)
.scalar_object_keys(Some(vec!["us-west".to_string()]), true)

// Iterate over all keys
.scalar_object_keys(None, false)
}

Iterating Over Arrays

To iterate over array elements instead of object keys:

#![allow(unused)]
fn main() {
pipeline
    .add_namespace(
        NamespaceBuilder::new("config").static_ns().insert(
            "items",
            ScalarValue::Array(vec![
                ScalarValue::String("apple".to_string()),
                ScalarValue::String("banana".to_string()),
                ScalarValue::String("cherry".to_string()),
            ]),
        ),
    )
    .await?;

let mut handle = pipeline
    .add_namespace(
        NamespaceBuilder::new("process")
            .iterative()
            .store_path(StorePath::from_segments(["config", "items"]))
            .scalar_array(None)        // All elements
            .iter_var("fruit"),
    )
    .await?;
}

With a range to limit iterations:

#![allow(unused)]
fn main() {
.scalar_array(Some((0, 2)))  // Only first two elements (indices 0 and 1)
}

Iterating Over String Segments

Split a string and iterate over the parts:

#![allow(unused)]
fn main() {
pipeline
    .add_namespace(
        NamespaceBuilder::new("config").static_ns()
            .insert("path", ScalarValue::String("/usr/local/bin".to_string())),
    )
    .await?;

let mut handle = pipeline
    .add_namespace(
        NamespaceBuilder::new("segments")
            .iterative()
            .store_path(StorePath::from_segments(["config", "path"]))
            .string_split("/")         // Split on "/"
            .iter_var("segment"),
    )
    .await?;
}

Iterating Over DataFrame Columns

Extract unique values from a DataFrame column:

#![allow(unused)]
fn main() {
// Assuming data.load.users.data contains a DataFrame with a "department" column
let mut handle = pipeline
    .add_namespace(
        NamespaceBuilder::new("by_dept")
            .iterative()
            .store_path(StorePath::from_segments(["data", "load", "users", "data"]))
            .tabular_column("department", None)  // Unique values from "department"
            .iter_var("dept"),
    )
    .await?;
}

This iterates over the unique, non-null values in the specified column. Use a range to limit:

#![allow(unused)]
fn main() {
.tabular_column("department", Some((0, 5)))  // First 5 unique values
}

Best Practices

Use Descriptive Variable Names

Choose iter_var names that reflect what you are iterating over:

#![allow(unused)]
fn main() {
// Good: clear what we're iterating
.iter_var("region")
.iter_var("user_id")
.iter_var("filename")

// Avoid: generic names
.iter_var("item")
.iter_var("x")
}

Keep Iteration Counts Reasonable

Each iteration creates separate command executions and results. For large collections, consider:

  • Filtering with scalar_object_keys(Some(keys), false)
  • Using ranges with scalar_array(Some((start, end)))
  • Pre-filtering data with SqlCommand before iteration

Access Results Systematically

When iterating over results, use a loop that terminates when get_by_source returns None:

#![allow(unused)]
fn main() {
let mut idx = 0;
loop {
    let source = StorePath::from_segments(["ns", "cmd"]).with_index(idx);
    let Some(results) = store.get_by_source(&source) else {
        break;
    };
    // Process results...
    idx += 1;
}
}

This pattern handles any number of iterations without hardcoding the count.

Conditional Execution

Problem: You want certain commands to execute only when specific conditions are met at runtime.

Solution: Use the when attribute with a Tera expression. If it evaluates to a falsy value, the command is skipped.

How the when Attribute Works

Every command in Panopticon supports an optional when attribute:

  1. Before executing the command, the runtime evaluates the when expression
  2. If the result is falsy (false, null, empty string, 0), the command is skipped
  3. Skipped commands have status = "skipped" in their metadata
  4. Data results are absent for skipped commands

Basic Pattern: Feature Flags

The most common use case is feature-flagging parts of your pipeline.

use panopticon_core::prelude::*;

async fn run_with_feature_flag(enabled: bool) -> anyhow::Result<()> {
    let mut pipeline = Pipeline::with_services(PipelineServices::defaults());

    // Static namespace with configuration
    pipeline
        .add_namespace(
            NamespaceBuilder::new("inputs")
                .static_ns()
                .insert("feature_enabled", ScalarValue::Bool(enabled))
                .insert("user_name", ScalarValue::String("Alice".to_string())),
        )
        .await?;

    // Command with `when` guard
    let attrs = ObjectBuilder::new()
        .insert("when", "inputs.feature_enabled")  // <-- The condition
        .insert(
            "branches",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "greeting")
                    .insert("if", "true")
                    .insert("then", "Hello, {{ inputs.user_name }}! Feature is active.")
                    .build_scalar(),
            ]),
        )
        .insert("default", "Fallback message")
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("example"))
        .await?
        .add_command::<ConditionCommand>("greeting", &attrs)
        .await?;

    // Execute
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;

    // Check the status
    let source = StorePath::from_segments(["example", "greeting"]);
    let cmd_results = results
        .get_by_source(&source)
        .expect("Expected results");

    let status = cmd_results
        .meta_get(&source.with_segment("status"))
        .expect("Expected status");

    println!("status = {}", status);

    // Data is only present when the command executed
    if let Some(result) = cmd_results
        .data_get(&source.with_segment("result"))
        .and_then(|r| r.as_scalar())
    {
        println!("result = {}", result.1);
    } else {
        println!("(no data - command was skipped)");
    }

    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    println!("=== Feature flag: TRUE ===");
    run_with_feature_flag(true).await?;

    println!("\n=== Feature flag: FALSE ===");
    run_with_feature_flag(false).await?;

    Ok(())
}

Output:

=== Feature flag: TRUE ===
status = "completed"
result = Hello, Alice! Feature is active.

=== Feature flag: FALSE ===
status = "skipped"
(no data - command was skipped)

Tera Expression Syntax

The when value is a Tera expression that has access to the entire scalar store. Common patterns:

Boolean Checks

#![allow(unused)]
fn main() {
// Direct boolean value
.insert("when", "config.debug_mode")

// Negation
.insert("when", "not config.production")

// Comparison
.insert("when", "config.log_level == \"debug\"")
}

Numeric Comparisons

#![allow(unused)]
fn main() {
// Greater than
.insert("when", "stats.row_count > 0")

// Range check
.insert("when", "inputs.threshold >= 10 and inputs.threshold <= 100")
}

String Tests

#![allow(unused)]
fn main() {
// Equality
.insert("when", "config.environment == \"production\"")

// Prefix check
.insert("when", "region is starting_with(\"us-\")")

// Contains
.insert("when", "config.tags is containing(\"important\")")
}

Existence Checks

#![allow(unused)]
fn main() {
// Check if a value is defined
.insert("when", "optional_config is defined")

// Check for null
.insert("when", "maybe_value is not none")
}

Combined Conditions

#![allow(unused)]
fn main() {
// AND
.insert("when", "config.enabled and stats.count > 0")

// OR
.insert("when", "config.mode == \"full\" or config.force")

// Complex
.insert("when", "(env == \"prod\" or env == \"staging\") and feature_flags.new_flow")
}

Handling Skipped Commands

When a command is skipped, you need to handle its absence in downstream commands.

Check Status in Results

#![allow(unused)]
fn main() {
let status = cmd_results
    .meta_get(&source.with_segment("status"))
    .expect("Expected status");

match status.as_str() {
    Some("completed") => {
        // Process data results
    }
    Some("skipped") => {
        // Handle skipped case
    }
    _ => {
        // Unexpected status
    }
}
}

Use Tera Defaults in Templates

When referencing potentially-skipped command outputs in templates:

#![allow(unused)]
fn main() {
// Use default filter to handle missing values
.insert("message", "Count: {{ stats.count | default(value=0) }}")
}

Chain Conditions

If command B depends on command A's output, and A might be skipped:

#![allow(unused)]
fn main() {
// Command A: might be skipped
let a_attrs = ObjectBuilder::new()
    .insert("when", "config.run_expensive_query")
    // ...
    .build_hashmap();

handle.add_command::<SqlCommand>("query_a", &a_attrs).await?;

// Command B: only runs if A ran successfully
let b_attrs = ObjectBuilder::new()
    .insert("when", "ns.query_a.status == \"completed\"")
    // ...
    .build_hashmap();

handle.add_command::<AggregateCommand>("aggregate_b", &b_attrs).await?;
}

Use Cases

Debug-Only Commands

Skip verbose logging in production:

#![allow(unused)]
fn main() {
.insert("when", "config.debug")
}

Empty Data Guards

Skip processing when there is no data:

#![allow(unused)]
fn main() {
.insert("when", "data.load.rows > 0")
}

Environment-Specific Logic

Run different commands per environment:

#![allow(unused)]
fn main() {
// Production-only
.insert("when", "config.env == \"production\"")

// Development-only
.insert("when", "config.env == \"development\"")
}

Conditional Exports

Only export when there are results worth saving:

#![allow(unused)]
fn main() {
.insert("when", "stats.significant_findings > 0")
}

Iteration Guards

Within an iterative namespace, skip certain items:

#![allow(unused)]
fn main() {
// Skip disabled regions
.insert("when", "region is not starting_with(\"disabled-\")")

// Only process items meeting criteria
.insert("when", "item.status == \"active\"")
}

Best Practices

Keep Conditions Simple

Complex conditions are hard to debug. Prefer:

#![allow(unused)]
fn main() {
// Good: simple, readable
.insert("when", "config.feature_enabled")

// Avoid: complex nested logic
.insert("when", "((a and b) or (c and not d)) and (e or f)")
}

If you need complex logic, compute a boolean in an earlier command and reference it.

Document Skip Behavior

When a command might be skipped, document what happens:

#![allow(unused)]
fn main() {
// This command is skipped when feature_x is disabled.
// Downstream commands that reference its output use default values.
let attrs = ObjectBuilder::new()
    .insert("when", "config.feature_x")
    // ...
}

Test Both Paths

Always test your pipeline with conditions evaluating to both true and false to ensure proper handling.

Use Status Checks for Dependencies

Instead of duplicating conditions, check the upstream command's status:

#![allow(unused)]
fn main() {
// Instead of repeating the condition:
// .insert("when", "config.feature_x")

// Check if the dependency actually ran:
.insert("when", "upstream.cmd.status == \"completed\"")
}

This ensures consistency even if the original condition logic changes.

Pipeline Editing

Problem: You have executed a pipeline and want to add more commands without re-running everything from scratch.

Solution: Call .edit() on a completed pipeline to return to Draft state, add new namespaces and commands, then re-compile and execute.

How Pipeline Editing Works

The Pipeline type has a state machine with three states:

Draft  --compile-->  Ready  --execute-->  Completed
  ^                                           |
  |                                           |
  +------------------edit()-------------------+

When you call .edit() on a Pipeline<Completed>:

  1. The pipeline returns to Pipeline<Draft> state
  2. All existing namespaces and commands are preserved
  3. All data in the stores is preserved
  4. You can add new namespaces and commands
  5. Re-compilation and execution runs only the new additions (existing results remain)

Basic Pattern: Two-Pass Pipeline

A common workflow: load data, run initial analysis, review, then add more analysis.

use panopticon_core::prelude::*;
use std::path::PathBuf;

fn fixtures_dir() -> PathBuf {
    PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("fixtures")
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // ===== Pass 1: Load data and run initial query =====
    println!("=== Pass 1: Load + Query ===\n");

    let mut pipeline = Pipeline::new();

    // Load users from CSV
    let file_attrs = ObjectBuilder::new()
        .insert(
            "files",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "users")
                    .insert("file", fixtures_dir().join("users.csv").to_string_lossy().to_string())
                    .insert("format", "csv")
                    .build_scalar(),
            ]),
        )
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("data"))
        .await?
        .add_command::<FileCommand>("load", &file_attrs)
        .await?;

    // Run a SQL query
    let sql_attrs = ObjectBuilder::new()
        .insert(
            "tables",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "users")
                    .insert("source", "data.load.users.data")
                    .build_scalar(),
            ]),
        )
        .insert("query", "SELECT name, age FROM users ORDER BY age DESC")
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("query"))
        .await?
        .add_command::<SqlCommand>("sorted", &sql_attrs)
        .await?;

    // Execute pass 1
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;

    // Inspect pass 1 results
    let query_source = StorePath::from_segments(["query", "sorted"]);
    let query_results = results.get_by_source(&query_source).expect("Expected query results");
    let rows = query_results.meta_get(&query_source.with_segment("rows")).expect("Expected rows");
    println!("  query.sorted: {} rows", rows);
    println!("  Namespaces: data, query");

    // ===== Pass 2: Add aggregation to existing pipeline =====
    println!("\n=== Pass 2: Edit + Aggregate ===\n");

    // Return to Draft state - this is the key step!
    let mut pipeline = completed.edit();

    // Add aggregation namespace (data from pass 1 is still available)
    let agg_attrs = ObjectBuilder::new()
        .insert("source", "data.load.users.data")
        .insert(
            "aggregations",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "user_count")
                    .insert("op", "count")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "avg_age")
                    .insert("column", "age")
                    .insert("op", "mean")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "oldest")
                    .insert("column", "age")
                    .insert("op", "max")
                    .build_scalar(),
            ]),
        )
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("stats"))
        .await?
        .add_command::<AggregateCommand>("users", &agg_attrs)
        .await?;

    // Re-compile and execute
    let completed = pipeline.compile().await?.execute().await?;
    let results = completed.results(ResultSettings::default()).await?;

    // Original results are still present
    let query_source = StorePath::from_segments(["query", "sorted"]);
    let query_results = results.get_by_source(&query_source).expect("query.sorted should still exist");
    let rows = query_results.meta_get(&query_source.with_segment("rows")).expect("Expected rows");
    println!("  query.sorted: {} rows (preserved from pass 1)", rows);

    // New aggregation results are available
    let stats_source = StorePath::from_segments(["stats", "users"]);
    let stats_results = results.get_by_source(&stats_source).expect("Expected stats results");

    for name in ["user_count", "avg_age", "oldest"] {
        let value = stats_results
            .data_get(&stats_source.with_segment(name))
            .and_then(|r| r.as_scalar())
            .unwrap_or_else(|| panic!("Expected {}", name));
        println!("  stats.users.{} = {}", name, value.1);
    }

    println!("  Namespaces: data, query, stats");
    println!("\nPipeline successfully edited and re-executed.");

    Ok(())
}

Output:

=== Pass 1: Load + Query ===

  query.sorted: 5 rows
  Namespaces: data, query

=== Pass 2: Edit + Aggregate ===

  query.sorted: 5 rows (preserved from pass 1)
  stats.users.user_count = 5
  stats.users.avg_age = 32.4
  stats.users.oldest = 45
  Namespaces: data, query, stats

Pipeline successfully edited and re-executed.

Key Points

State Preservation

When you call .edit():

  • Preserved: Namespaces, commands, store data, execution results
  • Reset: Pipeline state returns to Draft

This means pass 2 can reference data created in pass 1 without re-executing the original commands.

Ownership Transfer

The .edit() method consumes the Pipeline<Completed>:

#![allow(unused)]
fn main() {
// completed is consumed here
let mut pipeline = completed.edit();

// This would fail - completed no longer exists:
// let _ = completed.results(...);  // ERROR: use of moved value
}

Re-compilation Required

After editing, you must call .compile() before .execute():

#![allow(unused)]
fn main() {
let mut pipeline = completed.edit();
pipeline.add_namespace(...).await?;

// Must compile before executing
let completed = pipeline.compile().await?.execute().await?;
}

Use Cases

Exploratory Data Analysis

Build your analysis incrementally:

#![allow(unused)]
fn main() {
// Step 1: Load and explore
let completed = load_and_preview().await?;
// ... review results ...

// Step 2: Add filtering based on what you saw
let mut pipeline = completed.edit();
add_filters(&mut pipeline).await?;
let completed = pipeline.compile().await?.execute().await?;
// ... review filtered results ...

// Step 3: Add aggregation
let mut pipeline = completed.edit();
add_aggregations(&mut pipeline).await?;
let completed = pipeline.compile().await?.execute().await?;
}

Conditional Follow-Up

Add analysis based on intermediate results:

#![allow(unused)]
fn main() {
let completed = pipeline.compile().await?.execute().await?;
let results = completed.results(ResultSettings::default()).await?;

// Check if we need additional analysis
let row_count = get_row_count(&results);
if row_count > 1000 {
    let mut pipeline = completed.edit();
    add_sampling_namespace(&mut pipeline).await?;
    let completed = pipeline.compile().await?.execute().await?;
}
}

REPL-Style Workflows

In an interactive environment, let users add commands incrementally:

#![allow(unused)]
fn main() {
let mut completed = initial_pipeline.compile().await?.execute().await?;

loop {
    // Show current results
    display_results(&completed).await?;

    // Get user input
    let command = get_user_command()?;
    if command == "quit" {
        break;
    }

    // Add the new command
    let mut pipeline = completed.edit();
    add_user_command(&mut pipeline, &command).await?;
    completed = pipeline.compile().await?.execute().await?;
}
}

Expensive Data Reuse

Load expensive data once, then analyze it in multiple ways:

#![allow(unused)]
fn main() {
// Load large dataset (expensive)
let completed = load_big_dataset().await?;

// Analysis 1
let mut p1 = completed.edit();
add_analysis_1(&mut p1).await?;
let c1 = p1.compile().await?.execute().await?;
save_results(&c1, "analysis_1").await?;

// Analysis 2 (starts fresh from after load, not from c1)
let mut p2 = completed.edit();
add_analysis_2(&mut p2).await?;
let c2 = p2.compile().await?.execute().await?;
save_results(&c2, "analysis_2").await?;
}

Note: In this pattern, each .edit() call forks from the same point. Changes in p1 do not affect p2.

Best Practices

Name Namespaces for Phases

Use namespace names that indicate which pass they belong to:

#![allow(unused)]
fn main() {
// Pass 1
.add_namespace(NamespaceBuilder::new("load"))
.add_namespace(NamespaceBuilder::new("initial_query"))

// Pass 2
.add_namespace(NamespaceBuilder::new("refined_query"))
.add_namespace(NamespaceBuilder::new("aggregations"))
}

Document the Multi-Pass Structure

When your pipeline has multiple passes, document the intended flow:

#![allow(unused)]
fn main() {
// Pipeline structure:
// Pass 1: Load data, run initial validation
// Pass 2: Apply transformations based on validation results
// Pass 3: Generate final reports
}

Avoid Namespace Name Conflicts

Each namespace must have a unique name. Adding a namespace with a duplicate name will fail:

#![allow(unused)]
fn main() {
// Pass 1
pipeline.add_namespace(NamespaceBuilder::new("data")).await?;

// Pass 2 - this fails!
let mut pipeline = completed.edit();
pipeline.add_namespace(NamespaceBuilder::new("data")).await?;  // ERROR: duplicate name
}

Consider Memory Usage

All store data is preserved across edits. For very large datasets, this can increase memory usage. If you need to release memory:

#![allow(unused)]
fn main() {
// Create a fresh pipeline instead of editing
let mut new_pipeline = Pipeline::new();
// Copy only the data you need
}

Ready State Editing

You can also call .edit() on a Pipeline<Ready> (after compile, before execute):

#![allow(unused)]
fn main() {
let ready = pipeline.compile().await?;
// Oops, forgot something
let mut pipeline = ready.edit();
pipeline.add_namespace(...).await?;
let ready = pipeline.compile().await?;
let completed = ready.execute().await?;
}

This is useful when you realize you need to add something after compilation but before execution.

Result Access

Problem: After pipeline execution, you need to retrieve results, export tabular data, and iterate over outputs programmatically.

Solution: Use ResultSettings to configure output behavior and ResultStore to access and iterate over all command results.

How Result Access Works

After calling .execute(), you have a Pipeline<Completed>. To access results:

  1. Create ResultSettings to configure output path and format
  2. Call .results(settings) to get a ResultStore
  3. Use ResultStore methods to access individual command results
  4. Each CommandResults contains metadata and data, accessible via iterators or direct lookup

Basic Pattern: Configure and Collect

use panopticon_core::prelude::*;
use std::path::PathBuf;

fn fixtures_dir() -> PathBuf {
    PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("fixtures")
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let output_dir = tempfile::tempdir()?;
    let mut pipeline = Pipeline::new();

    // Load product data
    let file_attrs = ObjectBuilder::new()
        .insert(
            "files",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "products")
                    .insert("file", fixtures_dir().join("products.csv").to_string_lossy().to_string())
                    .insert("format", "csv")
                    .build_scalar(),
            ]),
        )
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("data"))
        .await?
        .add_command::<FileCommand>("load", &file_attrs)
        .await?;

    // Compute aggregations
    let agg_attrs = ObjectBuilder::new()
        .insert("source", "data.load.products.data")
        .insert(
            "aggregations",
            ScalarValue::Array(vec![
                ObjectBuilder::new()
                    .insert("name", "row_count")
                    .insert("op", "count")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "total_price")
                    .insert("column", "price")
                    .insert("op", "sum")
                    .build_scalar(),
                ObjectBuilder::new()
                    .insert("name", "avg_price")
                    .insert("column", "price")
                    .insert("op", "mean")
                    .build_scalar(),
            ]),
        )
        .build_hashmap();

    pipeline
        .add_namespace(NamespaceBuilder::new("stats"))
        .await?
        .add_command::<AggregateCommand>("products", &agg_attrs)
        .await?;

    // Execute pipeline
    let completed = pipeline.compile().await?.execute().await?;

    // Configure result settings
    let settings = ResultSettings::new()
        .with_output_path(output_dir.path().to_path_buf())
        .with_format(TabularFormat::Json);

    // Collect results
    let results = completed.results(settings).await?;

    // Iterate over all results
    println!("=== Result Store ({} command(s)) ===\n", results.len());

    for cmd_results in results.iter() {
        println!("Source: {}", cmd_results.source().to_dotted());

        // Print metadata
        for (path, value) in cmd_results.meta_iter() {
            println!("  [meta] {} = {}", path.to_dotted(), value);
        }

        // Print data
        for (path, value) in cmd_results.data_iter() {
            match value.as_scalar() {
                Some((ty, val)) => {
                    println!("  [data] {} = {} ({:?})", path.to_dotted(), val, ty);
                }
                None => {
                    let (file_path, fmt, rows, cols) =
                        value.as_tabular().expect("Expected tabular result");
                    println!(
                        "  [data] {} => {} ({} rows x {} cols)",
                        path.to_dotted(),
                        file_path.display(),
                        rows,
                        cols
                    );
                }
            }
        }
        println!();
    }

    Ok(())
}

Output:

=== Result Store (2 command(s)) ===

Source: data.load
  [meta] data.load.products.rows = 10
  [meta] data.load.products.columns = ["name", "price", "quantity"]
  [data] data.load.products.data => /tmp/xxx/data_load_products.json (10 rows x 3 cols)

Source: stats.products
  [meta] stats.products.status = "completed"
  [data] stats.products.row_count = 10 (Int)
  [data] stats.products.total_price = 1250.50 (Float)
  [data] stats.products.avg_price = 125.05 (Float)

ResultSettings

ResultSettings configures how results are collected and exported.

Creating Settings

#![allow(unused)]
fn main() {
// Default settings
let settings = ResultSettings::default();

// Or use the builder
let settings = ResultSettings::new();
}

Output Path

Specify where tabular data files are written:

#![allow(unused)]
fn main() {
let settings = ResultSettings::new()
    .with_output_path(PathBuf::from("/path/to/output"));
}

Default: ./panopticon_results in the current working directory.

Output Format

Choose the format for tabular data exports:

#![allow(unused)]
fn main() {
// JSON (default)
let settings = ResultSettings::new()
    .with_format(TabularFormat::Json);

// CSV
let settings = ResultSettings::new()
    .with_format(TabularFormat::Csv);

// Parquet (efficient binary format)
let settings = ResultSettings::new()
    .with_format(TabularFormat::Parquet);
}

Excluded Commands

Skip specific commands when collecting results:

#![allow(unused)]
fn main() {
let settings = ResultSettings::new()
    .with_excluded_commands(vec![
        StorePath::from_segments(["debug", "verbose_log"]),
        StorePath::from_segments(["temp", "intermediate"]),
    ]);
}

Excluded commands do not appear in the ResultStore and their tabular data is not exported.

ResultStore

The ResultStore contains all command results from the pipeline execution.

Basic Access

#![allow(unused)]
fn main() {
let results = completed.results(settings).await?;

// Number of commands with results
let count = results.len();

// Check if empty
if results.is_empty() {
    println!("No results");
}
}

Lookup by Source

Access a specific command's results by its store path:

#![allow(unused)]
fn main() {
let source = StorePath::from_segments(["namespace", "command"]);

if let Some(cmd_results) = results.get_by_source(&source) {
    // Process this command's results
}
}

Iteration

Iterate over all command results:

#![allow(unused)]
fn main() {
for cmd_results in results.iter() {
    println!("Command: {}", cmd_results.source().to_dotted());
}
}

CommandResults

Each CommandResults contains the outputs from a single command.

Source Path

The path identifying this command:

#![allow(unused)]
fn main() {
let source: &StorePath = cmd_results.source();
println!("Command at: {}", source.to_dotted());
}

Metadata Access

Metadata includes execution information like row counts, column names, and status:

#![allow(unused)]
fn main() {
// Direct lookup
let source = StorePath::from_segments(["data", "load"]);
let rows = cmd_results
    .meta_get(&source.with_segment("products").with_segment("rows"))
    .expect("Expected rows");

// Iterate all metadata
for (path, value) in cmd_results.meta_iter() {
    println!("{} = {}", path.to_dotted(), value);
}

// Get all metadata keys
for key in cmd_results.meta_keys() {
    println!("Meta key: {}", key.to_dotted());
}
}

Data Access

Data includes the actual outputs (scalar values or tabular data references):

#![allow(unused)]
fn main() {
// Direct lookup
let source = StorePath::from_segments(["stats", "products"]);
let avg = cmd_results
    .data_get(&source.with_segment("avg_price"))
    .and_then(|r| r.as_scalar());

// Iterate all data
for (path, value) in cmd_results.data_iter() {
    match value.as_scalar() {
        Some((ty, val)) => println!("Scalar: {} = {}", path.to_dotted(), val),
        None => {
            let (file, fmt, rows, cols) = value.as_tabular().unwrap();
            println!("Tabular: {} -> {}", path.to_dotted(), file.display());
        }
    }
}

// Get all data keys
for key in cmd_results.data_keys() {
    println!("Data key: {}", key.to_dotted());
}
}

ResultValue

Each data value is either scalar or tabular.

Scalar Values

#![allow(unused)]
fn main() {
if let Some((ty, value)) = result_value.as_scalar() {
    // ty: &ScalarType (Int, Float, String, Bool, etc.)
    // value: &ScalarValue
    println!("Type: {:?}, Value: {}", ty, value);
}

// Type check
if result_value.is_scalar() {
    // ...
}
}

Tabular Values

#![allow(unused)]
fn main() {
if let Some((path, format, rows, cols)) = result_value.as_tabular() {
    // path: &PathBuf - file location on disk
    // format: &TabularFormat - Csv, Json, or Parquet
    // rows: usize - row count
    // cols: usize - column count
    println!("File: {}, Format: {}, Shape: {}x{}",
             path.display(), format, rows, cols);
}

// Type check
if result_value.is_tabular() {
    // ...
}
}

Patterns for Common Tasks

Summarize All Command Outputs

Walk every command's results, skip commands whose status metadata marks them as skipped, and count scalar and tabular outputs:

#![allow(unused)]
fn main() {
println!("Pipeline Results Summary");
println!("========================");

for cmd_results in results.iter() {
    let source = cmd_results.source();
    print!("{}: ", source.to_dotted());

    // Check status
    if let Some(status) = cmd_results.meta_get(&source.with_segment("status")) {
        if status.as_str() == Some("skipped") {
            println!("SKIPPED");
            continue;
        }
    }

    // Count outputs
    let scalar_count = cmd_results.data_iter()
        .filter(|(_, v)| v.is_scalar())
        .count();
    let tabular_count = cmd_results.data_iter()
        .filter(|(_, v)| v.is_tabular())
        .count();

    println!("{} scalars, {} tables", scalar_count, tabular_count);
}
}

Export All Tables to Directory

#![allow(unused)]
fn main() {
let settings = ResultSettings::new()
    .with_output_path(export_dir.to_path_buf())
    .with_format(TabularFormat::Parquet);

let results = completed.results(settings).await?;

// List exported files
println!("Exported files:");
for entry in std::fs::read_dir(&export_dir)? {
    let entry = entry?;
    let meta = entry.metadata()?;
    println!("  {} ({} bytes)",
             entry.file_name().to_string_lossy(),
             meta.len());
}
}

Collect Scalar Metrics

#![allow(unused)]
fn main() {
use std::collections::HashMap;

let mut metrics: HashMap<String, f64> = HashMap::new();

for cmd_results in results.iter() {
    for (path, value) in cmd_results.data_iter() {
        if let Some((ScalarType::Float, scalar)) = value.as_scalar() {
            if let Some(f) = scalar.as_f64() {
                metrics.insert(path.to_dotted(), f);
            }
        }
    }
}

for (name, value) in &metrics {
    println!("{}: {:.2}", name, value);
}
}

Handle Iterative Results

For iterative namespaces, results are indexed:

#![allow(unused)]
fn main() {
let mut iteration = 0;
loop {
    let source = StorePath::from_segments(["iterative_ns", "command"])
        .with_index(iteration);

    let Some(cmd_results) = results.get_by_source(&source) else {
        break; // No more iterations
    };

    println!("Iteration {}: {}", iteration, cmd_results.source().to_dotted());

    // Process this iteration's results...

    iteration += 1;
}

println!("Total iterations: {}", iteration);
}

Best Practices

Choose the Right Format

  • JSON: Human-readable, good for debugging and small datasets
  • CSV: Widely compatible, good for sharing with other tools
  • Parquet: Efficient storage, good for large datasets and further processing

Clean Up Output Directories

#![allow(unused)]
fn main() {
// Use tempdir for automatic cleanup
let output_dir = tempfile::tempdir()?;
let settings = ResultSettings::new()
    .with_output_path(output_dir.path().to_path_buf());

// output_dir is deleted when it goes out of scope
}

Handle Missing Results Gracefully

#![allow(unused)]
fn main() {
let source = StorePath::from_segments(["maybe", "exists"]);

match results.get_by_source(&source) {
    Some(cmd_results) => {
        // Process results
    }
    None => {
        println!("Command {} not in results (possibly excluded or skipped)",
                 source.to_dotted());
    }
}
}

Use Type-Safe Path Construction

Build paths systematically to avoid typos:

#![allow(unused)]
fn main() {
// Define base paths once
let stats_base = StorePath::from_segments(["stats", "products"]);

// Build specific paths from the base
let row_count = stats_base.with_segment("row_count");
let avg_price = stats_base.with_segment("avg_price");
let total = stats_base.with_segment("total_price");
}

Services & IO

Panopticon pipelines can interact with the outside world through services. The PipelineServices struct bundles two categories of functionality:

  • PipelineIO - Send notifications and prompt for user input
  • EventHooks - React to pipeline lifecycle events (compilation, execution, completion)

Services are optional. A pipeline without services runs silently, which is often what you want for batch processing or automated workflows. When you need feedback or interactivity, services provide a clean abstraction.

Attaching Services to a Pipeline

Use Pipeline::with_services() to attach a PipelineServices instance:

#![allow(unused)]
fn main() {
use panopticon_core::prelude::*;

let services = PipelineServices::defaults();
let mut pipeline = Pipeline::with_services(services);
}

Using PipelineIO

The PipelineIO trait provides two methods for interacting with users:

  • notify(message) - Display a message (fire-and-forget)
  • prompt(message) - Display a message and wait for a response

Commands access these through the ExecutionContext:

#![allow(unused)]
fn main() {
// Inside a command's execute method
ctx.services().notify("Processing started...").await?;

if let Some(answer) = ctx.services().prompt("Continue? (y/n)").await? {
    if answer.to_lowercase() == "y" {
        // proceed
    }
}
}

Multiple IO services can be registered. When you call notify, all registered services receive the message. When you call prompt, services are tried in order until one returns a response.

Event Hooks

Event hooks let you observe pipeline lifecycle events without modifying command logic. Hooks fire at key moments:

  • Draft phase: After namespaces/commands are added, before/after compilation
  • Ready phase: Before/after pipeline, namespace, and command execution
  • Completed phase: When results are being collected

This is useful for logging, metrics, progress reporting, or custom debugging.

Default Services

PipelineServices::defaults() behaves differently based on build mode:

  • Debug (cfg(debug_assertions)): IO service StdoutInteraction, event hooks DebugEventHooks
  • Release: no IO service, no event hooks

In debug builds, you get console output and lifecycle logging out of the box. In release builds, services start empty for maximum control.

To start with no services regardless of build mode:

#![allow(unused)]
fn main() {
let services = PipelineServices::new();
}

Implementing Custom Services

This guide covers using services. For implementing your own PipelineIO or EventHooks:

See the Extend guide: Implementing Services

The Extend guide covers:

  • Implementing the PipelineIO trait for custom notification channels
  • Implementing EventHooks for lifecycle callbacks
  • Available hook events and their payloads
  • Registering multiple services with add_io() and add_hook()