source: XIOS/dev/dev_trunk_omp/src/server.cpp @ 1627

Last change on this file since 1627 was 1601, checked in by yushan, 6 years ago

branch_openmp merged with trunk r1597

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 33.0 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "event_scheduler.hpp"
16#include "string_tools.hpp"
17using namespace ep_lib;
18
19namespace xios
20{
21    MPI_Comm CServer::intraComm ;
22    std::list<MPI_Comm> CServer::interCommLeft ;
23    std::list<MPI_Comm> CServer::interCommRight ;
24    std::list<MPI_Comm> CServer::contextInterComms;
25    std::list<MPI_Comm> CServer::contextIntraComms;
26    int CServer::serverLevel = 0 ;
27    int CServer::nbContexts = 0;
28    bool CServer::isRoot = false ;
29    int CServer::rank_ = INVALID_RANK;
30    StdOFStream CServer::m_infoStream;
31    StdOFStream CServer::m_errorStream;
32    map<string,CContext*> CServer::contextList ;
33    vector<int> CServer::sndServerGlobalRanks;
34    bool CServer::finished=false ;
35    bool CServer::is_MPI_Initialized ;
36    CEventScheduler* CServer::eventScheduler = 0;
37
38//---------------------------------------------------------------
39/*!
40 * \fn void CServer::initialize(void)
41 * Creates intraComm for each possible type of servers (classical, primary or secondary).
42 * Creates interComm and stores them into the following lists:
43 *   classical server -- interCommLeft
44 *   primary server -- interCommLeft and interCommRight
45 *   secondary server -- interCommLeft for each pool.
46 *   IMPORTANT: CXios::usingServer2 should NOT be used beyond this function. Use CServer::serverLevel instead.
47 */
48    void CServer::initialize(void)
49    {
50      //int initialized ;
51      //MPI_Initialized(&initialized) ;
52      //if (initialized) is_MPI_Initialized=true ;
53      //else is_MPI_Initialized=false ;
54      int rank ;
55
56      // Not using OASIS
57      if (!CXios::usingOasis)
58      {
59
60        //if (!is_MPI_Initialized)
61        //{
62        //  MPI_Init(NULL, NULL);
63        //}
64        CTimer::get("XIOS").resume() ;
65
66        boost::hash<string> hashString ;
67        unsigned long hashServer = hashString(CXios::xiosCodeId);
68
69        unsigned long* hashAll ;
70        unsigned long* srvLevelAll ;
71
72        int size ;
73        int myColor ;
74        int i,c ;
75        MPI_Comm newComm;
76
77        MPI_Comm_size(CXios::globalComm, &size) ;
78        MPI_Comm_rank(CXios::globalComm, &rank_);
79
80        hashAll=new unsigned long[size] ;
81        MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ;
82
83        map<unsigned long, int> colors ;
84        map<unsigned long, int> leaders ;
85        map<unsigned long, int>::iterator it ;
86
87        // (1) Establish client leaders, distribute processes between two server levels
88        std::vector<int> srvRanks;
89        for(i=0,c=0;i<size;i++)
90        {
91          if (colors.find(hashAll[i])==colors.end())
92          {
93            colors[hashAll[i]]=c ;
94            leaders[hashAll[i]]=i ;
95            c++ ;
96          }
97          if (CXios::usingServer2)
98            if (hashAll[i] == hashServer)
99              srvRanks.push_back(i);
100        }
101
102        if (CXios::usingServer2)
103        {
104          int reqNbProc = srvRanks.size()*CXios::ratioServer2/100.;
105          if (reqNbProc<1 || reqNbProc==srvRanks.size())
106          {
107            error(0)<<"WARNING: void CServer::initialize(void)"<<endl
108                << "It is impossible to dedicate the requested number of processes = "<<reqNbProc
109                <<" to secondary server. XIOS will run in the classical server mode."<<endl;
110          }
111          else
112          {
113            if (CXios::nbPoolsServer2 == 0) CXios::nbPoolsServer2 = reqNbProc;
114            int firstSndSrvRank = srvRanks.size()*(100.-CXios::ratioServer2)/100. ;
115            int poolLeader = firstSndSrvRank;
116//*********** (1) Comment out the line below to set one process per pool
117            sndServerGlobalRanks.push_back(srvRanks[poolLeader]);
118            int nbPools = CXios::nbPoolsServer2;
119            if ( nbPools > reqNbProc || nbPools < 1)
120            {
121              error(0)<<"WARNING: void CServer::initialize(void)"<<endl
122                  << "It is impossible to allocate the requested number of pools = "<<nbPools
123                  <<" on the secondary server. It will be set so that there is one process per pool."<<endl;
124              nbPools = reqNbProc;
125            }
126            int remainder = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) % nbPools;
127            int procsPerPool = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) / nbPools;
128            for (i=0; i<srvRanks.size(); i++)
129            {
130              if (i >= firstSndSrvRank)
131              {
132                if (rank_ == srvRanks[i])
133                {
134                  serverLevel=2;
135                }
136                poolLeader += procsPerPool;
137                if (remainder != 0)
138                {
139                  ++poolLeader;
140                  --remainder;
141                }
142//*********** (2) Comment out the two lines below to set one process per pool
143                if (poolLeader < srvRanks.size())
144                  sndServerGlobalRanks.push_back(srvRanks[poolLeader]);
145//*********** (3) Uncomment the line below to set one process per pool
146//                sndServerGlobalRanks.push_back(srvRanks[i]);
147              }
148              else
149              {
150                if (rank_ == srvRanks[i]) serverLevel=1;
151              }
152            }
153            if (serverLevel==2)
154            {
155              #pragma omp critical (_output)
156              info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ;
157              for (i=0; i<sndServerGlobalRanks.size(); i++)
158              {
159                if (rank_>= sndServerGlobalRanks[i])
160                {
161                  if ( i == sndServerGlobalRanks.size()-1)
162                  {
163                    myColor = colors.size() + sndServerGlobalRanks[i];
164                  }
165                  else if (rank_< sndServerGlobalRanks[i+1])
166                  {
167                    myColor = colors.size() + sndServerGlobalRanks[i];
168                    break;
169                  }
170                }
171              }
172            }
173          }
174        }
175
176        // (2) Create intraComm
177        if (serverLevel != 2) myColor=colors[hashServer];
178        MPI_Comm_split(CXios::globalComm, myColor, rank_, &intraComm) ;
179
180        // (3) Create interComm
181        if (serverLevel == 0)
182        {
183          int clientLeader;
184          for(it=leaders.begin();it!=leaders.end();it++)
185          {
186            if (it->first!=hashServer)
187            {
188              clientLeader=it->second ;
189              int intraCommSize, intraCommRank ;
190              MPI_Comm_size(intraComm,&intraCommSize) ;
191              MPI_Comm_rank(intraComm,&intraCommRank) ;
192
193              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
194              #pragma omp critical (_output)
195              {
196                info(50)<<"intercommCreate::server (classical mode) "<<rank_<<" intraCommSize : "<<intraCommSize
197                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
198              }
199             
200              interCommLeft.push_back(newComm) ;
201            }
202          }
203        }
204        else if (serverLevel == 1)
205        {
206          int clientLeader, srvSndLeader;
207          int srvPrmLeader ;
208
209          for (it=leaders.begin();it!=leaders.end();it++)
210          {
211            if (it->first != hashServer)
212            {
213              clientLeader=it->second ;
214              int intraCommSize, intraCommRank ;
215              MPI_Comm_size(intraComm, &intraCommSize) ;
216              MPI_Comm_rank(intraComm, &intraCommRank) ;
217             
218              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
219              #pragma omp critical (_output)
220              {
221                info(50)<<"intercommCreate::server (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
222                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
223              }
224              interCommLeft.push_back(newComm) ;
225            }
226          }
227
228          for (int i = 0; i < sndServerGlobalRanks.size(); ++i)
229          {
230            int intraCommSize, intraCommRank ;
231            MPI_Comm_size(intraComm, &intraCommSize) ;
232            MPI_Comm_rank(intraComm, &intraCommRank) ;
233
234            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, sndServerGlobalRanks[i], 1, &newComm) ;
235            #pragma omp critical (_output)
236            {
237              info(50)<<"intercommCreate::client (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
238                <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< sndServerGlobalRanks[i]<<endl ;
239            }
240            interCommRight.push_back(newComm) ;
241          }
242        }
243        else
244        {
245          int clientLeader;
246          clientLeader = leaders[hashString(CXios::xiosCodeId)];
247          int intraCommSize, intraCommRank ;
248          MPI_Comm_size(intraComm, &intraCommSize) ;
249          MPI_Comm_rank(intraComm, &intraCommRank) ;
250
251          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 1, &newComm) ;
252          #pragma omp critical (_output)
253          {
254            info(50)<<"intercommCreate::server (server level 2) "<<rank_<<" intraCommSize : "<<intraCommSize
255                   <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
256          }
257
258          interCommLeft.push_back(newComm) ;
259        }
260
261        delete [] hashAll ;
262
263      }
264      // using OASIS
265      else
266      {
267        int size;
268        int myColor;
269        int* srvGlobalRanks;
270        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
271
272        CTimer::get("XIOS").resume() ;
273        MPI_Comm localComm;
274        oasis_get_localcomm(localComm);
275        MPI_Comm_rank(localComm,&rank_) ;
276
277//      (1) Create server intraComm
278        if (!CXios::usingServer2)
279        {
280          MPI_Comm_dup(localComm, &intraComm);
281        }
282        else
283        {
284          int globalRank;
285          MPI_Comm_size(localComm,&size) ;
286          MPI_Comm_rank(CXios::globalComm,&globalRank) ;
287          srvGlobalRanks = new int[size] ;
288          MPI_Allgather(&globalRank, 1, MPI_INT, srvGlobalRanks, 1, MPI_INT, localComm) ;
289
290          int reqNbProc = size*CXios::ratioServer2/100.;
291          if (reqNbProc < 1 || reqNbProc == size)
292          {
293            error(0)<<"WARNING: void CServer::initialize(void)"<<endl
294                << "It is impossible to dedicate the requested number of processes = "<<reqNbProc
295                <<" to secondary server. XIOS will run in the classical server mode."<<endl;
296            MPI_Comm_dup(localComm, &intraComm);
297          }
298          else
299          {
300            int firstSndSrvRank = size*(100.-CXios::ratioServer2)/100. ;
301            int poolLeader = firstSndSrvRank;
302//*********** (1) Comment out the line below to set one process per pool
303//            sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]);
304            int nbPools = CXios::nbPoolsServer2;
305            if ( nbPools > reqNbProc || nbPools < 1)
306            {
307              error(0)<<"WARNING: void CServer::initialize(void)"<<endl
308                  << "It is impossible to allocate the requested number of pools = "<<nbPools
309                  <<" on the secondary server. It will be set so that there is one process per pool."<<endl;
310              nbPools = reqNbProc;
311            }
312            int remainder = ((int) (size*CXios::ratioServer2/100.)) % nbPools;
313            int procsPerPool = ((int) (size*CXios::ratioServer2/100.)) / nbPools;
314            for (int i=0; i<size; i++)
315            {
316              if (i >= firstSndSrvRank)
317              {
318                if (globalRank == srvGlobalRanks[i])
319                {
320                  serverLevel=2;
321                }
322                poolLeader += procsPerPool;
323                if (remainder != 0)
324                {
325                  ++poolLeader;
326                  --remainder;
327                }
328//*********** (2) Comment out the two lines below to set one process per pool
329//                if (poolLeader < size)
330//                  sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]);
331//*********** (3) Uncomment the line below to set one process per pool
332                sndServerGlobalRanks.push_back(srvGlobalRanks[i]);
333              }
334              else
335              {
336                if (globalRank == srvGlobalRanks[i]) serverLevel=1;
337              }
338            }
339            if (serverLevel==2)
340            {
341              info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ;
342              for (int i=0; i<sndServerGlobalRanks.size(); i++)
343              {
344                if (globalRank>= sndServerGlobalRanks[i])
345                {
346                  if (i == sndServerGlobalRanks.size()-1)
347                  {
348                    myColor = sndServerGlobalRanks[i];
349                  }
350                  else if (globalRank< sndServerGlobalRanks[i+1])
351                  {
352                    myColor = sndServerGlobalRanks[i];
353                    break;
354                  }
355                }
356              }
357            }
358            if (serverLevel != 2) myColor=0;
359            MPI_Comm_split(localComm, myColor, rank_, &intraComm) ;
360          }
361        }
362
363        string codesId=CXios::getin<string>("oasis_codes_id") ;
364        vector<string> oasisCodeId=splitRegex(codesId,"\\s*,\\s*") ;
365 
366        vector<string>::iterator it ;
367
368        MPI_Comm newComm ;
369        int globalRank ;
370        MPI_Comm_rank(CXios::globalComm,&globalRank);
371
372//      (2) Create interComms with models
373        for(it=oasisCodeId.begin();it!=oasisCodeId.end();it++)
374        {
375          oasis_get_intercomm(newComm,*it) ;
376          if ( serverLevel == 0 || serverLevel == 1)
377          {
378            interCommLeft.push_back(newComm) ;
379            if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
380          }
381        }
382
383//      (3) Create interComms between primary and secondary servers
384        int intraCommSize, intraCommRank ;
385        MPI_Comm_size(intraComm,&intraCommSize) ;
386        MPI_Comm_rank(intraComm, &intraCommRank) ;
387
388        if (serverLevel == 1)
389        {
390          for (int i = 0; i < sndServerGlobalRanks.size(); ++i)
391          {
392            int srvSndLeader = sndServerGlobalRanks[i];
393            info(50)<<"intercommCreate::client (server level 1) "<<globalRank<<" intraCommSize : "<<intraCommSize
394                <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< srvSndLeader<<endl ;
395            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvSndLeader, 0, &newComm) ;
396            interCommRight.push_back(newComm) ;
397          }
398        }
399        else if (serverLevel == 2)
400        {
401          info(50)<<"intercommCreate::server (server level 2)"<<globalRank<<" intraCommSize : "<<intraCommSize
402                   <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< srvGlobalRanks[0] <<endl ;
403          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvGlobalRanks[0], 0, &newComm) ;
404          interCommLeft.push_back(newComm) ;
405        }
406        if (CXios::usingServer2) delete [] srvGlobalRanks ;
407
408        bool oasisEnddef=CXios::getin<bool>("call_oasis_enddef",true) ;
409        if (!oasisEnddef) oasis_enddef() ;
410      }
411
412
413      MPI_Comm_rank(intraComm, &rank) ;
414      if (rank==0) isRoot=true;
415      else isRoot=false;
416     
417      eventScheduler = new CEventScheduler(intraComm) ;
418    }
419
420    void CServer::finalize(void)
421    {
422      CTimer::get("XIOS").suspend() ;
423     
424      delete eventScheduler ;
425
426      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
427        MPI_Comm_free(&(*it));
428
429      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
430        MPI_Comm_free(&(*it));
431
432//      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++)
433//        MPI_Comm_free(&(*it));
434
435//        for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++)
436//          MPI_Comm_free(&(*it));
437
438        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
439          MPI_Comm_free(&(*it));
440
441      MPI_Comm_free(&intraComm);
442
443      if (!is_MPI_Initialized)
444      {
445        if (CXios::usingOasis) oasis_finalize();
446        //else MPI_Finalize() ;
447      }
448
449      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
450      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
451      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
452      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
453    }
454
455     void CServer::eventLoop(void)
456     {
457       bool stop=false ;
458
459       CTimer::get("XIOS server").resume() ;
460       while(!stop)
461       {
462         if (isRoot)
463         {
464           listenContext();
465           listenRootContext();
466           listenOasisEnddef() ;
467           listenRootOasisEnddef() ;
468           if (!finished) listenFinalize() ;
469         }
470         else
471         {
472           listenRootContext();
473           listenRootOasisEnddef() ;
474           if (!finished) listenRootFinalize() ;
475         }
476
477         contextEventLoop() ;
478         if (finished && contextList.empty()) stop=true ;
479         eventScheduler->checkEvent() ;
480       }
481       CTimer::get("XIOS server").suspend() ;
482     }
483
484     void CServer::listenFinalize(void)
485     {
486        list<MPI_Comm>::iterator it, itr;
487        int msg ;
488        int flag ;
489
490        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
491        {
492           MPI_Status status ;
493           traceOff() ;
494           MPI_Iprobe(0,0,*it,&flag,&status) ;
495           traceOn() ;
496           if (flag==true)
497           {
498              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
499              info(20)<<" CServer : Receive client finalize"<<endl ;
500              // Sending server finalize message to secondary servers (if any)
501              for(itr=interCommRight.begin();itr!=interCommRight.end();itr++)
502              {
503                MPI_Send(&msg,1,MPI_INT,0,0,*itr) ;
504              }
505              MPI_Comm_free(&(*it));
506              interCommLeft.erase(it) ;
507              break ;
508            }
509         }
510
511         if (interCommLeft.empty())
512         {
513           int i,size ;
514           MPI_Comm_size(intraComm,&size) ;
515           MPI_Request* requests= new MPI_Request[size-1] ;
516           MPI_Status* status= new MPI_Status[size-1] ;
517
518           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
519           MPI_Waitall(size-1,requests,status) ;
520
521           finished=true ;
522           delete [] requests ;
523           delete [] status ;
524         }
525     }
526
527
528     void CServer::listenRootFinalize()
529     {
530        int flag ;
531        MPI_Status status ;
532        int msg ;
533
534        traceOff() ;
535        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
536        traceOn() ;
537        if (flag==true)
538        {
539           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
540           finished=true ;
541        }
542      }
543
544
545   /*!
546    * Root process is listening for an order sent by client to call "oasis_enddef".
547    * The root client of a compound send the order (tag 5). It is probed and received.
548    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
549    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
550    */
551   
552     void CServer::listenOasisEnddef(void)
553     {
554        int flag ;
555        MPI_Status status ;
556        list<MPI_Comm>::iterator it;
557        int msg ;
558        static int nbCompound=0 ;
559        int size ;
560        static bool sent=false ;
561        static MPI_Request* allRequests ;
562        static MPI_Status* allStatus ;
563
564
565        if (sent)
566        {
567          MPI_Comm_size(intraComm,&size) ;
568          MPI_Testall(size,allRequests, &flag, allStatus) ;
569          if (flag==true)
570          {
571            delete [] allRequests ;
572            delete [] allStatus ;
573            sent=false ;
574          }
575        }
576       
577
578        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
579        {
580           MPI_Status status ;
581           traceOff() ;
582           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5
583           traceOn() ;
584           if (flag==true)
585           {
586              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
587              nbCompound++ ;
588              if (nbCompound==interCommLeft.size())
589              {
590                for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
591                {
592                   MPI_Send(&msg,1,MPI_INT,0,5,*it) ; // tags oasis_endded = 5
593                }
594                MPI_Comm_size(intraComm,&size) ;
595                allRequests= new MPI_Request[size] ;
596                allStatus= new MPI_Status[size] ;
597                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm,&allRequests[i]) ; // tags oasis_endded = 5
598                sent=true ;
599              }
600           }
601        }
602     }
603     
604   /*!
605    * Processes probes message from root process if oasis_enddef call must be done.
606    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
607    */
608     void CServer::listenRootOasisEnddef(void)
609     {
610       int flag ;
611       MPI_Status status ;
612       const int root=0 ;
613       int msg ;
614       static bool eventSent=false ;
615
616       if (eventSent)
617       {
618         boost::hash<string> hashString;
619         size_t hashId = hashString("oasis_enddef");
620         if (eventScheduler->queryEvent(0,hashId))
621         {
622           oasis_enddef() ;
623           eventSent=false ;
624         }
625       }
626         
627       traceOff() ;
628       MPI_Iprobe(root,5,intraComm, &flag, &status) ;
629       traceOn() ;
630       if (flag==true)
631       {
632         MPI_Recv(&msg,1,MPI_INT,root,5,intraComm,&status) ; // tags oasis_endded = 5
633         boost::hash<string> hashString;
634         size_t hashId = hashString("oasis_enddef");
635         eventScheduler->registerEvent(0,hashId);
636         eventSent=true ;
637       }
638     }
639
640
641
642     
643
644     void CServer::listenContext(void)
645     {
646
647       MPI_Status status ;
648       int flag ;
649       static char* buffer ;
650       static MPI_Request request ;
651       static bool recept=false ;
652       int rank ;
653       int count ;
654
655       if (recept==false)
656       {
657         traceOff() ;
658         MPI_Iprobe(-2,1,CXios::globalComm, &flag, &status) ;
659         traceOn() ;
660         if (flag==true)
661         {
662           #ifdef _usingMPI
663           rank=status.MPI_SOURCE ;
664           #elif _usingEP
665           rank=status.ep_src;
666           #endif
667           MPI_Get_count(&status,MPI_CHAR,&count) ;
668           buffer=new char[count] ;
669           MPI_Irecv((void*)buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
670           recept=true ;
671         }
672       }
673       else
674       {
675         traceOff() ;
676         MPI_Test(&request,&flag,&status) ;
677         traceOn() ;
678         if (flag==true)
679         {
680           #ifdef _usingMPI
681           rank=status.MPI_SOURCE ;
682           #elif _usingEP
683           rank=status.ep_src;
684           #endif
685           MPI_Get_count(&status,MPI_CHAR,&count) ;
686           recvContextMessage((void*)buffer,count) ;
687           delete [] buffer ;
688           recept=false ;
689         }
690       }
691     }
692
693     void CServer::recvContextMessage(void* buff,int count)
694     {
695       static map<string,contextMessage> recvContextId;
696       map<string,contextMessage>::iterator it ;
697       CBufferIn buffer(buff,count) ;
698       string id ;
699       int clientLeader ;
700       int nbMessage ;
701
702       buffer>>id>>nbMessage>>clientLeader ;
703
704       it=recvContextId.find(id) ;
705       if (it==recvContextId.end())
706       {
707         contextMessage msg={0,0} ;
708         pair<map<string,contextMessage>::iterator,bool> ret ;
709         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
710         it=ret.first ;
711       }
712       it->second.nbRecv+=1 ;
713       it->second.leaderRank+=clientLeader ;
714
715       if (it->second.nbRecv==nbMessage)
716       {
717         int size ;
718         MPI_Comm_size(intraComm,&size) ;
719//         MPI_Request* requests= new MPI_Request[size-1] ;
720//         MPI_Status* status= new MPI_Status[size-1] ;
721         MPI_Request* requests= new MPI_Request[size] ;
722         MPI_Status* status= new MPI_Status[size] ;
723
724         CMessage msg ;
725         msg<<id<<it->second.leaderRank;
726         int messageSize=msg.size() ;
727         void * sendBuff = new char[messageSize] ;
728         CBufferOut sendBuffer(sendBuff,messageSize) ;
729         sendBuffer<<msg ;
730
731         // Include root itself in order not to have a divergence
732         for(int i=0; i<size; i++)
733         {
734           MPI_Isend(sendBuff,sendBuffer.count(),MPI_CHAR,i,2,intraComm,&requests[i]) ;
735         }
736
737         recvContextId.erase(it) ;
738         delete [] requests ;
739         delete [] status ;
740
741       }
742     }
743
744     void CServer::listenRootContext(void)
745     {
746       MPI_Status status ;
747       int flag ;
748       static std::vector<void*> buffers;
749       static std::vector<MPI_Request> requests ;
750       static std::vector<int> counts ;
751       static std::vector<bool> isEventRegistered ;
752       static std::vector<bool> isEventQueued ;
753       MPI_Request request;
754
755       int rank ;
756       const int root=0 ;
757       boost::hash<string> hashString;
758       size_t hashId = hashString("RegisterContext");
759
760       // (1) Receive context id from the root, save it into a buffer
761       traceOff() ;
762       MPI_Iprobe(root,2,intraComm, &flag, &status) ;
763       traceOn() ;
764       if (flag==true)
765       {
766         counts.push_back(0);
767         MPI_Get_count(&status,MPI_CHAR,&(counts.back())) ;
768         buffers.push_back(new char[counts.back()]) ;
769         MPI_Irecv((void*)(buffers.back()),counts.back(),MPI_CHAR,root,2,intraComm,&request) ;
770         requests.push_back(request);
771         isEventRegistered.push_back(false);
772         isEventQueued.push_back(false);
773         nbContexts++;
774       }
775
776       for (int ctxNb = 0; ctxNb < nbContexts; ctxNb++ )
777       {
778         // (2) If context id is received, register an event
779         if(!isEventRegistered[ctxNb]) MPI_Test(&requests[ctxNb],&flag,&status) ;
780         if (flag==true && !isEventRegistered[ctxNb])
781         {
782           eventScheduler->registerEvent(ctxNb,hashId);
783           isEventRegistered[ctxNb] = true;
784         }
785         // (3) If event has been scheduled, call register context
786         if (eventScheduler->queryEvent(ctxNb,hashId) && !isEventQueued[ctxNb])
787         {
788           registerContext(buffers[ctxNb],counts[ctxNb]) ;
789           isEventQueued[ctxNb] = true;
790           delete [] buffers[ctxNb] ;
791         }
792       }
793
794     }
795
796     void CServer::registerContext(void* buff, int count, int leaderRank)
797     {
798       string contextId;
799       CBufferIn buffer(buff, count);
800//       buffer >> contextId;
801       buffer >> contextId>>leaderRank;
802       CContext* context;
803
804       info(20) << "CServer : Register new Context : " << contextId << endl;
805
806       if (contextList.find(contextId) != contextList.end())
807         ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
808               << "Context '" << contextId << "' has already been registred");
809
810       context=CContext::create(contextId);
811       contextList[contextId]=context;
812
813       // Primary or classical server: create communication channel with a client
814       // (1) create interComm (with a client)
815       // (2) initialize client and server (contextClient and contextServer)
816       MPI_Comm inter;
817       if (serverLevel < 2)
818       {
819         MPI_Comm contextInterComm;
820         MPI_Intercomm_create(intraComm, 0, CXios::globalComm, leaderRank, 10+leaderRank, &contextInterComm);
821         MPI_Intercomm_merge(contextInterComm,1,&inter);
822         MPI_Barrier(inter);
823         context->initServer(intraComm,contextInterComm);
824         contextInterComms.push_back(contextInterComm);
825
826         MPI_Comm_free(&inter);
827       }
828       // Secondary server: create communication channel with a primary server
829       // (1) duplicate interComm with a primary server
830       // (2) initialize client and server (contextClient and contextServer)
831       // Remark: in the case of the secondary server there is no need to create an interComm calling MPI_Intercomm_create,
832       //         because interComm of CContext is defined on the same processes as the interComm of CServer.
833       //         So just duplicate it.
834       else if (serverLevel == 2)
835       {
836         MPI_Comm_dup(interCommLeft.front(), &inter);
837         contextInterComms.push_back(inter);
838         context->initServer(intraComm, contextInterComms.back());
839       }
840
841       // Primary server:
842       // (1) send create context message to secondary servers
843       // (2) initialize communication channels with secondary servers (create contextClient and contextServer)
844       if (serverLevel == 1)
845       {
846         int i = 0, size;
847         MPI_Comm_size(intraComm, &size) ;
848         for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++, ++i)
849         {
850           StdString str = contextId +"_server_" + boost::lexical_cast<string>(i);
851           CMessage msg;
852           int messageSize;
853           msg<<str<<size<<rank_ ;
854           messageSize = msg.size() ;
855           buff = new char[messageSize] ;
856           CBufferOut buffer(buff,messageSize) ;
857           buffer<<msg ;
858           MPI_Send(buff, buffer.count(), MPI_CHAR, sndServerGlobalRanks[i], 1, CXios::globalComm) ;
859           MPI_Comm_dup(*it, &inter);
860           contextInterComms.push_back(inter);
861           MPI_Comm_dup(intraComm, &inter);
862           contextIntraComms.push_back(inter);
863           context->initClient(contextIntraComms.back(), contextInterComms.back()) ;
864           delete [] buff ;
865         }
866       }
867     }
868
869     void CServer::contextEventLoop(bool enableEventsProcessing /*= true*/)
870     {
871       bool isFinalized ;
872       map<string,CContext*>::iterator it ;
873
874       for(it=contextList.begin();it!=contextList.end();it++)
875       {
876         isFinalized=it->second->isFinalized();
877         if (isFinalized)
878         {
879           contextList.erase(it) ;
880           break ;
881         }
882         else
883           it->second->checkBuffersAndListen(enableEventsProcessing);
884       }
885     }
886
887     //! Get rank of the current process in the intraComm
888     int CServer::getRank()
889     {
890       int rank;
891       MPI_Comm_rank(intraComm,&rank);
892       return rank;
893     }
894
895     vector<int>& CServer::getSecondaryServerGlobalRanks()
896     {
897       return sndServerGlobalRanks;
898     }
899
900    /*!
901    * Open a file specified by a suffix and an extension and use it for the given file buffer.
902    * The file name will be suffix+rank+extension.
903    *
904    * \param fileName[in] protype file name
905    * \param ext [in] extension of the file
906    * \param fb [in/out] the file buffer
907    */
908    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
909    {
910      StdStringStream fileNameClient;
911      int numDigit = 0;
912      int size = 0;
913      int id;
914      MPI_Comm_size(CXios::globalComm, &size);
915      while (size)
916      {
917        size /= 10;
918        ++numDigit;
919      }
920      id = rank_; //getRank();
921
922      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
923      fb->open(fileNameClient.str().c_str(), std::ios::out);
924      if (!fb->is_open())
925        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
926              << std::endl << "Can not open <" << fileNameClient.str() << "> file to write the server log(s).");
927    }
928
929    /*!
930    * \brief Open a file stream to write the info logs
931    * Open a file stream with a specific file name suffix+rank
932    * to write the info logs.
933    * \param fileName [in] protype file name
934    */
935    void CServer::openInfoStream(const StdString& fileName)
936    {
937      std::filebuf* fb = m_infoStream.rdbuf();
938      openStream(fileName, ".out", fb);
939
940      info.write2File(fb);
941      report.write2File(fb);
942    }
943
944    //! Write the info logs to standard output
945    void CServer::openInfoStream()
946    {
947      info.write2StdOut();
948      report.write2StdOut();
949    }
950
951    //! Close the info logs file if it opens
952    void CServer::closeInfoStream()
953    {
954      if (m_infoStream.is_open()) m_infoStream.close();
955    }
956
957    /*!
958    * \brief Open a file stream to write the error log
959    * Open a file stream with a specific file name suffix+rank
960    * to write the error log.
961    * \param fileName [in] protype file name
962    */
963    void CServer::openErrorStream(const StdString& fileName)
964    {
965      std::filebuf* fb = m_errorStream.rdbuf();
966      openStream(fileName, ".err", fb);
967
968      error.write2File(fb);
969    }
970
971    //! Write the error log to standard error output
972    void CServer::openErrorStream()
973    {
974      error.write2StdErr();
975    }
976
977    //! Close the error log file if it opens
978    void CServer::closeErrorStream()
979    {
980      if (m_errorStream.is_open()) m_errorStream.close();
981    }
982}
Note: See TracBrowser for help on using the repository browser.