Changeset 595 for XIOS/trunk/src/context_client.cpp
- Timestamp:
- 05/26/15 16:13:46 (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/trunk/src/context_client.cpp
r591 r595 21 21 \cxtSer [in] cxtSer Pointer to context of server side. (It is only used on case of attached mode) 22 22 */ 23 CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)23 CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer) 24 24 : mapBufferSize_(), parentServer(cxtSer) 25 25 { 26 context=parent ; 27 intraComm=intraComm_ ; 28 interComm=interComm_ ; 29 MPI_Comm_rank(intraComm,&clientRank) ; 30 MPI_Comm_size(intraComm,&clientSize) ; 31 32 int flag ; 33 MPI_Comm_test_inter(interComm,&flag) ; 34 if (flag) MPI_Comm_remote_size(interComm,&serverSize); 35 else MPI_Comm_size(interComm,&serverSize) ; 36 37 timeLine=0 ; 38 26 context = parent; 27 intraComm = intraComm_; 28 interComm = interComm_; 29 MPI_Comm_rank(intraComm, &clientRank); 30 MPI_Comm_size(intraComm, &clientSize); 31 32 int flag; 33 MPI_Comm_test_inter(interComm, &flag); 34 if (flag) MPI_Comm_remote_size(interComm, &serverSize); 35 else MPI_Comm_size(interComm, &serverSize); 36 37 if (clientSize < serverSize) 38 { 39 int serverByClient = serverSize / clientSize; 40 int remain = serverSize % clientSize; 41 int rankStart = serverByClient * clientRank; 42 43 if (clientRank < remain) 44 { 45 serverByClient++; 46 rankStart += clientRank; 47 } 48 else 49 rankStart += remain; 50 51 for (int i = 0; i < serverByClient; i++) 52 ranksServerLeader.push_back(rankStart + i); 53 } 54 else 55 { 56 int clientByServer = clientSize / serverSize; 57 int remain = clientSize % serverSize; 58 59 if (clientRank < (clientByServer + 1) * remain) 60 { 61 if (clientRank % (clientByServer + 1) == 0) 62 ranksServerLeader.push_back(clientRank / (clientByServer + 1)); 63 } 64 else 65 { 66 int rank = clientRank - (clientByServer + 1) * remain; 67 if (rank % clientByServer == 0) 68 ranksServerLeader.push_back(remain + rank / clientByServer); 69 } 70 } 71 72 timeLine = 0; 39 73 } 40 74 … … 45 79 void CContextClient::sendEvent(CEventClient& event) 46 80 { 47 list<int>::iterator itServer ; 48 list<int> ranks ; 49 list<int> sizes ; 50 list<int>::iterator itSize ; 51 52 ranks=event.getRanks() ; 53 if (! event.isEmpty()) 54 { 55 sizes=event.getSizes() ; 56 CMessage msg ; 57 58 msg<<*(sizes.begin())<<timeLine ; 59 for(list<int>::iterator it=sizes.begin();it!=sizes.end();it++) *it+=msg.size() ; 60 list<CBufferOut*> buffList=getBuffers(ranks,sizes) ; 61 62 list<CBufferOut*>::iterator it ; 63 for(it=buffList.begin(),itSize=sizes.begin();it!=buffList.end();++it,++itSize) 64 { 65 **it<<*itSize<<timeLine ; 66 } 67 event.send(buffList) ; 68 checkBuffers(ranks) ; 69 } 70 71 // if (context->hasServer) 72 if (0 != parentServer) 81 list<int>::iterator itServer; 82 list<int> ranks; 83 list<int> sizes; 84 list<int>::iterator itSize; 85 86 ranks = event.getRanks(); 87 if (!event.isEmpty()) 88 { 89 sizes = event.getSizes(); 90 CMessage msg; 91 92 msg << *(sizes.begin()) << timeLine; 93 for (list<int>::iterator it = sizes.begin(); it != sizes.end(); it++) *it += msg.size(); 94 list<CBufferOut*> buffList = getBuffers(ranks, sizes); 95 96 list<CBufferOut*>::iterator it; 97 for (it = buffList.begin(), itSize = sizes.begin(); it != buffList.end(); ++it, ++itSize) 98 { 99 **it << *itSize << timeLine; 100 } 101 event.send(buffList); 102 checkBuffers(ranks); 103 } 104 105 if (0 != parentServer) // context->hasServer 73 106 { 74 107 waitEvent(ranks); … … 76 109 } 77 110 78 timeLine++ 111 timeLine++; 79 112 } 80 113 … … 85 118 void CContextClient::sendBufferSizeEvent() 86 119 { 87 std::map<int, 88 std::map<int, 120 std::map<int,CClientBuffer*>::iterator it, itE; 121 std::map<int,StdSize>::const_iterator itMap = mapBufferSize_.begin(), iteMap = mapBufferSize_.end(); 89 122 90 123 if (itMap == iteMap) 91 ERROR(" CBufferOut* CContextClient::sendBufferSizeEvent() ;",124 ERROR("void CContextClient::sendBufferSizeEvent()", 92 125 <<"No information about server buffer, that should not happen..."); 93 126 … … 115 148 void CContextClient::waitEvent(list<int>& ranks) 116 149 { 117 // context->server->setPendingEvent() 118 // while (checkBuffers(ranks))150 // context->server->setPendingEvent(); 151 // while (checkBuffers(ranks)) 119 152 // { 120 // context->server->listen() 121 // context->server->checkPendingRequest() 153 // context->server->listen(); 154 // context->server->checkPendingRequest(); 122 155 // } 123 156 // 124 // while (context->server->hasPendingEvent())157 // while (context->server->hasPendingEvent()) 125 158 // { 126 // context->server->eventLoop() 159 // context->server->eventLoop(); 127 160 // } 128 161 129 parentServer->server->setPendingEvent() 130 while (checkBuffers(ranks))131 { 132 parentServer->server->listen() 133 parentServer->server->checkPendingRequest() 134 } 135 136 while (parentServer->server->hasPendingEvent())137 { 138 parentServer->server->eventLoop() 162 parentServer->server->setPendingEvent(); 163 while (checkBuffers(ranks)) 164 { 165 parentServer->server->listen(); 166 parentServer->server->checkPendingRequest(); 167 } 168 169 while (parentServer->server->hasPendingEvent()) 170 { 171 parentServer->server->eventLoop(); 139 172 } 140 173 } … … 148 181 list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList) 149 182 { 150 list<int>::iterator itServer, itSize;151 list<CClientBuffer*> bufferList 152 map<int,CClientBuffer*>::iterator it 153 list<CClientBuffer*>::iterator itBuffer 154 list<CBufferOut*> retBuffer 155 bool free 156 157 for (itServer=serverList.begin();itServer!=serverList.end();itServer++)158 { 159 it =buffers.find(*itServer);160 if (it ==buffers.end())161 { 162 newBuffer(*itServer) 163 it =buffers.find(*itServer);164 } 165 bufferList.push_back(it->second) 166 } 167 free =false;183 list<int>::iterator itServer, itSize; 184 list<CClientBuffer*> bufferList; 185 map<int,CClientBuffer*>::iterator it; 186 list<CClientBuffer*>::iterator itBuffer; 187 list<CBufferOut*> retBuffer; 188 bool free; 189 190 for (itServer = serverList.begin(); itServer != serverList.end(); itServer++) 191 { 192 it = buffers.find(*itServer); 193 if (it == buffers.end()) 194 { 195 newBuffer(*itServer); 196 it = buffers.find(*itServer); 197 } 198 bufferList.push_back(it->second); 199 } 200 free = false; 168 201 169 202 CTimer::get("Blocking time").resume(); 170 while (!free)171 { 172 free =true;173 for (itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++)174 { 175 (*itBuffer)->checkBuffer() 176 free &=(*itBuffer)->isBufferFree(*itSize);203 while (!free) 204 { 205 free = true; 206 for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 207 { 208 (*itBuffer)->checkBuffer(); 209 free &= (*itBuffer)->isBufferFree(*itSize); 177 210 } 178 211 } 179 212 CTimer::get("Blocking time").suspend(); 180 213 181 for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++) 182 { 183 retBuffer.push_back((*itBuffer)->getBuffer(*itSize)) ; 184 } 185 return retBuffer ; 186 214 for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 215 { 216 retBuffer.push_back((*itBuffer)->getBuffer(*itSize)); 217 } 218 return retBuffer; 187 219 } 188 220 … … 193 225 void CContextClient::newBuffer(int rank) 194 226 { 195 buffers[rank] =new CClientBuffer(interComm,rank, mapBufferSize_[rank]);227 buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank]); 196 228 } 197 229 … … 202 234 bool CContextClient::checkBuffers(void) 203 235 { 204 map<int,CClientBuffer*>::iterator itBuff 205 bool pending =false;206 for (itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) pending|=itBuff->second->checkBuffer();207 return pending 236 map<int,CClientBuffer*>::iterator itBuff; 237 bool pending = false; 238 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) pending |= itBuff->second->checkBuffer(); 239 return pending; 208 240 } 209 241 … … 211 243 void CContextClient::releaseBuffers(void) 212 244 { 213 map<int,CClientBuffer*>::iterator itBuff 214 for (itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) delete itBuff->second;245 map<int,CClientBuffer*>::iterator itBuff; 246 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) delete itBuff->second; 215 247 } 216 248 … … 222 254 bool CContextClient::checkBuffers(list<int>& ranks) 223 255 { 224 list<int>::iterator it 225 bool pending =false;226 for (it=ranks.begin();it!=ranks.end();it++) pending|=buffers[*it]->checkBuffer();227 return pending 256 list<int>::iterator it; 257 bool pending = false; 258 for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(); 259 return pending; 228 260 } 229 261 … … 232 264 \param [in] mapSize mapping rank of connected server to size of allocated buffer 233 265 */ 234 void CContextClient::setBufferSize(const std::map<int, 266 void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize) 235 267 { 236 268 mapBufferSize_ = mapSize; … … 238 270 } 239 271 240 /*! 241 Get leading server in the group of connected server 242 \return rank of leading server 243 */ 244 int CContextClient::getServerLeader(void) 245 { 246 int clientByServer=clientSize/serverSize ; 247 int remain=clientSize%serverSize ; 248 249 if (clientRank<(clientByServer+1)*remain) 250 { 251 return clientRank/(clientByServer+1) ; 252 } 253 else 254 { 255 int rank=clientRank-(clientByServer+1)*remain ; 256 int nbServer=serverSize-remain ; 257 return remain+rank/clientByServer ; 258 } 259 } 260 261 /*! 262 Check if client connects to leading server 263 \return connected(true), not connected (false) 264 */ 265 bool CContextClient::isServerLeader(void) 266 { 267 int clientByServer=clientSize/serverSize ; 268 int remain=clientSize%serverSize ; 269 270 if (clientRank<(clientByServer+1)*remain) 271 { 272 if (clientRank%(clientByServer+1)==0) return true ; 273 else return false ; 274 } 275 else 276 { 277 int rank=clientRank-(clientByServer+1)*remain ; 278 int nbServer=serverSize-remain ; 279 if (rank%clientByServer==0) return true ; 280 else return false ; 281 } 282 } 272 /*! 273 Get leading server in the group of connected server 274 \return ranks of leading servers 275 */ 276 const std::list<int>& CContextClient::getRanksServerLeader(void) const 277 { 278 return ranksServerLeader; 279 } 280 281 /*! 282 Check if client connects to leading server 283 \return connected(true), not connected (false) 284 */ 285 bool CContextClient::isServerLeader(void) const 286 { 287 return !ranksServerLeader.empty(); 288 } 283 289 284 290 /*! … … 287 293 void CContextClient::finalize(void) 288 294 { 289 map<int,CClientBuffer*>::iterator itBuff 290 bool stop =true;291 292 CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);295 map<int,CClientBuffer*>::iterator itBuff; 296 bool stop = true; 297 298 CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); 293 299 if (isServerLeader()) 294 300 { 295 CMessage msg ; 296 event.push(getServerLeader(),1,msg) ; 297 sendEvent(event) ; 301 CMessage msg; 302 const std::list<int>& ranks = getRanksServerLeader(); 303 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 304 event.push(*itRank, 1, msg); 305 sendEvent(event); 298 306 } 299 else sendEvent(event) 307 else sendEvent(event); 300 308 301 309 CTimer::get("Blocking time").resume(); 302 while (stop)310 while (stop) 303 311 { 304 checkBuffers() 305 stop =false;306 for (itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) stop|=itBuff->second->hasPendingRequest();312 checkBuffers(); 313 stop = false; 314 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop |= itBuff->second->hasPendingRequest(); 307 315 } 308 316 CTimer::get("Blocking time").suspend(); 309 317 310 std::map<int, 311 318 std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 319 iteMap = mapBufferSize_.end(), itMap; 312 320 StdSize totalBuf = 0; 313 321 for (itMap = itbMap; itMap != iteMap; ++itMap) 314 322 { 315 report(10) << " Memory report : Context <"<<context->getId()<<"> : client side : memory used for buffer of each connection to server" << endl316 << " +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;323 report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl 324 << " +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl; 317 325 totalBuf += itMap->second; 318 326 } 319 report(0) << " Memory report : Context <"<<context->getId()<<"> : client side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;320 321 releaseBuffers() 327 report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; 328 329 releaseBuffers(); 322 330 } 323 331 }
Note: See TracChangeset
for help on using the changeset viewer.