XRootD
Loading...
Searching...
No Matches
XrdXrootdTransit.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d X r o o t d T r a n s i t . c c */
4/* */
5/* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstring>
32#include <unistd.h>
33#include <sys/uio.h>
34
36
38
39#include "Xrd/XrdBuffer.hh"
40#include "Xrd/XrdLink.hh"
42#include "XrdOuc/XrdOucUtils.hh"
50
51/******************************************************************************/
52/* C l o b a l S y m b o l s */
53/******************************************************************************/
54
56
57#undef TRACELINK
58#define TRACELINK Link
59
60#define XRD_GETNUM(x)\
61 ntohl(*(static_cast<unsigned int *>(static_cast<void *>(x))))
62
63/******************************************************************************/
64/* S t a t i c M e m b e r s */
65/******************************************************************************/
66
67const char *XrdXrootdTransit::reqTab = XrdXrootdTransit::ReqTable();
68
70 XrdXrootdTransit::TranStack("TranStack",
71 "transit protocol anchor");
72
73/******************************************************************************/
74/* A l l o c */
75/******************************************************************************/
76
78 XrdLink *linkP,
79 XrdSecEntity *seceP,
80 const char *nameP,
81 const char *protP
82 )
83{
85
86// Simply return a new transit object masquerading as a bridge
87//
88 if (!(xp = TranStack.Pop())) xp = new XrdXrootdTransit();
89 xp->Init(rsltP, linkP, seceP, nameP, protP);
90 return xp;
91}
92
93/******************************************************************************/
94/* A t t n */
95/******************************************************************************/
96
97int XrdXrootdTransit::Attn(XrdLink *lP, short *theSID, int rcode,
98 const struct iovec *ioV, int ioN, int ioL)
99{
101
102// Find the request
103//
104 if (!(tP = XrdXrootdTransPend::Remove(lP, *theSID)))
105 {TRACE(REQ, "Unable to find request for " <<lP->ID <<" sid=" <<*theSID);
106 return 0;
107 }
108
109// Resume the request as we have been waiting for the response.
110//
111 return tP->bridge->AttnCont(tP, rcode, ioV, ioN, ioL);
112}
113
114/******************************************************************************/
115/* A t t n C o n t */
116/******************************************************************************/
117
118int XrdXrootdTransit::AttnCont(XrdXrootdTransPend *tP, int rcode,
119 const struct iovec *ioV, int ioN, int ioL)
120{
121 int rc;
122
123// we ensure waitPend has been cleared. This is to allow the process or
124// redrive loops to have taken the correct action before we zero the
125// runWait value.
126//
127 {
128 XrdSysCondVarHelper clk(waitCnd);
129 while(waitPend) waitCnd.Wait();
130 }
131
132// Refresh the request structure
133//
134 memcpy(&Request, &(tP->Pend.Request), sizeof(Request));
135 delete tP;
136 runWait = 0;
137
138// Reissue the request if it's a wait 0 response.
139//
140 if (rcode==kXR_wait
141 && (!ioN || XRD_GETNUM(ioV[0].iov_base) == 0))
142 {// we don't set waitPend as the job can start straight away
143 Sched->Schedule((XrdJob *)&waitJob);
144 return 0;
145 }
146
147// Send off the deferred response
148//
149 rc = Send(rcode, ioV, ioN, ioL);
150
151// Handle end based on current state
152//
153 if (rc < 0) return rc;
154
155 if (!runWait)
156 {if (runDone) runStatus.store(0, std::memory_order_release);
157 if (reInvoke) Sched->Schedule((XrdJob *)&respJob);
158 else Link->Enable();
159 }
160 else
161 {XrdSysCondVarHelper clk(waitCnd);
162 waitPend = false;
163 waitCnd.Broadcast();
164 }
165
166// All done
167//
168 return rc;
169}
170
171/******************************************************************************/
172/* D i s c */
173/******************************************************************************/
174
176{
177 char buff[128];
178
179// We do not allow disconnection while we are active
180//
181 if (runStatus.fetch_add(1, std::memory_order_acq_rel)) return false;
182
183// Reconnect original protocol to the link
184//
185 Link->setProtocol(realProt);
186
187// Now we need to recycle our xrootd part
188//
189 sprintf(buff, "%s disconnection", pName);
190 XrdXrootdProtocol::Recycle(Link, time(0)-cTime, buff);
191
192// Make sure that any pending wait jobs can exit
193//
194 {
195 XrdSysCondVarHelper clk(waitCnd);
196 waitPend = false;
197 runWait = 0;
198 waitCnd.Broadcast();
199 }
200
201// Now just free up our object.
202//
203 TranStack.Push(&TranLink);
204 return true;
205}
206
207/******************************************************************************/
208/* Private: F a i l */
209/******************************************************************************/
210
211bool XrdXrootdTransit::Fail(int ecode, const char *etext)
212{
213 runError = ecode;
214 runEText = etext;
215 return true;
216}
217
218/******************************************************************************/
219/* F a t a l */
220/******************************************************************************/
221
222int XrdXrootdTransit::Fatal(int rc)
223{
224 XrdXrootd::Bridge::Context rInfo(Link, Request.header.streamid,
225 Request.header.requestid);
226
227 return (respObj->Error(rInfo, runError, runEText) ? rc : -1);
228}
229
230/******************************************************************************/
231/* I n i t */
232/******************************************************************************/
233
234void XrdXrootdTransit::Init(XrdScheduler *schedP, int qMax, int qTTL)
235{
236 TranStack.Set(schedP, &XrdXrootdTrace, TRACE_MEM);
237 TranStack.Set(qMax, qTTL);
238}
239
240/******************************************************************************/
241
243 XrdLink *linkP,
244 XrdSecEntity *seceP,
245 const char *nameP,
246 const char *protP
247 )
248{
249 XrdNetAddrInfo *addrP;
250 const char *who;
251 char uname[sizeof(Request.login.username)+1];
252
253// Set standard stuff
254//
255 runArgs = 0;
256 runALen = 0;
257 runABsz = 0;
258 runError = 0;
259 runStatus.store(0, std::memory_order_release);
260 runWait = 0;
261 runWTot = 0;
262 runWMax = 3600;
263 runWCall = false;
264 runDone = false;
265 reInvoke = false;
266 waitPend = false;
267 wBuff = 0;
268 wBLen = 0;
269 respObj = respP;
270 pName = protP;
271 mySID = getSID();
272
273// Bind the protocol to the link
274//
275 SI->Bump(SI->Count);
276 Link = linkP;
277 Response.Set(linkP);
278 Response.Set(this);
279 strcpy(Entity.prot, "host");
280 Entity.host = (char *)linkP->Host();
281
282// Develop a trace identifier
283//
284 strncpy(uname, nameP, sizeof(uname)-1);
285 uname[sizeof(uname)-1] = 0;
287 linkP->setID(uname, mySID);
288
289// Place trace identifier everywhere is should be located
290
291// Indicate that this brige supports asynchronous responses
292//
294
295// Mark the client as IPv4 if they came in as IPv4 or mapped IPv4. Note
296// there is no way we can figure out if this is a dual-stack client.
297//
298 addrP = Link->AddrInfo();
299 if (addrP->isIPType(XrdNetAddrInfo::IPv4) || addrP->isMapped())
301
302// Mark the client as being on a private net if the address is private
303//
304 if (addrP->isPrivate()) {clientPV |= XrdOucEI::uPrip; rdType = 1;}
305 else rdType = 0;
306
307// Now tie the security information
308//
309 Client = (seceP ? seceP : &Entity);
310 Client->ueid = mySID;
311 Client->tident = Client->pident = Link->ID;
312 Client->addrInfo = addrP;
313
314// Allocate a monitoring object, if needed for this connection and record login
315//
316 if (Monitor.Ready())
317 {Monitor.Register(linkP->ID, linkP->Host(), protP);
318 if (Monitor.Logins())
319 {if (Monitor.Auths() && seceP) MonAuth();
320 else Monitor.Report(Monitor.Auths() ? "" : 0);
321 }
322 }
323
324// Complete the request ID object
325//
326 ReqID.setID(Request.header.streamid, linkP->FDnum(), linkP->Inst());
327
328// Substitute our protocol for the existing one
329//
330 realProt = linkP->setProtocol(this);
331 linkP->setProtName(protP);
332 linkP->armBridge();
333
334// Document this login
335//
336 who = (seceP && seceP->name ? seceP->name : "nobody");
337 eDest.Log(SYS_LOG_01, "Bridge", Link->ID, "login as", who);
338
339// All done, indicate we are logged in
340//
342 cTime = time(0);
343
344// Propogate a connect through the whole system
345//
346 osFS->Connect(Client);
347}
348
349/******************************************************************************/
350/* P r o c e e d */
351/******************************************************************************/
352
354{
355 int rc;
356
357// If we were interrupted in a reinvoke state, resume that state.
358//
359 if (reInvoke) rc = Process(Link);
360 else rc = 0;
361
362// Handle ending status
363//
364 if (rc >= 0) Link->Enable();
365 else if (rc != -EINPROGRESS) Link->Close();
366}
367
368/******************************************************************************/
369/* P r o c e s s */
370/******************************************************************************/
371
373{
374 int rc;
375
376// This entry is serialized via link processing and data is now available.
377// One of the following will be returned.
378//
379// < 0 -> Stop getting requests,
380// -EINPROGRESS leave link disabled but otherwise all is well
381// -n Error, disable and close the link
382// = 0 -> OK, get next request, if allowed, o/w enable the link
383// > 0 -> Slow link, stop getting requests and enable the link
384//
385
386// Reflect data is present to the underlying protocol and if Run() has been
387// called we need to dispatch that request. This may be iterative.
388//
389do{rc = realProt->Process((reInvoke ? 0 : lp));
390 if (rc >= 0 && runStatus.load(std::memory_order_acquire))
391 {reInvoke = (rc == 0);
392 if (runError) rc = Fatal(rc);
393 else {runDone = false;
395 if (rc >= 0)
396 {if (runDone) runStatus.store(0, std::memory_order_release);
397 if (runWait)
398 {XrdSysCondVarHelper clk(waitCnd);
399 waitPend = false;
400 waitCnd.Broadcast();
401 return -EINPROGRESS;
402 }
403 if (!runDone) return rc;
404 }
405 }
406 } else reInvoke = false;
407 } while(rc >= 0 && reInvoke);
408
409// Make sure that we indicate that we are no longer active
410//
411 runStatus.store(0, std::memory_order_release);
412
413// All done
414//
415 return (rc ? rc : 1);
416}
417
418/******************************************************************************/
419/* R e c y c l e */
420/******************************************************************************/
421
422void XrdXrootdTransit::Recycle(XrdLink *lp, int consec, const char *reason)
423{
424
425// Set ourselves as active so we can't get more requests
426//
427 runStatus.fetch_add(1, std::memory_order_acq_rel);
428
429// If we were active then we will need to quiesce before dismantling ourselves.
430// Note that Recycle() can only be called if the link is enabled. So, this bit
431// of code is improbable but we check it anyway.
432//
433 if (runWait > 0) {
434 TRACEP(EMSG, "WARNING: Recycle is canceling wait job; the wait job might already be running during recycle.");
435 Sched->Cancel(&waitJob);
436 }
437
438// First we need to recycle the real protocol
439//
440 if (realProt) realProt->Recycle(lp, consec, reason);
441
442// Now we need to recycle our xrootd part
443//
444 XrdXrootdProtocol::Recycle(lp, consec, reason);
445
446// Release the argument buffer
447//
448 if (runArgs) {free(runArgs); runArgs = 0;}
449
450// Delete all pending requests
451//
453
454// Make sure that any pending wait jobs can exit
455//
456 {
457 XrdSysCondVarHelper clk(waitCnd);
458 waitPend = false;
459 runWait = 0;
460 waitCnd.Broadcast();
461 }
462
463// Now just free up our object.
464//
465 TranStack.Push(&TranLink);
466}
467
468/******************************************************************************/
469/* R e d r i v e */
470/******************************************************************************/
471
473{
474 static int eCode = htonl(kXR_NoMemory);
475 static char eText[] = "Insufficent memory to re-issue request";
476 static struct iovec ioV[] = {{(char *)&eCode,sizeof(eCode)},
477 {(char *)&eText,sizeof(eText)}};
478 int rc;
479
480// we ensure waitPend has been cleared. This is to allow the process or
481// redrive loops to have taken the correct action before we zero the
482// runWait value.
483//
484 {
485 XrdSysCondVarHelper clk(waitCnd);
486 while(waitPend) waitCnd.Wait();
487 }
488
489// Do some tracing
490//
491 TRACEP(REQ, "Bridge redrive runStatus="<<runStatus.load(std::memory_order_acquire)
492 <<" runError="<<runError
493 <<" runWait="<<runWait<<" runWTot="<<runWTot);
494
495// Update wait statistics
496//
497 runWTot += runWait;
498 runWait = 0;
499
500// While we are running asynchronously, there is no way that this object can
501// be deleted while a timer is outstanding as the link has been disabled. So,
502// we can reissue the request with little worry.
503//
504// This is a bit tricky here as a redriven request may result in a wait. If
505// this happens we cannot hand the result off to the real protocol until we
506// wait and successfully redrive. The wait handling occurs asynchronously
507// so all we need to do is honor it here.
508//
509 if (!runALen || RunCopy(runArgs, runALen)) {
510 do{runDone = false;
511 rc = Process2();
512 TRACEP(REQ, "Bridge redrive Process2 rc="<<rc
513 <<" runError="<<runError<<" runWait="<<runWait);
514 if (rc < 0) break;
515 if (runDone) runStatus.store(0, std::memory_order_release);
516 if (runWait || !runDone || !reInvoke) break;
517 rc = realProt->Process(NULL);
518 TRACEP(REQ, "Bridge redrive callback rc="<<rc
519 <<" runStatus="<<runStatus.load(std::memory_order_acquire));
520 if (rc < 0 || !runStatus.load(std::memory_order_acquire))
521 {reInvoke = false;
522 break;
523 }
524 reInvoke = (rc == 0);
525 if (runError) rc = Fatal(rc);
526 } while((rc >= 0) && !runError && !runWait);
527 }
528 else rc = Send(kXR_error, ioV, 2, 0);
529
530// Defer the request if need be
531//
532 if (rc >= 0 && runWait)
533 {XrdSysCondVarHelper clk(waitCnd);
534 waitPend = false;
535 waitCnd.Broadcast();
536 return;
537 }
538 runWTot = 0;
539
540// Indicate we are no longer active
541//
542 runStatus.store(0, std::memory_order_release);
543
544// If the link needs to be terminated, terminate the link. Otherwise, we can
545// enable the link for new requests at this point.
546//
547 if (rc < 0) Link->Close();
548 else Link->Enable();
549}
550
551/******************************************************************************/
552/* R e q T a b l e */
553/******************************************************************************/
554
555#define KXR_INDEX(x) x-kXR_auth
556
558{
559 static char rTab[kXR_truncate-kXR_auth+1];
560
561// Initialize the table
562//
563 memset(rTab, 0, sizeof(rTab));
564 rTab[KXR_INDEX(kXR_chmod)] = 1;
565 rTab[KXR_INDEX(kXR_close)] = 1;
566 rTab[KXR_INDEX(kXR_dirlist)] = 1;
567 rTab[KXR_INDEX(kXR_locate)] = 1;
568 rTab[KXR_INDEX(kXR_mkdir)] = 1;
569 rTab[KXR_INDEX(kXR_mv)] = 1;
570 rTab[KXR_INDEX(kXR_open)] = 1;
571 rTab[KXR_INDEX(kXR_prepare)] = 1;
572 rTab[KXR_INDEX(kXR_protocol)] = 1;
573 rTab[KXR_INDEX(kXR_query)] = 1;
574 rTab[KXR_INDEX(kXR_read)] = 2;
575 rTab[KXR_INDEX(kXR_readv)] = 2;
576 rTab[KXR_INDEX(kXR_rm)] = 1;
577 rTab[KXR_INDEX(kXR_rmdir)] = 1;
578 rTab[KXR_INDEX(kXR_set)] = 1;
579 rTab[KXR_INDEX(kXR_stat)] = 1;
580 rTab[KXR_INDEX(kXR_statx)] = 1;
581 rTab[KXR_INDEX(kXR_sync)] = 1;
582 rTab[KXR_INDEX(kXR_truncate)] = 1;
583 rTab[KXR_INDEX(kXR_write)] = 2;
584
585// Now return the address
586//
587 return rTab;
588}
589
590/******************************************************************************/
591/* Private: R e q W r i t e */
592/******************************************************************************/
593
594bool XrdXrootdTransit::ReqWrite(char *xdataP, int xdataL)
595{
596
597// Make sure we always transit to the resume point
598//
599 myBlen = 0;
600
601// If nothing was read, then this is a straight-up write
602//
603 if (!xdataL || !xdataP || !Request.header.dlen)
604 {Resume = 0; wBuff = xdataP; wBLen = xdataL;
605 return true;
606 }
607
608// Partial data was read, we may have to split this between a direct write
609// and a network read/write -- somewhat complicated.
610//
611 myBuff = wBuff = xdataP;
612 myBlast = wBLen = xdataL;
614 return true;
615}
616
617/******************************************************************************/
618/* R u n */
619/******************************************************************************/
620
621bool XrdXrootdTransit::Run(const char *xreqP, char *xdataP, int xdataL)
622{
623 int movLen;
624
625// We do not allow re-entry if we are curently processing a request.
626// It will be reset, as need, when a response is effected.
627//
628
629 if (runStatus.fetch_add(1, std::memory_order_acq_rel))
630 {TRACEP(REQ, "Bridge request failed due to re-entry");
631 return false;
632 }
633
634// Copy the request header
635//
636 memcpy((void *)&Request, (void *)xreqP, sizeof(Request));
637
638// Validate that we can actually handle this request
639//
640 Request.header.requestid = ntohs(Request.header.requestid);
641 if (Request.header.requestid & 0x8000
642 || Request.header.requestid > static_cast<kXR_unt16>(kXR_truncate)
643 || !reqTab[Request.header.requestid - kXR_auth])
644 {TRACEP(REQ, "Unsupported bridge request");
645 return Fail(kXR_Unsupported, "Unsupported bridge request");
646 }
647
648// Validate the data length
649//
650 Request.header.dlen = ntohl(Request.header.dlen);
651 if (Request.header.dlen < 0)
652 {TRACEP(REQ, "Invalid request data length");
653 return Fail(kXR_ArgInvalid, "Invalid request data length");
654 }
655
656// Copy the stream id and trace this request
657//
658 Response.Set(Request.header.streamid);
659 TRACEP(REQ, "Bridge req=" <<Request.header.requestid
660 <<" dlen=" <<Request.header.dlen <<" blen=" <<xdataL);
661
662// If this is a write request, we will need to do a lot more
663//
664 if (Request.header.requestid == kXR_write) return ReqWrite(xdataP, xdataL);
665
666// Obtain any needed buffer and handle any existing data arguments. Also, we
667// need to keep a shadow copy of the request arguments should we get a wait
668// and will need to re-issue the request (the server mangles the args).
669//
670 if (Request.header.dlen)
671 {movLen = (xdataL < Request.header.dlen ? xdataL : Request.header.dlen);
672 if (!RunCopy(xdataP, movLen)) return true;
673 if (!runArgs || movLen > runABsz)
674 {if (runArgs) free(runArgs);
675 if (!(runArgs = (char *)malloc(movLen)))
676 {TRACEP(REQ, "Failed to allocate memory");
677 return Fail(kXR_NoMemory, "Insufficient memory");
678 }
679 runABsz = movLen;
680 }
681 memcpy(runArgs, xdataP, movLen); runALen = movLen;
682 if ((myBlen = Request.header.dlen - movLen))
683 {myBuff = argp->buff + movLen;
685 return true;
686 }
687 } else runALen = 0;
688
689// If we have all the data, indicate request accepted.
690//
691 runError = 0;
692 Resume = 0;
693 return true;
694}
695
696/******************************************************************************/
697/* Privae: R u n C o p y */
698/******************************************************************************/
699
700bool XrdXrootdTransit::RunCopy(char *buffP, int buffL)
701{
702
703// Allocate a buffer if we do not have one or it is too small
704//
705 if (!argp || Request.header.dlen+1 > argp->bsize)
706 {if (argp) BPool->Release(argp);
707 if (!(argp = BPool->Obtain(Request.header.dlen+1)))
708 {Fail(kXR_ArgTooLong, "Request argument too long"); return false;}
709 hcNow = hcPrev; halfBSize = argp->bsize >> 1;
710 }
711
712// Copy the arguments to the buffer
713//
714 memcpy(argp->buff, buffP, buffL);
715 argp->buff[buffL] = 0;
716 return true;
717}
718
719/******************************************************************************/
720/* S e n d */
721/******************************************************************************/
722
723int XrdXrootdTransit::Send(int rcode, const struct iovec *ioV, int ioN, int ioL)
724{
725 XrdXrootd::Bridge::Context rInfo(Link, Request.header.streamid,
726 Request.header.requestid);
727 const char *eMsg;
728 int rc;
729 bool aOK;
730
731// Invoke the result object (we initially assume this is the final result)
732//
733 runDone = true;
734 switch(rcode)
735 {case kXR_error:
736 rc = XRD_GETNUM(ioV[0].iov_base);
737 eMsg = (ioN < 2 ? "" : (const char *)ioV[1].iov_base);
738 if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
739 aOK = respObj->Error(rInfo, rc, eMsg);
740 break;
741 case kXR_ok:
742 if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
743 aOK = (ioN ? respObj->Data(rInfo, ioV, ioN, ioL, true)
744 : respObj->Done(rInfo));
745 break;
746 case kXR_oksofar:
747 aOK = respObj->Data(rInfo, ioV, ioN, ioL, false);
748 runDone = false;
749 break;
750 case kXR_redirect:
751 if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
752 rc = XRD_GETNUM(ioV[0].iov_base);
753 aOK = respObj->Redir(rInfo,rc,(const char *)ioV[1].iov_base);
754 break;
755 case kXR_wait:
756 return Wait(rInfo, ioV, ioN, ioL);
757 break;
758 case kXR_waitresp:
759 runDone = false;
760 return WaitResp(rInfo, ioV, ioN, ioL);
761 break;
762 default: if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
763 aOK = respObj->Error(rInfo, kXR_ServerError,
764 "internal logic error");
765 break;
766 };
767
768// All done
769//
770 return (aOK ? 0 : -1);
771}
772
773/******************************************************************************/
774
775int XrdXrootdTransit::Send(long long offset, int dlen, int fdnum)
776{
777 XrdXrootdTransSend sfInfo(Link, Request.header.streamid,
778 Request.header.requestid,
779 offset, dlen, fdnum);
780
781// Effect callback (this is always a final result)
782//
783 runDone = true;
784 return (respObj->File(sfInfo, dlen) ? 0 : -1);
785}
786
787/******************************************************************************/
788
789int XrdXrootdTransit::Send(XrdOucSFVec *sfvec, int sfvnum, int dlen)
790{
791 XrdXrootdTransSend sfInfo(Link, Request.header.streamid,
792 Request.header.requestid,
793 sfvec, sfvnum, dlen);
794
795// Effect callback (this is always a final result)
796//
797 runDone = true;
798 return (respObj->File(sfInfo, dlen) ? 0 : -1);
799}
800
801/******************************************************************************/
802/* Private: W a i t */
803/******************************************************************************/
804
805int XrdXrootdTransit::Wait(XrdXrootd::Bridge::Context &rInfo,
806 const struct iovec *ioV, int ioN, int ioL)
807{
808 const char *eMsg;
809
810// Trace this request if need be
811//
812 runWait = XRD_GETNUM(ioV[0].iov_base);
813 eMsg = (ioN < 2 ? "reason unknown" : (const char *)ioV[1].iov_base);
814
815// Check if the protocol wants to handle all waits
816//
817 if (runWMax <= 0)
818 {int wtime = runWait;
819 runWait = 0;
820 return (respObj->Wait(rInfo, wtime, eMsg) ? 0 : -1);
821 }
822
823// Check if we have exceeded the maximum wait time
824//
825 if (runWTot >= runWMax)
826 {runDone = true;
827 runWait = 0;
828 return (respObj->Error(rInfo, kXR_Cancelled, eMsg) ? 0 : -1);
829 }
830
831// Readjust wait time
832//
833 if (runWait > runWMax) runWait = runWMax;
834
835// Check if the protocol wants a wait notification
836//
837 if (runWCall && !(respObj->Wait(rInfo, runWait, eMsg))) return -1;
838
839// The process, redrive & recycle rely on a non-zero or positive
840// runWait to indicate wait is scheduled, so make sure that is true.
841//
842 if (runWait <= 0) runWait = 1;
843
844 TRACEP(REQ, "Bridge delaying request " <<runWait <<" sec (" <<eMsg <<")");
845
846// Delay processing (and thus clearing runWait) until the process or redrive
847// loops can detect that we are waiting
848//
849 waitPend = true;
850
851// All done, schedule the wait
852//
853 Sched->Schedule((XrdJob *)&waitJob, time(0)+runWait);
854
855 return 0;
856}
857
858/******************************************************************************/
859/* Private: W a i t R e s p */
860/******************************************************************************/
861
862int XrdXrootdTransit::WaitResp(XrdXrootd::Bridge::Context &rInfo,
863 const struct iovec *ioV, int ioN, int ioL)
864{
865 XrdXrootdTransPend *trP;
866 const char *eMsg;
867 int wTime;
868
869// Trace this request if need be
870//
871 wTime = XRD_GETNUM(ioV[0].iov_base);
872 eMsg = (ioN < 2 ? "reason unknown" : (const char *)ioV[1].iov_base);
873 TRACEP(REQ, "Bridge waiting for resp; sid=" <<rInfo.sID.num
874 <<" wt=" <<wTime <<" (" <<eMsg <<")");
875
876// We would issue callback to see how we should handle this. However, we can't
877// predictably handle a waitresp. So that means we will just wait for a resp.
878//
879// XrdXrootd::Bridge::Result *newCBP = respObj->WaitResp(rInfo, runWait, eMsg);
880
881// Save the current state
882//
883 trP = new XrdXrootdTransPend(Link, this, &Request);
884 trP->Queue();
885
886// Effect a wait
887//
888 runWait = -1;
889
890// Delay processing (and thus clearing runWait) until the process or redrive
891// loops can detect that we are waiting
892//
893 waitPend = true;
894
895 return 0;
896}
@ kXR_ArgInvalid
Definition XProtocol.hh:990
@ kXR_Unsupported
@ kXR_Cancelled
@ kXR_ServerError
@ kXR_ArgTooLong
Definition XProtocol.hh:992
@ kXR_NoMemory
Definition XProtocol.hh:998
kXR_char username[8]
Definition XProtocol.hh:396
@ kXR_waitresp
Definition XProtocol.hh:906
@ kXR_redirect
Definition XProtocol.hh:904
@ kXR_oksofar
Definition XProtocol.hh:900
@ kXR_ok
Definition XProtocol.hh:899
@ kXR_wait
Definition XProtocol.hh:905
@ kXR_error
Definition XProtocol.hh:903
struct ClientRequestHdr header
Definition XProtocol.hh:846
struct ClientLoginRequest login
Definition XProtocol.hh:857
@ kXR_read
Definition XProtocol.hh:125
@ kXR_open
Definition XProtocol.hh:122
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_mkdir
Definition XProtocol.hh:120
@ kXR_sync
Definition XProtocol.hh:128
@ kXR_chmod
Definition XProtocol.hh:114
@ kXR_dirlist
Definition XProtocol.hh:116
@ kXR_rm
Definition XProtocol.hh:126
@ kXR_query
Definition XProtocol.hh:113
@ kXR_write
Definition XProtocol.hh:131
@ kXR_auth
Definition XProtocol.hh:112
@ kXR_set
Definition XProtocol.hh:130
@ kXR_rmdir
Definition XProtocol.hh:127
@ kXR_statx
Definition XProtocol.hh:134
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_protocol
Definition XProtocol.hh:118
@ kXR_mv
Definition XProtocol.hh:121
@ kXR_stat
Definition XProtocol.hh:129
@ kXR_locate
Definition XProtocol.hh:139
@ kXR_close
Definition XProtocol.hh:115
@ kXR_prepare
Definition XProtocol.hh:133
@ kXR_asyncap
Definition XProtocol.hh:378
@ kXR_ver002
Definition XProtocol.hh:386
unsigned short kXR_unt16
Definition XPtypes.hh:67
unsigned char kXR_char
Definition XPtypes.hh:65
#define EMSG(x)
void Fatal(const char *op, const char *target)
Definition XrdCrc32c.cc:58
XrdOucTrace * XrdXrootdTrace
#define eMsg(x)
const int SYS_LOG_01
#define TRACE_MEM
Definition XrdTrace.hh:38
#define TRACE(act, x)
Definition XrdTrace.hh:63
#define XRD_LOGGEDIN
#define TRACEP(act, x)
#define XRD_GETNUM(x)
#define KXR_INDEX(x)
void Release(XrdBuffer *bp)
Definition XrdBuffer.cc:221
XrdBuffer * Obtain(int bsz)
Definition XrdBuffer.cc:140
friend class XrdScheduler
Definition XrdJob.hh:44
XrdJob(const char *desc="")
Definition XrdJob.hh:51
bool isMapped() const
bool isIPType(IPType ipType) const
void Bump(int &val)
static void Sanitize(char *instr, char subc='_')
char prot[XrdSecPROTOIDSIZE]
Auth protocol used (e.g. krb5)
char * name
Entity's name.
char * host
Entity's host name dnr dependent.
static XrdXrootdStats * SI
static XrdSysError & eDest
static unsigned int getSID()
XrdXrootdMonitor::User Monitor
int(XrdXrootdProtocol::* Resume)()
static XrdScheduler * Sched
int Process(XrdLink *lp) override
void Recycle(XrdLink *lp, int consec, const char *reason) override
XrdXrootdResponse Response
static XrdBuffManager * BPool
static XrdSfsFileSystem * osFS
void Set(XrdLink *lp)
static XrdXrootdTransPend * Remove(XrdLink *lP, short sid)
union XrdXrootdTransPend::@371012140333040222300212162025004307132302363251 Pend
XrdXrootdTransit * bridge
static void Clear(XrdXrootdTransit *trP)
bool Run(const char *xreqP, char *xdataP=0, int xdataL=0)
Inject an xrootd request into the protocol stack.
static const char * ReqTable()
Initialize the valid request table.
void Redrive()
Redrive a request after a wait.
int Send(int rcode, const struct iovec *ioVec, int ioNum, int ioLen)
Handle request data response.
void Recycle(XrdLink *lp, int consec, const char *reason)
Handle link shutdown.
static void Init(XrdScheduler *schedP, int qMax, int qTTL)
Perform one-time initialization.
static XrdXrootdTransit * Alloc(XrdXrootd::Bridge::Result *respP, XrdLink *linkP, XrdSecEntity *seceP, const char *nameP, const char *protP)
Get a new transit object.
static int Attn(XrdLink *lP, short *theSID, int rcode, const struct iovec *ioVec, int ioNum, int ioLen)
Handle attention response (i.e. async response)
XrdXrootdTransit()
Constructor & Destructor.
void Proceed()
Resume processing after a waitresp completion.
bool Disc()
Handle dismantlement.
int Process(XrdLink *lp)
Handle link activation (replaces parent activation).
union XrdXrootd::Bridge::Context::@216053020250347016153077031152206173164054152024 sID
associated request stream ID
virtual bool Wait(Bridge::Context &info, int wtime, const char *wtext)
static const int uIPv4
ucap: Supports read redirects
static const int uPrip