Changeset 1554
- Timestamp:
- 06/28/18 18:03:28 (6 years ago)
- Location:
- XIOS/dev/dev_ym/XIOS_ONE_SIDED/src
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_client.cpp
r1547 r1554 125 125 // We force the getBuffers call to be non-blocking on classical servers 126 126 list<CBufferOut*> buffList; 127 // bool couldBuffer = getBuffers(timeLine, ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) ));128 127 getBuffers(timeLine, ranks, sizes, buffList) ; 129 128 … … 195 194 } 196 195 197 if (CXios::isServer) info(100)<<" getBuffers : entering loop"<<endl ;198 196 CTimer::get("Blocking time").resume(); 199 197 do … … 205 203 } 206 204 207 if (CXios::isServer) info(100)<<" getBuffers : areBuffersFree ? "<<areBuffersFree<<endl ; ;208 205 if (!areBuffersFree) 209 206 { 210 207 for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer(); 211 if (CXios::isServer) info(100)<<" getBuffers : buffers unlocked "<<endl ;212 208 checkBuffers(); 213 if (CXios::isServer) info(100)<<" getBuffers : buffers checked "<<endl ;214 209 if (CServer::serverLevel == 0) context->server->listen(); 215 if (CXios::isServer) info(100)<<" getBuffers : server listened... "<<endl ;216 210 else if (CServer::serverLevel == 1) 217 211 { … … 225 219 } 226 220 } while (!areBuffersFree && !nonBlocking); 227 if (CXios::isServer) info(100)<<" getBuffers : out of loop"<<endl ;228 221 CTimer::get("Blocking time").suspend(); 229 222 … … 233 226 retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize)); 234 227 } 235 if (CXios::isServer) info(100)<<" getBuffers : message pushed"<<endl ;236 228 return areBuffersFree; 237 229 } … … 260 252 MPI_Intercomm_create(commSelf, 0, interCommMerged, clientSize+rank, 0, &OneSidedInterComm ); 261 253 MPI_Intercomm_merge(OneSidedInterComm,false,&oneSidedComm); 262 info(100)<<"DEBUG: before creating windows (client)"<<endl ;263 254 buffer->createWindows(oneSidedComm) ; 264 info(100)<<"DEBUG: after creating windows (client)"<<endl ;265 255 } 266 256 … … 286 276 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 287 277 { 288 // CEventClient event(CContext::GetType(), CContext::EVENT_ID_CLOSE_P2P_CHANNEL); 289 // CMessage msg; 290 // event.push(itBuff->first, 1, msg); 291 // timeLine = std::numeric_limits<size_t>::max() ; 292 // sendEvent(event); 293 // while (itBuff->second->checkBuffer(!pureOneSided)); 294 delete itBuff->second; 278 delete itBuff->second; 295 279 } 296 280 buffers.clear(); -
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_server.cpp
r1547 r1554 84 84 bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/) 85 85 { 86 // info(100)<<"CContextServer::eventLoop : listen"<<endl ;87 86 listen(); 88 // info(100)<<"CContextServer::eventLoop : checkPendingRequest"<<endl ;89 87 checkPendingRequest(); 90 // info(100)<<"CContextServer::eventLoop : process events"<<endl ;91 88 if (enableEventsProcessing) processEvents(); 92 // info(100)<<"CContextServer::eventLoop : finished "<<finished<<endl ;93 89 return finished; 94 90 } … … 150 146 MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0, &OneSidedInterComm ); 151 147 MPI_Intercomm_merge(OneSidedInterComm,true,&oneSidedComm); 152 info(100)<<"DEBUG: before creating windows (server)"<<endl ;153 148 buffers[rank]->createWindows(oneSidedComm) ; 154 info(100)<<"DEBUG: before creating windows (server)"<<endl ;155 149 } 156 150 lastTimeLine[rank]=0 ; … … 223 217 if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) 224 218 { 225 info(100)<<"get buffer from client : timeLine "<<timeLine<<endl ;226 219 processRequest(rank, buffer, count); 227 220 break ; … … 249 242 CBufferIn newBuffer(startBuffer,buffer.remain()); 250 243 newBuffer>>size>>timeLine; 251 info(100)<<"new event : timeLine : "<<timeLine<<" size : "<<size<<endl ;252 244 it=events.find(timeLine); 253 245 if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first; -
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/node/context.cpp
r1542 r1554 404 404 { 405 405 client->checkBuffers(); 406 bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); 407 if (hasTmpBufferedEvent) 408 hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); 409 // Don't process events if there is a temporarily buffered event 410 return server->eventLoop(!hasTmpBufferedEvent || !enableEventsProcessing); 406 return server->eventLoop(true); 411 407 } 412 408 else if (CServer::serverLevel == 1) 413 409 { 414 if (!finalized) 415 client->checkBuffers(); 410 if (!finalized) client->checkBuffers(); 416 411 bool serverFinished = true; 417 if (!finalized) 418 serverFinished = server->eventLoop(enableEventsProcessing); 412 if (!finalized) serverFinished = server->eventLoop(enableEventsProcessing); 419 413 bool serverPrimFinished = true; 420 414 for (int i = 0; i < clientPrimServer.size(); ++i) 421 415 { 422 if (!finalized) 423 clientPrimServer[i]->checkBuffers(); 424 if (!finalized) 425 serverPrimFinished *= serverPrimServer[i]->eventLoop(enableEventsProcessing); 416 if (!finalized) clientPrimServer[i]->checkBuffers(); 417 if (!finalized) serverPrimFinished *= serverPrimServer[i]->eventLoop(enableEventsProcessing); 426 418 } 427 419 return ( serverFinished && serverPrimFinished); … … 456 448 { 457 449 ++countChildCtx_; 458 450 451 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 459 452 client->finalize(); 460 while (client->havePendingRequests()) 461 client->checkBuffers(); 462 453 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 454 while (client->havePendingRequests()) client->checkBuffers(); 455 client->releaseBuffers(); 456 457 info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ; 463 458 while (!server->hasFinished()) 464 459 server->eventLoop(); 465 460 info(100)<<"DEBUG: context "<<getId()<<" server has finished"<<endl ; 466 461 if (hasServer) // Mode attache 467 462 { … … 472 467 473 468 //! Deallocate client buffers 474 client->releaseBuffers();475 469 // client->releaseBuffers(); 470 info(100)<<"DEBUG: context "<<getId()<<" release client ok"<<endl ; 476 471 //! Free internally allocated communicators 477 472 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) … … 488 483 if (countChildCtx_ == 0) 489 484 for (int i = 0; i < clientPrimServer.size(); ++i) 485 { 490 486 clientPrimServer[i]->finalize(); 487 bool bufferReleased; 488 do 489 { 490 clientPrimServer[i]->checkBuffers(); 491 bufferReleased = !clientPrimServer[i]->havePendingRequests(); 492 } while (!bufferReleased); 493 clientPrimServer[i]->releaseBuffers(); 494 } 495 491 496 492 497 // (Last) context finalized message received … … 494 499 { 495 500 // Blocking send of context finalize message to its client (e.g. primary server or model) 496 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize <<"<<endl ;501 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 497 502 client->finalize(); 503 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 498 504 bool bufferReleased; 499 505 do … … 502 508 bufferReleased = !client->havePendingRequests(); 503 509 } while (!bufferReleased); 510 client->releaseBuffers(); 504 511 finalized = true; 505 512 info(100)<<"DEBUG: context "<<getId()<<" bufferRelease OK"<<endl ; 513 506 514 closeAllFile(); // Just move to here to make sure that server-level 1 can close files 507 515 if (hasServer && !hasClient) … … 512 520 513 521 //! Deallocate client buffers 514 client->releaseBuffers(); 522 // client->releaseBuffers(); 523 info(100)<<"DEBUG: context "<<getId()<<" client release"<<endl ; 524 525 /* 515 526 for (int i = 0; i < clientPrimServer.size(); ++i) 516 527 clientPrimServer[i]->releaseBuffers(); 517 528 */ 518 529 //! Free internally allocated communicators 519 530 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
Note: See TracChangeset
for help on using the changeset viewer.