Changeset 1130 for XIOS/dev/dev_olga/src
- Timestamp:
- 05/15/17 15:00:24 (7 years ago)
- Location:
- XIOS/dev/dev_olga/src
- Files:
-
- 14 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_olga/src/buffer_client.cpp
r1077 r1130 74 74 int flag; 75 75 76 int error, errclass, len;77 char errstring[MPI_MAX_ERROR_STRING];78 79 76 if (pending) 80 77 { 81 78 traceOff(); 82 MPI_Errhandler_set(interComm,MPI_ERRORS_RETURN); 83 error=MPI_Test(&request, &flag, &status); 84 if (error != MPI_SUCCESS) 85 { 86 MPI_Error_class(error, &errclass); 87 MPI_Error_string(error, errstring, &len); 88 ERROR("MPI error class: ", <<errclass<<" MPI error "<<errstring ); 89 } 79 MPI_Test(&request, &flag, &status); 90 80 traceOn(); 91 81 if (flag == true) pending = false; … … 96 86 if (count > 0) 97 87 { 98 MPI_Errhandler_set(interComm,MPI_ERRORS_RETURN); 99 error = MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 100 if (error != MPI_SUCCESS) 101 { 102 MPI_Error_class(error, &errclass); 103 MPI_Error_string(error, errstring, &len); 104 ERROR("MPI error class: ", <<errclass<<" MPI error "<<errstring ); 105 } 88 MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 106 89 pending = true; 107 90 if (current == 1) current = 0; -
XIOS/dev/dev_olga/src/client.cpp
r1054 r1130 235 235 } 236 236 237 ///---------------------------------------------------------------238 /*!239 * \fn void CClient::registerContextByClienOfServer(const string& id, MPI_Comm contextComm)240 * \brief Sends a request to create contexts on secondary servers. Creates clientPrimServer/serverPrimServer contexts.241 * \param [in] id id of context.242 * \param [in] contextComm.243 * Function is called by primary server.244 * The only difference with CClient::registerContext() is naming of contexts on servers (appearing of pool id at the end).245 */246 // void CClient::registerContextByClientOfServer(const string& id, MPI_Comm contextComm)247 // {248 // CContext::setCurrent(id) ;249 // CContext* context=CContext::create(id);250 // StdString idServer(id);251 // idServer += "_server_";252 //253 // int size,rank,globalRank ;254 // size_t message_size ;255 // int leaderRank ;256 // MPI_Comm contextInterComm ;257 //258 // MPI_Comm_size(contextComm,&size) ;259 // MPI_Comm_rank(contextComm,&rank) ;260 // MPI_Comm_rank(CXios::globalComm,&globalRank) ;261 // if (rank!=0) globalRank=0 ;262 //263 // CMessage msg ;264 //265 // int messageSize ;266 // void * buff ;267 //268 // for (int i = 0; i < serverLeader.size(); ++i)269 // {270 // StdString str = idServer + boost::lexical_cast<string>(i);271 // msg<<str<<size<<globalRank ;272 // messageSize = msg.size() ;273 // buff = new char[messageSize] ;274 // CBufferOut buffer(buff,messageSize) ;275 // buffer<<msg ;276 //277 // MPI_Send(buff, buffer.count(), MPI_CHAR, serverLeader[i], 1, CXios::globalComm) ;278 // MPI_Intercomm_create(contextComm, 0, CXios::globalComm, serverLeader[i], 10+globalRank, &contextInterComm) ;279 // info(10)<<"Register new Context : "<<id<<endl ;280 // MPI_Comm inter ;281 // MPI_Intercomm_merge(contextInterComm,0,&inter) ;282 // MPI_Barrier(inter) ;283 //284 // context->initClient(contextComm,contextInterComm) ;285 //286 //// contextInterComms.push_back(contextInterComm);287 // MPI_Comm_free(&inter);288 // delete [] buff ;289 // }290 // }291 292 237 void CClient::finalize(void) 293 238 { -
XIOS/dev/dev_olga/src/client.hpp
r1054 r1130 14 14 static void finalize(void); 15 15 static void registerContext(const string& id, MPI_Comm contextComm); 16 // static void registerContextByClientOfServer(const string& id, MPI_Comm contextComm);17 16 18 17 static MPI_Comm intraComm; -
XIOS/dev/dev_olga/src/context_client.cpp
r1077 r1130 11 11 #include "timer.hpp" 12 12 #include "cxios.hpp" 13 #include "server.hpp" 13 14 14 15 namespace xios … … 89 90 list<int> sizes = event.getSizes(); 90 91 91 // We force the getBuffers call to be non-blocking on theservers92 // We force the getBuffers call to be non-blocking on classical servers 92 93 list<CBufferOut*> buffList; 93 // bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer);94 bool couldBuffer = getBuffers(ranks, sizes, buffList, false);94 bool couldBuffer = getBuffers(ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) )); 95 //bool couldBuffer = getBuffers(ranks, sizes, buffList, false ); 95 96 96 97 if (couldBuffer) … … 212 213 { 213 214 checkBuffers(); 214 215 // WHY DO WE PUT HERE SERVER INTO LISTENING LOOP AT ALL???? 216 // context->server->listen(); 217 // for (int i = 0; i < context->serverPrimServer.size(); ++i) 218 // context->serverPrimServer[i]->listen(); 215 if (CServer::serverLevel == 0) 216 context->server->listen(); 217 218 else if (CServer::serverLevel == 1) 219 { 220 context->server->listen(); 221 for (int i = 0; i < context->serverPrimServer.size(); ++i) 222 context->serverPrimServer[i]->listen(); 223 } 224 225 else if (CServer::serverLevel == 2) 226 context->server->listen(); 227 219 228 } 220 229 } while (!areBuffersFree && !nonBlocking); 230 221 231 CTimer::get("Blocking time").suspend(); 222 232 … … 256 266 map<int,CClientBuffer*>::iterator itBuff; 257 267 bool pending = false; 258 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) pending |= itBuff->second->checkBuffer(); 268 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 269 pending |= itBuff->second->checkBuffer(); 259 270 return pending; 260 271 } … … 274 285 \return state of buffers, pending(true), ready(false) 275 286 */ 276 // bool CContextClient::checkBuffers(list<int>& ranks)277 287 bool CContextClient::checkBuffers(list<int>& ranks) 278 288 { … … 358 368 359 369 /*! 360 Finalize context client and do some reports 361 */ 362 // void CContextClient::finalize(void) 363 void CContextClient::finalize() 370 * Finalize context client and do some reports. Function is non-blocking. 371 */ 372 void CContextClient::finalize(void) 364 373 { 365 374 map<int,CClientBuffer*>::iterator itBuff; … … 386 395 387 396 CTimer::get("Blocking time").resume(); 388 while (!stop)397 // while (!stop) 389 398 { 390 399 checkBuffers(); … … 393 402 394 403 stop = true; 395 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest();404 // for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest(); 396 405 } 397 406 CTimer::get("Blocking time").suspend(); … … 409 418 report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; 410 419 411 releaseBuffers();420 //releaseBuffers(); // moved to CContext::finalize() 412 421 } 422 423 /*! 424 */ 425 bool CContextClient::havePendingRequests(void) 426 { 427 bool pending = false; 428 map<int,CClientBuffer*>::iterator itBuff; 429 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 430 pending |= itBuff->second->hasPendingRequest(); 431 return pending; 432 } 433 434 413 435 } -
XIOS/dev/dev_olga/src/context_client.hpp
r1077 r1130 40 40 bool checkBuffers(void); 41 41 void releaseBuffers(void); 42 bool havePendingRequests(void); 42 43 43 44 bool isServerLeader(void) const; … … 71 72 72 73 map<int,CClientBuffer*> buffers; //!< Buffers for connection to servers 73 // map<int,CClientBuffer*> buffersPrim; //!< Buffers for connection to secondary servers74 74 75 75 private: -
XIOS/dev/dev_olga/src/context_server.cpp
r1071 r1130 22 22 namespace xios 23 23 { 24 StdSize CContextServer::totalBuf_ = 0;25 24 26 25 CContextServer::CContextServer(CContext* parent, MPI_Comm intraComm_,MPI_Comm interComm_) … … 41 40 finished=false; 42 41 boost::hash<string> hashString; 43 hashId=hashString(context->getId()); 42 if (CServer::serverLevel == 1) 43 hashId=hashString(context->getId() + boost::lexical_cast<string>(context->clientPrimServer.size())); 44 else 45 hashId=hashString(context->getId()); 44 46 } 45 47 … … 169 171 map<size_t,CEventServer*>::iterator it; 170 172 CEventServer* event; 171 boost::hash<string> hashString;172 size_t hashId=hashString(context->getId());173 173 174 174 it=events.find(currentTimeLine); … … 189 189 // The best way to properly solve this problem will be to use the event scheduler also in attached mode 190 190 // for now just set up a MPI barrier 191 // if (!CServer::eventScheduler) MPI_Barrier(intraComm) ;191 if (!CServer::eventScheduler && CXios::isServer) MPI_Barrier(intraComm) ; 192 192 193 193 CTimer::get("Process events").resume(); … … 218 218 int rank; 219 219 list<CEventServer::SSubEvent>::iterator it; 220 // CContext::setCurrent(context->getId());221 220 StdString ctxId = context->getId(); 222 221 CContext::setCurrent(ctxId); 222 StdSize totalBuf = 0; 223 223 224 224 if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE) 225 225 { 226 226 finished=true; 227 // info(20)<<"Server Side context <"<<context->getId()<<"> finalized"<<endl; 227 // info(20)<<"Server Side context <"<<context->getId()<<"> finalized"<<endl; // moved to CContext::finalize() 228 228 std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 229 229 iteMap = mapBufferSize_.end(), itMap; … … 231 231 { 232 232 rank = itMap->first; 233 //report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl234 //<< " +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;235 totalBuf _+= itMap->second;233 report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl 234 << " +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl; 235 totalBuf += itMap->second; 236 236 } 237 237 context->finalize(); 238 239 // report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl; // moved to CContext::finalize() 238 report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl; 240 239 } 241 240 else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event); … … 261 260 } 262 261 263 size_t CContextServer::getTotalBuf(void)264 {265 return totalBuf_;266 }267 268 262 } -
XIOS/dev/dev_olga/src/context_server.hpp
r1054 r1130 47 47 size_t hashId ; 48 48 49 static size_t getTotalBuf(void);50 51 49 ~CContextServer() ; 52 50 53 51 private: 54 52 std::map<int, StdSize> mapBufferSize_; 55 static size_t totalBuf_ ; /*!< Total memory allocated by servers per context.*/56 53 57 54 } ; -
XIOS/dev/dev_olga/src/cxios.cpp
r1054 r1130 25 25 bool CXios::isClient ; 26 26 bool CXios::isServer ; 27 // int CXios::serverLevel = 0 ;28 27 MPI_Comm CXios::globalComm ; 29 28 bool CXios::usingOasis ; … … 143 142 CServer::initialize(); 144 143 isServer = true; 145 if (CServer::serverLevel == 1) 146 isClient = true; 147 else 148 isClient = false; 144 isClient = false; 149 145 150 146 if (CServer::getRank()==0) globalRegistry = new CRegistry(CServer::intraComm) ; -
XIOS/dev/dev_olga/src/io/nc4_data_output.cpp
r1129 r1130 2457 2457 msg.append(context->getId()); msg.append("\n"); 2458 2458 msg.append(e.what()); 2459 2459 ERROR("CNc4DataOutput::writeFieldData_ (CField* field)", << msg); 2460 2460 } 2461 2461 } -
XIOS/dev/dev_olga/src/node/context.cpp
r1129 r1130 26 26 : CObjectTemplate<CContext>(), CContextAttributes() 27 27 , calendar(), hasClient(false), hasServer(false) 28 , isPostProcessed(false) , finalized(false)28 , isPostProcessed(false)//, finalized(false) 29 29 , idServer_(), client(0), server(0) 30 , allProcessed(false) 30 , allProcessed(false), countChildCtx_(0) 31 31 { /* Ne rien faire de plus */ } 32 32 … … 34 34 : CObjectTemplate<CContext>(id), CContextAttributes() 35 35 , calendar(), hasClient(false), hasServer(false) 36 , isPostProcessed(false) , finalized(false)36 , isPostProcessed(false)//, finalized(false) 37 37 , idServer_(), client(0), server(0) 38 , allProcessed(false) 38 , allProcessed(false), countChildCtx_(0) 39 39 { /* Ne rien faire de plus */ } 40 40 … … 300 300 std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize); 301 301 302 303 302 std::map<int, StdSize>::iterator it, ite = dataBufferSize.end(); 304 303 for (it = dataBufferSize.begin(); it != ite; ++it) … … 351 350 hasServer=true; 352 351 server = new CContextServer(this,intraComm,interComm); 353 // client = new CContextClient(this,intraComm,interComm, cxtClient);354 352 355 353 registryIn=new CRegistry(intraComm); … … 373 371 comms.push_back(interCommClient); 374 372 } 375 client = new CContextClient(this,intraCommClient,interCommClient); 376 373 client = new CContextClient(this,intraCommClient,interCommClient,cxtClient); 377 374 } 378 375 379 376 //! Try to send the buffers and receive possible answers 380 bool CContext::checkBuffersAndListen(void) 381 { 382 if (CServer::serverLevel == 0) 383 { 384 client->checkBuffers(); 385 bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); 386 if (hasTmpBufferedEvent) 387 hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); 388 // Don't process events if there is a temporarily buffered event 389 return server->eventLoop(!hasTmpBufferedEvent); 390 } 391 392 else if (CServer::serverLevel == 1) 393 { 394 client->checkBuffers(); 395 bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); 396 if (hasTmpBufferedEvent) 397 hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); 398 bool serverFinished = server->eventLoop(!hasTmpBufferedEvent); 399 400 bool serverPrimFinished = true; 401 for (int i = 0; i < clientPrimServer.size(); ++i) 402 { 403 clientPrimServer[i]->checkBuffers(); 404 bool hasTmpBufferedEventPrim = clientPrimServer[i]->hasTemporarilyBufferedEvent(); 405 if (hasTmpBufferedEventPrim) 406 hasTmpBufferedEventPrim = !clientPrimServer[i]->sendTemporarilyBufferedEvent(); 407 // serverPrimFinished *= serverPrimServer[i]->eventLoop(!hasTmpBufferedEventPrim); 408 serverPrimFinished *= serverPrimServer[i]->eventLoop(); 409 } 410 return ( serverFinished && serverPrimFinished); 411 } 412 413 else if (CServer::serverLevel == 2) 414 { 415 client->checkBuffers(); 416 bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); 417 // if (hasTmpBufferedEvent) 418 // hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); 419 // return server->eventLoop(!hasTmpBufferedEvent); 420 return server->eventLoop(); 377 bool CContext::checkBuffersAndListen(void) 378 { 379 bool clientReady, serverFinished; 380 381 // Only classical servers are non-blocking 382 if (CServer::serverLevel == 0) 383 { 384 client->checkBuffers(); 385 bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); 386 if (hasTmpBufferedEvent) 387 hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); 388 // Don't process events if there is a temporarily buffered event 389 return server->eventLoop(!hasTmpBufferedEvent); 390 } 391 else if (CServer::serverLevel == 1) 392 { 393 client->checkBuffers(); 394 bool serverFinished = server->eventLoop(); 395 bool serverPrimFinished = true; 396 for (int i = 0; i < clientPrimServer.size(); ++i) 397 { 398 clientPrimServer[i]->checkBuffers(); 399 serverPrimFinished *= serverPrimServer[i]->eventLoop(); 421 400 } 422 } 401 return ( serverFinished && serverPrimFinished); 402 } 403 404 else if (CServer::serverLevel == 2) 405 { 406 client->checkBuffers(); 407 return server->eventLoop(); 408 } 409 } 423 410 424 411 //! Terminate a context 425 412 void CContext::finalize(void) 426 413 { 427 if (!finalized) 428 { 429 finalized = true; 430 414 // Send registry upon calling the function the first time 415 if (countChildCtx_ == 0) 431 416 if (hasClient) sendRegistry() ; 432 417 433 if ((hasClient) && (hasServer)) 418 // Client: 419 // (1) blocking send context finalize to its server 420 // (2) blocking receive context finalize from its server 421 if (CXios::isClient) 422 { 423 // Make sure that client (model) enters the loop only once 424 if (countChildCtx_ < 1) 434 425 { 426 ++countChildCtx_; 427 428 client->finalize(); 429 while (client->havePendingRequests()) 430 client->checkBuffers(); 431 432 while (!server->hasFinished()) 433 server->eventLoop(); 434 } 435 } 436 // Server: non-blocking send context finalize 437 else if (CXios::isServer) 438 { 439 // First context finalize message received from a model => send context finalize to its child contexts (if any) 440 if (countChildCtx_ == 0) 435 441 for (int i = 0; i < clientPrimServer.size(); ++i) 436 442 clientPrimServer[i]->finalize(); 437 443 438 for (int i = 0; i < serverPrimServer.size(); ++i) 439 { 440 while (!serverPrimServer[i]->hasFinished()) 441 { 442 serverPrimServer[i]->eventLoop(); 443 CServer::eventScheduler->checkEvent() ; 444 } 445 } 446 } 447 client->finalize(); 448 while (!server->hasFinished()) 449 { 450 server->eventLoop(); 451 } 452 453 info(20)<<"Server Side context <"<<getId()<<"> finalized"<<endl; 454 report(0)<< " Memory report : Context <"<<getId()<<"> : server side : total memory used for buffers "<<CContextServer::getTotalBuf()<<" bytes"<<endl; 455 456 // if (hasServer) 457 if (hasServer && !hasClient) 458 { 459 closeAllFile(); 460 registryOut->hierarchicalGatherRegistry() ; 461 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 462 } 463 } 464 } 465 466 //! Free internally allocated communicators 467 void CContext::freeComms(void) 468 { 444 // (Last) context finalized message received => send context finalize to its parent context 445 if (countChildCtx_ == clientPrimServer.size()) 446 client->finalize(); 447 448 ++countChildCtx_; 449 } 450 451 // If in mode attache call postFinalize 452 if (CXios::isServer && CXios::isClient) 453 postFinalize(); 454 } 455 456 /*! 457 * \fn void CContext::postFinalize(void) 458 * Close files, gather registries, , and make deallocations. 459 * Function is called when a context is finalized (it has nothing to receive and nothing to send). 460 */ 461 void CContext::postFinalize(void) 462 { 463 info(20)<<"Context <"<<getId()<<"> is finalized."<<endl; 464 465 // if (hasServer && !hasClient) 466 { 467 closeAllFile(); 468 registryOut->hierarchicalGatherRegistry() ; 469 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 470 } 471 472 //! Free internally allocated communicators 469 473 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 470 474 MPI_Comm_free(&(*it)); 471 475 comms.clear(); 476 477 //! Deallocate client buffers 478 client->releaseBuffers(); 479 for (int i = 0; i < clientPrimServer.size(); ++i) 480 clientPrimServer[i]->releaseBuffers(); 481 } 482 483 //! Free internally allocated communicators 484 void CContext::freeComms(void) 485 { 486 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 487 MPI_Comm_free(&(*it)); 488 comms.clear(); 489 } 490 491 //! Deallocate buffers allocated by clientContexts 492 void CContext::releaseClientBuffers(void) 493 { 494 client->releaseBuffers(); 495 for (int i = 0; i < clientPrimServer.size(); ++i) 496 clientPrimServer[i]->releaseBuffers(); 472 497 } 473 498 … … 509 534 510 535 // After that, send all grid (if any) 511 sendRefGrid(); 536 sendRefGrid(); 512 537 513 538 // We have a xml tree on the server side and now, it should be also processed 514 539 sendPostProcessing(); 515 516 540 sendGridEnabledFields(); 517 541 } 518 519 542 allProcessed = true; 520 543 } … … 555 578 556 579 void CContext::recvPostProcessingGlobalAttributes(CBufferIn& buffer) 557 { 580 { 581 // CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar(); 558 582 postProcessingGlobalAttributes(); 559 583 } … … 572 596 postProcessingGlobalAttributes(); 573 597 574 if (hasClient) sendPostProcessingGlobalAttributes(); 575 598 if (hasClient) sendPostProcessingGlobalAttributes(); 599 600 // There are some processings that should be done after all of above. For example: check mask or index 576 601 this->buildFilterGraphOfEnabledFields(); 577 602 578 if (hasClient && !hasServer)603 if (hasClient && !hasServer) 579 604 { 580 buildFilterGraphOfFieldsWithReadAccess(); 605 buildFilterGraphOfFieldsWithReadAccess(); 581 606 } 582 607 … … 622 647 { 623 648 this->enabledFiles[i]->solveOnlyRefOfEnabledFields(false); 624 // this->enabledFiles[i]->solveAllReferenceEnabledField(false);625 649 } 626 650 … … 649 673 } 650 674 651 652 653 675 void CContext::solveOnlyRefOfEnabledFields(bool sendToServer) 654 676 { … … 663 685 this->enabledFiles[i]->generateNewTransformationGridDest(); 664 686 } 687 665 688 } 666 689 … … 857 880 } 858 881 859 // // Only send close definition from process having hasClient860 // void CContext::sendCloseDefinitionToServer(void)861 // {862 // CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);863 // }864 865 882 //! Client side: Send a message to server to make it close 866 883 void CContext::sendCloseDefinition(void) 867 884 { 868 885 // Use correct context client to send message 869 // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1;870 886 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 871 887 for (int i = 0; i < nbSrvPools; ++i) … … 902 918 { 903 919 // Use correct context client to send message 904 // CContextClient* contextClientTmp = (0 != clientPrimServer) ? clientPrimServer : client;905 // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1;906 920 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 907 921 for (int i = 0; i < nbSrvPools; ++i) … … 1462 1476 void CContext::updateCalendar(int step) 1463 1477 { 1464 info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;1478 info(50) <<"Context "<< this->getId() <<" updateCalendar : before : " << calendar->getCurrentDate() << endl; 1465 1479 calendar->update(step); 1466 info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;1480 info(50) <<"Context "<< this->getId() << " updateCalendar : after : " << calendar->getCurrentDate() << endl; 1467 1481 1468 1482 if (hasClient) … … 1569 1583 } 1570 1584 1585 /*! 1586 * \fn bool CContext::isFinalized(void) 1587 * Context is finalized if: 1588 * (1) it has received all context finalize events 1589 * (2) it has no pending events to send. 1590 */ 1571 1591 bool CContext::isFinalized(void) 1572 1592 { 1573 return finalized; 1593 if (countChildCtx_==clientPrimServer.size()+1) 1594 { 1595 bool buffersReleased = !client->havePendingRequests(); 1596 for (int i = 0; i < clientPrimServer.size(); ++i) 1597 buffersReleased *= !clientPrimServer[i]->havePendingRequests(); 1598 return buffersReleased; 1599 } 1600 else 1601 return false; 1574 1602 } 1575 1603 -
XIOS/dev/dev_olga/src/node/context.hpp
r1129 r1130 99 99 // Finalize a context 100 100 void finalize(void); 101 void postFinalize(void); 102 bool isFinalized(void); 103 101 104 void closeDefinition(void); 102 bool isFinalized(void);103 105 104 106 // Some functions to process context … … 167 169 void recvRegistry(CBufferIn& buffer) ; //!< registry is received by the servers 168 170 169 void freeComms(void); //!< Free internally allcoated communicators 171 void freeComms(void); //!< Free internally allcoated communicators 172 void releaseClientBuffers(void); //! Deallocate buffers allocated by clientContexts 170 173 171 174 // dispatch event … … 233 236 bool hasServer; 234 237 235 CContextServer* server; //!< Concrete context server236 CContextClient* client; //!< Concrete contex client238 CContextServer* server; //!< Concrete context server 239 CContextClient* client; //!< Concrete contex client 237 240 std::vector<CContextServer*> serverPrimServer; 238 241 std::vector<CContextClient*> clientPrimServer; 239 242 240 CRegistry* registryIn ; //!< input registry which is read from file241 CRegistry* registryOut ; //!< output registry which will be wrote onfile at the finalize243 CRegistry* registryIn ; //!< input registry which is read from file 244 CRegistry* registryOut ; //!< output registry which will be written into file at the finalize 242 245 243 246 private: 244 247 bool isPostProcessed; 245 248 bool allProcessed; 246 bool finalized; 249 // bool finalized; 250 int countChildCtx_; //!< Counter of child contexts (for now it is the number of secondary server pools) 247 251 StdString idServer_; 248 252 CGarbageCollector garbageCollector; -
XIOS/dev/dev_olga/src/node/domain.cpp
r1129 r1130 3051 3051 if ((0 <= indexI) && (0 <= indexJ)) 3052 3052 { 3053 data_i_index(nbCompressedData) = (1 == data_dim) ? ind : i _index(ind) - i_index(0);3054 data_j_index(nbCompressedData) = (1 == data_dim) ? 0 : j_index(ind) - j_index(0);3053 data_i_index(nbCompressedData) = (1 == data_dim) ? ind : ind % ni; 3054 data_j_index(nbCompressedData) = (1 == data_dim) ? 0 : ind / ni; 3055 3055 ++nbCompressedData; 3056 3056 } -
XIOS/dev/dev_olga/src/node/file.cpp
r1129 r1130 866 866 { 867 867 sendAddItem(id, EVENT_ID_ADD_FIELD); 868 // CContext* context = CContext::getCurrent();869 870 // if (! context->hasServer )871 // {872 // CContextClient* client = context->client;873 874 // CEventClient event(this->getType(),EVENT_ID_ADD_FIELD);875 // if (client->isServerLeader())876 // {877 // CMessage msg;878 // msg << this->getId();879 // msg << id;880 // const std::list<int>& ranks = client->getRanksServerLeader();881 // for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)882 // event.push(*itRank,1,msg);883 // client->sendEvent(event);884 // }885 // else client->sendEvent(event);886 // }887 888 868 } 889 869 … … 891 871 { 892 872 sendAddItem(id, EVENT_ID_ADD_FIELD, client); 893 // CContext* context = CContext::getCurrent();894 895 // if (! context->hasServer )896 // {897 // CContextClient* client = context->client;898 899 // CEventClient event(this->getType(),EVENT_ID_ADD_FIELD);900 // if (client->isServerLeader())901 // {902 // CMessage msg;903 // msg << this->getId();904 // msg << id;905 // const std::list<int>& ranks = client->getRanksServerLeader();906 // for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)907 // event.push(*itRank,1,msg);908 // client->sendEvent(event);909 // }910 // else client->sendEvent(event);911 // }912 913 873 } 914 874 … … 920 880 { 921 881 sendAddItem(id, (int)EVENT_ID_ADD_FIELD_GROUP); 922 // CContext* context = CContext::getCurrent();923 // if (! context->hasServer )924 // {925 // CContextClient* client = context->client;926 927 // CEventClient event(this->getType(),EVENT_ID_ADD_FIELD_GROUP);928 // if (client->isServerLeader())929 // {930 // CMessage msg;931 // msg << this->getId();932 // msg << id;933 // const std::list<int>& ranks = client->getRanksServerLeader();934 // for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)935 // event.push(*itRank,1,msg);936 // client->sendEvent(event);937 // }938 // else client->sendEvent(event);939 // }940 941 882 } 942 883 … … 1030 971 { 1031 972 sendAddItem(id, (int)EVENT_ID_ADD_VARIABLE_GROUP); 1032 // CContext* context = CContext::getCurrent();1033 // if (! context->hasServer )1034 // {1035 // CContextClient* client = context->client;1036 1037 // CEventClient event(this->getType(),EVENT_ID_ADD_VARIABLE_GROUP);1038 // if (client->isServerLeader())1039 // {1040 // CMessage msg;1041 // msg << this->getId();1042 // msg << id;1043 // const std::list<int>& ranks = client->getRanksServerLeader();1044 // for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)1045 // event.push(*itRank,1,msg);1046 // client->sendEvent(event);1047 // }1048 // else client->sendEvent(event);1049 // }1050 1051 973 } 1052 974 … … 1059 981 { 1060 982 sendAddItem(id, (int)EVENT_ID_ADD_VARIABLE); 1061 // CContext* context = CContext::getCurrent();1062 1063 // if (! context->hasServer )1064 // {1065 // CContextClient* client = context->client;1066 1067 // CEventClient event(this->getType(),EVENT_ID_ADD_VARIABLE);1068 // if (client->isServerLeader())1069 // {1070 // CMessage msg;1071 // msg << this->getId();1072 // msg << id;1073 // const std::list<int>& ranks = client->getRanksServerLeader();1074 // for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)1075 // event.push(*itRank,1,msg);1076 // client->sendEvent(event);1077 // }1078 // else client->sendEvent(event);1079 // }1080 1081 983 } 1082 984 1083 985 void CFile::sendAddVariable(const string& id, CContextClient* client) 1084 986 { 1085 sendAddItem(id, (int)EVENT_ID_ADD_VARIABLE); 1086 // CContext* context = CContext::getCurrent(); 1087 1088 // if (! context->hasServer ) 1089 // { 1090 // CContextClient* client = context->client; 1091 1092 // CEventClient event(this->getType(),EVENT_ID_ADD_VARIABLE); 1093 // if (client->isServerLeader()) 1094 // { 1095 // CMessage msg; 1096 // msg << this->getId(); 1097 // msg << id; 1098 // const std::list<int>& ranks = client->getRanksServerLeader(); 1099 // for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 1100 // event.push(*itRank,1,msg); 1101 // client->sendEvent(event); 1102 // } 1103 // else client->sendEvent(event); 1104 // } 1105 987 sendAddItem(id, (int)EVENT_ID_ADD_VARIABLE, client); 1106 988 } 1107 989 … … 1153 1035 addVariableGroup(id); 1154 1036 } 1155 1156 /*!1157 \brief Sending all active (enabled) fields from client to server.1158 Each field is identified uniquely by its string identity. Not only should we1159 send the id to server but also we need to send ids of reference domain and reference axis.1160 With these two id, it's easier to make reference to grid where all data should be written.1161 Remark: This function must be called AFTER all active (enabled) files have been created on the server side1162 */1163 // void CFile::sendEnabledFields()1164 // {1165 // size_t size = this->enabledFields.size();1166 // for (size_t i = 0; i < size; ++i)1167 // {1168 // CField* field = this->enabledFields[i];1169 // this->sendAddField(field->getId());1170 // field->sendAllAttributesToServer();1171 // field->sendAddAllVariables();1172 // }1173 // }1174 1037 1175 1038 /*! -
XIOS/dev/dev_olga/src/server.cpp
r1077 r1130 67 67 68 68 boost::hash<string> hashString ; 69 // unsigned long hashServer1 = hashString(CXios::xiosCodeIdPrm);70 // unsigned long hashServer2 = hashString(CXios::xiosCodeIdSnd);71 // unsigned long hashServer = (CXios::serverLevel < 2) ? hashServer1 : hashServer2;72 69 unsigned long hashServer = hashString(CXios::xiosCodeId); 73 70 … … 193 190 else 194 191 { 195 // int rank ,size;196 int size;192 int size, rank; 193 int myColor; 197 194 if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId); 198 195 … … 200 197 MPI_Comm localComm; 201 198 oasis_get_localcomm(localComm); 202 MPI_Comm_dup(localComm, &intraComm); 203 199 200 // Create server intraComm 201 if (!CXios::usingServer2) 202 MPI_Comm_dup(localComm, &intraComm); 203 else 204 { 205 MPI_Comm_rank(localComm,&rank) ; 206 MPI_Comm_size(localComm,&serverSize_) ; 207 nbPools = serverSize_ * CXios::ratioServer2 / 100; 208 if ( rank < (serverSize_ - nbPools) ) 209 { 210 serverLevel = 1; 211 myColor = 0; 212 } 213 else 214 { 215 serverLevel = 2; 216 poolId = rank - serverSize_ + nbPools; 217 myColor = rank; 218 } 219 MPI_Comm_split(localComm, myColor, rank, &intraComm) ; 220 221 } 204 222 MPI_Comm_rank(intraComm,&rank_) ; 205 223 MPI_Comm_size(intraComm,&size) ; 224 206 225 string codesId=CXios::getin<string>("oasis_codes_id") ; 207 226 … … 217 236 { 218 237 oasis_get_intercomm(newComm,*it) ; 219 if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ; 220 MPI_Comm_remote_size(newComm,&size); 221 // interComm.push_back(newComm) ; 222 interCommLeft.push_back(newComm) ; 238 // interComm.push_back(newComm) ; 239 if ( !CXios::usingServer2) 240 interCommLeft.push_back(newComm) ; 241 else 242 { 243 if (serverLevel == 1) 244 { 245 info(50)<<"intercommCreate::server "<<rank_<<" intraCommSize : "<<size 246 <<" intraCommRank :"<<rank_<<" clientLeader "<< rank<<endl ; 247 MPI_Intercomm_create(intraComm, 0, localComm, rank, 0, &newComm) ; 248 interCommRight.push_back(newComm) ; 249 250 } 251 else if (serverLevel == 2) 252 { 253 info(50)<<"intercommCreate::server "<<rank_<<" intraCommSize : "<<size 254 <<" intraCommRank :"<<rank_<<" clientLeader "<< 0<<endl ; 255 MPI_Intercomm_create(intraComm, 0, localComm, 0, 0, &newComm) ; 256 interCommLeft.push_back(newComm) ; 257 258 } 259 260 } 261 // if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ; 262 // MPI_Comm_remote_size(newComm,&size); 263 // Send serverLeader to client 264 if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,interCommLeft.back()) ; 223 265 } 224 266 oasis_enddef() ; … … 286 328 287 329 contextEventLoop() ; 288 // if (finished && contextList.empty()) stop=true ;289 330 if (finished && contextList.empty()) stop=true ; 290 331 eventScheduler->checkEvent() ; … … 510 551 contextInterComms.push_back(inter); 511 552 context->initServer(intraComm, contextInterComms.back()); 512 // context->initServer(intraComm, interCommLeft.front());513 553 } 514 554 … … 535 575 contextIntraComms.push_back(inter); 536 576 context->initClient(contextIntraComms.back(), contextInterComms.back()) ; 537 // context->initClient(intraComm, contextPrimInterComms.back()) ;538 // context->initClient(intraComm, *it) ;539 577 delete [] buff ; 540 578 } … … 545 583 void CServer::contextEventLoop(void) 546 584 { 547 bool finished ;585 bool isFinalized ; 548 586 549 587 map<string,CContext*>::iterator it ; … … 551 589 for(it=contextList.begin();it!=contextList.end();it++) 552 590 { 553 finished=it->second->isFinalized();554 if ( finished)555 { 556 it->second-> freeComms(); // deallocate internally allocated context communicators591 isFinalized=it->second->isFinalized(); 592 if (isFinalized) 593 { 594 it->second->postFinalize(); 557 595 contextList.erase(it) ; 558 596 break ; 559 597 } 560 598 else 561 finished=it->second->checkBuffersAndListen(); 599 { 600 isFinalized=it->second->checkBuffersAndListen(); 601 } 562 602 } 563 603 }
Note: See TracChangeset
for help on using the changeset viewer.