36#include <unordered_set>
66 env->
GetInt(
"WorkerThreads", workerThreads );
102 typedef std::map<std::string, std::shared_ptr<Channel> >
ChannelMap;
145 env->
GetString(
"PollerPreference", pollerPref );
149 if( !pImpl->pPoller )
152 bool st = pImpl->pPoller->Initialize();
156 delete pImpl->pPoller;
160 pImpl->pJobManager->Initialize();
161 pImpl->pInitialized =
true;
173 if( !pImpl->pInitialized )
176 pImpl->pInitialized =
false;
177 pImpl->pJobManager->Finalize();
184 auto finSet = pImpl->pFinalizeSet;
185 for(
auto ch: finSet ) ch->Finalize();
187 pImpl->pChannelMap.clear();
188 return pImpl->pPoller->Finalize();
196 if( !pImpl->pInitialized )
199 if( !pImpl->pPoller->Start() )
202 if( !pImpl->pTaskManager->Start() )
204 pImpl->pPoller->Stop();
208 if( !pImpl->pJobManager->Start() )
210 pImpl->pPoller->Stop();
211 pImpl->pTaskManager->Stop();
215 pImpl->pRunning =
true;
224 if( !pImpl->pInitialized || !pImpl->pRunning )
227 if( !pImpl->pJobManager->Stop() )
229 if( !pImpl->pPoller->Stop() )
231 if( !pImpl->pTaskManager->Stop() )
233 pImpl->pRunning =
false;
254 auto channel = pImpl->GetChannel( url );
259 return channel->Send( msg, handler, stateful, expires );
280 std::shared_ptr<Channel> channel;
283 PostMasterImpl::ChannelMap::iterator it =
285 if( it == pImpl->pChannelMap.end() )
287 channel = it->second;
293 return channel->QueryTransport( query, result );
302 auto channel = pImpl->GetChannel( url );
307 channel->RegisterEventHandler( handler );
317 auto channel = pImpl->GetChannel( url );
322 channel->RemoveEventHandler( handler );
331 return pImpl->pTaskManager;
339 return pImpl->pJobManager;
355 std::shared_ptr<Channel> channel;
358 PostMasterImpl::ChannelMap::iterator it =
361 if( it == pImpl->pChannelMap.end() )
363 channel = it->second;
364 pImpl->pChannelMap.erase( it );
367 channel->ForceDisconnect( hush );
375 const uint64_t sess )
382 PostMasterImpl::ChannelMap::iterator it =
383 pImpl->pChannelMap.find( channel->GetURL().GetChannelId() );
385 if( it != pImpl->pChannelMap.end() && it->second == channel )
386 pImpl->pChannelMap.erase( it );
389 channel->ForceDisconnect( channel, sess );
395 std::shared_ptr<Channel> channel;
398 PostMasterImpl::ChannelMap::iterator it =
401 if( it == pImpl->pChannelMap.end() )
403 channel = it->second;
406 channel->ForceReconnect();
415 auto channel = pImpl->GetChannel( url );
416 if( !channel )
return 0;
417 return channel->NbConnectedStrm();
424 std::shared_ptr<Job> onConnJob )
426 auto channel = pImpl->GetChannel( url );
427 if( !channel )
return;
428 channel->SetOnDataConnectHandler( onConnJob );
437 pImpl->pOnConnJob = std::move( onConnJob );
446 pImpl->pOnConnErrCB = std::move( handler );
455 if( pImpl->pOnConnJob )
457 URL *ptr =
new URL( url );
458 pImpl->pJobManager->QueueJob( pImpl->pOnConnJob.get(), ptr );
468 if( pImpl->pOnConnErrCB )
471 pImpl->pJobManager->QueueJob( job,
nullptr );
484 std::shared_ptr<Channel> passive;
485 PostMasterImpl::ChannelMap::iterator it =
487 if( it != pImpl->pChannelMap.end() )
488 passive = it->second;
493 if( !passive )
return;
501 if( !passive->CanCollapse( url ) )
return;
503 scopedLock.
Lock( &pImpl->pChannelMapMutex );
505 if( it == pImpl->pChannelMap.end() || it->second != passive )
531 std::shared_ptr<Channel> active(
new Channel{ alias,
532 pImpl->pPoller, trHandler, pImpl->pTaskManager, pImpl->pJobManager, url },
533 [
this](
Channel *ch) { this->pImpl->removeFinalize( ch );
delete ch; });
534 pImpl->addFinalize( active.get() );
535 active->SetSelf( active );
548 auto channel = pImpl->GetChannel( url );
550 if( !channel )
return;
552 return channel->DecFileInstCnt();
560 return pImpl->pRunning;
569 std::shared_ptr<Channel> channel;
590 channel->SetSelf( channel );
595 channel = it->second;
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
bool GetInt(const std::string &key, int &value)
Interface for a job to be run by the job manager.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Info(uint64_t topic, const char *format,...)
Print an info.
The message representation used throughout the system.
static Poller * CreatePoller(const std::string &preference)
Interface for socket pollers.
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
bool Start()
Start the post master.
bool Finalize()
Finalizer.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Status ForceReconnect(const URL &url)
Reconnect the channel.
bool Stop()
Stop the postmaster.
bool Reinitialize()
Reinitialize after fork.
TaskManager * GetTaskManager()
Get the task manager object used by the post master.
uint16_t NbConnectedStrm(const URL &url)
Get the number of connected data streams.
void SetOnConnectHandler(std::unique_ptr< Job > onConnJob)
Set the global on-connect handler.
Status RemoveEventHandler(const URL &url, ChannelEventHandler *handler)
Remove a channel event handler.
virtual ~PostMaster()
Destructor.
void SetConnectionErrorHandler(std::function< void(const URL &, const XRootDStatus &)> handler)
Set the global on-error on-connect handler for control streams.
Status ForceDisconnect(const URL &url)
Shut down a channel.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
Status RegisterEventHandler(const URL &url, ChannelEventHandler *handler)
Register channel event handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
JobManager * GetJobManager()
Get the job manager object used by the post master.
bool Initialize()
Initializer.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Perform the handshake and the authentication for each physical stream.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
std::string GetChannelId() const
std::string GetHostId() const
Get the host part of the URL (user:password@host:port).
const std::string & GetProtocol() const
Get the protocol.
An interface for metadata redirectors.
virtual XRootDStatus HandleRequest(const Message *msg, MsgHandler *handler)=0
void Lock(XrdSysMutex *Mutex)
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const uint16_t errInvalidOp
const char *const DefaultPollerPreference
const uint16_t errNotSupported
const int DefaultWorkerThreads
void Run(void *arg)
The job logic.
ConnErrJob(const URL &url, const XRootDStatus &status, std::function< void(const URL &, const XRootDStatus &)> handler)
std::function< void(const URL &, const XRootDStatus &)> handler
TaskManager * pTaskManager
std::map< std::string, std::shared_ptr< Channel > > ChannelMap
XrdSysMutex pFinalizeSetMutex
std::unique_ptr< Job > pOnConnJob
XrdSysMutex pChannelMapMutex
void addFinalize(Channel *ch)
Used to maintain a non-owning set of live Channels. Used by Finalize.
std::unordered_set< Channel * > pFinalizeSet
std::function< void(const URL &, const XRootDStatus &)> pOnConnErrCB
void removeFinalize(Channel *ch)
Used to maintain a non-owning set of live Channels. Used by Finalize.
std::shared_ptr< Channel > GetChannel(const URL &url)
Get a channel for url, creating one if needed.
Procedure execution status.