Data Transformers

This guide describes how to manipulate the fields in a DataSource, how to calculate a windowed average, apply a single-pole lowpass filter, cache min/max values, and implement a conditional trigger.

If you don't know what a DataSource is, read this guide first:

DataTransformers and DataFlow

For performing transformations on inbound data, as typically needed when plotting signals, we use a streaming data computation model.

By expressing the transformation as a computation on streams, the transformation can be run across both historical, static data, and future incoming data.

If data in the dependency tree is invalidated by changing historical data, the query will automatically re-calculate.

A DataTransformer provides a queryable form of a DataFlow, a program built with a tree of operators that modify the query on the way to the DataSource, and transform the Event returned on the way back.

The DataFlow executor can interleave multiple streams of data by their timestamp, perform stateful computation, and iteratively update the query as new data is fed in.

Anything from simple running averages, to complex filtering behaviour is possible with this computation system. Composed DataFlows can maintain internal state by capturing variables in their closure.

Simple Usage

A common use-case is running some simple maths on each value, we can do that with the map operator.

We'll also handle values which aren't realistic by chaining a filter operation.

import { useMessageDataSource } from '@electricui/core-timeseries'
import { useDataTransformer } from '@electricui/timeseries-react'
import { map, filter } from '@electricui/dataflow'
 
const ChartPage = () => {
const sensorDS = useMessageDataSource('tempK')
 
const transformedDS = useDataTransformer(() => {
const celsiusEvents = map(sensorDS, (data, time) => {
return data + 273.15
})
const validEvents = filter(celsiusEvents, (data, time) => {
if (data >= -40 && data <= 500 ) {
return true
}
return false
})
 
return validEvents
})
 
return (
<ChartContainer>
<LineChart dataSource={transformedDS} />
<RealTimeDomain window={10000} />
</ChartContainer>
)
}

It is important to wrap operators in a DataTransformer instead of querying them directly, as that can result in stale data across queries and hard to diagnose bugs.

For the full set of operations available, the API docs explain each option with code snippets and examples.

Custom Transformations

In this example, a DataSource provides XYZ position information over time, and emits global state change information in the form of a color.

The colorMixer DataFlow combines XYZ positions with the latest state color.

import { ChartContainer, LineChart } from '@electricui/components-desktop-charts'
import { useMessageDataSource, Query, Queryable } from '@electricui/core-timeseries'
import { useDataTransformer } from '@electricui/timeseries-react'
import { forEach, map, interleave } from '@electricui/dataflow'
 
type XYZEvent = {
x: number
y: number
z: number
}
type ColorEvent = {
color: string
}
type MixedEvent = XYZEvent & ColorEvent
 
function colorMixer(
colorQueryable: Queryable<ColorEvent>,
xyzQueryable: Queryable<XYZEvent>,
) {
let currentColorState = 'blue'
 
const colorSetter = forEach(colorQueryable, (data, time) => {
currentColorState = data.color
})
 
const colorXYZ = map(xyzQueryable, (data, time) => ({
x: data.x,
y: data.y,
z: data.z,
color: currentColorState,
}))
 
return interleave([colorXYZ, colorSetter])
}
 
export const ColorMixingChart = () => {
const stateColDataSource = useMessageDataSource<ColorEvent>('state_col')
const posDataSource = useMessageDataSource<XYZEvent>('pos')
 
const dataTransformer = useDataTransformer(() => {
return colorMixer(stateColDataSource, posDataSource)
})
 
return (
<ChartContainer>
<LineChart
dataSource={dataTransformer}
accessor={data => data.z}
lineWidth={4}
colorAccessor={(data, time) => data.color}
/>
</ChartContainer>
)
}

Events are passed through the DataFlow system in temporal order. If any incoming event invalidates the sorting guarantees, the query is re-run to correct for the missing data.

┌─────────┐
│ │ ┌─────┐ ┌───┐
│ Color ├───►Green├────────────────────────►Red│
│ │ └──┬──┘ └─┬─┘
└─────────┘ │ │
│ │
│ │
┌──▼──┐ ┌─────┐ ┌─────┐ │ ┌─────┐
│Green│ │Green│ │Green│ │ │ Red │
│ 1 ├─────► 2 ├─────► 3 ├──┴──► 4 │
└──▲──┘ └──▲──┘ └──▲──┘ └──▲──┘
│ │ │ │
│ │ │ │
┌─────────┐ │ │ │ │
│ │ ┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐
│ XYZ ├─────►1├─────────►2├─────────►3├─────────►4│
│ │ └─┘ └─┘ └─┘ └─┘
└─────────┘

The DataTransformer creates a closure per query so that state can be maintained during a query's lifetime.

More complex examples

Color Mixer

Imagine a rocket that has its own internal state machine, and a set of XYZ coordinates. The goal is to graph the XYZ coordinates as it flies, with the line's color coded to the current state of the rocket.

Screenshot of component RocketUI colormix

Instead of a bare DataTransformer, this example will create a reusable DataFlow operator combining other operators, and that will be passed to a DataTransformer.

Running Average

This example demonstrates a simple running average. Every time a new value arrives, the average of the last 10 items (inclusive of the newest value) is calculated and emitted.

When querying historical data, it fetches up to 10 items before the query start to pre-fill the buffer. This prefetching allows for a defined level of accuracy when querying historical data.

Again it is created as a reusable operator, then a DataTransformer is later used to consume it and the MessageDataSource

Screenshot of component ChartTransformer rollingaverage
// @filename: rollingAverage.ts
import {
prepare,
map,
interleave,
} from '@electricui/dataflow'
import {
FixedQueue,
Queryable,
Query,
ORDERING,
PERSISTENCE,
Event
} from '@electricui/core-timeseries'
 
export function rollingAverage(
dataSource: Queryable<number>,
bufferSize: number,
) {
// Allocate our circular buffer
const buffer = new FixedQueue<number>(bufferSize)
 
// Prefetch up to `bufferSize` before the start of the query
const prefetch = prepare(dataSource, query => {
return query.clone()
.limit(bufferSize) // Fetch up to `bufferSize` events
.end(query.getStart()) // before the start of the current query
.order(ORDERING.DESC) // ordered latest to earliest, so the _last_ events are fetched
.persist(PERSISTENCE.IMMEDIATE) // fetching only events that are immediately available
})
 
// The resultant batch will be ordered back-to-front, but the interleave operator
// will automatically re-order the incoming events intelligently.
// The prefetched events and the 'current' events will be combined into the same stream.
const allEvents = interleave([prefetch, dataSource])
 
// The actual averaging occurs here:
const averageEvents = map(allEvents, (data, time) => {
// Add the event to the queue
buffer.push(data)
 
// Calculate the average over the queue
let total = 0
buffer.forEach(val => {
total += val
})
const average = total / buffer.length
 
// Return the average event
return average
})
 
// Return the averaged event stream
return averageEvents
}

In use:

const OverviewPage = () => {
const sensorDataSource = useMessageDataSource('adc')
 
const averagedSensorDataSource = useDataTransformer(() => {
// Average the last 10 events
return rollingAverage(sensorDataSource, 10)
})
 
return (
<ChartContainer>
<LineChart dataSource={averagedSensorDataSource} />
<RealTimeDomain window={10000} />
</ChartContainer>
)
}

Leaky Integrator

This example demonstrates a Leaky Integrator implementation. This style of filter is a commonly used first-order low-pass filter which has low computational overhead.

Screenshot of component ChartTransformer leakyintegrator

One benefit of this approach is the simple control over the filtering strength. α is the smoothing coefficient between 0 (no filtering = raw signal) and 1 (fully filtered = no output signal).

Twoslash failure

Errors were thrown in the sample, but not included in an errors tag

These errors were not marked as being expected: 2322 2362.
Expected: // @errors: 2322 2362

Compiler Errors:

leakyIntegrator.ts
[2322] 14 : Type 'QueryableInnerOutput>' is not assignable to type 'number | null'.
[2362] 22 : The left-hand side of an arithmetic operation must be of type 'any', 'number', 'bigint' or an enum type.
[2322] 28 : Type 'Queryable, number, {}>, MappedOutputTags, {}>, {}>' is not assignable to type 'Queryable'.

Raising Code:

## Code

'''ts
0 // @filename: leakyIntegrator.ts
1 import { filtermap } from '@electricui/dataflow'
2 import { Queryable } from '@electricui/core-timeseries'
3 
4 export function leakyIntegrator<T extends number>(
5   queryable: Queryable<T>,
6   smoothingFactor: number,
7 ): Queryable<number> {
8   let lastValue: number | null = null
9 
10   const processed = filtermap(
11     queryable,
12     (data, time) => {
13       // First run, just set the last data and exit
14       if (lastValue === null) {
15         lastValue = data
16         return false
17       }
18 
19       return true
20     },
21     (data, time) => {
22       const thisValue = data
23 
24       const newData = (thisValue - lastValue!) / smoothingFactor + lastValue!
25 
26       lastValue = newData
27 
28       return newData
29     },
30   )
31 
32   return processed
33 }
'''

To use it, import the custom transformer and use it like normal:

Twoslash failure

Errors were thrown in the sample, but not included in an errors tag

These errors were not marked as being expected: 2322 2362.
Expected: // @errors: 2322 2362

Compiler Errors:

index.tsx
[2322] 18 : Type 'QueryableInnerOutput>' is not assignable to type 'number | null'.
[2362] 27 : The left-hand side of an arithmetic operation must be of type 'any', 'number', 'bigint' or an enum type.
[2322] 35 : Type 'Queryable, number, {}>, MappedOutputTags, {}>, {}>' is not assignable to type 'Queryable'.

Raising Code:

## Code

'''tsx
0 import React from 'react'
1 // Re-implement ts file above but inlined to resolve 
2 // Twoslash import errors due to mixing ts and tsx
3 
4 import { filtermap } from '@electricui/dataflow'
5 import { Queryable } from '@electricui/core-timeseries'
6 
7 function leakyIntegrator<T extends number>(
8   queryable: Queryable<T>,
9   smoothingFactor: number,
10 ): Queryable<number> {
11   let lastValue: number | null = null
12 
13   const processed = filtermap(
14     queryable,
15     (data, time) => {
16       // First run, just set the last data and exit
17       if (lastValue === null) {
18         lastValue = data
19         return false
20       }
21 
22       return true
23     },
24     (data, time) => {
25       const thisValue = data
26 
27       const newData = (thisValue - lastValue!) / smoothingFactor + lastValue!
28 
29       lastValue = newData
30 
31       return newData
32     },
33   )
34 
35   return processed
36 }
37 
38 // The usage example
39 import { useDataTransformer } from '@electricui/timeseries-react'
40 import { useMessageDataSource } from '@electricui/core-timeseries'
41 import { 
42   ChartContainer,
43   LineChart,
44   RealTimeDomain,
45 } from '@electricui/components-desktop-charts'
46 
47 declare global {
48   interface ElectricUIDeveloperState {
49     adc: number
50   }
51 }
52 // ---cut---
53 const FilteredChart = () => {
54   const sensorDataSource = useMessageDataSource('adc')
55 
56   const filteredDataSource = useDataTransformer(() => {
57     return leakyIntegrator(sensorDataSource, 0.25)
58   })
59 
60   return (
61     <ChartContainer>
62       <LineChart dataSource={filteredDataSource} />
63       <RealTimeDomain window={10000} />
64     </ChartContainer>
65   )
66 }
'''

The above example demonstrates the process of implementing the transformer.

A pre-made leakyIntegrator is available in @electricui/dataflow, though the strength factor argument describes the amount of filtering as a factor of the input - i.e strength of 2 is α = 0.5

const leakyIntegratorDs = leakyIntegrator(sensorDataSource, 3)