source: XIOS/trunk/src/server.cpp @ 491

Last change on this file since 491 was 491, checked in by mhnguyen, 10 years ago

Removing using_server from iodef.xml file. From now on, XIOS is capable of dectectin automatically server mode

+) Check whether server is in the MPI global group
+) Add some more methods to set/unset using_server
+) Do some cleaning stuff

Test
+) On Curie
+) All test passed correctly without using_server id

  • Property svn:eol-style set to native
File size: 11.3 KB
Line 
1#include "globalScopeData.hpp"
2#include "xmlioserver_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "type.hpp"
6#include "context.hpp"
7#include "object_template.hpp"
8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
11#include "mpi.hpp"
12#include "tracer.hpp"
13#include "timer.hpp"
14
15namespace xios
16{
17    MPI_Comm CServer::intraComm ;
18    list<MPI_Comm> CServer::interComm ;
19    bool CServer::isRoot ;
20    int CServer::rank = INVALID_RANK;
21    StdOFStream CServer::m_infoStream;
22    map<string,CContext*> CServer::contextList ;
23    bool CServer::finished=false ;
24    bool CServer::is_MPI_Initialized ;
25
26    void CServer::initialize(void)
27    {
28      int initialized ;
29      MPI_Initialized(&initialized) ;
30      if (initialized) is_MPI_Initialized=true ;
31      else is_MPI_Initialized=false ;
32
33      // Not using OASIS
34      if (!CXios::usingOasis)
35      {
36
37        if (!is_MPI_Initialized)
38        {
39          int argc=0;
40          char** argv=NULL;
41          MPI_Init(&argc,&argv) ;
42        }
43        CTimer::get("XIOS").resume() ;
44
45        boost::hash<string> hashString ;
46
47        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
48        unsigned long* hashAll ;
49
50//        int rank ;
51        int size ;
52        int myColor ;
53        int i,c ;
54        MPI_Comm newComm ;
55
56        MPI_Comm_size(CXios::globalComm,&size) ;
57        MPI_Comm_rank(CXios::globalComm,&rank);
58        hashAll=new unsigned long[size] ;
59
60        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
61
62        map<unsigned long, int> colors ;
63        map<unsigned long, int> leaders ;
64        map<unsigned long, int>::iterator it ;
65
66        for(i=0,c=0;i<size;i++)
67        {
68          if (colors.find(hashAll[i])==colors.end())
69          {
70            colors[hashAll[i]]=c ;
71            leaders[hashAll[i]]=i ;
72            c++ ;
73          }
74        }
75
76        myColor=colors[hashServer] ;
77        MPI_Comm_split(MPI_COMM_WORLD,myColor,rank,&intraComm) ;
78
79        int serverLeader=leaders[hashServer] ;
80        int clientLeader;
81
82         serverLeader=leaders[hashServer] ;
83         for(it=leaders.begin();it!=leaders.end();it++)
84         {
85           if (it->first!=hashServer)
86           {
87             clientLeader=it->second ;
88
89             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
90             interComm.push_back(newComm) ;
91           }
92         }
93
94         delete [] hashAll ;
95      }
96      // using OASIS
97      else
98      {
99//        int rank ,size;
100        int size;
101        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
102
103        CTimer::get("XIOS").resume() ;
104        oasis_get_localcomm(intraComm) ;
105        MPI_Comm_rank(intraComm,&rank) ;
106        MPI_Comm_size(intraComm,&size) ;
107        string codesId=CXios::getin<string>("oasis_codes_id") ;
108
109        vector<string> splitted ;
110        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
111        vector<string>::iterator it ;
112
113        MPI_Comm newComm ;
114        int globalRank ;
115        MPI_Comm_rank(CXios::globalComm,&globalRank);
116
117        for(it=splitted.begin();it!=splitted.end();it++)
118        {
119          oasis_get_intercomm(newComm,*it) ;
120          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
121          MPI_Comm_remote_size(newComm,&size);
122          interComm.push_back(newComm) ;
123        }
124        oasis_enddef() ;
125      }
126
127//      int rank;
128      MPI_Comm_rank(intraComm,&rank) ;
129      if (rank==0) isRoot=true;
130      else isRoot=false;
131//      eventLoop() ;
132//      finalize() ;
133    }
134
135    void CServer::finalize(void)
136    {
137      CTimer::get("XIOS").suspend() ;
138      if (!is_MPI_Initialized)
139      {
140        if (CXios::usingOasis) oasis_finalize();
141        else MPI_Finalize() ;
142      }
143      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
144      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
145      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
146    }
147
148     void CServer::eventLoop(void)
149     {
150       bool stop=false ;
151
152       CTimer::get("XIOS server").resume() ;
153       while(!stop)
154       {
155         if (isRoot)
156         {
157           listenContext();
158           if (!finished) listenFinalize() ;
159         }
160         else
161         {
162           listenRootContext();
163           if (!finished) listenRootFinalize() ;
164         }
165
166         contextEventLoop() ;
167         if (finished && contextList.empty()) stop=true ;
168       }
169       CTimer::get("XIOS server").suspend() ;
170     }
171
172     void CServer::listenFinalize(void)
173     {
174        list<MPI_Comm>::iterator it;
175        int msg ;
176        int flag ;
177
178        for(it=interComm.begin();it!=interComm.end();it++)
179        {
180           MPI_Status status ;
181           traceOff() ;
182           MPI_Iprobe(0,0,*it,&flag,&status) ;
183           traceOn() ;
184           if (flag==true)
185           {
186              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
187              info(20)<<" CServer : Receive client finalize"<<endl ;
188              interComm.erase(it) ;
189              break ;
190            }
191         }
192
193         if (interComm.empty())
194         {
195           int i,size ;
196           MPI_Comm_size(intraComm,&size) ;
197           MPI_Request* requests= new MPI_Request[size-1] ;
198           MPI_Status* status= new MPI_Status[size-1] ;
199
200           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
201           MPI_Waitall(size-1,requests,status) ;
202
203           finished=true ;
204           delete [] requests ;
205           delete [] status ;
206         }
207     }
208
209
210     void CServer::listenRootFinalize()
211     {
212        int flag ;
213        MPI_Status status ;
214        int msg ;
215
216        traceOff() ;
217        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
218        traceOn() ;
219        if (flag==true)
220        {
221           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
222           finished=true ;
223        }
224      }
225
226     void CServer::listenContext(void)
227     {
228
229       MPI_Status status ;
230       int flag ;
231       static void* buffer ;
232       static MPI_Request request ;
233       static bool recept=false ;
234       int rank ;
235       int count ;
236
237       if (recept==false)
238       {
239         traceOff() ;
240         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
241         traceOn() ;
242         if (flag==true)
243         {
244           rank=status.MPI_SOURCE ;
245           MPI_Get_count(&status,MPI_CHAR,&count) ;
246           buffer=new char[count] ;
247           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
248           recept=true ;
249         }
250       }
251       else
252       {
253         traceOff() ;
254         MPI_Test(&request,&flag,&status) ;
255         traceOn() ;
256         if (flag==true)
257         {
258           rank=status.MPI_SOURCE ;
259           MPI_Get_count(&status,MPI_CHAR,&count) ;
260           recvContextMessage(buffer,count) ;
261           delete [] buffer ;
262           recept=false ;
263         }
264       }
265     }
266
267     void CServer::recvContextMessage(void* buff,int count)
268     {
269       static map<string,contextMessage> recvContextId ;
270       map<string,contextMessage>::iterator it ;
271
272       CBufferIn buffer(buff,count) ;
273       string id ;
274       int clientLeader ;
275       int nbMessage ;
276
277       buffer>>id>>nbMessage>>clientLeader ;
278
279       it=recvContextId.find(id) ;
280       if (it==recvContextId.end())
281       {
282         contextMessage msg={0,0} ;
283         pair<map<string,contextMessage>::iterator,bool> ret ;
284         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
285         it=ret.first ;
286       }
287       it->second.nbRecv+=1 ;
288       it->second.leaderRank+=clientLeader ;
289
290       if (it->second.nbRecv==nbMessage)
291       {
292         int size ;
293         MPI_Comm_size(intraComm,&size) ;
294         MPI_Request* requests= new MPI_Request[size-1] ;
295         MPI_Status* status= new MPI_Status[size-1] ;
296
297         for(int i=1;i<size;i++)
298         {
299            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
300         }
301         MPI_Waitall(size-1,requests,status) ;
302         registerContext(buff,count,it->second.leaderRank) ;
303
304         recvContextId.erase(it) ;
305         delete [] requests ;
306         delete [] status ;
307
308       }
309     }
310
311     void CServer::listenRootContext(void)
312     {
313
314       MPI_Status status ;
315       int flag ;
316       static void* buffer ;
317       static MPI_Request request ;
318       static bool recept=false ;
319       int rank ;
320       int count ;
321       const int root=0 ;
322
323       if (recept==false)
324       {
325         traceOff() ;
326         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
327         traceOn() ;
328         if (flag==true)
329         {
330           MPI_Get_count(&status,MPI_CHAR,&count) ;
331           buffer=new char[count] ;
332           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
333           recept=true ;
334         }
335       }
336       else
337       {
338         MPI_Test(&request,&flag,&status) ;
339         if (flag==true)
340         {
341           MPI_Get_count(&status,MPI_CHAR,&count) ;
342           registerContext(buffer,count) ;
343           delete [] buffer ;
344           recept=false ;
345         }
346       }
347     }
348
349
350
351     void CServer::registerContext(void* buff,int count, int leaderRank)
352     {
353
354       string contextId;
355       CBufferIn buffer(buff,count) ;
356
357       buffer>>contextId ;
358       MPI_Comm contextIntercomm ;
359       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm) ;
360
361       info(20)<<"CServer : Register new Context : "<<contextId<<endl  ;
362       MPI_Comm inter ;
363       MPI_Intercomm_merge(contextIntercomm,1,&inter) ;
364       MPI_Barrier(inter) ;
365       if (contextList.find(contextId)!=contextList.end())
366        ERROR("void CServer::registerContext(void* buff,int count, int leaderRank)",
367              <<"Context has already been registred") ;
368
369      CContext* context=CContext::create(contextId) ;
370      contextList[contextId]=context ;
371      context->initServer(intraComm,contextIntercomm) ;
372
373     }
374
375
376     void CServer::contextEventLoop(void)
377     {
378       bool finished ;
379       map<string,CContext*>::iterator it ;
380       for(it=contextList.begin();it!=contextList.end();it++)
381       {
382         finished=it->second->eventLoop() ;
383         if (finished)
384         {
385           contextList.erase(it) ;
386           break ;
387         }
388       }
389
390     }
391
392     //! Get rank of the current process
393     int CServer::getRank()
394     {
395       return rank;
396     }
397
398     /*!
399      * \brief Open file stream to write in
400      *   Opening a file stream with a specific file name suffix-server+rank
401      * \param [in] protype file name
402     */
403     void CServer::openInfoStream(const StdString& fileName)
404     {
405       std::filebuf* fb = m_infoStream.rdbuf();
406       StdStringStream fileNameServer;
407       fileNameServer << fileName <<"_server_"<<getRank() << ".out";
408       fb->open(fileNameServer.str().c_str(), std::ios::out);
409       if (!fb->is_open())
410       ERROR("void CServer::openInfoStream(const StdString& fileName)",
411            <<endl<< "Can not open <"<<fileNameServer<<"> file to write" );
412
413       info.write2File(fb);
414     }
415
416     //! Open stream for standard output
417     void CServer::openInfoStream()
418     {
419       info.write2StdOut();
420     }
421
422     //! Close opening stream
423     void CServer::closeInfoStream()
424     {
425       if (m_infoStream.is_open()) m_infoStream.close();
426     }
427
428}
Note: See TracBrowser for help on using the repository browser.