XRootD
Loading...
Searching...
No Matches
XrdClHttpUtil.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3/* */
4/* This file is part of the XrdClHttp client plugin for XRootD. */
5/* */
6/* XRootD is free software: you can redistribute it and/or modify it under */
7/* the terms of the GNU Lesser General Public License as published by the */
8/* Free Software Foundation, either version 3 of the License, or (at your */
9/* option) any later version. */
10/* */
11/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14/* License for more details. */
15/* */
16/* The copyright holder's institutional names and contributor's names may not */
17/* be used to endorse or promote products derived from this software without */
18/* specific prior written permission of the institution or contributor. */
19/******************************************************************************/
20
21#include "XrdClHttpFile.hh"
22#include "XrdClHttpOps.hh"
24#include "XrdClHttpUtil.hh"
25#include "XrdClHttpWorker.hh"
26
29#include <XrdCl/XrdClLog.hh>
30#include <XrdCl/XrdClURL.hh>
32#include <XrdOuc/XrdOucCRC.hh>
35#include <XrdVersion.hh>
36
37#include <curl/curl.h>
38#include <openssl/bio.h>
39#include <openssl/evp.h>
40
41#include <fcntl.h>
42#include <fstream>
43#ifdef __APPLE__
44#include <pthread.h>
45#else
46#include <sys/syscall.h>
47#include <sys/types.h>
48#endif
49#include <unistd.h>
50
51#include <charconv>
52#include <sstream>
53#include <stdexcept>
54#include <utility>
55
56using namespace XrdClHttp;
57
58thread_local std::vector<CURL*> HandlerQueue::m_handles;
59std::atomic<unsigned> CurlWorker::m_maintenance_period = 5;
60std::vector<std::unique_ptr<XrdClHttp::CurlWorker>> CurlWorker::m_workers;
61std::mutex CurlWorker::m_workers_mutex;
62
63// Performance statistics for the worker
64std::atomic<uint64_t> CurlWorker::m_conncall_errors = 0;
65std::atomic<uint64_t> CurlWorker::m_conncall_req = 0;
66std::atomic<uint64_t> CurlWorker::m_conncall_success = 0;
67std::atomic<uint64_t> CurlWorker::m_conncall_timeout = 0;
68decltype(CurlWorker::m_ops) CurlWorker::m_ops = {};
69std::vector<std::atomic<std::chrono::system_clock::rep>*> CurlWorker::m_workers_last_completed_cycle;
70std::vector<std::atomic<std::chrono::system_clock::rep>*> CurlWorker::m_workers_oldest_op;
71std::mutex CurlWorker::m_worker_stats_mutex;
72
73// Performance statistics for the queue
74std::atomic<uint64_t> HandlerQueue::m_ops_consumed = 0; // Count of operations consumed from the queue.
75std::atomic<uint64_t> HandlerQueue::m_ops_produced = 0; // Count of operations added to the queue.
76std::atomic<uint64_t> HandlerQueue::m_ops_rejected = 0; // Count of operations rejected by the queue.
77
78// shutdown + init trigger, must be last of the static members
79CurlWorker::initcontrol CurlWorker::m_initcontrol;
80
82 CURL *curl{nullptr};
83 time_t expiry{0};
84};
85
namespace {

// Return an identifier for the calling thread.
//
// macOS: the value reported by pthread_threadid_np for the current thread.
// Linux: the result of the raw gettid system call.
// Anything else: the process id.
pid_t getthreadid() {
#if defined(__APPLE__)
    uint64_t tid;
    pthread_threadid_np(pthread_self(), &tid);
    return tid;
#elif defined(__linux__)
    // glibc only gained a gettid() wrapper in 2.30, while RHEL 8 (still
    // supported) ships glibc 2.28; issuing the syscall directly avoids
    // another layer of feature-test ifdefs.
    return syscall(SYS_gettid);
#else
    return getpid();
#endif
}

} // namespace
105
106bool XrdClHttp::HTTPStatusIsError(unsigned status) {
107 return (status < 100) || (status >= 400);
108}
109
110std::pair<uint16_t, uint32_t> XrdClHttp::HTTPStatusConvert(unsigned status) {
111 switch (status) {
112 case 400: // Bad Request
113 return std::make_pair(XrdCl::errErrorResponse, kXR_InvalidRequest);
114 case 401: // Unauthorized (needs authentication)
115 return std::make_pair(XrdCl::errErrorResponse, kXR_NotAuthorized);
116 case 402: // Payment Required
117 case 403: // Forbidden (failed authorization)
118 return std::make_pair(XrdCl::errErrorResponse, kXR_NotAuthorized);
119 case 404:
120 return std::make_pair(XrdCl::errErrorResponse, kXR_NotFound);
121 case 405: // Method not allowed
122 case 406: // Not acceptable
123 return std::make_pair(XrdCl::errErrorResponse, kXR_InvalidRequest);
124 case 407: // Proxy Authentication Required
125 return std::make_pair(XrdCl::errErrorResponse, kXR_NotAuthorized);
126 case 408: // Request timeout
127 return std::make_pair(XrdCl::errErrorResponse, kXR_ReqTimedOut);
128 case 409: // Conflict
129 return std::make_pair(XrdCl::errErrorResponse, kXR_Conflict);
130 case 410: // Gone
131 return std::make_pair(XrdCl::errErrorResponse, kXR_NotFound);
132 case 411: // Length required
133 case 412: // Precondition failed
134 case 413: // Payload too large
135 case 414: // URI too long
136 case 415: // Unsupported Media Type
137 case 416: // Range Not Satisfiable
138 case 417: // Expectation Failed
139 case 418: // I'm a teapot
140 return std::make_pair(XrdCl::errErrorResponse, kXR_InvalidRequest);
141 case 421: // Misdirected Request
142 case 422: // Unprocessable Content
143 return std::make_pair(XrdCl::errErrorResponse, kXR_InvalidRequest);
144 case 423: // Locked
145 return std::make_pair(XrdCl::errErrorResponse, kXR_FileLocked);
146 case 424: // Failed Dependency
147 case 425: // Too Early
148 case 426: // Upgrade Required
149 case 428: // Precondition Required
150 return std::make_pair(XrdCl::errErrorResponse, kXR_InvalidRequest);
151 case 429: // Too Many Requests
152 return std::make_pair(XrdCl::errErrorResponse, kXR_Overloaded);
153 case 431: // Request Header Fields Too Large
154 return std::make_pair(XrdCl::errErrorResponse, kXR_InvalidRequest);
155 case 451: // Unavailable For Legal Reasons
156 return std::make_pair(XrdCl::errErrorResponse, kXR_Impossible);
157 case 500: // Internal Server Error
158 case 501: // Not Implemented
159 case 502: // Bad Gateway
160 case 503: // Service Unavailable
161 return std::make_pair(XrdCl::errErrorResponse, kXR_ServerError);
162 case 504: // Gateway Timeout
163 return std::make_pair(XrdCl::errErrorResponse, kXR_ReqTimedOut);
164 case 507: // Insufficient Storage
165 return std::make_pair(XrdCl::errErrorResponse, kXR_overQuota);
166 case 508: // Loop Detected
167 case 510: // Not Extended
168 case 511: // Network Authentication Required
169 return std::make_pair(XrdCl::errErrorResponse, kXR_ServerError);
170 }
171 return std::make_pair(XrdCl::errUnknown, status);
172}
173
174std::pair<uint16_t, uint32_t> CurlCodeConvert(CURLcode res) {
175 switch (res) {
176 case CURLE_OK:
177 return std::make_pair(XrdCl::errNone, 0);
178 case CURLE_COULDNT_RESOLVE_PROXY:
179 case CURLE_COULDNT_RESOLVE_HOST:
180 return std::make_pair(XrdCl::errInvalidAddr, 0);
181 case CURLE_LOGIN_DENIED:
182 // Commented-out cases are for platforms (RHEL7) where the error
183 // codes are undefined.
184 //case CURLE_AUTH_ERROR:
185 //case CURLE_SSL_CLIENTCERT:
186 case CURLE_REMOTE_ACCESS_DENIED:
187 return std::make_pair(XrdCl::errLoginFailed, EACCES);
188 case CURLE_SSL_CONNECT_ERROR:
189 case CURLE_SSL_ENGINE_NOTFOUND:
190 case CURLE_SSL_ENGINE_SETFAILED:
191 case CURLE_SSL_CERTPROBLEM:
192 case CURLE_SSL_CIPHER:
193 case 51: // In old curl versions, this is CURLE_PEER_FAILED_VERIFICATION; that constant was changed to be 60 / CURLE_SSL_CACERT
194 case CURLE_SSL_SHUTDOWN_FAILED:
195 case CURLE_SSL_CRL_BADFILE:
196 case CURLE_SSL_ISSUER_ERROR:
197 case CURLE_SSL_CACERT: // value is 60; merged with CURLE_PEER_FAILED_VERIFICATION
198 //case CURLE_SSL_PINNEDPUBKEYNOTMATCH:
199 //case CURLE_SSL_INVALIDCERTSTATUS:
200 return std::make_pair(XrdCl::errTlsError, 0);
201 case CURLE_SEND_ERROR:
202 case CURLE_RECV_ERROR:
203 return std::make_pair(XrdCl::errSocketError, EIO);
204 case CURLE_COULDNT_CONNECT:
205 case CURLE_GOT_NOTHING:
206 return std::make_pair(XrdCl::errConnectionError, ECONNREFUSED);
207 case CURLE_OPERATION_TIMEDOUT:
208#ifdef HAVE_XPROTOCOL_TIMEREXPIRED
210#else
211 return std::make_pair(XrdCl::errOperationExpired, ESTALE);
212#endif
213 case CURLE_UNSUPPORTED_PROTOCOL:
214 case CURLE_NOT_BUILT_IN:
215 return std::make_pair(XrdCl::errNotSupported, ENOSYS);
216 case CURLE_FAILED_INIT:
217 return std::make_pair(XrdCl::errInternal, 0);
218 case CURLE_URL_MALFORMAT:
219 return std::make_pair(XrdCl::errInvalidArgs, res);
220 //case CURLE_WEIRD_SERVER_REPLY:
221 //case CURLE_HTTP2:
222 //case CURLE_HTTP2_STREAM:
223 return std::make_pair(XrdCl::errCorruptedHeader, res);
224 case CURLE_PARTIAL_FILE:
225 return std::make_pair(XrdCl::errDataError, res);
226 // These two errors indicate a failure in the callback. That
227 // should generate their own failures, meaning this should never
228 // get use.
229 case CURLE_READ_ERROR:
230 case CURLE_WRITE_ERROR:
231 return std::make_pair(XrdCl::errInternal, res);
232 case CURLE_RANGE_ERROR:
233 case CURLE_BAD_CONTENT_ENCODING:
234 return std::make_pair(XrdCl::errNotSupported, res);
235 case CURLE_TOO_MANY_REDIRECTS:
236 return std::make_pair(XrdCl::errRedirectLimit, res);
237 default:
238 return std::make_pair(XrdCl::errUnknown, res);
239 }
240}
241
242bool HeaderParser::Base64Decode(std::string_view input, std::array<unsigned char, 32> &output) {
243 if (input.size() > 44 || input.size() % 4 != 0) return false;
244 if (input.size() == 0) return true;
245
246 std::unique_ptr<BIO, decltype(&BIO_free_all)> b64(BIO_new(BIO_f_base64()), &BIO_free_all);
247 BIO_set_flags(b64.get(), BIO_FLAGS_BASE64_NO_NL);
248 std::unique_ptr<BIO, decltype(&BIO_free_all)> bmem(
249 BIO_new_mem_buf(const_cast<char *>(input.data()), input.size()), &BIO_free_all);
250 bmem.reset(BIO_push(b64.release(), bmem.release()));
251
252 // Compute expected length of output; used to verify BIO_read consumes all input
253 size_t expectedLen = static_cast<size_t>(input.size() * 0.75);
254 if (input[input.size() - 1] == '=') {
255 expectedLen -= 1;
256 if (input[input.size() - 2] == '=') {
257 expectedLen -= 1;
258 }
259 }
260
261 auto len = BIO_read(bmem.get(), &output[0], output.size());
262
263 if (len == -1 || static_cast<size_t>(len) != expectedLen) return false;
264
265 return true;
266}
267
268// Parse a single header line.
269//
270// Curl promises for its callbacks "The header callback is
271// called once for each header and only complete header lines
272// are passed on to the callback".
273bool HeaderParser::Parse(const std::string &header_line)
274{
275 if (m_recv_all_headers) {
276 m_recv_all_headers = false;
277 m_recv_status_line = false;
278 }
279
280 if (!m_recv_status_line) {
281 m_recv_status_line = true;
282
283 std::stringstream ss(header_line);
284 std::string item;
285 if (!std::getline(ss, item, ' ')) return false;
286 m_resp_protocol = item;
287 if (!std::getline(ss, item, ' ')) return false;
288 try {
289 m_status_code = std::stol(item);
290 } catch (...) {
291 return false;
292 }
293 if (m_status_code < 100 || m_status_code >= 600) {
294 return false;
295 }
296 if (!std::getline(ss, item, '\n')) return false;
297 auto cr_loc = item.find('\r');
298 if (cr_loc != std::string::npos) {
299 m_resp_message = item.substr(0, cr_loc);
300 } else {
301 m_resp_message = item;
302 }
303 return true;
304 }
305
306 if (header_line.empty() || header_line == "\n" || header_line == "\r\n") {
307 m_recv_all_headers = true;
308 return true;
309 }
310
311 auto found = header_line.find(":");
312 if (found == std::string::npos) {
313 return false;
314 }
315
316 std::string header_name = header_line.substr(0, found);
317 if (!Canonicalize(header_name)) {
318 return false;
319 }
320
321 found += 1;
322 while (found < header_line.size()) {
323 if (header_line[found] != ' ') {break;}
324 found += 1;
325 }
326 std::string header_value = header_line.substr(found);
327 // Note: ignoring the fact headers are only supposed to contain ASCII.
328 // We should trim out UTF-8.
329 header_value.erase(header_value.find_last_not_of(" \r\n\t") + 1);
330
331 // Record the line in our header structure. Will be returned as part
332 // of the response info object.
333 auto iter = m_headers.find(header_name);
334 if (iter == m_headers.end()) {
335 m_headers.insert(iter, {header_name, {header_value}});
336 } else {
337 iter->second.push_back(header_value);
338 }
339
340 if (header_name == "Allow") {
341 std::string_view val(header_value);
342 while (!val.empty()) {
343 auto found = val.find(',');
344 auto method = val.substr(0, found);
345 if (method == "PROPFIND") {
346 auto new_verbs = static_cast<unsigned>(m_allow_verbs) | static_cast<unsigned>(VerbsCache::HttpVerb::kPROPFIND);
347 m_allow_verbs = static_cast<VerbsCache::HttpVerb>(new_verbs);
348 }
349 if (found == std::string_view::npos) break;
350 val = val.substr(found + 1);
351 }
352 if (static_cast<unsigned>(m_allow_verbs) & ~static_cast<unsigned>(VerbsCache::HttpVerb::kUnknown)) {
353 m_allow_verbs = static_cast<VerbsCache::HttpVerb>(static_cast<unsigned>(m_allow_verbs) & ~static_cast<unsigned>(VerbsCache::HttpVerb::kUnknown));
354 }
355 } else if (header_name == "Content-Length") {
356 try {
357 m_content_length = std::stoll(header_value);
358 } catch (...) {
359 return false;
360 }
361 }
362 else if (header_name == "Content-Type") {
363 std::string_view val(header_value);
364 auto found = val.find(";");
365 auto first_type = val.substr(0, found);
366 m_multipart_byteranges = first_type == "multipart/byteranges";
367 if (m_multipart_byteranges) {
368 auto remainder = val.substr(found + 1);
369 found = remainder.find("boundary=");
370 if (found != std::string_view::npos) {
371 SetMultipartSeparator(remainder.substr(found + 9));
372 }
373 }
374 }
375 else if (header_name == "Content-Range") {
376 auto found = header_value.find(" ");
377 if (found == std::string::npos) {
378 return false;
379 }
380 std::string range_unit = header_value.substr(0, found);
381 if (range_unit != "bytes") {
382 return false;
383 }
384 auto range_resp = header_value.substr(found + 1);
385 found = range_resp.find("/");
386 if (found == std::string::npos) {
387 return false;
388 }
389 auto incl_range = range_resp.substr(0, found);
390 found = incl_range.find("-");
391 if (found == std::string::npos) {
392 return false;
393 }
394 auto first_pos = incl_range.substr(0, found);
395 try {
396 m_response_offset = std::stoll(first_pos);
397 } catch (...) {
398 return false;
399 }
400 auto last_pos = incl_range.substr(found + 1);
401 size_t last_byte;
402 try {
403 last_byte = std::stoll(last_pos);
404 } catch (...) {
405 return false;
406 }
407 m_content_length = last_byte - m_response_offset + 1;
408 }
409 else if (header_name == "Location") {
410 m_location = header_value;
411 } else if (header_name == "Digest") {
412 ParseDigest(header_value, m_checksums);
413 }
414 else if (header_name == "Etag")
415 {
416 // Note, the original hader name is ETag, renamed to Etag in parsing
417 // remove additional quotes
418 m_etag = header_value;
419 m_etag.erase(remove(m_etag.begin(), m_etag.end(), '\"'), m_etag.end());
420 }
421 else if (header_name == "Cache-Control")
422 {
423 m_cache_control = header_value;
424 }
425
426 return true;
427}
428
429// Parse a RFC 3230 header into the checksum info structure
430//
431// If the parsing fails, the second element of the tuple will be false.
432void HeaderParser::ParseDigest(const std::string &digest, XrdClHttp::ChecksumInfo &info) {
433 std::string_view view(digest);
434 std::array<unsigned char, 32> checksum_value;
435 std::string digest_lower;
436 while (!view.empty()) {
437 auto nextsep = view.find(',');
438 auto entry = view.substr(0, nextsep);
439 if (nextsep == std::string_view::npos) {
440 view = "";
441 } else {
442 view = view.substr(nextsep + 1);
443 }
444 nextsep = entry.find('=');
445 auto name = entry.substr(0, nextsep);
446 auto value = entry.substr(nextsep + 1);
447 digest_lower.clear();
448 digest_lower.resize(name.size());
449 std::transform(name.begin(), name.end(), digest_lower.begin(), [](unsigned char c) {
450 return std::tolower(c);
451 });
452 if (digest_lower == "md5") {
453 if (value.size() != 24) {
454 continue;
455 }
456 if (Base64Decode(value, checksum_value)) {
457 info.Set(XrdClHttp::ChecksumType::kMD5, checksum_value);
458 }
459 } else if (digest_lower == "crc32c") {
460 // XRootD currently incorrectly base64-encodes crc32c checksums; see
461 // https://github.com/xrootd/xrootd/issues/2456
462 // For backward comaptibility, if this looks like base64 encoded (8
463 // bytes long and last two bytes are padding), then we base64 decode.
464 if (value.size() == 8 && value[6] == '=' && value[7] == '=') {
465 if (Base64Decode(value, checksum_value)) {
466 info.Set(XrdClHttp::ChecksumType::kCRC32C, checksum_value);
467 }
468 continue;
469 }
470 std::size_t pos{0};
471 unsigned long val;
472 try {
473 val = std::stoul(value.data(), &pos, 16);
474 } catch (...) {
475 continue;
476 }
477 if (pos == value.size()) {
478 checksum_value[0] = (val >> 24) & 0xFF;
479 checksum_value[1] = (val >> 16) & 0xFF;
480 checksum_value[2] = (val >> 8) & 0xFF;
481 checksum_value[3] = val & 0xFF;
482 info.Set(XrdClHttp::ChecksumType::kCRC32C, checksum_value);
483 }
484 }
485 }
486}
487
488// Convert the checksum type to a RFC 3230 digest name as recorded by IANA here:
489// https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml
491 switch (type) {
493 return "MD5";
495 return "CRC32c";
497 return "SHA";
499 return "SHA-256";
500 default:
501 return "";
502 }
503}
504
505// This clever approach was inspired by golang's net/textproto
506bool HeaderParser::validHeaderByte(unsigned char c)
507{
508 const static uint64_t mask_lower = 0 |
509 uint64_t((1<<10)-1) << '0' |
510 uint64_t(1) << '!' |
511 uint64_t(1) << '#' |
512 uint64_t(1) << '$' |
513 uint64_t(1) << '%' |
514 uint64_t(1) << '&' |
515 uint64_t(1) << '\'' |
516 uint64_t(1) << '*' |
517 uint64_t(1) << '+' |
518 uint64_t(1) << '-' |
519 uint64_t(1) << '.';
520
521 const static uint64_t mask_upper = 0 |
522 uint64_t((1<<26)-1) << ('a'-64) |
523 uint64_t((1<<26)-1) << ('A'-64) |
524 uint64_t(1) << ('^'-64) |
525 uint64_t(1) << ('_'-64) |
526 uint64_t(1) << ('`'-64) |
527 uint64_t(1) << ('|'-64) |
528 uint64_t(1) << ('~'-64);
529
530 if (c >= 128) return false;
531 if (c >= 64) return (uint64_t(1)<<(c-64)) & mask_upper;
532 return (uint64_t(1) << c) & mask_lower;
533}
534
535bool HeaderParser::Canonicalize(std::string &headerName)
536{
537 auto upper = true;
538 const static int toLower = 'a' - 'A';
539 for (size_t idx=0; idx<headerName.size(); idx++) {
540 char c = headerName[idx];
541 if (!validHeaderByte(c)) {
542 return false;
543 }
544 if (upper && 'a' <= c && c <= 'z') {
545 c -= toLower;
546 } else if (!upper && 'A' <= c && c <= 'Z') {
547 c += toLower;
548 }
549 headerName[idx] = c;
550 upper = c == '-';
551 }
552 return true;
553}
554
555HandlerQueue::HandlerQueue(unsigned max_pending_ops) :
556 m_max_pending_ops(max_pending_ops)
557{
558 int filedes[2];
559 auto result = pipe(filedes);
560 if (result == -1) {
561 throw std::runtime_error(strerror(errno));
562 }
563 if (fcntl(filedes[0], F_SETFL, O_NONBLOCK | O_CLOEXEC) == -1 || fcntl(filedes[1], F_SETFL, O_NONBLOCK | O_CLOEXEC) == -1) {
564 close(filedes[0]);
565 close(filedes[1]);
566 throw std::runtime_error(strerror(errno));
567 }
568 m_read_fd = filedes[0];
569 m_write_fd = filedes[1];
570};
571
572namespace {
573
574bool EnableCurlHeaderDump() {
575 auto *log = XrdCl::DefaultEnv::GetLog();
576 if (log && log->GetLevel() >= XrdCl::Log::DumpMsg)
577 return true;
578
579 return false;
580}
581
582// Debug callback for libcurl headers; enabled with XRD_LOGLEVEL=Dump
583int DumpHeader(CURL *handle, curl_infotype type, char *data, size_t size, void *clientp) {
584 (void)handle;
585 auto *logger = static_cast<XrdCl::Log *>(clientp);
586 if (!logger || !data || size == 0) {
587 return 0;
588 }
589
590 const char *direction = nullptr;
591 switch (type) {
592 case CURLINFO_HEADER_OUT:
593 direction = ">";
594 break;
595 case CURLINFO_HEADER_IN:
596 direction = "<";
597 break;
598 default:
599 return 0;
600 }
601
602 const std::string redacted = obfuscateAuth(std::string(data, size));
603 logger->Debug(kLogXrdClHttp, "%s %s", direction, redacted.c_str());
604 return 0;
605}
606
607}
608
609// Trim left and right side of a string_view for space characters
610std::string_view XrdClHttp::trim_view(const std::string_view &input_view) {
611 auto view = XrdClHttp::ltrim_view(input_view);
612 for (size_t idx = 0; idx < input_view.size(); idx++) {
613 if (!isspace(view[view.size() - 1 - idx])) {
614 return view.substr(0, view.size() - idx);
615 }
616 }
617 return "";
618}
619
620// Trim the left side of a string_view for space
621std::string_view XrdClHttp::ltrim_view(const std::string_view &input_view) {
622 for (size_t idx = 0; idx < input_view.size(); idx++) {
623 if (!isspace(input_view[idx])) {
624 return input_view.substr(idx);
625 }
626 }
627 return "";
628}
629
630CURL *
631XrdClHttp::GetHandle(bool verbose) {
632 auto result = curl_easy_init();
633 if (result == nullptr) {
634 return result;
635 }
636
637 curl_easy_setopt(result, CURLOPT_USERAGENT, "xrdcl-http/" XrdVERSION);
638 curl_easy_setopt(result, CURLOPT_DEBUGFUNCTION, DumpHeader);
639 curl_easy_setopt(result, CURLOPT_DEBUGDATA, XrdCl::DefaultEnv::GetLog());
640 if (verbose)
641 curl_easy_setopt(result, CURLOPT_VERBOSE, 1L);
642
643 auto env = XrdCl::DefaultEnv::GetEnv();
644 std::string ca_file;
645 if (!env->GetString("HttpCertFile", ca_file) || ca_file.empty()) {
646 char *x509_ca_file = getenv("X509_CERT_FILE");
647 if (x509_ca_file) {
648 ca_file = std::string(x509_ca_file);
649 }
650 }
651 if (!ca_file.empty()) {
652 curl_easy_setopt(result, CURLOPT_CAINFO, ca_file.c_str());
653 }
654 std::string ca_dir;
655 if (!env->GetString("HttpCertDir", ca_dir) || ca_dir.empty()) {
656 char *x509_ca_dir = getenv("X509_CERT_DIR");
657 if (x509_ca_dir) {
658 ca_dir = std::string(x509_ca_dir);
659 }
660 }
661 if (!ca_dir.empty()) {
662 curl_easy_setopt(result, CURLOPT_CAPATH, ca_dir.c_str());
663 }
664
665 curl_easy_setopt(result, CURLOPT_BUFFERSIZE, 32*1024);
666
667 return result;
668}
669
670CURL *
672 if (m_handles.size()) {
673 auto result = m_handles.back();
674 m_handles.pop_back();
675 return result;
676 }
677
678 return ::GetHandle(EnableCurlHeaderDump());
679}
680
681void
683 m_handles.push_back(curl);
684}
685
686void
688{
689 std::unique_lock<std::mutex> lk(m_mutex);
690 auto now = std::chrono::steady_clock::now();
691
692 // Iterate through the paused transfers, checking if they are done.
693 for (auto &op : m_ops) {
694 if (!op->IsPaused()) continue;
695
696 if (op->TransferStalled(0, now)) {
697 op->ContinueHandle();
698 }
699 }
700
701 std::vector<decltype(m_ops)::value_type> expired_ops;
702 unsigned expired_count = 0;
703 auto it = std::remove_if(m_ops.begin(), m_ops.end(),
704 [&](const std::shared_ptr<CurlOperation> &handler) {
705 auto expired = handler->GetOperationExpiry() < now;
706 if (expired) {
707 expired_ops.push_back(handler);
708 expired_count++;
709 }
710 return expired;
711 });
712 m_ops.erase(it, m_ops.end());
713
714 // The contents of our pipe and the in-memory queue are now off by expired_count.
715 // Read exactly that many bytes from the pipe and throw them away.
716 char throwaway[64];
717 unsigned bytes_to_read = expired_count;
718 while (bytes_to_read > 0) {
719 size_t chunk = std::min<size_t>(sizeof(throwaway), bytes_to_read);
720 ssize_t n = read(m_read_fd, throwaway, chunk);
721 if (n > 0) {
722 bytes_to_read -= n;
723 } else if (n == -1) {
724 if (errno == EINTR) {
725 continue;
726 } else {
727 // EWOULDBLOCK is a possibility if there's a synchronization error;
728 // for now, just continue on as if we were successful in reading out
729 // the missing bytes
730 break;
731 }
732 } else {
733 break;
734 }
735 }
736
737 // Note: the failure handler may trigger new operations submitted to the queue
738 // (which requires the lock to be held) such as a prefetch operation that gets split
739 // into multiple sub-operations.
740 //
741 // Thus, we must unlock the mutex protecting the queue and avoid touching the shared state of
742 // m_ops.
743 lk.unlock();
744 for (auto &handler : expired_ops) {
745 if (handler) handler->Fail(XrdCl::errOperationExpired, 0, "Operation expired while in queue");
746 }
747}
748
749void
750HandlerQueue::Produce(std::shared_ptr<CurlOperation> handler)
751{
752 auto handler_expiry = handler->GetOperationExpiry();
753 std::unique_lock<std::mutex> lk{m_mutex};
754 m_producer_cv.wait_until(lk,
755 handler_expiry,
756 [&]{return m_ops.size() < m_max_pending_ops;}
757 );
758 if (std::chrono::steady_clock::now() > handler_expiry) {
759 lk.unlock();
760 handler->Fail(XrdCl::errOperationExpired, 0, "Operation expired while waiting for worker");
761 m_ops_rejected.fetch_add(1, std::memory_order_relaxed);
762 return;
763 }
764
765 m_ops.push_back(handler);
766 char ready[] = "1";
767 while (true) {
768 auto result = write(m_write_fd, ready, 1);
769 if (result == -1) {
770 if (errno == EINTR) {
771 continue;
772 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
773 // This should never happen, but if it does, just continue
774 // as if we successfully wrote the notification to the pipe.
775 break;
776 }
777 throw std::runtime_error(strerror(errno));
778 }
779 break;
780 }
781
782 lk.unlock();
783 m_consumer_cv.notify_one();
784 m_ops_produced.fetch_add(1, std::memory_order_relaxed);
785}
786
787std::shared_ptr<CurlOperation>
788HandlerQueue::Consume(std::chrono::steady_clock::duration dur)
789{
790 std::unique_lock<std::mutex> lk(m_mutex);
791 m_consumer_cv.wait_for(lk, dur, [&]{return m_ops.size() > 0 || m_shutdown;});
792 if (m_shutdown || m_ops.empty()) {
793 return {};
794 }
795
796 std::shared_ptr<CurlOperation> result = m_ops.front();
797 m_ops.pop_front();
798
799 char ready[1];
800 while (true) {
801 auto result = read(m_read_fd, ready, 1);
802 if (result == -1) {
803 if (errno == EINTR) {
804 continue;
805 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
806 // This should never happen, but if it does, just continue
807 // as if we successfully read the byte.
808 break;
809 }
810 throw std::runtime_error(strerror(errno));
811 }
812 break;
813 }
814
815 lk.unlock();
816 m_producer_cv.notify_one();
817 m_ops_consumed.fetch_add(1, std::memory_order_relaxed);
818
819 return result;
820}
821
822std::string
824{
825 auto consumed = m_ops_consumed.load(std::memory_order_relaxed);
826 auto produced = m_ops_produced.load(std::memory_order_relaxed);
827 return "{"
828 "\"produced\":" + std::to_string(produced) + ","
829 "\"consumed\":" + std::to_string(consumed) + ","
830 "\"pending\":" + std::to_string(produced - consumed) + ","
831 "\"rejected\":" + std::to_string(m_ops_rejected.load(std::memory_order_relaxed)) +
832 "}";
833}
834
835std::shared_ptr<CurlOperation>
837{
838 std::unique_lock<std::mutex> lk(m_mutex);
839 if (m_ops.size() == 0) {
840 std::shared_ptr<CurlOperation> result;
841 return result;
842 }
843
844 std::shared_ptr<CurlOperation> result = m_ops.front();
845 m_ops.pop_front();
846
847 char ready[1];
848 while (true) {
849 auto result = read(m_read_fd, ready, 1);
850 if (result == -1) {
851 if (errno == EINTR) {
852 continue;
853 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
854 // This should never happen, but if it does, just continue
855 // as if we successfully read the byte.
856 break;
857 }
858 throw std::runtime_error(strerror(errno));
859 }
860 break;
861 }
862
863 lk.unlock();
864 m_producer_cv.notify_one();
865 m_ops_consumed.fetch_add(1, std::memory_order_relaxed);
866
867 return result;
868}
869
870void
872{
873 std::unique_lock lock(m_mutex);
874 m_shutdown = true;
875 m_consumer_cv.notify_all();
876}
877
878void
880{
881 for (auto handle : m_handles) {
882 curl_easy_cleanup(handle);
883 }
884 m_handles.clear();
885}
886
887CurlWorker::CurlWorker(std::shared_ptr<HandlerQueue> queue, VerbsCache &cache, XrdCl::Log* logger) :
888 m_cache(cache),
889 m_queue(queue),
890 m_logger(logger)
891{
892 {
893 std::unique_lock lk(m_worker_stats_mutex);
894 m_stats_offset = m_workers_last_completed_cycle.size();
895 m_workers_last_completed_cycle.push_back(&m_last_completed_cycle);
896 m_workers_oldest_op.push_back(&m_oldest_op);
897 }
898 int pipeInfo[2];
899 if ((pipe(pipeInfo) == -1) || (fcntl(pipeInfo[0], F_SETFD, FD_CLOEXEC)) || (fcntl(pipeInfo[1], F_SETFD, FD_CLOEXEC))) {
900 throw std::runtime_error("Failed to create shutdown monitoring pipe for curl worker");
901 }
902 m_shutdown_pipe_r = pipeInfo[0];
903 m_shutdown_pipe_w = pipeInfo[1];
904
905 // Handle setup of the X509 authentication
906 auto env = XrdCl::DefaultEnv::GetEnv();
907 env->GetString("HttpClientCertFile", m_x509_client_cert_file);
908 env->GetString("HttpClientKeyFile", m_x509_client_key_file);
909}
910
911std::tuple<std::string, std::string> CurlWorker::ClientX509CertKeyFile() const
912{
913 return std::make_tuple(m_x509_client_cert_file, m_x509_client_key_file);
914}
915
916std::string
918{
919 auto now = std::chrono::system_clock::now().time_since_epoch().count();
920 auto oldest_op = now;
921 auto oldest_cycle = now;
922 {
923 std::unique_lock lk(m_worker_stats_mutex);
924 for (const auto &entry : m_workers_last_completed_cycle) {
925 if (!entry) {continue;}
926 auto cycle = entry->load(std::memory_order_relaxed);
927 if (cycle < oldest_cycle) oldest_cycle = cycle;
928 }
929 for (const auto &entry : m_workers_oldest_op) {
930 if (!entry) {continue;}
931 auto op = entry->load(std::memory_order_relaxed);
932 if (op < oldest_op) oldest_op = op;
933 }
934 }
935 auto oldest_op_dbl = std::chrono::duration<double>(std::chrono::system_clock::time_point(std::chrono::system_clock::duration(oldest_op)).time_since_epoch()).count();
936 auto oldest_cycle_dbl = std::chrono::duration<double>(std::chrono::system_clock::time_point(std::chrono::system_clock::duration(oldest_cycle)).time_since_epoch()).count();
937 std::string retval = "{"
938 "\"oldest_op\":" + std::to_string(oldest_op_dbl) + ","
939 "\"oldest_cycle\":" + std::to_string(oldest_cycle_dbl) + ","
940 ;
941
942 for (size_t verb_idx = 0; verb_idx < static_cast<int>(XrdClHttp::CurlOperation::HttpVerb::Count); verb_idx++) {
943 const auto &verb_str = XrdClHttp::CurlOperation::GetVerbString(static_cast<XrdClHttp::CurlOperation::HttpVerb>(verb_idx));
944 for (size_t op_idx = 0; op_idx < 402; op_idx++) {
945 if (op_idx == 401) continue;
946
947 auto &op_stats = m_ops[verb_idx][op_idx];
948 auto duration = op_stats.m_duration.load(std::memory_order_relaxed);
949 if (duration == 0) continue;
950
951 std::string prefix = "http_" + verb_str + "_" + ((op_idx == 402) ? "invalid" : std::to_string(200 + op_idx)) + "_";
952
953 auto duration_dbl = std::chrono::duration<double>(std::chrono::steady_clock::duration(duration)).count();
954 retval += "\"" + prefix + "duration\":" + std::to_string(duration_dbl) + ",";
955
956 duration = op_stats.m_pause_duration.load(std::memory_order_relaxed);
957 if (duration > 0) {
958 duration_dbl = std::chrono::duration<double>(std::chrono::steady_clock::duration(duration)).count();
959 retval += "\"" + prefix + "pause_duration\":" + std::to_string(duration_dbl) + ",";
960 }
961
962 auto count = op_stats.m_bytes.load(std::memory_order_relaxed);
963 if (count) retval += "\"" + prefix + "bytes\":" + std::to_string(count) + ",";
964 count = op_stats.m_error.load(std::memory_order_relaxed);
965 if (count) retval += "\"" + prefix + "error\":" + std::to_string(count) + ",";
966 count = op_stats.m_finished.load(std::memory_order_relaxed);
967 if (count) retval += "\"" + prefix + "finished\":" + std::to_string(count) + ",";
968 count = op_stats.m_client_timeout.load(std::memory_order_relaxed);
969 if (count) retval += "\"" + prefix + "client_timeout\":" + std::to_string(count) + ",";
970 count = op_stats.m_server_timeout.load(std::memory_order_relaxed);
971 if (count) retval += "\"" + prefix + "server_timeout\":" + std::to_string(count) + ",";
972 }
973 {
974 auto &op_stats = m_ops[verb_idx][401];
975 auto duration = op_stats.m_duration.load(std::memory_order_relaxed);
976 if (duration == 0) continue;
977
978 std::string prefix = "http_" + verb_str + "_";
979
980 auto duration_dbl = std::chrono::duration<double>(std::chrono::steady_clock::duration(duration)).count();
981 retval += "\"" + prefix + "preheader_duration\":" + std::to_string(duration_dbl) + ",";
982
983 auto count = op_stats.m_started.load(std::memory_order_relaxed);
984 if (count) retval += "\"" + prefix + "started\":" + std::to_string(count) + ",";
985 count = op_stats.m_error.load(std::memory_order_relaxed);
986 if (count) retval += "\"" + prefix + "preheader_error\":" + std::to_string(count) + ",";
987 count = op_stats.m_finished.load(std::memory_order_relaxed);
988 if (count) retval += "\"" + prefix + "preheader_finished\":" + std::to_string(count) + ",";
989 count = op_stats.m_server_timeout.load(std::memory_order_relaxed);
990 if (count) retval += "\"" + prefix + "preheader_timeout\":" + std::to_string(count) + ",";
991 count = op_stats.m_conncall_timeout.load(std::memory_order_relaxed);
992 if (count) retval += "\"" + prefix + "conncall_timeout\":" + std::to_string(count) + ",";
993 }
994 }
995
996 retval +=
997 "\"conncall_error\":" + std::to_string(m_conncall_errors.load(std::memory_order_relaxed)) + ","
998 "\"conncall_started\":" + std::to_string(m_conncall_req.load(std::memory_order_relaxed)) + ","
999 "\"conncall_success\":" + std::to_string(m_conncall_success.load(std::memory_order_relaxed)) + ","
1000 "\"conncall_timeout\":" + std::to_string(m_conncall_timeout.load(std::memory_order_relaxed)) +
1001 "}";
1002
1003 return retval;
1004}
1005
1006void
1007CurlWorker::OpRecord(XrdClHttp::CurlOperation &op, OpKind kind)
1008{
1009 int sc = op.GetStatusCode();
1010 // - We encode everything pre-header as integer "401". We include a 100-continue request as "pre-header".
1011 // - Status codes out of the acceptable range are labeled "402"
1012 // - Otherwise, we store it in the array shifted by 200 (to avoid more sparsity)
1013 if (sc < 0 || kind == OpKind::Start || sc == 100) {
1014 sc = 401;
1015 } else if (sc < 200 || sc >= 600) {
1016 sc = 402;
1017 } else {
1018 sc -= 200;
1019 }
1020 auto [bytes, pre_headers, post_headers, pause_duration] = op.StatisticsReset();
1021 auto &op_stats = m_ops[static_cast<int>(op.GetVerb())][sc];
1022 op_stats.m_bytes.fetch_add(bytes, std::memory_order_relaxed);
1023 op_stats.m_duration.fetch_add((sc == 401) ? pre_headers.count() : post_headers.count(), std::memory_order_relaxed);
1024 op_stats.m_pause_duration.fetch_add(pause_duration.count(), std::memory_order_relaxed);
1025 if (pre_headers != std::chrono::steady_clock::duration::zero() && sc != 401) {
1026 auto &old_stats = m_ops[static_cast<int>(op.GetVerb())][401];
1027 old_stats.m_duration.fetch_add(pre_headers.count(), std::memory_order_relaxed);
1028 }
1029 switch (kind) {
1030 case OpKind::ConncallTimeout:
1031 op_stats.m_conncall_timeout.fetch_add(1, std::memory_order_relaxed);
1032 break;
1033 case OpKind::ClientTimeout:
1034 op_stats.m_client_timeout.fetch_add(1, std::memory_order_relaxed);
1035 break;
1036 case OpKind::Error:
1037 op_stats.m_error.fetch_add(1, std::memory_order_relaxed);
1038 break;
1039 case OpKind::Finish:
1040 op_stats.m_finished.fetch_add(1, std::memory_order_relaxed);
1041 break;
1042 case OpKind::Start:
1043 op_stats.m_started.fetch_add(1, std::memory_order_relaxed);
1044 break;
1045 case OpKind::ServerTimeout:
1046 op_stats.m_server_timeout.fetch_add(1, std::memory_order_relaxed);
1047 break;
1048 case OpKind::Update:
1049 break;
1050 }
1051}
1052
1053void
1054CurlWorker::Start(std::unique_ptr<XrdClHttp::CurlWorker> self, std::thread tid)
1055{
1056 {
1057 std::unique_lock lock(m_workers_mutex);
1058 m_workers.emplace_back(std::move(self));
1059 m_self_tid = std::move(tid);
1060 }
1061 std::unique_lock lock(m_start_lock);
1062 m_start_complete = true;
1063 m_start_complete_cv.notify_one();
1064}
1065
1066void
1068{
1069 {
1070 std::unique_lock lock(myself->m_start_lock);
1071 myself->m_start_complete_cv.wait(lock, [&]{return myself->m_start_complete;});
1072 }
1073 try {
1074 myself->Run();
1075 } catch (...) {
1076 myself->m_logger->Warning(kLogXrdClHttp, "Curl worker got an exception");
1077 {
1078 std::unique_lock lock(m_workers_mutex);
1079 auto iter = std::remove_if(m_workers.begin(), m_workers.end(), [&](std::unique_ptr<XrdClHttp::CurlWorker> &worker){return worker.get() == myself;});
1080 m_workers.erase(iter);
1081 }
1082 }
1083}
1084
// Main event loop of a curl worker.
//
// Creates a curl multi-handle and then, until a shutdown is signalled on
// m_shutdown_pipe_r or curl_multi_perform fails:
//   - publishes the last-cycle and oldest-operation timestamps,
//   - resumes operations found on the continue queue,
//   - pulls new operations from the shared queue (optionally chaining an
//     OPTIONS request in front of them),
//   - periodically expires queue entries and timed-out connection-callout
//     ("broker") requests,
//   - waits on the multi-handle plus the queue / shutdown / broker FDs,
//   - processes completed transfers, redirects and failures.
// On exit, outstanding operations are failed if the last wait reported an
// error, and all handles are detached from the multi-handle.
//
// Throws std::runtime_error if the multi-handle cannot be created.
//
// NOTE(review): the function signature line (listing line 1086) is absent from
// this listing; RunStatic() invokes this as `myself->Run()`.
1085void
1087 int max_pending = 50;
1088 XrdCl::DefaultEnv::GetEnv()->GetInt("HttpMaxPendingOps", max_pending);
1089 m_continue_queue.reset(new HandlerQueue(max_pending));
1090 auto &queue = *m_queue.get();
1091 m_logger->Debug(kLogXrdClHttp, "Started a curl worker");
1092
1093 CURLM *multi_handle = curl_multi_init();
1094 if (multi_handle == nullptr) {
1095 throw std::runtime_error("Failed to create curl multi-handle");
1096 }
1097
1098 int running_handles = 0;
1099 time_t last_maintenance = time(NULL);
1100 CURLMcode mres = CURLM_OK;
1101
1102 // Map from a file descriptor that has an outstanding broker request
1103 // to the corresponding CURL handle.
1104 std::unordered_map<int, WaitingForBroker> broker_reqs;
1105 std::vector<struct curl_waitfd> waitfds;
1106
1107 bool want_shutdown = false;
1108 while (!want_shutdown) {
// Publish the time of this cycle and the timestamp of the oldest tracked
// operation; each tracked operation also gets its statistics folded in.
1109 m_last_completed_cycle.store(std::chrono::system_clock::now().time_since_epoch().count());
1110 auto oldest_op = std::chrono::system_clock::now();
1111 for (const auto &entry : m_op_map) {
1112 OpRecord(*entry.second.first, OpKind::Update);
1113 if (entry.second.second < oldest_op) {
1114 oldest_op = entry.second.second;
1115 }
1116 }
1117 m_oldest_op.store(oldest_op.time_since_epoch().count());
1118
1119 // Try continuing any available handles that have more data
1120 while (true) {
1121 auto op = m_continue_queue->TryConsume();
1122 if (!op) {
1123 break;
1124 }
1125 // Avoid race condition where external thread added a continue operation to queue
1126 // while the curl worker thread failed the transfer.
1127 if (op->IsDone()) {
1128 m_logger->Debug(kLogXrdClHttp, "Ignoring continuation of operation that has already completed");
1129 continue;
1130 }
1131 m_logger->Debug(kLogXrdClHttp, "Continuing the curl handle from op %p on thread %d", op.get(), getthreadid());
1132 auto curl = op->GetCurlHandle();
1133 if (!op->ContinueHandle()) {
1134 op->Fail(XrdCl::errInternal, 0, "Failed to continue the curl handle for the operation");
1135 OpRecord(*op, OpKind::Error);
1136 op->ReleaseHandle();
1137 if (curl) {
1138 curl_multi_remove_handle(multi_handle, curl);
1139 curl_easy_cleanup(curl);
1140 m_op_map.erase(curl);
1141 }
1142 running_handles -= 1;
1143 continue;
1144 } else {
// Successful continuation: refresh the operation's timestamp in the map.
1145 auto iter = m_op_map.find(curl);
1146 if (iter != m_op_map.end()) iter->second.second = std::chrono::system_clock::now();
1147 }
1148 }
1149 // Consume from the shared new operation queue
1150 while (running_handles < static_cast<int>(m_max_ops)) {
// When idle, wait up to one second for new work; otherwise only poll the queue.
1151 auto op = running_handles == 0 ? queue.Consume(std::chrono::seconds(1)) : queue.TryConsume();
1152 if (!op) {
1153 break;
1154 }
1155 auto curl = queue.GetHandle();
1156 if (curl == nullptr) {
1157 m_logger->Debug(kLogXrdClHttp, "Unable to allocate a curl handle");
1158 op->Fail(XrdCl::errInternal, ENOMEM, "Unable to get allocate a curl handle");
1159 continue;
1160 }
1161 try {
1162 auto rv = op->Setup(curl, *this);
1163 if (!rv) {
1164 m_logger->Debug(kLogXrdClHttp, "Failed to setup the curl handle");
1165 op->Fail(XrdCl::errInternal, ENOMEM, "Failed to setup the curl handle for the operation");
1166 continue;
1167 }
1168 if (!op->FinishSetup(curl)) {
1169 m_logger->Debug(kLogXrdClHttp, "Failed to finish setup of the curl handle");
1170 op->Fail(XrdCl::errInternal, ENOMEM, "Failed to finish setup of the curl handle for the operation");
1171 continue;
1172 }
1173 } catch (...) {
1174 m_logger->Debug(kLogXrdClHttp, "Unable to setup the curl handle");
1175 op->Fail(XrdCl::errInternal, ENOMEM, "Failed to setup the curl handle for the operation");
1176 continue;
1177 }
1178 op->SetContinueQueue(m_continue_queue);
1179
1180 if (op->IsDone()) {
1181 continue;
1182 }
1183 m_op_map[curl] = {op, std::chrono::system_clock::now()};
1184
1185 // If the operation requires the result of the OPTIONS verb to function, then
1186 // we add that to the multi-handle instead, chaining the two calls together.
1187 if (op->RequiresOptions()) {
1188 std::string modified_url;
1189 std::shared_ptr<CurlOptionsOp> options_op(
1190 new CurlOptionsOp(
1191 curl, op,
1192 std::string(
1193 VerbsCache::GetUrlKey(op->GetUrl(), modified_url)
1194 ),
1195 m_logger, op->GetConnCalloutFunc()
1196 )
1197 );
1198 // Note this `curl` variable is not local to the conditional; it is the curl handle of the
1199 // CurlOptionsOp and will be added below to the multi-handle, causing it - not the parent's
1200 // curl handle - to be executed.
1201 curl = queue.GetHandle();
1202 if (curl == nullptr) {
1203 m_logger->Debug(kLogXrdClHttp, "Unable to allocate a curl handle");
1204 op->Fail(XrdCl::errInternal, ENOMEM, "Unable to get allocate a curl handle");
1205 OpRecord(*op, OpKind::Error);
1206 continue;
1207 }
1208 auto rv = options_op->Setup(curl, *this);
1209 if (!rv) {
// NOTE(review): this path neither fails the parent `op` nor removes its
// m_op_map entry added above -- confirm whether that is intentional.
1210 m_logger->Debug(kLogXrdClHttp, "Failed to allocate a curl handle for OPTIONS");
1211 continue;
1212 }
1213 m_op_map[curl] = {options_op, std::chrono::system_clock::now()};
1214 OpRecord(*options_op, OpKind::Start);
1215 running_handles += 1;
1216 } else {
1217 OpRecord(*op, OpKind::Start);
1218 }
1219
1220 auto mres = curl_multi_add_handle(multi_handle, curl);
1221 if (mres != CURLM_OK) {
1222 m_logger->Debug(kLogXrdClHttp, "Unable to add operation to the curl multi-handle");
1223 op->Fail(XrdCl::errInternal, mres, "Unable to add operation to the curl multi-handle");
1224 OpRecord(*op, OpKind::Error);
1225 continue;
1226 }
1227 m_logger->Debug(kLogXrdClHttp, "Added request for URL %s to worker thread for processing", op->GetUrl().c_str());
1228 running_handles += 1;
1229 }
1230
1231 // Maintain the periodic reporting of thread activity and fail any operations
1232 // that have expired / timed out.
1233 time_t now = time(NULL);
1234 time_t next_maintenance = last_maintenance + m_maintenance_period.load(std::memory_order_relaxed);
1235 if (now >= next_maintenance) {
1236 m_queue->Expire();
1237 m_continue_queue->Expire();
1238 m_logger->Debug(kLogXrdClHttp, "Curl worker thread %d is running %d operations",
1239 getthreadid(), running_handles);
1240 last_maintenance = now;
1241
1242 // Timeout all the pending broker requests.
// Expired entries are collected first so that broker_reqs is not modified
// while it is being iterated.
1243 std::vector<std::pair<int, CURL *>> expired_ops;
1244 for (const auto &entry : broker_reqs) {
1245 if (entry.second.expiry < now) {
1246 expired_ops.emplace_back(entry.first, entry.second.curl);
1247 }
1248 }
1249 for (const auto &entry : expired_ops) {
1250 auto iter = m_op_map.find(entry.second);
1251 if (iter == m_op_map.end()) {
1252 m_logger->Warning(kLogXrdClHttp, "Found an expired curl handle with no corresponding operation!");
1253 } else {
1254
// If the expired operation is an OPTIONS request, either fail its parent
// (when the parent's redirect reports Fail) or start the parent directly.
1255 CurlOptionsOp *options_op = nullptr;
1256 if ((options_op = dynamic_cast<CurlOptionsOp*>(iter->second.first.get())) != nullptr) {
1257 auto parent_op = options_op->GetOperation();
1258 bool parent_op_failed = false;
1259 if (parent_op->IsRedirect()) {
1260 std::string target;
1261 if (parent_op->Redirect(target) == CurlOperation::RedirectAction::Fail) {
1262 auto iter = m_op_map.find(options_op->GetParentCurlHandle());
1263 if (iter != m_op_map.end()) {
1264 OpRecord(*iter->second.first, OpKind::Error);
1265 iter->second.first->Fail(XrdCl::errErrorResponse, 0, "Failed to send OPTIONS to redirect target");
1266 m_op_map.erase(iter);
1267 running_handles -= 1;
1268 }
1269 parent_op_failed = true;
1270 } else {
1271 OpRecord(*parent_op, OpKind::Start);
1272 }
1273 } else {
1274 OpRecord(*parent_op, OpKind::Start);
1275 }
1276 if (!parent_op_failed){
1277 curl_multi_add_handle(multi_handle, options_op->GetParentCurlHandle());
1278 }
1279 }
1280
1281 iter->second.first->Fail(XrdCl::errConnectionError, 1, "Timeout: connection never provided for request");
1282 iter->second.first->ReleaseHandle();
1283 OpRecord(*(iter->second.first), OpKind::ConncallTimeout);
1284 m_op_map.erase(entry.second);
1285 curl_easy_cleanup(entry.second);
1286 running_handles -= 1;
1287 }
1288 broker_reqs.erase(entry.first);
1289 m_conncall_timeout.fetch_add(1, std::memory_order_relaxed);
1290 }
1291
1292 // Cleanup the fake connection cache entries.
// NOTE(review): listing line 1293 is absent here; presumably the cleanup call
// itself -- confirm against the upstream file.
1294 }
1295
// Build the extra poll set: slot 0 is the new-operation queue, slot 1 the
// continue queue, slot 2 the shutdown pipe, followed by one slot per
// outstanding broker request.
1296 waitfds.clear();
1297 waitfds.resize(3 + broker_reqs.size());
1298
1299 waitfds[0].fd = queue.PollFD();
1300 waitfds[0].events = CURL_WAIT_POLLIN;
1301 waitfds[0].revents = 0;
1302 waitfds[1].fd = m_continue_queue->PollFD();
1303 waitfds[1].events = CURL_WAIT_POLLIN;
1304 waitfds[1].revents = 0;
1305 waitfds[2].fd = m_shutdown_pipe_r;
1306 waitfds[2].revents = 0;
1307 waitfds[2].events = CURL_WAIT_POLLIN | CURL_WAIT_POLLPRI;
1308
1309 int idx = 3;
1310 for (const auto &entry : broker_reqs) {
1311 waitfds[idx].fd = entry.first;
1312 waitfds[idx].events = CURL_WAIT_POLLIN|CURL_WAIT_POLLPRI;
1313 waitfds[idx].revents = 0;
1314 idx += 1;
1315 }
1316
1317 long timeo;
1318 curl_multi_timeout(multi_handle, &timeo);
1319 // These commented-out lines are purposely left; will need to revisit after the 0.9.1 release;
1320 // for now, they are too verbose on RHEL7.
1321 //m_logger->Debug(kLogXrdClHttp, "Curl advises a timeout of %ld ms", timeo);
1322 if (running_handles && timeo == -1) {
1323 // Bug workaround: we've seen RHEL7 libcurl have a race condition where it'll not
1324 // set a timeout while doing the DNS lookup; assume that if there are running handles
1325 // but no timeout, we've hit this bug.
1326 //m_logger->Debug(kLogXrdClHttp, "Will sleep for up to 50ms");
1327 mres = curl_multi_wait(multi_handle, &waitfds[0], waitfds.size(), 50, nullptr);
1328 } else {
1329 //m_logger->Debug(kLogXrdClHttp, "Will sleep for up to %d seconds", max_sleep_time);
1330 //mres = curl_multi_wait(multi_handle, &waitfds[0], waitfds.size(), max_sleep_time*1000, nullptr);
1331 // Temporary test: we've been seeing DNS lookups timeout on additional platforms. Switch to always
1332 // poll as curl_multi_wait doesn't seem to get notified when DNS lookups are done.
1333 mres = curl_multi_wait(multi_handle, &waitfds[0], waitfds.size(), 50, nullptr);
1334 }
1335 if (mres != CURLM_OK) {
1336 m_logger->Warning(kLogXrdClHttp, "Failed to wait on multi-handle: %d", mres);
1337 }
1338
1339 // Iterate through the waiting broker callbacks.
1340 for (const auto &entry : waitfds) {
1341 // Ignore the queue's poll fd.
1342 if (waitfds[0].fd == entry.fd || waitfds[1].fd == entry.fd) {
1343 continue;
1344 }
1345 // Handle shutdown requests
1346 if ((waitfds[2].fd == entry.fd) && entry.revents) {
1347 want_shutdown = true;
1348 break;
1349 }
1350 if ((entry.revents & CURL_WAIT_POLLIN) != CURL_WAIT_POLLIN) {
1351 continue;
1352 }
1353 auto handle = broker_reqs[entry.fd].curl;
1354 auto iter = m_op_map.find(handle);
1355 if (iter == m_op_map.end()) {
1356 m_logger->Warning(kLogXrdClHttp, "Internal error: broker responded on FD %d but no corresponding curl operation", entry.fd);
1357 broker_reqs.erase(entry.fd);
1358 m_conncall_errors.fetch_add(1, std::memory_order_relaxed);
1359 continue;
1360 }
1361 std::string err;
1362 auto result = iter->second.first->WaitSocketCallback(err);
1363 if (result == -1) {
1364 m_logger->Warning(kLogXrdClHttp, "Error when invoking the broker callback: %s", err.c_str());
1365
// Same parent hand-off as in the broker-timeout path above.
1366 CurlOptionsOp *options_op = nullptr;
1367 if ((options_op = dynamic_cast<CurlOptionsOp*>(iter->second.first.get())) != nullptr) {
1368 auto parent_op = options_op->GetOperation();
1369 bool parent_op_failed = false;
1370 if (parent_op->IsRedirect()) {
1371 std::string target;
1372 if (parent_op->Redirect(target) == CurlOperation::RedirectAction::Fail) {
1373 auto iter = m_op_map.find(options_op->GetParentCurlHandle());
1374 if (iter != m_op_map.end()) {
1375 OpRecord(*iter->second.first, OpKind::Error);
1376 iter->second.first->Fail(XrdCl::errErrorResponse, 0, "Failed to send OPTIONS to redirect target");
1377 m_op_map.erase(iter);
1378 running_handles -= 1;
1379 }
1380 parent_op_failed = true;
1381 } else {
1382 OpRecord(*parent_op, OpKind::Start);
1383 }
1384 } else {
1385 OpRecord(*parent_op, OpKind::Start);
1386 }
1387 if (!parent_op_failed){
1388 curl_multi_add_handle(multi_handle, options_op->GetParentCurlHandle());
1389 }
1390 }
1391
1392 iter->second.first->Fail(XrdCl::errErrorResponse, 1, err);
1393 OpRecord(*iter->second.first, OpKind::Error);
1394 m_op_map.erase(handle);
1395 broker_reqs.erase(entry.fd);
1396 m_conncall_errors.fetch_add(1, std::memory_order_relaxed);
1397 running_handles -= 1;
1398 } else {
// The callout succeeded: hand the curl handle back to the multi-handle.
1399 broker_reqs.erase(entry.fd);
1400 curl_multi_add_handle(multi_handle, handle);
1401 m_conncall_success.fetch_add(1, std::memory_order_relaxed);
1402 }
1403 }
1404
1405 // Do maintenance on the multi-handle
// NOTE(review): this `auto mres` shadows the function-scope `mres` declared
// near the top.  The CURLM_BAD_EASY_HANDLE assignments in the message loop
// below therefore update only this inner variable (and their `break` leaves
// only the do/while), so the post-loop `if (mres)` check sees the result of
// curl_multi_wait instead -- confirm this is intended.
1406 int still_running;
1407 auto mres = curl_multi_perform(multi_handle, &still_running);
1408 if (mres == CURLM_CALL_MULTI_PERFORM) {
1409 continue;
1410 } else if (mres != CURLM_OK) {
1411 m_logger->Warning(kLogXrdClHttp, "Failed to perform multi-handle operation: %d", mres);
1412 break;
1413 }
1414
// Drain the completion messages from the multi-handle.
1415 CURLMsg *msg;
1416 do {
1417 int msgq = 0;
1418 msg = curl_multi_info_read(multi_handle, &msgq);
1419 if (msg && (msg->msg == CURLMSG_DONE)) {
1420 if (!msg->easy_handle) {
1421 m_logger->Warning(kLogXrdClHttp, "Logic error: got a callback for a null handle");
1422 mres = CURLM_BAD_EASY_HANDLE;
1423 break;
1424 }
1425 auto iter = m_op_map.find(msg->easy_handle);
1426 if (iter == m_op_map.end()) {
1427 m_logger->Error(kLogXrdClHttp, "Logic error: got a callback for an entry that doesn't exist");
1428 mres = CURLM_BAD_EASY_HANDLE;
1429 break;
1430 }
1431 auto op = iter->second.first;
1432 auto res = msg->data.result;
// keep_handle: the easy handle stays tracked in m_op_map after this message.
// waiting_on_callout: the handle is parked until a callout socket responds.
1433 bool keep_handle = false;
1434 bool waiting_on_callout = false;
1435 if (res == CURLE_OK) {
1436 auto sc = op->GetStatusCode();
1437 OpRecord(*op, OpKind::Finish);
1438 if (HTTPStatusIsError(sc)) {
1439 auto httpErr = HTTPStatusConvert(sc);
1440 op->Fail(httpErr.first, httpErr.second, op->GetStatusMessage());
1441 op->ReleaseHandle();
1442 // If this was a failed CurlOptionsOp, then we re-activate the parent handle.
1443 // If the parent handle was stopped at a redirect that now returns failure, then
1444 // we'll clean it up.
1445 CurlOptionsOp *options_op = nullptr;
1446 if ((options_op = dynamic_cast<CurlOptionsOp*>(op.get())) != nullptr) {
1447 auto parent_op = options_op->GetOperation();
1448 bool parent_op_failed = false;
1449 if (parent_op->IsRedirect()) {
1450 std::string target;
1451 if (parent_op->Redirect(target) == CurlOperation::RedirectAction::Fail) {
1452 OpRecord(*parent_op, OpKind::Error);
1453 m_op_map.erase(options_op->GetParentCurlHandle());
1454 running_handles -= 1;
1455 parent_op_failed = true;
1456 } else {
1457 OpRecord(*parent_op, OpKind::Start);
1458 }
1459 } else {
1460 OpRecord(*parent_op, OpKind::Start);
1461 }
1462 // Have curl execute the parent operation
1463 if (!parent_op_failed) {
1464 curl_multi_add_handle(multi_handle, options_op->GetParentCurlHandle());
1465 }
1466 }
1467 // The curl operation was successful, it's just the HTTP request failed; recycle the handle.
1468 queue.RecycleHandle(iter->first);
1469 } else {
1470 CurlOptionsOp *options_op = nullptr;
1471 // If this was a successful OPTIONS op, invoke the parent operation.
1472 if ((options_op = dynamic_cast<CurlOptionsOp*>(op.get()))) {
1473 options_op->Success();
1474 options_op->ReleaseHandle();
1475 // Note: op is scoped external to the conditional block
1476 op = options_op->GetOperation();
1477 op->OptionsDone();
1478 OpRecord(*op, OpKind::Start);
1479 curl_multi_add_handle(multi_handle, options_op->GetParentCurlHandle());
1480 curl_multi_remove_handle(multi_handle, iter->first);
1481 queue.RecycleHandle(iter->first);
1482 }
1483 // Check to see if the operation ended in a redirect (note: this might)
1484 // be invoked a second time if this was the parent operation of an OPTIONS
1485 // op.
1486 if (op->IsRedirect()) {
1487 std::string target;
1488 switch (op->Redirect(target)) {
// NOTE(review): the `case` label lines of this switch (listing lines 1489,
// 1500 and 1509) are absent from this listing; the three arms below
// presumably correspond to distinct RedirectAction values -- confirm against
// the upstream file.
1490 if (options_op) {
1491 // In this case, we failed immediately after an OPTIONS finished.
1492 // Since there's a Start recorded after the OPTIONS processing, we
1493 // must record an error.
1494 // In the non-OPTIONS case, we never recorded a second start and
1495 // don't need a matching failure.
1496 OpRecord(*op, OpKind::Error);
1497 }
1498 keep_handle = false;
1499 break;
1501 if (!options_op) {
1502 // In this case, the redirect occurred without any prior
1503 // OPTIONS call. This implies that `op` is the original call
1504 // and we need to restart it later and record another op start.
1505 keep_handle = true;
1506 OpRecord(*op, OpKind::Start);
1507 }
1508 break;
1510 {
1511 // The redirect resulted in a new endpoint where the cache lookup failed;
1512 // we need to know what HTTP verbs are in the server's Allow list before this
1513 // operation can continue. Inject a new CurlOptionsOp and chain it to the one
1514 // being processed. Once the OPTIONS request is done, then we'll restart this
1515 // operation.
1516 std::string modified_url;
1517 target = VerbsCache::GetUrlKey(target, modified_url);
1518 options_op = new CurlOptionsOp(iter->first, op, target, m_logger, op->GetConnCalloutFunc());
1519 std::shared_ptr<CurlOperation> new_op(options_op);
1520 auto curl = queue.GetHandle();
1521 if (curl == nullptr) {
1522 m_logger->Debug(kLogXrdClHttp, "Unable to allocate a curl handle");
1523 op->Fail(XrdCl::errInternal, ENOMEM, "Unable to get allocate a curl handle");
1524 keep_handle = false;
1525 options_op = nullptr;
1526 break;
1527 }
1528 OpRecord(*new_op, OpKind::Start);
1529 try {
1530 auto rv = new_op->Setup(curl, *this);
1531 if (!rv) {
1532 m_logger->Debug(kLogXrdClHttp, "Unable to configure a curl handle for OPTIONS");
1533 keep_handle = false;
1534 options_op = nullptr;
1535 break;
1536 }
1537 } catch (...) {
1538 m_logger->Debug(kLogXrdClHttp, "Unable to setup the curl handle for the OPTIONS operation");
1539 new_op->Fail(XrdCl::errInternal, ENOMEM, "Failed to setup the curl handle for the OPTIONS operation");
1540 OpRecord(*new_op, OpKind::Error);
1541 keep_handle = false;
1542 break;
1543 }
1544 new_op->SetContinueQueue(m_continue_queue);
1545 m_op_map[curl] = {new_op, std::chrono::system_clock::now()};
1546 auto mres = curl_multi_add_handle(multi_handle, curl);
1547 if (mres != CURLM_OK) {
1548 m_logger->Debug(kLogXrdClHttp, "Unable to add OPTIONS operation to the curl multi-handle: %s", curl_multi_strerror(mres));
1549 op->Fail(XrdCl::errInternal, mres, "Unable to add OPTIONS operation to the curl multi-handle");
1550 OpRecord(*new_op, OpKind::Error);
1551 break;
1552 }
1553 running_handles += 1;
1554 m_logger->Debug(kLogXrdClHttp, "Invoking the OPTIONS operation before redirect to %s", target.c_str());
1555 // The original curl operation needs to be kept around. Note that because options_op
1556 // is non-nil, we won't re-add the handle to the multi-handle.
1557 keep_handle = true;
1558 }
1559 }
// If the redirected operation now needs a callout socket, park it in
// broker_reqs with a 20 second expiry.
1560 int callout_socket = op->WaitSocket();
1561 if ((waiting_on_callout = callout_socket >= 0)) {
1562 auto expiry = time(nullptr) + 20;
1563 m_logger->Debug(kLogXrdClHttp, "Creating a callout wait request on socket %d", callout_socket);
1564 broker_reqs[callout_socket] = {iter->first, expiry};
1565 m_conncall_req.fetch_add(1, std::memory_order_relaxed);
1566 }
1567 } else if (options_op) {
1568 // In this case, the OPTIONS call happened before the parent operation was started.
1569 curl_multi_add_handle(multi_handle, options_op->GetParentCurlHandle());
1570 }
1571 if (keep_handle) {
1572 curl_multi_remove_handle(multi_handle, iter->first);
1573 if (!waiting_on_callout && !options_op) {
1574 curl_multi_add_handle(multi_handle, iter->first);
1575 }
1576 } else if (!options_op) {
1577 op->Success();
1578 op->ReleaseHandle();
1579 // If the handle was successful, then we can recycle it.
1580 queue.RecycleHandle(iter->first);
1581 }
1582 }
1583 } else if (res == CURLE_COULDNT_CONNECT && op->UseConnectionCallout() && !op->GetTriedBoker()) {
1584 // In this case, we need to use the broker and the curl handle couldn't reuse
1585 // an existing socket.
1586 keep_handle = true;
1587 op->SetTriedBoker(); // Flag to ensure we try a connection only once per operation.
1588 std::string err;
1589 int wait_socket = -1;
1590 if (!op->StartConnectionCallout(err) || (wait_socket=op->WaitSocket()) == -1) {
1591 m_logger->Error(kLogXrdClHttp, "Failed to start broker-based connection: %s", err.c_str());
1592 op->ReleaseHandle();
1593 keep_handle = false;
1594 } else {
1595 curl_multi_remove_handle(multi_handle, iter->first);
1596 auto expiry = time(nullptr) + 20;
1597 m_logger->Debug(kLogXrdClHttp, "Curl operation requires a new TCP socket; waiting for callout to respond on socket %d", wait_socket);
1598 broker_reqs[wait_socket] = {iter->first, expiry};
1599 m_conncall_req.fetch_add(1, std::memory_order_relaxed);
1600 }
1601 } else {
1602 if (res == CURLE_ABORTED_BY_CALLBACK || res == CURLE_WRITE_ERROR) {
1603 // We cannot invoke the failure from within a callback as the curl thread and
1604 // original thread of execution may fight over the ownership of the handle memory.
1605 switch (op->GetError()) {
// NOTE(review): the `case` label lines of this switch (listing lines 1606,
// 1614, 1620, 1624, 1628, 1632 and 1636) are absent from this listing; each
// arm below fails the operation and records the matching OpKind.
1607#ifdef HAVE_XPROTOCOL_TIMEREXPIRED
1608 op->Fail(XrdCl::errOperationExpired, 0, "Origin did not respond with headers within timeout");
1609#else
1610 op->Fail(XrdCl::errOperationExpired, 0, "Origin did not respond within timeout");
1611#endif
1612 OpRecord(*op, OpKind::Error);
1613 break;
1615 auto [ecode, emsg] = op->GetCallbackError();
1616 op->Fail(XrdCl::errErrorResponse, ecode, emsg);
1617 OpRecord(*op, OpKind::Error);
1618 break;
1619 }
1621 op->Fail(XrdCl::errOperationExpired, 0, "Operation timed out");
1622 OpRecord(*op, op->IsPaused() ? OpKind::ClientTimeout : OpKind::ServerTimeout);
1623 break;
1625 op->Fail(XrdCl::errOperationExpired, 0, "Transfer speed below minimum threshold");
1626 OpRecord(*op, OpKind::ServerTimeout);
1627 break;
1629 op->Fail(XrdCl::errOperationExpired, 0, "Transfer stalled for too long");
1630 OpRecord(*op, OpKind::ClientTimeout);
1631 break;
1633 op->Fail(XrdCl::errOperationExpired, 0, "Transfer stalled for too long");
1634 OpRecord(*op, OpKind::ServerTimeout);
1635 break;
1637 op->Fail(XrdCl::errInternal, 0, "Operation was aborted without recording an abort reason");
1638 OpRecord(*op, OpKind::Error);
1639 break;
1640 };
1641 CurlOptionsOp *options_op = nullptr;
1642 if ((options_op = dynamic_cast<CurlOptionsOp*>(op.get())) != nullptr) {
1643 auto parent_op = options_op->GetOperation();
1644 bool parent_op_failed = false;
1645 if (parent_op->IsRedirect()) {
1646 std::string target;
1647 if (parent_op->Redirect(target) == CurlOperation::RedirectAction::Fail) {
1648 auto iter = m_op_map.find(options_op->GetParentCurlHandle());
1649 if (iter != m_op_map.end()) {
1650 OpRecord(*iter->second.first, OpKind::Error);
1651 iter->second.first->Fail(XrdCl::errErrorResponse, 0, "Failed to send OPTIONS to redirect target");
1652 m_op_map.erase(iter);
1653 running_handles -= 1;
1654 }
1655 parent_op_failed = true;
1656 } else {
1657 OpRecord(*parent_op, OpKind::Start);
1658 }
1659 } else {
1660 OpRecord(*parent_op, OpKind::Start);
1661 }
1662 if (!parent_op_failed){
1663 curl_multi_add_handle(multi_handle, options_op->GetParentCurlHandle());
1664 }
1665 }
1666 } else {
// Any other curl failure: prefer the operation's own curl error message and
// fall back to curl_easy_strerror().
1667 auto xrdCode = CurlCodeConvert(res);
1668 const auto curl_err = op->GetCurlErrorMessage();
1669 const char *curl_easy_err = curl_easy_strerror(res);
1670 const std::string fail_err = !curl_err.empty() ? curl_err : curl_easy_err;
1671 m_logger->Debug(kLogXrdClHttp, "Curl generated an error: %s (%d)", fail_err.c_str(), res);
1672 op->Fail(xrdCode.first, xrdCode.second, fail_err);
1673 OpRecord(*op, OpKind::Error);
1674 CurlOptionsOp *options_op = nullptr;
1675 if ((options_op = dynamic_cast<CurlOptionsOp*>(op.get())) != nullptr) {
1676 auto parent_op = options_op->GetOperation();
1677 bool parent_op_failed = false;
1678 if (parent_op->IsRedirect()) {
1679 std::string target;
1680 if (parent_op->Redirect(target) == CurlOperation::RedirectAction::Fail) {
1681 auto iter = m_op_map.find(options_op->GetParentCurlHandle());
1682 if (iter != m_op_map.end()) {
1683 OpRecord(*iter->second.first, OpKind::Error);
1684 iter->second.first->Fail(XrdCl::errErrorResponse, 0, "Failed to send OPTIONS to redirect target");
1685 m_op_map.erase(iter);
1686 running_handles -= 1;
1687 }
1688 parent_op_failed = true;
1689 }
1690 }
1691 if (!parent_op_failed){
1692 curl_multi_add_handle(multi_handle, options_op->GetParentCurlHandle());
1693 }
1694 }
1695 }
1696 op->ReleaseHandle();
1697 }
1698 if (!keep_handle) {
1699 curl_multi_remove_handle(multi_handle, iter->first);
// Handles from failed transfers are destroyed rather than recycled.
1700 if (res != CURLE_OK) {
1701 curl_easy_cleanup(iter->first);
1702 }
1703 for (auto &req : broker_reqs) {
1704 if (req.second.curl == iter->first) {
1705 m_logger->Warning(kLogXrdClHttp, "Curl handle finished while a broker operation was outstanding");
1706 m_conncall_errors.fetch_add(1, std::memory_order_relaxed);
1707 }
1708 }
1709 m_op_map.erase(iter);
1710 running_handles -= 1;
1711 }
1712 }
1713 } while (msg);
1714 }
1715
// Teardown: fail every remaining operation if the function-scope `mres`
// holds an error, and detach all handles from the multi-handle.
1716 for (auto map_entry : m_op_map) {
1717 if (mres) {
1718 map_entry.second.first->Fail(XrdCl::errInternal, mres, curl_multi_strerror(mres));
1719 OpRecord(*map_entry.second.first, OpKind::Error);
1720 }
1721 if (multi_handle && map_entry.first) curl_multi_remove_handle(multi_handle, map_entry.first);
1722 }
1723
1724 m_queue->ReleaseHandles();
1725 curl_multi_cleanup(multi_handle);
1726}
1727
1728void
1729CurlWorker::Shutdown()
1730{
1731 m_queue->Shutdown();
1732 if (m_shutdown_pipe_w == -1) {
1733 m_logger->Debug(kLogXrdClHttp, "Curl worker shutdown prior to launch of thread");
1734 return;
1735 }
1736 close(m_shutdown_pipe_w);
1737 m_shutdown_pipe_w = -1;
1738
1739 // wait for worker thread to exit
1740 m_self_tid.join();
1741
1742 {
1743 std::unique_lock lk(m_worker_stats_mutex);
1744 m_workers_last_completed_cycle[m_stats_offset] = nullptr;
1745 m_workers_oldest_op[m_stats_offset] = nullptr;
1746 }
1747 m_logger->Debug(kLogXrdClHttp, "Curl worker thread shutdown has completed.");
1748}
1749
1750void
1751CurlWorker::ShutdownAll()
1752{
1753 std::unique_lock lock(m_workers_mutex);
1754 for (auto &worker : m_workers) {
1755 worker->Shutdown();
1756 }
1757}
1758
1759CurlWorker::initcontrol::initcontrol()
1760{
1761 curl_global_init(CURL_GLOBAL_DEFAULT);
1762}
1763
1764CurlWorker::initcontrol::~initcontrol()
1765{
1766 ShutdownAll();
1767 curl_global_cleanup();
1768}
@ kXR_InvalidRequest
@ kXR_Impossible
@ kXR_TimerExpired
@ kXR_NotAuthorized
@ kXR_NotFound
@ kXR_FileLocked
@ kXR_overQuota
@ kXR_Conflict
@ kXR_ServerError
@ kXR_Overloaded
@ kXR_ReqTimedOut
std::pair< uint16_t, uint32_t > CurlCodeConvert(CURLcode res)
void CURL
std::string obfuscateAuth(const std::string &input)
#define close(a)
Definition XrdPosix.hh:48
#define write(a, b, c)
Definition XrdPosix.hh:115
#define read(a, b, c)
Definition XrdPosix.hh:82
int emsg(int rc, char *msg)
bool Set(ChecksumType ctype, const std::array< unsigned char, g_max_checksum_length > &value)
virtual void Success()=0
bool FinishSetup(CURL *curl)
const std::string & GetUrl() const
CURL * GetCurlHandle() const
static const std::string GetVerbString(HttpVerb)
virtual HttpVerb GetVerb() const =0
std::string GetCurlErrorMessage() const
virtual void ReleaseHandle()
virtual bool RequiresOptions() const
static void CleanupDnsCache()
std::tuple< uint64_t, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration > StatisticsReset()
virtual bool ContinueHandle()
std::string GetStatusMessage() const
CreateConnCalloutType GetConnCalloutFunc() const
virtual void Fail(uint16_t errCode, uint32_t errNum, const std::string &)
virtual RedirectAction Redirect(std::string &target)
virtual void SetContinueQueue(std::shared_ptr< XrdClHttp::HandlerQueue > queue)
bool StartConnectionCallout(std::string &err)
virtual bool Setup(CURL *curl, CurlWorker &)
std::pair< XErrorCode, std::string > GetCallbackError() const
std::shared_ptr< CurlOperation > GetOperation() const
CURL * GetParentCurlHandle() const
void Fail(uint16_t errCode, uint32_t errNum, const std::string &) override
std::tuple< std::string, std::string > ClientX509CertKeyFile() const
CurlWorker(std::shared_ptr< HandlerQueue > queue, VerbsCache &cache, XrdCl::Log *logger)
static void RunStatic(CurlWorker *myself)
void Start(std::unique_ptr< XrdClHttp::CurlWorker > self, std::thread tid)
static std::string GetMonitoringJson()
std::shared_ptr< CurlOperation > Consume(std::chrono::steady_clock::duration)
HandlerQueue(unsigned max_pending_ops)
void Produce(std::shared_ptr< CurlOperation > handler)
static std::string GetMonitoringJson()
std::shared_ptr< CurlOperation > TryConsume()
void SetMultipartSeparator(const std::string_view &sep)
static bool Base64Decode(std::string_view input, std::array< unsigned char, 32 > &output)
static void ParseDigest(const std::string &digest, XrdClHttp::ChecksumInfo &info)
static bool Canonicalize(std::string &headerName)
bool Parse(const std::string &headers)
static std::string ChecksumTypeToDigestName(XrdClHttp::ChecksumType type)
static std::string_view GetUrlKey(const std::string &url, std::string &modified_url)
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
Handle diagnostics.
Definition XrdClLog.hh:101
@ DumpMsg
print details of the request and responses
Definition XrdClLog.hh:113
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
std::pair< uint16_t, uint32_t > HTTPStatusConvert(unsigned status)
CURL * GetHandle(bool verbose)
bool HTTPStatusIsError(unsigned status)
std::string_view ltrim_view(const std::string_view &input_view)
const uint64_t kLogXrdClHttp
std::string_view trim_view(const std::string_view &input_view)
const uint16_t errUnknown
Unknown error.
const uint16_t errInvalidAddr
const uint16_t errRedirectLimit
const uint16_t errErrorResponse
const uint16_t errTlsError
const uint16_t errOperationExpired
const uint16_t errLoginFailed
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t errInvalidArgs
const uint16_t errConnectionError
const uint16_t errNotSupported
const uint16_t errSocketError
const uint16_t errCorruptedHeader
const uint16_t errNone
No error.