#include "worker.h"

// NOTE(review): the include targets below were reconstructed from the symbols this file uses
// (kinc_file_reader_*, kinc_log, kinc_time, kinc_mutex_*, kinc_thread_t, exit, strcmp, std::vector).
// Confirm against the project tree.
#include <kinc/io/filereader.h>
#include <kinc/log.h>
#include <kinc/system.h>
#include <kinc/threads/mutex.h>
#include <kinc/threads/thread.h>

#include <stdlib.h>
#include <string.h>
#include <vector>

using namespace v8;

namespace {

// NOTE(review): presumably the isolate/context embedder-data slot used for worker state;
// confirm against its users.
const uint32_t worker_data_slot = 0;

/// One serialized message: `length` bytes of UTF-8 JSON text in a `new[]`-allocated buffer.
/// The buffer is NOT null-terminated; always use `length`.
struct WorkerMessage {
    char* message;
    int length;
};

/// A queue of serialized messages plus the script function that consumes them.
struct MessageQueue {
    std::vector<WorkerMessage> messages; // guarded by messageMutex
    kinc_mutex_t messageMutex;
    Global<Function> messageFunc; // empty while no handler is installed
};

/// The pair of queues connecting a worker with the code that owns it.
struct WorkerMessagePort {
    MessageQueue workerMessages; // filled by the worker script's postMessage()
    MessageQueue ownerMessages;  // its handler is installed by the worker script (onmessage / addEventListener)
    bool isTerminated = false;
};

/// Start-up parameters handed to the worker thread.
struct WorkerData {
    char fileName[256]; // script file opened as a Kinc asset by the worker thread
    WorkerMessagePort* messagePort;
};

/// A worker thread together with its message port.
struct OwnedWorker {
    kinc_thread_t* workerThread;
    WorkerMessagePort* workerMessagePort;
};

/// List of workers.
/// NOTE(review): presumably one instance per owning context; confirm.
struct ContextData {
    std::vector<OwnedWorker> workers;
};

/// One callback registered through setInterval().
struct IntervalFunction {
    Global<Function> function;
    double interval;     // milliseconds argument divided by 1000
    double nextCallTime; // kinc_time() at registration plus `interval`
    int id;              // value returned to the script, used by clearInterval()
};

/// All interval callbacks of one worker plus the next id to hand out.
struct IntervalFunctions {
    std::vector<IntervalFunction> functions;
    int latestId;
};

/// Installs, replaces or clears the handler of `messageQueue`.
/// A function becomes the new handler and null/undefined removes it. Any other value is
/// logged as an error and leaves the current handler untouched.
void set_onmessage(MessageQueue* messageQueue, Isolate* isolate, Local<Value> callback) {
    if (callback->IsFunction()) {
        messageQueue->messageFunc.Reset(isolate, Local<Function>::Cast(callback));
    }
    else if (callback->IsNullOrUndefined()) {
        messageQueue->messageFunc.Reset();
    }
    else {
        kinc_log(KINC_LOG_LEVEL_ERROR, "[set_onmessage]: argument is neither a function nor null");
    }
}

/// Serializes `messageObject` to JSON and appends it to `messageQueue`.
/// If serialization fails, nothing is queued and the V8 exception (if any) is left pending
/// for the calling script.
void post_message(MessageQueue* messageQueue, Isolate* isolate, Local<Value> messageObject) {
    Local<String> result;
    if (!JSON::Stringify(isolate->GetCurrentContext(), messageObject).ToLocal(&result)) {
        kinc_log(KINC_LOG_LEVEL_ERROR, "[post_message]: message could not be serialized to JSON");
        return;
    }

    WorkerMessage message;
    // Size the buffer in UTF-8 bytes. String::Length() counts UTF-16 code units and would
    // truncate any payload containing non-ASCII characters.
    message.length = result->Utf8Length(isolate);
    message.message = new char[message.length];
    // The buffer has no room for a terminator; consumers rely on `length`.
    result->WriteUtf8(isolate, message.message, message.length);

    kinc_mutex_lock(&messageQueue->messageMutex);
    messageQueue->messages.push_back(message);
    kinc_mutex_unlock(&messageQueue->messageMutex);
}

/// Parses the JSON text of `message` into `value`.
/// Returns false when the payload cannot be parsed. The parse exception is swallowed by the
/// local TryCatch and never reaches the caller.
bool parse_message(Isolate* isolate, Local<Context> context, const WorkerMessage& message, Local<Value>* value) {
    TryCatch try_catch(isolate);
    Local<String> json;
    return String::NewFromUtf8(isolate, message.message, NewStringType::kNormal, message.length).ToLocal(&json) &&
           JSON::Parse(context, json).ToLocal(value);
}

/// Delivers every queued message of `messageQueue` to its handler, in order.
/// The handler is called as `handler({data: <parsed message>})` with the global object of
/// `context` as receiver. Messages stay queued while no handler is installed.
void handle_message_queue(Isolate* isolate, const Global<Context>& context, MessageQueue* messageQueue) {
    if (messageQueue->messageFunc.IsEmpty()) {
        return;
    }

    // Take the whole batch under the lock and release it immediately, so posting threads
    // are never blocked while script code runs.
    std::vector<WorkerMessage> pending;
    kinc_mutex_lock(&messageQueue->messageMutex);
    pending.swap(messageQueue->messages);
    kinc_mutex_unlock(&messageQueue->messageMutex);
    if (pending.empty()) {
        return;
    }

    HandleScope scope(isolate);
    Local<Context> current_context = Local<Context>::New(isolate, context);
    Context::Scope context_scope(current_context);
    Local<String> data_key = String::NewFromUtf8(isolate, "data").ToLocalChecked();

    for (size_t i = 0; i < pending.size(); ++i) {
        if (messageQueue->messageFunc.IsEmpty()) {
            // An earlier callback removed the handler. Put the unprocessed rest back in
            // front of anything posted meanwhile, so order is kept and nothing is lost.
            kinc_mutex_lock(&messageQueue->messageMutex);
            messageQueue->messages.insert(messageQueue->messages.begin(), pending.begin() + i, pending.end());
            kinc_mutex_unlock(&messageQueue->messageMutex);
            return;
        }

        HandleScope message_scope(isolate);
        WorkerMessage& message = pending[i];

        Local<Value> value;
        const bool parsed = parse_message(isolate, current_context, message, &value);
        delete[] message.message;
        message.message = nullptr;
        if (!parsed) {
            kinc_log(KINC_LOG_LEVEL_ERROR, "[handle_message_queue]: dropping message that is not valid JSON");
            continue;
        }

        Local<Object> object = Object::New(isolate);
        object->Set(current_context, data_key, value);

        Local<Value> args[] = {object};
        Local<Function> message_func = messageQueue->messageFunc.Get(isolate);
        message_func->Call(current_context, current_context->Global(), 1, args);
    }
}

/// Logs the exception held by `try_catch` as an error.
/// Uses the stack trace when one is available, otherwise the exception message.
void handle_exception(Isolate* isolate, TryCatch* try_catch) {
    Local<Value> description;
    if (!try_catch->StackTrace(isolate->GetCurrentContext()).ToLocal(&description)) {
        Local<Message> message = try_catch->Message();
        if (message.IsEmpty()) {
            kinc_log(KINC_LOG_LEVEL_ERROR, "uncaught exception %s", "(no message available)");
            return;
        }
        description = message->Get();
    }

    v8::String::Utf8Value text(isolate, description);
    // *text is null when the value could not be converted to a string.
    kinc_log(KINC_LOG_LEVEL_ERROR, "uncaught exception %s", *text ? *text : "(unprintable)");
}

/// Script binding `postMessage(message)`: queues `message` on the port's workerMessages queue.
/// `args.Data()` carries the WorkerMessagePort as an External.
void worker_post_message(const FunctionCallbackInfo<Value>& args) {
    Isolate* isolate = args.GetIsolate();
    HandleScope scope(isolate);
    if (args.Length() < 1) {
        return;
    }
    if (args.Length() > 1) {
        kinc_log(KINC_LOG_LEVEL_WARNING, "Krom workers only support 1 argument for postMessage");
    }

    Local<External> external = Local<External>::Cast(args.Data());
    WorkerMessagePort* messagePort = static_cast<WorkerMessagePort*>(external->Value());
    post_message(&messagePort->workerMessages, isolate, args[0]);
}

/// Getter of the `onmessage` accessor: returns the handler currently installed on ownerMessages.
void worker_onmessage_get(Local<String> property, const PropertyCallbackInfo<Value>& info) {
    Isolate* isolate = info.GetIsolate();
    HandleScope scope(isolate);

    Local<External> external = Local<External>::Cast(info.Data());
    WorkerMessagePort* messagePort = static_cast<WorkerMessagePort*>(external->Value());
    info.GetReturnValue().Set(messagePort->ownerMessages.messageFunc);
}

/// Setter of the `onmessage` accessor: installs or clears the ownerMessages handler.
void worker_onmessage_set(Local<String> property, Local<Value> value, const PropertyCallbackInfo<void>& info) {
    Isolate* isolate = info.GetIsolate();
    HandleScope scope(isolate);

    Local<External> external = Local<External>::Cast(info.Data());
    WorkerMessagePort* messagePort = static_cast<WorkerMessagePort*>(external->Value());
    set_onmessage(&messagePort->ownerMessages, isolate, value);
}

/// Script binding `addEventListener(name, listener)`.
/// Only the "message" event exists; it behaves like assigning `onmessage`, so it replaces
/// any previous handler. Other names are logged as a warning and ignored.
void worker_add_event_listener(const FunctionCallbackInfo<Value>& args) {
    Isolate* isolate = args.GetIsolate();
    HandleScope scope(isolate);
    if (args.Length() < 2) {
        return;
    }
    if (!args[0]->IsString()) {
        // Casting a non-string value to String is not valid, so reject it up front.
        kinc_log(KINC_LOG_LEVEL_WARNING, "Trying to add listener for unknown event %s", "(not a string)");
        return;
    }

    Local<String> name = Local<String>::Cast(args[0]);
    char event_name[256];
    int length = name->WriteUtf8(isolate, event_name, sizeof(event_name) - 1);
    event_name[length] = 0; // length <= 255, so this stays inside the buffer
    if (strcmp(event_name, "message") != 0) {
        kinc_log(KINC_LOG_LEVEL_WARNING, "Trying to add listener for unknown event %s", event_name);
        return;
    }

    Local<External> external = Local<External>::Cast(args.Data());
    WorkerMessagePort* messagePort = static_cast<WorkerMessagePort*>(external->Value());
    set_onmessage(&messagePort->ownerMessages, isolate, args[1]);
}

/// Script binding `setInterval(callback, milliseconds)`: registers `callback` and returns its id.
/// The delay is stored in seconds (milliseconds / 1000). A missing or non-numeric delay
/// falls back to 0.016.
/// `args.Data()` carries the worker's IntervalFunctions as an External.
void worker_set_interval(const FunctionCallbackInfo<Value>& args) {
    Isolate* isolate = args.GetIsolate();
    HandleScope scope(isolate);
    if (args.Length() < 1 || !args[0]->IsFunction()) {
        kinc_log(KINC_LOG_LEVEL_ERROR, "[setInterval]: first argument is not a function");
        return;
    }

    Local<External> external = Local<External>::Cast(args.Data());
    IntervalFunctions* interval_functions = static_cast<IntervalFunctions*>(external->Value());
    Local<Function> function = Local<Function>::Cast(args[0]);

    double frequency = 0.016;
    if (args.Length() > 1 && args[1]->IsNumber()) {
        frequency = Local<Number>::Cast(args[1])->Value() / 1000.0;
    }

    interval_functions->functions.emplace_back();
    IntervalFunction& interval_function = interval_functions->functions.back();
    interval_function.function.Reset(isolate, function);
    interval_function.interval = frequency;
    interval_function.nextCallTime = kinc_time() + frequency;
    interval_function.id = interval_functions->latestId;
    interval_functions->latestId++;

    args.GetReturnValue().Set(interval_function.id);
}

/// Script binding `clearInterval(id)`: removes the interval callback with the given id.
/// Unknown ids and non-integer arguments are ignored.
void worker_clear_interval(const FunctionCallbackInfo<Value>& args) {
    if (args.Length() < 1 || !args[0]->IsInt32()) {
        return;
    }

    Local<External> external = Local<External>::Cast(args.Data());
    IntervalFunctions* intervalFunctions = static_cast<IntervalFunctions*>(external->Value());
    const int id = Local<Int32>::Cast(args[0])->Value();

    for (auto it = intervalFunctions->functions.begin(); it != intervalFunctions->functions.end(); ++it) {
        if (it->id == id) {
            it->function.Reset();
            intervalFunctions->functions.erase(it);
            return; // erase() invalidated `it`; ids are unique, so stop here
        }
    }
}

void worker_thread_func(void* param) { WorkerData* worker_data = (WorkerData*)param; WorkerMessagePort* message_port = worker_data->messagePort; Isolate::CreateParams create_params; create_params.array_buffer_allocator = v8::ArrayBuffer::Allocator::NewDefaultAllocator(); Isolate* isolate = Isolate::New(create_params); Locker locker(isolate); Isolate::Scope isolate_scope(isolate); HandleScope handle_scope(isolate); Local global = ObjectTemplate::New(isolate); Local extern_message_port = External::New(isolate, message_port); global->Set(String::NewFromUtf8(isolate, "postMessage").ToLocalChecked(), FunctionTemplate::New(isolate, worker_post_message, extern_message_port)); global->SetAccessor(String::NewFromUtf8(isolate, "onmessage").ToLocalChecked(), (AccessorGetterCallback)worker_onmessage_get, (AccessorSetterCallback)worker_onmessage_set, extern_message_port); global->Set(String::NewFromUtf8(isolate, "addEventListener").ToLocalChecked(), FunctionTemplate::New(isolate, worker_add_event_listener, extern_message_port)); IntervalFunctions interval_functions; interval_functions.latestId = 0; Local extern_interval_functions = External::New(isolate, &interval_functions); global->Set(String::NewFromUtf8(isolate, "setInterval").ToLocalChecked(), FunctionTemplate::New(isolate, worker_set_interval, extern_interval_functions)); global->Set(String::NewFromUtf8(isolate, "clearInterval").ToLocalChecked(), FunctionTemplate::New(isolate, worker_clear_interval, extern_interval_functions)); Local context = Context::New(isolate, nullptr, global); Global global_context; global_context.Reset(isolate, context); Context::Scope context_scope(context); bind_worker_class(isolate, global_context); kinc_file_reader_t reader; if (!kinc_file_reader_open(&reader, worker_data->fileName, KINC_FILE_TYPE_ASSET)) { kinc_log(KINC_LOG_LEVEL_ERROR, "Could not load
file %s for worker thread", worker_data->fileName); exit(1); } size_t file_size = kinc_file_reader_size(&reader); char* code = new char[file_size + 1]; kinc_file_reader_read(&reader, code, file_size); code[file_size] = 0; kinc_file_reader_close(&reader); Local source = String::NewFromUtf8(isolate, code, NewStringType::kNormal).ToLocalChecked(); TryCatch try_catch(isolate); Local