Changeset 1021 for XIOS/dev/dev_olga/src/node/field.cpp
- Timestamp:
- 01/10/17 14:36:29 (7 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_olga/src/node/field.cpp
r1009 r1021 36 36 , domAxisScalarIds_(vector<StdString>(3,"")), areAllReferenceSolved(false), isReferenceSolved(false) 37 37 , useCompressedOutput(false) 38 , isReadDataRequestPending(false)38 , wasDataAlreadyReceivedFromServer(false) 39 39 { setVirtualVariableGroup(CVariableGroup::create(getId() + "_virtual_variable_group")); } 40 40 … … 47 47 , domAxisScalarIds_(vector<StdString>(3,"")), areAllReferenceSolved(false), isReferenceSolved(false) 48 48 , useCompressedOutput(false) 49 , isReadDataRequestPending(false)49 , wasDataAlreadyReceivedFromServer(false) 50 50 { setVirtualVariableGroup(CVariableGroup::create(getId() + "_virtual_variable_group")); } 51 51 … … 179 179 } 180 180 181 void CField::sendUpdateData(const CArray<double,1>& data, const int srvPool)181 void CField::sendUpdateData(const CArray<double,1>& data, CContextClient* client) 182 182 { 183 183 CTimer::get("XIOS Send Data").resume(); 184 185 CContext* context = CContext::getCurrent();186 CContextClient* client = context->clientPrimServer[srvPool];187 184 188 185 CEventClient event(getType(), EVENT_ID_UPDATE_DATA); … … 312 309 int fileIdx = std::find(context->enabledFiles.begin(), context->enabledFiles.end(), this->file) - context->enabledFiles.begin(); 313 310 int srvId = fileIdx % context->clientPrimServer.size(); 314 sendUpdateData(fieldData, srvId);311 sendUpdateData(fieldData, context->clientPrimServer[srvId]); 315 312 } 316 313 if (!context->hasClient && context->hasServer) … … 348 345 } 349 346 350 void CField::sendReadDataRequest( void)347 void CField::sendReadDataRequest(const CDate& tsDataRequested) 351 348 { 352 349 CContext* context = CContext::getCurrent(); 353 350 CContextClient* client = context->client; 354 351 355 lastDataRequestedFromServer = context->getCalendar()->getCurrentDate(); 356 isReadDataRequestPending = true; 352 lastDataRequestedFromServer = tsDataRequested; 357 353 358 354 CEventClient event(getType(), EVENT_ID_READ_DATA); … … 377 373 const CDate& currentDate = CContext::getCurrent()->getCalendar()->getCurrentDate(); 378 374 379 bool requestData = (currentDate >= lastDataRequestedFromServer + file->output_freq.getValue());380 381 if (requestData)382 {383 cout<<"currentDate : "<<currentDate<<endl ;384 cout<<"lastDataRequestedFromServer : "<<lastDataRequestedFromServer<<endl ;385 cout<<"file->output_freq.getValue() : "<<file->output_freq.getValue()<<endl ;386 cout<<"lastDataRequestedFromServer + file->output_freq.getValue() : "<<lastDataRequestedFromServer + file->output_freq.getValue()<<endl ; 387 388 sendReadDataRequest(); 389 }390 391 return requestData;375 bool dataRequested = false; 376 while (currentDate >= lastDataRequestedFromServer) 377 { 378 info(20) << "currentDate : " << currentDate << endl ; 379 info(20) << "lastDataRequestedFromServer : " << lastDataRequestedFromServer << endl ; 380 info(20) << "file->output_freq.getValue() : " << file->output_freq.getValue() << endl ; 381 info(20) << "lastDataRequestedFromServer + file->output_freq.getValue() : " << lastDataRequestedFromServer + file->output_freq << endl ; 382 383 sendReadDataRequest(lastDataRequestedFromServer + file->output_freq); 384 385 dataRequested = true; 386 } 387 return dataRequested; 392 388 } 393 389 … … 411 407 412 408 map<int, CArray<double,1> >::iterator it; 413 for (it = data_srv.begin(); it != data_srv.end(); it++) 414 { 415 msgs.push_back(CMessage()); 416 CMessage& msg = msgs.back(); 417 msg << getId(); 418 if (hasData) 419 msg << getNStep() - 1 << it->second; 420 else 421 msg << int(-1); 422 event.push(it->first, grid->nbSenders[it->first], msg); 423 } 424 client->sendEvent(event); 409 // for (it = data_srv.begin(); it != data_srv.end(); it++) 410 // { 411 // msgs.push_back(CMessage()); 412 // CMessage& msg = msgs.back(); 413 // msg << getId(); 414 // if (hasData) 415 // msg << getNStep() - 1 << it->second; 416 // else 417 // msg << int(-1); 418 // event.push(it->first, grid->nbSenders[it->first], msg); 419 // } 420 // client->sendEvent(event); 421 if (!grid->doGridHaveDataDistributed()) 422 { 423 if (client->isServerLeader()) 424 { 425 if (!data_srv.empty()) 426 { 427 it = data_srv.begin(); 428 const std::list<int>& ranks = client->getRanksServerLeader(); 429 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 430 { 431 msgs.push_back(CMessage()); 432 CMessage& msg = msgs.back(); 433 msg << getId(); 434 if (hasData) 435 msg << getNStep() - 1 << it->second; 436 else 437 msg << int(-1); 438 event.push(*itRank, 1, msg); 439 } 440 } 441 client->sendEvent(event); 442 } 443 else 444 { 445 // if (!data_srv.empty()) 446 // { 447 // it = data_srv.begin(); 448 // const std::list<int>& ranks = client->getRanksServerNotLeader(); 449 // for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 450 // { 451 // msgs.push_back(CMessage()); 452 // CMessage& msg = msgs.back(); 453 // msg << getId(); 454 // if (hasData) 455 // msg << getNStep() - 1 << it->second; 456 // else 457 // msg << int(-1); 458 // event.push(*itRank, 1, msg); 459 // } 460 // } 461 client->sendEvent(event); 462 } 463 } 464 else 465 { 466 for (it = data_srv.begin(); it != data_srv.end(); it++) 467 { 468 msgs.push_back(CMessage()); 469 CMessage& msg = msgs.back(); 470 msg << getId(); 471 if (hasData) 472 msg << getNStep() - 1 << it->second; 473 else 474 msg << int(-1); 475 event.push(it->first, grid->nbSenders[it->first], msg); 476 } 477 client->sendEvent(event); 478 } 425 479 } 426 480 … … 494 548 } 495 549 550 if (wasDataAlreadyReceivedFromServer) 551 lastDataReceivedFromServer = lastDataReceivedFromServer + file->output_freq; 552 else 553 { 554 lastDataReceivedFromServer = context->getCalendar()->getInitDate(); 555 wasDataAlreadyReceivedFromServer = true; 556 } 557 496 558 if (isEOF) 497 serverSourceFilter->signalEndOfStream(lastDataRe questedFromServer);559 serverSourceFilter->signalEndOfStream(lastDataReceivedFromServer); 498 560 else 499 serverSourceFilter->streamDataFromServer(lastDataRequestedFromServer, data); 500 501 isReadDataRequestPending = false; 561 serverSourceFilter->streamDataFromServer(lastDataReceivedFromServer, data); 502 562 } 503 563 … … 749 809 solveGridDomainAxisRef(doSending2Server); 750 810 751 if (context->hasClient && !context->has Client)811 if (context->hasClient && !context->hasServer) 752 812 { 753 813 solveTransformedGrid(); … … 831 891 { 832 892 // Check if we have an expression to parse 833 if (!content.empty()) 834 { 835 boost::scoped_ptr<IFilterExprNode> expr(parseExpr(content + '\0')); 836 instantDataFilter = expr->reduce(gc, *this); 893 if (hasExpression()) 894 { 895 boost::scoped_ptr<IFilterExprNode> expr(parseExpr(getExpression() + '\0')); 896 boost::shared_ptr<COutputPin> filter = expr->reduce(gc, *this); 897 898 // Check if a spatial transformation is needed 899 if (!field_ref.isEmpty()) 900 { 901 CGrid* gridRef = CField::get(field_ref)->grid; 902 903 if (grid && grid != gridRef && grid->hasTransform()) 904 { 905 double defaultValue = !default_value.isEmpty() ? default_value : 0.0; 906 std::pair<boost::shared_ptr<CFilter>, boost::shared_ptr<CFilter> > filters = CSpatialTransformFilter::buildFilterGraph(gc, gridRef, grid, defaultValue); 907 908 filter->connectOutput(filters.first, 0); 909 filter = filters.second; 910 } 911 } 912 913 instantDataFilter = filter; 837 914 } 838 915 // Check if we have a reference on another field … … 841 918 // Check if the data is to be read from a file 842 919 else if (file && !file->mode.isEmpty() && file->mode == CFile::mode_attr::read) 843 instantDataFilter = serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(grid, 844 freq_offset.isEmpty() ? NoneDu : freq_offset)); 920 instantDataFilter = serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, 921 freq_offset.isEmpty() ? NoneDu : freq_offset, 922 true)); 845 923 else // The data might be passed from the model 846 instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(g rid));924 instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid)); 847 925 } 848 926 … … 863 941 } 864 942 } 943 865 944 866 945 /*! … … 871 950 * \return the output pin corresponding to the field reference 872 951 */ 873 boost::shared_ptr<COutputPin> CField::getFieldReference(CGarbageCollector& gc) 874 { 875 if (instantDataFilter || field_ref.isEmpty()) 876 ERROR("COutputPin* CField::getFieldReference(CGarbageCollector& gc)", 877 "Impossible to get the field reference for a field which has already been parsed or which does not have a field_ref."); 878 879 CField* fieldRef = CField::get(field_ref); 880 fieldRef->buildFilterGraph(gc, false); 881 882 std::pair<boost::shared_ptr<CFilter>, boost::shared_ptr<CFilter> > filters; 883 // Check if a spatial transformation is needed 884 if (grid && grid != fieldRef->grid && grid->hasTransform()) 885 { 886 double defaultValue = 0.0; 887 if (!default_value.isEmpty()) defaultValue = this->default_value; 888 filters = CSpatialTransformFilter::buildFilterGraph(gc, fieldRef->grid, grid, defaultValue); 889 } 890 891 else 892 filters.first = filters.second = boost::shared_ptr<CFilter>(new CPassThroughFilter(gc)); 893 894 fieldRef->getInstantDataFilter()->connectOutput(filters.first, 0); 895 896 return filters.second; 897 } 952 boost::shared_ptr<COutputPin> CField::getFieldReference(CGarbageCollector& gc) 953 { 954 if (instantDataFilter || field_ref.isEmpty()) 955 ERROR("COutputPin* CField::getFieldReference(CGarbageCollector& gc)", 956 "Impossible to get the field reference for a field which has already been parsed or which does not have a field_ref."); 957 958 CField* fieldRef = CField::get(field_ref); 959 fieldRef->buildFilterGraph(gc, false); 960 961 std::pair<boost::shared_ptr<CFilter>, boost::shared_ptr<CFilter> > filters; 962 // Check if a spatial transformation is needed 963 if (grid && grid != fieldRef->grid && grid->hasTransform()) 964 { 965 double defaultValue = !default_value.isEmpty() ? default_value : 0.0; 966 filters = CSpatialTransformFilter::buildFilterGraph(gc, fieldRef->grid, grid, defaultValue); 967 } 968 else 969 filters.first = filters.second = boost::shared_ptr<CFilter>(new CPassThroughFilter(gc)); 970 971 fieldRef->getInstantDataFilter()->connectOutput(filters.first, 0); 972 973 return filters.second; 974 } 898 975 899 976 /*! … … 908 985 boost::shared_ptr<COutputPin> CField::getSelfReference(CGarbageCollector& gc) 909 986 { 910 if (instantDataFilter || content.empty())987 if (instantDataFilter || !hasExpression()) 911 988 ERROR("COutputPin* CField::getSelfReference(CGarbageCollector& gc)", 912 989 "Impossible to add a self reference to a field which has already been parsed or which does not have an expression."); … … 917 994 { 918 995 if (!serverSourceFilter) 919 serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(g rid,920 freq_offset.isEmpty() ? NoneDu : freq_offset ));921 996 serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, 997 freq_offset.isEmpty() ? NoneDu : freq_offset, 998 true)); 922 999 selfReferenceFilter = serverSourceFilter; 923 1000 } 924 1001 else if (!field_ref.isEmpty()) 925 selfReferenceFilter = getFieldReference(gc); 1002 { 1003 CField* fieldRef = CField::get(field_ref); 1004 fieldRef->buildFilterGraph(gc, false); 1005 selfReferenceFilter = fieldRef->getInstantDataFilter(); 1006 } 926 1007 else 927 1008 { 928 1009 if (!clientSourceFilter) 929 clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(g rid));1010 clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid)); 930 1011 931 1012 selfReferenceFilter = clientSourceFilter; … … 1126 1207 else if (grid && grid->hasTransform() && !grid->isTransformed()) 1127 1208 { 1128 grid->transformGrid(grid); 1209 // Temporarily deactivate the self-transformation of grid 1210 //grid->transformGrid(grid); 1129 1211 } 1130 1212 } … … 1325 1407 } 1326 1408 1327 void CField::sendAddAllVariables( const int srvPool)1409 void CField::sendAddAllVariables(CContextClient* client) 1328 1410 { 1329 1411 std::vector<CVariable*> allVar = getAllVariables(); … … 1333 1415 for (; it != itE; ++it) 1334 1416 { 1335 this->sendAddVariable((*it)->getId() );1336 (*it)->sendAllAttributesToServer( srvPool);1337 (*it)->sendValue( srvPool);1417 this->sendAddVariable((*it)->getId(), client); 1418 (*it)->sendAllAttributesToServer(client); 1419 (*it)->sendValue(client); 1338 1420 } 1339 1421 } … … 1363 1445 } 1364 1446 1447 void CField::sendAddVariable(const string& id, CContextClient* client) 1448 { 1449 sendAddItem(id, (int)EVENT_ID_ADD_VARIABLE, client); 1450 // CContext* context = CContext::getCurrent(); 1451 1452 // if (!context->hasServer) 1453 // { 1454 // CContextClient* client = context->client; 1455 1456 // CEventClient event(this->getType(),EVENT_ID_ADD_VARIABLE); 1457 // if (client->isServerLeader()) 1458 // { 1459 // CMessage msg; 1460 // msg << this->getId(); 1461 // msg << id; 1462 // const std::list<int>& ranks = client->getRanksServerLeader(); 1463 // for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 1464 // event.push(*itRank,1,msg); 1465 // client->sendEvent(event); 1466 // } 1467 // else client->sendEvent(event); 1468 // } 1469 } 1470 1365 1471 void CField::sendAddVariableGroup(const string& id) 1366 1472 { … … 1418 1524 } 1419 1525 1526 /*! 1527 * Returns string arithmetic expression associated to the field. 1528 * \return if content is defined return content string, otherwise, if "expr" attribute is defined, return expr string. 1529 */ 1530 const string& CField::getExpression(void) 1531 { 1532 if (!expr.isEmpty() && content.empty()) 1533 { 1534 content = expr; 1535 expr.reset(); 1536 } 1537 1538 return content; 1539 } 1540 1541 bool CField::hasExpression(void) const 1542 { 1543 return (!expr.isEmpty() || !content.empty()); 1544 } 1545 1546 1420 1547 DEFINE_REF_FUNC(Field,field) 1421 1548 } // namespace xios
Note: See TracChangeset
for help on using the changeset viewer.