Ignore:
Timestamp:
06/04/18 19:25:08 (3 years ago)
Author:
yushan
Message:

save dev. TO DO : test with xios

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/branch_openmp/extern/src_ep_dev/ep_message.cpp

    r1374 r1520  
    1818namespace ep_lib 
    1919{ 
    20  
     20  int Request_Check() 
     21  { 
     22    if(EP_PendingRequests == 0 ) EP_PendingRequests = new std::list< MPI_Request* >; 
     23     
     24    if(EP_PendingRequests->size() == 0) return 0; 
     25     
     26    MPI_Status status; 
     27    MPI_Message *message; 
     28    int probed = false; 
     29    int recv_count = 0; 
     30    std::list<MPI_Request* >::iterator it; 
     31     
     32     
     33    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++) 
     34    {  
     35      Message_Check(((*(*it))->comm)); 
     36    } 
     37 
     38 
     39    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); ) 
     40    { 
     41      if((*(*it))->state == 0) 
     42      { 
     43        #pragma omp critical (_query0) 
     44        { 
     45          MPI_Iprobe((*(*it))->ep_src, (*(*it))->ep_tag, ((*(*it))->comm), &probed, &status); 
     46          if(probed) 
     47          { 
     48            message = new MPI_Message; 
     49            *message = new ep_message; 
     50         
     51            memcheck("new "<< message <<" : in ep_lib::Request_Check, message = new MPI_Message"); 
     52            memcheck("new "<< *message <<" : in ep_lib::Request_Check, *message = new ep_message"); 
     53           
     54           
     55            MPI_Improbe((*(*it))->ep_src, (*(*it))->ep_tag, (*(*it))->comm, &probed, message, &status); 
     56         
     57          } 
     58        } 
     59       
     60         
     61        if(probed) 
     62        { 
     63          MPI_Get_count(&status, (*(*it))->ep_datatype, &recv_count); 
     64           
     65          MPI_Imrecv((*(*it))->buf, recv_count, (*(*it))->ep_datatype, message, *it); 
     66          (*(*it))->type = 3; 
     67          (*(*it))->state = 1; 
     68 
     69          memcheck("delete "<< status.mpi_status <<" : in ep_lib::Request_Check, delete status.mpi_status"); 
     70          delete status.mpi_status;           
     71 
     72          memcheck("delete "<< *message <<" : in ep_lib::Request_Check, delete *message"); 
     73          memcheck("delete "<< message <<" : in ep_lib::Request_Check, delete message"); 
     74 
     75          delete *message; 
     76          delete message; 
     77         
     78          it++; 
     79          continue;       
     80        }               
     81      } 
     82       
     83      if((*(*it))->state == 2) 
     84      { 
     85        int ep_rank = ((*(*it))->comm)->ep_comm_ptr->size_rank_info[0].first; 
     86        memcheck("delete "<< (*(*it)) <<" : in ep_lib::Request_Check, delete (*(*it))"); 
     87         
     88         
     89        int world_rank; 
     90        MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); 
     91        if(world_rank==2)  
     92        { 
     93          printf("ep %d erased one pending request %p\n", world_rank,*(*it)); 
     94        } 
     95         
     96        EP_PendingRequests->erase(it); 
     97         
     98        memcheck("EP_PendingRequests["<<ep_rank<<"]->size() = " << EP_PendingRequests->size()); 
     99        it = EP_PendingRequests->begin(); 
     100        continue; 
     101      } 
     102      else it++; 
     103    } 
     104  } 
     105   
     106   
     107   
    21108  int Message_Check(MPI_Comm comm) 
    22109  { 
    23     if(!comm.is_ep) return 0; 
    24  
    25     if(comm.is_intercomm) 
    26     { 
    27       return  Message_Check_intercomm(comm); 
    28     } 
     110    if(!comm->is_ep) return MPI_SUCCESS; 
     111 
     112    if(comm->is_intercomm) 
     113    { 
     114      Message_Check_intercomm(comm); 
     115    } 
     116     
     117    return Message_Check_intracomm(comm); 
     118 
     119  } 
     120   
     121   
     122  int Message_Check_intracomm(MPI_Comm comm) 
     123  { 
     124     
     125    int flag = true; 
     126    ::MPI_Status status; 
     127    ::MPI_Message message; 
     128 
     129    while(flag) // loop until the end of global queue 
     130    { 
     131      Debug("Message probing for intracomm\n"); 
     132      
     133      #pragma omp critical (_mpi_call) 
     134      { 
     135        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->mpi_comm), &flag, &status); 
     136        if(flag) 
     137        { 
     138          Debug("find message in mpi comm \n"); 
     139          ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->mpi_comm), &message, &status); 
     140        } 
     141      } 
     142 
     143       
     144      if(flag) 
     145      { 
     146        MPI_Message msg = new ep_message;  
     147        msg->mpi_message = new ::MPI_Message(message); 
     148 
     149        memcheck("new "<< msg <<" : in ep_lib::Message_Check, msg = new ep_message"); 
     150        memcheck("new "<< msg->mpi_message <<" : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message"); 
     151               
     152 
     153        msg->ep_tag  = bitset<15>(status.MPI_TAG >> 16).to_ulong();  
     154        int src_loc  = bitset<8> (status.MPI_TAG >> 8) .to_ulong();  
     155        int dest_loc = bitset<8> (status.MPI_TAG)           .to_ulong(); 
     156        int src_mpi  = status.MPI_SOURCE; 
     157              
     158        msg->ep_src  = get_ep_rank(comm, src_loc,  src_mpi);   
     159 
     160#ifdef _showinfo 
     161        printf("status.MPI_TAG = %d, src_loc = %d, dest_loc = %d, ep_tag = %d\n", status.MPI_TAG, src_loc, dest_loc, msg->ep_tag); 
     162#endif 
     163 
     164        msg->mpi_status = new ::MPI_Status(status);   
     165        memcheck("new "<< msg->mpi_status <<" : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status"); 
     166 
     167        #pragma omp critical (_query) 
     168        { 
     169          #pragma omp flush 
     170          comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg);   
     171          int dest_mpi = comm->ep_comm_ptr->size_rank_info[2].first; 
     172          memcheck("message_queue["<<dest_mpi<<","<<dest_loc<<"]->size = "<<comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->size()); 
     173          #pragma omp flush 
     174        } 
     175      } 
     176    } 
     177 
     178    return MPI_SUCCESS; 
     179  } 
     180 
     181 
     182   
     183   
     184   
     185 
     186 
     187  int Message_Check_intercomm(MPI_Comm comm) 
     188  { 
     189    if(!comm->ep_comm_ptr->intercomm->mpi_inter_comm) return 0; 
     190 
     191    Debug("Message probing for intercomm\n"); 
    29192 
    30193    int flag = true; 
    31194    ::MPI_Message message; 
    32195    ::MPI_Status status; 
    33     int mpi_source; 
    34  
    35     while(flag) // loop until the end of global queue 
    36     { 
    37       Debug("Message probing for intracomm\n"); 
    38        
    39       #pragma omp critical (_mpi_call) 
    40       { 
    41         ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status); 
    42         if(flag) 
    43         { 
    44           Debug("find message in mpi comm \n"); 
    45           mpi_source = status.MPI_SOURCE; 
    46           int tag = status.MPI_TAG; 
    47           ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status); 
    48  
    49         } 
    50       } 
    51  
    52        
    53       if(flag) 
    54       { 
    55  
    56         MPI_Message *msg_block = new MPI_Message;  
    57         msg_block->mpi_message = new ::MPI_Message; 
    58         *(static_cast< ::MPI_Message*>(msg_block->mpi_message)) = message;   
    59         msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();  
    60         int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();  
    61         int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong(); 
    62         int src_mpi       = status.MPI_SOURCE; 
    63               
    64         msg_block->ep_src  = get_ep_rank(comm, src_loc,  src_mpi);        
    65         msg_block->mpi_status = new ::MPI_Status(status); 
    66  
    67         MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list; 
    68         MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc]; 
    69  
    70  
    71         #pragma omp critical (_query) 
    72         { 
    73           #pragma omp flush 
    74           comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);       
    75           #pragma omp flush 
    76         } 
    77          
    78         delete msg_block; 
    79       } 
    80  
    81     } 
    82  
    83     return MPI_SUCCESS; 
    84   } 
    85  
    86  
    87  
    88   int Message_Check_intercomm(MPI_Comm comm) 
    89   { 
    90     if(!comm.ep_comm_ptr->intercomm->mpi_inter_comm) return 0; 
    91  
    92     Debug("Message probing for intercomm\n"); 
    93  
    94     int flag = true; 
    95     ::MPI_Message message; 
    96     ::MPI_Status status; 
    97     int mpi_source; 
    98196    int current_ep_rank; 
    99197    MPI_Comm_rank(comm, &current_ep_rank); 
    100198 
    101     while(flag) // loop until the end of global queue "comm.ep_comm_ptr->intercomm->mpi_inter_comm" 
     199    while(flag) // loop until the end of global queue "comm->ep_comm_ptr->intercomm->mpi_inter_comm" 
    102200    { 
    103201      Debug("Message probing for intracomm\n"); 
     
    105203      #pragma omp critical (_mpi_call) 
    106204      { 
    107         ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status); 
     205        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status); 
    108206        if(flag) 
    109207        { 
    110208          Debug("find message in mpi comm \n"); 
    111           mpi_source = status.MPI_SOURCE; 
    112           int tag = status.MPI_TAG; 
    113           ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status); 
    114  
     209          ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status); 
    115210        } 
    116211      } 
     
    120215      { 
    121216 
    122         MPI_Message *msg_block = new MPI_Message; 
    123         msg_block->mpi_message = new ::MPI_Message; 
    124         *(static_cast< ::MPI_Message*>(msg_block->mpi_message)) = message; 
    125         msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
    126         int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
    127         int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong(); 
    128         int src_mpi       = status.MPI_SOURCE; 
    129         int current_inter = comm.ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first; 
     217        MPI_Message msg = new ep_message;  
     218        msg->mpi_message = new ::MPI_Message(message); 
     219 
     220        memcheck("new "<< msg <<" : in ep_lib::Message_Check, msg = new ep_message"); 
     221        memcheck("new "<< msg->mpi_message <<" : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message"); 
     222               
     223 
     224        msg->ep_tag  = bitset<15>(status.MPI_TAG >> 16).to_ulong();  
     225        int src_loc  = bitset<8> (status.MPI_TAG >> 8) .to_ulong();  
     226        int dest_loc = bitset<8> (status.MPI_TAG)           .to_ulong(); 
     227        int src_mpi  = status.MPI_SOURCE; 
    130228              
    131         msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc,  src_mpi); 
    132         msg_block->mpi_status = new ::MPI_Status(status); 
    133  
    134  
    135         MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list; 
    136         MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc]; 
    137  
     229        msg->ep_src  = get_ep_rank(comm, src_loc,  src_mpi);    
     230#ifdef _showinfo 
     231        printf("status.MPI_TAG = %d, src_loc = %d, dest_loc = %d, ep_tag = %d\n", status.MPI_TAG, src_loc, dest_loc, msg->ep_tag); 
     232#endif 
     233 
     234        msg->mpi_status = new ::MPI_Status(status);  
     235        memcheck("new "<< msg->mpi_status <<" : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status"); 
    138236 
    139237        #pragma omp critical (_query) 
    140238        { 
    141239          #pragma omp flush 
    142           comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block); 
    143           #pragma omp flush 
    144         } 
    145          
    146         delete msg_block; 
    147          
    148       } 
    149  
    150     } 
    151  
    152     flag = true; 
    153     while(flag) // loop until the end of global queue "comm.mpi_comm" 
    154     { 
    155       Debug("Message probing for intracomm\n"); 
    156       
    157       #pragma omp critical (_mpi_call) 
    158       { 
    159         ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status); 
    160         if(flag) 
    161         { 
    162           Debug("find message in mpi comm \n"); 
    163           mpi_source = status.MPI_SOURCE; 
    164           int tag = status.MPI_TAG; 
    165           ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status); 
    166  
    167         } 
    168       } 
    169        
    170  
    171       if(flag) 
    172       { 
    173  
    174         MPI_Message *msg_block = new MPI_Message; 
    175         msg_block->mpi_message = new ::MPI_Message; 
    176         *(static_cast< ::MPI_Message*>(msg_block->mpi_message)) = message; 
    177         msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
    178         int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
    179         int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong(); 
    180         int src_mpi       = status.MPI_SOURCE; 
    181          
    182         msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc, src_mpi); 
    183         msg_block->mpi_status = new ::MPI_Status(status); 
    184          
    185  
    186         MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list; 
    187         MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc]; 
    188  
    189  
    190         #pragma omp critical (_query) 
    191         { 
    192           #pragma omp flush 
    193           comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block); 
    194           #pragma omp flush 
    195         } 
    196          
    197         delete msg_block; 
    198          
    199       } 
    200  
    201     } 
     240          comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg); 
     241          memcheck("comm->ep_comm_ptr->comm_list["<<dest_loc<<"]->ep_comm_ptr->message_queue->size = "<<comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->size()); 
     242          #pragma omp flush 
     243        } 
     244      } 
     245    } 
     246 
     247    Message_Check_intracomm(comm); 
    202248 
    203249    return MPI_SUCCESS; 
    204250  } 
    205251 
    206   int Request_Check() 
    207   { 
    208     MPI_Status status; 
    209     MPI_Message message; 
    210     int probed = false; 
    211     int recv_count = 0; 
    212     std::list<MPI_Request* >::iterator it; 
    213      
    214     for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++) 
    215     {  
    216       Message_Check((*it)->comm); 
    217     } 
    218  
    219  
    220     for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); ) 
    221     { 
    222       MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status); 
    223       if(probed) 
    224       { 
    225         MPI_Get_count(&status, (*it)->ep_datatype, &recv_count); 
    226         MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it); 
    227         (*it)->type = 3; 
    228         EP_PendingRequests->erase(it); 
    229         it = EP_PendingRequests->begin(); 
    230         continue; 
    231       } 
    232       it++; 
    233     } 
    234   } 
     252   
    235253 
    236254} 
Note: See TracChangeset for help on using the changeset viewer.