source: XIOS/dev/branch_openmp/extern/src_ep_dev/ep_message.cpp @ 1359

Last change on this file since 1359 was 1355, checked in by yushan, 7 years ago

unify MPI_Comm type

File size: 6.8 KB
RevLine 
[1134]1/*!
2   \file ep_message.cpp
3   \since 2 may 2016
4
5   \brief Definitions of MPI endpoint function: Message_Check
6 */
7
8#include "ep_lib.hpp"
9#include <mpi.h>
10#include "ep_declaration.hpp"
[1295]11#include "ep_mpi.hpp"
[1134]12
13using namespace std;
14
[1196]15extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
16#pragma omp threadprivate(EP_PendingRequests)
17
[1134]18namespace ep_lib
19{
20
21  int Message_Check(MPI_Comm comm)
22  {
23    if(!comm.is_ep) return 0;
24
25    if(comm.is_intercomm)
26    {
27      return  Message_Check_intercomm(comm);
28    }
29
30    int flag = true;
31    ::MPI_Message message;
32    ::MPI_Status status;
33    int mpi_source;
34
35    while(flag) // loop until the end of global queue
36    {
37      Debug("Message probing for intracomm\n");
[1295]38     
39
[1355]40      //#ifdef _openmpi
[1134]41      #pragma omp critical (_mpi_call)
42      {
[1295]43        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);
[1134]44        if(flag)
45        {
46          Debug("find message in mpi comm \n");
47          mpi_source = status.MPI_SOURCE;
48          int tag = status.MPI_TAG;
[1295]49          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);
[1134]50
51        }
52      }
[1355]53      //#elif _intelmpi
54      //#pragma omp critical (_mpi_call)
55      //{
56      //  ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);
57      //  if(flag)
58      //  {
59      //    Debug("find message in mpi comm \n");
60      //    mpi_source = status.MPI_SOURCE;
61      //    int tag = status.MPI_TAG;
62      //    ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);
63      //  }
64      //}
65      //#endif
[1176]66     
[1134]67      if(flag)
68      {
69
[1220]70        MPI_Message *msg_block = new MPI_Message; 
71        msg_block->mpi_message = message; 
72        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
73        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
74        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
75        int src_mpi       = status.MPI_SOURCE;
[1134]76             
[1220]77        msg_block->ep_src  = get_ep_rank(comm, src_loc,  src_mpi);       
[1134]78        msg_block->mpi_status = new ::MPI_Status(status);
79
80        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
81        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
82
83
84        #pragma omp critical (_query)
85        {
86          #pragma omp flush
[1295]87          comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);     
[1134]88          #pragma omp flush
89        }
90       
[1220]91        delete msg_block;       
[1134]92      }
93
94    }
95
96    return MPI_SUCCESS;
97  }
98
99
100
101  int Message_Check_intercomm(MPI_Comm comm)
102  {
103    if(!comm.ep_comm_ptr->intercomm->mpi_inter_comm) return 0;
104
105    Debug("Message probing for intercomm\n");
106
107    int flag = true;
108    ::MPI_Message message;
109    ::MPI_Status status;
110    int mpi_source;
111    int current_ep_rank;
112    MPI_Comm_rank(comm, &current_ep_rank);
113
114    while(flag) // loop until the end of global queue "comm.ep_comm_ptr->intercomm->mpi_inter_comm"
115    {
116      Debug("Message probing for intracomm\n");
[1295]117
[1355]118      //#ifdef _openmpi
[1134]119      #pragma omp critical (_mpi_call)
120      {
[1295]121        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status);
[1134]122        if(flag)
123        {
124          Debug("find message in mpi comm \n");
125          mpi_source = status.MPI_SOURCE;
126          int tag = status.MPI_TAG;
[1295]127          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status);
[1134]128
129        }
130      }
[1355]131     
[1347]132
[1134]133      if(flag)
134      {
135
136        MPI_Message *msg_block = new MPI_Message;
137
138        msg_block->mpi_message = message;
139        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();
140        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();
141        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
142        int src_mpi       = status.MPI_SOURCE;
143        int current_inter = comm.ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first;
144             
145        msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc,  src_mpi);
146        msg_block->mpi_status = new ::MPI_Status(status);
147
148
149        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
150        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
151
152
153        #pragma omp critical (_query)
154        {
155          #pragma omp flush
[1295]156          comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);
[1134]157          #pragma omp flush
158        }
159       
160        delete msg_block;
161       
162      }
163
164    }
165
166    flag = true;
167    while(flag) // loop until the end of global queue "comm.mpi_comm"
168    {
169      Debug("Message probing for intracomm\n");
[1295]170     
[1355]171      //#ifdef _openmpi
[1134]172      #pragma omp critical (_mpi_call)
173      {
[1295]174        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);
[1134]175        if(flag)
176        {
177          Debug("find message in mpi comm \n");
178          mpi_source = status.MPI_SOURCE;
179          int tag = status.MPI_TAG;
[1295]180          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);
[1134]181
182        }
183      }
[1355]184     
[1347]185
[1134]186      if(flag)
187      {
188
189        MPI_Message *msg_block = new MPI_Message;
190       
191        msg_block->mpi_message = message;
192        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();
193        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();
194        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
195        int src_mpi       = status.MPI_SOURCE;
196       
197        msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc, src_mpi);
198        msg_block->mpi_status = new ::MPI_Status(status);
199       
200
201        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
202        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
203
204
205        #pragma omp critical (_query)
206        {
207          #pragma omp flush
[1295]208          comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);
[1134]209          #pragma omp flush
210        }
211       
212        delete msg_block;
213       
214      }
215
216    }
217
218    return MPI_SUCCESS;
219  }
220
[1196]221  int Request_Check()
222  {
223    MPI_Status status;
224    MPI_Message message;
225    int probed = false;
226    int recv_count = 0;
227    std::list<MPI_Request* >::iterator it;
228   
229    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++)
230    { 
231      Message_Check((*it)->comm);
232    }
233
234
235    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); )
236    {
237      MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status);
238      if(probed)
239      {
240        MPI_Get_count(&status, (*it)->ep_datatype, &recv_count);
241        MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it);
242        (*it)->type = 3;
243        EP_PendingRequests->erase(it);
244        it = EP_PendingRequests->begin();
245        continue;
246      }
247      it++;
248    }
249  }
250
[1134]251}
Note: See TracBrowser for help on using the repository browser.