364 lines
8.6 KiB
Haxe
364 lines
8.6 KiB
Haxe
|
/*
|
||
|
* Copyright (C)2005-2019 Haxe Foundation
|
||
|
*
|
||
|
* Permission is hereby granted, free of charge, to any person obtaining a
|
||
|
* copy of this software and associated documentation files (the "Software"),
|
||
|
* to deal in the Software without restriction, including without limitation
|
||
|
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||
|
* and/or sell copies of the Software, and to permit persons to whom the
|
||
|
* Software is furnished to do so, subject to the following conditions:
|
||
|
*
|
||
|
* The above copyright notice and this permission notice shall be included in
|
||
|
* all copies or substantial portions of the Software.
|
||
|
*
|
||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||
|
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||
|
* DEALINGS IN THE SOFTWARE.
|
||
|
*/
|
||
|
|
||
|
package cpp.net;
|
||
|
|
||
|
import cpp.vm.Thread;
|
||
|
import cpp.net.Poll;
|
||
|
import cpp.vm.Lock;
|
||
|
|
||
|
private typedef ThreadInfos = {
|
||
|
var id:Int;
|
||
|
var t:Thread;
|
||
|
var p:Poll;
|
||
|
var socks:Array<sys.net.Socket>;
|
||
|
}
|
||
|
|
||
|
private typedef ClientInfos<Client> = {
|
||
|
var client:Client;
|
||
|
var sock:sys.net.Socket;
|
||
|
var thread:ThreadInfos;
|
||
|
var buf:haxe.io.Bytes;
|
||
|
var bufpos:Int;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
The ThreadServer can be used to easily create a multithreaded server where each thread polls multiple connections.
|
||
|
To use it, at a minimum you must override or rebind clientConnected, readClientMessage, and clientMessage and you must define your Client and Message.
|
||
|
**/
|
||
|
class ThreadServer<Client, Message> {
|
||
|
var threads:Array<ThreadInfos>;
|
||
|
var sock:sys.net.Socket;
|
||
|
var worker:Thread;
|
||
|
var timer:Thread;
|
||
|
|
||
|
/**
|
||
|
Number of total connections the server will accept.
|
||
|
**/
|
||
|
public var listen:Int;
|
||
|
|
||
|
/**
|
||
|
Number of server threads.
|
||
|
**/
|
||
|
public var nthreads:Int;
|
||
|
|
||
|
/**
|
||
|
Polling timeout.
|
||
|
**/
|
||
|
public var connectLag:Float;
|
||
|
|
||
|
/**
|
||
|
Stream to send error messages.
|
||
|
**/
|
||
|
public var errorOutput:haxe.io.Output;
|
||
|
|
||
|
/**
|
||
|
Space allocated to buffers when they are created.
|
||
|
**/
|
||
|
public var initialBufferSize:Int;
|
||
|
|
||
|
/**
|
||
|
Maximum size of buffered data read from a socket. An exception is thrown if the buffer exceeds this value.
|
||
|
**/
|
||
|
public var maxBufferSize:Int;
|
||
|
|
||
|
/**
|
||
|
Minimum message size.
|
||
|
**/
|
||
|
public var messageHeaderSize:Int;
|
||
|
|
||
|
/**
|
||
|
Time between calls to update.
|
||
|
**/
|
||
|
public var updateTime:Float;
|
||
|
|
||
|
/**
|
||
|
The most sockets a thread will handle.
|
||
|
**/
|
||
|
public var maxSockPerThread:Int;
|
||
|
|
||
|
/**
|
||
|
Creates a ThreadServer.
|
||
|
**/
|
||
|
public function new() {
|
||
|
threads = new Array();
|
||
|
nthreads = if (Sys.systemName() == "Windows") 150 else 10;
|
||
|
messageHeaderSize = 1;
|
||
|
listen = 10;
|
||
|
connectLag = 0.5;
|
||
|
errorOutput = Sys.stderr();
|
||
|
initialBufferSize = (1 << 10);
|
||
|
maxBufferSize = (1 << 16);
|
||
|
maxSockPerThread = 64;
|
||
|
updateTime = 1;
|
||
|
}
|
||
|
|
||
|
function runThread(t) {
|
||
|
while (true) {
|
||
|
try {
|
||
|
loopThread(t);
|
||
|
} catch (e:Dynamic) {
|
||
|
logError(e);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
function readClientData(c:ClientInfos<Client>) {
|
||
|
var available = c.buf.length - c.bufpos;
|
||
|
if (available == 0) {
|
||
|
var newsize = c.buf.length * 2;
|
||
|
if (newsize > maxBufferSize) {
|
||
|
newsize = maxBufferSize;
|
||
|
if (c.buf.length == maxBufferSize)
|
||
|
throw "Max buffer size reached";
|
||
|
}
|
||
|
var newbuf = haxe.io.Bytes.alloc(newsize);
|
||
|
newbuf.blit(0, c.buf, 0, c.bufpos);
|
||
|
c.buf = newbuf;
|
||
|
available = newsize - c.bufpos;
|
||
|
}
|
||
|
var bytes = c.sock.input.readBytes(c.buf, c.bufpos, available);
|
||
|
var pos = 0;
|
||
|
var len = c.bufpos + bytes;
|
||
|
while (len >= messageHeaderSize) {
|
||
|
var m = readClientMessage(c.client, c.buf, pos, len);
|
||
|
if (m == null)
|
||
|
break;
|
||
|
pos += m.bytes;
|
||
|
len -= m.bytes;
|
||
|
work(clientMessage.bind(c.client, m.msg));
|
||
|
}
|
||
|
if (pos > 0)
|
||
|
c.buf.blit(0, c.buf, pos, len);
|
||
|
c.bufpos = len;
|
||
|
}
|
||
|
|
||
|
function loopThread(t:ThreadInfos) {
|
||
|
if (t.socks.length > 0)
|
||
|
for (s in t.p.poll(t.socks, connectLag)) {
|
||
|
var infos:ClientInfos<Client> = s.custom;
|
||
|
try {
|
||
|
readClientData(infos);
|
||
|
} catch (e:Dynamic) {
|
||
|
t.socks.remove(s);
|
||
|
if (!Std.isOfType(e, haxe.io.Eof) && !Std.isOfType(e, haxe.io.Error))
|
||
|
logError(e);
|
||
|
work(doClientDisconnected.bind(s, infos.client));
|
||
|
}
|
||
|
}
|
||
|
while (true) {
|
||
|
var m:{s:sys.net.Socket, cnx:Bool} = Thread.readMessage(t.socks.length == 0);
|
||
|
if (m == null)
|
||
|
break;
|
||
|
if (m.cnx)
|
||
|
t.socks.push(m.s);
|
||
|
else if (t.socks.remove(m.s)) {
|
||
|
var infos:ClientInfos<Client> = m.s.custom;
|
||
|
work(doClientDisconnected.bind(m.s, infos.client));
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
function doClientDisconnected(s:sys.net.Socket, c) {
|
||
|
try
|
||
|
s.close()
|
||
|
catch (e:Dynamic) {};
|
||
|
clientDisconnected(c);
|
||
|
}
|
||
|
|
||
|
function runWorker() {
|
||
|
while (true) {
|
||
|
var f = Thread.readMessage(true);
|
||
|
try {
|
||
|
f();
|
||
|
} catch (e:Dynamic) {
|
||
|
logError(e);
|
||
|
}
|
||
|
try {
|
||
|
afterEvent();
|
||
|
} catch (e:Dynamic) {
|
||
|
logError(e);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
Internally used to delegate something to the worker thread.
|
||
|
**/
|
||
|
public function work(f:Void->Void) {
|
||
|
worker.sendMessage(f);
|
||
|
}
|
||
|
|
||
|
function logError(e:Dynamic) {
|
||
|
var stack = haxe.CallStack.exceptionStack();
|
||
|
if (Thread.current() == worker)
|
||
|
onError(e, stack);
|
||
|
else
|
||
|
work(onError.bind(e, stack));
|
||
|
}
|
||
|
|
||
|
function addClient(sock:sys.net.Socket) {
|
||
|
var start = Std.random(nthreads);
|
||
|
for (i in 0...nthreads) {
|
||
|
var t = threads[(start + i) % nthreads];
|
||
|
if (t.socks.length < maxSockPerThread) {
|
||
|
var infos:ClientInfos<Client> = {
|
||
|
thread: t,
|
||
|
client: clientConnected(sock),
|
||
|
sock: sock,
|
||
|
buf: haxe.io.Bytes.alloc(initialBufferSize),
|
||
|
bufpos: 0,
|
||
|
};
|
||
|
sock.custom = infos;
|
||
|
infos.thread.t.sendMessage({s: sock, cnx: true});
|
||
|
return;
|
||
|
}
|
||
|
}
|
||
|
refuseClient(sock);
|
||
|
}
|
||
|
|
||
|
function refuseClient(sock:sys.net.Socket) {
|
||
|
// we have reached maximum number of active clients
|
||
|
sock.close();
|
||
|
}
|
||
|
|
||
|
function runTimer() {
|
||
|
var l = new Lock();
|
||
|
while (true) {
|
||
|
l.wait(updateTime);
|
||
|
work(update);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
function init() {
|
||
|
worker = Thread.create(runWorker);
|
||
|
timer = Thread.create(runTimer);
|
||
|
for (i in 0...nthreads) {
|
||
|
var t = {
|
||
|
id: i,
|
||
|
t: null,
|
||
|
socks: new Array(),
|
||
|
p: new Poll(maxSockPerThread),
|
||
|
};
|
||
|
threads.push(t);
|
||
|
t.t = Thread.create(runThread.bind(t));
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
Called when the server gets a new connection.
|
||
|
**/
|
||
|
public function addSocket(s:sys.net.Socket) {
|
||
|
s.setBlocking(false);
|
||
|
work(addClient.bind(s));
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
Start the server at the specified host and port.
|
||
|
**/
|
||
|
public function run(host, port) {
|
||
|
sock = new sys.net.Socket();
|
||
|
sock.bind(new sys.net.Host(host), port);
|
||
|
sock.listen(listen);
|
||
|
init();
|
||
|
while (true) {
|
||
|
try {
|
||
|
addSocket(sock.accept());
|
||
|
} catch (e:Dynamic) {
|
||
|
logError(e);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
Send data to a client.
|
||
|
**/
|
||
|
public function sendData(s:sys.net.Socket, data:String) {
|
||
|
try {
|
||
|
s.write(data);
|
||
|
} catch (e:Dynamic) {
|
||
|
stopClient(s);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
Shutdown a client's connection and remove them from the server.
|
||
|
**/
|
||
|
public function stopClient(s:sys.net.Socket) {
|
||
|
var infos:ClientInfos<Client> = s.custom;
|
||
|
try
|
||
|
s.shutdown(true, true)
|
||
|
catch (e:Dynamic) {};
|
||
|
infos.thread.t.sendMessage({s: s, cnx: false});
|
||
|
}
|
||
|
|
||
|
// --- CUSTOMIZABLE API ---
|
||
|
|
||
|
/**
|
||
|
Called when an error has ocurred.
|
||
|
**/
|
||
|
public dynamic function onError(e:Dynamic, stack) {
|
||
|
var estr = try Std.string(e) catch (e2:Dynamic) "???" + try "[" + Std.string(e2) + "]" catch (e:Dynamic) "";
|
||
|
errorOutput.writeString(estr + "\n" + haxe.CallStack.toString(stack));
|
||
|
errorOutput.flush();
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
Called when a client connects. Returns a client object.
|
||
|
**/
|
||
|
public dynamic function clientConnected(s:sys.net.Socket):Client {
|
||
|
return null;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
Called when a client disconnects or an error forces the connection to close.
|
||
|
**/
|
||
|
public dynamic function clientDisconnected(c:Client) {}
|
||
|
|
||
|
/**
|
||
|
Called when data has been read from a socket. This method should try to extract a message from the buffer.
|
||
|
The available data resides in buf, starts at pos, and is len bytes wide. Return the new message and the number of bytes read from the buffer.
|
||
|
If no message could be read, return null.
|
||
|
**/
|
||
|
public dynamic function readClientMessage(c:Client, buf:haxe.io.Bytes, pos:Int, len:Int):{msg:Message, bytes:Int} {
|
||
|
return {
|
||
|
msg: null,
|
||
|
bytes: len,
|
||
|
};
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
Called when a message has been recieved. Message handling code should go here.
|
||
|
**/
|
||
|
public dynamic function clientMessage(c:Client, msg:Message) {}
|
||
|
|
||
|
/**
|
||
|
This method is called periodically. It can be used to do server maintenance.
|
||
|
**/
|
||
|
public dynamic function update() {}
|
||
|
|
||
|
/**
|
||
|
Called after a client connects, disconnects, a message is received, or an update is performed.
|
||
|
**/
|
||
|
public dynamic function afterEvent() {}
|
||
|
}
|