Changeset 1187


Ignore:
Timestamp:
06/28/17 17:46:00 (7 years ago)
Author:
yushan
Message:

add pending list for irecv

Location:
XIOS/dev/branch_yushan_merged
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/branch_yushan_merged/bld.cfg

    r1176 r1187  
    3434#bld::target generate_fortran_interface.exe  
    3535#bld::target xios_server.exe  
    36 bld::target test_remap.exe 
     36#bld::target test_remap.exe test_remap_omp.exe 
    3737#bld::target test_regular.exe 
    3838#bld::target test_expand_domain.exe 
    3939#bld::target test_new_features.exe test_unstruct_complete.exe  
    40 bld::target test_omp.exe test_complete_omp.exe test_remap_omp.exe test_unstruct_omp.exe 
     40bld::target test_omp.exe #test_complete_omp.exe test_remap_omp.exe test_unstruct_omp.exe 
    4141#bld::target test_client.exe test_complete.exe #test_xios2_cmip6.exe 
    4242#bld::target test_connectivity_expand.exe 
  • XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_lib.cpp

    r1134 r1187  
    99 
    1010namespace ep_lib 
    11 { 
    12  
     11{  
    1312 
    1413  int tag_combine(int real_tag, int src, int dest) 
     
    368367 
    369368  } 
     369 
     370  int test_sendrecv(MPI_Comm comm) 
     371  { 
     372    int myRank; 
     373    MPI_Comm_rank(comm, &myRank); 
     374    bool amClient = false; 
     375    bool amServer = false; 
     376    if(myRank<=3) amClient = true; 
     377    else amServer = true; 
     378 
     379    if(amServer) 
     380    { 
     381      int send_buf[4]; 
     382      MPI_Request send_request[8]; 
     383      MPI_Status send_status[8]; 
     384 
     385       
     386       
     387      for(int j=0; j<4; j++)  // 4 buffers 
     388      { 
     389        for(int i=0; i<2; i++) 
     390        { 
     391          send_buf[j] = (myRank+1)*100 + j; 
     392          MPI_Isend(&send_buf[j], 1, MPI_INT, i*2, 9999, comm, &send_request[i*4+j]); 
     393        } 
     394      } 
     395       
     396 
     397      MPI_Waitall(8, send_request, send_status); 
     398    } 
     399 
     400 
     401    if(amClient&&myRank%2==0) // Clients leaders 
     402    { 
     403      int recv_buf[8]; 
     404      MPI_Request recv_request[8]; 
     405      MPI_Status recv_status[8]; 
     406 
     407      for(int i=0; i<2; i++)  // 2 servers 
     408      { 
     409        for(int j=0; j<4; j++) 
     410        { 
     411          MPI_Irecv(&recv_buf[i*4+j], 1, MPI_INT, i+4, 9999, comm, &recv_request[i*4+j]); 
     412        } 
     413      } 
     414 
     415      MPI_Waitall(8, recv_request, recv_status); 
     416      printf("============ client %d, recv_buf = %d, %d, %d, %d, %d, %d, %d, %d ================\n",  
     417              myRank, recv_buf[0], recv_buf[1], recv_buf[2], recv_buf[3], recv_buf[4], recv_buf[5], recv_buf[6], recv_buf[7]); 
     418    } 
     419 
     420    MPI_Barrier(comm); 
     421 
     422  } 
     423 
    370424} 
    371425 
  • XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_message.cpp

    r1185 r1187  
    7979        int dest_mpi = comm.ep_comm_ptr->size_rank_info[2].first; 
    8080        int ep_dest = get_ep_rank(comm, dest_loc, dest_mpi); 
    81         printf("myRank = %d, probed one message, ep_src = %d, ep_dest = %d, tag = %d, message = %d\n", myRank, msg_block->ep_src, ep_dest, msg_block->ep_tag, msg_block->mpi_message); 
     81        //printf("myRank = %d, probed one message, ep_src = %d, ep_dest = %d, tag = %d, message = %d\n", myRank, msg_block->ep_src, ep_dest, msg_block->ep_tag, msg_block->mpi_message); 
    8282        msg_block->mpi_status = new ::MPI_Status(status); 
    8383 
     
    9090          #pragma omp flush 
    9191          ptr_comm_target->ep_comm_ptr->message_queue->push_back(*msg_block);   
    92           printf("myRank = %d, push_back OK, ep_src = %d, ep_tag = %d, dest = %d(%d)\n", myRank,  
    93                                                                                              ptr_comm_target->ep_comm_ptr->message_queue->back().ep_src, 
    94                                                                                              ptr_comm_target->ep_comm_ptr->message_queue->back().ep_tag, 
    95                                                                                              ep_dest, dest_loc); 
     92          //printf("myRank = %d, push_back OK, ep_src = %d, ep_tag = %d, dest = %d(%d)\n", myRank,  
     93          //                                                                                   ptr_comm_target->ep_comm_ptr->message_queue->back().ep_src, 
     94          //                                                                                   ptr_comm_target->ep_comm_ptr->message_queue->back().ep_tag, 
     95          //                                                                                   ep_dest, dest_loc); 
    9696     
    9797          #pragma omp flush 
  • XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_recv.cpp

    r1185 r1187  
    7676    request->ep_datatype = datatype; 
    7777 
     78 
     79 
    7880    /* With Improbe*/ 
    7981    Message_Check(comm); 
    8082 
     83    if(EP_PendingRequests == 0 )  
     84    { 
     85      EP_PendingRequests = new std::list< MPI_Request* >;  
     86      printf("proc %d : EP_PendingRequests allocated, add = %p\n", dest_rank, EP_PendingRequests);   
     87    } 
     88 
     89    request->pending_ptr = EP_PendingRequests; 
     90    printf("proc %d : &EP_PendingRequests add = %p, ptr = %p\n", dest_rank, EP_PendingRequests, request->pending_ptr);   
     91     
    8192    EP_PendingRequests->push_back(request); 
    82     printf("proc %d : EP_PendingRequests insert one request, add = %p(%p), buf = %p(%p)\n", dest_rank, EP_PendingRequests->back(), request, buf, request->buf); 
     93    //printf("proc %d : EP_PendingRequests insert one request, src = %d, tag = %d, size = %d\n", dest_rank, request->ep_src, request->ep_tag, EP_PendingRequests->size()); 
    8394     
    8495    // check all EP_PendingRequests 
    8596     
    86     //if(EP_PendingRequests == 0 ) EP_PendingRequests = new std::list< MPI_Request* >;  
    87     if(!EP_PendingRequests->empty()) 
     97    //printf("proc %d have %d pending irecv request\n", dest_rank, EP_PendingRequests->size()); 
     98       
     99    for(std::list<MPI_Request* >::iterator it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); ) 
    88100    { 
    89       printf("proc %d have %d pending irecv request\n", dest_rank, EP_PendingRequests->size()); 
    90        
    91       for(std::list<MPI_Request* >::iterator it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); ) 
     101      if((*it)->type == 3)  
    92102      { 
    93         if((*it)->type != 2)  
    94         { 
    95           printf("proc %d : pending request type = %d, src= %d, tag = %d, add = %p skip\n", dest_rank, (*it)->type, (*it)->ep_src, (*it)->ep_tag, *it); 
    96           EP_PendingRequests->erase(it); 
    97           it = EP_PendingRequests->begin(); 
    98           printf("proc %d : pending request processed, size = %d, it = %p\n", dest_rank, EP_PendingRequests->size(), *it); 
    99           continue; 
    100         } 
     103        //printf("proc %d : pending request type = %d, src= %d, tag = %d    skip\n", dest_rank, (*it)->type, (*it)->ep_src, (*it)->ep_tag); 
     104        EP_PendingRequests->erase(it); 
     105        it = EP_PendingRequests->begin(); 
     106        //printf("proc %d : pending request processed, size = %d\n", dest_rank, EP_PendingRequests->size()); 
     107        continue; 
     108      } 
    101109         
    102         printf("proc %d : pending irecv request src = %d, tag = %d, type = %d, add = %p\n", dest_rank, (*it)->ep_src, (*it)->ep_tag, (*it)->type, *it); 
    103         int probed = false; 
    104         MPI_Message pending_message; 
    105         MPI_Status pending_status; 
     110      //printf("proc %d : pending irecv request src = %d, tag = %d, type = %d\n", dest_rank, (*it)->ep_src, (*it)->ep_tag, (*it)->type); 
     111      int probed = false; 
     112      MPI_Message pending_message; 
     113      MPI_Status pending_status; 
    106114     
    107         MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &pending_message, &pending_status); 
    108         printf("proc %d : pending irecv request probed to be %d, add = %p\n",dest_rank, probed, *it); 
     115      MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &pending_message, &pending_status); 
     116      //printf("proc %d : pending irecv request probed to be %d, src = %d, tag = %d\n",dest_rank, probed, (*it)->ep_src, (*it)->ep_tag); 
    109117     
    110         if(probed)  
    111         {  
    112           int count; 
    113           MPI_Get_count(&pending_status, (*it)->ep_datatype, &count); 
    114           MPI_Imrecv((*it)->buf, count, (*it)->ep_datatype, &pending_message, *it); 
    115           printf("proc %d : pending request is imrecving src = %d, tag = %d, add = %p, buf = %p, count = %d\n", dest_rank, (*it)->ep_src, (*it)->ep_tag, *it, (*it)->buf, count); 
    116           EP_PendingRequests->erase(it); 
    117           it = EP_PendingRequests->begin(); 
    118           printf("proc %d : pending request processed, size = %d\n", dest_rank, EP_PendingRequests->size()); 
    119           continue; 
    120         } 
    121         else it++; 
     118      if(probed)  
     119      {  
     120        int count; 
     121        MPI_Get_count(&pending_status, (*it)->ep_datatype, &count); 
     122        MPI_Imrecv((*it)->buf, count, (*it)->ep_datatype, &pending_message, *it); 
     123        //printf("proc %d : pending request is imrecving src = %d, tag = %d, count = %d\n", dest_rank, (*it)->ep_src, (*it)->ep_tag, count); 
     124        EP_PendingRequests->erase(it); 
     125        it = EP_PendingRequests->begin(); 
     126        //printf("proc %d : pending request processed, size = %d\n", dest_rank, EP_PendingRequests->size()); 
     127        continue; 
    122128      } 
     129 
     130      it++; 
    123131    } 
    124 /* 
    125     int flag = false; 
    126     MPI_Message message; 
    127     MPI_Status status; 
    128132     
    129     MPI_Improbe(src, tag, comm, &flag, &message, &status); 
    130              
    131     if(flag) 
    132     { 
    133       MPI_Imrecv(buf, count, datatype, &message, request); 
    134       printf("proc %d : found message in local queue, src = %d, tag = %d\n", dest_rank, src, tag); 
    135     } 
    136   */   
    137  
    138133    return 0; 
    139134  } 
  • XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_type.hpp

    r1185 r1187  
    453453      MPI_Request(void* request): mpi_request(request) {} 
    454454      #endif 
     455 
     456      std::list< MPI_Request* > * pending_ptr; 
    455457  }; 
    456458 
     
    490492            //    <MPI_Fint,thread_num>   EP_Comm 
    491493 
    492   static std::list< MPI_Request* > * EP_PendingRequests = new std::list< MPI_Request* >; 
     494  static std::list< MPI_Request* > * EP_PendingRequests = 0; 
    493495  #pragma omp threadprivate(EP_PendingRequests) 
    494496} 
  • XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_wait.cpp

    r1185 r1187  
    2020  { 
    2121 
    22     if(request->type == 1) 
     22    if(request->type == 1)  //=>isend 
    2323    { 
    2424      ::MPI_Request mpi_request = static_cast< ::MPI_Request >(request->mpi_request); 
     
    4444    } 
    4545 
    46     if(request->type == 2) 
    47     { 
    48       int flag = false; 
    49       MPI_Message message; 
    50  
    51       while(!flag) 
    52       { 
    53         Message_Check(request->comm); 
    54         #pragma omp flush 
    55         MPI_Improbe(request->ep_src, request->ep_tag, request->comm, &flag, &message, status); 
    56       } 
    57  
    58       int count; 
    59       request->type = 3; 
    60       MPI_Get_count(status, request->ep_datatype, &count); 
    61       MPI_Mrecv(request->buf, count, request->ep_datatype, &message, status); 
    62       status->ep_datatype = request->ep_datatype; 
    63  
    64       //check_sum_recv(request->buf, count, request->ep_datatype, request->ep_src, request->ep_tag, request->comm, 2); 
    65  
    66       return 0; 
    67     } 
    68  
    69     if(request->type == 3) 
     46    if(request->type == 3) //=>imrecv 
    7047    { 
    7148      ::MPI_Request mpi_request = static_cast< ::MPI_Request >(request->mpi_request); 
     
    9067 
    9168      //check_sum_recv(request->buf, count, request->ep_datatype, request->ep_src, request->ep_tag, request->comm, 2); 
     69      return 0; 
     70 
     71    } 
     72 
     73    if(request->type == 2) //=>irecv not probed 
     74    { 
     75       
     76      while(true) 
     77      { 
     78        Message_Check(request->comm); 
     79        // parcours pending list 
     80        for(std::list<MPI_Request* >::iterator it = (request->pending_ptr)->begin(); it!=(request->pending_ptr)->end(); ) 
     81        { 
     82          if(*it == request) 
     83          {  
     84            int probed = false; 
     85            MPI_Message message; 
     86 
     87            MPI_Improbe(request->ep_src, request->ep_tag, request->comm, &probed, &message, status); 
     88                   
     89            if(probed) 
     90            { 
     91              int recv_count; 
     92              MPI_Get_count(status, request->ep_datatype, &recv_count); 
     93              MPI_Mrecv(request->buf, recv_count, request->ep_datatype, &message, status); 
     94              (request->pending_ptr)->erase(it); 
     95              //printf("wait  : pending request processed, size = %d\n", (request->pending_ptr)->size()); 
     96              request->type = 3; 
     97             
     98              return 0; 
     99            } 
     100 
     101            it++; 
     102          } 
     103          else  
     104          { 
     105            int probed = false; 
     106            MPI_Message message; 
     107            MPI_Status status; 
     108 
     109            MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status); 
     110                   
     111            if(probed) 
     112            { 
     113              int recv_count; 
     114              MPI_Get_count(&status, (*it)->ep_datatype, &recv_count); 
     115              MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it); 
     116                   
     117              (request->pending_ptr)->erase(it); 
     118                   
     119              it = (request->pending_ptr)->begin(); 
     120              //printf("wait  : pending request processed, size = %d\n", (request->pending_ptr)->size()); 
     121            } 
     122            else it++; 
     123          } 
     124        } 
     125 
     126      } 
     127       
     128 
    92129    } 
    93130 
     
    111148 
    112149    for(int i=0; i<count; i++) 
     150      printf("pending add = %p\n",  array_of_requests[i].pending_ptr); 
     151 
     152    //if(EP_PendingRequests == 0) EP_PendingRequests = new std::list< MPI_Request* >;   
     153    //printf("pending size = %d, add = %p\n", EP_PendingRequests->size(), EP_PendingRequests); 
     154 
     155    for(int i=0; i<count; i++) 
    113156    { 
    114157      finished_index[i] = false; 
     
    117160    while(finished < count) 
    118161    { 
    119  
     162       
    120163      for(int i=0; i<count; i++) 
    121164      { 
    122165        if(finished_index[i] == false) // this request has not been tested. 
    123166        { 
    124           if(array_of_requests[i].type != 2) // isend or imrecv 
     167          if(array_of_requests[i].type == 1 || array_of_requests[i].type == 3) // isend or imrecv 
    125168          {       
    126169            //MPI_Wait(&array_of_requests[i], &array_of_statuses[i]); 
     
    133176          else // irecv 
    134177          { 
     178             
     179            Message_Check(array_of_requests[i].comm); 
    135180            // parcours pending list 
    136             // find request in waitall 
    137                 Message_Check(array_of_requests[i].comm); 
    138                // improbe + mrecv 
    139                // erase element in pending list 
    140  //             finished++; 
    141  //             finished_index[i] = true; 
    142              
    143  
    144             int flag = false; 
    145             MPI_Message message; 
    146  
    147             MPI_Improbe(array_of_requests[i].ep_src, array_of_requests[i].ep_tag, array_of_requests[i].comm, &flag, &message, &array_of_statuses[i]); 
    148  
    149             if(flag) 
     181            for(std::list<MPI_Request* >::iterator it = (array_of_requests[i].pending_ptr)->begin(); it!=(array_of_requests[i].pending_ptr)->end(); ) 
    150182            { 
    151               int recv_count; 
    152               MPI_Get_count(&array_of_statuses[i], array_of_requests[i].ep_datatype, &recv_count); 
    153               MPI_Mrecv(array_of_requests[i].buf, recv_count, array_of_requests[i].ep_datatype, &message, &array_of_statuses[i]); 
    154               //check_sum_recv(array_of_requests[i].buf, recv_count, array_of_requests[i].ep_datatype, array_of_requests[i].ep_src, array_of_requests[i].ep_tag, array_of_requests[i].comm, 2); 
    155  
    156               finished++; 
    157               finished_index[i] = true; 
     183              bool matched = false; 
     184              for(int j=0; j<count; j++) 
     185              { 
     186                if(*it == &array_of_requests[j]) 
     187                {  
     188                  int probed = false; 
     189                  MPI_Message message; 
     190 
     191                  MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &array_of_statuses[j]); 
     192                   
     193                  if(probed) 
     194                  { 
     195                    int recv_count; 
     196                    MPI_Get_count(&array_of_statuses[j], array_of_requests[j].ep_datatype, &recv_count); 
     197                    MPI_Mrecv(array_of_requests[j].buf, recv_count, array_of_requests[j].ep_datatype, &message, &array_of_statuses[j]); 
     198                    //check_sum_recv(array_of_requests[i].buf, recv_count, array_of_requests[i].ep_datatype, array_of_requests[i].ep_src, array_of_requests[i].ep_tag, array_of_requests[i].comm, 2); 
     199                    (array_of_requests[i].pending_ptr)->erase(it); 
     200                    array_of_requests[j].type = 3; 
     201                    finished++; 
     202                    finished_index[j] = true; 
     203                    matched = true; 
     204                    it = (array_of_requests[i].pending_ptr)->begin(); 
     205                    j=count; 
     206                    //printf("waitall  : pending request processed, size = %d\n", (*(array_of_requests[i].pending_ptr))->size()); 
     207                    //printf("matched = %d, j=%d, src = %d, tag = %d, probed = %d\n", matched, j, (*it)->ep_src, (*it)->ep_tag, probed); 
     208                  } 
     209                } 
     210 
     211              } 
     212 
     213              if(!matched) 
     214              { 
     215                int probed = false; 
     216                MPI_Message message; 
     217                MPI_Status status; 
     218 
     219                MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status); 
     220                   
     221                if(probed) 
     222                { 
     223                  int recv_count; 
     224                  MPI_Get_count(&status, (*it)->ep_datatype, &recv_count); 
     225                  MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it); 
     226                   
     227                  (array_of_requests[i].pending_ptr)->erase(it); 
     228                   
     229                  it = (array_of_requests[i].pending_ptr)->begin(); 
     230                  //printf("waitall  : pending request processed, size = %d\n", (*(array_of_requests[i].pending_ptr))->size()); 
     231                } 
     232                else it++; 
     233              } 
    158234            } 
     235 
    159236          } 
    160237        } 
  • XIOS/dev/branch_yushan_merged/src/client.cpp

    r1185 r1187  
    107107            MPI_Comm_rank(intraComm,&intraCommRank) ; 
    108108             
    109             /*#pragma omp critical(_output) 
     109            #pragma omp critical(_output) 
    110110            { 
    111111              info(10)<<"intercommCreate::client "<<test_omp_rank<< " "<< &test_omp_rank <<" intraCommSize : "<<intraCommSize 
    112112                 <<" intraCommRank :"<<intraCommRank<<"  serverLeader "<< serverLeader 
    113113                 <<" globalComm : "<< &(CXios::globalComm) << endl ;   
    114             }*/ 
     114            } 
    115115 
    116116             
  • XIOS/dev/branch_yushan_merged/src/server.cpp

    r1185 r1187  
    9090             MPI_Comm_size(intraComm,&intraCommSize) ; 
    9191             MPI_Comm_rank(intraComm,&intraCommRank) ; 
    92              /*info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize 
    93                      <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;*/ 
     92             info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize 
     93                     <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ; 
    9494 
    9595             test_sendrecv(CXios::globalComm); 
Note: See TracChangeset for help on using the changeset viewer.