Changeset 1254


Ignore:
Timestamp:
09/07/17 15:10:16 (7 years ago)
Author:
rlacroix
Message:

Backport r1225 to trunk: Improve the communication protocol on server side.

Location:
XIOS/trunk/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • XIOS/trunk/src/context_server.cpp

    r1033 r1254  
    7474    MPI_Status status; 
    7575    map<int,CServerBuffer*>::iterator it; 
    76  
    77     for(rank=0;rank<commSize;rank++) 
    78     { 
     76    bool okLoop; 
     77 
     78    traceOff(); 
     79    MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm,&flag,&status); 
     80    traceOn(); 
     81 
     82    if (flag==true) 
     83    { 
     84      rank=status.MPI_SOURCE ; 
     85      okLoop = true; 
    7986      if (pendingRequest.find(rank)==pendingRequest.end()) 
    80       { 
    81         traceOff(); 
    82         MPI_Iprobe(rank,20,interComm,&flag,&status); 
    83         traceOn(); 
    84         if (flag==true) 
     87        okLoop = !listenPendingRequest(status) ; 
     88      if (okLoop) 
     89      { 
     90        for(rank=0;rank<commSize;rank++) 
    8591        { 
    86           it=buffers.find(rank); 
    87           if (it==buffers.end()) // Receive the buffer size and allocate the buffer 
     92          if (pendingRequest.find(rank)==pendingRequest.end()) 
    8893          { 
    89             StdSize buffSize = 0; 
    90             MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status); 
    91             mapBufferSize_.insert(std::make_pair(rank, buffSize)); 
    92             it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first; 
    93           } 
    94           else 
    95           { 
    96             MPI_Get_count(&status,MPI_CHAR,&count); 
    97             if (it->second->isBufferFree(count)) 
    98             { 
    99               addr=(char*)it->second->getBuffer(count); 
    100               MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]); 
    101               bufferRequest[rank]=addr; 
    102             } 
     94 
     95            traceOff(); 
     96            MPI_Iprobe(rank, 20,interComm,&flag,&status); 
     97            traceOn(); 
     98            if (flag==true) listenPendingRequest(status) ; 
    10399          } 
    104100        } 
    105101      } 
     102    } 
     103  } 
     104 
     105  bool CContextServer::listenPendingRequest(MPI_Status& status) 
     106  { 
     107    int count; 
     108    char * addr; 
     109    map<int,CServerBuffer*>::iterator it; 
     110    int rank=status.MPI_SOURCE ; 
     111 
     112    it=buffers.find(rank); 
     113    if (it==buffers.end()) // Receive the buffer size and allocate the buffer 
     114    { 
     115       StdSize buffSize = 0; 
     116       MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status); 
     117       mapBufferSize_.insert(std::make_pair(rank, buffSize)); 
     118       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first; 
     119       return true; 
     120    } 
     121    else 
     122    { 
     123      MPI_Get_count(&status,MPI_CHAR,&count); 
     124      if (it->second->isBufferFree(count)) 
     125      { 
     126         addr=(char*)it->second->getBuffer(count); 
     127         MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]); 
     128         bufferRequest[rank]=addr; 
     129         return true; 
     130      } 
     131      else 
     132        return false; 
    106133    } 
    107134  } 
  • XIOS/trunk/src/context_server.hpp

    r1033 r1254  
    1717    bool eventLoop(bool enableEventsProcessing = true); 
    1818    void listen(void) ; 
     19    bool listenPendingRequest(MPI_Status& status); 
    1920    void checkPendingRequest(void) ; 
    2021    void processRequest(int rank, char* buff,int count) ; 
Note: See TracChangeset for help on using the changeset viewer.