91 pipelines( std::move( obj.pipelines ) ),
92 policy( std::move( obj.policy ) )
103 template<
class Container>
106 static_assert( !HasHndl,
"Constructor is available only operation without handler");
108 pipelines.reserve( container.size() );
109 auto begin = std::make_move_iterator( container.begin() );
110 auto end = std::make_move_iterator( container.end() );
111 std::copy( begin, end, std::back_inserter( pipelines ) );
124 std::ostringstream oss;
126 for(
size_t i = 0; i < pipelines.size(); i++ )
128 oss << pipelines[i]->ToString();
129 if( i + 1 != pipelines.size() )
146 policy.reset(
new AllPolicy() );
147 return std::move( *
this );
158 policy.reset(
new AnyPolicy( pipelines.size() ) );
159 return std::move( *
this );
170 policy.reset(
new SomePolicy( pipelines.size(), threshold ) );
171 return std::move( *
this );
183 policy.reset(
new AtLeastPolicy( pipelines.size(), threshold ) );
184 return std::move( *
this );
199 std::unique_lock<std::mutex> lck(resMtx);
203 if( status.
IsOK() )
return false;
210 std::unique_lock<std::mutex> lck(resMtx);
224 struct AnyPolicy :
public PolicyExecutor
226 AnyPolicy(
size_t size) : cnt( size )
230 bool Examine(
const XrdCl::XRootDStatus &status )
232 std::unique_lock<std::mutex> lck(resMtx);
239 if( status.
IsOK() )
return true;
241 if( cnt == 0 )
return true;
248 std::unique_lock<std::mutex> lck(resMtx);
264 struct SomePolicy : PolicyExecutor
266 SomePolicy(
size_t size,
size_t threshold ) : failed( 0 ), succeeded( 0 ),
267 threshold( threshold ), size( size )
271 bool Examine(
const XrdCl::XRootDStatus &status )
// Take the policy mutex before touching the counters. The previous line
// declared an empty std::unique_lock named `resMtx`, which shadowed the
// mutex and locked nothing, so concurrent calls raced on the counters
// compared below.
273 std::unique_lock<std::mutex> lck(resMtx);
280 if( succeeded == threshold )
return true;
286 if( failed > size - threshold )
return true;
293 std::unique_lock<std::mutex> lck(resMtx);
301 const size_t threshold;
313 struct AtLeastPolicy : PolicyExecutor
315 AtLeastPolicy(
size_t size,
size_t threshold ) : pending_cnt( size ),
317 failed_threshold( size - threshold )
326 bool Examine(
const XrdCl::XRootDStatus &status )
328 std::unique_lock<std::mutex> lck(resMtx);
330 if (!status.
IsOK()) {
332 if (failed_cnt > failed_threshold) {
339 return pending_cnt == 0;
344 std::unique_lock<std::mutex> lck(resMtx);
352 const size_t failed_threshold;
361 barrier_t() : on( true ) { }
365 std::unique_lock<std::mutex> lck( mtx );
// std::condition_variable::wait may return spuriously, so re-check the flag
// after every wake-up instead of waiting only once.
// NOTE(review): assumes the release path clears `on` before notifying - confirm.
366 while( on ) cv.wait( lck );
371 std::unique_lock<std::mutex> lck( mtx );
377 std::condition_variable cv;
395 Ctx(
PipelineHandler *handler, PolicyExecutor *policy ): handler( handler ),
405 Handle( XRootDStatus() );
414 inline void Examine(
const XRootDStatus &st )
416 if( policy->Examine( st ) )
417 Handle( policy->Result() );
426 inline void Handle(
const XRootDStatus &st )
428 PipelineHandler* hdlr = handler.exchange(
nullptr, std::memory_order_relaxed );
432 hdlr->HandleResponse(
new XRootDStatus( st ),
nullptr );
439 std::atomic<PipelineHandler*> handler;
444 std::unique_ptr<PolicyExecutor> policy;
456 struct PipelineEnd :
public Job
461 PipelineEnd( std::shared_ptr<Ctx> &ctx,
462 const XrdCl::XRootDStatus &st ) : ctx( ctx ), st( st )
476 std::shared_ptr<Ctx> ctx;
477 XrdCl::XRootDStatus st;
484 void Schedule( std::shared_ptr<Ctx> &ctx,
const XrdCl::XRootDStatus &st)
487 PipelineEnd *end =
new PipelineEnd( ctx, st );
501 if( !policy ) policy.reset(
new AllPolicy() );
503 std::shared_ptr<Ctx> ctx =
504 std::make_shared<Ctx>(
handler, policy.release() );
507 pipelineTimeout : this->
timeout;
509 for(
size_t i = 0; i < pipelines.size(); ++i )
511 if( !pipelines[i] )
continue;
513 [ctx](
const XRootDStatus &st )
mutable { Schedule( ctx, st ); } );
517 return XRootDStatus();
520 std::vector<Pipeline> pipelines;
521 std::unique_ptr<PolicyExecutor> policy;
JobManager * GetJobManager()
Get the job manager object used by the post master.