1 #include "otsdaq/NetworkUtilities/ReceiverSocket.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
10 #include <arpa/inet.h>
// Tail of a ReceiverSocket constructor: initializes addressLength_ to the size
// of fromAddress_ and logs the IPAddress and port it was constructed with.
// NOTE(review): the constructor signature and the rest of its initializer list
// are not visible in this listing; IPAddress and port are presumably its
// parameters - confirm against the full file.
18 , addressLength_(sizeof(fromAddress_))
22 __COUT__ <<
"ReceiverSocket constructor " << IPAddress <<
":" << port << __E__;
27 ReceiverSocket::ReceiverSocket(
void)
28 : addressLength_(sizeof(fromAddress_)), numberOfBytes_(0), readCounter_(0)
30 __COUT__ <<
"ReceiverSocket constructor" << __E__;
34 ReceiverSocket::~ReceiverSocket(
void) {}
37 std::string ReceiverSocket::getLastIncomingIPAddress(
void)
40 for(
int i = 0; i < 4; i++)
42 fromIP += std::to_string((lastIncomingIPAddress_ << (i * 8)) & 0xff);
50 unsigned short ReceiverSocket::getLastIncomingPort(
void)
52 return ntohs(lastIncomingPort_);
//==============================================================================
// Convenience overload for std::string buffers: forwards to the receive()
// overload that reports the sender, passing the member lastIncomingIPAddress_
// as the sender-address output argument.
// NOTE(review): the remaining parameter(s) of the signature and the remaining
// call arguments are not visible in this listing; presumably the sender port
// goes to lastIncomingPort_ and the timeouts are forwarded unchanged - confirm
// against the full file.
56 int ReceiverSocket::receive(std::string& buffer,
57 unsigned int timeoutSeconds,
58 unsigned int timeoutUSeconds,
61 return receive(buffer,
62 lastIncomingIPAddress_,
//==============================================================================
// Waits for the socket to become readable and reads one incoming message into
// `buffer`, reporting the sender through `fromIPAddress` / `fromPort`.
//
//   buffer          - resized to maxSocketSize_ before the read, then to
//                     numberOfBytes_ (the recvfrom result) after the read.
//   fromIPAddress   - set to fromAddress_.sin_addr.s_addr as filled in by
//                     recvfrom; no byte-order conversion is applied here.
//   fromPort        - set to fromAddress_.sin_port; the log statements below
//                     wrap it in ntohs() before printing.
//   timeoutSeconds / timeoutUSeconds - copied into timeout_ for select().
//
// The same sender values are mirrored into lastIncomingIPAddress_ and
// lastIncomingPort_.
// NOTE(review): several original lines (the last parameter(s), braces, the
// return statements and whatever condition guards the log output) are not
// visible in this listing, so return values and logging conditions are not
// documented here - confirm against the full file.
73 int ReceiverSocket::receive(std::string& buffer,
74 unsigned long& fromIPAddress,
75 unsigned short& fromPort,
76 unsigned int timeoutSeconds,
77 unsigned int timeoutUSeconds,
// Hold receiveMutex_ for the rest of the scope so concurrent callers do not
// interleave on the shared members used below.
81 std::lock_guard<std::mutex> lock(receiveMutex_);
// timeout_ is re-filled on every call before being handed to select().
84 timeout_.tv_sec = timeoutSeconds;
85 timeout_.tv_usec = timeoutUSeconds;
// Watch only this socket for readability.
87 FD_ZERO(&fileDescriptor_);
88 FD_SET(socketNumber_, &fileDescriptor_);
// NOTE(review): the return value of select() is discarded; only FD_ISSET is
// consulted afterwards, so a select() error is not distinguished from a
// timeout in the visible code.
89 select(socketNumber_ + 1, &fileDescriptor_, 0, 0, &timeout_);
91 if(FD_ISSET(socketNumber_, &fileDescriptor_))
// Grow the buffer to the maximum size before reading into it.
93 buffer.resize(maxSocketSize_);
// recvfrom stores the byte count in numberOfBytes_ and the sender in
// fromAddress_; -1 selects the error path below. (The buffer/length/flags
// arguments are on lines not visible in this listing.)
96 if((numberOfBytes_ = recvfrom(socketNumber_,
100 (
struct sockaddr*)&fromAddress_,
101 &addressLength_)) == -1)
// Error path: log the local endpoint, then build a message describing the
// sender.
103 __COUT__ <<
"At socket with IPAddress: " << getIPAddress()
104 <<
" port: " << getPort() << std::endl;
105 __SS__ <<
"Error reading buffer from\tIP:\t";
// NOTE(review): fromAddress_ is read here even though recvfrom just failed,
// so it may still hold the sender of an earlier call - confirm this is
// intended.
106 std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
107 fromIPAddress = fromAddress_.sin_addr.s_addr;
108 fromPort = fromAddress_.sin_port;
109 lastIncomingIPAddress_ = fromIPAddress;
110 lastIncomingPort_ = fromPort;
112 for(
int i = 0; i < 4; i++)
// NOTE(review): `(x << (i * 8)) & 0xff` is 0 for every i > 0 because the low
// byte is shifted out, so only the first octet can be non-zero; `>>` was
// presumably intended - confirm and fix.
114 ss << ((fromIPAddress << (i * 8)) & 0xff);
118 ss <<
"\tPort\t" << ntohs(fromPort) <<
" IP " << fromIP << std::endl;
119 __COUT__ <<
"\n" << ss.str();
// Success path: publish the sender to the caller and remember it in the
// lastIncoming* members.
124 fromIPAddress = fromAddress_.sin_addr.s_addr;
125 fromPort = fromAddress_.sin_port;
126 lastIncomingIPAddress_ = fromIPAddress;
127 lastIncomingPort_ = fromPort;
// Shrink the buffer to the number of bytes recvfrom reported.
139 buffer.resize(numberOfBytes_);
// Log of a received message: local endpoint, sender and payload size.
144 std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
146 __COUT__ <<
"Receiving "
147 <<
" at: " << getIPAddress() <<
":" << getPort()
148 <<
" from: " << fromIP <<
":" << ntohs(fromPort)
149 <<
" size: " << buffer.size() << std::endl;
// Timeout log: prints the timeout of this call in seconds and that timeout
// multiplied by readCounter_ as the running total.
170 __COUT__ <<
"No new messages for "
171 << timeoutSeconds + timeoutUSeconds / 1000000. <<
"s (Total "
172 << readCounter_ * (timeoutSeconds + timeoutUSeconds / 1000000.)
173 <<
"s). Read request timed out receiving on "
174 <<
" " << getIPAddress() <<
":" << getPort() << std::endl;
//==============================================================================
// Convenience overload for std::vector<uint32_t> buffers: forwards to the
// receive() overload that reports the sender, passing the member
// lastIncomingIPAddress_ as the sender-address output argument.
// NOTE(review): the remaining parameter(s) of the signature and the remaining
// call arguments are not visible in this listing; presumably the sender port
// goes to lastIncomingPort_ and the timeouts are forwarded unchanged - confirm
// against the full file.
182 int ReceiverSocket::receive(std::vector<uint32_t>& buffer,
183 unsigned int timeoutSeconds,
184 unsigned int timeoutUSeconds,
187 return receive(buffer,
188 lastIncomingIPAddress_,
//==============================================================================
// Waits for the socket to become readable and reads one incoming message into
// `buffer` as 32-bit words, reporting the sender through `fromIPAddress` /
// `fromPort`.
//
//   buffer          - resized to maxSocketSize_ / sizeof(uint32_t) words before
//                     the read, then to numberOfBytes_ / sizeof(uint32_t) words
//                     after the read.
//   fromIPAddress   - set to fromAddress_.sin_addr.s_addr as filled in by
//                     recvfrom; no byte-order conversion is applied here.
//   fromPort        - set to fromAddress_.sin_port; the log statements below
//                     wrap it in ntohs() before printing.
//   timeoutSeconds / timeoutUSeconds - copied into timeout_ for select().
//
// The same sender values are mirrored into lastIncomingIPAddress_ and
// lastIncomingPort_.
// NOTE(review): several original lines (the last parameter(s), braces, the
// return statements and whatever condition guards the log output) are not
// visible in this listing, so return values and logging conditions are not
// documented here - confirm against the full file.
199 int ReceiverSocket::receive(std::vector<uint32_t>& buffer,
200 unsigned long& fromIPAddress,
201 unsigned short& fromPort,
202 unsigned int timeoutSeconds,
203 unsigned int timeoutUSeconds,
// Hold receiveMutex_ for the rest of the scope so concurrent callers do not
// interleave on the shared members used below.
207 std::lock_guard<std::mutex> lock(receiveMutex_);
// timeout_ is re-filled on every call before being handed to select().
210 timeout_.tv_sec = timeoutSeconds;
211 timeout_.tv_usec = timeoutUSeconds;
// Watch only this socket for readability.
213 FD_ZERO(&fileDescriptor_);
214 FD_SET(socketNumber_, &fileDescriptor_);
// NOTE(review): the return value of select() is discarded; only FD_ISSET is
// consulted afterwards, so a select() error is not distinguished from a
// timeout in the visible code.
215 select(socketNumber_ + 1, &fileDescriptor_, 0, 0, &timeout_);
// NOTE(review): this looks like a leftover debug message; from the visible
// lines it appears to be emitted on every call, before the readiness check -
// confirm and consider removing it.
216 __COUT__ <<
"Is this a successful reeeaaad???" << std::endl;
218 if(FD_ISSET(socketNumber_, &fileDescriptor_))
// Grow the buffer to the maximum size, expressed in 32-bit words.
220 buffer.resize(maxSocketSize_ /
sizeof(uint32_t));
// recvfrom stores the byte count in numberOfBytes_ and the sender in
// fromAddress_; -1 selects the error path below. (The buffer/length/flags
// arguments are on lines not visible in this listing.)
226 if((numberOfBytes_ = recvfrom(socketNumber_,
230 (
struct sockaddr*)&fromAddress_,
231 &addressLength_)) == -1)
// Error path: log the local endpoint, then build a message describing the
// sender.
233 __COUT__ <<
"At socket with IPAddress: " << getIPAddress()
234 <<
" port: " << getPort() << std::endl;
235 __SS__ <<
"Error reading buffer from\tIP:\t";
// NOTE(review): fromAddress_ is read here even though recvfrom just failed,
// so it may still hold the sender of an earlier call - confirm this is
// intended.
236 std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
237 fromIPAddress = fromAddress_.sin_addr.s_addr;
238 fromPort = fromAddress_.sin_port;
239 lastIncomingIPAddress_ = fromIPAddress;
240 lastIncomingPort_ = fromPort;
242 for(
int i = 0; i < 4; i++)
// NOTE(review): `(x << (i * 8)) & 0xff` is 0 for every i > 0 because the low
// byte is shifted out, so only the first octet can be non-zero; `>>` was
// presumably intended - confirm and fix.
244 ss << ((fromIPAddress << (i * 8)) & 0xff);
248 ss <<
"\tPort\t" << ntohs(fromPort) <<
" IP " << fromIP << std::endl;
249 __COUT__ <<
"\n" << ss.str();
// Success path: publish the sender to the caller and remember it in the
// lastIncoming* members.
254 fromIPAddress = fromAddress_.sin_addr.s_addr;
255 fromPort = fromAddress_.sin_port;
256 lastIncomingIPAddress_ = fromIPAddress;
257 lastIncomingPort_ = fromPort;
// Shrink the buffer to the received size in 32-bit words.
// NOTE(review): integer division drops up to 3 trailing bytes when
// numberOfBytes_ is not a multiple of sizeof(uint32_t) - confirm senders
// always transmit whole words.
269 buffer.resize(numberOfBytes_ /
sizeof(uint32_t));
// Query the locally bound address so the timeout message below can print the
// local port.
// NOTE(review): the getsockname() result is not checked; `sin` is
// uninitialized if the call fails.
275 struct sockaddr_in sin;
276 socklen_t len =
sizeof(sin);
277 getsockname(socketNumber_, (
struct sockaddr*)&sin, &len);
// Timeout log: prints the timeout of this call in seconds and that timeout
// multiplied by readCounter_ as the running total.
280 __COUT__ << __COUT_HDR_FL__ <<
"No new messages for "
281 << timeoutSeconds + timeoutUSeconds / 1000000. <<
"s (Total "
282 << readCounter_ * (timeoutSeconds + timeoutUSeconds / 1000000.)
283 <<
"s). Read request timed out for port: " << ntohs(sin.sin_port)
// NOTE(review): the branch that contains this message is not visible in this
// listing; it looks like debug output paired with the "reeeaaad" message
// above - confirm when it is emitted.
287 __COUT__ <<
"This a successful read" << std::endl;
// ReceiverSocket(void): protected constructor