4 #include "otsdaq/NetworkUtilities/TCPServerBase.h"
5 #include "otsdaq/Macros/CoutMacros.h"
6 #include "otsdaq/NetworkUtilities/TCPTransmitterSocket.h"
10 #include <arpa/inet.h>
13 #include <sys/socket.h>
// Constructor (its signature line is not visible in this excerpt).
// The initializer list copies maxNumberOfClients and serverPort into the
// corresponding members and initializes fAccept to true.
25 : fMaxNumberOfClients(maxNumberOfClients), fServerPort(serverPort), fAccept(true)
// A requested limit of 0 is replaced by (unsigned)-1, the largest unsigned
// value, so the size comparison made in accept() is effectively never reached.
28 if(fMaxNumberOfClients == 0)
29 fMaxNumberOfClients = (unsigned)-1;
// Destructor. Logs that accept is being shut down for this server socket, then
// waits on fAcceptFuture in 100 ms slices, logging once per slice for as long as
// the future is not ready. Any other teardown statements are not visible in this
// excerpt.
36 TCPServerBase::~TCPServerBase(
void)
38 __COUT__ <<
"Shutting down accept for socket: " << getSocketId() << std::endl;
// valid() is tested first because wait_for() must not be called on a future
// that has no shared state.
40 while(fAcceptFuture.valid() && fAcceptFuture.wait_for(std::chrono::milliseconds(
41 100)) != std::future_status::ready)
43 __COUT__ <<
"Server accept still running" << std::endl;
// Prepares the server socket and launches the accept task:
//   1. sets SO_REUSEADDR on the socket,
//   2. binds it to INADDR_ANY on fServerPort,
//   3. puts it in listening mode with a backlog of fMaxConnectionBacklog,
//   4. starts acceptConnections() on this object via std::async(std::launch::async).
// Throws std::runtime_error ("Setsockopt: ", "Bind: " or "Listen: " followed by
// strerror(errno)) when the corresponding system call fails.
52 void TCPServerBase::startAccept(
void)
// `opt` is declared in a line not visible in this excerpt.
56 if(::setsockopt(getSocketId(), SOL_SOCKET, SO_REUSEADDR, &opt,
sizeof(
int)) == -1)
59 throw std::runtime_error(std::string(
"Setsockopt: ") + strerror(errno));
// Build the local address: zero the struct, then IPv4 family, port converted
// to network byte order, and the wildcard address.
62 struct sockaddr_in serverAddr;
63 bzero((
char*)&serverAddr,
sizeof(serverAddr));
64 serverAddr.sin_family = AF_INET;
65 serverAddr.sin_port = htons(fServerPort);
66 serverAddr.sin_addr.s_addr = INADDR_ANY;
68 if(::bind(getSocketId(), (
struct sockaddr*)&serverAddr,
sizeof(serverAddr)) != 0)
71 throw std::runtime_error(std::string(
"Bind: ") + strerror(errno));
75 if(::listen(getSocketId(), fMaxConnectionBacklog) != 0)
78 throw std::runtime_error(std::string(
"Listen: ") + strerror(errno));
// NOTE(review): the left-hand side of this call is not visible here. If the
// returned std::future is not stored (presumably in fAcceptFuture - confirm),
// the temporary's destructor blocks until acceptConnections() returns.
83 std::async(std::launch::async, &TCPServerBase::acceptConnections,
this);
// Accepts one client connection on the server socket.
//
// blocking: selects between the two accept strategies visible below; the
//           branch/loop lines that choose between them are not visible in this
//           excerpt.
// Returns an int; the return statements are not visible here (presumably the
// accepted client socket descriptor - confirm).
// Throws std::logic_error if this object's socket id is invalidSocketId, and
// std::runtime_error ("Accept: " + strerror(errno)) when ::accept yields
// invalidSocketId.
89 int TCPServerBase::accept(
bool blocking)
91 __COUT__ <<
"Now server accept connections on socket: " << getSocketId() << std::endl;
92 if(getSocketId() == invalidSocketId)
94 throw std::logic_error(
95 "Accept called on a bad socket object (this object was moved)");
98 struct sockaddr_storage clientAddress;
99 socklen_t clientAddressSize =
sizeof(clientAddress);
100 int clientSocket = invalidSocketId;
// First strategy: call ::accept directly on the listening socket.
108 clientSocket = ::accept(
109 getSocketId(), (
struct sockaddr*)&clientAddress, &clientAddressSize);
// When still accepting and the client table has reached fMaxNumberOfClients,
// the new client is sent a fixed 27-byte notice and the write side of its
// socket is shut down.
// NOTE(review): this test comes before the invalidSocketId test below, so with
// a full table a failed ::accept would reach send()/shutdown() with an invalid
// descriptor - confirm against the lines not shown here.
111 if(fAccept && fMaxNumberOfClients > 0 &&
112 fConnectedClients.size() >= fMaxNumberOfClients)
114 send(clientSocket,
"Too many clients connected!", 27, 0);
115 ::shutdown(clientSocket, SHUT_WR);
120 __COUT__ <<
"fAccept? " << fAccept << std::endl;
125 else if(clientSocket == invalidSocketId)
127 __COUT__ <<
"New socket invalid?: " << clientSocket <<
" errno: " << errno
129 throw std::runtime_error(std::string(
"Accept: ") + strerror(errno));
132 __COUT__ <<
"Server just accepted a connection on socket: " << getSocketId()
133 <<
" Client socket: " << clientSocket << std::endl;
// Second strategy: poll the listening socket with select() using a 1000 us
// timeout, and sleep 5 ms between polls.
138 constexpr
int sleepMSeconds = 5;
139 constexpr
int timeoutSeconds = 0;
140 constexpr
int timeoutUSeconds = 1000;
141 struct timeval timeout;
142 timeout.tv_sec = timeoutSeconds;
143 timeout.tv_usec = timeoutUSeconds;
// `fdSet` is declared (and presumably cleared) in lines not visible here.
150 FD_SET(getSocketId(), &fdSet);
// NOTE(review): the return value of select() is not examined; only the
// descriptor set is inspected afterwards.
151 select(getSocketId() + 1, &fdSet, 0, 0, &timeout);
// The listening socket being readable means a connection is pending.
153 if(FD_ISSET(getSocketId(), &fdSet))
155 struct sockaddr_in clientAddress;
156 socklen_t socketSize =
sizeof(clientAddress);
158 clientSocket = ::accept(
160 (
struct sockaddr*)&clientAddress,
162 if(clientSocket == invalidSocketId)
164 __COUT__ <<
"New socket invalid?: " << clientSocket
165 <<
" errno: " << errno << std::endl;
166 throw std::runtime_error(std::string(
"Accept: ") + strerror(errno));
170 std::this_thread::sleep_for(std::chrono::milliseconds(sleepMSeconds));
// Closes every connected client and empties both client tables.
// For each entry of fConnectedClients: calls sendClose() on the socket object,
// logs the message of any std::exception caught, waits for the future stored
// under the same key in fConnectedClientsFuture (when one exists), and deletes
// the socket object.
178 void TCPServerBase::closeClientSockets(
void)
180 for(
auto& socket : fConnectedClients)
184 socket.second->sendClose();
186 catch(
const std::exception& e)
190 __COUT__ << e.what() <<
'\n';
// Wait for the future registered for this client before freeing its socket object.
193 auto clientThread = fConnectedClientsFuture.find(socket.first);
194 if(clientThread != fConnectedClientsFuture.end())
195 clientThread->second.wait();
196 delete socket.second;
// All socket objects have been deleted above; drop the now-stale entries.
198 fConnectedClients.clear();
199 fConnectedClientsFuture.clear();
// Closes the single client registered under key `socket` in fConnectedClients.
// Nothing visible here happens when the key is absent.
// When the key is present and the stored object reports the same socket id,
// sendClose() is called (the message of a caught std::exception is logged) and
// the entry is erased.
// Throws std::runtime_error when the key and the stored object's socket id
// disagree (presumably the else branch - that line is not visible here).
203 void TCPServerBase::closeClientSocket(
int socket)
206 auto it = fConnectedClients.find(socket);
207 if(it != fConnectedClients.end())
// Consistency check: the map key must equal the id held by the socket object.
209 if(it->second->getSocketId() == socket)
213 it->second->sendClose();
215 catch(
const std::exception& e)
219 __COUT__ << e.what() <<
'\n';
// NOTE(review): unlike closeClientSockets(), no `delete it->second` and no
// cleanup of fConnectedClientsFuture is visible before this erase - confirm
// against the lines not shown here that the socket object is not leaked.
222 fConnectedClients.erase(it);
226 throw std::runtime_error(std::string(
227 "SocketId in fConnectedClients != socketId in TCPSocket! Impossible!!!"));
// Raw-buffer overload: copies `length` bytes starting at `message` into a
// std::string and passes that string to broadcastPacket().
//   message: pointer to the first byte of the payload
//   length:  number of bytes to take from `message`
233 void TCPServerBase::broadcastPacket(
const char* message, std::size_t length)
235 broadcastPacket(std::string(message, length));
// Loops over every entry of fConnectedClients. Judging by the name it sends
// `message` as a packet to each client, but the send call itself is in lines not
// visible in this excerpt. What is visible is the failure path: when a
// std::exception is caught for a client, that client's entry in
// fConnectedClientsFuture (if present) and its entry in fConnectedClients are
// erased.
239 void TCPServerBase::broadcastPacket(
const std::string& message)
241 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
247 catch(
const std::exception& e)
253 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
254 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
// erase(it--) steps the iterator back so the loop's it++ lands on the element
// that followed the erased one.
// NOTE(review): if the erased element is the first one, this decrements
// begin(), which is undefined for standard containers - consider
// `it = fConnectedClients.erase(it)` with the increment moved into the body.
256 fConnectedClients.erase(it--);
// Raw-buffer broadcast. Loops over every entry of fConnectedClients; presumably
// sends `length` bytes from `message` to each client, but the send call itself
// is in lines not visible in this excerpt. When a std::exception is caught for a
// client, that client's entry in fConnectedClientsFuture (if present) and its
// entry in fConnectedClients are erased.
262 void TCPServerBase::broadcast(
const char* message, std::size_t length)
265 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
271 catch(
const std::exception& e)
277 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
278 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
// erase(it--) steps the iterator back so the loop's it++ lands on the element
// that followed the erased one.
// NOTE(review): if the erased element is the first one, this decrements
// begin(), which is undefined for standard containers - consider
// `it = fConnectedClients.erase(it)` with the increment moved into the body.
280 fConnectedClients.erase(it--);
// std::string broadcast. Loops over every entry of fConnectedClients; presumably
// sends `message` to each client, but the send call itself is in lines not
// visible in this excerpt. When a std::exception is caught for a client, that
// client's entry in fConnectedClientsFuture (if present) and its entry in
// fConnectedClients are erased.
286 void TCPServerBase::broadcast(
const std::string& message)
288 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
294 catch(
const std::exception& e)
300 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
301 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
// erase(it--) steps the iterator back so the loop's it++ lands on the element
// that followed the erased one.
// NOTE(review): if the erased element is the first one, this decrements
// begin(), which is undefined for standard containers - consider
// `it = fConnectedClients.erase(it)` with the increment moved into the body.
303 fConnectedClients.erase(it--);
// Byte-vector broadcast. Loops over every entry of fConnectedClients; presumably
// sends the contents of `message` to each client, but the send call itself is in
// lines not visible in this excerpt. When a std::exception is caught for a
// client, that client's entry in fConnectedClientsFuture (if present) and its
// entry in fConnectedClients are erased.
309 void TCPServerBase::broadcast(
const std::vector<char>& message)
311 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
317 catch(
const std::exception& e)
323 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
324 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
// erase(it--) steps the iterator back so the loop's it++ lands on the element
// that followed the erased one.
// NOTE(review): if the erased element is the first one, this decrements
// begin(), which is undefined for standard containers - consider
// `it = fConnectedClients.erase(it)` with the increment moved into the body.
326 fConnectedClients.erase(it--);
// 16-bit-word-vector broadcast. Loops over every entry of fConnectedClients;
// presumably sends the contents of `message` to each client, but the send call
// itself is in lines not visible in this excerpt. When a std::exception is
// caught for a client, that client's entry in fConnectedClientsFuture (if
// present) and its entry in fConnectedClients are erased.
332 void TCPServerBase::broadcast(
const std::vector<uint16_t>& message)
334 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
340 catch(
const std::exception& e)
344 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
345 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
// erase(it--) steps the iterator back so the loop's it++ lands on the element
// that followed the erased one.
// NOTE(review): if the erased element is the first one, this decrements
// begin(), which is undefined for standard containers - consider
// `it = fConnectedClients.erase(it)` with the increment moved into the body.
347 fConnectedClients.erase(it--);
// Loops over every entry of fConnectedClients. Judging by the name it probes
// each client, but the per-client call is in lines not visible in this excerpt.
// When a std::exception is caught for a client, that client's entry in
// fConnectedClientsFuture (if present) and its entry in fConnectedClients are
// erased.
353 void TCPServerBase::pingActiveClients()
355 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
361 catch(
const std::exception& e)
367 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
368 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
// erase(it--) steps the iterator back so the loop's it++ lands on the element
// that followed the erased one.
// NOTE(review): if the erased element is the first one, this decrements
// begin(), which is undefined for standard containers - consider
// `it = fConnectedClients.erase(it)` with the increment moved into the body.
370 fConnectedClients.erase(it--);
// Calls shutdown() with SHUT_RD on the server's socket id, i.e. disables further
// receive operations on that socket. Any statements before this call are not
// visible in this excerpt.
376 void TCPServerBase::shutdownAccept()
379 shutdown(getSocketId(), SHUT_RD);
TCPServerBase(unsigned int serverPort, unsigned int maxNumberOfClients=0)
A maxNumberOfClients of 0 means as many clients as an unsigned int allows.
A class that can write to a socket.