Changeset 1054 for XIOS/dev/dev_olga/src

- Timestamp: 02/17/17 19:51:36
- Location: XIOS/dev/dev_olga/src
- Files: 2 deleted, 26 edited
XIOS/dev/dev_olga/src/client.cpp
r1021 → r1054:

- Removed CClient::initializeClientOnServer(rank, intraCommPrmSrv, srvSndLeader), which duplicated the primary-server intra-communicator, recorded the secondary-server leader rank and created the inter-communicator with MPI_Intercomm_create.
- The context-registration routine used by the primary server is now entirely commented out (and renamed registerContextByClientOfServer in the commented block); it sent an "<id>_server_<i>" message to each secondary-server leader, created a context inter-communicator and called context->initClient().
XIOS/dev/dev_olga/src/client.hpp
r1021 → r1054:

- Removed the declaration of initializeClientOnServer(rank, localComm, srvSndLeader).
- Commented out the declaration of registerContextByClientOfServer(id, contextComm).
XIOS/dev/dev_olga/src/client_server_mapping_distributed.cpp
r907 → r1054:

- The context client used to compute the server mapping is now context->clientPrimServer[0] when secondary-server pools exist, and context->client otherwise. A comment notes the current assumption that all secondary-server pools have the same number of processes, and that the line must be changed if that assumption changes.
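The same selection pattern recurs in axis.cpp, domain.cpp and grid.cpp below. As an illustration only, the idea reduces to the sketch that follows; the types are minimal stand-ins and the helper name selectContextClient is hypothetical, not part of XIOS.

#include <vector>

// Stand-ins for the XIOS classes involved (sketch only).
struct CContextClient {};
struct CContext {
  CContextClient* client = nullptr;                // client towards the classical server
  std::vector<CContextClient*> clientPrimServer;   // clients towards secondary-server pools
};

// Pick the client used for buffer-size computations: the first secondary-server
// pool when present, otherwise the ordinary client. Assumes all secondary pools
// have the same number of processes, as noted in the changeset comments.
CContextClient* selectContextClient(CContext& context)
{
  return context.clientPrimServer.empty() ? context.client
                                          : context.clientPrimServer.front();
}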
XIOS/dev/dev_olga/src/context_client.cpp
r1021 → r1054:

- sendEvent(): getBuffers() is now called in non-blocking mode on the servers (!CXios::isClient). If buffers are available the event is sent and the buffers checked as before (waitEvent() only in attached mode, where buffering always succeeds); otherwise the ranks, sizes and freshly allocated CBufferOut objects are stored in tmpBufferedEvent and the event is serialized into those temporary buffers.
- New sendTemporarilyBufferedEvent(): if a temporarily buffered event exists, it attempts a non-blocking getBuffers() with the stored ranks and sizes; on success it copies the temporary buffers into the real ones, checks the buffers, clears tmpBufferedEvent and returns true.
- getBuffers() signature changed to getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false): it returns whether the already allocated buffers could be used, the waiting loop exits immediately when nonBlocking is set, and retBuffers is filled only when room was found (a commented-out block hints at also listening on the primary-server contexts).
- finalize(): before sending the EVENT_ID_CONTEXT_FINALIZE event, any temporarily buffered event is flushed; the wait loop afterwards also flushes temporarily buffered events and now exits once no buffer has a pending request. The per-connection memory report is unchanged.
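Condensed, the new buffering flow looks like the sketch below. The types are simplified stand-ins, not the real CEventClient/CBufferOut/CClientBuffer classes, and the event payload is represented by a plain byte vector instead of a serialized CMessage.

#include <list>
#include <vector>

struct Buffer { std::vector<char> data; };

// Event parked on the server side when the real buffers are full.
struct {
  std::list<int> ranks, sizes;
  std::list<Buffer> buffers;
  bool isEmpty() const { return ranks.empty(); }
  void clear()  { ranks.clear(); sizes.clear(); buffers.clear(); }
} tmpBufferedEvent;

// Stand-in: tries to reserve room in the per-server buffers. The real
// getBuffers() blocks until room is available unless nonBlocking is true;
// here it always succeeds so the sketch stays self-contained.
static bool getBuffers(const std::list<int>& ranks, const std::list<int>& sizes,
                       std::list<Buffer*>& out, bool nonBlocking)
{
  (void)ranks; (void)nonBlocking;
  static std::list<Buffer> pool;                              // placeholder storage
  for (int s : sizes) { pool.push_back(Buffer{std::vector<char>(s)}); out.push_back(&pool.back()); }
  return true;
}

// Servers call this with isClient == false, so getBuffers() never blocks; when
// no room is available the serialized event is parked in tmpBufferedEvent.
void sendEvent(const std::list<int>& ranks, const std::list<int>& sizes, bool isClient)
{
  std::list<Buffer*> buffList;
  if (getBuffers(ranks, sizes, buffList, /*nonBlocking=*/!isClient)) {
    // event.send(timeLine, sizes, buffList); checkBuffers(ranks);
  } else {
    tmpBufferedEvent.ranks = ranks;
    tmpBufferedEvent.sizes = sizes;
    for (int s : sizes) tmpBufferedEvent.buffers.push_back(Buffer{std::vector<char>(s)});
    // event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers);
  }
}

// Called from the event loops (and from finalize()) until it succeeds.
bool sendTemporarilyBufferedEvent()
{
  if (tmpBufferedEvent.isEmpty()) return false;
  std::list<Buffer*> buffList;
  if (!getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) return false;
  std::list<Buffer*>::iterator dst = buffList.begin();
  for (Buffer& src : tmpBufferedEvent.buffers) { (*dst)->data = src.data; ++dst; }  // copy parked data
  tmpBufferedEvent.clear();
  return true;
}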
XIOS/dev/dev_olga/src/context_client.hpp
r1021 → r1054:

- Declared sendTemporarilyBufferedEvent() and hasTemporarilyBufferedEvent() (the latter inline, returning !tmpBufferedEvent.isEmpty()).
- getBuffers() now takes const lists, an output list of buffers and a nonBlocking flag (default false), and returns bool.
- Commented out closeContext() with the note that it was never implemented.
- Added the tmpBufferedEvent member: an anonymous struct holding the ranks, sizes and CBufferOut pointers of an event temporarily buffered on the server, with isEmpty() and clear() (clear() deletes the buffers).
XIOS/dev/dev_olga/src/context_server.cpp
r1021 → r1054:

- Added the static member definition StdSize CContextServer::totalBuf_ = 0; an alternative constructor taking a server level is left commented out.
- eventLoop() now takes bool enableEventsProcessing (default true) and calls processEvents() only when it is set.
- processEvents() recomputes the hash of the context id locally.
- dispatchEvent() sets the current context through a local copy of the context id; on EVENT_ID_CONTEXT_FINALIZE it adds the per-connection buffer sizes to the static totalBuf_ instead of a local total, and the per-context total report is commented out (the total is now reported elsewhere through the new getTotalBuf() accessor).
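The control added to the server event loop can be summarized by the sketch below; ContextServerSketch is a stand-in name, not the real CContextServer.

#include <cstddef>

// Sketch: the server event loop can now skip event processing, which the
// context uses while a temporarily buffered event is still waiting to be sent.
class ContextServerSketch {
public:
  bool eventLoop(bool enableEventsProcessing = true)
  {
    listen();                 // receive incoming message fragments
    checkPendingRequest();    // progress outstanding MPI requests
    if (enableEventsProcessing)
      processEvents();        // dispatch complete events
    return finished;
  }

  // Buffer memory is now accumulated across contexts in a static counter and
  // reported once at the end instead of per context instance.
  static std::size_t getTotalBuf() { return totalBuf_; }

private:
  void listen() {}
  void checkPendingRequest() {}
  void processEvents() { /* on finalize, add mapBufferSize_ entries to totalBuf_ */ }
  bool finished = false;
  static std::size_t totalBuf_;
};
std::size_t ContextServerSketch::totalBuf_ = 0;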
XIOS/dev/dev_olga/src/context_server.hpp
r992 → r1054:

- Removed the declaration of the constructor taking a server level (srvLvl).
- eventLoop() declared as eventLoop(bool enableEventsProcessing = true); hasFinished() and processRequest() reordered.
- Added the static accessor getTotalBuf() and the static member totalBuf_ (total memory allocated by servers per context).
XIOS/dev/dev_olga/src/cxios.cpp
r1021 → r1054:

- initServerSide(int serverLvl) becomes initServerSide(void); the commented-out code setting isClient/isServer/serverLevel and the commented alternatives testing CXios::serverLevel are removed.
XIOS/dev/dev_olga/src/cxios.hpp
r1021 → r1054:

- The declaration initServerSide(int serverLevel) becomes initServerSide(void).
XIOS/dev/dev_olga/src/distribution_server.hpp
r1025 → r1054:

- getGlobalIndexEachDimension() is now defined inline, returning globalIndexEachDimension_.
XIOS/dev/dev_olga/src/interface/c/icdata.cpp
r983 → r1054:

- The C interface cxios_init_server(int server_level) becomes cxios_init_server(void) and calls CXios::initServerSide() without an argument.
XIOS/dev/dev_olga/src/interface/fortran/idata.F90
r983 → r1054:

- The Fortran binding cxios_init_server and the user-level subroutine xios(init_server) no longer take a server_level argument.
XIOS/dev/dev_olga/src/io/nc4_data_output.cpp
r1030 → r1054:

- Added a commented-out alternative for the written longitude range: CArray<double,1> lon = domain->lonvalue(Range(0,local_size_write[1]-1)).
XIOS/dev/dev_olga/src/node/axis.cpp
r1030 → r1054:

- getAttributesBufferSize(): the client is now context->clientPrimServer[0] when secondary-server pools exist, otherwise context->client (same single-pool assumption as noted above).
- In the loops over server pools (index distribution and the distributed/non-distributed attribute sends), the loop variable is renamed from i to p and the pool client is clientPrimServer[p].
XIOS/dev/dev_olga/src/node/context.cpp
r1030 → r1054:

- initClient(): when called by a client (CServer::serverLevel != 1) it creates the context client and server as before; in attached mode the server communicators are the context communicators, and the MPI_Comm_dup of the intra/inter communicators for the detached case is commented out. When called by a primary server it still pushes into clientPrimServer/serverPrimServer. The registryIn/registryOut handling is commented out.
- initServer(): the registry handling and the MPI_Comm_dup of the client communicators are commented out as well.
- CContext::eventLoop() is entirely commented out; its role is taken over by checkBuffersAndListen().
- checkBuffersAndListen(): for every server level it first checks the client buffers, then tries to flush a temporarily buffered event, and calls server->eventLoop() with event processing disabled while such an event is still pending. At server level 1 the buffers of each clientPrimServer are also checked, their temporarily buffered events flushed, and the corresponding serverPrimServer event loops combined into the returned result.
- finalize(): sendRegistry() is commented out; client->finalize() and the server drain loop are executed first, the total server-side buffer memory is reported through CContextServer::getTotalBuf(), closeAllFile() is only called when the context has a server role and no client role, and the registry gathering is commented out.
- Added isFinalized(), returning the finalized flag.
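The reworked per-context loop can be pictured as follows. This is a sketch with stand-in types; the level-1 case repeats the same steps for every primary-server client/server pair.

// Sketch of the CContext::checkBuffersAndListen() logic (stand-in types):
// the client side is flushed first; while a temporarily buffered event is still
// pending, the server event loop runs without processing events so that event
// ordering is preserved.
struct ClientSketch {
  void checkBuffers() {}
  bool hasTemporarilyBufferedEvent() const { return pending; }
  bool sendTemporarilyBufferedEvent() { bool sent = pending; pending = false; return sent; }
  bool pending = false;
};
struct ServerSketch {
  bool eventLoop(bool enableEventsProcessing) { (void)enableEventsProcessing; return finished; }
  bool finished = false;
};

bool checkBuffersAndListen(ClientSketch& client, ServerSketch& server)
{
  client.checkBuffers();
  bool hasTmpBufferedEvent = client.hasTemporarilyBufferedEvent();
  if (hasTmpBufferedEvent)
    hasTmpBufferedEvent = !client.sendTemporarilyBufferedEvent();  // still pending if the send failed
  // Do not process server events while a temporarily buffered event remains.
  return server.eventLoop(!hasTmpBufferedEvent);
}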
XIOS/dev/dev_olga/src/node/context.hpp
r1025 → r1054:

- Removed the eventLoop() declaration and added isFinalized().
- The comment on the internally allocated communicators list (comms) is annotated with a question about its significance.
XIOS/dev/dev_olga/src/node/domain.cpp
r1030 → r1054:

- getAttributesBufferSize(), redistribute() and the longitude/latitude gathering now take the client from context->clientPrimServer[0] when secondary-server pools exist, otherwise context->client (same single-pool assumption as above).
- completeLonLatClient() is only called when the context has no server role; sendAttributes() now sends the distribution attributes before the index, mask, lon/lat and area.
- In the send routines the loop variable over server pools is renamed from i to p and the pool client is clientPrimServer[p].
- computeConnectedClients(): indSrv_ and indZoomSrv_ are filled with per-server sorted index vectors instead of swapping the maps returned by the client-server mapping.
- Computation of the written start/count: after the MPI_Scan, structured domains keep start_write_index_[0] = 0 and subtract count_write_index_[1] from the second component, while unstructured domains subtract count_write_index_[0] from the first component; the adjustment of the global write size (first component local, second clipped to nj_glo) is applied only to structured domains.
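The start/count computation for the written indexes amounts to turning an inclusive MPI_Scan into exclusive offsets. The standalone MPI program below illustrates the idea with example values; it is not XIOS code, although the variable names follow the changeset.

#include <mpi.h>
#include <cstdio>

// Each process owns count[0] x count[1] written indexes. An inclusive MPI_Scan
// gives running totals; subtracting the local count turns them into exclusive
// start offsets. In the changeset, structured domains only offset the second
// dimension, while unstructured domains offset the first one.
int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  int count[2] = { 4 + rank, 3 };          // per-process written sizes (example values)
  int start[2] = { 0, 0 };
  MPI_Scan(count, start, 2, MPI_INT, MPI_SUM, MPI_COMM_WORLD);

  bool unstructured = true;                // flip for the structured case
  if (unstructured) start[0] -= count[0];  // exclusive offset along the single index dimension
  else { start[0] = 0; start[1] -= count[1]; }

  std::printf("rank %d: start=(%d,%d) count=(%d,%d)\n", rank, start[0], start[1], count[0], count[1]);
  MPI_Finalize();
  return 0;
}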
XIOS/dev/dev_olga/src/node/field.cpp
r1030 → r1054:

- sendUpdateData(data): instead of looping over the server pools, the client is now obtained from this->file->getContextClient(); the overload sendUpdateData(data, client) is commented out.
XIOS/dev/dev_olga/src/node/field.hpp
r1024 → r1054:

- The declaration of the sendUpdateData(data, client) overload is commented out.
XIOS/dev/dev_olga/src/node/file.cpp
r1025 → r1054:

- checkFile(): creating the header, checking the sync/split and opening in read mode are now done only by the classical server, or by the secondary server when two server levels are used (guard: !CXios::usingServer2 || (CXios::usingServer2 && !context->hasClient)).
- A commented-out alternative client selection (clientPrimServer[0] versus client) is added near the read path.
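The guard in checkFile() simplifies to a small predicate; the helper name shouldHandleFile is hypothetical, but the two flags follow the changeset.

// File creation/sync/split is performed only by the process that actually writes:
// the classical server when a single server level is used, or the secondary
// server (which has no client role) when two server levels are used.
inline bool shouldHandleFile(bool usingServer2, bool contextHasClient)
{
  return !usingServer2 || !contextHasClient;
}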
XIOS/dev/dev_olga/src/node/file.hpp
r1025 → r1054:

- The CContextClient* client member is moved to the top of the private section.
XIOS/dev/dev_olga/src/node/grid.cpp
r1030 → r1054:

- computeConnectedClients() is only called when the context has a client role.
- When building globalElementIndexOnServer, the loop now iterates over globalIndexElementOnClient and looks each entry up in globalIndexElementOnServerMap; the previous loop over the map is left commented out.
- Loops over server pools use the index p and clientPrimServer[p]; at the index-sending stage the server/client pair is chosen from serverPrimServer[p]/clientPrimServer[p] when the context has a client role and from server/client otherwise.
- For data that is non-distributed on the client but distributed on the server, the written data size and output index are now taken from serverDistribution_->getGlobalIndexEachDimension()[0]; the change is marked as a temporary fix valid for a single element type (domain, axis or scalar).
XIOS/dev/dev_olga/src/object_template_impl.hpp
r1030 → r1054:

- Removed a commented-out block that sent an attribute to the servers through the context client.
XIOS/dev/dev_olga/src/server.cpp
r1021 → r1054:

- The static list CServer::interComm is removed (commented out) and a static serverLeader is added.
- initialize(): the documentation now states that an intra-communicator is created for each server type (classical, primary or secondary; one per secondary pool, assuming one process per pool) and that the inter-communicators are stored in interCommLeft for a classical server, in interCommLeft and interCommRight for a primary server, and one per pool for a secondary server.
- For server level 1, the leader rank is recorded in serverLeader, and the inter-communicators to the secondary pools are created directly with MPI_Intercomm_create (srvSndLeader = serverLeader + serverSize - nbPools + i) and pushed into interCommRight, instead of going through CClient::initializeClientOnServer(). The pushes into the removed interComm list disappear, the OASIS branch pushes into interCommLeft, and the root test uses a local rankServer.
- finalize() frees the communicators of both interCommLeft and interCommRight.
- listenFinalize(): on receiving the client finalize message, the primary server forwards it to every secondary server in interCommRight, frees and erases the left inter-communicator, and stops once interCommLeft is empty.
- registerContext(): every server level initializes its own CContextServer. Levels 0 and 1 create a dedicated context inter-communicator (MPI_Intercomm_create with tag 10+leaderRank, merge, barrier, then free the merged communicator), while level 2 reuses interCommLeft.front(). A primary server additionally forwards the context creation to each secondary pool by sending "<contextId>_server_<i>", the intra-communicator size and its rank to the pool leader on the global communicator (serverSize - nbPools + serverLeader + i, assuming one process per pool), and calls context->initClient(intraComm, *it) for each right inter-communicator.
- contextEventLoop() first checks isFinalized() for each context and only calls checkBuffersAndListen() on contexts that are not finalized.
- The info/error stream id is rank - serverLeader on the primary server and poolId on a secondary server.
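The forwarding of a context registration from the primary server to the secondary pools follows the pattern sketched below. The message serialization is deliberately simplified (the real code packs a CMessage into a CBufferOut), the function name forwardContextToSecondaryServers is hypothetical, and the rank arithmetic assumes one process per secondary pool as stated in the changeset.

#include <mpi.h>
#include <list>
#include <string>

// Sketch: on the primary server, every registered context is forwarded to each
// secondary-server pool. The message carries the context id suffixed with
// "_server_<i>", the primary intra-communicator size and the sender rank.
void forwardContextToSecondaryServers(const std::string& contextId,
                                      MPI_Comm intraComm,
                                      const std::list<MPI_Comm>& interCommRight,
                                      int serverSize, int nbPools,
                                      int serverLeader, int myGlobalRank)
{
  int intraSize = 0;
  MPI_Comm_size(intraComm, &intraSize);

  int i = 0;
  for (MPI_Comm interComm : interCommRight)
  {
    std::string msg = contextId + "_server_" + std::to_string(i)
                    + " " + std::to_string(intraSize)
                    + " " + std::to_string(myGlobalRank);      // simplified serialization
    int sndServerGloRank = serverSize - nbPools + serverLeader + i;
    MPI_Send(msg.data(), static_cast<int>(msg.size()), MPI_CHAR,
             sndServerGloRank, 1, MPI_COMM_WORLD);
    // In the changeset, a context client is then created for this pool:
    // context->initClient(intraComm, interComm);
    (void)interComm;
    ++i;
  }
}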
XIOS/dev/dev_olga/src/server.hpp
r1021 → r1054:

- Removed the declaration initialize(const StdString& serverId) and the static interComm list.
- Comments updated: interCommLeft is the inter-communicator between a server (classical, primary or secondary) and its client, interCommRight links the primary server to the secondary servers, and the significance of contextInterComms is questioned.
- Added static int serverLeader (leader of the classical or primary server, needed when secondary servers exist); serverSize now counts all server processes (primary and secondary combined) and poolId identifies a secondary-server pool starting from 1.
XIOS/dev/dev_olga/src/xios_server.f90
r983 → r1054:

- The server main program no longer declares or passes server_level; it simply calls xios_init_server.