source: XIOS/dev/common/src/server_ym.cpp @ 1512

Last change on this file since 1512 was 300, checked in by ymipsl, 12 years ago

nouvelle version de developpement de xios

  • nouvelle interface fortran
  • recodage complet de la couche de communication
  • et bien d'autres choses...

YM

  • Property svn:eol-style set to native
File size: 10.1 KB
Line 
1#include "xmlioserver_spl.hpp"
2#include "cxios.hpp"
3#include "server_ym.hpp"
4#include "type.hpp"
5#include "buffer.hpp"
6#include "context.hpp"
7#include "object_template_impl.hpp"
8#include "tree_manager.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include <mpi.h>
13
14namespace xmlioserver
15{                     
16  namespace ym
17  {
18
19    MPI_Comm CServer::intraComm ;
20    list<MPI_Comm> CServer::interComm ;
21    bool CServer::isRoot ;
22    int CServer::rank ;
23    map<string,CContext*> CServer::contextList ;   
24    bool CServer::finished=false ;
25    bool CServer::is_MPI_Initialized ;
26   
27    void CServer::initialize(void)
28    {
29      int initialized ;
30      MPI_Initialized(&initialized) ;
31      if (initialized) is_MPI_Initialized=true ;
32      else is_MPI_Initialized=false ;
33           
34      // Not using OASIS
35      if (!CXios::usingOasis)
36      {
37       
38        if (!is_MPI_Initialized) 
39        {
40          int argc=0;
41          char** argv=NULL;
42          MPI_Init(&argc,&argv) ;
43        }
44         
45        boost::hash<string> hashString ;   
46     
47        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
48        unsigned long* hashAll ;
49     
50        int rank ;
51        int size ;
52        int myColor ;
53        int i,c ;
54        MPI_Comm newComm ;
55     
56        MPI_Comm_size(CXios::globalComm,&size) ;
57        MPI_Comm_rank(CXios::globalComm,&rank);
58        hashAll=new unsigned long[size] ;
59     
60        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
61
62        map<unsigned long, int> colors ;
63        map<unsigned long, int> leaders ;
64        map<unsigned long, int>::iterator it ;
65     
66        for(i=0,c=0;i<size;i++)
67        {
68          if (colors.find(hashAll[i])==colors.end())
69          {
70            colors[hashAll[i]]=c ;
71            leaders[hashAll[i]]=i ;
72            c++ ;
73          }
74        }
75     
76        myColor=colors[hashServer] ;
77        MPI_Comm_split(MPI_COMM_WORLD,myColor,rank,&intraComm) ;
78
79        int serverLeader=leaders[hashServer] ;
80        int clientLeader;
81     
82         serverLeader=leaders[hashServer] ;
83         for(it=leaders.begin();it!=leaders.end();it++)
84         {
85           if (it->first!=hashServer)
86           {
87             clientLeader=it->second ;
88           
89             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
90             interComm.push_back(newComm) ;
91           }
92         }
93
94         delete [] hashAll ;
95      }
96      // using OASIS
97      else
98      {
99        int rank ,size;
100        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId) ;
101        oasis_get_localcomm(intraComm) ;
102        MPI_Comm_rank(intraComm,&rank) ;
103        MPI_Comm_size(intraComm,&size) ;
104        string codesId=CXios::getin<string>("oasis_codes_id") ;
105         
106        vector<string> splitted ;
107        boost::split( splitted, codesId, boost::is_space(), boost::token_compress_on ) ;
108        vector<string>::iterator it ;
109
110        MPI_Comm newComm ;
111        int globalRank ;
112        MPI_Comm_rank(CXios::globalComm,&globalRank);
113       
114        for(it=splitted.begin();it!=splitted.end();it++)
115        {
116          oasis_get_intercomm(newComm,*it) ;
117          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
118          MPI_Comm_remote_size(newComm,&size);
119          interComm.push_back(newComm) ;
120        }
121      }
122     
123      int rank ;
124      MPI_Comm_rank(intraComm,&rank) ;
125      if (rank==0) isRoot=true;
126      else isRoot=false; 
127      eventLoop() ;
128      finalize() ;
129    }
130   
131    void CServer::finalize(void)
132    {
133      if (!is_MPI_Initialized)
134      { 
135        if (CXios::usingOasis) oasis_finalize();
136        else MPI_Finalize() ;
137      }
138    }
139   
140     void CServer::eventLoop(void)
141     {
142       bool stop=false ;
143       
144       while(!stop)
145       {
146         if (isRoot)
147         {
148           listenContext();
149           if (!finished) listenFinalize() ;
150         }
151         else
152         {
153           listenRootContext();
154           if (!finished) listenRootFinalize() ;
155         }
156       
157         contextEventLoop() ;
158         if (finished && contextList.empty()) stop=true ;
159       }
160     
161     }
162     
163     void CServer::listenFinalize(void)
164     {
165        list<MPI_Comm>::iterator it;
166        int msg ;
167        int flag ;
168       
169        for(it=interComm.begin();it!=interComm.end();it++)
170        {
171           MPI_Status status ;
172           MPI_Iprobe(0,0,*it,&flag,&status) ;
173           if (flag==true)
174           {
175              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
176              info(20)<<" CServer : Receive client finalize"<<endl ;
177              interComm.erase(it) ;
178              break ;
179            }
180         }
181         
182         if (interComm.empty())
183         {
184           int i,size ;
185           MPI_Comm_size(intraComm,&size) ;
186           MPI_Request* requests= new MPI_Request[size-1] ;
187           MPI_Status* status= new MPI_Status[size-1] ;
188         
189           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
190           MPI_Waitall(size-1,requests,status) ;
191
192           finished=true ;
193           delete [] requests ;
194           delete [] status ;
195         }
196     }
197     
198     
199     void CServer::listenRootFinalize()
200     {
201        int flag ;
202        MPI_Status status ;
203        int msg ;
204       
205        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
206        if (flag==true)
207        {
208           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
209           finished=true ;
210        }
211      }
212     
213     void CServer::listenContext(void)
214     {
215       
216       MPI_Status status ;
217       int flag ;
218       static void* buffer ;
219       static MPI_Request request ;
220       static bool recept=false ;
221       int rank ;
222       int count ;
223       
224       if (recept==false)
225       {
226         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
227         if (flag==true) 
228         {
229           rank=status.MPI_SOURCE ;
230           MPI_Get_count(&status,MPI_CHAR,&count) ;
231           buffer=new char[count] ;
232           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
233           recept=true ;   
234         }
235       }
236       else
237       {
238         MPI_Test(&request,&flag,&status) ;
239         if (flag==true)
240         {
241           rank=status.MPI_SOURCE ;
242           MPI_Get_count(&status,MPI_CHAR,&count) ;
243           recvContextMessage(buffer,count) ;
244           delete [] buffer ;
245           recept=false ;         
246         }
247       }
248     }
249     
250     void CServer::recvContextMessage(void* buff,int count)
251     {
252 
253       
254       static map<string,contextMessage> recvContextId ;
255       map<string,contextMessage>::iterator it ;
256       
257       CBufferIn buffer(buff,count) ;
258       string id ;
259       int clientLeader ;
260       int nbMessage ;
261
262       buffer>>id>>nbMessage>>clientLeader ;
263                       
264       it=recvContextId.find(id) ;
265       if (it==recvContextId.end())
266       {         
267         contextMessage msg={0,0} ;
268         pair<map<string,contextMessage>::iterator,bool> ret ;
269         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
270         it=ret.first ;
271       } 
272       it->second.nbRecv+=1 ;
273       it->second.leaderRank+=clientLeader ;
274         
275       if (it->second.nbRecv==nbMessage)
276       { 
277         int size ;
278         MPI_Comm_size(intraComm,&size) ;
279         MPI_Request* requests= new MPI_Request[size-1] ;
280         MPI_Status* status= new MPI_Status[size-1] ;
281         
282         for(int i=1;i<size;i++)
283         {
284            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
285         }
286         MPI_Waitall(size-1,requests,status) ;
287         registerContext(buff,count,it->second.leaderRank) ;
288
289         recvContextId.erase(it) ;
290         delete [] requests ;
291         delete [] status ;
292
293       }
294     }     
295     
296     void CServer::listenRootContext(void)
297     {
298       
299       MPI_Status status ;
300       int flag ;
301       static void* buffer ;
302       static MPI_Request request ;
303       static bool recept=false ;
304       int rank ;
305       int count ;
306       const int root=0 ;
307       
308       if (recept==false)
309       {
310         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
311         if (flag==true) 
312         {
313           MPI_Get_count(&status,MPI_CHAR,&count) ;
314           buffer=new char[count] ;
315           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
316           recept=true ;   
317         }
318       }
319       else
320       {
321         MPI_Test(&request,&flag,&status) ;
322         if (flag==true)
323         {
324           MPI_Get_count(&status,MPI_CHAR,&count) ;
325           registerContext(buffer,count) ;
326           delete [] buffer ;
327           recept=false ;         
328         }
329       }
330     } 
331     
332     
333     
334     void CServer::registerContext(void* buff,int count, int leaderRank)
335     {
336     
337       string contextId;
338       CBufferIn buffer(buff,count) ;
339
340       buffer>>contextId ;
341       MPI_Comm contextIntercomm ;
342       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm) ;
343       
344       info(20)<<"CServer : Register new Context : "<<contextId<<endl  ;
345       MPI_Comm inter ;
346       MPI_Intercomm_merge(contextIntercomm,1,&inter) ;
347       MPI_Barrier(inter) ;
348       if (contextList.find(contextId)!=contextList.end()) 
349        ERROR("void CServer::registerContext(void* buff,int count, int leaderRank)",
350              <<"Context has already been registred") ;
351     
352//      tree::CContext* ptr ;
353//      if (tree::CContext::has(contextId)) ptr=tree::CContext::get(contextId).get() ;
354//      else ptr=tree::CContext::create(contextId).get() ;
355      shared_ptr<CContext> context=tree::CTreeManager::CreateContext(contextId) ;
356      contextList[contextId]=context.get() ;
357      context->initServer(intraComm,contextIntercomm) ;
358             
359     }   
360     
361     
362     void CServer::contextEventLoop(void)
363     {
364       bool finished ;
365       map<string,CContext*>::iterator it ;
366       for(it=contextList.begin();it!=contextList.end();it++) 
367       {
368         finished=it->second->eventLoop() ;
369         if (finished)
370         {
371           contextList.erase(it) ;
372           break ;
373         }
374       }
375         
376     }
377     
378
379
380  }
381}
Note: See TracBrowser for help on using the repository browser.