New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2021/dev_r14312_MPI_Interface/src/OCE/LBC – NEMO

source: NEMO/branches/2021/dev_r14312_MPI_Interface/src/OCE/LBC/lib_mpp.F90 @ 14314

Last change on this file since 14314 was 14314, checked in by smasson, 3 years ago

dev_r14312_MPI_Interface: first implementation, #2598

  • Property svn:keywords set to Id
File size: 71.8 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   load_nml      : Read, condense and buffer namelist file into character array for use as an internal file
35   !!----------------------------------------------------------------------
36   !!----------------------------------------------------------------------
37   !!   mpp_start     : get local communicator its size and rank
38   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
39   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
40   !!   mpprecv       :
41   !!   mppsend       :
42   !!   mppscatter    :
43   !!   mppgather     :
44   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
45   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
46   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
47   !!   mpp_minloc    :
48   !!   mpp_maxloc    :
49   !!   mppsync       :
50   !!   mppstop       :
51   !!   mpp_ini_north : initialisation of north fold
52   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
53   !!   mpp_bcast_nml : broadcast/receive namelist character buffer from reading process to all others
54   !!----------------------------------------------------------------------
55   USE dom_oce        ! ocean space and time domain
56   USE in_out_manager ! I/O manager
57
58   IMPLICIT NONE
59   PRIVATE
60   !
61   PUBLIC   ctl_stop, ctl_warn, ctl_opn, ctl_nam, load_nml
62   PUBLIC   mpp_start, mppstop, mppsync, mpp_comm_free
63   PUBLIC   mpp_ini_north
64   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
65   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
66   PUBLIC   mppscatter, mppgather
67   PUBLIC   mpp_ini_znl
68   PUBLIC   mpp_ini_nc
69   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
70   PUBLIC   mppsend_sp, mpprecv_sp                          ! needed by TAM and ICB routines
71   PUBLIC   mppsend_dp, mpprecv_dp                          ! needed by TAM and ICB routines
72   PUBLIC   mpp_report
73   PUBLIC   mpp_bcast_nml
74   PUBLIC   tic_tac
75#if defined key_mpp_off
76   PUBLIC MPI_wait
77   PUBLIC MPI_Wtime
78#endif
79
80   !! * Interfaces
81   !! define generic interface for these routine as they are called sometimes
82   !! with scalar arguments instead of array arguments, which causes problems
83   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
84   INTERFACE mpp_min
85      MODULE PROCEDURE mppmin_a_int, mppmin_int
86      MODULE PROCEDURE mppmin_a_real_sp, mppmin_real_sp
87      MODULE PROCEDURE mppmin_a_real_dp, mppmin_real_dp
88   END INTERFACE
89   INTERFACE mpp_max
90      MODULE PROCEDURE mppmax_a_int, mppmax_int
91      MODULE PROCEDURE mppmax_a_real_sp, mppmax_real_sp
92      MODULE PROCEDURE mppmax_a_real_dp, mppmax_real_dp
93   END INTERFACE
94   INTERFACE mpp_sum
95      MODULE PROCEDURE mppsum_a_int, mppsum_int
96      MODULE PROCEDURE mppsum_realdd, mppsum_a_realdd
97      MODULE PROCEDURE mppsum_a_real_sp, mppsum_real_sp
98      MODULE PROCEDURE mppsum_a_real_dp, mppsum_real_dp
99   END INTERFACE
100   INTERFACE mpp_minloc
101      MODULE PROCEDURE mpp_minloc2d_sp ,mpp_minloc3d_sp
102      MODULE PROCEDURE mpp_minloc2d_dp ,mpp_minloc3d_dp
103   END INTERFACE
104   INTERFACE mpp_maxloc
105      MODULE PROCEDURE mpp_maxloc2d_sp ,mpp_maxloc3d_sp
106      MODULE PROCEDURE mpp_maxloc2d_dp ,mpp_maxloc3d_dp
107   END INTERFACE
108
109   !! ========================= !!
110   !!  MPI  variable definition !!
111   !! ========================= !!
112#if ! defined key_mpi_off
113!$AGRIF_DO_NOT_TREAT
114   INCLUDE 'mpif.h'
115!$AGRIF_END_DO_NOT_TREAT
116   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
117#else
118   INTEGER, PUBLIC, PARAMETER ::   MPI_STATUS_SIZE = 1
119   INTEGER, PUBLIC, PARAMETER ::   MPI_REAL = 4
120   INTEGER, PUBLIC, PARAMETER ::   MPI_DOUBLE_PRECISION = 8
121   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.    !: mpp flag
122#endif
123
124   INTEGER, PUBLIC ::   mppsize        ! number of process
125   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
126!$AGRIF_DO_NOT_TREAT
127   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
128!$AGRIF_END_DO_NOT_TREAT
129
130   INTEGER :: MPI_SUMDD
131
132   ! Neighbourgs informations
133   INTEGER, DIMENSION(8), PUBLIC ::   mpinei     !: 8-neighbourg MPI indexes (starting at 0, -1 if no neighbourg)
134   INTEGER,    PARAMETER, PUBLIC ::   jpwe = 1   !: WEst
135   INTEGER,    PARAMETER, PUBLIC ::   jpea = 2   !: EAst
136   INTEGER,    PARAMETER, PUBLIC ::   jpso = 3   !: SOuth
137   INTEGER,    PARAMETER, PUBLIC ::   jpno = 4   !: NOrth
138   INTEGER,    PARAMETER, PUBLIC ::   jpsw = 5   !: South-West
139   INTEGER,    PARAMETER, PUBLIC ::   jpse = 6   !: South-East
140   INTEGER,    PARAMETER, PUBLIC ::   jpnw = 7   !: North-West
141   INTEGER,    PARAMETER, PUBLIC ::   jpne = 8   !: North-East
142
143   LOGICAL, DIMENSION(8), PUBLIC ::   l_SelfPerio  !   should we explicitely take care of I/J periodicity
144   LOGICAL,               PUBLIC ::   l_IdoNFold
145
146   ! variables used for zonal integration
147   INTEGER, PUBLIC ::   ncomm_znl         !: communicator made by the processors on the same zonal average
148   LOGICAL, PUBLIC ::   l_znl_root        !: True on the 'left'most processor on the same row
149   INTEGER         ::   ngrp_znl          !: group ID for the znl processors
150   INTEGER         ::   ndim_rank_znl     !: number of processors on the same zonal average
151   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
152
153   ! variables used for MPI3 neighbourhood collectives
154   INTEGER, PUBLIC ::   mpi_nc_com4       ! MPI3 neighbourhood collectives communicator
155   INTEGER, PUBLIC ::   mpi_nc_com8       ! MPI3 neighbourhood collectives communicator (with diagionals)
156
157   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
158   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
159   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
160   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
161   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
162   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
163   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
164   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
165   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
166
167   ! Communications summary report
168   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
169   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
170   CHARACTER(len=lca), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
171   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
172   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
173   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
174   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
175   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
176   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
177   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
178   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
179   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
180   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
181   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
182   !: name (used as id) of allreduce-delayed operations
183   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
184   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
185   !: component name where the allreduce-delayed operation is performed
186   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
187   TYPE, PUBLIC ::   DELAYARR
188      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
189      COMPLEX(dp), POINTER, DIMENSION(:) ::  y1d => NULL()
190   END TYPE DELAYARR
191   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
192   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
193
194   ! timing summary report
195   REAL(dp), DIMENSION(2), PUBLIC ::  waiting_time = 0._dp
196   REAL(dp)              , PUBLIC ::  compute_time = 0._dp, elapsed_time = 0._dp
197
198   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
199
200   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
201
202   !! * Substitutions
203#  include "do_loop_substitute.h90"
204   !!----------------------------------------------------------------------
205   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
206   !! $Id$
207   !! Software governed by the CeCILL license (see ./LICENSE)
208   !!----------------------------------------------------------------------
209CONTAINS
210
211   SUBROUTINE mpp_start( localComm )
212      !!----------------------------------------------------------------------
213      !!                  ***  routine mpp_start  ***
214      !!
215      !! ** Purpose :   get mpi_comm_oce, mpprank and mppsize
216      !!----------------------------------------------------------------------
217      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
218      !
219      INTEGER ::   ierr
220      LOGICAL ::   llmpi_init
221      !!----------------------------------------------------------------------
222#if ! defined key_mpi_off
223      !
224      CALL mpi_initialized ( llmpi_init, ierr )
225      IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_initialized' )
226
227      IF( .NOT. llmpi_init ) THEN
228         IF( PRESENT(localComm) ) THEN
229            WRITE(ctmp1,*) ' lib_mpp: You cannot provide a local communicator '
230            WRITE(ctmp2,*) '          without calling MPI_Init before ! '
231            CALL ctl_stop( 'STOP', ctmp1, ctmp2 )
232         ENDIF
233         CALL mpi_init( ierr )
234         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_init' )
235      ENDIF
236
237      IF( PRESENT(localComm) ) THEN
238         IF( Agrif_Root() ) THEN
239            mpi_comm_oce = localComm
240         ENDIF
241      ELSE
242         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, ierr)
243         IF( ierr /= MPI_SUCCESS ) CALL ctl_stop( 'STOP', ' lib_mpp: Error in routine mpi_comm_dup' )
244      ENDIF
245
246# if defined key_agrif
247      IF( Agrif_Root() ) THEN
248         CALL Agrif_MPI_Init(mpi_comm_oce)
249      ELSE
250         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
251      ENDIF
252# endif
253
254      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
255      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
256      !
257      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
258      !
259#else
260      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
261      mppsize = 1
262      mpprank = 0
263#endif
264   END SUBROUTINE mpp_start
265
266
267   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
268      !!----------------------------------------------------------------------
269      !!                  ***  routine mppsend  ***
270      !!
271      !! ** Purpose :   Send messag passing array
272      !!
273      !!----------------------------------------------------------------------
274      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
275      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
276      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
277      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
278      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
279      !!
280      INTEGER ::   iflag
281      INTEGER :: mpi_working_type
282      !!----------------------------------------------------------------------
283      !
284#if ! defined key_mpi_off
285      IF (wp == dp) THEN
286         mpi_working_type = mpi_double_precision
287      ELSE
288         mpi_working_type = mpi_real
289      END IF
290      CALL mpi_isend( pmess, kbytes, mpi_working_type, kdest , ktyp, mpi_comm_oce, md_req, iflag )
291#endif
292      !
293   END SUBROUTINE mppsend
294
295
296   SUBROUTINE mppsend_dp( ktyp, pmess, kbytes, kdest, md_req )
297      !!----------------------------------------------------------------------
298      !!                  ***  routine mppsend  ***
299      !!
300      !! ** Purpose :   Send messag passing array
301      !!
302      !!----------------------------------------------------------------------
303      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
304      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
305      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
306      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
307      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
308      !!
309      INTEGER ::   iflag
310      !!----------------------------------------------------------------------
311      !
312#if ! defined key_mpi_off
313      CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
314#endif
315      !
316   END SUBROUTINE mppsend_dp
317
318
319   SUBROUTINE mppsend_sp( ktyp, pmess, kbytes, kdest, md_req )
320      !!----------------------------------------------------------------------
321      !!                  ***  routine mppsend  ***
322      !!
323      !! ** Purpose :   Send messag passing array
324      !!
325      !!----------------------------------------------------------------------
326      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
327      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
328      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
329      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
330      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
331      !!
332      INTEGER ::   iflag
333      !!----------------------------------------------------------------------
334      !
335#if ! defined key_mpi_off
336      CALL mpi_isend( pmess, kbytes, mpi_real, kdest , ktyp, mpi_comm_oce, md_req, iflag )
337#endif
338      !
339   END SUBROUTINE mppsend_sp
340
341
342   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
343      !!----------------------------------------------------------------------
344      !!                  ***  routine mpprecv  ***
345      !!
346      !! ** Purpose :   Receive messag passing array
347      !!
348      !!----------------------------------------------------------------------
349      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
350      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
351      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
352      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
353      !!
354      INTEGER :: istatus(mpi_status_size)
355      INTEGER :: iflag
356      INTEGER :: use_source
357      INTEGER :: mpi_working_type
358      !!----------------------------------------------------------------------
359      !
360#if ! defined key_mpi_off
361      ! If a specific process number has been passed to the receive call,
362      ! use that one. Default is to use mpi_any_source
363      use_source = mpi_any_source
364      IF( PRESENT(ksource) )   use_source = ksource
365      !
366      IF (wp == dp) THEN
367         mpi_working_type = mpi_double_precision
368      ELSE
369         mpi_working_type = mpi_real
370      END IF
371      CALL mpi_recv( pmess, kbytes, mpi_working_type, use_source, ktyp, mpi_comm_oce, istatus, iflag )
372#endif
373      !
374   END SUBROUTINE mpprecv
375
376   SUBROUTINE mpprecv_dp( ktyp, pmess, kbytes, ksource )
377      !!----------------------------------------------------------------------
378      !!                  ***  routine mpprecv  ***
379      !!
380      !! ** Purpose :   Receive messag passing array
381      !!
382      !!----------------------------------------------------------------------
383      REAL(dp), INTENT(inout) ::   pmess(*)   ! array of real
384      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
385      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
386      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
387      !!
388      INTEGER :: istatus(mpi_status_size)
389      INTEGER :: iflag
390      INTEGER :: use_source
391      !!----------------------------------------------------------------------
392      !
393#if ! defined key_mpi_off
394      ! If a specific process number has been passed to the receive call,
395      ! use that one. Default is to use mpi_any_source
396      use_source = mpi_any_source
397      IF( PRESENT(ksource) )   use_source = ksource
398      !
399      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
400#endif
401      !
402   END SUBROUTINE mpprecv_dp
403
404
405   SUBROUTINE mpprecv_sp( ktyp, pmess, kbytes, ksource )
406      !!----------------------------------------------------------------------
407      !!                  ***  routine mpprecv  ***
408      !!
409      !! ** Purpose :   Receive messag passing array
410      !!
411      !!----------------------------------------------------------------------
412      REAL(sp), INTENT(inout) ::   pmess(*)   ! array of real
413      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
414      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
415      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
416      !!
417      INTEGER :: istatus(mpi_status_size)
418      INTEGER :: iflag
419      INTEGER :: use_source
420      !!----------------------------------------------------------------------
421      !
422#if ! defined key_mpi_off
423      ! If a specific process number has been passed to the receive call,
424      ! use that one. Default is to use mpi_any_source
425      use_source = mpi_any_source
426      IF( PRESENT(ksource) )   use_source = ksource
427      !
428      CALL mpi_recv( pmess, kbytes, mpi_real, use_source, ktyp, mpi_comm_oce, istatus, iflag )
429#endif
430      !
431   END SUBROUTINE mpprecv_sp
432
433
434   SUBROUTINE mppgather( ptab, kp, pio )
435      !!----------------------------------------------------------------------
436      !!                   ***  routine mppgather  ***
437      !!
438      !! ** Purpose :   Transfert between a local subdomain array and a work
439      !!     array which is distributed following the vertical level.
440      !!
441      !!----------------------------------------------------------------------
442      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
443      INTEGER                           , INTENT(in   ) ::   kp     ! record length
444      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
445      !!
446      INTEGER :: itaille, ierror   ! temporary integer
447      !!---------------------------------------------------------------------
448      !
449      itaille = jpi * jpj
450#if ! defined key_mpi_off
451      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
452         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
453#else
454      pio(:,:,1) = ptab(:,:)
455#endif
456      !
457   END SUBROUTINE mppgather
458
459
460   SUBROUTINE mppscatter( pio, kp, ptab )
461      !!----------------------------------------------------------------------
462      !!                  ***  routine mppscatter  ***
463      !!
464      !! ** Purpose :   Transfert between awork array which is distributed
465      !!      following the vertical level and the local subdomain array.
466      !!
467      !!----------------------------------------------------------------------
468      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
469      INTEGER                             ::   kp     ! Tag (not used with MPI
470      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
471      !!
472      INTEGER :: itaille, ierror   ! temporary integer
473      !!---------------------------------------------------------------------
474      !
475      itaille = jpi * jpj
476      !
477#if ! defined key_mpi_off
478      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
479         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
480#else
481      ptab(:,:) = pio(:,:,1)
482#endif
483      !
484   END SUBROUTINE mppscatter
485
486
487   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
488     !!----------------------------------------------------------------------
489      !!                   ***  routine mpp_delay_sum  ***
490      !!
491      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
492      !!
493      !!----------------------------------------------------------------------
494      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
495      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
496      COMPLEX(dp),      INTENT(in   ), DIMENSION(:) ::   y_in
497      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
498      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
499      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
500      !!
501      INTEGER ::   ji, isz
502      INTEGER ::   idvar
503      INTEGER ::   ierr, ilocalcomm
504      COMPLEX(dp), ALLOCATABLE, DIMENSION(:) ::   ytmp
505      !!----------------------------------------------------------------------
506#if ! defined key_mpi_off
507      ilocalcomm = mpi_comm_oce
508      IF( PRESENT(kcom) )   ilocalcomm = kcom
509
510      isz = SIZE(y_in)
511
512      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
513
514      idvar = -1
515      DO ji = 1, nbdelay
516         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
517      END DO
518      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
519
520      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
521         !                                       --------------------------
522         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
523            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
524            DEALLOCATE(todelay(idvar)%z1d)
525            ndelayid(idvar) = -1                                      ! do as if we had no restart
526         ELSE
527            ALLOCATE(todelay(idvar)%y1d(isz))
528            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
529            ndelayid(idvar) = MPI_REQUEST_NULL                             ! initialised request to a valid value
530         END IF
531      ENDIF
532
533      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
534         !                                       --------------------------
535         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
536         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
537         ndelayid(idvar) = MPI_REQUEST_NULL
538      ENDIF
539
540      CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
541
542      ! send back pout from todelay(idvar)%z1d defined at previous call
543      pout(:) = todelay(idvar)%z1d(:)
544
545      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
546# if defined key_mpi2
547      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
548      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )
549      ndelayid(idvar) = MPI_REQUEST_NULL
550      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
551# else
552      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
553# endif
554#else
555      pout(:) = REAL(y_in(:), wp)
556#endif
557
558   END SUBROUTINE mpp_delay_sum
559
560
561   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
562      !!----------------------------------------------------------------------
563      !!                   ***  routine mpp_delay_max  ***
564      !!
565      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
566      !!
567      !!----------------------------------------------------------------------
568      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
569      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
570      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
571      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
572      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
573      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
574      !!
575      INTEGER ::   ji, isz
576      INTEGER ::   idvar
577      INTEGER ::   ierr, ilocalcomm
578      INTEGER ::   MPI_TYPE
579      !!----------------------------------------------------------------------
580
581#if ! defined key_mpi_off
582      if( wp == dp ) then
583         MPI_TYPE = MPI_DOUBLE_PRECISION
584      else if ( wp == sp ) then
585         MPI_TYPE = MPI_REAL
586      else
587        CALL ctl_stop( "Error defining type, wp is neither dp nor sp" )
588
589      end if
590
591      ilocalcomm = mpi_comm_oce
592      IF( PRESENT(kcom) )   ilocalcomm = kcom
593
594      isz = SIZE(p_in)
595
596      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
597
598      idvar = -1
599      DO ji = 1, nbdelay
600         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
601      END DO
602      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
603
604      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
605         !                                       --------------------------
606         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
607            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
608            DEALLOCATE(todelay(idvar)%z1d)
609            ndelayid(idvar) = -1                                      ! do as if we had no restart
610         ELSE
611            ndelayid(idvar) = MPI_REQUEST_NULL
612         END IF
613      ENDIF
614
615      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
616         !                                       --------------------------
617         ALLOCATE(todelay(idvar)%z1d(isz))
618         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
619         ndelayid(idvar) = MPI_REQUEST_NULL
620      ENDIF
621
622      CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
623
624      ! send back pout from todelay(idvar)%z1d defined at previous call
625      pout(:) = todelay(idvar)%z1d(:)
626
627      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
628      ! (PM) Should we get rid of MPI2 option ? MPI3 was release in 2013. Who is still using MPI2 ?
629# if defined key_mpi2
630      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
631      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ierr )
632      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
633# else
634      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_TYPE, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
635# endif
636#else
637      pout(:) = p_in(:)
638#endif
639
640   END SUBROUTINE mpp_delay_max
641
642
643   SUBROUTINE mpp_delay_rcv( kid )
644      !!----------------------------------------------------------------------
645      !!                   ***  routine mpp_delay_rcv  ***
646      !!
647      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
648      !!
649      !!----------------------------------------------------------------------
650      INTEGER,INTENT(in   )      ::  kid
651      INTEGER ::   ierr
652      !!----------------------------------------------------------------------
653#if ! defined key_mpi_off
654      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
655      ! test on ndelayid(kid) useless as mpi_wait return immediatly if the request handle is MPI_REQUEST_NULL
656      CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr ) ! after this ndelayid(kid) = MPI_REQUEST_NULL
657      IF( ln_timing ) CALL tic_tac( .FALSE., ld_global = .TRUE.)
658      IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
659#endif
660   END SUBROUTINE mpp_delay_rcv
661
662   SUBROUTINE mpp_bcast_nml( cdnambuff , kleng )
663      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
664      INTEGER                          , INTENT(INOUT) :: kleng
665      !!----------------------------------------------------------------------
666      !!                  ***  routine mpp_bcast_nml  ***
667      !!
668      !! ** Purpose :   broadcast namelist character buffer
669      !!
670      !!----------------------------------------------------------------------
671      !!
672      INTEGER ::   iflag
673      !!----------------------------------------------------------------------
674      !
675#if ! defined key_mpi_off
676      call MPI_BCAST(kleng, 1, MPI_INT, 0, mpi_comm_oce, iflag)
677      call MPI_BARRIER(mpi_comm_oce, iflag)
678!$AGRIF_DO_NOT_TREAT
679      IF ( .NOT. ALLOCATED(cdnambuff) ) ALLOCATE( CHARACTER(LEN=kleng) :: cdnambuff )
680!$AGRIF_END_DO_NOT_TREAT
681      call MPI_BCAST(cdnambuff, kleng, MPI_CHARACTER, 0, mpi_comm_oce, iflag)
682      call MPI_BARRIER(mpi_comm_oce, iflag)
683#endif
684      !
685   END SUBROUTINE mpp_bcast_nml
686
687
688   !!----------------------------------------------------------------------
689   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
690   !!
691   !!----------------------------------------------------------------------
692   !!
693#  define OPERATION_MAX
694#  define INTEGER_TYPE
695#  define DIM_0d
696#     define ROUTINE_ALLREDUCE           mppmax_int
697#     include "mpp_allreduce_generic.h90"
698#     undef ROUTINE_ALLREDUCE
699#  undef DIM_0d
700#  define DIM_1d
701#     define ROUTINE_ALLREDUCE           mppmax_a_int
702#     include "mpp_allreduce_generic.h90"
703#     undef ROUTINE_ALLREDUCE
704#  undef DIM_1d
705#  undef INTEGER_TYPE
706!
707   !!
708   !!   ----   SINGLE PRECISION VERSIONS
709   !!
710#  define SINGLE_PRECISION
711#  define REAL_TYPE
712#  define DIM_0d
713#     define ROUTINE_ALLREDUCE           mppmax_real_sp
714#     include "mpp_allreduce_generic.h90"
715#     undef ROUTINE_ALLREDUCE
716#  undef DIM_0d
717#  define DIM_1d
718#     define ROUTINE_ALLREDUCE           mppmax_a_real_sp
719#     include "mpp_allreduce_generic.h90"
720#     undef ROUTINE_ALLREDUCE
721#  undef DIM_1d
722#  undef SINGLE_PRECISION
723   !!
724   !!
725   !!   ----   DOUBLE PRECISION VERSIONS
726   !!
727!
728#  define DIM_0d
729#     define ROUTINE_ALLREDUCE           mppmax_real_dp
730#     include "mpp_allreduce_generic.h90"
731#     undef ROUTINE_ALLREDUCE
732#  undef DIM_0d
733#  define DIM_1d
734#     define ROUTINE_ALLREDUCE           mppmax_a_real_dp
735#     include "mpp_allreduce_generic.h90"
736#     undef ROUTINE_ALLREDUCE
737#  undef DIM_1d
738#  undef REAL_TYPE
739#  undef OPERATION_MAX
740   !!----------------------------------------------------------------------
741   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
742   !!
743   !!----------------------------------------------------------------------
744   !!
745#  define OPERATION_MIN
746#  define INTEGER_TYPE
747#  define DIM_0d
748#     define ROUTINE_ALLREDUCE           mppmin_int
749#     include "mpp_allreduce_generic.h90"
750#     undef ROUTINE_ALLREDUCE
751#  undef DIM_0d
752#  define DIM_1d
753#     define ROUTINE_ALLREDUCE           mppmin_a_int
754#     include "mpp_allreduce_generic.h90"
755#     undef ROUTINE_ALLREDUCE
756#  undef DIM_1d
757#  undef INTEGER_TYPE
758!
759   !!
760   !!   ----   SINGLE PRECISION VERSIONS
761   !!
762#  define SINGLE_PRECISION
763#  define REAL_TYPE
764#  define DIM_0d
765#     define ROUTINE_ALLREDUCE           mppmin_real_sp
766#     include "mpp_allreduce_generic.h90"
767#     undef ROUTINE_ALLREDUCE
768#  undef DIM_0d
769#  define DIM_1d
770#     define ROUTINE_ALLREDUCE           mppmin_a_real_sp
771#     include "mpp_allreduce_generic.h90"
772#     undef ROUTINE_ALLREDUCE
773#  undef DIM_1d
774#  undef SINGLE_PRECISION
775   !!
776   !!   ----   DOUBLE PRECISION VERSIONS
777   !!
778
779#  define DIM_0d
780#     define ROUTINE_ALLREDUCE           mppmin_real_dp
781#     include "mpp_allreduce_generic.h90"
782#     undef ROUTINE_ALLREDUCE
783#  undef DIM_0d
784#  define DIM_1d
785#     define ROUTINE_ALLREDUCE           mppmin_a_real_dp
786#     include "mpp_allreduce_generic.h90"
787#     undef ROUTINE_ALLREDUCE
788#  undef DIM_1d
789#  undef REAL_TYPE
790#  undef OPERATION_MIN
791
792   !!----------------------------------------------------------------------
793   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
794   !!
795   !!   Global sum of 1D array or a variable (integer, real or complex)
796   !!----------------------------------------------------------------------
797   !!
798#  define OPERATION_SUM
799#  define INTEGER_TYPE
800#  define DIM_0d
801#     define ROUTINE_ALLREDUCE           mppsum_int
802#     include "mpp_allreduce_generic.h90"
803#     undef ROUTINE_ALLREDUCE
804#  undef DIM_0d
805#  define DIM_1d
806#     define ROUTINE_ALLREDUCE           mppsum_a_int
807#     include "mpp_allreduce_generic.h90"
808#     undef ROUTINE_ALLREDUCE
809#  undef DIM_1d
810#  undef INTEGER_TYPE
811
812   !!
813   !!   ----   SINGLE PRECISION VERSIONS
814   !!
815#  define OPERATION_SUM
816#  define SINGLE_PRECISION
817#  define REAL_TYPE
818#  define DIM_0d
819#     define ROUTINE_ALLREDUCE           mppsum_real_sp
820#     include "mpp_allreduce_generic.h90"
821#     undef ROUTINE_ALLREDUCE
822#  undef DIM_0d
823#  define DIM_1d
824#     define ROUTINE_ALLREDUCE           mppsum_a_real_sp
825#     include "mpp_allreduce_generic.h90"
826#     undef ROUTINE_ALLREDUCE
827#  undef DIM_1d
828#  undef REAL_TYPE
829#  undef OPERATION_SUM
830
831#  undef SINGLE_PRECISION
832
833   !!
834   !!   ----   DOUBLE PRECISION VERSIONS
835   !!
836#  define OPERATION_SUM
837#  define REAL_TYPE
838#  define DIM_0d
839#     define ROUTINE_ALLREDUCE           mppsum_real_dp
840#     include "mpp_allreduce_generic.h90"
841#     undef ROUTINE_ALLREDUCE
842#  undef DIM_0d
843#  define DIM_1d
844#     define ROUTINE_ALLREDUCE           mppsum_a_real_dp
845#     include "mpp_allreduce_generic.h90"
846#     undef ROUTINE_ALLREDUCE
847#  undef DIM_1d
848#  undef REAL_TYPE
849#  undef OPERATION_SUM
850
851#  define OPERATION_SUM_DD
852#  define COMPLEX_TYPE
853#  define DIM_0d
854#     define ROUTINE_ALLREDUCE           mppsum_realdd
855#     include "mpp_allreduce_generic.h90"
856#     undef ROUTINE_ALLREDUCE
857#  undef DIM_0d
858#  define DIM_1d
859#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
860#     include "mpp_allreduce_generic.h90"
861#     undef ROUTINE_ALLREDUCE
862#  undef DIM_1d
863#  undef COMPLEX_TYPE
864#  undef OPERATION_SUM_DD
865
866   !!----------------------------------------------------------------------
867   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
868   !!
869   !!----------------------------------------------------------------------
870   !!
871   !!
872   !!   ----   SINGLE PRECISION VERSIONS
873   !!
874#  define SINGLE_PRECISION
875#  define OPERATION_MINLOC
876#  define DIM_2d
877#     define ROUTINE_LOC           mpp_minloc2d_sp
878#     include "mpp_loc_generic.h90"
879#     undef ROUTINE_LOC
880#  undef DIM_2d
881#  define DIM_3d
882#     define ROUTINE_LOC           mpp_minloc3d_sp
883#     include "mpp_loc_generic.h90"
884#     undef ROUTINE_LOC
885#  undef DIM_3d
886#  undef OPERATION_MINLOC
887
888#  define OPERATION_MAXLOC
889#  define DIM_2d
890#     define ROUTINE_LOC           mpp_maxloc2d_sp
891#     include "mpp_loc_generic.h90"
892#     undef ROUTINE_LOC
893#  undef DIM_2d
894#  define DIM_3d
895#     define ROUTINE_LOC           mpp_maxloc3d_sp
896#     include "mpp_loc_generic.h90"
897#     undef ROUTINE_LOC
898#  undef DIM_3d
899#  undef OPERATION_MAXLOC
900#  undef SINGLE_PRECISION
901   !!
902   !!   ----   DOUBLE PRECISION VERSIONS
903   !!
904#  define OPERATION_MINLOC
905#  define DIM_2d
906#     define ROUTINE_LOC           mpp_minloc2d_dp
907#     include "mpp_loc_generic.h90"
908#     undef ROUTINE_LOC
909#  undef DIM_2d
910#  define DIM_3d
911#     define ROUTINE_LOC           mpp_minloc3d_dp
912#     include "mpp_loc_generic.h90"
913#     undef ROUTINE_LOC
914#  undef DIM_3d
915#  undef OPERATION_MINLOC
916
917#  define OPERATION_MAXLOC
918#  define DIM_2d
919#     define ROUTINE_LOC           mpp_maxloc2d_dp
920#     include "mpp_loc_generic.h90"
921#     undef ROUTINE_LOC
922#  undef DIM_2d
923#  define DIM_3d
924#     define ROUTINE_LOC           mpp_maxloc3d_dp
925#     include "mpp_loc_generic.h90"
926#     undef ROUTINE_LOC
927#  undef DIM_3d
928#  undef OPERATION_MAXLOC
929
930
931   SUBROUTINE mppsync()
932      !!----------------------------------------------------------------------
933      !!                  ***  routine mppsync  ***
934      !!
935      !! ** Purpose :   Massively parallel processors, synchroneous
936      !!
937      !!-----------------------------------------------------------------------
938      INTEGER :: ierror
939      !!-----------------------------------------------------------------------
940      !
941#if ! defined key_mpi_off
942      CALL mpi_barrier( mpi_comm_oce, ierror )
943#endif
944      !
945   END SUBROUTINE mppsync
946
947
948   SUBROUTINE mppstop( ld_abort )
949      !!----------------------------------------------------------------------
950      !!                  ***  routine mppstop  ***
951      !!
952      !! ** purpose :   Stop massively parallel processors method
953      !!
954      !!----------------------------------------------------------------------
955      LOGICAL, OPTIONAL, INTENT(in) :: ld_abort    ! source process number
956      LOGICAL ::   ll_abort
957      INTEGER ::   info
958      !!----------------------------------------------------------------------
959      ll_abort = .FALSE.
960      IF( PRESENT(ld_abort) ) ll_abort = ld_abort
961      !
962#if ! defined key_mpi_off
963      IF(ll_abort) THEN
964         CALL mpi_abort( MPI_COMM_WORLD )
965      ELSE
966         CALL mppsync
967         CALL mpi_finalize( info )
968      ENDIF
969#endif
970      IF( ll_abort ) STOP 123
971      !
972   END SUBROUTINE mppstop
973
974
975   SUBROUTINE mpp_comm_free( kcom )
976      !!----------------------------------------------------------------------
977      INTEGER, INTENT(in) ::   kcom
978      !!
979      INTEGER :: ierr
980      !!----------------------------------------------------------------------
981      !
982#if ! defined key_mpi_off
983      CALL MPI_COMM_FREE(kcom, ierr)
984#endif
985      !
986   END SUBROUTINE mpp_comm_free
987
988
989   SUBROUTINE mpp_ini_znl( kumout )
990      !!----------------------------------------------------------------------
991      !!               ***  routine mpp_ini_znl  ***
992      !!
993      !! ** Purpose :   Initialize special communicator for computing zonal sum
994      !!
995      !! ** Method  : - Look for processors in the same row
996      !!              - Put their number in nrank_znl
997      !!              - Create group for the znl processors
998      !!              - Create a communicator for znl processors
999      !!              - Determine if processor should write znl files
1000      !!
1001      !! ** output
1002      !!      ndim_rank_znl = number of processors on the same row
1003      !!      ngrp_znl = group ID for the znl processors
1004      !!      ncomm_znl = communicator for the ice procs.
1005      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
1006      !!
1007      !!----------------------------------------------------------------------
1008      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
1009      !
1010      INTEGER :: jproc      ! dummy loop integer
1011      INTEGER :: ierr, ii   ! local integer
1012      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
1013      !!----------------------------------------------------------------------
1014#if ! defined key_mpi_off
1015      !-$$     WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ngrp_world     : ', ngrp_world
1016      !-$$     WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - mpi_comm_world : ', mpi_comm_world
1017      !-$$     WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - mpi_comm_oce   : ', mpi_comm_oce
1018      !
1019      ALLOCATE( kwork(jpnij), STAT=ierr )
1020      IF( ierr /= 0 ) CALL ctl_stop( 'STOP', 'mpp_ini_znl : failed to allocate 1D array of length jpnij')
1021
1022      IF( jpnj == 1 ) THEN
1023         ngrp_znl  = ngrp_world
1024         ncomm_znl = mpi_comm_oce
1025      ELSE
1026         !
1027         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
1028         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - kwork pour njmpp : ', kwork
1029         !-$$        CALL flush(numout)
1030         !
1031         ! Count number of processors on the same row
1032         ndim_rank_znl = 0
1033         DO jproc=1,jpnij
1034            IF ( kwork(jproc) == njmpp ) THEN
1035               ndim_rank_znl = ndim_rank_znl + 1
1036            ENDIF
1037         END DO
1038         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ndim_rank_znl : ', ndim_rank_znl
1039         !-$$        CALL flush(numout)
1040         ! Allocate the right size to nrank_znl
1041         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1042         ALLOCATE(nrank_znl(ndim_rank_znl))
1043         ii = 0
1044         nrank_znl (:) = 0
1045         DO jproc=1,jpnij
1046            IF ( kwork(jproc) == njmpp) THEN
1047               ii = ii + 1
1048               nrank_znl(ii) = jproc -1
1049            ENDIF
1050         END DO
1051         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - nrank_znl : ', nrank_znl
1052         !-$$        CALL flush(numout)
1053
1054         ! Create the opa group
1055         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1056         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ngrp_opa : ', ngrp_opa
1057         !-$$        CALL flush(numout)
1058
1059         ! Create the znl group from the opa group
1060         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1061         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ngrp_znl ', ngrp_znl
1062         !-$$        CALL flush(numout)
1063
1064         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1065         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1066         !-$$        WRITE (numout,*) 'mpp_ini_znl ', mpprank, ' - ncomm_znl ', ncomm_znl
1067         !-$$        CALL flush(numout)
1068         !
1069      END IF
1070
1071      ! Determines if processor if the first (starting from i=1) on the row
1072      IF ( jpni == 1 ) THEN
1073         l_znl_root = .TRUE.
1074      ELSE
1075         l_znl_root = .FALSE.
1076         kwork (1) = nimpp
1077         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1078         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1079      END IF
1080
1081      DEALLOCATE(kwork)
1082#endif
1083
1084   END SUBROUTINE mpp_ini_znl
1085
1086   
1087   SUBROUTINE mpp_ini_nc
1088      !!----------------------------------------------------------------------
1089      !!               ***  routine mpp_ini_nc  ***
1090      !!
1091      !! ** Purpose :   Initialize special communicators for MPI3 neighbourhood
1092      !!                collectives
1093      !!
1094      !! ** Method  : - Create graph communicators starting from the processes
1095      !!                distribution along i and j directions
1096      !
1097      !! ** output
1098      !!         mpi_nc_com4 = MPI3 neighbourhood collectives communicator
1099      !!         mpi_nc_com8 = MPI3 neighbourhood collectives communicator (with diagonals)
1100      !!----------------------------------------------------------------------
1101      INTEGER, DIMENSION(:), ALLOCATABLE :: inei4, inei8
1102      INTEGER :: icnt4, icnt8
1103      INTEGER :: ierr
1104      LOGICAL, PARAMETER :: ireord = .FALSE.
1105      !!----------------------------------------------------------------------
1106#if ! defined key_mpi_off && ! defined key_mpi2
1107     
1108      icnt4 = COUNT( mpinei(1:4) >= 0 )
1109      icnt8 = COUNT( mpinei(1:8) >= 0 )
1110
1111      ALLOCATE( inei4(icnt4), inei8(icnt8) )   ! ok if icnt4 or icnt8 = 0
1112
1113      inei4 = PACK( mpinei(1:4), mask = mpinei(1:4) >= 0 )
1114      inei8 = PACK( mpinei(1:8), mask = mpinei(1:8) >= 0 )
1115
1116      CALL MPI_Dist_graph_create_adjacent(mpi_comm_oce, icnt4, inei4, MPI_UNWEIGHTED,   &
1117         &                                              icnt4, inei4, MPI_UNWEIGHTED, MPI_INFO_NULL, ireord, mpi_nc_com4, ierr)
1118      CALL MPI_Dist_graph_create_adjacent(mpi_comm_oce, icnt8, inei8, MPI_UNWEIGHTED,   &
1119         &                                              icnt8, inei8, MPI_UNWEIGHTED, MPI_INFO_NULL, ireord, mpi_nc_com8, ierr)
1120
1121      DEALLOCATE (inei4, inei8)
1122#endif
1123   END SUBROUTINE mpp_ini_nc
1124
1125
1126   SUBROUTINE mpp_ini_north
1127      !!----------------------------------------------------------------------
1128      !!               ***  routine mpp_ini_north  ***
1129      !!
1130      !! ** Purpose :   Initialize special communicator for north folding
1131      !!      condition together with global variables needed in the mpp folding
1132      !!
1133      !! ** Method  : - Look for northern processors
1134      !!              - Put their number in nrank_north
1135      !!              - Create groups for the world processors and the north processors
1136      !!              - Create a communicator for northern processors
1137      !!
1138      !! ** output
1139      !!      ndim_rank_north = number of processors in the northern line
1140      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1141      !!      ngrp_world = group ID for the world processors
1142      !!      ngrp_north = group ID for the northern processors
1143      !!      ncomm_north = communicator for the northern procs.
1144      !!      north_root = number (in the world) of proc 0 in the northern comm.
1145      !!
1146      !!----------------------------------------------------------------------
1147      INTEGER ::   ierr
1148      INTEGER ::   jjproc
1149      INTEGER ::   ii, ji
1150      !!----------------------------------------------------------------------
1151      !
1152#if ! defined key_mpi_off
1153      !
1154      ! Look for how many procs on the northern boundary
1155      ndim_rank_north = 0
1156      DO jjproc = 1, jpni
1157         IF( nfproc(jjproc) /= -1 )   ndim_rank_north = ndim_rank_north + 1
1158      END DO
1159      !
1160      ! Allocate the right size to nrank_north
1161      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1162      ALLOCATE( nrank_north(ndim_rank_north) )
1163
1164      ! Fill the nrank_north array with proc. number of northern procs.
1165      ! Note : the rank start at 0 in MPI
1166      ii = 0
1167      DO ji = 1, jpni
1168         IF ( nfproc(ji) /= -1   ) THEN
1169            ii=ii+1
1170            nrank_north(ii)=nfproc(ji)
1171         END IF
1172      END DO
1173      !
1174      ! create the world group
1175      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1176      !
1177      ! Create the North group from the world group
1178      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1179      !
1180      ! Create the North communicator , ie the pool of procs in the north group
1181      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1182      !
1183#endif
1184   END SUBROUTINE mpp_ini_north
1185
1186
1187   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1188      !!---------------------------------------------------------------------
1189      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1190      !!
1191      !!   Modification of original codes written by David H. Bailey
1192      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1193      !!---------------------------------------------------------------------
1194      INTEGER                     , INTENT(in)    ::   ilen, itype
1195      COMPLEX(dp), DIMENSION(ilen), INTENT(in)    ::   ydda
1196      COMPLEX(dp), DIMENSION(ilen), INTENT(inout) ::   yddb
1197      !
1198      REAL(dp) :: zerr, zt1, zt2    ! local work variables
1199      INTEGER  :: ji, ztmp           ! local scalar
1200      !!---------------------------------------------------------------------
1201      !
1202      ztmp = itype   ! avoid compilation warning
1203      !
1204      DO ji=1,ilen
1205      ! Compute ydda + yddb using Knuth's trick.
1206         zt1  = real(ydda(ji)) + real(yddb(ji))
1207         zerr = zt1 - real(ydda(ji))
1208         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1209                + aimag(ydda(ji)) + aimag(yddb(ji))
1210
1211         ! The result is zt1 + zt2, after normalization.
1212         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1213      END DO
1214      !
1215   END SUBROUTINE DDPDD_MPI
1216
1217
1218   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1219      !!----------------------------------------------------------------------
1220      !!                  ***  routine mpp_report  ***
1221      !!
1222      !! ** Purpose :   report use of mpp routines per time-setp
1223      !!
1224      !!----------------------------------------------------------------------
1225      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1226      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1227      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1228      !!
1229      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
1230      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1231      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
1232      !!----------------------------------------------------------------------
1233#if ! defined key_mpi_off
1234      !
1235      ll_lbc = .FALSE.
1236      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1237      ll_glb = .FALSE.
1238      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1239      ll_dlg = .FALSE.
1240      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1241      !
1242      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1243      ncom_freq = ncom_fsbc
1244      !
1245      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1246         IF( ll_lbc ) THEN
1247            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1248            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1249            n_sequence_lbc = n_sequence_lbc + 1
1250            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1251            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1252            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1253            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1254         ENDIF
1255         IF( ll_glb ) THEN
1256            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1257            n_sequence_glb = n_sequence_glb + 1
1258            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1259            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1260         ENDIF
1261         IF( ll_dlg ) THEN
1262            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1263            n_sequence_dlg = n_sequence_dlg + 1
1264            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1265            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1266         ENDIF
1267      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1268         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1269         WRITE(numcom,*) ' '
1270         WRITE(numcom,*) ' ------------------------------------------------------------'
1271         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1272         WRITE(numcom,*) ' ------------------------------------------------------------'
1273         WRITE(numcom,*) ' '
1274         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1275         jj = 0; jk = 0; jf = 0; jh = 0
1276         DO ji = 1, n_sequence_lbc
1277            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1278            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1279            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1280            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1281         END DO
1282         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1283         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1284         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1285         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1286         WRITE(numcom,*) ' '
1287         WRITE(numcom,*) ' lbc_lnk called'
1288         DO ji = 1, n_sequence_lbc - 1
1289            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1290               ccountname = crname_lbc(ji)
1291               crname_lbc(ji) = 'already counted'
1292               jcount = 1
1293               DO jj = ji + 1, n_sequence_lbc
1294                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1295                     jcount = jcount + 1
1296                     crname_lbc(jj) = 'already counted'
1297                  END IF
1298               END DO
1299               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1300            END IF
1301         END DO
1302         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1303            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1304         END IF
1305         WRITE(numcom,*) ' '
1306         IF ( n_sequence_glb > 0 ) THEN
1307            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1308            jj = 1
1309            DO ji = 2, n_sequence_glb
1310               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1311                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1312                  jj = 0
1313               END IF
1314               jj = jj + 1
1315            END DO
1316            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1317            DEALLOCATE(crname_glb)
1318         ELSE
1319            WRITE(numcom,*) ' No MPI global communication '
1320         ENDIF
1321         WRITE(numcom,*) ' '
1322         IF ( n_sequence_dlg > 0 ) THEN
1323            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1324            jj = 1
1325            DO ji = 2, n_sequence_dlg
1326               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1327                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1328                  jj = 0
1329               END IF
1330               jj = jj + 1
1331            END DO
1332            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1333            DEALLOCATE(crname_dlg)
1334         ELSE
1335            WRITE(numcom,*) ' No MPI delayed global communication '
1336         ENDIF
1337         WRITE(numcom,*) ' '
1338         WRITE(numcom,*) ' -----------------------------------------------'
1339         WRITE(numcom,*) ' '
1340         DEALLOCATE(ncomm_sequence)
1341         DEALLOCATE(crname_lbc)
1342      ENDIF
1343#endif
1344   END SUBROUTINE mpp_report
1345
1346
1347   SUBROUTINE tic_tac (ld_tic, ld_global)
1348
1349    LOGICAL,           INTENT(IN) :: ld_tic
1350    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1351    REAL(dp), DIMENSION(2), SAVE :: tic_wt
1352    REAL(dp),               SAVE :: tic_ct = 0._dp
1353    INTEGER :: ii
1354#if ! defined key_mpi_off
1355
1356    IF( ncom_stp <= nit000 ) RETURN
1357    IF( ncom_stp == nitend ) RETURN
1358    ii = 1
1359    IF( PRESENT( ld_global ) ) THEN
1360       IF( ld_global ) ii = 2
1361    END IF
1362
1363    IF ( ld_tic ) THEN
1364       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1365       IF ( tic_ct > 0.0_dp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1366    ELSE
1367       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1368       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1369    ENDIF
1370#endif
1371
1372   END SUBROUTINE tic_tac
1373
1374#if defined key_mpi_off
1375   SUBROUTINE mpi_wait(request, status, ierror)
1376      INTEGER                            , INTENT(in   ) ::   request
1377      INTEGER, DIMENSION(MPI_STATUS_SIZE), INTENT(  out) ::   status
1378      INTEGER                            , INTENT(  out) ::   ierror
1379   END SUBROUTINE mpi_wait
1380
1381
1382   FUNCTION MPI_Wtime()
1383      REAL(wp) ::  MPI_Wtime
1384      MPI_Wtime = -1.
1385   END FUNCTION MPI_Wtime
1386#endif
1387
1388   !!----------------------------------------------------------------------
1389   !!   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam, load_nml   routines
1390   !!----------------------------------------------------------------------
1391
1392   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1393      &                 cd6, cd7, cd8, cd9, cd10 )
1394      !!----------------------------------------------------------------------
1395      !!                  ***  ROUTINE  stop_opa  ***
1396      !!
1397      !! ** Purpose :   print in ocean.outpput file a error message and
1398      !!                increment the error number (nstop) by one.
1399      !!----------------------------------------------------------------------
1400      CHARACTER(len=*), INTENT(in   )           ::   cd1
1401      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::        cd2, cd3, cd4, cd5
1402      CHARACTER(len=*), INTENT(in   ), OPTIONAL ::   cd6, cd7, cd8, cd9, cd10
1403      !
1404      CHARACTER(LEN=8) ::   clfmt            ! writing format
1405      INTEGER          ::   inum
1406      !!----------------------------------------------------------------------
1407      !
1408      nstop = nstop + 1
1409      !
1410      IF( cd1 == 'STOP' .AND. narea /= 1 ) THEN    ! Immediate stop: add an arror message in 'ocean.output' file
1411         CALL ctl_opn( inum, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1412         WRITE(inum,*)
1413         WRITE(inum,*) ' ==>>>   Look for "E R R O R" messages in all existing *ocean.output* files'
1414         CLOSE(inum)
1415      ENDIF
1416      IF( numout == 6 ) THEN                       ! force to open ocean.output file if not already opened
1417         CALL ctl_opn( numout, 'ocean.output', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, -1, .FALSE., narea )
1418      ENDIF
1419      !
1420                            WRITE(numout,*)
1421                            WRITE(numout,*) ' ===>>> : E R R O R'
1422                            WRITE(numout,*)
1423                            WRITE(numout,*) '         ==========='
1424                            WRITE(numout,*)
1425                            WRITE(numout,*) TRIM(cd1)
1426      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1427      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1428      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1429      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1430      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1431      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1432      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1433      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1434      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1435                            WRITE(numout,*)
1436      !
1437                               CALL FLUSH(numout    )
1438      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1439      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1440      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1441      !
1442      IF( cd1 == 'STOP' ) THEN
1443         WRITE(numout,*)
1444         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1445         WRITE(numout,*)
1446         CALL FLUSH(numout)
1447         CALL SLEEP(60)   ! make sure that all output and abort files are written by all cores. 60s should be enough...
1448         CALL mppstop( ld_abort = .true. )
1449      ENDIF
1450      !
1451   END SUBROUTINE ctl_stop
1452
1453
1454   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1455      &                 cd6, cd7, cd8, cd9, cd10 )
1456      !!----------------------------------------------------------------------
1457      !!                  ***  ROUTINE  stop_warn  ***
1458      !!
1459      !! ** Purpose :   print in ocean.outpput file a error message and
1460      !!                increment the warning number (nwarn) by one.
1461      !!----------------------------------------------------------------------
1462      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1463      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1464      !!----------------------------------------------------------------------
1465      !
1466      nwarn = nwarn + 1
1467      !
1468      IF(lwp) THEN
1469                               WRITE(numout,*)
1470                               WRITE(numout,*) ' ===>>> : W A R N I N G'
1471                               WRITE(numout,*)
1472                               WRITE(numout,*) '         ==============='
1473                               WRITE(numout,*)
1474         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1475         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1476         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1477         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1478         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1479         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1480         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1481         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1482         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1483         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1484                               WRITE(numout,*)
1485      ENDIF
1486      CALL FLUSH(numout)
1487      !
1488   END SUBROUTINE ctl_warn
1489
1490
1491   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1492      !!----------------------------------------------------------------------
1493      !!                  ***  ROUTINE ctl_opn  ***
1494      !!
1495      !! ** Purpose :   Open file and check if required file is available.
1496      !!
1497      !! ** Method  :   Fortan open
1498      !!----------------------------------------------------------------------
1499      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1500      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1501      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1502      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1503      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1504      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1505      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1506      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1507      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1508      !
1509      CHARACTER(len=80) ::   clfile
1510      CHARACTER(LEN=10) ::   clfmt            ! writing format
1511      INTEGER           ::   iost
1512      INTEGER           ::   idg              ! number of digits
1513      !!----------------------------------------------------------------------
1514      !
1515      ! adapt filename
1516      ! ----------------
1517      clfile = TRIM(cdfile)
1518      IF( PRESENT( karea ) ) THEN
1519         IF( karea > 1 ) THEN
1520            ! Warning: jpnij is maybe not already defined when calling ctl_opn -> use mppsize instead of jpnij
1521            idg = MAX( INT(LOG10(REAL(MAX(1,mppsize-1),wp))) + 1, 4 )      ! how many digits to we need to write? min=4, max=9
1522            WRITE(clfmt, "('(a,a,i', i1, '.', i1, ')')") idg, idg          ! '(a,a,ix.x)'
1523            WRITE(clfile, clfmt) TRIM(clfile), '_', karea-1
1524         ENDIF
1525      ENDIF
1526#if defined key_agrif
1527      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1528      knum=Agrif_Get_Unit()
1529#else
1530      knum=get_unit()
1531#endif
1532      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1533      !
1534      IF(       cdacce(1:6) == 'DIRECT' )  THEN   ! cdacce has always more than 6 characters
1535         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1536      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1537         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1538      ELSE
1539         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1540      ENDIF
1541      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1542         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1543      IF( iost == 0 ) THEN
1544         IF(ldwp .AND. kout > 0) THEN
1545            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1546            WRITE(kout,*) '     unit   = ', knum
1547            WRITE(kout,*) '     status = ', cdstat
1548            WRITE(kout,*) '     form   = ', cdform
1549            WRITE(kout,*) '     access = ', cdacce
1550            WRITE(kout,*)
1551         ENDIF
1552      ENDIF
1553100   CONTINUE
1554      IF( iost /= 0 ) THEN
1555         WRITE(ctmp1,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1556         WRITE(ctmp2,*) ' =======   ===  '
1557         WRITE(ctmp3,*) '           unit   = ', knum
1558         WRITE(ctmp4,*) '           status = ', cdstat
1559         WRITE(ctmp5,*) '           form   = ', cdform
1560         WRITE(ctmp6,*) '           access = ', cdacce
1561         WRITE(ctmp7,*) '           iostat = ', iost
1562         WRITE(ctmp8,*) '           we stop. verify the file '
1563         CALL ctl_stop( 'STOP', ctmp1, ctmp2, ctmp3, ctmp4, ctmp5, ctmp6, ctmp7, ctmp8 )
1564      ENDIF
1565      !
1566   END SUBROUTINE ctl_opn
1567
1568
1569   SUBROUTINE ctl_nam ( kios, cdnam )
1570      !!----------------------------------------------------------------------
1571      !!                  ***  ROUTINE ctl_nam  ***
1572      !!
1573      !! ** Purpose :   Informations when error while reading a namelist
1574      !!
1575      !! ** Method  :   Fortan open
1576      !!----------------------------------------------------------------------
1577      INTEGER                                , INTENT(inout) ::   kios    ! IO status after reading the namelist
1578      CHARACTER(len=*)                       , INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1579      !
1580      CHARACTER(len=5) ::   clios   ! string to convert iostat in character for print
1581      !!----------------------------------------------------------------------
1582      !
1583      WRITE (clios, '(I5.0)')   kios
1584      IF( kios < 0 ) THEN
1585         CALL ctl_warn( 'end of record or file while reading namelist '   &
1586            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1587      ENDIF
1588      !
1589      IF( kios > 0 ) THEN
1590         CALL ctl_stop( 'misspelled variable in namelist '   &
1591            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1592      ENDIF
1593      kios = 0
1594      !
1595   END SUBROUTINE ctl_nam
1596
1597
1598   INTEGER FUNCTION get_unit()
1599      !!----------------------------------------------------------------------
1600      !!                  ***  FUNCTION  get_unit  ***
1601      !!
1602      !! ** Purpose :   return the index of an unused logical unit
1603      !!----------------------------------------------------------------------
1604      LOGICAL :: llopn
1605      !!----------------------------------------------------------------------
1606      !
1607      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1608      llopn = .TRUE.
1609      DO WHILE( (get_unit < 998) .AND. llopn )
1610         get_unit = get_unit + 1
1611         INQUIRE( unit = get_unit, opened = llopn )
1612      END DO
1613      IF( (get_unit == 999) .AND. llopn ) THEN
1614         CALL ctl_stop( 'STOP', 'get_unit: All logical units until 999 are used...' )
1615      ENDIF
1616      !
1617   END FUNCTION get_unit
1618
1619   SUBROUTINE load_nml( cdnambuff , cdnamfile, kout, ldwp)
1620      CHARACTER(LEN=:)    , ALLOCATABLE, INTENT(INOUT) :: cdnambuff
1621      CHARACTER(LEN=*), INTENT(IN )                :: cdnamfile
1622      CHARACTER(LEN=256)                           :: chline
1623      CHARACTER(LEN=1)                             :: csp
1624      INTEGER, INTENT(IN)                          :: kout
1625      LOGICAL, INTENT(IN)                          :: ldwp  !: .true. only for the root broadcaster
1626      INTEGER                                      :: itot, iun, iltc, inl, ios, itotsav
1627      !
1628      !csp = NEW_LINE('A')
1629      ! a new line character is the best seperator but some systems (e.g.Cray)
1630      ! seem to terminate namelist reads from internal files early if they
1631      ! encounter new-lines. Use a single space for safety.
1632      csp = ' '
1633      !
1634      ! Check if the namelist buffer has already been allocated. Return if it has.
1635      !
1636      IF ( ALLOCATED( cdnambuff ) ) RETURN
1637      IF( ldwp ) THEN
1638         !
1639         ! Open namelist file
1640         !
1641         CALL ctl_opn( iun, cdnamfile, 'OLD', 'FORMATTED', 'SEQUENTIAL', -1, kout, ldwp )
1642         !
1643         ! First pass: count characters excluding comments and trimable white space
1644         !
1645         itot=0
1646     10  READ(iun,'(A256)',END=20,ERR=20) chline
1647         iltc = LEN_TRIM(chline)
1648         IF ( iltc.GT.0 ) THEN
1649          inl = INDEX(chline, '!')
1650          IF( inl.eq.0 ) THEN
1651           itot = itot + iltc + 1                                ! +1 for the newline character
1652          ELSEIF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl-1) ).GT.0 ) THEN
1653           itot = itot + inl                                  !  includes +1 for the newline character
1654          ENDIF
1655         ENDIF
1656         GOTO 10
1657     20  CONTINUE
1658         !
1659         ! Allocate text cdnambuff for condensed namelist
1660         !
1661!$AGRIF_DO_NOT_TREAT
1662         ALLOCATE( CHARACTER(LEN=itot) :: cdnambuff )
1663!$AGRIF_END_DO_NOT_TREAT
1664         itotsav = itot
1665         !
1666         ! Second pass: read and transfer pruned characters into cdnambuff
1667         !
1668         REWIND(iun)
1669         itot=1
1670     30  READ(iun,'(A256)',END=40,ERR=40) chline
1671         iltc = LEN_TRIM(chline)
1672         IF ( iltc.GT.0 ) THEN
1673          inl = INDEX(chline, '!')
1674          IF( inl.eq.0 ) THEN
1675           inl = iltc
1676          ELSE
1677           inl = inl - 1
1678          ENDIF
1679          IF( inl.GT.0 .AND. LEN_TRIM( chline(1:inl) ).GT.0 ) THEN
1680             cdnambuff(itot:itot+inl-1) = chline(1:inl)
1681             WRITE( cdnambuff(itot+inl:itot+inl), '(a)' ) csp
1682             itot = itot + inl + 1
1683          ENDIF
1684         ENDIF
1685         GOTO 30
1686     40  CONTINUE
1687         itot = itot - 1
1688         IF( itotsav .NE. itot ) WRITE(*,*) 'WARNING in load_nml. Allocated ',itotsav,' for read buffer; but used ',itot
1689         !
1690         ! Close namelist file
1691         !
1692         CLOSE(iun)
1693         !write(*,'(32A)') cdnambuff
1694      ENDIF
1695#if ! defined key_mpi_off
1696      CALL mpp_bcast_nml( cdnambuff, itot )
1697#endif
1698  END SUBROUTINE load_nml
1699
1700
1701   !!----------------------------------------------------------------------
1702END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.