1 #include "otsdaq/NetworkUtilities/ReceiverSocket.h" 
    2 #include "otsdaq/Macros/CoutMacros.h" 
    3 #include "otsdaq/MessageFacility/MessageFacility.h" 
    4 #include "otsdaq/NetworkUtilities/NetworkConverters.h" 
   10 #include <arpa/inet.h> 
   18     , addressLength_(sizeof(fromAddress_))
 
   22     __COUT__ << 
"ReceiverSocket constructor " << IPAddress << 
":" << port << __E__;
 
   27 ReceiverSocket::ReceiverSocket(
void)
 
   28     : addressLength_(sizeof(fromAddress_)), numberOfBytes_(0), readCounter_(0)
 
   30     __COUT__ << 
"ReceiverSocket constructor" << __E__;
 
   34 ReceiverSocket::~ReceiverSocket(
void) {}
 
   37 std::string ReceiverSocket::getLastIncomingIPAddress(
void)
 
   40     for(
int i = 0; i < 4; i++)
 
   42         fromIP += std::to_string((lastIncomingIPAddress_ << (i * 8)) & 0xff);
 
   50 unsigned short ReceiverSocket::getLastIncomingPort(
void)
 
   52     return ntohs(lastIncomingPort_);
 
   56 int ReceiverSocket::receive(std::string& buffer,
 
   57                             unsigned int timeoutSeconds,
 
   58                             unsigned int timeoutUSeconds,
 
   61     return receive(buffer,
 
   62                    lastIncomingIPAddress_,
 
   73 int ReceiverSocket::receive(std::string&    buffer,
 
   74                             unsigned long&  fromIPAddress,
 
   75                             unsigned short& fromPort,
 
   76                             unsigned int    timeoutSeconds,
 
   77                             unsigned int    timeoutUSeconds,
 
   81     std::lock_guard<std::mutex> lock(receiveMutex_);
 
   84     timeout_.tv_sec  = timeoutSeconds;
 
   85     timeout_.tv_usec = timeoutUSeconds;
 
   87     FD_ZERO(&fileDescriptor_);
 
   88     FD_SET(socketNumber_, &fileDescriptor_);
 
   89     select(socketNumber_ + 1, &fileDescriptor_, 0, 0, &timeout_);
 
   91     if(FD_ISSET(socketNumber_, &fileDescriptor_))
 
   93         buffer.resize(maxSocketSize_);  
 
   96         if((numberOfBytes_ = recvfrom(socketNumber_,
 
  100                                       (
struct sockaddr*)&fromAddress_,
 
  101                                       &addressLength_)) == -1)
 
  103             __COUT__ << 
"At socket with IPAddress: " << getIPAddress()
 
  104                      << 
" port: " << getPort() << std::endl;
 
  105             __SS__ << 
"Error reading buffer from\tIP:\t";
 
  106             std::string fromIP     = inet_ntoa(fromAddress_.sin_addr);
 
  107             fromIPAddress          = fromAddress_.sin_addr.s_addr;
 
  108             fromPort               = fromAddress_.sin_port;
 
  109             lastIncomingIPAddress_ = fromIPAddress;
 
  110             lastIncomingPort_      = fromPort;
 
  112             for(
int i = 0; i < 4; i++)
 
  114                 ss << ((fromIPAddress << (i * 8)) & 0xff);
 
  118             ss << 
"\tPort\t" << ntohs(fromPort) << 
" IP " << fromIP << std::endl;
 
  119             __COUT__ << 
"\n" << ss.str();
 
  124         fromIPAddress          = fromAddress_.sin_addr.s_addr;
 
  125         fromPort               = fromAddress_.sin_port;
 
  126         lastIncomingIPAddress_ = fromIPAddress;
 
  127         lastIncomingPort_      = fromPort;
 
  139         buffer.resize(numberOfBytes_);
 
  144             std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
 
  146             __COUT__ << 
"Receiving " 
  147                      << 
" at: " << getIPAddress() << 
":" << getPort()
 
  148                      << 
" from: " << fromIP << 
":" << ntohs(fromPort)
 
  149                      << 
" size: " << buffer.size() << std::endl;
 
  170             __COUT__ << 
"No new messages for " 
  171                      << timeoutSeconds + timeoutUSeconds / 1000000. << 
"s (Total " 
  172                      << readCounter_ * (timeoutSeconds + timeoutUSeconds / 1000000.)
 
  173                      << 
"s). Read request timed out receiving on " 
  174                      << 
" " << getIPAddress() << 
":" << getPort() << std::endl;
 
  182 int ReceiverSocket::receive(std::vector<uint32_t>& buffer,
 
  183                             unsigned int           timeoutSeconds,
 
  184                             unsigned int           timeoutUSeconds,
 
  187     return receive(buffer,
 
  188                    lastIncomingIPAddress_,
 
  199 int ReceiverSocket::receive(std::vector<uint32_t>& buffer,
 
  200                             unsigned long&         fromIPAddress,
 
  201                             unsigned short&        fromPort,
 
  202                             unsigned int           timeoutSeconds,
 
  203                             unsigned int           timeoutUSeconds,
 
  207     std::lock_guard<std::mutex> lock(receiveMutex_);
 
  210     timeout_.tv_sec  = timeoutSeconds;
 
  211     timeout_.tv_usec = timeoutUSeconds;
 
  213     FD_ZERO(&fileDescriptor_);
 
  214     FD_SET(socketNumber_, &fileDescriptor_);
 
  215     select(socketNumber_ + 1, &fileDescriptor_, 0, 0, &timeout_);
 
  216     __COUT__ << 
"Is this a successful reeeaaad???" << std::endl;
 
  218     if(FD_ISSET(socketNumber_, &fileDescriptor_))
 
  220         buffer.resize(maxSocketSize_ / 
sizeof(uint32_t));  
 
  226         if((numberOfBytes_ = recvfrom(socketNumber_,
 
  230                                       (
struct sockaddr*)&fromAddress_,
 
  231                                       &addressLength_)) == -1)
 
  233             __COUT__ << 
"At socket with IPAddress: " << getIPAddress()
 
  234                      << 
" port: " << getPort() << std::endl;
 
  235             __SS__ << 
"Error reading buffer from\tIP:\t";
 
  236             std::string fromIP     = inet_ntoa(fromAddress_.sin_addr);
 
  237             fromIPAddress          = fromAddress_.sin_addr.s_addr;
 
  238             fromPort               = fromAddress_.sin_port;
 
  239             lastIncomingIPAddress_ = fromIPAddress;
 
  240             lastIncomingPort_      = fromPort;
 
  242             for(
int i = 0; i < 4; i++)
 
  244                 ss << ((fromIPAddress << (i * 8)) & 0xff);
 
  248             ss << 
"\tPort\t" << ntohs(fromPort) << 
" IP " << fromIP << std::endl;
 
  249             __COUT__ << 
"\n" << ss.str();
 
  254         fromIPAddress          = fromAddress_.sin_addr.s_addr;
 
  255         fromPort               = fromAddress_.sin_port;
 
  256         lastIncomingIPAddress_ = fromIPAddress;
 
  257         lastIncomingPort_      = fromPort;
 
  269         buffer.resize(numberOfBytes_ / 
sizeof(uint32_t));
 
  275         struct sockaddr_in sin;
 
  276         socklen_t          len = 
sizeof(sin);
 
  277         getsockname(socketNumber_, (
struct sockaddr*)&sin, &len);
 
  280             __COUT__ << __COUT_HDR_FL__ << 
"No new messages for " 
  281                      << timeoutSeconds + timeoutUSeconds / 1000000. << 
"s (Total " 
  282                      << readCounter_ * (timeoutSeconds + timeoutUSeconds / 1000000.)
 
  283                      << 
"s). Read request timed out for port: " << ntohs(sin.sin_port)
 
  287     __COUT__ << 
"This a successful read" << std::endl;
 
ReceiverSocket(void)
protected constructor