Bulk event transfer

Latency and throughput form a tradeoff when data is sent across a bandwidth-constrained transport. Sending one event per packet achieves the lowest possible latency, at the cost of disproportionate overhead. As events become more frequent and bandwidth becomes more precious, it is desirable to reduce this overhead.

One way to achieve this is to pack many events' worth of data into a single packet. This comes at the cost of increased latency. If a packet contains 10 events, the hardware waits 10x longer before it can send a packet.

The Electric UI binary protocol has 7 bytes of overhead with large packet offsets disabled. The messageID is another fixed cost, let's say it's 2 bytes.

With a 4 byte payload, the packet totals 13 bytes. Sending 512 events costs 6,656 bytes.

By batching 512 events into a single packet, the fixed costs are paid only once. The new payload is 4 * 512 = 2048 bytes, totalling 2,057 bytes for the whole packet.

Sending events in batches of 512 is over 3x more bandwidth efficient than sending a packet per event.
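The arithmetic above can be reproduced with a small sketch. The constant and function names here are illustrative only and are not part of any Electric UI API; the 7 byte overhead and 2 byte messageID are the figures assumed in the text.

```typescript
// Figures assumed in the text above, not read from the protocol library
const PROTOCOL_OVERHEAD_BYTES = 7
const MESSAGE_ID_BYTES = 2

/** Total bytes on the wire to deliver `eventCount` events, `eventsPerPacket` at a time. */
function bytesOnWire(eventCount: number, eventsPerPacket: number, bytesPerEvent: number): number {
  const fixedCost = PROTOCOL_OVERHEAD_BYTES + MESSAGE_ID_BYTES
  const fullPackets = Math.floor(eventCount / eventsPerPacket)
  const remainder = eventCount % eventsPerPacket

  let total = fullPackets * (fixedCost + eventsPerPacket * bytesPerEvent)

  // A final, partially filled packet still pays the full fixed cost
  if (remainder > 0) {
    total += fixedCost + remainder * bytesPerEvent
  }

  return total
}

const unbatched = bytesOnWire(512, 1, 4) // 6656 bytes
const batched = bytesOnWire(512, 512, 4) // 2057 bytes
const ratio = unbatched / batched // roughly 3.24
```

The fixed cost is paid once per packet, so the saving grows with the batch size and shrinks as the per-event payload gets larger.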

This comes at the cost of 512x the latency. If events are collected once per second, a full batch takes 512 seconds to fill, adding over 8 minutes of latency. In a real-time system this would be unacceptable. If the hardware is trickling data into an aggregation system, however, that latency might not matter at all.

If the events are collected every millisecond instead, waiting half a second might be too long.
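As a rough sketch of this reasoning, the added latency can be estimated from the batch size and the event interval, and the relationship can be inverted to find the largest batch that fits a latency budget. Both function names are hypothetical, and the model follows the text's simplification that a batch of N events adds N intervals of latency.

```typescript
/** Approximate latency added by waiting for a full batch before sending. */
function batchLatencyMs(eventsPerPacket: number, eventIntervalMs: number): number {
  return eventsPerPacket * eventIntervalMs
}

/** Largest batch size that keeps the added latency within the budget (never less than 1). */
function maxBatchForBudget(latencyBudgetMs: number, eventIntervalMs: number): number {
  return Math.max(1, Math.floor(latencyBudgetMs / eventIntervalMs))
}

const slow = batchLatencyMs(512, 1000) // 512000 ms, a little over 8.5 minutes
const fast = batchLatencyMs(512, 1) // 512 ms
const fits = maxBatchForBudget(100, 1) // 100 events per packet for a 100 ms budget
```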

This tradeoff must be considered per project; there is no one-size-fits-all answer.

For some rough numbers:

  • a human will likely begin to context switch when waiting longer than 400ms. Above this threshold, a progress bar or spinner may be desirable.
  • 100ms between updates feels 'pretty fast'.
  • it takes about 16.7ms for a 60Hz monitor to refresh.
  • 10ms is the 'magic threshold' where a system can become an extension of the body's proprioception.

This example demonstrates sending batches of data recorded on hardware. It then explores delta compression as a technique to reduce bandwidth even further.

We'll just be sending noise in this example, but the technique is intended to be used with ping-pong DMA buffers.

Screenshot of component Retiming noise

Firmware

In the firmware we'll define a struct, StreamableADCBuffer_t, which contains a uint32_t timestamp as well as a buffer of 512 uint32_t data points.

Every 100ms, a new random walk of data will be generated, timestamped, and sent to the UI.

#include "electricui.h"

// Used to mock DMA based ADC behaviour
bool adc_ping_pong = false;
uint32_t mock_data_ready = 0;

#define ADC_SAMPLES_PER_BATCH 512
#define ADC_SAMPLE_INTERVAL 100

// Packet for sequential ADC data
typedef struct
{
  uint32_t timestamp;
  uint32_t buffer[ADC_SAMPLES_PER_BATCH];
} StreamableADCBuffer_t;

StreamableADCBuffer_t group_a;
StreamableADCBuffer_t group_b;
bool group_a_ready = false;
bool group_b_ready = false;

void hal_adc_postprocess( uint32_t *data, uint32_t length, StreamableADCBuffer_t *output );
StreamableADCBuffer_t* hal_adc_read_block( void );

StreamableADCBuffer_t processed_adc;

char nickname[15] = "HelloMOCK fADC";

eui_interface_t serial_comms = EUI_INTERFACE( &serial_write );

eui_message_t tracked_variables[] =
{
  EUI_CUSTOM_RO( "bulk", processed_adc ),
  EUI_CHAR_ARRAY_RO( "name", nickname ),
};

void hal_adc_postprocess( uint32_t *data, uint32_t length, StreamableADCBuffer_t *output )
{
  output->timestamp = millis();

  // Copy every sample, starting from index 0
  for( uint32_t i = 0; i < length; i++ )
  {
    output->buffer[i] = data[i];
  }
}

StreamableADCBuffer_t* hal_adc_read_block( void )
{
  if( group_a_ready )
  {
    group_a_ready = false;
    return &group_a;
  }

  if( group_b_ready )
  {
    group_b_ready = false;
    return &group_b;
  }

  return 0;
}

void setup()
{
  Serial.begin( 115200 );
  pinMode( LED_BUILTIN, OUTPUT );

  eui_setup_interface( &serial_comms );
  EUI_TRACK( tracked_variables );
  eui_setup_identifier( "highspeed", 9 );

  mock_data_ready = millis();
}

uint32_t adc_buffer[ADC_SAMPLES_PER_BATCH+1] = { 0 };

void loop()
{
  serial_rx_handler(); // check for new inbound data

  // Every 100ms, emit a new batch of data
  if( millis() - mock_data_ready >= ADC_SAMPLE_INTERVAL )
  {
    // Generate the initial point
    adc_buffer[0] = random(0, 5000);

    for( uint16_t i = 1; i < ADC_SAMPLES_PER_BATCH; i++ )
    {
      // Random walk all subsequent points, using signed arithmetic so a negative step is detectable
      int32_t next = (int32_t)adc_buffer[i-1] + random(-5000, 5000);

      // Saturate to 0 if underflowing at the cost of half our precision in this toy example
      if (next < 0)
      {
        next = 0;
      }

      adc_buffer[i] = next;
    }

    if( adc_ping_pong )
    {
      // On hardware, this is the 'front' half of the buffer
      hal_adc_postprocess( &adc_buffer[0], ADC_SAMPLES_PER_BATCH, &group_a );
      group_a_ready = true;
    }
    else
    {
      // On hardware, this would be the tail half of the buffer
      hal_adc_postprocess( &adc_buffer[0], ADC_SAMPLES_PER_BATCH, &group_b );
      group_b_ready = true;
    }

    // Swap to the other group for the next batch
    adc_ping_pong = !adc_ping_pong;
    mock_data_ready = millis();
  }

  StreamableADCBuffer_t *data_ready;
  data_ready = hal_adc_read_block();

  if( data_ready )
  {
    memcpy( &processed_adc, data_ready, sizeof(StreamableADCBuffer_t) );
    eui_send_tracked("bulk");
  }
}

void serial_rx_handler()
{
  // While we have data, we will pass those bytes to the ElectricUI parser
  while( Serial.available() > 0 )
  {
    eui_parse( Serial.read(), &serial_comms ); // Ingest a byte
  }
}

void serial_write( uint8_t *data, uint16_t len )
{
  Serial.write( data, len ); // output on the main serial port
}

User interface

The codec will read the uint32_t timestamp, use a built-in utility to exchange it for a UI timestamp, then read out the buffer of uint32_t data.

// transport-manager/config/codecs.tsx
import { Codec, Message } from '@electricui/core'
import { HardwareMessageRetimer, HardwareTimeBasis } from '@electricui/protocol-binary-codecs'
import { timing } from '@electricui/timing'
import { SmartBuffer } from 'smart-buffer'

/**
 * #define ADC_SAMPLES_PER_BATCH 512
 * #define ADC_SAMPLE_INTERVAL 100
 *
 * typedef struct
 * {
 *   uint32_t timestamp;
 *   uint32_t buffer[ADC_SAMPLES_PER_BATCH];
 * } StreamableADCBuffer_t;
 */
export const ADC_SAMPLES_PER_BATCH = 512
// Must match the firmware's ADC_SAMPLE_INTERVAL, in milliseconds
export const ADC_SAMPLE_INTERVAL = 100

export interface StreamableADCBuffer {
  // We'll change the name to indicate that the origin will be based on the UI's time origin
  offsetTimestamp: number,
  data: number[]
}

export class StreamableADCCodec extends Codec<StreamableADCBuffer> {
  private retimer: HardwareMessageRetimer

  constructor(timeBasis: HardwareTimeBasis) {
    super()
    this.retimer = new HardwareMessageRetimer(timeBasis)
  }

  filter(message: Message): boolean {
    return message.messageID === 'bulk'
  }

  encode(payload: StreamableADCBuffer): Buffer {
    throw new Error(`bulk is readonly`)
  }

  decode(payload: Buffer): StreamableADCBuffer {
    const reader = SmartBuffer.fromBuffer(payload)

    // Read out the timestamp from hardware
    const hardwareOriginTimestamp = reader.readUInt32LE()

    // Exchange the timestamp for one in the same time basis as the UI
    const offsetTimestamp = this.retimer.exchange(hardwareOriginTimestamp)

    const values: number[] = []

    // Read out each sample
    for (let index = 0; index < ADC_SAMPLES_PER_BATCH; index++) {
      values.push(reader.readUInt32LE())
    }

    return {
      offsetTimestamp,
      data: values,
    }
  }
}

As with the hardware timestamping example, the codec is passed a hardware time basis.

// transport-manager/config/serial.tsx
import { CodecDuplexPipelineWithDefaults, HardwareTimeBasis } from '@electricui/protocol-binary-codecs'
import { StreamableADCCodec } from './codecs'

// ...omitted for brevity

const serialTransportFactory = new TransportFactory((options: SerialTransportOptions) => {
  const connectionInterface = new ConnectionInterface()

  // ...omitted for brevity

  const codecPipeline = new CodecDuplexPipelineWithDefaults()

  // Create the HardwareTimeBasis, specifying a uint32_t container
  const hardwareTimeBasis = new HardwareTimeBasis(32)

  // Create the instances of the codecs
  const customCodecs = [
    new StreamableADCCodec(hardwareTimeBasis),
  ]

  // Add custom codecs.
  codecPipeline.addCodecs(customCodecs)

  // ...omitted for brevity

  return connectionInterface.finalise()
})

Creating the events

A single packet contains 512 events. Each of these events can be created in the custom processor.

The processor spaces the events evenly across the batch interval, aligning the last event of the batch with the time that the packet is sent.
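To make the alignment concrete, here is a minimal standalone sketch of the timing calculation with a small batch. The function name batchEventTimes is hypothetical and is not part of the Electric UI API; it only illustrates the arithmetic.

```typescript
/**
 * Timestamps for each sample in a batch: evenly spaced across the batch
 * interval, with the final sample landing exactly on packetTime.
 */
function batchEventTimes(packetTime: number, samplesPerBatch: number, batchIntervalMs: number): number[] {
  const times: number[] = []

  for (let index = 0; index < samplesPerBatch; index++) {
    times.push(packetTime - batchIntervalMs + ((index + 1) / samplesPerBatch) * batchIntervalMs)
  }

  return times
}

// A 4 sample batch covering 100ms, stamped at t = 1000ms
const times = batchEventTimes(1000, 4, 100) // [925, 950, 975, 1000]
```

With 512 samples over a 100ms batch, the same calculation spaces consecutive events roughly 0.2ms apart.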

The persistence engine will also be modified to hold up to 250k events at a time.

// transport-manager/index.tsx
import { Message } from '@electricui/core'
import {
  ElectronIPCRemoteQueryExecutor,
  Event,
  QueryableMessageIDProvider,
} from '@electricui/core-timeseries'
import {
  ADC_SAMPLES_PER_BATCH,
  ADC_SAMPLE_INTERVAL,
  StreamableADCBuffer,
} from './config/codecs'

const remoteQueryExecutor = new ElectronIPCRemoteQueryExecutor()
const queryableMessageIDProvider = new QueryableMessageIDProvider(deviceManager, remoteQueryExecutor)

queryableMessageIDProvider.setCustomProcessor('bulk', (message: Message<StreamableADCBuffer>, emit) => {
  if (!message.payload) {
    // If there's no payload, do nothing
    return
  }

  const packetTime = message.payload.offsetTimestamp

  for (let index = 0; index < ADC_SAMPLES_PER_BATCH; index++) {
    // Calculate the time of this event, lining up the last event with the time of the packet
    const eventTime = packetTime - ADC_SAMPLE_INTERVAL + ((index + 1) / ADC_SAMPLES_PER_BATCH) * ADC_SAMPLE_INTERVAL
    const eventData = message.payload.data[index]

    // Emit an event at this time
    emit(new Event(eventTime, eventData))
  }
})

queryableMessageIDProvider.setPersistenceEngineFactory('bulk', dS => new PersistenceEngineMemory(250_000, dS.description))

The events are accessed using the useMessageDataSource hook as normal. Because each packet delivers the previous 100ms of events at once, a 100ms delay is added to the real-time domain to prevent the 'pop in' effect.

import React from 'react'
import {
  ChartContainer,
  LineChart,
  RealTimeDomain,
  TimeAxis,
  VerticalAxis,
} from '@electricui/components-desktop-charts'
import { useMessageDataSource } from '@electricui/core-timeseries'

const Page = () => {
  const bulkDataSource = useMessageDataSource('bulk')

  // A component must return a single root element, so wrap the siblings in a fragment
  return (
    <React.Fragment>
      <div style={{ textAlign: 'center', marginBottom: '1em' }}>
        <b>Bulk sensor</b>
      </div>
      <ChartContainer>
        <LineChart dataSource={bulkDataSource} maxItems={10000} />
        <RealTimeDomain window={10000} delay={100} />
        <TimeAxis />
        <VerticalAxis />
      </ChartContainer>
    </React.Fragment>
  )
}

Each batch has been colored red or green:

Screenshot of component Retiming noise-red-green

Delta compression

Many sensor measurements won't change by much, sample to sample. If the deltas can be represented with a smaller type than a single data point, the batch of data can be compressed. The initial value can be sent, then each subsequent data point can be represented by a delta from the previous point.

  • A payload of 512 uint32_ts is a total of 2,048 bytes.
  • A payload of 1 uint32_t then 511 int16_t deltas is a total of 1,026 bytes.
  • A payload of 1 uint32_t then 511 int8_t deltas is a total of 515 bytes.
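The idea can be sketched in isolation as a round trip: encode a batch as an initial value plus signed 16-bit deltas, then rebuild the samples from it. The names DeltaBatch, deltaEncode, deltaDecode and payloadBytes are illustrative and are not taken from the firmware or the Electric UI libraries. The sketch also assumes that a delta outside the int16_t range should be rejected rather than silently wrapped.

```typescript
const INT16_MIN = -32768
const INT16_MAX = 32767

interface DeltaBatch {
  initial: number // first sample, sent in full (a uint32_t on the wire)
  deltas: Int16Array // every later sample as a signed difference from its predecessor
}

function deltaEncode(samples: number[]): DeltaBatch {
  if (samples.length === 0) {
    throw new RangeError('at least one sample is required')
  }

  const deltas = new Int16Array(samples.length - 1)

  for (let i = 1; i < samples.length; i++) {
    const delta = samples[i] - samples[i - 1]

    if (delta < INT16_MIN || delta > INT16_MAX) {
      throw new RangeError(`delta ${delta} at index ${i} does not fit in an int16_t`)
    }

    // The delta for sample i is stored at index i - 1
    deltas[i - 1] = delta
  }

  return { initial: samples[0], deltas }
}

function deltaDecode(batch: DeltaBatch): number[] {
  const samples: number[] = [batch.initial]
  let current = batch.initial

  for (let i = 0; i < batch.deltas.length; i++) {
    current += batch.deltas[i]
    samples.push(current)
  }

  return samples
}

/** Payload size in bytes: a 4 byte initial value plus 2 bytes per delta. */
function payloadBytes(batch: DeltaBatch): number {
  return 4 + batch.deltas.byteLength
}

const encoded = deltaEncode([1000, 1200, 900, 900]) // initial 1000, deltas 200, -300, 0
const decoded = deltaDecode(encoded) // [1000, 1200, 900, 900]
const size = payloadBytes(encoded) // 10 bytes
```

For a 512 sample batch, payloadBytes gives 4 + 511 * 2 = 1,026 bytes, matching the figure in the list above. The scheme only works if every sample-to-sample change fits in the delta type, so the type should be chosen from the largest step the signal can take.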

Firmware change

This firmware has been modified to send the initial value and a series of deltas.

#include "electricui.h"

// Used to mock DMA based ADC behaviour
bool adc_ping_pong = false;
uint32_t mock_data_ready = 0;

#define ADC_SAMPLES_PER_BATCH 512
#define ADC_SAMPLE_INTERVAL 100

// Packet for sequential ADC data
typedef struct
{
  uint32_t timestamp;
  uint32_t initial;
  int16_t buffer[ADC_SAMPLES_PER_BATCH - 1];
} StreamableADCBuffer_t;

StreamableADCBuffer_t group_a;
StreamableADCBuffer_t group_b;
bool group_a_ready = false;
bool group_b_ready = false;

void hal_adc_postprocess( uint32_t *data, uint32_t length, StreamableADCBuffer_t *output );
StreamableADCBuffer_t* hal_adc_read_block( void );

StreamableADCBuffer_t processed_adc;

char nickname[15] = "HelloMOCK fADC";

eui_interface_t serial_comms = EUI_INTERFACE( &serial_write );

eui_message_t tracked_variables[] =
{
  EUI_CUSTOM_RO( "bulk", processed_adc ),
  EUI_CHAR_ARRAY_RO( "name", nickname ),
};

void hal_adc_postprocess( uint32_t *data, uint32_t length, StreamableADCBuffer_t *output )
{
  output->timestamp = millis();
  output->initial = data[0];

  uint32_t last_value = data[0];

  for( uint32_t i = 1; i < length; i++ )
  {
    // The delta for sample i is stored at index i - 1, as the buffer holds length - 1 deltas
    output->buffer[i - 1] = (int16_t)( (int32_t)data[i] - (int32_t)last_value );
    last_value = data[i];
  }
}

StreamableADCBuffer_t* hal_adc_read_block( void )
{
  if( group_a_ready )
  {
    group_a_ready = false;
    return &group_a;
  }

  if( group_b_ready )
  {
    group_b_ready = false;
    return &group_b;
  }

  return 0;
}

void setup()
{
  Serial.begin( 115200 );
  pinMode( LED_BUILTIN, OUTPUT );

  eui_setup_interface( &serial_comms );
  EUI_TRACK( tracked_variables );
  eui_setup_identifier( "highspeed", 9 );

  mock_data_ready = millis();
}

uint32_t adc_buffer[ADC_SAMPLES_PER_BATCH+1] = { 0 };

void loop()
{
  serial_rx_handler(); // check for new inbound data

  // Every 100ms, emit a new batch of data
  if( millis() - mock_data_ready >= ADC_SAMPLE_INTERVAL )
  {
    // Generate the initial point
    adc_buffer[0] = random(0, 5000);

    for( uint16_t i = 1; i < ADC_SAMPLES_PER_BATCH; i++ )
    {
      // Random walk all subsequent points, using signed arithmetic so a negative step is detectable
      int32_t next = (int32_t)adc_buffer[i-1] + random(-5000, 5000);

      // Saturate to 0 if underflowing at the cost of half our precision in this toy example
      if (next < 0)
      {
        next = 0;
      }

      adc_buffer[i] = next;
    }

    if( adc_ping_pong )
    {
      // On hardware, this is the 'front' half of the buffer
      hal_adc_postprocess( &adc_buffer[0], ADC_SAMPLES_PER_BATCH, &group_a );
      group_a_ready = true;
    }
    else
    {
      // On hardware, this would be the tail half of the buffer
      hal_adc_postprocess( &adc_buffer[0], ADC_SAMPLES_PER_BATCH, &group_b );
      group_b_ready = true;
    }

    // Swap to the other group for the next batch
    adc_ping_pong = !adc_ping_pong;
    mock_data_ready = millis();
  }

  StreamableADCBuffer_t *data_ready;
  data_ready = hal_adc_read_block();

  if( data_ready )
  {
    memcpy( &processed_adc, data_ready, sizeof(StreamableADCBuffer_t) );
    eui_send_tracked("bulk");
  }
}

void serial_rx_handler()
{
  // While we have data, we will pass those bytes to the ElectricUI parser
  while( Serial.available() > 0 )
  {
    eui_parse( Serial.read(), &serial_comms ); // Ingest a byte
  }
}

void serial_write( uint8_t *data, uint16_t len )
{
  Serial.write( data, len ); // output on the main serial port
}

This codec reads out the initial value, then reconstructs the rest of the buffer using the deltas.

// transport-manager/config/codecs.tsx
import { Codec, Message } from '@electricui/core'
import { HardwareMessageRetimer, HardwareTimeBasis } from '@electricui/protocol-binary-codecs'
import { timing } from '@electricui/timing'
import { SmartBuffer } from 'smart-buffer'

/**
 * #define ADC_SAMPLES_PER_BATCH 512
 * #define ADC_SAMPLE_INTERVAL 100
 *
 * typedef struct
 * {
 *   uint32_t timestamp;
 *   uint32_t initial;
 *   int16_t buffer[ADC_SAMPLES_PER_BATCH - 1];
 * } StreamableADCBuffer_t;
 */
export const ADC_SAMPLES_PER_BATCH = 512
export const ADC_SAMPLE_INTERVAL = 100

export interface StreamableADCBuffer {
  // We'll change the name to indicate that the origin will be based on the UI's time origin
  offsetTimestamp: number,
  data: number[]
}

export class StreamableADCCodec extends Codec<StreamableADCBuffer> {
  private retimer: HardwareMessageRetimer

  constructor(timeBasis: HardwareTimeBasis) {
    super()
    this.retimer = new HardwareMessageRetimer(timeBasis)
  }

  filter(message: Message): boolean {
    return message.messageID === 'bulk'
  }

  encode(payload: StreamableADCBuffer): Buffer {
    throw new Error(`bulk is readonly`)
  }

  decode(payload: Buffer): StreamableADCBuffer {
    const reader = SmartBuffer.fromBuffer(payload)

    // Read out the timestamp from hardware
    const hardwareOriginTimestamp = reader.readUInt32LE()

    // Exchange the timestamp for one in the same time basis as the UI
    const offsetTimestamp = this.retimer.exchange(hardwareOriginTimestamp)

    // The first sample is sent in full
    const initial = reader.readUInt32LE()
    let current = initial
    const values: number[] = [current]

    // Rebuild each remaining sample by accumulating the signed deltas
    for (let index = 0; index < ADC_SAMPLES_PER_BATCH - 1; index++) {
      const offset = reader.readInt16LE()
      current += offset
      values.push(current)
    }

    return {
      offsetTimestamp,
      data: values,
    }
  }
}

The end result is visually the same, but uses a fraction of the original bandwidth.