tdaq-develop-2025-02-12
ReceiverSocket.cc
1 #include "otsdaq/NetworkUtilities/ReceiverSocket.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
5 
6 #include <iomanip> /* for setfill */
7 #include <iostream>
8 #include <sstream>
9 
10 #include <arpa/inet.h>
11 #include <sys/time.h>
12 
13 using namespace ots;
14 
15 //==============================================================================
16 ReceiverSocket::ReceiverSocket(std::string IPAddress, unsigned int port)
17  : Socket(IPAddress, port)
18  , addressLength_(sizeof(fromAddress_))
19  , numberOfBytes_(0)
20  , readCounter_(0)
21 {
22  __COUT__ << "ReceiverSocket constructor " << IPAddress << ":" << port << __E__;
23 }
24 
25 //==============================================================================
27 ReceiverSocket::ReceiverSocket(void)
28  : addressLength_(sizeof(fromAddress_)), numberOfBytes_(0), readCounter_(0)
29 {
30  __COUT__ << "ReceiverSocket constructor" << __E__;
31 }
32 
33 //==============================================================================
34 ReceiverSocket::~ReceiverSocket(void) {}
35 
36 //==============================================================================
37 std::string ReceiverSocket::getLastIncomingIPAddress(void)
38 {
39  std::string fromIP;
40  for(int i = 0; i < 4; i++)
41  {
42  fromIP += std::to_string((lastIncomingIPAddress_ << (i * 8)) & 0xff);
43  if(i < 3)
44  fromIP += ".";
45  }
46 
47  return fromIP;
48 } //end getLastIncomingIPAddress()
49 //==============================================================================
50 unsigned short ReceiverSocket::getLastIncomingPort(void)
51 {
52  return ntohs(lastIncomingPort_);
53 }
54 
55 //==============================================================================
56 int ReceiverSocket::receive(std::string& buffer,
57  unsigned int timeoutSeconds,
58  unsigned int timeoutUSeconds,
59  bool verbose)
60 {
61  return receive(buffer,
62  lastIncomingIPAddress_,
63  lastIncomingPort_,
64  timeoutSeconds,
65  timeoutUSeconds,
66  verbose);
67 } //end receive()
68 
69 //==============================================================================
73 int ReceiverSocket::receive(std::string& buffer,
74  unsigned long& fromIPAddress,
75  unsigned short& fromPort,
76  unsigned int timeoutSeconds,
77  unsigned int timeoutUSeconds,
78  bool verbose)
79 {
80  // lockout other receivers for the remainder of the scope
81  std::lock_guard<std::mutex> lock(receiveMutex_);
82 
83  // set timeout period for select()
84  timeout_.tv_sec = timeoutSeconds;
85  timeout_.tv_usec = timeoutUSeconds;
86 
87  FD_ZERO(&fileDescriptor_);
88  FD_SET(socketNumber_, &fileDescriptor_);
89  select(socketNumber_ + 1, &fileDescriptor_, 0, 0, &timeout_);
90 
91  if(FD_ISSET(socketNumber_, &fileDescriptor_))
92  {
93  buffer.resize(maxSocketSize_); // NOTE: this is inexpensive according to
94  // Lorenzo/documentation in C++11 (only increases
95  // size once and doesn't decrease size)
96  if((numberOfBytes_ = recvfrom(socketNumber_,
97  &buffer[0],
98  maxSocketSize_,
99  0,
100  (struct sockaddr*)&fromAddress_,
101  &addressLength_)) == -1)
102  {
103  __COUT__ << "At socket with IPAddress: " << getIPAddress()
104  << " port: " << getPort() << std::endl;
105  __SS__ << "Error reading buffer from\tIP:\t";
106  std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
107  fromIPAddress = fromAddress_.sin_addr.s_addr;
108  fromPort = fromAddress_.sin_port;
109  lastIncomingIPAddress_ = fromIPAddress;
110  lastIncomingPort_ = fromPort;
111 
112  for(int i = 0; i < 4; i++)
113  {
114  ss << ((fromIPAddress << (i * 8)) & 0xff);
115  if(i < 3)
116  ss << ".";
117  }
118  ss << "\tPort\t" << ntohs(fromPort) << " IP " << fromIP << std::endl;
119  __COUT__ << "\n" << ss.str();
120  return -1;
121  }
122  // char address[INET_ADDRSTRLEN];
123  // inet_ntop(AF_INET, &(fromAddress.sin_addr), address, INET_ADDRSTRLEN);
124  fromIPAddress = fromAddress_.sin_addr.s_addr;
125  fromPort = fromAddress_.sin_port;
126  lastIncomingIPAddress_ = fromIPAddress;
127  lastIncomingPort_ = fromPort;
128 
129  //__COUT__ << __PRETTY_FUNCTION__ << "IP: " << std::hex << fromIPAddress <<
130  // std::dec << " port: " << fromPort << std::endl;
131  //__COUT__ << "Socket Number: " << socketNumber_ << " number of bytes: " <<
132  // nOfBytes << std::endl; gettimeofday(&tvend,NULL);
133  //__COUT__ << "started at" << tvbegin.tv_sec << ":" <<tvbegin.tv_usec <<
134  // std::endl;
135  //__COUT__ << "ended at" << tvend.tv_sec << ":" <<tvend.tv_usec << std::endl;
136 
137  // NOTE: this is inexpensive according to Lorenzo/documentation in C++11 (only
138  // increases size once and doesn't decrease size)
139  buffer.resize(numberOfBytes_);
140  readCounter_ = 0;
141 
142  if(verbose) // debug
143  {
144  std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
145 
146  __COUT__ << "Receiving "
147  << " at: " << getIPAddress() << ":" << getPort()
148  << " from: " << fromIP << ":" << ntohs(fromPort)
149  << " size: " << buffer.size() << std::endl;
150 
151  // std::stringstream ss;
152  // ss << "\tRx";
153  // uint32_t begin = 0;
154  // for(uint32_t i=begin; i<buffer.size(); i++)
155  // {
156  // if(i==begin+2) ss << ":::";
157  // else if(i==begin+10) ss << ":::";
158  // ss << std::setfill('0') << std::setw(2) << std::hex <<
159  //(((int16_t) buffer[i]) &0xFF) << "-" << std::dec;
160  // }
161  // ss << std::endl;
162  // std::cout << ss.str();
163  }
164  }
165  else
166  {
167  ++readCounter_;
168 
169  if(verbose)
170  __COUT__ << "No new messages for "
171  << timeoutSeconds + timeoutUSeconds / 1000000. << "s (Total "
172  << readCounter_ * (timeoutSeconds + timeoutUSeconds / 1000000.)
173  << "s). Read request timed out receiving on "
174  << " " << getIPAddress() << ":" << getPort() << std::endl;
175  return -1;
176  }
177 
178  return 0;
179 } //end receive()
180 
181 //==============================================================================
182 int ReceiverSocket::receive(std::vector<uint32_t>& buffer,
183  unsigned int timeoutSeconds,
184  unsigned int timeoutUSeconds,
185  bool verbose)
186 {
187  return receive(buffer,
188  lastIncomingIPAddress_,
189  lastIncomingPort_,
190  timeoutSeconds,
191  timeoutUSeconds,
192  verbose);
193 } //end receive()
194 
195 //==============================================================================
199 int ReceiverSocket::receive(std::vector<uint32_t>& buffer,
200  unsigned long& fromIPAddress,
201  unsigned short& fromPort,
202  unsigned int timeoutSeconds,
203  unsigned int timeoutUSeconds,
204  bool verbose)
205 {
206  // lockout other receivers for the remainder of the scope
207  std::lock_guard<std::mutex> lock(receiveMutex_);
208 
209  // set timeout period for select()
210  timeout_.tv_sec = timeoutSeconds;
211  timeout_.tv_usec = timeoutUSeconds;
212 
213  FD_ZERO(&fileDescriptor_);
214  FD_SET(socketNumber_, &fileDescriptor_);
215  select(socketNumber_ + 1, &fileDescriptor_, 0, 0, &timeout_);
216  __COUT__ << "Is this a successful reeeaaad???" << std::endl;
217 
218  if(FD_ISSET(socketNumber_, &fileDescriptor_))
219  {
220  buffer.resize(maxSocketSize_ / sizeof(uint32_t)); // NOTE: this is inexpensive
221  // according to
222  // Lorezno/documentation in
223  // C++11 (only increases size
224  // once and doesn't decrease
225  // size)
226  if((numberOfBytes_ = recvfrom(socketNumber_,
227  &buffer[0],
228  maxSocketSize_,
229  0,
230  (struct sockaddr*)&fromAddress_,
231  &addressLength_)) == -1)
232  {
233  __COUT__ << "At socket with IPAddress: " << getIPAddress()
234  << " port: " << getPort() << std::endl;
235  __SS__ << "Error reading buffer from\tIP:\t";
236  std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
237  fromIPAddress = fromAddress_.sin_addr.s_addr;
238  fromPort = fromAddress_.sin_port;
239  lastIncomingIPAddress_ = fromIPAddress;
240  lastIncomingPort_ = fromPort;
241 
242  for(int i = 0; i < 4; i++)
243  {
244  ss << ((fromIPAddress << (i * 8)) & 0xff);
245  if(i < 3)
246  ss << ".";
247  }
248  ss << "\tPort\t" << ntohs(fromPort) << " IP " << fromIP << std::endl;
249  __COUT__ << "\n" << ss.str();
250  return -1;
251  }
252  // char address[INET_ADDRSTRLEN];
253  // inet_ntop(AF_INET, &(fromAddress.sin_addr), address, INET_ADDRSTRLEN);
254  fromIPAddress = fromAddress_.sin_addr.s_addr;
255  fromPort = fromAddress_.sin_port;
256  lastIncomingIPAddress_ = fromIPAddress;
257  lastIncomingPort_ = fromPort;
258 
259  //__COUT__ << __PRETTY_FUNCTION__ << "IP: " << std::hex << fromIPAddress <<
260  // std::dec << " port: " << fromPort << std::endl;
261  //__COUT__ << "Socket Number: " << socketNumber_ << " number of bytes: " <<
262  // nOfBytes << std::endl; gettimeofday(&tvend,NULL);
263  //__COUT__ << "started at" << tvbegin.tv_sec << ":" <<tvbegin.tv_usec <<
264  // std::endl;
265  //__COUT__ << "ended at" << tvend.tv_sec << ":" <<tvend.tv_usec << std::endl;
266 
267  // NOTE: this is inexpensive according to Lorenzo/documentation in C++11 (only
268  // increases size once and doesn't decrease size)
269  buffer.resize(numberOfBytes_ / sizeof(uint32_t));
270  readCounter_ = 0;
271  }
272  else
273  {
274  ++readCounter_;
275  struct sockaddr_in sin;
276  socklen_t len = sizeof(sin);
277  getsockname(socketNumber_, (struct sockaddr*)&sin, &len);
278 
279  if(verbose)
280  __COUT__ << __COUT_HDR_FL__ << "No new messages for "
281  << timeoutSeconds + timeoutUSeconds / 1000000. << "s (Total "
282  << readCounter_ * (timeoutSeconds + timeoutUSeconds / 1000000.)
283  << "s). Read request timed out for port: " << ntohs(sin.sin_port)
284  << std::endl;
285  return -1;
286  }
287  __COUT__ << "This a successful read" << std::endl;
288  return 0;
289 } //end receive()
ReceiverSocket(void)
protected constructor