From de34516fedc99bfb5300f61eea510d93b4aa1a15 Mon Sep 17 00:00:00 2001 From: Leopere Date: Sun, 15 Feb 2026 11:47:09 -0500 Subject: [PATCH] Replace pool with on-demand live camera entropy Every request now opens the camera, grabs fresh frames, conditions them with SHA-256, and returns. No pool, no background harvester, no buffered data. Camera not available = immediate error. - Delete src/entropy/pool/ (ring buffer, harvester, subscriber channels) - Add src/entropy/live.rs (extract_entropy, fill_entropy, stream_entropy) - Health endpoint now tests live camera reachability instead of pool stats - /stream opens a dedicated camera session per connection - Remove rand/rand_chacha dependencies (only used by removed CSPRNG) - Include monotonic nonce + timestamp in SHA-256 conditioning to guarantee unique output even on rapid back-to-back calls Co-authored-by: Cursor --- Cargo.lock | 294 +---------------------------- Cargo.toml | 2 - src/entropy/extract.rs | 4 +- src/entropy/live.rs | 187 ++++++++++++++++++ src/entropy/mod.rs | 13 +- src/entropy/pool/mod.rs | 387 -------------------------------------- src/entropy/pool/tests.rs | 96 ---------- src/lib.rs | 6 +- src/main.rs | 54 +++--- 9 files changed, 231 insertions(+), 812 deletions(-) create mode 100644 src/entropy/live.rs delete mode 100644 src/entropy/pool/mod.rs delete mode 100644 src/entropy/pool/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 8939b55..5614856 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,12 +11,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "anyhow" -version = "1.0.101" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" - [[package]] name = "arrayvec" version = "0.7.6" @@ -206,8 +200,6 @@ dependencies = [ "bytes", "hex", "nokhwa", - "rand", - "rand_chacha", "serde", "serde_json", "sha2", @@ -247,17 +239,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" -[[package]] -name = "chacha20" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" -dependencies = [ - "cfg-if 1.0.4", - "cpufeatures 0.3.0", - "rand_core", -] - [[package]] name = "clang-sys" version = "1.8.1" @@ -386,15 +367,6 @@ dependencies = [ "libc", ] -[[package]] -name = "cpufeatures" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" -dependencies = [ - "libc", -] - [[package]] name = "crypto-common" version = "0.1.7" @@ -427,12 +399,6 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" -[[package]] -name = "equivalent" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" - [[package]] name = "errno" version = "0.3.14" @@ -461,12 +427,6 @@ dependencies = [ "spin", ] -[[package]] -name = "foldhash" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" - [[package]] name = "foreign-types" version = "0.3.2" @@ -565,47 +525,12 @@ dependencies = [ "wasip2", ] -[[package]] -name = "getrandom" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "139ef39800118c7683f2fd3c98c1b23c09ae076556b435f8e9064ae108aaeeec" -dependencies = [ - "cfg-if 1.0.4", - "libc", - "r-efi", - "rand_core", - "wasip2", - "wasip3", -] - [[package]] name = "glob" version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" -[[package]] -name = "hashbrown" -version = "0.15.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" -dependencies = [ - "foldhash", -] - -[[package]] -name = "hashbrown" -version = "0.16.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" - -[[package]] -name = "heck" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" - [[package]] name = "hex" version = "0.4.3" @@ -703,12 +628,6 @@ dependencies = [ "tower-service", ] -[[package]] -name = "id-arena" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" - [[package]] name = "image" version = "0.25.9" @@ -721,18 +640,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "indexmap" -version = "2.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" -dependencies = [ - "equivalent", - "hashbrown 0.16.1", - "serde", - "serde_core", -] - [[package]] name = "itoa" version = "1.0.17" @@ -771,12 +678,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" -[[package]] -name = "leb128fmt" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" - [[package]] name = "libc" version = "0.2.180" @@ -1067,15 +968,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "ppv-lite86" -version = "0.2.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" -dependencies = [ - "zerocopy", -] - [[package]] name = "prettyplease" version = "0.2.37" @@ -1119,33 +1011,6 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" -[[package]] -name = "rand" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc266eb313df6c5c09c1c7b1fbe2510961e5bcd3add930c1e31f7ed9da0feff8" -dependencies = [ - "chacha20", - "getrandom 0.4.1", - "rand_core", -] - -[[package]] -name = "rand_chacha" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e6af7f3e25ded52c41df4e0b1af2d047e45896c2f3281792ed68a1c243daedb" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba" - [[package]] name = "regex" version = "1.12.2" @@ -1221,12 +1086,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "semver" -version = "1.0.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" - [[package]] name = "serde" version = "1.0.228" @@ -1300,7 +1159,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if 1.0.4", - "cpufeatures 0.2.17", + "cpufeatures", "digest", ] @@ -1467,12 +1326,6 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" -[[package]] -name = "unicode-xid" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" - [[package]] name = "v4l" version = "0.14.0" @@ -1514,15 +1367,6 @@ dependencies = [ "wit-bindgen", ] -[[package]] -name = "wasip3" -version = "0.4.0+wasi-0.3.0-rc-2026-01-06" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" -dependencies = [ - "wit-bindgen", -] - [[package]] name = "wasm-bindgen" version = "0.2.108" @@ -1568,40 +1412,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "wasm-encoder" -version = "0.244.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" -dependencies = [ - "leb128fmt", - "wasmparser", -] - -[[package]] -name = "wasm-metadata" -version = "0.244.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" -dependencies = [ - "anyhow", - "indexmap", - "wasm-encoder", - "wasmparser", -] - -[[package]] -name = "wasmparser" -version = "0.244.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" -dependencies = [ - "bitflags 2.10.0", - "hashbrown 0.15.5", - "indexmap", - "semver", -] - [[package]] name = "which" version = "4.4.2" @@ -1885,108 +1695,6 @@ name = "wit-bindgen" version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" -dependencies = [ - "wit-bindgen-rust-macro", -] - -[[package]] -name = "wit-bindgen-core" -version = "0.51.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" -dependencies = [ - "anyhow", - "heck", - "wit-parser", -] - -[[package]] -name = "wit-bindgen-rust" -version = "0.51.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" -dependencies = [ - "anyhow", - "heck", - "indexmap", - "prettyplease", - "syn", - "wasm-metadata", - "wit-bindgen-core", - "wit-component", -] - -[[package]] -name = "wit-bindgen-rust-macro" -version = "0.51.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" -dependencies = [ - "anyhow", - "prettyplease", - "proc-macro2", - "quote", - "syn", - "wit-bindgen-core", - "wit-bindgen-rust", -] - -[[package]] -name = "wit-component" -version = "0.244.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" -dependencies = [ - "anyhow", - "bitflags 2.10.0", - "indexmap", - "log", - "serde", - "serde_derive", - "serde_json", - "wasm-encoder", - "wasm-metadata", - "wasmparser", - "wit-parser", -] - -[[package]] -name = "wit-parser" -version = "0.244.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" -dependencies = [ - "anyhow", - "id-arena", - "indexmap", - "log", - "semver", - "serde", - "serde_derive", - "serde_json", - "unicode-xid", - "wasmparser", -] - -[[package]] -name = "zerocopy" -version = "0.8.39" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db6d35d663eadb6c932438e763b262fe1a70987f9ae936e60158176d710cae4a" -dependencies = [ - "zerocopy-derive", -] - -[[package]] -name = "zerocopy-derive" -version = "0.8.39" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4122cd3169e94605190e77839c9a40d40ed048d305bfdc146e7df40ab0f3e517" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] [[package]] name = "zmij" diff --git a/Cargo.toml b/Cargo.toml index b66cdac..353a02b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,8 +21,6 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" bytes = "1" async-stream = "0.3" -rand_chacha = "0.10.0" -rand = "0.10.0" [profile.release] opt-level = "z" diff --git a/src/entropy/extract.rs b/src/entropy/extract.rs index b0e8e65..5d084e9 100644 --- a/src/entropy/extract.rs +++ b/src/entropy/extract.rs @@ -1,7 +1,7 @@ //! Low-level entropy extraction from camera frames. //! -//! These functions are used by the background harvester. API consumers -//! should use pool::extract_entropy() which never blocks on the camera. +//! Provides conditioning (SHA-256 over LSB chunks) and raw LSB extraction. +//! Used by live.rs for on-demand entropy and by debug/testing utilities. use sha2::{Digest, Sha256}; diff --git a/src/entropy/live.rs b/src/entropy/live.rs new file mode 100644 index 0000000..e4ef0f9 --- /dev/null +++ b/src/entropy/live.rs @@ -0,0 +1,187 @@ +//! Live on-demand entropy: every request opens the camera, grabs fresh +//! frames, conditions them, and returns. No pool, no background thread, +//! no buffered data. Camera not available = error. + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use sha2::{Digest, Sha256}; + +use super::camera::open_camera_with_retry; +use super::config::CameraConfig; +use super::extract::CHUNK_SIZE; + +/// Global monotonic nonce so every extraction is unique even if the raw +/// camera frame hasn't changed between rapid back-to-back calls. +static EXTRACTION_NONCE: AtomicU64 = AtomicU64::new(0); + +/// Extract `num_bytes` of conditioned entropy directly from the camera. +/// +/// Opens the camera, captures enough frames to produce `num_bytes` of +/// SHA-256 conditioned output, then closes the camera. Each frame yields +/// `(frame_size / CHUNK_SIZE) * 32` bytes of conditioned entropy. +/// +/// Fails immediately if the camera cannot be opened. +pub fn extract_entropy(num_bytes: usize, config: &CameraConfig) -> Result, String> { + if num_bytes == 0 { + return Ok(Vec::new()); + } + + let mut camera = open_camera_with_retry(config)?; + + // Discard the first frame — cameras often return a stale/cached buffer + // on the first call. This ensures we only use fresh sensor data. + let _ = camera.frame().map_err(|e| format!("camera prime frame failed: {}", e))?; + + // Unique nonce per extraction prevents duplicate output even if the + // camera returns a cached frame on rapid back-to-back calls. + let nonce = EXTRACTION_NONCE.fetch_add(1, Ordering::Relaxed); + let ts = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0); + + let mut out = Vec::with_capacity(num_bytes); + let mut hasher = Sha256::new(); + let mut frame_counter: u64 = 0; + + while out.len() < num_bytes { + let frame = camera.frame().map_err(|e| { + format!("camera frame capture failed: {}", e) + })?; + let raw = frame.buffer(); + + if raw.is_empty() { + return Err("camera returned empty frame".to_string()); + } + + let lsbs: Vec = raw.iter().map(|b| b & 0x03).collect(); + + for (chunk_idx, chunk) in lsbs.chunks(CHUNK_SIZE).enumerate() { + hasher.update(chunk); + hasher.update(&frame_counter.to_le_bytes()); + hasher.update(&(chunk_idx as u64).to_le_bytes()); + hasher.update(&nonce.to_le_bytes()); + hasher.update(&ts.to_le_bytes()); + let hash = hasher.finalize_reset(); + out.extend_from_slice(&hash); + if out.len() >= num_bytes { + break; + } + } + frame_counter += 1; + } + + camera.stop_stream().ok(); + out.truncate(num_bytes); + Ok(out) +} + +/// Fill a buffer with live camera entropy (used by OpenSSL provider). +pub fn fill_entropy(out: &mut [u8], config: &CameraConfig) -> Result<(), String> { + let data = extract_entropy(out.len(), config)?; + out.copy_from_slice(&data); + Ok(()) +} + +/// Stream conditioned entropy chunks from the camera via a channel. +/// +/// Opens the camera in a background thread and sends 32-byte conditioned +/// chunks through the provided sender. The camera stays open and actively +/// capturing for the lifetime of the channel. Closes when the receiver +/// is dropped. +pub fn stream_entropy( + config: CameraConfig, + tx: std::sync::mpsc::SyncSender>, +) -> Result<(), String> { + std::thread::spawn(move || { + let mut camera = match open_camera_with_retry(&config) { + Ok(c) => c, + Err(e) => { + eprintln!("[stream] camera open failed: {}", e); + return; + } + }; + + // Discard first frame (may be stale/cached). + let _ = camera.frame(); + + let nonce = EXTRACTION_NONCE.fetch_add(1, Ordering::Relaxed); + let mut hasher = Sha256::new(); + let mut frame_counter: u64 = 0; + + loop { + let frame = match camera.frame() { + Ok(f) => f, + Err(e) => { + eprintln!("[stream] frame error: {}", e); + break; + } + }; + let raw = frame.buffer(); + if raw.is_empty() { + continue; + } + + let lsbs: Vec = raw.iter().map(|b| b & 0x03).collect(); + + for (chunk_idx, chunk) in lsbs.chunks(CHUNK_SIZE).enumerate() { + hasher.update(chunk); + hasher.update(&frame_counter.to_le_bytes()); + hasher.update(&(chunk_idx as u64).to_le_bytes()); + hasher.update(&nonce.to_le_bytes()); + let hash = hasher.finalize_reset().to_vec(); + if tx.send(hash).is_err() { + // Receiver dropped — stop streaming. + camera.stop_stream().ok(); + return; + } + } + frame_counter += 1; + } + camera.stop_stream().ok(); + }); + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Zero-length request returns empty vec (no camera needed). + #[test] + fn zero_bytes_returns_empty() { + let config = CameraConfig::default(); + let data = extract_entropy(0, &config).expect("zero bytes"); + assert!(data.is_empty()); + } + + /// All camera-dependent tests in one function to avoid parallel camera + /// lock conflicts (only one test thread can open the camera at a time). + #[test] + fn live_camera_invariants() { + let config = CameraConfig::default(); + + // 1. extract_entropy returns non-empty, non-zero data. + let data = extract_entropy(32, &config) + .expect("extract_entropy should succeed with a camera"); + assert_eq!(data.len(), 32); + assert!( + data.iter().any(|&b| b != 0), + "output must not be all zeros" + ); + + // 2. Consecutive calls return different data (fresh frames). + let a = extract_entropy(32, &config).expect("first call"); + let b = extract_entropy(32, &config).expect("second call"); + assert_ne!(a, b, "consecutive calls must return different data"); + + // 3. fill_entropy fills the buffer with non-zero camera data. + let mut buf = [0u8; 32]; + fill_entropy(&mut buf, &config).expect("fill_entropy should succeed"); + assert!( + buf.iter().any(|&b| b != 0), + "filled buffer must not be all zeros" + ); + } +} diff --git a/src/entropy/mod.rs b/src/entropy/mod.rs index d165b0b..8fd7c42 100644 --- a/src/entropy/mod.rs +++ b/src/entropy/mod.rs @@ -3,20 +3,17 @@ //! Uses the LavaRnd approach: covered camera sensor with high gain //! captures thermal/quantum noise from the CCD/CMOS dark current. //! -//! Architecture: a background harvester feeds a ring-buffer pool. -//! Random data is only from camera input; if the pool has insufficient bytes, requests fail. +//! Architecture: every request opens the camera, grabs fresh frames, +//! conditions them with SHA-256, and returns. No pool, no background +//! thread, no buffered data. Camera not available = error. mod config; -mod pool; +mod live; mod camera; mod extract; pub use config::{CameraConfig, CameraListItem}; -pub use pool::{ - extract_entropy, fill_entropy, - start_harvester, pool_stats, - subscribe_entropy, unsubscribe_entropy, ensure_producer_running, -}; +pub use live::{extract_entropy, fill_entropy, stream_entropy}; pub use camera::{list_cameras, test_camera, open_camera_with_retry, try_reconnect}; pub use extract::{ extract_raw_lsb, spawn_raw_lsb_stream, CHUNK_SIZE, diff --git a/src/entropy/pool/mod.rs b/src/entropy/pool/mod.rs deleted file mode 100644 index 63b1800..0000000 --- a/src/entropy/pool/mod.rs +++ /dev/null @@ -1,387 +0,0 @@ -//! Entropy pool: random data from camera input only, or not at all. -//! -//! A background thread captures camera frames and feeds conditioned entropy into -//! a ring buffer. API requests pull from that buffer. The only source of random -//! data is the camera; if the pool has insufficient bytes, requests fail. -//! -//! All camera operations have enforced timeouts. Poisoned mutexes are recovered; -//! harvester panics are caught and the thread restarts. - -use std::collections::HashMap; -use std::panic; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::sync::{Mutex, MutexGuard, PoisonError}; -use std::time::{Duration, Instant}; - -use rand::RngExt; -use rand_chacha::ChaCha20Rng; -use rand::SeedableRng; -use sha2::{Digest, Sha256}; - -use super::camera::{open_camera_with_retry, MAX_RETRIES}; -use super::config::CameraConfig; -use super::extract::CHUNK_SIZE; - -/// How many bytes to keep in the pool ring buffer. -const POOL_CAPACITY: usize = 256 * 1024; // 256 KB -/// Camera frame() deadline: treat as error if exceeded. -const FRAME_TIMEOUT: Duration = Duration::from_secs(5); -/// Base reconnect interval (doubled on each consecutive failure). -const RECONNECT_BASE: Duration = Duration::from_secs(2); -/// Maximum reconnect backoff cap. -const RECONNECT_MAX: Duration = Duration::from_secs(60); -/// Minimum camera bytes before marking pool as seeded (diagnostics only). -const MIN_SEED_BYTES: usize = 64; -/// How long to wait before restarting a panicked harvester thread. -const PANIC_RESTART_DELAY: Duration = Duration::from_secs(3); - -/// Global pool; output is only from camera buffer. -static RESILIENT_POOL: std::sync::OnceLock = std::sync::OnceLock::new(); - -// ── Poison-safe mutex helper ──────────────────────────────────────── -/// Lock a mutex, recovering from poison (prior panic) instead of panicking. -fn lock_or_recover(m: &Mutex) -> MutexGuard<'_, T> { - m.lock().unwrap_or_else(PoisonError::into_inner) -} - -struct PoolInner { - /// Ring buffer of conditioned camera entropy bytes. - buf: Vec, - /// Write position (where the next byte will be written). - write_pos: usize, - /// Read position (where the next byte will be consumed from). - read_pos: usize, - /// Number of valid unconsumed bytes available. - available: usize, - /// Total camera bytes harvested since start. - total_harvested: u64, -} - -pub struct ResilientPool { - inner: Mutex, - /// Internal; not used for any output. Output is only from camera buffer. - rng: Mutex, - /// True once pool has received enough camera bytes (diagnostics only). - seeded: AtomicBool, - /// Set to true while the harvester supervisor thread is alive. - producer_running: AtomicBool, - /// Monotonic counter of harvester (re)starts for diagnostics. - harvester_starts: AtomicU64, - /// Subscriber channels for /stream endpoint. - subscribers: Mutex, -} - -struct SubscriberMap { - subs: HashMap>>, - next_id: u64, -} - -fn get_pool() -> &'static ResilientPool { - RESILIENT_POOL.get_or_init(|| { - let seed = [0u8; 32]; // rng is not used for output; only camera buffer is. - let rng = ChaCha20Rng::from_seed(seed); - ResilientPool { - inner: Mutex::new(PoolInner { - buf: vec![0u8; POOL_CAPACITY], - write_pos: 0, - read_pos: 0, - available: 0, - total_harvested: 0, - }), - rng: Mutex::new(rng), - seeded: AtomicBool::new(false), - producer_running: AtomicBool::new(false), - harvester_starts: AtomicU64::new(0), - subscribers: Mutex::new(SubscriberMap { - subs: HashMap::new(), - next_id: 0, - }), - } - }) -} - -// ── Exponential backoff helper ────────────────────────────────────── -fn backoff_duration(attempt: u32) -> Duration { - let secs = RECONNECT_BASE - .as_secs() - .saturating_mul(1u64.checked_shl(attempt).unwrap_or(u64::MAX)); - let capped = Duration::from_secs(secs).min(RECONNECT_MAX); - // Add ~25% jitter so multiple instances don't thundering-herd. - let jitter_ms = (capped.as_millis() as u64) / 4; - capped + Duration::from_millis(jitter_ms) -} - -/// Push conditioned camera bytes into the pool. -pub(crate) fn pool_push(data: &[u8]) { - let pool = get_pool(); - let mut inner = lock_or_recover(&pool.inner); - for &byte in data { - let pos = inner.write_pos; - inner.buf[pos] = byte; - inner.write_pos = (pos + 1) % POOL_CAPACITY; - if inner.available < POOL_CAPACITY { - inner.available += 1; - } else { - inner.read_pos = (inner.read_pos + 1) % POOL_CAPACITY; - } - } - inner.total_harvested += data.len() as u64; - let total = inner.total_harvested; - drop(inner); - - if data.len() >= 32 && total >= MIN_SEED_BYTES as u64 { - let mut seed = [0u8; 32]; - seed.copy_from_slice(&data[..32]); - let mut rng = lock_or_recover(&pool.rng); - let mut current = [0u8; 32]; - rng.fill(&mut current); - for i in 0..32 { - seed[i] ^= current[i]; - } - *rng = ChaCha20Rng::from_seed(seed); - drop(rng); - pool.seeded.store(true, Ordering::Release); - } -} - -/// Pull bytes from the pool. Only from camera. Fails if fewer than `num_bytes` available. -pub fn pool_pull(num_bytes: usize) -> Result, String> { - let pool = get_pool(); - let mut out = Vec::with_capacity(num_bytes); - - { - let mut inner = lock_or_recover(&pool.inner); - if inner.available < num_bytes { - return Err(format!( - "insufficient camera entropy: need {} bytes, {} available (cover lens and wait for harvest)", - num_bytes, inner.available - )); - } - for _ in 0..num_bytes { - let pos = inner.read_pos; - out.push(inner.buf[pos]); - inner.read_pos = (pos + 1) % POOL_CAPACITY; - } - inner.available -= num_bytes; - } - - Ok(out) -} - -/// Extract entropy: only from camera. Fails if pool has insufficient bytes. -pub fn extract_entropy(num_bytes: usize, _config: &CameraConfig) -> Result, String> { - pool_pull(num_bytes) -} - -/// Fill a buffer with entropy (used by OpenSSL provider). -pub fn fill_entropy(out: &mut [u8], config: &CameraConfig) -> Result<(), String> { - let data = extract_entropy(out.len(), config)?; - out.copy_from_slice(&data); - Ok(()) -} - -// ── Harvester supervisor ──────────────────────────────────────────── - -/// Start the background camera harvester. Safe to call multiple times. -/// Spawns a supervisor thread that catches panics and restarts the -/// harvester loop automatically. -pub fn start_harvester(config: CameraConfig) { - let pool = get_pool(); - if pool.producer_running.swap(true, Ordering::SeqCst) { - return; // Already running. - } - - std::thread::Builder::new() - .name("harvester-supervisor".into()) - .spawn(move || { - eprintln!("[harvester] supervisor started"); - loop { - let cfg = config; - pool.harvester_starts.fetch_add(1, Ordering::Relaxed); - let run = pool.harvester_starts.load(Ordering::Relaxed); - eprintln!("[harvester] starting worker (run #{})", run); - - let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { - harvester_loop(cfg); - })); - - match result { - Ok(()) => { - eprintln!("[harvester] worker exited normally, restarting"); - } - Err(e) => { - let msg = if let Some(s) = e.downcast_ref::<&str>() { - s.to_string() - } else if let Some(s) = e.downcast_ref::() { - s.clone() - } else { - "unknown panic".to_string() - }; - eprintln!( - "[harvester] worker PANICKED: {}, restarting in {:?}", - msg, PANIC_RESTART_DELAY - ); - } - } - - std::thread::sleep(PANIC_RESTART_DELAY); - } - }) - .expect("failed to spawn harvester supervisor thread"); -} - -/// The actual harvester work loop (runs inside catch_unwind). -fn harvester_loop(config: CameraConfig) { - let mut hasher = Sha256::new(); - let mut frame_counter: u64 = 0; - let mut reconnect_attempts: u32 = 0; - - loop { - let mut camera = match open_camera_with_retry(&config) { - Ok(c) => { - eprintln!("[harvester] camera opened successfully"); - reconnect_attempts = 0; - c - } - Err(e) => { - let delay = backoff_duration(reconnect_attempts); - eprintln!( - "[harvester] camera open failed: {}, retrying in {:?} (attempt {})", - e, delay, reconnect_attempts + 1 - ); - reconnect_attempts = reconnect_attempts.saturating_add(1); - std::thread::sleep(delay); - continue; - } - }; - - let mut consecutive_errors: u32 = 0; - - loop { - let frame_result = frame_with_deadline(&mut camera, FRAME_TIMEOUT); - - match frame_result { - Ok(frame) => { - consecutive_errors = 0; - let raw = frame.buffer(); - let lsbs: Vec = raw.iter().map(|b| b & 0x03).collect(); - - for (chunk_idx, chunk) in lsbs.chunks(CHUNK_SIZE).enumerate() { - hasher.update(chunk); - hasher.update(&frame_counter.to_le_bytes()); - hasher.update(&(chunk_idx as u64).to_le_bytes()); - let hash = hasher.finalize_reset().to_vec(); - - pool_push(&hash); - distribute_to_subscribers(&hash, chunk_idx); - } - frame_counter += 1; - } - Err(e) => { - consecutive_errors += 1; - eprintln!( - "[harvester] frame error ({}x): {}", - consecutive_errors, e - ); - - if consecutive_errors >= MAX_RETRIES { - let delay = backoff_duration(reconnect_attempts); - eprintln!( - "[harvester] too many errors, reconnecting in {:?}", - delay - ); - camera.stop_stream().ok(); - reconnect_attempts = reconnect_attempts.saturating_add(1); - std::thread::sleep(delay); - break; - } - std::thread::sleep(Duration::from_millis(500)); - } - } - } - } -} - -/// Grab a camera frame with an enforced deadline. -/// If camera.frame() exceeds the timeout, we treat it as an error. -fn frame_with_deadline( - camera: &mut nokhwa::Camera, - timeout: Duration, -) -> Result { - let start = Instant::now(); - match camera.frame() { - Ok(f) => { - let elapsed = start.elapsed(); - if elapsed > timeout { - eprintln!( - "[harvester] frame took {:?} (exceeds {:?} deadline)", - elapsed, timeout - ); - return Err(format!("frame exceeded deadline ({:?})", elapsed)); - } - Ok(f) - } - Err(e) => Err(e.to_string()), - } -} - -/// Send hash to one stream subscriber (round-robin by chunk index). -fn distribute_to_subscribers(hash: &[u8], chunk_idx: usize) { - let pool = get_pool(); - let mut subs = lock_or_recover(&pool.subscribers); - if subs.subs.is_empty() { - return; - } - let ids: Vec = subs.subs.keys().copied().collect(); - let id = ids[chunk_idx % ids.len()]; - if let Some(tx) = subs.subs.get(&id) { - if tx.send(hash.to_vec()).is_err() { - subs.subs.remove(&id); - } - } -} - -/// Subscribe to the live entropy stream. Returns (id, receiver). -pub fn subscribe_entropy() -> (u64, std::sync::mpsc::Receiver>) { - let (tx, rx) = std::sync::mpsc::sync_channel(4); - let pool = get_pool(); - let mut subs = lock_or_recover(&pool.subscribers); - let id = subs.next_id; - subs.next_id += 1; - subs.subs.insert(id, tx); - (id, rx) -} - -/// Unsubscribe from the live entropy stream. -pub fn unsubscribe_entropy(id: u64) { - let pool = get_pool(); - let mut subs = lock_or_recover(&pool.subscribers); - subs.subs.remove(&id); -} - -/// Start the producer if not running (compatibility shim for stream endpoint). -pub fn ensure_producer_running(config: CameraConfig) { - start_harvester(config); -} - -/// Get pool statistics for health/debug. -pub fn pool_stats() -> (u64, usize, bool) { - let pool = get_pool(); - let inner = lock_or_recover(&pool.inner); - let restarts = pool.harvester_starts.load(Ordering::Relaxed); - if restarts > 1 { - eprintln!( - "[pool] note: harvester has been restarted {} time(s)", - restarts - 1 - ); - } - ( - inner.total_harvested, - inner.available, - pool.seeded.load(Ordering::Acquire), - ) -} - -#[cfg(test)] -mod tests; - diff --git a/src/entropy/pool/tests.rs b/src/entropy/pool/tests.rs deleted file mode 100644 index 824ed49..0000000 --- a/src/entropy/pool/tests.rs +++ /dev/null @@ -1,96 +0,0 @@ -//! Pool tests: verify the critical invariant that all entropy output comes -//! exclusively from camera-sourced data pushed into the pool. There is -//! no CSPRNG fallback; requests fail if the pool is empty. -//! -//! All pool tests run in a single #[test] function because the pool is a -//! process-wide singleton (OnceLock). Running them sequentially in one -//! test avoids races from cargo test's parallel thread execution. - -use super::*; - -/// Reset pool to a known-empty state. -fn reset_pool() { - let pool = get_pool(); - let mut inner = lock_or_recover(&pool.inner); - inner.available = 0; - inner.read_pos = 0; - inner.write_pos = 0; - inner.total_harvested = 0; -} - -/// All pool invariant tests, run sequentially to avoid global state races. -#[test] -fn pool_camera_only_invariants() { - // ── 1. Empty pool returns error (no CSPRNG fallback) ──────────── - reset_pool(); - let result = pool_pull(1); - assert!(result.is_err(), "pool_pull must fail when pool is empty"); - let err = result.unwrap_err(); - assert!( - err.contains("insufficient camera entropy"), - "error must mention insufficient camera entropy, got: {}", - err - ); - - // ── 2. Push/pull round-trip: data in = data out ───────────────── - reset_pool(); - let input: Vec = (0..64).collect(); - pool_push(&input); - let output = pool_pull(64).expect("pool_pull should succeed after push"); - assert_eq!(output, input, "pulled data must match pushed data exactly"); - - // ── 3. Pool empty after full drain (no synthetic fill) ────────── - reset_pool(); - let data: Vec = vec![0xAB; 128]; - pool_push(&data); - let pulled = pool_pull(128).expect("should pull 128 bytes"); - assert_eq!(pulled.len(), 128); - let result = pool_pull(1); - assert!( - result.is_err(), - "pool must be empty after draining all pushed data (no CSPRNG fill)" - ); - - // ── 4. extract_entropy fails on empty pool ────────────────────── - reset_pool(); - let config = CameraConfig::default(); - let result = extract_entropy(32, &config); - assert!( - result.is_err(), - "extract_entropy must fail when no camera data in pool" - ); - - // ── 5. fill_entropy fails on empty pool ───────────────────────── - reset_pool(); - let mut buf = [0u8; 32]; - let result = fill_entropy(&mut buf, &config); - assert!( - result.is_err(), - "fill_entropy must fail when pool has no camera data" - ); - assert_eq!( - buf, [0u8; 32], - "buffer must not be modified on failure (no fallback write)" - ); - - // ── 6. Partial pull: all-or-nothing ───────────────────────────── - reset_pool(); - pool_push(&[1, 2, 3, 4]); - let result = pool_pull(8); - assert!( - result.is_err(), - "requesting more bytes than available must fail (no partial fill)" - ); - // The 4 bytes should still be there (pull was atomic). - let result = pool_pull(4); - assert!(result.is_ok(), "4 bytes should still be available"); - assert_eq!(result.unwrap(), vec![1, 2, 3, 4]); - - // ── 7. pool_stats tracks camera bytes ─────────────────────────── - reset_pool(); - pool_push(&[0u8; 100]); - let pool = get_pool(); - let inner = lock_or_recover(&pool.inner); - assert_eq!(inner.total_harvested, 100, "total_harvested must count pushed bytes"); - assert_eq!(inner.available, 100, "available must reflect pushed bytes"); -} diff --git a/src/lib.rs b/src/lib.rs index a8d8254..f65829b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,9 +11,9 @@ pub mod provider; pub mod tools; pub use entropy::{ - extract_entropy, fill_entropy, start_harvester, pool_stats, - list_cameras, spawn_raw_lsb_stream, subscribe_entropy, unsubscribe_entropy, - ensure_producer_running, test_camera, extract_raw_lsb, CameraConfig, CameraListItem, CHUNK_SIZE, + extract_entropy, fill_entropy, stream_entropy, + list_cameras, spawn_raw_lsb_stream, + test_camera, extract_raw_lsb, CameraConfig, CameraListItem, CHUNK_SIZE, }; pub use tools::{ diff --git a/src/main.rs b/src/main.rs index 5efeda9..e38be6f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ //! Camera QRNG HTTP Server //! -//! Serves quantum random bytes via HTTP API. Also builds as an OpenSSL provider. +//! Serves quantum random bytes via HTTP API. Every request opens the camera, +//! grabs fresh frames, and returns conditioned entropy. No pool, no buffer. use axum::{ body::Body, @@ -11,8 +12,7 @@ use axum::{ Router, }; use camera_trng::{ - extract_entropy, list_cameras, subscribe_entropy, unsubscribe_entropy, - ensure_producer_running, start_harvester, pool_stats, test_camera, + extract_entropy, list_cameras, stream_entropy, test_camera, CameraConfig, CHUNK_SIZE, }; mod qrng_handlers; @@ -53,7 +53,7 @@ async fn main() -> Result<(), Box> { .unwrap_or(DEFAULT_PORT); let config = CameraConfig::from_env(); - // Test camera access, then start background entropy harvester. + // Test camera access at startup (informational only). println!("Testing camera access..."); match test_camera(&config) { Ok((actual_w, actual_h, frame_size)) => { @@ -76,7 +76,7 @@ async fn main() -> Result<(), Box> { } Err(e) => { eprintln!( - "Camera error: {}. Server will start; /random and stream return errors until camera entropy is available.", + "Camera error: {}. Requests will fail until camera is available.", e ); if e.contains("Lock Rejected") || e.contains("lock") { @@ -87,9 +87,6 @@ async fn main() -> Result<(), Box> { } } - println!("Starting background entropy harvester..."); - start_harvester(config); - let app = Router::new() .route("/", get(index)) .route("/cameras", get(get_cameras)) @@ -108,7 +105,7 @@ async fn main() -> Result<(), Box> { .route("/docs/mcp.json", get(mcp_wellknown)); let addr = format!("0.0.0.0:{}", port); - println!("Camera QRNG (LavaRnd-style) on http://{}", addr); + println!("Camera QRNG (LavaRnd-style, live) on http://{}", addr); let listener = tokio::net::TcpListener::bind(&addr).await?; // Graceful shutdown: listen for SIGINT (Ctrl-C) and SIGTERM. @@ -142,14 +139,25 @@ async fn main() -> Result<(), Box> { async fn index() -> Html<&'static str> { Html(INDEX_HTML) } async fn tools_page() -> Html<&'static str> { Html(TOOLS_HTML) } +/// Health check: tests camera access right now. Camera must be reachable. async fn health() -> Response { - let (total_harvested, pool_available, csprng_seeded) = pool_stats(); - let body = serde_json::json!({ - "status": "ok", - "pool_bytes_available": pool_available, - "total_camera_bytes_harvested": total_harvested, - "csprng_seeded_from_camera": csprng_seeded, - }); + let config = CameraConfig::from_env(); + let result = tokio::task::spawn_blocking(move || test_camera(&config)).await; + let body = match result { + Ok(Ok((w, h, frame_size))) => serde_json::json!({ + "status": "ok", + "camera": format!("{}x{}", w, h), + "frame_bytes": frame_size, + }), + Ok(Err(e)) => serde_json::json!({ + "status": "error", + "error": e, + }), + Err(e) => serde_json::json!({ + "status": "error", + "error": e.to_string(), + }), + }; Response::builder() .header(header::CONTENT_TYPE, "application/json") .body(Body::from(body.to_string())) @@ -178,6 +186,7 @@ async fn get_cameras() -> Response { } } +/// Get random bytes: opens camera, grabs fresh frames, returns conditioned data. async fn get_random(Query(params): Query) -> Response { let current = ACTIVE_REQUESTS.fetch_add(1, Ordering::SeqCst); if current >= MAX_CONCURRENT { @@ -217,12 +226,14 @@ async fn get_random(Query(params): Query) -> Response { } } -/// Cryptographically sound continuous random. GET /stream or /stream?bytes=N. -/// Multiple streams get different data (each chunk goes to one consumer). +/// Stream live camera entropy. Camera stays open for the duration of the stream. async fn get_stream(Query(params): Query) -> Response { let config = CameraConfig::from_env(); - let (sub_id, rx) = subscribe_entropy(); - ensure_producer_running(config); + let (tx, rx) = std::sync::mpsc::sync_channel(4); + + if let Err(e) = stream_entropy(config, tx) { + return (StatusCode::INTERNAL_SERVER_ERROR, e).into_response(); + } let rx = Arc::new(Mutex::new(rx)); let limit = params.bytes; @@ -259,7 +270,8 @@ async fn get_stream(Query(params): Query) -> Response { _ => break, } } - unsubscribe_entropy(sub_id); + // rx is dropped here, which causes the stream_entropy thread + // to see a send error and close the camera. }; let content_type = if hex { "text/plain" } else { "application/octet-stream" }; Response::builder()