Changeset 1225


Ignore:
Timestamp:
07/25/17 15:12:17 (7 years ago)
Author:
ymipsl
Message:

Improvment of XIOS protocal on server side.

YM

Location:
XIOS/dev/XIOS_DEV_CMIP6/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/XIOS_DEV_CMIP6/src/context_server.cpp

    r1194 r1225  
    7979    map<int,CServerBuffer*>::iterator it; 
    8080 
    81     for(rank=0;rank<commSize;rank++) 
    82     { 
    83       if (pendingRequest.find(rank)==pendingRequest.end()) 
    84       { 
    85         traceOff(); 
    86         MPI_Iprobe(rank,20,interComm,&flag,&status); 
    87         traceOn(); 
    88         if (flag==true) 
     81    traceOff(); 
     82    MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm,&flag,&status); 
     83    traceOn(); 
     84 
     85    if (flag==true) 
     86    { 
     87      rank=status.MPI_SOURCE ; 
     88 
     89      if (pendingRequest.find(rank)==pendingRequest.end()) listenPendingRequest(status) ; 
     90      else 
     91      { 
     92        for(rank=0;rank<commSize;rank++) 
    8993        { 
    90           it=buffers.find(rank); 
    91           if (it==buffers.end()) // Receive the buffer size and allocate the buffer 
     94          if (pendingRequest.find(rank)==pendingRequest.end()) 
    9295          { 
    93             StdSize buffSize = 0; 
    94             MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status); 
    95             mapBufferSize_.insert(std::make_pair(rank, buffSize)); 
    96             it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first; 
    97           } 
    98           else 
    99           { 
    100             MPI_Get_count(&status,MPI_CHAR,&count); 
    101             if (it->second->isBufferFree(count)) 
    102             { 
    103               addr=(char*)it->second->getBuffer(count); 
    104               MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]); 
    105               bufferRequest[rank]=addr; 
    106             } 
     96 
     97            traceOff(); 
     98            MPI_Iprobe(rank, 20,interComm,&flag,&status); 
     99            traceOn(); 
     100            if (flag==true) listenPendingRequest(status) ; 
    107101          } 
    108102        } 
     
    110104    } 
    111105  } 
     106 
     107  void CContextServer::listenPendingRequest(MPI_Status& status) 
     108  { 
     109    int count; 
     110    char * addr; 
     111    map<int,CServerBuffer*>::iterator it; 
     112    int rank=status.MPI_SOURCE ; 
     113 
     114    it=buffers.find(rank); 
     115    if (it==buffers.end()) // Receive the buffer size and allocate the buffer 
     116    { 
     117       StdSize buffSize = 0; 
     118       MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status); 
     119       mapBufferSize_.insert(std::make_pair(rank, buffSize)); 
     120       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first; 
     121    } 
     122    else 
     123    { 
     124      MPI_Get_count(&status,MPI_CHAR,&count); 
     125      if (it->second->isBufferFree(count)) 
     126      { 
     127         addr=(char*)it->second->getBuffer(count); 
     128         MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]); 
     129         bufferRequest[rank]=addr; 
     130       } 
     131    } 
     132  } 
     133 
    112134 
    113135  void CContextServer::checkPendingRequest(void) 
     
    151173    map<size_t,CEventServer*>::iterator it; 
    152174 
     175    CTimer::get("Process request").resume(); 
    153176    while(count>0) 
    154177    { 
     
    164187      count=buffer.remain(); 
    165188    } 
     189    CTimer::get("Process request").suspend(); 
    166190  } 
    167191 
  • XIOS/dev/XIOS_DEV_CMIP6/src/context_server.hpp

    r1158 r1225  
    1717    bool eventLoop(bool enableEventsProcessing = true); 
    1818    void listen(void) ; 
     19    void listenPendingRequest(MPI_Status& status) ; 
    1920    void checkPendingRequest(void) ; 
    2021    void processRequest(int rank, char* buff,int count) ; 
Note: See TracChangeset for help on using the changeset viewer.