Changeset 1547 for XIOS/dev/dev_ym
- Timestamp:
- 06/20/18 09:09:23 (6 years ago)
- Location:
- XIOS/dev/dev_ym/XIOS_ONE_SIDED/src
- Files:
-
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_client.cpp
r1227 r1547 12 12 size_t CClientBuffer::maxRequestSize = 0; 13 13 14 CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize , StdSize maxBufferedEvents)14 CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize) 15 15 : interComm(interComm) 16 16 , serverRank(serverRank) … … 20 20 , current(0) 21 21 , count(0) 22 , bufferedEvents(0)23 , maxBufferedEvents(maxBufferedEvents)24 22 , pending(false) 25 { 26 buffer[0] = new char[bufferSize]; // transform it with MPI_ALLOC_MEM later 27 buffer[1] = new char[bufferSize]; 23 , hasWindows(false) 24 { 25 MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; 26 MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; 27 28 29 buffer[0] = bufferHeader[0]+headerSize ; 30 buffer[1] = bufferHeader[1]+headerSize ; 31 firstTimeLine[0]=(size_t*)bufferHeader[0] ; 32 firstTimeLine[1]=(size_t*)bufferHeader[1] ; 33 bufferCount[0]=(size_t*)bufferHeader[0] +1 ; 34 bufferCount[1]=(size_t*)bufferHeader[1] +1 ; 35 control[0]=(size_t*)bufferHeader[0] +2 ; 36 control[1]=(size_t*)bufferHeader[1] +2 ; 37 38 *firstTimeLine[0]=0 ; 39 *firstTimeLine[1]=0 ; 40 *bufferCount[0]=0 ; 41 *bufferCount[1]=0 ; 42 *control[0]=0 ; 43 *control[1]=0 ; 44 winState[0]=false ; 45 winState[1]=false ; 28 46 retBuffer = new CBufferOut(buffer[current], bufferSize); 29 info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << " with a maximum of " << maxBufferedEvents << " buffered events" <<endl;47 info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl; 30 48 } 31 49 32 50 CClientBuffer::~CClientBuffer() 33 51 { 34 delete [] buffer[0]; 35 delete [] buffer[1]; 36 delete retBuffer; 52 freeWindows() ; 53 MPI_Free_mem(bufferHeader[0]) ; 54 MPI_Free_mem(bufferHeader[1]) ; 55 delete retBuffer; 56 } 57 58 void CClientBuffer::createWindows(MPI_Comm oneSidedComm) 59 { 60 MPI_Barrier(oneSidedComm) ; 61 MPI_Win_create(bufferHeader[0], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 62 MPI_Win_create(bufferHeader[1], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 63 64 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ; 65 *firstTimeLine[0]=0 ; 66 *bufferCount[0]=0 ; 67 *control[0]=0 ; 68 MPI_Win_unlock(0, windows[0]) ; 69 70 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ; 71 *firstTimeLine[1]=0 ; 72 *bufferCount[1]=0 ; 73 *control[1]=0 ; 74 MPI_Win_unlock(0, windows[1]) ; 75 winState[0]=false ; 76 winState[1]=false ; 77 MPI_Barrier(oneSidedComm) ; 78 hasWindows=true ; 79 } 80 81 void CClientBuffer::freeWindows() 82 { 83 if (hasWindows) 84 { 85 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ; 86 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ; 87 *control[0]=2 ; 88 *control[1]=2 ; 89 MPI_Win_unlock(0, windows[1]) ; 90 MPI_Win_unlock(0, windows[0]) ; 91 92 MPI_Win_free(&windows[0]) ; 93 MPI_Win_free(&windows[1]) ; 94 hasWindows=false ; 95 } 96 } 97 98 void CClientBuffer::lockBuffer(void) 99 { 100 if (hasWindows) 101 { 102 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[current]) ; 103 winState[current]=true ; 104 } 105 } 106 107 void CClientBuffer::unlockBuffer(void) 108 { 109 if (hasWindows) 110 { 111 MPI_Win_unlock(0, windows[current]) ; 112 winState[current]=false ; 113 } 37 114 } 38 115 … … 44 121 bool CClientBuffer::isBufferFree(StdSize size) 45 122 { 123 bool loop=true ; 124 while (loop) 125 { 126 lockBuffer(); 127 if (*control[current]==0) loop=false ; // attemp to read from server ? 128 else unlockBuffer() ; 129 } 130 46 131 if (size > bufferSize) 47 132 ERROR("bool CClientBuffer::isBufferFree(StdSize size)", … … 59 144 } 60 145 61 62 return (size <= remain() && bufferedEvents < maxBufferedEvents);63 } 64 65 66 CBufferOut* CClientBuffer::getBuffer( StdSize size)146 count=*bufferCount[current] ; 147 return (size <= remain()); 148 } 149 150 151 CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size) 67 152 { 68 153 if (size <= remain()) 69 154 { 155 info(100)<<"count "<<count<<" bufferCount[current] "<<*bufferCount[current]<<endl ; 70 156 retBuffer->realloc(buffer[current] + count, size); 71 157 count += size; 72 bufferedEvents++; 158 if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ; 159 *bufferCount[current]=count ; 73 160 return retBuffer; 74 161 } … … 81 168 } 82 169 83 bool CClientBuffer::checkBuffer( void)170 bool CClientBuffer::checkBuffer(bool send) 84 171 { 85 172 MPI_Status status; … … 96 183 if (!pending) 97 184 { 185 if (!send) return false ; 98 186 if (count > 0) 99 187 { 100 MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 101 pending = true; 102 if (current == 1) current = 0; 103 else current = 1; 104 count = 0; 105 bufferedEvents = 0; 188 lockBuffer() ; 189 if (*control[current]==0 && bufferCount[current] > 0) 190 { 191 MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 192 pending = true; 193 *control[current]=0 ; 194 *firstTimeLine[current]=0 ; 195 *bufferCount[current]=0 ; 196 197 unlockBuffer() ; 198 199 if (current == 1) current = 0; 200 else current = 1; 201 count = 0; 202 } 203 else unlockBuffer() ; 106 204 } 107 205 } … … 112 210 bool CClientBuffer::hasPendingRequest(void) 113 211 { 212 213 lockBuffer() ; 214 count=*bufferCount[current] ; 215 unlockBuffer() ; 216 114 217 return (pending || count > 0); 115 218 } 219 220 116 221 } -
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_client.hpp
r1227 r1547 14 14 static size_t maxRequestSize; 15 15 16 CClientBuffer(MPI_Comm intercomm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize , StdSize maxBufferedEvents);16 CClientBuffer(MPI_Comm intercomm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize); 17 17 ~CClientBuffer(); 18 18 void createWindows(MPI_Comm oneSidedComm) ; 19 void freeWindows(void) ; 20 void lockBuffer(void) ; 21 void unlockBuffer(void) ; 22 19 23 bool isBufferFree(StdSize size); 20 CBufferOut* getBuffer( StdSize size);21 bool checkBuffer( void);24 CBufferOut* getBuffer(size_t timeLine, StdSize size); 25 bool checkBuffer(bool send=false); 22 26 bool hasPendingRequest(void); 23 27 StdSize remain(void); … … 25 29 private: 26 30 char* buffer[2]; 27 31 char* bufferHeader[2]; 32 size_t* firstTimeLine[2] ; 33 size_t* bufferCount[2] ; 34 size_t* control[2] ; 35 bool winState[2] ; 28 36 int current; 29 37 30 38 StdSize count; 31 StdSize bufferedEvents;32 39 StdSize maxEventSize; 33 const StdSize maxBufferedEvents;34 40 const StdSize bufferSize; 35 41 const StdSize estimatedMaxEventSize; … … 43 49 CBufferOut* retBuffer; 44 50 const MPI_Comm interComm; 51 MPI_Win windows[2] ; 52 bool hasWindows ; 53 static const int headerSize=3*sizeof(size_t); 45 54 }; 46 55 } -
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_server.cpp
r885 r1547 7 7 { 8 8 9 CServerBuffer::CServerBuffer(StdSize buffSize) 9 CServerBuffer::CServerBuffer(StdSize buffSize) : hasWindows(false) 10 10 { 11 11 size = 3 * buffSize; … … 13 13 current = 1; 14 14 end = size; 15 used=0 ; 15 16 buffer = new char[size]; // use MPI_ALLOC_MEM later? 17 currentWindows=0 ; 16 18 } 17 19 … … 21 23 } 22 24 25 void CServerBuffer::updateCurrentWindows(void) 26 { 27 if (currentWindows==0) currentWindows=1 ; 28 else currentWindows=0 ; 29 } 30 31 void CServerBuffer::createWindows(MPI_Comm oneSidedComm) 32 { 33 MPI_Barrier(oneSidedComm) ; 34 MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 35 MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 36 hasWindows=true ; 37 MPI_Barrier(oneSidedComm) ; 38 } 39 40 bool CServerBuffer::freeWindows() 41 { 42 if (hasWindows) 43 { 44 size_t header[3] ; 45 size_t& control=header[2] ; 46 MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows[0]) ; 47 MPI_Get(&control, 1, MPI_LONG_LONG_INT, 0 , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[0]) ; 48 MPI_Win_unlock(0,windows[0]) ; 49 if (control==2) // ok for free windows 50 { 51 MPI_Win_free( &(windows[0])) ; 52 MPI_Win_free( &(windows[1])) ; 53 hasWindows=false ; 54 return true ; 55 } 56 else return false ; 57 } 58 else return true ; 59 } 23 60 24 61 bool CServerBuffer::isBufferFree(size_t count) … … 72 109 } 73 110 111 bool CServerBuffer::isBufferEmpty(void) 112 { 113 if (used==0) return true ; 114 else return false; 115 } 74 116 75 117 void* CServerBuffer::getBuffer(size_t count) … … 128 170 } 129 171 172 used+=count ; 130 173 return ret ; 131 174 } … … 167 210 } 168 211 } 169 } 170 212 used-=count ; 213 } 214 215 bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count) 216 { 217 if (!hasWindows) return false ; 218 219 220 size_t header[3] ; 221 size_t& clientTimeline=header[0] ; 222 size_t& clientCount=header[1] ; 223 size_t& control=header[2] ; 224 bool ok=false ; 225 226 MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows[currentWindows]) ; 227 228 MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, 0 , 0, 1, MPI_LONG_LONG_INT,windows[currentWindows]) ; 229 MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, 0 , 1*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[currentWindows]) ; 230 control=1 ; 231 MPI_Put(&control, 1, MPI_LONG_LONG_INT, 0 , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[currentWindows]) ; 232 233 MPI_Win_unlock(0,windows[currentWindows]) ; 234 235 if (timeLine==clientTimeline) 236 { 237 238 MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows[currentWindows]) ; 239 buffer=(char*)getBuffer(clientCount) ; 240 count=clientCount ; 241 MPI_Get(buffer, clientCount, MPI_CHAR, 0, 3*sizeof(size_t) , clientCount, MPI_CHAR, windows[currentWindows]) ; 242 clientTimeline = 0 ; 243 clientCount = 0 ; 244 control=0 ; 245 MPI_Put(&header[0], 3, MPI_LONG_LONG_INT, 0, 0 , 3, MPI_LONG_LONG_INT,windows[currentWindows]) ; 246 247 MPI_Win_unlock(0,windows[currentWindows]) ; 248 ok=true ; 249 } 250 else 251 { 252 MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows[currentWindows]) ; 253 control=0 ; 254 MPI_Put(&control, 1, MPI_LONG_LONG_INT, 0 , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[currentWindows]) ; 255 MPI_Win_unlock(0,windows[currentWindows]) ; 256 } 257 258 if (ok) return true ; 259 260 return false ; 261 } 262 263 171 264 } -
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_server.hpp
r717 r1547 18 18 void* getBuffer(size_t count) ; 19 19 void freeBuffer(size_t count) ; 20 20 void createWindows(MPI_Comm oneSidedComm) ; 21 bool freeWindows(void) ; 22 bool getBufferFromClient(size_t timeLine, char* & buffer, size_t& count) ; 23 bool isBufferEmpty(void) ; 24 void updateCurrentWindows(void) ; 21 25 private: 22 26 char* buffer; … … 25 29 size_t end; 26 30 size_t size; 31 size_t used ; // count of element occupied 32 MPI_Win windows[2] ; 33 int currentWindows ; 34 bool hasWindows ; 27 35 }; 28 36 } -
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_client.cpp
r1475 r1547 24 24 : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4) 25 25 { 26 pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 27 if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 28 26 29 context = parent; 27 30 intraComm = intraComm_; … … 37 40 computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader); 38 41 39 timeLine = 0; 42 if (flag) MPI_Intercomm_merge(interComm_,false,&interCommMerged) ; 43 44 MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ; 45 46 timeLine = 1; 40 47 } 41 48 … … 116 123 list<int> sizes = event.getSizes(); 117 124 118 // We force the getBuffers call to be non-blocking on classical servers125 // We force the getBuffers call to be non-blocking on classical servers 119 126 list<CBufferOut*> buffList; 120 bool couldBuffer = getBuffers(ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) )); 121 // bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer ); 122 123 if (couldBuffer) 124 { 125 event.send(timeLine, sizes, buffList); 126 127 checkBuffers(ranks); 128 129 if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 130 { 131 waitEvent(ranks); 132 CContext::setCurrent(context->getId()); 133 } 134 } 135 else 136 { 137 tmpBufferedEvent.ranks = ranks; 138 tmpBufferedEvent.sizes = sizes; 139 140 for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++) 141 tmpBufferedEvent.buffers.push_back(new CBufferOut(*it)); 142 info(100)<<"DEBUG : temporaly event created : timeline "<<timeLine<<endl ; 143 event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers); 127 // bool couldBuffer = getBuffers(timeLine, ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) )); 128 getBuffers(timeLine, ranks, sizes, buffList) ; 129 130 event.send(timeLine, sizes, buffList); 131 unlockBuffers(ranks) ; 132 133 checkBuffers(ranks); 134 135 if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 136 { 137 waitEvent(ranks); 138 CContext::setCurrent(context->getId()); 144 139 } 145 140 } 146 141 147 142 timeLine++; 148 }149 150 /*!151 * Send the temporarily buffered event (if any).152 *153 * \return true if a temporarily buffered event could be sent, false otherwise154 */155 bool CContextClient::sendTemporarilyBufferedEvent()156 {157 bool couldSendTmpBufferedEvent = false;158 159 if (hasTemporarilyBufferedEvent())160 {161 list<CBufferOut*> buffList;162 if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call163 {164 list<CBufferOut*>::iterator it, itBuffer;165 166 for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++)167 (*itBuffer)->put((char*)(*it)->start(), (*it)->count());168 169 info(100)<<"DEBUG : temporaly event sent "<<endl ;170 checkBuffers(tmpBufferedEvent.ranks);171 172 tmpBufferedEvent.clear();173 174 couldSendTmpBufferedEvent = true;175 }176 }177 178 return couldSendTmpBufferedEvent;179 143 } 180 144 … … 203 167 * it is explicitly requested to be non-blocking. 204 168 * 169 * 170 * \param [in] timeLine time line of the event which will be sent to servers 205 171 * \param [in] serverList list of rank of connected server 206 172 * \param [in] sizeList size of message corresponding to each connection … … 209 175 * \return whether the already allocated buffers could be used 210 176 */ 211 bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers,177 bool CContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, 212 178 bool nonBlocking /*= false*/) 213 179 { … … 229 195 } 230 196 197 if (CXios::isServer) info(100)<<" getBuffers : entering loop"<<endl ; 231 198 CTimer::get("Blocking time").resume(); 232 199 do … … 234 201 areBuffersFree = true; 235 202 for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 203 { 236 204 areBuffersFree &= (*itBuffer)->isBufferFree(*itSize); 237 205 } 206 207 if (CXios::isServer) info(100)<<" getBuffers : areBuffersFree ? "<<areBuffersFree<<endl ; ; 238 208 if (!areBuffersFree) 239 209 { 210 for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer(); 211 if (CXios::isServer) info(100)<<" getBuffers : buffers unlocked "<<endl ; 240 212 checkBuffers(); 241 if (C Server::serverLevel == 0)242 context->server->listen();243 213 if (CXios::isServer) info(100)<<" getBuffers : buffers checked "<<endl ; 214 if (CServer::serverLevel == 0) context->server->listen(); 215 if (CXios::isServer) info(100)<<" getBuffers : server listened... "<<endl ; 244 216 else if (CServer::serverLevel == 1) 245 217 { 246 218 context->server->listen(); 247 for (int i = 0; i < context->serverPrimServer.size(); ++i) 248 context->serverPrimServer[i]->listen(); 219 for (int i = 0; i < context->serverPrimServer.size(); ++i) context->serverPrimServer[i]->listen(); 249 220 CServer::contextEventLoop(false) ; // avoid dead-lock at finalize... 250 221 } 251 222 252 else if (CServer::serverLevel == 2) 253 context->server->listen(); 223 else if (CServer::serverLevel == 2) context->server->listen(); 254 224 255 225 } 256 226 } while (!areBuffersFree && !nonBlocking); 257 227 if (CXios::isServer) info(100)<<" getBuffers : out of loop"<<endl ; 258 228 CTimer::get("Blocking time").suspend(); 259 229 … … 261 231 { 262 232 for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 263 retBuffers.push_back((*itBuffer)->getBuffer( *itSize));264 } 265 233 retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize)); 234 } 235 if (CXios::isServer) info(100)<<" getBuffers : message pushed"<<endl ; 266 236 return areBuffersFree; 267 237 } … … 279 249 maxEventSizes[rank] = CXios::minBufferSize; 280 250 } 281 CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank] , maxBufferedEvents);251 CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank]); 282 252 // Notify the server 283 CBufferOut* bufOut = buffer->getBuffer( sizeof(StdSize));253 CBufferOut* bufOut = buffer->getBuffer(0, sizeof(StdSize)); 284 254 bufOut->put(mapBufferSize_[rank]); // Stupid C++ 285 buffer->checkBuffer(); 255 buffer->checkBuffer(true); 256 257 if (!isAttachedModeEnabled()) // create windows only in server mode 258 { 259 MPI_Comm OneSidedInterComm, oneSidedComm ; 260 MPI_Intercomm_create(commSelf, 0, interCommMerged, clientSize+rank, 0, &OneSidedInterComm ); 261 MPI_Intercomm_merge(OneSidedInterComm,false,&oneSidedComm); 262 info(100)<<"DEBUG: before creating windows (client)"<<endl ; 263 buffer->createWindows(oneSidedComm) ; 264 info(100)<<"DEBUG: after creating windows (client)"<<endl ; 265 } 266 286 267 } 287 268 … … 295 276 bool pending = false; 296 277 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 297 pending |= itBuff->second->checkBuffer( );278 pending |= itBuff->second->checkBuffer(!pureOneSided); 298 279 return pending; 299 280 } … … 305 286 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 306 287 { 288 // CEventClient event(CContext::GetType(), CContext::EVENT_ID_CLOSE_P2P_CHANNEL); 289 // CMessage msg; 290 // event.push(itBuff->first, 1, msg); 291 // timeLine = std::numeric_limits<size_t>::max() ; 292 // sendEvent(event); 293 // while (itBuff->second->checkBuffer(!pureOneSided)); 307 294 delete itBuff->second; 308 295 } … … 310 297 } 311 298 299 /*! 300 Lock the buffers for one sided communications 301 \param [in] ranks list rank of server to which client connects to 302 */ 303 void CContextClient::lockBuffers(list<int>& ranks) 304 { 305 list<int>::iterator it; 306 for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->lockBuffer(); 307 } 308 309 /*! 310 Unlock the buffers for one sided communications 311 \param [in] ranks list rank of server to which client connects to 312 */ 313 void CContextClient::unlockBuffers(list<int>& ranks) 314 { 315 list<int>::iterator it; 316 for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->unlockBuffer(); 317 } 318 312 319 /*! 313 320 Verify state of buffers corresponding to a connection … … 319 326 list<int>::iterator it; 320 327 bool pending = false; 321 for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer( );328 for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(!pureOneSided); 322 329 return pending; 323 330 } … … 333 340 mapBufferSize_ = mapSize; 334 341 maxEventSizes = maxEventSize; 335 336 // Compute the maximum number of events that can be safely buffered.337 double minBufferSizeEventSizeRatio = std::numeric_limits<double>::max();338 for (std::map<int,StdSize>::const_iterator it = mapSize.begin(), ite = mapSize.end(); it != ite; ++it)339 {340 double ratio = double(it->second) / maxEventSizes[it->first];341 if (ratio < minBufferSizeEventSizeRatio) minBufferSizeEventSizeRatio = ratio;342 }343 MPI_Allreduce(MPI_IN_PLACE, &minBufferSizeEventSizeRatio, 1, MPI_DOUBLE, MPI_MIN, intraComm);344 345 if (minBufferSizeEventSizeRatio < 1.0)346 {347 ERROR("void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)",348 << "The buffer sizes and the maximum events sizes are incoherent.");349 }350 else if (minBufferSizeEventSizeRatio == std::numeric_limits<double>::max())351 minBufferSizeEventSizeRatio = 1.0; // In this case, maxBufferedEvents will never be used but we want to avoid any floating point exception352 353 maxBufferedEvents = size_t(2 * minBufferSizeEventSizeRatio) // there is room for two local buffers on the server354 + size_t(minBufferSizeEventSizeRatio) // one local buffer can always be fully used355 + 1; // the other local buffer might contain only one event356 342 } 357 343 … … 408 394 { 409 395 map<int,CClientBuffer*>::iterator itBuff; 396 std::list<int>::iterator ItServerLeader; 397 410 398 bool stop = false; 411 399 412 CTimer::get("Blocking time").resume();413 while (hasTemporarilyBufferedEvent())414 {415 checkBuffers();416 sendTemporarilyBufferedEvent();417 }418 CTimer::get("Blocking time").suspend();419 400 int* nbServerConnectionLocal = new int[serverSize] ; 401 int* nbServerConnectionGlobal = new int[serverSize] ; 402 for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ; 403 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) nbServerConnectionLocal[itBuff->first]=1 ; 404 for (ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++) nbServerConnectionLocal[*ItServerLeader]=1 ; 405 406 MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm); 407 420 408 CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); 409 CMessage msg; 410 411 for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ; 412 sendEvent(event); 413 414 delete[] nbServerConnectionLocal ; 415 delete[] nbServerConnectionGlobal ; 416 /* 421 417 if (isServerLeader()) 422 418 { … … 431 427 } 432 428 else sendEvent(event); 429 */ 433 430 434 431 CTimer::get("Blocking time").resume(); 435 // while (!stop) 436 { 437 checkBuffers(); 438 if (hasTemporarilyBufferedEvent()) 439 sendTemporarilyBufferedEvent(); 440 441 stop = true; 442 // for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest(); 443 } 432 checkBuffers(); 444 433 CTimer::get("Blocking time").suspend(); 445 434 -
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_client.hpp
r1232 r1547 31 31 // Send event to server 32 32 void sendEvent(CEventClient& event); 33 bool sendTemporarilyBufferedEvent();34 33 void waitEvent(list<int>& ranks); 35 34 36 35 // Functions to set/get buffers 37 bool getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false);36 bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false); 38 37 void newBuffer(int rank); 39 38 bool checkBuffers(list<int>& ranks); … … 48 47 49 48 bool isAttachedModeEnabled() const; 50 bool hasTemporarilyBufferedEvent() const { return !tmpBufferedEvent.isEmpty(); };51 49 52 50 static void computeLeader(int clientRank, int clientSize, int serverSize, … … 71 69 int serverSize; //!< Size of server group 72 70 73 MPI_Comm interComm; //!< Communicator of server group 71 MPI_Comm interComm; //!< Communicator of server group (interCommunicator) 72 73 MPI_Comm interCommMerged; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication. 74 74 75 75 MPI_Comm intraComm; //!< Communicator of client group 76 76 77 MPI_Comm commSelf; //!< Communicator of the client alone. Needed to create a new communicator between 1 proc client and 1 proc server for one sided communication 78 77 79 map<int,CClientBuffer*> buffers; //!< Buffers for connection to servers 78 80 81 bool pureOneSided ; //!< if true, client will communicated with servers only trough one sided communication. Otherwise the hybrid mode P2P /One sided is used. 82 79 83 private: 84 void lockBuffers(list<int>& ranks) ; 85 void unlockBuffers(list<int>& ranks) ; 86 80 87 //! Mapping of server and buffer size for each connection to server 81 88 std::map<int,StdSize> mapBufferSize_; … … 84 91 //! Maximum number of events that can be buffered 85 92 StdSize maxBufferedEvents; 86 87 struct {88 std::list<int> ranks, sizes;89 std::list<CBufferOut*> buffers;90 91 bool isEmpty() const { return ranks.empty(); };92 void clear() {93 ranks.clear();94 sizes.clear();95 96 for (std::list<CBufferOut*>::iterator it = buffers.begin(); it != buffers.end(); it++)97 delete *it;98 99 buffers.clear();100 };101 } tmpBufferedEvent; //! Event temporarily buffered (used only on the server)102 93 103 94 //! Context for server (Only used in attached mode) -
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_server.cpp
r1230 r1547 33 33 int flag; 34 34 MPI_Comm_test_inter(interComm,&flag); 35 36 if (flag) attachedMode=false ; 37 else attachedMode=true ; 38 35 39 if (flag) MPI_Comm_remote_size(interComm,&commSize); 36 40 else MPI_Comm_size(interComm,&commSize); 37 41 38 currentTimeLine=0; 42 43 currentTimeLine=1; 39 44 scheduled=false; 40 45 finished=false; … … 44 49 else 45 50 hashId=hashString(context->getId()); 46 } 47 51 52 if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,true,&interCommMerged) ; 53 54 MPI_Comm_split(intraComm_,intraCommRank,intraCommRank, &commSelf) ; 55 itLastTimeLine=lastTimeLine.begin() ; 56 57 pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 58 if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 59 60 } 61 62 //! Attached mode is used ? 63 //! \return true if attached mode is used, false otherwise 64 bool CContextServer::isAttachedModeEnabled() const 65 { 66 return attachedMode ; 67 } 68 48 69 void CContextServer::setPendingEvent(void) 49 70 { … … 63 84 bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/) 64 85 { 86 // info(100)<<"CContextServer::eventLoop : listen"<<endl ; 65 87 listen(); 88 // info(100)<<"CContextServer::eventLoop : checkPendingRequest"<<endl ; 66 89 checkPendingRequest(); 67 if (enableEventsProcessing) 68 processEvents(); 90 // info(100)<<"CContextServer::eventLoop : process events"<<endl ; 91 if (enableEventsProcessing) processEvents(); 92 // info(100)<<"CContextServer::eventLoop : finished "<<finished<<endl ; 69 93 return finished; 70 94 } … … 121 145 mapBufferSize_.insert(std::make_pair(rank, buffSize)); 122 146 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first; 147 if (!isAttachedModeEnabled()) 148 { 149 MPI_Comm OneSidedInterComm, oneSidedComm ; 150 MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0, &OneSidedInterComm ); 151 MPI_Intercomm_merge(OneSidedInterComm,true,&oneSidedComm); 152 info(100)<<"DEBUG: before creating windows (server)"<<endl ; 153 buffers[rank]->createWindows(oneSidedComm) ; 154 info(100)<<"DEBUG: before creating windows (server)"<<endl ; 155 } 156 lastTimeLine[rank]=0 ; 157 itLastTimeLine=lastTimeLine.begin() ; 158 123 159 return true; 124 160 } … … 157 193 if (flag==true) 158 194 { 195 buffers[rank]->updateCurrentWindows() ; 159 196 recvRequest.push_back(rank); 160 197 MPI_Get_count(&status,MPI_CHAR,&count); … … 170 207 } 171 208 209 void CContextServer::getBufferFromClient(size_t timeLine) 210 { 211 if (!isAttachedModeEnabled()) // one sided desactivated in attached mode 212 { 213 int rank ; 214 char *buffer ; 215 size_t count ; 216 217 if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ; 218 for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine) 219 { 220 rank=itLastTimeLine->first ; 221 if (itLastTimeLine->second < timeLine && pendingRequest.count(rank)==0) 222 { 223 if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) 224 { 225 info(100)<<"get buffer from client : timeLine "<<timeLine<<endl ; 226 processRequest(rank, buffer, count); 227 break ; 228 } 229 } 230 } 231 } 232 } 233 234 172 235 void CContextServer::processRequest(int rank, char* buff,int count) 173 236 { … … 176 239 char* startBuffer,endBuffer; 177 240 int size, offset; 178 size_t timeLine ;241 size_t timeLine=0; 179 242 map<size_t,CEventServer*>::iterator it; 180 243 244 181 245 CTimer::get("Process request").resume(); 182 246 while(count>0) … … 185 249 CBufferIn newBuffer(startBuffer,buffer.remain()); 186 250 newBuffer>>size>>timeLine; 187 251 info(100)<<"new event : timeLine : "<<timeLine<<" size : "<<size<<endl ; 188 252 it=events.find(timeLine); 189 253 if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first; … … 193 257 count=buffer.remain(); 194 258 } 259 260 if (timeLine>0) lastTimeLine[rank]=timeLine ; 261 195 262 CTimer::get("Process request").suspend(); 196 263 } … … 230 297 } 231 298 } 232 } 299 else getBufferFromClient(currentTimeLine) ; 300 } 301 else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line 233 302 } 234 303 … … 237 306 map<int,CServerBuffer*>::iterator it; 238 307 for(it=buffers.begin();it!=buffers.end();++it) delete it->second; 308 } 309 310 void CContextServer::releaseBuffers() 311 { 312 map<int,CServerBuffer*>::iterator it; 313 bool out ; 314 do 315 { 316 out=true ; 317 for(it=buffers.begin();it!=buffers.end();++it) 318 { 319 out = out && it->second->freeWindows() ; 320 321 } 322 } while (! out) ; 239 323 } 240 324 … … 254 338 finished=true; 255 339 info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl; 340 releaseBuffers() ; 256 341 context->finalize(); 257 342 std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(), -
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_server.hpp
r1228 r1547 19 19 bool listenPendingRequest(MPI_Status& status) ; 20 20 void checkPendingRequest(void) ; 21 void getBufferFromClient(size_t timeLine) ; 21 22 void processRequest(int rank, char* buff,int count) ; 22 23 void processEvents(void) ; … … 25 26 void setPendingEvent(void) ; 26 27 bool hasPendingEvent(void) ; 27 28 bool isAttachedModeEnabled() const; 29 void releaseBuffers(void) ; 30 28 31 MPI_Comm intraComm ; 29 32 int intraCommSize ; … … 33 36 int commSize ; 34 37 38 MPI_Comm interCommMerged; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication. 39 40 MPI_Comm commSelf; //!< Communicator of the server alone. Needed to create a new communicator between 1 proc client and 1 proc server for one sided communication 41 35 42 map<int,CServerBuffer*> buffers ; 43 map<int,size_t> lastTimeLine ; //!< last event time line for a processed request 44 map<int,size_t>::iterator itLastTimeLine ; //!< iterator on lastTimeLine 36 45 map<int,MPI_Request> pendingRequest ; 37 46 map<int,char*> bufferRequest ; … … 44 53 bool pendingEvent ; 45 54 bool scheduled ; /*!< event of current timeline is alreading scheduled ? */ 55 bool attachedMode ; //! true if attached mode is enabled otherwise false 56 bool pureOneSided ; //!< if true, client will communicated with servers only trough one sided communication. Otherwise the hybrid mode P2P /One sided is used. 57 46 58 size_t hashId ; 47 59
Note: See TracChangeset
for help on using the changeset viewer.