XRootD
Loading...
Searching...
No Matches
XrdCmsRRQ Class Reference

#include <XrdCmsRRQ.hh>

+ Collaboration diagram for XrdCmsRRQ:

Classes

struct  Info
 

Public Member Functions

 XrdCmsRRQ ()
 
 ~XrdCmsRRQ ()
 
short Add (short Snum, XrdCmsRRQInfo *ip)
 
void Del (short Snum, const void *Key)
 
int Init (int Tint=0, int Tdly=0)
 
int Ready (int Snum, const void *Key, SMask_t mask1, SMask_t mask2)
 
void * Respond ()
 
void Statistics (Info &Data)
 
void * TimeOut ()
 

Detailed Description

Definition at line 104 of file XrdCmsRRQ.hh.

Constructor & Destructor Documentation

◆ XrdCmsRRQ()

XrdCmsRRQ::XrdCmsRRQ ( )
inline

Definition at line 148 of file XrdCmsRRQ.hh.

148 : isWaiting(0), isReady(0),
149 luFast(0), luSlow(0), rdFast(0), rdSlow(0),
150 Tslice(178), Tdelay(5), myClock(0) {}

◆ ~XrdCmsRRQ()

XrdCmsRRQ::~XrdCmsRRQ ( )
inline

Definition at line 151 of file XrdCmsRRQ.hh.

151{}

Member Function Documentation

◆ Add()

short XrdCmsRRQ::Add ( short Snum,
XrdCmsRRQInfo * ip )

Definition at line 76 of file XrdCmsRRQ.cc.

77{
78// EPNAME("RRQ Add");
79 XrdCmsRRQSlot *sp;
80
81// Obtain a slot and fill it in
82//
83 if (!(sp = XrdCmsRRQSlot::Alloc(Info))) return 0;
84// DEBUG("adding slot " <<sp->slotNum);
85
86// If a slot number given, check if it's the right slot and it is still queued.
87// If so, piggy-back this request to existing one and make a fast exit
88//
89 myMutex.Lock(); Stats.Add2Q++;
90 if (Snum && Slot[Snum].Info.Key == Info->Key && Slot[Snum].Expire)
91 {if (Info->isLU)
92 {sp->LkUp = Slot[Snum].LkUp;
93 Slot[Snum].LkUp = sp;
94 } else {
95 sp->Cont = Slot[Snum].Cont;
96 Slot[Snum].Cont = sp;
97 }
98 Stats.PBack++;
99 myMutex.UnLock();
100 return Snum;
101 }
102
103// Queue this slot to the pending response queue and tell the timeout scheduler
104//
105 sp->Expire = myClock+1;
106 if (waitQ.Singleton()) isWaiting.Post();
107 waitQ.Prev()->Insert(&sp->Link);
108 myMutex.UnLock();
109 return sp->slotNum;
110}

◆ Del()

void XrdCmsRRQ::Del ( short Snum,
const void * Key )

Definition at line 116 of file XrdCmsRRQ.cc.

117{
118 Ready(Snum, Key, 0, 0);
119}
int Ready(int Snum, const void *Key, SMask_t mask1, SMask_t mask2)
Definition XrdCmsRRQ.cc:197

References Ready().

+ Here is the call graph for this function:

◆ Init()

int XrdCmsRRQ::Init ( int Tint = 0,
int Tdly = 0 )

Definition at line 125 of file XrdCmsRRQ.cc.

126{
127 int rc;
128 pthread_t tid;
129
130// Set values
131//
132 if (Tint) Tslice = Tint;
133 if (Tdly) Tdelay = Tdly;
134 Stats.Reset();
135
136// Fill out the response structure
137//
138 dataResp.Hdr.streamid = 0;
139 dataResp.Hdr.rrCode = kYR_data;
140 dataResp.Hdr.modifier = 0;
141 dataResp.Hdr.datalen = 0;
142 dataResp.Val = 0;
143
144// Fill out the data i/o vector
145//
146 data_iov[0].iov_base = (char *)&dataResp;
147 data_iov[0].iov_len = sizeof(dataResp);
148 data_iov[1].iov_base = databuff;;
149
150// Fill out the response structure
151//
152 redrResp.Hdr.streamid = 0;
153 redrResp.Hdr.rrCode = kYR_redirect;
154 redrResp.Hdr.modifier = 0;
155 redrResp.Hdr.datalen = 0;
156 redrResp.Val = 0;
157
158// Fill out the redirect i/o vector
159//
160 redr_iov[0].iov_base = (char *)&redrResp;
161 redr_iov[0].iov_len = sizeof(redrResp);
162 redr_iov[1].iov_base = hostbuff;;
163
164// Fill out the wait info
165//
166 waitResp.Hdr.streamid = 0;
167 waitResp.Hdr.rrCode = kYR_wait;
168 waitResp.Hdr.modifier = 0;
169 waitResp.Hdr.datalen = htons(static_cast<unsigned short>(sizeof(waitResp.Val)));
170 waitResp.Val = htonl(Tdelay);
171
172// Start the responder thread
173//
174 if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartRespond, (void *)0,
175 0, "Request Responder")))
176 {Say.Emsg("Config", rc, "create request responder thread");
177 return 1;
178 }
179
180// Start the timeout thread
181//
182 if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartTimeOut, (void *)0,
183 0, "Request Timeout")))
184 {Say.Emsg("Config", rc, "create request timeout thread");
185 return 1;
186 }
187
188// All done
189//
190 return 0;
191}
void * XrdCmsRRQ_StartRespond(void *parg)
Definition XrdCmsRRQ.cc:67
void * XrdCmsRRQ_StartTimeOut(void *parg)
Definition XrdCmsRRQ.cc:65
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
@ kYR_redirect
Definition YProtocol.hh:143
XrdSysError Say

References XrdCms::kYR_data, XrdCms::kYR_redirect, XrdCms::kYR_wait, XrdSysThread::Run(), XrdCms::Say, XrdCmsRRQ_StartRespond(), and XrdCmsRRQ_StartTimeOut().

+ Here is the call graph for this function:

◆ Ready()

int XrdCmsRRQ::Ready ( int Snum,
const void * Key,
SMask_t mask1,
SMask_t mask2 )

Definition at line 197 of file XrdCmsRRQ.cc.

198{
199// EPNAME("RRQ Ready");
200 XrdCmsRRQSlot *sp;
201
202// Check if it's the right slot and it is still queued.
203//
204 myMutex.Lock();
205 sp = &Slot[Snum];
206 if (sp->Info.Key != Key || !sp->Expire)
207 {myMutex.UnLock();
208// DEBUG("slot " <<Snum <<" no longer valid");
209 return 1;
210 }
211
212// Update the arguments. The first is the running node mask and the second is
213// a fixed differentiation mask. Accumulate the 1st but replace the 2nd.
214//
215 sp->Arg1 |= mask1; sp->Arg2 = mask2;
216 Stats.Resp++;
217
218// Check if we should still hold on to this slot because the number of actual
219// responders is less than the number needed.
220//
221 if (sp->Info.actR < sp->Info.minR)
222 {sp->Info.actR++; Stats.Multi++;
223 myMutex.UnLock();
224 return 0;
225 }
226
227// Move the element from the waiting queue to the ready queue
228//
229 sp->Link.Remove();
230 if (readyQ.Singleton()) isReady.Post();
231 readyQ.Prev()->Insert(&sp->Link);
232 myMutex.UnLock();
233// DEBUG("readied slot " <<Snum <<" mask " <<mask);
234 return 1;
235}

References XrdCmsRRQInfo::actR, XrdCmsRRQInfo::Key, XrdCmsRRQInfo::minR, and XrdOucDLlist< T >::Remove().

Referenced by Del().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Respond()

void * XrdCmsRRQ::Respond ( )

Definition at line 241 of file XrdCmsRRQ.cc.

242{
243// EPNAME("RRQ Respond");
244 XrdCmsRRQSlot *sp;
245
246// In an endless loop, process all ready elements
247//
248 do {isReady.Wait(); // DEBUG("responder awoken");
249 do {myMutex.Lock();
250 Stats.rdFast += rdFast; Stats.rdSlow += rdSlow;
251 Stats.luFast += luFast; Stats.luSlow += luSlow;
252 if (readyQ.Singleton()) {myMutex.UnLock(); break;}
253 sp = readyQ.Next()->Item(); sp->Link.Remove(); sp->Expire = 0;
254 myMutex.UnLock();
255
256 // A locate request can be pggy-backed on a select request and vice-versa
257 // We separate the two queues here as each has a different response.
258 //
259 if (sp->Info.isLU)
260 {if (sp->Cont)
261 {sp->Cont->Arg1 = sp->Arg1;
262 sendRedResp(sp->Cont);
263 }
264 sendLocResp(sp);
265 } else {
266 if (sp->LkUp)
267 {sp->LkUp->Arg1 = sp->Arg1; sp->LkUp->Arg2 = sp->Arg2;
268 sendLocResp(sp->LkUp);
269 }
270 sendRedResp(sp);
271 }
272 sp->Recycle();
273 } while(1);
274 } while(1);
275
276// Keep the compiler happy
277//
278 return (void *)0;
279}

References XrdCmsRRQInfo::isLU, and XrdOucDLlist< T >::Remove().

+ Here is the call graph for this function:

◆ Statistics()

void XrdCmsRRQ::Statistics ( Info & Data)
inline

Definition at line 144 of file XrdCmsRRQ.hh.

144{myMutex.Lock(); Data = Stats; myMutex.UnLock();}

◆ TimeOut()

void * XrdCmsRRQ::TimeOut ( )

Definition at line 398 of file XrdCmsRRQ.cc.

399{
400// EPNAME("RRQ TimeOut");
401 XrdCmsRRQSlot *sp;
402
403// We measure millisecond intervals to timeout waiting requests. We used to zero
404// out arg1/2 to force expiration, but they would be zero anyway if no responses
405// occurred. Now with qdn we need to leave them alone as we may have deferred
406// a fast dispatch because we were waiting for more than one responder.
407//
408 while(1)
409 {isWaiting.Wait();
410 myMutex.Lock();
411 while(1)
412 {myClock++;
413 myMutex.UnLock();
414 XrdSysTimer::Wait(Tslice);
415 myMutex.Lock();
416 while((sp=waitQ.Next()->Item()) && sp->Expire < myClock)
417 {sp->Link.Remove();
418 if (readyQ.Singleton()) isReady.Post();
419// sp->Arg1 = 0; sp->Arg2 = 0;
420// DEBUG("expired slot " <<sp->slotNum);
421 readyQ.Prev()->Insert(&sp->Link);
422 }
423 if (waitQ.Singleton()) break;
424 }
425 myMutex.UnLock();
426 }
427
428// Keep the compiler happy
429//
430 return (void *)0;
431}
static void Wait(int milliseconds)

References XrdOucDLlist< T >::Remove(), and XrdSysTimer::Wait().

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: