otsdaq  v2_05_02_indev
TransmitterSocket.cc
1 #include "otsdaq/NetworkUtilities/TransmitterSocket.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 
5 #include <iomanip> /* for setfill */
6 #include <iostream>
7 #include <sstream>
8 
9 using namespace ots;
10 
11 //==============================================================================
12 TransmitterSocket::TransmitterSocket(void) { __COUT__ << "TransmitterSocket constructor " << __E__; }
13 
14 //==============================================================================
15 TransmitterSocket::TransmitterSocket(const std::string& IPAddress, unsigned int port) : Socket(IPAddress, port)
16 {
17  __COUT__ << "TransmitterSocket constructor " << IPAddress << ":" << port << __E__;
18 }
19 
20 //==============================================================================
21 TransmitterSocket::~TransmitterSocket(void) {}
22 
23 //==============================================================================
24 int TransmitterSocket::send(Socket& toSocket, const std::string& buffer, bool verbose)
25 {
26  // lockout other senders for the remainder of the scope
27  std::lock_guard<std::mutex> lock(sendMutex_);
28 
29  // __COUT__ << "Socket Descriptor #: " << socketNumber_ <<
30  // " from-port: " << ntohs(socketAddress_.sin_port) <<
31  // " to-port: " << ntohs(toSocket.getSocketAddress().sin_port) << std::endl;
32 
33  constexpr size_t MAX_SEND_SIZE = 65500;
34  size_t offset = 0;
35  int sts = 1;
36  bool delay = false;
37  while(offset < buffer.size() && sts > 0)
38  {
39  auto thisSize = buffer.size() - offset > MAX_SEND_SIZE ? MAX_SEND_SIZE : buffer.size() - offset;
40 
41  if(verbose) // debug
42  {
43  __COUT__ << "Sending "
44  << " from: " << getIPAddress() << ":" << ntohs(socketAddress_.sin_port) << " to: " << toSocket.getIPAddress() << ":"
45  << ntohs(toSocket.getSocketAddress().sin_port) << " size: " << thisSize << " remaining = " << (buffer.size() - offset - thisSize)
46  << std::endl;
47  // std::stringstream ss;
48  // ss << "\t";
49  // uint32_t begin = 0;
50  // for(uint32_t i=begin; i<buffer.size(); i++)
51  // {
52  // if(i==begin+2) ss << ":::";
53  // else if(i==begin+10) ss << ":::";
54  // ss << std::setfill('0') << std::setw(2) << std::hex <<
55  //(((int16_t) buffer[i]) &0xFF) << "-" << std::dec;
56  // }
57  // ss << std::endl;
58  // std::cout << ss.str();
59  }
60 
61  if(delay)
62  usleep(10000);
63  else
64  delay = true;
65  sts = sendto(socketNumber_, buffer.c_str() + offset, thisSize, 0, (struct sockaddr*)&(toSocket.getSocketAddress()), sizeof(sockaddr_in));
66  offset += sts;
67  }
68 
69  if(sts <= 0)
70  {
71  __COUT__ << "Error writing buffer for port " << ntohs(socketAddress_.sin_port) << ": " << strerror(errno) << std::endl;
72  return -1;
73  }
74  return 0;
75 }
76 
77 //==============================================================================
78 int TransmitterSocket::send(Socket& toSocket, const std::vector<uint16_t>& buffer, bool /*verbose*/)
79 {
80  // lockout other senders for the remainder of the scope
81  std::lock_guard<std::mutex> lock(sendMutex_);
82 
83  // __COUT__ << "Socket Descriptor #: " << socketNumber_ <<
84  // " from-port: " << ntohs(socketAddress_.sin_port) <<
85  // " to-port: " << ntohs(toSocket.getSocketAddress().sin_port) << std::endl;
86 
87  constexpr size_t MAX_SEND_SIZE = 1500;
88  size_t offset = 0;
89  int sts = 1;
90 
91  while(offset < buffer.size() && sts > 0)
92  {
93  auto thisSize = 2 * (buffer.size() - offset) > MAX_SEND_SIZE ? MAX_SEND_SIZE : 2 * (buffer.size() - offset);
94  sts = sendto(socketNumber_, &buffer[0] + offset, thisSize, 0, (struct sockaddr*)&(toSocket.getSocketAddress()), sizeof(sockaddr_in));
95  offset += sts / 2;
96  }
97 
98  if(sts <= 0)
99  {
100  __COUT__ << "Error writing buffer for port " << ntohs(socketAddress_.sin_port) << ": " << strerror(errno) << std::endl;
101  return -1;
102  }
103  return 0;
104 }
105 
106 //==============================================================================
107 int TransmitterSocket::send(Socket& toSocket, const std::vector<uint32_t>& buffer, bool /*verbose*/)
108 {
109  // lockout other senders for the remainder of the scope
110  std::lock_guard<std::mutex> lock(sendMutex_);
111 
112  // __COUT__ << "Socket Descriptor #: " << socketNumber_ <<
113  // " from-port: " << ntohs(socketAddress_.sin_port) <<
114  // " to-port: " << ntohs(toSocket.getSocketAddress().sin_port) << std::endl;
115 
116  if(sendto(socketNumber_, &buffer[0], buffer.size() * sizeof(uint32_t), 0, (struct sockaddr*)&(toSocket.getSocketAddress()), sizeof(sockaddr_in)) <
117  (int)(buffer.size() * sizeof(uint32_t)))
118  {
119  __COUT__ << "Error writing buffer for port " << ntohs(socketAddress_.sin_port) << std::endl;
120  return -1;
121  }
122  return 0;
123 }