Changeset 2455
- Timestamp:
- 01/09/23 18:41:19 (2 years ago)
- Location:
- XIOS3/dev/XIOS_FILE_SERVICES/src
- Files:
-
- 2 added
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS3/dev/XIOS_FILE_SERVICES/src/buffer_client.cpp
r2324 r2455 94 94 void CClientBuffer::attachWindows(vector<MPI_Win>& windows) 95 95 { 96 isAttachedWindows_=true ; 96 97 windows_=windows ; 97 98 if (windows_[0]==MPI_WIN_NULL && windows_[1]==MPI_WIN_NULL) hasWindows=false ; … … 170 171 bool CClientBuffer::isBufferFree(StdSize size) 171 172 { 172 173 if (!isAttachedWindows_) return false; 174 173 175 lockBuffer(); 174 176 count=*bufferCount[current] ; -
XIOS3/dev/XIOS_FILE_SERVICES/src/buffer_client.hpp
r2260 r2455 34 34 void fixBuffer(void) { isGrowableBuffer_=false ;} 35 35 void attachWindows(vector<MPI_Win>& windows) ; 36 bool isAttachedWindows(void) { return isAttachedWindows_ ;} 36 37 private: 37 38 void resizeBuffer(size_t newSize) ; … … 69 70 const MPI_Comm interComm; 70 71 std::vector<MPI_Win> windows_ ; 71 bool hasWindows ;72 72 bool hasWindows=false ; 73 bool isAttachedWindows_=false ; 73 74 double latency_=0 ; 74 75 double lastCheckedWithNothing_=0 ; -
XIOS3/dev/XIOS_FILE_SERVICES/src/manager/ressources_manager.cpp
r2454 r2455 2 2 #include "server.hpp" 3 3 #include "servers_ressource.hpp" 4 #include "token_manager.hpp" 4 5 #include "timer.hpp" 5 6 … … 21 22 if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ; 22 23 else commRank=0 ; 24 tokenManager_ = new CTokenManager(xiosComm_,commRank) ; 25 23 26 MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ; 24 27 -
XIOS3/dev/XIOS_FILE_SERVICES/src/manager/ressources_manager.hpp
r2453 r2455 11 11 #include "window_manager.hpp" 12 12 #include "pool_ressource.hpp" 13 #include "token_manager.hpp" 13 14 14 15 … … 53 54 void waitPoolRegistration(const string& poolId) ; 54 55 56 55 57 void registerServerLeader(int leaderRank) ; 56 58 void registerRessourcesSize(int size) ; 57 59 void registerPoolClient(const std::string& poolId,int size,int leader) ; 58 60 void registerPoolServer(const std::string& poolId,int size,int leader) ; 61 CTokenManager* getTokenManager(void) {return tokenManager_ ;} 59 62 60 63 int managerGlobalLeader_ ; … … 63 66 64 67 CWindowManager* winNotify_ ; 68 CTokenManager* tokenManager_ ; 65 69 66 70 const size_t maxBufferSize_=1024*1024 ; -
XIOS3/dev/XIOS_FILE_SERVICES/src/node/context.cpp
r2453 r2455 738 738 for(auto couplerOut : couplerOutClient_) couplerOut.second->eventLoop(); 739 739 for(auto couplerIn : couplerInClient_) couplerIn.second->eventLoop(); 740 for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(enableEventsProcessing); 741 for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(enableEventsProcessing); 740 //for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(enableEventsProcessing); 741 //for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(enableEventsProcessing); 742 for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(); 743 for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(); 742 744 } 743 745 setCurrent(getId()) ; -
XIOS3/dev/XIOS_FILE_SERVICES/src/transport/legacy_context_client.cpp
r2343 r2455 13 13 #include "server.hpp" 14 14 #include "services.hpp" 15 #include "ressources_manager.hpp" 15 16 #include <boost/functional/hash.hpp> 16 17 #include <random> … … 110 111 } 111 112 113 MPI_Request req ; 114 MPI_Status status ; 115 MPI_Ibarrier(intraComm,&req) ; 116 int flag ; 117 MPI_Test(&req,&flag,&status) ; 118 while(!flag) 119 { 120 callGlobalEventLoop() ; 121 MPI_Test(&req,&flag,&status) ; 122 } 123 124 112 125 timeLine++; 113 126 } … … 126 139 * \return whether the already allocated buffers could be used 127 140 */ 128 bool CLegacyContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, 129 bool nonBlocking /*= false*/) 141 void CLegacyContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers) 130 142 { 131 143 list<int>::const_iterator itServer, itSize; … … 134 146 list<CClientBuffer*>::iterator itBuffer; 135 147 bool areBuffersFree; 136 148 137 149 for (itServer = serverList.begin(); itServer != serverList.end(); itServer++) 138 150 { … … 140 152 if (it == buffers.end()) 141 153 { 154 CTokenManager* tokenManager = CXios::getRessourcesManager()->getTokenManager() ; 155 size_t token = tokenManager->getToken() ; 156 while (!tokenManager->lockToken(token)) callGlobalEventLoop() ; 142 157 newBuffer(*itServer); 143 158 it = buffers.find(*itServer); 159 checkAttachWindows(it->second,it->first) ; 160 tokenManager->unlockToken(token) ; 144 161 } 145 162 bufferList.push_back(it->second); … … 177 194 } 178 195 179 } while (!areBuffersFree && !nonBlocking);196 } while (!areBuffersFree); 180 197 CTimer::get("Blocking time").suspend(); 181 198 182 if (areBuffersFree) 183 { 184 for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 185 retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize)); 186 } 187 return areBuffersFree; 199 for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 200 retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize)); 188 201 } 189 202 … … 225 238 bufOut->put(sendBuff, 4); 226 239 buffer->checkBuffer(true); 227 240 /* 228 241 // create windows dynamically for one-sided 229 242 if (!isAttachedModeEnabled()) … … 254 267 buffer->attachWindows(windows_[rank]) ; 255 268 if (!isAttachedModeEnabled()) MPI_Barrier(winComm_[rank]) ; 256 257 } 258 269 */ 270 } 271 272 void CLegacyContextClient::checkAttachWindows(CClientBuffer* buffer, int rank) 273 { 274 if (!buffer->isAttachedWindows()) 275 { 276 // create windows dynamically for one-sided 277 if (!isAttachedModeEnabled()) 278 { 279 CTimer::get("create Windows").resume() ; 280 MPI_Comm interComm ; 281 MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ; 282 MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ; 283 CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 284 MPI_Comm_free(&interComm) ; 285 windows_[rank].resize(2) ; 286 287 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 288 CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 289 290 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]); 291 CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 292 293 CTimer::get("create Windows").suspend() ; 294 buffer->attachWindows(windows_[rank]) ; 295 MPI_Barrier(winComm_[rank]) ; 296 } 297 else 298 { 299 winComm_[rank] = MPI_COMM_NULL ; 300 windows_[rank].resize(2) ; 301 windows_[rank][0] = MPI_WIN_NULL ; 302 windows_[rank][1] = MPI_WIN_NULL ; 303 buffer->attachWindows(windows_[rank]) ; 304 } 305 306 } 307 } 308 309 310 259 311 /*! 260 312 Verify state of buffers. Buffer is under pending state if there is no message on it -
XIOS3/dev/XIOS_FILE_SERVICES/src/transport/legacy_context_client.hpp
r2343 r2455 45 45 46 46 // Functions to set/get buffers 47 bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false);47 void getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers); 48 48 void newBuffer(int rank); 49 void checkAttachWindows(CClientBuffer* buffer , int rank) ; 49 50 bool checkBuffers(list<int>& ranks); 50 51 bool checkBuffers(void); -
XIOS3/dev/XIOS_FILE_SERVICES/src/transport/legacy_context_server.cpp
r2433 r2455 95 95 96 96 traceOff(); 97 MPI_Improbe(MPI_ANY_SOURCE, 20, interComm,&flag,&message, &status);97 MPI_Improbe(MPI_ANY_SOURCE, 20, interComm,&flag,&message, &status); 98 98 traceOn(); 99 99 if (flag==true) listenPendingRequest(message, status) ; -
XIOS3/dev/XIOS_FILE_SERVICES/src/transport/one_sided_context_client.hpp
r2343 r2455 41 41 42 42 // Functions to set/get buffers 43 bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false);43 // bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false); 44 44 void newBuffer(int rank); 45 45 bool checkBuffers(list<int>& ranks);
Note: See TracChangeset
for help on using the changeset viewer.