XRootD
Loading...
Searching...
No Matches
XrdClHttpOpRead.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3/* */
4/* This file is part of the XrdClHttp client plugin for XRootD. */
5/* */
6/* XRootD is free software: you can redistribute it and/or modify it under */
7/* the terms of the GNU Lesser General Public License as published by the */
8/* Free Software Foundation, either version 3 of the License, or (at your */
9/* option) any later version. */
10/* */
11/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14/* License for more details. */
15/* */
16/* The copyright holder's institutional names and contributor's names may not */
17/* be used to endorse or promote products derived from this software without */
18/* specific prior written permission of the institution or contributor. */
19/******************************************************************************/
20
21#include "XrdClHttpOps.hh"
22
23#include <XrdCl/XrdClLog.hh>
25#include <XrdOuc/XrdOucCRC.hh>
27
28using namespace XrdClHttp;
29
30CurlReadOp::CurlReadOp(XrdCl::ResponseHandler *handler, std::shared_ptr<XrdCl::ResponseHandler> default_handler,
31 const std::string &url, struct timespec timeout, const std::pair<uint64_t, uint64_t> &op,
32 char *buffer, size_t sz, XrdCl::Log *logger, CreateConnCalloutType callout,
33 HeaderCallout *header_callout) :
34 CurlOperation(handler, url, timeout, logger, callout, header_callout),
35 m_default_handler(default_handler),
36 m_op(op),
37 m_buffer(buffer),
39 {}
40
41bool
42CurlReadOp::Continue(std::shared_ptr<CurlOperation> op, XrdCl::ResponseHandler *handler, char *buffer, size_t buffer_size)
43{
44 if (op.get() != this) {
45 m_logger->Debug(kLogXrdClHttp, "Interface error: must provide shared pointer to self");
46 Fail(XrdCl::errInternal, 0, "Interface error: must provide shared pointer to self");
47 return false;
48 }
49 m_handler = handler;
50 m_buffer = buffer;
51 m_buffer_size = buffer_size;
52 m_written = 0;
53
54 if (!m_prefetch_buffer.empty()) {
55 auto prefetch_remaining = m_prefetch_buffer.size() - m_prefetch_buffer_offset;
56 auto to_copy = prefetch_remaining > buffer_size ? buffer_size : prefetch_remaining;
57 m_written += to_copy;
58 memcpy(buffer, m_prefetch_buffer.data() + m_prefetch_buffer_offset, to_copy);
59 m_prefetch_buffer_offset += to_copy;
60 if (m_prefetch_buffer_offset == m_prefetch_buffer.size()) {
61 m_prefetch_buffer.clear();
62 m_prefetch_buffer_offset = 0;
63 }
64 }
65
66 // This handles the case where the transfer finished but its last WriteCallback
67 // produced more data than the client buffer could hold, the excess being stored
68 // in the prefetch buffer
69 // we just need to deliver the response without re-queuing to the continue queue
70 if (op->IsDone()) {
71 DeliverResponse();
72 } else {
73 try {
74 m_continue_queue->Produce(op);
75 } catch (...) {
76 Fail(XrdCl::errInternal, ENOMEM, "Failed to continue the curl operation");
77 return false;
78 }
79 }
80
81 return true;
82}
83
84bool
86{
87 if (IsDone()) {
88 return false;
89 }
90 if (!m_curl) {
91 return false;
92 }
93
94 CURLcode rc;
95 if ((rc = curl_easy_pause(m_curl.get(), CURLPAUSE_CONT)) != CURLE_OK) {
96 m_logger->Error(kLogXrdClHttp, "Failed to continue a paused handle: %s", curl_easy_strerror(rc));
97 return false;
98 }
99 SetPaused(false);
100 return m_curl.get();
101 }
102
103bool
105{
106 if (!CurlOperation::Setup(curl, worker)) {return false;}
107
108 curl_easy_setopt(m_curl.get(), CURLOPT_WRITEFUNCTION, CurlReadOp::WriteCallback);
109 curl_easy_setopt(m_curl.get(), CURLOPT_WRITEDATA, this);
110
111 // Note: range requests are inclusive of the end byte, meaning "bytes=0-1023" is a 1024-byte request.
112 // This is why we subtract '1' off the end.
113 if (m_op.second == 0) {
114 Success();
115 return true;
116 }
117 if (m_op.second >= 1024*1024) {
118 curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 128*1024);
119 }
120 else if (m_op.second >= 256*1024) {
121 curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 64*1024);
122 }
123 else if (m_op.second >= 128*1024) {
124 curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 32*1024);
125 }
126 // If the requested read size is UINT64_MAX, it means read the entire object;
127 // in this case, we do not set the Range header.
128 if (m_op.second != UINT64_MAX) {
129 auto range_req = "bytes=" + std::to_string(m_op.first) + "-" + std::to_string(m_op.first + m_op.second - 1);
130 m_headers_list.emplace_back("Range", range_req);
131 }
132
133 return true;
134}
135
136void
137CurlReadOp::Fail(uint16_t errCode, uint32_t errNum, const std::string &msg)
138{
139 std::string custom_msg = msg;
140 SetDone(true);
141 if (m_handler == nullptr && m_default_handler == nullptr) {return;}
142 if (!custom_msg.empty()) {
143 m_logger->Debug(kLogXrdClHttp, "curl operation at offset %llu failed with message: %s%s", static_cast<long long unsigned>(m_op.first), msg.c_str(), m_err_msg.empty() ? "" : (", server message: " + m_err_msg).c_str());
144 custom_msg += " (read operation at offset " + std::to_string(static_cast<long long unsigned>(m_op.first)) + ")";
145 } else {
146 m_logger->Debug(kLogXrdClHttp, "curl operation at offset %llu failed with status code %d%s", static_cast<long long unsigned>(m_op.first), errNum, m_err_msg.empty() ? "" : (", server message: " + m_err_msg).c_str());
147 }
148 auto status = new XrdCl::XRootDStatus(XrdCl::stError, errCode, errNum, custom_msg);
149 auto handle = m_handler;
150 m_handler = nullptr;
151 if (handle) handle->HandleResponse(status, nullptr);
152 else m_default_handler->HandleResponse(status, nullptr);
153}
154
155void
156CurlReadOp::DeliverResponse()
157{
158 if (m_handler == nullptr) {return;}
159 auto handle = m_handler;
160 auto status = new XrdCl::XRootDStatus();
161
162 auto chunk_info = new XrdCl::ChunkInfo(m_op.first + m_prefetch_object_offset, m_written, m_buffer);
163 m_prefetch_object_offset += m_written;
164 auto obj = new XrdCl::AnyObject();
165 obj->Set(chunk_info);
166
167 // Reset the internal buffers to avoid writes to locations we do not own
168 m_buffer = nullptr;
169 m_buffer_size = 0;
170
171 m_handler = nullptr;
172 // Note: As soon as this is invoked, another thread may continue and start to manipulate
173 // the CurlReadOp object. To avoid race conditions, all reads/writes to member data must
174 // be done *before* the callback is invoked.
175 handle->HandleResponse(status, obj);
176}
177
178void
180{
181 SetPaused(true);
182 if (m_handler == nullptr) {
183 m_logger->Warning(kLogXrdClHttp, "Get operation paused with no callback handler");
184 return;
185 }
186 DeliverResponse();
187}
188
189void
191{
192 SetDone(false);
193 if (m_handler == nullptr) {return;}
194 auto status = new XrdCl::XRootDStatus();
195 auto chunk_info = new XrdCl::ChunkInfo(m_op.first + m_prefetch_object_offset, m_written, m_buffer);
196 m_prefetch_object_offset += m_written;
197 auto obj = new XrdCl::AnyObject();
198 obj->Set(chunk_info);
199 auto handle = m_handler;
200 m_handler = nullptr;
201 handle->HandleResponse(status, obj);
202}
203
204void
206{
207 if (m_curl == nullptr) return;
208 curl_easy_setopt(m_curl.get(), CURLOPT_WRITEFUNCTION, nullptr);
209 curl_easy_setopt(m_curl.get(), CURLOPT_WRITEDATA, nullptr);
210 curl_easy_setopt(m_curl.get(), CURLOPT_HTTPHEADER, nullptr);
211 curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETFUNCTION, nullptr);
212 curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETDATA, nullptr);
213 curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTFUNCTION, nullptr);
214 curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTDATA, nullptr);
216}
217
218size_t
219CurlReadOp::WriteCallback(char *buffer, size_t size, size_t nitems, void *this_ptr)
220{
221 return static_cast<CurlReadOp*>(this_ptr)->Write(buffer, size * nitems);
222}
223
224size_t
225CurlReadOp::Write(char *buffer, size_t length)
226{
227 //m_logger->Debug(kLogXrdClHttp, "Received a write of size %ld with offset %lld; total received is %ld; remaining is %ld", static_cast<long>(length), static_cast<long long>(m_op.first), static_cast<long>(length + m_written), static_cast<long>(m_op.second - length - m_written));
229 return FailCallback(kXR_ServerError, "Server responded with a multipart byterange which is not supported");
230 }
231 if (m_written == 0 && (m_headers.GetOffset() != m_op.first)) {
232 return FailCallback(kXR_ServerError, "Server did not return content with correct offset");
233 }
234 // If the operation failed, do not copy the body of the response into the buffer; it is likely
235 // an error message and not what we want to provide to the consumer buffer.
236 if (m_headers.GetStatusCode() > 299) {
237 // Record error message; prevent the server from spamming overly-long responses as we
238 // buffer them in memory.
239 if (m_err_msg.size() < 4*1024) {
240 m_err_msg.append(buffer, length);
241 }
242 UpdateBytes(length);
243 return length;
244 }
245 // The write callback is "all or nothing". Either you accept the whole thing (buffering
246 // in m_prefetch_buffer any data that the client-provided buffer is too small to accept)
247 // or you return CURL_WRITEFUNC_PAUSE and the delivery will be retried the next time the
248 // handle is unpaused.
249 //
250 // If `m_buffer` is nullptr, then it indicates we are unpaused while there is no ongoing
251 // File::Read operation; this typically happens when the transfer timeout occurs. Simply
252 // re-pause the transfer to go through the libcurl state machine and trigger the failure.
253 if (!m_buffer || (m_buffer_size == m_written)) {
254 Pause();
255 return CURL_WRITEFUNC_PAUSE;
256 }
257 UpdateBytes(length);
258 auto output_remaining = m_buffer_size - m_written;
259 auto larger_than_result_buffer = length > output_remaining;
260 auto to_copy = larger_than_result_buffer ? output_remaining : length;
261 memcpy(m_buffer + m_written, buffer, to_copy);
262 m_written += to_copy;
263 // We don't have enough space in the buffer to write the response and this is a single-shot
264 // read request
265 if ((m_op.second <= m_buffer_size) && larger_than_result_buffer) {
266 return FailCallback(kXR_ServerError, "Server sent back more data than requested");
267 } else if (larger_than_result_buffer) {
268 auto input_remaining = length - output_remaining;
269 m_prefetch_buffer.append(buffer + to_copy, input_remaining);
270 m_prefetch_buffer_offset = 0;
271 }
272 return length;
273}
274
// NOTE(review): the signature line of this definition was lost from this listing, so the
// owning class is not visible here.  The PageInfo result suggests a page-read (pgread)
// operation's success path; confirm against the header.
//
// Marks the operation done (SetDone(false): not failed) and, if a handler is waiting,
// delivers the m_written bytes in m_buffer as an XrdCl::PageInfo.  The PageInfo carries
// one CRC32C checksum per XrdSys::PageSize-sized piece of the buffer.
275void
277{
278 SetDone(false);
279 if (m_handler == nullptr) {return;}
280 auto status = new XrdCl::XRootDStatus();
281
// One checksum per page; the final page may be partial, hence the round-up below.
282 std::vector<uint32_t> cksums;
283 size_t nbpages = m_written / XrdSys::PageSize;
284 if (m_written % XrdSys::PageSize) ++nbpages;
285 cksums.reserve(nbpages);
286
// NOTE(review): pages are cut from the start of the buffer, not at file-page boundaries.
// If m_op.first is not a multiple of XrdSys::PageSize, confirm that consumers of the
// PageInfo expect this layout.
287 auto buffer = m_buffer;
288 size_t size = m_written;
289 for (size_t pg=0; pg<nbpages; ++pg)
290 {
291 auto pgsize = static_cast<size_t>(XrdSys::PageSize);
292 if (pgsize > size) pgsize = size;
293 cksums.push_back(XrdOucCRC::Calc32C(buffer, pgsize));
294 buffer += pgsize;
295 size -= pgsize;
296 }
297
// NOTE(review): the offset is m_op.first alone (no accumulated chunk offset), which
// assumes this operation returns all its data in a single response.  Confirm.
298 auto page_info = new XrdCl::PageInfo(m_op.first, m_written, m_buffer, std::move(cksums));
299 auto obj = new XrdCl::AnyObject();
300 obj->Set(page_info);
// Clear m_handler before invoking it so the handler is notified only once.
301 auto handle = m_handler;
302 m_handler = nullptr;
303 handle->HandleResponse(status, obj);
304}
@ kXR_ServerError
void CURL
void SetDone(bool has_failed)
int FailCallback(XErrorCode ecode, const std::string &emsg)
std::unique_ptr< CURL, void(*)(CURL *)> m_curl
virtual void ReleaseHandle()
void UpdateBytes(uint64_t bytes)
std::vector< std::pair< std::string, std::string > > m_headers_list
XrdCl::ResponseHandler * m_handler
CurlOperation(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *log, CreateConnCalloutType, HeaderCallout *header_callout)
void SetPaused(bool paused)
virtual bool Setup(CURL *curl, CurlWorker &)
void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override
std::pair< uint64_t, uint64_t > m_op
bool Setup(CURL *curl, CurlWorker &) override
std::shared_ptr< XrdClHttp::HandlerQueue > m_continue_queue
bool ContinueHandle() override
CurlReadOp(XrdCl::ResponseHandler *handler, std::shared_ptr< XrdCl::ResponseHandler > default_handler, const std::string &url, struct timespec timeout, const std::pair< uint64_t, uint64_t > &op, char *buffer, size_t sz, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
bool Continue(std::shared_ptr< CurlOperation > op, XrdCl::ResponseHandler *handler, char *buffer, size_t buffer_size)
void ReleaseHandle() override
bool IsMultipartByterange() const
Handle diagnostics.
Definition XrdClLog.hh:101
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType
const uint64_t kLogXrdClHttp
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInternal
Internal error.
static const int PageSize
Describe a data chunk for vector read.