XRootD
Loading...
Searching...
No Matches
XrdClHttpOps.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3/* */
4/* This file is part of the XrdClHttp client plugin for XRootD. */
5/* */
6/* XRootD is free software: you can redistribute it and/or modify it under */
7/* the terms of the GNU Lesser General Public License as published by the */
8/* Free Software Foundation, either version 3 of the License, or (at your */
9/* option) any later version. */
10/* */
11/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14/* License for more details. */
15/* */
16/* The copyright holder's institutional names and contributor's names may not */
17/* be used to endorse or promote products derived from this software without */
18/* specific prior written permission of the institution or contributor. */
19/******************************************************************************/
20
21#include "XrdClHttpOps.hh"
22#include "XrdClHttpResponses.hh"
23#include "XrdClHttpUtil.hh"
24#include "XrdClHttpWorker.hh"
25
27#include <XrdCl/XrdClLog.hh>
29
30#include <arpa/inet.h>
31#include <unistd.h>
32#include <chrono>
33#include <cmath>
34#ifdef __APPLE__
35#include <stdlib.h>
36#else
37#include <sys/random.h>
38#endif
39#include <utility>
40
41using namespace XrdClHttp;
42
43std::chrono::steady_clock::duration CurlOperation::m_stall_interval{CurlOperation::m_default_stall_interval};
45
46namespace {
47
48// For connection callbacks, we don't want to require a real DNS lookup; instead, we
49// will generate a fake address in the 169.254.x.y range and use that for the connection.
50// This will be fed to libcurl via the CURLOPT_RESOLVE option, which will bypass DNS lookups.
51
52// A randomized counter for generating fake addresses in the 169.254.x.y range
53thread_local int64_t fake_dns_counter = -1;
54
55// Map from hostname:port to fake address (e.g., 169.254.x.y:port) and
56// std::string pointer for the fake address.
57// We must track the std::string pointer so we can pass it to libcurl
58// as the CURLOPT_CLOSESOCKETDATA, which is passed to the close socket callback; the
59// lifetime of the pointer must be at least as long as the lifetime of the socket
60// (we maintain a reference count manually below).
61thread_local std::unordered_map<std::string, std::pair<std::string, std::string*>> fake_dns_map;
62
63// Reverse map from fake address (e.g., 169.254.x.y:port) to hostname:port and reference pointer
64thread_local std::unordered_map<std::string, std::pair<std::string, std::string*>> reverse_fake_dns_map;
65
66// References to fake addresses in use. The value is a reference count of sockets using
67// this address; when the count goes to zero, we can remove the entry from the above maps.
68// The second member is a unique_ptr to the std::string for the fake address, which will be
69// cleaned up when the refcount goes to zero.
70struct refcount_entry {
71 int count;
72 std::unique_ptr<std::string> addr;
73 std::chrono::steady_clock::time_point last_used;
74
75 bool IsExpired(std::chrono::steady_clock::time_point now) const {
76 return (now - last_used) > std::chrono::minutes(1);
77 }
78};
79
80thread_local std::unordered_map<std::string *, std::unique_ptr<refcount_entry>> fake_dns_refcount;
81
82std::string GenerateFakeEndpoint() {
83 if (fake_dns_counter == -1) {
84#ifdef __APPLE__
85 fake_dns_counter = arc4random();
86#else
87 errno = 0;
88 while (fake_dns_counter < 0 || errno == EINTR) {
89 if (getrandom((void*)&fake_dns_counter, sizeof(fake_dns_counter), 0) == sizeof(fake_dns_counter)) {
90 break;
91 }
92 }
93#endif
94 }
95 uint64_t addr = static_cast<uint64_t>(fake_dns_counter);
96 uint32_t class_d = addr & 0xff;
97 uint32_t class_c = (addr >> 8) & 0xff;
98 uint32_t port = 1024 + ((addr >> 16) % (65535 - 1024));
99 fake_dns_counter++;
100
101 return std::string("169.254.") + std::to_string(class_c) + "." + std::to_string(class_d) + ":" + std::to_string(port);
102}
103
104std::string *GetFakeEndpointForHost(const std::string &host, int port) {
105 std::string key = host + ":" + std::to_string(port);
106 auto it = fake_dns_map.find(key);
107 if (it != fake_dns_map.end()) {
108 return it->second.second;
109 }
110 auto addr = GenerateFakeEndpoint();
111 if (reverse_fake_dns_map.find(addr) != reverse_fake_dns_map.end()) {
112 return nullptr; // Collision, out of addresses.
113 }
114 auto addr_ptr_raw = new std::string(addr);
115 std::unique_ptr<std::string> addr_ptr(addr_ptr_raw);
116 fake_dns_map[key] = {addr, addr_ptr.get()};
117 reverse_fake_dns_map[addr] = {key, addr_ptr.get()};
118 std::unique_ptr<refcount_entry> new_entry(new refcount_entry{0, std::move(addr_ptr), std::chrono::steady_clock::now()});
119 fake_dns_refcount[addr_ptr_raw] = std::move(new_entry);
120 return addr_ptr_raw;
121}
122
123std::pair<std::string, int> ParseHostPort(const std::string &location) {
124 auto pos = location.find("://");
125 std::string authority = (pos == std::string::npos) ? location : location.substr(pos + 3);
126 std::string schema = (pos == std::string::npos) ? "" : location.substr(0, pos);
127 int std_port = (schema == "https" || schema == "davs") ? 443 : 80;
128 auto at_pos = authority.find('@');
129 std::string hostport = (at_pos == std::string::npos) ? authority : authority.substr(at_pos + 1);
130 pos = hostport.find('/');
131 if (pos != std::string::npos) {
132 hostport = hostport.substr(0, pos);
133 }
134 pos = hostport.find(':');
135 if (pos == std::string::npos) {
136 return {hostport, std_port};
137 }
138 int port = std_port;
139 try {
140 port = std::stoi(hostport.substr(pos + 1));
141 } catch (...) {
142 port = std_port;
143 }
144 return {hostport.substr(0, pos), port};
145}
146
147std::string DavToHttp(const std::string &url) {
148 if (url.compare(0, 6, "dav://") == 0) {
149 return "http://" + url.substr(6);
150 }
151 if (url.compare(0, 7, "davs://") == 0) {
152 return "https://" + url.substr(7);
153 }
154 return url;
155}
156
157} // namespace
158
159std::chrono::steady_clock::time_point CalculateExpiry(struct timespec timeout) {
160 if (timeout.tv_sec == 0 && timeout.tv_nsec == 0) {
161 return std::chrono::steady_clock::now() + std::chrono::seconds(30);
162 }
163 return std::chrono::steady_clock::now() + std::chrono::seconds(timeout.tv_sec) + std::chrono::nanoseconds(timeout.tv_nsec);
164}
165
167 struct timespec timeout, XrdCl::Log *logger, CreateConnCalloutType callout,
168 HeaderCallout *header_callout) :
169 CurlOperation::CurlOperation(handler, url, CalculateExpiry(timeout), logger, callout, header_callout)
170 {}
171
173 std::chrono::steady_clock::time_point expiry, XrdCl::Log *logger,
174 CreateConnCalloutType callout, HeaderCallout *header_callout) :
175 m_header_expiry(expiry),
176 m_header_callout(header_callout),
177 m_last_reset(std::chrono::steady_clock::now()),
178 m_last_header_reset(m_last_reset),
179 m_start_op(m_last_reset),
180 m_header_start(m_last_reset),
181 m_conn_callout(callout),
182 m_url(DavToHttp(url)),
183 m_handler(handler),
184 m_curl(nullptr, &curl_easy_cleanup),
185 m_logger(logger)
186 {}
187
189
190void
191CurlOperation::Fail(uint16_t errCode, uint32_t errNum, const std::string &msg)
192{
193 SetDone(true);
194 if (m_handler == nullptr) {return;}
195 if (!msg.empty()) {
196 m_logger->Debug(kLogXrdClHttp, "curl operation failed with message: %s", msg.c_str());
197 } else {
198 m_logger->Debug(kLogXrdClHttp, "curl operation failed with status code %d", errNum);
199 }
200 auto status = new XrdCl::XRootDStatus(XrdCl::stError, errCode, errNum, msg);
201 auto handle = m_handler;
202 m_handler = nullptr;
203 handle->HandleResponse(status, nullptr);
204}
205
206int
207CurlOperation::FailCallback(XErrorCode ecode, const std::string &emsg) {
208 m_callback_error_code = ecode;
209 m_callback_error_str = emsg;
210 m_error = OpError::ErrCallback;
211 m_logger->Debug(kLogXrdClHttp, "%s", emsg.c_str());
212 return 0;
213}
214
215bool
217{
218 if (!m_header_callout) {
219 m_header_slist.reset();
220 for (const auto &header : m_headers_list) {
221 m_header_slist.reset(curl_slist_append(m_header_slist.release(),
222 (header.first + ": " + header.second).c_str()));
223 }
224 return curl_easy_setopt(curl, CURLOPT_HTTPHEADER, m_header_slist.get()) == CURLE_OK;
225 }
226 const auto &verb = GetVerbString(GetVerb());
227
228 auto extra_headers = m_header_callout->GetHeaders(verb, m_url, m_headers_list);
229 if (!extra_headers) {
230 m_logger->Error(kLogXrdClHttp, "Failed to get headers from header callout for %s", m_url.c_str());
231 return false;
232 }
233 m_header_slist.reset();
234 for (const auto &header : *extra_headers) {
235 if (!strcasecmp(header.first.c_str(), "Content-Length")) {
236 auto upload_size = std::stoull(header.second);
237 curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, upload_size);
238 continue;
239 }
240 m_header_slist.reset(curl_slist_append(m_header_slist.release(),
241 (header.first + ": " + header.second).c_str()));
242 }
243 return curl_easy_setopt(curl, CURLOPT_HTTPHEADER, m_header_slist.get()) == CURLE_OK;
244}
245
246const std::string
248{
249 switch (verb) {
250 case HttpVerb::COPY:
251 return "COPY";
252 case HttpVerb::DELETE:
253 return "DELETE";
254 case HttpVerb::GET:
255 return "GET";
256 case HttpVerb::HEAD:
257 return "HEAD";
258 case HttpVerb::MKCOL:
259 return "MKCOL";
261 return "OPTIONS";
263 return "PROPFIND";
264 case HttpVerb::PUT:
265 return "PUT";
266 case HttpVerb::Count:
267 return "UNKNOWN";
268 }
269 return "UNKNOWN";
270}
271
272size_t
273CurlOperation::HeaderCallback(char *buffer, size_t size, size_t nitems, void *this_ptr)
274{
275 std::string header(buffer, size * nitems);
276 auto me = static_cast<CurlOperation*>(this_ptr);
277 auto now = std::chrono::steady_clock::now();
278 if (!me->m_received_header) {
279 me->m_received_header = true;
280 me->m_header_start = now;
281 }
282 me->m_header_lastop = now;
283 auto rv = me->Header(header);
284 return rv ? (size * nitems) : 0;
285}
286
287bool
288CurlOperation::Header(const std::string &header)
289{
290 auto result = m_headers.Parse(header);
291 // m_logger->Debug(kLogXrdClHttp, "Got header: %s", header.c_str());
292 if (!result) {
293 m_logger->Debug(kLogXrdClHttp, "Failed to parse response header: %s", header.c_str());
294 }
295 if (m_headers.HeadersDone()) {
296 if (!m_response_info) {
297 m_response_info.reset(new ResponseInfo());
298 }
299 m_response_info->AddResponse(m_headers.MoveHeaders());
300 }
301 return result;
302}
303
305CurlOperation::Redirect(std::string &target)
306{
307 m_callout.reset();
308 m_conn_callout_result = -1;
309 m_conn_callout_listener = -1;
310 m_tried_broker = false;
311
312 auto location = m_headers.GetLocation();
313 if (location.empty()) {
314 m_logger->Warning(kLogXrdClHttp, "After request to %s, server returned a redirect with no new location", m_url.c_str());
315 Fail(XrdCl::errErrorResponse, kXR_ServerError, "Server returned redirect without updated location");
317 }
318 if (location.size() && location[0] == '/') { // hostname not included in the location - redirect to self.
319 std::string_view orig_url(m_url);
320 auto scheme_loc = orig_url.find("://");
321 if (scheme_loc == std::string_view::npos) {
322 Fail(XrdCl::errErrorResponse, kXR_ServerError, "Server returned a location with unknown hostname");
324 }
325 auto path_loc = orig_url.find('/', scheme_loc + 3);
326 if (path_loc == std::string_view::npos) {
327 location = m_url + location;
328 } else {
329 location = std::string(orig_url.substr(0, path_loc)) + location;
330 }
331 }
332 m_logger->Debug(kLogXrdClHttp, "Request for %s redirected to %s", m_url.c_str(), location.c_str());
333 target = location;
334 curl_easy_setopt(m_curl.get(), CURLOPT_URL, location.c_str());
335 int disable_x509;
336 auto env = XrdCl::DefaultEnv::GetEnv();
337 if (env->GetInt("HttpDisableX509", disable_x509) && !disable_x509) {
338 std::string cert, key;
339 env->GetString("HttpClientCertFile", cert);
340 env->GetString("HttpClientKeyFile", key);
341 if (!cert.empty())
342 curl_easy_setopt(m_curl.get(), CURLOPT_SSLCERT, cert.c_str());
343 if (!key.empty())
344 curl_easy_setopt(m_curl.get(), CURLOPT_SSLKEY, key.c_str());
345 }
347
348 if (m_conn_callout) {
349 auto conn_callout = m_conn_callout(location, *m_response_info);
350 if (conn_callout != nullptr) {
351
352 auto [host, port] = ParseHostPort(location);
353 if (host.empty() || port == -1) {
354 Fail(XrdCl::errInternal, 0, "Failed to parse host and port from URL " + location);
356 }
357 auto fake_addr = GetFakeEndpointForHost(host, port);
358 if (!fake_addr || fake_addr->empty()) {
359 Fail(XrdCl::errInternal, 0, "Failed to generate a fake address for host " + host);
361 }
362 m_resolve_slist.reset(curl_slist_append(m_resolve_slist.release(),
363 (host + ":" + std::to_string(port) + ":" + *fake_addr).c_str()));
364 m_logger->Debug(kLogXrdClHttp, "For connection callout in redirect, mapping %s:%d -> %s", host.c_str(), port, fake_addr->c_str());
365
366 m_callout.reset(conn_callout);
367 std::string err;
369 if ((m_conn_callout_listener = m_callout->BeginCallout(err, m_header_expiry)) == -1) {
370 auto errMsg = "Failed to start a connection callout request: " + err;
371 Fail(XrdCl::errInternal, 0, errMsg.c_str());
373 }
374 curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETFUNCTION, CurlOperation::OpenSocketCallback);
375 curl_easy_setopt(m_curl.get(), CURLOPT_CLOSESOCKETFUNCTION, CurlOperation::CloseSocketCallback);
376 curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETDATA, this);
377 curl_easy_setopt(m_curl.get(), CURLOPT_CLOSESOCKETDATA, fake_addr);
378 curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTFUNCTION, CurlOperation::SockOptCallback);
379 curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTDATA, this);
380 curl_easy_setopt(m_curl.get(), CURLOPT_CONNECT_TO, m_resolve_slist.get());
381 }
382 }
383 m_received_header = false;
384
385 m_last_header_reset = m_last_reset = m_header_start = m_start_op = m_header_lastop = std::chrono::steady_clock::now();
387}
388
389namespace {
390
391size_t
392NullCallback(char * /*buffer*/, size_t size, size_t nitems, void * /*this_ptr*/)
393{
394 return size * nitems;
395}
396
397}
398
399void
401 m_is_paused = paused;
402 if (m_is_paused) {
403 m_pause_start = std::chrono::steady_clock::now();
404 } else if (m_pause_start != std::chrono::steady_clock::time_point{}) {
405 m_pause_duration += std::chrono::steady_clock::now() - m_pause_start;
406 m_pause_start = std::chrono::steady_clock::time_point{};
407 }
408}
409
410bool
412{
413 if ((m_conn_callout_listener = m_callout->BeginCallout(err, m_header_expiry)) == -1) {
414 err = "Failed to start a callout for a socket connection: " + err;
415 Fail(XrdCl::errInternal, 1, err.c_str());
416 return false;
417 }
418 return true;
419}
420
421std::tuple<uint64_t, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration>
423 auto now = std::chrono::steady_clock::now();
424 std::chrono::steady_clock::duration pre_header{}, post_header{}, pause_duration{};
425 if (m_received_header) {
426 if (m_last_header_reset < m_header_start) {
427 pre_header = m_header_start - m_last_header_reset;
428 m_last_header_reset = m_header_start;
429 }
430 post_header = now - ((m_last_reset < m_header_start) ? m_header_start : m_last_reset);
431 m_last_reset = now;
432 } else {
433 pre_header = now - m_last_header_reset;
434 m_last_header_reset = now;
435 }
436 if (IsPaused()) {
437 m_pause_duration += now - m_pause_start;
438 m_pause_start = now;
439 }
440 if (m_pause_duration != std::chrono::steady_clock::duration::zero()) {
441 pause_duration = m_pause_duration;
442 m_pause_duration = std::chrono::steady_clock::duration::zero();
443 }
444 auto bytes = m_bytes;
445 m_bytes = 0;
446 return {bytes, pre_header, post_header, pause_duration};
447}
448
449bool
450CurlOperation::HeaderTimeoutExpired(const std::chrono::steady_clock::time_point &now) {
451 if (m_received_header) return false;
452
453 if (now > m_header_expiry) {
454 if (m_error == OpError::ErrNone) m_error = OpError::ErrHeaderTimeout;
455 return true;
456 }
457 return false;
458}
459
460bool
461CurlOperation::OperationTimeoutExpired(const std::chrono::steady_clock::time_point &now) {
462 if (m_operation_expiry == std::chrono::steady_clock::time_point{} ||
463 !m_received_header) {
464 return false;
465 }
466
467 if (now > m_operation_expiry) {
468 if (m_error == OpError::ErrNone) m_error = OpError::ErrOperationTimeout;
469 return true;
470 }
471 return false;
472}
473
474bool
475CurlOperation::TransferStalled(uint64_t xfer, const std::chrono::steady_clock::time_point &now)
476{
477 // First, check to see how long it's been since any data was sent.
478 if (m_last_xfer == std::chrono::steady_clock::time_point()) {
479 m_last_xfer = m_header_lastop;
480 }
481 auto elapsed = now - m_last_xfer;
482 uint64_t xfer_diff = 0;
483 if (xfer > m_last_xfer_count) {
484 xfer_diff = xfer - m_last_xfer_count;
485 m_last_xfer_count = xfer;
486 m_last_xfer = now;
487 }
488
489 // If progress is made in this callback do not classify as stalled
490 if (elapsed > m_stall_interval && xfer_diff == 0) {
492 return true;
493 }
494
495 // Curl updated us with new timing but the byte count hasn't changed; no need to update the EMA.
496 if (xfer_diff == 0) {
497 return false;
498 }
499
500 // If the transfer is not stalled, then we check to see if the exponentially-weighted
501 // moving average of the transfer rate is below the minimum.
502
503 // If the stall interval since the last header hasn't passed, then we don't check for slow transfers.
504 auto elapsed_since_last_headerop = now - m_header_lastop;
505 if (elapsed_since_last_headerop < m_stall_interval) {
506 return false;
507 } else if (m_ema_rate < 0) {
508 m_ema_rate = xfer / std::chrono::duration<double>(elapsed_since_last_headerop).count();
509 }
510 // Calculate the exponential moving average of the transfer rate.
511 double elapsed_seconds = std::chrono::duration<double>(elapsed).count();
512 auto recent_rate = static_cast<double>(xfer_diff) / elapsed_seconds;
513 auto alpha = 1.0 - exp(-elapsed_seconds / std::chrono::duration<double>(m_stall_interval).count());
514 m_ema_rate = (1.0 - alpha) * m_ema_rate + alpha * recent_rate;
515 if (m_ema_rate < static_cast<double>(m_minimum_rate)) {
516 if (m_error == OpError::ErrNone) m_error = OpError::ErrTransferSlow;
517 return true;
518 }
519 return false;
520}
521
522bool
524{
525 if (curl == nullptr) {
526 throw std::runtime_error("Unable to setup curl operation with no handle");
527 }
528 struct timespec now;
529 if (clock_gettime(CLOCK_MONOTONIC, &now) == -1) {
530 throw std::runtime_error("Unable to get current time");
531 }
532
533 m_pause_start = {};
534 m_last_header_reset = m_last_reset = m_start_op = m_header_start = m_header_lastop = std::chrono::steady_clock::now();
535
536 m_curl.reset(curl);
537 m_curl_error_buffer[0] = '\0';
538 curl_easy_setopt(m_curl.get(), CURLOPT_URL, m_url.c_str());
539 curl_easy_setopt(m_curl.get(), CURLOPT_ERRORBUFFER, m_curl_error_buffer);
540 curl_easy_setopt(m_curl.get(), CURLOPT_HEADERFUNCTION, CurlStatOp::HeaderCallback);
541 curl_easy_setopt(m_curl.get(), CURLOPT_HEADERDATA, this);
542 curl_easy_setopt(m_curl.get(), CURLOPT_WRITEFUNCTION, NullCallback);
543 curl_easy_setopt(m_curl.get(), CURLOPT_WRITEDATA, nullptr);
544 curl_easy_setopt(m_curl.get(), CURLOPT_XFERINFOFUNCTION, CurlOperation::XferInfoCallback);
545 curl_easy_setopt(m_curl.get(), CURLOPT_XFERINFODATA, this);
546 curl_easy_setopt(m_curl.get(), CURLOPT_NOPROGRESS, 0L);
547 // Note: libcurl is not threadsafe unless this option is set.
548 // Before we set it, we saw deadlocks (and partial deadlocks) in practice.
549 curl_easy_setopt(m_curl.get(), CURLOPT_NOSIGNAL, 1L);
550
551 m_parsed_url.reset(new XrdCl::URL(m_url));
552 auto env = XrdCl::DefaultEnv::GetEnv();
553 int disable_x509;
554 if ((env->GetInt("HttpDisableX509", disable_x509) && !disable_x509)) {
555 auto [cert, key] = worker.ClientX509CertKeyFile();
556 if (!cert.empty()) {
557 m_logger->Debug(kLogXrdClHttp, "Using client X.509 credential found at %s", cert.c_str());
558 curl_easy_setopt(m_curl.get(), CURLOPT_SSLCERT, cert.c_str());
559 if (key.empty()) {
560 m_logger->Error(kLogXrdClHttp, "X.509 client credential specified but not the client key");
561 } else {
562 curl_easy_setopt(m_curl.get(), CURLOPT_SSLKEY, key.c_str());
563 }
564 }
565 }
566
567 if (m_conn_callout) {
568 ResponseInfo info;
569 auto callout = m_conn_callout(m_url, info);
570 if (callout) {
571 m_callout.reset(callout);
572 m_conn_callout_listener = -1;
573 m_conn_callout_result = -1;
574 m_tried_broker = false;
575
576 auto [host, port] = ParseHostPort(m_url);
577 if (host.empty() || port == -1) {
578 throw std::runtime_error ("Failed to parse host and port from URL " + m_url);
579 }
580 auto fake_addr = GetFakeEndpointForHost(host, port);
581 if (!fake_addr || fake_addr->empty()) {
582 throw std::runtime_error("Failed to generate a fake address for host " + host);
583 }
584 m_resolve_slist.reset(curl_slist_append(m_resolve_slist.release(),
585 (host + ":" + std::to_string(port) + ":" + *fake_addr).c_str()));
586 m_logger->Debug(kLogXrdClHttp, "For connection callout in operation setup, mapping %s:%d -> %s", host.c_str(), port, fake_addr->c_str());
587
588 curl_easy_setopt(m_curl.get(), CURLOPT_CONNECT_TO, m_resolve_slist.get());
589
590 curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETFUNCTION, CurlOperation::OpenSocketCallback);
591 curl_easy_setopt(m_curl.get(), CURLOPT_CLOSESOCKETFUNCTION, CurlOperation::CloseSocketCallback);
592 curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETDATA, this);
593 curl_easy_setopt(m_curl.get(), CURLOPT_CLOSESOCKETDATA, fake_addr);
594 curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTFUNCTION, CurlOperation::SockOptCallback);
595 curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTDATA, this);
596 }
597 }
598
599 return true;
600}
601
602void
604{
605 m_conn_callout_listener = -1;
606 m_conn_callout_result = -1;
607 m_tried_broker = false;
608 m_callout.reset();
609
610 if (m_curl == nullptr) return;
611 curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETFUNCTION, nullptr);
612 curl_easy_setopt(m_curl.get(), CURLOPT_CLOSESOCKETFUNCTION, nullptr);
613 curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETDATA, nullptr);
614 curl_easy_setopt(m_curl.get(), CURLOPT_CLOSESOCKETDATA, nullptr);
615 curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTFUNCTION, nullptr);
616 curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTDATA, nullptr);
617 curl_easy_setopt(m_curl.get(), CURLOPT_SSLCERT, nullptr);
618 curl_easy_setopt(m_curl.get(), CURLOPT_SSLKEY, nullptr);
619 curl_easy_setopt(m_curl.get(), CURLOPT_HTTPHEADER, nullptr);
620 curl_easy_setopt(m_curl.get(), CURLOPT_CONNECT_TO, nullptr);
621 m_header_slist.reset();
622 m_curl.release();
623}
624
625curl_socket_t
626CurlOperation::OpenSocketCallback(void *clientp, curlsocktype purpose, struct curl_sockaddr *address)
627{
628 auto me = reinterpret_cast<CurlOperation*>(clientp);
629 auto fd = me->m_conn_callout_result;
630 me->m_conn_callout_result = -1;
631 if (fd == -1) {
632 std::string err;
633 if ((me->m_conn_callout_listener = me->m_callout->BeginCallout(err, me->m_header_expiry)) == -1) {
634 me->m_logger->Debug(kLogXrdClHttp, "Failed to start a connection callout request: %s", err.c_str());
635 }
636 return CURL_SOCKET_BAD;
637 } else {
638 sockaddr_in *inaddr = reinterpret_cast<sockaddr_in*>(&address->addr);
639 char ip_str[INET_ADDRSTRLEN];
640 char full_address_str[INET_ADDRSTRLEN + 6];
641 inet_ntop(AF_INET, &(inaddr->sin_addr), ip_str, INET_ADDRSTRLEN);
642 int port = ntohs(inaddr->sin_port);
643 snprintf(full_address_str, sizeof(full_address_str), "%s:%d", ip_str, port);
644 me->m_logger->Debug(kLogXrdClHttp, "Recording socket %d for %s", fd, full_address_str);
645 auto reverse_iter = reverse_fake_dns_map.find(full_address_str);
646 if (reverse_iter == reverse_fake_dns_map.end()) {
647 me->m_logger->Error(kLogXrdClHttp, "Failed to find fake DNS reverse entry for %s", full_address_str);
648 close(fd);
649 return CURL_SOCKET_BAD;
650 } else {
651 auto iter = fake_dns_refcount.find(reverse_iter->second.second);
652 if (iter == fake_dns_refcount.end()) {
653 me->m_logger->Error(kLogXrdClHttp, "Failed to find fake DNS refcount entry for %s", full_address_str);
654 close(fd);
655 return CURL_SOCKET_BAD;
656 }
657 iter->second->count++;
658 iter->second->last_used = std::chrono::steady_clock::now();
659 }
660
661 return fd;
662 }
663}
664
665int
666CurlOperation::SockOptCallback(void *clientp, curl_socket_t curlfd, curlsocktype purpose)
667{
668 return CURL_SOCKOPT_ALREADY_CONNECTED;
669}
670
671curl_socket_t
672CurlOperation::CloseSocketCallback(void *clientp, curl_socket_t fd)
673{
674 close(fd);
675 auto me = reinterpret_cast<std::string*>(clientp);
676 if (me == nullptr) {return 0;}
677 auto iter = fake_dns_refcount.find(me);
678 if (iter != fake_dns_refcount.end()) {
679 iter->second->count--;
680 if (iter->second->count <= 0 && iter->second->IsExpired(std::chrono::steady_clock::now())) {
681 auto rev_iter = reverse_fake_dns_map.find(*me);
682 if (rev_iter != reverse_fake_dns_map.end()) {
683 fake_dns_map.erase(rev_iter->second.first);
684 reverse_fake_dns_map.erase(rev_iter);
685 }
686 fake_dns_refcount.erase(iter);
687 }
688 }
689
690 return 0;
691}
692
693void
695{
696 auto now = std::chrono::steady_clock::now();
697 for (auto it = fake_dns_refcount.begin(); it != fake_dns_refcount.end(); ) {
698 if (it->second->count <= 0 && it->second->IsExpired(now)) {
699 auto rev_iter = reverse_fake_dns_map.find(*it->first);
700 if (rev_iter != reverse_fake_dns_map.end()) {
701 fake_dns_map.erase(rev_iter->second.first);
702 reverse_fake_dns_map.erase(rev_iter);
703 }
704 it = fake_dns_refcount.erase(it);
705 } else {
706 ++it;
707 }
708 }
709}
710
711int
712CurlOperation::XferInfoCallback(void *clientp, curl_off_t /*dltotal*/, curl_off_t dlnow, curl_off_t /*ultotal*/, curl_off_t ulnow)
713{
714 auto me = reinterpret_cast<CurlOperation*>(clientp);
715 auto now = std::chrono::steady_clock::now();
716 if (me->HeaderTimeoutExpired(now) || me->OperationTimeoutExpired(now)) {
717 return 1; // return value triggers CURLE_ABORTED_BY_CALLBACK
718 }
719 uint64_t xfer_bytes = dlnow > ulnow ? dlnow : ulnow;
720 if (me->TransferStalled(xfer_bytes, now)) {
721 return 1;
722 }
723 return 0;
724}
725
726int
728{
729 m_conn_callout_result = m_callout ? m_callout->FinishCallout(err) : -1;
730 if (m_callout && m_conn_callout_result == -1) {
731 m_logger->Error(kLogXrdClHttp, "Error when getting socket callout: %s", err.c_str());
732 } else if (m_callout) {
733 m_logger->Debug(kLogXrdClHttp, "Got callback socket %d", m_conn_callout_result);
734 }
735 return m_conn_callout_result;
736}
XErrorCode
@ kXR_ServerError
std::chrono::steady_clock::time_point CalculateExpiry(struct timespec timeout)
void CURL
#define close(a)
Definition XrdPosix.hh:48
int emsg(int rc, char *msg)
void SetDone(bool has_failed)
int FailCallback(XErrorCode ecode, const std::string &emsg)
bool FinishSetup(CURL *curl)
const std::string m_url
std::chrono::steady_clock::time_point m_header_expiry
std::unique_ptr< CURL, void(*)(CURL *)> m_curl
bool TransferStalled(uint64_t xfer_bytes, const std::chrono::steady_clock::time_point &now)
static const std::string GetVerbString(HttpVerb)
virtual HttpVerb GetVerb() const =0
virtual void ReleaseHandle()
static void CleanupDnsCache()
std::tuple< uint64_t, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration > StatisticsReset()
static constexpr int m_default_minimum_rate
std::vector< std::pair< std::string, std::string > > m_headers_list
HeaderCallout * m_header_callout
bool HeaderTimeoutExpired(const std::chrono::steady_clock::time_point &now)
virtual int WaitSocketCallback(std::string &err)
std::chrono::steady_clock::time_point m_operation_expiry
virtual void Fail(uint16_t errCode, uint32_t errNum, const std::string &)
virtual RedirectAction Redirect(std::string &target)
XrdCl::ResponseHandler * m_handler
CurlOperation(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *log, CreateConnCalloutType, HeaderCallout *header_callout)
void SetPaused(bool paused)
bool StartConnectionCallout(std::string &err)
bool OperationTimeoutExpired(const std::chrono::steady_clock::time_point &now)
virtual bool Setup(CURL *curl, CurlWorker &)
std::tuple< std::string, std::string > ClientX509CertKeyFile() const
static Env * GetEnv()
Get default client environment.
Handle diagnostics.
Definition XrdClLog.hh:101
Handle an async response.
URL representation.
Definition XrdClURL.hh:31
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType
const uint64_t kLogXrdClHttp
const uint16_t errErrorResponse
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInternal
Internal error.