Changeset 2547 for XIOS3/trunk/src
- Timestamp: 08/29/23 17:24:04
- Location: XIOS3/trunk/src
- Files: 3 added, 35 edited
XIOS3/trunk/src/buffer_client.cpp
(diff r2458 → r2547)

   size_t CClientBuffer::maxRequestSize = 0;

-  CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize)
+  CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, bool hasWindows)
     : interComm(interComm)
     , clientRank_(0)
     , serverRank(serverRank)
     , bufferSize(bufferSize)
-    , estimatedMaxEventSize(estimatedMaxEventSize)
     , maxEventSize(0)
     , current(0)
     , count(0)
     , pending(false)
-    , hasWindows(false)
-  {
-    /*
-    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
-    else hasWindows=true ;
-    */
-
-    MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ;
-    MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ;
+    , hasWindows_(hasWindows)
+  {
+    if (hasWindows_)
+    {
+      windows_.resize(2) ;
+      windows_[0] = new CWindowDynamic() ;
+      windows_[0]->allocateBuffer(bufferSize+headerSize_) ;
+      bufferHeader[0] = (char*) windows_[0]->getBufferAddress() ;
+      windows_[1] = new CWindowDynamic() ;
+      windows_[1]->allocateBuffer(bufferSize+headerSize_) ;
+      bufferHeader[1] = (char*) windows_[1]->getBufferAddress() ;
+    }
+    else
+    {
+      MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ;
+      MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ;
+    }
+
     buffer[0] = bufferHeader[0]+headerSize_ ;
     buffer[1] = bufferHeader[1]+headerSize_ ;
…
     winState[1]=false ;

-    if (hasWindows)
-    {
-      MPI_Aint buffSize=bufferSize+headerSize_ ;
-      MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ;
-      MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ;
-
-      MPI_Group group ;
-      int groupSize,groupRank ;
-      MPI_Win_get_group(windows_[0], &group) ;
-      MPI_Group_size(group, &groupSize) ;
-      MPI_Group_rank(group, &groupRank) ;
-      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
-
-      MPI_Win_get_group(windows_[1], &group) ;
-      MPI_Group_size(group, &groupSize) ;
-      MPI_Group_rank(group, &groupRank) ;
-      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
-
-      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
-      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
-
-      MPI_Win_unlock(clientRank_, windows_[1]) ;
-      MPI_Win_unlock(clientRank_, windows_[0]) ;
-    }
     retBuffer = new CBufferOut(buffer[current], bufferSize);
     info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl;
   }
…
     return address ;
   }

+  MPI_Aint CClientBuffer::getWinBufferAddress(int i)
+  {
+    return windows_[i]->getWinBufferAddress() ;
+  }
+
-  void CClientBuffer::attachWindows(vector<MPI_Win>& windows)
+  void CClientBuffer::attachWindows(MPI_Comm& winComm)
   {
     isAttachedWindows_=true ;
-    windows_=windows ;
-    if (windows_[0]==MPI_WIN_NULL && windows_[1]==MPI_WIN_NULL) hasWindows=false ;
-    else hasWindows=true ;
-
-    if (hasWindows)
-    {
-      MPI_Aint buffSize=bufferSize+headerSize_ ;
-      MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ;
-      MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ;
-
-      MPI_Group group ;
-      int groupSize,groupRank ;
-      MPI_Win_get_group(windows_[0], &group) ;
-      MPI_Group_size(group, &groupSize) ;
-      MPI_Group_rank(group, &groupRank) ;
-      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
-
-      MPI_Win_get_group(windows_[1], &group) ;
-      MPI_Group_size(group, &groupSize) ;
-      MPI_Group_rank(group, &groupRank) ;
-      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
-
-      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
-      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
-
-      MPI_Win_unlock(clientRank_, windows_[1]) ;
-      MPI_Win_unlock(clientRank_, windows_[0]) ;
+
+    if (hasWindows_)
+    {
+      windows_[0]->create(winComm) ;
+      windows_[0]->attach() ;
+      windows_[1]->create(winComm) ;
+      windows_[1]->attach() ;
+
+      windows_[0]->lockExclusive(clientRank_) ;
+      windows_[1]->lockExclusive(clientRank_) ;
+
+      windows_[0]->unlockExclusive(clientRank_) ;
+      windows_[1]->unlockExclusive(clientRank_) ;
     }
…
   CClientBuffer::~CClientBuffer()
   {
-    //freeWindows() ;
-    if (hasWindows)
-    {
-      MPI_Win_detach(windows_[0],bufferHeader[0]);
-      MPI_Win_detach(windows_[1],bufferHeader[1]);
-      MPI_Free_mem(bufferHeader[0]) ;
-      MPI_Free_mem(bufferHeader[1]) ;
-    }
+    if (hasWindows_)
+    {
+      windows_[0]->detach() ;
+      windows_[1]->detach() ;
+      delete windows_[0] ;
+      delete windows_[1] ;
+    }
+    else
+    {
+      MPI_Free_mem(bufferHeader[0]) ;
+      MPI_Free_mem(bufferHeader[1]) ;
+    }
     delete retBuffer;
   }
…
     CTimer::get("lock buffer").resume();
-    if (hasWindows)
+    if (isAttachedWindows_)
     {
       if (winState[current]==true) ERROR("CClientBuffer::lockBuffer(void)",<<"Try lo lock client buffer but winState said it is already locked") ;
-      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ;
+      //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ;
+      windows_[current]->lockExclusive(clientRank_) ;
       winState[current]=true ;
     }
…
     CTimer::get("unlock buffer").resume();
-    if (hasWindows)
+    if (isAttachedWindows_)
     {
       if (winState[current]==false) ERROR("CClientBuffer::lockBuffer(void)",<<"Try lo unlock client buffer but winState said it is already unlocked") ;
-      MPI_Win_unlock(clientRank_, windows_[current]) ;
+      //MPI_Win_unlock(clientRank_, windows_[current]) ;
+      windows_[current]->unlockExclusive(clientRank_) ;
       winState[current]=false ;
     }
…
       maxEventSize = size;
-
-      if (size > estimatedMaxEventSize)
-        error(0) << "WARNING: Unexpected event of size " << size << " for server " << serverRank
-                 << " (estimated max event size = " << estimatedMaxEventSize << ")" << std::endl;

       if (size > maxRequestSize) maxRequestSize = size;
…
-    if (hasWindows)
-    {
-      MPI_Win_detach(windows_[0], bufferHeader[0]) ;
-      MPI_Win_detach(windows_[1], bufferHeader[1]) ;
-    }
-    MPI_Free_mem(bufferHeader[0]) ;
-    MPI_Free_mem(bufferHeader[1]) ;
-
-    bufferSize=newSize ;
-    MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ;
-    MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ;
+    bufferSize=newSize ;
+
+    if (hasWindows_)
+    {
+      windows_[0]->detach();
+      windows_[1]->detach();
+
+      windows_[0]->attach(bufferSize+headerSize_) ;
+      bufferHeader[0] = (char*) windows_[0]->getBufferAddress() ;
+      windows_[1]->attach(bufferSize+headerSize_) ;
+      bufferHeader[1] = (char*) windows_[1]->getBufferAddress() ;
+    }
+
     buffer[0] = bufferHeader[0]+headerSize_ ;
     buffer[1] = bufferHeader[1]+headerSize_ ;
…
     current=0 ;

-    if (hasWindows )
-    {
-      MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize_) ;
-      MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize_) ;
-
-      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
-      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
-
-      MPI_Win_unlock(clientRank_, windows_[1]) ;
-      MPI_Win_unlock(clientRank_, windows_[0]) ;
+    if (hasWindows_)
+    {
+      windows_[0]->lockExclusive(clientRank_) ;
+      windows_[1]->lockExclusive(clientRank_) ;
+
+      windows_[1]->unlockExclusive(clientRank_) ;
+      windows_[0]->unlockExclusive(clientRank_) ;
     }
…
     bufOut->put(timelineEventChangeBufferSize);
     bufOut->put(newBufferSize_);
-    bufOut->put(this->getWinAddress(0));
-    bufOut->put(this->getWinAddress(1));
+    bufOut->put(this->getWinBufferAddress(0));
+    bufOut->put(this->getWinBufferAddress(1));

     resizingBufferStep_=4;
     unlockBuffer() ;
-    info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWinAddress(0)<<" winAdress[1] "<<this->getWinAddress(1)<<endl;
+    info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWinBufferAddress(0)<<" winAdress[1] "<<this->getWinBufferAddress(1)<<endl;
…
     lockBuffer() ;
     ret=*notify[current] == notifyResizeBuffer_ ? true : false ;
-    if (ret || !hasWindows )
+    if (ret || !hasWindows_)
     {
       *notify[current] = notifyNothing_ ;
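The raw MPI window calls that used to be inlined here now live behind CWindowDynamic, which presumably ships in the newly added window_dynamic.hpp (one of the changeset's three added files) and is not visible in this diff. For orientation, a minimal, self-contained sketch of the MPI dynamic-window pattern such a wrapper would encapsulate; everything outside the standard MPI API here is illustrative, not the XIOS interface:

```cpp
#include <mpi.h>

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);

  // A dynamic window is created empty; memory is attached afterwards.
  // This is what lets a buffer be resized later without recreating the window.
  MPI_Win win;
  MPI_Win_create_dynamic(MPI_INFO_NULL, MPI_COMM_WORLD, &win);

  const MPI_Aint size = 1024;
  char* buffer;
  MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer);
  MPI_Win_attach(win, buffer, size);

  // With dynamic windows, remote ranks address memory by absolute address,
  // so the attached address must be published to peers
  // (cf. getWinBufferAddress() above).
  MPI_Aint address;
  MPI_Get_address(buffer, &address);

  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  // Exclusive lock/unlock on one's own rank, as in lockBuffer()/unlockBuffer().
  MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, win);
  MPI_Win_unlock(rank, win);

  MPI_Win_detach(win, buffer);
  MPI_Free_mem(buffer);
  MPI_Win_free(&win);
  MPI_Finalize();
  return 0;
}
```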
XIOS3/trunk/src/buffer_client.hpp
r2458 r2547 7 7 #include "mpi.hpp" 8 8 #include "cxios.hpp" 9 #include "window_dynamic.hpp" 9 10 10 11 namespace xios … … 15 16 static size_t maxRequestSize; 16 17 17 CClientBuffer(MPI_Comm intercomm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize);18 CClientBuffer(MPI_Comm intercomm, int serverRank, StdSize bufferSize, bool hasWindows); 18 19 ~CClientBuffer(); 19 20 // void createWindows(MPI_Comm oneSidedComm) ; … … 28 29 StdSize remain(void); 29 30 MPI_Aint getWinAddress(int numWindows) ; 31 MPI_Aint getWinBufferAddress(int numWindows) ; 30 32 void infoBuffer(void) ; 31 33 bool isNotifiedFinalized(void) ; … … 33 35 void fixBufferSize(size_t bufferSize) { newBufferSize_=bufferSize ; isGrowableBuffer_=false ; resizingBufferStep_=1 ;} 34 36 void fixBuffer(void) { isGrowableBuffer_=false ;} 35 void attachWindows( vector<MPI_Win>& windows) ;37 void attachWindows(MPI_Comm& winComm) ; 36 38 bool isAttachedWindows(void) { return isAttachedWindows_ ;} 37 39 private: … … 58 60 StdSize maxEventSize; 59 61 StdSize bufferSize; 60 const StdSize estimatedMaxEventSize;61 62 bool isFinalized_=false ; 62 63 … … 69 70 CBufferOut* retBuffer; 70 71 const MPI_Comm interComm; 71 std::vector< MPI_Win> windows_ ;72 bool hasWindows =false ;72 std::vector<CWindowDynamic*> windows_ ; 73 bool hasWindows_=false ; 73 74 bool isAttachedWindows_=false ; 74 75 double latency_=0 ; -
XIOS3/trunk/src/buffer_server.cpp
(diff r2309 → r2547)

   #include "buffer_server.hpp"
   #include "timer.hpp"
+  #include "window_dynamic.hpp"
…
-  CServerBuffer::CServerBuffer(vector<MPI_Win>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize buffSize)
+  CServerBuffer::CServerBuffer(vector<CWindowDynamic*>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize buffSize)
     : hasWindows(true), windows_(windows), windowsRank_(windowsRank), winAddress_(winAddress)
…
     MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer) ;
     currentWindows=1 ;
-    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
+    if (windows[0]==nullptr && windows[1]==nullptr) hasWindows=false ;
…
     bool ok=false ;

-    MPI_Group group ;
-    int groupSize,groupRank ;
-    MPI_Win_get_group(windows_[currentWindows], &group) ;
-    MPI_Group_size(group, &groupSize) ;
-    MPI_Group_rank(group, &groupRank) ;
-
     lockBuffer();
     CTimer::get("getBufferFromClient_locked").resume() ;
     // lock is acquired

-    MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
-    MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
-    MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;
+    windows_[currentWindows]->get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ;
+    windows_[currentWindows]->get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ;
+    windows_[currentWindows]->flush(windowsRank_) ;

     if (timeLine==clientTimeline)
     {
       buffer=(char*)getBuffer(clientCount) ;
       count=clientCount ;
-      MPI_Get(buffer, clientCount, MPI_CHAR, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],4*sizeof(size_t)) , clientCount, MPI_CHAR, windows_[currentWindows]) ;
+      windows_[currentWindows]->get(buffer, clientCount, MPI_CHAR, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],4*sizeof(size_t)) , clientCount, MPI_CHAR) ;
       clientTimeline = 0 ;
       clientCount = 0 ;
-      MPI_Put(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
-      MPI_Put(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
+      windows_[currentWindows]->put(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ;
+      windows_[currentWindows]->put(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ;

       // release lock
…
     if (!hasWindows) return ;
-    MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ;
+    //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ;
+    windows_[currentWindows]->lockExclusive(windowsRank_) ;
…
     if (!hasWindows) return ;
-    MPI_Win_unlock(windowsRank_,windows_[currentWindows]) ;
+    windows_[currentWindows]->unlockExclusive(windowsRank_) ;
…
     lockBuffer();
     // lock is acquired
-    MPI_Put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
+    windows_[currentWindows]->put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ;
     unlockBuffer() ;
…
     lockBuffer();
     // lock is acquired
-    MPI_Put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
+    windows_[currentWindows]->put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ;
     unlockBuffer() ;
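The wrapper's get()/put()/flush() calls replace the direct MPI one-sided calls shown on the removed lines; the semantics are unchanged. As a reminder of what they do, here is the plain-MPI shape in isolation (a sketch: targetRank, winAddress and the byte offsets stand in for the windowsRank_/winAddress_/timeLineOffset_/countOffset_ members used above):

```cpp
// Read two 64-bit control words (timeline, count) from the client's window
// inside a passive-target lock epoch; the flush guarantees completion
// before the values are used locally.
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, targetRank, 0, win);

long long timeline, count;
MPI_Get(&timeline, 1, MPI_LONG_LONG_INT, targetRank,
        MPI_Aint_add(winAddress, 0 * sizeof(size_t)),  // stand-in for timeLineOffset_
        1, MPI_LONG_LONG_INT, win);
MPI_Get(&count, 1, MPI_LONG_LONG_INT, targetRank,
        MPI_Aint_add(winAddress, 1 * sizeof(size_t)),  // stand-in for countOffset_
        1, MPI_LONG_LONG_INT, win);
MPI_Win_flush(targetRank, win);  // both values are now valid locally

MPI_Win_unlock(targetRank, win);
```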
XIOS3/trunk/src/buffer_server.hpp
(diff r2323 → r2547)

   #include "mpi.hpp"
   #include "cxios.hpp"
+  #include "window_dynamic.hpp"
…
-  CServerBuffer(vector<MPI_Win>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize bufSize) ;
+  CServerBuffer(vector<CWindowDynamic*>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize bufSize) ;
   ~CServerBuffer() ;
…
   size_t size;
   size_t used ; // count of element occupied
-  std::vector<MPI_Win> windows_ ;
+  std::vector<CWindowDynamic*> windows_ ;
   std::vector<MPI_Aint> winAddress_ ;
   bool resizingBuffer_ = false ;
XIOS3/trunk/src/client.cpp
(diff r2535 → r2547)

   MPI_Comm_size(CXios::getXiosComm(), &xiosCommSize) ;
   MPI_Comm_size(clientsComm_, &clientsCommSize) ;
-  if (xiosCommSize==clientsCommSize) CXios::setUsingServer() ;
-  else CXios::setNotUsingServer() ;
+  if (xiosCommSize==clientsCommSize) CXios::setNotUsingServer() ;
+  else CXios::setUsingServer() ;
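This reads as an inverted-condition fix: when every rank of the XIOS communicator belongs to the clients' communicator there is no dedicated server process, so that case must select the not-using-server path, and vice versa.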
XIOS3/trunk/src/cxios.cpp
(diff r2535 → r2547)

   }

+  void CXios::launchThreadManager(bool isXiosServer)
+  {
+    CThreadManager::initialize(isXiosServer) ;
+  }
+
   void CXios::finalizeRegistryManager()
…
   }

+  void CXios::finalizeThreadManager()
+  {
+    CThreadManager::finalize() ;
+  }
+
   void CXios::finalizeDaemonsManager()
XIOS3/trunk/src/cxios.hpp
(diff r2535 → r2547)

   #include "coupler_manager.hpp"
   #include "registry_manager.hpp"
+  #include "thread_manager.hpp"
   #include "mpi_garbage_collector.hpp"
…
   static void launchCouplerManager(bool isXiosServer) ;
   static void launchRegistryManager(bool isXiosServer) ;
+  static void launchThreadManager(bool isXiosServer) ;
…
   static void finalizeCouplerManager() ;
   static void finalizeRegistryManager() ;
+  static void finalizeThreadManager() ;

   static CRegistryManager* getRegistryManager(void) { return registryManager_ ;}
XIOS3/trunk/src/manager/daemons_manager.cpp
(diff r2333 → r2547)

   CXios::launchContextsManager(isXiosServer) ;
   CXios::launchCouplerManager(isXiosServer) ;
+  CXios::launchThreadManager(isXiosServer) ;

   if (isXiosServer) CServer::launchServersRessource(splitComm) ;
…
   CXios::getServicesManager()->eventLoop() ;
   CXios::getContextsManager()->eventLoop() ;
-  if (isServer_) {
-    if (CServer::isRoot) {
-      CServer::listenOasisEnddef() ;
-      CServer::listenRootOasisEnddef() ;
-    }
-    else {
-      CServer::listenRootOasisEnddef() ;
-    }
-    return CServer::getServersRessource()->eventLoop(false) ;
-  }
+  if (isServer_)
+  {
+    if (CServer::isRoot)
+    {
+      CServer::listenOasisEnddef() ;
+      CServer::listenRootOasisEnddef() ;
+    }
+    else CServer::listenRootOasisEnddef() ;
+
+    if (CThreadManager::isUsingThreads()) return CServer::getServersRessource()->isFinished() ;
+    else return CServer::getServersRessource()->eventLoop(false) ;
+  }
   else return CXios::getPoolRessource()->eventLoop(false) ;
…
   CXios::finalizeRessourcesManager() ;
   CXios::finalizeRegistryManager() ;
+  CXios::finalizeThreadManager() ;
   isFinalized_=true ;
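The changeset introduces two execution modes: with CThreadManager active, each pool/service/context object runs its own cooperative threadEventLoop() and the polled eventLoop() degenerates into an isFinished() query; otherwise the caller keeps polling eventLoop() as before. All the threadEventLoop() bodies added below share one shape, sketched here with a hypothetical CMyDaemon stand-in (only the CThreadManager calls are real names from the newly added thread_manager.hpp):

```cpp
// Common shape of the threadEventLoop() bodies added in this changeset
// (CPoolRessource, CServersRessource, CService, CServerContext).
void CMyDaemon::threadEventLoop(void)
{
  CThreadManager::threadInitialize();          // register this fiber
  do
  {
    checkNotifications();                      // poll for work, never block
    if (workDone_ && finalizeSignal_) finished_ = true;
    if (!finished_) CThreadManager::yield();   // cooperative hand-off
  } while (!finished_);
  CThreadManager::threadFinalize();            // deregister
}
// Spawned once at construction:
//   if (CThreadManager::isUsingThreads())
//     CThreadManager::spawnThread(&CMyDaemon::threadEventLoop, this);
```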
XIOS3/trunk/src/manager/pool_ressource.cpp
(diff r2523 → r2547)

   #include "timer.hpp"
   #include "event_scheduler.hpp"
+  #include "thread_manager.hpp"
…
     else eventScheduler_= make_shared<CEventScheduler>(poolComm) ;
     freeRessourceEventScheduler_ = eventScheduler_ ;
+    std::hash<string> hashString ;
+    hashId_ = hashString("CPoolRessource::"+Id) ;
+    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CPoolRessource::threadEventLoop, this) ;
+  }
+
+  void CPoolRessource::synchronize(void)
+  {
+    bool out=false ;
+    size_t timeLine=0 ;
+
+    eventScheduler_->registerEvent(timeLine, hashId_) ;
+    while (!out)
+    {
+      CThreadManager::yield() ;
+      out = eventScheduler_->queryEvent(timeLine,hashId_) ;
+      if (out) eventScheduler_->popEvent() ;
+    }
   }
…
     MPI_Comm_rank(poolComm_, &commRank) ;
     winNotify_->popFromExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpIn) ;
-    if (notifyType_==NOTIFY_CREATE_SERVICE) createService() ;
-    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) createServiceOnto() ;
+    if (notifyType_==NOTIFY_CREATE_SERVICE)
+    {
+      if (CThreadManager::isUsingThreads()) synchronize() ;
+      createService() ;
+    }
+    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
+    {
+      if (CThreadManager::isUsingThreads()) synchronize() ;
+      createServiceOnto() ;
+    }
…
     CTimer::get("CPoolRessource::eventLoop").suspend();
-    if (services_.empty() && finalizeSignal_) return true ;
-    else return false ;
-  }
+    if (services_.empty() && finalizeSignal_) finished_=true ;
+    return finished_ ;
+  }
+
+  void CPoolRessource::threadEventLoop(void)
+  {
+    CTimer::get("CPoolRessource::eventLoop").resume();
+    info(100)<<"Launch Thread for CPoolRessource::threadEventLoop, pool id = "<<Id_<<endl ;
+    CThreadManager::threadInitialize() ;
+
+    do
+    {
+      double time=MPI_Wtime() ;
+      int flag ;
+      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
+      if (time-lastEventLoop_ > eventLoopLatency_)
+      {
+        //checkCreateServiceNotification() ;
+        checkNotifications() ;
+        lastEventLoop_=time ;
+      }
+
+      for(auto it=services_.begin();it!=services_.end();++it)
+      {
+        if (it->second->isFinished())
+        {
+          delete it->second ;
+          services_.erase(it) ;
+          // destroy server_context -> to do later
+          break ;
+        } ;
+      }
+
+      CTimer::get("CPoolRessource::eventLoop").suspend();
+      if (services_.empty() && finalizeSignal_) finished_=true ;
+
+      if (!finished_) CThreadManager::yield() ;
+
+    } while (!finished_) ;
+
+    CThreadManager::threadFinalize() ;
+    info(100)<<"Close thread for CPoolRessource::threadEventLoop, pool id = "<<Id_<<endl ;
+  }
…
-  void CPoolRessource::createService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients & attached
+  void CPoolRessource::createService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients
   {
     services_[std::make_tuple(serviceId,partitionId)] = new CService(serviceComm, eventScheduler, Id_, serviceId, partitionId, type, nbPartitions) ;
XIOS3/trunk/src/manager/pool_ressource.hpp
(diff r2523 → r2547)

   bool eventLoop(bool serviceOnly=false) ;
+  void threadEventLoop(void) ;
   bool hasService(const std::string serviceId, int partitionId) {return services_.count(make_tuple(serviceId,partitionId))>0 ;}
   CService* getService(const std::string serviceId, int partitionId) { return services_[make_tuple(serviceId,partitionId)]; }
…
   void createService(void) ;
   void createServiceOnto(void) ;
+  void synchronize(void) ;
…
 public:
   void createNewServiceOnto(const std::string& serviceId, int type, const string& onServiceId) ;

+private:
+  bool finished_=false ;
+public:
+  bool isFinished(void) { return finished_ ;}
+
 private:
   MPI_Comm poolComm_ ;
…
   std::string Id_ ;
   bool finalizeSignal_ ;

   const double eventLoopLatency_=0;
   double lastEventLoop_=0. ;

 private:
+  size_t hashId_ ;
   shared_ptr<CEventScheduler> eventScheduler_ ;
   shared_ptr<CEventScheduler> freeRessourceEventScheduler_ ;
XIOS3/trunk/src/manager/server_context.cpp
(diff r2517 → r2547)

   #include "register_context_info.hpp"
   #include "services.hpp"
+  #include "thread_manager.hpp"
   #include "timer.hpp"
…
     info(10)<<"Context "<< CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId)<<" created, on local rank "<<localRank
             <<" and global rank "<<globalRank<<endl ;
+
+    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CServerContext::threadEventLoop, this) ;
   }
…
-    MPI_Request req ;
-    MPI_Status status ;
-    MPI_Ibarrier(intraComm,&req) ;
-
-    int flag=false ;
-    while(!flag)
-    {
-      CXios::getDaemonsManager()->servicesEventLoop() ;
-      MPI_Test(&req,&flag,&status) ;
-    }
+    if (wait)
+    {
+      MPI_Request req ;
+      MPI_Status status ;
+      MPI_Ibarrier(intraComm,&req) ;
+
+      int flag=false ;
+      while(!flag)
+      {
+        CXios::getDaemonsManager()->servicesEventLoop() ;
+        MPI_Test(&req,&flag,&status) ;
+      }
+    }

     MPI_Bcast(&ok, 1, MPI_INT, 0, intraComm) ;
…
-    if (nOverlap> 0 )
-    {
-      while (get<0>(overlapedComm_[name_])==false) CXios::getDaemonsManager()->servicesEventLoop() ;
-      isAttachedMode_=true ;
-      cout<<"CServerContext::createIntercomm : total overlap ==> context in attached mode"<<endl ;
-      interCommClient=newInterCommClient ;
-      interCommServer=newInterCommServer ;
-    }
-    else if (nOverlap==0)
+    if (nOverlap==0)
     {
-      cout<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ;
-      isAttachedMode_=false ;
       MPI_Intercomm_create(intraComm, 0, xiosComm_, contextLeader, 3141, &interCommClient) ;
       MPI_Comm_dup(interCommClient, &interCommServer) ;
…
     else
     {
-      cout<<"CServerContext::createIntercomm : partial overlap ==> not managed"<<endl ;
+      ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : overlap ==> not managed") ;
     }
…
+  void CServerContext::threadEventLoop(void)
+  {
+    info(100)<<"Launch Thread for CServerContext::threadEventLoop, context id = "<<context_->getId()<<endl ;
+    CThreadManager::threadInitialize() ;
+    do
+    {
+      CTimer::get("CServerContext::eventLoop").resume();
+      int flag ;
+      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
+
+      if (winNotify_!=nullptr) checkNotifications() ;
+
+      if (context_!=nullptr)
+      {
+        if (context_->eventLoop())
+        {
+          info(100)<<"Remove context server with id "<<context_->getId()<<endl ;
+          CContext::removeContext(context_->getId()) ;
+          context_=nullptr ;
+          // destroy context ??? --> later
+        }
+      }
+      CTimer::get("CServerContext::eventLoop").suspend();
+      if (context_==nullptr && finalizeSignal_) finished_=true ;
+
+      if (!finished_) CThreadManager::yield() ;
+    }
+    while (!finished_) ;
+
+    CThreadManager::threadFinalize() ;
+    info(100)<<"Close thread for CServerContext::threadEventLoop"<<endl ;
+  }
+
   void CServerContext::createIntercomm(void)
   {
…
     MPI_Comm_size(contextComm_,&commSize ) ;

-    if (nOverlap==commSize)
-    {
-      info(10)<<"CServerContext::createIntercomm : total overlap ==> context in attached mode"<<endl ;
-      isAttachedMode_=true ;
-      interCommClient=get<2>(it->second) ;
-      interCommServer=get<1>(it->second) ;
-      context_ -> createClientInterComm(interCommClient, interCommServer ) ;
-      clientsInterComm_.push_back(interCommClient) ;
-      clientsInterComm_.push_back(interCommServer) ;
-    }
-    else if (nOverlap==0)
+    if (nOverlap==0)
     {
       info(10)<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ;
-      isAttachedMode_=false ;
       MPI_Intercomm_create(contextComm_, 0, xiosComm_, remoteLeader, 3141, &interCommServer) ;
       MPI_Comm_dup(interCommServer,&interCommClient) ;
…
     else
     {
-      ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : partial overlap ==> not managed") ;
+      ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : overlap ==> not managed") ;
     }
XIOS3/trunk/src/manager/server_context.hpp
(diff r2274 → r2547)

   bool eventLoop(bool serviceOnly=false) ;
+  void threadEventLoop(void) ;
   void notificationsDumpOut(CBufferOut& buffer) ;
   void notificationsDumpIn(CBufferIn& buffer) ;
   void finalizeSignal(void) ;
   void freeComm(void) ;
-  bool isAttachedMode(void) { return isAttachedMode_ ;}
   CService* getParentService(void) {return parentService_ ; }

+private :
+  bool finished_=false ;
+public:
+  bool isFinished(void) { return finished_ ; }
 private:
   void createIntercomm(void) ;
…
   bool finalizeSignal_ ;
   bool hasNotification_ ;
-  bool isAttachedMode_ ;

   const double eventLoopLatency_=0;
XIOS3/trunk/src/manager/servers_ressource.cpp
(diff r2523 → r2547)

   #include <vector>
   #include <string>
+  #include "thread_manager.hpp"
…
   eventScheduler_ = make_shared<CEventScheduler>(freeRessourcesComm_) ;
   freeRessourceEventScheduler_ = eventScheduler_ ;
+  if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CServersRessource::threadEventLoop, this) ;
…
   if (poolRessource_!=nullptr)
   {
-    if (poolRessource_->eventLoop(serviceOnly))
+    poolRessource_->eventLoop(serviceOnly) ;
+    if (poolRessource_->isFinished())
     {
       delete poolRessource_ ;
…
   CTimer::get("CServersRessource::eventLoop").suspend();
-  if (poolRessource_==nullptr && finalizeSignal_) return true ;
-  else return false ;
-}
+  if (poolRessource_==nullptr && finalizeSignal_) finished_=true ;
+  return finished_ ;
+}
+
+void CServersRessource::threadEventLoop(void)
+{
+  CTimer::get("CServersRessource::eventLoop").resume();
+  info(100)<<"Launch Thread for CServersRessource::threadEventLoop"<<endl ;
+  CThreadManager::threadInitialize() ;
+
+  do
+  {
+    double time=MPI_Wtime() ;
+    int flag ;
+    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
+
+    if (time-lastEventLoop_ > eventLoopLatency_)
+    {
+      checkNotifications() ;
+      lastEventLoop_=time ;
+    }
+
+    if (poolRessource_!=nullptr)
+    {
+      if (poolRessource_->isFinished())
+      {
+        delete poolRessource_ ;
+        poolRessource_=nullptr ;
+        // don't forget to free pool ressource later
+      }
+    }
+    CTimer::get("CServersRessource::eventLoop").suspend();
+    if (poolRessource_==nullptr && finalizeSignal_) finished_=true ;
+    if (!finished_) CThreadManager::yield() ;
+
+  } while (!finished_) ;
+
+  CThreadManager::threadFinalize() ;
+  info(100)<<"Close thread for CServersRessource::threadEventLoop"<<endl ;
+}
…
   int commRank ;
   MPI_Comm_rank(serverComm_, &commRank) ;
   winNotify_->popFromExclusiveWindow(commRank, this, &CServersRessource::notificationsDumpIn) ;
-  if (notifyInType_==NOTIFY_CREATE_POOL) createPool() ;
+  if (notifyInType_==NOTIFY_CREATE_POOL)
+  {
+    if (CThreadManager::isUsingThreads()) synchronize() ;
+    createPool() ;
+  }
   else if (notifyInType_==NOTIFY_FINALIZE) finalizeSignal() ;
+}
+
+void CServersRessource::synchronize(void)
+{
+  bool out=false ;
+  size_t timeLine=0 ;
+  std::hash<string> hashString ;
+  int commSize ;
+  MPI_Comm_size(freeRessourcesComm_,&commSize) ;
+  size_t hashId = hashString("CServersRessource::"+to_string(commSize)) ;
+  freeRessourceEventScheduler_->registerEvent(timeLine, hashId) ;
+  while (!out)
+  {
+    CThreadManager::yield() ;
+    out = eventScheduler_->queryEvent(timeLine,hashId) ;
+    if (out) eventScheduler_->popEvent() ;
+  }
 }
XIOS3/trunk/src/manager/servers_ressource.hpp
(diff r2523 → r2547)

   const double eventLoopLatency_=0;
   double lastEventLoop_=0. ;

 private:
+  bool finished_=false;
+public:
+  bool isFinished(void) { return finished_ ;}
+private:
+  void synchronize(void) ;
+  void threadEventLoop(void) ;
   shared_ptr<CEventScheduler> eventScheduler_ ;
   shared_ptr<CEventScheduler> freeRessourceEventScheduler_ ;
XIOS3/trunk/src/manager/services.cpp
(diff r2523 → r2547)

   #include "server_context.hpp"
   #include "event_scheduler.hpp"
+  #include "thread_manager.hpp"
   #include "timer.hpp"
…
     oss<<partitionId;
     name_= poolId+"__"+serviceId+"_"+oss.str();
+
+    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CService::threadEventLoop, this) ;
   }
…
     CTimer::get("CService::eventLoop").suspend();
     if (contexts_.empty() && finalizeSignal_) return true ;
     else return false ;
   }

+  void CService::threadEventLoop(void)
+  {
+    info(100)<<"Launch Thread for CService::threadEventLoop, service id = "<<name_<<endl ;
+    CThreadManager::threadInitialize() ;
+
+    do
+    {
+      CTimer::get("CService::eventLoop").resume();
+      int flag ;
+      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
+
+      // double time=MPI_Wtime() ;
+      // if (time-lastEventLoop_ > eventLoopLatency_)
+      // {
+        checkNotifications() ;
+      //   lastEventLoop_=time ;
+      // }
+
+      eventScheduler_->checkEvent() ;
+
+      for(auto it=contexts_.begin();it!=contexts_.end();++it)
+      {
+        if (it->second->isFinished())
+        {
+          delete it->second ;
+          contexts_.erase(it) ;
+          // destroy server_context -> to do later
+          break ;
+        } ;
+      }
+
+      CTimer::get("CService::eventLoop").suspend();
+      if (contexts_.empty() && finalizeSignal_) finished_=true ;
+      if (!finished_) CThreadManager::yield() ;
+    } while (!finished_) ;
+
+    CThreadManager::threadFinalize() ;
+    info(100)<<"Close thread for CService::threadEventLoop, service id = "<<name_<<endl ;
+  }
+
   void CService::sendNotification(int rank)
XIOS3/trunk/src/manager/services.hpp
(diff r2523 → r2547)

   bool eventLoop(bool serviceOnly=false) ;
+  void threadEventLoop(void) ;
   void createContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId) ;
   void checkCreateContextNotification(void) ;
…
   const MPI_Comm& getCommunicator(void) { return serviceComm_ ;}

+private:
+  bool finished_=false ;
+public:
+  bool isFinished(void) { return finished_; }
+
 private:
   void sendNotification(int rank) ;
XIOS3/trunk/src/manager/token_manager.hpp
(diff r2498 → r2547)

       memset( winBufferRetrieved_, 0, windowSize );
     }
+    MPI_Win_lock_all(0, winCurrentToken_) ;
+    MPI_Win_lock_all(0, winRetrievedToken_) ;
   }

+  ~CTokenManager()
+  {
+    MPI_Win_unlock_all(winCurrentToken_) ;
+    MPI_Win_unlock_all(winRetrievedToken_) ;
+    MPI_Win_free(&winCurrentToken_) ;
+    MPI_Win_free(&winRetrievedToken_) ;
+  }
+
   size_t getToken(void)
   {
     size_t inc=1 ;
     size_t token ;
-    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, leader_, 0, winCurrentToken_) ;
     MPI_Fetch_and_op(&inc, &token, MPI_SIZE_T, leader_, 0, MPI_SUM, winCurrentToken_) ;
-    MPI_Win_unlock(leader_, winCurrentToken_) ;
+    MPI_Win_flush(leader_, winCurrentToken_);
     return token ;
   }

-  bool lockToken(size_t token)
+  bool checkToken(size_t token)
   {
     size_t tokenRead ;
-    MPI_Win_lock(MPI_LOCK_SHARED, leader_, 0, winRetrievedToken_) ;
-    MPI_Get(&tokenRead, 1, MPI_SIZE_T, leader_, 0, 1, MPI_SIZE_T, winRetrievedToken_ ) ;
-    MPI_Win_unlock(leader_, winRetrievedToken_) ;
-    if (token==tokenRead) return true ;
-    else return false ;
+    size_t inc=0 ;
+    MPI_Fetch_and_op(&inc, &tokenRead, MPI_SIZE_T, leader_, 0, MPI_NO_OP, winRetrievedToken_) ;
+    MPI_Win_flush(leader_, winRetrievedToken_);
+    return tokenRead==token ;
   }

-  void unlockToken(size_t token)
+  void updateToken(size_t token)
+  {
+    size_t inc=1 ;
+    size_t tokenRead ;
+    MPI_Fetch_and_op(&inc, &tokenRead, MPI_SIZE_T, leader_, 0, MPI_SUM, winRetrievedToken_) ;
+    MPI_Win_flush(leader_, winRetrievedToken_);
+    if (token!=tokenRead) ERROR("void CTokenManager::unlockToken(size_t token)",<<"Cannot release token="<<token<<
+                                " that is not corresponding to the locked token="<<tokenRead) ;
+  }
+/*  void unlockToken(size_t token)
   {
     size_t inc=1 ;
…
       " that is not corresponding to the locked token="<<tokenRead) ;
   }
+*/
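With this rework the token manager becomes a classic ticket lock over two RMA counters, held open in a single MPI_Win_lock_all epoch for the manager's lifetime: winCurrentToken_ is the "next ticket" counter (atomic fetch-and-add in getToken()), winRetrievedToken_ is the "now serving" counter (read atomically with MPI_NO_OP in checkToken(), incremented by the holder in updateToken()). How a caller strings the three together, matching the usage visible (commented out) in legacy_context_client.cpp below; the event-loop call is a placeholder for whatever keeps the process making progress:

```cpp
size_t token = tokenManager->getToken();     // draw a ticket (fetch-and-add)
while (!tokenManager->checkToken(token))     // wait until our ticket is served
  callGlobalEventLoop();                     // keep servicing events meanwhile
/* ... critical section, e.g. collective buffer/window creation ... */
tokenManager->updateToken(token);            // increment "now serving": next ticket runs
```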
XIOS3/trunk/src/manager/window_base.hpp
(diff r2517 → r2547)

     return MPI_Get(origin_addr, origin_count, origin_datatype, target_rank, target_disp + OFFSET_BUFFER, target_count, target_datatype, window_) ;
   }

+  int fetchAndOp(const void *origin_addr, void *result_addr, MPI_Datatype datatype, int target_rank, MPI_Aint target_disp, MPI_Op op)
+  {
+    return MPI_Fetch_and_op(origin_addr, result_addr, datatype, target_rank, target_disp + OFFSET_BUFFER, op, window_ ) ;
+  }
+
   int compareAndSwap(const void *origin_addr, const void *compare_addr, void *result_addr, MPI_Datatype datatype,
                      int target_rank, MPI_Aint target_disp)
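The new fetchAndOp() rounds out the window wrapper's RMA set (get/put/compareAndSwap), enabling atomic counters to run through a wrapped window rather than a raw MPI_Win. A hypothetical caller, sketched under the assumption that the wrapper also exposes a flush() like the one used in buffer_server.cpp above (MPI_SIZE_T is the datatype XIOS already uses for these counters):

```cpp
// Atomically add 1 to a remote size_t counter and retrieve its previous value.
size_t one = 1, previous;
window.fetchAndOp(&one, &previous, MPI_SIZE_T, leaderRank,
                  0 /* displacement of the counter in the window */, MPI_SUM);
window.flush(leaderRank);  // complete the RMA op before reading `previous`
```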
XIOS3/trunk/src/node/context.cpp
(diff r2507 → r2547)

   #include "services.hpp"
   #include "contexts_manager.hpp"
+  #include "thread_manager.hpp"
   #include <chrono>
   #include <random>
…
     contextId_ = getId() ;
-
-    attached_mode=true ;
-    if (!CXios::isUsingServer()) attached_mode=false ;
-
     string contextRegistryId=getId() ;
…
     MPI_Comm_dup(intraComm_, &intraCommClient);
     comms.push_back(intraCommClient);
-    // attached_mode=parentServerContext_->isAttachedMode() ; //ym probably inherited from source context
     CContextServer* server ;
…
     if (commRank==0)
     {
-      CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions, true) ;
-      for(int i=0 ; i<nbPartitions; i++) CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId()) ;
-    }
-    setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble (attached ???)
+      while (! CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions)) yield() ;
+      for(int i=0 ; i<nbPartitions; i++)
+        while (!CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId())) yield() ;
+    }
+    synchronize() ;
+    setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble
     MPI_Bcast(&nbPartitions, 1, MPI_INT, 0, intraComm_) ;
…
     for(int i=0 ; i<nbPartitions; i++)
     {
-      parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer) ;
+      while (!parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer)) yield() ;
       int type ;
-      if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type, true) ;
+      if (commRank==0) while (!CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type)) yield();
+      synchronize() ;
       MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ;

       string fullServerId=CXios::getContextsManager()->getServerContextName(poolId, serverId, i, type, getContextId()) ;
…
     if (serviceType_ == CServicesManager::CLIENT)
     {
-      if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultWriterId, clientServers) ;
-      else if (CXios::usingServer2) createServerInterComm(CXios::defaultPoolId, CXios::defaultGathererId, clientServers) ;
+      if (CXios::usingServer2) createServerInterComm(CXios::defaultPoolId, CXios::defaultGathererId, clientServers) ;
       else createServerInterComm(CXios::defaultPoolId, CXios::defaultWriterId, clientServers) ;
…
       clientServers.clear() ;

-      if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultReaderId, clientServers) ;
-      else createServerInterComm(CXios::defaultPoolId, CXios::defaultReaderId, clientServers) ;
+      createServerInterComm(CXios::defaultPoolId, CXios::defaultReaderId, clientServers) ;
       readerClientOut_.push_back(clientServers[0].second.first) ;
       readerServerOut_.push_back(clientServers[0].second.second) ;
…
     else
     {
-      if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+serviceId, retClientServers) ;
-      else createServerInterComm(poolId, serviceId, retClientServers) ;
+      createServerInterComm(poolId, serviceId, retClientServers) ;
       for(auto& retClientServer : retClientServers) clientServers.push_back(retClientServer.second) ;

       int serviceType ;
-      if (intraCommRank_==0) CXios::getServicesManager()->getServiceType(poolId, serviceId, 0, serviceType, true) ;
+      if (intraCommRank_==0) while(!CXios::getServicesManager()->getServiceType(poolId, serviceId, 0, serviceType)) yield();
+      synchronize() ;
       MPI_Bcast(&serviceType,1,MPI_INT,0,intraComm_) ;
…
   void CContext::globalEventLoop(void)
   {
-    lockContext() ;
-    CXios::getDaemonsManager()->eventLoop() ;
-    unlockContext() ;
-    setCurrent(getId()) ;
+    if (CThreadManager::isUsingThreads()) CThreadManager::yield();
+    else
+    {
+      lockContext() ;
+      CXios::getDaemonsManager()->eventLoop() ;
+      unlockContext() ;
+      setCurrent(getId()) ;
+    }
   }

+  void CContext::yield(void)
+  {
+    if (CThreadManager::isUsingThreads())
+    {
+      CThreadManager::yield();
+      setCurrent(getId()) ;
+    }
+    else
+    {
+      lockContext() ;
+      CXios::getDaemonsManager()->eventLoop() ;
+      unlockContext() ;
+      setCurrent(getId()) ;
+    }
+  }
+
+  void CContext::synchronize(void)
+  {
+    bool out, finished;
+    size_t timeLine=timeLine_ ;
+
+    timeLine_++ ;
+    eventScheduler_->registerEvent(timeLine, hashId_) ;
+
+    out = eventScheduler_->queryEvent(timeLine,hashId_) ;
+    if (out) eventScheduler_->popEvent() ;
+    while (!out)
+    {
+      yield() ;
+      out = eventScheduler_->queryEvent(timeLine,hashId_) ;
+      if (out) eventScheduler_->popEvent() ;
+    }
+  }
+
   bool CContext::scheduledEventLoop(bool enableEventsProcessing)
…
     if (couplerOutClient_.find(fullContextId)==couplerOutClient_.end())
     {
-      bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ;
+      while(!CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm())) yield();
+      synchronize() ;

       MPI_Comm interComm, interCommClient, interCommServer ;
       MPI_Comm intraCommClient, intraCommServer ;

-      if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ;
+      MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ;

       MPI_Comm_dup(intraComm_, &intraCommClient) ;
…
     else if (couplerInClient_.find(fullContextId)==couplerInClient_.end())
     {
-      bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ;
+      while(!CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm())) yield() ;
+      synchronize() ;

       MPI_Comm interComm, interCommClient, interCommServer ;
       MPI_Comm intraCommClient, intraCommServer ;

-      if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ;
+      MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ;

       MPI_Comm_dup(intraComm_, &intraCommClient) ;
…
       couplersInFinalized=true ;
       for(auto& couplerOutClient : couplerOutClient_) couplersInFinalized &= isCouplerInContextFinalized(couplerOutClient.second) ;
-      globalEventLoop() ;
+      if (CThreadManager::isUsingThreads()) yield() ;
+      else globalEventLoop() ;
     } while (!couplersInFinalized) ;
…
       info(100)<<"DEBUG: context "<<getId()<<" release client reader ok"<<endl ;
     }
+    closeAllFile() ;
   }
   else if (serviceType_==CServicesManager::GATHERER)
   {
-    for(auto& client : writerClientOut_)
-    {
-      client->finalize();
-      bool bufferReleased;
-      do
-      {
-        client->eventLoop();
-        bufferReleased = !client->havePendingRequests();
-      } while (!bufferReleased);
-
-      bool notifiedFinalized=false ;
-      do
-      {
-        notifiedFinalized=client->isNotifiedFinalized() ;
-      } while (!notifiedFinalized) ;
-      client->releaseBuffers();
-    }
+    CContextClient* client ;
+    CContextServer* server ;
+
+    for(int n=0; n<writerClientOut_.size() ; n++)
+    {
+      client=writerClientOut_[n] ;
+      server=writerServerOut_[n] ;
+
+      client->finalize();
+      bool bufferReleased;
+      do
+      {
+        client->eventLoop();
+        bufferReleased = !client->havePendingRequests();
+      } while (!bufferReleased);
+
+      bool notifiedFinalized=false ;
+      do
+      {
+        notifiedFinalized=client->isNotifiedFinalized() ;
+      } while (!notifiedFinalized) ;
+      server->releaseBuffers();
+      client->releaseBuffers();
+    }
     closeAllFile();
+    writerClientIn_[0]->releaseBuffers();
+    writerServerIn_[0]->releaseBuffers();
     //ym writerClientIn & writerServerIn not released here ==> to check !!
…
   void CContext::setDefaultServices(void)
   {
-    defaultPoolWriterId_ = CXios::defaultPoolId ;
-    defaultPoolReaderId_ = CXios::defaultPoolId ;
-    defaultPoolGathererId_ = CXios::defaultPoolId ;
-    defaultWriterId_ = CXios::defaultWriterId ;
-    defaultReaderId_ = CXios::defaultReaderId ;
-    defaultGathererId_ = CXios::defaultGathererId ;
-    defaultUsingServer2_ = CXios::usingServer2 ;
-
-    if (!default_pool.isEmpty()) defaultPoolWriterId_ = defaultPoolReaderId_= defaultPoolGathererId_= default_pool ;
-    if (!default_pool_writer.isEmpty()) defaultPoolWriterId_ = default_pool_writer ;
-    if (!default_pool_reader.isEmpty()) defaultPoolReaderId_ = default_pool_reader ;
-    if (!default_pool_gatherer.isEmpty()) defaultPoolGathererId_ = default_pool_gatherer ;
-    if (!default_writer.isEmpty()) defaultWriterId_ = default_writer ;
-    if (!default_reader.isEmpty()) defaultWriterId_ = default_reader ;
-    if (!default_gatherer.isEmpty()) defaultGathererId_ = default_gatherer ;
-    if (!default_using_server2.isEmpty()) defaultUsingServer2_ = default_using_server2 ;
+    if (!CXios::isUsingServer())
+    {
+      defaultPoolWriterId_ = CXios::defaultPoolId ;
+      defaultPoolReaderId_ = CXios::defaultPoolId ;
+      defaultPoolGathererId_ = CXios::defaultPoolId ;
+      defaultWriterId_ = "attached" ;
+      defaultReaderId_ = "attached" ;
+      defaultGathererId_ = "attached" ;
+      defaultUsingServer2_ = false;
+    }
+    else
+    {
+      defaultPoolWriterId_ = CXios::defaultPoolId ;
+      defaultPoolReaderId_ = CXios::defaultPoolId ;
+      defaultPoolGathererId_ = CXios::defaultPoolId ;
+      defaultWriterId_ = CXios::defaultWriterId ;
+      defaultReaderId_ = CXios::defaultReaderId ;
+      defaultGathererId_ = CXios::defaultGathererId ;
+      defaultUsingServer2_ = CXios::usingServer2 ;
+
+      if (!default_pool.isEmpty()) defaultPoolWriterId_ = defaultPoolReaderId_= defaultPoolGathererId_= default_pool ;
+      if (!default_pool_writer.isEmpty()) defaultPoolWriterId_ = default_pool_writer ;
+      if (!default_pool_reader.isEmpty()) defaultPoolReaderId_ = default_pool_reader ;
+      if (!default_pool_gatherer.isEmpty()) defaultPoolGathererId_ = default_pool_gatherer ;
+      if (!default_writer.isEmpty()) defaultWriterId_ = default_writer ;
+      if (!default_reader.isEmpty()) defaultWriterId_ = default_reader ;
+      if (!default_gatherer.isEmpty()) defaultGathererId_ = default_gatherer ;
+      if (!default_using_server2.isEmpty()) defaultUsingServer2_ = default_using_server2 ;
+    }
   }
XIOS3/trunk/src/node/context.hpp
(diff r2509 → r2547)

   bool scheduledEventLoop(bool enableEventsProcessing=true) ;
   void globalEventLoop(void);
+  void yield(void) ;
+  void synchronize(void) ;

   // Finalize a context
…
   int getIntraCommRank(void) {return intraCommRank_;}
   int getIntraCommSize(void) {return intraCommSize_;}
+
+public:
+  shared_ptr<CEventScheduler> getEventScheduler(void) {return eventScheduler_ ;}
 private:
   shared_ptr<CEventScheduler> eventScheduler_ ; //! The local event scheduler for context
XIOS3/trunk/src/node/pool_node.cpp
(diff r2458 → r2547)

   #include "pool_node.hpp"
   #include "cxios.hpp"
+  #include "thread_manager.hpp"
   #include<cmath>
…
   else ERROR("void CPoolNode::allocateRessources(void)",<<"Pool has no name or id, attributes <id> or <name> must be specified")
   ressourcesManager->createPool(poolId, nbRessources) ;
-  ressourcesManager->waitPoolRegistration(poolId) ;
+  if (CThreadManager::isUsingThreads())
+    while(!ressourcesManager->hasPool(CXios::defaultPoolId))
+    {
+      CXios::getDaemonsManager()->eventLoop() ;
+      CThreadManager::yield() ;
+    }
+  else ressourcesManager->waitPoolRegistration(poolId) ;
   auto services=this->getAllServiceNodes() ;
   for(auto& service : services) service->allocateRessources(poolId) ;
XIOS3/trunk/src/node/service_node.cpp
(diff r2458 → r2547)

   else if (!hasAutoGeneratedId() ) serviceId=getId() ;
   else ERROR("void CServiceNode::allocateRessources(const string& poolId)",<<"Service has no name or id, attributes <id> or <name> must be specified")

+  servicesManager->createServices(poolId, serviceId, serviceType, nbRessources, nb_partitions, true) ;
+  if (CThreadManager::isUsingThreads())
+    for(int i=0; i<nb_partitions; i++)
+      while(!servicesManager->hasService(poolId, serviceId, i))
+      {
+        CXios::getDaemonsManager()->eventLoop() ;
+        CThreadManager::yield() ;
+      }
+  else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ;
 }
XIOS3/trunk/src/server.cpp
(diff r2535 → r2547)

   #include "workflow_graph.hpp"
   #include "release_static_allocation.hpp"
+  #include "thread_manager.hpp"
   #include <sys/stat.h>
   #include <unistd.h>
…
   if (!CXios::usingOasis)
   {
-    if (!is_MPI_Initialized) MPI_Init(NULL, NULL);
+    if (!is_MPI_Initialized)
+    {
+      int required = MPI_THREAD_SERIALIZED ;
+      int provided ;
+      MPI_Init_thread(NULL,NULL, required, &provided) ;
+    }

     // split the global communicator
…
   else // using OASIS
   {
-    if (!is_MPI_Initialized) driver_ = new CThirdPartyDriver();
+    if (!is_MPI_Initialized)
+    {
+      int required = MPI_THREAD_SERIALIZED ;
+      int provided ;
+      MPI_Init_thread(NULL,NULL, required, &provided) ;
+    }
+
+    driver_ = new CThirdPartyDriver();

     driver_->getComponentCommunicator( serverComm );
…
   {
     ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
-    ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;
+    if (CThreadManager::isUsingThreads())
+      while(!ressourcesManager->hasPool(CXios::defaultPoolId))
+      {
+        daemonsManager->eventLoop() ;
+        CThreadManager::yield() ;
+      }
+    else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;

     servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ;
-    servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ;
+    if (CThreadManager::isUsingThreads())
+      while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,0))
+      {
+        daemonsManager->eventLoop() ;
+        CThreadManager::yield() ;
+      }
+    else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ;

     servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ;
-    servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ;
+    if (CThreadManager::isUsingThreads())
+    {
+      daemonsManager->eventLoop() ;
+      while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0)) CThreadManager::yield() ;
+    }
+    else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ;
   }
   else
   {
…
     if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
     ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
-    ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;
+    if (CThreadManager::isUsingThreads())
+      while(!ressourcesManager->hasPool(CXios::defaultPoolId))
+      {
+        daemonsManager->eventLoop() ;
+        CThreadManager::yield() ;
+      }
+    else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;

     servicesManager->createServices(CXios::defaultPoolId, CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
-    servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultGathererId) ;
+    if (CThreadManager::isUsingThreads())
+      while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultGathererId,0))
+      {
+        daemonsManager->eventLoop() ;
+        CThreadManager::yield() ;
+      }
+    else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultGathererId) ;

     servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ;
-    servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ;
+    if (CThreadManager::isUsingThreads())
+      while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0))
+      {
+        daemonsManager->eventLoop() ;
+        CThreadManager::yield() ;
+      }
+    else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ;

     servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ;
-    servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ;
+    if (CThreadManager::isUsingThreads())
+      for(int i=0; i<nbPoolsServer2; i++)
+        while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,i))
+        {
+          daemonsManager->eventLoop() ;
+          CThreadManager::yield() ;
+        }
+    else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ;
   }
…
   {
     daemonsManager->eventLoop() ;
+    if (CThreadManager::isUsingThreads()) CThreadManager::yield();
     MPI_Test(&req,&ok,&status) ;
   }

-  testingEventScheduler() ;
+  //testingEventScheduler() ;
   /*
   MPI_Request req ;
…
   {
     finished=daemonsManager->eventLoop() ;
+    if (CThreadManager::isUsingThreads()) CThreadManager::yield() ;
   }
   CTimer::get("XIOS event loop").suspend() ;
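Both initialization paths now request MPI_THREAD_SERIALIZED, since the new cooperative threads may issue MPI calls from more than one thread (serialized by the yield discipline). Note that the changeset never inspects `provided`; a defensive variant would verify the granted level. A sketch, not part of the changeset:

```cpp
int required = MPI_THREAD_SERIALIZED;
int provided;
MPI_Init_thread(NULL, NULL, required, &provided);
if (provided < required)
{
  // The MPI library cannot serialize multi-threaded calls; fail fast
  // rather than risk corrupted communication later.
  fprintf(stderr, "MPI thread support too low: %d < %d\n", provided, required);
  MPI_Abort(MPI_COMM_WORLD, 1);
}
```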
XIOS3/trunk/src/transport/context_client.cpp
(diff r2507 → r2547)

   int flag;
-  MPI_Comm_test_inter(interComm, &flag);
-  if (flag) isAttached_=false ;
-  else isAttached_=true ;
-
-  if (flag) MPI_Comm_remote_size(interComm, &serverSize);
-  else MPI_Comm_size(interComm, &serverSize);
+  MPI_Comm_remote_size(interComm, &serverSize);

   computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader);
XIOS3/trunk/src/transport/context_client.hpp
(diff r2507 → r2547)

   const std::list<int>& getRanksServerLeader(void) const;
   const std::list<int>& getRanksServerNotLeader(void) const;
-  bool isAttachedModeEnabled() const { return isAttached_ ; }
   static void computeLeader(int clientRank, int clientSize, int serverSize,
                             std::list<int>& rankRecvLeader,
…
   size_t hashId_ ; //!< hash id on the context client that will be used for context server to identify the remote calling context client.

-  bool isAttached_ ;
   CContextServer* associatedServer_ ; //!< The server associated to the pair client/server
 };
XIOS3/trunk/src/transport/context_server.cpp
(diff r2343 → r2547)

   interComm=interComm_;
-  int flag;
-  MPI_Comm_test_inter(interComm,&flag);
-
-  if (flag) attachedMode=false ;
-  else attachedMode=true ;
-
-  if (flag) MPI_Comm_remote_size(interComm,&clientSize_);
-  else MPI_Comm_size(interComm,&clientSize_);
+  MPI_Comm_remote_size(interComm,&clientSize_);

   SRegisterContextInfo contextInfo ;
   CXios::getContextsManager()->getContextInfo(context->getId(), contextInfo, intraComm) ;
…
   }

-  //! Attached mode is used ?
-  //! \return true if attached mode is used, false otherwise
-  bool CContextServer::isAttachedModeEnabled() const
-  {
-    return attachedMode ;
-  }
 }
XIOS3/trunk/src/transport/context_server.hpp
(diff r2404 → r2547)

   CContextServer(CContext* parent,MPI_Comm intraComm,MPI_Comm interComm) ;
   ~CContextServer() {}
-  bool isAttachedModeEnabled() const ;
   void setAssociatedClient(CContextClient* associatedClient) {associatedClient_=associatedClient ;}
   CContextClient* getAssociatedClient(void) { return associatedClient_ ;}
…
   int commSize ;
   int clientSize_ ;

-  bool attachedMode ; //! true if attached mode is enabled otherwise false
   CContext* context ;
XIOS3/trunk/src/transport/legacy_context_client.cpp
r2528 r2547 24 24 \param [in] intraComm_ communicator of group client 25 25 \param [in] interComm_ communicator of group server 26 \cxtSer [in] cxtSer Pointer to context of server side. (It is only used in case of attached mode ).26 \cxtSer [in] cxtSer Pointer to context of server side. (It is only used in case of attached mode --> obsolete). 27 27 */ 28 28 CLegacyContextClient::CLegacyContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer) … … 31 31 { 32 32 pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 33 if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 34 35 if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ; 36 else interCommMerged_ = interComm_; // interComm_ is yet an intracommunicator in attached 37 33 MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ; 38 34 MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // for windows 39 35 eventScheduler_ = parent->getEventScheduler() ; 40 36 timeLine = 1; 41 37 } … … 44 40 45 41 /*! 46 In case of attached mode, the current context must be reset to context for client47 42 \param [in] event Event sent to server 48 43 */ … … 105 100 } 106 101 107 if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 108 { 109 while (checkBuffers(ranks)) callGlobalEventLoop() ; 110 111 CXios::getDaemonsManager()->scheduleContext(hashId_) ; 112 while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) callGlobalEventLoop() ; 113 } 114 115 MPI_Request req ; 116 MPI_Status status ; 117 MPI_Ibarrier(intraComm,&req) ; 118 int flag ; 119 MPI_Test(&req,&flag,&status) ; 120 while(!flag) 121 { 122 callGlobalEventLoop() ; 123 MPI_Test(&req,&flag,&status) ; 124 } 125 126 102 synchronize() ; 127 103 timeLine++; 128 104 } … … 148 124 list<CClientBuffer*>::iterator itBuffer; 149 125 bool areBuffersFree; 150 126 /* 151 127 for (itServer = serverList.begin(); itServer != serverList.end(); itServer++) 152 128 { … … 156 132 CTokenManager* tokenManager = CXios::getRessourcesManager()->getTokenManager() ; 157 133 size_t token = tokenManager->getToken() ; 158 while (!tokenManager-> lockToken(token)) callGlobalEventLoop() ;134 while (!tokenManager->checkToken(token)) callGlobalEventLoop() ; 159 135 newBuffer(*itServer); 160 136 it = buffers.find(*itServer); 161 137 checkAttachWindows(it->second,it->first) ; 162 tokenManager->u nlockToken(token) ;138 tokenManager->updateToken(token) ; 163 139 } 164 140 bufferList.push_back(it->second); 165 141 } 142 */ 143 map<int,MPI_Request> attachList ; 144 145 for (itServer = serverList.begin(); itServer != serverList.end(); itServer++) 146 { 147 it = buffers.find(*itServer); 148 if (it == buffers.end()) 149 { 150 newBuffer(*itServer); 151 it = buffers.find(*itServer); 152 checkAttachWindows(it->second, it->first, attachList) ; 153 } 154 bufferList.push_back(it->second); 155 } 156 157 while(!attachList.empty()) 158 { 159 auto it = attachList.begin() ; 160 while(it!=attachList.end()) 161 { 162 if (checkAttachWindows(buffers[it->first], it->first, attachList)) it=attachList.erase(it) ; 163 else ++it ; 164 } 165 166 yield() ; 167 } 168 166 169 167 170 double lastTimeBuffersNotFree=0. 
; … … 193 196 checkBuffers(); 194 197 195 callGlobalEventLoop() ;198 yield() ; 196 199 } 197 200 … … 203 206 } 204 207 208 209 bool CLegacyContextClient::checkAttachWindows(CClientBuffer* buffer, int rank, map<int, MPI_Request>& attachList) 210 { 211 int dummy; 212 bool ret=true; 213 214 if (!buffer->isAttachedWindows()) 215 { 216 // create windows dynamically for one-sided 217 /* 218 CTimer::get("create Windows").resume() ; 219 MPI_Comm interComm ; 220 int tag = 0 ; 221 MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, tag, &interComm) ; 222 MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ; 223 MPI_Comm_free(&interComm) ; 224 225 buffer->attachWindows(winComm_[rank]) ; 226 CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 227 MPI_Barrier(winComm_[rank]) ; 228 */ 229 if (attachList.count(rank)==0) 230 { 231 MPI_Irecv(&dummy,0,MPI_INT,clientSize+rank, 21, interCommMerged_, &attachList[rank]) ; 232 ret = false ; 233 } 234 else 235 { 236 MPI_Status status ; 237 int flag ; 238 MPI_Test(&attachList[rank],&flag, &status) ; 239 if (flag) 240 { 241 CTimer::get("create Windows").resume() ; 242 MPI_Comm interComm ; 243 int tag = 0 ; 244 MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, tag, &interComm) ; 245 MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ; 246 MPI_Comm_free(&interComm) ; 247 248 buffer->attachWindows(winComm_[rank]) ; 249 CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 250 MPI_Barrier(winComm_[rank]) ; 251 ret = true ; 252 } 253 else ret=false ; 254 } 255 } 256 return ret ; 257 } 258 259 205 260 void CLegacyContextClient::eventLoop(void) 206 261 { … … 211 266 { 212 267 locked_=true ; 213 context_-> globalEventLoop() ;268 context_->yield() ; 214 269 locked_=false ; 270 } 271 272 void CLegacyContextClient::yield(void) 273 { 274 locked_=true ; 275 context_->yield() ; 276 locked_=false ; 277 } 278 279 void CLegacyContextClient::synchronize(void) 280 { 281 if (context_->getServiceType()!=CServicesManager::CLIENT) 282 { 283 locked_=true ; 284 context_->synchronize() ; 285 locked_=false ; 286 } 215 287 } 216 288 /*! 
… … 226 298 maxEventSizes[rank] = CXios::minBufferSize; 227 299 } 228 229 int considerServers = 1; 230 if (isAttachedModeEnabled()) considerServers = 0; 231 CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interCommMerged_, considerServers*clientSize+rank, mapBufferSize_[rank], maxEventSizes[rank]); 300 bool hasWindows = true ; 301 CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interCommMerged_, clientSize+rank, mapBufferSize_[rank], hasWindows); 232 302 if (isGrowableBuffer_) buffer->setGrowableBuffer(1.2) ; 233 303 else buffer->fixBuffer() ; 234 304 // Notify the server 305 235 306 CBufferOut* bufOut = buffer->getBuffer(0, 4*sizeof(MPI_Aint)); 236 307 MPI_Aint sendBuff[4] ; 237 308 sendBuff[0]=hashId_; 238 309 sendBuff[1]=mapBufferSize_[rank]; 239 sendBuff[2]=buffers[rank]->getWin Address(0);240 sendBuff[3]=buffers[rank]->getWin Address(1);241 info(100)<<"CLegacyContextClient::newBuffer : rank "<<rank<<" winAdress[0] "<<buffers[rank]->getWin Address(0)<<" winAdress[1] "<<buffers[rank]->getWinAddress(1)<<endl;242 bufOut->put(sendBuff, 310 sendBuff[2]=buffers[rank]->getWinBufferAddress(0); 311 sendBuff[3]=buffers[rank]->getWinBufferAddress(1); 312 info(100)<<"CLegacyContextClient::newBuffer : rank "<<rank<<" winAdress[0] "<<buffers[rank]->getWinBufferAddress(0)<<" winAdress[1] "<<buffers[rank]->getWinBufferAddress(1)<<endl; 313 bufOut->put(sendBuff,4); 243 314 buffer->checkBuffer(true); 244 /* 245 // create windows dynamically for one-sided 246 if (!isAttachedModeEnabled()) 247 { 248 CTimer::get("create Windows").resume() ; 249 MPI_Comm interComm ; 250 MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ; 251 MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ; 252 CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 253 MPI_Comm_free(&interComm) ; 254 windows_[rank].resize(2) ; 255 256 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 257 CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 258 259 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]); 260 CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 261 262 CTimer::get("create Windows").suspend() ; 263 } 264 else 265 { 266 winComm_[rank] = MPI_COMM_NULL ; 267 windows_[rank].resize(2) ; 268 windows_[rank][0] = MPI_WIN_NULL ; 269 windows_[rank][1] = MPI_WIN_NULL ; 270 } 271 buffer->attachWindows(windows_[rank]) ; 272 if (!isAttachedModeEnabled()) MPI_Barrier(winComm_[rank]) ; 273 */ 274 } 275 276 void CLegacyContextClient::checkAttachWindows(CClientBuffer* buffer, int rank) 277 { 278 if (!buffer->isAttachedWindows()) 279 { 280 // create windows dynamically for one-sided 281 if (!isAttachedModeEnabled()) 282 { 283 CTimer::get("create Windows").resume() ; 284 MPI_Comm interComm ; 285 MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ; 286 MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ; 287 CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 288 MPI_Comm_free(&interComm) ; 289 windows_[rank].resize(2) ; 290 291 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 292 CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 293 294 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]); 295 CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 296 297 CTimer::get("create Windows").suspend() ; 298 buffer->attachWindows(windows_[rank]) ; 299 
MPI_Barrier(winComm_[rank]) ; 300 } 301 else 302 { 303 winComm_[rank] = MPI_COMM_NULL ; 304 windows_[rank].resize(2) ; 305 windows_[rank][0] = MPI_WIN_NULL ; 306 windows_[rank][1] = MPI_WIN_NULL ; 307 buffer->attachWindows(windows_[rank]) ; 308 } 309 310 } 311 } 312 313 315 316 } 317 318 314 319 315 320 /*! … … 336 341 buffers.clear(); 337 342 338 // don't know when release windows 339 340 //if (!isAttachedModeEnabled()) 341 //{ 342 // for(auto& it : winComm_) 343 // { 344 // int rank = it.first ; 345 // MPI_Win_free(&windows_[rank][0]); 346 // MPI_Win_free(&windows_[rank][1]); 347 // MPI_Comm_free(&winComm_[rank]) ; 348 // } 349 //} 343 for(auto& it : winComm_) 344 { 345 int rank = it.first ; 346 } 350 347 } 351 348 … … 469 466 bool CLegacyContextClient::isNotifiedFinalized(void) 470 467 { 471 if (isAttachedModeEnabled()) return true ;472 473 468 bool finalized = true; 474 469 map<int,CClientBuffer*>::iterator itBuff; -
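The rewritten checkAttachWindows() turns window attachment into a non-blocking handshake: the client posts a zero-byte MPI_Irecv on tag 21 (matching the server's MPI_Send in legacy_context_server.cpp below), returns false while the request is still pending, and only builds the window communicator once the server has signalled readiness. A two-rank toy version of the handshake, with rank 0 as client and rank 1 as server (run with at least two ranks; the plain dynamic window is a placeholder for CWindowDynamic):

    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char** argv)
    {
        MPI_Init(&argc, &argv);
        int rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

        const int tag = 21; // same tag as the Irecv/Send pair in the changeset
        int dummy = 0;

        if (rank == 1)      // "server": announce that window setup may start
        {
            MPI_Send(&dummy, 0, MPI_INT, 0, tag, MPI_COMM_WORLD);
        }
        else if (rank == 0) // "client": poll instead of blocking
        {
            MPI_Request req;
            MPI_Irecv(&dummy, 0, MPI_INT, 1, tag, MPI_COMM_WORLD, &req);
            int flag = 0;
            while (!flag)
            {
                // ... the real code yields to the event loop between tests ...
                MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
            }
            printf("client: server ready, attaching windows now\n");
        }

        // all ranks then create the dynamic window collectively
        MPI_Win win;
        MPI_Win_create_dynamic(MPI_INFO_NULL, MPI_COMM_WORLD, &win);
        MPI_Win_free(&win);
        MPI_Finalize();
        return 0;
    }

Deferring the MPI_Intercomm_create/MPI_Intercomm_merge until the receive completes is what lets getBuffers() keep yielding through the attachList instead of stalling every client in the collective.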
XIOS3/trunk/src/transport/legacy_context_client.hpp
r2507 r2547 11 11 #include "registry.hpp" 12 12 #include "context_client.hpp" 13 #include "window_dynamic.hpp" 13 14 14 15 namespace xios … … 48 49 void getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers); 49 50 void newBuffer(int rank); 50 void checkAttachWindows(CClientBuffer* buffer , int rank) ;51 bool checkAttachWindows(CClientBuffer* buffer , int rank, map<int,MPI_Request>& attachList) ; 51 52 bool checkBuffers(list<int>& ranks); 52 53 bool checkBuffers(void); 53 54 void callGlobalEventLoop() ; 55 void yield(void) ; 56 void synchronize(void) ; 54 57 bool havePendingRequests(list<int>& ranks) ; 55 58 void setGrowableBuffer(void) { isGrowableBuffer_=true;} … … 76 79 77 80 std::map<int, MPI_Comm> winComm_ ; //! Window communicators 78 std::map<int, std::vector< MPI_Win> >windows_ ; //! one sided mpi windows to expose client buffers to servers == windows[nbServers][2]81 std::map<int, std::vector<CWindowDynamic*> >windows_ ; //! one sided mpi windows to expose client buffers to servers == windows[nbServers][2] 79 82 bool isGrowableBuffer_ = true ; 80 83 … … 82 85 83 86 bool locked_ = false ; //!< The context client is locked to avoid recursive checkBuffer 87 shared_ptr<CEventScheduler> eventScheduler_ ; 84 88 }; 85 89 } -
XIOS3/trunk/src/transport/legacy_context_server.cpp
r2528 r2547 43 43 finished=false; 44 44 45 if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ; 46 else interCommMerged_ = interComm_; // interComm_ is yet an intracommunicator in attached 45 MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ; 47 46 MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows 48 47 … … 50 49 51 50 pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 52 if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode53 54 51 } 55 52 … … 115 112 remoteHashId_ = recvBuff[0] ; 116 113 StdSize buffSize = recvBuff[1]; 117 vector<MPI_Aint> win Adress(2) ;118 win Adress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;114 vector<MPI_Aint> winBufferAddress(2) ; 115 winBufferAddress[0]=recvBuff[2] ; winBufferAddress[1]=recvBuff[3] ; 119 116 mapBufferSize_.insert(std::make_pair(rank, buffSize)); 120 117 121 118 // create windows dynamically for one-sided 122 if (!isAttachedModeEnabled()) 123 { 124 CTimer::get("create Windows").resume() ; 125 MPI_Comm interComm ; 126 MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, 0 , &interComm) ; 127 MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ; 128 CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 129 MPI_Comm_free(&interComm) ; 130 windows_[rank].resize(2) ; 131 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 132 CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 133 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]); 134 CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 135 CTimer::get("create Windows").suspend() ; 136 MPI_Barrier(winComm_[rank]) ; 137 } 138 else 139 { 140 winComm_[rank] = MPI_COMM_NULL ; 141 windows_[rank].resize(2) ; 142 windows_[rank][0] = MPI_WIN_NULL ; 143 windows_[rank][1] = MPI_WIN_NULL ; 144 } 145 146 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first; 119 int dummy ; 120 MPI_Send(&dummy, 0, MPI_INT, rank, 21,interCommMerged_) ; 121 CTimer::get("create Windows").resume() ; 122 MPI_Comm interComm ; 123 int tag = 0 ; 124 MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, tag , &interComm) ; 125 MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ; 126 MPI_Comm_free(&interComm) ; 127 windows_[rank].resize(2) ; 128 //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 129 //CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 130 //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]); 131 //CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 132 windows_[rank][0] = new CWindowDynamic() ; 133 windows_[rank][1] = new CWindowDynamic() ; 134 windows_[rank][0] -> create(winComm_[rank]) ; 135 windows_[rank][1] -> create(winComm_[rank]) ; 136 windows_[rank][0] -> setWinBufferAddress(winBufferAddress[0],0) ; 137 windows_[rank][1] -> setWinBufferAddress(winBufferAddress[1],0) ; 138 CTimer::get("create Windows").suspend() ; 139 CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 140 MPI_Barrier(winComm_[rank]) ; 141 142 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winBufferAddress, 0, buffSize)))).first; 147 143 lastTimeLine[rank]=0 ; 148 144 itLastTimeLine=lastTimeLine.begin() ; … … 230 226 { 231 227 CTimer::get("CLegacyContextServer::getBufferFromClient").resume() ; 232 
if (!isAttachedModeEnabled()) // one sided desactivated in attached mode 233 { 234 int rank ; 235 char *buffer ; 236 size_t count ; 237 238 if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ; 239 for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine) 240 { 241 rank=itLastTimeLine->first ; 242 if (itLastTimeLine->second < timeLine && pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty()) 243 { 244 if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count); 245 if (count >= 0) ++itLastTimeLine ; 246 break ; 247 } 228 229 int rank ; 230 char *buffer ; 231 size_t count ; 232 233 if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ; 234 for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine) 235 { 236 rank=itLastTimeLine->first ; 237 if (itLastTimeLine->second < timeLine && pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty()) 238 { 239 if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count); 240 if (count >= 0) ++itLastTimeLine ; 241 break ; 248 242 } 249 243 } … … 280 274 { 281 275 size_t newSize ; 282 vector<MPI_Aint> win Adress(2) ;283 newBuffer>>newSize>>win Adress[0]>>winAdress[1] ;276 vector<MPI_Aint> winBufferAdress(2) ; 277 newBuffer>>newSize>>winBufferAdress[0]>>winBufferAdress[1] ; 284 278 buffers[rank]->freeBuffer(count) ; 285 279 delete buffers[rank] ; 286 buffers[rank] = new CServerBuffer(windows_[rank], winAdress, 0, newSize) ; 280 windows_[rank][0] -> setWinBufferAddress(winBufferAdress[0],0) ; 281 windows_[rank][1] -> setWinBufferAddress(winBufferAdress[1],0) ; 282 buffers[rank] = new CServerBuffer(windows_[rank], winBufferAdress, 0, newSize) ; 287 283 info(100)<<"Context id "<<context->getId()<<" : Receive ChangeBufferSize from client rank "<<rank 288 <<" newSize : "<<newSize<<" Address : "<<win Adress[0]<<" & "<<winAdress[1]<<endl ;284 <<" newSize : "<<newSize<<" Address : "<<winBufferAdress[0]<<" & "<<winBufferAdress[1]<<endl ; 289 285 } 290 286 else … … 309 305 CEventServer* event; 310 306 311 // if (context->isProcessingEvent()) return ;312 307 if (isProcessingEvent_) return ; 313 if (isAttachedModeEnabled())314 if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ;315 308 316 309 it=events.find(currentTimeLine); … … 321 314 if (event->isFull()) 322 315 { 323 if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side316 if (!scheduled) 324 317 { 325 318 eventScheduler_->registerEvent(currentTimeLine,hashId); … … 327 320 scheduled=true; 328 321 } 329 else if ( isAttachedModeEnabled() ||eventScheduler_->queryEvent(currentTimeLine,hashId) )322 else if (eventScheduler_->queryEvent(currentTimeLine,hashId) ) 330 323 { 331 324 if (!enableEventsProcessing && isCollectiveEvent(*event)) return ; … … 346 339 } 347 340 348 if (CXios::checkEventSync )341 if (CXios::checkEventSync && context->getServiceType()!=CServicesManager::CLIENT) 349 342 { 350 343 int typeId, classId, typeId_in, classId_in; … … 364 357 } 365 358 366 if (!isAttachedModeEnabled()) eventScheduler_->popEvent() ; 367 //MPI_Barrier(intraComm) ; 368 // When using attached mode, synchronise the processes to avoid that differents event be scheduled by differents processes 369 // The best way to properly solve this problem will be to use the event scheduler also in attached mode 370 // for now just set up a MPI barrier 371 //ym to be check later 372 // if (!eventScheduler_ && 
CXios::isServer) MPI_Barrier(intraComm) ; 373 374 // context->setProcessingEvent() ; 375 isProcessingEvent_=true ; 376 CTimer::get("Process events").resume(); 377 info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ; 378 dispatchEvent(*event); 379 CTimer::get("Process events").suspend(); 380 isProcessingEvent_=false ; 381 // context->unsetProcessingEvent() ; 382 pendingEvent=false; 383 delete event; 384 events.erase(it); 385 currentTimeLine++; 386 scheduled = false; 387 if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ; 359 isProcessingEvent_=true ; 360 CTimer::get("Process events").resume(); 361 info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ; 362 eventScheduler_->popEvent() ; 363 dispatchEvent(*event); 364 CTimer::get("Process events").suspend(); 365 isProcessingEvent_=false ; 366 pendingEvent=false; 367 delete event; 368 events.erase(it); 369 currentTimeLine++; 370 scheduled = false; 388 371 } 389 372 } … … 409 392 void CLegacyContextServer::freeWindows() 410 393 { 411 //if (!isAttachedModeEnabled()) 412 //{ 413 // for(auto& it : winComm_) 414 // { 415 // int rank = it.first ; 416 // MPI_Win_free(&windows_[rank][0]); 417 // MPI_Win_free(&windows_[rank][1]); 418 // MPI_Comm_free(&winComm_[rank]) ; 419 // } 420 //} 394 for(auto& it : winComm_) 395 { 396 int rank = it.first ; 397 delete windows_[rank][0]; 398 delete windows_[rank][1]; 399 } 421 400 } 422 401 -
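On the server side the same handshake is the MPI_Send on tag 21, after which the server creates its dynamic windows and records the client-supplied buffer addresses via setWinBufferAddress(). The underlying MPI idiom: with MPI_Win_create_dynamic, the origin must learn the target-side address (an MPI_Aint) to use as the displacement in MPI_Get/MPI_Put, which is exactly what sendBuff[2]/sendBuff[3] carry. A two-rank sketch of that address handoff (illustrative only; XIOS wraps this machinery in CWindowDynamic):

    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char** argv)
    {
        MPI_Init(&argc, &argv);
        int rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

        MPI_Win win;
        MPI_Win_create_dynamic(MPI_INFO_NULL, MPI_COMM_WORLD, &win);

        int payload = 0;

        if (rank == 1) // target: attach a buffer and publish its address
        {
            payload = 42;
            MPI_Win_attach(win, &payload, sizeof(int));
            MPI_Aint addr;
            MPI_Get_address(&payload, &addr);
            MPI_Send(&addr, 1, MPI_AINT, 0, 0, MPI_COMM_WORLD);
        }
        else if (rank == 0) // origin: use the address as the RMA displacement
        {
            MPI_Aint remoteAddr;
            MPI_Recv(&remoteAddr, 1, MPI_AINT, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            int value = 0;
            MPI_Win_lock(MPI_LOCK_SHARED, 1, 0, win);
            MPI_Get(&value, 1, MPI_INT, 1, remoteAddr, 1, MPI_INT, win);
            MPI_Win_unlock(1, win);
            printf("read %d through the dynamic window\n", value);
        }

        MPI_Barrier(MPI_COMM_WORLD); // keep the buffer attached until the read is done
        if (rank == 1) MPI_Win_detach(win, &payload);
        MPI_Win_free(&win);
        MPI_Finalize();
        return 0;
    }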
XIOS3/trunk/src/transport/legacy_context_server.hpp
r2343 r2547 61 61 std::map<int, StdSize> mapBufferSize_; 62 62 std::map<int,MPI_Comm> winComm_ ; //! Window communicators 63 std::map<int,std::vector< MPI_Win> >windows_ ; //! one sided mpi windows to expose client buffers to servers ; No memory will be attached on server side.63 std::map<int,std::vector<CWindowDynamic*> >windows_ ; //! one sided mpi windows to expose client buffers to servers ; No memory will be attached on server side. 64 64 bool isProcessingEvent_ ; 65 65 size_t remoteHashId_; //!< the hash is of the calling context client -
XIOS3/trunk/src/transport/one_sided_client_buffer.hpp
r2526 r2547 182 182 int serverRank_ ; 183 183 184 MPI_Comm interCommMerged_; 185 int intraServerRank_ ; 184 MPI_Comm interCommMerged_; 185 int intraServerRank_ ; 186 186 187 187 std::list<CBuffer*> buffers_ ; -
XIOS3/trunk/src/transport/one_sided_context_client.cpp
r2399 r2547 22 22 \param [in] intraComm_ communicator of group client 23 23 \param [in] interComm_ communicator of group server 24 \cxtSer [in] cxtSer Pointer to context of server side. (It is only used in case of attached mode ).24 \cxtSer [in] cxtSer Pointer to context of server side. (It is only used in case of attached mode --> obsolete ). 25 25 */ 26 26 COneSidedContextClient::COneSidedContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer) … … 30 30 31 31 pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 32 if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 33 34 if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ; 32 33 MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ; 35 34 36 35 MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // for windows 37 36 eventScheduler_ = parent->getEventScheduler() ; 38 37 timeLine = 1; 39 38 } … … 109 108 { 110 109 if (CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").resume() ; 111 callGlobalEventLoop() ;110 yield() ; 112 111 } 113 112 } 114 113 if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").suspend() ; 115 114 116 if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 117 { 118 while (checkBuffers(ranks)) callGlobalEventLoop() ; 119 120 CXios::getDaemonsManager()->scheduleContext(hashId_) ; 121 while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) callGlobalEventLoop() ; 122 } 115 116 synchronize() ; 123 117 124 118 timeLine++; … … 137 131 locked_=false ; 138 132 } 133 134 void COneSidedContextClient::yield(void) 135 { 136 locked_=true ; 137 context_->yield() ; 138 locked_=false ; 139 } 140 141 void COneSidedContextClient::synchronize(void) 142 { 143 if (context_->getServiceType()!=CServicesManager::CLIENT) 144 { 145 locked_=true ; 146 context_->synchronize() ; 147 locked_=false ; 148 } 149 } 150 139 151 /*! 140 152 Make a new buffer for a certain connection to server with specific rank … … 271 283 bool COneSidedContextClient::isNotifiedFinalized(void) 272 284 { 273 if (isAttachedModeEnabled()) return true ;274 285 275 286 bool finalized = true; -
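The new yield() and synchronize() bracket the call into the context with a locked_ flag so that a checkBuffers() re-entered from inside the context's own event loop cannot recurse. The changeset sets and clears the flag by hand; an RAII guard is one way to express the same invariant so the flag is restored even on early return (a sketch, not the XIOS implementation):

    #include <cassert>

    class ReentrancyGuard
    {
        bool& locked_;
    public:
        explicit ReentrancyGuard(bool& locked) : locked_(locked)
        {
            assert(!locked_ && "recursive entry into the context event loop");
            locked_ = true;
        }
        ~ReentrancyGuard() { locked_ = false; }
    };

    struct ToyContextClient // illustrative stand-in for COneSidedContextClient
    {
        bool locked_ = false;

        void yield()
        {
            ReentrancyGuard guard(locked_); // replaces manual locked_=true/false
            // context_->yield() would run here
        }
    };

    int main() { ToyContextClient c; c.yield(); return 0; }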
XIOS3/trunk/src/transport/one_sided_context_client.hpp
r2507 r2547 48 48 void eventLoop(void) ; 49 49 void callGlobalEventLoop() ; 50 void yield() ; 51 void synchronize() ; 50 52 bool havePendingRequests(list<int>& ranks) ; 51 53 … … 81 83 82 84 bool locked_ = false ; //!< The context client is locked to avoid recursive checkBuffer 85 shared_ptr<CEventScheduler> eventScheduler_ ; 83 86 }; 84 87 } -
XIOS3/trunk/src/transport/one_sided_context_server.cpp
r2526 r2547 42 42 finished=false; 43 43 44 if (!isAttachedModeEnabled())MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ;44 MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ; 45 45 MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows 46 46 … … 48 48 49 49 pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 50 if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode51 50 52 51 } … … 165 164 166 165 if (isProcessingEvent_) return ; 167 if (isAttachedModeEnabled())168 if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ;169 166 170 167 auto it=completedEvents_.find(currentTimeLine); … … 174 171 if (it->second.nbSenders == it->second.currentNbSenders) 175 172 { 176 if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side173 if (!scheduled) 177 174 { 178 175 eventScheduler_->registerEvent(currentTimeLine,hashId); 179 176 scheduled=true; 180 177 } 181 else if ( isAttachedModeEnabled() ||eventScheduler_->queryEvent(currentTimeLine,hashId) )178 else if (eventScheduler_->queryEvent(currentTimeLine,hashId) ) 182 179 { 183 180 //if (!enableEventsProcessing && isCollectiveEvent(event)) return ; … … 198 195 } 199 196 200 if (!isAttachedModeEnabled())eventScheduler_->popEvent() ;197 eventScheduler_->popEvent() ; 201 198 202 199 isProcessingEvent_=true ; … … 214 211 currentTimeLine++; 215 212 scheduled = false; 216 if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ;217 213 } 218 214 } … … 235 231 void COneSidedContextServer::freeWindows() 236 232 { 237 //if (!isAttachedModeEnabled())238 //{239 233 // for(auto& it : winComm_) 240 234 // { … … 244 238 // MPI_Comm_free(&winComm_[rank]) ; 245 239 // } 246 //}247 240 } 248 241
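With attached mode removed, processEvents() always goes through the event scheduler: a full event is registered once (guarded by the scheduled flag), dispatched only when queryEvent() grants it, and the grant is popped before the timeline advances. A compressed sketch of that state machine with stub scheduler calls (the real registerEvent/queryEvent/popEvent live on CEventScheduler):

    #include <cstdio>

    // Stubs standing in for the scheduler; this toy grants every query at once.
    void registerEvent(long timeLine, long hash) { printf("register %ld\n", timeLine); }
    bool queryEvent(long timeLine, long hash)    { return true; }
    void popEvent()                              { printf("pop\n"); }
    void dispatchEvent(long timeLine)            { printf("dispatch %ld\n", timeLine); }

    int main()
    {
        long currentTimeLine = 1;
        const long hashId = 0xABC;
        bool scheduled = false;

        for (int pass = 0; pass < 2; ++pass) // processEvents() is polled repeatedly
        {
            if (!scheduled)
            {
                registerEvent(currentTimeLine, hashId); // ask for our turn, once
                scheduled = true;
            }
            else if (queryEvent(currentTimeLine, hashId))
            {
                popEvent();                // consume the grant before dispatching
                dispatchEvent(currentTimeLine);
                ++currentTimeLine;         // advance to the next collective event
                scheduled = false;
            }
        }
        return 0;
    }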