Functional Operators

filter

The filter operator passes an event downstream only when the supplied predicate returns true.

The example below only lets events with an even value through.

import { filter } from '@electricui/dataflow'
const dataSource = useMessageDataSource<number>('speed')

const dataTransformer = useDataTransformer(() => {
  const evenEvents = filter(dataSource, (data, time, tags) => {
    if (data % 2 === 0) {
      return true
    }
    return false
  })

  return evenEvents
})
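Because the predicate is an ordinary function, it can be reduced to a single boolean expression and exercised outside the transformer. The following is a minimal sketch in plain TypeScript: `isEven`, `sampleSpeeds` and `evenSpeeds` are hypothetical names, and `Array.prototype.filter` stands in for the event stream so the snippet runs without the library.

```typescript
// Hypothetical predicate, equivalent to the if/return pair in the example above.
const isEven = (data: number): boolean => data % 2 === 0

// Array.prototype.filter stands in for the stream here; it is not the dataflow operator.
const sampleSpeeds: number[] = [1, 2, 3, 4, 10, 11]
const evenSpeeds: number[] = sampleSpeeds.filter(speed => isEven(speed))
// evenSpeeds is [2, 4, 10]
```

The same expression could be returned directly from the filter callback in place of the if/return pair.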

The filter operator supports Collections. Note that Collection event updates may not be in monotonically increasing time order.

import { filter, head } from '@electricui/dataflow'
const dataSource = useMessageDataSource<{x: number, y: number, value: number}>('xysampled')

const dataTransformer = useDataTransformer(() => {
  const evenTop10Events = filter(
    // Form a collection of the top 10 events when sorted by the value property
    head(dataSource, { N: 10, orderBy: (data, time, tags) => data.value }),
    // Filter the odd ones out
    (data, time, tags) => {
      if (data.value % 2 === 0) {
        return true
      }
      return false
    }
  )

  return evenTop10Events
})

map

The map operator transforms each event into exactly one output event.

The example below converts temperature measurements from Kelvin to Celsius.

import { map } from '@electricui/dataflow'
const dataSource = useMessageDataSource<number>('temperature')

const dataTransformer = useDataTransformer(() => {
  const celsiusEvents = map(dataSource, (data, time, tags) => {
    // Celsius = Kelvin - 273.15
    return data - 273.15
  })

  return celsiusEvents
})
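As a worked check of the conversion, a temperature in Celsius is the Kelvin value minus 273.15, so 273.15 K corresponds to 0 °C. The sketch below is plain TypeScript with a hypothetical helper name, `kelvinToCelsius`, and runs without the library.

```typescript
// Hypothetical helper: Celsius = Kelvin - 273.15.
const kelvinToCelsius = (kelvin: number): number => kelvin - 273.15

const freezingPoint = kelvinToCelsius(273.15) // 0
const boilingPoint = kelvinToCelsius(373.15) // approximately 100, subject to floating point rounding
```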

The map operator supports Collections. Note that Collection event updates may not be in monotonically increasing time order.

import { map, head } from '@electricui/dataflow'
const dataSource = useMessageDataSource<{x: number, y: number, value: number}>('xysampled')

const dataTransformer = useDataTransformer(() => {
  const topEventXValues = map(
    // Form a collection of the top 10 events when sorted by the value property
    head(dataSource, { N: 10, orderBy: (data, time, tags) => data.value }),
    // Pick out the x value only
    (data, time, tags) => data.x
  )

  return topEventXValues
})

tag

The tag operator allows the tagging of events in a 1:1 fashion.

It is often useful when interleaving multiple streams of data. By default, the returned object replaces the existing tags; use object spreading to merge the two instead.

import { interleave, tag } from '@electricui/dataflow'
const dataSourceA = useMessageDataSource<number>('temperatureA')
const dataSourceB = useMessageDataSource<number>('temperatureB')

const dataTransformer = useDataTransformer(() => {
  const interleaved = interleave([
    // Spread previous tags in with ...tags, then add the new 'stream' tag before interleaving.
    tag(dataSourceA, (data, time, tags) => ({...tags, stream: 'A'})),
    tag(dataSourceB, (data, time, tags) => ({...tags, stream: 'B'}))
  ])

  return interleaved
})

The tag operator also supports tagging events within Collections.
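The difference between replacing and merging tags comes down to ordinary object semantics. The sketch below uses plain objects only; the `Tags` type and the `unit` tag are hypothetical and stand in for whatever tags an upstream operator has already attached.

```typescript
type Tags = { [key: string]: string | number | boolean }

// Tags an upstream operator might already have attached (hypothetical values).
const existingTags: Tags = { unit: 'celsius' }

// Returning a fresh object discards the existing tags.
const replacedTags: Tags = { stream: 'A' }

// Spreading the existing tags first keeps them and adds the new key.
const mergedTags: Tags = { ...existingTags, stream: 'A' }
// mergedTags is { unit: 'celsius', stream: 'A' }
```

If a key appears in both objects, the value written after the spread wins.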

forEach

The forEach operator executes a function for every Event received, by default consuming the Event.

The following example consumes both a temperature measurement and a progress measurement, combining them so the progress updates also contain whether the temperature is within range.

import { forEach, map, interleave } from '@electricui/dataflow'
const tempDataSource = useMessageDataSource<number>('temperature')
const progressDataSource = useMessageDataSource<number>('progress')

const dataTransformer = useDataTransformer(() => {
  let lastTemperatureStatus = 'unknown'

  const tempStatus = forEach(
    tempDataSource,
    (data, time, tags) => {
      if (data >= 28) {
        lastTemperatureStatus = 'too hot'
      } else if (data < 20) {
        lastTemperatureStatus = 'too cold'
      } else {
        lastTemperatureStatus = 'just right'
      }
    },
    true, // don't emit temperature events down the line
  )

  const statusEvents = map(progressDataSource, (data, time, tags) => {
    return {
      temp: lastTemperatureStatus,
      progress: data,
    }
  })

  // Since tempStatus doesn't emit any events, while both branches will be executed, only `statusEvents` will emit events.
  return interleave([tempStatus, statusEvents])
})
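The threshold logic in the callback can be pulled out into a pure function, which makes the boundaries easier to check in isolation. This is a sketch only: `TemperatureStatus` and `classifyTemperature` are hypothetical names, and the thresholds are copied from the example above.

```typescript
type TemperatureStatus = 'unknown' | 'too hot' | 'too cold' | 'just right'

// Same thresholds as the example: 28 and above is too hot, below 20 is too cold.
const classifyTemperature = (temperature: number): TemperatureStatus => {
  if (temperature >= 28) return 'too hot'
  if (temperature < 20) return 'too cold'
  return 'just right'
}

const atUpperBound = classifyTemperature(28) // 'too hot'
const atLowerBound = classifyTemperature(20) // 'just right'
```

Inside the forEach callback, the branch would then reduce to `lastTemperatureStatus = classifyTemperature(data)`.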

prepare

The prepare operator allows the transformation of queries.

The example below extends the query range by 100 ms into the past.

import { prepare } from '@electricui/dataflow'
const dataSource = useMessageDataSource<number>('temperature')

const dataTransformer = useDataTransformer(() => {
  const moreData = prepare(dataSource, queryToMutate => {
    return queryToMutate.start(queryToMutate.getStart() - 100)
  })

  return moreData
})

interleave

The interleave operator takes multiple input streams and returns a single stream, sorted by time, that contains the events from every input.

The example below fans two temperature sources into one stream.

import { interleave } from '@electricui/dataflow'
const dataSourceA = useMessageDataSource<number>('temperatureA')
const dataSourceB = useMessageDataSource<number>('temperatureB')

const dataTransformer = useDataTransformer(() => {
  const interleaved = interleave([dataSourceA, dataSourceB])

  return interleaved
})
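To illustrate what a time-sorted fan-in produces, the sketch below merges two arrays that are each already sorted by time. It is a plain TypeScript model, not the library's implementation: `TimedSample`, `mergeByTime` and the sample values are hypothetical, and the tie-breaking rule for equal timestamps is an arbitrary choice made for this sketch.

```typescript
interface TimedSample {
  time: number
  data: number
  source: string
}

// Merge two arrays that are each already sorted by time into one time-sorted array.
function mergeByTime(a: TimedSample[], b: TimedSample[]): TimedSample[] {
  const merged: TimedSample[] = []
  let i = 0
  let j = 0
  while (i < a.length && j < b.length) {
    // On equal timestamps the sample from `a` goes first (arbitrary for this sketch).
    if (a[i].time <= b[j].time) {
      merged.push(a[i++])
    } else {
      merged.push(b[j++])
    }
  }
  // Append whatever remains of the longer input.
  while (i < a.length) merged.push(a[i++])
  while (j < b.length) merged.push(b[j++])
  return merged
}

const samplesA: TimedSample[] = [
  { time: 0, data: 21.0, source: 'A' },
  { time: 20, data: 21.5, source: 'A' },
]
const samplesB: TimedSample[] = [
  { time: 10, data: 19.0, source: 'B' },
  { time: 30, data: 19.2, source: 'B' },
]
const mergedSamples = mergeByTime(samplesA, samplesB)
// Times in mergedSamples: 0, 10, 20, 30
```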

The interleave operator can also combine Collection Queryables, forming a new Collection containing each of the sets.

import { interleave, head, tail, tag } from '@electricui/dataflow'
const dataSource = useMessageDataSource<{x: number, y: number, value: number}>('xysampled')

const dataTransformer = useDataTransformer(() => {
  // Tag the data as from the head or tail
  const taggedHead = tag(dataSource, (data, time, tags) => ({ ...tags, head: true }))
  const taggedTail = tag(dataSource, (data, time, tags) => ({ ...tags, tail: true }))
  // Combine both the top and bottom 10 when sorted by the `value` property.
  const interleaved = interleave([
    head(taggedHead, { N: 10, orderBy: (data, time, tags) => data.value }),
    tail(taggedTail, { N: 10, orderBy: (data, time, tags) => data.value }),
  ])

  return interleaved
})

advance

The advance operator is given the time once per timestep, and can emit an additional event just before the frontier closes over that time.

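import { advance } from '@electricui/dataflow'
// Assumed source for this example; the original snippet did not define dataSource
const dataSource = useMessageDataSource<number>('temperature')

const dataTransformer = useDataTransformer(() => {
  const advanced = advance(dataSource, time => {
    console.log(`${time} has passed`)

    // don't emit any new events
    return undefined
  })

  return advanced
})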

coalesce

The coalesce operator takes a flat object whose values are Queryables and combines their events into a single object that holds the latest event under each key.

By default, it waits until all upstream Queryables have emitted at least one event, then will emit an update per upstream event. If the timestamps of upstream events are exactly the same, only a single update will be emitted per timestamp.

import { coalesce } from '@electricui/dataflow'
const dataSourceA = useMessageDataSource<number>('temperature')
const dataSourceB = useMessageDataSource<string>('name')
const dataSourceC = useMessageDataSource<boolean>('health')

const dataTransformer = useDataTransformer(() => {
  const combinedDS = coalesce({
    temp: dataSourceA,
    name: dataSourceB,
    ok: dataSourceC,
  })

  return combinedDS
})
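The default behaviour described above can be modelled in a few lines of plain TypeScript. This is a sketch of the described semantics under stated assumptions, not the library's implementation: `SlotUpdate`, `Snapshot` and `coalesceSketch` are hypothetical names, and the input is assumed to be a single list of updates already sorted by time.

```typescript
interface SlotUpdate {
  time: number
  key: string
  value: unknown
}

interface Snapshot {
  time: number
  data: { [key: string]: unknown }
}

// Model of the default behaviour: wait for every key, then emit once per timestamp.
function coalesceSketch(keys: string[], updates: SlotUpdate[]): Snapshot[] {
  const latest: { [key: string]: unknown } = {}
  const result: Snapshot[] = []

  for (let i = 0; i < updates.length; i++) {
    const update = updates[i]
    latest[update.key] = update.value

    // Updates sharing a timestamp are folded into one snapshot, so wait for the last of them.
    const isLastAtThisTime = i === updates.length - 1 || updates[i + 1].time !== update.time
    const allSlotsFilled = keys.every(key => Object.prototype.hasOwnProperty.call(latest, key))

    if (isLastAtThisTime && allSlotsFilled) {
      result.push({ time: update.time, data: { ...latest } })
    }
  }
  return result
}

const emitted = coalesceSketch(['temp', 'name', 'ok'], [
  { time: 0, key: 'temp', value: 21.5 },
  { time: 5, key: 'name', value: 'probe' },
  { time: 10, key: 'ok', value: true }, // all slots filled: first snapshot
  { time: 20, key: 'temp', value: 22.0 }, // second snapshot
  { time: 30, key: 'temp', value: 22.5 }, // shares a timestamp with the next update
  { time: 30, key: 'ok', value: false }, // third snapshot covers both updates at time 30
])
// emitted.length is 3, with snapshots at times 10, 20 and 30
```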

When merging Queryables that emit data at the same rate, enable the synchronize option. The coalesce operator will wait until each 'slot' has received a new event before emitting an update.

import { coalesce } from '@electricui/dataflow'
const dsA = useMessageDataSource<number>('distance')
const dsB = useMessageDataSource<number>('angle')

const dataTransformer = useDataTransformer(() => {
  const combinedDS = coalesce({ distance: dsA, angle: dsB }, { synchronize: true })

  return combinedDS
})
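The synchronized behaviour can be modelled the same way: nothing is emitted until every slot has received a new event since the last emission. The sketch below is plain TypeScript under stated assumptions, not the library's implementation. `PairedUpdate`, `SyncedRow` and `synchronizedSketch` are hypothetical names, and stamping each row with the time of the event that completed it is an assumption of this sketch.

```typescript
interface PairedUpdate {
  time: number
  key: string
  value: number
}

interface SyncedRow {
  time: number
  data: { [key: string]: number }
}

function synchronizedSketch(keys: string[], updates: PairedUpdate[]): SyncedRow[] {
  let pending: { [key: string]: number } = {}
  const rows: SyncedRow[] = []

  for (const update of updates) {
    pending[update.key] = update.value

    const allFresh = keys.every(key => Object.prototype.hasOwnProperty.call(pending, key))
    if (allFresh) {
      // Assumption: the row takes the time of the event that completed it.
      rows.push({ time: update.time, data: pending })
      // Start over: every slot must receive a new event before the next row.
      pending = {}
    }
  }
  return rows
}

const syncedRows = synchronizedSketch(['distance', 'angle'], [
  { time: 0, key: 'distance', value: 1.0 },
  { time: 1, key: 'angle', value: 10 }, // both slots fresh: first row
  { time: 10, key: 'distance', value: 1.2 },
  { time: 20, key: 'distance', value: 1.4 }, // overwrites 1.2, angle still stale
  { time: 21, key: 'angle', value: 12 }, // both slots fresh again: second row
])
// syncedRows.length is 2, with rows at times 1 and 21
```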

To begin emission at the first event, without waiting for all 'slots' to have data, disable the synchronizeInitial option. Using the defaultValue option, initial values can be set.

import { coalesce } from '@electricui/dataflow'
const positionDs = useMessageDataSource<{x: number, y: number, z: number}>('pos')
const stateDs = useMessageDataSource<number>('state')

const dataTransformer = useDataTransformer(() => {
  // Emit from the first event onwards, using null for any slot that has no data yet
  const combinedDS = coalesce({ position: positionDs, state: stateDs }, { synchronizeInitial: false, defaultValue: null })

  return combinedDS
})

and

Accepts an array of Queryables and returns a new Queryable representing the boolean AND operation.

The default value of a Queryable is true unless stated otherwise.

We find boolean outputs useful when working with Chart continuity, visibility and blanking props.

import { and } from '@electricui/dataflow'
const dsA = useMessageDataSource<boolean>('sensor_cal')
const dsB = useMessageDataSource<boolean>('healthy')

const visibilityDS = useDataTransformer(() => {
  const bothOK = and([dsA, dsB])
  return bothOK
})

or

Accepts an array of Queryables and returns a new Queryable representing the boolean OR operation.

The default value of a Queryable is false unless stated otherwise.

import { or } from '@electricui/dataflow'
const dsA = useMessageDataSource<boolean>('sens_first')
const dsB = useMessageDataSource<boolean>('sens_second')
const dsC = useMessageDataSource<boolean>('sens_backup')

const showEstimateDS = useDataTransformer(() => {
  const anyOK = or([dsA, dsB, dsC])
  return anyOK
})
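One reading of the default values for and and or is that an input with no value yet is treated as the identity element of the operation, so it cannot decide the result on its own. The sketch below models that reading in plain TypeScript; it is an assumption about the semantics, not the library's implementation, and `MaybeBool`, `andSketch` and `orSketch` are hypothetical names.

```typescript
// `undefined` models an input that has not produced a value yet.
type MaybeBool = boolean | undefined

// AND: a missing input counts as true, so it cannot force the result to false.
const andSketch = (inputs: MaybeBool[]): boolean =>
  inputs.every(value => (value === undefined ? true : value))

// OR: a missing input counts as false, so it cannot force the result to true.
const orSketch = (inputs: MaybeBool[]): boolean =>
  inputs.some(value => (value === undefined ? false : value))

const calibratedOnly = andSketch([true, undefined]) // true
const noSensorsYet = orSketch([undefined, undefined, undefined]) // false
```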

iterateEmit

The iterateEmit operator executes a function for every Event received, passing an emit function as a parameter that can be used to emit Events that will continue through the stream.

The following example consumes the first event of a stream, emitting all subsequent events.

import { iterateEmit } from '@electricui/dataflow'
const dataSource = useMessageDataSource<number>('temperature')

const dataTransformer = useDataTransformer(() => {
  let seenFirst = false

  const notFirst = iterateEmit(
    dataSource,
    (time, data, tags, emit, schedule, shouldEmit) => {
      // Swallow the first event without emitting it
      if (!seenFirst) {
        seenFirst = true
        return
      }

      emit(time, data, tags)
    },
  )

  return notFirst
})
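The emit-callback pattern can be illustrated without the library. The sketch below is a plain TypeScript model with hypothetical names (`Reading`, `EmitReading`, `iterateSketch`); its callback takes a reduced argument list, whereas the operator's callback shown above also receives tags, schedule and shouldEmit.

```typescript
interface Reading {
  time: number
  data: number
}

type EmitReading = (time: number, data: number) => void

// Calls `callback` once per reading; only readings passed to `emit` reach the output.
function iterateSketch(
  readings: Reading[],
  callback: (time: number, data: number, emit: EmitReading) => void,
): Reading[] {
  const output: Reading[] = []
  const emit: EmitReading = (time, data) => {
    output.push({ time, data })
  }
  for (const reading of readings) {
    callback(reading.time, reading.data, emit)
  }
  return output
}

let seenFirstReading = false
const afterFirst = iterateSketch(
  [
    { time: 0, data: 20 },
    { time: 10, data: 21 },
    { time: 20, data: 22 },
  ],
  (time, data, emit) => {
    if (!seenFirstReading) {
      seenFirstReading = true
      return // swallow the first reading
    }
    emit(time, data)
  },
)
// afterFirst holds the readings at times 10 and 20
```

Because the callback decides whether and how often to call emit, the same shape covers dropping events, passing them through, or emitting several outputs for one input.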

If you are using this operator in your project, please let us know by sending us an email with your use case. This operator will likely be deprecated in a future release as it exposes internal APIs that are subject to change, and its use cases have been absorbed by other operators.

peek

peek allows queries that do not affect the overall SharedFrontier.

import { peek } from '@electricui/dataflow'
peek(queryable, (data, time, tags) => {
  // update state machine with information from returned events
})

If you are using this operator in your project, please let us know by sending us an email with your use case. This operator will likely be deprecated in a future release and replaced with a new multipass operator to handle similar use cases.