tdaq-develop-2025-02-12
TCPReceiverSocket.cc
1 #include "otsdaq/NetworkUtilities/TCPReceiverSocket.h"
2 #include <arpa/inet.h>
3 #include <sys/socket.h>
4 #include <unistd.h>
5 #include <iostream>
6 #include <sstream>
7 #include <stdexcept>
8 #include <vector>
9 #include "otsdaq/Macros/CoutMacros.h"
10 
11 using namespace ots;
12 
13 //==============================================================================
14 TCPReceiverSocket::TCPReceiverSocket(int socketId) : TCPSocket(socketId) {}
15 
16 //==============================================================================
17 TCPReceiverSocket::~TCPReceiverSocket(void) {}
18 
19 //==============================================================================
20 std::string TCPReceiverSocket::receivePacket(std::chrono::milliseconds timeout)
21 {
22  std::string retVal = "";
23  auto start = std::chrono::steady_clock::now();
24 
25  size_t received_bytes = 0;
26  uint32_t message_size;
27  while(received_bytes < 4 && std::chrono::duration_cast<std::chrono::milliseconds>(
28  std::chrono::steady_clock::now() - start) < timeout)
29  {
30  int this_received_bytes = receive(
31  reinterpret_cast<char*>(&message_size) + received_bytes, 4 - received_bytes);
32  if(this_received_bytes < 0)
33  {
34  continue;
35  }
36  received_bytes += this_received_bytes;
37  }
38 
39  if(received_bytes == 0 && std::chrono::duration_cast<std::chrono::milliseconds>(
40  std::chrono::steady_clock::now() - start) >= timeout)
41  {
42  // std::cout << __PRETTY_FUNCTION__ << " timeout while receiving message size, returning null (received " << static_cast<int>(received_bytes) << "
43  // bytes)" << std::endl;
44  return retVal;
45  }
46  else
47  {
48  while(received_bytes < 4)
49  {
50  int this_received_bytes =
51  receive(reinterpret_cast<char*>(&message_size) + received_bytes,
52  4 - received_bytes);
53  if(this_received_bytes < 0)
54  {
55  continue;
56  }
57  received_bytes += this_received_bytes;
58  }
59  }
60 
61  message_size = ntohl(message_size);
62  // std::cout << "Received message size in header: " << message_size << std::endl;
63  message_size -= 4; // Message size in header is inclusive, remove header size
64  std::vector<char> buffer(message_size);
65  received_bytes = 0;
66  while(received_bytes < message_size)
67  {
68  int this_received_bytes =
69  receive(&buffer[received_bytes], message_size - received_bytes);
70  // std::cout << "Message receive returned " << this_received_bytes << std::endl;
71  if(this_received_bytes < 0)
72  {
73  continue;
74  }
75  received_bytes += this_received_bytes;
76  }
77 
78  retVal = std::string(buffer.begin(), buffer.end());
79 
80  return retVal;
81 }
82 
83 //==============================================================================
84 int TCPReceiverSocket::receive(char* buffer,
85  std::size_t bufferSize,
86  int /*timeoutMicroSeconds*/)
87 {
88  // std::cout << __PRETTY_FUNCTION__ << "Receiving Message for socket: " << getSocketId() << std::endl;
89  if(getSocketId() == 0)
90  {
91  throw std::logic_error("Bad socket object (this object was moved)");
92  }
93  // std::cout << __PRETTY_FUNCTION__ << "WAITING: " << std::endl;
94  int dataRead = ::read(getSocketId(), buffer, bufferSize);
95  // std::cout << __PRETTY_FUNCTION__ << "Message-" << buffer << "- Error? " << (dataRead == static_cast<std::size_t>(-1)) << std::endl;
96  if(dataRead < 0)
97  {
98  __SS__;
99  switch(errno)
100  {
101  case EBADF:
102  ss << "Socket file descriptor " << getSocketId()
103  << " is not a valid file descriptor or is not open for reading...Errno: "
104  << errno;
105  break;
106  case EFAULT:
107  ss << "Buffer is outside your accessible address space...Errno: " << errno;
108  break;
109  case ENXIO: // Fatal error. Programming bug
110  ss << "Read critical error caused by a programming bug...Errno: " << errno;
111  break;
112  case EINTR:
113  // TODO: Check for user interrupt flags.
114  // Beyond the scope of this project
115  // so continue normal operations.
116  ss << "The call was interrupted by a signal before any data was "
117  "read...Errno: "
118  << errno;
119  break;
120  case EAGAIN:
121  // recv is non blocking so this error is issued every time there are no messages to read
122  // std::cout << __PRETTY_FUNCTION__ << "Couldn't read any data: " << dataRead << std::endl;
123  // std::this_thread::sleep_for (std::chrono::seconds(1));
124  return dataRead;
125  case ENOTCONN:
126  // Connection broken.
127  // Return the data we have available and exit
128  // as if the connection was closed correctly.
129  return dataRead;
130  default:
131  ss << "Read: returned -1...Errno: " << errno;
132  }
133  __SS_THROW__;
134  }
135  else if(dataRead == static_cast<std::size_t>(0))
136  {
137  __SS__ << "Connection closed!" << std::endl;
138  __SS_THROW__;
139  }
140  // std::cout << __PRETTY_FUNCTION__ << "Message: " << buffer << " -> is error free! Socket: " << getSocketId() << std::endl;
141  return dataRead;
142 }
143 
144 //==============================================================================
145 void TCPReceiverSocket::setReceiveTimeout(unsigned int timeoutSeconds,
146  unsigned int timeoutMicroSeconds)
147 {
148  struct timeval tv;
149  tv.tv_sec = timeoutSeconds;
150  tv.tv_usec = timeoutMicroSeconds;
151  setsockopt(getSocketId(), SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv);
152 }