source: XIOS/dev/dev_olga/src/server.cpp @ 992

Last change on this file since 992 was 992, checked in by oabramkina, 7 years ago

First rebond on the secondary server pool. XIOS finalizes correctly.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 17.2 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "event_scheduler.hpp"
16
17namespace xios
18{
19    MPI_Comm CServer::intraComm ;
20    list<MPI_Comm> CServer::interCommLeft ;
21    list<MPI_Comm> CServer::interCommRight ;
22    list<MPI_Comm> CServer::interComm ;
23    std::list<MPI_Comm> CServer::contextInterComms;
24    bool CServer::isRoot = false ;
25    int CServer::rank = INVALID_RANK;
26    StdOFStream CServer::m_infoStream;
27    StdOFStream CServer::m_errorStream;
28    map<string,CContext*> CServer::contextList ;
29    bool CServer::finished=false ;
30    bool CServer::is_MPI_Initialized ;
31    CEventScheduler* CServer::eventScheduler = 0;
32
33//---------------------------------------------------------------
34/*!
35 * \fn void CServer::initialize(void)
36 * Function creates intra and inter comm for each initialized server pool
37 */
38    void CServer::initialize(void)
39    {
40      int initialized ;
41      MPI_Initialized(&initialized) ;
42      if (initialized) is_MPI_Initialized=true ;
43      else is_MPI_Initialized=false ;
44
45      // Not using OASIS
46      if (!CXios::usingOasis)
47      {
48
49        if (!is_MPI_Initialized)
50        {
51          MPI_Init(NULL, NULL);
52        }
53        CTimer::get("XIOS").resume() ;
54
55        int nbSrvLevels = 2;
56
57        boost::hash<string> hashString ;
58        unsigned long hashServer1 = hashString(CXios::xiosCodeIdPrm);
59        unsigned long hashServer2 = hashString(CXios::xiosCodeIdSnd);
60        unsigned long hashServer = (CXios::serverLevel < 2)  ? hashServer1 : hashServer2;
61
62        unsigned long* hashAll ;
63
64//        int rank ;
65        int size ;
66        int myColor ;
67        int i,c ;
68        MPI_Comm newComm ;
69
70        MPI_Comm_size(CXios::globalComm, &size) ;
71        MPI_Comm_rank(CXios::globalComm, &rank);
72        hashAll=new unsigned long[size] ;
73        MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ;
74
75        map<unsigned long, int> colors ;
76        map<unsigned long, int> leaders ;
77        map<unsigned long, int>::iterator it ;
78
79        for(i=0,c=0;i<size;i++)
80        {
81          if (colors.find(hashAll[i])==colors.end())
82          {
83            colors[hashAll[i]]=c ;
84            leaders[hashAll[i]]=i ;
85            c++ ;
86          }
87        }
88
89        myColor=colors[hashServer] ;
90        MPI_Comm_split(MPI_COMM_WORLD, myColor, rank, &intraComm) ;
91
92        if (CXios::serverLevel == 0)
93        {
94          int clientLeader;
95          for(it=leaders.begin();it!=leaders.end();it++)
96          {
97            if (it->first!=hashServer)
98            {
99              clientLeader=it->second ;
100              int intraCommSize, intraCommRank ;
101              MPI_Comm_size(intraComm,&intraCommSize) ;
102              MPI_Comm_rank(intraComm,&intraCommRank) ;
103              info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
104                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
105
106               MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
107               interCommLeft.push_back(newComm) ;
108               interComm.push_back(newComm) ;
109            }
110          }
111        }
112
113        else if ((CXios::serverLevel == 1))
114        {
115          int clientLeader;
116          int srvSndLeader;
117          for(it=leaders.begin();it!=leaders.end();it++)
118          {
119            if (it->first != hashServer2)
120            {
121              if (it->first != hashServer1)
122              {
123                clientLeader=it->second ;
124                int intraCommSize, intraCommRank ;
125                MPI_Comm_size(intraComm,&intraCommSize) ;
126                MPI_Comm_rank(intraComm,&intraCommRank) ;
127                info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
128                         <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
129                MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
130                interCommLeft.push_back(newComm) ;
131                interComm.push_back(newComm) ;
132              }
133            }
134            else
135            {
136              srvSndLeader = it->second;
137            }
138          }
139
140          CClient::initializeClientOnServer(rank, intraComm, srvSndLeader);
141          interCommRight.push_back(CClient::getInterComm());
142          interComm.push_back(CClient::getInterComm());
143
144        }
145
146        else // secondary server pool
147        {
148          int clientLeader;
149          for(it=leaders.begin();it!=leaders.end();it++)
150          {
151            if (it->first == hashServer1)
152            {
153              clientLeader=it->second ;
154              int intraCommSize, intraCommRank ;
155              MPI_Comm_size(intraComm,&intraCommSize) ;
156              MPI_Comm_rank(intraComm,&intraCommRank) ;
157              info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
158                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
159
160              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
161              interCommLeft.push_back(newComm) ;
162              interComm.push_back(newComm) ;
163
164              break;
165            }
166          }
167        }
168
169        delete [] hashAll ;
170
171      }
172      // using OASIS
173      else
174      {
175//        int rank ,size;
176        int size;
177        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
178
179        CTimer::get("XIOS").resume() ;
180        MPI_Comm localComm;
181        oasis_get_localcomm(localComm);
182        MPI_Comm_dup(localComm, &intraComm);
183
184        MPI_Comm_rank(intraComm,&rank) ;
185        MPI_Comm_size(intraComm,&size) ;
186        string codesId=CXios::getin<string>("oasis_codes_id") ;
187
188        vector<string> splitted ;
189        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
190        vector<string>::iterator it ;
191
192        MPI_Comm newComm ;
193        int globalRank ;
194        MPI_Comm_rank(CXios::globalComm,&globalRank);
195
196        for(it=splitted.begin();it!=splitted.end();it++)
197        {
198          oasis_get_intercomm(newComm,*it) ;
199          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
200          MPI_Comm_remote_size(newComm,&size);
201          interComm.push_back(newComm) ;
202        }
203              oasis_enddef() ;
204      }
205
206//      int rank;
207      MPI_Comm_rank(intraComm,&rank) ;
208      if (rank==0) isRoot=true;
209      else isRoot=false;
210     
211      eventScheduler = new CEventScheduler(intraComm) ;
212    }
213
214    void CServer::finalize(void)
215    {
216
217      CTimer::get("XIOS").suspend() ;
218     
219      delete eventScheduler ;
220
221      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
222        MPI_Comm_free(&(*it));
223
224//      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++)
225//        MPI_Comm_free(&(*it));
226
227      for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++)
228        MPI_Comm_free(&(*it));
229
230      for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
231        MPI_Comm_free(&(*it));
232
233      MPI_Comm_free(&intraComm);
234
235      if (!is_MPI_Initialized)
236      {
237        if (CXios::usingOasis) oasis_finalize();
238        else MPI_Finalize() ;
239      }
240      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
241      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
242      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
243    }
244
245     void CServer::eventLoop(void)
246     {
247       bool stop=false ;
248
249       CTimer::get("XIOS server").resume() ;
250       while(!stop)
251       {
252
253         if (isRoot)
254         {
255           listenContext();
256           if (!finished) listenFinalize() ;
257         }
258         else
259         {
260           listenRootContext();
261           if (!finished) listenRootFinalize() ;
262         }
263
264         contextEventLoop() ;
265         if (finished && contextList.empty()) stop=true ;
266         eventScheduler->checkEvent() ;
267
268       }
269       CTimer::get("XIOS server").suspend() ;
270     }
271
272     void CServer::listenFinalize(void)
273     {
274        list<MPI_Comm>::iterator it, itr;
275        int msg ;
276        int flag ;
277
278        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
279        {
280           MPI_Status status ;
281           traceOff() ;
282           MPI_Iprobe(0,0,*it,&flag,&status) ;
283           traceOn() ;
284           if (flag==true)
285           {
286             MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
287             info(20)<<" CServer : Receive client finalize"<<endl ;
288
289             // If primary server, send finalize to secondary server pool(s)
290             for(itr=interCommRight.begin(); itr!=interCommRight.end(); itr++)
291             {
292               MPI_Send(&msg,1,MPI_INT,0,0,*itr) ;
293
294//               MPI_Comm_free(&(*itr));
295//               interCommRight.erase(itr) ;
296             }
297
298              MPI_Comm_free(&(*it));
299//              interComm.erase(it) ;
300              interCommLeft.erase(it) ;
301              break ;
302            }
303         }
304
305        if (interCommLeft.empty())
306//        if (interComm.empty())
307         {
308           int i,size ;
309           MPI_Comm_size(intraComm,&size) ;
310           MPI_Request* requests= new MPI_Request[size-1] ;
311           MPI_Status* status= new MPI_Status[size-1] ;
312
313           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
314           MPI_Waitall(size-1,requests,status) ;
315
316           finished=true ;
317           delete [] requests ;
318           delete [] status ;
319         }
320     }
321
322
323     void CServer::listenRootFinalize()
324     {
325        int flag ;
326        MPI_Status status ;
327        int msg ;
328
329        traceOff() ;
330        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
331        traceOn() ;
332        if (flag==true)
333        {
334           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
335           finished=true ;
336        }
337      }
338
339     void CServer::listenContext(void)
340     {
341
342       MPI_Status status ;
343       int flag ;
344       static void* buffer ;
345       static MPI_Request request ;
346       static bool recept=false ;
347       int rank ;
348       int count ;
349
350       if (recept==false)
351       {
352         traceOff() ;
353         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
354         traceOn() ;
355         if (flag==true)
356         {
357           rank=status.MPI_SOURCE ;
358           MPI_Get_count(&status,MPI_CHAR,&count) ;
359           buffer=new char[count] ;
360           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
361           recept=true ;
362         }
363       }
364       else
365       {
366         traceOff() ;
367         MPI_Test(&request,&flag,&status) ;
368         traceOn() ;
369         if (flag==true)
370         {
371           rank=status.MPI_SOURCE ;
372           MPI_Get_count(&status,MPI_CHAR,&count) ;
373           recvContextMessage(buffer,count) ;
374           delete [] buffer ;
375           recept=false ;
376         }
377       }
378     }
379
380     void CServer::recvContextMessage(void* buff,int count)
381     {
382       static map<string,contextMessage> recvContextId;
383
384       map<string,contextMessage>::iterator it ;
385
386       CBufferIn buffer(buff,count) ;
387       string id ;
388       int clientLeader ;
389       int nbMessage ;
390
391       buffer>>id>>nbMessage>>clientLeader ;
392
393       it=recvContextId.find(id) ;
394       if (it==recvContextId.end())
395       {
396         contextMessage msg={0,0} ;
397         pair<map<string,contextMessage>::iterator,bool> ret ;
398         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
399         it=ret.first ;
400       }
401       it->second.nbRecv+=1 ;
402       it->second.leaderRank+=clientLeader ;
403
404       if (it->second.nbRecv==nbMessage)
405       {
406         int size ;
407         MPI_Comm_size(intraComm,&size) ;
408         MPI_Request* requests= new MPI_Request[size-1] ;
409         MPI_Status* status= new MPI_Status[size-1] ;
410
411         for(int i=1;i<size;i++)
412         {
413            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
414         }
415         MPI_Waitall(size-1,requests,status) ;
416         registerContext(buff,count,it->second.leaderRank) ;
417
418         recvContextId.erase(it) ;
419         delete [] requests ;
420         delete [] status ;
421
422       }
423     }
424
425     void CServer::listenRootContext(void)
426     {
427
428       MPI_Status status ;
429       int flag ;
430       static void* buffer ;
431       static MPI_Request request ;
432       static bool recept=false ;
433       int rank ;
434       int count ;
435       const int root=0 ;
436
437       if (recept==false)
438       {
439         traceOff() ;
440         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
441         traceOn() ;
442         if (flag==true)
443         {
444           MPI_Get_count(&status,MPI_CHAR,&count) ;
445           buffer=new char[count] ;
446           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
447           recept=true ;
448         }
449       }
450       else
451       {
452         MPI_Test(&request,&flag,&status) ;
453         if (flag==true)
454         {
455           MPI_Get_count(&status,MPI_CHAR,&count) ;
456           registerContext(buffer,count) ;
457
458           delete [] buffer ;
459           recept=false ;
460         }
461       }
462     }
463
464     void CServer::registerContext(void* buff, int count, int leaderRank)
465     {
466       string contextId;
467       CBufferIn buffer(buff, count);
468       buffer >> contextId;
469       CContext* context;
470
471       info(20) << "CServer : Register new Context : " << contextId << endl;
472
473       if (contextList.find(contextId) != contextList.end())
474         ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
475               << "Context '" << contextId << "' has already been registred");
476
477       MPI_Comm contextInterComm;
478       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextInterComm);
479
480       MPI_Comm inter;
481       MPI_Intercomm_merge(contextInterComm,1,&inter);
482       MPI_Barrier(inter);
483
484       context=CContext::create(contextId);
485       contextList[contextId]=context;
486       context->initServer(intraComm,contextInterComm);
487       contextInterComms.push_back(contextInterComm);
488
489       if (CXios::serverLevel == 1)
490       {
491         CClient::registerContext(contextId, intraComm);
492       }
493
494       MPI_Comm_free(&inter);
495
496     }
497
498     void CServer::contextEventLoop(void)
499     {
500       bool finished ;
501       map<string,CContext*>::iterator it ;
502
503       for(it=contextList.begin();it!=contextList.end();it++)
504       {
505         finished=it->second->checkBuffersAndListen();
506         if (finished)
507         {
508           contextList.erase(it) ;
509           break ;
510         }
511       }
512     }
513
514     //! Get rank of the current process
515     int CServer::getRank()
516     {
517       return rank;
518     }
519
520    /*!
521    * Open a file specified by a suffix and an extension and use it for the given file buffer.
522    * The file name will be suffix+rank+extension.
523    *
524    * \param fileName[in] protype file name
525    * \param ext [in] extension of the file
526    * \param fb [in/out] the file buffer
527    */
528    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
529    {
530      StdStringStream fileNameClient;
531      int numDigit = 0;
532      int size = 0;
533      MPI_Comm_size(CXios::globalComm, &size);
534      while (size)
535      {
536        size /= 10;
537        ++numDigit;
538      }
539
540      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext;
541      fb->open(fileNameClient.str().c_str(), std::ios::out);
542      if (!fb->is_open())
543        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
544              << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s).");
545    }
546
547    /*!
548    * \brief Open a file stream to write the info logs
549    * Open a file stream with a specific file name suffix+rank
550    * to write the info logs.
551    * \param fileName [in] protype file name
552    */
553    void CServer::openInfoStream(const StdString& fileName)
554    {
555      std::filebuf* fb = m_infoStream.rdbuf();
556      openStream(fileName, ".out", fb);
557
558      info.write2File(fb);
559      report.write2File(fb);
560    }
561
562    //! Write the info logs to standard output
563    void CServer::openInfoStream()
564    {
565      info.write2StdOut();
566      report.write2StdOut();
567    }
568
569    //! Close the info logs file if it opens
570    void CServer::closeInfoStream()
571    {
572      if (m_infoStream.is_open()) m_infoStream.close();
573    }
574
575    /*!
576    * \brief Open a file stream to write the error log
577    * Open a file stream with a specific file name suffix+rank
578    * to write the error log.
579    * \param fileName [in] protype file name
580    */
581    void CServer::openErrorStream(const StdString& fileName)
582    {
583      std::filebuf* fb = m_errorStream.rdbuf();
584      openStream(fileName, ".err", fb);
585
586      error.write2File(fb);
587    }
588
589    //! Write the error log to standard error output
590    void CServer::openErrorStream()
591    {
592      error.write2StdErr();
593    }
594
595    //! Close the error log file if it opens
596    void CServer::closeErrorStream()
597    {
598      if (m_errorStream.is_open()) m_errorStream.close();
599    }
600}
Note: See TracBrowser for help on using the repository browser.