source: XIOS3/trunk/src/server.cpp @ 2591

Last change on this file since 2591 was 2591, checked in by jderouillat, 8 months ago

Free additional communicators

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 26.3 KB
RevLine 
[490]1#include "globalScopeData.hpp"
[591]2#include "xios_spl.hpp"
[300]3#include "cxios.hpp"
[342]4#include "server.hpp"
[983]5#include "client.hpp"
[300]6#include "type.hpp"
7#include "context.hpp"
[352]8#include "object_template.hpp"
[300]9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
[382]12#include "mpi.hpp"
[347]13#include "tracer.hpp"
14#include "timer.hpp"
[2418]15#include "mem_checker.hpp"
[492]16#include "event_scheduler.hpp"
[1587]17#include "string_tools.hpp"
[1761]18#include "ressources_manager.hpp"
19#include "services_manager.hpp"
20#include "contexts_manager.hpp"
21#include "servers_ressource.hpp"
[2333]22#include "services.hpp"
[2458]23#include "pool_node.hpp"
[1761]24#include <cstdio>
[2146]25#include "workflow_graph.hpp"
[2274]26#include "release_static_allocation.hpp"
[2547]27#include "thread_manager.hpp"
[2333]28#include <sys/stat.h>
29#include <unistd.h>
[300]30
[1761]31
[2274]32
[335]33namespace xios
[490]34{
[2332]35    MPI_Comm CServer::intraComm_ ;
[1761]36    MPI_Comm CServer::serversComm_ ;
[1639]37    std::list<MPI_Comm> CServer::interCommLeft ;
38    std::list<MPI_Comm> CServer::interCommRight ;
39    std::list<MPI_Comm> CServer::contextInterComms;
40    std::list<MPI_Comm> CServer::contextIntraComms;
[1021]41    int CServer::serverLevel = 0 ;
[1148]42    int CServer::nbContexts = 0;
[983]43    bool CServer::isRoot = false ;
[1077]44    int CServer::rank_ = INVALID_RANK;
[490]45    StdOFStream CServer::m_infoStream;
[523]46    StdOFStream CServer::m_errorStream;
[490]47    map<string,CContext*> CServer::contextList ;
[1152]48    vector<int> CServer::sndServerGlobalRanks;
[300]49    bool CServer::finished=false ;
50    bool CServer::is_MPI_Initialized ;
[597]51    CEventScheduler* CServer::eventScheduler = 0;
[1761]52    CServersRessource* CServer::serversRessource_=nullptr ;
[2335]53    CThirdPartyDriver* CServer::driver_ =nullptr ;
[983]54
[1765]55       
[1761]56    void CServer::initialize(void)
57    {
58     
59      MPI_Comm serverComm ;
60      int initialized ;
61      MPI_Initialized(&initialized) ;
62      if (initialized) is_MPI_Initialized=true ;
63      else is_MPI_Initialized=false ;
64      MPI_Comm globalComm=CXios::getGlobalComm() ;
65      /////////////////////////////////////////
66      ///////////// PART 1 ////////////////////
67      /////////////////////////////////////////
68      // don't use OASIS
69      if (!CXios::usingOasis)
70      {
[2547]71        if (!is_MPI_Initialized) 
72        {
73          int required = MPI_THREAD_SERIALIZED ;
74          int provided ;
75          MPI_Init_thread(NULL,NULL, required, &provided) ;
76        }
[1761]77       
78        // split the global communicator
79        // get hash from all model to attribute a unique color (int) and then split to get client communicator
80        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
81         
82        int commRank, commSize ;
83        MPI_Comm_rank(globalComm,&commRank) ;
84        MPI_Comm_size(globalComm,&commSize) ;
85
86        std::hash<string> hashString ;
87        size_t hashServer=hashString(CXios::xiosCodeId) ;
88         
89        size_t* hashAll = new size_t[commSize] ;
[2242]90        MPI_Allgather(&hashServer,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
[1761]91         
92        int color=0 ;
[2242]93        map<size_t,int> listHash ;
94        for(int i=0 ; i<=commSize ; i++) 
95          if (listHash.count(hashAll[i])==0) 
[1761]96          {
[2242]97            listHash[hashAll[i]]=color ;
[1761]98            color=color+1 ;
99          }
[2242]100        color=listHash[hashServer] ;
[1761]101        delete[] hashAll ;
102
[2589]103        xios::MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
[2580]104        CXios::getMpiGarbageCollector().registerCommunicator(serverComm) ;
105
[1761]106      }
107      else // using OASIS
108      {
[2547]109       
110        if (!is_MPI_Initialized) 
111        {
112          int required = MPI_THREAD_SERIALIZED ;
113          int provided ;
114          MPI_Init_thread(NULL,NULL, required, &provided) ;
115        }
[1761]116
[2547]117        driver_ = new CThirdPartyDriver();
118
[2335]119        driver_->getComponentCommunicator( serverComm );
[1761]120      }
[2589]121      xios::MPI_Comm_dup(serverComm, &intraComm_);
[2580]122      CXios::getMpiGarbageCollector().registerCommunicator(intraComm_) ;
[2334]123     
[2290]124      CTimer::get("XIOS").resume() ;
[2437]125      CTimer::get("XIOS server").resume() ;
[2290]126      CTimer::get("XIOS initialize").resume() ;
[1761]127 
128      /////////////////////////////////////////
129      ///////////// PART 2 ////////////////////
130      /////////////////////////////////////////
131     
132
133      // Create the XIOS communicator for every process which is related
134      // to XIOS, as well on client side as on server side
135      MPI_Comm xiosGlobalComm ;
136      string strIds=CXios::getin<string>("clients_code_id","") ;
137      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
138      if (strIds.empty())
139      {
140        // no code Ids given, suppose XIOS initialisation is global           
141        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
142        MPI_Comm splitComm,interComm ;
143        MPI_Comm_rank(globalComm,&commGlobalRank) ;
[2589]144        xios::MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
[1761]145        MPI_Comm_rank(splitComm,&commRank) ;
146        if (commRank==0) serverLeader=commGlobalRank ;
147        else serverLeader=0 ;
148        clientLeader=0 ;
149        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
150        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
[2589]151        xios::MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
152        xios::MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
[1761]153        CXios::setXiosComm(xiosGlobalComm) ;
[2591]154
155        xios::MPI_Comm_free( &interComm );
156        xios::MPI_Comm_free( &splitComm );
[1761]157      }
158      else
159      {
160
161        xiosGlobalCommByFileExchange(serverComm) ;
162
163      }
164     
165      /////////////////////////////////////////
166      ///////////// PART 4 ////////////////////
167      //  create servers intra communicator  //
168      /////////////////////////////////////////
169     
170      int commRank ;
171      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
[2589]172      xios::MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
[2580]173      CXios::getMpiGarbageCollector().registerCommunicator(serversComm_) ;
[1761]174     
175      CXios::setUsingServer() ;
176
177      /////////////////////////////////////////
178      ///////////// PART 5 ////////////////////
179      //       redirect files output         //
180      /////////////////////////////////////////
181     
182      CServer::openInfoStream(CXios::serverFile);
183      CServer::openErrorStream(CXios::serverFile);
184
[2418]185      CMemChecker::logMem( "CServer::initialize" );
186
[1761]187      /////////////////////////////////////////
188      ///////////// PART 4 ////////////////////
189      /////////////////////////////////////////
190
191      CXios::launchDaemonsManager(true) ;
192     
193      /////////////////////////////////////////
194      ///////////// PART 5 ////////////////////
195      /////////////////////////////////////////
196
197      // create the services
198
199      auto ressourcesManager=CXios::getRessourcesManager() ;
200      auto servicesManager=CXios::getServicesManager() ;
201      auto contextsManager=CXios::getContextsManager() ;
202      auto daemonsManager=CXios::getDaemonsManager() ;
203      auto serversRessource=CServer::getServersRessource() ;
204
[2333]205      int rank;
206      MPI_Comm_rank(intraComm_, &rank) ;
207      if (rank==0) isRoot=true;
208      else isRoot=false;
209
[1761]210      if (serversRessource->isServerLeader())
211      {
[2458]212        // creating pool
213        CPoolNodeGroup::get("xios","pool_definition")->solveDescInheritance(true) ;
214        vector<CPoolNode*> pools = CPoolNodeGroup::get("xios","pool_definition")->getAllChildren();
215        for(auto& pool : pools) pool->allocateRessources() ;
216       
217        int nbRessources = ressourcesManager->getFreeRessourcesSize() ;
218        if (nbRessources>0)
[1761]219        {
[2458]220          if (!CXios::usingServer2)
221          {
222            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
[2547]223            if (CThreadManager::isUsingThreads()) 
224              while(!ressourcesManager->hasPool(CXios::defaultPoolId)) 
225              {
226                daemonsManager->eventLoop() ;
227                CThreadManager::yield() ;
228              }
229            else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;
230         
[2458]231            servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ;
[2547]232            if (CThreadManager::isUsingThreads()) 
233              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,0)) 
234              {
235                daemonsManager->eventLoop() ;
236                CThreadManager::yield() ;
237              }
238            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ;
239           
[2458]240            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ;
[2547]241            if (CThreadManager::isUsingThreads()) 
242            {
243              daemonsManager->eventLoop() ;
244              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0)) CThreadManager::yield() ;
245            }
246            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ;
[2458]247          }
248          else
249          {
250            int nprocsServer = nbRessources*CXios::ratioServer2/100.;
251            int nprocsGatherer = nbRessources - nprocsServer ;
[1761]252         
[2458]253            int nbPoolsServer2 = CXios::nbPoolsServer2 ;
254            if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
255            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
[2547]256            if (CThreadManager::isUsingThreads()) 
257              while(!ressourcesManager->hasPool(CXios::defaultPoolId)) 
258              {
259                daemonsManager->eventLoop() ;
260                CThreadManager::yield() ;
261              }
262            else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;
263
[2458]264            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
[2547]265            if (CThreadManager::isUsingThreads()) 
266              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultGathererId,0)) 
267              {
268                daemonsManager->eventLoop() ;
269                CThreadManager::yield() ;
270              }
271            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultGathererId) ;
272
[2458]273            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ;
[2547]274            if (CThreadManager::isUsingThreads()) 
275              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0)) 
276              {
277                daemonsManager->eventLoop() ;
278                CThreadManager::yield() ;
279              }
280            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ;
281           
[2458]282            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ;
[2547]283            if (CThreadManager::isUsingThreads())
284              for(int i=0; i<nbPoolsServer2; i++)
285                while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,i)) 
286                {
287                  daemonsManager->eventLoop() ;
288                  CThreadManager::yield() ;
289                }
290            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ;
[2458]291          }
[1761]292        }
[2407]293//        servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ;
[1761]294      }
[2523]295
296      MPI_Request req ;
297      MPI_Status status ;
298      MPI_Ibarrier(getServersRessource()->getCommunicator(),&req) ; // be sure that all services are created now, could be remove later if more asynchronisity
299      int ok=false ;
300      while (!ok)
301      {
302        daemonsManager->eventLoop() ;
[2547]303        if (CThreadManager::isUsingThreads()) CThreadManager::yield();
[2523]304        MPI_Test(&req,&ok,&status) ;
305      }
306
307
[2547]308      //testingEventScheduler() ;
[2458]309/*
310      MPI_Request req ;
311      MPI_Status status ;
[2523]312      MPI_Ibarrier(CXios::getXiosComm(),&req) ; // be sure that all services are created now, could be remove later if more asynchronisity
[2458]313      int ok=false ;
314      while (!ok)
315      {
316        daemonsManager->eventLoop() ;
317        MPI_Test(&req,&ok,&status) ;
318      }
319*/
[2242]320      CTimer::get("XIOS initialize").suspend() ;
[1761]321
322      /////////////////////////////////////////
323      ///////////// PART 5 ////////////////////
324      /////////////////////////////////////////
325      // loop on event loop
326
327      bool finished=false ;
[2242]328      CTimer::get("XIOS event loop").resume() ;
329
[1761]330      while (!finished)
331      {
332        finished=daemonsManager->eventLoop() ;
[2547]333        if (CThreadManager::isUsingThreads()) CThreadManager::yield() ;
[1761]334      }
[2242]335      CTimer::get("XIOS event loop").suspend() ;
[2243]336
337      // Delete CContext
[2274]338      //CObjectTemplate<CContext>::cleanStaticDataStructure();
[1761]339    }
340
341
[2523]342    void CServer::testingEventScheduler(void)
343    {
344      CXios::getPoolRessource()->getEventScheduler()->registerEvent(1,10) ;
345      CXios::getPoolRessource()->getEventScheduler()->registerEvent(2,10) ;
346      if (CXios::getPoolRessource()->hasService(CXios::defaultGathererId,0))
347      {
348        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(1,100) ;
349        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(2,100) ;
350        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(3,100) ;
351      }
352      if (CXios::getPoolRessource()->hasService(CXios::defaultWriterId,0))
353      {
354        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(1,1000) ;
355        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(2,1000) ;
356      }
357      CXios::getPoolRessource()->getEventScheduler()->registerEvent(3,10) ;
358      CXios::getPoolRessource()->getEventScheduler()->registerEvent(4,10) ;
359     
360      if (CXios::getPoolRessource()->hasService(CXios::defaultGathererId,0))
361      {
362        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(4,100) ;
363        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(5,100) ;
364      }
365      if (CXios::getPoolRessource()->hasService(CXios::defaultWriterId,0))
366      {
367        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(3,1000) ;
368        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(4,1000) ;
369        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(5,1000) ;
370      }
371      CXios::getPoolRessource()->getEventScheduler()->registerEvent(5,10) ;
372      CXios::getPoolRessource()->getEventScheduler()->registerEvent(6,10) ;
373     
374      int numEvents=0 ;
375      int poolEvent=1 ;
376      int gatherEvent=1 ;
377      int writerEvent=1 ;
378      do
379      {
380        if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(poolEvent,10))
381        {
382          CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
383          MPI_Barrier(CXios::getPoolRessource()->getCommunicator());
384          poolEvent++ ;
385          numEvents++;
386        }
387       
388        if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(gatherEvent,100))
389        {
390          CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
391          MPI_Barrier(CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)->getCommunicator());
392          gatherEvent++ ;
393          numEvents++;
394        }
[1761]395
[2523]396        if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(writerEvent,1000))
397        {
398          CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
399          MPI_Barrier(CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)->getCommunicator());
400          writerEvent++ ;
401          numEvents++;
402        }
[1761]403
[2523]404       
405      } while (numEvents!=11) ;
[1761]406
[2523]407    }
408
409
[1761]410    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
411    {
412       
413      MPI_Comm globalComm=CXios::getGlobalComm() ;
414      MPI_Comm xiosGlobalComm ;
415     
416      string strIds=CXios::getin<string>("clients_code_id","") ;
417      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
418     
419      int commRank, globalRank ;
420      MPI_Comm_rank(serverComm, &commRank) ;
421      MPI_Comm_rank(globalComm, &globalRank) ;
422      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
423
424      if (commRank==0) // if root process publish name
425      { 
426        std::ofstream ofs (serverFileName, std::ofstream::out);
427        ofs<<globalRank ;
428        ofs.close();
429      }
430       
431      vector<int> clientsRank(clientsCodeId.size()) ;
432      for(int i=0;i<clientsRank.size();i++)
433      {
434        std::ifstream ifs ;
435        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
[2333]436        struct stat buffer;
437        do {
438        } while( stat(fileName.c_str(), &buffer) != 0 );
439        sleep(1);
440        ifs.open(fileName, ifstream::in) ;
[1761]441        ifs>>clientsRank[i] ;
[2333]442        //cout <<  "\t\t read: " << clientsRank[i] << " in " << fileName << endl;
[1761]443        ifs.close() ; 
444      }
445
446      MPI_Comm intraComm ;
[2589]447      xios::MPI_Comm_dup(serverComm,&intraComm) ;
[1761]448      MPI_Comm interComm ;
449      for(int i=0 ; i<clientsRank.size(); i++)
450      { 
[2589]451        xios::MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
[2580]452        CXios::getMpiGarbageCollector().registerCommunicator(interComm) ;
[2333]453        interCommLeft.push_back(interComm) ;
[2589]454        xios::MPI_Comm_free(&intraComm) ;
455        xios::MPI_Intercomm_merge(interComm,false, &intraComm ) ;
[1761]456      }
457      xiosGlobalComm=intraComm ; 
458      MPI_Barrier(xiosGlobalComm);
459      if (commRank==0) std::remove(serverFileName.c_str()) ;
460      MPI_Barrier(xiosGlobalComm);
461
462      CXios::setXiosComm(xiosGlobalComm) ;
463     
464    }
465
466
467    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
468    {
469        // untested, need to be tested on a true MPI-2 compliant library
470
471        // try to discover other client/server
472/*
473        // publish server name
474        char portName[MPI_MAX_PORT_NAME];
475        int ierr ;
476        int commRank ;
477        MPI_Comm_rank(serverComm, &commRank) ;
478       
479        if (commRank==0) // if root process publish name
480        { 
481          MPI_Open_port(MPI_INFO_NULL, portName);
482          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
483        }
484
485        MPI_Comm intraComm=serverComm ;
486        MPI_Comm interComm ;
487        for(int i=0 ; i<clientsCodeId.size(); i++)
488        { 
489          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
[2589]490          xios::MPI_Intercomm_merge(interComm,false, &intraComm ) ;
[1761]491        }
492*/     
493    }
494
[2333]495   /*!
496    * Root process is listening for an order sent by client to call "oasis_enddef".
497    * The root client of a compound send the order (tag 5). It is probed and received.
498    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
499    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
500    */
501   
502     void CServer::listenOasisEnddef(void)
503     {
504        int flag ;
505        MPI_Status status ;
506        list<MPI_Comm>::iterator it;
507        int msg ;
508        static int nbCompound=0 ;
509        int size ;
510        static bool sent=false ;
511        static MPI_Request* allRequests ;
512        static MPI_Status* allStatus ;
[490]513
[2333]514
515        if (sent)
516        {
517          MPI_Comm_size(intraComm_,&size) ;
518          MPI_Testall(size,allRequests, &flag, allStatus) ;
519          if (flag==true)
520          {
521            delete [] allRequests ;
522            delete [] allStatus ;
523            sent=false ;
524          }
525        }
526       
527
528        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
529        {
530           MPI_Status status ;
531           traceOff() ;
532           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5
533           traceOn() ;
534           if (flag==true)
535           {
536              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
537              nbCompound++ ;
538              if (nbCompound==interCommLeft.size())
539              {
540                MPI_Comm_size(intraComm_,&size) ;
541                allRequests= new MPI_Request[size] ;
542                allStatus= new MPI_Status[size] ;
543                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm_,&allRequests[i]) ; // tags oasis_endded = 5
544                sent=true ;
545              }
546           }
547        }
548}
549     
550   /*!
551    * Processes probes message from root process if oasis_enddef call must be done.
552    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
553    */
554     void CServer::listenRootOasisEnddef(void)
555     {
556       int flag ;
557       MPI_Status status ;
558       const int root=0 ;
559       int msg ;
560       static bool eventSent=false ;
561
562       if (eventSent)
563       {
564         boost::hash<string> hashString;
565         size_t hashId = hashString("oasis_enddef");
[2523]566
567         if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(0,hashId))
[2333]568         {
[2523]569           CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
[2335]570           driver_->endSynchronizedDefinition() ;
[2333]571           eventSent=false ;
572         }
573       }
574         
575       traceOff() ;
576       MPI_Iprobe(root,5,intraComm_, &flag, &status) ;
577       traceOn() ;
578       if (flag==true)
579       {
580           MPI_Recv(&msg,1,MPI_INT,root,5,intraComm_,&status) ; // tags oasis_endded = 5
581           boost::hash<string> hashString;
582           size_t hashId = hashString("oasis_enddef");
[2523]583           CXios::getPoolRessource()->getEventScheduler()->registerEvent(0,hashId);
[2333]584           eventSent=true ;
585       }
586     }
587
[300]588    void CServer::finalize(void)
589    {
[361]590      CTimer::get("XIOS").suspend() ;
[2399]591      CTimer::get("XIOS server").suspend() ;
[492]592      delete eventScheduler ;
[655]593
[1639]594      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
[2589]595        xios::MPI_Comm_free(&(*it));
[983]596
[1639]597      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
[2589]598        xios::MPI_Comm_free(&(*it));
[1071]599
[1639]600        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
[2589]601          xios::MPI_Comm_free(&(*it));
[992]602
[2589]603//      xios::MPI_Comm_free(&intraComm);
[2266]604      CXios::finalizeDaemonsManager();
[2274]605      finalizeServersRessource();
[1764]606     
[2274]607      CContext::removeAllContexts() ; // free memory for related context
[2580]608     
[2310]609      CXios::getMpiGarbageCollector().release() ; // release unfree MPI ressources
[2580]610      MPI_Comm xiosComm=CXios::getXiosComm() ;
[2589]611      xios::MPI_Comm_free(&xiosComm) ;
[2420]612      CMemChecker::logMem( "CServer::finalize", true );
[2576]613     
614      CCommTrack::dumpComm() ;
615
[300]616      if (!is_MPI_Initialized)
[490]617      {
[2335]618        if (CXios::usingOasis) delete driver_;
[1639]619        else MPI_Finalize() ;
[300]620      }
[347]621      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
622      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
623      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
[1158]624      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
[2535]625      if (CXios::reportMemory)
626      {
627        report(100)<<CMemChecker::getAllCumulatedMem()<<endl ;
628      }
[2274]629     
[2146]630      CWorkflowGraph::drawWorkFlowGraph_server();
[2274]631      xios::releaseStaticAllocation() ; // free memory from static allocation
[300]632    }
[490]633
[523]634    /*!
635    * Open a file specified by a suffix and an extension and use it for the given file buffer.
636    * The file name will be suffix+rank+extension.
637    *
638    * \param fileName[in] protype file name
639    * \param ext [in] extension of the file
640    * \param fb [in/out] the file buffer
641    */
642    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
643    {
[1761]644      StdStringStream fileNameServer;
[523]645      int numDigit = 0;
[1761]646      int commSize = 0;
647      int commRank ;
[1021]648      int id;
[1761]649     
650      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
651      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
652
653      while (commSize)
[523]654      {
[1761]655        commSize /= 10;
[523]656        ++numDigit;
657      }
[1761]658      id = commRank;
[497]659
[1761]660      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
661      fb->open(fileNameServer.str().c_str(), std::ios::out);
[523]662      if (!fb->is_open())
663        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
[1761]664              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
[523]665    }
[490]666
[523]667    /*!
668    * \brief Open a file stream to write the info logs
669    * Open a file stream with a specific file name suffix+rank
670    * to write the info logs.
671    * \param fileName [in] protype file name
672    */
673    void CServer::openInfoStream(const StdString& fileName)
674    {
675      std::filebuf* fb = m_infoStream.rdbuf();
676      openStream(fileName, ".out", fb);
[490]677
[523]678      info.write2File(fb);
679      report.write2File(fb);
680    }
[490]681
[523]682    //! Write the info logs to standard output
683    void CServer::openInfoStream()
684    {
685      info.write2StdOut();
686      report.write2StdOut();
687    }
[490]688
[523]689    //! Close the info logs file if it opens
690    void CServer::closeInfoStream()
691    {
692      if (m_infoStream.is_open()) m_infoStream.close();
693    }
694
695    /*!
696    * \brief Open a file stream to write the error log
697    * Open a file stream with a specific file name suffix+rank
698    * to write the error log.
699    * \param fileName [in] protype file name
700    */
701    void CServer::openErrorStream(const StdString& fileName)
702    {
703      std::filebuf* fb = m_errorStream.rdbuf();
704      openStream(fileName, ".err", fb);
705
706      error.write2File(fb);
707    }
708
709    //! Write the error log to standard error output
710    void CServer::openErrorStream()
711    {
712      error.write2StdErr();
713    }
714
715    //! Close the error log file if it opens
716    void CServer::closeErrorStream()
717    {
718      if (m_errorStream.is_open()) m_errorStream.close();
719    }
[1761]720
721    void CServer::launchServersRessource(MPI_Comm serverComm)
722    {
723      serversRessource_ = new CServersRessource(serverComm) ;
724    }
[2274]725
726    void  CServer::finalizeServersRessource(void) 
727    { 
728      delete serversRessource_; serversRessource_=nullptr ;
729    }
[300]730}
Note: See TracBrowser for help on using the repository browser.