New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2018/dev_r10164_HPC09_ESIWACE_PREP_MERGE/src/OCE/LBC – NEMO

source: NEMO/branches/2018/dev_r10164_HPC09_ESIWACE_PREP_MERGE/src/OCE/LBC/lib_mpp.F90 @ 10337

Last change on this file since 10337 was 10337, checked in by smasson, 5 years ago

dev_r10164_HPC09_ESIWACE_PREP_MERGE: action 3d: force safe NPF (2 lines update) for field recieved from oasis, see #2133

  • Property svn:keywords set to Id
File size: 78.8 KB
RevLine 
[3]1MODULE lib_mpp
[13]2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
[1344]4   !! Ocean numerics:  massively parallel processing library
[13]5   !!=====================================================================
[1344]6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
[9019]10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
[1344]11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
[3764]19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
[2715]20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
[9019]21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
[6140]23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
[9019]24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
[13]27   !!----------------------------------------------------------------------
[2715]28
29   !!----------------------------------------------------------------------
[6140]30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
[2715]35   !!----------------------------------------------------------------------
[3764]36#if   defined key_mpp_mpi
[13]37   !!----------------------------------------------------------------------
[1344]38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
[2715]40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
[4990]43   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
[6140]44   !!   mpprecv       :
[9019]45   !!   mppsend       :
[2715]46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
[1344]55   !!   mpp_ini_north : initialisation of north fold
[9019]56   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
[13]57   !!----------------------------------------------------------------------
[3764]58   USE dom_oce        ! ocean space and time domain
[2715]59   USE lbcnfd         ! north fold treatment
60   USE in_out_manager ! I/O manager
[3]61
[13]62   IMPLICIT NONE
[415]63   PRIVATE
[9019]64
65   INTERFACE mpp_nfd
66      MODULE PROCEDURE   mpp_nfd_2d      , mpp_nfd_3d      , mpp_nfd_4d
67      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
68   END INTERFACE
69
70   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
71   PUBLIC   mpp_lnk_2d      , mpp_lnk_3d      , mpp_lnk_4d
72   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
73   !
74!!gm  this should be useless
75   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
76   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
77!!gm end
78   !
[4147]79   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
[1344]80   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
[9019]81   PUBLIC   mpp_ini_north
82   PUBLIC   mpp_lnk_2d_icb
83   PUBLIC   mpp_lbc_north_icb
[1344]84   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
[10292]85   PUBLIC   mpp_ilor
[3294]86   PUBLIC   mppscatter, mppgather
[10180]87   PUBLIC   mpp_ini_znl
[3764]88   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
[9890]89   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d, mpp_lnk_bdy_4d
[5429]90   
[13]91   !! * Interfaces
92   !! define generic interface for these routine as they are called sometimes
[1344]93   !! with scalar arguments instead of array arguments, which causes problems
94   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
[13]95   INTERFACE mpp_min
96      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
97   END INTERFACE
98   INTERFACE mpp_max
[681]99      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
[13]100   END INTERFACE
101   INTERFACE mpp_sum
[6140]102      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
[9019]103         &             mppsum_realdd, mppsum_a_realdd
[13]104   END INTERFACE
[1344]105   INTERFACE mpp_minloc
106      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
107   END INTERFACE
108   INTERFACE mpp_maxloc
109      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
110   END INTERFACE
[6490]111
[51]112   !! ========================= !!
113   !!  MPI  variable definition !!
114   !! ========================= !!
[1629]115!$AGRIF_DO_NOT_TREAT
[2004]116   INCLUDE 'mpif.h'
[1629]117!$AGRIF_END_DO_NOT_TREAT
[3764]118
[1344]119   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
[3]120
[1344]121   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
[3764]122
[10330]123   INTEGER, PUBLIC ::   mppsize        ! number of process
124   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
[2363]125!$AGRIF_DO_NOT_TREAT
[9570]126   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
[2363]127!$AGRIF_END_DO_NOT_TREAT
[3]128
[2480]129   INTEGER :: MPI_SUMDD
[1976]130
[1345]131   ! variables used for zonal integration
132   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
[9019]133   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
134   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
135   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
[2715]136   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
[3]137
[3764]138   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
[9019]139   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
140   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
141   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
142   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
143   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
144   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
145   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
146   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
[3764]147
[1344]148   ! Type of send : standard, buffered, immediate
[9019]149   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
150   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
151   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
[3764]152
[10170]153   ! Communications summary report
[10297]154   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
155   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm  calling routines
[10170]156   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
[10297]157   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
158   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
[10314]159   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
[10170]160   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
[10314]161   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 2000          !: max number of communication record
[10297]162   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
163   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
164   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
[10337]165   INTEGER, PUBLIC                               ::   l_full_nf_update = .FALSE.   !: logical for a full (2lines) update of bc at North fold report
[10297]166
[10172]167   ! timing summary report
[10300]168   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
169   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
[10172]170   
[9019]171   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
[3]172
[9019]173   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
174   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
175
[51]176   !!----------------------------------------------------------------------
[9598]177   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
[888]178   !! $Id$
[10068]179   !! Software governed by the CeCILL license (see ./LICENSE)
[1344]180   !!----------------------------------------------------------------------
[3]181CONTAINS
182
[9019]183   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
[2715]184      !!----------------------------------------------------------------------
[51]185      !!                  ***  routine mynode  ***
[3764]186      !!
[51]187      !! ** Purpose :   Find processor unit
188      !!----------------------------------------------------------------------
[6140]189      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
190      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
191      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
192      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
193      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
194      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
195      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
[2715]196      !
[4147]197      INTEGER ::   mynode, ierr, code, ji, ii, ios
[532]198      LOGICAL ::   mpi_was_called
[2715]199      !
[3294]200      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, jpnij, ln_nnogather
[51]201      !!----------------------------------------------------------------------
[1344]202      !
[2481]203      ii = 1
[6140]204      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
205      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
206      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
[1344]207      !
[4147]208      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
209      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
[9168]210901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
[9019]211      !
[4147]212      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
213      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
[9168]214902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
[9019]215      !
[1344]216      !                              ! control print
[6140]217      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
218      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
219      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
[9019]220      !
221      IF( jpni < 1 .OR. jpnj < 1  ) THEN
[6140]222         WRITE(ldtxt(ii),*) '      jpni, jpnj and jpnij will be calculated automatically' ;   ii = ii + 1
[2715]223      ELSE
[6140]224         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
225         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
[9019]226      ENDIF
[2715]227
[3294]228      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
229
[2480]230      CALL mpi_initialized ( mpi_was_called, code )
231      IF( code /= MPI_SUCCESS ) THEN
[3764]232         DO ji = 1, SIZE(ldtxt)
[2481]233            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
[3764]234         END DO
[2480]235         WRITE(*, cform_err)
236         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
237         CALL mpi_abort( mpi_comm_world, code, ierr )
238      ENDIF
[415]239
[2480]240      IF( mpi_was_called ) THEN
241         !
242         SELECT CASE ( cn_mpi_send )
243         CASE ( 'S' )                ! Standard mpi send (blocking)
[6140]244            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
[2480]245         CASE ( 'B' )                ! Buffer mpi send (blocking)
[6140]246            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
[9570]247            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
[2480]248         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
[6140]249            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
[2480]250            l_isend = .TRUE.
251         CASE DEFAULT
[6140]252            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
253            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
[2715]254            kstop = kstop + 1
[2480]255         END SELECT
[9019]256         !
257      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
[6140]258         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
259         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
[2715]260         kstop = kstop + 1
[532]261      ELSE
[1601]262         SELECT CASE ( cn_mpi_send )
[524]263         CASE ( 'S' )                ! Standard mpi send (blocking)
[6140]264            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
[2480]265            CALL mpi_init( ierr )
[524]266         CASE ( 'B' )                ! Buffer mpi send (blocking)
[6140]267            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
[9570]268            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
[524]269         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
[6140]270            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
[524]271            l_isend = .TRUE.
[2480]272            CALL mpi_init( ierr )
[524]273         CASE DEFAULT
[6140]274            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
275            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
[2715]276            kstop = kstop + 1
[524]277         END SELECT
[2480]278         !
[415]279      ENDIF
[570]280
[3764]281      IF( PRESENT(localComm) ) THEN
[2480]282         IF( Agrif_Root() ) THEN
[9570]283            mpi_comm_oce = localComm
[2480]284         ENDIF
285      ELSE
[9570]286         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
[2480]287         IF( code /= MPI_SUCCESS ) THEN
[3764]288            DO ji = 1, SIZE(ldtxt)
[2481]289               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
290            END DO
[2480]291            WRITE(*, cform_err)
292            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
293            CALL mpi_abort( mpi_comm_world, code, ierr )
294         ENDIF
[3764]295      ENDIF
[2480]296
[5656]297#if defined key_agrif
[9019]298      IF( Agrif_Root() ) THEN
[9570]299         CALL Agrif_MPI_Init(mpi_comm_oce)
[5656]300      ELSE
[9570]301         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
[5656]302      ENDIF
303#endif
304
[9570]305      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
306      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
[629]307      mynode = mpprank
[4624]308
309      IF( mynode == 0 ) THEN
[5407]310         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
311         WRITE(kumond, nammpp)     
[4624]312      ENDIF
[3764]313      !
[1976]314      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
315      !
[51]316   END FUNCTION mynode
[3]317
[9019]318   !!----------------------------------------------------------------------
319   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
320   !!
321   !!   * Argument : dummy argument use in mpp_lnk_... routines
322   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
323   !!                cd_nat :   nature of array grid-points
324   !!                psgn   :   sign used across the north fold boundary
325   !!                kfld   :   optional, number of pt3d arrays
326   !!                cd_mpp :   optional, fill the overlap area only
327   !!                pval   :   optional, background value (used at closed boundaries)
328   !!----------------------------------------------------------------------
329   !
330   !                       !==  2D array and array of 2D pointer  ==!
331   !
332#  define DIM_2d
333#     define ROUTINE_LNK           mpp_lnk_2d
334#     include "mpp_lnk_generic.h90"
335#     undef ROUTINE_LNK
336#     define MULTI
337#     define ROUTINE_LNK           mpp_lnk_2d_ptr
338#     include "mpp_lnk_generic.h90"
339#     undef ROUTINE_LNK
340#     undef MULTI
341#  undef DIM_2d
342   !
343   !                       !==  3D array and array of 3D pointer  ==!
344   !
345#  define DIM_3d
346#     define ROUTINE_LNK           mpp_lnk_3d
347#     include "mpp_lnk_generic.h90"
348#     undef ROUTINE_LNK
349#     define MULTI
350#     define ROUTINE_LNK           mpp_lnk_3d_ptr
351#     include "mpp_lnk_generic.h90"
352#     undef ROUTINE_LNK
353#     undef MULTI
354#  undef DIM_3d
355   !
356   !                       !==  4D array and array of 4D pointer  ==!
357   !
358#  define DIM_4d
359#     define ROUTINE_LNK           mpp_lnk_4d
360#     include "mpp_lnk_generic.h90"
361#     undef ROUTINE_LNK
362#     define MULTI
363#     define ROUTINE_LNK           mpp_lnk_4d_ptr
364#     include "mpp_lnk_generic.h90"
365#     undef ROUTINE_LNK
366#     undef MULTI
367#  undef DIM_4d
[6140]368
[9019]369   !!----------------------------------------------------------------------
370   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
371   !!
372   !!   * Argument : dummy argument use in mpp_nfd_... routines
373   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
374   !!                cd_nat :   nature of array grid-points
375   !!                psgn   :   sign used across the north fold boundary
376   !!                kfld   :   optional, number of pt3d arrays
377   !!                cd_mpp :   optional, fill the overlap area only
378   !!                pval   :   optional, background value (used at closed boundaries)
379   !!----------------------------------------------------------------------
380   !
381   !                       !==  2D array and array of 2D pointer  ==!
382   !
383#  define DIM_2d
384#     define ROUTINE_NFD           mpp_nfd_2d
385#     include "mpp_nfd_generic.h90"
386#     undef ROUTINE_NFD
387#     define MULTI
388#     define ROUTINE_NFD           mpp_nfd_2d_ptr
389#     include "mpp_nfd_generic.h90"
390#     undef ROUTINE_NFD
391#     undef MULTI
392#  undef DIM_2d
393   !
394   !                       !==  3D array and array of 3D pointer  ==!
395   !
396#  define DIM_3d
397#     define ROUTINE_NFD           mpp_nfd_3d
398#     include "mpp_nfd_generic.h90"
399#     undef ROUTINE_NFD
400#     define MULTI
401#     define ROUTINE_NFD           mpp_nfd_3d_ptr
402#     include "mpp_nfd_generic.h90"
403#     undef ROUTINE_NFD
404#     undef MULTI
405#  undef DIM_3d
406   !
407   !                       !==  4D array and array of 4D pointer  ==!
408   !
409#  define DIM_4d
410#     define ROUTINE_NFD           mpp_nfd_4d
411#     include "mpp_nfd_generic.h90"
412#     undef ROUTINE_NFD
413#     define MULTI
414#     define ROUTINE_NFD           mpp_nfd_4d_ptr
415#     include "mpp_nfd_generic.h90"
416#     undef ROUTINE_NFD
417#     undef MULTI
418#  undef DIM_4d
[3]419
[1344]420
[9019]421   !!----------------------------------------------------------------------
422   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
423   !!
424   !!   * Argument : dummy argument use in mpp_lnk_... routines
425   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
426   !!                cd_nat :   nature of array grid-points
427   !!                psgn   :   sign used across the north fold boundary
428   !!                kb_bdy :   BDY boundary set
429   !!                kfld   :   optional, number of pt3d arrays
430   !!----------------------------------------------------------------------
431   !
432   !                       !==  2D array and array of 2D pointer  ==!
433   !
434#  define DIM_2d
435#     define ROUTINE_BDY           mpp_lnk_bdy_2d
436#     include "mpp_bdy_generic.h90"
437#     undef ROUTINE_BDY
438#  undef DIM_2d
439   !
440   !                       !==  3D array and array of 3D pointer  ==!
441   !
442#  define DIM_3d
443#     define ROUTINE_BDY           mpp_lnk_bdy_3d
444#     include "mpp_bdy_generic.h90"
445#     undef ROUTINE_BDY
446#  undef DIM_3d
447   !
448   !                       !==  4D array and array of 4D pointer  ==!
449   !
[9890]450#  define DIM_4d
451#     define ROUTINE_BDY           mpp_lnk_bdy_4d
452#     include "mpp_bdy_generic.h90"
453#     undef ROUTINE_BDY
454#  undef DIM_4d
[3]455
[9019]456   !!----------------------------------------------------------------------
457   !!
458   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
[5429]459   
460   
[9019]461   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
[5429]462   
[9019]463   
464   !!----------------------------------------------------------------------
[5429]465
466
[888]467
[1344]468   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
[51]469      !!----------------------------------------------------------------------
470      !!                  ***  routine mppsend  ***
[3764]471      !!
[51]472      !! ** Purpose :   Send messag passing array
473      !!
474      !!----------------------------------------------------------------------
[1344]475      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
476      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
477      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
478      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
479      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
480      !!
481      INTEGER ::   iflag
[51]482      !!----------------------------------------------------------------------
[1344]483      !
[1601]484      SELECT CASE ( cn_mpi_send )
[300]485      CASE ( 'S' )                ! Standard mpi send (blocking)
[9570]486         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
[300]487      CASE ( 'B' )                ! Buffer mpi send (blocking)
[9570]488         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
[300]489      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
[1344]490         ! be carefull, one more argument here : the mpi request identifier..
[9570]491         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
[300]492      END SELECT
[1344]493      !
[51]494   END SUBROUTINE mppsend
[3]495
496
[3294]497   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
[51]498      !!----------------------------------------------------------------------
499      !!                  ***  routine mpprecv  ***
500      !!
501      !! ** Purpose :   Receive messag passing array
502      !!
503      !!----------------------------------------------------------------------
[1344]504      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
505      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
506      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
[3764]507      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
[1344]508      !!
[51]509      INTEGER :: istatus(mpi_status_size)
510      INTEGER :: iflag
[3294]511      INTEGER :: use_source
[1344]512      !!----------------------------------------------------------------------
513      !
[3764]514      ! If a specific process number has been passed to the receive call,
[3294]515      ! use that one. Default is to use mpi_any_source
[6140]516      use_source = mpi_any_source
517      IF( PRESENT(ksource) )   use_source = ksource
518      !
[9570]519      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
[1344]520      !
[51]521   END SUBROUTINE mpprecv
[3]522
523
[51]524   SUBROUTINE mppgather( ptab, kp, pio )
525      !!----------------------------------------------------------------------
526      !!                   ***  routine mppgather  ***
[3764]527      !!
528      !! ** Purpose :   Transfert between a local subdomain array and a work
[51]529      !!     array which is distributed following the vertical level.
530      !!
[1344]531      !!----------------------------------------------------------------------
[6140]532      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
533      INTEGER                           , INTENT(in   ) ::   kp     ! record length
[1344]534      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
[51]535      !!
[1344]536      INTEGER :: itaille, ierror   ! temporary integer
[51]537      !!---------------------------------------------------------------------
[1344]538      !
539      itaille = jpi * jpj
540      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
[9570]541         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
[1344]542      !
[51]543   END SUBROUTINE mppgather
[3]544
545
[51]546   SUBROUTINE mppscatter( pio, kp, ptab )
547      !!----------------------------------------------------------------------
548      !!                  ***  routine mppscatter  ***
549      !!
[3764]550      !! ** Purpose :   Transfert between awork array which is distributed
[51]551      !!      following the vertical level and the local subdomain array.
552      !!
553      !!----------------------------------------------------------------------
[6140]554      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
555      INTEGER                             ::   kp     ! Tag (not used with MPI
556      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
[1344]557      !!
558      INTEGER :: itaille, ierror   ! temporary integer
[51]559      !!---------------------------------------------------------------------
[1344]560      !
[6140]561      itaille = jpi * jpj
[1344]562      !
563      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
[9570]564         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
[1344]565      !
[51]566   END SUBROUTINE mppscatter
[3]567
[9019]568   !!
[10292]569   SUBROUTINE mpp_ilor( ld_switch, ldlast, kcom )
570      ! WARNING: must be used only once (by ice_dyn_adv_umx) because ll_switch and ireq are SAVE
571      !!----------------------------------------------------------------------
572      LOGICAL, INTENT(inout), DIMENSION(2) ::   ld_switch
573      LOGICAL, INTENT(in   ), OPTIONAL     ::   ldlast
574      INTEGER, INTENT(in   ), OPTIONAL     ::   kcom 
575      INTEGER  ::   ierror, ilocalcomm
576      LOGICAL, SAVE ::   ll_switch 
577      INTEGER, SAVE ::   ireq = -1
578      !!----------------------------------------------------------------------
579      ilocalcomm = mpi_comm_oce
580      IF( PRESENT(kcom) )   ilocalcomm = kcom
581     
582      IF ( ireq /= -1 ) THEN   ! get ld_switch(2) from ll_switch (from previous call)
[10300]583         IF( ln_timing ) CALL tic_tac(.TRUE., ld_global = .TRUE.)
[10292]584         CALL mpi_wait(ireq, MPI_STATUS_IGNORE, ierror )
585         ld_switch(2) = ll_switch
[10300]586         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[10292]587      ENDIF
588      IF( .NOT. ldlast ) &     ! send ll_switch to be received on next call
589         CALL mpi_iallreduce( ld_switch(1), ll_switch, 1, MPI_LOGICAL, mpi_lor, ilocalcomm, ireq, ierror )
[681]590
[10292]591   END SUBROUTINE mpp_ilor
[10297]592   
[9019]593   !!----------------------------------------------------------------------
[10297]594   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
595   !!   
596   !!----------------------------------------------------------------------
597   !!
598#  define OPERATION_MAX
599#  define INTEGER_TYPE
600#  define DIM_0d
601#     define ROUTINE_ALLREDUCE           mppmax_int
602#     include "mpp_allreduce_generic.h90"
603#     undef ROUTINE_ALLREDUCE
604#  undef DIM_0d
605#  define DIM_1d
606#     define ROUTINE_ALLREDUCE           mppmax_a_int
607#     include "mpp_allreduce_generic.h90"
608#     undef ROUTINE_ALLREDUCE
609#  undef DIM_1d
610#  undef INTEGER_TYPE
611!
612#  define REAL_TYPE
613#  define DIM_0d
614#     define ROUTINE_ALLREDUCE           mppmax_real
615#     include "mpp_allreduce_generic.h90"
616#     undef ROUTINE_ALLREDUCE
617#  undef DIM_0d
618#  define DIM_1d
619#     define ROUTINE_ALLREDUCE           mppmax_a_real
620#     include "mpp_allreduce_generic.h90"
621#     undef ROUTINE_ALLREDUCE
622#  undef DIM_1d
623#  undef REAL_TYPE
624#  undef OPERATION_MAX
625   !!----------------------------------------------------------------------
[9019]626   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
627   !!   
628   !!----------------------------------------------------------------------
629   !!
[10297]630#  define OPERATION_MIN
631#  define INTEGER_TYPE
632#  define DIM_0d
633#     define ROUTINE_ALLREDUCE           mppmin_int
634#     include "mpp_allreduce_generic.h90"
635#     undef ROUTINE_ALLREDUCE
636#  undef DIM_0d
637#  define DIM_1d
638#     define ROUTINE_ALLREDUCE           mppmin_a_int
639#     include "mpp_allreduce_generic.h90"
640#     undef ROUTINE_ALLREDUCE
641#  undef DIM_1d
642#  undef INTEGER_TYPE
643!
644#  define REAL_TYPE
645#  define DIM_0d
646#     define ROUTINE_ALLREDUCE           mppmin_real
647#     include "mpp_allreduce_generic.h90"
648#     undef ROUTINE_ALLREDUCE
649#  undef DIM_0d
650#  define DIM_1d
651#     define ROUTINE_ALLREDUCE           mppmin_a_real
652#     include "mpp_allreduce_generic.h90"
653#     undef ROUTINE_ALLREDUCE
654#  undef DIM_1d
655#  undef REAL_TYPE
656#  undef OPERATION_MIN
[3]657
[9019]658   !!----------------------------------------------------------------------
659   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
660   !!   
661   !!   Global sum of 1D array or a variable (integer, real or complex)
662   !!----------------------------------------------------------------------
663   !!
[10297]664#  define OPERATION_SUM
665#  define INTEGER_TYPE
666#  define DIM_0d
667#     define ROUTINE_ALLREDUCE           mppsum_int
668#     include "mpp_allreduce_generic.h90"
669#     undef ROUTINE_ALLREDUCE
670#  undef DIM_0d
671#  define DIM_1d
672#     define ROUTINE_ALLREDUCE           mppsum_a_int
673#     include "mpp_allreduce_generic.h90"
674#     undef ROUTINE_ALLREDUCE
675#  undef DIM_1d
676#  undef INTEGER_TYPE
677!
678#  define REAL_TYPE
679#  define DIM_0d
680#     define ROUTINE_ALLREDUCE           mppsum_real
681#     include "mpp_allreduce_generic.h90"
682#     undef ROUTINE_ALLREDUCE
683#  undef DIM_0d
684#  define DIM_1d
685#     define ROUTINE_ALLREDUCE           mppsum_a_real
686#     include "mpp_allreduce_generic.h90"
687#     undef ROUTINE_ALLREDUCE
688#  undef DIM_1d
689#  undef REAL_TYPE
690#  undef OPERATION_SUM
[3764]691
[10297]692#  define OPERATION_SUM_DD
693#  define COMPLEX_TYPE
694#  define DIM_0d
695#     define ROUTINE_ALLREDUCE           mppsum_realdd
696#     include "mpp_allreduce_generic.h90"
697#     undef ROUTINE_ALLREDUCE
698#  undef DIM_0d
699#  define DIM_1d
700#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
701#     include "mpp_allreduce_generic.h90"
702#     undef ROUTINE_ALLREDUCE
703#  undef DIM_1d
704#  undef COMPLEX_TYPE
705#  undef OPERATION_SUM_DD
706
[10314]707   !!----------------------------------------------------------------------
708   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
709   !!   
710   !!----------------------------------------------------------------------
711   !!
712#  define OPERATION_MINLOC
713#  define DIM_2d
714#     define ROUTINE_LOC           mpp_minloc2d
715#     include "mpp_loc_generic.h90"
716#     undef ROUTINE_LOC
717#  undef DIM_2d
718#  define DIM_3d
719#     define ROUTINE_LOC           mpp_minloc3d
720#     include "mpp_loc_generic.h90"
721#     undef ROUTINE_LOC
722#  undef DIM_3d
723#  undef OPERATION_MINLOC
[6140]724
[10314]725#  define OPERATION_MAXLOC
726#  define DIM_2d
727#     define ROUTINE_LOC           mpp_maxloc2d
728#     include "mpp_loc_generic.h90"
729#     undef ROUTINE_LOC
730#  undef DIM_2d
731#  define DIM_3d
732#     define ROUTINE_LOC           mpp_maxloc3d
733#     include "mpp_loc_generic.h90"
734#     undef ROUTINE_LOC
735#  undef DIM_3d
736#  undef OPERATION_MAXLOC
[9019]737
[1344]738   SUBROUTINE mppsync()
739      !!----------------------------------------------------------------------
740      !!                  ***  routine mppsync  ***
[3764]741      !!
[1344]742      !! ** Purpose :   Massively parallel processors, synchroneous
743      !!
744      !!-----------------------------------------------------------------------
745      INTEGER :: ierror
746      !!-----------------------------------------------------------------------
747      !
[9570]748      CALL mpi_barrier( mpi_comm_oce, ierror )
[1344]749      !
750   END SUBROUTINE mppsync
[3]751
752
[10330]753   SUBROUTINE mppstop( ldfinal ) 
[1344]754      !!----------------------------------------------------------------------
755      !!                  ***  routine mppstop  ***
[3764]756      !!
[3294]757      !! ** purpose :   Stop massively parallel processors method
[1344]758      !!
759      !!----------------------------------------------------------------------
[10330]760      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
761      LOGICAL ::   llfinal
[1344]762      INTEGER ::   info
763      !!----------------------------------------------------------------------
764      !
765      CALL mppsync
766      CALL mpi_finalize( info )
[10330]767      llfinal = .FALSE.
768      IF( PRESENT(ldfinal) ) llfinal = ldfinal
769      IF( .NOT. llfinal ) STOP 123456
[1344]770      !
771   END SUBROUTINE mppstop
[3]772
773
[1344]774   SUBROUTINE mpp_comm_free( kcom )
775      !!----------------------------------------------------------------------
776      INTEGER, INTENT(in) ::   kcom
777      !!
778      INTEGER :: ierr
779      !!----------------------------------------------------------------------
780      !
781      CALL MPI_COMM_FREE(kcom, ierr)
782      !
783   END SUBROUTINE mpp_comm_free
[3]784
[869]785
[2715]786   SUBROUTINE mpp_ini_znl( kumout )
[1345]787      !!----------------------------------------------------------------------
788      !!               ***  routine mpp_ini_znl  ***
789      !!
790      !! ** Purpose :   Initialize special communicator for computing zonal sum
791      !!
792      !! ** Method  : - Look for processors in the same row
793      !!              - Put their number in nrank_znl
794      !!              - Create group for the znl processors
795      !!              - Create a communicator for znl processors
796      !!              - Determine if processor should write znl files
797      !!
798      !! ** output
799      !!      ndim_rank_znl = number of processors on the same row
800      !!      ngrp_znl = group ID for the znl processors
801      !!      ncomm_znl = communicator for the ice procs.
802      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
803      !!
804      !!----------------------------------------------------------------------
[2715]805      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
[1345]806      !
[2715]807      INTEGER :: jproc      ! dummy loop integer
808      INTEGER :: ierr, ii   ! local integer
809      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
810      !!----------------------------------------------------------------------
[1345]811      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
812      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
[9570]813      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
[1345]814      !
[2715]815      ALLOCATE( kwork(jpnij), STAT=ierr )
816      IF( ierr /= 0 ) THEN
817         WRITE(kumout, cform_err)
818         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
819         CALL mppstop
820      ENDIF
821
822      IF( jpnj == 1 ) THEN
[1345]823         ngrp_znl  = ngrp_world
[9570]824         ncomm_znl = mpi_comm_oce
[1345]825      ELSE
826         !
[9570]827         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
[1345]828         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
829         !-$$        CALL flush(numout)
830         !
831         ! Count number of processors on the same row
832         ndim_rank_znl = 0
833         DO jproc=1,jpnij
834            IF ( kwork(jproc) == njmpp ) THEN
835               ndim_rank_znl = ndim_rank_znl + 1
836            ENDIF
837         END DO
838         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
839         !-$$        CALL flush(numout)
840         ! Allocate the right size to nrank_znl
[1441]841         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
[1345]842         ALLOCATE(nrank_znl(ndim_rank_znl))
[3764]843         ii = 0
[1345]844         nrank_znl (:) = 0
845         DO jproc=1,jpnij
846            IF ( kwork(jproc) == njmpp) THEN
847               ii = ii + 1
[3764]848               nrank_znl(ii) = jproc -1
[1345]849            ENDIF
850         END DO
851         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
852         !-$$        CALL flush(numout)
853
854         ! Create the opa group
[9570]855         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
[1345]856         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
857         !-$$        CALL flush(numout)
858
859         ! Create the znl group from the opa group
860         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
861         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
862         !-$$        CALL flush(numout)
863
864         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
[9570]865         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
[1345]866         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
867         !-$$        CALL flush(numout)
868         !
869      END IF
870
871      ! Determines if processor if the first (starting from i=1) on the row
[3764]872      IF ( jpni == 1 ) THEN
[1345]873         l_znl_root = .TRUE.
874      ELSE
875         l_znl_root = .FALSE.
876         kwork (1) = nimpp
[10297]877         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
[1345]878         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
879      END IF
880
[2715]881      DEALLOCATE(kwork)
882
[1345]883   END SUBROUTINE mpp_ini_znl
884
885
[1344]886   SUBROUTINE mpp_ini_north
887      !!----------------------------------------------------------------------
888      !!               ***  routine mpp_ini_north  ***
889      !!
[3764]890      !! ** Purpose :   Initialize special communicator for north folding
[1344]891      !!      condition together with global variables needed in the mpp folding
892      !!
893      !! ** Method  : - Look for northern processors
894      !!              - Put their number in nrank_north
895      !!              - Create groups for the world processors and the north processors
896      !!              - Create a communicator for northern processors
897      !!
898      !! ** output
899      !!      njmppmax = njmpp for northern procs
900      !!      ndim_rank_north = number of processors in the northern line
901      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
902      !!      ngrp_world = group ID for the world processors
903      !!      ngrp_north = group ID for the northern processors
904      !!      ncomm_north = communicator for the northern procs.
905      !!      north_root = number (in the world) of proc 0 in the northern comm.
906      !!
907      !!----------------------------------------------------------------------
908      INTEGER ::   ierr
909      INTEGER ::   jjproc
910      INTEGER ::   ii, ji
911      !!----------------------------------------------------------------------
912      !
913      njmppmax = MAXVAL( njmppt )
914      !
915      ! Look for how many procs on the northern boundary
916      ndim_rank_north = 0
917      DO jjproc = 1, jpnij
918         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
919      END DO
920      !
921      ! Allocate the right size to nrank_north
[1441]922      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
[1344]923      ALLOCATE( nrank_north(ndim_rank_north) )
[869]924
[1344]925      ! Fill the nrank_north array with proc. number of northern procs.
926      ! Note : the rank start at 0 in MPI
927      ii = 0
928      DO ji = 1, jpnij
929         IF ( njmppt(ji) == njmppmax   ) THEN
930            ii=ii+1
931            nrank_north(ii)=ji-1
932         END IF
933      END DO
934      !
935      ! create the world group
[9570]936      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
[1344]937      !
938      ! Create the North group from the world group
939      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
940      !
941      ! Create the North communicator , ie the pool of procs in the north group
[9570]942      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
[1344]943      !
944   END SUBROUTINE mpp_ini_north
[869]945
946
[9570]947   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
[1344]948      !!---------------------------------------------------------------------
949      !!                   ***  routine mpp_init.opa  ***
950      !!
951      !! ** Purpose :: export and attach a MPI buffer for bsend
952      !!
953      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
954      !!            but classical mpi_init
[3764]955      !!
956      !! History :: 01/11 :: IDRIS initial version for IBM only
[1344]957      !!            08/04 :: R. Benshila, generalisation
958      !!---------------------------------------------------------------------
[3764]959      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
[2481]960      INTEGER                      , INTENT(inout) ::   ksft
961      INTEGER                      , INTENT(  out) ::   code
962      INTEGER                                      ::   ierr, ji
963      LOGICAL                                      ::   mpi_was_called
[1344]964      !!---------------------------------------------------------------------
965      !
966      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
[532]967      IF ( code /= MPI_SUCCESS ) THEN
[3764]968         DO ji = 1, SIZE(ldtxt)
[2481]969            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
[3764]970         END DO
[2481]971         WRITE(*, cform_err)
972         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
[1344]973         CALL mpi_abort( mpi_comm_world, code, ierr )
[532]974      ENDIF
[1344]975      !
976      IF( .NOT. mpi_was_called ) THEN
977         CALL mpi_init( code )
[9570]978         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
[532]979         IF ( code /= MPI_SUCCESS ) THEN
[3764]980            DO ji = 1, SIZE(ldtxt)
[2481]981               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
982            END DO
983            WRITE(*, cform_err)
984            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
[532]985            CALL mpi_abort( mpi_comm_world, code, ierr )
986         ENDIF
987      ENDIF
[1344]988      !
[897]989      IF( nn_buffer > 0 ) THEN
[2481]990         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
[897]991         ! Buffer allocation and attachment
[2481]992         ALLOCATE( tampon(nn_buffer), stat = ierr )
[3764]993         IF( ierr /= 0 ) THEN
994            DO ji = 1, SIZE(ldtxt)
[2481]995               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
996            END DO
997            WRITE(*, cform_err)
998            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
999            CALL mpi_abort( mpi_comm_world, code, ierr )
1000         END IF
1001         CALL mpi_buffer_attach( tampon, nn_buffer, code )
[897]1002      ENDIF
[1344]1003      !
[9570]1004   END SUBROUTINE mpi_init_oce
[3]1005
[9019]1006
1007   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
[1976]1008      !!---------------------------------------------------------------------
1009      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1010      !!
1011      !!   Modification of original codes written by David H. Bailey
1012      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1013      !!---------------------------------------------------------------------
[9019]1014      INTEGER                     , INTENT(in)    ::   ilen, itype
1015      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1016      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
[1976]1017      !
1018      REAL(wp) :: zerr, zt1, zt2    ! local work variables
[9019]1019      INTEGER  :: ji, ztmp           ! local scalar
1020      !!---------------------------------------------------------------------
1021      !
[1976]1022      ztmp = itype   ! avoid compilation warning
[9019]1023      !
[1976]1024      DO ji=1,ilen
1025      ! Compute ydda + yddb using Knuth's trick.
1026         zt1  = real(ydda(ji)) + real(yddb(ji))
1027         zerr = zt1 - real(ydda(ji))
1028         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1029                + aimag(ydda(ji)) + aimag(yddb(ji))
1030
1031         ! The result is zt1 + zt2, after normalization.
1032         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1033      END DO
[9019]1034      !
[1976]1035   END SUBROUTINE DDPDD_MPI
1036
[6140]1037
[9019]1038   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
[4990]1039      !!---------------------------------------------------------------------
1040      !!                   ***  routine mpp_lbc_north_icb  ***
1041      !!
1042      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1043      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1044      !!              array with outer extra halo
1045      !!
1046      !! ** Method  :   North fold condition and mpp with more than one proc
1047      !!              in i-direction require a specific treatment. We gather
[9019]1048      !!              the 4+kextj northern lines of the global domain on 1
[4990]1049      !!              processor and apply lbc north-fold on this sub array.
1050      !!              Then we scatter the north fold array back to the processors.
[9019]1051      !!              This routine accounts for an extra halo with icebergs
1052      !!              and assumes ghost rows and columns have been suppressed.
[4990]1053      !!
1054      !!----------------------------------------------------------------------
1055      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1056      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1057      !                                                     !   = T ,  U , V , F or W -points
1058      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1059      !!                                                    ! north fold, =  1. otherwise
[9019]1060      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
[6140]1061      !
[4990]1062      INTEGER ::   ji, jj, jr
1063      INTEGER ::   ierr, itaille, ildi, ilei, iilb
[9019]1064      INTEGER ::   ipj, ij, iproc
[4990]1065      !
1066      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1067      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1068      !!----------------------------------------------------------------------
1069      !
[9019]1070      ipj=4
[9467]1071      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1072     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1073     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
[4990]1074      !
[9019]1075      ztab_e(:,:)      = 0._wp
1076      znorthloc_e(:,:) = 0._wp
[6140]1077      !
[9467]1078      ij = 1 - kextj
1079      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1080      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1081         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
[4990]1082         ij = ij + 1
1083      END DO
1084      !
[9467]1085      itaille = jpimax * ( ipj + 2*kextj )
[10314]1086      !
1087      IF( ln_timing ) CALL tic_tac(.TRUE.)
[9467]1088      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1089         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1090         &                ncomm_north, ierr )
[4990]1091      !
[10314]1092      IF( ln_timing ) CALL tic_tac(.FALSE.)
1093      !
[4990]1094      DO jr = 1, ndim_rank_north            ! recover the global north array
1095         iproc = nrank_north(jr) + 1
1096         ildi = nldit (iproc)
1097         ilei = nleit (iproc)
1098         iilb = nimppt(iproc)
[9467]1099         DO jj = 1-kextj, ipj+kextj
[4990]1100            DO ji = ildi, ilei
1101               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1102            END DO
1103         END DO
1104      END DO
1105
1106      ! 2. North-Fold boundary conditions
1107      ! ----------------------------------
[9467]1108      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
[4990]1109
[9467]1110      ij = 1 - kextj
[4990]1111      !! Scatter back to pt2d
[9467]1112      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
[9019]1113         DO ji= 1, jpi
[4990]1114            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1115         END DO
[9467]1116         ij  = ij +1
[4990]1117      END DO
1118      !
1119      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1120      !
1121   END SUBROUTINE mpp_lbc_north_icb
1122
[6140]1123
[10314]1124   SUBROUTINE mpp_lnk_2d_icb( cdname, pt2d, cd_type, psgn, kexti, kextj )
[4990]1125      !!----------------------------------------------------------------------
1126      !!                  ***  routine mpp_lnk_2d_icb  ***
1127      !!
[9019]1128      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1129      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1130      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
[4990]1131      !!
1132      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1133      !!      between processors following neighboring subdomains.
1134      !!            domain parameters
[9019]1135      !!                    jpi    : first dimension of the local subdomain
1136      !!                    jpj    : second dimension of the local subdomain
1137      !!                    kexti  : number of columns for extra outer halo
1138      !!                    kextj  : number of rows for extra outer halo
[4990]1139      !!                    nbondi : mark for "east-west local boundary"
1140      !!                    nbondj : mark for "north-south local boundary"
1141      !!                    noea   : number for local neighboring processors
1142      !!                    nowe   : number for local neighboring processors
1143      !!                    noso   : number for local neighboring processors
1144      !!                    nono   : number for local neighboring processors
1145      !!----------------------------------------------------------------------
[10314]1146      CHARACTER(len=*)                                        , INTENT(in   ) ::   cdname      ! name of the calling subroutine
[9019]1147      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1148      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1149      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1150      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1151      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1152      !
[4990]1153      INTEGER  ::   jl   ! dummy loop indices
[9019]1154      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1155      INTEGER  ::   ipreci, iprecj             !   -       -
[4990]1156      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1157      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1158      !!
[9019]1159      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1160      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
[4990]1161      !!----------------------------------------------------------------------
1162
[9019]1163      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1164      iprecj = nn_hls + kextj
[4990]1165
[10314]1166      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, 1, 1, 1, ld_lbc = .TRUE. )
[4990]1167
1168      ! 1. standard boundary treatment
1169      ! ------------------------------
1170      ! Order matters Here !!!!
1171      !
1172      !                                      ! East-West boundaries
1173      !                                           !* Cyclic east-west
[9667]1174      IF( l_Iperio ) THEN
1175         pt2d(1-kexti:     1   ,:) = pt2d(jpim1-kexti: jpim1 ,:)       ! east
1176         pt2d(  jpi  :jpi+kexti,:) = pt2d(     2     :2+kexti,:)       ! west
[4990]1177         !
1178      ELSE                                        !* closed
[9667]1179         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls   ,:) = 0._wp    ! east except at F-point
1180                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! west
[4990]1181      ENDIF
[9667]1182      !                                      ! North-South boundaries
1183      IF( l_Jperio ) THEN                         !* cyclic (only with no mpp j-split)
1184         pt2d(:,1-kextj:     1   ) = pt2d(:,jpjm1-kextj:  jpjm1)       ! north
1185         pt2d(:,  jpj  :jpj+kextj) = pt2d(:,     2     :2+kextj)       ! south
1186      ELSE                                        !* closed
1187         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-kextj   :nn_hls   ) = 0._wp    ! north except at F-point
1188                                      pt2d(:,jpj-nn_hls+1:jpj+kextj) = 0._wp    ! south
1189      ENDIF
[4990]1190      !
1191
1192      ! north fold treatment
1193      ! -----------------------
1194      IF( npolj /= 0 ) THEN
1195         !
1196         SELECT CASE ( jpni )
[9019]1197                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1198                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
[4990]1199         END SELECT
1200         !
1201      ENDIF
1202
1203      ! 2. East and west directions exchange
1204      ! ------------------------------------
1205      ! we play with the neigbours AND the row number because of the periodicity
1206      !
1207      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1208      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
[9019]1209         iihom = jpi-nreci-kexti
[4990]1210         DO jl = 1, ipreci
[9019]1211            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
[4990]1212            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1213         END DO
1214      END SELECT
1215      !
1216      !                           ! Migrations
[9019]1217      imigr = ipreci * ( jpj + 2*kextj )
[4990]1218      !
[10314]1219      IF( ln_timing ) CALL tic_tac(.TRUE.)
1220      !
[4990]1221      SELECT CASE ( nbondi )
1222      CASE ( -1 )
[9019]1223         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1224         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
[4990]1225         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1226      CASE ( 0 )
[9019]1227         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1228         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1229         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1230         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
[4990]1231         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1232         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1233      CASE ( 1 )
[9019]1234         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1235         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
[4990]1236         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1237      END SELECT
1238      !
[10314]1239      IF( ln_timing ) CALL tic_tac(.FALSE.)
1240      !
[4990]1241      !                           ! Write Dirichlet lateral conditions
[9019]1242      iihom = jpi - nn_hls
[4990]1243      !
1244      SELECT CASE ( nbondi )
1245      CASE ( -1 )
1246         DO jl = 1, ipreci
1247            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1248         END DO
1249      CASE ( 0 )
1250         DO jl = 1, ipreci
[9019]1251            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1252            pt2d(iihom+jl,:) = r2dew(:,jl,2)
[4990]1253         END DO
1254      CASE ( 1 )
1255         DO jl = 1, ipreci
[9019]1256            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
[4990]1257         END DO
1258      END SELECT
1259
1260
1261      ! 3. North and south directions
1262      ! -----------------------------
1263      ! always closed : we play only with the neigbours
1264      !
1265      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
[9019]1266         ijhom = jpj-nrecj-kextj
[4990]1267         DO jl = 1, iprecj
1268            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
[9019]1269            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
[4990]1270         END DO
1271      ENDIF
1272      !
1273      !                           ! Migrations
[9019]1274      imigr = iprecj * ( jpi + 2*kexti )
[4990]1275      !
[10314]1276      IF( ln_timing ) CALL tic_tac(.TRUE.)
1277      !
[4990]1278      SELECT CASE ( nbondj )
1279      CASE ( -1 )
[9019]1280         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1281         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
[4990]1282         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1283      CASE ( 0 )
[9019]1284         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1285         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1286         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1287         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
[4990]1288         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1289         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1290      CASE ( 1 )
[9019]1291         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1292         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
[4990]1293         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1294      END SELECT
1295      !
[10314]1296      IF( ln_timing ) CALL tic_tac(.FALSE.)
1297      !
[4990]1298      !                           ! Write Dirichlet lateral conditions
[9019]1299      ijhom = jpj - nn_hls
[4990]1300      !
1301      SELECT CASE ( nbondj )
1302      CASE ( -1 )
1303         DO jl = 1, iprecj
1304            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1305         END DO
1306      CASE ( 0 )
1307         DO jl = 1, iprecj
[9019]1308            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1309            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
[4990]1310         END DO
1311      CASE ( 1 )
1312         DO jl = 1, iprecj
[9019]1313            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
[4990]1314         END DO
1315      END SELECT
[9019]1316      !
[4990]1317   END SUBROUTINE mpp_lnk_2d_icb
[10172]1318
[10314]1319
1320   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb )
1321      !!----------------------------------------------------------------------
1322      !!                  ***  routine mpp_report  ***
1323      !!
1324      !! ** Purpose :   report use of mpp routines per time-setp
1325      !!
1326      !!----------------------------------------------------------------------
1327      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1328      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1329      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb
1330      !!
1331      LOGICAL ::   ll_lbc, ll_glb
1332      INTEGER ::    ji,  jj,  jk,  jh, jf   ! dummy loop indices
1333      !!----------------------------------------------------------------------
1334      !
1335      ll_lbc = .FALSE.
1336      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1337      ll_glb = .FALSE.
1338      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1339      !
1340      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1341      ncom_freq = ncom_fsbc * ncom_dttrc
1342      IF( MOD( MAX(ncom_fsbc,ncom_dttrc), MIN(ncom_fsbc,ncom_dttrc) ) == 0 ) ncom_freq = MAX(ncom_fsbc,ncom_dttrc)
1343      !
1344      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1345         IF( ll_lbc ) THEN
1346            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1347            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1348            n_sequence_lbc = n_sequence_lbc + 1
1349            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lnk_generic, increase ncom_rec_max' )   ! deadlock
1350            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1351            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1352            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1353         ENDIF
1354         IF( ll_glb ) THEN
1355            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1356            n_sequence_glb = n_sequence_glb + 1
1357            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lnk_generic, increase ncom_rec_max' )   ! deadlock
1358            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1359         ENDIF
1360      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1361         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1362         WRITE(numcom,*) ' '
1363         WRITE(numcom,*) ' ------------------------------------------------------------'
1364         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1365         WRITE(numcom,*) ' ------------------------------------------------------------'
1366         WRITE(numcom,*) ' '
1367         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1368         jj = 0; jk = 0; jf = 0; jh = 0
1369         DO ji = 1, n_sequence_lbc
1370            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1371            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1372            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1373            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1374         END DO
1375         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1376         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1377         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1378         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1379         WRITE(numcom,*) ' '
1380         WRITE(numcom,*) ' lbc_lnk called'
1381         jj = 1
1382         DO ji = 2, n_sequence_lbc
1383            IF( crname_lbc(ji-1) /= crname_lbc(ji) ) THEN
1384               WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(ji-1))
1385               jj = 0
1386            END IF
1387            jj = jj + 1 
1388         END DO
1389         WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
1390         WRITE(numcom,*) ' '
1391         IF ( n_sequence_glb > 0 ) THEN
1392            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1393            jj = 1
1394            DO ji = 2, n_sequence_glb
1395               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1396                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1397                  jj = 0
1398               END IF
1399               jj = jj + 1 
1400            END DO
1401            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1402            DEALLOCATE(crname_glb)
1403         ELSE
1404            WRITE(numcom,*) ' No MPI global communication '
1405         ENDIF
1406         WRITE(numcom,*) ' '
1407         WRITE(numcom,*) ' -----------------------------------------------'
1408         WRITE(numcom,*) ' '
1409         DEALLOCATE(ncomm_sequence)
1410         DEALLOCATE(crname_lbc)
1411      ENDIF
1412   END SUBROUTINE mpp_report
1413
[6140]1414   
[10300]1415   SUBROUTINE tic_tac (ld_tic, ld_global)
[10172]1416
[10300]1417    LOGICAL,           INTENT(IN) :: ld_tic
1418    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1419    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1420    REAL(wp),               SAVE :: tic_ct = 0._wp
1421    INTEGER :: ii
[10172]1422
1423    IF( ncom_stp <= nit000 ) RETURN
1424    IF( ncom_stp == nitend ) RETURN
[10300]1425    ii = 1
1426    IF( PRESENT( ld_global ) ) THEN
1427       IF( ld_global ) ii = 2
1428    END IF
[10172]1429   
[10300]1430    IF ( ld_tic ) THEN
1431       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
[10172]1432       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1433    ELSE
[10300]1434       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
[10172]1435       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1436    ENDIF
1437   
1438   END SUBROUTINE tic_tac
1439
1440   
[13]1441#else
1442   !!----------------------------------------------------------------------
1443   !!   Default case:            Dummy module        share memory computing
1444   !!----------------------------------------------------------------------
[2715]1445   USE in_out_manager
[1976]1446
[13]1447   INTERFACE mpp_sum
[10314]1448      MODULE PROCEDURE mppsum_int, mppsum_a_int, mppsum_real, mppsum_a_real, mppsum_realdd, mppsum_a_realdd
[13]1449   END INTERFACE
1450   INTERFACE mpp_max
[681]1451      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
[13]1452   END INTERFACE
1453   INTERFACE mpp_min
1454      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1455   END INTERFACE
[1344]1456   INTERFACE mpp_minloc
1457      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1458   END INTERFACE
1459   INTERFACE mpp_maxloc
1460      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1461   END INTERFACE
[3]1462
[13]1463   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
[4147]1464   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
[9570]1465   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
[2715]1466   !!----------------------------------------------------------------------
[13]1467CONTAINS
[3]1468
[2715]1469   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1470      INTEGER, INTENT(in) ::   kumout
1471      lib_mpp_alloc = 0
1472   END FUNCTION lib_mpp_alloc
1473
[5407]1474   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
[1579]1475      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
[3764]1476      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
[5407]1477      CHARACTER(len=*) ::   ldname
[4314]1478      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
[9570]1479      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
[5412]1480      function_value = 0
[1579]1481      IF( .FALSE. )   ldtxt(:) = 'never done'
[5407]1482      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
[13]1483   END FUNCTION mynode
[3]1484
[13]1485   SUBROUTINE mppsync                       ! Dummy routine
1486   END SUBROUTINE mppsync
[3]1487
[10314]1488   !!----------------------------------------------------------------------
1489   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
1490   !!   
1491   !!----------------------------------------------------------------------
1492   !!
1493#  define OPERATION_MAX
1494#  define INTEGER_TYPE
1495#  define DIM_0d
1496#     define ROUTINE_ALLREDUCE           mppmax_int
1497#     include "mpp_allreduce_generic.h90"
1498#     undef ROUTINE_ALLREDUCE
1499#  undef DIM_0d
1500#  define DIM_1d
1501#     define ROUTINE_ALLREDUCE           mppmax_a_int
1502#     include "mpp_allreduce_generic.h90"
1503#     undef ROUTINE_ALLREDUCE
1504#  undef DIM_1d
1505#  undef INTEGER_TYPE
1506!
1507#  define REAL_TYPE
1508#  define DIM_0d
1509#     define ROUTINE_ALLREDUCE           mppmax_real
1510#     include "mpp_allreduce_generic.h90"
1511#     undef ROUTINE_ALLREDUCE
1512#  undef DIM_0d
1513#  define DIM_1d
1514#     define ROUTINE_ALLREDUCE           mppmax_a_real
1515#     include "mpp_allreduce_generic.h90"
1516#     undef ROUTINE_ALLREDUCE
1517#  undef DIM_1d
1518#  undef REAL_TYPE
1519#  undef OPERATION_MAX
1520   !!----------------------------------------------------------------------
1521   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
1522   !!   
1523   !!----------------------------------------------------------------------
1524   !!
1525#  define OPERATION_MIN
1526#  define INTEGER_TYPE
1527#  define DIM_0d
1528#     define ROUTINE_ALLREDUCE           mppmin_int
1529#     include "mpp_allreduce_generic.h90"
1530#     undef ROUTINE_ALLREDUCE
1531#  undef DIM_0d
1532#  define DIM_1d
1533#     define ROUTINE_ALLREDUCE           mppmin_a_int
1534#     include "mpp_allreduce_generic.h90"
1535#     undef ROUTINE_ALLREDUCE
1536#  undef DIM_1d
1537#  undef INTEGER_TYPE
1538!
1539#  define REAL_TYPE
1540#  define DIM_0d
1541#     define ROUTINE_ALLREDUCE           mppmin_real
1542#     include "mpp_allreduce_generic.h90"
1543#     undef ROUTINE_ALLREDUCE
1544#  undef DIM_0d
1545#  define DIM_1d
1546#     define ROUTINE_ALLREDUCE           mppmin_a_real
1547#     include "mpp_allreduce_generic.h90"
1548#     undef ROUTINE_ALLREDUCE
1549#  undef DIM_1d
1550#  undef REAL_TYPE
1551#  undef OPERATION_MIN
[3]1552
[10314]1553   !!----------------------------------------------------------------------
1554   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
1555   !!   
1556   !!   Global sum of 1D array or a variable (integer, real or complex)
1557   !!----------------------------------------------------------------------
1558   !!
1559#  define OPERATION_SUM
1560#  define INTEGER_TYPE
1561#  define DIM_0d
1562#     define ROUTINE_ALLREDUCE           mppsum_int
1563#     include "mpp_allreduce_generic.h90"
1564#     undef ROUTINE_ALLREDUCE
1565#  undef DIM_0d
1566#  define DIM_1d
1567#     define ROUTINE_ALLREDUCE           mppsum_a_int
1568#     include "mpp_allreduce_generic.h90"
1569#     undef ROUTINE_ALLREDUCE
1570#  undef DIM_1d
1571#  undef INTEGER_TYPE
1572!
1573#  define REAL_TYPE
1574#  define DIM_0d
1575#     define ROUTINE_ALLREDUCE           mppsum_real
1576#     include "mpp_allreduce_generic.h90"
1577#     undef ROUTINE_ALLREDUCE
1578#  undef DIM_0d
1579#  define DIM_1d
1580#     define ROUTINE_ALLREDUCE           mppsum_a_real
1581#     include "mpp_allreduce_generic.h90"
1582#     undef ROUTINE_ALLREDUCE
1583#  undef DIM_1d
1584#  undef REAL_TYPE
1585#  undef OPERATION_SUM
[3]1586
[10314]1587#  define OPERATION_SUM_DD
1588#  define COMPLEX_TYPE
1589#  define DIM_0d
1590#     define ROUTINE_ALLREDUCE           mppsum_realdd
1591#     include "mpp_allreduce_generic.h90"
1592#     undef ROUTINE_ALLREDUCE
1593#  undef DIM_0d
1594#  define DIM_1d
1595#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
1596#     include "mpp_allreduce_generic.h90"
1597#     undef ROUTINE_ALLREDUCE
1598#  undef DIM_1d
1599#  undef COMPLEX_TYPE
1600#  undef OPERATION_SUM_DD
[3]1601
[10314]1602   !!----------------------------------------------------------------------
1603   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
1604   !!   
1605   !!----------------------------------------------------------------------
1606   !!
1607#  define OPERATION_MINLOC
1608#  define DIM_2d
1609#     define ROUTINE_LOC           mpp_minloc2d
1610#     include "mpp_loc_generic.h90"
1611#     undef ROUTINE_LOC
1612#  undef DIM_2d
1613#  define DIM_3d
1614#     define ROUTINE_LOC           mpp_minloc3d
1615#     include "mpp_loc_generic.h90"
1616#     undef ROUTINE_LOC
1617#  undef DIM_3d
1618#  undef OPERATION_MINLOC
[2480]1619
[10314]1620#  define OPERATION_MAXLOC
1621#  define DIM_2d
1622#     define ROUTINE_LOC           mpp_maxloc2d
1623#     include "mpp_loc_generic.h90"
1624#     undef ROUTINE_LOC
1625#  undef DIM_2d
1626#  define DIM_3d
1627#     define ROUTINE_LOC           mpp_maxloc3d
1628#     include "mpp_loc_generic.h90"
1629#     undef ROUTINE_LOC
1630#  undef DIM_3d
1631#  undef OPERATION_MAXLOC
[13]1632
[10292]1633   SUBROUTINE mpp_ilor( ld_switch, ldlast, kcom )
1634      LOGICAL, INTENT(in   ), DIMENSION(2) ::   ld_switch
1635      LOGICAL, INTENT(in   ), OPTIONAL     ::   ldlast
1636      INTEGER, INTENT(in   ), OPTIONAL     ::   kcom    ! ???
1637      WRITE(*,*) 'mpp_ilor: You should not have seen this print! error?', ld_switch
1638   END SUBROUTINE mpp_ilor
1639
[51]1640   SUBROUTINE mppstop
[3799]1641      STOP      ! non MPP case, just stop the run
[51]1642   END SUBROUTINE mppstop
1643
[2715]1644   SUBROUTINE mpp_ini_znl( knum )
1645      INTEGER :: knum
1646      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
[1345]1647   END SUBROUTINE mpp_ini_znl
1648
[1344]1649   SUBROUTINE mpp_comm_free( kcom )
[869]1650      INTEGER :: kcom
[1344]1651      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
[869]1652   END SUBROUTINE mpp_comm_free
[9019]1653   
[3]1654#endif
[2715]1655
[13]1656   !!----------------------------------------------------------------------
[4147]1657   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
[2715]1658   !!----------------------------------------------------------------------
1659
1660   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1661      &                 cd6, cd7, cd8, cd9, cd10 )
1662      !!----------------------------------------------------------------------
1663      !!                  ***  ROUTINE  stop_opa  ***
1664      !!
[3764]1665      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1666      !!                increment the error number (nstop) by one.
1667      !!----------------------------------------------------------------------
1668      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1669      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1670      !!----------------------------------------------------------------------
1671      !
[3764]1672      nstop = nstop + 1
[2715]1673      IF(lwp) THEN
1674         WRITE(numout,cform_err)
[10330]1675         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1676         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1677         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1678         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1679         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1680         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1681         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1682         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1683         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1684         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
[2715]1685      ENDIF
1686                               CALL FLUSH(numout    )
1687      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
[9019]1688      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
[2715]1689      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1690      !
1691      IF( cd1 == 'STOP' ) THEN
1692         IF(lwp) WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1693         CALL mppstop()
1694      ENDIF
1695      !
1696   END SUBROUTINE ctl_stop
1697
1698
1699   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1700      &                 cd6, cd7, cd8, cd9, cd10 )
1701      !!----------------------------------------------------------------------
1702      !!                  ***  ROUTINE  stop_warn  ***
1703      !!
[3764]1704      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1705      !!                increment the warning number (nwarn) by one.
1706      !!----------------------------------------------------------------------
1707      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1708      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1709      !!----------------------------------------------------------------------
[3764]1710      !
1711      nwarn = nwarn + 1
[2715]1712      IF(lwp) THEN
1713         WRITE(numout,cform_war)
[10330]1714         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1715         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1716         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1717         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1718         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1719         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1720         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1721         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1722         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1723         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
[2715]1724      ENDIF
1725      CALL FLUSH(numout)
1726      !
1727   END SUBROUTINE ctl_warn
1728
1729
1730   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1731      !!----------------------------------------------------------------------
1732      !!                  ***  ROUTINE ctl_opn  ***
1733      !!
1734      !! ** Purpose :   Open file and check if required file is available.
1735      !!
1736      !! ** Method  :   Fortan open
1737      !!----------------------------------------------------------------------
1738      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1739      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1740      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1741      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1742      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1743      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1744      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1745      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1746      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
[5836]1747      !
[2715]1748      CHARACTER(len=80) ::   clfile
1749      INTEGER           ::   iost
1750      !!----------------------------------------------------------------------
[5836]1751      !
[2715]1752      ! adapt filename
1753      ! ----------------
1754      clfile = TRIM(cdfile)
1755      IF( PRESENT( karea ) ) THEN
1756         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1757      ENDIF
1758#if defined key_agrif
1759      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1760      knum=Agrif_Get_Unit()
1761#else
1762      knum=get_unit()
1763#endif
[10330]1764      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
[5836]1765      !
[2715]1766      iost=0
1767      IF( cdacce(1:6) == 'DIRECT' )  THEN
1768         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh, ERR=100, IOSTAT=iost )
1769      ELSE
1770         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat             , ERR=100, IOSTAT=iost )
1771      ENDIF
[10330]1772      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &
1773         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat             , ERR=100, IOSTAT=iost )   ! for windows
[2715]1774      IF( iost == 0 ) THEN
1775         IF(ldwp) THEN
1776            WRITE(kout,*) '     file   : ', clfile,' open ok'
1777            WRITE(kout,*) '     unit   = ', knum
1778            WRITE(kout,*) '     status = ', cdstat
1779            WRITE(kout,*) '     form   = ', cdform
1780            WRITE(kout,*) '     access = ', cdacce
1781            WRITE(kout,*)
1782         ENDIF
1783      ENDIF
1784100   CONTINUE
1785      IF( iost /= 0 ) THEN
1786         IF(ldwp) THEN
1787            WRITE(kout,*)
1788            WRITE(kout,*) ' ===>>>> : bad opening file: ', clfile
1789            WRITE(kout,*) ' =======   ===  '
1790            WRITE(kout,*) '           unit   = ', knum
1791            WRITE(kout,*) '           status = ', cdstat
1792            WRITE(kout,*) '           form   = ', cdform
1793            WRITE(kout,*) '           access = ', cdacce
1794            WRITE(kout,*) '           iostat = ', iost
1795            WRITE(kout,*) '           we stop. verify the file '
1796            WRITE(kout,*)
[9438]1797         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
1798            WRITE(*,*)
1799            WRITE(*,*) ' ===>>>> : bad opening file: ', clfile
1800            WRITE(*,*) ' =======   ===  '
1801            WRITE(*,*) '           unit   = ', knum
1802            WRITE(*,*) '           status = ', cdstat
1803            WRITE(*,*) '           form   = ', cdform
1804            WRITE(*,*) '           access = ', cdacce
1805            WRITE(*,*) '           iostat = ', iost
1806            WRITE(*,*) '           we stop. verify the file '
1807            WRITE(*,*)
[2715]1808         ENDIF
[9019]1809         CALL FLUSH( kout ) 
[2715]1810         STOP 'ctl_opn bad opening'
1811      ENDIF
[5836]1812      !
[2715]1813   END SUBROUTINE ctl_opn
1814
[5836]1815
[4147]1816   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
1817      !!----------------------------------------------------------------------
1818      !!                  ***  ROUTINE ctl_nam  ***
1819      !!
1820      !! ** Purpose :   Informations when error while reading a namelist
1821      !!
1822      !! ** Method  :   Fortan open
1823      !!----------------------------------------------------------------------
[5836]1824      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
1825      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
[7646]1826      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
[5836]1827      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
[4147]1828      !!----------------------------------------------------------------------
[5836]1829      !
[7646]1830      WRITE (clios, '(I5.0)')   kios
[4147]1831      IF( kios < 0 ) THEN         
[5836]1832         CALL ctl_warn( 'end of record or file while reading namelist '   &
1833            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1834      ENDIF
[5836]1835      !
[4147]1836      IF( kios > 0 ) THEN
[5836]1837         CALL ctl_stop( 'misspelled variable in namelist '   &
1838            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1839      ENDIF
1840      kios = 0
1841      RETURN
[5836]1842      !
[4147]1843   END SUBROUTINE ctl_nam
1844
[5836]1845
[2715]1846   INTEGER FUNCTION get_unit()
1847      !!----------------------------------------------------------------------
1848      !!                  ***  FUNCTION  get_unit  ***
1849      !!
1850      !! ** Purpose :   return the index of an unused logical unit
1851      !!----------------------------------------------------------------------
[3764]1852      LOGICAL :: llopn
[2715]1853      !!----------------------------------------------------------------------
1854      !
1855      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1856      llopn = .TRUE.
1857      DO WHILE( (get_unit < 998) .AND. llopn )
1858         get_unit = get_unit + 1
1859         INQUIRE( unit = get_unit, opened = llopn )
1860      END DO
1861      IF( (get_unit == 999) .AND. llopn ) THEN
1862         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
1863         get_unit = -1
1864      ENDIF
1865      !
1866   END FUNCTION get_unit
1867
1868   !!----------------------------------------------------------------------
[3]1869END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.