source: NEMO/branches/2020/dev_r12512_HPC-04_mcastril_Mixed_Precision_implementation/src/OCE/LBC/lib_mpp.F90 @ 13135

Last change on this file since 13135 was 13135, checked in by orioltp, 3 months ago

dev_r12512_HPC-04_mcastril_Mixed_Precision_implementation: merge with trunk@13134, see #2364

  • Property svn:keywords set to Id
File size: 68.8 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57
58   IMPLICIT NONE
59   PRIVATE
60   !
61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
63   PUBLIC   mpp_ini_north
64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
66   PUBLIC   mppscatter, mppgather
67   PUBLIC   mpp_ini_znl
68   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
69   PUBLIC   mppsend_sp, mpprecv_sp                          ! needed by TAM and ICB routines
70   PUBLIC   mppsend_dp, mpprecv_dp                          ! needed by TAM and ICB routines
71   PUBLIC   mpp_report
72   PUBLIC   mpp_bcast_nml
73   PUBLIC   tic_tac
74#if ! defined key_mpp_mpi
75   PUBLIC MPI_Wtime
76#endif
77   
78   !! * Interfaces
79   !! define generic interface for these routine as they are called sometimes
80   !! with scalar arguments instead of array arguments, which causes problems
81   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
82   INTERFACE mpp_min
83      MODULE PROCEDURE mppmin_a_int, mppmin_int
84      MODULE PROCEDURE mppmin_a_real_sp, mppmin_real_sp
85      MODULE PROCEDURE mppmin_a_real_dp, mppmin_real_dp
86   END INTERFACE
87   INTERFACE mpp_max
88      MODULE PROCEDURE mppmax_a_int, mppmax_int
89      MODULE PROCEDURE mppmax_a_real_sp, mppmax_real_sp
90      MODULE PROCEDURE mppmax_a_real_dp, mppmax_real_dp
91   END INTERFACE
92   INTERFACE mpp_sum
93      MODULE PROCEDURE mppsum_a_int, mppsum_int
94      MODULE PROCEDURE mppsum_realdd, mppsum_a_realdd
95      MODULE PROCEDURE mppsum_a_real_sp, mppsum_real_sp
96      MODULE PROCEDURE mppsum_a_real_dp, mppsum_real_dp
97   END INTERFACE
98   INTERFACE mpp_minloc
99      MODULE PROCEDURE mpp_minloc2d_sp ,mpp_minloc3d_sp
100      MODULE PROCEDURE mpp_minloc2d_dp ,mpp_minloc3d_dp
101   END INTERFACE
102   INTERFACE mpp_maxloc
103      MODULE PROCEDURE mpp_maxloc2d_sp ,mpp_maxloc3d_sp
104      MODULE PROCEDURE mpp_maxloc2d_dp ,mpp_maxloc3d_dp
105   END INTERFACE
106
107   !! ========================= !!
108   !!  MPI  variable definition !!
109   !! ========================= !!
110#if   defined key_mpp_mpi
111!$AGRIF_DO_NOT_TREAT
112   INCLUDE 'mpif.h'
113!$AGRIF_END_DO_NOT_TREAT
114   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
115#else   
116   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
117   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
118   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
119#endif
120
121   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
122
123   INTEGER, PUBLIC ::   mppsize        ! number of process
124   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
125!$AGRIF_DO_NOT_TREAT
126   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
127!$AGRIF_END_DO_NOT_TREAT
128
129   INTEGER :: MPI_SUMDD
130
131   ! variables used for zonal integration
132   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
133   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
134   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
135   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
136   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
137
138   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
139   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
140   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
141   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
142   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
143   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
144   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
145   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
146   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
147
148   ! Communications summary report
149   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
150   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
151   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
152   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
153   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
154   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
155   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
156   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
157   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
158   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
159   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
160   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
161   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
162   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
163   !: name (used as id) of allreduce-delayed operations
164   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
165   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
166   !: component name where the allreduce-delayed operation is performed
167   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
168   TYPE, PUBLIC ::   DELAYARR
169      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
170      COMPLEX(dp), POINTER, DIMENSION(:) ::  y1d => NULL()
171   END TYPE DELAYARR
172   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
173   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
174
175   ! timing summary report
176   REAL(dp), DIMENSION(2), PUBLIC ::  waiting_time = 0._dp
177   REAL(dp)              , PUBLIC ::  compute_time = 0._dp, elapsed_time = 0._dp
178   
179   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
180
181   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
182   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
183   
184   !! * Substitutions
185#  include "do_loop_substitute.h90"
186   !!----------------------------------------------------------------------
187   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
188   !! $Id$
189   !! Software governed by the CeCILL license (see ./LICENSE)
190   !!----------------------------------------------------------------------
191CONTAINS
192
193   SUBROUTINE mpp_start( localComm )
194      !!----------------------------------------------------------------------
195      !!                  ***  routine mpp_start  ***
196      !!
197      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
198      !!----------------------------------------------------------------------
199      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
200      !
201      INTEGER ::   ierr
202      LOGICAL ::   llmpi_init
203      !!----------------------------------------------------------------------
204#if defined key_mpp_mpi
205      !
206      CALL mpi_initialized ( llmpi_init, ierr )
207      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
208
209      IF( .NOT. llmpi_init ) THEN
210         IF( PRESENT(localComm) ) THEN
211            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
212            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
213            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
214         ENDIF
215         CALL mpi_init( ierr )
216         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
217      ENDIF
218       
219      IF( PRESENT(localComm) ) THEN
220         IF( Agrif_Root() ) THEN
221            mpi_comm_oce = localComm
222         ENDIF
223      ELSE
224         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
225         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
226      ENDIF
227
228# if defined key_agrif
229      IF( Agrif_Root() ) THEN
230         CALL Agrif_MPI_Init(mpi_comm_oce)
231      ELSE
232         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
233      ENDIF
234# endif
235
236      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
237      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
238      !
239      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
240      !
241#else
242      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
243      mppsize = 1
244      mpprank = 0
245#endif
246   END SUBROUTINE mpp_start
247
248
249   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
250      !!----------------------------------------------------------------------
251      !!                  ***  routine mppsend  ***
252      !!
253      !! ** Purpose :   Send messag passing array
254      !!
255      !!----------------------------------------------------------------------
256      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
257      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
258      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
259      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
260      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
261      !!
262      INTEGER ::   iflag
263      !!----------------------------------------------------------------------
264      !
265#if defined key_mpp_mpi
266      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
267#endif
268      !
269   END SUBROUTINE mppsend
270
271
272   SUBROUTINE mppsend_dp( ktyp, pmess, kbytes, kdest, md_req )
273      !!----------------------------------------------------------------------
274      !!                  ***  routine mppsend  ***
275      !!
276      !! ** Purpose :   Send messag passing array
277      !!
278      !!----------------------------------------------------------------------
279      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
280      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
281      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
282      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
283      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
284      !!
285      INTEGER ::   iflag
286      !!----------------------------------------------------------------------
287      !
288#if defined key_mpp_mpi
289      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
290#endif
291      !
292   END SUBROUTINE mppsend_dp
293
294
295   SUBROUTINE mppsend_sp( ktyp, pmess, kbytes, kdest, md_req )
296      !!----------------------------------------------------------------------
297      !!                  ***  routine mppsend  ***
298      !!
299      !! ** Purpose :   Send messag passing array
300      !!
301      !!----------------------------------------------------------------------
302      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
303      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
304      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
305      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
306      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
307      !!
308      INTEGER ::   iflag
309      !!----------------------------------------------------------------------
310      !
311#if defined key_mpp_mpi
312      CALL mpi_isend( pmess, kbytes, mpi_real, kdest , ktyp, mpi_comm_oce, md_req, iflag )
313#endif
314      !
315   END SUBROUTINE mppsend_sp
316
317
318   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
319      !!----------------------------------------------------------------------
320      !!                  ***  routine mpprecv  ***
321      !!
322      !! ** Purpose :   Receive messag passing array
323      !!
324      !!----------------------------------------------------------------------
325      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
326      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
327      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
328      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
329      !!
330      INTEGER :: istatus(mpi_status_size)
331      INTEGER :: iflag
332      INTEGER :: use_source
333      !!----------------------------------------------------------------------
334      !
335#if defined key_mpp_mpi
336      ! If a specific process number has been passed to the receive call,
337      ! use that one. Default is to use mpi_any_source
338      use_source = mpi_any_source
339      IF( PRESENT(ksource) )   use_source = ksource
340      !
341      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
342#endif
343      !
344   END SUBROUTINE mpprecv
345
346   SUBROUTINE mpprecv_dp( ktyp, pmess, kbytes, ksource )
347      !!----------------------------------------------------------------------
348      !!                  ***  routine mpprecv  ***
349      !!
350      !! ** Purpose :   Receive messag passing array
351      !!
352      !!----------------------------------------------------------------------
353      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
354      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
355      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
356      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
357      !!
358      INTEGER :: istatus(mpi_status_size)
359      INTEGER :: iflag
360      INTEGER :: use_source
361      !!----------------------------------------------------------------------
362      !
363#if defined key_mpp_mpi
364      ! If a specific process number has been passed to the receive call,
365      ! use that one. Default is to use mpi_any_source
366      use_source = mpi_any_source
367      IF( PRESENT(ksource) )   use_source = ksource
368      !
369      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
370#endif
371      !
372   END SUBROUTINE mpprecv_dp
373
374
375   SUBROUTINE mpprecv_sp( ktyp, pmess, kbytes, ksource )
376      !!----------------------------------------------------------------------
377      !!                  ***  routine mpprecv  ***
378      !!
379      !! ** Purpose :   Receive messag passing array
380      !!
381      !!----------------------------------------------------------------------
382      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
383      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
384      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
385      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
386      !!
387      INTEGER :: istatus(mpi_status_size)
388      INTEGER :: iflag
389      INTEGER :: use_source
390      !!----------------------------------------------------------------------
391      !
392#if defined key_mpp_mpi
393      ! If a specific process number has been passed to the receive call,
394      ! use that one. Default is to use mpi_any_source
395      use_source = mpi_any_source
396      IF( PRESENT(ksource) )   use_source = ksource
397      !
398      CALL mpi_recv( pmess, kbytes, mpi_real, use_source, ktyp, mpi_comm_oce, istatus, iflag )
399#endif
400      !
401   END SUBROUTINE mpprecv_sp
402
403
404   SUBROUTINE mppgather( ptab, kp, pio )
405      !!----------------------------------------------------------------------
406      !!                   ***  routine mppgather  ***
407      !!
408      !! ** Purpose :   Transfert between a local subdomain array and a work
409      !!     array which is distributed following the vertical level.
410      !!
411      !!----------------------------------------------------------------------
412      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
413      INTEGER                           , INTENT(in   ) ::   kp     ! record length
414      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
415      !!
416      INTEGER :: itaille, ierror   ! temporary integer
417      !!---------------------------------------------------------------------
418      !
419      itaille = jpi * jpj
420#if defined key_mpp_mpi
421      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
422         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
423#else
424      pio(:,:,1) = ptab(:,:)
425#endif
426      !
427   END SUBROUTINE mppgather
428
429
430   SUBROUTINE mppscatter( pio, kp, ptab )
431      !!----------------------------------------------------------------------
432      !!                  ***  routine mppscatter  ***
433      !!
434      !! ** Purpose :   Transfert between awork array which is distributed
435      !!      following the vertical level and the local subdomain array.
436      !!
437      !!----------------------------------------------------------------------
438      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
439      INTEGER                             ::   kp     ! Tag (not used with MPI
440      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
441      !!
442      INTEGER :: itaille, ierror   ! temporary integer
443      !!---------------------------------------------------------------------
444      !
445      itaille = jpi * jpj
446      !
447#if defined key_mpp_mpi
448      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
449         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
450#else
451      ptab(:,:) = pio(:,:,1)
452#endif
453      !
454   END SUBROUTINE mppscatter
455
456   
457   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
458     !!----------------------------------------------------------------------
459      !!                   ***  routine mpp_delay_sum  ***
460      !!
461      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
462      !!
463      !!----------------------------------------------------------------------
464      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
465      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
466      COMPLEX(dp),      INTENT(in   ), DIMENSION(:) ::   y_in
467      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
468      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
469      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
470      !!
471      INTEGER ::   ji, isz
472      INTEGER ::   idvar
473      INTEGER ::   ierr, ilocalcomm
474      COMPLEX(dp), ALLOCATABLE, DIMENSION(:) ::   ytmp
475      !!----------------------------------------------------------------------
476#if defined key_mpp_mpi
477      ilocalcomm = mpi_comm_oce
478      IF( PRESENT(kcom) )   ilocalcomm = kcom
479
480      isz = SIZE(y_in)
481     
482      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
483
484      idvar = -1
485      DO ji = 1, nbdelay
486         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
487      END DO
488      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
489
490      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
491         !                                       --------------------------
492         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
493            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
494            DEALLOCATE(todelay(idvar)%z1d)
495            ndelayid(idvar) = -1                                      ! do as if we had no restart
496         ELSE
497            ALLOCATE(todelay(idvar)%y1d(isz))
498            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
499         END IF
500      ENDIF
501     
502      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
503         !                                       --------------------------
504         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
505         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
506         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
507      ENDIF
508
509      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
510
511      ! send back pout from todelay(idvar)%z1d defined at previous call
512      pout(:) = todelay(idvar)%z1d(:)
513
514      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
515# if defined key_mpi2
516      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
517      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
518      ndelayid(idvar) = 1
519      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
520# else
521      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
522# endif
523#else
524      pout(:) = REAL(y_in(:), wp)
525#endif
526
527   END SUBROUTINE mpp_delay_sum
528
529   
530   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
531      !!----------------------------------------------------------------------
532      !!                   ***  routine mpp_delay_max  ***
533      !!
534      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
535      !!
536      !!----------------------------------------------------------------------
537      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
538      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
539      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
540      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
541      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
542      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
543      !!
544      INTEGER ::   ji, isz
545      INTEGER ::   idvar
546      INTEGER ::   ierr, ilocalcomm
547      INTEGER ::   MPI_TYPE
548      !!----------------------------------------------------------------------
549     
550#if defined key_mpp_mpi
551      if( wp == dp ) then
552         MPI_TYPE = MPI_DOUBLE_PRECISION
553      else if ( wp == sp ) then
554         MPI_TYPE = MPI_REAL
555      else
556        CALL ctl_stop( "Error defining type, wp is neither dp nor sp" )
557   
558      end if
559
560      ilocalcomm = mpi_comm_oce
561      IF( PRESENT(kcom) )   ilocalcomm = kcom
562
563      isz = SIZE(p_in)
564
565      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
566
567      idvar = -1
568      DO ji = 1, nbdelay
569         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
570      END DO
571      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
572
573      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
574         !                                       --------------------------
575         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
576            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
577            DEALLOCATE(todelay(idvar)%z1d)
578            ndelayid(idvar) = -1                                      ! do as if we had no restart
579         END IF
580      ENDIF
581
582      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
583         !                                       --------------------------
584         ALLOCATE(todelay(idvar)%z1d(isz))
585         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
586      ENDIF
587
588      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
589
590      ! send back pout from todelay(idvar)%z1d defined at previous call
591      pout(:) = todelay(idvar)%z1d(:)
592
593      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
594# if defined key_mpi2
595      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
596      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
597      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
598# else
599      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
600# endif
601#else
602      pout(:) = p_in(:)
603#endif
604
605   END SUBROUTINE mpp_delay_max
606
607   
608   SUBROUTINE mpp_delay_rcv( kid )
609      !!----------------------------------------------------------------------
610      !!                   ***  routine mpp_delay_rcv  ***
611      !!
612      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
613      !!
614      !!----------------------------------------------------------------------
615      INTEGER,INTENT(in   )      ::  kid 
616      INTEGER ::   ierr
617      !!----------------------------------------------------------------------
618#if defined key_mpp_mpi
619      IF( ndelayid(kid) /= -2 ) THEN 
620#if ! defined key_mpi2
621         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
622         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
623         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
624#endif
625         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
626         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
627      ENDIF
628#endif
629   END SUBROUTINE mpp_delay_rcv
630
631   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
632      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
633      INTEGER                          , INTENT(INOUT) :: kleng
634      !!----------------------------------------------------------------------
635      !!                  ***  routine mpp_bcast_nml  ***
636      !!
637      !! ** Purpose :   broadcast namelist character buffer
638      !!
639      !!----------------------------------------------------------------------
640      !!
641      INTEGER ::   iflag
642      !!----------------------------------------------------------------------
643      !
644#if defined key_mpp_mpi
645      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
646      call MPI_BARRIER(mpi_comm_oce, iflag)
647!$AGRIF_DO_NOT_TREAT
648      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
649!$AGRIF_END_DO_NOT_TREAT
650      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
651      call MPI_BARRIER(mpi_comm_oce, iflag)
652#endif
653      !
654   END SUBROUTINE mpp_bcast_nml
655
656   
657   !!----------------------------------------------------------------------
658   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
659   !!   
660   !!----------------------------------------------------------------------
661   !!
662#  define OPERATION_MAX
663#  define INTEGER_TYPE
664#  define DIM_0d
665#     define ROUTINE_ALLREDUCE           mppmax_int
666#     include "mpp_allreduce_generic.h90"
667#     undef ROUTINE_ALLREDUCE
668#  undef DIM_0d
669#  define DIM_1d
670#     define ROUTINE_ALLREDUCE           mppmax_a_int
671#     include "mpp_allreduce_generic.h90"
672#     undef ROUTINE_ALLREDUCE
673#  undef DIM_1d
674#  undef INTEGER_TYPE
675!
676   !!
677   !!   ----   SINGLE PRECISION VERSIONS
678   !!
679#  define SINGLE_PRECISION
680#  define REAL_TYPE
681#  define DIM_0d
682#     define ROUTINE_ALLREDUCE           mppmax_real_sp
683#     include "mpp_allreduce_generic.h90"
684#     undef ROUTINE_ALLREDUCE
685#  undef DIM_0d
686#  define DIM_1d
687#     define ROUTINE_ALLREDUCE           mppmax_a_real_sp
688#     include "mpp_allreduce_generic.h90"
689#     undef ROUTINE_ALLREDUCE
690#  undef DIM_1d
691#  undef SINGLE_PRECISION
692   !!
693   !!
694   !!   ----   DOUBLE PRECISION VERSIONS
695   !!
696!
697#  define DIM_0d
698#     define ROUTINE_ALLREDUCE           mppmax_real_dp
699#     include "mpp_allreduce_generic.h90"
700#     undef ROUTINE_ALLREDUCE
701#  undef DIM_0d
702#  define DIM_1d
703#     define ROUTINE_ALLREDUCE           mppmax_a_real_dp
704#     include "mpp_allreduce_generic.h90"
705#     undef ROUTINE_ALLREDUCE
706#  undef DIM_1d
707#  undef REAL_TYPE
708#  undef OPERATION_MAX
709   !!----------------------------------------------------------------------
710   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
711   !!   
712   !!----------------------------------------------------------------------
713   !!
714#  define OPERATION_MIN
715#  define INTEGER_TYPE
716#  define DIM_0d
717#     define ROUTINE_ALLREDUCE           mppmin_int
718#     include "mpp_allreduce_generic.h90"
719#     undef ROUTINE_ALLREDUCE
720#  undef DIM_0d
721#  define DIM_1d
722#     define ROUTINE_ALLREDUCE           mppmin_a_int
723#     include "mpp_allreduce_generic.h90"
724#     undef ROUTINE_ALLREDUCE
725#  undef DIM_1d
726#  undef INTEGER_TYPE
727!
728   !!
729   !!   ----   SINGLE PRECISION VERSIONS
730   !!
731#  define SINGLE_PRECISION
732#  define REAL_TYPE
733#  define DIM_0d
734#     define ROUTINE_ALLREDUCE           mppmin_real_sp
735#     include "mpp_allreduce_generic.h90"
736#     undef ROUTINE_ALLREDUCE
737#  undef DIM_0d
738#  define DIM_1d
739#     define ROUTINE_ALLREDUCE           mppmin_a_real_sp
740#     include "mpp_allreduce_generic.h90"
741#     undef ROUTINE_ALLREDUCE
742#  undef DIM_1d
743#  undef SINGLE_PRECISION
744   !!
745   !!   ----   DOUBLE PRECISION VERSIONS
746   !!
747
748#  define DIM_0d
749#     define ROUTINE_ALLREDUCE           mppmin_real_dp
750#     include "mpp_allreduce_generic.h90"
751#     undef ROUTINE_ALLREDUCE
752#  undef DIM_0d
753#  define DIM_1d
754#     define ROUTINE_ALLREDUCE           mppmin_a_real_dp
755#     include "mpp_allreduce_generic.h90"
756#     undef ROUTINE_ALLREDUCE
757#  undef DIM_1d
758#  undef REAL_TYPE
759#  undef OPERATION_MIN
760
761   !!----------------------------------------------------------------------
762   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
763   !!   
764   !!   Global sum of 1D array or a variable (integer, real or complex)
765   !!----------------------------------------------------------------------
766   !!
767#  define OPERATION_SUM
768#  define INTEGER_TYPE
769#  define DIM_0d
770#     define ROUTINE_ALLREDUCE           mppsum_int
771#     include "mpp_allreduce_generic.h90"
772#     undef ROUTINE_ALLREDUCE
773#  undef DIM_0d
774#  define DIM_1d
775#     define ROUTINE_ALLREDUCE           mppsum_a_int
776#     include "mpp_allreduce_generic.h90"
777#     undef ROUTINE_ALLREDUCE
778#  undef DIM_1d
779#  undef INTEGER_TYPE
780
781   !!
782   !!   ----   SINGLE PRECISION VERSIONS
783   !!
784#  define OPERATION_SUM
785#  define SINGLE_PRECISION
786#  define REAL_TYPE
787#  define DIM_0d
788#     define ROUTINE_ALLREDUCE           mppsum_real_sp
789#     include "mpp_allreduce_generic.h90"
790#     undef ROUTINE_ALLREDUCE
791#  undef DIM_0d
792#  define DIM_1d
793#     define ROUTINE_ALLREDUCE           mppsum_a_real_sp
794#     include "mpp_allreduce_generic.h90"
795#     undef ROUTINE_ALLREDUCE
796#  undef DIM_1d
797#  undef REAL_TYPE
798#  undef OPERATION_SUM
799
800#  undef SINGLE_PRECISION
801
802   !!
803   !!   ----   DOUBLE PRECISION VERSIONS
804   !!
805#  define OPERATION_SUM
806#  define REAL_TYPE
807#  define DIM_0d
808#     define ROUTINE_ALLREDUCE           mppsum_real_dp
809#     include "mpp_allreduce_generic.h90"
810#     undef ROUTINE_ALLREDUCE
811#  undef DIM_0d
812#  define DIM_1d
813#     define ROUTINE_ALLREDUCE           mppsum_a_real_dp
814#     include "mpp_allreduce_generic.h90"
815#     undef ROUTINE_ALLREDUCE
816#  undef DIM_1d
817#  undef REAL_TYPE
818#  undef OPERATION_SUM
819
820#  define OPERATION_SUM_DD
821#  define COMPLEX_TYPE
822#  define DIM_0d
823#     define ROUTINE_ALLREDUCE           mppsum_realdd
824#     include "mpp_allreduce_generic.h90"
825#     undef ROUTINE_ALLREDUCE
826#  undef DIM_0d
827#  define DIM_1d
828#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
829#     include "mpp_allreduce_generic.h90"
830#     undef ROUTINE_ALLREDUCE
831#  undef DIM_1d
832#  undef COMPLEX_TYPE
833#  undef OPERATION_SUM_DD
834
835   !!----------------------------------------------------------------------
836   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
837   !!   
838   !!----------------------------------------------------------------------
839   !!
840   !!
841   !!   ----   SINGLE PRECISION VERSIONS
842   !!
843#  define SINGLE_PRECISION
844#  define OPERATION_MINLOC
845#  define DIM_2d
846#     define ROUTINE_LOC           mpp_minloc2d_sp
847#     include "mpp_loc_generic.h90"
848#     undef ROUTINE_LOC
849#  undef DIM_2d
850#  define DIM_3d
851#     define ROUTINE_LOC           mpp_minloc3d_sp
852#     include "mpp_loc_generic.h90"
853#     undef ROUTINE_LOC
854#  undef DIM_3d
855#  undef OPERATION_MINLOC
856
857#  define OPERATION_MAXLOC
858#  define DIM_2d
859#     define ROUTINE_LOC           mpp_maxloc2d_sp
860#     include "mpp_loc_generic.h90"
861#     undef ROUTINE_LOC
862#  undef DIM_2d
863#  define DIM_3d
864#     define ROUTINE_LOC           mpp_maxloc3d_sp
865#     include "mpp_loc_generic.h90"
866#     undef ROUTINE_LOC
867#  undef DIM_3d
868#  undef OPERATION_MAXLOC
869#  undef SINGLE_PRECISION
870   !!
871   !!   ----   DOUBLE PRECISION VERSIONS
872   !!
873#  define OPERATION_MINLOC
874#  define DIM_2d
875#     define ROUTINE_LOC           mpp_minloc2d_dp
876#     include "mpp_loc_generic.h90"
877#     undef ROUTINE_LOC
878#  undef DIM_2d
879#  define DIM_3d
880#     define ROUTINE_LOC           mpp_minloc3d_dp
881#     include "mpp_loc_generic.h90"
882#     undef ROUTINE_LOC
883#  undef DIM_3d
884#  undef OPERATION_MINLOC
885
886#  define OPERATION_MAXLOC
887#  define DIM_2d
888#     define ROUTINE_LOC           mpp_maxloc2d_dp
889#     include "mpp_loc_generic.h90"
890#     undef ROUTINE_LOC
891#  undef DIM_2d
892#  define DIM_3d
893#     define ROUTINE_LOC           mpp_maxloc3d_dp
894#     include "mpp_loc_generic.h90"
895#     undef ROUTINE_LOC
896#  undef DIM_3d
897#  undef OPERATION_MAXLOC
898
899
900   SUBROUTINE mppsync()
901      !!----------------------------------------------------------------------
902      !!                  ***  routine mppsync  ***
903      !!
904      !! ** Purpose :   Massively parallel processors, synchroneous
905      !!
906      !!-----------------------------------------------------------------------
907      INTEGER :: ierror
908      !!-----------------------------------------------------------------------
909      !
910#if defined key_mpp_mpi
911      CALL mpi_barrier( mpi_comm_oce, ierror )
912#endif
913      !
914   END SUBROUTINE mppsync
915
916
917   SUBROUTINE mppstop( ld_abort ) 
918      !!----------------------------------------------------------------------
919      !!                  ***  routine mppstop  ***
920      !!
921      !! ** purpose :   Stop massively parallel processors method
922      !!
923      !!----------------------------------------------------------------------
924      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
925      LOGICAL ::   ll_abort
926      INTEGER ::   info
927      !!----------------------------------------------------------------------
928      ll_abort = .FALSE.
929      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
930      !
931#if defined key_mpp_mpi
932      IF(ll_abort) THEN
933         CALL mpi_abort( MPI_COMM_WORLD )
934      ELSE
935         CALL mppsync
936         CALL mpi_finalize( info )
937      ENDIF
938#endif
939      IF( ll_abort ) STOP 123
940      !
941   END SUBROUTINE mppstop
942
943
944   SUBROUTINE mpp_comm_free( kcom )
945      !!----------------------------------------------------------------------
946      INTEGER, INTENT(in) ::   kcom
947      !!
948      INTEGER :: ierr
949      !!----------------------------------------------------------------------
950      !
951#if defined key_mpp_mpi
952      CALL MPI_COMM_FREE(kcom, ierr)
953#endif
954      !
955   END SUBROUTINE mpp_comm_free
956
957
958   SUBROUTINE mpp_ini_znl( kumout )
959      !!----------------------------------------------------------------------
960      !!               ***  routine mpp_ini_znl  ***
961      !!
962      !! ** Purpose :   Initialize special communicator for computing zonal sum
963      !!
964      !! ** Method  : - Look for processors in the same row
965      !!              - Put their number in nrank_znl
966      !!              - Create group for the znl processors
967      !!              - Create a communicator for znl processors
968      !!              - Determine if processor should write znl files
969      !!
970      !! ** output
971      !!      ndim_rank_znl = number of processors on the same row
972      !!      ngrp_znl = group ID for the znl processors
973      !!      ncomm_znl = communicator for the ice procs.
974      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
975      !!
976      !!----------------------------------------------------------------------
977      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
978      !
979      INTEGER :: jproc      ! dummy loop integer
980      INTEGER :: ierr, ii   ! local integer
981      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
982      !!----------------------------------------------------------------------
983#if defined key_mpp_mpi
984      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
985      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
986      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
987      !
988      ALLOCATE( kwork(jpnij), STAT=ierr )
989      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
990
991      IF( jpnj == 1 ) THEN
992         ngrp_znl  = ngrp_world
993         ncomm_znl = mpi_comm_oce
994      ELSE
995         !
996         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
997         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
998         !-$$        CALL flush(numout)
999         !
1000         ! Count number of processors on the same row
1001         ndim_rank_znl = 0
1002         DO jproc=1,jpnij
1003            IF ( kwork(jproc) == njmpp ) THEN
1004               ndim_rank_znl = ndim_rank_znl + 1
1005            ENDIF
1006         END DO
1007         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
1008         !-$$        CALL flush(numout)
1009         ! Allocate the right size to nrank_znl
1010         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1011         ALLOCATE(nrank_znl(ndim_rank_znl))
1012         ii = 0
1013         nrank_znl (:) = 0
1014         DO jproc=1,jpnij
1015            IF ( kwork(jproc) == njmpp) THEN
1016               ii = ii + 1
1017               nrank_znl(ii) = jproc -1
1018            ENDIF
1019         END DO
1020         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1021         !-$$        CALL flush(numout)
1022
1023         ! Create the opa group
1024         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1025         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1026         !-$$        CALL flush(numout)
1027
1028         ! Create the znl group from the opa group
1029         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1030         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1031         !-$$        CALL flush(numout)
1032
1033         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1034         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1035         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1036         !-$$        CALL flush(numout)
1037         !
1038      END IF
1039
1040      ! Determines if processor if the first (starting from i=1) on the row
1041      IF ( jpni == 1 ) THEN
1042         l_znl_root = .TRUE.
1043      ELSE
1044         l_znl_root = .FALSE.
1045         kwork (1) = nimpp
1046         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1047         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1048      END IF
1049
1050      DEALLOCATE(kwork)
1051#endif
1052
1053   END SUBROUTINE mpp_ini_znl
1054
1055
1056   SUBROUTINE mpp_ini_north
1057      !!----------------------------------------------------------------------
1058      !!               ***  routine mpp_ini_north  ***
1059      !!
1060      !! ** Purpose :   Initialize special communicator for north folding
1061      !!      condition together with global variables needed in the mpp folding
1062      !!
1063      !! ** Method  : - Look for northern processors
1064      !!              - Put their number in nrank_north
1065      !!              - Create groups for the world processors and the north processors
1066      !!              - Create a communicator for northern processors
1067      !!
1068      !! ** output
1069      !!      njmppmax = njmpp for northern procs
1070      !!      ndim_rank_north = number of processors in the northern line
1071      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1072      !!      ngrp_world = group ID for the world processors
1073      !!      ngrp_north = group ID for the northern processors
1074      !!      ncomm_north = communicator for the northern procs.
1075      !!      north_root = number (in the world) of proc 0 in the northern comm.
1076      !!
1077      !!----------------------------------------------------------------------
1078      INTEGER ::   ierr
1079      INTEGER ::   jjproc
1080      INTEGER ::   ii, ji
1081      !!----------------------------------------------------------------------
1082      !
1083#if defined key_mpp_mpi
1084      njmppmax = MAXVAL( njmppt )
1085      !
1086      ! Look for how many procs on the northern boundary
1087      ndim_rank_north = 0
1088      DO jjproc = 1, jpnij
1089         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1090      END DO
1091      !
1092      ! Allocate the right size to nrank_north
1093      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1094      ALLOCATE( nrank_north(ndim_rank_north) )
1095
1096      ! Fill the nrank_north array with proc. number of northern procs.
1097      ! Note : the rank start at 0 in MPI
1098      ii = 0
1099      DO ji = 1, jpnij
1100         IF ( njmppt(ji) == njmppmax   ) THEN
1101            ii=ii+1
1102            nrank_north(ii)=ji-1
1103         END IF
1104      END DO
1105      !
1106      ! create the world group
1107      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1108      !
1109      ! Create the North group from the world group
1110      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1111      !
1112      ! Create the North communicator , ie the pool of procs in the north group
1113      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1114      !
1115#endif
1116   END SUBROUTINE mpp_ini_north
1117
1118
1119   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1120      !!---------------------------------------------------------------------
1121      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1122      !!
1123      !!   Modification of original codes written by David H. Bailey
1124      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1125      !!---------------------------------------------------------------------
1126      INTEGER                     , INTENT(in)    ::   ilen, itype
1127      COMPLEX(dp), DIMENSION(ilen), INTENT(in)    ::   ydda
1128      COMPLEX(dp), DIMENSION(ilen), INTENT(inout) ::   yddb
1129      !
1130      REAL(dp) :: zerr, zt1, zt2    ! local work variables
1131      INTEGER  :: ji, ztmp           ! local scalar
1132      !!---------------------------------------------------------------------
1133      !
1134      ztmp = itype   ! avoid compilation warning
1135      !
1136      DO ji=1,ilen
1137      ! Compute ydda + yddb using Knuth's trick.
1138         zt1  = real(ydda(ji)) + real(yddb(ji))
1139         zerr = zt1 - real(ydda(ji))
1140         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1141                + aimag(ydda(ji)) + aimag(yddb(ji))
1142
1143         ! The result is zt1 + zt2, after normalization.
1144         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1145      END DO
1146      !
1147   END SUBROUTINE DDPDD_MPI
1148
1149
1150   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1151      !!----------------------------------------------------------------------
1152      !!                  ***  routine mpp_report  ***
1153      !!
1154      !! ** Purpose :   report use of mpp routines per time-setp
1155      !!
1156      !!----------------------------------------------------------------------
1157      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1158      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1159      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1160      !!
1161      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
1162      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1163      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
1164      !!----------------------------------------------------------------------
1165#if defined key_mpp_mpi
1166      !
1167      ll_lbc = .FALSE.
1168      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1169      ll_glb = .FALSE.
1170      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1171      ll_dlg = .FALSE.
1172      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1173      !
1174      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1175      ncom_freq = ncom_fsbc
1176      !
1177      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1178         IF( ll_lbc ) THEN
1179            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1180            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1181            n_sequence_lbc = n_sequence_lbc + 1
1182            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1183            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1184            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1185            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1186         ENDIF
1187         IF( ll_glb ) THEN
1188            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1189            n_sequence_glb = n_sequence_glb + 1
1190            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1191            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1192         ENDIF
1193         IF( ll_dlg ) THEN
1194            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1195            n_sequence_dlg = n_sequence_dlg + 1
1196            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1197            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1198         ENDIF
1199      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1200         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1201         WRITE(numcom,*) ' '
1202         WRITE(numcom,*) ' ------------------------------------------------------------'
1203         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1204         WRITE(numcom,*) ' ------------------------------------------------------------'
1205         WRITE(numcom,*) ' '
1206         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1207         jj = 0; jk = 0; jf = 0; jh = 0
1208         DO ji = 1, n_sequence_lbc
1209            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1210            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1211            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1212            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1213         END DO
1214         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1215         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1216         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1217         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1218         WRITE(numcom,*) ' '
1219         WRITE(numcom,*) ' lbc_lnk called'
1220         DO ji = 1, n_sequence_lbc - 1
1221            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1222               ccountname = crname_lbc(ji)
1223               crname_lbc(ji) = 'already counted'
1224               jcount = 1
1225               DO jj = ji + 1, n_sequence_lbc
1226                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1227                     jcount = jcount + 1
1228                     crname_lbc(jj) = 'already counted'
1229                  END IF
1230               END DO
1231               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1232            END IF
1233         END DO
1234         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1235            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1236         END IF
1237         WRITE(numcom,*) ' '
1238         IF ( n_sequence_glb > 0 ) THEN
1239            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1240            jj = 1
1241            DO ji = 2, n_sequence_glb
1242               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1243                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1244                  jj = 0
1245               END IF
1246               jj = jj + 1 
1247            END DO
1248            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1249            DEALLOCATE(crname_glb)
1250         ELSE
1251            WRITE(numcom,*) ' No MPI global communication '
1252         ENDIF
1253         WRITE(numcom,*) ' '
1254         IF ( n_sequence_dlg > 0 ) THEN
1255            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1256            jj = 1
1257            DO ji = 2, n_sequence_dlg
1258               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1259                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1260                  jj = 0
1261               END IF
1262               jj = jj + 1 
1263            END DO
1264            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1265            DEALLOCATE(crname_dlg)
1266         ELSE
1267            WRITE(numcom,*) ' No MPI delayed global communication '
1268         ENDIF
1269         WRITE(numcom,*) ' '
1270         WRITE(numcom,*) ' -----------------------------------------------'
1271         WRITE(numcom,*) ' '
1272         DEALLOCATE(ncomm_sequence)
1273         DEALLOCATE(crname_lbc)
1274      ENDIF
1275#endif
1276   END SUBROUTINE mpp_report
1277
1278   
1279   SUBROUTINE tic_tac (ld_tic, ld_global)
1280
1281    LOGICAL,           INTENT(IN) :: ld_tic
1282    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1283    REAL(dp), DIMENSION(2), SAVE :: tic_wt
1284    REAL(dp),               SAVE :: tic_ct = 0._dp
1285    INTEGER :: ii
1286#if defined key_mpp_mpi
1287
1288    IF( ncom_stp <= nit000 ) RETURN
1289    IF( ncom_stp == nitend ) RETURN
1290    ii = 1
1291    IF( PRESENT( ld_global ) ) THEN
1292       IF( ld_global ) ii = 2
1293    END IF
1294   
1295    IF ( ld_tic ) THEN
1296       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1297       IF ( tic_ct > 0.0_dp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1298    ELSE
1299       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1300       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1301    ENDIF
1302#endif
1303   
1304   END SUBROUTINE tic_tac
1305
1306#if ! defined key_mpp_mpi
1307   SUBROUTINE mpi_wait(request, status, ierror)
1308      INTEGER                            , INTENT(in   ) ::   request
1309      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1310      INTEGER                            , INTENT(  out) ::   ierror
1311   END SUBROUTINE mpi_wait
1312
1313   
1314   FUNCTION MPI_Wtime()
1315      REAL(wp) ::  MPI_Wtime
1316      MPI_Wtime = -1.
1317   END FUNCTION MPI_Wtime
1318#endif
1319
1320   !!----------------------------------------------------------------------
1321   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1322   !!----------------------------------------------------------------------
1323
1324   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1325      &                 cd6, cd7, cd8, cd9, cd10 )
1326      !!----------------------------------------------------------------------
1327      !!                  ***  ROUTINE  stop_opa  ***
1328      !!
1329      !! ** Purpose :   print in ocean.outpput file a error message and
1330      !!                increment the error number (nstop) by one.
1331      !!----------------------------------------------------------------------
1332      CHARACTER(len=*), INTENT(in   )           ::   cd1
1333      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1334      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1335      !
1336      CHARACTER(LEN=8) ::   clfmt            ! writing format
1337      INTEGER          ::   inum
1338      !!----------------------------------------------------------------------
1339      !
1340      nstop = nstop + 1
1341      !
1342      IF( cd1 == 'STOP' .AND. narea /= 1 ) THEN    ! Immediate stop: add an arror message in 'ocean.output' file
1343         CALL ctl_opn( inum, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1344         WRITE(inum,*)
1345         WRITE(inum,*) ' ==>>>   Look for "E R R O R" messages in all existing *ocean.output* files'
1346         CLOSE(inum)
1347      ENDIF
1348      IF( numout == 6 ) THEN                       ! force to open ocean.output file if not already opened
1349         CALL ctl_opn( numout, 'ocean.output', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, -1, .FALSE., narea )
1350      ENDIF
1351      !
1352                            WRITE(numout,*)
1353                            WRITE(numout,*) ' ===>>> : E R R O R'
1354                            WRITE(numout,*)
1355                            WRITE(numout,*) '         ==========='
1356                            WRITE(numout,*)
1357                            WRITE(numout,*) TRIM(cd1)
1358      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1359      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1360      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1361      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1362      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1363      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1364      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1365      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1366      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1367                            WRITE(numout,*)
1368      !
1369                               CALL FLUSH(numout    )
1370      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1371      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1372      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1373      !
1374      IF( cd1 == 'STOP' ) THEN
1375         WRITE(numout,*) 
1376         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1377         WRITE(numout,*) 
1378         CALL FLUSH(numout)
1379         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
1380         CALL mppstop( ld_abort = .true. )
1381      ENDIF
1382      !
1383   END SUBROUTINE ctl_stop
1384
1385
1386   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1387      &                 cd6, cd7, cd8, cd9, cd10 )
1388      !!----------------------------------------------------------------------
1389      !!                  ***  ROUTINE  stop_warn  ***
1390      !!
1391      !! ** Purpose :   print in ocean.outpput file a error message and
1392      !!                increment the warning number (nwarn) by one.
1393      !!----------------------------------------------------------------------
1394      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1395      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1396      !!----------------------------------------------------------------------
1397      !
1398      nwarn = nwarn + 1
1399      !
1400      IF(lwp) THEN
1401                               WRITE(numout,*)
1402                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1403                               WRITE(numout,*)
1404                               WRITE(numout,*) '         ==============='
1405                               WRITE(numout,*)
1406         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1407         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1408         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1409         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1410         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1411         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1412         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1413         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1414         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1415         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1416                               WRITE(numout,*)
1417      ENDIF
1418      CALL FLUSH(numout)
1419      !
1420   END SUBROUTINE ctl_warn
1421
1422
1423   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1424      !!----------------------------------------------------------------------
1425      !!                  ***  ROUTINE ctl_opn  ***
1426      !!
1427      !! ** Purpose :   Open file and check if required file is available.
1428      !!
1429      !! ** Method  :   Fortan open
1430      !!----------------------------------------------------------------------
1431      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1432      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1433      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1434      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1435      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1436      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1437      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1438      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1439      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1440      !
1441      CHARACTER(len=80) ::   clfile
1442      CHARACTER(LEN=10) ::   clfmt            ! writing format
1443      INTEGER           ::   iost
1444      INTEGER           ::   idg              ! number of digits
1445      !!----------------------------------------------------------------------
1446      !
1447      ! adapt filename
1448      ! ----------------
1449      clfile = TRIM(cdfile)
1450      IF( PRESENT( karea ) ) THEN
1451         IF( karea > 1 ) THEN
1452            ! Warning: jpnij is maybe not already defined when calling ctl_opn -> use mppsize instead of jpnij
1453            idg = MAX( INT(LOG10(REAL(MAX(1,mppsize-1),wp))) + 1, 4 )      ! how many digits to we need to write? min=4, max=9
1454            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg          ! '(a,a,ix.x)'
1455            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1456         ENDIF
1457      ENDIF
1458#if defined key_agrif
1459      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1460      knum=Agrif_Get_Unit()
1461#else
1462      knum=get_unit()
1463#endif
1464      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1465      !
1466      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1467         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1468      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1469         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1470      ELSE
1471         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1472      ENDIF
1473      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1474         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1475      IF( iost == 0 ) THEN
1476         IF(ldwp .AND. kout > 0) THEN
1477            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1478            WRITE(kout,*) '     unit   = ', knum
1479            WRITE(kout,*) '     status = ', cdstat
1480            WRITE(kout,*) '     form   = ', cdform
1481            WRITE(kout,*) '     access = ', cdacce
1482            WRITE(kout,*)
1483         ENDIF
1484      ENDIF
1485100   CONTINUE
1486      IF( iost /= 0 ) THEN
1487         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1488         WRITE(ctmp2,*) ' =======   ===  '
1489         WRITE(ctmp3,*) '           unit   = ', knum
1490         WRITE(ctmp4,*) '           status = ', cdstat
1491         WRITE(ctmp5,*) '           form   = ', cdform
1492         WRITE(ctmp6,*) '           access = ', cdacce
1493         WRITE(ctmp7,*) '           iostat = ', iost
1494         WRITE(ctmp8,*) '           we stop. verify the file '
1495         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1496      ENDIF
1497      !
1498   END SUBROUTINE ctl_opn
1499
1500
1501   SUBROUTINE ctl_nam ( kios, cdnam )
1502      !!----------------------------------------------------------------------
1503      !!                  ***  ROUTINE ctl_nam  ***
1504      !!
1505      !! ** Purpose :   Informations when error while reading a namelist
1506      !!
1507      !! ** Method  :   Fortan open
1508      !!----------------------------------------------------------------------
1509      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1510      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1511      !
1512      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1513      !!----------------------------------------------------------------------
1514      !
1515      WRITE (clios, '(I5.0)')   kios
1516      IF( kios < 0 ) THEN         
1517         CALL ctl_warn( 'end of record or file while reading namelist '   &
1518            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1519      ENDIF
1520      !
1521      IF( kios > 0 ) THEN
1522         CALL ctl_stop( 'misspelled variable in namelist '   &
1523            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1524      ENDIF
1525      kios = 0
1526      !
1527   END SUBROUTINE ctl_nam
1528
1529
1530   INTEGER FUNCTION get_unit()
1531      !!----------------------------------------------------------------------
1532      !!                  ***  FUNCTION  get_unit  ***
1533      !!
1534      !! ** Purpose :   return the index of an unused logical unit
1535      !!----------------------------------------------------------------------
1536      LOGICAL :: llopn
1537      !!----------------------------------------------------------------------
1538      !
1539      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1540      llopn = .TRUE.
1541      DO WHILE( (get_unit < 998) .AND. llopn )
1542         get_unit = get_unit + 1
1543         INQUIRE( unit = get_unit, opened = llopn )
1544      END DO
1545      IF( (get_unit == 999) .AND. llopn ) THEN
1546         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1547      ENDIF
1548      !
1549   END FUNCTION get_unit
1550
1551   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1552      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1553      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1554      CHARACTER(LEN=256)                           :: chline
1555      CHARACTER(LEN=1)                             :: csp
1556      INTEGER, INTENT(IN)                          :: kout
1557      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1558      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1559      !
1560      !csp = NEW_LINE('A')
1561      ! a new line character is the best seperator but some systems (e.g.Cray)
1562      ! seem to terminate namelist reads from internal files early if they
1563      ! encounter new-lines. Use a single space for safety.
1564      csp = ' '
1565      !
1566      ! Check if the namelist buffer has already been allocated. Return if it has.
1567      !
1568      IF ( ALLOCATED( cdnambuff ) ) RETURN
1569      IF( ldwp ) THEN
1570         !
1571         ! Open namelist file
1572         !
1573         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1574         !
1575         ! First pass: count characters excluding comments and trimable white space
1576         !
1577         itot=0
1578     10  READ(iun,'(A256)',END=20,ERR=20) chline
1579         iltc = LEN_TRIM(chline)
1580         IF ( iltc.GT.0 ) THEN
1581          inl = INDEX(chline, '!') 
1582          IF( inl.eq.0 ) THEN
1583           itot = itot + iltc + 1                                ! +1 for the newline character
1584          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1585           itot = itot + inl                                  !  includes +1 for the newline character
1586          ENDIF
1587         ENDIF
1588         GOTO 10
1589     20  CONTINUE
1590         !
1591         ! Allocate text cdnambuff for condensed namelist
1592         !
1593!$AGRIF_DO_NOT_TREAT
1594         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1595!$AGRIF_END_DO_NOT_TREAT
1596         itotsav = itot
1597         !
1598         ! Second pass: read and transfer pruned characters into cdnambuff
1599         !
1600         REWIND(iun)
1601         itot=1
1602     30  READ(iun,'(A256)',END=40,ERR=40) chline
1603         iltc = LEN_TRIM(chline)
1604         IF ( iltc.GT.0 ) THEN
1605          inl = INDEX(chline, '!')
1606          IF( inl.eq.0 ) THEN
1607           inl = iltc
1608          ELSE
1609           inl = inl - 1
1610          ENDIF
1611          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1612             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1613             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1614             itot = itot + inl + 1
1615          ENDIF
1616         ENDIF
1617         GOTO 30
1618     40  CONTINUE
1619         itot = itot - 1
1620         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1621         !
1622         ! Close namelist file
1623         !
1624         CLOSE(iun)
1625         !write(*,'(32A)') cdnambuff
1626      ENDIF
1627#if defined key_mpp_mpi
1628      CALL mpp_bcast_nml( cdnambuff, itot )
1629#endif
1630  END SUBROUTINE load_nml
1631
1632
1633   !!----------------------------------------------------------------------
1634END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.