changeset 32:ccc8f0903d2e

Use scheduling to keep better time

Our previous approach just hoped that we kept to time, but long processing would gradually push our "sleep" periods off schedule. This is especially likely at 0.1s sleeps (versus longer ones).
author IBBoard <dev@ibboard.co.uk>
date Sat, 27 Feb 2021 15:39:36 +0000
parents 72e04923c4d4
children 6e30e90527d6
files load-record
diffstat 1 files changed, 82 insertions(+), 51 deletions(-) [+]
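
The timing drift the commit message describes is easy to reproduce: sleeping a fixed interval after doing work stretches every cycle by the work's duration, whereas scheduling the next run up front (as this commit does with sched) keeps the cadence anchored. A minimal sketch of the contrast, not part of the commit, with WORK standing in for the sampling overhead:

import sched
import time

WORK = 0.02      # stand-in for the per-cycle sampling work
INTERVAL = 0.1
CYCLES = 10

def sleep_loop():
    """Old approach: do the work, then sleep, and hope we keep to time."""
    start = time.monotonic()
    for _ in range(CYCLES):
        time.sleep(WORK)          # the "work"
        time.sleep(INTERVAL)      # fixed sleep: drifts by WORK every cycle
    return time.monotonic() - start   # ~1.2s instead of the intended ~1.0s

def sched_loop():
    """New approach: schedule the next cycle before doing the work."""
    scheduler = sched.scheduler()
    start = time.monotonic()

    def cycle(remaining):
        if remaining > 1:
            # Enter the next event first, so the work below doesn't
            # push later cycles off cadence.
            scheduler.enter(INTERVAL, 1, cycle, (remaining - 1,))
        time.sleep(WORK)          # the "work"

    cycle(CYCLES)
    scheduler.run()
    return time.monotonic() - start   # ~0.92s: nine intervals plus one WORK

print(sleep_loop(), sched_loop())
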
line diff
--- a/load-record	Sat Feb 27 15:34:54 2021 +0000
+++ b/load-record	Sat Feb 27 15:39:36 2021 +0000
@@ -4,8 +4,9 @@
 import os
 import os.path
 import rrdtool
-import time
+import sched
 import subprocess
+import threading
 
 from pathlib import Path
 
@@ -30,6 +31,7 @@
 
 fields = len(config)
 
+
 def needs_creating():
 	if not os.path.exists(DB):
 		return True
@@ -44,8 +46,10 @@
 				cur_config[f"ds[{key}].minimal_heartbeat"] != heartbeat:
 				# We don't appear to be able to check min/max from info
 				return True
+		# TODO: Check RRA definitions based on rra[i].cf, rra[i].pdp_per_row and rra[i].rows
 		return False
 
+
 # TODO: Add "pressure" support - relies on "psi=1" in kernel and /proc/pressure/… existing
 if needs_creating():
 	rrdtool.create(DB, '--step', '1',
@@ -60,61 +64,88 @@
 last_avg_idx = 3 + cpus + 2 # load + CPUs + CPU average + GPU
 total_mem = psutil.virtual_memory().total
 
-# Use dmon and assume we'll keep to roughly every second
-# to keep up with its output to reduce the popen overhead
-nv_smi = subprocess.Popen(['nvidia-smi', 'dmon', '-s', 'u'],
-							stdout=subprocess.PIPE,
-							universal_newlines=True)
+# Note: We use some global variables on the assumption that:
+#  1) We just need "the latest" gpu_val value
+#  2) Because gpu_val is numeric then it can't be "inconsistent"
+#  3) The use of the scheduler and its priorities ensures that
+#     the record_interims and record_record functions happen in sequence
+#  4) The record_interims function takes under 1/samples seconds to run
+#
+# If this ever fails, we need to look at multiprocessing.Value and .Array
+gpu_val = 0
+interims = [[0] * samples for _ in range(fields)]
+pos = 0
+
 
-while True:
-	interims = [[] for _ in range(fields)]
-	# Average 10 values internally to reduce RRD updates
-	# and to allow us to max some stats rather than average
-	for _ in range(0, 10):
-		cpu_pcs = psutil.cpu_percent(percpu=True)
-		cpu_pc = sum(cpu_pcs) / cpus
-		#TODO: If cpu_pc > 25% (?) log top processes
-		cpu_states_pc = psutil.cpu_times_percent()
-		loads = os.getloadavg()
-		mem = psutil.virtual_memory()
-		i = 0
-		interims[i].append(loads[0])
-		i = i + 1
-		interims[i].append(loads[1])
+def parse_nvidia_output():
+	global gpu_val
+	nv_smi = subprocess.Popen(['nvidia-smi', 'dmon', '-s', 'u'],
+								stdout=subprocess.PIPE,
+								universal_newlines=True)
+	while True:
+		# Readline blocks, so this thread will update as and when new values are available
+		line = nv_smi.stdout.readline()
+		if line and line[0] != '#':
+			gpu_val = int(line[8:11])
+
+
+def record_interims():
+	global pos
+	scheduler.enter(1.0/samples, 1, record_interims)
+	cur_pos = pos
+	pos = (pos + 1) % samples
+	cpu_pcs = psutil.cpu_percent(percpu=True)
+	cpu_pc = sum(cpu_pcs) / cpus
+	#TODO: If cpu_pc > 25% (?) log top processes
+	cpu_states_pc = psutil.cpu_times_percent()
+	loads = os.getloadavg()
+	mem = psutil.virtual_memory()
+	i = 0
+	interims[i][cur_pos] = loads[0]
+	i = i + 1
+	interims[i][cur_pos] = loads[1]
+	i = i + 1
+	interims[i][cur_pos] = loads[2]
+	i = i + 1
+	for a_cpu_pc in cpu_pcs:
+		interims[i][cur_pos] = a_cpu_pc
 		i = i + 1
-		interims[i].append(loads[2])
-		i = i + 1
-		for a_cpu_pc in cpu_pcs:
-			interims[i].append(a_cpu_pc)
-			i = i + 1
-		interims[i].append(cpu_pc)
-		i = i + 1
-		interims[i].append(0) # Placeholder for GPU
-		i = i + 1
-		interims[i].append(cpu_states_pc.user)
-		i = i + 1
-		interims[i].append(cpu_states_pc.system)
-		i = i + 1
-		interims[i].append(cpu_states_pc.iowait)
-		i = i + 1
-		interims[i].append((mem.used / total_mem) * 100)
-		i = i + 1
-		interims[i].append(((mem.buffers + mem.cached) / total_mem) * 100)
-		time.sleep(0.1)
+	interims[i][cur_pos] = cpu_pc
+	i = i + 1
+	interims[i][cur_pos] = 0 # Placeholder for GPU
+	i = i + 1
+	interims[i][cur_pos] = cpu_states_pc.user
+	i = i + 1
+	interims[i][cur_pos] = cpu_states_pc.system
+	i = i + 1
+	interims[i][cur_pos] = cpu_states_pc.iowait
+	i = i + 1
+	interims[i][cur_pos] = (mem.used / total_mem) * 100
+	i = i + 1
+	interims[i][cur_pos] = ((mem.buffers + mem.cached) / total_mem) * 100
 
-	vals = []
+
+def record_record():
+	global gpu_val, gpu_idx
+	scheduler.enter(1, 2, record_record)
+	vals = [0] * fields
 	for i, interim_vals in enumerate(interims):
 		if i  < last_avg_idx:
 			# Average most values
-			vals.append(sum(interim_vals) / 10)
+			vals[i] = sum(interim_vals) / samples
 		else:
-			# But take the max CPU state value
-			vals.append(max(interim_vals))
+			# But take the max CPU state value and memory usage
+			vals[i] = max(interim_vals)
+
+	vals[gpu_idx] = gpu_val
+	rrdtool.update(DB, "N:{}".format(':'.join(str(val) for val in vals)))
+
 
-	while True:
-		line = nv_smi.stdout.readline()
-		if line[0] != '#':
-			vals[gpu_idx] = int(line[8:11])
-			break
-
-	rrdtool.update(DB, "N:{}".format(':'.join(str(val) for val in vals)))
+nv_thread = threading.Thread(target=parse_nvidia_output)
+nv_thread.start()
+scheduler = sched.scheduler()
+# Let record_interims run and schedule itself
+record_interims()
+# Schedule record recording for 1s later so we've got a set of values, and then it'll schedule future calls
+scheduler.enter(1, 2, record_record)
+scheduler.run()
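
Taken together, the new wiring is: a reader thread that blocks on nvidia-smi output and publishes "the latest" reading through a global, plus two sched jobs where priority 1 (record_interims) sorts ahead of priority 2 (record_record) when both fall due together. A stripped-down, runnable sketch of that pattern, with a counter standing in for the GPU reading and print() standing in for rrdtool.update (the names here are illustrative, not from the script, and the sketch marks the thread as a daemon so Ctrl-C exits cleanly, which the script itself doesn't do):

import sched
import threading
import time

SAMPLES = 10
latest = 0                    # written by reader(), read by sample()
readings = [0] * SAMPLES
pos = 0

def reader():
    # Stand-in for the blocking readline() on nvidia-smi's stdout.
    global latest
    while True:
        time.sleep(0.05)
        latest += 1

def sample():
    # Re-schedules itself first so its own runtime doesn't add drift.
    global pos
    scheduler.enter(1.0 / SAMPLES, 1, sample)   # priority 1: runs first
    readings[pos] = latest
    pos = (pos + 1) % SAMPLES

def record():
    scheduler.enter(1, 2, record)               # priority 2: runs after sample
    print(sum(readings) / SAMPLES)              # stand-in for rrdtool.update

threading.Thread(target=reader, daemon=True).start()
scheduler = sched.scheduler()
sample()                       # runs once now and schedules itself
scheduler.enter(1, 2, record)  # first recording after a full set of samples
scheduler.run()                # blocks, servicing both jobs indefinitely

Note that sched.scheduler.enter() is still relative to the moment it is called; for a strictly drift-free cadence the enterabs() variant can pin events to absolute timestamps, but re-entering at the top of the callback, as the commit does, keeps the error down to the scheduler's dispatch overhead rather than the full processing time.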