source: vendor/nemo/current/NEMOGCM/EXTERNAL/XIOS/src/server.cpp @ 44

Last change on this file since 44 was 44, checked in by cholod, 12 years ago

Load NEMO_TMP into vendor/nemo/current.

File size: 10.5 KB
Line 
1#include "xmlioserver_spl.hpp"
2#include "cxios.hpp"
3#include "server.hpp"
4#include "type.hpp"
5#include "context.hpp"
6#include "object_template.hpp"
7#include "oasis_cinterface.hpp"
8#include <boost/functional/hash.hpp>
9#include <boost/algorithm/string.hpp>
10#include <mpi.h>
11#include "tracer.hpp"
12#include "timer.hpp"
13
14namespace xios
15{                     
16    MPI_Comm CServer::intraComm ;
17    list<MPI_Comm> CServer::interComm ;
18    bool CServer::isRoot ;
19    int CServer::rank ;
20    map<string,CContext*> CServer::contextList ;   
21    bool CServer::finished=false ;
22    bool CServer::is_MPI_Initialized ;
23   
24    void CServer::initialize(void)
25    {
26      int initialized ;
27      MPI_Initialized(&initialized) ;
28      if (initialized) is_MPI_Initialized=true ;
29      else is_MPI_Initialized=false ;
30           
31      // Not using OASIS
32      if (!CXios::usingOasis)
33      {
34       
35        if (!is_MPI_Initialized) 
36        {
37          int argc=0;
38          char** argv=NULL;
39          MPI_Init(&argc,&argv) ;
40        }
41         
42        boost::hash<string> hashString ;   
43     
44        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
45        unsigned long* hashAll ;
46     
47        int rank ;
48        int size ;
49        int myColor ;
50        int i,c ;
51        MPI_Comm newComm ;
52     
53        MPI_Comm_size(CXios::globalComm,&size) ;
54        MPI_Comm_rank(CXios::globalComm,&rank);
55        hashAll=new unsigned long[size] ;
56     
57        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
58
59        map<unsigned long, int> colors ;
60        map<unsigned long, int> leaders ;
61        map<unsigned long, int>::iterator it ;
62     
63        for(i=0,c=0;i<size;i++)
64        {
65          if (colors.find(hashAll[i])==colors.end())
66          {
67            colors[hashAll[i]]=c ;
68            leaders[hashAll[i]]=i ;
69            c++ ;
70          }
71        }
72     
73        myColor=colors[hashServer] ;
74        MPI_Comm_split(MPI_COMM_WORLD,myColor,rank,&intraComm) ;
75
76        int serverLeader=leaders[hashServer] ;
77        int clientLeader;
78     
79         serverLeader=leaders[hashServer] ;
80         for(it=leaders.begin();it!=leaders.end();it++)
81         {
82           if (it->first!=hashServer)
83           {
84             clientLeader=it->second ;
85           
86             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
87             interComm.push_back(newComm) ;
88           }
89         }
90
91         delete [] hashAll ;
92      }
93      // using OASIS
94      else
95      {
96        int rank ,size;
97        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId) ;
98        oasis_get_localcomm(intraComm) ;
99        MPI_Comm_rank(intraComm,&rank) ;
100        MPI_Comm_size(intraComm,&size) ;
101        string codesId=CXios::getin<string>("oasis_codes_id") ;
102         
103        vector<string> splitted ;
104        boost::split( splitted, codesId, boost::is_space(), boost::token_compress_on ) ;
105        vector<string>::iterator it ;
106
107        MPI_Comm newComm ;
108        int globalRank ;
109        MPI_Comm_rank(CXios::globalComm,&globalRank);
110       
111        for(it=splitted.begin();it!=splitted.end();it++)
112        {
113          oasis_get_intercomm(newComm,*it) ;
114          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
115          MPI_Comm_remote_size(newComm,&size);
116          interComm.push_back(newComm) ;
117        }
118      }
119     
120      int rank ;
121      MPI_Comm_rank(intraComm,&rank) ;
122      if (rank==0) isRoot=true;
123      else isRoot=false; 
124      eventLoop() ;
125      finalize() ;
126    }
127   
128    void CServer::finalize(void)
129    {
130      if (!is_MPI_Initialized)
131      { 
132        if (CXios::usingOasis) oasis_finalize();
133        else MPI_Finalize() ;
134      }
135      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
136      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
137      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
138    }
139   
140     void CServer::eventLoop(void)
141     {
142       bool stop=false ;
143       
144       CTimer::get("XIOS server").resume() ;
145       while(!stop)
146       {
147         if (isRoot)
148         {
149           listenContext();
150           if (!finished) listenFinalize() ;
151         }
152         else
153         {
154           listenRootContext();
155           if (!finished) listenRootFinalize() ;
156         }
157       
158         contextEventLoop() ;
159         if (finished && contextList.empty()) stop=true ;
160       }
161       CTimer::get("XIOS server").suspend() ;
162     }
163     
164     void CServer::listenFinalize(void)
165     {
166        list<MPI_Comm>::iterator it;
167        int msg ;
168        int flag ;
169       
170        for(it=interComm.begin();it!=interComm.end();it++)
171        {
172           MPI_Status status ;
173           traceOff() ;
174           MPI_Iprobe(0,0,*it,&flag,&status) ;
175           traceOn() ;
176           if (flag==true)
177           {
178              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
179              info(20)<<" CServer : Receive client finalize"<<endl ;
180              interComm.erase(it) ;
181              break ;
182            }
183         }
184         
185         if (interComm.empty())
186         {
187           int i,size ;
188           MPI_Comm_size(intraComm,&size) ;
189           MPI_Request* requests= new MPI_Request[size-1] ;
190           MPI_Status* status= new MPI_Status[size-1] ;
191         
192           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
193           MPI_Waitall(size-1,requests,status) ;
194
195           finished=true ;
196           delete [] requests ;
197           delete [] status ;
198         }
199     }
200     
201     
202     void CServer::listenRootFinalize()
203     {
204        int flag ;
205        MPI_Status status ;
206        int msg ;
207       
208        traceOff() ;
209        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
210        traceOn() ;
211        if (flag==true)
212        {
213           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
214           finished=true ;
215        }
216      }
217     
218     void CServer::listenContext(void)
219     {
220       
221       MPI_Status status ;
222       int flag ;
223       static void* buffer ;
224       static MPI_Request request ;
225       static bool recept=false ;
226       int rank ;
227       int count ;
228       
229       if (recept==false)
230       {
231         traceOff() ;
232         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
233         traceOn() ;
234         if (flag==true) 
235         {
236           rank=status.MPI_SOURCE ;
237           MPI_Get_count(&status,MPI_CHAR,&count) ;
238           buffer=new char[count] ;
239           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
240           recept=true ;   
241         }
242       }
243       else
244       {
245         traceOff() ;
246         MPI_Test(&request,&flag,&status) ;
247         traceOn() ;
248         if (flag==true)
249         {
250           rank=status.MPI_SOURCE ;
251           MPI_Get_count(&status,MPI_CHAR,&count) ;
252           recvContextMessage(buffer,count) ;
253           delete [] buffer ;
254           recept=false ;         
255         }
256       }
257     }
258     
259     void CServer::recvContextMessage(void* buff,int count)
260     {
261 
262       
263       static map<string,contextMessage> recvContextId ;
264       map<string,contextMessage>::iterator it ;
265       
266       CBufferIn buffer(buff,count) ;
267       string id ;
268       int clientLeader ;
269       int nbMessage ;
270
271       buffer>>id>>nbMessage>>clientLeader ;
272                       
273       it=recvContextId.find(id) ;
274       if (it==recvContextId.end())
275       {         
276         contextMessage msg={0,0} ;
277         pair<map<string,contextMessage>::iterator,bool> ret ;
278         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
279         it=ret.first ;
280       } 
281       it->second.nbRecv+=1 ;
282       it->second.leaderRank+=clientLeader ;
283         
284       if (it->second.nbRecv==nbMessage)
285       { 
286         int size ;
287         MPI_Comm_size(intraComm,&size) ;
288         MPI_Request* requests= new MPI_Request[size-1] ;
289         MPI_Status* status= new MPI_Status[size-1] ;
290         
291         for(int i=1;i<size;i++)
292         {
293            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
294         }
295         MPI_Waitall(size-1,requests,status) ;
296         registerContext(buff,count,it->second.leaderRank) ;
297
298         recvContextId.erase(it) ;
299         delete [] requests ;
300         delete [] status ;
301
302       }
303     }     
304     
305     void CServer::listenRootContext(void)
306     {
307       
308       MPI_Status status ;
309       int flag ;
310       static void* buffer ;
311       static MPI_Request request ;
312       static bool recept=false ;
313       int rank ;
314       int count ;
315       const int root=0 ;
316       
317       if (recept==false)
318       {
319         traceOff() ;
320         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
321         traceOn() ;
322         if (flag==true) 
323         {
324           MPI_Get_count(&status,MPI_CHAR,&count) ;
325           buffer=new char[count] ;
326           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
327           recept=true ;   
328         }
329       }
330       else
331       {
332         MPI_Test(&request,&flag,&status) ;
333         if (flag==true)
334         {
335           MPI_Get_count(&status,MPI_CHAR,&count) ;
336           registerContext(buffer,count) ;
337           delete [] buffer ;
338           recept=false ;         
339         }
340       }
341     } 
342     
343     
344     
345     void CServer::registerContext(void* buff,int count, int leaderRank)
346     {
347     
348       string contextId;
349       CBufferIn buffer(buff,count) ;
350
351       buffer>>contextId ;
352       MPI_Comm contextIntercomm ;
353       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm) ;
354       
355       info(20)<<"CServer : Register new Context : "<<contextId<<endl  ;
356       MPI_Comm inter ;
357       MPI_Intercomm_merge(contextIntercomm,1,&inter) ;
358       MPI_Barrier(inter) ;
359       if (contextList.find(contextId)!=contextList.end()) 
360        ERROR("void CServer::registerContext(void* buff,int count, int leaderRank)",
361              <<"Context has already been registred") ;
362     
363      CContext* context=CContext::create(contextId) ;
364      contextList[contextId]=context ;
365      context->initServer(intraComm,contextIntercomm) ;
366             
367     }   
368     
369     
370     void CServer::contextEventLoop(void)
371     {
372       bool finished ;
373       map<string,CContext*>::iterator it ;
374       for(it=contextList.begin();it!=contextList.end();it++) 
375       {
376         finished=it->second->eventLoop() ;
377         if (finished)
378         {
379           contextList.erase(it) ;
380           break ;
381         }
382       }
383         
384     }
385     
386}
Note: See TracBrowser for help on using the repository browser.