source: XIOS/dev/branch_yushan_merged/src/node/context.cpp @ 1160

Last change on this file since 1160 was 1160, checked in by yushan, 4 years ago

fix bad commit. Back to R1155 branch not merged

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 43.5 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17namespace xios {
18
19  //shared_ptr<CContextGroup> CContext::root;
20  shared_ptr<CContextGroup> * CContext::root_ptr = 0;
21
22   /// ////////////////////// Dfinitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      //if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      //return root.get();
58
59      //static shared_ptr<CContextGroup> *root_ptr;
60      if(root_ptr == 0) //root_ptr = new shared_ptr<CContextGroup>;
61      // if (root_ptr->get()==NULL)
62      root_ptr = new shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
63      return root_ptr->get();
64   }
65
66   //----------------------------------------------------------------
67
68   /*!
69   \brief Get calendar of a context
70   \return Calendar
71   */
72   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
73   {
74      return (this->calendar);
75   }
76
77   //----------------------------------------------------------------
78
79   /*!
80   \brief Set a context with a calendar
81   \param[in] newCalendar new calendar
82   */
83   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
84   {
85      this->calendar = newCalendar;
86   }
87
88   //----------------------------------------------------------------
89   /*!
90   \brief Parse xml file and write information into context object
91   \param [in] node xmld node corresponding in xml file
92   */
93   void CContext::parse(xml::CXMLNode & node)
94   {
95      CContext::SuperClass::parse(node);
96
97      // PARSING POUR GESTION DES ENFANTS
98      xml::THashAttributes attributes = node.getAttributes();
99
100      if (attributes.end() != attributes.find("src"))
101      {
102         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
103         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
104            ERROR("void CContext::parse(xml::CXMLNode & node)",
105                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
106         if (!ifs.good())
107            ERROR("CContext::parse(xml::CXMLNode & node)",
108                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
109         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
110      }
111
112      if (node.getElementName().compare(CContext::GetName()))
113         DEBUG("Le noeud is wrong defined but will be considered as a context !");
114
115      if (!(node.goToChildElement()))
116      {
117         DEBUG("Le context ne contient pas d'enfant !");
118      }
119      else
120      {
121         do { // Parcours des contextes pour traitement.
122
123            StdString name = node.getElementName();
124            attributes.clear();
125            attributes = node.getAttributes();
126
127            if (attributes.end() != attributes.find("id"))
128            {
129              DEBUG(<< "Definition node has an id,"
130                    << "it will not be taking account !");
131            }
132
133#define DECLARE_NODE(Name_, name_)    \
134   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
135   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
136#define DECLARE_NODE_PAR(Name_, name_)
137#include "node_type.conf"
138
139            DEBUG(<< "The element \'"     << name
140                  << "\' in the context \'" << CContext::getCurrent()->getId()
141                  << "\' is not a definition !");
142
143         } while (node.goToNextElement());
144
145         node.goToParentElement(); // Retour au parent
146      }
147   }
148
149   //----------------------------------------------------------------
150   //! Show tree structure of context
151   void CContext::ShowTree(StdOStream & out)
152   {
153      StdString currentContextId = CContext::getCurrent() -> getId();
154      std::vector<CContext*> def_vector =
155         CContext::getRoot()->getChildList();
156      std::vector<CContext*>::iterator
157         it = def_vector.begin(), end = def_vector.end();
158
159      out << "<? xml version=\"1.0\" ?>" << std::endl;
160      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
161
162      for (; it != end; it++)
163      {
164         CContext* context = *it;
165         CContext::setCurrent(context->getId());
166         out << *context << std::endl;
167      }
168
169      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
170      CContext::setCurrent(currentContextId);
171   }
172
173
174   //----------------------------------------------------------------
175
176   //! Convert context object into string (to print)
177   StdString CContext::toString(void) const
178   {
179      StdOStringStream oss;
180      oss << "<" << CContext::GetName()
181          << " id=\"" << this->getId() << "\" "
182          << SuperClassAttribute::toString() << ">" << std::endl;
183      if (!this->hasChild())
184      {
185         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrmentation
186      }
187      else
188      {
189
190#define DECLARE_NODE(Name_, name_)    \
191   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
192   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
193#define DECLARE_NODE_PAR(Name_, name_)
194#include "node_type.conf"
195
196      }
197
198      oss << "</" << CContext::GetName() << " >";
199
200      return (oss.str());
201   }
202
203   //----------------------------------------------------------------
204
205   /*!
206   \brief Find all inheritace among objects in a context.
207   \param [in] apply (true) write attributes of parent into ones of child if they are empty
208                     (false) write attributes of parent into a new container of child
209   \param [in] parent unused
210   */
211   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
212   {
213#define DECLARE_NODE(Name_, name_)    \
214   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
215     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
216#define DECLARE_NODE_PAR(Name_, name_)
217#include "node_type.conf"
218   }
219
220   //----------------------------------------------------------------
221
222   //! Verify if all root definition in the context have child.
223   bool CContext::hasChild(void) const
224   {
225      return (
226#define DECLARE_NODE(Name_, name_)    \
227   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
228#define DECLARE_NODE_PAR(Name_, name_)
229#include "node_type.conf"
230      false);
231}
232
233   //----------------------------------------------------------------
234
235   void CContext::CleanTree(void)
236   {
237#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
238#define DECLARE_NODE_PAR(Name_, name_)
239#include "node_type.conf"
240   }
241   ///---------------------------------------------------------------
242
243   //! Initialize client side
244   void CContext::initClient(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtServer /*= 0*/)
245   {
246     hasClient=true;
247     client = new CContextClient(this, intraComm, interComm, cxtServer);
248
249     int tmp_rank;
250     MPI_Comm_rank(intraComm, &tmp_rank);
251     MPI_Barrier(intraComm);
252     
253
254     registryIn=new CRegistry(intraComm);
255     registryIn->setPath(getId()) ;
256     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
257     registryIn->bcastRegistry() ;
258
259     registryOut=new CRegistry(intraComm) ;
260     registryOut->setPath(getId()) ;
261
262     ep_lib::MPI_Comm intraCommServer, interCommServer;
263     if (cxtServer) // Attached mode
264     {
265       intraCommServer = intraComm;
266       interCommServer = interComm;
267     }
268     else
269     {
270       MPI_Comm_dup(intraComm, &intraCommServer);
271       comms.push_back(intraCommServer);
272       MPI_Comm_dup(interComm, &interCommServer);
273       comms.push_back(interCommServer);
274     }
275     server = new CContextServer(this,intraCommServer,interCommServer);
276   }
277
278   void CContext::setClientServerBuffer()
279   {
280     size_t minBufferSize = CXios::minBufferSize;
281#define DECLARE_NODE(Name_, name_)    \
282     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
283#define DECLARE_NODE_PAR(Name_, name_)
284#include "node_type.conf"
285#undef DECLARE_NODE
286#undef DECLARE_NODE_PAR
287
288     std::map<int, StdSize> maxEventSize;
289     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
290     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
291
292     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
293     for (it = dataBufferSize.begin(); it != ite; ++it)
294       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
295
296     ite = bufferSize.end();
297     for (it = bufferSize.begin(); it != ite; ++it)
298     {
299       it->second *= CXios::bufferSizeFactor;
300       if (it->second < minBufferSize) it->second = minBufferSize;
301     }
302
303     // We consider that the minimum buffer size is also the minimum event size
304     ite = maxEventSize.end();
305     for (it = maxEventSize.begin(); it != ite; ++it)
306       if (it->second < minBufferSize) it->second = minBufferSize;
307
308     if (client->isServerLeader())
309     {
310       const std::list<int>& ranks = client->getRanksServerLeader();
311       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
312         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
313     }
314
315     client->setBufferSize(bufferSize, maxEventSize);
316   }
317
318   //! Verify whether a context is initialized
319   bool CContext::isInitialized(void)
320   {
321     return hasClient;
322   }
323
324   //! Initialize server
325   void CContext::initServer(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtClient /*= 0*/)
326   {
327     hasServer=true;
328     server = new CContextServer(this,intraComm,interComm);
329
330     registryIn=new CRegistry(intraComm);
331     registryIn->setPath(getId()) ;
332     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
333     registryIn->bcastRegistry() ;
334     registryOut=new CRegistry(intraComm) ;
335     registryOut->setPath(getId()) ;
336
337     ep_lib::MPI_Comm intraCommClient, interCommClient;
338     if (cxtClient) // Attached mode
339     {
340       intraCommClient = intraComm;
341       interCommClient = interComm;
342     }
343     else
344     {
345       MPI_Comm_dup(intraComm, &intraCommClient);
346       comms.push_back(intraCommClient);
347       MPI_Comm_dup(interComm, &interCommClient);
348       comms.push_back(interCommClient);
349     }
350     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
351   }
352
353   //! Try to send the buffers and receive possible answers
354   bool CContext::checkBuffersAndListen(void)
355   {
356     client->checkBuffers();
357
358     bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent();
359     if (hasTmpBufferedEvent)
360       hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent();
361
362     // Don't process events if there is a temporarily buffered event
363     return server->eventLoop(!hasTmpBufferedEvent);
364   }
365
366   //! Terminate a context
367   void CContext::finalize(void)
368   {
369      if (!finalized)
370      {
371        finalized = true;
372        if (hasClient) sendRegistry() ;
373        client->finalize();
374        while (!server->hasFinished())
375        {
376          server->eventLoop();
377        }
378
379        if (hasServer)
380        {
381          closeAllFile();
382          registryOut->hierarchicalGatherRegistry() ;
383          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
384        }
385
386        for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
387          MPI_Comm_free(&(*it));
388        comms.clear();
389      }
390   }
391
392   /*!
393   \brief Close all the context defintion and do processing data
394      After everything is well defined on client side, they will be processed and sent to server
395   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
396   all necessary information to server, from which each server can build its own database.
397   Because the role of server is to write out field data on a specific netcdf file,
398   the only information that it needs is the enabled files
399   and the active fields (fields will be written onto active files)
400   */
401   void CContext::closeDefinition(void)
402   {
403     // There is nothing client need to send to server
404     if (hasClient)
405     {
406       // After xml is parsed, there are some more works with post processing
407       postProcessing();
408     }
409     setClientServerBuffer();
410
411     if (hasClient && !hasServer)
412     {
413      // Send all attributes of current context to server
414      this->sendAllAttributesToServer();
415
416      // Send all attributes of current calendar
417      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
418
419      // We have enough information to send to server
420      // First of all, send all enabled files
421       sendEnabledFiles();
422
423      // Then, send all enabled fields
424       sendEnabledFields();
425
426      // At last, we have all info of domain and axis, then send them
427       sendRefDomainsAxis();
428
429      // After that, send all grid (if any)
430       sendRefGrid();
431    }
432
433    // We have a xml tree on the server side and now, it should be also processed
434    if (hasClient && !hasServer) sendPostProcessing();
435
436    // There are some processings that should be done after all of above. For example: check mask or index
437    if (hasClient)
438    {
439      this->buildFilterGraphOfEnabledFields();
440      buildFilterGraphOfFieldsWithReadAccess();
441      this->solveAllRefOfEnabledFields(true);
442    }
443
444    // Now tell server that it can process all messages from client
445    if (hasClient && !hasServer) this->sendCloseDefinition();
446
447    // Nettoyage de l'arborescence
448    if (hasClient && !hasServer) CleanTree(); // Only on client side??
449
450    if (hasClient)
451    {
452      sendCreateFileHeader();
453
454      startPrefetchingOfEnabledReadModeFiles();
455    }
456   }
457
458   void CContext::findAllEnabledFields(void)
459   {
460     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
461     (void)this->enabledFiles[i]->getEnabledFields();
462   }
463
464   void CContext::findAllEnabledFieldsInReadModeFiles(void)
465   {
466     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
467     (void)this->enabledReadModeFiles[i]->getEnabledFields();
468   }
469
470   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
471   {
472      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
473        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
474   }
475
476   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
477   {
478     int size = this->enabledFiles.size();
479     for (int i = 0; i < size; ++i)
480     {
481       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
482     }
483
484     for (int i = 0; i < size; ++i)
485     {
486       this->enabledFiles[i]->generateNewTransformationGridDest();
487     }
488   }
489
490   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
491   {
492     int size = this->enabledFiles.size();
493     for (int i = 0; i < size; ++i)
494     {
495       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
496     }
497   }
498
499   void CContext::buildFilterGraphOfEnabledFields()
500   {
501     int size = this->enabledFiles.size();
502     for (int i = 0; i < size; ++i)
503     {
504       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
505     }
506   }
507
508   void CContext::startPrefetchingOfEnabledReadModeFiles()
509   {
510     int size = enabledReadModeFiles.size();
511     for (int i = 0; i < size; ++i)
512     {
513        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
514     }
515   }
516
517   void CContext::checkPrefetchingOfEnabledReadModeFiles()
518   {
519     int size = enabledReadModeFiles.size();
520     for (int i = 0; i < size; ++i)
521     {
522        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
523     }
524   }
525
526  void CContext::findFieldsWithReadAccess(void)
527  {
528    fieldsWithReadAccess.clear();
529    const vector<CField*> allFields = CField::getAll();
530    for (size_t i = 0; i < allFields.size(); ++i)
531    {
532      CField* field = allFields[i];
533
534      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
535        field->read_access = true;
536      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
537        fieldsWithReadAccess.push_back(field);
538    }
539  }
540
541  void CContext::solveAllRefOfFieldsWithReadAccess()
542  {
543    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
544      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
545  }
546
547  void CContext::buildFilterGraphOfFieldsWithReadAccess()
548  {
549    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
550      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
551  }
552
553   void CContext::solveAllInheritance(bool apply)
554   {
555     // Rsolution des hritages descendants (cd des hritages de groupes)
556     // pour chacun des contextes.
557      solveDescInheritance(apply);
558
559     // Rsolution des hritages par rfrence au niveau des fichiers.
560      const vector<CFile*> allFiles=CFile::getAll();
561      const vector<CGrid*> allGrids= CGrid::getAll();
562
563     //if (hasClient && !hasServer)
564      if (hasClient)
565      {
566        for (unsigned int i = 0; i < allFiles.size(); i++)
567          allFiles[i]->solveFieldRefInheritance(apply);
568      }
569
570      unsigned int vecSize = allGrids.size();
571      unsigned int i = 0;
572      for (i = 0; i < vecSize; ++i)
573        allGrids[i]->solveDomainAxisRefInheritance(apply);
574
575   }
576
577   void CContext::findEnabledFiles(void)
578   {
579      const std::vector<CFile*> allFiles = CFile::getAll();
580      const CDate& initDate = calendar->getInitDate();
581
582      for (unsigned int i = 0; i < allFiles.size(); i++)
583         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est dfini.
584         {
585            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fix  vrai.
586            {
587              if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
588              {
589                error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
590                    << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
591                    <<"\" is less than the time step. File will not be written."<<endl;
592              }
593              else
594               enabledFiles.push_back(allFiles[i]);
595            }
596         }
597         else
598         {
599           if ( (initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
600           {
601             error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
602                 << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
603                 <<"\" is less than the time step. File will not be written."<<endl;
604           }
605           else
606             enabledFiles.push_back(allFiles[i]); // otherwise true by default
607         }
608
609      if (enabledFiles.size() == 0)
610         DEBUG(<<"Aucun fichier ne va tre sorti dans le contexte nomm \""
611               << getId() << "\" !");
612   }
613
614   void CContext::findEnabledReadModeFiles(void)
615   {
616     int size = this->enabledFiles.size();
617     for (int i = 0; i < size; ++i)
618     {
619       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
620        enabledReadModeFiles.push_back(enabledFiles[i]);
621     }
622   }
623
624   void CContext::closeAllFile(void)
625   {
626     std::vector<CFile*>::const_iterator
627            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
628
629     for (; it != end; it++)
630     {
631       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
632       (*it)->close();
633     }
634   }
635
636   /*!
637   \brief Dispatch event received from client
638      Whenever a message is received in buffer of server, it will be processed depending on
639   its event type. A new event type should be added in the switch list to make sure
640   it processed on server side.
641   \param [in] event: Received message
642   */
643   bool CContext::dispatchEvent(CEventServer& event)
644   {
645
646      if (SuperClass::dispatchEvent(event)) return true;
647      else
648      {
649        switch(event.type)
650        {
651           case EVENT_ID_CLOSE_DEFINITION :
652             recvCloseDefinition(event);
653             return true;
654             break;
655           case EVENT_ID_UPDATE_CALENDAR:
656             recvUpdateCalendar(event);
657             return true;
658             break;
659           case EVENT_ID_CREATE_FILE_HEADER :
660             recvCreateFileHeader(event);
661             return true;
662             break;
663           case EVENT_ID_POST_PROCESS:
664             recvPostProcessing(event);
665             return true;
666            case EVENT_ID_SEND_REGISTRY:
667             recvRegistry(event);
668             return true;
669            break;
670
671           default :
672             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
673                    <<"Unknown Event");
674           return false;
675         }
676      }
677   }
678
679   //! Client side: Send a message to server to make it close
680   void CContext::sendCloseDefinition(void)
681   {
682     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
683     if (client->isServerLeader())
684     {
685       CMessage msg;
686       msg<<this->getIdServer();
687       const std::list<int>& ranks = client->getRanksServerLeader();
688       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
689         event.push(*itRank,1,msg);
690       client->sendEvent(event);
691     }
692     else client->sendEvent(event);
693   }
694
695   //! Server side: Receive a message of client announcing a context close
696   void CContext::recvCloseDefinition(CEventServer& event)
697   {
698
699      CBufferIn* buffer=event.subEvents.begin()->buffer;
700      string id;
701      *buffer>>id;
702      get(id)->closeDefinition();
703   }
704
705   //! Client side: Send a message to update calendar in each time step
706   void CContext::sendUpdateCalendar(int step)
707   {
708     if (!hasServer)
709     {
710       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
711       if (client->isServerLeader())
712       {
713         CMessage msg;
714         msg<<this->getIdServer()<<step;
715         const std::list<int>& ranks = client->getRanksServerLeader();
716         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
717           event.push(*itRank,1,msg);
718         client->sendEvent(event);
719       }
720       else client->sendEvent(event);
721     }
722   }
723
724   //! Server side: Receive a message of client annoucing calendar update
725   void CContext::recvUpdateCalendar(CEventServer& event)
726   {
727      CBufferIn* buffer=event.subEvents.begin()->buffer;
728      string id;
729      *buffer>>id;
730      get(id)->recvUpdateCalendar(*buffer);
731   }
732
733   //! Server side: Receive a message of client annoucing calendar update
734   void CContext::recvUpdateCalendar(CBufferIn& buffer)
735   {
736      int step;
737      buffer>>step;
738      updateCalendar(step);
739   }
740
741   //! Client side: Send a message to create header part of netcdf file
742   void CContext::sendCreateFileHeader(void)
743   {
744     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
745     if (client->isServerLeader())
746     {
747       CMessage msg;
748       msg<<this->getIdServer();
749       const std::list<int>& ranks = client->getRanksServerLeader();
750       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
751         event.push(*itRank,1,msg) ;
752       client->sendEvent(event);
753     }
754     else client->sendEvent(event);
755   }
756
757   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
758   void CContext::recvCreateFileHeader(CEventServer& event)
759   {
760      CBufferIn* buffer=event.subEvents.begin()->buffer;
761      string id;
762      *buffer>>id;
763      get(id)->recvCreateFileHeader(*buffer);
764   }
765
766   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
767   void CContext::recvCreateFileHeader(CBufferIn& buffer)
768   {
769      createFileHeader();
770   }
771
772   //! Client side: Send a message to do some post processing on server
773   void CContext::sendPostProcessing()
774   {
775     if (!hasServer)
776     {
777       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
778       if (client->isServerLeader())
779       {
780         CMessage msg;
781         msg<<this->getIdServer();
782         const std::list<int>& ranks = client->getRanksServerLeader();
783         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
784           event.push(*itRank,1,msg);
785         client->sendEvent(event);
786       }
787       else client->sendEvent(event);
788     }
789   }
790
791   //! Server side: Receive a message to do some post processing
792   void CContext::recvPostProcessing(CEventServer& event)
793   {
794      CBufferIn* buffer=event.subEvents.begin()->buffer;
795      string id;
796      *buffer>>id;
797      get(id)->recvPostProcessing(*buffer);
798   }
799
800   //! Server side: Receive a message to do some post processing
801   void CContext::recvPostProcessing(CBufferIn& buffer)
802   {
803      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
804      postProcessing();
805   }
806
807   const StdString& CContext::getIdServer()
808   {
809      if (hasClient)
810      {
811        idServer_ = this->getId();
812        idServer_ += "_server";
813        return idServer_;
814      }
815      if (hasServer) return (this->getId());
816   }
817
818   /*!
819   \brief Do some simple post processings after parsing xml file
820      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
821   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
822   which will be written out into netcdf files, are processed
823   */
824   void CContext::postProcessing()
825   {
826     int myRank;
827     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
828
829     if (isPostProcessed) return;
830
831      // Make sure the calendar was correctly created
832      if (!calendar)
833        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
834      else if (calendar->getTimeStep() == NoneDu)
835        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
836      // Calendar first update to set the current date equals to the start date
837      calendar->update(0);
838
839      // Find all inheritance in xml structure
840      this->solveAllInheritance();
841
842      // Check if some axis, domains or grids are eligible to for compressed indexed output.
843      // Warning: This must be done after solving the inheritance and before the rest of post-processing
844      checkAxisDomainsGridsEligibilityForCompressedOutput();
845
846      // Check if some automatic time series should be generated
847      // Warning: This must be done after solving the inheritance and before the rest of post-processing
848      prepareTimeseries();
849
850      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers  sortir.
851      this->findEnabledFiles();
852      this->findEnabledReadModeFiles();
853
854      // Find all enabled fields of each file
855      this->findAllEnabledFields();
856      this->findAllEnabledFieldsInReadModeFiles();
857
858     if (hasClient && !hasServer)
859     {
860      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
861      this->readAttributesOfEnabledFieldsInReadModeFiles();
862     }
863
864      // Only search and rebuild all reference objects of enable fields, don't transform
865      this->solveOnlyRefOfEnabledFields(false);
866
867      // Search and rebuild all reference object of enabled fields
868      this->solveAllRefOfEnabledFields(false);
869
870      // Find all fields with read access from the public API
871      findFieldsWithReadAccess();
872      // and solve the all reference for them
873      solveAllRefOfFieldsWithReadAccess();
874
875      isPostProcessed = true;
876   }
877
878   /*!
879    * Compute the required buffer size to send the attributes (mostly those grid related).
880    *
881    * \param maxEventSize [in/out] the size of the bigger event for each connected server
882    */
883   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
884   {
885     std::map<int, StdSize> attributesSize;
886
887     if (hasClient)
888     {
889       size_t numEnabledFiles = this->enabledFiles.size();
890       for (size_t i = 0; i < numEnabledFiles; ++i)
891       {
892         CFile* file = this->enabledFiles[i];
893
894         std::vector<CField*> enabledFields = file->getEnabledFields();
895         size_t numEnabledFields = enabledFields.size();
896         for (size_t j = 0; j < numEnabledFields; ++j)
897         {
898           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
899           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
900           for (; it != itE; ++it)
901           {
902             // If attributesSize[it->first] does not exist, it will be zero-initialized
903             // so we can use it safely without checking for its existance
904             if (attributesSize[it->first] < it->second)
905               attributesSize[it->first] = it->second;
906
907             if (maxEventSize[it->first] < it->second)
908               maxEventSize[it->first] = it->second;
909           }
910         }
911       }
912     }
913
914     return attributesSize;
915   }
916
917   /*!
918    * Compute the required buffer size to send the fields data.
919    *
920    * \param maxEventSize [in/out] the size of the bigger event for each connected server
921    */
922   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
923   {
924     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
925
926     std::map<int, StdSize> dataSize;
927
928     // Find all reference domain and axis of all active fields
929     size_t numEnabledFiles = this->enabledFiles.size();
930     for (size_t i = 0; i < numEnabledFiles; ++i)
931     {
932       CFile* file = this->enabledFiles[i];
933       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
934
935       if (fileMode == mode)
936       {
937         std::vector<CField*> enabledFields = file->getEnabledFields();
938         size_t numEnabledFields = enabledFields.size();
939         for (size_t j = 0; j < numEnabledFields; ++j)
940         {
941           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
942           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
943           for (; it != itE; ++it)
944           {
945             // If dataSize[it->first] does not exist, it will be zero-initialized
946             // so we can use it safely without checking for its existance
947             if (CXios::isOptPerformance)
948               dataSize[it->first] += it->second;
949             else if (dataSize[it->first] < it->second)
950               dataSize[it->first] = it->second;
951
952             if (maxEventSize[it->first] < it->second)
953               maxEventSize[it->first] = it->second;
954           }
955         }
956       }
957     }
958
959     return dataSize;
960   }
961
962   //! Client side: Send infomation of active files (files are enabled to write out)
963   void CContext::sendEnabledFiles()
964   {
965     int size = this->enabledFiles.size();
966
967     // In a context, each type has a root definition, e.g: axis, domain, field.
968     // Every object must be a child of one of these root definition. In this case
969     // all new file objects created on server must be children of the root "file_definition"
970     StdString fileDefRoot("file_definition");
971     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
972
973     for (int i = 0; i < size; ++i)
974     {
975       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
976       this->enabledFiles[i]->sendAllAttributesToServer();
977       this->enabledFiles[i]->sendAddAllVariables();
978     }
979   }
980
981   //! Client side: Send information of active fields (ones are written onto files)
982   void CContext::sendEnabledFields()
983   {
984     int size = this->enabledFiles.size();
985     for (int i = 0; i < size; ++i)
986     {
987       this->enabledFiles[i]->sendEnabledFields();
988     }
989   }
990
991   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
992   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
993   {
994     if (!hasClient) return;
995
996     const vector<CAxis*> allAxis = CAxis::getAll();
997     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
998       (*it)->checkEligibilityForCompressedOutput();
999
1000     const vector<CDomain*> allDomains = CDomain::getAll();
1001     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
1002       (*it)->checkEligibilityForCompressedOutput();
1003
1004     const vector<CGrid*> allGrids = CGrid::getAll();
1005     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
1006       (*it)->checkEligibilityForCompressedOutput();
1007   }
1008
1009   //! Client side: Prepare the timeseries by adding the necessary files
1010   void CContext::prepareTimeseries()
1011   {
1012     if (!hasClient) return;
1013
1014     const std::vector<CFile*> allFiles = CFile::getAll();
1015     for (size_t i = 0; i < allFiles.size(); i++)
1016     {
1017       CFile* file = allFiles[i];
1018
1019       std::vector<CVariable*> fileVars, fieldVars, vars = file->getAllVariables();
1020       for (size_t k = 0; k < vars.size(); k++)
1021       {
1022         CVariable* var = vars[k];
1023
1024         if (var->ts_target.isEmpty()
1025              || var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both)
1026           fileVars.push_back(var);
1027
1028         if (!var->ts_target.isEmpty()
1029              && (var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both))
1030           fieldVars.push_back(var);
1031       }
1032
1033       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1034       {
1035         StdString fileNameStr("%file_name%") ;
1036         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : fileNameStr ;
1037         
1038         StdString fileName=file->getFileOutputName();
1039         size_t pos=tsPrefix.find(fileNameStr) ;
1040         while (pos!=std::string::npos)
1041         {
1042           tsPrefix=tsPrefix.replace(pos,fileNameStr.size(),fileName) ;
1043           pos=tsPrefix.find(fileNameStr) ;
1044         }
1045       
1046         const std::vector<CField*> allFields = file->getAllFields();
1047         for (size_t j = 0; j < allFields.size(); j++)
1048         {
1049           CField* field = allFields[j];
1050
1051           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1052           {
1053             CFile* tsFile = CFile::create();
1054             tsFile->duplicateAttributes(file);
1055
1056             // Add variables originating from file and targeted to timeserie file
1057             for (size_t k = 0; k < fileVars.size(); k++)
1058               tsFile->getVirtualVariableGroup()->addChild(fileVars[k]);
1059
1060           
1061             tsFile->name = tsPrefix + "_";
1062             if (!field->name.isEmpty())
1063               tsFile->name.get() += field->name;
1064             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1065               tsFile->name.get() += field->field_ref;
1066             else
1067               tsFile->name.get() += field->getId();
1068
1069             if (!field->ts_split_freq.isEmpty())
1070               tsFile->split_freq = field->ts_split_freq;
1071
1072             CField* tsField = tsFile->addField();
1073             tsField->field_ref = field->getId();
1074
1075             // Add variables originating from file and targeted to timeserie field
1076             for (size_t k = 0; k < fieldVars.size(); k++)
1077               tsField->getVirtualVariableGroup()->addChild(fieldVars[k]);
1078
1079             vars = field->getAllVariables();
1080             for (size_t k = 0; k < vars.size(); k++)
1081             {
1082               CVariable* var = vars[k];
1083
1084               // Add variables originating from field and targeted to timeserie field
1085               if (var->ts_target.isEmpty()
1086                    || var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both)
1087                 tsField->getVirtualVariableGroup()->addChild(var);
1088
1089               // Add variables originating from field and targeted to timeserie file
1090               if (!var->ts_target.isEmpty()
1091                    && (var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both))
1092                 tsFile->getVirtualVariableGroup()->addChild(var);
1093             }
1094
1095             tsFile->solveFieldRefInheritance(true);
1096
1097             if (file->timeseries == CFile::timeseries_attr::exclusive)
1098               field->enabled = false;
1099           }
1100         }
1101
1102         // Finally disable the original file is need be
1103         if (file->timeseries == CFile::timeseries_attr::only)
1104          file->enabled = false;
1105       }
1106     }
1107   }
1108
1109   //! Client side: Send information of reference grid of active fields
1110   void CContext::sendRefGrid()
1111   {
1112     std::set<StdString> gridIds;
1113     int sizeFile = this->enabledFiles.size();
1114     CFile* filePtr(NULL);
1115
1116     // Firstly, find all reference grids of all active fields
1117     for (int i = 0; i < sizeFile; ++i)
1118     {
1119       filePtr = this->enabledFiles[i];
1120       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1121       int sizeField = enabledFields.size();
1122       for (int numField = 0; numField < sizeField; ++numField)
1123       {
1124         if (0 != enabledFields[numField]->getRelGrid())
1125           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1126       }
1127     }
1128
1129     // Create all reference grids on server side
1130     StdString gridDefRoot("grid_definition");
1131     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1132     std::set<StdString>::const_iterator it, itE = gridIds.end();
1133     for (it = gridIds.begin(); it != itE; ++it)
1134     {
1135       gridPtr->sendCreateChild(*it);
1136       CGrid::get(*it)->sendAllAttributesToServer();
1137       CGrid::get(*it)->sendAllDomains();
1138       CGrid::get(*it)->sendAllAxis();
1139       CGrid::get(*it)->sendAllScalars();
1140     }
1141   }
1142
1143
1144   //! Client side: Send information of reference domain and axis of active fields
1145   void CContext::sendRefDomainsAxis()
1146   {
1147     std::set<StdString> domainIds, axisIds, scalarIds;
1148
1149     // Find all reference domain and axis of all active fields
1150     int numEnabledFiles = this->enabledFiles.size();
1151     for (int i = 0; i < numEnabledFiles; ++i)
1152     {
1153       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1154       int numEnabledFields = enabledFields.size();
1155       for (int j = 0; j < numEnabledFields; ++j)
1156       {
1157         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1158         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1159         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1160         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1161       }
1162     }
1163
1164     // Create all reference axis on server side
1165     std::set<StdString>::iterator itDom, itAxis, itScalar;
1166     std::set<StdString>::const_iterator itE;
1167
1168     StdString scalarDefRoot("scalar_definition");
1169     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1170     itE = scalarIds.end();
1171     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1172     {
1173       if (!itScalar->empty())
1174       {
1175         scalarPtr->sendCreateChild(*itScalar);
1176         CScalar::get(*itScalar)->sendAllAttributesToServer();
1177       }
1178     }
1179
1180     StdString axiDefRoot("axis_definition");
1181     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1182     itE = axisIds.end();
1183     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1184     {
1185       if (!itAxis->empty())
1186       {
1187         axisPtr->sendCreateChild(*itAxis);
1188         CAxis::get(*itAxis)->sendAllAttributesToServer();
1189       }
1190     }
1191
1192     // Create all reference domains on server side
1193     StdString domDefRoot("domain_definition");
1194     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1195     itE = domainIds.end();
1196     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1197     {
1198       if (!itDom->empty()) {
1199          domPtr->sendCreateChild(*itDom);
1200          CDomain::get(*itDom)->sendAllAttributesToServer();
1201       }
1202     }
1203   }
1204
1205   //! Update calendar in each time step
1206   void CContext::updateCalendar(int step)
1207   {
1208      calendar->update(step);
1209
1210
1211      if (hasClient)
1212      {
1213        checkPrefetchingOfEnabledReadModeFiles();
1214        garbageCollector.invalidate(calendar->getCurrentDate());
1215      }
1216   }
1217
1218   //! Server side: Create header of netcdf file
1219   void CContext::createFileHeader(void )
1220   {
1221      vector<CFile*>::const_iterator it;
1222
1223      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1224      {
1225         (*it)->initFile();
1226      }
1227   }
1228
1229   //! Get current context
1230   CContext* CContext::getCurrent(void)
1231   {
1232     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1233   }
1234
1235   /*!
1236   \brief Set context with an id be the current context
1237   \param [in] id identity of context to be set to current
1238   */
1239   void CContext::setCurrent(const string& id)
1240   {
1241     CObjectFactory::SetCurrentContextId(id);
1242     CGroupFactory::SetCurrentContextId(id);
1243   }
1244
1245  /*!
1246  \brief Create a context with specific id
1247  \param [in] id identity of new context
1248  \return pointer to the new context or already-existed one with identity id
1249  */
1250  CContext* CContext::create(const StdString& id)
1251  {
1252    CContext::setCurrent(id);
1253
1254    bool hasctxt = CContext::has(id);
1255    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1256    getRoot();
1257    //if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1258    if (!hasctxt) CGroupFactory::AddChild(*root_ptr, context->getShared());
1259
1260#define DECLARE_NODE(Name_, name_) \
1261    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1262#define DECLARE_NODE_PAR(Name_, name_)
1263#include "node_type.conf"
1264
1265    return (context);
1266  }
1267
1268
1269
1270     //! Server side: Receive a message to do some post processing
1271  void CContext::recvRegistry(CEventServer& event)
1272  {
1273    CBufferIn* buffer=event.subEvents.begin()->buffer;
1274    string id;
1275    *buffer>>id;
1276    get(id)->recvRegistry(*buffer);
1277  }
1278
1279  void CContext::recvRegistry(CBufferIn& buffer)
1280  {
1281    if (server->intraCommRank==0)
1282    {
1283      CRegistry registry(server->intraComm) ;
1284      registry.fromBuffer(buffer) ;
1285      registryOut->mergeRegistry(registry) ;
1286    }
1287  }
1288
1289  void CContext::sendRegistry(void)
1290  {
1291    registryOut->hierarchicalGatherRegistry() ;
1292
1293    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1294    if (client->isServerLeader())
1295    {
1296       CMessage msg ;
1297       msg<<this->getIdServer();
1298       if (client->clientRank==0) msg<<*registryOut ;
1299       const std::list<int>& ranks = client->getRanksServerLeader();
1300       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1301         event.push(*itRank,1,msg);
1302       client->sendEvent(event);
1303     }
1304     else client->sendEvent(event);
1305  }
1306
1307} // namespace xios
Note: See TracBrowser for help on using the repository browser.