diff --git a/leenkx/blender/lnx/handlers.py b/leenkx/blender/lnx/handlers.py index ef47589..5d62a25 100644 --- a/leenkx/blender/lnx/handlers.py +++ b/leenkx/blender/lnx/handlers.py @@ -2,7 +2,10 @@ import importlib import os import queue import sys +import threading +import time import types +from typing import Dict, Tuple, Callable, Set import bpy from bpy.app.handlers import persistent @@ -30,6 +33,10 @@ if lnx.is_reload(__name__): else: lnx.enable_reload(__name__) +# Module-level storage for active threads (eliminates re-queuing overhead) +_active_threads: Dict[threading.Thread, Callable] = {} +_last_poll_time = 0.0 +_consecutive_empty_polls = 0 @persistent def on_depsgraph_update_post(self): @@ -135,35 +142,113 @@ def always() -> float: def poll_threads() -> float: - """Polls the thread callback queue and if a thread has finished, it - is joined with the main thread and the corresponding callback is - executed in the main thread. """ + Improved thread polling with: + - No re-queuing overhead + - Batch processing of completed threads + - Adaptive timing based on activity + - Better memory management + - Simplified logic flow + """ + global _last_poll_time, _consecutive_empty_polls + current_time = time.time() + + # Process all new threads from queue at once (batch processing) + new_threads_added = 0 try: - thread, callback = make.thread_callback_queue.get(block=False) + while True: + thread, callback = make.thread_callback_queue.get(block=False) + _active_threads[thread] = callback + new_threads_added += 1 except queue.Empty: + pass + + # Early return if no active threads + if not _active_threads: + _consecutive_empty_polls += 1 + # Adaptive timing: longer intervals when consistently empty + if _consecutive_empty_polls > 10: + return 0.5 # Back off when no activity return 0.25 - if thread.is_alive(): - try: - make.thread_callback_queue.put((thread, callback), block=False) - except queue.Full: - return 0.5 - return 0.1 + + # Reset empty poll counter when we have active threads + _consecutive_empty_polls = 0 + + # Find completed threads (single pass, no re-queuing) + completed_threads = [] + for thread in list(_active_threads.keys()): + if not thread.is_alive(): + completed_threads.append(thread) + + # Batch process all completed threads + if completed_threads: + _process_completed_threads(completed_threads) + + # Adaptive timing based on activity level + active_count = len(_active_threads) + if active_count == 0: + return 0.25 + elif active_count <= 3: + return 0.05 # Medium frequency for low activity else: + return 0.01 # High frequency for high activity + +def _process_completed_threads(completed_threads: list) -> None: + """Process a batch of completed threads with robust error handling.""" + for thread in completed_threads: + callback = _active_threads.pop(thread) # Remove from tracking + try: - thread.join() + thread.join() # Should be instant since thread is dead callback() except Exception as e: - # If there is an exception, we can no longer return the time to - # the next call to this polling function, so to keep it running - # we re-register it and then raise the original exception. - try: - bpy.app.timers.unregister(poll_threads) - except ValueError: - pass - bpy.app.timers.register(poll_threads, first_interval=0.01, persistent=True) - # Quickly check if another thread has finished - return 0.01 + # Robust error recovery + _handle_callback_error(e) + continue # Continue processing other threads + + # Explicit cleanup for better memory management + del thread, callback + +def _handle_callback_error(exception: Exception) -> None: + """Centralized error handling with better recovery.""" + try: + # Try to unregister existing timer + bpy.app.timers.unregister(poll_threads) + except ValueError: + pass # Timer wasn't registered, that's fine + + # Re-register timer with slightly longer interval for stability + bpy.app.timers.register(poll_threads, first_interval=0.1, persistent=True) + + # Re-raise the original exception after ensuring timer continuity + raise exception + +def cleanup_polling_system() -> None: + """Optional cleanup function for proper shutdown.""" + global _active_threads, _consecutive_empty_polls + + # Wait for remaining threads to complete (with timeout) + for thread in list(_active_threads.keys()): + if thread.is_alive(): + thread.join(timeout=1.0) # 1 second timeout + + # Clear tracking structures + _active_threads.clear() + _consecutive_empty_polls = 0 + + # Unregister timer + try: + bpy.app.timers.unregister(poll_threads) + except ValueError: + pass + +def get_polling_stats() -> dict: + """Get statistics about the polling system for monitoring.""" + return { + 'active_threads': len(_active_threads), + 'consecutive_empty_polls': _consecutive_empty_polls, + 'thread_ids': [t.ident for t in _active_threads.keys()] + } loaded_py_libraries: dict[str, types.ModuleType] = {}