Files
LNXSDK/leenkx/blender/lnx/lib/server.py
2026-04-27 19:21:50 -07:00

90 lines
2.6 KiB
Python

import atexit
import gzip
import http.server
import io
import os
import socketserver
import subprocess
haxe_server = None
_GZIP_MIN_SIZE = 1400
_COMPRESSIBLE = {
'text/html', 'text/css', 'text/javascript', 'text/plain',
'application/javascript', 'application/json', 'application/xml',
'application/wasm', 'image/svg+xml',
}
def run_tcp(port: int, do_log: bool):
class HTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
protocol_version = "HTTP/1.1"
def log_message(self, format, *args):
if do_log:
print(format % args)
def end_headers(self):
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Cache-Control', 'no-cache')
super().end_headers()
def do_GET(self):
ae = self.headers.get('Accept-Encoding', '')
if 'gzip' not in ae:
return super().do_GET()
fpath = self.translate_path(self.path)
if os.path.isdir(fpath):
return super().do_GET()
try:
ctype = self.guess_type(fpath)
if ctype.split(';')[0].strip() not in _COMPRESSIBLE:
return super().do_GET()
fsize = os.path.getsize(fpath)
if fsize < _GZIP_MIN_SIZE:
return super().do_GET()
with open(fpath, 'rb') as f:
raw = f.read()
buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode='wb', compresslevel=1) as gz:
gz.write(raw)
compressed = buf.getvalue()
self.send_response(200)
self.send_header('Content-Type', ctype)
self.send_header('Content-Length', str(len(compressed)))
self.send_header('Content-Encoding', 'gzip')
self.end_headers()
self.wfile.write(compressed)
except (FileNotFoundError, PermissionError):
self.send_error(404)
class ThreadedHTTPServer(socketserver.ThreadingTCPServer):
allow_reuse_address = True
daemon_threads = True
try:
http_server = ThreadedHTTPServer(("", port), HTTPRequestHandler)
http_server.serve_forever()
except:
print("Server already running")
def run_haxe(haxe_path, port=6000):
global haxe_server
if haxe_server is None:
haxe_server = subprocess.Popen([haxe_path, "--wait", str(port)])
atexit.register(kill_haxe)
def kill_haxe():
global haxe_server
if haxe_server is not None:
haxe_server.kill()
haxe_server = None