XRootD
Loading...
Searching...
No Matches
XrdClS3DownloadHandler.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3/* */
4/* This file is part of the XrdClS3 client plugin for XRootD. */
5/* */
6/* XRootD is free software: you can redistribute it and/or modify it under */
7/* the terms of the GNU Lesser General Public License as published by the */
8/* Free Software Foundation, either version 3 of the License, or (at your */
9/* option) any later version. */
10/* */
11/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14/* License for more details. */
15/* */
16/* The copyright holder's institutional names and contributor's names may not */
17/* be used to endorse or promote products derived from this software without */
18/* specific prior written permission of the institution or contributor. */
19/******************************************************************************/
20
22#include "XrdClS3Filesystem.hh"
23
26#include <XrdCl/XrdClFile.hh>
27
28#include <charconv>
29
30using namespace XrdClS3;
31
32namespace {
33
34class S3DownloadHandler : public XrdCl::ResponseHandler {
35public:
36 S3DownloadHandler(std::unique_ptr<XrdCl::File> file, XrdCl::ResponseHandler *handler, time_t timeout)
37 : m_expiry(time(NULL) + timeout), m_file(std::move(file)), m_handler(handler), m_buffer(new XrdCl::Buffer(kReadSize))
38 {
39 if (timeout == 0) {
41 XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
42 m_expiry += val;
43 }
44 }
45
46 virtual ~S3DownloadHandler() noexcept = default;
47
48 virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override;
49
50private:
51 time_t m_expiry; // Expiration time for the download operation
52 std::unique_ptr<XrdCl::File> m_file; // File we are reading from
53 XrdCl::ResponseHandler *m_handler; // Handler to call with the final result buffer (or failure).
54 std::unique_ptr<XrdCl::Buffer> m_buffer; // Buffer to hold the data read from the file
55 static constexpr size_t kReadSize = 32 * 1024; // Size of each read operation (32 KB)
56
57 std::pair<time_t, bool> GetTimeout() const {
58 // Calculate the timeout based on the current time and the expiry time
59 time_t now = time(NULL);
60 if (now >= m_expiry) {
61 return {0, false}; // No time left, return 0 timeout
62 }
63 return {m_expiry - now, true};
64 }
65
66 class ReadHandler : public XrdCl::ResponseHandler {
67 public:
68 ReadHandler(std::unique_ptr<S3DownloadHandler> parent) : m_parent(std::move(parent)) {}
69 virtual ~ReadHandler() noexcept = default;
70
71 virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override;
72 private:
73 std::unique_ptr<S3DownloadHandler> m_parent; // Pointer to the parent handler to access its members
74 };
75
76 class CloseHandler : public XrdCl::ResponseHandler {
77 public:
78 CloseHandler(std::unique_ptr<S3DownloadHandler> parent, std::unique_ptr<XrdCl::XRootDStatus> status) : m_parent(std::move(parent)), m_read_status(std::move(status)) {}
79 virtual ~CloseHandler() noexcept = default;
80
81 virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override;
82
83 private:
84 std::unique_ptr<S3DownloadHandler> m_parent; // Pointer to the parent handler to access its members
85 std::unique_ptr<XrdCl::XRootDStatus> m_read_status; // Status from the read operation; if nullptr, the read was successful
86 };
87};
88
89void
90S3DownloadHandler::HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw)
91{
92 std::unique_ptr<S3DownloadHandler> self(this);
93 std::unique_ptr<XrdCl::XRootDStatus> status(status_raw);
94 std::unique_ptr<XrdCl::AnyObject> response(response_raw);
95
96 // If the open failed, we pass the status up the chain.
97 if (!status || !status->IsOK()) {
98 if (m_handler) m_handler->HandleResponse(status.release(), response.release());
99 return;
100 }
101 auto [timeout, ok] = GetTimeout();
102 if (!ok) {
103 // If we have no time left, we cannot proceed with the read.
104 if (m_handler) {
105 m_handler->HandleResponse(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOperationExpired, 0, "Download operation timed out"), nullptr);
106 }
107 return;
108 }
109
110 // Open succeeded, so we can now read the file.
111 auto st = m_file->Read(0, S3DownloadHandler::kReadSize, m_buffer->GetBufferAtCursor(), new ReadHandler(std::move(self)), timeout);
112 if (!st.IsOK()) {
113 // If the read request failed, we close the file and return the error.
114 std::unique_ptr<CloseHandler> closeHandler(new CloseHandler(std::move(self), std::unique_ptr<XrdCl::XRootDStatus>(new XrdCl::XRootDStatus(st))));
115 auto close_st = m_file->Close(closeHandler.get(), timeout);
116 if (close_st.IsOK()) {
117 closeHandler.release(); // The close handler now owns itself
118 } else {
119 if (m_handler) {
120 m_handler->HandleResponse(new XrdCl::XRootDStatus(close_st), nullptr);
121 }
122 }
123 return;
124 }
125}
126
127void
128S3DownloadHandler::ReadHandler::HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw) {
129 std::unique_ptr<ReadHandler> self(this);
130 std::unique_ptr<XrdCl::XRootDStatus> status(status_raw);
131 std::unique_ptr<XrdCl::AnyObject> response(response_raw);
132
133 auto [timeout, ok] = m_parent->GetTimeout();
134 if (!ok) {
135 // If we have no time left, we cannot proceed with the read.
136 if (m_parent->m_handler) {
137 m_parent->m_handler->HandleResponse(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOperationExpired, 0, "Download operation timed out"), nullptr);
138 }
139 return;
140 }
141
142 if (!status || !status->IsOK()) {
143 auto parent = m_parent.get();
144 std::unique_ptr<CloseHandler> closeHandler(new CloseHandler(std::move(m_parent), std::move(status)));
145 auto st = parent->m_file->Close(closeHandler.get(), timeout);
146 if (st.IsOK()) {
147 closeHandler.release();
148 } else if (parent->m_handler) {
149 parent->m_handler->HandleResponse(new XrdCl::XRootDStatus(st), nullptr);
150 }
151 return;
152 }
153
154 XrdCl::ChunkInfo *chunkInfo = nullptr;
155 response->Get(chunkInfo);
156 if (!chunkInfo) {
157 // If we didn't get a chunk, we can close the file and return.
158 auto parent = m_parent.get();
159 std::unique_ptr<CloseHandler> closeHandler(new CloseHandler(std::move(m_parent),
160 std::unique_ptr<XrdCl::XRootDStatus>(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInternal, 0, "No chunk info received"))));
161 auto st = parent->m_file->Close(closeHandler.get(), timeout);
162 if (st.IsOK()) {
163 closeHandler.release();
164 } else if (parent->m_handler) {
165 parent->m_handler->HandleResponse(new XrdCl::XRootDStatus(st), nullptr);
166 }
167 return;
168 }
169
170 // If we got a chunk but the length is zero, that is the end of the file;
171 // we can close the file and return.
172 if (chunkInfo->GetLength() == 0) {
173 m_parent->m_buffer->ReAllocate(m_parent->m_buffer->GetCursor());
174 auto parent = m_parent.get();
175 std::unique_ptr<CloseHandler> closeHandler(new CloseHandler(std::move(m_parent), nullptr));
176 auto st = parent->m_file->Close(closeHandler.get(), timeout);
177 if (st.IsOK()) {
178 closeHandler.release();
179 } else if (parent->m_handler) {
180 parent->m_handler->HandleResponse(new XrdCl::XRootDStatus(st), nullptr);
181 }
182 return;
183 }
184
185 // Read was successful; read additional data if available.
186 m_parent->m_buffer->AdvanceCursor(chunkInfo->GetLength());
187 m_parent->m_buffer->ReAllocate(m_parent->m_buffer->GetCursor() + S3DownloadHandler::kReadSize);
188 auto st = m_parent->m_file->Read(m_parent->m_buffer->GetCursor(), kReadSize, m_parent->m_buffer->GetBufferAtCursor(), self.release(), timeout);
189 if (!st.IsOK()) {
190 // If the read request failed, close or delete the parent handler.
191 auto parent = m_parent.get();
192 std::unique_ptr<CloseHandler> closeHandler(new CloseHandler(std::move(m_parent), nullptr));
193 auto close_st = parent->m_file->Close(closeHandler.get(), timeout);
194 if (close_st.IsOK()) {
195 closeHandler.release();
196 } else if (parent->m_handler) {
197 parent->m_handler->HandleResponse(new XrdCl::XRootDStatus(close_st), nullptr);
198 }
199 }
200}
201
202void
203S3DownloadHandler::CloseHandler::HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw) {
204 std::unique_ptr<CloseHandler> self(this);
205 std::unique_ptr<XrdCl::XRootDStatus> status(status_raw);
206 std::unique_ptr<XrdCl::AnyObject> response(response_raw);
207
208 // If there was a read error, then we report that to the handler and ignore the close status.
209 if (m_read_status) {
210 // If we had a read status, we pass it up the chain.
211 if (m_parent->m_handler) {
212 m_parent->m_handler->HandleResponse(m_read_status.release(), nullptr);
213 }
214 return;
215 }
216
217 if (!status || !status->IsOK()) {
218 if (m_parent->m_handler) {
219 m_parent->m_handler->HandleResponse(status.release(), nullptr);
220 }
221 return;
222 }
223
224 // If the close was successful, we can pass the buffer to the handler.
225 response.reset(new XrdCl::AnyObject());
226 response->Set(m_parent->m_buffer.release(), true); // Take ownership of the buffer
227 if (m_parent->m_handler) {
228 m_parent->m_handler->HandleResponse(status.release(), response.release());
229 }
230}
231
232} // namespace
233
234XrdCl::XRootDStatus
235XrdClS3::DownloadUrl(const std::string &url, XrdClHttp::HeaderCallout *header_callout, XrdCl::ResponseHandler *handler, time_t timeout)
236{
237 std::unique_ptr<XrdCl::File> http_file(new XrdCl::File());
238 // Hack - we need to set a few properties on the file object before the open occurs.
239 // However, the "real" (plugin) file object is not created until the open call.
240 // This forces the plugin object to be created, so we can set the properties and Open later.
241 auto status = http_file->Open(url, XrdCl::OpenFlags::Compress, XrdCl::Access::None, nullptr, time_t(0));
242 if (!status.IsOK()) {
243 return status;
244 }
245
246
247 if (header_callout) {
248 auto callout_loc = reinterpret_cast<long long>(header_callout);
249 size_t buf_size = 16;
250 char callout_buf[buf_size];
251 std::to_chars_result result = std::to_chars(callout_buf, callout_buf + buf_size - 1, callout_loc, 16);
252 if (result.ec == std::errc{}) {
253 std::string callout_str(callout_buf, result.ptr - callout_buf);
254 http_file->SetProperty("XrdClHttpHeaderCallout", callout_str);
255 }
256 }
257 http_file->SetProperty("XrdClHttpFullDownload", "true");
258
259 auto http_file_raw = http_file.get();
260 S3DownloadHandler *downloadHandler = new S3DownloadHandler(std::move(http_file), handler, timeout);
261
262 return http_file_raw->Open(url, XrdCl::OpenFlags::Read, XrdCl::Access::None, downloadHandler, timeout);
263}
static void parent()
XrdOucString File
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
A file.
Definition XrdClFile.hh:52
Handle an async response.
XrdCl::XRootDStatus DownloadUrl(const std::string &url, XrdClHttp::HeaderCallout *header_callout, XrdCl::ResponseHandler *handler, time_t timeout)
const uint16_t errOperationExpired
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInternal
Internal error.
const int DefaultRequestTimeout
uint32_t GetLength() const
Get the data length.
@ Read
Open only for reading.