otsdaq_utilities  v2_05_02_indev
MF_ReceiveAndForward.cpp
1 // MF_ReceiveAndForward.cpp
2 // by rrivera at fnal dot gov
3 // created Feb 2016
4 //
5 // This is a simple UDP receive and forward program
6 // for MessageFacility packets.
7 //
8 // It echos packets received and only appends '|' as decoration.
9 //
10 //
11 //
12 //
13 // compile with:
14 // g++ MF_ReceiveAndForward.cpp -o MF_ReceiveAndForward.o
15 //
16 // if developing, consider appending -D_GLIBCXX_DEBUG to get more
17 // descriptive error messages
18 //
19 // run with:
20 //./MF_ReceiveAndForward.o <optional port file name>
21 //
22 //
23 // Port Config File Format:
24 // RECEIVING_PORT <port number>
25 // FORWARDING_PORT <port number>
26 //
27 //
28 
29 #include <arpa/inet.h>
30 #include <errno.h>
31 #include <netdb.h>
32 #include <netinet/in.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <string.h>
36 #include <sys/socket.h>
37 #include <sys/types.h>
38 #include <unistd.h>
39 #include <iostream>
40 
41 #define MAXBUFLEN 5000
42 
43 #define __MF_SUBJECT__ "mfReceiveAndForward"
44 #define Q(X) #X
45 #define QUOTE(X) Q(X)
46 
47 // take filename only after srcs/ (this gives by repo name)
48 #define __SHORTFILE__ \
49  (strstr(&__FILE__[0], "/srcs/") ? strstr(&__FILE__[0], "/srcs/") + 6 : __FILE__)
50 
51 // take only file name
52 #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
53 
54 #define __E__ std::endl
55 
56 #define __COUT_HDR_F__ __SHORTFILE__ << "\t"
57 #define __COUT_HDR_L__ "[" << std::dec << __LINE__ << "]\t"
58 #define __COUT_HDR_P__ __PRETTY_FUNCTION__ << "\t"
59 #define __COUT_HDR_FL__ __SHORTFILE__ << " " << __COUT_HDR_L__
60 #define __COUT_HDR_FP__ __SHORTFILE__ << " : " << __COUT_HDR_P__
61 #define __COUT_HDR__ __COUT_HDR_FL__
62 
63 #define __COUT_TYPE__(X) std::cout << QUOTE(X) << ":" << __MF_SUBJECT__ << ":"
64 
65 #define __COUT_ERR__ __COUT_TYPE__(LogError) << __COUT_HDR__
66 #define __COUT_WARN__ __COUT_TYPE__(LogWarning) << __COUT_HDR__
67 #define __COUT_INFO__ __COUT_TYPE__(LogInfo) << __COUT_HDR__
68 #define __COUT__ __COUT_TYPE__(LogDebug) << __COUT_HDR__
69 #define __COUTV__(X) __COUT__ << QUOTE(X) << " = " << X << __E__
70 
71 // get sockaddr, IPv4 or IPv6:
72 void* get_in_addr(struct sockaddr* sa)
73 {
74  if(sa->sa_family == AF_INET)
75  {
76  return &(((struct sockaddr_in*)sa)->sin_addr);
77  }
78 
79  return &(((struct sockaddr_in6*)sa)->sin6_addr);
80 }
81 
82 int makeSocket(const char* ip, int port, struct addrinfo*& p)
83 {
84  int sockfd;
85  struct addrinfo hints, *servinfo;
86  int rv;
87  //int numbytes;
88  //struct sockaddr_storage their_addr;
89  //socklen_t addr_len;
90  //char s[INET6_ADDRSTRLEN];
91 
92  memset(&hints, 0, sizeof hints);
93  hints.ai_family = AF_UNSPEC;
94  hints.ai_socktype = SOCK_DGRAM;
95  char portStr[10];
96  sprintf(portStr, "%d", port);
97  if((rv = getaddrinfo(ip, portStr, &hints, &servinfo)) != 0)
98  {
99  fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
100  return 1;
101  }
102 
103  // loop through all the results and make a socket
104  for(p = servinfo; p != NULL; p = p->ai_next)
105  {
106  if((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1)
107  {
108  perror("sw: socket");
109  continue;
110  }
111 
112  break;
113  }
114 
115  if(p == NULL)
116  {
117  fprintf(stderr, "sw: failed to create socket\n");
118  return 2;
119  }
120 
121  freeaddrinfo(servinfo);
122 
123  // increase socket buffer size
124  unsigned int socketReceiveBufferSize = 0xA00000;
125  if(setsockopt(sockfd,
126  SOL_SOCKET,
127  SO_RCVBUF,
128  (char*)&socketReceiveBufferSize,
129  sizeof(socketReceiveBufferSize)) < 0)
130  __COUT_ERR__ << "Failed to set socket receive size to 0x" << std::hex
131  << socketReceiveBufferSize << std::dec
132  << ". Attempting to revert to default." << std::endl;
133  else
134  __COUT__ << "set socket receive size to 0x" << std::hex << socketReceiveBufferSize
135  << std::dec << "." << __E__;
136 
137  return sockfd;
138 }
139 
140 int main(int argc, char** argv)
141 {
142  __COUT__ << "Starting...\n\n" << __E__;
143 
144  std::string myPort_("3000"); // set default
145  std::string myFwdPort_("3001"); // set default
146  std::string myFwdIP_("127.0.0.1"); // set default
147  if(argc >= 2)
148  {
149  __COUT__ << "port parameter file:" << argv[1] << "\n\n" << __E__;
150  FILE* fp = fopen(argv[1], "r");
151  if(fp)
152  {
153  char tmp[100];
154  char tmpParamStr[100];
155  fgets(tmp, 100, fp);
156  sscanf(tmp, "%*s %s", tmpParamStr);
157  myPort_ = tmpParamStr;
158  fgets(tmp, 100, fp);
159  sscanf(tmp, "%*s %s", tmpParamStr);
160  myFwdPort_ = tmpParamStr;
161  fgets(tmp, 100, fp);
162  sscanf(tmp, "%*s %s", tmpParamStr);
163  myFwdIP_ = tmpParamStr;
164  fclose(fp);
165  }
166  else // else use defaults
167  __COUT__ << "port parameter file failed to open: " << argv[1] << "\n\n"
168  << __E__;
169  }
170  __COUT__ << "Forwarding from: " << myPort_ << " to: " << myFwdIP_ << ":" << myFwdPort_
171  << "\n\n"
172  << __E__;
173 
174  int myFwdPort;
175  sscanf(myFwdPort_.c_str(), "%d", &myFwdPort);
176 
177  int sockfd;
178  int sendSockfd = 0;
179 
180  struct addrinfo hints, *servinfo, *p;
181  int rv;
182  int numbytes;
183  struct sockaddr_storage their_addr;
184  char buff[MAXBUFLEN];
185  socklen_t addr_len;
186  //char s[INET6_ADDRSTRLEN];
187 
188  memset(&hints, 0, sizeof hints);
189  hints.ai_family = AF_UNSPEC; // set to AF_INET to force IPv4
190  hints.ai_socktype = SOCK_DGRAM;
191  hints.ai_flags = AI_PASSIVE; // use my IP
192 
193  if((rv = getaddrinfo(NULL, myPort_.c_str(), &hints, &servinfo)) != 0)
194  {
195  fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
196  return 1;
197  }
198 
199  // loop through all the results and bind to the first we can
200  for(p = servinfo; p != NULL; p = p->ai_next)
201  {
202  if((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1)
203  {
204  __COUT__ << "listener: socket...\n\n" << __E__;
205  perror("listener: socket");
206  continue;
207  }
208 
209  if(bind(sockfd, p->ai_addr, p->ai_addrlen) < 0 )
210  {
211  close(sockfd);
212  __COUT__ << "listener: bind.\n\n" << __E__;
213  perror("listener: bind");
214  continue;
215  }
216 
217  break;
218  }
219 
220  if(p == NULL)
221  {
222  __COUT__ << "listener: failed to bind socket...\n\n" << __E__;
223  fprintf(stderr, "listener: failed to bind socket\n");
224  return 2;
225  }
226 
227  freeaddrinfo(servinfo);
228 
232 
233  // hardware "registers"
234  //uint64_t data_gen_cnt = 0;
235  //uint64_t data_gen_rate = 1 << 16;
236  //uint8_t dataEnabled = 0;
237 
238  //const unsigned int RX_ADDR_OFFSET = 2;
239  //const unsigned int RX_DATA_OFFSET = 10;
240  //const unsigned int TX_DATA_OFFSET = 2;
241 
242  //unsigned int packetSz;
243  unsigned int pingCounter = 0;
244 
245  // for timeout/select
246  struct timeval tv;
247  fd_set readfds, masterfds;
248  tv.tv_sec = 0;
249  tv.tv_usec = 500000;
250  FD_ZERO(&masterfds);
251  FD_SET(sockfd, &masterfds);
252 
253  //time_t count = 0;
254 
255  int mf_p, mf_i, mf_j; // for extracting message
256  const int MF_POS_OF_TYPE = 5;
257  const int MF_POS_OF_MSG = 11;
258  bool firstPartPresent;
259 
260  // this should ip/port of Console xdaq app Receiver port
261  sendSockfd = makeSocket(myFwdIP_.c_str(), myFwdPort, p);
262 
263  while(1)
264  {
265  readfds = masterfds; // copy to reset timeout select
266  select(sockfd + 1, &readfds, NULL, NULL, &tv);
267 
268  if(FD_ISSET(sockfd, &readfds))
269  {
270  pingCounter = 0; // reset ping counter
271 
272  // packet received
273  // cout << "hw: Line " << __LINE__ << ":::" << "Packet Received!" << endl;
274 
275  addr_len = sizeof their_addr;
276  if((numbytes = recvfrom(sockfd,
277  buff,
278  MAXBUFLEN - 1,
279  0,
280  (struct sockaddr*)&their_addr,
281  &addr_len)) == -1)
282  {
283  __COUT__ << "error: recvfrom...\n\n" << __E__;
284  perror("recvfrom");
285  exit(1);
286  }
287 
288  // printf("hw: got packet from %s\n",
289  // inet_ntop(their_addr.ss_family,
290  // get_in_addr((struct sockaddr *)&their_addr),
291  // s, sizeof s));
292  // printf("hw: packet is %d bytes long\n", numbytes);
293  // printf("packet contents: ");
294  //
295  // for(int i=0;i<numbytes;++i)
296  // {
297  // if((i-RX_ADDR_OFFSET)%8==0) printf("\n");
298  // printf("%2.2X", (unsigned char)buff[i]);
299  // }
300  // printf("\n");
301 
302  // print message without decoration
303  // find position of message and save to p
304  // by jumping to the correct '|' marker
305  buff[numbytes] = '\0'; // make sure it is null terminated
306 
307  // DEBUG -- for identifying strange MessageFacility bug with clipped messages
308  // std::cout << "+" << ((int)strlen(buff)==numbytes?1:0) << " " << buff <<
309  // __E__; if((int)strlen(buff)!=numbytes)
310  // {
311  // for(int iii=strlen(buff)-3;iii<numbytes;++iii)
312  // std::cout << (int)buff[iii] << "-" << (char)buff[iii] << "
313  //"; std::cout << numbytes << " " << strlen(buff) << __E__;
314  // std::cout << __E__;
315  // }
316 
317  // count markers to find message
318 
319  // std::cout << "|||" << buff << __E__; // show all
320  // e.g. UDPMFMESSAGE7370|01-Jul-2019 11:12:44
321  // CDT|3|correlator2.fnal.gov|131.225.52.45|Info|_TCPConnect|xdaq.exe|7370|Booted|DAQ|TCPConnect.cc|241|Resolving
322  // ip correlator2.fnal.gov
323 
324  for(mf_p = 0, mf_i = 0; mf_i < numbytes && mf_p < MF_POS_OF_TYPE; ++mf_i)
325  if(buff[mf_i] == '|')
326  ++mf_p; // count markers
327 
328  for(mf_j = mf_i; mf_j < numbytes && mf_p < MF_POS_OF_TYPE + 1; ++mf_j)
329  if(buff[mf_j] == '|')
330  ++mf_p; // count markers
331 
332  // print first part (message type)
333  if(mf_i < mf_j && mf_j < numbytes)
334  {
335  char sav = buff[mf_j - 1];
336  buff[mf_j - 1] = '\0';
337  std::cout << &buff[mf_i - 1];
338  buff[mf_j - 1] = sav;
339 
340  // tab for all types but Warning
341  if(strcmp(&buff[mf_i - 1], "|Warning") != 0)
342  std::cout << "\t";
343 
344  firstPartPresent = true;
345  }
346  else
347  firstPartPresent = false;
348 
349  for(mf_i = mf_j; mf_i < numbytes && mf_p < MF_POS_OF_MSG; ++mf_i)
350  if(buff[mf_i] == '|')
351  ++mf_p; // count markers
352 
353  // print second part
354  if(mf_i < numbytes) // if valid find, show message
355  std::cout << &buff[mf_i - 1] << __E__; // show msg after '|'
356  else if(firstPartPresent)
357  std::cout << __E__;
358 
359  // forward packet onto sendSockfd
360 
361  if((numbytes = sendto(
362  sendSockfd, buff, numbytes, 0, p->ai_addr, p->ai_addrlen)) == -1)
363  {
364  __COUT__ << "error: sendto...\n\n" << __E__;
365  perror("hw: sendto");
366  exit(1);
367  }
368  // printf("hw: sent %d bytes on\n", numbytes);
369  }
370  else
371  {
372  sleep(1); // one second
373  if(++pingCounter > 2 * 60) // two minutes
374  {
375  // send 1-byte "ping" to keep socket alive
376  if((numbytes =
377  sendto(sendSockfd, buff, 1, 0, p->ai_addr, p->ai_addrlen)) == -1)
378  {
379  __COUT__ << "error: ping sendto...\n\n" << __E__;
380  perror("hw: ping sendto");
381  exit(1);
382  }
383  pingCounter = 0;
384  }
385  }
386  }
387 
388  close(sockfd);
389  close(sendSockfd);
390 
391  __COUT__ << "Exited.\n\n" << __E__;
392 
393  return 0;
394 }