XRootD
Loading...
Searching...
No Matches
XrdClFileStateHandler.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26#include "XrdCl/XrdClURL.hh"
27#include "XrdCl/XrdClLog.hh"
28#include "XrdCl/XrdClStatus.hh"
35#include "XrdCl/XrdClMonitor.hh"
41#include "XrdCl/XrdClUtils.hh"
42
43#ifdef WITH_XRDEC
45#endif
46
47#include "XrdOuc/XrdOucCRC.hh"
49#include "XrdOuc/XrdOucUtils.hh"
50
54
55#include <sstream>
56#include <memory>
57#include <numeric>
58#include <sys/time.h>
59#include <uuid/uuid.h>
60#include <mutex>
61
62namespace
63{
64 //----------------------------------------------------------------------------
65 // Helper callback for handling PgRead responses
66 //----------------------------------------------------------------------------
67 class PgReadHandler : public XrdCl::ResponseHandler
68 {
69 friend class PgReadRetryHandler;
70
71 public:
72
73 //------------------------------------------------------------------------
74 // Constructor
75 //------------------------------------------------------------------------
76 PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
77 XrdCl::ResponseHandler *userHandler,
78 uint64_t orgOffset ) :
79 stateHandler( stateHandler ),
80 userHandler( userHandler ),
81 orgOffset( orgOffset ),
82 maincall( true ),
83 retrycnt( 0 ),
84 nbrepair( 0 )
85 {
86 }
87
88 //------------------------------------------------------------------------
89 // Handle the response
90 //------------------------------------------------------------------------
91 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
92 XrdCl::AnyObject *response,
93 XrdCl::HostList *hostList )
94 {
95 using namespace XrdCl;
96
97 std::unique_lock<std::mutex> lck( mtx );
98
99 if( !maincall )
100 {
101 //--------------------------------------------------------------------
102 // We are serving PgRead retry request
103 //--------------------------------------------------------------------
104 --retrycnt;
105 if( !status->IsOK() )
106 st.reset( status );
107 else
108 {
109 delete status; // by convention other args are null (see PgReadRetryHandler)
110 ++nbrepair; // update number of repaired pages
111 }
112
113 if( retrycnt == 0 )
114 {
115 //------------------------------------------------------------------
116 // All retries came back
117 //------------------------------------------------------------------
118 if( st->IsOK() )
119 {
120 PageInfo &pginf = XrdCl::To<PageInfo>( *resp );
121 pginf.SetNbRepair( nbrepair );
122 userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
123 }
124 else
125 userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
126 lck.unlock();
127 delete this;
128 }
129
130 return;
131 }
132
133 //----------------------------------------------------------------------
134 // We are serving main PgRead request
135 //----------------------------------------------------------------------
136 if( !status->IsOK() )
137 {
138 //--------------------------------------------------------------------
139 // The main PgRead request has failed
140 //--------------------------------------------------------------------
141 userHandler->HandleResponseWithHosts( status, response, hostList );
142 lck.unlock();
143 delete this;
144 return;
145 }
146
147 maincall = false;
148
149 //----------------------------------------------------------------------
150 // Do the integrity check
151 //----------------------------------------------------------------------
152 PageInfo *pginf = 0;
153 response->Get( pginf );
154
155 uint64_t pgoff = pginf->GetOffset();
156 uint32_t bytesRead = pginf->GetLength();
157 std::vector<uint32_t> &cksums = pginf->GetCksums();
158 char *buffer = reinterpret_cast<char*>( pginf->GetBuffer() );
159 size_t nbpages = XrdOucPgrwUtils::csNum( pgoff, bytesRead );
160 uint32_t pgsize = XrdSys::PageSize - pgoff % XrdSys::PageSize;
161 if( pgsize > bytesRead ) pgsize = bytesRead;
162
163 for( size_t pgnb = 0; pgnb < nbpages; ++pgnb )
164 {
165 uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
166 if( crcval != cksums[pgnb] )
167 {
168 Log *log = DefaultEnv::GetLog();
169 log->Info( FileMsg, "[%p@%s] Received corrupted page, will retry page #%zu.",
170 (void*)this, stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
171
172 XRootDStatus st = XrdCl::FileStateHandler::PgReadRetry( stateHandler, pgoff, pgsize, pgnb, buffer, this, 0 );
173 if( !st.IsOK())
174 {
175 *status = st; // the reason for this failure
176 break;
177 }
178 ++retrycnt; // update the retry counter
179 }
180
181 bytesRead -= pgsize;
182 buffer += pgsize;
183 pgoff += pgsize;
184 pgsize = XrdSys::PageSize;
185 if( pgsize > bytesRead ) pgsize = bytesRead;
186 }
187
188
189 if( retrycnt == 0 )
190 {
191 //--------------------------------------------------------------------
192 // All went well!
193 //--------------------------------------------------------------------
194 userHandler->HandleResponseWithHosts( status, response, hostList );
195 lck.unlock();
196 delete this;
197 return;
198 }
199
200 //----------------------------------------------------------------------
201 // We have to wait for retries!
202 //----------------------------------------------------------------------
203 resp.reset( response );
204 hosts.reset( hostList );
205 st.reset( status );
206 }
207
208 void UpdateCksum( size_t pgnb, uint32_t crcval )
209 {
210 if( resp )
211 {
212 XrdCl::PageInfo *pginf = 0;
213 resp->Get( pginf );
214 pginf->GetCksums()[pgnb] = crcval;
215 }
216 }
217
218 private:
219
220 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
221 XrdCl::ResponseHandler *userHandler;
222 uint64_t orgOffset;
223
224 std::unique_ptr<XrdCl::AnyObject> resp;
225 std::unique_ptr<XrdCl::HostList> hosts;
226 std::unique_ptr<XrdCl::XRootDStatus> st;
227
228 std::mutex mtx;
229 bool maincall;
230 size_t retrycnt;
231 size_t nbrepair;
232
233 };
234
235 //----------------------------------------------------------------------------
236 // Helper callback for handling PgRead retries
237 //----------------------------------------------------------------------------
238 class PgReadRetryHandler : public XrdCl::ResponseHandler
239 {
240 public:
241
242 PgReadRetryHandler( PgReadHandler *pgReadHandler, size_t pgnb ) : pgReadHandler( pgReadHandler ),
243 pgnb( pgnb )
244 {
245
246 }
247
248 //------------------------------------------------------------------------
249 // Handle the response
250 //------------------------------------------------------------------------
251 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
252 XrdCl::AnyObject *response,
253 XrdCl::HostList *hostList )
254 {
255 using namespace XrdCl;
256
257 if( !status->IsOK() )
258 {
259 Log *log = DefaultEnv::GetLog();
260 log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
261 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
262 pgReadHandler->HandleResponseWithHosts( status, response, hostList );
263 delete this;
264 return;
265 }
266
267 XrdCl::PageInfo *pginf = 0;
268 response->Get( pginf );
269 if( pginf->GetLength() > (uint32_t)XrdSys::PageSize || pginf->GetCksums().size() != 1 )
270 {
271 Log *log = DefaultEnv::GetLog();
272 log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
273 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
274 // we retry a page at a time so the length cannot exceed 4KB
275 DeleteArgs( status, response, hostList );
276 pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
277 delete this;
278 return;
279 }
280
281 uint32_t crcval = XrdOucCRC::Calc32C( pginf->GetBuffer(), pginf->GetLength() );
282 if( crcval != pginf->GetCksums().front() )
283 {
284 Log *log = DefaultEnv::GetLog();
285 log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
286 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
287 DeleteArgs( status, response, hostList );
288 pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
289 delete this;
290 return;
291 }
292
293 Log *log = DefaultEnv::GetLog();
294 log->Info( FileMsg, "[%p@%s] Successfully recovered page #%zu.",
295 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
296
297 DeleteArgs( 0, response, hostList );
298 pgReadHandler->UpdateCksum( pgnb, crcval );
299 pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
300 delete this;
301 }
302
303 private:
304
305 inline void DeleteArgs( XrdCl::XRootDStatus *status,
306 XrdCl::AnyObject *response,
307 XrdCl::HostList *hostList )
308 {
309 delete status;
310 delete response;
311 delete hostList;
312 }
313
314 PgReadHandler *pgReadHandler;
315 size_t pgnb;
316 };
317
318 //----------------------------------------------------------------------------
319 // Handle PgRead substitution with ordinary Read
320 //----------------------------------------------------------------------------
321 class PgReadSubstitutionHandler : public XrdCl::ResponseHandler
322 {
323 public:
324
325 //------------------------------------------------------------------------
326 // Constructor
327 //------------------------------------------------------------------------
328 PgReadSubstitutionHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
329 XrdCl::ResponseHandler *userHandler ) :
330 stateHandler( stateHandler ),
331 userHandler( userHandler )
332 {
333 }
334
335 //------------------------------------------------------------------------
336 // Handle the response
337 //------------------------------------------------------------------------
338 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
339 XrdCl::AnyObject *rdresp,
340 XrdCl::HostList *hostList )
341 {
342 if( !status->IsOK() )
343 {
344 userHandler->HandleResponseWithHosts( status, rdresp, hostList );
345 delete this;
346 return;
347 }
348
349 using namespace XrdCl;
350
351 ChunkInfo *chunk = 0;
352 rdresp->Get( chunk );
353
354 std::vector<uint32_t> cksums;
355 if( stateHandler->pIsChannelEncrypted )
356 {
357 size_t nbpages = chunk->length / XrdSys::PageSize;
358 if( chunk->length % XrdSys::PageSize )
359 ++nbpages;
360 cksums.reserve( nbpages );
361
362 size_t size = chunk->length;
363 char *buffer = reinterpret_cast<char*>( chunk->buffer );
364
365 for( size_t pg = 0; pg < nbpages; ++pg )
366 {
367 size_t pgsize = XrdSys::PageSize;
368 if( pgsize > size ) pgsize = size;
369 uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
370 cksums.push_back( crcval );
371 buffer += pgsize;
372 size -= pgsize;
373 }
374 }
375
376 PageInfo *pages = new PageInfo( chunk->offset, chunk->length,
377 chunk->buffer, std::move( cksums ) );
378 delete rdresp;
379 AnyObject *response = new AnyObject();
380 response->Set( pages );
381 userHandler->HandleResponseWithHosts( status, response, hostList );
382
383 delete this;
384 }
385
386 private:
387
388 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
389 XrdCl::ResponseHandler *userHandler;
390 };
391
392 //----------------------------------------------------------------------------
393 // Object that does things to the FileStateHandler when kXR_open returns
394 // and then calls the user handler
395 //----------------------------------------------------------------------------
396 class OpenHandler: public XrdCl::ResponseHandler
397 {
398 public:
399 //------------------------------------------------------------------------
400 // Constructor
401 //------------------------------------------------------------------------
402 OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
403 XrdCl::ResponseHandler *userHandler ):
404 pStateHandler( stateHandler ),
405 pUserHandler( userHandler )
406 {
407 }
408
409 //------------------------------------------------------------------------
410 // Handle the response
411 //------------------------------------------------------------------------
412 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
413 XrdCl::AnyObject *response,
414 XrdCl::HostList *hostList )
415 {
416 using namespace XrdCl;
417
418 //----------------------------------------------------------------------
419 // Extract the statistics info
420 //----------------------------------------------------------------------
421 OpenInfo *openInfo = 0;
422 if( status->IsOK() )
423 response->Get( openInfo );
424#ifdef WITH_XRDEC
425 else
426 //--------------------------------------------------------------------
427 // Handle EC redirect
428 //--------------------------------------------------------------------
429 if( status->code == errRedirect )
430 {
431 std::string ecurl = status->GetErrorMessage();
432 EcHandler *ecHandler = GetEcHandler( hostList->front().url, ecurl );
433 if( ecHandler && pStateHandler->NeedFileTempl() )
434 {
435 delete status;
436 status = new XRootDStatus( stError, errNotSupported, 0,
437 "File template not supported with Ec" );
438 delete ecHandler;
439 ecHandler = 0;
440 }
441 else if( ecHandler )
442 {
443 pStateHandler->pPlugin = ecHandler; // set the plugin for the File object
444 ecHandler->Open( pStateHandler->pOpenFlags, pUserHandler, 0/*TODO figure out right value for the timeout*/ );
445 return;
446 }
447 }
448#endif
449 //----------------------------------------------------------------------
450 // Notify the state handler and the client and say bye bye
451 //----------------------------------------------------------------------
452 pStateHandler->OnOpen( status, openInfo, hostList );
453 delete response;
454 if( pUserHandler )
455 pUserHandler->HandleResponseWithHosts( status, 0, hostList );
456 else
457 {
458 delete status;
459 delete hostList;
460 }
461 delete this;
462 }
463
464 private:
465 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
466 XrdCl::ResponseHandler *pUserHandler;
467 };
468
469 //----------------------------------------------------------------------------
470 // Object that does things to the FileStateHandler when kXR_close returns
471 // and then calls the user handler
472 //----------------------------------------------------------------------------
473 class CloseHandler: public XrdCl::ResponseHandler
474 {
475 public:
476 //------------------------------------------------------------------------
477 // Constructor
478 //------------------------------------------------------------------------
479 CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
480 XrdCl::ResponseHandler *userHandler,
481 XrdCl::Message *message ):
482 pStateHandler( stateHandler ),
483 pUserHandler( userHandler ),
484 pMessage( message )
485 {
486 }
487
488 //------------------------------------------------------------------------
490 //------------------------------------------------------------------------
491 virtual ~CloseHandler()
492 {
493 delete pMessage;
494 }
495
496 //------------------------------------------------------------------------
497 // Handle the response
498 //------------------------------------------------------------------------
499 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
500 XrdCl::AnyObject *response,
501 XrdCl::HostList *hostList )
502 {
503 pStateHandler->OnClose( status );
504 if( pUserHandler )
505 pUserHandler->HandleResponseWithHosts( status, response, hostList );
506 else
507 {
508 delete response;
509 delete status;
510 delete hostList;
511 }
512
513 delete this;
514 }
515
516 private:
517 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
518 XrdCl::ResponseHandler *pUserHandler;
519 XrdCl::Message *pMessage;
520 };
521
522 //----------------------------------------------------------------------------
523 // Stateful message handler
524 //----------------------------------------------------------------------------
525 class StatefulHandler: public XrdCl::ResponseHandler
526 {
527 public:
528 //------------------------------------------------------------------------
529 // Constructor
530 //------------------------------------------------------------------------
531 StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
532 XrdCl::ResponseHandler *userHandler,
533 XrdCl::Message *message,
534 const XrdCl::MessageSendParams &sendParams ):
535 pStateHandler( stateHandler ),
536 pUserHandler( userHandler ),
537 pMessage( message ),
538 pSendParams( sendParams )
539 {
540 }
541
542 //------------------------------------------------------------------------
543 // Destructor
544 //------------------------------------------------------------------------
545 virtual ~StatefulHandler()
546 {
547 delete pMessage;
548 delete pSendParams.chunkList;
549 delete pSendParams.kbuff;
550 }
551
552 //------------------------------------------------------------------------
553 // Handle the response
554 //------------------------------------------------------------------------
555 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
556 XrdCl::AnyObject *response,
557 XrdCl::HostList *hostList )
558 {
559 using namespace XrdCl;
560 std::unique_ptr<AnyObject> responsePtr( response );
561 pSendParams.hostList = hostList;
562
563 //----------------------------------------------------------------------
564 // Houston we have a problem...
565 //----------------------------------------------------------------------
566 if( !status->IsOK() )
567 {
568 XrdCl::FileStateHandler::OnStateError( pStateHandler, status, pMessage, this, pSendParams );
569 return;
570 }
571
572 //----------------------------------------------------------------------
573 // We're clear
574 //----------------------------------------------------------------------
575 responsePtr.release();
576 XrdCl::FileStateHandler::OnStateResponse( pStateHandler, status, pMessage, response, hostList );
577 if( pUserHandler )
578 pUserHandler->HandleResponseWithHosts( status, response, hostList );
579 else
580 {
581 delete status,
582 delete response;
583 delete hostList;
584 }
585 delete this;
586 }
587
588 //------------------------------------------------------------------------
590 //------------------------------------------------------------------------
591 XrdCl::ResponseHandler *GetUserHandler()
592 {
593 return pUserHandler;
594 }
595
596 private:
597 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
598 XrdCl::ResponseHandler *pUserHandler;
599 XrdCl::Message *pMessage;
600 XrdCl::MessageSendParams pSendParams;
601 };
602
603 //----------------------------------------------------------------------------
604 // Release-buffer Handler
605 //----------------------------------------------------------------------------
606 class ReleaseBufferHandler: public XrdCl::ResponseHandler
607 {
608 public:
609
610 //------------------------------------------------------------------------
611 // Constructor
612 //------------------------------------------------------------------------
613 ReleaseBufferHandler( XrdCl::Buffer &&buffer, XrdCl::ResponseHandler *handler ) :
614 buffer( std::move( buffer ) ),
615 handler( handler )
616 {
617 }
618
619 //------------------------------------------------------------------------
620 // Handle the response
621 //------------------------------------------------------------------------
622 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
623 XrdCl::AnyObject *response,
624 XrdCl::HostList *hostList )
625 {
626 if (handler)
627 handler->HandleResponseWithHosts( status, response, hostList );
628 }
629
630 //------------------------------------------------------------------------
631 // Get the underlying buffer
632 //------------------------------------------------------------------------
633 XrdCl::Buffer& GetBuffer()
634 {
635 return buffer;
636 }
637
638 private:
639 XrdCl::Buffer buffer;
640 XrdCl::ResponseHandler *handler;
641 };
642}
643
644namespace XrdCl
645{
646 //----------------------------------------------------------------------------
647 // Constructor
648 //----------------------------------------------------------------------------
650 pFileState( Closed ),
651 pStatInfo( 0 ),
652 pFileUrl( 0 ),
653 pDataServer( 0 ),
654 pLoadBalancer( 0 ),
655 pStateRedirect( 0 ),
656 pWrtRecoveryRedir( 0 ),
657 pFileHandle( 0 ),
658 pOpenMode( 0 ),
659 pOpenFlags( OpenFlags::None ),
660 pSessionId( 0 ),
661 pDoRecoverRead( true ),
662 pDoRecoverWrite( true ),
663 pFollowRedirects( true ),
664 pUseVirtRedirector( true ),
665 pIsChannelEncrypted( false ),
666 pAllowBundledClose( false ),
667 pPlugin( plugin )
668 {
669 pFileHandle = new uint8_t[4];
670 ResetMonitoringVars();
673 pLFileHandler = new LocalFileHandler();
674 }
675
676 //------------------------------------------------------------------------
681 //------------------------------------------------------------------------
// Construct a file state handler in the Closed state, taking the
// virtual-redirector setting from the caller instead of hard-coding it.
//
// @param useVirtRedirector : stored in pUseVirtRedirector
// @param plugin            : stored in pPlugin
//
// NOTE(review): unlike the constructor defined just above, this initializer
// list has no entry for pIsChannelEncrypted - confirm that the member gets a
// defined value some other way, or add `pIsChannelEncrypted( false )`.
// NOTE(review): the embedded numbering jumps from 702 to 705 inside the
// body, so two lines of the original are not visible in this listing.
682 FileStateHandler::FileStateHandler( bool useVirtRedirector, FilePlugIn *& plugin ):
683 pFileState( Closed ),
684 pStatInfo( 0 ),
685 pFileUrl( 0 ),
686 pDataServer( 0 ),
687 pLoadBalancer( 0 ),
688 pStateRedirect( 0 ),
689 pWrtRecoveryRedir( 0 ),
690 pFileHandle( 0 ),
691 pOpenMode( 0 ),
692 pOpenFlags( OpenFlags::None ),
693 pSessionId( 0 ),
694 pDoRecoverRead( true ),
695 pDoRecoverWrite( true ),
696 pFollowRedirects( true ),
697 pUseVirtRedirector( useVirtRedirector ),
698 pAllowBundledClose( false ),
699 pPlugin( plugin )
700 {
 // 4-byte storage for the file handle (released with delete [] in the destructor)
701 pFileHandle = new uint8_t[4];
702 ResetMonitoringVars();
705 pLFileHandler = new LocalFileHandler();
706 }
707
708 //----------------------------------------------------------------------------
709 // Destructor
710 //----------------------------------------------------------------------------
712 {
713 //--------------------------------------------------------------------------
714 // This, in principle, should never ever happen. Except for the case
715 // when we're interfaced with ROOT that may call this destructor from
716 // its garbage collector, from its __cxa_finalize, ie. after the XrdCl lib
717 // has been finalized by the linker. So, if we don't have the log object
718 // at this point we just give up the hope.
719 //--------------------------------------------------------------------------
720 if( DefaultEnv::GetLog() && pSessionId && !pDataServer->IsLocalFile() ) // if the file object was bound to a physical connection
721 DefaultEnv::GetPostMaster()->DecFileInstCnt( *pDataServer );
722
725
728
729 if( pFileState != Closed && DefaultEnv::GetLog() )
730 {
731 XRootDStatus st;
732 MonitorClose( &st );
733 ResetMonitoringVars();
734 }
735
736 // check if the logger is still there, this is only for root, as root might
737 // have unloaded us already, so in this case we don't want to do anything
738 if( DefaultEnv::GetLog() && pUseVirtRedirector && pFileUrl && pFileUrl->IsMetalink() )
739 {
741 registry.Release( *pFileUrl );
742 }
743
744 delete pStatInfo;
745 delete pFileUrl;
746 delete pDataServer;
747 delete pLoadBalancer;
748 delete [] pFileHandle;
749 delete pLFileHandler;
750 }
751
752 //----------------------------------------------------------------------------
753 // Open with file template
754 //----------------------------------------------------------------------------
756 std::shared_ptr<FileStateHandler> &self,
758 const std::string &url,
759 OpenFlags::Flags flags,
760 uint16_t mode,
761 ResponseHandler *handler,
762 time_t timeout )
763 {
764 if( !templ )
765 return XRootDStatus( stError, errInvalidArgs, 0, "Template file not available" );
766
767 FileStateHandlerTemplate *fht = dynamic_cast<FileStateHandlerTemplate*>( templ );
768 if( !fht )
769 return XRootDStatus( stError, errInvalidArgs, 0, "Template file invalid" );
770
771 self->pTemplateFileWp = fht->pTemplateFileWp;
772
773 return OpenImpl( self, url, flags, mode, handler, timeout );
774 }
775
776 //----------------------------------------------------------------------------
777 // Open the file pointed to by the given URL
778 //----------------------------------------------------------------------------
779 XRootDStatus FileStateHandler::Open( std::shared_ptr<FileStateHandler> &self,
780 const std::string &url,
781 OpenFlags::Flags flags,
782 uint16_t mode,
783 ResponseHandler *handler,
784 time_t timeout )
785 {
786 self->pTemplateFileWp.reset();
787 return OpenImpl( self, url, flags, mode, handler, timeout );
788 }
789
790 //----------------------------------------------------------------------------
791 // Most of Open implementation, used by Open and OpenUsingTemplate
792 //----------------------------------------------------------------------------
// Shared implementation behind Open and OpenUsingTemplate: switches the
// handler to OpenInProgress, rebuilds pFileUrl from `url`, tags the URL with
// a fresh request UUID, builds a kXR_open request and issues it with an
// OpenHandler that wraps the user handler.
//
// @param self    : the state handler to operate on
// @param url     : URL of the file to open
// @param flags   : open flags, stored in pOpenFlags
// @param mode    : access mode, stored in pOpenMode
// @param handler : user handler, wrapped by the OpenHandler
// @param timeout : copied into the send parameters of the request
// @return        : the stored pStatus when the handler is in the Error state
//                  or the URL is invalid, otherwise the status of
//                  FillFhTempl / IssueRequest
//
// On every failure path visible here after the state switch, the status is
// stored in pStatus and pFileState goes back to Closed.
// scopedLock presumably keeps self->pMutex locked for the whole call.
793 XRootDStatus FileStateHandler::OpenImpl( std::shared_ptr<FileStateHandler> &self,
794 const std::string &url,
795 OpenFlags::Flags flags,
796 uint16_t mode,
797 ResponseHandler *handler,
798 time_t timeout )
799 {
800 XrdSysMutexHelper scopedLock( self->pMutex );
801
802 //--------------------------------------------------------------------------
803 // Check if we can proceed
804 //--------------------------------------------------------------------------
805 if( self->pFileState == Error )
806 return self->pStatus;
807
 // NOTE(review): the statement guarded by this condition is missing from this
 // listing (embedded numbering jumps 808 -> 810); restore it from the
 // original file - as shown, the `if` would govern the next statement.
808 if( self->pFileState == OpenInProgress )
810
 // NOTE(review): same here, the guarded statement (line 813) is not visible.
811 if( self->pFileState == CloseInProgress || self->pFileState == Opened ||
812 self->pFileState == Recovering )
814
815 self->pFileState = OpenInProgress;
816
817 //--------------------------------------------------------------------------
818 // Check if the parameters are valid
819 //--------------------------------------------------------------------------
820 Log *log = DefaultEnv::GetLog();
821
 // Dispose of the URL left over from a previous open; a metalink URL is
 // released from `registry` first.
 // NOTE(review): the declaration of `registry` (line 826) is not visible.
822 if( self->pFileUrl )
823 {
824 if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
825 {
827 registry.Release( *self->pFileUrl );
828 }
829 delete self->pFileUrl;
830 self->pFileUrl = 0;
831 }
832
833 self->pFileUrl = new URL( url );
834
835 //--------------------------------------------------------------------------
836 // Add unique uuid to each open request so replays due to error/timeout
837 // recovery can be correctly handled.
838 //--------------------------------------------------------------------------
 // The parameter map is copied, extended and written back to the URL.
839 URL::ParamsMap cgi = self->pFileUrl->GetParams();
840 uuid_t uuid;
 // 37 bytes: uuid_unparse produces a 36-character string plus the
 // terminating NUL
841 char requuid[37]= {0};
842 uuid_generate( uuid );
843 uuid_unparse( uuid, requuid );
844 cgi["xrdcl.requuid"] = requuid;
845 self->pFileUrl->SetParams( cgi );
846
 // The URL is validated only after the UUID parameter has been added; an
 // invalid URL takes the state back to Closed.
847 if( !self->pFileUrl->IsValid() )
848 {
849 log->Error( FileMsg, "[%p@%s] Trying to open invalid url: %s",
850 (void*)self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
851 self->pStatus = XRootDStatus( stError, errInvalidArgs );
852 self->pFileState = Closed;
853 return self->pStatus;
854 }
855
856 //--------------------------------------------------------------------------
857 // Check if the recovery procedures should be enabled
858 //--------------------------------------------------------------------------
 // Recovery can only be switched off here: either by the URL parameter being
 // "false" or because the flag is already false. Nothing in this function
 // switches it back on.
859 const URL::ParamsMap &urlParams = self->pFileUrl->GetParams();
860 URL::ParamsMap::const_iterator it;
861 it = urlParams.find( "xrdcl.recover-reads" );
862 if( (it != urlParams.end() && it->second == "false") ||
863 !self->pDoRecoverRead )
864 {
865 self->pDoRecoverRead = false;
866 log->Debug( FileMsg, "[%p@%s] Read recovery procedures are disabled",
867 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
868 }
869
870 it = urlParams.find( "xrdcl.recover-writes" );
871 if( (it != urlParams.end() && it->second == "false") ||
872 !self->pDoRecoverWrite )
873 {
874 self->pDoRecoverWrite = false;
875 log->Debug( FileMsg, "[%p@%s] Write recovery procedures are disabled",
876 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
877 }
878
879 //--------------------------------------------------------------------------
880 // Open the file
881 //--------------------------------------------------------------------------
882 log->Debug( FileMsg, "[%p@%s] Sending an open command", (void*)self.get(),
883 self->pFileUrl->GetObfuscatedURL().c_str() );
884
885 self->pOpenMode = mode;
886 self->pOpenFlags = flags;
887 OpenHandler *openHandler = new OpenHandler( self, handler );
888
889 Message *msg;
890 ClientOpenRequest *req;
891 std::string path = self->pFileUrl->GetPathWithFilteredParams();
892 MessageUtils::CreateRequest( msg, req, path.length() );
893
894 req->requestid = kXR_open;
895 req->mode = mode;
 // only the low 16 bits of the flags are put into the request; kXR_async and
 // kXR_retstat are always added
896 req->options = (flags&0xffff) | kXR_async | kXR_retstat;
897 req->dlen = path.length();
898 URL sendUrl;
899 XRootDStatus st = FillFhTempl( self, *self->pFileUrl, msg, sendUrl );
900 if( !st.IsOK() )
901 {
 // the handler is deleted here because the request never gets issued
 // NOTE(review): `msg` is not released on this path in the visible code -
 // confirm who owns it.
902 delete openHandler;
903 self->pStatus = st;
904 self->pFileState = Closed;
905 return st;
906 }
 // NOTE(review): 24 is the offset the path is written at - presumably the
 // size of the fixed request header; confirm.
907 msg->Append( path.c_str(), path.length(), 24 );
908
 // NOTE(review): lines 909 and 912 of the original are not visible here.
910 MessageSendParams params; params.timeout = timeout;
911 params.followRedirects = self->pFollowRedirects;
913
914 st = self->IssueRequest( sendUrl, msg, openHandler, params );
915
916 if( !st.IsOK() )
917 {
918 delete openHandler;
919 self->pStatus = st;
920 self->pFileState = Closed;
921 return st;
922 }
923 return st;
924 }
925
926 //----------------------------------------------------------------------------
927 // Close the file object
928 //----------------------------------------------------------------------------
// Close the file: switches the handler to CloseInProgress, builds a
// kXR_close request carrying the 4-byte file handle and issues it to the
// data server with a CloseHandler that wraps the user handler.
//
// @param self    : the state handler to operate on
// @param handler : user handler, wrapped by the CloseHandler
// @param timeout : copied into the send parameters of the request
// @return        : the stored pStatus when the handler is in the Error
//                  state, otherwise see the individual paths below
//
// scopedLock presumably keeps self->pMutex locked for the whole call.
// NOTE(review): many lines of this function are missing from this listing
// (gaps in the embedded numbering); the notes below mark each gap.
929 XRootDStatus FileStateHandler::Close( std::shared_ptr<FileStateHandler> &self,
930 ResponseHandler *handler,
931 time_t timeout )
932 {
933 XrdSysMutexHelper scopedLock( self->pMutex );
934
935 //--------------------------------------------------------------------------
936 // Check if we can proceed
937 //--------------------------------------------------------------------------
938 if( self->pFileState == Error )
939 return self->pStatus;
940
 // NOTE(review): the statements guarded by the next four conditions (lines
 // 942, 945, 948 and 951) are not visible; restore them from the original
 // file - as shown, each `if` would govern the statement that follows it.
941 if( self->pFileState == CloseInProgress )
943
944 if( self->pFileState == Closed )
946
947 if( self->pFileState == OpenInProgress || self->pFileState == Recovering )
949
 // requests are still in flight and closing alongside them is not allowed
950 if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
952
953 self->pFileState = CloseInProgress;
954
955 Log *log = DefaultEnv::GetLog();
 // the 4-byte file handle is printed as one 32-bit value
956 log->Debug( FileMsg, "[%p@%s] Sending a close command for handle %#x to %s",
957 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
958 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
959
960 //--------------------------------------------------------------------------
961 // Close the file
962 //--------------------------------------------------------------------------
963 Message *msg;
 // NOTE(review): the declaration of `req` (line 964) is not visible.
965 MessageUtils::CreateRequest( msg, req );
966
967 req->requestid = kXR_close;
968 memcpy( req->fhandle, self->pFileHandle, 4 );
969
 // NOTE(review): line 970 is not visible.
971 msg->SetSessionId( self->pSessionId );
 // the CloseHandler receives msg and deletes it in its destructor
972 CloseHandler *closeHandler = new CloseHandler( self, handler, msg );
973 MessageSendParams params;
974 params.timeout = timeout;
975 params.followRedirects = false;
976 params.stateful = true;
 // NOTE(review): line 977 is not visible.
978
979 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, closeHandler, params );
980
981 if( !st.IsOK() )
982 {
983 // an invalid-session error means the connection to the server has been
984 // closed, which in turn means that the server closed the file already
 // NOTE(review): the beginning of this condition (lines 985-986) is not
 // visible; only its tail, testing for poller and socket errors, is shown.
987 st.code == errPollerError || st.code == errSocketError )
988 {
 // the file is considered closed: an OK status is prepared for the close
 // handler and success is returned to the caller
989 self->pFileState = Closed;
990 ResponseJob *job = new ResponseJob( closeHandler, new XRootDStatus(),
991 nullptr, nullptr );
 // NOTE(review): line 992, presumably the one that hands `job` over for
 // execution, is not visible.
993 return XRootDStatus();
994 }
995
 // any other failure puts the handler into the Error state
996 delete closeHandler;
997 self->pStatus = st;
998 self->pFileState = Error;
999 return st;
1000 }
1001 return st;
1002 }
1003
1004 //----------------------------------------------------------------------------
1005 // Stat the file
1006 //----------------------------------------------------------------------------
// Provides stat information for the open file, either from the cached
// StatInfo or by sending a kXR_stat request to the data server.
//
// self    - state handler of the file; its pMutex is held for the whole call
// force   - when false, a copy of the cached pStatInfo is passed to the
//           handler directly and no request is sent
// handler - user handler receiving the response
// timeout - copied into the send parameters of the request
//
// Returns the stored pStatus when the file is in the Error state, an OK
// status on the cached path, otherwise the result of SendOrQueue.
1007 XRootDStatus FileStateHandler::Stat( std::shared_ptr<FileStateHandler> &self,
1008 bool force,
1009 ResponseHandler *handler,
1010 time_t timeout )
1011 {
1012 XrdSysMutexHelper scopedLock( self->pMutex );
1013
1014 if( self->pFileState == Error ) return self->pStatus;
1015
// NOTE(review): the statement guarded by this check is not shown in this
// listing
1016 if( self->pFileState != Opened && self->pFileState != Recovering )
1018
1019 //--------------------------------------------------------------------------
1020 // Return the cached info
1021 //--------------------------------------------------------------------------
1022 if( !force )
1023 {
1024 AnyObject *obj = new AnyObject();
1025 obj->Set( new StatInfo( *self->pStatInfo ) );
// NOTE(review): when handler is null, obj is not handed to anyone in the
// visible code - looks like a leak; confirm
1026 if (handler)
1027 handler->HandleResponseWithHosts( new XRootDStatus(), obj, new HostList() );
1028 return XRootDStatus();
1029 }
1030
1031 Log *log = DefaultEnv::GetLog();
1032 log->Debug( FileMsg, "[%p@%s] Sending a stat command for handle %#x to %s",
1033 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1034 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1035
1036 //--------------------------------------------------------------------------
1037 // Issue a new stat request
1038 // stating a file handle doesn't work (fixed in 3.2.0) so we need to
1039 // stat the path
// NOTE(review): the visible code below fills the request with the file
// handle and never uses `path`, so the remark above looks stale - confirm
1040 //--------------------------------------------------------------------------
1041 Message *msg;
1042 ClientStatRequest *req;
1043 std::string path = self->pFileUrl->GetPath();
1044 MessageUtils::CreateRequest( msg, req );
1045
1046 req->requestid = kXR_stat;
1047 memcpy( req->fhandle, self->pFileHandle, 4 );
1048
1049 MessageSendParams params;
1050 params.timeout = timeout;
1051 params.followRedirects = false;
1052 params.stateful = true;
// NOTE(review): two statements (listing lines 1053 and 1055) are not shown
// here
1054
1056 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1057
1058 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1059 }
1060
1061 //----------------------------------------------------------------------------
1062 // Preread scattered data tracts in one operation - async
1063 //----------------------------------------------------------------------------
// Sends a kXR_read request with offset 0 and length 0 whose payload carries
// one readahead_list entry per tract.
//
// self    - state handler of the file; its pMutex is held for the whole call
// tracts  - offset/length pairs copied into the request payload
// handler - user handler receiving the response
// timeout - copied into the send parameters of the request
//
// Returns the stored pStatus when the file is in the Error state, otherwise
// the result of SendOrQueue.
1064 XRootDStatus FileStateHandler::PreRead( std::shared_ptr<FileStateHandler> &self,
1065 const TractList &tracts,
1066 ResponseHandler *handler,
1067 time_t timeout )
1068 {
1069 //--------------------------------------------------------------------------
1070 // Sanity check
1071 //--------------------------------------------------------------------------
1072 XrdSysMutexHelper scopedLock( self->pMutex );
1073
1074 if( self->pFileState == Error ) return self->pStatus;
1075
// NOTE(review): the statement guarded by this check is not shown in this
// listing
1076 if( self->pFileState != Opened && self->pFileState != Recovering )
1078
1079 Log *log = DefaultEnv::GetLog();
1080 log->Debug( FileMsg, "[%p@%s] Sending an read+preread command for handle %#x to %s",
1081 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1082 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1083
1084 //--------------------------------------------------------------------------
1085 // Build the message
1086 //--------------------------------------------------------------------------
1087 Message *msg;
1088 ClientReadRequest *req;
// the payload holds one readahead_list per tract plus 8 extra bytes
1089 MessageUtils::CreateRequest( msg, req, sizeof(readahead_list)*tracts.size() + 8 );
1090
1091 req->requestid = kXR_read;
1092 req->offset = 0;
1093 req->rlen = 0;
1094 memcpy( req->fhandle, self->pFileHandle, 4 );
1095 req->dlen = sizeof(readahead_list)*tracts.size() + 8;
1096
// the chunk list gets a single zero-length chunk backed by a static buffer
1097 static char dummyBuff[8];
1098 ChunkList *list = new ChunkList();
1099 list->push_back( ChunkInfo( 0, 0, dummyBuff ) );
1100
1101 //--------------------------------------------------------------------------
1102 // Copy the tract info
1103 //--------------------------------------------------------------------------
// presumably 24 is the size of the request header and 8 the extra bytes
// reserved above - confirm
1104 readahead_list *dataTract = (readahead_list*)msg->GetBuffer( 24 + 8 );
1105 for( size_t i = 0; i < tracts.size(); ++i )
1106 {
1107 dataTract[i].rlen = tracts[i].length;
1108 dataTract[i].offset = tracts[i].offset;
1109 memcpy( dataTract[i].fhandle, req->fhandle, 4 );
1110 }
1111
1112 //--------------------------------------------------------------------------
1113 // Send the message
1114 //--------------------------------------------------------------------------
1115 MessageSendParams params;
1116 params.timeout = timeout;
1117 params.followRedirects = false;
1118 params.stateful = true;
1119 params.chunkList = list;
// NOTE(review): two statements (listing lines 1120 and 1122) are not shown
// here
1121
1123 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1124
1125 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1126 }
1127
1128 //----------------------------------------------------------------------------
1129 // Read a data chunk at a given offset - sync
1130 //----------------------------------------------------------------------------
// Sends a kXR_read request for `size` bytes at `offset`; the destination
// buffer is registered as the only entry of the request's chunk list.
//
// self    - state handler of the file; its pMutex is held for the whole call
// offset  - file offset placed in the request
// size    - number of bytes placed in the request
// buffer  - destination buffer recorded in the chunk list
// handler - user handler receiving the response
// timeout - copied into the send parameters of the request
//
// Returns the stored pStatus when the file is in the Error state, otherwise
// the result of SendOrQueue.
1131 XRootDStatus FileStateHandler::Read( std::shared_ptr<FileStateHandler> &self,
1132 uint64_t offset,
1133 uint32_t size,
1134 void *buffer,
1135 ResponseHandler *handler,
1136 time_t timeout )
1137 {
1138 XrdSysMutexHelper scopedLock( self->pMutex );
1139
1140 if( self->pFileState == Error ) return self->pStatus;
1141
// NOTE(review): the statement guarded by this check is not shown in this
// listing
1142 if( self->pFileState != Opened && self->pFileState != Recovering )
1144
1145 Log *log = DefaultEnv::GetLog();
1146 log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1147 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1148 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1149
1150 Message *msg;
1151 ClientReadRequest *req;
1152 MessageUtils::CreateRequest( msg, req );
1153
1154 req->requestid = kXR_read;
1155 req->offset = offset;
1156 req->rlen = size;
1157 memcpy( req->fhandle, self->pFileHandle, 4 );
1158
1159 ChunkList *list = new ChunkList();
1160 list->push_back( ChunkInfo( offset, size, buffer ) );
1161
// NOTE(review): listing lines 1162 and 1168 are not shown here
1163 MessageSendParams params;
1164 params.timeout = timeout;
1165 params.followRedirects = false;
1166 params.stateful = true;
1167 params.chunkList = list;
1169 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1170
1171 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1172 }
1173
1174 //------------------------------------------------------------------------
1175 // Read data pages at a given offset
1176 //------------------------------------------------------------------------
// Reads data pages at the given offset. When the data server is found not
// to support paged reads the call is substituted with a plain Read.
//
// self    - state handler of the file
// offset  - file offset of the read
// size    - number of bytes to read
// buffer  - destination buffer
// handler - user handler; wrapped in a PgReadSubstitutionHandler on the
//           fallback path and in a PgReadHandler otherwise
// timeout - passed on to Read / PgReadImpl
//
// Returns the status of Read or PgReadImpl; the wrapping handler is deleted
// here when that status is not OK.
1177 XRootDStatus FileStateHandler::PgRead( std::shared_ptr<FileStateHandler> &self,
1178 uint64_t offset,
1179 uint32_t size,
1180 void *buffer,
1181 ResponseHandler *handler,
1182 time_t timeout )
1183 {
1184 int issupported = true;
1185 AnyObject obj;
// NOTE(review): the statement that declares `st1` (and presumably fills
// `obj`) is not shown in this listing
1187 int protver = 0;
1188 XRootDStatus st2 = Utils::GetProtocolVersion( *self->pDataServer, protver );
1189 if( st1.IsOK() && st2.IsOK() )
1190 {
// supported only if the int held by obj has kXR_suppgrw set and the
// protocol version is at least kXR_PROTPGRWVERSION
1191 int *ptr = 0;
1192 obj.Get( ptr );
1193 issupported = ( ptr && (*ptr & kXR_suppgrw) ) && ( protver >= kXR_PROTPGRWVERSION );
1194 delete ptr;
1195 }
1196 else
1197 issupported = false;
1198
1199 if( !issupported )
1200 {
1201 DefaultEnv::GetLog()->Debug( FileMsg, "[%p@%s] PgRead not supported; substituting with Read.",
1202 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
1203 ResponseHandler *substitHandler = new PgReadSubstitutionHandler( self, handler );
1204 auto st = Read( self, offset, size, buffer, substitHandler, timeout );
1205 if( !st.IsOK() ) delete substitHandler;
1206 return st;
1207 }
1208
1209 ResponseHandler* pgHandler = new PgReadHandler( self, handler, offset );
1210 auto st = PgReadImpl( self, offset, size, buffer, PgReadFlags::None, pgHandler, timeout );
1211 if( !st.IsOK() ) delete pgHandler;
1212 return st;
1213 }
1214
1215 XRootDStatus FileStateHandler::PgReadRetry( std::shared_ptr<FileStateHandler> &self,
1216 uint64_t offset,
1217 uint32_t size,
1218 size_t pgnb,
1219 void *buffer,
1220 PgReadHandler *handler,
1221 time_t timeout )
1222 {
1223 if( size > (uint32_t)XrdSys::PageSize )
1224 return XRootDStatus( stError, errInvalidArgs, EINVAL,
1225 "PgRead retry size exceeded 4KB." );
1226
1227 ResponseHandler *retryHandler = new PgReadRetryHandler( handler, pgnb );
1228 XRootDStatus st = PgReadImpl( self, offset, size, buffer, PgReadFlags::Retry, retryHandler, timeout );
1229 if( !st.IsOK() ) delete retryHandler;
1230 return st;
1231 }
1232
// Builds and sends a kXR_pgread request; shared by PgRead and PgReadRetry.
//
// self    - state handler of the file; its pMutex is held for the whole call
// offset  - file offset placed in the request
// size    - number of bytes placed in the request
// buffer  - destination buffer recorded in the chunk list
// flags   - value stored in the reqflags field of the PgRead arguments
// handler - handler receiving the response
// timeout - copied into the send parameters of the request
//
// Returns the stored pStatus when the file is in the Error state, otherwise
// the result of SendOrQueue.
1233 XRootDStatus FileStateHandler::PgReadImpl( std::shared_ptr<FileStateHandler> &self,
1234 uint64_t offset,
1235 uint32_t size,
1236 void *buffer,
1237 uint16_t flags,
1238 ResponseHandler *handler,
1239 time_t timeout )
1240 {
1241 XrdSysMutexHelper scopedLock( self->pMutex );
1242
1243 if( self->pFileState == Error ) return self->pStatus;
1244
// NOTE(review): the statement guarded by this check is not shown in this
// listing
1245 if( self->pFileState != Opened && self->pFileState != Recovering )
1247
1248 Log *log = DefaultEnv::GetLog();
1249 log->Debug( FileMsg, "[%p@%s] Sending a pgread command for handle %#x to %s",
1250 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1251 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1252
1253 Message *msg;
// NOTE(review): the declaration of `req` is not shown in this listing
1255 MessageUtils::CreateRequest( msg, req, sizeof( ClientPgReadReqArgs ) );
1256
1257 req->requestid = kXR_pgread;
1258 req->offset = offset;
1259 req->rlen = size;
1260 memcpy( req->fhandle, self->pFileHandle, 4 );
1261
1262 //--------------------------------------------------------------------------
1263 // Now adjust the message size so it can hold PgRead arguments
1264 //--------------------------------------------------------------------------
1265 req->dlen = sizeof( ClientPgReadReqArgs );
// the arguments sit right behind the request structure: zero them first,
// then store the flags (newBuf and args refer to the same location)
1266 void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
1267 memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
1268 ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
1269 msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
1270 args->reqflags = flags;
1271
1272 ChunkList *list = new ChunkList();
1273 list->push_back( ChunkInfo( offset, size, buffer ) );
1274
// NOTE(review): listing lines 1275 and 1281 are not shown here
1276 MessageSendParams params;
1277 params.timeout = timeout;
1278 params.followRedirects = false;
1279 params.stateful = true;
1280 params.chunkList = list;
1282 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1283
1284 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1285 }
1286
1287 //----------------------------------------------------------------------------
1288 // Write a data chunk at a given offset - async
1289 //----------------------------------------------------------------------------
// Sends a kXR_write request carrying `size` bytes from a user buffer.
//
// self    - state handler of the file; its pMutex is held for the whole call
// offset  - file offset placed in the request
// size    - number of bytes to write (stored in the request's dlen)
// buffer  - source data; only the pointer is recorded in the chunk list
// handler - user handler receiving the response
// timeout - copied into the send parameters of the request
//
// Returns the stored pStatus when the file is in the Error state, otherwise
// the result of SendOrQueue.
1290 XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1291 uint64_t offset,
1292 uint32_t size,
1293 const void *buffer,
1294 ResponseHandler *handler,
1295 time_t timeout )
1296 {
1297 XrdSysMutexHelper scopedLock( self->pMutex );
1298
1299 if( self->pFileState == Error ) return self->pStatus;
1300
// NOTE(review): the statement guarded by this check is not shown in this
// listing
1301 if( self->pFileState != Opened && self->pFileState != Recovering )
1303
1304 Log *log = DefaultEnv::GetLog();
1305 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1306 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1307 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1308
1309 Message *msg;
1310 ClientWriteRequest *req;
1311 MessageUtils::CreateRequest( msg, req );
1312
1313 req->requestid = kXR_write;
1314 req->offset = offset;
1315 req->dlen = size;
1316 memcpy( req->fhandle, self->pFileHandle, 4 );
1317
// the chunk is registered with offset 0 here; the file offset travels in
// the request header above
1318 ChunkList *list = new ChunkList();
1319 list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
1320
1321 MessageSendParams params;
1322 params.timeout = timeout;
1323 params.followRedirects = false;
1324 params.stateful = true;
1325 params.chunkList = list;
1326
// NOTE(review): two statements (listing lines 1327 and 1329) are not shown
// here
1328
1330 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1331
1332 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1333 }
1334
1335 //----------------------------------------------------------------------------
1336 // Write a data chunk at a given offset
1337 //----------------------------------------------------------------------------
// Writes the content of an owned Buffer at the given offset. A page-aligned
// buffer on a non-encrypted channel is moved into a kernel buffer and sent
// through WriteKernelBuffer; otherwise the call falls back to the plain
// pointer/size Write.
//
// self    - state handler of the file
// offset  - file offset of the write
// buffer  - data to write; on the fallback path ownership moves to a
//           ReleaseBufferHandler and is moved back if the write cannot be
//           issued
// handler - user handler receiving the response
// timeout - passed on to the underlying write
1338 XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1339 uint64_t offset,
1340 Buffer &&buffer,
1341 ResponseHandler *handler,
1342 time_t timeout )
1343 {
1344 //--------------------------------------------------------------------------
1345 // If the memory is not page (4KB) aligned we cannot use the kernel buffer
1346 // so fall back to normal write
1347 //--------------------------------------------------------------------------
// NOTE(review): this branch is also taken when pIsChannelEncrypted is set,
// yet the message logged below only mentions alignment
1348 if( !XrdSys::KernelBuffer::IsPageAligned( buffer.GetBuffer() ) || self->pIsChannelEncrypted )
1349 {
1350 Log *log = DefaultEnv::GetLog();
1351 log->Info( FileMsg, "[%p@%s] Buffer for handle %#x is not page aligned (4KB), "
1352 "cannot convert it to kernel space buffer.", (void*)self.get(),
1353 self->pFileUrl->GetObfuscatedURL().c_str(), *((uint32_t*)self->pFileHandle) );
1354
// read pointer and size before the buffer is moved into the handler
1355 void *buff = buffer.GetBuffer();
1356 uint32_t size = buffer.GetSize();
1357 ReleaseBufferHandler *wrtHandler =
1358 new ReleaseBufferHandler( std::move( buffer ), handler );
1359 XRootDStatus st = self->Write( self, offset, size, buff, wrtHandler, timeout );
1360 if( !st.IsOK() )
1361 {
// the request was not issued: hand the data back to the caller's buffer
1362 buffer = std::move( wrtHandler->GetBuffer() );
1363 delete wrtHandler;
1364 }
1365 return st;
1366 }
1367
1368 //--------------------------------------------------------------------------
1369 // Transfer the data from user space to kernel space
1370 //--------------------------------------------------------------------------
1371 uint32_t length = buffer.GetSize();
1372 char *ubuff = buffer.Release();
1373
1374 std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1375 ssize_t ret = XrdSys::Move( ubuff, *kbuff, length );
// NOTE(review): the statement guarded by this check is not shown in this
// listing
1376 if( ret < 0 )
1378
1379 //--------------------------------------------------------------------------
1380 // Now create a write request and enqueue it
1381 //--------------------------------------------------------------------------
1382 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1383 }
1384
1385 //----------------------------------------------------------------------------
1386 // Write a data from a given file descriptor at a given offset - async
1387 //----------------------------------------------------------------------------
// Writes data taken from a file descriptor: the data is first read into a
// kernel buffer and then sent through WriteKernelBuffer.
//
// self    - state handler of the file
// offset  - offset in the remote file
// size    - number of bytes requested from the descriptor
// fdoff   - optional offset within the descriptor; when not set the
//           XrdSys::Read overload without an offset is used
// fd      - source file descriptor
// handler - user handler receiving the response
// timeout - passed on to WriteKernelBuffer
1388 XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1389 uint64_t offset,
1390 uint32_t size,
1391 Optional<uint64_t> fdoff,
1392 int fd,
1393 ResponseHandler *handler,
1394 time_t timeout )
1395 {
1396 //--------------------------------------------------------------------------
1397 // Read the data from the file descriptor into a kernel buffer
1398 //--------------------------------------------------------------------------
1399 std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1400 ssize_t ret = fdoff ? XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1401 XrdSys::Read( fd, *kbuff, size );
// NOTE(review): the statement guarded by this check is not shown in this
// listing
1402 if( ret < 0 )
1404
1405 //--------------------------------------------------------------------------
1406 // Now create a write request and enqueue it
1407 //--------------------------------------------------------------------------
// the number of bytes actually read (ret) is what gets written
1408 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1409 }
1410
1411 //----------------------------------------------------------------------------
1412 // Write number of pages at a given offset - async
1413 //----------------------------------------------------------------------------
// Writes pages with crc32c digests and retransmits, once, every page the
// response reports as needing a retry.
//
// self    - state handler of the file
// offset  - file offset of the write
// size    - number of bytes to write
// buffer  - source data; the response callback keeps this raw pointer and
//           reads from it again when retransmitting pages
// cksums  - per-page digests; computed here when empty, otherwise the count
//           must equal XrdOucPgrwUtils::csNum( offset, size )
// handler - user handler; it is called from the destructor of the shared
//           pgwrt_t context, i.e. once the last callback holding the
//           context has been destroyed
// timeout - request timeout; 0 selects the "RequestTimeout" setting
//
// Returns errInvalidArgs for a wrong digest count, otherwise the status of
// PgWriteImpl. When that status is not OK the user handler is not called.
1414 XRootDStatus FileStateHandler::PgWrite( std::shared_ptr<FileStateHandler> &self,
1415 uint64_t offset,
1416 uint32_t size,
1417 const void *buffer,
1418 std::vector<uint32_t> &cksums,
1419 ResponseHandler *handler,
1420 time_t timeout )
1421 {
1422 //--------------------------------------------------------------------------
1423 // Resolve timeout value
1424 //--------------------------------------------------------------------------
1425 if( timeout == 0 )
1426 {
1427 int val = DefaultRequestTimeout;
1428 XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
1429 timeout = val;
1430 }
1431
1432 //--------------------------------------------------------------------------
1433 // Validate the digest vector size
1434 //--------------------------------------------------------------------------
1435 if( cksums.empty() )
1436 {
1437 const char *data = static_cast<const char*>( buffer );
1438 XrdOucPgrwUtils::csCalc( data, offset, size, cksums );
1439 }
1440 else
1441 {
1442 size_t crc32cCnt = XrdOucPgrwUtils::csNum( offset, size );
1443 if( crc32cCnt != cksums.size() )
1444 return XRootDStatus( stError, errInvalidArgs, 0, "Wrong number of crc32c digests." );
1445 }
1446
1447 //--------------------------------------------------------------------------
1448 // Create a context for PgWrite operation
1449 //--------------------------------------------------------------------------
1450 struct pgwrt_t
1451 {
1452 pgwrt_t( ResponseHandler *h ) : handler( h ), status( nullptr )
1453 {
1454 }
1455
// calls the user handler (if still set) with the recorded status
1456 ~pgwrt_t()
1457 {
1458 if( handler )
1459 {
1460 // if all retries were successful no error status was set
1461 if( !status ) status = new XRootDStatus();
1462 handler->HandleResponse( status, nullptr );
1463 }
1464 }
1465
// index into the digest vector of the page starting at pgoff; fstpglen is
// the length of the first page of the write
1466 static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1467 {
1468 if( pgoff == offset ) return 0; // we need this if statement because we operate on unsigned integers
1469 return ( pgoff - ( offset + fstpglen ) ) / XrdSys::PageSize + 1;
1470 }
1471
// the first status recorded wins; later ones are deleted
1472 inline void SetStatus( XRootDStatus* s )
1473 {
1474 if( !status ) status = s;
1475 else delete s;
1476 }
1477
1478 ResponseHandler *handler;
1479 XRootDStatus *status;
1480 };
1481 auto pgwrt = std::make_shared<pgwrt_t>( handler );
1482
1483 int fLen, lLen;
1484 XrdOucPgrwUtils::csNum( offset, size, fLen, lLen );
1485 uint32_t fstpglen = fLen;
1486
1487 time_t start = ::time( nullptr );
// everything is captured by copy, including a copy of the digest vector
// taken before PgWriteImpl (below) swaps the caller's vector away
1488 auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1489 {
1490 std::unique_ptr<AnyObject> scoped( r );
1491 // if the request failed simply pass the status to the
1492 // user handler
1493 if( !s->IsOK() )
1494 {
1495 pgwrt->SetStatus( s );
1496 return; // pgwrt destructor will call the handler
1497 }
1498 // also if the request was successful and there were no
1499 // corrupted pages pass the status to the user handler
// NOTE(review): r and inf are dereferenced without a null check - this
// relies on a successful response always carrying a RetryInfo; confirm
1500 RetryInfo *inf = nullptr;
1501 r->Get( inf );
1502 if( !inf->NeedRetry() )
1503 {
1504 pgwrt->SetStatus( s );
1505 return; // pgwrt destructor will call the handler
1506 }
1507 delete s;
1508 // first adjust the timeout value
1509 time_t elapsed = ::time( nullptr ) - start;
1510 if( elapsed >= timeout )
1511 {
1512 pgwrt->SetStatus( new XRootDStatus( stError, errOperationExpired ) );
1513 return; // pgwrt destructor will call the handler
1514 }
1515 else timeout -= elapsed;
1516 // retransmit the corrupted pages
1517 for( size_t i = 0; i < inf->Size(); ++i )
1518 {
1519 auto tpl = inf->At( i );
1520 uint64_t pgoff = std::get<0>( tpl );
1521 uint32_t pglen = std::get<1>( tpl );
1522 const void *pgbuf = static_cast<const char*>( buffer ) + ( pgoff - offset );
1523 uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
// each retry callback holds its own copy of pgwrt, so the user handler
// is not called before the last retry callback has been destroyed
1524 auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1525 {
1526 std::unique_ptr<AnyObject> scoped( r );
1527 // if we failed simply set the status
1528 if( !s->IsOK() )
1529 {
1530 pgwrt->SetStatus( s );
1531 return; // the destructor will call the handler
1532 }
1533 delete s;
1534 // otherwise check if the data were not corrupted again
1535 RetryInfo *inf = nullptr;
1536 r->Get( inf );
1537 if( inf->NeedRetry() ) // so we failed in the end
1538 {
1539 DefaultEnv::GetLog()->Warning( FileMsg, "[%p@%s] Failed retransmitting corrupted "
1540 "page: pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1541 self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1542 pgwrt->SetStatus( new XRootDStatus( stError, errDataError, 0,
1543 "Failed to retransmit corrupted page" ) );
1544 }
1545 else
1546 DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Succesfuly retransmitted corrupted "
1547 "page: pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1548 self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1549 } );
1550 auto st = PgWriteRetry( self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
// NOTE(review): unlike the failure path at the end of this function, the
// retry handler h is not deleted here when the retry could not be issued
// - confirm whether that is intended
1551 if( !st.IsOK() ) pgwrt->SetStatus( new XRootDStatus( st ) );
1552 DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Retransmitting corrupted page: "
1553 "pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1554 self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1555 }
1556 } );
1557
1558 auto st = PgWriteImpl( self, offset, size, buffer, cksums, 0, h, timeout );
1559 if( !st.IsOK() )
1560 {
// the error is reported through the return value only: clear the handler
// so that destroying h (and with it the context) does not call it
1561 pgwrt->handler = nullptr;
1562 delete h;
1563 }
1564 return st;
1565 }
1566
1567 //------------------------------------------------------------------------
1568 // Retry a page write for data covered by a single digest - async
1569 //------------------------------------------------------------------------
1570 XRootDStatus FileStateHandler::PgWriteRetry( std::shared_ptr<FileStateHandler> &self,
1571 uint64_t offset,
1572 uint32_t size,
1573 const void *buffer,
1574 uint32_t digest,
1575 ResponseHandler *handler,
1576 time_t timeout )
1577 {
1578 std::vector<uint32_t> cksums{ digest };
1579 return PgWriteImpl( self, offset, size, buffer, cksums, PgReadFlags::Retry, handler, timeout );
1580 }
1581
1582 //------------------------------------------------------------------------
1583 // Write number of pages at a given offset - async
1584 //------------------------------------------------------------------------
// Builds and sends a kXR_pgwrite request; shared by PgWrite and
// PgWriteRetry.
//
// self    - state handler of the file; its pMutex is held for the whole call
// offset  - file offset placed in the request
// size    - number of data bytes
// buffer  - source data; only the pointer is recorded in the chunk list
// cksums  - digests sent with the data; the vector's content is swapped
//           into the send parameters, so the caller's vector does not keep
//           it once that point is reached
// flags   - value stored in the request's reqflags field
// handler - handler receiving the response
// timeout - copied into the send parameters of the request
//
// Returns the stored pStatus when the file is in the Error state, otherwise
// the result of SendOrQueue.
1585 XRootDStatus FileStateHandler::PgWriteImpl( std::shared_ptr<FileStateHandler> &self,
1586 uint64_t offset,
1587 uint32_t size,
1588 const void *buffer,
1589 std::vector<uint32_t> &cksums,
1590 kXR_char flags,
1591 ResponseHandler *handler,
1592 time_t timeout )
1593 {
1594 XrdSysMutexHelper scopedLock( self->pMutex );
1595
1596 if( self->pFileState == Error ) return self->pStatus;
1597
// NOTE(review): the statement guarded by this check is not shown in this
// listing
1598 if( self->pFileState != Opened && self->pFileState != Recovering )
1600
1601 Log *log = DefaultEnv::GetLog();
1602 log->Debug( FileMsg, "[%p@%s] Sending a pgwrite command for handle %#x to %s",
1603 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1604 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1605
1606 //--------------------------------------------------------------------------
1607 // Create the message
1608 //--------------------------------------------------------------------------
1609 Message *msg;
// NOTE(review): the declaration of `req` is not shown in this listing
1611 MessageUtils::CreateRequest( msg, req );
1612
1613 req->requestid = kXR_pgwrite;
1614 req->offset = offset;
// the data length covers the data bytes plus one uint32_t per digest
1615 req->dlen = size + cksums.size() * sizeof( uint32_t );
1616 req->reqflags = flags;
1617 memcpy( req->fhandle, self->pFileHandle, 4 );
1618
1619 ChunkList *list = new ChunkList();
1620 list->push_back( ChunkInfo( offset, size, (char*)buffer ) );
1621
1622 MessageSendParams params;
1623 params.timeout = timeout;
1624 params.followRedirects = false;
1625 params.stateful = true;
1626 params.chunkList = list;
1627 params.crc32cDigests.swap( cksums );
1628
// NOTE(review): two statements (listing lines 1629 and 1631) are not shown
// here
1630
1632 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1633
1634 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1635 }
1636
1637 //----------------------------------------------------------------------------
1638 // Commit all pending disk writes - async
1639 //----------------------------------------------------------------------------
// Sends a kXR_sync request for the file handle held by this object.
//
// self    - state handler of the file; its pMutex is held for the whole call
// handler - user handler receiving the response
// timeout - copied into the send parameters of the request
//
// Returns the stored pStatus when the file is in the Error state, otherwise
// the result of SendOrQueue.
1640 XRootDStatus FileStateHandler::Sync( std::shared_ptr<FileStateHandler> &self,
1641 ResponseHandler *handler,
1642 time_t timeout )
1643 {
1644 XrdSysMutexHelper scopedLock( self->pMutex );
1645
1646 if( self->pFileState == Error ) return self->pStatus;
1647
// NOTE(review): the statement guarded by this check is not shown in this
// listing
1648 if( self->pFileState != Opened && self->pFileState != Recovering )
1650
1651 Log *log = DefaultEnv::GetLog();
1652 log->Debug( FileMsg, "[%p@%s] Sending a sync command for handle %#x to %s",
1653 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1654 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1655
1656 Message *msg;
1657 ClientSyncRequest *req;
1658 MessageUtils::CreateRequest( msg, req );
1659
1660 req->requestid = kXR_sync;
1661 memcpy( req->fhandle, self->pFileHandle, 4 );
1662
1663 MessageSendParams params;
1664 params.timeout = timeout;
1665 params.followRedirects = false;
1666 params.stateful = true;
// NOTE(review): two statements (listing lines 1667 and 1669) are not shown
// here
1668
1670 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1671
1672 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1673 }
1674
1675 //----------------------------------------------------------------------------
1676 // Truncate the file to a particular size - async
1677 //----------------------------------------------------------------------------
// Sends a kXR_truncate request for the file handle held by this object.
//
// self    - state handler of the file; its pMutex is held for the whole call
// size    - new file size; it is stored in the request's offset field
// handler - user handler receiving the response
// timeout - copied into the send parameters of the request
//
// Returns the stored pStatus when the file is in the Error state, otherwise
// the result of SendOrQueue.
1678 XRootDStatus FileStateHandler::Truncate( std::shared_ptr<FileStateHandler> &self,
1679 uint64_t size,
1680 ResponseHandler *handler,
1681 time_t timeout )
1682 {
1683 XrdSysMutexHelper scopedLock( self->pMutex );
1684
1685 if( self->pFileState == Error ) return self->pStatus;
1686
// NOTE(review): the statement guarded by this check is not shown in this
// listing
1687 if( self->pFileState != Opened && self->pFileState != Recovering )
1689
1690 Log *log = DefaultEnv::GetLog();
1691 log->Debug( FileMsg, "[%p@%s] Sending a truncate command for handle %#x to %s",
1692 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1693 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1694
1695 Message *msg;
// NOTE(review): the declaration of `req` is not shown in this listing
1697 MessageUtils::CreateRequest( msg, req );
1698
1699 req->requestid = kXR_truncate;
1700 memcpy( req->fhandle, self->pFileHandle, 4 );
1701 req->offset = size;
1702
1703 MessageSendParams params;
1704 params.timeout = timeout;
1705 params.followRedirects = false;
1706 params.stateful = true;
// NOTE(review): two statements (listing lines 1707 and 1709) are not shown
// here
1708
1710 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1711
1712 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1713 }
1714
1715 //----------------------------------------------------------------------------
1716 // Read scattered data chunks in one operation - async
1717 //----------------------------------------------------------------------------
// Sends a kXR_readv request whose payload carries one readahead_list entry
// per requested chunk.
//
// self    - state handler of the file; its pMutex is held for the whole call
// chunks  - offset/length (and optionally buffer) of every chunk to read
// buffer  - when non-null, the chunks are placed one after another in this
//           buffer; when null, each chunk's own buffer is used
// handler - user handler receiving the response
// timeout - copied into the send parameters of the request
//
// Returns the stored pStatus when the file is in the Error state, otherwise
// the result of SendOrQueue.
1718 XRootDStatus FileStateHandler::VectorRead( std::shared_ptr<FileStateHandler> &self,
1719 const ChunkList &chunks,
1720 void *buffer,
1721 ResponseHandler *handler,
1722 time_t timeout )
1723 {
1724 //--------------------------------------------------------------------------
1725 // Sanity check
1726 //--------------------------------------------------------------------------
1727 XrdSysMutexHelper scopedLock( self->pMutex );
1728
1729 if( self->pFileState == Error ) return self->pStatus;
1730
// NOTE(review): the statement guarded by this check is not shown in this
// listing
1731 if( self->pFileState != Opened && self->pFileState != Recovering )
1733
1734 Log *log = DefaultEnv::GetLog();
1735 log->Debug( FileMsg, "[%p@%s] Sending a vector read command for handle %#x to %s",
1736 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1737 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1738
1739 //--------------------------------------------------------------------------
1740 // Build the message
1741 //--------------------------------------------------------------------------
1742 Message *msg;
1743 ClientReadVRequest *req;
1744 MessageUtils::CreateRequest( msg, req, sizeof(readahead_list)*chunks.size() );
1745
1746 req->requestid = kXR_readv;
1747 req->dlen = sizeof(readahead_list)*chunks.size();
1748
1749 ChunkList *list = new ChunkList();
// write position inside the caller's contiguous buffer (null if none given)
1750 char *cursor = (char*)buffer;
1751
1752 //--------------------------------------------------------------------------
1753 // Copy the chunk info
1754 //--------------------------------------------------------------------------
// presumably 24 is the size of the request header, so the entries start
// right behind it - confirm
1755 readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
1756 for( size_t i = 0; i < chunks.size(); ++i )
1757 {
1758 dataChunk[i].rlen = chunks[i].length;
1759 dataChunk[i].offset = chunks[i].offset;
1760 memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
1761
1762 void *chunkBuffer;
1763 if( cursor )
1764 {
1765 chunkBuffer = cursor;
1766 cursor += chunks[i].length;
1767 }
1768 else
1769 chunkBuffer = chunks[i].buffer;
1770
1771 list->push_back( ChunkInfo( chunks[i].offset,
1772 chunks[i].length,
1773 chunkBuffer ) );
1774 }
1775
1776 //--------------------------------------------------------------------------
1777 // Send the message
1778 //--------------------------------------------------------------------------
1779 MessageSendParams params;
1780 params.timeout = timeout;
1781 params.followRedirects = false;
1782 params.stateful = true;
1783 params.chunkList = list;
// NOTE(review): two statements (listing lines 1784 and 1786) are not shown
// here
1785
1787 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1788
1789 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1790 }
1791
1792 //------------------------------------------------------------------------
1793 // Write scattered data chunks in one operation - async
1794 //------------------------------------------------------------------------
// Sends a kXR_writev request whose payload carries one XrdProto::write_list
// entry per chunk; the chunk buffers are recorded in the request's chunk
// list.
//
// self    - state handler of the file; its pMutex is held for the whole call
// chunks  - offset, length and source buffer of every chunk to write
// handler - user handler receiving the response
// timeout - copied into the send parameters of the request
//
// Returns the stored pStatus when the file is in the Error state, otherwise
// the result of SendOrQueue.
1795 XRootDStatus FileStateHandler::VectorWrite( std::shared_ptr<FileStateHandler> &self,
1796 const ChunkList &chunks,
1797 ResponseHandler *handler,
1798 time_t timeout )
1799 {
1800 //--------------------------------------------------------------------------
1801 // Sanity check
1802 //--------------------------------------------------------------------------
1803 XrdSysMutexHelper scopedLock( self->pMutex );
1804
1805 if( self->pFileState == Error ) return self->pStatus;
1806
// NOTE(review): the statement guarded by this check is not shown in this
// listing
1807 if( self->pFileState != Opened && self->pFileState != Recovering )
1809
1810 Log *log = DefaultEnv::GetLog();
1811 log->Debug( FileMsg, "[%p@%s] Sending a vector write command for handle %#x to %s",
1812 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1813 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1814
1815 //--------------------------------------------------------------------------
1816 // Determine the size of the payload
1817 //--------------------------------------------------------------------------
1818
1819 // the size of write vector
1820 uint32_t payloadSize = sizeof(XrdProto::write_list) * chunks.size();
1821
1822 //--------------------------------------------------------------------------
1823 // Build the message
1824 //--------------------------------------------------------------------------
1825 Message *msg;
// NOTE(review): the declaration of `req` is not shown in this listing
1827 MessageUtils::CreateRequest( msg, req, payloadSize );
1828
1829 req->requestid = kXR_writev;
1830 req->dlen = sizeof(XrdProto::write_list) * chunks.size();
1831
1832 ChunkList *list = new ChunkList();
1833
1834 //--------------------------------------------------------------------------
1835 // Copy the chunk info
1836 //--------------------------------------------------------------------------
// presumably 24 is the size of the request header, so the entries start
// right behind it - confirm
1837 XrdProto::write_list *writeList =
1838 reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
1839
1840
1841
1842 for( size_t i = 0; i < chunks.size(); ++i )
1843 {
1844 writeList[i].wlen = chunks[i].length;
1845 writeList[i].offset = chunks[i].offset;
1846 memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1847
1848 list->push_back( ChunkInfo( chunks[i].offset,
1849 chunks[i].length,
1850 chunks[i].buffer ) );
1851 }
1852
1853 //--------------------------------------------------------------------------
1854 // Send the message
1855 //--------------------------------------------------------------------------
1856 MessageSendParams params;
1857 params.timeout = timeout;
1858 params.followRedirects = false;
1859 params.stateful = true;
1860 params.chunkList = list;
// NOTE(review): two statements (listing lines 1861 and 1863) are not shown
// here
1862
1864 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1865
1866 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1867 }
1868
1869 //------------------------------------------------------------------------
1870 // Write scattered buffers in one operation - async
     //
     // Issues a single kXR_write request at `offset` whose data length is the
     // sum of all non-empty iovec entries; each non-empty entry becomes one
     // ChunkInfo in the ChunkList stored in params.chunkList.
     //
     // Returns self->pStatus when the file is in the Error state, otherwise
     // (on the visible path) the result of SendOrQueue().
     //
     // NOTE(review): the embedded listing numbers skip 1884, 1917 and 1919,
     // so the statement guarded by the Opened/Recovering check and two
     // statements before the send are not visible in this listing.
1871 //------------------------------------------------------------------------
1872 XRootDStatus FileStateHandler::WriteV( std::shared_ptr<FileStateHandler> &self,
1873 uint64_t offset,
1874 const struct iovec *iov,
1875 int iovcnt,
1876 ResponseHandler *handler,
1877 time_t timeout )
1878 {
1879 XrdSysMutexHelper scopedLock( self->pMutex );
1880
1881 if( self->pFileState == Error ) return self->pStatus;
1882
1883 if( self->pFileState != Opened && self->pFileState != Recovering )
1885
1886 Log *log = DefaultEnv::GetLog();
1887 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1888 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1889 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1890
1891 Message *msg;
1892 ClientWriteRequest *req;
1893 MessageUtils::CreateRequest( msg, req );
1894
1895 ChunkList *list = new ChunkList();
1896
     // NOTE(review): the total is accumulated in 32 bits; a combined
     // iov_len above UINT32_MAX would wrap - confirm callers bound it
1897 uint32_t size = 0;
1898 for( int i = 0; i < iovcnt; ++i )
1899 {
     // empty buffers contribute nothing and are left out of the chunk list
1900 if( iov[i].iov_len == 0 ) continue;
1901 size += iov[i].iov_len;
     // every chunk is created with offset 0; the file offset travels in
     // req->offset only
1902 list->push_back( ChunkInfo( 0, iov[i].iov_len,
1903 (char*)iov[i].iov_base ) );
1904 }
1905
1906 req->requestid = kXR_write;
1907 req->offset = offset;
1908 req->dlen = size;
1909 memcpy( req->fhandle, self->pFileHandle, 4 );
1910
1911 MessageSendParams params;
1912 params.timeout = timeout;
1913 params.followRedirects = false;
1914 params.stateful = true;
1915 params.chunkList = list;
1916
1918
1920 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1921
1922 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1923 }
1924
1925 //------------------------------------------------------------------------
1926 // Read data into scattered buffers in one operation - async
     //
     // Issues a single kXR_read request at `offset` for the summed length of
     // all iovec entries and describes the destination buffers as consecutive
     // chunks (first chunk at `offset`, each following one right after the
     // previous) in the ChunkList stored in params.chunkList.
     //
     // Returns self->pStatus when the file is in the Error state, otherwise
     // (on the visible path) the result of SendOrQueue().
     //
     // NOTE(review): the embedded listing numbers skip 1940, 1959, 1971 and
     // 1977, so the statement guarded by the Opened/Recovering check and
     // three further statements are not visible in this listing.
1927 //------------------------------------------------------------------------
1928 XRootDStatus FileStateHandler::ReadV( std::shared_ptr<FileStateHandler> &self,
1929 uint64_t offset,
1930 struct iovec *iov,
1931 int iovcnt,
1932 ResponseHandler *handler,
1933 time_t timeout )
1934 {
1935 XrdSysMutexHelper scopedLock( self->pMutex );
1936
1937 if( self->pFileState == Error ) return self->pStatus;
1938
1939 if( self->pFileState != Opened && self->pFileState != Recovering )
1941
1942 Log *log = DefaultEnv::GetLog();
1943 log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1944 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1945 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1946
1947 Message *msg;
1948 ClientReadRequest *req;
1949 MessageUtils::CreateRequest( msg, req );
1950
1951 // calculate the total read size
     // NOTE(review): the initial value is the int literal 0, so
     // std::accumulate carries the running sum as int even though the lambda
     // takes size_t; a size_t seed would avoid narrowing for large totals
1952 size_t size = std::accumulate( iov, iov + iovcnt, 0, []( size_t acc, iovec &rhs )
1953 {
1954 return acc + rhs.iov_len;
1955 } );
1956 req->requestid = kXR_read;
1957 req->offset = offset;
1958 req->rlen = size;
1960 memcpy( req->fhandle, self->pFileHandle, 4 );
1961
1962 ChunkList *list = new ChunkList();
1963 list->reserve( iovcnt );
     // running file offset of the chunk being described
1964 uint64_t choff = offset;
1965 for( int i = 0; i < iovcnt; ++i )
1966 {
1967 list->emplace_back( choff, iov[i].iov_len, iov[i].iov_base );
1968 choff += iov[i].iov_len;
1969 }
1970
1972 MessageSendParams params;
1973 params.timeout = timeout;
1974 params.followRedirects = false;
1975 params.stateful = true;
1976 params.chunkList = list;
1978 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1979
1980 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1981 }
1982
1983
1984 //----------------------------------------------------------------------------
1985 // Performs a custom operation on an open file, server implementation
1986 // dependent - async
     //
     // Sends a kXR_query request with infotype set to `queryCode`; the
     // contents of `arg` are appended to the message behind the fixed
     // ClientQueryRequest part and their size is stored in req->dlen.
     //
     // Returns self->pStatus when the file is in the Error state, otherwise
     // (on the visible path) the result of SendOrQueue().
     //
     // NOTE(review): the embedded listing numbers skip 1999, 2020 and 2022,
     // so the statement guarded by the Opened/Recovering check and two
     // statements before the send are not visible in this listing.
1987 //----------------------------------------------------------------------------
1988 XRootDStatus FileStateHandler::Fcntl( std::shared_ptr<FileStateHandler> &self,
1989 QueryCode::Code queryCode,
1990 const Buffer &arg,
1991 ResponseHandler *handler,
1992 time_t timeout )
1993 {
1994 XrdSysMutexHelper scopedLock( self->pMutex );
1995
1996 if( self->pFileState == Error ) return self->pStatus;
1997
1998 if( self->pFileState != Opened && self->pFileState != Recovering )
2000
2001 Log *log = DefaultEnv::GetLog();
2002 log->Debug( FileMsg, "[%p@%s] Sending a fcntl command for handle %#x to %s",
2003 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2004 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2005
2006 Message *msg;
2007 ClientQueryRequest *req;
2008 MessageUtils::CreateRequest( msg, req, arg.GetSize() );
2009
2010 req->requestid = kXR_query;
2011 req->infotype = queryCode;
2012 req->dlen = arg.GetSize();
2013 memcpy( req->fhandle, self->pFileHandle, 4 );
     // the argument is placed at offset sizeof(ClientQueryRequest)
2014 msg->Append( arg.GetBuffer(), arg.GetSize(), sizeof(ClientQueryRequest) );
2015
2016 MessageSendParams params;
2017 params.timeout = timeout;
2018 params.followRedirects = false;
2019 params.stateful = true;
2021
2023 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2024
2025 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2026 }
2027
2028 //----------------------------------------------------------------------------
2029 // Get access token to a file - async
     //
     // Sends a kXR_query request with infotype kXR_Qvisa for the current file
     // handle; the request carries no additional payload.
     //
     // Returns self->pStatus when the file is in the Error state, otherwise
     // (on the visible path) the result of SendOrQueue().
     //
     // NOTE(review): the embedded listing numbers skip 2040, 2059 and 2061,
     // so the statement guarded by the Opened/Recovering check and two
     // statements before the send are not visible in this listing.
2030 //----------------------------------------------------------------------------
2031 XRootDStatus FileStateHandler::Visa( std::shared_ptr<FileStateHandler> &self,
2032 ResponseHandler *handler,
2033 time_t timeout )
2034 {
2035 XrdSysMutexHelper scopedLock( self->pMutex );
2036
2037 if( self->pFileState == Error ) return self->pStatus;
2038
2039 if( self->pFileState != Opened && self->pFileState != Recovering )
2041
2042 Log *log = DefaultEnv::GetLog();
2043 log->Debug( FileMsg, "[%p@%s] Sending a visa command for handle %#x to %s",
2044 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2045 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2046
2047 Message *msg;
2048 ClientQueryRequest *req;
2049 MessageUtils::CreateRequest( msg, req );
2050
2051 req->requestid = kXR_query;
2052 req->infotype = kXR_Qvisa;
2053 memcpy( req->fhandle, self->pFileHandle, 4 );
2054
2055 MessageSendParams params;
2056 params.timeout = timeout;
2057 params.followRedirects = false;
2058 params.stateful = true;
2060
2062 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2063
2064 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2065 }
2066
2067 //------------------------------------------------------------------------
2068 // Set extended attributes - async
     //
     // Validates the file state, logs the operation and delegates to
     // XAttrOperationImpl() with sub-code kXR_fattrSet, options 0 and the
     // given attribute list.
     //
     // Returns self->pStatus when the file is in the Error state, otherwise
     // (on the visible path) the result of XAttrOperationImpl().
     //
     // NOTE(review): the embedded listing numbers skip 2080, so the statement
     // guarded by the Opened/Recovering check is not visible in this listing.
2069 //------------------------------------------------------------------------
2070 XRootDStatus FileStateHandler::SetXAttr( std::shared_ptr<FileStateHandler> &self,
2071 const std::vector<xattr_t> &attrs,
2072 ResponseHandler *handler,
2073 time_t timeout )
2074 {
2075 XrdSysMutexHelper scopedLock( self->pMutex );
2076
2077 if( self->pFileState == Error ) return self->pStatus;
2078
2079 if( self->pFileState != Opened && self->pFileState != Recovering )
2081
2082 Log *log = DefaultEnv::GetLog();
2083 log->Debug( FileMsg, "[%p@%s] Sending a fattr set command for handle %#x to %s",
2084 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2085 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2086
2087 //--------------------------------------------------------------------------
2088 // Issue a new fattr set request
2089 //--------------------------------------------------------------------------
2090 return XAttrOperationImpl( self, kXR_fattrSet, 0, attrs, handler, timeout );
2091 }
2092
2093 //------------------------------------------------------------------------
2094 // Get extended attributes - async
     //
     // Validates the file state, logs the operation and delegates to
     // XAttrOperationImpl() with sub-code kXR_fattrGet, options 0 and the
     // given attribute names.
     //
     // Returns self->pStatus when the file is in the Error state, otherwise
     // (on the visible path) the result of XAttrOperationImpl().
     //
     // NOTE(review): the embedded listing numbers skip 2106, so the statement
     // guarded by the Opened/Recovering check is not visible in this listing.
2095 //------------------------------------------------------------------------
2096 XRootDStatus FileStateHandler::GetXAttr( std::shared_ptr<FileStateHandler> &self,
2097 const std::vector<std::string> &attrs,
2098 ResponseHandler *handler,
2099 time_t timeout )
2100 {
2101 XrdSysMutexHelper scopedLock( self->pMutex );
2102
2103 if( self->pFileState == Error ) return self->pStatus;
2104
2105 if( self->pFileState != Opened && self->pFileState != Recovering )
2107
2108 Log *log = DefaultEnv::GetLog();
2109 log->Debug( FileMsg, "[%p@%s] Sending a fattr get command for handle %#x to %s",
2110 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2111 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2112
2113 //--------------------------------------------------------------------------
2114 // Issue a new fattr get request
2115 //--------------------------------------------------------------------------
2116 return XAttrOperationImpl( self, kXR_fattrGet, 0, attrs, handler, timeout );
2117 }
2118
2119 //------------------------------------------------------------------------
2120 // Delete extended attributes - async
     //
     // Validates the file state, logs the operation and delegates to
     // XAttrOperationImpl() with sub-code kXR_fattrDel, options 0 and the
     // given attribute names.
     //
     // Returns self->pStatus when the file is in the Error state, otherwise
     // (on the visible path) the result of XAttrOperationImpl().
     //
     // NOTE(review): the embedded listing numbers skip 2132, so the statement
     // guarded by the Opened/Recovering check is not visible in this listing.
2121 //------------------------------------------------------------------------
2122 XRootDStatus FileStateHandler::DelXAttr( std::shared_ptr<FileStateHandler> &self,
2123 const std::vector<std::string> &attrs,
2124 ResponseHandler *handler,
2125 time_t timeout )
2126 {
2127 XrdSysMutexHelper scopedLock( self->pMutex );
2128
2129 if( self->pFileState == Error ) return self->pStatus;
2130
2131 if( self->pFileState != Opened && self->pFileState != Recovering )
2133
2134 Log *log = DefaultEnv::GetLog();
2135 log->Debug( FileMsg, "[%p@%s] Sending a fattr del command for handle %#x to %s",
2136 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2137 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2138
2139 //--------------------------------------------------------------------------
2140 // Issue a new fattr del request
2141 //--------------------------------------------------------------------------
2142 return XAttrOperationImpl( self, kXR_fattrDel, 0, attrs, handler, timeout );
2143 }
2144
2145 //------------------------------------------------------------------------
2146 // List extended attributes - async
     //
     // Validates the file state, logs the operation and delegates to
     // XAttrOperationImpl() with sub-code kXR_fattrList, the
     // ClientFattrRequest::aData option and an empty attribute-name list.
     //
     // Returns self->pStatus when the file is in the Error state, otherwise
     // (on the visible path) the result of XAttrOperationImpl().
     //
     // NOTE(review): the embedded listing numbers skip 2157, so the statement
     // guarded by the Opened/Recovering check is not visible in this listing.
2147 //------------------------------------------------------------------------
2148 XRootDStatus FileStateHandler::ListXAttr( std::shared_ptr<FileStateHandler> &self,
2149 ResponseHandler *handler,
2150 time_t timeout )
2151 {
2152 XrdSysMutexHelper scopedLock( self->pMutex );
2153
2154 if( self->pFileState == Error ) return self->pStatus;
2155
2156 if( self->pFileState != Opened && self->pFileState != Recovering )
2158
2159 Log *log = DefaultEnv::GetLog();
2160 log->Debug( FileMsg, "[%p@%s] Sending a fattr list command for handle %#x to %s",
2161 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2162 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2163
2164 //--------------------------------------------------------------------------
2165 // Issue a new fattr list request
2166 //--------------------------------------------------------------------------
     // a list request names no attributes, so an empty vector is passed
2167 static const std::vector<std::string> nothing;
2168 return XAttrOperationImpl( self, kXR_fattrList, ClientFattrRequest::aData,
2169 nothing, handler, timeout );
2170 }
2171
2172 //------------------------------------------------------------------------
     // Send a checkpoint request - async
     //
     // Sends a kXR_chkpoint request for the current file handle with
     // req->opcode set to `code`.
     //
     // Returns self->pStatus when the file is in the Error state, otherwise
     // (on the visible path) the result of SendOrQueue().
     //
     // NOTE(review): the original header text (listing numbers 2173-2181) is
     // missing from this listing, as are 2193, 2201, 2213 and 2215 - i.e. the
     // statement guarded by the Opened/Recovering check, the declaration of
     // `req` and two statements before the send.
2182 //------------------------------------------------------------------------
2183 XRootDStatus FileStateHandler::Checkpoint( std::shared_ptr<FileStateHandler> &self,
2184 kXR_char code,
2185 ResponseHandler *handler,
2186 time_t timeout )
2187 {
2188 XrdSysMutexHelper scopedLock( self->pMutex );
2189
2190 if( self->pFileState == Error ) return self->pStatus;
2191
2192 if( self->pFileState != Opened && self->pFileState != Recovering )
2194
2195 Log *log = DefaultEnv::GetLog();
2196 log->Debug( FileMsg, "[%p@%s] Sending a checkpoint command for handle %#x to %s",
2197 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2198 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2199
2200 Message *msg;
2202 MessageUtils::CreateRequest( msg, req );
2203
2204 req->requestid = kXR_chkpoint;
2205 req->opcode = code;
2206 memcpy( req->fhandle, self->pFileHandle, 4 );
2207
2208 MessageSendParams params;
2209 params.timeout = timeout;
2210 params.followRedirects = false;
2211 params.stateful = true;
2212
2214
2216 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2217
2218 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2219 }
2220
2221 //------------------------------------------------------------------------
     // Checkpointed write - async
     //
     // Sends a kXR_chkpoint request with opcode kXR_ckpXeq whose payload
     // (room for one ClientWriteRequest) describes a kXR_write of `size`
     // bytes at `offset`; the data buffer itself is referenced through a
     // single ChunkInfo in params.chunkList.
     //
     // Returns self->pStatus when the file is in the Error state, otherwise
     // (on the visible path) the result of SendOrQueue().
     //
     // NOTE(review): the original header text (listing numbers 2222-2230) is
     // missing from this listing, as are 2244, 2252, 2260, 2275 and 2277 -
     // i.e. the statement guarded by the Opened/Recovering check, the
     // declarations of `req` and `wrtreq`, and two statements before the send.
2231 //------------------------------------------------------------------------
2232 XRootDStatus FileStateHandler::ChkptWrt( std::shared_ptr<FileStateHandler> &self,
2233 uint64_t offset,
2234 uint32_t size,
2235 const void *buffer,
2236 ResponseHandler *handler,
2237 time_t timeout )
2238 {
2239 XrdSysMutexHelper scopedLock( self->pMutex );
2240
2241 if( self->pFileState == Error ) return self->pStatus;
2242
2243 if( self->pFileState != Opened && self->pFileState != Recovering )
2245
2246 Log *log = DefaultEnv::GetLog();
2247 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2248 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2249 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2250
2251 Message *msg;
2253 MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2254
2255 req->requestid = kXR_chkpoint;
2256 req->opcode = kXR_ckpXeq;
2257 req->dlen = 24; // as specified in the protocol specification
2258 memcpy( req->fhandle, self->pFileHandle, 4 );
2259
     // fill in the embedded write request carried in the payload
2261 wrtreq->requestid = kXR_write;
2262 wrtreq->offset = offset;
2263 wrtreq->dlen = size;
2264 memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2265
2266 ChunkList *list = new ChunkList();
     // the const qualifier of `buffer` is cast away to fit ChunkInfo
2267 list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
2268
2269 MessageSendParams params;
2270 params.timeout = timeout;
2271 params.followRedirects = false;
2272 params.stateful = true;
2273 params.chunkList = list;
2274
2276
2278 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2279
2280 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2281 }
2282
2283 //------------------------------------------------------------------------
     // Checkpointed write of scattered buffers - async
     //
     // Sends a kXR_chkpoint request with opcode kXR_ckpXeq whose payload
     // (room for one ClientWriteRequest) describes a kXR_write at `offset`
     // with a length equal to the sum of all non-empty iovec entries; each
     // non-empty entry becomes one ChunkInfo in params.chunkList.
     //
     // Returns self->pStatus when the file is in the Error state, otherwise
     // (on the visible path) the result of SendOrQueue().
     //
     // NOTE(review): the original header text (listing numbers 2284-2292) is
     // missing from this listing, as are 2306, 2314, 2332, 2344 and 2346 -
     // i.e. the statement guarded by the Opened/Recovering check, the
     // declarations of `req` and `wrtreq`, and two statements before the send.
2293 //------------------------------------------------------------------------
2294 XRootDStatus FileStateHandler::ChkptWrtV( std::shared_ptr<FileStateHandler> &self,
2295 uint64_t offset,
2296 const struct iovec *iov,
2297 int iovcnt,
2298 ResponseHandler *handler,
2299 time_t timeout )
2300 {
2301 XrdSysMutexHelper scopedLock( self->pMutex );
2302
2303 if( self->pFileState == Error ) return self->pStatus;
2304
2305 if( self->pFileState != Opened && self->pFileState != Recovering )
2307
2308 Log *log = DefaultEnv::GetLog();
2309 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2310 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2311 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2312
2313 Message *msg;
2315 MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2316
2317 req->requestid = kXR_chkpoint;
2318 req->opcode = kXR_ckpXeq;
2319 req->dlen = 24; // as specified in the protocol specification
2320 memcpy( req->fhandle, self->pFileHandle, 4 );
2321
2322 ChunkList *list = new ChunkList();
     // NOTE(review): the total is accumulated in 32 bits; a combined
     // iov_len above UINT32_MAX would wrap - confirm callers bound it
2323 uint32_t size = 0;
2324 for( int i = 0; i < iovcnt; ++i )
2325 {
     // empty buffers contribute nothing and are left out of the chunk list
2326 if( iov[i].iov_len == 0 ) continue;
2327 size += iov[i].iov_len;
2328 list->push_back( ChunkInfo( 0, iov[i].iov_len,
2329 (char*)iov[i].iov_base ) );
2330 }
2331
     // fill in the embedded write request carried in the payload
2333 wrtreq->requestid = kXR_write;
2334 wrtreq->offset = offset;
2335 wrtreq->dlen = size;
2336 memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2337
2338 MessageSendParams params;
2339 params.timeout = timeout;
2340 params.followRedirects = false;
2341 params.stateful = true;
2342 params.chunkList = list;
2343
2345
2347 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2348
2349 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2350 }
2351
2352 //----------------------------------------------------------------------------
2353 // Check if the file is open
     //
     // Returns true when the file state is Opened or Recovering and false for
     // every other state; the state is read while holding pMutex.
     //
     // NOTE(review): the function signature (listing number 2355) is missing
     // from this listing.
2354 //----------------------------------------------------------------------------
2356 {
2357 XrdSysMutexHelper scopedLock( pMutex );
2358
2359 if( pFileState == Opened || pFileState == Recovering )
2360 return true;
2361 return false;
2362 }
2363
2364 //----------------------------------------------------------------------------
2365 // Set file property
2366 //----------------------------------------------------------------------------
2367 bool FileStateHandler::SetProperty( const std::string &name,
2368 const std::string &value )
2369 {
2370 XrdSysMutexHelper scopedLock( pMutex );
2371 if( name == "ReadRecovery" )
2372 {
2373 if( value == "true" ) pDoRecoverRead = true;
2374 else pDoRecoverRead = false;
2375 return true;
2376 }
2377 else if( name == "WriteRecovery" )
2378 {
2379 if( value == "true" ) pDoRecoverWrite = true;
2380 else pDoRecoverWrite = false;
2381 return true;
2382 }
2383 else if( name == "FollowRedirects" )
2384 {
2385 if( value == "true" ) pFollowRedirects = true;
2386 else pFollowRedirects = false;
2387 return true;
2388 }
2389 else if( name == "BundledClose" )
2390 {
2391 if( value == "true" ) pAllowBundledClose = true;
2392 else pAllowBundledClose = false;
2393 return true;
2394 }
2395 return false;
2396 }
2397
2398 //----------------------------------------------------------------------------
2399 // Get file property
2400 //----------------------------------------------------------------------------
2401 bool FileStateHandler::GetProperty( const std::string &name,
2402 std::string &value ) const
2403 {
2404 XrdSysMutexHelper scopedLock( pMutex );
2405 if( name == "ReadRecovery" )
2406 {
2407 if( pDoRecoverRead ) value = "true";
2408 else value = "false";
2409 return true;
2410 }
2411 else if( name == "WriteRecovery" )
2412 {
2413 if( pDoRecoverWrite ) value = "true";
2414 else value = "false";
2415 return true;
2416 }
2417 else if( name == "FollowRedirects" )
2418 {
2419 if( pFollowRedirects ) value = "true";
2420 else value = "false";
2421 return true;
2422 }
2423 else if( name == "DataServer" && pDataServer )
2424 { value = pDataServer->GetHostId(); return true; }
2425 else if( name == "LastURL" && pDataServer )
2426 { value = pDataServer->GetURL(); return true; }
2427 else if( name == "WrtRecoveryRedir" && pWrtRecoveryRedir )
2428 { value = pWrtRecoveryRedir->GetHostId(); return true; }
2429 value = "";
2430 return false;
2431 }
2432
2433 //----------------------------------------------------------------------------
2434 // Process the results of the opening operation
     //
     // With a host list present, rebuilds pDataServer (last host of the list,
     // CGI merged from every hop), pLoadBalancer (last hop flagged as load
     // balancer) and pWrtRecoveryRedir (last hop flagged kXR_recoverWrts).
     // On failure (bad status or no openInfo) queued messages are failed and
     // the state becomes Error; on success the file handle, session id and
     // stat info are stored, queued messages are resent and the state
     // becomes Opened.
     //
     // NOTE(review): the first line of the signature (listing number 2436)
     // and the lines numbered 2494, 2520, 2523, 2526, 2560 and 2563 are
     // missing from this listing; among them are the declarations of `st`,
     // `mon` and `i` used below.
2435 //----------------------------------------------------------------------------
2437 const OpenInfo *openInfo,
2438 const HostList *hostList )
2439 {
2440 Log *log = DefaultEnv::GetLog();
2441 XrdSysMutexHelper scopedLock( pMutex );
2442
2443 //--------------------------------------------------------------------------
2444 // Assign the data server and the load balancer
2445 //--------------------------------------------------------------------------
     // used for error logging only; falls back to the file URL host when no
     // host list is available
2446 std::string lastServer = pFileUrl->GetHostId();
2447 if( hostList )
2448 {
2449 delete pDataServer;
2450 delete pLoadBalancer;
2451 pLoadBalancer = 0;
2452 delete pWrtRecoveryRedir;
2453 pWrtRecoveryRedir = 0;
2454
2455 pDataServer = new URL( hostList->back().url );
2456 pDataServer->SetParams( pFileUrl->GetParams() );
     // the path is taken from the file URL unless a metalink is being
     // resolved through the virtual redirector
2457 if( !( pUseVirtRedirector && pFileUrl->IsMetalink() ) ) pDataServer->SetPath( pFileUrl->GetPath() );
2458 lastServer = pDataServer->GetHostId();
2459 HostList::const_iterator itC;
2460 URL::ParamsMap params = pDataServer->GetParams();
     // fold the CGI of every hop into the data server parameters
2461 for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2462 {
2463 MessageUtils::MergeCGI( params,
2464 itC->url.GetParams(),
2465 true );
2466 }
2467 pDataServer->SetParams( params );
2468
     // scan backwards so the most recent matching hop wins
2469 HostList::const_reverse_iterator it;
2470 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2471 if( it->loadBalancer )
2472 {
2473 pLoadBalancer = new URL( it->url );
2474 break;
2475 }
2476
2477 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2478 if( it->flags & kXR_recoverWrts )
2479 {
2480 pWrtRecoveryRedir = new URL( it->url );
2481 break;
2482 }
2483 }
2484
2485 log->Debug(FileMsg, "[%p@%s] Open has returned with status %s",
2486 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), status->ToStr().c_str() );
2487
2488 if( pDataServer && !pDataServer->IsLocalFile() )
2489 {
2490 //------------------------------------------------------------------------
2491 // Check if we are using a secure connection
2492 //------------------------------------------------------------------------
2493 XrdCl::AnyObject isencobj;
2495 QueryTransport( *pDataServer, XRootDQuery::IsEncrypted, isencobj );
2496 if( st.IsOK() )
2497 {
     // NOTE(review): isenc is not initialised here; this relies on
     // AnyObject::Get always assigning it - confirm
2498 bool *isenc;
2499 isencobj.Get( isenc );
2500 pIsChannelEncrypted = isenc ? *isenc : false;
2501 delete isenc;
2502 }
2503 }
2504
2505 //--------------------------------------------------------------------------
2506 // We have failed
2507 //--------------------------------------------------------------------------
2508 pStatus = *status;
2509 if( !pStatus.IsOK() || !openInfo )
2510 {
2511 log->Debug(FileMsg, "[%p@%s] Error while opening at %s: %s",
2512 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), lastServer.c_str(),
2513 pStatus.ToStr().c_str() );
2514 FailQueuedMessages( pStatus );
2515 pFileState = Error;
2516
2517 //------------------------------------------------------------------------
2518 // Report to monitoring
2519 //------------------------------------------------------------------------
2521 if( mon )
2522 {
2524 i.file = pFileUrl;
2525 i.status = status;
2527 mon->Event( Monitor::EvErrIO, &i );
2528 }
2529 }
2530 //--------------------------------------------------------------------------
2531 // We have succeeded
2532 //--------------------------------------------------------------------------
2533 else
2534 {
2535 //------------------------------------------------------------------------
2536 // if requested file colocation or dup was done, don't do again on reopen
2537 //------------------------------------------------------------------------
2538 pOpenFlags &= ~(OpenFlags::Dup | OpenFlags::Samefs);
2539
2540 //------------------------------------------------------------------------
2541 // Store the response info
2542 //------------------------------------------------------------------------
2543 openInfo->GetFileHandle( pFileHandle );
2544 pSessionId = openInfo->GetSessionId();
     // replace the cached stat info only when the response carries one
2545 if( openInfo->GetStatInfo() )
2546 {
2547 delete pStatInfo;
2548 pStatInfo = new StatInfo( *openInfo->GetStatInfo() );
2549 }
2550
2551 log->Debug( FileMsg, "[%p@%s] successfully opened at %s, handle: %#x, "
2552 "session id: %llu", (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
2553 pDataServer->GetHostId().c_str(), *((uint32_t*)pFileHandle),
2554 (unsigned long long) pSessionId );
2555
2556 //------------------------------------------------------------------------
2557 // Inform the monitoring about opening success
2558 //------------------------------------------------------------------------
2559 gettimeofday( &pOpenTime, 0 );
2561 if( mon )
2562 {
2564 i.file = pFileUrl;
2565 i.dataServer = pDataServer->GetHostId();
2566 i.oFlags = pOpenFlags;
2567 i.oFlags2 = pOpenFlags>>16;
2568 i.fSize = pStatInfo ? pStatInfo->GetSize() : 0;
2569 mon->Event( Monitor::EvOpen, &i );
2570 }
2571
2572 //------------------------------------------------------------------------
2573 // Resend the queued messages if any
2574 //------------------------------------------------------------------------
     // the state is switched to Opened only after the queue has been resent
2575 ReSendQueuedMessages();
2576 pFileState = Opened;
2577 }
2578 }
2579
2580 //----------------------------------------------------------------------------
2581 // Process the results of the closing operation
     //
     // Logs the outcome, reports the close to monitoring, resets the
     // monitoring variables, stores *status in pStatus and moves the file to
     // the Closed state - regardless of whether the status is OK.
     //
     // NOTE(review): the function signature (listing number 2583) is missing
     // from this listing; `status` is dereferenced below without a null
     // check.
2582 //----------------------------------------------------------------------------
2584 {
2585 Log *log = DefaultEnv::GetLog();
2586 XrdSysMutexHelper scopedLock( pMutex );
2587
2588 log->Debug(FileMsg, "[%p@%s] Close returned from %s with: %s", (void*)this,
2589 pFileUrl->GetObfuscatedURL().c_str(), pDataServer->GetHostId().c_str(),
2590 status->ToStr().c_str() );
2591
2592 log->Dump(FileMsg, "[%p@%s] Items in the fly %zu, queued for recovery %zu",
2593 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), pInTheFly.size(), pToBeRecovered.size() );
2594
2595 MonitorClose( status );
2596 ResetMonitoringVars();
2597
2598 pStatus = *status;
2599 pFileState = Closed;
2600 }
2601
2602 //----------------------------------------------------------------------------
2603 // Handle an error while sending a stateful message
     //
     // A redirect error whose message starts with a supported URL scheme is
     // forwarded to OnStateRedirection() when redirects are followed. Any
     // other error is reported to monitoring and then either fails the
     // request (not recoverable, or a kernel buffer is in use) or queues it
     // for recovery. `status` is deleted on both of these exits.
     //
     // NOTE(review): the lines numbered 2644, 2647 and 2654-2660 are missing
     // from this listing; among them are the declarations of `mon` and `i`
     // and the case labels of the switch below.
2604 //----------------------------------------------------------------------------
2605 void FileStateHandler::OnStateError( std::shared_ptr<FileStateHandler> &self,
2606 XRootDStatus *status,
2607 Message *message,
2608 ResponseHandler *userHandler,
2609 MessageSendParams &sendParams )
2610 {
2611 //--------------------------------------------------------------------------
2612 // It may be a redirection
2613 //--------------------------------------------------------------------------
2614 if( !status->IsOK() && status->code == errRedirect && self->pFollowRedirects )
2615 {
2616 static const std::string root = "root", xroot = "xroot", file = "file",
2617 roots = "roots", xroots = "xroots";
     // the error message carries the redirect target; accept it only when it
     // begins with one of the scheme names above
2618 std::string msg = status->GetErrorMessage();
2619 if( !msg.compare( 0, root.size(), root ) ||
2620 !msg.compare( 0, xroot.size(), xroot ) ||
2621 !msg.compare( 0, file.size(), file ) ||
2622 !msg.compare( 0, roots.size(), roots ) ||
2623 !msg.compare( 0, xroots.size(), xroots ) )
2624 {
     // NOTE(review): status is not deleted on this early return, unlike
     // the two exits further down - confirm who releases it
2625 FileStateHandler::OnStateRedirection( self, msg, message, userHandler, sendParams );
2626 return;
2627 }
2628 }
2629
2630 //--------------------------------------------------------------------------
2631 // Handle error
2632 //--------------------------------------------------------------------------
2633 Log *log = DefaultEnv::GetLog();
2634 XrdSysMutexHelper scopedLock( self->pMutex );
2635 self->pInTheFly.erase( message );
2636
2637 log->Dump( FileMsg, "[%p@%s] File state error encountered. Message %s "
2638 "returned with %s", (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2639 message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2640
2641 //--------------------------------------------------------------------------
2642 // Report to monitoring
2643 //--------------------------------------------------------------------------
2645 if( mon )
2646 {
2648 i.file = self->pFileUrl;
2649 i.status = status;
2650
2651 ClientRequest *req = (ClientRequest*)message->GetBuffer();
2652 switch( req->header.requestid )
2653 {
2661 }
2662
2663 mon->Event( Monitor::EvErrIO, &i );
2664 }
2665
2666 //--------------------------------------------------------------------------
2667 // The message is not recoverable
2668 // (message using a kernel buffer is not recoverable by definition)
2669 //--------------------------------------------------------------------------
2670 if( !self->IsRecoverable( *status ) || sendParams.kbuff )
2671 {
2672 log->Error( FileMsg, "[%p@%s] Fatal file state error. Message %s "
2673 "returned with %s", (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2674 message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2675
2676 self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
2677 delete status;
2678 return;
2679 }
2680
2681 //--------------------------------------------------------------------------
2682 // Insert the message to the recovery queue and start the recovery
2683 // procedure if we don't have any more message in the fly
2684 //--------------------------------------------------------------------------
     // remember the triggering error before the status object is released
2685 self->pCloseReason = *status;
2686 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2687 delete status;
2688 }
2689
2690 //----------------------------------------------------------------------------
2691 // Handle stateful redirect
2692 //----------------------------------------------------------------------------
2693 void FileStateHandler::OnStateRedirection( std::shared_ptr<FileStateHandler> &self,
2694 const std::string &redirectUrl,
2695 Message *message,
2696 ResponseHandler *userHandler,
2697 MessageSendParams &sendParams )
2698 {
2699 XrdSysMutexHelper scopedLock( self->pMutex );
2700 self->pInTheFly.erase( message );
2701
2702 //--------------------------------------------------------------------------
2703 // Register the state redirect url and append the new cgi information to
2704 // the file URL
2705 //--------------------------------------------------------------------------
2706 if( !self->pStateRedirect )
2707 {
2708 std::ostringstream o;
2709 self->pStateRedirect = new URL( redirectUrl );
2710 URL::ParamsMap params = self->pFileUrl->GetParams();
2711 MessageUtils::MergeCGI( params,
2712 self->pStateRedirect->GetParams(),
2713 false );
2714 self->pFileUrl->SetParams( params );
2715 }
2716
2717 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2718 }
2719
2720 //----------------------------------------------------------------------------
2721 // Handle stateful response
2722 //----------------------------------------------------------------------------
2723 void FileStateHandler::OnStateResponse( std::shared_ptr<FileStateHandler> &self,
2724 XRootDStatus *status,
2725 Message *message,
2726 AnyObject *response,
2727 HostList */*urlList*/ )
2728 {
2729 Log *log = DefaultEnv::GetLog();
2730 XrdSysMutexHelper scopedLock( self->pMutex );
2731
2732 log->Dump( FileMsg, "[%p@%s] Got state response for message %s",
2733 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2734 message->GetObfuscatedDescription().c_str() );
2735
2736 //--------------------------------------------------------------------------
2737 // Since this message may be the last "in-the-fly" and no recovery
2738 // is done if messages are in the fly, we may need to trigger recovery
2739 //--------------------------------------------------------------------------
2740 self->pInTheFly.erase( message );
2741 RunRecovery( self );
2742
2743 //--------------------------------------------------------------------------
2744 // Play with the actual response before returning it. This is a good
2745 // place to do caching in the future.
2746 //--------------------------------------------------------------------------
2747 ClientRequest *req = (ClientRequest*)message->GetBuffer();
2748 switch( req->header.requestid )
2749 {
2750 //------------------------------------------------------------------------
2751 // Cache the stat response
2752 //------------------------------------------------------------------------
2753 case kXR_stat:
2754 {
2755 StatInfo *info = 0;
2756 response->Get( info );
2757 delete self->pStatInfo;
2758 self->pStatInfo = new StatInfo( *info );
2759 break;
2760 }
2761
2762 //------------------------------------------------------------------------
2763 // Handle read response
2764 //------------------------------------------------------------------------
2765 case kXR_read:
2766 {
2767 ++self->pRCount;
2768 self->pRBytes += req->read.rlen;
2769 break;
2770 }
2771
2772 //------------------------------------------------------------------------
2773 // Handle read response
2774 //------------------------------------------------------------------------
2775 case kXR_pgread:
2776 {
2777 ++self->pRCount;
2778 self->pRBytes += req->pgread.rlen;
2779 break;
2780 }
2781
2782 //------------------------------------------------------------------------
2783 // Handle readv response
2784 //------------------------------------------------------------------------
2785 case kXR_readv:
2786 {
2787 ++self->pVRCount;
2788 size_t segs = req->header.dlen/sizeof(readahead_list);
2789 readahead_list *dataChunk = (readahead_list*)message->GetBuffer( 24 );
2790 for( size_t i = 0; i < segs; ++i )
2791 self->pVRBytes += dataChunk[i].rlen;
2792 self->pVSegs += segs;
2793 break;
2794 }
2795
2796 //------------------------------------------------------------------------
2797 // Handle write response
2798 //------------------------------------------------------------------------
2799 case kXR_write:
2800 {
2801 ++self->pWCount;
2802 self->pWBytes += req->write.dlen;
2803 break;
2804 }
2805
2806 //------------------------------------------------------------------------
2807 // Handle write response
2808 //------------------------------------------------------------------------
2809 case kXR_pgwrite:
2810 {
2811 ++self->pWCount;
2812 self->pWBytes += req->pgwrite.dlen;
2813 break;
2814 }
2815
2816 //------------------------------------------------------------------------
2817 // Handle writev response
2818 //------------------------------------------------------------------------
2819 case kXR_writev:
2820 {
2821 ++self->pVWCount;
2822 size_t size = req->header.dlen/sizeof(readahead_list);
2823 XrdProto::write_list *wrtList =
2824 reinterpret_cast<XrdProto::write_list*>( message->GetBuffer( 24 ) );
2825 for( size_t i = 0; i < size; ++i )
2826 self->pVWBytes += wrtList[i].wlen;
2827 break;
2828 }
2829 };
2830 }
2831
2832 //------------------------------------------------------------------------
2834 //------------------------------------------------------------------------
2835 void FileStateHandler::Tick( time_t now )
2836 {
2837 if (pMutex.CondLock())
2838 {TimeOutRequests( now );
2839 pMutex.UnLock();
2840 }
2841 }
2842
2843 //----------------------------------------------------------------------------
2844 // Declare timeout on requests being recovered
//
// Walks the list of requests waiting for recovery and, for every entry whose
// params.expires is not later than `now`, queues a ResponseJob for its handler
// and removes the entry from the list.
//
// NOTE(review): the signature line of this definition is not visible in this
// listing (the class reference shows `void TimeOutRequests(time_t now)`), and
// two further lines appear to be missing below - confirm against the real
// source before editing this function.
2845 //----------------------------------------------------------------------------
2847 {
2848 if( !pToBeRecovered.empty() )
2849 {
2850 Log *log = DefaultEnv::GetLog();
2851 log->Dump( FileMsg, "[%p@%s] Got a timer event", (void*)this,
2852 pFileUrl->GetObfuscatedURL().c_str() );
2853 RequestList::iterator it;
// NOTE(review): `jobMan` is used in the loop but its declaration is not among
// the visible lines - presumably obtained from the post master as in
// FailMessage; confirm.
// The loop header has no increment: erase() yields the next iterator for
// expired entries, all other entries are skipped with ++it.
2855 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2856 {
2857 if( it->params.expires <= now )
2858 {
// NOTE(review): compared with the ResponseJob construction in FailMessage,
// the status argument between the handler and the `0` seems to be missing
// from this listing - confirm which status is reported to the handler.
2859 jobMan->QueueJob( new ResponseJob(
2860 it->handler,
2862 0, it->params.hostList ) );
2863 it = pToBeRecovered.erase( it );
2864 }
2865 else
2866 ++it;
2867 }
2868 }
2869 }
2870
2871 //----------------------------------------------------------------------------
2872 // Called in the child process after the fork
//
// A file that is Closed or in the Error state is left untouched. Otherwise,
// when recovery is enabled for the way the file was opened (pDoRecoverRead
// for read-only files, pDoRecoverWrite for all others) the handler is put in
// the Recovering state and both the in-flight set and the recovery queue are
// emptied; when it is not enabled the file is put in the Error state.
//
// NOTE(review): the signature line is not visible in this listing (the class
// reference shows `void AfterForkChild()`) - confirm against the real source.
2873 //----------------------------------------------------------------------------
2875 {
2876 Log *log = DefaultEnv::GetLog();
2877
2878 if( pFileState == Closed || pFileState == Error )
2879 return;
2880
2881 if( (IsReadOnly() && pDoRecoverRead) ||
2882 (!IsReadOnly() && pDoRecoverWrite) )
2883 {
2884 log->Debug( FileMsg, "[%p@%s] Putting the file in recovery state in "
2885 "process %d", (void*)this, pFileUrl->GetObfuscatedURL().c_str(), getpid() );
2886 pFileState = Recovering;
// The queued and in-flight requests are dropped here without their handlers
// being called.
2887 pInTheFly.clear();
2888 pToBeRecovered.clear();
2889 }
2890 else
2891 pFileState = Error;
2892 }
2893
2894 //------------------------------------------------------------------------
2895 // Try other data server
//
// Under the file mutex: switches the file to the Recovering state, records
// the current data server's host name in the "tried" CGI element of the load
// balancer URL (after merging in the data server's CGI) and re-opens the file
// through the load balancer with the given timeout.
2896 //------------------------------------------------------------------------
2897 XRootDStatus FileStateHandler::TryOtherServer( std::shared_ptr<FileStateHandler> &self, time_t timeout )
2898 {
2899 XrdSysMutexHelper scopedLock( self->pMutex );
2900
2901 if( self->pFileState != Opened || !self->pLoadBalancer )
// NOTE(review): the statement controlled by this `if` is not visible in this
// listing (one line appears to be missing). As shown, the guard would govern
// the assignment below instead - presumably an early error return belongs
// here; confirm against the real source.
2903
2904 self->pFileState = Recovering;
2905
2906 Log *log = DefaultEnv::GetLog();
2907 log->Debug( FileMsg, "[%p@%s] Reopen file at next data server.",
2908 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
2909
2910 // merge CGI
2911 auto lbcgi = self->pLoadBalancer->GetParams();
2912 auto dtcgi = self->pDataServer->GetParams();
2913 MessageUtils::MergeCGI( lbcgi, dtcgi, false );
2914 // update tried CGI: start the list with the current host, or append the
// current host to an existing comma separated list
2915 auto itr = lbcgi.find( "tried" );
2916 if( itr == lbcgi.end() )
2917 lbcgi["tried"] = self->pDataServer->GetHostName();
2918 else
2919 {
2920 std::string tried = itr->second;
2921 tried += "," + self->pDataServer->GetHostName();
2922 lbcgi["tried"] = tried;
2923 }
2924 self->pLoadBalancer->SetParams( lbcgi );
2925
2926 return ReOpenFileAtServer( self, *self->pLoadBalancer, timeout );
2927 }
2928
2929 //------------------------------------------------------------------------
2930 // Generic implementation of xattr operation
//
// Builds a kXR_fattr request with the given subcode and options for the
// current file handle, wraps the user handler in a StatefulHandler and sends
// the request (or queues it for recovery) through SendOrQueue.
//
// NOTE(review): several lines of this definition are missing from this
// listing; see the notes below and confirm against the real source.
2931 //------------------------------------------------------------------------
2932 template<typename T>
2933 Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &self,
2934 kXR_char subcode,
2935 kXR_char options,
2936 const std::vector<T> &attrs,
2937 ResponseHandler *handler,
2938 time_t timeout )
2939 {
2940 //--------------------------------------------------------------------------
2941 // Issue a new fattr request
2942 //--------------------------------------------------------------------------
2943 Message *msg;
2944 ClientFattrRequest *req;
2945 MessageUtils::CreateRequest( msg, req );
2946
2947 req->requestid = kXR_fattr;
2948 req->subcode = subcode;
2949 req->numattr = attrs.size();
2950 req->options = options;
2951 memcpy( req->fhandle, self->pFileHandle, 4 );
// NOTE(review): `st` is tested on the next line but its declaration is not
// visible here - presumably the step that encodes `attrs` into the request
// body; confirm. Also confirm whether `msg` is released on this early return.
2953 if( !st.IsOK() ) return st;
2954
2955 MessageSendParams params;
2956 params.timeout = timeout;
2957 params.followRedirects = false;
2958 params.stateful = true;
2960
2962 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2963
2964 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2965 }
2966
2967 //----------------------------------------------------------------------------
2968 // Send a message to a host or put it in the recovery queue
2969 //----------------------------------------------------------------------------
2970 Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &self,
2971 const URL &url,
2972 Message *msg,
2973 ResponseHandler *handler,
2974 MessageSendParams &sendParams )
2975 {
2976 //--------------------------------------------------------------------------
2977 // Recovering
2978 //--------------------------------------------------------------------------
2979 if( self->pFileState == Recovering )
2980 {
2981 return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2982 }
2983
2984 //--------------------------------------------------------------------------
2985 // Trying to send
2986 //--------------------------------------------------------------------------
2987 if( self->pFileState == Opened )
2988 {
2989 msg->SetSessionId( self->pSessionId );
2990 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
2991
2992 //------------------------------------------------------------------------
2993 // Invalid session id means that the connection has been broken while we
2994 // were idle so we haven't been informed about this fact earlier.
2995 //------------------------------------------------------------------------
2996 if( !st.IsOK() && st.code == errInvalidSession && self->IsRecoverable( st ) )
2997 return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2998
2999 if( st.IsOK() )
3000 self->pInTheFly.insert(msg);
3001 else
3002 delete handler;
3003 return st;
3004 }
3005 return Status( stError, errInvalidOp );
3006 }
3007
3008 //----------------------------------------------------------------------------
3009 // Check if the stateful error is recoverable
//
// Returns false unless at least one of pDoRecoverRead / pDoRecoverWrite is
// set and status.code matches an entry of `recoverable_errors`; on a match
// the answer is pDoRecoverRead for a read-only file and pDoRecoverWrite
// otherwise.
3010 //----------------------------------------------------------------------------
3011 bool FileStateHandler::IsRecoverable( const XRootDStatus &status ) const
3012 {
// NOTE(review): the entries of this initializer list are not visible in this
// listing (several lines appear to be missing) - confirm the set of error
// codes against the real source.
3013 const auto recoverable_errors = {
3020 };
3021
3022 if (pDoRecoverRead || pDoRecoverWrite)
3023 for (const auto error : recoverable_errors)
3024 if (status.code == error)
3025 return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
3026
3027 return false;
3028 }
3029
3030 //----------------------------------------------------------------------------
3031 // Check if the file is open for read only
3032 //----------------------------------------------------------------------------
3033 bool FileStateHandler::IsReadOnly() const
3034 {
3035 // Keeping the check for append (with a cast) as this was previously tested,
3036 // but OpenFlags::Flags does not currently enumerate the Append flag
3037 if( (pOpenFlags & OpenFlags::Read) && !(pOpenFlags & OpenFlags::Update) &&
3038 !(pOpenFlags & static_cast<OpenFlags::Flags>(kXR_open_apnd)) )
3039 return true;
3040 return false;
3041 }
3042
3043 //----------------------------------------------------------------------------
3044 // Recover a message
3045 //----------------------------------------------------------------------------
3046 Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &self,
3047 RequestData rd,
3048 bool callbackOnFailure )
3049 {
3050 self->pFileState = Recovering;
3051
3052 Log *log = DefaultEnv::GetLog();
3053 log->Dump( FileMsg, "[%p@%s] Putting message %s in the recovery list",
3054 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3055 rd.request->GetObfuscatedDescription().c_str() );
3056
3057 Status st = RunRecovery( self );
3058 if( st.IsOK() )
3059 {
3060 self->pToBeRecovered.push_back( rd );
3061 return st;
3062 }
3063
3064 if( callbackOnFailure )
3065 self->FailMessage( rd, st );
3066
3067 return st;
3068 }
3069
3070 //----------------------------------------------------------------------------
3071 // Run the recovery procedure if appropriate
3072 //----------------------------------------------------------------------------
3073 Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &self )
3074 {
3075 if( self->pFileState != Recovering )
3076 return Status();
3077
3078 if( !self->pInTheFly.empty() )
3079 return Status();
3080
3081 Log *log = DefaultEnv::GetLog();
3082 log->Debug( FileMsg, "[%p@%s] Running the recovery procedure", (void*)self.get(),
3083 self->pFileUrl->GetObfuscatedURL().c_str() );
3084
3085 Status st;
3086 if( self->pStateRedirect )
3087 {
3088 SendClose( self, 0 );
3089 st = ReOpenFileAtServer( self, *self->pStateRedirect, 0 );
3090 delete self->pStateRedirect; self->pStateRedirect = 0;
3091 }
3092 else if( self->IsReadOnly() && self->pLoadBalancer )
3093 st = ReOpenFileAtServer( self, *self->pLoadBalancer, 0 );
3094 else
3095 st = ReOpenFileAtServer( self, *self->pDataServer, 0 );
3096
3097 if( !st.IsOK() )
3098 {
3099 self->pFileState = Error;
3100 self->pStatus = st;
3101 self->FailQueuedMessages( st );
3102 }
3103
3104 return st;
3105 }
3106
3107 //----------------------------------------------------------------------------
3108 // Send a close and ignore the response
//
// Builds a kXR_close request for the current file handle and session and
// issues it to the current data server. The response handler does nothing
// with the response: it only holds a copy of the shared pointer to this
// handler and drops that copy when it is invoked.
//
// NOTE(review): two lines of this definition (before SetSessionId and before
// the final return) are missing from this listing - confirm against the real
// source.
3109 //----------------------------------------------------------------------------
3110 XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &self,
3111 time_t timeout )
3112 {
3113 Message *msg;
3114 ClientCloseRequest *req;
3115 MessageUtils::CreateRequest( msg, req );
3116
3117 req->requestid = kXR_close;
3118 memcpy( req->fhandle, self->pFileHandle, 4 );
3119
3121 msg->SetSessionId( self->pSessionId );
// `self` is captured by value, so the lambda owns a reference to the handler
// until the callback runs and resets it.
3122 ResponseHandler *handler = ResponseHandler::Wrap(
3123 [self]( XRootDStatus&, AnyObject& ) mutable { self.reset(); } );
3124 MessageSendParams params;
3125 params.timeout = timeout;
3126 params.followRedirects = false;
3127 params.stateful = true;
3128
3130
3131 return self->IssueRequest( *self->pDataServer, msg, handler, params );
3132 }
3133
3134 //----------------------------------------------------------------------------
3135 // Re-open the current file at a given server
//
// Builds a kXR_open request for the file's path (taken from `url`, or from
// the file's own URL when `url` has an empty path), using the stored open
// mode and the stored open flags with Delete replaced by Update and New
// removed, and issues it with a fresh OpenHandler. If filling in the template
// file handle or issuing the request fails, pStatus is set to the failure and
// the file state becomes Closed.
//
// NOTE(review): two lines between the creation of `params` and the "Issue
// the open request" section are missing from this listing - confirm against
// the real source.
3136 //----------------------------------------------------------------------------
3137 XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &self,
3138 const URL &url,
3139 time_t timeout )
3140 {
3141 Log *log = DefaultEnv::GetLog();
3142 log->Dump( FileMsg, "[%p@%s] Sending a recovery open command to %s",
3143 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(), url.GetObfuscatedURL().c_str() );
3144
3145 //--------------------------------------------------------------------------
3146 // Remove the kXR_delete and kXR_new flags, as we don't want the recovery
3147 // procedure to delete a file that has been partially updated or fail it
3148 // because a partially uploaded file already exists.
3149 //--------------------------------------------------------------------------
3150 if( self->pOpenFlags & OpenFlags::Delete)
3151 {
3152 self->pOpenFlags &= ~OpenFlags::Delete;
3153 self->pOpenFlags |= OpenFlags::Update;
3154 }
3155
3156 self->pOpenFlags &= ~OpenFlags::New;
3157
3158 Message *msg;
3159 ClientOpenRequest *req;
3160 URL u = url;
3161
3162 if( url.GetPath().empty() )
3163 u.SetPath( self->pFileUrl->GetPath() );
3164
3165 std::string path = u.GetPathWithFilteredParams();
3166 MessageUtils::CreateRequest( msg, req, path.length() );
3167
3168 req->requestid = kXR_open;
3169 req->mode = self->pOpenMode;
3170 req->options = (self->pOpenFlags & 0xffff);
3171 req->dlen = path.length();
3172 URL sendUrl;
// FillFhTempl receives the original `url` (not the path-adjusted copy `u`)
// and writes the address the request is to be sent to into sendUrl.
// NOTE(review): confirm whether `msg` is released on the early return below.
3173 XRootDStatus st = FillFhTempl( self, url, msg, sendUrl );
3174 if( !st.IsOK() )
3175 {
3176 self->pStatus = st;
3177 self->pFileState = Closed;
3178 return st;
3179 }
// the path is placed at offset 24 of the message
3180 msg->Append( path.c_str(), path.length(), 24 );
3181
3182 // create a new reopen handler
3183 // (it is not assigned to 'pReOpenHandler' in order not to bump the reference counter
3184 // until we know that 'SendMessage' was successful)
3185 OpenHandler *openHandler = new OpenHandler( self, 0 );
3186 MessageSendParams params; params.timeout = timeout;
3189
3190 //--------------------------------------------------------------------------
3191 // Issue the open request
3192 //--------------------------------------------------------------------------
3193 st = self->IssueRequest( sendUrl, msg, openHandler, params );
3194
3195 // if there was a problem destroy the open handler
3196 if( !st.IsOK() )
3197 {
3198 delete openHandler;
3199 self->pStatus = st;
3200 self->pFileState = Closed;
3201 }
3202 return st;
3203 }
3204
3205 //------------------------------------------------------------------------
3206 // Fail a message
3207 //------------------------------------------------------------------------
3208 void FileStateHandler::FailMessage( RequestData rd, XRootDStatus status )
3209 {
3210 Log *log = DefaultEnv::GetLog();
3211 log->Dump( FileMsg, "[%p@%s] Failing message %s with %s",
3212 (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
3213 rd.request->GetObfuscatedDescription().c_str(),
3214 status.ToStr().c_str() );
3215
3216 StatefulHandler *sh = dynamic_cast<StatefulHandler*>(rd.handler);
3217 if( !sh )
3218 {
3219 Log *log = DefaultEnv::GetLog();
3220 log->Error( FileMsg, "[%p@%s] Internal error while recovering %s",
3221 (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
3222 rd.request->GetObfuscatedDescription().c_str() );
3223 return;
3224 }
3225
3226 JobManager *jobMan = DefaultEnv::GetPostMaster()->GetJobManager();
3227 ResponseHandler *userHandler = sh->GetUserHandler();
3228 jobMan->QueueJob( new ResponseJob(
3229 userHandler,
3230 new XRootDStatus( status ),
3231 0, rd.params.hostList ) );
3232
3233 delete sh;
3234 }
3235
3236 //----------------------------------------------------------------------------
3237 // Fail queued messages
3238 //----------------------------------------------------------------------------
3239 void FileStateHandler::FailQueuedMessages( XRootDStatus status )
3240 {
3241 RequestList::iterator it;
3242 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3243 FailMessage( *it, status );
3244 pToBeRecovered.clear();
3245 }
3246
3247 //------------------------------------------------------------------------
3248 // Re-send queued messages
3249 //------------------------------------------------------------------------
3250 void FileStateHandler::ReSendQueuedMessages()
3251 {
3252 RequestList::iterator it;
3253 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3254 {
3255 it->request->SetSessionId( pSessionId );
3256 ReWriteFileHandle( it->request );
3257 XRootDStatus st = IssueRequest( *pDataServer, it->request,
3258 it->handler, it->params );
3259 if( !st.IsOK() )
3260 FailMessage( *it, st );
3261 }
3262 pToBeRecovered.clear();
3263 }
3264
3265 //------------------------------------------------------------------------
3266 // Re-write file handle
//
// Overwrites the 4-byte file handle stored in a request with the current
// pFileHandle. For read, write, sync, truncate, pgread and pgwrite the handle
// lives in the request header; for readv and writev every chunk descriptor
// (starting at offset 24 of the message) carries its own copy. Requests with
// any other request id are left unchanged (the switch has no default).
//
// NOTE(review): this file also builds kXR_fattr and kXR_clone requests that
// carry a file handle and are sent through SendOrQueue, but there is no case
// for them here - confirm whether such requests can end up being re-sent
// after a recovery with a stale handle.
// NOTE(review): one line just before the closing brace is missing from this
// listing - confirm against the real source.
3267 //------------------------------------------------------------------------
3268 void FileStateHandler::ReWriteFileHandle( Message *msg )
3269 {
3270 ClientRequestHdr *hdr = (ClientRequestHdr*)msg->GetBuffer();
3271 switch( hdr->requestid )
3272 {
3273 case kXR_read:
3274 {
3275 ClientReadRequest *req = (ClientReadRequest*)msg->GetBuffer();
3276 memcpy( req->fhandle, pFileHandle, 4 );
3277 break;
3278 }
3279 case kXR_write:
3280 {
3281 ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
3282 memcpy( req->fhandle, pFileHandle, 4 );
3283 break;
3284 }
3285 case kXR_sync:
3286 {
3287 ClientSyncRequest *req = (ClientSyncRequest*)msg->GetBuffer();
3288 memcpy( req->fhandle, pFileHandle, 4 );
3289 break;
3290 }
3291 case kXR_truncate:
3292 {
3293 ClientTruncateRequest *req = (ClientTruncateRequest*)msg->GetBuffer();
3294 memcpy( req->fhandle, pFileHandle, 4 );
3295 break;
3296 }
3297 case kXR_readv:
3298 {
// the number of chunk descriptors is derived from the request's dlen
3299 ClientReadVRequest *req = (ClientReadVRequest*)msg->GetBuffer();
3300 readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
3301 for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3302 memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
3303 break;
3304 }
3305 case kXR_writev:
3306 {
3307 ClientWriteVRequest *req =
3308 reinterpret_cast<ClientWriteVRequest*>( msg->GetBuffer() );
3309 XrdProto::write_list *wrtList =
3310 reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
3311 size_t size = req->dlen / sizeof(XrdProto::write_list);
3312 for( size_t i = 0; i < size; ++i )
3313 memcpy( wrtList[i].fhandle, pFileHandle, 4 );
3314 break;
3315 }
3316 case kXR_pgread:
3317 {
3318 ClientPgReadRequest *req = (ClientPgReadRequest*) msg->GetBuffer();
3319 memcpy( req->fhandle, pFileHandle, 4 );
3320 break;
3321 }
3322 case kXR_pgwrite:
3323 {
3324 ClientPgWriteRequest *req = (ClientPgWriteRequest*) msg->GetBuffer();
3325 memcpy( req->fhandle, pFileHandle, 4 );
3326 break;
3327 }
3328 }
3329
// the log line is emitted for every request, including ids not handled above
3330 Log *log = DefaultEnv::GetLog();
3331 log->Dump( FileMsg, "[%p@%s] Rewritten file handle for %s to %#x",
3332 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), msg->GetObfuscatedDescription().c_str(),
3333 *((uint32_t*)pFileHandle) );
3335 }
3336
3337 //----------------------------------------------------------------------------
3338 // Dispatch monitoring information on close
3339 //----------------------------------------------------------------------------
3340 void FileStateHandler::MonitorClose( const XRootDStatus *status )
3341 {
3342 Monitor *mon = DefaultEnv::GetMonitor();
3343 if( mon )
3344 {
3345 Monitor::CloseInfo i;
3346 i.file = pFileUrl;
3347 i.oTOD = pOpenTime;
3348 gettimeofday( &i.cTOD, 0 );
3349 i.rBytes = pRBytes;
3350 i.vrBytes = pVRBytes;
3351 i.wBytes = pWBytes;
3352 i.vwBytes = pVWBytes;
3353 i.vSegs = pVSegs;
3354 i.rCount = pRCount;
3355 i.vCount = pVRCount;
3356 i.wCount = pWCount;
3357 i.status = status;
3358 mon->Event( Monitor::EvClose, &i );
3359 }
3360 }
3361
3362 XRootDStatus FileStateHandler::IssueRequest( const URL &url,
3363 Message *msg,
3364 ResponseHandler *handler,
3365 MessageSendParams &sendParams )
3366 {
3367 // first handle Metalinks
3368 if( pUseVirtRedirector && url.IsMetalink() )
3369 return MessageUtils::RedirectMessage( url, msg, handler,
3370 sendParams, pLFileHandler );
3371
3372 // than local file access
3373 if( url.IsLocalFile() )
3374 return pLFileHandler->ExecRequest( url, msg, handler, sendParams );
3375
3376 // and finally ordinary XRootD requests
3377 return MessageUtils::SendMessage( url, msg, handler,
3378 sendParams, pLFileHandler );
3379 }
3380
3381 //------------------------------------------------------------------------
3382 // Send a write request with payload being stored in a kernel buffer
//
// Under the file mutex: rejects the call with errInvalidOp unless the file is
// Opened or Recovering, then builds a kXR_write request for `length` bytes at
// `offset` whose payload is the given kernel buffer, and sends it (or queues
// it for recovery) through SendOrQueue.
//
// NOTE(review): two lines between the set-up of `params` and the creation of
// the StatefulHandler are missing from this listing - confirm against the
// real source.
3383 //------------------------------------------------------------------------
3384 XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &self,
3385 uint64_t offset,
3386 uint32_t length,
3387 std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3388 ResponseHandler *handler,
3389 time_t timeout )
3390 {
3391 //--------------------------------------------------------------------------
3392 // Create the write request
3393 //--------------------------------------------------------------------------
3394 XrdSysMutexHelper scopedLock( self->pMutex );
3395
3396 if( self->pFileState != Opened && self->pFileState != Recovering )
3397 return XRootDStatus( stError, errInvalidOp );
3398
3399 Log *log = DefaultEnv::GetLog();
3400 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
3401 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3402 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3403
3404 Message *msg;
3405 ClientWriteRequest *req;
3406 MessageUtils::CreateRequest( msg, req );
3407
3408 req->requestid = kXR_write;
3409 req->offset = offset;
3410 req->dlen = length;
3411 memcpy( req->fhandle, self->pFileHandle, 4 );
3412
3413 MessageSendParams params;
3414 params.timeout = timeout;
3415 params.followRedirects = false;
3416 params.stateful = true;
// ownership of the kernel buffer leaves the unique_ptr here; the raw pointer
// is carried in the send parameters together with a newly allocated, empty
// chunk list
3417 params.kbuff = kbuff.release();
3418 params.chunkList = new ChunkList();
3419
3421
3423 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
3424
3425 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3426 }
3427
3428 //------------------------------------------------------------------------
3429 // Fills in the file template value and optiont fields that need the
3430 // template (i.e. samefs and dup) in an Open message request
3431 //------------------------------------------------------------------------
3432 XRootDStatus FileStateHandler::FillFhTempl(
3433 std::shared_ptr<FileStateHandler> &self,
3434 const URL &url, Message *msg, URL &sendUrl)
3435 {
3436 ClientOpenRequest *req = (ClientOpenRequest*)msg->GetBuffer();
3437 sendUrl = url;
3438
3439 if( !self->NeedFileTempl() )
3440 {
3441 // template file not requireed
3442 return XRootDStatus();
3443 }
3444
3445 using wp = std::weak_ptr<FileStateHandler>;
3446 if( !self->pTemplateFileWp.owner_before(wp{}) &&
3447 !wp{}.owner_before(self->pTemplateFileWp) )
3448 {
3449 // no tempalte file was set
3450 return XRootDStatus( stError, errInvalidArgs, 0,
3451 "File flags required a template file" );
3452 }
3453
3454 // all the options that need template
3455 if( self->pOpenFlags & OpenFlags::Dup )
3456 req->optiont |= kXR_dup;
3457 if( self->pOpenFlags & OpenFlags::Samefs )
3458 req->optiont |= kXR_samefs;
3459
3460 std::shared_ptr<FileStateHandler> tfp = self->pTemplateFileWp.lock();
3461 if(!tfp)
3462 return XRootDStatus( stError, errInvalidArgs, 0,
3463 "Template file object does not exist" );
3464
3465 XrdSysMutexHelper scopedLock( tfp->pMutex );
3466
3467 if( tfp->pFileState != Opened )
3468 return XRootDStatus( stError, errInvalidOp, 0,
3469 "Template file not open" );
3470
3471 if (!tfp->pDataServer || !tfp->pFileHandle)
3472 return XRootDStatus( stError, errInvalidArgs, 0,
3473 "Template file not connected" );
3474
3475 sendUrl.SetHostPort( tfp->pDataServer->GetHostName(),tfp->pDataServer->GetPort() );
3476 sendUrl.SetUserName( tfp->pDataServer->GetUserName() );
3477 msg->SetSessionId( tfp->pSessionId );
3478 memcpy( req->fhtemplt, tfp->pFileHandle, sizeof(req->fhtemplt) );
3479
3480 if( !Utils::HasKSameFS( sendUrl ) )
3481 return XRootDStatus( stError, errNotSupported );
3482
3483 return XRootDStatus();
3484 }
3485
3486 //------------------------------------------------------------------------
3487 // Clone file ranges into current file
//
// Under the file mutex: builds a kXR_clone request for the current file
// handle with one clone_list entry per element of locs.locations (source file
// handle, source offset, source length, destination offset) and sends it (or
// queues it for recovery) through SendOrQueue. Every source must be an open
// template file that shares this file's session id.
//
// NOTE(review): several lines of this definition are missing from this
// listing (see the notes below) - confirm against the real source.
3488 //------------------------------------------------------------------------
3489 XRootDStatus FileStateHandler::Clone(std::shared_ptr<FileStateHandler> &self,
3490 const CloneLocations &locs,
3491 ResponseHandler *handler,
3492 time_t timeout )
3493 {
3494 XrdSysMutexHelper scopedLock( self->pMutex );
3495
3496 if( self->pFileState == Error ) return self->pStatus;
3497
3498 if( self->pFileState != Opened && self->pFileState != Recovering )
// NOTE(review): the statement controlled by the `if` above is not visible
// here (presumably an early error return); as shown it would govern the next
// `if` instead. The same applies to the HasKSameFS check that follows.
3500
3501 if( !Utils::HasKSameFS( *self->pDataServer ) )
3503
// NOTE(review): unlike the other log calls in this file, this one passes
// self.get() without a cast to void* and prints GetURL() rather than
// GetObfuscatedURL() - confirm whether that is intended.
3504 Log *log = DefaultEnv::GetLog();
3505 log->Debug( FileMsg, "[%p@%s] Sending a clone command for handle %#x to %s",
3506 self.get(), self->pFileUrl->GetURL().c_str(),
3507 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3508
3509 Message *msg;
3510 ClientReadRequest *req;
3511
3512 size_t nrange = locs.locations.size();
3513
// the request body is sized for one clone_list entry per location
3514 MessageUtils::CreateRequest( msg, req, sizeof(XrdProto::clone_list)*nrange );
3515
3516 req->requestid = kXR_clone;
3517 req->dlen = sizeof(XrdProto::clone_list)*nrange;
3518 memcpy( req->fhandle, self->pFileHandle, 4 );
3519
// NOTE(review): `cl` is used in the loop but its declaration is not visible
// here - presumably a pointer to the clone_list array in the message body;
// confirm.
// NOTE(review): `msg` was created above and does not appear to be released on
// the early returns inside the loop - confirm ownership.
3521 int idx=0;
3522 for(auto &loc: locs.locations)
3523 {
3524 if( !loc.file )
3525 return XRootDStatus( stError, errInvalidOp, 0,
3526 "Template file not available" );
3527
3528 FileStateHandlerTemplate *fht = dynamic_cast<FileStateHandlerTemplate*>(loc.file.get());
3529 if( !fht )
3530 return XRootDStatus( stError, errInvalidOp, 0,
3531 "Template file invalid" );
3532
3533 std::shared_ptr<FileStateHandler> tfp = fht->pTemplateFileWp.lock();
3534 if( !tfp )
3535 return XRootDStatus( stError, errInvalidOp, 0,
3536 "Template file object does not exist" );
3537
// this lock shadows the outer scopedLock and is held for the rest of the
// iteration; the source file's state is read under its own mutex
3538 XrdSysMutexHelper scopedLock( tfp->pMutex );
3539 if( tfp->pFileState != Opened )
3540 return XRootDStatus( stError, errInvalidOp, 0,
3541 "Template file not open" );
3542
3543 if( tfp->pSessionId != self->pSessionId )
3544 return XRootDStatus( stError, errInvalidOp, 0,
3545 "Clone source not at same location as destination" );
3546
3547 memcpy( cl[idx].srcFH, tfp->pFileHandle, 4 );
3548 cl[idx].srcOffs = loc.srcOffs;
3549 cl[idx].srcLen = loc.srcLen;
3550 cl[idx].dstOffs = loc.dstOffs;
3551 ++idx;
3552 }
3553
3555 MessageSendParams params;
3556 params.timeout = timeout;
3557 params.followRedirects = false;
3558 params.stateful = true;
3560 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
3561
3562 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3563 }
3564}
kXR_unt16 requestid
Definition XProtocol.hh:511
kXR_unt16 requestid
Definition XProtocol.hh:666
kXR_unt16 requestid
Definition XProtocol.hh:847
@ kXR_fattrDel
Definition XProtocol.hh:300
@ kXR_fattrSet
Definition XProtocol.hh:303
@ kXR_fattrList
Definition XProtocol.hh:302
@ kXR_fattrGet
Definition XProtocol.hh:301
#define kXR_suppgrw
kXR_char fhandle[4]
Definition XProtocol.hh:565
kXR_char fhandle[4]
Definition XProtocol.hh:823
struct ClientPgReadRequest pgread
Definition XProtocol.hh:903
kXR_char fhandle[4]
Definition XProtocol.hh:848
kXR_char fhandle[4]
Definition XProtocol.hh:812
kXR_unt16 requestid
Definition XProtocol.hh:680
@ kXR_virtReadv
Definition XProtocol.hh:152
kXR_char fhtemplt[4]
Definition XProtocol.hh:516
kXR_unt16 options
Definition XProtocol.hh:513
static const int kXR_ckpXeq
Definition XProtocol.hh:218
struct ClientPgWriteRequest pgwrite
Definition XProtocol.hh:904
kXR_unt16 requestid
Definition XProtocol.hh:257
@ kXR_async
Definition XProtocol.hh:488
@ kXR_open_apnd
Definition XProtocol.hh:492
@ kXR_retstat
Definition XProtocol.hh:493
struct ClientRequestHdr header
Definition XProtocol.hh:887
kXR_char fhandle[4]
Definition XProtocol.hh:543
#define kXR_recoverWrts
kXR_unt16 optiont
Definition XProtocol.hh:514
kXR_char fhandle[4]
Definition XProtocol.hh:681
kXR_char fhandle[4]
Definition XProtocol.hh:258
kXR_unt16 requestid
Definition XProtocol.hh:159
kXR_char fhandle[4]
Definition XProtocol.hh:669
@ kXR_read
Definition XProtocol.hh:126
@ kXR_open
Definition XProtocol.hh:123
@ kXR_writev
Definition XProtocol.hh:144
@ kXR_clone
Definition XProtocol.hh:145
@ kXR_readv
Definition XProtocol.hh:138
@ kXR_sync
Definition XProtocol.hh:129
@ kXR_fattr
Definition XProtocol.hh:133
@ kXR_query
Definition XProtocol.hh:114
@ kXR_write
Definition XProtocol.hh:132
@ kXR_truncate
Definition XProtocol.hh:141
@ kXR_stat
Definition XProtocol.hh:130
@ kXR_pgread
Definition XProtocol.hh:143
@ kXR_chkpoint
Definition XProtocol.hh:125
@ kXR_close
Definition XProtocol.hh:116
@ kXR_pgwrite
Definition XProtocol.hh:139
struct ClientReadRequest read
Definition XProtocol.hh:909
kXR_int32 rlen
Definition XProtocol.hh:696
kXR_unt16 requestid
Definition XProtocol.hh:808
kXR_unt16 requestid
Definition XProtocol.hh:822
kXR_int64 offset
Definition XProtocol.hh:697
#define kXR_PROTPGRWVERSION
Definition XProtocol.hh:73
@ kXR_dup
Definition XProtocol.hh:503
@ kXR_samefs
Definition XProtocol.hh:504
struct ClientWriteRequest write
Definition XProtocol.hh:918
kXR_unt16 requestid
Definition XProtocol.hh:706
@ kXR_Qvisa
Definition XProtocol.hh:656
unsigned char kXR_char
Definition XPtypes.hh:65
static int mapError(int rc)
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
uint32_t GetSize() const
Get the size of the message.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, time_t timeout)
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
An interface for file plug-ins.
std::weak_ptr< FileStateHandler > pTemplateFileWp
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, time_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, time_t timeout)
Try other data server.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Clone(std::shared_ptr< FileStateHandler > &self, const CloneLocations &locs, ResponseHandler *handler, time_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, QueryCode::Code queryCode, const Buffer &arg, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, time_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, time_t timeout=0)
FileStateHandler(FilePlugIn *&plugin)
Constructor.
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, time_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Opened
Opening has succeeded.
@ Recovering
Recovering from an error.
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, time_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, OpenFlags::Flags flags, uint16_t mode, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus OpenUsingTemplate(std::shared_ptr< FileStateHandler > &self, ExportedFileTemplate *templ, const std::string &url, OpenFlags::Flags flags, uint16_t mode, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PreRead(std::shared_ptr< FileStateHandler > &self, const TractList &tracts, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, time_t timeout=0)
bool IsOpen() const
Check if the file is open.
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
An abstract class to describe the client-side monitoring plugin interface.
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4bytes).
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object used by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Call the user callback.
Object stat info.
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
bool IsMetalink() const
Is it a URL to a metalink.
Definition XrdClURL.cc:465
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
void SetHostPort(const std::string &hostName, int port)
Definition XrdClURL.hh:206
std::string GetPathWithFilteredParams() const
Get the path with params, filters out 'xrdcl.'.
Definition XrdClURL.cc:331
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
Definition XrdClURL.cc:498
void SetPath(const std::string &path)
Set the path.
Definition XrdClURL.hh:225
bool IsLocalFile() const
Definition XrdClURL.cc:474
void SetUserName(const std::string &userName)
Set the username.
Definition XrdClURL.hh:143
static bool HasKSameFS(const XrdCl::URL &url)
Check if given server supports kXR_clone and kXR_samefs.
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Set the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
const uint16_t errTlsError
const uint16_t errOperationExpired
const uint16_t errPollerError
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInProgress
const uint16_t errSocketTimeout
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidOp
const uint64_t FileMsg
const uint16_t suAlreadyDone
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
std::vector< TractInfo > TractList
List of Tracts.
const uint16_t errInvalidArgs
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
const uint16_t errNotSupported
const uint16_t errSocketError
const uint16_t errOperationInterrupted
const uint16_t errInvalidSession
T & To(AnyObject &any)
const uint16_t errRedirect
const uint16_t errSocketDisconnected
none object for initializing empty Optional
XrdSysError Log
Definition XrdConfig.cc:113
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
static const int aData
Definition XProtocol.hh:328
kXR_char fhandle[4]
Definition XProtocol.hh:318
kXR_unt16 requestid
Definition XProtocol.hh:317
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< CloneLocation > locations
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
uint64_t vwBytes
Total number of bytes written via writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
uint16_t oFlags2
OpenFlags upper 16 bits.
std::string dataServer
Actual data server.
uint16_t oFlags
OpenFlags.
Open flags, may be or'd when appropriate.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Samefs
Open file on the same filesystem as another.
@ Update
Open for reading and writing.
@ Dup
Open file duplicating content from another.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
Code
XRootD query request codes.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted