source: XIOS/trunk/src/node/context.cpp @ 708

Last change on this file since 708 was 697, checked in by ymipsl, 9 years ago

Implement registryIn and registryOut functionalities.
YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 34.7 KB
RevLine 
[219]1#include "context.hpp"
[352]2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
[219]5
6#include "calendar_type.hpp"
[278]7#include "duration.hpp"
[219]8
[300]9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
[346]12#include "node_type.hpp"
[352]13#include "message.hpp"
14#include "type.hpp"
[591]15#include "xios_spl.hpp"
[219]16
[697]17
[335]18namespace xios {
[509]19
[549]20  shared_ptr<CContextGroup> CContext::root;
[509]21
[219]22   /// ////////////////////// Définitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
[597]26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , dataSize_(), idServer_(), client(0), server(0)
[219]28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
[597]32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , dataSize_(), idServer_(), client(0), server(0)
[219]34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
[509]37   {
[597]38     delete client;
39     delete server;
[300]40   }
[219]41
42   //----------------------------------------------------------------
[509]43   //! Get name of context
[219]44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
[640]49
[509]50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
[347]54   CContextGroup* CContext::getRoot(void)
[509]55   {
[549]56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
[509]57      return root.get();
[219]58   }
59
[640]60   //----------------------------------------------------------------
[509]61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
[343]66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
[219]67   {
68      return (this->calendar);
69   }
[509]70
[219]71   //----------------------------------------------------------------
[640]72
[509]73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
[343]77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
[219]78   {
79      this->calendar = newCalendar;
80   }
[509]81
[219]82   //----------------------------------------------------------------
[509]83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
[219]87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
[278]92      xml::THashAttributes attributes = node.getAttributes();
[219]93
[278]94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
[462]97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
[278]100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
[462]103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
[278]104      }
105
[219]106      if (node.getElementName().compare(CContext::GetName()))
[421]107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
[219]108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
[509]122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
[219]126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
[549]129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
[219]130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
[286]132
[421]133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
[219]136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
[509]144   //! Show tree structure of context
[219]145   void CContext::ShowTree(StdOStream & out)
146   {
[549]147      StdString currentContextId = CContext::getCurrent() -> getId();
[347]148      std::vector<CContext*> def_vector =
[346]149         CContext::getRoot()->getChildList();
[347]150      std::vector<CContext*>::iterator
[219]151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
[509]155
[219]156      for (; it != end; it++)
157      {
[509]158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
[219]160         out << *context << std::endl;
161      }
[509]162
[219]163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
[509]164      CContext::setCurrent(currentContextId);
[219]165   }
[346]166
[509]167
[219]168   //----------------------------------------------------------------
169
[509]170   //! Convert context object into string (to print)
[219]171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
[346]185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
[219]187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
[509]199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
[445]205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
[219]206   {
207#define DECLARE_NODE(Name_, name_)    \
[354]208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
[445]209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
[219]210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
[509]216   //! Verify if all root definition in the context have child.
[219]217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
[346]221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
[219]222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
[549]231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
[219]232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
[509]236
237   //! Initialize client side
[597]238   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/)
[300]239   {
[549]240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
[697]242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
[597]250     MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
[655]259       comms.push_back(intraCommServer);
[597]260       MPI_Comm_dup(interComm, &interCommServer);
[655]261       comms.push_back(interCommServer);
[597]262     }
263     server = new CContextServer(this,intraCommServer,interCommServer);
[509]264   }
[219]265
[509]266   void CContext::setClientServerBuffer()
267   {
[598]268     size_t bufferSizeMin = 10 * sizeof(size_t) * 1024;
[583]269#define DECLARE_NODE(Name_, name_)    \
[598]270     bufferSizeMin = (bufferSizeMin < sizeof(C##Name_##Definition)) ?  sizeof(C##Name_##Definition) : bufferSizeMin;
[583]271#define DECLARE_NODE_PAR(Name_, name_)
272#include "node_type.conf"
[598]273     std::map<int, StdSize> bufferSize = getDataSize();
274     if (bufferSize.empty())
275     {
276       if (client->isServerLeader())
[584]277       {
[598]278         const std::list<int>& ranks = client->getRanksServerLeader();
279         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
280           bufferSize[*itRank] = bufferSizeMin;
[584]281       }
[595]282       else
[598]283        return;
284     }
285     else
286     {
287       std::map<int, StdSize>::iterator it  = bufferSize.begin(),
288                                        ite = bufferSize.end();
289       for (; it != ite; ++it)
290         it->second = (it->second < bufferSizeMin) ? bufferSizeMin : it->second;
291     }
[584]292
[598]293     client->setBufferSize(bufferSize);
[509]294   }
295
296   //! Verify whether a context is initialized
[461]297   bool CContext::isInitialized(void)
298   {
[549]299     return hasClient;
[461]300   }
[509]301
302   //! Initialize server
[597]303   void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm, CContext* cxtClient /*= 0*/)
[300]304   {
[549]305     hasServer=true;
306     server = new CContextServer(this,intraComm,interComm);
[697]307
308     registryIn=new CRegistry(intraComm);
309     registryIn->setPath(getId()) ;
310     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
311     registryIn->bcastRegistry() ;
312     registryOut=new CRegistry(intraComm) ;
313     registryOut->setPath(getId()) ;
314 
[597]315     MPI_Comm intraCommClient, interCommClient;
316     if (cxtClient) // Attached mode
317     {
318       intraCommClient = intraComm;
319       interCommClient = interComm;
320     }
321     else
322     {
323       MPI_Comm_dup(intraComm, &intraCommClient);
[655]324       comms.push_back(intraCommClient);
[597]325       MPI_Comm_dup(interComm, &interCommClient);
[655]326       comms.push_back(interCommClient);
[597]327     }
328     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
[509]329   }
[300]330
[509]331   //! Server side: Put server into a loop in order to listen message from client
[300]332   bool CContext::eventLoop(void)
333   {
[549]334     return server->eventLoop();
[509]335   }
336
[597]337   //! Try to send the buffers and receive possible answers
338   bool CContext::checkBuffersAndListen(void)
339   {
340     client->checkBuffers();
341     return server->eventLoop();
342   }
343
[509]344   //! Terminate a context
[300]345   void CContext::finalize(void)
346   {
[597]347      if (!finalized)
[300]348      {
[597]349        finalized = true;
[697]350        if (hasClient) sendRegistry() ;
[597]351        client->finalize();
352        while (!server->hasFinished())
353        {
354          server->eventLoop();
355        }
356
357        if (hasServer)
358        {
359          closeAllFile();
[697]360          registryOut->hierarchicalGatherRegistry() ;
361          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
[597]362        }
[697]363       
[655]364        for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
365          MPI_Comm_free(&(*it));
366        comms.clear();
[300]367      }
368   }
[509]369
370   /*!
371   \brief Close all the context defintion and do processing data
372      After everything is well defined on client side, they will be processed and sent to server
373   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
374   all necessary information to server, from which each server can build its own database.
375   Because the role of server is to write out field data on a specific netcdf file,
376   the only information that it needs is the enabled files
377   and the active fields (fields will be written onto active files)
378   */
[300]379   void CContext::closeDefinition(void)
380   {
[584]381     // There is nothing client need to send to server
[510]382     if (hasClient)
[509]383     {
384       // After xml is parsed, there are some more works with post processing
385       postProcessing();
[510]386     }
[598]387     setClientServerBuffer();
[300]388
[510]389     if (hasClient && !hasServer)
390     {
[509]391      // Send all attributes of current context to server
392      this->sendAllAttributesToServer();
[459]393
[549]394      // Send all attributes of current calendar
395      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
396
[509]397      // We have enough information to send to server
398      // First of all, send all enabled files
399       sendEnabledFiles();
400
401      // Then, send all enabled fields
402       sendEnabledFields();
403
[540]404      // At last, we have all info of domain and axis, then send them
[569]405       sendRefDomainsAxis();
[540]406
[509]407      // After that, send all grid (if any)
408       sendRefGrid();
409    }
410
411    // We have a xml tree on the server side and now, it should be also processed
412    if (hasClient && !hasServer) sendPostProcessing();
413
414    // There are some processings that should be done after all of above. For example: check mask or index
[510]415    if (hasClient)
[509]416    {
[640]417      this->buildFilterGraphOfEnabledFields();
418      buildFilterGraphOfFieldsWithReadAccess();
[509]419      this->solveAllRefOfEnabledFields(true);
420    }
421
[598]422    // Now tell server that it can process all messages from client
423    if (hasClient && !hasServer) this->sendCloseDefinition();
[510]424
[584]425    // Nettoyage de l'arborescence
426    if (hasClient && !hasServer) CleanTree(); // Only on client side??
[510]427
[598]428    if (hasClient)
429    {
430      sendCreateFileHeader();
431
432      startPrefetchingOfEnabledReadModeFiles();
433    }
[300]434   }
[509]435
[300]436   void CContext::findAllEnabledFields(void)
437   {
438     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
439     (void)this->enabledFiles[i]->getEnabledFields();
440   }
441
[509]442   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
[300]443   {
[509]444     int size = this->enabledFiles.size();
445     for (int i = 0; i < size; ++i)
446     {
447       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
448     }
[300]449   }
450
[640]451   void CContext::buildFilterGraphOfEnabledFields()
452   {
453     int size = this->enabledFiles.size();
454     for (int i = 0; i < size; ++i)
455     {
456       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
457     }
458   }
459
[598]460   void CContext::startPrefetchingOfEnabledReadModeFiles()
461   {
462     int size = enabledReadModeFiles.size();
463     for (int i = 0; i < size; ++i)
464     {
465        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
466     }
467   }
468
469   void CContext::checkPrefetchingOfEnabledReadModeFiles()
470   {
471     int size = enabledReadModeFiles.size();
472     for (int i = 0; i < size; ++i)
473     {
474        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
475     }
476   }
477
[593]478  void CContext::findFieldsWithReadAccess(void)
479  {
480    fieldsWithReadAccess.clear();
481    const vector<CField*> allFields = CField::getAll();
482    for (size_t i = 0; i < allFields.size(); ++i)
483    {
[598]484      if (allFields[i]->file && !allFields[i]->file->mode.isEmpty() && allFields[i]->file->mode.getValue() == CFile::mode_attr::read)
485        allFields[i]->read_access = true;
[593]486      if (!allFields[i]->read_access.isEmpty() && allFields[i]->read_access.getValue())
487        fieldsWithReadAccess.push_back(allFields[i]);
488    }
489  }
490
491  void CContext::solveAllRefOfFieldsWithReadAccess()
492  {
493    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
494      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
495  }
496
[640]497  void CContext::buildFilterGraphOfFieldsWithReadAccess()
498  {
499    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
500      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
501  }
502
[445]503   void CContext::solveAllInheritance(bool apply)
[300]504   {
505     // Résolution des héritages descendants (càd des héritages de groupes)
506     // pour chacun des contextes.
[445]507      solveDescInheritance(apply);
[300]508
509     // Résolution des héritages par référence au niveau des fichiers.
[549]510      const vector<CFile*> allFiles=CFile::getAll();
[540]511      const vector<CGrid*> allGrids= CGrid::getAll();
[300]512
[510]513     //if (hasClient && !hasServer)
514      if (hasClient)
[540]515      {
[510]516        for (unsigned int i = 0; i < allFiles.size(); i++)
517          allFiles[i]->solveFieldRefInheritance(apply);
[540]518      }
519
520      unsigned int vecSize = allGrids.size();
521      unsigned int i = 0;
522      for (i = 0; i < vecSize; ++i)
523        allGrids[i]->solveDomainAxisRefInheritance(apply);
524
[300]525   }
526
527   void CContext::findEnabledFiles(void)
528   {
[347]529      const std::vector<CFile*> allFiles = CFile::getAll();
[300]530
531      for (unsigned int i = 0; i < allFiles.size(); i++)
532         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
[430]533         {
[300]534            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
535               enabledFiles.push_back(allFiles[i]);
[430]536         }
537         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
[300]538
[509]539
[300]540      if (enabledFiles.size() == 0)
541         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
542               << getId() << "\" !");
543   }
544
[598]545   void CContext::findEnabledReadModeFiles(void)
546   {
547     int size = this->enabledFiles.size();
548     for (int i = 0; i < size; ++i)
549     {
550       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
551        enabledReadModeFiles.push_back(enabledFiles[i]);
552     }
553   }
554
[300]555   void CContext::closeAllFile(void)
556   {
[347]557     std::vector<CFile*>::const_iterator
[300]558            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
[509]559
[300]560     for (; it != end; it++)
561     {
562       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
563       (*it)->close();
564     }
565   }
[509]566
567   /*!
568   \brief Dispatch event received from client
569      Whenever a message is received in buffer of server, it will be processed depending on
570   its event type. A new event type should be added in the switch list to make sure
571   it processed on server side.
572   \param [in] event: Received message
573   */
[300]574   bool CContext::dispatchEvent(CEventServer& event)
575   {
[509]576
[549]577      if (SuperClass::dispatchEvent(event)) return true;
[300]578      else
579      {
580        switch(event.type)
581        {
582           case EVENT_ID_CLOSE_DEFINITION :
[549]583             recvCloseDefinition(event);
584             return true;
585             break;
[584]586           case EVENT_ID_UPDATE_CALENDAR:
[549]587             recvUpdateCalendar(event);
588             return true;
589             break;
[300]590           case EVENT_ID_CREATE_FILE_HEADER :
[549]591             recvCreateFileHeader(event);
592             return true;
593             break;
[509]594           case EVENT_ID_POST_PROCESS:
[549]595             recvPostProcessing(event);
596             return true;
[697]597            case EVENT_ID_SEND_REGISTRY:
598             recvRegistry(event);
599             return true;
600            break;
[584]601
[300]602           default :
603             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
[549]604                    <<"Unknown Event");
605           return false;
[300]606         }
607      }
608   }
[509]609
610   //! Client side: Send a message to server to make it close
[300]611   void CContext::sendCloseDefinition(void)
612   {
[549]613     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
[300]614     if (client->isServerLeader())
615     {
[549]616       CMessage msg;
617       msg<<this->getIdServer();
[595]618       const std::list<int>& ranks = client->getRanksServerLeader();
619       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
620         event.push(*itRank,1,msg);
[549]621       client->sendEvent(event);
[300]622     }
[549]623     else client->sendEvent(event);
[300]624   }
[509]625
626   //! Server side: Receive a message of client announcing a context close
[300]627   void CContext::recvCloseDefinition(CEventServer& event)
628   {
[509]629
[300]630      CBufferIn* buffer=event.subEvents.begin()->buffer;
631      string id;
[549]632      *buffer>>id;
[509]633      get(id)->closeDefinition();
[300]634   }
[509]635
636   //! Client side: Send a message to update calendar in each time step
[300]637   void CContext::sendUpdateCalendar(int step)
638   {
639     if (!hasServer)
640     {
[549]641       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
[300]642       if (client->isServerLeader())
643       {
[549]644         CMessage msg;
645         msg<<this->getIdServer()<<step;
[595]646         const std::list<int>& ranks = client->getRanksServerLeader();
647         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
648           event.push(*itRank,1,msg);
[549]649         client->sendEvent(event);
[300]650       }
[549]651       else client->sendEvent(event);
[300]652     }
653   }
[509]654
655   //! Server side: Receive a message of client annoucing calendar update
[300]656   void CContext::recvUpdateCalendar(CEventServer& event)
657   {
658      CBufferIn* buffer=event.subEvents.begin()->buffer;
659      string id;
[549]660      *buffer>>id;
661      get(id)->recvUpdateCalendar(*buffer);
[300]662   }
[509]663
664   //! Server side: Receive a message of client annoucing calendar update
[300]665   void CContext::recvUpdateCalendar(CBufferIn& buffer)
666   {
[549]667      int step;
668      buffer>>step;
669      updateCalendar(step);
[300]670   }
[509]671
672   //! Client side: Send a message to create header part of netcdf file
[300]673   void CContext::sendCreateFileHeader(void)
674   {
[549]675     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
[300]676     if (client->isServerLeader())
677     {
[549]678       CMessage msg;
679       msg<<this->getIdServer();
[595]680       const std::list<int>& ranks = client->getRanksServerLeader();
681       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
682         event.push(*itRank,1,msg) ;
[549]683       client->sendEvent(event);
[300]684     }
[549]685     else client->sendEvent(event);
[300]686   }
[509]687
688   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
[300]689   void CContext::recvCreateFileHeader(CEventServer& event)
690   {
691      CBufferIn* buffer=event.subEvents.begin()->buffer;
692      string id;
[549]693      *buffer>>id;
694      get(id)->recvCreateFileHeader(*buffer);
[300]695   }
[509]696
697   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
[300]698   void CContext::recvCreateFileHeader(CBufferIn& buffer)
699   {
[549]700      createFileHeader();
[300]701   }
[509]702
703   //! Client side: Send a message to do some post processing on server
704   void CContext::sendPostProcessing()
705   {
706     if (!hasServer)
707     {
[549]708       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
[509]709       if (client->isServerLeader())
710       {
[549]711         CMessage msg;
[511]712         msg<<this->getIdServer();
[595]713         const std::list<int>& ranks = client->getRanksServerLeader();
714         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
715           event.push(*itRank,1,msg);
[549]716         client->sendEvent(event);
[509]717       }
[549]718       else client->sendEvent(event);
[509]719     }
720   }
721
722   //! Server side: Receive a message to do some post processing
723   void CContext::recvPostProcessing(CEventServer& event)
724   {
725      CBufferIn* buffer=event.subEvents.begin()->buffer;
726      string id;
727      *buffer>>id;
728      get(id)->recvPostProcessing(*buffer);
729   }
730
731   //! Server side: Receive a message to do some post processing
732   void CContext::recvPostProcessing(CBufferIn& buffer)
733   {
[549]734      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
[509]735      postProcessing();
736   }
737
[511]738   const StdString& CContext::getIdServer()
739   {
740      if (hasClient)
741      {
742        idServer_ = this->getId();
743        idServer_ += "_server";
744        return idServer_;
745      }
746      if (hasServer) return (this->getId());
747   }
748
[509]749   /*!
750   \brief Do some simple post processings after parsing xml file
751      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
752   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
753   which will be written out into netcdf files, are processed
754   */
755   void CContext::postProcessing()
756   {
757     if (isPostProcessed) return;
758
[549]759      // Make sure the calendar was correctly created
760      if (!calendar)
761        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
762      else if (calendar->getTimeStep() == NoneDu)
763        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
[550]764      // Calendar first update to set the current date equals to the start date
765      calendar->update(0);
[509]766
767      // Find all inheritance in xml structure
768      this->solveAllInheritance();
769
[676]770      // Check if some axis, domains or grids are eligible to for compressed indexed output.
771      // Warning: This must be done after solving the inheritance and before the rest of post-processing
772      checkAxisDomainsGridsEligibilityForCompressedOutput();
773
[509]774      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
775      this->findEnabledFiles();
[598]776      this->findEnabledReadModeFiles();
[509]777
778      // Find all enabled fields of each file
779      this->findAllEnabledFields();
780
781      // Search and rebuild all reference object of enabled fields
782      this->solveAllRefOfEnabledFields(false);
[593]783
784      // Find all fields with read access from the public API
785      findFieldsWithReadAccess();
786      // and solve the all reference for them
787      solveAllRefOfFieldsWithReadAccess();
788
[509]789      isPostProcessed = true;
790   }
791
792   std::map<int, StdSize>& CContext::getDataSize()
793   {
[598]794     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
795
[567]796     // Set of grid used by enabled fields
797     std::set<StdString> usedGrid;
[509]798
799     // Find all reference domain and axis of all active fields
800     int numEnabledFiles = this->enabledFiles.size();
801     for (int i = 0; i < numEnabledFiles; ++i)
802     {
[598]803       CFile* file = this->enabledFiles[i];
804       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
805
806       if (fileMode == mode)
[509]807       {
[598]808         std::vector<CField*> enabledFields = file->getEnabledFields();
809         int numEnabledFields = enabledFields.size();
810         for (int j = 0; j < numEnabledFields; ++j)
[509]811         {
[598]812           StdString currentGrid = enabledFields[j]->grid->getId();
813           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataSize();
814           if (dataSize_.empty())
[509]815           {
[598]816             dataSize_ = mapSize;
817             usedGrid.insert(currentGrid);
818           }
819           else
[511]820           {
[598]821             std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
822             if (usedGrid.find(currentGrid) == usedGrid.end())
[511]823             {
[598]824               for (; it != itE; ++it)
825               {
826                 if (0 < dataSize_.count(it->first)) dataSize_[it->first] += it->second;
827                 else dataSize_.insert(make_pair(it->first, it->second));
828               }
829             } else
830             {
831               for (; it != itE; ++it)
832               {
833                 if (0 < dataSize_.count(it->first))
834                  if (CXios::isOptPerformance) dataSize_[it->first] += it->second;
835                  else
836                  {
837                    if (dataSize_[it->first] < it->second) dataSize_[it->first] = it->second;
838                  }
839                 else dataSize_.insert(make_pair(it->first, it->second));
840               }
[511]841             }
[509]842           }
843         }
844       }
845     }
846
847     return dataSize_;
848   }
849
850   //! Client side: Send infomation of active files (files are enabled to write out)
851   void CContext::sendEnabledFiles()
852   {
853     int size = this->enabledFiles.size();
854
855     // In a context, each type has a root definition, e.g: axis, domain, field.
856     // Every object must be a child of one of these root definition. In this case
857     // all new file objects created on server must be children of the root "file_definition"
858     StdString fileDefRoot("file_definition");
859     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
860
861     for (int i = 0; i < size; ++i)
862     {
863       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
864       this->enabledFiles[i]->sendAllAttributesToServer();
865       this->enabledFiles[i]->sendAddAllVariables();
866     }
867   }
868
869   //! Client side: Send information of active fields (ones are written onto files)
870   void CContext::sendEnabledFields()
871   {
872     int size = this->enabledFiles.size();
873     for (int i = 0; i < size; ++i)
874     {
875       this->enabledFiles[i]->sendEnabledFields();
876     }
877   }
878
[676]879   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
880   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
881   {
882     if (!hasClient) return;
883
884     const vector<CAxis*> allAxis = CAxis::getAll();
885     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
886       (*it)->checkEligibilityForCompressedOutput();
887
888     const vector<CDomain*> allDomains = CDomain::getAll();
889     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
890       (*it)->checkEligibilityForCompressedOutput();
891
892     const vector<CGrid*> allGrids = CGrid::getAll();
893     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
894       (*it)->checkEligibilityForCompressedOutput();
895   }
896
[509]897   //! Client side: Send information of reference grid of active fields
898   void CContext::sendRefGrid()
899   {
900     std::set<StdString> gridIds;
901     int sizeFile = this->enabledFiles.size();
902     CFile* filePtr(NULL);
903
904     // Firstly, find all reference grids of all active fields
905     for (int i = 0; i < sizeFile; ++i)
906     {
907       filePtr = this->enabledFiles[i];
908       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
909       int sizeField = enabledFields.size();
910       for (int numField = 0; numField < sizeField; ++numField)
911       {
912         if (0 != enabledFields[numField]->getRelGrid())
913           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
914       }
915     }
916
917     // Create all reference grids on server side
918     StdString gridDefRoot("grid_definition");
919     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
920     std::set<StdString>::const_iterator it, itE = gridIds.end();
921     for (it = gridIds.begin(); it != itE; ++it)
922     {
923       gridPtr->sendCreateChild(*it);
924       CGrid::get(*it)->sendAllAttributesToServer();
[540]925       CGrid::get(*it)->sendAllDomains();
926       CGrid::get(*it)->sendAllAxis();
[509]927     }
928   }
929
930
931   //! Client side: Send information of reference domain and axis of active fields
[569]932   void CContext::sendRefDomainsAxis()
933   {
934     std::set<StdString> domainIds;
935     std::set<StdString> axisIds;
[509]936
[569]937     // Find all reference domain and axis of all active fields
938     int numEnabledFiles = this->enabledFiles.size();
939     for (int i = 0; i < numEnabledFiles; ++i)
940     {
941       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
942       int numEnabledFields = enabledFields.size();
943       for (int j = 0; j < numEnabledFields; ++j)
944       {
945         const std::pair<StdString, StdString>& prDomAxisId = enabledFields[j]->getRefDomainAxisIds();
[619]946         if ("" != prDomAxisId.first) domainIds.insert(prDomAxisId.first);
947         if ("" != prDomAxisId.second) axisIds.insert(prDomAxisId.second);
[569]948       }
949     }
950
951     // Create all reference axis on server side
952     std::set<StdString>::iterator itDom, itAxis;
953     std::set<StdString>::const_iterator itE;
954
955     StdString axiDefRoot("axis_definition");
956     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
957     itE = axisIds.end();
958     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
959     {
960       if (!itAxis->empty())
961       {
962         axisPtr->sendCreateChild(*itAxis);
963         CAxis::get(*itAxis)->sendAllAttributesToServer();
964       }
965     }
966
967     // Create all reference domains on server side
968     StdString domDefRoot("domain_definition");
969     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
970     itE = domainIds.end();
971     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
972     {
973       if (!itDom->empty()) {
974          domPtr->sendCreateChild(*itDom);
975          CDomain::get(*itDom)->sendAllAttributesToServer();
976       }
977     }
978   }
979
[509]980   //! Update calendar in each time step
[300]981   void CContext::updateCalendar(int step)
982   {
[639]983      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
[549]984      calendar->update(step);
[639]985      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
986
987      if (hasClient)
988      {
989        checkPrefetchingOfEnabledReadModeFiles();
990        garbageCollector.invalidate(calendar->getCurrentDate());
991      }
[300]992   }
[509]993
994   //! Server side: Create header of netcdf file
[300]995   void CContext::createFileHeader(void )
996   {
[549]997      vector<CFile*>::const_iterator it;
[509]998
[300]999      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1000      {
[321]1001         (*it)->initFile();
[300]1002      }
[509]1003   }
1004
1005   //! Get current context
[347]1006   CContext* CContext::getCurrent(void)
[300]1007   {
[549]1008     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
[300]1009   }
[509]1010
1011   /*!
1012   \brief Set context with an id be the current context
1013   \param [in] id identity of context to be set to current
1014   */
[346]1015   void CContext::setCurrent(const string& id)
1016   {
1017     CObjectFactory::SetCurrentContextId(id);
1018     CGroupFactory::SetCurrentContextId(id);
1019   }
[509]1020
1021  /*!
1022  \brief Create a context with specific id
1023  \param [in] id identity of new context
1024  \return pointer to the new context or already-existed one with identity id
1025  */
[347]1026  CContext* CContext::create(const StdString& id)
[346]1027  {
[549]1028    CContext::setCurrent(id);
[509]1029
[346]1030    bool hasctxt = CContext::has(id);
[347]1031    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
[549]1032    getRoot();
[347]1033    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
[346]1034
1035#define DECLARE_NODE(Name_, name_) \
1036    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1037#define DECLARE_NODE_PAR(Name_, name_)
1038#include "node_type.conf"
1039
1040    return (context);
1041  }
[697]1042
1043
1044
1045     //! Server side: Receive a message to do some post processing
1046  void CContext::recvRegistry(CEventServer& event)
1047  {
1048    CBufferIn* buffer=event.subEvents.begin()->buffer;
1049    string id;
1050    *buffer>>id;
1051    get(id)->recvRegistry(*buffer);
1052  }
1053
1054  void CContext::recvRegistry(CBufferIn& buffer)
1055  {
1056    if (server->intraCommRank==0)
1057    {
1058      CRegistry registry(server->intraComm) ;
1059      registry.fromBuffer(buffer) ;
1060      registryOut->mergeRegistry(registry) ;
1061    }
1062  }
1063
1064  void CContext::sendRegistry(void)
1065  {
1066    registryOut->hierarchicalGatherRegistry() ;
1067
1068    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1069    if (client->isServerLeader())
1070    {
1071       CMessage msg ;
1072       msg<<this->getIdServer();
1073       if (client->clientRank==0) msg<<*registryOut ;
1074       const std::list<int>& ranks = client->getRanksServerLeader();
1075       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1076         event.push(*itRank,1,msg);
1077       client->sendEvent(event);
1078     }
1079     else client->sendEvent(event);
1080  }
1081
[335]1082} // namespace xios
Note: See TracBrowser for help on using the repository browser.