otsdaq  v2_05_02_indev
ReceiverSocket.cc
1 #include "otsdaq/NetworkUtilities/ReceiverSocket.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
5 
6 #include <iomanip> /* for setfill */
7 #include <iostream>
8 #include <sstream>
9 
10 #include <arpa/inet.h>
11 #include <sys/time.h>
12 
13 using namespace ots;
14 
15 //==============================================================================
16 ReceiverSocket::ReceiverSocket(std::string IPAddress, unsigned int port)
17  : Socket(IPAddress, port), addressLength_(sizeof(fromAddress_)), numberOfBytes_(0), readCounter_(0)
18 {
19  __COUT__ << "ReceiverSocket constructor " << IPAddress << ":" << port << __E__;
20 }
21 
22 //==============================================================================
23 // protected constructor
24 ReceiverSocket::ReceiverSocket(void) : addressLength_(sizeof(fromAddress_)), numberOfBytes_(0), readCounter_(0)
25 {
26  __COUT__ << "ReceiverSocket constructor" << __E__;
27 }
28 
29 //==============================================================================
30 ReceiverSocket::~ReceiverSocket(void) {}
31 
32 //==============================================================================
33 int ReceiverSocket::receive(std::string& buffer, unsigned int timeoutSeconds, unsigned int timeoutUSeconds, bool verbose)
34 {
35  return receive(buffer, dummyIPAddress_, dummyPort_, timeoutSeconds, timeoutUSeconds, verbose);
36 }
37 
38 //==============================================================================
39 // receive ~~
40 // returns 0 on success, -1 on failure
41 // NOTE: must call Socket::initialize before receiving!
42 int ReceiverSocket::receive(
43  std::string& buffer, unsigned long& fromIPAddress, unsigned short& fromPort, unsigned int timeoutSeconds, unsigned int timeoutUSeconds, bool verbose)
44 {
45  // lockout other receivers for the remainder of the scope
46  std::lock_guard<std::mutex> lock(receiveMutex_);
47 
48  // set timeout period for select()
49  timeout_.tv_sec = timeoutSeconds;
50  timeout_.tv_usec = timeoutUSeconds;
51 
52  FD_ZERO(&fileDescriptor_);
53  FD_SET(socketNumber_, &fileDescriptor_);
54  select(socketNumber_ + 1, &fileDescriptor_, 0, 0, &timeout_);
55 
56  if(FD_ISSET(socketNumber_, &fileDescriptor_))
57  {
58  buffer.resize(maxSocketSize_); // NOTE: this is inexpensive according to
59  // Lorenzo/documentation in C++11 (only increases
60  // size once and doesn't decrease size)
61  if((numberOfBytes_ = recvfrom(socketNumber_, &buffer[0], maxSocketSize_, 0, (struct sockaddr*)&fromAddress_, &addressLength_)) == -1)
62  {
63  __COUT__ << "At socket with IPAddress: " << getIPAddress() << " port: " << getPort() << std::endl;
64  __SS__ << "Error reading buffer from\tIP:\t";
65  std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
66  fromIPAddress = fromAddress_.sin_addr.s_addr;
67  fromPort = fromAddress_.sin_port;
68  for(int i = 0; i < 4; i++)
69  {
70  ss << ((fromIPAddress << (i * 8)) & 0xff);
71  if(i < 3)
72  ss << ".";
73  }
74  ss << "\tPort\t" << ntohs(fromPort) << " IP " << fromIP << std::endl;
75  __COUT__ << "\n" << ss.str();
76  return -1;
77  }
78  // char address[INET_ADDRSTRLEN];
79  // inet_ntop(AF_INET, &(fromAddress.sin_addr), address, INET_ADDRSTRLEN);
80  fromIPAddress = fromAddress_.sin_addr.s_addr;
81  fromPort = fromAddress_.sin_port;
82 
83  //__COUT__ << __PRETTY_FUNCTION__ << "IP: " << std::hex << fromIPAddress <<
84  // std::dec << " port: " << fromPort << std::endl;
85  //__COUT__ << "Socket Number: " << socketNumber_ << " number of bytes: " <<
86  // nOfBytes << std::endl; gettimeofday(&tvend,NULL);
87  //__COUT__ << "started at" << tvbegin.tv_sec << ":" <<tvbegin.tv_usec <<
88  // std::endl;
89  //__COUT__ << "ended at" << tvend.tv_sec << ":" <<tvend.tv_usec << std::endl;
90 
91  // NOTE: this is inexpensive according to Lorenzo/documentation in C++11 (only
92  // increases size once and doesn't decrease size)
93  buffer.resize(numberOfBytes_);
94  readCounter_ = 0;
95 
96  if(verbose) // debug
97  {
98  std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
99 
100  __COUT__ << "Receiving "
101  << " at: " << getIPAddress() << ":" << getPort() << " from: " << fromIP;
102  std::cout << ":" << ntohs(fromPort) << " size: " << buffer.size() << std::endl;
103 
104  // std::stringstream ss;
105  // ss << "\tRx";
106  // uint32_t begin = 0;
107  // for(uint32_t i=begin; i<buffer.size(); i++)
108  // {
109  // if(i==begin+2) ss << ":::";
110  // else if(i==begin+10) ss << ":::";
111  // ss << std::setfill('0') << std::setw(2) << std::hex <<
112  //(((int16_t) buffer[i]) &0xFF) << "-" << std::dec;
113  // }
114  // ss << std::endl;
115  // std::cout << ss.str();
116  }
117  }
118  else
119  {
120  ++readCounter_;
121 
122  if(verbose)
123  __COUT__ << "No new messages for " << timeoutSeconds + timeoutUSeconds / 1000. << "s (Total "
124  << readCounter_ * (timeoutSeconds + timeoutUSeconds / 1000.) << "s). Read request timed out receiving on "
125  << " " << getIPAddress() << ":" << getPort() << std::endl;
126  return -1;
127  }
128 
129  return 0;
130 }
131 
132 //==============================================================================
133 int ReceiverSocket::receive(std::vector<uint32_t>& buffer, unsigned int timeoutSeconds, unsigned int timeoutUSeconds, bool verbose)
134 {
135  return receive(buffer, dummyIPAddress_, dummyPort_, timeoutSeconds, timeoutUSeconds, verbose);
136 }
137 
138 //==============================================================================
139 // receive ~~
140 // returns 0 on success, -1 on failure
141 // NOTE: must call Socket::initialize before receiving!
142 int ReceiverSocket::receive(std::vector<uint32_t>& buffer,
143  unsigned long& fromIPAddress,
144  unsigned short& fromPort,
145  unsigned int timeoutSeconds,
146  unsigned int timeoutUSeconds,
147  bool verbose)
148 {
149  // lockout other receivers for the remainder of the scope
150  std::lock_guard<std::mutex> lock(receiveMutex_);
151 
152  // set timeout period for select()
153  timeout_.tv_sec = timeoutSeconds;
154  timeout_.tv_usec = timeoutUSeconds;
155 
156  FD_ZERO(&fileDescriptor_);
157  FD_SET(socketNumber_, &fileDescriptor_);
158  select(socketNumber_ + 1, &fileDescriptor_, 0, 0, &timeout_);
159  __COUT__ << "Is this a successful reeeaaad???" << std::endl;
160 
161  if(FD_ISSET(socketNumber_, &fileDescriptor_))
162  {
163  buffer.resize(maxSocketSize_ / sizeof(uint32_t)); // NOTE: this is inexpensive
164  // according to
165  // Lorezno/documentation in
166  // C++11 (only increases size
167  // once and doesn't decrease
168  // size)
169  if((numberOfBytes_ = recvfrom(socketNumber_, &buffer[0], maxSocketSize_, 0, (struct sockaddr*)&fromAddress_, &addressLength_)) == -1)
170  {
171  __COUT__ << "At socket with IPAddress: " << getIPAddress() << " port: " << getPort() << std::endl;
172  __SS__ << "Error reading buffer from\tIP:\t";
173  for(int i = 0; i < 4; i++)
174  {
175  ss << ((fromIPAddress << (i * 8)) & 0xff);
176  if(i < 3)
177  ss << ".";
178  }
179  ss << "\tPort\t" << fromPort << std::endl;
180  __COUT__ << "\n" << ss.str();
181  return -1;
182  }
183  // char address[INET_ADDRSTRLEN];
184  // inet_ntop(AF_INET, &(fromAddress.sin_addr), address, INET_ADDRSTRLEN);
185  fromIPAddress = fromAddress_.sin_addr.s_addr;
186  fromPort = fromAddress_.sin_port;
187 
188  //__COUT__ << __PRETTY_FUNCTION__ << "IP: " << std::hex << fromIPAddress <<
189  // std::dec << " port: " << fromPort << std::endl;
190  //__COUT__ << "Socket Number: " << socketNumber_ << " number of bytes: " <<
191  // nOfBytes << std::endl; gettimeofday(&tvend,NULL);
192  //__COUT__ << "started at" << tvbegin.tv_sec << ":" <<tvbegin.tv_usec <<
193  // std::endl;
194  //__COUT__ << "ended at" << tvend.tv_sec << ":" <<tvend.tv_usec << std::endl;
195 
196  // NOTE: this is inexpensive according to Lorenzo/documentation in C++11 (only
197  // increases size once and doesn't decrease size)
198  buffer.resize(numberOfBytes_ / sizeof(uint32_t));
199  readCounter_ = 0;
200  }
201  else
202  {
203  ++readCounter_;
204  struct sockaddr_in sin;
205  socklen_t len = sizeof(sin);
206  getsockname(socketNumber_, (struct sockaddr*)&sin, &len);
207 
208  if(verbose)
209  __COUT__ << __COUT_HDR_FL__ << "No new messages for " << timeoutSeconds + timeoutUSeconds / 1000. << "s (Total "
210  << readCounter_ * (timeoutSeconds + timeoutUSeconds / 1000.) << "s). Read request timed out for port: " << ntohs(sin.sin_port)
211  << std::endl;
212  return -1;
213  }
214  __COUT__ << "This a successful reeeaaad" << std::endl;
215  return 0;
216 }