Data Transformers

This guide describes how to manipulate the fields in a DataSource, calculate a windowed average, apply a single-pole low-pass filter, cache min/max values, and implement a conditional trigger.

If you don't know what a DataSource is, read this guide first.

DataTransformers and DataFlow

To transform inbound data, as is typically needed when plotting signals, we use a streaming computation model.

Because the transformation is expressed as a computation on streams, it can run over both static historical data and future incoming data.

If a change to historical data invalidates data in the dependency tree, the query automatically recalculates.

A DataTransformer provides a queryable form of a DataFlow: a program built from a tree of operators that modify the query on its way to the DataSource and transform the returned Events on their way back.

The DataFlow executor can interleave multiple streams of data by timestamp, perform stateful computation, and iteratively update the query as new data arrives.

Anything from simple running averages to complex filtering behaviour is possible with this computation model.
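To make the "query down, Events up" idea concrete, here is a toy model in plain TypeScript. It does not use @electricui/dataflow: ToyQuery, ToyEvent, toySource, toyPrepare and toyMap are hypothetical stand-ins, and the real DataFlow operators work on streams (including future data) rather than returning arrays. The sketch only captures the direction in which queries and Events travel through a tree of operators.

```ts
// Hypothetical, simplified stand-ins for Query and Event (not the library's types)
type ToyQuery = { start: number; end: number }
type ToyEvent<T> = { time: number; data: T }

// A "queryable" in this model is a function from a query to the matching events
type ToyQueryable<T> = (query: ToyQuery) => ToyEvent<T>[]

// Leaf of the tree: answers queries from an in-memory array
function toySource<T>(events: ToyEvent<T>[]): ToyQueryable<T> {
  return query => events.filter(e => e.time >= query.start && e.time <= query.end)
}

// Modifies the query on its way down to the upstream queryable
function toyPrepare<T>(
  upstream: ToyQueryable<T>,
  transform: (query: ToyQuery) => ToyQuery,
): ToyQueryable<T> {
  return query => upstream(transform(query))
}

// Transforms each event on its way back up
function toyMap<T, U>(
  upstream: ToyQueryable<T>,
  fn: (event: ToyEvent<T>) => ToyEvent<U>,
): ToyQueryable<U> {
  return query => upstream(query).map(fn)
}

const source = toySource([
  { time: 0, data: 1 },
  { time: 10, data: 2 },
  { time: 20, data: 3 },
])

// Two-operator tree: widen the query 10 time units into the past, then double the values
const flow = toyMap(
  toyPrepare(source, q => ({ start: q.start - 10, end: q.end })),
  e => ({ time: e.time, data: e.data * 2 }),
)

// Events at times 0, 10 and 20 with data 2, 4 and 6
console.log(flow({ start: 10, end: 20 }))
```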

The simplest DataTransformer is a passthrough:

```ts
import { MessageDataSource, Query } from '@electricui/core-timeseries'
import { DataTransformer } from '@electricui/dataflow'

const dataSource = new MessageDataSource('speed')

const dataTransformer = new DataTransformer((query: Query) => {
  return dataSource
})
```

Operators

The dataflow package provides several operators. They can be composed and combined to build your own reusable DataFlows. The examples in this section use the following imports:

```ts
import { filter, map, prepare, repeat, interleave, forEach, iterateEmit } from '@electricui/dataflow'
import { MessageDataSource, Query, Queryable, Event } from '@electricui/core-timeseries'
import { DataTransformer } from '@electricui/dataflow'
```

filter

The filter operator passes through only the Events for which the supplied predicate returns true.

The example below only lets through events whose value is even.

```ts
const dataSource = new MessageDataSource<number>('speed')

const dataTransformer = new DataTransformer((query: Query) => {
  const evenEvents = filter(dataSource, event => event.data % 2 === 0)

  return evenEvents
})
```

map

The map operator transforms events in a 1:1 fashion, producing exactly one output Event for each input Event.

The example below converts temperature measurements from Kelvin to Celsius.

Note that this usage does not mutate the original Event. Allocating a new Event is less performant than mutating the event data in place and returning it.

However, this form is easier to reason about: if the Events are reused elsewhere, they won't be accidentally mutated.
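The aliasing risk can be shown with plain objects. This sketch uses a hypothetical PlainEvent type rather than the library's Event class; the point is only that an in-place mutation is visible through every reference to the same object, while allocating a new object leaves existing holders untouched.

```ts
type PlainEvent = { time: number; data: number }

const original: PlainEvent = { time: 0, data: 300 }
// Another part of the program keeps a reference to the same object
const sharedReference = original

// map-style: allocate a new event, leaving the original untouched
const converted: PlainEvent = { time: original.time, data: original.data - 273.15 }

// forEach-style: mutate in place, which every reference to the object will see
function mutateToCelsius(event: PlainEvent): void {
  event.data -= 273.15
}

console.log(sharedReference.data) // still 300 after the allocating conversion
mutateToCelsius(original)
console.log(sharedReference.data) // now changed, because it is the same object
```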

```ts
const dataSource = new MessageDataSource<number>('temperature')

const dataTransformer = new DataTransformer((query: Query) => {
  // Allocate a new Event; the original Kelvin event is left untouched
  const celsiusEvents = map(dataSource, event => {
    return new Event(event.time, event.data - 273.15)
  })

  return celsiusEvents
})
```

forEach

The forEach operator executes a function for every Event received. By default it consumes the Event, so it is not passed on downstream; this can optionally be disabled.

This operator can be used for the highest-performance, imperative mutation of Events: because the event data is mutated directly, no additional allocations are required.

The example below converts temperature measurements from Kelvin to Celsius by mutating the events, and disables consumption so the converted events continue downstream.

```ts
const dataSource = new MessageDataSource<number>('temperature')

const dataTransformer = new DataTransformer((query: Query) => {
  const celsiusEvents = forEach(
    dataSource,
    event => {
      // Convert in place, with no new allocations
      event.data -= 273.15
    },
    false, // don't consume: pass the mutated Events downstream
  )

  return celsiusEvents
})
```
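A rough array-based model of the consume behaviour described above, assuming (as the example suggests) that the third argument controls consumption. forEachArray is a hypothetical helper written for illustration, not the library's implementation.

```ts
type PlainEvent = { time: number; data: number }

// Hypothetical model: run fn on every event. If consume is true (the default),
// nothing continues downstream; otherwise the (possibly mutated) events are passed on.
function forEachArray(
  events: PlainEvent[],
  fn: (event: PlainEvent) => void,
  consume = true,
): PlainEvent[] {
  const passedOn: PlainEvent[] = []
  for (const event of events) {
    fn(event)
    if (!consume) passedOn.push(event)
  }
  return passedOn
}

// Fresh readings each call, so the two runs below don't share objects
const readings = (): PlainEvent[] => [
  { time: 0, data: 273.15 },
  { time: 1, data: 373.15 },
]

// Consumed (default): the callback still runs, but no events flow on
let seen = 0
const consumed = forEachArray(readings(), () => {
  seen += 1
})

// Not consumed: events are converted in place and passed on
const celsius = forEachArray(
  readings(),
  event => {
    event.data -= 273.15
  },
  false,
)
```

This is why, in the colorMixer example later in this guide, a forEach with the default behaviour can update state from one stream without its Events appearing in the output.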

iterateEmit

The iterateEmit operator executes a function for every Event received, passing it an emit function that can be called to send Events on through the stream.

It can be thought of as a more imperative form of the filter operator: the callback decides which Events, if any, to emit.

The following example drops the first event of a stream and emits all subsequent events.

```ts
const dataSource = new MessageDataSource<number>('temperature')

const dataTransformer = new DataTransformer((query: Query) => {
  let seenFirst = false

  const notFirst = iterateEmit(
    dataSource,
    (event: Event, emit: (event: Event) => void) => {
      // Drop the first event without emitting it
      if (!seenFirst) {
        seenFirst = true
        return
      }

      // Pass every subsequent event through unchanged
      emit(event)
    },
  )

  return notFirst
})
```
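To see how iterateEmit generalises filter, here is an array-based model in which filter is built on top of an emit-style helper. iterateEmitArray and filterArray are hypothetical names, and the model assumes, as the description above suggests, that emit may be called any number of times per input; the library operators work on streams rather than arrays.

```ts
type PlainEvent = { time: number; data: number }

// Hypothetical model: the callback may emit zero, one or many events per input
function iterateEmitArray(
  events: PlainEvent[],
  fn: (event: PlainEvent, emit: (event: PlainEvent) => void) => void,
): PlainEvent[] {
  const out: PlainEvent[] = []
  const emit = (event: PlainEvent) => {
    out.push(event)
  }
  for (const event of events) fn(event, emit)
  return out
}

// filter is the special case that emits the input event when the predicate holds
function filterArray(
  events: PlainEvent[],
  predicate: (event: PlainEvent) => boolean,
): PlainEvent[] {
  return iterateEmitArray(events, (event, emit) => {
    if (predicate(event)) emit(event)
  })
}

const input = [1, 2, 3, 4].map((data, time) => ({ time, data }))
const evens = filterArray(input, e => e.data % 2 === 0)

// Something a predicate alone cannot express: two outputs per input
const duplicated = iterateEmitArray(input, (event, emit) => {
  emit(event)
  emit({ time: event.time, data: event.data * 10 })
})
```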

prepare

The prepare operator transforms the query before it reaches the DataSource.

The example below extends the query range by 100 ms into the past.

```ts
const dataSource = new MessageDataSource<number>('temperature')

const dataTransformer = new DataTransformer((query: Query) => {
  const moreData = prepare(dataSource, queryToMutate => {
    return queryToMutate.start(queryToMutate.getStart() - 100)
  })

  return moreData
})
```

repeat

The repeat operator repeatedly calls a closure and runs the query against the DataFlow it returns, stopping once the closure returns null. This can be used to create state machines and other complex logic.

The example below implements a simple state machine that queries two time ranges of the DataSource in sequence.

```ts
const dataSource = new MessageDataSource<number>('temperature')

enum STATE {
  START = 0,
  MIDDLE = 1,
  END = 2,
}

const dataTransformer = new DataTransformer((query: Query) => {
  let state = STATE.START

  const stateMachine = repeat(() => {
    switch (state) {
      case STATE.START:
        state = STATE.MIDDLE // next repeat cycle, the state will be MIDDLE
        return prepare(dataSource, query => query.start(0).end(1))

      case STATE.MIDDLE:
        state = STATE.END // next repeat cycle, the state will be END
        return prepare(dataSource, query => query.start(2).end(3))

      case STATE.END:
        // The computation has completed
        return null
    }
  })

  return stateMachine
})
```
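The repeat semantics can be modelled in a few lines: call the closure, run the query against the flow it returns, and stop at null. In this hypothetical sketch, repeatArray and the array-returning flows are simplifications of the library's streams, and a step counter stands in for the STATE enum.

```ts
type PlainEvent = { time: number; data: number }
type ToyQuery = { start: number; end: number }
type ToyFlow = (query: ToyQuery) => PlainEvent[]

// Hypothetical model of repeat: keep asking the closure for a flow until it returns null
function repeatArray(next: () => ToyFlow | null, query: ToyQuery): PlainEvent[] {
  const out: PlainEvent[] = []
  for (let flow = next(); flow !== null; flow = next()) {
    out.push(...flow(query))
  }
  return out
}

const history = [0, 1, 2, 3, 4].map(time => ({ time, data: time * 100 }))

// Fixed-range flow that ignores the incoming range, like the prepare calls above
const rangeFlow = (start: number, end: number): ToyFlow => () =>
  history.filter(e => e.time >= start && e.time <= end)

let step = 0
const result = repeatArray(() => {
  step += 1
  if (step === 1) return rangeFlow(0, 1)
  if (step === 2) return rangeFlow(2, 3)
  return null // the computation has completed
}, { start: 0, end: 4 })
```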

interleave

The interleave operator takes multiple input streams and returns a single stream, sorted by time, that contains the events from every input.

The example below naively fans in two temperature sources into the same stream.

```ts
const dataSourceA = new MessageDataSource<number>('temperatureA')
const dataSourceB = new MessageDataSource<number>('temperatureB')

const dataTransformer = new DataTransformer((query: Query) => {
  const interleaved = interleave([dataSourceA, dataSourceB])

  return interleaved
})
```
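Conceptually, interleaving time-sorted streams is a k-way merge by timestamp. The sketch below merges already-sorted arrays; interleaveArrays is a hypothetical name, ties are resolved by input order purely as a choice of this sketch, and it is not the library's implementation.

```ts
type PlainEvent<T> = { time: number; data: T }

// Merge several time-sorted arrays into one time-sorted array.
// On equal timestamps, the input listed first wins.
function interleaveArrays<T>(inputs: PlainEvent<T>[][]): PlainEvent<T>[] {
  const cursors = inputs.map(() => 0)
  const out: PlainEvent<T>[] = []

  while (true) {
    // Find the input whose next event is earliest
    let best = -1
    for (let i = 0; i < inputs.length; i++) {
      if (cursors[i] >= inputs[i].length) continue
      const candidate = inputs[i][cursors[i]]
      if (best === -1 || candidate.time < inputs[best][cursors[best]].time) best = i
    }
    if (best === -1) return out // every input is exhausted
    out.push(inputs[best][cursors[best]])
    cursors[best] += 1
  }
}

const temperatureA = [{ time: 0, data: 20 }, { time: 20, data: 21 }]
const temperatureB = [{ time: 10, data: 30 }, { time: 30, data: 31 }]
const merged = interleaveArrays([temperatureA, temperatureB])
```

The linear scan over inputs is fine for a handful of streams; with many inputs, a priority queue keyed on timestamp would be the usual choice.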

More complex examples

Color Mixer

Imagine a rocket that has its own internal state machine and a set of XYZ coordinates. The goal is to graph the XYZ coordinates as it flies, with the line color-coded by the rocket's current state.

Instead of a bare DataTransformer, this example creates a reusable DataFlow operator built from other operators, which is then passed to a DataTransformer.

```ts
// @filename: colorMixer.ts
import { forEach, map, interleave } from '@electricui/dataflow'
import { Event, Queryable } from '@electricui/core-timeseries'

export type XYZEvent = {
  x: number
  y: number
  z: number
}

export type ColorEvent = string // The colour is simply a string

// A position with its color attached
export type MixedEvent = XYZEvent & { color: ColorEvent }

export function colorMixer(
  colorQueryable: Queryable<ColorEvent>,
  xyzQueryable: Queryable<XYZEvent>,
  defaultColor: string,
) {
  // The starting state is the default color
  let currentColorState = defaultColor

  // For every color change event (driven by our hardware state changes later on)
  // set the current color
  const colorSetter = forEach(colorQueryable, event => {
    currentColorState = event.data
  })

  // For every position update, create a new Event that contains the color information
  // of the last received color update as well
  const colorXYZ = map(xyzQueryable, event => {
    return new Event(event.time, {
      x: event.data.x,
      y: event.data.y,
      z: event.data.z,
      color: currentColorState,
    })
  })

  // Interleave the two streams
  return interleave<MixedEvent>([colorXYZ, colorSetter])
}
```

The colorMixer interleaves the two time-sorted streams, so that both historical and future updates are applied in the correct order.
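The result of colorMixer is effectively an "as-of" join: each position is tagged with the most recent color at or before its timestamp. This array-based sketch reproduces that result outside the library. tagWithLatestColor is a hypothetical name, and it assumes both inputs are sorted by time and that a color change sharing a timestamp with a position is applied first.

```ts
type XYZ = { x: number; y: number; z: number }
type Timed<T> = { time: number; data: T }
type Mixed = XYZ & { color: string }

// Tag each position with the latest color at or before its timestamp.
// Both inputs must already be sorted by time.
function tagWithLatestColor(
  colors: Timed<string>[],
  positions: Timed<XYZ>[],
  defaultColor: string,
): Timed<Mixed>[] {
  let current = defaultColor
  let c = 0
  return positions.map(p => {
    // Apply every color change up to and including this position's timestamp
    while (c < colors.length && colors[c].time <= p.time) {
      current = colors[c].data
      c += 1
    }
    return { time: p.time, data: { ...p.data, color: current } }
  })
}

const colors = [{ time: 5, data: 'red' }, { time: 15, data: 'blue' }]
const positions = [0, 10, 20].map(time => ({ time, data: { x: time, y: 0, z: 0 } }))
const tagged = tagWithLatestColor(colors, positions, 'black')
```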

For the rocket, the hardware state is mapped to a color, and the resulting stream is passed to the colorMixer operator.

```ts
// @filename: page.tsx
import { colorMixer, XYZEvent, ColorEvent, MixedEvent } from './colorMixer'
import { DataTransformer, map } from '@electricui/dataflow'
import { MessageDataSource, Event, Query } from '@electricui/core-timeseries'

enum HARDWARE_STATES {
  IDLE = 0,
  FLYING = 1,
  LANDING = 2,
  LANDED = 3,
}

const hardwareState = new MessageDataSource<HARDWARE_STATES>('state')
const hardwarePosition = new MessageDataSource<XYZEvent>('xyz')

const dataTransformer = new DataTransformer((query: Query) => {
  const stateToColor = map(hardwareState, event => {
    switch (event.data) {
      case HARDWARE_STATES.IDLE:
        return new Event(event.time, 'grey')
      case HARDWARE_STATES.FLYING:
        return new Event(event.time, 'red')
      case HARDWARE_STATES.LANDING:
        return new Event(event.time, 'blue')
      case HARDWARE_STATES.LANDED:
        return new Event(event.time, 'green')

      default:
        return new Event(event.time, 'black')
    }
  })

  const interleaved = colorMixer(stateToColor, hardwarePosition, 'black')

  return interleaved
})
```
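As an alternative to the switch, the state-to-color mapping can live in a lookup table, which keeps all the colors in one place and lets the type checker flag a missing state. This is a plain TypeScript sketch: the HardwareState constant object and the colorForState helper are hypothetical replacements for the enum and switch, with the same 'black' fallback.

```ts
// Hypothetical const-object replacement for the HARDWARE_STATES enum
const HardwareState = {
  IDLE: 0,
  FLYING: 1,
  LANDING: 2,
  LANDED: 3,
} as const

type HardwareState = (typeof HardwareState)[keyof typeof HardwareState]

// One entry per state; omitting a state is a type error
const STATE_COLORS: Record<HardwareState, string> = {
  [HardwareState.IDLE]: 'grey',
  [HardwareState.FLYING]: 'red',
  [HardwareState.LANDING]: 'blue',
  [HardwareState.LANDED]: 'green',
}

// Unknown values fall back to black, like the switch's default branch
function colorForState(state: number): string {
  return STATE_COLORS[state as HardwareState] ?? 'black'
}
```

Inside the DataTransformer, the map callback would then reduce to returning new Event(event.time, colorForState(event.data)).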

Running Average

This example demonstrates a simple running average. Every time a new value arrives, the average of the last 10 items (inclusive of the newest value) is calculated and emitted.

When querying historical data, it prefetches up to 10 items (the buffer size) from before the query start to pre-fill the buffer. This means the first averages inside the query range are computed over a full window, provided enough history exists, rather than over a partially filled buffer.

Again, it is created as a reusable operator, and a DataTransformer later consumes it together with the MessageDataSource.

```ts
// @filename: rollingAverage.ts
import {
  prepare,
  map,
  interleave,
  DataTransformer,
} from '@electricui/dataflow'
import {
  FixedQueue,
  MessageDataSource,
  Queryable,
  Query,
  ORDERING,
  PERSISTENCE,
  Event,
} from '@electricui/core-timeseries'

export function rollingAverage(
  dataSource: Queryable<number>,
  bufferSize: number,
) {
  // Allocate our circular buffer
  const buffer = new FixedQueue<number>(bufferSize)

  // Prefetch up to `bufferSize` before the start of the query
  const prefetch = prepare(dataSource, query => {
    return new Query()
      .limit(bufferSize) // Fetch up to `bufferSize` events
      .end(query.getStart()) // before the start of the current query
      .order(ORDERING.DESC) // ordered latest to earliest, so the _last_ events are fetched
      .persist(PERSISTENCE.IMMEDIATE) // fetching only events that are immediately available
  })

  // The resultant batch will be ordered back-to-front, but the interleave operator
  // will automatically re-order the incoming events intelligently.
  // The prefetched events and the 'current' events will be combined into the same stream.
  const allEvents = interleave([prefetch, dataSource])

  // The actual averaging occurs here:
  const averageEvents = map(allEvents, event => {
    // Add the event to the queue
    buffer.push(event.data)

    // Calculate the average over the queue
    let total = 0
    buffer.forEach(data => {
      total += data
    })
    const average = total / buffer.length

    // Return the average event
    return new Event(event.time, average)
  })

  // Return the averaged event stream
  return averageEvents
}
```
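The averaging loop above sums the whole buffer for every event, which costs O(bufferSize) per event. For large windows, a running total gives O(1) updates. This self-contained sketch uses a hypothetical RollingMean class in place of FixedQueue plus the summing loop; it is not part of @electricui/core-timeseries. The usage lines also illustrate why prefetching matters: without earlier values, the first outputs average over a partially filled window.

```ts
// Hypothetical O(1) rolling mean over the most recent `size` values
class RollingMean {
  private readonly values: number[]
  private readonly size: number
  private next = 0 // slot the next value will be written to
  private count = 0 // number of slots currently filled
  private total = 0 // running sum of the filled slots

  constructor(size: number) {
    this.size = size
    this.values = new Array<number>(size).fill(0)
  }

  // Add a value and return the mean of the most recent `size` values
  push(value: number): number {
    if (this.count === this.size) {
      this.total -= this.values[this.next] // drop the oldest value from the total
    } else {
      this.count += 1
    }
    this.values[this.next] = value
    this.total += value
    this.next = (this.next + 1) % this.size
    return this.total / this.count
  }
}

// Prefilled with earlier history: the first in-range output uses a full window
const prefilled = new RollingMean(3)
for (const v of [10, 10]) prefilled.push(v) // values from before the query start
const withPrefetch = prefilled.push(40) // mean of 10, 10 and 40

// Without prefetch: the first output is the mean of a single value
const cold = new RollingMean(3)
const withoutPrefetch = cold.push(40)
```

One trade-off: with floating-point data, a running total can accumulate rounding error over very long streams, which re-summing the buffer on every event avoids.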

In use:

```ts
// @filename: page.ts
import { rollingAverage } from './rollingAverage'
import { DataTransformer } from '@electricui/dataflow'
import { MessageDataSource } from '@electricui/core-timeseries'

const sensorDataSource = new MessageDataSource<number>('adc')

const averagedSensorDataSource = new DataTransformer(() => {
  // Average the last 10 events
  return rollingAverage(sensorDataSource, 10)
})
```

Leaky Integrator

This example demonstrates a Leaky Integrator implementation. This style of filter is a commonly used first-order low-pass filter with low computational overhead.


One benefit of this approach is simple control over the filtering strength. The smoothing factor α (smoothingFactor in the code) ranges from 0 (no filtering: the raw signal passes through) to 1 (fully filtered: the output holds its first value and no longer follows the input).
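Written as a recurrence, with x[n] the incoming sample, y[n] the filter output and α the smoothing factor, a leaky integrator with the behaviour described above is:

```latex
y[n] = \alpha \, y[n-1] + (1 - \alpha) \, x[n]
     = y[n-1] + (1 - \alpha) \bigl( x[n] - y[n-1] \bigr)
```

With α = 0 this reduces to y[n] = x[n] (the raw signal), and with α = 1 it reduces to y[n] = y[n-1], so the output never moves from its starting value.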

```ts
// @filename: leakyIntegrator.ts
import { iterateEmit } from '@electricui/dataflow'
import { Queryable, Event } from '@electricui/core-timeseries'

export function leakyIntegrator(
  dataSource: Queryable<number>,
  smoothingFactor: number, // α: 0 = no filtering, 1 = output frozen at the first value
) {
  // Allocate the state for our last value
  let lastValue: number | null = null

  const filteredEvents = iterateEmit(
    dataSource,
    (event: Event, emit: (event: Event) => void) => {
      // Seed the state with the first event without emitting it
      if (lastValue === null) {
        lastValue = event.data
        return
      }

      // For subsequent events, move a fraction (1 - α) of the way towards the new value
      lastValue = lastValue + (1 - smoothingFactor) * (event.data - lastValue)

      // Emit the filtered value
      emit(new Event(event.time, lastValue))
    },
  )

  // Return the event stream
  return filteredEvents
}
```
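The recurrence is easy to check outside a DataFlow. This plain-array sketch (leakyIntegrate is a hypothetical helper, not a library function) applies the update y[n] = α·y[n-1] + (1 - α)·x[n], matching the description of α above. Like the operator, it seeds its state with the first sample and emits from the second sample onwards, and the usage lines show both extremes of α alongside an intermediate value.

```ts
// Apply y[n] = y[n-1] + (1 - alpha) * (x[n] - y[n-1]) to an array of samples.
// The first sample only seeds the state and is not emitted.
function leakyIntegrate(samples: number[], alpha: number): number[] {
  const out: number[] = []
  let last: number | null = null
  for (const x of samples) {
    if (last === null) {
      last = x
      continue
    }
    last = last + (1 - alpha) * (x - last)
    out.push(last)
  }
  return out
}

const stepInput = [0, 1, 1, 1, 1]
const raw = leakyIntegrate(stepInput, 0) // alpha = 0: output follows the input
const frozen = leakyIntegrate(stepInput, 1) // alpha = 1: output stays at the first sample
const smoothed = leakyIntegrate(stepInput, 0.25) // 0.75, 0.9375, 0.984375, 0.99609375
```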

In use:

```ts
// @filename: page.ts
import { leakyIntegrator } from './leakyIntegrator'
import { DataTransformer } from '@electricui/dataflow'
import { MessageDataSource } from '@electricui/core-timeseries'

const sensorDataSource = new MessageDataSource<number>('adc')

const filteredSensorDataSource = new DataTransformer(() => {
  // Low-pass filter the ADC readings with a smoothing factor of 0.25
  return leakyIntegrator(sensorDataSource, 0.25)
})
```