source: XIOS/dev/branch_yushan/src/node/context.cpp @ 1128

Last change on this file since 1128 was 1128, checked in by yushan, 7 years ago

log OK with threads

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 40.6 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  //shared_ptr<CContextGroup> CContext::root;
21  shared_ptr<CContextGroup> * CContext::root_ptr = 0;
22
23   /// ////////////////////// Dfinitions ////////////////////// ///
24
25   CContext::CContext(void)
26      : CObjectTemplate<CContext>(), CContextAttributes()
27      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
28      , idServer_(), client(0), server(0)
29   { /* Ne rien faire de plus */ }
30
31   CContext::CContext(const StdString & id)
32      : CObjectTemplate<CContext>(id), CContextAttributes()
33      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
34      , idServer_(), client(0), server(0)
35   { /* Ne rien faire de plus */ }
36
37   CContext::~CContext(void)
38   {
39     delete client;
40     delete server;
41   }
42
43   //----------------------------------------------------------------
44   //! Get name of context
45   StdString CContext::GetName(void)   { return (StdString("context")); }
46   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
47   ENodeType CContext::GetType(void)   { return (eContext); }
48
49   //----------------------------------------------------------------
50
51   /*!
52   \brief Get context group (context root)
53   \return Context root
54   */
55   CContextGroup* CContext::getRoot(void)
56   {
57      //if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
58      //return root.get();
59
60      //static shared_ptr<CContextGroup> *root_ptr;
61      if(root_ptr == 0) //root_ptr = new shared_ptr<CContextGroup>;
62      // if (root_ptr->get()==NULL)
63      root_ptr = new shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
64      return root_ptr->get();
65   }
66
67   //----------------------------------------------------------------
68
69   /*!
70   \brief Get calendar of a context
71   \return Calendar
72   */
73   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
74   {
75      return (this->calendar);
76   }
77
78   //----------------------------------------------------------------
79
80   /*!
81   \brief Set a context with a calendar
82   \param[in] newCalendar new calendar
83   */
84   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
85   {
86      this->calendar = newCalendar;
87   }
88
89   //----------------------------------------------------------------
90   /*!
91   \brief Parse xml file and write information into context object
92   \param [in] node xmld node corresponding in xml file
93   */
94   void CContext::parse(xml::CXMLNode & node)
95   {
96      CContext::SuperClass::parse(node);
97
98      // PARSING POUR GESTION DES ENFANTS
99      xml::THashAttributes attributes = node.getAttributes();
100
101      if (attributes.end() != attributes.find("src"))
102      {
103         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
104         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
105            ERROR("void CContext::parse(xml::CXMLNode & node)",
106                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
107         if (!ifs.good())
108            ERROR("CContext::parse(xml::CXMLNode & node)",
109                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
110         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
111      }
112
113      if (node.getElementName().compare(CContext::GetName()))
114         DEBUG("Le noeud is wrong defined but will be considered as a context !");
115
116      if (!(node.goToChildElement()))
117      {
118         DEBUG("Le context ne contient pas d'enfant !");
119      }
120      else
121      {
122         do { // Parcours des contextes pour traitement.
123
124            StdString name = node.getElementName();
125            attributes.clear();
126            attributes = node.getAttributes();
127
128            if (attributes.end() != attributes.find("id"))
129            {
130              DEBUG(<< "Definition node has an id,"
131                    << "it will not be taking account !");
132            }
133
134#define DECLARE_NODE(Name_, name_)    \
135   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
136   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
137#define DECLARE_NODE_PAR(Name_, name_)
138#include "node_type.conf"
139
140            DEBUG(<< "The element \'"     << name
141                  << "\' in the context \'" << CContext::getCurrent()->getId()
142                  << "\' is not a definition !");
143
144         } while (node.goToNextElement());
145
146         node.goToParentElement(); // Retour au parent
147      }
148   }
149
150   //----------------------------------------------------------------
151   //! Show tree structure of context
152   void CContext::ShowTree(StdOStream & out)
153   {
154      StdString currentContextId = CContext::getCurrent() -> getId();
155      std::vector<CContext*> def_vector =
156         CContext::getRoot()->getChildList();
157      std::vector<CContext*>::iterator
158         it = def_vector.begin(), end = def_vector.end();
159
160      out << "<? xml version=\"1.0\" ?>" << std::endl;
161      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
162
163      for (; it != end; it++)
164      {
165         CContext* context = *it;
166         CContext::setCurrent(context->getId());
167         out << *context << std::endl;
168      }
169
170      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
171      CContext::setCurrent(currentContextId);
172   }
173
174
175   //----------------------------------------------------------------
176
177   //! Convert context object into string (to print)
178   StdString CContext::toString(void) const
179   {
180      StdOStringStream oss;
181      oss << "<" << CContext::GetName()
182          << " id=\"" << this->getId() << "\" "
183          << SuperClassAttribute::toString() << ">" << std::endl;
184      if (!this->hasChild())
185      {
186         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrmentation
187      }
188      else
189      {
190
191#define DECLARE_NODE(Name_, name_)    \
192   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
193   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
194#define DECLARE_NODE_PAR(Name_, name_)
195#include "node_type.conf"
196
197      }
198
199      oss << "</" << CContext::GetName() << " >";
200
201      return (oss.str());
202   }
203
204   //----------------------------------------------------------------
205
206   /*!
207   \brief Find all inheritace among objects in a context.
208   \param [in] apply (true) write attributes of parent into ones of child if they are empty
209                     (false) write attributes of parent into a new container of child
210   \param [in] parent unused
211   */
212   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
213   {
214#define DECLARE_NODE(Name_, name_)    \
215   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
216     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
217#define DECLARE_NODE_PAR(Name_, name_)
218#include "node_type.conf"
219   }
220
221   //----------------------------------------------------------------
222
223   //! Verify if all root definition in the context have child.
224   bool CContext::hasChild(void) const
225   {
226      return (
227#define DECLARE_NODE(Name_, name_)    \
228   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
229#define DECLARE_NODE_PAR(Name_, name_)
230#include "node_type.conf"
231      false);
232}
233
234   //----------------------------------------------------------------
235
236   void CContext::CleanTree(void)
237   {
238#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
239#define DECLARE_NODE_PAR(Name_, name_)
240#include "node_type.conf"
241   }
242   ///---------------------------------------------------------------
243
244   //! Initialize client side
245   void CContext::initClient(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtServer /*= 0*/)
246   {
247     hasClient=true;
248     
249
250     client = new CContextClient(this, intraComm, interComm, cxtServer);
251
252     int tmp_rank;
253     MPI_Comm_rank(intraComm, &tmp_rank);
254     MPI_Barrier(intraComm);
255     
256
257     registryIn=new CRegistry(intraComm);
258   
259     registryIn->setPath(getId()) ;
260   
261
262     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
263     registryIn->bcastRegistry() ;
264
265     registryOut=new CRegistry(intraComm) ;
266     registryOut->setPath(getId()) ;
267     
268
269     ep_lib::MPI_Comm intraCommServer, interCommServer;
270     if (cxtServer) // Attached mode
271     {
272       intraCommServer = intraComm;
273       interCommServer = interComm;
274     }
275     else
276     {
277       MPI_Comm_dup(intraComm, &intraCommServer);
278       comms.push_back(intraCommServer);
279       MPI_Comm_dup(interComm, &interCommServer);
280       comms.push_back(interCommServer);
281       
282     }
283     server = new CContextServer(this,intraCommServer,interCommServer);
284   }
285
286   void CContext::setClientServerBuffer()
287   {
288     size_t minBufferSize = CXios::minBufferSize;
289#define DECLARE_NODE(Name_, name_)    \
290     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
291#define DECLARE_NODE_PAR(Name_, name_)
292#include "node_type.conf"
293#undef DECLARE_NODE
294#undef DECLARE_NODE_PAR
295
296     std::map<int, StdSize> maxEventSize;
297     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
298     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
299
300     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
301     for (it = dataBufferSize.begin(); it != ite; ++it)
302       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
303
304     ite = bufferSize.end();
305     for (it = bufferSize.begin(); it != ite; ++it)
306     {
307       it->second *= CXios::bufferSizeFactor;
308       if (it->second < minBufferSize) it->second = minBufferSize;
309     }
310
311     // We consider that the minimum buffer size is also the minimum event size
312     ite = maxEventSize.end();
313     for (it = maxEventSize.begin(); it != ite; ++it)
314       if (it->second < minBufferSize) it->second = minBufferSize;
315
316     if (client->isServerLeader())
317     {
318       const std::list<int>& ranks = client->getRanksServerLeader();
319       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
320         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
321     }
322
323     client->setBufferSize(bufferSize, maxEventSize);
324   }
325
326   //! Verify whether a context is initialized
327   bool CContext::isInitialized(void)
328   {
329     return hasClient;
330   }
331
332   //! Initialize server
333   void CContext::initServer(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtClient /*= 0*/)
334   {
335     hasServer=true;
336     server = new CContextServer(this,intraComm,interComm);
337
338     registryIn=new CRegistry(intraComm);
339     registryIn->setPath(getId()) ;
340     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
341     registryIn->bcastRegistry() ;
342     registryOut=new CRegistry(intraComm) ;
343     registryOut->setPath(getId()) ;
344
345     ep_lib::MPI_Comm intraCommClient, interCommClient;
346     if (cxtClient) // Attached mode
347     {
348       intraCommClient = intraComm;
349       interCommClient = interComm;
350     }
351     else
352     {
353       MPI_Comm_dup(intraComm, &intraCommClient);
354       comms.push_back(intraCommClient);
355       MPI_Comm_dup(interComm, &interCommClient);
356       comms.push_back(interCommClient);
357     }
358     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
359   }
360
361   //! Server side: Put server into a loop in order to listen message from client
362   bool CContext::eventLoop(void)
363   {
364     return server->eventLoop();
365   }
366
367   //! Try to send the buffers and receive possible answers
368   bool CContext::checkBuffersAndListen(void)
369   {
370     client->checkBuffers();
371     return server->eventLoop();
372   }
373
374   //! Terminate a context
375   void CContext::finalize(void)
376   {
377     
378      if (!finalized)
379      {
380        finalized = true;
381        if (hasClient) sendRegistry() ;
382        client->finalize();
383        while (!server->hasFinished())
384        {
385          server->eventLoop();
386          //printf("server->hasFinished() = %d\n", server->hasFinished());
387        }
388
389        if (hasServer)
390        {
391          closeAllFile();
392          registryOut->hierarchicalGatherRegistry() ;
393          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
394        }
395
396        for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
397          MPI_Comm_free(&(*it));
398        comms.clear();
399      }
400   }
401
402   /*!
403   \brief Close all the context defintion and do processing data
404      After everything is well defined on client side, they will be processed and sent to server
405   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
406   all necessary information to server, from which each server can build its own database.
407   Because the role of server is to write out field data on a specific netcdf file,
408   the only information that it needs is the enabled files
409   and the active fields (fields will be written onto active files)
410   */
411   void CContext::closeDefinition(void)
412   {
413
414     // There is nothing client need to send to server
415     if (hasClient)
416     {
417       // After xml is parsed, there are some more works with post processing
418       postProcessing(); 
419     }
420     setClientServerBuffer(); 
421
422     if (hasClient && !hasServer)
423     {
424      // Send all attributes of current context to server
425      this->sendAllAttributesToServer();
426
427      // Send all attributes of current calendar
428      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
429
430      // We have enough information to send to server
431      // First of all, send all enabled files
432       sendEnabledFiles(); 
433
434      // Then, send all enabled fields
435       sendEnabledFields(); 
436
437      // At last, we have all info of domain and axis, then send them
438       sendRefDomainsAxis(); 
439      // After that, send all grid (if any)
440       sendRefGrid(); 
441    }
442
443    // We have a xml tree on the server side and now, it should be also processed
444    if (hasClient && !hasServer) sendPostProcessing(); 
445
446    // There are some processings that should be done after all of above. For example: check mask or index
447    if (hasClient)
448    {
449      this->buildFilterGraphOfEnabledFields(); 
450      buildFilterGraphOfFieldsWithReadAccess(); 
451      this->solveAllRefOfEnabledFields(true); 
452    }
453
454    // Now tell server that it can process all messages from client
455    if (hasClient && !hasServer) this->sendCloseDefinition();
456
457    // Nettoyage de l'arborescence
458    if (hasClient && !hasServer) CleanTree();
459
460    if (hasClient)
461    {
462      sendCreateFileHeader(); 
463
464      startPrefetchingOfEnabledReadModeFiles(); 
465    }
466   }
467
468   void CContext::findAllEnabledFields(void)
469   {
470     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
471     (void)this->enabledFiles[i]->getEnabledFields();
472   }
473
474   void CContext::findAllEnabledFieldsInReadModeFiles(void)
475   {
476     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
477     (void)this->enabledReadModeFiles[i]->getEnabledFields();
478   }
479
480   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
481   {
482      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
483        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
484   }
485
486   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
487   {
488     int size = this->enabledFiles.size();
489     for (int i = 0; i < size; ++i)
490     {
491       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
492     }
493
494     for (int i = 0; i < size; ++i)
495     {
496       this->enabledFiles[i]->generateNewTransformationGridDest();
497     }
498   }
499
500   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
501   {
502     int size = this->enabledFiles.size();
503     
504     for (int i = 0; i < size; ++i)
505     {
506       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
507     }
508   }
509
510   void CContext::buildFilterGraphOfEnabledFields()
511   {
512     int size = this->enabledFiles.size();
513     for (int i = 0; i < size; ++i)
514     {
515       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
516     }
517   }
518
519   void CContext::startPrefetchingOfEnabledReadModeFiles()
520   {
521     int size = enabledReadModeFiles.size();
522     for (int i = 0; i < size; ++i)
523     {
524        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
525     }
526   }
527
528   void CContext::checkPrefetchingOfEnabledReadModeFiles()
529   {
530     int size = enabledReadModeFiles.size();
531     for (int i = 0; i < size; ++i)
532     {
533        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
534     }
535   }
536
537  void CContext::findFieldsWithReadAccess(void)
538  {
539    fieldsWithReadAccess.clear();
540    const vector<CField*> allFields = CField::getAll();
541    for (size_t i = 0; i < allFields.size(); ++i)
542    {
543      CField* field = allFields[i];
544
545      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
546        field->read_access = true;
547      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
548        fieldsWithReadAccess.push_back(field);
549    }
550  }
551
552  void CContext::solveAllRefOfFieldsWithReadAccess()
553  {
554    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
555      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
556  }
557
558  void CContext::buildFilterGraphOfFieldsWithReadAccess()
559  {
560    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
561      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
562  }
563
564   void CContext::solveAllInheritance(bool apply)
565   {
566     // Rsolution des hritages descendants (cd des hritages de groupes)
567     // pour chacun des contextes.
568      solveDescInheritance(apply);
569
570     // Rsolution des hritages par rfrence au niveau des fichiers.
571      const vector<CFile*> allFiles=CFile::getAll();
572      const vector<CGrid*> allGrids= CGrid::getAll();
573
574     //if (hasClient && !hasServer)
575      if (hasClient)
576      {
577        for (unsigned int i = 0; i < allFiles.size(); i++)
578          allFiles[i]->solveFieldRefInheritance(apply);
579      }
580
581      unsigned int vecSize = allGrids.size();
582      unsigned int i = 0;
583      for (i = 0; i < vecSize; ++i)
584        allGrids[i]->solveDomainAxisRefInheritance(apply);
585
586   }
587
588   void CContext::findEnabledFiles(void)
589   {
590      const std::vector<CFile*> allFiles = CFile::getAll();
591
592      for (unsigned int i = 0; i < allFiles.size(); i++)
593         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est dfini.
594         {
595            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fix  vrai.
596               enabledFiles.push_back(allFiles[i]);
597         }
598         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
599
600
601      if (enabledFiles.size() == 0)
602         DEBUG(<<"Aucun fichier ne va tre sorti dans le contexte nomm \""
603               << getId() << "\" !");
604   }
605
606   void CContext::findEnabledReadModeFiles(void)
607   {
608     int size = this->enabledFiles.size();
609     for (int i = 0; i < size; ++i)
610     {
611       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
612        enabledReadModeFiles.push_back(enabledFiles[i]);
613     }
614   }
615
616   void CContext::closeAllFile(void)
617   {
618     std::vector<CFile*>::const_iterator
619            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
620
621     for (; it != end; it++)
622     {
623       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
624       (*it)->close();
625     }
626   }
627
628   /*!
629   \brief Dispatch event received from client
630      Whenever a message is received in buffer of server, it will be processed depending on
631   its event type. A new event type should be added in the switch list to make sure
632   it processed on server side.
633   \param [in] event: Received message
634   */
635   bool CContext::dispatchEvent(CEventServer& event)
636   {
637
638      if (SuperClass::dispatchEvent(event)) return true;
639      else
640      {
641        switch(event.type)
642        {
643           case EVENT_ID_CLOSE_DEFINITION :
644             recvCloseDefinition(event);
645             return true;
646             break;
647           case EVENT_ID_UPDATE_CALENDAR:
648             recvUpdateCalendar(event);
649             return true;
650             break;
651           case EVENT_ID_CREATE_FILE_HEADER :
652             recvCreateFileHeader(event);
653             return true;
654             break;
655           case EVENT_ID_POST_PROCESS:
656             recvPostProcessing(event);
657             return true;
658            case EVENT_ID_SEND_REGISTRY:
659             recvRegistry(event);
660             return true;
661            break;
662
663           default :
664             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
665                    <<"Unknown Event");
666           return false;
667         }
668      }
669   }
670
671   //! Client side: Send a message to server to make it close
672   void CContext::sendCloseDefinition(void)
673   {
674     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
675     if (client->isServerLeader())
676     {
677       CMessage msg;
678       msg<<this->getIdServer();
679       const std::list<int>& ranks = client->getRanksServerLeader();
680       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
681         event.push(*itRank,1,msg);
682       client->sendEvent(event);
683     }
684     else client->sendEvent(event);
685   }
686
687   //! Server side: Receive a message of client announcing a context close
688   void CContext::recvCloseDefinition(CEventServer& event)
689   {
690
691      CBufferIn* buffer=event.subEvents.begin()->buffer;
692      string id;
693      *buffer>>id;
694      get(id)->closeDefinition();
695   }
696
697   //! Client side: Send a message to update calendar in each time step
698   void CContext::sendUpdateCalendar(int step)
699   {
700     if (!hasServer)
701     {
702       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
703       if (client->isServerLeader())
704       {
705         CMessage msg;
706         msg<<this->getIdServer()<<step;
707         const std::list<int>& ranks = client->getRanksServerLeader();
708         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
709           event.push(*itRank,1,msg);
710         client->sendEvent(event);
711       }
712       else client->sendEvent(event);
713     }
714   }
715
716   //! Server side: Receive a message of client annoucing calendar update
717   void CContext::recvUpdateCalendar(CEventServer& event)
718   {
719      CBufferIn* buffer=event.subEvents.begin()->buffer;
720      string id;
721      *buffer>>id;
722      get(id)->recvUpdateCalendar(*buffer);
723   }
724
725   //! Server side: Receive a message of client annoucing calendar update
726   void CContext::recvUpdateCalendar(CBufferIn& buffer)
727   {
728      int step;
729      buffer>>step;
730      updateCalendar(step);
731   }
732
733   //! Client side: Send a message to create header part of netcdf file
734   void CContext::sendCreateFileHeader(void)
735   {
736     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
737     if (client->isServerLeader())
738     {
739       CMessage msg;
740       msg<<this->getIdServer();
741       const std::list<int>& ranks = client->getRanksServerLeader();
742       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
743         event.push(*itRank,1,msg) ;
744       client->sendEvent(event);
745     }
746     else client->sendEvent(event);
747   }
748
749   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
750   void CContext::recvCreateFileHeader(CEventServer& event)
751   {
752      CBufferIn* buffer=event.subEvents.begin()->buffer;
753      string id;
754      *buffer>>id;
755      get(id)->recvCreateFileHeader(*buffer);
756   }
757
758   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
759   void CContext::recvCreateFileHeader(CBufferIn& buffer)
760   {
761      createFileHeader();
762   }
763
764   //! Client side: Send a message to do some post processing on server
765   void CContext::sendPostProcessing()
766   {
767     if (!hasServer)
768     {
769       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
770       if (client->isServerLeader())
771       {
772         CMessage msg;
773         msg<<this->getIdServer();
774         const std::list<int>& ranks = client->getRanksServerLeader();
775         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
776           event.push(*itRank,1,msg);
777         client->sendEvent(event);
778       }
779       else client->sendEvent(event);
780     }
781   }
782
783   //! Server side: Receive a message to do some post processing
784   void CContext::recvPostProcessing(CEventServer& event)
785   {
786      CBufferIn* buffer=event.subEvents.begin()->buffer;
787      string id;
788      *buffer>>id;
789      get(id)->recvPostProcessing(*buffer);
790   }
791
792   //! Server side: Receive a message to do some post processing
793   void CContext::recvPostProcessing(CBufferIn& buffer)
794   {
795      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
796      postProcessing();
797   }
798
799   const StdString& CContext::getIdServer()
800   {
801      if (hasClient)
802      {
803        idServer_ = this->getId();
804        idServer_ += "_server";
805        return idServer_;
806      }
807      if (hasServer) return (this->getId());
808   }
809
810   /*!
811   \brief Do some simple post processings after parsing xml file
812      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
813   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
814   which will be written out into netcdf files, are processed
815   */
816   void CContext::postProcessing()
817   {
818     int myRank;
819     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
820
821     if (isPostProcessed) return;
822
823      // Make sure the calendar was correctly created
824      if (!calendar)
825        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
826      else if (calendar->getTimeStep() == NoneDu)
827        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
828      // Calendar first update to set the current date equals to the start date
829      calendar->update(0); 
830
831      // Find all inheritance in xml structure
832      this->solveAllInheritance(); 
833
834      // Check if some axis, domains or grids are eligible to for compressed indexed output.
835      // Warning: This must be done after solving the inheritance and before the rest of post-processing
836      checkAxisDomainsGridsEligibilityForCompressedOutput(); 
837
838      // Check if some automatic time series should be generated
839      // Warning: This must be done after solving the inheritance and before the rest of post-processing
840      prepareTimeseries(); 
841
842      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers  sortir.
843      this->findEnabledFiles(); 
844      this->findEnabledReadModeFiles();
845
846      // Find all enabled fields of each file
847      this->findAllEnabledFields(); 
848      this->findAllEnabledFieldsInReadModeFiles();
849
850     if (hasClient && !hasServer)
851     {
852      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
853      this->readAttributesOfEnabledFieldsInReadModeFiles(); 
854     }
855
856      // Only search and rebuild all reference objects of enable fields, don't transform
857      this->solveOnlyRefOfEnabledFields(false); 
858
859      // Search and rebuild all reference object of enabled fields
860      this->solveAllRefOfEnabledFields(false); 
861
862      // Find all fields with read access from the public API
863      findFieldsWithReadAccess(); 
864      // and solve the all reference for them
865      solveAllRefOfFieldsWithReadAccess(); 
866
867      isPostProcessed = true;
868   }
869
870   /*!
871    * Compute the required buffer size to send the attributes (mostly those grid related).
872    *
873    * \param maxEventSize [in/out] the size of the bigger event for each connected server
874    */
875   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
876   {
877     std::map<int, StdSize> attributesSize;
878
879     if (hasClient)
880     {
881       size_t numEnabledFiles = this->enabledFiles.size();
882       for (size_t i = 0; i < numEnabledFiles; ++i)
883       {
884         CFile* file = this->enabledFiles[i];
885
886         std::vector<CField*> enabledFields = file->getEnabledFields();
887         size_t numEnabledFields = enabledFields.size();
888         for (size_t j = 0; j < numEnabledFields; ++j)
889         {
890           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
891           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
892           for (; it != itE; ++it)
893           {
894             // If attributesSize[it->first] does not exist, it will be zero-initialized
895             // so we can use it safely without checking for its existance
896             if (attributesSize[it->first] < it->second)
897               attributesSize[it->first] = it->second;
898
899             if (maxEventSize[it->first] < it->second)
900               maxEventSize[it->first] = it->second;
901           }
902         }
903       }
904     }
905
906     return attributesSize;
907   }
908
909   /*!
910    * Compute the required buffer size to send the fields data.
911    *
912    * \param maxEventSize [in/out] the size of the bigger event for each connected server
913    */
914   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
915   {
916     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
917
918     std::map<int, StdSize> dataSize;
919
920     // Find all reference domain and axis of all active fields
921     size_t numEnabledFiles = this->enabledFiles.size();
922     for (size_t i = 0; i < numEnabledFiles; ++i)
923     {
924       CFile* file = this->enabledFiles[i];
925       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
926
927       if (fileMode == mode)
928       {
929         std::vector<CField*> enabledFields = file->getEnabledFields();
930         size_t numEnabledFields = enabledFields.size();
931         for (size_t j = 0; j < numEnabledFields; ++j)
932         {
933           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
934           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
935           for (; it != itE; ++it)
936           {
937             // If dataSize[it->first] does not exist, it will be zero-initialized
938             // so we can use it safely without checking for its existance
939             if (CXios::isOptPerformance)
940               dataSize[it->first] += it->second;
941             else if (dataSize[it->first] < it->second)
942               dataSize[it->first] = it->second;
943
944             if (maxEventSize[it->first] < it->second)
945               maxEventSize[it->first] = it->second;
946           }
947         }
948       }
949     }
950
951     return dataSize;
952   }
953
954   //! Client side: Send infomation of active files (files are enabled to write out)
955   void CContext::sendEnabledFiles()
956   {
957     int size = this->enabledFiles.size();
958
959     // In a context, each type has a root definition, e.g: axis, domain, field.
960     // Every object must be a child of one of these root definition. In this case
961     // all new file objects created on server must be children of the root "file_definition"
962     StdString fileDefRoot("file_definition");
963     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
964
965     for (int i = 0; i < size; ++i)
966     {
967       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
968       this->enabledFiles[i]->sendAllAttributesToServer();
969       this->enabledFiles[i]->sendAddAllVariables();
970     }
971   }
972
973   //! Client side: Send information of active fields (ones are written onto files)
974   void CContext::sendEnabledFields()
975   {
976     int size = this->enabledFiles.size();
977     for (int i = 0; i < size; ++i)
978     {
979       this->enabledFiles[i]->sendEnabledFields();
980     }
981   }
982
983   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
984   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
985   {
986     if (!hasClient) return;
987
988     const vector<CAxis*> allAxis = CAxis::getAll();
989     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
990       (*it)->checkEligibilityForCompressedOutput();
991
992     const vector<CDomain*> allDomains = CDomain::getAll();
993     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
994       (*it)->checkEligibilityForCompressedOutput();
995
996     const vector<CGrid*> allGrids = CGrid::getAll();
997     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
998       (*it)->checkEligibilityForCompressedOutput();
999   }
1000
1001   //! Client side: Prepare the timeseries by adding the necessary files
1002   void CContext::prepareTimeseries()
1003   {
1004     if (!hasClient) return;
1005
1006     const std::vector<CFile*> allFiles = CFile::getAll();
1007     for (size_t i = 0; i < allFiles.size(); i++)
1008     {
1009       CFile* file = allFiles[i];
1010
1011       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1012       {
1013         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : file->getFileOutputName();
1014
1015         const std::vector<CField*> allFields = file->getAllFields();
1016         for (size_t j = 0; j < allFields.size(); j++)
1017         {
1018           CField* field = allFields[j];
1019
1020           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1021           {
1022             CFile* tsFile = CFile::create();
1023             tsFile->duplicateAttributes(file);
1024             tsFile->setVirtualVariableGroup(file->getVirtualVariableGroup());
1025
1026             tsFile->name = tsPrefix + "_";
1027             if (!field->name.isEmpty())
1028               tsFile->name.get() += field->name;
1029             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1030               tsFile->name.get() += field->field_ref;
1031             else
1032               tsFile->name.get() += field->getId();
1033
1034             if (!field->ts_split_freq.isEmpty())
1035               tsFile->split_freq = field->ts_split_freq;
1036
1037             CField* tsField = tsFile->addField();
1038             tsField->field_ref = field->getId();
1039             tsField->setVirtualVariableGroup(field->getVirtualVariableGroup());
1040
1041             tsFile->solveFieldRefInheritance(true);
1042
1043             if (file->timeseries == CFile::timeseries_attr::exclusive)
1044               field->enabled = false;
1045           }
1046         }
1047
1048         // Finally disable the original file is need be
1049         if (file->timeseries == CFile::timeseries_attr::only)
1050          file->enabled = false;
1051       }
1052     }
1053   }
1054
1055   //! Client side: Send information of reference grid of active fields
1056   void CContext::sendRefGrid()
1057   {
1058     std::set<StdString> gridIds;
1059     int sizeFile = this->enabledFiles.size();
1060     CFile* filePtr(NULL);
1061
1062     // Firstly, find all reference grids of all active fields
1063     for (int i = 0; i < sizeFile; ++i)
1064     {
1065       filePtr = this->enabledFiles[i];
1066       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1067       int sizeField = enabledFields.size();
1068       for (int numField = 0; numField < sizeField; ++numField)
1069       {
1070         if (0 != enabledFields[numField]->getRelGrid())
1071           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1072       }
1073     }
1074
1075     // Create all reference grids on server side
1076     StdString gridDefRoot("grid_definition");
1077     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1078     std::set<StdString>::const_iterator it, itE = gridIds.end();
1079     for (it = gridIds.begin(); it != itE; ++it)
1080     {
1081       gridPtr->sendCreateChild(*it);
1082       CGrid::get(*it)->sendAllAttributesToServer();
1083       CGrid::get(*it)->sendAllDomains();
1084       CGrid::get(*it)->sendAllAxis();
1085       CGrid::get(*it)->sendAllScalars();
1086     }
1087   }
1088
1089
1090   //! Client side: Send information of reference domain and axis of active fields
1091   void CContext::sendRefDomainsAxis()
1092   {
1093     std::set<StdString> domainIds, axisIds, scalarIds;
1094
1095     // Find all reference domain and axis of all active fields
1096     int numEnabledFiles = this->enabledFiles.size();
1097     for (int i = 0; i < numEnabledFiles; ++i)
1098     {
1099       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1100       int numEnabledFields = enabledFields.size();
1101       for (int j = 0; j < numEnabledFields; ++j)
1102       {
1103         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1104         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1105         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1106         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1107       }
1108     }
1109
1110     // Create all reference axis on server side
1111     std::set<StdString>::iterator itDom, itAxis, itScalar;
1112     std::set<StdString>::const_iterator itE;
1113
1114     StdString scalarDefRoot("scalar_definition");
1115     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1116     itE = scalarIds.end();
1117     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1118     {
1119       if (!itScalar->empty())
1120       {
1121         scalarPtr->sendCreateChild(*itScalar);
1122         CScalar::get(*itScalar)->sendAllAttributesToServer();
1123       }
1124     }
1125
1126     StdString axiDefRoot("axis_definition");
1127     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1128     itE = axisIds.end();
1129     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1130     {
1131       if (!itAxis->empty())
1132       {
1133         axisPtr->sendCreateChild(*itAxis);
1134         CAxis::get(*itAxis)->sendAllAttributesToServer();
1135       }
1136     }
1137
1138     // Create all reference domains on server side
1139     StdString domDefRoot("domain_definition");
1140     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1141     itE = domainIds.end();
1142     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1143     {
1144       if (!itDom->empty()) {
1145          domPtr->sendCreateChild(*itDom);
1146          CDomain::get(*itDom)->sendAllAttributesToServer();
1147       }
1148     }
1149   }
1150
1151   //! Update calendar in each time step
1152   void CContext::updateCalendar(int step)
1153   {
1154      calendar->update(step);
1155
1156      if (hasClient)
1157      {
1158        checkPrefetchingOfEnabledReadModeFiles();
1159        garbageCollector.invalidate(calendar->getCurrentDate());
1160      }
1161   }
1162
1163   //! Server side: Create header of netcdf file
1164   void CContext::createFileHeader(void )
1165   {
1166      vector<CFile*>::const_iterator it;
1167
1168      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1169      {
1170         (*it)->initFile();
1171      }
1172   }
1173
1174   //! Get current context
1175   CContext* CContext::getCurrent(void)
1176   {
1177     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1178   }
1179
1180   /*!
1181   \brief Set context with an id be the current context
1182   \param [in] id identity of context to be set to current
1183   */
1184   void CContext::setCurrent(const string& id)
1185   {
1186     CObjectFactory::SetCurrentContextId(id);
1187     CGroupFactory::SetCurrentContextId(id);
1188   }
1189
1190  /*!
1191  \brief Create a context with specific id
1192  \param [in] id identity of new context
1193  \return pointer to the new context or already-existed one with identity id
1194  */
1195   //bkp
1196  CContext* CContext::create(const StdString& id)
1197  {
1198    CContext::setCurrent(id);
1199
1200    bool hasctxt = CContext::has(id);
1201    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1202    getRoot();
1203    //if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1204    if (!hasctxt) CGroupFactory::AddChild(*root_ptr, context->getShared());
1205
1206#define DECLARE_NODE(Name_, name_) \
1207    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1208#define DECLARE_NODE_PAR(Name_, name_)
1209#include "node_type.conf"
1210
1211    return (context);
1212  }
1213
1214
1215
1216     //! Server side: Receive a message to do some post processing
1217  void CContext::recvRegistry(CEventServer& event)
1218  {
1219    CBufferIn* buffer=event.subEvents.begin()->buffer;
1220    string id;
1221    *buffer>>id;
1222    get(id)->recvRegistry(*buffer);
1223  }
1224
1225  void CContext::recvRegistry(CBufferIn& buffer)
1226  {
1227    if (server->intraCommRank==0)
1228    {
1229      CRegistry registry(server->intraComm) ;
1230      registry.fromBuffer(buffer) ;
1231      registryOut->mergeRegistry(registry) ;
1232    }
1233  }
1234
1235  void CContext::sendRegistry(void)
1236  {
1237    registryOut->hierarchicalGatherRegistry() ;
1238
1239    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1240    if (client->isServerLeader())
1241    {
1242       CMessage msg ;
1243       msg<<this->getIdServer();
1244       if (client->clientRank==0) msg<<*registryOut ;
1245       const std::list<int>& ranks = client->getRanksServerLeader();
1246       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1247         event.push(*itRank,1,msg);
1248       client->sendEvent(event);
1249    }
1250    else client->sendEvent(event);
1251  }
1252
1253} // namespace xios
Note: See TracBrowser for help on using the repository browser.