- Timestamp:
- 10/18/19 15:40:35 (5 years ago)
- Location:
- XIOS/dev/dev_ym/XIOS_SERVICES/src
- Files:
-
- 20 added
- 25 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_SERVICES/src/client.cpp
r1756 r1761 12 12 #include "buffer_client.hpp" 13 13 #include "string_tools.hpp" 14 #include "ressources_manager.hpp" 15 #include "services_manager.hpp" 16 #include <functional> 17 #include <cstdio> 18 14 19 15 20 namespace xios 16 21 { 17 22 23 const double serverPublishDefaultTimeout=10; 24 18 25 MPI_Comm CClient::intraComm ; 19 26 MPI_Comm CClient::interComm ; 27 MPI_Comm CClient::clientsComm_ ; 28 20 29 std::list<MPI_Comm> CClient::contextInterComms; 21 30 int CClient::serverLeader ; … … 24 33 StdOFStream CClient::m_infoStream; 25 34 StdOFStream CClient::m_errorStream; 35 CPoolRessource* CClient::poolRessource_=nullptr ; 36 26 37 MPI_Comm& CClient::getInterComm(void) { return (interComm); } 27 38 … … 35 46 */ 36 47 48 void CClient::initRessources(void) 49 { 50 51 /* 52 int commRank; 53 MPI_Comm_rank(CXios::globalComm,&commRank) ; 54 if (commRank==0) 55 { 56 ressources.createPool("ioserver1",ressources.getRessourcesSize()/2) ; 57 } 58 else if (commRank==1) 59 { 60 ressources.createPool("ioserver2",ressources.getRessourcesSize()/2) ; 61 } 62 */ 63 } 64 37 65 void CClient::initialize(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm) 66 { 67 68 MPI_Comm clientComm ; 69 // initialize MPI if not initialized 70 int initialized ; 71 MPI_Initialized(&initialized) ; 72 if (initialized) is_MPI_Initialized=true ; 73 else is_MPI_Initialized=false ; 74 75 MPI_Comm globalComm=CXios::getGlobalComm() ; 76 77 ///////////////////////////////////////// 78 ///////////// PART 1 //////////////////// 79 ///////////////////////////////////////// 80 81 82 // localComm isn't given 83 if (localComm == MPI_COMM_NULL) 84 { 85 86 // don't use OASIS 87 if (!CXios::usingOasis) 88 { 89 90 if (!is_MPI_Initialized) 91 { 92 MPI_Init(NULL, NULL); 93 } 94 CTimer::get("XIOS").resume() ; 95 CTimer::get("XIOS init/finalize",false).resume() ; 96 97 // split the global communicator 98 // get hash from all model to attribute a unique color (int) and then split to get client communicator 99 // every mpi process of globalComm (MPI_COMM_WORLD) must participate 100 101 int commRank, commSize ; 102 MPI_Comm_rank(globalComm,&commRank) ; 103 MPI_Comm_size(globalComm,&commSize) ; 104 105 std::hash<string> hashString ; 106 size_t hashClient=hashString(codeId) ; 107 108 size_t* hashAll = new size_t[commSize] ; 109 MPI_Allgather(&hashClient,1,MPI_UNSIGNED_LONG,hashAll,1,MPI_LONG,globalComm) ; 110 111 int color=0 ; 112 set<size_t> listHash ; 113 for(int i=0 ; i<=commRank ; i++) 114 if (listHash.count(hashAll[i])==0) 115 { 116 listHash.insert(hashAll[i]) ; 117 color=color+1 ; 118 } 119 delete[] hashAll ; 120 121 MPI_Comm_split(globalComm, color, commRank, &clientComm) ; 122 } 123 else // using oasis to split communicator 124 { 125 if (!is_MPI_Initialized) oasis_init(codeId) ; 126 oasis_get_localcomm(clientComm) ; 127 } 128 } 129 else // localComm is given 130 { 131 MPI_Comm_dup(localComm,&clientComm) ; 132 } 133 134 135 ///////////////////////////////////////// 136 ///////////// PART 2 //////////////////// 137 ///////////////////////////////////////// 138 139 140 // Create the XIOS communicator for every process which is related 141 // to XIOS, as well on client side as on server side 142 143 MPI_Comm xiosGlobalComm ; 144 string strIds=CXios::getin<string>("clients_code_id","") ; 145 vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ; 146 if (strIds.empty()) 147 { 148 // no code Ids given, suppose XIOS initialisation is global 149 int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ; 150 MPI_Comm splitComm,interComm ; 151 MPI_Comm_rank(globalComm,&commGlobalRank) ; 152 MPI_Comm_split(globalComm, 0, commGlobalRank, &splitComm) ; 153 int splitCommSize, globalCommSize ; 154 155 MPI_Comm_size(splitComm,&splitCommSize) ; 156 MPI_Comm_size(globalComm,&globalCommSize) ; 157 if (splitCommSize==globalCommSize) // no server 158 { 159 MPI_Comm_dup(globalComm,&xiosGlobalComm) ; 160 CXios::setXiosComm(xiosGlobalComm) ; 161 } 162 else 163 { 164 MPI_Comm_rank(splitComm,&commRank) ; 165 if (commRank==0) clientLeader=commGlobalRank ; 166 else clientLeader=0 ; 167 serverLeader=0 ; 168 MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ; 169 MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ; 170 MPI_Intercomm_create(splitComm, 0, globalComm, serverRemoteLeader,1341,&interComm) ; 171 MPI_Intercomm_merge(interComm,true,&xiosGlobalComm) ; 172 CXios::setXiosComm(xiosGlobalComm) ; 173 } 174 } 175 else 176 { 177 178 xiosGlobalCommByFileExchange(clientComm, codeId) ; 179 180 } 181 182 int commRank ; 183 MPI_Comm_rank(CXios::getXiosComm(), &commRank) ; 184 MPI_Comm_split(CXios::getXiosComm(),false,commRank, &clientsComm_) ; 185 186 // is using server or not ? 187 int xiosCommSize, clientsCommSize ; 188 MPI_Comm_size(CXios::getXiosComm(), &xiosCommSize) ; 189 MPI_Comm_size(clientsComm_, &clientsCommSize) ; 190 if (xiosCommSize==clientsCommSize) CXios::setUsingServer() ; 191 else CXios::setNotUsingServer() ; 192 193 194 CXios::setGlobalRegistry(new CRegistry(clientsComm_)) ; 195 ///////////////////////////////////////// 196 ///////////// PART 3 //////////////////// 197 ///////////////////////////////////////// 198 199 CXios::launchDaemonsManager(false) ; 200 poolRessource_ = new CPoolRessource(clientComm, codeId) ; 201 202 ///////////////////////////////////////// 203 ///////////// PART 4 //////////////////// 204 ///////////////////////////////////////// 205 206 // create the services 207 /* 208 int commRank ; 209 MPI_Comm_rank(clientComm,&commRank) ; 210 auto contextsManager=CXios::getContextsManager() ; 211 212 if (commRank==0) 213 { 214 contextsManager->createServerContext(CXios::defaultPoolId, CXios::defaultGathererId, 0, codeId) ; 215 } 216 217 MPI_Comm interComm ; 218 219 contextsManager->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, codeId, clientComm, interComm) ; 220 */ 221 /* while (true) 222 { 223 224 } 225 */ 226 returnComm = clientComm ; 227 } 228 229 230 void CClient::xiosGlobalCommByFileExchange(MPI_Comm clientComm, const string& codeId) 231 { 232 233 MPI_Comm globalComm=CXios::getGlobalComm() ; 234 MPI_Comm xiosGlobalComm ; 235 236 string strIds=CXios::getin<string>("clients_code_id","") ; 237 vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ; 238 239 int commRank, globalRank, clientRank, serverRank ; 240 MPI_Comm_rank(clientComm, &commRank) ; 241 MPI_Comm_rank(globalComm, &globalRank) ; 242 string clientFileName("__xios_publisher::"+codeId+"__to_remove__") ; 243 244 int error ; 245 246 if (commRank==0) // if root process publish name 247 { 248 std::ofstream ofs (clientFileName, std::ofstream::out); 249 ofs<<globalRank ; 250 ofs.close(); 251 252 // get server root rank 253 254 std::ifstream ifs ; 255 string fileName=("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ; 256 257 double timeout = CXios::getin<double>("server_puplish_timeout",serverPublishDefaultTimeout) ; 258 double time ; 259 260 do 261 { 262 CTimer::get("server_publish_timeout").resume() ; 263 ifs.clear() ; 264 ifs.open(fileName, std::ifstream::in) ; 265 CTimer::get("server_publish_timeout").suspend() ; 266 } while (ifs.fail() && CTimer::get("server_publish_timeout").getCumulatedTime()<timeout) ; 267 268 if (CTimer::get("server_publish_timeout").getCumulatedTime()>=timeout || ifs.fail()) 269 { 270 ifs.clear() ; 271 ifs.close() ; 272 ifs.clear() ; 273 error=true ; 274 } 275 else 276 { 277 ifs>>serverRank ; 278 ifs.close() ; 279 error=false ; 280 } 281 282 } 283 284 MPI_Bcast(&error,1,MPI_INT,0,clientComm) ; 285 286 if (error==false) // you have a server 287 { 288 MPI_Comm intraComm ; 289 MPI_Comm_dup(clientComm,&intraComm) ; 290 MPI_Comm interComm ; 291 292 int pos=0 ; 293 for(int i=0 ; codeId!=clientsCodeId[i]; i++) pos=pos+1 ; 294 295 bool high=true ; 296 for(int i=pos ; i<clientsCodeId.size(); i++) 297 { 298 MPI_Intercomm_create(intraComm, 0, globalComm, serverRank, 3141, &interComm); 299 MPI_Comm_free(&intraComm) ; 300 MPI_Intercomm_merge(interComm,high, &intraComm ) ; 301 high=false ; 302 } 303 xiosGlobalComm=intraComm ; 304 } 305 else // no server detected 306 { 307 vector<int> clientsRank(clientsCodeId.size()) ; 308 309 if (commRank==0) 310 { 311 for(int i=0;i<clientsRank.size();i++) 312 { 313 std::ifstream ifs ; 314 string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ; 315 do 316 { 317 ifs.clear() ; 318 ifs.open(fileName, std::ifstream::in) ; 319 } while (ifs.fail()) ; 320 ifs>>clientsRank[i] ; 321 ifs.close() ; 322 } 323 } 324 325 int client ; 326 MPI_Comm intraComm ; 327 MPI_Comm_dup(clientComm,&intraComm) ; 328 MPI_Comm interComm ; 329 330 int pos=0 ; 331 for(int i=0 ; codeId!=clientsCodeId[i]; i++) pos=pos+1 ; 332 333 bool high=true ; 334 for(int i=pos+1 ; i<clientsCodeId.size(); i++) 335 { 336 if (codeId==clientsCodeId[0]) // first model play the server rule 337 { 338 MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm); 339 MPI_Intercomm_merge(interComm,false, &intraComm ) ; 340 } 341 else 342 { 343 MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[0], 3141, &interComm); 344 MPI_Intercomm_merge(interComm,high, &intraComm ) ; 345 high=false ; 346 } 347 } 348 xiosGlobalComm=intraComm ; 349 } 350 351 MPI_Barrier(xiosGlobalComm); 352 if (commRank==0) std::remove(clientFileName.c_str()) ; 353 MPI_Barrier(xiosGlobalComm); 354 355 CXios::setXiosComm(xiosGlobalComm) ; 356 357 } 358 359 void CClient::xiosGlobalCommByPublishing(MPI_Comm clientComm, const string& codeId) 360 { 361 362 // untested. need to be developped an a true MPI compliant library 363 364 /* 365 // try to discover other client/server 366 // do you have a xios server ? 367 char portName[MPI_MAX_PORT_NAME]; 368 int ierr ; 369 int commRank ; 370 MPI_Comm_rank(clientComm,&commRank) ; 371 372 MPI_Barrier(globalComm) ; 373 if (commRank==0) 374 { 375 376 MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN ); 377 const char* serviceName=CXios::xiosCodeId.c_str() ; 378 ierr=MPI_Lookup_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName); 379 MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL ); 380 } 381 ierr=MPI_SUCCESS ; 382 MPI_Bcast(&ierr,1,MPI_INT,0,clientComm) ; 383 384 if (ierr==MPI_SUCCESS) // you have a server 385 { 386 MPI_Comm intraComm=clientComm ; 387 MPI_Comm interComm ; 388 for(int i=0 ; i<clientsCodeId.size(); i++) 389 { 390 MPI_Comm_connect(portName, MPI_INFO_NULL, 0, intraComm, &interComm); 391 MPI_Intercomm_merge(interComm, true, &intraComm ) ; 392 } 393 xiosGlobalComm=intraComm ; 394 } 395 else // you don't have any server 396 { 397 if (codeId==clientsCodeId[0]) // first code will publish his name 398 { 399 400 if (commRank==0) // if root process publish name 401 { 402 MPI_Open_port(MPI_INFO_NULL, portName); 403 MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName); 404 } 405 406 MPI_Comm intraComm=clientComm ; 407 MPI_Comm interComm ; 408 for(int i=0 ; i<clientsCodeId.size()-1; i++) 409 { 410 MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm); 411 MPI_Intercomm_merge(interComm,false, &intraComm ) ; 412 } 413 } 414 else // other clients are connecting to the first one 415 { 416 if (commRank==0) 417 { 418 419 MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN ); 420 ierr=MPI_Lookup_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName); 421 MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL ); 422 } 423 424 MPI_Bcast(&ierr,1,MPI_INT,0,clientComm) ; 425 426 if (ierr==MPI_SUCCESS) // you can connect 427 { 428 MPI_Comm intraComm=clientComm ; 429 MPI_Comm interComm ; 430 for(int i=0 ; i<clientsCodeId.size()-1; i++) 431 { 432 MPI_Comm_connect(portName, MPI_INFO_NULL, 0, intraComm, &interComm); 433 MPI_Intercomm_merge(interComm, true, &intraComm ) ; 434 } 435 xiosGlobalComm=intraComm ; 436 } 437 } 438 } 439 */ 440 } 441 442 void CClient::initialize_old(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm) 38 443 { 39 444 int initialized ; … … 42 447 else is_MPI_Initialized=false ; 43 448 int rank ; 44 449 450 CXios::launchRessourcesManager(false) ; 451 CXios::launchServicesManager( false) ; 452 CXios::launchContextsManager(false) ; 453 454 initRessources() ; 45 455 // don't use OASIS 46 456 if (!CXios::usingOasis) … … 160 570 } 161 571 572 573 574 void CClient::registerContext(const string& id, MPI_Comm contextComm) 575 { 576 int commRank, commSize ; 577 MPI_Comm_rank(contextComm,&commRank) ; 578 MPI_Comm_size(contextComm,&commSize) ; 579 580 getPoolRessource()->createService(contextComm, id, 0, CServicesManager::CLIENT, 1) ; 581 getPoolRessource()->createService(contextComm, CXios::defaultServerId, 0, CServicesManager::IO_SERVER, 1) ; 582 583 if (commRank==0) while (!CXios::getServicesManager()->hasService(getPoolRessource()->getId(), id, 0)) { CXios::getDaemonsManager()->eventLoop();} 584 585 if (commRank==0) CXios::getContextsManager()->createServerContext(getPoolRessource()->getId(), id, 0, id) ; 586 int type=CServicesManager::CLIENT ; 587 string name = CXios::getContextsManager()->getServerContextName(getPoolRessource()->getId(), id, 0, type, id) ; 588 while (!CXios::getContextsManager()->hasContext(name, contextComm) ) 589 { 590 CXios::getDaemonsManager()->eventLoop() ; 591 } 592 593 /* 594 595 CContext::setCurrent(id) ; 596 CContext* context=CContext::create(id); 597 598 // register the new client side context to the contexts manager 599 if (commRank==0) 600 { 601 MPI_Comm_rank(CXios::getXiosComm(),&commRank) ; 602 SRegisterContextInfo contextInfo ; 603 contextInfo.serviceType=CServicesManager::CLIENT ; 604 contextInfo.partitionId=0 ; 605 contextInfo.leader=commRank ; 606 contextInfo.size=commSize ; 607 CXios::getContextsManager()->registerContext(id, contextInfo) ; 608 } 609 context->initClient(contextComm) ; 610 */ 611 } 612 613 162 614 ///--------------------------------------------------------------- 163 615 /*! … … 168 620 * Function is only called by client. 169 621 */ 170 void CClient::registerContext (const string& id, MPI_Comm contextComm)622 void CClient::registerContext_old(const string& id, MPI_Comm contextComm) 171 623 { 172 624 CContext::setCurrent(id) ; … … 260 712 } 261 713 262 263 714 void CClient::finalize(void) 715 { 716 717 MPI_Barrier(clientsComm_) ; 718 int commRank ; 719 MPI_Comm_rank(clientsComm_, &commRank) ; 720 if (commRank==0) CXios::getRessourcesManager()->finalize() ; 721 722 auto globalRegistry=CXios::getGlobalRegistry() ; 723 globalRegistry->hierarchicalGatherRegistry() ; 724 725 if (commRank==0) 726 { 727 info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ; 728 globalRegistry->toFile("xios_registry.bin") ; 729 } 730 delete globalRegistry ; 731 732 CTimer::get("XIOS init/finalize",false).suspend() ; 733 CTimer::get("XIOS").suspend() ; 734 if (!is_MPI_Initialized) 735 { 736 if (CXios::usingOasis) oasis_finalize(); 737 else MPI_Finalize() ; 738 } 739 740 info(20) << "Client side context is finalized"<<endl ; 741 report(0) <<" Performance report : Whole time from XIOS init and finalize: "<< CTimer::get("XIOS init/finalize").getCumulatedTime()<<" s"<<endl ; 742 report(0) <<" Performance report : total time spent for XIOS : "<< CTimer::get("XIOS").getCumulatedTime()<<" s"<<endl ; 743 report(0)<< " Performance report : time spent for waiting free buffer : "<< CTimer::get("Blocking time").getCumulatedTime()<<" s"<<endl ; 744 report(0)<< " Performance report : Ratio : "<< CTimer::get("Blocking time").getCumulatedTime()/CTimer::get("XIOS init/finalize").getCumulatedTime()*100.<<" %"<<endl ; 745 report(0)<< " Performance report : This ratio must be close to zero. Otherwise it may be usefull to increase buffer size or numbers of server"<<endl ; 746 // report(0)<< " Memory report : Current buffer_size : "<<CXios::bufferSize<<endl ; 747 report(0)<< " Memory report : Minimum buffer size required : " << CClientBuffer::maxRequestSize << " bytes" << endl ; 748 report(0)<< " Memory report : increasing it by a factor will increase performance, depending of the volume of data wrote in file at each time step of the file"<<endl ; 749 report(100)<<CTimer::getAllCumulatedTime()<<endl ; 750 } 751 752 753 void CClient::finalize_old(void) 264 754 { 265 755 int rank ; … … 325 815 int size = 0; 326 816 int rank; 327 MPI_Comm_size(CXios::globalComm, &size); 817 MPI_Comm_size(CXios::getGlobalComm(), &size); 818 MPI_Comm_rank(CXios::getGlobalComm(),&rank); 328 819 while (size) 329 820 { … … 332 823 } 333 824 334 if (CXios::usingOasis) 335 { 336 MPI_Comm_rank(CXios::globalComm,&rank); 337 fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << rank << ext; 338 } 339 else 340 fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext; 341 825 fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << rank << ext; 342 826 343 827 fb->open(fileNameClient.str().c_str(), std::ios::out); -
XIOS/dev/dev_ym/XIOS_SERVICES/src/client.hpp
r1639 r1761 7 7 namespace xios 8 8 { 9 class CPoolRessource ; 9 10 class CClient 10 11 { 11 12 public: 12 13 static void initialize(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm); 14 static void initialize_old(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm); 15 static void xiosGlobalCommByFileExchange(MPI_Comm clientComm, const string& codeId) ; 16 static void xiosGlobalCommByPublishing(MPI_Comm clientComm, const string& codeId) ; 13 17 static void finalize(void); 18 static void finalize_old(void); 14 19 static void registerContext(const string& id, MPI_Comm contextComm); 15 static void callOasisEnddef(void) ; 16 20 static void registerContext_old(const string& id, MPI_Comm contextComm); 21 static void callOasisEnddef(void) ; 22 static void initRessources(void) ; 23 17 24 static MPI_Comm intraComm; 18 25 static MPI_Comm interComm; … … 20 27 static int serverLeader; 21 28 static bool is_MPI_Initialized ; 29 static MPI_Comm clientsComm_ ; 22 30 23 31 static MPI_Comm& getInterComm(); … … 39 47 //! Close the error log file if it opens 40 48 static void closeErrorStream(); 49 static CPoolRessource* getPoolRessource(void) { return poolRessource_ ; } 41 50 42 51 protected: … … 46 55 47 56 static void openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb); 57 static CPoolRessource* poolRessource_ ; 48 58 }; 49 59 } -
XIOS/dev/dev_ym/XIOS_SERVICES/src/config/context_attribute.conf
r549 r1761 1 1 DECLARE_ATTRIBUTE(StdString, output_dir) 2 DECLARE_ATTRIBUTE(bool, attached_mode) -
XIOS/dev/dev_ym/XIOS_SERVICES/src/context_client.cpp
r1757 r1761 24 24 : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4) 25 25 { 26 pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)27 if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode28 26 29 27 context = parent; … … 35 33 int flag; 36 34 MPI_Comm_test_inter(interComm, &flag); 35 if (flag) isAttached_=false ; 36 else isAttached_=true ; 37 38 pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 39 if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 40 41 42 37 43 if (flag) MPI_Comm_remote_size(interComm, &serverSize); 38 44 else MPI_Comm_size(interComm, &serverSize); … … 151 157 152 158 checkBuffers(ranks); 153 154 if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode155 {156 waitEvent(ranks);157 CContext::setCurrent(context->getId());158 }159 } 160 159 } 160 161 if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 162 { 163 waitEvent(ranks); 164 CContext::setCurrent(context->getId()); 165 } 166 161 167 timeLine++; 162 168 } … … 168 174 */ 169 175 void CContextClient::waitEvent(list<int>& ranks) 176 { 177 while (checkBuffers(ranks)) 178 { 179 CXios::getDaemonsManager()->eventLoop() ; 180 } 181 182 MPI_Request req ; 183 MPI_Status status ; 184 185 MPI_Ibarrier(intraComm,&req) ; 186 int flag=false ; 187 188 do 189 { 190 CXios::getDaemonsManager()->eventLoop() ; 191 MPI_Test(&req,&flag,&status) ; 192 } while (!flag) ; 193 194 195 } 196 197 198 void CContextClient::waitEvent_old(list<int>& ranks) 170 199 { 171 200 parentServer->server->setPendingEvent(); … … 227 256 for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer(); 228 257 checkBuffers(); 229 if (CServer::serverLevel == 0) context->server->listen(); 230 else if (CServer::serverLevel == 1) 258 259 context->server->listen(); 260 261 if (context->serverPrimServer.size()>0) 231 262 { 232 context->server->listen();233 263 for (int i = 0; i < context->serverPrimServer.size(); ++i) context->serverPrimServer[i]->listen(); 234 264 CServer::contextEventLoop(false) ; // avoid dead-lock at finalize... 235 265 } 236 237 else if (CServer::serverLevel == 2) context->server->listen();238 266 239 267 } … … 320 348 } 321 349 } 322 }323 350 */ 351 } 352 324 353 325 354 /*! … … 402 431 { 403 432 return !ranksServerLeader.empty(); 404 }405 406 /*!407 * Check if the attached mode is used.408 *409 * \return true if and only if attached mode is used410 */411 bool CContextClient::isAttachedModeEnabled() const412 {413 return (parentServer != 0);414 433 } 415 434 -
XIOS/dev/dev_ym/XIOS_SERVICES/src/context_client.hpp
r1757 r1761 32 32 void sendEvent(CEventClient& event); 33 33 void waitEvent(list<int>& ranks); 34 void waitEvent_old(list<int>& ranks); 34 35 35 36 // Functions to set/get buffers … … 46 47 const std::list<int>& getRanksServerNotLeader(void) const; 47 48 48 bool isAttachedModeEnabled() const; 49 /*! 50 * Check if the attached mode is used. 51 * 52 * \return true if and only if attached mode is used 53 */ 54 bool isAttachedModeEnabled() const { return isAttached_ ; } 49 55 50 56 static void computeLeader(int clientRank, int clientSize, int serverSize, … … 103 109 104 110 std::vector<std::vector<MPI_Win> >windows ; //! one sided mpi windows to expose client buffers to servers == windows[nbServers][2] 105 111 bool isAttached_ ; 106 112 107 113 }; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/context_server.cpp
r1757 r1761 16 16 #include "event_scheduler.hpp" 17 17 #include "server.hpp" 18 #include "servers_ressource.hpp" 19 #include "pool_ressource.hpp" 20 #include "services.hpp" 21 #include "contexts_manager.hpp" 22 18 23 #include <boost/functional/hash.hpp> 19 24 #include <random> 25 #include <chrono> 20 26 21 27 22 28 namespace xios 23 29 { 30 using namespace std ; 24 31 25 32 CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_) … … 40 47 else MPI_Comm_size(interComm,&commSize); 41 48 42 49 50 SRegisterContextInfo contextInfo ; 51 CXios::getContextsManager()->getContextInfo(context->getId(), contextInfo, intraComm) ; 52 53 if (contextInfo.serviceType != CServicesManager::CLIENT) // we must have an event scheduler => to be retrieve from the associated services 54 { 55 eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ; 56 } 57 58 43 59 currentTimeLine=1; 44 60 scheduled=false; 45 61 finished=false; 62 63 // generate unique hash for server 64 auto time=chrono::system_clock::now().time_since_epoch().count() ; 65 std::default_random_engine rd(time); // not reproducible from a run to another 66 std::uniform_int_distribution<size_t> dist; 67 hashId=dist(rd) ; 68 MPI_Bcast(&hashId,1,MPI_SIZE_T,0,intraComm) ; // Bcast to all server of the context 69 70 /* 46 71 boost::hash<string> hashString; 72 47 73 if (CServer::serverLevel == 1) 48 74 hashId=hashString(context->getId() + boost::lexical_cast<string>(context->clientPrimServer.size())); 49 75 else 50 76 hashId=hashString(context->getId()); 77 */ 51 78 52 79 if (!isAttachedModeEnabled()) … … 293 320 map<size_t,CEventServer*>::iterator it; 294 321 CEventServer* event; 322 323 if (context->isProcessingEvent()) return ; 295 324 296 325 it=events.find(currentTimeLine); … … 313 342 if (!CServer::eventScheduler && CXios::isServer) MPI_Barrier(intraComm) ; 314 343 344 context->setProcessingEvent() ; 315 345 CTimer::get("Process events").resume(); 316 346 dispatchEvent(*event); 317 347 CTimer::get("Process events").suspend(); 348 context->unsetProcessingEvent() ; 318 349 pendingEvent=false; 319 350 delete event; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/context_server.hpp
r1757 r1761 5 5 #include "buffer_server.hpp" 6 6 #include "mpi.hpp" 7 #include "event_scheduler.hpp" 7 8 8 9 namespace xios … … 64 65 std::map<int, StdSize> mapBufferSize_; 65 66 vector<MPI_Win> windows ; //! one sided mpi windows to expose client buffers to servers ; No memory will be attached on server side. 66 67 CEventScheduler* eventScheduler_ ; 67 68 } ; 68 69 -
XIOS/dev/dev_ym/XIOS_SERVICES/src/cxios.cpp
r1639 r1761 11 11 #include "memtrack.hpp" 12 12 #include "registry.hpp" 13 #include "ressources_manager.hpp" 14 #include "services_manager.hpp" 15 #include "servers_ressource.hpp" 13 16 14 17 namespace xios … … 20 23 string CXios::serverPrmFile="./xios_server1"; 21 24 string CXios::serverSndFile="./xios_server2"; 22 25 const string CXios::defaultPoolId="default_pool_id" ; 26 const string CXios::defaultServerId="default_server_id" ; 27 const string CXios::defaultGathererId="default_gatherer_id" ; 28 23 29 bool CXios::xiosStack = true; 24 30 bool CXios::systemStack = false; … … 26 32 bool CXios::isClient ; 27 33 bool CXios::isServer ; 34 28 35 MPI_Comm CXios::globalComm ; 36 MPI_Comm CXios::xiosComm ; 37 29 38 bool CXios::usingOasis ; 30 39 bool CXios::usingServer = false; … … 41 50 double CXios::recvFieldTimeout = 300.0; 42 51 bool CXios::checkEventSync=false ; 43 52 53 CDaemonsManager* CXios::daemonsManager_=nullptr ; 54 CRessourcesManager* CXios::ressourcesManager_=nullptr ; 55 CServicesManager* CXios::servicesManager_=nullptr ; 56 CContextsManager* CXios::contextsManager_=nullptr ; 57 44 58 //! Parse configuration file and create some objects from it 45 59 void CXios::initialize() … … 107 121 isClient = true; 108 122 123 //CClient::initialize(codeId,localComm,returnComm) ; 109 124 CClient::initialize(codeId,localComm,returnComm) ; 110 if (CClient::getRank()==0) globalRegistry = new CRegistry(returnComm) ; 111 125 112 126 // If there are no server processes then we are in attached mode 113 127 // and the clients are also servers … … 130 144 { 131 145 CClient::finalize() ; 132 if (CClient::getRank()==0) 133 { 134 info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ; 135 globalRegistry->toFile("xios_registry.bin") ; 136 delete globalRegistry ; 137 } 138 146 139 147 #ifdef XIOS_MEMTRACK 140 148 … … 172 180 173 181 // Initialize all aspects MPI 182 // CServer::initialize(); 183 174 184 CServer::initialize(); 175 if (CServer::getRank()==0 && CServer::serverLevel != 1) globalRegistry = new CRegistry(CServer::intraComm) ; 176 185 186 //if (CServer::getRank()==0 && CServer::serverLevel != 1) globalRegistry = new CRegistry(CServer::intraComm) ; 187 188 /* 177 189 if (printLogs2Files) 178 190 { … … 201 213 // Enter the loop to listen message from Client 202 214 CServer::eventLoop(); 203 215 */ 204 216 // Finalize 217 /* 205 218 if (CServer::serverLevel == 0) 206 219 { … … 271 284 delete globalRegistry; 272 285 } 286 */ 287 273 288 CServer::finalize(); 274 289 … … 305 320 usingServer = false; 306 321 } 322 323 void CXios::launchRessourcesManager(bool isXiosServer) 324 { 325 ressourcesManager_ = new CRessourcesManager(isXiosServer) ; 326 } 327 328 void CXios::launchServicesManager(bool isXiosServer) 329 { 330 servicesManager_ = new CServicesManager(isXiosServer) ; 331 } 332 333 void CXios::launchContextsManager(bool isXiosServer) 334 { 335 contextsManager_ = new CContextsManager(isXiosServer) ; 336 } 337 338 void CXios::launchDaemonsManager(bool isXiosServer) 339 { 340 daemonsManager_ = new CDaemonsManager(isXiosServer) ; 341 } 342 343 CPoolRessource* CXios::getPoolRessource(void) 344 { 345 if (isClient) return CClient::getPoolRessource() ; 346 else if (isServer) return CServer::getServersRessource()->getPoolRessource() ; 347 } 307 348 } 349 -
XIOS/dev/dev_ym/XIOS_SERVICES/src/cxios.hpp
r1639 r1761 5 5 #include "mpi.hpp" 6 6 #include "registry.hpp" 7 #include "ressources_manager.hpp" 8 #include "services_manager.hpp" 9 #include "contexts_manager.hpp" 10 #include "daemons_manager.hpp" 7 11 8 12 namespace xios … … 41 45 42 46 static MPI_Comm globalComm ; //!< Global communicator 47 static MPI_Comm xiosComm ; //!< Global communicator 43 48 44 49 static bool printLogs2Files; //!< Printing out logs into files … … 56 61 static double recvFieldTimeout; //!< Time to wait for data before issuing an error when receiving a field 57 62 static bool checkEventSync; //!< For debuuging, check if event are coherent and synchrone on client side 63 64 static const string defaultPoolId ; 65 static const string defaultServerId ; 66 static const string defaultGathererId ; 67 68 69 static CRessourcesManager* ressourcesManager_ ; 70 static CServicesManager* servicesManager_ ; 71 static CContextsManager* contextsManager_ ; 72 static CDaemonsManager* daemonsManager_ ; 73 58 74 59 75 public: … … 63 79 //! Setting xios NOT to use server mode 64 80 static void setNotUsingServer(); 81 82 //! is using server mode 83 static bool isUsingServer() {return usingServer;} 65 84 66 85 //! Initialize server (if any) 67 86 static void initServer(); 87 88 static void launchServicesManager( bool isXiosServer) ; 89 static void launchContextsManager(bool isXiosServer) ; 90 static void launchDaemonsManager(bool isXiosServer) ; 91 static void launchRessourcesManager(bool isXiosServer) ; 92 93 static CRessourcesManager* getRessourcesManager(void) { return ressourcesManager_ ;} 94 static CServicesManager* getServicesManager(void) { return servicesManager_ ;} 95 static CContextsManager* getContextsManager(void) { return contextsManager_ ;} 96 static CDaemonsManager* getDaemonsManager(void) { return daemonsManager_ ;} 97 static CPoolRessource* getPoolRessource(void) ; 98 99 static MPI_Comm getGlobalComm(void) { return globalComm ;} 100 static MPI_Comm getXiosComm(void) { return xiosComm ;} 101 static void setXiosComm(MPI_Comm comm) { xiosComm=comm ;} 102 static CRegistry* getGlobalRegistry(void) { return globalRegistry ;} 103 static void setGlobalRegistry(CRegistry* registry) { globalRegistry=registry ;} 68 104 69 105 private: -
XIOS/dev/dev_ym/XIOS_SERVICES/src/filter/store_filter.cpp
r1752 r1761 45 45 packet = it->second; 46 46 else // if the packet is not available yet, check if it can be received 47 context->checkBuffersAndListen(); 47 //ym context->checkBuffersAndListen(); 48 context->eventLoop(); 48 49 49 50 timer.suspend(); -
XIOS/dev/dev_ym/XIOS_SERVICES/src/interface/c/iccalendar.cpp
r1753 r1761 16 16 xios::CContext* context = CContext::getCurrent(); 17 17 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 18 context->checkBuffersAndListen(); 18 context->eventLoop(); 19 //ym context->checkBuffersAndListen(); 20 19 21 context->updateCalendar(step); 20 22 context->sendUpdateCalendar(step); -
XIOS/dev/dev_ym/XIOS_SERVICES/src/interface/c/icdata.cpp
r1753 r1761 423 423 CContext* context = CContext::getCurrent(); 424 424 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 425 context->checkBuffersAndListen(); 425 context->eventLoop(); 426 //ym context->checkBuffersAndListen(); 426 427 CArray<double, 1> data(data_k8, shape(data_Xsize), neverDeleteData); 427 428 CField::get(fieldid_str)->setData(data); … … 442 443 CContext* context = CContext::getCurrent(); 443 444 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 444 context->checkBuffersAndListen(); 445 context->eventLoop(); 446 //ym context->checkBuffersAndListen(); 445 447 446 448 CArray<double, 1> data(data_k8, shape(data_Xsize), neverDeleteData); … … 463 465 CContext* context = CContext::getCurrent(); 464 466 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 465 context->checkBuffersAndListen(); 467 context->eventLoop(); 468 //ym context->checkBuffersAndListen(); 466 469 467 470 CArray<double, 2>data(data_k8, shape(data_Xsize, data_Ysize), neverDeleteData); … … 484 487 CContext* context = CContext::getCurrent(); 485 488 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 486 context->checkBuffersAndListen(); 489 context->eventLoop(); 490 //ym context->checkBuffersAndListen(); 487 491 488 492 CArray<double, 3>data(data_k8, shape(data_Xsize, data_Ysize, data_Zsize), neverDeleteData); … … 505 509 CContext* context = CContext::getCurrent(); 506 510 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 507 context->checkBuffersAndListen(); 511 context->eventLoop(); 512 //ym context->checkBuffersAndListen(); 508 513 509 514 CArray<double, 4>data(data_k8, shape(data_0size, data_1size, data_2size, data_3size), neverDeleteData); … … 527 532 528 533 CContext* context = CContext::getCurrent(); 529 if (!context->hasServer && !context->client->isAttachedModeEnabled())530 context->checkBuffersAndListen();534 context->eventLoop(); 535 //ym context->checkBuffersAndListen(); 531 536 532 537 CArray<double, 5>data(data_k8, shape(data_0size, data_1size, data_2size, data_3size, data_4size), neverDeleteData); … … 551 556 CContext* context = CContext::getCurrent(); 552 557 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 553 context->checkBuffersAndListen(); 558 context->eventLoop(); 559 // context->checkBuffersAndListen(); 554 560 555 561 CArray<double, 6>data(data_k8, shape(data_0size, data_1size, data_2size, data_3size, data_4size, data_5size), neverDeleteData); … … 575 581 CContext* context = CContext::getCurrent(); 576 582 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 577 context->checkBuffersAndListen(); 583 context->eventLoop(); 584 // context->checkBuffersAndListen(); 578 585 579 586 CArray<double, 7>data(data_k8, shape(data_0size, data_1size, data_2size, data_3size, data_4size, data_5size, data_6size), neverDeleteData); … … 595 602 CContext* context = CContext::getCurrent(); 596 603 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 597 context->checkBuffersAndListen(); 604 context->eventLoop(); 605 //ym context->checkBuffersAndListen(); 598 606 599 607 CArray<float, 1> data_tmp(data_k4, shape(data_Xsize), neverDeleteData); … … 617 625 CContext* context = CContext::getCurrent(); 618 626 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 619 context->checkBuffersAndListen(); 627 context->eventLoop(); 628 //ym context->checkBuffersAndListen(); 620 629 621 630 CArray<float, 1> data_tmp(data_k4, shape(data_Xsize), neverDeleteData); … … 640 649 CContext* context = CContext::getCurrent(); 641 650 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 642 context->checkBuffersAndListen(); 651 context->eventLoop(); 652 //ym context->checkBuffersAndListen(); 643 653 644 654 CArray<float, 2> data_tmp(data_k4, shape(data_Xsize, data_Ysize), neverDeleteData); … … 663 673 CContext* context = CContext::getCurrent(); 664 674 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 665 context->checkBuffersAndListen(); 675 context->eventLoop(); 676 //ym context->checkBuffersAndListen(); 666 677 667 678 CArray<float, 3> data_tmp(data_k4, shape(data_Xsize, data_Ysize, data_Zsize), neverDeleteData); … … 688 699 CContext* context = CContext::getCurrent(); 689 700 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 690 context->checkBuffersAndListen(); 701 context->eventLoop(); 702 //ym context->checkBuffersAndListen(); 691 703 692 704 CArray<float, 4> data_tmp(data_k4, shape(data_0size, data_1size, data_2size, data_3size), neverDeleteData); … … 713 725 CContext* context = CContext::getCurrent(); 714 726 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 715 context->checkBuffersAndListen(); 727 context->eventLoop(); 728 //ym context->checkBuffersAndListen(); 716 729 717 730 CArray<float, 5> data_tmp(data_k4, shape(data_0size, data_1size, data_2size, data_3size, data_4size), neverDeleteData); … … 738 751 CContext* context = CContext::getCurrent(); 739 752 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 740 context->checkBuffersAndListen(); 753 context->eventLoop(); 754 //ym context->checkBuffersAndListen(); 741 755 742 756 CArray<float, 6> data_tmp(data_k4, shape(data_0size, data_1size, data_2size, data_3size, data_4size, data_5size), neverDeleteData); … … 764 778 CContext* context = CContext::getCurrent(); 765 779 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 766 context->checkBuffersAndListen(); 780 context->eventLoop(); 781 //ym context->checkBuffersAndListen(); 767 782 768 783 CArray<float, 7> data_tmp(data_k4, shape(data_0size, data_1size, data_2size, data_3size, data_4size, data_5size, data_6size), neverDeleteData); … … 789 804 CContext* context = CContext::getCurrent(); 790 805 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 791 context->checkBuffersAndListen(); 806 context->eventLoop(); 807 //ym context->checkBuffersAndListen(); 792 808 793 809 CArray<double, 1> data(data_k8, shape(data_Xsize), neverDeleteData); … … 810 826 CContext* context = CContext::getCurrent(); 811 827 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 812 context->checkBuffersAndListen(); 828 context->eventLoop(); 829 //ym context->checkBuffersAndListen(); 813 830 814 831 CArray<double, 1> data(data_k8, shape(data_Xsize), neverDeleteData); … … 831 848 CContext* context = CContext::getCurrent(); 832 849 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 833 context->checkBuffersAndListen(); 850 context->eventLoop(); 851 //ym context->checkBuffersAndListen(); 834 852 835 853 CArray<double, 2>data(data_k8, shape(data_Xsize, data_Ysize), neverDeleteData); … … 852 870 CContext* context = CContext::getCurrent(); 853 871 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 854 context->checkBuffersAndListen(); 872 context->eventLoop(); 873 //ym context->checkBuffersAndListen(); 855 874 856 875 CArray<double, 3>data(data_k8, shape(data_Xsize, data_Ysize, data_Zsize), neverDeleteData); … … 875 894 CContext* context = CContext::getCurrent(); 876 895 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 877 context->checkBuffersAndListen(); 896 context->eventLoop(); 897 //ym context->checkBuffersAndListen(); 878 898 879 899 CArray<double, 4>data(data_k8, shape(data_0size, data_1size, data_2size, data_3size), neverDeleteData); … … 898 918 CContext* context = CContext::getCurrent(); 899 919 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 900 context->checkBuffersAndListen(); 920 context->eventLoop(); 921 //ym context->checkBuffersAndListen(); 901 922 902 923 CArray<double, 5>data(data_k8, shape(data_0size, data_1size, data_2size, data_3size, data_4size), neverDeleteData); … … 921 942 CContext* context = CContext::getCurrent(); 922 943 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 923 context->checkBuffersAndListen(); 944 context->eventLoop(); 945 //ym context->checkBuffersAndListen(); 924 946 925 947 CArray<double, 6>data(data_k8, shape(data_0size, data_1size, data_2size, data_3size, data_4size, data_5size), neverDeleteData); … … 945 967 CContext* context = CContext::getCurrent(); 946 968 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 947 context->checkBuffersAndListen(); 969 context->eventLoop(); 970 //ym context->checkBuffersAndListen(); 948 971 949 972 CArray<double, 7>data(data_k8, shape(data_0size, data_1size, data_2size, data_3size, data_4size, data_5size, data_6size), neverDeleteData); … … 966 989 CContext* context = CContext::getCurrent(); 967 990 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 968 context->checkBuffersAndListen(); 991 context->eventLoop(); 992 //ym context->checkBuffersAndListen(); 969 993 970 994 CArray<double, 1> data(data_Xsize); … … 989 1013 CContext* context = CContext::getCurrent(); 990 1014 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 991 context->checkBuffersAndListen(); 1015 context->eventLoop(); 1016 //ym context->checkBuffersAndListen(); 992 1017 993 1018 CArray<double, 1> data(data_Xsize); … … 1012 1037 CContext* context = CContext::getCurrent(); 1013 1038 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 1014 context->checkBuffersAndListen(); 1039 context->eventLoop(); 1040 //ym context->checkBuffersAndListen(); 1015 1041 1016 1042 CArray<double, 2> data(data_Xsize, data_Ysize); … … 1035 1061 CContext* context = CContext::getCurrent(); 1036 1062 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 1037 context->checkBuffersAndListen(); 1063 context->eventLoop(); 1064 //ym context->checkBuffersAndListen(); 1038 1065 1039 1066 CArray<double, 3> data(data_Xsize, data_Ysize, data_Zsize); … … 1060 1087 CContext* context = CContext::getCurrent(); 1061 1088 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 1062 context->checkBuffersAndListen(); 1089 context->eventLoop(); 1090 //ym context->checkBuffersAndListen(); 1063 1091 1064 1092 CArray<double, 4> data(data_0size, data_1size, data_2size, data_3size); … … 1085 1113 CContext* context = CContext::getCurrent(); 1086 1114 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 1087 context->checkBuffersAndListen(); 1115 context->eventLoop(); 1116 //ym context->checkBuffersAndListen(); 1088 1117 1089 1118 CArray<double, 5> data(data_0size, data_1size, data_2size, data_3size, data_4size); … … 1110 1139 CContext* context = CContext::getCurrent(); 1111 1140 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 1112 context->checkBuffersAndListen(); 1141 context->eventLoop(); 1142 //ym context->checkBuffersAndListen(); 1113 1143 1114 1144 CArray<double, 6> data(data_0size, data_1size, data_2size, data_3size, data_4size, data_5size); … … 1136 1166 CContext* context = CContext::getCurrent(); 1137 1167 if (!context->hasServer && !context->client->isAttachedModeEnabled()) 1138 context->checkBuffersAndListen(); 1168 context->eventLoop(); 1169 //ym context->checkBuffersAndListen(); 1139 1170 1140 1171 CArray<double, 7> data(data_0size, data_1size, data_2size, data_3size, data_4size, data_5size, data_6size); -
XIOS/dev/dev_ym/XIOS_SERVICES/src/interface/fortran/ixios.F90
r1754 r1761 8 8 xios(get_year_length_in_seconds), xios(get_day_length_in_seconds) 9 9 10 USE icontext, ONLY : txios(context), xios(set_current_context), xios( is_valid_context)10 USE icontext, ONLY : txios(context), xios(set_current_context), xios(get_current_context), xios(is_valid_context) 11 11 12 12 USE icontext_attr, ONLY : xios(set_context_attr), xios(get_context_attr), xios(is_defined_context_attr) -
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/axis.cpp
r1639 r1761 847 847 848 848 numberWrittenIndexes_[writtenCommSize] = nbWritten; 849 if (isDistributed()) 849 850 bool distributed_glo, distributed=isDistributed() ; 851 MPI_Allreduce(&distributed,&distributed_glo, 1, MPI_INT, MPI_LOR, writtenComm) ; 852 if (distributed_glo) 850 853 { 851 854 -
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/context.cpp
r1757 r1761 20 20 #include "server.hpp" 21 21 #include "distribute_file_server2.hpp" 22 #include "services_manager.hpp" 23 #include "contexts_manager.hpp" 24 #include "cxios.hpp" 25 #include "client.hpp" 22 26 23 27 namespace xios { … … 31 35 , calendar(), hasClient(false), hasServer(false) 32 36 , isPostProcessed(false), finalized(false) 33 , idServer_(), client( 0), server(0)34 , allProcessed(false), countChildCtx_(0) 37 , idServer_(), client(nullptr), server(nullptr) 38 , allProcessed(false), countChildCtx_(0), isProcessingEvent_(false) 35 39 36 40 { /* Ne rien faire de plus */ } … … 40 44 , calendar(), hasClient(false), hasServer(false) 41 45 , isPostProcessed(false), finalized(false) 42 , idServer_(), client( 0), server(0)43 , allProcessed(false), countChildCtx_(0) 46 , idServer_(), client(nullptr), server(nullptr) 47 , allProcessed(false), countChildCtx_(0), isProcessingEvent_(false) 44 48 { /* Ne rien faire de plus */ } 45 49 … … 264 268 ///--------------------------------------------------------------- 265 269 266 //! Initialize client side 270 271 //! Initialize client side : old interface to be removed 267 272 void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/) 268 273 TRY … … 270 275 271 276 hasClient = true; 272 MPI_Comm intraCommServer, interCommServer; 277 MPI_Comm intraCommServer, interCommServer; 273 278 274 279 … … 383 388 CATCH_DUMP_ATTR 384 389 390 391 void CContext::init(CServerContext* parentServerContext, MPI_Comm intraComm, int serviceType) 392 TRY 393 { 394 parentServerContext_ = parentServerContext ; 395 if (serviceType==CServicesManager::CLIENT) 396 initClient(intraComm, serviceType) ; 397 else 398 initServer(intraComm, serviceType) ; 399 } 400 CATCH_DUMP_ATTR 401 402 403 404 //! Initialize client side 405 void CContext::initClient(MPI_Comm intraComm, int serviceType) 406 TRY 407 { 408 intraComm_=intraComm ; 409 serviceType_ = CServicesManager::CLIENT ; 410 if (serviceType_==CServicesManager::CLIENT) 411 { 412 hasClient=true ; 413 hasServer=false ; 414 } 415 contextId_ = getId() ; 416 417 attached_mode=true ; 418 if (!CXios::isUsingServer()) attached_mode=false ; 419 420 421 string contextRegistryId=getId() ; 422 registryIn=new CRegistry(intraComm); 423 registryIn->setPath(contextRegistryId) ; 424 425 int commRank ; 426 MPI_Comm_rank(intraComm_,&commRank) ; 427 if (commRank==0) registryIn->fromFile("xios_registry.bin") ; 428 registryIn->bcastRegistry() ; 429 registryOut=new CRegistry(intraComm_) ; 430 registryOut->setPath(contextRegistryId) ; 431 432 } 433 CATCH_DUMP_ATTR 434 435 436 void CContext::initServer(MPI_Comm intraComm, int serviceType) 437 TRY 438 { 439 hasServer=true; 440 intraComm_=intraComm ; 441 serviceType_=serviceType ; 442 443 if (serviceType_==CServicesManager::GATHERER) 444 { 445 hasClient=true ; 446 hasServer=true ; 447 } 448 else if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 449 { 450 hasClient=false ; 451 hasServer=true ; 452 } 453 454 CXios::getContextsManager()->getContextId(getId(), contextId_, intraComm) ; 455 456 registryIn=new CRegistry(intraComm); 457 registryIn->setPath(contextId_) ; 458 459 int commRank ; 460 MPI_Comm_rank(intraComm_,&commRank) ; 461 if (commRank==0) registryIn->fromFile("xios_registry.bin") ; 462 463 registryIn->bcastRegistry() ; 464 registryOut=new CRegistry(intraComm) ; 465 registryOut->setPath(contextId_) ; 466 467 } 468 CATCH_DUMP_ATTR 469 470 471 void CContext::createClientInterComm(MPI_Comm interCommClient, MPI_Comm interCommServer) // for servers 472 TRY 473 { 474 MPI_Comm intraCommClient ; 475 MPI_Comm_dup(intraComm_, &intraCommClient); 476 comms.push_back(intraCommClient); 477 478 server = new CContextServer(this,intraComm_, interCommServer); // check if we need to dupl. intraComm_ ? 479 client = new CContextClient(this,intraCommClient,interCommClient); 480 481 } 482 CATCH_DUMP_ATTR 483 484 void CContext::createServerInterComm(void) 485 TRY 486 { 487 488 MPI_Comm interCommClient, interCommServer ; 489 490 if (serviceType_ == CServicesManager::CLIENT) 491 { 492 493 int commRank ; 494 MPI_Comm_rank(intraComm_,&commRank) ; 495 if (commRank==0) 496 { 497 if (attached_mode) CXios::getContextsManager()->createServerContext(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, getContextId()) ; 498 else if (CXios::usingServer2) CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId()) ; 499 else CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId()) ; 500 } 501 502 MPI_Comm interComm ; 503 504 if (attached_mode) 505 { 506 parentServerContext_->createIntercomm(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, getContextId(), intraComm_, 507 interCommClient, interCommServer) ; 508 int type ; 509 if (commRank==0) CXios::getServicesManager()->getServiceType(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, type) ; 510 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 511 setIdServer(CXios::getContextsManager()->getServerContextName(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, type, getContextId())) ; 512 setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble 513 } 514 else if (CXios::usingServer2) 515 { 516 // CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId(), intraComm_, interComm) ; 517 parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId(), intraComm_, 518 interCommClient, interCommServer) ; 519 int type ; 520 if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultGathererId, 0, type) ; 521 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 522 setIdServer(CXios::getContextsManager()->getServerContextName(CXios::defaultPoolId, CXios::defaultGathererId, 0, type, getContextId())) ; 523 } 524 else 525 { 526 //CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId(), intraComm_, interComm) ; 527 parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId(), intraComm_, 528 interCommClient, interCommServer) ; 529 int type ; 530 if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultServerId, 0, type) ; 531 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 532 setIdServer(CXios::getContextsManager()->getServerContextName(CXios::defaultPoolId, CXios::defaultServerId, 0, type, getContextId())) ; 533 } 534 535 // intraComm client is not duplicated. In all the code we use client->intraComm for MPI 536 // in future better to replace it by intracommuncator associated to the context 537 538 /* MPI_Comm intraCommClient, intraCommServer ; 539 MPI_Comm interCommClient, interCommServer ; 540 541 intraCommClient=intraComm_ ; 542 MPI_Comm_dup(intraComm_, &intraCommServer) ; 543 544 interCommClient=interComm ; 545 MPI_Comm_dup(interComm, &interCommServer) ; */ 546 547 MPI_Comm intraCommClient, intraCommServer ; 548 intraCommClient=intraComm_ ; 549 MPI_Comm_dup(intraComm_, &intraCommServer) ; 550 client = new CContextClient(this, intraCommClient, interCommClient); 551 server = new CContextServer(this, intraCommServer, interCommServer); 552 553 } 554 555 if (serviceType_ == CServicesManager::GATHERER) 556 { 557 int commRank ; 558 MPI_Comm_rank(intraComm_,&commRank) ; 559 560 int nbPartitions ; 561 if (commRank==0) 562 { 563 CXios::getServicesManager()->getServiceNbPartitions(CXios::defaultPoolId, CXios::defaultServerId, 0, nbPartitions) ; 564 for(int i=0 ; i<nbPartitions; i++) 565 CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId()) ; 566 } 567 MPI_Bcast(&nbPartitions, 1, MPI_INT, 0, intraComm_) ; 568 569 MPI_Comm interComm ; 570 for(int i=0 ; i<nbPartitions; i++) 571 { 572 // CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId(), intraComm_, interComm) ; 573 parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId(), intraComm_, interCommClient, interCommServer) ; 574 int type ; 575 if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultServerId, 0, type) ; 576 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 577 primServerId_.push_back(CXios::getContextsManager()->getServerContextName(CXios::defaultPoolId, CXios::defaultServerId, i, type, getContextId())) ; 578 579 // intraComm client is not duplicated. In all the code we use client->intraComm for MPI 580 // in future better to replace it by intracommuncator associated to the context 581 582 MPI_Comm intraCommClient, intraCommServer ; 583 // MPI_Comm interCommClient, interCommServer ; 584 585 intraCommClient=intraComm_ ; 586 MPI_Comm_dup(intraComm_, &intraCommServer) ; 587 588 // interCommClient=interComm ; 589 // MPI_Comm_dup(interComm, &interCommServer) ; 590 591 clientPrimServer.push_back(new CContextClient(this, intraCommClient, interCommClient)); 592 serverPrimServer.push_back(new CContextServer(this, intraCommServer, interCommServer)); 593 594 } 595 } 596 } 597 CATCH_DUMP_ATTR 598 599 385 600 void CContext::initServer(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtClient /*= 0*/) 386 601 TRY … … 418 633 } 419 634 CATCH_DUMP_ATTR 635 636 637 638 bool CContext::eventLoop(bool enableEventsProcessing) 639 { 640 bool finished=true; 641 642 if (client!=nullptr && !finalized) client->checkBuffers(); 643 644 for (int i = 0; i < clientPrimServer.size(); ++i) 645 { 646 if (!finalized) clientPrimServer[i]->checkBuffers(); 647 if (!finalized) finished &= serverPrimServer[i]->eventLoop(enableEventsProcessing); 648 } 649 650 if (server!=nullptr) if (!finalized) finished &= server->eventLoop(enableEventsProcessing); 651 652 return finalized && finished ; 653 } 420 654 421 655 //! Try to send the buffers and receive possible answers … … 451 685 } 452 686 } 453 CATCH_DUMP_ATTR 454 455 //! Terminate a context 687 CATCH_DUMP_ATTR 688 689 690 691 456 692 void CContext::finalize(void) 457 693 TRY … … 462 698 } 463 699 // Send registry upon calling the function the first time 464 if (countChildCtx_ == 0) 465 if (hasClient) sendRegistry() ; 700 if (countChildCtx_ == 0) if (hasClient) sendRegistry() ; 466 701 467 702 // Client: … … 562 797 563 798 closeAllFile(); // Just move to here to make sure that server-level 1 can close files 799 800 /* ym 564 801 if (hasServer && !hasClient) 565 802 { … … 567 804 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 568 805 } 806 */ 569 807 570 808 //! Deallocate client buffers … … 589 827 CATCH_DUMP_ATTR 590 828 829 830 831 //! Terminate a context 832 void CContext::finalize_old(void) 833 TRY 834 { 835 if (hasClient && !hasServer) // For now we only use server level 1 to read data 836 { 837 doPreTimestepOperationsForEnabledReadModeFiles(); 838 } 839 // Send registry upon calling the function the first time 840 if (countChildCtx_ == 0) if (hasClient) sendRegistry() ; 841 842 // Client: 843 // (1) blocking send context finalize to its server 844 // (2) blocking receive context finalize from its server 845 // (3) some memory deallocations 846 if (CXios::isClient) 847 { 848 // Make sure that client (model) enters the loop only once 849 if (countChildCtx_ < 1) 850 { 851 ++countChildCtx_; 852 853 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 854 client->finalize(); 855 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 856 while (client->havePendingRequests()) client->checkBuffers(); 857 858 info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ; 859 while (!server->hasFinished()) 860 server->eventLoop(); 861 info(100)<<"DEBUG: context "<<getId()<<" server has finished"<<endl ; 862 863 bool notifiedFinalized=false ; 864 do 865 { 866 notifiedFinalized=client->isNotifiedFinalized() ; 867 } while (!notifiedFinalized) ; 868 client->releaseBuffers(); 869 870 if (hasServer) // Mode attache 871 { 872 closeAllFile(); 873 registryOut->hierarchicalGatherRegistry() ; 874 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 875 } 876 877 //! Deallocate client buffers 878 // client->releaseBuffers(); 879 info(100)<<"DEBUG: context "<<getId()<<" release client ok"<<endl ; 880 //! Free internally allocated communicators 881 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 882 MPI_Comm_free(&(*it)); 883 comms.clear(); 884 885 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 886 } 887 } 888 else if (CXios::isServer) 889 { 890 // First context finalize message received from a model 891 // Send context finalize to its child contexts (if any) 892 if (countChildCtx_ == 0) 893 for (int i = 0; i < clientPrimServer.size(); ++i) 894 { 895 clientPrimServer[i]->finalize(); 896 bool bufferReleased; 897 do 898 { 899 clientPrimServer[i]->checkBuffers(); 900 bufferReleased = !clientPrimServer[i]->havePendingRequests(); 901 } while (!bufferReleased); 902 903 bool notifiedFinalized=false ; 904 do 905 { 906 // clientPrimServer[i]->checkBuffers(); 907 notifiedFinalized=clientPrimServer[i]->isNotifiedFinalized() ; 908 } while (!notifiedFinalized) ; 909 clientPrimServer[i]->releaseBuffers(); 910 } 911 912 913 // (Last) context finalized message received 914 if (countChildCtx_ == clientPrimServer.size()) 915 { 916 // Blocking send of context finalize message to its client (e.g. primary server or model) 917 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 918 client->finalize(); 919 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 920 bool bufferReleased; 921 do 922 { 923 client->checkBuffers(); 924 bufferReleased = !client->havePendingRequests(); 925 } while (!bufferReleased); 926 927 bool notifiedFinalized=false ; 928 do 929 { 930 // client->checkBuffers(); 931 notifiedFinalized=client->isNotifiedFinalized() ; 932 } while (!notifiedFinalized) ; 933 client->releaseBuffers(); 934 935 finalized = true; 936 info(100)<<"DEBUG: context "<<getId()<<" bufferRelease OK"<<endl ; 937 938 closeAllFile(); // Just move to here to make sure that server-level 1 can close files 939 940 /* ym 941 if (hasServer && !hasClient) 942 { 943 registryOut->hierarchicalGatherRegistry() ; 944 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 945 } 946 */ 947 948 //! Deallocate client buffers 949 // client->releaseBuffers(); 950 info(100)<<"DEBUG: context "<<getId()<<" client release"<<endl ; 951 952 /* 953 for (int i = 0; i < clientPrimServer.size(); ++i) 954 clientPrimServer[i]->releaseBuffers(); 955 */ 956 //! Free internally allocated communicators 957 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 958 MPI_Comm_free(&(*it)); 959 comms.clear(); 960 961 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 962 } 963 964 ++countChildCtx_; 965 } 966 } 967 CATCH_DUMP_ATTR 968 591 969 //! Free internally allocated communicators 592 970 void CContext::freeComms(void) … … 614 992 if (allProcessed) return; 615 993 994 // create intercommunicator with servers. 995 // not sure it is the good place to be called here 996 createServerInterComm() ; 997 998 616 999 // After xml is parsed, there are some more works with post processing 617 1000 postProcessing(); … … 734 1117 TRY 735 1118 { 736 CTimer::get("Context : close definition").resume() ; 1119 CTimer::get("Context : close definition").resume() ; 1120 1121 // 737 1122 postProcessingGlobalAttributes(); 738 1123 … … 1548 1933 CATCH_DUMP_ATTR 1549 1934 1935 void CContext::setIdServer(const StdString& idServer) 1936 TRY 1937 { 1938 idServer_=idServer ; 1939 } 1940 CATCH_DUMP_ATTR 1941 1942 1943 const StdString& CContext::getIdServer() 1944 TRY 1945 { 1946 return idServer_; 1947 } 1948 CATCH_DUMP_ATTR 1949 1950 const StdString& CContext::getIdServer(const int i) 1951 TRY 1952 { 1953 // return idServer_ + std::to_string(static_cast<unsigned long long>(i)); 1954 return primServerId_[i] ; 1955 } 1956 CATCH_DUMP_ATTR 1957 1958 /* 1550 1959 const StdString& CContext::getIdServer() 1551 1960 TRY … … 1570 1979 } 1571 1980 CATCH_DUMP_ATTR 1981 */ 1982 1572 1983 1573 1984 /*! -
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/context.hpp
r1639 r1761 12 12 #include "registry.hpp" 13 13 #include "mpi.hpp" 14 #include "services_manager.hpp" 15 #include "server_context.hpp" 14 16 15 17 … … 89 91 // Initialize server or client 90 92 void initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer = 0); 93 void init(CServerContext* parentServerContext, MPI_Comm intraComm, int serviceType); 94 void initClient(MPI_Comm intraComm, int serviceType); 95 91 96 void initServer(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtClient = 0); 97 void initServer(MPI_Comm intraComm, int serviceType ); 98 void createClientInterComm(MPI_Comm interCommClient, MPI_Comm interCommServer) ; 99 void createServerInterComm(void) ; 100 92 101 bool isInitialized(void); 93 102 … … 96 105 // Put sever or client into loop state 97 106 bool checkBuffersAndListen(bool enableEventsProcessing=true); 107 bool eventLoop(bool enableEventsProcessing=true); 98 108 99 109 // Finalize a context 100 110 void finalize(void); 111 void finalize_old(void); 101 112 bool isFinalized(void); 102 113 … … 161 172 162 173 const StdString& getIdServer(); 174 void setIdServer(const StdString& idServer); 163 175 const StdString& getIdServer(const int srvPoolNb); 176 std::string getContextId() {return contextId_;} 164 177 165 178 // Client side: Receive and process messages … … 208 221 static void ShowTree(StdOStream & out = std::clog); 209 222 static void CleanTree(void); 223 int getServiceType(void) {return serviceType_;} 210 224 211 225 public : … … 223 237 virtual bool hasChild(void) const; 224 238 239 bool isProcessingEvent(void) {return isProcessingEvent_;} 240 bool setProcessingEvent(void) {isProcessingEvent_=true ;} 241 bool unsetProcessingEvent(void) {isProcessingEvent_=false ;} 225 242 226 243 public : … … 252 269 std::vector<CContextServer*> serverPrimServer; 253 270 std::vector<CContextClient*> clientPrimServer; 271 std::vector<std::string> primServerId_; 254 272 255 273 CRegistry* registryIn ; //!< input registry which is read from file 256 274 CRegistry* registryOut ; //!< output registry which will be written into file at the finalize 275 276 277 MPI_Comm intraComm_ ; //! context intra communicator 257 278 258 279 private: … … 265 286 std::list<MPI_Comm> comms; //!< Communicators allocated internally 266 287 288 int serviceType_; //!< service associated to the context 289 string contextId_ ; //!< context client id for the servers. For clients this is same as getId() 290 bool isProcessingEvent_ ; 291 CServerContext* parentServerContext_ ; 292 267 293 public: // Some function maybe removed in the near future 268 294 // virtual void toBinary (StdOStream & os) const; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/domain.cpp
r1639 r1761 226 226 bool distributed = !((!ni.isEmpty() && (ni == ni_glo) && !nj.isEmpty() && (nj == nj_glo)) || 227 227 (!i_index.isEmpty() && i_index.numElements() == ni_glo*nj_glo)); 228 bool distributed_glo ; 228 229 distributed |= (1 == CContext::getCurrent()->client->clientSize); 229 230 … … 2061 2062 2062 2063 numberWrittenIndexes_[writtenCommSize] = nbWritten; 2063 if (isDistributed()) 2064 bool distributed_glo, distributed=isDistributed() ; 2065 MPI_Allreduce(&distributed,&distributed_glo, 1, MPI_INT, MPI_LOR, writtenComm) ; 2066 2067 if (distributed_glo) 2064 2068 { 2065 2069 -
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/field.cpp
r1751 r1761 634 634 timer.resume(); 635 635 636 context->checkBuffersAndListen(); 636 //ym context->checkBuffersAndListen(); 637 context->eventLoop(); 637 638 638 639 timer.suspend(); -
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/file.cpp
r1639 r1761 323 323 // Done by classical server or secondary server 324 324 // This condition should be changed soon 325 if (CServer::serverLevel == 0 || CServer::serverLevel == 2) 326 { 325 //ym if (CServer::serverLevel == 0 || CServer::serverLevel == 2) 326 if (context->getServiceType()==CServicesManager::IO_SERVER || context->getServiceType()==CServicesManager::OUT_SERVER) 327 { 327 328 if (mode.isEmpty() || mode.getValue() == mode_attr::write) 328 329 { … … 349 350 // Done by classical server or secondary server 350 351 // TODO: This condition should be changed soon. It only works with maximum number of level as 2 351 if (CServer::serverLevel == 0 || CServer::serverLevel == 1) 352 353 //ym if (CServer::serverLevel == 0 || CServer::serverLevel == 1) 354 if (context->getServiceType()==CServicesManager::IO_SERVER || context->getServiceType()==CServicesManager::GATHERER) 352 355 { 353 356 if (!mode.isEmpty() && mode.getValue() == mode_attr::read) -
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/grid.cpp
r1639 r1761 907 907 } 908 908 } 909 if (CServer::serverLevel==2) 909 //ym if (CServer::serverLevel==2) 910 if (context->getServiceType()==CServicesManager::OUT_SERVER) 910 911 { 911 912 computeWrittenIndex() ; … … 1679 1680 connectedServerRankRead_ = ranks; 1680 1681 1681 int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1; 1682 nbSrvPools = 1; 1682 // ym something is bad here, I comment some line, to be checked in future 1683 // int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1; 1684 int nbSrvPools = 1; 1683 1685 nbReadSenders.clear(); 1684 1686 for (int p = 0; p < nbSrvPools; ++p) 1685 1687 { 1686 CContextServer* server = (!context->hasClient) ? context->server : context->serverPrimServer[p]; 1688 // CContextServer* server = (!context->hasClient) ? context->server : context->serverPrimServer[p]; 1689 CContextServer* server = context->server ; 1687 1690 CContextClient* client = context->client; //(!context->hasClient) ? context->client : context->clientPrimServer[p]; 1688 1691 -
XIOS/dev/dev_ym/XIOS_SERVICES/src/object.cpp
r1622 r1761 29 29 } 30 30 31 const StdString& CObject::getIdServer() const 31 const StdString& CObject::getIdServer() 32 { 33 return this->id; 34 } 35 36 const StdString& CObject::getIdServer(int nSrvpool) 32 37 { 33 38 return this->id; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/object.hpp
r1622 r1761 17 17 /// Accesseurs /// 18 18 const StdString& getId(void) const; 19 virtual const StdString& getIdServer() const; 19 virtual const StdString& getIdServer() ; 20 virtual const StdString& getIdServer(int nsrvPool) ; 20 21 21 22 virtual StdString dumpClassAttributes(void); -
XIOS/dev/dev_ym/XIOS_SERVICES/src/object_template_impl.hpp
r1626 r1761 270 270 { 271 271 CMessage msg; 272 msg<<this->getIdServer(); 272 if (context->hasServer) msg<<this->getIdServer(i); 273 else msg<<this->getIdServer(); 274 273 275 msg << attr.getName(); 274 276 msg << attr; … … 290 292 { 291 293 CMessage msg; 292 msg<<this->getIdServer(); 294 msg<<this->getIdServer(); // pb with context attribute -> to check : for now seem to be never used for context... 293 295 msg << attr.getName(); 294 296 msg << attr; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/server.cpp
r1639 r1761 15 15 #include "event_scheduler.hpp" 16 16 #include "string_tools.hpp" 17 #include "ressources_manager.hpp" 18 #include "services_manager.hpp" 19 #include "contexts_manager.hpp" 20 #include "servers_ressource.hpp" 21 #include <cstdio> 22 23 17 24 18 25 namespace xios 19 26 { 20 27 MPI_Comm CServer::intraComm ; 28 MPI_Comm CServer::serversComm_ ; 21 29 std::list<MPI_Comm> CServer::interCommLeft ; 22 30 std::list<MPI_Comm> CServer::interCommRight ; … … 34 42 bool CServer::is_MPI_Initialized ; 35 43 CEventScheduler* CServer::eventScheduler = 0; 44 CServersRessource* CServer::serversRessource_=nullptr ; 45 46 void CServer::initRessources(void) 47 { 48 auto ressourcesManager=CXios::getRessourcesManager() ; 49 auto servicesManager=CXios::getServicesManager() ; 50 auto contextsManager=CXios::getContextsManager() ; 51 auto daemonsManager=CXios::getDaemonsManager() ; 52 auto serversRessource=CServer::getServersRessource() ; 53 54 if (serversRessource->isServerLeader()) 55 { 56 // ressourcesManager->createPool("LMDZ",ressourcesManager->getRessourcesSize()/2) ; 57 // ressourcesManager->createPool("NEMO",ressourcesManager->getRessourcesSize()/2) ; 58 ressourcesManager->createPool("LMDZ",ressourcesManager->getRessourcesSize()) ; 59 servicesManager->createServices("LMDZ", "ioserver", CServicesManager::IO_SERVER, 8, 5) ; 60 for(int i=0 ; i<5;i++) 61 { 62 contextsManager->createServerContext("LMDZ","ioserver",i,"lmdz") ; 63 } 64 } 65 66 67 68 while (true) 69 { 70 daemonsManager->eventLoop() ; 71 } 72 73 74 } 75 76 void CServer::initialize(void) 77 { 78 79 MPI_Comm serverComm ; 80 int initialized ; 81 MPI_Initialized(&initialized) ; 82 if (initialized) is_MPI_Initialized=true ; 83 else is_MPI_Initialized=false ; 84 MPI_Comm globalComm=CXios::getGlobalComm() ; 85 86 ///////////////////////////////////////// 87 ///////////// PART 1 //////////////////// 88 ///////////////////////////////////////// 89 90 // don't use OASIS 91 if (!CXios::usingOasis) 92 { 93 if (!is_MPI_Initialized) MPI_Init(NULL, NULL); 94 95 // split the global communicator 96 // get hash from all model to attribute a unique color (int) and then split to get client communicator 97 // every mpi process of globalComm (MPI_COMM_WORLD) must participate 98 99 int commRank, commSize ; 100 MPI_Comm_rank(globalComm,&commRank) ; 101 MPI_Comm_size(globalComm,&commSize) ; 102 103 std::hash<string> hashString ; 104 size_t hashServer=hashString(CXios::xiosCodeId) ; 105 106 size_t* hashAll = new size_t[commSize] ; 107 MPI_Allgather(&hashServer,1,MPI_UNSIGNED_LONG,hashAll,1,MPI_LONG,globalComm) ; 108 109 int color=0 ; 110 set<size_t> listHash ; 111 for(int i=0 ; i<=commRank ; i++) 112 if (listHash.count(hashAll[i])==1) 113 { 114 listHash.insert(hashAll[i]) ; 115 color=color+1 ; 116 } 117 delete[] hashAll ; 118 119 MPI_Comm_split(globalComm, color, commRank, &serverComm) ; 120 } 121 else // using OASIS 122 { 123 if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId); 124 125 CTimer::get("XIOS").resume() ; 126 oasis_get_localcomm(serverComm); 127 } 128 129 ///////////////////////////////////////// 130 ///////////// PART 2 //////////////////// 131 ///////////////////////////////////////// 132 133 134 // Create the XIOS communicator for every process which is related 135 // to XIOS, as well on client side as on server side 136 MPI_Comm xiosGlobalComm ; 137 string strIds=CXios::getin<string>("clients_code_id","") ; 138 vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ; 139 if (strIds.empty()) 140 { 141 // no code Ids given, suppose XIOS initialisation is global 142 int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ; 143 MPI_Comm splitComm,interComm ; 144 MPI_Comm_rank(globalComm,&commGlobalRank) ; 145 MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ; 146 MPI_Comm_rank(splitComm,&commRank) ; 147 if (commRank==0) serverLeader=commGlobalRank ; 148 else serverLeader=0 ; 149 clientLeader=0 ; 150 MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ; 151 MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ; 152 MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ; 153 MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ; 154 CXios::setXiosComm(xiosGlobalComm) ; 155 } 156 else 157 { 158 159 xiosGlobalCommByFileExchange(serverComm) ; 160 161 } 162 163 ///////////////////////////////////////// 164 ///////////// PART 4 //////////////////// 165 // create servers intra communicator // 166 ///////////////////////////////////////// 167 168 int commRank ; 169 MPI_Comm_rank(CXios::getXiosComm(), &commRank) ; 170 MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ; 171 172 CXios::setUsingServer() ; 173 174 ///////////////////////////////////////// 175 ///////////// PART 5 //////////////////// 176 // redirect files output // 177 ///////////////////////////////////////// 178 179 CServer::openInfoStream(CXios::serverFile); 180 CServer::openErrorStream(CXios::serverFile); 181 182 ///////////////////////////////////////// 183 ///////////// PART 4 //////////////////// 184 ///////////////////////////////////////// 185 186 CXios::launchDaemonsManager(true) ; 187 188 ///////////////////////////////////////// 189 ///////////// PART 5 //////////////////// 190 ///////////////////////////////////////// 191 192 // create the services 193 194 auto ressourcesManager=CXios::getRessourcesManager() ; 195 auto servicesManager=CXios::getServicesManager() ; 196 auto contextsManager=CXios::getContextsManager() ; 197 auto daemonsManager=CXios::getDaemonsManager() ; 198 auto serversRessource=CServer::getServersRessource() ; 199 200 if (serversRessource->isServerLeader()) 201 { 202 int nbRessources = ressourcesManager->getRessourcesSize() ; 203 if (!CXios::usingServer2) 204 { 205 ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 206 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ; 207 } 208 else 209 { 210 int nprocsServer = nbRessources*CXios::ratioServer2/100.; 211 int nprocsGatherer = nbRessources - nprocsServer ; 212 213 int nbPoolsServer2 = CXios::nbPoolsServer2 ; 214 if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer; 215 ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 216 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ; 217 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::OUT_SERVER, nprocsServer, nbPoolsServer2) ; 218 } 219 } 220 221 ///////////////////////////////////////// 222 ///////////// PART 5 //////////////////// 223 ///////////////////////////////////////// 224 // loop on event loop 225 226 bool finished=false ; 227 while (!finished) 228 { 229 finished=daemonsManager->eventLoop() ; 230 } 231 232 } 233 234 235 236 237 238 void CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm) 239 { 240 241 MPI_Comm globalComm=CXios::getGlobalComm() ; 242 MPI_Comm xiosGlobalComm ; 243 244 string strIds=CXios::getin<string>("clients_code_id","") ; 245 vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ; 246 247 int commRank, globalRank ; 248 MPI_Comm_rank(serverComm, &commRank) ; 249 MPI_Comm_rank(globalComm, &globalRank) ; 250 string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ; 251 252 if (commRank==0) // if root process publish name 253 { 254 std::ofstream ofs (serverFileName, std::ofstream::out); 255 ofs<<globalRank ; 256 ofs.close(); 257 } 258 259 vector<int> clientsRank(clientsCodeId.size()) ; 260 for(int i=0;i<clientsRank.size();i++) 261 { 262 std::ifstream ifs ; 263 string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ; 264 do 265 { 266 ifs.clear() ; 267 ifs.open(fileName, std::ifstream::in) ; 268 } while (ifs.fail()) ; 269 ifs>>clientsRank[i] ; 270 ifs.close() ; 271 } 272 273 MPI_Comm intraComm ; 274 MPI_Comm_dup(serverComm,&intraComm) ; 275 MPI_Comm interComm ; 276 for(int i=0 ; i<clientsRank.size(); i++) 277 { 278 MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm); 279 MPI_Comm_free(&intraComm) ; 280 MPI_Intercomm_merge(interComm,false, &intraComm ) ; 281 } 282 xiosGlobalComm=intraComm ; 283 MPI_Barrier(xiosGlobalComm); 284 if (commRank==0) std::remove(serverFileName.c_str()) ; 285 MPI_Barrier(xiosGlobalComm); 286 287 CXios::setXiosComm(xiosGlobalComm) ; 288 289 } 290 291 292 void CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm) 293 { 294 // untested, need to be tested on a true MPI-2 compliant library 295 296 // try to discover other client/server 297 /* 298 // publish server name 299 char portName[MPI_MAX_PORT_NAME]; 300 int ierr ; 301 int commRank ; 302 MPI_Comm_rank(serverComm, &commRank) ; 303 304 if (commRank==0) // if root process publish name 305 { 306 MPI_Open_port(MPI_INFO_NULL, portName); 307 MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName); 308 } 309 310 MPI_Comm intraComm=serverComm ; 311 MPI_Comm interComm ; 312 for(int i=0 ; i<clientsCodeId.size(); i++) 313 { 314 MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm); 315 MPI_Intercomm_merge(interComm,false, &intraComm ) ; 316 } 317 */ 318 } 36 319 37 320 //--------------------------------------------------------------- … … 45 328 * IMPORTANT: CXios::usingServer2 should NOT be used beyond this function. Use CServer::serverLevel instead. 46 329 */ 47 void CServer::initialize (void)330 void CServer::initialize_old(void) 48 331 { 49 332 int initialized ; … … 53 336 int rank ; 54 337 338 CXios::launchRessourcesManager(true) ; 339 CXios::launchServicesManager(true) ; 340 CXios::launchContextsManager(true) ; 341 342 initRessources() ; 55 343 // Not using OASIS 56 344 if (!CXios::usingOasis) … … 421 709 MPI_Comm_free(&(*it)); 422 710 423 MPI_Comm_free(&intraComm);711 // MPI_Comm_free(&intraComm); 424 712 425 713 if (!is_MPI_Initialized) … … 854 1142 } 855 1143 else 856 it->second->checkBuffersAndListen(enableEventsProcessing); 1144 it->second->eventLoop(enableEventsProcessing); 1145 //ym it->second->checkBuffersAndListen(enableEventsProcessing); 857 1146 } 858 1147 } … … 881 1170 void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb) 882 1171 { 883 StdStringStream fileName Client;1172 StdStringStream fileNameServer; 884 1173 int numDigit = 0; 885 int size = 0; 1174 int commSize = 0; 1175 int commRank ; 886 1176 int id; 887 MPI_Comm_size(CXios::globalComm, &size); 888 while (size) 1177 1178 MPI_Comm_size(CXios::getGlobalComm(), &commSize); 1179 MPI_Comm_rank(CXios::getGlobalComm(), &commRank); 1180 1181 while (commSize) 889 1182 { 890 size /= 10;1183 commSize /= 10; 891 1184 ++numDigit; 892 1185 } 893 id = rank_; //getRank();894 895 fileName Client<< fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;896 fb->open(fileName Client.str().c_str(), std::ios::out);1186 id = commRank; 1187 1188 fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext; 1189 fb->open(fileNameServer.str().c_str(), std::ios::out); 897 1190 if (!fb->is_open()) 898 1191 ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)", 899 << std::endl << "Can not open <" << fileName Client.str() << "> file to write the server log(s).");1192 << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s)."); 900 1193 } 901 1194 … … 953 1246 if (m_errorStream.is_open()) m_errorStream.close(); 954 1247 } 1248 1249 void CServer::launchServersRessource(MPI_Comm serverComm) 1250 { 1251 serversRessource_ = new CServersRessource(serverComm) ; 1252 } 955 1253 } -
XIOS/dev/dev_ym/XIOS_SERVICES/src/server.hpp
r1639 r1761 10 10 namespace xios 11 11 { 12 class CServersRessource ; 13 12 14 class CServer 13 15 { 14 16 public: 15 17 static void initialize(void); 18 static void initialize_old(void); 19 static void xiosGlobalCommByFileExchange(MPI_Comm serverComm) ; 20 static void xiosGlobalCommByPublishing(MPI_Comm serverComm) ; 21 16 22 static void finalize(void); 17 23 static void eventLoop(void); … … 25 31 static void listenOasisEnddef(void); 26 32 static void registerContext(void* buff,int count, int leaderRank=0); 33 static void initRessources() ; 27 34 28 35 static MPI_Comm intraComm; 36 static MPI_Comm serversComm_; 29 37 static std::list<MPI_Comm> interCommLeft; // interComm between server (primary, classical or secondary) and its client (client or primary server) 30 38 static std::list<MPI_Comm> interCommRight; // interComm between primary server and secondary server (non-empty only for primary server pool) … … 68 76 static void closeErrorStream(); 69 77 78 static CServersRessource* getServersRessource(void) { return serversRessource_;} 79 static void launchServersRessource(MPI_Comm commServer) ; 80 70 81 private: 71 82 static vector<int> sndServerGlobalRanks; //!< Global ranks of pool leaders on the secondary server … … 75 86 static StdOFStream m_errorStream; 76 87 static void openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb); 88 static CServersRessource* serversRessource_ ; 77 89 }; 78 90 }
Note: See TracChangeset
for help on using the changeset viewer.