XRootD
XrdSys::IOEvents::Poller Class Reference (abstract)

#include <XrdSysIOEvents.hh>


Classes

struct  PipeData

Public Types

enum  CreateOpts { optTOM }

Public Member Functions

 Poller (int cFD, int rFD)
virtual ~Poller ()
 Destructor. Stop() is effectively called when this object is deleted.
void Stop ()

Static Public Member Functions

static Poller * Create (int &eNum, const char **eTxt=0, int crOpts=0)

Protected Member Functions

virtual void Begin (XrdSysSemaphore *syncp, int &rc, const char **eTxt)=0
void CbkTMO ()
bool CbkXeq (Channel *cP, int events, int eNum, const char *eTxt)
 CPP_ATOMIC_TYPE (bool) wakePend
virtual void Exclude (Channel *cP, bool &isLocked, bool dover=1)=0
int GetFault (Channel *cP)
int GetPollEnt (Channel *cP)
int GetRequest ()
virtual bool Include (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
bool Init (Channel *cP, int &eNum, const char **eTxt, bool &isLockd)
void LockChannel (Channel *cP)
virtual bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
int Poll2Enum (short events)
int SendCmd (PipeData &cmd)
void SetPollEnt (Channel *cP, int ptEnt)
virtual void Shutdown ()=0
bool TmoAdd (Channel *cP, int tmoSet)
void TmoDel (Channel *cP)
int TmoGet ()
void UnLockChannel (Channel *cP)

Protected Attributes

Channel * attBase
bool chDead
int cmdFD
int pipeBlen
char * pipeBuff
struct pollfd pipePoll
pthread_t pollTid
PipeData reqBuff
int reqFD
Channel * tmoBase
unsigned char tmoMask

Static Protected Attributes

static time_t maxTime = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff)
static pid_t parentPID = getpid()

Friends

class BootStrap
class Channel

Detailed Description

Defines the poller object interface. A poller fields and dispatches event callbacks. Obtain an actual poller instance with the Create() method; because the class is abstract, you cannot create an instance with new or an in-place declaration. Any number of these objects may be created. Each creation spawns a polling thread.

Definition at line 371 of file XrdSysIOEvents.hh.

Member Enumeration Documentation

◆ CreateOpts

Create a specialized instance of a poller object, initialize it, and start the polling process. You must call Create() to obtain a specialized poller.

Parameters
eNum    Place where errno is placed upon failure.
eTxt    Place where a pointer to the description of the failing operation is to be set. If null, no description is returned.
crOpts  Poller options (see static const optxxx): optTOM - after a timeout event, timeouts must be re-enabled manually. By default, event timeouts are automatically re-enabled after successful callbacks.
Returns
!0  Poller successfully created and started. eNum contains zero. eTxt, if not null, points to an empty string. The returned value is a pointer to the Poller object.
0   Poller could not be created. eNum contains the associated errno value. eTxt, if not null, points to a description of the failing operation.
Enumerator
optTOM 

Definition at line 398 of file XrdSysIOEvents.hh.

Constructor & Destructor Documentation

◆ Poller()

XrdSys::IOEvents::Poller::Poller ( int cFD,
int rFD )

Constructor

Parameters
cFD  The file descriptor used to send commands to the poll thread.
rFD  The file descriptor used to receive commands in the poll thread.

Definition at line 572 of file XrdSysIOEvents.cc.

573{
574
575// Now initialize local class members
576//
577 attBase = 0;
578 tmoBase = 0;
579 cmdFD = cFD;
580 reqFD = rFD;
581 wakePend = false;
582 pipeBuff = 0;
583 pipeBlen = 0;
584 pipePoll.fd = rFD;
585 pipePoll.events = POLLIN | POLLRDNORM;
586 tmoMask = 255;
587}

References attBase, cmdFD, pipeBlen, pipeBuff, pipePoll, reqFD, tmoBase, and tmoMask.

Referenced by XrdSys::IOEvents::PollE::PollE(), XrdSys::IOEvents::PollerErr1::PollerErr1(), XrdSys::IOEvents::PollerInit::PollerInit(), XrdSys::IOEvents::PollerWait::PollerWait(), XrdSys::IOEvents::PollKQ::PollKQ(), XrdSys::IOEvents::PollPoll::PollPoll(), and XrdSys::IOEvents::PollPort::PollPort().


◆ ~Poller()

virtual XrdSys::IOEvents::Poller::~Poller ( )
inline virtual

Destructor. Stop() is effectively called when this object is deleted.

Definition at line 430 of file XrdSysIOEvents.hh.

430{}

Member Function Documentation

◆ Begin()

virtual void XrdSys::IOEvents::Poller::Begin ( XrdSysSemaphore * syncp,
int & rc,
const char ** eTxt )
protected pure virtual

Start the polling event loop. An implementation must be supplied. Begin() is called via the internal BootStrap class from a new thread.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

Referenced by XrdSys::IOEvents::BootStrap::Start().


◆ CbkTMO()

void XrdSys::IOEvents::Poller::CbkTMO ( )
protected

Definition at line 615 of file XrdSysIOEvents.cc.

616{
617 Channel *cP;
618
619// Process each element in the timeout queue, calling the callback function
620// if the timeout has passed. As this method can be called with a lock on the
621// channel mutex, we need to drop it prior to calling the callback.
622//
623 toMutex.Lock();
624 while((cP = tmoBase) && cP->deadLine <= time(0))
625 {int dlType = cP->dlType;
626 toMutex.UnLock();
627 CbkXeq(cP, dlType, 0, 0);
628 toMutex.Lock();
629 }
630 toMutex.UnLock();
631}

References CbkXeq(), Channel, and tmoBase.
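
The locking pattern in CbkTMO() (take the queue lock, test the head entry, release the lock before invoking the callback, then re-acquire it and test the head again) can be sketched in isolation. This is a minimal illustration, not the XRootD implementation: TimeoutQueue, add() and drain() are hypothetical names, and unlike the listing above the sketch removes the entry itself before the callback runs, whereas in the source the entry is removed inside CbkXeq() via TmoDel().

```cpp
#include <cassert>
#include <ctime>
#include <functional>
#include <map>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the timeout queue: entries sorted by deadline.
class TimeoutQueue
{
public:
    void add(time_t deadline, std::function<void()> cb)
    {
        assert(cb && "a callback is required");
        std::lock_guard<std::mutex> guard(mtx_);
        q_.emplace(deadline, std::move(cb));
    }

    // Fire every entry whose deadline is <= now; returns the number fired.
    int drain(time_t now)
    {
        int fired = 0;
        std::unique_lock<std::mutex> lk(mtx_);
        while (!q_.empty() && q_.begin()->first <= now)
        {
            std::function<void()> cb = std::move(q_.begin()->second);
            q_.erase(q_.begin());
            lk.unlock();   // never hold the queue lock across a callback
            cb();          // the callback may call add() without deadlocking
            ++fired;
            lk.lock();     // re-check the head: the queue may have changed
        }
        return fired;
    }

private:
    std::mutex mtx_;
    std::multimap<time_t, std::function<void()>> q_;
};
```

Because the lock is dropped around each callback, a callback that re-arms a timeout (by calling add() here) does not deadlock, and an entry it adds with an already expired deadline is picked up in the same drain() pass.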

Referenced by XrdSys::IOEvents::PollE::Begin(), XrdSys::IOEvents::PollKQ::Begin(), XrdSys::IOEvents::PollPoll::Begin(), XrdSys::IOEvents::PollPort::Begin(), and TmoGet().


◆ CbkXeq()

bool XrdSys::IOEvents::Poller::CbkXeq ( Channel * cP,
int events,
int eNum,
const char * eTxt )
protected

Definition at line 637 of file XrdSysIOEvents.cc.

639{
640 XrdSysMutexHelper cbkMHelp(cP->chMutex);
641 char oldEvents;
642 bool cbok, retval, isRead, isWrite, isLocked = true;
643
644// Perform any required tracing
645//
646 if (TRACING)
647 {const char *cbtype = (cP->chPoller == cP->chPollXQ ? "norm" :
648 (cP->chPoller == &pollInit ? "init" :
649 (cP->chPoller == &pollWait ? "wait" : "err")));
650 DO_TRACE(CbkXeq,cP->chFD,"callback events=" <<events
651 <<" chev=" <<static_cast<int>(cP->chEvents)
652 <<" toq=" <<(cP->inTOQ != 0) <<" erc=" <<eNum
653 <<" callback " <<(cP->chCB ? "present" : "missing")
654 <<" poller=" <<cbtype);
655 }
656
657// Remove this from the timeout queue if there and reset the deadlines based
658// on the event we are reflecting. This separates read and write deadlines
659//
660 if (cP->inTOQ)
661 {TmoDel(cP);
662 cP->dlType |= (events & CallBack::ValidEvents) << 4;
663 isRead = events & (CallBack::ReadyToRead | CallBack:: ReadTimeOut);
664 if (isRead) cP->rdDL = maxTime;
665 isWrite= events & (CallBack::ReadyToWrite | CallBack::WriteTimeOut);
666 if (isWrite) cP->wrDL = maxTime;
667 } else {
668 cP->dlType &= CallBack::ValidEvents;
669 isRead = isWrite = false;
670 }
671
672// Verify that there is a callback here and the channel is ready. If not,
673// disable this channel for the events being reflected unless the event is a
674// fatal error. In this case we need to abandon the channel since error events
675// may continue to be generated as we can't always disable them.
676//
677 if (!(cP->chCB) || cP->chPoller != cP->chPollXQ)
678 {if (eNum)
679 {cP->chPoller = &pollErr1; cP->chFault = eNum;
680 cP->inPSet = 0;
681 return false;
682 }
683 oldEvents = cP->chEvents;
684 cP->chEvents = 0;
685 retval = cP->chPoller->Modify(cP, eNum, 0, isLocked);
686 TRACE_MOD(CbkXeq,cP->chFD,0);
687 if (!isLocked) cP->chMutex.Lock();
688 cP->chEvents = oldEvents;
689 return true;
690 }
691
692// Resolve the problem where we get an error event but the channel wants them
693// presented as a read or write event. If neither is possible then defer the
694// error until the channel is enabled again.
695//
696 if (eNum)
697 {if (cP->chEvents & Channel::errorEvents)
698 {cP->chPoller = &pollErr1; cP->chFault = eNum;
699 cP->chStat = Channel::isCBMode;
700 chDead = false;
701 cbkMHelp.UnLock();
702 cP->chCB->Fatal(cP,cP->chCBA, eNum, eTxt);
703 if (chDead) return true;
704 cbkMHelp.Lock(&(cP->chMutex));
705 cP->inPSet = 0;
706 return false;
707 }
708 if (REVENTS(cP->chEvents)) events = CallBack::ReadyToRead;
709 else if (WEVENTS(cP->chEvents)) events = CallBack::ReadyToWrite;
710 else {cP->chPoller = &pollErr1; cP->chFault = eNum; cP->inPSet = 0;
711 return false;
712 }
713 }
714
715// Indicate that we are in callback mode then drop the channel lock and effect
716// the callback. This allows the callback to freely manage locks.
717//
718 cP->chStat = Channel::isCBMode;
719 chDead = false;
720 // Detach() may be called after unlocking the channel and would zero the
721 // callback pointer and argument. So keep a copy.
722 CallBack *cb = cP->chCB;
723 void *cba = cP->chCBA;
724 cbkMHelp.UnLock();
725 IF_TRACE(CbkXeq,cP->chFD,"invoking callback; events=" <<events);
726 cbok = cb->Event(cP,cba, events);
727 IF_TRACE(CbkXeq,cP->chFD,"callback returned " <<BOOLNAME(cbok));
728
729// If channel destroyed by the callback, bail really fast. Otherwise, regain
730// the channel lock.
731//
732 if (chDead) return true;
733 cbkMHelp.Lock(&(cP->chMutex));
734
735// If the channel is being destroyed; then another thread must have done so.
736// Tell it the callback has finished and just return.
737//
738 if (cP->chStat != Channel::isCBMode)
739 {if (cP->chStat == Channel::isDead)
740 {XrdSysSemaphore *theSem = (XrdSysSemaphore *)cP->chCBA;
741 // channel will be destroyed shortly after post, unlock mutex before
742 cbkMHelp.UnLock();
743 theSem->Post();
744 }
745 return true;
746 }
747 cP->chStat = Channel::isClear;
748
749// Handle enable or disable here. If we keep the channel enabled then reset
750// the timeout if it hasn't been handled via a call from the callback.
751//
752 if (!cbok) Detach(cP,isLocked,false);
753 else if ((isRead || isWrite) && !(cP->inTOQ) && (cP->chRTO || cP->chWTO))
754 TmoAdd(cP, 0);
755
756// All done. While the mutex should not have been unlocked, we relock it if
757// it has to keep the mutex helper from croaking.
758//
759 if (!isLocked) cP->chMutex.Lock();
760 return true;
761}

References BOOLNAME, CbkXeq(), chDead, DO_TRACE, XrdSys::IOEvents::Channel::errorEvents, XrdSys::IOEvents::CallBack::Event(), XrdSys::IOEvents::CallBack::Fatal(), IF_TRACE, XrdSysMutex::Lock(), XrdSysMutexHelper::Lock(), maxTime, Modify(), XrdSys::IOEvents::pollErr1, XrdSys::IOEvents::pollInit, XrdSys::IOEvents::pollWait, XrdSysSemaphore::Post(), XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::CallBack::ReadyToRead, XrdSys::IOEvents::CallBack::ReadyToWrite, REVENTS, TmoAdd(), TmoDel(), TRACE_MOD, TRACING, XrdSysMutexHelper::UnLock(), XrdSys::IOEvents::CallBack::ValidEvents, WEVENTS, and XrdSys::IOEvents::CallBack::WriteTimeOut.

Referenced by CbkTMO(), and CbkXeq().


◆ CPP_ATOMIC_TYPE()

XrdSys::IOEvents::Poller::CPP_ATOMIC_TYPE ( bool )
protected

◆ Create()

XrdSys::IOEvents::Poller * XrdSys::IOEvents::Poller::Create ( int & eNum,
const char ** eTxt = 0,
int crOpts = 0 )
static

Definition at line 767 of file XrdSysIOEvents.cc.

770{
771 int fildes[2];
772 struct pollArg pArg;
773 pthread_t tid;
774
775// Create a pipe used to break the poll wait loop
776//
777 if (XrdSysFD_Pipe(fildes))
778 {eNum = errno;
779 if (eTxt) *eTxt = "creating poll pipe";
780 return 0;
781 }
782
783// Create an actual implementation of a poller
784//
785 if (!(pArg.pollP = newPoller(fildes, eNum, eTxt)))
786 {close(fildes[0]);
787 close(fildes[1]);
788 return 0;
789 }
790
791// Now start a thread to handle this poller object
792//
793 if ((eNum = XrdSysThread::Run(&tid, XrdSys::IOEvents::BootStrap::Start,
794 (void *)&pArg, XRDSYSTHREAD_BIND, "Poller")))
795 {if (eTxt) *eTxt = "creating poller thread"; return 0;}
796
797// Now wait for the thread to finish initializing before we allow use
798// Note that the bootstrap takes ownership of the semaphore and will delete it
799// once the thread posting the semaphore actually ends. This is to avoid
800// semaphore bugs present in certain (e.g. Linux) kernels.
801//
802 pArg.pollSync->Wait();
803
804// Check if all went well
805//
806 if (pArg.retCode)
807 {if (eTxt) *eTxt = (pArg.retMsg ? pArg.retMsg : "starting poller");
808 eNum = pArg.retCode;
809 delete pArg.pollP;
810 return 0;
811 }
812
813// Set creation options in the new poller
814//
815 if (crOpts & optTOM)
816 pArg.pollP->tmoMask = ~(CallBack::ReadTimeOut|CallBack::WriteTimeOut);
817
818// All done
819//
820 eNum = 0;
821 if (eTxt) *eTxt = "";
822 return pArg.pollP;
823}

References close, optTOM, XrdSys::IOEvents::pollArg::pollP, XrdSys::IOEvents::pollArg::pollSync, XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::pollArg::retCode, XrdSys::IOEvents::pollArg::retMsg, XrdSysThread::Run(), XrdSys::IOEvents::BootStrap::Start(), tmoMask, XrdSysSemaphore::Wait(), XrdSys::IOEvents::CallBack::WriteTimeOut, and XRDSYSTHREAD_BIND.

Referenced by XrdCl::PollerBuiltIn::Start().


◆ Exclude()

virtual void XrdSys::IOEvents::Poller::Exclude ( Channel * cP,
bool & isLocked,
bool dover = 1 )
protected pure virtual

Remove a channel from the poll set. An implementation must be supplied. The channel is locked when this method is called. If the method sends a command to the poller thread, it must unlock the channel and set isLocked to false.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

References Channel.


◆ GetFault()

int XrdSys::IOEvents::Poller::GetFault ( Channel * cP)
inline protected

Definition at line 437 of file XrdSysIOEvents.hh.

437{return cP->chFault;}

References Channel.

Referenced by XrdSys::IOEvents::PollerErr1::Include(), and XrdSys::IOEvents::PollerErr1::Modify().


◆ GetPollEnt()

int XrdSys::IOEvents::Poller::GetPollEnt ( Channel * cP)
inline protected

Definition at line 438 of file XrdSysIOEvents.hh.

438{return cP->pollEnt;}

References Channel.

Referenced by XrdSys::IOEvents::PollKQ::Exclude(), XrdSys::IOEvents::PollPoll::Exclude(), XrdSys::IOEvents::PollPoll::Include(), XrdSys::IOEvents::PollKQ::Modify(), and XrdSys::IOEvents::PollPoll::Modify().


◆ GetRequest()

int XrdSys::IOEvents::Poller::GetRequest ( )
protected

Definition at line 875 of file XrdSysIOEvents.cc.

876{
877 ssize_t rlen;
878 int rc;
879
880// See if we are to resume a read or start a fresh one
881//
882 if (!pipeBlen)
883 {pipeBuff = (char *)&reqBuff; pipeBlen = sizeof(reqBuff);}
884
885// Wait for the next request. Some OS's (like Linux) don't support non-blocking
886// pipes. So, we must front the read with a poll.
887//
888 do {rc = poll(&pipePoll, 1, 0);}
889 while(rc < 0 && (errno == EAGAIN || errno == EINTR));
890 if (rc < 1) return 0;
891
892// Now we can put up a read without a delay. Normally a full command will be
893// present. Under some heavy conditions, this may not be the case.
894//
895 do {rlen = read(reqFD, pipeBuff, pipeBlen);}
896 while(rlen < 0 && errno == EINTR);
897 if (rlen <= 0)
898 {std::cerr <<"Poll: "<<XrdSysE2T(errno)<<" reading from request pipe\n"<< std::flush;
899 return 0;
900 }
901
902// Check if all the data has arrived. If not all the data is present, defer
903// this request until more data arrives.
904//
905 if (!(pipeBlen -= rlen)) return 1;
906 pipeBuff += rlen;
907 return 0;
908}

References pipeBlen, pipeBuff, pipePoll, read, reqBuff, reqFD, and XrdSysE2T().
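
GetRequest() combines two ideas: a zero-timeout poll() in front of the read so the call never blocks, and a saved buffer position so a partially received record is completed on a later call. The following is a stand-alone sketch of that technique under stated assumptions: Cmd is a hypothetical fixed-size record (the real PipeData layout is not shown on this page), CmdReader and pollOnce() are illustrative names, and the error logging of the original is omitted.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <poll.h>
#include <unistd.h>

// Hypothetical fixed-size command record.
struct Cmd { int req; int fd; };

// Hypothetical reader that keeps its position between calls.
struct CmdReader
{
    int    fd;               // read end of the pipe
    Cmd    buf{};            // record being assembled
    char  *next = nullptr;   // where the next bytes go
    size_t left = 0;         // bytes still missing; 0 means start a fresh record

    explicit CmdReader(int rfd) : fd(rfd) { assert(rfd >= 0); }

    // Returns true only when a complete record is available in buf.
    bool pollOnce()
    {
        if (left == 0) { next = reinterpret_cast<char *>(&buf); left = sizeof(buf); }

        // Zero-timeout poll: never block waiting for data.
        struct pollfd p = { fd, POLLIN, 0 };
        int rc;
        do { rc = ::poll(&p, 1, 0); }
        while (rc < 0 && (errno == EAGAIN || errno == EINTR));
        if (rc < 1) return false;

        ssize_t n;
        do { n = ::read(fd, next, left); }
        while (n < 0 && errno == EINTR);
        if (n <= 0) return false;        // read error or end of file

        left -= static_cast<size_t>(n);
        if (left == 0) return true;      // the whole record has arrived
        next += n;                       // partial read: resume here next time
        return false;
    }
};
```

A caller would invoke pollOnce() each time its event loop reports the pipe as readable and act on buf only when the call returns true.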


◆ Include()

virtual bool XrdSys::IOEvents::Poller::Include ( Channel * cP,
int & eNum,
const char ** eTxt,
bool & isLocked )
protected pure virtual

Add a channel to the poll set. An implementation must be supplied. The channel is locked when this method is called. If the method sends a command to the poller thread, it must unlock the channel and set isLocked to false.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

References Channel.

Referenced by Init().


◆ Init()

bool XrdSys::IOEvents::Poller::Init ( Channel * cP,
int & eNum,
const char ** eTxt,
bool & isLockd )
protected

Definition at line 914 of file XrdSysIOEvents.cc.

916{
917// The channel must be locked upon entry!
918//
919 bool retval;
920
921
922// If we are already in progress then simply update the shadow events and
923// resuppress all current events.
924//
925 if (cP->chPoller == &pollWait)
926 {cP->reMod = cP->chEvents;
927 cP->chEvents = 0;
928 IF_TRACE(Init,cP->chFD,"defer events=" <<cP->reMod);
929 return true;
930 }
931
932// Trace this entry
933//
934 IF_TRACE(Init,cP->chFD,"begin events=" <<int(cP->chEvents));
935
936// If no events are enabled at this point, just return
937//
938 if (!(cP->chEvents)) return true;
939
940// Refuse to enable a channel without a callback function
941//
942 if (!(cP->chCB))
943 {eNum = EDESTADDRREQ;
944 if (eTxt) *eTxt = "enabling without a callback";
945 return false;
946 }
947
948// So, now we can include the channel in the poll set. We will include it
949// with no events enabled to prevent callbacks prior to completion here.
950//
951 cP->chPoller = &pollWait; cP->reMod = cP->chEvents; cP->chEvents = 0;
952 retval = cP->chPollXQ->Include(cP, eNum, eTxt, isLocked);
953 IF_TRACE(Init,cP->chFD,"Include() returned " <<BOOLNAME(retval) <<TRACE_LOK);
954 if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
955
956// Determine what future poller to use. If we can use the regular poller then
957// set the correct event mask for the channel. Note that we could have lost
958// control but the correct events will be reflected in the "reMod" member.
959//
960 if (!retval) {cP->chPoller = &pollErr1; cP->chFault = eNum;}
961 else {cP->chPoller = cP->chPollXQ;
962 cP->inPSet = 1;
963 if (cP->reMod)
964 {cP->chEvents = cP->reMod;
965 retval = cP->chPoller->Modify(cP, eNum, eTxt, isLocked);
966 TRACE_MOD(Init,cP->chFD,int(cP->reMod));
967 if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
968 } else {
969 TRACE_NOD(Init,cP->chFD,0);
970 }
971 }
972
973// All done
974//
975 cP->reMod = 0;
976 return retval;
977}

References BOOLNAME, IF_TRACE, Include(), Init(), XrdSysMutex::Lock(), Modify(), XrdSys::IOEvents::pollErr1, XrdSys::IOEvents::pollWait, TRACE_LOK, TRACE_MOD, and TRACE_NOD.

Referenced by Init(), XrdSys::IOEvents::PollerInit::Modify(), and XrdSys::IOEvents::PollerWait::Modify().


◆ LockChannel()

void XrdSys::IOEvents::Poller::LockChannel ( Channel * cP)
inline protected

Definition at line 441 of file XrdSysIOEvents.hh.

441{cP->chMutex.Lock();}

References Channel, and XrdSysMutex::Lock().


◆ Modify()

virtual bool XrdSys::IOEvents::Poller::Modify ( Channel * cP,
int & eNum,
const char ** eTxt,
bool & isLocked )
protected pure virtual

Modify the event status of a channel. An implementation must be supplied. The channel is locked when this method is called. If the method sends a command to the poller thread, it must unlock the channel and set isLocked to false.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

References Channel.

Referenced by CbkXeq(), and Init().


◆ Poll2Enum()

int XrdSys::IOEvents::Poller::Poll2Enum ( short events)
protected

Definition at line 983 of file XrdSysIOEvents.cc.

984{
985 if (events & POLLERR) return EPIPE;
986
987 if (events & POLLHUP) return ECONNRESET;
988
989 if (events & POLLNVAL) return EBADF;
990
991 return EOPNOTSUPP;
992}
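
The mapping above is order-sensitive: when several error bits are set in the same poll result, the first matching test decides the errno value. A stand-alone copy of the logic makes that easy to try out. pollToErrno() and checkPollToErrno() are hypothetical names used only for this sketch; the branch order and return values are taken from the listing.

```cpp
#include <cassert>
#include <cerrno>
#include <poll.h>

// Free-function copy of the mapping, for experimentation only.
int pollToErrno(short revents)
{
    if (revents & POLLERR)  return EPIPE;       // error condition on the descriptor
    if (revents & POLLHUP)  return ECONNRESET;  // peer hung up
    if (revents & POLLNVAL) return EBADF;       // descriptor is not open
    return EOPNOTSUPP;                          // no recognised error bit set
}

// Usage: spot checks of the precedence order.
void checkPollToErrno()
{
    assert(pollToErrno(POLLERR | POLLHUP) == EPIPE);  // POLLERR wins
    assert(pollToErrno(POLLHUP) == ECONNRESET);
    assert(pollToErrno(POLLIN) == EOPNOTSUPP);        // not an error bit
}
```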

◆ SendCmd()

int XrdSys::IOEvents::Poller::SendCmd ( PipeData & cmd)
protected

Definition at line 998 of file XrdSysIOEvents.cc.

999{
1000 int wlen;
1001
1002// Pipe writes are atomic so we don't need locks. Some commands require
1003// confirmation. We handle that here based on the command. Note that pipes
1004// guarantee that all of the data will be written or we will block.
1005//
1006 if (cmd.req >= PipeData::Post)
1007 {XrdSysSemaphore mySem(0);
1008 cmd.theSem = &mySem;
1009 do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1010 while (wlen < 0 && errno == EINTR);
1011 if (wlen > 0) mySem.Wait();
1012 } else {
1013 do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1014 while (wlen < 0 && errno == EINTR);
1015 }
1016
1017// All done
1018//
1019 return (wlen >= 0 ? 0 : errno);
1020}

References cmdFD, XrdSys::IOEvents::Poller::PipeData::Post, XrdSys::IOEvents::Poller::PipeData::req, XrdSys::IOEvents::Poller::PipeData::theSem, XrdSysSemaphore::Wait(), and write.
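
The core of SendCmd() is a single write of a fixed-size record that is retried only when a signal interrupts it; POSIX makes pipe writes of at most PIPE_BUF bytes atomic, which is why no lock is taken. A minimal sketch of just that part follows. Cmd and sendRecord() are hypothetical names, and the semaphore handshake used for commands that need confirmation is deliberately left out.

```cpp
#include <cassert>
#include <cerrno>
#include <unistd.h>

// Hypothetical fixed-size command record.
struct Cmd { int req; int fd; };

// Write one record to the pipe. Returns 0 on success, otherwise the errno value.
int sendRecord(int wfd, const Cmd &cmd)
{
    // 512 is the smallest PIPE_BUF that POSIX permits, so a record of this
    // size is written atomically on any conforming system.
    static_assert(sizeof(Cmd) <= 512, "record must fit in one atomic pipe write");
    assert(wfd >= 0);

    ssize_t n;
    do { n = ::write(wfd, &cmd, sizeof(cmd)); }
    while (n < 0 && errno == EINTR);     // retry only when interrupted by a signal

    return n >= 0 ? 0 : errno;
}
```

Returning the errno value rather than -1 matches the convention of the listing, where a zero result means the command was handed to the pipe.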

Referenced by XrdSys::IOEvents::PollE::Exclude(), XrdSys::IOEvents::PollKQ::Exclude(), XrdSys::IOEvents::PollPoll::Exclude(), XrdSys::IOEvents::PollPort::Exclude(), XrdSys::IOEvents::PollPoll::Include(), XrdSys::IOEvents::PollPoll::Modify(), and Stop().


◆ SetPollEnt()

void XrdSys::IOEvents::Poller::SetPollEnt ( Channel * cP,
int ptEnt )
protected

Definition at line 1026 of file XrdSysIOEvents.cc.

1027{
1028 cP->pollEnt = pe;
1029}

Referenced by XrdSys::IOEvents::PollKQ::Exclude(), XrdSys::IOEvents::PollPoll::Include(), and XrdSys::IOEvents::PollKQ::Modify().


◆ Shutdown()

virtual void XrdSys::IOEvents::Poller::Shutdown ( )
protected pure virtual

Shut down the poller. An implementation must be supplied. The shutdown method must release any allocated storage and close private file descriptors. The polling thread will already have been terminated and the cross-thread pipe closed. Warning: the derived destructor must call Stop() and do nothing else!

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

Referenced by Stop().


◆ Stop()

void XrdSys::IOEvents::Poller::Stop ( )

Stop a poller object. Active callbacks are completed and pending callbacks are discarded, after which the poller event thread exits. Subsequently, each associated channel is disabled and removed from the poller object. If the channel is enabled for a StopEvent, the stop callback is invoked. However, any attempt to use the channel methods that require an active poller will return an error.

Because a stopped poller cannot be restarted, the only thing left to do is delete it. The same applies to all the associated channels, since they no longer have an active poller.

Definition at line 1035 of file XrdSysIOEvents.cc.

1036{
1037 PipeData cmdbuff;
1038 CallBack *theCB;
1039 Channel *cP;
1040 void *cbArg;
1041 int doCB;
1042
1043// Initialize the PipeData structure
1044//
1045 memset(static_cast<void*>( &cmdbuff ), 0, sizeof(cmdbuff));
1046 cmdbuff.req = PipeData::Stop;
1047
1048// Lock all of this
1049//
1050 adMutex.Lock();
1051
1052// If we are already shutdown then we are done
1053//
1054 if (cmdFD == -1) {adMutex.UnLock(); return;}
1055
1056// First we must stop the poller thread in an orderly fashion.
1057//
1058 adMutex.UnLock();
1059 SendCmd(cmdbuff);
1060 adMutex.Lock();
1061
1062// Close the pipe communication mechanism
1063//
1064 close(cmdFD); cmdFD = -1;
1065 close(reqFD); reqFD = -1;
1066
1067// Run through cleaning up the channels. While there should not be any other
1068// operations happening on this poller, we take the conservative approach.
1069//
1070 while((cP = attBase))
1071 {REMOVE(attBase, attList, cP);
1072 adMutex.UnLock();
1073 cP->chMutex.Lock();
1074 doCB = cP->chCB != 0 && (cP->chEvents & Channel::stopEvent);
1075 if (cP->inTOQ) TmoDel(cP);
1076 cP->Reset(&pollErr1, cP->chFD, EIDRM);
1077 cP->chPollXQ = &pollErr1;
1078 if (doCB)
1079 {cP->chStat = Channel::isClear;
1080 theCB = cP->chCB; cbArg = cP->chCBA;
1081 cP->chMutex.UnLock();
1082 theCB->Stop(cP, cbArg);
1083 } else cP->chMutex.UnLock();
1084 adMutex.Lock();
1085 }
1086
1087// Now invoke the poller specific shutdown
1088//
1089 Shutdown();
1090 adMutex.UnLock();
1091}

References attBase, Channel, close, cmdFD, XrdSysMutex::Lock(), XrdSys::IOEvents::pollErr1, REMOVE, XrdSys::IOEvents::Poller::PipeData::req, reqFD, SendCmd(), Shutdown(), XrdSys::IOEvents::CallBack::Stop(), XrdSys::IOEvents::Poller::PipeData::Stop, XrdSys::IOEvents::Channel::stopEvent, TmoDel(), and XrdSysMutex::UnLock().

Referenced by XrdSys::IOEvents::PollE::~PollE(), XrdSys::IOEvents::PollKQ::~PollKQ(), XrdSys::IOEvents::PollPoll::~PollPoll(), XrdSys::IOEvents::PollPort::~PollPort(), and XrdCl::PollerBuiltIn::Stop().


◆ TmoAdd()

bool XrdSys::IOEvents::Poller::TmoAdd ( Channel * cP,
int tmoSet )
protected

Definition at line 1097 of file XrdSysIOEvents.cc.

1098{
1099 XrdSysMutexHelper mHelper(toMutex);
1100 time_t tNow;
1101 Channel *ncP;
1102 bool setRTO, setWTO;
1103
1104// Do some tracing
1105//
1106 IF_TRACE(TmoAdd,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1107 <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1108
1109// Remove element from timeout queue if it is there
1110//
1111 if (cP->inTOQ)
1112 {REMOVE(tmoBase, tmoList, cP);
1113 cP->inTOQ = 0;
1114 }
1115
1116// Determine which timeouts need to be reset
1117//
1118 tmoSet|= cP->dlType >> 4;
1119 setRTO = (tmoSet&tmoMask) & (CallBack::ReadyToRead |CallBack:: ReadTimeOut);
1120 setWTO = (tmoSet&tmoMask) & (CallBack::ReadyToWrite|CallBack::WriteTimeOut);
1121
1122// Reset the required deadlines
1123//
1124 tNow = time(0);
1125 if (setRTO && REVENTS(cP->chEvents) && cP->chRTO)
1126 cP->rdDL = cP->chRTO + tNow;
1127 if (setWTO && WEVENTS(cP->chEvents) && cP->chWTO)
1128 cP->wrDL = cP->chWTO + tNow;
1129
1130// Calculate the closest enabled deadline
1131//
1132 if (cP->rdDL < cP->wrDL)
1133 {cP->deadLine = cP->rdDL; cP->dlType = CallBack:: ReadTimeOut;
1134 } else {
1135 cP->deadLine = cP->wrDL; cP->dlType = CallBack::WriteTimeOut;
1136 if (cP->rdDL == cP->wrDL) cP->dlType |= CallBack:: ReadTimeOut;
1137 }
1138 IF_TRACE(TmoAdd, cP->chFD, "t=" <<tNow <<" rdDL=" <<setRTO <<' ' <<cP->rdDL
1139 <<" wrDL=" <<setWTO <<' ' <<cP->wrDL);
1140
1141// If no timeout really applies, we are done
1142//
1143 if (cP->deadLine == maxTime) return false;
1144
1145// Add the channel to the timeout queue in correct deadline position.
1146//
1147 if ((ncP = tmoBase))
1148 {do {if (cP->deadLine < ncP->deadLine) break;
1149 ncP = ncP->tmoList.next;
1150 } while(ncP != tmoBase);
1151 INSERT(tmoList, ncP, cP);
1152 if (cP->deadLine < tmoBase->deadLine) tmoBase = cP;
1153 } else tmoBase = cP;
1154 cP->inTOQ = 1;
1155
1156// Indicate to the caller whether or not a wakeup is required
1157//
1158 return (tmoBase == cP);
1159}

References BOOLNAME, Channel, IF_TRACE, INSERT, maxTime, XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::CallBack::ReadyToRead, XrdSys::IOEvents::CallBack::ReadyToWrite, REMOVE, REVENTS, STATUSOF, TmoAdd(), tmoBase, tmoMask, WEVENTS, and XrdSys::IOEvents::CallBack::WriteTimeOut.
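
The "closest enabled deadline" step in TmoAdd() picks the earlier of the read and write deadlines and records which kind of timeout it represents; a tie is reported as both, and a sentinel value means that no timeout applies. The sketch below isolates that selection. The flag values kReadTimeOut and kWriteTimeOut are hypothetical (the real CallBack constants are not listed on this page), as are Deadline, closest(), armed() and kNever.

```cpp
#include <cassert>
#include <ctime>
#include <limits>

// Hypothetical flag values standing in for the CallBack timeout constants.
enum : int { kReadTimeOut = 0x04, kWriteTimeOut = 0x08 };

// Sentinel meaning "no deadline", playing the role of maxTime.
const time_t kNever = std::numeric_limits<time_t>::max();

struct Deadline { time_t when; int type; };

// Pick the nearer of the two deadlines; a tie reports both timeout types.
Deadline closest(time_t rdDL, time_t wrDL)
{
    if (rdDL < wrDL) return { rdDL, kReadTimeOut };
    Deadline d = { wrDL, kWriteTimeOut };
    if (rdDL == wrDL) d.type |= kReadTimeOut;
    return d;
}

// A deadline equal to the sentinel is never queued.
bool armed(const Deadline &d) { return d.when != kNever; }

// Usage: only the read side has a timeout, so it is the one selected.
void checkClosest()
{
    Deadline d = closest(100, kNever);
    assert(d.when == 100 && d.type == kReadTimeOut && armed(d));
}
```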

Referenced by CbkXeq(), and TmoAdd().


◆ TmoDel()

void XrdSys::IOEvents::Poller::TmoDel ( Channel * cP)
protected

Definition at line 1165 of file XrdSysIOEvents.cc.

1166{
1167
1168// Do some tracing
1169//
1170 IF_TRACE(TmoDel,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1171 <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1172
1173// Get the timeout queue lock and remove the channel from the queue
1174//
1175 toMutex.Lock();
1176 REMOVE(tmoBase, tmoList, cP);
1177 cP->inTOQ = 0;
1178 toMutex.UnLock();
1179}

References BOOLNAME, IF_TRACE, REMOVE, STATUSOF, tmoBase, and TmoDel().

Referenced by CbkXeq(), Stop(), and TmoDel().


◆ TmoGet()

int XrdSys::IOEvents::Poller::TmoGet ( )
protected

Definition at line 1185 of file XrdSysIOEvents.cc.

1186{
1187 int wtval;
1188
1189// Lock the timeout queue
1190//
1191 toMutex.Lock();
1192
1193// Calculate wait time. If the deadline passed, invoke the timeout callback.
1194// we will need to drop the timeout lock as we don't have the channel lock.
1195//
1196 do {if (!tmoBase) {wtval = -1; break;}
1197 wtval = (tmoBase->deadLine - time(0)) * 1000;
1198 if (wtval > 0) break;
1199 toMutex.UnLock();
1200 CbkTMO();
1201 toMutex.Lock();
1202 } while(1);
1203
1204// Return the value
1205//
1206 CPP_ATOMIC_STORE(wakePend, false, std::memory_order_release);
1207 toMutex.UnLock();
1208 return wtval;
1209}

References CbkTMO(), CPP_ATOMIC_STORE, and tmoBase.
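
TmoGet() turns the earliest queued deadline into a wait time in milliseconds: -1 when nothing is queued (wait indefinitely), otherwise the remaining seconds times 1000. The sketch below shows the conversion alone, as a simplified illustration rather than the library's code. waitMillis() is a hypothetical name; where the original loops and runs CbkTMO() for an expired deadline, the sketch simply returns 0, and the clamp to INT_MAX is a choice made here, not something the listing does.

```cpp
#include <cassert>
#include <climits>
#include <ctime>

// Convert the earliest deadline into a poll timeout in milliseconds.
//   -1 : no deadline queued, wait indefinitely
//    0 : deadline already due, handle timeouts before waiting
int waitMillis(bool haveDeadline, time_t deadline, time_t now)
{
    if (!haveDeadline) return -1;
    if (deadline <= now) return 0;

    long long secs = static_cast<long long>(deadline) - static_cast<long long>(now);
    if (secs > INT_MAX / 1000) return INT_MAX;   // clamp instead of overflowing
    return static_cast<int>(secs * 1000);
}

// Usage: five seconds remain, so wait for 5000 ms.
void checkWaitMillis()
{
    assert(waitMillis(true, 105, 100) == 5000);
}
```

The return value is in the form that poll() expects for its timeout argument, where a negative value means no timeout.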

Referenced by XrdSys::IOEvents::PollE::Begin(), XrdSys::IOEvents::PollKQ::Begin(), XrdSys::IOEvents::PollPoll::Begin(), and XrdSys::IOEvents::PollPort::BegTO().


◆ UnLockChannel()

void XrdSys::IOEvents::Poller::UnLockChannel ( Channel * cP)
inline protected

Definition at line 448 of file XrdSysIOEvents.hh.

448{cP->chMutex.UnLock();}

References Channel, and XrdSysMutex::UnLock().

Referenced by XrdSys::IOEvents::PollE::Exclude(), XrdSys::IOEvents::PollKQ::Exclude(), XrdSys::IOEvents::PollPoll::Exclude(), XrdSys::IOEvents::PollPort::Exclude(), XrdSys::IOEvents::PollPoll::Include(), and XrdSys::IOEvents::PollPoll::Modify().


◆ BootStrap

friend class BootStrap
friend

Definition at line 373 of file XrdSysIOEvents.hh.

References BootStrap.

Referenced by BootStrap.

◆ Channel

Member Data Documentation

◆ attBase

Channel* XrdSys::IOEvents::Poller::attBase
protected

Definition at line 488 of file XrdSysIOEvents.hh.

Referenced by Poller(), and Stop().

◆ chDead

bool XrdSys::IOEvents::Poller::chDead
protected

Definition at line 511 of file XrdSysIOEvents.hh.

Referenced by CbkXeq(), and XrdSys::IOEvents::Channel::Delete().

◆ cmdFD

int XrdSys::IOEvents::Poller::cmdFD
protected

Definition at line 494 of file XrdSysIOEvents.hh.

Referenced by Poller(), SendCmd(), and Stop().

◆ maxTime

time_t XrdSys::IOEvents::Poller::maxTime = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff)
static protected

Definition at line 513 of file XrdSysIOEvents.hh.

Referenced by CbkXeq(), XrdSys::IOEvents::Channel::Enable(), and TmoAdd().

◆ parentPID

pid_t XrdSys::IOEvents::Poller::parentPID = getpid()
static protected

◆ pipeBlen

int XrdSys::IOEvents::Poller::pipeBlen
protected

Definition at line 508 of file XrdSysIOEvents.hh.

Referenced by Poller(), and GetRequest().

◆ pipeBuff

char* XrdSys::IOEvents::Poller::pipeBuff
protected

Definition at line 507 of file XrdSysIOEvents.hh.

Referenced by Poller(), and GetRequest().

◆ pipePoll

struct pollfd XrdSys::IOEvents::Poller::pipePoll
protected

Definition at line 493 of file XrdSysIOEvents.hh.

Referenced by Poller(), and GetRequest().

◆ pollTid

pthread_t XrdSys::IOEvents::Poller::pollTid
protected

◆ reqBuff

PipeData XrdSys::IOEvents::Poller::reqBuff
protected

Definition at line 506 of file XrdSysIOEvents.hh.

Referenced by GetRequest().

◆ reqFD

int XrdSys::IOEvents::Poller::reqFD
protected

Definition at line 495 of file XrdSysIOEvents.hh.

Referenced by Poller(), XrdSys::IOEvents::PollKQ::PollKQ(), GetRequest(), and Stop().

◆ tmoBase

Channel* XrdSys::IOEvents::Poller::tmoBase
protected

Definition at line 489 of file XrdSysIOEvents.hh.

Referenced by Poller(), CbkTMO(), TmoAdd(), TmoDel(), and TmoGet().

◆ tmoMask

unsigned char XrdSys::IOEvents::Poller::tmoMask
protected

Definition at line 509 of file XrdSysIOEvents.hh.

Referenced by Poller(), Create(), and TmoAdd().


The documentation for this class was generated from the following files: