forked from LeenkxTeam/LNXSDK
		
	
		
			
	
	
		
			347 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			347 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|  | """
 | ||
This module contains the functionality to replace nodes with other nodes
in order to keep files from older Leenkx versions compatible with newer versions.

Nodes can define custom update procedures which describe what the
replacement should look like.
|  | 
 | ||
|  | Original author: @niacdoial | ||
|  | """
 | ||
|  | import os.path | ||
|  | import time | ||
|  | import traceback | ||
|  | import typing | ||
|  | from typing import Dict, List, Optional, Tuple | ||
|  | 
 | ||
|  | import bpy.props | ||
|  | 
 | ||
|  | import lnx.log as log | ||
|  | import lnx.logicnode.lnx_nodes as lnx_nodes | ||
|  | import lnx.logicnode.lnx_sockets | ||
|  | import lnx.node_utils as node_utils | ||
|  | 
 | ||
# Reload handling: when `lnx.is_reload()` reports that this module is being
# reloaded, re-bind the imported project modules through
# `lnx.reload_module()`; otherwise call `lnx.enable_reload()` for this module.
if not lnx.is_reload(__name__):
    lnx.enable_reload(__name__)
else:
    log = lnx.reload_module(log)
    lnx_nodes = lnx.reload_module(lnx_nodes)
    lnx.logicnode.lnx_sockets = lnx.reload_module(lnx.logicnode.lnx_sockets)
    node_utils = lnx.reload_module(node_utils)

# Errors collected during the most recent replacement run.
# Each entry is a 4-tuple:
#   (error identifier, node.bl_idname (or None), tree name, exception traceback (or None))
replacement_errors: List[Tuple[str, Optional[str], str, Optional[str]]] = []
|  | 
 | ||
|  | 
 | ||
class NodeReplacement:
    """
    Represents a simple replacement rule, this can replace nodes of one type to nodes of a second type.
    However, it is fairly limited. For instance, it assumes there are no changes in the type of the inputs or outputs
    Second, it also assumes that node properties (especially EnumProperties) keep the same possible values.

    - from_node, from_node_version: the type of node to be removed, and its version number
    - to_node, to_node_version: the type of node which takes from_node's place, and its version number
    - *_socket_mapping: a map which defines how the sockets of the old node shall be connected to the new node
      {1: 2} means that anything connected to the socket with index 1 on the original node will be connected to the socket with index 2 on the new node
    - property_mapping: the mapping used to transfer the values of the old node's properties to the new node's properties.
      {"property0": "property1"} mean that the value of the new node's property1 should be the old node's property0's value.
    - input_defaults: a mapping used to give default values to the inputs which aren't overridden otherwise.
    - property_defaults: a mapping used to define the value of the new node's properties, when they aren't overridden otherwise.
    """

    def __init__(self, from_node: str, from_node_version: int, to_node: str, to_node_version: int,
                 in_socket_mapping: Dict[int, int], out_socket_mapping: Dict[int, int], property_mapping: Optional[Dict[str, str]] = None,
                 input_defaults: Optional[Dict[int, typing.Any]] = None, property_defaults: Optional[Dict[str, typing.Any]] = None):
        """Store the replacement rule.

        The optional mappings default to fresh empty dicts when omitted
        (or passed as None).
        """
        self.from_node = from_node
        self.to_node = to_node
        self.from_node_version = from_node_version
        self.to_node_version = to_node_version

        self.in_socket_mapping = in_socket_mapping
        self.out_socket_mapping = out_socket_mapping
        self.property_mapping = {} if property_mapping is None else property_mapping

        self.input_defaults = {} if input_defaults is None else input_defaults
        self.property_defaults = {} if property_defaults is None else property_defaults

    @classmethod
    def Identity(cls, node: 'LnxLogicTreeNode'):
        """Returns a NodeReplacement that does nothing, while operating on a given node.

        Every input/output index and every detected property is mapped to
        itself. The rule goes from the node's stored `lnx_version` to the
        `lnx_version` of the node's class.

        WARNING: it assumes that all node properties have names that start with "property"
        """
        in_socks = {i: i for i in range(len(node.inputs))}
        out_socks = {i: i for i in range(len(node.outputs))}

        # A property is an attribute whose name starts with 'property' and
        # which is annotated with a '_PropertyDeferred', see
        # https://developer.blender.org/rB37e6a1995ac7eeabd5b6a56621ad5a850dae4149
        # and https://developer.blender.org/rBc44c611c6d8c6ae071b48efb5fc07168f18cd17e
        props = {
            attrname: attrname
            for attrname in dir(node)
            if attrname.startswith('property')
            and attrname in node.__annotations__
            and isinstance(node.__annotations__[attrname], bpy.props._PropertyDeferred)
        }

        return cls(
            node.bl_idname, node.lnx_version, node.bl_idname, type(node).lnx_version,
            in_socket_mapping=in_socks, out_socket_mapping=out_socks,
            property_mapping=props
        )

    def chain_with(self, other):
        """Modify the current NodeReplacement by "adding" a second replacement after it

        Raises TypeError if `other` does not start where this replacement
        ends (node type and version must match), and KeyError if `other`
        has no mapping for a socket or property this replacement targets.
        On failure this object is left unchanged.
        """
        if self.to_node != other.from_node or self.to_node_version != other.from_node_version:
            raise TypeError('the given NodeReplacement-s could not be chained')

        # Compute the complete new state before touching `self`: a KeyError
        # raised by a lookup in `other` must not leave a half-chained rule,
        # and the dicts handed to __init__ are not mutated behind the
        # caller's back.
        in_socket_mapping = {
            src: other.in_socket_mapping[mid] for src, mid in self.in_socket_mapping.items()
        }
        out_socket_mapping = {
            src: other.out_socket_mapping[mid] for src, mid in self.out_socket_mapping.items()
        }
        property_mapping = {
            src: other.property_mapping[mid] for src, mid in self.property_mapping.items()
        }

        # Defaults of `other` are kept, but defaults of this replacement win
        # for the sockets/properties they are mapped onto.
        input_defaults = other.input_defaults.copy()
        for socket_id, value in self.input_defaults.items():
            input_defaults[other.in_socket_mapping[socket_id]] = value

        property_defaults = other.property_defaults.copy()
        for prop_name, value in self.property_defaults.items():
            property_defaults[other.property_mapping[prop_name]] = value

        self.to_node = other.to_node
        self.to_node_version = other.to_node_version
        self.in_socket_mapping = in_socket_mapping
        self.out_socket_mapping = out_socket_mapping
        self.property_mapping = property_mapping
        self.input_defaults = input_defaults
        self.property_defaults = property_defaults

    @staticmethod
    def replace_input_socket(tree: 'bpy.types.NodeTree', socket_src: 'bpy.types.NodeSocket', socket_dst: 'bpy.types.NodeSocket'):
        """Transfer the state of the input socket `socket_src` to `socket_dst`.

        If `socket_src` is linked, a new link to `socket_dst` is created in
        `tree` from the source socket of each of its links. Otherwise the
        default value is copied over via `node_utils`.
        """
        if socket_src.is_linked:
            for link in socket_src.links:
                tree.links.new(link.from_socket, socket_dst)
        else:
            node_utils.set_socket_default(socket_dst, node_utils.get_socket_default(socket_src))

    @staticmethod
    def replace_output_socket(tree: 'bpy.types.NodeTree', socket_src: 'bpy.types.NodeSocket', socket_dst: 'bpy.types.NodeSocket'):
        """Transfer the state of the output socket `socket_src` to `socket_dst`.

        If `socket_src` is linked, a new link from `socket_dst` is created in
        `tree` to the target socket of each of its links. Otherwise the
        default value is copied over via `node_utils`.
        """
        if socket_src.is_linked:
            for link in socket_src.links:
                tree.links.new(socket_dst, link.to_socket)
        else:
            node_utils.set_socket_default(socket_dst, node_utils.get_socket_default(socket_src))
|  | 
 | ||
|  | 
 | ||
def replace(tree: bpy.types.NodeTree, node: 'LnxLogicTreeNode'):
    """Replace `node` in `tree` according to what the node itself requests.

    `node.get_replacement_node(tree)` may answer with:
    - a single logic node: its basic properties are copied from `node`,
    - a list of nodes: each one gets the basic properties of `node`,
    - a `NodeReplacement`: a new node is created and wired up from the rule,
    - anything else: the response is printed.

    In every case that does not raise, `node` is removed from `tree`
    afterwards. A `LookupError` is raised when a `NodeReplacement` does not
    match `node` or the node created from it.
    """
    response = node.get_replacement_node(tree)

    if isinstance(response, lnx_nodes.LnxLogicTreeNode):
        # A brand new node was handed back, only misc. properties are left to copy
        node_utils.copy_basic_node_props(from_node=node, to_node=response)

    elif isinstance(response, list):
        for created_node in response:
            node_utils.copy_basic_node_props(from_node=node, to_node=created_node)

    elif isinstance(response, NodeReplacement):
        _apply_node_replacement(tree, node, response)

    else:
        print(response)

    tree.nodes.remove(node)


def _apply_node_replacement(tree, node, rule):
    """Create the node described by `rule` and transfer the state of `node` to it."""
    # The rule must describe exactly this node type and version
    matches_node = node.bl_idname == rule.from_node and node.lnx_version == rule.from_node_version
    if not matches_node:
        raise LookupError("The provided NodeReplacement doesn't seem to correspond to the node needing replacement")

    created_node = tree.nodes.new(rule.to_node)
    if created_node.lnx_version != rule.to_node_version:
        # Don't leave the mismatching node behind in the tree
        tree.nodes.remove(created_node)
        raise LookupError("The provided NodeReplacement doesn't seem to correspond to the node needing replacement")

    node_utils.copy_basic_node_props(from_node=node, to_node=created_node)

    # Defaults go first so that the mapped values below can override them
    for name, value in rule.property_defaults.items():
        setattr(created_node, name, value)
    for socket_index, value in rule.input_defaults.items():
        node_utils.set_socket_default(created_node.inputs[socket_index], value)

    # Properties
    for old_name, new_name in rule.property_mapping.items():
        setattr(created_node, new_name, getattr(node, old_name))

    # Inputs
    for old_index, new_index in rule.in_socket_mapping.items():
        NodeReplacement.replace_input_socket(tree, node.inputs[old_index], created_node.inputs[new_index])

    # Outputs
    for old_index, new_index in rule.out_socket_mapping.items():
        NodeReplacement.replace_output_socket(tree, node.outputs[old_index], created_node.outputs[new_index])
|  | 
 | ||
|  | 
 | ||
def _describe_replacement_error(error_type: str, node_class: Optional[str], tree_name: str, tb: Optional[str]) -> str:
    """Return the report text for one entry of `replacement_errors`."""
    if error_type == 'unregistered':
        return f"A node whose class doesn't exist was found in node tree \"{tree_name}\""
    if error_type == 'update failed':
        return (f"A node of type {node_class} in tree \"{tree_name}\" failed to be updated, "
                f"because there is no (longer?) an update routine for this version of the node. Original exception:"
                "\n" + tb + "\n")
    if error_type == 'future version':
        return (f"A node of type {node_class} in tree \"{tree_name}\" seemingly comes from a future version of leenkx. "
                f"Please check whether your version of leenkx is up to date")
    if error_type == 'bad version':
        return (f"A node of type {node_class} in tree \"{tree_name}\" doesn't have version information attached to it. "
                f"If so, please check that the nodes in the file are compatible with the in-code node classes. "
                f"If this nodes comes from an add-on, please check that it is compatible with this version of leenkx.")
    if error_type == 'misc.':
        return (f"A node of type {node_class} in tree \"{tree_name}\" failed to be updated, "
                f"because the node's update procedure itself failed. Original exception:"
                "\n" + tb + "\n")
    return (f"Whoops, we don't know what this error type (\"{error_type}\") means. You might want to report a bug here. "
            f"All we know is that it comes form a node of class {node_class} in the node tree called \"{tree_name}\".")


def replace_all():
    """Iterate through all logic node trees in the file and check for node updates/replacements to execute.

    Problems are collected in the module-level `replacement_errors` list
    (which is emptied first). If there are any, a report file is written
    into the directory of the current .blend file, an error is logged and
    the `lnx.show_node_update_errors` operator is invoked.
    """
    # The list is only mutated in place, so no `global` statement is required
    replacement_errors.clear()

    for tree in bpy.data.node_groups:
        if tree.bl_idname not in ("LnxLogicTreeType", 'LnxGroupTree'):
            continue

        # Use list() to make a "static" copy. It's possible to iterate over it because nodes which get removed
        # from the tree leave python objects in the list
        for node in list(tree.nodes):
            # Blender nodes (layout)
            if not isinstance(node, lnx_nodes.LnxLogicTreeNode):
                continue

            # That node has been removed from the tree without replace() being called on it somehow
            elif node.type == '':
                continue

            # Node type deleted. That's unusual. Or it has been replaced for a looong time
            elif not node.is_registered_node_type():
                replacement_errors.append(('unregistered', None, tree.name, None))

            # Invalid version number
            elif not isinstance(type(node).lnx_version, int):
                replacement_errors.append(('bad version', node.bl_idname, tree.name, None))

            # Actual replacement
            elif node.lnx_version < type(node).lnx_version:
                try:
                    replace(tree, node)
                except LookupError:
                    replacement_errors.append(('update failed', node.bl_idname, tree.name, traceback.format_exc()))
                except Exception:
                    # Record any other failure as well, one broken node
                    # must not stop the remaining nodes from being updated
                    replacement_errors.append(('misc.', node.bl_idname, tree.name, traceback.format_exc()))

            # Node version is newer than supported by the class
            elif node.lnx_version > type(node).lnx_version:
                replacement_errors.append(('future version', node.bl_idname, tree.name, None))

    if not replacement_errors:
        return

    # If possible, make a popup about the errors and write an error report into the .blend file's folder
    basedir = os.path.dirname(bpy.data.filepath)
    reportfile = os.path.join(
        basedir, 'node_update_failure.{:s}.txt'.format(
            time.strftime("%Y-%m-%dT%H-%M-%S%z")
        )
    )

    # Tree names are user-chosen and may contain any character, so do not
    # rely on the platform's default encoding for the report
    with open(reportfile, 'w', encoding='utf-8') as reportf:
        for error_type, node_class, tree_name, tb in replacement_errors:
            print(_describe_replacement_error(error_type, node_class, tree_name, tb), file=reportf)

    log.error(f'There were errors in the node update procedure, a detailed report has been written to {reportfile}')

    bpy.ops.lnx.show_node_update_errors()
|  | 
 | ||
|  | 
 | ||
def node_compat_sdk2108():
    """Recreate all logic nodes with the socket types introduced in SDK 21.08.

    SDK 21.08 broke compatibility with older nodes as nodes now use
    custom sockets even for Blender's default data types and custom
    property "constructors". This allows to listen for events for the
    live patch system.

    To update older nodes, every node gets a full copy whose properties
    and sockets are replaced by their new equivalents; the old node is
    removed afterwards.
    """
    # Old Blender socket type -> new custom socket type
    socket_replacements = {
        'NodeSocketBool': 'LnxBoolSocket',
        'NodeSocketColor': 'LnxColorSocket',
        'NodeSocketFloat': 'LnxFloatSocket',
        'NodeSocketInt': 'LnxIntSocket',
        'NodeSocketShader': 'LnxDynamicSocket',
        'NodeSocketString': 'LnxStringSocket',
        'NodeSocketVector': 'LnxVectorSocket'
    }

    # Inputs are recreated before outputs, each with its own transfer routine
    socket_sides = (
        ('inputs', NodeReplacement.replace_input_socket),
        ('outputs', NodeReplacement.replace_output_socket),
    )

    for tree in bpy.data.node_groups:
        if tree.bl_idname not in ("LnxLogicTreeType", 'LnxGroupTree'):
            continue

        for old_node in list(tree.nodes):
            node_class = old_node.__class__

            # Don't raise exceptions for invalid unregistered nodes, this
            # function didn't cause the registration problem if there is one
            if not node_class.is_registered_node_type():
                continue

            if old_node.type in ('FRAME', 'REROUTE'):
                continue

            new_node = tree.nodes.new(node_class.bl_idname)
            node_utils.copy_basic_node_props(from_node=old_node, to_node=new_node)

            # Keep the old version number, otherwise the actual node
            # replacement that follows this step would be skipped
            new_node.lnx_version = old_node.lnx_version

            # Properties first
            for prop_name, prop in typing.get_type_hints(node_class, {}, {}).items():
                if not isinstance(prop, bpy.props._PropertyDeferred):
                    continue
                if hasattr(old_node, prop_name) and hasattr(new_node, prop_name):
                    setattr(new_node, prop_name, getattr(old_node, prop_name))

            # Then drop the sockets the new node was created with and
            # rebuild them from the old node, using the new socket types
            for side, transfer_socket in socket_sides:
                new_sockets = getattr(new_node, side)
                new_sockets.clear()
                for old_socket in getattr(old_node, side):
                    socket_idname = socket_replacements.get(old_socket.bl_idname, old_socket.bl_idname)
                    new_socket = new_sockets.new(socket_idname, old_socket.name, identifier=old_socket.identifier)
                    transfer_socket(tree, old_socket, new_socket)

            tree.nodes.remove(old_node)