Functional Operators
filter
The filter operator allows the filtering of events.
The example below only allows even-numbered events through.
import { filter } from '@electricui/dataflow'

const dataSource = useMessageDataSource<number>('speed')

const dataTransformer = useDataTransformer(() => {
  const evenEvents = filter(dataSource, (data, time, tags) => {
    // Only even values pass through
    return data % 2 === 0
  })

  return evenEvents
})
The filter operator supports Collections. Note that Collection event updates may not be in monotonically increasing time order.
import { filter, head } from '@electricui/dataflow'

const dataSource = useMessageDataSource<{ x: number, y: number, value: number }>('xysampled')

const dataTransformer = useDataTransformer(() => {
  const evenTop10Events = filter(
    // Form a collection of the top 10 events when sorted by the value property
    head(dataSource, { N: 10, orderBy: (data, time, tags) => data.value }),
    // Filter the odd ones out
    (data, time, tags) => data.value % 2 === 0,
  )

  return evenTop10Events
})
map
The map operator allows the transformation of events in a 1:1 fashion.
The example below converts temperature measurements from Kelvin to Celsius.
import { map } from '@electricui/dataflow'

const dataSource = useMessageDataSource<number>('temperature')

const dataTransformer = useDataTransformer(() => {
  const celsiusEvents = map(dataSource, (data, time, tags) => {
    // Celsius = Kelvin - 273.15
    return data - 273.15
  })

  return celsiusEvents
})
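The direction of the offset is easy to get backwards, so it can help to pull the conversion out of the map callback and check it on its own. The following is a minimal sketch in plain TypeScript; kelvinToCelsius is a hypothetical helper name and is not part of @electricui/dataflow.

```typescript
// Hypothetical helper for illustration, not part of @electricui/dataflow.
const KELVIN_OFFSET = 273.15

// Celsius = Kelvin - 273.15
function kelvinToCelsius(kelvin: number): number {
  return kelvin - KELVIN_OFFSET
}

// The same function could be handed to the map operator:
//   map(dataSource, (data, time, tags) => kelvinToCelsius(data))
// Here it is applied to a plain array instead of a stream.
const readingsInKelvin = [273.15, 300, 373.15]
const readingsInCelsius = readingsInKelvin.map(kelvinToCelsius)
```

Keeping the arithmetic in a named function means the transformer body only describes the wiring.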
The map operator supports Collections. Note that Collection event updates may not be in monotonically increasing time order.
import { map, head } from '@electricui/dataflow'

const dataSource = useMessageDataSource<{ x: number, y: number, value: number }>('xysampled')

const dataTransformer = useDataTransformer(() => {
  const topEventXValues = map(
    // Form a collection of the top 10 events when sorted by the value property
    head(dataSource, { N: 10, orderBy: (data, time, tags) => data.value }),
    // Pick out the x value only
    (data, time, tags) => data.x,
  )

  return topEventXValues
})
tag
The tag operator allows the tagging of events in a 1:1 fashion.
It is often useful when interleaving multiple streams of data. By default, the returned object replaces the existing tags. To merge with the existing tags instead, spread them into the returned object.
import { tag, interleave } from '@electricui/dataflow'

const dataSourceA = useMessageDataSource<number>('temperatureA')
const dataSourceB = useMessageDataSource<number>('temperatureB')

const dataTransformer = useDataTransformer(() => {
  const interleaved = interleave([
    // Spread previous tags in with ...tags, then add the new 'stream' tag before interleaving.
    tag(dataSourceA, (data, time, tags) => ({ ...tags, stream: 'A' })),
    tag(dataSourceB, (data, time, tags) => ({ ...tags, stream: 'B' })),
  ])

  return interleaved
})
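The difference between replacing and merging tags comes down to what the callback returns. The sketch below shows the two callback shapes in plain TypeScript, outside of any stream; the Tags and Tagger types and the sample values are assumptions made for illustration.

```typescript
// Illustrative types only; the library's own tag types may differ.
type Tags = Record<string, unknown>
type Tagger = (data: number, time: number, tags: Tags) => Tags

// Returns a fresh object: any tags already on the event are dropped.
const replaceTags: Tagger = () => ({ stream: 'A' })

// Spreads the previous tags first: existing keys are kept and
// `stream` is added (or overwritten if it was already present).
const mergeTags: Tagger = (_data, _time, tags) => ({ ...tags, stream: 'A' })

const existing: Tags = { unit: 'celsius' }

const replaced = replaceTags(21.5, 1000, existing)
const merged = mergeTags(21.5, 1000, existing)
```

After running this, replaced carries only the stream key, while merged carries both unit and stream.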
The tag operator also supports tagging events within Collections.
forEach
The forEach operator executes a function for every Event received, by default consuming the Event.
The following example consumes both a temperature measurement and a progress measurement, combining them so the progress updates also contain whether the temperature is within range.
import { forEach, map, interleave } from '@electricui/dataflow'

const tempDataSource = useMessageDataSource<number>('temperature')
const progressDataSource = useMessageDataSource<number>('progress')

const dataTransformer = useDataTransformer(() => {
  let lastTemperatureStatus = 'unknown'

  const tempStatus = forEach(
    tempDataSource,
    (data, time, tags) => {
      if (data >= 28) {
        lastTemperatureStatus = 'too hot'
      } else if (data < 20) {
        lastTemperatureStatus = 'too cold'
      } else {
        lastTemperatureStatus = 'just right'
      }
    },
    true, // don't emit temperature events down the line
  )

  const statusEvents = map(progressDataSource, (data, time, tags) => {
    return {
      temp: lastTemperatureStatus,
      progress: data,
    }
  })

  // Since tempStatus doesn't emit any events, while both branches will be executed,
  // only `statusEvents` will emit events.
  return interleave([tempStatus, statusEvents])
})
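The threshold logic in the forEach callback can be factored into a pure function, which makes the boundary cases (exactly 20 and exactly 28) easy to check. This is a sketch only; classifyTemperature and the TemperatureStatus type are hypothetical names, not library exports.

```typescript
type TemperatureStatus = 'too hot' | 'too cold' | 'just right'

// Hypothetical helper using the same thresholds as the example above:
// 28 and over is too hot, under 20 is too cold, anything else is fine.
function classifyTemperature(temperature: number): TemperatureStatus {
  if (temperature >= 28) return 'too hot'
  if (temperature < 20) return 'too cold'
  return 'just right'
}

// Inside the forEach callback this would become:
//   lastTemperatureStatus = classifyTemperature(data)
const statuses = [19.9, 20, 27.9, 28].map(classifyTemperature)
```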
prepare
The prepare operator allows the transformation of queries.
The example below extends the query range by 100 ms into the past.
import { prepare } from '@electricui/dataflow'

const dataSource = useMessageDataSource<number>('temperature')

const dataTransformer = useDataTransformer(() => {
  const moreData = prepare(dataSource, queryToMutate => {
    // Move the start of the query 100 ms earlier
    return queryToMutate.start(queryToMutate.getStart() - 100)
  })

  return moreData
})
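To make the effect on the query window concrete, the sketch below models a query as a small class with a chainable start setter. SketchQuery is a stand-in written for this illustration: only getStart and start appear in the example above, and getEnd and the constructor are assumptions rather than the library's query API.

```typescript
// Stand-in for the real query object. The behaviour of these methods is
// assumed for illustration and may not match the library.
class SketchQuery {
  private startTime: number
  private endTime: number

  constructor(startTime: number, endTime: number) {
    this.startTime = startTime
    this.endTime = endTime
  }

  getStart(): number {
    return this.startTime
  }

  getEnd(): number {
    return this.endTime
  }

  // Chainable setter: returns the same query so the callback can return it.
  start(newStart: number): this {
    this.startTime = newStart
    return this
  }
}

// Same shape as the callback passed to prepare in the example above.
const extendIntoPast = (query: SketchQuery) => query.start(query.getStart() - 100)

const extended = extendIntoPast(new SketchQuery(1000, 2000))
```

In this model the window grows from 1000..2000 to 900..2000: the start moves back by 100 ms and the end is unchanged.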
interleave
The interleave operator takes multiple input streams and returns a single stream, sorted by time, that contains the events from each of the inputs.
The example below naively fans in two temperature sources into the same stream.
import { interleave } from '@electricui/dataflow'

const dataSourceA = useMessageDataSource<number>('temperatureA')
const dataSourceB = useMessageDataSource<number>('temperatureB')

const dataTransformer = useDataTransformer(() => {
  const interleaved = interleave([dataSourceA, dataSourceB])

  return interleaved
})
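The time ordering of the fanned-in stream can be pictured with plain arrays. The sketch below is a simplified model, not the operator's implementation: it gathers finite lists of events and sorts them by time, whereas the real operator works on live streams. SketchEvent and interleaveByTime are names made up for this illustration.

```typescript
interface SketchEvent<T> {
  time: number
  data: T
}

// Gather the events from every input, then order them by timestamp.
// Array.prototype.sort is stable in modern runtimes, so events with equal
// timestamps keep the order of the inputs they came from.
function interleaveByTime<T>(streams: SketchEvent<T>[][]): SketchEvent<T>[] {
  const all: SketchEvent<T>[] = []
  for (const stream of streams) {
    for (const event of stream) {
      all.push(event)
    }
  }
  return all.sort((x, y) => x.time - y.time)
}

const temperatureA = [
  { time: 0, data: 20.1 },
  { time: 20, data: 20.3 },
]
const temperatureB = [
  { time: 10, data: 25.0 },
  { time: 30, data: 25.2 },
]

const interleavedEvents = interleaveByTime([temperatureA, temperatureB])
```

The result alternates between the two sources because their timestamps alternate. Nothing in the output says which source an event came from, which is why tagging before interleaving is often useful.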
The interleave operator can also combine Collection Queryables, forming a new Collection containing each of the sets.
import { interleave, head, tail, tag } from '@electricui/dataflow'

const dataSource = useMessageDataSource<{ x: number, y: number, value: number }>('xysampled')

const dataTransformer = useDataTransformer(() => {
  // Tag the data as from the head or tail
  const taggedHead = tag(dataSource, (data, time, tags) => ({ ...tags, head: true }))
  const taggedTail = tag(dataSource, (data, time, tags) => ({ ...tags, tail: true }))

  // Combine both the top and bottom 10 when sorted by the `value` property.
  const interleaved = interleave([
    head(taggedHead, { N: 10, orderBy: (data, time, tags) => data.value }),
    tail(taggedTail, { N: 10, orderBy: (data, time, tags) => data.value }),
  ])

  return interleaved
})
advance
The advance operator is given the time once per timestep, and can emit an additional event just before the frontier closes over that time.
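import { advance } from '@electricui/dataflow'

// Source assumed for this example, declared as in the other examples
const dataSource = useMessageDataSource<number>('temperature')

const dataTransformer = useDataTransformer(() => {
  const advanced = advance(dataSource, time => {
    console.log(`${time} has passed`)

    // don't emit any new events
    return undefined
  })

  return advanced
})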
coalesce
The coalesce operator takes a flat, keyed object of Queryables and combines their events into a single object, with the latest event from each Queryable in its key position.
By default, it waits until all upstream Queryables have emitted at least one event, then emits an update per upstream event. If the timestamps of upstream events are exactly the same, only a single update is emitted per timestamp.
import { coalesce } from '@electricui/dataflow'

const dataSourceA = useMessageDataSource<number>('temperature')
const dataSourceB = useMessageDataSource<string>('name')
const dataSourceC = useMessageDataSource<boolean>('health')

const dataTransformer = useDataTransformer(() => {
  const combinedDS = coalesce({
    temp: dataSourceA,
    name: dataSourceB,
    ok: dataSourceC,
  })

  return combinedDS
})
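The default behaviour described above (hold back until every slot has a value, then one update per upstream event, with same-timestamp events folded together) can be modelled over a finite list of events. This is a sketch of that description under simplifying assumptions, not the operator's implementation; SlotEvent, Snapshot and coalesceSketch are hypothetical names.

```typescript
interface SlotEvent {
  time: number
  key: string
  value: unknown
}

interface Snapshot {
  time: number
  values: Record<string, unknown>
}

function coalesceSketch(keys: string[], events: SlotEvent[]): Snapshot[] {
  const latest: Record<string, unknown> = {}
  const seen: Record<string, boolean> = {}
  const updates: Snapshot[] = []

  // Process events in time order, keeping input order for equal timestamps.
  const sorted = events.slice().sort((a, b) => a.time - b.time)

  for (const event of sorted) {
    latest[event.key] = event.value
    seen[event.key] = true

    // Hold back until every slot has received at least one event.
    if (!keys.every(key => seen[key] === true)) continue

    const snapshot: Snapshot = { time: event.time, values: { ...latest } }
    const last = updates.length > 0 ? updates[updates.length - 1] : undefined

    if (last !== undefined && last.time === event.time) {
      // Same timestamp as the previous update: fold into a single update.
      updates[updates.length - 1] = snapshot
    } else {
      updates.push(snapshot)
    }
  }

  return updates
}

const coalesced = coalesceSketch(['temp', 'name', 'ok'], [
  { time: 0, key: 'temp', value: 21 },
  { time: 5, key: 'name', value: 'probe' },
  { time: 10, key: 'ok', value: true },
  { time: 20, key: 'temp', value: 22 },
  { time: 20, key: 'ok', value: false },
  { time: 30, key: 'name', value: 'probe-2' },
])
```

With these six input events the model produces three updates, at times 10, 20 and 30. Nothing is emitted before time 10 because the ok slot is still empty, and the two events at time 20 collapse into one update.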
When merging Queryables that emit data at the same rate, enable the synchronize option. The coalesce operator will wait until each 'slot' has received a new event before emitting an update.
import { coalesce } from '@electricui/dataflow'

const dsA = useMessageDataSource<number>('distance')
const dsB = useMessageDataSource<number>('angle')

const dataTransformer = useDataTransformer(() => {
  const combinedDS = coalesce({ distance: dsA, angle: dsB }, { synchronize: true })

  return combinedDS
})
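One way to picture the synchronize option is a set of slots that must all be refilled before each update. The sketch below models that reading over a finite list of samples. It is an assumption about the semantics rather than the library's implementation: in particular, letting a second sample overwrite an earlier one in the same slot is a choice made for this sketch. Sample, Update and synchronizeSketch are hypothetical names.

```typescript
interface Sample {
  time: number
  key: string
  value: number
}

interface Update {
  time: number
  values: Record<string, number>
}

function synchronizeSketch(keys: string[], samples: Sample[]): Update[] {
  let fresh: Record<string, number> = {}
  const updates: Update[] = []

  for (const sample of samples) {
    // A newer sample for the same slot overwrites the older one (assumption).
    fresh[sample.key] = sample.value

    // Emit only once every slot has a sample newer than the last update.
    if (keys.every(key => key in fresh)) {
      updates.push({ time: sample.time, values: fresh })
      fresh = {} // every slot must refresh again before the next update
    }
  }

  return updates
}

const synchronized = synchronizeSketch(['distance', 'angle'], [
  { time: 0, key: 'distance', value: 1.0 },
  { time: 1, key: 'angle', value: 10 },
  { time: 10, key: 'distance', value: 1.1 },
  { time: 20, key: 'distance', value: 1.2 },
  { time: 21, key: 'angle', value: 30 },
])
```

The five samples produce two updates, at times 1 and 21. The distance samples at times 10 and 20 do not trigger updates by themselves because the angle slot has not refreshed yet.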
To begin emission at the first event, without waiting for all 'slots' to have data, disable the synchronizeInitial option. Initial values can be set with the defaultValue option.
import { coalesce } from '@electricui/dataflow'

const positionDs = useMessageDataSource<{ x: number, y: number, z: number }>('pos')
const dsB = useMessageDataSource<number>('state')

const dataTransformer = useDataTransformer(() => {
  const combinedDS = coalesce(
    { distance: positionDs, angle: dsB },
    { synchronizeInitial: false, defaultValue: null },
  )

  return combinedDS
})
and
Accepts an array of Queryables and returns a new Queryable representing the boolean AND operation.
The default value of a Queryable is true unless stated otherwise.
We find boolean outputs useful when working with Chart continuity, visibility and blanking props.
import { and } from '@electricui/dataflow'

const dsA = useMessageDataSource<boolean>('sensor_cal')
const dsB = useMessageDataSource<boolean>('healthy')

const visibilityDS = useDataTransformer(() => {
  const bothOK = and([dsA, dsB])

  return bothOK
})
or
Accepts an array of Queryables and returns a new Queryable representing the boolean OR operation.
The default value of a Queryable is false unless stated otherwise.
import { or } from '@electricui/dataflow'

const dsA = useMessageDataSource<boolean>('sens_first')
const dsB = useMessageDataSource<boolean>('sens_second')
const dsC = useMessageDataSource<boolean>('sens_backup')

const showEstimateDS = useDataTransformer(() => {
  const anyOK = or([dsA, dsB, dsC])

  return anyOK
})
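The default values for the and and or operators can be read as the identity of each operation: an input that has not emitted yet counts as true for AND and as false for OR, so it does not change the result. The sketch below models that reading with plain values, using undefined for an input that has not emitted. This interpretation and the names Latest, andLatest and orLatest are assumptions, not the library's implementation.

```typescript
// `undefined` marks an input that has not emitted an event yet.
type Latest = boolean | undefined

// AND: a missing input defaults to true, so only an explicit false fails.
function andLatest(inputs: Latest[]): boolean {
  return inputs.every(value => value !== false)
}

// OR: a missing input defaults to false, so only an explicit true passes.
function orLatest(inputs: Latest[]): boolean {
  return inputs.some(value => value === true)
}

const bothOK = andLatest([true, undefined])
const anyOK = orLatest([false, undefined, undefined])
```

Under this reading a chart gated by and stays visible until an input reports false, while one gated by or stays hidden until an input reports true.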
iterateEmit
The iterateEmit operator executes a function for every Event received, passing an emit function as a parameter that can be used to emit Events that will continue through the stream.
The following example consumes the first event of a stream, emitting all subsequent events.
import { iterateEmit } from '@electricui/dataflow'

const dataSource = useMessageDataSource<number>('temperature')

const dataTransformer = useDataTransformer(() => {
  let seenFirst = false

  const notFirst = iterateEmit(
    dataSource,
    (time, data, tags, emit, schedule, shouldEmit) => {
      // Consume the first event without emitting it
      if (!seenFirst) {
        seenFirst = true
        return
      }

      // Emit every subsequent event
      emit(time, data, tags)
    },
  )

  return notFirst
})
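The skip-first pattern can be tried out without a stream by driving the same kind of handler over an array. The sketch below is a simplified model: iterateEmitSketch is a hypothetical stand-in that passes only time, data and emit to the handler, so it does not mirror the operator's full callback signature.

```typescript
interface SketchEvent {
  time: number
  data: number
}

type Emit = (time: number, data: number) => void

// Runs `handler` once per event and collects whatever it chooses to emit.
function iterateEmitSketch(
  events: SketchEvent[],
  handler: (time: number, data: number, emit: Emit) => void,
): SketchEvent[] {
  const out: SketchEvent[] = []
  const emit: Emit = (time, data) => {
    out.push({ time, data })
  }
  for (const event of events) {
    handler(event.time, event.data, emit)
  }
  return out
}

let seenFirst = false

const notFirst = iterateEmitSketch(
  [
    { time: 0, data: 20 },
    { time: 10, data: 21 },
    { time: 20, data: 22 },
  ],
  (time, data, emit) => {
    // Consume the first event without emitting it.
    if (!seenFirst) {
      seenFirst = true
      return
    }
    // Pass every later event through unchanged.
    emit(time, data)
  },
)
```

Of the three input events, only the ones at times 10 and 20 end up in notFirst. If the handler never calls emit, the output is empty.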
If you are using this operator in your project, please let us know by sending us an email with your use case. This operator will likely be deprecated in a future release as it exposes internal APIs that are subject to change, and its use cases have been absorbed by other operators.
peek
The peek operator allows queries that do not affect the overall SharedFrontier.
import { peek } from '@electricui/dataflow'

peek(queryable, (data, time, tags) => {
  // update state machine with information from returned events
})
If you are using this operator in your project, please let us know by sending us an email with your use case. This operator will likely be deprecated in a future release and replaced with a new multipass operator to handle similar use cases.