source: XIOS/trunk/src/server.cpp @ 486

Last change on this file since 486 was 483, checked in by aclsce, 10 years ago

Modified to be able to read in iodef.xml several component using XIOS servers to be coupled with OASIS-MCT.
The separator used is the comma.

  • Property svn:eol-style set to native
File size: 10.6 KB
Line 
1#include "xmlioserver_spl.hpp"
2#include "cxios.hpp"
3#include "server.hpp"
4#include "type.hpp"
5#include "context.hpp"
6#include "object_template.hpp"
7#include "oasis_cinterface.hpp"
8#include <boost/functional/hash.hpp>
9#include <boost/algorithm/string.hpp>
10#include "mpi.hpp"
11#include "tracer.hpp"
12#include "timer.hpp"
13
14namespace xios
15{                     
16    MPI_Comm CServer::intraComm ;
17    list<MPI_Comm> CServer::interComm ;
18    bool CServer::isRoot ;
19    int CServer::rank ;
20    map<string,CContext*> CServer::contextList ;   
21    bool CServer::finished=false ;
22    bool CServer::is_MPI_Initialized ;
23   
24    void CServer::initialize(void)
25    {
26      int initialized ;
27      MPI_Initialized(&initialized) ;
28      if (initialized) is_MPI_Initialized=true ;
29      else is_MPI_Initialized=false ;
30           
31      // Not using OASIS
32      if (!CXios::usingOasis)
33      {
34       
35        if (!is_MPI_Initialized) 
36        {
37          int argc=0;
38          char** argv=NULL;
39          MPI_Init(&argc,&argv) ;
40        }
41        CTimer::get("XIOS").resume() ;
42 
43        boost::hash<string> hashString ;   
44     
45        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
46        unsigned long* hashAll ;
47     
48        int rank ;
49        int size ;
50        int myColor ;
51        int i,c ;
52        MPI_Comm newComm ;
53     
54        MPI_Comm_size(CXios::globalComm,&size) ;
55        MPI_Comm_rank(CXios::globalComm,&rank);
56        hashAll=new unsigned long[size] ;
57     
58        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
59
60        map<unsigned long, int> colors ;
61        map<unsigned long, int> leaders ;
62        map<unsigned long, int>::iterator it ;
63     
64        for(i=0,c=0;i<size;i++)
65        {
66          if (colors.find(hashAll[i])==colors.end())
67          {
68            colors[hashAll[i]]=c ;
69            leaders[hashAll[i]]=i ;
70            c++ ;
71          }
72        }
73     
74        myColor=colors[hashServer] ;
75        MPI_Comm_split(MPI_COMM_WORLD,myColor,rank,&intraComm) ;
76
77        int serverLeader=leaders[hashServer] ;
78        int clientLeader;
79     
80         serverLeader=leaders[hashServer] ;
81         for(it=leaders.begin();it!=leaders.end();it++)
82         {
83           if (it->first!=hashServer)
84           {
85             clientLeader=it->second ;
86           
87             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
88             interComm.push_back(newComm) ;
89           }
90         }
91
92         delete [] hashAll ;
93      }
94      // using OASIS
95      else
96      {
97        int rank ,size;
98        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId) ;
99        CTimer::get("XIOS").resume() ;
100        oasis_get_localcomm(intraComm) ;
101        MPI_Comm_rank(intraComm,&rank) ;
102        MPI_Comm_size(intraComm,&size) ;
103        string codesId=CXios::getin<string>("oasis_codes_id") ;
104         
105        vector<string> splitted ;
106        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
107        vector<string>::iterator it ;
108
109        MPI_Comm newComm ;
110        int globalRank ;
111        MPI_Comm_rank(CXios::globalComm,&globalRank);
112       
113        for(it=splitted.begin();it!=splitted.end();it++)
114        {
115          oasis_get_intercomm(newComm,*it) ;
116          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
117          MPI_Comm_remote_size(newComm,&size);
118          interComm.push_back(newComm) ;
119        }
120        oasis_enddef() ;
121      }
122     
123      int rank ;
124      MPI_Comm_rank(intraComm,&rank) ;
125      if (rank==0) isRoot=true;
126      else isRoot=false; 
127      eventLoop() ;
128      finalize() ;
129    }
130   
131    void CServer::finalize(void)
132    {
133      CTimer::get("XIOS").suspend() ;
134      if (!is_MPI_Initialized)
135      { 
136        if (CXios::usingOasis) oasis_finalize();
137        else MPI_Finalize() ;
138      }
139      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
140      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
141      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
142    }
143   
144     void CServer::eventLoop(void)
145     {
146       bool stop=false ;
147       
148       CTimer::get("XIOS server").resume() ;
149       while(!stop)
150       {
151         if (isRoot)
152         {
153           listenContext();
154           if (!finished) listenFinalize() ;
155         }
156         else
157         {
158           listenRootContext();
159           if (!finished) listenRootFinalize() ;
160         }
161       
162         contextEventLoop() ;
163         if (finished && contextList.empty()) stop=true ;
164       }
165       CTimer::get("XIOS server").suspend() ;
166     }
167     
168     void CServer::listenFinalize(void)
169     {
170        list<MPI_Comm>::iterator it;
171        int msg ;
172        int flag ;
173       
174        for(it=interComm.begin();it!=interComm.end();it++)
175        {
176           MPI_Status status ;
177           traceOff() ;
178           MPI_Iprobe(0,0,*it,&flag,&status) ;
179           traceOn() ;
180           if (flag==true)
181           {
182              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
183              info(20)<<" CServer : Receive client finalize"<<endl ;
184              interComm.erase(it) ;
185              break ;
186            }
187         }
188         
189         if (interComm.empty())
190         {
191           int i,size ;
192           MPI_Comm_size(intraComm,&size) ;
193           MPI_Request* requests= new MPI_Request[size-1] ;
194           MPI_Status* status= new MPI_Status[size-1] ;
195         
196           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
197           MPI_Waitall(size-1,requests,status) ;
198
199           finished=true ;
200           delete [] requests ;
201           delete [] status ;
202         }
203     }
204     
205     
206     void CServer::listenRootFinalize()
207     {
208        int flag ;
209        MPI_Status status ;
210        int msg ;
211       
212        traceOff() ;
213        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
214        traceOn() ;
215        if (flag==true)
216        {
217           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
218           finished=true ;
219        }
220      }
221     
222     void CServer::listenContext(void)
223     {
224       
225       MPI_Status status ;
226       int flag ;
227       static void* buffer ;
228       static MPI_Request request ;
229       static bool recept=false ;
230       int rank ;
231       int count ;
232       
233       if (recept==false)
234       {
235         traceOff() ;
236         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
237         traceOn() ;
238         if (flag==true) 
239         {
240           rank=status.MPI_SOURCE ;
241           MPI_Get_count(&status,MPI_CHAR,&count) ;
242           buffer=new char[count] ;
243           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
244           recept=true ;   
245         }
246       }
247       else
248       {
249         traceOff() ;
250         MPI_Test(&request,&flag,&status) ;
251         traceOn() ;
252         if (flag==true)
253         {
254           rank=status.MPI_SOURCE ;
255           MPI_Get_count(&status,MPI_CHAR,&count) ;
256           recvContextMessage(buffer,count) ;
257           delete [] buffer ;
258           recept=false ;         
259         }
260       }
261     }
262     
263     void CServer::recvContextMessage(void* buff,int count)
264     {
265 
266       
267       static map<string,contextMessage> recvContextId ;
268       map<string,contextMessage>::iterator it ;
269       
270       CBufferIn buffer(buff,count) ;
271       string id ;
272       int clientLeader ;
273       int nbMessage ;
274
275       buffer>>id>>nbMessage>>clientLeader ;
276                       
277       it=recvContextId.find(id) ;
278       if (it==recvContextId.end())
279       {         
280         contextMessage msg={0,0} ;
281         pair<map<string,contextMessage>::iterator,bool> ret ;
282         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
283         it=ret.first ;
284       } 
285       it->second.nbRecv+=1 ;
286       it->second.leaderRank+=clientLeader ;
287         
288       if (it->second.nbRecv==nbMessage)
289       { 
290         int size ;
291         MPI_Comm_size(intraComm,&size) ;
292         MPI_Request* requests= new MPI_Request[size-1] ;
293         MPI_Status* status= new MPI_Status[size-1] ;
294         
295         for(int i=1;i<size;i++)
296         {
297            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
298         }
299         MPI_Waitall(size-1,requests,status) ;
300         registerContext(buff,count,it->second.leaderRank) ;
301
302         recvContextId.erase(it) ;
303         delete [] requests ;
304         delete [] status ;
305
306       }
307     }     
308     
309     void CServer::listenRootContext(void)
310     {
311       
312       MPI_Status status ;
313       int flag ;
314       static void* buffer ;
315       static MPI_Request request ;
316       static bool recept=false ;
317       int rank ;
318       int count ;
319       const int root=0 ;
320       
321       if (recept==false)
322       {
323         traceOff() ;
324         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
325         traceOn() ;
326         if (flag==true) 
327         {
328           MPI_Get_count(&status,MPI_CHAR,&count) ;
329           buffer=new char[count] ;
330           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
331           recept=true ;   
332         }
333       }
334       else
335       {
336         MPI_Test(&request,&flag,&status) ;
337         if (flag==true)
338         {
339           MPI_Get_count(&status,MPI_CHAR,&count) ;
340           registerContext(buffer,count) ;
341           delete [] buffer ;
342           recept=false ;         
343         }
344       }
345     } 
346     
347     
348     
349     void CServer::registerContext(void* buff,int count, int leaderRank)
350     {
351     
352       string contextId;
353       CBufferIn buffer(buff,count) ;
354
355       buffer>>contextId ;
356       MPI_Comm contextIntercomm ;
357       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm) ;
358       
359       info(20)<<"CServer : Register new Context : "<<contextId<<endl  ;
360       MPI_Comm inter ;
361       MPI_Intercomm_merge(contextIntercomm,1,&inter) ;
362       MPI_Barrier(inter) ;
363       if (contextList.find(contextId)!=contextList.end()) 
364        ERROR("void CServer::registerContext(void* buff,int count, int leaderRank)",
365              <<"Context has already been registred") ;
366     
367      CContext* context=CContext::create(contextId) ;
368      contextList[contextId]=context ;
369      context->initServer(intraComm,contextIntercomm) ;
370             
371     }   
372     
373     
374     void CServer::contextEventLoop(void)
375     {
376       bool finished ;
377       map<string,CContext*>::iterator it ;
378       for(it=contextList.begin();it!=contextList.end();it++) 
379       {
380         finished=it->second->eventLoop() ;
381         if (finished)
382         {
383           contextList.erase(it) ;
384           break ;
385         }
386       }
387         
388     }
389     
390}
Note: See TracBrowser for help on using the repository browser.