Changeset 2547 for XIOS3/trunk/src/transport/legacy_context_server.cpp
- Timestamp:
- 08/29/23 17:24:04 (10 months ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS3/trunk/src/transport/legacy_context_server.cpp
r2528 r2547 43 43 finished=false; 44 44 45 if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ; 46 else interCommMerged_ = interComm_; // interComm_ is yet an intracommunicator in attached 45 MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ; 47 46 MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows 48 47 … … 50 49 51 50 pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 52 if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode53 54 51 } 55 52 … … 115 112 remoteHashId_ = recvBuff[0] ; 116 113 StdSize buffSize = recvBuff[1]; 117 vector<MPI_Aint> win Adress(2) ;118 win Adress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;114 vector<MPI_Aint> winBufferAddress(2) ; 115 winBufferAddress[0]=recvBuff[2] ; winBufferAddress[1]=recvBuff[3] ; 119 116 mapBufferSize_.insert(std::make_pair(rank, buffSize)); 120 117 121 118 // create windows dynamically for one-sided 122 if (!isAttachedModeEnabled()) 123 { 124 CTimer::get("create Windows").resume() ; 125 MPI_Comm interComm ; 126 MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, 0 , &interComm) ; 127 MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ; 128 CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 129 MPI_Comm_free(&interComm) ; 130 windows_[rank].resize(2) ; 131 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 132 CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 133 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]); 134 CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 135 CTimer::get("create Windows").suspend() ; 136 MPI_Barrier(winComm_[rank]) ; 137 } 138 else 139 { 140 winComm_[rank] = MPI_COMM_NULL ; 141 windows_[rank].resize(2) ; 142 windows_[rank][0] = MPI_WIN_NULL ; 143 windows_[rank][1] = MPI_WIN_NULL ; 144 } 145 146 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first; 119 int dummy ; 120 MPI_Send(&dummy, 0, MPI_INT, rank, 21,interCommMerged_) ; 121 CTimer::get("create Windows").resume() ; 122 MPI_Comm interComm ; 123 int tag = 0 ; 124 MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, tag , &interComm) ; 125 MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ; 126 MPI_Comm_free(&interComm) ; 127 windows_[rank].resize(2) ; 128 //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 129 //CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 130 //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]); 131 //CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 132 windows_[rank][0] = new CWindowDynamic() ; 133 windows_[rank][1] = new CWindowDynamic() ; 134 windows_[rank][0] -> create(winComm_[rank]) ; 135 windows_[rank][1] -> create(winComm_[rank]) ; 136 windows_[rank][0] -> setWinBufferAddress(winBufferAddress[0],0) ; 137 windows_[rank][1] -> setWinBufferAddress(winBufferAddress[1],0) ; 138 CTimer::get("create Windows").suspend() ; 139 CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 140 MPI_Barrier(winComm_[rank]) ; 141 142 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winBufferAddress, 0, buffSize)))).first; 147 143 lastTimeLine[rank]=0 ; 148 144 itLastTimeLine=lastTimeLine.begin() ; … … 230 226 { 231 227 CTimer::get("CLegacyContextServer::getBufferFromClient").resume() ; 232 if (!isAttachedModeEnabled()) // one sided desactivated in attached mode 233 { 234 int rank ; 235 char *buffer ; 236 size_t count ; 237 238 if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ; 239 for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine) 240 { 241 rank=itLastTimeLine->first ; 242 if (itLastTimeLine->second < timeLine && pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty()) 243 { 244 if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count); 245 if (count >= 0) ++itLastTimeLine ; 246 break ; 247 } 228 229 int rank ; 230 char *buffer ; 231 size_t count ; 232 233 if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ; 234 for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine) 235 { 236 rank=itLastTimeLine->first ; 237 if (itLastTimeLine->second < timeLine && pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty()) 238 { 239 if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count); 240 if (count >= 0) ++itLastTimeLine ; 241 break ; 248 242 } 249 243 } … … 280 274 { 281 275 size_t newSize ; 282 vector<MPI_Aint> win Adress(2) ;283 newBuffer>>newSize>>win Adress[0]>>winAdress[1] ;276 vector<MPI_Aint> winBufferAdress(2) ; 277 newBuffer>>newSize>>winBufferAdress[0]>>winBufferAdress[1] ; 284 278 buffers[rank]->freeBuffer(count) ; 285 279 delete buffers[rank] ; 286 buffers[rank] = new CServerBuffer(windows_[rank], winAdress, 0, newSize) ; 280 windows_[rank][0] -> setWinBufferAddress(winBufferAdress[0],0) ; 281 windows_[rank][1] -> setWinBufferAddress(winBufferAdress[1],0) ; 282 buffers[rank] = new CServerBuffer(windows_[rank], winBufferAdress, 0, newSize) ; 287 283 info(100)<<"Context id "<<context->getId()<<" : Receive ChangeBufferSize from client rank "<<rank 288 <<" newSize : "<<newSize<<" Address : "<<win Adress[0]<<" & "<<winAdress[1]<<endl ;284 <<" newSize : "<<newSize<<" Address : "<<winBufferAdress[0]<<" & "<<winBufferAdress[1]<<endl ; 289 285 } 290 286 else … … 309 305 CEventServer* event; 310 306 311 // if (context->isProcessingEvent()) return ;312 307 if (isProcessingEvent_) return ; 313 if (isAttachedModeEnabled())314 if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ;315 308 316 309 it=events.find(currentTimeLine); … … 321 314 if (event->isFull()) 322 315 { 323 if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side316 if (!scheduled) 324 317 { 325 318 eventScheduler_->registerEvent(currentTimeLine,hashId); … … 327 320 scheduled=true; 328 321 } 329 else if ( isAttachedModeEnabled() ||eventScheduler_->queryEvent(currentTimeLine,hashId) )322 else if (eventScheduler_->queryEvent(currentTimeLine,hashId) ) 330 323 { 331 324 if (!enableEventsProcessing && isCollectiveEvent(*event)) return ; … … 346 339 } 347 340 348 if (CXios::checkEventSync )341 if (CXios::checkEventSync && context->getServiceType()!=CServicesManager::CLIENT) 349 342 { 350 343 int typeId, classId, typeId_in, classId_in; … … 364 357 } 365 358 366 if (!isAttachedModeEnabled()) eventScheduler_->popEvent() ; 367 //MPI_Barrier(intraComm) ; 368 // When using attached mode, synchronise the processes to avoid that differents event be scheduled by differents processes 369 // The best way to properly solve this problem will be to use the event scheduler also in attached mode 370 // for now just set up a MPI barrier 371 //ym to be check later 372 // if (!eventScheduler_ && CXios::isServer) MPI_Barrier(intraComm) ; 373 374 // context->setProcessingEvent() ; 375 isProcessingEvent_=true ; 376 CTimer::get("Process events").resume(); 377 info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ; 378 dispatchEvent(*event); 379 CTimer::get("Process events").suspend(); 380 isProcessingEvent_=false ; 381 // context->unsetProcessingEvent() ; 382 pendingEvent=false; 383 delete event; 384 events.erase(it); 385 currentTimeLine++; 386 scheduled = false; 387 if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ; 359 isProcessingEvent_=true ; 360 CTimer::get("Process events").resume(); 361 info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ; 362 eventScheduler_->popEvent() ; 363 dispatchEvent(*event); 364 CTimer::get("Process events").suspend(); 365 isProcessingEvent_=false ; 366 pendingEvent=false; 367 delete event; 368 events.erase(it); 369 currentTimeLine++; 370 scheduled = false; 388 371 } 389 372 } … … 409 392 void CLegacyContextServer::freeWindows() 410 393 { 411 //if (!isAttachedModeEnabled()) 412 //{ 413 // for(auto& it : winComm_) 414 // { 415 // int rank = it.first ; 416 // MPI_Win_free(&windows_[rank][0]); 417 // MPI_Win_free(&windows_[rank][1]); 418 // MPI_Comm_free(&winComm_[rank]) ; 419 // } 420 //} 394 for(auto& it : winComm_) 395 { 396 int rank = it.first ; 397 delete windows_[rank][0]; 398 delete windows_[rank][1]; 399 } 421 400 } 422 401
Note: See TracChangeset
for help on using the changeset viewer.