Files
LNXSDK/leenkx/blender/lnx/render_engine.py
2026-02-24 17:35:26 -08:00

1342 lines
50 KiB
Python

"""
Leenkx viewport framebuffer via shared memory.
"""
import bpy
import gpu
from gpu_extras.batch import batch_for_shader
import struct
import ctypes
import sys
import os
import array
import atexit
HAS_GPU_STATE = bpy.app.version >= (3, 0, 0)
if not HAS_GPU_STATE:
import bgl
SHADER_UNIFORM_COLOR = 'UNIFORM_COLOR' if bpy.app.version >= (3, 4, 0) else '2D_UNIFORM_COLOR'
SHADER_IMAGE = 'IMAGE' if bpy.app.version >= (3, 4, 0) else '2D_IMAGE'
HAS_GPU_TEXTURE = bpy.app.version >= (3, 0, 0)
try:
import numpy as np
HAS_NUMPY = True
except ImportError:
HAS_NUMPY = False
# TODO: Fix correctly
def _build_srgb_to_linear_lut():
lut = []
for i in range(256):
srgb = i / 255.0
if srgb <= 0.04045:
linear = srgb / 12.92
else:
linear = ((srgb + 0.055) / 1.055) ** 2.4
lut.append(linear)
return lut
_srgb_to_linear_lut_list = _build_srgb_to_linear_lut()
if HAS_NUMPY:
_srgb_to_linear_lut = np.array(_srgb_to_linear_lut_list, dtype=np.float32)
if sys.platform == 'win32':
import mmap
else:
import mmap
try:
import posix_ipc
HAS_POSIX_IPC = True
except ImportError:
HAS_POSIX_IPC = False
VIEWPORT_BUILD = 0x4B524F4D
VIEWPORT_VERSION = 1
VIEWPORT_SHMEM_NAME_BASE = "KROM_VIEWPORT_FB"
def _get_viewport_shmem_name(viewport_id=None):
"""Get shared memory name for a specific viewport."""
if viewport_id:
return f"{VIEWPORT_SHMEM_NAME_BASE}_{viewport_id}"
return VIEWPORT_SHMEM_NAME_BASE
# Header structure offsets (must match viewport_server.h SharedFramebufferHeader)
# build: 0-3 (4 bytes, uint32)
# version: 4-7 (4 bytes, uint32)
# width: 8-11 (4 bytes, uint32)
# height: 12-15 (4 bytes, uint32)
# frame_id: 16-23 (8 bytes, uint64)
# ready_flag: 24-27 (4 bytes, uint32)
# format: 28-31 (4 bytes, uint32)
# viewport_width: 32-35 (4 bytes, uint32)
# viewport_height: 36-39 (4 bytes, uint32)
# resize_request: 40-43 (4 bytes, uint32)
# shutdown_flag: 44-47 (4 bytes, uint32)
# input_enabled: 48-51 (4 bytes, uint32)
# view_matrix: 52-115 (64 bytes, 16 floats)
# proj_matrix: 116-179 (64 bytes, 16 floats)
# krom_camera_pos: 180-191 (12 bytes, 3 floats)
# krom_camera_rot: 192-207 (16 bytes, 4 floats)
# krom_camera_dirty: 208-211 (4 bytes, uint32)
# input_write_idx: 212-215 (4 bytes, uint32)
# input_read_idx: 216-219 (4 bytes, uint32)
# input_events: 220-603 (384 bytes, 32 events * 12 bytes each)
# Total: 604 bytes
OFFSET_BUILD = 0
OFFSET_WIDTH = 8
OFFSET_FRAME_ID = 16
OFFSET_READY_FLAG = 24
OFFSET_VIEWPORT_WIDTH = 32
OFFSET_RESIZE_REQUEST = 40
OFFSET_SHUTDOWN_FLAG = 44
OFFSET_INPUT_ENABLED = 48
OFFSET_VIEW_MATRIX = 52
OFFSET_PROJ_MATRIX = 116
OFFSET_KROM_CAMERA_POS = 180 # 12 bytes vec
OFFSET_KROM_CAMERA_ROT = 192 # 16 bytes quat
OFFSET_KROM_CAMERA_DIRTY = 208 # 1 uint32
OFFSET_INPUT_WRITE_IDX = 212
OFFSET_INPUT_READ_IDX = 216
OFFSET_INPUT_EVENTS = 220
MAX_INPUT_EVENTS = 32
INPUT_EVENT_SIZE = 12
VIEWPORT_HEADER_SIZE = 604
INPUT_EVENT_NONE = 0
INPUT_EVENT_MOUSE_MOVE = 1
INPUT_EVENT_MOUSE_DOWN = 2
INPUT_EVENT_MOUSE_UP = 3
INPUT_EVENT_KEY_DOWN = 4
INPUT_EVENT_KEY_UP = 5
INPUT_EVENT_MOUSE_WHEEL = 6
MOUSE_BUTTON_LEFT = 0
MOUSE_BUTTON_RIGHT = 1
MOUSE_BUTTON_MIDDLE = 2
_rendered_mode_pending = False
_rendered_mode_retries = 0
_msgbus_owner = object()
@bpy.app.handlers.persistent
def _on_depsgraph_update(scene, depsgraph):
"""Called on depsgraph update - check if render engine changed to Krom."""
try:
if scene.render.engine == 'KROM_VIEWPORT':
for window in bpy.context.window_manager.windows:
for area in window.screen.areas:
if area.type == 'VIEW_3D':
for space in area.spaces:
if space.type == 'VIEW_3D':
if space.shading.type != 'RENDERED':
space.shading.type = 'RENDERED'
area.tag_redraw()
return
except:
pass
def _deferred_set_rendered_mode():
"""Module-level timer callback to set rendered mode."""
global _rendered_mode_pending, _rendered_mode_retries
try:
current_engine = bpy.context.scene.render.engine
for window in bpy.context.window_manager.windows:
screen = window.screen
for area in screen.areas:
if area.type == 'VIEW_3D':
for space in area.spaces:
if space.type == 'VIEW_3D':
current_shading = space.shading.type
if current_shading == 'RENDERED':
_rendered_mode_pending = False
_rendered_mode_retries = 0
return None
space.shading.type = 'RENDERED'
area.tag_redraw()
_rendered_mode_pending = False
_rendered_mode_retries = 0
return None
_rendered_mode_retries += 1
if _rendered_mode_retries < 10:
print(f"Viewport VIEW_3D not found, retrying... ({_rendered_mode_retries})")
return 0.2
except Exception as e:
print(f"Could not set rendered mode: {e}")
import traceback
traceback.print_exc()
_rendered_mode_retries += 1
if _rendered_mode_retries < 10:
return 0.2
_rendered_mode_pending = False
_rendered_mode_retries = 0
return None
class KromViewportEngine(bpy.types.RenderEngine):
"""Leenkx render engine that displays Krom runtime output in viewport."""
bl_idname = 'KROM_VIEWPORT'
bl_label = 'Krom Viewport'
bl_use_preview = False
bl_use_gpu_context = True
bl_use_shading_nodes_custom = False
bl_use_eevee_viewport = True
def _ensure_initialized(self):
if not hasattr(self, '_viewport_init_done'):
self._shm = None
self._shm_file = None
self._texture = None
self._last_frame_id = 0
self._width = 0
self._height = 0
self._initialized = False
self._shader = None
self._batch = None
self._viewport_init_done = True
self._rendered_mode_set = False
self._last_shading_mode = None
self._viewport_id = None
self._shmem_name = None
self._krom_proc = None
self._krom_launch_attempted = False
self._pending_camera_sync = False
self._engine_id = id(self)
# print(f"New engine instance created: {self._engine_id}")
def _set_rendered_mode(self, context):
if self._rendered_mode_set:
return
try:
space = context.space_data
if space and hasattr(space, 'shading'):
current = space.shading.type
if current != 'RENDERED':
space.shading.type = 'RENDERED'
self._rendered_mode_set = True
except Exception as e:
print(f"Failed to set rendered mode: {e}")
def _handle_mode_switch(self, context):
"""Detect shading mode changes and restart Krom on Solid->Rendered switch if needed."""
try:
space = context.space_data
if not space or not hasattr(space, 'shading'):
return
current_mode = space.shading.type
if self._last_shading_mode == 'SOLID' and current_mode == 'RENDERED':
self._initialized = False
self._rendered_mode_set = False
self._krom_launch_attempted = False
self._cleanup_shared_memory()
self._pending_camera_sync = True
if self._last_shading_mode == 'RENDERED' and current_mode != 'RENDERED':
self._rendered_mode_set = False
# keep krom running in solid mode for switching back ti rendered
self._last_shading_mode = current_mode
except Exception as e:
pass
def _sync_camera_once(self, context):
"""Sync the camera once for Solid->Rendered mode"""
if not self._initialized:
return
try:
rv3d = context.region_data
if not rv3d:
return
view_matrix = rv3d.view_matrix
proj_matrix = rv3d.window_matrix
view_data = struct.pack('<16f', *[view_matrix[i][j] for j in range(4) for i in range(4)])
proj_data = struct.pack('<16f', *[proj_matrix[i][j] for j in range(4) for i in range(4)])
if sys.platform == 'win32':
if not self._shm_ptr:
return
ctypes.memmove(self._shm_ptr + OFFSET_VIEW_MATRIX, view_data, len(view_data))
ctypes.memmove(self._shm_ptr + OFFSET_PROJ_MATRIX, proj_data, len(proj_data))
else:
self._shm.seek(OFFSET_VIEW_MATRIX)
self._shm.write(view_data)
self._shm.seek(OFFSET_PROJ_MATRIX)
self._shm.write(proj_data)
except Exception as e:
print(f"Failed camera sync: {e}")
def _launch_krom_process(self, context):
"""Launch Krom process for this viewport instance."""
if self._krom_launch_attempted:
return self._krom_proc is not None
self._krom_launch_attempted = True
if self._viewport_id is None and context is not None:
try:
space = context.space_data
if space:
self._viewport_id = hex(space.as_pointer())[-6:]
except:
pass
if not self._viewport_id:
import time
self._viewport_id = hex(int(time.time() * 1000) % 0xFFFFFF)[-6:]
region = context.region
width = region.width // 2 if region else 960
height = region.height if region else 540
viewport_id = self._viewport_id
engine_id = self._engine_id
shmem_name = _get_viewport_shmem_name(viewport_id)
# print(f"ID: {engine_id} launching: {viewport_id} with shared memory: {shmem_name}")
def deferred_launch():
try:
import lnx.make as make
make.play_viewport(viewport_id, width, height)
except Exception as e:
print(f"Failed to launch viewport: {e}")
import traceback
traceback.print_exc()
return None
bpy.app.timers.register(deferred_launch, first_interval=0.1)
return True
def _stop_krom_process(self):
if self._krom_proc:
try:
import lnx.make as make
make.stop_viewport_runtime(self._viewport_id)
except:
pass
self._krom_proc = None
def _init_shared_memory(self, context=None):
self._ensure_initialized()
if self._initialized:
return True
if self._viewport_id is None and context is not None:
try:
space = context.space_data
if space:
self._viewport_id = hex(space.as_pointer())[-6:]
except:
pass
if self._viewport_id is None:
import time
self._viewport_id = hex(int(time.time() * 1000) % 0xFFFFFF)[-6:]
shmem_name = _get_viewport_shmem_name(self._viewport_id)
self._shmem_name = shmem_name
try:
if sys.platform == 'win32':
max_size = VIEWPORT_HEADER_SIZE + (4096 * 4096 * 4)
try:
import ctypes.wintypes
kernel32 = ctypes.windll.kernel32
FILE_MAP_ALL_ACCESS = 0x000F001F
kernel32.OpenFileMappingW.argtypes = [ctypes.wintypes.DWORD, ctypes.wintypes.BOOL, ctypes.wintypes.LPCWSTR]
kernel32.OpenFileMappingW.restype = ctypes.wintypes.HANDLE
kernel32.MapViewOfFile.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD, ctypes.c_size_t]
kernel32.MapViewOfFile.restype = ctypes.c_void_p
kernel32.UnmapViewOfFile.argtypes = [ctypes.c_void_p]
kernel32.UnmapViewOfFile.restype = ctypes.wintypes.BOOL
kernel32.CloseHandle.argtypes = [ctypes.wintypes.HANDLE]
kernel32.CloseHandle.restype = ctypes.wintypes.BOOL
kernel32.VirtualQuery.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_size_t]
kernel32.VirtualQuery.restype = ctypes.c_size_t
handle = kernel32.OpenFileMappingW(
FILE_MAP_ALL_ACCESS,
False,
shmem_name
)
if not handle:
return False
# print(f"ID: {self._engine_id} CONNECTED: {shmem_name}")
# use 0 to map entire region
ptr = kernel32.MapViewOfFile(
handle,
FILE_MAP_ALL_ACCESS,
0, 0, 0
)
if not ptr:
error = kernel32.GetLastError()
kernel32.CloseHandle(handle)
print(f"Failed to map shared memory: {error}")
return False
self._shm_handle = handle
self._shm_ptr = ptr
# VirtualQuery to get actual size of mapped region
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
_fields_ = [
("BaseAddress", ctypes.c_void_p),
("AllocationBase", ctypes.c_void_p),
("AllocationProtect", ctypes.c_ulong),
("RegionSize", ctypes.c_size_t),
("State", ctypes.c_ulong),
("Protect", ctypes.c_ulong),
("Type", ctypes.c_ulong),
]
mbi = MEMORY_BASIC_INFORMATION()
result = kernel32.VirtualQuery(ptr, ctypes.byref(mbi), ctypes.sizeof(mbi))
if result == 0 or mbi.RegionSize == 0:
actual_size = VIEWPORT_HEADER_SIZE + (1920 * 1080 * 4)
else:
actual_size = mbi.RegionSize
self._shm_size = actual_size
self._shm_buffer = (ctypes.c_ubyte * actual_size).from_address(ptr)
self._initialized = True
# print(f"Connected: {shmem_name}")
return True
except Exception as e:
print(f"Failed to open shared memory: {e}")
return False
else:
posix_shmem_name = f"/{shmem_name}"
max_size = VIEWPORT_HEADER_SIZE + (4096 * 4096 * 4)
if HAS_POSIX_IPC:
try:
shm = posix_ipc.SharedMemory(posix_shmem_name)
self._shm = mmap.mmap(shm.fd, max_size, mmap.MAP_SHARED, mmap.PROT_READ | mmap.PROT_WRITE)
shm.close_fd()
self._initialized = True
# print(f"Connected: {posix_shmem_name}")
return True
except Exception as e:
print(f"Failed to open shared memory: {e}")
return False
else:
# fallback: try /dev/shm directly
shm_path = f"/dev/shm{posix_shmem_name}"
try:
self._shm_file = open(shm_path, 'r+b')
self._shm = mmap.mmap(self._shm_file.fileno(), max_size, mmap.MAP_SHARED, mmap.PROT_READ | mmap.PROT_WRITE)
self._initialized = True
# print(f"Connected: {shm_path}")
return True
except Exception as e:
print(f"Failed to open /dev/shm: {e}")
return False
except Exception as e:
print(f"Error creating shared memory: {e}")
return False
def _cleanup_shared_memory(self):
"""Close shared memory handles."""
if sys.platform == 'win32':
if hasattr(self, '_shm_ptr') and self._shm_ptr:
try:
kernel32 = ctypes.windll.kernel32
kernel32.UnmapViewOfFile(self._shm_ptr)
except:
pass
self._shm_ptr = None
if hasattr(self, '_shm_handle') and self._shm_handle:
try:
kernel32 = ctypes.windll.kernel32
kernel32.CloseHandle(self._shm_handle)
except:
pass
self._shm_handle = None
else:
# POSIX cleanup
if hasattr(self, '_shm') and self._shm:
try:
self._shm.close()
except:
pass
self._shm = None
if hasattr(self, '_shm_file') and self._shm_file:
try:
self._shm_file.close()
except:
pass
self._shm_file = None
self._initialized = False
def _read_header(self):
"""Read and parse the shared memory header."""
self._ensure_initialized()
if not self._initialized:
return None
try:
if sys.platform == 'win32':
if not self._shm_ptr:
return None
# use memmove for safety
header_buffer = (ctypes.c_ubyte * VIEWPORT_HEADER_SIZE)()
ctypes.memmove(header_buffer, self._shm_ptr, VIEWPORT_HEADER_SIZE)
header_data = bytes(header_buffer)
else:
self._shm.seek(0)
header_data = self._shm.read(VIEWPORT_HEADER_SIZE)
build, version, width, height = struct.unpack('<IIII', header_data[:16])
frame_id = struct.unpack('<Q', header_data[16:24])[0]
ready_flag, fmt = struct.unpack('<II', header_data[24:32])
if build != VIEWPORT_BUILD:
# shared memory may have been recreated
self._handle_shmem_invalid()
return None
return {
'build': build,
'version': version,
'width': width,
'height': height,
'frame_id': frame_id,
'ready_flag': ready_flag,
'format': fmt
}
except Exception as e:
self._handle_shmem_invalid()
print(f"Failed to read header: {e}")
return None
def _handle_shmem_invalid(self):
"""Handle invalid/stale shared memory by cleaning up and allowing re-init."""
self._cleanup_shared_memory()
self._initialized = False
self._texture = None
self._last_frame_id = 0
def _read_framebuffer(self):
"""Read framebuffer from shared memory."""
header = self._read_header()
if not header:
return None
if header['frame_id'] == self._last_frame_id:
return None
if header['ready_flag'] == 0:
return None
width = header['width']
height = header['height']
if width <= 0 or height <= 0 or width > 4096 or height > 4096:
return None
try:
pixel_size = width * height * 4
if sys.platform == 'win32':
if not self._shm_ptr:
return None
pixel_buffer = (ctypes.c_ubyte * pixel_size)()
ctypes.memmove(pixel_buffer, self._shm_ptr + VIEWPORT_HEADER_SIZE, pixel_size)
pixels = bytes(pixel_buffer)
ready_bytes = struct.pack('<I', 0)
ctypes.memmove(self._shm_ptr + OFFSET_READY_FLAG, ready_bytes, 4)
else:
self._shm.seek(VIEWPORT_HEADER_SIZE)
pixels = self._shm.read(pixel_size)
self._shm.seek(OFFSET_READY_FLAG)
self._shm.write(struct.pack('<I', 0))
self._last_frame_id = header['frame_id']
self._width = width
self._height = height
return pixels
except Exception as e:
self._handle_shmem_invalid()
print(f"Failed to read framebuffer: {e}")
return None
def _write_camera_matrices(self, context):
"""Write camera matrices to shared memory for Krom to use."""
self._ensure_initialized()
if not self._initialized:
return
try:
rv3d = context.region_data
if not rv3d:
return
view_matrix = rv3d.view_matrix
proj_matrix = rv3d.window_matrix
view_data = struct.pack('<16f', *[view_matrix[i][j] for j in range(4) for i in range(4)])
proj_data = struct.pack('<16f', *[proj_matrix[i][j] for j in range(4) for i in range(4)])
if sys.platform == 'win32':
if not self._shm_ptr:
return
ctypes.memmove(self._shm_ptr + OFFSET_VIEW_MATRIX, view_data, len(view_data))
ctypes.memmove(self._shm_ptr + OFFSET_PROJ_MATRIX, proj_data, len(proj_data))
else:
self._shm.seek(OFFSET_VIEW_MATRIX)
self._shm.write(view_data)
self._shm.seek(OFFSET_PROJ_MATRIX)
self._shm.write(proj_data)
except Exception as e:
self._handle_shmem_invalid()
print(f"Failed to write camera matrices: {e}")
def _read_krom_camera(self, context):
"""Read camera position/rotation from Krom and apply to Blender viewport."""
self._ensure_initialized()
if not self._initialized:
return
try:
if sys.platform == 'win32':
if not self._shm_ptr:
return
dirty_data = ctypes.string_at(self._shm_ptr + OFFSET_KROM_CAMERA_DIRTY, 4)
else:
self._shm.seek(OFFSET_KROM_CAMERA_DIRTY)
dirty_data = self._shm.read(4)
dirty = struct.unpack('<I', dirty_data)[0]
if dirty == 0:
return
if sys.platform == 'win32':
pos_data = ctypes.string_at(self._shm_ptr + OFFSET_KROM_CAMERA_POS, 12)
rot_data = ctypes.string_at(self._shm_ptr + OFFSET_KROM_CAMERA_ROT, 16)
else:
self._shm.seek(OFFSET_KROM_CAMERA_POS)
pos_data = self._shm.read(12)
self._shm.seek(OFFSET_KROM_CAMERA_ROT)
rot_data = self._shm.read(16)
pos = struct.unpack('<3f', pos_data)
rot = struct.unpack('<4f', rot_data) # quaternion (x, y, z, w)
clear_data = struct.pack('<I', 0)
if sys.platform == 'win32':
ctypes.memmove(self._shm_ptr + OFFSET_KROM_CAMERA_DIRTY, clear_data, 4)
else:
self._shm.seek(OFFSET_KROM_CAMERA_DIRTY)
self._shm.write(clear_data)
from mathutils import Quaternion, Matrix, Vector
rv3d = context.space_data.region_3d if context.space_data else None
if not rv3d:
return
# Krom quaternion to Blender format (w, x, y, z)
quat = Quaternion((rot[3], rot[0], rot[1], rot[2]))
forward = quat @ Vector((0, 0, -1))
camera_pos = Vector(pos)
view_dist = rv3d.view_distance if rv3d.view_distance > 0 else 10.0
target = camera_pos + forward * view_dist
rv3d.view_location = target
rv3d.view_rotation = quat
except Exception as e:
pass
def _request_resize(self, width, height):
"""Request Krom to resize the viewport."""
self._ensure_initialized()
if not self._initialized:
return
if hasattr(self, '_last_requested_width') and hasattr(self, '_last_requested_height'):
if self._last_requested_width == width and self._last_requested_height == height:
return
self._last_requested_width = width
self._last_requested_height = height
try:
data = struct.pack('<III', width, height, 1)
if sys.platform == 'win32':
if not self._shm_ptr:
return
ctypes.memmove(self._shm_ptr + OFFSET_VIEWPORT_WIDTH, data, len(data))
else:
self._shm.seek(OFFSET_VIEWPORT_WIDTH)
self._shm.write(data)
except Exception as e:
print(f"Failed to request resize: {e}")
def _request_resize_pending(self):
"""Check if we have a pending resize that Krom needs to process."""
if not hasattr(self, '_pending_resize'):
self._pending_resize = False
return self._pending_resize
def _request_shutdown(self):
"""Request Krom to shutdown."""
self._ensure_initialized()
if not self._initialized:
return
try:
data = struct.pack('<I', 1)
if sys.platform == 'win32':
if not self._shm_ptr:
return
ctypes.memmove(self._shm_ptr + OFFSET_SHUTDOWN_FLAG, data, len(data))
else:
self._shm.seek(OFFSET_SHUTDOWN_FLAG)
self._shm.write(data)
except Exception as e:
print(f"Failed to request shutdown: {e}")
def _set_input_enabled(self, enabled):
"""Enable or disable input forwarding to Krom."""
self._ensure_initialized()
if not self._initialized:
return
try:
data = struct.pack('<I', 1 if enabled else 0)
if sys.platform == 'win32':
if not self._shm_ptr:
return
ctypes.memmove(self._shm_ptr + OFFSET_INPUT_ENABLED, data, len(data))
else:
self._shm.seek(OFFSET_INPUT_ENABLED)
self._shm.write(data)
except Exception as e:
pass
def _send_input_event(self, event_type, button=0, x=0, y=0, delta=0, modifiers=0):
"""Send an input event to Krom via shared memory ring buffer."""
self._ensure_initialized()
if not self._initialized:
return False
try:
if sys.platform == 'win32':
if not self._shm_ptr:
return False
write_idx_data = ctypes.string_at(self._shm_ptr + OFFSET_INPUT_WRITE_IDX, 4)
write_idx = struct.unpack('<I', write_idx_data)[0]
# calculate event offset in ring buffer
event_offset = OFFSET_INPUT_EVENTS + (write_idx % MAX_INPUT_EVENTS) * INPUT_EVENT_SIZE
event_data = struct.pack('<BBhhhI', event_type, button, x, y, delta, modifiers)
ctypes.memmove(self._shm_ptr + event_offset, event_data, len(event_data))
new_write_idx = struct.pack('<I', (write_idx + 1) % MAX_INPUT_EVENTS)
ctypes.memmove(self._shm_ptr + OFFSET_INPUT_WRITE_IDX, new_write_idx, 4)
return True
else:
self._shm.seek(OFFSET_INPUT_WRITE_IDX)
write_idx = struct.unpack('<I', self._shm.read(4))[0]
event_offset = OFFSET_INPUT_EVENTS + (write_idx % MAX_INPUT_EVENTS) * INPUT_EVENT_SIZE
self._shm.seek(event_offset)
event_data = struct.pack('<BBhhhI', event_type, button, x, y, delta, modifiers)
self._shm.write(event_data)
self._shm.seek(OFFSET_INPUT_WRITE_IDX)
self._shm.write(struct.pack('<I', (write_idx + 1) % MAX_INPUT_EVENTS))
return True
except Exception as e:
return False
def _create_shader(self):
"""Create shader for drawing the framebuffer texture."""
self._ensure_initialized()
if self._shader:
return
self._shader = gpu.shader.from_builtin(SHADER_IMAGE)
def _update_texture(self, pixels, width, height):
"""Create or update the GPU texture with new pixel data."""
self._ensure_initialized()
try:
num_pixels = width * height * 4
if len(pixels) < num_pixels:
return
# clear old texture if dimensions changed
if hasattr(self, '_tex_width') and hasattr(self, '_tex_height'):
if self._tex_width != width or self._tex_height != height:
self._texture = None
# TODO: re-investigate LUT for linearization
if HAS_NUMPY:
pixel_array = np.frombuffer(pixels[:num_pixels], dtype=np.uint8).copy()
pixel_array[3::4] = 255
float_array = _srgb_to_linear_lut[pixel_array]
float_array[3::4] = 1.0
buffer = gpu.types.Buffer('FLOAT', num_pixels, float_array)
else:
pixel_array = array.array('B', pixels[:num_pixels])
float_pixels = []
for i, p in enumerate(pixel_array):
if i % 4 == 3: # alpha channel
float_pixels.append(1.0)
else:
float_pixels.append(_srgb_to_linear_lut_list[p])
buffer = gpu.types.Buffer('FLOAT', len(float_pixels), float_pixels)
self._texture = gpu.types.GPUTexture((width, height), format='RGBA32F', data=buffer)
self._tex_width = width
self._tex_height = height
except Exception as e:
print(f"Failed to update texture: {e}")
self._texture = None
def render(self, depsgraph):
"""Disabled render method for final renders (F12)."""
pass
def view_update(self, context, depsgraph):
"""Called when the scene is updated."""
self._ensure_initialized()
if not self._init_shared_memory(context):
return
region = context.region
if region:
if region.width != self._width or region.height != self._height:
if region.width > 0 and region.height > 0:
self._request_resize(region.width, region.height)
def view_draw(self, context, depsgraph):
"""Called to draw the viewport."""
self._ensure_initialized()
self._handle_mode_switch(context)
was_initialized = self._initialized
if not self._init_shared_memory(context):
if not self._krom_launch_attempted:
# print(f"Shared memory not found, launching Krom for viewport {self._viewport_id}")
self._launch_krom_process(context)
self._draw_placeholder(context)
return
if not was_initialized and self._initialized:
region = context.region
if region and region.width > 0 and region.height > 0:
self._last_requested_width = 0
self._last_requested_height = 0
# width from region is 2x the actual viewport width.. TODO: re-investigate
initial_width = region.width // 2
self._request_resize(initial_width, region.height)
if self._pending_camera_sync:
self._pending_camera_sync = False
self._sync_camera_once(context)
self._set_rendered_mode(context)
# TODO: input forwarding when viewport is active
global _active_krom_engine, _active_krom_engines, _active_viewport_id, _input_operator_running
_active_krom_engine = self # legacy single-engine reference
if self._viewport_id:
_active_krom_engines[self._viewport_id] = self
_active_viewport_id = self._viewport_id
self._set_input_enabled(True)
if not _input_operator_running:
bpy.app.timers.register(_start_input_capture, first_interval=0.1)
else:
if not hasattr(self, '_input_check_counter'):
self._input_check_counter = 0
self._input_check_counter += 1
if self._input_check_counter > 60:
self._input_check_counter = 0
bpy.app.timers.register(_start_input_capture, first_interval=0.5)
region = context.region
if region and region.width > 0 and region.height > 0:
self._request_resize(region.width, region.height)
self._read_krom_camera(context)
pixels = self._read_framebuffer()
if pixels:
self._update_texture(pixels, self._width, self._height)
if not self._texture:
self._draw_placeholder(context)
return
self._create_shader()
if HAS_GPU_STATE:
gpu.state.blend_set('NONE')
else:
bgl.glDisable(bgl.GL_BLEND)
region = context.region
vertices = (
(0, 0),
(region.width, 0),
(region.width, region.height),
(0, region.height),
)
tex_coords = (
(0, 1), # Flip Y
(1, 1),
(1, 0),
(0, 0),
)
indices = ((0, 1, 2), (0, 2, 3))
self._shader.bind()
self._shader.uniform_sampler("image", self._texture)
batch = batch_for_shader(self._shader, 'TRIS', {"pos": vertices, "texCoord": tex_coords}, indices=indices)
batch.draw(self._shader)
if HAS_GPU_STATE:
gpu.state.blend_set('NONE')
else:
bgl.glDisable(bgl.GL_BLEND)
self.tag_redraw()
def _draw_placeholder(self, context):
"""Draw a placeholder when Krom is not connected."""
# Simple colored background
if HAS_GPU_STATE:
gpu.state.blend_set('ALPHA')
else:
bgl.glEnable(bgl.GL_BLEND)
bgl.glBlendFunc(bgl.GL_SRC_ALPHA, bgl.GL_ONE_MINUS_SRC_ALPHA)
shader = gpu.shader.from_builtin(SHADER_UNIFORM_COLOR)
shader.bind()
shader.uniform_float("color", (0.1, 0.1, 0.15, 1.0))
region = context.region
vertices = (
(0, 0),
(region.width, 0),
(region.width, region.height),
(0, region.height),
)
indices = ((0, 1, 2), (0, 2, 3))
batch = batch_for_shader(shader, 'TRIS', {"pos": vertices}, indices=indices)
batch.draw(shader)
if HAS_GPU_STATE:
gpu.state.blend_set('NONE')
else:
bgl.glDisable(bgl.GL_BLEND)
class KROM_OT_viewport_input(bpy.types.Operator):
"""Modal operator to capture and forward input to Krom viewport"""
bl_idname = "krom.viewport_input"
bl_label = "Krom Viewport Input"
bl_options = {'INTERNAL'}
_engine = None
_last_mouse_x = 0
_last_mouse_y = 0
def modal(self, context, event):
global _input_operator_running
# check if we should stop
if event.type == 'ESC':
try:
if self._engine:
self._engine._set_input_enabled(False)
except ReferenceError:
pass
_input_operator_running = False
return {'CANCELLED'}
if event.type in {'SCREEN_SET', 'SCREEN_CHANGED'}:
return {'PASS_THROUGH'}
if not context.space_data or not context.area:
return {'PASS_THROUGH'}
if context.space_data.type == 'VIEW_3D':
if context.engine != 'KROM_VIEWPORT':
try:
if self._engine:
self._engine._set_input_enabled(False)
except ReferenceError:
pass
_input_operator_running = False
return {'CANCELLED'}
current_engine = self._get_engine_for_area(context, event)
if current_engine:
self._engine = current_engine
try:
if not self._engine or not self._engine._initialized:
return {'PASS_THROUGH'}
except ReferenceError:
_input_operator_running = False
return {'CANCELLED'}
region = context.region
if not region:
return {'PASS_THROUGH'}
if region.type != 'WINDOW':
return {'PASS_THROUGH'}
mouse_in_region = (0 <= event.mouse_region_x <= region.width and
0 <= event.mouse_region_y <= region.height)
if not mouse_in_region:
return {'PASS_THROUGH'}
area = context.area
if area:
mouse_x_abs = event.mouse_x
mouse_y_abs = event.mouse_y
for rgn in area.regions:
# skip the main window region where we actually want input
if rgn.type == 'WINDOW':
continue
# check if mouse is over toolbar, header, UI panel, etc.
if (rgn.x <= mouse_x_abs <= rgn.x + rgn.width and
rgn.y <= mouse_y_abs <= rgn.y + rgn.height):
# mouse is over a UI region - pass through
return {'PASS_THROUGH'}
mouse_x = event.mouse_region_x
# flip Y for Krom
mouse_y = region.height - event.mouse_region_y
try:
if event.type == 'MOUSEMOVE':
self._engine._send_input_event(INPUT_EVENT_MOUSE_MOVE, x=mouse_x, y=mouse_y)
self._last_mouse_x = mouse_x
self._last_mouse_y = mouse_y
return {'RUNNING_MODAL'}
if event.type == 'LEFTMOUSE':
if event.value == 'PRESS':
self._engine._send_input_event(INPUT_EVENT_MOUSE_DOWN, button=MOUSE_BUTTON_LEFT, x=mouse_x, y=mouse_y)
elif event.value == 'RELEASE':
self._engine._send_input_event(INPUT_EVENT_MOUSE_UP, button=MOUSE_BUTTON_LEFT, x=mouse_x, y=mouse_y)
return {'RUNNING_MODAL'}
if event.type == 'RIGHTMOUSE':
if event.value == 'PRESS':
self._engine._send_input_event(INPUT_EVENT_MOUSE_DOWN, button=MOUSE_BUTTON_RIGHT, x=mouse_x, y=mouse_y)
elif event.value == 'RELEASE':
self._engine._send_input_event(INPUT_EVENT_MOUSE_UP, button=MOUSE_BUTTON_RIGHT, x=mouse_x, y=mouse_y)
return {'RUNNING_MODAL'}
if event.type == 'MIDDLEMOUSE':
if event.value == 'PRESS':
self._engine._send_input_event(INPUT_EVENT_MOUSE_DOWN, button=MOUSE_BUTTON_MIDDLE, x=mouse_x, y=mouse_y)
elif event.value == 'RELEASE':
self._engine._send_input_event(INPUT_EVENT_MOUSE_UP, button=MOUSE_BUTTON_MIDDLE, x=mouse_x, y=mouse_y)
return {'RUNNING_MODAL'}
if event.type == 'WHEELUPMOUSE':
self._engine._send_input_event(INPUT_EVENT_MOUSE_WHEEL, delta=1)
return {'RUNNING_MODAL'}
if event.type == 'WHEELDOWNMOUSE':
self._engine._send_input_event(INPUT_EVENT_MOUSE_WHEEL, delta=-1)
return {'RUNNING_MODAL'}
if event.value in {'PRESS', 'RELEASE'}:
key_code = self._blender_to_krom_key(event.type)
if key_code is not None:
if event.value == 'PRESS':
self._engine._send_input_event(INPUT_EVENT_KEY_DOWN, button=key_code)
else:
self._engine._send_input_event(INPUT_EVENT_KEY_UP, button=key_code)
return {'RUNNING_MODAL'}
except ReferenceError:
_input_operator_running = False
return {'CANCELLED'}
return {'PASS_THROUGH'}
def _blender_to_krom_key(self, blender_key):
"""Convert Blender key type to Kinc key code."""
# Kinc key codes from keyboard.h
key_map = {
'A': 65, 'B': 66, 'C': 67, 'D': 68, 'E': 69, 'F': 70, 'G': 71, 'H': 72,
'I': 73, 'J': 74, 'K': 75, 'L': 76, 'M': 77, 'N': 78, 'O': 79, 'P': 80,
'Q': 81, 'R': 82, 'S': 83, 'T': 84, 'U': 85, 'V': 86, 'W': 87, 'X': 88,
'Y': 89, 'Z': 90,
'ZERO': 48, 'ONE': 49, 'TWO': 50, 'THREE': 51, 'FOUR': 52,
'FIVE': 53, 'SIX': 54, 'SEVEN': 55, 'EIGHT': 56, 'NINE': 57,
'SPACE': 32, 'BACK_SPACE': 8, 'TAB': 9, 'RET': 13,
'LEFT_SHIFT': 16, 'RIGHT_SHIFT': 16, 'LEFT_CTRL': 17, 'RIGHT_CTRL': 17,
'LEFT_ALT': 18, 'RIGHT_ALT': 18,
'UP_ARROW': 38, 'DOWN_ARROW': 40, 'LEFT_ARROW': 37, 'RIGHT_ARROW': 39,
'ESC': 27, 'DEL': 46, 'HOME': 36, 'END': 35,
'PAGE_UP': 33, 'PAGE_DOWN': 34, 'INSERT': 45,
'F1': 112, 'F2': 113, 'F3': 114, 'F4': 115, 'F5': 116, 'F6': 117,
'F7': 118, 'F8': 119, 'F9': 120, 'F10': 121, 'F11': 122, 'F12': 123,
}
return key_map.get(blender_key)
def _get_engine_for_area(self, context, event=None):
"""Get the correct engine for the viewport under the mouse cursor."""
# Use mouse coordinates to find which viewport the mouse is over
if event:
mouse_x = event.mouse_x
mouse_y = event.mouse_y
for window in bpy.context.window_manager.windows:
for area in window.screen.areas:
if area.type != 'VIEW_3D':
continue
if (area.x <= mouse_x < area.x + area.width and
area.y <= mouse_y < area.y + area.height):
for space in area.spaces:
if space.type == 'VIEW_3D':
try:
viewport_id = hex(space.as_pointer())[-6:]
if viewport_id in _active_krom_engines:
return _active_krom_engines[viewport_id]
except:
pass
break
if context.space_data:
try:
viewport_id = hex(context.space_data.as_pointer())[-6:]
if viewport_id in _active_krom_engines:
return _active_krom_engines[viewport_id]
except:
pass
if _active_viewport_id and _active_viewport_id in _active_krom_engines:
return _active_krom_engines[_active_viewport_id]
return _active_krom_engine
def invoke(self, context, event):
self._engine = self._get_engine_for_area(context)
if not self._engine:
return {'CANCELLED'}
context.window_manager.modal_handler_add(self)
return {'RUNNING_MODAL'}
_active_krom_engines = {}
_active_viewport_id = None
_input_operator_running = False
_active_krom_engine = None
def _start_input_capture():
"""Timer callback to start input capture operator."""
global _input_operator_running
if _active_krom_engine and not _input_operator_running:
try:
for window in bpy.context.window_manager.windows:
for area in window.screen.areas:
if area.type == 'VIEW_3D':
for region in area.regions:
if region.type == 'WINDOW':
override = {'window': window, 'area': area, 'region': region}
with bpy.context.temp_override(**override):
bpy.ops.krom.viewport_input('INVOKE_DEFAULT')
_input_operator_running = True
return None
except Exception as e:
pass
return None
def get_panels():
"""Get panels to ensure world settings and other properties are available."""
exclude_panels = {
'VIEWLAYER_PT_filter',
'VIEWLAYER_PT_layer_passes',
}
compatible_engines = {'BLENDER_RENDER', 'BLENDER_EEVEE', 'BLENDER_EEVEE_NEXT'}
panels = []
for panel in bpy.types.Panel.__subclasses__():
if hasattr(panel, 'COMPAT_ENGINES'):
if panel.COMPAT_ENGINES & compatible_engines:
if panel.__name__ not in exclude_panels:
panels.append(panel)
return panels
def _cleanup_krom_on_exit():
"""Kill any running Krom viewport processes when Blender exits."""
try:
import lnx.make as make
if hasattr(make, '_viewport_processes'):
for viewport_id, proc in list(make._viewport_processes.items()):
if proc and proc.poll() is None:
proc.terminate()
try:
proc.wait(timeout=2)
except:
proc.kill()
make._viewport_processes.clear()
if hasattr(make, '_viewport_proc') and make._viewport_proc is not None:
if make._viewport_proc.poll() is None:
make._viewport_proc.terminate()
try:
make._viewport_proc.wait(timeout=2)
except:
make._viewport_proc.kill()
except:
pass
class KROM_OT_stop_viewport(bpy.types.Operator):
"""Stop the Krom viewport process and switch to solid shading"""
bl_idname = "krom.stop_viewport"
bl_label = "Stop Viewport"
bl_description = "Stop Krom viewport rendering and switch to solid mode"
bl_options = {'REGISTER'}
viewport_id: bpy.props.StringProperty(default="")
def execute(self, context):
import lnx.make as make
viewport_id = self.viewport_id
if not viewport_id and context.space_data:
viewport_id = hex(context.space_data.as_pointer())[-6:]
if viewport_id:
make.stop_viewport(viewport_id)
global _active_krom_engines
if viewport_id in _active_krom_engines:
del _active_krom_engines[viewport_id]
if context.space_data and hasattr(context.space_data, 'shading'):
context.space_data.shading.type = 'SOLID'
return {'FINISHED'}
def draw_viewport_stop_button(self, context):
"""Draw X button in viewport header when Krom is running."""
if context.engine != 'KROM_VIEWPORT':
return
space = context.space_data
if not space or not hasattr(space, 'shading'):
return
if space.shading.type != 'RENDERED':
return
viewport_id = hex(space.as_pointer())[-6:]
global _active_krom_engines
if viewport_id not in _active_krom_engines:
layout = self.layout
layout.label(text="Building...", icon='TIME')
return
layout = self.layout
op = layout.operator("krom.stop_viewport", text="", icon='X', emboss=False)
op.viewport_id = viewport_id
def register():
"""Register the render engine."""
bpy.utils.register_class(KromViewportEngine)
bpy.utils.register_class(KROM_OT_viewport_input)
bpy.utils.register_class(KROM_OT_stop_viewport)
bpy.types.VIEW3D_HT_header.append(draw_viewport_stop_button)
for panel in get_panels():
panel.COMPAT_ENGINES.add('KROM_VIEWPORT')
if _on_depsgraph_update not in bpy.app.handlers.depsgraph_update_post:
bpy.app.handlers.depsgraph_update_post.append(_on_depsgraph_update)
atexit.register(_cleanup_krom_on_exit)
def unregister():
"""Unregister the render engine."""
global _active_krom_engine, _active_krom_engines, _active_viewport_id, _input_operator_running, _msgbus_owner
_active_krom_engine = None
_active_krom_engines.clear()
_active_viewport_id = None
_input_operator_running = False
if _on_depsgraph_update in bpy.app.handlers.depsgraph_update_post:
bpy.app.handlers.depsgraph_update_post.remove(_on_depsgraph_update)
_cleanup_krom_on_exit()
try:
atexit.unregister(_cleanup_krom_on_exit)
except:
pass
for panel in get_panels():
if 'KROM_VIEWPORT' in panel.COMPAT_ENGINES:
panel.COMPAT_ENGINES.remove('KROM_VIEWPORT')
bpy.types.VIEW3D_HT_header.remove(draw_viewport_stop_button)
bpy.utils.unregister_class(KROM_OT_stop_viewport)
bpy.utils.unregister_class(KROM_OT_viewport_input)
bpy.utils.unregister_class(KromViewportEngine)