Changeset 2246 for XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp
- Timestamp:
- 10/11/21 14:41:56 (3 years ago)
- File:
- 1 edited
Legend:
- Unmodified
- Added
- Removed
XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp
r2240 r2246 46 46 else attachedMode=true ; 47 47 48 if (flag) MPI_Comm_remote_size(interComm,&commSize); 49 else MPI_Comm_size(interComm,&commSize); 48 int clientSize ; 49 if (flag) MPI_Comm_remote_size(interComm,&clientSize); 50 else MPI_Comm_size(interComm,&clientSize); 50 51 51 52 … … 75 76 if (!isAttachedModeEnabled()) 76 77 { 78 CTimer::get("create Windows").resume() ; 79 77 80 MPI_Intercomm_merge(interComm_,true,&interCommMerged) ; 78 // create windows for one sided comm 79 int interCommMergedRank; 81 82 // We create dummy pair of intercommunicator between clients and server 83 // Why ? Just because on openMPI, it reduce the creation time of windows otherwhise which increase quadratically 84 // We don't know the reason 85 MPI_Comm commSelf ; 86 MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf) ; 87 vector<MPI_Comm> dummyComm(clientSize) ; 88 for(int rank=0; rank<clientSize ; rank++) MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0 , &dummyComm[rank]) ; 89 90 // create windows for one sided comm 80 91 MPI_Comm winComm ; 81 MPI_Comm_rank(intraComm, &interCommMergedRank);82 92 windows.resize(2) ; 83 for(int rank=c ommSize; rank<commSize+intraCommSize; rank++)84 { 85 if (rank==c ommSize+interCommMergedRank)93 for(int rank=clientSize; rank<clientSize+intraCommSize; rank++) 94 { 95 if (rank==clientSize+intraCommRank) 86 96 { 87 MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm); 88 int myRank ; 89 MPI_Comm_rank(winComm,&myRank); 97 MPI_Comm_split(interCommMerged, intraCommRank, rank, &winComm); 90 98 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[0]); 91 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]); 99 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]); 92 100 } 93 else MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm); 94 // ym : Warning : intelMPI doesn't support that communicator of windows be deallocated before the windows deallocation, crash at MPI_Win_lock 95 // 
Bug or not ? 96 // MPI_Comm_free(&winComm) ; 97 } 101 else MPI_Comm_split(interCommMerged, intraCommRank, rank, &winComm); 102 // ym : Warning : intelMPI doesn't support that communicator of windows be deallocated before the windows deallocation, crash at MPI_Win_lock 103 // Bug or not ? 104 // MPI_Comm_free(&winComm) ; 105 } 106 107 // free dummy intercommunicator 108 for(int rank=0; rank<clientSize ; rank++) MPI_Comm_free(&dummyComm[rank]) ; 109 MPI_Comm_free(&commSelf) ; 110 CTimer::get("create Windows").suspend() ; 98 111 } 99 112 else … … 103 116 windows[1]=MPI_WIN_NULL ; 104 117 } 105 106 107 118 108 MPI_Comm_split(intraComm_,intraCommRank,intraCommRank, &commSelf) ;109 119 itLastTimeLine=lastTimeLine.begin() ; 110 120 … … 138 148 bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/) 139 149 { 150 CTimer::get("listen request").resume(); 140 151 listen(); 152 CTimer::get("listen request").suspend(); 153 CTimer::get("check pending request").resume(); 141 154 checkPendingRequest(); 155 checkPendingProbe() ; 156 CTimer::get("check pending request").suspend(); 157 CTimer::get("check event process").resume(); 142 158 if (enableEventsProcessing) processEvents(); 159 CTimer::get("check event process").suspend(); 143 160 return finished; 144 161 } 145 162 /* 146 163 void CContextServer::listen(void) 147 164 { … … 221 238 } 222 239 } 240 */ 241 242 void CContextServer::listen(void) 243 { 244 int rank; 245 int flag; 246 int count; 247 char * addr; 248 MPI_Status status; 249 MPI_Message message ; 250 map<int,CServerBuffer*>::iterator it; 251 bool okLoop; 252 253 traceOff(); 254 MPI_Improbe(MPI_ANY_SOURCE, 20,interComm,&flag,&message, &status); 255 traceOn(); 256 if (flag==true) listenPendingRequest(message, status) ; 257 } 258 259 bool CContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status) 260 { 261 int count; 262 char * addr; 263 map<int,CServerBuffer*>::iterator it; 264 int rank=status.MPI_SOURCE ; 265 266 
it=buffers.find(rank); 267 if (it==buffers.end()) // Receive the buffer size and allocate the buffer 268 { 269 MPI_Aint recvBuff[4] ; 270 MPI_Mrecv(recvBuff, 4, MPI_AINT, &message, &status); 271 remoteHashId_ = recvBuff[0] ; 272 StdSize buffSize = recvBuff[1]; 273 vector<MPI_Aint> winAdress(2) ; 274 winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ; 275 mapBufferSize_.insert(std::make_pair(rank, buffSize)); 276 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first; 277 lastTimeLine[rank]=0 ; 278 itLastTimeLine=lastTimeLine.begin() ; 279 return true; 280 } 281 else 282 { 283 std::pair<MPI_Message,MPI_Status> mypair(message,status) ; 284 pendingProbe[rank].push_back(mypair) ; 285 return false; 286 } 287 } 288 289 void CContextServer::checkPendingProbe(void) 290 { 291 292 list<int> recvProbe ; 293 list<int>::iterator itRecv ; 294 map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe; 295 296 for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++) 297 { 298 int rank=itProbe->first ; 299 if (pendingRequest.count(rank)==0) 300 { 301 MPI_Message& message = itProbe->second.front().first ; 302 MPI_Status& status = itProbe->second.front().second ; 303 int count ; 304 MPI_Get_count(&status,MPI_CHAR,&count); 305 map<int,CServerBuffer*>::iterator it = buffers.find(rank); 306 if (it->second->isBufferFree(count)) 307 { 308 char * addr; 309 addr=(char*)it->second->getBuffer(count); 310 MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]); 311 bufferRequest[rank]=addr; 312 recvProbe.push_back(rank) ; 313 itProbe->second.pop_front() ; 314 } 315 } 316 } 317 318 for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ; 319 } 223 320 224 321 … … 232 329 int count; 233 330 MPI_Status status; 331 332 if (!pendingRequest.empty()) CTimer::get("receiving requests").resume(); 333 else CTimer::get("receiving 
requests").suspend(); 234 334 235 335 for(it=pendingRequest.begin();it!=pendingRequest.end();it++) … … 257 357 void CContextServer::getBufferFromClient(size_t timeLine) 258 358 { 359 CTimer::get("CContextServer::getBufferFromClient").resume() ; 259 360 if (!isAttachedModeEnabled()) // one sided desactivated in attached mode 260 361 { … … 267 368 { 268 369 rank=itLastTimeLine->first ; 269 if (itLastTimeLine->second < timeLine && pendingRequest.count(rank)==0 )370 if (itLastTimeLine->second < timeLine && pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty()) 270 371 { 271 if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) 272 { 273 processRequest(rank, buffer, count); 274 break ; 275 } 372 if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count); 373 if (count >= 0) break ; 276 374 } 277 375 } 278 376 } 377 CTimer::get("CContextServer::getBufferFromClient").suspend() ; 279 378 } 280 379 … … 388 487 } 389 488 } 390 else getBufferFromClient(currentTimeLine) ;489 else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ; 391 490 } 392 491 else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line … … 441 540 // releaseBuffers() ; 442 541 notifyClientsFinalize() ; 542 CTimer::get("receiving requests").suspend(); 443 543 context->finalize(); 444 544 … … 446 546 MPI_Win_free(&windows[0]) ; 447 547 MPI_Win_free(&windows[1]) ; 448 548 449 549 std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 450 550 iteMap = mapBufferSize_.end(), itMap;
Note: See TracChangeset
for help on using the changeset viewer.