tdaq-develop-2025-02-12
MF_ReceiveAndForward.cpp
1 #include <arpa/inet.h>
30 #include <errno.h>
31 #include <netdb.h>
32 #include <netinet/in.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <string.h>
36 #include <sys/socket.h>
37 #include <sys/types.h>
38 #include <unistd.h>
39 #include <iostream>
40 #include <map>
41 
42 #define MAXBUFLEN 5000
43 
44 #define __MF_SUBJECT__ "mfReceiveAndForward"
45 
46 #define __SHORTFILE__ \
47  (__builtin_strstr(&__FILE__[0], "/srcs/") \
48  ? __builtin_strstr(&__FILE__[0], "/srcs/") + 6 \
49  : __FILE__)
50 #define __COUT_HDR_L__ "[" << std::dec << __LINE__ << " |\t"
51 #define __COUT_HDR_FL__ __SHORTFILE__ << " " << __COUT_HDR_L__
52 
53 #define __COUT_TYPE__(X) std::cout << QUOTE(X) << ":" << __MF_SUBJECT__ << ":"
54 #define __COUT_ERR__ __COUT_TYPE__(LogError) << __COUT_HDR_FL__
55 #define __COUT_WARN__ __COUT_TYPE__(LogWarning) << __COUT_HDR_FL__
56 #define __COUT_INFO__ __COUT_TYPE__(LogInfo) << __COUT_HDR_FL__
57 #define __COUT__ __COUT_TYPE__(LogDebug) << __COUT_HDR_FL__
58 
59 #define __SS__ \
60  std::stringstream ss; \
61  ss << "|" << __MF_DECOR__ << ": " << __COUT_HDR_FL__
62 #define __SS_THROW__ \
63  { \
64  __COUT_ERR__ << "\n" << ss.str(); \
65  throw std::runtime_error(ss.str()); \
66  } //put in {}'s to prevent surprises, e.g. if ... else __SS_THROW__;
67 #define __E__ std::endl
68 #define Q(X) #X
69 #define QUOTE(X) Q(X)
70 #define __COUTV__(X) __COUT__ << QUOTE(X) << " = " << X << __E__
71 
73 void* get_in_addr(struct sockaddr* sa)
74 {
75  if(sa->sa_family == AF_INET)
76  {
77  return &(((struct sockaddr_in*)sa)->sin_addr);
78  }
79 
80  return &(((struct sockaddr_in6*)sa)->sin6_addr);
81 }
82 
83 int makeSocket(const char* ip, int port, struct addrinfo*& p)
84 {
85  int sockfd;
86  struct addrinfo hints, *servinfo;
87  int rv;
88  // char s[INET6_ADDRSTRLEN];
89 
90  memset(&hints, 0, sizeof hints);
91  hints.ai_family = AF_UNSPEC;
92  hints.ai_socktype = SOCK_DGRAM;
93  char portStr[10];
94  sprintf(portStr, "%d", port);
95  if((rv = getaddrinfo(ip, portStr, &hints, &servinfo)) != 0)
96  {
97  fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
98  return 1;
99  }
100 
101  // loop through all the results and make a socket
102  for(p = servinfo; p != NULL; p = p->ai_next)
103  {
104  if((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1)
105  {
106  perror("sw: socket");
107  continue;
108  }
109 
110  break;
111  }
112 
113  if(p == NULL)
114  {
115  fprintf(stderr, "sw: failed to create socket\n");
116  return 2;
117  }
118 
119  freeaddrinfo(servinfo);
120 
121  return sockfd;
122 }
123 
124 int main(int argc, char** argv)
125 {
126  __COUT__ << "Starting...\n\n" << __E__;
127 
128  std::string myPort_("3000"); // set default
129  std::string myFwdPort_("3001"); // set default
130  std::string myFwdIP_("127.0.0.1"); // set default
131  std::string enablePrintouts_("do not"); // set default
132  if(argc >= 2)
133  {
134  __COUT__ << "port parameter file:" << argv[1] << "\n\n" << __E__;
135  FILE* fp = fopen(argv[1], "r");
136  if(fp)
137  {
138  char tmp[100];
139  char tmpParamStr[100];
140  fgets(tmp, 100, fp);
141  sscanf(tmp, "%*s %s", tmpParamStr);
142  myPort_ = tmpParamStr;
143  fgets(tmp, 100, fp);
144  sscanf(tmp, "%*s %s", tmpParamStr);
145  myFwdPort_ = tmpParamStr;
146  fgets(tmp, 100, fp);
147  sscanf(tmp, "%*s %s", tmpParamStr);
148  myFwdIP_ = tmpParamStr;
149  fgets(tmp, 100, fp);
150  sscanf(tmp, "%*s %s", tmpParamStr);
151  enablePrintouts_ = tmpParamStr;
152  fclose(fp);
153  }
154  else // else use defaults
155  __COUT__ << "port parameter file failed to open: " << argv[1] << "\n\n"
156  << __E__;
157  }
158  __COUT__ << "Forwarding from: " << myPort_ << " to: " << myFwdIP_ << ":" << myFwdPort_
159  << "\n\n"
160  << __E__;
161 
162  int myFwdPort;
163  sscanf(myFwdPort_.c_str(), "%d", &myFwdPort);
164 
165  bool enablePrintouts = enablePrintouts_ == "enablePrintouts";
166 
167  int sockfd;
168  int sendSockfd = 0;
169 
170  struct addrinfo hints, *servinfo, *p;
171  int rv;
172  int recvBytes, sentBytes;
173  struct sockaddr_storage their_addr;
174  char buff[MAXBUFLEN * 3];
175  socklen_t addr_len;
176  // char s[INET6_ADDRSTRLEN];
177 
178  memset(&hints, 0, sizeof hints);
179  hints.ai_family = AF_UNSPEC; // set to AF_INET to force IPv4
180  hints.ai_socktype = SOCK_DGRAM;
181  hints.ai_flags = AI_PASSIVE; // use my IP
182 
183  if((rv = getaddrinfo(NULL, myPort_.c_str(), &hints, &servinfo)) != 0)
184  {
185  fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
186  return 1;
187  }
188 
189  // loop through all the results and bind to the first we can
190  for(p = servinfo; p != NULL; p = p->ai_next)
191  {
192  if((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1)
193  {
194  __COUT__ << "listener: socket...\n\n" << __E__;
195  perror("listener: socket");
196  continue;
197  }
198 
199  if(bind(sockfd, p->ai_addr, p->ai_addrlen) < 0)
200  {
201  close(sockfd);
202  __COUT__ << "listener: bind.\n\n" << __E__;
203  perror("listener: bind");
204  continue;
205  }
206 
207  break;
208  }
209 
210  if(p == NULL)
211  {
212  __COUT__ << "listener: failed to bind socket...\n\n" << __E__;
213  fprintf(stderr, "listener: failed to bind socket\n");
214  return 2;
215  }
216 
217  freeaddrinfo(servinfo);
218 
219  // increase socket buffer size
220  unsigned int socketReceiveBufferSize = 0x1400000;
221  if(setsockopt(sockfd,
222  SOL_SOCKET,
223  SO_RCVBUF,
224  (char*)&socketReceiveBufferSize,
225  sizeof(socketReceiveBufferSize)) < 0)
226  __COUT_ERR__ << "Failed to set socket receive size to 0x" << std::hex
227  << socketReceiveBufferSize << std::dec
228  << ". Attempting to revert to default." << std::endl;
229  else
230  __COUT__ << "set socket receive size to 0x" << std::hex << socketReceiveBufferSize
231  << std::dec << "." << __E__;
232 
233  int socketLength = 0;
234  socklen_t sizeOfSocketLength = sizeof(socketLength);
235  if(getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &socketLength, &sizeOfSocketLength) < 0)
236  __COUT_ERR__ << "Failed to set socket receive size to 0x" << std::hex
237  << socketReceiveBufferSize << std::dec
238  << ". Attempting to revert to default." << std::endl;
239  else
240  __COUT__ << "set socket receive size verified at 0x" << std::hex << socketLength
241  << std::dec << "." << __E__;
242 
246 
247  // hardware "registers"
248  // uint64_t data_gen_cnt = 0;
249  // uint64_t data_gen_rate = 1 << 16;
250  // uint8_t dataEnabled = 0;
251 
252  // const unsigned int RX_ADDR_OFFSET = 2;
253  // const unsigned int RX_DATA_OFFSET = 10;
254  // const unsigned int TX_DATA_OFFSET = 2;
255 
256  // unsigned int packetSz;
257  unsigned int pingCounter = 0;
258 
259  // for timeout/select
260  struct timeval tv;
261  fd_set readfds, masterfds;
262  tv.tv_sec = 0;
263  tv.tv_usec = 500000;
264  FD_ZERO(&masterfds);
265  FD_SET(sockfd, &masterfds);
266 
267  // time_t count = 0;
268 
269  int mf_p, mf_i, mf_j; // for extracting message
270  const int MF_POS_OF_ID = 2;
271  const int MF_POS_OF_TYPE = 5;
272  const int MF_POS_OF_MSG = 11;
273  bool firstPartPresent;
274  unsigned int buffi = 0, buffStarti = 0, packCount = 1;
275  char saveChar = '\0', saveChar2;
276 
277  int mf_labeli, mf_labelsi;
278  unsigned int newSequenceId;
279  unsigned int processId;
280 
281  std::map<unsigned int, unsigned int> sourceLastSequenceID; // map from sourceID to
282  // lastSequenceID to
283  // identify missed messages
284 
285  // this should ip/port of Console xdaq app Receiver port
286  sendSockfd = makeSocket(myFwdIP_.c_str(), myFwdPort, p);
287 
288  while(1)
289  {
290  readfds = masterfds; // copy to reset timeout select
291  select(sockfd + 1, &readfds, NULL, NULL, &tv);
292 
293  if(FD_ISSET(sockfd, &readfds))
294  {
295  pingCounter = 0; // reset ping counter
296 
297  // packet received
298  // cout << "hw: Line " << __LINE__ << ":::" << "Packet Received!" << endl;
299 
300  addr_len = sizeof their_addr;
301  if((recvBytes = recvfrom(sockfd,
302  &buff[buffi],
303  MAXBUFLEN - 1,
304  0,
305  (struct sockaddr*)&their_addr,
306  &addr_len)) == -1)
307  {
308  __COUT__ << "error: recvfrom...\n\n" << __E__;
309  perror("recvfrom");
310  exit(1);
311  }
312 
313  // printf("hw: got packet from %s\n",
314  // inet_ntop(their_addr.ss_family,
315  // get_in_addr((struct sockaddr *)&their_addr),
316  // s, sizeof s));
317  // printf("hw: packet is %d bytes long\n", recvBytes);
318  // printf("packet contents: ");
319  //
320  // for(int i=0;i<recvBytes;++i)
321  // {
322  // if((i-RX_ADDR_OFFSET)%8==0) printf("\n");
323  // printf("%2.2X", (unsigned char)buff[i]);
324  // }
325  // printf("\n");
326 
327  // print message without decoration
328  // find position of message and save to p
329  // by jumping to the correct '|' marker
330  buff[buffi + recvBytes] = '\0'; // make sure it is null terminated
331 
332  // DEBUG -- for identifying strange MessageFacility bug with clipped messages
333  // std::cout << "+" << ((int)strlen(buff)==recvBytes?1:0) << " " << buff <<
334  // __E__; if((int)strlen(buff)!=recvBytes)
335  // {
336  // for(int iii=strlen(buff)-3;iii<recvBytes;++iii)
337  // std::cout << (int)buff[iii] << "-" << (char)buff[iii] << "
338  //"; std::cout << recvBytes << " " << strlen(buff) << __E__;
339  // std::cout << __E__;
340  // }
341 
342  // count markers to find message
343 
344  if(enablePrintouts)
345  std::cout << "|||" << &buff[buffi] << __E__; // show all
346  // e.g. UDPMFMESSAGE7370|01-Jul-2019 11:12:44
347  // CDT|3|correlator2.fnal.gov|131.225.52.45|Info|_TCPConnect|xdaq.exe|7370|Booted|DAQ|TCPConnect.cc|241|Resolving
348  // ip correlator2.fnal.gov
349 
350  if(1)
351  {
352  // get sequence ID
353  mf_labeli = 0;
354  mf_labelsi = 0;
355  for(mf_p = 0, mf_i = 0; mf_i < recvBytes && mf_p < MF_POS_OF_ID; ++mf_i)
356  if(buff[buffi + mf_i] == '|')
357  {
358  ++mf_p; // count markers
359  if(!mf_labeli)
360  mf_labeli = mf_i;
361  }
362  else if(!mf_labeli &&
363  !(buff[buffi + mf_i] >= '0' && buff[buffi + mf_i] <= '9'))
364  mf_labelsi = mf_i;
365  for(mf_j = mf_i; mf_j < recvBytes; ++mf_j)
366  if(buff[buffi + mf_j] == '|')
367  {
368  break;
369  }
370 
371  saveChar2 = buff[buffi + mf_labeli];
372  buff[buffi + mf_labeli] = '\0';
373  if(mf_j < recvBytes)
374  {
375  saveChar = buff[buffi + mf_j];
376  buff[buffi + mf_j] = '\0';
377  }
378  newSequenceId = atoi(&buff[buffi + mf_i]);
379  processId = atoi(&buff[buffi + mf_labelsi + 1]);
380 
381  // std::cout << processId << ": " <<
382  // &buff[buffi + mf_i] << " ==> " << newSequenceId << __E__;
383  // avoid startup sequencing problem with MF
384  if( //(newSequenceId == 1 && sourceLastSequenceID[processId] == 5) &&
386  sourceLastSequenceID.find(processId) !=
387  sourceLastSequenceID.end() && // ensure not first packet received
388  ((newSequenceId == 0 && sourceLastSequenceID[processId] !=
389  (unsigned int)-1) || // wrap around case
390  newSequenceId !=
391  sourceLastSequenceID[processId] + 1)) // normal sequence case
392  {
393  // missed some messages!
394  std::cout << "MFfwd missed " << newSequenceId << " vs "
395  << sourceLastSequenceID[processId] + 1 << " "
396  << (newSequenceId - 1) -
397  (sourceLastSequenceID[processId] + 1) + 1
398  << " packet(s) from " << processId << "!" << __E__;
399  }
400  // std::cout << &buff[buffi + mf_i - 1] << "|" << newSequenceId << " vs "
401  // << sourceLastSequenceID[&buff[buffi + 0]] << "[" << &buff[buffi +
402  // 0]
403  // <<
404  // "]||" << &buff[buffi + mf_j + 1] << __E__; // show all w/sequence ID
405 
406  // save the new last sequence ID
407  sourceLastSequenceID[processId] = newSequenceId;
408 
409  // resolve nullchars
410  if(mf_j < recvBytes)
411  buff[buffi + mf_j] = saveChar;
412  buff[buffi + mf_labeli] = saveChar2;
413  }
414 
415  if(1 || packCount > 10000)
416  {
417  for(mf_p = 0, mf_i = 0; mf_i < recvBytes && mf_p < MF_POS_OF_TYPE; ++mf_i)
418  if(buff[buffi + mf_i] == '|')
419  ++mf_p; // count markers
420 
421  for(mf_j = mf_i; mf_j < recvBytes && mf_p < MF_POS_OF_TYPE + 1; ++mf_j)
422  if(buff[buffi + mf_j] == '|')
423  ++mf_p; // count markers
424 
425  // print first part (message type)
426  if(mf_i < mf_j && mf_j < recvBytes)
427  {
428  saveChar = buff[buffi + mf_j - 1];
429  buff[buffi + mf_j - 1] = '\0';
430  // std::cout << &buff[buffi + mf_i - 1];
431  buff[buffi + mf_j - 1] = saveChar;
432 
433  // // tab for all types but Warning
434  // if(strcmp(&buff[buffi + mf_i - 1], "|Warning") != 0)
435  // std::cout << "\t";
436 
437  firstPartPresent = true;
438  }
439  else
440  firstPartPresent = false;
441 
442  for(mf_i = mf_j; mf_i < recvBytes && mf_p < MF_POS_OF_MSG; ++mf_i)
443  if(buff[buffi + mf_i] == '|')
444  ++mf_p; // count markers
445 
446  if(packCount > 10000)
447  {
448  // print second part
449  if(mf_i < recvBytes) // if valid find, show message
450  std::cout << &buff[buffi + mf_i - 1]
451  << __E__; // show msg after '|'
452  else if(firstPartPresent)
453  std::cout << __E__;
454  }
455  }
456 
457  // forward packet onto sendSockfd
458 
459  // forward when we have a full packet
460  if(buffi + recvBytes - buffStarti > MAXBUFLEN)
461  {
462  __COUT__ << "Pack count ==> " << packCount
463  << " sz=" << buffi - buffStarti - 1 << __E__;
464  // send buffer without closing null character
465  if((sentBytes = sendto(sendSockfd,
466  &buff[buffStarti],
467  buffi - buffStarti - 1,
468  0,
469  p->ai_addr,
470  p->ai_addrlen)) == -1)
471  {
472  __COUT__ << "error: sendto...\n\n" << __E__;
473  perror("hw: sendto");
474  exit(1);
475  }
476  // printf("hw: sent %d bytes on\n", sentBytes);
477 
478  // now setup for next buffer
479  buffStarti = buffi;
480  buffi += recvBytes + 1; // go past null character
481  packCount = 1;
482 
483  // if past point for safe receive of next packet, then lets bail on it now
484  // and send
485  if(buffi > MAXBUFLEN * 2)
486  {
487  __COUT__ << "Wrap Pack count ==> " << packCount
488  << " sz=" << buffi - buffStarti - 1 << __E__;
489  if((sentBytes = sendto(sendSockfd,
490  &buff[buffStarti],
491  buffi - buffStarti - 1,
492  0,
493  p->ai_addr,
494  p->ai_addrlen)) == -1)
495  {
496  __COUT__ << "error: sendto...\n\n" << __E__;
497  perror("hw: sendto");
498  exit(1);
499  }
500  // reset for next buffer
501  buffStarti = 0;
502  buffi = 0;
503  packCount = 1;
504  }
505  } // end handle full packet
506  else // we do not have a full packet, so setup buff for next packet
507  {
508  buffi += recvBytes + 1; // go past null character
509  ++packCount;
510 
511  // bail on decent sized packet, so buffer can return to start efficiently
512  if(buffi > MAXBUFLEN && packCount > 12)
513  {
514  __COUT__ << "WrapAvoid Pack count ==> " << packCount
515  << " sz=" << buffi - buffStarti - 1 << __E__;
516  if((sentBytes = sendto(sendSockfd,
517  &buff[buffStarti],
518  buffi - buffStarti - 1,
519  0,
520  p->ai_addr,
521  p->ai_addrlen)) == -1)
522  {
523  __COUT__ << "error: sendto...\n\n" << __E__;
524  perror("hw: sendto");
525  exit(1);
526  }
527  // reset for next buffer
528  buffStarti = 0;
529  buffi = 0;
530  packCount = 1;
531  }
532  }
533 
534  } // end received packet case
535  else
536  {
537  sleep(1); // one second
538  if(++pingCounter > 2 * 60) // two minutes
539  {
540  // send 1-byte "ping" to keep socket alive
541  if((sentBytes =
542  sendto(sendSockfd, buff, 1, 0, p->ai_addr, p->ai_addrlen)) == -1)
543  {
544  __COUT__ << "error: ping sendto...\n\n" << __E__;
545  perror("hw: ping sendto");
546  exit(1);
547  }
548  pingCounter = 0;
549  }
550  else if(pingCounter > 1 && packCount > 1)
551  {
552  // forward partial packet while idle
553  __COUT__ << "Partial Pack count ==> " << packCount - 1
554  << " sz=" << buffi - buffStarti - 1 << __E__;
555 
556  if((sentBytes = sendto(sendSockfd,
557  &buff[buffStarti],
558  buffi - buffStarti - 1,
559  0,
560  p->ai_addr,
561  p->ai_addrlen)) == -1)
562  {
563  __COUT__ << "error: sendto...\n\n" << __E__;
564  perror("hw: sendto");
565  exit(1);
566  }
567  // reset for next buffer
568  buffStarti = 0;
569  buffi = 0;
570  packCount = 1;
571  }
572  } // end no packet case
573  } // end main loop
574 
575  close(sockfd);
576  close(sendSockfd);
577 
578  __COUT__ << "Exited.\n\n" << __E__;
579 
580  return 0;
581 }