source: XIOS3/trunk/src/transport/legacy_context_client.cpp @ 2458

Last change on this file since 2458 was 2458, checked in by ymipsl, 16 months ago

Merge XIOS_FILE_SERVICE dev branch into trunk

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 16.9 KB
#include "xios_spl.hpp"
#include "legacy_context_client.hpp"
#include "context_server.hpp"
#include "event_client.hpp"
#include "buffer_out.hpp"
#include "buffer_client.hpp"
#include "type.hpp"
#include "context.hpp"
#include "mpi.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "server.hpp"
#include "services.hpp"
#include "ressources_manager.hpp"
#include <boost/functional/hash.hpp>
#include <random>
#include <chrono>

namespace xios
{
    /*!
    \param [in] parent Pointer to the context on the client side
    \param [in] intraComm_ communicator of the client group
    \param [in] interComm_ communicator of the server group
    \param [in] cxtSer Pointer to the context on the server side (only used in attached mode)
    */
    CLegacyContextClient::CLegacyContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
                         : CContextClient(parent, intraComm_, interComm_, cxtSer),
                           mapBufferSize_(), maxBufferedEvents(4)
    {
      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one-sided communication (for testing)
      if (isAttachedModeEnabled()) pureOneSided=false ; // no one-sided communication in attached mode

      if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ;

      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // self communicator, used for the one-sided windows

      timeLine = 1;
    }


    /*!
    In attached mode, the current context must be reset to the client context.
    \param [in] event Event sent to the server
    */
    void CLegacyContextClient::sendEvent(CEventClient& event)
    {
      list<int> ranks = event.getRanks();

//      ostringstream str ;
//      for(auto& rank : ranks) str<<rank<<" ; " ;
//      info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<"  for ranks : "<<str.str()<<endl ;

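      // Optional sanity check (CXios::checkEventSync): every client of the context sums its event
      // type, class id and timeline over intraComm. If all clients are sending the same event at the
      // same timeline, dividing each sum by clientSize recovers the local value; any mismatch means
      // the clients have diverged and an error is raised. A second reduction then verifies that every
      // server rank is targeted by at least one client.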
      if (CXios::checkEventSync)
      {
        int typeId, classId, typeId_in, classId_in;
        long long timeLine_out;
        long long timeLine_in( timeLine );
        typeId_in=event.getTypeId() ;
        classId_in=event.getClassId() ;
//        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
        MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
        if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine)
        {
           ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
               << "Events are not coherent between clients for timeline = "<<timeLine);
        }

        vector<int> servers(serverSize,0) ;
        auto ranks=event.getRanks() ;
        for(auto& rank : ranks) servers[rank]=1 ;
        MPI_Allreduce(MPI_IN_PLACE, servers.data(), serverSize,MPI_INT,MPI_SUM,intraComm) ;
        ostringstream osstr ;
        for(int i=0;i<serverSize;i++)  if (servers[i]==0) osstr<<i<<" , " ;
        if (!osstr.str().empty())
        {
          ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
                 <<" Some servers will not receive the message for timeline = "<<timeLine<<endl
                 <<"Servers are : "<<osstr.str()) ;
        }

      }

      if (!event.isEmpty())
      {
        list<int> sizes = event.getSizes();

        // Get one buffer per target server; getBuffers blocks until enough room is available
        list<CBufferOut*> buffList;
        getBuffers(timeLine, ranks, sizes, buffList) ;

        event.send(timeLine, sizes, buffList);

        //for (auto itRank = ranks.begin(); itRank != ranks.end(); itRank++) buffers[*itRank]->infoBuffer() ;

        unlockBuffers(ranks) ;
        checkBuffers(ranks);

      }

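      // Attached mode: spin on the global event loop until no message is pending in the buffers,
      // then schedule the context on the daemons manager and keep servicing the event loop until
      // the scheduled context has been processed.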
      if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
      {
        while (checkBuffers(ranks)) callGlobalEventLoop() ;

        CXios::getDaemonsManager()->scheduleContext(hashId_) ;
        while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) callGlobalEventLoop() ;
      }

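      // Synchronize all clients of the context with a non-blocking barrier; while waiting for the
      // barrier to complete, keep servicing the global event loop so that communications continue
      // to progress.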
      MPI_Request req ;
      MPI_Status status ;
      MPI_Ibarrier(intraComm,&req) ;
      int flag ;
      MPI_Test(&req,&flag,&status) ;
      while(!flag)
      {
        callGlobalEventLoop() ;
        MPI_Test(&req,&flag,&status) ;
      }

      timeLine++;
    }


    /*!
     * Get buffers for each connection to the servers. This function blocks until there is enough room
     * in the buffers.
     *
     * \param [in] timeLine time line of the event which will be sent to the servers
     * \param [in] serverList list of ranks of the connected servers
     * \param [in] sizeList size of the message corresponding to each connection
     * \param [out] retBuffers list of buffers that can be used to store an event
    */
    void CLegacyContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers)
    {
      list<int>::const_iterator itServer, itSize;
      list<CClientBuffer*> bufferList;
      map<int,CClientBuffer*>::const_iterator it;
      list<CClientBuffer*>::iterator itBuffer;
      bool areBuffersFree;

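      // Create the buffer for a server lazily, on the first event addressed to it. Buffer creation
      // and window attachment are guarded by a token from the resource manager (lockToken/unlockToken),
      // presumably to serialize this critical section; while waiting for the token the global event
      // loop keeps being serviced.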
      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
      {
        it = buffers.find(*itServer);
        if (it == buffers.end())
        {
          CTokenManager* tokenManager = CXios::getRessourcesManager()->getTokenManager() ;
          size_t token = tokenManager->getToken() ;
          while (!tokenManager->lockToken(token)) callGlobalEventLoop() ;
          newBuffer(*itServer);
          it = buffers.find(*itServer);
          checkAttachWindows(it->second,it->first) ;
          tokenManager->unlockToken(token) ;
        }
        bufferList.push_back(it->second);
      }

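      // Wait until every target buffer has enough free room for its message. To limit the cost of the
      // check, the buffers are re-tested only once at least "latency_" seconds have elapsed since the
      // last unsuccessful attempt; whenever room is missing, the buffers are unlocked and the global
      // event loop is serviced so that pending communications can progress.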
      double lastTimeBuffersNotFree=0. ;
      double time ;
      bool doUnlockBuffers ;
      CTimer::get("Blocking time").resume();
      do
      {
        areBuffersFree = true;
        doUnlockBuffers=false ;
        time=MPI_Wtime() ;
        if (time-lastTimeBuffersNotFree > latency_)
        {
          for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
          {
            areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);
          }
          if (!areBuffersFree)
          {
            lastTimeBuffersNotFree = time ;
            doUnlockBuffers=true ;
          }
        }
        else areBuffersFree = false ;

        if (!areBuffersFree)
        {
          if (doUnlockBuffers) for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer();
          checkBuffers();

          callGlobalEventLoop() ;
        }

      } while (!areBuffersFree);
      CTimer::get("Blocking time").suspend();

      for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
        retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize));
   }

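   // Note on re-entrancy: callGlobalEventLoop() sets the locked_ flag before running the global event
   // loop, so that eventLoop(), which may be called back from inside it, does not re-enter
   // checkBuffers() recursively.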
   void CLegacyContextClient::eventLoop(void)
   {
      if (!locked_) checkBuffers() ;
   }

   void CLegacyContextClient::callGlobalEventLoop(void)
   {
     locked_=true ;
     context_->globalEventLoop() ;
     locked_=false ;
   }
   /*!
   Create a new buffer for the connection to the server with the given rank.
   \param [in] rank rank of the connected server
   */
   void CLegacyContextClient::newBuffer(int rank)
   {
      if (!mapBufferSize_.count(rank))
      {
        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
        mapBufferSize_[rank] = CXios::minBufferSize;
        maxEventSizes[rank] = CXios::minBufferSize;
      }

      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank]);
      if (isGrowableBuffer_) buffer->setGrowableBuffer(1.2) ;
      else buffer->fixBuffer() ;
      // Notify the server
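      // The notification is a 4-word control message written at timeline 0: the context hash, the
      // buffer size, and the two window addresses used by the server side for one-sided access.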
      CBufferOut* bufOut = buffer->getBuffer(0, 4*sizeof(MPI_Aint));
      MPI_Aint sendBuff[4] ;
      sendBuff[0]=hashId_;
      sendBuff[1]=mapBufferSize_[rank];
      sendBuff[2]=buffers[rank]->getWinAddress(0);
      sendBuff[3]=buffers[rank]->getWinAddress(1);
      info(100)<<"CLegacyContextClient::newBuffer : rank "<<rank<<" winAddress[0] "<<buffers[rank]->getWinAddress(0)<<" winAddress[1] "<<buffers[rank]->getWinAddress(1)<<endl;
      bufOut->put(sendBuff, 4);
      buffer->checkBuffer(true);
/*
       // create windows dynamically for one-sided
      if (!isAttachedModeEnabled())
      {
        CTimer::get("create Windows").resume() ;
        MPI_Comm interComm ;
        MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ;
        MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
        CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
        MPI_Comm_free(&interComm) ;
        windows_[rank].resize(2) ;

        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;

        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;

        CTimer::get("create Windows").suspend() ;
      }
      else
      {
        winComm_[rank] = MPI_COMM_NULL ;
        windows_[rank].resize(2) ;
        windows_[rank][0] = MPI_WIN_NULL ;
        windows_[rank][1] = MPI_WIN_NULL ;
      }
      buffer->attachWindows(windows_[rank]) ;
      if (!isAttachedModeEnabled()) MPI_Barrier(winComm_[rank]) ;
  */
   }

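   /*!
    * Attach the one-sided windows to a buffer if this has not been done yet. Outside attached mode,
    * a dedicated communicator is built between this client and the server of the given rank (through
    * the merged inter-communicator) and two dynamic RMA windows are created on it; in attached mode,
    * null windows are attached instead.
    * \param [in] buffer client buffer to attach the windows to
    * \param [in] rank rank of the connected server
    */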
   void CLegacyContextClient::checkAttachWindows(CClientBuffer* buffer, int rank)
   {
      if (!buffer->isAttachedWindows())
      {
        // create windows dynamically for one-sided
        if (!isAttachedModeEnabled())
        {
          CTimer::get("create Windows").resume() ;
          MPI_Comm interComm ;
          MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ;
          MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
          CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
          MPI_Comm_free(&interComm) ;
          windows_[rank].resize(2) ;

          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
          CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;

          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
          CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;

          CTimer::get("create Windows").suspend() ;
          buffer->attachWindows(windows_[rank]) ;
          MPI_Barrier(winComm_[rank]) ;
        }
        else
        {
          winComm_[rank] = MPI_COMM_NULL ;
          windows_[rank].resize(2) ;
          windows_[rank][0] = MPI_WIN_NULL ;
          windows_[rank][1] = MPI_WIN_NULL ;
          buffer->attachWindows(windows_[rank]) ;
        }

      }
    }



   /*!
   Check the state of all buffers.
   \return true while messages are still pending on the buffers, false once they are ready
   */
   bool CLegacyContextClient::checkBuffers(void)
   {
      map<int,CClientBuffer*>::iterator itBuff;
      bool pending = false;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
        pending |= itBuff->second->checkBuffer(!pureOneSided);
      return pending;
   }

   //! Release all buffers
   void CLegacyContextClient::releaseBuffers()
   {
      map<int,CClientBuffer*>::iterator itBuff;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      {
         delete itBuff->second;
      }
      buffers.clear();

// it is not yet clear at which point the windows can safely be released

      //if (!isAttachedModeEnabled())
      //{
      //  for(auto& it : winComm_)
      //  {
      //    int rank = it.first ;
      //    MPI_Win_free(&windows_[rank][0]);
      //    MPI_Win_free(&windows_[rank][1]);
      //    MPI_Comm_free(&winComm_[rank]) ;
      //  }
      //}
   }


  /*!
   Lock the buffers for one-sided communications
   \param [in] ranks list of ranks of the servers to which the client is connected
   */
   void CLegacyContextClient::lockBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->lockBuffer();
   }

  /*!
   Unlock the buffers for one-sided communications
   \param [in] ranks list of ranks of the servers to which the client is connected
   */
   void CLegacyContextClient::unlockBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->unlockBuffer();
   }

   /*!
   Check the state of the buffers corresponding to a set of connections.
   \param [in] ranks list of ranks of the servers to which the client is connected
   \return true while messages are still pending on the buffers, false once they are ready
   */
   bool CLegacyContextClient::checkBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(!pureOneSided);
      return pending;
   }


   /*!
    * Set the buffer size for each connection. Warning: this function is collective.
    *
    * \param [in] mapSize maps the rank of each connected server to the size of the corresponding buffer
   */
   void CLegacyContextClient::setBufferSize(const std::map<int,StdSize>& mapSize)
   {
     setFixedBuffer() ;
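     // The requested size is scaled by bufferSizeFactor (plus a 1% margin) and clamped to the
     // [minBufferSize, maxBufferSize] interval configured in CXios.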
     for(auto& it : mapSize)
     {
      size_t size=std::max(CXios::minBufferSize*1.0,std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0)) ;
      mapBufferSize_[it.first]=size ;
      if (buffers.count(it.first)>0) buffers[it.first]->fixBufferSize(size);
     }
   }

   /*!
   * Finalize the context client and do some reporting. The function is non-blocking.
   */
  void CLegacyContextClient::finalize(void)
  {
    map<int,CClientBuffer*>::iterator itBuff;
    std::list<int>::iterator ItServerLeader;

    bool stop = false;

    int* nbServerConnectionLocal  = new int[serverSize] ;
    int* nbServerConnectionGlobal  = new int[serverSize] ;
    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ;
    for (ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ;

    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm);

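    // After the reduction, nbServerConnectionGlobal[i] holds the total number of clients connected to
    // server i; this count is sent along with the finalize event, presumably so that each server knows
    // how many finalize notifications to expect.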
    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
    CMessage msg;

    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ;
    sendEvent(event);

    delete[] nbServerConnectionLocal ;
    delete[] nbServerConnectionGlobal ;


    CTimer::get("Blocking time").resume();
    checkBuffers();
    CTimer::get("Blocking time").suspend();

    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                          iteMap = mapBufferSize_.end(), itMap;

    StdSize totalBuf = 0;
    for (itMap = itbMap; itMap != iteMap; ++itMap)
    {
      report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl
                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
      totalBuf += itMap->second;
    }
    report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;

  }


  /*!
   * Check whether any buffer still holds a pending MPI request.
   */
  bool CLegacyContextClient::havePendingRequests(void)
  {
    bool pending = false;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      pending |= itBuff->second->hasPendingRequest();
    return pending;
  }

  bool CLegacyContextClient::havePendingRequests(list<int>& ranks)
  {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->hasPendingRequest();
      return pending;
  }

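  /*!
   * Check whether every buffer has been notified that the corresponding server has finalized
   * the context; always true in attached mode.
   */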
  bool CLegacyContextClient::isNotifiedFinalized(void)
  {
    if (isAttachedModeEnabled()) return true ;

    bool finalized = true;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      finalized &= itBuff->second->isNotifiedFinalized();
    return finalized;
  }

}