source: XIOS/trunk/src/node/context.cpp @ 909

Last change on this file since 909 was 909, checked in by rlacroix, 6 years ago

Fix: The "buffer_size_factor" parameter was not used.

I mistakenly removed it from the buffers sizes computation in r731.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 39.0 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Définitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
250     MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
259       comms.push_back(intraCommServer);
260       MPI_Comm_dup(interComm, &interCommServer);
261       comms.push_back(interCommServer);
262     }
263     server = new CContextServer(this,intraCommServer,interCommServer);
264   }
265
266   void CContext::setClientServerBuffer()
267   {
268     size_t minBufferSize = CXios::minBufferSize;
269#define DECLARE_NODE(Name_, name_)    \
270     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
271#define DECLARE_NODE_PAR(Name_, name_)
272#include "node_type.conf"
273#undef DECLARE_NODE
274#undef DECLARE_NODE_PAR
275
276     std::map<int, StdSize> bufferSize = getAttributesBufferSize();
277     std::map<int, StdSize> dataBufferSize = getDataBufferSize();
278
279     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
280     for (it = dataBufferSize.begin(); it != ite; ++it)
281       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
282
283     ite = bufferSize.end();
284     for (it = bufferSize.begin(); it != ite; ++it)
285     {
286       it->second *= CXios::bufferSizeFactor;
287       if (it->second < minBufferSize) it->second = minBufferSize;
288     }
289
290     if (client->isServerLeader())
291     {
292       const std::list<int>& ranks = client->getRanksServerLeader();
293       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
294         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = minBufferSize;
295     }
296
297     if (!bufferSize.empty())
298       client->setBufferSize(bufferSize);
299   }
300
301   //! Verify whether a context is initialized
302   bool CContext::isInitialized(void)
303   {
304     return hasClient;
305   }
306
307   //! Initialize server
308   void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm, CContext* cxtClient /*= 0*/)
309   {
310     hasServer=true;
311     server = new CContextServer(this,intraComm,interComm);
312
313     registryIn=new CRegistry(intraComm);
314     registryIn->setPath(getId()) ;
315     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
316     registryIn->bcastRegistry() ;
317     registryOut=new CRegistry(intraComm) ;
318     registryOut->setPath(getId()) ;
319
320     MPI_Comm intraCommClient, interCommClient;
321     if (cxtClient) // Attached mode
322     {
323       intraCommClient = intraComm;
324       interCommClient = interComm;
325     }
326     else
327     {
328       MPI_Comm_dup(intraComm, &intraCommClient);
329       comms.push_back(intraCommClient);
330       MPI_Comm_dup(interComm, &interCommClient);
331       comms.push_back(interCommClient);
332     }
333     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
334   }
335
336   //! Server side: Put server into a loop in order to listen message from client
337   bool CContext::eventLoop(void)
338   {
339     return server->eventLoop();
340   }
341
342   //! Try to send the buffers and receive possible answers
343   bool CContext::checkBuffersAndListen(void)
344   {
345     client->checkBuffers();
346     return server->eventLoop();
347   }
348
349   //! Terminate a context
350   void CContext::finalize(void)
351   {
352      if (!finalized)
353      {
354        finalized = true;
355        if (hasClient) sendRegistry() ;
356        client->finalize();
357        while (!server->hasFinished())
358        {
359          server->eventLoop();
360        }
361
362        if (hasServer)
363        {
364          closeAllFile();
365          registryOut->hierarchicalGatherRegistry() ;
366          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
367        }
368
369        for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
370          MPI_Comm_free(&(*it));
371        comms.clear();
372      }
373   }
374
375   /*!
376   \brief Close all the context defintion and do processing data
377      After everything is well defined on client side, they will be processed and sent to server
378   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
379   all necessary information to server, from which each server can build its own database.
380   Because the role of server is to write out field data on a specific netcdf file,
381   the only information that it needs is the enabled files
382   and the active fields (fields will be written onto active files)
383   */
384   void CContext::closeDefinition(void)
385   {
386     // There is nothing client need to send to server
387     if (hasClient)
388     {
389       // After xml is parsed, there are some more works with post processing
390       postProcessing();
391     }
392     setClientServerBuffer();
393
394     if (hasClient && !hasServer)
395     {
396      // Send all attributes of current context to server
397      this->sendAllAttributesToServer();
398
399      // Send all attributes of current calendar
400      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
401
402      // We have enough information to send to server
403      // First of all, send all enabled files
404       sendEnabledFiles();
405
406      // Then, send all enabled fields
407       sendEnabledFields();
408
409      // At last, we have all info of domain and axis, then send them
410       sendRefDomainsAxis();
411
412      // After that, send all grid (if any)
413       sendRefGrid();
414    }
415
416    // We have a xml tree on the server side and now, it should be also processed
417    if (hasClient && !hasServer) sendPostProcessing();
418
419    // There are some processings that should be done after all of above. For example: check mask or index
420    if (hasClient)
421    {
422      this->buildFilterGraphOfEnabledFields();
423      buildFilterGraphOfFieldsWithReadAccess();
424      this->solveAllRefOfEnabledFields(true);
425    }
426
427    // Now tell server that it can process all messages from client
428    if (hasClient && !hasServer) this->sendCloseDefinition();
429
430    // Nettoyage de l'arborescence
431    if (hasClient && !hasServer) CleanTree(); // Only on client side??
432
433    if (hasClient)
434    {
435      sendCreateFileHeader();
436
437      startPrefetchingOfEnabledReadModeFiles();
438    }
439   }
440
441   void CContext::findAllEnabledFields(void)
442   {
443     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
444     (void)this->enabledFiles[i]->getEnabledFields();
445   }
446
447   void CContext::findAllEnabledFieldsInReadModeFiles(void)
448   {
449     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
450     (void)this->enabledReadModeFiles[i]->getEnabledFields();
451   }
452
453   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
454   {
455      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
456        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
457   }
458
459   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
460   {
461     int size = this->enabledFiles.size();
462     for (int i = 0; i < size; ++i)
463     {
464       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
465     }
466
467     for (int i = 0; i < size; ++i)
468     {
469       this->enabledFiles[i]->generateNewTransformationGridDest();
470     }
471   }
472
473   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
474   {
475     int size = this->enabledFiles.size();
476     for (int i = 0; i < size; ++i)
477     {
478       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
479     }
480   }
481
482   void CContext::buildFilterGraphOfEnabledFields()
483   {
484     int size = this->enabledFiles.size();
485     for (int i = 0; i < size; ++i)
486     {
487       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
488     }
489   }
490
491   void CContext::startPrefetchingOfEnabledReadModeFiles()
492   {
493     int size = enabledReadModeFiles.size();
494     for (int i = 0; i < size; ++i)
495     {
496        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
497     }
498   }
499
500   void CContext::checkPrefetchingOfEnabledReadModeFiles()
501   {
502     int size = enabledReadModeFiles.size();
503     for (int i = 0; i < size; ++i)
504     {
505        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
506     }
507   }
508
509  void CContext::findFieldsWithReadAccess(void)
510  {
511    fieldsWithReadAccess.clear();
512    const vector<CField*> allFields = CField::getAll();
513    for (size_t i = 0; i < allFields.size(); ++i)
514    {
515      CField* field = allFields[i];
516
517      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
518        field->read_access = true;
519      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
520        fieldsWithReadAccess.push_back(field);
521    }
522  }
523
524  void CContext::solveAllRefOfFieldsWithReadAccess()
525  {
526    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
527      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
528  }
529
530  void CContext::buildFilterGraphOfFieldsWithReadAccess()
531  {
532    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
533      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
534  }
535
536   void CContext::solveAllInheritance(bool apply)
537   {
538     // Résolution des héritages descendants (càd des héritages de groupes)
539     // pour chacun des contextes.
540      solveDescInheritance(apply);
541
542     // Résolution des héritages par référence au niveau des fichiers.
543      const vector<CFile*> allFiles=CFile::getAll();
544      const vector<CGrid*> allGrids= CGrid::getAll();
545
546     //if (hasClient && !hasServer)
547      if (hasClient)
548      {
549        for (unsigned int i = 0; i < allFiles.size(); i++)
550          allFiles[i]->solveFieldRefInheritance(apply);
551      }
552
553      unsigned int vecSize = allGrids.size();
554      unsigned int i = 0;
555      for (i = 0; i < vecSize; ++i)
556        allGrids[i]->solveDomainAxisRefInheritance(apply);
557
558   }
559
560   void CContext::findEnabledFiles(void)
561   {
562      const std::vector<CFile*> allFiles = CFile::getAll();
563
564      for (unsigned int i = 0; i < allFiles.size(); i++)
565         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
566         {
567            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
568               enabledFiles.push_back(allFiles[i]);
569         }
570         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
571
572
573      if (enabledFiles.size() == 0)
574         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
575               << getId() << "\" !");
576   }
577
578   void CContext::findEnabledReadModeFiles(void)
579   {
580     int size = this->enabledFiles.size();
581     for (int i = 0; i < size; ++i)
582     {
583       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
584        enabledReadModeFiles.push_back(enabledFiles[i]);
585     }
586   }
587
588   void CContext::closeAllFile(void)
589   {
590     std::vector<CFile*>::const_iterator
591            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
592
593     for (; it != end; it++)
594     {
595       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
596       (*it)->close();
597     }
598   }
599
600   /*!
601   \brief Dispatch event received from client
602      Whenever a message is received in buffer of server, it will be processed depending on
603   its event type. A new event type should be added in the switch list to make sure
604   it processed on server side.
605   \param [in] event: Received message
606   */
607   bool CContext::dispatchEvent(CEventServer& event)
608   {
609
610      if (SuperClass::dispatchEvent(event)) return true;
611      else
612      {
613        switch(event.type)
614        {
615           case EVENT_ID_CLOSE_DEFINITION :
616             recvCloseDefinition(event);
617             return true;
618             break;
619           case EVENT_ID_UPDATE_CALENDAR:
620             recvUpdateCalendar(event);
621             return true;
622             break;
623           case EVENT_ID_CREATE_FILE_HEADER :
624             recvCreateFileHeader(event);
625             return true;
626             break;
627           case EVENT_ID_POST_PROCESS:
628             recvPostProcessing(event);
629             return true;
630            case EVENT_ID_SEND_REGISTRY:
631             recvRegistry(event);
632             return true;
633            break;
634
635           default :
636             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
637                    <<"Unknown Event");
638           return false;
639         }
640      }
641   }
642
643   //! Client side: Send a message to server to make it close
644   void CContext::sendCloseDefinition(void)
645   {
646     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
647     if (client->isServerLeader())
648     {
649       CMessage msg;
650       msg<<this->getIdServer();
651       const std::list<int>& ranks = client->getRanksServerLeader();
652       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
653         event.push(*itRank,1,msg);
654       client->sendEvent(event);
655     }
656     else client->sendEvent(event);
657   }
658
659   //! Server side: Receive a message of client announcing a context close
660   void CContext::recvCloseDefinition(CEventServer& event)
661   {
662
663      CBufferIn* buffer=event.subEvents.begin()->buffer;
664      string id;
665      *buffer>>id;
666      get(id)->closeDefinition();
667   }
668
669   //! Client side: Send a message to update calendar in each time step
670   void CContext::sendUpdateCalendar(int step)
671   {
672     if (!hasServer)
673     {
674       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
675       if (client->isServerLeader())
676       {
677         CMessage msg;
678         msg<<this->getIdServer()<<step;
679         const std::list<int>& ranks = client->getRanksServerLeader();
680         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
681           event.push(*itRank,1,msg);
682         client->sendEvent(event);
683       }
684       else client->sendEvent(event);
685     }
686   }
687
688   //! Server side: Receive a message of client annoucing calendar update
689   void CContext::recvUpdateCalendar(CEventServer& event)
690   {
691      CBufferIn* buffer=event.subEvents.begin()->buffer;
692      string id;
693      *buffer>>id;
694      get(id)->recvUpdateCalendar(*buffer);
695   }
696
697   //! Server side: Receive a message of client annoucing calendar update
698   void CContext::recvUpdateCalendar(CBufferIn& buffer)
699   {
700      int step;
701      buffer>>step;
702      updateCalendar(step);
703   }
704
705   //! Client side: Send a message to create header part of netcdf file
706   void CContext::sendCreateFileHeader(void)
707   {
708     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
709     if (client->isServerLeader())
710     {
711       CMessage msg;
712       msg<<this->getIdServer();
713       const std::list<int>& ranks = client->getRanksServerLeader();
714       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
715         event.push(*itRank,1,msg) ;
716       client->sendEvent(event);
717     }
718     else client->sendEvent(event);
719   }
720
721   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
722   void CContext::recvCreateFileHeader(CEventServer& event)
723   {
724      CBufferIn* buffer=event.subEvents.begin()->buffer;
725      string id;
726      *buffer>>id;
727      get(id)->recvCreateFileHeader(*buffer);
728   }
729
730   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
731   void CContext::recvCreateFileHeader(CBufferIn& buffer)
732   {
733      createFileHeader();
734   }
735
736   //! Client side: Send a message to do some post processing on server
737   void CContext::sendPostProcessing()
738   {
739     if (!hasServer)
740     {
741       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
742       if (client->isServerLeader())
743       {
744         CMessage msg;
745         msg<<this->getIdServer();
746         const std::list<int>& ranks = client->getRanksServerLeader();
747         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
748           event.push(*itRank,1,msg);
749         client->sendEvent(event);
750       }
751       else client->sendEvent(event);
752     }
753   }
754
755   //! Server side: Receive a message to do some post processing
756   void CContext::recvPostProcessing(CEventServer& event)
757   {
758      CBufferIn* buffer=event.subEvents.begin()->buffer;
759      string id;
760      *buffer>>id;
761      get(id)->recvPostProcessing(*buffer);
762   }
763
764   //! Server side: Receive a message to do some post processing
765   void CContext::recvPostProcessing(CBufferIn& buffer)
766   {
767      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
768      postProcessing();
769   }
770
771   const StdString& CContext::getIdServer()
772   {
773      if (hasClient)
774      {
775        idServer_ = this->getId();
776        idServer_ += "_server";
777        return idServer_;
778      }
779      if (hasServer) return (this->getId());
780   }
781
782   /*!
783   \brief Do some simple post processings after parsing xml file
784      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
785   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
786   which will be written out into netcdf files, are processed
787   */
788   void CContext::postProcessing()
789   {
790     if (isPostProcessed) return;
791
792      // Make sure the calendar was correctly created
793      if (!calendar)
794        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
795      else if (calendar->getTimeStep() == NoneDu)
796        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
797      // Calendar first update to set the current date equals to the start date
798      calendar->update(0);
799
800      // Find all inheritance in xml structure
801      this->solveAllInheritance();
802
803      // Check if some axis, domains or grids are eligible to for compressed indexed output.
804      // Warning: This must be done after solving the inheritance and before the rest of post-processing
805      checkAxisDomainsGridsEligibilityForCompressedOutput();
806
807      // Check if some automatic time series should be generated
808      // Warning: This must be done after solving the inheritance and before the rest of post-processing
809      prepareTimeseries();
810
811      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
812      this->findEnabledFiles();
813      this->findEnabledReadModeFiles();
814
815      // Find all enabled fields of each file
816      this->findAllEnabledFields();
817      this->findAllEnabledFieldsInReadModeFiles();
818
819     if (hasClient && !hasServer)
820     {
821      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
822      this->readAttributesOfEnabledFieldsInReadModeFiles();
823     }
824
825      // Only search and rebuild all reference objects of enable fields, don't transform
826      this->solveOnlyRefOfEnabledFields(false);
827
828      // Search and rebuild all reference object of enabled fields
829      this->solveAllRefOfEnabledFields(false);
830
831      // Find all fields with read access from the public API
832      findFieldsWithReadAccess();
833      // and solve the all reference for them
834      solveAllRefOfFieldsWithReadAccess();
835
836      isPostProcessed = true;
837   }
838
839   std::map<int, StdSize> CContext::getAttributesBufferSize()
840   {
841     std::map<int, StdSize> attributesSize;
842
843     if (hasClient)
844     {
845       size_t numEnabledFiles = this->enabledFiles.size();
846       for (size_t i = 0; i < numEnabledFiles; ++i)
847       {
848         CFile* file = this->enabledFiles[i];
849
850         std::vector<CField*> enabledFields = file->getEnabledFields();
851         size_t numEnabledFields = enabledFields.size();
852         for (size_t j = 0; j < numEnabledFields; ++j)
853         {
854           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
855           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
856           for (; it != itE; ++it)
857           {
858             // If attributesSize[it->first] does not exist, it will be zero-initialized
859             // so we can use it safely without checking for its existance
860             if (attributesSize[it->first] < it->second)
861               attributesSize[it->first] = it->second;
862           }
863         }
864       }
865     }
866
867     return attributesSize;
868   }
869
870   std::map<int, StdSize> CContext::getDataBufferSize()
871   {
872     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
873
874     std::map<int, StdSize> dataSize;
875
876     // Find all reference domain and axis of all active fields
877     size_t numEnabledFiles = this->enabledFiles.size();
878     for (size_t i = 0; i < numEnabledFiles; ++i)
879     {
880       CFile* file = this->enabledFiles[i];
881       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
882
883       if (fileMode == mode)
884       {
885         std::vector<CField*> enabledFields = file->getEnabledFields();
886         size_t numEnabledFields = enabledFields.size();
887         for (size_t j = 0; j < numEnabledFields; ++j)
888         {
889           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
890           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
891           for (; it != itE; ++it)
892           {
893             // If dataSize[it->first] does not exist, it will be zero-initialized
894             // so we can use it safely without checking for its existance
895             if (CXios::isOptPerformance)
896               dataSize[it->first] += it->second;
897             else if (dataSize[it->first] < it->second)
898               dataSize[it->first] = it->second;
899           }
900         }
901       }
902     }
903
904     return dataSize;
905   }
906
907   //! Client side: Send infomation of active files (files are enabled to write out)
908   void CContext::sendEnabledFiles()
909   {
910     int size = this->enabledFiles.size();
911
912     // In a context, each type has a root definition, e.g: axis, domain, field.
913     // Every object must be a child of one of these root definition. In this case
914     // all new file objects created on server must be children of the root "file_definition"
915     StdString fileDefRoot("file_definition");
916     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
917
918     for (int i = 0; i < size; ++i)
919     {
920       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
921       this->enabledFiles[i]->sendAllAttributesToServer();
922       this->enabledFiles[i]->sendAddAllVariables();
923     }
924   }
925
926   //! Client side: Send information of active fields (ones are written onto files)
927   void CContext::sendEnabledFields()
928   {
929     int size = this->enabledFiles.size();
930     for (int i = 0; i < size; ++i)
931     {
932       this->enabledFiles[i]->sendEnabledFields();
933     }
934   }
935
936   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
937   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
938   {
939     if (!hasClient) return;
940
941     const vector<CAxis*> allAxis = CAxis::getAll();
942     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
943       (*it)->checkEligibilityForCompressedOutput();
944
945     const vector<CDomain*> allDomains = CDomain::getAll();
946     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
947       (*it)->checkEligibilityForCompressedOutput();
948
949     const vector<CGrid*> allGrids = CGrid::getAll();
950     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
951       (*it)->checkEligibilityForCompressedOutput();
952   }
953
954   //! Client side: Prepare the timeseries by adding the necessary files
955   void CContext::prepareTimeseries()
956   {
957     if (!hasClient) return;
958
959     const std::vector<CFile*> allFiles = CFile::getAll();
960     for (size_t i = 0; i < allFiles.size(); i++)
961     {
962       CFile* file = allFiles[i];
963
964       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
965       {
966         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : file->getFileOutputName();
967
968         const std::vector<CField*> allFields = file->getAllFields();
969         for (size_t j = 0; j < allFields.size(); j++)
970         {
971           CField* field = allFields[j];
972
973           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
974           {
975             CFile* tsFile = CFile::create();
976             tsFile->duplicateAttributes(file);
977
978             tsFile->name = tsPrefix + "_";
979             if (!field->name.isEmpty())
980               tsFile->name.get() += field->name;
981             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
982               tsFile->name.get() += field->field_ref;
983             else
984               tsFile->name.get() += field->getId();
985
986             if (!field->ts_split_freq.isEmpty())
987               tsFile->split_freq = field->ts_split_freq;
988
989             CField* tsField = tsFile->addField();
990             tsField->field_ref = field->getId();
991
992             tsFile->solveFieldRefInheritance(true);
993
994             if (file->timeseries == CFile::timeseries_attr::exclusive)
995               field->enabled = false;
996           }
997         }
998
999         // Finally disable the original file is need be
1000         if (file->timeseries == CFile::timeseries_attr::only)
1001          file->enabled = false;
1002       }
1003     }
1004   }
1005
1006   //! Client side: Send information of reference grid of active fields
1007   void CContext::sendRefGrid()
1008   {
1009     std::set<StdString> gridIds;
1010     int sizeFile = this->enabledFiles.size();
1011     CFile* filePtr(NULL);
1012
1013     // Firstly, find all reference grids of all active fields
1014     for (int i = 0; i < sizeFile; ++i)
1015     {
1016       filePtr = this->enabledFiles[i];
1017       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1018       int sizeField = enabledFields.size();
1019       for (int numField = 0; numField < sizeField; ++numField)
1020       {
1021         if (0 != enabledFields[numField]->getRelGrid())
1022           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1023       }
1024     }
1025
1026     // Create all reference grids on server side
1027     StdString gridDefRoot("grid_definition");
1028     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1029     std::set<StdString>::const_iterator it, itE = gridIds.end();
1030     for (it = gridIds.begin(); it != itE; ++it)
1031     {
1032       gridPtr->sendCreateChild(*it);
1033       CGrid::get(*it)->sendAllAttributesToServer();
1034       CGrid::get(*it)->sendAllDomains();
1035       CGrid::get(*it)->sendAllAxis();
1036       CGrid::get(*it)->sendAllScalars();
1037     }
1038   }
1039
1040
1041   //! Client side: Send information of reference domain and axis of active fields
1042   void CContext::sendRefDomainsAxis()
1043   {
1044     std::set<StdString> domainIds, axisIds, scalarIds;
1045
1046     // Find all reference domain and axis of all active fields
1047     int numEnabledFiles = this->enabledFiles.size();
1048     for (int i = 0; i < numEnabledFiles; ++i)
1049     {
1050       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1051       int numEnabledFields = enabledFields.size();
1052       for (int j = 0; j < numEnabledFields; ++j)
1053       {
1054         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1055         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1056         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1057         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1058       }
1059     }
1060
1061     // Create all reference axis on server side
1062     std::set<StdString>::iterator itDom, itAxis, itScalar;
1063     std::set<StdString>::const_iterator itE;
1064
1065     StdString scalarDefRoot("scalar_definition");
1066     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1067     itE = scalarIds.end();
1068     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1069     {
1070       if (!itScalar->empty())
1071       {
1072         scalarPtr->sendCreateChild(*itScalar);
1073         CScalar::get(*itScalar)->sendAllAttributesToServer();
1074       }
1075     }
1076
1077     StdString axiDefRoot("axis_definition");
1078     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1079     itE = axisIds.end();
1080     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1081     {
1082       if (!itAxis->empty())
1083       {
1084         axisPtr->sendCreateChild(*itAxis);
1085         CAxis::get(*itAxis)->sendAllAttributesToServer();
1086       }
1087     }
1088
1089     // Create all reference domains on server side
1090     StdString domDefRoot("domain_definition");
1091     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1092     itE = domainIds.end();
1093     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1094     {
1095       if (!itDom->empty()) {
1096          domPtr->sendCreateChild(*itDom);
1097          CDomain::get(*itDom)->sendAllAttributesToServer();
1098       }
1099     }
1100   }
1101
1102   //! Update calendar in each time step
1103   void CContext::updateCalendar(int step)
1104   {
1105      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1106      calendar->update(step);
1107      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1108
1109      if (hasClient)
1110      {
1111        checkPrefetchingOfEnabledReadModeFiles();
1112        garbageCollector.invalidate(calendar->getCurrentDate());
1113      }
1114   }
1115
1116   //! Server side: Create header of netcdf file
1117   void CContext::createFileHeader(void )
1118   {
1119      vector<CFile*>::const_iterator it;
1120
1121      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1122      {
1123         (*it)->initFile();
1124      }
1125   }
1126
1127   //! Get current context
1128   CContext* CContext::getCurrent(void)
1129   {
1130     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1131   }
1132
1133   /*!
1134   \brief Set context with an id be the current context
1135   \param [in] id identity of context to be set to current
1136   */
1137   void CContext::setCurrent(const string& id)
1138   {
1139     CObjectFactory::SetCurrentContextId(id);
1140     CGroupFactory::SetCurrentContextId(id);
1141   }
1142
1143  /*!
1144  \brief Create a context with specific id
1145  \param [in] id identity of new context
1146  \return pointer to the new context or already-existed one with identity id
1147  */
1148  CContext* CContext::create(const StdString& id)
1149  {
1150    CContext::setCurrent(id);
1151
1152    bool hasctxt = CContext::has(id);
1153    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1154    getRoot();
1155    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1156
1157#define DECLARE_NODE(Name_, name_) \
1158    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1159#define DECLARE_NODE_PAR(Name_, name_)
1160#include "node_type.conf"
1161
1162    return (context);
1163  }
1164
1165
1166
1167     //! Server side: Receive a message to do some post processing
1168  void CContext::recvRegistry(CEventServer& event)
1169  {
1170    CBufferIn* buffer=event.subEvents.begin()->buffer;
1171    string id;
1172    *buffer>>id;
1173    get(id)->recvRegistry(*buffer);
1174  }
1175
1176  void CContext::recvRegistry(CBufferIn& buffer)
1177  {
1178    if (server->intraCommRank==0)
1179    {
1180      CRegistry registry(server->intraComm) ;
1181      registry.fromBuffer(buffer) ;
1182      registryOut->mergeRegistry(registry) ;
1183    }
1184  }
1185
1186  void CContext::sendRegistry(void)
1187  {
1188    registryOut->hierarchicalGatherRegistry() ;
1189
1190    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1191    if (client->isServerLeader())
1192    {
1193       CMessage msg ;
1194       msg<<this->getIdServer();
1195       if (client->clientRank==0) msg<<*registryOut ;
1196       const std::list<int>& ranks = client->getRanksServerLeader();
1197       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1198         event.push(*itRank,1,msg);
1199       client->sendEvent(event);
1200     }
1201     else client->sendEvent(event);
1202  }
1203
1204} // namespace xios
Note: See TracBrowser for help on using the repository browser.