source: XIOS/dev/branch_yushan/extern/src_ep_dev/ep_probe.cpp @ 1037

Last change on this file since 1037 was 1037, checked in by yushan, 7 years ago

initialize the branch

File size: 3.8 KB
Line 
1#include "ep_lib.hpp"
2#include <mpi.h>
3#include "ep_declaration.hpp"
4
5namespace ep_lib
6{
7
8
9
10  int MPI_Iprobe(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
11  {
12    *flag = false;
13
14    if(!comm.is_ep)
15    {
16      ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm >(comm.mpi_comm);
17      ::MPI_Status *mpi_status = static_cast< ::MPI_Status* >(status->mpi_status);
18      ::MPI_Iprobe(src, tag, mpi_comm, flag, mpi_status);
19
20      status->mpi_status = mpi_status;
21      status->ep_src = src;
22      status->ep_tag = tag;
23      return 0;
24    }
25
26    Debug("calling MPI_Iprobe EP\n");
27
28    Message_Check(comm);
29
30    #pragma omp flush
31
32   
33    //printf("iprobe, message queue size = %lu, queue = %p\n", comm.ep_comm_ptr->message_queue->size(), comm.ep_comm_ptr->message_queue);
34
35    #pragma omp flush
36    #pragma critical (_query)
37    if(comm.ep_comm_ptr->message_queue->size() > 0)
38    {
39      for(Message_list::iterator it = comm.ep_comm_ptr->message_queue->begin(); it!= comm.ep_comm_ptr->message_queue->end(); it++)
40      {
41        bool src_matched = src<0? true: it->ep_src == src;
42        bool tag_matched = tag<0? true: it->ep_tag == tag;
43       
44        if(src_matched && tag_matched)       
45        {
46          Debug("find message\n");
47          // printf("iprobe, find in local message queue %p, src = %d, tag = %d\n", comm.ep_comm_ptr->message_queue, it->ep_src, it->ep_tag);
48
49          *flag = true;
50
51          ::MPI_Status mpi_status = *(static_cast< ::MPI_Status *>(it->mpi_status));
52
53          status->mpi_status = new ::MPI_Status(mpi_status);
54          status->ep_src = it->ep_src;
55          status->ep_tag = it->ep_tag;
56
57          break;
58        }
59
60      }
61    }
62
63    return 0;
64  }
65
66
67
68  int MPI_Improbe(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
69  {
70    *flag = false;
71    if(!comm.is_ep)
72    {
73      Debug("calling MPI_Improbe MPI\n");
74
75        ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm>(comm.mpi_comm);
76        ::MPI_Status mpi_status;
77      ::MPI_Message mpi_message;
78
79        #ifdef _openmpi
80      #pragma omp critical (_mpi_call)
81      {
82        ::MPI_Iprobe(src, tag, mpi_comm, flag, &mpi_status);
83        if(*flag)
84        {
85          ::MPI_Mprobe(src, tag, mpi_comm, &mpi_message, &mpi_status);
86        }
87      }
88      #elif _intelmpi
89        ::MPI_Improbe(src, tag, mpi_comm, flag, &mpi_message, &mpi_status);
90      #endif
91       
92      status->mpi_status = new ::MPI_Status(mpi_status);
93      status->ep_src = src;
94      status->ep_tag = tag;
95
96      message->mpi_message = mpi_message;
97      message->ep_src = src;
98      message->ep_tag = tag;
99        return 0;
100    }
101
102   
103    Message_Check(comm);
104
105    #pragma omp flush
106    #pragma critical (_query)
107    if(comm.ep_comm_ptr->message_queue->size() > 0)
108    {
109      for(Message_list::iterator it = comm.ep_comm_ptr->message_queue->begin(); it!= comm.ep_comm_ptr->message_queue->end(); it++)
110      {
111        bool src_matched = src<0? true: it->ep_src == src;
112        bool tag_matched = tag<0? true: it->ep_tag == tag;
113       
114        if(src_matched && tag_matched)
115        {
116          *flag = true;
117
118          ::MPI_Status mpi_status;
119          mpi_status = *(static_cast< ::MPI_Status *>(it->mpi_status));
120
121          status->mpi_status = new ::MPI_Status(mpi_status);
122          status->ep_src = it->ep_src;
123          status->ep_tag = it->ep_tag;
124
125          message->mpi_message = it->mpi_message;
126          message->ep_tag = it->ep_tag;
127          message->ep_src = it->ep_src;
128
129          #pragma omp critical (_query)
130          {             
131            //printf("local message erased. src = %d, dest = %d, tag = %d\n", it->ep_src, it->ep_dest, it->ep_tag);     
132            delete it->mpi_status;
133            comm.ep_comm_ptr->message_queue->erase(it);
134            #pragma omp flush
135          }
136
137          break;
138        }
139
140      }
141    }
142  }
143
144}
145
Note: See TracBrowser for help on using the repository browser.