New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2019/dev_r11613_ENHANCE-04_namelists_as_internalfiles/src/OCE/LBC – NEMO

source: NEMO/branches/2019/dev_r11613_ENHANCE-04_namelists_as_internalfiles/src/OCE/LBC/lib_mpp.F90 @ 11648

Last change on this file since 11648 was 11648, checked in by acc, 5 years ago

Branch 2019/dev_r11613_ENHANCE-04_namelists_as_internalfiles. Introduce broadcast of namelist character buffer from single reader to all others. This completes the second stage but there is still an issue with AGRIF that may scupper this whole concept

  • Property svn:keywords set to Id
File size: 60.0 KB
RevLine 
[3]1MODULE lib_mpp
[13]2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
[1344]4   !! Ocean numerics:  massively parallel processing library
[13]5   !!=====================================================================
[1344]6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
[9019]10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
[1344]11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
[3764]19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
[2715]20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
[9019]21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
[6140]23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
[9019]24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
[13]27   !!----------------------------------------------------------------------
[2715]28
29   !!----------------------------------------------------------------------
[6140]30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
[11624]34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
[2715]35   !!----------------------------------------------------------------------
[13]36   !!----------------------------------------------------------------------
[11536]37   !!   mpp_start     : get local communicator its size and rank
[2715]38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
[4990]39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
[6140]40   !!   mpprecv       :
[9019]41   !!   mppsend       :
[2715]42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
[1344]51   !!   mpp_ini_north : initialisation of north fold
[9019]52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
[11648]53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
[13]54   !!----------------------------------------------------------------------
[3764]55   USE dom_oce        ! ocean space and time domain
[2715]56   USE in_out_manager ! I/O manager
[3]57
[13]58   IMPLICIT NONE
[415]59   PRIVATE
[9019]60   !
[11624]61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
[11536]62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
[9019]63   PUBLIC   mpp_ini_north
[1344]64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
[10425]65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
[3294]66   PUBLIC   mppscatter, mppgather
[10425]67   PUBLIC   mpp_ini_znl
[3764]68   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
[11536]69   PUBLIC   mpp_report
[11648]70   PUBLIC   mpp_bcast_nml
[11536]71   PUBLIC   tic_tac
72#if ! defined key_mpp_mpi
73   PUBLIC MPI_Wtime
74#endif
[5429]75   
[13]76   !! * Interfaces
77   !! define generic interface for these routine as they are called sometimes
[1344]78   !! with scalar arguments instead of array arguments, which causes problems
79   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
[13]80   INTERFACE mpp_min
81      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
82   END INTERFACE
83   INTERFACE mpp_max
[681]84      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
[13]85   END INTERFACE
86   INTERFACE mpp_sum
[6140]87      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
[9019]88         &             mppsum_realdd, mppsum_a_realdd
[13]89   END INTERFACE
[1344]90   INTERFACE mpp_minloc
91      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
92   END INTERFACE
93   INTERFACE mpp_maxloc
94      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
95   END INTERFACE
[6490]96
[51]97   !! ========================= !!
98   !!  MPI  variable definition !!
99   !! ========================= !!
[11536]100#if   defined key_mpp_mpi
[1629]101!$AGRIF_DO_NOT_TREAT
[2004]102   INCLUDE 'mpif.h'
[1629]103!$AGRIF_END_DO_NOT_TREAT
[1344]104   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
[11536]105#else   
106   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
107   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
108   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
109#endif
[3]110
[1344]111   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
[3764]112
[10425]113   INTEGER, PUBLIC ::   mppsize        ! number of process
114   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
[2363]115!$AGRIF_DO_NOT_TREAT
[9570]116   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
[2363]117!$AGRIF_END_DO_NOT_TREAT
[3]118
[2480]119   INTEGER :: MPI_SUMDD
[1976]120
[1345]121   ! variables used for zonal integration
122   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
[9019]123   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
124   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
125   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
[2715]126   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
[3]127
[3764]128   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
[9019]129   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
130   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
131   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
132   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
133   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
134   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
135   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
136   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
[3764]137
[10425]138   ! Communications summary report
139   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
[10437]140   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
141   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
[10425]142   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
143   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
144   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
145   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
146   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
[10781]147   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
[10425]148   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
149   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
[10437]150   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
[10425]151   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
152   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
153   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
154   !: name (used as id) of allreduce-delayed operations
[10521]155   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
156   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
[10425]157   !: component name where the allreduce-delayed operation is performed
158   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
159   TYPE, PUBLIC ::   DELAYARR
160      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
161      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
162   END TYPE DELAYARR
[10817]163   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
164   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
[10425]165
166   ! timing summary report
167   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
168   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
169   
[9019]170   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
[3]171
[9019]172   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
173   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
[11536]174   
[51]175   !!----------------------------------------------------------------------
[9598]176   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
[888]177   !! $Id$
[10068]178   !! Software governed by the CeCILL license (see ./LICENSE)
[1344]179   !!----------------------------------------------------------------------
[3]180CONTAINS
181
[11536]182   SUBROUTINE mpp_start( localComm )
[2715]183      !!----------------------------------------------------------------------
[11536]184      !!                  ***  routine mpp_start  ***
[3764]185      !!
[11536]186      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
[51]187      !!----------------------------------------------------------------------
[6140]188      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
[2715]189      !
[11536]190      INTEGER ::   ierr
191      LOGICAL ::   llmpi_init
[51]192      !!----------------------------------------------------------------------
[11536]193#if defined key_mpp_mpi
[1344]194      !
[11536]195      CALL mpi_initialized ( llmpi_init, ierr )
196      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
[2715]197
[11536]198      IF( .NOT. llmpi_init ) THEN
199         IF( PRESENT(localComm) ) THEN
200            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
201            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
202            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
203         ENDIF
204         CALL mpi_init( ierr )
205         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
[2480]206      ENDIF
[11536]207       
[3764]208      IF( PRESENT(localComm) ) THEN
[2480]209         IF( Agrif_Root() ) THEN
[9570]210            mpi_comm_oce = localComm
[2480]211         ENDIF
212      ELSE
[11536]213         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
214         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
[3764]215      ENDIF
[2480]216
[11536]217# if defined key_agrif
[9019]218      IF( Agrif_Root() ) THEN
[9570]219         CALL Agrif_MPI_Init(mpi_comm_oce)
[5656]220      ELSE
[9570]221         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
[5656]222      ENDIF
[11536]223# endif
[5656]224
[9570]225      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
226      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
[3764]227      !
[1976]228      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
229      !
[11536]230#else
231      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
232      mppsize = 1
233      mpprank = 0
234#endif
235   END SUBROUTINE mpp_start
[3]236
[6140]237
[1344]238   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
[51]239      !!----------------------------------------------------------------------
240      !!                  ***  routine mppsend  ***
[3764]241      !!
[51]242      !! ** Purpose :   Send messag passing array
243      !!
244      !!----------------------------------------------------------------------
[1344]245      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
246      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
247      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
248      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
249      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
250      !!
251      INTEGER ::   iflag
[51]252      !!----------------------------------------------------------------------
[1344]253      !
[11536]254#if defined key_mpp_mpi
255      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
256#endif
[1344]257      !
[51]258   END SUBROUTINE mppsend
[3]259
260
[3294]261   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
[51]262      !!----------------------------------------------------------------------
263      !!                  ***  routine mpprecv  ***
264      !!
265      !! ** Purpose :   Receive messag passing array
266      !!
267      !!----------------------------------------------------------------------
[1344]268      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
269      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
270      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
[3764]271      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
[1344]272      !!
[51]273      INTEGER :: istatus(mpi_status_size)
274      INTEGER :: iflag
[3294]275      INTEGER :: use_source
[1344]276      !!----------------------------------------------------------------------
277      !
[11536]278#if defined key_mpp_mpi
[3764]279      ! If a specific process number has been passed to the receive call,
[3294]280      ! use that one. Default is to use mpi_any_source
[6140]281      use_source = mpi_any_source
282      IF( PRESENT(ksource) )   use_source = ksource
283      !
[9570]284      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
[11536]285#endif
[1344]286      !
[51]287   END SUBROUTINE mpprecv
[3]288
289
[51]290   SUBROUTINE mppgather( ptab, kp, pio )
291      !!----------------------------------------------------------------------
292      !!                   ***  routine mppgather  ***
[3764]293      !!
294      !! ** Purpose :   Transfert between a local subdomain array and a work
[51]295      !!     array which is distributed following the vertical level.
296      !!
[1344]297      !!----------------------------------------------------------------------
[6140]298      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
299      INTEGER                           , INTENT(in   ) ::   kp     ! record length
[1344]300      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
[51]301      !!
[1344]302      INTEGER :: itaille, ierror   ! temporary integer
[51]303      !!---------------------------------------------------------------------
[1344]304      !
305      itaille = jpi * jpj
[11536]306#if defined key_mpp_mpi
[1344]307      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
[9570]308         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
[11536]309#else
310      pio(:,:,1) = ptab(:,:)
311#endif
[1344]312      !
[51]313   END SUBROUTINE mppgather
[3]314
315
[51]316   SUBROUTINE mppscatter( pio, kp, ptab )
317      !!----------------------------------------------------------------------
318      !!                  ***  routine mppscatter  ***
319      !!
[3764]320      !! ** Purpose :   Transfert between awork array which is distributed
[51]321      !!      following the vertical level and the local subdomain array.
322      !!
323      !!----------------------------------------------------------------------
[6140]324      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
325      INTEGER                             ::   kp     ! Tag (not used with MPI
326      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
[1344]327      !!
328      INTEGER :: itaille, ierror   ! temporary integer
[51]329      !!---------------------------------------------------------------------
[1344]330      !
[6140]331      itaille = jpi * jpj
[1344]332      !
[11536]333#if defined key_mpp_mpi
[1344]334      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
[9570]335         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
[11536]336#else
337      ptab(:,:) = pio(:,:,1)
338#endif
[1344]339      !
[51]340   END SUBROUTINE mppscatter
[3]341
[10425]342   
343   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
344     !!----------------------------------------------------------------------
345      !!                   ***  routine mpp_delay_sum  ***
[1344]346      !!
[10425]347      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
348      !!
[1344]349      !!----------------------------------------------------------------------
[10425]350      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
351      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
352      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
353      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
354      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
355      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
[1344]356      !!
[10425]357      INTEGER ::   ji, isz
358      INTEGER ::   idvar
359      INTEGER ::   ierr, ilocalcomm
360      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
[1344]361      !!----------------------------------------------------------------------
[11536]362#if defined key_mpp_mpi
[9570]363      ilocalcomm = mpi_comm_oce
[9019]364      IF( PRESENT(kcom) )   ilocalcomm = kcom
[3]365
[10425]366      isz = SIZE(y_in)
367     
[10437]368      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
[13]369
[10425]370      idvar = -1
371      DO ji = 1, nbdelay
372         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
373      END DO
374      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
375
376      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
377         !                                       --------------------------
378         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
379            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
380            DEALLOCATE(todelay(idvar)%z1d)
381            ndelayid(idvar) = -1                                      ! do as if we had no restart
382         ELSE
383            ALLOCATE(todelay(idvar)%y1d(isz))
384            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
385         END IF
386      ENDIF
387     
388      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
389         !                                       --------------------------
390         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
391         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
392         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
393      ENDIF
394
395      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
396
397      ! send back pout from todelay(idvar)%z1d defined at previous call
398      pout(:) = todelay(idvar)%z1d(:)
399
400      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
[11536]401# if defined key_mpi2
[10526]402      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
403      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
404      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[11536]405# else
406      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
407# endif
[10526]408#else
[11536]409      pout(:) = REAL(y_in(:), wp)
[10526]410#endif
[10425]411
412   END SUBROUTINE mpp_delay_sum
413
[9019]414   
[10425]415   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
[9019]416      !!----------------------------------------------------------------------
[10425]417      !!                   ***  routine mpp_delay_max  ***
[9019]418      !!
[10425]419      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
[9019]420      !!
421      !!----------------------------------------------------------------------
[10425]422      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
423      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
424      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
425      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
426      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
427      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
[9019]428      !!
[10425]429      INTEGER ::   ji, isz
430      INTEGER ::   idvar
431      INTEGER ::   ierr, ilocalcomm
[9019]432      !!----------------------------------------------------------------------
[11536]433#if defined key_mpp_mpi
[9570]434      ilocalcomm = mpi_comm_oce
[9019]435      IF( PRESENT(kcom) )   ilocalcomm = kcom
[6140]436
[10425]437      isz = SIZE(p_in)
[9019]438
[10437]439      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
[13]440
[10425]441      idvar = -1
442      DO ji = 1, nbdelay
443         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
444      END DO
445      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
[3]446
[10425]447      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
448         !                                       --------------------------
449         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
450            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
451            DEALLOCATE(todelay(idvar)%z1d)
452            ndelayid(idvar) = -1                                      ! do as if we had no restart
453         END IF
454      ENDIF
[13]455
[10425]456      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
457         !                                       --------------------------
458         ALLOCATE(todelay(idvar)%z1d(isz))
459         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
460      ENDIF
[3]461
[10425]462      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
[3]463
[10425]464      ! send back pout from todelay(idvar)%z1d defined at previous call
465      pout(:) = todelay(idvar)%z1d(:)
[13]466
[10425]467      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
[11536]468# if defined key_mpi2
[10526]469      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
470      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
471      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[11536]472# else
473      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
474# endif
[10526]475#else
[11536]476      pout(:) = p_in(:)
[10526]477#endif
[10425]478
479   END SUBROUTINE mpp_delay_max
480
481   
482   SUBROUTINE mpp_delay_rcv( kid )
483      !!----------------------------------------------------------------------
484      !!                   ***  routine mpp_delay_rcv  ***
[1344]485      !!
[10425]486      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
[1344]487      !!
[10425]488      !!----------------------------------------------------------------------
489      INTEGER,INTENT(in   )      ::  kid 
490      INTEGER ::   ierr
491      !!----------------------------------------------------------------------
[11536]492#if defined key_mpp_mpi
[10425]493      IF( ndelayid(kid) /= -2 ) THEN 
[10526]494#if ! defined key_mpi2
[10425]495         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
496         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
497         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[10526]498#endif
[10425]499         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
500         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
501      ENDIF
[11536]502#endif
[10425]503   END SUBROUTINE mpp_delay_rcv
[3]504
[11648]505   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
506      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
507      INTEGER                          , INTENT(INOUT) :: kleng
508      !!----------------------------------------------------------------------
509      !!                  ***  routine mpp_bcast_nml  ***
510      !!
511      !! ** Purpose :   broadcast namelist character buffer
512      !!
513      !!----------------------------------------------------------------------
514      !!
515      INTEGER ::   iflag
516      !!----------------------------------------------------------------------
517      !
518#if defined key_mpp_mpi
519      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
520      call MPI_BARRIER(mpi_comm_oce, iflag)
521      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
522      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
523      call MPI_BARRIER(mpi_comm_oce, iflag)
524#endif
525      !
526   END SUBROUTINE mpp_bcast_nml
527
[10425]528   
529   !!----------------------------------------------------------------------
530   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
531   !!   
532   !!----------------------------------------------------------------------
533   !!
534#  define OPERATION_MAX
535#  define INTEGER_TYPE
536#  define DIM_0d
537#     define ROUTINE_ALLREDUCE           mppmax_int
538#     include "mpp_allreduce_generic.h90"
539#     undef ROUTINE_ALLREDUCE
540#  undef DIM_0d
541#  define DIM_1d
542#     define ROUTINE_ALLREDUCE           mppmax_a_int
543#     include "mpp_allreduce_generic.h90"
544#     undef ROUTINE_ALLREDUCE
545#  undef DIM_1d
546#  undef INTEGER_TYPE
547!
548#  define REAL_TYPE
549#  define DIM_0d
550#     define ROUTINE_ALLREDUCE           mppmax_real
551#     include "mpp_allreduce_generic.h90"
552#     undef ROUTINE_ALLREDUCE
553#  undef DIM_0d
554#  define DIM_1d
555#     define ROUTINE_ALLREDUCE           mppmax_a_real
556#     include "mpp_allreduce_generic.h90"
557#     undef ROUTINE_ALLREDUCE
558#  undef DIM_1d
559#  undef REAL_TYPE
560#  undef OPERATION_MAX
561   !!----------------------------------------------------------------------
562   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
563   !!   
564   !!----------------------------------------------------------------------
565   !!
566#  define OPERATION_MIN
567#  define INTEGER_TYPE
568#  define DIM_0d
569#     define ROUTINE_ALLREDUCE           mppmin_int
570#     include "mpp_allreduce_generic.h90"
571#     undef ROUTINE_ALLREDUCE
572#  undef DIM_0d
573#  define DIM_1d
574#     define ROUTINE_ALLREDUCE           mppmin_a_int
575#     include "mpp_allreduce_generic.h90"
576#     undef ROUTINE_ALLREDUCE
577#  undef DIM_1d
578#  undef INTEGER_TYPE
579!
580#  define REAL_TYPE
581#  define DIM_0d
582#     define ROUTINE_ALLREDUCE           mppmin_real
583#     include "mpp_allreduce_generic.h90"
584#     undef ROUTINE_ALLREDUCE
585#  undef DIM_0d
586#  define DIM_1d
587#     define ROUTINE_ALLREDUCE           mppmin_a_real
588#     include "mpp_allreduce_generic.h90"
589#     undef ROUTINE_ALLREDUCE
590#  undef DIM_1d
591#  undef REAL_TYPE
592#  undef OPERATION_MIN
[869]593
[10425]594   !!----------------------------------------------------------------------
595   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
596   !!   
597   !!   Global sum of 1D array or a variable (integer, real or complex)
598   !!----------------------------------------------------------------------
599   !!
600#  define OPERATION_SUM
601#  define INTEGER_TYPE
602#  define DIM_0d
603#     define ROUTINE_ALLREDUCE           mppsum_int
604#     include "mpp_allreduce_generic.h90"
605#     undef ROUTINE_ALLREDUCE
606#  undef DIM_0d
607#  define DIM_1d
608#     define ROUTINE_ALLREDUCE           mppsum_a_int
609#     include "mpp_allreduce_generic.h90"
610#     undef ROUTINE_ALLREDUCE
611#  undef DIM_1d
612#  undef INTEGER_TYPE
613!
614#  define REAL_TYPE
615#  define DIM_0d
616#     define ROUTINE_ALLREDUCE           mppsum_real
617#     include "mpp_allreduce_generic.h90"
618#     undef ROUTINE_ALLREDUCE
619#  undef DIM_0d
620#  define DIM_1d
621#     define ROUTINE_ALLREDUCE           mppsum_a_real
622#     include "mpp_allreduce_generic.h90"
623#     undef ROUTINE_ALLREDUCE
624#  undef DIM_1d
625#  undef REAL_TYPE
626#  undef OPERATION_SUM
627
628#  define OPERATION_SUM_DD
629#  define COMPLEX_TYPE
630#  define DIM_0d
631#     define ROUTINE_ALLREDUCE           mppsum_realdd
632#     include "mpp_allreduce_generic.h90"
633#     undef ROUTINE_ALLREDUCE
634#  undef DIM_0d
635#  define DIM_1d
636#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
637#     include "mpp_allreduce_generic.h90"
638#     undef ROUTINE_ALLREDUCE
639#  undef DIM_1d
640#  undef COMPLEX_TYPE
641#  undef OPERATION_SUM_DD
642
643   !!----------------------------------------------------------------------
644   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
645   !!   
646   !!----------------------------------------------------------------------
647   !!
648#  define OPERATION_MINLOC
649#  define DIM_2d
650#     define ROUTINE_LOC           mpp_minloc2d
651#     include "mpp_loc_generic.h90"
652#     undef ROUTINE_LOC
653#  undef DIM_2d
654#  define DIM_3d
655#     define ROUTINE_LOC           mpp_minloc3d
656#     include "mpp_loc_generic.h90"
657#     undef ROUTINE_LOC
658#  undef DIM_3d
659#  undef OPERATION_MINLOC
660
661#  define OPERATION_MAXLOC
662#  define DIM_2d
663#     define ROUTINE_LOC           mpp_maxloc2d
664#     include "mpp_loc_generic.h90"
665#     undef ROUTINE_LOC
666#  undef DIM_2d
667#  define DIM_3d
668#     define ROUTINE_LOC           mpp_maxloc3d
669#     include "mpp_loc_generic.h90"
670#     undef ROUTINE_LOC
671#  undef DIM_3d
672#  undef OPERATION_MAXLOC
673
[1344]674   SUBROUTINE mppsync()
675      !!----------------------------------------------------------------------
676      !!                  ***  routine mppsync  ***
[3764]677      !!
[1344]678      !! ** Purpose :   Massively parallel processors, synchroneous
679      !!
680      !!-----------------------------------------------------------------------
681      INTEGER :: ierror
682      !!-----------------------------------------------------------------------
683      !
[11536]684#if defined key_mpp_mpi
[9570]685      CALL mpi_barrier( mpi_comm_oce, ierror )
[11536]686#endif
[1344]687      !
688   END SUBROUTINE mppsync
[3]689
690
[11536]691   SUBROUTINE mppstop( ld_abort ) 
[1344]692      !!----------------------------------------------------------------------
693      !!                  ***  routine mppstop  ***
[3764]694      !!
[3294]695      !! ** purpose :   Stop massively parallel processors method
[1344]696      !!
697      !!----------------------------------------------------------------------
[11536]698      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
699      LOGICAL ::   ll_abort
[1344]700      INTEGER ::   info
701      !!----------------------------------------------------------------------
[11536]702      ll_abort = .FALSE.
703      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
[1344]704      !
[11536]705#if defined key_mpp_mpi
706      IF(ll_abort) THEN
[10425]707         CALL mpi_abort( MPI_COMM_WORLD )
708      ELSE
709         CALL mppsync
710         CALL mpi_finalize( info )
711      ENDIF
[11536]712#endif
713      IF( ll_abort ) STOP 123
[1344]714      !
715   END SUBROUTINE mppstop
[3]716
717
[1344]718   SUBROUTINE mpp_comm_free( kcom )
719      !!----------------------------------------------------------------------
720      INTEGER, INTENT(in) ::   kcom
721      !!
722      INTEGER :: ierr
723      !!----------------------------------------------------------------------
724      !
[11536]725#if defined key_mpp_mpi
[1344]726      CALL MPI_COMM_FREE(kcom, ierr)
[11536]727#endif
[1344]728      !
729   END SUBROUTINE mpp_comm_free
[3]730
[869]731
[2715]732   SUBROUTINE mpp_ini_znl( kumout )
[1345]733      !!----------------------------------------------------------------------
734      !!               ***  routine mpp_ini_znl  ***
735      !!
736      !! ** Purpose :   Initialize special communicator for computing zonal sum
737      !!
738      !! ** Method  : - Look for processors in the same row
739      !!              - Put their number in nrank_znl
740      !!              - Create group for the znl processors
741      !!              - Create a communicator for znl processors
742      !!              - Determine if processor should write znl files
743      !!
744      !! ** output
745      !!      ndim_rank_znl = number of processors on the same row
746      !!      ngrp_znl = group ID for the znl processors
747      !!      ncomm_znl = communicator for the ice procs.
748      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
749      !!
750      !!----------------------------------------------------------------------
[2715]751      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
[1345]752      !
[2715]753      INTEGER :: jproc      ! dummy loop integer
754      INTEGER :: ierr, ii   ! local integer
755      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
756      !!----------------------------------------------------------------------
[11536]757#if defined key_mpp_mpi
[1345]758      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
759      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
[9570]760      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
[1345]761      !
[2715]762      ALLOCATE( kwork(jpnij), STAT=ierr )
[11536]763      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
[2715]764
765      IF( jpnj == 1 ) THEN
[1345]766         ngrp_znl  = ngrp_world
[9570]767         ncomm_znl = mpi_comm_oce
[1345]768      ELSE
769         !
[9570]770         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
[1345]771         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
772         !-$$        CALL flush(numout)
773         !
774         ! Count number of processors on the same row
775         ndim_rank_znl = 0
776         DO jproc=1,jpnij
777            IF ( kwork(jproc) == njmpp ) THEN
778               ndim_rank_znl = ndim_rank_znl + 1
779            ENDIF
780         END DO
781         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
782         !-$$        CALL flush(numout)
783         ! Allocate the right size to nrank_znl
[1441]784         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
[1345]785         ALLOCATE(nrank_znl(ndim_rank_znl))
[3764]786         ii = 0
[1345]787         nrank_znl (:) = 0
788         DO jproc=1,jpnij
789            IF ( kwork(jproc) == njmpp) THEN
790               ii = ii + 1
[3764]791               nrank_znl(ii) = jproc -1
[1345]792            ENDIF
793         END DO
794         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
795         !-$$        CALL flush(numout)
796
797         ! Create the opa group
[9570]798         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
[1345]799         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
800         !-$$        CALL flush(numout)
801
802         ! Create the znl group from the opa group
803         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
804         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
805         !-$$        CALL flush(numout)
806
807         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
[9570]808         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
[1345]809         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
810         !-$$        CALL flush(numout)
811         !
812      END IF
813
814      ! Determines if processor if the first (starting from i=1) on the row
[3764]815      IF ( jpni == 1 ) THEN
[1345]816         l_znl_root = .TRUE.
817      ELSE
818         l_znl_root = .FALSE.
819         kwork (1) = nimpp
[10425]820         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
[1345]821         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
822      END IF
823
[2715]824      DEALLOCATE(kwork)
[11536]825#endif
[2715]826
[1345]827   END SUBROUTINE mpp_ini_znl
828
829
[1344]830   SUBROUTINE mpp_ini_north
831      !!----------------------------------------------------------------------
832      !!               ***  routine mpp_ini_north  ***
833      !!
[3764]834      !! ** Purpose :   Initialize special communicator for north folding
[1344]835      !!      condition together with global variables needed in the mpp folding
836      !!
837      !! ** Method  : - Look for northern processors
838      !!              - Put their number in nrank_north
839      !!              - Create groups for the world processors and the north processors
840      !!              - Create a communicator for northern processors
841      !!
842      !! ** output
843      !!      njmppmax = njmpp for northern procs
844      !!      ndim_rank_north = number of processors in the northern line
845      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
846      !!      ngrp_world = group ID for the world processors
847      !!      ngrp_north = group ID for the northern processors
848      !!      ncomm_north = communicator for the northern procs.
849      !!      north_root = number (in the world) of proc 0 in the northern comm.
850      !!
851      !!----------------------------------------------------------------------
852      INTEGER ::   ierr
853      INTEGER ::   jjproc
854      INTEGER ::   ii, ji
855      !!----------------------------------------------------------------------
856      !
[11536]857#if defined key_mpp_mpi
[1344]858      njmppmax = MAXVAL( njmppt )
859      !
860      ! Look for how many procs on the northern boundary
861      ndim_rank_north = 0
862      DO jjproc = 1, jpnij
863         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
864      END DO
865      !
866      ! Allocate the right size to nrank_north
[1441]867      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
[1344]868      ALLOCATE( nrank_north(ndim_rank_north) )
[869]869
[1344]870      ! Fill the nrank_north array with proc. number of northern procs.
871      ! Note : the rank start at 0 in MPI
872      ii = 0
873      DO ji = 1, jpnij
874         IF ( njmppt(ji) == njmppmax   ) THEN
875            ii=ii+1
876            nrank_north(ii)=ji-1
877         END IF
878      END DO
879      !
880      ! create the world group
[9570]881      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
[1344]882      !
883      ! Create the North group from the world group
884      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
885      !
886      ! Create the North communicator , ie the pool of procs in the north group
[9570]887      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
[1344]888      !
[11536]889#endif
[1344]890   END SUBROUTINE mpp_ini_north
[869]891
892
[9019]893   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
[1976]894      !!---------------------------------------------------------------------
895      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
896      !!
897      !!   Modification of original codes written by David H. Bailey
898      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
899      !!---------------------------------------------------------------------
[9019]900      INTEGER                     , INTENT(in)    ::   ilen, itype
901      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
902      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
[1976]903      !
904      REAL(wp) :: zerr, zt1, zt2    ! local work variables
[9019]905      INTEGER  :: ji, ztmp           ! local scalar
906      !!---------------------------------------------------------------------
907      !
[1976]908      ztmp = itype   ! avoid compilation warning
[9019]909      !
[1976]910      DO ji=1,ilen
911      ! Compute ydda + yddb using Knuth's trick.
912         zt1  = real(ydda(ji)) + real(yddb(ji))
913         zerr = zt1 - real(ydda(ji))
914         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
915                + aimag(ydda(ji)) + aimag(yddb(ji))
916
917         ! The result is zt1 + zt2, after normalization.
918         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
919      END DO
[9019]920      !
[1976]921   END SUBROUTINE DDPDD_MPI
922
[6140]923
[10437]924   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
[10425]925      !!----------------------------------------------------------------------
926      !!                  ***  routine mpp_report  ***
927      !!
928      !! ** Purpose :   report use of mpp routines per time-setp
929      !!
930      !!----------------------------------------------------------------------
931      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
932      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
[10437]933      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
[10425]934      !!
[10982]935      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
[10437]936      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
[10982]937      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
[10425]938      !!----------------------------------------------------------------------
[11536]939#if defined key_mpp_mpi
[10425]940      !
941      ll_lbc = .FALSE.
942      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
943      ll_glb = .FALSE.
944      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
[10437]945      ll_dlg = .FALSE.
946      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
[10425]947      !
948      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
949      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
950      ncom_freq = ncom_fsbc
951      !
952      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
953         IF( ll_lbc ) THEN
954            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
955            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
956            n_sequence_lbc = n_sequence_lbc + 1
957            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
958            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
959            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
960            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
961         ENDIF
962         IF( ll_glb ) THEN
963            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
964            n_sequence_glb = n_sequence_glb + 1
965            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
966            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
967         ENDIF
[10437]968         IF( ll_dlg ) THEN
969            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
970            n_sequence_dlg = n_sequence_dlg + 1
971            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
972            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
973         ENDIF
[10425]974      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
975         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
976         WRITE(numcom,*) ' '
977         WRITE(numcom,*) ' ------------------------------------------------------------'
978         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
979         WRITE(numcom,*) ' ------------------------------------------------------------'
980         WRITE(numcom,*) ' '
981         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
982         jj = 0; jk = 0; jf = 0; jh = 0
983         DO ji = 1, n_sequence_lbc
984            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
985            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
986            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
987            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
988         END DO
989         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
990         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
991         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
992         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
993         WRITE(numcom,*) ' '
994         WRITE(numcom,*) ' lbc_lnk called'
[10982]995         DO ji = 1, n_sequence_lbc - 1
996            IF ( crname_lbc(ji) /= 'already counted' ) THEN
997               ccountname = crname_lbc(ji)
998               crname_lbc(ji) = 'already counted'
999               jcount = 1
1000               DO jj = ji + 1, n_sequence_lbc
1001                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1002                     jcount = jcount + 1
1003                     crname_lbc(jj) = 'already counted'
1004                  END IF
1005               END DO
1006               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
[10425]1007            END IF
1008         END DO
[10982]1009         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1010            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1011         END IF
[10425]1012         WRITE(numcom,*) ' '
1013         IF ( n_sequence_glb > 0 ) THEN
1014            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1015            jj = 1
1016            DO ji = 2, n_sequence_glb
1017               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1018                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1019                  jj = 0
1020               END IF
1021               jj = jj + 1 
1022            END DO
1023            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1024            DEALLOCATE(crname_glb)
1025         ELSE
1026            WRITE(numcom,*) ' No MPI global communication '
1027         ENDIF
1028         WRITE(numcom,*) ' '
[10437]1029         IF ( n_sequence_dlg > 0 ) THEN
1030            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1031            jj = 1
1032            DO ji = 2, n_sequence_dlg
1033               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1034                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1035                  jj = 0
1036               END IF
1037               jj = jj + 1 
1038            END DO
1039            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1040            DEALLOCATE(crname_dlg)
1041         ELSE
1042            WRITE(numcom,*) ' No MPI delayed global communication '
1043         ENDIF
1044         WRITE(numcom,*) ' '
[10425]1045         WRITE(numcom,*) ' -----------------------------------------------'
1046         WRITE(numcom,*) ' '
1047         DEALLOCATE(ncomm_sequence)
1048         DEALLOCATE(crname_lbc)
1049      ENDIF
[11536]1050#endif
[10425]1051   END SUBROUTINE mpp_report
1052
[6140]1053   
[10425]1054   SUBROUTINE tic_tac (ld_tic, ld_global)
1055
1056    LOGICAL,           INTENT(IN) :: ld_tic
1057    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1058    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1059    REAL(wp),               SAVE :: tic_ct = 0._wp
1060    INTEGER :: ii
[11536]1061#if defined key_mpp_mpi
[10425]1062
1063    IF( ncom_stp <= nit000 ) RETURN
1064    IF( ncom_stp == nitend ) RETURN
1065    ii = 1
1066    IF( PRESENT( ld_global ) ) THEN
1067       IF( ld_global ) ii = 2
1068    END IF
1069   
1070    IF ( ld_tic ) THEN
1071       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1072       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1073    ELSE
1074       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1075       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1076    ENDIF
[11536]1077#endif
[10425]1078   
1079   END SUBROUTINE tic_tac
1080
[11536]1081#if ! defined key_mpp_mpi
1082   SUBROUTINE mpi_wait(request, status, ierror)
1083      INTEGER                            , INTENT(in   ) ::   request
1084      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1085      INTEGER                            , INTENT(  out) ::   ierror
1086   END SUBROUTINE mpi_wait
[1976]1087
[10425]1088   
[11536]1089   FUNCTION MPI_Wtime()
1090      REAL(wp) ::  MPI_Wtime
1091      MPI_Wtime = -1.
1092   END FUNCTION MPI_Wtime
[3]1093#endif
[2715]1094
[13]1095   !!----------------------------------------------------------------------
[11624]1096   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
[2715]1097   !!----------------------------------------------------------------------
1098
1099   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1100      &                 cd6, cd7, cd8, cd9, cd10 )
1101      !!----------------------------------------------------------------------
1102      !!                  ***  ROUTINE  stop_opa  ***
1103      !!
[3764]1104      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1105      !!                increment the error number (nstop) by one.
1106      !!----------------------------------------------------------------------
[11536]1107      CHARACTER(len=*), INTENT(in   )           ::   cd1
1108      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1109      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
[2715]1110      !!----------------------------------------------------------------------
1111      !
[3764]1112      nstop = nstop + 1
[11536]1113      !
1114      ! force to open ocean.output file if not already opened
[10425]1115      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
[11536]1116      !
1117                            WRITE(numout,*)
1118                            WRITE(numout,*) ' ===>>> : E R R O R'
1119                            WRITE(numout,*)
1120                            WRITE(numout,*) '         ==========='
1121                            WRITE(numout,*)
1122                            WRITE(numout,*) TRIM(cd1)
[10425]1123      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1124      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1125      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1126      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1127      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1128      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1129      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1130      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1131      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
[11536]1132                            WRITE(numout,*)
1133      !
[2715]1134                               CALL FLUSH(numout    )
1135      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
[9019]1136      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
[2715]1137      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1138      !
1139      IF( cd1 == 'STOP' ) THEN
[11536]1140         WRITE(numout,*) 
[10425]1141         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
[11536]1142         WRITE(numout,*) 
1143         CALL mppstop( ld_abort = .true. )
[2715]1144      ENDIF
1145      !
1146   END SUBROUTINE ctl_stop
1147
1148
1149   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1150      &                 cd6, cd7, cd8, cd9, cd10 )
1151      !!----------------------------------------------------------------------
1152      !!                  ***  ROUTINE  stop_warn  ***
1153      !!
[3764]1154      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1155      !!                increment the warning number (nwarn) by one.
1156      !!----------------------------------------------------------------------
1157      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1158      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1159      !!----------------------------------------------------------------------
[3764]1160      !
1161      nwarn = nwarn + 1
[11536]1162      !
[2715]1163      IF(lwp) THEN
[11536]1164                               WRITE(numout,*)
1165                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1166                               WRITE(numout,*)
1167                               WRITE(numout,*) '         ==============='
1168                               WRITE(numout,*)
1169         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1170         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1171         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1172         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1173         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1174         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1175         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1176         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1177         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1178         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1179                               WRITE(numout,*)
[2715]1180      ENDIF
1181      CALL FLUSH(numout)
1182      !
1183   END SUBROUTINE ctl_warn
1184
1185
1186   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1187      !!----------------------------------------------------------------------
1188      !!                  ***  ROUTINE ctl_opn  ***
1189      !!
1190      !! ** Purpose :   Open file and check if required file is available.
1191      !!
1192      !! ** Method  :   Fortan open
1193      !!----------------------------------------------------------------------
1194      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1195      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1196      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1197      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1198      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1199      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1200      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1201      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1202      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
[5836]1203      !
[2715]1204      CHARACTER(len=80) ::   clfile
1205      INTEGER           ::   iost
1206      !!----------------------------------------------------------------------
[5836]1207      !
[2715]1208      ! adapt filename
1209      ! ----------------
1210      clfile = TRIM(cdfile)
1211      IF( PRESENT( karea ) ) THEN
1212         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1213      ENDIF
1214#if defined key_agrif
1215      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1216      knum=Agrif_Get_Unit()
1217#else
1218      knum=get_unit()
1219#endif
[10425]1220      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
[5836]1221      !
[11536]1222      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
[10425]1223         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1224      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1225         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
[2715]1226      ELSE
[10425]1227         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
[2715]1228      ENDIF
[10425]1229      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1230         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
[2715]1231      IF( iost == 0 ) THEN
1232         IF(ldwp) THEN
[10425]1233            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
[2715]1234            WRITE(kout,*) '     unit   = ', knum
1235            WRITE(kout,*) '     status = ', cdstat
1236            WRITE(kout,*) '     form   = ', cdform
1237            WRITE(kout,*) '     access = ', cdacce
1238            WRITE(kout,*)
1239         ENDIF
1240      ENDIF
1241100   CONTINUE
1242      IF( iost /= 0 ) THEN
[11536]1243         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1244         WRITE(ctmp2,*) ' =======   ===  '
1245         WRITE(ctmp3,*) '           unit   = ', knum
1246         WRITE(ctmp4,*) '           status = ', cdstat
1247         WRITE(ctmp5,*) '           form   = ', cdform
1248         WRITE(ctmp6,*) '           access = ', cdacce
1249         WRITE(ctmp7,*) '           iostat = ', iost
1250         WRITE(ctmp8,*) '           we stop. verify the file '
1251         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
[2715]1252      ENDIF
[5836]1253      !
[2715]1254   END SUBROUTINE ctl_opn
1255
[5836]1256
[11536]1257   SUBROUTINE ctl_nam ( kios, cdnam )
[4147]1258      !!----------------------------------------------------------------------
1259      !!                  ***  ROUTINE ctl_nam  ***
1260      !!
1261      !! ** Purpose :   Informations when error while reading a namelist
1262      !!
1263      !! ** Method  :   Fortan open
1264      !!----------------------------------------------------------------------
[11536]1265      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1266      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1267      !
1268      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
[4147]1269      !!----------------------------------------------------------------------
[5836]1270      !
[7646]1271      WRITE (clios, '(I5.0)')   kios
[4147]1272      IF( kios < 0 ) THEN         
[5836]1273         CALL ctl_warn( 'end of record or file while reading namelist '   &
1274            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1275      ENDIF
[5836]1276      !
[4147]1277      IF( kios > 0 ) THEN
[5836]1278         CALL ctl_stop( 'misspelled variable in namelist '   &
1279            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1280      ENDIF
1281      kios = 0
[5836]1282      !
[4147]1283   END SUBROUTINE ctl_nam
1284
[5836]1285
[2715]1286   INTEGER FUNCTION get_unit()
1287      !!----------------------------------------------------------------------
1288      !!                  ***  FUNCTION  get_unit  ***
1289      !!
1290      !! ** Purpose :   return the index of an unused logical unit
1291      !!----------------------------------------------------------------------
[3764]1292      LOGICAL :: llopn
[2715]1293      !!----------------------------------------------------------------------
1294      !
1295      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1296      llopn = .TRUE.
1297      DO WHILE( (get_unit < 998) .AND. llopn )
1298         get_unit = get_unit + 1
1299         INQUIRE( unit = get_unit, opened = llopn )
1300      END DO
1301      IF( (get_unit == 999) .AND. llopn ) THEN
[11536]1302         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
[2715]1303      ENDIF
1304      !
1305   END FUNCTION get_unit
1306
[11624]1307   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1308      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1309      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1310      CHARACTER(LEN=256)                           :: chline
1311      INTEGER, INTENT(IN)                          :: kout
[11648]1312      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
[11624]1313      INTEGER                                      :: itot, iun, iltc, inl, ios
1314      !
1315      ! Check if the namelist buffer has already been allocated. Return if it has.
1316      !
1317      IF ( ALLOCATED( cdnambuff ) ) RETURN
[11648]1318      IF( ldwp ) THEN
1319         !
1320         ! Open namelist file
1321         !
1322         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1323         !
1324         ! First pass: count characters excluding comments and trimable white space
1325         !
1326         itot=0
1327     10  READ(iun,'(A256)',END=20,ERR=20) chline
1328         iltc = LEN_TRIM(chline)
1329         IF ( iltc.GT.0 ) THEN
1330          inl = INDEX(chline, '!') 
1331          IF( inl.eq.0 ) THEN
1332           itot = itot + iltc + 1                                ! +1 for the newline character
1333          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1334           itot = itot + inl                                  !  includes +1 for the newline character
1335          ENDIF
1336         ENDIF
1337         GOTO 10
1338     20  CONTINUE
1339         !
1340         ! Allocate text cdnambuff for condensed namelist
1341         !
1342         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1343         WRITE(*,*) 'ALLOCATED ', itot
1344         !
1345         ! Second pass: read and transfer pruned characters into cdnambuff
1346         !
1347         REWIND(iun)
1348         itot=1
1349     30  READ(iun,'(A256)',END=40,ERR=40) chline
1350         iltc = LEN_TRIM(chline)
1351         IF ( iltc.GT.0 ) THEN
1352          inl = INDEX(chline, '!')
1353          IF( inl.eq.0 ) THEN
1354           inl = iltc
1355          ELSE
1356           inl = inl - 1
1357          ENDIF
1358          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1359             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1360             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) NEW_LINE('A')
1361             itot = itot + inl + 1
1362          ENDIF
1363         ENDIF
1364         GOTO 30
1365     40  CONTINUE
1366         itot = itot - 1
1367         WRITE(*,*) 'ASSIGNED ',itot
1368         !
1369         ! Close namelist file
1370         !
1371         CLOSE(iun)
1372         !write(*,'(32A)') cdnambuff
[11624]1373      ENDIF
[11648]1374#if defined key_mpp_mpi
1375      CALL mpp_bcast_nml( cdnambuff, itot )
1376#endif
[11624]1377  END SUBROUTINE load_nml
1378
1379
[2715]1380   !!----------------------------------------------------------------------
[3]1381END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.