39#include <netinet/in.h>
44#include "XrdVersion.hh"
83 int XrdCmsProtocol::readWait = 1000;
152 static int thePort = -1;
153 char *cfn = pi->
ConfigFN, buff[128];
158 {
if (pi->
Port && pi->
Port != thePort)
159 {sprintf(buff,
"%d disallowed; only using port %d",pi->
Port,thePort);
160 Say.Emsg(
"Config",
"Alternate port", buff);
167 if (
Config.Configure0(pi))
168 {
Config.doWait = -1;
return 0;}
173 {
while(*parms ==
' ') parms++;
176 while(*parms !=
' ' && *parms) parms++;
183 Say.Say(
"Copr. 2003-2020 Stanford University/SLAC cmsd.");
187 if (cfn) cfn = strdup(cfn);
189 {
Config.doWait = -1;
return 0;}
212 {Reply_Delay(Arg, theDelay);
return 0;}
218 Say.Emsg(
"Protocol",
"invalid request code from", myNode->Ident);
219 else if ((etxt = (myNode->*Method)(Arg)))
221 {
DEBUGR(etxt+1 <<
" delayed " <<Arg.waitVal <<
" seconds");
223 }
else if (*etxt ==
'.')
return -ECONNABORTED;
241 if ((dlen = lp->
Peek((
char *)&Hdr,
sizeof(Hdr),readWait)) !=
sizeof(Hdr))
242 {
if (dlen <= 0) lp->
setEtext(
"login not received");
249 {
if (!strncmp((
char *)&Hdr,
"login ", 6))
250 lp->
setEtext(
"protocol version 1 unsupported");
265void XrdCmsProtocol::Pander(
const char *manager,
int mport)
270 time_t ddmsg = time(0);
271 unsigned int Mode, Role = 0;
274 int Lvl=0, Netopts=0, waits=6, tries=6, fails=0, xport=mport;
275 int rc, fsUtil, KickedOut, blRedir, myNID = Manager->
ManTree->
Register();
278 const char *Reason = 0, *manp = manager;
279 const int manblen =
sizeof(manbuff);
284 DEBUG(myRole <<
" services to " <<manager <<
':' <<mport);
288 memset(&loginData, 0,
sizeof(loginData));
296 loginData.
HoldTime=
static_cast<int>(getpid());
330 {
Say.
Emsg(
"Pander",
"Suspend state still active."); waits=6;}
334 if (!(rc = Manager->ManTree->Trying(myNID, Lvl)) && Lvl)
335 {
DEBUG(
"restarting at root node " <<manager <<
':' <<mport);
336 manp = manager; xport = mport; Lvl = 0;
337 }
else if (rc < 0)
break;
339 DEBUG(
"trying to connect to lvl " <<Lvl <<
' ' <<manp <<
':' <<xport);
343 {
Say.
Emsg(
"Pander",
"Is hostname", manp,
"spelled correctly "
344 "or just not running?");
348 else {tries = 6; Netopts = 0;}
349 if ((Lvl = Manager->myMans->Next(xport,manbuff,manblen)))
351 else {
if (manp != manager) fails++;
356 Netopts = 0; tries = waits = 6;
360 if (!(Link->AddrInfo()->isRegistered())
362 {
char *oldName = strdup(Link->Host());
363 Say.
Emsg(
"Protocol", oldName,
"is missing an IPv6 ptr record; "
364 "attempting local registration as", manp);
365 if (!(Link->Register(manp)))
367 "registration failed; address mismatch.");
370 "is now locally registered as", manp);
377 if (!(myNode = Manager->Add(Link, Lvl+1, terminate)))
379 if (terminate)
break;
380 Say.
Emsg(
"Pander",
"Unable to obtain node object.");
388 | (
CmsState.NoStaging ? int(CmsLoginData::kYR_nostage) : 0);
389 if (fails >= 6 && manp == manager)
398 Data = loginData; Data.
Mode =
Mode | myShare | myTimeZ;
400 {
if (!Manager->ManTree->Connect(myNID, myNode)) KickedOut = 1;
401 else {XrdOucEnv cgiEnv((
const char *)Data.
envCGI);
402 const char *sname = cgiEnv.Get(
"site");
403 Say.
Emsg(
"Protocol",
"Logged into", sname, Link->Name());
405 Manager->Verify(Link, (
const char *)Data.
SID, sname);
406 Reason = Dispatch(isUp, TimeOut, 2);
414 if (Data.
SID) {free(Data.SID); Data.SID = 0;}
415 if (Data.
envCGI) {free(Data.envCGI); Data.envCGI = 0;}
419 Manager->Remove(myNode, (rc ==
kYR_redirect ?
"redirected"
420 : (Reason ? Reason :
"lost connection")));
421 Manager->ManTree->Disc(myNID);
428 Sync(); Manager->Delete(myNode); myNode = 0; Reason = 0;
434 Manager->myMans->Add(Link->NetAddr(), (
char *)Data.
Paths,
436 else Manager->Rerun((
char *)Data.
Paths);
443 if (!KickedOut && (Lvl = Manager->myMans->Next(xport,manbuff,manblen)))
444 {manp = manbuff; continue;}
446 if (manp != manager) fails++;
447 manp = manager; xport = mport;
452 Manager->Finished(manager, mport);
477 if ((Routing=Admit()))
479 if (RSlot) {myWay = isLateral; tOut = -1;}
480 else {myWay = isDown; tOut =
Config.AskPing*1000;}
482 if ((Reason = Dispatch(myWay, tOut, 2))) lp->
setEtext(Reason);
493 if (!myNode)
return -1;
502 {
RTable.Del(myNode); RSlot = 0;
503 myNode->UnLock();
delete myNode; myNode = 0;
513 if (myNode->isBound)
Cluster.Remove(0, myNode, !loggedIn);
514 else if (myNode->isGone)
Cluster.Remove(myNode);
515 else myNode->UnLock();
529 bool isLoggedIn = loggedIn != 0;
532 ProtLink = ProtStack;
539 if (reason)
Say.Emsg(
"Protocol", lp->
ID,
"logged out;", reason);
540 else Say.Emsg(
"Protocol", lp->
ID,
"logged out.");
542 if (reason)
Say.Emsg(
"Protocol", lp->
ID,
"login failed;", reason);
570char *getAltName(
char *sid,
char *buff,
int blen)
572 char *atsign, *spacec, *retval = 0;
575 if ((atsign = index(sid,
'@')))
577 if ((spacec = index(atsign,
' ')))
580 if (n > 3 && n < blen)
581 {strcpy(buff, atsign);
594 char *
envP = 0, envBuff[256], myBuff[4096];
595 XrdCmsLogin Source(myBuff,
sizeof(myBuff));
600 int addedp = 0, Status = 0, isPeer = 0, isProxy = 0;
601 int isMan, isServ, isSubm, wasSuspended = 0, Share = 100, tZone = 0;
606 {snprintf(envBuff,
sizeof(envBuff),
"site=%s",
Config.
mySite);
621 if (!Source.Admit(Link, Data,
Config.
mySID, envP))
return 0;
625 XrdOucEnv cgiEnv((
const char *)Data.
envCGI);
631 if (!(Link->AddrInfo()->isRegistered())
633 {
const char *altName = cgiEnv.Get(
"myHN");
634 const char *altType =
"stated mapping";
635 char hBF[256], *oldName = strdup(Link->Host());
636 if (!altName) {altName = getAltName((
char *)Data.
SID, hBF,
sizeof(hBF));
637 altType =
"inferred mapping";
639 Say.
Emsg(
"Protocol",
"DNS lookup for", oldName,
"failed; "
640 "IPv6 ptr record missing!");
642 {
Say.
Emsg(
"Protocol", oldName,
"did not supply a fallback "
643 "mapping; using IPv6 address.");
646 snprintf(buff,
sizeof(buff),
"%s -> %s", oldName, altName);
647 Say.
Emsg(
"Protocol",
"Attempting to use", altType, buff);
648 if (!(Link->Register(altName)))
649 {
Say.
Emsg(
"Protocol", buff, altType,
"failed; address mismatch.");
652 "is now locally registered as", altName);
661 {Link->setID(
"redirector", Data.
HoldTime);
662 return Admit_Redirector(wasSuspended);
668 return Login_Failed(
"configuration disallows subscribers");
697 else return Login_Failed(
"invalid login role");
708 Reason =
"configuration only allows proxies";
710 else if (isProxy) Reason =
"configuration disallows proxies";
712 Reason =
"configuration disallows peers";
713 if (Reason)
return Login_Failed(Reason);
723 Say.
Emsg(
"Protocol",Link->Name(),
"has not yet found a cluster slot!");
729 (
const char *)Data.
SID, (
const char *)Data.
ifList)))
730 return (XrdCmsRouting *)0;
731 myNode->RoleID =
static_cast<char>(roleID);
732 myNode->setVersion(Data.
Version);
739 if (Share > 0) myNode->setShare(Share);
745 tZone = myNode->setTZone(tZone);
751 <<
" MB Util=" <<Data.
fsUtil <<
" Share=" <<Share
752 <<
" TZone=" <<tZone);
753 myNode->DiskTotal = Data.
tSpace;
754 myNode->DiskMinF = Data.
mSpace;
755 myNode->DiskFree = Data.
fSpace;
756 myNode->DiskNums = Data.
fsNum;
757 myNode->DiskUtil = Data.
fsUtil;
763 {XrdOucTokenizer thePaths((
char *)Data.
Paths);
765 ConfigCheck(Data.
Paths);
766 while((tp = thePaths.GetLine()))
767 {
DEBUG(Link->Name() <<
" adding path: " <<tp);
768 if (!(tp = thePaths.GetToken())
769 || !(pp = thePaths.GetToken()))
break;
770 if (!(newmask = AddPath(myNode, tp, pp)))
771 return Login_Failed(
"invalid exported path");
782 pinfo.
rovec = myNode->Mask();
783 if (myNode->isPeer) pinfo.
ssvec = myNode->Mask();
785 Say.
Emsg(
"Protocol", myNode->Ident,
"defaulted r /");
795 myNode->isBad &= ~XrdCmsNode::isDisabled;
800 isNBSQ = Link->setNB();
804 const char *sname = cgiEnv.Get(
"site");
805 const char *lfmt = (myNode->isMan > 1 ?
"Standby%s%s" :
"Primary%s%s");
806 snprintf(envBuff,
sizeof(envBuff),lfmt,(sname ?
" ":
""),(sname ? sname :
""));
807 Say.
Emsg(
"Protocol", envBuff, myNode->Ident,
811 Say.
Emsg(
"Protocol", myNode->Ident,
"system ID:", (
const char *)Data.
SID);
823XrdCmsRouting *XrdCmsProtocol::Admit_Redirector(
int wasSuspended)
825 EPNAME(
"Admit_Redirector");
826 static CmsStatusRequest newState
831 myRole =
"redirector";
836 myNode =
new XrdCmsNode(Link); myNode->Lock();
841 Say.
Emsg(
"Protocol",myNode->Ident,
"login failed; too many redirectors.");
843 }
else myNode->setSlot(RSlot);
849 myNode->Send((
char *)&newState,
sizeof(newState));
853 Say.
Emsg(
"Protocol", myNode->Ident,
"logged in.");
854 DEBUG(myNode->Ident <<
" assigned slot " <<RSlot);
863 const char *pType,
const char *
Path)
901 if ((xp = ProtStack)) ProtStack = xp->ProtLink;
907 if (!xp)
Say.Emsg(
"Protocol",
"No more protocol objects.");
908 else xp->Init(theRole, uMan, theMan, thePort);
919void XrdCmsProtocol::ConfigCheck(
unsigned char *theConfig)
921 unsigned int ConfigID;
926 if (!theConfig) ConfigID = 1;
932 {
if (myNode->
ConfigID)
Say.Emsg(
"Protocol",Link->
Name(),
"reconfigured.");
949const char *XrdCmsProtocol::Dispatch(Bearing cDir,
int maxWait,
int maxTries)
952 static const int ReqSize =
sizeof(CmsRRHdr);
955 const char *toRC = (cDir == isUp ?
"manager not active"
956 :
"server not responding");
957 const char *myArgs, *myArgt;
967do{
if ((rc = Link->RecvAll((
char *)&Data->
Request, ReqSize, maxWait)) < 0)
969 "blacklisted" :
"request read failed");
970 if (!toLeft--)
return toRC;
973 return "server blacklisted w/ redirect";
974 if (!SendPing())
return "server unreachable";
984 return "server blacklisted w/ redirect";
985 if (!SendPing())
return "server unreachable";
996 <<
" dlen=" <<Data->
Dlen);
997 if (!(Data->
Dlen)) {myArgs = myArgt = 0;}
998 else {
if (Data->
Dlen > maxReqSize)
999 {
Say.
Emsg(
"Protocol",
"Request args too long from",Link->Name());
1000 return "protocol error";
1004 {
Say.
Emsg(
"Protocol",
"No buffers to serve", Link->Name());
1005 return "insufficient buffers";
1007 if ((rc = Link->RecvAll(Data->
Buff, Data->
Dlen, maxWait)) < 0)
1008 return (rc == -ETIMEDOUT ?
"read timed out" :
"read failed");
1016 Say.
Emsg(
"Protocol",Link->Name(),
"sent an invalid request -", buff);
1026 || !ProtArgs.Parse(
int(Data->
Request.
rrCode),myArgs,myArgt,Data))
1027 {Reply_Error(*Data,
kYR_EINVAL,
"badly formed request");
1040 {
if ((rc =
Execute(*Data)) && rc == -ECONNABORTED)
return "disconnected";}
1046 else Say.
Emsg(
"Protocol",
"No jobs to serve", Link->Name());
1051 return "logic error";
1065 if (myRole) Pander(myMan, myManPort);
1072void XrdCmsProtocol::Init(
const char *iRole,
XrdCmsManager *uMan,
1073 const char *iMan,
int iPort)
1092XrdCmsRouting *XrdCmsProtocol::Login_Failed(
const char *reason)
1094 Link->setEtext(reason);
1095 return (XrdCmsRouting *)0;
1111 if (refWait && refCount <= 0) {refWait->Post(); refWait = 0;}
1127 struct iovec ioB[2] = {{(
char *)&Data.
Request,
sizeof(Data.
Request)},
1135 "msg TTL exceeded for", Data.
Path);
1148 "aborted; no servers handling", Data.
Path);
1155 {
if (!(amask = pinfo.
rwvec))
1157 "aborted; no r/w servers handling", Data.
Path);
1189 Link->Send((
char *)&Resp,
sizeof(Resp));
1190 }
else act =
" skip";
1192 DEBUG(myNode->Ident <<act <<
" delay " <<ntohl(theDelay));
1199void XrdCmsProtocol::Reply_Error(
XrdCmsRRData &Data,
int ecode,
const char *etext)
1203 int n = strlen(etext)+1;
1207 htons((
unsigned short int)(
sizeof(
kXR_unt32)+n))},
1208 htonl(
static_cast<unsigned int>(ecode))};
1209 struct iovec ioV[2] = {{(
char *)&Resp,
sizeof(Resp)},
1210 {(
char *)etext, (
size_t)n}};
1213 }
else act =
" skip";
1215 DEBUG(myNode->Ident <<act <<
" err " <<ecode <<
' ' <<etext);
1222bool XrdCmsProtocol::SendPing()
1234 if (Link->Send((
char *)&Ping,
sizeof(Ping)) < 0)
return false;
1242void XrdCmsProtocol::Sync()
1245 XrdSysSemaphore mySem(0);
1250 if (refCount <= 0) refMutex.UnLock();
1251 else {refWait = &mySem;
1252 DEBUG(
"Waiting for " <<refCount <<
' ' <<myNode->Ident
1253 <<
" thread(s) to end.");
XrdProtocol * XrdgetProtocol(const char *pname, char *parms, XrdProtocol_Config *pi)
XrdVERSIONINFO(XrdgetProtocol, cmsd)
int XrdgetProtocolPort(const char *pname, char *parms, XrdProtocol_Config *pi)
unsigned long long SMask_t
XrdProtocol * XrdgetProtocol(const char *pname, char *parms, XrdProtocol_Config *pi)
int XrdgetProtocolPort(const char *pname, char *parms, XrdProtocol_Config *pi)
int Broadsend(SMask_t smask, XrdCms::CmsRRHdr &Hdr, void *Data, int Dlen)
void ResetRef(SMask_t smask, bool isLocked=false)
SMask_t Broadcast(SMask_t, const struct iovec *, int, int tot=0)
XrdCmsNode * Add(XrdLink *lp, int dport, int Status, int sport, const char *theNID, const char *theIF)
static XrdCmsJob * Alloc(XrdCmsProtocol *, XrdCmsRRData *)
static int Login(XrdLink *Link, XrdCms::CmsLoginData &Data, int timeout=-1)
int FreeSpace(int &tutil)
static const char allowsRW
static const char allowsSS
static const char isSuspend
static const char isDoomed
static const char isBlisted
int Find(const char *pname, XrdCmsPInfo &masks)
SMask_t Insert(const char *pname, XrdCmsPInfo *pinfo)
void Recycle(XrdLink *lp, int consec, const char *reason)
int Execute(XrdCmsRRData &Data)
int Stats(char *buff, int blen, int do_sync=0)
static XrdCmsProtocol * Alloc(const char *theRole="", XrdCmsManager *mP=0, const char *theMan=0, int thePort=0)
XrdProtocol * Match(XrdLink *lp)
static XrdCmsRRData * Objectify(XrdCmsRRData *op=0)
short Add(XrdCmsNode *nP)
static const char * Name(RoleID rid)
const char * getName(int Code)
const char *(XrdCmsNode::* NodeMethod_t)(XrdCmsRRData &)
static const char FES_Suspend
static const char All_Suspend
XrdLink * Connect(const char *host, int port, int opts=0, int timeout=-1)
void Secure(XrdNetSecurity *secp)
XrdJob(const char *desc="")
void Serialize()
Wait for all outstanding requests to be completed on the link.
int setEtext(const char *text)
int Peek(char *buff, int blen, int timeout=-1)
char * ID
Pointer to the client's link identity.
const char * Name() const
static uint32_t CRC32(const unsigned char *data, int count)
XrdProtocol(const char *jname)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Snooze(int seconds)
static const unsigned char kYR_Version
static const int CMS_isSuper
static const int CMS_noStage
static const int CMS_isMan
static const int CMS_isPeer
static const int CMS_isProxy
static const int CMS_Suspend