source: XIOS/trunk/src/server.cpp @ 492

Last change on this file since 492 was 492, checked in by ymipsl, 10 years ago

Add event scheduler functionnality in order to schedule events from different context, that cause Deadlock or crash when using collective MPI communication in netcdf/hdf5 library.

YM

  • Property svn:eol-style set to native
File size: 11.9 KB
Line 
1#include "globalScopeData.hpp"
2#include "xmlioserver_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "type.hpp"
6#include "context.hpp"
7#include "object_template.hpp"
8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
11#include "mpi.hpp"
12#include "tracer.hpp"
13#include "timer.hpp"
14#include "event_scheduler.hpp"
15
16namespace xios
17{
18    MPI_Comm CServer::intraComm ;
19    list<MPI_Comm> CServer::interComm ;
20    bool CServer::isRoot ;
21    int CServer::rank = INVALID_RANK;
22    StdOFStream CServer::m_infoStream;
23    map<string,CContext*> CServer::contextList ;
24    bool CServer::finished=false ;
25    bool CServer::is_MPI_Initialized ;
26    CEventScheduler* CServer::eventScheduler ;
27
28    void CServer::initialize(void)
29    {
30      int initialized ;
31      MPI_Initialized(&initialized) ;
32      if (initialized) is_MPI_Initialized=true ;
33      else is_MPI_Initialized=false ;
34
35      // Not using OASIS
36      if (!CXios::usingOasis)
37      {
38
39        if (!is_MPI_Initialized)
40        {
41          int argc=0;
42          char** argv=NULL;
43          MPI_Init(&argc,&argv) ;
44        }
45        CTimer::get("XIOS").resume() ;
46
47        boost::hash<string> hashString ;
48
49        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
50        unsigned long* hashAll ;
51
52//        int rank ;
53        int size ;
54        int myColor ;
55        int i,c ;
56        MPI_Comm newComm ;
57
58        MPI_Comm_size(CXios::globalComm,&size) ;
59        MPI_Comm_rank(CXios::globalComm,&rank);
60        hashAll=new unsigned long[size] ;
61
62        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
63
64        map<unsigned long, int> colors ;
65        map<unsigned long, int> leaders ;
66        map<unsigned long, int>::iterator it ;
67
68        for(i=0,c=0;i<size;i++)
69        {
70          if (colors.find(hashAll[i])==colors.end())
71          {
72            colors[hashAll[i]]=c ;
73            leaders[hashAll[i]]=i ;
74            c++ ;
75          }
76        }
77
78        myColor=colors[hashServer] ;
79        MPI_Comm_split(MPI_COMM_WORLD,myColor,rank,&intraComm) ;
80
81        int serverLeader=leaders[hashServer] ;
82        int clientLeader;
83
84         serverLeader=leaders[hashServer] ;
85         for(it=leaders.begin();it!=leaders.end();it++)
86         {
87           if (it->first!=hashServer)
88           {
89             clientLeader=it->second ;
90             int intraCommSize, intraCommRank ;
91             MPI_Comm_size(intraComm,&intraCommSize) ;
92             MPI_Comm_rank(intraComm,&intraCommRank) ;
93             cout<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
94                 <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
95
96             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
97             interComm.push_back(newComm) ;
98           }
99         }
100
101         delete [] hashAll ;
102      }
103      // using OASIS
104      else
105      {
106//        int rank ,size;
107        int size;
108        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
109
110        CTimer::get("XIOS").resume() ;
111        oasis_get_localcomm(intraComm) ;
112        MPI_Comm_rank(intraComm,&rank) ;
113        MPI_Comm_size(intraComm,&size) ;
114        string codesId=CXios::getin<string>("oasis_codes_id") ;
115
116        vector<string> splitted ;
117        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
118        vector<string>::iterator it ;
119
120        MPI_Comm newComm ;
121        int globalRank ;
122        MPI_Comm_rank(CXios::globalComm,&globalRank);
123
124        for(it=splitted.begin();it!=splitted.end();it++)
125        {
126          oasis_get_intercomm(newComm,*it) ;
127          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
128          MPI_Comm_remote_size(newComm,&size);
129          interComm.push_back(newComm) ;
130        }
131              oasis_enddef() ;
132      }
133
134//      int rank;
135      MPI_Comm_rank(intraComm,&rank) ;
136      if (rank==0) isRoot=true;
137      else isRoot=false;
138     
139      eventScheduler = new CEventScheduler(intraComm) ;
140    }
141
142    void CServer::finalize(void)
143    {
144      CTimer::get("XIOS").suspend() ;
145     
146      delete eventScheduler ;
147     
148      if (!is_MPI_Initialized)
149      {
150        if (CXios::usingOasis) oasis_finalize();
151        else MPI_Finalize() ;
152      }
153      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
154      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
155      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
156    }
157
158     void CServer::eventLoop(void)
159     {
160       bool stop=false ;
161
162       CTimer::get("XIOS server").resume() ;
163       while(!stop)
164       {
165         if (isRoot)
166         {
167           listenContext();
168           if (!finished) listenFinalize() ;
169         }
170         else
171         {
172           listenRootContext();
173           if (!finished) listenRootFinalize() ;
174         }
175
176         contextEventLoop() ;
177         if (finished && contextList.empty()) stop=true ;
178         if (! CXios::isServer) eventScheduler->checkEvent() ;
179       }
180       CTimer::get("XIOS server").suspend() ;
181     }
182
183     void CServer::listenFinalize(void)
184     {
185        list<MPI_Comm>::iterator it;
186        int msg ;
187        int flag ;
188
189        for(it=interComm.begin();it!=interComm.end();it++)
190        {
191           MPI_Status status ;
192           traceOff() ;
193           MPI_Iprobe(0,0,*it,&flag,&status) ;
194           traceOn() ;
195           if (flag==true)
196           {
197              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
198              info(20)<<" CServer : Receive client finalize"<<endl ;
199              interComm.erase(it) ;
200              break ;
201            }
202         }
203
204         if (interComm.empty())
205         {
206           int i,size ;
207           MPI_Comm_size(intraComm,&size) ;
208           MPI_Request* requests= new MPI_Request[size-1] ;
209           MPI_Status* status= new MPI_Status[size-1] ;
210
211           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
212           MPI_Waitall(size-1,requests,status) ;
213
214           finished=true ;
215           delete [] requests ;
216           delete [] status ;
217         }
218     }
219
220
221     void CServer::listenRootFinalize()
222     {
223        int flag ;
224        MPI_Status status ;
225        int msg ;
226
227        traceOff() ;
228        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
229        traceOn() ;
230        if (flag==true)
231        {
232           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
233           finished=true ;
234        }
235      }
236
237     void CServer::listenContext(void)
238     {
239
240       MPI_Status status ;
241       int flag ;
242       static void* buffer ;
243       static MPI_Request request ;
244       static bool recept=false ;
245       int rank ;
246       int count ;
247
248       if (recept==false)
249       {
250         traceOff() ;
251         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
252         traceOn() ;
253         if (flag==true)
254         {
255           rank=status.MPI_SOURCE ;
256           MPI_Get_count(&status,MPI_CHAR,&count) ;
257           buffer=new char[count] ;
258           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
259           recept=true ;
260         }
261       }
262       else
263       {
264         traceOff() ;
265         MPI_Test(&request,&flag,&status) ;
266         traceOn() ;
267         if (flag==true)
268         {
269           rank=status.MPI_SOURCE ;
270           MPI_Get_count(&status,MPI_CHAR,&count) ;
271           recvContextMessage(buffer,count) ;
272           delete [] buffer ;
273           recept=false ;
274         }
275       }
276     }
277
278     void CServer::recvContextMessage(void* buff,int count)
279     {
280       static map<string,contextMessage> recvContextId ;
281       map<string,contextMessage>::iterator it ;
282
283       CBufferIn buffer(buff,count) ;
284       string id ;
285       int clientLeader ;
286       int nbMessage ;
287
288       buffer>>id>>nbMessage>>clientLeader ;
289
290       it=recvContextId.find(id) ;
291       if (it==recvContextId.end())
292       {
293         contextMessage msg={0,0} ;
294         pair<map<string,contextMessage>::iterator,bool> ret ;
295         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
296         it=ret.first ;
297       }
298       it->second.nbRecv+=1 ;
299       it->second.leaderRank+=clientLeader ;
300
301       if (it->second.nbRecv==nbMessage)
302       {
303         int size ;
304         MPI_Comm_size(intraComm,&size) ;
305         MPI_Request* requests= new MPI_Request[size-1] ;
306         MPI_Status* status= new MPI_Status[size-1] ;
307
308         for(int i=1;i<size;i++)
309         {
310            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
311         }
312         MPI_Waitall(size-1,requests,status) ;
313         registerContext(buff,count,it->second.leaderRank) ;
314
315         recvContextId.erase(it) ;
316         delete [] requests ;
317         delete [] status ;
318
319       }
320     }
321
322     void CServer::listenRootContext(void)
323     {
324
325       MPI_Status status ;
326       int flag ;
327       static void* buffer ;
328       static MPI_Request request ;
329       static bool recept=false ;
330       int rank ;
331       int count ;
332       const int root=0 ;
333
334       if (recept==false)
335       {
336         traceOff() ;
337         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
338         traceOn() ;
339         if (flag==true)
340         {
341           MPI_Get_count(&status,MPI_CHAR,&count) ;
342           buffer=new char[count] ;
343           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
344           recept=true ;
345         }
346       }
347       else
348       {
349         MPI_Test(&request,&flag,&status) ;
350         if (flag==true)
351         {
352           MPI_Get_count(&status,MPI_CHAR,&count) ;
353           registerContext(buffer,count) ;
354           delete [] buffer ;
355           recept=false ;
356         }
357       }
358     }
359
360
361
362     void CServer::registerContext(void* buff,int count, int leaderRank)
363     {
364
365       string contextId;
366       CBufferIn buffer(buff,count) ;
367
368       buffer>>contextId ;
369       MPI_Comm contextIntercomm ;
370       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm) ;
371
372       info(20)<<"CServer : Register new Context : "<<contextId<<endl  ;
373       MPI_Comm inter ;
374       MPI_Intercomm_merge(contextIntercomm,1,&inter) ;
375       MPI_Barrier(inter) ;
376       if (contextList.find(contextId)!=contextList.end())
377        ERROR("void CServer::registerContext(void* buff,int count, int leaderRank)",
378              <<"Context has already been registred") ;
379
380      CContext* context=CContext::create(contextId) ;
381      contextList[contextId]=context ;
382      context->initServer(intraComm,contextIntercomm) ;
383
384     }
385
386
387     void CServer::contextEventLoop(void)
388     {
389       bool finished ;
390       map<string,CContext*>::iterator it ;
391       for(it=contextList.begin();it!=contextList.end();it++)
392       {
393         finished=it->second->eventLoop() ;
394         if (finished)
395         {
396           contextList.erase(it) ;
397           break ;
398         }
399       }
400
401     }
402
403     //! Get rank of the current process
404     int CServer::getRank()
405     {
406       return rank;
407     }
408
409     /*!
410      * \brief Open file stream to write in
411      *   Opening a file stream with a specific file name suffix-server+rank
412      * \param [in] protype file name
413     */
414     void CServer::openInfoStream(const StdString& fileName)
415     {
416       std::filebuf* fb = m_infoStream.rdbuf();
417       StdStringStream fileNameServer;
418       fileNameServer << fileName <<"_server_"<<getRank() << ".out";
419       fb->open(fileNameServer.str().c_str(), std::ios::out);
420       if (!fb->is_open())
421       ERROR("void CServer::openInfoStream(const StdString& fileName)",
422            <<endl<< "Can not open <"<<fileNameServer<<"> file to write" );
423
424       info.write2File(fb);
425     }
426
427     //! Open stream for standard output
428     void CServer::openInfoStream()
429     {
430       info.write2StdOut();
431     }
432
433     //! Close opening stream
434     void CServer::closeInfoStream()
435     {
436       if (m_infoStream.is_open()) m_infoStream.close();
437     }
438
439}
Note: See TracBrowser for help on using the repository browser.