source: XIOS/dev/branch_openmp/extern/ep_dev/ep_probe.cpp @ 1503

Last change on this file since 1503 was 1503, checked in by yushan, 6 years ago

rank_map is passed from vector to map, in order to have more flexibility in comm_split

File size: 4.5 KB
RevLine 
[1381]1#include "ep_lib.hpp"
2#include <mpi.h>
3#include "ep_declaration.hpp"
4#include "ep_mpi.hpp"
5
6namespace ep_lib
7{
8
9  int MPI_Iprobe(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
10  {
11    *flag = false;
12
[1500]13    if(!comm->is_ep)
[1381]14    {
[1503]15      Debug("calling MPI_Iprobe MPI\n");
16      ::MPI_Status mpi_status;
17      ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
[1381]18
[1503]19      status->mpi_status = &mpi_status;
[1381]20      status->ep_src = src;
21      status->ep_tag = tag;
22      return 0;
23    }
24
25    Debug("calling MPI_Iprobe EP\n");
26    Message_Check(comm);
27
28    #pragma omp flush
29
30    #pragma omp critical (_query)
[1500]31    if(!comm->ep_comm_ptr->message_queue->empty())
[1381]32    {
[1500]33      for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
[1381]34      {
[1500]35        bool src_matched = src<0? true: (*it)->ep_src == src;
36        bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
[1381]37       
38        if(src_matched && tag_matched)       
39        {
40          Debug("find message\n");
41          *flag = true;
42
[1500]43          ::MPI_Status mpi_status = *(static_cast< ::MPI_Status *>((*it)->mpi_status));
[1381]44
[1503]45          status->mpi_status = (*it)->mpi_status;
[1500]46          status->ep_src = (*it)->ep_src;
47          status->ep_tag = (*it)->ep_tag;
[1381]48
49          break;
50        }
51      }
52    }
53
54    return 0;
55  }
56
57
58
59  int MPI_Improbe(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
60  {
[1503]61    int ep_rank_loc = comm->ep_comm_ptr->size_rank_info[1].first;
62    int mpi_rank    = comm->ep_comm_ptr->size_rank_info[2].first;
[1381]63    *flag = false;
[1500]64    if(!comm->is_ep)
[1381]65    {
66      Debug("calling MPI_Improbe MPI\n");
67
68      ::MPI_Status mpi_status;
69      ::MPI_Message mpi_message;
70
71      #ifdef _openmpi
72      #pragma omp critical (_mpi_call)
73      {
[1500]74        ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
[1381]75        if(*flag)
76        {
[1500]77          ::MPI_Mprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), &mpi_message, &mpi_status);
[1381]78        }
79      }
80      #elif _intelmpi
[1500]81        ::MPI_Improbe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_message, &mpi_status);
[1381]82      #endif
83       
[1500]84      status->mpi_status = &mpi_status;
[1381]85      status->ep_src = src;
86      status->ep_tag = tag;
87
[1500]88      (*message)->mpi_message = &message;
89      (*message)->ep_src = src;
90      (*message)->ep_tag = tag;
91     
92     
[1381]93      return 0;
94    }
95
[1500]96   
[1381]97
98    #pragma omp flush
99
100    #pragma omp critical (_query)
[1500]101    if(! comm->ep_comm_ptr->message_queue->empty())
[1381]102    {
[1500]103      for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
[1381]104      {
[1500]105                                         
106        bool src_matched = src<0? true: (*it)->ep_src == src;
107        bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
[1381]108       
109        if(src_matched && tag_matched)
110        {
111          *flag = true;
112
[1503]113          status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
114          memcheck("new "<< status->mpi_status << " : in ep_lib::MPI_Improbe, status->mpi_status = new ::MPI_Status");
[1500]115          status->ep_src = (*it)->ep_src;
116          status->ep_tag = (*it)->ep_tag;
[1381]117
[1503]118          (*message)->mpi_message = new ::MPI_Message(*static_cast< ::MPI_Message*>((*it)->mpi_message));
119          memcheck("new "<< (*message)->mpi_message <<" : in ep_lib::MPI_Improbe, (*message)->mpi_message = new ::MPI_Message");
[1500]120          (*message)->ep_src = (*it)->ep_src;
121          (*message)->ep_tag = (*it)->ep_tag;
122                                     
[1381]123
124          #pragma omp critical (_query2)
125          {             
[1503]126            memcheck("delete "<< (*it)->mpi_message <<" : in ep_lib::Message_Check, delete (*it)->mpi_message");
127            memcheck("delete "<< (*it)->mpi_status <<" : in ep_lib::Message_Check, delete (*it)->mpi_status");
128            memcheck("delete "<< (*it) <<" : in ep_lib::Message_Check, delete (*it)");
[1500]129           
[1503]130           
[1500]131            delete (*it)->mpi_message;     
132            delete (*it)->mpi_status; 
133            delete *it;
134           
[1503]135                       
[1500]136            comm->ep_comm_ptr->message_queue->erase(it);
[1503]137            memcheck("message_queue["<<mpi_rank<<","<<ep_rank_loc<<"]->size = "<<comm->ep_comm_ptr->message_queue->size());
[1381]138            #pragma omp flush
139          }
[1500]140         
[1381]141          break;
142        }
143
144      }
145    }
146  }
147
148}
149
Note: See TracBrowser for help on using the repository browser.