source: XIOS3/trunk/src/server.cpp @ 2589

Last change on this file since 2589 was 2589, checked in by jderouillat, 8 months ago

Specify the usage of the xios namespace to overload the MPI funtions

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 26.2 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "mem_checker.hpp"
16#include "event_scheduler.hpp"
17#include "string_tools.hpp"
18#include "ressources_manager.hpp"
19#include "services_manager.hpp"
20#include "contexts_manager.hpp"
21#include "servers_ressource.hpp"
22#include "services.hpp"
23#include "pool_node.hpp"
24#include <cstdio>
25#include "workflow_graph.hpp"
26#include "release_static_allocation.hpp"
27#include "thread_manager.hpp"
28#include <sys/stat.h>
29#include <unistd.h>
30
31
32
33namespace xios
34{
35    MPI_Comm CServer::intraComm_ ;
36    MPI_Comm CServer::serversComm_ ;
37    std::list<MPI_Comm> CServer::interCommLeft ;
38    std::list<MPI_Comm> CServer::interCommRight ;
39    std::list<MPI_Comm> CServer::contextInterComms;
40    std::list<MPI_Comm> CServer::contextIntraComms;
41    int CServer::serverLevel = 0 ;
42    int CServer::nbContexts = 0;
43    bool CServer::isRoot = false ;
44    int CServer::rank_ = INVALID_RANK;
45    StdOFStream CServer::m_infoStream;
46    StdOFStream CServer::m_errorStream;
47    map<string,CContext*> CServer::contextList ;
48    vector<int> CServer::sndServerGlobalRanks;
49    bool CServer::finished=false ;
50    bool CServer::is_MPI_Initialized ;
51    CEventScheduler* CServer::eventScheduler = 0;
52    CServersRessource* CServer::serversRessource_=nullptr ;
53    CThirdPartyDriver* CServer::driver_ =nullptr ;
54
55       
56    void CServer::initialize(void)
57    {
58     
59      MPI_Comm serverComm ;
60      int initialized ;
61      MPI_Initialized(&initialized) ;
62      if (initialized) is_MPI_Initialized=true ;
63      else is_MPI_Initialized=false ;
64      MPI_Comm globalComm=CXios::getGlobalComm() ;
65      /////////////////////////////////////////
66      ///////////// PART 1 ////////////////////
67      /////////////////////////////////////////
68      // don't use OASIS
69      if (!CXios::usingOasis)
70      {
71        if (!is_MPI_Initialized) 
72        {
73          int required = MPI_THREAD_SERIALIZED ;
74          int provided ;
75          MPI_Init_thread(NULL,NULL, required, &provided) ;
76        }
77       
78        // split the global communicator
79        // get hash from all model to attribute a unique color (int) and then split to get client communicator
80        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
81         
82        int commRank, commSize ;
83        MPI_Comm_rank(globalComm,&commRank) ;
84        MPI_Comm_size(globalComm,&commSize) ;
85
86        std::hash<string> hashString ;
87        size_t hashServer=hashString(CXios::xiosCodeId) ;
88         
89        size_t* hashAll = new size_t[commSize] ;
90        MPI_Allgather(&hashServer,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
91         
92        int color=0 ;
93        map<size_t,int> listHash ;
94        for(int i=0 ; i<=commSize ; i++) 
95          if (listHash.count(hashAll[i])==0) 
96          {
97            listHash[hashAll[i]]=color ;
98            color=color+1 ;
99          }
100        color=listHash[hashServer] ;
101        delete[] hashAll ;
102
103        xios::MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
104        CXios::getMpiGarbageCollector().registerCommunicator(serverComm) ;
105
106      }
107      else // using OASIS
108      {
109       
110        if (!is_MPI_Initialized) 
111        {
112          int required = MPI_THREAD_SERIALIZED ;
113          int provided ;
114          MPI_Init_thread(NULL,NULL, required, &provided) ;
115        }
116
117        driver_ = new CThirdPartyDriver();
118
119        driver_->getComponentCommunicator( serverComm );
120      }
121      xios::MPI_Comm_dup(serverComm, &intraComm_);
122      CXios::getMpiGarbageCollector().registerCommunicator(intraComm_) ;
123     
124      CTimer::get("XIOS").resume() ;
125      CTimer::get("XIOS server").resume() ;
126      CTimer::get("XIOS initialize").resume() ;
127 
128      /////////////////////////////////////////
129      ///////////// PART 2 ////////////////////
130      /////////////////////////////////////////
131     
132
133      // Create the XIOS communicator for every process which is related
134      // to XIOS, as well on client side as on server side
135      MPI_Comm xiosGlobalComm ;
136      string strIds=CXios::getin<string>("clients_code_id","") ;
137      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
138      if (strIds.empty())
139      {
140        // no code Ids given, suppose XIOS initialisation is global           
141        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
142        MPI_Comm splitComm,interComm ;
143        MPI_Comm_rank(globalComm,&commGlobalRank) ;
144        xios::MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
145        MPI_Comm_rank(splitComm,&commRank) ;
146        if (commRank==0) serverLeader=commGlobalRank ;
147        else serverLeader=0 ;
148        clientLeader=0 ;
149        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
150        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
151        xios::MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
152        xios::MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
153        CXios::setXiosComm(xiosGlobalComm) ;
154      }
155      else
156      {
157
158        xiosGlobalCommByFileExchange(serverComm) ;
159
160      }
161     
162      /////////////////////////////////////////
163      ///////////// PART 4 ////////////////////
164      //  create servers intra communicator  //
165      /////////////////////////////////////////
166     
167      int commRank ;
168      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
169      xios::MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
170      CXios::getMpiGarbageCollector().registerCommunicator(serversComm_) ;
171     
172      CXios::setUsingServer() ;
173
174      /////////////////////////////////////////
175      ///////////// PART 5 ////////////////////
176      //       redirect files output         //
177      /////////////////////////////////////////
178     
179      CServer::openInfoStream(CXios::serverFile);
180      CServer::openErrorStream(CXios::serverFile);
181
182      CMemChecker::logMem( "CServer::initialize" );
183
184      /////////////////////////////////////////
185      ///////////// PART 4 ////////////////////
186      /////////////////////////////////////////
187
188      CXios::launchDaemonsManager(true) ;
189     
190      /////////////////////////////////////////
191      ///////////// PART 5 ////////////////////
192      /////////////////////////////////////////
193
194      // create the services
195
196      auto ressourcesManager=CXios::getRessourcesManager() ;
197      auto servicesManager=CXios::getServicesManager() ;
198      auto contextsManager=CXios::getContextsManager() ;
199      auto daemonsManager=CXios::getDaemonsManager() ;
200      auto serversRessource=CServer::getServersRessource() ;
201
202      int rank;
203      MPI_Comm_rank(intraComm_, &rank) ;
204      if (rank==0) isRoot=true;
205      else isRoot=false;
206
207      if (serversRessource->isServerLeader())
208      {
209        // creating pool
210        CPoolNodeGroup::get("xios","pool_definition")->solveDescInheritance(true) ;
211        vector<CPoolNode*> pools = CPoolNodeGroup::get("xios","pool_definition")->getAllChildren();
212        for(auto& pool : pools) pool->allocateRessources() ;
213       
214        int nbRessources = ressourcesManager->getFreeRessourcesSize() ;
215        if (nbRessources>0)
216        {
217          if (!CXios::usingServer2)
218          {
219            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
220            if (CThreadManager::isUsingThreads()) 
221              while(!ressourcesManager->hasPool(CXios::defaultPoolId)) 
222              {
223                daemonsManager->eventLoop() ;
224                CThreadManager::yield() ;
225              }
226            else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;
227         
228            servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ;
229            if (CThreadManager::isUsingThreads()) 
230              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,0)) 
231              {
232                daemonsManager->eventLoop() ;
233                CThreadManager::yield() ;
234              }
235            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ;
236           
237            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ;
238            if (CThreadManager::isUsingThreads()) 
239            {
240              daemonsManager->eventLoop() ;
241              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0)) CThreadManager::yield() ;
242            }
243            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ;
244          }
245          else
246          {
247            int nprocsServer = nbRessources*CXios::ratioServer2/100.;
248            int nprocsGatherer = nbRessources - nprocsServer ;
249         
250            int nbPoolsServer2 = CXios::nbPoolsServer2 ;
251            if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
252            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
253            if (CThreadManager::isUsingThreads()) 
254              while(!ressourcesManager->hasPool(CXios::defaultPoolId)) 
255              {
256                daemonsManager->eventLoop() ;
257                CThreadManager::yield() ;
258              }
259            else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;
260
261            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
262            if (CThreadManager::isUsingThreads()) 
263              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultGathererId,0)) 
264              {
265                daemonsManager->eventLoop() ;
266                CThreadManager::yield() ;
267              }
268            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultGathererId) ;
269
270            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ;
271            if (CThreadManager::isUsingThreads()) 
272              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0)) 
273              {
274                daemonsManager->eventLoop() ;
275                CThreadManager::yield() ;
276              }
277            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ;
278           
279            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ;
280            if (CThreadManager::isUsingThreads())
281              for(int i=0; i<nbPoolsServer2; i++)
282                while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,i)) 
283                {
284                  daemonsManager->eventLoop() ;
285                  CThreadManager::yield() ;
286                }
287            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ;
288          }
289        }
290//        servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ;
291      }
292
293      MPI_Request req ;
294      MPI_Status status ;
295      MPI_Ibarrier(getServersRessource()->getCommunicator(),&req) ; // be sure that all services are created now, could be remove later if more asynchronisity
296      int ok=false ;
297      while (!ok)
298      {
299        daemonsManager->eventLoop() ;
300        if (CThreadManager::isUsingThreads()) CThreadManager::yield();
301        MPI_Test(&req,&ok,&status) ;
302      }
303
304
305      //testingEventScheduler() ;
306/*
307      MPI_Request req ;
308      MPI_Status status ;
309      MPI_Ibarrier(CXios::getXiosComm(),&req) ; // be sure that all services are created now, could be remove later if more asynchronisity
310      int ok=false ;
311      while (!ok)
312      {
313        daemonsManager->eventLoop() ;
314        MPI_Test(&req,&ok,&status) ;
315      }
316*/
317      CTimer::get("XIOS initialize").suspend() ;
318
319      /////////////////////////////////////////
320      ///////////// PART 5 ////////////////////
321      /////////////////////////////////////////
322      // loop on event loop
323
324      bool finished=false ;
325      CTimer::get("XIOS event loop").resume() ;
326
327      while (!finished)
328      {
329        finished=daemonsManager->eventLoop() ;
330        if (CThreadManager::isUsingThreads()) CThreadManager::yield() ;
331      }
332      CTimer::get("XIOS event loop").suspend() ;
333
334      // Delete CContext
335      //CObjectTemplate<CContext>::cleanStaticDataStructure();
336    }
337
338
339    void CServer::testingEventScheduler(void)
340    {
341      CXios::getPoolRessource()->getEventScheduler()->registerEvent(1,10) ;
342      CXios::getPoolRessource()->getEventScheduler()->registerEvent(2,10) ;
343      if (CXios::getPoolRessource()->hasService(CXios::defaultGathererId,0))
344      {
345        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(1,100) ;
346        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(2,100) ;
347        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(3,100) ;
348      }
349      if (CXios::getPoolRessource()->hasService(CXios::defaultWriterId,0))
350      {
351        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(1,1000) ;
352        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(2,1000) ;
353      }
354      CXios::getPoolRessource()->getEventScheduler()->registerEvent(3,10) ;
355      CXios::getPoolRessource()->getEventScheduler()->registerEvent(4,10) ;
356     
357      if (CXios::getPoolRessource()->hasService(CXios::defaultGathererId,0))
358      {
359        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(4,100) ;
360        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(5,100) ;
361      }
362      if (CXios::getPoolRessource()->hasService(CXios::defaultWriterId,0))
363      {
364        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(3,1000) ;
365        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(4,1000) ;
366        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(5,1000) ;
367      }
368      CXios::getPoolRessource()->getEventScheduler()->registerEvent(5,10) ;
369      CXios::getPoolRessource()->getEventScheduler()->registerEvent(6,10) ;
370     
371      int numEvents=0 ;
372      int poolEvent=1 ;
373      int gatherEvent=1 ;
374      int writerEvent=1 ;
375      do
376      {
377        if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(poolEvent,10))
378        {
379          CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
380          MPI_Barrier(CXios::getPoolRessource()->getCommunicator());
381          poolEvent++ ;
382          numEvents++;
383        }
384       
385        if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(gatherEvent,100))
386        {
387          CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
388          MPI_Barrier(CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)->getCommunicator());
389          gatherEvent++ ;
390          numEvents++;
391        }
392
393        if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(writerEvent,1000))
394        {
395          CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
396          MPI_Barrier(CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)->getCommunicator());
397          writerEvent++ ;
398          numEvents++;
399        }
400
401       
402      } while (numEvents!=11) ;
403
404    }
405
406
407    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
408    {
409       
410      MPI_Comm globalComm=CXios::getGlobalComm() ;
411      MPI_Comm xiosGlobalComm ;
412     
413      string strIds=CXios::getin<string>("clients_code_id","") ;
414      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
415     
416      int commRank, globalRank ;
417      MPI_Comm_rank(serverComm, &commRank) ;
418      MPI_Comm_rank(globalComm, &globalRank) ;
419      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
420
421      if (commRank==0) // if root process publish name
422      { 
423        std::ofstream ofs (serverFileName, std::ofstream::out);
424        ofs<<globalRank ;
425        ofs.close();
426      }
427       
428      vector<int> clientsRank(clientsCodeId.size()) ;
429      for(int i=0;i<clientsRank.size();i++)
430      {
431        std::ifstream ifs ;
432        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
433        struct stat buffer;
434        do {
435        } while( stat(fileName.c_str(), &buffer) != 0 );
436        sleep(1);
437        ifs.open(fileName, ifstream::in) ;
438        ifs>>clientsRank[i] ;
439        //cout <<  "\t\t read: " << clientsRank[i] << " in " << fileName << endl;
440        ifs.close() ; 
441      }
442
443      MPI_Comm intraComm ;
444      xios::MPI_Comm_dup(serverComm,&intraComm) ;
445      MPI_Comm interComm ;
446      for(int i=0 ; i<clientsRank.size(); i++)
447      { 
448        xios::MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
449        CXios::getMpiGarbageCollector().registerCommunicator(interComm) ;
450        interCommLeft.push_back(interComm) ;
451        xios::MPI_Comm_free(&intraComm) ;
452        xios::MPI_Intercomm_merge(interComm,false, &intraComm ) ;
453      }
454      xiosGlobalComm=intraComm ; 
455      MPI_Barrier(xiosGlobalComm);
456      if (commRank==0) std::remove(serverFileName.c_str()) ;
457      MPI_Barrier(xiosGlobalComm);
458
459      CXios::setXiosComm(xiosGlobalComm) ;
460     
461    }
462
463
464    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
465    {
466        // untested, need to be tested on a true MPI-2 compliant library
467
468        // try to discover other client/server
469/*
470        // publish server name
471        char portName[MPI_MAX_PORT_NAME];
472        int ierr ;
473        int commRank ;
474        MPI_Comm_rank(serverComm, &commRank) ;
475       
476        if (commRank==0) // if root process publish name
477        { 
478          MPI_Open_port(MPI_INFO_NULL, portName);
479          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
480        }
481
482        MPI_Comm intraComm=serverComm ;
483        MPI_Comm interComm ;
484        for(int i=0 ; i<clientsCodeId.size(); i++)
485        { 
486          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
487          xios::MPI_Intercomm_merge(interComm,false, &intraComm ) ;
488        }
489*/     
490    }
491
492   /*!
493    * Root process is listening for an order sent by client to call "oasis_enddef".
494    * The root client of a compound send the order (tag 5). It is probed and received.
495    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
496    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
497    */
498   
499     void CServer::listenOasisEnddef(void)
500     {
501        int flag ;
502        MPI_Status status ;
503        list<MPI_Comm>::iterator it;
504        int msg ;
505        static int nbCompound=0 ;
506        int size ;
507        static bool sent=false ;
508        static MPI_Request* allRequests ;
509        static MPI_Status* allStatus ;
510
511
512        if (sent)
513        {
514          MPI_Comm_size(intraComm_,&size) ;
515          MPI_Testall(size,allRequests, &flag, allStatus) ;
516          if (flag==true)
517          {
518            delete [] allRequests ;
519            delete [] allStatus ;
520            sent=false ;
521          }
522        }
523       
524
525        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
526        {
527           MPI_Status status ;
528           traceOff() ;
529           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5
530           traceOn() ;
531           if (flag==true)
532           {
533              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
534              nbCompound++ ;
535              if (nbCompound==interCommLeft.size())
536              {
537                MPI_Comm_size(intraComm_,&size) ;
538                allRequests= new MPI_Request[size] ;
539                allStatus= new MPI_Status[size] ;
540                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm_,&allRequests[i]) ; // tags oasis_endded = 5
541                sent=true ;
542              }
543           }
544        }
545}
546     
547   /*!
548    * Processes probes message from root process if oasis_enddef call must be done.
549    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
550    */
551     void CServer::listenRootOasisEnddef(void)
552     {
553       int flag ;
554       MPI_Status status ;
555       const int root=0 ;
556       int msg ;
557       static bool eventSent=false ;
558
559       if (eventSent)
560       {
561         boost::hash<string> hashString;
562         size_t hashId = hashString("oasis_enddef");
563
564         if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(0,hashId))
565         {
566           CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
567           driver_->endSynchronizedDefinition() ;
568           eventSent=false ;
569         }
570       }
571         
572       traceOff() ;
573       MPI_Iprobe(root,5,intraComm_, &flag, &status) ;
574       traceOn() ;
575       if (flag==true)
576       {
577           MPI_Recv(&msg,1,MPI_INT,root,5,intraComm_,&status) ; // tags oasis_endded = 5
578           boost::hash<string> hashString;
579           size_t hashId = hashString("oasis_enddef");
580           CXios::getPoolRessource()->getEventScheduler()->registerEvent(0,hashId);
581           eventSent=true ;
582       }
583     }
584
585    void CServer::finalize(void)
586    {
587      CTimer::get("XIOS").suspend() ;
588      CTimer::get("XIOS server").suspend() ;
589      delete eventScheduler ;
590
591      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
592        xios::MPI_Comm_free(&(*it));
593
594      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
595        xios::MPI_Comm_free(&(*it));
596
597        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
598          xios::MPI_Comm_free(&(*it));
599
600//      xios::MPI_Comm_free(&intraComm);
601      CXios::finalizeDaemonsManager();
602      finalizeServersRessource();
603     
604      CContext::removeAllContexts() ; // free memory for related context
605     
606      CXios::getMpiGarbageCollector().release() ; // release unfree MPI ressources
607      MPI_Comm xiosComm=CXios::getXiosComm() ;
608      xios::MPI_Comm_free(&xiosComm) ;
609      CMemChecker::logMem( "CServer::finalize", true );
610     
611      CCommTrack::dumpComm() ;
612
613      if (!is_MPI_Initialized)
614      {
615        if (CXios::usingOasis) delete driver_;
616        else MPI_Finalize() ;
617      }
618      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
619      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
620      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
621      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
622      if (CXios::reportMemory)
623      {
624        report(100)<<CMemChecker::getAllCumulatedMem()<<endl ;
625      }
626     
627      CWorkflowGraph::drawWorkFlowGraph_server();
628      xios::releaseStaticAllocation() ; // free memory from static allocation
629    }
630
631    /*!
632    * Open a file specified by a suffix and an extension and use it for the given file buffer.
633    * The file name will be suffix+rank+extension.
634    *
635    * \param fileName[in] protype file name
636    * \param ext [in] extension of the file
637    * \param fb [in/out] the file buffer
638    */
639    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
640    {
641      StdStringStream fileNameServer;
642      int numDigit = 0;
643      int commSize = 0;
644      int commRank ;
645      int id;
646     
647      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
648      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
649
650      while (commSize)
651      {
652        commSize /= 10;
653        ++numDigit;
654      }
655      id = commRank;
656
657      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
658      fb->open(fileNameServer.str().c_str(), std::ios::out);
659      if (!fb->is_open())
660        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
661              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
662    }
663
664    /*!
665    * \brief Open a file stream to write the info logs
666    * Open a file stream with a specific file name suffix+rank
667    * to write the info logs.
668    * \param fileName [in] protype file name
669    */
670    void CServer::openInfoStream(const StdString& fileName)
671    {
672      std::filebuf* fb = m_infoStream.rdbuf();
673      openStream(fileName, ".out", fb);
674
675      info.write2File(fb);
676      report.write2File(fb);
677    }
678
679    //! Write the info logs to standard output
680    void CServer::openInfoStream()
681    {
682      info.write2StdOut();
683      report.write2StdOut();
684    }
685
686    //! Close the info logs file if it opens
687    void CServer::closeInfoStream()
688    {
689      if (m_infoStream.is_open()) m_infoStream.close();
690    }
691
692    /*!
693    * \brief Open a file stream to write the error log
694    * Open a file stream with a specific file name suffix+rank
695    * to write the error log.
696    * \param fileName [in] protype file name
697    */
698    void CServer::openErrorStream(const StdString& fileName)
699    {
700      std::filebuf* fb = m_errorStream.rdbuf();
701      openStream(fileName, ".err", fb);
702
703      error.write2File(fb);
704    }
705
706    //! Write the error log to standard error output
707    void CServer::openErrorStream()
708    {
709      error.write2StdErr();
710    }
711
712    //! Close the error log file if it opens
713    void CServer::closeErrorStream()
714    {
715      if (m_errorStream.is_open()) m_errorStream.close();
716    }
717
718    void CServer::launchServersRessource(MPI_Comm serverComm)
719    {
720      serversRessource_ = new CServersRessource(serverComm) ;
721    }
722
723    void  CServer::finalizeServersRessource(void) 
724    { 
725      delete serversRessource_; serversRessource_=nullptr ;
726    }
727}
Note: See TracBrowser for help on using the repository browser.