source: NEMO/trunk/src/OCE/LBC/lib_mpp.F90 @ 10521

Last change on this file since 10521 was 10521, checked in by smasson, 19 months ago

trunk: bugfix for compilation with (at least) gcc, see #2201

  • Property svn:keywords set to Id
File size: 89.7 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
35   !!----------------------------------------------------------------------
36#if   defined key_mpp_mpi
37   !!----------------------------------------------------------------------
38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
43   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
44   !!   mpprecv       :
45   !!   mppsend       :
46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
55   !!   mpp_ini_north : initialisation of north fold
56   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
57   !!----------------------------------------------------------------------
58   USE dom_oce        ! ocean space and time domain
59   USE lbcnfd         ! north fold treatment
60   USE in_out_manager ! I/O manager
61
62   IMPLICIT NONE
63   PRIVATE
64
65   INTERFACE mpp_nfd
66      MODULE PROCEDURE   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
67      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
68   END INTERFACE
69
70   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
71   PUBLIC   mpp_lnk_2d    , mpp_lnk_3d    , mpp_lnk_4d
72   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
73   !
74!!gm  this should be useless
75   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
76   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
77!!gm end
78   !
79   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
80   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
81   PUBLIC   mpp_ini_north
82   PUBLIC   mpp_lnk_2d_icb
83   PUBLIC   mpp_lbc_north_icb
84   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
85   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
86   PUBLIC   mppscatter, mppgather
87   PUBLIC   mpp_ini_znl
88   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
89   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d, mpp_lnk_bdy_4d
90   
91   !! * Interfaces
92   !! define generic interface for these routine as they are called sometimes
93   !! with scalar arguments instead of array arguments, which causes problems
94   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
95   INTERFACE mpp_min
96      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
97   END INTERFACE
98   INTERFACE mpp_max
99      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
100   END INTERFACE
101   INTERFACE mpp_sum
102      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
103         &             mppsum_realdd, mppsum_a_realdd
104   END INTERFACE
105   INTERFACE mpp_minloc
106      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
107   END INTERFACE
108   INTERFACE mpp_maxloc
109      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
110   END INTERFACE
111
112   !! ========================= !!
113   !!  MPI  variable definition !!
114   !! ========================= !!
115!$AGRIF_DO_NOT_TREAT
116   INCLUDE 'mpif.h'
117!$AGRIF_END_DO_NOT_TREAT
118
119   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
120
121   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
122
123   INTEGER, PUBLIC ::   mppsize        ! number of process
124   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
125!$AGRIF_DO_NOT_TREAT
126   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
127!$AGRIF_END_DO_NOT_TREAT
128
129   INTEGER :: MPI_SUMDD
130
131   ! variables used for zonal integration
132   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
133   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
134   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
135   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
136   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
137
138   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
139   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
140   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
141   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
142   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
143   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
144   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
145   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
146   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
147
148   ! Type of send : standard, buffered, immediate
149   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
150   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
151   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
152
153   ! Communications summary report
154   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
155   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
156   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
157   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
158   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
159   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
160   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
161   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
162   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 2000          !: max number of communication record
163   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
164   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
165   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
166   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
167   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
168   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
169   !: name (used as id) of allreduce-delayed operations
170   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
171   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
172   !: component name where the allreduce-delayed operation is performed
173   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
174   TYPE, PUBLIC ::   DELAYARR
175      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
176      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
177   END TYPE DELAYARR
178   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC  ::   todelay             
179   INTEGER,          DIMENSION(nbdelay), PUBLIC  ::   ndelayid = -1     !: mpi request id of the delayed operations
180
181   ! timing summary report
182   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
183   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
184   
185   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
186
187   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
188   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
189
190   !!----------------------------------------------------------------------
191   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
192   !! $Id$
193   !! Software governed by the CeCILL license (see ./LICENSE)
194   !!----------------------------------------------------------------------
195CONTAINS
196
197   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
198      !!----------------------------------------------------------------------
199      !!                  ***  routine mynode  ***
200      !!
201      !! ** Purpose :   Find processor unit
202      !!----------------------------------------------------------------------
203      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
204      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
205      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
206      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
207      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
208      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
209      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
210      !
211      INTEGER ::   mynode, ierr, code, ji, ii, ios
212      LOGICAL ::   mpi_was_called
213      !
214      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, ln_nnogather
215      !!----------------------------------------------------------------------
216      !
217      ii = 1
218      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
219      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
220      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
221      !
222      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
223      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
224901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
225      !
226      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
227      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
228902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
229      !
230      !                              ! control print
231      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
232      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
233      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
234      !
235      IF( jpni < 1 .OR. jpnj < 1  ) THEN
236         WRITE(ldtxt(ii),*) '      jpni and jpnj will be calculated automatically' ;   ii = ii + 1
237      ELSE
238         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
239         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
240      ENDIF
241
242      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
243
244      CALL mpi_initialized ( mpi_was_called, code )
245      IF( code /= MPI_SUCCESS ) THEN
246         DO ji = 1, SIZE(ldtxt)
247            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
248         END DO
249         WRITE(*, cform_err)
250         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
251         CALL mpi_abort( mpi_comm_world, code, ierr )
252      ENDIF
253
254      IF( mpi_was_called ) THEN
255         !
256         SELECT CASE ( cn_mpi_send )
257         CASE ( 'S' )                ! Standard mpi send (blocking)
258            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
259         CASE ( 'B' )                ! Buffer mpi send (blocking)
260            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
261            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
262         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
263            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
264            l_isend = .TRUE.
265         CASE DEFAULT
266            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
267            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
268            kstop = kstop + 1
269         END SELECT
270         !
271      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
272         WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
273         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
274         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
275         kstop = kstop + 1
276      ELSE
277         SELECT CASE ( cn_mpi_send )
278         CASE ( 'S' )                ! Standard mpi send (blocking)
279            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
280            CALL mpi_init( ierr )
281         CASE ( 'B' )                ! Buffer mpi send (blocking)
282            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
283            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
284         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
285            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
286            l_isend = .TRUE.
287            CALL mpi_init( ierr )
288         CASE DEFAULT
289            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
290            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
291            kstop = kstop + 1
292         END SELECT
293         !
294      ENDIF
295
296      IF( PRESENT(localComm) ) THEN
297         IF( Agrif_Root() ) THEN
298            mpi_comm_oce = localComm
299         ENDIF
300      ELSE
301         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
302         IF( code /= MPI_SUCCESS ) THEN
303            DO ji = 1, SIZE(ldtxt)
304               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
305            END DO
306            WRITE(*, cform_err)
307            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
308            CALL mpi_abort( mpi_comm_world, code, ierr )
309         ENDIF
310      ENDIF
311
312#if defined key_agrif
313      IF( Agrif_Root() ) THEN
314         CALL Agrif_MPI_Init(mpi_comm_oce)
315      ELSE
316         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
317      ENDIF
318#endif
319
320      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
321      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
322      mynode = mpprank
323
324      IF( mynode == 0 ) THEN
325         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
326         WRITE(kumond, nammpp)     
327      ENDIF
328      !
329      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
330      !
331   END FUNCTION mynode
332
333   !!----------------------------------------------------------------------
334   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
335   !!
336   !!   * Argument : dummy argument use in mpp_lnk_... routines
337   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
338   !!                cd_nat :   nature of array grid-points
339   !!                psgn   :   sign used across the north fold boundary
340   !!                kfld   :   optional, number of pt3d arrays
341   !!                cd_mpp :   optional, fill the overlap area only
342   !!                pval   :   optional, background value (used at closed boundaries)
343   !!----------------------------------------------------------------------
344   !
345   !                       !==  2D array and array of 2D pointer  ==!
346   !
347#  define DIM_2d
348#     define ROUTINE_LNK           mpp_lnk_2d
349#     include "mpp_lnk_generic.h90"
350#     undef ROUTINE_LNK
351#     define MULTI
352#     define ROUTINE_LNK           mpp_lnk_2d_ptr
353#     include "mpp_lnk_generic.h90"
354#     undef ROUTINE_LNK
355#     undef MULTI
356#  undef DIM_2d
357   !
358   !                       !==  3D array and array of 3D pointer  ==!
359   !
360#  define DIM_3d
361#     define ROUTINE_LNK           mpp_lnk_3d
362#     include "mpp_lnk_generic.h90"
363#     undef ROUTINE_LNK
364#     define MULTI
365#     define ROUTINE_LNK           mpp_lnk_3d_ptr
366#     include "mpp_lnk_generic.h90"
367#     undef ROUTINE_LNK
368#     undef MULTI
369#  undef DIM_3d
370   !
371   !                       !==  4D array and array of 4D pointer  ==!
372   !
373#  define DIM_4d
374#     define ROUTINE_LNK           mpp_lnk_4d
375#     include "mpp_lnk_generic.h90"
376#     undef ROUTINE_LNK
377#     define MULTI
378#     define ROUTINE_LNK           mpp_lnk_4d_ptr
379#     include "mpp_lnk_generic.h90"
380#     undef ROUTINE_LNK
381#     undef MULTI
382#  undef DIM_4d
383
384   !!----------------------------------------------------------------------
385   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
386   !!
387   !!   * Argument : dummy argument use in mpp_nfd_... routines
388   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
389   !!                cd_nat :   nature of array grid-points
390   !!                psgn   :   sign used across the north fold boundary
391   !!                kfld   :   optional, number of pt3d arrays
392   !!                cd_mpp :   optional, fill the overlap area only
393   !!                pval   :   optional, background value (used at closed boundaries)
394   !!----------------------------------------------------------------------
395   !
396   !                       !==  2D array and array of 2D pointer  ==!
397   !
398#  define DIM_2d
399#     define ROUTINE_NFD           mpp_nfd_2d
400#     include "mpp_nfd_generic.h90"
401#     undef ROUTINE_NFD
402#     define MULTI
403#     define ROUTINE_NFD           mpp_nfd_2d_ptr
404#     include "mpp_nfd_generic.h90"
405#     undef ROUTINE_NFD
406#     undef MULTI
407#  undef DIM_2d
408   !
409   !                       !==  3D array and array of 3D pointer  ==!
410   !
411#  define DIM_3d
412#     define ROUTINE_NFD           mpp_nfd_3d
413#     include "mpp_nfd_generic.h90"
414#     undef ROUTINE_NFD
415#     define MULTI
416#     define ROUTINE_NFD           mpp_nfd_3d_ptr
417#     include "mpp_nfd_generic.h90"
418#     undef ROUTINE_NFD
419#     undef MULTI
420#  undef DIM_3d
421   !
422   !                       !==  4D array and array of 4D pointer  ==!
423   !
424#  define DIM_4d
425#     define ROUTINE_NFD           mpp_nfd_4d
426#     include "mpp_nfd_generic.h90"
427#     undef ROUTINE_NFD
428#     define MULTI
429#     define ROUTINE_NFD           mpp_nfd_4d_ptr
430#     include "mpp_nfd_generic.h90"
431#     undef ROUTINE_NFD
432#     undef MULTI
433#  undef DIM_4d
434
435
436   !!----------------------------------------------------------------------
437   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
438   !!
439   !!   * Argument : dummy argument use in mpp_lnk_... routines
440   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
441   !!                cd_nat :   nature of array grid-points
442   !!                psgn   :   sign used across the north fold boundary
443   !!                kb_bdy :   BDY boundary set
444   !!                kfld   :   optional, number of pt3d arrays
445   !!----------------------------------------------------------------------
446   !
447   !                       !==  2D array and array of 2D pointer  ==!
448   !
449#  define DIM_2d
450#     define ROUTINE_BDY           mpp_lnk_bdy_2d
451#     include "mpp_bdy_generic.h90"
452#     undef ROUTINE_BDY
453#  undef DIM_2d
454   !
455   !                       !==  3D array and array of 3D pointer  ==!
456   !
457#  define DIM_3d
458#     define ROUTINE_BDY           mpp_lnk_bdy_3d
459#     include "mpp_bdy_generic.h90"
460#     undef ROUTINE_BDY
461#  undef DIM_3d
462   !
463   !                       !==  4D array and array of 4D pointer  ==!
464   !
465#  define DIM_4d
466#     define ROUTINE_BDY           mpp_lnk_bdy_4d
467#     include "mpp_bdy_generic.h90"
468#     undef ROUTINE_BDY
469#  undef DIM_4d
470
471   !!----------------------------------------------------------------------
472   !!
473   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
474   
475   
476   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
477   
478   
479   !!----------------------------------------------------------------------
480
481
482
483   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
484      !!----------------------------------------------------------------------
485      !!                  ***  routine mppsend  ***
486      !!
487      !! ** Purpose :   Send messag passing array
488      !!
489      !!----------------------------------------------------------------------
490      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
491      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
492      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
493      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
494      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
495      !!
496      INTEGER ::   iflag
497      !!----------------------------------------------------------------------
498      !
499      SELECT CASE ( cn_mpi_send )
500      CASE ( 'S' )                ! Standard mpi send (blocking)
501         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
502      CASE ( 'B' )                ! Buffer mpi send (blocking)
503         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
504      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
505         ! be carefull, one more argument here : the mpi request identifier..
506         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
507      END SELECT
508      !
509   END SUBROUTINE mppsend
510
511
512   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
513      !!----------------------------------------------------------------------
514      !!                  ***  routine mpprecv  ***
515      !!
516      !! ** Purpose :   Receive messag passing array
517      !!
518      !!----------------------------------------------------------------------
519      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
520      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
521      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
522      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
523      !!
524      INTEGER :: istatus(mpi_status_size)
525      INTEGER :: iflag
526      INTEGER :: use_source
527      !!----------------------------------------------------------------------
528      !
529      ! If a specific process number has been passed to the receive call,
530      ! use that one. Default is to use mpi_any_source
531      use_source = mpi_any_source
532      IF( PRESENT(ksource) )   use_source = ksource
533      !
534      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
535      !
536   END SUBROUTINE mpprecv
537
538
539   SUBROUTINE mppgather( ptab, kp, pio )
540      !!----------------------------------------------------------------------
541      !!                   ***  routine mppgather  ***
542      !!
543      !! ** Purpose :   Transfert between a local subdomain array and a work
544      !!     array which is distributed following the vertical level.
545      !!
546      !!----------------------------------------------------------------------
547      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
548      INTEGER                           , INTENT(in   ) ::   kp     ! record length
549      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
550      !!
551      INTEGER :: itaille, ierror   ! temporary integer
552      !!---------------------------------------------------------------------
553      !
554      itaille = jpi * jpj
555      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
556         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
557      !
558   END SUBROUTINE mppgather
559
560
561   SUBROUTINE mppscatter( pio, kp, ptab )
562      !!----------------------------------------------------------------------
563      !!                  ***  routine mppscatter  ***
564      !!
565      !! ** Purpose :   Transfert between awork array which is distributed
566      !!      following the vertical level and the local subdomain array.
567      !!
568      !!----------------------------------------------------------------------
569      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
570      INTEGER                             ::   kp     ! Tag (not used with MPI
571      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
572      !!
573      INTEGER :: itaille, ierror   ! temporary integer
574      !!---------------------------------------------------------------------
575      !
576      itaille = jpi * jpj
577      !
578      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
579         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
580      !
581   END SUBROUTINE mppscatter
582
583   
584   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
585     !!----------------------------------------------------------------------
586      !!                   ***  routine mpp_delay_sum  ***
587      !!
588      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
589      !!
590      !!----------------------------------------------------------------------
591      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
592      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
593      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
594      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
595      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
596      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
597      !!
598      INTEGER ::   ji, isz
599      INTEGER ::   idvar
600      INTEGER ::   ierr, ilocalcomm
601      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
602      !!----------------------------------------------------------------------
603      ilocalcomm = mpi_comm_oce
604      IF( PRESENT(kcom) )   ilocalcomm = kcom
605
606      isz = SIZE(y_in)
607     
608      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
609
610      idvar = -1
611      DO ji = 1, nbdelay
612         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
613      END DO
614      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
615
616      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
617         !                                       --------------------------
618         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
619            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
620            DEALLOCATE(todelay(idvar)%z1d)
621            ndelayid(idvar) = -1                                      ! do as if we had no restart
622         ELSE
623            ALLOCATE(todelay(idvar)%y1d(isz))
624            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
625         END IF
626      ENDIF
627     
628      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
629         !                                       --------------------------
630         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
631         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
632         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
633      ENDIF
634
635      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
636
637      ! send back pout from todelay(idvar)%z1d defined at previous call
638      pout(:) = todelay(idvar)%z1d(:)
639
640      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
641      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
642
643   END SUBROUTINE mpp_delay_sum
644
645   
646   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
647      !!----------------------------------------------------------------------
648      !!                   ***  routine mpp_delay_max  ***
649      !!
650      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
651      !!
652      !!----------------------------------------------------------------------
653      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
654      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
655      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
656      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
657      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
658      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
659      !!
660      INTEGER ::   ji, isz
661      INTEGER ::   idvar
662      INTEGER ::   ierr, ilocalcomm
663      !!----------------------------------------------------------------------
664      ilocalcomm = mpi_comm_oce
665      IF( PRESENT(kcom) )   ilocalcomm = kcom
666
667      isz = SIZE(p_in)
668
669      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
670
671      idvar = -1
672      DO ji = 1, nbdelay
673         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
674      END DO
675      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
676
677      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
678         !                                       --------------------------
679         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
680            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
681            DEALLOCATE(todelay(idvar)%z1d)
682            ndelayid(idvar) = -1                                      ! do as if we had no restart
683         END IF
684      ENDIF
685
686      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
687         !                                       --------------------------
688         ALLOCATE(todelay(idvar)%z1d(isz))
689         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
690      ENDIF
691
692      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
693
694      ! send back pout from todelay(idvar)%z1d defined at previous call
695      pout(:) = todelay(idvar)%z1d(:)
696
697      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
698      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
699
700   END SUBROUTINE mpp_delay_max
701
702   
703   SUBROUTINE mpp_delay_rcv( kid )
704      !!----------------------------------------------------------------------
705      !!                   ***  routine mpp_delay_rcv  ***
706      !!
707      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
708      !!
709      !!----------------------------------------------------------------------
710      INTEGER,INTENT(in   )      ::  kid 
711      INTEGER ::   ierr
712      !!----------------------------------------------------------------------
713      IF( ndelayid(kid) /= -2 ) THEN 
714         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
715         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
716         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
717         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
718         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
719      ENDIF
720   END SUBROUTINE mpp_delay_rcv
721
722   
723   !!----------------------------------------------------------------------
724   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
725   !!   
726   !!----------------------------------------------------------------------
727   !!
728#  define OPERATION_MAX
729#  define INTEGER_TYPE
730#  define DIM_0d
731#     define ROUTINE_ALLREDUCE           mppmax_int
732#     include "mpp_allreduce_generic.h90"
733#     undef ROUTINE_ALLREDUCE
734#  undef DIM_0d
735#  define DIM_1d
736#     define ROUTINE_ALLREDUCE           mppmax_a_int
737#     include "mpp_allreduce_generic.h90"
738#     undef ROUTINE_ALLREDUCE
739#  undef DIM_1d
740#  undef INTEGER_TYPE
741!
742#  define REAL_TYPE
743#  define DIM_0d
744#     define ROUTINE_ALLREDUCE           mppmax_real
745#     include "mpp_allreduce_generic.h90"
746#     undef ROUTINE_ALLREDUCE
747#  undef DIM_0d
748#  define DIM_1d
749#     define ROUTINE_ALLREDUCE           mppmax_a_real
750#     include "mpp_allreduce_generic.h90"
751#     undef ROUTINE_ALLREDUCE
752#  undef DIM_1d
753#  undef REAL_TYPE
754#  undef OPERATION_MAX
755   !!----------------------------------------------------------------------
756   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
757   !!   
758   !!----------------------------------------------------------------------
759   !!
760#  define OPERATION_MIN
761#  define INTEGER_TYPE
762#  define DIM_0d
763#     define ROUTINE_ALLREDUCE           mppmin_int
764#     include "mpp_allreduce_generic.h90"
765#     undef ROUTINE_ALLREDUCE
766#  undef DIM_0d
767#  define DIM_1d
768#     define ROUTINE_ALLREDUCE           mppmin_a_int
769#     include "mpp_allreduce_generic.h90"
770#     undef ROUTINE_ALLREDUCE
771#  undef DIM_1d
772#  undef INTEGER_TYPE
773!
774#  define REAL_TYPE
775#  define DIM_0d
776#     define ROUTINE_ALLREDUCE           mppmin_real
777#     include "mpp_allreduce_generic.h90"
778#     undef ROUTINE_ALLREDUCE
779#  undef DIM_0d
780#  define DIM_1d
781#     define ROUTINE_ALLREDUCE           mppmin_a_real
782#     include "mpp_allreduce_generic.h90"
783#     undef ROUTINE_ALLREDUCE
784#  undef DIM_1d
785#  undef REAL_TYPE
786#  undef OPERATION_MIN
787
788   !!----------------------------------------------------------------------
789   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
790   !!   
791   !!   Global sum of 1D array or a variable (integer, real or complex)
792   !!----------------------------------------------------------------------
793   !!
794#  define OPERATION_SUM
795#  define INTEGER_TYPE
796#  define DIM_0d
797#     define ROUTINE_ALLREDUCE           mppsum_int
798#     include "mpp_allreduce_generic.h90"
799#     undef ROUTINE_ALLREDUCE
800#  undef DIM_0d
801#  define DIM_1d
802#     define ROUTINE_ALLREDUCE           mppsum_a_int
803#     include "mpp_allreduce_generic.h90"
804#     undef ROUTINE_ALLREDUCE
805#  undef DIM_1d
806#  undef INTEGER_TYPE
807!
808#  define REAL_TYPE
809#  define DIM_0d
810#     define ROUTINE_ALLREDUCE           mppsum_real
811#     include "mpp_allreduce_generic.h90"
812#     undef ROUTINE_ALLREDUCE
813#  undef DIM_0d
814#  define DIM_1d
815#     define ROUTINE_ALLREDUCE           mppsum_a_real
816#     include "mpp_allreduce_generic.h90"
817#     undef ROUTINE_ALLREDUCE
818#  undef DIM_1d
819#  undef REAL_TYPE
820#  undef OPERATION_SUM
821
822#  define OPERATION_SUM_DD
823#  define COMPLEX_TYPE
824#  define DIM_0d
825#     define ROUTINE_ALLREDUCE           mppsum_realdd
826#     include "mpp_allreduce_generic.h90"
827#     undef ROUTINE_ALLREDUCE
828#  undef DIM_0d
829#  define DIM_1d
830#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
831#     include "mpp_allreduce_generic.h90"
832#     undef ROUTINE_ALLREDUCE
833#  undef DIM_1d
834#  undef COMPLEX_TYPE
835#  undef OPERATION_SUM_DD
836
837   !!----------------------------------------------------------------------
838   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
839   !!   
840   !!----------------------------------------------------------------------
841   !!
842#  define OPERATION_MINLOC
843#  define DIM_2d
844#     define ROUTINE_LOC           mpp_minloc2d
845#     include "mpp_loc_generic.h90"
846#     undef ROUTINE_LOC
847#  undef DIM_2d
848#  define DIM_3d
849#     define ROUTINE_LOC           mpp_minloc3d
850#     include "mpp_loc_generic.h90"
851#     undef ROUTINE_LOC
852#  undef DIM_3d
853#  undef OPERATION_MINLOC
854
855#  define OPERATION_MAXLOC
856#  define DIM_2d
857#     define ROUTINE_LOC           mpp_maxloc2d
858#     include "mpp_loc_generic.h90"
859#     undef ROUTINE_LOC
860#  undef DIM_2d
861#  define DIM_3d
862#     define ROUTINE_LOC           mpp_maxloc3d
863#     include "mpp_loc_generic.h90"
864#     undef ROUTINE_LOC
865#  undef DIM_3d
866#  undef OPERATION_MAXLOC
867
868   SUBROUTINE mppsync()
869      !!----------------------------------------------------------------------
870      !!                  ***  routine mppsync  ***
871      !!
872      !! ** Purpose :   Massively parallel processors, synchroneous
873      !!
874      !!-----------------------------------------------------------------------
875      INTEGER :: ierror
876      !!-----------------------------------------------------------------------
877      !
878      CALL mpi_barrier( mpi_comm_oce, ierror )
879      !
880   END SUBROUTINE mppsync
881
882
883   SUBROUTINE mppstop( ldfinal, ld_force_abort ) 
884      !!----------------------------------------------------------------------
885      !!                  ***  routine mppstop  ***
886      !!
887      !! ** purpose :   Stop massively parallel processors method
888      !!
889      !!----------------------------------------------------------------------
890      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
891      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
892      LOGICAL ::   llfinal, ll_force_abort
893      INTEGER ::   info
894      !!----------------------------------------------------------------------
895      llfinal = .FALSE.
896      IF( PRESENT(ldfinal) ) llfinal = ldfinal
897      ll_force_abort = .FALSE.
898      IF( PRESENT(ld_force_abort) ) ll_force_abort = ld_force_abort
899      !
900      IF(ll_force_abort) THEN
901         CALL mpi_abort( MPI_COMM_WORLD )
902      ELSE
903         CALL mppsync
904         CALL mpi_finalize( info )
905      ENDIF
906      IF( .NOT. llfinal ) STOP 123456
907      !
908   END SUBROUTINE mppstop
909
910
911   SUBROUTINE mpp_comm_free( kcom )
912      !!----------------------------------------------------------------------
913      INTEGER, INTENT(in) ::   kcom
914      !!
915      INTEGER :: ierr
916      !!----------------------------------------------------------------------
917      !
918      CALL MPI_COMM_FREE(kcom, ierr)
919      !
920   END SUBROUTINE mpp_comm_free
921
922
923   SUBROUTINE mpp_ini_znl( kumout )
924      !!----------------------------------------------------------------------
925      !!               ***  routine mpp_ini_znl  ***
926      !!
927      !! ** Purpose :   Initialize special communicator for computing zonal sum
928      !!
929      !! ** Method  : - Look for processors in the same row
930      !!              - Put their number in nrank_znl
931      !!              - Create group for the znl processors
932      !!              - Create a communicator for znl processors
933      !!              - Determine if processor should write znl files
934      !!
935      !! ** output
936      !!      ndim_rank_znl = number of processors on the same row
937      !!      ngrp_znl = group ID for the znl processors
938      !!      ncomm_znl = communicator for the ice procs.
939      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
940      !!
941      !!----------------------------------------------------------------------
942      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
943      !
944      INTEGER :: jproc      ! dummy loop integer
945      INTEGER :: ierr, ii   ! local integer
946      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
947      !!----------------------------------------------------------------------
948      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
949      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
950      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
951      !
952      ALLOCATE( kwork(jpnij), STAT=ierr )
953      IF( ierr /= 0 ) THEN
954         WRITE(kumout, cform_err)
955         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
956         CALL mppstop
957      ENDIF
958
959      IF( jpnj == 1 ) THEN
960         ngrp_znl  = ngrp_world
961         ncomm_znl = mpi_comm_oce
962      ELSE
963         !
964         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
965         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
966         !-$$        CALL flush(numout)
967         !
968         ! Count number of processors on the same row
969         ndim_rank_znl = 0
970         DO jproc=1,jpnij
971            IF ( kwork(jproc) == njmpp ) THEN
972               ndim_rank_znl = ndim_rank_znl + 1
973            ENDIF
974         END DO
975         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
976         !-$$        CALL flush(numout)
977         ! Allocate the right size to nrank_znl
978         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
979         ALLOCATE(nrank_znl(ndim_rank_znl))
980         ii = 0
981         nrank_znl (:) = 0
982         DO jproc=1,jpnij
983            IF ( kwork(jproc) == njmpp) THEN
984               ii = ii + 1
985               nrank_znl(ii) = jproc -1
986            ENDIF
987         END DO
988         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
989         !-$$        CALL flush(numout)
990
991         ! Create the opa group
992         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
993         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
994         !-$$        CALL flush(numout)
995
996         ! Create the znl group from the opa group
997         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
998         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
999         !-$$        CALL flush(numout)
1000
1001         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1002         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1003         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1004         !-$$        CALL flush(numout)
1005         !
1006      END IF
1007
1008      ! Determines if processor if the first (starting from i=1) on the row
1009      IF ( jpni == 1 ) THEN
1010         l_znl_root = .TRUE.
1011      ELSE
1012         l_znl_root = .FALSE.
1013         kwork (1) = nimpp
1014         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1015         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1016      END IF
1017
1018      DEALLOCATE(kwork)
1019
1020   END SUBROUTINE mpp_ini_znl
1021
1022
1023   SUBROUTINE mpp_ini_north
1024      !!----------------------------------------------------------------------
1025      !!               ***  routine mpp_ini_north  ***
1026      !!
1027      !! ** Purpose :   Initialize special communicator for north folding
1028      !!      condition together with global variables needed in the mpp folding
1029      !!
1030      !! ** Method  : - Look for northern processors
1031      !!              - Put their number in nrank_north
1032      !!              - Create groups for the world processors and the north processors
1033      !!              - Create a communicator for northern processors
1034      !!
1035      !! ** output
1036      !!      njmppmax = njmpp for northern procs
1037      !!      ndim_rank_north = number of processors in the northern line
1038      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1039      !!      ngrp_world = group ID for the world processors
1040      !!      ngrp_north = group ID for the northern processors
1041      !!      ncomm_north = communicator for the northern procs.
1042      !!      north_root = number (in the world) of proc 0 in the northern comm.
1043      !!
1044      !!----------------------------------------------------------------------
1045      INTEGER ::   ierr
1046      INTEGER ::   jjproc
1047      INTEGER ::   ii, ji
1048      !!----------------------------------------------------------------------
1049      !
1050      njmppmax = MAXVAL( njmppt )
1051      !
1052      ! Look for how many procs on the northern boundary
1053      ndim_rank_north = 0
1054      DO jjproc = 1, jpnij
1055         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1056      END DO
1057      !
1058      ! Allocate the right size to nrank_north
1059      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1060      ALLOCATE( nrank_north(ndim_rank_north) )
1061
1062      ! Fill the nrank_north array with proc. number of northern procs.
1063      ! Note : the rank start at 0 in MPI
1064      ii = 0
1065      DO ji = 1, jpnij
1066         IF ( njmppt(ji) == njmppmax   ) THEN
1067            ii=ii+1
1068            nrank_north(ii)=ji-1
1069         END IF
1070      END DO
1071      !
1072      ! create the world group
1073      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1074      !
1075      ! Create the North group from the world group
1076      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1077      !
1078      ! Create the North communicator , ie the pool of procs in the north group
1079      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1080      !
1081   END SUBROUTINE mpp_ini_north
1082
1083
1084   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
1085      !!---------------------------------------------------------------------
1086      !!                   ***  routine mpp_init.opa  ***
1087      !!
1088      !! ** Purpose :: export and attach a MPI buffer for bsend
1089      !!
1090      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
1091      !!            but classical mpi_init
1092      !!
1093      !! History :: 01/11 :: IDRIS initial version for IBM only
1094      !!            08/04 :: R. Benshila, generalisation
1095      !!---------------------------------------------------------------------
1096      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
1097      INTEGER                      , INTENT(inout) ::   ksft
1098      INTEGER                      , INTENT(  out) ::   code
1099      INTEGER                                      ::   ierr, ji
1100      LOGICAL                                      ::   mpi_was_called
1101      !!---------------------------------------------------------------------
1102      !
1103      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
1104      IF ( code /= MPI_SUCCESS ) THEN
1105         DO ji = 1, SIZE(ldtxt)
1106            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1107         END DO
1108         WRITE(*, cform_err)
1109         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
1110         CALL mpi_abort( mpi_comm_world, code, ierr )
1111      ENDIF
1112      !
1113      IF( .NOT. mpi_was_called ) THEN
1114         CALL mpi_init( code )
1115         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
1116         IF ( code /= MPI_SUCCESS ) THEN
1117            DO ji = 1, SIZE(ldtxt)
1118               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1119            END DO
1120            WRITE(*, cform_err)
1121            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
1122            CALL mpi_abort( mpi_comm_world, code, ierr )
1123         ENDIF
1124      ENDIF
1125      !
1126      IF( nn_buffer > 0 ) THEN
1127         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
1128         ! Buffer allocation and attachment
1129         ALLOCATE( tampon(nn_buffer), stat = ierr )
1130         IF( ierr /= 0 ) THEN
1131            DO ji = 1, SIZE(ldtxt)
1132               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1133            END DO
1134            WRITE(*, cform_err)
1135            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1136            CALL mpi_abort( mpi_comm_world, code, ierr )
1137         END IF
1138         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1139      ENDIF
1140      !
1141   END SUBROUTINE mpi_init_oce
1142
1143
1144   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1145      !!---------------------------------------------------------------------
1146      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1147      !!
1148      !!   Modification of original codes written by David H. Bailey
1149      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1150      !!---------------------------------------------------------------------
1151      INTEGER                     , INTENT(in)    ::   ilen, itype
1152      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1153      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1154      !
1155      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1156      INTEGER  :: ji, ztmp           ! local scalar
1157      !!---------------------------------------------------------------------
1158      !
1159      ztmp = itype   ! avoid compilation warning
1160      !
1161      DO ji=1,ilen
1162      ! Compute ydda + yddb using Knuth's trick.
1163         zt1  = real(ydda(ji)) + real(yddb(ji))
1164         zerr = zt1 - real(ydda(ji))
1165         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1166                + aimag(ydda(ji)) + aimag(yddb(ji))
1167
1168         ! The result is zt1 + zt2, after normalization.
1169         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1170      END DO
1171      !
1172   END SUBROUTINE DDPDD_MPI
1173
1174
1175   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
1176      !!---------------------------------------------------------------------
1177      !!                   ***  routine mpp_lbc_north_icb  ***
1178      !!
1179      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1180      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1181      !!              array with outer extra halo
1182      !!
1183      !! ** Method  :   North fold condition and mpp with more than one proc
1184      !!              in i-direction require a specific treatment. We gather
1185      !!              the 4+kextj northern lines of the global domain on 1
1186      !!              processor and apply lbc north-fold on this sub array.
1187      !!              Then we scatter the north fold array back to the processors.
1188      !!              This routine accounts for an extra halo with icebergs
1189      !!              and assumes ghost rows and columns have been suppressed.
1190      !!
1191      !!----------------------------------------------------------------------
1192      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1193      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1194      !                                                     !   = T ,  U , V , F or W -points
1195      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1196      !!                                                    ! north fold, =  1. otherwise
1197      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
1198      !
1199      INTEGER ::   ji, jj, jr
1200      INTEGER ::   ierr, itaille, ildi, ilei, iilb
1201      INTEGER ::   ipj, ij, iproc
1202      !
1203      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1204      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1205      !!----------------------------------------------------------------------
1206      !
1207      ipj=4
1208      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1209     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1210     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
1211      !
1212      ztab_e(:,:)      = 0._wp
1213      znorthloc_e(:,:) = 0._wp
1214      !
1215      ij = 1 - kextj
1216      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1217      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1218         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
1219         ij = ij + 1
1220      END DO
1221      !
1222      itaille = jpimax * ( ipj + 2*kextj )
1223      !
1224      IF( ln_timing ) CALL tic_tac(.TRUE.)
1225      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1226         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1227         &                ncomm_north, ierr )
1228      !
1229      IF( ln_timing ) CALL tic_tac(.FALSE.)
1230      !
1231      DO jr = 1, ndim_rank_north            ! recover the global north array
1232         iproc = nrank_north(jr) + 1
1233         ildi = nldit (iproc)
1234         ilei = nleit (iproc)
1235         iilb = nimppt(iproc)
1236         DO jj = 1-kextj, ipj+kextj
1237            DO ji = ildi, ilei
1238               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1239            END DO
1240         END DO
1241      END DO
1242
1243      ! 2. North-Fold boundary conditions
1244      ! ----------------------------------
1245      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
1246
1247      ij = 1 - kextj
1248      !! Scatter back to pt2d
1249      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1250         DO ji= 1, jpi
1251            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1252         END DO
1253         ij  = ij +1
1254      END DO
1255      !
1256      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1257      !
1258   END SUBROUTINE mpp_lbc_north_icb
1259
1260
1261   SUBROUTINE mpp_lnk_2d_icb( cdname, pt2d, cd_type, psgn, kexti, kextj )
1262      !!----------------------------------------------------------------------
1263      !!                  ***  routine mpp_lnk_2d_icb  ***
1264      !!
1265      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1266      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1267      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
1268      !!
1269      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1270      !!      between processors following neighboring subdomains.
1271      !!            domain parameters
1272      !!                    jpi    : first dimension of the local subdomain
1273      !!                    jpj    : second dimension of the local subdomain
1274      !!                    kexti  : number of columns for extra outer halo
1275      !!                    kextj  : number of rows for extra outer halo
1276      !!                    nbondi : mark for "east-west local boundary"
1277      !!                    nbondj : mark for "north-south local boundary"
1278      !!                    noea   : number for local neighboring processors
1279      !!                    nowe   : number for local neighboring processors
1280      !!                    noso   : number for local neighboring processors
1281      !!                    nono   : number for local neighboring processors
1282      !!----------------------------------------------------------------------
1283      CHARACTER(len=*)                                        , INTENT(in   ) ::   cdname      ! name of the calling subroutine
1284      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1285      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1286      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1287      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1288      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1289      !
1290      INTEGER  ::   jl   ! dummy loop indices
1291      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1292      INTEGER  ::   ipreci, iprecj             !   -       -
1293      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1294      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1295      !!
1296      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1297      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
1298      !!----------------------------------------------------------------------
1299
1300      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1301      iprecj = nn_hls + kextj
1302
1303      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, 1, 1, 1, ld_lbc = .TRUE. )
1304
1305      ! 1. standard boundary treatment
1306      ! ------------------------------
1307      ! Order matters Here !!!!
1308      !
1309      !                                      ! East-West boundaries
1310      !                                           !* Cyclic east-west
1311      IF( l_Iperio ) THEN
1312         pt2d(1-kexti:     1   ,:) = pt2d(jpim1-kexti: jpim1 ,:)       ! east
1313         pt2d(  jpi  :jpi+kexti,:) = pt2d(     2     :2+kexti,:)       ! west
1314         !
1315      ELSE                                        !* closed
1316         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls   ,:) = 0._wp    ! east except at F-point
1317                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! west
1318      ENDIF
1319      !                                      ! North-South boundaries
1320      IF( l_Jperio ) THEN                         !* cyclic (only with no mpp j-split)
1321         pt2d(:,1-kextj:     1   ) = pt2d(:,jpjm1-kextj:  jpjm1)       ! north
1322         pt2d(:,  jpj  :jpj+kextj) = pt2d(:,     2     :2+kextj)       ! south
1323      ELSE                                        !* closed
1324         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-kextj   :nn_hls   ) = 0._wp    ! north except at F-point
1325                                      pt2d(:,jpj-nn_hls+1:jpj+kextj) = 0._wp    ! south
1326      ENDIF
1327      !
1328
1329      ! north fold treatment
1330      ! -----------------------
1331      IF( npolj /= 0 ) THEN
1332         !
1333         SELECT CASE ( jpni )
1334                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1335                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1336         END SELECT
1337         !
1338      ENDIF
1339
1340      ! 2. East and west directions exchange
1341      ! ------------------------------------
1342      ! we play with the neigbours AND the row number because of the periodicity
1343      !
1344      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1345      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
1346         iihom = jpi-nreci-kexti
1347         DO jl = 1, ipreci
1348            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
1349            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1350         END DO
1351      END SELECT
1352      !
1353      !                           ! Migrations
1354      imigr = ipreci * ( jpj + 2*kextj )
1355      !
1356      IF( ln_timing ) CALL tic_tac(.TRUE.)
1357      !
1358      SELECT CASE ( nbondi )
1359      CASE ( -1 )
1360         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1361         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1362         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1363      CASE ( 0 )
1364         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1365         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1366         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1367         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1368         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1369         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1370      CASE ( 1 )
1371         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1372         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1373         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1374      END SELECT
1375      !
1376      IF( ln_timing ) CALL tic_tac(.FALSE.)
1377      !
1378      !                           ! Write Dirichlet lateral conditions
1379      iihom = jpi - nn_hls
1380      !
1381      SELECT CASE ( nbondi )
1382      CASE ( -1 )
1383         DO jl = 1, ipreci
1384            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1385         END DO
1386      CASE ( 0 )
1387         DO jl = 1, ipreci
1388            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1389            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1390         END DO
1391      CASE ( 1 )
1392         DO jl = 1, ipreci
1393            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1394         END DO
1395      END SELECT
1396
1397
1398      ! 3. North and south directions
1399      ! -----------------------------
1400      ! always closed : we play only with the neigbours
1401      !
1402      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
1403         ijhom = jpj-nrecj-kextj
1404         DO jl = 1, iprecj
1405            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
1406            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
1407         END DO
1408      ENDIF
1409      !
1410      !                           ! Migrations
1411      imigr = iprecj * ( jpi + 2*kexti )
1412      !
1413      IF( ln_timing ) CALL tic_tac(.TRUE.)
1414      !
1415      SELECT CASE ( nbondj )
1416      CASE ( -1 )
1417         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1418         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1419         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1420      CASE ( 0 )
1421         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1422         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1423         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1424         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1425         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1426         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1427      CASE ( 1 )
1428         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1429         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1430         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1431      END SELECT
1432      !
1433      IF( ln_timing ) CALL tic_tac(.FALSE.)
1434      !
1435      !                           ! Write Dirichlet lateral conditions
1436      ijhom = jpj - nn_hls
1437      !
1438      SELECT CASE ( nbondj )
1439      CASE ( -1 )
1440         DO jl = 1, iprecj
1441            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1442         END DO
1443      CASE ( 0 )
1444         DO jl = 1, iprecj
1445            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1446            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1447         END DO
1448      CASE ( 1 )
1449         DO jl = 1, iprecj
1450            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1451         END DO
1452      END SELECT
1453      !
1454   END SUBROUTINE mpp_lnk_2d_icb
1455
1456
1457   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1458      !!----------------------------------------------------------------------
1459      !!                  ***  routine mpp_report  ***
1460      !!
1461      !! ** Purpose :   report use of mpp routines per time-setp
1462      !!
1463      !!----------------------------------------------------------------------
1464      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1465      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1466      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1467      !!
1468      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1469      INTEGER ::    ji,  jj,  jk,  jh, jf   ! dummy loop indices
1470      !!----------------------------------------------------------------------
1471      !
1472      ll_lbc = .FALSE.
1473      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1474      ll_glb = .FALSE.
1475      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1476      ll_dlg = .FALSE.
1477      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1478      !
1479      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1480      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
1481      ncom_freq = ncom_fsbc
1482      !
1483      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1484         IF( ll_lbc ) THEN
1485            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1486            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1487            n_sequence_lbc = n_sequence_lbc + 1
1488            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1489            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1490            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1491            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1492         ENDIF
1493         IF( ll_glb ) THEN
1494            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1495            n_sequence_glb = n_sequence_glb + 1
1496            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1497            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1498         ENDIF
1499         IF( ll_dlg ) THEN
1500            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1501            n_sequence_dlg = n_sequence_dlg + 1
1502            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1503            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1504         ENDIF
1505      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1506         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1507         WRITE(numcom,*) ' '
1508         WRITE(numcom,*) ' ------------------------------------------------------------'
1509         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1510         WRITE(numcom,*) ' ------------------------------------------------------------'
1511         WRITE(numcom,*) ' '
1512         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1513         jj = 0; jk = 0; jf = 0; jh = 0
1514         DO ji = 1, n_sequence_lbc
1515            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1516            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1517            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1518            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1519         END DO
1520         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1521         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1522         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1523         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1524         WRITE(numcom,*) ' '
1525         WRITE(numcom,*) ' lbc_lnk called'
1526         jj = 1
1527         DO ji = 2, n_sequence_lbc
1528            IF( crname_lbc(ji-1) /= crname_lbc(ji) ) THEN
1529               WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(ji-1))
1530               jj = 0
1531            END IF
1532            jj = jj + 1 
1533         END DO
1534         WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
1535         WRITE(numcom,*) ' '
1536         IF ( n_sequence_glb > 0 ) THEN
1537            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1538            jj = 1
1539            DO ji = 2, n_sequence_glb
1540               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1541                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1542                  jj = 0
1543               END IF
1544               jj = jj + 1 
1545            END DO
1546            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1547            DEALLOCATE(crname_glb)
1548         ELSE
1549            WRITE(numcom,*) ' No MPI global communication '
1550         ENDIF
1551         WRITE(numcom,*) ' '
1552         IF ( n_sequence_dlg > 0 ) THEN
1553            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1554            jj = 1
1555            DO ji = 2, n_sequence_dlg
1556               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1557                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1558                  jj = 0
1559               END IF
1560               jj = jj + 1 
1561            END DO
1562            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1563            DEALLOCATE(crname_dlg)
1564         ELSE
1565            WRITE(numcom,*) ' No MPI delayed global communication '
1566         ENDIF
1567         WRITE(numcom,*) ' '
1568         WRITE(numcom,*) ' -----------------------------------------------'
1569         WRITE(numcom,*) ' '
1570         DEALLOCATE(ncomm_sequence)
1571         DEALLOCATE(crname_lbc)
1572      ENDIF
1573   END SUBROUTINE mpp_report
1574
1575   
1576   SUBROUTINE tic_tac (ld_tic, ld_global)
1577
1578    LOGICAL,           INTENT(IN) :: ld_tic
1579    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1580    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1581    REAL(wp),               SAVE :: tic_ct = 0._wp
1582    INTEGER :: ii
1583
1584    IF( ncom_stp <= nit000 ) RETURN
1585    IF( ncom_stp == nitend ) RETURN
1586    ii = 1
1587    IF( PRESENT( ld_global ) ) THEN
1588       IF( ld_global ) ii = 2
1589    END IF
1590   
1591    IF ( ld_tic ) THEN
1592       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1593       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1594    ELSE
1595       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1596       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1597    ENDIF
1598   
1599   END SUBROUTINE tic_tac
1600
1601   
1602#else
1603   !!----------------------------------------------------------------------
1604   !!   Default case:            Dummy module        share memory computing
1605   !!----------------------------------------------------------------------
1606   USE in_out_manager
1607
1608   INTERFACE mpp_sum
1609      MODULE PROCEDURE mppsum_int, mppsum_a_int, mppsum_real, mppsum_a_real, mppsum_realdd, mppsum_a_realdd
1610   END INTERFACE
1611   INTERFACE mpp_max
1612      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
1613   END INTERFACE
1614   INTERFACE mpp_min
1615      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1616   END INTERFACE
1617   INTERFACE mpp_minloc
1618      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1619   END INTERFACE
1620   INTERFACE mpp_maxloc
1621      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1622   END INTERFACE
1623
1624   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
1625   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
1626   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
1627
1628   INTEGER, PARAMETER, PUBLIC               ::   nbdelay = 0   ! make sure we don't enter loops: DO ji = 1, nbdelay
1629   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaylist = 'empty'
1630   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaycpnt = 'empty'
1631   TYPE ::   DELAYARR
1632      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
1633      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
1634   END TYPE DELAYARR
1635   TYPE( DELAYARR ), DIMENSION(1), PUBLIC  ::   todelay             
1636   INTEGER,  PUBLIC, DIMENSION(1)           ::   ndelayid = -1
1637   !!----------------------------------------------------------------------
1638CONTAINS
1639
1640   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1641      INTEGER, INTENT(in) ::   kumout
1642      lib_mpp_alloc = 0
1643   END FUNCTION lib_mpp_alloc
1644
1645   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
1646      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
1647      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
1648      CHARACTER(len=*) ::   ldname
1649      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
1650      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
1651      function_value = 0
1652      IF( .FALSE. )   ldtxt(:) = 'never done'
1653      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
1654   END FUNCTION mynode
1655
1656   SUBROUTINE mppsync                       ! Dummy routine
1657   END SUBROUTINE mppsync
1658
1659   !!----------------------------------------------------------------------
1660   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
1661   !!   
1662   !!----------------------------------------------------------------------
1663   !!
1664#  define OPERATION_MAX
1665#  define INTEGER_TYPE
1666#  define DIM_0d
1667#     define ROUTINE_ALLREDUCE           mppmax_int
1668#     include "mpp_allreduce_generic.h90"
1669#     undef ROUTINE_ALLREDUCE
1670#  undef DIM_0d
1671#  define DIM_1d
1672#     define ROUTINE_ALLREDUCE           mppmax_a_int
1673#     include "mpp_allreduce_generic.h90"
1674#     undef ROUTINE_ALLREDUCE
1675#  undef DIM_1d
1676#  undef INTEGER_TYPE
1677!
1678#  define REAL_TYPE
1679#  define DIM_0d
1680#     define ROUTINE_ALLREDUCE           mppmax_real
1681#     include "mpp_allreduce_generic.h90"
1682#     undef ROUTINE_ALLREDUCE
1683#  undef DIM_0d
1684#  define DIM_1d
1685#     define ROUTINE_ALLREDUCE           mppmax_a_real
1686#     include "mpp_allreduce_generic.h90"
1687#     undef ROUTINE_ALLREDUCE
1688#  undef DIM_1d
1689#  undef REAL_TYPE
1690#  undef OPERATION_MAX
1691   !!----------------------------------------------------------------------
1692   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
1693   !!   
1694   !!----------------------------------------------------------------------
1695   !!
1696#  define OPERATION_MIN
1697#  define INTEGER_TYPE
1698#  define DIM_0d
1699#     define ROUTINE_ALLREDUCE           mppmin_int
1700#     include "mpp_allreduce_generic.h90"
1701#     undef ROUTINE_ALLREDUCE
1702#  undef DIM_0d
1703#  define DIM_1d
1704#     define ROUTINE_ALLREDUCE           mppmin_a_int
1705#     include "mpp_allreduce_generic.h90"
1706#     undef ROUTINE_ALLREDUCE
1707#  undef DIM_1d
1708#  undef INTEGER_TYPE
1709!
1710#  define REAL_TYPE
1711#  define DIM_0d
1712#     define ROUTINE_ALLREDUCE           mppmin_real
1713#     include "mpp_allreduce_generic.h90"
1714#     undef ROUTINE_ALLREDUCE
1715#  undef DIM_0d
1716#  define DIM_1d
1717#     define ROUTINE_ALLREDUCE           mppmin_a_real
1718#     include "mpp_allreduce_generic.h90"
1719#     undef ROUTINE_ALLREDUCE
1720#  undef DIM_1d
1721#  undef REAL_TYPE
1722#  undef OPERATION_MIN
1723
1724   !!----------------------------------------------------------------------
1725   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
1726   !!   
1727   !!   Global sum of 1D array or a variable (integer, real or complex)
1728   !!----------------------------------------------------------------------
1729   !!
1730#  define OPERATION_SUM
1731#  define INTEGER_TYPE
1732#  define DIM_0d
1733#     define ROUTINE_ALLREDUCE           mppsum_int
1734#     include "mpp_allreduce_generic.h90"
1735#     undef ROUTINE_ALLREDUCE
1736#  undef DIM_0d
1737#  define DIM_1d
1738#     define ROUTINE_ALLREDUCE           mppsum_a_int
1739#     include "mpp_allreduce_generic.h90"
1740#     undef ROUTINE_ALLREDUCE
1741#  undef DIM_1d
1742#  undef INTEGER_TYPE
1743!
1744#  define REAL_TYPE
1745#  define DIM_0d
1746#     define ROUTINE_ALLREDUCE           mppsum_real
1747#     include "mpp_allreduce_generic.h90"
1748#     undef ROUTINE_ALLREDUCE
1749#  undef DIM_0d
1750#  define DIM_1d
1751#     define ROUTINE_ALLREDUCE           mppsum_a_real
1752#     include "mpp_allreduce_generic.h90"
1753#     undef ROUTINE_ALLREDUCE
1754#  undef DIM_1d
1755#  undef REAL_TYPE
1756#  undef OPERATION_SUM
1757
1758#  define OPERATION_SUM_DD
1759#  define COMPLEX_TYPE
1760#  define DIM_0d
1761#     define ROUTINE_ALLREDUCE           mppsum_realdd
1762#     include "mpp_allreduce_generic.h90"
1763#     undef ROUTINE_ALLREDUCE
1764#  undef DIM_0d
1765#  define DIM_1d
1766#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
1767#     include "mpp_allreduce_generic.h90"
1768#     undef ROUTINE_ALLREDUCE
1769#  undef DIM_1d
1770#  undef COMPLEX_TYPE
1771#  undef OPERATION_SUM_DD
1772
1773   !!----------------------------------------------------------------------
1774   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
1775   !!   
1776   !!----------------------------------------------------------------------
1777   !!
1778#  define OPERATION_MINLOC
1779#  define DIM_2d
1780#     define ROUTINE_LOC           mpp_minloc2d
1781#     include "mpp_loc_generic.h90"
1782#     undef ROUTINE_LOC
1783#  undef DIM_2d
1784#  define DIM_3d
1785#     define ROUTINE_LOC           mpp_minloc3d
1786#     include "mpp_loc_generic.h90"
1787#     undef ROUTINE_LOC
1788#  undef DIM_3d
1789#  undef OPERATION_MINLOC
1790
1791#  define OPERATION_MAXLOC
1792#  define DIM_2d
1793#     define ROUTINE_LOC           mpp_maxloc2d
1794#     include "mpp_loc_generic.h90"
1795#     undef ROUTINE_LOC
1796#  undef DIM_2d
1797#  define DIM_3d
1798#     define ROUTINE_LOC           mpp_maxloc3d
1799#     include "mpp_loc_generic.h90"
1800#     undef ROUTINE_LOC
1801#  undef DIM_3d
1802#  undef OPERATION_MAXLOC
1803
1804   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
1805      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1806      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1807      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
1808      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1809      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1810      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1811      !
1812      pout(:) = REAL(y_in(:), wp)
1813   END SUBROUTINE mpp_delay_sum
1814
1815   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
1816      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1817      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1818      REAL(wp),         INTENT(in   ), DIMENSION(:) ::   p_in
1819      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1820      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1821      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1822      !
1823      pout(:) = p_in(:)
1824   END SUBROUTINE mpp_delay_max
1825
1826   SUBROUTINE mpp_delay_rcv( kid )
1827      INTEGER,INTENT(in   )      ::  kid 
1828      WRITE(*,*) 'mpp_delay_rcv: You should not have seen this print! error?', kid
1829   END SUBROUTINE mpp_delay_rcv
1830   
1831   SUBROUTINE mppstop( ldfinal, ld_force_abort )
1832      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
1833      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
1834      STOP      ! non MPP case, just stop the run
1835   END SUBROUTINE mppstop
1836
1837   SUBROUTINE mpp_ini_znl( knum )
1838      INTEGER :: knum
1839      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
1840   END SUBROUTINE mpp_ini_znl
1841
1842   SUBROUTINE mpp_comm_free( kcom )
1843      INTEGER :: kcom
1844      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
1845   END SUBROUTINE mpp_comm_free
1846   
1847#endif
1848
1849   !!----------------------------------------------------------------------
1850   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
1851   !!----------------------------------------------------------------------
1852
1853   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1854      &                 cd6, cd7, cd8, cd9, cd10 )
1855      !!----------------------------------------------------------------------
1856      !!                  ***  ROUTINE  stop_opa  ***
1857      !!
1858      !! ** Purpose :   print in ocean.outpput file a error message and
1859      !!                increment the error number (nstop) by one.
1860      !!----------------------------------------------------------------------
1861      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1862      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1863      !!----------------------------------------------------------------------
1864      !
1865      nstop = nstop + 1
1866
1867      ! force to open ocean.output file
1868      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1869       
1870      WRITE(numout,cform_err)
1871      IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1872      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1873      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1874      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1875      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1876      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1877      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1878      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1879      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1880      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1881
1882                               CALL FLUSH(numout    )
1883      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1884      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1885      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1886      !
1887      IF( cd1 == 'STOP' ) THEN
1888         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1889         CALL mppstop(ld_force_abort = .true.)
1890      ENDIF
1891      !
1892   END SUBROUTINE ctl_stop
1893
1894
1895   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1896      &                 cd6, cd7, cd8, cd9, cd10 )
1897      !!----------------------------------------------------------------------
1898      !!                  ***  ROUTINE  stop_warn  ***
1899      !!
1900      !! ** Purpose :   print in ocean.outpput file a error message and
1901      !!                increment the warning number (nwarn) by one.
1902      !!----------------------------------------------------------------------
1903      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1904      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1905      !!----------------------------------------------------------------------
1906      !
1907      nwarn = nwarn + 1
1908      IF(lwp) THEN
1909         WRITE(numout,cform_war)
1910         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1911         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1912         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1913         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1914         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1915         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1916         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1917         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1918         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1919         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
1920      ENDIF
1921      CALL FLUSH(numout)
1922      !
1923   END SUBROUTINE ctl_warn
1924
1925
1926   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1927      !!----------------------------------------------------------------------
1928      !!                  ***  ROUTINE ctl_opn  ***
1929      !!
1930      !! ** Purpose :   Open file and check if required file is available.
1931      !!
1932      !! ** Method  :   Fortan open
1933      !!----------------------------------------------------------------------
1934      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1935      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1936      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1937      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1938      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1939      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1940      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1941      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1942      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1943      !
1944      CHARACTER(len=80) ::   clfile
1945      INTEGER           ::   iost
1946      !!----------------------------------------------------------------------
1947      !
1948      ! adapt filename
1949      ! ----------------
1950      clfile = TRIM(cdfile)
1951      IF( PRESENT( karea ) ) THEN
1952         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1953      ENDIF
1954#if defined key_agrif
1955      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1956      knum=Agrif_Get_Unit()
1957#else
1958      knum=get_unit()
1959#endif
1960      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1961      !
1962      iost=0
1963      IF( cdacce(1:6) == 'DIRECT' )  THEN         ! cdacce has always more than 6 characters
1964         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1965      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1966         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1967      ELSE
1968         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1969      ENDIF
1970      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1971         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1972      IF( iost == 0 ) THEN
1973         IF(ldwp) THEN
1974            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1975            WRITE(kout,*) '     unit   = ', knum
1976            WRITE(kout,*) '     status = ', cdstat
1977            WRITE(kout,*) '     form   = ', cdform
1978            WRITE(kout,*) '     access = ', cdacce
1979            WRITE(kout,*)
1980         ENDIF
1981      ENDIF
1982100   CONTINUE
1983      IF( iost /= 0 ) THEN
1984         IF(ldwp) THEN
1985            WRITE(kout,*)
1986            WRITE(kout,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1987            WRITE(kout,*) ' =======   ===  '
1988            WRITE(kout,*) '           unit   = ', knum
1989            WRITE(kout,*) '           status = ', cdstat
1990            WRITE(kout,*) '           form   = ', cdform
1991            WRITE(kout,*) '           access = ', cdacce
1992            WRITE(kout,*) '           iostat = ', iost
1993            WRITE(kout,*) '           we stop. verify the file '
1994            WRITE(kout,*)
1995         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
1996            WRITE(*,*)
1997            WRITE(*,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1998            WRITE(*,*) ' =======   ===  '
1999            WRITE(*,*) '           unit   = ', knum
2000            WRITE(*,*) '           status = ', cdstat
2001            WRITE(*,*) '           form   = ', cdform
2002            WRITE(*,*) '           access = ', cdacce
2003            WRITE(*,*) '           iostat = ', iost
2004            WRITE(*,*) '           we stop. verify the file '
2005            WRITE(*,*)
2006         ENDIF
2007         CALL FLUSH( kout ) 
2008         STOP 'ctl_opn bad opening'
2009      ENDIF
2010      !
2011   END SUBROUTINE ctl_opn
2012
2013
2014   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
2015      !!----------------------------------------------------------------------
2016      !!                  ***  ROUTINE ctl_nam  ***
2017      !!
2018      !! ** Purpose :   Informations when error while reading a namelist
2019      !!
2020      !! ** Method  :   Fortan open
2021      !!----------------------------------------------------------------------
2022      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
2023      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
2024      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
2025      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
2026      !!----------------------------------------------------------------------
2027      !
2028      WRITE (clios, '(I5.0)')   kios
2029      IF( kios < 0 ) THEN         
2030         CALL ctl_warn( 'end of record or file while reading namelist '   &
2031            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2032      ENDIF
2033      !
2034      IF( kios > 0 ) THEN
2035         CALL ctl_stop( 'misspelled variable in namelist '   &
2036            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2037      ENDIF
2038      kios = 0
2039      RETURN
2040      !
2041   END SUBROUTINE ctl_nam
2042
2043
2044   INTEGER FUNCTION get_unit()
2045      !!----------------------------------------------------------------------
2046      !!                  ***  FUNCTION  get_unit  ***
2047      !!
2048      !! ** Purpose :   return the index of an unused logical unit
2049      !!----------------------------------------------------------------------
2050      LOGICAL :: llopn
2051      !!----------------------------------------------------------------------
2052      !
2053      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
2054      llopn = .TRUE.
2055      DO WHILE( (get_unit < 998) .AND. llopn )
2056         get_unit = get_unit + 1
2057         INQUIRE( unit = get_unit, opened = llopn )
2058      END DO
2059      IF( (get_unit == 999) .AND. llopn ) THEN
2060         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
2061         get_unit = -1
2062      ENDIF
2063      !
2064   END FUNCTION get_unit
2065
2066   !!----------------------------------------------------------------------
2067END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.