source: XIOS3/trunk/src/transport/one_sided_context_client.cpp @ 2399

Last change on this file since 2399 was 2399, checked in by ymipsl, 21 months ago

-Fix performance issue in one_sided protocol

  • better timer instrumentation of the protocol

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 9.5 KB
Line 
1#include "xios_spl.hpp"
2#include "one_sided_context_client.hpp"
3#include "context_server.hpp"
4#include "event_client.hpp"
5#include "buffer_out.hpp"
6#include "type.hpp"
7#include "event_client.hpp"
8#include "context.hpp"
9#include "mpi.hpp"
10#include "timer.hpp"
11#include "cxios.hpp"
12#include "server.hpp"
13#include "services.hpp"
14#include <boost/functional/hash.hpp>
15#include <random>
16#include <chrono>
17
18namespace xios
19{
    /*!
    Construct the one-sided-protocol client attached to a context.
    \param [in] parent Pointer to context on client side
    \param [in] intraComm_ communicator of group client
    \param [in] interComm_ communicator of group server
    \param [in] cxtSer Pointer to context of server side. (It is only used in case of attached mode).
    */
    COneSidedContextClient::COneSidedContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
     : CContextClient(parent, intraComm_, interComm_, cxtSer),
       mapBufferSize_(), maxBufferedEvents(4)
    {
     
      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
      if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode

      // Merge the client/server intercommunicator into a single intracommunicator,
      // later used to target server ranks through the one-sided windows.
      // Collective call — skipped in attached mode (client and server share a process).
      if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ;
     
      // Per-rank self communicator, needed to create the MPI windows.
      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // for windows

      // Event timeline starts at 1 and is incremented after each sendEvent().
      timeLine = 1;
    }
40
41
    /*!
    Send an event to the server(s) it targets, retrying until every per-rank part
    of the event has been written into the corresponding connection buffer.
    In case of attached mode, the current context must be reset to context for client.
    \param [in] event Event sent to server
    */
    void COneSidedContextClient::sendEvent(CEventClient& event)
    {
      list<int> ranks = event.getRanks();
 
//      ostringstream str ;
//      for(auto& rank : ranks) str<<rank<<" ; " ;
//      info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<"  for ranks : "<<str.str()<<endl ;

      // Optional collective sanity check: all clients must be sending the same
      // event (same timeline / type / class) — detected by comparing the
      // all-reduced sums against clientSize * local value.
      if (CXios::checkEventSync)
      {
        int typeId, classId, typeId_in, classId_in;
        long long timeLine_out;
        long long timeLine_in( timeLine );
        typeId_in=event.getTypeId() ;
        classId_in=event.getClassId() ;
//        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
        MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ; 
        MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
        if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine)
        {
           ERROR("void COneSidedContextClient::sendEvent(CEventClient& event)",
               << "Event are not coherent between client for timeline = "<<timeLine);
        }
       
        // Second check: every server must be targeted by at least one client;
        // servers[i]==0 after the all-reduce means nobody sends to server i.
        vector<int> servers(serverSize,0) ;
        auto ranks=event.getRanks() ;
        for(auto& rank : ranks) servers[rank]=1 ;
        MPI_Allreduce(MPI_IN_PLACE, servers.data(), serverSize,MPI_INT,MPI_SUM,intraComm) ;
        ostringstream osstr ;
        for(int i=0;i<serverSize;i++)  if (servers[i]==0) osstr<<i<<" , " ;
        if (!osstr.str().empty())
        {
          ERROR("void COneSidedContextClient::sendEvent(CEventClient& event)",
                 <<" Some servers will not receive the message for timeline = "<<timeLine<<endl
                 <<"Servers are : "<<osstr.str()) ;
        }


      }
     
      // Round-robin over the remaining per-rank parts of the event until all
      // have been accepted by their connection buffer.
      event.setFirst() ;
      while(!event.isEmpty())
      {
        int rank=event.getRank() ;
        auto itBuffer=buffers.find(rank) ;
        if (itBuffer==buffers.end()) 
        { 
          // Lazily create the connection buffer on first use of this server rank.
          newBuffer(rank) ;
          itBuffer=buffers.find(rank) ;
        }
        itBuffer->second->eventLoop() ;
        double time=CTimer::getTime() ;
        bool succed = itBuffer->second->writeEvent(timeLine, event)  ;
        // Timer bookkeeping: a successful write is useful work, so subtract its
        // duration from the "Blocking time" timer if that timer is running.
        if (succed) 
        {
          time=CTimer::getTime()-time ;
          if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").minus(time) ;
        }

        if (succed) event.remove() ;
        else event.next() ;
        // Completed one pass over the remaining parts: count further waiting as
        // blocking time and let the global event loop make progress.
        if (event.isFirst())
        {
          if (CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").resume() ;
          callGlobalEventLoop() ;
        } 
      }
      if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").suspend() ;

      if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
      {
        // In attached mode, drain the buffers then hand control to the server
        // side by (un)scheduling this context in the daemons manager.
        while (checkBuffers(ranks)) callGlobalEventLoop() ;
     
        CXios::getDaemonsManager()->scheduleContext(hashId_) ;
        while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) callGlobalEventLoop() ;
      }
     
      timeLine++;
    }
126
127
128   void COneSidedContextClient::eventLoop(void)
129   {
130      if (!locked_) checkBuffers() ;
131   }
132
133   void COneSidedContextClient::callGlobalEventLoop(void)
134   {
135     locked_=true ;
136     context_->globalEventLoop() ;
137     locked_=false ;
138   }
139   /*!
140   Make a new buffer for a certain connection to server with specific rank
141   \param [in] rank rank of connected server
142   */
143   void COneSidedContextClient::newBuffer(int rank)
144   {
145      if (!mapBufferSize_.count(rank))
146      {
147        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
148        mapBufferSize_[rank] = CXios::minBufferSize;
149        maxEventSizes[rank] = CXios::minBufferSize;
150      }
151
152      COneSidedClientBuffer* buffer = buffers[rank] = new COneSidedClientBuffer(interComm, rank, commSelf_, interCommMerged_, clientSize+rank );
153      if (isGrowableBuffer_) { buffer->setGrowable(growingFactor_) ; }
154      else buffer->setFixed(mapBufferSize_[rank]) ;
155 
156   }
157
158   /*!
159   Verify state of buffers. Buffer is under pending state if there is no message on it
160   \return state of buffers, pending(true), ready(false)
161   */
162   bool COneSidedContextClient::checkBuffers(void)
163   {
164      bool pending = false;
165      for (auto itBuff : buffers)
166      {
167        itBuff.second->eventLoop() ;
168        pending |= itBuff.second->isEmpty();
169      }
170      return pending;
171   }
172
173   //! Release all buffers
174   void COneSidedContextClient::releaseBuffers()
175   {
176      for (auto& itBuff : buffers) delete itBuff.second;
177      buffers.clear();
178   }
179
180
181   /*!
182   Verify state of buffers corresponding to a connection
183   \param [in] ranks list rank of server to which client connects to
184   \return state of buffers, pending(true), ready(false)
185   */
186   bool COneSidedContextClient::checkBuffers(list<int>& ranks)
187   {
188      bool pending = false;
189      for (auto& rank : ranks) 
190      {
191        buffers[rank]->eventLoop() ;
192        pending |= buffers[rank]->isEmpty() ;
193      }
194      return pending;
195   }
196
197   /*!
198    * Set the buffer size for each connection. Warning: This function is collective.
199    *
200    * \param [in] mapSize maps the rank of the connected servers to the size of the correspoinding buffer
201    * \param [in] maxEventSize maps the rank of the connected servers to the size of the biggest event
202   */
203   void COneSidedContextClient::setBufferSize(const std::map<int,StdSize>& mapSize)
204   {
205     setFixedBuffer() ;
206     for(auto& it : mapSize)
207     {
208      size_t size=std::max(CXios::minBufferSize*1.0,std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0)) * 8 ; // double
209      mapBufferSize_[it.first]=size ;
210      if (buffers.count(it.first)>0) buffers[it.first]->setFixed(size);
211     }
212   }
213
214
215   /*!
216   * Finalize context client and do some reports. Function is non-blocking.
217   */
218  void COneSidedContextClient::finalize(void)
219  {
220    bool stop = false;
221
222    int* nbServerConnectionLocal  = new int[serverSize] ;
223    int* nbServerConnectionGlobal  = new int[serverSize] ;
224    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ;
225    for (auto itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ;
226    for (auto ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ;
227   
228    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm);
229   
230    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
231    CMessage msg;
232
233    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ;
234    sendEvent(event);
235
236    delete[] nbServerConnectionLocal ;
237    delete[] nbServerConnectionGlobal ;
238
239
240    CTimer::get("Blocking time").resume();
241    checkBuffers();
242    CTimer::get("Blocking time").suspend();
243
244    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
245                                          iteMap = mapBufferSize_.end(), itMap;
246
247    StdSize totalBuf = 0;
248    for (itMap = itbMap; itMap != iteMap; ++itMap)
249    {
250      report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl
251                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
252      totalBuf += itMap->second;
253    }
254    report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;
255
256  }
257
258
259  /*!
260  */
261  bool COneSidedContextClient::havePendingRequests(void)
262  {
263    return checkBuffers();
264  }
265 
266  bool COneSidedContextClient::havePendingRequests(list<int>& ranks)
267  {
268    return checkBuffers(ranks) ;
269  }
270
271  bool COneSidedContextClient::isNotifiedFinalized(void)
272  {
273    if (isAttachedModeEnabled()) return true ;
274
275    bool finalized = true;
276    for (auto& it : buffers ) finalized &= it.second->isNotifiedFinalized();
277    return finalized;
278  }
279
280}
Note: See TracBrowser for help on using the repository browser.