source: XIOS/trunk/src/server_ym.cpp @ 314

Last change on this file since 314 was 314, checked in by ymipsl, 12 years ago

Removing obsolete files and some cleaning...

YM

  • Property svn:eol-style set to native
File size: 9.9 KB
Line 
1#include "xmlioserver_spl.hpp"
2#include "cxios.hpp"
3#include "server_ym.hpp"
4#include "type.hpp"
5#include "context.hpp"
6#include "object_template_impl.hpp"
7#include "tree_manager.hpp"
8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
11#include <mpi.h>
12
13namespace xmlioserver
14{                     
15  namespace ym
16  {
17
18    MPI_Comm CServer::intraComm ;
19    list<MPI_Comm> CServer::interComm ;
20    bool CServer::isRoot ;
21    int CServer::rank ;
22    map<string,CContext*> CServer::contextList ;   
23    bool CServer::finished=false ;
24    bool CServer::is_MPI_Initialized ;
25   
26    void CServer::initialize(void)
27    {
28      int initialized ;
29      MPI_Initialized(&initialized) ;
30      if (initialized) is_MPI_Initialized=true ;
31      else is_MPI_Initialized=false ;
32           
33      // Not using OASIS
34      if (!CXios::usingOasis)
35      {
36       
37        if (!is_MPI_Initialized) 
38        {
39          int argc=0;
40          char** argv=NULL;
41          MPI_Init(&argc,&argv) ;
42        }
43         
44        boost::hash<string> hashString ;   
45     
46        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
47        unsigned long* hashAll ;
48     
49        int rank ;
50        int size ;
51        int myColor ;
52        int i,c ;
53        MPI_Comm newComm ;
54     
55        MPI_Comm_size(CXios::globalComm,&size) ;
56        MPI_Comm_rank(CXios::globalComm,&rank);
57        hashAll=new unsigned long[size] ;
58     
59        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
60
61        map<unsigned long, int> colors ;
62        map<unsigned long, int> leaders ;
63        map<unsigned long, int>::iterator it ;
64     
65        for(i=0,c=0;i<size;i++)
66        {
67          if (colors.find(hashAll[i])==colors.end())
68          {
69            colors[hashAll[i]]=c ;
70            leaders[hashAll[i]]=i ;
71            c++ ;
72          }
73        }
74     
75        myColor=colors[hashServer] ;
76        MPI_Comm_split(MPI_COMM_WORLD,myColor,rank,&intraComm) ;
77
78        int serverLeader=leaders[hashServer] ;
79        int clientLeader;
80     
81         serverLeader=leaders[hashServer] ;
82         for(it=leaders.begin();it!=leaders.end();it++)
83         {
84           if (it->first!=hashServer)
85           {
86             clientLeader=it->second ;
87           
88             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
89             interComm.push_back(newComm) ;
90           }
91         }
92
93         delete [] hashAll ;
94      }
95      // using OASIS
96      else
97      {
98        int rank ,size;
99        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId) ;
100        oasis_get_localcomm(intraComm) ;
101        MPI_Comm_rank(intraComm,&rank) ;
102        MPI_Comm_size(intraComm,&size) ;
103        string codesId=CXios::getin<string>("oasis_codes_id") ;
104         
105        vector<string> splitted ;
106        boost::split( splitted, codesId, boost::is_space(), boost::token_compress_on ) ;
107        vector<string>::iterator it ;
108
109        MPI_Comm newComm ;
110        int globalRank ;
111        MPI_Comm_rank(CXios::globalComm,&globalRank);
112       
113        for(it=splitted.begin();it!=splitted.end();it++)
114        {
115          oasis_get_intercomm(newComm,*it) ;
116          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
117          MPI_Comm_remote_size(newComm,&size);
118          interComm.push_back(newComm) ;
119        }
120      }
121     
122      int rank ;
123      MPI_Comm_rank(intraComm,&rank) ;
124      if (rank==0) isRoot=true;
125      else isRoot=false; 
126      eventLoop() ;
127      finalize() ;
128    }
129   
130    void CServer::finalize(void)
131    {
132      if (!is_MPI_Initialized)
133      { 
134        if (CXios::usingOasis) oasis_finalize();
135        else MPI_Finalize() ;
136      }
137    }
138   
139     void CServer::eventLoop(void)
140     {
141       bool stop=false ;
142       
143       while(!stop)
144       {
145         if (isRoot)
146         {
147           listenContext();
148           if (!finished) listenFinalize() ;
149         }
150         else
151         {
152           listenRootContext();
153           if (!finished) listenRootFinalize() ;
154         }
155       
156         contextEventLoop() ;
157         if (finished && contextList.empty()) stop=true ;
158       }
159     
160     }
161     
162     void CServer::listenFinalize(void)
163     {
164        list<MPI_Comm>::iterator it;
165        int msg ;
166        int flag ;
167       
168        for(it=interComm.begin();it!=interComm.end();it++)
169        {
170           MPI_Status status ;
171           MPI_Iprobe(0,0,*it,&flag,&status) ;
172           if (flag==true)
173           {
174              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
175              info(20)<<" CServer : Receive client finalize"<<endl ;
176              interComm.erase(it) ;
177              break ;
178            }
179         }
180         
181         if (interComm.empty())
182         {
183           int i,size ;
184           MPI_Comm_size(intraComm,&size) ;
185           MPI_Request* requests= new MPI_Request[size-1] ;
186           MPI_Status* status= new MPI_Status[size-1] ;
187         
188           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
189           MPI_Waitall(size-1,requests,status) ;
190
191           finished=true ;
192           delete [] requests ;
193           delete [] status ;
194         }
195     }
196     
197     
198     void CServer::listenRootFinalize()
199     {
200        int flag ;
201        MPI_Status status ;
202        int msg ;
203       
204        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
205        if (flag==true)
206        {
207           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
208           finished=true ;
209        }
210      }
211     
212     void CServer::listenContext(void)
213     {
214       
215       MPI_Status status ;
216       int flag ;
217       static void* buffer ;
218       static MPI_Request request ;
219       static bool recept=false ;
220       int rank ;
221       int count ;
222       
223       if (recept==false)
224       {
225         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
226         if (flag==true) 
227         {
228           rank=status.MPI_SOURCE ;
229           MPI_Get_count(&status,MPI_CHAR,&count) ;
230           buffer=new char[count] ;
231           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
232           recept=true ;   
233         }
234       }
235       else
236       {
237         MPI_Test(&request,&flag,&status) ;
238         if (flag==true)
239         {
240           rank=status.MPI_SOURCE ;
241           MPI_Get_count(&status,MPI_CHAR,&count) ;
242           recvContextMessage(buffer,count) ;
243           delete [] buffer ;
244           recept=false ;         
245         }
246       }
247     }
248     
249     void CServer::recvContextMessage(void* buff,int count)
250     {
251 
252       
253       static map<string,contextMessage> recvContextId ;
254       map<string,contextMessage>::iterator it ;
255       
256       CBufferIn buffer(buff,count) ;
257       string id ;
258       int clientLeader ;
259       int nbMessage ;
260
261       buffer>>id>>nbMessage>>clientLeader ;
262                       
263       it=recvContextId.find(id) ;
264       if (it==recvContextId.end())
265       {         
266         contextMessage msg={0,0} ;
267         pair<map<string,contextMessage>::iterator,bool> ret ;
268         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
269         it=ret.first ;
270       } 
271       it->second.nbRecv+=1 ;
272       it->second.leaderRank+=clientLeader ;
273         
274       if (it->second.nbRecv==nbMessage)
275       { 
276         int size ;
277         MPI_Comm_size(intraComm,&size) ;
278         MPI_Request* requests= new MPI_Request[size-1] ;
279         MPI_Status* status= new MPI_Status[size-1] ;
280         
281         for(int i=1;i<size;i++)
282         {
283            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
284         }
285         MPI_Waitall(size-1,requests,status) ;
286         registerContext(buff,count,it->second.leaderRank) ;
287
288         recvContextId.erase(it) ;
289         delete [] requests ;
290         delete [] status ;
291
292       }
293     }     
294     
295     void CServer::listenRootContext(void)
296     {
297       
298       MPI_Status status ;
299       int flag ;
300       static void* buffer ;
301       static MPI_Request request ;
302       static bool recept=false ;
303       int rank ;
304       int count ;
305       const int root=0 ;
306       
307       if (recept==false)
308       {
309         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
310         if (flag==true) 
311         {
312           MPI_Get_count(&status,MPI_CHAR,&count) ;
313           buffer=new char[count] ;
314           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
315           recept=true ;   
316         }
317       }
318       else
319       {
320         MPI_Test(&request,&flag,&status) ;
321         if (flag==true)
322         {
323           MPI_Get_count(&status,MPI_CHAR,&count) ;
324           registerContext(buffer,count) ;
325           delete [] buffer ;
326           recept=false ;         
327         }
328       }
329     } 
330     
331     
332     
333     void CServer::registerContext(void* buff,int count, int leaderRank)
334     {
335     
336       string contextId;
337       CBufferIn buffer(buff,count) ;
338
339       buffer>>contextId ;
340       MPI_Comm contextIntercomm ;
341       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm) ;
342       
343       info(20)<<"CServer : Register new Context : "<<contextId<<endl  ;
344       MPI_Comm inter ;
345       MPI_Intercomm_merge(contextIntercomm,1,&inter) ;
346       MPI_Barrier(inter) ;
347       if (contextList.find(contextId)!=contextList.end()) 
348        ERROR("void CServer::registerContext(void* buff,int count, int leaderRank)",
349              <<"Context has already been registred") ;
350     
351      shared_ptr<CContext> context=tree::CTreeManager::CreateContext(contextId) ;
352      contextList[contextId]=context.get() ;
353      context->initServer(intraComm,contextIntercomm) ;
354             
355     }   
356     
357     
358     void CServer::contextEventLoop(void)
359     {
360       bool finished ;
361       map<string,CContext*>::iterator it ;
362       for(it=contextList.begin();it!=contextList.end();it++) 
363       {
364         finished=it->second->eventLoop() ;
365         if (finished)
366         {
367           contextList.erase(it) ;
368           break ;
369         }
370       }
371         
372     }
373     
374
375
376  }
377}
Note: See TracBrowser for help on using the repository browser.