source: XIOS3/trunk/src/server.cpp

Last change on this file was 2629, checked in by jderouillat, 2 months ago

Delete boost dependencies, the few features used are replaced by functions stored in extern/boost_extraction

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 28.7 KB
RevLine 
[490]1#include "globalScopeData.hpp"
[591]2#include "xios_spl.hpp"
[300]3#include "cxios.hpp"
[342]4#include "server.hpp"
[983]5#include "client.hpp"
[300]6#include "type.hpp"
7#include "context.hpp"
[352]8#include "object_template.hpp"
[300]9#include "oasis_cinterface.hpp"
[382]10#include "mpi.hpp"
[347]11#include "tracer.hpp"
12#include "timer.hpp"
[2418]13#include "mem_checker.hpp"
[492]14#include "event_scheduler.hpp"
[1587]15#include "string_tools.hpp"
[1761]16#include "ressources_manager.hpp"
17#include "services_manager.hpp"
18#include "contexts_manager.hpp"
19#include "servers_ressource.hpp"
[2333]20#include "services.hpp"
[2458]21#include "pool_node.hpp"
[1761]22#include <cstdio>
[2146]23#include "workflow_graph.hpp"
[2274]24#include "release_static_allocation.hpp"
[2547]25#include "thread_manager.hpp"
[2333]26#include <sys/stat.h>
27#include <unistd.h>
[300]28
[1761]29
[2274]30
[335]31namespace xios
[490]32{
[2332]33    MPI_Comm CServer::intraComm_ ;
[1761]34    MPI_Comm CServer::serversComm_ ;
[1639]35    std::list<MPI_Comm> CServer::interCommLeft ;
36    std::list<MPI_Comm> CServer::interCommRight ;
37    std::list<MPI_Comm> CServer::contextInterComms;
38    std::list<MPI_Comm> CServer::contextIntraComms;
[1021]39    int CServer::serverLevel = 0 ;
[1148]40    int CServer::nbContexts = 0;
[983]41    bool CServer::isRoot = false ;
[1077]42    int CServer::rank_ = INVALID_RANK;
[490]43    StdOFStream CServer::m_infoStream;
[523]44    StdOFStream CServer::m_errorStream;
[490]45    map<string,CContext*> CServer::contextList ;
[1152]46    vector<int> CServer::sndServerGlobalRanks;
[300]47    bool CServer::finished=false ;
48    bool CServer::is_MPI_Initialized ;
[597]49    CEventScheduler* CServer::eventScheduler = 0;
[1761]50    CServersRessource* CServer::serversRessource_=nullptr ;
[2335]51    CThirdPartyDriver* CServer::driver_ =nullptr ;
[2628]52    extern CLogType logTimers ;
[1765]53       
[1761]54    void CServer::initialize(void)
55    {
56     
57      MPI_Comm serverComm ;
58      int initialized ;
59      MPI_Initialized(&initialized) ;
60      if (initialized) is_MPI_Initialized=true ;
61      else is_MPI_Initialized=false ;
62      MPI_Comm globalComm=CXios::getGlobalComm() ;
63      /////////////////////////////////////////
64      ///////////// PART 1 ////////////////////
65      /////////////////////////////////////////
66      // don't use OASIS
67      if (!CXios::usingOasis)
68      {
[2547]69        if (!is_MPI_Initialized) 
70        {
71          int required = MPI_THREAD_SERIALIZED ;
72          int provided ;
73          MPI_Init_thread(NULL,NULL, required, &provided) ;
74        }
[1761]75       
76        // split the global communicator
77        // get hash from all model to attribute a unique color (int) and then split to get client communicator
78        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
79         
80        int commRank, commSize ;
81        MPI_Comm_rank(globalComm,&commRank) ;
82        MPI_Comm_size(globalComm,&commSize) ;
83
84        std::hash<string> hashString ;
85        size_t hashServer=hashString(CXios::xiosCodeId) ;
86         
87        size_t* hashAll = new size_t[commSize] ;
[2242]88        MPI_Allgather(&hashServer,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
[1761]89         
90        int color=0 ;
[2242]91        map<size_t,int> listHash ;
92        for(int i=0 ; i<=commSize ; i++) 
93          if (listHash.count(hashAll[i])==0) 
[1761]94          {
[2242]95            listHash[hashAll[i]]=color ;
[1761]96            color=color+1 ;
97          }
[2242]98        color=listHash[hashServer] ;
[1761]99        delete[] hashAll ;
100
[2589]101        xios::MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
[2580]102        CXios::getMpiGarbageCollector().registerCommunicator(serverComm) ;
103
[1761]104      }
105      else // using OASIS
106      {
[2547]107       
108        if (!is_MPI_Initialized) 
109        {
110          int required = MPI_THREAD_SERIALIZED ;
111          int provided ;
112          MPI_Init_thread(NULL,NULL, required, &provided) ;
113        }
[1761]114
[2547]115        driver_ = new CThirdPartyDriver();
116
[2335]117        driver_->getComponentCommunicator( serverComm );
[1761]118      }
[2589]119      xios::MPI_Comm_dup(serverComm, &intraComm_);
[2580]120      CXios::getMpiGarbageCollector().registerCommunicator(intraComm_) ;
[2334]121     
[2290]122      CTimer::get("XIOS").resume() ;
[2437]123      CTimer::get("XIOS server").resume() ;
[2290]124      CTimer::get("XIOS initialize").resume() ;
[1761]125 
126      /////////////////////////////////////////
127      ///////////// PART 2 ////////////////////
128      /////////////////////////////////////////
129     
130
131      // Create the XIOS communicator for every process which is related
132      // to XIOS, as well on client side as on server side
133      MPI_Comm xiosGlobalComm ;
134      string strIds=CXios::getin<string>("clients_code_id","") ;
135      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
136      if (strIds.empty())
137      {
138        // no code Ids given, suppose XIOS initialisation is global           
139        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
140        MPI_Comm splitComm,interComm ;
141        MPI_Comm_rank(globalComm,&commGlobalRank) ;
[2589]142        xios::MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
[1761]143        MPI_Comm_rank(splitComm,&commRank) ;
144        if (commRank==0) serverLeader=commGlobalRank ;
145        else serverLeader=0 ;
146        clientLeader=0 ;
147        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
148        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
[2589]149        xios::MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
150        xios::MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
[1761]151        CXios::setXiosComm(xiosGlobalComm) ;
[2591]152
153        xios::MPI_Comm_free( &interComm );
154        xios::MPI_Comm_free( &splitComm );
[1761]155      }
156      else
157      {
158
159        xiosGlobalCommByFileExchange(serverComm) ;
160
161      }
162     
163      /////////////////////////////////////////
164      ///////////// PART 4 ////////////////////
165      //  create servers intra communicator  //
166      /////////////////////////////////////////
167     
168      int commRank ;
169      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
[2589]170      xios::MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
[2580]171      CXios::getMpiGarbageCollector().registerCommunicator(serversComm_) ;
[1761]172     
173      CXios::setUsingServer() ;
174
175      /////////////////////////////////////////
176      ///////////// PART 5 ////////////////////
177      //       redirect files output         //
178      /////////////////////////////////////////
179     
180      CServer::openInfoStream(CXios::serverFile);
181      CServer::openErrorStream(CXios::serverFile);
182
[2418]183      CMemChecker::logMem( "CServer::initialize" );
184
[1761]185      /////////////////////////////////////////
186      ///////////// PART 4 ////////////////////
187      /////////////////////////////////////////
188
189      CXios::launchDaemonsManager(true) ;
190     
191      /////////////////////////////////////////
192      ///////////// PART 5 ////////////////////
193      /////////////////////////////////////////
194
195      // create the services
196
197      auto ressourcesManager=CXios::getRessourcesManager() ;
198      auto servicesManager=CXios::getServicesManager() ;
199      auto contextsManager=CXios::getContextsManager() ;
200      auto daemonsManager=CXios::getDaemonsManager() ;
201      auto serversRessource=CServer::getServersRessource() ;
202
[2333]203      int rank;
204      MPI_Comm_rank(intraComm_, &rank) ;
205      if (rank==0) isRoot=true;
206      else isRoot=false;
207
[1761]208      if (serversRessource->isServerLeader())
209      {
[2458]210        // creating pool
211        CPoolNodeGroup::get("xios","pool_definition")->solveDescInheritance(true) ;
212        vector<CPoolNode*> pools = CPoolNodeGroup::get("xios","pool_definition")->getAllChildren();
213        for(auto& pool : pools) pool->allocateRessources() ;
214       
215        int nbRessources = ressourcesManager->getFreeRessourcesSize() ;
216        if (nbRessources>0)
[1761]217        {
[2458]218          if (!CXios::usingServer2)
219          {
220            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
[2547]221            if (CThreadManager::isUsingThreads()) 
222              while(!ressourcesManager->hasPool(CXios::defaultPoolId)) 
223              {
224                daemonsManager->eventLoop() ;
225                CThreadManager::yield() ;
226              }
227            else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;
228         
[2458]229            servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ;
[2547]230            if (CThreadManager::isUsingThreads()) 
231              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,0)) 
232              {
233                daemonsManager->eventLoop() ;
234                CThreadManager::yield() ;
235              }
236            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ;
237           
[2458]238            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ;
[2547]239            if (CThreadManager::isUsingThreads()) 
240            {
241              daemonsManager->eventLoop() ;
242              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0)) CThreadManager::yield() ;
243            }
244            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ;
[2458]245          }
246          else
247          {
248            int nprocsServer = nbRessources*CXios::ratioServer2/100.;
249            int nprocsGatherer = nbRessources - nprocsServer ;
[1761]250         
[2458]251            int nbPoolsServer2 = CXios::nbPoolsServer2 ;
252            if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
253            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
[2547]254            if (CThreadManager::isUsingThreads()) 
255              while(!ressourcesManager->hasPool(CXios::defaultPoolId)) 
256              {
257                daemonsManager->eventLoop() ;
258                CThreadManager::yield() ;
259              }
260            else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;
261
[2458]262            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
[2547]263            if (CThreadManager::isUsingThreads()) 
264              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultGathererId,0)) 
265              {
266                daemonsManager->eventLoop() ;
267                CThreadManager::yield() ;
268              }
269            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultGathererId) ;
270
[2458]271            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ;
[2547]272            if (CThreadManager::isUsingThreads()) 
273              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0)) 
274              {
275                daemonsManager->eventLoop() ;
276                CThreadManager::yield() ;
277              }
278            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ;
279           
[2458]280            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ;
[2547]281            if (CThreadManager::isUsingThreads())
282              for(int i=0; i<nbPoolsServer2; i++)
283                while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,i)) 
284                {
285                  daemonsManager->eventLoop() ;
286                  CThreadManager::yield() ;
287                }
288            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ;
[2458]289          }
[1761]290        }
[2407]291//        servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ;
[1761]292      }
[2523]293
294      MPI_Request req ;
295      MPI_Status status ;
296      MPI_Ibarrier(getServersRessource()->getCommunicator(),&req) ; // be sure that all services are created now, could be remove later if more asynchronisity
297      int ok=false ;
298      while (!ok)
299      {
300        daemonsManager->eventLoop() ;
[2547]301        if (CThreadManager::isUsingThreads()) CThreadManager::yield();
[2523]302        MPI_Test(&req,&ok,&status) ;
303      }
304
305
[2547]306      //testingEventScheduler() ;
[2458]307/*
308      MPI_Request req ;
309      MPI_Status status ;
[2523]310      MPI_Ibarrier(CXios::getXiosComm(),&req) ; // be sure that all services are created now, could be remove later if more asynchronisity
[2458]311      int ok=false ;
312      while (!ok)
313      {
314        daemonsManager->eventLoop() ;
315        MPI_Test(&req,&ok,&status) ;
316      }
317*/
[2242]318      CTimer::get("XIOS initialize").suspend() ;
[1761]319
320      /////////////////////////////////////////
321      ///////////// PART 5 ////////////////////
322      /////////////////////////////////////////
323      // loop on event loop
324
325      bool finished=false ;
[2242]326      CTimer::get("XIOS event loop").resume() ;
327
[1761]328      while (!finished)
329      {
330        finished=daemonsManager->eventLoop() ;
[2547]331        if (CThreadManager::isUsingThreads()) CThreadManager::yield() ;
[1761]332      }
[2242]333      CTimer::get("XIOS event loop").suspend() ;
[2243]334
335      // Delete CContext
[2274]336      //CObjectTemplate<CContext>::cleanStaticDataStructure();
[1761]337    }
338
339
[2523]340    void CServer::testingEventScheduler(void)
341    {
342      CXios::getPoolRessource()->getEventScheduler()->registerEvent(1,10) ;
343      CXios::getPoolRessource()->getEventScheduler()->registerEvent(2,10) ;
344      if (CXios::getPoolRessource()->hasService(CXios::defaultGathererId,0))
345      {
346        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(1,100) ;
347        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(2,100) ;
348        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(3,100) ;
349      }
350      if (CXios::getPoolRessource()->hasService(CXios::defaultWriterId,0))
351      {
352        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(1,1000) ;
353        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(2,1000) ;
354      }
355      CXios::getPoolRessource()->getEventScheduler()->registerEvent(3,10) ;
356      CXios::getPoolRessource()->getEventScheduler()->registerEvent(4,10) ;
357     
358      if (CXios::getPoolRessource()->hasService(CXios::defaultGathererId,0))
359      {
360        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(4,100) ;
361        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(5,100) ;
362      }
363      if (CXios::getPoolRessource()->hasService(CXios::defaultWriterId,0))
364      {
365        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(3,1000) ;
366        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(4,1000) ;
367        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(5,1000) ;
368      }
369      CXios::getPoolRessource()->getEventScheduler()->registerEvent(5,10) ;
370      CXios::getPoolRessource()->getEventScheduler()->registerEvent(6,10) ;
371     
372      int numEvents=0 ;
373      int poolEvent=1 ;
374      int gatherEvent=1 ;
375      int writerEvent=1 ;
376      do
377      {
378        if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(poolEvent,10))
379        {
380          CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
381          MPI_Barrier(CXios::getPoolRessource()->getCommunicator());
382          poolEvent++ ;
383          numEvents++;
384        }
385       
386        if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(gatherEvent,100))
387        {
388          CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
389          MPI_Barrier(CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)->getCommunicator());
390          gatherEvent++ ;
391          numEvents++;
392        }
[1761]393
[2523]394        if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(writerEvent,1000))
395        {
396          CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
397          MPI_Barrier(CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)->getCommunicator());
398          writerEvent++ ;
399          numEvents++;
400        }
[1761]401
[2523]402       
403      } while (numEvents!=11) ;
[1761]404
[2523]405    }
406
407
[1761]408    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
409    {
410       
411      MPI_Comm globalComm=CXios::getGlobalComm() ;
412      MPI_Comm xiosGlobalComm ;
413     
414      string strIds=CXios::getin<string>("clients_code_id","") ;
415      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
416     
417      int commRank, globalRank ;
418      MPI_Comm_rank(serverComm, &commRank) ;
419      MPI_Comm_rank(globalComm, &globalRank) ;
420      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
421
422      if (commRank==0) // if root process publish name
423      { 
424        std::ofstream ofs (serverFileName, std::ofstream::out);
425        ofs<<globalRank ;
426        ofs.close();
427      }
428       
429      vector<int> clientsRank(clientsCodeId.size()) ;
430      for(int i=0;i<clientsRank.size();i++)
431      {
432        std::ifstream ifs ;
433        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
[2333]434        struct stat buffer;
435        do {
436        } while( stat(fileName.c_str(), &buffer) != 0 );
437        sleep(1);
438        ifs.open(fileName, ifstream::in) ;
[1761]439        ifs>>clientsRank[i] ;
[2333]440        //cout <<  "\t\t read: " << clientsRank[i] << " in " << fileName << endl;
[1761]441        ifs.close() ; 
442      }
443
444      MPI_Comm intraComm ;
[2589]445      xios::MPI_Comm_dup(serverComm,&intraComm) ;
[1761]446      MPI_Comm interComm ;
447      for(int i=0 ; i<clientsRank.size(); i++)
448      { 
[2589]449        xios::MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
[2580]450        CXios::getMpiGarbageCollector().registerCommunicator(interComm) ;
[2333]451        interCommLeft.push_back(interComm) ;
[2589]452        xios::MPI_Comm_free(&intraComm) ;
453        xios::MPI_Intercomm_merge(interComm,false, &intraComm ) ;
[1761]454      }
455      xiosGlobalComm=intraComm ; 
456      MPI_Barrier(xiosGlobalComm);
457      if (commRank==0) std::remove(serverFileName.c_str()) ;
458      MPI_Barrier(xiosGlobalComm);
459
460      CXios::setXiosComm(xiosGlobalComm) ;
461     
462    }
463
464
465    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
466    {
467        // untested, need to be tested on a true MPI-2 compliant library
468
469        // try to discover other client/server
470/*
471        // publish server name
472        char portName[MPI_MAX_PORT_NAME];
473        int ierr ;
474        int commRank ;
475        MPI_Comm_rank(serverComm, &commRank) ;
476       
477        if (commRank==0) // if root process publish name
478        { 
479          MPI_Open_port(MPI_INFO_NULL, portName);
480          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
481        }
482
483        MPI_Comm intraComm=serverComm ;
484        MPI_Comm interComm ;
485        for(int i=0 ; i<clientsCodeId.size(); i++)
486        { 
487          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
[2589]488          xios::MPI_Intercomm_merge(interComm,false, &intraComm ) ;
[1761]489        }
490*/     
491    }
492
[2333]493   /*!
494    * Root process is listening for an order sent by client to call "oasis_enddef".
495    * The root client of a compound send the order (tag 5). It is probed and received.
496    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
497    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
498    */
499   
500     void CServer::listenOasisEnddef(void)
501     {
502        int flag ;
503        MPI_Status status ;
504        list<MPI_Comm>::iterator it;
505        int msg ;
506        static int nbCompound=0 ;
507        int size ;
508        static bool sent=false ;
509        static MPI_Request* allRequests ;
510        static MPI_Status* allStatus ;
[490]511
[2333]512
513        if (sent)
514        {
515          MPI_Comm_size(intraComm_,&size) ;
516          MPI_Testall(size,allRequests, &flag, allStatus) ;
517          if (flag==true)
518          {
519            delete [] allRequests ;
520            delete [] allStatus ;
521            sent=false ;
522          }
523        }
524       
525
526        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
527        {
528           MPI_Status status ;
529           traceOff() ;
530           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5
531           traceOn() ;
532           if (flag==true)
533           {
534              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
535              nbCompound++ ;
536              if (nbCompound==interCommLeft.size())
537              {
538                MPI_Comm_size(intraComm_,&size) ;
539                allRequests= new MPI_Request[size] ;
540                allStatus= new MPI_Status[size] ;
541                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm_,&allRequests[i]) ; // tags oasis_endded = 5
542                sent=true ;
543              }
544           }
545        }
546}
547     
548   /*!
549    * Processes probes message from root process if oasis_enddef call must be done.
550    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
551    */
552     void CServer::listenRootOasisEnddef(void)
553     {
554       int flag ;
555       MPI_Status status ;
556       const int root=0 ;
557       int msg ;
558       static bool eventSent=false ;
559
560       if (eventSent)
561       {
[2629]562         std::hash<string> hashString;
[2333]563         size_t hashId = hashString("oasis_enddef");
[2523]564
565         if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(0,hashId))
[2333]566         {
[2523]567           CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
[2335]568           driver_->endSynchronizedDefinition() ;
[2333]569           eventSent=false ;
570         }
571       }
572         
573       traceOff() ;
574       MPI_Iprobe(root,5,intraComm_, &flag, &status) ;
575       traceOn() ;
576       if (flag==true)
577       {
578           MPI_Recv(&msg,1,MPI_INT,root,5,intraComm_,&status) ; // tags oasis_endded = 5
[2629]579           std::hash<string> hashString;
[2333]580           size_t hashId = hashString("oasis_enddef");
[2523]581           CXios::getPoolRessource()->getEventScheduler()->registerEvent(0,hashId);
[2333]582           eventSent=true ;
583       }
584     }
585
[300]586    void CServer::finalize(void)
587    {
[361]588      CTimer::get("XIOS").suspend() ;
[2399]589      CTimer::get("XIOS server").suspend() ;
[492]590      delete eventScheduler ;
[655]591
[1639]592      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
[2589]593        xios::MPI_Comm_free(&(*it));
[983]594
[1639]595      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
[2589]596        xios::MPI_Comm_free(&(*it));
[1071]597
[1639]598        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
[2589]599          xios::MPI_Comm_free(&(*it));
[992]600
[2589]601//      xios::MPI_Comm_free(&intraComm);
[2266]602      CXios::finalizeDaemonsManager();
[2274]603      finalizeServersRessource();
[1764]604     
[2274]605      CContext::removeAllContexts() ; // free memory for related context
[2580]606     
[2310]607      CXios::getMpiGarbageCollector().release() ; // release unfree MPI ressources
[2580]608      MPI_Comm xiosComm=CXios::getXiosComm() ;
[2589]609      xios::MPI_Comm_free(&xiosComm) ;
[2420]610      CMemChecker::logMem( "CServer::finalize", true );
[2576]611     
612      CCommTrack::dumpComm() ;
613
[300]614      if (!is_MPI_Initialized)
[490]615      {
[2335]616        if (CXios::usingOasis) delete driver_;
[1639]617        else MPI_Finalize() ;
[300]618      }
[347]619      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
620      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
621      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
[2628]622
623      if (info.isActive(logProfile))
624      {
625        printProfile();
626      }
627     
628      if (info.isActive(logTimers)) report(0)<<"\n"<<CTimer::getAllCumulatedTime()<<endl ;
[2535]629      if (CXios::reportMemory)
630      {
631        report(100)<<CMemChecker::getAllCumulatedMem()<<endl ;
632      }
[2274]633     
[2146]634      CWorkflowGraph::drawWorkFlowGraph_server();
[2274]635      xios::releaseStaticAllocation() ; // free memory from static allocation
[300]636    }
[2628]637   
638    void CServer::printProfile()
639    {
640      list< pair<string,int> > timer_name;
641      timer_name.push_back({"XIOS server",0});
642      timer_name.push_back({"XIOS initialize",0});
643      timer_name.push_back({"XIOS event loop",0});
644      //timer_name.push_back({"Recv event loop (p2p)",1});      // timer concerned by yield and thread (if reader embedded)
645      //timer_name.push_back({"Recv event loop (legacy)",1});   // timer concerned by yield and thread
646      timer_name.push_back({"Process events",2});
647      timer_name.push_back({"Context : close definition",3});
648      timer_name.push_back({"Reader workflow data entry",3});
649      timer_name.push_back({"Files : reading data",4});
650      //timer_name.push_back({"Field : send data (read)",4});   // timer concerned by yield and thread
651      timer_name.push_back({"Server workflow data entry",3});
652      timer_name.push_back({"Server workflow",3});
653      timer_name.push_back({"Applying filters",4});
654      timer_name.push_back({"Transformation transfers",5});
655      timer_name.push_back({"Transformation MPI",6});
656      timer_name.push_back({"Temporal filters",5});
657      timer_name.push_back({"Field : send data",4});
658      //timer_name.push_back({"Scatter event",5});              // timer concerned by yield and thread
659      //timer_name.push_back({"Blocking time",6});              // timer concerned by yield and thread
660      timer_name.push_back({"Files : create headers",4});
661      timer_name.push_back({"Files : writing data",4});
662      timer_name.push_back({"Context finalize",3});             // timer concerned by yield and thread
663      timer_name.push_back({"Files : close",4});
664     
665      report(0)<< endl;
666      double total_time = CTimer::get("Process events").getCumulatedTime();
667      for(auto it_timer_name = timer_name.begin(); it_timer_name != timer_name.end(); it_timer_name++)
668      {
669        double timer_time = CTimer::get(it_timer_name->first).getCumulatedTime();
670        if ( timer_time / total_time > 0.001 )
671        {
672          ostringstream printed_line;
673          printed_line << setprecision(3) << std::fixed;
674          for(int itab=0;itab<it_timer_name->second;itab++)
675              printed_line << "  ";
676          printed_line << it_timer_name->first << " : " << timer_time <<endl;
677          string string_line = printed_line.str();
678          report(0)<< string_line;
679        }
680      }
681    }
[490]682
[523]683    /*!
684    * Open a file specified by a suffix and an extension and use it for the given file buffer.
685    * The file name will be suffix+rank+extension.
686    *
687    * \param fileName[in] protype file name
688    * \param ext [in] extension of the file
689    * \param fb [in/out] the file buffer
690    */
691    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
692    {
[1761]693      StdStringStream fileNameServer;
[523]694      int numDigit = 0;
[1761]695      int commSize = 0;
696      int commRank ;
[1021]697      int id;
[1761]698     
699      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
700      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
701
702      while (commSize)
[523]703      {
[1761]704        commSize /= 10;
[523]705        ++numDigit;
706      }
[1761]707      id = commRank;
[497]708
[1761]709      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
710      fb->open(fileNameServer.str().c_str(), std::ios::out);
[523]711      if (!fb->is_open())
712        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
[1761]713              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
[523]714    }
[490]715
[523]716    /*!
717    * \brief Open a file stream to write the info logs
718    * Open a file stream with a specific file name suffix+rank
719    * to write the info logs.
720    * \param fileName [in] protype file name
721    */
722    void CServer::openInfoStream(const StdString& fileName)
723    {
724      std::filebuf* fb = m_infoStream.rdbuf();
725      openStream(fileName, ".out", fb);
[490]726
[523]727      info.write2File(fb);
728      report.write2File(fb);
729    }
[490]730
[523]731    //! Write the info logs to standard output
732    void CServer::openInfoStream()
733    {
734      info.write2StdOut();
735      report.write2StdOut();
736    }
[490]737
[523]738    //! Close the info logs file if it opens
739    void CServer::closeInfoStream()
740    {
741      if (m_infoStream.is_open()) m_infoStream.close();
742    }
743
744    /*!
745    * \brief Open a file stream to write the error log
746    * Open a file stream with a specific file name suffix+rank
747    * to write the error log.
748    * \param fileName [in] protype file name
749    */
750    void CServer::openErrorStream(const StdString& fileName)
751    {
752      std::filebuf* fb = m_errorStream.rdbuf();
753      openStream(fileName, ".err", fb);
754
755      error.write2File(fb);
756    }
757
758    //! Write the error log to standard error output
759    void CServer::openErrorStream()
760    {
761      error.write2StdErr();
762    }
763
764    //! Close the error log file if it opens
765    void CServer::closeErrorStream()
766    {
767      if (m_errorStream.is_open()) m_errorStream.close();
768    }
[1761]769
770    void CServer::launchServersRessource(MPI_Comm serverComm)
771    {
772      serversRessource_ = new CServersRessource(serverComm) ;
773    }
[2274]774
775    void  CServer::finalizeServersRessource(void) 
776    { 
777      delete serversRessource_; serversRessource_=nullptr ;
778    }
[300]779}
Note: See TracBrowser for help on using the repository browser.