1 #include "otsdaq/NetworkUtilities/ReceiverSocket.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
10 #include <arpa/inet.h>
16 ReceiverSocket::ReceiverSocket(std::string IPAddress,
unsigned int port)
17 :
Socket(IPAddress, port), addressLength_(sizeof(fromAddress_)), numberOfBytes_(0), readCounter_(0)
19 __COUT__ <<
"ReceiverSocket constructor " << IPAddress <<
":" << port << __E__;
24 ReceiverSocket::ReceiverSocket(
void) : addressLength_(sizeof(fromAddress_)), numberOfBytes_(0), readCounter_(0)
26 __COUT__ <<
"ReceiverSocket constructor" << __E__;
30 ReceiverSocket::~ReceiverSocket(
void) {}
33 int ReceiverSocket::receive(std::string& buffer,
unsigned int timeoutSeconds,
unsigned int timeoutUSeconds,
bool verbose)
35 return receive(buffer, dummyIPAddress_, dummyPort_, timeoutSeconds, timeoutUSeconds, verbose);
// Waits on socketNumber_ with select() and, when the descriptor is flagged,
// reads one message into `buffer` with recvfrom(), reporting the sender
// through `fromIPAddress` / `fromPort`.
//
// NOTE(review): this text looks like a lossy extraction of a source listing.
// Every line carries a stray leading number, and the gaps in that numbering
// suggest that braces, branch keywords (else / if(verbose)) and the return
// statements are missing. The comments below describe only the statements
// that are visible; restore the block from the original file before building.
42 int ReceiverSocket::receive(
43 std::string& buffer,
unsigned long& fromIPAddress,
unsigned short& fromPort,
unsigned int timeoutSeconds,
unsigned int timeoutUSeconds,
bool verbose)
// Take receiveMutex_ through a lock_guard before touching the shared members.
46 std::lock_guard<std::mutex> lock(receiveMutex_);
// Load the caller's timeout into timeout_, which is handed to select() below.
49 timeout_.tv_sec = timeoutSeconds;
50 timeout_.tv_usec = timeoutUSeconds;
// Watch only socketNumber_, passed in select()'s read-set position.
// NOTE(review): select()'s return value is discarded, so a select() failure is
// not distinguished from a timeout here.
52 FD_ZERO(&fileDescriptor_);
53 FD_SET(socketNumber_, &fileDescriptor_);
54 select(socketNumber_ + 1, &fileDescriptor_, 0, 0, &timeout_);
// Proceed to the read only if socketNumber_ is still set after select().
56 if(FD_ISSET(socketNumber_, &fileDescriptor_))
// Size the string to maxSocketSize_ so recvfrom can write directly into it.
58 buffer.resize(maxSocketSize_);
// recvfrom stores its result in numberOfBytes_ and the sender in fromAddress_;
// a result of -1 selects the error-reporting statements that follow.
61 if((numberOfBytes_ = recvfrom(socketNumber_, &buffer[0], maxSocketSize_, 0, (
struct sockaddr*)&fromAddress_, &addressLength_)) == -1)
63 __COUT__ <<
"At socket with IPAddress: " << getIPAddress() <<
" port: " << getPort() << std::endl;
// Build the error text; `ss` is presumably the stream declared by __SS__.
64 __SS__ <<
"Error reading buffer from\tIP:\t";
65 std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
66 fromIPAddress = fromAddress_.sin_addr.s_addr;
67 fromPort = fromAddress_.sin_port;
// NOTE(review): (fromIPAddress << (i * 8)) & 0xff is 0 for every i > 0, because
// the left shift clears the low byte before masking. A right shift looks
// intended for printing the four address bytes — confirm.
68 for(
int i = 0; i < 4; i++)
70 ss << ((fromIPAddress << (i * 8)) & 0xff);
74 ss <<
"\tPort\t" << ntohs(fromPort) <<
" IP " << fromIP << std::endl;
75 __COUT__ <<
"\n" << ss.str();
// Hand the sender address and port back to the caller exactly as stored in
// fromAddress_; no byte-order conversion is applied here (ntohs is used only
// when printing).
80 fromIPAddress = fromAddress_.sin_addr.s_addr;
81 fromPort = fromAddress_.sin_port;
// Trim the string to the byte count recvfrom reported.
93 buffer.resize(numberOfBytes_);
// Log of the local endpoint, the sender and the payload size
// (presumably guarded by `verbose`; the guard is not visible here).
98 std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
100 __COUT__ <<
"Receiving "
101 <<
" at: " << getIPAddress() <<
":" << getPort() <<
" from: " << fromIP;
102 std::cout <<
":" << ntohs(fromPort) <<
" size: " << buffer.size() << std::endl;
// Timeout report; readCounter_ scales the per-call timeout into a running total.
// NOTE(review): timeoutUSeconds is assigned to tv_usec above, i.e. it is a
// microsecond count, yet it is divided by 1000. and labelled "s". The divisor
// should presumably be 1e6 — confirm.
123 __COUT__ <<
"No new messages for " << timeoutSeconds + timeoutUSeconds / 1000. <<
"s (Total "
124 << readCounter_ * (timeoutSeconds + timeoutUSeconds / 1000.) <<
"s). Read request timed out receiving on "
125 <<
" " << getIPAddress() <<
":" << getPort() << std::endl;
133 int ReceiverSocket::receive(std::vector<uint32_t>& buffer,
unsigned int timeoutSeconds,
unsigned int timeoutUSeconds,
bool verbose)
135 return receive(buffer, dummyIPAddress_, dummyPort_, timeoutSeconds, timeoutUSeconds, verbose);
// std::vector<uint32_t> flavour of receive: waits on socketNumber_ with
// select() and, when the descriptor is flagged, reads one message into
// `buffer` with recvfrom(), reporting the sender through `fromIPAddress` /
// `fromPort`.
//
// NOTE(review): this text looks like a lossy extraction of a source listing.
// Every line carries a stray leading number and the numbering has gaps. The
// parameter list stops after timeoutUSeconds with a trailing comma (the
// forwarding overload passes a sixth `verbose` argument, so that parameter is
// presumably missing), and braces, branch keywords and return statements are
// not visible. The comments below describe only the statements that are shown.
142 int ReceiverSocket::receive(std::vector<uint32_t>& buffer,
143 unsigned long& fromIPAddress,
144 unsigned short& fromPort,
145 unsigned int timeoutSeconds,
146 unsigned int timeoutUSeconds,
// Take receiveMutex_ through a lock_guard before touching the shared members.
150 std::lock_guard<std::mutex> lock(receiveMutex_);
// Load the caller's timeout into timeout_, which is handed to select() below.
153 timeout_.tv_sec = timeoutSeconds;
154 timeout_.tv_usec = timeoutUSeconds;
// Watch only socketNumber_, passed in select()'s read-set position.
// NOTE(review): select()'s return value is discarded, so a select() failure is
// not distinguished from a timeout here.
156 FD_ZERO(&fileDescriptor_);
157 FD_SET(socketNumber_, &fileDescriptor_);
158 select(socketNumber_ + 1, &fileDescriptor_, 0, 0, &timeout_);
// NOTE(review): this is logged directly after select(), before the descriptor
// is even tested — it looks like leftover debug output.
159 __COUT__ <<
"Is this a successful reeeaaad???" << std::endl;
// Proceed to the read only if socketNumber_ is still set after select().
161 if(FD_ISSET(socketNumber_, &fileDescriptor_))
// Size the vector in 32-bit words.
// NOTE(review): recvfrom below is allowed to write maxSocketSize_ bytes, while
// this storage holds maxSocketSize_ / sizeof(uint32_t) words (rounded down).
// The two match only if maxSocketSize_ is a multiple of 4 — confirm.
163 buffer.resize(maxSocketSize_ /
sizeof(uint32_t));
// recvfrom stores its result in numberOfBytes_ and the sender in fromAddress_;
// a result of -1 selects the error-reporting statements that follow.
169 if((numberOfBytes_ = recvfrom(socketNumber_, &buffer[0], maxSocketSize_, 0, (
struct sockaddr*)&fromAddress_, &addressLength_)) == -1)
171 __COUT__ <<
"At socket with IPAddress: " << getIPAddress() <<
" port: " << getPort() << std::endl;
// Build the error text; `ss` is presumably the stream declared by __SS__.
172 __SS__ <<
"Error reading buffer from\tIP:\t";
// NOTE(review): fromIPAddress and fromPort are read here, but in the visible
// code they are assigned from fromAddress_ only further down, so this prints
// whatever the caller passed in. The std::string overload assigns them first
// and prints the port through ntohs. In addition,
// (fromIPAddress << (i * 8)) & 0xff is 0 for every i > 0; a right shift looks
// intended — confirm.
173 for(
int i = 0; i < 4; i++)
175 ss << ((fromIPAddress << (i * 8)) & 0xff);
179 ss <<
"\tPort\t" << fromPort << std::endl;
180 __COUT__ <<
"\n" << ss.str();
// Hand the sender address and port back to the caller exactly as stored in
// fromAddress_; no byte-order conversion is applied here.
185 fromIPAddress = fromAddress_.sin_addr.s_addr;
186 fromPort = fromAddress_.sin_port;
// Trim the vector to the number of whole 32-bit words received; the integer
// division drops any trailing 1-3 bytes.
198 buffer.resize(numberOfBytes_ /
sizeof(uint32_t));
// Query the socket's own address so the timeout message can show its port.
// NOTE(review): getsockname's return value is not checked, and `sin` is not
// initialised beforehand.
204 struct sockaddr_in sin;
205 socklen_t len =
sizeof(sin);
206 getsockname(socketNumber_, (
struct sockaddr*)&sin, &len);
// Timeout report; readCounter_ scales the per-call timeout into a running total.
// NOTE(review): timeoutUSeconds is assigned to tv_usec above, i.e. it is a
// microsecond count, yet it is divided by 1000. and labelled "s". The divisor
// should presumably be 1e6 — confirm. The statement also has no terminating
// ';' in this listing; its tail is presumably among the missing lines.
209 __COUT__ << __COUT_HDR_FL__ <<
"No new messages for " << timeoutSeconds + timeoutUSeconds / 1000. <<
"s (Total "
210 << readCounter_ * (timeoutSeconds + timeoutUSeconds / 1000.) <<
"s). Read request timed out for port: " << ntohs(sin.sin_port)
// Trailing debug log; whatever follows it is not visible in this listing.
214 __COUT__ <<
"This a successful reeeaaad" << std::endl;