source: XIOS/dev/branch_yushan/src/node/context.cpp @ 1074

Last change on this file since 1074 was 1074, checked in by yushan, 7 years ago

save modif

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 41.2 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Dfinitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrmentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
250     ep_lib::MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
259       comms.push_back(intraCommServer);
260       MPI_Comm_dup(interComm, &interCommServer);
261       comms.push_back(interCommServer);
262       
263     }
264     server = new CContextServer(this,intraCommServer,interCommServer);
265   }
266
267   void CContext::setClientServerBuffer()
268   {
269     size_t minBufferSize = CXios::minBufferSize;
270#define DECLARE_NODE(Name_, name_)    \
271     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
272#define DECLARE_NODE_PAR(Name_, name_)
273#include "node_type.conf"
274#undef DECLARE_NODE
275#undef DECLARE_NODE_PAR
276
277     std::map<int, StdSize> maxEventSize;
278     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
279     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
280
281     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
282     for (it = dataBufferSize.begin(); it != ite; ++it)
283       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
284
285     ite = bufferSize.end();
286     for (it = bufferSize.begin(); it != ite; ++it)
287     {
288       it->second *= CXios::bufferSizeFactor;
289       if (it->second < minBufferSize) it->second = minBufferSize;
290     }
291
292     // We consider that the minimum buffer size is also the minimum event size
293     ite = maxEventSize.end();
294     for (it = maxEventSize.begin(); it != ite; ++it)
295       if (it->second < minBufferSize) it->second = minBufferSize;
296
297     if (client->isServerLeader())
298     {
299       const std::list<int>& ranks = client->getRanksServerLeader();
300       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
301         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
302     }
303
304     client->setBufferSize(bufferSize, maxEventSize);
305   }
306
307   //! Verify whether a context is initialized
308   bool CContext::isInitialized(void)
309   {
310     return hasClient;
311   }
312
313   //! Initialize server
314   void CContext::initServer(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtClient /*= 0*/)
315   {
316     hasServer=true;
317     server = new CContextServer(this,intraComm,interComm);
318
319     registryIn=new CRegistry(intraComm);
320     registryIn->setPath(getId()) ;
321     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
322     registryIn->bcastRegistry() ;
323     registryOut=new CRegistry(intraComm) ;
324     registryOut->setPath(getId()) ;
325
326     ep_lib::MPI_Comm intraCommClient, interCommClient;
327     if (cxtClient) // Attached mode
328     {
329       intraCommClient = intraComm;
330       interCommClient = interComm;
331     }
332     else
333     {
334       MPI_Comm_dup(intraComm, &intraCommClient);
335       comms.push_back(intraCommClient);
336       MPI_Comm_dup(interComm, &interCommClient);
337       comms.push_back(interCommClient);
338     }
339     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
340   }
341
342   //! Server side: Put server into a loop in order to listen message from client
343   bool CContext::eventLoop(void)
344   {
345     return server->eventLoop();
346   }
347
348   //! Try to send the buffers and receive possible answers
349   bool CContext::checkBuffersAndListen(void)
350   {
351     client->checkBuffers();
352     return server->eventLoop();
353   }
354
355   //! Terminate a context
356   void CContext::finalize(void)
357   {
358     
359      if (!finalized)
360      {
361        finalized = true;
362        if (hasClient) sendRegistry() ;
363        client->finalize();
364        while (!server->hasFinished())
365        {
366          server->eventLoop();
367        }
368
369        if (hasServer)
370        {
371          closeAllFile();
372          registryOut->hierarchicalGatherRegistry() ;
373          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
374        }
375
376        for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
377          MPI_Comm_free(&(*it));
378        comms.clear();
379      }
380   }
381
382   /*!
383   \brief Close all the context defintion and do processing data
384      After everything is well defined on client side, they will be processed and sent to server
385   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
386   all necessary information to server, from which each server can build its own database.
387   Because the role of server is to write out field data on a specific netcdf file,
388   the only information that it needs is the enabled files
389   and the active fields (fields will be written onto active files)
390   */
391   void CContext::closeDefinition(void)
392   {
393
394     // There is nothing client need to send to server
395     if (hasClient)
396     {
397       // After xml is parsed, there are some more works with post processing
398       postProcessing(); 
399     }
400     setClientServerBuffer(); 
401
402     if (hasClient && !hasServer)
403     {
404      // Send all attributes of current context to server
405      this->sendAllAttributesToServer();
406
407      // Send all attributes of current calendar
408      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
409
410      // We have enough information to send to server
411      // First of all, send all enabled files
412       sendEnabledFiles(); 
413
414      // Then, send all enabled fields
415       sendEnabledFields(); 
416
417      // At last, we have all info of domain and axis, then send them
418       sendRefDomainsAxis(); 
419      // After that, send all grid (if any)
420       sendRefGrid(); 
421    }
422
423    // We have a xml tree on the server side and now, it should be also processed
424    if (hasClient && !hasServer) sendPostProcessing(); 
425
426    // There are some processings that should be done after all of above. For example: check mask or index
427    if (hasClient)
428    {
429      this->buildFilterGraphOfEnabledFields(); 
430      buildFilterGraphOfFieldsWithReadAccess(); 
431      this->solveAllRefOfEnabledFields(true); 
432    }
433
434    // Now tell server that it can process all messages from client
435    if (hasClient && !hasServer) this->sendCloseDefinition();
436
437    // Nettoyage de l'arborescence
438    if (hasClient && !hasServer) CleanTree();
439
440    if (hasClient)
441    {
442      sendCreateFileHeader(); 
443
444      startPrefetchingOfEnabledReadModeFiles(); 
445    }
446   }
447
448   void CContext::findAllEnabledFields(void)
449   {
450     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
451     (void)this->enabledFiles[i]->getEnabledFields();
452   }
453
454   void CContext::findAllEnabledFieldsInReadModeFiles(void)
455   {
456     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
457     (void)this->enabledReadModeFiles[i]->getEnabledFields();
458   }
459
460   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
461   {
462      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
463        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
464   }
465
466   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
467   {
468     int size = this->enabledFiles.size();
469     for (int i = 0; i < size; ++i)
470     {
471       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
472     }
473
474     for (int i = 0; i < size; ++i)
475     {
476       this->enabledFiles[i]->generateNewTransformationGridDest();
477     }
478   }
479
480   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
481   {
482     int size = this->enabledFiles.size();
483     
484     for (int i = 0; i < size; ++i)
485     {
486       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
487     }
488   }
489
490   void CContext::buildFilterGraphOfEnabledFields()
491   {
492     int size = this->enabledFiles.size();
493     for (int i = 0; i < size; ++i)
494     {
495       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
496     }
497   }
498
499   void CContext::startPrefetchingOfEnabledReadModeFiles()
500   {
501     int size = enabledReadModeFiles.size();
502     for (int i = 0; i < size; ++i)
503     {
504        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
505     }
506   }
507
508   void CContext::checkPrefetchingOfEnabledReadModeFiles()
509   {
510     int size = enabledReadModeFiles.size();
511     for (int i = 0; i < size; ++i)
512     {
513        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
514     }
515   }
516
517  void CContext::findFieldsWithReadAccess(void)
518  {
519    fieldsWithReadAccess.clear();
520    const vector<CField*> allFields = CField::getAll();
521    for (size_t i = 0; i < allFields.size(); ++i)
522    {
523      CField* field = allFields[i];
524
525      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
526        field->read_access = true;
527      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
528        fieldsWithReadAccess.push_back(field);
529    }
530  }
531
532  void CContext::solveAllRefOfFieldsWithReadAccess()
533  {
534    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
535      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
536  }
537
538  void CContext::buildFilterGraphOfFieldsWithReadAccess()
539  {
540    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
541      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
542  }
543
544   void CContext::solveAllInheritance(bool apply)
545   {
546     // Rsolution des hritages descendants (cd des hritages de groupes)
547     // pour chacun des contextes.
548      solveDescInheritance(apply);
549
550     // Rsolution des hritages par rfrence au niveau des fichiers.
551      const vector<CFile*> allFiles=CFile::getAll();
552      const vector<CGrid*> allGrids= CGrid::getAll();
553
554     //if (hasClient && !hasServer)
555      if (hasClient)
556      {
557        for (unsigned int i = 0; i < allFiles.size(); i++)
558          allFiles[i]->solveFieldRefInheritance(apply);
559      }
560
561      unsigned int vecSize = allGrids.size();
562      unsigned int i = 0;
563      for (i = 0; i < vecSize; ++i)
564        allGrids[i]->solveDomainAxisRefInheritance(apply);
565
566   }
567
568   void CContext::findEnabledFiles(void)
569   {
570      const std::vector<CFile*> allFiles = CFile::getAll();
571
572      for (unsigned int i = 0; i < allFiles.size(); i++)
573         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est dfini.
574         {
575            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fix  vrai.
576               enabledFiles.push_back(allFiles[i]);
577         }
578         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
579
580
581      if (enabledFiles.size() == 0)
582         DEBUG(<<"Aucun fichier ne va tre sorti dans le contexte nomm \""
583               << getId() << "\" !");
584   }
585
586   void CContext::findEnabledReadModeFiles(void)
587   {
588     int size = this->enabledFiles.size();
589     for (int i = 0; i < size; ++i)
590     {
591       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
592        enabledReadModeFiles.push_back(enabledFiles[i]);
593     }
594   }
595
596   void CContext::closeAllFile(void)
597   {
598     std::vector<CFile*>::const_iterator
599            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
600
601     for (; it != end; it++)
602     {
603       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
604       (*it)->close();
605     }
606   }
607
608   /*!
609   \brief Dispatch event received from client
610      Whenever a message is received in buffer of server, it will be processed depending on
611   its event type. A new event type should be added in the switch list to make sure
612   it processed on server side.
613   \param [in] event: Received message
614   */
615   bool CContext::dispatchEvent(CEventServer& event)
616   {
617
618      if (SuperClass::dispatchEvent(event)) return true;
619      else
620      {
621        switch(event.type)
622        {
623           case EVENT_ID_CLOSE_DEFINITION :
624             recvCloseDefinition(event);
625             return true;
626             break;
627           case EVENT_ID_UPDATE_CALENDAR:
628             recvUpdateCalendar(event);
629             return true;
630             break;
631           case EVENT_ID_CREATE_FILE_HEADER :
632             recvCreateFileHeader(event);
633             return true;
634             break;
635           case EVENT_ID_POST_PROCESS:
636             recvPostProcessing(event);
637             return true;
638            case EVENT_ID_SEND_REGISTRY:
639             recvRegistry(event);
640             return true;
641            break;
642
643           default :
644             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
645                    <<"Unknown Event");
646           return false;
647         }
648      }
649   }
650
651   //! Client side: Send a message to server to make it close
652   void CContext::sendCloseDefinition(void)
653   {
654     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
655     if (client->isServerLeader())
656     {
657       CMessage msg;
658       msg<<this->getIdServer();
659       const std::list<int>& ranks = client->getRanksServerLeader();
660       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
661         event.push(*itRank,1,msg);
662       client->sendEvent(event);
663     }
664     else client->sendEvent(event);
665   }
666
667   //! Server side: Receive a message of client announcing a context close
668   void CContext::recvCloseDefinition(CEventServer& event)
669   {
670
671      CBufferIn* buffer=event.subEvents.begin()->buffer;
672      string id;
673      *buffer>>id;
674      get(id)->closeDefinition();
675   }
676
677   //! Client side: Send a message to update calendar in each time step
678   void CContext::sendUpdateCalendar(int step)
679   {
680     if (!hasServer)
681     {
682       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
683       if (client->isServerLeader())
684       {
685         CMessage msg;
686         msg<<this->getIdServer()<<step;
687         const std::list<int>& ranks = client->getRanksServerLeader();
688         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
689           event.push(*itRank,1,msg);
690         client->sendEvent(event);
691       }
692       else client->sendEvent(event);
693     }
694   }
695
696   //! Server side: Receive a message of client annoucing calendar update
697   void CContext::recvUpdateCalendar(CEventServer& event)
698   {
699      CBufferIn* buffer=event.subEvents.begin()->buffer;
700      string id;
701      *buffer>>id;
702      get(id)->recvUpdateCalendar(*buffer);
703   }
704
705   //! Server side: Receive a message of client annoucing calendar update
706   void CContext::recvUpdateCalendar(CBufferIn& buffer)
707   {
708      int step;
709      buffer>>step;
710      updateCalendar(step);
711   }
712
713   //! Client side: Send a message to create header part of netcdf file
714   void CContext::sendCreateFileHeader(void)
715   {
716     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
717     if (client->isServerLeader())
718     {
719       CMessage msg;
720       msg<<this->getIdServer();
721       const std::list<int>& ranks = client->getRanksServerLeader();
722       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
723         event.push(*itRank,1,msg) ;
724       client->sendEvent(event);
725     }
726     else client->sendEvent(event);
727   }
728
729   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
730   void CContext::recvCreateFileHeader(CEventServer& event)
731   {
732      CBufferIn* buffer=event.subEvents.begin()->buffer;
733      string id;
734      *buffer>>id;
735      get(id)->recvCreateFileHeader(*buffer);
736   }
737
738   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
739   void CContext::recvCreateFileHeader(CBufferIn& buffer)
740   {
741      createFileHeader();
742   }
743
744   //! Client side: Send a message to do some post processing on server
745   void CContext::sendPostProcessing()
746   {
747     if (!hasServer)
748     {
749       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
750       if (client->isServerLeader())
751       {
752         CMessage msg;
753         msg<<this->getIdServer();
754         const std::list<int>& ranks = client->getRanksServerLeader();
755         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
756           event.push(*itRank,1,msg);
757         client->sendEvent(event);
758       }
759       else client->sendEvent(event);
760     }
761   }
762
763   //! Server side: Receive a message to do some post processing
764   void CContext::recvPostProcessing(CEventServer& event)
765   {
766      CBufferIn* buffer=event.subEvents.begin()->buffer;
767      string id;
768      *buffer>>id;
769      get(id)->recvPostProcessing(*buffer);
770   }
771
772   //! Server side: Receive a message to do some post processing
773   void CContext::recvPostProcessing(CBufferIn& buffer)
774   {
775      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
776      postProcessing();
777   }
778
779   const StdString& CContext::getIdServer()
780   {
781      if (hasClient)
782      {
783        idServer_ = this->getId();
784        idServer_ += "_server";
785        return idServer_;
786      }
787      if (hasServer) return (this->getId());
788   }
789
790   /*!
791   \brief Do some simple post processings after parsing xml file
792      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
793   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
794   which will be written out into netcdf files, are processed
795   */
796   void CContext::postProcessing()
797   {
798     int myRank;
799     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
800
801     //printf("myRank = %d, in postProcessing, isPostProcessed = %d\n", myRank, isPostProcessed);
802     if (isPostProcessed) return;
803
804      // Make sure the calendar was correctly created
805      if (!calendar)
806        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
807      else if (calendar->getTimeStep() == NoneDu)
808        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
809      // Calendar first update to set the current date equals to the start date
810      calendar->update(0);  //printf("myRank = %d, calendar->update(0) OK\n", myRank);
811
812      // Find all inheritance in xml structure
813      this->solveAllInheritance();  //printf("myRank = %d, this->solveAllInheritance OK\n", myRank);
814
815      // Check if some axis, domains or grids are eligible to for compressed indexed output.
816      // Warning: This must be done after solving the inheritance and before the rest of post-processing
817      checkAxisDomainsGridsEligibilityForCompressedOutput();  //printf("myRank = %d, checkAxisDomainsGridsEligibilityForCompressedOutput OK\n", myRank);
818
819      // Check if some automatic time series should be generated
820      // Warning: This must be done after solving the inheritance and before the rest of post-processing
821      prepareTimeseries();  //printf("myRank = %d, prepareTimeseries OK\n", myRank);
822
823      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers  sortir.
824      this->findEnabledFiles();  //printf("myRank = %d, this->findEnabledFiles OK\n", myRank);
825      this->findEnabledReadModeFiles();  //printf("myRank = %d, this->findEnabledReadModeFiles OK\n", myRank);
826
827      // Find all enabled fields of each file
828      this->findAllEnabledFields();  //printf("myRank = %d, this->findAllEnabledFields OK\n", myRank);
829      this->findAllEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->findAllEnabledFieldsInReadModeFiles OK\n", myRank);
830
831     if (hasClient && !hasServer)
832     {
833      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
834      this->readAttributesOfEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->readAttributesOfEnabledFieldsInReadModeFiles OK\n", myRank);
835     }
836
837      // Only search and rebuild all reference objects of enable fields, don't transform
838      this->solveOnlyRefOfEnabledFields(false);  //printf("myRank = %d, this->solveOnlyRefOfEnabledFields(false) OK\n", myRank);
839
840      // Search and rebuild all reference object of enabled fields
841      this->solveAllRefOfEnabledFields(false);  //printf("myRank = %d, this->solveAllRefOfEnabledFields(false) OK\n", myRank);
842
843      // Find all fields with read access from the public API
844      findFieldsWithReadAccess();  //printf("myRank = %d, findFieldsWithReadAccess OK\n", myRank);
845      // and solve the all reference for them
846      solveAllRefOfFieldsWithReadAccess();  //printf("myRank = %d, solveAllRefOfFieldsWithReadAccess OK\n", myRank);
847
848      isPostProcessed = true;
849   }
850
851   /*!
852    * Compute the required buffer size to send the attributes (mostly those grid related).
853    *
854    * \param maxEventSize [in/out] the size of the bigger event for each connected server
855    */
856   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
857   {
858     std::map<int, StdSize> attributesSize;
859
860     if (hasClient)
861     {
862       size_t numEnabledFiles = this->enabledFiles.size();
863       for (size_t i = 0; i < numEnabledFiles; ++i)
864       {
865         CFile* file = this->enabledFiles[i];
866
867         std::vector<CField*> enabledFields = file->getEnabledFields();
868         size_t numEnabledFields = enabledFields.size();
869         for (size_t j = 0; j < numEnabledFields; ++j)
870         {
871           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
872           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
873           for (; it != itE; ++it)
874           {
875             // If attributesSize[it->first] does not exist, it will be zero-initialized
876             // so we can use it safely without checking for its existance
877             if (attributesSize[it->first] < it->second)
878               attributesSize[it->first] = it->second;
879
880             if (maxEventSize[it->first] < it->second)
881               maxEventSize[it->first] = it->second;
882           }
883         }
884       }
885     }
886
887     return attributesSize;
888   }
889
890   /*!
891    * Compute the required buffer size to send the fields data.
892    *
893    * \param maxEventSize [in/out] the size of the bigger event for each connected server
894    */
895   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
896   {
897     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
898
899     std::map<int, StdSize> dataSize;
900
901     // Find all reference domain and axis of all active fields
902     size_t numEnabledFiles = this->enabledFiles.size();
903     for (size_t i = 0; i < numEnabledFiles; ++i)
904     {
905       CFile* file = this->enabledFiles[i];
906       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
907
908       if (fileMode == mode)
909       {
910         std::vector<CField*> enabledFields = file->getEnabledFields();
911         size_t numEnabledFields = enabledFields.size();
912         for (size_t j = 0; j < numEnabledFields; ++j)
913         {
914           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
915           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
916           for (; it != itE; ++it)
917           {
918             // If dataSize[it->first] does not exist, it will be zero-initialized
919             // so we can use it safely without checking for its existance
920             if (CXios::isOptPerformance)
921               dataSize[it->first] += it->second;
922             else if (dataSize[it->first] < it->second)
923               dataSize[it->first] = it->second;
924
925             if (maxEventSize[it->first] < it->second)
926               maxEventSize[it->first] = it->second;
927           }
928         }
929       }
930     }
931
932     return dataSize;
933   }
934
935   //! Client side: Send infomation of active files (files are enabled to write out)
936   void CContext::sendEnabledFiles()
937   {
938     int size = this->enabledFiles.size();
939
940     // In a context, each type has a root definition, e.g: axis, domain, field.
941     // Every object must be a child of one of these root definition. In this case
942     // all new file objects created on server must be children of the root "file_definition"
943     StdString fileDefRoot("file_definition");
944     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
945
946     for (int i = 0; i < size; ++i)
947     {
948       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
949       this->enabledFiles[i]->sendAllAttributesToServer();
950       this->enabledFiles[i]->sendAddAllVariables();
951     }
952   }
953
954   //! Client side: Send information of active fields (ones are written onto files)
955   void CContext::sendEnabledFields()
956   {
957     int size = this->enabledFiles.size();
958     for (int i = 0; i < size; ++i)
959     {
960       this->enabledFiles[i]->sendEnabledFields();
961     }
962   }
963
964   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
965   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
966   {
967     if (!hasClient) return;
968
969     const vector<CAxis*> allAxis = CAxis::getAll();
970     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
971       (*it)->checkEligibilityForCompressedOutput();
972
973     const vector<CDomain*> allDomains = CDomain::getAll();
974     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
975       (*it)->checkEligibilityForCompressedOutput();
976
977     const vector<CGrid*> allGrids = CGrid::getAll();
978     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
979       (*it)->checkEligibilityForCompressedOutput();
980   }
981
982   //! Client side: Prepare the timeseries by adding the necessary files
983   void CContext::prepareTimeseries()
984   {
985     if (!hasClient) return;
986
987     const std::vector<CFile*> allFiles = CFile::getAll();
988     for (size_t i = 0; i < allFiles.size(); i++)
989     {
990       CFile* file = allFiles[i];
991
992       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
993       {
994         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : file->getFileOutputName();
995
996         const std::vector<CField*> allFields = file->getAllFields();
997         for (size_t j = 0; j < allFields.size(); j++)
998         {
999           CField* field = allFields[j];
1000
1001           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1002           {
1003             CFile* tsFile = CFile::create();
1004             tsFile->duplicateAttributes(file);
1005             tsFile->setVirtualVariableGroup(file->getVirtualVariableGroup());
1006
1007             tsFile->name = tsPrefix + "_";
1008             if (!field->name.isEmpty())
1009               tsFile->name.get() += field->name;
1010             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1011               tsFile->name.get() += field->field_ref;
1012             else
1013               tsFile->name.get() += field->getId();
1014
1015             if (!field->ts_split_freq.isEmpty())
1016               tsFile->split_freq = field->ts_split_freq;
1017
1018             CField* tsField = tsFile->addField();
1019             tsField->field_ref = field->getId();
1020             tsField->setVirtualVariableGroup(field->getVirtualVariableGroup());
1021
1022             tsFile->solveFieldRefInheritance(true);
1023
1024             if (file->timeseries == CFile::timeseries_attr::exclusive)
1025               field->enabled = false;
1026           }
1027         }
1028
1029         // Finally disable the original file is need be
1030         if (file->timeseries == CFile::timeseries_attr::only)
1031          file->enabled = false;
1032       }
1033     }
1034   }
1035
1036   //! Client side: Send information of reference grid of active fields
1037   void CContext::sendRefGrid()
1038   {
1039     std::set<StdString> gridIds;
1040     int sizeFile = this->enabledFiles.size();
1041     CFile* filePtr(NULL);
1042
1043     // Firstly, find all reference grids of all active fields
1044     for (int i = 0; i < sizeFile; ++i)
1045     {
1046       filePtr = this->enabledFiles[i];
1047       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1048       int sizeField = enabledFields.size();
1049       for (int numField = 0; numField < sizeField; ++numField)
1050       {
1051         if (0 != enabledFields[numField]->getRelGrid())
1052           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1053       }
1054     }
1055
1056     // Create all reference grids on server side
1057     StdString gridDefRoot("grid_definition");
1058     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1059     std::set<StdString>::const_iterator it, itE = gridIds.end();
1060     for (it = gridIds.begin(); it != itE; ++it)
1061     {
1062       gridPtr->sendCreateChild(*it);
1063       CGrid::get(*it)->sendAllAttributesToServer();
1064       CGrid::get(*it)->sendAllDomains();
1065       CGrid::get(*it)->sendAllAxis();
1066       CGrid::get(*it)->sendAllScalars();
1067     }
1068   }
1069
1070
1071   //! Client side: Send information of reference domain and axis of active fields
1072   void CContext::sendRefDomainsAxis()
1073   {
1074     std::set<StdString> domainIds, axisIds, scalarIds;
1075
1076     // Find all reference domain and axis of all active fields
1077     int numEnabledFiles = this->enabledFiles.size();
1078     for (int i = 0; i < numEnabledFiles; ++i)
1079     {
1080       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1081       int numEnabledFields = enabledFields.size();
1082       for (int j = 0; j < numEnabledFields; ++j)
1083       {
1084         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1085         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1086         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1087         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1088       }
1089     }
1090
1091     // Create all reference axis on server side
1092     std::set<StdString>::iterator itDom, itAxis, itScalar;
1093     std::set<StdString>::const_iterator itE;
1094
1095     StdString scalarDefRoot("scalar_definition");
1096     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1097     itE = scalarIds.end();
1098     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1099     {
1100       if (!itScalar->empty())
1101       {
1102         scalarPtr->sendCreateChild(*itScalar);
1103         CScalar::get(*itScalar)->sendAllAttributesToServer();
1104       }
1105     }
1106
1107     StdString axiDefRoot("axis_definition");
1108     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1109     itE = axisIds.end();
1110     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1111     {
1112       if (!itAxis->empty())
1113       {
1114         axisPtr->sendCreateChild(*itAxis);
1115         CAxis::get(*itAxis)->sendAllAttributesToServer();
1116       }
1117     }
1118
1119     // Create all reference domains on server side
1120     StdString domDefRoot("domain_definition");
1121     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1122     itE = domainIds.end();
1123     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1124     {
1125       if (!itDom->empty()) {
1126          domPtr->sendCreateChild(*itDom);
1127          CDomain::get(*itDom)->sendAllAttributesToServer();
1128       }
1129     }
1130   }
1131
1132   //! Update calendar in each time step
1133   void CContext::updateCalendar(int step)
1134   {
1135      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1136      calendar->update(step);
1137      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1138
1139      if (hasClient)
1140      {
1141        checkPrefetchingOfEnabledReadModeFiles();
1142        garbageCollector.invalidate(calendar->getCurrentDate());
1143      }
1144   }
1145
1146   //! Server side: Create header of netcdf file
1147   void CContext::createFileHeader(void )
1148   {
1149      vector<CFile*>::const_iterator it;
1150
1151      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1152      {
1153         (*it)->initFile();
1154      }
1155   }
1156
1157   //! Get current context
1158   CContext* CContext::getCurrent(void)
1159   {
1160     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1161   }
1162
1163   /*!
1164   \brief Set context with an id be the current context
1165   \param [in] id identity of context to be set to current
1166   */
1167   void CContext::setCurrent(const string& id)
1168   {
1169     CObjectFactory::SetCurrentContextId(id);
1170     CGroupFactory::SetCurrentContextId(id);
1171   }
1172
1173  /*!
1174  \brief Create a context with specific id
1175  \param [in] id identity of new context
1176  \return pointer to the new context or already-existed one with identity id
1177  */
1178  CContext* CContext::create(const StdString& id)
1179  {
1180    CContext::setCurrent(id);
1181
1182    bool hasctxt = CContext::has(id);
1183    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1184    getRoot();
1185    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1186
1187#define DECLARE_NODE(Name_, name_) \
1188    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1189#define DECLARE_NODE_PAR(Name_, name_)
1190#include "node_type.conf"
1191
1192    return (context);
1193  }
1194
1195
1196
1197     //! Server side: Receive a message to do some post processing
1198  void CContext::recvRegistry(CEventServer& event)
1199  {
1200    CBufferIn* buffer=event.subEvents.begin()->buffer;
1201    string id;
1202    *buffer>>id;
1203    get(id)->recvRegistry(*buffer);
1204  }
1205
1206  void CContext::recvRegistry(CBufferIn& buffer)
1207  {
1208    if (server->intraCommRank==0)
1209    {
1210      CRegistry registry(server->intraComm) ;
1211      registry.fromBuffer(buffer) ;
1212      registryOut->mergeRegistry(registry) ;
1213    }
1214  }
1215
1216  void CContext::sendRegistry(void)
1217  {
1218    registryOut->hierarchicalGatherRegistry() ;
1219
1220    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1221    if (client->isServerLeader())
1222    {
1223       CMessage msg ;
1224       msg<<this->getIdServer();
1225       if (client->clientRank==0) msg<<*registryOut ;
1226       const std::list<int>& ranks = client->getRanksServerLeader();
1227       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1228         event.push(*itRank,1,msg);
1229       client->sendEvent(event);
1230    }
1231    else client->sendEvent(event);
1232  }
1233
1234} // namespace xios
Note: See TracBrowser for help on using the repository browser.