pacemaker  1.1.16-94ff4df
Scalable High-Availability cluster resource manager
remote.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2008 Andrew Beekhof
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17  *
18  */
19 #include <crm_internal.h>
20 #include <crm/crm.h>
21 
22 #include <sys/param.h>
23 #include <stdio.h>
24 #include <sys/types.h>
25 #include <sys/stat.h>
26 #include <unistd.h>
27 #include <sys/socket.h>
28 #include <arpa/inet.h>
29 #include <netinet/in.h>
30 #include <netinet/ip.h>
31 #include <netinet/tcp.h>
32 #include <netdb.h>
33 
34 #include <stdlib.h>
35 #include <errno.h>
36 #include <fcntl.h>
37 #include <glib.h>
38 
39 #include <bzlib.h>
40 
41 #include <crm/common/ipcs.h>
42 #include <crm/common/xml.h>
43 #include <crm/common/mainloop.h>
44 
45 #ifdef HAVE_GNUTLS_GNUTLS_H
46 # undef KEYFILE
47 # include <gnutls/gnutls.h>
48 
49 const int psk_tls_kx_order[] = {
50  GNUTLS_KX_DHE_PSK,
51  GNUTLS_KX_PSK,
52 };
53 
54 const int anon_tls_kx_order[] = { /* key-exchange preference list passed to gnutls_kx_set_priority() by crm_create_anon_tls_session() */
55  GNUTLS_KX_ANON_DH,
56  GNUTLS_KX_DHE_RSA,
57  GNUTLS_KX_DHE_DSS,
58  GNUTLS_KX_RSA,
59  0 /* end-of-list marker */
60 };
61 #endif
62 
63 /* Swab macros from linux/swab.h */
64 #ifdef HAVE_LINUX_SWAB_H
65 # include <linux/swab.h>
66 #else
67 /*
68  * casts are necessary for constants, because we never know for sure
69  * how U/UL/ULL map to __u16, __u32, __u64. At least not in a portable way.
70  */
/*
 * Byte-order reversal fallbacks, used only when <linux/swab.h> is unavailable.
 * Each macro converts its argument to the fixed-width type, moves every byte
 * to its mirrored position with a shift, and masks off everything else.
 */
#define __swab16(x) ((uint16_t)( \
    ((uint16_t)(x) << 8) | \
    ((uint16_t)(x) >> 8)))

#define __swab32(x) ((uint32_t)( \
    ((uint32_t)(x) << 24) | \
    (((uint32_t)(x) << 8) & (uint32_t)0x00ff0000UL) | \
    (((uint32_t)(x) >> 8) & (uint32_t)0x0000ff00UL) | \
    ((uint32_t)(x) >> 24)))

#define __swab64(x) ((uint64_t)( \
    ((uint64_t)(x) << 56) | \
    (((uint64_t)(x) << 40) & (uint64_t)0x00ff000000000000ULL) | \
    (((uint64_t)(x) << 24) & (uint64_t)0x0000ff0000000000ULL) | \
    (((uint64_t)(x) << 8) & (uint64_t)0x000000ff00000000ULL) | \
    (((uint64_t)(x) >> 8) & (uint64_t)0x00000000ff000000ULL) | \
    (((uint64_t)(x) >> 24) & (uint64_t)0x0000000000ff0000ULL) | \
    (((uint64_t)(x) >> 40) & (uint64_t)0x000000000000ff00ULL) | \
    ((uint64_t)(x) >> 56)))
90 #endif
91 
92 #define REMOTE_MSG_VERSION 1 /* stored in header->version by crm_remote_send(); the parser only warns (instead of erroring) about messages claiming a newer version */
93 #define ENDIAN_LOCAL 0xBADADBBD /* stored in header->endian by the sender; crm_remote_header() byte-swaps the header when this arrives reversed */
94 
/*
 * Fixed-size header that precedes the payload of every remote message.
 *
 * The struct is packed because it is copied to and from the wire verbatim;
 * crm_remote_header() byte-swaps each field when the sender's byte order
 * differs from ours. Field order is part of the wire format.
 */
struct crm_remote_header_v0
{
    uint32_t endian;                /* Detect messages from hosts with different endian-ness */
    uint32_t version;               /* Sender's REMOTE_MSG_VERSION */
    uint64_t id;                    /* Message counter assigned by the sender */
    uint64_t flags;
    uint32_t size_total;            /* Header plus payload, in bytes */
    uint32_t payload_offset;        /* Offset of the payload from the start of the message */
    uint32_t payload_compressed;    /* Compressed payload size; 0 when not compressed */
    uint32_t payload_uncompressed;  /* Uncompressed payload size, including the trailing NUL */

    /* New fields get added here */

} __attribute__ ((packed));
109 
110 static struct crm_remote_header_v0 *
111 crm_remote_header(crm_remote_t * remote)
112 {
113  struct crm_remote_header_v0 *header = (struct crm_remote_header_v0 *)remote->buffer;
114  if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
115  return NULL;
116 
117  } else if(header->endian != ENDIAN_LOCAL) {
118  uint32_t endian = __swab32(header->endian);
121  if(endian != ENDIAN_LOCAL) {
122  crm_err("Invalid message detected, endian mismatch: %lx is neither %lx nor the swab'd %lx",
123  ENDIAN_LOCAL, header->endian, endian);
124  return NULL;
125  }
126 
127  header->id = __swab64(header->id);
128  header->flags = __swab64(header->flags);
129  header->endian = __swab32(header->endian);
130 
131  header->version = __swab32(header->version);
132  header->size_total = __swab32(header->size_total);
133  header->payload_offset = __swab32(header->payload_offset);
134  header->payload_compressed = __swab32(header->payload_compressed);
135  header->payload_uncompressed = __swab32(header->payload_uncompressed);
136  }
137 
138  return header;
139 }
140 
141 #ifdef HAVE_GNUTLS_GNUTLS_H
142 
143 int
144 crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
145 {
146  int rc = 0;
147  int pollrc = 0;
148  time_t start = time(NULL);
149 
150  do {
151  rc = gnutls_handshake(*remote->tls_session);
152  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
153  pollrc = crm_remote_ready(remote, 1000);
154  if (pollrc < 0) {
155  /* poll returned error, there is no hope */
156  rc = -1;
157  }
158  }
159 
160  } while (((time(NULL) - start) < (timeout_ms / 1000)) &&
161  (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN));
162 
163  if (rc < 0) {
164  crm_trace("gnutls_handshake() failed with %d", rc);
165  }
166  return rc;
167 }
168 
169 void *
170 crm_create_anon_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ ,
171  void *credentials)
172 {
173  gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
174 
175  gnutls_init(session, type);
176 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
177 /* http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication */
178  gnutls_priority_set_direct(*session, "NORMAL:+ANON-DH", NULL);
179 /* gnutls_priority_set_direct (*session, "NONE:+VERS-TLS-ALL:+CIPHER-ALL:+MAC-ALL:+SIGN-ALL:+COMP-ALL:+ANON-DH", NULL); */
180 # else
181  gnutls_set_default_priority(*session);
182  gnutls_kx_set_priority(*session, anon_tls_kx_order);
183 # endif
184  gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
185  switch (type) {
186  case GNUTLS_SERVER:
187  gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
188  (gnutls_anon_server_credentials_t) credentials);
189  break;
190  case GNUTLS_CLIENT:
191  gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
192  (gnutls_anon_client_credentials_t) credentials);
193  break;
194  }
195 
196  return session;
197 }
198 
199 void *
200 create_psk_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ , void *credentials)
201 {
202  gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
203 
204  gnutls_init(session, type);
205 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
206  gnutls_priority_set_direct(*session, "NORMAL:+DHE-PSK:+PSK", NULL);
207 # else
208  gnutls_set_default_priority(*session);
209  gnutls_kx_set_priority(*session, psk_tls_kx_order);
210 # endif
211  gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
212  switch (type) {
213  case GNUTLS_SERVER:
214  gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
215  (gnutls_psk_server_credentials_t) credentials);
216  break;
217  case GNUTLS_CLIENT:
218  gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
219  (gnutls_psk_client_credentials_t) credentials);
220  break;
221  }
222 
223  return session;
224 }
225 
226 static int
227 crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
228 {
229  const char *unsent = buf;
230  int rc = 0;
231  int total_send;
232 
233  if (buf == NULL) {
234  return -1;
235  }
236 
237  total_send = len;
238  crm_trace("Message size: %llu", (unsigned long long) len);
239 
240  while (TRUE) {
241  rc = gnutls_record_send(*session, unsent, len);
242 
243  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
244  crm_debug("Retry");
245 
246  } else if (rc < 0) {
247  crm_err("Connection terminated rc = %d", rc);
248  break;
249 
250  } else if (rc < len) {
251  crm_debug("Sent %d of %llu bytes", rc, (unsigned long long) len);
252  len -= rc;
253  unsent += rc;
254  } else {
255  crm_trace("Sent all %d bytes", rc);
256  break;
257  }
258  }
259 
260  return rc < 0 ? rc : total_send;
261 }
262 #endif
263 
264 static int
265 crm_send_plaintext(int sock, const char *buf, size_t len)
266 {
267 
268  int rc = 0;
269  const char *unsent = buf;
270  int total_send;
271 
272  if (buf == NULL) {
273  return -1;
274  }
275  total_send = len;
276 
277  crm_trace("Message on socket %d: size=%llu",
278  sock, (unsigned long long) len);
279  retry:
280  rc = write(sock, unsent, len);
281  if (rc < 0) {
282  switch (errno) {
283  case EINTR:
284  case EAGAIN:
285  crm_trace("Retry");
286  goto retry;
287  default:
288  crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, (int)len);
289  break;
290  }
291 
292  } else if (rc < len) {
293  crm_trace("Only sent %d of %llu remaining bytes",
294  rc, (unsigned long long) len);
295  len -= rc;
296  unsent += rc;
297  goto retry;
298 
299  } else {
300  crm_trace("Sent %d bytes: %.100s", rc, buf);
301  }
302 
303  return rc < 0 ? rc : total_send;
304 
305 }
306 
307 static int
308 crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
309 {
310  int lpc = 0;
311  int rc = -ESOCKTNOSUPPORT;
312 
313  for(; lpc < iovs; lpc++) {
314 
315 #ifdef HAVE_GNUTLS_GNUTLS_H
316  if (remote->tls_session) {
317  rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
318  } else if (remote->tcp_socket) {
319 #else
320  if (remote->tcp_socket) {
321 #endif
322  rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
323 
324  } else {
325  crm_err("Unsupported connection type");
326  }
327  }
328  return rc;
329 }
330 
331 int
332 crm_remote_send(crm_remote_t * remote, xmlNode * msg)
333 {
334  int rc = -1;
335  static uint64_t id = 0;
336  char *xml_text = dump_xml_unformatted(msg);
337 
338  struct iovec iov[2];
339  struct crm_remote_header_v0 *header;
340 
341  if (xml_text == NULL) {
342  crm_err("Invalid XML, can not send msg");
343  return -1;
344  }
345 
346  header = calloc(1, sizeof(struct crm_remote_header_v0));
347  iov[0].iov_base = (caddr_t)header;
348  iov[0].iov_len = sizeof(struct crm_remote_header_v0);
349 
350  iov[1].iov_base = xml_text;
351  iov[1].iov_len = 1 + strlen(xml_text);
352 
353  id++;
354  header->id = id;
355  header->endian = ENDIAN_LOCAL;
356  header->version = REMOTE_MSG_VERSION;
357  header->payload_offset = iov[0].iov_len;
358  header->payload_uncompressed = iov[1].iov_len;
359  header->size_total = iov[0].iov_len + iov[1].iov_len;
360 
361  crm_trace("Sending len[0]=%d, start=%x",
362  (int)iov[0].iov_len, *(int*)(void*)xml_text);
363  rc = crm_remote_sendv(remote, iov, 2);
364  if (rc < 0) {
365  crm_err("Failed to send remote msg, rc = %d", rc);
366  }
367 
368  free(iov[0].iov_base);
369  free(iov[1].iov_base);
370  return rc;
371 }
372 
373 
379 xmlNode *
381 {
382  xmlNode *xml = NULL;
383  struct crm_remote_header_v0 *header = crm_remote_header(remote);
384 
385  if (remote->buffer == NULL || header == NULL) {
386  return NULL;
387  }
388 
389  /* Support compression on the receiving end now, in case we ever want to add it later */
390  if (header->payload_compressed) {
391  int rc = 0;
392  unsigned int size_u = 1 + header->payload_uncompressed;
393  char *uncompressed = calloc(1, header->payload_offset + size_u);
394 
395  crm_trace("Decompressing message data %d bytes into %d bytes",
396  header->payload_compressed, size_u);
397 
398  rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
399  remote->buffer + header->payload_offset,
400  header->payload_compressed, 1, 0);
401 
402  if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
403  crm_warn("Couldn't decompress v%d message, we only understand v%d",
404  header->version, REMOTE_MSG_VERSION);
405  free(uncompressed);
406  return NULL;
407 
408  } else if (rc != BZ_OK) {
409  crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
410  free(uncompressed);
411  return NULL;
412  }
413 
414  CRM_ASSERT(size_u == header->payload_uncompressed);
415 
416  memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
417  remote->buffer_size = header->payload_offset + size_u;
418 
419  free(remote->buffer);
420  remote->buffer = uncompressed;
421  header = crm_remote_header(remote);
422  }
423 
424  /* take ownership of the buffer */
425  remote->buffer_offset = 0;
426 
427  CRM_LOG_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
428 
429  xml = string2xml(remote->buffer + header->payload_offset);
430  if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
431  crm_warn("Couldn't parse v%d message, we only understand v%d",
432  header->version, REMOTE_MSG_VERSION);
433 
434  } else if (xml == NULL) {
435  crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
436  }
437 
438  return xml;
439 }
440 
450 int
451 crm_remote_ready(crm_remote_t *remote, int total_timeout)
452 {
453  struct pollfd fds = { 0, };
454  int sock = 0;
455  int rc = 0;
456  time_t start;
457  int timeout = total_timeout;
458 
459 #ifdef HAVE_GNUTLS_GNUTLS_H
460  if (remote->tls_session) {
461  void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
462 
463  sock = GPOINTER_TO_INT(sock_ptr);
464  } else if (remote->tcp_socket) {
465 #else
466  if (remote->tcp_socket) {
467 #endif
468  sock = remote->tcp_socket;
469  } else {
470  crm_err("Unsupported connection type");
471  }
472 
473  if (sock <= 0) {
474  crm_trace("No longer connected");
475  return -ENOTCONN;
476  }
477 
478  start = time(NULL);
479  errno = 0;
480  do {
481  fds.fd = sock;
482  fds.events = POLLIN;
483 
484  /* If we got an EINTR while polling, and we have a
485  * specific timeout we are trying to honor, attempt
486  * to adjust the timeout to the closest second. */
487  if (errno == EINTR && (timeout > 0)) {
488  timeout = total_timeout - ((time(NULL) - start) * 1000);
489  if (timeout < 1000) {
490  timeout = 1000;
491  }
492  }
493 
494  rc = poll(&fds, 1, timeout);
495  } while (rc < 0 && errno == EINTR);
496 
497  return (rc < 0)? -errno : rc;
498 }
499 
500 
511 static size_t
512 crm_remote_recv_once(crm_remote_t * remote)
513 {
514  int rc = 0;
515  size_t read_len = sizeof(struct crm_remote_header_v0);
516  struct crm_remote_header_v0 *header = crm_remote_header(remote);
517 
518  if(header) {
519  /* Stop at the end of the current message */
520  read_len = header->size_total;
521  }
522 
523  /* automatically grow the buffer when needed */
524  if(remote->buffer_size < read_len) {
525  remote->buffer_size = 2 * read_len;
526  crm_trace("Expanding buffer to %llu bytes",
527  (unsigned long long) remote->buffer_size);
528 
529  remote->buffer = realloc_safe(remote->buffer, remote->buffer_size + 1);
530  CRM_ASSERT(remote->buffer != NULL);
531  }
532 
533 #ifdef HAVE_GNUTLS_GNUTLS_H
534  if (remote->tls_session) {
535  rc = gnutls_record_recv(*(remote->tls_session),
536  remote->buffer + remote->buffer_offset,
537  remote->buffer_size - remote->buffer_offset);
538  if (rc == GNUTLS_E_INTERRUPTED) {
539  rc = -EINTR;
540  } else if (rc == GNUTLS_E_AGAIN) {
541  rc = -EAGAIN;
542  } else if (rc < 0) {
543  crm_debug("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
544  rc = -pcmk_err_generic;
545  }
546  } else if (remote->tcp_socket) {
547 #else
548  if (remote->tcp_socket) {
549 #endif
550  errno = 0;
551  rc = read(remote->tcp_socket,
552  remote->buffer + remote->buffer_offset,
553  remote->buffer_size - remote->buffer_offset);
554  if(rc < 0) {
555  rc = -errno;
556  }
557 
558  } else {
559  crm_err("Unsupported connection type");
560  return -ESOCKTNOSUPPORT;
561  }
562 
563  /* process any errors. */
564  if (rc > 0) {
565  remote->buffer_offset += rc;
566  /* always null terminate buffer, the +1 to alloc always allows for this. */
567  remote->buffer[remote->buffer_offset] = '\0';
568  crm_trace("Received %u more bytes, %llu total",
569  rc, (unsigned long long) remote->buffer_offset);
570 
571  } else if (rc == -EINTR || rc == -EAGAIN) {
572  crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
573 
574  } else if (rc == 0) {
575  crm_debug("EOF encoutered after %llu bytes",
576  (unsigned long long) remote->buffer_offset);
577  return -ENOTCONN;
578 
579  } else {
580  crm_debug("Error receiving message after %llu bytes: %s (%d)",
581  (unsigned long long) remote->buffer_offset,
582  pcmk_strerror(rc), rc);
583  return -ENOTCONN;
584  }
585 
586  header = crm_remote_header(remote);
587  if(header) {
588  if(remote->buffer_offset < header->size_total) {
589  crm_trace("Read less than the advertised length: %llu < %u bytes",
590  (unsigned long long) remote->buffer_offset,
591  header->size_total);
592  } else {
593  crm_trace("Read full message of %llu bytes",
594  (unsigned long long) remote->buffer_offset);
595  return remote->buffer_offset;
596  }
597  }
598 
599  return -EAGAIN;
600 }
601 
609 gboolean
610 crm_remote_recv(crm_remote_t * remote, int total_timeout /*ms */ , int *disconnected)
611 {
612  int rc;
613  time_t start = time(NULL);
614  int remaining_timeout = 0;
615 
616  if (total_timeout == 0) {
617  total_timeout = 10000;
618  } else if (total_timeout < 0) {
619  total_timeout = 60000;
620  }
621  *disconnected = 0;
622 
623  remaining_timeout = total_timeout;
624  while ((remaining_timeout > 0) && !(*disconnected)) {
625 
626  /* read some more off the tls buffer if we still have time left. */
627  crm_trace("waiting to receive remote msg, starting timeout %d, remaining_timeout %d",
628  total_timeout, remaining_timeout);
629  rc = crm_remote_ready(remote, remaining_timeout);
630 
631  if (rc == 0) {
632  crm_err("poll timed out (%d ms) while waiting to receive msg", remaining_timeout);
633  return FALSE;
634 
635  } else if(rc < 0) {
636  crm_debug("could not poll: %s (%d)", pcmk_strerror(rc), rc);
637 
638  } else {
639  rc = crm_remote_recv_once(remote);
640  if(rc > 0) {
641  return TRUE;
642  } else if (rc < 0) {
643  crm_debug("recv() failed: %s (%d)", pcmk_strerror(rc), rc);
644  }
645  }
646 
647  if(rc == -ENOTCONN) {
648  *disconnected = 1;
649  return FALSE;
650  }
651 
652  remaining_timeout = total_timeout - ((time(NULL) - start) * 1000);
653  }
654 
655  return FALSE;
656 }
657 
658 struct tcp_async_cb_data { /* state kept between check_connect_finished() timer ticks for one asynchronous connect() */
659  gboolean success; /* TRUE when connect() already succeeded before the timer was scheduled */
660  int sock; /* socket being connected; reset to 0 once it is handed to the callback */
661  void *userdata; /* opaque pointer passed back to callback */
662  void (*callback) (void *userdata, int sock); /* called once with the connected socket, or with an error code instead of a socket */
663  int timeout; /* ms allowed for the connect to complete */
664  time_t start; /* time(NULL) taken when the connect was started */
665 };
666 
667 static gboolean
668 check_connect_finished(gpointer userdata)
669 {
670  struct tcp_async_cb_data *cb_data = userdata;
671  int rc = 0;
672  int sock = cb_data->sock;
673  int error = 0;
674 
675  fd_set rset, wset;
676  socklen_t len = sizeof(error);
677  struct timeval ts = { 0, };
678 
679  if (cb_data->success == TRUE) {
680  goto dispatch_done;
681  }
682 
683  FD_ZERO(&rset);
684  FD_SET(sock, &rset);
685  wset = rset;
686 
687  crm_trace("fd %d: checking to see if connect finished", sock);
688  rc = select(sock + 1, &rset, &wset, NULL, &ts);
689 
690  if (rc < 0) {
691  rc = errno;
692  if ((errno == EINPROGRESS) || (errno == EAGAIN)) {
693  /* reschedule if there is still time left */
694  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
695  goto reschedule;
696  } else {
697  rc = -ETIMEDOUT;
698  }
699  }
700  crm_trace("fd %d: select failed %d connect dispatch ", sock, rc);
701  goto dispatch_done;
702  } else if (rc == 0) {
703  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
704  goto reschedule;
705  }
706  crm_debug("fd %d: timeout during select", sock);
707  rc = -ETIMEDOUT;
708  goto dispatch_done;
709  } else {
710  crm_trace("fd %d: select returned success", sock);
711  rc = 0;
712  }
713 
714  /* can we read or write to the socket now? */
715  if (FD_ISSET(sock, &rset) || FD_ISSET(sock, &wset)) {
716  if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
717  crm_trace("fd %d: call to getsockopt failed", sock);
718  rc = -1;
719  goto dispatch_done;
720  }
721 
722  if (error) {
723  crm_trace("fd %d: error returned from getsockopt: %d", sock, error);
724  rc = -1;
725  goto dispatch_done;
726  }
727  } else {
728  crm_trace("neither read nor write set after select");
729  rc = -1;
730  goto dispatch_done;
731  }
732 
733  dispatch_done:
734  if (!rc) {
735  crm_trace("fd %d: connected", sock);
736  /* Success, set the return code to the sock to report to the callback */
737  rc = cb_data->sock;
738  cb_data->sock = 0;
739  } else {
740  close(sock);
741  }
742 
743  if (cb_data->callback) {
744  cb_data->callback(cb_data->userdata, rc);
745  }
746  free(cb_data);
747  return FALSE;
748 
749  reschedule:
750 
751  /* will check again next interval */
752  return TRUE;
753 }
754 
755 static int
756 internal_tcp_connect_async(int sock,
757  const struct sockaddr *addr, socklen_t addrlen, int timeout /* ms */ ,
758  int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
759 {
760  int rc = 0;
761  int flag = 0;
762  int interval = 500;
763  int timer;
764  struct tcp_async_cb_data *cb_data = NULL;
765 
766  if ((flag = fcntl(sock, F_GETFL)) >= 0) {
767  if (fcntl(sock, F_SETFL, flag | O_NONBLOCK) < 0) {
768  crm_err("fcntl() write failed");
769  return -1;
770  }
771  }
772 
773  rc = connect(sock, addr, addrlen);
774 
775  if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
776  return -1;
777  }
778 
779  cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
780  cb_data->userdata = userdata;
781  cb_data->callback = callback;
782  cb_data->sock = sock;
783  cb_data->timeout = timeout;
784  cb_data->start = time(NULL);
785 
786  if (rc == 0) {
787  /* The connect was successful immediately, we still return to mainloop
788  * and let this callback get called later. This avoids the user of this api
789  * to have to account for the fact the callback could be invoked within this
790  * function before returning. */
791  cb_data->success = TRUE;
792  interval = 1;
793  }
794 
795  /* Check connect finished is mostly doing a non-block poll on the socket
796  * to see if we can read/write to it. Once we can, the connect has completed.
797  * This method allows us to connect to the server without blocking mainloop.
798  *
799  * This is a poor man's way of polling to see when the connection finished.
800  * At some point we should figure out a way to use a mainloop fd callback for this.
801  * Something about the way mainloop is currently polling prevents this from working at the
802  * moment though. */
803  crm_trace("fd %d: scheduling to check if connect finished in %dms second", sock, interval);
804  timer = g_timeout_add(interval, check_connect_finished, cb_data);
805  if (timer_id) {
806  *timer_id = timer;
807  }
808 
809  return 0;
810 }
811 
/*
 * Connect sock to addr with a blocking connect(), then switch the socket to
 * non-blocking mode.
 *
 * Returns connect()'s result, or -1 if the connect succeeded but
 * O_NONBLOCK could not be set. Failure to read the current flags is ignored.
 */
static int
internal_tcp_connect(int sock, const struct sockaddr *addr, socklen_t addrlen)
{
    int flags = 0;
    int rc = connect(sock, addr, addrlen);

    if (rc != 0) {
        return rc;
    }

    flags = fcntl(sock, F_GETFL);
    if (flags < 0) {
        /* Could not read the flags: leave the socket as it is */
        return rc;
    }

    if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) {
        crm_err("fcntl() write failed");
        return -1;
    }

    return rc;
}
829 
/*
 * Resolve host and try to connect to it on port, walking the resolved
 * addresses until one attempt succeeds.
 *
 * With a callback, each attempt is started asynchronously and the function
 * returns as soon as one has been started; the outcome is reported to the
 * callback later. Without a callback, each attempt is a blocking connect.
 *
 * timeout    milliseconds allowed for an asynchronous connect
 * timer_id   if non-NULL, receives the timer id of the asynchronous attempt
 * userdata   passed through to callback
 *
 * Returns the socket of the connected (or, with a callback, in-progress)
 * attempt, or -1 if name resolution or every attempt failed.
 */
int
crm_remote_tcp_connect_async(const char *host, int port, int timeout,   /*ms */
                             int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
{
    char ip_text[256];
    struct addrinfo hints;
    struct addrinfo *results = NULL;
    struct addrinfo *candidate = NULL;
    const char *server = host;
    int gai_rc;
    int sock = -1;

    /* getaddrinfo */
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;        /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_CANONNAME;

    crm_debug("Looking up %s", server);
    gai_rc = getaddrinfo(server, NULL, &hints, &results);
    if (gai_rc) {
        crm_err("getaddrinfo: %s", gai_strerror(gai_rc));
        return -1;
    }

    if ((results == NULL) || (results->ai_addr == NULL)) {
        crm_err("getaddrinfo failed");

    } else {
        for (candidate = results; candidate != NULL; candidate = candidate->ai_next) {
            struct sockaddr *addr = candidate->ai_addr;
            int started = 0;

            if (addr == NULL) {
                continue;
            }

            if (candidate->ai_canonname) {
                server = results->ai_canonname;
            }
            crm_debug("Got address %s for %s", server, host);

            /* create socket */
            sock = socket(candidate->ai_family, SOCK_STREAM, IPPROTO_TCP);
            if (sock == -1) {
                crm_err("Socket creation failed for remote client connection.");
                continue;
            }

            /* Fill in the port and render the address for logging */
            memset(ip_text, 0, sizeof(ip_text));
            if (addr->sa_family == AF_INET6) {
                struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)(void*)addr;

                in6->sin6_port = htons(port);
                inet_ntop(addr->sa_family, &in6->sin6_addr, ip_text, sizeof(ip_text));

            } else {
                struct sockaddr_in *in4 = (struct sockaddr_in *)(void*)addr;

                in4->sin_port = htons(port);
                inet_ntop(addr->sa_family, &in4->sin_addr, ip_text, sizeof(ip_text));
            }

            crm_info("Attempting to connect to remote server at %s:%d", ip_text, port);

            if (callback) {
                /* Success here only means "started"; we'll hear back later in the callback */
                started = (internal_tcp_connect_async(sock, candidate->ai_addr,
                                                      candidate->ai_addrlen, timeout,
                                                      timer_id, userdata, callback) == 0);
            } else {
                started = (internal_tcp_connect(sock, candidate->ai_addr,
                                                candidate->ai_addrlen) == 0);
            }

            if (started) {
                break;
            }

            close(sock);
            sock = -1;
        }
    }

    if (results) {
        freeaddrinfo(results);
    }
    return sock;
}
924 
/*
 * Synchronous variant of crm_remote_tcp_connect_async(): no callback is
 * supplied, so each connection attempt is made with a blocking connect().
 *
 * Returns whatever crm_remote_tcp_connect_async() returns.
 */
int
crm_remote_tcp_connect(const char *host, int port)
{
    const int unused_timeout = -1;      /* only forwarded to asynchronous attempts */
    int sock = crm_remote_tcp_connect_async(host, port, unused_timeout, NULL, NULL, NULL);

    return sock;
}
930 
931 
932 /* Convert a struct sockaddr address to a string, IPv4 and IPv6: */
933 
/*
 * Render the IPv4 or IPv6 address held in sa as text in s.
 *
 * sa      address to convert (AF_INET or AF_INET6)
 * s       destination buffer
 * maxlen  size of s in bytes
 *
 * Returns s on success. Returns NULL if an argument is unusable, the address
 * family is unknown (s then holds "Unknown AF", truncated to fit), or the
 * conversion fails (s is then an empty string). Whenever maxlen is non-zero
 * and s is non-NULL, s is left NUL-terminated.
 */
static char *
get_ip_str(const struct sockaddr_storage * sa, char * s, size_t maxlen)
{
    const void *raw_addr = NULL;
    int family = 0;

    if ((s == NULL) || (maxlen == 0)) {
        return NULL;
    }
    if (sa == NULL) {
        s[0] = '\0';
        return NULL;
    }

    family = ((const struct sockaddr *)sa)->sa_family;
    switch(family) {
        case AF_INET:
            raw_addr = &(((const struct sockaddr_in *)sa)->sin_addr);
            break;

        case AF_INET6:
            raw_addr = &(((const struct sockaddr_in6 *)sa)->sin6_addr);
            break;

        default:
            /* snprintf always terminates the string, unlike strncpy */
            snprintf(s, maxlen, "%s", "Unknown AF");
            return NULL;
    }

    if (inet_ntop(family, raw_addr, s, (socklen_t) maxlen) == NULL) {
        s[0] = '\0';
        return NULL;
    }

    return s;
}
955 
956 int
958 {
959  int csock = 0;
960  int rc = 0;
961  int flag = 0;
962  unsigned laddr = 0;
963  struct sockaddr_storage addr;
964  char addr_str[INET6_ADDRSTRLEN];
965 #ifdef TCP_USER_TIMEOUT
966  int optval;
967  long sbd_timeout = crm_get_sbd_timeout();
968 #endif
969 
970  /* accept the connection */
971  laddr = sizeof(addr);
972  memset(&addr, 0, sizeof(addr));
973  csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
974  get_ip_str(&addr, addr_str, INET6_ADDRSTRLEN);
975  crm_info("New remote connection from %s", addr_str);
976 
977  if (csock == -1) {
978  crm_err("accept socket failed");
979  return -1;
980  }
981 
982  if ((flag = fcntl(csock, F_GETFL)) >= 0) {
983  if ((rc = fcntl(csock, F_SETFL, flag | O_NONBLOCK)) < 0) {
984  crm_err("fcntl() write failed");
985  close(csock);
986  return rc;
987  }
988  } else {
989  crm_err("fcntl() read failed");
990  close(csock);
991  return flag;
992  }
993 
994 #ifdef TCP_USER_TIMEOUT
995  if (sbd_timeout > 0) {
996  optval = sbd_timeout / 2; /* time to fail and retry before watchdog */
997  rc = setsockopt(csock, SOL_TCP, TCP_USER_TIMEOUT,
998  &optval, sizeof(optval));
999  if (rc < 0) {
1000  crm_err("setting TCP_USER_TIMEOUT (%d) on client socket failed",
1001  optval);
1002  close(csock);
1003  return rc;
1004  }
1005  }
1006 #endif
1007 
1008  return csock;
1009 }
A dumping ground.
size_t buffer_offset
Definition: ipcs.h:45
long crm_get_sbd_timeout(void)
Definition: watchdog.c:246
uint32_t payload_compressed
Definition: remote.c:124
const char * pcmk_strerror(int rc)
Definition: logging.c:1128
char * buffer
Definition: ipcs.h:43
AIS_Host host
Definition: internal.h:52
uint32_t payload_uncompressed
Definition: remote.c:125
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:150
#define ENDIAN_LOCAL
Definition: remote.c:93
Wrappers for and extensions to glib mainloop.
uint32_t endian
Definition: remote.c:118
xmlNode * string2xml(const char *input)
Definition: xml.c:2696
int crm_remote_send(crm_remote_t *remote, xmlNode *msg)
Definition: remote.c:332
int crm_remote_tcp_connect(const char *host, int port)
Definition: remote.c:926
#define crm_warn(fmt, args...)
Definition: logging.h:249
#define crm_debug(fmt, args...)
Definition: logging.h:253
int crm_initiate_client_tls_handshake(crm_remote_t *remote, int timeout_ms)
void gnutls_session_t
Definition: cib_remote.c:52
int crm_remote_accept(int ssock)
Definition: remote.c:957
#define crm_trace(fmt, args...)
Definition: logging.h:254
uint64_t id
Definition: remote.c:120
int crm_remote_ready(crm_remote_t *remote, int total_timeout)
Definition: remote.c:451
#define __swab64(x)
Definition: remote.c:81
Wrappers for and extensions to libxml2.
#define pcmk_err_generic
Definition: error.h:45
uint32_t payload_offset
Definition: remote.c:123
struct tcp_async_cb_data __attribute__
uint32_t size_total
Definition: remote.c:122
#define __swab32(x)
Definition: remote.c:75
#define crm_perror(level, fmt, args...)
Log a system error message.
Definition: logging.h:226
size_t buffer_size
Definition: ipcs.h:44
#define REMOTE_MSG_VERSION
Definition: remote.c:92
#define crm_err(fmt, args...)
Definition: logging.h:248
const char * bz2_strerror(int rc)
Definition: logging.c:1191
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3726
#define DIMOF(a)
Definition: crm.h:39
xmlNode * crm_remote_parse_buffer(crm_remote_t *remote)
Definition: remote.c:380
#define uint32_t
Definition: stdint.in.h:158
#define CRM_ASSERT(expr)
Definition: error.h:35
void * create_psk_tls_session(int csock, int type, void *credentials)
void * crm_create_anon_tls_session(int sock, int type, void *credentials)
int tcp_socket
Definition: ipcs.h:47
int crm_remote_tcp_connect_async(const char *host, int port, int timeout, int *timer_id, void *userdata, void(*callback)(void *userdata, int sock))
Definition: remote.c:837
#define crm_info(fmt, args...)
Definition: logging.h:251
gboolean crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
Definition: remote.c:610
uint32_t version
Definition: remote.c:119
uint64_t flags
Definition: remote.c:121
enum crm_ais_msg_types type
Definition: internal.h:51