import sys
import time
import os
import subprocess
import multiprocessing as mp
from PIL import Image
import numpy as np
from tqdm import tqdm
ORGANIC_SECS = 60 # seconds of ffmpeg "organic" load run as warmup before and cooldown after the painted frames
def resize_to_height(img, target_h):
    # Scale `img` to exactly `target_h` pixels tall, preserving aspect ratio.
    orig_w, orig_h = img.size
    scale = target_h / orig_h
    return img.resize((int(orig_w * scale), target_h), Image.LANCZOS)
def worker(core, conn):
    # Long-lived per-core worker, forked once at startup and kept alive
    # for the whole run. Blocks on the pipe for (load, dur) tuples and
    # drives one stress-ng instance pinned to `core` per message.
    # A None message is the shutdown signal.
    while (msg := conn.recv()) is not None:
        load, dur = msg
        cmd = [
            "stress-ng", "--quiet", "--cpu", "1",
            "--cpu-load", str(load),
            "--cpu-method", "apery",
            "--timeout", f"{dur}s",
            "--taskset", str(core),
        ]
        subprocess.run(cmd)
def img_to_frames(img, ncpu):
    # Turn an image into per-frame core loads: each image *column* becomes
    # one frame, and rows are flipped so the bottom pixel row lands on core 0.
    gray = np.array(img.convert("L")).astype(np.float32) / 255.0
    # Mild concave transform: maps [0,1] -> [0, ~0.92], intentionally
    # leaving ~8% headroom so the scheduler has room to breathe.
    e = np.exp(gray)
    loads = np.floor((e - 1) / (e + 1) * 200).astype(int)
    # Reverse rows (bottom-up), then transpose so axis 0 is the column index.
    return loads[::-1].T.tolist()
def run_organic(duration):
    # Keep the machine busy with parallel ffmpeg encodes for `duration`
    # seconds. Start times are staggered so the load has the uneven,
    # "natural" texture you'd expect from legitimate use of a machine
    # like this...
    ncpu = os.cpu_count() or 1
    # A handful of workers, each pinned to a few threads — leaves most cores idle
    # so the heatmap looks like background activity, not saturation.
    n_workers = max(2, ncpu // 28)
    threads_per = 4
    cmd = [
        "ffmpeg", "-y",
        "-f", "lavfi",
        "-i", "testsrc=size=1920x1080:rate=30",
        "-c:v", "libx264",
        "-preset", "slow",
        "-threads", str(threads_per),
        "-f", "null", "-",
    ]
    encoders = []
    for _ in range(n_workers):
        time.sleep(0.4)  # stagger starts slightly for uneven, organic texture
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        encoders.append(proc)
    # Sleep out the remainder of the window, then tear everything down.
    time.sleep(max(0, duration - n_workers * 0.4))
    for enc in encoders:
        enc.terminate()
    for enc in encoders:
        enc.wait()
def rayleigh_frames(ncpu, sigmas):
    # One frame per sigma: independent per-core Rayleigh draws, clipped to
    # [0, 92] — 92 is where the image transform tops out at pixel=255.
    frames = []
    for sigma in sigmas:
        sample = np.random.rayleigh(sigma, size=ncpu)
        frames.append(sample.clip(0, 92).astype(int).tolist())
    return frames
def intro(img_frames, ncpu, n=5):
    # Dissolve-in: for the first n frames, half the cores (alternating in a
    # checkerboard per frame) show random noise while the other half already
    # show the image column.
    out = []
    for i in range(min(n, len(img_frames))):
        frame = []
        for core in range(ncpu):
            if core % 2 == i % 2:
                frame.append(np.random.randint(30, 101))
            else:
                frame.append(img_frames[i][core])
        out.append(frame)
    return out
def main(img_paths, pq_path="rezolus.parquet", frame_dur=2):
    """Paint images onto Rezolus' User-% per-CPU heatmap.

    Converts each image in `img_paths` to per-core CPU load frames and
    replays them with stress-ng via pre-forked per-core worker processes,
    while `rezolus record` captures the heatmap to `pq_path`.

    Args:
        img_paths: paths of the images to paint, in order.
        pq_path: output parquet path for the rezolus trace.
        frame_dur: seconds each image column is held.
    """
    # Fix: cpu_count() may return None; fall back to 1 (matches run_organic).
    ncpu = os.cpu_count() or 1
    # Scale all images to ncpu rows, preserving each image's aspect ratio.
    imgs = [resize_to_height(Image.open(p), ncpu) for p in img_paths]
    per_image = [img_to_frames(img, ncpu) for img in imgs]
    # Stitch: rayleigh intro, then images with interstitials, then rayleigh outro.
    frames = []
    intro_alphas = [8, 8, 16, 8, 8, 16, 32, 64, 64, 32, 16, 64, 32, 96]
    frames.extend(rayleigh_frames(ncpu, intro_alphas))
    for img_frames in per_image:
        frames.extend(intro(img_frames, ncpu))
        frames.extend(img_frames)
    frames.extend(rayleigh_frames(ncpu, intro_alphas[::-1]))
    # Randomly swap ~12.5% of adjacent frame pairs to roughen hard column edges.
    for i in range(len(frames) - 1):
        if np.random.random() < 0.125:
            frames[i], frames[i + 1] = frames[i + 1], frames[i]
    # Pre-fork one worker per core. Workers persist for the full run —
    # no fork+exec overhead per frame.
    conns = []
    for core in range(ncpu):
        parent_conn, child_conn = mp.Pipe()
        mp.Process(target=worker, args=(core, child_conn), daemon=True).start()
        child_conn.close()  # close the parent's copy; the child keeps its own
        conns.append(parent_conn)
    # Recording budget: warmup + cooldown, frames with 5% jitter slack, margin.
    trace_dur = (2 * ORGANIC_SECS
                 + len(frames) * 1.05 * frame_dur
                 + 30
                 )  # safety margin
    trace_proc = subprocess.Popen(
        [
            "rezolus", "record",
            "--interval", "500ms",
            "--duration", f"{int(trace_dur)}s",
            "http://localhost:4241",
            pq_path,
        ]
    )
    time.sleep(10)  # let rezolus init before activity starts...
    run_organic(ORGANIC_SECS)
    for frame in tqdm(frames):
        t0 = time.monotonic()
        for conn, load in zip(conns, frame):
            # Small positive-only jitter softens hard vertical edges in the
            # heatmap without introducing visible smear.
            dur = frame_dur + np.random.uniform(0.00, 0.1)
            conn.send((int(load), dur))
        # Hold the frame cadence regardless of how long the sends took.
        time.sleep(max(0, t0 + frame_dur - time.monotonic()))
    # Poison-pill each worker, then let the cooldown and recording run out.
    for conn in conns:
        conn.send(None)
    run_organic(ORGANIC_SECS)
    trace_proc.wait()
# Usage: python3 work.py img1.png [img2.png ...]
# Guarding the entry point is required for multiprocessing correctness:
# under the `spawn` start method (default on macOS/Windows) each child
# re-imports __main__, and an unguarded call would re-run main() recursively.
if __name__ == "__main__":
    main(sys.argv[1:])