-
Notifications
You must be signed in to change notification settings - Fork 5.7k
Expand file tree
/
Copy pathret_queue_length.rs
More file actions
354 lines (306 loc) · 12.6 KB
/
ret_queue_length.rs
File metadata and controls
354 lines (306 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
//! Queue length estimation — ADR-041 Category 4: Retail & Hospitality.
//!
//! Estimates queue length from sequential presence detection using CSI data.
//! Tracks join rate (lambda) and service rate (mu), then applies Little's Law
//! (L = lambda * W) to estimate average wait time.
//!
//! Events (400-series):
//! - `QUEUE_LENGTH(400)`: Current estimated queue length
//! - `WAIT_TIME_ESTIMATE(401)`: Estimated wait time in seconds
//! - `SERVICE_RATE(402)`: Service rate (persons/minute)
//! - `QUEUE_ALERT(403)`: Queue threshold exceeded
//!
//! Host API used: presence, n_persons, variance, motion energy.
use crate::vendor_common::Ema;
#[cfg(not(feature = "std"))]
use libm::fabsf;
#[cfg(feature = "std")]
/// Absolute value of `x`, exposed under the same name as `libm::fabsf` so the
/// rest of the module can call `fabsf` regardless of which build is active.
fn fabsf(x: f32) -> f32 {
    f32::abs(x)
}
// ── Event IDs ─────────────────────────────────────────────────────────────────
/// Event ID: current estimated queue length (persons).
pub const EVENT_QUEUE_LENGTH: i32 = 400;
/// Event ID: estimated wait time, in seconds.
pub const EVENT_WAIT_TIME_ESTIMATE: i32 = 401;
/// Event ID: smoothed service rate, in persons per minute.
pub const EVENT_SERVICE_RATE: i32 = 402;
/// Event ID: queue length reached the alert threshold.
pub const EVENT_QUEUE_ALERT: i32 = 403;
// ── Configuration constants ──────────────────────────────────────────────────
/// Assumed frame rate (Hz); used to convert a frame count into minutes when
/// computing arrival and service rates.
const FRAME_RATE: f32 = 20.0;
/// Number of frames between periodic `QUEUE_LENGTH` reports (~1 s at 20 Hz).
const REPORT_INTERVAL: u32 = 20;
/// Number of frames per service-rate computation window (~30 s at 20 Hz).
/// Arrival/departure counters are reset each time a window completes.
const SERVICE_WINDOW_FRAMES: u32 = 600;
/// EMA smoothing factor for the queue length estimate.
const QUEUE_EMA_ALPHA: f32 = 0.1;
/// EMA smoothing factor for the arrival and service rates.
const RATE_EMA_ALPHA: f32 = 0.05;
/// Minimum frame-to-frame change in variance required for a presence rising
/// edge (absent -> present) to be counted as an arrival.
const JOIN_VARIANCE_THRESH: f32 = 0.05;
/// Motion energy must be below this value for a presence falling edge
/// (present -> absent) to be counted as a departure.
const DEPART_MOTION_THRESH: f32 = 0.02;
/// Queue length (persons) at or above which `QUEUE_ALERT` fires. The alert
/// re-arms only once the queue drops below this threshold minus one.
const QUEUE_ALERT_THRESH: f32 = 5.0;
/// Upper bound applied to the reported integer queue length.
const MAX_QUEUE: usize = 20;
/// History window for arrival/departure events (60 seconds at 20 Hz).
/// NOTE(review): not referenced anywhere in this file — confirm whether it is
/// still needed.
const RATE_HISTORY: usize = 1200;
// ── Queue Length Estimator ───────────────────────────────────────────────────
/// Maximum number of events a single `process_frame` call can emit: queue
/// length, service rate, wait-time estimate and queue alert.
const MAX_EVENTS: usize = 4;

/// Estimates queue length from CSI presence and person-count data.
///
/// Feed one frame at a time through [`QueueLengthEstimator::process_frame`].
/// Each call updates the smoothed queue length, counts arrivals and
/// departures for the current service window, and returns the events produced
/// by that frame.
///
/// Every instance owns its own event buffer, so several estimators can be
/// driven side by side without overwriting each other's results.
pub struct QueueLengthEstimator {
    /// Smoothed queue length estimate.
    queue_ema: Ema,
    /// Smoothed arrival rate (persons/minute).
    arrival_rate_ema: Ema,
    /// Smoothed service rate (persons/minute).
    service_rate_ema: Ema,
    /// Previous (non-negative) n_persons value for detecting joins/departures.
    prev_n_persons: i32,
    /// Previous presence state.
    prev_presence: bool,
    /// Running count of arrivals in current window.
    arrivals_in_window: u16,
    /// Running count of departures in current window.
    departures_in_window: u16,
    /// Frame counter (wraps on overflow).
    frame_count: u32,
    /// Window frame counter (resets every SERVICE_WINDOW_FRAMES).
    window_frame_count: u32,
    /// Previous variance value for detecting transient spikes.
    prev_variance: f32,
    /// Current best estimate of queue length (integer, capped at MAX_QUEUE).
    current_queue: u8,
    /// Alert already fired flag (prevents re-alerting same spike).
    alert_active: bool,
    /// Backing storage for the slice returned by `process_frame`.
    events: [(i32, f32); MAX_EVENTS],
}

impl Default for QueueLengthEstimator {
    fn default() -> Self {
        Self::new()
    }
}

impl QueueLengthEstimator {
    /// Creates an estimator with an empty queue, zeroed counters and no
    /// active alert.
    pub const fn new() -> Self {
        Self {
            queue_ema: Ema::new(QUEUE_EMA_ALPHA),
            arrival_rate_ema: Ema::new(RATE_EMA_ALPHA),
            service_rate_ema: Ema::new(RATE_EMA_ALPHA),
            prev_n_persons: 0,
            prev_presence: false,
            arrivals_in_window: 0,
            departures_in_window: 0,
            frame_count: 0,
            window_frame_count: 0,
            prev_variance: 0.0,
            current_queue: 0,
            alert_active: false,
            events: [(0, 0.0); MAX_EVENTS],
        }
    }

    /// Process one CSI frame with host-provided aggregate signals.
    ///
    /// - `presence`: greater than 0 if someone is present, otherwise absent
    /// - `n_persons`: estimated person count; negative values are treated as 0
    /// - `variance`: mean subcarrier variance (indicates motion)
    /// - `motion_energy`: aggregate motion energy
    ///
    /// Returns the events produced by this frame as `&[(event_type, value)]`.
    /// The slice borrows this estimator's own buffer and is overwritten by the
    /// next call on the same instance.
    pub fn process_frame(
        &mut self,
        presence: i32,
        n_persons: i32,
        variance: f32,
        motion_energy: f32,
    ) -> &[(i32, f32)] {
        // Wrapping keeps a long-running device from panicking in debug builds
        // when the counter eventually overflows.
        self.frame_count = self.frame_count.wrapping_add(1);
        self.window_frame_count += 1;

        let is_present = presence > 0;
        let n = n_persons.max(0);

        // Arrivals: the person count went up, or presence appeared together
        // with a variance jump.
        if n > self.prev_n_persons {
            let joined = Self::saturating_count(n - self.prev_n_persons);
            self.arrivals_in_window = self.arrivals_in_window.saturating_add(joined);
        } else if !self.prev_presence && is_present {
            let var_delta = fabsf(variance - self.prev_variance);
            if var_delta > JOIN_VARIANCE_THRESH {
                self.arrivals_in_window = self.arrivals_in_window.saturating_add(1);
            }
        }

        // Departures: the person count went down, or presence vanished while
        // motion energy is low.
        if n < self.prev_n_persons {
            let left = Self::saturating_count(self.prev_n_persons - n);
            self.departures_in_window = self.departures_in_window.saturating_add(left);
        } else if self.prev_presence && !is_present && motion_energy < DEPART_MOTION_THRESH {
            self.departures_in_window = self.departures_in_window.saturating_add(1);
        }

        self.prev_n_persons = n;
        self.prev_presence = is_present;
        self.prev_variance = variance;

        // Queue estimate: the person count smoothed by the EMA, rounded to the
        // nearest integer and capped at MAX_QUEUE.
        self.queue_ema.update(n as f32);
        let rounded = (self.queue_ema.value + 0.5) as u8;
        self.current_queue = rounded.min(MAX_QUEUE as u8);
        let queue = self.current_queue as f32;

        let mut ne = 0usize;

        // Periodic queue length report.
        if self.frame_count % REPORT_INTERVAL == 0 {
            self.push_event(&mut ne, EVENT_QUEUE_LENGTH, queue);
        }

        // Service window elapsed: compute and emit rates.
        if self.window_frame_count >= SERVICE_WINDOW_FRAMES {
            let window_minutes = self.window_frame_count as f32 / (FRAME_RATE * 60.0);
            if window_minutes > 0.0 {
                let arr_rate = self.arrivals_in_window as f32 / window_minutes;
                let svc_rate = self.departures_in_window as f32 / window_minutes;
                self.arrival_rate_ema.update(arr_rate);
                self.service_rate_ema.update(svc_rate);

                let service_rate = self.service_rate_ema.value;
                self.push_event(&mut ne, EVENT_SERVICE_RATE, service_rate);

                // Little's Law rearranged: W = L / lambda, with lambda
                // converted from persons/minute to persons/second so the wait
                // is in seconds. A near-zero arrival rate reports 0 wait.
                let arrival_rate = self.arrival_rate_ema.value;
                let wait_time = if arrival_rate > 0.1 {
                    queue / (arrival_rate / 60.0)
                } else {
                    0.0
                };
                self.push_event(&mut ne, EVENT_WAIT_TIME_ESTIMATE, wait_time);
            }

            // Reset window counters.
            self.window_frame_count = 0;
            self.arrivals_in_window = 0;
            self.departures_in_window = 0;
        }

        // Queue alert: fire once on reaching the threshold, re-arm only after
        // the queue has dropped a full person below it.
        if queue >= QUEUE_ALERT_THRESH && !self.alert_active {
            self.alert_active = true;
            self.push_event(&mut ne, EVENT_QUEUE_ALERT, queue);
        } else if queue < QUEUE_ALERT_THRESH - 1.0 {
            self.alert_active = false;
        }

        &self.events[..ne]
    }

    /// Get the current smoothed queue length.
    pub fn queue_length(&self) -> u8 {
        self.current_queue
    }

    /// Get the smoothed arrival rate (persons/minute).
    pub fn arrival_rate(&self) -> f32 {
        self.arrival_rate_ema.value
    }

    /// Get the smoothed service rate (persons/minute).
    pub fn service_rate(&self) -> f32 {
        self.service_rate_ema.value
    }

    /// Converts a person-count difference to `u16`, saturating instead of
    /// silently truncating when the difference exceeds `u16::MAX`.
    fn saturating_count(delta: i32) -> u16 {
        delta.max(0).min(u16::MAX as i32) as u16
    }

    /// Stores an event at index `*len` and advances `*len`; does nothing if
    /// the buffer is already full.
    fn push_event(&mut self, len: &mut usize, event_type: i32, value: f32) {
        if let Some(slot) = self.events.get_mut(*len) {
            *slot = (event_type, value);
            *len += 1;
        }
    }
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed estimator reports an empty queue and no alert.
    #[test]
    fn test_init_state() {
        let estimator = QueueLengthEstimator::new();
        assert_eq!(estimator.queue_length(), 0);
        assert_eq!(estimator.frame_count, 0);
        assert!(!estimator.alert_active);
    }

    /// With nobody present, only the periodic queue-length report is checked,
    /// and it must read approximately zero.
    #[test]
    fn test_empty_queue_no_events_except_periodic() {
        let mut estimator = QueueLengthEstimator::new();
        for frame in 1..=40 {
            let events = estimator.process_frame(0, 0, 0.0, 0.0);
            // Only reporting frames carry expectations.
            if frame % REPORT_INTERVAL != 0 {
                continue;
            }
            assert!(!events.is_empty(), "periodic report expected at frame {}", frame);
            let (event_type, value) = events[0];
            assert_eq!(event_type, EVENT_QUEUE_LENGTH);
            assert!(value < 0.5, "queue should be ~0");
        }
        assert_eq!(estimator.queue_length(), 0);
    }

    /// Holding the person count at 3 pulls the smoothed queue towards 3.
    #[test]
    fn test_queue_grows_with_persons() {
        let mut estimator = QueueLengthEstimator::new();
        (0..60).for_each(|_| {
            estimator.process_frame(1, 3, 0.1, 0.5);
        });
        assert!(estimator.queue_length() >= 2, "queue should track person count, got {}", estimator.queue_length());
    }

    /// Each increase in the person count is tallied as an arrival.
    #[test]
    fn test_arrival_detection() {
        let mut estimator = QueueLengthEstimator::new();
        // Empty scene, then one person, then a second person.
        let frames = [(0, 0, 0.0, 0.0), (1, 1, 0.1, 0.3), (1, 2, 0.15, 0.4)];
        for &(presence, n_persons, variance, motion_energy) in frames.iter() {
            estimator.process_frame(presence, n_persons, variance, motion_energy);
        }
        assert!(estimator.arrivals_in_window >= 2, "should detect at least 2 arrivals, got {}", estimator.arrivals_in_window);
    }

    /// Each decrease in the person count is tallied as a departure.
    #[test]
    fn test_departure_detection() {
        let mut estimator = QueueLengthEstimator::new();
        // Three people, then two, then one.
        let frames = [(1, 3, 0.1, 0.5), (1, 2, 0.08, 0.3), (1, 1, 0.05, 0.2)];
        for &(presence, n_persons, variance, motion_energy) in frames.iter() {
            estimator.process_frame(presence, n_persons, variance, motion_energy);
        }
        assert!(estimator.departures_in_window >= 2, "should detect departures, got {}", estimator.departures_in_window);
    }

    /// A sustained high person count eventually raises the queue alert.
    #[test]
    fn test_queue_alert() {
        let mut estimator = QueueLengthEstimator::new();
        let mut alert_fired = false;
        for _ in 0..200 {
            let events = estimator.process_frame(1, 8, 0.2, 0.8);
            if events.iter().any(|&(event_type, _)| event_type == EVENT_QUEUE_ALERT) {
                alert_fired = true;
            }
        }
        assert!(alert_fired, "queue alert should fire when queue >= {}", QUEUE_ALERT_THRESH);
    }

    /// Once a full service window has elapsed a service-rate event is emitted.
    #[test]
    fn test_service_rate_computation() {
        let mut estimator = QueueLengthEstimator::new();
        let mut service_rate_emitted = false;
        // Three people for the first 300 frames, one person afterwards.
        for frame in 0..=SERVICE_WINDOW_FRAMES {
            let n_persons = if frame < 300 { 3 } else { 1 };
            let events = estimator.process_frame(1, n_persons, 0.1, 0.3);
            if events.iter().any(|&(event_type, _)| event_type == EVENT_SERVICE_RATE) {
                service_rate_emitted = true;
            }
        }
        assert!(service_rate_emitted, "service rate should be emitted after window elapses");
    }

    /// Negative inputs must not panic and must leave the queue empty.
    #[test]
    fn test_negative_inputs_handled() {
        let mut estimator = QueueLengthEstimator::new();
        let _events = estimator.process_frame(-1, -5, -0.1, -0.5);
        assert_eq!(estimator.queue_length(), 0);
    }
}