Bulk event transfer
Latency and throughput form a tradeoff when data is sent across a bandwidth-constrained transport. Sending one event per packet achieves the lowest possible latency, at the cost of disproportionate overhead. As events become more frequent and bandwidth becomes more precious, it is desirable to reduce this overhead.
One way to achieve this is to pack many events' worth of data into a single packet, at the cost of increased latency. If a packet contains 10 events, the hardware waits 10x longer before it can send a packet.
The Electric UI binary protocol has 7 bytes of overhead with large packet offsets disabled. The messageID is another fixed cost; let's say it's 2 bytes.
With a 4-byte payload, the packet totals 7 + 2 + 4 = 13 bytes. Sending 512 events costs 512 * 13 = 6,656 bytes.
By batching 512 events into a single packet, the fixed costs are paid only once. The new payload is 4 * 512 = 2,048 bytes, totalling 2,057 bytes for the whole packet.
Sending events in batches of 512 is over 3x more bandwidth efficient than sending a packet per event.
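The arithmetic above can be reproduced with a small sketch. `bytesOnWire` is a hypothetical helper, and the 7-byte protocol overhead and 2-byte messageID are the figures assumed above rather than values read from the library.

```typescript
// Fixed per-packet costs quoted above (assumed constant for this sketch)
const PROTOCOL_OVERHEAD_BYTES = 7 // binary protocol framing, large packet offsets disabled
const MESSAGE_ID_BYTES = 2 // the assumed 2-byte messageID
const FIXED_COST_BYTES = PROTOCOL_OVERHEAD_BYTES + MESSAGE_ID_BYTES

/** Total bytes on the wire to send `events` events of `eventBytes` each, `batchSize` per packet. */
function bytesOnWire(events: number, eventBytes: number, batchSize: number): number {
  const packets = Math.ceil(events / batchSize)
  return packets * FIXED_COST_BYTES + events * eventBytes
}

const unbatched = bytesOnWire(512, 4, 1) // 512 packets of 13 bytes
const batched = bytesOnWire(512, 4, 512) // 1 packet of 2,057 bytes

console.log(unbatched, batched, (unbatched / batched).toFixed(2)) // 6656 2057 3.24
```

The fixed cost is paid once per packet, so the saving grows with the batch size until the payload itself dominates.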
This is, of course, at the cost of 512x the latency. If events are collected once per second, that represents over 8 minutes of extra latency. In a real-time system this would be unacceptable. If the hardware is trickling data into an aggregation system, however, that latency might not matter at all.
If the events are collected every millisecond instead, a batch of 512 takes about half a second to fill, which might still be too long.
This tradeoff must be considered per project; there is no one-size-fits-all answer.
For some rough numbers:
- a human will likely begin to context switch when waiting longer than 400ms. Above this threshold, a progress bar or spinner may be desirable.
- 100ms between updates feels 'pretty fast'.
- a 60 Hz monitor refreshes roughly every 16.7ms.
- 10ms is the 'magic threshold' where a system can become an extension of the body's proprioception.
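To turn those thresholds into a batch size, a rough sketch divides a latency budget by the sampling interval. The helper names are hypothetical, and transmission and processing time are ignored.

```typescript
/**
 * Time to fill one batch: the oldest event in a batch waits roughly this long
 * before the packet carrying it can be sent. Transmission time is ignored.
 */
function batchFillTimeMs(sampleIntervalMs: number, batchSize: number): number {
  return sampleIntervalMs * batchSize
}

/**
 * Largest batch that fills within a latency budget. Returns at least 1, since
 * one event per packet is the lowest-latency option available.
 */
function maxBatchSize(sampleIntervalMs: number, budgetMs: number): number {
  return Math.max(1, Math.floor(budgetMs / sampleIntervalMs))
}

console.log(Math.round(batchFillTimeMs(1000, 512) / 1000)) // 512 seconds, over 8 minutes
console.log(maxBatchSize(1, 100)) // 100 events per packet for 1 kHz data that still feels 'pretty fast'
console.log(maxBatchSize(1, 16)) // 16 events per packet fills within one 60 Hz frame
```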
This example demonstrates sending batches of data recorded on hardware. It then explores delta compression as a technique to reduce bandwidth even further.
We'll just be sending noise in this example, but the technique is intended to be used with ping-pong DMA buffers.
Firmware
In the firmware we'll define a struct, `StreamableADCBuffer_t`, which contains a `uint32_t` timestamp as well as a buffer of 512 `uint32_t` data points.
Every 100ms, a new random walk of data will be generated, timestamped, and sent to the UI.
```cpp
#include "electricui.h"

// Used to mock DMA based ADC behaviour
bool adc_ping_pong = false;
uint32_t mock_data_ready = 0;

#define ADC_SAMPLES_PER_BATCH 512
#define ADC_SAMPLE_INTERVAL 100

// Packet for sequential ADC data
typedef struct
{
  uint32_t timestamp;
  uint32_t buffer[ADC_SAMPLES_PER_BATCH];
} StreamableADCBuffer_t;

StreamableADCBuffer_t group_a;
StreamableADCBuffer_t group_b;
bool group_a_ready = false;
bool group_b_ready = false;

void hal_adc_postprocess( uint32_t *data, uint32_t length, StreamableADCBuffer_t *output );
StreamableADCBuffer_t* hal_adc_read_block( void );
void serial_rx_handler( void );
void serial_write( uint8_t *data, uint16_t len );

StreamableADCBuffer_t processed_adc;
char nickname[15] = "HelloMOCK fADC";

eui_interface_t serial_comms = EUI_INTERFACE( &serial_write );

eui_message_t tracked_variables[] =
{
  EUI_CUSTOM_RO( "bulk", processed_adc ),
  EUI_CHAR_ARRAY_RO( "name", nickname ),
};

void hal_adc_postprocess( uint32_t *data, uint32_t length, StreamableADCBuffer_t *output )
{
  output->timestamp = millis();

  // Copy every sample, including the first one
  for( uint32_t i = 0; i < length; i++ )
  {
    output->buffer[i] = data[i];
  }
}

StreamableADCBuffer_t* hal_adc_read_block( void )
{
  if( group_a_ready )
  {
    group_a_ready = false;
    return &group_a;
  }

  if( group_b_ready )
  {
    group_b_ready = false;
    return &group_b;
  }

  return 0;
}

void setup()
{
  Serial.begin( 115200 );
  pinMode( LED_BUILTIN, OUTPUT );

  eui_setup_interface( &serial_comms );
  EUI_TRACK( tracked_variables );
  eui_setup_identifier( "highspeed", 9 );

  mock_data_ready = millis();
}

uint32_t adc_buffer[ADC_SAMPLES_PER_BATCH+1] = { 0 };

void loop()
{
  serial_rx_handler();  // check for new inbound data

  // Every 100ms, emit a new batch of data
  if( millis() - mock_data_ready >= ADC_SAMPLE_INTERVAL )
  {
    // Generate the initial point
    adc_buffer[0] = random( 0, 5000 );

    for( uint16_t i = 1; i < ADC_SAMPLES_PER_BATCH; i++ )
    {
      // Random walk all subsequent points, using signed maths so underflow is visible
      int32_t next = (int32_t)adc_buffer[i-1] + (int32_t)random( -5000, 5000 );

      // Saturate to 0 if underflowing, at the cost of half our precision in this toy example
      if( next < 0 )
      {
        next = 0;
      }

      adc_buffer[i] = next;
    }

    if( adc_ping_pong )
    {
      // On hardware, this is the 'front' half of the buffer
      hal_adc_postprocess( &adc_buffer[0], ADC_SAMPLES_PER_BATCH, &group_a );
      group_a_ready = true;
    }
    else
    {
      // On hardware, this would be the tail half of the buffer
      hal_adc_postprocess( &adc_buffer[0], ADC_SAMPLES_PER_BATCH, &group_b );
      group_b_ready = true;
    }

    adc_ping_pong = !adc_ping_pong;  // alternate halves for the next batch
    mock_data_ready = millis();
  }

  StreamableADCBuffer_t *data_ready = hal_adc_read_block();

  if( data_ready )
  {
    memcpy( &processed_adc, data_ready, sizeof(StreamableADCBuffer_t) );
    eui_send_tracked( "bulk" );
  }
}

void serial_rx_handler( void )
{
  // While we have data, we will pass those bytes to the ElectricUI parser
  while( Serial.available() > 0 )
  {
    eui_parse( Serial.read(), &serial_comms );  // Ingest a byte
  }
}

void serial_write( uint8_t *data, uint16_t len )
{
  Serial.write( data, len );  // output on the main serial port
}
```
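The firmware sends `processed_adc` as raw struct memory, so each `bulk` payload is a 4-byte timestamp followed by 512 4-byte samples, 2,052 bytes in total. As a sketch for exercising the UI without hardware, a hypothetical `encodeRawBatch` helper can build the same bytes, assuming a little-endian microcontroller and no struct padding.

```typescript
// Mirrors the firmware's layout:
//   typedef struct { uint32_t timestamp; uint32_t buffer[512]; } StreamableADCBuffer_t;
const ADC_SAMPLES_PER_BATCH = 512

/** Hypothetical test helper: the bytes one StreamableADCBuffer_t occupies (little-endian, unpadded). */
function encodeRawBatch(timestamp: number, samples: number[]): Uint8Array {
  if (samples.length !== ADC_SAMPLES_PER_BATCH) {
    throw new Error(`expected ${ADC_SAMPLES_PER_BATCH} samples, got ${samples.length}`)
  }
  const bytes = new Uint8Array(4 + 4 * ADC_SAMPLES_PER_BATCH)
  const view = new DataView(bytes.buffer)
  view.setUint32(0, timestamp, true) // true = little-endian
  samples.forEach((sample, i) => view.setUint32(4 + 4 * i, sample, true))
  return bytes
}

// Usage: a ramp of samples stamped at t = 1000 ms
const payload = encodeRawBatch(1000, Array.from({ length: ADC_SAMPLES_PER_BATCH }, (_, i) => i))
console.log(payload.byteLength) // 2052
```

In Node, `Buffer.from(payload.buffer)` wraps these bytes as a `Buffer`, the type the codec's `decode()` receives.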
User interface
The codec reads the `uint32_t` timestamp, uses a built-in utility to exchange it for a UI timestamp, then reads out the buffer of `uint32_t` data.
```typescript
// transport-manager/config/codecs.tsx
import { Codec, Message } from '@electricui/core'
import { HardwareMessageRetimer, HardwareTimeBasis } from '@electricui/protocol-binary-codecs'
import { SmartBuffer } from 'smart-buffer'

/**
 * #define ADC_SAMPLES_PER_BATCH 512
 * #define ADC_SAMPLE_INTERVAL 100
 *
 * typedef struct
 * {
 *   uint32_t timestamp;
 *   uint32_t buffer[ADC_SAMPLES_PER_BATCH];
 * } StreamableADCBuffer_t;
 */
export const ADC_SAMPLES_PER_BATCH = 512
export const ADC_SAMPLE_INTERVAL = 100 // must match the firmware's #define

export interface StreamableADCBuffer {
  // We'll change the name to indicate that the origin will be based on the UI's time origin
  offsetTimestamp: number,
  data: number[]
}

export class StreamableADCCodec extends Codec<StreamableADCBuffer> {
  private retimer: HardwareMessageRetimer

  constructor(timeBasis: HardwareTimeBasis) {
    super()
    this.retimer = new HardwareMessageRetimer(timeBasis)
  }

  filter(message: Message): boolean {
    return message.messageID === 'bulk'
  }

  encode(payload: StreamableADCBuffer): Buffer {
    throw new Error(`bulk is readonly`)
  }

  decode(payload: Buffer): StreamableADCBuffer {
    const reader = SmartBuffer.fromBuffer(payload)

    // Read out the timestamp from hardware
    const hardwareOriginTimestamp = reader.readUInt32LE()

    // Exchange the timestamp for one in the same time basis as the UI
    const offsetTimestamp = this.retimer.exchange(hardwareOriginTimestamp)

    const values: number[] = []

    // Read out each sample
    for (let index = 0; index < ADC_SAMPLES_PER_BATCH; index++) {
      values.push(reader.readUInt32LE())
    }

    return {
      offsetTimestamp,
      data: values,
    }
  }
}
```
As with the hardware timestamping example, the codec is passed a hardware time basis.
```typescript
// transport-manager/config/serial.tsx
import { CodecDuplexPipelineWithDefaults, HardwareTimeBasis } from '@electricui/protocol-binary-codecs'
import { StreamableADCCodec } from './codecs'

// ...omitted for brevity

const serialTransportFactory = new TransportFactory((options: SerialTransportOptions) => {
  const connectionInterface = new ConnectionInterface()

  // ...omitted for brevity

  const codecPipeline = new CodecDuplexPipelineWithDefaults()

  // Create the HardwareTimeBasis, specifying a uint32_t container
  const hardwareTimeBasis = new HardwareTimeBasis(32)

  // Create the instances of the codecs
  const customCodecs = [
    new StreamableADCCodec(hardwareTimeBasis),
  ]

  // Add custom codecs.
  codecPipeline.addCodecs(customCodecs)

  // ...omitted for brevity

  return connectionInterface.finalise()
})
```
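`new HardwareTimeBasis(32)` tells the retimer that the hardware timestamp lives in a 32-bit container. One reason that width matters: `millis()` stored in a `uint32_t` wraps back to zero after 2^32 ms, about 49.7 days. The sketch below uses a hypothetical `CounterUnwrapper` class and makes no claim about how `HardwareMessageRetimer` is implemented; it only shows how a known width lets a wrapping counter be extended into a monotonic timeline.

```typescript
/**
 * Illustrative only: extends a wrapping hardware counter (e.g. millis() in a
 * uint32_t) into a monotonic timeline, given the counter's width in bits.
 */
class CounterUnwrapper {
  private readonly modulus: number
  private last: number | null = null
  private wraps = 0

  constructor(bits: number) {
    this.modulus = 2 ** bits
  }

  /** Returns the raw counter value plus however many full wraps have been seen. */
  unwrap(raw: number): number {
    // A value smaller than the previous one is treated as a single wraparound,
    // which assumes packets arrive in order and less than one period apart.
    if (this.last !== null && raw < this.last) {
      this.wraps++
    }
    this.last = raw
    return raw + this.wraps * this.modulus
  }
}

const unwrapper = new CounterUnwrapper(32)
console.log(unwrapper.unwrap(4_294_967_200)) // 4294967200, just before the wrap
console.log(unwrapper.unwrap(50)) // 4294967346, continuing past 2^32
```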
Creating the events
A single packet contains 512 events, and each of them can be created in the custom message processor. The processor uses `api.emitIntervalAlignEnd`, which aligns the last event of the batch with the packet's timestamp.
The persistence engine is also configured to hold up to 250,000 events at a time.
```typescript
// transport-manager/index.tsx
import { Message } from '@electricui/core'
import {
  ElectronIPCRemoteQueryExecutor,
  MultiPersistenceEngineMemory,
  PersistenceEngineMemory,
  QueryableMessageIDProvider,
} from '@electricui/core-timeseries'
import { ADC_SAMPLE_INTERVAL, StreamableADCBuffer } from './config/codecs'

// ...omitted for brevity (deviceManager is defined elsewhere in the transport manager)

const multiPersistenceEngine = new MultiPersistenceEngineMemory()

const remoteQueryExecutor = new ElectronIPCRemoteQueryExecutor(
  multiPersistenceEngine,
)

const queryableMessageIDProvider = new QueryableMessageIDProvider(
  deviceManager,
  multiPersistenceEngine,
)

queryableMessageIDProvider.setCustomMessageProcessor(
  'bulk',
  (message: Message<StreamableADCBuffer>, api) => {
    if (!message.payload) {
      // If there's no payload, do nothing
      return
    }

    const packetTime = message.payload.offsetTimestamp

    // Emit every sample in the batch, with the last one aligned to the packet's timestamp
    api.emitIntervalAlignEnd(
      packetTime,
      ADC_SAMPLE_INTERVAL,
      message.payload.data,
    )
  },
)

// Keep up to 250,000 events for this message
queryableMessageIDProvider.setPersistenceEngineFactory(
  'bulk',
  dS => new PersistenceEngineMemory(250_000, dS.description),
)
```
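A minimal sketch of the idea behind aligning a batch to its end, assuming the interval argument is the spacing between consecutive events. `alignBatchToEnd` is a hypothetical name and this is not the library's implementation of `emitIntervalAlignEnd`; it only shows how the timestamps fall out.

```typescript
interface TimedSample {
  time: number
  value: number
}

/**
 * Hypothetical helper: the last value lands exactly on `endTime` and each
 * earlier value steps back by `interval`.
 */
function alignBatchToEnd(endTime: number, interval: number, values: number[]): TimedSample[] {
  const last = values.length - 1
  return values.map((value, i) => ({
    time: endTime - (last - i) * interval,
    value,
  }))
}

const aligned = alignBatchToEnd(1000, 10, [7, 8, 9])
console.log(aligned.map((sample) => sample.time)) // [ 980, 990, 1000 ]
```

Anchoring the end rather than the start means the newest sample carries the timestamp taken closest to when the batch was finished.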
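The events are accessed using the `useMessageDataSource` hook as normal. A 100ms delay is added to the real-time domain to prevent a 'pop in' effect: each batch only arrives once all of its events have been collected, so delaying the domain by one batch interval means the data has already arrived by the time it scrolls into view.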
```tsx
import React from 'react'
import {
  ChartContainer,
  LineChart,
  RealTimeDomain,
  TimeAxis,
  VerticalAxis,
} from '@electricui/components-desktop-charts'
import { useMessageDataSource } from '@electricui/core-timeseries'

const Page = () => {
  const bulkDataSource = useMessageDataSource('bulk')

  // A fragment is needed to return the title and the chart as siblings
  return (
    <>
      <div style={{ textAlign: 'center', marginBottom: '1em' }}>
        <b>Bulk sensor</b>
      </div>
      <ChartContainer>
        <LineChart dataSource={bulkDataSource} maxItems={10000} />
        <RealTimeDomain window={10000} delay={100} />
        <TimeAxis />
        <VerticalAxis />
      </ChartContainer>
    </>
  )
}
```
Each batch has been colored red or green:
Delta compression
Many sensor measurements change only a little from sample to sample. If the deltas can be represented with a smaller type than a single data point, the batch of data can be compressed: the initial value is sent, then each subsequent data point is represented by its signed delta from the previous point.
- A payload of 512 `uint32_t`s is a total of 2,048 bytes.
- A payload of 1 `uint32_t` then 511 `int16_t` deltas is a total of 1,026 bytes.
- A payload of 1 `uint32_t` then 511 `int8_t` deltas is a total of 515 bytes.
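Which of these applies depends on the largest sample-to-sample change. The random walk in this example steps by up to ±5000, so it needs `int16_t`; `int8_t` only works when consecutive samples differ by -128 to 127. A minimal sketch, using a hypothetical `planDeltaPayload` helper, picks the narrowest signed type that holds every delta and reports the resulting payload size.

```typescript
type DeltaType = 'int8' | 'int16' | 'int32'

interface DeltaWidth {
  type: DeltaType
  bytes: number
  min: number
  max: number
}

// Narrowest first, so find() returns the smallest type that fits
const DELTA_WIDTHS: DeltaWidth[] = [
  { type: 'int8', bytes: 1, min: -128, max: 127 },
  { type: 'int16', bytes: 2, min: -32_768, max: 32_767 },
  { type: 'int32', bytes: 4, min: -2_147_483_648, max: 2_147_483_647 },
]

/**
 * Hypothetical helper: the narrowest signed type holding every delta, and the
 * payload size of one uint32_t initial value followed by the deltas.
 */
function planDeltaPayload(samples: number[]): { type: DeltaType; payloadBytes: number } {
  if (samples.length === 0) {
    throw new Error('need at least one sample')
  }
  let minDelta = 0
  let maxDelta = 0
  for (let i = 1; i < samples.length; i++) {
    const delta = samples[i] - samples[i - 1]
    minDelta = Math.min(minDelta, delta)
    maxDelta = Math.max(maxDelta, delta)
  }
  const fit = DELTA_WIDTHS.find((w) => minDelta >= w.min && maxDelta <= w.max)
  if (fit === undefined) {
    throw new RangeError('deltas do not fit in 32 bits')
  }
  return { type: fit.type, payloadBytes: 4 + (samples.length - 1) * fit.bytes }
}

// Usage: 512 samples climbing by 100 per step fit in int8_t deltas
const slowRamp = Array.from({ length: 512 }, (_, i) => 1_000 + 100 * i)
console.log(planDeltaPayload(slowRamp)) // { type: 'int8', payloadBytes: 515 }
```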
Firmware change
This firmware has been modified to send the initial value and a series of deltas.
```cpp
#include "electricui.h"

// Used to mock DMA based ADC behaviour
bool adc_ping_pong = false;
uint32_t mock_data_ready = 0;

#define ADC_SAMPLES_PER_BATCH 512
#define ADC_SAMPLE_INTERVAL 100

// Packet for sequential ADC data: the first sample, then signed deltas
typedef struct
{
  uint32_t timestamp;
  uint32_t initial;
  int16_t buffer[ADC_SAMPLES_PER_BATCH - 1];
} StreamableADCBuffer_t;

StreamableADCBuffer_t group_a;
StreamableADCBuffer_t group_b;
bool group_a_ready = false;
bool group_b_ready = false;

void hal_adc_postprocess( uint32_t *data, uint32_t length, StreamableADCBuffer_t *output );
StreamableADCBuffer_t* hal_adc_read_block( void );
void serial_rx_handler( void );
void serial_write( uint8_t *data, uint16_t len );

StreamableADCBuffer_t processed_adc;
char nickname[15] = "HelloMOCK fADC";

eui_interface_t serial_comms = EUI_INTERFACE( &serial_write );

eui_message_t tracked_variables[] =
{
  EUI_CUSTOM_RO( "bulk", processed_adc ),
  EUI_CHAR_ARRAY_RO( "name", nickname ),
};

void hal_adc_postprocess( uint32_t *data, uint32_t length, StreamableADCBuffer_t *output )
{
  output->timestamp = millis();
  output->initial = data[0];

  uint32_t last_value = data[0];

  // buffer[] holds length - 1 deltas, so the delta for sample i goes in buffer[i-1].
  // Each delta must fit in an int16_t; this random walk stays within +/-5000.
  for( uint32_t i = 1; i < length; i++ )
  {
    output->buffer[i-1] = (int16_t)( (int32_t)data[i] - (int32_t)last_value );
    last_value = data[i];
  }
}

StreamableADCBuffer_t* hal_adc_read_block( void )
{
  if( group_a_ready )
  {
    group_a_ready = false;
    return &group_a;
  }

  if( group_b_ready )
  {
    group_b_ready = false;
    return &group_b;
  }

  return 0;
}

void setup()
{
  Serial.begin( 115200 );
  pinMode( LED_BUILTIN, OUTPUT );

  eui_setup_interface( &serial_comms );
  EUI_TRACK( tracked_variables );
  eui_setup_identifier( "highspeed", 9 );

  mock_data_ready = millis();
}

uint32_t adc_buffer[ADC_SAMPLES_PER_BATCH+1] = { 0 };

void loop()
{
  serial_rx_handler();  // check for new inbound data

  // Every 100ms, emit a new batch of data
  if( millis() - mock_data_ready >= ADC_SAMPLE_INTERVAL )
  {
    // Generate the initial point
    adc_buffer[0] = random( 0, 5000 );

    for( uint16_t i = 1; i < ADC_SAMPLES_PER_BATCH; i++ )
    {
      // Random walk all subsequent points, using signed maths so underflow is visible
      int32_t next = (int32_t)adc_buffer[i-1] + (int32_t)random( -5000, 5000 );

      // Saturate to 0 if underflowing, at the cost of half our precision in this toy example
      if( next < 0 )
      {
        next = 0;
      }

      adc_buffer[i] = next;
    }

    if( adc_ping_pong )
    {
      // On hardware, this is the 'front' half of the buffer
      hal_adc_postprocess( &adc_buffer[0], ADC_SAMPLES_PER_BATCH, &group_a );
      group_a_ready = true;
    }
    else
    {
      // On hardware, this would be the tail half of the buffer
      hal_adc_postprocess( &adc_buffer[0], ADC_SAMPLES_PER_BATCH, &group_b );
      group_b_ready = true;
    }

    adc_ping_pong = !adc_ping_pong;  // alternate halves for the next batch
    mock_data_ready = millis();
  }

  StreamableADCBuffer_t *data_ready = hal_adc_read_block();

  if( data_ready )
  {
    memcpy( &processed_adc, data_ready, sizeof(StreamableADCBuffer_t) );
    eui_send_tracked( "bulk" );
  }
}

void serial_rx_handler( void )
{
  // While we have data, we will pass those bytes to the ElectricUI parser
  while( Serial.available() > 0 )
  {
    eui_parse( Serial.read(), &serial_comms );  // Ingest a byte
  }
}

void serial_write( uint8_t *data, uint16_t len )
{
  Serial.write( data, len );  // output on the main serial port
}
```
This codec reads out the initial value, then reconstructs the rest of the buffer using the deltas.
```typescript
// transport-manager/config/codecs.tsx
import { Codec, Message } from '@electricui/core'
import { HardwareMessageRetimer, HardwareTimeBasis } from '@electricui/protocol-binary-codecs'
import { SmartBuffer } from 'smart-buffer'

/**
 * #define ADC_SAMPLES_PER_BATCH 512
 * #define ADC_SAMPLE_INTERVAL 100
 *
 * typedef struct
 * {
 *   uint32_t timestamp;
 *   uint32_t initial;
 *   int16_t buffer[ADC_SAMPLES_PER_BATCH - 1];
 * } StreamableADCBuffer_t;
 */
export const ADC_SAMPLES_PER_BATCH = 512
export const ADC_SAMPLE_INTERVAL = 100

export interface StreamableADCBuffer {
  // We'll change the name to indicate that the origin will be based on the UI's time origin
  offsetTimestamp: number,
  data: number[]
}

export class StreamableADCCodec extends Codec<StreamableADCBuffer> {
  private retimer: HardwareMessageRetimer

  constructor(timeBasis: HardwareTimeBasis) {
    super()
    this.retimer = new HardwareMessageRetimer(timeBasis)
  }

  filter(message: Message): boolean {
    return message.messageID === 'bulk'
  }

  encode(payload: StreamableADCBuffer): Buffer {
    throw new Error(`bulk is readonly`)
  }

  decode(payload: Buffer): StreamableADCBuffer {
    const reader = SmartBuffer.fromBuffer(payload)

    // Read out the timestamp from hardware
    const hardwareOriginTimestamp = reader.readUInt32LE()

    // Exchange the timestamp for one in the same time basis as the UI
    const offsetTimestamp = this.retimer.exchange(hardwareOriginTimestamp)

    // The first sample is sent in full
    const initial = reader.readUInt32LE()

    let current = initial
    const values: number[] = [current]

    // Each following sample is the previous one plus a signed 16-bit delta
    for (let index = 0; index < ADC_SAMPLES_PER_BATCH - 1; index++) {
      const offset = reader.readInt16LE()
      current += offset
      values.push(current)
    }

    return {
      offsetTimestamp,
      data: values,
    }
  }
}
```
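To check that the firmware's encoding and the codec's decoding agree without hardware, a round trip can be sketched in plain TypeScript. `encodeDeltaBatch` and `decodeDeltaBatch` are hypothetical helpers that assume a little-endian microcontroller and no padding between struct fields; unlike the C cast to `int16_t`, the encoder refuses deltas that would wrap.

```typescript
const ADC_SAMPLES_PER_BATCH = 512

/** Hypothetical helper mirroring: uint32_t timestamp; uint32_t initial; int16_t buffer[511]; */
function encodeDeltaBatch(timestamp: number, samples: number[]): Uint8Array {
  if (samples.length !== ADC_SAMPLES_PER_BATCH) {
    throw new Error(`expected ${ADC_SAMPLES_PER_BATCH} samples, got ${samples.length}`)
  }
  const bytes = new Uint8Array(8 + 2 * (ADC_SAMPLES_PER_BATCH - 1))
  const view = new DataView(bytes.buffer)
  view.setUint32(0, timestamp, true)
  view.setUint32(4, samples[0], true)
  for (let i = 1; i < samples.length; i++) {
    const delta = samples[i] - samples[i - 1]
    // The C cast would silently wrap an out-of-range delta; refuse it instead
    if (delta < -32_768 || delta > 32_767) {
      throw new RangeError(`delta ${delta} at index ${i} does not fit in an int16`)
    }
    view.setInt16(8 + 2 * (i - 1), delta, true)
  }
  return bytes
}

/** Library-free inverse following the same steps as the codec's decode(). */
function decodeDeltaBatch(bytes: Uint8Array): { timestamp: number; data: number[] } {
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
  const timestamp = view.getUint32(0, true)
  let current = view.getUint32(4, true)
  const data = [current]
  for (let i = 0; i < ADC_SAMPLES_PER_BATCH - 1; i++) {
    current += view.getInt16(8 + 2 * i, true)
    data.push(current)
  }
  return { timestamp, data }
}

// Usage: a random walk with steps of up to +/-5000, clamped at 0 like the firmware
const samples = [2500]
for (let i = 1; i < ADC_SAMPLES_PER_BATCH; i++) {
  const step = Math.floor(Math.random() * 10_000) - 5_000
  samples.push(Math.max(0, samples[i - 1] + step))
}
const encoded = encodeDeltaBatch(1234, samples)
const decoded = decodeDeltaBatch(encoded)
console.log(encoded.byteLength) // 1030
console.log(decoded.data.every((value, i) => value === samples[i])) // true
```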
The end result is visually the same, but each payload shrinks from 2,052 to 1,030 bytes, about half the original bandwidth.
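Both firmware versions call `Serial.begin( 115200 )`. If the link really runs at that rate (native USB CDC ports generally ignore the baud setting), a rough sketch of the required throughput shows why the saving matters. It assumes 8N1 framing (10 bits on the wire per byte), the 9 bytes of per-packet overhead used earlier, no struct padding, and ignores any extra framing a payload this large may need.

```typescript
// Per-packet framing from earlier in this section: 7 bytes of protocol overhead + 2-byte messageID
const FIXED_COST_BYTES = 7 + 2
const BATCHES_PER_SECOND = 1000 / 100 // one batch every ADC_SAMPLE_INTERVAL ms

// Assumes a UART at 115200 baud with 8N1 framing: 10 bits per byte on the wire
const LINK_BYTES_PER_SECOND = 115_200 / 10

function requiredBytesPerSecond(payloadBytes: number): number {
  return (payloadBytes + FIXED_COST_BYTES) * BATCHES_PER_SECOND
}

const raw = requiredBytesPerSecond(4 + 4 * 512) // timestamp + 512 uint32_t samples
const delta = requiredBytesPerSecond(4 + 4 + 2 * 511) // timestamp + initial + 511 int16_t deltas

console.log(raw, delta, LINK_BYTES_PER_SECOND) // 20610 10390 11520
```

Under these assumptions the uncompressed stream needs nearly twice what the UART can carry, while the delta-compressed stream just fits.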