source: XIOS/trunk/src/node/context.cpp @ 917

Last change on this file since 917 was 917, checked in by rlacroix, 5 years ago

Fix the client/server communication protocol.

In the some extreme cases a deadlock could occur. To fix this, the number of buffered events must be properly limited.

If you noticed decreased performance due to this commit, please let us know about it.

Fixes ticket #91.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 39.9 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Définitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
250     MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
259       comms.push_back(intraCommServer);
260       MPI_Comm_dup(interComm, &interCommServer);
261       comms.push_back(interCommServer);
262     }
263     server = new CContextServer(this,intraCommServer,interCommServer);
264   }
265
266   void CContext::setClientServerBuffer()
267   {
268     size_t minBufferSize = CXios::minBufferSize;
269#define DECLARE_NODE(Name_, name_)    \
270     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
271#define DECLARE_NODE_PAR(Name_, name_)
272#include "node_type.conf"
273#undef DECLARE_NODE
274#undef DECLARE_NODE_PAR
275
276     std::map<int, StdSize> maxEventSize;
277     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
278     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
279
280     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
281     for (it = dataBufferSize.begin(); it != ite; ++it)
282       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
283
284     ite = bufferSize.end();
285     for (it = bufferSize.begin(); it != ite; ++it)
286     {
287       it->second *= CXios::bufferSizeFactor;
288       if (it->second < minBufferSize) it->second = minBufferSize;
289     }
290
291     // We consider that the minimum buffer size is also the minimum event size
292     ite = maxEventSize.end();
293     for (it = maxEventSize.begin(); it != ite; ++it)
294       if (it->second < minBufferSize) it->second = minBufferSize;
295
296     if (client->isServerLeader())
297     {
298       const std::list<int>& ranks = client->getRanksServerLeader();
299       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
300         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
301     }
302
303     client->setBufferSize(bufferSize, maxEventSize);
304   }
305
306   //! Verify whether a context is initialized
307   bool CContext::isInitialized(void)
308   {
309     return hasClient;
310   }
311
312   //! Initialize server
313   void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm, CContext* cxtClient /*= 0*/)
314   {
315     hasServer=true;
316     server = new CContextServer(this,intraComm,interComm);
317
318     registryIn=new CRegistry(intraComm);
319     registryIn->setPath(getId()) ;
320     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
321     registryIn->bcastRegistry() ;
322     registryOut=new CRegistry(intraComm) ;
323     registryOut->setPath(getId()) ;
324
325     MPI_Comm intraCommClient, interCommClient;
326     if (cxtClient) // Attached mode
327     {
328       intraCommClient = intraComm;
329       interCommClient = interComm;
330     }
331     else
332     {
333       MPI_Comm_dup(intraComm, &intraCommClient);
334       comms.push_back(intraCommClient);
335       MPI_Comm_dup(interComm, &interCommClient);
336       comms.push_back(interCommClient);
337     }
338     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
339   }
340
341   //! Server side: Put server into a loop in order to listen message from client
342   bool CContext::eventLoop(void)
343   {
344     return server->eventLoop();
345   }
346
347   //! Try to send the buffers and receive possible answers
348   bool CContext::checkBuffersAndListen(void)
349   {
350     client->checkBuffers();
351     return server->eventLoop();
352   }
353
354   //! Terminate a context
355   void CContext::finalize(void)
356   {
357      if (!finalized)
358      {
359        finalized = true;
360        if (hasClient) sendRegistry() ;
361        client->finalize();
362        while (!server->hasFinished())
363        {
364          server->eventLoop();
365        }
366
367        if (hasServer)
368        {
369          closeAllFile();
370          registryOut->hierarchicalGatherRegistry() ;
371          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
372        }
373
374        for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
375          MPI_Comm_free(&(*it));
376        comms.clear();
377      }
378   }
379
380   /*!
381   \brief Close all the context defintion and do processing data
382      After everything is well defined on client side, they will be processed and sent to server
383   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
384   all necessary information to server, from which each server can build its own database.
385   Because the role of server is to write out field data on a specific netcdf file,
386   the only information that it needs is the enabled files
387   and the active fields (fields will be written onto active files)
388   */
389   void CContext::closeDefinition(void)
390   {
391     // There is nothing client need to send to server
392     if (hasClient)
393     {
394       // After xml is parsed, there are some more works with post processing
395       postProcessing();
396     }
397     setClientServerBuffer();
398
399     if (hasClient && !hasServer)
400     {
401      // Send all attributes of current context to server
402      this->sendAllAttributesToServer();
403
404      // Send all attributes of current calendar
405      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
406
407      // We have enough information to send to server
408      // First of all, send all enabled files
409       sendEnabledFiles();
410
411      // Then, send all enabled fields
412       sendEnabledFields();
413
414      // At last, we have all info of domain and axis, then send them
415       sendRefDomainsAxis();
416
417      // After that, send all grid (if any)
418       sendRefGrid();
419    }
420
421    // We have a xml tree on the server side and now, it should be also processed
422    if (hasClient && !hasServer) sendPostProcessing();
423
424    // There are some processings that should be done after all of above. For example: check mask or index
425    if (hasClient)
426    {
427      this->buildFilterGraphOfEnabledFields();
428      buildFilterGraphOfFieldsWithReadAccess();
429      this->solveAllRefOfEnabledFields(true);
430    }
431
432    // Now tell server that it can process all messages from client
433    if (hasClient && !hasServer) this->sendCloseDefinition();
434
435    // Nettoyage de l'arborescence
436    if (hasClient && !hasServer) CleanTree(); // Only on client side??
437
438    if (hasClient)
439    {
440      sendCreateFileHeader();
441
442      startPrefetchingOfEnabledReadModeFiles();
443    }
444   }
445
446   void CContext::findAllEnabledFields(void)
447   {
448     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
449     (void)this->enabledFiles[i]->getEnabledFields();
450   }
451
452   void CContext::findAllEnabledFieldsInReadModeFiles(void)
453   {
454     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
455     (void)this->enabledReadModeFiles[i]->getEnabledFields();
456   }
457
458   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
459   {
460      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
461        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
462   }
463
464   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
465   {
466     int size = this->enabledFiles.size();
467     for (int i = 0; i < size; ++i)
468     {
469       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
470     }
471
472     for (int i = 0; i < size; ++i)
473     {
474       this->enabledFiles[i]->generateNewTransformationGridDest();
475     }
476   }
477
478   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
479   {
480     int size = this->enabledFiles.size();
481     for (int i = 0; i < size; ++i)
482     {
483       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
484     }
485   }
486
487   void CContext::buildFilterGraphOfEnabledFields()
488   {
489     int size = this->enabledFiles.size();
490     for (int i = 0; i < size; ++i)
491     {
492       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
493     }
494   }
495
496   void CContext::startPrefetchingOfEnabledReadModeFiles()
497   {
498     int size = enabledReadModeFiles.size();
499     for (int i = 0; i < size; ++i)
500     {
501        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
502     }
503   }
504
505   void CContext::checkPrefetchingOfEnabledReadModeFiles()
506   {
507     int size = enabledReadModeFiles.size();
508     for (int i = 0; i < size; ++i)
509     {
510        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
511     }
512   }
513
514  void CContext::findFieldsWithReadAccess(void)
515  {
516    fieldsWithReadAccess.clear();
517    const vector<CField*> allFields = CField::getAll();
518    for (size_t i = 0; i < allFields.size(); ++i)
519    {
520      CField* field = allFields[i];
521
522      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
523        field->read_access = true;
524      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
525        fieldsWithReadAccess.push_back(field);
526    }
527  }
528
529  void CContext::solveAllRefOfFieldsWithReadAccess()
530  {
531    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
532      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
533  }
534
535  void CContext::buildFilterGraphOfFieldsWithReadAccess()
536  {
537    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
538      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
539  }
540
541   void CContext::solveAllInheritance(bool apply)
542   {
543     // Résolution des héritages descendants (càd des héritages de groupes)
544     // pour chacun des contextes.
545      solveDescInheritance(apply);
546
547     // Résolution des héritages par référence au niveau des fichiers.
548      const vector<CFile*> allFiles=CFile::getAll();
549      const vector<CGrid*> allGrids= CGrid::getAll();
550
551     //if (hasClient && !hasServer)
552      if (hasClient)
553      {
554        for (unsigned int i = 0; i < allFiles.size(); i++)
555          allFiles[i]->solveFieldRefInheritance(apply);
556      }
557
558      unsigned int vecSize = allGrids.size();
559      unsigned int i = 0;
560      for (i = 0; i < vecSize; ++i)
561        allGrids[i]->solveDomainAxisRefInheritance(apply);
562
563   }
564
565   void CContext::findEnabledFiles(void)
566   {
567      const std::vector<CFile*> allFiles = CFile::getAll();
568
569      for (unsigned int i = 0; i < allFiles.size(); i++)
570         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
571         {
572            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
573               enabledFiles.push_back(allFiles[i]);
574         }
575         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
576
577
578      if (enabledFiles.size() == 0)
579         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
580               << getId() << "\" !");
581   }
582
583   void CContext::findEnabledReadModeFiles(void)
584   {
585     int size = this->enabledFiles.size();
586     for (int i = 0; i < size; ++i)
587     {
588       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
589        enabledReadModeFiles.push_back(enabledFiles[i]);
590     }
591   }
592
593   void CContext::closeAllFile(void)
594   {
595     std::vector<CFile*>::const_iterator
596            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
597
598     for (; it != end; it++)
599     {
600       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
601       (*it)->close();
602     }
603   }
604
605   /*!
606   \brief Dispatch event received from client
607      Whenever a message is received in buffer of server, it will be processed depending on
608   its event type. A new event type should be added in the switch list to make sure
609   it processed on server side.
610   \param [in] event: Received message
611   */
612   bool CContext::dispatchEvent(CEventServer& event)
613   {
614
615      if (SuperClass::dispatchEvent(event)) return true;
616      else
617      {
618        switch(event.type)
619        {
620           case EVENT_ID_CLOSE_DEFINITION :
621             recvCloseDefinition(event);
622             return true;
623             break;
624           case EVENT_ID_UPDATE_CALENDAR:
625             recvUpdateCalendar(event);
626             return true;
627             break;
628           case EVENT_ID_CREATE_FILE_HEADER :
629             recvCreateFileHeader(event);
630             return true;
631             break;
632           case EVENT_ID_POST_PROCESS:
633             recvPostProcessing(event);
634             return true;
635            case EVENT_ID_SEND_REGISTRY:
636             recvRegistry(event);
637             return true;
638            break;
639
640           default :
641             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
642                    <<"Unknown Event");
643           return false;
644         }
645      }
646   }
647
648   //! Client side: Send a message to server to make it close
649   void CContext::sendCloseDefinition(void)
650   {
651     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
652     if (client->isServerLeader())
653     {
654       CMessage msg;
655       msg<<this->getIdServer();
656       const std::list<int>& ranks = client->getRanksServerLeader();
657       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
658         event.push(*itRank,1,msg);
659       client->sendEvent(event);
660     }
661     else client->sendEvent(event);
662   }
663
664   //! Server side: Receive a message of client announcing a context close
665   void CContext::recvCloseDefinition(CEventServer& event)
666   {
667
668      CBufferIn* buffer=event.subEvents.begin()->buffer;
669      string id;
670      *buffer>>id;
671      get(id)->closeDefinition();
672   }
673
674   //! Client side: Send a message to update calendar in each time step
675   void CContext::sendUpdateCalendar(int step)
676   {
677     if (!hasServer)
678     {
679       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
680       if (client->isServerLeader())
681       {
682         CMessage msg;
683         msg<<this->getIdServer()<<step;
684         const std::list<int>& ranks = client->getRanksServerLeader();
685         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
686           event.push(*itRank,1,msg);
687         client->sendEvent(event);
688       }
689       else client->sendEvent(event);
690     }
691   }
692
693   //! Server side: Receive a message of client annoucing calendar update
694   void CContext::recvUpdateCalendar(CEventServer& event)
695   {
696      CBufferIn* buffer=event.subEvents.begin()->buffer;
697      string id;
698      *buffer>>id;
699      get(id)->recvUpdateCalendar(*buffer);
700   }
701
702   //! Server side: Receive a message of client annoucing calendar update
703   void CContext::recvUpdateCalendar(CBufferIn& buffer)
704   {
705      int step;
706      buffer>>step;
707      updateCalendar(step);
708   }
709
710   //! Client side: Send a message to create header part of netcdf file
711   void CContext::sendCreateFileHeader(void)
712   {
713     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
714     if (client->isServerLeader())
715     {
716       CMessage msg;
717       msg<<this->getIdServer();
718       const std::list<int>& ranks = client->getRanksServerLeader();
719       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
720         event.push(*itRank,1,msg) ;
721       client->sendEvent(event);
722     }
723     else client->sendEvent(event);
724   }
725
726   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
727   void CContext::recvCreateFileHeader(CEventServer& event)
728   {
729      CBufferIn* buffer=event.subEvents.begin()->buffer;
730      string id;
731      *buffer>>id;
732      get(id)->recvCreateFileHeader(*buffer);
733   }
734
735   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
736   void CContext::recvCreateFileHeader(CBufferIn& buffer)
737   {
738      createFileHeader();
739   }
740
741   //! Client side: Send a message to do some post processing on server
742   void CContext::sendPostProcessing()
743   {
744     if (!hasServer)
745     {
746       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
747       if (client->isServerLeader())
748       {
749         CMessage msg;
750         msg<<this->getIdServer();
751         const std::list<int>& ranks = client->getRanksServerLeader();
752         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
753           event.push(*itRank,1,msg);
754         client->sendEvent(event);
755       }
756       else client->sendEvent(event);
757     }
758   }
759
760   //! Server side: Receive a message to do some post processing
761   void CContext::recvPostProcessing(CEventServer& event)
762   {
763      CBufferIn* buffer=event.subEvents.begin()->buffer;
764      string id;
765      *buffer>>id;
766      get(id)->recvPostProcessing(*buffer);
767   }
768
769   //! Server side: Receive a message to do some post processing
770   void CContext::recvPostProcessing(CBufferIn& buffer)
771   {
772      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
773      postProcessing();
774   }
775
776   const StdString& CContext::getIdServer()
777   {
778      if (hasClient)
779      {
780        idServer_ = this->getId();
781        idServer_ += "_server";
782        return idServer_;
783      }
784      if (hasServer) return (this->getId());
785   }
786
787   /*!
788   \brief Do some simple post processings after parsing xml file
789      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
790   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
791   which will be written out into netcdf files, are processed
792   */
793   void CContext::postProcessing()
794   {
795     if (isPostProcessed) return;
796
797      // Make sure the calendar was correctly created
798      if (!calendar)
799        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
800      else if (calendar->getTimeStep() == NoneDu)
801        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
802      // Calendar first update to set the current date equals to the start date
803      calendar->update(0);
804
805      // Find all inheritance in xml structure
806      this->solveAllInheritance();
807
808      // Check if some axis, domains or grids are eligible to for compressed indexed output.
809      // Warning: This must be done after solving the inheritance and before the rest of post-processing
810      checkAxisDomainsGridsEligibilityForCompressedOutput();
811
812      // Check if some automatic time series should be generated
813      // Warning: This must be done after solving the inheritance and before the rest of post-processing
814      prepareTimeseries();
815
816      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
817      this->findEnabledFiles();
818      this->findEnabledReadModeFiles();
819
820      // Find all enabled fields of each file
821      this->findAllEnabledFields();
822      this->findAllEnabledFieldsInReadModeFiles();
823
824     if (hasClient && !hasServer)
825     {
826      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
827      this->readAttributesOfEnabledFieldsInReadModeFiles();
828     }
829
830      // Only search and rebuild all reference objects of enable fields, don't transform
831      this->solveOnlyRefOfEnabledFields(false);
832
833      // Search and rebuild all reference object of enabled fields
834      this->solveAllRefOfEnabledFields(false);
835
836      // Find all fields with read access from the public API
837      findFieldsWithReadAccess();
838      // and solve the all reference for them
839      solveAllRefOfFieldsWithReadAccess();
840
841      isPostProcessed = true;
842   }
843
844   /*!
845    * Compute the required buffer size to send the attributes (mostly those grid related).
846    *
847    * \param maxEventSize [in/out] the size of the bigger event for each connected server
848    */
849   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
850   {
851     std::map<int, StdSize> attributesSize;
852
853     if (hasClient)
854     {
855       size_t numEnabledFiles = this->enabledFiles.size();
856       for (size_t i = 0; i < numEnabledFiles; ++i)
857       {
858         CFile* file = this->enabledFiles[i];
859
860         std::vector<CField*> enabledFields = file->getEnabledFields();
861         size_t numEnabledFields = enabledFields.size();
862         for (size_t j = 0; j < numEnabledFields; ++j)
863         {
864           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
865           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
866           for (; it != itE; ++it)
867           {
868             // If attributesSize[it->first] does not exist, it will be zero-initialized
869             // so we can use it safely without checking for its existance
870             if (attributesSize[it->first] < it->second)
871               attributesSize[it->first] = it->second;
872
873             if (maxEventSize[it->first] < it->second)
874               maxEventSize[it->first] = it->second;
875           }
876         }
877       }
878     }
879
880     return attributesSize;
881   }
882
883   /*!
884    * Compute the required buffer size to send the fields data.
885    *
886    * \param maxEventSize [in/out] the size of the bigger event for each connected server
887    */
888   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
889   {
890     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
891
892     std::map<int, StdSize> dataSize;
893
894     // Find all reference domain and axis of all active fields
895     size_t numEnabledFiles = this->enabledFiles.size();
896     for (size_t i = 0; i < numEnabledFiles; ++i)
897     {
898       CFile* file = this->enabledFiles[i];
899       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
900
901       if (fileMode == mode)
902       {
903         std::vector<CField*> enabledFields = file->getEnabledFields();
904         size_t numEnabledFields = enabledFields.size();
905         for (size_t j = 0; j < numEnabledFields; ++j)
906         {
907           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
908           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
909           for (; it != itE; ++it)
910           {
911             // If dataSize[it->first] does not exist, it will be zero-initialized
912             // so we can use it safely without checking for its existance
913             if (CXios::isOptPerformance)
914               dataSize[it->first] += it->second;
915             else if (dataSize[it->first] < it->second)
916               dataSize[it->first] = it->second;
917
918             if (maxEventSize[it->first] < it->second)
919               maxEventSize[it->first] = it->second;
920           }
921         }
922       }
923     }
924
925     return dataSize;
926   }
927
928   //! Client side: Send infomation of active files (files are enabled to write out)
929   void CContext::sendEnabledFiles()
930   {
931     int size = this->enabledFiles.size();
932
933     // In a context, each type has a root definition, e.g: axis, domain, field.
934     // Every object must be a child of one of these root definition. In this case
935     // all new file objects created on server must be children of the root "file_definition"
936     StdString fileDefRoot("file_definition");
937     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
938
939     for (int i = 0; i < size; ++i)
940     {
941       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
942       this->enabledFiles[i]->sendAllAttributesToServer();
943       this->enabledFiles[i]->sendAddAllVariables();
944     }
945   }
946
947   //! Client side: Send information of active fields (ones are written onto files)
948   void CContext::sendEnabledFields()
949   {
950     int size = this->enabledFiles.size();
951     for (int i = 0; i < size; ++i)
952     {
953       this->enabledFiles[i]->sendEnabledFields();
954     }
955   }
956
957   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
958   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
959   {
960     if (!hasClient) return;
961
962     const vector<CAxis*> allAxis = CAxis::getAll();
963     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
964       (*it)->checkEligibilityForCompressedOutput();
965
966     const vector<CDomain*> allDomains = CDomain::getAll();
967     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
968       (*it)->checkEligibilityForCompressedOutput();
969
970     const vector<CGrid*> allGrids = CGrid::getAll();
971     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
972       (*it)->checkEligibilityForCompressedOutput();
973   }
974
975   //! Client side: Prepare the timeseries by adding the necessary files
976   void CContext::prepareTimeseries()
977   {
978     if (!hasClient) return;
979
980     const std::vector<CFile*> allFiles = CFile::getAll();
981     for (size_t i = 0; i < allFiles.size(); i++)
982     {
983       CFile* file = allFiles[i];
984
985       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
986       {
987         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : file->getFileOutputName();
988
989         const std::vector<CField*> allFields = file->getAllFields();
990         for (size_t j = 0; j < allFields.size(); j++)
991         {
992           CField* field = allFields[j];
993
994           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
995           {
996             CFile* tsFile = CFile::create();
997             tsFile->duplicateAttributes(file);
998
999             tsFile->name = tsPrefix + "_";
1000             if (!field->name.isEmpty())
1001               tsFile->name.get() += field->name;
1002             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1003               tsFile->name.get() += field->field_ref;
1004             else
1005               tsFile->name.get() += field->getId();
1006
1007             if (!field->ts_split_freq.isEmpty())
1008               tsFile->split_freq = field->ts_split_freq;
1009
1010             CField* tsField = tsFile->addField();
1011             tsField->field_ref = field->getId();
1012
1013             tsFile->solveFieldRefInheritance(true);
1014
1015             if (file->timeseries == CFile::timeseries_attr::exclusive)
1016               field->enabled = false;
1017           }
1018         }
1019
1020         // Finally disable the original file is need be
1021         if (file->timeseries == CFile::timeseries_attr::only)
1022          file->enabled = false;
1023       }
1024     }
1025   }
1026
1027   //! Client side: Send information of reference grid of active fields
1028   void CContext::sendRefGrid()
1029   {
1030     std::set<StdString> gridIds;
1031     int sizeFile = this->enabledFiles.size();
1032     CFile* filePtr(NULL);
1033
1034     // Firstly, find all reference grids of all active fields
1035     for (int i = 0; i < sizeFile; ++i)
1036     {
1037       filePtr = this->enabledFiles[i];
1038       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1039       int sizeField = enabledFields.size();
1040       for (int numField = 0; numField < sizeField; ++numField)
1041       {
1042         if (0 != enabledFields[numField]->getRelGrid())
1043           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1044       }
1045     }
1046
1047     // Create all reference grids on server side
1048     StdString gridDefRoot("grid_definition");
1049     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1050     std::set<StdString>::const_iterator it, itE = gridIds.end();
1051     for (it = gridIds.begin(); it != itE; ++it)
1052     {
1053       gridPtr->sendCreateChild(*it);
1054       CGrid::get(*it)->sendAllAttributesToServer();
1055       CGrid::get(*it)->sendAllDomains();
1056       CGrid::get(*it)->sendAllAxis();
1057       CGrid::get(*it)->sendAllScalars();
1058     }
1059   }
1060
1061
1062   //! Client side: Send information of reference domain and axis of active fields
1063   void CContext::sendRefDomainsAxis()
1064   {
1065     std::set<StdString> domainIds, axisIds, scalarIds;
1066
1067     // Find all reference domain and axis of all active fields
1068     int numEnabledFiles = this->enabledFiles.size();
1069     for (int i = 0; i < numEnabledFiles; ++i)
1070     {
1071       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1072       int numEnabledFields = enabledFields.size();
1073       for (int j = 0; j < numEnabledFields; ++j)
1074       {
1075         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1076         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1077         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1078         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1079       }
1080     }
1081
1082     // Create all reference axis on server side
1083     std::set<StdString>::iterator itDom, itAxis, itScalar;
1084     std::set<StdString>::const_iterator itE;
1085
1086     StdString scalarDefRoot("scalar_definition");
1087     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1088     itE = scalarIds.end();
1089     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1090     {
1091       if (!itScalar->empty())
1092       {
1093         scalarPtr->sendCreateChild(*itScalar);
1094         CScalar::get(*itScalar)->sendAllAttributesToServer();
1095       }
1096     }
1097
1098     StdString axiDefRoot("axis_definition");
1099     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1100     itE = axisIds.end();
1101     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1102     {
1103       if (!itAxis->empty())
1104       {
1105         axisPtr->sendCreateChild(*itAxis);
1106         CAxis::get(*itAxis)->sendAllAttributesToServer();
1107       }
1108     }
1109
1110     // Create all reference domains on server side
1111     StdString domDefRoot("domain_definition");
1112     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1113     itE = domainIds.end();
1114     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1115     {
1116       if (!itDom->empty()) {
1117          domPtr->sendCreateChild(*itDom);
1118          CDomain::get(*itDom)->sendAllAttributesToServer();
1119       }
1120     }
1121   }
1122
1123   //! Update calendar in each time step
1124   void CContext::updateCalendar(int step)
1125   {
1126      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1127      calendar->update(step);
1128      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1129
1130      if (hasClient)
1131      {
1132        checkPrefetchingOfEnabledReadModeFiles();
1133        garbageCollector.invalidate(calendar->getCurrentDate());
1134      }
1135   }
1136
1137   //! Server side: Create header of netcdf file
1138   void CContext::createFileHeader(void )
1139   {
1140      vector<CFile*>::const_iterator it;
1141
1142      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1143      {
1144         (*it)->initFile();
1145      }
1146   }
1147
1148   //! Get current context
1149   CContext* CContext::getCurrent(void)
1150   {
1151     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1152   }
1153
1154   /*!
1155   \brief Set context with an id be the current context
1156   \param [in] id identity of context to be set to current
1157   */
1158   void CContext::setCurrent(const string& id)
1159   {
1160     CObjectFactory::SetCurrentContextId(id);
1161     CGroupFactory::SetCurrentContextId(id);
1162   }
1163
1164  /*!
1165  \brief Create a context with specific id
1166  \param [in] id identity of new context
1167  \return pointer to the new context or already-existed one with identity id
1168  */
1169  CContext* CContext::create(const StdString& id)
1170  {
1171    CContext::setCurrent(id);
1172
1173    bool hasctxt = CContext::has(id);
1174    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1175    getRoot();
1176    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1177
1178#define DECLARE_NODE(Name_, name_) \
1179    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1180#define DECLARE_NODE_PAR(Name_, name_)
1181#include "node_type.conf"
1182
1183    return (context);
1184  }
1185
1186
1187
1188     //! Server side: Receive a message to do some post processing
1189  void CContext::recvRegistry(CEventServer& event)
1190  {
1191    CBufferIn* buffer=event.subEvents.begin()->buffer;
1192    string id;
1193    *buffer>>id;
1194    get(id)->recvRegistry(*buffer);
1195  }
1196
1197  void CContext::recvRegistry(CBufferIn& buffer)
1198  {
1199    if (server->intraCommRank==0)
1200    {
1201      CRegistry registry(server->intraComm) ;
1202      registry.fromBuffer(buffer) ;
1203      registryOut->mergeRegistry(registry) ;
1204    }
1205  }
1206
1207  void CContext::sendRegistry(void)
1208  {
1209    registryOut->hierarchicalGatherRegistry() ;
1210
1211    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1212    if (client->isServerLeader())
1213    {
1214       CMessage msg ;
1215       msg<<this->getIdServer();
1216       if (client->clientRank==0) msg<<*registryOut ;
1217       const std::list<int>& ranks = client->getRanksServerLeader();
1218       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1219         event.push(*itRank,1,msg);
1220       client->sendEvent(event);
1221     }
1222     else client->sendEvent(event);
1223  }
1224
1225} // namespace xios
Note: See TracBrowser for help on using the repository browser.