source: XIOS/dev/branch_yushan_merged/src/server.cpp @ 1141

Last change on this file since 1141 was 1141, checked in by yushan, 4 years ago

adding test_remap_omp

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 14.2 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "type.hpp"
6#include "context.hpp"
7#include "object_template.hpp"
8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
11#include "tracer.hpp"
12#include "timer.hpp"
13#include "event_scheduler.hpp"
14
15namespace xios
16{
17    MPI_Comm CServer::intraComm ;
18    list<MPI_Comm> CServer::interComm ;
19    std::list<MPI_Comm> CServer::contextInterComms;
20    bool CServer::isRoot ;
21    int CServer::rank = INVALID_RANK;
22    StdOFStream CServer::m_infoStream;
23    StdOFStream CServer::m_errorStream;
24    map<string,CContext*> CServer::contextList ;
25    bool CServer::finished=false ;
26    bool CServer::is_MPI_Initialized ;
27
28   
29    CEventScheduler* CServer::eventScheduler = 0;
30   
31    void CServer::initialize(void)
32    {
33      // Not using OASIS
34      if (!CXios::usingOasis)
35      {
36
37        CTimer::get("XIOS").resume() ;
38
39        boost::hash<string> hashString ;
40
41        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
42        unsigned long* hashAll ;
43
44
45        int size ;
46        int myColor ;
47        int i,c ;
48        MPI_Comm newComm ;
49
50        MPI_Comm_size(CXios::globalComm,&size) ;
51       
52        //size = CXios::globalComm.ep_comm_ptr->size_rank_info[0].second;
53        printf("global size = %d, size= %d\n", CXios::globalComm.ep_comm_ptr->size_rank_info[0].second, size);
54       
55        MPI_Comm_rank(CXios::globalComm,&rank);
56        hashAll=new unsigned long[size] ;
57
58        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
59
60        map<unsigned long, int> colors ;
61        map<unsigned long, int> leaders ;
62        map<unsigned long, int>::iterator it ;
63
64        for(i=0,c=0;i<size;i++)
65        {
66          if (colors.find(hashAll[i])==colors.end())
67          {
68            colors[hashAll[i]]=c ;
69            leaders[hashAll[i]]=i ;
70            c++ ;
71          }
72        }
73
74        myColor=colors[hashServer] ;
75
76
77        MPI_Comm_split(CXios::globalComm,myColor,rank,&intraComm) ;
78
79       
80        int serverLeader=leaders[hashServer] ;
81        int clientLeader;
82
83         serverLeader=leaders[hashServer] ;
84         for(it=leaders.begin();it!=leaders.end();++it)
85         {
86           if (it->first!=hashServer)
87           {
88             clientLeader=it->second ;
89             int intraCommSize, intraCommRank ;
90             MPI_Comm_size(intraComm,&intraCommSize) ;
91             MPI_Comm_rank(intraComm,&intraCommRank) ;
92             info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
93                     <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
94
95             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
96             interComm.push_back(newComm) ;
97           }
98         }
99
100         delete [] hashAll ;
101      }
102      // using OASIS
103      else
104      {
105        int size;
106        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
107
108        CTimer::get("XIOS").resume() ;
109        MPI_Comm localComm;
110        oasis_get_localcomm(localComm);
111        MPI_Comm_dup(localComm, &intraComm);
112
113        MPI_Comm_rank(intraComm,&rank) ;
114        MPI_Comm_size(intraComm,&size) ;
115        string codesId=CXios::getin<string>("oasis_codes_id") ;
116
117        vector<string> splitted ;
118        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
119        vector<string>::iterator it ;
120
121        MPI_Comm newComm ;
122        int globalRank ;
123        MPI_Comm_rank(CXios::globalComm,&globalRank);
124
125        for(it=splitted.begin();it!=splitted.end();it++)
126        {
127          oasis_get_intercomm(newComm,*it) ;
128          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
129          MPI_Comm_remote_size(newComm,&size);
130          interComm.push_back(newComm) ;
131        }
132              oasis_enddef() ;
133      }
134
135      MPI_Comm_rank(intraComm,&rank) ;
136      if (rank==0) isRoot=true;
137      else isRoot=false;
138     
139      eventScheduler = new CEventScheduler(intraComm) ;
140    }
141
142    void CServer::finalize(void)
143    {
144      CTimer::get("XIOS").suspend() ;
145     
146      delete eventScheduler ;
147
148      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); ++it)
149        MPI_Comm_free(&(*it));
150      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); ++it)
151        MPI_Comm_free(&(*it));
152
153      MPI_Comm_free(&intraComm);
154
155      if (!is_MPI_Initialized)
156      {
157        if (CXios::usingOasis) oasis_finalize();
158        //else  {MPI_Finalize() ;}
159      }
160
161     
162      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
163      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
164      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
165    }
166
167     void CServer::eventLoop(void)
168     {
169       bool stop=false ;
170
171       CTimer::get("XIOS server").resume() ;
172       while(!stop)
173       {
174         if (isRoot)
175         {
176           listenContext();
177           if (!finished) listenFinalize() ;
178         }
179         else
180         {
181           listenRootContext();
182           if (!finished) 
183           {
184             listenRootFinalize() ;
185           }
186         }
187
188         contextEventLoop() ;
189         if (finished && contextList.empty()) stop=true ;
190         
191         eventScheduler->checkEvent() ;
192       }
193       
194       
195       CTimer::get("XIOS server").suspend() ;
196     }
197
198     void CServer::listenFinalize(void)
199     {
200        list<MPI_Comm>::iterator it;
201        int msg ;
202        int flag ;
203
204        for(it=interComm.begin();it!=interComm.end();++it)
205        {
206           MPI_Status status ;
207           traceOff() ;
208           MPI_Iprobe(0,0,*it,&flag,&status) ;
209           traceOn() ;
210           if (flag==true)
211           {
212              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
213              info(20)<<" CServer : Receive client finalize"<<endl ;
214
215              MPI_Comm_free(&(*it));
216              interComm.erase(it) ;
217              break ;
218            }
219         }
220
221         if (interComm.empty())
222         {
223           int i,size ;
224           MPI_Comm_size(intraComm,&size) ;
225           MPI_Request* requests= new MPI_Request[size-1] ;
226           MPI_Status* status= new MPI_Status[size-1] ;
227
228           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
229           MPI_Waitall(size-1,requests,status) ;
230
231           finished=true ;
232           delete [] requests ;
233           delete [] status ;
234         }
235     }
236
237
238     void CServer::listenRootFinalize()
239     {
240        int flag ;
241        MPI_Status status ;
242        int msg ;
243
244        traceOff() ;
245        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
246        traceOn() ;
247        if (flag==true)
248        {
249           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
250           finished=true ;
251        }
252      }
253
254     void CServer::listenContext(void)
255     {
256
257       MPI_Status status ;
258       int flag ;
259       static char* buffer ;
260       static MPI_Request request ;
261       static bool recept=false ;
262       int rank ;
263       int count ;
264
265       if (recept==false)
266       {
267         traceOff() ;
268         #ifdef _usingEP
269         MPI_Iprobe(-1,1,CXios::globalComm, &flag, &status) ;
270         #else
271         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
272         #endif
273         traceOn() ;
274         
275         if (flag==true)
276         {
277           #ifdef _usingMPI
278           rank=status.MPI_SOURCE ;
279           #elif _usingEP
280           rank= status.ep_src ;
281           #endif
282           MPI_Get_count(&status,MPI_CHAR,&count) ;
283           buffer=new char[count] ;
284           MPI_Irecv((void*)buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
285           recept=true ;
286         }
287       }
288       else
289       {
290         traceOff() ;
291         MPI_Test(&request,&flag,&status) ;
292         traceOn() ;
293         if (flag==true)
294         {
295           #ifdef _usingMPI
296           rank=status.MPI_SOURCE ;
297           #elif _usingEP
298           rank= status.ep_src ;
299           #endif
300           MPI_Get_count(&status,MPI_CHAR,&count) ;
301           recvContextMessage((void*)buffer,count) ;
302           delete [] buffer ;
303           recept=false ;
304         }
305       }
306     }
307
308     void CServer::recvContextMessage(void* buff,int count)
309     {
310       static map<string,contextMessage> recvContextId ;
311       map<string,contextMessage>::iterator it ;
312
313       CBufferIn buffer(buff,count) ;
314       string id ;
315       int clientLeader ;
316       int nbMessage ;
317
318       buffer>>id>>nbMessage>>clientLeader ;
319
320       it=recvContextId.find(id) ;
321       if (it==recvContextId.end())
322       {
323         contextMessage msg={0,0} ;
324         pair<map<string,contextMessage>::iterator,bool> ret ;
325         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
326         it=ret.first ;
327       }
328       it->second.nbRecv+=1 ;
329       it->second.leaderRank+=clientLeader ;
330
331       if (it->second.nbRecv==nbMessage)
332       {
333         int size ;
334         MPI_Comm_size(intraComm,&size) ;
335         MPI_Request* requests= new MPI_Request[size-1] ;
336         MPI_Status* status= new MPI_Status[size-1] ;
337
338         for(int i=1;i<size;i++)
339         {
340            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
341         }
342         MPI_Waitall(size-1,requests,status) ;
343         registerContext(buff,count,it->second.leaderRank) ;
344
345         recvContextId.erase(it) ;
346         delete [] requests ;
347         delete [] status ;
348
349       }
350     }
351
352     void CServer::listenRootContext(void)
353     {
354
355       MPI_Status status ;
356       int flag ;
357       static char* buffer ;
358       static MPI_Request request ;
359       static bool recept=false ;
360       int rank ;
361       int count ;
362       const int root=0 ;
363
364       if (recept==false)
365       {
366         traceOff() ;
367         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
368         traceOn() ;
369         if (flag==true)
370         {
371           MPI_Get_count(&status,MPI_CHAR,&count) ;
372           buffer=new char[count] ;
373           MPI_Irecv((void*)buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
374           recept=true ;
375         }
376       }
377       else
378       {
379         MPI_Test(&request,&flag,&status) ;
380         if (flag==true)
381         {
382           MPI_Get_count(&status,MPI_CHAR,&count) ;
383           registerContext((void*)buffer,count) ;
384           delete [] buffer ;
385           recept=false ;
386         }
387       }
388     }
389
390     void CServer::registerContext(void* buff, int count, int leaderRank)
391     {
392       string contextId;
393       CBufferIn buffer(buff, count);
394       buffer >> contextId;
395
396       info(20) << "CServer : Register new Context : " << contextId << endl;
397
398       if (contextList.find(contextId) != contextList.end())
399         ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
400               << "Context '" << contextId << "' has already been registred");
401
402       MPI_Comm contextIntercomm;
403       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm);
404
405       MPI_Comm inter;
406       MPI_Intercomm_merge(contextIntercomm,1,&inter);
407       MPI_Barrier(inter);
408
409       CContext* context=CContext::create(contextId);
410       contextList[contextId]=context;
411       context->initServer(intraComm,contextIntercomm);
412
413       contextInterComms.push_back(contextIntercomm);
414       MPI_Comm_free(&inter);
415     }
416
417     void CServer::contextEventLoop(void)
418     {
419       bool finished ;
420       map<string,CContext*>::iterator it ;
421       for(it=contextList.begin();it!=contextList.end();++it)
422       {
423         finished=it->second->checkBuffersAndListen();
424         if (finished)
425         {
426           contextList.erase(it) ;
427           break ;
428         }
429       }
430
431     }
432
433     //! Get rank of the current process
434     int CServer::getRank()
435     {
436       return rank;
437     }
438
439    /*!
440    * Open a file specified by a suffix and an extension and use it for the given file buffer.
441    * The file name will be suffix+rank+extension.
442    *
443    * \param fileName[in] protype file name
444    * \param ext [in] extension of the file
445    * \param fb [in/out] the file buffer
446    */
447    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
448    {
449      StdStringStream fileNameClient;
450      int numDigit = 0;
451      int size = 0;
452      MPI_Comm_size(CXios::globalComm, &size);
453      while (size)
454      {
455        size /= 10;
456        ++numDigit;
457      }
458
459      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext;
460      fb->open(fileNameClient.str().c_str(), std::ios::out);
461      if (!fb->is_open())
462        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
463              << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s).");
464    }
465
466    /*!
467    * \brief Open a file stream to write the info logs
468    * Open a file stream with a specific file name suffix+rank
469    * to write the info logs.
470    * \param fileName [in] protype file name
471    */
472    void CServer::openInfoStream(const StdString& fileName)
473    {
474      std::filebuf* fb = m_infoStream.rdbuf();
475      openStream(fileName, ".out", fb);
476
477      info.write2File(fb);
478      report.write2File(fb);
479    }
480
481    //! Write the info logs to standard output
482    void CServer::openInfoStream()
483    {
484      info.write2StdOut();
485      report.write2StdOut();
486    }
487
488    //! Close the info logs file if it opens
489    void CServer::closeInfoStream()
490    {
491      if (m_infoStream.is_open()) m_infoStream.close();
492    }
493
494    /*!
495    * \brief Open a file stream to write the error log
496    * Open a file stream with a specific file name suffix+rank
497    * to write the error log.
498    * \param fileName [in] protype file name
499    */
500    void CServer::openErrorStream(const StdString& fileName)
501    {
502      std::filebuf* fb = m_errorStream.rdbuf();
503      openStream(fileName, ".err", fb);
504
505      error.write2File(fb);
506    }
507
508    //! Write the error log to standard error output
509    void CServer::openErrorStream()
510    {
511      error.write2StdErr();
512    }
513
514    //! Close the error log file if it opens
515    void CServer::closeErrorStream()
516    {
517      if (m_errorStream.is_open()) m_errorStream.close();
518    }
519}
Note: See TracBrowser for help on using the repository browser.