New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/UKMO/NEMO_4.0_GC_couple_pkg/src/OCE/LBC – NEMO

source: NEMO/branches/UKMO/NEMO_4.0_GC_couple_pkg/src/OCE/LBC/lib_mpp.F90 @ 11303

Last change on this file since 11303 was 11303, checked in by dancopsey, 5 years ago

update to be relative to 11081 of NEMO_4.0_mirror

File size: 91.3 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
35   !!----------------------------------------------------------------------
36#if   defined key_mpp_mpi
37   !!----------------------------------------------------------------------
38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
43   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
44   !!   mpprecv       :
45   !!   mppsend       :
46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
55   !!   mpp_ini_north : initialisation of north fold
56   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
57   !!----------------------------------------------------------------------
58   USE dom_oce        ! ocean space and time domain
59   USE lbcnfd         ! north fold treatment
60   USE in_out_manager ! I/O manager
61
62   IMPLICIT NONE
63   PRIVATE
64
65   INTERFACE mpp_nfd
66      MODULE PROCEDURE   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
67      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
68   END INTERFACE
69
70   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
71   PUBLIC   mpp_lnk_2d    , mpp_lnk_3d    , mpp_lnk_4d
72   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
73   !
74!!gm  this should be useless
75   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
76   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
77!!gm end
78   !
79   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
80   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
81   PUBLIC   mpp_ini_north
82   PUBLIC   mpp_lnk_2d_icb
83   PUBLIC   mpp_lbc_north_icb
84   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
85   PUBLIC   mpp_delay_max, mpp_delay_sum, mpp_delay_rcv
86   PUBLIC   mppscatter, mppgather
87   PUBLIC   mpp_ini_znl
88   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
89   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d, mpp_lnk_bdy_4d
90   
91   !! * Interfaces
92   !! define generic interface for these routine as they are called sometimes
93   !! with scalar arguments instead of array arguments, which causes problems
94   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
95   INTERFACE mpp_min
96      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
97   END INTERFACE
98   INTERFACE mpp_max
99      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
100   END INTERFACE
101   INTERFACE mpp_sum
102      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
103         &             mppsum_realdd, mppsum_a_realdd
104   END INTERFACE
105   INTERFACE mpp_minloc
106      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
107   END INTERFACE
108   INTERFACE mpp_maxloc
109      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
110   END INTERFACE
111
112   !! ========================= !!
113   !!  MPI  variable definition !!
114   !! ========================= !!
115!$AGRIF_DO_NOT_TREAT
116   INCLUDE 'mpif.h'
117!$AGRIF_END_DO_NOT_TREAT
118
119   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
120
121   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
122
123   INTEGER, PUBLIC ::   mppsize        ! number of process
124   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
125!$AGRIF_DO_NOT_TREAT
126   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
127!$AGRIF_END_DO_NOT_TREAT
128
129   INTEGER :: MPI_SUMDD
130
131   ! variables used for zonal integration
132   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
133   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
134   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
135   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
136   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
137
138   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
139   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
140   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
141   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
142   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
143   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
144   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
145   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
146   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
147
148   ! Type of send : standard, buffered, immediate
149   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
150   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
151   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
152
153   ! Communications summary report
154   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
155   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm calling routines
156   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_dlg                   !: names of delayed global comm calling routines
157   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
158   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
159   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
160   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
161   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
162   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 5000          !: max number of communication record
163   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
164   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
165   INTEGER, PUBLIC                               ::   n_sequence_dlg = 0           !: # of delayed global communications
166   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
167   LOGICAL, PUBLIC                               ::   l_full_nf_update = .TRUE.    !: logical for a full (2lines) update of bc at North fold report
168   INTEGER,                    PARAMETER, PUBLIC ::   nbdelay = 2       !: number of delayed operations
169   !: name (used as id) of allreduce-delayed operations
170   ! Warning: we must use the same character length in an array constructor (at least for gcc compiler)
171   CHARACTER(len=32), DIMENSION(nbdelay), PUBLIC ::   c_delaylist = (/ 'cflice', 'fwb   ' /)
172   !: component name where the allreduce-delayed operation is performed
173   CHARACTER(len=3),  DIMENSION(nbdelay), PUBLIC ::   c_delaycpnt = (/ 'ICE'   , 'OCE' /)
174   TYPE, PUBLIC ::   DELAYARR
175      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
176      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
177   END TYPE DELAYARR
178   TYPE( DELAYARR ), DIMENSION(nbdelay), PUBLIC, SAVE  ::   todelay         !: must have SAVE for default initialization of DELAYARR
179   INTEGER,          DIMENSION(nbdelay), PUBLIC        ::   ndelayid = -1   !: mpi request id of the delayed operations
180
181   ! timing summary report
182   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
183   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
184   
185   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
186
187   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
188   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
189
190   !!----------------------------------------------------------------------
191   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
192   !! $Id$
193   !! Software governed by the CeCILL license (see ./LICENSE)
194   !!----------------------------------------------------------------------
195CONTAINS
196
197   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
198      !!----------------------------------------------------------------------
199      !!                  ***  routine mynode  ***
200      !!
201      !! ** Purpose :   Find processor unit
202      !!----------------------------------------------------------------------
203      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
204      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
205      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
206      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
207      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
208      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
209      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
210      !
211      INTEGER ::   mynode, ierr, code, ji, ii, ios
212      LOGICAL ::   mpi_was_called
213      !
214      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, ln_nnogather
215      !!----------------------------------------------------------------------
216      !
217      ii = 1
218      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
219      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
220      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
221      !
222      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
223      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
224901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
225      !
226      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
227      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
228902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
229      !
230      !                              ! control print
231      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
232      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
233      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
234      !
235      IF( jpni < 1 .OR. jpnj < 1  ) THEN
236         WRITE(ldtxt(ii),*) '      jpni and jpnj will be calculated automatically' ;   ii = ii + 1
237      ELSE
238         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
239         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
240      ENDIF
241
242      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
243
244      CALL mpi_initialized ( mpi_was_called, code )
245      IF( code /= MPI_SUCCESS ) THEN
246         DO ji = 1, SIZE(ldtxt)
247            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
248         END DO
249         WRITE(*, cform_err)
250         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
251         CALL mpi_abort( mpi_comm_world, code, ierr )
252      ENDIF
253
254      IF( mpi_was_called ) THEN
255         !
256         SELECT CASE ( cn_mpi_send )
257         CASE ( 'S' )                ! Standard mpi send (blocking)
258            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
259         CASE ( 'B' )                ! Buffer mpi send (blocking)
260            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
261            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
262         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
263            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
264            l_isend = .TRUE.
265         CASE DEFAULT
266            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
267            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
268            kstop = kstop + 1
269         END SELECT
270         !
271      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
272         WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
273         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
274         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
275         kstop = kstop + 1
276      ELSE
277         SELECT CASE ( cn_mpi_send )
278         CASE ( 'S' )                ! Standard mpi send (blocking)
279            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
280            CALL mpi_init( ierr )
281         CASE ( 'B' )                ! Buffer mpi send (blocking)
282            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
283            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
284         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
285            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
286            l_isend = .TRUE.
287            CALL mpi_init( ierr )
288         CASE DEFAULT
289            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
290            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
291            kstop = kstop + 1
292         END SELECT
293         !
294      ENDIF
295
296      IF( PRESENT(localComm) ) THEN
297         IF( Agrif_Root() ) THEN
298            mpi_comm_oce = localComm
299         ENDIF
300      ELSE
301         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
302         IF( code /= MPI_SUCCESS ) THEN
303            DO ji = 1, SIZE(ldtxt)
304               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
305            END DO
306            WRITE(*, cform_err)
307            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
308            CALL mpi_abort( mpi_comm_world, code, ierr )
309         ENDIF
310      ENDIF
311
312#if defined key_agrif
313      IF( Agrif_Root() ) THEN
314         CALL Agrif_MPI_Init(mpi_comm_oce)
315      ELSE
316         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
317      ENDIF
318#endif
319
320      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
321      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
322      mynode = mpprank
323
324      IF( mynode == 0 ) THEN
325         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
326         WRITE(kumond, nammpp)     
327      ENDIF
328      !
329      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
330      !
331   END FUNCTION mynode
332
333   !!----------------------------------------------------------------------
334   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
335   !!
336   !!   * Argument : dummy argument use in mpp_lnk_... routines
337   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
338   !!                cd_nat :   nature of array grid-points
339   !!                psgn   :   sign used across the north fold boundary
340   !!                kfld   :   optional, number of pt3d arrays
341   !!                cd_mpp :   optional, fill the overlap area only
342   !!                pval   :   optional, background value (used at closed boundaries)
343   !!----------------------------------------------------------------------
344   !
345   !                       !==  2D array and array of 2D pointer  ==!
346   !
347#  define DIM_2d
348#     define ROUTINE_LNK           mpp_lnk_2d
349#     include "mpp_lnk_generic.h90"
350#     undef ROUTINE_LNK
351#     define MULTI
352#     define ROUTINE_LNK           mpp_lnk_2d_ptr
353#     include "mpp_lnk_generic.h90"
354#     undef ROUTINE_LNK
355#     undef MULTI
356#  undef DIM_2d
357   !
358   !                       !==  3D array and array of 3D pointer  ==!
359   !
360#  define DIM_3d
361#     define ROUTINE_LNK           mpp_lnk_3d
362#     include "mpp_lnk_generic.h90"
363#     undef ROUTINE_LNK
364#     define MULTI
365#     define ROUTINE_LNK           mpp_lnk_3d_ptr
366#     include "mpp_lnk_generic.h90"
367#     undef ROUTINE_LNK
368#     undef MULTI
369#  undef DIM_3d
370   !
371   !                       !==  4D array and array of 4D pointer  ==!
372   !
373#  define DIM_4d
374#     define ROUTINE_LNK           mpp_lnk_4d
375#     include "mpp_lnk_generic.h90"
376#     undef ROUTINE_LNK
377#     define MULTI
378#     define ROUTINE_LNK           mpp_lnk_4d_ptr
379#     include "mpp_lnk_generic.h90"
380#     undef ROUTINE_LNK
381#     undef MULTI
382#  undef DIM_4d
383
384   !!----------------------------------------------------------------------
385   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
386   !!
387   !!   * Argument : dummy argument use in mpp_nfd_... routines
388   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
389   !!                cd_nat :   nature of array grid-points
390   !!                psgn   :   sign used across the north fold boundary
391   !!                kfld   :   optional, number of pt3d arrays
392   !!                cd_mpp :   optional, fill the overlap area only
393   !!                pval   :   optional, background value (used at closed boundaries)
394   !!----------------------------------------------------------------------
395   !
396   !                       !==  2D array and array of 2D pointer  ==!
397   !
398#  define DIM_2d
399#     define ROUTINE_NFD           mpp_nfd_2d
400#     include "mpp_nfd_generic.h90"
401#     undef ROUTINE_NFD
402#     define MULTI
403#     define ROUTINE_NFD           mpp_nfd_2d_ptr
404#     include "mpp_nfd_generic.h90"
405#     undef ROUTINE_NFD
406#     undef MULTI
407#  undef DIM_2d
408   !
409   !                       !==  3D array and array of 3D pointer  ==!
410   !
411#  define DIM_3d
412#     define ROUTINE_NFD           mpp_nfd_3d
413#     include "mpp_nfd_generic.h90"
414#     undef ROUTINE_NFD
415#     define MULTI
416#     define ROUTINE_NFD           mpp_nfd_3d_ptr
417#     include "mpp_nfd_generic.h90"
418#     undef ROUTINE_NFD
419#     undef MULTI
420#  undef DIM_3d
421   !
422   !                       !==  4D array and array of 4D pointer  ==!
423   !
424#  define DIM_4d
425#     define ROUTINE_NFD           mpp_nfd_4d
426#     include "mpp_nfd_generic.h90"
427#     undef ROUTINE_NFD
428#     define MULTI
429#     define ROUTINE_NFD           mpp_nfd_4d_ptr
430#     include "mpp_nfd_generic.h90"
431#     undef ROUTINE_NFD
432#     undef MULTI
433#  undef DIM_4d
434
435
436   !!----------------------------------------------------------------------
437   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
438   !!
439   !!   * Argument : dummy argument use in mpp_lnk_... routines
440   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
441   !!                cd_nat :   nature of array grid-points
442   !!                psgn   :   sign used across the north fold boundary
443   !!                kb_bdy :   BDY boundary set
444   !!                kfld   :   optional, number of pt3d arrays
445   !!----------------------------------------------------------------------
446   !
447   !                       !==  2D array and array of 2D pointer  ==!
448   !
449#  define DIM_2d
450#     define ROUTINE_BDY           mpp_lnk_bdy_2d
451#     include "mpp_bdy_generic.h90"
452#     undef ROUTINE_BDY
453#  undef DIM_2d
454   !
455   !                       !==  3D array and array of 3D pointer  ==!
456   !
457#  define DIM_3d
458#     define ROUTINE_BDY           mpp_lnk_bdy_3d
459#     include "mpp_bdy_generic.h90"
460#     undef ROUTINE_BDY
461#  undef DIM_3d
462   !
463   !                       !==  4D array and array of 4D pointer  ==!
464   !
465#  define DIM_4d
466#     define ROUTINE_BDY           mpp_lnk_bdy_4d
467#     include "mpp_bdy_generic.h90"
468#     undef ROUTINE_BDY
469#  undef DIM_4d
470
471   !!----------------------------------------------------------------------
472   !!
473   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
474   
475   
476   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
477   
478   
479   !!----------------------------------------------------------------------
480
481
482
483   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
484      !!----------------------------------------------------------------------
485      !!                  ***  routine mppsend  ***
486      !!
487      !! ** Purpose :   Send messag passing array
488      !!
489      !!----------------------------------------------------------------------
490      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
491      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
492      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
493      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
494      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
495      !!
496      INTEGER ::   iflag
497      !!----------------------------------------------------------------------
498      !
499      SELECT CASE ( cn_mpi_send )
500      CASE ( 'S' )                ! Standard mpi send (blocking)
501         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
502      CASE ( 'B' )                ! Buffer mpi send (blocking)
503         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
504      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
505         ! be carefull, one more argument here : the mpi request identifier..
506         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
507      END SELECT
508      !
509   END SUBROUTINE mppsend
510
511
512   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
513      !!----------------------------------------------------------------------
514      !!                  ***  routine mpprecv  ***
515      !!
516      !! ** Purpose :   Receive messag passing array
517      !!
518      !!----------------------------------------------------------------------
519      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
520      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
521      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
522      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
523      !!
524      INTEGER :: istatus(mpi_status_size)
525      INTEGER :: iflag
526      INTEGER :: use_source
527      !!----------------------------------------------------------------------
528      !
529      ! If a specific process number has been passed to the receive call,
530      ! use that one. Default is to use mpi_any_source
531      use_source = mpi_any_source
532      IF( PRESENT(ksource) )   use_source = ksource
533      !
534      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
535      !
536   END SUBROUTINE mpprecv
537
538
539   SUBROUTINE mppgather( ptab, kp, pio )
540      !!----------------------------------------------------------------------
541      !!                   ***  routine mppgather  ***
542      !!
543      !! ** Purpose :   Transfert between a local subdomain array and a work
544      !!     array which is distributed following the vertical level.
545      !!
546      !!----------------------------------------------------------------------
547      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
548      INTEGER                           , INTENT(in   ) ::   kp     ! record length
549      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
550      !!
551      INTEGER :: itaille, ierror   ! temporary integer
552      !!---------------------------------------------------------------------
553      !
554      itaille = jpi * jpj
555      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
556         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
557      !
558   END SUBROUTINE mppgather
559
560
561   SUBROUTINE mppscatter( pio, kp, ptab )
562      !!----------------------------------------------------------------------
563      !!                  ***  routine mppscatter  ***
564      !!
565      !! ** Purpose :   Transfert between awork array which is distributed
566      !!      following the vertical level and the local subdomain array.
567      !!
568      !!----------------------------------------------------------------------
569      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
570      INTEGER                             ::   kp     ! Tag (not used with MPI
571      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
572      !!
573      INTEGER :: itaille, ierror   ! temporary integer
574      !!---------------------------------------------------------------------
575      !
576      itaille = jpi * jpj
577      !
578      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
579         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
580      !
581   END SUBROUTINE mppscatter
582
583   
584   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
585     !!----------------------------------------------------------------------
586      !!                   ***  routine mpp_delay_sum  ***
587      !!
588      !! ** Purpose :   performed delayed mpp_sum, the result is received on next call
589      !!
590      !!----------------------------------------------------------------------
591      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
592      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
593      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
594      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
595      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
596      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
597      !!
598      INTEGER ::   ji, isz
599      INTEGER ::   idvar
600      INTEGER ::   ierr, ilocalcomm
601      COMPLEX(wp), ALLOCATABLE, DIMENSION(:) ::   ytmp
602      !!----------------------------------------------------------------------
603      ilocalcomm = mpi_comm_oce
604      IF( PRESENT(kcom) )   ilocalcomm = kcom
605
606      isz = SIZE(y_in)
607     
608      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
609
610      idvar = -1
611      DO ji = 1, nbdelay
612         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
613      END DO
614      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_sum : please add a new delayed exchange for '//TRIM(cdname) )
615
616      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
617         !                                       --------------------------
618         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
619            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
620            DEALLOCATE(todelay(idvar)%z1d)
621            ndelayid(idvar) = -1                                      ! do as if we had no restart
622         ELSE
623            ALLOCATE(todelay(idvar)%y1d(isz))
624            todelay(idvar)%y1d(:) = CMPLX(todelay(idvar)%z1d(:), 0., wp)   ! create %y1d, complex variable needed by mpi_sumdd
625         END IF
626      ENDIF
627     
628      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %y1d and %z1d from y_in with blocking allreduce
629         !                                       --------------------------
630         ALLOCATE(todelay(idvar)%z1d(isz), todelay(idvar)%y1d(isz))   ! allocate also %z1d as used for the restart
631         CALL mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ierr )   ! get %y1d
632         todelay(idvar)%z1d(:) = REAL(todelay(idvar)%y1d(:), wp)      ! define %z1d from %y1d
633      ENDIF
634
635      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
636
637      ! send back pout from todelay(idvar)%z1d defined at previous call
638      pout(:) = todelay(idvar)%z1d(:)
639
640      ! send y_in into todelay(idvar)%y1d with a non-blocking communication
641#if defined key_mpi2
642      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
643      CALL  mpi_allreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
644      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
645#else
646      CALL mpi_iallreduce( y_in(:), todelay(idvar)%y1d(:), isz, MPI_DOUBLE_COMPLEX, mpi_sumdd, ilocalcomm, ndelayid(idvar), ierr )
647#endif
648
649   END SUBROUTINE mpp_delay_sum
650
651   
652   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
653      !!----------------------------------------------------------------------
654      !!                   ***  routine mpp_delay_max  ***
655      !!
656      !! ** Purpose :   performed delayed mpp_max, the result is received on next call
657      !!
658      !!----------------------------------------------------------------------
659      CHARACTER(len=*), INTENT(in   )                 ::   cdname  ! name of the calling subroutine
660      CHARACTER(len=*), INTENT(in   )                 ::   cdelay  ! name (used as id) of the delayed operation
661      REAL(wp),         INTENT(in   ), DIMENSION(:)   ::   p_in    !
662      REAL(wp),         INTENT(  out), DIMENSION(:)   ::   pout    !
663      LOGICAL,          INTENT(in   )                 ::   ldlast  ! true if this is the last time we call this routine
664      INTEGER,          INTENT(in   ), OPTIONAL       ::   kcom
665      !!
666      INTEGER ::   ji, isz
667      INTEGER ::   idvar
668      INTEGER ::   ierr, ilocalcomm
669      !!----------------------------------------------------------------------
670      ilocalcomm = mpi_comm_oce
671      IF( PRESENT(kcom) )   ilocalcomm = kcom
672
673      isz = SIZE(p_in)
674
675      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, ld_dlg = .TRUE. )
676
677      idvar = -1
678      DO ji = 1, nbdelay
679         IF( TRIM(cdelay) == TRIM(c_delaylist(ji)) ) idvar = ji
680      END DO
681      IF ( idvar == -1 )   CALL ctl_stop( 'STOP',' mpp_delay_max : please add a new delayed exchange for '//TRIM(cdname) )
682
683      IF ( ndelayid(idvar) == 0 ) THEN         ! first call    with restart: %z1d defined in iom_delay_rst
684         !                                       --------------------------
685         IF ( SIZE(todelay(idvar)%z1d) /= isz ) THEN                  ! Check dimension coherence
686            IF(lwp) WRITE(numout,*) ' WARNING: the nb of delayed variables in restart file is not the model one'
687            DEALLOCATE(todelay(idvar)%z1d)
688            ndelayid(idvar) = -1                                      ! do as if we had no restart
689         END IF
690      ENDIF
691
692      IF( ndelayid(idvar) == -1 ) THEN         ! first call without restart: define %z1d from p_in with a blocking allreduce
693         !                                       --------------------------
694         ALLOCATE(todelay(idvar)%z1d(isz))
695         CALL mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ierr )   ! get %z1d
696      ENDIF
697
698      IF( ndelayid(idvar) > 0 )   CALL mpp_delay_rcv( idvar )         ! make sure %z1d is received
699
700      ! send back pout from todelay(idvar)%z1d defined at previous call
701      pout(:) = todelay(idvar)%z1d(:)
702
703      ! send p_in into todelay(idvar)%z1d with a non-blocking communication
704#if defined key_mpi2
705      IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
706      CALL  mpi_allreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
707      IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
708#else
709      CALL mpi_iallreduce( p_in(:), todelay(idvar)%z1d(:), isz, MPI_DOUBLE_PRECISION, mpi_max, ilocalcomm, ndelayid(idvar), ierr )
710#endif
711
712   END SUBROUTINE mpp_delay_max
713
714   
715   SUBROUTINE mpp_delay_rcv( kid )
716      !!----------------------------------------------------------------------
717      !!                   ***  routine mpp_delay_rcv  ***
718      !!
719      !! ** Purpose :  force barrier for delayed mpp (needed for restart)
720      !!
721      !!----------------------------------------------------------------------
722      INTEGER,INTENT(in   )      ::  kid 
723      INTEGER ::   ierr
724      !!----------------------------------------------------------------------
725      IF( ndelayid(kid) /= -2 ) THEN 
726#if ! defined key_mpi2
727         IF( ln_timing ) CALL tic_tac( .TRUE., ld_global = .TRUE.)
728         CALL mpi_wait( ndelayid(kid), MPI_STATUS_IGNORE, ierr )                        ! make sure todelay(kid) is received
729         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
730#endif
731         IF( ASSOCIATED(todelay(kid)%y1d) )   todelay(kid)%z1d(:) = REAL(todelay(kid)%y1d(:), wp)  ! define %z1d from %y1d
732         ndelayid(kid) = -2   ! add flag to know that mpi_wait was already called on kid
733      ENDIF
734   END SUBROUTINE mpp_delay_rcv
735
736   
737   !!----------------------------------------------------------------------
738   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
739   !!   
740   !!----------------------------------------------------------------------
741   !!
742#  define OPERATION_MAX
743#  define INTEGER_TYPE
744#  define DIM_0d
745#     define ROUTINE_ALLREDUCE           mppmax_int
746#     include "mpp_allreduce_generic.h90"
747#     undef ROUTINE_ALLREDUCE
748#  undef DIM_0d
749#  define DIM_1d
750#     define ROUTINE_ALLREDUCE           mppmax_a_int
751#     include "mpp_allreduce_generic.h90"
752#     undef ROUTINE_ALLREDUCE
753#  undef DIM_1d
754#  undef INTEGER_TYPE
755!
756#  define REAL_TYPE
757#  define DIM_0d
758#     define ROUTINE_ALLREDUCE           mppmax_real
759#     include "mpp_allreduce_generic.h90"
760#     undef ROUTINE_ALLREDUCE
761#  undef DIM_0d
762#  define DIM_1d
763#     define ROUTINE_ALLREDUCE           mppmax_a_real
764#     include "mpp_allreduce_generic.h90"
765#     undef ROUTINE_ALLREDUCE
766#  undef DIM_1d
767#  undef REAL_TYPE
768#  undef OPERATION_MAX
769   !!----------------------------------------------------------------------
770   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
771   !!   
772   !!----------------------------------------------------------------------
773   !!
774#  define OPERATION_MIN
775#  define INTEGER_TYPE
776#  define DIM_0d
777#     define ROUTINE_ALLREDUCE           mppmin_int
778#     include "mpp_allreduce_generic.h90"
779#     undef ROUTINE_ALLREDUCE
780#  undef DIM_0d
781#  define DIM_1d
782#     define ROUTINE_ALLREDUCE           mppmin_a_int
783#     include "mpp_allreduce_generic.h90"
784#     undef ROUTINE_ALLREDUCE
785#  undef DIM_1d
786#  undef INTEGER_TYPE
787!
788#  define REAL_TYPE
789#  define DIM_0d
790#     define ROUTINE_ALLREDUCE           mppmin_real
791#     include "mpp_allreduce_generic.h90"
792#     undef ROUTINE_ALLREDUCE
793#  undef DIM_0d
794#  define DIM_1d
795#     define ROUTINE_ALLREDUCE           mppmin_a_real
796#     include "mpp_allreduce_generic.h90"
797#     undef ROUTINE_ALLREDUCE
798#  undef DIM_1d
799#  undef REAL_TYPE
800#  undef OPERATION_MIN
801
802   !!----------------------------------------------------------------------
803   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
804   !!   
805   !!   Global sum of 1D array or a variable (integer, real or complex)
806   !!----------------------------------------------------------------------
807   !!
808#  define OPERATION_SUM
809#  define INTEGER_TYPE
810#  define DIM_0d
811#     define ROUTINE_ALLREDUCE           mppsum_int
812#     include "mpp_allreduce_generic.h90"
813#     undef ROUTINE_ALLREDUCE
814#  undef DIM_0d
815#  define DIM_1d
816#     define ROUTINE_ALLREDUCE           mppsum_a_int
817#     include "mpp_allreduce_generic.h90"
818#     undef ROUTINE_ALLREDUCE
819#  undef DIM_1d
820#  undef INTEGER_TYPE
821!
822#  define REAL_TYPE
823#  define DIM_0d
824#     define ROUTINE_ALLREDUCE           mppsum_real
825#     include "mpp_allreduce_generic.h90"
826#     undef ROUTINE_ALLREDUCE
827#  undef DIM_0d
828#  define DIM_1d
829#     define ROUTINE_ALLREDUCE           mppsum_a_real
830#     include "mpp_allreduce_generic.h90"
831#     undef ROUTINE_ALLREDUCE
832#  undef DIM_1d
833#  undef REAL_TYPE
834#  undef OPERATION_SUM
835
836#  define OPERATION_SUM_DD
837#  define COMPLEX_TYPE
838#  define DIM_0d
839#     define ROUTINE_ALLREDUCE           mppsum_realdd
840#     include "mpp_allreduce_generic.h90"
841#     undef ROUTINE_ALLREDUCE
842#  undef DIM_0d
843#  define DIM_1d
844#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
845#     include "mpp_allreduce_generic.h90"
846#     undef ROUTINE_ALLREDUCE
847#  undef DIM_1d
848#  undef COMPLEX_TYPE
849#  undef OPERATION_SUM_DD
850
851   !!----------------------------------------------------------------------
852   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
853   !!   
854   !!----------------------------------------------------------------------
855   !!
856#  define OPERATION_MINLOC
857#  define DIM_2d
858#     define ROUTINE_LOC           mpp_minloc2d
859#     include "mpp_loc_generic.h90"
860#     undef ROUTINE_LOC
861#  undef DIM_2d
862#  define DIM_3d
863#     define ROUTINE_LOC           mpp_minloc3d
864#     include "mpp_loc_generic.h90"
865#     undef ROUTINE_LOC
866#  undef DIM_3d
867#  undef OPERATION_MINLOC
868
869#  define OPERATION_MAXLOC
870#  define DIM_2d
871#     define ROUTINE_LOC           mpp_maxloc2d
872#     include "mpp_loc_generic.h90"
873#     undef ROUTINE_LOC
874#  undef DIM_2d
875#  define DIM_3d
876#     define ROUTINE_LOC           mpp_maxloc3d
877#     include "mpp_loc_generic.h90"
878#     undef ROUTINE_LOC
879#  undef DIM_3d
880#  undef OPERATION_MAXLOC
881
882   SUBROUTINE mppsync()
883      !!----------------------------------------------------------------------
884      !!                  ***  routine mppsync  ***
885      !!
886      !! ** Purpose :   Massively parallel processors, synchroneous
887      !!
888      !!-----------------------------------------------------------------------
889      INTEGER :: ierror
890      !!-----------------------------------------------------------------------
891      !
892      CALL mpi_barrier( mpi_comm_oce, ierror )
893      !
894   END SUBROUTINE mppsync
895
896
897   SUBROUTINE mppstop( ldfinal, ld_force_abort ) 
898   
899   USE mod_oasis      ! coupling routines
900
901      !!----------------------------------------------------------------------
902      !!                  ***  routine mppstop  ***
903      !!
904      !! ** purpose :   Stop massively parallel processors method
905      !!
906      !!----------------------------------------------------------------------
907      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
908      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
909      LOGICAL ::   llfinal, ll_force_abort
910      INTEGER ::   info
911      !!----------------------------------------------------------------------
912      llfinal = .FALSE.
913      IF( PRESENT(ldfinal) ) llfinal = ldfinal
914      ll_force_abort = .FALSE.
915      IF( PRESENT(ld_force_abort) ) ll_force_abort = ld_force_abort
916      !
917     
918#if defined key_oasis3
919      ! If we're trying to shut down cleanly then we need to consider the fact
920      ! that this could be part of an MPMD configuration - we don't want to
921      ! leave other components deadlocked.
922
923      CALL oasis_abort(nproc,"mppstop","NEMO initiated abort")
924
925
926#else
927      IF(ll_force_abort) THEN
928         CALL mpi_abort( MPI_COMM_WORLD )
929      ELSE
930         CALL mppsync
931         CALL mpi_finalize( info )
932      ENDIF
933      IF( .NOT. llfinal ) STOP 123
934      !
935#endif
936   END SUBROUTINE mppstop
937
938
939   SUBROUTINE mpp_comm_free( kcom )
940      !!----------------------------------------------------------------------
941      INTEGER, INTENT(in) ::   kcom
942      !!
943      INTEGER :: ierr
944      !!----------------------------------------------------------------------
945      !
946      CALL MPI_COMM_FREE(kcom, ierr)
947      !
948   END SUBROUTINE mpp_comm_free
949
950
951   SUBROUTINE mpp_ini_znl( kumout )
952      !!----------------------------------------------------------------------
953      !!               ***  routine mpp_ini_znl  ***
954      !!
955      !! ** Purpose :   Initialize special communicator for computing zonal sum
956      !!
957      !! ** Method  : - Look for processors in the same row
958      !!              - Put their number in nrank_znl
959      !!              - Create group for the znl processors
960      !!              - Create a communicator for znl processors
961      !!              - Determine if processor should write znl files
962      !!
963      !! ** output
964      !!      ndim_rank_znl = number of processors on the same row
965      !!      ngrp_znl = group ID for the znl processors
966      !!      ncomm_znl = communicator for the ice procs.
967      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
968      !!
969      !!----------------------------------------------------------------------
970      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
971      !
972      INTEGER :: jproc      ! dummy loop integer
973      INTEGER :: ierr, ii   ! local integer
974      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
975      !!----------------------------------------------------------------------
976      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
977      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
978      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
979      !
980      ALLOCATE( kwork(jpnij), STAT=ierr )
981      IF( ierr /= 0 ) THEN
982         WRITE(kumout, cform_err)
983         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
984         CALL mppstop
985      ENDIF
986
987      IF( jpnj == 1 ) THEN
988         ngrp_znl  = ngrp_world
989         ncomm_znl = mpi_comm_oce
990      ELSE
991         !
992         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
993         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
994         !-$$        CALL flush(numout)
995         !
996         ! Count number of processors on the same row
997         ndim_rank_znl = 0
998         DO jproc=1,jpnij
999            IF ( kwork(jproc) == njmpp ) THEN
1000               ndim_rank_znl = ndim_rank_znl + 1
1001            ENDIF
1002         END DO
1003         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
1004         !-$$        CALL flush(numout)
1005         ! Allocate the right size to nrank_znl
1006         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1007         ALLOCATE(nrank_znl(ndim_rank_znl))
1008         ii = 0
1009         nrank_znl (:) = 0
1010         DO jproc=1,jpnij
1011            IF ( kwork(jproc) == njmpp) THEN
1012               ii = ii + 1
1013               nrank_znl(ii) = jproc -1
1014            ENDIF
1015         END DO
1016         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1017         !-$$        CALL flush(numout)
1018
1019         ! Create the opa group
1020         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
1021         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1022         !-$$        CALL flush(numout)
1023
1024         ! Create the znl group from the opa group
1025         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1026         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1027         !-$$        CALL flush(numout)
1028
1029         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1030         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
1031         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1032         !-$$        CALL flush(numout)
1033         !
1034      END IF
1035
1036      ! Determines if processor if the first (starting from i=1) on the row
1037      IF ( jpni == 1 ) THEN
1038         l_znl_root = .TRUE.
1039      ELSE
1040         l_znl_root = .FALSE.
1041         kwork (1) = nimpp
1042         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
1043         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1044      END IF
1045
1046      DEALLOCATE(kwork)
1047
1048   END SUBROUTINE mpp_ini_znl
1049
1050
1051   SUBROUTINE mpp_ini_north
1052      !!----------------------------------------------------------------------
1053      !!               ***  routine mpp_ini_north  ***
1054      !!
1055      !! ** Purpose :   Initialize special communicator for north folding
1056      !!      condition together with global variables needed in the mpp folding
1057      !!
1058      !! ** Method  : - Look for northern processors
1059      !!              - Put their number in nrank_north
1060      !!              - Create groups for the world processors and the north processors
1061      !!              - Create a communicator for northern processors
1062      !!
1063      !! ** output
1064      !!      njmppmax = njmpp for northern procs
1065      !!      ndim_rank_north = number of processors in the northern line
1066      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1067      !!      ngrp_world = group ID for the world processors
1068      !!      ngrp_north = group ID for the northern processors
1069      !!      ncomm_north = communicator for the northern procs.
1070      !!      north_root = number (in the world) of proc 0 in the northern comm.
1071      !!
1072      !!----------------------------------------------------------------------
1073      INTEGER ::   ierr
1074      INTEGER ::   jjproc
1075      INTEGER ::   ii, ji
1076      !!----------------------------------------------------------------------
1077      !
1078      njmppmax = MAXVAL( njmppt )
1079      !
1080      ! Look for how many procs on the northern boundary
1081      ndim_rank_north = 0
1082      DO jjproc = 1, jpnij
1083         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1084      END DO
1085      !
1086      ! Allocate the right size to nrank_north
1087      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1088      ALLOCATE( nrank_north(ndim_rank_north) )
1089
1090      ! Fill the nrank_north array with proc. number of northern procs.
1091      ! Note : the rank start at 0 in MPI
1092      ii = 0
1093      DO ji = 1, jpnij
1094         IF ( njmppt(ji) == njmppmax   ) THEN
1095            ii=ii+1
1096            nrank_north(ii)=ji-1
1097         END IF
1098      END DO
1099      !
1100      ! create the world group
1101      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
1102      !
1103      ! Create the North group from the world group
1104      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1105      !
1106      ! Create the North communicator , ie the pool of procs in the north group
1107      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
1108      !
1109   END SUBROUTINE mpp_ini_north
1110
1111
1112   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
1113      !!---------------------------------------------------------------------
1114      !!                   ***  routine mpp_init.opa  ***
1115      !!
1116      !! ** Purpose :: export and attach a MPI buffer for bsend
1117      !!
1118      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
1119      !!            but classical mpi_init
1120      !!
1121      !! History :: 01/11 :: IDRIS initial version for IBM only
1122      !!            08/04 :: R. Benshila, generalisation
1123      !!---------------------------------------------------------------------
1124      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
1125      INTEGER                      , INTENT(inout) ::   ksft
1126      INTEGER                      , INTENT(  out) ::   code
1127      INTEGER                                      ::   ierr, ji
1128      LOGICAL                                      ::   mpi_was_called
1129      !!---------------------------------------------------------------------
1130      !
1131      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
1132      IF ( code /= MPI_SUCCESS ) THEN
1133         DO ji = 1, SIZE(ldtxt)
1134            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1135         END DO
1136         WRITE(*, cform_err)
1137         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
1138         CALL mpi_abort( mpi_comm_world, code, ierr )
1139      ENDIF
1140      !
1141      IF( .NOT. mpi_was_called ) THEN
1142         CALL mpi_init( code )
1143         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
1144         IF ( code /= MPI_SUCCESS ) THEN
1145            DO ji = 1, SIZE(ldtxt)
1146               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1147            END DO
1148            WRITE(*, cform_err)
1149            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
1150            CALL mpi_abort( mpi_comm_world, code, ierr )
1151         ENDIF
1152      ENDIF
1153      !
1154      IF( nn_buffer > 0 ) THEN
1155         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
1156         ! Buffer allocation and attachment
1157         ALLOCATE( tampon(nn_buffer), stat = ierr )
1158         IF( ierr /= 0 ) THEN
1159            DO ji = 1, SIZE(ldtxt)
1160               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1161            END DO
1162            WRITE(*, cform_err)
1163            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1164            CALL mpi_abort( mpi_comm_world, code, ierr )
1165         END IF
1166         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1167      ENDIF
1168      !
1169   END SUBROUTINE mpi_init_oce
1170
1171
1172   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1173      !!---------------------------------------------------------------------
1174      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1175      !!
1176      !!   Modification of original codes written by David H. Bailey
1177      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1178      !!---------------------------------------------------------------------
1179      INTEGER                     , INTENT(in)    ::   ilen, itype
1180      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1181      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1182      !
1183      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1184      INTEGER  :: ji, ztmp           ! local scalar
1185      !!---------------------------------------------------------------------
1186      !
1187      ztmp = itype   ! avoid compilation warning
1188      !
1189      DO ji=1,ilen
1190      ! Compute ydda + yddb using Knuth's trick.
1191         zt1  = real(ydda(ji)) + real(yddb(ji))
1192         zerr = zt1 - real(ydda(ji))
1193         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1194                + aimag(ydda(ji)) + aimag(yddb(ji))
1195
1196         ! The result is zt1 + zt2, after normalization.
1197         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1198      END DO
1199      !
1200   END SUBROUTINE DDPDD_MPI
1201
1202
1203   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
1204      !!---------------------------------------------------------------------
1205      !!                   ***  routine mpp_lbc_north_icb  ***
1206      !!
1207      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1208      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1209      !!              array with outer extra halo
1210      !!
1211      !! ** Method  :   North fold condition and mpp with more than one proc
1212      !!              in i-direction require a specific treatment. We gather
1213      !!              the 4+kextj northern lines of the global domain on 1
1214      !!              processor and apply lbc north-fold on this sub array.
1215      !!              Then we scatter the north fold array back to the processors.
1216      !!              This routine accounts for an extra halo with icebergs
1217      !!              and assumes ghost rows and columns have been suppressed.
1218      !!
1219      !!----------------------------------------------------------------------
1220      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1221      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1222      !                                                     !   = T ,  U , V , F or W -points
1223      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1224      !!                                                    ! north fold, =  1. otherwise
1225      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
1226      !
1227      INTEGER ::   ji, jj, jr
1228      INTEGER ::   ierr, itaille, ildi, ilei, iilb
1229      INTEGER ::   ipj, ij, iproc
1230      !
1231      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1232      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1233      !!----------------------------------------------------------------------
1234      !
1235      ipj=4
1236      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1237     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1238     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
1239      !
1240      ztab_e(:,:)      = 0._wp
1241      znorthloc_e(:,:) = 0._wp
1242      !
1243      ij = 1 - kextj
1244      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1245      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1246         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
1247         ij = ij + 1
1248      END DO
1249      !
1250      itaille = jpimax * ( ipj + 2*kextj )
1251      !
1252      IF( ln_timing ) CALL tic_tac(.TRUE.)
1253      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1254         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1255         &                ncomm_north, ierr )
1256      !
1257      IF( ln_timing ) CALL tic_tac(.FALSE.)
1258      !
1259      DO jr = 1, ndim_rank_north            ! recover the global north array
1260         iproc = nrank_north(jr) + 1
1261         ildi = nldit (iproc)
1262         ilei = nleit (iproc)
1263         iilb = nimppt(iproc)
1264         DO jj = 1-kextj, ipj+kextj
1265            DO ji = ildi, ilei
1266               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1267            END DO
1268         END DO
1269      END DO
1270
1271      ! 2. North-Fold boundary conditions
1272      ! ----------------------------------
1273      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
1274
1275      ij = 1 - kextj
1276      !! Scatter back to pt2d
1277      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1278         DO ji= 1, jpi
1279            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1280         END DO
1281         ij  = ij +1
1282      END DO
1283      !
1284      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1285      !
1286   END SUBROUTINE mpp_lbc_north_icb
1287
1288
1289   SUBROUTINE mpp_lnk_2d_icb( cdname, pt2d, cd_type, psgn, kexti, kextj )
1290      !!----------------------------------------------------------------------
1291      !!                  ***  routine mpp_lnk_2d_icb  ***
1292      !!
1293      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1294      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1295      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
1296      !!
1297      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1298      !!      between processors following neighboring subdomains.
1299      !!            domain parameters
1300      !!                    jpi    : first dimension of the local subdomain
1301      !!                    jpj    : second dimension of the local subdomain
1302      !!                    kexti  : number of columns for extra outer halo
1303      !!                    kextj  : number of rows for extra outer halo
1304      !!                    nbondi : mark for "east-west local boundary"
1305      !!                    nbondj : mark for "north-south local boundary"
1306      !!                    noea   : number for local neighboring processors
1307      !!                    nowe   : number for local neighboring processors
1308      !!                    noso   : number for local neighboring processors
1309      !!                    nono   : number for local neighboring processors
1310      !!----------------------------------------------------------------------
1311      CHARACTER(len=*)                                        , INTENT(in   ) ::   cdname      ! name of the calling subroutine
1312      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1313      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1314      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1315      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1316      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1317      !
1318      INTEGER  ::   jl   ! dummy loop indices
1319      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1320      INTEGER  ::   ipreci, iprecj             !   -       -
1321      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1322      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1323      !!
1324      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1325      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
1326      !!----------------------------------------------------------------------
1327
1328      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1329      iprecj = nn_hls + kextj
1330
1331      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, 1, 1, 1, ld_lbc = .TRUE. )
1332
1333      ! 1. standard boundary treatment
1334      ! ------------------------------
1335      ! Order matters Here !!!!
1336      !
1337      !                                      ! East-West boundaries
1338      !                                           !* Cyclic east-west
1339      IF( l_Iperio ) THEN
1340         pt2d(1-kexti:     1   ,:) = pt2d(jpim1-kexti: jpim1 ,:)       ! east
1341         pt2d(  jpi  :jpi+kexti,:) = pt2d(     2     :2+kexti,:)       ! west
1342         !
1343      ELSE                                        !* closed
1344         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls   ,:) = 0._wp    ! east except at F-point
1345                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! west
1346      ENDIF
1347      !                                      ! North-South boundaries
1348      IF( l_Jperio ) THEN                         !* cyclic (only with no mpp j-split)
1349         pt2d(:,1-kextj:     1   ) = pt2d(:,jpjm1-kextj:  jpjm1)       ! north
1350         pt2d(:,  jpj  :jpj+kextj) = pt2d(:,     2     :2+kextj)       ! south
1351      ELSE                                        !* closed
1352         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-kextj   :nn_hls   ) = 0._wp    ! north except at F-point
1353                                      pt2d(:,jpj-nn_hls+1:jpj+kextj) = 0._wp    ! south
1354      ENDIF
1355      !
1356
1357      ! north fold treatment
1358      ! -----------------------
1359      IF( npolj /= 0 ) THEN
1360         !
1361         SELECT CASE ( jpni )
1362                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1363                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1364         END SELECT
1365         !
1366      ENDIF
1367
1368      ! 2. East and west directions exchange
1369      ! ------------------------------------
1370      ! we play with the neigbours AND the row number because of the periodicity
1371      !
1372      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1373      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
1374         iihom = jpi-nreci-kexti
1375         DO jl = 1, ipreci
1376            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
1377            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1378         END DO
1379      END SELECT
1380      !
1381      !                           ! Migrations
1382      imigr = ipreci * ( jpj + 2*kextj )
1383      !
1384      IF( ln_timing ) CALL tic_tac(.TRUE.)
1385      !
1386      SELECT CASE ( nbondi )
1387      CASE ( -1 )
1388         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1389         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1390         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1391      CASE ( 0 )
1392         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1393         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1394         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1395         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1396         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1397         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1398      CASE ( 1 )
1399         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1400         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1401         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1402      END SELECT
1403      !
1404      IF( ln_timing ) CALL tic_tac(.FALSE.)
1405      !
1406      !                           ! Write Dirichlet lateral conditions
1407      iihom = jpi - nn_hls
1408      !
1409      SELECT CASE ( nbondi )
1410      CASE ( -1 )
1411         DO jl = 1, ipreci
1412            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1413         END DO
1414      CASE ( 0 )
1415         DO jl = 1, ipreci
1416            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1417            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1418         END DO
1419      CASE ( 1 )
1420         DO jl = 1, ipreci
1421            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1422         END DO
1423      END SELECT
1424
1425
1426      ! 3. North and south directions
1427      ! -----------------------------
1428      ! always closed : we play only with the neigbours
1429      !
1430      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
1431         ijhom = jpj-nrecj-kextj
1432         DO jl = 1, iprecj
1433            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
1434            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
1435         END DO
1436      ENDIF
1437      !
1438      !                           ! Migrations
1439      imigr = iprecj * ( jpi + 2*kexti )
1440      !
1441      IF( ln_timing ) CALL tic_tac(.TRUE.)
1442      !
1443      SELECT CASE ( nbondj )
1444      CASE ( -1 )
1445         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1446         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1447         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1448      CASE ( 0 )
1449         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1450         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1451         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1452         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1453         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1454         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1455      CASE ( 1 )
1456         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1457         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1458         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1459      END SELECT
1460      !
1461      IF( ln_timing ) CALL tic_tac(.FALSE.)
1462      !
1463      !                           ! Write Dirichlet lateral conditions
1464      ijhom = jpj - nn_hls
1465      !
1466      SELECT CASE ( nbondj )
1467      CASE ( -1 )
1468         DO jl = 1, iprecj
1469            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1470         END DO
1471      CASE ( 0 )
1472         DO jl = 1, iprecj
1473            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1474            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1475         END DO
1476      CASE ( 1 )
1477         DO jl = 1, iprecj
1478            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1479         END DO
1480      END SELECT
1481      !
1482   END SUBROUTINE mpp_lnk_2d_icb
1483
1484
1485   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb, ld_dlg )
1486      !!----------------------------------------------------------------------
1487      !!                  ***  routine mpp_report  ***
1488      !!
1489      !! ** Purpose :   report use of mpp routines per time-setp
1490      !!
1491      !!----------------------------------------------------------------------
1492      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1493      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1494      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb, ld_dlg
1495      !!
1496      CHARACTER(len=128)                        ::   ccountname  ! name of a subroutine to count communications
1497      LOGICAL ::   ll_lbc, ll_glb, ll_dlg
1498      INTEGER ::    ji,  jj,  jk,  jh, jf, jcount   ! dummy loop indices
1499      !!----------------------------------------------------------------------
1500      !
1501      ll_lbc = .FALSE.
1502      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1503      ll_glb = .FALSE.
1504      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1505      ll_dlg = .FALSE.
1506      IF( PRESENT(ld_dlg) ) ll_dlg = ld_dlg
1507      !
1508      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1509      IF( ncom_dttrc /= 1 )   CALL ctl_stop( 'STOP', 'mpp_report, ncom_dttrc /= 1 not coded...' ) 
1510      ncom_freq = ncom_fsbc
1511      !
1512      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1513         IF( ll_lbc ) THEN
1514            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1515            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1516            n_sequence_lbc = n_sequence_lbc + 1
1517            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1518            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1519            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1520            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1521         ENDIF
1522         IF( ll_glb ) THEN
1523            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1524            n_sequence_glb = n_sequence_glb + 1
1525            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1526            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1527         ENDIF
1528         IF( ll_dlg ) THEN
1529            IF( .NOT. ALLOCATED(crname_dlg) ) ALLOCATE( crname_dlg(ncom_rec_max) )
1530            n_sequence_dlg = n_sequence_dlg + 1
1531            IF( n_sequence_dlg > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lib_mpp, increase ncom_rec_max' )   ! deadlock
1532            crname_dlg(n_sequence_dlg) = cdname     ! keep the name of the calling routine
1533         ENDIF
1534      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1535         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1536         WRITE(numcom,*) ' '
1537         WRITE(numcom,*) ' ------------------------------------------------------------'
1538         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1539         WRITE(numcom,*) ' ------------------------------------------------------------'
1540         WRITE(numcom,*) ' '
1541         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1542         jj = 0; jk = 0; jf = 0; jh = 0
1543         DO ji = 1, n_sequence_lbc
1544            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1545            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1546            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1547            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1548         END DO
1549         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1550         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1551         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1552         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1553         WRITE(numcom,*) ' '
1554         WRITE(numcom,*) ' lbc_lnk called'
1555         DO ji = 1, n_sequence_lbc - 1
1556            IF ( crname_lbc(ji) /= 'already counted' ) THEN
1557               ccountname = crname_lbc(ji)
1558               crname_lbc(ji) = 'already counted'
1559               jcount = 1
1560               DO jj = ji + 1, n_sequence_lbc
1561                  IF ( ccountname ==  crname_lbc(jj) ) THEN
1562                     jcount = jcount + 1
1563                     crname_lbc(jj) = 'already counted'
1564                  END IF
1565               END DO
1566               WRITE(numcom,'(A, I4, A, A)') ' - ', jcount,' times by subroutine ', TRIM(ccountname)
1567            END IF
1568         END DO
1569         IF ( crname_lbc(n_sequence_lbc) /= 'already counted' ) THEN
1570            WRITE(numcom,'(A, I4, A, A)') ' - ', 1,' times by subroutine ', TRIM(crname_lbc(ncom_rec_max))
1571         END IF
1572         WRITE(numcom,*) ' '
1573         IF ( n_sequence_glb > 0 ) THEN
1574            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1575            jj = 1
1576            DO ji = 2, n_sequence_glb
1577               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1578                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1579                  jj = 0
1580               END IF
1581               jj = jj + 1 
1582            END DO
1583            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1584            DEALLOCATE(crname_glb)
1585         ELSE
1586            WRITE(numcom,*) ' No MPI global communication '
1587         ENDIF
1588         WRITE(numcom,*) ' '
1589         IF ( n_sequence_dlg > 0 ) THEN
1590            WRITE(numcom,'(A,I4)') ' Delayed global communications : ', n_sequence_dlg
1591            jj = 1
1592            DO ji = 2, n_sequence_dlg
1593               IF( crname_dlg(ji-1) /= crname_dlg(ji) ) THEN
1594                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(ji-1))
1595                  jj = 0
1596               END IF
1597               jj = jj + 1 
1598            END DO
1599            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_dlg(n_sequence_dlg))
1600            DEALLOCATE(crname_dlg)
1601         ELSE
1602            WRITE(numcom,*) ' No MPI delayed global communication '
1603         ENDIF
1604         WRITE(numcom,*) ' '
1605         WRITE(numcom,*) ' -----------------------------------------------'
1606         WRITE(numcom,*) ' '
1607         DEALLOCATE(ncomm_sequence)
1608         DEALLOCATE(crname_lbc)
1609      ENDIF
1610   END SUBROUTINE mpp_report
1611
1612   
1613   SUBROUTINE tic_tac (ld_tic, ld_global)
1614
1615    LOGICAL,           INTENT(IN) :: ld_tic
1616    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1617    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1618    REAL(wp),               SAVE :: tic_ct = 0._wp
1619    INTEGER :: ii
1620
1621    IF( ncom_stp <= nit000 ) RETURN
1622    IF( ncom_stp == nitend ) RETURN
1623    ii = 1
1624    IF( PRESENT( ld_global ) ) THEN
1625       IF( ld_global ) ii = 2
1626    END IF
1627   
1628    IF ( ld_tic ) THEN
1629       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1630       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1631    ELSE
1632       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1633       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1634    ENDIF
1635   
1636   END SUBROUTINE tic_tac
1637
1638   
1639#else
1640   !!----------------------------------------------------------------------
1641   !!   Default case:            Dummy module        share memory computing
1642   !!----------------------------------------------------------------------
1643   USE in_out_manager
1644
1645   INTERFACE mpp_sum
1646      MODULE PROCEDURE mppsum_int, mppsum_a_int, mppsum_real, mppsum_a_real, mppsum_realdd, mppsum_a_realdd
1647   END INTERFACE
1648   INTERFACE mpp_max
1649      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
1650   END INTERFACE
1651   INTERFACE mpp_min
1652      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1653   END INTERFACE
1654   INTERFACE mpp_minloc
1655      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1656   END INTERFACE
1657   INTERFACE mpp_maxloc
1658      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1659   END INTERFACE
1660
1661   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
1662   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
1663   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
1664
1665   INTEGER, PARAMETER, PUBLIC               ::   nbdelay = 0   ! make sure we don't enter loops: DO ji = 1, nbdelay
1666   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaylist = 'empty'
1667   CHARACTER(len=32), DIMENSION(1), PUBLIC  ::   c_delaycpnt = 'empty'
1668   LOGICAL, PUBLIC                          ::   l_full_nf_update = .TRUE.
1669   TYPE ::   DELAYARR
1670      REAL(   wp), POINTER, DIMENSION(:) ::  z1d => NULL()
1671      COMPLEX(wp), POINTER, DIMENSION(:) ::  y1d => NULL()
1672   END TYPE DELAYARR
1673   TYPE( DELAYARR ), DIMENSION(1), PUBLIC  ::   todelay             
1674   INTEGER,  PUBLIC, DIMENSION(1)           ::   ndelayid = -1
1675   !!----------------------------------------------------------------------
1676CONTAINS
1677
1678   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1679      INTEGER, INTENT(in) ::   kumout
1680      lib_mpp_alloc = 0
1681   END FUNCTION lib_mpp_alloc
1682
1683   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
1684      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
1685      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
1686      CHARACTER(len=*) ::   ldname
1687      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
1688      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
1689      function_value = 0
1690      IF( .FALSE. )   ldtxt(:) = 'never done'
1691      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
1692   END FUNCTION mynode
1693
1694   SUBROUTINE mppsync                       ! Dummy routine
1695   END SUBROUTINE mppsync
1696
1697   !!----------------------------------------------------------------------
1698   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
1699   !!   
1700   !!----------------------------------------------------------------------
1701   !!
1702#  define OPERATION_MAX
1703#  define INTEGER_TYPE
1704#  define DIM_0d
1705#     define ROUTINE_ALLREDUCE           mppmax_int
1706#     include "mpp_allreduce_generic.h90"
1707#     undef ROUTINE_ALLREDUCE
1708#  undef DIM_0d
1709#  define DIM_1d
1710#     define ROUTINE_ALLREDUCE           mppmax_a_int
1711#     include "mpp_allreduce_generic.h90"
1712#     undef ROUTINE_ALLREDUCE
1713#  undef DIM_1d
1714#  undef INTEGER_TYPE
1715!
1716#  define REAL_TYPE
1717#  define DIM_0d
1718#     define ROUTINE_ALLREDUCE           mppmax_real
1719#     include "mpp_allreduce_generic.h90"
1720#     undef ROUTINE_ALLREDUCE
1721#  undef DIM_0d
1722#  define DIM_1d
1723#     define ROUTINE_ALLREDUCE           mppmax_a_real
1724#     include "mpp_allreduce_generic.h90"
1725#     undef ROUTINE_ALLREDUCE
1726#  undef DIM_1d
1727#  undef REAL_TYPE
1728#  undef OPERATION_MAX
1729   !!----------------------------------------------------------------------
1730   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
1731   !!   
1732   !!----------------------------------------------------------------------
1733   !!
1734#  define OPERATION_MIN
1735#  define INTEGER_TYPE
1736#  define DIM_0d
1737#     define ROUTINE_ALLREDUCE           mppmin_int
1738#     include "mpp_allreduce_generic.h90"
1739#     undef ROUTINE_ALLREDUCE
1740#  undef DIM_0d
1741#  define DIM_1d
1742#     define ROUTINE_ALLREDUCE           mppmin_a_int
1743#     include "mpp_allreduce_generic.h90"
1744#     undef ROUTINE_ALLREDUCE
1745#  undef DIM_1d
1746#  undef INTEGER_TYPE
1747!
1748#  define REAL_TYPE
1749#  define DIM_0d
1750#     define ROUTINE_ALLREDUCE           mppmin_real
1751#     include "mpp_allreduce_generic.h90"
1752#     undef ROUTINE_ALLREDUCE
1753#  undef DIM_0d
1754#  define DIM_1d
1755#     define ROUTINE_ALLREDUCE           mppmin_a_real
1756#     include "mpp_allreduce_generic.h90"
1757#     undef ROUTINE_ALLREDUCE
1758#  undef DIM_1d
1759#  undef REAL_TYPE
1760#  undef OPERATION_MIN
1761
1762   !!----------------------------------------------------------------------
1763   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
1764   !!   
1765   !!   Global sum of 1D array or a variable (integer, real or complex)
1766   !!----------------------------------------------------------------------
1767   !!
1768#  define OPERATION_SUM
1769#  define INTEGER_TYPE
1770#  define DIM_0d
1771#     define ROUTINE_ALLREDUCE           mppsum_int
1772#     include "mpp_allreduce_generic.h90"
1773#     undef ROUTINE_ALLREDUCE
1774#  undef DIM_0d
1775#  define DIM_1d
1776#     define ROUTINE_ALLREDUCE           mppsum_a_int
1777#     include "mpp_allreduce_generic.h90"
1778#     undef ROUTINE_ALLREDUCE
1779#  undef DIM_1d
1780#  undef INTEGER_TYPE
1781!
1782#  define REAL_TYPE
1783#  define DIM_0d
1784#     define ROUTINE_ALLREDUCE           mppsum_real
1785#     include "mpp_allreduce_generic.h90"
1786#     undef ROUTINE_ALLREDUCE
1787#  undef DIM_0d
1788#  define DIM_1d
1789#     define ROUTINE_ALLREDUCE           mppsum_a_real
1790#     include "mpp_allreduce_generic.h90"
1791#     undef ROUTINE_ALLREDUCE
1792#  undef DIM_1d
1793#  undef REAL_TYPE
1794#  undef OPERATION_SUM
1795
1796#  define OPERATION_SUM_DD
1797#  define COMPLEX_TYPE
1798#  define DIM_0d
1799#     define ROUTINE_ALLREDUCE           mppsum_realdd
1800#     include "mpp_allreduce_generic.h90"
1801#     undef ROUTINE_ALLREDUCE
1802#  undef DIM_0d
1803#  define DIM_1d
1804#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
1805#     include "mpp_allreduce_generic.h90"
1806#     undef ROUTINE_ALLREDUCE
1807#  undef DIM_1d
1808#  undef COMPLEX_TYPE
1809#  undef OPERATION_SUM_DD
1810
1811   !!----------------------------------------------------------------------
1812   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
1813   !!   
1814   !!----------------------------------------------------------------------
1815   !!
1816#  define OPERATION_MINLOC
1817#  define DIM_2d
1818#     define ROUTINE_LOC           mpp_minloc2d
1819#     include "mpp_loc_generic.h90"
1820#     undef ROUTINE_LOC
1821#  undef DIM_2d
1822#  define DIM_3d
1823#     define ROUTINE_LOC           mpp_minloc3d
1824#     include "mpp_loc_generic.h90"
1825#     undef ROUTINE_LOC
1826#  undef DIM_3d
1827#  undef OPERATION_MINLOC
1828
1829#  define OPERATION_MAXLOC
1830#  define DIM_2d
1831#     define ROUTINE_LOC           mpp_maxloc2d
1832#     include "mpp_loc_generic.h90"
1833#     undef ROUTINE_LOC
1834#  undef DIM_2d
1835#  define DIM_3d
1836#     define ROUTINE_LOC           mpp_maxloc3d
1837#     include "mpp_loc_generic.h90"
1838#     undef ROUTINE_LOC
1839#  undef DIM_3d
1840#  undef OPERATION_MAXLOC
1841
1842   SUBROUTINE mpp_delay_sum( cdname, cdelay, y_in, pout, ldlast, kcom )
1843      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1844      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1845      COMPLEX(wp),      INTENT(in   ), DIMENSION(:) ::   y_in
1846      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1847      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1848      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1849      !
1850      pout(:) = REAL(y_in(:), wp)
1851   END SUBROUTINE mpp_delay_sum
1852
1853   SUBROUTINE mpp_delay_max( cdname, cdelay, p_in, pout, ldlast, kcom )
1854      CHARACTER(len=*), INTENT(in   )               ::   cdname  ! name of the calling subroutine
1855      CHARACTER(len=*), INTENT(in   )               ::   cdelay  ! name (used as id) of the delayed operation
1856      REAL(wp),         INTENT(in   ), DIMENSION(:) ::   p_in
1857      REAL(wp),         INTENT(  out), DIMENSION(:) ::   pout
1858      LOGICAL,          INTENT(in   )               ::   ldlast  ! true if this is the last time we call this routine
1859      INTEGER,          INTENT(in   ), OPTIONAL     ::   kcom
1860      !
1861      pout(:) = p_in(:)
1862   END SUBROUTINE mpp_delay_max
1863
1864   SUBROUTINE mpp_delay_rcv( kid )
1865      INTEGER,INTENT(in   )      ::  kid 
1866      WRITE(*,*) 'mpp_delay_rcv: You should not have seen this print! error?', kid
1867   END SUBROUTINE mpp_delay_rcv
1868   
1869   SUBROUTINE mppstop( ldfinal, ld_force_abort )
1870      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
1871      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
1872      STOP      ! non MPP case, just stop the run
1873   END SUBROUTINE mppstop
1874
1875   SUBROUTINE mpp_ini_znl( knum )
1876      INTEGER :: knum
1877      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
1878   END SUBROUTINE mpp_ini_znl
1879
1880   SUBROUTINE mpp_comm_free( kcom )
1881      INTEGER :: kcom
1882      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
1883   END SUBROUTINE mpp_comm_free
1884   
1885#endif
1886
1887   !!----------------------------------------------------------------------
1888   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
1889   !!----------------------------------------------------------------------
1890
1891   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1892      &                 cd6, cd7, cd8, cd9, cd10 )
1893      !!----------------------------------------------------------------------
1894      !!                  ***  ROUTINE  stop_opa  ***
1895      !!
1896      !! ** Purpose :   print in ocean.outpput file a error message and
1897      !!                increment the error number (nstop) by one.
1898      !!----------------------------------------------------------------------
1899      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1900      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1901      !!----------------------------------------------------------------------
1902      !
1903      nstop = nstop + 1
1904
1905      ! force to open ocean.output file
1906      IF( numout == 6 ) CALL ctl_opn( numout, 'ocean.output', 'APPEND', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. )
1907       
1908      WRITE(numout,cform_err)
1909      IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1910      IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1911      IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1912      IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1913      IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1914      IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1915      IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1916      IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1917      IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1918      IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1919
1920                               CALL FLUSH(numout    )
1921      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1922      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1923      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1924      !
1925      IF( cd1 == 'STOP' ) THEN
1926         WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1927         CALL mppstop(ld_force_abort = .true.)
1928      ENDIF
1929      !
1930   END SUBROUTINE ctl_stop
1931
1932
1933   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1934      &                 cd6, cd7, cd8, cd9, cd10 )
1935      !!----------------------------------------------------------------------
1936      !!                  ***  ROUTINE  stop_warn  ***
1937      !!
1938      !! ** Purpose :   print in ocean.outpput file a error message and
1939      !!                increment the warning number (nwarn) by one.
1940      !!----------------------------------------------------------------------
1941      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1942      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1943      !!----------------------------------------------------------------------
1944      !
1945      nwarn = nwarn + 1
1946      IF(lwp) THEN
1947         WRITE(numout,cform_war)
1948         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1949         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1950         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1951         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1952         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1953         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1954         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1955         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1956         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1957         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
1958      ENDIF
1959      CALL FLUSH(numout)
1960      !
1961   END SUBROUTINE ctl_warn
1962
1963
1964   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1965      !!----------------------------------------------------------------------
1966      !!                  ***  ROUTINE ctl_opn  ***
1967      !!
1968      !! ** Purpose :   Open file and check if required file is available.
1969      !!
1970      !! ** Method  :   Fortan open
1971      !!----------------------------------------------------------------------
1972      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1973      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1974      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1975      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1976      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1977      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1978      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1979      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1980      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1981      !
1982      CHARACTER(len=80) ::   clfile
1983      INTEGER           ::   iost
1984      !!----------------------------------------------------------------------
1985      !
1986      ! adapt filename
1987      ! ----------------
1988      clfile = TRIM(cdfile)
1989      IF( PRESENT( karea ) ) THEN
1990         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1991      ENDIF
1992#if defined key_agrif
1993      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1994      knum=Agrif_Get_Unit()
1995#else
1996      knum=get_unit()
1997#endif
1998      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1999      !
2000      iost=0
2001      IF( cdacce(1:6) == 'DIRECT' )  THEN         ! cdacce has always more than 6 characters
2002         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
2003      ELSE IF( TRIM(cdstat) == 'APPEND' )  THEN   ! cdstat can have less than 6 characters
2004         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
2005      ELSE
2006         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
2007      ENDIF
2008      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
2009         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
2010      IF( iost == 0 ) THEN
2011         IF(ldwp) THEN
2012            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
2013            WRITE(kout,*) '     unit   = ', knum
2014            WRITE(kout,*) '     status = ', cdstat
2015            WRITE(kout,*) '     form   = ', cdform
2016            WRITE(kout,*) '     access = ', cdacce
2017            WRITE(kout,*)
2018         ENDIF
2019      ENDIF
2020100   CONTINUE
2021      IF( iost /= 0 ) THEN
2022         IF(ldwp) THEN
2023            WRITE(kout,*)
2024            WRITE(kout,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
2025            WRITE(kout,*) ' =======   ===  '
2026            WRITE(kout,*) '           unit   = ', knum
2027            WRITE(kout,*) '           status = ', cdstat
2028            WRITE(kout,*) '           form   = ', cdform
2029            WRITE(kout,*) '           access = ', cdacce
2030            WRITE(kout,*) '           iostat = ', iost
2031            WRITE(kout,*) '           we stop. verify the file '
2032            WRITE(kout,*)
2033         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
2034            WRITE(*,*)
2035            WRITE(*,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
2036            WRITE(*,*) ' =======   ===  '
2037            WRITE(*,*) '           unit   = ', knum
2038            WRITE(*,*) '           status = ', cdstat
2039            WRITE(*,*) '           form   = ', cdform
2040            WRITE(*,*) '           access = ', cdacce
2041            WRITE(*,*) '           iostat = ', iost
2042            WRITE(*,*) '           we stop. verify the file '
2043            WRITE(*,*)
2044         ENDIF
2045         CALL FLUSH( kout ) 
2046         CALL ctl_stop ('STOP', 'NEMO abort ctl_opn bad opening')
2047      ENDIF
2048      !
2049   END SUBROUTINE ctl_opn
2050
2051
2052   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
2053      !!----------------------------------------------------------------------
2054      !!                  ***  ROUTINE ctl_nam  ***
2055      !!
2056      !! ** Purpose :   Informations when error while reading a namelist
2057      !!
2058      !! ** Method  :   Fortan open
2059      !!----------------------------------------------------------------------
2060      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
2061      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
2062      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
2063      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
2064      !!----------------------------------------------------------------------
2065      !
2066      WRITE (clios, '(I5.0)')   kios
2067      IF( kios < 0 ) THEN         
2068         CALL ctl_warn( 'end of record or file while reading namelist '   &
2069            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2070      ENDIF
2071      !
2072      IF( kios > 0 ) THEN
2073         CALL ctl_stop( 'misspelled variable in namelist '   &
2074            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
2075      ENDIF
2076      kios = 0
2077      RETURN
2078      !
2079   END SUBROUTINE ctl_nam
2080
2081
2082   INTEGER FUNCTION get_unit()
2083      !!----------------------------------------------------------------------
2084      !!                  ***  FUNCTION  get_unit  ***
2085      !!
2086      !! ** Purpose :   return the index of an unused logical unit
2087      !!----------------------------------------------------------------------
2088      LOGICAL :: llopn
2089      !!----------------------------------------------------------------------
2090      !
2091      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
2092      llopn = .TRUE.
2093      DO WHILE( (get_unit < 998) .AND. llopn )
2094         get_unit = get_unit + 1
2095         INQUIRE( unit = get_unit, opened = llopn )
2096      END DO
2097      IF( (get_unit == 999) .AND. llopn ) THEN
2098         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
2099         get_unit = -1
2100      ENDIF
2101      !
2102   END FUNCTION get_unit
2103
2104   !!----------------------------------------------------------------------
2105END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.