source: XIOS/dev/branch_yushan/src/node/context.cpp @ 1070

Last change on this file since 1070 was 1070, checked in by yushan, 7 years ago

Preperation for merge from trunk

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 41.2 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Dfinitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrmentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
250     ep_lib::MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
259       comms.push_back(intraCommServer);
260       MPI_Comm_dup(interComm, &interCommServer);
261       comms.push_back(interCommServer);
262       
263     }
264     server = new CContextServer(this,intraCommServer,interCommServer);
265   }
266
267   void CContext::setClientServerBuffer()
268   {
269     size_t minBufferSize = CXios::minBufferSize;
270#define DECLARE_NODE(Name_, name_)    \
271     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
272#define DECLARE_NODE_PAR(Name_, name_)
273#include "node_type.conf"
274#undef DECLARE_NODE
275#undef DECLARE_NODE_PAR
276
277     std::map<int, StdSize> maxEventSize;
278     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
279     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
280
281     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
282     for (it = dataBufferSize.begin(); it != ite; ++it)
283       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
284
285     ite = bufferSize.end();
286     for (it = bufferSize.begin(); it != ite; ++it)
287     {
288       it->second *= CXios::bufferSizeFactor;
289       if (it->second < minBufferSize) it->second = minBufferSize;
290     }
291
292     // We consider that the minimum buffer size is also the minimum event size
293     ite = maxEventSize.end();
294     for (it = maxEventSize.begin(); it != ite; ++it)
295       if (it->second < minBufferSize) it->second = minBufferSize;
296
297     if (client->isServerLeader())
298     {
299       const std::list<int>& ranks = client->getRanksServerLeader();
300       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
301         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
302     }
303
304     client->setBufferSize(bufferSize, maxEventSize);
305   }
306
307   //! Verify whether a context is initialized
308   bool CContext::isInitialized(void)
309   {
310     return hasClient;
311   }
312
313   //! Initialize server
314   void CContext::initServer(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtClient /*= 0*/)
315   {
316     hasServer=true;
317     server = new CContextServer(this,intraComm,interComm);
318
319     registryIn=new CRegistry(intraComm);
320     registryIn->setPath(getId()) ;
321     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
322     registryIn->bcastRegistry() ;
323     registryOut=new CRegistry(intraComm) ;
324     registryOut->setPath(getId()) ;
325
326     ep_lib::MPI_Comm intraCommClient, interCommClient;
327     if (cxtClient) // Attached mode
328     {
329       intraCommClient = intraComm;
330       interCommClient = interComm;
331     }
332     else
333     {
334       MPI_Comm_dup(intraComm, &intraCommClient);
335       comms.push_back(intraCommClient);
336       MPI_Comm_dup(interComm, &interCommClient);
337       comms.push_back(interCommClient);
338     }
339     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
340   }
341
342   //! Server side: Put server into a loop in order to listen message from client
343   bool CContext::eventLoop(void)
344   {
345     return server->eventLoop();
346   }
347
348   //! Try to send the buffers and receive possible answers
349   bool CContext::checkBuffersAndListen(void)
350   {
351     client->checkBuffers();
352     return server->eventLoop();
353   }
354
355   //! Terminate a context
356   void CContext::finalize(void)
357   {
358      if (!finalized)
359      {
360        finalized = true;
361        if (hasClient) sendRegistry() ;
362        client->finalize();
363        while (!server->hasFinished())
364        {
365          server->eventLoop();
366        }
367
368        if (hasServer)
369        {
370          closeAllFile();
371          registryOut->hierarchicalGatherRegistry() ;
372          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
373        }
374
375        for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
376          MPI_Comm_free(&(*it));
377        comms.clear();
378      }
379   }
380
381   /*!
382   \brief Close all the context defintion and do processing data
383      After everything is well defined on client side, they will be processed and sent to server
384   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
385   all necessary information to server, from which each server can build its own database.
386   Because the role of server is to write out field data on a specific netcdf file,
387   the only information that it needs is the enabled files
388   and the active fields (fields will be written onto active files)
389   */
390   void CContext::closeDefinition(void)
391   {
392
393     // There is nothing client need to send to server
394     if (hasClient)
395     {
396       // After xml is parsed, there are some more works with post processing
397       postProcessing(); 
398     }
399     setClientServerBuffer(); 
400
401     if (hasClient && !hasServer)
402     {
403      // Send all attributes of current context to server
404      this->sendAllAttributesToServer();
405
406      // Send all attributes of current calendar
407      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
408
409      // We have enough information to send to server
410      // First of all, send all enabled files
411       sendEnabledFiles(); 
412
413      // Then, send all enabled fields
414       sendEnabledFields(); 
415
416      // At last, we have all info of domain and axis, then send them
417       sendRefDomainsAxis(); 
418      // After that, send all grid (if any)
419       sendRefGrid(); 
420    }
421
422    // We have a xml tree on the server side and now, it should be also processed
423    if (hasClient && !hasServer) sendPostProcessing(); 
424
425    // There are some processings that should be done after all of above. For example: check mask or index
426    if (hasClient)
427    {
428      this->buildFilterGraphOfEnabledFields(); 
429      buildFilterGraphOfFieldsWithReadAccess(); 
430      this->solveAllRefOfEnabledFields(true); 
431    }
432
433    // Now tell server that it can process all messages from client
434    if (hasClient && !hasServer) this->sendCloseDefinition();
435
436    // Nettoyage de l'arborescence
437    if (hasClient && !hasServer) CleanTree();
438
439    if (hasClient)
440    {
441      sendCreateFileHeader(); 
442
443      startPrefetchingOfEnabledReadModeFiles(); 
444    }
445   }
446
447   void CContext::findAllEnabledFields(void)
448   {
449     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
450     (void)this->enabledFiles[i]->getEnabledFields();
451   }
452
453   void CContext::findAllEnabledFieldsInReadModeFiles(void)
454   {
455     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
456     (void)this->enabledReadModeFiles[i]->getEnabledFields();
457   }
458
459   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
460   {
461      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
462        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
463   }
464
465   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
466   {
467     int size = this->enabledFiles.size();
468     for (int i = 0; i < size; ++i)
469     {
470       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
471     }
472
473     for (int i = 0; i < size; ++i)
474     {
475       this->enabledFiles[i]->generateNewTransformationGridDest();
476     }
477   }
478
479   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
480   {
481     int size = this->enabledFiles.size();
482     
483     for (int i = 0; i < size; ++i)
484     {
485       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
486     }
487   }
488
489   void CContext::buildFilterGraphOfEnabledFields()
490   {
491     int size = this->enabledFiles.size();
492     for (int i = 0; i < size; ++i)
493     {
494       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
495     }
496   }
497
498   void CContext::startPrefetchingOfEnabledReadModeFiles()
499   {
500     int size = enabledReadModeFiles.size();
501     for (int i = 0; i < size; ++i)
502     {
503        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
504     }
505   }
506
507   void CContext::checkPrefetchingOfEnabledReadModeFiles()
508   {
509     int size = enabledReadModeFiles.size();
510     for (int i = 0; i < size; ++i)
511     {
512        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
513     }
514   }
515
516  void CContext::findFieldsWithReadAccess(void)
517  {
518    fieldsWithReadAccess.clear();
519    const vector<CField*> allFields = CField::getAll();
520    for (size_t i = 0; i < allFields.size(); ++i)
521    {
522      CField* field = allFields[i];
523
524      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
525        field->read_access = true;
526      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
527        fieldsWithReadAccess.push_back(field);
528    }
529  }
530
531  void CContext::solveAllRefOfFieldsWithReadAccess()
532  {
533    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
534      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
535  }
536
537  void CContext::buildFilterGraphOfFieldsWithReadAccess()
538  {
539    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
540      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
541  }
542
543   void CContext::solveAllInheritance(bool apply)
544   {
545     // Rsolution des hritages descendants (cd des hritages de groupes)
546     // pour chacun des contextes.
547      solveDescInheritance(apply);
548
549     // Rsolution des hritages par rfrence au niveau des fichiers.
550      const vector<CFile*> allFiles=CFile::getAll();
551      const vector<CGrid*> allGrids= CGrid::getAll();
552
553     //if (hasClient && !hasServer)
554      if (hasClient)
555      {
556        for (unsigned int i = 0; i < allFiles.size(); i++)
557          allFiles[i]->solveFieldRefInheritance(apply);
558      }
559
560      unsigned int vecSize = allGrids.size();
561      unsigned int i = 0;
562      for (i = 0; i < vecSize; ++i)
563        allGrids[i]->solveDomainAxisRefInheritance(apply);
564
565   }
566
567   void CContext::findEnabledFiles(void)
568   {
569      const std::vector<CFile*> allFiles = CFile::getAll();
570
571      for (unsigned int i = 0; i < allFiles.size(); i++)
572         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est dfini.
573         {
574            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fix  vrai.
575               enabledFiles.push_back(allFiles[i]);
576         }
577         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
578
579
580      if (enabledFiles.size() == 0)
581         DEBUG(<<"Aucun fichier ne va tre sorti dans le contexte nomm \""
582               << getId() << "\" !");
583   }
584
585   void CContext::findEnabledReadModeFiles(void)
586   {
587     int size = this->enabledFiles.size();
588     for (int i = 0; i < size; ++i)
589     {
590       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
591        enabledReadModeFiles.push_back(enabledFiles[i]);
592     }
593   }
594
595   void CContext::closeAllFile(void)
596   {
597     std::vector<CFile*>::const_iterator
598            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
599
600     for (; it != end; it++)
601     {
602       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
603       (*it)->close();
604     }
605   }
606
607   /*!
608   \brief Dispatch event received from client
609      Whenever a message is received in buffer of server, it will be processed depending on
610   its event type. A new event type should be added in the switch list to make sure
611   it processed on server side.
612   \param [in] event: Received message
613   */
614   bool CContext::dispatchEvent(CEventServer& event)
615   {
616
617      if (SuperClass::dispatchEvent(event)) return true;
618      else
619      {
620        switch(event.type)
621        {
622           case EVENT_ID_CLOSE_DEFINITION :
623             recvCloseDefinition(event);
624             return true;
625             break;
626           case EVENT_ID_UPDATE_CALENDAR:
627             recvUpdateCalendar(event);
628             return true;
629             break;
630           case EVENT_ID_CREATE_FILE_HEADER :
631             recvCreateFileHeader(event);
632             return true;
633             break;
634           case EVENT_ID_POST_PROCESS:
635             recvPostProcessing(event);
636             return true;
637            case EVENT_ID_SEND_REGISTRY:
638             recvRegistry(event);
639             return true;
640            break;
641
642           default :
643             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
644                    <<"Unknown Event");
645           return false;
646         }
647      }
648   }
649
650   //! Client side: Send a message to server to make it close
651   void CContext::sendCloseDefinition(void)
652   {
653     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
654     if (client->isServerLeader())
655     {
656       CMessage msg;
657       msg<<this->getIdServer();
658       const std::list<int>& ranks = client->getRanksServerLeader();
659       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
660         event.push(*itRank,1,msg);
661       client->sendEvent(event);
662     }
663     else client->sendEvent(event);
664   }
665
666   //! Server side: Receive a message of client announcing a context close
667   void CContext::recvCloseDefinition(CEventServer& event)
668   {
669
670      CBufferIn* buffer=event.subEvents.begin()->buffer;
671      string id;
672      *buffer>>id;
673      get(id)->closeDefinition();
674   }
675
676   //! Client side: Send a message to update calendar in each time step
677   void CContext::sendUpdateCalendar(int step)
678   {
679     if (!hasServer)
680     {
681       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
682       if (client->isServerLeader())
683       {
684         CMessage msg;
685         msg<<this->getIdServer()<<step;
686         const std::list<int>& ranks = client->getRanksServerLeader();
687         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
688           event.push(*itRank,1,msg);
689         client->sendEvent(event);
690       }
691       else client->sendEvent(event);
692     }
693   }
694
695   //! Server side: Receive a message of client annoucing calendar update
696   void CContext::recvUpdateCalendar(CEventServer& event)
697   {
698      CBufferIn* buffer=event.subEvents.begin()->buffer;
699      string id;
700      *buffer>>id;
701      get(id)->recvUpdateCalendar(*buffer);
702   }
703
704   //! Server side: Receive a message of client annoucing calendar update
705   void CContext::recvUpdateCalendar(CBufferIn& buffer)
706   {
707      int step;
708      buffer>>step;
709      updateCalendar(step);
710   }
711
712   //! Client side: Send a message to create header part of netcdf file
713   void CContext::sendCreateFileHeader(void)
714   {
715     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
716     if (client->isServerLeader())
717     {
718       CMessage msg;
719       msg<<this->getIdServer();
720       const std::list<int>& ranks = client->getRanksServerLeader();
721       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
722         event.push(*itRank,1,msg) ;
723       client->sendEvent(event);
724     }
725     else client->sendEvent(event);
726   }
727
728   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
729   void CContext::recvCreateFileHeader(CEventServer& event)
730   {
731      CBufferIn* buffer=event.subEvents.begin()->buffer;
732      string id;
733      *buffer>>id;
734      get(id)->recvCreateFileHeader(*buffer);
735   }
736
737   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
738   void CContext::recvCreateFileHeader(CBufferIn& buffer)
739   {
740      createFileHeader();
741   }
742
743   //! Client side: Send a message to do some post processing on server
744   void CContext::sendPostProcessing()
745   {
746     if (!hasServer)
747     {
748       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
749       if (client->isServerLeader())
750       {
751         CMessage msg;
752         msg<<this->getIdServer();
753         const std::list<int>& ranks = client->getRanksServerLeader();
754         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
755           event.push(*itRank,1,msg);
756         client->sendEvent(event);
757       }
758       else client->sendEvent(event);
759     }
760   }
761
762   //! Server side: Receive a message to do some post processing
763   void CContext::recvPostProcessing(CEventServer& event)
764   {
765      CBufferIn* buffer=event.subEvents.begin()->buffer;
766      string id;
767      *buffer>>id;
768      get(id)->recvPostProcessing(*buffer);
769   }
770
771   //! Server side: Receive a message to do some post processing
772   void CContext::recvPostProcessing(CBufferIn& buffer)
773   {
774      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
775      postProcessing();
776   }
777
778   const StdString& CContext::getIdServer()
779   {
780      if (hasClient)
781      {
782        idServer_ = this->getId();
783        idServer_ += "_server";
784        return idServer_;
785      }
786      if (hasServer) return (this->getId());
787   }
788
789   /*!
790   \brief Do some simple post processings after parsing xml file
791      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
792   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
793   which will be written out into netcdf files, are processed
794   */
795   void CContext::postProcessing()
796   {
797     int myRank;
798     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
799
800     //printf("myRank = %d, in postProcessing, isPostProcessed = %d\n", myRank, isPostProcessed);
801     if (isPostProcessed) return;
802
803      // Make sure the calendar was correctly created
804      if (!calendar)
805        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
806      else if (calendar->getTimeStep() == NoneDu)
807        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
808      // Calendar first update to set the current date equals to the start date
809      calendar->update(0);  //printf("myRank = %d, calendar->update(0) OK\n", myRank);
810
811      // Find all inheritance in xml structure
812      this->solveAllInheritance();  //printf("myRank = %d, this->solveAllInheritance OK\n", myRank);
813
814      // Check if some axis, domains or grids are eligible to for compressed indexed output.
815      // Warning: This must be done after solving the inheritance and before the rest of post-processing
816      checkAxisDomainsGridsEligibilityForCompressedOutput();  //printf("myRank = %d, checkAxisDomainsGridsEligibilityForCompressedOutput OK\n", myRank);
817
818      // Check if some automatic time series should be generated
819      // Warning: This must be done after solving the inheritance and before the rest of post-processing
820      prepareTimeseries();  //printf("myRank = %d, prepareTimeseries OK\n", myRank);
821
822      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers  sortir.
823      this->findEnabledFiles();  //printf("myRank = %d, this->findEnabledFiles OK\n", myRank);
824      this->findEnabledReadModeFiles();  //printf("myRank = %d, this->findEnabledReadModeFiles OK\n", myRank);
825
826      // Find all enabled fields of each file
827      this->findAllEnabledFields();  //printf("myRank = %d, this->findAllEnabledFields OK\n", myRank);
828      this->findAllEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->findAllEnabledFieldsInReadModeFiles OK\n", myRank);
829
830     if (hasClient && !hasServer)
831     {
832      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
833      this->readAttributesOfEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->readAttributesOfEnabledFieldsInReadModeFiles OK\n", myRank);
834     }
835
836      // Only search and rebuild all reference objects of enable fields, don't transform
837      this->solveOnlyRefOfEnabledFields(false);  //printf("myRank = %d, this->solveOnlyRefOfEnabledFields(false) OK\n", myRank);
838
839      // Search and rebuild all reference object of enabled fields
840      this->solveAllRefOfEnabledFields(false);  //printf("myRank = %d, this->solveAllRefOfEnabledFields(false) OK\n", myRank);
841
842      // Find all fields with read access from the public API
843      findFieldsWithReadAccess();  //printf("myRank = %d, findFieldsWithReadAccess OK\n", myRank);
844      // and solve the all reference for them
845      solveAllRefOfFieldsWithReadAccess();  //printf("myRank = %d, solveAllRefOfFieldsWithReadAccess OK\n", myRank);
846
847      isPostProcessed = true;
848   }
849
850   /*!
851    * Compute the required buffer size to send the attributes (mostly those grid related).
852    *
853    * \param maxEventSize [in/out] the size of the bigger event for each connected server
854    */
855   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
856   {
857     std::map<int, StdSize> attributesSize;
858
859     if (hasClient)
860     {
861       size_t numEnabledFiles = this->enabledFiles.size();
862       for (size_t i = 0; i < numEnabledFiles; ++i)
863       {
864         CFile* file = this->enabledFiles[i];
865
866         std::vector<CField*> enabledFields = file->getEnabledFields();
867         size_t numEnabledFields = enabledFields.size();
868         for (size_t j = 0; j < numEnabledFields; ++j)
869         {
870           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
871           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
872           for (; it != itE; ++it)
873           {
874             // If attributesSize[it->first] does not exist, it will be zero-initialized
875             // so we can use it safely without checking for its existance
876             if (attributesSize[it->first] < it->second)
877               attributesSize[it->first] = it->second;
878
879             if (maxEventSize[it->first] < it->second)
880               maxEventSize[it->first] = it->second;
881           }
882         }
883       }
884     }
885
886     return attributesSize;
887   }
888
889   /*!
890    * Compute the required buffer size to send the fields data.
891    *
892    * \param maxEventSize [in/out] the size of the bigger event for each connected server
893    */
894   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
895   {
896     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
897
898     std::map<int, StdSize> dataSize;
899
900     // Find all reference domain and axis of all active fields
901     size_t numEnabledFiles = this->enabledFiles.size();
902     for (size_t i = 0; i < numEnabledFiles; ++i)
903     {
904       CFile* file = this->enabledFiles[i];
905       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
906
907       if (fileMode == mode)
908       {
909         std::vector<CField*> enabledFields = file->getEnabledFields();
910         size_t numEnabledFields = enabledFields.size();
911         for (size_t j = 0; j < numEnabledFields; ++j)
912         {
913           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
914           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
915           for (; it != itE; ++it)
916           {
917             // If dataSize[it->first] does not exist, it will be zero-initialized
918             // so we can use it safely without checking for its existance
919             if (CXios::isOptPerformance)
920               dataSize[it->first] += it->second;
921             else if (dataSize[it->first] < it->second)
922               dataSize[it->first] = it->second;
923
924             if (maxEventSize[it->first] < it->second)
925               maxEventSize[it->first] = it->second;
926           }
927         }
928       }
929     }
930
931     return dataSize;
932   }
933
934   //! Client side: Send infomation of active files (files are enabled to write out)
935   void CContext::sendEnabledFiles()
936   {
937     int size = this->enabledFiles.size();
938
939     // In a context, each type has a root definition, e.g: axis, domain, field.
940     // Every object must be a child of one of these root definition. In this case
941     // all new file objects created on server must be children of the root "file_definition"
942     StdString fileDefRoot("file_definition");
943     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
944
945     for (int i = 0; i < size; ++i)
946     {
947       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
948       this->enabledFiles[i]->sendAllAttributesToServer();
949       this->enabledFiles[i]->sendAddAllVariables();
950     }
951   }
952
953   //! Client side: Send information of active fields (ones are written onto files)
954   void CContext::sendEnabledFields()
955   {
956     int size = this->enabledFiles.size();
957     for (int i = 0; i < size; ++i)
958     {
959       this->enabledFiles[i]->sendEnabledFields();
960     }
961   }
962
963   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
964   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
965   {
966     if (!hasClient) return;
967
968     const vector<CAxis*> allAxis = CAxis::getAll();
969     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
970       (*it)->checkEligibilityForCompressedOutput();
971
972     const vector<CDomain*> allDomains = CDomain::getAll();
973     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
974       (*it)->checkEligibilityForCompressedOutput();
975
976     const vector<CGrid*> allGrids = CGrid::getAll();
977     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
978       (*it)->checkEligibilityForCompressedOutput();
979   }
980
981   //! Client side: Prepare the timeseries by adding the necessary files
982   void CContext::prepareTimeseries()
983   {
984     if (!hasClient) return;
985
986     const std::vector<CFile*> allFiles = CFile::getAll();
987     for (size_t i = 0; i < allFiles.size(); i++)
988     {
989       CFile* file = allFiles[i];
990
991       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
992       {
993         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : file->getFileOutputName();
994
995         const std::vector<CField*> allFields = file->getAllFields();
996         for (size_t j = 0; j < allFields.size(); j++)
997         {
998           CField* field = allFields[j];
999
1000           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1001           {
1002             CFile* tsFile = CFile::create();
1003             tsFile->duplicateAttributes(file);
1004             tsFile->setVirtualVariableGroup(file->getVirtualVariableGroup());
1005
1006             tsFile->name = tsPrefix + "_";
1007             if (!field->name.isEmpty())
1008               tsFile->name.get() += field->name;
1009             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1010               tsFile->name.get() += field->field_ref;
1011             else
1012               tsFile->name.get() += field->getId();
1013
1014             if (!field->ts_split_freq.isEmpty())
1015               tsFile->split_freq = field->ts_split_freq;
1016
1017             CField* tsField = tsFile->addField();
1018             tsField->field_ref = field->getId();
1019             tsField->setVirtualVariableGroup(field->getVirtualVariableGroup());
1020
1021             tsFile->solveFieldRefInheritance(true);
1022
1023             if (file->timeseries == CFile::timeseries_attr::exclusive)
1024               field->enabled = false;
1025           }
1026         }
1027
1028         // Finally disable the original file is need be
1029         if (file->timeseries == CFile::timeseries_attr::only)
1030          file->enabled = false;
1031       }
1032     }
1033   }
1034
1035   //! Client side: Send information of reference grid of active fields
1036   void CContext::sendRefGrid()
1037   {
1038     std::set<StdString> gridIds;
1039     int sizeFile = this->enabledFiles.size();
1040     CFile* filePtr(NULL);
1041
1042     // Firstly, find all reference grids of all active fields
1043     for (int i = 0; i < sizeFile; ++i)
1044     {
1045       filePtr = this->enabledFiles[i];
1046       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1047       int sizeField = enabledFields.size();
1048       for (int numField = 0; numField < sizeField; ++numField)
1049       {
1050         if (0 != enabledFields[numField]->getRelGrid())
1051           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1052       }
1053     }
1054
1055     // Create all reference grids on server side
1056     StdString gridDefRoot("grid_definition");
1057     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1058     std::set<StdString>::const_iterator it, itE = gridIds.end();
1059     for (it = gridIds.begin(); it != itE; ++it)
1060     {
1061       gridPtr->sendCreateChild(*it);
1062       CGrid::get(*it)->sendAllAttributesToServer();
1063       CGrid::get(*it)->sendAllDomains();
1064       CGrid::get(*it)->sendAllAxis();
1065       CGrid::get(*it)->sendAllScalars();
1066     }
1067   }
1068
1069
1070   //! Client side: Send information of reference domain and axis of active fields
1071   void CContext::sendRefDomainsAxis()
1072   {
1073     std::set<StdString> domainIds, axisIds, scalarIds;
1074
1075     // Find all reference domain and axis of all active fields
1076     int numEnabledFiles = this->enabledFiles.size();
1077     for (int i = 0; i < numEnabledFiles; ++i)
1078     {
1079       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1080       int numEnabledFields = enabledFields.size();
1081       for (int j = 0; j < numEnabledFields; ++j)
1082       {
1083         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1084         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1085         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1086         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1087       }
1088     }
1089
1090     // Create all reference axis on server side
1091     std::set<StdString>::iterator itDom, itAxis, itScalar;
1092     std::set<StdString>::const_iterator itE;
1093
1094     StdString scalarDefRoot("scalar_definition");
1095     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1096     itE = scalarIds.end();
1097     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1098     {
1099       if (!itScalar->empty())
1100       {
1101         scalarPtr->sendCreateChild(*itScalar);
1102         CScalar::get(*itScalar)->sendAllAttributesToServer();
1103       }
1104     }
1105
1106     StdString axiDefRoot("axis_definition");
1107     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1108     itE = axisIds.end();
1109     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1110     {
1111       if (!itAxis->empty())
1112       {
1113         axisPtr->sendCreateChild(*itAxis);
1114         CAxis::get(*itAxis)->sendAllAttributesToServer();
1115       }
1116     }
1117
1118     // Create all reference domains on server side
1119     StdString domDefRoot("domain_definition");
1120     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1121     itE = domainIds.end();
1122     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1123     {
1124       if (!itDom->empty()) {
1125          domPtr->sendCreateChild(*itDom);
1126          CDomain::get(*itDom)->sendAllAttributesToServer();
1127       }
1128     }
1129   }
1130
1131   //! Update calendar in each time step
1132   void CContext::updateCalendar(int step)
1133   {
1134      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1135      calendar->update(step);
1136      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1137
1138      if (hasClient)
1139      {
1140        checkPrefetchingOfEnabledReadModeFiles();
1141        garbageCollector.invalidate(calendar->getCurrentDate());
1142      }
1143   }
1144
1145   //! Server side: Create header of netcdf file
1146   void CContext::createFileHeader(void )
1147   {
1148      vector<CFile*>::const_iterator it;
1149
1150      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1151      {
1152         (*it)->initFile();
1153      }
1154   }
1155
1156   //! Get current context
1157   CContext* CContext::getCurrent(void)
1158   {
1159     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1160   }
1161
1162   /*!
1163   \brief Set context with an id be the current context
1164   \param [in] id identity of context to be set to current
1165   */
1166   void CContext::setCurrent(const string& id)
1167   {
1168     CObjectFactory::SetCurrentContextId(id);
1169     CGroupFactory::SetCurrentContextId(id);
1170   }
1171
1172  /*!
1173  \brief Create a context with specific id
1174  \param [in] id identity of new context
1175  \return pointer to the new context or already-existed one with identity id
1176  */
1177  CContext* CContext::create(const StdString& id)
1178  {
1179    CContext::setCurrent(id);
1180
1181    bool hasctxt = CContext::has(id);
1182    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1183    getRoot();
1184    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1185
1186#define DECLARE_NODE(Name_, name_) \
1187    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1188#define DECLARE_NODE_PAR(Name_, name_)
1189#include "node_type.conf"
1190
1191    return (context);
1192  }
1193
1194
1195
1196     //! Server side: Receive a message to do some post processing
1197  void CContext::recvRegistry(CEventServer& event)
1198  {
1199    CBufferIn* buffer=event.subEvents.begin()->buffer;
1200    string id;
1201    *buffer>>id;
1202    get(id)->recvRegistry(*buffer);
1203  }
1204
1205  void CContext::recvRegistry(CBufferIn& buffer)
1206  {
1207    if (server->intraCommRank==0)
1208    {
1209      CRegistry registry(server->intraComm) ;
1210      registry.fromBuffer(buffer) ;
1211      registryOut->mergeRegistry(registry) ;
1212    }
1213  }
1214
1215  void CContext::sendRegistry(void)
1216  {
1217    registryOut->hierarchicalGatherRegistry() ;
1218
1219    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1220    if (client->isServerLeader())
1221    {
1222       CMessage msg ;
1223       msg<<this->getIdServer();
1224       if (client->clientRank==0) msg<<*registryOut ;
1225       const std::list<int>& ranks = client->getRanksServerLeader();
1226       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1227         event.push(*itRank,1,msg);
1228       client->sendEvent(event);
1229    }
1230    else client->sendEvent(event);
1231  }
1232
1233} // namespace xios
Note: See TracBrowser for help on using the repository browser.