source: NEMO/branches/2018/dev_r10164_HPC09_ESIWACE_PREP_MERGE/src/OCE/LBC/lib_mpp.F90 @ 10407

Last change on this file since 10407 was 10407, checked in by smasson, 21 months ago

dev_r10164_HPC09_ESIWACE_PREP_MERGE: send back error code, see #2181

  • Property svn:keywords set to Id
File size: 88.0 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
35   !!----------------------------------------------------------------------
36#if   defined key_mpp_mpi
37   !!----------------------------------------------------------------------
38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
43   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
44   !!   mpprecv       :
45   !!   mppsend       :
46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
55   !!   mpp_ini_north : initialisation of north fold
56   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
57   !!----------------------------------------------------------------------
58   USE dom_oce        ! ocean space and time domain
59   USE lbcnfd         ! north fold treatment
60   USE in_out_manager ! I/O manager
61
62   IMPLICIT NONE
63   PRIVATE
64
65   INTERFACE mpp_nfd
66      MODULE PROCEDURE   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
67      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
68   END INTERFACE
69
70   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
71   PUBLIC   mpp_lnk_2d    , mpp_lnk_3d    , mpp_lnk_4d
72   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
73   !
74!!gm  this should be useless
75   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
76   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
77!!gm end
78   !
79   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
80   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
81   PUBLIC   mpp_ini_north
82   PUBLIC   mpp_lnk_2d_icb
83   PUBLIC   mpp_lbc_north_icb
84   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
85   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
86   PUBLIC   mppscatter, mppgather
87   PUBLIC   mpp_ini_znl
88   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
89   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d, mpp_lnk_bdy_4d
90   
91   !! * Interfaces
92   !! define generic interface for these routine as they are called sometimes
93   !! with scalar arguments instead of array arguments, which causes problems
94   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
95   INTERFACE mpp_min
96      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
97   END INTERFACE
98   INTERFACE mpp_max
99      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
100   END INTERFACE
101   INTERFACE mpp_sum
102      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
103         &             mppsum_realdd, mppsum_a_realdd
104   END INTERFACE
105   INTERFACE mpp_minloc
106      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
107   END INTERFACE
108   INTERFACE mpp_maxloc
109      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
110   END INTERFACE
111
112   !! ========================= !!
113   !!  MPI  variable definition !!
114   !! ========================= !!
115!$AGRIF_DO_NOT_TREAT
116   INCLUDE 'mpif.h'
117!$AGRIF_END_DO_NOT_TREAT
118
119   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
120
121   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
122
123   INTEGER, PUBLIC ::   mppsize        ! number of process
124   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
125!$AGRIF_DO_NOT_TREAT
126   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
127!$AGRIF_END_DO_NOT_TREAT
128
129   INTEGER :: MPI_SUMDD
130
131   ! variables used for zonal integration
132   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
133   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
134   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
135   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
136   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
137
138   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
139   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
140   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
141   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
142   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
143   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
144   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
145   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
146   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
147
148   ! Type of send : standard, buffered, immediate
149   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
150   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
151   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
152
153   ! Communications summary report
154   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
155   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm  calling routines
156   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
157   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
158   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
159   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
160   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
161   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 2000          !: max number of communication record
162   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
163   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
164   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
165   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
166   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
167   !: name (used as id) of allreduce-delayed operations
168   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb' /)
169   !: component name where the allreduce-delayed operation is performed
170   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
171   TYPE, PUBLIC ::   DELAYARR
172      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
173      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
174   END TYPE DELAYARR
175   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC  ::   todelay             
176   INTEGER,          DIMENSION(nbdelay), PUBLIC  ::   ndelayid = -1     !: mpi request id of the delayed operations
177
178   ! timing summary report
179   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
180   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
181   
182   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
183
184   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
185   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
186
187   !!----------------------------------------------------------------------
188   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
189   !! $Id$
190   !! Software governed by the CeCILL license (see ./LICENSE)
191   !!----------------------------------------------------------------------
192CONTAINS
193
194   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
195      !!----------------------------------------------------------------------
196      !!                  ***  routine mynode  ***
197      !!
198      !! ** Purpose :   Find processor unit
199      !!----------------------------------------------------------------------
200      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
201      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
202      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
203      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
204      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
205      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
206      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
207      !
208      INTEGER ::   mynode, ierr, code, ji, ii, ios
209      LOGICAL ::   mpi_was_called
210      !
211      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, jpnij, ln_nnogather
212      !!----------------------------------------------------------------------
213      !
214      ii = 1
215      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
216      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
217      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
218      !
219      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
220      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
221901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
222      !
223      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
224      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
225902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
226      !
227      !                              ! control print
228      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
229      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
230      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
231      !
232      IF( jpni < 1 .OR. jpnj < 1  ) THEN
233         WRITE(ldtxt(ii),*) '      jpni, jpnj and jpnij will be calculated automatically' ;   ii = ii + 1
234      ELSE
235         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
236         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
237      ENDIF
238
239      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
240
241      CALL mpi_initialized ( mpi_was_called, code )
242      IF( code /= MPI_SUCCESS ) THEN
243         DO ji = 1, SIZE(ldtxt)
244            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
245         END DO
246         WRITE(*, cform_err)
247         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
248         CALL mpi_abort( mpi_comm_world, code, ierr )
249      ENDIF
250
251      IF( mpi_was_called ) THEN
252         !
253         SELECT CASE ( cn_mpi_send )
254         CASE ( 'S' )                ! Standard mpi send (blocking)
255            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
256         CASE ( 'B' )                ! Buffer mpi send (blocking)
257            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
258            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
259         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
260            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
261            l_isend = .TRUE.
262         CASE DEFAULT
263            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
264            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
265            kstop = kstop + 1
266         END SELECT
267         !
268      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
269         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
270         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
271         kstop = kstop + 1
272      ELSE
273         SELECT CASE ( cn_mpi_send )
274         CASE ( 'S' )                ! Standard mpi send (blocking)
275            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
276            CALL mpi_init( ierr )
277         CASE ( 'B' )                ! Buffer mpi send (blocking)
278            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
279            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
280         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
281            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
282            l_isend = .TRUE.
283            CALL mpi_init( ierr )
284         CASE DEFAULT
285            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
286            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
287            kstop = kstop + 1
288         END SELECT
289         !
290      ENDIF
291
292      IF( PRESENT(localComm) ) THEN
293         IF( Agrif_Root() ) THEN
294            mpi_comm_oce = localComm
295         ENDIF
296      ELSE
297         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
298         IF( code /= MPI_SUCCESS ) THEN
299            DO ji = 1, SIZE(ldtxt)
300               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
301            END DO
302            WRITE(*, cform_err)
303            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
304            CALL mpi_abort( mpi_comm_world, code, ierr )
305         ENDIF
306      ENDIF
307
308#if defined key_agrif
309      IF( Agrif_Root() ) THEN
310         CALL Agrif_MPI_Init(mpi_comm_oce)
311      ELSE
312         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
313      ENDIF
314#endif
315
316      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
317      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
318      mynode = mpprank
319
320      IF( mynode == 0 ) THEN
321         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
322         WRITE(kumond, nammpp)     
323      ENDIF
324      !
325      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
326      !
327   END FUNCTION mynode
328
329   !!----------------------------------------------------------------------
330   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
331   !!
332   !!   * Argument : dummy argument use in mpp_lnk_... routines
333   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
334   !!                cd_nat :   nature of array grid-points
335   !!                psgn   :   sign used across the north fold boundary
336   !!                kfld   :   optional, number of pt3d arrays
337   !!                cd_mpp :   optional, fill the overlap area only
338   !!                pval   :   optional, background value (used at closed boundaries)
339   !!----------------------------------------------------------------------
340   !
341   !                       !==  2D array and array of 2D pointer  ==!
342   !
343#  define DIM_2d
344#     define ROUTINE_LNK           mpp_lnk_2d
345#     include "mpp_lnk_generic.h90"
346#     undef ROUTINE_LNK
347#     define MULTI
348#     define ROUTINE_LNK           mpp_lnk_2d_ptr
349#     include "mpp_lnk_generic.h90"
350#     undef ROUTINE_LNK
351#     undef MULTI
352#  undef DIM_2d
353   !
354   !                       !==  3D array and array of 3D pointer  ==!
355   !
356#  define DIM_3d
357#     define ROUTINE_LNK           mpp_lnk_3d
358#     include "mpp_lnk_generic.h90"
359#     undef ROUTINE_LNK
360#     define MULTI
361#     define ROUTINE_LNK           mpp_lnk_3d_ptr
362#     include "mpp_lnk_generic.h90"
363#     undef ROUTINE_LNK
364#     undef MULTI
365#  undef DIM_3d
366   !
367   !                       !==  4D array and array of 4D pointer  ==!
368   !
369#  define DIM_4d
370#     define ROUTINE_LNK           mpp_lnk_4d
371#     include "mpp_lnk_generic.h90"
372#     undef ROUTINE_LNK
373#     define MULTI
374#     define ROUTINE_LNK           mpp_lnk_4d_ptr
375#     include "mpp_lnk_generic.h90"
376#     undef ROUTINE_LNK
377#     undef MULTI
378#  undef DIM_4d
379
380   !!----------------------------------------------------------------------
381   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
382   !!
383   !!   * Argument : dummy argument use in mpp_nfd_... routines
384   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
385   !!                cd_nat :   nature of array grid-points
386   !!                psgn   :   sign used across the north fold boundary
387   !!                kfld   :   optional, number of pt3d arrays
388   !!                cd_mpp :   optional, fill the overlap area only
389   !!                pval   :   optional, background value (used at closed boundaries)
390   !!----------------------------------------------------------------------
391   !
392   !                       !==  2D array and array of 2D pointer  ==!
393   !
394#  define DIM_2d
395#     define ROUTINE_NFD           mpp_nfd_2d
396#     include "mpp_nfd_generic.h90"
397#     undef ROUTINE_NFD
398#     define MULTI
399#     define ROUTINE_NFD           mpp_nfd_2d_ptr
400#     include "mpp_nfd_generic.h90"
401#     undef ROUTINE_NFD
402#     undef MULTI
403#  undef DIM_2d
404   !
405   !                       !==  3D array and array of 3D pointer  ==!
406   !
407#  define DIM_3d
408#     define ROUTINE_NFD           mpp_nfd_3d
409#     include "mpp_nfd_generic.h90"
410#     undef ROUTINE_NFD
411#     define MULTI
412#     define ROUTINE_NFD           mpp_nfd_3d_ptr
413#     include "mpp_nfd_generic.h90"
414#     undef ROUTINE_NFD
415#     undef MULTI
416#  undef DIM_3d
417   !
418   !                       !==  4D array and array of 4D pointer  ==!
419   !
420#  define DIM_4d
421#     define ROUTINE_NFD           mpp_nfd_4d
422#     include "mpp_nfd_generic.h90"
423#     undef ROUTINE_NFD
424#     define MULTI
425#     define ROUTINE_NFD           mpp_nfd_4d_ptr
426#     include "mpp_nfd_generic.h90"
427#     undef ROUTINE_NFD
428#     undef MULTI
429#  undef DIM_4d
430
431
432   !!----------------------------------------------------------------------
433   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
434   !!
435   !!   * Argument : dummy argument use in mpp_lnk_... routines
436   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
437   !!                cd_nat :   nature of array grid-points
438   !!                psgn   :   sign used across the north fold boundary
439   !!                kb_bdy :   BDY boundary set
440   !!                kfld   :   optional, number of pt3d arrays
441   !!----------------------------------------------------------------------
442   !
443   !                       !==  2D array and array of 2D pointer  ==!
444   !
445#  define DIM_2d
446#     define ROUTINE_BDY           mpp_lnk_bdy_2d
447#     include "mpp_bdy_generic.h90"
448#     undef ROUTINE_BDY
449#  undef DIM_2d
450   !
451   !                       !==  3D array and array of 3D pointer  ==!
452   !
453#  define DIM_3d
454#     define ROUTINE_BDY           mpp_lnk_bdy_3d
455#     include "mpp_bdy_generic.h90"
456#     undef ROUTINE_BDY
457#  undef DIM_3d
458   !
459   !                       !==  4D array and array of 4D pointer  ==!
460   !
461#  define DIM_4d
462#     define ROUTINE_BDY           mpp_lnk_bdy_4d
463#     include "mpp_bdy_generic.h90"
464#     undef ROUTINE_BDY
465#  undef DIM_4d
466
467   !!----------------------------------------------------------------------
468   !!
469   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
470   
471   
472   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
473   
474   
475   !!----------------------------------------------------------------------
476
477
478
479   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
480      !!----------------------------------------------------------------------
481      !!                  ***  routine mppsend  ***
482      !!
483      !! ** Purpose :   Send messag passing array
484      !!
485      !!----------------------------------------------------------------------
486      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
487      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
488      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
489      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
490      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
491      !!
492      INTEGER ::   iflag
493      !!----------------------------------------------------------------------
494      !
495      SELECT CASE ( cn_mpi_send )
496      CASE ( 'S' )                ! Standard mpi send (blocking)
497         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
498      CASE ( 'B' )                ! Buffer mpi send (blocking)
499         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
500      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
501         ! be carefull, one more argument here : the mpi request identifier..
502         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
503      END SELECT
504      !
505   END SUBROUTINE mppsend
506
507
508   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
509      !!----------------------------------------------------------------------
510      !!                  ***  routine mpprecv  ***
511      !!
512      !! ** Purpose :   Receive messag passing array
513      !!
514      !!----------------------------------------------------------------------
515      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
516      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
517      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
518      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
519      !!
520      INTEGER :: istatus(mpi_status_size)
521      INTEGER :: iflag
522      INTEGER :: use_source
523      !!----------------------------------------------------------------------
524      !
525      ! If a specific process number has been passed to the receive call,
526      ! use that one. Default is to use mpi_any_source
527      use_source = mpi_any_source
528      IF( PRESENT(ksource) )   use_source = ksource
529      !
530      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
531      !
532   END SUBROUTINE mpprecv
533
534
535   SUBROUTINE mppgather( ptab, kp, pio )
536      !!----------------------------------------------------------------------
537      !!                   ***  routine mppgather  ***
538      !!
539      !! ** Purpose :   Transfert between a local subdomain array and a work
540      !!     array which is distributed following the vertical level.
541      !!
542      !!----------------------------------------------------------------------
543      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
544      INTEGER                           , INTENT(in   ) ::   kp     ! record length
545      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
546      !!
547      INTEGER :: itaille, ierror   ! temporary integer
548      !!---------------------------------------------------------------------
549      !
550      itaille = jpi * jpj
551      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
552         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
553      !
554   END SUBROUTINE mppgather
555
556
557   SUBROUTINE mppscatter( pio, kp, ptab )
558      !!----------------------------------------------------------------------
559      !!                  ***  routine mppscatter  ***
560      !!
561      !! ** Purpose :   Transfert between awork array which is distributed
562      !!      following the vertical level and the local subdomain array.
563      !!
564      !!----------------------------------------------------------------------
565      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
566      INTEGER                             ::   kp     ! Tag (not used with MPI
567      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
568      !!
569      INTEGER :: itaille, ierror   ! temporary integer
570      !!---------------------------------------------------------------------
571      !
572      itaille = jpi * jpj
573      !
574      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
575         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
576      !
577   END SUBROUTINE mppscatter
578
579   
580   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
581     !!----------------------------------------------------------------------
582      !!                   ***  routine mpp_delay_sum  ***
583      !!
584      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
585      !!
586      !!----------------------------------------------------------------------
587      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
588      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
589      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
590      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
591      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
592      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
593      !!
594      INTEGER ::   ji, isz
595      INTEGER ::   idvar
596      INTEGER ::   ierr, ilocalcomm
597      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
598      !!----------------------------------------------------------------------
599      ilocalcomm = mpi_comm_oce
600      IF( PRESENT(kcom) )   ilocalcomm = kcom
601
602      isz = SIZE(y_in)
603     
604      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_glb = .TRUE. )
605
606      idvar = -1
607      DO ji = 1, nbdelay
608         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
609      END DO
610      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
611
612      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
613         !                                       --------------------------
614         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
615            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
616            DEALLOCATE(todelay(idvar)%z1d)
617            ndelayid(idvar) = -1                                      ! do as if we had no restart
618         ELSE
619            ALLOCATE(todelay(idvar)%y1d(isz))
620            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
621         END IF
622      ENDIF
623     
624      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
625         !                                       --------------------------
626         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
627         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
628         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
629      ENDIF
630
631      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
632
633      ! send back pout from todelay(idvar)%z1d defined at previous call
634      pout(:) = todelay(idvar)%z1d(:)
635
636      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
637      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
638
639   END SUBROUTINE mpp_delay_sum
640
641   
642   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
643      !!----------------------------------------------------------------------
644      !!                   ***  routine mpp_delay_max  ***
645      !!
646      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
647      !!
648      !!----------------------------------------------------------------------
649      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
650      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
651      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
652      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
653      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
654      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
655      !!
656      INTEGER ::   ji, isz
657      INTEGER ::   idvar
658      INTEGER ::   ierr, ilocalcomm
659      !!----------------------------------------------------------------------
660      ilocalcomm = mpi_comm_oce
661      IF( PRESENT(kcom) )   ilocalcomm = kcom
662
663      isz = SIZE(p_in)
664
665      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_glb = .TRUE. )
666
667      idvar = -1
668      DO ji = 1, nbdelay
669         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
670      END DO
671      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
672
673      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
674         !                                       --------------------------
675         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
676            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
677            DEALLOCATE(todelay(idvar)%z1d)
678            ndelayid(idvar) = -1                                      ! do as if we had no restart
679         END IF
680      ENDIF
681
682      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
683         !                                       --------------------------
684         ALLOCATE(todelay(idvar)%z1d(isz))
685         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
686      ENDIF
687
688      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
689
690      ! send back pout from todelay(idvar)%z1d defined at previous call
691      pout(:) = todelay(idvar)%z1d(:)
692
693      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
694      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
695
696   END SUBROUTINE mpp_delay_max
697
698   
699   SUBROUTINE mpp_delay_rcv( kid )
700      !!----------------------------------------------------------------------
701      !!                   ***  routine mpp_delay_rcv  ***
702      !!
703      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
704      !!
705      !!----------------------------------------------------------------------
706      INTEGER,INTENT(in   )      ::  kid 
707      INTEGER ::   ierr
708      !!----------------------------------------------------------------------
709      IF( ndelayid(kid) /= -2 ) THEN 
710         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
711         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
712         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
713         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
714         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
715      ENDIF
716   END SUBROUTINE mpp_delay_rcv
717
718   
719   !!----------------------------------------------------------------------
720   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
721   !!   
722   !!----------------------------------------------------------------------
723   !!
724#  define OPERATION_MAX
725#  define INTEGER_TYPE
726#  define DIM_0d
727#     define ROUTINE_ALLREDUCE           mppmax_int
728#     include "mpp_allreduce_generic.h90"
729#     undef ROUTINE_ALLREDUCE
730#  undef DIM_0d
731#  define DIM_1d
732#     define ROUTINE_ALLREDUCE           mppmax_a_int
733#     include "mpp_allreduce_generic.h90"
734#     undef ROUTINE_ALLREDUCE
735#  undef DIM_1d
736#  undef INTEGER_TYPE
737!
738#  define REAL_TYPE
739#  define DIM_0d
740#     define ROUTINE_ALLREDUCE           mppmax_real
741#     include "mpp_allreduce_generic.h90"
742#     undef ROUTINE_ALLREDUCE
743#  undef DIM_0d
744#  define DIM_1d
745#     define ROUTINE_ALLREDUCE           mppmax_a_real
746#     include "mpp_allreduce_generic.h90"
747#     undef ROUTINE_ALLREDUCE
748#  undef DIM_1d
749#  undef REAL_TYPE
750#  undef OPERATION_MAX
751   !!----------------------------------------------------------------------
752   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
753   !!   
754   !!----------------------------------------------------------------------
755   !!
756#  define OPERATION_MIN
757#  define INTEGER_TYPE
758#  define DIM_0d
759#     define ROUTINE_ALLREDUCE           mppmin_int
760#     include "mpp_allreduce_generic.h90"
761#     undef ROUTINE_ALLREDUCE
762#  undef DIM_0d
763#  define DIM_1d
764#     define ROUTINE_ALLREDUCE           mppmin_a_int
765#     include "mpp_allreduce_generic.h90"
766#     undef ROUTINE_ALLREDUCE
767#  undef DIM_1d
768#  undef INTEGER_TYPE
769!
770#  define REAL_TYPE
771#  define DIM_0d
772#     define ROUTINE_ALLREDUCE           mppmin_real
773#     include "mpp_allreduce_generic.h90"
774#     undef ROUTINE_ALLREDUCE
775#  undef DIM_0d
776#  define DIM_1d
777#     define ROUTINE_ALLREDUCE           mppmin_a_real
778#     include "mpp_allreduce_generic.h90"
779#     undef ROUTINE_ALLREDUCE
780#  undef DIM_1d
781#  undef REAL_TYPE
782#  undef OPERATION_MIN
783
784   !!----------------------------------------------------------------------
785   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
786   !!   
787   !!   Global sum of 1D array or a variable (integer, real or complex)
788   !!----------------------------------------------------------------------
789   !!
790#  define OPERATION_SUM
791#  define INTEGER_TYPE
792#  define DIM_0d
793#     define ROUTINE_ALLREDUCE           mppsum_int
794#     include "mpp_allreduce_generic.h90"
795#     undef ROUTINE_ALLREDUCE
796#  undef DIM_0d
797#  define DIM_1d
798#     define ROUTINE_ALLREDUCE           mppsum_a_int
799#     include "mpp_allreduce_generic.h90"
800#     undef ROUTINE_ALLREDUCE
801#  undef DIM_1d
802#  undef INTEGER_TYPE
803!
804#  define REAL_TYPE
805#  define DIM_0d
806#     define ROUTINE_ALLREDUCE           mppsum_real
807#     include "mpp_allreduce_generic.h90"
808#     undef ROUTINE_ALLREDUCE
809#  undef DIM_0d
810#  define DIM_1d
811#     define ROUTINE_ALLREDUCE           mppsum_a_real
812#     include "mpp_allreduce_generic.h90"
813#     undef ROUTINE_ALLREDUCE
814#  undef DIM_1d
815#  undef REAL_TYPE
816#  undef OPERATION_SUM
817
818#  define OPERATION_SUM_DD
819#  define COMPLEX_TYPE
820#  define DIM_0d
821#     define ROUTINE_ALLREDUCE           mppsum_realdd
822#     include "mpp_allreduce_generic.h90"
823#     undef ROUTINE_ALLREDUCE
824#  undef DIM_0d
825#  define DIM_1d
826#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
827#     include "mpp_allreduce_generic.h90"
828#     undef ROUTINE_ALLREDUCE
829#  undef DIM_1d
830#  undef COMPLEX_TYPE
831#  undef OPERATION_SUM_DD
832
833   !!----------------------------------------------------------------------
834   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
835   !!   
836   !!----------------------------------------------------------------------
837   !!
838#  define OPERATION_MINLOC
839#  define DIM_2d
840#     define ROUTINE_LOC           mpp_minloc2d
841#     include "mpp_loc_generic.h90"
842#     undef ROUTINE_LOC
843#  undef DIM_2d
844#  define DIM_3d
845#     define ROUTINE_LOC           mpp_minloc3d
846#     include "mpp_loc_generic.h90"
847#     undef ROUTINE_LOC
848#  undef DIM_3d
849#  undef OPERATION_MINLOC
850
851#  define OPERATION_MAXLOC
852#  define DIM_2d
853#     define ROUTINE_LOC           mpp_maxloc2d
854#     include "mpp_loc_generic.h90"
855#     undef ROUTINE_LOC
856#  undef DIM_2d
857#  define DIM_3d
858#     define ROUTINE_LOC           mpp_maxloc3d
859#     include "mpp_loc_generic.h90"
860#     undef ROUTINE_LOC
861#  undef DIM_3d
862#  undef OPERATION_MAXLOC
863
864   SUBROUTINE mppsync()
865      !!----------------------------------------------------------------------
866      !!                  ***  routine mppsync  ***
867      !!
868      !! ** Purpose :   Massively parallel processors, synchroneous
869      !!
870      !!-----------------------------------------------------------------------
871      INTEGER :: ierror
872      !!-----------------------------------------------------------------------
873      !
874      CALL mpi_barrier( mpi_comm_oce, ierror )
875      !
876   END SUBROUTINE mppsync
877
878
879   SUBROUTINE mppstop( ldfinal, ld_force_abort ) 
880      !!----------------------------------------------------------------------
881      !!                  ***  routine mppstop  ***
882      !!
883      !! ** purpose :   Stop massively parallel processors method
884      !!
885      !!----------------------------------------------------------------------
886      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
887      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
888      LOGICAL ::   llfinal, ll_force_abort
889      INTEGER ::   info
890      !!----------------------------------------------------------------------
891      llfinal = .FALSE.
892      IF( PRESENT(ldfinal) ) llfinal = ldfinal
893      ll_force_abort = .FALSE.
894      IF( PRESENT(ld_force_abort) ) ll_force_abort = ld_force_abort
895      !
896      IF(ll_force_abort) THEN
897         CALL mpi_abort( MPI_COMM_WORLD )
898      ELSE
899         CALL mppsync
900         CALL mpi_finalize( info )
901      ENDIF
902      IF( .NOT. llfinal ) STOP 123456
903      !
904   END SUBROUTINE mppstop
905
906
907   SUBROUTINE mpp_comm_free( kcom )
908      !!----------------------------------------------------------------------
909      INTEGER, INTENT(in) ::   kcom
910      !!
911      INTEGER :: ierr
912      !!----------------------------------------------------------------------
913      !
914      CALL MPI_COMM_FREE(kcom, ierr)
915      !
916   END SUBROUTINE mpp_comm_free
917
918
919   SUBROUTINE mpp_ini_znl( kumout )
920      !!----------------------------------------------------------------------
921      !!               ***  routine mpp_ini_znl  ***
922      !!
923      !! ** Purpose :   Initialize special communicator for computing zonal sum
924      !!
925      !! ** Method  : - Look for processors in the same row
926      !!              - Put their number in nrank_znl
927      !!              - Create group for the znl processors
928      !!              - Create a communicator for znl processors
929      !!              - Determine if processor should write znl files
930      !!
931      !! ** output
932      !!      ndim_rank_znl = number of processors on the same row
933      !!      ngrp_znl = group ID for the znl processors
934      !!      ncomm_znl = communicator for the ice procs.
935      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
936      !!
937      !!----------------------------------------------------------------------
938      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
939      !
940      INTEGER :: jproc      ! dummy loop integer
941      INTEGER :: ierr, ii   ! local integer
942      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
943      !!----------------------------------------------------------------------
944      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
945      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
946      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
947      !
948      ALLOCATE( kwork(jpnij), STAT=ierr )
949      IF( ierr /= 0 ) THEN
950         WRITE(kumout, cform_err)
951         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
952         CALL mppstop
953      ENDIF
954
955      IF( jpnj == 1 ) THEN
956         ngrp_znl  = ngrp_world
957         ncomm_znl = mpi_comm_oce
958      ELSE
959         !
960         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
961         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
962         !-$$        CALL flush(numout)
963         !
964         ! Count number of processors on the same row
965         ndim_rank_znl = 0
966         DO jproc=1,jpnij
967            IF ( kwork(jproc) == njmpp ) THEN
968               ndim_rank_znl = ndim_rank_znl + 1
969            ENDIF
970         END DO
971         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
972         !-$$        CALL flush(numout)
973         ! Allocate the right size to nrank_znl
974         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
975         ALLOCATE(nrank_znl(ndim_rank_znl))
976         ii = 0
977         nrank_znl (:) = 0
978         DO jproc=1,jpnij
979            IF ( kwork(jproc) == njmpp) THEN
980               ii = ii + 1
981               nrank_znl(ii) = jproc -1
982            ENDIF
983         END DO
984         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
985         !-$$        CALL flush(numout)
986
987         ! Create the opa group
988         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
989         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
990         !-$$        CALL flush(numout)
991
992         ! Create the znl group from the opa group
993         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
994         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
995         !-$$        CALL flush(numout)
996
997         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
998         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
999         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1000         !-$$        CALL flush(numout)
1001         !
1002      END IF
1003
1004      ! Determines if processor if the first (starting from i=1) on the row
1005      IF ( jpni == 1 ) THEN
1006         l_znl_root = .TRUE.
1007      ELSE
1008         l_znl_root = .FALSE.
1009         kwork (1) = nimpp
1010         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1011         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1012      END IF
1013
1014      DEALLOCATE(kwork)
1015
1016   END SUBROUTINE mpp_ini_znl
1017
1018
1019   SUBROUTINE mpp_ini_north
1020      !!----------------------------------------------------------------------
1021      !!               ***  routine mpp_ini_north  ***
1022      !!
1023      !! ** Purpose :   Initialize special communicator for north folding
1024      !!      condition together with global variables needed in the mpp folding
1025      !!
1026      !! ** Method  : - Look for northern processors
1027      !!              - Put their number in nrank_north
1028      !!              - Create groups for the world processors and the north processors
1029      !!              - Create a communicator for northern processors
1030      !!
1031      !! ** output
1032      !!      njmppmax = njmpp for northern procs
1033      !!      ndim_rank_north = number of processors in the northern line
1034      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1035      !!      ngrp_world = group ID for the world processors
1036      !!      ngrp_north = group ID for the northern processors
1037      !!      ncomm_north = communicator for the northern procs.
1038      !!      north_root = number (in the world) of proc 0 in the northern comm.
1039      !!
1040      !!----------------------------------------------------------------------
1041      INTEGER ::   ierr
1042      INTEGER ::   jjproc
1043      INTEGER ::   ii, ji
1044      !!----------------------------------------------------------------------
1045      !
1046      njmppmax = MAXVAL( njmppt )
1047      !
1048      ! Look for how many procs on the northern boundary
1049      ndim_rank_north = 0
1050      DO jjproc = 1, jpnij
1051         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1052      END DO
1053      !
1054      ! Allocate the right size to nrank_north
1055      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1056      ALLOCATE( nrank_north(ndim_rank_north) )
1057
1058      ! Fill the nrank_north array with proc. number of northern procs.
1059      ! Note : the rank start at 0 in MPI
1060      ii = 0
1061      DO ji = 1, jpnij
1062         IF ( njmppt(ji) == njmppmax   ) THEN
1063            ii=ii+1
1064            nrank_north(ii)=ji-1
1065         END IF
1066      END DO
1067      !
1068      ! create the world group
1069      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1070      !
1071      ! Create the North group from the world group
1072      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1073      !
1074      ! Create the North communicator , ie the pool of procs in the north group
1075      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1076      !
1077   END SUBROUTINE mpp_ini_north
1078
1079
1080   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
1081      !!---------------------------------------------------------------------
1082      !!                   ***  routine mpp_init.opa  ***
1083      !!
1084      !! ** Purpose :: export and attach a MPI buffer for bsend
1085      !!
1086      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
1087      !!            but classical mpi_init
1088      !!
1089      !! History :: 01/11 :: IDRIS initial version for IBM only
1090      !!            08/04 :: R. Benshila, generalisation
1091      !!---------------------------------------------------------------------
1092      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
1093      INTEGER                      , INTENT(inout) ::   ksft
1094      INTEGER                      , INTENT(  out) ::   code
1095      INTEGER                                      ::   ierr, ji
1096      LOGICAL                                      ::   mpi_was_called
1097      !!---------------------------------------------------------------------
1098      !
1099      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
1100      IF ( code /= MPI_SUCCESS ) THEN
1101         DO ji = 1, SIZE(ldtxt)
1102            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1103         END DO
1104         WRITE(*, cform_err)
1105         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
1106         CALL mpi_abort( mpi_comm_world, code, ierr )
1107      ENDIF
1108      !
1109      IF( .NOT. mpi_was_called ) THEN
1110         CALL mpi_init( code )
1111         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
1112         IF ( code /= MPI_SUCCESS ) THEN
1113            DO ji = 1, SIZE(ldtxt)
1114               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1115            END DO
1116            WRITE(*, cform_err)
1117            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
1118            CALL mpi_abort( mpi_comm_world, code, ierr )
1119         ENDIF
1120      ENDIF
1121      !
1122      IF( nn_buffer > 0 ) THEN
1123         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
1124         ! Buffer allocation and attachment
1125         ALLOCATE( tampon(nn_buffer), stat = ierr )
1126         IF( ierr /= 0 ) THEN
1127            DO ji = 1, SIZE(ldtxt)
1128               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1129            END DO
1130            WRITE(*, cform_err)
1131            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1132            CALL mpi_abort( mpi_comm_world, code, ierr )
1133         END IF
1134         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1135      ENDIF
1136      !
1137   END SUBROUTINE mpi_init_oce
1138
1139
1140   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1141      !!---------------------------------------------------------------------
1142      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1143      !!
1144      !!   Modification of original codes written by David H. Bailey
1145      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1146      !!---------------------------------------------------------------------
1147      INTEGER                     , INTENT(in)    ::   ilen, itype
1148      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1149      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1150      !
1151      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1152      INTEGER  :: ji, ztmp           ! local scalar
1153      !!---------------------------------------------------------------------
1154      !
1155      ztmp = itype   ! avoid compilation warning
1156      !
1157      DO ji=1,ilen
1158      ! Compute ydda + yddb using Knuth's trick.
1159         zt1  = real(ydda(ji)) + real(yddb(ji))
1160         zerr = zt1 - real(ydda(ji))
1161         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1162                + aimag(ydda(ji)) + aimag(yddb(ji))
1163
1164         ! The result is zt1 + zt2, after normalization.
1165         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1166      END DO
1167      !
1168   END SUBROUTINE DDPDD_MPI
1169
1170
1171   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
1172      !!---------------------------------------------------------------------
1173      !!                   ***  routine mpp_lbc_north_icb  ***
1174      !!
1175      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1176      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1177      !!              array with outer extra halo
1178      !!
1179      !! ** Method  :   North fold condition and mpp with more than one proc
1180      !!              in i-direction require a specific treatment. We gather
1181      !!              the 4+kextj northern lines of the global domain on 1
1182      !!              processor and apply lbc north-fold on this sub array.
1183      !!              Then we scatter the north fold array back to the processors.
1184      !!              This routine accounts for an extra halo with icebergs
1185      !!              and assumes ghost rows and columns have been suppressed.
1186      !!
1187      !!----------------------------------------------------------------------
1188      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1189      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1190      !                                                     !   = T ,  U , V , F or W -points
1191      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1192      !!                                                    ! north fold, =  1. otherwise
1193      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
1194      !
1195      INTEGER ::   ji, jj, jr
1196      INTEGER ::   ierr, itaille, ildi, ilei, iilb
1197      INTEGER ::   ipj, ij, iproc
1198      !
1199      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1200      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1201      !!----------------------------------------------------------------------
1202      !
1203      ipj=4
1204      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1205     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1206     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
1207      !
1208      ztab_e(:,:)      = 0._wp
1209      znorthloc_e(:,:) = 0._wp
1210      !
1211      ij = 1 - kextj
1212      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1213      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1214         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
1215         ij = ij + 1
1216      END DO
1217      !
1218      itaille = jpimax * ( ipj + 2*kextj )
1219      !
1220      IF( ln_timing ) CALL tic_tac(.TRUE.)
1221      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1222         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1223         &                ncomm_north, ierr )
1224      !
1225      IF( ln_timing ) CALL tic_tac(.FALSE.)
1226      !
1227      DO jr = 1, ndim_rank_north            ! recover the global north array
1228         iproc = nrank_north(jr) + 1
1229         ildi = nldit (iproc)
1230         ilei = nleit (iproc)
1231         iilb = nimppt(iproc)
1232         DO jj = 1-kextj, ipj+kextj
1233            DO ji = ildi, ilei
1234               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1235            END DO
1236         END DO
1237      END DO
1238
1239      ! 2. North-Fold boundary conditions
1240      ! ----------------------------------
1241      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
1242
1243      ij = 1 - kextj
1244      !! Scatter back to pt2d
1245      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1246         DO ji= 1, jpi
1247            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1248         END DO
1249         ij  = ij +1
1250      END DO
1251      !
1252      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1253      !
1254   END SUBROUTINE mpp_lbc_north_icb
1255
1256
1257   SUBROUTINE mpp_lnk_2d_icb( cdname, pt2d, cd_type, psgn, kexti, kextj )
1258      !!----------------------------------------------------------------------
1259      !!                  ***  routine mpp_lnk_2d_icb  ***
1260      !!
1261      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1262      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1263      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
1264      !!
1265      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1266      !!      between processors following neighboring subdomains.
1267      !!            domain parameters
1268      !!                    jpi    : first dimension of the local subdomain
1269      !!                    jpj    : second dimension of the local subdomain
1270      !!                    kexti  : number of columns for extra outer halo
1271      !!                    kextj  : number of rows for extra outer halo
1272      !!                    nbondi : mark for "east-west local boundary"
1273      !!                    nbondj : mark for "north-south local boundary"
1274      !!                    noea   : number for local neighboring processors
1275      !!                    nowe   : number for local neighboring processors
1276      !!                    noso   : number for local neighboring processors
1277      !!                    nono   : number for local neighboring processors
1278      !!----------------------------------------------------------------------
1279      CHARACTER(len=*)                                        , INTENT(in   ) ::   cdname      ! name of the calling subroutine
1280      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1281      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1282      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1283      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1284      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1285      !
1286      INTEGER  ::   jl   ! dummy loop indices
1287      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1288      INTEGER  ::   ipreci, iprecj             !   -       -
1289      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1290      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1291      !!
1292      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1293      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
1294      !!----------------------------------------------------------------------
1295
1296      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1297      iprecj = nn_hls + kextj
1298
1299      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, 1, 1, 1, ld_lbc = .TRUE. )
1300
1301      ! 1. standard boundary treatment
1302      ! ------------------------------
1303      ! Order matters Here !!!!
1304      !
1305      !                                      ! East-West boundaries
1306      !                                           !* Cyclic east-west
1307      IF( l_Iperio ) THEN
1308         pt2d(1-kexti:     1   ,:) = pt2d(jpim1-kexti: jpim1 ,:)       ! east
1309         pt2d(  jpi  :jpi+kexti,:) = pt2d(     2     :2+kexti,:)       ! west
1310         !
1311      ELSE                                        !* closed
1312         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls   ,:) = 0._wp    ! east except at F-point
1313                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! west
1314      ENDIF
1315      !                                      ! North-South boundaries
1316      IF( l_Jperio ) THEN                         !* cyclic (only with no mpp j-split)
1317         pt2d(:,1-kextj:     1   ) = pt2d(:,jpjm1-kextj:  jpjm1)       ! north
1318         pt2d(:,  jpj  :jpj+kextj) = pt2d(:,     2     :2+kextj)       ! south
1319      ELSE                                        !* closed
1320         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-kextj   :nn_hls   ) = 0._wp    ! north except at F-point
1321                                      pt2d(:,jpj-nn_hls+1:jpj+kextj) = 0._wp    ! south
1322      ENDIF
1323      !
1324
1325      ! north fold treatment
1326      ! -----------------------
1327      IF( npolj /= 0 ) THEN
1328         !
1329         SELECT CASE ( jpni )
1330                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1331                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1332         END SELECT
1333         !
1334      ENDIF
1335
1336      ! 2. East and west directions exchange
1337      ! ------------------------------------
1338      ! we play with the neigbours AND the row number because of the periodicity
1339      !
1340      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1341      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
1342         iihom = jpi-nreci-kexti
1343         DO jl = 1, ipreci
1344            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
1345            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1346         END DO
1347      END SELECT
1348      !
1349      !                           ! Migrations
1350      imigr = ipreci * ( jpj + 2*kextj )
1351      !
1352      IF( ln_timing ) CALL tic_tac(.TRUE.)
1353      !
1354      SELECT CASE ( nbondi )
1355      CASE ( -1 )
1356         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1357         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1358         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1359      CASE ( 0 )
1360         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1361         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1362         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1363         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1364         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1365         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1366      CASE ( 1 )
1367         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1368         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1369         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1370      END SELECT
1371      !
1372      IF( ln_timing ) CALL tic_tac(.FALSE.)
1373      !
1374      !                           ! Write Dirichlet lateral conditions
1375      iihom = jpi - nn_hls
1376      !
1377      SELECT CASE ( nbondi )
1378      CASE ( -1 )
1379         DO jl = 1, ipreci
1380            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1381         END DO
1382      CASE ( 0 )
1383         DO jl = 1, ipreci
1384            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1385            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1386         END DO
1387      CASE ( 1 )
1388         DO jl = 1, ipreci
1389            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1390         END DO
1391      END SELECT
1392
1393
1394      ! 3. North and south directions
1395      ! -----------------------------
1396      ! always closed : we play only with the neigbours
1397      !
1398      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
1399         ijhom = jpj-nrecj-kextj
1400         DO jl = 1, iprecj
1401            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
1402            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
1403         END DO
1404      ENDIF
1405      !
1406      !                           ! Migrations
1407      imigr = iprecj * ( jpi + 2*kexti )
1408      !
1409      IF( ln_timing ) CALL tic_tac(.TRUE.)
1410      !
1411      SELECT CASE ( nbondj )
1412      CASE ( -1 )
1413         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1414         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1415         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1416      CASE ( 0 )
1417         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1418         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1419         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1420         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1421         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1422         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1423      CASE ( 1 )
1424         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1425         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1426         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1427      END SELECT
1428      !
1429      IF( ln_timing ) CALL tic_tac(.FALSE.)
1430      !
1431      !                           ! Write Dirichlet lateral conditions
1432      ijhom = jpj - nn_hls
1433      !
1434      SELECT CASE ( nbondj )
1435      CASE ( -1 )
1436         DO jl = 1, iprecj
1437            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1438         END DO
1439      CASE ( 0 )
1440         DO jl = 1, iprecj
1441            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1442            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1443         END DO
1444      CASE ( 1 )
1445         DO jl = 1, iprecj
1446            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1447         END DO
1448      END SELECT
1449      !
1450   END SUBROUTINE mpp_lnk_2d_icb
1451
1452
1453   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb )
1454      !!----------------------------------------------------------------------
1455      !!                  ***  routine mpp_report  ***
1456      !!
1457      !! ** Purpose :   report use of mpp routines per time-setp
1458      !!
1459      !!----------------------------------------------------------------------
1460      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1461      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1462      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb
1463      !!
1464      LOGICAL ::   ll_lbc, ll_glb
1465      INTEGER ::    ji,  jj,  jk,  jh, jf   ! dummy loop indices
1466      !!----------------------------------------------------------------------
1467      !
1468      ll_lbc = .FALSE.
1469      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1470      ll_glb = .FALSE.
1471      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1472      !
1473      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1474      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
1475      ncom_freq = ncom_fsbc
1476      !
1477      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1478         IF( ll_lbc ) THEN
1479            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1480            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1481            n_sequence_lbc = n_sequence_lbc + 1
1482            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1483            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1484            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1485            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1486         ENDIF
1487         IF( ll_glb ) THEN
1488            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1489            n_sequence_glb = n_sequence_glb + 1
1490            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1491            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1492         ENDIF
1493      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1494         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1495         WRITE(numcom,*) ' '
1496         WRITE(numcom,*) ' ------------------------------------------------------------'
1497         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1498         WRITE(numcom,*) ' ------------------------------------------------------------'
1499         WRITE(numcom,*) ' '
1500         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1501         jj = 0; jk = 0; jf = 0; jh = 0
1502         DO ji = 1, n_sequence_lbc
1503            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1504            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1505            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1506            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1507         END DO
1508         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1509         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1510         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1511         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1512         WRITE(numcom,*) ' '
1513         WRITE(numcom,*) ' lbc_lnk called'
1514         jj = 1
1515         DO ji = 2, n_sequence_lbc
1516            IF( crname_lbc(ji-1) /= crname_lbc(ji) ) THEN
1517               WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(ji-1))
1518               jj = 0
1519            END IF
1520            jj = jj + 1 
1521         END DO
1522         WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
1523         WRITE(numcom,*) ' '
1524         IF ( n_sequence_glb > 0 ) THEN
1525            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1526            jj = 1
1527            DO ji = 2, n_sequence_glb
1528               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1529                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1530                  jj = 0
1531               END IF
1532               jj = jj + 1 
1533            END DO
1534            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1535            DEALLOCATE(crname_glb)
1536         ELSE
1537            WRITE(numcom,*) ' No MPI global communication '
1538         ENDIF
1539         WRITE(numcom,*) ' '
1540         WRITE(numcom,*) ' -----------------------------------------------'
1541         WRITE(numcom,*) ' '
1542         DEALLOCATE(ncomm_sequence)
1543         DEALLOCATE(crname_lbc)
1544      ENDIF
1545   END SUBROUTINE mpp_report
1546
1547   
1548   SUBROUTINE tic_tac (ld_tic, ld_global)
1549
1550    LOGICAL,           INTENT(IN) :: ld_tic
1551    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1552    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1553    REAL(wp),               SAVE :: tic_ct = 0._wp
1554    INTEGER :: ii
1555
1556    IF( ncom_stp <= nit000 ) RETURN
1557    IF( ncom_stp == nitend ) RETURN
1558    ii = 1
1559    IF( PRESENT( ld_global ) ) THEN
1560       IF( ld_global ) ii = 2
1561    END IF
1562   
1563    IF ( ld_tic ) THEN
1564       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1565       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1566    ELSE
1567       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1568       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1569    ENDIF
1570   
1571   END SUBROUTINE tic_tac
1572
1573   
1574#else
1575   !!----------------------------------------------------------------------
1576   !!   Default case:            Dummy module        share memory computing
1577   !!----------------------------------------------------------------------
1578   USE in_out_manager
1579
1580   INTERFACE mpp_sum
1581      MODULE PROCEDURE mppsum_int, mppsum_a_int, mppsum_real, mppsum_a_real, mppsum_realdd, mppsum_a_realdd
1582   END INTERFACE
1583   INTERFACE mpp_max
1584      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
1585   END INTERFACE
1586   INTERFACE mpp_min
1587      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1588   END INTERFACE
1589   INTERFACE mpp_minloc
1590      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1591   END INTERFACE
1592   INTERFACE mpp_maxloc
1593      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1594   END INTERFACE
1595
1596   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
1597   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
1598   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
1599
1600   INTEGER, PARAMETER, PUBLIC               ::   nbdelay = 0   ! make sure we don't enter loops: DO ji = 1, nbdelay
1601   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaylist = 'empty'
1602   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaycpnt = 'empty'
1603   TYPE ::   DELAYARR
1604      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
1605      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
1606   END TYPE DELAYARR
1607   TYPE( DELAYARR ), DIMENSION(1), PUBLIC  ::   todelay             
1608   INTEGER,  PUBLIC, DIMENSION(1)           ::   ndelayid = -1
1609   !!----------------------------------------------------------------------
1610CONTAINS
1611
1612   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1613      INTEGER, INTENT(in) ::   kumout
1614      lib_mpp_alloc = 0
1615   END FUNCTION lib_mpp_alloc
1616
1617   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
1618      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
1619      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
1620      CHARACTER(len=*) ::   ldname
1621      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
1622      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
1623      function_value = 0
1624      IF( .FALSE. )   ldtxt(:) = 'never done'
1625      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
1626   END FUNCTION mynode
1627
1628   SUBROUTINE mppsync                       ! Dummy routine
1629   END SUBROUTINE mppsync
1630
1631   !!----------------------------------------------------------------------
1632   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
1633   !!   
1634   !!----------------------------------------------------------------------
1635   !!
1636#  define OPERATION_MAX
1637#  define INTEGER_TYPE
1638#  define DIM_0d
1639#     define ROUTINE_ALLREDUCE           mppmax_int
1640#     include "mpp_allreduce_generic.h90"
1641#     undef ROUTINE_ALLREDUCE
1642#  undef DIM_0d
1643#  define DIM_1d
1644#     define ROUTINE_ALLREDUCE           mppmax_a_int
1645#     include "mpp_allreduce_generic.h90"
1646#     undef ROUTINE_ALLREDUCE
1647#  undef DIM_1d
1648#  undef INTEGER_TYPE
1649!
1650#  define REAL_TYPE
1651#  define DIM_0d
1652#     define ROUTINE_ALLREDUCE           mppmax_real
1653#     include "mpp_allreduce_generic.h90"
1654#     undef ROUTINE_ALLREDUCE
1655#  undef DIM_0d
1656#  define DIM_1d
1657#     define ROUTINE_ALLREDUCE           mppmax_a_real
1658#     include "mpp_allreduce_generic.h90"
1659#     undef ROUTINE_ALLREDUCE
1660#  undef DIM_1d
1661#  undef REAL_TYPE
1662#  undef OPERATION_MAX
1663   !!----------------------------------------------------------------------
1664   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
1665   !!   
1666   !!----------------------------------------------------------------------
1667   !!
1668#  define OPERATION_MIN
1669#  define INTEGER_TYPE
1670#  define DIM_0d
1671#     define ROUTINE_ALLREDUCE           mppmin_int
1672#     include "mpp_allreduce_generic.h90"
1673#     undef ROUTINE_ALLREDUCE
1674#  undef DIM_0d
1675#  define DIM_1d
1676#     define ROUTINE_ALLREDUCE           mppmin_a_int
1677#     include "mpp_allreduce_generic.h90"
1678#     undef ROUTINE_ALLREDUCE
1679#  undef DIM_1d
1680#  undef INTEGER_TYPE
1681!
1682#  define REAL_TYPE
1683#  define DIM_0d
1684#     define ROUTINE_ALLREDUCE           mppmin_real
1685#     include "mpp_allreduce_generic.h90"
1686#     undef ROUTINE_ALLREDUCE
1687#  undef DIM_0d
1688#  define DIM_1d
1689#     define ROUTINE_ALLREDUCE           mppmin_a_real
1690#     include "mpp_allreduce_generic.h90"
1691#     undef ROUTINE_ALLREDUCE
1692#  undef DIM_1d
1693#  undef REAL_TYPE
1694#  undef OPERATION_MIN
1695
1696   !!----------------------------------------------------------------------
1697   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
1698   !!   
1699   !!   Global sum of 1D array or a variable (integer, real or complex)
1700   !!----------------------------------------------------------------------
1701   !!
1702#  define OPERATION_SUM
1703#  define INTEGER_TYPE
1704#  define DIM_0d
1705#     define ROUTINE_ALLREDUCE           mppsum_int
1706#     include "mpp_allreduce_generic.h90"
1707#     undef ROUTINE_ALLREDUCE
1708#  undef DIM_0d
1709#  define DIM_1d
1710#     define ROUTINE_ALLREDUCE           mppsum_a_int
1711#     include "mpp_allreduce_generic.h90"
1712#     undef ROUTINE_ALLREDUCE
1713#  undef DIM_1d
1714#  undef INTEGER_TYPE
1715!
1716#  define REAL_TYPE
1717#  define DIM_0d
1718#     define ROUTINE_ALLREDUCE           mppsum_real
1719#     include "mpp_allreduce_generic.h90"
1720#     undef ROUTINE_ALLREDUCE
1721#  undef DIM_0d
1722#  define DIM_1d
1723#     define ROUTINE_ALLREDUCE           mppsum_a_real
1724#     include "mpp_allreduce_generic.h90"
1725#     undef ROUTINE_ALLREDUCE
1726#  undef DIM_1d
1727#  undef REAL_TYPE
1728#  undef OPERATION_SUM
1729
1730#  define OPERATION_SUM_DD
1731#  define COMPLEX_TYPE
1732#  define DIM_0d
1733#     define ROUTINE_ALLREDUCE           mppsum_realdd
1734#     include "mpp_allreduce_generic.h90"
1735#     undef ROUTINE_ALLREDUCE
1736#  undef DIM_0d
1737#  define DIM_1d
1738#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
1739#     include "mpp_allreduce_generic.h90"
1740#     undef ROUTINE_ALLREDUCE
1741#  undef DIM_1d
1742#  undef COMPLEX_TYPE
1743#  undef OPERATION_SUM_DD
1744
1745   !!----------------------------------------------------------------------
1746   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
1747   !!   
1748   !!----------------------------------------------------------------------
1749   !!
1750#  define OPERATION_MINLOC
1751#  define DIM_2d
1752#     define ROUTINE_LOC           mpp_minloc2d
1753#     include "mpp_loc_generic.h90"
1754#     undef ROUTINE_LOC
1755#  undef DIM_2d
1756#  define DIM_3d
1757#     define ROUTINE_LOC           mpp_minloc3d
1758#     include "mpp_loc_generic.h90"
1759#     undef ROUTINE_LOC
1760#  undef DIM_3d
1761#  undef OPERATION_MINLOC
1762
1763#  define OPERATION_MAXLOC
1764#  define DIM_2d
1765#     define ROUTINE_LOC           mpp_maxloc2d
1766#     include "mpp_loc_generic.h90"
1767#     undef ROUTINE_LOC
1768#  undef DIM_2d
1769#  define DIM_3d
1770#     define ROUTINE_LOC           mpp_maxloc3d
1771#     include "mpp_loc_generic.h90"
1772#     undef ROUTINE_LOC
1773#  undef DIM_3d
1774#  undef OPERATION_MAXLOC
1775
1776   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
1777      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1778      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1779      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
1780      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1781      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1782      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1783      !
1784      pout(:) = REAL(y_in(:), wp)
1785   END SUBROUTINE mpp_delay_sum
1786
1787   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
1788      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1789      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1790      REAL(wp),         INTENT(in   ), DIMENSION(:) ::   p_in
1791      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1792      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1793      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1794      !
1795      pout(:) = p_in(:)
1796   END SUBROUTINE mpp_delay_max
1797
1798   SUBROUTINE mpp_delay_rcv( kid )
1799      INTEGER,INTENT(in   )      ::  kid 
1800      WRITE(*,*) 'mpp_delay_rcv: You should not have seen this print! error?', kid
1801   END SUBROUTINE mpp_delay_rcv
1802   
1803   SUBROUTINE mppstop( ldfinal, ld_force_abort )
1804      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
1805      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
1806      STOP      ! non MPP case, just stop the run
1807   END SUBROUTINE mppstop
1808
1809   SUBROUTINE mpp_ini_znl( knum )
1810      INTEGER :: knum
1811      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
1812   END SUBROUTINE mpp_ini_znl
1813
1814   SUBROUTINE mpp_comm_free( kcom )
1815      INTEGER :: kcom
1816      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
1817   END SUBROUTINE mpp_comm_free
1818   
1819#endif
1820
1821   !!----------------------------------------------------------------------
1822   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
1823   !!----------------------------------------------------------------------
1824
1825   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1826      &                 cd6, cd7, cd8, cd9, cd10 )
1827      !!----------------------------------------------------------------------
1828      !!                  ***  ROUTINE  stop_opa  ***
1829      !!
1830      !! ** Purpose :   print in ocean.outpput file a error message and
1831      !!                increment the error number (nstop) by one.
1832      !!----------------------------------------------------------------------
1833      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1834      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1835      !!----------------------------------------------------------------------
1836      !
1837      nstop = nstop + 1
1838      IF(lwp) THEN
1839         WRITE(numout,cform_err)
1840         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1841         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1842         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1843         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1844         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1845         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1846         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1847         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1848         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1849         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1850      ENDIF
1851                               CALL FLUSH(numout    )
1852      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1853      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1854      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1855      !
1856      IF( cd1 == 'STOP' ) THEN
1857         IF(lwp) WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1858         CALL mppstop(ld_force_abort = .true.)
1859      ENDIF
1860      !
1861   END SUBROUTINE ctl_stop
1862
1863
1864   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1865      &                 cd6, cd7, cd8, cd9, cd10 )
1866      !!----------------------------------------------------------------------
1867      !!                  ***  ROUTINE  stop_warn  ***
1868      !!
1869      !! ** Purpose :   print in ocean.outpput file a error message and
1870      !!                increment the warning number (nwarn) by one.
1871      !!----------------------------------------------------------------------
1872      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1873      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1874      !!----------------------------------------------------------------------
1875      !
1876      nwarn = nwarn + 1
1877      IF(lwp) THEN
1878         WRITE(numout,cform_war)
1879         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1880         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1881         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1882         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1883         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1884         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1885         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1886         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1887         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1888         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
1889      ENDIF
1890      CALL FLUSH(numout)
1891      !
1892   END SUBROUTINE ctl_warn
1893
1894
1895   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1896      !!----------------------------------------------------------------------
1897      !!                  ***  ROUTINE ctl_opn  ***
1898      !!
1899      !! ** Purpose :   Open file and check if required file is available.
1900      !!
1901      !! ** Method  :   Fortan open
1902      !!----------------------------------------------------------------------
1903      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1904      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1905      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1906      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1907      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1908      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1909      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1910      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1911      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1912      !
1913      CHARACTER(len=80) ::   clfile
1914      INTEGER           ::   iost
1915      !!----------------------------------------------------------------------
1916      !
1917      ! adapt filename
1918      ! ----------------
1919      clfile = TRIM(cdfile)
1920      IF( PRESENT( karea ) ) THEN
1921         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1922      ENDIF
1923#if defined key_agrif
1924      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1925      knum=Agrif_Get_Unit()
1926#else
1927      knum=get_unit()
1928#endif
1929      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1930      !
1931      iost=0
1932      IF( cdacce(1:6) == 'DIRECT' )  THEN         ! cdacce has always more than 6 characters
1933         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1934      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
1935         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
1936      ELSE
1937         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
1938      ENDIF
1939      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1940         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
1941      IF( iost == 0 ) THEN
1942         IF(ldwp) THEN
1943            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
1944            WRITE(kout,*) '     unit   = ', knum
1945            WRITE(kout,*) '     status = ', cdstat
1946            WRITE(kout,*) '     form   = ', cdform
1947            WRITE(kout,*) '     access = ', cdacce
1948            WRITE(kout,*)
1949         ENDIF
1950      ENDIF
1951100   CONTINUE
1952      IF( iost /= 0 ) THEN
1953         IF(ldwp) THEN
1954            WRITE(kout,*)
1955            WRITE(kout,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1956            WRITE(kout,*) ' =======   ===  '
1957            WRITE(kout,*) '           unit   = ', knum
1958            WRITE(kout,*) '           status = ', cdstat
1959            WRITE(kout,*) '           form   = ', cdform
1960            WRITE(kout,*) '           access = ', cdacce
1961            WRITE(kout,*) '           iostat = ', iost
1962            WRITE(kout,*) '           we stop. verify the file '
1963            WRITE(kout,*)
1964         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
1965            WRITE(*,*)
1966            WRITE(*,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
1967            WRITE(*,*) ' =======   ===  '
1968            WRITE(*,*) '           unit   = ', knum
1969            WRITE(*,*) '           status = ', cdstat
1970            WRITE(*,*) '           form   = ', cdform
1971            WRITE(*,*) '           access = ', cdacce
1972            WRITE(*,*) '           iostat = ', iost
1973            WRITE(*,*) '           we stop. verify the file '
1974            WRITE(*,*)
1975         ENDIF
1976         CALL FLUSH( kout ) 
1977         STOP 'ctl_opn bad opening'
1978      ENDIF
1979      !
1980   END SUBROUTINE ctl_opn
1981
1982
1983   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
1984      !!----------------------------------------------------------------------
1985      !!                  ***  ROUTINE ctl_nam  ***
1986      !!
1987      !! ** Purpose :   Informations when error while reading a namelist
1988      !!
1989      !! ** Method  :   Fortan open
1990      !!----------------------------------------------------------------------
1991      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
1992      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1993      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
1994      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
1995      !!----------------------------------------------------------------------
1996      !
1997      WRITE (clios, '(I5.0)')   kios
1998      IF( kios < 0 ) THEN         
1999         CALL ctl_warn( 'end of record or file while reading namelist '   &
2000            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2001      ENDIF
2002      !
2003      IF( kios > 0 ) THEN
2004         CALL ctl_stop( 'misspelled variable in namelist '   &
2005            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2006      ENDIF
2007      kios = 0
2008      RETURN
2009      !
2010   END SUBROUTINE ctl_nam
2011
2012
2013   INTEGER FUNCTION get_unit()
2014      !!----------------------------------------------------------------------
2015      !!                  ***  FUNCTION  get_unit  ***
2016      !!
2017      !! ** Purpose :   return the index of an unused logical unit
2018      !!----------------------------------------------------------------------
2019      LOGICAL :: llopn
2020      !!----------------------------------------------------------------------
2021      !
2022      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
2023      llopn = .TRUE.
2024      DO WHILE( (get_unit < 998) .AND. llopn )
2025         get_unit = get_unit + 1
2026         INQUIRE( unit = get_unit, opened = llopn )
2027      END DO
2028      IF( (get_unit == 999) .AND. llopn ) THEN
2029         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
2030         get_unit = -1
2031      ENDIF
2032      !
2033   END FUNCTION get_unit
2034
2035   !!----------------------------------------------------------------------
2036END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.