tdaq-develop-2025-02-12
TransmitterSocket.cc
1 #include "otsdaq/NetworkUtilities/TransmitterSocket.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 
5 #include <iomanip> /* for setfill */
6 #include <iostream>
7 #include <sstream>
8 
9 using namespace ots;
10 
11 //==============================================================================
12 TransmitterSocket::TransmitterSocket(void)
13 {
14  __COUT__ << "TransmitterSocket constructor " << __E__;
15 }
16 
17 //==============================================================================
18 TransmitterSocket::TransmitterSocket(const std::string& IPAddress, unsigned int port)
19  : Socket(IPAddress, port)
20 {
21  __COUT__ << "TransmitterSocket constructor " << IPAddress << ":" << port << __E__;
22 }
23 
24 //==============================================================================
25 TransmitterSocket::~TransmitterSocket(void) {}
26 
27 //==============================================================================
28 int TransmitterSocket::send(Socket& toSocket, const std::string& buffer, bool verbose)
29 {
30  // lockout other senders for the remainder of the scope
31  std::lock_guard<std::mutex> lock(sendMutex_);
32 
33  // __COUT__ << "Socket Descriptor #: " << socketNumber_ <<
34  // " from-port: " << ntohs(socketAddress_.sin_port) <<
35  // " to-port: " << ntohs(toSocket.getSocketAddress().sin_port) << std::endl;
36 
37  constexpr size_t MAX_SEND_SIZE = 65500;
38  size_t offset = 0;
39  int sts = 1;
40  bool delay = false;
41  while(offset < buffer.size() && sts > 0)
42  {
43  auto thisSize = buffer.size() - offset > MAX_SEND_SIZE ? MAX_SEND_SIZE
44  : buffer.size() - offset;
45 
46  if(verbose) // debug
47  {
48  __COUT__ << "Sending "
49  << " from: " << getIPAddress() << ":"
50  << ntohs(socketAddress_.sin_port)
51  << " to: " << toSocket.getIPAddress() << ":"
52  << ntohs(toSocket.getSocketAddress().sin_port)
53  << " size: " << thisSize
54  << " remaining = " << (buffer.size() - offset - thisSize)
55  << std::endl;
56  // std::stringstream ss;
57  // ss << "\t";
58  // uint32_t begin = 0;
59  // for(uint32_t i=begin; i<buffer.size(); i++)
60  // {
61  // if(i==begin+2) ss << ":::";
62  // else if(i==begin+10) ss << ":::";
63  // ss << std::setfill('0') << std::setw(2) << std::hex <<
64  //(((int16_t) buffer[i]) &0xFF) << "-" << std::dec;
65  // }
66  // ss << std::endl;
67  // std::cout << ss.str();
68  }
69 
70  if(delay)
71  usleep(10000);
72  else
73  delay = true;
74  sts = sendto(socketNumber_,
75  buffer.c_str() + offset,
76  thisSize,
77  0,
78  (struct sockaddr*)&(toSocket.getSocketAddress()),
79  sizeof(sockaddr_in));
80  offset += sts;
81  }
82 
83  if(sts <= 0)
84  {
85  __SS__ << "Error writing buffer for port " << ntohs(socketAddress_.sin_port)
86  << ": " << strerror(errno)
87  << ". Was this tx socket initialized with Socket::Initialize()?" << __E__;
88  __SS_THROW__; //return -1;
89  }
90  return 0;
91 } //end send()
92 
93 //==============================================================================
94 int TransmitterSocket::send(Socket& toSocket,
95  const std::vector<uint16_t>& buffer,
96  bool /*verbose*/)
97 {
98  // lockout other senders for the remainder of the scope
99  std::lock_guard<std::mutex> lock(sendMutex_);
100 
101  // __COUT__ << "Socket Descriptor #: " << socketNumber_ <<
102  // " from-port: " << ntohs(socketAddress_.sin_port) <<
103  // " to-port: " << ntohs(toSocket.getSocketAddress().sin_port) << std::endl;
104 
105  constexpr size_t MAX_SEND_SIZE = 1500;
106  size_t offset = 0;
107  int sts = 1;
108 
109  while(offset < buffer.size() && sts > 0)
110  {
111  auto thisSize = 2 * (buffer.size() - offset) > MAX_SEND_SIZE
112  ? MAX_SEND_SIZE
113  : 2 * (buffer.size() - offset);
114  sts = sendto(socketNumber_,
115  &buffer[0] + offset,
116  thisSize,
117  0,
118  (struct sockaddr*)&(toSocket.getSocketAddress()),
119  sizeof(sockaddr_in));
120  offset += sts / 2;
121  }
122 
123  if(sts <= 0)
124  {
125  __SS__ << "Error writing buffer for port " << ntohs(socketAddress_.sin_port)
126  << ": " << strerror(errno)
127  << ". Was this tx socket initialized with Socket::Initialize()?" << __E__;
128  __SS_THROW__; //return -1;
129  }
130  return 0;
131 } //end send()
132 
133 //==============================================================================
134 int TransmitterSocket::send(Socket& toSocket,
135  const std::vector<uint32_t>& buffer,
136  bool /*verbose*/)
137 {
138  // lockout other senders for the remainder of the scope
139  std::lock_guard<std::mutex> lock(sendMutex_);
140 
141  // __COUT__ << "Socket Descriptor #: " << socketNumber_ <<
142  // " from-port: " << ntohs(socketAddress_.sin_port) <<
143  // " to-port: " << ntohs(toSocket.getSocketAddress().sin_port) << std::endl;
144 
145  if(sendto(socketNumber_,
146  &buffer[0],
147  buffer.size() * sizeof(uint32_t),
148  0,
149  (struct sockaddr*)&(toSocket.getSocketAddress()),
150  sizeof(sockaddr_in)) < (int)(buffer.size() * sizeof(uint32_t)))
151  {
152  __SS__ << "Error writing buffer for port " << ntohs(socketAddress_.sin_port)
153  << ". Was this tx socket initialized with Socket::Initialize()?" << __E__;
154  __SS_THROW__; //return -1;
155  }
156  return 0;
157 } //end send()