#include "ep_lib.hpp" #include #include "ep_declaration.hpp" #include "ep_mpi.hpp" using namespace std; namespace ep_lib { int MPI_Intercomm_create_kernel(MPI_Comm local_comm, int local_leader, MPI_Comm peer_comm, int remote_leader, int tag, MPI_Comm *newintercomm) { int ep_rank, ep_rank_loc, mpi_rank; int ep_size, num_ep, mpi_size; ep_rank = local_comm->ep_comm_ptr->size_rank_info[0].first; ep_rank_loc = local_comm->ep_comm_ptr->size_rank_info[1].first; mpi_rank = local_comm->ep_comm_ptr->size_rank_info[2].first; ep_size = local_comm->ep_comm_ptr->size_rank_info[0].second; num_ep = local_comm->ep_comm_ptr->size_rank_info[1].second; mpi_size = local_comm->ep_comm_ptr->size_rank_info[2].second; // step 1 : local leaders exchange ep_size then bcast to all ep in local_comm int remote_ep_size; bool is_local_leader = ep_rank==local_leader? true: false; if(is_local_leader) { MPI_Request request[2]; MPI_Status status[2]; MPI_Isend(&ep_size, 1, MPI_INT, remote_leader, tag, peer_comm, &request[0]); MPI_Irecv(&remote_ep_size, 1, MPI_INT, remote_leader, tag, peer_comm, &request[1]); MPI_Waitall(2, request, status); } MPI_Bcast(&remote_ep_size, 1, MPI_INT, local_leader, local_comm); #ifdef _showinfo MPI_Barrier(peer_comm); MPI_Barrier(peer_comm); #pragma omp critical(stdoutput) printf("peer_rank = %d, ep_size = %d, remote_ep_size = %d\n", peer_comm->ep_comm_ptr->size_rank_info[0].first, ep_size, remote_ep_size); MPI_Barrier(peer_comm); MPI_Barrier(peer_comm); #endif // step 2 : gather ranks in world for both local and remote comm int rank_in_world; ::MPI_Comm_rank(to_mpi_comm(MPI_COMM_WORLD->mpi_comm), &rank_in_world); int *ranks_in_world_local = new int[ep_size]; int *ranks_in_world_remote = new int[remote_ep_size]; MPI_Allgather(&rank_in_world, 1, MPI_INT, ranks_in_world_local, 1, MPI_INT, local_comm); if(is_local_leader) { MPI_Request request[2]; MPI_Status status[2]; MPI_Isend(ranks_in_world_local, ep_size, MPI_INT, remote_leader, tag, peer_comm, &request[0]); MPI_Irecv(ranks_in_world_remote, remote_ep_size, MPI_INT, remote_leader, tag, peer_comm, &request[1]); MPI_Waitall(2, request, status); } MPI_Bcast(ranks_in_world_remote, remote_ep_size, MPI_INT, local_leader, local_comm); #ifdef _showinfo MPI_Barrier(peer_comm); MPI_Barrier(peer_comm); if(remote_leader == 4) { for(int i=0; iep_comm_ptr->size_rank_info[0].first); for(int i=0; iep_comm_ptr->size_rank_info[0].first); for(int i=0; iep_comm_ptr->size_rank_info[0].first); for(int i=0; iep_comm_ptr->size_rank_info[0].first); for(int i=0; i remote_leader_rank_in_peer? 
true : false; #ifdef _showinfo MPI_Barrier(peer_comm); MPI_Barrier(peer_comm); printf("peer_rank = %d, priority = %d, local_leader_rank_in_peer = %d, remote_leader_rank_in_peer = %d\n", peer_comm->ep_comm_ptr->size_rank_info[0].first, priority, local_leader_rank_in_peer, remote_leader_rank_in_peer); MPI_Barrier(peer_comm); MPI_Barrier(peer_comm); #endif int local_leader_rank_in_world = ranks_in_world_local[local_leader]; int remote_leader_rank_in_world; if(is_local_leader) { MPI_Request request[2]; MPI_Status status[2]; MPI_Isend(&local_leader_rank_in_world, 1, MPI_INT, remote_leader, tag, peer_comm, &request[0]); MPI_Irecv(&remote_leader_rank_in_world, 1, MPI_INT, remote_leader, tag, peer_comm, &request[1]); MPI_Waitall(2, request, status); } MPI_Bcast(&remote_leader_rank_in_world, 1, MPI_INT, local_leader, local_comm); int ownership; if(rank_in_world == ranks_in_world_local[local_leader]) ownership = 1; else if(rank_in_world == remote_leader_rank_in_world) ownership = 0; else { ownership = 1; for(int i=0; iep_comm_ptr->size_rank_info[0].first, priority, ownership); MPI_Barrier(peer_comm); MPI_Barrier(peer_comm); #endif // step 4 : extract local_comm and create intercomm bool is_involved = is_local_leader || (!is_local_leader && ep_rank_loc == 0 && rank_in_world != local_leader_rank_in_world); #ifdef _showinfo MPI_Barrier(peer_comm); MPI_Barrier(peer_comm); printf("peer_rank = %d, is_involved = %d\n", peer_comm->ep_comm_ptr->size_rank_info[0].first, is_involved); MPI_Barrier(peer_comm); MPI_Barrier(peer_comm); #endif if(is_involved) { ::MPI_Group local_group; ::MPI_Group extracted_group; ::MPI_Comm *extracted_comm = new ::MPI_Comm; ::MPI_Comm_group(to_mpi_comm(local_comm->mpi_comm), &local_group); int *ownership_list = new int[mpi_size]; int *mpi_rank_list = new int[mpi_size]; ::MPI_Allgather(&ownership, 1, to_mpi_type(MPI_INT), ownership_list, 1, to_mpi_type(MPI_INT), to_mpi_comm(local_comm->mpi_comm)); ::MPI_Allgather(&mpi_rank, 1, to_mpi_type(MPI_INT), mpi_rank_list, 1, to_mpi_type(MPI_INT), to_mpi_comm(local_comm->mpi_comm)); int n=0; for(int i=0; impi_comm), extracted_group, extracted_comm); ::MPI_Comm *mpi_inter_comm = new ::MPI_Comm; int local_leader_rank_in_extracted_comm; if(is_local_leader) { ::MPI_Comm_rank(*extracted_comm, &local_leader_rank_in_extracted_comm); } ::MPI_Bcast(&local_leader_rank_in_extracted_comm, 1, to_mpi_type(MPI_INT), local_comm->ep_rank_map->at(local_leader).second, to_mpi_comm(local_comm->mpi_comm)); int local_leader_rank_in_peer_mpi; int remote_leader_rank_in_peer_mpi; ::MPI_Comm_rank(to_mpi_comm(peer_comm->mpi_comm), &local_leader_rank_in_peer_mpi); if(is_local_leader) { MPI_Request request[2]; MPI_Status status[2]; MPI_Isend(&local_leader_rank_in_peer_mpi, 1, MPI_INT, remote_leader, tag, peer_comm, &request[0]); MPI_Irecv(&remote_leader_rank_in_peer_mpi, 1, MPI_INT, remote_leader, tag, peer_comm, &request[1]); MPI_Waitall(2, request, status); } ::MPI_Bcast(&remote_leader_rank_in_peer_mpi, 1, to_mpi_type(MPI_INT), local_comm->ep_rank_map->at(local_leader).second, to_mpi_comm(local_comm->mpi_comm)); if(ownership) ::MPI_Intercomm_create(*extracted_comm, local_leader_rank_in_extracted_comm, to_mpi_comm(peer_comm->mpi_comm), remote_leader_rank_in_peer_mpi, tag, mpi_inter_comm); // step 5 :: determine new num_ep int num_ep_count=0; for(int i=0; iep_comm_ptr->size_rank_info[0].first, num_ep_count); // step 6 : create endpoints from extracted_comm if(ownership) { MPI_Comm *ep_comm; MPI_Info info; MPI_Comm_create_endpoints(extracted_comm, num_ep_count, info, 
      // step 6 : create endpoints from extracted_comm

      if(ownership)
      {
        MPI_Comm *ep_comm;
        MPI_Info info;
        MPI_Comm_create_endpoints(extracted_comm, num_ep_count, info, ep_comm);

        for(int i=0; i<num_ep_count; i++)
        {
          ep_comm[i]->is_intercomm = true;
          ep_comm[i]->ep_comm_ptr->comm_label = ranks_in_world_local[local_leader];
          ep_comm[i]->ep_comm_ptr->intercomm = new ep_lib::ep_intercomm;
          ep_comm[i]->ep_comm_ptr->intercomm->mpi_inter_comm = mpi_inter_comm;
        }

        //delete ep_comm[0]->ep_rank_map;

        #pragma omp critical (write_to_tag_list)
        intercomm_list.push_back(make_pair(make_pair(tag, min(local_leader_rank_in_world, remote_leader_rank_in_world)), make_pair(ep_comm, make_pair(num_ep_count, 0))));
        #pragma omp flush

        #ifdef _showinfo
        for(int i=0; i<num_ep_count; i++)
          printf("peer_rank = %d, ep_comm = %p, ep_comm[%d] -> new_ep_rank = %d\n", peer_comm->ep_comm_ptr->size_rank_info[0].first, ep_comm, i, ep_comm[i]->ep_comm_ptr->size_rank_info[0].first);
        #endif
      }

      delete[] ownership_list;
      delete[] mpi_rank_list;
      delete[] new_mpi_rank_list;
    }

    // each ep finds its slot in the endpoint array created by the owner of its MPI process
    // (the repeated/my_turn computation below is an assumed reconstruction)
    int repeated = 0;
    for(int i=0; i<remote_ep_size; i++)
    {
      if(ranks_in_world_remote[i] == rank_in_world) repeated++;
    }

    int my_turn = ownership ? ep_rank_loc : ep_rank_loc + repeated;

    #ifdef _showinfo
    MPI_Barrier(peer_comm);
    MPI_Barrier(peer_comm);
    printf("peer_rank = %d, ep_rank_loc = %d, ownership = %d, repeated = %d, my_turn = %d\n", peer_comm->ep_comm_ptr->size_rank_info[0].first, ep_rank_loc, ownership, repeated, my_turn);
    MPI_Barrier(peer_comm);
    MPI_Barrier(peer_comm);
    #endif

    #pragma omp flush
    #pragma omp critical (read_from_intercomm_list)
    {
      bool flag = true;
      while(flag)
      {
        for(std::list<std::pair<std::pair<int, int>, std::pair<MPI_Comm*, std::pair<int, int> > > >::iterator iter = intercomm_list.begin(); iter != intercomm_list.end(); iter++)
        {
          if(iter->first == make_pair(tag, min(local_leader_rank_in_world, remote_leader_rank_in_world)))
          {
            *newintercomm = iter->second.first[my_turn];
            iter->second.second.second++;
            if(iter->second.second.first == iter->second.second.second) intercomm_list.erase(iter);
            flag = false;
            break;
          }
        }
      }
    }

    #ifdef _showinfo
    MPI_Barrier(peer_comm);
    MPI_Barrier(peer_comm);
    printf("peer_rank = %d, test_rank = %d\n", peer_comm->ep_comm_ptr->size_rank_info[0].first, (*newintercomm)->ep_comm_ptr->size_rank_info[0].first);
    MPI_Barrier(peer_comm);
    MPI_Barrier(peer_comm);
    #endif

    // step 7 : create intercomm_rank_map for local leaders

    int my_quadruple[4];
    my_quadruple[0] = ep_rank;
    my_quadruple[1] = (*newintercomm)->ep_comm_ptr->size_rank_info[1].first;
    my_quadruple[2] = (*newintercomm)->ep_comm_ptr->size_rank_info[2].first;
    my_quadruple[3] = (*newintercomm)->ep_comm_ptr->comm_label;

    #ifdef _showinfo
    MPI_Barrier(peer_comm);
    MPI_Barrier(peer_comm);
    printf("peer_rank = %d, my_quadruple = %d %d %d %d\n", peer_comm->ep_comm_ptr->size_rank_info[0].first, my_quadruple[0], my_quadruple[1], my_quadruple[2], my_quadruple[3]);
    MPI_Barrier(peer_comm);
    MPI_Barrier(peer_comm);
    #endif

    int *local_quadruple_list;
    int *remote_quadruple_list;

    if(is_involved)
    {
      local_quadruple_list  = new int[4*ep_size];
      remote_quadruple_list = new int[4*remote_ep_size];
    }

    MPI_Gather(my_quadruple, 4, MPI_INT, local_quadruple_list, 4, MPI_INT, local_leader, local_comm);

    if(is_local_leader)
    {
      MPI_Request request[2];
      MPI_Status status[2];
      MPI_Isend(local_quadruple_list, 4*ep_size, MPI_INT, remote_leader, tag, peer_comm, &request[0]);
      MPI_Irecv(remote_quadruple_list, 4*remote_ep_size, MPI_INT, remote_leader, tag, peer_comm, &request[1]);
      MPI_Waitall(2, request, status);
    }

    if(is_involved)
    {
      ::MPI_Bcast(remote_quadruple_list, 4*remote_ep_size, to_mpi_type(MPI_INT), local_comm->ep_rank_map->at(local_leader).second, to_mpi_comm(local_comm->mpi_comm));

      (*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map = new INTERCOMM_RANK_MAP;
      for(int i=0; i<remote_ep_size; i++)
      {
        (*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map->insert(std::make_pair(remote_quadruple_list[4*i], std::make_pair(remote_quadruple_list[4*i+1], std::make_pair(remote_quadruple_list[4*i+2], remote_quadruple_list[4*i+3]))));
      }
    }

    // step 8 : associate intercomm_rank_map to endpoints

    int *leader_rank_in_world_local_gathered = new int[(*newintercomm)->ep_comm_ptr->size_rank_info[1].second];
    MPI_Allgather_local(&local_leader_rank_in_world, 1, MPI_INT, leader_rank_in_world_local_gathered, *newintercomm);

    int new_rank_loc = (*newintercomm)->ep_comm_ptr->size_rank_info[1].first;
    int *new_rank_loc_local_gathered = new int[(*newintercomm)->ep_comm_ptr->size_rank_info[1].second];
    MPI_Allgather_local(&new_rank_loc, 1, MPI_INT, new_rank_loc_local_gathered, *newintercomm);

    //printf("peer_rank = %d, leader_rank_in_world_local_gathered = %d %d %d %d, new_rank_loc_local_gathered = %d %d %d %d\n",
    //       peer_comm->ep_comm_ptr->size_rank_info[0].first, leader_rank_in_world_local_gathered[0], leader_rank_in_world_local_gathered[1], leader_rank_in_world_local_gathered[2], leader_rank_in_world_local_gathered[3],
    //       new_rank_loc_local_gathered[0], new_rank_loc_local_gathered[1], new_rank_loc_local_gathered[2], new_rank_loc_local_gathered[3]);

    if(is_involved)
    {
      if((*newintercomm)->ep_comm_ptr->size_rank_info[1].first == 0)
      {
        (*newintercomm)->ep_rank_map->clear();
        delete (*newintercomm)->ep_rank_map;
      }
      (*newintercomm)->ep_rank_map = new EP_RANK_MAP[ep_size];
      *((*newintercomm)->ep_rank_map) = *(local_comm->ep_rank_map);
    }

    MPI_Barrier_local(*newintercomm);

    if(!is_involved)
    {
      int target;
      for(int i=0; i<(*newintercomm)->ep_comm_ptr->size_rank_info[1].second; i++)
      {
        if(local_leader_rank_in_world == leader_rank_in_world_local_gathered[i])
        {
          target = i;
          break;
        }
      }
      (*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map = (*newintercomm)->ep_comm_ptr->comm_list[target]->ep_comm_ptr->intercomm->intercomm_rank_map;
      (*newintercomm)->ep_rank_map = (*newintercomm)->ep_comm_ptr->comm_list[target]->ep_rank_map;
    }

    //printf("peer_rank = %d, intercomm_rank_map add = %p\n", peer_comm->ep_comm_ptr->size_rank_info[0].first, (*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map);

    if(peer_comm->ep_comm_ptr->size_rank_info[0].first == 5)
    {
      int receiver = rand() % 10;
      printf("receiver = %d, intercomm_local_rank = %d, intercomm_mpi_rank = %d, comm_label = %d\n", receiver,
             (*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map->at(receiver).first,
             (*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map->at(receiver).second.first,
             (*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map->at(receiver).second.second);
    }

    if(peer_comm->ep_comm_ptr->size_rank_info[0].first == 9)
    {
      int receiver = rand() % 6;
      printf("receiver = %d, intercomm_local_rank = %d, intercomm_mpi_rank = %d, comm_label = %d\n", receiver,
             (*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map->at(receiver).first,
             (*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map->at(receiver).second.first,
             (*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map->at(receiver).second.second);
    }

    //printf("peer_rank = %d, rank_map add = %p\n", peer_comm->ep_comm_ptr->size_rank_info[0].first, (*newintercomm)->ep_rank_map);

    if(peer_comm->ep_comm_ptr->size_rank_info[0].first == 5)
    {
      for(int i=0; i<ep_size; i++)  // loop bound reconstructed (assumed)
      {
        printf("rank_map->at(%d) = %d, %d\n", i, (*newintercomm)->ep_rank_map->at(i).first, (*newintercomm)->ep_rank_map->at(i).second);
      }
    }

    // clean up

    delete[] ranks_in_world_local;
    delete[] ranks_in_world_remote;

    if(is_involved)
    {
      delete[] local_quadruple_list;
      delete[] remote_quadruple_list;
    }

    delete[] leader_rank_in_world_local_gathered;
    delete[] new_rank_loc_local_gathered;

    return MPI_SUCCESS;
  }
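
  // Illustrative sketch (not part of ep_lib): step 7 above flattens one
  // (ep_rank, ep_rank_loc, mpi_rank, comm_label) quadruple per endpoint into a flat int
  // buffer, exchanges the buffers between the two sides, and rebuilds a rank map from the
  // remote buffer.  The helper below shows that rebuild in isolation.  The typedef, the
  // function names and the EP_LIB_DOC_SKETCH guard are illustrative assumptions; the real
  // container is INTERCOMM_RANK_MAP, and only facilities already used in this file
  // (std::map, std::vector, std::pair, printf) are relied on.
  #ifdef EP_LIB_DOC_SKETCH
  typedef std::map<int, std::pair<int, std::pair<int, int> > > sketch_rank_map;

  // Rebuild ep_rank -> (ep_rank_loc, (mpi_rank, comm_label)) from 4-int quadruples.
  static sketch_rank_map sketch_build_rank_map(const std::vector<int>& quadruples)
  {
    sketch_rank_map m;
    for(int i = 0; i + 3 < (int)quadruples.size(); i += 4)
    {
      m.insert(std::make_pair(quadruples[i],
               std::make_pair(quadruples[i+1],
               std::make_pair(quadruples[i+2], quadruples[i+3]))));
    }
    return m;
  }

  static void sketch_rank_map_usage()
  {
    // two remote endpoints: ep_rank 0 -> (loc 0, mpi 0, label 7), ep_rank 1 -> (loc 1, mpi 0, label 7)
    int raw[8] = {0, 0, 0, 7, 1, 1, 0, 7};
    std::vector<int> buf(raw, raw + 8);
    sketch_rank_map m = sketch_build_rank_map(buf);
    printf("ep 1 -> loc %d, mpi %d, label %d\n", m.at(1).first, m.at(1).second.first, m.at(1).second.second);
  }
  #endif // EP_LIB_DOC_SKETCH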
  int MPI_Intercomm_create_kernel_bkp(MPI_Comm local_comm, int local_leader, MPI_Comm peer_comm, int remote_leader, int tag, MPI_Comm *newintercomm)
  {
    int ep_rank, ep_rank_loc, mpi_rank;
    int ep_size, num_ep, mpi_size;

    ep_rank     = local_comm->ep_comm_ptr->size_rank_info[0].first;
    ep_rank_loc = local_comm->ep_comm_ptr->size_rank_info[1].first;
    mpi_rank    = local_comm->ep_comm_ptr->size_rank_info[2].first;
    ep_size     = local_comm->ep_comm_ptr->size_rank_info[0].second;
    num_ep      = local_comm->ep_comm_ptr->size_rank_info[1].second;
    mpi_size    = local_comm->ep_comm_ptr->size_rank_info[2].second;

    std::vector<int> rank_info[4];  //! 0->rank_in_world of local_comm, 1->rank_in_local_parent of local_comm
                                    //! 2->rank_in_world of remote_comm, 3->rank_in_local_parent of remote_comm

    int rank_in_world;
    int rank_in_local_parent;
    int rank_in_peer_mpi[2];

    int local_ep_size = ep_size;
    int remote_ep_size;

    ::MPI_Comm local_mpi_comm = to_mpi_comm(local_comm->mpi_comm);

    ::MPI_Comm_rank(to_mpi_comm(MPI_COMM_WORLD->mpi_comm), &rank_in_world);
    ::MPI_Comm_rank(local_mpi_comm, &rank_in_local_parent);

    bool is_proc_master = false;
    bool is_local_leader = false;
    bool is_final_master = false;

    if(ep_rank == local_leader) { is_proc_master = true; is_local_leader = true; is_final_master = true; }
    if(ep_rank_loc == 0 && mpi_rank != local_comm->ep_rank_map->at(local_leader).second) is_proc_master = true;

    int size_info[4];    //! 0-> mpi_size of local_comm, 1-> mpi_size of remote_comm (used to size rank_info)
    int leader_info[4];  //! 0-> world rank of local_leader, 1-> world rank of remote leader

    std::vector<int> ep_info[2];  //! 0-> num_ep in local_comm, 1-> num_ep in remote_comm

    std::vector<int> new_rank_info[4];
    std::vector<int> new_ep_info[2];

    std::vector<int> offset;

    if(is_proc_master)
    {
      size_info[0] = mpi_size;

      rank_info[0].resize(size_info[0]);
      rank_info[1].resize(size_info[0]);
      ep_info[0].resize(size_info[0]);

      vector<int> send_buf(6);
      vector<int> recv_buf(3*size_info[0]);

      send_buf[0] = rank_in_world;
      send_buf[1] = rank_in_local_parent;
      send_buf[2] = num_ep;

      ::MPI_Allgather(send_buf.data(), 3, to_mpi_type(MPI_INT), recv_buf.data(), 3, to_mpi_type(MPI_INT), local_mpi_comm);

      // unpack the gathered (world rank, parent rank, num_ep) triples (loop reconstructed)
      for(int i=0; i<size_info[0]; i++)
      {
        rank_info[0][i] = recv_buf[3*i];
        rank_info[1][i] = recv_buf[3*i+1];
        ep_info[0][i]   = recv_buf[3*i+2];
      }

      if(is_local_leader)
      {
        // assumed reconstruction: leader_info holds the two leaders' world ranks
        // (remote_leader is taken here as the remote leader's world rank)
        leader_info[0] = rank_in_world;
        leader_info[1] = remote_leader;

        ::MPI_Comm_rank(to_mpi_comm(peer_comm->mpi_comm), &rank_in_peer_mpi[0]);

        send_buf[0] = size_info[0];
        send_buf[1] = local_ep_size;
        send_buf[2] = rank_in_peer_mpi[0];

        MPI_Request requests[2];
        MPI_Status statuses[2];

        MPI_Isend(send_buf.data(), 3, MPI_INT, remote_leader, tag, peer_comm, &requests[0]);
        MPI_Irecv(recv_buf.data(), 3, MPI_INT, remote_leader, tag, peer_comm, &requests[1]);
        MPI_Waitall(2, requests, statuses);

        size_info[1] = recv_buf[0];
        remote_ep_size = recv_buf[1];
        rank_in_peer_mpi[1] = recv_buf[2];
      }

      send_buf[0] = size_info[1];
      send_buf[1] = leader_info[0];
      send_buf[2] = leader_info[1];
      send_buf[3] = rank_in_peer_mpi[0];
      send_buf[4] = rank_in_peer_mpi[1];

      ::MPI_Bcast(send_buf.data(), 5, to_mpi_type(MPI_INT), local_comm->ep_rank_map->at(local_leader).second, local_mpi_comm);

      size_info[1] = send_buf[0];
      leader_info[0] = send_buf[1];
      leader_info[1] = send_buf[2];
      rank_in_peer_mpi[0] = send_buf[3];
      rank_in_peer_mpi[1] = send_buf[4];

      rank_info[2].resize(size_info[1]);
      rank_info[3].resize(size_info[1]);
      ep_info[1].resize(size_info[1]);

      send_buf.resize(3*size_info[0]);
      recv_buf.resize(3*size_info[1]);

      if(is_local_leader)
      {
        MPI_Request requests[2];
        MPI_Status statuses[2];

        std::copy(rank_info[0].data(), rank_info[0].data() + size_info[0], send_buf.begin());
        std::copy(rank_info[1].data(), rank_info[1].data() + size_info[0], send_buf.begin() + size_info[0]);
        std::copy(ep_info[0].data(),   ep_info[0].data()   + size_info[0], send_buf.begin() + 2*size_info[0]);

        MPI_Isend(send_buf.data(), 3*size_info[0], MPI_INT, remote_leader, tag+1, peer_comm, &requests[0]);
        MPI_Irecv(recv_buf.data(), 3*size_info[1], MPI_INT, remote_leader, tag+1, peer_comm, &requests[1]);
        MPI_Waitall(2, requests, statuses);
      }

      ::MPI_Bcast(recv_buf.data(), 3*size_info[1], to_mpi_type(MPI_INT), local_comm->ep_rank_map->at(local_leader).second, local_mpi_comm);

      std::copy(recv_buf.data(),                  recv_buf.data() + size_info[1],   rank_info[2].begin());
      std::copy(recv_buf.data() + size_info[1],   recv_buf.data() + 2*size_info[1], rank_info[3].begin());
      std::copy(recv_buf.data() + 2*size_info[1], recv_buf.data() + 3*size_info[1], ep_info[1].begin());

      offset.resize(size_info[0]);

      if(leader_info[0] < leader_info[1])
      {
        //! (body not recoverable from this copy of the source: this branch fills offset,
        //!  new_rank_info[0..1] and new_ep_info[0], which are consumed below)
      }

      //! size_info[2] -> size of new_ep_info for local, size_info[3] -> size of new_ep_info for remote

      if(is_local_leader)
      {
        size_info[2] = new_ep_info[0].size();
        MPI_Request requests[2];
        MPI_Status statuses[2];
        MPI_Isend(&size_info[2], 1, MPI_INT, remote_leader, tag+2, peer_comm, &requests[0]);
        MPI_Irecv(&size_info[3], 1, MPI_INT, remote_leader, tag+2, peer_comm, &requests[1]);
        MPI_Waitall(2, requests, statuses);
      }

      ::MPI_Bcast(&size_info[2], 2, to_mpi_type(MPI_INT), local_comm->ep_rank_map->at(local_leader).second, local_mpi_comm);

      new_rank_info[2].resize(size_info[3]);
      new_rank_info[3].resize(size_info[3]);
      new_ep_info[1].resize(size_info[3]);

      send_buf.resize(3*size_info[2]);
      recv_buf.resize(3*size_info[3]);

      if(is_local_leader)
      {
        MPI_Request requests[2];
        MPI_Status statuses[2];

        std::copy(new_rank_info[0].data(), new_rank_info[0].data() + size_info[2], send_buf.begin());
        std::copy(new_rank_info[1].data(), new_rank_info[1].data() + size_info[2], send_buf.begin() + size_info[2]);
        std::copy(new_ep_info[0].data(),   new_ep_info[0].data()   + size_info[2], send_buf.begin() + 2*size_info[2]);

        MPI_Isend(send_buf.data(), 3*size_info[2], MPI_INT, remote_leader, tag+3, peer_comm, &requests[0]);
        MPI_Irecv(recv_buf.data(), 3*size_info[3], MPI_INT, remote_leader, tag+3, peer_comm, &requests[1]);
        MPI_Waitall(2, requests, statuses);
      }

      ::MPI_Bcast(recv_buf.data(), 3*size_info[3], to_mpi_type(MPI_INT), local_comm->ep_rank_map->at(local_leader).second, local_mpi_comm);

      std::copy(recv_buf.data(),                  recv_buf.data() + size_info[3],   new_rank_info[2].begin());
      std::copy(recv_buf.data() + size_info[3],   recv_buf.data() + 2*size_info[3], new_rank_info[3].begin());
      std::copy(recv_buf.data() + 2*size_info[3], recv_buf.data() + 3*size_info[3], new_ep_info[1].begin());
    }
    if(is_proc_master)
    {
      //! leader_info[2] -> rank of local leader in the newly created comm
      //! leader_info[3] -> rank of remote leader in the newly created comm

      ::MPI_Group local_group;
      ::MPI_Group new_group;
      ::MPI_Comm *new_comm = new ::MPI_Comm;
      ::MPI_Comm *intercomm = new ::MPI_Comm;

      ::MPI_Comm_group(local_mpi_comm, &local_group);
      ::MPI_Group_incl(local_group, size_info[2], new_rank_info[1].data(), &new_group);
      ::MPI_Comm_create(local_mpi_comm, new_group, new_comm);

      if(is_local_leader)
      {
        ::MPI_Comm_rank(*new_comm, &leader_info[2]);
      }

      ::MPI_Bcast(&leader_info[2], 1, to_mpi_type(MPI_INT), local_comm->ep_rank_map->at(local_leader).second, local_mpi_comm);

      if(new_comm != static_cast< ::MPI_Comm* >(MPI_COMM_NULL->mpi_comm))
      {
        ::MPI_Barrier(*new_comm);

        ::MPI_Intercomm_create(*new_comm, leader_info[2], to_mpi_comm(peer_comm->mpi_comm), rank_in_peer_mpi[1], tag, intercomm);

        int id;
        ::MPI_Comm_rank(*new_comm, &id);
        int my_num_ep = new_ep_info[0][id];

        MPI_Comm *ep_intercomm;
        MPI_Info info;
        MPI_Comm_create_endpoints(new_comm, my_num_ep, info, ep_intercomm);

        for(int i=0; i<my_num_ep; i++)
        {
          ep_intercomm[i]->is_intercomm = true;
          ep_intercomm[i]->ep_comm_ptr->intercomm = new ep_lib::ep_intercomm;
          ep_intercomm[i]->ep_comm_ptr->intercomm->mpi_inter_comm = intercomm;
          ep_intercomm[i]->ep_comm_ptr->comm_label = leader_info[0];
        }

        #pragma omp critical (write_to_tag_list)
        tag_list.push_back(make_pair(make_pair(tag, min(leader_info[0], leader_info[1])), ep_intercomm));
        //printf("tag_list size = %lu\n", tag_list.size());
      }
    }

    vector<int> bcast_buf(8);
    if(is_local_leader)
    {
      std::copy(size_info, size_info+4, bcast_buf.begin());
      std::copy(leader_info, leader_info+4, bcast_buf.begin()+4);
    }

    MPI_Bcast(bcast_buf.data(), 8, MPI_INT, local_leader, local_comm);

    if(!is_local_leader)
    {
      std::copy(bcast_buf.begin(), bcast_buf.begin()+4, size_info);
      std::copy(bcast_buf.begin()+4, bcast_buf.begin()+8, leader_info);
    }

    if(!is_local_leader)
    {
      new_rank_info[1].resize(size_info[2]);
      ep_info[1].resize(size_info[1]);
      offset.resize(size_info[0]);
    }

    bcast_buf.resize(size_info[2] + size_info[1] + size_info[0] + 1);

    if(is_local_leader)
    {
      bcast_buf[0] = remote_ep_size;
      std::copy(new_rank_info[1].data(), new_rank_info[1].data() + size_info[2], bcast_buf.begin() + 1);
      std::copy(ep_info[1].data(), ep_info[1].data() + size_info[1], bcast_buf.begin() + size_info[2] + 1);
      std::copy(offset.data(), offset.data() + size_info[0], bcast_buf.begin() + size_info[2] + size_info[1] + 1);
    }

    MPI_Bcast(bcast_buf.data(), size_info[2] + size_info[1] + size_info[0] + 1, MPI_INT, local_leader, local_comm);

    if(!is_local_leader)
    {
      remote_ep_size = bcast_buf[0];
      std::copy(bcast_buf.data() + 1, bcast_buf.data() + 1 + size_info[2], new_rank_info[1].begin());
      std::copy(bcast_buf.data() + 1 + size_info[2], bcast_buf.data() + 1 + size_info[2] + size_info[1], ep_info[1].begin());
      std::copy(bcast_buf.data() + 1 + size_info[2] + size_info[1], bcast_buf.data() + 1 + size_info[2] + size_info[1] + size_info[0], offset.begin());
    }

    int my_position = offset[rank_in_local_parent] + ep_rank_loc;

    MPI_Barrier_local(local_comm);
    #pragma omp flush
    #pragma omp critical (read_from_tag_list)
    {
      bool found = false;
      while(!found)
      {
        for(std::list<std::pair<std::pair<int, int>, MPI_Comm*> >::iterator iter = tag_list.begin(); iter != tag_list.end(); iter++)
        {
          if((*iter).first == make_pair(tag, min(leader_info[0], leader_info[1])))
          {
            *newintercomm = iter->second[my_position];
            found = true;
            break;
          }
        }
      }
    }

    MPI_Barrier(local_comm);

    if(is_local_leader)
    {
      int local_flag = true;
      int remote_flag = false;
      MPI_Request mpi_requests[2];
      MPI_Status mpi_statuses[2];
      MPI_Isend(&local_flag, 1, MPI_INT, remote_leader, tag, peer_comm, &mpi_requests[0]);
      MPI_Irecv(&remote_flag, 1, MPI_INT, remote_leader, tag, peer_comm, &mpi_requests[1]);
      MPI_Waitall(2, mpi_requests, mpi_statuses);
    }

    MPI_Barrier(local_comm);

    if(is_proc_master)
    {
      for(std::list<std::pair<std::pair<int, int>, MPI_Comm*> >::iterator iter = tag_list.begin(); iter != tag_list.end(); iter++)
      {
        if((*iter).first == make_pair(tag, min(leader_info[0], leader_info[1])))
        {
          tag_list.erase(iter);
          break;
        }
      }
    }

    int intercomm_ep_rank, intercomm_ep_rank_loc, intercomm_mpi_rank;
    int intercomm_ep_size, intercomm_num_ep, intercomm_mpi_size;

    intercomm_ep_rank     = (*newintercomm)->ep_comm_ptr->size_rank_info[0].first;
    intercomm_ep_rank_loc = (*newintercomm)->ep_comm_ptr->size_rank_info[1].first;
    intercomm_mpi_rank    = (*newintercomm)->ep_comm_ptr->size_rank_info[2].first;
    intercomm_ep_size     = (*newintercomm)->ep_comm_ptr->size_rank_info[0].second;
    intercomm_num_ep      = (*newintercomm)->ep_comm_ptr->size_rank_info[1].second;
    intercomm_mpi_size    = (*newintercomm)->ep_comm_ptr->size_rank_info[2].second;

    MPI_Bcast(&remote_ep_size, 1, MPI_INT, local_leader, local_comm);

    int my_rank_map_elem[2];
    my_rank_map_elem[0] = intercomm_ep_rank;
    my_rank_map_elem[1] = (*newintercomm)->ep_comm_ptr->comm_label;

    vector<pair<int, int> > local_rank_map_array;
    vector<pair<int, int> > remote_rank_map_array;

    (*newintercomm)->ep_comm_ptr->intercomm->local_rank_map = new RANK_MAP;
    (*newintercomm)->ep_comm_ptr->intercomm->local_rank_map->resize(local_ep_size);

    MPI_Allgather(my_rank_map_elem, 2, MPI_INT, (*newintercomm)->ep_comm_ptr->intercomm->local_rank_map->data(), 2, MPI_INT, local_comm);

    (*newintercomm)->ep_comm_ptr->intercomm->remote_rank_map = new RANK_MAP;
    (*newintercomm)->ep_comm_ptr->intercomm->remote_rank_map->resize(remote_ep_size);

    (*newintercomm)->ep_comm_ptr->intercomm->size_rank_info[0] = local_comm->ep_comm_ptr->size_rank_info[0];
    (*newintercomm)->ep_comm_ptr->intercomm->size_rank_info[1] = local_comm->ep_comm_ptr->size_rank_info[1];
    (*newintercomm)->ep_comm_ptr->intercomm->size_rank_info[2] = local_comm->ep_comm_ptr->size_rank_info[2];

    int local_intercomm_size = intercomm_ep_size;
    int remote_intercomm_size;

    int new_bcast_root_0 = 0;
    int new_bcast_root = 0;

    if(is_local_leader)
    {
      MPI_Request requests[4];
      MPI_Status statuses[4];

      MPI_Isend((*newintercomm)->ep_comm_ptr->intercomm->local_rank_map->data(), 2*local_ep_size, MPI_INT, remote_leader, tag+4, peer_comm, &requests[0]);
      MPI_Irecv((*newintercomm)->ep_comm_ptr->intercomm->remote_rank_map->data(), 2*remote_ep_size, MPI_INT, remote_leader, tag+4, peer_comm, &requests[1]);

      MPI_Isend(&local_intercomm_size, 1, MPI_INT, remote_leader, tag+5, peer_comm, &requests[2]);
      MPI_Irecv(&remote_intercomm_size, 1, MPI_INT, remote_leader, tag+5, peer_comm, &requests[3]);

      MPI_Waitall(4, requests, statuses);

      new_bcast_root_0 = intercomm_ep_rank;
    }

    MPI_Allreduce(&new_bcast_root_0, &new_bcast_root, 1, MPI_INT, MPI_SUM, *newintercomm);

    MPI_Bcast((*newintercomm)->ep_comm_ptr->intercomm->remote_rank_map->data(), 2*remote_ep_size, MPI_INT, local_leader, local_comm);
    MPI_Bcast(&remote_intercomm_size, 1, MPI_INT, new_bcast_root, *newintercomm);

    //(*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map = new RANK_MAP;
    //(*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map->resize(remote_intercomm_size);

    if(is_local_leader)
    {
      MPI_Request requests[2];
      MPI_Status statuses[2];

      std::vector<std::pair<int, std::pair<int, int> > > map2vec((*newintercomm)->ep_rank_map->size());
      std::vector<std::pair<int, std::pair<int, int> > > vec2map((*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map->size());

      int ii = 0;
      for(std::map<int, std::pair<int, int> >::iterator it = (*newintercomm)->ep_rank_map->begin(); it != (*newintercomm)->ep_rank_map->end(); it++)
      {
        map2vec[ii++] = make_pair(it->first, make_pair(it->second.first, it->second.second));
      }

      MPI_Isend(map2vec.data(), 3*local_intercomm_size, MPI_INT, remote_leader, tag+6, peer_comm, &requests[0]);
      MPI_Irecv(vec2map.data(), 3*remote_intercomm_size, MPI_INT, remote_leader, tag+6, peer_comm, &requests[1]);
      MPI_Waitall(2, requests, statuses);

      // vec2map can only be read once the receive has completed, so the Waitall comes first
      for(ii=0; ii<remote_intercomm_size; ii++)
      {
        (*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map->at(vec2map[ii].first) = make_pair(vec2map[ii].second.first, vec2map[ii].second.second);
      }
    }

    //MPI_Bcast((*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map->data(), 2*remote_intercomm_size, MPI_INT, new_bcast_root, *newintercomm);

    //(*newintercomm)->ep_comm_ptr->intercomm->local_comm = (local_comm->ep_comm_ptr->comm_list[ep_rank_loc]);

    (*newintercomm)->ep_comm_ptr->intercomm->intercomm_tag = tag;

    /*
    for(int i=0; i<local_ep_size; i++)
      if(local_comm->ep_comm_ptr->comm_label == 0)
        printf("ep_rank (from EP) = %d, local_rank_map[%d] = (%d,%d)\n", intercomm_ep_rank, i,
               (*newintercomm)->ep_comm_ptr->intercomm->local_rank_map->at(i).first,
               (*newintercomm)->ep_comm_ptr->intercomm->local_rank_map->at(i).second);

    for(int i=0; i<remote_ep_size; i++)
      if(local_comm->ep_comm_ptr->comm_label == 0)
        printf("ep_rank (from EP) = %d, remote_rank_map[%d] = (%d,%d)\n", intercomm_ep_rank, i,
               (*newintercomm)->ep_comm_ptr->intercomm->remote_rank_map->at(i).first,
               (*newintercomm)->ep_comm_ptr->intercomm->remote_rank_map->at(i).second);

    for(int i=0; i<remote_intercomm_size; i++)
      if(local_comm->ep_comm_ptr->comm_label == 0)
        printf("ep_rank (from EP) = %d, intercomm_rank_map[%d] = (%d,%d)\n", intercomm_ep_rank, i,
               (*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map->at(i).first,
               (*newintercomm)->ep_comm_ptr->intercomm->intercomm_rank_map->at(i).second);
    */

    // for(int i=0; i<(*newintercomm)->rank_map->size(); i++)
    //   if(local_comm->ep_comm_ptr->comm_label != 99)
    //     printf("ep_rank = %d, rank_map[%d] = (%d,%d)\n", intercomm_ep_rank, i,
    //            (*newintercomm)->rank_map->at(i).first, (*newintercomm)->rank_map->at(i).second);

    // MPI_Comm *test_comm = newintercomm->ep_comm_ptr->intercomm->local_comm;
    // int test_rank;
    // MPI_Comm_rank(*test_comm, &test_rank);
    // printf("=================test_rank = %d\n", test_rank);

    return MPI_SUCCESS;
  }
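
  // Illustrative sketch (not part of ep_lib): both kernels above hand the endpoint handles
  // created by one thread of an MPI process to the other threads of that process through a
  // shared list keyed by (tag, leader), protected by omp critical sections; readers spin
  // until the key shows up and then pick the slot matching their own position.  The two
  // helpers below show the pattern with plain ints instead of MPI_Comm handles.  The names
  // sketch_tag_list / sketch_publish / sketch_lookup and the EP_LIB_DOC_SKETCH guard are
  // illustrative assumptions; only facilities already used in this file (std::list,
  // std::pair, omp critical) are relied on.
  #ifdef EP_LIB_DOC_SKETCH
  typedef std::pair<std::pair<int, int>, int*> sketch_tag_entry;
  static std::list<sketch_tag_entry> sketch_tag_list;

  // called by the creating thread: publish the per-position values under (tag, leader)
  static void sketch_publish(int tag, int leader, int* slots)
  {
    #pragma omp critical (sketch_tag_list_access)
    sketch_tag_list.push_back(std::make_pair(std::make_pair(tag, leader), slots));
  }

  // called by every thread: wait until (tag, leader) is published, then read its own slot
  static int sketch_lookup(int tag, int leader, int my_position)
  {
    int value = -1;
    bool found = false;
    while(!found)
    {
      #pragma omp critical (sketch_tag_list_access)
      {
        for(std::list<sketch_tag_entry>::iterator it = sketch_tag_list.begin(); it != sketch_tag_list.end(); ++it)
        {
          if(it->first == std::make_pair(tag, leader))
          {
            value = it->second[my_position];
            found = true;
            break;
          }
        }
      }
    }
    return value;
  }
  #endif // EP_LIB_DOC_SKETCH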
  int MPI_Intercomm_create_unique_leader(MPI_Comm local_comm, int local_leader, MPI_Comm peer_comm, int remote_leader, int tag, MPI_Comm *newintercomm)
  {
    //! mpi_size of local comm = 1
    //! same world rank of leaders

    int ep_rank, ep_rank_loc, mpi_rank;
    int ep_size, num_ep, mpi_size;

    ep_rank     = local_comm->ep_comm_ptr->size_rank_info[0].first;
    ep_rank_loc = local_comm->ep_comm_ptr->size_rank_info[1].first;
    mpi_rank    = local_comm->ep_comm_ptr->size_rank_info[2].first;
    ep_size     = local_comm->ep_comm_ptr->size_rank_info[0].second;
    num_ep      = local_comm->ep_comm_ptr->size_rank_info[1].second;
    mpi_size    = local_comm->ep_comm_ptr->size_rank_info[2].second;

    std::vector<int> rank_info[4];  //! 0->rank_in_world of local_comm, 1->rank_in_local_parent of local_comm
                                    //! 2->rank_in_world of remote_comm, 3->rank_in_local_parent of remote_comm

    int rank_in_world;
    int rank_in_peer_mpi[2];

    ::MPI_Comm_rank(to_mpi_comm(MPI_COMM_WORLD->mpi_comm), &rank_in_world);

    int local_num_ep = num_ep;
    int remote_num_ep;
    int total_num_ep;

    int leader_rank_in_peer[2];

    int my_position;
    int tag_label[2];

    vector<int> send_buf(4);
    vector<int> recv_buf(4);

    if(ep_rank == local_leader)
    {
      MPI_Status status;

      MPI_Comm_rank(peer_comm, &leader_rank_in_peer[0]);

      send_buf[0] = local_num_ep;
      send_buf[1] = leader_rank_in_peer[0];

      MPI_Request req_s, req_r;

      MPI_Isend(send_buf.data(), 2, MPI_INT, remote_leader, tag, peer_comm, &req_s);
      MPI_Irecv(recv_buf.data(), 2, MPI_INT, remote_leader, tag, peer_comm, &req_r);

      MPI_Wait(&req_s, &status);
      MPI_Wait(&req_r, &status);

      recv_buf[2] = leader_rank_in_peer[0];
    }

    MPI_Bcast(recv_buf.data(), 3, MPI_INT, local_leader, local_comm);

    remote_num_ep = recv_buf[0];
    leader_rank_in_peer[1] = recv_buf[1];
    leader_rank_in_peer[0] = recv_buf[2];

    total_num_ep = local_num_ep + remote_num_ep;

    if(leader_rank_in_peer[0] < leader_rank_in_peer[1])
    {
      my_position = ep_rank_loc;

      //! LEADER creates the EPs
      if(ep_rank == local_leader)
      {
        ::MPI_Comm *mpi_dup = new ::MPI_Comm;
        ::MPI_Comm_dup(to_mpi_comm(local_comm->mpi_comm), mpi_dup);

        MPI_Comm *ep_intercomm;
        MPI_Info info;
        MPI_Comm_create_endpoints(mpi_dup, total_num_ep, info, ep_intercomm);

        for(int i=0; i<total_num_ep; i++)
        {
          ep_intercomm[i]->is_intercomm = true;
          ep_intercomm[i]->ep_comm_ptr->intercomm = new ep_lib::ep_intercomm;
          ep_intercomm[i]->ep_comm_ptr->intercomm->mpi_inter_comm = 0;
          ep_intercomm[i]->ep_comm_ptr->comm_label = leader_rank_in_peer[0];
        }

        tag_label[0] = TAG++;
        tag_label[1] = rank_in_world;

        #pragma omp critical (write_to_tag_list)
        tag_list.push_back(make_pair(make_pair(tag_label[0], tag_label[1]), ep_intercomm));

        MPI_Request req_s;
        MPI_Status sta_s;
        MPI_Isend(tag_label, 2, MPI_INT, remote_leader, tag, peer_comm, &req_s);
        MPI_Wait(&req_s, &sta_s);
      }
    }
    else
    {
      //! wait for EP creation by the other side
      my_position = remote_num_ep + ep_rank_loc;

      if(ep_rank == local_leader)
      {
        MPI_Status status;
        MPI_Request req_r;
        MPI_Irecv(tag_label, 2, MPI_INT, remote_leader, tag, peer_comm, &req_r);
        MPI_Wait(&req_r, &status);
      }
    }

    MPI_Bcast(tag_label, 2, MPI_INT, local_leader, local_comm);

    #pragma omp critical (read_from_tag_list)
    {
      bool found = false;
      while(!found)
      {
        for(std::list<std::pair<std::pair<int, int>, MPI_Comm*> >::iterator iter = tag_list.begin(); iter != tag_list.end(); iter++)
        {
          if((*iter).first == make_pair(tag_label[0], tag_label[1]))
          {
            *newintercomm = iter->second[my_position];
            found = true;
            //tag_list.erase(iter);
            break;
          }
        }
      }
    }

    MPI_Barrier_local(local_comm);

    if(leader_rank_in_peer[0] < leader_rank_in_peer[1])
    {
      for(std::list<std::pair<std::pair<int, int>, MPI_Comm*> >::iterator iter = tag_list.begin(); iter != tag_list.end(); iter++)
      {
        if((*iter).first == make_pair(tag_label[0], tag_label[1]))
        {
          tag_list.erase(iter);
          break;
        }
      }
    }

    int intercomm_ep_rank, intercomm_ep_rank_loc, intercomm_mpi_rank;
    int intercomm_ep_size, intercomm_num_ep, intercomm_mpi_size;

    intercomm_ep_rank     = (*newintercomm)->ep_comm_ptr->size_rank_info[0].first;
    intercomm_ep_rank_loc = (*newintercomm)->ep_comm_ptr->size_rank_info[1].first;
    intercomm_mpi_rank    = (*newintercomm)->ep_comm_ptr->size_rank_info[2].first;
    intercomm_ep_size     = (*newintercomm)->ep_comm_ptr->size_rank_info[0].second;
    intercomm_num_ep      = (*newintercomm)->ep_comm_ptr->size_rank_info[1].second;
    intercomm_mpi_size    = (*newintercomm)->ep_comm_ptr->size_rank_info[2].second;

    (*newintercomm)->ep_comm_ptr->intercomm->local_rank_map  = new RANK_MAP;
    (*newintercomm)->ep_comm_ptr->intercomm->remote_rank_map = new RANK_MAP;
    (*newintercomm)->ep_comm_ptr->intercomm->local_rank_map->resize(local_num_ep);
    (*newintercomm)->ep_comm_ptr->intercomm->remote_rank_map->resize(remote_num_ep);

    (*newintercomm)->ep_comm_ptr->intercomm->size_rank_info[0] = local_comm->ep_comm_ptr->size_rank_info[0];
    (*newintercomm)->ep_comm_ptr->intercomm->size_rank_info[1] = local_comm->ep_comm_ptr->size_rank_info[1];
    (*newintercomm)->ep_comm_ptr->intercomm->size_rank_info[2] = local_comm->ep_comm_ptr->size_rank_info[2];

    int local_rank_map_ele[2];
    local_rank_map_ele[0] = intercomm_ep_rank;
    local_rank_map_ele[1] = (*newintercomm)->ep_comm_ptr->comm_label;

    MPI_Allgather(local_rank_map_ele, 2, MPI_INT, (*newintercomm)->ep_comm_ptr->intercomm->local_rank_map->data(), 2, MPI_INT, local_comm);

    if(ep_rank == local_leader)
    {
      MPI_Status status;
      MPI_Request req_s, req_r;

      MPI_Isend((*newintercomm)->ep_comm_ptr->intercomm->local_rank_map->data(), 2*local_num_ep, MPI_INT, remote_leader, tag, peer_comm, &req_s);
      MPI_Irecv((*newintercomm)->ep_comm_ptr->intercomm->remote_rank_map->data(), 2*remote_num_ep, MPI_INT, remote_leader, tag, peer_comm, &req_r);

      MPI_Wait(&req_s, &status);
      MPI_Wait(&req_r, &status);
    }

    MPI_Bcast((*newintercomm)->ep_comm_ptr->intercomm->remote_rank_map->data(), 2*remote_num_ep, MPI_INT, local_leader, local_comm);

    //(*newintercomm)->ep_comm_ptr->intercomm->local_comm = (local_comm->ep_comm_ptr->comm_list[ep_rank_loc]);
    (*newintercomm)->ep_comm_ptr->intercomm->intercomm_tag = tag;

    return MPI_SUCCESS;
  }
}
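
// Illustrative sketch (not part of ep_lib): the kernels above repeat one small protocol
// many times: the two leaders swap a value over peer_comm with an Isend/Irecv pair, then
// the result is broadcast inside local_comm so every endpoint learns it.  The helper below
// isolates that pattern using the same ep_lib calls as the code above; the function name
// and the EP_LIB_DOC_SKETCH guard are illustrative assumptions.
#ifdef EP_LIB_DOC_SKETCH
namespace ep_lib
{
  static int leader_exchange_int_sketch(int my_value, bool is_leader, int local_leader,
                                        int remote_leader, int tag,
                                        MPI_Comm peer_comm, MPI_Comm local_comm)
  {
    int remote_value = -1;
    if(is_leader)
    {
      MPI_Request request[2];
      MPI_Status status[2];
      MPI_Isend(&my_value,     1, MPI_INT, remote_leader, tag, peer_comm, &request[0]);
      MPI_Irecv(&remote_value, 1, MPI_INT, remote_leader, tag, peer_comm, &request[1]);
      MPI_Waitall(2, request, status);
    }
    // every endpoint of local_comm learns the value received by its leader
    MPI_Bcast(&remote_value, 1, MPI_INT, local_leader, local_comm);
    return remote_value;
  }
}
#endif // EP_LIB_DOC_SKETCH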