tdaq-develop-2025-02-12
TCPServerBase.cc
1 // #ifndef BEAGLEBONE
2 // #include "otsdaq_cmsburninbox/BeagleBone/BeagleBoneUtils/TCPServerBase.h"
3 // #else
4 #include "otsdaq/NetworkUtilities/TCPServerBase.h"
5 #include "otsdaq/Macros/CoutMacros.h"
6 #include "otsdaq/NetworkUtilities/TCPTransmitterSocket.h"
7 
8 // #endif
9 
#include <arpa/inet.h>
#include <errno.h>   // errno
#include <string.h>  // strerror
#include <sys/socket.h>
#include <unistd.h>  // ::close
#include <iostream>
#include <thread>
16 
17 // #include <sys/socket.h>
18 // #include <netinet/in.h>
19 // #include <netdb.h>
20 
21 using namespace ots;
22 
23 //==============================================================================
24 TCPServerBase::TCPServerBase(unsigned int serverPort, unsigned int maxNumberOfClients)
25  : fMaxNumberOfClients(maxNumberOfClients), fServerPort(serverPort), fAccept(true)
26 {
27  // 0 or -1 means no restrictions on the number of clients
28  if(fMaxNumberOfClients == 0)
29  fMaxNumberOfClients = (unsigned)-1;
30  // CANNOT GO IN THE CONSTRUCTOR OR IT MIGHT START BEFORE THE CHILD CLASS CONSTRUCTOR IS FULLY CONSTRUCTED
31  // THIS MIGHT RESULT IN THE CALL OF THE VIRTUAL TCPServerBase::acceptConnections
32  // startAccept();
33 }
34 
35 //==============================================================================
36 TCPServerBase::~TCPServerBase(void)
37 {
38  __COUT__ << "Shutting down accept for socket: " << getSocketId() << std::endl;
39  shutdownAccept();
40  while(fAcceptFuture.valid() && fAcceptFuture.wait_for(std::chrono::milliseconds(
41  100)) != std::future_status::ready)
42  {
43  __COUT__ << "Server accept still running" << std::endl;
44  shutdownAccept();
45  }
46  //__COUT__ << "Closing connected client sockets for socket: " << getSocketId() << std::endl;
47  closeClientSockets();
48  //__COUT__ << "Closed all sockets connected to server: " << getSocketId() << std::endl;
49 }
50 
51 //==============================================================================
52 void TCPServerBase::startAccept(void)
53 {
54  // __COUT__ << "Begin startAccept" << std::endl;
55  int opt = 1; // SO_REUSEADDR - man socket(7)
56  if(::setsockopt(getSocketId(), SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(int)) == -1)
57  {
58  close();
59  throw std::runtime_error(std::string("Setsockopt: ") + strerror(errno));
60  }
61 
62  struct sockaddr_in serverAddr;
63  bzero((char*)&serverAddr, sizeof(serverAddr));
64  serverAddr.sin_family = AF_INET;
65  serverAddr.sin_port = htons(fServerPort);
66  serverAddr.sin_addr.s_addr = INADDR_ANY;
67 
68  if(::bind(getSocketId(), (struct sockaddr*)&serverAddr, sizeof(serverAddr)) != 0)
69  {
70  close();
71  throw std::runtime_error(std::string("Bind: ") + strerror(errno));
72  }
73  // freeaddrinfo(serverAddr); // all done with this structure
74 
75  if(::listen(getSocketId(), fMaxConnectionBacklog) != 0)
76  {
77  close();
78  throw std::runtime_error(std::string("Listen: ") + strerror(errno));
79  }
80 
81  fAccept = true;
82  fAcceptFuture =
83  std::async(std::launch::async, &TCPServerBase::acceptConnections, this);
84  // __COUT__ << "Done startAccept" << std::endl;
85 }
86 
88 //==============================================================================
89 int TCPServerBase::accept(bool blocking)
90 {
91  __COUT__ << "Now server accept connections on socket: " << getSocketId() << std::endl;
92  if(getSocketId() == invalidSocketId)
93  {
94  throw std::logic_error(
95  "Accept called on a bad socket object (this object was moved)");
96  }
97 
98  struct sockaddr_storage clientAddress; // connector's address information
99  socklen_t clientAddressSize = sizeof(clientAddress);
100  int clientSocket = invalidSocketId;
101  if(blocking)
102  {
103  //__COUT__ << "Number of connected clients: " << fConnectedClients.size() << std::endl;
104  // clientSocket = ::accept4(getSocketId(),(struct sockaddr *)&clientAddress, &clientAddressSize, 0);
105  // unsigned counter = 0;
106  while(true)
107  {
108  clientSocket = ::accept(
109  getSocketId(), (struct sockaddr*)&clientAddress, &clientAddressSize);
110  pingActiveClients(); // This message is to check if there are clients that disconnected and, if so, they are removed from the client list
111  if(fAccept && fMaxNumberOfClients > 0 &&
112  fConnectedClients.size() >= fMaxNumberOfClients)
113  {
114  send(clientSocket, "Too many clients connected!", 27, 0);
115  ::shutdown(clientSocket, SHUT_WR);
116  continue;
117  }
118  break;
119  }
120  __COUT__ << "fAccept? " << fAccept << std::endl;
121  if(!fAccept)
122  {
123  throw E_SHUTDOWN;
124  }
125  else if(clientSocket == invalidSocketId)
126  {
127  __COUT__ << "New socket invalid?: " << clientSocket << " errno: " << errno
128  << std::endl;
129  throw std::runtime_error(std::string("Accept: ") + strerror(errno));
130  }
131 
132  __COUT__ << "Server just accepted a connection on socket: " << getSocketId()
133  << " Client socket: " << clientSocket << std::endl;
134  return clientSocket;
135  }
136  else
137  {
138  constexpr int sleepMSeconds = 5;
139  constexpr int timeoutSeconds = 0;
140  constexpr int timeoutUSeconds = 1000;
141  struct timeval timeout;
142  timeout.tv_sec = timeoutSeconds;
143  timeout.tv_usec = timeoutUSeconds;
144 
145  fd_set fdSet;
146 
147  while(fAccept)
148  {
149  FD_ZERO(&fdSet);
150  FD_SET(getSocketId(), &fdSet);
151  select(getSocketId() + 1, &fdSet, 0, 0, &timeout);
152 
153  if(FD_ISSET(getSocketId(), &fdSet))
154  {
155  struct sockaddr_in clientAddress;
156  socklen_t socketSize = sizeof(clientAddress);
157  // int newSocketFD = ::accept4(fdServerSocket_,(struct sockaddr*)&clientAddress,&socketSize, (pushOnly_ ? SOCK_NONBLOCK : 0));
158  clientSocket = ::accept(
159  getSocketId(),
160  (struct sockaddr*)&clientAddress,
161  &socketSize); // Blocking since select goes in timeout if there is nothing
162  if(clientSocket == invalidSocketId)
163  {
164  __COUT__ << "New socket invalid?: " << clientSocket
165  << " errno: " << errno << std::endl;
166  throw std::runtime_error(std::string("Accept: ") + strerror(errno));
167  }
168  return clientSocket;
169  }
170  std::this_thread::sleep_for(std::chrono::milliseconds(sleepMSeconds));
171  }
172  throw E_SHUTDOWN;
173  }
174 }
175 
176 //==============================================================================
178 void TCPServerBase::closeClientSockets(void)
179 {
180  for(auto& socket : fConnectedClients)
181  {
182  try
183  {
184  socket.second->sendClose();
185  }
186  catch(const std::exception& e)
187  {
188  // I can get here with the TCPPubishServer because it doesn't keep track of the clients that might have already disconnected
189  // Just do nothing!
190  __COUT__ << e.what() << '\n';
191  }
192 
193  auto clientThread = fConnectedClientsFuture.find(socket.first);
194  if(clientThread != fConnectedClientsFuture.end())
195  clientThread->second.wait(); // Waiting for client thread
196  delete socket.second;
197  }
198  fConnectedClients.clear();
199  fConnectedClientsFuture.clear();
200 }
201 
202 //==============================================================================
203 void TCPServerBase::closeClientSocket(int socket)
204 {
205  // This method is called inside the thread itself so it cannot call the removeClientSocketFuture!!!
206  auto it = fConnectedClients.find(socket);
207  if(it != fConnectedClients.end())
208  {
209  if(it->second->getSocketId() == socket)
210  {
211  try
212  {
213  it->second->sendClose();
214  }
215  catch(const std::exception& e)
216  {
217  // I can get here with the TCPPubishServer because it doesn't keep track of the clients that might have already disconnected
218  // Just do nothing!
219  __COUT__ << e.what() << '\n';
220  }
221  delete it->second;
222  fConnectedClients.erase(it);
223  }
224  else
225  {
226  throw std::runtime_error(std::string(
227  "SocketId in fConnectedClients != socketId in TCPSocket! Impossible!!!"));
228  }
229  }
230 }
231 
232 //==============================================================================
233 void TCPServerBase::broadcastPacket(const char* message, std::size_t length)
234 {
235  broadcastPacket(std::string(message, length));
236 }
237 
238 //==============================================================================
239 void TCPServerBase::broadcastPacket(const std::string& message)
240 {
241  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
242  {
243  try
244  {
245  dynamic_cast<TCPTransmitterSocket*>(it->second)->sendPacket(message);
246  }
247  catch(const std::exception& e)
248  {
249  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
250  // contact Lorenzo Uplegger" << std::endl;
251  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
252  // __COUT__ << "Error: " << e.what() << std::endl;
253  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
254  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
255  delete it->second;
256  fConnectedClients.erase(it--);
257  }
258  }
259 }
260 
261 //========================================================================================================================
262 void TCPServerBase::broadcast(const char* message, std::size_t length)
263 {
264  // std::lock_guard<std::mutex> lock(clientsMutex_);
265  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
266  {
267  try
268  {
269  dynamic_cast<TCPTransmitterSocket*>(it->second)->send(message, length);
270  }
271  catch(const std::exception& e)
272  {
273  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
274  // contact Lorenzo Uplegger" << std::endl;
275  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
276  // __COUT__ << "Error: " << e.what() << std::endl;
277  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
278  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
279  delete it->second;
280  fConnectedClients.erase(it--);
281  }
282  }
283 }
284 
285 //==============================================================================
286 void TCPServerBase::broadcast(const std::string& message)
287 {
288  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
289  {
290  try
291  {
292  dynamic_cast<TCPTransmitterSocket*>(it->second)->send(message);
293  }
294  catch(const std::exception& e)
295  {
296  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
297  // contact Lorenzo Uplegger" << std::endl;
298  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
299  // __COUT__ << "Error: " << e.what() << std::endl;
300  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
301  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
302  delete it->second;
303  fConnectedClients.erase(it--);
304  }
305  }
306 }
307 
308 //==============================================================================
309 void TCPServerBase::broadcast(const std::vector<char>& message)
310 {
311  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
312  {
313  try
314  {
315  dynamic_cast<TCPTransmitterSocket*>(it->second)->send(message);
316  }
317  catch(const std::exception& e)
318  {
319  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
320  // contact Lorenzo Uplegger" << std::endl;
321  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
322  // __COUT__ << "Error: " << e.what() << std::endl;
323  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
324  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
325  delete it->second;
326  fConnectedClients.erase(it--);
327  }
328  }
329 }
330 
331 //==============================================================================
332 void TCPServerBase::broadcast(const std::vector<uint16_t>& message)
333 {
334  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
335  {
336  try
337  {
338  dynamic_cast<TCPTransmitterSocket*>(it->second)->send(message);
339  }
340  catch(const std::exception& e)
341  {
342  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
343  // __COUT__ << "Error: " << e.what() << std::endl;
344  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
345  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
346  delete it->second;
347  fConnectedClients.erase(it--);
348  }
349  }
350 }
351 
352 //==============================================================================
353 void TCPServerBase::pingActiveClients()
354 {
355  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
356  {
357  try
358  {
359  dynamic_cast<TCPTransmitterSocket*>(it->second)->send("", 0, true);
360  }
361  catch(const std::exception& e)
362  {
363  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
364  // contact Lorenzo Uplegger" << std::endl;
365  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
366  // __COUT__ << "Error: " << e.what() << std::endl;
367  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
368  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
369  delete it->second;
370  fConnectedClients.erase(it--);
371  }
372  }
373 }
374 
375 //==============================================================================
376 void TCPServerBase::shutdownAccept()
377 {
378  fAccept = false;
379  shutdown(getSocketId(), SHUT_RD);
380 }
TCPServerBase(unsigned int serverPort, unsigned int maxNumberOfClients=0)
A value of 0 means as many clients as an unsigned int allows.
A class that can write to a socket.