source: XIOS/dev/branch_yushan/src/node/context.cpp @ 1067

Last change on this file since 1067 was 1067, checked in by yushan, 7 years ago

server mode OK tested with test_complete

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 42.3 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Dfinitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrmentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
250     ep_lib::MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
259       comms.push_back(intraCommServer);
260       MPI_Comm_dup(interComm, &interCommServer);
261       comms.push_back(interCommServer);
262       
263     }
264     server = new CContextServer(this,intraCommServer,interCommServer);
265   }
266
267   void CContext::setClientServerBuffer()
268   {
269     size_t minBufferSize = CXios::minBufferSize;
270#define DECLARE_NODE(Name_, name_)    \
271     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
272#define DECLARE_NODE_PAR(Name_, name_)
273#include "node_type.conf"
274#undef DECLARE_NODE
275#undef DECLARE_NODE_PAR
276
277     std::map<int, StdSize> maxEventSize;
278     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
279     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
280
281     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
282     for (it = dataBufferSize.begin(); it != ite; ++it)
283       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
284
285     ite = bufferSize.end();
286     for (it = bufferSize.begin(); it != ite; ++it)
287     {
288       it->second *= CXios::bufferSizeFactor;
289       if (it->second < minBufferSize) it->second = minBufferSize;
290     }
291
292     // We consider that the minimum buffer size is also the minimum event size
293     ite = maxEventSize.end();
294     for (it = maxEventSize.begin(); it != ite; ++it)
295       if (it->second < minBufferSize) it->second = minBufferSize;
296
297     if (client->isServerLeader())
298     {
299       const std::list<int>& ranks = client->getRanksServerLeader();
300       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
301         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
302     }
303
304     client->setBufferSize(bufferSize, maxEventSize);
305   }
306
307   //! Verify whether a context is initialized
308   bool CContext::isInitialized(void)
309   {
310     return hasClient;
311   }
312
313   //! Initialize server
314   void CContext::initServer(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtClient /*= 0*/)
315   {
316     hasServer=true;
317     server = new CContextServer(this,intraComm,interComm);
318
319     registryIn=new CRegistry(intraComm);
320     registryIn->setPath(getId()) ;
321     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
322     registryIn->bcastRegistry() ;
323     registryOut=new CRegistry(intraComm) ;
324     registryOut->setPath(getId()) ;
325
326     ep_lib::MPI_Comm intraCommClient, interCommClient;
327     if (cxtClient) // Attached mode
328     {
329       intraCommClient = intraComm;
330       interCommClient = interComm;
331     }
332     else
333     {
334       MPI_Comm_dup(intraComm, &intraCommClient);
335       comms.push_back(intraCommClient);
336       MPI_Comm_dup(interComm, &interCommClient);
337       comms.push_back(interCommClient);
338     }
339     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
340   }
341
342   //! Server side: Put server into a loop in order to listen message from client
343   bool CContext::eventLoop(void)
344   {
345     return server->eventLoop();
346   }
347
348   //! Try to send the buffers and receive possible answers
349   bool CContext::checkBuffersAndListen(void)
350   {
351     client->checkBuffers();
352     return server->eventLoop();
353   }
354
355   //! Terminate a context
356   void CContext::finalize(void)
357   {
358      if (!finalized)
359      {
360        finalized = true;
361        if (hasClient) sendRegistry() ;
362        client->finalize();
363        while (!server->hasFinished())
364        {
365          server->eventLoop();
366        }
367
368        if (hasServer)
369        {
370          closeAllFile();
371          registryOut->hierarchicalGatherRegistry() ;
372          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
373        }
374
375        for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
376          MPI_Comm_free(&(*it));
377        comms.clear();
378      }
379   }
380
381   /*!
382   \brief Close all the context defintion and do processing data
383      After everything is well defined on client side, they will be processed and sent to server
384   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
385   all necessary information to server, from which each server can build its own database.
386   Because the role of server is to write out field data on a specific netcdf file,
387   the only information that it needs is the enabled files
388   and the active fields (fields will be written onto active files)
389   */
390   void CContext::closeDefinition(void)
391   {
392     int myRank;
393     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
394
395     //printf("myRank = %d, hasClient = %d, hasServer = %d\n", myRank, hasClient, hasServer);
396
397     // There is nothing client need to send to server
398     if (hasClient)
399     {
400       // After xml is parsed, there are some more works with post processing
401       postProcessing(); 
402       //printf("myRank = %d,                postProcessing OK\n", myRank);
403     }
404     setClientServerBuffer(); //printf("myRank = %d, setClientServerBuffer OK\n", myRank);
405
406     //printf("hasClient = %d, hasServer = %d\n", hasClient, hasServer);
407
408     if (hasClient && !hasServer)
409     {
410      // Send all attributes of current context to server
411      this->sendAllAttributesToServer(); //printf("myRank = %d, this->sendAllAttributesToServer OK\n", myRank);
412
413      // Send all attributes of current calendar
414      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
415      //printf("myRank = %d, CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer OK\n", myRank);
416
417      // We have enough information to send to server
418      // First of all, send all enabled files
419       sendEnabledFiles();  //printf("myRank = %d, sendEnabledFiles OK\n", myRank);
420
421      // Then, send all enabled fields
422       sendEnabledFields();  //printf("myRank = %d, sendEnabledFields OK\n", myRank);
423
424      // At last, we have all info of domain and axis, then send them
425       sendRefDomainsAxis();  //printf("myRank = %d, sendRefDomainsAxis OK\n", myRank);
426
427      // After that, send all grid (if any)
428       sendRefGrid(); //printf("myRank = %d, sendRefGrid OK\n", myRank);
429    }
430
431    // We have a xml tree on the server side and now, it should be also processed
432    if (hasClient && !hasServer) sendPostProcessing(); 
433
434    // There are some processings that should be done after all of above. For example: check mask or index
435    if (hasClient)
436    {
437      this->buildFilterGraphOfEnabledFields();  //printf("myRank = %d, buildFilterGraphOfEnabledFields OK\n", myRank);
438      buildFilterGraphOfFieldsWithReadAccess();  //printf("myRank = %d, buildFilterGraphOfFieldsWithReadAccess OK\n", myRank);
439      this->solveAllRefOfEnabledFields(true);  //printf("myRank = %d, solveAllRefOfEnabledFields OK\n", myRank);
440    }
441
442    // Now tell server that it can process all messages from client
443    if (hasClient && !hasServer) this->sendCloseDefinition();
444
445    // Nettoyage de l'arborescence
446    if (hasClient && !hasServer) CleanTree();
447
448    if (hasClient)
449    {
450      sendCreateFileHeader();  //printf("myRank = %d, sendCreateFileHeader OK\n", myRank);
451
452      startPrefetchingOfEnabledReadModeFiles();  //printf("myRank = %d, startPrefetchingOfEnabledReadModeFiles OK\n", myRank);
453    }
454   }
455
456   void CContext::findAllEnabledFields(void)
457   {
458     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
459     (void)this->enabledFiles[i]->getEnabledFields();
460   }
461
462   void CContext::findAllEnabledFieldsInReadModeFiles(void)
463   {
464     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
465     (void)this->enabledReadModeFiles[i]->getEnabledFields();
466   }
467
468   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
469   {
470      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
471        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
472   }
473
474   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
475   {
476     int size = this->enabledFiles.size();
477     for (int i = 0; i < size; ++i)
478     {
479       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
480     }
481
482     for (int i = 0; i < size; ++i)
483     {
484       this->enabledFiles[i]->generateNewTransformationGridDest();
485     }
486   }
487
488   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
489   {
490     int size = this->enabledFiles.size();
491     
492     for (int i = 0; i < size; ++i)
493     {
494       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
495     }
496   }
497
498   void CContext::buildFilterGraphOfEnabledFields()
499   {
500     int size = this->enabledFiles.size();
501     for (int i = 0; i < size; ++i)
502     {
503       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
504     }
505   }
506
507   void CContext::startPrefetchingOfEnabledReadModeFiles()
508   {
509     int size = enabledReadModeFiles.size();
510     for (int i = 0; i < size; ++i)
511     {
512        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
513     }
514   }
515
516   void CContext::checkPrefetchingOfEnabledReadModeFiles()
517   {
518     int size = enabledReadModeFiles.size();
519     for (int i = 0; i < size; ++i)
520     {
521        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
522     }
523   }
524
525  void CContext::findFieldsWithReadAccess(void)
526  {
527    fieldsWithReadAccess.clear();
528    const vector<CField*> allFields = CField::getAll();
529    for (size_t i = 0; i < allFields.size(); ++i)
530    {
531      CField* field = allFields[i];
532
533      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
534        field->read_access = true;
535      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
536        fieldsWithReadAccess.push_back(field);
537    }
538  }
539
540  void CContext::solveAllRefOfFieldsWithReadAccess()
541  {
542    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
543      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
544  }
545
546  void CContext::buildFilterGraphOfFieldsWithReadAccess()
547  {
548    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
549      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
550  }
551
552   void CContext::solveAllInheritance(bool apply)
553   {
554     // Rsolution des hritages descendants (cd des hritages de groupes)
555     // pour chacun des contextes.
556      solveDescInheritance(apply);
557
558     // Rsolution des hritages par rfrence au niveau des fichiers.
559      const vector<CFile*> allFiles=CFile::getAll();
560      const vector<CGrid*> allGrids= CGrid::getAll();
561
562     //if (hasClient && !hasServer)
563      if (hasClient)
564      {
565        for (unsigned int i = 0; i < allFiles.size(); i++)
566          allFiles[i]->solveFieldRefInheritance(apply);
567      }
568
569      unsigned int vecSize = allGrids.size();
570      unsigned int i = 0;
571      for (i = 0; i < vecSize; ++i)
572        allGrids[i]->solveDomainAxisRefInheritance(apply);
573
574   }
575
576   void CContext::findEnabledFiles(void)
577   {
578      const std::vector<CFile*> allFiles = CFile::getAll();
579
580      for (unsigned int i = 0; i < allFiles.size(); i++)
581         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est dfini.
582         {
583            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fix  vrai.
584               enabledFiles.push_back(allFiles[i]);
585         }
586         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
587
588
589      if (enabledFiles.size() == 0)
590         DEBUG(<<"Aucun fichier ne va tre sorti dans le contexte nomm \""
591               << getId() << "\" !");
592   }
593
594   void CContext::findEnabledReadModeFiles(void)
595   {
596     int size = this->enabledFiles.size();
597     for (int i = 0; i < size; ++i)
598     {
599       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
600        enabledReadModeFiles.push_back(enabledFiles[i]);
601     }
602   }
603
604   void CContext::closeAllFile(void)
605   {
606     std::vector<CFile*>::const_iterator
607            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
608
609     for (; it != end; it++)
610     {
611       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
612       (*it)->close();
613     }
614   }
615
616   /*!
617   \brief Dispatch event received from client
618      Whenever a message is received in buffer of server, it will be processed depending on
619   its event type. A new event type should be added in the switch list to make sure
620   it processed on server side.
621   \param [in] event: Received message
622   */
623   bool CContext::dispatchEvent(CEventServer& event)
624   {
625
626      if (SuperClass::dispatchEvent(event)) return true;
627      else
628      {
629        switch(event.type)
630        {
631           case EVENT_ID_CLOSE_DEFINITION :
632             recvCloseDefinition(event);
633             return true;
634             break;
635           case EVENT_ID_UPDATE_CALENDAR:
636             recvUpdateCalendar(event);
637             return true;
638             break;
639           case EVENT_ID_CREATE_FILE_HEADER :
640             recvCreateFileHeader(event);
641             return true;
642             break;
643           case EVENT_ID_POST_PROCESS:
644             recvPostProcessing(event);
645             return true;
646            case EVENT_ID_SEND_REGISTRY:
647             recvRegistry(event);
648             return true;
649            break;
650
651           default :
652             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
653                    <<"Unknown Event");
654           return false;
655         }
656      }
657   }
658
659   //! Client side: Send a message to server to make it close
660   void CContext::sendCloseDefinition(void)
661   {
662     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
663     if (client->isServerLeader())
664     {
665       CMessage msg;
666       msg<<this->getIdServer();
667       const std::list<int>& ranks = client->getRanksServerLeader();
668       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
669         event.push(*itRank,1,msg);
670       client->sendEvent(event);
671     }
672     else client->sendEvent(event);
673   }
674
675   //! Server side: Receive a message of client announcing a context close
676   void CContext::recvCloseDefinition(CEventServer& event)
677   {
678
679      CBufferIn* buffer=event.subEvents.begin()->buffer;
680      string id;
681      *buffer>>id;
682      get(id)->closeDefinition();
683   }
684
685   //! Client side: Send a message to update calendar in each time step
686   void CContext::sendUpdateCalendar(int step)
687   {
688     if (!hasServer)
689     {
690       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
691       if (client->isServerLeader())
692       {
693         CMessage msg;
694         msg<<this->getIdServer()<<step;
695         const std::list<int>& ranks = client->getRanksServerLeader();
696         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
697           event.push(*itRank,1,msg);
698         client->sendEvent(event);
699       }
700       else client->sendEvent(event);
701     }
702   }
703
704   //! Server side: Receive a message of client annoucing calendar update
705   void CContext::recvUpdateCalendar(CEventServer& event)
706   {
707      CBufferIn* buffer=event.subEvents.begin()->buffer;
708      string id;
709      *buffer>>id;
710      get(id)->recvUpdateCalendar(*buffer);
711   }
712
713   //! Server side: Receive a message of client annoucing calendar update
714   void CContext::recvUpdateCalendar(CBufferIn& buffer)
715   {
716      int step;
717      buffer>>step;
718      updateCalendar(step);
719   }
720
721   //! Client side: Send a message to create header part of netcdf file
722   void CContext::sendCreateFileHeader(void)
723   {
724     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
725     if (client->isServerLeader())
726     {
727       CMessage msg;
728       msg<<this->getIdServer();
729       const std::list<int>& ranks = client->getRanksServerLeader();
730       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
731         event.push(*itRank,1,msg) ;
732       client->sendEvent(event);
733     }
734     else client->sendEvent(event);
735   }
736
737   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
738   void CContext::recvCreateFileHeader(CEventServer& event)
739   {
740      CBufferIn* buffer=event.subEvents.begin()->buffer;
741      string id;
742      *buffer>>id;
743      get(id)->recvCreateFileHeader(*buffer);
744   }
745
746   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
747   void CContext::recvCreateFileHeader(CBufferIn& buffer)
748   {
749      createFileHeader();
750   }
751
752   //! Client side: Send a message to do some post processing on server
753   void CContext::sendPostProcessing()
754   {
755     if (!hasServer)
756     {
757       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
758       if (client->isServerLeader())
759       {
760         CMessage msg;
761         msg<<this->getIdServer();
762         const std::list<int>& ranks = client->getRanksServerLeader();
763         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
764           event.push(*itRank,1,msg);
765         client->sendEvent(event);
766       }
767       else client->sendEvent(event);
768     }
769   }
770
771   //! Server side: Receive a message to do some post processing
772   void CContext::recvPostProcessing(CEventServer& event)
773   {
774      CBufferIn* buffer=event.subEvents.begin()->buffer;
775      string id;
776      *buffer>>id;
777      get(id)->recvPostProcessing(*buffer);
778   }
779
780   //! Server side: Receive a message to do some post processing
781   void CContext::recvPostProcessing(CBufferIn& buffer)
782   {
783      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
784      postProcessing();
785   }
786
787   const StdString& CContext::getIdServer()
788   {
789      if (hasClient)
790      {
791        idServer_ = this->getId();
792        idServer_ += "_server";
793        return idServer_;
794      }
795      if (hasServer) return (this->getId());
796   }
797
798   /*!
799   \brief Do some simple post processings after parsing xml file
800      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
801   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
802   which will be written out into netcdf files, are processed
803   */
804   void CContext::postProcessing()
805   {
806     int myRank;
807     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
808
809     //printf("myRank = %d, in postProcessing, isPostProcessed = %d\n", myRank, isPostProcessed);
810     if (isPostProcessed) return;
811
812      // Make sure the calendar was correctly created
813      if (!calendar)
814        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
815      else if (calendar->getTimeStep() == NoneDu)
816        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
817      // Calendar first update to set the current date equals to the start date
818      calendar->update(0);  //printf("myRank = %d, calendar->update(0) OK\n", myRank);
819
820      // Find all inheritance in xml structure
821      this->solveAllInheritance();  //printf("myRank = %d, this->solveAllInheritance OK\n", myRank);
822
823      // Check if some axis, domains or grids are eligible to for compressed indexed output.
824      // Warning: This must be done after solving the inheritance and before the rest of post-processing
825      checkAxisDomainsGridsEligibilityForCompressedOutput();  //printf("myRank = %d, checkAxisDomainsGridsEligibilityForCompressedOutput OK\n", myRank);
826
827      // Check if some automatic time series should be generated
828      // Warning: This must be done after solving the inheritance and before the rest of post-processing
829      prepareTimeseries();  //printf("myRank = %d, prepareTimeseries OK\n", myRank);
830
831      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers  sortir.
832      this->findEnabledFiles();  //printf("myRank = %d, this->findEnabledFiles OK\n", myRank);
833      this->findEnabledReadModeFiles();  //printf("myRank = %d, this->findEnabledReadModeFiles OK\n", myRank);
834
835      // Find all enabled fields of each file
836      this->findAllEnabledFields();  //printf("myRank = %d, this->findAllEnabledFields OK\n", myRank);
837      this->findAllEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->findAllEnabledFieldsInReadModeFiles OK\n", myRank);
838
839     if (hasClient && !hasServer)
840     {
841      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
842      this->readAttributesOfEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->readAttributesOfEnabledFieldsInReadModeFiles OK\n", myRank);
843     }
844
845      // Only search and rebuild all reference objects of enable fields, don't transform
846      this->solveOnlyRefOfEnabledFields(false);  //printf("myRank = %d, this->solveOnlyRefOfEnabledFields(false) OK\n", myRank);
847
848      // Search and rebuild all reference object of enabled fields
849      this->solveAllRefOfEnabledFields(false);  //printf("myRank = %d, this->solveAllRefOfEnabledFields(false) OK\n", myRank);
850
851      // Find all fields with read access from the public API
852      findFieldsWithReadAccess();  //printf("myRank = %d, findFieldsWithReadAccess OK\n", myRank);
853      // and solve the all reference for them
854      solveAllRefOfFieldsWithReadAccess();  //printf("myRank = %d, solveAllRefOfFieldsWithReadAccess OK\n", myRank);
855
856      isPostProcessed = true;
857   }
858
859   /*!
860    * Compute the required buffer size to send the attributes (mostly those grid related).
861    *
862    * \param maxEventSize [in/out] the size of the bigger event for each connected server
863    */
864   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
865   {
866     std::map<int, StdSize> attributesSize;
867
868     if (hasClient)
869     {
870       size_t numEnabledFiles = this->enabledFiles.size();
871       for (size_t i = 0; i < numEnabledFiles; ++i)
872       {
873         CFile* file = this->enabledFiles[i];
874
875         std::vector<CField*> enabledFields = file->getEnabledFields();
876         size_t numEnabledFields = enabledFields.size();
877         for (size_t j = 0; j < numEnabledFields; ++j)
878         {
879           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
880           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
881           for (; it != itE; ++it)
882           {
883             // If attributesSize[it->first] does not exist, it will be zero-initialized
884             // so we can use it safely without checking for its existance
885             if (attributesSize[it->first] < it->second)
886               attributesSize[it->first] = it->second;
887
888             if (maxEventSize[it->first] < it->second)
889               maxEventSize[it->first] = it->second;
890           }
891         }
892       }
893     }
894
895     return attributesSize;
896   }
897
898   /*!
899    * Compute the required buffer size to send the fields data.
900    *
901    * \param maxEventSize [in/out] the size of the bigger event for each connected server
902    */
903   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
904   {
905     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
906
907     std::map<int, StdSize> dataSize;
908
909     // Find all reference domain and axis of all active fields
910     size_t numEnabledFiles = this->enabledFiles.size();
911     for (size_t i = 0; i < numEnabledFiles; ++i)
912     {
913       CFile* file = this->enabledFiles[i];
914       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
915
916       if (fileMode == mode)
917       {
918         std::vector<CField*> enabledFields = file->getEnabledFields();
919         size_t numEnabledFields = enabledFields.size();
920         for (size_t j = 0; j < numEnabledFields; ++j)
921         {
922           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
923           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
924           for (; it != itE; ++it)
925           {
926             // If dataSize[it->first] does not exist, it will be zero-initialized
927             // so we can use it safely without checking for its existance
928             if (CXios::isOptPerformance)
929               dataSize[it->first] += it->second;
930             else if (dataSize[it->first] < it->second)
931               dataSize[it->first] = it->second;
932
933             if (maxEventSize[it->first] < it->second)
934               maxEventSize[it->first] = it->second;
935           }
936         }
937       }
938     }
939
940     return dataSize;
941   }
942
943   //! Client side: Send infomation of active files (files are enabled to write out)
944   void CContext::sendEnabledFiles()
945   {
946     int size = this->enabledFiles.size();
947
948     // In a context, each type has a root definition, e.g: axis, domain, field.
949     // Every object must be a child of one of these root definition. In this case
950     // all new file objects created on server must be children of the root "file_definition"
951     StdString fileDefRoot("file_definition");
952     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
953
954     for (int i = 0; i < size; ++i)
955     {
956       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
957       this->enabledFiles[i]->sendAllAttributesToServer();
958       this->enabledFiles[i]->sendAddAllVariables();
959     }
960   }
961
962   //! Client side: Send information of active fields (ones are written onto files)
963   void CContext::sendEnabledFields()
964   {
965     int size = this->enabledFiles.size();
966     for (int i = 0; i < size; ++i)
967     {
968       this->enabledFiles[i]->sendEnabledFields();
969     }
970   }
971
972   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
973   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
974   {
975     if (!hasClient) return;
976
977     const vector<CAxis*> allAxis = CAxis::getAll();
978     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
979       (*it)->checkEligibilityForCompressedOutput();
980
981     const vector<CDomain*> allDomains = CDomain::getAll();
982     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
983       (*it)->checkEligibilityForCompressedOutput();
984
985     const vector<CGrid*> allGrids = CGrid::getAll();
986     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
987       (*it)->checkEligibilityForCompressedOutput();
988   }
989
990   //! Client side: Prepare the timeseries by adding the necessary files
991   void CContext::prepareTimeseries()
992   {
993     if (!hasClient) return;
994
995     const std::vector<CFile*> allFiles = CFile::getAll();
996     for (size_t i = 0; i < allFiles.size(); i++)
997     {
998       CFile* file = allFiles[i];
999
1000       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1001       {
1002         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : file->getFileOutputName();
1003
1004         const std::vector<CField*> allFields = file->getAllFields();
1005         for (size_t j = 0; j < allFields.size(); j++)
1006         {
1007           CField* field = allFields[j];
1008
1009           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1010           {
1011             CFile* tsFile = CFile::create();
1012             tsFile->duplicateAttributes(file);
1013             tsFile->setVirtualVariableGroup(file->getVirtualVariableGroup());
1014
1015             tsFile->name = tsPrefix + "_";
1016             if (!field->name.isEmpty())
1017               tsFile->name.get() += field->name;
1018             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1019               tsFile->name.get() += field->field_ref;
1020             else
1021               tsFile->name.get() += field->getId();
1022
1023             if (!field->ts_split_freq.isEmpty())
1024               tsFile->split_freq = field->ts_split_freq;
1025
1026             CField* tsField = tsFile->addField();
1027             tsField->field_ref = field->getId();
1028             tsField->setVirtualVariableGroup(field->getVirtualVariableGroup());
1029
1030             tsFile->solveFieldRefInheritance(true);
1031
1032             if (file->timeseries == CFile::timeseries_attr::exclusive)
1033               field->enabled = false;
1034           }
1035         }
1036
1037         // Finally disable the original file is need be
1038         if (file->timeseries == CFile::timeseries_attr::only)
1039          file->enabled = false;
1040       }
1041     }
1042   }
1043
1044   //! Client side: Send information of reference grid of active fields
1045   void CContext::sendRefGrid()
1046   {
1047     std::set<StdString> gridIds;
1048     int sizeFile = this->enabledFiles.size();
1049     CFile* filePtr(NULL);
1050
1051     // Firstly, find all reference grids of all active fields
1052     for (int i = 0; i < sizeFile; ++i)
1053     {
1054       filePtr = this->enabledFiles[i];
1055       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1056       int sizeField = enabledFields.size();
1057       for (int numField = 0; numField < sizeField; ++numField)
1058       {
1059         if (0 != enabledFields[numField]->getRelGrid())
1060           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1061       }
1062     }
1063
1064     // Create all reference grids on server side
1065     StdString gridDefRoot("grid_definition");
1066     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1067     std::set<StdString>::const_iterator it, itE = gridIds.end();
1068     for (it = gridIds.begin(); it != itE; ++it)
1069     {
1070       gridPtr->sendCreateChild(*it);
1071       CGrid::get(*it)->sendAllAttributesToServer();
1072       CGrid::get(*it)->sendAllDomains();
1073       CGrid::get(*it)->sendAllAxis();
1074       CGrid::get(*it)->sendAllScalars();
1075     }
1076   }
1077
1078
1079   //! Client side: Send information of reference domain and axis of active fields
1080   void CContext::sendRefDomainsAxis()
1081   {
1082     std::set<StdString> domainIds, axisIds, scalarIds;
1083
1084     // Find all reference domain and axis of all active fields
1085     int numEnabledFiles = this->enabledFiles.size();
1086     for (int i = 0; i < numEnabledFiles; ++i)
1087     {
1088       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1089       int numEnabledFields = enabledFields.size();
1090       for (int j = 0; j < numEnabledFields; ++j)
1091       {
1092         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1093         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1094         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1095         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1096       }
1097     }
1098
1099     // Create all reference axis on server side
1100     std::set<StdString>::iterator itDom, itAxis, itScalar;
1101     std::set<StdString>::const_iterator itE;
1102
1103     StdString scalarDefRoot("scalar_definition");
1104     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1105     itE = scalarIds.end();
1106     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1107     {
1108       if (!itScalar->empty())
1109       {
1110         scalarPtr->sendCreateChild(*itScalar);
1111         CScalar::get(*itScalar)->sendAllAttributesToServer();
1112       }
1113     }
1114
1115     StdString axiDefRoot("axis_definition");
1116     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1117     itE = axisIds.end();
1118     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1119     {
1120       if (!itAxis->empty())
1121       {
1122         axisPtr->sendCreateChild(*itAxis);
1123         CAxis::get(*itAxis)->sendAllAttributesToServer();
1124       }
1125     }
1126
1127     // Create all reference domains on server side
1128     StdString domDefRoot("domain_definition");
1129     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1130     itE = domainIds.end();
1131     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1132     {
1133       if (!itDom->empty()) {
1134          domPtr->sendCreateChild(*itDom);
1135          CDomain::get(*itDom)->sendAllAttributesToServer();
1136       }
1137     }
1138   }
1139
1140   //! Update calendar in each time step
1141   void CContext::updateCalendar(int step)
1142   {
1143      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1144      calendar->update(step);
1145      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1146
1147      if (hasClient)
1148      {
1149        checkPrefetchingOfEnabledReadModeFiles();
1150        garbageCollector.invalidate(calendar->getCurrentDate());
1151      }
1152   }
1153
1154   //! Server side: Create header of netcdf file
1155   void CContext::createFileHeader(void )
1156   {
1157      vector<CFile*>::const_iterator it;
1158
1159      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1160      {
1161         (*it)->initFile();
1162      }
1163   }
1164
1165   //! Get current context
1166   CContext* CContext::getCurrent(void)
1167   {
1168     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1169   }
1170
1171   /*!
1172   \brief Set context with an id be the current context
1173   \param [in] id identity of context to be set to current
1174   */
1175   void CContext::setCurrent(const string& id)
1176   {
1177     CObjectFactory::SetCurrentContextId(id);
1178     CGroupFactory::SetCurrentContextId(id);
1179   }
1180
1181  /*!
1182  \brief Create a context with specific id
1183  \param [in] id identity of new context
1184  \return pointer to the new context or already-existed one with identity id
1185  */
1186  CContext* CContext::create(const StdString& id)
1187  {
1188    CContext::setCurrent(id);
1189
1190    bool hasctxt = CContext::has(id);
1191    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1192    getRoot();
1193    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1194
1195#define DECLARE_NODE(Name_, name_) \
1196    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1197#define DECLARE_NODE_PAR(Name_, name_)
1198#include "node_type.conf"
1199
1200    return (context);
1201  }
1202
1203
1204
1205     //! Server side: Receive a message to do some post processing
1206  void CContext::recvRegistry(CEventServer& event)
1207  {
1208    CBufferIn* buffer=event.subEvents.begin()->buffer;
1209    string id;
1210    *buffer>>id;
1211    get(id)->recvRegistry(*buffer);
1212  }
1213
1214  void CContext::recvRegistry(CBufferIn& buffer)
1215  {
1216    if (server->intraCommRank==0)
1217    {
1218      CRegistry registry(server->intraComm) ;
1219      registry.fromBuffer(buffer) ;
1220      registryOut->mergeRegistry(registry) ;
1221    }
1222  }
1223
1224  void CContext::sendRegistry(void)
1225  {
1226    registryOut->hierarchicalGatherRegistry() ;
1227
1228    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1229    if (client->isServerLeader())
1230    {
1231       CMessage msg ;
1232       msg<<this->getIdServer();
1233       if (client->clientRank==0) msg<<*registryOut ;
1234       const std::list<int>& ranks = client->getRanksServerLeader();
1235       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1236         event.push(*itRank,1,msg);
1237       client->sendEvent(event);
1238    }
1239    else client->sendEvent(event);
1240  }
1241
1242} // namespace xios
Note: See TracBrowser for help on using the repository browser.