#include "websocket_bridge.h"

// NOTE(review): every <...> include target and template argument was missing
// from the copy of this file that was reviewed. The headers below were
// reconstructed from the names this translation unit uses - confirm against
// the project's original include list.
#include <kinc/log.h>
#include <kinc/network/socket.h>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include "async_engine.h"
#include <vector>

#ifdef KORE_WINDOWS
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <winsock2.h>
#include <ws2tcpip.h>
#include <mswsock.h>
#include <windows.h>
#include <wincrypt.h>
#pragma comment(lib, "advapi32.lib")
#pragma comment(lib, "ws2_32.lib")
#endif

#ifdef _WIN32
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <winsock2.h>
#include <ws2tcpip.h>
#include <mswsock.h>
#include <windows.h>
#endif

#ifndef _WIN32
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <openssl/sha.h>
#endif

// Registry of live servers keyed by server id; guarded by g_servers_mutex.
std::map<int, RunTWebSocketServer*> g_websocket_servers;
std::mutex g_servers_mutex;
int g_next_server_id = 1;

// global metrics for debugging
std::atomic<uint64_t> g_ws_frames_received{0};
std::atomic<uint64_t> g_ws_frames_broadcast{0};
std::atomic<uint64_t> g_ws_bytes_sent{0};
std::atomic<uint64_t> g_ws_bytes_queued{0};
std::atomic<uint64_t> g_ws_send_completions{0};
std::atomic<uint64_t> g_ws_recv_completions{0};

// GUID appended to the client key when computing Sec-WebSocket-Accept.
static const char* WEBSOCKET_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

static const char* base64_chars =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/**
 * Appends one unmasked WebSocket frame (header + payload) to `out`.
 *
 * If `opcode` already has the FIN bit (0x80) set it is used verbatim as the
 * first header byte; otherwise FIN is added to the low four opcode bits.
 * The length is encoded as 7-bit, 16-bit (marker 126) or 64-bit (marker 127)
 * big-endian depending on `payloadLen`.
 */
static void appendWsFrame(std::vector<uint8_t>& out, uint8_t opcode,
                          const uint8_t* payload, size_t payloadLen) {
    out.reserve(out.size() + payloadLen + 14);

    const uint8_t firstByte =
        (opcode & 0x80) ? opcode : static_cast<uint8_t>(0x80 | (opcode & 0x0F));
    out.push_back(firstByte);

    if (payloadLen < 126) {
        out.push_back(static_cast<uint8_t>(payloadLen));
    } else if (payloadLen < 65536) {
        out.push_back(126);
        out.push_back(static_cast<uint8_t>((payloadLen >> 8) & 0xFF));
        out.push_back(static_cast<uint8_t>(payloadLen & 0xFF));
    } else {
        out.push_back(127);
        // Widen first: shifting a 32-bit size_t by 32..56 bits is undefined.
        const uint64_t len64 = static_cast<uint64_t>(payloadLen);
        for (int shift = 56; shift >= 0; shift -= 8) {
            out.push_back(static_cast<uint8_t>((len64 >> shift) & 0xFF));
        }
    }

    if (payloadLen > 0) {
        out.insert(out.end(), payload, payload + payloadLen);
    }
}

/**
 * Standard base64 encoding (with '=' padding) of `in_len` bytes.
 */
std::string base64_encode(const unsigned char* bytes_to_encode, unsigned int in_len) {
    std::string ret;
    int i = 0;
    int j = 0;
    unsigned char char_array_3[3];
    unsigned char char_array_4[4];

    while (in_len--) {
        char_array_3[i++] = *(bytes_to_encode++);
        if (i == 3) {
            char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
            char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
            char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
            char_array_4[3] = char_array_3[2] & 0x3f;

            for (i = 0; i < 4; i++) {
                ret += base64_chars[char_array_4[i]];
            }
            i = 0;
        }
    }

    // Trailing 1 or 2 bytes: zero-fill, emit the significant sextets, then pad.
    if (i) {
        for (j = i; j < 3; j++) {
            char_array_3[j] = '\0';
        }

        char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
        char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
        char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
        char_array_4[3] = char_array_3[2] & 0x3f;

        for (j = 0; j < i + 1; j++) {
            ret += base64_chars[char_array_4[j]];
        }
        while (i++ < 3) {
            ret += '=';
        }
    }

    return ret;
}

// NOTE(review): two further #include directives appeared at this point in the
// reviewed copy with their targets missing; they could not be recovered.

/**
 * Returns the raw 20-byte SHA-1 digest of `data`.
 * Uses CryptoAPI when KORE_WINDOWS is defined (returns "" on failure),
 * OpenSSL otherwise.
 */
std::string sha1_binary(const std::string& data) {
#ifdef KORE_WINDOWS
    // CryptoAPI for proper SHA-1 hashing
    HCRYPTPROV hProv = 0;
    HCRYPTHASH hHash = 0;
    DWORD cbHash = 20; // produces 20-byte hash
    BYTE hash[20];

    if (CryptAcquireContext(&hProv, NULL, NULL, PROV_RSA_FULL, CRYPT_VERIFYCONTEXT)) {
        if (CryptCreateHash(hProv, CALG_SHA1, 0, 0, &hHash)) {
            if (CryptHashData(hHash, reinterpret_cast<const BYTE*>(data.data()),
                              static_cast<DWORD>(data.length()), 0)) {
                if (CryptGetHashParam(hHash, HP_HASHVAL, hash, &cbHash, 0)) {
                    std::string result(reinterpret_cast<char*>(hash), cbHash);
                    CryptDestroyHash(hHash);
                    CryptReleaseContext(hProv, 0);
                    return result;
                }
            }
            CryptDestroyHash(hHash);
        }
        CryptReleaseContext(hProv, 0);
    }

    kinc_log(KINC_LOG_LEVEL_ERROR, "Windows CryptoAPI SHA-1 failed");
    return "";
#else
    // SHA-1 implementation using OpenSSL
    unsigned char hash[20]; // also 20-byte hash
    SHA_CTX sha1_ctx;
    SHA1_Init(&sha1_ctx);
    SHA1_Update(&sha1_ctx, data.c_str(), data.length());
    SHA1_Final(hash, &sha1_ctx);
    return std::string(reinterpret_cast<char*>(hash), 20);
#endif
}

RunTWebSocketServer::RunTWebSocketServer(int id, const std::string& h, int p, int maxConn)
    : serverId(id), host(h), port(p), maxConnections(maxConn), isRunning(false), running(false) {
#ifdef DEBUG_NETWORK
    kinc_log(KINC_LOG_LEVEL_INFO, "RunTWebSocketServer created: %s:%d (id=%d, maxConn=%d)",
             host.c_str(), port, serverId, maxConnections);
#endif
}

RunTWebSocketServer::~RunTWebSocketServer() {
    stop();
#ifdef DEBUG_NETWORK
    kinc_log(KINC_LOG_LEVEL_INFO, "RunTWebSocketServer destroyed: id=%d", serverId);
#endif
}

/**
 * Opens, binds and listens on the server socket, then starts the worker
 * threads (broadcast worker, accept/server loop and, on Windows, the IOCP
 * loop). Returns true if the server is running when the call returns.
 */
bool RunTWebSocketServer::start() {
    if (isRunning.load()) {
#ifdef DEBUG_NETWORK
        kinc_log(KINC_LOG_LEVEL_WARNING, "WebSocket server %d already running", serverId);
#endif
        return true;
    }

    kinc_socket_init(&serverSocket);

    kinc_socket_options_t options;
    kinc_socket_options_set_defaults(&options);

    kinc_socket_set(&serverSocket, host.c_str(), port, KINC_SOCKET_FAMILY_IP4, KINC_SOCKET_PROTOCOL_TCP);

    if (!kinc_socket_open(&serverSocket, &options)) {
        kinc_log(KINC_LOG_LEVEL_ERROR, "Failed to open WebSocket server socket on port %d", port);
        return false;
    }

    int reuseaddr = 1;
    setsockopt(serverSocket.handle, SOL_SOCKET, SO_REUSEADDR,
               reinterpret_cast<const char*>(&reuseaddr), sizeof(reuseaddr));

    if (!kinc_socket_bind(&serverSocket)) {
        kinc_log(KINC_LOG_LEVEL_ERROR, "Failed to bind WebSocket server socket to %s:%d", host.c_str(), port);
        kinc_socket_destroy(&serverSocket);
        return false;
    }

    if (!kinc_socket_listen(&serverSocket, maxConnections)) {
        kinc_log(KINC_LOG_LEVEL_ERROR, "Failed to listen on WebSocket server socket");
        kinc_socket_destroy(&serverSocket);
        return false;
    }

#ifdef _WIN32
    // create IOCP handle
    iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (iocpHandle == NULL) {
        kinc_log(KINC_LOG_LEVEL_ERROR, "Failed to create IOCP handle: %d", static_cast<int>(GetLastError()));
        kinc_socket_destroy(&serverSocket);
        return false;
    }
    kinc_log(KINC_LOG_LEVEL_INFO, "WebSocket server %d: IOCP created for event-driven I/O", serverId);
#endif

    running.store(true);
    isRunning.store(true);
    broadcastRunning.store(true);

    // broadcast worker thread
    broadcastWorker = std::thread(&RunTWebSocketServer::broadcastWorkerLoop, this);

    // server thread - IOCP on Windows - TODO: polling on other platforms
    serverThread = std::thread(&RunTWebSocketServer::serverLoop, this);

#ifdef _WIN32
    // start IOCP event loop thread for async I/O processing
    iocpThread = std::thread(&RunTWebSocketServer::iocpEventLoop, this);
#endif

#ifdef DEBUG_NETWORK
    kinc_log(KINC_LOG_LEVEL_INFO, "WebSocket server %d started on %s:%d", serverId, host.c_str(), port);
#endif

    return true;
}

/**
 * Stops all worker threads, closes the listening socket and releases every
 * client slot. Safe to call repeatedly; only the first call after start()
 * does any work.
 */
void RunTWebSocketServer::stop() {
    // exchange() makes the "was running" test and the state change one step,
    // so two concurrent stop() calls cannot both proceed to the joins.
    if (!isRunning.exchange(false)) {
        return;
    }

    running.store(false);
    broadcastRunning.store(false);

#ifdef _WIN32
    // The IOCP loop waits with a 100 ms timeout and re-checks `running`, so it
    // exits on its own; join it before touching the port handle.
    if (iocpThread.joinable()) {
        iocpThread.join();
    }
#endif

    // wait for broadcast worker to finish first
    if (broadcastWorker.joinable()) {
        broadcastWorker.join();
    }

    // wait for server thread to finish
    if (serverThread.joinable()) {
        serverThread.join();
    }

#ifdef _WIN32
    // Close the port only after every thread that reads iocpHandle is gone.
    // Closing it earlier let the accept thread call CreateIoCompletionPort
    // with a NULL port, which creates (and leaks) a brand-new port.
    if (iocpHandle != NULL) {
        CloseHandle(iocpHandle);
        iocpHandle = NULL;
    }
#endif

    // close server socket
    kinc_socket_destroy(&serverSocket);

    // close all client connections - lock-free cleanup
    for (size_t i = 0; i < MAX_CLIENTS; i++) {
        RunTWebSocketClient* client = clients[i].exchange(nullptr, std::memory_order_acq_rel);
        if (client) {
            if (client->connected.load(std::memory_order_relaxed)) {
                kinc_socket_destroy(&client->socket);
            }
            delete client;
        }
    }
    clientCount.store(0, std::memory_order_release);

#ifdef DEBUG_NETWORK
    kinc_log(KINC_LOG_LEVEL_INFO, "WebSocket server %d stopped", serverId);
#endif
}

/**
 * Body of the server thread: accepts new connections and, on non-Windows
 * builds, polls existing clients via tick().
 */
void RunTWebSocketServer::serverLoop() {
#ifdef DEBUG_NETWORK
    kinc_log(KINC_LOG_LEVEL_INFO, "WebSocket server %d thread started", serverId);
#endif

    while (running.load()) {
        acceptClients();

#ifdef _WIN32
        // windows with IOCP client is processed in iocpEventLoop and only handles
        // accepts using select() to avoid busy waiting
        fd_set readSet;
        FD_ZERO(&readSet);
        FD_SET(serverSocket.handle, &readSet);

        struct timeval timeout;
        timeout.tv_sec = 0;
        timeout.tv_usec = 10000;

        select(0, &readSet, NULL, NULL, &timeout);
#else
        // TODO: Linux events
        tick();
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
#endif
    }

#ifdef DEBUG_NETWORK
    kinc_log(KINC_LOG_LEVEL_INFO, "WebSocket server %d thread finished", serverId);
#endif
}

/**
 * Accepts at most one pending connection, configures its socket and places
 * the new client in the first free slot. The connection is closed if the
 * server is full.
 */
void RunTWebSocketServer::acceptClients() {
    kinc_socket_t clientSocket;
    unsigned remoteAddress = 0;
    unsigned remotePort = 0;

    if (!kinc_socket_accept(&serverSocket, &clientSocket, &remoteAddress, &remotePort)) {
        return;
    }

    // socket configuration for 1M+ msg/sec
#ifdef _WIN32
    // large socket buffers 4MB each for high throughput
    int bufSize = 4 * 1024 * 1024;
    setsockopt(clientSocket.handle, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<const char*>(&bufSize), sizeof(bufSize));
    setsockopt(clientSocket.handle, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<const char*>(&bufSize), sizeof(bufSize));

    int nodelay = 1;
    setsockopt(clientSocket.handle, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<const char*>(&nodelay), sizeof(nodelay));
    // NOTE: do not set non-blocking mode as IOCP handles async I/O differently
#else
    int flags = fcntl(clientSocket.handle, F_GETFL, 0);
    fcntl(clientSocket.handle, F_SETFL, flags | O_NONBLOCK);

    int bufSize = 4 * 1024 * 1024;
    setsockopt(clientSocket.handle, SOL_SOCKET, SO_SNDBUF, &bufSize, sizeof(bufSize));
    setsockopt(clientSocket.handle, SOL_SOCKET, SO_RCVBUF, &bufSize, sizeof(bufSize));

    int nodelay = 1;
    setsockopt(clientSocket.handle, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
#endif

    // LOCK-FREE CLIENT INSERTION
    const size_t currentCount = clientCount.load(std::memory_order_relaxed);
    if (currentCount >= MAX_CLIENTS || currentCount >= static_cast<size_t>(maxConnections)) {
        kinc_socket_destroy(&clientSocket);
        return;
    }

    static std::atomic<int> nextClientId{1000};
    const int clientId = serverId * 10000 + nextClientId.fetch_add(1, std::memory_order_relaxed);

    RunTWebSocketClient* client = new RunTWebSocketClient(clientId, clientSocket);
    client->remoteAddress = remoteAddress;
    client->remotePort = remotePort;

#ifdef _WIN32
    HANDLE result = CreateIoCompletionPort(
        (HANDLE)clientSocket.handle,
        iocpHandle,
        (ULONG_PTR)client, // client pointer as completion key
        0
    );

    if (result == NULL) {
        kinc_log(KINC_LOG_LEVEL_ERROR, "Failed to associate socket with IOCP: %d", static_cast<int>(GetLastError()));
        delete client;
        kinc_socket_destroy(&clientSocket);
        return;
    }
#endif

    // find empty slot to insert atomically
    for (size_t i = 0; i < MAX_CLIENTS; i++) {
        RunTWebSocketClient* expected = nullptr;
        // strong, not weak: this is a single attempt per slot, so a spurious
        // failure would wrongly skip a free slot.
        if (clients[i].compare_exchange_strong(expected, client, std::memory_order_release)) {
            clientCount.fetch_add(1, std::memory_order_release);

#ifdef _WIN32
            // initial async recv to start the event driven chain
            postRecv(client);
#endif
            return;
        }
    }

    // no free slots
    delete client;
    kinc_socket_destroy(&clientSocket);
}

/**
 * Visits every client slot: removes clients flagged as disconnected and
 * reads pending data from the rest.
 *
 * NOTE(review): removed clients are deleted without kinc_socket_destroy();
 * confirm that RunTWebSocketClient's destructor closes the socket. Other
 * threads may also still hold the pointer when it is deleted here.
 */
void RunTWebSocketServer::tick() {
    // Slots are never compacted, so clientCount is not an upper bound on the
    // occupied slot indices; scan the whole table.
    for (size_t i = 0; i < MAX_CLIENTS; i++) {
        RunTWebSocketClient* client = clients[i].load(std::memory_order_acquire);
        if (!client) {
            continue;
        }

        if (!client->connected.load(std::memory_order_relaxed)) {
            // auto remove disconnected client
            RunTWebSocketClient* expected = client;
            if (clients[i].compare_exchange_strong(expected, nullptr, std::memory_order_release)) {
                clientCount.fetch_sub(1, std::memory_order_release);
                delete client;
            }
            continue;
        }

        processClientData(client);
    }
}

/**
 * Drains all currently readable data from `client`'s socket into its ring
 * buffer, running the HTTP upgrade handshake first and frame processing
 * afterwards. Marks the client disconnected on read error or orderly close.
 */
void RunTWebSocketServer::processClientData(RunTWebSocketClient* client) {
    // loop to read ALL available data not just one chunk
    unsigned fromAddress = 0;
    unsigned fromPort = 0;

    while (true) {
        // direct write pointer into ring buffer for zero copy receive
        size_t contiguousSpace = 0;
        uint8_t* writePtr = client->recvRingBuffer->getWritePtr(&contiguousSpace);

        // limit to available contiguous space
        if (contiguousSpace == 0) {
            kinc_log(KINC_LOG_LEVEL_WARNING, "WebSocket server %d: Ring buffer full for client %d", serverId, client->clientId);
            break;
        }

        const int bytesRead = kinc_socket_receive(&client->socket, writePtr, static_cast<int>(contiguousSpace), &fromAddress, &fromPort);

        if (bytesRead < 0) {
#ifdef _WIN32
            const int err = WSAGetLastError();
            // WSAEWOULDBLOCK means no data available and is not an error
            if (err == WSAEWOULDBLOCK) {
                break;
            }
#else
            if (errno == EWOULDBLOCK || errno == EAGAIN) {
                break;
            }
#endif
            // real error we disconnect client
            client->connected = false;
            return;
        }

        if (bytesRead == 0) {
            // connection closed gracefully
            client->connected = false;
            return;
        }

        client->recvRingBuffer->commitWrite(bytesRead);

        if (!client->handshakeCompleted.load()) {
            // copy from ring buffer to handshake buffer
            uint8_t tempBuf[8192];
            size_t available = client->recvRingBuffer->size();
            if (available > sizeof(tempBuf)) {
                available = sizeof(tempBuf);
            }
            client->recvRingBuffer->peek(tempBuf, available);
            client->handshakeBuffer.assign(reinterpret_cast<char*>(tempBuf), available);

            const size_t headerEnd = client->handshakeBuffer.find("\r\n\r\n");
            if (headerEnd != std::string::npos) {
                // Consume the request while it is still in the ring buffer:
                // processWebSocketHandshake() resets the buffer on success, so
                // consuming afterwards would act on an already-empty buffer.
                client->recvRingBuffer->consume(headerEnd + 4);
                processWebSocketHandshake(client);
            }
        } else {
            processWebSocketFrames(client);
        }
    }
    // TODO: confirm bytesRead <= 0 is normal for non-blocking sockets with no data
    // and actual connection close is bytesRead == 0 after handshake with no pending data
}

/**
 * Body of the broadcast worker thread: drains `broadcastQueue` in batches and
 * writes every queued frame to all connected, handshaken clients.
 */
void RunTWebSocketServer::broadcastWorkerLoop() {
    static thread_local BroadcastMessage messages[50000];

#ifdef DEBUG_NETWORK
    kinc_log(KINC_LOG_LEVEL_INFO, "WebSocket server %d async broadcast worker started", serverId);
#endif

    while (broadcastRunning.load(std::memory_order_relaxed)) {
        // drain messages
        size_t messageCount = 0;
        while (messageCount < 50000) {
            if (!broadcastQueue.pop(messages[messageCount])) {
                break;
            }
            messageCount++;
        }

        if (messageCount == 0) {
            // yield CPU when no messages for minimal overhead
            std::this_thread::yield();
            continue;
        }

        for (size_t msgIdx = 0; msgIdx < messageCount; msgIdx++) {
            const auto& msg = messages[msgIdx];
            const uint8_t* frameData = msg.frameData.data();
            const int frameSize = static_cast<int>(msg.frameData.size());

            // Send to all. Slots are not compacted, so scan the whole table
            // instead of stopping at clientCount.
            for (size_t i = 0; i < MAX_CLIENTS; i++) {
                RunTWebSocketClient* client = clients[i].load(std::memory_order_acquire);
                if (client && client->connected.load(std::memory_order_relaxed) &&
                    client->handshakeCompleted.load(std::memory_order_relaxed)) {
                    // TODO: Using synchronous send
                    const int sent = ::send(client->socket.handle, reinterpret_cast<const char*>(frameData), frameSize, 0);
                    if (sent <= 0) {
#ifdef _WIN32
                        const int err = WSAGetLastError();
                        if (err != WSAEWOULDBLOCK) {
                            client->connected.store(false, std::memory_order_relaxed);
                        }
#else
                        if (errno != EWOULDBLOCK && errno != EAGAIN) {
                            client->connected.store(false, std::memory_order_relaxed);
                        }
#endif
                    }
                }
            }
        }

#ifdef DEBUG_NETWORK
        if (messageCount >= 5000) {
            kinc_log(KINC_LOG_LEVEL_INFO, "WebSocket async worker queued %d messages", (int)messageCount);
        }
#endif
    }
}

/**
 * Wraps `data` in a single WebSocket frame with the given opcode and sends it
 * to every connected, handshaken client.
 *
 * `excludeClientId` is accepted for interface compatibility but is currently
 * NOT applied: the sender also receives the frame.
 */
void RunTWebSocketServer::broadcastFrameWithOpcode(const std::string& data, uint8_t opcode, int excludeClientId) {
    (void)excludeClientId; // TODO: send to all connected clients without excludeClientId parameter

    static thread_local std::vector<uint8_t> frameBuffer;
    frameBuffer.clear();
    appendWsFrame(frameBuffer, opcode, reinterpret_cast<const uint8_t*>(data.data()), data.length());

    for (size_t i = 0; i < MAX_CLIENTS; i++) {
        RunTWebSocketClient* client = clients[i].load(std::memory_order_acquire);
        if (client && client->connected.load(std::memory_order_acquire) &&
            client->handshakeCompleted.load(std::memory_order_acquire)) {
#ifdef _WIN32
            postSend(client, frameBuffer.data(), frameBuffer.size());
#else
            const int sent = ::send(client->socket.handle, reinterpret_cast<const char*>(frameBuffer.data()),
                                    static_cast<int>(frameBuffer.size()), 0);
            if (sent <= 0) {
                if (errno != EWOULDBLOCK && errno != EAGAIN) {
                    client->connected.store(false, std::memory_order_release);
                }
            }
#endif
        }
    }
}

/** Broadcasts `binaryData` as a FIN + binary (0x82) frame. */
void RunTWebSocketServer::broadcastBinaryFrame(const std::string& binaryData, int excludeClientId) {
    broadcastFrameWithOpcode(binaryData, 0x82, excludeClientId);
}

/**
 * Returns a complete WebSocket frame (header + payload) for `data`.
 * FIN is added to `opcode` unless it is already set.
 */
std::vector<uint8_t> RunTWebSocketServer::createWebSocketFrame(const std::string& data, uint8_t opcode) {
    thread_local std::vector<uint8_t> frameBuffer;
    frameBuffer.clear();
    appendWsFrame(frameBuffer, opcode, reinterpret_cast<const uint8_t*>(data.data()), data.length());
    return frameBuffer;
}

/**
 * Sends an already-encoded frame to every connected, handshaken client except
 * the one whose id equals `excludeClientId`.
 */
void RunTWebSocketServer::sendFrameToClients(const std::vector<uint8_t>& frame, int excludeClientId) {
    const uint8_t* frameData = frame.data();
    const size_t frameSize = frame.size();

    // send to valid clients; scan every slot because slots are not compacted
    for (size_t i = 0; i < MAX_CLIENTS; i++) {
        RunTWebSocketClient* client = clients[i].load(std::memory_order_acquire);
        if (client && client->connected.load(std::memory_order_relaxed) &&
            client->handshakeCompleted.load(std::memory_order_relaxed) &&
            client->clientId != excludeClientId) {
#ifdef _WIN32
            postSend(client, frameData, frameSize);
#else
            const int sent = kinc_socket_send(&client->socket, frameData, static_cast<int>(frameSize));
            if (sent <= 0) {
                client->connected.store(false, std::memory_order_relaxed);
            }
#endif
        }
    }
}

/** Broadcasts `data` as a text frame to all clients. */
void RunTWebSocketServer::sendToAll(const std::string& data) {
    broadcastFrameWithOpcode(data, 0x1, -1);
}

/**
 * Sends `data` as a FIN + text frame to the connected client with id
 * `clientId`, if there is one.
 */
void RunTWebSocketServer::sendToClient(int clientId, const std::string& data) {
    static thread_local std::vector<uint8_t> frameBuffer;
    frameBuffer.clear();
    appendWsFrame(frameBuffer, 0x1, reinterpret_cast<const uint8_t*>(data.data()), data.length());

    const uint8_t* frameData = frameBuffer.data();
    const size_t frameSize = frameBuffer.size();

    // Scan every slot: the target may sit above clientCount after removals.
    for (size_t i = 0; i < MAX_CLIENTS; i++) {
        RunTWebSocketClient* client = clients[i].load(std::memory_order_acquire);
        if (client && client->clientId == clientId &&
            client->connected.load(std::memory_order_relaxed)) {
#ifdef _WIN32
            postSend(client, frameData, frameSize);
#ifdef DEBUG_NETWORK
            kinc_log(KINC_LOG_LEVEL_INFO, "WebSocket server %d queued %zu bytes to client %d",
                     serverId, frameSize, clientId);
#endif
#else
            const int sent = kinc_socket_send(&client->socket, frameData, static_cast<int>(frameSize));
            if (sent > 0) {
#ifdef DEBUG_NETWORK
                kinc_log(KINC_LOG_LEVEL_INFO, "WebSocket server %d sent %zu bytes to client %d",
                         serverId, frameSize, clientId);
#endif
            } else {
                client->connected.store(false, std::memory_order_relaxed);
            }
#endif
            break;
        }
    }
}

/**
 * Answers the HTTP upgrade request held in `client->handshakeBuffer`.
 * On success the 101 response is sent, the receive ring buffer is reset and
 * `handshakeCompleted` is set; otherwise the client is marked disconnected.
 */
void RunTWebSocketServer::processWebSocketHandshake(RunTWebSocketClient* client) {
#ifdef DEBUG_NETWORK
    kinc_log(KINC_LOG_LEVEL_INFO, "WebSocket server %d processing handshake for client %d", serverId, client->clientId);
    kinc_log(KINC_LOG_LEVEL_INFO, "Handshake buffer size: %d, content: '%.200s'",
             (int)client->handshakeBuffer.length(), client->handshakeBuffer.c_str());

    // dump first 64 bytes for debugging
    std::string hexDump;
    size_t dumpSize = client->handshakeBuffer.length();
    if (dumpSize > 64) {
        dumpSize = 64;
    }
    for (size_t i = 0; i < dumpSize; i++) {
        char hex[4];
        snprintf(hex, sizeof(hex), "%02x ", (unsigned char)client->handshakeBuffer[i]);
        hexDump += hex;
    }
    kinc_log(KINC_LOG_LEVEL_INFO, "Handshake hex dump: %s", hexDump.c_str());
#endif

    // extract and find Sec-WebSocket-Key header
    const std::string& request = client->handshakeBuffer;
    std::string websocketKey;

    size_t keyPos = request.find("Sec-WebSocket-Key: ");
    if (keyPos != std::string::npos) {
        keyPos += 19; // skip Sec-WebSocket-Key
        const size_t keyEnd = request.find("\r\n", keyPos);
        if (keyEnd != std::string::npos) {
            websocketKey = request.substr(keyPos, keyEnd - keyPos);
        }
    }

    if (websocketKey.empty()) {
#ifdef DEBUG_NETWORK
        kinc_log(KINC_LOG_LEVEL_ERROR, "WebSocket server %d: No Sec-WebSocket-Key found in handshake", serverId);
#endif
        client->connected = false;
        return;
    }

    const std::string acceptKey = generateWebSocketAccept(websocketKey);

    // HTTP 101 Switching Protocols response with CORS headers for browser compatibility
    const std::string response =
        "HTTP/1.1 101 Switching Protocols\r\n"
        "Upgrade: websocket\r\n"
        "Connection: Upgrade\r\n"
        "Sec-WebSocket-Accept: " + acceptKey + "\r\n"
        "Access-Control-Allow-Origin: *\r\n"
        "Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n"
        "Access-Control-Allow-Headers: Content-Type, Authorization, Sec-WebSocket-Key, Sec-WebSocket-Version, Sec-WebSocket-Protocol\r\n"
        "\r\n";

    // reset ring buffer after handshake to prevent leftover handshake data
    // which can cause unknown opcodes
    client->recvRingBuffer->reset();

#ifdef _WIN32
    // handshake happens before any async operations
    const int bytesSent = ::send(client->socket.handle, response.c_str(), static_cast<int>(response.length()), 0);
#else
    const int bytesSent = kinc_socket_send(&client->socket, reinterpret_cast<const uint8_t*>(response.data()),
                                           static_cast<int>(response.length()));
#endif

    if (bytesSent > 0) {
        client->handshakeCompleted.store(true, std::memory_order_release);
    } else {
#ifdef DEBUG_NETWORK
        kinc_log(KINC_LOG_LEVEL_ERROR, "WebSocket server %d failed to send handshake response to client %d", serverId, client->clientId);
#endif
        client->connected.store(false, std::memory_order_release);
    }
}

/**
 * Parses and dispatches every complete frame in `client`'s ring buffer:
 * data frames are re-broadcast, close and ping are answered, pong is ignored.
 * Payloads above 16 MiB disconnect the client.
 */
void RunTWebSocketServer::processWebSocketFrames(RunTWebSocketClient* client) {
    RingBuffer& ringBuf = *client->recvRingBuffer;

    while (ringBuf.size() >= 2) {
        WsFrameInfo frameInfo;
        const WsFrameResult result = parseWsFrameHeader(ringBuf, frameInfo);

        if (result == WsFrameResult::WS_INCOMPLETE) {
            break;
        }

        if (result == WsFrameResult::WS_ERROR) {
            // drop one byte and try to resynchronise on the next header
            ringBuf.consume(1);
            continue;
        }

        if (frameInfo.payload_length > 16 * 1024 * 1024) {
            kinc_log(KINC_LOG_LEVEL_ERROR, "WebSocket server %d: Rejecting oversized payload (%llu bytes)",
                     serverId, (unsigned long long)frameInfo.payload_length);
            client->connected = false;
            break;
        }

        std::vector<uint8_t> payloadBuf(frameInfo.payload_length);
        extractWsPayload(ringBuf, frameInfo, payloadBuf.data());
        std::string payload(reinterpret_cast<char*>(payloadBuf.data()), payloadBuf.size());

        g_ws_frames_received.fetch_add(1, std::memory_order_relaxed);

        switch (frameInfo.opcode) {
            case 0x0: // continuation
            case 0x1: // text
            case 0x2: // bin
                g_ws_frames_broadcast.fetch_add(1, std::memory_order_relaxed);
                broadcastFrameWithOpcode(payload, frameInfo.opcode, client->clientId);
                break;

            case 0x8: // close frame
                kinc_log(KINC_LOG_LEVEL_INFO, "WebSocket client %d sent close frame", client->clientId);
                {
                    uint8_t closeFrame[2] = {0x88, 0x00};
#ifdef _WIN32
                    postSend(client, closeFrame, 2);
#else
                    kinc_socket_send(&client->socket, closeFrame, 2);
#endif
                }
                client->connected = false;
                break;

            case 0x9: // ping frame
            {
                // pong response (0x8A = pong + FIN) echoing the ping payload;
                // the shared encoder picks the right length form for any size
                std::vector<uint8_t> pongFrame;
                appendWsFrame(pongFrame, 0x8A, payloadBuf.data(), payloadBuf.size());
#ifdef _WIN32
                postSend(client, pongFrame.data(), pongFrame.size());
#else
                kinc_socket_send(&client->socket, pongFrame.data(), static_cast<int>(pongFrame.size()));
#endif
            }
            break;

            case 0xA: // pong frame
                // Just acknowledge
                break;

            default:
                kinc_log(KINC_LOG_LEVEL_WARNING, "WebSocket server %d: Unknown opcode 0x%X from client %d",
                         serverId, frameInfo.opcode, client->clientId);
                break;
        }
    }
}

/** Computes Sec-WebSocket-Accept: base64(SHA-1(key + WEBSOCKET_MAGIC)). */
std::string RunTWebSocketServer::generateWebSocketAccept(const std::string& key) {
    const std::string combined = key + WEBSOCKET_MAGIC;
    const std::string hash = sha1_binary(combined);
    return base64_encode(reinterpret_cast<const unsigned char*>(hash.data()),
                         static_cast<unsigned int>(hash.length()));
}

#ifdef _WIN32

/**
 * Body of the IOCP thread: waits for completion packets (100 ms timeout so
 * `running` is re-checked) and dispatches them to onRecvComplete /
 * onSendComplete. The completion key is the RunTWebSocketClient pointer.
 */
void RunTWebSocketServer::iocpEventLoop() {
    kinc_log(KINC_LOG_LEVEL_INFO, "WebSocket server %d IOCP event loop started (zero polling)", serverId);

    DWORD bytesTransferred = 0;
    ULONG_PTR completionKey = 0;
    OVERLAPPED* overlapped = nullptr;

    while (running.load(std::memory_order_relaxed)) {
        // block until I/O completes with key difference from polling we sleep until data arrives
        const BOOL ok = GetQueuedCompletionStatus(
            iocpHandle,
            &bytesTransferred,
            &completionKey,
            &overlapped,
            100
        );

        if (!ok) {
            const DWORD err = GetLastError();
            if (err == WAIT_TIMEOUT) {
                continue;
            }
            if (overlapped == nullptr) {
                kinc_log(KINC_LOG_LEVEL_ERROR, "IOCP GetQueuedCompletionStatus failed: %d", static_cast<int>(err));
                continue;
            }
        }

        if (overlapped == nullptr) {
            continue;
        }

        IOCPOverlapped* iocpOv = CONTAINING_RECORD(overlapped, IOCPOverlapped, overlapped);
        RunTWebSocketClient* client = reinterpret_cast<RunTWebSocketClient*>(completionKey);

        if (!client || !client->connected.load(std::memory_order_relaxed)) {
            continue;
        }

        if (!ok) {
            // A packet was dequeued for an I/O operation that failed; do not
            // treat it as a successful send/recv.
            client->connected.store(false, std::memory_order_release);
            continue;
        }

        switch (iocpOv->opType) {
            case IOCPOperationType::IOCP_RECV:
                if (bytesTransferred == 0) {
                    // zero-byte receive: peer closed the connection
                    client->connected.store(false, std::memory_order_release);
                } else {
                    onRecvComplete(client, bytesTransferred);
                }
                break;

            case IOCPOperationType::IOCP_SEND:
                onSendComplete(client, bytesTransferred);
                break;

            default:
                break;
        }
    }

    kinc_log(KINC_LOG_LEVEL_INFO, "WebSocket server %d IOCP event loop stopped", serverId);
}

/** Posts an overlapped WSARecv into the client's receive buffer. */
void RunTWebSocketServer::postRecv(RunTWebSocketClient* client) {
    if (!client || !client->connected.load(std::memory_order_relaxed)) {
        return;
    }

    IOCPOverlapped* ov = client->recvOverlapped.get();
    memset(&ov->overlapped, 0, sizeof(OVERLAPPED));
    ov->wsaBuf.buf = reinterpret_cast<char*>(ov->buffer);
    ov->wsaBuf.len = sizeof(ov->buffer);
    ov->opType = IOCPOperationType::IOCP_RECV;

    DWORD flags = 0;
    DWORD bytesRecv = 0;

    const int result = WSARecv(
        client->socket.handle,
        &ov->wsaBuf,
        1,
        &bytesRecv,
        &flags,
        &ov->overlapped,
        nullptr
    );

    if (result == SOCKET_ERROR) {
        const int err = WSAGetLastError();
        if (err != WSA_IO_PENDING) {
            client->connected.store(false, std::memory_order_release);
        }
        // WSA_IO_PENDING is expected, operation will complete asynchronously
    }
}

/**
 * Queues `len` bytes for `client`. If no send is in flight, as much as fits
 * in the overlapped buffer is sent immediately and the remainder is appended
 * to `sendQueue`; otherwise everything is appended to `sendQueue`.
 */
void RunTWebSocketServer::postSend(RunTWebSocketClient* client, const uint8_t* data, size_t len) {
    if (!client || !client->connected.load(std::memory_order_relaxed)) {
        return;
    }
    if (data == nullptr || len == 0) {
        return;
    }

    // NOTE(review): lock type reconstructed; assumes sendQueueMutex is a std::mutex.
    std::lock_guard<std::mutex> lock(client->sendQueueMutex);

    // if send is already pending, queue ALL data
    if (client->sendPending.load(std::memory_order_relaxed)) {
        client->sendQueue.insert(client->sendQueue.end(), data, data + len);
        client->queuedBytes.fetch_add(len, std::memory_order_relaxed);
        g_ws_bytes_queued.fetch_add(len, std::memory_order_relaxed);
        return;
    }

    client->sendPending.store(true, std::memory_order_release);

    IOCPOverlapped* ov = client->sendOverlapped.get();
    memset(&ov->overlapped, 0, sizeof(OVERLAPPED));

    // copy what fits to overlapped buffer, queue the rest
    const size_t copyLen = (len > sizeof(ov->buffer)) ? sizeof(ov->buffer) : len;
    memcpy(ov->buffer, data, copyLen);
    ov->bufferLen = copyLen;

    // and queue remaining data if any
    if (len > copyLen) {
        client->sendQueue.insert(client->sendQueue.end(), data + copyLen, data + len);
        client->queuedBytes.fetch_add(len - copyLen, std::memory_order_relaxed);
        g_ws_bytes_queued.fetch_add(len - copyLen, std::memory_order_relaxed);
    }

    ov->wsaBuf.buf = reinterpret_cast<char*>(ov->buffer);
    ov->wsaBuf.len = static_cast<ULONG>(copyLen);
    ov->opType = IOCPOperationType::IOCP_SEND;

    DWORD bytesSent = 0;
    const int result = WSASend(
        client->socket.handle,
        &ov->wsaBuf,
        1,
        &bytesSent,
        0,
        &ov->overlapped,
        nullptr
    );

    if (result == SOCKET_ERROR) {
        const int err = WSAGetLastError();
        if (err != WSA_IO_PENDING) {
            client->connected.store(false, std::memory_order_release);
            client->sendPending.store(false, std::memory_order_release);
        }
    }
}

/**
 * Handles a completed overlapped receive: copies the bytes into the ring
 * buffer, runs the handshake or frame processing, then re-arms the receive.
 */
void RunTWebSocketServer::onRecvComplete(RunTWebSocketClient* client, DWORD bytesTransferred) {
    if (!client || bytesTransferred == 0) {
        return;
    }

    g_ws_recv_completions.fetch_add(1, std::memory_order_relaxed);

    IOCPOverlapped* ov = client->recvOverlapped.get();

    // copy received data to ring buffer
    client->recvRingBuffer->write(ov->buffer, bytesTransferred);

    if (!client->handshakeCompleted.load(std::memory_order_relaxed)) {
        uint8_t tempBuf[8192];
        size_t available = client->recvRingBuffer->size();
        if (available > sizeof(tempBuf)) {
            available = sizeof(tempBuf);
        }
        client->recvRingBuffer->peek(tempBuf, available);
        client->handshakeBuffer.assign(reinterpret_cast<char*>(tempBuf), available);

        const size_t headerEnd = client->handshakeBuffer.find("\r\n\r\n");
        if (headerEnd != std::string::npos) {
            // Consume before the handshake resets the ring buffer (see
            // processWebSocketHandshake), not after.
            client->recvRingBuffer->consume(headerEnd + 4);
            processWebSocketHandshake(client);
        }
    } else {
        processWebSocketFrames(client);
    }

    postRecv(client);
}

/**
 * Handles a completed overlapped send: updates metrics and, if `sendQueue`
 * holds more data, posts the next chunk.
 */
void RunTWebSocketServer::onSendComplete(RunTWebSocketClient* client, DWORD bytesTransferred) {
    g_ws_send_completions.fetch_add(1, std::memory_order_relaxed);
    g_ws_bytes_sent.fetch_add(bytesTransferred, std::memory_order_relaxed);

    // NOTE(review): lock type reconstructed; assumes sendQueueMutex is a std::mutex.
    std::lock_guard<std::mutex> lock(client->sendQueueMutex);

    client->sendPending.store(false, std::memory_order_release);

    // check queue for data to send
    if (client->sendQueue.empty()) {
        return;
    }

    // copy queued data to overlapped buffer and send
    IOCPOverlapped* ov = client->sendOverlapped.get();
    memset(&ov->overlapped, 0, sizeof(OVERLAPPED));

    const size_t copyLen = (client->sendQueue.size() > sizeof(ov->buffer))
                               ? sizeof(ov->buffer)
                               : client->sendQueue.size();
    memcpy(ov->buffer, client->sendQueue.data(), copyLen);
    ov->bufferLen = copyLen;

    // remove sent data from queue
    client->sendQueue.erase(client->sendQueue.begin(), client->sendQueue.begin() + copyLen);
    client->queuedBytes.store(client->sendQueue.size(), std::memory_order_release);

    client->sendPending.store(true, std::memory_order_release);

    ov->wsaBuf.buf = reinterpret_cast<char*>(ov->buffer);
    ov->wsaBuf.len = static_cast<ULONG>(copyLen);
    ov->opType = IOCPOperationType::IOCP_SEND;

    DWORD bytesSent = 0;
    const int result = WSASend(
        client->socket.handle,
        &ov->wsaBuf,
        1,
        &bytesSent,
        0,
        &ov->overlapped,
        nullptr
    );

    if (result == SOCKET_ERROR) {
        const int err = WSAGetLastError();
        if (err != WSA_IO_PENDING) {
            client->connected.store(false, std::memory_order_release);
            client->sendPending.store(false, std::memory_order_release);
        }
    }
}

#endif

// C API Bridge Functions
extern "C" {

/**
 * Creates (but does not start) a server and returns its id.
 * Returns -1 if `host` is null.
 */
int runt_websocket_server_create(const char* host, int port, int maxConnections) {
    if (host == nullptr) {
        kinc_log(KINC_LOG_LEVEL_ERROR, "Cannot create WebSocket server: host is null");
        return -1;
    }

    std::lock_guard<std::mutex> lock(g_servers_mutex);

    kinc_log(KINC_LOG_LEVEL_INFO, "Creating WebSocket server: %s:%d (maxConn=%d)", host, port, maxConnections);

    const int serverId = g_next_server_id++;
    RunTWebSocketServer* server = new RunTWebSocketServer(serverId, host, port, maxConnections);
    g_websocket_servers[serverId] = server;

    kinc_log(KINC_LOG_LEVEL_INFO, "WebSocket server created with ID: %d", serverId);
    return serverId;
}

/** Starts the server with the given id; false if unknown or start failed. */
bool runt_websocket_server_start(int serverId) {
    std::lock_guard<std::mutex> lock(g_servers_mutex);

    kinc_log(KINC_LOG_LEVEL_INFO, "Starting WebSocket server with ID: %d", serverId);

    auto it = g_websocket_servers.find(serverId);
    if (it != g_websocket_servers.end()) {
        const bool result = it->second->start();
        kinc_log(KINC_LOG_LEVEL_INFO, "WebSocket server %d start result: %s", serverId, result ? "SUCCESS" : "FAILED");
        return result;
    }

    kinc_log(KINC_LOG_LEVEL_ERROR, "WebSocket server %d not found for start", serverId);
    return false;
}

/** Stops, destroys and unregisters the server with the given id. */
void runt_websocket_server_stop(int serverId) {
    std::lock_guard<std::mutex> lock(g_servers_mutex);

    auto it = g_websocket_servers.find(serverId);
    if (it != g_websocket_servers.end()) {
        it->second->stop();
        delete it->second;
        g_websocket_servers.erase(it);
        kinc_log(KINC_LOG_LEVEL_INFO, "Stopped and removed WebSocket server %d", serverId);
    } else {
        kinc_log(KINC_LOG_LEVEL_WARNING, "WebSocket server %d not found for stop", serverId);
    }
}

/**
 * Runs one tick() on the server from the calling thread.
 * NOTE(review): on non-Windows builds the server thread also calls tick();
 * confirm callers do not run both concurrently.
 */
void runt_websocket_server_tick(int serverId) {
    std::lock_guard<std::mutex> lock(g_servers_mutex);

    auto it = g_websocket_servers.find(serverId);
    if (it != g_websocket_servers.end()) {
        it->second->tick();
    }
}

/** Broadcasts the NUL-terminated string `data` as a text frame. */
void runt_websocket_server_send_all(int serverId, const char* data) {
    if (data == nullptr) {
        return;
    }

    std::lock_guard<std::mutex> lock(g_servers_mutex);

    auto it = g_websocket_servers.find(serverId);
    if (it != g_websocket_servers.end()) {
        it->second->sendToAll(std::string(data));
    } else {
        kinc_log(KINC_LOG_LEVEL_WARNING, "WebSocket server %d not found for send_all", serverId);
    }
}

/** Sends the NUL-terminated string `data` as a text frame to one client. */
void runt_websocket_server_send_client(int serverId, int clientId, const char* data) {
    if (data == nullptr) {
        return;
    }

    std::lock_guard<std::mutex> lock(g_servers_mutex);

    auto it = g_websocket_servers.find(serverId);
    if (it != g_websocket_servers.end()) {
        it->second->sendToClient(clientId, std::string(data));
    } else {
        kinc_log(KINC_LOG_LEVEL_WARNING, "WebSocket server %d not found for send_client", serverId);
    }
}

/** Broadcasts `length` bytes at `data` as a binary frame. */
void runt_websocket_server_send_all_binary(int serverId, const char* data, size_t length) {
    if (data == nullptr && length > 0) {
        return;
    }

    std::lock_guard<std::mutex> lock(g_servers_mutex);

    auto it = g_websocket_servers.find(serverId);
    if (it != g_websocket_servers.end()) {
        const std::string binaryData = (length > 0) ? std::string(data, length) : std::string();
        it->second->broadcastBinaryFrame(binaryData, -1);
    } else {
        kinc_log(KINC_LOG_LEVEL_WARNING, "WebSocket server %d not found for send_all_binary", serverId);
    }
}

/** Sends `length` bytes at `data` as a binary frame to one client. */
void runt_websocket_server_send_client_binary(int serverId, int clientId, const char* data, size_t length) {
    if (data == nullptr && length > 0) {
        return;
    }

    std::lock_guard<std::mutex> lock(g_servers_mutex);

    auto it = g_websocket_servers.find(serverId);
    if (it == g_websocket_servers.end()) {
        kinc_log(KINC_LOG_LEVEL_WARNING, "WebSocket server %d not found for send_client_binary", serverId);
        return;
    }

    RunTWebSocketServer* server = it->second;
    const std::string binaryData = (length > 0) ? std::string(data, length) : std::string();

    // Create binary frame (opcode 0x02) and send to specific client
    const auto frame = server->createWebSocketFrame(binaryData, 0x02);

    // Find client by ID, not by array index
    for (size_t i = 0; i < RunTWebSocketServer::MAX_CLIENTS; i++) {
        RunTWebSocketClient* client = server->clients[i].load();
        if (client && client->clientId == clientId && client->connected.load()) {
#ifdef _WIN32
            server->postSend(client, frame.data(), frame.size());
#else
            kinc_socket_send(&client->socket, frame.data(), static_cast<int>(frame.size()));
#endif
            break;
        }
    }
}

// *** for debugging
/** Copies the global counters into every non-null output pointer. */
void runt_websocket_get_metrics(uint64_t* frames_received, uint64_t* frames_broadcast,
                                uint64_t* bytes_sent, uint64_t* bytes_queued,
                                uint64_t* send_completions, uint64_t* recv_completions) {
    if (frames_received) *frames_received = g_ws_frames_received.load();
    if (frames_broadcast) *frames_broadcast = g_ws_frames_broadcast.load();
    if (bytes_sent) *bytes_sent = g_ws_bytes_sent.load();
    if (bytes_queued) *bytes_queued = g_ws_bytes_queued.load();
    if (send_completions) *send_completions = g_ws_send_completions.load();
    if (recv_completions) *recv_completions = g_ws_recv_completions.load();
}

/** Resets all global counters to zero. */
void runt_websocket_reset_metrics() {
    g_ws_frames_received.store(0);
    g_ws_frames_broadcast.store(0);
    g_ws_bytes_sent.store(0);
    g_ws_bytes_queued.store(0);
    g_ws_send_completions.store(0);
    g_ws_recv_completions.store(0);
}

/** Logs all global counters at INFO level. */
void runt_websocket_print_metrics() {
    kinc_log(KINC_LOG_LEVEL_INFO, "=== WebSocket Metrics ===");
    kinc_log(KINC_LOG_LEVEL_INFO, "Frames Received: %llu", (unsigned long long)g_ws_frames_received.load());
    kinc_log(KINC_LOG_LEVEL_INFO, "Frames Broadcast: %llu", (unsigned long long)g_ws_frames_broadcast.load());
    kinc_log(KINC_LOG_LEVEL_INFO, "Bytes Sent: %llu", (unsigned long long)g_ws_bytes_sent.load());
    kinc_log(KINC_LOG_LEVEL_INFO, "Bytes Queued: %llu", (unsigned long long)g_ws_bytes_queued.load());
    kinc_log(KINC_LOG_LEVEL_INFO, "Send Completions: %llu", (unsigned long long)g_ws_send_completions.load());
    kinc_log(KINC_LOG_LEVEL_INFO, "Recv Completions: %llu", (unsigned long long)g_ws_recv_completions.load());
    kinc_log(KINC_LOG_LEVEL_INFO, "=========================");
}

} // extern "C"