source: XIOS3/trunk/src/transport/legacy_context_client.cpp @ 2528

Last change on this file since 2528 was 2528, checked in by jderouillat, 11 months ago

Fix intracommunicator probing for attached mode

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 17.2 KB
1#include "xios_spl.hpp"
2#include "legacy_context_client.hpp"
3#include "context_server.hpp"
4#include "event_client.hpp"
5#include "buffer_out.hpp"
6#include "buffer_client.hpp"
7#include "type.hpp"
8#include "event_client.hpp"
9#include "context.hpp"
10#include "mpi.hpp"
11#include "timer.hpp"
12#include "cxios.hpp"
13#include "server.hpp"
14#include "services.hpp"
15#include "ressources_manager.hpp"
16#include <boost/functional/hash.hpp>
17#include <random>
18#include <chrono>
19
20namespace xios
21{
22    /*!
23    \param [in] parent Pointer to the context on the client side
24    \param [in] intraComm_ communicator of the client group
25    \param [in] interComm_ communicator of the server group
26    \param [in] cxtSer Pointer to the context on the server side (only used in attached mode)
27    */
28    CLegacyContextClient::CLegacyContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
29                         : CContextClient(parent, intraComm_, interComm_, cxtSer),
30                           mapBufferSize_(),  maxBufferedEvents(4)
31    {
32      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one-sided communication (for testing)
33      if (isAttachedModeEnabled()) pureOneSided=false ; // no one-sided communication in attached mode
34
35      if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ;
36      else interCommMerged_ = interComm_; // interComm_ is already an intra-communicator in attached mode
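        // interCommMerged_ spans both the client and server groups; it is used later to reach each server rank when the one-sided windows are created.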
37     
38      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // for windows
39
40      timeLine = 1;
41    }
42
43    CContextClient::ETransport getType(void) {return CContextClient::legacy ;}
44
45    /*!
46    In attached mode, the current context must be reset to the client context
47    \param [in] event Event sent to server
48    */
49    void CLegacyContextClient::sendEvent(CEventClient& event)
50    {
51      list<int> ranks = event.getRanks();
52 
53//      ostringstream str ;
54//      for(auto& rank : ranks) str<<rank<<" ; " ;
55//      info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<"  for ranks : "<<str.str()<<endl ;
56
57      if (CXios::checkEventSync)
58      {
59        int typeId, classId, typeId_in, classId_in;
60        long long timeLine_out;
61        long long timeLine_in( timeLine );
62        typeId_in=event.getTypeId() ;
63        classId_in=event.getClassId() ;
64//        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
65        MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ; 
66        MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
67        MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
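        // If every client holds the same timeline, type and class identifiers, each sum divided by clientSize gives back the local value; any mismatch means the clients are desynchronized.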
68        if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine)
69        {
70           ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
71               << "Events are not coherent between clients for timeline = "<<timeLine);
72        }
73       
74        vector<int> servers(serverSize,0) ;
75        auto ranks=event.getRanks() ;
76        for(auto& rank : ranks) servers[rank]=1 ;
77        MPI_Allreduce(MPI_IN_PLACE, servers.data(), serverSize,MPI_INT,MPI_SUM,intraComm) ;
78        ostringstream osstr ;
79        for(int i=0;i<serverSize;i++)  if (servers[i]==0) osstr<<i<<" , " ;
80        if (!osstr.str().empty())
81        {
82          ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
83                 <<" Some servers will not receive the message for timeline = "<<timeLine<<endl
84                 <<"Servers are : "<<osstr.str()) ;
85        }
86
87
88      }
89
90      if (!event.isEmpty())
91      {
92        list<int> sizes = event.getSizes();
93
94        // Acquire, for each target server, a buffer with enough room for the event; this call blocks until space is available
95        list<CBufferOut*> buffList;
96        getBuffers(timeLine, ranks, sizes, buffList) ;
97
98        event.send(timeLine, sizes, buffList);
99       
100        //for (auto itRank = ranks.begin(); itRank != ranks.end(); itRank++) buffers[*itRank]->infoBuffer() ;
101
102        unlockBuffers(ranks) ;
103        checkBuffers(ranks);
104       
105      }
106     
107      if (isAttachedModeEnabled()) // in attached mode, wait for the buffers to be consumed and hand control over to the server
108      {
109        while (checkBuffers(ranks)) callGlobalEventLoop() ;
110     
111        CXios::getDaemonsManager()->scheduleContext(hashId_) ;
112        while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) callGlobalEventLoop() ;
113      }
114     
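      // Synchronize all clients with a non-blocking barrier, servicing the global event loop while waiting so that other contexts keep progressing.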
115      MPI_Request req ;
116      MPI_Status status ;
117      MPI_Ibarrier(intraComm,&req) ;
118      int flag ;
119      MPI_Test(&req,&flag,&status) ;
120      while(!flag) 
121      {
122        callGlobalEventLoop() ;
123        MPI_Test(&req,&flag,&status) ;
124      }
125
126
127      timeLine++;
128    }
129
130
131    /*!
132     * Get one buffer per connection to the target servers. This function blocks until there is enough room
133     * in every buffer, servicing the global event loop while waiting.
134     *
135     *
136     * \param [in] timeLine time line of the event which will be sent to the servers
137     * \param [in] serverList list of ranks of the connected servers
138     * \param [in] sizeList size of the message corresponding to each connection
139     * \param [out] retBuffers list of buffers that can be used to store the event
142    */
143    void CLegacyContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers)
144    {
145      list<int>::const_iterator itServer, itSize;
146      list<CClientBuffer*> bufferList;
147      map<int,CClientBuffer*>::const_iterator it;
148      list<CClientBuffer*>::iterator itBuffer;
149      bool areBuffersFree;
150     
151      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
152      {
153        it = buffers.find(*itServer);
154        if (it == buffers.end())
155        {
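          // A token from the resources manager is acquired (waiting in the global event loop if needed) and held while the new buffer is created and its windows are attached.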
156          CTokenManager* tokenManager = CXios::getRessourcesManager()->getTokenManager() ;
157          size_t token = tokenManager->getToken() ;
158          while (!tokenManager->lockToken(token)) callGlobalEventLoop() ;
159          newBuffer(*itServer);
160          it = buffers.find(*itServer);
161          checkAttachWindows(it->second,it->first) ;
162          tokenManager->unlockToken(token) ;
163        }
164        bufferList.push_back(it->second);
165      }
166
167      double lastTimeBuffersNotFree=0. ;
168      double time ;
169      bool doUnlockBuffers ;
170      CTimer::get("Blocking time").resume();
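      // Poll until every target buffer has room for its message. Between unsuccessful probes the buffers are unlocked,
      // pending requests are checked and the global event loop is serviced; the buffers are re-probed only once
      // 'latency_' seconds have elapsed since the last unsuccessful attempt.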
171      do
172      {
173        areBuffersFree = true;
174        doUnlockBuffers=false ;
175        time=MPI_Wtime() ;
176        if (time-lastTimeBuffersNotFree > latency_)
177        {
178          for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
179          {
180            areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);
181          }
182          if (!areBuffersFree)
183          {
184            lastTimeBuffersNotFree = time ;
185            doUnlockBuffers=true ;
186          }         
187        }
188        else areBuffersFree = false ;
189
190        if (!areBuffersFree)
191        {
192          if (doUnlockBuffers) for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer();
193          checkBuffers();
194
195          callGlobalEventLoop() ;
196        }
197
198      } while (!areBuffersFree);
199      CTimer::get("Blocking time").suspend();
200
201      for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
202        retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize));
203   }
204
205   void CLegacyContextClient::eventLoop(void)
206   {
207      if (!locked_) checkBuffers() ;
208   }
209
210   void CLegacyContextClient::callGlobalEventLoop(void)
211   {
212     locked_=true ;
213     context_->globalEventLoop() ;
214     locked_=false ;
215   }
216   /*!
217   Make a new buffer for the connection to the server with the given rank
218   \param [in] rank rank of the connected server
219   */
220   void CLegacyContextClient::newBuffer(int rank)
221   {
222      if (!mapBufferSize_.count(rank))
223      {
224        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
225        mapBufferSize_[rank] = CXios::minBufferSize;
226        maxEventSizes[rank] = CXios::minBufferSize;
227      }
228     
229      int considerServers = 1;
230      if (isAttachedModeEnabled()) considerServers = 0;
231      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interCommMerged_, considerServers*clientSize+rank, mapBufferSize_[rank], maxEventSizes[rank]);
232      if (isGrowableBuffer_) buffer->setGrowableBuffer(1.2) ;
233      else buffer->fixBuffer() ;
234      // Notify the server
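      // The first control message sent through the buffer carries the context hash, the buffer size and the addresses of the two RMA windows.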
235      CBufferOut* bufOut = buffer->getBuffer(0, 4*sizeof(MPI_Aint));
236      MPI_Aint sendBuff[4] ;
237      sendBuff[0]=hashId_;
238      sendBuff[1]=mapBufferSize_[rank];
239      sendBuff[2]=buffers[rank]->getWinAddress(0); 
240      sendBuff[3]=buffers[rank]->getWinAddress(1); 
241      info(100)<<"CLegacyContextClient::newBuffer : rank "<<rank<<" winAddress[0] "<<buffers[rank]->getWinAddress(0)<<" winAddress[1] "<<buffers[rank]->getWinAddress(1)<<endl;
242      bufOut->put(sendBuff, 4); 
243      buffer->checkBuffer(true);
244/*
245       // create windows dynamically for one-sided
246      if (!isAttachedModeEnabled())
247      {
248        CTimer::get("create Windows").resume() ;
249        MPI_Comm interComm ;
250        MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ;
251        MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
252        CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
253        MPI_Comm_free(&interComm) ;
254        windows_[rank].resize(2) ;
255       
256        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
257        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;
258       
259        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);   
260        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;
261
262        CTimer::get("create Windows").suspend() ;
263      }
264      else
265      {
266        winComm_[rank] = MPI_COMM_NULL ;
267        windows_[rank].resize(2) ;
268        windows_[rank][0] = MPI_WIN_NULL ;
269        windows_[rank][1] = MPI_WIN_NULL ;
270      }
271      buffer->attachWindows(windows_[rank]) ;
272      if (!isAttachedModeEnabled()) MPI_Barrier(winComm_[rank]) ;
273  */     
274   }
275
276   void CLegacyContextClient::checkAttachWindows(CClientBuffer* buffer, int rank)
277   {
278      if (!buffer->isAttachedWindows())
279      {
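           // Windows are created lazily, on first use of the buffer: a dedicated client-server communicator is built by bridging commSelf_ to the matching server rank through interCommMerged_, and two dynamic RMA windows are opened on it.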
280           // create windows dynamically for one-sided
281        if (!isAttachedModeEnabled())
282        { 
283          CTimer::get("create Windows").resume() ;
284          MPI_Comm interComm ;
285          MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ;
286          MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
287          CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
288          MPI_Comm_free(&interComm) ;
289          windows_[rank].resize(2) ;
290     
291          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
292          CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;
293     
294          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);   
295          CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;
296
297          CTimer::get("create Windows").suspend() ;
298          buffer->attachWindows(windows_[rank]) ;
299          MPI_Barrier(winComm_[rank]) ;
300        }
301        else
302        {
303          winComm_[rank] = MPI_COMM_NULL ;
304          windows_[rank].resize(2) ;
305          windows_[rank][0] = MPI_WIN_NULL ;
306          windows_[rank][1] = MPI_WIN_NULL ;
307          buffer->attachWindows(windows_[rank]) ;
308        }
309
310      }
311    }
312
313
314 
315   /*!
316   Verify the state of the buffers. A buffer is pending while it still holds messages that have not been fully sent.
317   \return state of buffers, pending(true), ready(false)
318   */
319   bool CLegacyContextClient::checkBuffers(void)
320   {
321      map<int,CClientBuffer*>::iterator itBuff;
322      bool pending = false;
323      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
324        pending |= itBuff->second->checkBuffer(!pureOneSided);
325      return pending;
326   }
327
328   //! Release all buffers
329   void CLegacyContextClient::releaseBuffers()
330   {
331      map<int,CClientBuffer*>::iterator itBuff;
332      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
333      {
334         delete itBuff->second;
335      }
336      buffers.clear();
337
338// don't know when to release the windows
339
340      //if (!isAttachedModeEnabled())
341      //{ 
342      //  for(auto& it : winComm_)
343      //  {
344      //    int rank = it.first ;
345      //    MPI_Win_free(&windows_[rank][0]);
346      //    MPI_Win_free(&windows_[rank][1]);
347      //    MPI_Comm_free(&winComm_[rank]) ;
348      //  }
349      //}
350   }
351
352     
353  /*!
354   Lock the buffers for one-sided communications
355   \param [in] ranks list of ranks of the servers to which the client connects
356   */
357   void CLegacyContextClient::lockBuffers(list<int>& ranks)
358   {
359      list<int>::iterator it;
360      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->lockBuffer();
361   }
362
363  /*!
364   Unlock the buffers for one-sided communications
365   \param [in] ranks list of ranks of the servers to which the client connects
366   */
367   void CLegacyContextClient::unlockBuffers(list<int>& ranks)
368   {
369      list<int>::iterator it;
370      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->unlockBuffer();
371   }
372     
373   /*!
374   Verify the state of the buffers corresponding to a set of connections
375   \param [in] ranks list of ranks of the servers to which the client connects
376   \return state of buffers, pending(true), ready(false)
377   */
378   bool CLegacyContextClient::checkBuffers(list<int>& ranks)
379   {
380      list<int>::iterator it;
381      bool pending = false;
382      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(!pureOneSided);
383      return pending;
384   }
385
386   /*!
387    * Set the buffer size for each connection. Warning: This function is collective.
388    *
389    * \param [in] mapSize maps the rank of each connected server to the size of the corresponding buffer
391   */
392   void CLegacyContextClient::setBufferSize(const std::map<int,StdSize>& mapSize)
393   {
394     setFixedBuffer() ;
395     for(auto& it : mapSize)
396     {
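      // The requested size is scaled by CXios::bufferSizeFactor (plus a 1% margin) and clamped between CXios::minBufferSize and CXios::maxBufferSize.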
397      size_t size=std::max(CXios::minBufferSize*1.0,std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0)) ;
398      mapBufferSize_[it.first]=size ;
399      if (buffers.count(it.first)>0) buffers[it.first]->fixBufferSize(size);
400     }
401   }
402
403   /*!
404   * Finalize the context client and report buffer memory usage. This function is non-blocking.
405   */
406  void CLegacyContextClient::finalize(void)
407  {
408    map<int,CClientBuffer*>::iterator itBuff;
409    std::list<int>::iterator ItServerLeader; 
410   
411    bool stop = false;
412
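    // Count, over all clients, how many are connected to each server; the global count is pushed with the finalize event for every server this client talks to.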
413    int* nbServerConnectionLocal  = new int[serverSize] ;
414    int* nbServerConnectionGlobal  = new int[serverSize] ;
415    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ;
416    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ;
417    for (ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ;
418   
419    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm);
420   
421    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
422    CMessage msg;
423
424    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ;
425    sendEvent(event);
426
427    delete[] nbServerConnectionLocal ;
428    delete[] nbServerConnectionGlobal ;
429
430
431    CTimer::get("Blocking time").resume();
432    checkBuffers();
433    CTimer::get("Blocking time").suspend();
434
435    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
436                                          iteMap = mapBufferSize_.end(), itMap;
437
438    StdSize totalBuf = 0;
439    for (itMap = itbMap; itMap != iteMap; ++itMap)
440    {
441      report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl
442                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
443      totalBuf += itMap->second;
444    }
445    report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;
446
447  }
448
449
450  /*! Check whether any buffer still has a pending MPI request.
451  */
452  bool CLegacyContextClient::havePendingRequests(void)
453  {
454    bool pending = false;
455    map<int,CClientBuffer*>::iterator itBuff;
456    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
457      pending |= itBuff->second->hasPendingRequest();
458    return pending;
459  }
460 
461  bool CLegacyContextClient::havePendingRequests(list<int>& ranks)
462  {
463      list<int>::iterator it;
464      bool pending = false;
465      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->hasPendingRequest();
466      return pending;
467  }
468
469  bool CLegacyContextClient::isNotifiedFinalized(void)
470  {
471    if (isAttachedModeEnabled()) return true ;
472
473    bool finalized = true;
474    map<int,CClientBuffer*>::iterator itBuff;
475    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
476      finalized &= itBuff->second->isNotifiedFinalized();
477    return finalized;
478  }
479
480}