Datasources

The @electricui/core-timeseries library provides powerful data manipulation functionality used to format inbound data for logging, buffer historical data for graphs, and build maths helpers for UI-side calculations.

This guide describes how to use DataSources, and how to calculate a windowed average, apply a single-pole lowpass filter, cache min/max values, and implement a conditional trigger.

What is a DataSource?

DataSources provide a unified interface for handling timeseries events used in charting and logging systems. They are not limited to streams of data from hardware, though that is their most common use case.

DataSources are stateless and do not provide their own persistent storage.

The most common DataSource is a MessageDataSource driven by a message identifier. It produces events when specified messages arrive.

import { MessageDataSource } from '@electricui/core-timeseries'
const speedSource = new MessageDataSource('speed')

LineChart, EventCSVLogger and PolledCSVLogger components accept datasources for time-series variable streams.

MessageDataSource accepts a second argument, a processor function which lets a developer manually convert the incoming Message into an Event for the timeseries system.

The default processor behaves like this (the example is simplified to a single xyz-style payload type):

import { MessageDataSource } from '@electricui/core-timeseries'
import {
  Event,
  Time,
} from '@electricui/timeseries'
const speedSource = new MessageDataSource('speed', (message, emit) => {
  // Build the event at the current time
  const event = new Event(message.metadata.timestamp, {
    x: message.payload.x,
    y: message.payload.y,
    z: message.payload.z,
  })
  // Emit the event
  emit(event)
})

Retiming Events

By default, Events are timestamped by the transport manager.

In situations where a large amount of data is streaming from hardware, data may not be processed immediately as it arrives because of dropped packets, a busy transport, or a heavily loaded transport manager.

If this occurs, charts may show visual errors due to out-of-order packets or erroneous delays. The recommended approach in these circumstances is to provide hardware timing information in the packet.

import { MessageDataSource } from '@electricui/core-timeseries'
import { Event } from '@electricui/timeseries'
interface SpeedPayload {
  received: number // timestamp in ms since epoch
  sensor: number // adc readings
}
const retimedSpeedSource = new MessageDataSource('speed', (message, emit) => {
  // Custom time from the message
  const event = new Event(message.payload.received, message.payload.sensor)
  emit(event)
})

It is also significantly more efficient to batch multiple events in the same packet. The processor can emit multiple events for a single inbound message, and downstream consumers can't tell the difference from a standard DataSource.

import { MessageDataSource } from '@electricui/core-timeseries'
import { Event } from '@electricui/timeseries'
interface PackedPayload {
  start: number // the unix time in milliseconds of the first event
  delta: number // the delta time between each subsequent event
  values: number[] // the adc values of each event
}
const bulkSpeedSource = new MessageDataSource('speed', (message, emit) => {
  const { start, delta, values } = message.payload
  // Each packet has 512 sensor readings
  for (let index = 0; index < 512; index++) {
    // Create the timestamp for this value, and then push the retimed event
    const time = start + delta * index
    const event = new Event(time, values[index])
    emit(event)
  }
})
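
The timestamp arithmetic used when unpacking a batch can be checked in isolation. The sketch below is a standalone helper and not part of the Electric UI API: `unpack` and `TimedValue` are hypothetical names, and the payload shape is assumed to match the `PackedPayload` interface above. It iterates over `values.length` rather than a fixed 512, which is one way to tolerate short packets.

```typescript
// Hypothetical standalone helper, not part of @electricui/core-timeseries.
interface PackedPayload {
  start: number // unix time in milliseconds of the first reading
  delta: number // milliseconds between consecutive readings
  values: number[] // the readings themselves
}

interface TimedValue {
  time: number
  data: number
}

function unpack(payload: PackedPayload): TimedValue[] {
  // Reading i was sampled at start + delta * i
  return payload.values.map((data, index) => ({
    time: payload.start + payload.delta * index,
    data,
  }))
}

const unpacked = unpack({ start: 1000, delta: 2, values: [10, 11, 12] })
// The three readings are stamped 1000, 1002 and 1004
```

Inside a real processor, each element of the returned array would be wrapped in an Event and passed to emit.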

Getting the current timestamp

In certain situations, it is useful to know the timestamp for 'now' in a datasource or transformer, rather than relying on the timestamp included with the packet.

In this situation, we suggest using our timing export, which handles cross-platform differences such as timer precision and starting epochs.

import { timing } from '@electricui/timing'
const timestamp = timing.now()

Rolling Storage

DataSources don't provide persistence by default.

If a Chart component is mounted on one page, when the user navigates away from the page and comes back, the chart will fill events from a blank state.

To persist the state of the chart, a RollingStorageRequest can be mounted which will store events at a higher level context. Upon re-mount, the Chart will request this data.

import { RollingStorageRequest } from '@electricui/core-timeseries'
<RollingStorageRequest dataSource={tADataSource} maxItems={20000} persist />

If the persist prop is provided, the RollingStorageRequest won't clear the cache contents on unmount, causing an intentional memory leak.

This memory allocation occurs when the RollingStorageRequest component is first rendered for a given device.

Note

As the RollingStorageRequest allocates storage for the message based on the maxItems value per-device, applications which handle frequently changing devices or large meshes should be aware of the memory impact of deep storage pools.

Allocated space = eventSize * maxItems * number of unique devices that have connected
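
As a worked example of the formula above, the sketch below estimates the allocation for a few hypothetical numbers. The figure of 16 bytes per event is an assumption made purely for illustration; the real per-event size depends on the event payload and is not stated here. `estimateRollingStorageBytes` is a hypothetical helper, not a library function.

```typescript
// Hypothetical helper illustrating the formula; not part of the library.
function estimateRollingStorageBytes(
  eventSizeBytes: number,
  maxItems: number,
  uniqueDevices: number,
): number {
  return eventSizeBytes * maxItems * uniqueDevices
}

// Assumed values: 16 bytes per event, maxItems of 20000, 5 devices seen so far
const estimatedBytes = estimateRollingStorageBytes(16, 20000, 5)
// 16 * 20000 * 5 = 1600000 bytes, roughly 1.6 MB
```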

Data Transformers

To perform mathematical transforms on inbound data, as is typically needed when plotting signals, we pass a DataSource to a DataTransformer for manipulation.

Data Transformers take one or more DataSources as inputs, an initial state creator function, and a function that runs when events flow through the input DataSources.

Data Transformers automatically create their initial state per device when subscribed to within an Electric UI context.

This allows you to create running averages of inbound data, or to implement more complex filtering behaviours.

Running Average

The snippet below demonstrates the creation of a running average. Every time a new value arrives, the average over a 10-item buffer is calculated and emitted.

This example also demonstrates retiming. Each average is timestamped at the midpoint of the buffer rather than at the newest sample, so relative to its own timestamp the calculation 'looks forward' at upcoming values, reducing the perceived phase delay of the average curve.

Noisy signal with windowed average line

import {
  MessageDataSource,
  DataTransformer,
  FixedQueue,
  Event,
} from '@electricui/core-timeseries'
const sensorDataSource = new MessageDataSource('adc')
const rollingAverage = new DataTransformer<
  number,
  { queue: FixedQueue<Event<number>> }
>(
  sensorDataSource,
  () => ({
    queue: new FixedQueue<Event<number>>(10),
  }),
  (event, dataSource, state, emit) => {
    // Add the event to the queue
    state.queue.push(event)
    // Calculate the average over the queue
    let total = 0
    state.queue.forEach(item => {
      total += item.data
    })
    const average = total / state.queue.length
    // Emit the average event at the middle time of the queue
    const averageTime =
      (state.queue.first()!.time + state.queue.last()!.time) / 2
    emit(new Event(averageTime, average))
  },
)
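
To make the midpoint retiming concrete, the following standalone sketch reproduces the same calculation on plain objects, without any Electric UI types. `Sample` and `makeWindowedAverage` are hypothetical names, and the sketch assumes the queue discards its oldest item once it is full, which is how a plain array is used here.

```typescript
// Hypothetical standalone version of the windowed average, for illustration.
interface Sample {
  time: number
  data: number
}

function makeWindowedAverage(windowSize: number) {
  const window: Sample[] = []
  return (sample: Sample): Sample => {
    // Keep only the most recent windowSize samples
    window.push(sample)
    if (window.length > windowSize) window.shift()
    const total = window.reduce((sum, item) => sum + item.data, 0)
    return {
      // Stamp the average at the midpoint between the oldest and newest sample
      time: (window[0].time + window[window.length - 1].time) / 2,
      data: total / window.length,
    }
  }
}

const average = makeWindowedAverage(3)
const outputs = [
  { time: 0, data: 3 },
  { time: 10, data: 6 },
  { time: 20, data: 9 },
  { time: 30, data: 12 },
].map(average)
// The last output averages 6, 9 and 12 (giving 9) and is stamped at time 20,
// the middle of the 10..30 window, not at the newest sample's time of 30.
```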

Leaky Integrator

This example demonstrates a Leaky Integrator implementation. This style of filter is a commonly used first-order low-pass filter which has low computational overhead.

One benefit of this approach is the simple control over the filtering strength. α is the smoothing factor between 0 (no filtering = raw signal) and 1 (fully filtered = no output signal).

import {
  MessageDataSource,
  DataTransformer,
  Event,
} from '@electricui/core-timeseries'
const sensorDataSource = new MessageDataSource('adc')
const leakyIntegrator = new DataTransformer<
  number,
  { lastValue: number | null }
>(
  sensorDataSource,
  () => ({
    lastValue: null,
  }),
  (event, dataSource, state, emit) => {
    const thisValue = event.data
    // First run, just set the last value and exit
    if (state.lastValue === null) {
      state.lastValue = thisValue
      return
    }
    const smoothing_factor = 2
    // Move the stored value a fraction of the way towards the new value
    state.lastValue =
      ((thisValue - state.lastValue) >> smoothing_factor) + state.lastValue
    emit(new Event(event.time, state.lastValue))
  },
)

This example uses bit-shifting to match a common embedded implementation. A shift of 2 divides the difference by 4, so each new sample is weighted by 0.25, which corresponds to α=0.75 under the definition above.

Replacing the shift with a simple division allows for finer-grained control.
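
A minimal sketch of the division-based variant is shown below as a standalone function, independent of the DataTransformer API. `leakyStep` and `divisor` are hypothetical names chosen for illustration. A divisor of 4 corresponds to the shift by 2, with two differences: the JavaScript `>>` operator converts its operands to 32-bit integers and so discards fractions, while division keeps them, and divisors such as 3 or 4.5 that no shift can express become available.

```typescript
// Hypothetical standalone filter step, for illustration only.
function leakyStep(previous: number, input: number, divisor: number): number {
  // Move the running value 1/divisor of the way towards the new input
  return previous + (input - previous) / divisor
}

// Step response with a divisor of 4, starting from 0 with a constant input of 100
let filtered = 0
const trace: number[] = []
for (const input of [100, 100, 100]) {
  filtered = leakyStep(filtered, input, 4)
  trace.push(filtered)
}
// trace is [25, 43.75, 57.8125]
```

A larger divisor gives heavier smoothing and a slower response; a divisor of 1 passes the input through unchanged.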