Changeset 490 for XIOS/trunk/src/server.cpp
- Timestamp:
- 09/26/14 14:52:04 (10 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/trunk/src/server.cpp
r483 r490 1 #include "globalScopeData.hpp" 1 2 #include "xmlioserver_spl.hpp" 2 3 #include "cxios.hpp" … … 13 14 14 15 namespace xios 15 { 16 { 16 17 MPI_Comm CServer::intraComm ; 17 18 list<MPI_Comm> CServer::interComm ; 18 19 bool CServer::isRoot ; 19 int CServer::rank ; 20 map<string,CContext*> CServer::contextList ; 20 int CServer::rank = INVALID_RANK; 21 StdOFStream CServer::m_infoStream; 22 map<string,CContext*> CServer::contextList ; 21 23 bool CServer::finished=false ; 22 24 bool CServer::is_MPI_Initialized ; 23 25 24 26 void CServer::initialize(void) 25 27 { … … 28 30 if (initialized) is_MPI_Initialized=true ; 29 31 else is_MPI_Initialized=false ; 30 32 31 33 // Not using OASIS 32 34 if (!CXios::usingOasis) 33 35 { 34 35 if (!is_MPI_Initialized) 36 37 if (!is_MPI_Initialized) 36 38 { 37 39 int argc=0; … … 40 42 } 41 43 CTimer::get("XIOS").resume() ; 42 43 boost::hash<string> hashString ; 44 44 45 boost::hash<string> hashString ; 46 45 47 unsigned long hashServer=hashString(CXios::xiosCodeId) ; 46 48 unsigned long* hashAll ; 47 48 int rank ;49 50 // int rank ; 49 51 int size ; 50 52 int myColor ; 51 53 int i,c ; 52 54 MPI_Comm newComm ; 53 55 54 56 MPI_Comm_size(CXios::globalComm,&size) ; 55 57 MPI_Comm_rank(CXios::globalComm,&rank); 56 58 hashAll=new unsigned long[size] ; 57 59 58 60 MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ; 59 61 … … 61 63 map<unsigned long, int> leaders ; 62 64 map<unsigned long, int>::iterator it ; 63 65 64 66 for(i=0,c=0;i<size;i++) 65 67 { … … 71 73 } 72 74 } 73 75 74 76 myColor=colors[hashServer] ; 75 77 MPI_Comm_split(MPI_COMM_WORLD,myColor,rank,&intraComm) ; … … 77 79 int serverLeader=leaders[hashServer] ; 78 80 int clientLeader; 79 81 80 82 serverLeader=leaders[hashServer] ; 81 83 for(it=leaders.begin();it!=leaders.end();it++) … … 84 86 { 85 87 clientLeader=it->second ; 86 88 87 89 MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ; 88 90 interComm.push_back(newComm) ; … … 95 97 else 96 98 { 97 int rank ,size; 98 if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId) ; 99 // int rank ,size; 100 int size; 101 if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId); 102 99 103 CTimer::get("XIOS").resume() ; 100 104 oasis_get_localcomm(intraComm) ; … … 102 106 MPI_Comm_size(intraComm,&size) ; 103 107 string codesId=CXios::getin<string>("oasis_codes_id") ; 104 108 105 109 vector<string> splitted ; 106 110 boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ; … … 110 114 int globalRank ; 111 115 MPI_Comm_rank(CXios::globalComm,&globalRank); 112 116 113 117 for(it=splitted.begin();it!=splitted.end();it++) 114 118 { … … 120 124 oasis_enddef() ; 121 125 } 122 123 int rank;126 127 // int rank; 124 128 MPI_Comm_rank(intraComm,&rank) ; 125 129 if (rank==0) isRoot=true; 126 else isRoot=false; 127 eventLoop() ;128 finalize() ;130 else isRoot=false; 131 // eventLoop() ; 132 // finalize() ; 129 133 } 130 134 131 135 void CServer::finalize(void) 132 136 { 133 137 CTimer::get("XIOS").suspend() ; 134 138 if (!is_MPI_Initialized) 135 { 139 { 136 140 if (CXios::usingOasis) oasis_finalize(); 137 141 else MPI_Finalize() ; … … 141 145 report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl ; 142 146 } 143 147 144 148 void CServer::eventLoop(void) 145 149 { 146 150 bool stop=false ; 147 151 148 152 CTimer::get("XIOS server").resume() ; 149 153 while(!stop) … … 159 163 if (!finished) listenRootFinalize() ; 160 164 } 161 165 162 166 contextEventLoop() ; 163 167 if (finished && contextList.empty()) stop=true ; … … 165 169 CTimer::get("XIOS server").suspend() ; 166 170 } 167 171 168 172 void CServer::listenFinalize(void) 169 173 { … … 171 175 int msg ; 172 176 int flag ; 173 177 174 178 for(it=interComm.begin();it!=interComm.end();it++) 175 179 { … … 186 190 } 187 191 } 188 192 189 193 if (interComm.empty()) 190 194 { … … 193 197 MPI_Request* requests= new MPI_Request[size-1] ; 194 198 MPI_Status* status= new MPI_Status[size-1] ; 195 199 196 200 for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ; 197 201 MPI_Waitall(size-1,requests,status) ; … … 202 206 } 203 207 } 204 205 208 209 206 210 void CServer::listenRootFinalize() 207 211 { … … 209 213 MPI_Status status ; 210 214 int msg ; 211 215 212 216 traceOff() ; 213 217 MPI_Iprobe(0,4,intraComm, &flag, &status) ; … … 219 223 } 220 224 } 221 225 222 226 void CServer::listenContext(void) 223 227 { 224 228 225 229 MPI_Status status ; 226 230 int flag ; … … 230 234 int rank ; 231 235 int count ; 232 236 233 237 if (recept==false) 234 238 { … … 236 240 MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ; 237 241 traceOn() ; 238 if (flag==true) 242 if (flag==true) 239 243 { 240 244 rank=status.MPI_SOURCE ; … … 242 246 buffer=new char[count] ; 243 247 MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ; 244 recept=true ; 248 recept=true ; 245 249 } 246 250 } … … 256 260 recvContextMessage(buffer,count) ; 257 261 delete [] buffer ; 258 recept=false ; 259 } 260 } 261 } 262 262 recept=false ; 263 } 264 } 265 } 266 263 267 void CServer::recvContextMessage(void* buff,int count) 264 268 { 265 266 267 269 static map<string,contextMessage> recvContextId ; 268 270 map<string,contextMessage>::iterator it ; 269 271 270 272 CBufferIn buffer(buff,count) ; 271 273 string id ; … … 274 276 275 277 buffer>>id>>nbMessage>>clientLeader ; 276 278 277 279 it=recvContextId.find(id) ; 278 280 if (it==recvContextId.end()) 279 { 281 { 280 282 contextMessage msg={0,0} ; 281 283 pair<map<string,contextMessage>::iterator,bool> ret ; 282 284 ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ; 283 285 it=ret.first ; 284 } 286 } 285 287 it->second.nbRecv+=1 ; 286 288 it->second.leaderRank+=clientLeader ; 287 289 288 290 if (it->second.nbRecv==nbMessage) 289 { 291 { 290 292 int size ; 291 293 MPI_Comm_size(intraComm,&size) ; 292 294 MPI_Request* requests= new MPI_Request[size-1] ; 293 295 MPI_Status* status= new MPI_Status[size-1] ; 294 296 295 297 for(int i=1;i<size;i++) 296 298 { … … 305 307 306 308 } 307 } 308 309 } 310 309 311 void CServer::listenRootContext(void) 310 312 { 311 313 312 314 MPI_Status status ; 313 315 int flag ; … … 318 320 int count ; 319 321 const int root=0 ; 320 322 321 323 if (recept==false) 322 324 { … … 324 326 MPI_Iprobe(root,2,intraComm, &flag, &status) ; 325 327 traceOn() ; 326 if (flag==true) 328 if (flag==true) 327 329 { 328 330 MPI_Get_count(&status,MPI_CHAR,&count) ; 329 331 buffer=new char[count] ; 330 332 MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ; 331 recept=true ; 333 recept=true ; 332 334 } 333 335 } … … 340 342 registerContext(buffer,count) ; 341 343 delete [] buffer ; 342 recept=false ; 343 } 344 } 345 } 346 347 348 344 recept=false ; 345 } 346 } 347 } 348 349 350 349 351 void CServer::registerContext(void* buff,int count, int leaderRank) 350 352 { 351 353 352 354 string contextId; 353 355 CBufferIn buffer(buff,count) ; … … 356 358 MPI_Comm contextIntercomm ; 357 359 MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm) ; 358 360 359 361 info(20)<<"CServer : Register new Context : "<<contextId<<endl ; 360 362 MPI_Comm inter ; 361 363 MPI_Intercomm_merge(contextIntercomm,1,&inter) ; 362 364 MPI_Barrier(inter) ; 363 if (contextList.find(contextId)!=contextList.end()) 365 if (contextList.find(contextId)!=contextList.end()) 364 366 ERROR("void CServer::registerContext(void* buff,int count, int leaderRank)", 365 367 <<"Context has already been registred") ; 366 368 367 369 CContext* context=CContext::create(contextId) ; 368 370 contextList[contextId]=context ; 369 371 context->initServer(intraComm,contextIntercomm) ; 370 371 } 372 373 372 373 } 374 375 374 376 void CServer::contextEventLoop(void) 375 377 { 376 378 bool finished ; 377 379 map<string,CContext*>::iterator it ; 378 for(it=contextList.begin();it!=contextList.end();it++) 380 for(it=contextList.begin();it!=contextList.end();it++) 379 381 { 380 382 finished=it->second->eventLoop() ; … … 385 387 } 386 388 } 387 388 } 389 389 390 } 391 392 //! Get rank of the current process 393 int CServer::getRank() 394 { 395 return rank; 396 } 397 398 /*! 399 * \brief Open file stream to write in 400 * Opening a file stream with a specific file name suffix-server+rank 401 * \param [in] protype file name 402 */ 403 void CServer::openInfoStream(const StdString& fileName) 404 { 405 std::filebuf* fb = m_infoStream.rdbuf(); 406 StdStringStream fileNameServer; 407 fileNameServer << fileName <<"_server_"<<getRank() << ".txt"; 408 fb->open(fileNameServer.str().c_str(), std::ios::out); 409 if (!fb->is_open()) 410 ERROR("void CServer::openInfoStream(const StdString& fileName)", 411 <<endl<< "Can not open <"<<fileNameServer<<"> file to write" ); 412 413 info.write2File(fb); 414 } 415 416 //! Open stream for standard output 417 void CServer::openInfoStream() 418 { 419 info.write2StdOut(); 420 } 421 422 //! Close opening stream 423 void CServer::closeInfoStream() 424 { 425 if (m_infoStream.is_open()) m_infoStream.close(); 426 } 427 390 428 }
Note: See TracChangeset
for help on using the changeset viewer.