source: XMLIO_V2/dev/common/src/xmlio/manager/mpi_manager.cpp @ 286

Last change on this file since 286 was 286, checked in by ymipsl, 13 years ago

reprise en main de la version de H. Ozdoba. Correction de différentes erreurs de conception et bug.
Version NEMO operationnel en client/server, interoperabilita avec OASIS, reconstition de fichiers via netcdf4/HDF5

YM

File size: 14.3 KB
Line 
1/* ************************************************************************** *
2 *      Copyright © IPSL/LSCE, XMLIOServer, Avril 2010 - Octobre 2011         *
3 * ************************************************************************** */
4 
5 /**
6 * \file    mpi_interface.cpp
7 * \brief   Gestion des communications MPI via une surcouche interne (implémentation).
8 * \author  Hervé Ozdoba
9 * \version 0.4
10 * \date    28 Juin 2011
11 */
12 
13// XMLIOServer headers
14#include "xmlioserver.hpp"
15#include "mpi_manager.hpp"
16#include "oasis_cinterface.hpp"
17
18
19// /////////////////////////////// Définitions ////////////////////////////// //
20
21namespace xmlioserver {
22namespace comm {
23
24   // ---------------------- Initialisation & Finalisation ---------------------
25
26   bool CMPIManager::Initialized=false ;
27   MPI_Comm CMPIManager::CommClient ;
28   MPI_Comm CMPIManager::CommServer ;
29   MPI_Comm CMPIManager::CommClientServer ;
30   int CMPIManager::NbClient ;
31   int CMPIManager::NbServer ;   
32   bool CMPIManager::_IsClient ;
33   bool CMPIManager::_IsServer ;   
34   bool CMPIManager::using_server ;
35   bool CMPIManager::using_oasis ;
36
37
38   void CMPIManager::Initialise(int * _argc, char *** _argv)
39   {
40      int flag = 0;
41      if (MPI_Initialized(&flag) != MPI_SUCCESS)
42         ERROR("CMPIManager::Initialise(arc, argv)", << " MPI Error !");
43      if (!flag)
44      {
45         if (MPI_Init(_argc, _argv) != MPI_SUCCESS)
46            ERROR("CMPIManager::Initialise(arc, argv)", << " MPI Error !");
47         Initialized=true ;
48      }
49
50   }
51   void CMPIManager::InitialiseClient(int * _argc, char *** _argv)
52   {
53      int flag = 0;
54      using_oasis=CObjectFactory::GetObject<CVariable>("xios","using_oasis")->getData<bool>() ; 
55      using_server=CObjectFactory::GetObject<CVariable>("xios","using_server")->getData<bool>(); 
56
57      Initialized=false ;
58
59      if (MPI_Initialized(&flag) != MPI_SUCCESS)
60         ERROR("CMPIManager::Initialise(arc, argv)", << " MPI Error !");
61
62      if (!flag)
63      {
64         if (using_oasis) 
65         {
66           StdString oasisClientId=CObjectFactory::GetObject<CVariable>("xios","client_id")->getData<StdString>();
67           oasis_init(oasisClientId) ;
68         }
69         else
70         {       
71           if (MPI_Init(_argc, _argv) != MPI_SUCCESS)
72            ERROR("CMPIManager::Initialise(arc, argv)", << " MPI Error !");
73         }
74         Initialized=true ;
75       }
76
77   }
78   
79   void CMPIManager::InitialiseServer(int * _argc, char *** _argv)
80   {
81      int flag = 0;
82     
83      using_oasis=CObjectFactory::GetObject<CVariable>("xios","using_oasis")->getData<bool>(); 
84      using_server=CObjectFactory::GetObject<CVariable>("xios","using_server")->getData<bool>(); 
85   
86      if (using_oasis)
87      {
88         StdString oasisServerId=CObjectFactory::GetObject<CVariable>("xios","server_id")->getData<StdString>(); 
89         oasis_init(oasisServerId) ;
90      }
91      else
92      {
93         if (MPI_Init(_argc, _argv) != MPI_SUCCESS)
94            ERROR("CMPIManager::Initialise(arc, argv)", << " MPI Error !");
95      }
96      Initialized=true ;
97   }
98   
99   void CMPIManager::Finalize(void)
100   {
101      if (Initialized)
102      {
103        if (using_oasis) oasis_finalize() ;
104        else if (MPI_Finalize() != MPI_SUCCESS)
105                ERROR("CMPIManager::Finalize(void)", << " MPI Error !");
106      }
107   }
108   
109   // ------------------------------ Communicateurs ----------------------------
110   
111   int CMPIManager::GetCommRank(MPI_Comm _comm)
112   {
113      int rank = 0;
114      if (MPI_Comm_rank(_comm, &rank) != MPI_SUCCESS)
115         ERROR("CMPIManager::GetCommRank(comm)", << " MPI Error !");
116      return (rank);
117   }
118   
119   int CMPIManager::GetCommSize(MPI_Comm _comm)
120   {
121      int size = 0;
122      if (MPI_Comm_size(_comm, &size) != MPI_SUCCESS)
123         ERROR("CMPIManager::GetCommSize(comm)", << " MPI Error !");
124      return (size);
125   }
126   
127   MPI_Comm CMPIManager::CreateComm(MPI_Group _group, MPI_Comm _pcomm)
128   {
129      MPI_Comm commu;     
130      if (MPI_Comm_create(_pcomm, _group, &commu) != MPI_SUCCESS)
131         ERROR("CMPIManager::CreateComm(group, pcomm)", << " MPI Error !");
132      return (commu);
133   }
134   
135   //MPI_Comm CMPIManager::GetCommWorld(void)
136   //{
137   //   return (MPI_COMM_WORLD);
138   //}
139   
140   // ---------------------------------- Autre ---------------------------------
141         
142   void CMPIManager::Barrier(MPI_Comm _comm)
143   {
144      if (MPI_Barrier(_comm) != MPI_SUCCESS)
145         ERROR("CMPIManager::Barrier(comm)", << " MPI Error !");
146   }
147   
148   bool CMPIManager::DispatchClient(bool       _is_server,
149                                    MPI_Comm & _comm_client,
150                                    MPI_Comm & _comm_client_server,
151                                    MPI_Comm & _comm_server,
152                                    MPI_Comm   _comm_parent)
153   {
154      if (_is_server) { _IsServer=true ; _IsClient=false ; }
155      else { _IsServer=false ; _IsClient=true; }
156
157      if (_is_server)
158      {
159        if (using_oasis)
160        {
161          StdString oasisClientId=CObjectFactory::GetObject<CVariable>("xios","client_id")->getData<StdString>() ;
162          oasis_get_intracomm(_comm_parent,oasisClientId) ;
163        }
164        else _comm_parent=MPI_COMM_WORLD ;
165      }
166      else
167      {
168        if (!using_server)
169        {
170          NbClient=GetCommSize(_comm_parent) ;
171          NbServer=0 ;
172          _comm_server = _comm_client = _comm_parent ;
173          CommClient=_comm_client ;
174          CommServer=_comm_server ;
175          CommClientServer=_comm_client_server ;
176           return false ;
177        }
178        if (using_oasis) 
179        {
180          StdString oasisServerId=CObjectFactory::GetObject<CVariable>("xios","server_id")->getData<StdString>() ;
181          oasis_get_intracomm(_comm_parent,oasisServerId) ;
182        }
183      }   
184     
185     
186      int value = (_is_server) ? 1 : 2;
187      std::size_t nbClient = 0, nbServer = 0 ;
188      std::vector<int> nbClientByServer ;
189     
190      std::vector<int> info, rank_client, rank_server;
191      CMPIManager::AllGather(value, info, _comm_parent);
192
193      for (std::size_t s = 0;  s < info.size(); s++)
194      {
195         if (info[s] == 1) rank_server.push_back(s);
196         else rank_client.push_back(s);
197      }
198      nbClient = rank_client.size();
199      nbServer = rank_server.size();
200     
201      NbClient=nbClient ;
202      NbServer=nbServer ;
203     
204      if (nbClient == 0)
205         ERROR("CMPIManager::DispatchClient()", << " Aucun client disponible !");
206         
207
208      _comm_client = CMPIManager::CreateComm(CMPIManager::CreateSubGroup(
209                     CMPIManager::GetGroup(_comm_parent), rank_client), _comm_parent);
210
211      if (nbServer != 0)
212      {
213         std::size_t currentServer = 0;
214         
215//         nbClientByServer = nbClient/nbServer;
216         _comm_server = CMPIManager::CreateComm(CMPIManager::CreateSubGroup(
217                        CMPIManager::GetGroup(_comm_parent), rank_server), _comm_parent);
218
219         for(std::size_t mm=0;mm<nbServer;mm++) 
220         {
221           int x=nbClient/nbServer ;
222           if (mm<nbClient%nbServer) x++ ;
223           nbClientByServer.push_back(x) ;
224         }
225
226         for (std::size_t mm = 0; mm < nbClient; mm += nbClientByServer[currentServer],currentServer++ )
227         {
228            std::vector<int> group_rank;
229            group_rank.push_back(rank_server[currentServer]);
230            for (std::size_t nn = 0; nn < nbClientByServer[currentServer]; nn++)
231               group_rank.push_back(rank_client[nn+mm]);
232            MPI_Comm comm_client_server_ = CMPIManager::CreateComm(CMPIManager::CreateSubGroup(
233                                           CMPIManager::GetGroup(_comm_parent), group_rank), _comm_parent);
234
235            if (std::find(group_rank.begin(), group_rank.end(),
236                         CMPIManager::GetCommRank(_comm_parent)) != group_rank.end())
237            {
238               _comm_client_server = comm_client_server_;
239            }
240               
241            group_rank.clear();
242         }
243         CommClient=_comm_client ;
244         CommServer=_comm_server ;
245         CommClientServer=_comm_client_server ;
246         return (true);
247      }
248      else
249      {
250         _comm_server = _comm_client;
251         CommClient=_comm_client ;
252         CommServer=_comm_server ;
253         CommClientServer=_comm_client_server ;
254         return (false);
255      }
256   }
257   
258
259   // --------------------------------- Groupes --------------------------------
260         
261   MPI_Group CMPIManager::GetGroupWorld(void)
262   {
263      MPI_Group group = 0;
264      if (MPI_Comm_group(MPI_COMM_WORLD, &group) != MPI_SUCCESS)
265         ERROR("CMPIManager::GetGroupWorld()", << " MPI Error !");
266      return (group);
267   }
268
269   MPI_Group CMPIManager::GetGroup(MPI_Comm comm)
270   {
271      MPI_Group group = 0;
272      if (MPI_Comm_group(comm, &group) != MPI_SUCCESS)
273         ERROR("CMPIManager::GetGroup()", << " MPI Error !");
274      return (group);
275   }
276   
277   MPI_Group CMPIManager::CreateSubGroup(MPI_Group _pgroup, const std::vector<int> & _ranks)
278   {
279      MPI_Group group = 0;
280      if (MPI_Group_incl(_pgroup, _ranks.size(), const_cast<int*>(&(_ranks[0])), &group) != MPI_SUCCESS)
281         ERROR("CMPIManager::CreateSubGroup(pgroup, ranks)", << " MPI Error !");
282      return (group);
283   }
284   
285   MPI_Group CMPIManager::CreateSubGroup
286      (MPI_Group _pgroup, int _min_rank, int _max_rank, int _intval)
287   {
288      std::vector<int> ranks;
289      for (int i = _min_rank; i <= _max_rank; i += _intval)
290         ranks.push_back(i);
291      return (CMPIManager::CreateSubGroup(_pgroup, ranks));
292   }
293
294   // ----------------------------------- Tests --------------------------------
295         
296   bool CMPIManager::IsMaster(MPI_Comm _comm)
297   {
298      return (CMPIManager::GetCommRank(_comm) == 0); 
299   }
300   
301   bool CMPIManager::IsRank(int _rank, MPI_Comm _comm)
302   {
303      return (CMPIManager::GetCommRank(_comm) == _rank); 
304   }
305
306   // --------------------------- Communication simple -------------------------
307         
308   void CMPIManager::Send (MPI_Comm _comm, int _dest_rank, char * _data,
309                           std::size_t _size, MPI_Request & _request)
310   {
311      int nsize = _size;   
312      if (MPI_Issend(_data, nsize, MPI_CHAR, _dest_rank, 0, _comm, &_request) != MPI_SUCCESS)
313         ERROR("CMPIManager::Send (comm, dest_rank, data, size, request)", << " MPI Error !");
314   }
315   
316   void CMPIManager::Wait (MPI_Request & _request)
317   {
318      MPI_Status status;
319      if (MPI_Wait(&_request, &status) != MPI_SUCCESS)
320         ERROR("CMPIManager::Wait (request)", << " MPI Error !");
321   }
322   
323   bool CMPIManager::Test (MPI_Request & _request)
324   {
325      MPI_Status status;
326      int flag = 0;
327      if (MPI_Test(&_request, &flag, &status) != MPI_SUCCESS)
328         ERROR("CMPIManager::Test (request)", << " MPI Error !");
329      return (flag);
330   }
331
332   bool CMPIManager::HasReceivedData(MPI_Comm _comm, int _src_rank)
333   {
334      MPI_Status status;
335      int flag = 0;
336      if (MPI_Iprobe(_src_rank, MPI_ANY_TAG, _comm, &flag, &status) != MPI_SUCCESS)
337         ERROR("CMPIManager::HasReceivedData (comm, rank)", << " MPI Error !");
338      return (flag);
339   }
340   
341   std::size_t CMPIManager::GetReceivedDataSize(MPI_Comm _comm, int _src_rank)
342   {
343      MPI_Status status;
344      int flag = 0, size = 0;
345      if (MPI_Iprobe(_src_rank, MPI_ANY_TAG, _comm, &flag, &status) != MPI_SUCCESS)
346         ERROR("CMPIManager::getReceivedDataSize (comm, rank)", << " MPI Error !");
347      if (!flag) return (0); 
348      if (MPI_Get_count(&status, MPI_CHAR, &size) != MPI_SUCCESS)
349         ERROR("CMPIManager::getReceivedDataSize (comm, rank)", << " MPI Error !");
350
351      return (size);
352   }
353   
354   void CMPIManager::Receive(MPI_Comm _comm, int _src_rank, char * _data)
355   {
356      MPI_Request request = 0;
357      int size = CMPIManager::GetReceivedDataSize(_comm, _src_rank);
358      if (MPI_Irecv(_data, size, MPI_CHAR, _src_rank, MPI_ANY_TAG, _comm, &request) != MPI_SUCCESS)
359         ERROR("CMPIManager::Receive (comm, src_rank, data)", << " MPI Error !");
360      CMPIManager::Wait (request); // Temporaire
361   }
362   
363   void CMPIManager::AllGather(int _indata, std::vector<int> & _outdata, MPI_Comm _comm)
364   {
365      std::vector<int> data; data.push_back(_indata);
366      CMPIManager::AllGather(data, _outdata, _comm);
367   }
368
369   void  CMPIManager::AllGather(const std::vector<int> & _indata,
370                                      std::vector<int> & _outdata, MPI_Comm _comm)
371   {
372      int sendcount = _indata.size(),
373          recvcount = _indata.size() * CMPIManager::GetCommSize(_comm);
374      _outdata.resize(recvcount);     
375      if (MPI_Allgather ( const_cast<int*>(&(_indata[0])), sendcount, MPI_INTEGER,
376                                          &(_outdata[0]) , sendcount, MPI_INTEGER, _comm) != MPI_SUCCESS)
377         ERROR("CMPIManager::AllGather (indata, outdata, comm)", << " MPI Error !");
378   }
379         
380   // ------------------------- Communication 'complexe' -----------------------
381         
382   void CMPIManager::SendLinearBuffer(MPI_Comm _comm, int _dest_rank, CLinearBuffer & _lbuffer, MPI_Request & _request)
383   {
384      CMPIManager::Send(_comm, _dest_rank, _lbuffer, _lbuffer.getUsedSize(), _request);
385      _lbuffer.clear();
386   }
387   
388   void CMPIManager::ReceiveLinearBuffer(MPI_Comm _comm, int _src_rank, CLinearBuffer & _lbuffer)
389   {
390      CMPIManager::Receive(_comm, _src_rank, _lbuffer);
391      _lbuffer.computeBufferData();
392   }
393   
394   boost::shared_ptr<CLinearBuffer> CMPIManager::ReceiveLinearBuffer(MPI_Comm _comm, int _src_rank)
395   {
396      boost::shared_ptr<CLinearBuffer> buff_ptr
397         (new CLinearBuffer(CMPIManager::GetReceivedDataSize(_comm, _src_rank)));
398      CMPIManager::ReceiveLinearBuffer(_comm, _src_rank, *buff_ptr);
399      return (buff_ptr);
400   }
401   
402   void CMPIManager::ReceiveCircularBuffer(MPI_Comm _comm, int _src_rank, CCircularBuffer & _cbuffer)
403   {
404      std::size_t data_size  = CMPIManager::GetReceivedDataSize(_comm, _src_rank);
405      std::size_t data_begin = _cbuffer.prepareNextDataPosition(data_size);
406      CMPIManager::Receive(_comm, _src_rank, _cbuffer.getData(data_begin));
407      _cbuffer.updateNbRequests(data_begin, data_begin + data_size);
408   }
409
410   // ---------------------- Mémoire (non fonctionnel ....) --------------------
411         
412   void CMPIManager::AllocMemory(void * _data, std::size_t _size)
413   {
414      if (MPI_Alloc_mem(sizeof(char) * _size, MPI_INFO_NULL, _data) != MPI_SUCCESS)
415         ERROR("CMPIManager::AllocMem(data, size)", << " MPI Error !");
416   }
417   
418   void CMPIManager::FreeMemory (void * _data)
419   {
420      MPI_Free_mem(_data);
421   }
422
423} // namespace comm
424} // namespace xmlioserver
425
Note: See TracBrowser for help on using the repository browser.