22 #include <sys/param.h> 24 #include <sys/types.h> 27 #include <sys/socket.h> 28 #include <arpa/inet.h> 29 #include <netinet/in.h> 30 #include <netinet/ip.h> 31 #include <netinet/tcp.h> 45 #ifdef HAVE_GNUTLS_GNUTLS_H 47 # include <gnutls/gnutls.h> 49 const int psk_tls_kx_order[] = {
54 const int anon_tls_kx_order[] = {
64 #ifdef HAVE_LINUX_SWAB_H 65 # include <linux/swab.h> 71 #define __swab16(x) ((uint16_t)( \ 72 (((uint16_t)(x) & (uint16_t)0x00ffU) << 8) | \ 73 (((uint16_t)(x) & (uint16_t)0xff00U) >> 8))) 75 #define __swab32(x) ((uint32_t)( \ 76 (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) | \ 77 (((uint32_t)(x) & (uint32_t)0x0000ff00UL) << 8) | \ 78 (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >> 8) | \ 79 (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24))) 81 #define __swab64(x) ((uint64_t)( \ 82 (((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) | \ 83 (((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) | \ 84 (((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) | \ 85 (((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) << 8) | \ 86 (((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >> 8) | \ 87 (((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) | \ 88 (((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) | \ 89 (((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56))) 92 #define REMOTE_MSG_VERSION 1 93 #define ENDIAN_LOCAL 0xBADADBBD 95 struct crm_remote_header_v0
110 static struct crm_remote_header_v0 *
113 struct crm_remote_header_v0 *header = (
struct crm_remote_header_v0 *)remote->
buffer;
114 if(remote->
buffer_offset <
sizeof(
struct crm_remote_header_v0)) {
122 crm_err(
"Invalid message detected, endian mismatch: %lx is neither %lx nor the swab'd %lx",
128 header->flags =
__swab64(header->flags);
129 header->endian =
__swab32(header->endian);
131 header->version =
__swab32(header->version);
132 header->size_total =
__swab32(header->size_total);
133 header->payload_offset =
__swab32(header->payload_offset);
134 header->payload_compressed =
__swab32(header->payload_compressed);
135 header->payload_uncompressed =
__swab32(header->payload_uncompressed);
141 #ifdef HAVE_GNUTLS_GNUTLS_H 148 time_t start = time(NULL);
151 rc = gnutls_handshake(*remote->tls_session);
152 if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
160 }
while (((time(NULL) - start) < (timeout_ms / 1000)) &&
161 (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN));
164 crm_trace(
"gnutls_handshake() failed with %d", rc);
175 gnutls_init(session, type);
176 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT 178 gnutls_priority_set_direct(*session,
"NORMAL:+ANON-DH", NULL);
181 gnutls_set_default_priority(*session);
182 gnutls_kx_set_priority(*session, anon_tls_kx_order);
184 gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
187 gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
188 (gnutls_anon_server_credentials_t) credentials);
191 gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
192 (gnutls_anon_client_credentials_t) credentials);
204 gnutls_init(session, type);
205 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT 206 gnutls_priority_set_direct(*session,
"NORMAL:+DHE-PSK:+PSK", NULL);
208 gnutls_set_default_priority(*session);
209 gnutls_kx_set_priority(*session, psk_tls_kx_order);
211 gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
214 gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
215 (gnutls_psk_server_credentials_t) credentials);
218 gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
219 (gnutls_psk_client_credentials_t) credentials);
229 const char *unsent = buf;
238 crm_trace(
"Message size: %llu", (
unsigned long long) len);
241 rc = gnutls_record_send(*session, unsent, len);
243 if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
247 crm_err(
"Connection terminated rc = %d", rc);
250 }
else if (rc < len) {
251 crm_debug(
"Sent %d of %llu bytes", rc, (
unsigned long long) len);
260 return rc < 0 ? rc : total_send;
265 crm_send_plaintext(
int sock,
const char *buf,
size_t len)
269 const char *unsent = buf;
277 crm_trace(
"Message on socket %d: size=%llu",
278 sock, (
unsigned long long) len);
280 rc = write(sock, unsent, len);
288 crm_perror(LOG_ERR,
"Could only write %d of the remaining %d bytes", rc, (
int)len);
292 }
else if (rc < len) {
293 crm_trace(
"Only sent %d of %llu remaining bytes",
294 rc, (
unsigned long long) len);
300 crm_trace(
"Sent %d bytes: %.100s", rc, buf);
303 return rc < 0 ? rc : total_send;
308 crm_remote_sendv(
crm_remote_t * remote,
struct iovec * iov,
int iovs)
311 int rc = -ESOCKTNOSUPPORT;
313 for(; lpc < iovs; lpc++) {
315 #ifdef HAVE_GNUTLS_GNUTLS_H 316 if (remote->tls_session) {
317 rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
322 rc = crm_send_plaintext(remote->
tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
325 crm_err(
"Unsupported connection type");
335 static uint64_t
id = 0;
339 struct crm_remote_header_v0 *header;
341 if (xml_text == NULL) {
342 crm_err(
"Invalid XML, can not send msg");
346 header = calloc(1,
sizeof(
struct crm_remote_header_v0));
347 iov[0].iov_base = (caddr_t)header;
348 iov[0].iov_len =
sizeof(
struct crm_remote_header_v0);
350 iov[1].iov_base = xml_text;
351 iov[1].iov_len = 1 + strlen(xml_text);
357 header->payload_offset = iov[0].iov_len;
358 header->payload_uncompressed = iov[1].iov_len;
359 header->size_total = iov[0].iov_len + iov[1].iov_len;
362 (
int)iov[0].iov_len, *(
int*)(
void*)xml_text);
363 rc = crm_remote_sendv(remote, iov, 2);
365 crm_err(
"Failed to send remote msg, rc = %d", rc);
368 free(iov[0].iov_base);
369 free(iov[1].iov_base);
383 struct crm_remote_header_v0 *header = crm_remote_header(remote);
385 if (remote->
buffer == NULL || header == NULL) {
390 if (header->payload_compressed) {
392 unsigned int size_u = 1 + header->payload_uncompressed;
393 char *uncompressed = calloc(1, header->payload_offset + size_u);
395 crm_trace(
"Decompressing message data %d bytes into %d bytes",
396 header->payload_compressed, size_u);
398 rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
399 remote->
buffer + header->payload_offset,
400 header->payload_compressed, 1, 0);
403 crm_warn(
"Couldn't decompress v%d message, we only understand v%d",
408 }
else if (rc != BZ_OK) {
414 CRM_ASSERT(size_u == header->payload_uncompressed);
416 memcpy(uncompressed, remote->
buffer, header->payload_offset);
417 remote->
buffer_size = header->payload_offset + size_u;
420 remote->
buffer = uncompressed;
421 header = crm_remote_header(remote);
427 CRM_LOG_ASSERT(remote->
buffer[
sizeof(
struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
431 crm_warn(
"Couldn't parse v%d message, we only understand v%d",
434 }
else if (xml == NULL) {
435 crm_err(
"Couldn't parse: '%.120s'", remote->
buffer + header->payload_offset);
453 struct pollfd fds = { 0, };
457 int timeout = total_timeout;
459 #ifdef HAVE_GNUTLS_GNUTLS_H 460 if (remote->tls_session) {
461 void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
463 sock = GPOINTER_TO_INT(sock_ptr);
470 crm_err(
"Unsupported connection type");
487 if (errno == EINTR && (timeout > 0)) {
488 timeout = total_timeout - ((time(NULL) - start) * 1000);
489 if (timeout < 1000) {
494 rc = poll(&fds, 1, timeout);
495 }
while (rc < 0 && errno == EINTR);
497 return (rc < 0)? -errno : rc;
515 size_t read_len =
sizeof(
struct crm_remote_header_v0);
516 struct crm_remote_header_v0 *header = crm_remote_header(remote);
520 read_len = header->size_total;
526 crm_trace(
"Expanding buffer to %llu bytes",
533 #ifdef HAVE_GNUTLS_GNUTLS_H 534 if (remote->tls_session) {
535 rc = gnutls_record_recv(*(remote->tls_session),
538 if (rc == GNUTLS_E_INTERRUPTED) {
540 }
else if (rc == GNUTLS_E_AGAIN) {
543 crm_debug(
"TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
559 crm_err(
"Unsupported connection type");
560 return -ESOCKTNOSUPPORT;
568 crm_trace(
"Received %u more bytes, %llu total",
571 }
else if (rc == -EINTR || rc == -EAGAIN) {
574 }
else if (rc == 0) {
575 crm_debug(
"EOF encoutered after %llu bytes",
580 crm_debug(
"Error receiving message after %llu bytes: %s (%d)",
586 header = crm_remote_header(remote);
589 crm_trace(
"Read less than the advertised length: %llu < %u bytes",
593 crm_trace(
"Read full message of %llu bytes",
613 time_t start = time(NULL);
614 int remaining_timeout = 0;
616 if (total_timeout == 0) {
617 total_timeout = 10000;
618 }
else if (total_timeout < 0) {
619 total_timeout = 60000;
623 remaining_timeout = total_timeout;
624 while ((remaining_timeout > 0) && !(*disconnected)) {
627 crm_trace(
"waiting to receive remote msg, starting timeout %d, remaining_timeout %d",
628 total_timeout, remaining_timeout);
632 crm_err(
"poll timed out (%d ms) while waiting to receive msg", remaining_timeout);
639 rc = crm_remote_recv_once(remote);
647 if(rc == -ENOTCONN) {
652 remaining_timeout = total_timeout - ((time(NULL) - start) * 1000);
658 struct tcp_async_cb_data {
662 void (*callback) (
void *userdata,
int sock);
668 check_connect_finished(gpointer userdata)
670 struct tcp_async_cb_data *cb_data = userdata;
672 int sock = cb_data->sock;
676 socklen_t len =
sizeof(error);
677 struct timeval ts = { 0, };
679 if (cb_data->success == TRUE) {
687 crm_trace(
"fd %d: checking to see if connect finished", sock);
688 rc = select(sock + 1, &rset, &wset, NULL, &ts);
692 if ((errno == EINPROGRESS) || (errno == EAGAIN)) {
694 if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
700 crm_trace(
"fd %d: select failed %d connect dispatch ", sock, rc);
702 }
else if (rc == 0) {
703 if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
706 crm_debug(
"fd %d: timeout during select", sock);
710 crm_trace(
"fd %d: select returned success", sock);
715 if (FD_ISSET(sock, &rset) || FD_ISSET(sock, &wset)) {
716 if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
717 crm_trace(
"fd %d: call to getsockopt failed", sock);
723 crm_trace(
"fd %d: error returned from getsockopt: %d", sock, error);
728 crm_trace(
"neither read nor write set after select");
743 if (cb_data->callback) {
744 cb_data->callback(cb_data->userdata, rc);
756 internal_tcp_connect_async(
int sock,
757 const struct sockaddr *addr, socklen_t addrlen,
int timeout ,
758 int *timer_id,
void *userdata,
void (*callback) (
void *userdata,
int sock))
764 struct tcp_async_cb_data *cb_data = NULL;
766 if ((flag = fcntl(sock, F_GETFL)) >= 0) {
767 if (fcntl(sock, F_SETFL, flag | O_NONBLOCK) < 0) {
768 crm_err(
"fcntl() write failed");
773 rc = connect(sock, addr, addrlen);
775 if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
779 cb_data = calloc(1,
sizeof(
struct tcp_async_cb_data));
780 cb_data->userdata = userdata;
781 cb_data->callback = callback;
782 cb_data->sock = sock;
783 cb_data->timeout = timeout;
784 cb_data->start = time(NULL);
791 cb_data->success = TRUE;
803 crm_trace(
"fd %d: scheduling to check if connect finished in %dms second", sock, interval);
804 timer = g_timeout_add(interval, check_connect_finished, cb_data);
813 internal_tcp_connect(
int sock,
const struct sockaddr *addr, socklen_t addrlen)
816 int rc = connect(sock, addr, addrlen);
819 if ((flag = fcntl(sock, F_GETFL)) >= 0) {
820 if (fcntl(sock, F_SETFL, flag | O_NONBLOCK) < 0) {
821 crm_err(
"fcntl() write failed");
838 int *timer_id,
void *userdata,
void (*callback) (
void *userdata,
int sock))
841 struct addrinfo *res = NULL;
842 struct addrinfo *rp = NULL;
843 struct addrinfo hints;
844 const char *server =
host;
849 memset(&hints, 0,
sizeof(
struct addrinfo));
850 hints.ai_family = AF_UNSPEC;
851 hints.ai_socktype = SOCK_STREAM;
852 hints.ai_flags = AI_CANONNAME;
855 ret_ga = getaddrinfo(server, NULL, &hints, &res);
857 crm_err(
"getaddrinfo: %s", gai_strerror(ret_ga));
861 if (!res || !res->ai_addr) {
866 for (rp = res; rp != NULL; rp = rp->ai_next) {
867 struct sockaddr *addr = rp->ai_addr;
873 if (rp->ai_canonname) {
874 server = res->ai_canonname;
876 crm_debug(
"Got address %s for %s", server, host);
879 sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
881 crm_err(
"Socket creation failed for remote client connection.");
885 memset(buffer, 0,
DIMOF(buffer));
886 if (addr->sa_family == AF_INET6) {
887 struct sockaddr_in6 *addr_in = (
struct sockaddr_in6 *)(
void*)addr;
889 addr_in->sin6_port = htons(port);
890 inet_ntop(addr->sa_family, &addr_in->sin6_addr, buffer,
DIMOF(buffer));
893 struct sockaddr_in *addr_in = (
struct sockaddr_in *)(
void*)addr;
895 addr_in->sin_port = htons(port);
896 inet_ntop(addr->sa_family, &addr_in->sin_addr, buffer,
DIMOF(buffer));
899 crm_info(
"Attempting to connect to remote server at %s:%d", buffer, port);
902 if (internal_tcp_connect_async
903 (sock, rp->ai_addr, rp->ai_addrlen, timeout, timer_id, userdata, callback) == 0) {
908 if (internal_tcp_connect(sock, rp->ai_addr, rp->ai_addrlen) == 0) {
935 get_ip_str(
const struct sockaddr_storage * sa,
char * s,
size_t maxlen)
937 switch(((
struct sockaddr *)sa)->sa_family) {
939 inet_ntop(AF_INET, &(((
struct sockaddr_in *)sa)->sin_addr),
944 inet_ntop(AF_INET6, &(((
struct sockaddr_in6 *)sa)->sin6_addr),
949 strncpy(s,
"Unknown AF", maxlen);
963 struct sockaddr_storage addr;
964 char addr_str[INET6_ADDRSTRLEN];
965 #ifdef TCP_USER_TIMEOUT 971 laddr =
sizeof(addr);
972 memset(&addr, 0,
sizeof(addr));
973 csock = accept(ssock, (
struct sockaddr *)&addr, &laddr);
974 get_ip_str(&addr, addr_str, INET6_ADDRSTRLEN);
975 crm_info(
"New remote connection from %s", addr_str);
978 crm_err(
"accept socket failed");
982 if ((flag = fcntl(csock, F_GETFL)) >= 0) {
983 if ((rc = fcntl(csock, F_SETFL, flag | O_NONBLOCK)) < 0) {
984 crm_err(
"fcntl() write failed");
989 crm_err(
"fcntl() read failed");
994 #ifdef TCP_USER_TIMEOUT 995 if (sbd_timeout > 0) {
996 optval = sbd_timeout / 2;
997 rc = setsockopt(csock, SOL_TCP, TCP_USER_TIMEOUT,
998 &optval,
sizeof(optval));
1000 crm_err(
"setting TCP_USER_TIMEOUT (%d) on client socket failed",
long crm_get_sbd_timeout(void)
uint32_t payload_compressed
const char * pcmk_strerror(int rc)
uint32_t payload_uncompressed
#define CRM_LOG_ASSERT(expr)
Wrappers for and extensions to glib mainloop.
xmlNode * string2xml(const char *input)
int crm_remote_send(crm_remote_t *remote, xmlNode *msg)
int crm_remote_tcp_connect(const char *host, int port)
#define crm_warn(fmt, args...)
#define crm_debug(fmt, args...)
int crm_initiate_client_tls_handshake(crm_remote_t *remote, int timeout_ms)
int crm_remote_accept(int ssock)
#define crm_trace(fmt, args...)
int crm_remote_ready(crm_remote_t *remote, int total_timeout)
Wrappers for and extensions to libxml2.
struct tcp_async_cb_data __attribute__
#define crm_perror(level, fmt, args...)
Log a system error message.
#define REMOTE_MSG_VERSION
#define crm_err(fmt, args...)
const char * bz2_strerror(int rc)
char * dump_xml_unformatted(xmlNode *msg)
xmlNode * crm_remote_parse_buffer(crm_remote_t *remote)
void * create_psk_tls_session(int csock, int type, void *credentials)
void * crm_create_anon_tls_session(int sock, int type, void *credentials)
int crm_remote_tcp_connect_async(const char *host, int port, int timeout, int *timer_id, void *userdata, void(*callback)(void *userdata, int sock))
#define crm_info(fmt, args...)
gboolean crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
enum crm_ais_msg_types type