2026-02-20 23:40:15 -08:00
# include "websocket_bridge.h"
# include <kinc/log.h>
# include <kinc/system.h>
# include <kinc/threads/thread.h>
# include <cstring>
# include <chrono>
# include <random>
# include <sstream>
# include <iomanip>
# include <map>
# include <algorithm>
# include "async_engine.h"
# include <thread>
2026-06-10 02:30:31 +00:00
# ifdef KINC_WINDOWS
2026-02-20 23:40:15 -08:00
# ifndef WIN32_LEAN_AND_MEAN
# define WIN32_LEAN_AND_MEAN
# endif
# include <WinSock2.h>
# include <WS2tcpip.h>
# include <mswsock.h>
# include <wincrypt.h>
# include <windows.h>
# pragma comment(lib, "advapi32.lib")
# pragma comment(lib, "ws2_32.lib")
# endif
# ifdef _WIN32
# ifndef WIN32_LEAN_AND_MEAN
# define WIN32_LEAN_AND_MEAN
# endif
# include <WinSock2.h>
# include <WS2tcpip.h>
# include <mswsock.h>
# include <windows.h>
# endif
# ifndef _WIN32
# include <sys/socket.h>
# include <netinet/in.h>
# include <netinet/tcp.h>
# include <arpa/inet.h>
# include <netdb.h>
# include <unistd.h>
# include <fcntl.h>
# include <errno.h>
# include <openssl/sha.h>
# endif
// Registry of servers keyed by integer id.
// NOTE(review): presumably every access is guarded by g_servers_mutex; the
// code that reads or writes this map is outside this chunk, so confirm.
std : : map < int , RunTWebSocketServer * > g_websocket_servers ;
// Mutex declared next to the registry (its lock sites are not visible here).
std : : mutex g_servers_mutex ;
// Id counter, initialised to 1.
// NOTE(review): presumably the id handed to the next server created; confirm
// at the allocation site.
int g_next_server_id = 1 ;
// Global metrics for debugging. Every update visible in this file is a
// relaxed fetch_add, so the counters are independent tallies, not a
// consistent snapshot.
// +1 per frame parsed in processWebSocketFrames().
std : : atomic < uint64_t > g_ws_frames_received { 0 } ;
// +1 per data frame (opcode 0x0/0x1/0x2) re-broadcast by processWebSocketFrames().
std : : atomic < uint64_t > g_ws_frames_broadcast { 0 } ;
// += bytesTransferred in onSendComplete().
std : : atomic < uint64_t > g_ws_bytes_sent { 0 } ;
// += bytes appended to a client's sendQueue in postSend().
std : : atomic < uint64_t > g_ws_bytes_queued { 0 } ;
// +1 per call to onSendComplete().
std : : atomic < uint64_t > g_ws_send_completions { 0 } ;
// +1 per non-empty receive handled by onRecvComplete().
std : : atomic < uint64_t > g_ws_recv_completions { 0 } ;
// GUID that RFC 6455 requires the server to append to the client's
// Sec-WebSocket-Key before hashing. It has to match the RFC byte for byte, so
// the literal must not carry any surrounding whitespace.
static const char *WEBSOCKET_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

// Standard base64 alphabet (RFC 4648 section 4); the array index is the 6-bit
// value. Exactly 64 characters: a leading blank would shift every symbol.
static const char *base64_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/**
 * Encodes a byte buffer as '='-padded base64 text.
 *
 * @param bytes_to_encode  Pointer to the input bytes. Only dereferenced when
 *                         in_len is non-zero.
 * @param in_len           Number of input bytes.
 * @return The encoded text; empty for empty input. The length is always a
 *         multiple of four.
 */
std::string base64_encode(const unsigned char *bytes_to_encode, unsigned int in_len) {
    std::string ret;
    ret.reserve((static_cast<size_t>(in_len) + 2) / 3 * 4);

    unsigned int pos = 0;

    // Every complete 3-byte group becomes exactly four alphabet characters.
    // "in_len - pos >= 3" cannot wrap, unlike "pos + 3 <= in_len".
    for (; in_len - pos >= 3; pos += 3) {
        const unsigned char b0 = bytes_to_encode[pos];
        const unsigned char b1 = bytes_to_encode[pos + 1];
        const unsigned char b2 = bytes_to_encode[pos + 2];
        ret += base64_chars[b0 >> 2];
        ret += base64_chars[((b0 & 0x03) << 4) | (b1 >> 4)];
        ret += base64_chars[((b1 & 0x0f) << 2) | (b2 >> 6)];
        ret += base64_chars[b2 & 0x3f];
    }

    // One or two trailing bytes: missing input bits are zero and the group is
    // padded to four characters with '='.
    const unsigned int rest = in_len - pos;
    if (rest != 0) {
        const unsigned char b0 = bytes_to_encode[pos];
        const unsigned char b1 = (rest == 2) ? bytes_to_encode[pos + 1] : static_cast<unsigned char>(0);
        ret += base64_chars[b0 >> 2];
        ret += base64_chars[((b0 & 0x03) << 4) | (b1 >> 4)];
        if (rest == 2) {
            ret += base64_chars[(b1 & 0x0f) << 2];
        } else {
            ret += '=';
        }
        ret += '=';
    }

    return ret;
}
# include <iomanip>
# include <sstream>
std : : string sha1_binary ( const std : : string & data ) {
2026-06-10 02:30:31 +00:00
# ifdef KINC_WINDOWS
2026-02-20 23:40:15 -08:00
// CryptoAPI for proper SHA-1 hashing
HCRYPTPROV hProv = 0 ;
HCRYPTHASH hHash = 0 ;
DWORD cbHash = 20 ; // produces 20-byte hash
BYTE hash [ 20 ] ;
if ( CryptAcquireContext ( & hProv , NULL , NULL , PROV_RSA_FULL , CRYPT_VERIFYCONTEXT ) ) {
if ( CryptCreateHash ( hProv , CALG_SHA1 , 0 , 0 , & hHash ) ) {
if ( CryptHashData ( hHash , ( BYTE * ) data . c_str ( ) , ( DWORD ) data . length ( ) , 0 ) ) {
if ( CryptGetHashParam ( hHash , HP_HASHVAL , hash , & cbHash , 0 ) ) {
std : : string result ( ( char * ) hash , cbHash ) ;
CryptDestroyHash ( hHash ) ;
CryptReleaseContext ( hProv , 0 ) ;
return result ;
}
}
CryptDestroyHash ( hHash ) ;
}
CryptReleaseContext ( hProv , 0 ) ;
}
kinc_log ( KINC_LOG_LEVEL_ERROR , " Windows CryptoAPI SHA-1 failed " ) ;
return " " ;
# else
// SHA-1 implementation using OpenSSL
unsigned char hash [ 20 ] ; // also 20-byte hash
SHA_CTX sha1_ctx ;
SHA1_Init ( & sha1_ctx ) ;
SHA1_Update ( & sha1_ctx , data . c_str ( ) , data . length ( ) ) ;
SHA1_Final ( hash , & sha1_ctx ) ;
return std : : string ( ( char * ) hash , 20 ) ;
# endif
}
/**
 * Records the server's identity and listen parameters. No socket is opened and
 * no thread is started here; that happens in start().
 *
 * @param id       Value stored in serverId.
 * @param h        Host string stored in host.
 * @param p        Port stored in port.
 * @param maxConn  Value stored in maxConnections (used by start() as the
 *                 listen backlog and by acceptClients() as a client cap).
 */
RunTWebSocketServer::RunTWebSocketServer(int id, const std::string &h, int p, int maxConn)
    : serverId(id),
      host(h),
      port(p),
      maxConnections(maxConn),
      isRunning(false),
      running(false) {
#ifdef DEBUG_NETWORK
    // The members were just copied from the arguments, so logging the
    // arguments prints the same values.
    kinc_log(KINC_LOG_LEVEL_INFO,
             " RunTWebSocketServer created: %s:%d (id=%d, maxConn=%d) ",
             h.c_str(), p, id, maxConn);
#endif
}
/**
 * Shuts the server down before the object goes away. stop() returns
 * immediately when the server is not running, so destroying a server that was
 * never started (or already stopped) is fine.
 */
RunTWebSocketServer::~RunTWebSocketServer() {
    this->stop();

#ifdef DEBUG_NETWORK
    kinc_log(KINC_LOG_LEVEL_INFO, " RunTWebSocketServer destroyed: id=%d ", this->serverId);
#endif
}
bool RunTWebSocketServer : : start ( ) {
if ( isRunning . load ( ) ) {
# ifdef DEBUG_NETWORK
kinc_log ( KINC_LOG_LEVEL_WARNING , " WebSocket server %d already running " , serverId ) ;
# endif
return true ;
}
kinc_socket_init ( & serverSocket ) ;
kinc_socket_options_t options ;
kinc_socket_options_set_defaults ( & options ) ;
kinc_socket_set ( & serverSocket , host . c_str ( ) , port , KINC_SOCKET_FAMILY_IP4 , KINC_SOCKET_PROTOCOL_TCP ) ;
if ( ! kinc_socket_open ( & serverSocket , & options ) ) {
kinc_log ( KINC_LOG_LEVEL_ERROR , " Failed to open WebSocket server socket on port %d " , port ) ;
return false ;
}
int reuseaddr = 1 ;
setsockopt ( serverSocket . handle , SOL_SOCKET , SO_REUSEADDR , ( char * ) & reuseaddr , sizeof ( reuseaddr ) ) ;
if ( ! kinc_socket_bind ( & serverSocket ) ) {
kinc_log ( KINC_LOG_LEVEL_ERROR , " Failed to bind WebSocket server socket to %s:%d " , host . c_str ( ) , port ) ;
kinc_socket_destroy ( & serverSocket ) ;
return false ;
}
if ( ! kinc_socket_listen ( & serverSocket , maxConnections ) ) {
kinc_log ( KINC_LOG_LEVEL_ERROR , " Failed to listen on WebSocket server socket " ) ;
kinc_socket_destroy ( & serverSocket ) ;
return false ;
}
# ifdef _WIN32
// create IOCP handle
iocpHandle = CreateIoCompletionPort ( INVALID_HANDLE_VALUE , NULL , 0 , 0 ) ;
if ( iocpHandle = = NULL ) {
kinc_log ( KINC_LOG_LEVEL_ERROR , " Failed to create IOCP handle: %d " , GetLastError ( ) ) ;
kinc_socket_destroy ( & serverSocket ) ;
return false ;
}
kinc_log ( KINC_LOG_LEVEL_INFO , " WebSocket server %d: IOCP created for event-driven I/O " , serverId ) ;
# endif
running . store ( true ) ;
isRunning . store ( true ) ;
broadcastRunning . store ( true ) ;
// broadcast worker thread
broadcastWorker = std : : thread ( & RunTWebSocketServer : : broadcastWorkerLoop , this ) ;
// server thread - IOCP on Windows - TODO: polling on other platforms
serverThread = std : : thread ( & RunTWebSocketServer : : serverLoop , this ) ;
# ifdef _WIN32
// start IOCP event loop thread for async I/O processing
iocpThread = std : : thread ( & RunTWebSocketServer : : iocpEventLoop , this ) ;
# endif
# ifdef DEBUG_NETWORK
kinc_log ( KINC_LOG_LEVEL_INFO , " WebSocket server %d started on %s:%d " ,
serverId , host . c_str ( ) , port ) ;
# endif
return true ;
}
void RunTWebSocketServer : : stop ( ) {
if ( ! isRunning . load ( ) ) {
return ;
}
running . store ( false ) ;
isRunning . store ( false ) ;
broadcastRunning . store ( false ) ;
# ifdef _WIN32
// close IOCP handle to wake up blocked threads
if ( iocpHandle ! = NULL ) {
CloseHandle ( iocpHandle ) ;
iocpHandle = NULL ;
}
// wait for IOCP thread to finish
if ( iocpThread . joinable ( ) ) {
iocpThread . join ( ) ;
}
# endif
// wait for broadcast worker to finish first
if ( broadcastWorker . joinable ( ) ) {
broadcastWorker . join ( ) ;
}
// wait for server thread to finish
if ( serverThread . joinable ( ) ) {
serverThread . join ( ) ;
}
// close server socket
kinc_socket_destroy ( & serverSocket ) ;
// close all client connections - lock-free cleanup
for ( size_t i = 0 ; i < MAX_CLIENTS ; i + + ) {
RunTWebSocketClient * client = clients [ i ] . exchange ( nullptr , std : : memory_order_acq_rel ) ;
if ( client ) {
if ( client - > connected . load ( std : : memory_order_relaxed ) ) {
kinc_socket_destroy ( & client - > socket ) ;
}
delete client ;
}
}
clientCount . store ( 0 , std : : memory_order_release ) ;
# ifdef DEBUG_NETWORK
kinc_log ( KINC_LOG_LEVEL_INFO , " WebSocket server %d stopped " , serverId ) ;
# endif
}
void RunTWebSocketServer : : serverLoop ( ) {
# ifdef DEBUG_NETWORK
kinc_log ( KINC_LOG_LEVEL_INFO , " WebSocket server %d thread started " , serverId ) ;
# endif
while ( running . load ( ) ) {
acceptClients ( ) ;
# ifdef _WIN32
// windows with IOCP client is processed in iocpEventLoop and only handles accepts using select() to avoid busy waiting
fd_set readSet ;
FD_ZERO ( & readSet ) ;
FD_SET ( serverSocket . handle , & readSet ) ;
struct timeval timeout ;
timeout . tv_sec = 0 ;
timeout . tv_usec = 10000 ;
select ( 0 , & readSet , NULL , NULL , & timeout ) ;
# else
// TODO: Linux events
tick ( ) ;
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 1 ) ) ;
# endif
}
# ifdef DEBUG_NETWORK
kinc_log ( KINC_LOG_LEVEL_INFO , " WebSocket server %d thread finished " , serverId ) ;
# endif
}
void RunTWebSocketServer : : acceptClients ( ) {
kinc_socket_t clientSocket ;
unsigned remoteAddress , remotePort ;
if ( kinc_socket_accept ( & serverSocket , & clientSocket , & remoteAddress , & remotePort ) ) {
// socket configuration for 1M+ msg/sec
# ifdef _WIN32
// large socket buffers 4MB each for high throughput
int bufSize = 4 * 1024 * 1024 ;
setsockopt ( clientSocket . handle , SOL_SOCKET , SO_SNDBUF , ( char * ) & bufSize , sizeof ( bufSize ) ) ;
setsockopt ( clientSocket . handle , SOL_SOCKET , SO_RCVBUF , ( char * ) & bufSize , sizeof ( bufSize ) ) ;
int nodelay = 1 ;
setsockopt ( clientSocket . handle , IPPROTO_TCP , TCP_NODELAY , ( char * ) & nodelay , sizeof ( nodelay ) ) ;
// NOTE: do not set non-blocking mode as IOCP handles async I/O differently
# else
int flags = fcntl ( clientSocket . handle , F_GETFL , 0 ) ;
fcntl ( clientSocket . handle , F_SETFL , flags | O_NONBLOCK ) ;
int bufSize = 4 * 1024 * 1024 ;
setsockopt ( clientSocket . handle , SOL_SOCKET , SO_SNDBUF , & bufSize , sizeof ( bufSize ) ) ;
setsockopt ( clientSocket . handle , SOL_SOCKET , SO_RCVBUF , & bufSize , sizeof ( bufSize ) ) ;
int nodelay = 1 ;
setsockopt ( clientSocket . handle , IPPROTO_TCP , TCP_NODELAY , & nodelay , sizeof ( nodelay ) ) ;
# endif
// LOCK-FREE CLIENT INSERTION
const size_t currentCount = clientCount . load ( std : : memory_order_relaxed ) ;
if ( currentCount < MAX_CLIENTS & & currentCount < ( size_t ) maxConnections ) {
static std : : atomic < int > nextClientId { 1000 } ;
int clientId = serverId * 10000 + nextClientId . fetch_add ( 1 , std : : memory_order_relaxed ) ;
RunTWebSocketClient * client = new RunTWebSocketClient ( clientId , clientSocket ) ;
client - > remoteAddress = remoteAddress ;
client - > remotePort = remotePort ;
# ifdef _WIN32
HANDLE result = CreateIoCompletionPort (
( HANDLE ) clientSocket . handle ,
iocpHandle ,
( ULONG_PTR ) client , // client pointer as completion key
0
) ;
if ( result = = NULL ) {
kinc_log ( KINC_LOG_LEVEL_ERROR , " Failed to associate socket with IOCP: %d " , GetLastError ( ) ) ;
delete client ;
kinc_socket_destroy ( & clientSocket ) ;
return ;
}
# endif
// find empty slot to insert atomically
for ( size_t i = 0 ; i < MAX_CLIENTS ; i + + ) {
RunTWebSocketClient * expected = nullptr ;
if ( clients [ i ] . compare_exchange_weak ( expected , client , std : : memory_order_release ) ) {
clientCount . fetch_add ( 1 , std : : memory_order_release ) ;
# ifdef _WIN32
// initial async recv to start the event driven chain
postRecv ( client ) ;
# endif
return ;
}
}
// no free slots
delete client ;
kinc_socket_destroy ( & clientSocket ) ;
} else {
kinc_socket_destroy ( & clientSocket ) ;
}
}
}
void RunTWebSocketServer : : tick ( ) {
const size_t maxClients = clientCount . load ( std : : memory_order_acquire ) ;
for ( size_t i = 0 ; i < MAX_CLIENTS & & i < = maxClients ; i + + ) {
RunTWebSocketClient * client = clients [ i ] . load ( std : : memory_order_acquire ) ;
if ( ! client ) continue ;
if ( ! client - > connected . load ( std : : memory_order_relaxed ) ) {
// auto remove disconnected client
RunTWebSocketClient * expected = client ;
if ( clients [ i ] . compare_exchange_weak ( expected , nullptr , std : : memory_order_release ) ) {
clientCount . fetch_sub ( 1 , std : : memory_order_release ) ;
delete client ;
}
continue ;
}
processClientData ( client ) ;
}
}
void RunTWebSocketServer : : processClientData ( RunTWebSocketClient * client ) {
// loop to read ALL available data not just one chunk
unsigned fromAddress , fromPort ;
while ( true ) {
// direct write pointer into ring buffer for zero copy receive
size_t contiguousSpace ;
uint8_t * writePtr = client - > recvRingBuffer - > getWritePtr ( & contiguousSpace ) ;
// limit to available contiguous space
if ( contiguousSpace = = 0 ) {
kinc_log ( KINC_LOG_LEVEL_WARNING , " WebSocket server %d: Ring buffer full for client %d " , serverId , client - > clientId ) ;
break ;
}
int bytesRead = kinc_socket_receive ( & client - > socket , writePtr , ( int ) contiguousSpace , & fromAddress , & fromPort ) ;
if ( bytesRead < 0 ) {
# ifdef _WIN32
int err = WSAGetLastError ( ) ;
// WSAEWOULDBLOCK means no data available and is not an error
if ( err = = WSAEWOULDBLOCK ) {
break ;
}
# else
if ( errno = = EWOULDBLOCK | | errno = = EAGAIN ) {
break ;
}
# endif
// real error we disconnect client
client - > connected = false ;
return ;
}
if ( bytesRead = = 0 ) {
// connection closed gracefully
client - > connected = false ;
return ;
}
client - > recvRingBuffer - > commitWrite ( bytesRead ) ;
if ( ! client - > handshakeCompleted ) {
// copy from ring buffer to handshake buffer
uint8_t tempBuf [ 8192 ] ;
size_t available = client - > recvRingBuffer - > size ( ) ;
if ( available > sizeof ( tempBuf ) ) available = sizeof ( tempBuf ) ;
client - > recvRingBuffer - > peek ( tempBuf , available ) ;
client - > handshakeBuffer . assign ( ( char * ) tempBuf , available ) ;
size_t headerEnd = client - > handshakeBuffer . find ( " \r \n \r \n " ) ;
if ( headerEnd ! = std : : string : : npos ) {
processWebSocketHandshake ( client ) ;
client - > recvRingBuffer - > consume ( headerEnd + 4 ) ;
}
} else {
processWebSocketFrames ( client ) ;
}
}
// TODO: confirm bytesRead <= 0 is normal for non-blocking sockets with no data and actual connection close is bytesRead == 0 after handshake with no pending data
}
void RunTWebSocketServer : : broadcastWorkerLoop ( ) {
static thread_local BroadcastMessage messages [ 50000 ] ;
# ifdef DEBUG_NETWORK
kinc_log ( KINC_LOG_LEVEL_INFO , " WebSocket server %d async broadcast worker started " , serverId ) ;
# endif
while ( broadcastRunning . load ( std : : memory_order_relaxed ) ) {
// drain messages
size_t messageCount = 0 ;
while ( messageCount < 50000 ) {
if ( ! broadcastQueue . pop ( messages [ messageCount ] ) ) {
break ;
}
messageCount + + ;
}
if ( messageCount = = 0 ) {
// yield CPU when no messages for minimal overhead
std : : this_thread : : yield ( ) ;
continue ;
}
for ( size_t msgIdx = 0 ; msgIdx < messageCount ; msgIdx + + ) {
const auto & msg = messages [ msgIdx ] ;
const uint8_t * frameData = msg . frameData . data ( ) ;
const int frameSize = ( int ) msg . frameData . size ( ) ;
// Send to all
const size_t maxClients = clientCount . load ( std : : memory_order_acquire ) ;
for ( size_t i = 0 ; i < MAX_CLIENTS & & i < = maxClients ; i + + ) {
RunTWebSocketClient * client = clients [ i ] . load ( std : : memory_order_acquire ) ;
if ( client & &
client - > connected . load ( std : : memory_order_relaxed ) & &
client - > handshakeCompleted . load ( std : : memory_order_relaxed ) ) {
// TODO: Using synchronous send
int sent = : : send ( client - > socket . handle , ( const char * ) frameData , frameSize , 0 ) ;
if ( sent < = 0 ) {
# ifdef _WIN32
int err = WSAGetLastError ( ) ;
if ( err ! = WSAEWOULDBLOCK ) {
client - > connected . store ( false , std : : memory_order_relaxed ) ;
}
# else
if ( errno ! = EWOULDBLOCK & & errno ! = EAGAIN ) {
client - > connected . store ( false , std : : memory_order_relaxed ) ;
}
# endif
}
}
}
}
# ifdef DEBUG_NETWORK
if ( messageCount > = 5000 ) {
kinc_log ( KINC_LOG_LEVEL_INFO , " WebSocket async worker queued %d messages " , ( int ) messageCount ) ;
}
# endif
}
}
void RunTWebSocketServer : : broadcastFrameWithOpcode ( const std : : string & data , uint8_t opcode , int excludeClientId ) {
static thread_local std : : vector < uint8_t > frameBuffer ;
frameBuffer . clear ( ) ;
size_t dataLen = data . length ( ) ;
if ( frameBuffer . capacity ( ) < dataLen + 14 ) {
frameBuffer . reserve ( dataLen + 14 ) ;
}
// first byte FIN bit 0x80 opcode
uint8_t firstByte = ( opcode & 0x80 ) ? opcode : ( 0x80 | ( opcode & 0x0F ) ) ;
frameBuffer . push_back ( firstByte ) ;
// payload length encoding
if ( dataLen < 126 ) {
frameBuffer . push_back ( ( uint8_t ) dataLen ) ;
} else if ( dataLen < 65536 ) {
frameBuffer . push_back ( 126 ) ;
frameBuffer . push_back ( ( dataLen > > 8 ) & 0xFF ) ;
frameBuffer . push_back ( dataLen & 0xFF ) ;
} else {
frameBuffer . push_back ( 127 ) ;
for ( int i = 7 ; i > = 0 ; i - - ) {
frameBuffer . push_back ( ( dataLen > > ( i * 8 ) ) & 0xFF ) ;
}
}
// payload data
frameBuffer . insert ( frameBuffer . end ( ) , data . begin ( ) , data . end ( ) ) ;
// TODO: send to all connected clients without excludeClientId parameter
for ( size_t i = 0 ; i < MAX_CLIENTS ; i + + ) {
RunTWebSocketClient * client = clients [ i ] . load ( std : : memory_order_acquire ) ;
if ( client & &
client - > connected . load ( std : : memory_order_acquire ) & &
client - > handshakeCompleted . load ( std : : memory_order_acquire ) ) {
// Note: excludeClientId parameter is used to ignore client sender
# ifdef _WIN32
postSend ( client , frameBuffer . data ( ) , frameBuffer . size ( ) ) ;
# else
int sent = : : send ( client - > socket . handle , ( const char * ) frameBuffer . data ( ) , ( int ) frameBuffer . size ( ) , 0 ) ;
if ( sent < = 0 ) {
if ( errno ! = EWOULDBLOCK & & errno ! = EAGAIN ) {
client - > connected . store ( false , std : : memory_order_release ) ;
}
}
# endif
}
}
}
void RunTWebSocketServer : : broadcastBinaryFrame ( const std : : string & binaryData , int excludeClientId ) {
broadcastFrameWithOpcode ( binaryData , 0x82 , excludeClientId ) ;
}
std : : vector < uint8_t > RunTWebSocketServer : : createWebSocketFrame ( const std : : string & data , uint8_t opcode ) {
thread_local std : : vector < uint8_t > frameBuffer ;
frameBuffer . clear ( ) ;
// reserve capacity to avoid resizing
size_t dataLen = data . length ( ) ;
size_t estimatedSize = dataLen + 10 ;
if ( frameBuffer . capacity ( ) < estimatedSize ) {
frameBuffer . reserve ( estimatedSize * 2 ) ;
}
// first byte must be FIN bit 0x80 if opcode already has FIN bit set dont double set it
uint8_t firstByte = ( opcode & 0x80 ) ? opcode : ( 0x80 | ( opcode & 0x0F ) ) ;
frameBuffer . push_back ( firstByte ) ;
if ( dataLen < 126 ) {
frameBuffer . push_back ( ( uint8_t ) dataLen ) ;
} else if ( dataLen < 65536 ) {
frameBuffer . push_back ( 126 ) ;
frameBuffer . push_back ( ( dataLen > > 8 ) & 0xFF ) ;
frameBuffer . push_back ( dataLen & 0xFF ) ;
} else {
frameBuffer . push_back ( 127 ) ;
for ( int i = 7 ; i > = 0 ; i - - ) {
frameBuffer . push_back ( ( dataLen > > ( i * 8 ) ) & 0xFF ) ;
}
}
frameBuffer . insert ( frameBuffer . end ( ) , data . begin ( ) , data . end ( ) ) ;
return frameBuffer ;
}
void RunTWebSocketServer : : sendFrameToClients ( const std : : vector < uint8_t > & frame , int excludeClientId ) {
const size_t maxClients = clientCount . load ( std : : memory_order_acquire ) ;
const uint8_t * frameData = frame . data ( ) ;
const size_t frameSize = frame . size ( ) ;
// send to valid clients
for ( size_t i = 0 ; i < MAX_CLIENTS & & i < = maxClients ; i + + ) {
RunTWebSocketClient * client = clients [ i ] . load ( std : : memory_order_acquire ) ;
if ( client & &
client - > connected . load ( std : : memory_order_relaxed ) & &
client - > handshakeCompleted . load ( std : : memory_order_relaxed ) & &
client - > clientId ! = excludeClientId ) {
# ifdef _WIN32
postSend ( client , frameData , frameSize ) ;
# else
int sent = kinc_socket_send ( & client - > socket , frameData , static_cast < int > ( frameSize ) ) ;
if ( sent < = 0 ) {
client - > connected . store ( false , std : : memory_order_relaxed ) ;
}
# endif
}
}
}
void RunTWebSocketServer : : sendToAll ( const std : : string & data ) {
broadcastFrameWithOpcode ( data , 0x1 , - 1 ) ;
}
void RunTWebSocketServer : : sendToClient ( int clientId , const std : : string & data ) {
static thread_local std : : vector < uint8_t > frameBuffer ;
frameBuffer . clear ( ) ;
const size_t dataLen = data . length ( ) ;
frameBuffer . reserve ( dataLen + 10 ) ;
frameBuffer . push_back ( 0x81 ) ;
if ( dataLen < 126 ) {
frameBuffer . push_back ( ( uint8_t ) dataLen ) ;
} else if ( dataLen < 65536 ) {
frameBuffer . push_back ( 126 ) ;
frameBuffer . push_back ( ( dataLen > > 8 ) & 0xFF ) ;
frameBuffer . push_back ( dataLen & 0xFF ) ;
} else {
frameBuffer . push_back ( 127 ) ;
for ( int i = 7 ; i > = 0 ; i - - ) {
frameBuffer . push_back ( ( dataLen > > ( i * 8 ) ) & 0xFF ) ;
}
}
frameBuffer . insert ( frameBuffer . end ( ) , data . begin ( ) , data . end ( ) ) ;
const size_t maxClients = clientCount . load ( std : : memory_order_acquire ) ;
const uint8_t * frameData = frameBuffer . data ( ) ;
const size_t frameSize = frameBuffer . size ( ) ;
for ( size_t i = 0 ; i < MAX_CLIENTS & & i < = maxClients ; i + + ) {
RunTWebSocketClient * client = clients [ i ] . load ( std : : memory_order_acquire ) ;
if ( client & &
client - > clientId = = clientId & &
client - > connected . load ( std : : memory_order_relaxed ) ) {
# ifdef _WIN32
postSend ( client , frameData , frameSize ) ;
# ifdef DEBUG_NETWORK
kinc_log ( KINC_LOG_LEVEL_INFO , " WebSocket server %d queued %zu bytes to client %d " ,
serverId , frameSize , clientId ) ;
# endif
# else
int sent = kinc_socket_send ( & client - > socket , frameData , static_cast < int > ( frameSize ) ) ;
if ( sent > 0 ) {
# ifdef DEBUG_NETWORK
kinc_log ( KINC_LOG_LEVEL_INFO , " WebSocket server %d sent %zu bytes to client %d " ,
serverId , frameSize , clientId ) ;
# endif
} else {
client - > connected . store ( false , std : : memory_order_relaxed ) ;
}
# endif
break ;
}
}
}
void RunTWebSocketServer : : processWebSocketHandshake ( RunTWebSocketClient * client ) {
# ifdef DEBUG_NETWORK
kinc_log ( KINC_LOG_LEVEL_INFO , " WebSocket server %d processing handshake for client %d " , serverId , client - > clientId ) ;
kinc_log ( KINC_LOG_LEVEL_INFO , " Handshake buffer size: %d, content: '%.200s' " ,
( int ) client - > handshakeBuffer . length ( ) , client - > handshakeBuffer . c_str ( ) ) ;
// dump first 64 bytes for debugging
std : : string hexDump ;
size_t dumpSize = client - > handshakeBuffer . length ( ) ;
if ( dumpSize > 64 ) dumpSize = 64 ;
for ( size_t i = 0 ; i < dumpSize ; i + + ) {
char hex [ 4 ] ;
sprintf ( hex , " %02x " , ( unsigned char ) client - > handshakeBuffer [ i ] ) ;
hexDump + = hex ;
}
kinc_log ( KINC_LOG_LEVEL_INFO , " Handshake hex dump: %s " , hexDump . c_str ( ) ) ;
# endif
// extract and find Sec-WebSocket-Key header
std : : string request = client - > handshakeBuffer ;
std : : string websocketKey ;
size_t keyPos = request . find ( " Sec-WebSocket-Key: " ) ;
if ( keyPos ! = std : : string : : npos ) {
keyPos + = 19 ; // skip Sec-WebSocket-Key
size_t keyEnd = request . find ( " \r \n " , keyPos ) ;
if ( keyEnd ! = std : : string : : npos ) {
websocketKey = request . substr ( keyPos , keyEnd - keyPos ) ;
}
}
if ( websocketKey . empty ( ) ) {
# ifdef DEBUG_NETWORK
kinc_log ( KINC_LOG_LEVEL_ERROR , " WebSocket server %d: No Sec-WebSocket-Key found in handshake " , serverId ) ;
# endif
client - > connected = false ;
return ;
}
std : : string acceptKey = generateWebSocketAccept ( websocketKey ) ;
// HTTP 101 Switching Protocols response with CORS headers for browser compatibility
std : : string response =
" HTTP/1.1 101 Switching Protocols \r \n "
" Upgrade: websocket \r \n "
" Connection: Upgrade \r \n "
" Sec-WebSocket-Accept: " + acceptKey + " \r \n "
" Access-Control-Allow-Origin: * \r \n "
" Access-Control-Allow-Methods: GET, POST, OPTIONS \r \n "
" Access-Control-Allow-Headers: Content-Type, Authorization, Sec-WebSocket-Key, Sec-WebSocket-Version, Sec-WebSocket-Protocol \r \n "
" \r \n " ;
// reset ring buffer after handshake to prevent leftover handshake data which can cause unknown opcodes
client - > recvRingBuffer - > reset ( ) ;
# ifdef _WIN32
// handshake happens before any async operations
int bytesSent = : : send ( client - > socket . handle , response . c_str ( ) , ( int ) response . length ( ) , 0 ) ;
# else
int bytesSent = kinc_socket_send ( & client - > socket , ( uint8_t * ) response . c_str ( ) , ( int ) response . length ( ) ) ;
# endif
if ( bytesSent > 0 ) {
client - > handshakeCompleted . store ( true , std : : memory_order_release ) ;
} else {
# ifdef DEBUG_NETWORK
kinc_log ( KINC_LOG_LEVEL_ERROR , " WebSocket server %d failed to send handshake response to client %d " ,
serverId , client - > clientId ) ;
# endif
client - > connected . store ( false , std : : memory_order_release ) ;
}
}
void RunTWebSocketServer : : processWebSocketFrames ( RunTWebSocketClient * client ) {
RingBuffer & ringBuf = * client - > recvRingBuffer ;
while ( ringBuf . size ( ) > = 2 ) {
WsFrameInfo frameInfo ;
WsFrameResult result = parseWsFrameHeader ( ringBuf , frameInfo ) ;
if ( result = = WsFrameResult : : WS_INCOMPLETE ) {
break ;
}
if ( result = = WsFrameResult : : WS_ERROR ) {
ringBuf . consume ( 1 ) ;
continue ;
}
if ( frameInfo . payload_length > 16 * 1024 * 1024 ) {
kinc_log ( KINC_LOG_LEVEL_ERROR , " WebSocket server %d: Rejecting oversized payload (%llu bytes) " ,
serverId , ( unsigned long long ) frameInfo . payload_length ) ;
client - > connected = false ;
break ;
}
std : : vector < uint8_t > payloadBuf ( frameInfo . payload_length ) ;
extractWsPayload ( ringBuf , frameInfo , payloadBuf . data ( ) ) ;
std : : string payload ( ( char * ) payloadBuf . data ( ) , payloadBuf . size ( ) ) ;
g_ws_frames_received . fetch_add ( 1 , std : : memory_order_relaxed ) ;
switch ( frameInfo . opcode ) {
case 0x0 : // continuation
case 0x1 : // text
case 0x2 : // bin
g_ws_frames_broadcast . fetch_add ( 1 , std : : memory_order_relaxed ) ;
broadcastFrameWithOpcode ( payload , frameInfo . opcode , client - > clientId ) ;
break ;
case 0x8 : // close frame
kinc_log ( KINC_LOG_LEVEL_INFO , " WebSocket client %d sent close frame " , client - > clientId ) ;
{
uint8_t closeFrame [ 2 ] = { 0x88 , 0x00 } ;
# ifdef _WIN32
postSend ( client , closeFrame , 2 ) ;
# else
kinc_socket_send ( & client - > socket , closeFrame , 2 ) ;
# endif
}
client - > connected = false ;
break ;
case 0x9 : // ping frame
{
// pong response
std : : vector < uint8_t > pongFrame ;
pongFrame . push_back ( 0x8A ) ; // pong + FIN
if ( frameInfo . payload_length < 126 ) {
pongFrame . push_back ( ( uint8_t ) frameInfo . payload_length ) ;
} else {
pongFrame . push_back ( 126 ) ;
pongFrame . push_back ( ( frameInfo . payload_length > > 8 ) & 0xFF ) ;
pongFrame . push_back ( frameInfo . payload_length & 0xFF ) ;
}
pongFrame . insert ( pongFrame . end ( ) , payloadBuf . begin ( ) , payloadBuf . end ( ) ) ;
# ifdef _WIN32
postSend ( client , pongFrame . data ( ) , pongFrame . size ( ) ) ;
# else
kinc_socket_send ( & client - > socket , pongFrame . data ( ) , ( int ) pongFrame . size ( ) ) ;
# endif
}
break ;
case 0xA : // pong frame
// Just acknowledge
break ;
default :
kinc_log ( KINC_LOG_LEVEL_WARNING , " WebSocket server %d: Unknown opcode 0x%X from client %d " ,
serverId , frameInfo . opcode , client - > clientId ) ;
break ;
}
}
}
std : : string RunTWebSocketServer : : generateWebSocketAccept ( const std : : string & key ) {
std : : string combined = key + WEBSOCKET_MAGIC ;
std : : string hash = sha1_binary ( combined ) ;
return base64_encode ( ( unsigned char * ) hash . c_str ( ) , static_cast < unsigned int > ( hash . length ( ) ) ) ;
}
# ifdef _WIN32
void RunTWebSocketServer : : iocpEventLoop ( ) {
kinc_log ( KINC_LOG_LEVEL_INFO , " WebSocket server %d IOCP event loop started (zero polling) " , serverId ) ;
DWORD bytesTransferred ;
ULONG_PTR completionKey ;
OVERLAPPED * overlapped ;
while ( running . load ( std : : memory_order_relaxed ) ) {
// block until I/O completes with key difference from polling we sleep until data arrives
BOOL result = GetQueuedCompletionStatus (
iocpHandle ,
& bytesTransferred ,
& completionKey ,
& overlapped ,
100
) ;
if ( ! result ) {
DWORD err = GetLastError ( ) ;
if ( err = = WAIT_TIMEOUT ) {
continue ;
}
if ( overlapped = = nullptr ) {
kinc_log ( KINC_LOG_LEVEL_ERROR , " IOCP GetQueuedCompletionStatus failed: %d " , err ) ;
continue ;
}
}
if ( overlapped = = nullptr ) {
continue ;
}
IOCPOverlapped * iocpOv = CONTAINING_RECORD ( overlapped , IOCPOverlapped , overlapped ) ;
RunTWebSocketClient * client = reinterpret_cast < RunTWebSocketClient * > ( completionKey ) ;
if ( ! client | | ! client - > connected . load ( std : : memory_order_relaxed ) ) {
continue ;
}
switch ( iocpOv - > opType ) {
case IOCPOperationType : : IOCP_RECV :
if ( bytesTransferred = = 0 ) {
client - > connected . store ( false , std : : memory_order_release ) ;
} else {
onRecvComplete ( client , bytesTransferred ) ;
}
break ;
case IOCPOperationType : : IOCP_SEND :
onSendComplete ( client , bytesTransferred ) ;
break ;
default :
break ;
}
}
kinc_log ( KINC_LOG_LEVEL_INFO , " WebSocket server %d IOCP event loop stopped " , serverId ) ;
}
void RunTWebSocketServer : : postRecv ( RunTWebSocketClient * client ) {
if ( ! client | | ! client - > connected . load ( std : : memory_order_relaxed ) ) {
return ;
}
IOCPOverlapped * ov = client - > recvOverlapped . get ( ) ;
memset ( & ov - > overlapped , 0 , sizeof ( OVERLAPPED ) ) ;
ov - > wsaBuf . buf = reinterpret_cast < char * > ( ov - > buffer ) ;
ov - > wsaBuf . len = sizeof ( ov - > buffer ) ;
ov - > opType = IOCPOperationType : : IOCP_RECV ;
DWORD flags = 0 ;
DWORD bytesRecv = 0 ;
int result = WSARecv (
client - > socket . handle ,
& ov - > wsaBuf ,
1 ,
& bytesRecv ,
& flags ,
& ov - > overlapped ,
nullptr
) ;
if ( result = = SOCKET_ERROR ) {
int err = WSAGetLastError ( ) ;
if ( err ! = WSA_IO_PENDING ) {
client - > connected . store ( false , std : : memory_order_release ) ;
}
// WSA_IO_PENDING is expected, operation will complete asynchronously
}
}
// Queues `len` bytes from `data` for asynchronous transmission to `client`.
// At most one overlapped WSASend is in flight per client (tracked by
// sendPending); everything that cannot go out immediately is appended to
// client->sendQueue. The bytes are copied before the function returns, so
// `data` does not have to outlive the call. No-op for a null or disconnected
// client.
// NOTE(review): sendQueue is not capped anywhere in this function, so a client
// that never drains its socket lets the queue grow without limit - confirm
// whether backpressure is applied elsewhere.
void RunTWebSocketServer : : postSend ( RunTWebSocketClient * client , const uint8_t * data , size_t len ) {
if ( ! client | | ! client - > connected . load ( std : : memory_order_relaxed ) ) {
return ;
}
// sendQueueMutex serialises this function against onSendComplete(), which
// takes the same lock before touching sendPending and sendQueue.
std : : lock_guard < std : : mutex > lock ( client - > sendQueueMutex ) ;
// if send is already pending, queue ALL data
if ( client - > sendPending . load ( std : : memory_order_relaxed ) ) {
client - > sendQueue . insert ( client - > sendQueue . end ( ) , data , data + len ) ;
client - > queuedBytes . fetch_add ( len , std : : memory_order_relaxed ) ;
g_ws_bytes_queued . fetch_add ( len , std : : memory_order_relaxed ) ;
return ;
}
// No send in flight: claim the single send slot before preparing the buffer.
client - > sendPending . store ( true , std : : memory_order_release ) ;
IOCPOverlapped * ov = client - > sendOverlapped . get ( ) ;
memset ( & ov - > overlapped , 0 , sizeof ( OVERLAPPED ) ) ;
// copy what fits to overlapped buffer, queue the rest
size_t copyLen = ( len > sizeof ( ov - > buffer ) ) ? sizeof ( ov - > buffer ) : len ;
memcpy ( ov - > buffer , data , copyLen ) ;
ov - > bufferLen = copyLen ;
// and queue remaining data if any
if ( len > copyLen ) {
client - > sendQueue . insert ( client - > sendQueue . end ( ) , data + copyLen , data + len ) ;
client - > queuedBytes . fetch_add ( len - copyLen , std : : memory_order_relaxed ) ;
g_ws_bytes_queued . fetch_add ( len - copyLen , std : : memory_order_relaxed ) ;
}
// Describe the copied chunk to Winsock and tag the operation so that
// iocpEventLoop() dispatches its completion to onSendComplete().
ov - > wsaBuf . buf = reinterpret_cast < char * > ( ov - > buffer ) ;
ov - > wsaBuf . len = static_cast < ULONG > ( copyLen ) ;
ov - > opType = IOCPOperationType : : IOCP_SEND ;
DWORD bytesSent = 0 ;
int result = WSASend (
client - > socket . handle ,
& ov - > wsaBuf ,
1 ,
& bytesSent ,
0 ,
& ov - > overlapped ,
nullptr
) ;
if ( result = = SOCKET_ERROR ) {
int err = WSAGetLastError ( ) ;
// WSA_IO_PENDING is left alone; any other error drops the client and
// releases the send slot.
if ( err ! = WSA_IO_PENDING ) {
client - > connected . store ( false , std : : memory_order_release ) ;
client - > sendPending . store ( false , std : : memory_order_release ) ;
}
}
}
void RunTWebSocketServer : : onRecvComplete ( RunTWebSocketClient * client , DWORD bytesTransferred ) {
if ( ! client | | bytesTransferred = = 0 ) {
return ;
}
g_ws_recv_completions . fetch_add ( 1 , std : : memory_order_relaxed ) ;
IOCPOverlapped * ov = client - > recvOverlapped . get ( ) ;
// copy received data to ring buffer
client - > recvRingBuffer - > write ( ov - > buffer , bytesTransferred ) ;
if ( ! client - > handshakeCompleted . load ( std : : memory_order_relaxed ) ) {
uint8_t tempBuf [ 8192 ] ;
size_t available = client - > recvRingBuffer - > size ( ) ;
if ( available > sizeof ( tempBuf ) ) available = sizeof ( tempBuf ) ;
client - > recvRingBuffer - > peek ( tempBuf , available ) ;
client - > handshakeBuffer . assign ( reinterpret_cast < char * > ( tempBuf ) , available ) ;
size_t headerEnd = client - > handshakeBuffer . find ( " \r \n \r \n " ) ;
if ( headerEnd ! = std : : string : : npos ) {
processWebSocketHandshake ( client ) ;
client - > recvRingBuffer - > consume ( headerEnd + 4 ) ;
}
} else {
processWebSocketFrames ( client ) ;
}
postRecv ( client ) ;
}
void RunTWebSocketServer : : onSendComplete ( RunTWebSocketClient * client , DWORD bytesTransferred ) {
g_ws_send_completions . fetch_add ( 1 , std : : memory_order_relaxed ) ;
g_ws_bytes_sent . fetch_add ( bytesTransferred , std : : memory_order_relaxed ) ;
std : : lock_guard < std : : mutex > lock ( client - > sendQueueMutex ) ;
client - > sendPending . store ( false , std : : memory_order_release ) ;
// check queue for data to send
if ( ! client - > sendQueue . empty ( ) ) {
// copy queued data to overlapped buffer and send
IOCPOverlapped * ov = client - > sendOverlapped . get ( ) ;
memset ( & ov - > overlapped , 0 , sizeof ( OVERLAPPED ) ) ;
size_t copyLen = ( client - > sendQueue . size ( ) > sizeof ( ov - > buffer ) )
? sizeof ( ov - > buffer ) : client - > sendQueue . size ( ) ;
memcpy ( ov - > buffer , client - > sendQueue . data ( ) , copyLen ) ;
ov - > bufferLen = copyLen ;
// remove sent data from queue
client - > sendQueue . erase ( client - > sendQueue . begin ( ) ,
client - > sendQueue . begin ( ) + copyLen ) ;
client - > queuedBytes . store ( client - > sendQueue . size ( ) , std : : memory_order_release ) ;
client - > sendPending . store ( true , std : : memory_order_release ) ;
ov - > wsaBuf . buf = reinterpret_cast < char * > ( ov - > buffer ) ;
ov - > wsaBuf . len = static_cast < ULONG > ( copyLen ) ;
ov - > opType = IOCPOperationType : : IOCP_SEND ;
DWORD bytesSent = 0 ;
int result = WSASend (
client - > socket . handle ,
& ov - > wsaBuf ,
1 ,
& bytesSent ,
0 ,
& ov - > overlapped ,
nullptr
) ;
if ( result = = SOCKET_ERROR ) {
int err = WSAGetLastError ( ) ;
if ( err ! = WSA_IO_PENDING ) {
client - > connected . store ( false , std : : memory_order_release ) ;
client - > sendPending . store ( false , std : : memory_order_release ) ;
}
}
}
}
# endif
// C API Bridge Functions
extern " C " {
int runt_websocket_server_create ( const char * host , int port , int maxConnections ) {
std : : lock_guard < std : : mutex > lock ( g_servers_mutex ) ;
kinc_log ( KINC_LOG_LEVEL_INFO , " Creating WebSocket server: %s:%d (maxConn=%d) " , host , port , maxConnections ) ;
int serverId = g_next_server_id + + ;
RunTWebSocketServer * server = new RunTWebSocketServer ( serverId , host , port , maxConnections ) ;
g_websocket_servers [ serverId ] = server ;
kinc_log ( KINC_LOG_LEVEL_INFO , " WebSocket server created with ID: %d " , serverId ) ;
return serverId ;
}
bool runt_websocket_server_start ( int serverId ) {
std : : lock_guard < std : : mutex > lock ( g_servers_mutex ) ;
kinc_log ( KINC_LOG_LEVEL_INFO , " Starting WebSocket server with ID: %d " , serverId ) ;
auto it = g_websocket_servers . find ( serverId ) ;
if ( it ! = g_websocket_servers . end ( ) ) {
bool result = it - > second - > start ( ) ;
kinc_log ( KINC_LOG_LEVEL_INFO , " WebSocket server %d start result: %s " , serverId , result ? " SUCCESS " : " FAILED " ) ;
return result ;
}
kinc_log ( KINC_LOG_LEVEL_ERROR , " WebSocket server %d not found for start " , serverId ) ;
return false ;
}
void runt_websocket_server_stop ( int serverId ) {
std : : lock_guard < std : : mutex > lock ( g_servers_mutex ) ;
auto it = g_websocket_servers . find ( serverId ) ;
if ( it ! = g_websocket_servers . end ( ) ) {
it - > second - > stop ( ) ;
delete it - > second ;
g_websocket_servers . erase ( it ) ;
kinc_log ( KINC_LOG_LEVEL_INFO , " Stopped and removed WebSocket server %d " , serverId ) ;
} else {
kinc_log ( KINC_LOG_LEVEL_WARNING , " WebSocket server %d not found for stop " , serverId ) ;
}
}
void runt_websocket_server_tick ( int serverId ) {
std : : lock_guard < std : : mutex > lock ( g_servers_mutex ) ;
auto it = g_websocket_servers . find ( serverId ) ;
if ( it ! = g_websocket_servers . end ( ) ) {
it - > second - > tick ( ) ;
}
}
void runt_websocket_server_send_all ( int serverId , const char * data ) {
std : : lock_guard < std : : mutex > lock ( g_servers_mutex ) ;
auto it = g_websocket_servers . find ( serverId ) ;
if ( it ! = g_websocket_servers . end ( ) ) {
it - > second - > sendToAll ( std : : string ( data ) ) ;
} else {
kinc_log ( KINC_LOG_LEVEL_WARNING , " WebSocket server %d not found for send_all " , serverId ) ;
}
}
void runt_websocket_server_send_client ( int serverId , int clientId , const char * data ) {
std : : lock_guard < std : : mutex > lock ( g_servers_mutex ) ;
auto it = g_websocket_servers . find ( serverId ) ;
if ( it ! = g_websocket_servers . end ( ) ) {
it - > second - > sendToClient ( clientId , std : : string ( data ) ) ;
} else {
kinc_log ( KINC_LOG_LEVEL_WARNING , " WebSocket server %d not found for send_client " , serverId ) ;
}
}
void runt_websocket_server_send_all_binary ( int serverId , const char * data , size_t length ) {
std : : lock_guard < std : : mutex > lock ( g_servers_mutex ) ;
auto it = g_websocket_servers . find ( serverId ) ;
if ( it ! = g_websocket_servers . end ( ) ) {
std : : string binaryData ( data , length ) ;
it - > second - > broadcastBinaryFrame ( binaryData , - 1 ) ;
} else {
kinc_log ( KINC_LOG_LEVEL_WARNING , " WebSocket server %d not found for send_all_binary " , serverId ) ;
}
}
void runt_websocket_server_send_client_binary ( int serverId , int clientId , const char * data , size_t length ) {
std : : lock_guard < std : : mutex > lock ( g_servers_mutex ) ;
auto it = g_websocket_servers . find ( serverId ) ;
if ( it ! = g_websocket_servers . end ( ) ) {
std : : string binaryData ( data , length ) ;
// Create binary frame (opcode 0x02) and send to specific client
auto frame = it - > second - > createWebSocketFrame ( binaryData , 0x02 ) ;
// Find client by ID, not by array index
for ( size_t i = 0 ; i < RunTWebSocketServer : : MAX_CLIENTS ; i + + ) {
RunTWebSocketClient * client = it - > second - > clients [ i ] . load ( ) ;
if ( client & & client - > clientId = = clientId & & client - > connected . load ( ) ) {
# ifdef _WIN32
it - > second - > postSend ( client , frame . data ( ) , frame . size ( ) ) ;
# else
kinc_socket_send ( & client - > socket , frame . data ( ) , static_cast < int > ( frame . size ( ) ) ) ;
# endif
break ;
}
}
} else {
kinc_log ( KINC_LOG_LEVEL_WARNING , " WebSocket server %d not found for send_client_binary " , serverId ) ;
}
}
// Debugging helpers: read, reset and log the global WebSocket counters
/**
 * Copies the current values of the global WebSocket counters into the
 * caller-supplied locations. Any output pointer may be NULL, in which case
 * that counter is skipped.
 */
void runt_websocket_get_metrics(uint64_t *frames_received, uint64_t *frames_broadcast,
                                uint64_t *bytes_sent, uint64_t *bytes_queued,
                                uint64_t *send_completions, uint64_t *recv_completions) {
    // Writes one counter to its destination, skipping NULL destinations.
    const auto copyOut = [](uint64_t *dst, const std::atomic<uint64_t> &counter) {
        if (dst) {
            *dst = counter.load();
        }
    };

    copyOut(frames_received, g_ws_frames_received);
    copyOut(frames_broadcast, g_ws_frames_broadcast);
    copyOut(bytes_sent, g_ws_bytes_sent);
    copyOut(bytes_queued, g_ws_bytes_queued);
    copyOut(send_completions, g_ws_send_completions);
    copyOut(recv_completions, g_ws_recv_completions);
}
/**
 * Resets every global WebSocket counter to zero.
 */
void runt_websocket_reset_metrics() {
    // Assignment on std::atomic is a sequentially consistent store, the same
    // as calling store(0) with the default memory order.
    g_ws_frames_received = 0;
    g_ws_frames_broadcast = 0;
    g_ws_bytes_sent = 0;
    g_ws_bytes_queued = 0;
    g_ws_send_completions = 0;
    g_ws_recv_completions = 0;
}
/**
 * Logs the current value of every global WebSocket counter at INFO level,
 * framed by a header and a footer line.
 */
void runt_websocket_print_metrics() {
    // Reads a counter at the moment its line is logged, widened to match "%llu".
    const auto value = [](const std::atomic<uint64_t> &counter) {
        return static_cast<unsigned long long>(counter.load());
    };

    kinc_log(KINC_LOG_LEVEL_INFO, " === WebSocket Metrics === ");
    kinc_log(KINC_LOG_LEVEL_INFO, " Frames Received: %llu ", value(g_ws_frames_received));
    kinc_log(KINC_LOG_LEVEL_INFO, " Frames Broadcast: %llu ", value(g_ws_frames_broadcast));
    kinc_log(KINC_LOG_LEVEL_INFO, " Bytes Sent: %llu ", value(g_ws_bytes_sent));
    kinc_log(KINC_LOG_LEVEL_INFO, " Bytes Queued: %llu ", value(g_ws_bytes_queued));
    kinc_log(KINC_LOG_LEVEL_INFO, " Send Completions: %llu ", value(g_ws_send_completions));
    kinc_log(KINC_LOG_LEVEL_INFO, " Recv Completions: %llu ", value(g_ws_recv_completions));
    kinc_log(KINC_LOG_LEVEL_INFO, " ========================= ");
}
}