source: XIOS/trunk/src/server.cpp @ 346

Last change on this file since 346 was 346, checked in by ymipsl, 12 years ago

Suppress access to CObjectFactory class and CTreeManager.

YM

  • Property svn:eol-style set to native
File size: 9.8 KB
Line 
1#include "xmlioserver_spl.hpp"
2#include "cxios.hpp"
3#include "server.hpp"
4#include "type.hpp"
5#include "context.hpp"
6#include "object_template_impl.hpp"
7#include "tree_manager.hpp"
8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
11#include <mpi.h>
12
13namespace xios
14{                     
15    MPI_Comm CServer::intraComm ;
16    list<MPI_Comm> CServer::interComm ;
17    bool CServer::isRoot ;
18    int CServer::rank ;
19    map<string,CContext*> CServer::contextList ;   
20    bool CServer::finished=false ;
21    bool CServer::is_MPI_Initialized ;
22   
23    void CServer::initialize(void)
24    {
25      int initialized ;
26      MPI_Initialized(&initialized) ;
27      if (initialized) is_MPI_Initialized=true ;
28      else is_MPI_Initialized=false ;
29           
30      // Not using OASIS
31      if (!CXios::usingOasis)
32      {
33       
34        if (!is_MPI_Initialized) 
35        {
36          int argc=0;
37          char** argv=NULL;
38          MPI_Init(&argc,&argv) ;
39        }
40         
41        boost::hash<string> hashString ;   
42     
43        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
44        unsigned long* hashAll ;
45     
46        int rank ;
47        int size ;
48        int myColor ;
49        int i,c ;
50        MPI_Comm newComm ;
51     
52        MPI_Comm_size(CXios::globalComm,&size) ;
53        MPI_Comm_rank(CXios::globalComm,&rank);
54        hashAll=new unsigned long[size] ;
55     
56        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
57
58        map<unsigned long, int> colors ;
59        map<unsigned long, int> leaders ;
60        map<unsigned long, int>::iterator it ;
61     
62        for(i=0,c=0;i<size;i++)
63        {
64          if (colors.find(hashAll[i])==colors.end())
65          {
66            colors[hashAll[i]]=c ;
67            leaders[hashAll[i]]=i ;
68            c++ ;
69          }
70        }
71     
72        myColor=colors[hashServer] ;
73        MPI_Comm_split(MPI_COMM_WORLD,myColor,rank,&intraComm) ;
74
75        int serverLeader=leaders[hashServer] ;
76        int clientLeader;
77     
78         serverLeader=leaders[hashServer] ;
79         for(it=leaders.begin();it!=leaders.end();it++)
80         {
81           if (it->first!=hashServer)
82           {
83             clientLeader=it->second ;
84           
85             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
86             interComm.push_back(newComm) ;
87           }
88         }
89
90         delete [] hashAll ;
91      }
92      // using OASIS
93      else
94      {
95        int rank ,size;
96        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId) ;
97        oasis_get_localcomm(intraComm) ;
98        MPI_Comm_rank(intraComm,&rank) ;
99        MPI_Comm_size(intraComm,&size) ;
100        string codesId=CXios::getin<string>("oasis_codes_id") ;
101         
102        vector<string> splitted ;
103        boost::split( splitted, codesId, boost::is_space(), boost::token_compress_on ) ;
104        vector<string>::iterator it ;
105
106        MPI_Comm newComm ;
107        int globalRank ;
108        MPI_Comm_rank(CXios::globalComm,&globalRank);
109       
110        for(it=splitted.begin();it!=splitted.end();it++)
111        {
112          oasis_get_intercomm(newComm,*it) ;
113          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
114          MPI_Comm_remote_size(newComm,&size);
115          interComm.push_back(newComm) ;
116        }
117      }
118     
119      int rank ;
120      MPI_Comm_rank(intraComm,&rank) ;
121      if (rank==0) isRoot=true;
122      else isRoot=false; 
123      eventLoop() ;
124      finalize() ;
125    }
126   
127    void CServer::finalize(void)
128    {
129      if (!is_MPI_Initialized)
130      { 
131        if (CXios::usingOasis) oasis_finalize();
132        else MPI_Finalize() ;
133      }
134    }
135   
136     void CServer::eventLoop(void)
137     {
138       bool stop=false ;
139       
140       while(!stop)
141       {
142         if (isRoot)
143         {
144           listenContext();
145           if (!finished) listenFinalize() ;
146         }
147         else
148         {
149           listenRootContext();
150           if (!finished) listenRootFinalize() ;
151         }
152       
153         contextEventLoop() ;
154         if (finished && contextList.empty()) stop=true ;
155       }
156     
157     }
158     
159     void CServer::listenFinalize(void)
160     {
161        list<MPI_Comm>::iterator it;
162        int msg ;
163        int flag ;
164       
165        for(it=interComm.begin();it!=interComm.end();it++)
166        {
167           MPI_Status status ;
168           MPI_Iprobe(0,0,*it,&flag,&status) ;
169           if (flag==true)
170           {
171              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
172              info(20)<<" CServer : Receive client finalize"<<endl ;
173              interComm.erase(it) ;
174              break ;
175            }
176         }
177         
178         if (interComm.empty())
179         {
180           int i,size ;
181           MPI_Comm_size(intraComm,&size) ;
182           MPI_Request* requests= new MPI_Request[size-1] ;
183           MPI_Status* status= new MPI_Status[size-1] ;
184         
185           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
186           MPI_Waitall(size-1,requests,status) ;
187
188           finished=true ;
189           delete [] requests ;
190           delete [] status ;
191         }
192     }
193     
194     
195     void CServer::listenRootFinalize()
196     {
197        int flag ;
198        MPI_Status status ;
199        int msg ;
200       
201        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
202        if (flag==true)
203        {
204           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
205           finished=true ;
206        }
207      }
208     
209     void CServer::listenContext(void)
210     {
211       
212       MPI_Status status ;
213       int flag ;
214       static void* buffer ;
215       static MPI_Request request ;
216       static bool recept=false ;
217       int rank ;
218       int count ;
219       
220       if (recept==false)
221       {
222         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
223         if (flag==true) 
224         {
225           rank=status.MPI_SOURCE ;
226           MPI_Get_count(&status,MPI_CHAR,&count) ;
227           buffer=new char[count] ;
228           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
229           recept=true ;   
230         }
231       }
232       else
233       {
234         MPI_Test(&request,&flag,&status) ;
235         if (flag==true)
236         {
237           rank=status.MPI_SOURCE ;
238           MPI_Get_count(&status,MPI_CHAR,&count) ;
239           recvContextMessage(buffer,count) ;
240           delete [] buffer ;
241           recept=false ;         
242         }
243       }
244     }
245     
246     void CServer::recvContextMessage(void* buff,int count)
247     {
248 
249       
250       static map<string,contextMessage> recvContextId ;
251       map<string,contextMessage>::iterator it ;
252       
253       CBufferIn buffer(buff,count) ;
254       string id ;
255       int clientLeader ;
256       int nbMessage ;
257
258       buffer>>id>>nbMessage>>clientLeader ;
259                       
260       it=recvContextId.find(id) ;
261       if (it==recvContextId.end())
262       {         
263         contextMessage msg={0,0} ;
264         pair<map<string,contextMessage>::iterator,bool> ret ;
265         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
266         it=ret.first ;
267       } 
268       it->second.nbRecv+=1 ;
269       it->second.leaderRank+=clientLeader ;
270         
271       if (it->second.nbRecv==nbMessage)
272       { 
273         int size ;
274         MPI_Comm_size(intraComm,&size) ;
275         MPI_Request* requests= new MPI_Request[size-1] ;
276         MPI_Status* status= new MPI_Status[size-1] ;
277         
278         for(int i=1;i<size;i++)
279         {
280            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
281         }
282         MPI_Waitall(size-1,requests,status) ;
283         registerContext(buff,count,it->second.leaderRank) ;
284
285         recvContextId.erase(it) ;
286         delete [] requests ;
287         delete [] status ;
288
289       }
290     }     
291     
292     void CServer::listenRootContext(void)
293     {
294       
295       MPI_Status status ;
296       int flag ;
297       static void* buffer ;
298       static MPI_Request request ;
299       static bool recept=false ;
300       int rank ;
301       int count ;
302       const int root=0 ;
303       
304       if (recept==false)
305       {
306         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
307         if (flag==true) 
308         {
309           MPI_Get_count(&status,MPI_CHAR,&count) ;
310           buffer=new char[count] ;
311           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
312           recept=true ;   
313         }
314       }
315       else
316       {
317         MPI_Test(&request,&flag,&status) ;
318         if (flag==true)
319         {
320           MPI_Get_count(&status,MPI_CHAR,&count) ;
321           registerContext(buffer,count) ;
322           delete [] buffer ;
323           recept=false ;         
324         }
325       }
326     } 
327     
328     
329     
330     void CServer::registerContext(void* buff,int count, int leaderRank)
331     {
332     
333       string contextId;
334       CBufferIn buffer(buff,count) ;
335
336       buffer>>contextId ;
337       MPI_Comm contextIntercomm ;
338       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm) ;
339       
340       info(20)<<"CServer : Register new Context : "<<contextId<<endl  ;
341       MPI_Comm inter ;
342       MPI_Intercomm_merge(contextIntercomm,1,&inter) ;
343       MPI_Barrier(inter) ;
344       if (contextList.find(contextId)!=contextList.end()) 
345        ERROR("void CServer::registerContext(void* buff,int count, int leaderRank)",
346              <<"Context has already been registred") ;
347     
348      shared_ptr<CContext> context=CContext::create(contextId) ;
349      contextList[contextId]=context.get() ;
350      context->initServer(intraComm,contextIntercomm) ;
351             
352     }   
353     
354     
355     void CServer::contextEventLoop(void)
356     {
357       bool finished ;
358       map<string,CContext*>::iterator it ;
359       for(it=contextList.begin();it!=contextList.end();it++) 
360       {
361         finished=it->second->eventLoop() ;
362         if (finished)
363         {
364           contextList.erase(it) ;
365           break ;
366         }
367       }
368         
369     }
370     
371}
Note: See TracBrowser for help on using the repository browser.