source: XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp @ 2324

Last change on this file since 2324 was 2324, checked in by ymipsl, 2 years ago

Solve deadlock or crash occuring when activate second levels of servers.

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 81.2 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16#include "timer.hpp"
17#include "memtrack.hpp"
18#include <limits>
19#include <fstream>
20#include "server.hpp"
21#include "distribute_file_server2.hpp"
22#include "services_manager.hpp"
23#include "contexts_manager.hpp"
24#include "cxios.hpp"
25#include "client.hpp"
26#include "coupler_in.hpp"
27#include "coupler_out.hpp"
28#include "servers_ressource.hpp"
29#include "pool_ressource.hpp"
30#include "services.hpp"
31#include "contexts_manager.hpp"
32#include <chrono>
33#include <random>
34
35namespace xios
36{
37
38  std::shared_ptr<CContextGroup> CContext::root;
39
40   /// ////////////////////// Définitions ////////////////////// ///
41
42   CContext::CContext(void)
43      : CObjectTemplate<CContext>(), CContextAttributes()
44      , calendar(), hasClient(false), hasServer(false)
45      , isPostProcessed(false), finalized(false)
46      , client(nullptr), server(nullptr)
47      , allProcessed(false), countChildContextFinalized_(0), isProcessingEvent_(false)
48
49   { /* Ne rien faire de plus */ }
50
51   CContext::CContext(const StdString & id)
52      : CObjectTemplate<CContext>(id), CContextAttributes()
53      , calendar(), hasClient(false), hasServer(false)
54      , isPostProcessed(false), finalized(false)
55      , client(nullptr), server(nullptr)
56      , allProcessed(false), countChildContextFinalized_(0), isProcessingEvent_(false)
57   { /* Ne rien faire de plus */ }
58
59   CContext::~CContext(void)
60   {
61     delete client;
62     delete server;
63     for (std::vector<CContextClient*>::iterator it = clientPrimServer.begin(); it != clientPrimServer.end(); it++)  delete *it;
64     for (std::vector<CContextServer*>::iterator it = serverPrimServer.begin(); it != serverPrimServer.end(); it++)  delete *it;
65     if (registryIn!=nullptr) delete registryIn ;
66     if (registryOut!=nullptr) delete registryOut ;
67   }
68
69   //----------------------------------------------------------------
70   //! Get name of context
71   StdString CContext::GetName(void)   { return (StdString("context")); }
72   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
73   ENodeType CContext::GetType(void)   { return (eContext); }
74
75   //----------------------------------------------------------------
76
77  void CContext::initEventScheduler(void)
78  {
79    SRegisterContextInfo contextInfo ;
80    CXios::getContextsManager()->getContextInfo(this->getId(), contextInfo, getIntraComm()) ;
81
82    eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
83 
84    // generate unique hash for server
85    auto time=chrono::system_clock::now().time_since_epoch().count() ;
86    std::default_random_engine rd(time); // not reproducible from a run to another
87    std::uniform_int_distribution<size_t> dist;
88    hashId_=dist(rd) ;
89    MPI_Bcast(&hashId_,1,MPI_SIZE_T,0,getIntraComm()) ; // Bcast to all server of the context
90  }
91   /*!
92   \brief Get context group (context root)
93   \return Context root
94   */
95   CContextGroup* CContext::getRoot(void)
96   TRY
97   {
98      if (root.get()==NULL) root=std::shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
99      return root.get();
100   }
101   CATCH
102
103   void CContext::releaseStaticAllocation(void)
104   TRY
105   {
106      CDomain::releaseStaticAllocation();
107      CAxis::releaseStaticAllocation();
108      CScalar::releaseStaticAllocation();
109      if (root) root.reset() ;
110   }
111   CATCH
112   
113   //----------------------------------------------------------------
114
115   /*!
116   \brief Get calendar of a context
117   \return Calendar
118   */
119   std::shared_ptr<CCalendar> CContext::getCalendar(void) const
120   TRY
121   {
122      return (this->calendar);
123   }
124   CATCH
125
126   //----------------------------------------------------------------
127
128   /*!
129   \brief Set a context with a calendar
130   \param[in] newCalendar new calendar
131   */
132   void CContext::setCalendar(std::shared_ptr<CCalendar> newCalendar)
133   TRY
134   {
135      this->calendar = newCalendar;
136   }
137   CATCH_DUMP_ATTR
138
139   //----------------------------------------------------------------
140   /*!
141   \brief Parse xml file and write information into context object
142   \param [in] node xmld node corresponding in xml file
143   */
144   void CContext::parse(xml::CXMLNode & node)
145   TRY
146   {
147      CContext::SuperClass::parse(node);
148
149      // PARSING POUR GESTION DES ENFANTS
150      xml::THashAttributes attributes = node.getAttributes();
151
152      if (attributes.end() != attributes.find("src"))
153      {
154         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
155         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
156            ERROR("void CContext::parse(xml::CXMLNode & node)",
157                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
158         if (!ifs.good())
159            ERROR("CContext::parse(xml::CXMLNode & node)",
160                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
161         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
162      }
163
164      if (node.getElementName().compare(CContext::GetName()))
165         DEBUG("Le noeud is wrong defined but will be considered as a context !");
166
167      if (!(node.goToChildElement()))
168      {
169         DEBUG("Le context ne contient pas d'enfant !");
170      }
171      else
172      {
173         do { // Parcours des contextes pour traitement.
174
175            StdString name = node.getElementName();
176            attributes.clear();
177            attributes = node.getAttributes();
178
179            if (attributes.end() != attributes.find("id"))
180            {
181              DEBUG(<< "Definition node has an id,"
182                    << "it will not be taking account !");
183            }
184
185#define DECLARE_NODE(Name_, name_)    \
186   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
187   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
188#define DECLARE_NODE_PAR(Name_, name_)
189#include "node_type.conf"
190
191            DEBUG(<< "The element \'"     << name
192                  << "\' in the context \'" << CContext::getCurrent()->getId()
193                  << "\' is not a definition !");
194
195         } while (node.goToNextElement());
196
197         node.goToParentElement(); // Retour au parent
198      }
199   }
200   CATCH_DUMP_ATTR
201
202   //----------------------------------------------------------------
203   //! Show tree structure of context
204   void CContext::ShowTree(StdOStream & out)
205   TRY
206   {
207      StdString currentContextId = CContext::getCurrent() -> getId();
208      std::vector<CContext*> def_vector =
209         CContext::getRoot()->getChildList();
210      std::vector<CContext*>::iterator
211         it = def_vector.begin(), end = def_vector.end();
212
213      out << "<? xml version=\"1.0\" ?>" << std::endl;
214      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
215
216      for (; it != end; it++)
217      {
218         CContext* context = *it;
219         CContext::setCurrent(context->getId());
220         out << *context << std::endl;
221      }
222
223      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
224      CContext::setCurrent(currentContextId);
225   }
226   CATCH
227
228   //----------------------------------------------------------------
229
230   //! Convert context object into string (to print)
231   StdString CContext::toString(void) const
232   TRY
233   {
234      StdOStringStream oss;
235      oss << "<" << CContext::GetName()
236          << " id=\"" << this->getId() << "\" "
237          << SuperClassAttribute::toString() << ">" << std::endl;
238      if (!this->hasChild())
239      {
240         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
241      }
242      else
243      {
244
245#define DECLARE_NODE(Name_, name_)    \
246   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
247   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
248#define DECLARE_NODE_PAR(Name_, name_)
249#include "node_type.conf"
250
251      }
252      oss << "</" << CContext::GetName() << " >";
253      return (oss.str());
254   }
255   CATCH
256
257   //----------------------------------------------------------------
258
259   /*!
260   \brief Find all inheritace among objects in a context.
261   \param [in] apply (true) write attributes of parent into ones of child if they are empty
262                     (false) write attributes of parent into a new container of child
263   \param [in] parent unused
264   */
265   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
266   TRY
267   {
268#define DECLARE_NODE(Name_, name_)    \
269   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
270     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
271#define DECLARE_NODE_PAR(Name_, name_)
272#include "node_type.conf"
273   }
274   CATCH_DUMP_ATTR
275
276   //----------------------------------------------------------------
277
278   //! Verify if all root definition in the context have child.
279   bool CContext::hasChild(void) const
280   TRY
281   {
282      return (
283#define DECLARE_NODE(Name_, name_)    \
284   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
285#define DECLARE_NODE_PAR(Name_, name_)
286#include "node_type.conf"
287      false);
288}
289   CATCH
290
291   //----------------------------------------------------------------
292
293   void CContext::CleanTree(void)
294   TRY
295   {
296#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
297#define DECLARE_NODE_PAR(Name_, name_)
298#include "node_type.conf"
299   }
300   CATCH
301
302void CContext::removeContext(const string& contextId)
303{
304  #define DECLARE_NODE(Name_, name_)     CObjectFactory::deleteContext< C##Name_ >(contextId);
305  #define DECLARE_NODE_PAR(Name_, name_) CObjectFactory::deleteContext< C##Name_ >(contextId);
306  #include "node_type.conf"
307  #define DECLARE_NODE(Name_, name_)     CObjectFactory::deleteContext< C##Name_##Group >(contextId);
308  #define DECLARE_NODE_PAR(Name_, name_) 
309  #include "node_type.conf"
310
311/*
312  #define DECLARE_NODE(Name_, name_)     CObjectFactory::dumpObjects< C##Name_##Group >();
313  #define DECLARE_NODE_PAR(Name_, name_)
314  #include "node_type.conf"
315
316  #define DECLARE_NODE(Name_, name_)     CObjectFactory::dumpObjects< C##Name_>();
317  #define DECLARE_NODE_PAR(Name_, name_)
318  #include "node_type.conf"
319*/
320}
321
322void CContext::removeAllContexts(void)
323{
324  #define DECLARE_NODE(Name_, name_)     CObjectFactory::deleteAllContexts< C##Name_ >();
325  #define DECLARE_NODE_PAR(Name_, name_) CObjectFactory::deleteAllContexts< C##Name_ >();
326  #include "node_type.conf"
327  #define DECLARE_NODE(Name_, name_)     CObjectFactory::deleteAllContexts< C##Name_##Group >();
328  #define DECLARE_NODE_PAR(Name_, name_) 
329  #include "node_type.conf"
330/*
331  #define DECLARE_NODE(Name_, name_)     CObjectFactory::dumpObjects< C##Name_##Group >();
332  #define DECLARE_NODE_PAR(Name_, name_)
333  #include "node_type.conf"
334
335  #define DECLARE_NODE(Name_, name_)     CObjectFactory::dumpObjects< C##Name_>();
336  #define DECLARE_NODE_PAR(Name_, name_)
337  #include "node_type.conf"
338*/
339  CObjectFactory::deleteAllContexts<CContext>() ;
340  CObjectFactory::deleteAllContexts<CContextGroup>() ;
341  CObjectFactory::clearCurrentContextId();
342  CGroupFactory::clearCurrentContextId();
343}
344   ///---------------------------------------------------------------
345
346
347 /*!
348    * Compute the required buffer size to send the fields data.
349    * \param maxEventSize [in/out] the size of the bigger event for each connected server
350    * \param [in] contextClient
351    * \param [in] bufferForWriting True if buffers are used for sending data for writing
352      This flag is only true for client and server-1 for communication with server-2
353    */
354   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize,
355                                                      CContextClient* contextClient, bool bufferForWriting /*= "false"*/)
356   TRY
357   {
358     std::map<int, StdSize> dataSize;
359
360     // Find all reference domain and axis of all active fields
361     std::vector<CFile*>& fileList = bufferForWriting ? this->enabledWriteModeFiles : this->enabledReadModeFiles;
362     size_t numEnabledFiles = fileList.size();
363     for (size_t i = 0; i < numEnabledFiles; ++i)
364     {
365       CFile* file = fileList[i];
366       if (file->getContextClient() == contextClient)
367       {
368         std::vector<CField*> enabledFields = file->getEnabledFields();
369         size_t numEnabledFields = enabledFields.size();
370         for (size_t j = 0; j < numEnabledFields; ++j)
371         {
372           // const std::vector<std::map<int, StdSize> > mapSize = enabledFields[j]->getGridDataBufferSize(contextClient);
373           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize(contextClient,bufferForWriting);
374           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
375           for (; it != itE; ++it)
376           {
377             // If dataSize[it->first] does not exist, it will be zero-initialized
378             // so we can use it safely without checking for its existance
379           if (CXios::isOptPerformance)
380               dataSize[it->first] += it->second;
381             else if (dataSize[it->first] < it->second)
382               dataSize[it->first] = it->second;
383
384           if (maxEventSize[it->first] < it->second)
385               maxEventSize[it->first] = it->second;
386           }
387         }
388       }
389     }
390     return dataSize;
391   }
392   CATCH_DUMP_ATTR
393
394/*!
395    * Compute the required buffer size to send the attributes (mostly those grid related).
396    * \param maxEventSize [in/out] the size of the bigger event for each connected server
397    * \param [in] contextClient
398    * \param [in] bufferForWriting True if buffers are used for sending data for writing
399      This flag is only true for client and server-1 for communication with server-2
400    */
401   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize,
402                                                           CContextClient* contextClient, bool bufferForWriting /*= "false"*/)
403   TRY
404   {
405   // As calendar attributes are sent even if there are no active files or fields, maps are initialized according the size of calendar attributes
406     std::map<int, StdSize> attributesSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient);
407     maxEventSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient);
408
409     std::vector<CFile*>& fileList = this->enabledFiles;
410     size_t numEnabledFiles = fileList.size();
411     for (size_t i = 0; i < numEnabledFiles; ++i)
412     {
413//         CFile* file = this->enabledWriteModeFiles[i];
414        CFile* file = fileList[i];
415        std::vector<CField*> enabledFields = file->getEnabledFields();
416        size_t numEnabledFields = enabledFields.size();
417        for (size_t j = 0; j < numEnabledFields; ++j)
418        {
419          const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize(contextClient, bufferForWriting);
420          std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
421          for (; it != itE; ++it)
422          {
423         // If attributesSize[it->first] does not exist, it will be zero-initialized
424         // so we can use it safely without checking for its existence
425             if (attributesSize[it->first] < it->second)
426         attributesSize[it->first] = it->second;
427
428         if (maxEventSize[it->first] < it->second)
429         maxEventSize[it->first] = it->second;
430          }
431        }
432     }
433     return attributesSize;
434   }
435   CATCH_DUMP_ATTR
436
437
438
439   //! Verify whether a context is initialized
440   bool CContext::isInitialized(void)
441   TRY
442   {
443     return hasClient;
444   }
445   CATCH_DUMP_ATTR
446
447
448   void CContext::init(CServerContext* parentServerContext, MPI_Comm intraComm, int serviceType)
449   TRY
450   {
451     parentServerContext_ = parentServerContext ;
452     if (serviceType==CServicesManager::CLIENT) 
453       initClient(intraComm, serviceType) ;
454     else
455       initServer(intraComm, serviceType) ;
456     initEventScheduler() ;
457    }
458    CATCH_DUMP_ATTR
459
460
461
462//! Initialize client side
463   void CContext::initClient(MPI_Comm intraComm, int serviceType)
464   TRY
465   {
466      intraComm_=intraComm ;
467      MPI_Comm_rank(intraComm_, &intraCommRank_) ;
468      MPI_Comm_size(intraComm_, &intraCommSize_) ;
469
470      serviceType_ = CServicesManager::CLIENT ;
471      if (serviceType_==CServicesManager::CLIENT)
472      {
473        hasClient=true ;
474        hasServer=false ;
475      }
476      contextId_ = getId() ;
477     
478      attached_mode=true ;
479      if (!CXios::isUsingServer()) attached_mode=false ;
480
481
482      string contextRegistryId=getId() ;
483      registryIn=new CRegistry(CXios::getRegistryManager()->getRegistryIn());
484      registryIn->setPath(contextRegistryId) ;
485     
486      registryOut=new CRegistry(intraComm_) ;
487      registryOut->setPath(contextRegistryId) ;
488     
489   }
490   CATCH_DUMP_ATTR
491
492   
493   void CContext::initServer(MPI_Comm intraComm, int serviceType)
494   TRY
495   {
496     hasServer=true;
497     intraComm_=intraComm ;
498     MPI_Comm_rank(intraComm_, &intraCommRank_) ;
499     MPI_Comm_size(intraComm_, &intraCommSize_) ;
500
501     serviceType_=serviceType ;
502
503     if (serviceType_==CServicesManager::GATHERER)
504     {
505       hasClient=true ;
506       hasServer=true ;
507     }
508     else if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER)
509     {
510       hasClient=false ;
511       hasServer=true ;
512     }
513
514     CXios::getContextsManager()->getContextId(getId(), contextId_, intraComm) ;
515     
516     string contextRegistryId=getId() ;
517     registryIn=new CRegistry(CXios::getRegistryManager()->getRegistryIn());
518     registryIn->setPath(contextRegistryId) ;
519     
520     registryOut=new CRegistry(intraComm_) ;
521     registryOut->setPath(contextRegistryId) ;
522
523   }
524   CATCH_DUMP_ATTR
525
526
527  void CContext::createClientInterComm(MPI_Comm interCommClient, MPI_Comm interCommServer) // for servers
528  TRY
529  {
530    MPI_Comm intraCommClient ;
531    MPI_Comm_dup(intraComm_, &intraCommClient);
532    comms.push_back(intraCommClient);
533    // attached_mode=parentServerContext_->isAttachedMode() ; //ym probably inherited from source context
534    server = new CContextServer(this,intraComm_, interCommServer); // check if we need to dupl. intraComm_ ?
535    client = new CContextClient(this,intraCommClient,interCommClient);
536    client->setAssociatedServer(server) ; 
537    server->setAssociatedClient(client) ; 
538
539  }
540  CATCH_DUMP_ATTR
541
542  void CContext::createServerInterComm(void) 
543  TRY
544  {
545   
546    MPI_Comm interCommClient, interCommServer ;
547
548    if (serviceType_ == CServicesManager::CLIENT)
549    {
550
551      int commRank ;
552      MPI_Comm_rank(intraComm_,&commRank) ;
553      if (commRank==0)
554      {
555        if (attached_mode) CXios::getContextsManager()->createServerContext(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultServerId, 0, getContextId()) ;
556        else if (CXios::usingServer2) CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId()) ;
557        else  CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId()) ;
558      }
559
560      MPI_Comm interComm ;
561     
562      if (attached_mode)
563      {
564        parentServerContext_->createIntercomm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultServerId, 0, getContextId(), intraComm_, 
565                                              interCommClient, interCommServer) ;
566        int type ; 
567        if (commRank==0) CXios::getServicesManager()->getServiceType(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, type) ;
568        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ;
569        setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble
570      }
571      else if (CXios::usingServer2)
572      { 
573//      CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId(), intraComm_, interComm) ;
574        parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId(), intraComm_,
575                                              interCommClient, interCommServer) ;
576        int type ; 
577        if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultGathererId, 0, type) ;
578        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ;
579      }
580      else
581      {
582        //CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId(), intraComm_, interComm) ;
583        parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId(), intraComm_,
584                                              interCommClient, interCommServer) ;
585        int type ; 
586        if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultServerId, 0, type) ;
587        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ;
588      }
589
590        // intraComm client is not duplicated. In all the code we use client->intraComm for MPI
591        // in future better to replace it by intracommuncator associated to the context
592   
593      MPI_Comm intraCommClient, intraCommServer ;
594      intraCommClient=intraComm_ ;
595      MPI_Comm_dup(intraComm_, &intraCommServer) ;
596      client = new CContextClient(this, intraCommClient, interCommClient);
597      server = new CContextServer(this, intraCommServer, interCommServer);
598      client->setAssociatedServer(server) ;
599      server->setAssociatedClient(client) ;
600    }
601   
602    if (serviceType_ == CServicesManager::GATHERER)
603    {
604      int commRank ;
605      MPI_Comm_rank(intraComm_,&commRank) ;
606     
607      int nbPartitions ;
608      if (commRank==0) 
609      { 
610        CXios::getServicesManager()->getServiceNbPartitions(CXios::defaultPoolId, CXios::defaultServerId, 0, nbPartitions) ;
611        for(int i=0 ; i<nbPartitions; i++)
612          CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId()) ;
613      }     
614      MPI_Bcast(&nbPartitions, 1, MPI_INT, 0, intraComm_) ;
615     
616      MPI_Comm interComm ;
617      for(int i=0 ; i<nbPartitions; i++)
618      {
619        parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId(), intraComm_, interCommClient, interCommServer) ;
620        int type ; 
621        if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultServerId, 0, type) ;
622        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ;
623        primServerId_.push_back(CXios::getContextsManager()->getServerContextName(CXios::defaultPoolId, CXios::defaultServerId, i, type, getContextId())) ;
624
625        // intraComm client is not duplicated. In all the code we use client->intraComm for MPI
626        // in future better to replace it by intracommuncator associated to the context
627     
628        MPI_Comm intraCommClient, intraCommServer ;
629
630        intraCommClient=intraComm_ ;
631        MPI_Comm_dup(intraComm_, &intraCommServer) ;
632
633        CContextClient* client = new CContextClient(this, intraCommClient, interCommClient) ;
634        CContextServer* server = new CContextServer(this, intraCommServer, interCommServer) ;
635        client->setAssociatedServer(server) ;
636        server->setAssociatedClient(client) ;
637        clientPrimServer.push_back(client);
638        serverPrimServer.push_back(server); 
639
640     
641      }
642    }
643  }
644  CATCH_DUMP_ATTR
645
646  void CContext::globalEventLoop(void)
647  {
648    lockContext() ;
649    CXios::getDaemonsManager()->eventLoop() ;
650    unlockContext() ;
651    setCurrent(getId()) ;
652  }
653
654  bool CContext::scheduledEventLoop(bool enableEventsProcessing) 
655  {
656    bool out, finished; 
657    size_t timeLine=timeLine_ ;
658    if (serviceType_==CServicesManager::CLIENT)
659    {
660      timeLine_++ ;
661      eventScheduler_->registerEvent(timeLine, hashId_) ;
662    }
663
664    do
665    { 
666      finished=eventLoop(enableEventsProcessing) ;
667      if (serviceType_==CServicesManager::CLIENT) 
668      { 
669        out = eventScheduler_->queryEvent(timeLine,hashId_) ;
670        if (out) eventScheduler_->popEvent() ;
671      }
672
673      else out=true ;
674    }  while(!out) ;
675   
676    return finished ;
677  }
678
679  bool CContext::eventLoop(bool enableEventsProcessing)
680  {
681    bool  finished(true); 
682    if (isLockedContext()) enableEventsProcessing=false;
683   
684    setCurrent(getId()) ;
685
686    if (client!=nullptr && !finalized) client->eventLoop();
687   
688    for (int i = 0; i < clientPrimServer.size(); ++i)
689    {
690      if (!finalized) clientPrimServer[i]->eventLoop();
691      if (!finalized) finished &= serverPrimServer[i]->eventLoop(enableEventsProcessing);
692    }
693
694    for (auto couplerOut : couplerOutClient_)
695      if (!finalized) couplerOut.second->eventLoop();
696   
697    for (auto couplerIn : couplerInClient_)
698      if (!finalized) couplerIn.second->eventLoop();
699   
700    for (auto couplerOut : couplerOutServer_)
701      if (!finalized) couplerOut.second->eventLoop(enableEventsProcessing);
702
703    for (auto couplerIn : couplerInServer_)
704      if (!finalized) couplerIn.second->eventLoop(enableEventsProcessing);
705   
706    if (server!=nullptr) if (!finalized) finished &= server->eventLoop(enableEventsProcessing);
707    setCurrent(getId()) ;
708    return finalized && finished ;
709  }
710
711  void CContext::addCouplingChanel(const std::string& fullContextId, bool out)
712  {
713     int contextLeader ;
714     
715     if (out)
716     { 
717       if (couplerOutClient_.find(fullContextId)==couplerOutClient_.end()) 
718       {
719         bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ;
720     
721         MPI_Comm interComm, interCommClient, interCommServer  ;
722         MPI_Comm intraCommClient, intraCommServer ;
723
724         if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ;
725
726        MPI_Comm_dup(intraComm_, &intraCommClient) ;
727        MPI_Comm_dup(intraComm_, &intraCommServer) ;
728        MPI_Comm_dup(interComm, &interCommClient) ;
729        MPI_Comm_dup(interComm, &interCommServer) ;
730        CContextClient* client = new CContextClient(this, intraCommClient, interCommClient);
731        CContextServer* server = new CContextServer(this, intraCommServer, interCommServer);
732        client->setAssociatedServer(server) ;
733        server->setAssociatedClient(client) ;
734        MPI_Comm_free(&interComm) ;
735        couplerOutClient_[fullContextId] = client ;
736        couplerOutServer_[fullContextId] = server ;
737      }
738    }
739    else if (couplerInClient_.find(fullContextId)==couplerInClient_.end())
740    {
741      bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ;
742     
743       MPI_Comm interComm, interCommClient, interCommServer  ;
744       MPI_Comm intraCommClient, intraCommServer ;
745
746       if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ;
747
748       MPI_Comm_dup(intraComm_, &intraCommClient) ;
749       MPI_Comm_dup(intraComm_, &intraCommServer) ;
750       MPI_Comm_dup(interComm, &interCommServer) ;
751       MPI_Comm_dup(interComm, &interCommClient) ;
752       CContextServer* server = new CContextServer(this, intraCommServer, interCommServer);
753       CContextClient* client = new CContextClient(this, intraCommClient, interCommClient);
754       client->setAssociatedServer(server) ;
755       server->setAssociatedClient(client) ;
756       MPI_Comm_free(&interComm) ;
757
758       couplerInClient_[fullContextId] = client ;
759       couplerInServer_[fullContextId] = server ;       
760    }
761  }
762 
763   void CContext::finalize(void)
764   TRY
765   {
766      registryOut->hierarchicalGatherRegistry() ;
767      if (server->intraCommRank==0) CXios::getRegistryManager()->merge(*registryOut) ;
768
769      if (serviceType_==CServicesManager::CLIENT)
770      {
771//ym        doPreTimestepOperationsForEnabledReadModeFiles(); // For now we only use server level 1 to read data
772
773        triggerLateFields() ;
774
775        // inform couplerIn that I am finished
776        for(auto& couplerInClient : couplerInClient_) sendCouplerInContextFinalized(couplerInClient.second) ;
777
778        // wait until received message from couplerOut that they have finished
779        bool couplersInFinalized ;
780        do
781        {
782          couplersInFinalized=true ;
783          for(auto& couplerOutClient : couplerOutClient_) couplersInFinalized &= isCouplerInContextFinalized(couplerOutClient.second) ; 
784          globalEventLoop() ;
785        } while (!couplersInFinalized) ;
786
787        info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ;
788        client->finalize();
789        info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ;
790        while (client->havePendingRequests()) client->eventLoop();
791        info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ;
792        bool notifiedFinalized=false ;
793        do
794        {
795          notifiedFinalized=client->isNotifiedFinalized() ;
796        } while (!notifiedFinalized) ;
797
798        server->releaseBuffers();
799        client->releaseBuffers();
800        info(100)<<"DEBUG: context "<<getId()<<" release client ok"<<endl ;
801      }
802      else if (serviceType_==CServicesManager::GATHERER)
803      {
804         for (int i = 0; i < clientPrimServer.size(); ++i)
805         {
806           clientPrimServer[i]->finalize();
807           bool bufferReleased;
808           do
809           {
810             clientPrimServer[i]->eventLoop();
811             bufferReleased = !clientPrimServer[i]->havePendingRequests();
812           } while (!bufferReleased);
813           
814           bool notifiedFinalized=false ;
815           do
816           {
817             notifiedFinalized=clientPrimServer[i]->isNotifiedFinalized() ;
818           } while (!notifiedFinalized) ;
819           clientPrimServer[i]->releaseBuffers();
820         }
821         closeAllFile();
822
823      }
824      else if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER)
825      {
826        closeAllFile();
827        client->releaseBuffers();
828        server->releaseBuffers();
829      }
830
831      freeComms() ;
832       
833      parentServerContext_->freeComm() ;
834      finalized = true;
835      info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl;
836   }
837   CATCH_DUMP_ATTR
838
839   //! Free internally allocated communicators
840   void CContext::freeComms(void)
841   TRY
842   {
843     for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
844       MPI_Comm_free(&(*it));
845     comms.clear();
846   }
847   CATCH_DUMP_ATTR
848
849   //! Deallocate buffers allocated by clientContexts
850   void CContext::releaseClientBuffers(void)
851   TRY
852   {
853     client->releaseBuffers();
854     for (int i = 0; i < clientPrimServer.size(); ++i)
855       clientPrimServer[i]->releaseBuffers();
856   }
857   CATCH_DUMP_ATTR
858
859   
860   /*!
861   \brief Close all the context defintion and do processing data
862      After everything is well defined on client side, they will be processed and sent to server
863   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
864   all necessary information to server, from which each server can build its own database.
865   Because the role of server is to write out field data on a specific netcdf file,
866   the only information that it needs is the enabled files
867   and the active fields (fields will be written onto active files)
868   */
869  void CContext::closeDefinition(void)
870   TRY
871   {
872     CTimer::get("Context : close definition").resume() ;
873     
874     // create intercommunicator with servers.
875     // not sure it is the good place to be called here
876     createServerInterComm() ;
877
878
879     // After xml is parsed, there are some more works with post processing
880//     postProcessing();
881
882   
883    // Make sure the calendar was correctly created
884    if (serviceType_!=CServicesManager::CLIENT) CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
885    if (!calendar)
886      ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
887    else if (calendar->getTimeStep() == NoneDu)
888      ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
889    // Calendar first update to set the current date equals to the start date
890    calendar->update(0);
891
892    // Résolution des héritages descendants (càd des héritages de groupes)
893    // pour chacun des contextes.
894    solveDescInheritance(true);
895 
896    // Solve inheritance for field to know if enabled or not.
897    for (auto field : CField::getAll()) field->solveRefInheritance();
898
899    // Check if some axis, domains or grids are eligible to for compressed indexed output.
900    // Warning: This must be done after solving the inheritance and before the rest of post-processing
901    // --> later ????    checkAxisDomainsGridsEligibilityForCompressedOutput();     
902
903      // Check if some automatic time series should be generated
904      // Warning: This must be done after solving the inheritance and before the rest of post-processing     
905
906    // The timeseries should only be prepared in client
907    prepareTimeseries();
908
909    //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
910    findEnabledFiles();
911    findEnabledWriteModeFiles();
912    findEnabledReadModeFiles();
913    findEnabledCouplerIn();
914    findEnabledCouplerOut();
915    createCouplerInterCommunicator() ;
916
917    // Find all enabled fields of each file     
918    vector<CField*>&& fileOutField = findAllEnabledFieldsInFileOut(this->enabledWriteModeFiles);
919    vector<CField*>&& fileInField = findAllEnabledFieldsInFileIn(this->enabledReadModeFiles);
920    vector<CField*>&& couplerOutField = findAllEnabledFieldsCouplerOut(this->enabledCouplerOut);
921    vector<CField*>&& couplerInField = findAllEnabledFieldsCouplerIn(this->enabledCouplerIn);
922    findFieldsWithReadAccess();
923    vector<CField*>& fieldWithReadAccess = fieldsWithReadAccess_ ;
924    vector<CField*> fieldModelIn ; // fields potentially from model
925     
926    // define if files are on clientSied or serverSide
927    if (serviceType_==CServicesManager::CLIENT)
928    {
929      for (auto& file : enabledWriteModeFiles) file->setClientSide() ;
930      for (auto& file : enabledReadModeFiles) file->setClientSide() ;
931    }
932    else
933    {
934      for (auto& file : enabledWriteModeFiles) file->setServerSide() ;
935      for (auto& file : enabledReadModeFiles) file->setServerSide() ;
936    }
937
938   
939    for (auto& field : couplerInField)
940    {
941      field->unsetGridCompleted() ;
942    }
943// find all field potentially at workflow end
944    vector<CField*> endWorkflowFields ;
945    endWorkflowFields.reserve(fileOutField.size()+couplerOutField.size()+fieldWithReadAccess.size()) ;
946    endWorkflowFields.insert(endWorkflowFields.end(),fileOutField.begin(), fileOutField.end()) ;
947    endWorkflowFields.insert(endWorkflowFields.end(),couplerOutField.begin(), couplerOutField.end()) ;
948    endWorkflowFields.insert(endWorkflowFields.end(),fieldWithReadAccess.begin(), fieldWithReadAccess.end()) ;
949
950    bool workflowGraphIsCompleted ;
951   
952    bool first=true ;
953   
954    do
955    {
956      workflowGraphIsCompleted=true; 
957      for(auto endWorkflowField : endWorkflowFields) 
958      {
959        workflowGraphIsCompleted &= endWorkflowField->buildWorkflowGraph(garbageCollector) ;
960      }
961   
962      for(auto couplerIn : enabledCouplerIn) couplerIn->assignContext() ;
963      for(auto field : couplerInField) field->makeGridAliasForCoupling();
964      for(auto field : couplerInField) this->sendCouplerInReady(field->getContextClient()) ;
965   
966
967      // assign context to coupler out and related fields
968      for(auto couplerOut : enabledCouplerOut) couplerOut->assignContext() ;
969      // for now supose that all coupling out endpoint are succesfull. The difficultie is client/server buffer evaluation
970      for(auto field : couplerOutField) 
971      {
972        // connect to couplerOut -> to do
973      }
974
975      bool couplersReady ;
976      do 
977      {
978        couplersReady=true ;
979        for(auto field : couplerOutField)
980        {
981          bool ready = isCouplerInReady(field->getContextClient()) ; 
982          if (ready) field->sendFieldToCouplerOut() ;
983          couplersReady &= ready ;
984        }
985        this->scheduledEventLoop() ;
986
987      } while (!couplersReady) ;
988     
989      first=false ;
990      this->scheduledEventLoop() ;
991
992    } while (!workflowGraphIsCompleted) ;
993
994
995    for( auto field : couplerInField) couplerInFields_.push_back(field) ;
996
997    // get all field coming potentially from model
998    for (auto field : CField::getAll() ) if (field->getModelIn()) fieldModelIn.push_back(field) ;
999
1000    // Distribute files between secondary servers according to the data size => assign a context to a file and then to fields
1001    if (serviceType_==CServicesManager::GATHERER) distributeFiles(this->enabledWriteModeFiles);
1002    else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ;
1003
1004    // client side, assign context for file reading
1005    if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(client) ;
1006   
1007    // server side, assign context where to send file data read
1008    if (serviceType_==CServicesManager::CServicesManager::GATHERER || serviceType_==CServicesManager::IO_SERVER) 
1009      for(auto file : this->enabledReadModeFiles) file->setContextClient(client) ;
1010   
1011    // workflow endpoint => sent to IO/SERVER
1012    if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER)
1013    {
1014      for(auto field : fileOutField) 
1015      {
1016        field->connectToFileServer(garbageCollector) ; // connect the field to server filter
1017      }
1018      for(auto field : fileOutField) field->sendFieldToFileServer() ;
1019    }
1020
1021    // workflow endpoint => write to file
1022    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER)
1023    {
1024      for(auto field : fileOutField) 
1025      {
1026        field->connectToFileWriter(garbageCollector) ; // connect the field to server filter
1027      }
1028    }
1029   
1030    // workflow endpoint => Send data from server to client
1031    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER)
1032    {
1033      for(auto field : fileInField) 
1034      {
1035        field->connectToServerToClient(garbageCollector) ;
1036      }
1037    }
1038
1039    // workflow endpoint => sent to model on client side
1040    if (serviceType_==CServicesManager::CLIENT)
1041    {
1042      for(auto field : fieldWithReadAccess) field->connectToModelOutput(garbageCollector) ;
1043    }
1044
1045
1046    // workflow startpoint => data from model
1047    if (serviceType_==CServicesManager::CLIENT)
1048    {
1049      for(auto field : fieldModelIn) 
1050      {
1051        field->connectToModelInput(garbageCollector) ; // connect the field to server filter
1052        // grid index will be computed on the fly
1053      }
1054    }
1055   
1056    // workflow startpoint => data from client on server side
1057    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER || serviceType_==CServicesManager::OUT_SERVER)
1058    {
1059      for(auto field : fieldModelIn) 
1060      {
1061        field->connectToClientInput(garbageCollector) ; // connect the field to server filter
1062      }
1063    }
1064
1065   
1066    for(auto field : couplerInField) 
1067    {
1068      field->connectToCouplerIn(garbageCollector) ; // connect the field to server filter
1069    }
1070   
1071   
1072    for(auto field : couplerOutField) 
1073    {
1074      field->connectToCouplerOut(garbageCollector) ; // for now the same kind of filter that for file server
1075    }
1076
1077     // workflow startpoint => data from server on client side
1078    if (serviceType_==CServicesManager::CLIENT)
1079    {
1080      for(auto field : fileInField) 
1081      {
1082        field->sendFieldToInputFileServer() ;
1083        field->connectToServerInput(garbageCollector) ; // connect the field to server filter
1084        fileInFields_.push_back(field) ;
1085      }
1086    }
1087
1088    // workflow startpoint => data read from file on server side
1089    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER)
1090    {
1091      for(auto field : fileInField) 
1092      {
1093        field->connectToFileReader(garbageCollector) ;
1094      }
1095    }
1096   
1097    // construct slave server list
1098    if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) 
1099    {
1100      for(auto field : fileOutField) slaveServers_.insert(field->getContextClient()) ; 
1101    }
1102
1103    for(auto& slaveServer : slaveServers_) sendCloseDefinition(slaveServer) ;
1104
1105    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 
1106    {
1107      createFileHeader();
1108    }
1109
1110    if (serviceType_==CServicesManager::CLIENT) startPrefetchingOfEnabledReadModeFiles();
1111   
1112    // send signal to couplerIn context that definition phasis is done
1113
1114    for(auto& couplerInClient : couplerInClient_) sendCouplerInCloseDefinition(couplerInClient.second) ;
1115
1116    // wait until all couplerIn signal that closeDefition is done.
1117    bool ok;
1118    do
1119    {
1120      ok = true ;
1121      for(auto& couplerOutClient : couplerOutClient_) ok &= isCouplerInCloseDefinition(couplerOutClient.second) ;
1122      this->scheduledEventLoop() ;
1123    } while (!ok) ;
1124
1125    // Now evaluate the size of the context client buffers
1126    map<CContextClient*,map<int,size_t>> fieldBufferEvaluation ;
1127    for(auto field : fileOutField) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // output to server
1128    for(auto field : couplerOutField) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // output to coupler
1129    for(auto field : fieldModelIn) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // server to client (for io servers)
1130   
1131    // fix size for each context client
1132    for(auto& it : fieldBufferEvaluation) it.first->setBufferSize(it.second) ;
1133
1134
1135     CTimer::get("Context : close definition").suspend() ;
1136  }
1137  CATCH_DUMP_ATTR
1138
1139
1140  vector<CField*> CContext::findAllEnabledFieldsInFileOut(const std::vector<CFile*>& activeFiles)
1141   TRY
1142   {
1143     vector<CField*> fields ;
1144     for(auto file : activeFiles)
1145     {
1146        const vector<CField*>&& fieldList=file->getEnabledFields() ;
1147        for(auto field : fieldList) field->setFileOut(file) ;
1148        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1149     }
1150     return fields ;
1151   }
1152   CATCH_DUMP_ATTR
1153
1154   vector<CField*> CContext::findAllEnabledFieldsInFileIn(const std::vector<CFile*>& activeFiles)
1155   TRY
1156   {
1157     vector<CField*> fields ;
1158     for(auto file : activeFiles)
1159     {
1160        const vector<CField*>&& fieldList=file->getEnabledFields() ;
1161        for(auto field : fieldList) field->setFileIn(file) ;
1162        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1163     }
1164     return fields ;
1165   }
1166   CATCH_DUMP_ATTR
1167
1168   vector<CField*> CContext::findAllEnabledFieldsCouplerOut(const std::vector<CCouplerOut*>& activeCouplerOut)
1169   TRY
1170   {
1171     vector<CField*> fields ;
1172     for (auto couplerOut :activeCouplerOut)
1173     {
1174        const vector<CField*>&& fieldList=couplerOut->getEnabledFields() ;
1175        for(auto field : fieldList) field->setCouplerOut(couplerOut) ;
1176        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1177     }
1178     return fields ;
1179   }
1180   CATCH_DUMP_ATTR
1181
1182   vector<CField*> CContext::findAllEnabledFieldsCouplerIn(const std::vector<CCouplerIn*>& activeCouplerIn)
1183   TRY
1184   {
1185     vector<CField*> fields ;
1186     for (auto couplerIn :activeCouplerIn)
1187     {
1188        const vector<CField*>&& fieldList=couplerIn->getEnabledFields() ;
1189        for(auto field : fieldList) field->setCouplerIn(couplerIn) ;
1190        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1191     }
1192     return fields ;
1193   }
1194   CATCH_DUMP_ATTR
1195
1196 /*!
1197  * Send context attribute and calendar to file server, it must be done once by context file server
1198  * \param[in] client : context client to send   
1199  */ 
1200  void CContext::sendContextToFileServer(CContextClient* client)
1201  {
1202    if (sendToFileServer_done_.count(client)!=0) return ;
1203    else sendToFileServer_done_.insert(client) ;
1204   
1205    this->sendAllAttributesToServer(client); // Send all attributes of current context to server
1206    CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer(client); // Send all attributes of current cale
1207  }
1208
1209 
1210   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
1211   TRY
1212   {
1213      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
1214        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
1215   }
1216   CATCH_DUMP_ATTR
1217
1218
1219   void CContext::postProcessFilterGraph()
1220   TRY
1221   {
1222     int size = enabledFiles.size();
1223     for (int i = 0; i < size; ++i)
1224     {
1225        enabledFiles[i]->postProcessFilterGraph();
1226     }
1227   }
1228   CATCH_DUMP_ATTR
1229
1230   void CContext::startPrefetchingOfEnabledReadModeFiles()
1231   TRY
1232   {
1233     int size = enabledReadModeFiles.size();
1234     for (int i = 0; i < size; ++i)
1235     {
1236        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
1237     }
1238   }
1239   CATCH_DUMP_ATTR
1240
1241   void CContext::doPreTimestepOperationsForEnabledReadModeFiles()
1242   TRY
1243   {
1244     int size = enabledReadModeFiles.size();
1245     for (int i = 0; i < size; ++i)
1246     {
1247        enabledReadModeFiles[i]->doPreTimestepOperationsForEnabledReadModeFields();
1248     }
1249   }
1250   CATCH_DUMP_ATTR
1251
1252   void CContext::doPostTimestepOperationsForEnabledReadModeFiles()
1253   TRY
1254   {
1255     int size = enabledReadModeFiles.size();
1256     for (int i = 0; i < size; ++i)
1257     {
1258        enabledReadModeFiles[i]->doPostTimestepOperationsForEnabledReadModeFields();
1259     }
1260   }
1261   CATCH_DUMP_ATTR
1262
1263  void CContext::findFieldsWithReadAccess(void)
1264  TRY
1265  {
1266    fieldsWithReadAccess_.clear();
1267    const vector<CField*> allFields = CField::getAll();
1268    for (size_t i = 0; i < allFields.size(); ++i)
1269    {
1270      CField* field = allFields[i];
1271      if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
1272      {
1273        fieldsWithReadAccess_.push_back(field);
1274        field->setModelOut() ;
1275      }
1276    }
1277  }
1278  CATCH_DUMP_ATTR
1279
1280 
1281   void CContext::solveAllInheritance(bool apply)
1282   TRY
1283   {
1284     // Résolution des héritages descendants (càd des héritages de groupes)
1285     // pour chacun des contextes.
1286      solveDescInheritance(apply);
1287
1288     // Résolution des héritages par référence au niveau des fichiers.
1289      const vector<CFile*> allFiles=CFile::getAll();
1290      const vector<CCouplerIn*> allCouplerIn=CCouplerIn::getAll();
1291      const vector<CCouplerOut*> allCouplerOut=CCouplerOut::getAll();
1292      const vector<CGrid*> allGrids= CGrid::getAll();
1293
1294      if (serviceType_==CServicesManager::CLIENT)
1295      {
1296        for (unsigned int i = 0; i < allFiles.size(); i++)
1297          allFiles[i]->solveFieldRefInheritance(apply);
1298
1299        for (unsigned int i = 0; i < allCouplerIn.size(); i++)
1300          allCouplerIn[i]->solveFieldRefInheritance(apply);
1301
1302        for (unsigned int i = 0; i < allCouplerOut.size(); i++)
1303          allCouplerOut[i]->solveFieldRefInheritance(apply);
1304      }
1305
1306      unsigned int vecSize = allGrids.size();
1307      unsigned int i = 0;
1308      for (i = 0; i < vecSize; ++i)
1309        allGrids[i]->solveElementsRefInheritance(apply);
1310
1311   }
1312  CATCH_DUMP_ATTR
1313
1314   void CContext::findEnabledFiles(void)
1315   TRY
1316   {
1317      const std::vector<CFile*> allFiles = CFile::getAll();
1318      const CDate& initDate = calendar->getInitDate();
1319
1320      for (unsigned int i = 0; i < allFiles.size(); i++)
1321         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
1322         {
1323            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
1324            {
1325              if (allFiles[i]->output_freq.isEmpty())
1326              {
1327                 ERROR("CContext::findEnabledFiles()",
1328                     << "Mandatory attribute output_freq must be defined for file \""<<allFiles[i]->getFileOutputName()
1329                     <<" \".")
1330              }
1331              if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
1332              {
1333                error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
1334                    << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
1335                    <<"\" is less than the time step. File will not be written."<<endl;
1336              }
1337              else
1338               enabledFiles.push_back(allFiles[i]);
1339            }
1340         }
1341         else
1342         {
1343           if (allFiles[i]->output_freq.isEmpty())
1344           {
1345              ERROR("CContext::findEnabledFiles()",
1346                  << "Mandatory attribute output_freq must be defined for file \""<<allFiles[i]->getFileOutputName()
1347                  <<" \".")
1348           }
1349           if ( (initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
1350           {
1351             error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
1352                 << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
1353                 <<"\" is less than the time step. File will not be written."<<endl;
1354           }
1355           else
1356             enabledFiles.push_back(allFiles[i]); // otherwise true by default
1357         }
1358
1359      if (enabledFiles.size() == 0)
1360         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
1361               << getId() << "\" !");
1362
1363   }
1364   CATCH_DUMP_ATTR
1365
1366   void CContext::findEnabledCouplerIn(void)
1367   TRY
1368   {
1369      const std::vector<CCouplerIn*> allCouplerIn = CCouplerIn::getAll();
1370      bool enabled ;
1371      for (size_t i = 0; i < allCouplerIn.size(); i++)
1372      {
1373        if (allCouplerIn[i]->enabled.isEmpty()) enabled=true ;
1374        else enabled=allCouplerIn[i]->enabled ;
1375        if (enabled) enabledCouplerIn.push_back(allCouplerIn[i]) ;
1376      }
1377   }
1378   CATCH_DUMP_ATTR
1379
1380   void CContext::findEnabledCouplerOut(void)
1381   TRY
1382   {
1383      const std::vector<CCouplerOut*> allCouplerOut = CCouplerOut::getAll();
1384      bool enabled ;
1385      for (size_t i = 0; i < allCouplerOut.size(); i++)
1386      {
1387        if (allCouplerOut[i]->enabled.isEmpty()) enabled=true ;
1388        else enabled=allCouplerOut[i]->enabled ;
1389        if (enabled) enabledCouplerOut.push_back(allCouplerOut[i]) ;
1390      }
1391   }
1392   CATCH_DUMP_ATTR
1393
1394
1395
1396
1397   void CContext::distributeFiles(const vector<CFile*>& files)
1398   TRY
1399   {
1400     bool distFileMemory=false ;
1401     distFileMemory=CXios::getin<bool>("server2_dist_file_memory", distFileMemory);
1402
1403     if (distFileMemory) distributeFileOverMemoryBandwith(files) ;
1404     else distributeFileOverBandwith(files) ;
1405   }
1406   CATCH_DUMP_ATTR
1407
1408   void CContext::distributeFileOverBandwith(const vector<CFile*>& files)
1409   TRY
1410   {
1411     double eps=std::numeric_limits<double>::epsilon()*10 ;
1412     
1413     std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out);
1414     int nbPools = clientPrimServer.size();
1415
1416     // (1) Find all enabled files in write mode
1417     // for (int i = 0; i < this->enabledFiles.size(); ++i)
1418     // {
1419     //   if (enabledFiles[i]->mode.isEmpty() || (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write ))
1420     //    enabledWriteModeFiles.push_back(enabledFiles[i]);
1421     // }
1422
1423     // (2) Estimate the data volume for each file
1424     int size = files.size();
1425     std::vector<std::pair<double, CFile*> > dataSizeMap;
1426     double dataPerPool = 0;
1427     int nfield=0 ;
1428     ofs<<size<<endl ;
1429     for (size_t i = 0; i < size; ++i)
1430     {
1431       CFile* file = files[i];
1432       ofs<<file->getId()<<endl ;
1433       StdSize dataSize=0;
1434       std::vector<CField*> enabledFields = file->getEnabledFields();
1435       size_t numEnabledFields = enabledFields.size();
1436       ofs<<numEnabledFields<<endl ;
1437       for (size_t j = 0; j < numEnabledFields; ++j)
1438       {
1439         dataSize += enabledFields[j]->getGlobalWrittenSize() ;
1440         ofs<<enabledFields[j]->getGrid()->getId()<<endl ;
1441         ofs<<enabledFields[j]->getGlobalWrittenSize()<<endl ;
1442       }
1443       double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ;
1444       double dataSizeSec= dataSize/ outFreqSec;
1445       ofs<<dataSizeSec<<endl ;
1446       nfield++ ;
1447// add epsilon*nField to dataSizeSec in order to  preserve reproductive ordering when sorting
1448       dataSizeMap.push_back(make_pair(dataSizeSec + dataSizeSec * eps * nfield , file));
1449       dataPerPool += dataSizeSec;
1450     }
1451     dataPerPool /= nbPools;
1452     std::sort(dataSizeMap.begin(), dataSizeMap.end());
1453
1454     // (3) Assign contextClient to each enabled file
1455
1456     std::multimap<double,int> poolDataSize ;
1457// multimap is not garanty to preserve stable sorting in c++98 but it seems it does for c++11
1458
1459     int j;
1460     double dataSize ;
1461     for (j = 0 ; j < nbPools ; ++j) poolDataSize.insert(std::pair<double,int>(0.,j)) ; 
1462             
1463     for (int i = dataSizeMap.size()-1; i >= 0; --i)
1464     {
1465       dataSize=(*poolDataSize.begin()).first ;
1466       j=(*poolDataSize.begin()).second ;
1467       dataSizeMap[i].second->setContextClient(clientPrimServer[j]);
1468       dataSize+=dataSizeMap[i].first;
1469       poolDataSize.erase(poolDataSize.begin()) ;
1470       poolDataSize.insert(std::pair<double,int>(dataSize,j)) ; 
1471     }
1472
1473     for (std::multimap<double,int>:: iterator it=poolDataSize.begin() ; it!=poolDataSize.end(); ++it) info(30)<<"Load Balancing for servers (perfect=1) : "<<it->second<<" :  ratio "<<it->first*1./dataPerPool<<endl ;
1474   }
1475   CATCH_DUMP_ATTR
1476
1477   void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList)
1478   TRY
1479   {
1480     int nbPools = clientPrimServer.size();
1481     double ratio=0.5 ;
1482     ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio);
1483
1484     int nFiles = filesList.size();
1485     vector<SDistFile> files(nFiles);
1486     vector<SDistGrid> grids;
1487     map<string,int> gridMap ;
1488     string gridId; 
1489     int gridIndex=0 ;
1490
1491     for (size_t i = 0; i < nFiles; ++i)
1492     {
1493       StdSize dataSize=0;
1494       CFile* file = filesList[i];
1495       std::vector<CField*> enabledFields = file->getEnabledFields();
1496       size_t numEnabledFields = enabledFields.size();
1497
1498       files[i].id_=file->getId() ;
1499       files[i].nbGrids_=numEnabledFields;
1500       files[i].assignedGrid_ = new int[files[i].nbGrids_] ;
1501         
1502       for (size_t j = 0; j < numEnabledFields; ++j)
1503       {
1504         gridId=enabledFields[j]->getGrid()->getId() ;
1505         if (gridMap.find(gridId)==gridMap.end())
1506         {
1507            gridMap[gridId]=gridIndex  ;
1508            SDistGrid newGrid; 
1509            grids.push_back(newGrid) ;
1510            gridIndex++ ;
1511         }
1512         files[i].assignedGrid_[j]=gridMap[gridId] ;
1513         grids[files[i].assignedGrid_[j]].size_=enabledFields[j]->getGlobalWrittenSize() ;
1514         dataSize += enabledFields[j]->getGlobalWrittenSize() ; // usefull
1515       }
1516       double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ;
1517       files[i].bandwith_= dataSize/ outFreqSec ;
1518     }
1519
1520     double bandwith=0 ;
1521     double memory=0 ;
1522   
1523     for(int i=0; i<nFiles; i++)  bandwith+=files[i].bandwith_ ;
1524     for(int i=0; i<nFiles; i++)  files[i].bandwith_ = files[i].bandwith_/bandwith * ratio ;
1525
1526     for(int i=0; i<grids.size(); i++)  memory+=grids[i].size_ ;
1527     for(int i=0; i<grids.size(); i++)  grids[i].size_ = grids[i].size_ / memory * (1.0-ratio) ;
1528       
1529     distributeFileOverServer2(nbPools, grids.size(), &grids[0], nFiles, &files[0]) ;
1530
1531     vector<double> memorySize(nbPools,0.) ;
1532     vector< set<int> > serverGrids(nbPools) ;
1533     vector<double> bandwithSize(nbPools,0.) ;
1534       
1535     for (size_t i = 0; i < nFiles; ++i)
1536     {
1537       bandwithSize[files[i].assignedServer_] += files[i].bandwith_* bandwith /ratio ;
1538       for(int j=0 ; j<files[i].nbGrids_;j++)
1539       {
1540         if (serverGrids[files[i].assignedServer_].find(files[i].assignedGrid_[j]) == serverGrids[files[i].assignedServer_].end())
1541         {
1542           memorySize[files[i].assignedServer_]+= grids[files[i].assignedGrid_[j]].size_ * memory / (1.0-ratio);
1543           serverGrids[files[i].assignedServer_].insert(files[i].assignedGrid_[j]) ;
1544         }
1545       }
1546       filesList[i]->setContextClient(clientPrimServer[files[i].assignedServer_]) ;
1547       delete [] files[i].assignedGrid_ ;
1548     }
1549
1550     for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<"   assigned file bandwith "<<bandwithSize[i]*86400.*4./1024/1024.<<" Mb / days"<<endl ;
1551     for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<"   assigned grid memory "<<memorySize[i]*100/1024./1024.<<" Mb"<<endl ;
1552
1553   }
1554   CATCH_DUMP_ATTR
1555
1556   /*!
1557      Find all files in write mode
1558   */
1559   void CContext::findEnabledWriteModeFiles(void)
1560   TRY
1561   {
1562     int size = this->enabledFiles.size();
1563     for (int i = 0; i < size; ++i)
1564     {
1565       if (enabledFiles[i]->mode.isEmpty() || 
1566          (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write ))
1567        enabledWriteModeFiles.push_back(enabledFiles[i]);
1568     }
1569   }
1570   CATCH_DUMP_ATTR
1571
1572   /*!
1573      Find all files in read mode
1574   */
1575   void CContext::findEnabledReadModeFiles(void)
1576   TRY
1577   {
1578     int size = this->enabledFiles.size();
1579     for (int i = 0; i < size; ++i)
1580     {
1581       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
1582        enabledReadModeFiles.push_back(enabledFiles[i]);
1583     }
1584   }
1585   CATCH_DUMP_ATTR
1586
1587   void CContext::closeAllFile(void)
1588   TRY
1589   {
1590     std::vector<CFile*>::const_iterator
1591            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
1592
1593     for (; it != end; it++)
1594     {
1595       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
1596       (*it)->close();
1597     }
1598   }
1599   CATCH_DUMP_ATTR
1600
1601   /*!
1602   \brief Dispatch event received from client
1603      Whenever a message is received in buffer of server, it will be processed depending on
1604   its event type. A new event type should be added in the switch list to make sure
1605   it processed on server side.
1606   \param [in] event: Received message
1607   */
1608   bool CContext::dispatchEvent(CEventServer& event)
1609   TRY
1610   {
1611
1612      if (SuperClass::dispatchEvent(event)) return true;
1613      else
1614      {
1615        switch(event.type)
1616        {
1617           case EVENT_ID_CLOSE_DEFINITION :
1618             recvCloseDefinition(event);
1619             return true;
1620             break;
1621           case EVENT_ID_UPDATE_CALENDAR:
1622             recvUpdateCalendar(event);
1623             return true;
1624             break;
1625           case EVENT_ID_CREATE_FILE_HEADER :
1626             recvCreateFileHeader(event);
1627             return true;
1628             break;
1629           case EVENT_ID_COUPLER_IN_READY:
1630             recvCouplerInReady(event);
1631             return true;
1632             break;
1633           case EVENT_ID_COUPLER_IN_CLOSE_DEFINITION:
1634             recvCouplerInCloseDefinition(event);
1635             return true;
1636             break;
1637           case EVENT_ID_COUPLER_IN_CONTEXT_FINALIZED:
1638             recvCouplerInContextFinalized(event);
1639             return true;
1640             break; 
1641           default :
1642             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
1643                    <<"Unknown Event");
1644           return false;
1645         }
1646      }
1647   }
1648   CATCH
1649
1650   //! Client side: Send a message to server to make it close
1651   // ym obsolete
1652   void CContext::sendCloseDefinition(void)
1653   TRY
1654   {
1655    int nbSrvPools ;
1656    if (serviceType_==CServicesManager::CLIENT) nbSrvPools = 1 ;
1657    else if (serviceType_==CServicesManager::GATHERER) nbSrvPools = this->clientPrimServer.size() ;
1658    else nbSrvPools = 0 ;
1659    CContextClient* contextClientTmp ;
1660
1661    for (int i = 0; i < nbSrvPools; ++i)
1662     {
1663       if (serviceType_==CServicesManager::CLIENT) contextClientTmp = client ;
1664       else if (serviceType_==CServicesManager::GATHERER ) contextClientTmp = clientPrimServer[i] ;
1665       CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
1666       if (contextClientTmp->isServerLeader())
1667       {
1668         CMessage msg;
1669         const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
1670         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1671           event.push(*itRank,1,msg);
1672         contextClientTmp->sendEvent(event);
1673       }
1674       else contextClientTmp->sendEvent(event);
1675     }
1676   }
1677   CATCH_DUMP_ATTR
1678   
1679   //  ! Client side: Send a message to server to make it close
1680   void CContext::sendCloseDefinition(CContextClient* client)
1681   TRY
1682   {
1683      if (sendCloseDefinition_done_.count(client)!=0) return ;
1684      else sendCloseDefinition_done_.insert(client) ;
1685
1686      CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
1687      if (client->isServerLeader())
1688      {
1689        CMessage msg;
1690        for(auto rank : client->getRanksServerLeader()) event.push(rank,1,msg);
1691        client->sendEvent(event);
1692      }
1693     else client->sendEvent(event);
1694   }
1695   CATCH_DUMP_ATTR
1696
1697   //! Server side: Receive a message of client announcing a context close
1698   void CContext::recvCloseDefinition(CEventServer& event)
1699   TRY
1700   {
1701      CBufferIn* buffer=event.subEvents.begin()->buffer;
1702      getCurrent()->closeDefinition();
1703   }
1704   CATCH
1705
1706   //! Client side: Send a message to update calendar in each time step
1707   void CContext::sendUpdateCalendar(int step)
1708   TRY
1709   {
1710     for(auto client : slaveServers_) 
1711     {
1712       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
1713       if (client->isServerLeader())
1714       {
1715         CMessage msg;
1716         msg<<step;
1717         for (auto& rank : client->getRanksServerLeader() ) event.push(rank,1,msg);
1718         client->sendEvent(event);
1719       }
1720       else client->sendEvent(event);
1721     }
1722   }
1723   CATCH_DUMP_ATTR
1724
1725   //! Server side: Receive a message of client annoucing calendar update
1726   void CContext::recvUpdateCalendar(CEventServer& event)
1727   TRY
1728   {
1729      CBufferIn* buffer=event.subEvents.begin()->buffer;
1730      getCurrent()->recvUpdateCalendar(*buffer);
1731   }
1732   CATCH
1733
1734   //! Server side: Receive a message of client annoucing calendar update
1735   void CContext::recvUpdateCalendar(CBufferIn& buffer)
1736   TRY
1737   {
1738      int step;
1739      buffer>>step;
1740      updateCalendar(step);
1741      if (serviceType_==CServicesManager::GATHERER)
1742      {       
1743        sendUpdateCalendar(step);
1744      }
1745   }
1746   CATCH_DUMP_ATTR
1747
1748   //! Client side: Send a message to create header part of netcdf file
1749   void CContext::sendCreateFileHeader(void)
1750   TRY
1751   {
1752     int nbSrvPools ;
1753     if (serviceType_==CServicesManager::CLIENT) nbSrvPools = 1 ;
1754     else if (serviceType_==CServicesManager::GATHERER) nbSrvPools = this->clientPrimServer.size() ;
1755     else nbSrvPools = 0 ;
1756     CContextClient* contextClientTmp ;
1757
1758     for (int i = 0; i < nbSrvPools; ++i)
1759     {
1760       if (serviceType_==CServicesManager::CLIENT) contextClientTmp = client ;
1761       else if (serviceType_==CServicesManager::GATHERER ) contextClientTmp = clientPrimServer[i] ;
1762       CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
1763
1764       if (contextClientTmp->isServerLeader())
1765       {
1766         CMessage msg;
1767         const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
1768         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1769           event.push(*itRank,1,msg) ;
1770         contextClientTmp->sendEvent(event);
1771       }
1772       else contextClientTmp->sendEvent(event);
1773     }
1774   }
1775   CATCH_DUMP_ATTR
1776
1777   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
1778   void CContext::recvCreateFileHeader(CEventServer& event)
1779   TRY
1780   {
1781      CBufferIn* buffer=event.subEvents.begin()->buffer;
1782      getCurrent()->recvCreateFileHeader(*buffer);
1783   }
1784   CATCH
1785
1786   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
1787   void CContext::recvCreateFileHeader(CBufferIn& buffer)
1788   TRY
1789   {
1790      if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 
1791        createFileHeader();
1792   }
1793   CATCH_DUMP_ATTR
1794
1795   void CContext::createCouplerInterCommunicator(void)
1796   TRY
1797   {
1798      int rank=this->getIntraCommRank() ;
1799      map<string,list<CCouplerOut*>> listCouplerOut ; 
1800      map<string,list<CCouplerIn*>> listCouplerIn ; 
1801
1802      for(auto couplerOut : enabledCouplerOut) listCouplerOut[couplerOut->getCouplingContextId()].push_back(couplerOut) ;
1803      for(auto couplerIn : enabledCouplerIn) listCouplerIn[couplerIn->getCouplingContextId()].push_back(couplerIn) ;
1804
1805      CCouplerManager* couplerManager = CXios::getCouplerManager() ;
1806      if (rank==0)
1807      {
1808        for(auto couplerOut : listCouplerOut) couplerManager->registerCoupling(this->getContextId(),couplerOut.first) ;
1809        for(auto couplerIn : listCouplerIn) couplerManager->registerCoupling(couplerIn.first,this->getContextId()) ;
1810      }
1811
1812      do
1813      {
1814        for(auto couplerOut : listCouplerOut) 
1815        {
1816          bool isNextCoupling ;
1817          if (rank==0) isNextCoupling = couplerManager->isNextCoupling(this->getContextId(),couplerOut.first) ;
1818          MPI_Bcast(&isNextCoupling,1,MPI_C_BOOL, 0, getIntraComm()) ; 
1819          if (isNextCoupling) 
1820          {
1821            addCouplingChanel(couplerOut.first, true) ;
1822            listCouplerOut.erase(couplerOut.first) ;
1823            break ;
1824          }           
1825        }
1826        for(auto couplerIn : listCouplerIn) 
1827        {
1828          bool isNextCoupling ;
1829          if (rank==0) isNextCoupling = couplerManager->isNextCoupling(couplerIn.first,this->getContextId());
1830          MPI_Bcast(&isNextCoupling,1,MPI_C_BOOL, 0, getIntraComm()) ; 
1831          if (isNextCoupling) 
1832          {
1833            addCouplingChanel(couplerIn.first, false) ;
1834            listCouplerIn.erase(couplerIn.first) ;
1835            break ;
1836          }           
1837        }
1838
1839      } while (!listCouplerOut.empty() || !listCouplerIn.empty()) ;
1840
1841   }
1842   CATCH_DUMP_ATTR
1843
1844 
1845     //! Client side: Send infomation of active files (files are enabled to write out)
1846   void CContext::sendEnabledFiles(const std::vector<CFile*>& activeFiles)
1847   TRY
1848   {
1849     int size = activeFiles.size();
1850
1851     // In a context, each type has a root definition, e.g: axis, domain, field.
1852     // Every object must be a child of one of these root definition. In this case
1853     // all new file objects created on server must be children of the root "file_definition"
1854     StdString fileDefRoot("file_definition");
1855     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
1856
1857     for (int i = 0; i < size; ++i)
1858     {
1859       CFile* f = activeFiles[i];
1860       cfgrpPtr->sendCreateChild(f->getId(),f->getContextClient());
1861       f->sendAllAttributesToServer(f->getContextClient());
1862       f->sendAddAllVariables(f->getContextClient());
1863     }
1864   }
1865   CATCH_DUMP_ATTR
1866
1867   //! Client side: Send information of active fields (ones are written onto files)
1868   void CContext::sendEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles)
1869   TRY
1870   {
1871     int size = activeFiles.size();
1872     for (int i = 0; i < size; ++i)
1873     {
1874       activeFiles[i]->sendEnabledFields(activeFiles[i]->getContextClient());
1875     }
1876   }
1877   CATCH_DUMP_ATTR
1878
1879 
1880   //! Client side: Prepare the timeseries by adding the necessary files
1881   void CContext::prepareTimeseries()
1882   TRY
1883   {
1884     const std::vector<CFile*> allFiles = CFile::getAll();
1885     for (size_t i = 0; i < allFiles.size(); i++)
1886     {
1887       CFile* file = allFiles[i];
1888
1889       std::vector<CVariable*> fileVars, fieldVars, vars = file->getAllVariables();
1890       for (size_t k = 0; k < vars.size(); k++)
1891       {
1892         CVariable* var = vars[k];
1893
1894         if (var->ts_target.isEmpty()
1895              || var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both)
1896           fileVars.push_back(var);
1897
1898         if (!var->ts_target.isEmpty()
1899              && (var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both))
1900           fieldVars.push_back(var);
1901       }
1902
1903       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1904       {
1905         StdString fileNameStr("%file_name%") ;
1906         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : fileNameStr ;
1907         
1908         StdString fileName=file->getFileOutputName();
1909         size_t pos=tsPrefix.find(fileNameStr) ;
1910         while (pos!=std::string::npos)
1911         {
1912           tsPrefix=tsPrefix.replace(pos,fileNameStr.size(),fileName) ;
1913           pos=tsPrefix.find(fileNameStr) ;
1914         }
1915       
1916         const std::vector<CField*> allFields = file->getAllFields();
1917         for (size_t j = 0; j < allFields.size(); j++)
1918         {
1919           CField* field = allFields[j];
1920
1921           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1922           {
1923             CFile* tsFile = CFile::create();
1924             tsFile->duplicateAttributes(file);
1925
1926             // Add variables originating from file and targeted to timeserie file
1927             for (size_t k = 0; k < fileVars.size(); k++)
1928               tsFile->getVirtualVariableGroup()->addChild(fileVars[k]);
1929
1930           
1931             tsFile->name = tsPrefix + "_";
1932             if (!field->name.isEmpty())
1933               tsFile->name.get() += field->name;
1934             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1935               tsFile->name.get() += field->field_ref;
1936             else
1937               tsFile->name.get() += field->getId();
1938
1939             if (!field->ts_split_freq.isEmpty())
1940               tsFile->split_freq = field->ts_split_freq;
1941
1942             CField* tsField = tsFile->addField();
1943             tsField->field_ref = field->getId();
1944
1945             // Add variables originating from file and targeted to timeserie field
1946             for (size_t k = 0; k < fieldVars.size(); k++)
1947               tsField->getVirtualVariableGroup()->addChild(fieldVars[k]);
1948
1949             vars = field->getAllVariables();
1950             for (size_t k = 0; k < vars.size(); k++)
1951             {
1952               CVariable* var = vars[k];
1953
1954               // Add variables originating from field and targeted to timeserie field
1955               if (var->ts_target.isEmpty()
1956                    || var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both)
1957                 tsField->getVirtualVariableGroup()->addChild(var);
1958
1959               // Add variables originating from field and targeted to timeserie file
1960               if (!var->ts_target.isEmpty()
1961                    && (var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both))
1962                 tsFile->getVirtualVariableGroup()->addChild(var);
1963             }
1964
1965             tsFile->solveFieldRefInheritance(true);
1966
1967             if (file->timeseries == CFile::timeseries_attr::exclusive)
1968               field->enabled = false;
1969           }
1970         }
1971
1972         // Finally disable the original file is need be
1973         if (file->timeseries == CFile::timeseries_attr::only)
1974          file->enabled = false;
1975       }
1976     }
1977   }
1978   CATCH_DUMP_ATTR
1979
1980 
1981   //! Client side: Send information of reference domain, axis and scalar of active fields
1982   void CContext::sendRefDomainsAxisScalars(const std::vector<CFile*>& activeFiles)
1983   TRY
1984   {
1985     std::set<pair<StdString,CContextClient*>> domainIds, axisIds, scalarIds;
1986
1987     // Find all reference domain and axis of all active fields
1988     int numEnabledFiles = activeFiles.size();
1989     for (int i = 0; i < numEnabledFiles; ++i)
1990     {
1991       std::vector<CField*> enabledFields = activeFiles[i]->getEnabledFields();
1992       int numEnabledFields = enabledFields.size();
1993       for (int j = 0; j < numEnabledFields; ++j)
1994       {
1995         CContextClient* contextClient=enabledFields[j]->getContextClient() ;
1996         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1997         if ("" != prDomAxisScalarId[0]) domainIds.insert(make_pair(prDomAxisScalarId[0],contextClient));
1998         if ("" != prDomAxisScalarId[1]) axisIds.insert(make_pair(prDomAxisScalarId[1],contextClient));
1999         if ("" != prDomAxisScalarId[2]) scalarIds.insert(make_pair(prDomAxisScalarId[2],contextClient));
2000       }
2001     }
2002
2003     // Create all reference axis on server side
2004     std::set<StdString>::iterator itDom, itAxis, itScalar;
2005     std::set<StdString>::const_iterator itE;
2006
2007     StdString scalarDefRoot("scalar_definition");
2008     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
2009     
2010     for (auto itScalar = scalarIds.begin(); itScalar != scalarIds.end(); ++itScalar)
2011     {
2012       if (!itScalar->first.empty())
2013       {
2014         scalarPtr->sendCreateChild(itScalar->first,itScalar->second);
2015         CScalar::get(itScalar->first)->sendAllAttributesToServer(itScalar->second);
2016       }
2017     }
2018
2019     StdString axiDefRoot("axis_definition");
2020     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
2021     
2022     for (auto itAxis = axisIds.begin(); itAxis != axisIds.end(); ++itAxis)
2023     {
2024       if (!itAxis->first.empty())
2025       {
2026         axisPtr->sendCreateChild(itAxis->first, itAxis->second);
2027         CAxis::get(itAxis->first)->sendAllAttributesToServer(itAxis->second);
2028       }
2029     }
2030
2031     // Create all reference domains on server side
2032     StdString domDefRoot("domain_definition");
2033     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
2034     
2035     for (auto itDom = domainIds.begin(); itDom != domainIds.end(); ++itDom)
2036     {
2037       if (!itDom->first.empty()) {
2038          domPtr->sendCreateChild(itDom->first, itDom->second);
2039          CDomain::get(itDom->first)->sendAllAttributesToServer(itDom->second);
2040       }
2041     }
2042   }
2043   CATCH_DUMP_ATTR
2044
2045   void CContext::triggerLateFields(void)
2046   TRY
2047   {
2048    for(auto& field : fileInFields_) field->triggerLateField() ;
2049    for(auto& field : couplerInFields_) field->triggerLateField() ;
2050   }
2051   CATCH_DUMP_ATTR
2052
2053   //! Update calendar in each time step
2054   void CContext::updateCalendar(int step)
2055   TRY
2056   {
2057      int prevStep = calendar->getStep();
2058
2059      if (prevStep < step)
2060      {
2061        if (serviceType_==CServicesManager::CLIENT) // For now we only use server level 1 to read data
2062        {
2063          triggerLateFields();
2064        }
2065
2066        info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
2067        calendar->update(step);
2068        info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
2069  #ifdef XIOS_MEMTRACK_LIGHT
2070        info(50) << " Current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte, at timestep "<<step<<" of context "<<this->getId()<<endl ;
2071  #endif
2072
2073        if (serviceType_==CServicesManager::CLIENT) // For now we only use server level 1 to read data
2074        {
2075          doPostTimestepOperationsForEnabledReadModeFiles();
2076          garbageCollector.invalidate(calendar->getCurrentDate());
2077        }
2078      }
2079      else if (prevStep == step)
2080        info(50) << "updateCalendar: already at step " << step << ", no operation done." << endl;
2081      else // if (prevStep > step)
2082        ERROR("void CContext::updateCalendar(int step)",
2083              << "Illegal calendar update: previous step was " << prevStep << ", new step " << step << "is in the past!")
2084   }
2085   CATCH_DUMP_ATTR
2086
2087   void CContext::initReadFiles(void)
2088   TRY
2089   {
2090      vector<CFile*>::const_iterator it;
2091
2092      for (it=enabledReadModeFiles.begin(); it != enabledReadModeFiles.end(); it++)
2093      {
2094         (*it)->initRead();
2095      }
2096   }
2097   CATCH_DUMP_ATTR
2098
2099   //! Server side: Create header of netcdf file
2100   void CContext::createFileHeader(void)
2101   TRY
2102   {
2103      vector<CFile*>::const_iterator it;
2104
2105      //for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
2106      for (it=enabledWriteModeFiles.begin(); it != enabledWriteModeFiles.end(); it++)
2107      {
2108         (*it)->initWrite();
2109      }
2110   }
2111   CATCH_DUMP_ATTR
2112
2113   //! Get current context
2114   CContext* CContext::getCurrent(void)
2115   TRY
2116   {
2117     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
2118   }
2119   CATCH
2120
2121   /*!
2122   \brief Set context with an id be the current context
2123   \param [in] id identity of context to be set to current
2124   */
2125   void CContext::setCurrent(const string& id)
2126   TRY
2127   {
2128     CObjectFactory::SetCurrentContextId(id);
2129     CGroupFactory::SetCurrentContextId(id);
2130   }
2131   CATCH
2132
2133  /*!
2134  \brief Create a context with specific id
2135  \param [in] id identity of new context
2136  \return pointer to the new context or already-existed one with identity id
2137  */
2138  CContext* CContext::create(const StdString& id)
2139  TRY
2140  {
2141    CContext::setCurrent(id);
2142
2143    bool hasctxt = CContext::has(id);
2144    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
2145    getRoot();
2146    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
2147
2148#define DECLARE_NODE(Name_, name_) \
2149    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
2150#define DECLARE_NODE_PAR(Name_, name_)
2151#include "node_type.conf"
2152
2153    return (context);
2154  }
2155  CATCH
2156
2157 
2158  void CContext::sendFinalizeClient(CContextClient* contextClient, const string& contextClientId)
2159  TRY
2160  {
2161    CEventClient event(getType(),EVENT_ID_CONTEXT_FINALIZE_CLIENT);
2162    if (contextClient->isServerLeader())
2163    {
2164      CMessage msg;
2165      msg<<contextClientId ;
2166      const std::list<int>& ranks = contextClient->getRanksServerLeader();
2167      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
2168           event.push(*itRank,1,msg);
2169      contextClient->sendEvent(event);
2170    }
2171    else contextClient->sendEvent(event);
2172  }
2173  CATCH_DUMP_ATTR
2174
2175 
2176  void CContext::recvFinalizeClient(CEventServer& event)
2177  TRY
2178  {
2179    CBufferIn* buffer=event.subEvents.begin()->buffer;
2180    string id;
2181    *buffer>>id;
2182    get(id)->recvFinalizeClient(*buffer);
2183  }
2184  CATCH
2185
2186  void CContext::recvFinalizeClient(CBufferIn& buffer)
2187  TRY
2188  {
2189    countChildContextFinalized_++ ;
2190  }
2191  CATCH_DUMP_ATTR
2192
2193
2194
2195
2196 //! Client side: Send a message  announcing that context can receive grid definition from coupling
2197   void CContext::sendCouplerInReady(CContextClient* client)
2198   TRY
2199   {
2200      if (sendCouplerInReady_done_.count(client)!=0) return ;
2201      else sendCouplerInReady_done_.insert(client) ;
2202
2203      CEventClient event(getType(),EVENT_ID_COUPLER_IN_READY);
2204
2205      if (client->isServerLeader())
2206      {
2207        CMessage msg;
2208        msg<<this->getId();
2209        for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg);
2210        client->sendEvent(event);
2211      }
2212      else client->sendEvent(event);
2213   }
2214   CATCH_DUMP_ATTR
2215
2216   //! Server side: Receive a message announcing that context can send grid definition for context coupling
2217   void CContext::recvCouplerInReady(CEventServer& event)
2218   TRY
2219   {
2220      CBufferIn* buffer=event.subEvents.begin()->buffer;
2221      getCurrent()->recvCouplerInReady(*buffer);
2222   }
2223   CATCH
2224
2225   //! Server side: Receive a message announcing that context can send grid definition for context coupling
2226   void CContext::recvCouplerInReady(CBufferIn& buffer)
2227   TRY
2228   {
2229      string contextId ;
2230      buffer>>contextId;
2231      couplerInReady_.insert(getCouplerOutClient(contextId)) ;
2232   }
2233   CATCH_DUMP_ATTR
2234
2235
2236
2237
2238
2239 //! Client side: Send a message  announcing that a coupling context have done it closeDefinition, so data can be sent now.
2240   void CContext::sendCouplerInCloseDefinition(CContextClient* client)
2241   TRY
2242   {
2243      if (sendCouplerInCloseDefinition_done_.count(client)!=0) return ;
2244      else sendCouplerInCloseDefinition_done_.insert(client) ;
2245
2246      CEventClient event(getType(),EVENT_ID_COUPLER_IN_CLOSE_DEFINITION);
2247
2248      if (client->isServerLeader())
2249      {
2250        CMessage msg;
2251        msg<<this->getId();
2252        for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg);
2253        client->sendEvent(event);
2254      }
2255      else client->sendEvent(event);
2256   }
2257   CATCH_DUMP_ATTR
2258
2259   //! Server side: Receive a message announcing that a coupling context have done it closeDefinition, so data can be sent now.
2260   void CContext::recvCouplerInCloseDefinition(CEventServer& event)
2261   TRY
2262   {
2263      CBufferIn* buffer=event.subEvents.begin()->buffer;
2264      getCurrent()->recvCouplerInCloseDefinition(*buffer);
2265   }
2266   CATCH
2267
2268   //! Server side: Receive a message announcing that a coupling context have done it closeDefinition, so data can be sent now.
2269   void CContext::recvCouplerInCloseDefinition(CBufferIn& buffer)
2270   TRY
2271   {
2272      string contextId ;
2273      buffer>>contextId;
2274      couplerInCloseDefinition_.insert(getCouplerOutClient(contextId)) ;
2275   }
2276   CATCH_DUMP_ATTR
2277
2278
2279
2280
2281//! Client side: Send a message  announcing that a coupling context have done it contextFinalize, so it can also close it own context.
2282   void CContext::sendCouplerInContextFinalized(CContextClient* client)
2283   TRY
2284   {
2285      if (sendCouplerInContextFinalized_done_.count(client)!=0) return ;
2286      else sendCouplerInContextFinalized_done_.insert(client) ;
2287
2288      CEventClient event(getType(),EVENT_ID_COUPLER_IN_CONTEXT_FINALIZED);
2289
2290      if (client->isServerLeader())
2291      {
2292        CMessage msg;
2293        msg<<this->getId();
2294        for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg);
2295        client->sendEvent(event);
2296      }
2297      else client->sendEvent(event);
2298   }
2299   CATCH_DUMP_ATTR
2300
2301   //! Server side: Receive a message announcing that a coupling context have done it contextFinalize, so it can also close it own context.
2302   void CContext::recvCouplerInContextFinalized(CEventServer& event)
2303   TRY
2304   {
2305      CBufferIn* buffer=event.subEvents.begin()->buffer;
2306      getCurrent()->recvCouplerInContextFinalized(*buffer);
2307   }
2308   CATCH
2309
2310   //! Server side: Receive a message announcing that a coupling context have done it contextFinalize, so it can also close it own context.
2311   void CContext::recvCouplerInContextFinalized(CBufferIn& buffer)
2312   TRY
2313   {
2314      string contextId ;
2315      buffer>>contextId;
2316      couplerInContextFinalized_.insert(getCouplerOutClient(contextId)) ;
2317   }
2318   CATCH_DUMP_ATTR
2319
2320
2321
2322
2323  /*!
2324  * \fn bool CContext::isFinalized(void)
2325  * Context is finalized if it received context post finalize event.
2326  */
2327  bool CContext::isFinalized(void)
2328  TRY
2329  {
2330    return finalized;
2331  }
2332  CATCH_DUMP_ATTR
2333  ///--------------------------------------------------------------
2334  StdString CContext::dumpClassAttributes(void)
2335  {
2336    StdString str;
2337    str.append("enabled files=\"");
2338    int size = this->enabledFiles.size();
2339    for (int i = 0; i < size; ++i)
2340    {
2341      str.append(enabledFiles[i]->getId());
2342      str.append(" ");
2343    }
2344    str.append("\"");
2345    return str;
2346  }
2347
2348} // namespace xios
Note: See TracBrowser for help on using the repository browser.