source: XIOS/dev/branch_openmp/extern/ep_dev/ep_probe.cpp @ 1527

Last change on this file since 1527 was 1527, checked in by yushan, 6 years ago

save dev

File size: 4.7 KB
RevLine 
[1381]1#include "ep_lib.hpp"
2#include <mpi.h>
3#include "ep_declaration.hpp"
4#include "ep_mpi.hpp"
5
6namespace ep_lib
7{
[1522]8  int MPI_Iprobe_mpi(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
9  {
[1527]10    ::MPI_Status mpi_status;
11
12    ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
13
14    status->mpi_status = new ::MPI_Status(mpi_status);
[1522]15    status->ep_src = src;
16    status->ep_tag = tag;
17  }
[1381]18
[1527]19
20  int MPI_Improbe_mpi(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
21  {
22    ::MPI_Status mpi_status;
23    ::MPI_Message mpi_message;
24
25    #ifdef _openmpi
26    #pragma omp critical (_mpi_call)
27    {
28      ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
29      if(*flag)
30      {
31        ::MPI_Mprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), &mpi_message, &mpi_status);
32      }
33    }
34    #elif _intelmpi
35    ::MPI_Improbe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_message, &mpi_status);
36    #endif
37     
38    status->mpi_status = new ::MPI_Status(mpi_status);
39    status->ep_src = src;
40    status->ep_tag = tag;
41
42    (*message)->mpi_message = &message;
43    (*message)->ep_src = src;
44    (*message)->ep_tag = tag;
45  }
46
47
48
49
[1381]50  int MPI_Iprobe(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
51  {
[1500]52    if(!comm->is_ep)
[1381]53    {
[1522]54      Debug("MPI_Iprobe with MPI\n");
55      return MPI_Iprobe_mpi(src, tag, comm, flag, status);
[1381]56    }
[1522]57   
58    else
59    {
60      Debug("MPI_Iprobe with EP\n");
[1527]61
[1522]62      *flag = false;
63   
64      Message_Check(comm);
[1381]65
[1522]66      #pragma omp flush
[1381]67
[1522]68      #pragma omp critical (_query)
[1500]69      for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
[1381]70      {
[1500]71        bool src_matched = src<0? true: (*it)->ep_src == src;
72        bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
[1381]73       
74        if(src_matched && tag_matched)       
75        {
76          Debug("find message\n");
[1522]77         
[1381]78
[1522]79          status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
[1500]80          status->ep_src = (*it)->ep_src;
81          status->ep_tag = (*it)->ep_tag;
[1381]82
[1522]83          *flag = true;
[1381]84          break;
85        }
86      }
87    }
88  }
89
[1527]90 
[1381]91
92
[1527]93
[1381]94  int MPI_Improbe(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
95  {
[1503]96    int ep_rank_loc = comm->ep_comm_ptr->size_rank_info[1].first;
97    int mpi_rank    = comm->ep_comm_ptr->size_rank_info[2].first;
[1381]98    *flag = false;
[1500]99    if(!comm->is_ep)
[1381]100    {
101      Debug("calling MPI_Improbe MPI\n");
[1527]102      return MPI_Improbe_mpi(src, tag, comm, flag, message, status);
103    } 
[1381]104
105    #pragma omp flush
106
107    #pragma omp critical (_query)
[1500]108    if(! comm->ep_comm_ptr->message_queue->empty())
[1381]109    {
[1500]110      for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
[1381]111      {
[1500]112                                         
113        bool src_matched = src<0? true: (*it)->ep_src == src;
114        bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
[1381]115       
116        if(src_matched && tag_matched)
117        {
118          *flag = true;
119
[1503]120          status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
121          memcheck("new "<< status->mpi_status << " : in ep_lib::MPI_Improbe, status->mpi_status = new ::MPI_Status");
[1500]122          status->ep_src = (*it)->ep_src;
123          status->ep_tag = (*it)->ep_tag;
[1381]124
[1503]125          (*message)->mpi_message = new ::MPI_Message(*static_cast< ::MPI_Message*>((*it)->mpi_message));
126          memcheck("new "<< (*message)->mpi_message <<" : in ep_lib::MPI_Improbe, (*message)->mpi_message = new ::MPI_Message");
[1500]127          (*message)->ep_src = (*it)->ep_src;
128          (*message)->ep_tag = (*it)->ep_tag;
129                                     
[1381]130
131          #pragma omp critical (_query2)
132          {             
[1503]133            memcheck("delete "<< (*it)->mpi_message <<" : in ep_lib::Message_Check, delete (*it)->mpi_message");
134            memcheck("delete "<< (*it)->mpi_status <<" : in ep_lib::Message_Check, delete (*it)->mpi_status");
135            memcheck("delete "<< (*it) <<" : in ep_lib::Message_Check, delete (*it)");
[1500]136           
[1503]137           
[1500]138            delete (*it)->mpi_message;     
139            delete (*it)->mpi_status; 
140            delete *it;
141           
[1503]142                       
[1500]143            comm->ep_comm_ptr->message_queue->erase(it);
[1503]144            memcheck("message_queue["<<mpi_rank<<","<<ep_rank_loc<<"]->size = "<<comm->ep_comm_ptr->message_queue->size());
[1381]145            #pragma omp flush
146          }
[1500]147         
[1381]148          break;
149        }
150
151      }
152    }
153  }
154
155}
156
Note: See TracBrowser for help on using the repository browser.