source: XIOS/dev/dev_trunk_omp/extern/src_ep_dev/ep_probe.cpp @ 1603

Last change on this file since 1603 was 1603, checked in by yushan, 2 years ago

branch_openmp merged with trunk r1597

File size: 5.6 KB
Line 
1#include "ep_lib.hpp"
2#include <mpi.h>
3#include "ep_declaration.hpp"
4#include "ep_mpi.hpp"
5
6namespace ep_lib
7{
8  int MPI_Iprobe_mpi(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
9  {
10    ::MPI_Status mpi_status;
11
12    ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
13
14    status->mpi_status = new ::MPI_Status(mpi_status);
15    status->ep_src = src;
16    status->ep_tag = tag;
17  }
18
19
20  int MPI_Improbe_mpi(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
21  {
22    ::MPI_Status mpi_status;
23    ::MPI_Message mpi_message;
24
25    #ifdef _openmpi
26    #pragma omp critical (_mpi_call)
27    {
28      ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
29      if(*flag)
30      {
31        ::MPI_Mprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), &mpi_message, &mpi_status);
32      }
33    }
34    #elif _intelmpi
35    ::MPI_Improbe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_message, &mpi_status);
36    #endif
37     
38    status->mpi_status = new ::MPI_Status(mpi_status);
39    status->ep_src = src;
40    status->ep_tag = tag;
41
42    (*message)->mpi_message = &message;
43    (*message)->ep_src = src;
44    (*message)->ep_tag = tag;
45  }
46
47
48  int MPI_Iprobe(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
49  {
50    if(!comm->is_ep)
51    {
52      Debug("MPI_Iprobe with MPI\n");
53      return MPI_Iprobe_mpi(src, tag, comm, flag, status);
54    }
55
56    if(comm->is_intercomm)
57    {
58      if(src>=0) src = comm->inter_rank_map->at(src);
59    } 
60   
61    return MPI_Iprobe_endpoint(src, tag, comm, flag, status);
62  }
63
64  int MPI_Iprobe_endpoint(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
65  {
66    Debug("MPI_Iprobe with EP\n");
67
68    *flag = false;
69   
70    Message_Check(comm);
71
72    #pragma omp flush
73
74    #pragma omp critical (_query)
75    for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
76    {
77      bool src_matched = src<0? true: (*it)->ep_src == src;
78      bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
79     
80      if(src_matched && tag_matched)       
81      {
82        Debug("find message\n");
83         
84        status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
85        status->ep_src = (*it)->ep_src;
86        status->ep_tag = (*it)->ep_tag;
87       
88        if(comm->is_intercomm)
89        {
90          for(INTER_RANK_MAP::iterator iter = comm->inter_rank_map->begin(); iter != comm->inter_rank_map->end(); iter++)
91          {
92            if(iter->second == (*it)->ep_src) status->ep_src=iter->first;
93          }
94        }
95
96        *flag = true;
97        break;
98      }
99    }
100  }
101
102 
103
104  int MPI_Improbe(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
105  {
106    if(!comm->is_ep)
107    {
108      Debug("MPI_Iprobe with MPI\n");
109      return MPI_Improbe_mpi(src, tag, comm, flag, message, status);
110    }
111
112    if(comm->is_intercomm)
113    {
114      src = comm->inter_rank_map->at(src);
115      *message = new ep_message;
116      printf("============= new *message = %p\n", *message);
117    } 
118   
119    return MPI_Improbe_endpoint(src, tag, comm, flag, message, status);
120  }
121
122
123
124  int MPI_Improbe_endpoint(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
125  {
126    int ep_rank_loc = comm->ep_comm_ptr->size_rank_info[1].first;
127    int mpi_rank    = comm->ep_comm_ptr->size_rank_info[2].first;
128
129    *flag = false;
130   
131    Message_Check(comm);
132   
133    #pragma omp flush
134
135    #pragma omp critical (_query)
136    if(! comm->ep_comm_ptr->message_queue->empty())
137    {
138      for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
139      {
140                                         
141        bool src_matched = src<0? true: (*it)->ep_src == src;
142        bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
143       
144        if(src_matched && tag_matched)
145        {
146          *flag = true;
147
148          status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
149          memcheck("new "<< status->mpi_status << " : in ep_lib::MPI_Improbe, status->mpi_status = new ::MPI_Status");
150          status->ep_src = (*it)->ep_src;
151          status->ep_tag = (*it)->ep_tag;
152
153          (*message)->mpi_message = new ::MPI_Message(*static_cast< ::MPI_Message*>((*it)->mpi_message));
154          memcheck("new "<< (*message)->mpi_message <<" : in ep_lib::MPI_Improbe, (*message)->mpi_message = new ::MPI_Message");
155          (*message)->ep_src = (*it)->ep_src;
156          (*message)->ep_tag = (*it)->ep_tag;
157                                     
158
159          #pragma omp critical (_query2)
160          {             
161            memcheck("delete "<< (*it)->mpi_message <<" : in ep_lib::Message_Check, delete (*it)->mpi_message");
162            memcheck("delete "<< (*it)->mpi_status <<" : in ep_lib::Message_Check, delete (*it)->mpi_status");
163            memcheck("delete "<< (*it) <<" : in ep_lib::Message_Check, delete (*it)");
164           
165           
166            delete (*it)->mpi_message;     
167            delete (*it)->mpi_status; 
168            delete *it;
169           
170                       
171            comm->ep_comm_ptr->message_queue->erase(it);
172            memcheck("message_queue["<<mpi_rank<<","<<ep_rank_loc<<"]->size = "<<comm->ep_comm_ptr->message_queue->size());
173            #pragma omp flush
174          }
175         
176          break;
177        }
178
179      }
180    }
181  }
182
183}
184
185
Note: See TracBrowser for help on using the repository browser.