source: XIOS/trunk/src/server.cpp @ 475

Last change on this file since 475 was 475, checked in by aclsce, 10 years ago

Modified to use Oasis-MCT coupler in coupled mode.

  • Property svn:eol-style set to native
File size: 10.6 KB
Line 
1#include "xmlioserver_spl.hpp"
2#include "cxios.hpp"
3#include "server.hpp"
4#include "type.hpp"
5#include "context.hpp"
6#include "object_template.hpp"
7#include "oasis_cinterface.hpp"
8#include <boost/functional/hash.hpp>
9#include <boost/algorithm/string.hpp>
10#include "mpi.hpp"
11#include "tracer.hpp"
12#include "timer.hpp"
13
14namespace xios
15{                     
16    MPI_Comm CServer::intraComm ;
17    list<MPI_Comm> CServer::interComm ;
18    bool CServer::isRoot ;
19    int CServer::rank ;
20    map<string,CContext*> CServer::contextList ;   
21    bool CServer::finished=false ;
22    bool CServer::is_MPI_Initialized ;
23   
24    void CServer::initialize(void)
25    {
26      int initialized ;
27      MPI_Initialized(&initialized) ;
28      if (initialized) is_MPI_Initialized=true ;
29      else is_MPI_Initialized=false ;
30           
31      // Not using OASIS
32      if (!CXios::usingOasis)
33      {
34       
35        if (!is_MPI_Initialized) 
36        {
37          int argc=0;
38          char** argv=NULL;
39          MPI_Init(&argc,&argv) ;
40        }
41        CTimer::get("XIOS").resume() ;
42 
43        boost::hash<string> hashString ;   
44     
45        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
46        unsigned long* hashAll ;
47     
48        int rank ;
49        int size ;
50        int myColor ;
51        int i,c ;
52        MPI_Comm newComm ;
53     
54        MPI_Comm_size(CXios::globalComm,&size) ;
55        MPI_Comm_rank(CXios::globalComm,&rank);
56        hashAll=new unsigned long[size] ;
57     
58        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
59
60        map<unsigned long, int> colors ;
61        map<unsigned long, int> leaders ;
62        map<unsigned long, int>::iterator it ;
63     
64        for(i=0,c=0;i<size;i++)
65        {
66          if (colors.find(hashAll[i])==colors.end())
67          {
68            colors[hashAll[i]]=c ;
69            leaders[hashAll[i]]=i ;
70            c++ ;
71          }
72        }
73     
74        myColor=colors[hashServer] ;
75        MPI_Comm_split(MPI_COMM_WORLD,myColor,rank,&intraComm) ;
76
77        int serverLeader=leaders[hashServer] ;
78        int clientLeader;
79     
80         serverLeader=leaders[hashServer] ;
81         for(it=leaders.begin();it!=leaders.end();it++)
82         {
83           if (it->first!=hashServer)
84           {
85             clientLeader=it->second ;
86           
87             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
88             interComm.push_back(newComm) ;
89           }
90         }
91
92         delete [] hashAll ;
93      }
94      // using OASIS
95      else
96      {
97        int rank ,size;
98        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId) ;
99        CTimer::get("XIOS").resume() ;
100        oasis_get_localcomm(intraComm) ;
101        MPI_Comm_rank(intraComm,&rank) ;
102        MPI_Comm_size(intraComm,&size) ;
103        string codesId=CXios::getin<string>("oasis_codes_id") ;
104         
105        vector<string> splitted ;
106        boost::split( splitted, codesId, boost::is_space(), boost::token_compress_on ) ;
107        vector<string>::iterator it ;
108
109        MPI_Comm newComm ;
110        int globalRank ;
111        MPI_Comm_rank(CXios::globalComm,&globalRank);
112       
113        for(it=splitted.begin();it!=splitted.end();it++)
114        {
115          oasis_get_intercomm(newComm,*it) ;
116          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
117          MPI_Comm_remote_size(newComm,&size);
118          interComm.push_back(newComm) ;
119        }
120        oasis_enddef() ;
121      }
122     
123      int rank ;
124      MPI_Comm_rank(intraComm,&rank) ;
125      if (rank==0) isRoot=true;
126      else isRoot=false; 
127      eventLoop() ;
128      finalize() ;
129    }
130   
131    void CServer::finalize(void)
132    {
133      CTimer::get("XIOS").suspend() ;
134      if (!is_MPI_Initialized)
135      { 
136        if (CXios::usingOasis) oasis_finalize();
137        else MPI_Finalize() ;
138      }
139      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
140      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
141      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
142    }
143   
144     void CServer::eventLoop(void)
145     {
146       bool stop=false ;
147       
148       CTimer::get("XIOS server").resume() ;
149       while(!stop)
150       {
151         if (isRoot)
152         {
153           listenContext();
154           if (!finished) listenFinalize() ;
155         }
156         else
157         {
158           listenRootContext();
159           if (!finished) listenRootFinalize() ;
160         }
161       
162         contextEventLoop() ;
163         if (finished && contextList.empty()) stop=true ;
164       }
165       CTimer::get("XIOS server").suspend() ;
166     }
167     
168     void CServer::listenFinalize(void)
169     {
170        list<MPI_Comm>::iterator it;
171        int msg ;
172        int flag ;
173       
174        for(it=interComm.begin();it!=interComm.end();it++)
175        {
176           MPI_Status status ;
177           traceOff() ;
178           MPI_Iprobe(0,0,*it,&flag,&status) ;
179           traceOn() ;
180           if (flag==true)
181           {
182              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
183              info(20)<<" CServer : Receive client finalize"<<endl ;
184              interComm.erase(it) ;
185              break ;
186            }
187         }
188         
189         if (interComm.empty())
190         {
191           int i,size ;
192           MPI_Comm_size(intraComm,&size) ;
193           MPI_Request* requests= new MPI_Request[size-1] ;
194           MPI_Status* status= new MPI_Status[size-1] ;
195         
196           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
197           MPI_Waitall(size-1,requests,status) ;
198
199           finished=true ;
200           delete [] requests ;
201           delete [] status ;
202         }
203     }
204     
205     
206     void CServer::listenRootFinalize()
207     {
208        int flag ;
209        MPI_Status status ;
210        int msg ;
211       
212        traceOff() ;
213        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
214        traceOn() ;
215        if (flag==true)
216        {
217           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
218           finished=true ;
219        }
220      }
221     
222     void CServer::listenContext(void)
223     {
224       
225       MPI_Status status ;
226       int flag ;
227       static void* buffer ;
228       static MPI_Request request ;
229       static bool recept=false ;
230       int rank ;
231       int count ;
232       
233       if (recept==false)
234       {
235         traceOff() ;
236         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
237         traceOn() ;
238         if (flag==true) 
239         {
240           rank=status.MPI_SOURCE ;
241           MPI_Get_count(&status,MPI_CHAR,&count) ;
242           buffer=new char[count] ;
243           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
244           recept=true ;   
245         }
246       }
247       else
248       {
249         traceOff() ;
250         MPI_Test(&request,&flag,&status) ;
251         traceOn() ;
252         if (flag==true)
253         {
254           rank=status.MPI_SOURCE ;
255           MPI_Get_count(&status,MPI_CHAR,&count) ;
256           recvContextMessage(buffer,count) ;
257           delete [] buffer ;
258           recept=false ;         
259         }
260       }
261     }
262     
263     void CServer::recvContextMessage(void* buff,int count)
264     {
265 
266       
267       static map<string,contextMessage> recvContextId ;
268       map<string,contextMessage>::iterator it ;
269       
270       CBufferIn buffer(buff,count) ;
271       string id ;
272       int clientLeader ;
273       int nbMessage ;
274
275       buffer>>id>>nbMessage>>clientLeader ;
276                       
277       it=recvContextId.find(id) ;
278       if (it==recvContextId.end())
279       {         
280         contextMessage msg={0,0} ;
281         pair<map<string,contextMessage>::iterator,bool> ret ;
282         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
283         it=ret.first ;
284       } 
285       it->second.nbRecv+=1 ;
286       it->second.leaderRank+=clientLeader ;
287         
288       if (it->second.nbRecv==nbMessage)
289       { 
290         int size ;
291         MPI_Comm_size(intraComm,&size) ;
292         MPI_Request* requests= new MPI_Request[size-1] ;
293         MPI_Status* status= new MPI_Status[size-1] ;
294         
295         for(int i=1;i<size;i++)
296         {
297            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
298         }
299         MPI_Waitall(size-1,requests,status) ;
300         registerContext(buff,count,it->second.leaderRank) ;
301
302         recvContextId.erase(it) ;
303         delete [] requests ;
304         delete [] status ;
305
306       }
307     }     
308     
309     void CServer::listenRootContext(void)
310     {
311       
312       MPI_Status status ;
313       int flag ;
314       static void* buffer ;
315       static MPI_Request request ;
316       static bool recept=false ;
317       int rank ;
318       int count ;
319       const int root=0 ;
320       
321       if (recept==false)
322       {
323         traceOff() ;
324         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
325         traceOn() ;
326         if (flag==true) 
327         {
328           MPI_Get_count(&status,MPI_CHAR,&count) ;
329           buffer=new char[count] ;
330           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
331           recept=true ;   
332         }
333       }
334       else
335       {
336         MPI_Test(&request,&flag,&status) ;
337         if (flag==true)
338         {
339           MPI_Get_count(&status,MPI_CHAR,&count) ;
340           registerContext(buffer,count) ;
341           delete [] buffer ;
342           recept=false ;         
343         }
344       }
345     } 
346     
347     
348     
349     void CServer::registerContext(void* buff,int count, int leaderRank)
350     {
351     
352       string contextId;
353       CBufferIn buffer(buff,count) ;
354
355       buffer>>contextId ;
356       MPI_Comm contextIntercomm ;
357       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm) ;
358       
359       info(20)<<"CServer : Register new Context : "<<contextId<<endl  ;
360       MPI_Comm inter ;
361       MPI_Intercomm_merge(contextIntercomm,1,&inter) ;
362       MPI_Barrier(inter) ;
363       if (contextList.find(contextId)!=contextList.end()) 
364        ERROR("void CServer::registerContext(void* buff,int count, int leaderRank)",
365              <<"Context has already been registred") ;
366     
367      CContext* context=CContext::create(contextId) ;
368      contextList[contextId]=context ;
369      context->initServer(intraComm,contextIntercomm) ;
370             
371     }   
372     
373     
374     void CServer::contextEventLoop(void)
375     {
376       bool finished ;
377       map<string,CContext*>::iterator it ;
378       for(it=contextList.begin();it!=contextList.end();it++) 
379       {
380         finished=it->second->eventLoop() ;
381         if (finished)
382         {
383           contextList.erase(it) ;
384           break ;
385         }
386       }
387         
388     }
389     
390}
Note: See TracBrowser for help on using the repository browser.