source: NEMO/trunk/src/OCE/LBC/lib_mpp.F90 @ 12933

Last change on this file since 12933 was 12933, checked in by smasson, 4 months ago

trunk: merge back r12581_ticket2418 branch into the trunk, see #2418

  • Property svn:keywords set to Id
File size: 61.5 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57
58   IMPLICIT NONE
59   PRIVATE
60   !
61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
63   PUBLIC   mpp_ini_north
64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
66   PUBLIC   mppscatter, mppgather
67   PUBLIC   mpp_ini_znl
68   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
69   PUBLIC   mpp_report
70   PUBLIC   mpp_bcast_nml
71   PUBLIC   tic_tac
72#if ! defined key_mpp_mpi
73   PUBLIC MPI_Wtime
74#endif
75   
76   !! * Interfaces
77   !! define generic interface for these routine as they are called sometimes
78   !! with scalar arguments instead of array arguments, which causes problems
79   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
80   INTERFACE mpp_min
81      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
82   END INTERFACE
83   INTERFACE mpp_max
84      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
85   END INTERFACE
86   INTERFACE mpp_sum
87      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
88         &             mppsum_realdd, mppsum_a_realdd
89   END INTERFACE
90   INTERFACE mpp_minloc
91      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
92   END INTERFACE
93   INTERFACE mpp_maxloc
94      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
95   END INTERFACE
96
97   !! ========================= !!
98   !!  MPI  variable definition !!
99   !! ========================= !!
100#if   defined key_mpp_mpi
101!$AGRIF_DO_NOT_TREAT
102   INCLUDE 'mpif.h'
103!$AGRIF_END_DO_NOT_TREAT
104   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
105#else   
106   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
107   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
108   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
109#endif
110
111   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
112
113   INTEGER, PUBLIC ::   mppsize        ! number of process
114   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
115!$AGRIF_DO_NOT_TREAT
116   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
117!$AGRIF_END_DO_NOT_TREAT
118
119   INTEGER :: MPI_SUMDD
120
121   ! variables used for zonal integration
122   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
123   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
124   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
125   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
126   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
127
128   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
129   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
130   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
131   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
132   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
133   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
134   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
135   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
136   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
137
138   ! Communications summary report
139   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
140   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
141   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
142   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
143   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
144   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
145   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
146   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
147   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
148   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
149   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
150   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
151   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
152   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
153   !: name (used as id) of allreduce-delayed operations
154   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
155   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
156   !: component name where the allreduce-delayed operation is performed
157   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
158   TYPE, PUBLIC ::   DELAYARR
159      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
160      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
161   END TYPE DELAYARR
162   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
163   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
164
165   ! timing summary report
166   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
167   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
168   
169   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
170
171   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
172   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
173   
174   !! * Substitutions
175#  include "do_loop_substitute.h90"
176   !!----------------------------------------------------------------------
177   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
178   !! $Id$
179   !! Software governed by the CeCILL license (see ./LICENSE)
180   !!----------------------------------------------------------------------
181CONTAINS
182
183   SUBROUTINE mpp_start( localComm )
184      !!----------------------------------------------------------------------
185      !!                  ***  routine mpp_start  ***
186      !!
187      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
188      !!----------------------------------------------------------------------
189      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
190      !
191      INTEGER ::   ierr
192      LOGICAL ::   llmpi_init
193      !!----------------------------------------------------------------------
194#if defined key_mpp_mpi
195      !
196      CALL mpi_initialized ( llmpi_init, ierr )
197      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
198
199      IF( .NOT. llmpi_init ) THEN
200         IF( PRESENT(localComm) ) THEN
201            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
202            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
203            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
204         ENDIF
205         CALL mpi_init( ierr )
206         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
207      ENDIF
208       
209      IF( PRESENT(localComm) ) THEN
210         IF( Agrif_Root() ) THEN
211            mpi_comm_oce = localComm
212         ENDIF
213      ELSE
214         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
215         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
216      ENDIF
217
218# if defined key_agrif
219      IF( Agrif_Root() ) THEN
220         CALL Agrif_MPI_Init(mpi_comm_oce)
221      ELSE
222         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
223      ENDIF
224# endif
225
226      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
227      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
228      !
229      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
230      !
231#else
232      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
233      mppsize = 1
234      mpprank = 0
235#endif
236   END SUBROUTINE mpp_start
237
238
239   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
240      !!----------------------------------------------------------------------
241      !!                  ***  routine mppsend  ***
242      !!
243      !! ** Purpose :   Send messag passing array
244      !!
245      !!----------------------------------------------------------------------
246      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
247      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
248      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
249      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
250      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
251      !!
252      INTEGER ::   iflag
253      !!----------------------------------------------------------------------
254      !
255#if defined key_mpp_mpi
256      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
257#endif
258      !
259   END SUBROUTINE mppsend
260
261
262   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
263      !!----------------------------------------------------------------------
264      !!                  ***  routine mpprecv  ***
265      !!
266      !! ** Purpose :   Receive messag passing array
267      !!
268      !!----------------------------------------------------------------------
269      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
270      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
271      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
272      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
273      !!
274      INTEGER :: istatus(mpi_status_size)
275      INTEGER :: iflag
276      INTEGER :: use_source
277      !!----------------------------------------------------------------------
278      !
279#if defined key_mpp_mpi
280      ! If a specific process number has been passed to the receive call,
281      ! use that one. Default is to use mpi_any_source
282      use_source = mpi_any_source
283      IF( PRESENT(ksource) )   use_source = ksource
284      !
285      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
286#endif
287      !
288   END SUBROUTINE mpprecv
289
290
291   SUBROUTINE mppgather( ptab, kp, pio )
292      !!----------------------------------------------------------------------
293      !!                   ***  routine mppgather  ***
294      !!
295      !! ** Purpose :   Transfert between a local subdomain array and a work
296      !!     array which is distributed following the vertical level.
297      !!
298      !!----------------------------------------------------------------------
299      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
300      INTEGER                           , INTENT(in   ) ::   kp     ! record length
301      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
302      !!
303      INTEGER :: itaille, ierror   ! temporary integer
304      !!---------------------------------------------------------------------
305      !
306      itaille = jpi * jpj
307#if defined key_mpp_mpi
308      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
309         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
310#else
311      pio(:,:,1) = ptab(:,:)
312#endif
313      !
314   END SUBROUTINE mppgather
315
316
317   SUBROUTINE mppscatter( pio, kp, ptab )
318      !!----------------------------------------------------------------------
319      !!                  ***  routine mppscatter  ***
320      !!
321      !! ** Purpose :   Transfert between awork array which is distributed
322      !!      following the vertical level and the local subdomain array.
323      !!
324      !!----------------------------------------------------------------------
325      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
326      INTEGER                             ::   kp     ! Tag (not used with MPI
327      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
328      !!
329      INTEGER :: itaille, ierror   ! temporary integer
330      !!---------------------------------------------------------------------
331      !
332      itaille = jpi * jpj
333      !
334#if defined key_mpp_mpi
335      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
336         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
337#else
338      ptab(:,:) = pio(:,:,1)
339#endif
340      !
341   END SUBROUTINE mppscatter
342
343   
344   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
345     !!----------------------------------------------------------------------
346      !!                   ***  routine mpp_delay_sum  ***
347      !!
348      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
349      !!
350      !!----------------------------------------------------------------------
351      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
352      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
353      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
354      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
355      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
356      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
357      !!
358      INTEGER ::   ji, isz
359      INTEGER ::   idvar
360      INTEGER ::   ierr, ilocalcomm
361      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
362      !!----------------------------------------------------------------------
363#if defined key_mpp_mpi
364      ilocalcomm = mpi_comm_oce
365      IF( PRESENT(kcom) )   ilocalcomm = kcom
366
367      isz = SIZE(y_in)
368     
369      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
370
371      idvar = -1
372      DO ji = 1, nbdelay
373         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
374      END DO
375      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
376
377      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
378         !                                       --------------------------
379         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
380            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
381            DEALLOCATE(todelay(idvar)%z1d)
382            ndelayid(idvar) = -1                                      ! do as if we had no restart
383         ELSE
384            ALLOCATE(todelay(idvar)%y1d(isz))
385            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
386         END IF
387      ENDIF
388     
389      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
390         !                                       --------------------------
391         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
392         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
393         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
394      ENDIF
395
396      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
397
398      ! send back pout from todelay(idvar)%z1d defined at previous call
399      pout(:) = todelay(idvar)%z1d(:)
400
401      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
402# if defined key_mpi2
403      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
404      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
405      ndelayid(idvar) = 1
406      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
407# else
408      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
409# endif
410#else
411      pout(:) = REAL(y_in(:), wp)
412#endif
413
414   END SUBROUTINE mpp_delay_sum
415
416   
417   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
418      !!----------------------------------------------------------------------
419      !!                   ***  routine mpp_delay_max  ***
420      !!
421      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
422      !!
423      !!----------------------------------------------------------------------
424      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
425      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
426      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
427      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
428      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
429      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
430      !!
431      INTEGER ::   ji, isz
432      INTEGER ::   idvar
433      INTEGER ::   ierr, ilocalcomm
434      !!----------------------------------------------------------------------
435#if defined key_mpp_mpi
436      ilocalcomm = mpi_comm_oce
437      IF( PRESENT(kcom) )   ilocalcomm = kcom
438
439      isz = SIZE(p_in)
440
441      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
442
443      idvar = -1
444      DO ji = 1, nbdelay
445         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
446      END DO
447      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
448
449      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
450         !                                       --------------------------
451         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
452            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
453            DEALLOCATE(todelay(idvar)%z1d)
454            ndelayid(idvar) = -1                                      ! do as if we had no restart
455         END IF
456      ENDIF
457
458      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
459         !                                       --------------------------
460         ALLOCATE(todelay(idvar)%z1d(isz))
461         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
462      ENDIF
463
464      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
465
466      ! send back pout from todelay(idvar)%z1d defined at previous call
467      pout(:) = todelay(idvar)%z1d(:)
468
469      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
470# if defined key_mpi2
471      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
472      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )
473      ndelayid(idvar) = 1
474      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
475# else
476      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
477# endif
478#else
479      pout(:) = p_in(:)
480#endif
481
482   END SUBROUTINE mpp_delay_max
483
484   
485   SUBROUTINE mpp_delay_rcv( kid )
486      !!----------------------------------------------------------------------
487      !!                   ***  routine mpp_delay_rcv  ***
488      !!
489      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
490      !!
491      !!----------------------------------------------------------------------
492      INTEGER,INTENT(in   )      ::  kid 
493      INTEGER ::   ierr
494      !!----------------------------------------------------------------------
495#if defined key_mpp_mpi
496      IF( ndelayid(kid) /= -2 ) THEN 
497#if ! defined key_mpi2
498         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
499         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
500         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
501#endif
502         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
503         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
504      ENDIF
505#endif
506   END SUBROUTINE mpp_delay_rcv
507
508   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
509      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
510      INTEGER                          , INTENT(INOUT) :: kleng
511      !!----------------------------------------------------------------------
512      !!                  ***  routine mpp_bcast_nml  ***
513      !!
514      !! ** Purpose :   broadcast namelist character buffer
515      !!
516      !!----------------------------------------------------------------------
517      !!
518      INTEGER ::   iflag
519      !!----------------------------------------------------------------------
520      !
521#if defined key_mpp_mpi
522      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
523      call MPI_BARRIER(mpi_comm_oce, iflag)
524!$AGRIF_DO_NOT_TREAT
525      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
526!$AGRIF_END_DO_NOT_TREAT
527      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
528      call MPI_BARRIER(mpi_comm_oce, iflag)
529#endif
530      !
531   END SUBROUTINE mpp_bcast_nml
532
533   
534   !!----------------------------------------------------------------------
535   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
536   !!   
537   !!----------------------------------------------------------------------
538   !!
539#  define OPERATION_MAX
540#  define INTEGER_TYPE
541#  define DIM_0d
542#     define ROUTINE_ALLREDUCE           mppmax_int
543#     include "mpp_allreduce_generic.h90"
544#     undef ROUTINE_ALLREDUCE
545#  undef DIM_0d
546#  define DIM_1d
547#     define ROUTINE_ALLREDUCE           mppmax_a_int
548#     include "mpp_allreduce_generic.h90"
549#     undef ROUTINE_ALLREDUCE
550#  undef DIM_1d
551#  undef INTEGER_TYPE
552!
553#  define REAL_TYPE
554#  define DIM_0d
555#     define ROUTINE_ALLREDUCE           mppmax_real
556#     include "mpp_allreduce_generic.h90"
557#     undef ROUTINE_ALLREDUCE
558#  undef DIM_0d
559#  define DIM_1d
560#     define ROUTINE_ALLREDUCE           mppmax_a_real
561#     include "mpp_allreduce_generic.h90"
562#     undef ROUTINE_ALLREDUCE
563#  undef DIM_1d
564#  undef REAL_TYPE
565#  undef OPERATION_MAX
566   !!----------------------------------------------------------------------
567   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
568   !!   
569   !!----------------------------------------------------------------------
570   !!
571#  define OPERATION_MIN
572#  define INTEGER_TYPE
573#  define DIM_0d
574#     define ROUTINE_ALLREDUCE           mppmin_int
575#     include "mpp_allreduce_generic.h90"
576#     undef ROUTINE_ALLREDUCE
577#  undef DIM_0d
578#  define DIM_1d
579#     define ROUTINE_ALLREDUCE           mppmin_a_int
580#     include "mpp_allreduce_generic.h90"
581#     undef ROUTINE_ALLREDUCE
582#  undef DIM_1d
583#  undef INTEGER_TYPE
584!
585#  define REAL_TYPE
586#  define DIM_0d
587#     define ROUTINE_ALLREDUCE           mppmin_real
588#     include "mpp_allreduce_generic.h90"
589#     undef ROUTINE_ALLREDUCE
590#  undef DIM_0d
591#  define DIM_1d
592#     define ROUTINE_ALLREDUCE           mppmin_a_real
593#     include "mpp_allreduce_generic.h90"
594#     undef ROUTINE_ALLREDUCE
595#  undef DIM_1d
596#  undef REAL_TYPE
597#  undef OPERATION_MIN
598
599   !!----------------------------------------------------------------------
600   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
601   !!   
602   !!   Global sum of 1D array or a variable (integer, real or complex)
603   !!----------------------------------------------------------------------
604   !!
605#  define OPERATION_SUM
606#  define INTEGER_TYPE
607#  define DIM_0d
608#     define ROUTINE_ALLREDUCE           mppsum_int
609#     include "mpp_allreduce_generic.h90"
610#     undef ROUTINE_ALLREDUCE
611#  undef DIM_0d
612#  define DIM_1d
613#     define ROUTINE_ALLREDUCE           mppsum_a_int
614#     include "mpp_allreduce_generic.h90"
615#     undef ROUTINE_ALLREDUCE
616#  undef DIM_1d
617#  undef INTEGER_TYPE
618!
619#  define REAL_TYPE
620#  define DIM_0d
621#     define ROUTINE_ALLREDUCE           mppsum_real
622#     include "mpp_allreduce_generic.h90"
623#     undef ROUTINE_ALLREDUCE
624#  undef DIM_0d
625#  define DIM_1d
626#     define ROUTINE_ALLREDUCE           mppsum_a_real
627#     include "mpp_allreduce_generic.h90"
628#     undef ROUTINE_ALLREDUCE
629#  undef DIM_1d
630#  undef REAL_TYPE
631#  undef OPERATION_SUM
632
633#  define OPERATION_SUM_DD
634#  define COMPLEX_TYPE
635#  define DIM_0d
636#     define ROUTINE_ALLREDUCE           mppsum_realdd
637#     include "mpp_allreduce_generic.h90"
638#     undef ROUTINE_ALLREDUCE
639#  undef DIM_0d
640#  define DIM_1d
641#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
642#     include "mpp_allreduce_generic.h90"
643#     undef ROUTINE_ALLREDUCE
644#  undef DIM_1d
645#  undef COMPLEX_TYPE
646#  undef OPERATION_SUM_DD
647
648   !!----------------------------------------------------------------------
649   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
650   !!   
651   !!----------------------------------------------------------------------
652   !!
653#  define OPERATION_MINLOC
654#  define DIM_2d
655#     define ROUTINE_LOC           mpp_minloc2d
656#     include "mpp_loc_generic.h90"
657#     undef ROUTINE_LOC
658#  undef DIM_2d
659#  define DIM_3d
660#     define ROUTINE_LOC           mpp_minloc3d
661#     include "mpp_loc_generic.h90"
662#     undef ROUTINE_LOC
663#  undef DIM_3d
664#  undef OPERATION_MINLOC
665
666#  define OPERATION_MAXLOC
667#  define DIM_2d
668#     define ROUTINE_LOC           mpp_maxloc2d
669#     include "mpp_loc_generic.h90"
670#     undef ROUTINE_LOC
671#  undef DIM_2d
672#  define DIM_3d
673#     define ROUTINE_LOC           mpp_maxloc3d
674#     include "mpp_loc_generic.h90"
675#     undef ROUTINE_LOC
676#  undef DIM_3d
677#  undef OPERATION_MAXLOC
678
679   SUBROUTINE mppsync()
680      !!----------------------------------------------------------------------
681      !!                  ***  routine mppsync  ***
682      !!
683      !! ** Purpose :   Massively parallel processors, synchroneous
684      !!
685      !!-----------------------------------------------------------------------
686      INTEGER :: ierror
687      !!-----------------------------------------------------------------------
688      !
689#if defined key_mpp_mpi
690      CALL mpi_barrier( mpi_comm_oce, ierror )
691#endif
692      !
693   END SUBROUTINE mppsync
694
695
696   SUBROUTINE mppstop( ld_abort ) 
697      !!----------------------------------------------------------------------
698      !!                  ***  routine mppstop  ***
699      !!
700      !! ** purpose :   Stop massively parallel processors method
701      !!
702      !!----------------------------------------------------------------------
703      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
704      LOGICAL ::   ll_abort
705      INTEGER ::   info
706      !!----------------------------------------------------------------------
707      ll_abort = .FALSE.
708      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
709      !
710#if defined key_mpp_mpi
711      IF(ll_abort) THEN
712         CALL mpi_abort( MPI_COMM_WORLD )
713      ELSE
714         CALL mppsync
715         CALL mpi_finalize( info )
716      ENDIF
717#endif
718      IF( ll_abort ) STOP 123
719      !
720   END SUBROUTINE mppstop
721
722
723   SUBROUTINE mpp_comm_free( kcom )
724      !!----------------------------------------------------------------------
725      INTEGER, INTENT(in) ::   kcom
726      !!
727      INTEGER :: ierr
728      !!----------------------------------------------------------------------
729      !
730#if defined key_mpp_mpi
731      CALL MPI_COMM_FREE(kcom, ierr)
732#endif
733      !
734   END SUBROUTINE mpp_comm_free
735
736
737   SUBROUTINE mpp_ini_znl( kumout )
738      !!----------------------------------------------------------------------
739      !!               ***  routine mpp_ini_znl  ***
740      !!
741      !! ** Purpose :   Initialize special communicator for computing zonal sum
742      !!
743      !! ** Method  : - Look for processors in the same row
744      !!              - Put their number in nrank_znl
745      !!              - Create group for the znl processors
746      !!              - Create a communicator for znl processors
747      !!              - Determine if processor should write znl files
748      !!
749      !! ** output
750      !!      ndim_rank_znl = number of processors on the same row
751      !!      ngrp_znl = group ID for the znl processors
752      !!      ncomm_znl = communicator for the ice procs.
753      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
754      !!
755      !!----------------------------------------------------------------------
756      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
757      !
758      INTEGER :: jproc      ! dummy loop integer
759      INTEGER :: ierr, ii   ! local integer
760      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
761      !!----------------------------------------------------------------------
762#if defined key_mpp_mpi
763      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
764      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
765      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
766      !
767      ALLOCATE( kwork(jpnij), STAT=ierr )
768      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
769
770      IF( jpnj == 1 ) THEN
771         ngrp_znl  = ngrp_world
772         ncomm_znl = mpi_comm_oce
773      ELSE
774         !
775         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
776         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
777         !-$$        CALL flush(numout)
778         !
779         ! Count number of processors on the same row
780         ndim_rank_znl = 0
781         DO jproc=1,jpnij
782            IF ( kwork(jproc) == njmpp ) THEN
783               ndim_rank_znl = ndim_rank_znl + 1
784            ENDIF
785         END DO
786         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
787         !-$$        CALL flush(numout)
788         ! Allocate the right size to nrank_znl
789         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
790         ALLOCATE(nrank_znl(ndim_rank_znl))
791         ii = 0
792         nrank_znl (:) = 0
793         DO jproc=1,jpnij
794            IF ( kwork(jproc) == njmpp) THEN
795               ii = ii + 1
796               nrank_znl(ii) = jproc -1
797            ENDIF
798         END DO
799         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
800         !-$$        CALL flush(numout)
801
802         ! Create the opa group
803         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
804         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
805         !-$$        CALL flush(numout)
806
807         ! Create the znl group from the opa group
808         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
809         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
810         !-$$        CALL flush(numout)
811
812         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
813         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
814         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
815         !-$$        CALL flush(numout)
816         !
817      END IF
818
819      ! Determines if processor if the first (starting from i=1) on the row
820      IF ( jpni == 1 ) THEN
821         l_znl_root = .TRUE.
822      ELSE
823         l_znl_root = .FALSE.
824         kwork (1) = nimpp
825         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
826         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
827      END IF
828
829      DEALLOCATE(kwork)
830#endif
831
832   END SUBROUTINE mpp_ini_znl
833
834
835   SUBROUTINE mpp_ini_north
836      !!----------------------------------------------------------------------
837      !!               ***  routine mpp_ini_north  ***
838      !!
839      !! ** Purpose :   Initialize special communicator for north folding
840      !!      condition together with global variables needed in the mpp folding
841      !!
842      !! ** Method  : - Look for northern processors
843      !!              - Put their number in nrank_north
844      !!              - Create groups for the world processors and the north processors
845      !!              - Create a communicator for northern processors
846      !!
847      !! ** output
848      !!      njmppmax = njmpp for northern procs
849      !!      ndim_rank_north = number of processors in the northern line
850      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
851      !!      ngrp_world = group ID for the world processors
852      !!      ngrp_north = group ID for the northern processors
853      !!      ncomm_north = communicator for the northern procs.
854      !!      north_root = number (in the world) of proc 0 in the northern comm.
855      !!
856      !!----------------------------------------------------------------------
857      INTEGER ::   ierr
858      INTEGER ::   jjproc
859      INTEGER ::   ii, ji
860      !!----------------------------------------------------------------------
861      !
862#if defined key_mpp_mpi
863      njmppmax = MAXVAL( njmppt )
864      !
865      ! Look for how many procs on the northern boundary
866      ndim_rank_north = 0
867      DO jjproc = 1, jpnij
868         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
869      END DO
870      !
871      ! Allocate the right size to nrank_north
872      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
873      ALLOCATE( nrank_north(ndim_rank_north) )
874
875      ! Fill the nrank_north array with proc. number of northern procs.
876      ! Note : the rank start at 0 in MPI
877      ii = 0
878      DO ji = 1, jpnij
879         IF ( njmppt(ji) == njmppmax   ) THEN
880            ii=ii+1
881            nrank_north(ii)=ji-1
882         END IF
883      END DO
884      !
885      ! create the world group
886      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
887      !
888      ! Create the North group from the world group
889      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
890      !
891      ! Create the North communicator , ie the pool of procs in the north group
892      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
893      !
894#endif
895   END SUBROUTINE mpp_ini_north
896
897
898   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
899      !!---------------------------------------------------------------------
900      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
901      !!
902      !!   Modification of original codes written by David H. Bailey
903      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
904      !!---------------------------------------------------------------------
905      INTEGER                     , INTENT(in)    ::   ilen, itype
906      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
907      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
908      !
909      REAL(wp) :: zerr, zt1, zt2    ! local work variables
910      INTEGER  :: ji, ztmp           ! local scalar
911      !!---------------------------------------------------------------------
912      !
913      ztmp = itype   ! avoid compilation warning
914      !
915      DO ji=1,ilen
916      ! Compute ydda + yddb using Knuth's trick.
917         zt1  = real(ydda(ji)) + real(yddb(ji))
918         zerr = zt1 - real(ydda(ji))
919         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
920                + aimag(ydda(ji)) + aimag(yddb(ji))
921
922         ! The result is zt1 + zt2, after normalization.
923         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
924      END DO
925      !
926   END SUBROUTINE DDPDD_MPI
927
928
929   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
930      !!----------------------------------------------------------------------
931      !!                  ***  routine mpp_report  ***
932      !!
933      !! ** Purpose :   report use of mpp routines per time-setp
934      !!
935      !!----------------------------------------------------------------------
936      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
937      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
938      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
939      !!
940      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
941      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
942      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
943      !!----------------------------------------------------------------------
944#if defined key_mpp_mpi
945      !
946      ll_lbc = .FALSE.
947      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
948      ll_glb = .FALSE.
949      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
950      ll_dlg = .FALSE.
951      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
952      !
953      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
954      ncom_freq = ncom_fsbc
955      !
956      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
957         IF( ll_lbc ) THEN
958            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
959            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
960            n_sequence_lbc = n_sequence_lbc + 1
961            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
962            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
963            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
964            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
965         ENDIF
966         IF( ll_glb ) THEN
967            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
968            n_sequence_glb = n_sequence_glb + 1
969            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
970            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
971         ENDIF
972         IF( ll_dlg ) THEN
973            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
974            n_sequence_dlg = n_sequence_dlg + 1
975            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
976            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
977         ENDIF
978      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
979         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
980         WRITE(numcom,*) ' '
981         WRITE(numcom,*) ' ------------------------------------------------------------'
982         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
983         WRITE(numcom,*) ' ------------------------------------------------------------'
984         WRITE(numcom,*) ' '
985         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
986         jj = 0; jk = 0; jf = 0; jh = 0
987         DO ji = 1, n_sequence_lbc
988            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
989            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
990            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
991            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
992         END DO
993         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
994         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
995         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
996         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
997         WRITE(numcom,*) ' '
998         WRITE(numcom,*) ' lbc_lnk called'
999         DO ji = 1, n_sequence_lbc - 1
1000            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1001               ccountname = crname_lbc(ji)
1002               crname_lbc(ji) = 'already counted'
1003               jcount = 1
1004               DO jj = ji + 1, n_sequence_lbc
1005                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1006                     jcount = jcount + 1
1007                     crname_lbc(jj) = 'already counted'
1008                  END IF
1009               END DO
1010               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1011            END IF
1012         END DO
1013         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1014            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1015         END IF
1016         WRITE(numcom,*) ' '
1017         IF ( n_sequence_glb > 0 ) THEN
1018            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1019            jj = 1
1020            DO ji = 2, n_sequence_glb
1021               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1022                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1023                  jj = 0
1024               END IF
1025               jj = jj + 1 
1026            END DO
1027            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1028            DEALLOCATE(crname_glb)
1029         ELSE
1030            WRITE(numcom,*) ' No MPI global communication '
1031         ENDIF
1032         WRITE(numcom,*) ' '
1033         IF ( n_sequence_dlg > 0 ) THEN
1034            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1035            jj = 1
1036            DO ji = 2, n_sequence_dlg
1037               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1038                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1039                  jj = 0
1040               END IF
1041               jj = jj + 1 
1042            END DO
1043            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1044            DEALLOCATE(crname_dlg)
1045         ELSE
1046            WRITE(numcom,*) ' No MPI delayed global communication '
1047         ENDIF
1048         WRITE(numcom,*) ' '
1049         WRITE(numcom,*) ' -----------------------------------------------'
1050         WRITE(numcom,*) ' '
1051         DEALLOCATE(ncomm_sequence)
1052         DEALLOCATE(crname_lbc)
1053      ENDIF
1054#endif
1055   END SUBROUTINE mpp_report
1056
1057   
1058   SUBROUTINE tic_tac (ld_tic, ld_global)
1059
1060    LOGICAL,           INTENT(IN) :: ld_tic
1061    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1062    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1063    REAL(wp),               SAVE :: tic_ct = 0._wp
1064    INTEGER :: ii
1065#if defined key_mpp_mpi
1066
1067    IF( ncom_stp <= nit000 ) RETURN
1068    IF( ncom_stp == nitend ) RETURN
1069    ii = 1
1070    IF( PRESENT( ld_global ) ) THEN
1071       IF( ld_global ) ii = 2
1072    END IF
1073   
1074    IF ( ld_tic ) THEN
1075       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1076       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1077    ELSE
1078       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1079       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1080    ENDIF
1081#endif
1082   
1083   END SUBROUTINE tic_tac
1084
1085#if ! defined key_mpp_mpi
1086   SUBROUTINE mpi_wait(request, status, ierror)
1087      INTEGER                            , INTENT(in   ) ::   request
1088      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1089      INTEGER                            , INTENT(  out) ::   ierror
1090   END SUBROUTINE mpi_wait
1091
1092   
1093   FUNCTION MPI_Wtime()
1094      REAL(wp) ::  MPI_Wtime
1095      MPI_Wtime = -1.
1096   END FUNCTION MPI_Wtime
1097#endif
1098
1099   !!----------------------------------------------------------------------
1100   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1101   !!----------------------------------------------------------------------
1102
1103   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1104      &                 cd6, cd7, cd8, cd9, cd10 )
1105      !!----------------------------------------------------------------------
1106      !!                  ***  ROUTINE  stop_opa  ***
1107      !!
1108      !! ** Purpose :   print in ocean.outpput file a error message and
1109      !!                increment the error number (nstop) by one.
1110      !!----------------------------------------------------------------------
1111      CHARACTER(len=*), INTENT(in   )           ::   cd1
1112      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1113      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1114      !
1115      CHARACTER(LEN=8) ::   clfmt            ! writing format
1116      INTEGER ::   inum
1117      INTEGER ::   idg  ! number of digits
1118      !!----------------------------------------------------------------------
1119      !
1120      nstop = nstop + 1
1121      !
1122      IF( numout == 6 ) THEN                          ! force to open ocean.output file if not already opened
1123         CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1124      ELSE
1125         IF( narea > 1 .AND. cd1 == 'STOP' ) THEN     ! add an error message in ocean.output
1126            CALL ctl_opn( inum,'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1127            WRITE(inum,*)
1128            idg = MAX( INT(LOG10(REAL(jpnij-1,wp))) + 1, 4 )        ! how many digits to we need to write? min=4, max=9
1129            WRITE(clfmt, "('(a,i', i1, '.', i1, ')')") idg, idg     ! '(a,ix.x)'
1130            WRITE(inum,clfmt) ' ===>>> : see E R R O R in ocean.output_', narea - 1
1131         ENDIF
1132      ENDIF
1133      !
1134                            WRITE(numout,*)
1135                            WRITE(numout,*) ' ===>>> : E R R O R'
1136                            WRITE(numout,*)
1137                            WRITE(numout,*) '         ==========='
1138                            WRITE(numout,*)
1139                            WRITE(numout,*) TRIM(cd1)
1140      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1141      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1142      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1143      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1144      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1145      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1146      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1147      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1148      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1149                            WRITE(numout,*)
1150      !
1151                               CALL FLUSH(numout    )
1152      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1153      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1154      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1155      !
1156      IF( cd1 == 'STOP' ) THEN
1157         WRITE(numout,*) 
1158         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1159         WRITE(numout,*) 
1160         CALL FLUSH(numout)
1161         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
1162         CALL mppstop( ld_abort = .true. )
1163      ENDIF
1164      !
1165   END SUBROUTINE ctl_stop
1166
1167
1168   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1169      &                 cd6, cd7, cd8, cd9, cd10 )
1170      !!----------------------------------------------------------------------
1171      !!                  ***  ROUTINE  stop_warn  ***
1172      !!
1173      !! ** Purpose :   print in ocean.outpput file a error message and
1174      !!                increment the warning number (nwarn) by one.
1175      !!----------------------------------------------------------------------
1176      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1177      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1178      !!----------------------------------------------------------------------
1179      !
1180      nwarn = nwarn + 1
1181      !
1182      IF(lwp) THEN
1183                               WRITE(numout,*)
1184                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1185                               WRITE(numout,*)
1186                               WRITE(numout,*) '         ==============='
1187                               WRITE(numout,*)
1188         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1189         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1190         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1191         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1192         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1193         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1194         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1195         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1196         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1197         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1198                               WRITE(numout,*)
1199      ENDIF
1200      CALL FLUSH(numout)
1201      !
1202   END SUBROUTINE ctl_warn
1203
1204
1205   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1206      !!----------------------------------------------------------------------
1207      !!                  ***  ROUTINE ctl_opn  ***
1208      !!
1209      !! ** Purpose :   Open file and check if required file is available.
1210      !!
1211      !! ** Method  :   Fortan open
1212      !!----------------------------------------------------------------------
1213      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1214      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1215      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1216      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1217      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1218      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1219      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1220      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1221      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1222      !
1223      CHARACTER(len=80) ::   clfile
1224      CHARACTER(LEN=10) ::   clfmt            ! writing format
1225      INTEGER           ::   iost
1226      INTEGER           ::   idg  ! number of digits
1227      !!----------------------------------------------------------------------
1228      !
1229      ! adapt filename
1230      ! ----------------
1231      clfile = TRIM(cdfile)
1232      IF( PRESENT( karea ) ) THEN
1233         IF( karea > 1 ) THEN
1234            idg = MAX( INT(LOG10(REAL(jpnij-1,wp))) + 1, 4 )        ! how many digits to we need to write? min=4, max=9
1235            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg   ! '(a,a,ix.x)'
1236            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1237         ENDIF
1238      ENDIF
1239#if defined key_agrif
1240      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1241      knum=Agrif_Get_Unit()
1242#else
1243      knum=get_unit()
1244#endif
1245      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1246      !
1247      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1248         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1249      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1250         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1251      ELSE
1252         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1253      ENDIF
1254      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1255         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1256      IF( iost == 0 ) THEN
1257         IF(ldwp .AND. kout > 0) THEN
1258            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1259            WRITE(kout,*) '     unit   = ', knum
1260            WRITE(kout,*) '     status = ', cdstat
1261            WRITE(kout,*) '     form   = ', cdform
1262            WRITE(kout,*) '     access = ', cdacce
1263            WRITE(kout,*)
1264         ENDIF
1265      ENDIF
1266100   CONTINUE
1267      IF( iost /= 0 ) THEN
1268         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1269         WRITE(ctmp2,*) ' =======   ===  '
1270         WRITE(ctmp3,*) '           unit   = ', knum
1271         WRITE(ctmp4,*) '           status = ', cdstat
1272         WRITE(ctmp5,*) '           form   = ', cdform
1273         WRITE(ctmp6,*) '           access = ', cdacce
1274         WRITE(ctmp7,*) '           iostat = ', iost
1275         WRITE(ctmp8,*) '           we stop. verify the file '
1276         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1277      ENDIF
1278      !
1279   END SUBROUTINE ctl_opn
1280
1281
1282   SUBROUTINE ctl_nam ( kios, cdnam )
1283      !!----------------------------------------------------------------------
1284      !!                  ***  ROUTINE ctl_nam  ***
1285      !!
1286      !! ** Purpose :   Informations when error while reading a namelist
1287      !!
1288      !! ** Method  :   Fortan open
1289      !!----------------------------------------------------------------------
1290      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1291      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1292      !
1293      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1294      !!----------------------------------------------------------------------
1295      !
1296      WRITE (clios, '(I5.0)')   kios
1297      IF( kios < 0 ) THEN         
1298         CALL ctl_warn( 'end of record or file while reading namelist '   &
1299            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1300      ENDIF
1301      !
1302      IF( kios > 0 ) THEN
1303         CALL ctl_stop( 'misspelled variable in namelist '   &
1304            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1305      ENDIF
1306      kios = 0
1307      !
1308   END SUBROUTINE ctl_nam
1309
1310
1311   INTEGER FUNCTION get_unit()
1312      !!----------------------------------------------------------------------
1313      !!                  ***  FUNCTION  get_unit  ***
1314      !!
1315      !! ** Purpose :   return the index of an unused logical unit
1316      !!----------------------------------------------------------------------
1317      LOGICAL :: llopn
1318      !!----------------------------------------------------------------------
1319      !
1320      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1321      llopn = .TRUE.
1322      DO WHILE( (get_unit < 998) .AND. llopn )
1323         get_unit = get_unit + 1
1324         INQUIRE( unit = get_unit, opened = llopn )
1325      END DO
1326      IF( (get_unit == 999) .AND. llopn ) THEN
1327         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1328      ENDIF
1329      !
1330   END FUNCTION get_unit
1331
1332   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1333      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1334      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1335      CHARACTER(LEN=256)                           :: chline
1336      CHARACTER(LEN=1)                             :: csp
1337      INTEGER, INTENT(IN)                          :: kout
1338      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1339      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1340      !
1341      !csp = NEW_LINE('A')
1342      ! a new line character is the best seperator but some systems (e.g.Cray)
1343      ! seem to terminate namelist reads from internal files early if they
1344      ! encounter new-lines. Use a single space for safety.
1345      csp = ' '
1346      !
1347      ! Check if the namelist buffer has already been allocated. Return if it has.
1348      !
1349      IF ( ALLOCATED( cdnambuff ) ) RETURN
1350      IF( ldwp ) THEN
1351         !
1352         ! Open namelist file
1353         !
1354         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1355         !
1356         ! First pass: count characters excluding comments and trimable white space
1357         !
1358         itot=0
1359     10  READ(iun,'(A256)',END=20,ERR=20) chline
1360         iltc = LEN_TRIM(chline)
1361         IF ( iltc.GT.0 ) THEN
1362          inl = INDEX(chline, '!') 
1363          IF( inl.eq.0 ) THEN
1364           itot = itot + iltc + 1                                ! +1 for the newline character
1365          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1366           itot = itot + inl                                  !  includes +1 for the newline character
1367          ENDIF
1368         ENDIF
1369         GOTO 10
1370     20  CONTINUE
1371         !
1372         ! Allocate text cdnambuff for condensed namelist
1373         !
1374!$AGRIF_DO_NOT_TREAT
1375         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1376!$AGRIF_END_DO_NOT_TREAT
1377         itotsav = itot
1378         !
1379         ! Second pass: read and transfer pruned characters into cdnambuff
1380         !
1381         REWIND(iun)
1382         itot=1
1383     30  READ(iun,'(A256)',END=40,ERR=40) chline
1384         iltc = LEN_TRIM(chline)
1385         IF ( iltc.GT.0 ) THEN
1386          inl = INDEX(chline, '!')
1387          IF( inl.eq.0 ) THEN
1388           inl = iltc
1389          ELSE
1390           inl = inl - 1
1391          ENDIF
1392          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1393             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1394             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1395             itot = itot + inl + 1
1396          ENDIF
1397         ENDIF
1398         GOTO 30
1399     40  CONTINUE
1400         itot = itot - 1
1401         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1402         !
1403         ! Close namelist file
1404         !
1405         CLOSE(iun)
1406         !write(*,'(32A)') cdnambuff
1407      ENDIF
1408#if defined key_mpp_mpi
1409      CALL mpp_bcast_nml( cdnambuff, itot )
1410#endif
1411  END SUBROUTINE load_nml
1412
1413
1414   !!----------------------------------------------------------------------
1415END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.