source: XIOS/trunk/src/server.cpp @ 361

Last change on this file since 361 was 361, checked in by ymipsl, 12 years ago

Bug correction : using MPI_WTIME MPI function for tracing before MPI_Init is called.

YM

  • Property svn:eol-style set to native
File size: 10.6 KB
Line 
1#include "xmlioserver_spl.hpp"
2#include "cxios.hpp"
3#include "server.hpp"
4#include "type.hpp"
5#include "context.hpp"
6#include "object_template.hpp"
7#include "oasis_cinterface.hpp"
8#include <boost/functional/hash.hpp>
9#include <boost/algorithm/string.hpp>
10#include <mpi.h>
11#include "tracer.hpp"
12#include "timer.hpp"
13
14namespace xios
15{                     
16    MPI_Comm CServer::intraComm ;
17    list<MPI_Comm> CServer::interComm ;
18    bool CServer::isRoot ;
19    int CServer::rank ;
20    map<string,CContext*> CServer::contextList ;   
21    bool CServer::finished=false ;
22    bool CServer::is_MPI_Initialized ;
23   
24    void CServer::initialize(void)
25    {
26      int initialized ;
27      MPI_Initialized(&initialized) ;
28      if (initialized) is_MPI_Initialized=true ;
29      else is_MPI_Initialized=false ;
30           
31      // Not using OASIS
32      if (!CXios::usingOasis)
33      {
34       
35        if (!is_MPI_Initialized) 
36        {
37          int argc=0;
38          char** argv=NULL;
39          MPI_Init(&argc,&argv) ;
40        }
41        CTimer::get("XIOS").resume() ;
42 
43        boost::hash<string> hashString ;   
44     
45        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
46        unsigned long* hashAll ;
47     
48        int rank ;
49        int size ;
50        int myColor ;
51        int i,c ;
52        MPI_Comm newComm ;
53     
54        MPI_Comm_size(CXios::globalComm,&size) ;
55        MPI_Comm_rank(CXios::globalComm,&rank);
56        hashAll=new unsigned long[size] ;
57     
58        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
59
60        map<unsigned long, int> colors ;
61        map<unsigned long, int> leaders ;
62        map<unsigned long, int>::iterator it ;
63     
64        for(i=0,c=0;i<size;i++)
65        {
66          if (colors.find(hashAll[i])==colors.end())
67          {
68            colors[hashAll[i]]=c ;
69            leaders[hashAll[i]]=i ;
70            c++ ;
71          }
72        }
73     
74        myColor=colors[hashServer] ;
75        MPI_Comm_split(MPI_COMM_WORLD,myColor,rank,&intraComm) ;
76
77        int serverLeader=leaders[hashServer] ;
78        int clientLeader;
79     
80         serverLeader=leaders[hashServer] ;
81         for(it=leaders.begin();it!=leaders.end();it++)
82         {
83           if (it->first!=hashServer)
84           {
85             clientLeader=it->second ;
86           
87             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
88             interComm.push_back(newComm) ;
89           }
90         }
91
92         delete [] hashAll ;
93      }
94      // using OASIS
95      else
96      {
97        int rank ,size;
98        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId) ;
99        CTimer::get("XIOS").resume() ;
100        oasis_get_localcomm(intraComm) ;
101        MPI_Comm_rank(intraComm,&rank) ;
102        MPI_Comm_size(intraComm,&size) ;
103        string codesId=CXios::getin<string>("oasis_codes_id") ;
104         
105        vector<string> splitted ;
106        boost::split( splitted, codesId, boost::is_space(), boost::token_compress_on ) ;
107        vector<string>::iterator it ;
108
109        MPI_Comm newComm ;
110        int globalRank ;
111        MPI_Comm_rank(CXios::globalComm,&globalRank);
112       
113        for(it=splitted.begin();it!=splitted.end();it++)
114        {
115          oasis_get_intercomm(newComm,*it) ;
116          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
117          MPI_Comm_remote_size(newComm,&size);
118          interComm.push_back(newComm) ;
119        }
120      }
121     
122      int rank ;
123      MPI_Comm_rank(intraComm,&rank) ;
124      if (rank==0) isRoot=true;
125      else isRoot=false; 
126      eventLoop() ;
127      finalize() ;
128    }
129   
130    void CServer::finalize(void)
131    {
132      CTimer::get("XIOS").suspend() ;
133      if (!is_MPI_Initialized)
134      { 
135        if (CXios::usingOasis) oasis_finalize();
136        else MPI_Finalize() ;
137      }
138      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
139      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
140      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
141    }
142   
143     void CServer::eventLoop(void)
144     {
145       bool stop=false ;
146       
147       CTimer::get("XIOS server").resume() ;
148       while(!stop)
149       {
150         if (isRoot)
151         {
152           listenContext();
153           if (!finished) listenFinalize() ;
154         }
155         else
156         {
157           listenRootContext();
158           if (!finished) listenRootFinalize() ;
159         }
160       
161         contextEventLoop() ;
162         if (finished && contextList.empty()) stop=true ;
163       }
164       CTimer::get("XIOS server").suspend() ;
165     }
166     
167     void CServer::listenFinalize(void)
168     {
169        list<MPI_Comm>::iterator it;
170        int msg ;
171        int flag ;
172       
173        for(it=interComm.begin();it!=interComm.end();it++)
174        {
175           MPI_Status status ;
176           traceOff() ;
177           MPI_Iprobe(0,0,*it,&flag,&status) ;
178           traceOn() ;
179           if (flag==true)
180           {
181              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
182              info(20)<<" CServer : Receive client finalize"<<endl ;
183              interComm.erase(it) ;
184              break ;
185            }
186         }
187         
188         if (interComm.empty())
189         {
190           int i,size ;
191           MPI_Comm_size(intraComm,&size) ;
192           MPI_Request* requests= new MPI_Request[size-1] ;
193           MPI_Status* status= new MPI_Status[size-1] ;
194         
195           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
196           MPI_Waitall(size-1,requests,status) ;
197
198           finished=true ;
199           delete [] requests ;
200           delete [] status ;
201         }
202     }
203     
204     
205     void CServer::listenRootFinalize()
206     {
207        int flag ;
208        MPI_Status status ;
209        int msg ;
210       
211        traceOff() ;
212        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
213        traceOn() ;
214        if (flag==true)
215        {
216           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
217           finished=true ;
218        }
219      }
220     
221     void CServer::listenContext(void)
222     {
223       
224       MPI_Status status ;
225       int flag ;
226       static void* buffer ;
227       static MPI_Request request ;
228       static bool recept=false ;
229       int rank ;
230       int count ;
231       
232       if (recept==false)
233       {
234         traceOff() ;
235         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
236         traceOn() ;
237         if (flag==true) 
238         {
239           rank=status.MPI_SOURCE ;
240           MPI_Get_count(&status,MPI_CHAR,&count) ;
241           buffer=new char[count] ;
242           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
243           recept=true ;   
244         }
245       }
246       else
247       {
248         traceOff() ;
249         MPI_Test(&request,&flag,&status) ;
250         traceOn() ;
251         if (flag==true)
252         {
253           rank=status.MPI_SOURCE ;
254           MPI_Get_count(&status,MPI_CHAR,&count) ;
255           recvContextMessage(buffer,count) ;
256           delete [] buffer ;
257           recept=false ;         
258         }
259       }
260     }
261     
262     void CServer::recvContextMessage(void* buff,int count)
263     {
264 
265       
266       static map<string,contextMessage> recvContextId ;
267       map<string,contextMessage>::iterator it ;
268       
269       CBufferIn buffer(buff,count) ;
270       string id ;
271       int clientLeader ;
272       int nbMessage ;
273
274       buffer>>id>>nbMessage>>clientLeader ;
275                       
276       it=recvContextId.find(id) ;
277       if (it==recvContextId.end())
278       {         
279         contextMessage msg={0,0} ;
280         pair<map<string,contextMessage>::iterator,bool> ret ;
281         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
282         it=ret.first ;
283       } 
284       it->second.nbRecv+=1 ;
285       it->second.leaderRank+=clientLeader ;
286         
287       if (it->second.nbRecv==nbMessage)
288       { 
289         int size ;
290         MPI_Comm_size(intraComm,&size) ;
291         MPI_Request* requests= new MPI_Request[size-1] ;
292         MPI_Status* status= new MPI_Status[size-1] ;
293         
294         for(int i=1;i<size;i++)
295         {
296            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
297         }
298         MPI_Waitall(size-1,requests,status) ;
299         registerContext(buff,count,it->second.leaderRank) ;
300
301         recvContextId.erase(it) ;
302         delete [] requests ;
303         delete [] status ;
304
305       }
306     }     
307     
308     void CServer::listenRootContext(void)
309     {
310       
311       MPI_Status status ;
312       int flag ;
313       static void* buffer ;
314       static MPI_Request request ;
315       static bool recept=false ;
316       int rank ;
317       int count ;
318       const int root=0 ;
319       
320       if (recept==false)
321       {
322         traceOff() ;
323         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
324         traceOn() ;
325         if (flag==true) 
326         {
327           MPI_Get_count(&status,MPI_CHAR,&count) ;
328           buffer=new char[count] ;
329           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
330           recept=true ;   
331         }
332       }
333       else
334       {
335         MPI_Test(&request,&flag,&status) ;
336         if (flag==true)
337         {
338           MPI_Get_count(&status,MPI_CHAR,&count) ;
339           registerContext(buffer,count) ;
340           delete [] buffer ;
341           recept=false ;         
342         }
343       }
344     } 
345     
346     
347     
348     void CServer::registerContext(void* buff,int count, int leaderRank)
349     {
350     
351       string contextId;
352       CBufferIn buffer(buff,count) ;
353
354       buffer>>contextId ;
355       MPI_Comm contextIntercomm ;
356       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm) ;
357       
358       info(20)<<"CServer : Register new Context : "<<contextId<<endl  ;
359       MPI_Comm inter ;
360       MPI_Intercomm_merge(contextIntercomm,1,&inter) ;
361       MPI_Barrier(inter) ;
362       if (contextList.find(contextId)!=contextList.end()) 
363        ERROR("void CServer::registerContext(void* buff,int count, int leaderRank)",
364              <<"Context has already been registred") ;
365     
366      CContext* context=CContext::create(contextId) ;
367      contextList[contextId]=context ;
368      context->initServer(intraComm,contextIntercomm) ;
369             
370     }   
371     
372     
373     void CServer::contextEventLoop(void)
374     {
375       bool finished ;
376       map<string,CContext*>::iterator it ;
377       for(it=contextList.begin();it!=contextList.end();it++) 
378       {
379         finished=it->second->eventLoop() ;
380         if (finished)
381         {
382           contextList.erase(it) ;
383           break ;
384         }
385       }
386         
387     }
388     
389}
Note: See TracBrowser for help on using the repository browser.