source: XIOS/dev/dev_olga/src/server.cpp @ 1148

Last change on this file since 1148 was 1148, checked in by oabramkina, 7 years ago

Fixing a bug in context initialization. Now register context is scheduled by event scheduler.
Tests on Curie: test_complete and test_xios2_cmip6.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 23.5 KB
RevLine 
[490]1#include "globalScopeData.hpp"
[591]2#include "xios_spl.hpp"
[300]3#include "cxios.hpp"
[342]4#include "server.hpp"
[983]5#include "client.hpp"
[300]6#include "type.hpp"
7#include "context.hpp"
[352]8#include "object_template.hpp"
[300]9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
[382]12#include "mpi.hpp"
[347]13#include "tracer.hpp"
14#include "timer.hpp"
[492]15#include "event_scheduler.hpp"
[300]16
[335]17namespace xios
[490]18{
[300]19    MPI_Comm CServer::intraComm ;
[992]20    list<MPI_Comm> CServer::interCommLeft ;
21    list<MPI_Comm> CServer::interCommRight ;
[1054]22//    list<MPI_Comm> CServer::interComm ;
[655]23    std::list<MPI_Comm> CServer::contextInterComms;
[1071]24    std::list<MPI_Comm> CServer::contextIntraComms;
[1021]25    int CServer::serverLevel = 0 ;
[1071]26    int CServer::serverLeader_ = 0;
27    int CServer::serverSize_ = 0;
[1021]28    int CServer::nbPools = 0;
29    int CServer::poolId = 0;
[1148]30    int CServer::nbContexts = 0;
[983]31    bool CServer::isRoot = false ;
[1077]32    int CServer::rank_ = INVALID_RANK;
[490]33    StdOFStream CServer::m_infoStream;
[523]34    StdOFStream CServer::m_errorStream;
[490]35    map<string,CContext*> CServer::contextList ;
[300]36    bool CServer::finished=false ;
37    bool CServer::is_MPI_Initialized ;
[597]38    CEventScheduler* CServer::eventScheduler = 0;
[983]39
40//---------------------------------------------------------------
41/*!
42 * \fn void CServer::initialize(void)
[1054]43 * Creates intraComm for each possible type of servers (classical, primary or secondary).
44 * In case of secondary servers intraComm is created for each secondary server pool.
45 * (For now the assumption is that there is one proc per pool.)
[1148]46 * Creates interComm and stores them into the following lists:
[1054]47 *   classical server -- interCommLeft
48 *   primary server -- interCommLeft and interCommRight
[1148]49 *   secondary server -- interCommLeft for each pool.
[983]50 */
[300]51    void CServer::initialize(void)
52    {
53      int initialized ;
54      MPI_Initialized(&initialized) ;
55      if (initialized) is_MPI_Initialized=true ;
56      else is_MPI_Initialized=false ;
[490]57
[300]58      // Not using OASIS
59      if (!CXios::usingOasis)
60      {
[490]61
62        if (!is_MPI_Initialized)
[300]63        {
[925]64          MPI_Init(NULL, NULL);
[300]65        }
[359]66        CTimer::get("XIOS").resume() ;
[490]67
68        boost::hash<string> hashString ;
[1021]69        unsigned long hashServer = hashString(CXios::xiosCodeId);
[490]70
[300]71        unsigned long* hashAll ;
[490]72
73//        int rank ;
[300]74        int size ;
75        int myColor ;
76        int i,c ;
[1021]77        MPI_Comm newComm, serversComm;
[490]78
[983]79        MPI_Comm_size(CXios::globalComm, &size) ;
[1077]80        MPI_Comm_rank(CXios::globalComm, &rank_);
[1009]81
[300]82        hashAll=new unsigned long[size] ;
[983]83        MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ;
[490]84
[1021]85        map<unsigned long, int> colors ;
[300]86        map<unsigned long, int> leaders ;
87        map<unsigned long, int>::iterator it ;
[490]88
[1142]89        int nbSrv = 0;
[300]90        for(i=0,c=0;i<size;i++)
91        {
92          if (colors.find(hashAll[i])==colors.end())
93          {
94            colors[hashAll[i]]=c ;
95            leaders[hashAll[i]]=i ;
96            c++ ;
97          }
[1148]98          if (hashAll[i] == hashServer) ++serverSize_;
[300]99        }
[490]100
[1021]101        // Setting the number of secondary pools
102        myColor = colors[hashServer];
103        if (CXios::usingServer2)
[1009]104        {
[1077]105          int serverRank = rank_ - leaders[hashServer]; // server proc rank starting 0
[1071]106          nbPools = serverSize_ * CXios::ratioServer2 / 100;
107          if ( serverRank < (serverSize_ - nbPools) )
[1009]108          {
[1021]109            serverLevel = 1;
[1009]110          }
[1021]111          else
[1009]112          {
[1021]113            serverLevel = 2;
[1071]114            poolId = serverRank - serverSize_ + nbPools;
[1142]115            myColor = rank_ + size; // + size to make sure that myColor is unique among not only servers but also clients. It's only a temporary solution
[1009]116          }
117        }
118
[1077]119        MPI_Comm_split(CXios::globalComm, myColor, rank_, &intraComm) ;
[1009]120
[1021]121        if (serverLevel == 0)
[983]122        {
123          int clientLeader;
124          for(it=leaders.begin();it!=leaders.end();it++)
125          {
126            if (it->first!=hashServer)
127            {
128              clientLeader=it->second ;
129              int intraCommSize, intraCommRank ;
130              MPI_Comm_size(intraComm,&intraCommSize) ;
131              MPI_Comm_rank(intraComm,&intraCommRank) ;
[1142]132              info(50)<<"intercommCreate::server (classical mode) "<<rank_<<" intraCommSize : "<<intraCommSize
[983]133                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
[490]134
[1009]135              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
[992]136               interCommLeft.push_back(newComm) ;
[983]137            }
138          }
139        }
[1021]140        else if (serverLevel == 1)
[983]141        {
[1054]142          int clientLeader, srvSndLeader;
143          int srvPrmLeader ;
[1021]144          for (it=leaders.begin();it!=leaders.end();it++)
[983]145          {
[1021]146            if (it->first != hashServer)
[983]147            {
[1021]148              clientLeader=it->second ;
149              int intraCommSize, intraCommRank ;
150              MPI_Comm_size(intraComm, &intraCommSize) ;
151              MPI_Comm_rank(intraComm, &intraCommRank) ;
[1142]152              info(50)<<"intercommCreate::server (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
[1021]153                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
154              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
155              interCommLeft.push_back(newComm) ;
[983]156            }
157            else
[1071]158              serverLeader_ = it->second;
[1021]159          }
[1009]160
[1021]161          for (int i = 0; i < nbPools; ++i)
[983]162          {
[1071]163            srvSndLeader = serverLeader_ + serverSize_ - nbPools + i;
[1054]164            int intraCommSize, intraCommRank ;
165            MPI_Comm_size(intraComm, &intraCommSize) ;
166            MPI_Comm_rank(intraComm, &intraCommRank) ;
[1142]167            info(50)<<"intercommCreate::client (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
[1054]168                <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< srvSndLeader<<endl ;
[1142]169            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvSndLeader, 1, &newComm) ;
[1054]170            interCommRight.push_back(newComm) ;
[1021]171          }
172        } // primary server
173        else
174        {
175          int clientLeader;
176          clientLeader = leaders[hashString(CXios::xiosCodeId)];
177          int intraCommSize, intraCommRank ;
178          MPI_Comm_size(intraComm, &intraCommSize) ;
179          MPI_Comm_rank(intraComm, &intraCommRank) ;
[1142]180          info(50)<<"intercommCreate::server (server level 2) "<<rank_<<" intraCommSize : "<<intraCommSize
[1021]181                   <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
[983]182
[1142]183          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 1, &newComm) ;
[1021]184          interCommLeft.push_back(newComm) ;
185        } // secondary server
[983]186
187        delete [] hashAll ;
188
[300]189      }
190      // using OASIS
191      else
192      {
[1130]193        int size, rank;
194        int myColor;
[490]195        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
196
[359]197        CTimer::get("XIOS").resume() ;
[655]198        MPI_Comm localComm;
199        oasis_get_localcomm(localComm);
200
[1130]201        // Create server intraComm
202        if (!CXios::usingServer2)
203          MPI_Comm_dup(localComm, &intraComm);
204        else
205        {
206          MPI_Comm_rank(localComm,&rank) ;
207          MPI_Comm_size(localComm,&serverSize_) ;
208          nbPools = serverSize_ * CXios::ratioServer2 / 100;
209          if ( rank < (serverSize_ - nbPools) )
210          {
211            serverLevel = 1;
212            myColor = 0;
213          }
214          else
215          {
216            serverLevel = 2;
217            poolId = rank - serverSize_ + nbPools;
218            myColor = rank;
219          }
220          MPI_Comm_split(localComm, myColor, rank, &intraComm) ;
221
222        }
[1077]223        MPI_Comm_rank(intraComm,&rank_) ;
[300]224        MPI_Comm_size(intraComm,&size) ;
[1130]225
[300]226        string codesId=CXios::getin<string>("oasis_codes_id") ;
[490]227
[300]228        vector<string> splitted ;
[483]229        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
[300]230        vector<string>::iterator it ;
231
232        MPI_Comm newComm ;
233        int globalRank ;
234        MPI_Comm_rank(CXios::globalComm,&globalRank);
[490]235
[300]236        for(it=splitted.begin();it!=splitted.end();it++)
237        {
238          oasis_get_intercomm(newComm,*it) ;
[1130]239//        interComm.push_back(newComm) ;
240          if ( !CXios::usingServer2)
241            interCommLeft.push_back(newComm) ;
242          else
243          {
244            if (serverLevel == 1)
245            {
246              info(50)<<"intercommCreate::server "<<rank_<<" intraCommSize : "<<size
247                       <<" intraCommRank :"<<rank_<<"  clientLeader "<< rank<<endl ;
248              MPI_Intercomm_create(intraComm, 0, localComm, rank, 0, &newComm) ;
249              interCommRight.push_back(newComm) ;
250
251            }
252            else if (serverLevel == 2)
253            {
254              info(50)<<"intercommCreate::server "<<rank_<<" intraCommSize : "<<size
255                       <<" intraCommRank :"<<rank_<<"  clientLeader "<< 0<<endl ;
256              MPI_Intercomm_create(intraComm, 0, localComm, 0, 0, &newComm) ;
257              interCommLeft.push_back(newComm) ;
258
259            }
260
261          }
262//          if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
263//          MPI_Comm_remote_size(newComm,&size);
264          // Send serverLeader to client
265          if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,interCommLeft.back()) ;
[300]266        }
[492]267              oasis_enddef() ;
[300]268      }
[490]269
[1077]270      MPI_Comm_rank(intraComm, &rank_) ;
271      if (rank_==0) isRoot=true;
[490]272      else isRoot=false;
[492]273     
274      eventScheduler = new CEventScheduler(intraComm) ;
[300]275    }
[490]276
[300]277    void CServer::finalize(void)
278    {
[983]279
[361]280      CTimer::get("XIOS").suspend() ;
[697]281     
[492]282      delete eventScheduler ;
[655]283
284      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
285        MPI_Comm_free(&(*it));
[983]286
[1071]287      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
288        MPI_Comm_free(&(*it));
289
[992]290//      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++)
291//        MPI_Comm_free(&(*it));
292
[1077]293//        for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++)
294//          MPI_Comm_free(&(*it));
[983]295
[1054]296        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
297          MPI_Comm_free(&(*it));
[992]298
[655]299      MPI_Comm_free(&intraComm);
300
[300]301      if (!is_MPI_Initialized)
[490]302      {
[300]303        if (CXios::usingOasis) oasis_finalize();
304        else MPI_Finalize() ;
305      }
[347]306      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
307      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
308      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
[300]309    }
[490]310
[300]311     void CServer::eventLoop(void)
312     {
313       bool stop=false ;
[490]314
[347]315       CTimer::get("XIOS server").resume() ;
[300]316       while(!stop)
317       {
318         if (isRoot)
319         {
320           listenContext();
[1148]321           listenRootContext();
[300]322           if (!finished) listenFinalize() ;
323         }
324         else
325         {
326           listenRootContext();
327           if (!finished) listenRootFinalize() ;
328         }
[490]329
[300]330         contextEventLoop() ;
331         if (finished && contextList.empty()) stop=true ;
[956]332         eventScheduler->checkEvent() ;
[983]333
[300]334       }
[347]335       CTimer::get("XIOS server").suspend() ;
[300]336     }
[490]337
[300]338     void CServer::listenFinalize(void)
339     {
[992]340        list<MPI_Comm>::iterator it, itr;
[300]341        int msg ;
342        int flag ;
[490]343
[992]344        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
[300]345        {
346           MPI_Status status ;
[347]347           traceOff() ;
[300]348           MPI_Iprobe(0,0,*it,&flag,&status) ;
[347]349           traceOn() ;
[300]350           if (flag==true)
351           {
[1054]352              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
353              info(20)<<" CServer : Receive client finalize"<<endl ;
354              // Sending server finalize message to secondary servers (if any)
355              for(itr=interCommRight.begin();itr!=interCommRight.end();itr++)
356              {
357                MPI_Send(&msg,1,MPI_INT,0,0,*itr) ;
358              }
[655]359              MPI_Comm_free(&(*it));
[992]360              interCommLeft.erase(it) ;
[300]361              break ;
362            }
363         }
[490]364
[1054]365         if (interCommLeft.empty())
[300]366         {
367           int i,size ;
368           MPI_Comm_size(intraComm,&size) ;
369           MPI_Request* requests= new MPI_Request[size-1] ;
370           MPI_Status* status= new MPI_Status[size-1] ;
[490]371
[300]372           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
373           MPI_Waitall(size-1,requests,status) ;
374
375           finished=true ;
376           delete [] requests ;
377           delete [] status ;
378         }
379     }
[490]380
381
[300]382     void CServer::listenRootFinalize()
383     {
384        int flag ;
385        MPI_Status status ;
386        int msg ;
[490]387
[347]388        traceOff() ;
[300]389        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
[347]390        traceOn() ;
[300]391        if (flag==true)
392        {
393           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
394           finished=true ;
395        }
396      }
[490]397
[300]398     void CServer::listenContext(void)
399     {
[490]400
[300]401       MPI_Status status ;
402       int flag ;
403       static void* buffer ;
404       static MPI_Request request ;
405       static bool recept=false ;
406       int rank ;
407       int count ;
[490]408
[300]409       if (recept==false)
410       {
[347]411         traceOff() ;
[300]412         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
[347]413         traceOn() ;
[490]414         if (flag==true)
[300]415         {
416           rank=status.MPI_SOURCE ;
417           MPI_Get_count(&status,MPI_CHAR,&count) ;
418           buffer=new char[count] ;
419           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
[490]420           recept=true ;
[300]421         }
422       }
423       else
424       {
[347]425         traceOff() ;
[300]426         MPI_Test(&request,&flag,&status) ;
[347]427         traceOn() ;
[300]428         if (flag==true)
429         {
430           rank=status.MPI_SOURCE ;
431           MPI_Get_count(&status,MPI_CHAR,&count) ;
432           recvContextMessage(buffer,count) ;
[1054]433           delete [] buffer;
[490]434           recept=false ;
[300]435         }
436       }
437     }
[490]438
[300]439     void CServer::recvContextMessage(void* buff,int count)
440     {
[983]441       static map<string,contextMessage> recvContextId;
[300]442       map<string,contextMessage>::iterator it ;
443       CBufferIn buffer(buff,count) ;
444       string id ;
445       int clientLeader ;
446       int nbMessage ;
447
448       buffer>>id>>nbMessage>>clientLeader ;
[490]449
[300]450       it=recvContextId.find(id) ;
451       if (it==recvContextId.end())
[490]452       {
[300]453         contextMessage msg={0,0} ;
454         pair<map<string,contextMessage>::iterator,bool> ret ;
455         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
456         it=ret.first ;
[490]457       }
[300]458       it->second.nbRecv+=1 ;
459       it->second.leaderRank+=clientLeader ;
[490]460
[300]461       if (it->second.nbRecv==nbMessage)
[490]462       {
[300]463         int size ;
464         MPI_Comm_size(intraComm,&size) ;
[1148]465//         MPI_Request* requests= new MPI_Request[size-1] ;
466//         MPI_Status* status= new MPI_Status[size-1] ;
467         MPI_Request* requests= new MPI_Request[size] ;
468         MPI_Status* status= new MPI_Status[size] ;
[490]469
[1148]470         CMessage msg ;
471         msg<<id<<it->second.leaderRank;
472         int messageSize=msg.size() ;
473         void * sendBuff = new char[messageSize] ;
474         CBufferOut sendBuffer(sendBuff,messageSize) ;
475         sendBuffer<<msg ;
476
477         // Include root itself in order not to have a divergence
478         for(int i=0; i<size; i++)
[300]479         {
[1148]480           MPI_Isend(sendBuff,count,MPI_CHAR,i,2,intraComm,&requests[i]) ;
[300]481         }
482
[1148]483//         for(int i=1;i<size;i++)
484//         {
485//            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
486//         }
487//         MPI_Waitall(size-1,requests,status) ;
488//         registerContext(buff,count,it->second.leaderRank) ;
489
[300]490         recvContextId.erase(it) ;
491         delete [] requests ;
492         delete [] status ;
493
494       }
[490]495     }
496
[300]497     void CServer::listenRootContext(void)
498     {
499       MPI_Status status ;
500       int flag ;
501       static void* buffer ;
502       static MPI_Request request ;
503       static bool recept=false ;
504       int rank ;
[1148]505//       int count ;
506       static int count ;
[300]507       const int root=0 ;
[1148]508       boost::hash<string> hashString;
509       size_t hashId = hashString("RegisterContext");
[490]510
[1148]511       // (1) Receive context id from the root
[300]512       if (recept==false)
513       {
[347]514         traceOff() ;
[300]515         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
[347]516         traceOn() ;
[490]517         if (flag==true)
[300]518         {
519           MPI_Get_count(&status,MPI_CHAR,&count) ;
520           buffer=new char[count] ;
521           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
[490]522           recept=true ;
[300]523         }
524       }
[1148]525       // (2) If context id is received, save it into a buffer and register an event
[300]526       else
527       {
528         MPI_Test(&request,&flag,&status) ;
529         if (flag==true)
530         {
531           MPI_Get_count(&status,MPI_CHAR,&count) ;
[1148]532           eventScheduler->registerEvent(nbContexts,hashId);
533//           registerContext(buffer,count) ;
534//           delete [] buffer ;
[490]535           recept=false ;
[300]536         }
537       }
[1148]538       // (3) If event has been scheduled, call register context
539       if (eventScheduler->queryEvent(nbContexts,hashId))
540       {
541         registerContext(buffer,count) ;
542         ++nbContexts;
543         delete [] buffer ;
544       }
[490]545     }
546
[655]547     void CServer::registerContext(void* buff, int count, int leaderRank)
[300]548     {
549       string contextId;
[655]550       CBufferIn buffer(buff, count);
[1148]551//       buffer >> contextId;
552       buffer >> contextId>>leaderRank;
[983]553       CContext* context;
[300]554
[680]555       info(20) << "CServer : Register new Context : " << contextId << endl;
[490]556
[680]557       if (contextList.find(contextId) != contextList.end())
558         ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
559               << "Context '" << contextId << "' has already been registred");
[490]560
[983]561       context=CContext::create(contextId);
[655]562       contextList[contextId]=context;
563
[1148]564       // Primary or classical server: create communication channel with a client
565       // (1) create interComm (with a client)
566       // (2) initialize client and server (contextClient and contextServer)
[1071]567       MPI_Comm inter;
[1054]568       if (serverLevel < 2)
569       {
570         MPI_Comm contextInterComm;
571         MPI_Intercomm_create(intraComm, 0, CXios::globalComm, leaderRank, 10+leaderRank, &contextInterComm);
572         MPI_Intercomm_merge(contextInterComm,1,&inter);
573         MPI_Barrier(inter);
574         MPI_Comm_free(&inter);
575         context->initServer(intraComm,contextInterComm);
576         contextInterComms.push_back(contextInterComm);
[1071]577
[1054]578       }
[1148]579       // Secondary server: create communication channel with a primary server
580       // (1) duplicate interComm with a primary server
581       // (2) initialize client and server (contextClient and contextServer)
582       // Remark: in the case of the secondary server there is no need to create an interComm calling MPI_Intercomm_create,
583       //         because interComm of CContext is defined on the same processes as the interComm of CServer.
584       //         So just duplicate it.
[1054]585       else if (serverLevel == 2)
586       {
[1071]587         MPI_Comm_dup(interCommLeft.front(), &inter);
588         contextInterComms.push_back(inter);
589         context->initServer(intraComm, contextInterComms.back());
[1054]590       }
591
[1148]592       // Primary server:
593       // (1) send create context message to secondary servers
594       // (2) initialize communication channels with secondary servers (create contextClient and contextServer)
[1021]595       if (serverLevel == 1)
[983]596       {
[1054]597         int i = 0, size;
598         CMessage msg;
599         int messageSize;
600         MPI_Comm_size(intraComm, &size) ;
601         for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++, ++i)
602         {
603           StdString str = contextId +"_server_" + boost::lexical_cast<string>(i);
[1077]604           msg<<str<<size<<rank_ ;
[1054]605           messageSize = msg.size() ;
606           buff = new char[messageSize] ;
607           CBufferOut buffer(buff,messageSize) ;
608           buffer<<msg ;
[1071]609           int sndServerGloRanks = serverSize_-nbPools+serverLeader_ +i;  // the assumption is that there is only one proc per secondary server pool
[1054]610           MPI_Send(buff, buffer.count(), MPI_CHAR, sndServerGloRanks, 1, CXios::globalComm) ;
[1071]611           MPI_Comm_dup(*it, &inter);
612           contextInterComms.push_back(inter);
613           MPI_Comm_dup(intraComm, &inter);
614           contextIntraComms.push_back(inter);
615           context->initClient(contextIntraComms.back(), contextInterComms.back()) ;
[1054]616           delete [] buff ;
617         }
[983]618       }
[490]619     }
620
[300]621     void CServer::contextEventLoop(void)
622     {
[1130]623       bool isFinalized ;
[300]624       map<string,CContext*>::iterator it ;
[983]625
[490]626       for(it=contextList.begin();it!=contextList.end();it++)
[300]627       {
[1130]628         isFinalized=it->second->isFinalized();
629         if (isFinalized)
[300]630         {
631           contextList.erase(it) ;
632           break ;
633         }
[1054]634         else
[1139]635           it->second->checkBuffersAndListen();
[300]636       }
637     }
[490]638
[1148]639     //! Get rank of the current process in the intraComm
[490]640     int CServer::getRank()
641     {
[1077]642       return rank_;
[490]643     }
644
[523]645    /*!
646    * Open a file specified by a suffix and an extension and use it for the given file buffer.
647    * The file name will be suffix+rank+extension.
648    *
649    * \param fileName[in] protype file name
650    * \param ext [in] extension of the file
651    * \param fb [in/out] the file buffer
652    */
653    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
654    {
655      StdStringStream fileNameClient;
656      int numDigit = 0;
657      int size = 0;
[1021]658      int id;
[523]659      MPI_Comm_size(CXios::globalComm, &size);
660      while (size)
661      {
662        size /= 10;
663        ++numDigit;
664      }
[497]665
[1021]666      if (!CXios::usingServer2)
667        id = getRank();
[1009]668      else
[1021]669      {
670        if (serverLevel == 1)
[1077]671          id = rank_;
[1021]672        else
673          id = poolId;
674      }
675      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
[523]676      fb->open(fileNameClient.str().c_str(), std::ios::out);
677      if (!fb->is_open())
678        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
679              << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s).");
680    }
[490]681
[523]682    /*!
683    * \brief Open a file stream to write the info logs
684    * Open a file stream with a specific file name suffix+rank
685    * to write the info logs.
686    * \param fileName [in] protype file name
687    */
688    void CServer::openInfoStream(const StdString& fileName)
689    {
690      std::filebuf* fb = m_infoStream.rdbuf();
691      openStream(fileName, ".out", fb);
[490]692
[523]693      info.write2File(fb);
694      report.write2File(fb);
695    }
[490]696
[523]697    //! Write the info logs to standard output
698    void CServer::openInfoStream()
699    {
700      info.write2StdOut();
701      report.write2StdOut();
702    }
[490]703
[523]704    //! Close the info logs file if it opens
705    void CServer::closeInfoStream()
706    {
707      if (m_infoStream.is_open()) m_infoStream.close();
708    }
709
710    /*!
711    * \brief Open a file stream to write the error log
712    * Open a file stream with a specific file name suffix+rank
713    * to write the error log.
714    * \param fileName [in] protype file name
715    */
716    void CServer::openErrorStream(const StdString& fileName)
717    {
718      std::filebuf* fb = m_errorStream.rdbuf();
719      openStream(fileName, ".err", fb);
720
721      error.write2File(fb);
722    }
723
724    //! Write the error log to standard error output
725    void CServer::openErrorStream()
726    {
727      error.write2StdErr();
728    }
729
730    //! Close the error log file if it opens
731    void CServer::closeErrorStream()
732    {
733      if (m_errorStream.is_open()) m_errorStream.close();
734    }
[300]735}
Note: See TracBrowser for help on using the repository browser.