Changeset 1460 for XIOS/dev/branch_openmp/src/server.cpp
- Timestamp: 03/22/18 10:43:20
- File: XIOS/dev/branch_openmp/src/server.cpp (1 edited)
XIOS/dev/branch_openmp/src/server.cpp
Diff columns r1347 → r1460. The modified hunks are shown below in their updated (r1460) form; context collapsed by the viewer is marked with "...".

#include "cxios.hpp"
#include "server.hpp"
#include "client.hpp"
#include "type.hpp"
#include "context.hpp"
...
{
    MPI_Comm CServer::intraComm ;
    std::list<MPI_Comm> CServer::interCommLeft ;
    std::list<MPI_Comm> CServer::interCommRight ;
    std::list<MPI_Comm> CServer::contextInterComms;
    std::list<MPI_Comm> CServer::contextIntraComms;
    int CServer::serverLevel = 0 ;
    int CServer::nbContexts = 0;
    bool CServer::isRoot = false ;
    int CServer::rank_ = INVALID_RANK;
    StdOFStream CServer::m_infoStream;
    StdOFStream CServer::m_errorStream;
    map<string,CContext*> CServer::contextList ;
    vector<int> CServer::sndServerGlobalRanks;
    bool CServer::finished=false ;
    bool CServer::is_MPI_Initialized ;
    CEventScheduler* CServer::eventScheduler = 0;

//---------------------------------------------------------------
/*!
 * \fn void CServer::initialize(void)
 * Creates intraComm for each possible type of servers (classical, primary or secondary).
 * (For now the assumption is that there is one proc per secondary server pool.)
 * Creates interComm and stores them into the following lists:
 *   classical server -- interCommLeft
 *   primary server -- interCommLeft and interCommRight
 *   secondary server -- interCommLeft for each pool.
 * IMPORTANT: CXios::usingServer2 should NOT be used beyond this function. Use CServer::serverLevel instead.
 */
    void CServer::initialize(void)
    {
      //int initialized ;
      //MPI_Initialized(&initialized) ;
      //if (initialized) is_MPI_Initialized=true ;
      //else is_MPI_Initialized=false ;
      int rank ;

      // Not using OASIS
      ...
      {

        //if (!is_MPI_Initialized)
        //{
        //  MPI_Init(NULL, NULL);
        //}
        CTimer::get("XIOS").resume() ;

        boost::hash<string> hashString ;
        unsigned long hashServer = hashString(CXios::xiosCodeId);

        unsigned long* hashAll ;
        unsigned long* srvLevelAll ;

        int size ;
        int myColor ;
        int i,c ;
        MPI_Comm newComm;

        MPI_Comm_size(CXios::globalComm, &size) ;
        MPI_Comm_rank(CXios::globalComm, &rank_);

        hashAll=new unsigned long[size] ;
        MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ;

        map<unsigned long, int> colors ;
        ...
        map<unsigned long, int>::iterator it ;

        // (1) Establish client leaders, distribute processes between two server levels
        std::vector<int> srvRanks;
        for(i=0,c=0;i<size;i++)
        {
          ...
            c++ ;
          }
          if (CXios::usingServer2)
            if (hashAll[i] == hashServer)
              srvRanks.push_back(i);
        }
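The leader-discovery loop above follows a simple pattern: every process hashes its code identifier, the hashes are exchanged with MPI_Allgather, the first rank carrying a given hash becomes that group's leader, and the group index becomes the colour used later for MPI_Comm_split. Below is a minimal standalone sketch of that pattern; the code identifiers, the two-way split and the use of MPI_COMM_WORLD in place of CXios::globalComm are illustrative assumptions, not part of the changeset.

    #include <mpi.h>
    #include <boost/functional/hash.hpp>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank, size;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);
      MPI_Comm_size(MPI_COMM_WORLD, &size);

      // Hypothetical layout: the last two ranks play the role of "xios.x", the rest are the model.
      std::string codeId = (rank >= size - 2) ? "xios.x" : "model.x";
      boost::hash<std::string> hashString;
      unsigned long myHash = hashString(codeId);

      // Every process learns every other process's hash.
      std::vector<unsigned long> hashAll(size);
      MPI_Allgather(&myHash, 1, MPI_UNSIGNED_LONG, hashAll.data(), 1, MPI_UNSIGNED_LONG, MPI_COMM_WORLD);

      // First occurrence of a hash defines the group's colour and its leader rank.
      std::map<unsigned long, int> colors, leaders;
      int c = 0;
      for (int i = 0; i < size; ++i)
        if (colors.find(hashAll[i]) == colors.end())
        {
          colors[hashAll[i]] = c++;
          leaders[hashAll[i]] = i;
        }

      MPI_Comm intraComm;
      MPI_Comm_split(MPI_COMM_WORLD, colors[myHash], rank, &intraComm);

      std::cout << "rank " << rank << " (" << codeId << ") -> color " << colors[myHash]
                << ", leader " << leaders[myHash] << std::endl;

      MPI_Comm_free(&intraComm);
      MPI_Finalize();
      return 0;
    }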
        if (CXios::usingServer2)
        {
          int reqNbProc = srvRanks.size()*CXios::ratioServer2/100.;
          if (reqNbProc<1 || reqNbProc==srvRanks.size())
          {
            error(0)<<"WARNING: void CServer::initialize(void)"<<endl
                << "It is impossible to dedicate the requested number of processes = "<<reqNbProc
                <<" to secondary server. XIOS will run in the classical server mode."<<endl;
          }
          else
          {
            int firstSndSrvRank = srvRanks.size()*(100.-CXios::ratioServer2)/100. ;
            int poolLeader = firstSndSrvRank;
//*********** (1) Comment out the line below to set one process per pool
//            sndServerGlobalRanks.push_back(srvRanks[poolLeader]);
            int nbPools = CXios::nbPoolsServer2;
            if ( nbPools > reqNbProc || nbPools < 1)
            {
              error(0)<<"WARNING: void CServer::initialize(void)"<<endl
                  << "It is impossible to allocate the requested number of pools = "<<nbPools
                  <<" on the secondary server. It will be set so that there is one process per pool."<<endl;
              nbPools = reqNbProc;
            }
            int remainder = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) % nbPools;
            int procsPerPool = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) / nbPools;
            for (i=0; i<srvRanks.size(); i++)
            {
              if (i >= firstSndSrvRank)
              {
                if (rank_ == srvRanks[i])
                {
                  serverLevel=2;
                }
                poolLeader += procsPerPool;
                if (remainder != 0)
                {
                  ++poolLeader;
                  --remainder;
                }
//*********** (2) Comment out the two lines below to set one process per pool
//                if (poolLeader < srvRanks.size())
//                  sndServerGlobalRanks.push_back(srvRanks[poolLeader]);
//*********** (3) Uncomment the line below to set one process per pool
                sndServerGlobalRanks.push_back(srvRanks[i]);
              }
              else
              {
                if (rank_ == srvRanks[i]) serverLevel=1;
              }
            }
            if (serverLevel==2)
            {
              #pragma omp critical (_output)
              info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ;
              for (i=0; i<sndServerGlobalRanks.size(); i++)
              {
                if (rank_>= sndServerGlobalRanks[i])
                {
                  if ( i == sndServerGlobalRanks.size()-1)
                  {
                    myColor = colors.size() + sndServerGlobalRanks[i];
                  }
                  else if (rank_< sndServerGlobalRanks[i+1])
                  {
                    myColor = colors.size() + sndServerGlobalRanks[i];
                    break;
                  }
                }
              }
            }
          }
        }

        // (2) Create intraComm
        if (serverLevel != 2) myColor=colors[hashServer];
        MPI_Comm_split(CXios::globalComm, myColor, rank_, &intraComm) ;
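The pool bookkeeping above is plain integer arithmetic: the last ratioServer2 percent of the server ranks form the secondary level, they are divided into nbPools pools of procsPerPool processes, and the first remainder pools absorb one extra process each (the changeset itself activates the one-process-per-pool variant and keeps the multi-process variant commented out). A self-contained sketch of that arithmetic, with hypothetical process counts and no MPI:

    #include <iostream>
    #include <vector>

    int main()
    {
      // Hypothetical configuration: 10 server processes, 40% on the secondary level, 3 pools.
      const int nbSrvProcs   = 10;
      const int ratioServer2 = 40;
      const int nbPools      = 3;

      const int reqNbProc       = nbSrvProcs * ratioServer2 / 100;         // level-2 processes
      const int firstSndSrvRank = nbSrvProcs * (100 - ratioServer2) / 100; // first level-2 index
      const int procsPerPool    = reqNbProc / nbPools;
      int remainder             = reqNbProc % nbPools;

      // Pool leaders: one every procsPerPool processes, the first 'remainder'
      // pools taking one extra process each.
      std::vector<int> poolLeaders;
      int leader = firstSndSrvRank;
      for (int p = 0; p < nbPools; ++p)
      {
        poolLeaders.push_back(leader);
        leader += procsPerPool;
        if (remainder != 0) { ++leader; --remainder; }
      }

      for (int i = 0; i < nbSrvProcs; ++i)
      {
        int level = (i >= firstSndSrvRank) ? 2 : 1;
        std::cout << "server process " << i << " -> level " << level;
        if (level == 2)
        {
          // A level-2 process belongs to the last pool whose leader index is <= its own index.
          int myPool = 0;
          for (std::size_t p = 0; p < poolLeaders.size(); ++p)
            if (i >= poolLeaders[p]) myPool = p;
          std::cout << ", pool " << myPool << " (leader " << poolLeaders[myPool] << ")";
        }
        std::cout << std::endl;
      }
      return 0;
    }

With these numbers, ranks 0..5 end up on level 1 and ranks 6..9 on level 2, split into pools {6,7}, {8} and {9}, which mirrors how the colour chosen for MPI_Comm_split is derived from the nearest preceding pool leader.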
        // (3) Create interComm
        if (serverLevel == 0)
        {
          int clientLeader;
          for(it=leaders.begin();it!=leaders.end();it++)
          {
            if (it->first!=hashServer)
            {
              clientLeader=it->second ;
              int intraCommSize, intraCommRank ;
              MPI_Comm_size(intraComm,&intraCommSize) ;
              MPI_Comm_rank(intraComm,&intraCommRank) ;
              #pragma omp critical (_output)
              {
                info(50)<<"intercommCreate::server (classical mode) "<<rank_<<" intraCommSize : "<<intraCommSize
                     <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< clientLeader<<endl ;
              }

              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
              interCommLeft.push_back(newComm) ;
            }
          }
        }
        else if (serverLevel == 1)
        {
          int clientLeader, srvSndLeader;
          int srvPrmLeader ;

          for (it=leaders.begin();it!=leaders.end();it++)
          {
            if (it->first != hashServer)
            {
              clientLeader=it->second ;
              int intraCommSize, intraCommRank ;
              MPI_Comm_size(intraComm, &intraCommSize) ;
              MPI_Comm_rank(intraComm, &intraCommRank) ;
              #pragma omp critical (_output)
              {
                info(50)<<"intercommCreate::server (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
                     <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< clientLeader<<endl ;
              }
              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
              interCommLeft.push_back(newComm) ;
            }
          }

          for (int i = 0; i < sndServerGlobalRanks.size(); ++i)
          {
            int intraCommSize, intraCommRank ;
            MPI_Comm_size(intraComm, &intraCommSize) ;
            MPI_Comm_rank(intraComm, &intraCommRank) ;
            #pragma omp critical (_output)
            {
              info(50)<<"intercommCreate::client (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
                  <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< sndServerGlobalRanks[i]<<endl ;
            }
            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, sndServerGlobalRanks[i], 1, &newComm) ;
            interCommRight.push_back(newComm) ;
          }
        }
        else
        {
          int clientLeader;
          clientLeader = leaders[hashString(CXios::xiosCodeId)];
          int intraCommSize, intraCommRank ;
          MPI_Comm_size(intraComm, &intraCommSize) ;
          MPI_Comm_rank(intraComm, &intraCommRank) ;
          #pragma omp critical (_output)
          {
            info(50)<<"intercommCreate::server (server level 2) "<<rank_<<" intraCommSize : "<<intraCommSize
                 <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< clientLeader<<endl ;
          }

          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 1, &newComm) ;
          interCommLeft.push_back(newComm) ;
        }

        delete [] hashAll ;

      }
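All three branches above build their inter-communicators the same way: each side calls MPI_Intercomm_create over the global communicator, passing its own intra-communicator, local leader 0, the other group's leader rank and a matching tag. A minimal sketch of that handshake between one 'server' group and one 'client' group over MPI_COMM_WORLD follows; the split and the leader choice are illustrative assumptions, not the XIOS ones.

    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank, size;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);
      MPI_Comm_size(MPI_COMM_WORLD, &size);   // assumes at least 2 processes

      // Hypothetical split: the last rank is the "server", the others are "clients".
      int isServer = (rank == size - 1);
      MPI_Comm intraComm;
      MPI_Comm_split(MPI_COMM_WORLD, isServer, rank, &intraComm);

      // Leaders expressed as ranks in the global communicator.
      int clientLeader = 0;
      int serverLeader = size - 1;
      int remoteLeader = isServer ? clientLeader : serverLeader;

      // Both groups call the same function with the same tag; each names the other's leader.
      MPI_Comm interComm;
      MPI_Intercomm_create(intraComm, 0, MPI_COMM_WORLD, remoteLeader, 0, &interComm);

      int remoteSize;
      MPI_Comm_remote_size(interComm, &remoteSize);
      printf("rank %d (%s): remote group has %d process(es)\n",
             rank, isServer ? "server" : "client", remoteSize);

      MPI_Comm_free(&interComm);
      MPI_Comm_free(&intraComm);
      MPI_Finalize();
      return 0;
    }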
      // using OASIS
      else
      {
        int size;
        int myColor;
        int* srvGlobalRanks;
        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
        ...
        MPI_Comm localComm;
        oasis_get_localcomm(localComm);
        MPI_Comm_rank(localComm,&rank_) ;

        // (1) Create server intraComm
        if (!CXios::usingServer2)
        {
          MPI_Comm_dup(localComm, &intraComm);
        }
        else
        {
          int globalRank;
          MPI_Comm_size(localComm,&size) ;
          MPI_Comm_rank(CXios::globalComm,&globalRank) ;
          srvGlobalRanks = new int[size] ;
          MPI_Allgather(&globalRank, 1, MPI_INT, srvGlobalRanks, 1, MPI_INT, localComm) ;

          int reqNbProc = size*CXios::ratioServer2/100.;
          if (reqNbProc < 1 || reqNbProc == size)
          {
            error(0)<<"WARNING: void CServer::initialize(void)"<<endl
                << "It is impossible to dedicate the requested number of processes = "<<reqNbProc
                <<" to secondary server. XIOS will run in the classical server mode."<<endl;
            MPI_Comm_dup(localComm, &intraComm);
          }
          else
          {
            int firstSndSrvRank = size*(100.-CXios::ratioServer2)/100. ;
            int poolLeader = firstSndSrvRank;
//*********** (1) Comment out the line below to set one process per pool
//            sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]);
            int nbPools = CXios::nbPoolsServer2;
            if ( nbPools > reqNbProc || nbPools < 1)
            {
              error(0)<<"WARNING: void CServer::initialize(void)"<<endl
                  << "It is impossible to allocate the requested number of pools = "<<nbPools
                  <<" on the secondary server. It will be set so that there is one process per pool."<<endl;
              nbPools = reqNbProc;
            }
            int remainder = ((int) (size*CXios::ratioServer2/100.)) % nbPools;
            int procsPerPool = ((int) (size*CXios::ratioServer2/100.)) / nbPools;
            for (int i=0; i<size; i++)
            {
              if (i >= firstSndSrvRank)
              {
                if (globalRank == srvGlobalRanks[i])
                {
                  serverLevel=2;
                }
                poolLeader += procsPerPool;
                if (remainder != 0)
                {
                  ++poolLeader;
                  --remainder;
                }
//*********** (2) Comment out the two lines below to set one process per pool
//                if (poolLeader < size)
//                  sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]);
//*********** (3) Uncomment the line below to set one process per pool
                sndServerGlobalRanks.push_back(srvGlobalRanks[i]);
              }
              else
              {
                if (globalRank == srvGlobalRanks[i]) serverLevel=1;
              }
            }
            if (serverLevel==2)
            {
              info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ;
              for (int i=0; i<sndServerGlobalRanks.size(); i++)
              {
                if (globalRank>= sndServerGlobalRanks[i])
                {
                  if (i == sndServerGlobalRanks.size()-1)
                  {
                    myColor = sndServerGlobalRanks[i];
                  }
                  else if (globalRank< sndServerGlobalRanks[i+1])
                  {
                    myColor = sndServerGlobalRanks[i];
                    break;
                  }
                }
              }
            }
            if (serverLevel != 2) myColor=0;
            MPI_Comm_split(localComm, myColor, rank_, &intraComm) ;
          }
        }

        string codesId=CXios::getin<string>("oasis_codes_id") ;
        vector<string> splitted ;
        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
        ...
        MPI_Comm_rank(CXios::globalComm,&globalRank);

        // (2) Create interComms with models
        for(it=splitted.begin();it!=splitted.end();it++)
        {
          oasis_get_intercomm(newComm,*it) ;
          if ( serverLevel == 0 || serverLevel == 1)
          {
            interCommLeft.push_back(newComm) ;
            if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
          }
        }
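In the OASIS branch above, the coupled code identifiers arrive as a single comma-separated string read from the "oasis_codes_id" entry and are tokenised with boost::split before an inter-communicator is requested for each model. A standalone sketch of that parsing, with a hypothetical identifier string:

    #include <boost/algorithm/string.hpp>
    #include <iostream>
    #include <string>
    #include <vector>

    int main()
    {
      // Hypothetical value of the "oasis_codes_id" entry.
      std::string codesId = "ocean,atmosphere,,land";

      std::vector<std::string> splitted;
      // token_compress_on folds consecutive separators, so the empty field above is dropped.
      boost::split(splitted, codesId, boost::is_any_of(","), boost::token_compress_on);

      for (std::vector<std::string>::const_iterator it = splitted.begin(); it != splitted.end(); ++it)
        std::cout << "code id: " << *it << std::endl;
      return 0;
    }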
        // (3) Create interComms between primary and secondary servers
        int intraCommSize, intraCommRank ;
        MPI_Comm_size(intraComm,&intraCommSize) ;
        MPI_Comm_rank(intraComm, &intraCommRank) ;

        if (serverLevel == 1)
        {
          for (int i = 0; i < sndServerGlobalRanks.size(); ++i)
          {
            int srvSndLeader = sndServerGlobalRanks[i];
            info(50)<<"intercommCreate::client (server level 1) "<<globalRank<<" intraCommSize : "<<intraCommSize
                <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< srvSndLeader<<endl ;
            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvSndLeader, 0, &newComm) ;
            interCommRight.push_back(newComm) ;
          }
        }
        else if (serverLevel == 2)
        {
          info(50)<<"intercommCreate::server (server level 2)"<<globalRank<<" intraCommSize : "<<intraCommSize
              <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< srvGlobalRanks[0] <<endl ;
          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvGlobalRanks[0], 0, &newComm) ;
          interCommLeft.push_back(newComm) ;
        }
        if (CXios::usingServer2) delete [] srvGlobalRanks ;
        oasis_enddef() ;
      }

      MPI_Comm_rank(intraComm, &rank) ;
      if (rank==0) isRoot=true;
      else isRoot=false;
      ...

      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
        MPI_Comm_free(&(*it));

      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
        MPI_Comm_free(&(*it));

//      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++)
//        MPI_Comm_free(&(*it));

//      for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++)
//        MPI_Comm_free(&(*it));

      for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
        MPI_Comm_free(&(*it));

      MPI_Comm_free(&intraComm);
      ...
      {
        listenContext();
        listenRootContext();
        if (!finished) listenFinalize() ;
      }
      ...

    void CServer::listenFinalize(void)
    {
      list<MPI_Comm>::iterator it, itr;
      int msg ;
      int flag ;

      for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
      {
        MPI_Status status ;
        ...
          MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
          info(20)<<" CServer : Receive client finalize"<<endl ;
          // Sending server finalize message to secondary servers (if any)
          for(itr=interCommRight.begin();itr!=interCommRight.end();itr++)
          {
            MPI_Send(&msg,1,MPI_INT,0,0,*itr) ;
          }
          MPI_Comm_free(&(*it));
          interCommLeft.erase(it) ;
          break ;
        }
      }

      if (interCommLeft.empty())
      {
        int i,size ;
        ...

    void CServer::recvContextMessage(void* buff,int count)
    {
      static map<string,contextMessage> recvContextId;
      map<string,contextMessage>::iterator it ;
      CBufferIn buffer(buff,count) ;
      string id ;
      ...
        int size ;
        MPI_Comm_size(intraComm,&size) ;
//        MPI_Request* requests= new MPI_Request[size-1] ;
//        MPI_Status* status= new MPI_Status[size-1] ;
        MPI_Request* requests= new MPI_Request[size] ;
        MPI_Status* status= new MPI_Status[size] ;

        CMessage msg ;
        msg<<id<<it->second.leaderRank;
        int messageSize=msg.size() ;
        void * sendBuff = new char[messageSize] ;
        CBufferOut sendBuffer(sendBuff,messageSize) ;
        sendBuffer<<msg ;

        // Include root itself in order not to have a divergence
        for(int i=0; i<size; i++)
        {
          MPI_Isend(sendBuff,sendBuffer.count(),MPI_CHAR,i,2,intraComm,&requests[i]) ;
        }

        recvContextId.erase(it) ;
        ...
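listenFinalize above polls each "left" inter-communicator with MPI_Iprobe and, once a client finalize message arrives, relays it over every "right" inter-communicator before freeing the link, which is how a primary server passes the shutdown on to its secondary pools. A reduced sketch of that probe-receive-forward chain on plain MPI_COMM_WORLD ranks; the three-rank layout and the tag are illustrative assumptions.

    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank, size;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);
      MPI_Comm_size(MPI_COMM_WORLD, &size);   // assumes at least 3 processes

      const int finalizeTag = 0;
      int msg = 0;

      if (rank == 0)                 // "client": sends the finalize message
      {
        MPI_Send(&msg, 1, MPI_INT, 1, finalizeTag, MPI_COMM_WORLD);
      }
      else if (rank == 1)            // "primary server": polls, receives, then forwards
      {
        int flag = 0;
        MPI_Status status;
        while (!flag)                // stand-in for the server event loop
          MPI_Iprobe(0, finalizeTag, MPI_COMM_WORLD, &flag, &status);
        MPI_Recv(&msg, 1, MPI_INT, 0, finalizeTag, MPI_COMM_WORLD, &status);
        printf("primary: received client finalize, forwarding\n");
        MPI_Send(&msg, 1, MPI_INT, 2, finalizeTag, MPI_COMM_WORLD);
      }
      else if (rank == 2)            // "secondary server": receives the forwarded finalize
      {
        MPI_Status status;
        MPI_Recv(&msg, 1, MPI_INT, 1, finalizeTag, MPI_COMM_WORLD, &status);
        printf("secondary: received forwarded finalize\n");
      }

      MPI_Finalize();
      return 0;
    }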
    void CServer::listenRootContext(void)
    {
      MPI_Status status ;
      int flag ;
      static std::vector<void*> buffers;
      static std::vector<MPI_Request> requests ;
      static std::vector<int> counts ;
      static std::vector<bool> isEventRegistered ;
      static std::vector<bool> isEventQueued ;
      MPI_Request request;

      int rank ;
      const int root=0 ;
      boost::hash<string> hashString;
      size_t hashId = hashString("RegisterContext");

      // (1) Receive context id from the root, save it into a buffer
      traceOff() ;
      MPI_Iprobe(root,2,intraComm, &flag, &status) ;
      traceOn() ;
      if (flag==true)
      {
        counts.push_back(0);
        MPI_Get_count(&status,MPI_CHAR,&(counts.back())) ;
        buffers.push_back(new char[counts.back()]) ;
        requests.push_back(request);
        MPI_Irecv((void*)(buffers.back()),counts.back(),MPI_CHAR,root,2,intraComm,&(requests.back())) ;
        isEventRegistered.push_back(false);
        isEventQueued.push_back(false);
        nbContexts++;
      }

      for (int ctxNb = 0; ctxNb < nbContexts; ctxNb++ )
      {
        // (2) If context id is received, register an event
        MPI_Test(&requests[ctxNb],&flag,&status) ;
        if (flag==true && !isEventRegistered[ctxNb])
        {
          eventScheduler->registerEvent(ctxNb,hashId);
          isEventRegistered[ctxNb] = true;
        }
        // (3) If event has been scheduled, call register context
        if (eventScheduler->queryEvent(ctxNb,hashId) && !isEventQueued[ctxNb])
        {
          registerContext(buffers[ctxNb],counts[ctxNb]) ;
          isEventQueued[ctxNb] = true;
          delete [] buffers[ctxNb] ;
        }
      }
    }
    ...
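The reworked listenRootContext keeps one buffer, one MPI_Request and two flags per announced context, and acts on a message only after MPI_Test reports completion (and, in XIOS, only once the event scheduler releases the corresponding event). A reduced sketch of the same Iprobe/Irecv/MPI_Test bookkeeping without the XIOS event scheduler; the tag, the announced string and the single-message termination are illustrative assumptions.

    #include <mpi.h>
    #include <cstdio>
    #include <string>
    #include <vector>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank, size;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);
      MPI_Comm_size(MPI_COMM_WORLD, &size);

      const int tag = 2, root = 0;

      // Root announces one hypothetical context id to every rank, itself included,
      // mirroring the "include root itself" broadcast in recvContextMessage().
      std::string announce = "context_atm";
      std::vector<MPI_Request> sendReqs(size);
      if (rank == root)
        for (int i = 0; i < size; ++i)
          MPI_Isend(const_cast<char*>(announce.c_str()), announce.size() + 1, MPI_CHAR,
                    i, tag, MPI_COMM_WORLD, &sendReqs[i]);

      // Listener side: probe, post a non-blocking receive into a fresh buffer,
      // and only act once MPI_Test says the message has fully arrived.
      std::vector<char*> buffers;
      std::vector<MPI_Request> recvReqs;
      std::vector<bool> processed;
      bool done = false;
      while (!done)
      {
        int flag = 0;
        MPI_Status status;
        MPI_Iprobe(root, tag, MPI_COMM_WORLD, &flag, &status);
        if (flag)
        {
          int count = 0;
          MPI_Get_count(&status, MPI_CHAR, &count);
          buffers.push_back(new char[count]);
          recvReqs.push_back(MPI_REQUEST_NULL);
          processed.push_back(false);
          MPI_Irecv(buffers.back(), count, MPI_CHAR, root, tag, MPI_COMM_WORLD, &recvReqs.back());
        }
        for (std::size_t i = 0; i < recvReqs.size(); ++i)
        {
          int completed = 0;
          MPI_Test(&recvReqs[i], &completed, MPI_STATUS_IGNORE);
          if (completed && !processed[i])
          {
            printf("rank %d registers '%s'\n", rank, buffers[i]);   // stand-in for registerContext()
            processed[i] = true;
            delete [] buffers[i];
            done = true;                                            // only one message expected here
          }
        }
      }
      if (rank == root)
        MPI_Waitall(size, sendReqs.data(), MPI_STATUSES_IGNORE);

      MPI_Finalize();
      return 0;
    }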
      string contextId;
      CBufferIn buffer(buff, count);
//      buffer >> contextId;
      buffer >> contextId>>leaderRank;
      CContext* context;

      info(20) << "CServer : Register new Context : " << contextId << endl;
      ...
            << "Context '" << contextId << "' has already been registred");

      context=CContext::create(contextId);
      contextList[contextId]=context;

      // Primary or classical server: create communication channel with a client
      // (1) create interComm (with a client)
      // (2) initialize client and server (contextClient and contextServer)
      MPI_Comm inter;
      if (serverLevel < 2)
      {
        MPI_Comm contextInterComm;
        MPI_Intercomm_create(intraComm, 0, CXios::globalComm, leaderRank, 10+leaderRank, &contextInterComm);
        MPI_Intercomm_merge(contextInterComm,1,&inter);
        MPI_Barrier(inter);
        MPI_Comm_free(&inter);
        context->initServer(intraComm,contextInterComm);
        contextInterComms.push_back(contextInterComm);
      }
      // Secondary server: create communication channel with a primary server
      // (1) duplicate interComm with a primary server
      // (2) initialize client and server (contextClient and contextServer)
      // Remark: in the case of the secondary server there is no need to create an interComm calling MPI_Intercomm_create,
      //         because interComm of CContext is defined on the same processes as the interComm of CServer.
      //         So just duplicate it.
      else if (serverLevel == 2)
      {
        MPI_Comm_dup(interCommLeft.front(), &inter);
        contextInterComms.push_back(inter);
        context->initServer(intraComm, contextInterComms.back());
      }

      // Primary server:
      // (1) send create context message to secondary servers
      // (2) initialize communication channels with secondary servers (create contextClient and contextServer)
      if (serverLevel == 1)
      {
        int i = 0, size;
        MPI_Comm_size(intraComm, &size) ;
        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++, ++i)
        {
          StdString str = contextId +"_server_" + boost::lexical_cast<string>(i);
          CMessage msg;
          int messageSize;
          msg<<str<<size<<rank_ ;
          messageSize = msg.size() ;
          buff = new char[messageSize] ;
          CBufferOut buffer(buff,messageSize) ;
          buffer<<msg ;
          MPI_Send(buff, buffer.count(), MPI_CHAR, sndServerGlobalRanks[i], 1, CXios::globalComm) ;
          MPI_Comm_dup(*it, &inter);
          contextInterComms.push_back(inter);
          MPI_Comm_dup(intraComm, &inter);
          contextIntraComms.push_back(inter);
          context->initClient(contextIntraComms.back(), contextInterComms.back()) ;
          delete [] buff ;
        }
      }
    }

    void CServer::contextEventLoop(bool enableEventsProcessing /*= true*/)
    {
      bool isFinalized ;
      map<string,CContext*>::iterator it ;

      for(it=contextList.begin();it!=contextList.end();it++)
      {
        isFinalized=it->second->isFinalized();
        if (isFinalized)
        {
          contextList.erase(it) ;
          break ;
        }
        else
          it->second->checkBuffersAndListen(enableEventsProcessing);
      }
    }

    //! Get rank of the current process in the intraComm
    int CServer::getRank()
    {
      int rank;
      MPI_Comm_rank(intraComm,&rank);
      return rank;
    }

    vector<int>& CServer::getSecondaryServerGlobalRanks()
    {
      return sndServerGlobalRanks;
    }
    ...
      int numDigit = 0;
      int size = 0;
      int id;
      MPI_Comm_size(CXios::globalComm, &size);
      while (size)
      ...
        ++numDigit;
      }
      id = rank_; //getRank();

      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
      fb->open(fileNameClient.str().c_str(), std::ios::out);
      if (!fb->is_open())
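The final hunk pads the rank suffix of the per-process log file to as many digits as the communicator size requires, so that the files sort in rank order; the changeset switches the suffix from the intraComm rank to the global rank (rank_). A small sketch of that padding with a hypothetical base name and sizes:

    #include <iomanip>
    #include <iostream>
    #include <sstream>
    #include <string>

    // Builds e.g. "xios_server_007.out" for rank 7 in a 128-process run (3 digits needed).
    std::string buildLogFileName(const std::string& base, const std::string& ext, int rank, int commSize)
    {
      int numDigit = 0;
      int size = commSize;
      while (size)          // count the decimal digits of the communicator size
      {
        size /= 10;
        ++numDigit;
      }
      std::ostringstream name;
      name << base << "_" << std::setfill('0') << std::setw(numDigit) << rank << ext;
      return name.str();
    }

    int main()
    {
      std::cout << buildLogFileName("xios_server", ".out", 7, 128) << std::endl;    // xios_server_007.out
      std::cout << buildLogFileName("xios_server", ".out", 42, 4096) << std::endl;  // xios_server_0042.out
      return 0;
    }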