- Timestamp:
- 04/20/21 09:49:47 (3 years ago)
- Location:
- XIOS/dev/dev_ym/XIOS_COUPLING/src
- Files:
-
- 1 added
- 21 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.cpp
r1757 r2130 7 7 #include "mpi.hpp" 8 8 #include "tracer.hpp" 9 #include "timeline_events.hpp" 9 10 10 11 namespace xios … … 200 201 201 202 lockBuffer(); 203 count=*bufferCount[current] ; 204 205 if (resizingBufferStep_ > 0 ) return false ; 206 202 207 if (size > bufferSize) 203 ERROR("bool CClientBuffer::isBufferFree(StdSize size)", 204 << "The requested size (" << size << " bytes) is too big to fit the buffer (" << bufferSize << " bytes), please increase the client buffer size." << endl); 208 { 209 // ERROR("bool CClientBuffer::isBufferFree(StdSize size)", 210 // << "The requested size (" << size << " bytes) is too big to fit the buffer (" << bufferSize << " bytes), please increase the client buffer size." << endl); 211 resizingBufferStep_=1 ; 212 newBufferSize_=size ; 213 return false ; 214 } 205 215 206 216 if (size > maxEventSize) … … 214 224 if (size > maxRequestSize) maxRequestSize = size; 215 225 } 216 217 count=*bufferCount[current] ; 218 return (size <= remain()); 226 227 if (size > remain()) 228 { 229 if (isGrowableBuffer_) 230 { 231 resizingBufferStep_ = 1 ; 232 newBufferSize_ = (count+size)*growFactor_ ; 233 } 234 return false ; 235 } 236 else return true ; 219 237 } 220 238 … … 276 294 if (!pending) 277 295 { 278 if (!send) return false ; 296 if (!send && resizingBufferStep_==0 ) return false ; 297 279 298 if (count > 0) 280 299 { … … 284 303 { 285 304 MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 305 if (resizingBufferStep_==3) resizingBufferStep_=0 ; 286 306 pending = true; 287 307 // *control[current]=0 ; … … 295 315 count = 0; 296 316 } 297 else unlockBuffer() ; 317 else 318 { 319 unlockBuffer() ; 320 } 298 321 } 322 else 323 { 324 if (resizingBufferStep_==2) resizeBuffer(newBufferSize_) ; 325 if (resizingBufferStep_==1) resizeBufferNotify() ; 326 } 299 327 } 300 328 301 329 return pending; 330 } 331 332 void CClientBuffer::resizeBufferNotify(void) 333 { 334 // notify server of changing buffers size 335 
lockBuffer() ; 336 int size=sizeof(int)+sizeof(size_t) ; 337 CBufferOut* bufOut = this->getBuffer(timelineEventNotifyChangeBufferSize, size); 338 bufOut->put(size); 339 bufOut->put(timelineEventNotifyChangeBufferSize); 340 resizingBufferStep_ = 2 ; 341 unlockBuffer() ; 342 } 343 344 void CClientBuffer::resizeBuffer(size_t newSize) 345 { 346 if (hasWindows) 347 { 348 MPI_Win_detach(windows_[0], bufferHeader[0]) ; 349 MPI_Win_detach(windows_[1], bufferHeader[1]) ; 350 } 351 MPI_Free_mem(bufferHeader[0]) ; 352 MPI_Free_mem(bufferHeader[1]) ; 353 354 bufferSize=newSize ; 355 MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; 356 MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; 357 buffer[0] = bufferHeader[0]+headerSize ; 358 buffer[1] = bufferHeader[1]+headerSize ; 359 firstTimeLine[0]=(size_t*)bufferHeader[0] ; 360 firstTimeLine[1]=(size_t*)bufferHeader[1] ; 361 bufferCount[0]=(size_t*)bufferHeader[0] +1 ; 362 bufferCount[1]=(size_t*)bufferHeader[1] +1 ; 363 control[0]=(size_t*)bufferHeader[0] +2 ; 364 control[1]=(size_t*)bufferHeader[1] +2 ; 365 finalize[0]=(size_t*)bufferHeader[0] +3 ; 366 finalize[1]=(size_t*)bufferHeader[1] +3 ; 367 368 *firstTimeLine[0]=0 ; 369 *firstTimeLine[1]=0 ; 370 *bufferCount[0]=0 ; 371 *bufferCount[1]=0 ; 372 *control[0]=0 ; 373 *control[1]=0 ; 374 *finalize[0]=0 ; 375 *finalize[1]=0 ; 376 winState[0]=false ; 377 winState[1]=false ; 378 current=0 ; 379 380 if (hasWindows) 381 { 382 383 MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize) ; 384 MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize) ; 385 386 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 387 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 388 389 MPI_Win_unlock(clientRank_, windows_[1]) ; 390 MPI_Win_unlock(clientRank_, windows_[0]) ; 391 } 392 393 lockBuffer() ; 394 395 int size=sizeof(int)+2*sizeof(size_t)+2*sizeof(MPI_AINT) ; 396 CBufferOut* bufOut = 
this->getBuffer(timelineEventChangeBufferSize, size); 397 bufOut->put(size); 398 bufOut->put(timelineEventChangeBufferSize); 399 bufOut->put(newBufferSize_); 400 bufOut->put(this->getWinAddress(0)); 401 bufOut->put(this->getWinAddress(1)); 402 403 resizingBufferStep_=3; 404 unlockBuffer() ; 302 405 } 303 406 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.hpp
r1757 r2130 29 29 void infoBuffer(void) ; 30 30 bool isNotifiedFinalized(void) ; 31 void setGrowableBuffer(double growFactor) { growFactor_=growFactor ; isGrowableBuffer_=true ;} 32 void fixBufferSize(size_t bufferSize) { newBufferSize_=bufferSize ; isGrowableBuffer_=false ; resizingBufferStep_=1 ;} 33 void fixBuffer(void) { isGrowableBuffer_=false ;} 31 34 private: 35 void resizeBuffer(size_t newSize) ; 36 void resizeBufferNotify(void) ; 37 38 32 39 char* buffer[2]; 33 40 char* bufferHeader[2]; … … 38 45 bool winState[2] ; 39 46 int current; 47 48 double growFactor_=1.2 ; 49 bool isGrowableBuffer_=true ; 40 50 51 int resizingBufferStep_ = 0 ; 52 size_t newBufferSize_ ; 41 53 StdSize count; 42 54 StdSize maxEventSize; 43 constStdSize bufferSize;55 StdSize bufferSize; 44 56 const StdSize estimatedMaxEventSize; 45 57 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_server.cpp
r1757 r2130 15 15 end = size; 16 16 used=0 ; 17 buffer = new char[size]; // use MPI_ALLOC_MEM later?17 MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer) ; 18 18 currentWindows=1 ; 19 19 if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ; … … 22 22 CServerBuffer::~CServerBuffer() 23 23 { 24 delete [] buffer;24 MPI_Free_mem(buffer) ; 25 25 } 26 26 … … 222 222 bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count) 223 223 { 224 if (!hasWindows ) return false ;224 if (!hasWindows || resizingBuffer_) return false ; 225 225 226 226 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_server.hpp
r1757 r2130 26 26 void unlockBuffer(void) ; 27 27 void notifyClientFinalize(void) ; 28 void notifyBufferResizing(void) { resizingBuffer_=true ;} 28 29 private: 29 30 char* buffer; … … 35 36 std::vector<MPI_Win> windows_ ; 36 37 std::vector<MPI_Aint> winAddress_ ; 37 38 bool resizingBuffer_ = false ; 38 39 int currentWindows ; 39 40 bool hasWindows ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/context_client.cpp
r1853 r2130 12 12 #include "cxios.hpp" 13 13 #include "server.hpp" 14 #include "services.hpp" 15 #include <boost/functional/hash.hpp> 16 #include <random> 17 #include <chrono> 14 18 15 19 namespace xios … … 25 29 { 26 30 27 context = parent;31 context_ = parent; 28 32 intraComm = intraComm_; 29 33 interComm = interComm_; … … 66 70 MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ; 67 71 72 auto time=chrono::system_clock::now().time_since_epoch().count() ; 73 std::default_random_engine rd(time); // not reproducible from a run to another 74 std::uniform_int_distribution<size_t> dist; 75 hashId_=dist(rd) ; 76 MPI_Bcast(&hashId_,1,MPI_SIZE_T,0,intraComm) ; // Bcast to all server of the context 77 68 78 timeLine = 1; 69 79 } … … 124 134 { 125 135 list<int> ranks = event.getRanks(); 126 info(100)<<"Event "<<timeLine<<" of context "<<context ->getId()<<endl ;136 info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<endl ; 127 137 if (CXios::checkEventSync) 128 138 { … … 154 164 155 165 unlockBuffers(ranks) ; 156 info(100)<<"Event "<<timeLine<<" of context "<<context ->getId()<<" sent"<<endl ;166 info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<" sent"<<endl ; 157 167 158 168 checkBuffers(ranks); … … 161 171 if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 162 172 { 163 waitEvent(ranks); 164 CContext::setCurrent(context->getId()); 173 while (checkBuffers(ranks)) context_->globalEventLoop() ; 174 175 CXios::getDaemonsManager()->scheduleContext(hashId_) ; 176 while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) context_->globalEventLoop() ; 165 177 } 166 178 … … 177 189 while (checkBuffers(ranks)) 178 190 { 179 CXios::getDaemonsManager()->eventLoop() ;191 context_->eventLoop() ; 180 192 } 181 193 … … 256 268 for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer(); 257 269 checkBuffers(); 258 270 /* 259 271 context->server->listen(); 260 272 … … 265 
277 context->globalEventLoop() ; 266 278 } 267 268 } 279 */ 280 context_->globalEventLoop() ; 281 } 282 269 283 } while (!areBuffersFree && !nonBlocking); 270 284 CTimer::get("Blocking time").suspend(); … … 295 309 296 310 CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, Wins, clientRank, rank, mapBufferSize_[rank], maxEventSizes[rank]); 311 if (isGrowableBuffer_) buffer->setGrowableBuffer(1.2) ; 312 else buffer->fixBuffer() ; 297 313 // Notify the server 298 CBufferOut* bufOut = buffer->getBuffer(0, 3*sizeof(MPI_Aint)); 299 MPI_Aint sendBuff[3] ; 300 sendBuff[0]=mapBufferSize_[rank]; // Stupid C++ 301 sendBuff[1]=buffers[rank]->getWinAddress(0); 302 sendBuff[2]=buffers[rank]->getWinAddress(1); 314 CBufferOut* bufOut = buffer->getBuffer(0, 4*sizeof(MPI_Aint)); 315 MPI_Aint sendBuff[4] ; 316 sendBuff[0]=hashId_; 317 sendBuff[1]=mapBufferSize_[rank]; 318 sendBuff[2]=buffers[rank]->getWinAddress(0); 319 sendBuff[3]=buffers[rank]->getWinAddress(1); 303 320 info(100)<<"CContextClient::newBuffer : rank "<<rank<<" winAdress[0] "<<buffers[rank]->getWinAddress(0)<<" winAdress[1] "<<buffers[rank]->getWinAddress(1)<<endl; 304 bufOut->put(sendBuff, 3); // Stupid C++321 bufOut->put(sendBuff, 4); 305 322 buffer->checkBuffer(true); 306 323 … … 383 400 * \param [in] maxEventSize maps the rank of the connected servers to the size of the biggest event 384 401 */ 385 void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize) 386 { 387 mapBufferSize_ = mapSize; 388 maxEventSizes = maxEventSize; 402 void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize) 403 { 404 for(auto& it : mapSize) {buffers[it.first]->fixBufferSize(std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0));} 389 405 } 390 406 … … 463 479 for (itMap = itbMap; itMap != iteMap; ++itMap) 464 480 { 465 report(10) << " Memory report : Context <" << context ->getId() << "> : client side : memory used for buffer 
of each connection to server" << endl481 report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl 466 482 << " +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl; 467 483 totalBuf += itMap->second; 468 484 } 469 report(0) << " Memory report : Context <" << context ->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;485 report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; 470 486 471 487 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/context_client.hpp
r1918 r2130 63 63 void finalize(void); 64 64 65 void setBufferSize(const std::map<int,StdSize>& mapSize , const std::map<int,StdSize>& maxEventSize);65 void setBufferSize(const std::map<int,StdSize>& mapSize); 66 66 67 67 int getRemoteSize(void) {return serverSize;} … … 75 75 /*! get the associated server (dual channel client/server) */ 76 76 CContextServer* getAssociatedServer(void) { return associatedServer_;} 77 77 void setGrowableBuffer(void) { isGrowableBuffer_=true;} 78 void setFixedBuffer(void) { isGrowableBuffer_=false;} 78 79 public: 79 CContext* context ; //!< Context for client80 CContext* context_; //!< Context for client 80 81 81 82 size_t timeLine; //!< Timeline of each event … … 98 99 99 100 bool pureOneSided ; //!< if true, client will communicate with servers only through one sided communication. Otherwise the hybrid mode P2P /One sided is used. 101 102 size_t hashId_ ; //!< hash id on the context client that will be used for context server to identify the remote calling context client. 100 103 101 104 private: … … 122 125 bool isAttached_ ; 123 126 CContextServer* associatedServer_ ; //!< The server associated to the pair client/server 124 127 bool isGrowableBuffer_ = true ; 125 128 }; 126 129 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp
r2123 r2130 20 20 #include "services.hpp" 21 21 #include "contexts_manager.hpp" 22 #include "timeline_events.hpp" 22 23 23 24 #include <boost/functional/hash.hpp> … … 188 189 if (it==buffers.end()) // Receive the buffer size and allocate the buffer 189 190 { 190 MPI_Aint recvBuff[3] ; 191 MPI_Recv(recvBuff, 3, MPI_AINT, rank, 20, interComm, &status); 192 StdSize buffSize = recvBuff[0]; 191 MPI_Aint recvBuff[4] ; 192 MPI_Recv(recvBuff, 4, MPI_AINT, rank, 20, interComm, &status); 193 remoteHashId_ = recvBuff[0] ; 194 StdSize buffSize = recvBuff[1]; 193 195 vector<MPI_Aint> winAdress(2) ; 194 winAdress[0]=recvBuff[ 1] ; winAdress[1]=recvBuff[2] ;196 winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ; 195 197 mapBufferSize_.insert(std::make_pair(rank, buffSize)); 196 198 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first; … … 290 292 CBufferIn newBuffer(startBuffer,buffer.remain()); 291 293 newBuffer>>size>>timeLine; 292 it=events.find(timeLine); 293 if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first; 294 it->second->push(rank,buffers[rank],startBuffer,size); 295 294 295 if (timeLine==timelineEventNotifyChangeBufferSize) 296 { 297 buffers[rank]->notifyBufferResizing() ; 298 buffers[rank]->updateCurrentWindows() ; 299 } 300 else if (timeLine==timelineEventChangeBufferSize) 301 { 302 size_t newSize ; 303 vector<MPI_Aint> winAdress(2) ; 304 newBuffer>>newSize>>winAdress[0]>>winAdress[1] ; 305 buffers.erase(rank) ; 306 buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, newSize))); 307 } 308 else 309 { 310 it=events.find(timeLine); 311 if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first; 312 it->second->push(rank,buffers[rank],startBuffer,size); 313 if (timeLine>0) lastTimeLine[rank]=timeLine ; 314 } 296 315 buffer.advance(size); 297 316 count=buffer.remain(); 298 317 } 299 
300 if (timeLine>0) lastTimeLine[rank]=timeLine ;301 318 302 319 CTimer::get("Process request").suspend(); … … 310 327 // if (context->isProcessingEvent()) return ; 311 328 if (isProcessingEvent_) return ; 329 if (isAttachedModeEnabled()) 330 if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ; 312 331 313 332 it=events.find(currentTimeLine); … … 356 375 currentTimeLine++; 357 376 scheduled = false; 377 if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ; 358 378 } 359 379 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.hpp
r1853 r2130 72 72 bool isProcessingEvent_ ; 73 73 CContextClient* associatedClient_ ; 74 size_t remoteHashId_; //!< the hash id of the calling context client 74 75 } ; 75 76 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/distribution/grid_scatterer_connector.hpp
r1934 r2130 42 42 } 43 43 44 44 const map<int,int>& getTransferedDataSize(void) {return dstSize_;} 45 45 46 template<typename T, int N> 46 47 void transfer(const CArray<T,N>& input, map<int, CArray<T,1>>& output) -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_to_server_store_filter.cpp
r2022 r2130 28 28 CTimer::get("Field : send data").suspend(); 29 29 } 30 31 CContextClient* CClientToServerStoreFilter::getTransferedDataSize(map<int,int>& size) 32 { 33 size = field_->getSentGrid()->getClientToServerConnector(client_)->getTransferedDataSize() ; 34 return client_ ; 35 } 30 36 31 37 bool CClientToServerStoreFilter::mustAutoTrigger() const -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_to_server_store_filter.hpp
r1935 r2130 23 23 */ 24 24 CClientToServerStoreFilter(CGarbageCollector& gc, CField* field, CContextClient* client); 25 /*! 26 * Get the size of data transfered by call. Needed for context client buffer size evaluation 27 * 28 * \param size : map returning the size for each server rank 29 * \return the associated context client 30 */ 31 CContextClient* getTransferedDataSize(map<int,int>& size) ; 25 32 26 33 /*! -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/server_to_client_store_filter.cpp
r1934 r2130 44 44 } 45 45 46 CContextClient* CServerToClientStoreFilter::getTransferedDataSize(map<int,int>& size) 47 { 48 size = grid_->getServerToClientConnector()->getTransferedDataSize() ; 49 return client_ ; 50 } 46 51 47 52 bool CServerToClientStoreFilter::mustAutoTrigger() const -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/server_to_client_store_filter.hpp
r1934 r2130 24 24 */ 25 25 CServerToClientStoreFilter(CGarbageCollector& gc, CField* field, CContextClient* client); 26 27 /*! 28 * Get the size of data transfered by call. Needed for context client buffer size evaluation 29 * 30 * \param size : map returning the size for each server rank 31 * \return the associated context client 32 */ 33 CContextClient* getTransferedDataSize(map<int,int>& size) ; 26 34 27 35 /*! -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/daemons_manager.hpp
r1764 r2130 1 1 #ifndef __DAEMONS_MANAGER_HPP__ 2 2 #define __DAEMONS_MANAGER_HPP__ 3 #include <cstddef> 3 4 4 5 namespace xios … … 16 17 bool servicesEventLoop(void) ; 17 18 19 void scheduleContext(size_t hashId) { scheduledContext_=hashId ;} //!< for attached mode, give the hand to the associated context server 20 bool isScheduledContext(size_t hashId) { return scheduledContext_==hashId ;} //!< for attached mode, return true if context server is scheduled 21 void unscheduleContext(void) { scheduledContext_=0 ;} //!< for attached mode : unschedule context 22 18 23 private: 19 24 bool isServer_ ; 25 size_t scheduledContext_ = 0 ; //!< Hash id of the next scheduled context for attached mode 20 26 } ; 21 27 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/server_context.hpp
r1764 r2130 33 33 void freeComm(void) ; 34 34 bool isAttachedMode(void) { return isAttachedMode_ ;} 35 CService* getParentService(void) {return parentService_ ; } 36 35 37 private: 36 38 void createIntercomm(void) ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services.hpp
r1764 r2130 37 37 int getType(void) {return type_;} 38 38 int getNbPartitions(void) {return nbPartitions_;} 39 39 40 40 private: 41 41 void sendNotification(int rank) ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp
r2124 r2130 292 292 293 293 294 void CContext::setClientServerBuffer(vector<CField*>& fields, bool bufferForWriting)295 TRY296 {297 // Estimated minimum event size for small events (20 is an arbitrary constant just for safety)298 const size_t minEventSize = CEventClient::headerSize + 20 * sizeof(int);299 // Ensure there is at least some room for 20 of such events in the buffers300 size_t minBufferSize = std::max(CXios::minBufferSize, 20 * minEventSize);301 302 #define DECLARE_NODE(Name_, name_) \303 if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);304 #define DECLARE_NODE_PAR(Name_, name_)305 #include "node_type.conf"306 #undef DECLARE_NODE307 #undef DECLARE_NODE_PAR308 309 310 map<CContextClient*,map<int,size_t>> dataSize ;311 map<CContextClient*,map<int,size_t>> maxEventSize ;312 map<CContextClient*,map<int,size_t>> attributesSize ;313 314 for(auto field : fields)315 {316 field->setContextClientDataBufferSize(dataSize, maxEventSize, bufferForWriting) ;317 field->setContextClientAttributesBufferSize(attributesSize, maxEventSize, bufferForWriting) ;318 }319 320 321 for(auto& it : attributesSize)322 {323 auto contextClient = it.first ;324 auto& contextDataSize = dataSize[contextClient] ;325 auto& contextAttributesSize = attributesSize[contextClient] ;326 auto& contextMaxEventSize = maxEventSize[contextClient] ;327 328 for (auto& it : contextAttributesSize)329 {330 auto serverRank=it.first ;331 auto& buffer = contextAttributesSize[serverRank] ;332 if (contextDataSize[serverRank] > buffer) buffer=contextDataSize[serverRank] ;333 buffer *= CXios::bufferSizeFactor;334 if (buffer < minBufferSize) buffer = minBufferSize;335 if (buffer > CXios::maxBufferSize ) buffer = CXios::maxBufferSize;336 }337 338 // Leaders will have to send some control events so ensure there is some room for those in the buffers339 if (contextClient->isServerLeader())340 for(auto& rank : contextClient->getRanksServerLeader())341 if 
(!contextAttributesSize.count(rank))342 {343 contextAttributesSize[rank] = minBufferSize;344 contextMaxEventSize[rank] = minEventSize;345 }346 347 contextClient->setBufferSize(contextAttributesSize, contextMaxEventSize);348 }349 }350 CATCH_DUMP_ATTR351 352 353 /*!354 Sets client buffers.355 \param [in] contextClient356 \param [in] bufferForWriting True if buffers are used for sending data for writing357 This flag is only true for client and server-1 for communication with server-2358 */359 // ym obsolete to be removed360 void CContext::setClientServerBuffer(CContextClient* contextClient, bool bufferForWriting)361 TRY362 {363 // Estimated minimum event size for small events (20 is an arbitrary constant just for safety)364 const size_t minEventSize = CEventClient::headerSize + 20 * sizeof(int);365 366 // Ensure there is at least some room for 20 of such events in the buffers367 size_t minBufferSize = std::max(CXios::minBufferSize, 20 * minEventSize);368 369 #define DECLARE_NODE(Name_, name_) \370 if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);371 #define DECLARE_NODE_PAR(Name_, name_)372 #include "node_type.conf"373 #undef DECLARE_NODE374 #undef DECLARE_NODE_PAR375 376 // Compute the buffer sizes needed to send the attributes and data corresponding to fields377 std::map<int, StdSize> maxEventSize;378 std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize, contextClient, bufferForWriting);379 std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize, contextClient, bufferForWriting);380 381 std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();382 for (it = dataBufferSize.begin(); it != ite; ++it)383 if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;384 385 // Apply the buffer size factor, check that we are above the minimum buffer size and below the maximum size386 ite = bufferSize.end();387 for (it = bufferSize.begin(); it != ite; ++it)388 {389 
it->second *= CXios::bufferSizeFactor;390 if (it->second < minBufferSize) it->second = minBufferSize;391 if (it->second > CXios::maxBufferSize) it->second = CXios::maxBufferSize;392 }393 394 // Leaders will have to send some control events so ensure there is some room for those in the buffers395 if (contextClient->isServerLeader())396 {397 const std::list<int>& ranks = contextClient->getRanksServerLeader();398 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)399 {400 if (!bufferSize.count(*itRank))401 {402 bufferSize[*itRank] = minBufferSize;403 maxEventSize[*itRank] = minEventSize;404 }405 }406 }407 contextClient->setBufferSize(bufferSize, maxEventSize);408 }409 CATCH_DUMP_ATTR410 411 294 /*! 412 295 * Compute the required buffer size to send the fields data. … … 716 599 CATCH_DUMP_ATTR 717 600 601 void CContext::globalEventLoop(void) 602 { 603 lockContext() ; 604 CXios::getDaemonsManager()->eventLoop() ; 605 unlockContext() ; 606 setCurrent(getId()) ; 607 } 608 718 609 bool CContext::scheduledEventLoop(bool enableEventsProcessing) 719 610 { … … 739 630 { 740 631 bool finished; 632 if (isLockedContext()) return ; 633 741 634 setCurrent(getId()) ; 742 635 … … 792 685 couplerOutClient_[fullContextId] = client ; 793 686 couplerOutServer_[fullContextId] = server ; 794 795 /*796 // for now, we don't now which beffer size must be used for client coupler797 // It will be evaluated later. 
Fix a constant size for now...798 // set to 10Mb for development799 map<int,size_t> bufferSize, maxEventSize ;800 for(int i=0;i<client->getRemoteSize();i++)801 {802 bufferSize[i]=10000000 ;803 maxEventSize[i]=10000000 ;804 }805 806 client->setBufferSize(bufferSize, maxEventSize);807 */808 687 } 809 688 } … … 827 706 MPI_Comm_free(&interComm) ; 828 707 829 map<int,size_t> bufferSize, maxEventSize ;830 for(int i=0;i<client->getRemoteSize();i++)831 {832 bufferSize[i]=10000000 ;833 maxEventSize[i]=10000000 ;834 }835 836 client->setBufferSize(bufferSize, maxEventSize);837 708 couplerInClient_[fullContextId] = client ; 838 709 couplerInServer_[fullContextId] = server ; … … 840 711 } 841 712 842 void CContext::globalEventLoop(void)843 {844 CXios::getDaemonsManager()->eventLoop() ;845 setCurrent(getId()) ;846 }847 848 849 713 void CContext::finalize(void) 850 714 TRY … … 1054 918 // connect to couplerOut -> to do 1055 919 } 1056 if (first) setClientServerBuffer(couplerOutField, true) ; // set buffer context --> to check1057 920 1058 921 bool couplersReady ; … … 1099 962 field->connectToFileServer(garbageCollector) ; // connect the field to server filter 1100 963 } 1101 setClientServerBuffer(fileOutField, true) ; // set buffer context --> to review1102 964 for(auto field : fileOutField) field->sendFieldToFileServer() ; 1103 965 } … … 1207 1069 this->scheduledEventLoop() ; 1208 1070 } while (!ok) ; 1071 1072 // Now evaluate the size of the context client buffers 1073 map<CContextClient*,map<int,size_t>> fieldBufferEvaluation ; 1074 for(auto field : fileOutField) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // output to server 1075 for(auto field : couplerOutField) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // output to coupler 1076 for(auto field : fieldModelIn) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // server to client (for io servers) 1077 1078 // fix size for each context 
client 1079 for(auto& it : fieldBufferEvaluation) it.first->setBufferSize(it.second) ; 1080 1209 1081 1210 1082 CTimer::get("Context : close definition").suspend() ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.hpp
r2123 r2130 150 150 std::map<int, StdSize> getAttributesBufferSize(std::map<int, StdSize>& maxEventSize, CContextClient* contextClient, bool bufferForWriting = false); 151 151 std::map<int, StdSize> getDataBufferSize(std::map<int, StdSize>& maxEventSize, CContextClient* contextClient, bool bufferForWriting = false); 152 void setClientServerBuffer(CContextClient* contextClient, bool bufferForWriting = false); // old interface to be removed153 void setClientServerBuffer(vector<CField*>& fields, bool bufferForWriting) ;154 152 155 153 // Distribute files (in write mode) among secondary-server pools according to the estimated data flux … … 357 355 string contextId_ ; //!< context client id for the servers. For clients this is same as getId() 358 356 bool isProcessingEvent_ ; 357 private: 359 358 CServerContext* parentServerContext_ ; 360 359 public: 360 CServerContext* getParentServerContext(void) { return parentServerContext_; } 361 private: 362 bool lockedContext_=false; 363 public: 364 void lockContext(void) {lockedContext_=true; } 365 void unlockContext(void) {lockedContext_=true; } 366 bool isLockedContext(void) { return lockedContext_;} 361 367 public: // Some function maybe removed in the near future 362 368 // virtual void toBinary (StdOStream & os) const; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.cpp
r2022 r2130 497 497 CATCH_DUMP_ATTR 498 498 499 bool CField::evaluateBufferSize(map<CContextClient*,map<int,size_t>>& evaluateBuffer, bool isOptPerformance) 500 { 501 CContextClient* client=nullptr ; 502 503 for(int i=0;i<2;i++) 504 { 505 map<int,int> dataSize ; 506 if (i==0 && clientToServerStoreFilter_) client = clientToServerStoreFilter_-> getTransferedDataSize(dataSize) ; 507 if (i==1 && serverToClientStoreFilter_) client = serverToClientStoreFilter_-> getTransferedDataSize(dataSize) ; 508 509 if (client!=nullptr) 510 { 511 map<int,size_t> bufferSize ; 512 513 if (evaluateBuffer.count(client)!=0) bufferSize = evaluateBuffer[client] ; 514 if (isOptPerformance) 515 { 516 for(auto& it : dataSize) 517 { 518 if (bufferSize.count(it.first)==0) bufferSize[it.first]=it.second ; 519 else bufferSize[it.first]+=it.second ; 520 } 521 } 522 else 523 { 524 for(auto& it : dataSize) 525 { 526 if (bufferSize.count(it.first)==0) bufferSize[it.first]=it.second ; 527 else bufferSize[it.first]=std::max(bufferSize[it.first],(size_t)it.second) ; 528 } 529 } 530 evaluateBuffer[client] = bufferSize ; 531 client=nullptr ; 532 } 533 } 534 if (client==nullptr) return false ; 535 else return true; 536 } 499 537 500 538 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.hpp
r2022 r2130 110 110 // Grid data buffer size for each connection of contextclient 111 111 std::map<int, StdSize> getGridDataBufferSize(CContextClient* client, bool bufferForWriting = false); 112 112 113 // evaluate the size of the buffer for the field 114 bool evaluateBufferSize(map<CContextClient*,map<int,size_t>>& evaluateBuffer, bool isOptPerformance) ; 113 115 public: 114 116 void makeGridAliasForCoupling(void) ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/object_factory_decl_macro.hpp
r1869 r2130 19 19 20 20 21
Note: See TracChangeset
for help on using the changeset viewer.