source: XIOS3/trunk/src/transport/one_sided_context_client.cpp @ 2547

Last change on this file since 2547 was 2547, checked in by ymipsl, 9 months ago

Major update:

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead (see the sketch below)
  • Introduction of multithreading on the server side, based on C++ threads, to handle deadlocks more efficiently (similar to the coroutines planned for a future C++ standard)
  • Suppression of the old "attached mode", replaced by online writer and reader filters

YM
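
The window_dynamic locking mentioned above follows, in outline, the standard MPI passive-target pattern: lock the target rank's window, transfer, unlock. A minimal sketch of that pattern, for orientation only (the function and variable names below are hypothetical and are not taken from window_dynamic):

  #include <mpi.h>

  // Expose a buffer through an MPI window (collective over comm), then write into the
  // target rank's memory inside a passive-target lock/put/unlock epoch.
  // Hypothetical sketch: names and sizes are illustrative; count must not exceed 1024.
  void put_sketch(MPI_Comm comm, int targetRank, double* sendBuf, int count)
  {
    double winBuf[1024] ;                                    // memory exposed to remote puts
    MPI_Win win ;
    MPI_Win_create(winBuf, sizeof(winBuf), sizeof(double), MPI_INFO_NULL, comm, &win) ;

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, targetRank, 0, win) ;   // passive-target epoch begins
    MPI_Put(sendBuf, count, MPI_DOUBLE, targetRank, 0, count, MPI_DOUBLE, win) ;
    MPI_Win_unlock(targetRank, win) ;                        // epoch ends: data visible on target

    MPI_Win_free(&win) ;
  }

Holding a lock across several transfers instead of re-acquiring it per message is one way such a scheme can reduce the per-message network overhead mentioned in the log entry.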

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 9.5 KB
#include "xios_spl.hpp"
#include "one_sided_context_client.hpp"
#include "context_server.hpp"
#include "event_client.hpp"
#include "buffer_out.hpp"
#include "type.hpp"
#include "context.hpp"
#include "mpi.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "server.hpp"
#include "services.hpp"
#include <boost/functional/hash.hpp>
#include <random>
#include <chrono>

namespace xios
{
    /*!
    \param [in] parent Pointer to the context on the client side
    \param [in] intraComm_ Communicator of the client group
    \param [in] interComm_ Communicator of the server group
    \param [in] cxtSer Pointer to the context on the server side (only used by the obsolete attached mode)
    */
    COneSidedContextClient::COneSidedContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
     : CContextClient(parent, intraComm_, interComm_, cxtSer),
       mapBufferSize_(), maxBufferedEvents(4)
    {
      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one-sided communication (for testing)

      MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ; // merged communicator spanning clients and servers

      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // single-process communicator used for the one-sided windows
      eventScheduler_ = parent->getEventScheduler() ;
      timeLine = 1;
    }


    /*!
    Send an event to the connected servers, packing it into the one-sided buffers.
    \param [in] event Event sent to server
    */
    void COneSidedContextClient::sendEvent(CEventClient& event)
    {
      list<int> ranks = event.getRanks();

//      ostringstream str ;
//      for(auto& rank : ranks) str<<rank<<" ; " ;
//      info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<"  for ranks : "<<str.str()<<endl ;

      if (CXios::checkEventSync)
      {
        // Consistency check: every client must post the same event (same type, class and
        // timeline) at the same time. Summing the values over clients and dividing by
        // clientSize must give back the local values if all clients agree.
        int typeId, classId, typeId_in, classId_in;
        long long timeLine_out;
        long long timeLine_in( timeLine );
        typeId_in=event.getTypeId() ;
        classId_in=event.getClassId() ;
//        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
        MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
        if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine)
        {
           ERROR("void COneSidedContextClient::sendEvent(CEventClient& event)",
               << "Events are not coherent between clients for timeline = "<<timeLine);
        }

        // Check that every target server is addressed by at least one client.
        vector<int> servers(serverSize,0) ;
        auto ranks=event.getRanks() ;
        for(auto& rank : ranks) servers[rank]=1 ;
        MPI_Allreduce(MPI_IN_PLACE, servers.data(), serverSize,MPI_INT,MPI_SUM,intraComm) ;
        ostringstream osstr ;
        for(int i=0;i<serverSize;i++)  if (servers[i]==0) osstr<<i<<" , " ;
        if (!osstr.str().empty())
        {
          ERROR("void COneSidedContextClient::sendEvent(CEventClient& event)",
                 <<" Some servers will not receive the message for timeline = "<<timeLine<<endl
                 <<"Servers are : "<<osstr.str()) ;
        }
      }

      event.setFirst() ;
      while(!event.isEmpty())
      {
        int rank=event.getRank() ;
        auto itBuffer=buffers.find(rank) ;
        if (itBuffer==buffers.end())
        {
          newBuffer(rank) ;
          itBuffer=buffers.find(rank) ;
        }
        itBuffer->second->eventLoop() ;
        double time=CTimer::getTime() ;
        bool succeeded = itBuffer->second->writeEvent(timeLine, event) ;
        if (succeeded)
        {
          time=CTimer::getTime()-time ;
          if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").minus(time) ;
        }

        if (succeeded) event.remove() ;
        else event.next() ;
        if (event.isFirst())
        {
          // A full pass over the remaining ranks failed: yield so other contexts can progress.
          if (CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").resume() ;
          yield() ;
        }
      }
      if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").suspend() ;

      synchronize() ;

      timeLine++;
    }


   void COneSidedContextClient::eventLoop(void)
   {
      if (!locked_) checkBuffers() ;
   }

   void COneSidedContextClient::callGlobalEventLoop(void)
   {
     locked_=true ;
     context_->globalEventLoop() ;
     locked_=false ;
   }

   void COneSidedContextClient::yield(void)
   {
     locked_=true ;
     context_->yield() ;
     locked_=false ;
   }
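
   // Re-entrancy guard: locked_ is set while control is handed to the global event loop
   // or yielded, so that eventLoop(), if re-entered from there, skips checkBuffers() and
   // leaves the buffers alone until control comes back.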

   void COneSidedContextClient::synchronize(void)
   {
     if (context_->getServiceType()!=CServicesManager::CLIENT)
     {
       locked_=true ;
       context_->synchronize() ;
       locked_=false ;
     }
   }

   /*!
   Create a new buffer for the connection to the server of the given rank.
   \param [in] rank Rank of the connected server
   */
   void COneSidedContextClient::newBuffer(int rank)
   {
      if (!mapBufferSize_.count(rank))
      {
        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
        mapBufferSize_[rank] = CXios::minBufferSize;
        maxEventSizes[rank] = CXios::minBufferSize;
      }

      COneSidedClientBuffer* buffer = buffers[rank] = new COneSidedClientBuffer(interComm, rank, commSelf_, interCommMerged_, clientSize+rank );
      if (isGrowableBuffer_) buffer->setGrowable(growingFactor_) ;
      else buffer->setFixed(mapBufferSize_[rank]) ;
   }

   /*!
   Check the state of the buffers. A buffer is considered pending when it holds no message.
   \return state of the buffers: pending (true), ready (false)
   */
   bool COneSidedContextClient::checkBuffers(void)
   {
      bool pending = false;
      for (auto itBuff : buffers)
      {
        itBuff.second->eventLoop() ;
        pending |= itBuff.second->isEmpty();
      }
      return pending;
   }

   //! Release all buffers
   void COneSidedContextClient::releaseBuffers()
   {
      for (auto& itBuff : buffers) delete itBuff.second;
      buffers.clear();
   }


   /*!
   Check the state of the buffers corresponding to a set of connections.
   \param [in] ranks List of ranks of the servers the client is connected to
   \return state of the buffers: pending (true), ready (false)
   */
   bool COneSidedContextClient::checkBuffers(list<int>& ranks)
   {
      bool pending = false;
      for (auto& rank : ranks)
      {
        buffers[rank]->eventLoop() ;
        pending |= buffers[rank]->isEmpty() ;
      }
      return pending;
   }

   /*!
    * Set the buffer size for each connection. Warning: this function is collective.
    *
    * \param [in] mapSize maps the rank of each connected server to the size of the corresponding buffer
   */
   void COneSidedContextClient::setBufferSize(const std::map<int,StdSize>& mapSize)
   {
     setFixedBuffer() ;
     for(auto& it : mapSize)
     {
      size_t size=std::max(CXios::minBufferSize*1.0,std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0)) * 8 ; // *8: sizes are counted in doubles (8 bytes each)
      mapBufferSize_[it.first]=size ;
      if (buffers.count(it.first)>0) buffers[it.first]->setFixed(size);
     }
   }
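
   // Worked example (illustrative numbers, not taken from the code): with it.second = 100000,
   // bufferSizeFactor = 1.0, minBufferSize = 10000 and maxBufferSize = 1.e9, the formula gives
   //   size = max(10000, min(100000*1.0*1.01, 1.e9)) * 8 = 101000 * 8 = 808000 bytes,
   // i.e. the requested number of doubles plus a 1% margin, converted to bytes.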


   /*!
   * Finalize the context client and report buffer memory usage. This function is non-blocking.
   */
  void COneSidedContextClient::finalize(void)
  {
    // Count how many clients are connected to each server, so that every server
    // knows how many finalize events it has to collect.
    int* nbServerConnectionLocal  = new int[serverSize] ;
    int* nbServerConnectionGlobal  = new int[serverSize] ;
    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ;
    for (auto itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ;
    for (auto ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ;

    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm);

    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
    CMessage msg;

    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ;
    sendEvent(event);

    delete[] nbServerConnectionLocal ;
    delete[] nbServerConnectionGlobal ;

    CTimer::get("Blocking time").resume();
    checkBuffers();
    CTimer::get("Blocking time").suspend();

    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                          iteMap = mapBufferSize_.end(), itMap;

    StdSize totalBuf = 0;
    for (itMap = itbMap; itMap != iteMap; ++itMap)
    {
      report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl
                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
      totalBuf += itMap->second;
    }
    report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffers " << totalBuf << " bytes" << endl;
  }


  /*!
  \return true if at least one buffer is still pending
  */
  bool COneSidedContextClient::havePendingRequests(void)
  {
    return checkBuffers();
  }

  bool COneSidedContextClient::havePendingRequests(list<int>& ranks)
  {
    return checkBuffers(ranks) ;
  }

  //! \return true once every buffer has been notified of the servers' finalization
  bool COneSidedContextClient::isNotifiedFinalized(void)
  {
    bool finalized = true;
    for (auto& it : buffers ) finalized &= it.second->isNotifiedFinalized();
    return finalized;
  }

}
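
Usage note: finalize() is non-blocking, so a caller is expected to keep driving the event loop until every buffer reports the servers' acknowledgement, along these lines (hypothetical caller code, not part of this file):

  client->finalize() ;                                            // post the CONTEXT_FINALIZE event
  while (!client->isNotifiedFinalized()) client->eventLoop() ;    // progress until all servers acknowledge
  client->releaseBuffers() ;                                      // now safe to free the buffers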