Update leenkx/blender/lnx/handlers.py

This commit is contained in:
2025-07-15 17:57:38 +00:00
parent 8f073c5ae1
commit a65675ef75

View File

@ -2,7 +2,10 @@ import importlib
import os import os
import queue import queue
import sys import sys
import threading
import time
import types import types
from typing import Dict, Tuple, Callable, Set
import bpy import bpy
from bpy.app.handlers import persistent from bpy.app.handlers import persistent
@ -30,6 +33,10 @@ if lnx.is_reload(__name__):
else: else:
lnx.enable_reload(__name__) lnx.enable_reload(__name__)
# Module-level storage for active threads (eliminates re-queuing overhead)
_active_threads: Dict[threading.Thread, Callable] = {}
_last_poll_time = 0.0
_consecutive_empty_polls = 0
@persistent @persistent
def on_depsgraph_update_post(self): def on_depsgraph_update_post(self):
@ -135,35 +142,113 @@ def always() -> float:
def poll_threads() -> float: def poll_threads() -> float:
"""Polls the thread callback queue and if a thread has finished, it
is joined with the main thread and the corresponding callback is
executed in the main thread.
""" """
Improved thread polling with:
- No re-queuing overhead
- Batch processing of completed threads
- Adaptive timing based on activity
- Better memory management
- Simplified logic flow
"""
global _last_poll_time, _consecutive_empty_polls
current_time = time.time()
# Process all new threads from queue at once (batch processing)
new_threads_added = 0
try: try:
thread, callback = make.thread_callback_queue.get(block=False) while True:
thread, callback = make.thread_callback_queue.get(block=False)
_active_threads[thread] = callback
new_threads_added += 1
except queue.Empty: except queue.Empty:
pass
# Early return if no active threads
if not _active_threads:
_consecutive_empty_polls += 1
# Adaptive timing: longer intervals when consistently empty
if _consecutive_empty_polls > 10:
return 0.5 # Back off when no activity
return 0.25 return 0.25
if thread.is_alive():
try: # Reset empty poll counter when we have active threads
make.thread_callback_queue.put((thread, callback), block=False) _consecutive_empty_polls = 0
except queue.Full:
return 0.5 # Find completed threads (single pass, no re-queuing)
return 0.1 completed_threads = []
for thread in list(_active_threads.keys()):
if not thread.is_alive():
completed_threads.append(thread)
# Batch process all completed threads
if completed_threads:
_process_completed_threads(completed_threads)
# Adaptive timing based on activity level
active_count = len(_active_threads)
if active_count == 0:
return 0.25
elif active_count <= 3:
return 0.05 # Medium frequency for low activity
else: else:
return 0.01 # High frequency for high activity
def _process_completed_threads(completed_threads: list) -> None:
"""Process a batch of completed threads with robust error handling."""
for thread in completed_threads:
callback = _active_threads.pop(thread) # Remove from tracking
try: try:
thread.join() thread.join() # Should be instant since thread is dead
callback() callback()
except Exception as e: except Exception as e:
# If there is an exception, we can no longer return the time to # Robust error recovery
# the next call to this polling function, so to keep it running _handle_callback_error(e)
# we re-register it and then raise the original exception. continue # Continue processing other threads
try:
bpy.app.timers.unregister(poll_threads) # Explicit cleanup for better memory management
except ValueError: del thread, callback
pass
bpy.app.timers.register(poll_threads, first_interval=0.01, persistent=True) def _handle_callback_error(exception: Exception) -> None:
# Quickly check if another thread has finished """Centralized error handling with better recovery."""
return 0.01 try:
# Try to unregister existing timer
bpy.app.timers.unregister(poll_threads)
except ValueError:
pass # Timer wasn't registered, that's fine
# Re-register timer with slightly longer interval for stability
bpy.app.timers.register(poll_threads, first_interval=0.1, persistent=True)
# Re-raise the original exception after ensuring timer continuity
raise exception
def cleanup_polling_system() -> None:
"""Optional cleanup function for proper shutdown."""
global _active_threads, _consecutive_empty_polls
# Wait for remaining threads to complete (with timeout)
for thread in list(_active_threads.keys()):
if thread.is_alive():
thread.join(timeout=1.0) # 1 second timeout
# Clear tracking structures
_active_threads.clear()
_consecutive_empty_polls = 0
# Unregister timer
try:
bpy.app.timers.unregister(poll_threads)
except ValueError:
pass
def get_polling_stats() -> dict:
"""Get statistics about the polling system for monitoring."""
return {
'active_threads': len(_active_threads),
'consecutive_empty_polls': _consecutive_empty_polls,
'thread_ids': [t.ident for t in _active_threads.keys()]
}
loaded_py_libraries: dict[str, types.ModuleType] = {} loaded_py_libraries: dict[str, types.ModuleType] = {}