Libtrap: Internal development docs  1.16.1
ifc_tcpip.c
Go to the documentation of this file.
1 /**
2  * \file ifc_tcpip.c
3  * \brief TRAP TCP/IP interfaces
4  * \author Tomas Cejka <cejkat@cesnet.cz>
5  * \date 2018
6  */
7 /*
8  * Copyright (C) 2013-2018 CESNET
9  *
10  * LICENSE TERMS
11  *
12  * Redistribution and use in source and binary forms, with or without
13  * modification, are permitted provided that the following conditions
14  * are met:
15  * 1. Redistributions of source code must retain the above copyright
16  * notice, this list of conditions and the following disclaimer.
17  * 2. Redistributions in binary form must reproduce the above copyright
18  * notice, this list of conditions and the following disclaimer in
19  * the documentation and/or other materials provided with the
20  * distribution.
21  * 3. Neither the name of the Company nor the names of its contributors
22  * may be used to endorse or promote products derived from this
23  * software without specific prior written permission.
24  *
25  * ALTERNATIVELY, provided that this notice is retained in full, this
26  * product may be distributed under the terms of the GNU General Public
27  * License (GPL) version 2 or later, in which case the provisions
28  * of the GPL apply INSTEAD OF those given above.
29  *
30  * This software is provided ``as is'', and any express or implied
31  * warranties, including, but not limited to, the implied warranties of
32  * merchantability and fitness for a particular purpose are disclaimed.
33  * In no event shall the company or contributors be liable for any
34  * direct, indirect, incidental, special, exemplary, or consequential
35  * damages (including, but not limited to, procurement of substitute
36  * goods or services; loss of use, data, or profits; or business
37  * interruption) however caused and on any theory of liability, whether
38  * in contract, strict liability, or tort (including negligence or
39  * otherwise) arising in any way out of the use of this software, even
40  * if advised of the possibility of such damage.
41  *
42  */
43 
44 #define _GNU_SOURCE
45 #include <arpa/inet.h>
46 #include <netdb.h>
47 #include <netinet/in.h>
48 #include <stdio.h>
49 #include <stdlib.h>
50 #include <string.h>
51 #include <sys/socket.h>
52 #include <sys/un.h>
53 #include <sys/types.h>
54 #include <sys/stat.h>
55 #include <unistd.h>
56 #include <fcntl.h>
57 #include <inttypes.h>
58 #include <stdio.h>
59 #include <pthread.h>
60 #include <errno.h>
61 #include <semaphore.h>
62 #include <assert.h>
63 #include <poll.h>
64 
65 #include "../include/libtrap/trap.h"
66 #include "trap_internal.h"
67 #include "trap_ifc.h"
68 #include "trap_error.h"
69 #include "ifc_tcpip.h"
70 #include "ifc_tcpip_internal.h"
71 #include "ifc_socket_common.h"
72 
73 /**
74  * \addtogroup trap_ifc TRAP communication module interface
75  * @{
76  */
77 /**
78  * \addtogroup tcpip_ifc TCP/IP and UNIX socket communication interface module
79  * @{
80  */
81 
/* Upper bound for the `recovery` counter of tcpip_receiver_recv(); in the
 * visible part of this file it is referenced only under LIMITED_RECOVERY. */
82 #define MAX_RECOVERY_TRY 10
83 /* must be smaller than 1000000 */
84 #define RECOVERY_WAIT_USEC 500000
/* Number of microseconds in one second. */
85 #define USEC_IN_SEC 1000000
/* NOTE(review): ACK_MESS_SIZE and CRIT_1VS2SEND are not referenced in the
 * visible part of this file; their meaning cannot be confirmed from here. */
86 #define ACK_MESS_SIZE 1
87 #define CRIT_1VS2SEND 10000
/* MAX/MIN expand their arguments more than once, so do not pass expressions
 * with side effects. They are only defined when not provided elsewhere. */
88 #ifndef MAX
89 #define MAX(a,b) ((a)<(b)?(b):(a))
90 #endif
91 #ifndef MIN
92 #define MIN(a,b) ((a)>(b)?(b):(a))
93 #endif
94 
95 /***** TCPIP server *****/
96 
97 /**
98  * Internal union for host address storage, common for tcpip & unix
99  */
101  struct addrinfo tcpip_addr; ///< used for TCPIP socket
102  struct sockaddr_un unix_addr; ///< used for path of UNIX socket
103 };
104 
105 /**
106  * Unix sockets for service IFC and UNIX IFC have default path format defined by UNIX_PATH_FILENAME_FORMAT
107  */
109 
/* Forward declarations of static helpers that are called before their
 * definition (server_socket_open() is not defined within the visible part
 * of this file). */
110 static int client_socket_connect(void *priv, const char *dest_addr, const char *dest_port, int *socket_descriptor, struct timeval *tv);
111 static void client_socket_disconnect(void *priv);
112 static int server_socket_open(void *priv);
113 
/**
 * \brief Pick the network address member out of a generic socket address.
 *
 * Only the address family is inspected: AF_INET yields the IPv4 member,
 * every other family is handled as IPv6.
 *
 * \param[in] sa socket address to inspect (must not be NULL)
 * \return pointer to the embedded sin_addr (IPv4) or sin6_addr (otherwise)
 */
static void *get_in_addr(struct sockaddr *sa)
{
   struct sockaddr_in *ipv4;
   struct sockaddr_in6 *ipv6;

   if (sa->sa_family != AF_INET) {
      ipv6 = (struct sockaddr_in6 *) sa;
      return &ipv6->sin6_addr;
   }

   ipv4 = (struct sockaddr_in *) sa;
   return &ipv4->sin_addr;
}
127 
128 /**
129  * \brief Check if the given port is a correct port number.
130  *
131  * Port number for TCP socket must be a number in the range from 1 to 65535.
132  * It can also be a service name that is translated by getaddrinfo().
133  *
134  * \param[in] port Port to check.
135  * \return EXIT_FAILURE if port is not given or it is a number < 1 or > 65535;
136  * EXIT_SUCCESS when port is a valid number or it is a service name.
137  */
138 static int check_portrange(const char *port)
139 {
140  uint32_t portnum = 0;
141  int ret;
142 
143  if (port == NULL) {
144  return EXIT_FAILURE;
145  }
146 
147  ret = sscanf(port, "%" SCNu32, &portnum);
148  if (ret == 1) {
149  if (portnum < 1 || portnum > 65535) {
150  VERBOSE(CL_ERROR, "Given port (%" PRIu32 ") number is out of the allowed range (1-65535).", portnum);
151  return EXIT_FAILURE;
152  }
153  }
154 
155  /* port is not number (it is a service name) or it is correct */
156  return EXIT_SUCCESS;
157 }
158 
159 /**
160  * \addtogroup tcpip_receiver
161  * @{
162  */
163 /* Receiver (client socket) */
164 // Receiver is a client that connects itself to the source of data (to sender) = server
165 
166 /**
167  * Receive chunk of data.
168  *
169  * Caller is responsible for checking elapsed time, since this function
170  * may finish before the given timeout without having data.
171  *
172  * \param[in] priv private IFC data
173  * \param[in,out] data pointer to the current write position; advanced past the received bytes
174  * \param[in,out] size expected size to wait for, it is used to return size that was not read
175  * \param[in] tm timeout for ppoll(); NULL means wait without a time limit
 * \return TRAP_E_OK when all requested bytes were received, TRAP_E_TIMEOUT
 * when ppoll() timed out or was interrupted (or recv() reported EAGAIN),
 * TRAP_E_IO_ERROR on ECONNRESET/EBADF/EPIPE (including orderly shutdown of
 * the peer) or on another ppoll() failure, TRAP_E_TERMINATED when the
 * interface is marked as terminated.
176  */
177 static int receive_part(void *priv, void **data, uint32_t *size, struct timeval *tm)
178 {
179  void *data_p = (*data);
/* NOTE(review): `config` is used below but its declaration is not present in
 * this listing (original line 180 is absent) - presumably `priv` cast to the
 * receiver private structure; confirm against the real source. */
181  ssize_t numbytes = *size;
182  int recvb, retval;
183  struct pollfd pfds;
184  struct timespec ts, *tempts = NULL;
185  if (tm != NULL) {
186  ts.tv_sec = tm->tv_sec;
187  ts.tv_nsec = tm->tv_usec * 1000l;
188  tempts = &ts;
189  }
190 
191  assert(data_p != NULL);
192 
/* Every branch of the loop body returns, so the body runs at most once. */
193  while (config->is_terminated == 0) {
194  DEBUG_IFC(if (tm) {VERBOSE(CL_VERBOSE_LIBRARY, "Try to receive data in timeout %" PRIu64
195  "s%"PRIu64"us", tm->tv_sec, tm->tv_usec)});
196 
197  /*
198  * Blocking or with timeout?
199  * With timeout 0,0 - non-blocking
200  */
201  pfds = (struct pollfd) {.fd = config->sd, .events = POLLIN};
202  retval = ppoll(&pfds, 1, tempts, NULL);
203  if (retval > 0 && pfds.revents & POLLIN) {
/* Keep calling recv() until the whole requested size has arrived. */
204  do {
205  recvb = recv(config->sd, data_p, numbytes, 0);
206  if (recvb < 1) {
207  if (recvb == 0) {
/* recv() == 0 means the peer closed the connection; handle it as EPIPE */
208  errno = EPIPE;
209  }
210  switch (errno) {
211  case EINTR:
212  if (config->is_terminated == 1) {
/* NOTE(review): original line 213 is absent from this listing. */
214  return TRAP_E_TERMINATED;
215  }
216  break;
217  case ECONNRESET:
218  case EBADF:
219  case EPIPE:
/* NOTE(review): original line 220 is absent from this listing. */
221  return TRAP_E_IO_ERROR;
222  case EAGAIN:
223  /* This should never happen with blocking socket. */
224  (*size) = numbytes;
225  (*data) = data_p;
226  return TRAP_E_TIMEOUT;
227  }
228  }
/* NOTE(review): these two statements are also reached when recv() failed
 * with EINTR (interface not terminated) or with an errno that the switch
 * does not list; recvb is then -1, so numbytes grows by one and data_p
 * moves one byte backwards - confirm and consider skipping them then.
 * Also note that `data_p += recvb` is arithmetic on `void *`, which is a
 * compiler extension rather than standard C. */
229  numbytes -= recvb;
230  data_p += recvb;
231  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "receive_part got %" PRId32 "B", recvb));
232  } while (numbytes > 0);
233  (*size) = numbytes;
234  (*data) = data_p;
235  return TRAP_E_OK;
236  } else if ((retval == 0) || (retval < 0 && errno == EINTR)) {
237  /* Timeout expired or signal received. Caller of this function
238  * has to decide to call this function again or not according
239  * to elapsed time from the calling. */
240  (*size) = numbytes;
241  return TRAP_E_TIMEOUT;
242  } else { // some error has occurred
243  VERBOSE(CL_VERBOSE_OFF, "ppoll() returned %i (%s)", retval, strerror(errno));
/* NOTE(review): original line 244 is absent from this listing. */
245  return TRAP_E_IO_ERROR;
246  }
247  }
248  return TRAP_E_TERMINATED;
249 }
250 
/**
 * Return current time in microseconds.
 *
 * This is used to get current timestamp in tcpip_receiver_recv() and tcpip_sender_send().
 * The value is read from CLOCK_MONOTONIC, so it is only meaningful for
 * measuring elapsed time between two calls.
 *
 * \return current timestamp
 */
static inline uint64_t get_cur_timestamp(void)
{
   struct timespec now = {0, 0};

   clock_gettime(CLOCK_MONOTONIC, &now);
   /* Widen the seconds before multiplying: time_t may be a 32-bit type, in
    * which case seconds * 1000000 would overflow.
    * Result = seconds * 10^6 + nanoseconds / 10^3. */
   return (uint64_t) now.tv_sec * UINT64_C(1000000) + (uint64_t) (now.tv_nsec / 1000);
}
266 
267 /**
268  * \brief Receive data from interface.
269  *
270  * It is expected that data is always the same pointer because it is buffer given by trap.c.
271  *
272  * This function contains finite state machine that controls receiving messages (header
273  * and payload), handles timeouts and sleep (to offload CPU during waiting for connection).
274  * The transition graph is:
275  * \dot
276  * digraph fsm { label="tcpip_receiver_recv()";labelloc=t;
277  * init -> conn_wait;
278  * init -> head_wait;
279  * init -> mess_wait;
280  * discard -> reset;
281  * reset -> init;
282  * reset -> init;
283  * reset -> reset;
284  * reset -> init;
285  * conn_wait -> reset;
286  * conn_wait -> head_wait;
287  * head_wait -> discard;
288  * head_wait -> reset;
289  * head_wait -> reset;
290  * head_wait -> mess_wait;
291  * mess_wait -> discard;
292  * mess_wait -> reset;
293  * }
294  * \enddot
295  *
296  * \param [in,out] priv private configuration structure
297  * \param [out] data where received data are stored
298  * \param [out] size size of received data
299  * \param [in] timeout timeout in usec, can be TRAP_WAIT, TRAP_HALFWAIT, or TRAP_NO_WAIT
300  * \return TRAP_E_OK (0) on success
301  */
302 int tcpip_receiver_recv(void *priv, void *data, uint32_t *size, int timeout)
303 {
304 #ifdef LIMITED_RECOVERY
305  uint32_t recovery = 0;
306 #endif
307  /** messageframe contains header that is read (even partially) in HEAD_WAIT */
308  trap_buffer_header_t messageframe;
/* NOTE(review): `config` is used throughout this function but its
 * declaration is not present in this listing (original line 309 is absent) -
 * presumably `priv` cast to the receiver private structure; confirm. */
310  void *p = &messageframe;
311  struct timeval tm, *temptm;
312  int retval;
313  uint64_t entry_time = get_cur_timestamp();
314  uint64_t curr_time = 0;
315 
316  /* sleeptime (in usec) with sleeptimespec are used to wait
317  * for a while before next connecting when non-blocking. */
318  uint64_t sleeptime;
319  struct timespec sleeptimespec;
320 
321  /* correct module will pass only positive timeout or TRAP_WAIT.
322  * TRAP_HALFWAIT is not valid value */
323  assert(timeout > TRAP_HALFWAIT);
324 
325  if ((config == NULL) || (data == NULL) || (size == NULL)) {
326  return TRAP_E_BAD_FPARAMS;
327  }
328  (*size) = 0;
329 
330  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "recv trap_recv() was called"));
331 
332  /* convert libtrap timeout into timespec and timeval */
333  trap_set_timeouts(timeout, &tm, NULL);
/* temptm == NULL selects waiting without a time limit in the helpers below */
334  temptm = (timeout == TRAP_WAIT) ? NULL : &tm;
335 
/* The states below are labels reached by goto; the while condition is only
 * re-evaluated if control ever falls off the end of the body. */
336  while (config->is_terminated == 0) {
337 init:
338  p = &messageframe;
339  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "recv INIT"));
340  if (config->connected == 0) {
341  goto conn_wait;
342  } else {
343  if (config->data_pointer == NULL) {
344  goto head_wait;
345  } else {
346  /* continue where we timedout earlier */
347  p = config->data_pointer;
348  goto mess_wait;
349  }
350  }
351 discard:
352  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "recv DISCARD"));
353  config->data_pointer = NULL;
354  goto reset;
355 reset:
356  if (config->is_terminated != 0) {
357  /* TRAP_E_TERMINATED is returned outside the loop */
358  break;
359  }
360  /* failure, next state is exit when we are non-blocking or INIT on blocking,
361  this state is a great place for handling timeouts. */
362  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "recv RESET"));
363  if (timeout == TRAP_WAIT) {
364 #ifdef LIMITED_RECOVERY
/* NOTE(review): as written, the function returns TRAP_E_TIMEOUT while the
 * counter is still within MAX_RECOVERY_TRY and retries only after the limit
 * was exceeded - this looks inverted; confirm the intended behaviour. */
365  if (++recovery > MAX_RECOVERY_TRY) {
366  goto init;
367  } else {
368  return TRAP_E_TIMEOUT;
369  }
370 #else
371  goto init;
372 #endif
373  } else {
374  /* non-blocking mode, let's check elapsed time */
375  curr_time = get_cur_timestamp();
376  if ((curr_time - entry_time) >= timeout) {
377  return TRAP_E_TIMEOUT;
378  } else {
379  if (config->connected == 0) {
380  /* wait at most 1 second before return to INIT */
381 
382  /* sleeptime is in usec */
383  sleeptime = timeout - (curr_time - entry_time);
384  /* if remaining sleeptime is higher than 1s, use 1s */
385  if (sleeptime < 1000000) {
386  sleeptimespec.tv_sec = sleeptime / 1000000;
387  sleeptimespec.tv_nsec = (sleeptime % 1000000) * 1000;
388  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "sleep time set %" PRIu64
389  " us: %"PRIu64"s%"PRIu64"ns", sleeptime,
390  sleeptimespec.tv_sec, sleeptimespec.tv_nsec));
391  } else {
392  sleeptimespec.tv_sec = 1;
393  sleeptimespec.tv_nsec = 0;
394  }
395  /* We are not interested in remainder, because timeout will be
396  * checked again later. */
397  if (nanosleep(&sleeptimespec, NULL) == -1) {
398  if (errno == EINTR) {
399  goto reset;
400  } else {
401  VERBOSE(CL_ERROR, "recv nanosleep(): %s", strerror(errno));
402  }
403  }
404  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "recv nanosleep finished"));
405  }
406 
407  /* update timeout that is used for recv after successful connection */
408  curr_time = get_cur_timestamp();
409  sleeptime = timeout - (int) (curr_time - entry_time);
410  if ((int) sleeptime > 0) {
411  trap_set_timeouts(sleeptime, &tm, NULL);
412  } else {
413  return TRAP_E_TIMEOUT;
414  }
415 
416  goto init;
417  }
418  }
419 conn_wait:
420  /* check if connected -> try to connect -> check if connected; next state is RESET or HEAD_WAIT */
421  /* expected next state is waiting for header */
422  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "recv CONN_WAIT"));
423  if (config->connected == 0) {
424  /* we don't have connection, we must try to connect before accepting header */
425  retval = client_socket_connect(priv, config->dest_addr, config->dest_port, &config->sd, temptm);
426  if (retval == TRAP_E_FIELDS_MISMATCH) {
427  config->connected = 1;
428  return TRAP_E_FORMAT_MISMATCH;
429  } else if (retval == TRAP_E_NEGOTIATION_FAILED) {
430  config->connected = 1;
/* NOTE(review): original line 431 is absent from this listing, so what
 * follows `connected = 1` in this branch cannot be seen here. */
432  } else if (retval == TRAP_E_OK) {
433  config->connected = 1;
434  /* ok, wait for header as we planned */
435  } else {
436  /* failed, resetting... */
437  if (timeout == TRAP_WAIT) {
438  /* Create a delay when blocking...
439  * This is specific situation, many attempts would be unpleasant */
440  sleep(1);
441  }
442  goto reset;
443  }
444  }
445  goto head_wait;
446 head_wait:
447  /* get and check header of message, next state can be MESS_WAIT or RESET */
448  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "recv HEAD_WAIT (%p)", p));
449  config->data_wait_size = sizeof(trap_buffer_header_t);
450  retval = receive_part(config, &p, &config->data_wait_size, temptm);
451  if (retval != TRAP_E_OK) {
452  /* receiving failed */
453  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "recv failed HEAD (%p) waiting %d B", p, config->data_wait_size));
454  if (retval == TRAP_E_IO_ERROR) {
455  /* disconnected -> drop data */
456  goto discard;
457  }
458 
459  goto reset;
460  } else {
461  /* we expect to receive data */
/* the header carries the payload length in network byte order */
462  messageframe.data_length = ntohl(messageframe.data_length);
463  config->data_wait_size = messageframe.data_length;
464  config->ext_buffer_size = messageframe.data_length;
465 #ifdef ENABLE_CHECK_HEADER
466  /* check if header is ok: */
467  if (tcpip_check_header(&messageframe) == 0) {
468  goto reset;
469  }
470 #endif
471  /* we got header, now we can start receiving payload */
472  p = data;
473  config->ext_buffer = data;
474  goto mess_wait;
475  }
476 mess_wait:
477  /* get and check payload of message, next state can be RESET or success exit */
478  /* receive payload */
479  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "recv waiting MESS (%p) %d B", p, config->data_wait_size));
480  retval = receive_part(config, &p, &config->data_wait_size, temptm);
481  if (retval == TRAP_E_OK) {
482  /* Success! Data was already set by recv */
483  config->data_pointer = NULL;
/* NOTE(review): messageframe is a local that is filled only in HEAD_WAIT.
 * When this call entered MESS_WAIT directly from INIT (resuming a payload,
 * config->data_pointer != NULL), messageframe was never written during
 * this call, so data_length is read uninitialised here. The length of the
 * current message is kept in config->ext_buffer_size - confirm and
 * consider using that value instead. */
484  (*size) = messageframe.data_length;
485  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "recv get MESS (%p) remains: %d B", p, config->data_wait_size));
486  return TRAP_E_OK;
487  } else {
488  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "recv get MESS (%p) still waiting for %d B", p, config->data_wait_size));
489  if (retval == TRAP_E_IO_ERROR) {
490  /* disconnected -> drop data */
491  goto discard;
492  }
/* remember the write position so that a later call can resume the payload */
493  config->data_pointer = p;
494  goto reset;
495  }
496  }
497  return TRAP_E_TERMINATED;
498 }
499 
500 /**
501  * \brief Set interface state as terminated.
 *
 * Only the is_terminated flag is set; nothing is closed or freed here.
 * A NULL configuration is reported as an error message.
 *
502  * \param[in] priv pointer to module private data
503  */
504 void tcpip_receiver_terminate(void *priv)
505 {
/* NOTE(review): the declaration of `config` (original line 506) is absent
 * from this listing - presumably `priv` cast to the receiver private
 * structure; confirm. */
507  if (config != NULL) {
508  config->is_terminated = 1;
509  } else {
510  VERBOSE(CL_ERROR, "Bad parameter of tcpip_receiver_terminate()!");
511  }
512  return;
513 }
514 
515 
516 /**
517  * \brief Destructor of TCPIP receiver (input ifc)
 *
 * Closes the socket when the interface is marked as connected and frees
 * dest_addr, dest_port and the private structure itself.
 *
518  * \param[in] priv pointer to module private data
519  */
520 void tcpip_receiver_destroy(void *priv)
521 {
/* X(p): free p when it is not NULL and reset it to NULL afterwards */
522 #define X(p) if (p != NULL) { \
523 free(p); \
524 p = NULL; \
525 }
/* NOTE(review): the declaration of `config` (original line 526) is absent
 * from this listing - presumably `priv` cast to the receiver private
 * structure; confirm. */
527  if (config != NULL) {
528  if (config->connected == 1) {
529  close(config->sd);
530  }
531  X(config->dest_addr);
532  X(config->dest_port);
533  X(config);
534  } else {
535  VERBOSE(CL_ERROR, "Destroying IFC that is probably not initialized.");
536  }
537  return;
538 #undef X
539 }
540 
/*
 * Write the state of the input interface into two files below `path`:
 * trap-i<idx>-config.txt (textual configuration) and trap-i<idx>-buffer.dat
 * (length header followed by the content of the external buffer).
 *
 * NOTE(review): original lines 543, 567, 568 and 570 are absent from this
 * listing; `c` is presumably `priv` cast to the receiver private structure
 * and the argument list of fprintf() below is incomplete as shown - confirm
 * against the real source.
 */
541 static void tcpip_receiver_create_dump(void *priv, uint32_t idx, const char *path)
542 {
544  /* return value */
545  int r;
546  /* config file trap-i<number>-config.txt */
547  char *conf_file = NULL;
548  /* buffer file trap-i<number>-buffer.dat */
549  char *buf_file = NULL;
550  FILE *f = NULL;
551  trap_buffer_header_t aux = { 0 };
/* the header is stored in network byte order */
552  aux.data_length = htonl(c->ext_buffer_size);
553 
554  r = asprintf(&conf_file, "%s/trap-i%02"PRIu32"-config.txt", path, idx);
555  if (r == -1) {
556  VERBOSE(CL_ERROR, "Not enough memory, dump failed. (%s:%d)", __FILE__, __LINE__);
557  conf_file = NULL;
558  goto exit;
559  }
/* NOTE(review): the result of fopen() is passed to fprintf()/fclose()
 * without a NULL check. */
560  f = fopen(conf_file, "w");
561  fprintf(f, "Dest addr: %s\nDest port: %s\nConnected: %d\n"
562  "Terminated: %d\nSocket descriptor: %d\nSocket type: %d\n"
563  "Data pointer: %p\nData wait size: %"PRIu32"\nMessage header: %"PRIu32"\n"
564  "Extern buffer pointer: %p\nExtern buffer data size: %"PRIu32"\n"
565  "Timeout: %"PRId32"us (%s)\n",
566  c->dest_addr, c->dest_port, c->connected, c->is_terminated, c->sd, c->socket_type,
569  c->ctx->in_ifc_list[idx].datatimeout,
571  fclose(f);
572  f = NULL;
573 
574  r = asprintf(&buf_file, "%s/trap-i%02"PRIu32"-buffer.dat", path, idx);
575  if (r == -1) {
576  buf_file = NULL;
577  VERBOSE(CL_ERROR, "Not enough memory, dump failed. (%s:%d)", __FILE__, __LINE__);
578  goto exit;
579  }
/* NOTE(review): as above, fopen() is not checked before fwrite(). The first
 * fwrite() takes its size from c->ext_buffer_size while the data comes from
 * `aux` - the two sizes are presumably equal; confirm. */
580  f = fopen(buf_file, "w");
581  if (fwrite(&aux, sizeof(c->ext_buffer_size), 1, f) != 1) {
582  VERBOSE(CL_ERROR, "Writing buffer header failed. (%s:%d)", __FILE__, __LINE__);
583  goto exit;
584  }
585  if (fwrite(c->ext_buffer, c->ext_buffer_size, 1, f) != 1) {
586  VERBOSE(CL_ERROR, "Writing buffer content failed. (%s:%d)", __FILE__, __LINE__);
587  goto exit;
588  }
/* common cleanup: close the file that is still open and free both names */
589 exit:
590  if (f != NULL) {
591  fclose(f);
592  }
593  free(conf_file);
594  free(buf_file);
595  return;
596 }
597 
/*
 * Return the identifier of the input interface, which is its dest_port
 * string. The returned pointer refers to the string stored in the private
 * structure (no copy is made). Returns NULL when priv or dest_port is NULL.
 *
 * NOTE(review): the declaration of `config` (original line 604) is absent
 * from this listing - presumably `priv` cast to the receiver private
 * structure; confirm.
 */
598 char *tcpip_recv_ifc_get_id(void *priv)
599 {
600  if (priv == NULL) {
601  return NULL;
602  }
603 
605  if (config->dest_port == NULL) {
606  return NULL;
607  }
608  return config->dest_port;
609 }
610 
/*
 * Report whether the input interface is marked as connected.
 * Returns 1 when config->connected equals 1, otherwise 0 (also for NULL priv).
 *
 * NOTE(review): the declaration of `config` (original line 616) is absent
 * from this listing - presumably `priv` cast to the receiver private
 * structure; confirm.
 */
611 uint8_t tcpip_recv_ifc_is_conn(void *priv)
612 {
613  if (priv == NULL) {
614  return 0;
615  }
617  if (config->connected == 1) {
618  return 1;
619  }
620  return 0;
621 }
622 
623 /**
624  * \brief Constructor of input TCP/IP IFC module.
625  * This function is called by TRAP library to initialize one input interface.
626  *
627  * \param[in,out] ctx Pointer to the private libtrap context data (trap_ctx_init()).
628  * \param[in] params Configuration string containing space separated values of these parameters (in this exact order): *dest_addr* *dest_port*,
629  * where dest_addr is destination address of output TCP/IP IFC module and
630  * dest_port is the port where sender is listening.
631  * \param[in,out] ifc IFC interface used for calling TCP/IP module.
632  * \param[in] idx Index of IFC that is created.
633  * \param [in] type Select the type of socket (see #tcpip_ifc_sockettype for options).
634  * \return 0 on success (TRAP_E_OK)
635  */
636 int create_tcpip_receiver_ifc(trap_ctx_priv_t *ctx, char *params, trap_input_ifc_t *ifc, uint32_t idx, enum tcpip_ifc_sockettype type)
637 {
/* X(pointer): free the pointer and reset it to NULL (used in the cleanup) */
638 #define X(pointer) free(pointer); \
639  pointer = NULL;
640 
641  int result = TRAP_E_OK;
642  char *param_iterator = NULL;
643  char *dest_addr = NULL;
644  char *dest_port = NULL;
645  tcpip_receiver_private_t *config = NULL;
646 
647  if (params == NULL) {
648  VERBOSE(CL_ERROR, "No parameters found for input IFC.");
649  return TRAP_E_BADPARAMS;
650  }
651 
/* calloc() zeroes the structure, so fields not set below start as 0/NULL */
652  config = (tcpip_receiver_private_t *) calloc(1, sizeof(tcpip_receiver_private_t));
653  if (config == NULL) {
654  VERBOSE(CL_ERROR, "Failed to allocate internal memory for input IFC.");
655  return TRAP_E_MEMORY;
656  }
657  config->ctx = ctx;
658  config->is_terminated = 0;
659  config->socket_type = type;
660  config->ifc_idx = idx;
661 
662  /* Parsing params */
663  param_iterator = trap_get_param_by_delimiter(params, &dest_addr, TRAP_IFC_PARAM_DELIMITER);
664  if (param_iterator == NULL) {
665  /* error! we expect 2 parameters */
666  if ((dest_addr == NULL) || (strlen(dest_addr) == 0)) {
667  VERBOSE(CL_ERROR, "Missing 'destination address' for TCPIP IFC.");
668  result = TRAP_E_BADPARAMS;
669  goto failsafe_cleanup;
670  }
671  }
/* NOTE(review): param_iterator may still be NULL here (single parameter);
 * this relies on trap_get_param_by_delimiter() accepting NULL input and
 * leaving dest_port NULL/empty - confirm against that helper. */
672  param_iterator = trap_get_param_by_delimiter(param_iterator, &dest_port, TRAP_IFC_PARAM_DELIMITER);
673  if ((dest_port == NULL) || (strlen(dest_port) == 0)) {
674  /* if 2nd param is missing, use localhost as addr and 1st param as "port" */
675  free(dest_port);
676  dest_port = dest_addr;
/* a failed strdup() is caught by the NULL check of config->dest_addr below */
677  dest_addr = strdup("localhost");
678  VERBOSE(CL_VERBOSE_BASIC, "Using the only parameter as 'destination port' and \"localhost\" as 'destination address' for TCPIP IFC.");
679  }
680 
681  /* set global buffer size */
/* NOTE(review): original line 682 is absent from this listing. */
683  /* Parsing params ended */
684 
685  config->dest_addr = dest_addr;
686  config->dest_port = dest_port;
687 
688  if ((config->dest_addr == NULL) || (config->dest_port == NULL)) {
689  /* no delimiter found even if we expect two parameters */
690  VERBOSE(CL_ERROR, "Malformed params for input IFC, missing destination address and port.");
691  result = TRAP_E_BADPARAMS;
692  goto failsafe_cleanup;
693  }
694 
695  VERBOSE(CL_VERBOSE_ADVANCED, "config:\ndest_addr=\"%s\"\ndest_port=\"%s\"\n"
696  "TDU size: %u\n", config->dest_addr, config->dest_port,
697  config->int_mess_header.data_length);
698 
699  /*
700  * In constructor, we do not know timeout yet.
701  * Use 5 seconds to wait for connection to output interface.
702  */
/* The early connection attempt is compiled in only without negotiation. */
703 #ifndef ENABLE_NEGOTIATION
704  int retval = 0;
705  struct timeval tv = {5, 0};
706  retval = client_socket_connect((void *) config, config->dest_addr, config->dest_port, &config->sd, &tv);
707  if (retval != TRAP_E_OK) {
708  config->connected = 0;
709  if ((retval == TRAP_E_BAD_FPARAMS) || (retval == TRAP_E_IO_ERROR)) {
710  VERBOSE(CL_VERBOSE_BASIC, "Could not connect to sender due to bad parameters.");
711  result = TRAP_E_BADPARAMS;
712  goto failsafe_cleanup;
713  }
714  } else {
715  config->connected = 1;
716  }
717 #endif
718 
719  /* hook functions and store priv */
720  ifc->recv = tcpip_receiver_recv;
/* NOTE(review): original lines 721-723 and 725-726 are absent from this
 * listing; the remaining hooks are presumably assigned there - confirm. */
724  ifc->priv = config;
727 
728 #ifndef ENABLE_NEGOTIATION
729  if (config->connected == 0) {
730  VERBOSE(CL_VERBOSE_BASIC, "Could not connect to sender.");
/* NOTE(review): both retval values tested here already caused a jump to
 * failsafe_cleanup in the first #ifndef block, so this branch looks
 * unreachable. If it were taken, ifc->priv would keep pointing to the
 * freed config - confirm. */
731  if ((retval == TRAP_E_BAD_FPARAMS) || (retval == TRAP_E_IO_ERROR)) {
732  result = retval;
733  goto failsafe_cleanup;
734  }
735  }
736 #endif
737  return TRAP_E_OK;
/* error exit: release everything that was allocated in this constructor */
738 failsafe_cleanup:
739  X(dest_addr);
740  X(dest_port);
741  X(config);
742  return result;
743 #undef X
744 }
745 
746 /**
747  * Disconnect from output IFC.
 *
 * When the interface is marked as connected, the socket is closed and the
 * connected flag is cleared; otherwise nothing is done.
748  *
749  * \param[in,out] priv pointer to private structure of input IFC (client)
750  */
751 static void client_socket_disconnect(void *priv)
752 {
/* NOTE(review): the declaration of `config` (original line 753) is absent
 * from this listing - presumably `priv` cast to the receiver private
 * structure; confirm. */
754  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "recv Disconnected."));
755  if (config->connected == 1) {
756  VERBOSE(CL_VERBOSE_BASIC, "TCPIP ifc client disconnecting");
757  close(config->sd);
758  config->connected = 0;
759  }
760 }
761 
762 /**
763  * Function waits for non-blocking connect().
764  *
765  * \param[in] sock socket descriptor of client
766  * \param[in] tv timeout
767  * \return TRAP_E_OK on success, TRAP_E_TIMEOUT on error (can be caused by interrupt)
768  */
769 static int wait_for_connection(int sock, struct timeval *tv)
770 {
771  int rv;
772  struct pollfd pfds = {.fd = sock, .events = POLLOUT};
773  struct timespec ts, *tempts = NULL;
774  if (tv != NULL) {
775  ts.tv_sec = tv->tv_sec;
776  ts.tv_nsec = tv->tv_usec * 1000l;
777  tempts = &ts;
778  }
779  VERBOSE(CL_VERBOSE_LIBRARY, "wait for connection");
780  rv = ppoll(&pfds, 1, tempts, NULL);
781  if (rv == 1 && pfds.revents & POLLOUT) {
782  int so_error;
783  socklen_t len = sizeof so_error;
784 
785  getsockopt(sock, SOL_SOCKET, SO_ERROR, &so_error, &len);
786 
787  if (so_error == 0) {
788  return TRAP_E_OK;
789  }
790  }
791  return TRAP_E_TIMEOUT;
792 }
793 
794 
795 /**
796  * \brief client_socket is used as a receiver
797  * \param[in] priv pointer to module private data
798  * \param[in] dest_addr destination address where to connect and where receive
799  * \param[in] dest_port destination port where to connect and where receive
800  * \param[out] socket_descriptor socket descriptor of established connection
801  * \param[in] tv timeout
802  * \return TRAP_E_OK on success
803  */
804 static int client_socket_connect(void *priv, const char *dest_addr, const char *dest_port, int *socket_descriptor, struct timeval *tv)
805 {
/* NOTE(review): the declaration of `config` (original line 806) is absent
 * from this listing - presumably `priv` cast to the receiver private
 * structure; confirm. */
807  int sockfd = -1, options;
808  union tcpip_socket_addr addr;
/* `p` doubles as the success marker: it stays NULL when no connection was made */
809  struct addrinfo *servinfo, *p = NULL;
810  int rv = 0, addr_count = 0;
811  char s[INET6_ADDRSTRLEN];
812 
813  if ((config == NULL) || (dest_addr == NULL) || (dest_port == NULL) || (socket_descriptor == NULL)) {
814  return TRAP_E_BAD_FPARAMS;
815  }
816  if (check_portrange(dest_port) == EXIT_FAILURE) {
817  return TRAP_E_BADPARAMS;
818  }
819 
820  memset(&addr, 0, sizeof(addr));
821 
822  if (config->socket_type == TRAP_IFC_TCPIP) {
/* addr.tcpip_addr serves as the hints structure for getaddrinfo() */
823  addr.tcpip_addr.ai_family = AF_UNSPEC;
824  addr.tcpip_addr.ai_socktype = SOCK_STREAM;
825 
826  if ((rv = getaddrinfo(dest_addr, dest_port, &addr.tcpip_addr, &servinfo)) != 0) {
827  VERBOSE(CL_ERROR, "getaddrinfo: %s", gai_strerror(rv));
828  return TRAP_E_IO_ERROR;
829  }
830 
831  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "recv Try to connect"));
832 
833  if (tv != NULL) {
834  /* compute uniform intervals for all possible address */
835  for (p = servinfo; p != NULL; p = p->ai_next) {
836  addr_count++;
837  }
/* tv_sec temporarily holds the per-address share in microseconds before it
 * is split back into seconds and microseconds.
 * NOTE(review): this overwrites the caller's *tv; tcpip_receiver_recv()
 * passes the same timeval later to receive_part(), so its timeout shrinks
 * when more than one address was resolved - confirm that this is intended. */
838  tv->tv_sec = (tv->tv_sec * 1000000 + tv->tv_usec) / addr_count;
839  tv->tv_usec = tv->tv_sec % 1000000;
840  tv->tv_sec /= 1000000;
841  VERBOSE(CL_VERBOSE_LIBRARY, "Every address will be tried for timeout: %"PRId64"s%"PRId64"us",
842  tv->tv_sec, tv->tv_usec);
843  }
844 
845  // loop through all the results and connect to the first we can
846  for (p = servinfo; p != NULL; p = p->ai_next) {
847  if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
848  continue;
849  }
850 
851  /* Change the socket to be non-blocking if required by user. */
852  if (tv != NULL) {
853  if ((options = fcntl(sockfd, F_GETFL)) != -1) {
854  if (fcntl(sockfd, F_SETFL, O_NONBLOCK | options) == -1) {
855  VERBOSE(CL_ERROR, "Could not set socket to non-blocking.");
856  }
857  }
858  }
859 
860  if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
861  if (errno != EINPROGRESS && errno != EAGAIN) {
862  DEBUG_IFC(VERBOSE(CL_VERBOSE_LIBRARY, "recv TCPIP ifc connect error %d (%s)", errno,
863  strerror(errno)));
/* NOTE(review): sockfd keeps the number of the closed descriptor; when this
 * was the last address, the common error path below closes it once more. */
864  close(sockfd);
865  continue;
866  } else {
867  rv = wait_for_connection(sockfd, tv);
868  if (rv == TRAP_E_TIMEOUT) {
/* NOTE(review): same as above - sockfd is not reset after close(), so the
 * error path at the end of the function closes the descriptor again (also
 * on the TRAP_E_TERMINATED break, where `p` is non-NULL and the
 * "connected to" message below is still printed). */
869  close(sockfd);
870  if (config->is_terminated) {
871  rv = TRAP_E_TERMINATED;
872  break;
873  }
874  /* try another address */
875  continue;
876  } else {
877  /* success */
878  rv = TRAP_E_OK;
879  break;
880  }
881  }
882  }
/* connect() succeeded immediately */
883  break;
884  }
885 
886  if (p != NULL) {
887  if (inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr), s, sizeof s) != NULL) {
888  VERBOSE(CL_VERBOSE_LIBRARY, "recv client: connected to %s", s);
889  }
890  }
891  freeaddrinfo(servinfo); // all done with this structure
892  } else if (config->socket_type == TRAP_IFC_TCPIP_UNIX) {
893  /* UNIX socket */
894  addr.unix_addr.sun_family = AF_UNIX;
895  snprintf(addr.unix_addr.sun_path, sizeof(addr.unix_addr.sun_path) - 1, trap_default_socket_path_format, dest_port);
896  sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
897  if (sockfd != -1) {
898  if (connect(sockfd, (struct sockaddr *) &addr.unix_addr, sizeof(addr.unix_addr)) < 0) {
899  VERBOSE(CL_VERBOSE_LIBRARY, "recv UNIX domain socket connect error %d (%s)", errno, strerror(errno));
/* NOTE(review): `p` stays NULL here, so rv becomes TRAP_E_TIMEOUT below and
 * the already closed sockfd is closed a second time. */
900  close(sockfd);
901  } else {
/* `p` is set only to mark success; it is not dereferenced afterwards */
902  p = (struct addrinfo *) &addr.unix_addr;
903  }
904  rv = TRAP_E_OK;
905  } else {
906  rv = TRAP_E_IO_ERROR;
907  }
908  }
909 
910  if (p == NULL) {
911  VERBOSE(CL_VERBOSE_LIBRARY, "recv client: Connection failed.");
912  rv = TRAP_E_TIMEOUT;
913  }
914 
915  if (rv != TRAP_E_OK) { /*something went wrong while setting up connection */
916  if (sockfd >= 0) {
917  close(sockfd);
918  }
919  return rv;
920  }
921 
922  *socket_descriptor = sockfd;
923 
924 
925  /** Input interface negotiation */
926 #ifdef ENABLE_NEGOTIATION
/* NOTE(review): original lines 927, 951 and 953 are absent from this
 * listing: the switch statement that opens this block, the statement that
 * ends the NEG_RES_FAILED case and the case label that precedes the
 * "mismatch" message are not visible here. */
928  case NEG_RES_FMT_UNKNOWN:
929  VERBOSE(CL_VERBOSE_LIBRARY, "Input_ifc_negotiation result: failed (unknown data format of the output interface).");
930  close(sockfd);
931  return TRAP_E_TIMEOUT;
932 
933  case NEG_RES_CONT:
934  VERBOSE(CL_VERBOSE_LIBRARY, "Input_ifc_negotiation result: success.");
935  return TRAP_E_OK;
936 
937  case NEG_RES_FMT_CHANGED: // used on format change with JSON
938  VERBOSE(CL_VERBOSE_LIBRARY, "Input_ifc_negotiation result: success (format has changed; it was not first negotiation).");
939  return TRAP_E_OK;
940 
941  case NEG_RES_RECEIVER_FMT_SUBSET: // used on format change with UniRec
942  VERBOSE(CL_VERBOSE_LIBRARY, "Input_ifc_negotiation result: success (required set of fields of the input interface is subset of the recevied format).");
943  return TRAP_E_OK;
944 
945  case NEG_RES_SENDER_FMT_SUBSET: // used on format change with UniRec
946  VERBOSE(CL_VERBOSE_LIBRARY, "Input_ifc_negotiation result: success (new recevied format specifier is subset of the old one; it was not first negotiation).");
947  return TRAP_E_OK;
948 
949  case NEG_RES_FAILED:
950  VERBOSE(CL_VERBOSE_LIBRARY, "Input_ifc_negotiation result: failed (error while receiving hello message from output interface).");
952 
954  VERBOSE(CL_VERBOSE_LIBRARY, "Input_ifc_negotiation result: failed (data type or data format specifier mismatch).");
955  return TRAP_E_FIELDS_MISMATCH;
956 
957  default:
958  VERBOSE(CL_VERBOSE_LIBRARY, "Input_ifc_negotiation result: default case");
959  break;
960  }
961 #endif
962 
963 
964  return rv;
965 }
966 
967 /**
968  * @}
969  *//* tcpip_receiver */
970 
971 /**
972  * \addtogroup tcpip_sender
973  * @{
974  */
975 
976 /**
977  * \brief This function is called when a client was/is being disconnected.
978  *
979  * \param[in] priv Pointer to interface's private data structure.
980  * \param[in] cl_id Index of the client in 'clients' array.
981  */
982 static inline void disconnect_client(tcpip_sender_private_t *priv, int cl_id)
983 {
984  int i;
985  client_t *c = &priv->clients[cl_id];
986 
987  for (i = 0; i < priv->buffer_count; ++i) {
988  del_index(&priv->buffers[i].clients_bit_arr, cl_id);
989  if (priv->buffers[i].clients_bit_arr == 0) {
990  pthread_cond_broadcast(&priv->cond_full_buffer);
991  }
992  }
993  del_index(&priv->clients_bit_arr, cl_id);
994  __sync_sub_and_fetch(&priv->connected_clients, 1);
995 
996  shutdown(c->sd, SHUT_RDWR);
997  close(c->sd);
998  c->sd = -1;
999  c->pfds_index = -1;
1000  c->pending_bytes = 0;
1001  c->sending_pointer = NULL;
1002 }
1003 
1004 /**
1005  * \brief Function disconnects all clients of the output interface whose private structure is passed via "priv" parameter.
1006  *
1007  * \param[in] priv Pointer to output interface private structure.
1008  */
1010 {
   /* NOTE(review): the function signature and the declaration of 'c' are not
    * present in this listing (lines 1009 and 1012 are absent); 'c' is
    * presumably the private structure obtained from 'priv' - confirm. */
1011  uint32_t i;
1013 
   /* Walk over every slot of the 'clients' array and disconnect the ones that
    * hold a socket descriptor. */
1014  for (i = 0; i<c->clients_arr_size; i++) {
   /* NOTE(review): 'sd > 0' treats descriptor 0 as "not connected". */
1015  if (c->clients[i].sd > 0) {
1016  disconnect_client(priv, i);
1017  }
1018  }
1019 }
1020 
1021 /**
1022  * \brief This function runs in a separate thread and handles new client's connection requests.
1023  *
1024  * \param[in] arg Pointer to interface's private data structure.
1025  */
1026 static void *accept_clients_thread(void *arg)
1027 {
1028  char remoteIP[INET6_ADDRSTRLEN];
1029  struct sockaddr_storage remoteaddr; // client address
1030  struct client_s *cl;
1031  socklen_t addrlen;
1032  int newclient;
1034  int i;
1035  struct sockaddr *tmpaddr;
1036  struct ucred ucred;
1037  uint32_t ucredlen = sizeof(struct ucred);
1038  uint32_t client_id = 0;
1039  struct pollfd pfds;
1040 
1041  /* handle new connections */
1042  addrlen = sizeof(remoteaddr);
1043  while (1) {
1044  if (c->is_terminated != 0) {
1045  break;
1046  }
1047  pfds = (struct pollfd) {.fd = c->server_sd, .events = POLLIN};
1048 
1049  if (poll(&pfds, 1, -1) == -1) {
1050  if (errno == EINTR) {
1051  if (c->is_terminated != 0) {
1052  break;
1053  }
1054  continue;
1055  } else {
1056  VERBOSE(CL_ERROR, "%s:%d unexpected error code %d", __func__, __LINE__, errno);
1057  }
1058  }
1059 
1060  if (pfds.revents & POLLIN) {
1061  newclient = accept(c->server_sd, (struct sockaddr *) &remoteaddr, &addrlen);
1062  if (newclient == -1) {
1063  VERBOSE(CL_ERROR, "Accepting new client failed.");
1064  } else {
1065  if (c->socket_type == TRAP_IFC_TCPIP) {
1066  tmpaddr = (struct sockaddr *) &remoteaddr;
1067  switch (((struct sockaddr *) tmpaddr)->sa_family) {
1068  case AF_INET:
1069  client_id = ntohs(((struct sockaddr_in *) tmpaddr)->sin_port);
1070  break;
1071  case AF_INET6:
1072  client_id = ntohs(((struct sockaddr_in6 *) tmpaddr)->sin6_port);
1073  break;
1074  }
1075  VERBOSE(CL_VERBOSE_ADVANCED, "Client connected via TCP socket, port=%u", client_id);
1076  } else {
1077  if (getsockopt(newclient, SOL_SOCKET, SO_PEERCRED, &ucred, &ucredlen) == -1) {
1078  goto refuse_client;
1079  }
1080  client_id = (uint32_t) ucred.pid;
1081  VERBOSE(CL_VERBOSE_ADVANCED, "Client connected via UNIX socket, pid=%ld", (long) ucred.pid);
1082  }
1083 
1084  VERBOSE(CL_VERBOSE_ADVANCED, "New connection from %s on socket %d",
1085  inet_ntop(remoteaddr.ss_family, get_in_addr((struct sockaddr*) &remoteaddr), remoteIP, INET6_ADDRSTRLEN),
1086  newclient);
1087 
1088  if (c->connected_clients < c->clients_arr_size) {
1089  cl = NULL;
1090  for (i = 0; i < c->clients_arr_size; ++i) {
1091  if (check_index(c->clients_bit_arr, i) == 0) {
1092  cl = &c->clients[i];
1093  break;
1094  }
1095  }
1096  if (cl == NULL) {
1097  goto refuse_client;
1098  }
1099 
1100  cl->sd = newclient;
1101  cl->pfds_index = -1;
1102  cl->sending_pointer = NULL;
1103  cl->pending_bytes = 0;
1104  cl->timer_total = 0;
1105  cl->id = client_id;
1106  cl->assigned_buffer = c->active_buffer;
1107  cl->timeouts = 0;
1108 
1109 #ifdef ENABLE_NEGOTIATION
1110  int ret_val = output_ifc_negotiation(c, TRAP_IFC_TYPE_TCPIP, i);
1111  if (ret_val == NEG_RES_OK) {
1112  VERBOSE(CL_VERBOSE_LIBRARY, "Output_ifc_negotiation result: success.");
1113  } else if (ret_val == NEG_RES_FMT_UNKNOWN) {
1114  VERBOSE(CL_VERBOSE_LIBRARY, "Output_ifc_negotiation result: failed (unknown data format of this output interface -> refuse client).");
1115  cl->sd = -1;
1116  goto refuse_client;
1117  } else { // ret_val == NEG_RES_FAILED, sending the data to input interface failed, refuse client
1118  VERBOSE(CL_VERBOSE_LIBRARY, "Output_ifc_negotiation result: failed (error while sending hello message to input interface).");
1119  cl->sd = -1;
1120  goto refuse_client;
1121  }
1122 #endif
1123 
1124  set_index(&c->clients_bit_arr, i);
1125  __sync_add_and_fetch(&c->connected_clients, 1);
1126  } else {
1127 refuse_client:
1128  VERBOSE(CL_VERBOSE_LIBRARY, "Shutting down client we do not have additional resources (%u/%u)", c->connected_clients, c->clients_arr_size);
1129  shutdown(newclient, SHUT_RDWR);
1130  close(newclient);
1131  }
1132  }
1133  }
1134  }
1135  pthread_exit(NULL);
1136 }
1137 
1138 /**
1139  * \brief Write buffer size to its header and shift active index.
1140  *
1141  * \param[in] priv Pointer to output interface private structure.
1142  * \param[in] buffer Pointer to the buffer.
1143  */
1144 static inline void finish_buffer(tcpip_sender_private_t *priv, buffer_t *buffer)
1145 {
   /* NOTE(review): line 1146 of the original file is not present in this
    * listing; its content cannot be told from here. */
1147 
   /* Only a buffer that holds some data (wr_index != 0) and is not waiting for
    * any client (clients_bit_arr == 0) gets finished; otherwise it is left
    * untouched. */
1148  if (buffer->clients_bit_arr == 0 && buffer->wr_index != 0) {
   /* Store the number of written bytes into the header in network byte order */
1149  uint32_t header = htonl(buffer->wr_index);
1150  memcpy(buffer->header, &header, sizeof(header));
1151 
   /* Move on to the next buffer (wraps around after the last one) */
1152  priv->active_buffer = (priv->active_buffer + 1) % priv->buffer_count;
1153 
   /* Hand the finished buffer over to the clients registered right now and
    * reset its write position. */
1154  buffer->clients_bit_arr = priv->clients_bit_arr;
1155  buffer->wr_index = 0;
1156  }
1157 
   /* Wake up whoever waits on cond_no_data; this happens on every call, even
    * when the buffer was not finished above. */
1158  pthread_mutex_lock(&priv->mtx_no_data);
1159  pthread_cond_broadcast(&priv->cond_no_data);
1160  pthread_mutex_unlock(&priv->mtx_no_data);
1161 }
1162 
1163 /**
1164  * \brief Force flush of active buffer
1165  *
1166  * \param[in] priv pointer to interface private data
1167  */
1168 void tcpip_sender_flush(void *priv)
1169 {
1171 
1172  pthread_mutex_lock(&c->ctx->out_ifc_list[c->ifc_idx].ifc_mtx);
1173  finish_buffer(c, &c->buffers[c->active_buffer]);
1174  pthread_mutex_unlock(&c->ctx->out_ifc_list[c->ifc_idx].ifc_mtx);
1175 
1176  __sync_add_and_fetch(&c->ctx->counter_autoflush[c->ifc_idx], 1);
1177 }
1178 
1179 /**
1180  * \brief Send data to client from his assigned buffer.
1181  *
1182  * \param[in] priv Pointer to iterface's private data structure.
1183  * \param[in] c Pointer to the client's structure.
1184  * \param[in] cl_id Client's index in the 'clients' array.
1185  *
1186  * \return TRAP_E_OK successfully sent.
1187  * \return TRAP_E_TERMINATED TRAP was terminated.
1188  * \return TRAP_E_IO_ERROR send failed although TRAP was not terminated.
1189  */
1190 static inline int send_data(tcpip_sender_private_t *priv, client_t *c, uint32_t cl_id)
1191 {
1192  int sent;
1193  /* Pointer to client's assigned buffer */
1194  buffer_t *buffer = &priv->buffers[c->assigned_buffer];
1195 
1196 again:
1197  sent = send(c->sd, c->sending_pointer, c->pending_bytes, MSG_NOSIGNAL);
1198 
1199  if (sent < 0) {
1200  /* Send failed */
1201  if (priv->is_terminated != 0) {
1202  return TRAP_E_TERMINATED;
1203  }
1204  switch (errno) {
1205  case EBADF:
1206  case EPIPE:
1207  case EFAULT:
1208  return TRAP_E_IO_ERROR;
1209  case EAGAIN:
1210  goto again;
1211  default:
1212  VERBOSE(CL_VERBOSE_OFF, "Unhandled error from send in send_data (errno: %i)", errno);
1213  return TRAP_E_IO_ERROR;
1214  }
1215  } else {
1216  c->pending_bytes -= sent;
1217  c->sending_pointer = (uint8_t *) c->sending_pointer + sent;
1218 
1219  /* Client received whole buffer */
1220  if (c->pending_bytes <= 0) {
1221  del_index(&buffer->clients_bit_arr, cl_id);
1222  if (buffer->clients_bit_arr == 0) {
1223  __sync_add_and_fetch(&priv->ctx->counter_send_buffer[priv->ifc_idx], 1);
1224  pthread_cond_broadcast(&priv->cond_full_buffer);
1225  }
1226 
1227  /* Assign client the next buffer in sequence */
1228  c->assigned_buffer = (c->assigned_buffer + 1) % priv->buffer_count;
1229  }
1230  }
1231  return TRAP_E_OK;
1232 }
1233 
1234 /**
1235  * \brief This function runs in a separate thread. It handles sending data
1236  to connected clients for TCPIP and UNIX interfaces.
1237  * \param[in] priv pointer to interface private data
1238  */
1239 static void *sending_thread_func(void *priv)
1240 {
1241  uint32_t i, j;
1242  int res;
1243  client_t *cl;
1244  buffer_t *assigned_buffer;
   /* Scratch space for recv(); the received bytes are not used any further */
1245  uint8_t buffer[DEFAULT_MAX_DATA_LENGTH];
1246  uint64_t send_entry_time;
1247  uint64_t send_exit_time;
1248  uint8_t waiting_clients;
1249  int poll_timeout;
1250  int clients_pfds_size;
1251  struct pollfd *pfds;
1252  int64_t time_since_flush;
   /* NOTE(review): the array has DEFAULT_MAX_CLIENTS items but is indexed by
    * values up to clients_arr_size - confirm the configured maximum never
    * exceeds DEFAULT_MAX_CLIENTS. */
1253  int receiving_clients_array[DEFAULT_MAX_CLIENTS] = {0}; // Array of clients for whom we have data for
1254 
   /* NOTE(review): 'c' is used below but its declaration (line 1255) is not
    * present in this listing; presumably it is 'priv' cast to
    * tcpip_sender_private_t * - confirm. */
1256 
1257  while (1) {
1258  if (c->is_terminated != 0) {
1259  pthread_exit(NULL);
1260  }
   /* Nothing to do without clients, sleep and check again */
1261  if (c->connected_clients == 0) {
1262  usleep(NO_CLIENTS_SLEEP);
1263  continue;
1264  }
1265 
   /* Autoflush: finish the active buffer when the interface timeout has
    * elapsed since autoflush_timestamp. */
1266  time_since_flush = get_cur_timestamp() - c->autoflush_timestamp;
1267  if (time_since_flush > c->ctx->out_ifc_list[c->ifc_idx].timeout) {
1268  tcpip_sender_flush(c);
1269  }
1270 
1271  clients_pfds_size = 0;
1272  waiting_clients = 0;
   /* Timeout passed to poll() below */
1273  poll_timeout = 1;
1274 
1275  /* Add term_pipe for reading into the disconnect client set */
   /* Slot 0 of clients_pfds always belongs to the read end of term_pipe */
1276  c->clients_pfds[clients_pfds_size++] = (struct pollfd) {.fd = c->term_pipe[0], .events = POLLIN};
1277 
1278  /* Check whether clients are connected and there is data for them to receive. */
   /* 'i' walks over the slots, 'j' counts connected clients found so far;
    * the loop stops early once all connected clients were visited. */
1279  for (i = j = 0; i < c->clients_arr_size; ++i) {
1280  if (j == c->connected_clients) {
1281  break;
1282  }
1283 
1284  if (check_index(c->clients_bit_arr, i) == 0) {
1285  continue;
1286  }
1287 
1288  ++j;
1289  receiving_clients_array[i] = 0;
1290 
1291  cl = &(c->clients[i]);
1292  assigned_buffer = &c->buffers[cl->assigned_buffer];
1293 
   /* The j-th connected client gets slot j of clients_pfds (slot 0 is the
    * pipe); POLLIN is always watched for every client socket. */
1294  cl->pfds_index = j;
1295  pfds = c->clients_pfds + j;
1296  ++clients_pfds_size;
1297  *pfds = (struct pollfd) {.fd = cl->sd, .events = POLLIN};
1298 
   /* The client's bit is not set in its assigned buffer: there is nothing to
    * send to this client right now. */
1299  if (check_index(assigned_buffer->clients_bit_arr, i) == 0) {
1300  ++waiting_clients;
1301  continue;
1302  }
1303 
   /* Start of a new buffer for this client: send from the header; the size is
    * the value stored in the header (network byte order) plus the header. */
1304  if (cl->pending_bytes <= 0) {
1305  cl->sending_pointer = assigned_buffer->header;
1306  cl->pending_bytes = ntohl(*((uint32_t *) assigned_buffer->header)) + sizeof(uint32_t);
1307  }
1308 
1309  pfds->events = pfds->events | POLLOUT;
1310  receiving_clients_array[i] = 1;
1311  }
1312 
   /* No client has data to receive: wait on cond_no_data until signalled or
    * until the rest of the interface timeout elapses, then start over. */
1313  if (waiting_clients == c->connected_clients) {
1314  int timeout = c->ctx->out_ifc_list[c->ifc_idx].timeout - time_since_flush;
1315  struct timespec ts;
1316  clock_gettime(CLOCK_REALTIME, &ts);
1317 
   /* Absolute deadline = now + timeout; 'timeout' is multiplied by 1000 to get
    * nanoseconds. NOTE(review): the whole time is temporarily kept in tv_nsec,
    * which needs a 64-bit 'long' - confirm for the supported platforms. */
1318  ts.tv_nsec += (ts.tv_sec * 1000000000L) + (timeout * 1000L);
1319  ts.tv_sec = (ts.tv_nsec / 1000000000L);
1320  ts.tv_nsec %= 1000000000L;
1321 
1322  pthread_mutex_lock(&c->mtx_no_data);
1323  pthread_cond_timedwait(&c->cond_no_data, &c->mtx_no_data, &ts);
1324  pthread_mutex_unlock(&c->mtx_no_data);
1325  continue;
1326  }
1327 
1328  res = poll(c->clients_pfds, clients_pfds_size, poll_timeout);
1329  if (res < 0) {
1330  /* Select returned with an error */
1331  if (c->is_terminated == 0) {
1332  switch (errno) {
1333  case EINTR:
1334  continue;
1335  default:
1336  VERBOSE(CL_ERROR, "Sending thread: unexpected error in select (errno: %i)", errno);
1337  pthread_exit(NULL);
1338  }
1339  } else {
1340  VERBOSE(CL_VERBOSE_ADVANCED, "Sending thread: terminating...");
1341  pthread_exit(NULL);
1342  }
1343  } else if (res == 0) {
1344  /* Select timed out - no client will be receiving */
1345  for (i = j = 0; i < c->clients_arr_size; ++i) {
1346  if (j == c->connected_clients) {
1347  break;
1348  }
1349  if (check_index(c->clients_bit_arr, i) == 0) {
1350  continue;
1351  }
1352  j++;
1353  cl = &(c->clients[i]);
   /* A client that had data prepared but was not ready is disconnected only
    * when its 'timeouts' counter is already non-zero. */
1354  if (receiving_clients_array[i] && cl->timeouts > 0) {
1355  /* Disconnect clients that are unable to receive data fast enough and are blocking the whole module. */
1356  disconnect_client(c, i);
1357  receiving_clients_array[i] = 0;
1358  VERBOSE(CL_VERBOSE_ADVANCED, "Sending thread: Client %" PRIu32 " could not receive data fast enough and was disconnected", cl->id);
1359  } else if (cl->sd < 0) {
   /* Slot is marked as connected but holds no valid descriptor: clean it up */
1360  disconnect_client(c, i);
1361  receiving_clients_array[i] = 0;
1362  }
1363  }
1364  continue;
1365  }
1366 
   /* Slot 0 is the termination pipe */
1367  if (c->clients_pfds[0].revents & POLLIN) {
1368  /* Sending was interrupted by terminate(), exit even from TRAP_WAIT function call. */
1369  VERBOSE(CL_VERBOSE_ADVANCED, "Sending thread: Sending was interrupted by terminate()");
1370  pthread_exit(NULL);
1371  }
1372 
1373  /* Check file descriptors. Disconnect "inactive" clients and send data to those designated by select */
   /* NOTE(review): unlike the loops above, this one selects clients by
    * 'cl->sd < 1' instead of clients_bit_arr. */
1374  for (i = j = 0; i < c->clients_arr_size; ++i) {
1375  if (j == c->connected_clients) {
1376  break;
1377  }
1378 
1379  receiving_clients_array[i] = 0;
1380  cl = &(c->clients[i]);
1381  if (cl->sd < 1) {
1382  continue;
1383  }
1384 
1385  ++j;
   /* The client has no slot in clients_pfds, so it was not part of this poll() */
1386  if (cl->pfds_index < 0) {
1387  continue;
1388  }
1389 
1390  pfds = c->clients_pfds + cl->pfds_index;
1391  assert(pfds->fd == cl->sd);
1392 
1393  /* Check if client is still connected */
   /* recv() result below 1 (end of stream or error) means the client is gone */
1394  if (pfds->revents & POLLIN) {
1395  res = recv(cl->sd, buffer, DEFAULT_MAX_DATA_LENGTH, 0);
1396  if (res < 1) {
1397  disconnect_client(c, i);
1398  VERBOSE(CL_VERBOSE_LIBRARY, "Sending thread: Client %" PRIu32 " disconnected", cl->id);
1399  continue;
1400  }
1401  }
1402 
1403  /* Check if client is ready for data */
1404  if (pfds->revents & POLLOUT) {
1405  send_entry_time = get_cur_timestamp();
1406  res = send_data(c, cl, i);
1407  send_exit_time = get_cur_timestamp();
1408 
1409  /* Measure how much time we spent sending to this client (in microseconds) */
1410  cl->timer_last = (send_exit_time - send_entry_time);
1411  cl->timer_total += cl->timer_last;
1412 
   /* Any result other than TRAP_E_OK ends the connection with this client */
1413  if (res != TRAP_E_OK) {
1414  VERBOSE(CL_VERBOSE_OFF, "Sending thread: Disconnected client %" PRIu32 " (ret val: %d)", cl->id, res);
1415  disconnect_client(c, i);
1416  }
1417  }
1418  }
1419  }
1420 }
1421 
1422 /**
1423  * \brief Store message into buffer.
1424  *
1425  * \param[in] priv pointer to module private data
1426  * \param[in] data pointer to data to write
1427  * \param[in] size size of data to write
1428  * \param[in] timeout maximum time spent waiting for the message to be stored [microseconds]
1429  *
1430  * \return TRAP_E_OK Success.
1431  * \return TRAP_E_TIMEOUT Message was not stored into buffer and the attempt should be repeated.
1432  * \return TRAP_E_TERMINATED Libtrap was terminated during the process.
1433  */
1434 int tcpip_sender_send(void *priv, const void *data, uint16_t size, int timeout)
1435 {
1436  int res, i;
1437  uint32_t free_bytes;
1438  struct timespec ts;
1439  buffer_t *buffer;
1440 
1442  uint8_t block = (timeout == TRAP_WAIT || (timeout == TRAP_HALFWAIT && c->connected_clients != 0)) ? 1 : 0;
1443 
1444  /* Can we put message at least into empty buffer? In the worst case, we could end up with SEGFAULT -> rather skip with error */
1445  if ((size + sizeof(size)) > c->buffer_size) {
1446  VERBOSE(CL_ERROR, "Buffer is too small for this message. Skipping...");
1447  goto timeout;
1448  }
1449 
1450  /* If timeout is wait or half wait, we need to set some valid timeout value (>= 0)*/
1451  if (timeout == TRAP_WAIT || timeout == TRAP_HALFWAIT) {
1452  timeout = 10000;
1453  }
1454 
1455 repeat:
1456  if (c->is_terminated != 0) {
1457  return TRAP_E_TERMINATED;
1458  }
1459  if (block && c->connected_clients == 0) {
1460  usleep(NO_CLIENTS_SLEEP);
1461  goto repeat;
1462  }
1463 
1464  pthread_mutex_lock(&c->ctx->out_ifc_list[c->ifc_idx].ifc_mtx);
1465  buffer = &c->buffers[c->active_buffer];
1466  while (buffer->clients_bit_arr != 0) {
1467  clock_gettime(CLOCK_REALTIME, &ts);
1468 
1469  ts.tv_nsec += (ts.tv_sec * 1000000000L) + (timeout * 1000L);
1470  ts.tv_sec = (ts.tv_nsec / 1000000000L);
1471  ts.tv_nsec %= 1000000000L;
1472 
1473  /* Wait until woken up by sending thread or until timeout elapses */
1474  res = pthread_cond_timedwait(&c->cond_full_buffer, &c->ctx->out_ifc_list[c->ifc_idx].ifc_mtx, &ts);
1475  switch (res) {
1476  case 0:
1477  /* Succesfully locked, buffer can be used */
1478  break;
1479  case ETIMEDOUT:
1480  /* Desired buffer is still full after timeout */
1481  if (block) {
1482  /* Blocking send, wait until buffer is free to use */
1483  pthread_mutex_unlock(&c->ctx->out_ifc_list[c->ifc_idx].ifc_mtx);
1484  goto repeat;
1485  } else {
1486  /* Non-blocking send, drop message or force buffer reset (not implemented) */
1487  goto timeout;
1488  }
1489  default:
1490  VERBOSE(CL_ERROR, "Unexpected error in pthread_mutex_timedlock()");
1491  goto timeout;
1492  }
1493  }
1494 
1495  /* Check if there is enough space in buffer */
1496  free_bytes = c->buffer_size - buffer->wr_index;
1497  if (free_bytes >= (size + sizeof(size))) {
1498  /* Store message into buffer */
1499  insert_into_buffer(buffer, data, size);
1500 
1501  /* If bufferswitch is 0, only 1 message is allowed to be stored in buffer */
1502  if (c->ctx->out_ifc_list[c->ifc_idx].bufferswitch == 0) {
1503  finish_buffer(c, buffer);
1504  }
1505 
1506  pthread_mutex_unlock(&c->ctx->out_ifc_list[c->ifc_idx].ifc_mtx);
1507  return TRAP_E_OK;
1508  } else {
1509  /* Not enough space for message, finish current buffer and try to store message into next buffer */
1510  finish_buffer(c, buffer);
1511  buffer = &c->buffers[c->active_buffer];
1512 
1513  pthread_mutex_unlock(&c->ctx->out_ifc_list[c->ifc_idx].ifc_mtx);
1514  goto repeat;
1515  }
1516 
1517 timeout:
1518  for (i = 0; i < c->clients_arr_size; i++) {
1519  if (c->clients[i].sd > 0 && c->clients[i].assigned_buffer == c->active_buffer) {
1520  c->clients[i].timeouts++;
1521  }
1522  }
1523  pthread_mutex_unlock(&c->ctx->out_ifc_list[c->ifc_idx].ifc_mtx);
1524  return TRAP_E_TIMEOUT;
1525 }
1526 
1527 /**
1528  * \brief Set interface state as terminated.
1529  * \param[in] priv pointer to module private data
1530  */
1531 void tcpip_sender_terminate(void *priv)
1532 {
1534 
1535  uint32_t i;
1536  uint64_t sum;
1537 
1538  /* Wait for connected clients to receive all finished buffers before terminating */
1539  if (c != NULL) {
1540  do {
1541  usleep(10000); //prevents busy waiting
1542  sum = 0;
1543  for (i = 0; i < c->buffer_count; i++) {
1544  sum |= c->buffers[i].clients_bit_arr;
1545  }
1546  } while (sum != 0);
1547 
1548  c->is_terminated = 1;
1549  close(c->term_pipe[1]);
1550  VERBOSE(CL_VERBOSE_LIBRARY, "Closed term_pipe, it should break poll()");
1551  } else {
1552  VERBOSE(CL_ERROR, "Destroying IFC that is probably not initialized.");
1553  }
1554  return;
1555 }
1556 
1557 /**
1558  * \brief Destructor of TCP sender (output ifc)
1559  * \param[in] priv pointer to module private data
1560  */
1561 void tcpip_sender_destroy(void *priv)
1562 {
   /* NOTE(review): several lines of the original function are not present in
    * this listing (1563, 1571 and 1597): the declaration of 'c', the statement
    * opening the block closed on line 1578 and one statement inside the
    * 'clients' block below. */
1564  char *unix_socket_path = NULL;
1565  void *res;
1566  int32_t i;
1567 
   /* Helper: free a pointer and reset it to NULL */
1568 #define X(x) free(x); x = NULL;
1569  // Free private data
1570  if (c != NULL) {
   /* Build the path of the UNIX socket file from server_port and remove the file */
1572  if (asprintf(&unix_socket_path, trap_default_socket_path_format, c->server_port) != -1) {
1573  if (unix_socket_path != NULL) {
1574  unlink(unix_socket_path);
1575  X(unix_socket_path);
1576  }
1577  }
1578  }
1579  if (c->server_port != NULL) {
1580  X(c->server_port);
1581  }
   /* Stop both worker threads; they exist only for an initialized interface
    * that is not of the SERVICE type. */
1582  if ((c->initialized) && (c->socket_type != TRAP_IFC_TCPIP_SERVICE)) {
1583  pthread_cancel(c->send_thr);
1584  pthread_cancel(c->accept_thr);
1585  pthread_join(c->send_thr, &res);
1586  pthread_join(c->accept_thr, &res);
1587  }
1588 
1589  /* close server socket */
1590  close(c->server_sd);
1591 
1592  if (c->clients_pfds != NULL) {
1593  X(c->clients_pfds);
1594  }
1595  /* disconnect all clients */
1596  if (c->clients != NULL) {
1598  X(c->clients);
1599  }
1600 
   /* Every buffer owns the memory its header points to */
1601  if (c->buffers != NULL) {
1602  for (i = 0; i < c->buffer_count; i++) {
1603  X(c->buffers[i].header);
1604  }
1605  X(c->buffers);
1606  }
1607 
1608  pthread_mutex_destroy(&c->mtx_no_data);
1609  pthread_cond_destroy(&c->cond_no_data);
1610  pthread_cond_destroy(&c->cond_full_buffer);
   /* Frees the private structure; only the local pointer 'c' is reset */
1611  X(c)
1612  }
1613 #undef X
1614 }
1615 
1617 {
   /* NOTE(review): the signature (line 1616) and the declaration of 'c'
    * (line 1618) are not present in this listing. */
1619 
   /* Without private data there are no clients to report */
1620  if (c == NULL) {
1621  return 0;
1622  }
1623 
   /* Current value of the connected clients counter */
1624  return c->connected_clients;
1625 }
1626 
1627 int8_t tcpip_sender_get_client_stats_json(void *priv, json_t *client_stats_arr)
1628 {
1629  int i;
1630  json_t *client_stats = NULL;
1632 
1633  if (c == NULL) {
1634  return 0;
1635  }
1636 
1637  for (i = 0; i < c->clients_arr_size; ++i) {
1638  if (check_index(c->clients_bit_arr, i) == 0) {
1639  continue;
1640  }
1641 
1642  client_stats = json_pack("{sisisisi}", "id", c->clients[i].id, "timer_total", c->clients[i].timer_total, "timer_last", c->clients[i].timer_last, "timeouts", c->clients[i].timeouts);
1643  if (client_stats == NULL) {
1644  return 0;
1645  }
1646 
1647  if (json_array_append_new(client_stats_arr, client_stats) == -1) {
1648  return 0;
1649  }
1650  }
1651  return 1;
1652 }
1653 
1654 static void tcpip_sender_create_dump(void *priv, uint32_t idx, const char *path)
1655 {
   /* NOTE(review): the declaration of 'c' (line 1656) is not present in this
    * listing. */
1657  /* return value */
1658  int r;
1659  /* config file trap-i<number>-config.txt */
1660  char *conf_file = NULL;
1661  FILE *f = NULL;
1662  int32_t i;
1663  client_t *cl;
1664 
   /* The dump goes to "<path>/trap-o<idx>-config.txt" */
1665  r = asprintf(&conf_file, "%s/trap-o%02"PRIu32"-config.txt", path, idx);
1666  if (r == -1) {
1667  VERBOSE(CL_ERROR, "Not enough memory, dump failed. (%s:%d)", __FILE__, __LINE__);
1668  conf_file = NULL;
1669  goto exit;
1670  }
   /* NOTE(review): the result of fopen() is not checked; 'f' is used by
    * fprintf()/fclose() below even when the file could not be opened. */
1671  f = fopen(conf_file, "w");
1672  fprintf(f, "Server port: %s\n"
1673  "Server socket descriptor: %d\n"
1674  "Connected clients: %d\n"
1675  "Max clients: %d\n"
1676  "Active buffer: %d\n"
1677  "Buffer count: %u\n"
1678  "Buffer size: %u\n"
1679  "Terminated: %d\n"
1680  "Initialized: %d\n"
1681  "Socket type: %s\n"
1682  "Timeout: %u us\n",
1683  c->server_port,
1684  c->server_sd,
1685  c->connected_clients,
1686  c->clients_arr_size,
1687  c->active_buffer,
   /* NOTE(review): the value printed as "Buffer count" is buffer_size, the
    * same field as for "Buffer size" - buffer_count was probably intended. */
1688  c->buffer_size,
1689  c->buffer_size,
1690  c->is_terminated,
1691  c->initialized,
   /* NOTE(review): the argument for "Socket type: %s" (line 1692) is not
    * present in this listing. */
1693  c->ctx->out_ifc_list[idx].datatimeout);
   /* One line per slot of the 'clients' array: socket descriptor, assigned
    * buffer, sending pointer and pending bytes. */
1694  fprintf(f, "Clients:\n");
1695  for (i = 0; i < c->clients_arr_size; i++) {
1696  cl = &c->clients[i];
1697  fprintf(f, "\t{%d, %d, %p, %d}\n", cl->sd, cl->assigned_buffer, cl->sending_pointer, cl->pending_bytes);
1698  }
1699  fclose(f);
1700 exit:
1701  free(conf_file);
1702  return;
1703 }
1704 
1705 char *tcpip_send_ifc_get_id(void *priv)
1706 {
1707  if (priv == NULL) {
1708  return NULL;
1709  }
1710 
1712  if (config->server_port == NULL) {
1713  return NULL;
1714  }
1715  return config->server_port;
1716 }
1717 
1718 /**
1719  * \brief Constructor of output TCP/IP IFC module.
1720  * This function is called by TRAP library to initialize one output interface.
1721  *
1722  * \param[in,out] ctx Pointer to the private libtrap context data (trap_ctx_init()).
1723  * \param[in] params Configuration string containing interface specific parameters -
1724  * - tcp port/unix socket, max number of clients, buffer size, buffer count.
1725  * \param[in,out] ifc IFC interface used for calling TCP/IP module.
1726  * \param[in] idx Index of IFC that is created.
1727  * \param [in] type select the type of socket (see #tcpip_ifc_sockettype for options)
1728  * \return 0 on success (TRAP_E_OK)
1729  */
1730 int create_tcpip_sender_ifc(trap_ctx_priv_t *ctx, const char *params, trap_output_ifc_t *ifc, uint32_t idx, enum tcpip_ifc_sockettype type)
1731 {
   /* NOTE(review): some lines of the original function are not present in this
    * listing (1847, 1868, 1871-1875 and 1877), so part of the initialization
    * of 'priv' and 'ifc' cannot be seen here. */
1732  int result = TRAP_E_OK;
1733  char *param_iterator = NULL;
1734  char *param_str = NULL;
1735  char *server_port = NULL;
1736  tcpip_sender_private_t *priv = NULL;
1737  unsigned int max_clients = DEFAULT_MAX_CLIENTS;
1738  unsigned int buffer_count = DEFAULT_BUFFER_COUNT;
1739  unsigned int buffer_size = DEFAULT_BUFFER_SIZE;
1740  uint32_t i;
1741 
   /* Helper: free a pointer and reset it to NULL */
1742 #define X(pointer) free(pointer); \
1743  pointer = NULL;
1744 
1745  // Check parameter
1746  if (params == NULL) {
1747  VERBOSE(CL_ERROR, "IFC requires at least one parameter (%s).",
1748  type == TRAP_IFC_TCPIP ? "TCP port" : "UNIX socket name");
1749  return TRAP_E_BADPARAMS;
1750  }
1751 
1752  // Create structure to store private data
   /* calloc() leaves every field of the structure zeroed */
1753  priv = (tcpip_sender_private_t *) calloc(1, sizeof(tcpip_sender_private_t));
1754  if (priv == NULL) {
1755  result = TRAP_E_MEMORY;
1756  goto failsafe_cleanup;
1757  }
1758 
1759  /* Parsing params */
   /* The first parameter is mandatory: TCP port or UNIX socket name */
1760  param_iterator = trap_get_param_by_delimiter(params, &server_port, TRAP_IFC_PARAM_DELIMITER);
1761  if ((server_port == NULL) || (strlen(server_port) == 0)) {
1762  VERBOSE(CL_ERROR, "Missing 'port' for %s IFC.", (type == TRAP_IFC_TCPIP ? "TCPIP" : "UNIX socket"));
1763  result = TRAP_E_BADPARAMS;
1764  goto failsafe_cleanup;
1765  }
1766 
1767  /* Optional params */
   /* Each remaining parameter is recognized by its "name=" prefix; a value that
    * cannot be parsed is reported and replaced by the default. */
1768  while (param_iterator != NULL) {
1769  param_iterator = trap_get_param_by_delimiter(param_iterator, &param_str, TRAP_IFC_PARAM_DELIMITER);
1770  if (param_str == NULL)
1771  continue;
1772  if (strncmp(param_str, "buffer_count=x", BUFFER_COUNT_PARAM_LENGTH) == 0) {
1773  if (sscanf(param_str + BUFFER_COUNT_PARAM_LENGTH, "%u", &buffer_count) != 1) {
1774  VERBOSE(CL_ERROR, "Optional buffer count given, but it is probably in wrong format.");
1775  buffer_count = DEFAULT_BUFFER_COUNT;
1776  }
1777  } else if (strncmp(param_str, "buffer_size=x", BUFFER_SIZE_PARAM_LENGTH) == 0) {
1778  if (sscanf(param_str + BUFFER_SIZE_PARAM_LENGTH, "%u", &buffer_size) != 1) {
1779  VERBOSE(CL_ERROR, "Optional buffer size given, but it is probably in wrong format.");
1780  buffer_size = DEFAULT_BUFFER_SIZE;
1781  }
1782  } else if (strncmp(param_str, "max_clients=x", MAX_CLIENTS_PARAM_LENGTH) == 0) {
   /* More than 64 clients is rejected - presumably because the client bit
    * arrays are 64 bits wide (TODO confirm). */
1783  if (sscanf(param_str + MAX_CLIENTS_PARAM_LENGTH, "%u", &max_clients) != 1 || max_clients > 64) {
1784  VERBOSE(CL_ERROR, "Optional max clients number given, but it is probably in wrong format.");
1785  max_clients = DEFAULT_MAX_CLIENTS;
1786  }
1787  } else {
1788  VERBOSE(CL_ERROR, "Unknown parameter \"%s\".", param_str);
1789  }
1790  X(param_str);
1791  }
1792  /* Parsing params ended */
1793 
   /* NOTE(review): a buffer_count of 0 is not rejected here although the
    * active buffer index is later computed modulo buffer_count. */
1794  priv->buffers = calloc(buffer_count, sizeof(buffer_t));
1795  if (priv->buffers == NULL) {
1796  /* if some memory could not have been allocated, we cannot continue */
   /* NOTE(review): 'result' still holds TRAP_E_OK on this path, so the failure
    * is reported to the caller as success. */
1797  goto failsafe_cleanup;
1798  }
1799  for (i = 0; i < buffer_count; ++i) {
1800  buffer_t *b = &(priv->buffers[i]);
1801 
   /* One allocation per buffer: header (size of buffer_size, i.e. unsigned int)
    * immediately followed by the data area. */
1802  b->header = malloc(buffer_size + sizeof(buffer_size));
1803  if (b->header == NULL) {
1804  /* if some memory could not have been allocated, we cannot continue */
1805  result = TRAP_E_MEMORY;
1806  goto failsafe_cleanup;
1807  }
1808 
1809  b->data = b->header + sizeof(buffer_size);
1810  b->wr_index = 0;
1811  b->clients_bit_arr = 0;
1812  }
   /* One pollfd per client plus one extra item */
1813  priv->clients_pfds = calloc(max_clients + 1, sizeof(*priv->clients_pfds));
1814  if (priv->clients_pfds == NULL) {
1815  /* if some memory could not have been allocated, we cannot continue */
1816  result = TRAP_E_MEMORY;
1817  goto failsafe_cleanup;
1818  }
1819  priv->clients = calloc(max_clients, sizeof(client_t));
1820  if (priv->clients == NULL) {
1821  /* if some memory could not have been allocated, we cannot continue */
1822  result = TRAP_E_MEMORY;
1823  goto failsafe_cleanup;
1824  }
   /* Mark every client slot as unused */
1825  for (i = 0; i < max_clients; ++i) {
1826  client_t *client = &(priv->clients[i]);
1827 
1828  client->assigned_buffer = 0;
1829  client->sd = -1;
1830  client->pfds_index = -1;
1831  client->timer_total = 0;
1832  client->pending_bytes = 0;
1833  client->sending_pointer = NULL;
1834  }
1835 
1836  priv->ctx = ctx;
1837  priv->socket_type = type;
1838  priv->ifc_idx = idx;
   /* From now on the private structure holds the server_port string */
1839  priv->server_port = server_port;
1840  priv->buffer_size = buffer_size;
1841  priv->buffer_count = buffer_count;
1842  priv->clients_arr_size = max_clients;
1843  priv->clients_bit_arr = 0;
1844  priv->connected_clients = 0;
1845  priv->is_terminated = 0;
1846  priv->active_buffer = 0;
1848 
1849  pthread_mutex_init(&priv->mtx_no_data, NULL);
1850  pthread_cond_init(&priv->cond_no_data, NULL);
1851  pthread_cond_init(&priv->cond_full_buffer, NULL);
1852 
1853  VERBOSE(CL_VERBOSE_ADVANCED, "config:\nserver_port:\t%s\nmax_clients:\t%u\nbuffer count:\t%u\nbuffer size:\t%uB\n",
1854  priv->server_port, priv->clients_arr_size, priv->buffer_count, priv->buffer_size);
1855 
1856  result = server_socket_open(priv);
1857  if (result != TRAP_E_OK) {
1858  VERBOSE(CL_ERROR, "Socket could not be opened on given port '%s'.", server_port);
1859  goto failsafe_cleanup;
1860  }
1861 
   /* NOTE(review): on failure only term_pipe[0] is replaced by stdin;
    * term_pipe[1] keeps whatever value it had. */
1862  if (pipe(priv->term_pipe) != 0) {
1863  VERBOSE(CL_ERROR, "Opening of pipe failed. Using stdin as a fall back.");
1864  priv->term_pipe[0] = 0;
1865  }
1866 
1867  // Fill struct defining the interface
1869  ifc->send = tcpip_sender_send;
1870  ifc->flush = tcpip_sender_flush;
1876  ifc->priv = priv;
1878  return result;
1879 
   /* Common error path: release everything allocated so far */
1880 failsafe_cleanup:
1881  X(server_port);
1882  X(param_str);
1883  if (priv != NULL) {
1884  if (priv->buffers != NULL) {
   /* NOTE(review): priv->buffer_count is assigned only after all allocations
    * succeeded; when this path is taken earlier it is still 0 and the headers
    * allocated so far are not freed by this loop. */
1885  for (i = 0; i < priv->buffer_count; i++) {
1886  X(priv->buffers[i].header);
1887  }
1888  X(priv->buffers)
1889  }
1890  if (priv->clients_pfds != NULL) {
1891  X(priv->clients_pfds);
1892  }
1893  if (priv->clients != NULL) {
1894  X(priv->clients);
1895  }
1896  pthread_mutex_destroy(&priv->mtx_no_data);
1897  pthread_cond_destroy(&priv->cond_no_data);
1898  pthread_cond_destroy(&priv->cond_full_buffer);
1899  X(priv);
1900  }
1901 #undef X
1902  return result;
1903 }
1904 
/**
 * \brief Open TCPIP socket for sender module
 *
 * Creates the listening socket (TCP or UNIX, according to 'socket_type'),
 * binds it, starts listening and, unless the interface is of the SERVICE type,
 * starts the sending and the accepting thread. When anything fails after the
 * socket was created, the socket is closed again before returning.
 *
 * \param[in] priv tcpip_sender_private_t structure (private data)
 * \return 0 on success (TRAP_E_OK), TRAP_E_IO_ERROR on error,
 *         TRAP_E_BAD_FPARAMS when no port is set, TRAP_E_BADPARAMS when the
 *         TCP port is out of range.
 */
static int server_socket_open(void *priv)
{
   int yes = 1; // for setsockopt() SO_REUSEADDR, below
   int rv;
   int bound = 0; // set once a socket was successfully bound

   union tcpip_socket_addr addr;
   struct addrinfo *ai, *p = NULL;
   tcpip_sender_private_t *c = (tcpip_sender_private_t *) priv;

   if (c->server_port == NULL) {
      return TRAP_E_BAD_FPARAMS;
   }

   memset(&addr, 0, sizeof(addr));

   if (c->socket_type == TRAP_IFC_TCPIP) {
      if (check_portrange(c->server_port) == EXIT_FAILURE) {
         return TRAP_E_BADPARAMS;
      }

      // get us a socket and bind it
      addr.tcpip_addr.ai_family = AF_UNSPEC;
      addr.tcpip_addr.ai_socktype = SOCK_STREAM;
      addr.tcpip_addr.ai_flags = AI_PASSIVE;
      if ((rv = getaddrinfo(NULL, c->server_port, &addr.tcpip_addr, &ai)) != 0) {
         return trap_errorf(c->ctx, TRAP_E_IO_ERROR, "selectserver: %s\n", gai_strerror(rv));
      }

      /* Use the first returned address that can be bound */
      for (p = ai; p != NULL; p = p->ai_next) {
         c->server_sd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
         if (c->server_sd < 0) {
            continue;
         }

         // lose the pesky "address already in use" error message
         if (setsockopt(c->server_sd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
            VERBOSE(CL_ERROR, "Failed to set socket to reuse address. (%d)", errno);
         }

         if (bind(c->server_sd, p->ai_addr, p->ai_addrlen) < 0) {
            close(c->server_sd);
            c->server_sd = -1;
            continue;
         }
         bound = 1; /* found socket to bind */
         break;
      }
      /* The list must not be touched after this point, hence the 'bound' flag */
      freeaddrinfo(ai); // all done with this
   } else if ((c->socket_type == TRAP_IFC_TCPIP_UNIX) || (c->socket_type == TRAP_IFC_TCPIP_SERVICE)) {
      /* UNIX socket */
      addr.unix_addr.sun_family = AF_UNIX;
      snprintf(addr.unix_addr.sun_path, sizeof(addr.unix_addr.sun_path) - 1, trap_default_socket_path_format, c->server_port);
      /* if socket file exists, it could be hard to create new socket and bind */
      unlink(addr.unix_addr.sun_path); /* error when file does not exist is not a problem */
      c->server_sd = socket(AF_UNIX, SOCK_STREAM, 0);
      if (c->server_sd == -1) {
         VERBOSE(CL_ERROR, "Failed to create socket.");
      } else if (bind(c->server_sd, (struct sockaddr *) &addr.unix_addr, sizeof(addr.unix_addr)) == -1) {
         /* error bind() failed */
         VERBOSE(CL_ERROR, "Failed bind() with the following socket path: %s", addr.unix_addr.sun_path);
         close(c->server_sd);
         c->server_sd = -1;
      } else {
         bound = 1;
         /* Allow every user to connect to the socket */
         if (chmod(addr.unix_addr.sun_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH) == -1) {
            VERBOSE(CL_ERROR, "Failed to set permissions to socket (%s).", addr.unix_addr.sun_path);
         }
      }
   }

   if (!bound) {
      // if we got here, it means we didn't get bound
      VERBOSE(CL_VERBOSE_LIBRARY, "selectserver: failed to bind");
      return TRAP_E_IO_ERROR;
   }

   // listen
   if (listen(c->server_sd, c->clients_arr_size) == -1) {
      VERBOSE(CL_ERROR, "Listen failed");
      goto fail_close;
   }

   /* The SERVICE type has no sending and accepting threads */
   if (c->socket_type != TRAP_IFC_TCPIP_SERVICE) {
      if (pthread_create(&c->send_thr, NULL, sending_thread_func, priv) != 0) {
         VERBOSE(CL_ERROR, "Failed to create sending thread.");
         goto fail_close;
      }

      if (pthread_create(&c->accept_thr, NULL, accept_clients_thread, priv) != 0) {
         VERBOSE(CL_ERROR, "Failed to create accept_thread.");
         /* Do not leave the sending thread running on private data that the
          * caller is going to release after this failure. */
         pthread_cancel(c->send_thr);
         pthread_join(c->send_thr, NULL);
         goto fail_close;
      }
   }

   c->initialized = 1;
   return TRAP_E_OK;

fail_close:
   /* Do not leak the listening socket when the interface cannot be started */
   close(c->server_sd);
   c->server_sd = -1;
   return TRAP_E_IO_ERROR;
}
2008 
2009 /**
2010  * @}
2011  *//* tcpip_sender */
2012 
2013 
2014 /**
2015  * @}
2016  *//* tcpip module */
2017 
2018 /**
2019  * @}
2020  *//* ifc modules */
2021 
2022 
2023 // Local variables:
2024 // c-basic-offset: 3
2025 // End:
ifc_get_id_func_t get_id
Pointer to get_id function.
Definition: trap_ifc.h:182
static int client_socket_connect(void *priv, const char *dest_addr, const char *dest_port, int *socket_descriptor, struct timeval *tv)
client_socket is used as a receiver
Definition: ifc_tcpip.c:804
uint8_t * data
#define BUFFER_SIZE_PARAM_LENGTH
#define TRAP_E_OK
Success, no error.
Definition: trap.h:87
int tcpip_receiver_recv(void *priv, void *data, uint32_t *size, int timeout)
Receive data from interface.
Definition: ifc_tcpip.c:302
#define MAX_RECOVERY_TRY
Definition: ifc_tcpip.c:82
char * trap_get_param_by_delimiter(const char *source, char **dest, const char delimiter)
Splitter of params string. Cut the first param, copy it into dest and returns pointer to the start of...
Definition: trap.c:1160
ifc_get_client_stats_json_func_t get_client_stats_json
Pointer to get_client_stats_json function.
Definition: trap_ifc.h:244
void * sending_pointer
#define NEG_RES_SENDER_FMT_SUBSET
If the data format of input and output interfaces is the same and new data specifier of the output in...
Definition: trap_internal.h:76
char * tcpip_send_ifc_get_id(void *priv)
Definition: ifc_tcpip.c:1705
Internal functions and macros for libtrap. Verbose and debug macros from libcommlbr.
static void disconnect_client(tcpip_sender_private_t *priv, int cl_id)
This function is called when a client was/is being disconnected.
Definition: ifc_tcpip.c:982
uint32_t wr_index
Output buffer structure.
#define TRAP_IFC_PARAM_DELIMITER
Definition: trap.h:165
int create_tcpip_receiver_ifc(trap_ctx_priv_t *ctx, char *params, trap_input_ifc_t *ifc, uint32_t idx, enum tcpip_ifc_sockettype type)
Constructor of input TCP/IP IFC module. This function is called by TRAP library to initialize one inp...
Definition: ifc_tcpip.c:636
pthread_cond_t cond_full_buffer
struct trap_buffer_header_s trap_buffer_header_t
uint64_t timer_total
pthread_mutex_t ifc_mtx
Locking mutex for interface.
Definition: trap_ifc.h:246
char * trap_default_socket_path_format
static int server_socket_open(void *priv)
Open TCPIP socket for sender module.
Definition: ifc_tcpip.c:1910
#define NEG_RES_FAILED
If receiving the data from output interface fails or sending the data to input interface fails...
Definition: trap_internal.h:84
void tcpip_sender_terminate(void *priv)
Set interface state as terminated.
Definition: ifc_tcpip.c:1531
void trap_set_timeouts(int timeout, struct timeval *tm, struct timespec *tmnblk)
Internal function for setting of timeout structs according to libtrap timeout.
Definition: trap.c:1211
ifc_get_id_func_t get_id
Pointer to get_id function.
Definition: trap_ifc.h:236
enum tcpip_ifc_sockettype socket_type
#define TRAP_TIMEOUT_STR(t)
Definition: trap.h:132
uint32_t assigned_buffer
#define TRAP_E_FIELDS_MISMATCH
Returned when receiver fields are not subset of sender fields.
Definition: trap.h:98
static uint64_t get_cur_timestamp()
Definition: ifc_tcpip.c:258
struct sockaddr_un unix_addr
used for path of UNIX socket
Definition: ifc_tcpip.c:102
TRAP TCP/IP interfaces private structures.
static void tcpip_receiver_create_dump(void *priv, uint32_t idx, const char *path)
Definition: ifc_tcpip.c:541
int output_ifc_negotiation(void *ifc_priv_data, char ifc_type, uint32_t client_idx)
Definition: trap.c:2833
static void insert_into_buffer(file_buffer_t *buffer, const void *data, uint16_t size)
Definition: ifc_file.c:647
static void set_index(uint64_t *bits, int i)
Set i-th element (one bit) of 'bits' to 1.
void tcpip_receiver_destroy(void *priv)
Destructor of TCPIP receiver (input ifc)
Definition: ifc_tcpip.c:520
#define X(p)
#define UNIX_PATH_FILENAME_FORMAT
Definition: ifc_tcpip.h:64
void tcpip_sender_flush(void *priv)
Force flush of active buffer.
Definition: ifc_tcpip.c:1168
use UNIX socket as a service interface
Definition: ifc_tcpip.h:73
static void del_index(uint64_t *bits, int i)
Set i-th element (one bit) of 'bits' to 0.
#define TRAP_E_TERMINATED
Interface was terminated during reading/writing.
Definition: trap.h:94
static void client_socket_disconnect(void *priv)
Definition: ifc_tcpip.c:751
static int wait_for_connection(int sock, struct timeval *tv)
Definition: ifc_tcpip.c:769
uint64_t timeouts
ifc_create_dump_func_t create_dump
Pointer to function for generating of dump.
Definition: trap_ifc.h:242
#define TRAP_HALFWAIT
Definition: trap.h:130
void tcpip_sender_destroy(void *priv)
Destructor of TCP sender (output ifc)
Definition: ifc_tcpip.c:1561
#define TRAP_E_IO_ERROR
IO Error.
Definition: trap.h:93
#define NEG_RES_FMT_CHANGED
If the data format has changed (for JSON type, UNIREC type uses *SUBSET variants) ...
Definition: trap_internal.h:78
struct pollfd * clients_pfds
#define VERBOSE(level, format, args...)
static void tcpip_sender_create_dump(void *priv, uint32_t idx, const char *path)
Definition: ifc_tcpip.c:1654
static int trap_errorf(trap_ctx_priv_t *ctx, int err_num, const char *msg,...)
Definition: trap_error.h:92
uint8_t data[0]
ifc_get_client_count_func_t get_client_count
Pointer to get_client_count function.
Definition: trap_ifc.h:243
#define NEG_RES_CONT
If the data format and data specifier of input and output interface are the same (input interface can...
Definition: trap_internal.h:74
ifc_disconn_clients_func_t disconn_clients
Pointer to disconnect_clients function.
Definition: trap_ifc.h:237
int32_t tcpip_sender_get_client_count(void *priv)
Definition: ifc_tcpip.c:1616
uint32_t timer_last
void tcpip_receiver_terminate(void *priv)
Set interface state as terminated.
Definition: ifc_tcpip.c:504
Structure for TCP/IP IFC private information.
int8_t tcpip_sender_get_client_stats_json(void *priv, json_t *client_stats_arr)
Definition: ifc_tcpip.c:1627
ifc_terminate_func_t terminate
Pointer to terminate function.
Definition: trap_ifc.h:184
#define TRAP_E_BAD_FPARAMS
Bad parameters of function.
Definition: trap.h:92
use UNIX socket for local communication
Definition: ifc_tcpip.h:72
int tcpip_sender_send(void *priv, const void *data, uint16_t size, int timeout)
Store message into buffer.
Definition: ifc_tcpip.c:1434
static void * accept_clients_thread(void *arg)
This function runs in a separate thread and handles new client's connection requests.
Definition: ifc_tcpip.c:1026
char bufferswitch
Enable (1) or Disable (0) buffering, default is Enabled (1).
Definition: trap_ifc.h:250
#define TRAP_IFC_TYPE_TCPIP
trap_ifc_tcpip (input&output part)
Definition: trap.h:173
#define DEFAULT_BUFFER_SIZE
#define DEBUG_IFC(X)
trap_buffer_header_t int_mess_header
static uint64_t check_index(uint64_t bits, int i)
Return value of i-th element (one bit) in the 'bits' array.
Structure for TCP/IP IFC client information.
use TCP/IP connection
Definition: ifc_tcpip.h:71
TRAP TCP/IP interfaces.
ifc_destroy_func_t destroy
Pointer to destructor function.
Definition: trap_ifc.h:185
int32_t datatimeout
Timeout for *_send() calls.
Definition: trap_ifc.h:248
#define TRAP_E_NEGOTIATION_FAILED
Returned by trap_recv when negotiation of the output and input interfaces failed. ...
Definition: trap.h:102
static int check_portrange(const char *port)
Check if the given port is a correct port number.
Definition: ifc_tcpip.c:138
ifc_send_func_t send
Pointer to send function.
Definition: trap_ifc.h:238
int32_t datatimeout
Timeout for *_recv() calls.
Definition: trap_ifc.h:191
void tcpip_server_disconnect_all_clients(void *priv)
Function disconnects all clients of the output interface whose private structure is passed via "priv"...
Definition: ifc_tcpip.c:1009
#define NEG_RES_FMT_UNKNOWN
If the output interface has not specified data format.
Definition: trap_internal.h:85
#define DEFAULT_MAX_CLIENTS
ifc_recv_func_t recv
Pointer to receive function.
Definition: trap_ifc.h:183
#define TRAP_E_FORMAT_MISMATCH
Returned by trap_recv when data format or data specifier of the output and input interfaces doesn't m...
Definition: trap.h:101
#define NEG_RES_FMT_MISMATCH
If the data format or data specifier of input and output interfaces does not match.
Definition: trap_internal.h:77
ifc_create_dump_func_t create_dump
Pointer to function for generating of dump.
Definition: trap_ifc.h:186
#define DEFAULT_BUFFER_COUNT
int64_t timeout
Internal structure to send partial data after timeout (autoflush).
Definition: trap_ifc.h:247
uint8_t * header
Error handling for TRAP.
static int receive_part(void *priv, void **data, uint32_t *size, struct timeval *tm)
Definition: ifc_tcpip.c:177
#define TRAP_E_MEMORY
Memory allocation error.
Definition: trap.h:104
#define NEG_RES_OK
Signaling success (hello message successfully sent to input interface)
Definition: trap_internal.h:81
struct addrinfo tcpip_addr
used for TCPIP socket
Definition: ifc_tcpip.c:101
ifc_destroy_func_t destroy
Pointer to destructor function.
Definition: trap_ifc.h:241
int create_tcpip_sender_ifc(trap_ctx_priv_t *ctx, const char *params, trap_output_ifc_t *ifc, uint32_t idx, enum tcpip_ifc_sockettype type)
Constructor of output TCP/IP IFC module. This function is called by TRAP library to initialize one ou...
Definition: ifc_tcpip.c:1730
ifc_terminate_func_t terminate
Pointer to terminate function.
Definition: trap_ifc.h:240
This file contains common functions and structures used in socket based interfaces (tcp-ip / tls)...
static int send_data(tcpip_sender_private_t *priv, client_t *c, uint32_t cl_id)
Send data to client from his assigned buffer.
Definition: ifc_tcpip.c:1190
uint64_t clients_bit_arr
#define TRAP_E_TIMEOUT
Read or write operation timeout.
Definition: trap.h:88
enum tcpip_ifc_sockettype socket_type
uint64_t * counter_autoflush
#define NEG_RES_RECEIVER_FMT_SUBSET
If the data format of input and output interfaces is the same and data specifier of the input interfa...
Definition: trap_internal.h:75
static void finish_buffer(tcpip_sender_private_t *priv, buffer_t *buffer)
Write buffer size to its header and shift active index.
Definition: ifc_tcpip.c:1144
#define TCPIP_SOCKETTYPE_STR(st)
Definition: ifc_tcpip.h:75
#define DEFAULT_MAX_DATA_LENGTH
uint32_t pending_bytes
ifc_flush_func_t flush
Pointer to flush function.
Definition: trap_ifc.h:239
static void * sending_thread_func(void *priv)
This function runs in a separate thread. It handles sending data to connected clients for TCPIP and U...
Definition: ifc_tcpip.c:1239
#define TRAP_WAIT
Definition: trap.h:124
void * priv
Pointer to instance's private data.
Definition: trap_ifc.h:187
uint64_t * counter_send_buffer
int input_ifc_negotiation(void *ifc_priv_data, char ifc_type)
Definition: trap.c:2995
ifc_is_conn_func_t is_conn
Pointer to is_connected function.
Definition: trap_ifc.h:181
void * priv
Pointer to instance's private data.
Definition: trap_ifc.h:245
static void * get_in_addr(struct sockaddr *sa)
Get sockaddr, IPv4 or IPv6.
Definition: ifc_tcpip.c:119
trap_output_ifc_t * out_ifc_list
#define MAX_CLIENTS_PARAM_LENGTH
Interface of TRAP interfaces.
#define BUFFER_COUNT_PARAM_LENGTH
uint8_t tcpip_recv_ifc_is_conn(void *priv)
Definition: ifc_tcpip.c:611
char * tcpip_recv_ifc_get_id(void *priv)
Definition: ifc_tcpip.c:598
trap_input_ifc_t * in_ifc_list
#define TRAP_E_BADPARAMS
Bad parameters passed to interface initializer.
Definition: trap.h:90
#define NO_CLIENTS_SLEEP
tcpip_ifc_sockettype
Definition: ifc_tcpip.h:70
char *trap_default_socket_path_format __attribute__((used))