XRootD
Loading...
Searching...
No Matches
XrdClParallelOperation.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3// Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4// Michal Simon <michal.simon@cern.ch>
5//------------------------------------------------------------------------------
6// This file is part of the XRootD software suite.
7//
8// XRootD is free software: you can redistribute it and/or modify
9// it under the terms of the GNU Lesser General Public License as published by
10// the Free Software Foundation, either version 3 of the License, or
11// (at your option) any later version.
12//
13// XRootD is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16// GNU General Public License for more details.
17//
18// You should have received a copy of the GNU Lesser General Public License
19// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20//
21// In applying this licence, CERN does not waive the privileges and immunities
22// granted to it by virtue of its status as an Intergovernmental Organization
23// or submit itself to any jurisdiction.
24//------------------------------------------------------------------------------
25
26#ifndef __XRD_CL_PARALLELOPERATION_HH__
27#define __XRD_CL_PARALLELOPERATION_HH__
28
34
35#include <atomic>
36#include <condition_variable>
37#include <mutex>
38
39namespace XrdCl
40{
41
42 //----------------------------------------------------------------------------
43 // Interface for different execution policies:
44 // - all : all operations need to succeed in order for the parallel
45 // operation to be successful
46 // - any : just one of the operations needs to succeed in order for
47 // the parallel operation to be successful
48 // - some : n (user defined) operations need to succeed in order for
49 // the parallel operation to be successful
50 // - at least : at least n (user defined) operations need to succeed in
51 // order for the parallel operation to be successful (the
52 // user handler will be called only when all operations are
53 // resolved)
54 //
55 // @param status : status returned by one of the aggregated operations
56 //
57 // @return : true if the status should be passed to the user handler,
58 // false otherwise.
59 //----------------------------------------------------------------------------
61 {
63 {
64 }
65
66 virtual bool Examine( const XrdCl::XRootDStatus &status ) = 0;
67
68 virtual XRootDStatus Result() = 0;
69 };
70
71 //----------------------------------------------------------------------------
77 //----------------------------------------------------------------------------
78 template<bool HasHndl>
79 class ParallelOperation: public ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>
80 {
81 template<bool> friend class ParallelOperation;
82
83 public:
84
85 //------------------------------------------------------------------------
87 //------------------------------------------------------------------------
88 template<bool from>
90 ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>( std::move( obj ) ),
91 pipelines( std::move( obj.pipelines ) ),
92 policy( std::move( obj.policy ) )
93 {
94 }
95
96 //------------------------------------------------------------------------
102 //------------------------------------------------------------------------
103 template<class Container>
104 ParallelOperation( Container &&container )
105 {
106 static_assert( !HasHndl, "Constructor is available only operation without handler");
107
108 pipelines.reserve( container.size() );
109 auto begin = std::make_move_iterator( container.begin() );
110 auto end = std::make_move_iterator( container.end() );
111 std::copy( begin, end, std::back_inserter( pipelines ) );
112 container.clear(); // there's junk inside so we clear it
113 }
114
116 {
117 }
118
119 //------------------------------------------------------------------------
121 //------------------------------------------------------------------------
122 std::string ToString()
123 {
124 std::ostringstream oss;
125 oss << "Parallel(";
126 for( size_t i = 0; i < pipelines.size(); i++ )
127 {
128 oss << pipelines[i]->ToString();
129 if( i + 1 != pipelines.size() )
130 {
131 oss << " && ";
132 }
133 }
134 oss << ")";
135 return oss.str();
136 }
137
138 //------------------------------------------------------------------------
143 //------------------------------------------------------------------------
145 {
146 policy.reset( new AllPolicy() );
147 return std::move( *this );
148 }
149
150 //------------------------------------------------------------------------
155 //------------------------------------------------------------------------
157 {
158 policy.reset( new AnyPolicy( pipelines.size() ) );
159 return std::move( *this );
160 }
161
162 //------------------------------------------------------------------------
163 // Set policy to `Some`
167 //------------------------------------------------------------------------
169 {
170 policy.reset( new SomePolicy( pipelines.size(), threshold ) );
171 return std::move( *this );
172 }
173
174 //------------------------------------------------------------------------
180 //------------------------------------------------------------------------
182 {
183 policy.reset( new AtLeastPolicy( pipelines.size(), threshold ) );
184 return std::move( *this );
185 }
186
187 private:
188
189 //------------------------------------------------------------------------
194 //------------------------------------------------------------------------
195 struct AllPolicy : public PolicyExecutor
196 {
197 bool Examine( const XrdCl::XRootDStatus &status )
198 {
199 std::unique_lock<std::mutex> lck(resMtx);
200
201 // keep the status in case this is the final result
202 res = status;
203 if( status.IsOK() ) return false;
204 // we require all request to succeed
205 return true;
206 }
207
208 XRootDStatus Result()
209 {
210 std::unique_lock<std::mutex> lck(resMtx);
211 return res;
212 }
213
214 std::mutex resMtx;
215 XRootDStatus res;
216 };
217
218 //------------------------------------------------------------------------
223 //------------------------------------------------------------------------
224 struct AnyPolicy : public PolicyExecutor
225 {
226 AnyPolicy( size_t size) : cnt( size )
227 {
228 }
229
230 bool Examine( const XrdCl::XRootDStatus &status )
231 {
232 std::unique_lock<std::mutex> lck(resMtx);
233
234 // keep the status in case this is the final result
235 res = status;
236 // decrement the counter
237 --cnt;
238 // we require just one operation to be successful
239 if( status.IsOK() ) return true;
240 // lets see if this is the last one?
241 if( cnt == 0 ) return true;
242 // we still have a chance there will be one that is successful
243 return false;
244 }
245
246 XRootDStatus Result()
247 {
248 std::unique_lock<std::mutex> lck(resMtx);
249 return res;
250 }
251
252 private:
253 std::mutex resMtx;
254 size_t cnt;
255 XRootDStatus res;
256 };
257
258 //------------------------------------------------------------------------
263 //------------------------------------------------------------------------
264 struct SomePolicy : PolicyExecutor
265 {
266 SomePolicy( size_t size, size_t threshold ) : failed( 0 ), succeeded( 0 ),
267 threshold( threshold ), size( size )
268 {
269 }
270
271 bool Examine( const XrdCl::XRootDStatus &status )
272 {
273 std::unique_lock<std::mutex> resMtx;
274
275 // keep the status in case this is the final result
276 res = status;
277 if( status.IsOK() )
278 {
279 ++succeeded;
280 if( succeeded == threshold ) return true; // we reached the threshold
281 // we are not yet there
282 return false;
283 }
284 ++failed;
285 // did we drop below the threshold
286 if( failed > size - threshold ) return true;
287 // we still have a chance there will be enough of successful operations
288 return false;
289 }
290
291 XRootDStatus Result()
292 {
293 std::unique_lock<std::mutex> lck(resMtx);
294 return res;
295 }
296
297 private:
298 std::mutex resMtx;
299 size_t failed;
300 size_t succeeded;
301 const size_t threshold;
302 const size_t size;
303 XRootDStatus res;
304 };
305
306 //------------------------------------------------------------------------
312 //------------------------------------------------------------------------
313 struct AtLeastPolicy : PolicyExecutor
314 {
315 AtLeastPolicy( size_t size, size_t threshold ) : pending_cnt( size ),
316 failed_cnt( 0 ),
317 failed_threshold( size - threshold )
318 {
319 }
320
321 //----------------------------------------------------------------------
325 //----------------------------------------------------------------------
326 bool Examine( const XrdCl::XRootDStatus &status )
327 {
328 std::unique_lock<std::mutex> lck(resMtx);
329
330 if (!status.IsOK()) {
331 ++failed_cnt;
332 if (failed_cnt > failed_threshold) {
333 res = status;
334 return true;
335 }
336 }
337
338 --pending_cnt;
339 return pending_cnt == 0;
340 }
341
342 XRootDStatus Result()
343 {
344 std::unique_lock<std::mutex> lck(resMtx);
345 return res;
346 }
347
348 private:
349 std::mutex resMtx;
350 size_t pending_cnt;
351 size_t failed_cnt;
352 const size_t failed_threshold;
353 XRootDStatus res;
354 };
355
356 //------------------------------------------------------------------------
358 //------------------------------------------------------------------------
359 struct barrier_t
360 {
361 barrier_t() : on( true ) { }
362
363 void wait()
364 {
365 std::unique_lock<std::mutex> lck( mtx );
366 if( on ) cv.wait( lck );
367 }
368
369 void lift()
370 {
371 std::unique_lock<std::mutex> lck( mtx );
372 on = false;
373 cv.notify_all();
374 }
375
376 private:
377 std::condition_variable cv;
378 std::mutex mtx;
379 bool on;
380 };
381
382 //------------------------------------------------------------------------
387 //------------------------------------------------------------------------
      //------------------------------------------------------------------------
      //! Shared context of the parallel operation: owns the user handler,
      //! the execution policy and the startup barrier. Shared (via
      //! shared_ptr) between all sub-pipeline callbacks.
      //------------------------------------------------------------------------
      struct Ctx
      {
        //----------------------------------------------------------------------
        //! Constructor
        //!
        //! @param handler : user handler to be called once the parallel
        //!                  operation is resolved (ownership is taken)
        //! @param policy  : policy deciding when the operation is resolved
        //!                  (ownership is taken)
        //----------------------------------------------------------------------
        Ctx( PipelineHandler *handler, PolicyExecutor *policy ): handler( handler ),
                                                                 policy( policy )
        {
        }

        //----------------------------------------------------------------------
        //! Destructor: if the handler has not fired yet (e.g. the policy never
        //! reported the operation as resolved), resolve it with a default
        //! (OK) status. Handle() is a no-op if the handler was already used.
        //----------------------------------------------------------------------
        ~Ctx()
        {
          Handle( XRootDStatus() );
        }

        //----------------------------------------------------------------------
        //! Forward the status of one aggregated operation to the policy and,
        //! if the policy declares the parallel operation resolved, call the
        //! user handler with the final result.
        //!
        //! @param st : status of one of the aggregated operations
        //----------------------------------------------------------------------
        inline void Examine( const XRootDStatus &st )
        {
          if( policy->Examine( st ) )
            Handle( policy->Result() );
        }

        //----------------------------------------------------------------------
        //! Call the user handler with the final status. The atomic exchange
        //! claims the handler, guaranteeing it is invoked at most once even
        //! if several threads race here (destructor vs. callbacks).
        //!
        //! @param st : the final status of the parallel operation
        //---------------------------------------------------------------------
        inline void Handle( const XRootDStatus &st )
        {
          PipelineHandler* hdlr = handler.exchange( nullptr, std::memory_order_relaxed );
          if( hdlr )
          {
            // block until RunImpl has finished scheduling all sub-pipelines,
            // so the user callback never fires before Run() returned
            barrier.wait();
            hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
          }
        }

        //----------------------------------------------------------------------
        //! The user handler; reset to nullptr once it has been claimed.
        //----------------------------------------------------------------------
        std::atomic<PipelineHandler*> handler;

        //----------------------------------------------------------------------
        //! The execution policy of the parallel operation.
        //----------------------------------------------------------------------
        std::unique_ptr<PolicyExecutor> policy;

        //----------------------------------------------------------------------
        //! Barrier lifted by RunImpl once all sub-pipelines were started;
        //! keeps the user handler from firing prematurely.
        //----------------------------------------------------------------------
        barrier_t barrier;
      };
452
453 //------------------------------------------------------------------------
455 //------------------------------------------------------------------------
456 struct PipelineEnd : public Job
457 {
458 //----------------------------------------------------------------------
459 // Constructor
460 //----------------------------------------------------------------------
461 PipelineEnd( std::shared_ptr<Ctx> &ctx,
462 const XrdCl::XRootDStatus &st ) : ctx( ctx ), st( st )
463 {
464 }
465
466 //----------------------------------------------------------------------
467 // Run Ctx::Examine in the thread-pool
468 //----------------------------------------------------------------------
469 void Run( void* )
470 {
471 ctx->Examine( st );
472 delete this;
473 }
474
475 private:
476 std::shared_ptr<Ctx> ctx; //< ParallelOperaion context
477 XrdCl::XRootDStatus st; //< final status of the ParallelOperation
478 };
479
480 //------------------------------------------------------------------------
482 //------------------------------------------------------------------------
483 inline static
484 void Schedule( std::shared_ptr<Ctx> &ctx, const XrdCl::XRootDStatus &st)
485 {
486 XrdCl::JobManager *mgr = XrdCl::DefaultEnv::GetPostMaster()->GetJobManager();
487 PipelineEnd *end = new PipelineEnd( ctx, st );
488 mgr->QueueJob( end, nullptr );
489 }
490
491 //------------------------------------------------------------------------
497 //------------------------------------------------------------------------
498 XRootDStatus RunImpl( PipelineHandler *handler, time_t pipelineTimeout )
499 {
500 // make sure we have a valid policy for the parallel operation
501 if( !policy ) policy.reset( new AllPolicy() );
502
503 std::shared_ptr<Ctx> ctx =
504 std::make_shared<Ctx>( handler, policy.release() );
505
506 time_t timeout = pipelineTimeout < this->timeout ?
507 pipelineTimeout : this->timeout;
508
509 for( size_t i = 0; i < pipelines.size(); ++i )
510 {
511 if( !pipelines[i] ) continue;
512 pipelines[i].Run( timeout,
513 [ctx]( const XRootDStatus &st ) mutable { Schedule( ctx, st ); } );
514 }
515
516 ctx->barrier.lift();
517 return XRootDStatus();
518 }
519
520 std::vector<Pipeline> pipelines;
521 std::unique_ptr<PolicyExecutor> policy;
522 };
523
524 //----------------------------------------------------------------------------
526 //----------------------------------------------------------------------------
527 template<class Container>
528 inline ParallelOperation<false> Parallel( Container &&container )
529 {
530 return ParallelOperation<false>( container );
531 }
532
533 //----------------------------------------------------------------------------
535 //----------------------------------------------------------------------------
  inline void PipesToVec( std::vector<Pipeline>& )
  {
    // base case: the parameter pack has been exhausted, nothing to append
  }
540
541 //----------------------------------------------------------------------------
542 // Declare PipesToVec (we need to do declare those functions ahead of
543 // definitions, as they may call each other.
544 //----------------------------------------------------------------------------
  // Overload for an operation without a handler attached
  template<typename ... Others>
  inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
                          Others&... others );

  // Overload for an operation with a handler attached
  template<typename ... Others>
  inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
                          Others&... others );

  // Overload for a ready-made pipeline (moved into the vector)
  template<typename ... Others>
  inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
                          Others&... others );
556
557 //----------------------------------------------------------------------------
558 // Define PipesToVec
559 //----------------------------------------------------------------------------
  template<typename ... Others>
  void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
                   Others&... others )
  {
    // wrap the handler-less operation in a Pipeline, then recurse on the rest
    v.emplace_back( operation );
    PipesToVec( v, others... );
  }
567
  template<typename ... Others>
  void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
                   Others&... others )
  {
    // wrap the handled operation in a Pipeline, then recurse on the rest
    v.emplace_back( operation );
    PipesToVec( v, others... );
  }
575
  template<typename ... Others>
  void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
                   Others&... others )
  {
    // move the ready-made pipeline into the vector, then recurse on the rest
    v.emplace_back( std::move( pipeline ) );
    PipesToVec( v, others... );
  }
583
584 //----------------------------------------------------------------------------
589 //----------------------------------------------------------------------------
590 template<typename ... Operations>
591 inline ParallelOperation<false> Parallel( Operations&& ... operations )
592 {
593 constexpr size_t size = sizeof...( operations );
594 std::vector<Pipeline> v;
595 v.reserve( size );
596 PipesToVec( v, operations... );
597 return Parallel( v );
598 }
599}
600
#endif // __XRD_CL_PARALLELOPERATION_HH__
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
friend class PipelineHandler
std::unique_ptr< PipelineHandler > handler
Operation handler.
ParallelOperation< HasHndl > Some(size_t threshold)
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in different state.
ParallelOperation(Container &&container)
ParallelOperation< HasHndl > All()
ParallelOperation< HasHndl > Any()
ParallelOperation< HasHndl > AtLeast(size_t threshold)
JobManager * GetJobManager()
Get the job manager object user by the post master.
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
virtual XRootDStatus Result()=0
virtual bool Examine(const XrdCl::XRootDStatus &status)=0
bool IsOK() const
We're fine.