otsdaq  v2_05_02_indev
TCPServerBase.cc
1 //#ifndef BEAGLEBONE
2 //#include "otsdaq_cmsburninbox/BeagleBone/BeagleBoneUtils/TCPServerBase.h"
3 //#else
4 #include "otsdaq/NetworkUtilities/TCPServerBase.h"
5 #include "otsdaq/NetworkUtilities/TCPTransmitterSocket.h"
6 //#endif
7 
8 #include <arpa/inet.h>
9 #include <errno.h> // errno
10 #include <string.h> // errno
11 #include <iostream>
12 
13 using namespace ots;
14 
15 //==============================================================================
16 TCPServerBase::TCPServerBase(int serverPort, unsigned int /*maxNumberOfClients*/)
17  : /*fMaxNumberOfClients(maxNumberOfClients),*/ fAccept(true), fAcceptFuture(fAcceptPromise.get_future())
18 {
19  int opt = 1; // SO_REUSEADDR - man socket(7)
20  if(::setsockopt(getSocketId(), SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) != 0)
21  {
22  close();
23  throw std::runtime_error(std::string("Setsockopt: ") + strerror(errno));
24  }
25 
26  struct sockaddr_in serverAddr;
27  bzero((char*)&serverAddr, sizeof(serverAddr));
28  serverAddr.sin_family = AF_INET;
29  serverAddr.sin_port = htons(serverPort);
30  serverAddr.sin_addr.s_addr = INADDR_ANY;
31 
32  if(::bind(getSocketId(), (struct sockaddr*)&serverAddr, sizeof(serverAddr)) != 0)
33  {
34  close();
35  throw std::runtime_error(std::string("Bind: ") + strerror(errno));
36  }
37 
38  if(::listen(getSocketId(), fMaxConnectionBacklog) != 0)
39  {
40  close();
41  throw std::runtime_error(std::string("Listen: ") + strerror(errno));
42  }
43  startAccept();
44 }
45 
46 //==============================================================================
47 TCPServerBase::~TCPServerBase(void)
48 {
49  std::cout << __PRETTY_FUNCTION__ << "Shutting down accept for socket: " << getSocketId() << std::endl;
50  shutdownAccept();
51  while(fAcceptFuture.wait_for(std::chrono::milliseconds(100)) != std::future_status::ready)
52  std::cout << __PRETTY_FUNCTION__ << "Server accept still running" << std::endl;
53  std::cout << __PRETTY_FUNCTION__ << "Closing connected client sockets for socket: " << getSocketId() << std::endl;
54  closeClientSockets();
55  std::cout << __PRETTY_FUNCTION__ << "Closed all sockets connected to server: " << getSocketId() << std::endl;
56 }
57 
58 //==============================================================================
59 void TCPServerBase::startAccept(void)
60 {
61  fAccept = true;
62  std::thread thread(&TCPServerBase::acceptConnections, this);
63  thread.detach();
64 }
65 
66 // An accepts waits for a connection and returns the opened socket number
67 //==============================================================================
68 int TCPServerBase::accept(bool blocking)
69 {
70  std::cout << __PRETTY_FUNCTION__ << "Now server accept connections on socket: " << getSocketId() << std::endl;
71  if (getSocketId() == invalidSocketId)
72  {
73  throw std::logic_error("Accept called on a bad socket object (this object was moved)");
74  }
75 
76  struct sockaddr_storage serverStorage;
77  socklen_t addr_size = sizeof serverStorage;
78  int clientSocket = invalidSocketId;
79  if (blocking)
80  {
81  clientSocket = ::accept(getSocketId(), (struct sockaddr *)&serverStorage, &addr_size);
82  std::cout << __LINE__<< "] " << __PRETTY_FUNCTION__ << "fAccept? " << fAccept << std::endl;
83  if (!fAccept)
84  {
85  throw E_SHUTDOWN;
86  }
87  if (clientSocket == invalidSocketId)
88  {
89  std::cout << __PRETTY_FUNCTION__ << "New socket invalid?: " << clientSocket << " errno: " << errno << std::endl;
90  throw std::runtime_error(std::string("Accept: ") + strerror(errno));
91  }
92  std::cout << __PRETTY_FUNCTION__ << "Server just accepted a connection on socket: " << getSocketId() << " Client socket: " << clientSocket << std::endl;
93  return clientSocket;
94  }
95  else
96  {
97  constexpr int sleepMSeconds = 5;
98  constexpr int timeoutSeconds = 0;
99  constexpr int timeoutUSeconds = 1000;
100  struct timeval timeout;
101  timeout.tv_sec = timeoutSeconds;
102  timeout.tv_usec = timeoutUSeconds;
103 
104  fd_set fdSet;
105 
106  while (fAccept)
107  {
108  FD_ZERO(&fdSet);
109  FD_SET(getSocketId(), &fdSet);
110  select(getSocketId() + 1, &fdSet, 0, 0, &timeout);
111 
112  if (FD_ISSET(getSocketId(), &fdSet))
113  {
114  struct sockaddr_in clientAddress;
115  socklen_t socketSize = sizeof(clientAddress);
116  //int newSocketFD = ::accept4(fdServerSocket_,(struct sockaddr*)&clientAddress,&socketSize, (pushOnly_ ? SOCK_NONBLOCK : 0));
117  clientSocket = ::accept(getSocketId(), (struct sockaddr *)&clientAddress, &socketSize); //Blocking since select goes in timeout if there is nothing
118  if (clientSocket == invalidSocketId)
119  {
120  std::cout << __PRETTY_FUNCTION__ << "New socket invalid?: " << clientSocket << " errno: " << errno << std::endl;
121  throw std::runtime_error(std::string("Accept: ") + strerror(errno));
122  }
123  return clientSocket;
124  }
125  std::this_thread::sleep_for(std::chrono::milliseconds(sleepMSeconds));
126  }
127  throw E_SHUTDOWN;
128  }
129 }
130 
131 //==============================================================================
132 void TCPServerBase::closeClientSockets(void)
133 {
134  for(auto& socket : fConnectedClients)
135  {
136  socket.second->sendClose();
137  delete socket.second;
138  }
139  fConnectedClients.clear();
140 }
141 
142 //==============================================================================
143 void TCPServerBase::closeClientSocket(int socket)
144 {
145  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
146  if(it->second->getSocketId() == socket)
147  {
148  it->second->sendClose();
149  delete it->second;
150  fConnectedClients.erase(it--);
151  }
152 }
153 
154 //==============================================================================
155 void TCPServerBase::broadcastPacket(const char* message, std::size_t length)
156 {
157  broadcastPacket(std::string(message,length));
158 }
159 
160 //==============================================================================
161 void TCPServerBase::broadcastPacket(const std::string& message)
162 {
163  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
164  {
165  try
166  {
167  dynamic_cast<TCPTransmitterSocket*>(it->second)->sendPacket(message);
168  }
169  catch(const std::exception& e)
170  {
171  std::cout << __PRETTY_FUNCTION__ << "Error: " << e.what() << std::endl;
172  delete it->second;
173  fConnectedClients.erase(it--);
174  }
175  }
176 }
177 
178 //========================================================================================================================
179 void TCPServerBase::broadcast(const char* message, std::size_t length)
180 {
181  // std::lock_guard<std::mutex> lock(clientsMutex_);
182  for (auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
183  {
184  try
185  {
186  dynamic_cast<TCPTransmitterSocket *>(it->second)->send(message, length);
187  }
188  catch (const std::exception &e)
189  {
190  //std::cout << __PRETTY_FUNCTION__ << "Connection closed with the server! Stop writing!" << std::endl;
191  delete it->second;
192  fConnectedClients.erase(it--);
193  }
194  }
195 }
196 
197 //==============================================================================
198 void TCPServerBase::broadcast(const std::string& message)
199 {
200  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
201  {
202  try
203  {
204  dynamic_cast<TCPTransmitterSocket*>(it->second)->send(message);
205  }
206  catch(const std::exception& e)
207  {
208  std::cout << __PRETTY_FUNCTION__ << "Error: " << e.what() << std::endl;
209  delete it->second;
210  fConnectedClients.erase(it--);
211  }
212  }
213 }
214 
215 //==============================================================================
216 void TCPServerBase::broadcast(const std::vector<char>& message)
217 {
218  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
219  {
220  try
221  {
222  dynamic_cast<TCPTransmitterSocket*>(it->second)->send(message);
223  }
224  catch(const std::exception& e)
225  {
226  std::cout << __PRETTY_FUNCTION__ << "Error: " << e.what() << std::endl;
227  delete it->second;
228  fConnectedClients.erase(it--);
229  }
230  }
231 }
232 
233 //==============================================================================
234 void TCPServerBase::broadcast(const std::vector<uint16_t>& message)
235 {
236  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
237  {
238  try
239  {
240  dynamic_cast<TCPTransmitterSocket*>(it->second)->send(message);
241  }
242  catch(const std::exception& e)
243  {
244  std::cout << __PRETTY_FUNCTION__ << "Error: " << e.what() << std::endl;
245  delete it->second;
246  fConnectedClients.erase(it--);
247  }
248  }
249 }
250 
251 //==============================================================================
252 void TCPServerBase::shutdownAccept()
253 {
254  fAccept = false;
255  shutdown(getSocketId(), SHUT_RD);
256 }