camera-trng/scripts/test-randomness.py

252 lines
7.2 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Randomness validation suite for Camera QRNG.
Tests based on NIST SP 800-22 and ENT methodologies.
Run against binary random data to validate entropy quality.
Usage:
# Test from file
./scripts/test-randomness.py /path/to/random.bin
# Test from server (fetches 1MB)
./scripts/test-randomness.py --server http://127.0.0.1:8787
# Test from stdin
curl -s http://127.0.0.1:8787/random?bytes=1048576 | ./scripts/test-randomness.py -
"""
import argparse
import math
import struct
import sys
import urllib.request
from collections import Counter
def shannon_entropy(data: bytes) -> tuple[float, bool]:
"""Calculate Shannon entropy in bits per byte."""
size = len(data)
freq = Counter(data)
entropy = -sum((c / size) * math.log2(c / size) for c in freq.values())
return entropy, entropy > 7.9
def chi_square_test(data: bytes) -> tuple[float, bool]:
"""Chi-square test for uniform distribution."""
size = len(data)
freq = Counter(data)
expected = size / 256
chi_sq = sum((freq.get(i, 0) - expected) ** 2 / expected for i in range(256))
# For df=255, acceptable range is roughly 200-330 at 95% confidence
return chi_sq, 200 < chi_sq < 330
def arithmetic_mean(data: bytes) -> tuple[float, bool]:
"""Test arithmetic mean (should be ~127.5)."""
mean = sum(data) / len(data)
return mean, 126 < mean < 129
def monte_carlo_pi(data: bytes) -> tuple[float, float, bool]:
"""Estimate Pi using Monte Carlo method."""
pairs = len(data) // 2
inside = sum(
1
for i in range(0, pairs * 2, 2)
if (data[i] / 256) ** 2 + (data[i + 1] / 256) ** 2 <= 1
)
pi_est = 4.0 * inside / pairs
error = abs(pi_est - math.pi) / math.pi * 100
return pi_est, error, error < 1.0
def serial_correlation(data: bytes) -> tuple[float, bool]:
"""Calculate serial correlation coefficient."""
size = len(data)
corr_sum = sum(data[i] * data[i + 1] for i in range(size - 1))
sq_sum = sum(b * b for b in data)
total = sum(data)
serial = (size * corr_sum - total**2 + data[-1] * (total - data[0])) / (
size * sq_sum - total**2
)
return serial, abs(serial) < 0.01
def byte_coverage(data: bytes) -> tuple[int, bool]:
"""Check that all 256 byte values are present."""
coverage = len(set(data))
return coverage, coverage == 256
def bit_balance(data: bytes) -> tuple[float, bool]:
"""Test that bits are balanced (~50% ones)."""
ones = sum(bin(b).count("1") for b in data)
ratio = ones / (len(data) * 8)
return ratio * 100, 0.49 < ratio < 0.51
def longest_run(data: bytes, sample_size: int = 10000) -> tuple[int, bool]:
"""Find longest run of same bit in sample."""
sample = data[:sample_size]
bits = "".join(format(b, "08b") for b in sample)
runs_0 = bits.replace("1", " ").split()
runs_1 = bits.replace("0", " ").split()
max_run = max(len(r) for r in runs_0 + runs_1 if r)
# For 80000 bits, expect max run ~17, threshold 25
return max_run, max_run < 25
def run_all_tests(data: bytes) -> tuple[int, int]:
"""Run all randomness tests and print results."""
size = len(data)
print("=" * 55)
print("CAMERA QRNG RANDOMNESS VALIDATION")
print("=" * 55)
print(f"Sample size: {size:,} bytes ({size / 1024 / 1024:.2f} MB)")
print()
passed = 0
total = 0
# 1. Shannon Entropy
total += 1
entropy, ok = shannon_entropy(data)
status = "PASS" if ok else "FAIL"
if ok:
passed += 1
print(f"1. Shannon Entropy: {entropy:.6f} bits/byte [{status}]")
print(" (ideal: 8.0, threshold: >7.9)")
# 2. Chi-Square
total += 1
chi_sq, ok = chi_square_test(data)
status = "PASS" if ok else "FAIL"
if ok:
passed += 1
print(f"\n2. Chi-Square Test: {chi_sq:.2f} [{status}]")
print(" (expect: ~255, acceptable: 200-330)")
# 3. Mean Value
total += 1
mean, ok = arithmetic_mean(data)
status = "PASS" if ok else "FAIL"
if ok:
passed += 1
print(f"\n3. Arithmetic Mean: {mean:.4f} [{status}]")
print(" (ideal: 127.5, acceptable: 126-129)")
# 4. Monte Carlo Pi
total += 1
pi_est, error, ok = monte_carlo_pi(data)
status = "PASS" if ok else "FAIL"
if ok:
passed += 1
print(f"\n4. Monte Carlo Pi: {pi_est:.6f} [{status}]")
print(f" (actual: 3.141593, error: {error:.4f}%)")
# 5. Serial Correlation
total += 1
serial, ok = serial_correlation(data)
status = "PASS" if ok else "FAIL"
if ok:
passed += 1
print(f"\n5. Serial Correlation: {serial:.6f} [{status}]")
print(" (ideal: 0.0, threshold: |x| < 0.01)")
# 6. Byte Coverage
total += 1
coverage, ok = byte_coverage(data)
status = "PASS" if ok else "FAIL"
if ok:
passed += 1
print(f"\n6. Byte Coverage: {coverage}/256 [{status}]")
# 7. Bit Balance
total += 1
balance, ok = bit_balance(data)
status = "PASS" if ok else "FAIL"
if ok:
passed += 1
print(f"\n7. Bit Balance: {balance:.2f}% ones [{status}]")
print(" (ideal: 50%, acceptable: 49-51%)")
# 8. Longest Run
total += 1
max_run, ok = longest_run(data)
status = "PASS" if ok else "FAIL"
if ok:
passed += 1
print(f"\n8. Longest Run (10KB): {max_run} bits [{status}]")
print(" (threshold: <25 bits)")
print()
print("=" * 55)
print(f"RESULTS: {passed}/{total} tests passed")
if passed == total:
print("VERDICT: EXCELLENT - All tests passed!")
elif passed >= total - 1:
print("VERDICT: GOOD - Minor deviations within tolerance")
else:
print("VERDICT: INVESTIGATE - Multiple test failures")
print("=" * 55)
return passed, total
def fetch_from_server(url: str, num_bytes: int = 1048576) -> bytes:
"""Fetch random bytes from QRNG server."""
endpoint = f"{url.rstrip('/')}/random?bytes={num_bytes}"
print(f"Fetching {num_bytes:,} bytes from {endpoint}...")
with urllib.request.urlopen(endpoint, timeout=120) as response:
return response.read()
def main():
parser = argparse.ArgumentParser(
description="Validate randomness quality of Camera QRNG output"
)
parser.add_argument(
"input",
nargs="?",
default="-",
help="Input file, '-' for stdin, or use --server",
)
parser.add_argument(
"--server",
"-s",
metavar="URL",
help="Fetch from QRNG server (e.g., http://127.0.0.1:8787)",
)
parser.add_argument(
"--bytes",
"-n",
type=int,
default=1048576,
help="Bytes to fetch from server (default: 1MB)",
)
args = parser.parse_args()
# Get data
if args.server:
data = fetch_from_server(args.server, args.bytes)
elif args.input == "-":
data = sys.stdin.buffer.read()
else:
with open(args.input, "rb") as f:
data = f.read()
if len(data) < 10000:
print(f"ERROR: Need at least 10KB of data, got {len(data)} bytes")
sys.exit(1)
# Run tests
passed, total = run_all_tests(data)
# Exit code: 0 if all passed, 1 otherwise
sys.exit(0 if passed == total else 1)
if __name__ == "__main__":
main()