New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2018/dev_r10164_HPC09_ESIWACE_PREP_MERGE/src/OCE/LBC – NEMO

source: NEMO/branches/2018/dev_r10164_HPC09_ESIWACE_PREP_MERGE/src/OCE/LBC/lib_mpp.F90 @ 10358

Last change on this file since 10358 was 10358, checked in by smasson, 5 years ago

dev_r10164_HPC09_ESIWACE_PREP_MERGE: action 5b: by default, suppress global communication in stpctl, see #2133

  • Property svn:keywords set to Id
File size: 79.5 KB
RevLine 
[3]1MODULE lib_mpp
[13]2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
[1344]4   !! Ocean numerics:  massively parallel processing library
[13]5   !!=====================================================================
[1344]6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
[9019]10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
[1344]11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
[3764]19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
[2715]20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
[9019]21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
[6140]23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
[9019]24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
[13]27   !!----------------------------------------------------------------------
[2715]28
29   !!----------------------------------------------------------------------
[6140]30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
[2715]35   !!----------------------------------------------------------------------
[3764]36#if   defined key_mpp_mpi
[13]37   !!----------------------------------------------------------------------
[1344]38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
[2715]40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
[4990]43   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
[6140]44   !!   mpprecv       :
[9019]45   !!   mppsend       :
[2715]46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
[1344]55   !!   mpp_ini_north : initialisation of north fold
[9019]56   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
[13]57   !!----------------------------------------------------------------------
[3764]58   USE dom_oce        ! ocean space and time domain
[2715]59   USE lbcnfd         ! north fold treatment
60   USE in_out_manager ! I/O manager
[3]61
[13]62   IMPLICIT NONE
[415]63   PRIVATE
[9019]64
65   INTERFACE mpp_nfd
66      MODULE PROCEDURE   mpp_nfd_2d      , mpp_nfd_3d      , mpp_nfd_4d
67      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
68   END INTERFACE
69
70   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
71   PUBLIC   mpp_lnk_2d      , mpp_lnk_3d      , mpp_lnk_4d
72   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
73   !
74!!gm  this should be useless
75   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
76   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
77!!gm end
78   !
[4147]79   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
[1344]80   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
[9019]81   PUBLIC   mpp_ini_north
82   PUBLIC   mpp_lnk_2d_icb
83   PUBLIC   mpp_lbc_north_icb
[1344]84   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
[10292]85   PUBLIC   mpp_ilor
[3294]86   PUBLIC   mppscatter, mppgather
[10180]87   PUBLIC   mpp_ini_znl
[3764]88   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
[9890]89   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d, mpp_lnk_bdy_4d
[5429]90   
[13]91   !! * Interfaces
92   !! define generic interface for these routine as they are called sometimes
[1344]93   !! with scalar arguments instead of array arguments, which causes problems
94   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
[13]95   INTERFACE mpp_min
96      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
97   END INTERFACE
98   INTERFACE mpp_max
[681]99      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
[13]100   END INTERFACE
101   INTERFACE mpp_sum
[6140]102      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
[9019]103         &             mppsum_realdd, mppsum_a_realdd
[13]104   END INTERFACE
[1344]105   INTERFACE mpp_minloc
106      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
107   END INTERFACE
108   INTERFACE mpp_maxloc
109      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
110   END INTERFACE
[6490]111
[51]112   !! ========================= !!
113   !!  MPI  variable definition !!
114   !! ========================= !!
[1629]115!$AGRIF_DO_NOT_TREAT
[2004]116   INCLUDE 'mpif.h'
[1629]117!$AGRIF_END_DO_NOT_TREAT
[3764]118
[1344]119   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
[3]120
[1344]121   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
[3764]122
[10330]123   INTEGER, PUBLIC ::   mppsize        ! number of process
124   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
[2363]125!$AGRIF_DO_NOT_TREAT
[9570]126   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
[2363]127!$AGRIF_END_DO_NOT_TREAT
[3]128
[2480]129   INTEGER :: MPI_SUMDD
[1976]130
[1345]131   ! variables used for zonal integration
132   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
[9019]133   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
134   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
135   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
[2715]136   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
[3]137
[3764]138   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
[9019]139   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
140   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
141   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
142   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
143   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
144   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
145   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
146   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
[3764]147
[1344]148   ! Type of send : standard, buffered, immediate
[9019]149   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
150   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
151   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
[3764]152
[10170]153   ! Communications summary report
[10297]154   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
155   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm  calling routines
[10170]156   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
[10297]157   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
158   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
[10314]159   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
[10170]160   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
[10314]161   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 2000          !: max number of communication record
[10297]162   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
163   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
164   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
[10357]165   LOGICAL, PUBLIC                               ::   l_full_nf_update = .FALSE.   !: logical for a full (2lines) update of bc at North fold report
[10297]166
[10172]167   ! timing summary report
[10300]168   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
169   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
[10172]170   
[9019]171   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
[3]172
[9019]173   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
174   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
175
[51]176   !!----------------------------------------------------------------------
[9598]177   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
[888]178   !! $Id$
[10068]179   !! Software governed by the CeCILL license (see ./LICENSE)
[1344]180   !!----------------------------------------------------------------------
[3]181CONTAINS
182
[9019]183   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
[2715]184      !!----------------------------------------------------------------------
[51]185      !!                  ***  routine mynode  ***
[3764]186      !!
[51]187      !! ** Purpose :   Find processor unit
188      !!----------------------------------------------------------------------
[6140]189      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
190      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
191      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
192      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
193      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
194      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
195      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
[2715]196      !
[4147]197      INTEGER ::   mynode, ierr, code, ji, ii, ios
[532]198      LOGICAL ::   mpi_was_called
[2715]199      !
[3294]200      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, jpnij, ln_nnogather
[51]201      !!----------------------------------------------------------------------
[1344]202      !
[2481]203      ii = 1
[6140]204      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
205      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
206      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
[1344]207      !
[4147]208      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
209      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
[9168]210901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
[9019]211      !
[4147]212      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
213      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
[9168]214902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
[9019]215      !
[1344]216      !                              ! control print
[6140]217      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
218      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
219      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
[9019]220      !
221      IF( jpni < 1 .OR. jpnj < 1  ) THEN
[6140]222         WRITE(ldtxt(ii),*) '      jpni, jpnj and jpnij will be calculated automatically' ;   ii = ii + 1
[2715]223      ELSE
[6140]224         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
225         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
[9019]226      ENDIF
[2715]227
[3294]228      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
229
[2480]230      CALL mpi_initialized ( mpi_was_called, code )
231      IF( code /= MPI_SUCCESS ) THEN
[3764]232         DO ji = 1, SIZE(ldtxt)
[2481]233            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
[3764]234         END DO
[2480]235         WRITE(*, cform_err)
236         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
237         CALL mpi_abort( mpi_comm_world, code, ierr )
238      ENDIF
[415]239
[2480]240      IF( mpi_was_called ) THEN
241         !
242         SELECT CASE ( cn_mpi_send )
243         CASE ( 'S' )                ! Standard mpi send (blocking)
[6140]244            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
[2480]245         CASE ( 'B' )                ! Buffer mpi send (blocking)
[6140]246            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
[9570]247            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
[2480]248         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
[6140]249            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
[2480]250            l_isend = .TRUE.
251         CASE DEFAULT
[6140]252            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
253            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
[2715]254            kstop = kstop + 1
[2480]255         END SELECT
[9019]256         !
257      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
[6140]258         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
259         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
[2715]260         kstop = kstop + 1
[532]261      ELSE
[1601]262         SELECT CASE ( cn_mpi_send )
[524]263         CASE ( 'S' )                ! Standard mpi send (blocking)
[6140]264            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
[2480]265            CALL mpi_init( ierr )
[524]266         CASE ( 'B' )                ! Buffer mpi send (blocking)
[6140]267            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
[9570]268            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
[524]269         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
[6140]270            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
[524]271            l_isend = .TRUE.
[2480]272            CALL mpi_init( ierr )
[524]273         CASE DEFAULT
[6140]274            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
275            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
[2715]276            kstop = kstop + 1
[524]277         END SELECT
[2480]278         !
[415]279      ENDIF
[570]280
[3764]281      IF( PRESENT(localComm) ) THEN
[2480]282         IF( Agrif_Root() ) THEN
[9570]283            mpi_comm_oce = localComm
[2480]284         ENDIF
285      ELSE
[9570]286         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
[2480]287         IF( code /= MPI_SUCCESS ) THEN
[3764]288            DO ji = 1, SIZE(ldtxt)
[2481]289               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
290            END DO
[2480]291            WRITE(*, cform_err)
292            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
293            CALL mpi_abort( mpi_comm_world, code, ierr )
294         ENDIF
[3764]295      ENDIF
[2480]296
[5656]297#if defined key_agrif
[9019]298      IF( Agrif_Root() ) THEN
[9570]299         CALL Agrif_MPI_Init(mpi_comm_oce)
[5656]300      ELSE
[9570]301         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
[5656]302      ENDIF
303#endif
304
[9570]305      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
306      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
[629]307      mynode = mpprank
[4624]308
309      IF( mynode == 0 ) THEN
[5407]310         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
311         WRITE(kumond, nammpp)     
[4624]312      ENDIF
[3764]313      !
[1976]314      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
315      !
[51]316   END FUNCTION mynode
[3]317
[9019]318   !!----------------------------------------------------------------------
319   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
320   !!
321   !!   * Argument : dummy argument use in mpp_lnk_... routines
322   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
323   !!                cd_nat :   nature of array grid-points
324   !!                psgn   :   sign used across the north fold boundary
325   !!                kfld   :   optional, number of pt3d arrays
326   !!                cd_mpp :   optional, fill the overlap area only
327   !!                pval   :   optional, background value (used at closed boundaries)
328   !!----------------------------------------------------------------------
329   !
330   !                       !==  2D array and array of 2D pointer  ==!
331   !
332#  define DIM_2d
333#     define ROUTINE_LNK           mpp_lnk_2d
334#     include "mpp_lnk_generic.h90"
335#     undef ROUTINE_LNK
336#     define MULTI
337#     define ROUTINE_LNK           mpp_lnk_2d_ptr
338#     include "mpp_lnk_generic.h90"
339#     undef ROUTINE_LNK
340#     undef MULTI
341#  undef DIM_2d
342   !
343   !                       !==  3D array and array of 3D pointer  ==!
344   !
345#  define DIM_3d
346#     define ROUTINE_LNK           mpp_lnk_3d
347#     include "mpp_lnk_generic.h90"
348#     undef ROUTINE_LNK
349#     define MULTI
350#     define ROUTINE_LNK           mpp_lnk_3d_ptr
351#     include "mpp_lnk_generic.h90"
352#     undef ROUTINE_LNK
353#     undef MULTI
354#  undef DIM_3d
355   !
356   !                       !==  4D array and array of 4D pointer  ==!
357   !
358#  define DIM_4d
359#     define ROUTINE_LNK           mpp_lnk_4d
360#     include "mpp_lnk_generic.h90"
361#     undef ROUTINE_LNK
362#     define MULTI
363#     define ROUTINE_LNK           mpp_lnk_4d_ptr
364#     include "mpp_lnk_generic.h90"
365#     undef ROUTINE_LNK
366#     undef MULTI
367#  undef DIM_4d
[6140]368
[9019]369   !!----------------------------------------------------------------------
370   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
371   !!
372   !!   * Argument : dummy argument use in mpp_nfd_... routines
373   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
374   !!                cd_nat :   nature of array grid-points
375   !!                psgn   :   sign used across the north fold boundary
376   !!                kfld   :   optional, number of pt3d arrays
377   !!                cd_mpp :   optional, fill the overlap area only
378   !!                pval   :   optional, background value (used at closed boundaries)
379   !!----------------------------------------------------------------------
380   !
381   !                       !==  2D array and array of 2D pointer  ==!
382   !
383#  define DIM_2d
384#     define ROUTINE_NFD           mpp_nfd_2d
385#     include "mpp_nfd_generic.h90"
386#     undef ROUTINE_NFD
387#     define MULTI
388#     define ROUTINE_NFD           mpp_nfd_2d_ptr
389#     include "mpp_nfd_generic.h90"
390#     undef ROUTINE_NFD
391#     undef MULTI
392#  undef DIM_2d
393   !
394   !                       !==  3D array and array of 3D pointer  ==!
395   !
396#  define DIM_3d
397#     define ROUTINE_NFD           mpp_nfd_3d
398#     include "mpp_nfd_generic.h90"
399#     undef ROUTINE_NFD
400#     define MULTI
401#     define ROUTINE_NFD           mpp_nfd_3d_ptr
402#     include "mpp_nfd_generic.h90"
403#     undef ROUTINE_NFD
404#     undef MULTI
405#  undef DIM_3d
406   !
407   !                       !==  4D array and array of 4D pointer  ==!
408   !
409#  define DIM_4d
410#     define ROUTINE_NFD           mpp_nfd_4d
411#     include "mpp_nfd_generic.h90"
412#     undef ROUTINE_NFD
413#     define MULTI
414#     define ROUTINE_NFD           mpp_nfd_4d_ptr
415#     include "mpp_nfd_generic.h90"
416#     undef ROUTINE_NFD
417#     undef MULTI
418#  undef DIM_4d
[3]419
[1344]420
[9019]421   !!----------------------------------------------------------------------
422   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
423   !!
424   !!   * Argument : dummy argument use in mpp_lnk_... routines
425   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
426   !!                cd_nat :   nature of array grid-points
427   !!                psgn   :   sign used across the north fold boundary
428   !!                kb_bdy :   BDY boundary set
429   !!                kfld   :   optional, number of pt3d arrays
430   !!----------------------------------------------------------------------
431   !
432   !                       !==  2D array and array of 2D pointer  ==!
433   !
434#  define DIM_2d
435#     define ROUTINE_BDY           mpp_lnk_bdy_2d
436#     include "mpp_bdy_generic.h90"
437#     undef ROUTINE_BDY
438#  undef DIM_2d
439   !
440   !                       !==  3D array and array of 3D pointer  ==!
441   !
442#  define DIM_3d
443#     define ROUTINE_BDY           mpp_lnk_bdy_3d
444#     include "mpp_bdy_generic.h90"
445#     undef ROUTINE_BDY
446#  undef DIM_3d
447   !
448   !                       !==  4D array and array of 4D pointer  ==!
449   !
[9890]450#  define DIM_4d
451#     define ROUTINE_BDY           mpp_lnk_bdy_4d
452#     include "mpp_bdy_generic.h90"
453#     undef ROUTINE_BDY
454#  undef DIM_4d
[3]455
[9019]456   !!----------------------------------------------------------------------
457   !!
458   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
[5429]459   
460   
[9019]461   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
[5429]462   
[9019]463   
464   !!----------------------------------------------------------------------
[5429]465
466
[888]467
[1344]468   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
[51]469      !!----------------------------------------------------------------------
470      !!                  ***  routine mppsend  ***
[3764]471      !!
[51]472      !! ** Purpose :   Send messag passing array
473      !!
474      !!----------------------------------------------------------------------
[1344]475      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
476      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
477      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
478      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
479      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
480      !!
481      INTEGER ::   iflag
[51]482      !!----------------------------------------------------------------------
[1344]483      !
[1601]484      SELECT CASE ( cn_mpi_send )
[300]485      CASE ( 'S' )                ! Standard mpi send (blocking)
[9570]486         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
[300]487      CASE ( 'B' )                ! Buffer mpi send (blocking)
[9570]488         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
[300]489      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
[1344]490         ! be carefull, one more argument here : the mpi request identifier..
[9570]491         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
[300]492      END SELECT
[1344]493      !
[51]494   END SUBROUTINE mppsend
[3]495
496
[3294]497   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
[51]498      !!----------------------------------------------------------------------
499      !!                  ***  routine mpprecv  ***
500      !!
501      !! ** Purpose :   Receive messag passing array
502      !!
503      !!----------------------------------------------------------------------
[1344]504      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
505      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
506      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
[3764]507      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
[1344]508      !!
[51]509      INTEGER :: istatus(mpi_status_size)
510      INTEGER :: iflag
[3294]511      INTEGER :: use_source
[1344]512      !!----------------------------------------------------------------------
513      !
[3764]514      ! If a specific process number has been passed to the receive call,
[3294]515      ! use that one. Default is to use mpi_any_source
[6140]516      use_source = mpi_any_source
517      IF( PRESENT(ksource) )   use_source = ksource
518      !
[9570]519      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
[1344]520      !
[51]521   END SUBROUTINE mpprecv
[3]522
523
[51]524   SUBROUTINE mppgather( ptab, kp, pio )
525      !!----------------------------------------------------------------------
526      !!                   ***  routine mppgather  ***
[3764]527      !!
528      !! ** Purpose :   Transfert between a local subdomain array and a work
[51]529      !!     array which is distributed following the vertical level.
530      !!
[1344]531      !!----------------------------------------------------------------------
[6140]532      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
533      INTEGER                           , INTENT(in   ) ::   kp     ! record length
[1344]534      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
[51]535      !!
[1344]536      INTEGER :: itaille, ierror   ! temporary integer
[51]537      !!---------------------------------------------------------------------
[1344]538      !
539      itaille = jpi * jpj
540      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
[9570]541         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
[1344]542      !
[51]543   END SUBROUTINE mppgather
[3]544
545
[51]546   SUBROUTINE mppscatter( pio, kp, ptab )
547      !!----------------------------------------------------------------------
548      !!                  ***  routine mppscatter  ***
549      !!
[3764]550      !! ** Purpose :   Transfert between awork array which is distributed
[51]551      !!      following the vertical level and the local subdomain array.
552      !!
553      !!----------------------------------------------------------------------
[6140]554      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
555      INTEGER                             ::   kp     ! Tag (not used with MPI
556      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
[1344]557      !!
558      INTEGER :: itaille, ierror   ! temporary integer
[51]559      !!---------------------------------------------------------------------
[1344]560      !
[6140]561      itaille = jpi * jpj
[1344]562      !
563      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
[9570]564         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
[1344]565      !
[51]566   END SUBROUTINE mppscatter
[3]567
[9019]568   !!
[10292]569   SUBROUTINE mpp_ilor( ld_switch, ldlast, kcom )
570      ! WARNING: must be used only once (by ice_dyn_adv_umx) because ll_switch and ireq are SAVE
571      !!----------------------------------------------------------------------
572      LOGICAL, INTENT(inout), DIMENSION(2) ::   ld_switch
573      LOGICAL, INTENT(in   ), OPTIONAL     ::   ldlast
574      INTEGER, INTENT(in   ), OPTIONAL     ::   kcom 
575      INTEGER  ::   ierror, ilocalcomm
[10358]576      LOGICAL, SAVE ::   ll_switch , lllast
[10292]577      INTEGER, SAVE ::   ireq = -1
578      !!----------------------------------------------------------------------
579      ilocalcomm = mpi_comm_oce
[10358]580      IF( PRESENT(  kcom) )   ilocalcomm = kcom
581      lllast = .FALSE.
582      IF( PRESENT(ldlast) )   lllast = ldlast
[10292]583     
584      IF ( ireq /= -1 ) THEN   ! get ld_switch(2) from ll_switch (from previous call)
[10300]585         IF( ln_timing ) CALL tic_tac(.TRUE., ld_global = .TRUE.)
[10292]586         CALL mpi_wait(ireq, MPI_STATUS_IGNORE, ierror )
587         ld_switch(2) = ll_switch
[10300]588         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
[10292]589      ENDIF
[10358]590      IF( .NOT. lllast ) &     ! send ll_switch to be received on next call
[10292]591         CALL mpi_iallreduce( ld_switch(1), ll_switch, 1, MPI_LOGICAL, mpi_lor, ilocalcomm, ireq, ierror )
[681]592
[10292]593   END SUBROUTINE mpp_ilor
[10297]594   
[9019]595   !!----------------------------------------------------------------------
[10297]596   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
597   !!   
598   !!----------------------------------------------------------------------
599   !!
600#  define OPERATION_MAX
601#  define INTEGER_TYPE
602#  define DIM_0d
603#     define ROUTINE_ALLREDUCE           mppmax_int
604#     include "mpp_allreduce_generic.h90"
605#     undef ROUTINE_ALLREDUCE
606#  undef DIM_0d
607#  define DIM_1d
608#     define ROUTINE_ALLREDUCE           mppmax_a_int
609#     include "mpp_allreduce_generic.h90"
610#     undef ROUTINE_ALLREDUCE
611#  undef DIM_1d
612#  undef INTEGER_TYPE
613!
614#  define REAL_TYPE
615#  define DIM_0d
616#     define ROUTINE_ALLREDUCE           mppmax_real
617#     include "mpp_allreduce_generic.h90"
618#     undef ROUTINE_ALLREDUCE
619#  undef DIM_0d
620#  define DIM_1d
621#     define ROUTINE_ALLREDUCE           mppmax_a_real
622#     include "mpp_allreduce_generic.h90"
623#     undef ROUTINE_ALLREDUCE
624#  undef DIM_1d
625#  undef REAL_TYPE
626#  undef OPERATION_MAX
627   !!----------------------------------------------------------------------
[9019]628   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
629   !!   
630   !!----------------------------------------------------------------------
631   !!
[10297]632#  define OPERATION_MIN
633#  define INTEGER_TYPE
634#  define DIM_0d
635#     define ROUTINE_ALLREDUCE           mppmin_int
636#     include "mpp_allreduce_generic.h90"
637#     undef ROUTINE_ALLREDUCE
638#  undef DIM_0d
639#  define DIM_1d
640#     define ROUTINE_ALLREDUCE           mppmin_a_int
641#     include "mpp_allreduce_generic.h90"
642#     undef ROUTINE_ALLREDUCE
643#  undef DIM_1d
644#  undef INTEGER_TYPE
645!
646#  define REAL_TYPE
647#  define DIM_0d
648#     define ROUTINE_ALLREDUCE           mppmin_real
649#     include "mpp_allreduce_generic.h90"
650#     undef ROUTINE_ALLREDUCE
651#  undef DIM_0d
652#  define DIM_1d
653#     define ROUTINE_ALLREDUCE           mppmin_a_real
654#     include "mpp_allreduce_generic.h90"
655#     undef ROUTINE_ALLREDUCE
656#  undef DIM_1d
657#  undef REAL_TYPE
658#  undef OPERATION_MIN
[3]659
[9019]660   !!----------------------------------------------------------------------
661   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
662   !!   
663   !!   Global sum of 1D array or a variable (integer, real or complex)
664   !!----------------------------------------------------------------------
665   !!
[10297]666#  define OPERATION_SUM
667#  define INTEGER_TYPE
668#  define DIM_0d
669#     define ROUTINE_ALLREDUCE           mppsum_int
670#     include "mpp_allreduce_generic.h90"
671#     undef ROUTINE_ALLREDUCE
672#  undef DIM_0d
673#  define DIM_1d
674#     define ROUTINE_ALLREDUCE           mppsum_a_int
675#     include "mpp_allreduce_generic.h90"
676#     undef ROUTINE_ALLREDUCE
677#  undef DIM_1d
678#  undef INTEGER_TYPE
679!
680#  define REAL_TYPE
681#  define DIM_0d
682#     define ROUTINE_ALLREDUCE           mppsum_real
683#     include "mpp_allreduce_generic.h90"
684#     undef ROUTINE_ALLREDUCE
685#  undef DIM_0d
686#  define DIM_1d
687#     define ROUTINE_ALLREDUCE           mppsum_a_real
688#     include "mpp_allreduce_generic.h90"
689#     undef ROUTINE_ALLREDUCE
690#  undef DIM_1d
691#  undef REAL_TYPE
692#  undef OPERATION_SUM
[3764]693
[10297]694#  define OPERATION_SUM_DD
695#  define COMPLEX_TYPE
696#  define DIM_0d
697#     define ROUTINE_ALLREDUCE           mppsum_realdd
698#     include "mpp_allreduce_generic.h90"
699#     undef ROUTINE_ALLREDUCE
700#  undef DIM_0d
701#  define DIM_1d
702#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
703#     include "mpp_allreduce_generic.h90"
704#     undef ROUTINE_ALLREDUCE
705#  undef DIM_1d
706#  undef COMPLEX_TYPE
707#  undef OPERATION_SUM_DD
708
[10314]709   !!----------------------------------------------------------------------
710   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
711   !!   
712   !!----------------------------------------------------------------------
713   !!
714#  define OPERATION_MINLOC
715#  define DIM_2d
716#     define ROUTINE_LOC           mpp_minloc2d
717#     include "mpp_loc_generic.h90"
718#     undef ROUTINE_LOC
719#  undef DIM_2d
720#  define DIM_3d
721#     define ROUTINE_LOC           mpp_minloc3d
722#     include "mpp_loc_generic.h90"
723#     undef ROUTINE_LOC
724#  undef DIM_3d
725#  undef OPERATION_MINLOC
[6140]726
[10314]727#  define OPERATION_MAXLOC
728#  define DIM_2d
729#     define ROUTINE_LOC           mpp_maxloc2d
730#     include "mpp_loc_generic.h90"
731#     undef ROUTINE_LOC
732#  undef DIM_2d
733#  define DIM_3d
734#     define ROUTINE_LOC           mpp_maxloc3d
735#     include "mpp_loc_generic.h90"
736#     undef ROUTINE_LOC
737#  undef DIM_3d
738#  undef OPERATION_MAXLOC
[9019]739
[1344]740   SUBROUTINE mppsync()
741      !!----------------------------------------------------------------------
742      !!                  ***  routine mppsync  ***
[3764]743      !!
[1344]744      !! ** Purpose :   Massively parallel processors, synchroneous
745      !!
746      !!-----------------------------------------------------------------------
747      INTEGER :: ierror
748      !!-----------------------------------------------------------------------
749      !
[9570]750      CALL mpi_barrier( mpi_comm_oce, ierror )
[1344]751      !
752   END SUBROUTINE mppsync
[3]753
754
[10358]755   SUBROUTINE mppstop( ldfinal, ld_force_abort ) 
[1344]756      !!----------------------------------------------------------------------
757      !!                  ***  routine mppstop  ***
[3764]758      !!
[3294]759      !! ** purpose :   Stop massively parallel processors method
[1344]760      !!
761      !!----------------------------------------------------------------------
[10330]762      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
[10358]763      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
764      LOGICAL ::   llfinal, ll_force_abort
[1344]765      INTEGER ::   info
766      !!----------------------------------------------------------------------
[10330]767      llfinal = .FALSE.
768      IF( PRESENT(ldfinal) ) llfinal = ldfinal
[10358]769      ll_force_abort = .FALSE.
770      IF( PRESENT(ld_force_abort) ) ll_force_abort = ld_force_abort
771      !
772      IF(ll_force_abort) THEN
773         CALL mpi_abort( MPI_COMM_WORLD )
774      ELSE
775         CALL mppsync
776         CALL mpi_finalize( info )
777      ENDIF
[10330]778      IF( .NOT. llfinal ) STOP 123456
[1344]779      !
780   END SUBROUTINE mppstop
[3]781
782
[1344]783   SUBROUTINE mpp_comm_free( kcom )
784      !!----------------------------------------------------------------------
785      INTEGER, INTENT(in) ::   kcom
786      !!
787      INTEGER :: ierr
788      !!----------------------------------------------------------------------
789      !
790      CALL MPI_COMM_FREE(kcom, ierr)
791      !
792   END SUBROUTINE mpp_comm_free
[3]793
[869]794
[2715]795   SUBROUTINE mpp_ini_znl( kumout )
[1345]796      !!----------------------------------------------------------------------
797      !!               ***  routine mpp_ini_znl  ***
798      !!
799      !! ** Purpose :   Initialize special communicator for computing zonal sum
800      !!
801      !! ** Method  : - Look for processors in the same row
802      !!              - Put their number in nrank_znl
803      !!              - Create group for the znl processors
804      !!              - Create a communicator for znl processors
805      !!              - Determine if processor should write znl files
806      !!
807      !! ** output
808      !!      ndim_rank_znl = number of processors on the same row
809      !!      ngrp_znl = group ID for the znl processors
810      !!      ncomm_znl = communicator for the ice procs.
811      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
812      !!
813      !!----------------------------------------------------------------------
[2715]814      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
[1345]815      !
[2715]816      INTEGER :: jproc      ! dummy loop integer
817      INTEGER :: ierr, ii   ! local integer
818      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
819      !!----------------------------------------------------------------------
[1345]820      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
821      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
[9570]822      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
[1345]823      !
[2715]824      ALLOCATE( kwork(jpnij), STAT=ierr )
825      IF( ierr /= 0 ) THEN
826         WRITE(kumout, cform_err)
827         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
828         CALL mppstop
829      ENDIF
830
831      IF( jpnj == 1 ) THEN
[1345]832         ngrp_znl  = ngrp_world
[9570]833         ncomm_znl = mpi_comm_oce
[1345]834      ELSE
835         !
[9570]836         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
[1345]837         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
838         !-$$        CALL flush(numout)
839         !
840         ! Count number of processors on the same row
841         ndim_rank_znl = 0
842         DO jproc=1,jpnij
843            IF ( kwork(jproc) == njmpp ) THEN
844               ndim_rank_znl = ndim_rank_znl + 1
845            ENDIF
846         END DO
847         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
848         !-$$        CALL flush(numout)
849         ! Allocate the right size to nrank_znl
[1441]850         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
[1345]851         ALLOCATE(nrank_znl(ndim_rank_znl))
[3764]852         ii = 0
[1345]853         nrank_znl (:) = 0
854         DO jproc=1,jpnij
855            IF ( kwork(jproc) == njmpp) THEN
856               ii = ii + 1
[3764]857               nrank_znl(ii) = jproc -1
[1345]858            ENDIF
859         END DO
860         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
861         !-$$        CALL flush(numout)
862
863         ! Create the opa group
[9570]864         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
[1345]865         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
866         !-$$        CALL flush(numout)
867
868         ! Create the znl group from the opa group
869         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
870         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
871         !-$$        CALL flush(numout)
872
873         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
[9570]874         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
[1345]875         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
876         !-$$        CALL flush(numout)
877         !
878      END IF
879
880      ! Determines if processor if the first (starting from i=1) on the row
[3764]881      IF ( jpni == 1 ) THEN
[1345]882         l_znl_root = .TRUE.
883      ELSE
884         l_znl_root = .FALSE.
885         kwork (1) = nimpp
[10297]886         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
[1345]887         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
888      END IF
889
[2715]890      DEALLOCATE(kwork)
891
[1345]892   END SUBROUTINE mpp_ini_znl
893
894
[1344]895   SUBROUTINE mpp_ini_north
896      !!----------------------------------------------------------------------
897      !!               ***  routine mpp_ini_north  ***
898      !!
[3764]899      !! ** Purpose :   Initialize special communicator for north folding
[1344]900      !!      condition together with global variables needed in the mpp folding
901      !!
902      !! ** Method  : - Look for northern processors
903      !!              - Put their number in nrank_north
904      !!              - Create groups for the world processors and the north processors
905      !!              - Create a communicator for northern processors
906      !!
907      !! ** output
908      !!      njmppmax = njmpp for northern procs
909      !!      ndim_rank_north = number of processors in the northern line
910      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
911      !!      ngrp_world = group ID for the world processors
912      !!      ngrp_north = group ID for the northern processors
913      !!      ncomm_north = communicator for the northern procs.
914      !!      north_root = number (in the world) of proc 0 in the northern comm.
915      !!
916      !!----------------------------------------------------------------------
917      INTEGER ::   ierr
918      INTEGER ::   jjproc
919      INTEGER ::   ii, ji
920      !!----------------------------------------------------------------------
921      !
922      njmppmax = MAXVAL( njmppt )
923      !
924      ! Look for how many procs on the northern boundary
925      ndim_rank_north = 0
926      DO jjproc = 1, jpnij
927         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
928      END DO
929      !
930      ! Allocate the right size to nrank_north
[1441]931      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
[1344]932      ALLOCATE( nrank_north(ndim_rank_north) )
[869]933
[1344]934      ! Fill the nrank_north array with proc. number of northern procs.
935      ! Note : the rank start at 0 in MPI
936      ii = 0
937      DO ji = 1, jpnij
938         IF ( njmppt(ji) == njmppmax   ) THEN
939            ii=ii+1
940            nrank_north(ii)=ji-1
941         END IF
942      END DO
943      !
944      ! create the world group
[9570]945      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
[1344]946      !
947      ! Create the North group from the world group
948      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
949      !
950      ! Create the North communicator , ie the pool of procs in the north group
[9570]951      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
[1344]952      !
953   END SUBROUTINE mpp_ini_north
[869]954
955
[9570]956   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
[1344]957      !!---------------------------------------------------------------------
958      !!                   ***  routine mpp_init.opa  ***
959      !!
960      !! ** Purpose :: export and attach a MPI buffer for bsend
961      !!
962      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
963      !!            but classical mpi_init
[3764]964      !!
965      !! History :: 01/11 :: IDRIS initial version for IBM only
[1344]966      !!            08/04 :: R. Benshila, generalisation
967      !!---------------------------------------------------------------------
[3764]968      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
[2481]969      INTEGER                      , INTENT(inout) ::   ksft
970      INTEGER                      , INTENT(  out) ::   code
971      INTEGER                                      ::   ierr, ji
972      LOGICAL                                      ::   mpi_was_called
[1344]973      !!---------------------------------------------------------------------
974      !
975      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
[532]976      IF ( code /= MPI_SUCCESS ) THEN
[3764]977         DO ji = 1, SIZE(ldtxt)
[2481]978            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
[3764]979         END DO
[2481]980         WRITE(*, cform_err)
981         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
[1344]982         CALL mpi_abort( mpi_comm_world, code, ierr )
[532]983      ENDIF
[1344]984      !
985      IF( .NOT. mpi_was_called ) THEN
986         CALL mpi_init( code )
[9570]987         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
[532]988         IF ( code /= MPI_SUCCESS ) THEN
[3764]989            DO ji = 1, SIZE(ldtxt)
[2481]990               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
991            END DO
992            WRITE(*, cform_err)
993            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
[532]994            CALL mpi_abort( mpi_comm_world, code, ierr )
995         ENDIF
996      ENDIF
[1344]997      !
[897]998      IF( nn_buffer > 0 ) THEN
[2481]999         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
[897]1000         ! Buffer allocation and attachment
[2481]1001         ALLOCATE( tampon(nn_buffer), stat = ierr )
[3764]1002         IF( ierr /= 0 ) THEN
1003            DO ji = 1, SIZE(ldtxt)
[2481]1004               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
1005            END DO
1006            WRITE(*, cform_err)
1007            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
1008            CALL mpi_abort( mpi_comm_world, code, ierr )
1009         END IF
1010         CALL mpi_buffer_attach( tampon, nn_buffer, code )
[897]1011      ENDIF
[1344]1012      !
[9570]1013   END SUBROUTINE mpi_init_oce
[3]1014
[9019]1015
1016   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
[1976]1017      !!---------------------------------------------------------------------
1018      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1019      !!
1020      !!   Modification of original codes written by David H. Bailey
1021      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1022      !!---------------------------------------------------------------------
[9019]1023      INTEGER                     , INTENT(in)    ::   ilen, itype
1024      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1025      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
[1976]1026      !
1027      REAL(wp) :: zerr, zt1, zt2    ! local work variables
[9019]1028      INTEGER  :: ji, ztmp           ! local scalar
1029      !!---------------------------------------------------------------------
1030      !
[1976]1031      ztmp = itype   ! avoid compilation warning
[9019]1032      !
[1976]1033      DO ji=1,ilen
1034      ! Compute ydda + yddb using Knuth's trick.
1035         zt1  = real(ydda(ji)) + real(yddb(ji))
1036         zerr = zt1 - real(ydda(ji))
1037         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1038                + aimag(ydda(ji)) + aimag(yddb(ji))
1039
1040         ! The result is zt1 + zt2, after normalization.
1041         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1042      END DO
[9019]1043      !
[1976]1044   END SUBROUTINE DDPDD_MPI
1045
[6140]1046
[9019]1047   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
[4990]1048      !!---------------------------------------------------------------------
1049      !!                   ***  routine mpp_lbc_north_icb  ***
1050      !!
1051      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1052      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1053      !!              array with outer extra halo
1054      !!
1055      !! ** Method  :   North fold condition and mpp with more than one proc
1056      !!              in i-direction require a specific treatment. We gather
[9019]1057      !!              the 4+kextj northern lines of the global domain on 1
[4990]1058      !!              processor and apply lbc north-fold on this sub array.
1059      !!              Then we scatter the north fold array back to the processors.
[9019]1060      !!              This routine accounts for an extra halo with icebergs
1061      !!              and assumes ghost rows and columns have been suppressed.
[4990]1062      !!
1063      !!----------------------------------------------------------------------
1064      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1065      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1066      !                                                     !   = T ,  U , V , F or W -points
1067      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1068      !!                                                    ! north fold, =  1. otherwise
[9019]1069      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
[6140]1070      !
[4990]1071      INTEGER ::   ji, jj, jr
1072      INTEGER ::   ierr, itaille, ildi, ilei, iilb
[9019]1073      INTEGER ::   ipj, ij, iproc
[4990]1074      !
1075      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1076      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1077      !!----------------------------------------------------------------------
1078      !
[9019]1079      ipj=4
[9467]1080      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1081     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1082     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
[4990]1083      !
[9019]1084      ztab_e(:,:)      = 0._wp
1085      znorthloc_e(:,:) = 0._wp
[6140]1086      !
[9467]1087      ij = 1 - kextj
1088      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1089      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1090         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
[4990]1091         ij = ij + 1
1092      END DO
1093      !
[9467]1094      itaille = jpimax * ( ipj + 2*kextj )
[10314]1095      !
1096      IF( ln_timing ) CALL tic_tac(.TRUE.)
[9467]1097      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1098         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1099         &                ncomm_north, ierr )
[4990]1100      !
[10314]1101      IF( ln_timing ) CALL tic_tac(.FALSE.)
1102      !
[4990]1103      DO jr = 1, ndim_rank_north            ! recover the global north array
1104         iproc = nrank_north(jr) + 1
1105         ildi = nldit (iproc)
1106         ilei = nleit (iproc)
1107         iilb = nimppt(iproc)
[9467]1108         DO jj = 1-kextj, ipj+kextj
[4990]1109            DO ji = ildi, ilei
1110               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1111            END DO
1112         END DO
1113      END DO
1114
1115      ! 2. North-Fold boundary conditions
1116      ! ----------------------------------
[9467]1117      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
[4990]1118
[9467]1119      ij = 1 - kextj
[4990]1120      !! Scatter back to pt2d
[9467]1121      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
[9019]1122         DO ji= 1, jpi
[4990]1123            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1124         END DO
[9467]1125         ij  = ij +1
[4990]1126      END DO
1127      !
1128      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1129      !
1130   END SUBROUTINE mpp_lbc_north_icb
1131
[6140]1132
[10314]1133   SUBROUTINE mpp_lnk_2d_icb( cdname, pt2d, cd_type, psgn, kexti, kextj )
[4990]1134      !!----------------------------------------------------------------------
1135      !!                  ***  routine mpp_lnk_2d_icb  ***
1136      !!
[9019]1137      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1138      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1139      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
[4990]1140      !!
1141      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1142      !!      between processors following neighboring subdomains.
1143      !!            domain parameters
[9019]1144      !!                    jpi    : first dimension of the local subdomain
1145      !!                    jpj    : second dimension of the local subdomain
1146      !!                    kexti  : number of columns for extra outer halo
1147      !!                    kextj  : number of rows for extra outer halo
[4990]1148      !!                    nbondi : mark for "east-west local boundary"
1149      !!                    nbondj : mark for "north-south local boundary"
1150      !!                    noea   : number for local neighboring processors
1151      !!                    nowe   : number for local neighboring processors
1152      !!                    noso   : number for local neighboring processors
1153      !!                    nono   : number for local neighboring processors
1154      !!----------------------------------------------------------------------
[10314]1155      CHARACTER(len=*)                                        , INTENT(in   ) ::   cdname      ! name of the calling subroutine
[9019]1156      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1157      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1158      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1159      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1160      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1161      !
[4990]1162      INTEGER  ::   jl   ! dummy loop indices
[9019]1163      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1164      INTEGER  ::   ipreci, iprecj             !   -       -
[4990]1165      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1166      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1167      !!
[9019]1168      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1169      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
[4990]1170      !!----------------------------------------------------------------------
1171
[9019]1172      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1173      iprecj = nn_hls + kextj
[4990]1174
[10314]1175      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, 1, 1, 1, ld_lbc = .TRUE. )
[4990]1176
1177      ! 1. standard boundary treatment
1178      ! ------------------------------
1179      ! Order matters Here !!!!
1180      !
1181      !                                      ! East-West boundaries
1182      !                                           !* Cyclic east-west
[9667]1183      IF( l_Iperio ) THEN
1184         pt2d(1-kexti:     1   ,:) = pt2d(jpim1-kexti: jpim1 ,:)       ! east
1185         pt2d(  jpi  :jpi+kexti,:) = pt2d(     2     :2+kexti,:)       ! west
[4990]1186         !
1187      ELSE                                        !* closed
[9667]1188         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls   ,:) = 0._wp    ! east except at F-point
1189                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! west
[4990]1190      ENDIF
[9667]1191      !                                      ! North-South boundaries
1192      IF( l_Jperio ) THEN                         !* cyclic (only with no mpp j-split)
1193         pt2d(:,1-kextj:     1   ) = pt2d(:,jpjm1-kextj:  jpjm1)       ! north
1194         pt2d(:,  jpj  :jpj+kextj) = pt2d(:,     2     :2+kextj)       ! south
1195      ELSE                                        !* closed
1196         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-kextj   :nn_hls   ) = 0._wp    ! north except at F-point
1197                                      pt2d(:,jpj-nn_hls+1:jpj+kextj) = 0._wp    ! south
1198      ENDIF
[4990]1199      !
1200
1201      ! north fold treatment
1202      ! -----------------------
1203      IF( npolj /= 0 ) THEN
1204         !
1205         SELECT CASE ( jpni )
[9019]1206                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1207                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
[4990]1208         END SELECT
1209         !
1210      ENDIF
1211
1212      ! 2. East and west directions exchange
1213      ! ------------------------------------
1214      ! we play with the neigbours AND the row number because of the periodicity
1215      !
1216      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1217      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
[9019]1218         iihom = jpi-nreci-kexti
[4990]1219         DO jl = 1, ipreci
[9019]1220            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
[4990]1221            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1222         END DO
1223      END SELECT
1224      !
1225      !                           ! Migrations
[9019]1226      imigr = ipreci * ( jpj + 2*kextj )
[4990]1227      !
[10314]1228      IF( ln_timing ) CALL tic_tac(.TRUE.)
1229      !
[4990]1230      SELECT CASE ( nbondi )
1231      CASE ( -1 )
[9019]1232         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1233         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
[4990]1234         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1235      CASE ( 0 )
[9019]1236         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1237         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1238         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1239         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
[4990]1240         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1241         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1242      CASE ( 1 )
[9019]1243         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1244         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
[4990]1245         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1246      END SELECT
1247      !
[10314]1248      IF( ln_timing ) CALL tic_tac(.FALSE.)
1249      !
[4990]1250      !                           ! Write Dirichlet lateral conditions
[9019]1251      iihom = jpi - nn_hls
[4990]1252      !
1253      SELECT CASE ( nbondi )
1254      CASE ( -1 )
1255         DO jl = 1, ipreci
1256            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1257         END DO
1258      CASE ( 0 )
1259         DO jl = 1, ipreci
[9019]1260            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1261            pt2d(iihom+jl,:) = r2dew(:,jl,2)
[4990]1262         END DO
1263      CASE ( 1 )
1264         DO jl = 1, ipreci
[9019]1265            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
[4990]1266         END DO
1267      END SELECT
1268
1269
1270      ! 3. North and south directions
1271      ! -----------------------------
1272      ! always closed : we play only with the neigbours
1273      !
1274      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
[9019]1275         ijhom = jpj-nrecj-kextj
[4990]1276         DO jl = 1, iprecj
1277            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
[9019]1278            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
[4990]1279         END DO
1280      ENDIF
1281      !
1282      !                           ! Migrations
[9019]1283      imigr = iprecj * ( jpi + 2*kexti )
[4990]1284      !
[10314]1285      IF( ln_timing ) CALL tic_tac(.TRUE.)
1286      !
[4990]1287      SELECT CASE ( nbondj )
1288      CASE ( -1 )
[9019]1289         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1290         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
[4990]1291         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1292      CASE ( 0 )
[9019]1293         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1294         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1295         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1296         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
[4990]1297         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1298         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1299      CASE ( 1 )
[9019]1300         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1301         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
[4990]1302         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1303      END SELECT
1304      !
[10314]1305      IF( ln_timing ) CALL tic_tac(.FALSE.)
1306      !
[4990]1307      !                           ! Write Dirichlet lateral conditions
[9019]1308      ijhom = jpj - nn_hls
[4990]1309      !
1310      SELECT CASE ( nbondj )
1311      CASE ( -1 )
1312         DO jl = 1, iprecj
1313            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1314         END DO
1315      CASE ( 0 )
1316         DO jl = 1, iprecj
[9019]1317            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1318            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
[4990]1319         END DO
1320      CASE ( 1 )
1321         DO jl = 1, iprecj
[9019]1322            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
[4990]1323         END DO
1324      END SELECT
[9019]1325      !
[4990]1326   END SUBROUTINE mpp_lnk_2d_icb
[10172]1327
[10314]1328
1329   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb )
1330      !!----------------------------------------------------------------------
1331      !!                  ***  routine mpp_report  ***
1332      !!
1333      !! ** Purpose :   report use of mpp routines per time-setp
1334      !!
1335      !!----------------------------------------------------------------------
1336      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1337      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1338      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb
1339      !!
1340      LOGICAL ::   ll_lbc, ll_glb
1341      INTEGER ::    ji,  jj,  jk,  jh, jf   ! dummy loop indices
1342      !!----------------------------------------------------------------------
1343      !
1344      ll_lbc = .FALSE.
1345      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1346      ll_glb = .FALSE.
1347      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1348      !
1349      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1350      ncom_freq = ncom_fsbc * ncom_dttrc
1351      IF( MOD( MAX(ncom_fsbc,ncom_dttrc), MIN(ncom_fsbc,ncom_dttrc) ) == 0 ) ncom_freq = MAX(ncom_fsbc,ncom_dttrc)
1352      !
1353      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1354         IF( ll_lbc ) THEN
1355            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1356            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1357            n_sequence_lbc = n_sequence_lbc + 1
1358            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lnk_generic, increase ncom_rec_max' )   ! deadlock
1359            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1360            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1361            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1362         ENDIF
1363         IF( ll_glb ) THEN
1364            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1365            n_sequence_glb = n_sequence_glb + 1
1366            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lnk_generic, increase ncom_rec_max' )   ! deadlock
1367            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1368         ENDIF
1369      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1370         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1371         WRITE(numcom,*) ' '
1372         WRITE(numcom,*) ' ------------------------------------------------------------'
1373         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1374         WRITE(numcom,*) ' ------------------------------------------------------------'
1375         WRITE(numcom,*) ' '
1376         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1377         jj = 0; jk = 0; jf = 0; jh = 0
1378         DO ji = 1, n_sequence_lbc
1379            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1380            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1381            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1382            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1383         END DO
1384         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1385         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1386         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1387         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1388         WRITE(numcom,*) ' '
1389         WRITE(numcom,*) ' lbc_lnk called'
1390         jj = 1
1391         DO ji = 2, n_sequence_lbc
1392            IF( crname_lbc(ji-1) /= crname_lbc(ji) ) THEN
1393               WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(ji-1))
1394               jj = 0
1395            END IF
1396            jj = jj + 1 
1397         END DO
1398         WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
1399         WRITE(numcom,*) ' '
1400         IF ( n_sequence_glb > 0 ) THEN
1401            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1402            jj = 1
1403            DO ji = 2, n_sequence_glb
1404               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1405                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1406                  jj = 0
1407               END IF
1408               jj = jj + 1 
1409            END DO
1410            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1411            DEALLOCATE(crname_glb)
1412         ELSE
1413            WRITE(numcom,*) ' No MPI global communication '
1414         ENDIF
1415         WRITE(numcom,*) ' '
1416         WRITE(numcom,*) ' -----------------------------------------------'
1417         WRITE(numcom,*) ' '
1418         DEALLOCATE(ncomm_sequence)
1419         DEALLOCATE(crname_lbc)
1420      ENDIF
1421   END SUBROUTINE mpp_report
1422
[6140]1423   
[10300]1424   SUBROUTINE tic_tac (ld_tic, ld_global)
[10172]1425
[10300]1426    LOGICAL,           INTENT(IN) :: ld_tic
1427    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1428    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1429    REAL(wp),               SAVE :: tic_ct = 0._wp
1430    INTEGER :: ii
[10172]1431
1432    IF( ncom_stp <= nit000 ) RETURN
1433    IF( ncom_stp == nitend ) RETURN
[10300]1434    ii = 1
1435    IF( PRESENT( ld_global ) ) THEN
1436       IF( ld_global ) ii = 2
1437    END IF
[10172]1438   
[10300]1439    IF ( ld_tic ) THEN
1440       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
[10172]1441       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1442    ELSE
[10300]1443       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
[10172]1444       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1445    ENDIF
1446   
1447   END SUBROUTINE tic_tac
1448
1449   
[13]1450#else
1451   !!----------------------------------------------------------------------
1452   !!   Default case:            Dummy module        share memory computing
1453   !!----------------------------------------------------------------------
[2715]1454   USE in_out_manager
[1976]1455
[13]1456   INTERFACE mpp_sum
[10314]1457      MODULE PROCEDURE mppsum_int, mppsum_a_int, mppsum_real, mppsum_a_real, mppsum_realdd, mppsum_a_realdd
[13]1458   END INTERFACE
1459   INTERFACE mpp_max
[681]1460      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
[13]1461   END INTERFACE
1462   INTERFACE mpp_min
1463      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1464   END INTERFACE
[1344]1465   INTERFACE mpp_minloc
1466      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1467   END INTERFACE
1468   INTERFACE mpp_maxloc
1469      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1470   END INTERFACE
[3]1471
[13]1472   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
[4147]1473   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
[9570]1474   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
[2715]1475   !!----------------------------------------------------------------------
[13]1476CONTAINS
[3]1477
[2715]1478   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1479      INTEGER, INTENT(in) ::   kumout
1480      lib_mpp_alloc = 0
1481   END FUNCTION lib_mpp_alloc
1482
[5407]1483   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
[1579]1484      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
[3764]1485      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
[5407]1486      CHARACTER(len=*) ::   ldname
[4314]1487      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
[9570]1488      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
[5412]1489      function_value = 0
[1579]1490      IF( .FALSE. )   ldtxt(:) = 'never done'
[5407]1491      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
[13]1492   END FUNCTION mynode
[3]1493
[13]1494   SUBROUTINE mppsync                       ! Dummy routine
1495   END SUBROUTINE mppsync
[3]1496
[10314]1497   !!----------------------------------------------------------------------
1498   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
1499   !!   
1500   !!----------------------------------------------------------------------
1501   !!
1502#  define OPERATION_MAX
1503#  define INTEGER_TYPE
1504#  define DIM_0d
1505#     define ROUTINE_ALLREDUCE           mppmax_int
1506#     include "mpp_allreduce_generic.h90"
1507#     undef ROUTINE_ALLREDUCE
1508#  undef DIM_0d
1509#  define DIM_1d
1510#     define ROUTINE_ALLREDUCE           mppmax_a_int
1511#     include "mpp_allreduce_generic.h90"
1512#     undef ROUTINE_ALLREDUCE
1513#  undef DIM_1d
1514#  undef INTEGER_TYPE
1515!
1516#  define REAL_TYPE
1517#  define DIM_0d
1518#     define ROUTINE_ALLREDUCE           mppmax_real
1519#     include "mpp_allreduce_generic.h90"
1520#     undef ROUTINE_ALLREDUCE
1521#  undef DIM_0d
1522#  define DIM_1d
1523#     define ROUTINE_ALLREDUCE           mppmax_a_real
1524#     include "mpp_allreduce_generic.h90"
1525#     undef ROUTINE_ALLREDUCE
1526#  undef DIM_1d
1527#  undef REAL_TYPE
1528#  undef OPERATION_MAX
1529   !!----------------------------------------------------------------------
1530   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
1531   !!   
1532   !!----------------------------------------------------------------------
1533   !!
1534#  define OPERATION_MIN
1535#  define INTEGER_TYPE
1536#  define DIM_0d
1537#     define ROUTINE_ALLREDUCE           mppmin_int
1538#     include "mpp_allreduce_generic.h90"
1539#     undef ROUTINE_ALLREDUCE
1540#  undef DIM_0d
1541#  define DIM_1d
1542#     define ROUTINE_ALLREDUCE           mppmin_a_int
1543#     include "mpp_allreduce_generic.h90"
1544#     undef ROUTINE_ALLREDUCE
1545#  undef DIM_1d
1546#  undef INTEGER_TYPE
1547!
1548#  define REAL_TYPE
1549#  define DIM_0d
1550#     define ROUTINE_ALLREDUCE           mppmin_real
1551#     include "mpp_allreduce_generic.h90"
1552#     undef ROUTINE_ALLREDUCE
1553#  undef DIM_0d
1554#  define DIM_1d
1555#     define ROUTINE_ALLREDUCE           mppmin_a_real
1556#     include "mpp_allreduce_generic.h90"
1557#     undef ROUTINE_ALLREDUCE
1558#  undef DIM_1d
1559#  undef REAL_TYPE
1560#  undef OPERATION_MIN
[3]1561
[10314]1562   !!----------------------------------------------------------------------
1563   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
1564   !!   
1565   !!   Global sum of 1D array or a variable (integer, real or complex)
1566   !!----------------------------------------------------------------------
1567   !!
1568#  define OPERATION_SUM
1569#  define INTEGER_TYPE
1570#  define DIM_0d
1571#     define ROUTINE_ALLREDUCE           mppsum_int
1572#     include "mpp_allreduce_generic.h90"
1573#     undef ROUTINE_ALLREDUCE
1574#  undef DIM_0d
1575#  define DIM_1d
1576#     define ROUTINE_ALLREDUCE           mppsum_a_int
1577#     include "mpp_allreduce_generic.h90"
1578#     undef ROUTINE_ALLREDUCE
1579#  undef DIM_1d
1580#  undef INTEGER_TYPE
1581!
1582#  define REAL_TYPE
1583#  define DIM_0d
1584#     define ROUTINE_ALLREDUCE           mppsum_real
1585#     include "mpp_allreduce_generic.h90"
1586#     undef ROUTINE_ALLREDUCE
1587#  undef DIM_0d
1588#  define DIM_1d
1589#     define ROUTINE_ALLREDUCE           mppsum_a_real
1590#     include "mpp_allreduce_generic.h90"
1591#     undef ROUTINE_ALLREDUCE
1592#  undef DIM_1d
1593#  undef REAL_TYPE
1594#  undef OPERATION_SUM
[3]1595
[10314]1596#  define OPERATION_SUM_DD
1597#  define COMPLEX_TYPE
1598#  define DIM_0d
1599#     define ROUTINE_ALLREDUCE           mppsum_realdd
1600#     include "mpp_allreduce_generic.h90"
1601#     undef ROUTINE_ALLREDUCE
1602#  undef DIM_0d
1603#  define DIM_1d
1604#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
1605#     include "mpp_allreduce_generic.h90"
1606#     undef ROUTINE_ALLREDUCE
1607#  undef DIM_1d
1608#  undef COMPLEX_TYPE
1609#  undef OPERATION_SUM_DD
[3]1610
[10314]1611   !!----------------------------------------------------------------------
1612   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
1613   !!   
1614   !!----------------------------------------------------------------------
1615   !!
1616#  define OPERATION_MINLOC
1617#  define DIM_2d
1618#     define ROUTINE_LOC           mpp_minloc2d
1619#     include "mpp_loc_generic.h90"
1620#     undef ROUTINE_LOC
1621#  undef DIM_2d
1622#  define DIM_3d
1623#     define ROUTINE_LOC           mpp_minloc3d
1624#     include "mpp_loc_generic.h90"
1625#     undef ROUTINE_LOC
1626#  undef DIM_3d
1627#  undef OPERATION_MINLOC
[2480]1628
[10314]1629#  define OPERATION_MAXLOC
1630#  define DIM_2d
1631#     define ROUTINE_LOC           mpp_maxloc2d
1632#     include "mpp_loc_generic.h90"
1633#     undef ROUTINE_LOC
1634#  undef DIM_2d
1635#  define DIM_3d
1636#     define ROUTINE_LOC           mpp_maxloc3d
1637#     include "mpp_loc_generic.h90"
1638#     undef ROUTINE_LOC
1639#  undef DIM_3d
1640#  undef OPERATION_MAXLOC
[13]1641
[10292]1642   SUBROUTINE mpp_ilor( ld_switch, ldlast, kcom )
1643      LOGICAL, INTENT(in   ), DIMENSION(2) ::   ld_switch
1644      LOGICAL, INTENT(in   ), OPTIONAL     ::   ldlast
1645      INTEGER, INTENT(in   ), OPTIONAL     ::   kcom    ! ???
1646      WRITE(*,*) 'mpp_ilor: You should not have seen this print! error?', ld_switch
1647   END SUBROUTINE mpp_ilor
1648
[10358]1649   SUBROUTINE mppstop( ldfinal, ld_force_abort )
1650      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
1651      LOGICAL, OPTIONAL, INTENT(in) :: ld_force_abort    ! source process number
[3799]1652      STOP      ! non MPP case, just stop the run
[51]1653   END SUBROUTINE mppstop
1654
[2715]1655   SUBROUTINE mpp_ini_znl( knum )
1656      INTEGER :: knum
1657      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
[1345]1658   END SUBROUTINE mpp_ini_znl
1659
[1344]1660   SUBROUTINE mpp_comm_free( kcom )
[869]1661      INTEGER :: kcom
[1344]1662      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
[869]1663   END SUBROUTINE mpp_comm_free
[9019]1664   
[3]1665#endif
[2715]1666
[13]1667   !!----------------------------------------------------------------------
[4147]1668   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
[2715]1669   !!----------------------------------------------------------------------
1670
1671   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1672      &                 cd6, cd7, cd8, cd9, cd10 )
1673      !!----------------------------------------------------------------------
1674      !!                  ***  ROUTINE  stop_opa  ***
1675      !!
[3764]1676      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1677      !!                increment the error number (nstop) by one.
1678      !!----------------------------------------------------------------------
1679      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1680      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1681      !!----------------------------------------------------------------------
1682      !
[3764]1683      nstop = nstop + 1
[2715]1684      IF(lwp) THEN
1685         WRITE(numout,cform_err)
[10330]1686         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1687         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1688         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1689         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1690         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1691         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1692         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1693         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1694         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1695         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
[2715]1696      ENDIF
1697                               CALL FLUSH(numout    )
1698      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
[9019]1699      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
[2715]1700      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1701      !
1702      IF( cd1 == 'STOP' ) THEN
1703         IF(lwp) WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1704         CALL mppstop()
1705      ENDIF
1706      !
1707   END SUBROUTINE ctl_stop
1708
1709
1710   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1711      &                 cd6, cd7, cd8, cd9, cd10 )
1712      !!----------------------------------------------------------------------
1713      !!                  ***  ROUTINE  stop_warn  ***
1714      !!
[3764]1715      !! ** Purpose :   print in ocean.outpput file a error message and
[2715]1716      !!                increment the warning number (nwarn) by one.
1717      !!----------------------------------------------------------------------
1718      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1719      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1720      !!----------------------------------------------------------------------
[3764]1721      !
1722      nwarn = nwarn + 1
[2715]1723      IF(lwp) THEN
1724         WRITE(numout,cform_war)
[10330]1725         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1726         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1727         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1728         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1729         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1730         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1731         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1732         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1733         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1734         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
[2715]1735      ENDIF
1736      CALL FLUSH(numout)
1737      !
1738   END SUBROUTINE ctl_warn
1739
1740
1741   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1742      !!----------------------------------------------------------------------
1743      !!                  ***  ROUTINE ctl_opn  ***
1744      !!
1745      !! ** Purpose :   Open file and check if required file is available.
1746      !!
1747      !! ** Method  :   Fortan open
1748      !!----------------------------------------------------------------------
1749      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1750      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1751      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1752      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1753      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1754      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1755      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1756      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1757      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
[5836]1758      !
[2715]1759      CHARACTER(len=80) ::   clfile
1760      INTEGER           ::   iost
1761      !!----------------------------------------------------------------------
[5836]1762      !
[2715]1763      ! adapt filename
1764      ! ----------------
1765      clfile = TRIM(cdfile)
1766      IF( PRESENT( karea ) ) THEN
1767         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1768      ENDIF
1769#if defined key_agrif
1770      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1771      knum=Agrif_Get_Unit()
1772#else
1773      knum=get_unit()
1774#endif
[10330]1775      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
[5836]1776      !
[2715]1777      iost=0
1778      IF( cdacce(1:6) == 'DIRECT' )  THEN
[10358]1779         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh         , ERR=100, IOSTAT=iost )
1780      ELSE IF( cdstat(1:6) == 'APPEND' )  THEN
1781         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS='UNKNOWN', POSITION='APPEND', ERR=100, IOSTAT=iost )
[2715]1782      ELSE
[10358]1783         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )
[2715]1784      ENDIF
[10358]1785      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &   ! for windows
1786         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat                      , ERR=100, IOSTAT=iost )   
[2715]1787      IF( iost == 0 ) THEN
1788         IF(ldwp) THEN
[10358]1789            WRITE(kout,*) '     file   : ', TRIM(clfile),' open ok'
[2715]1790            WRITE(kout,*) '     unit   = ', knum
1791            WRITE(kout,*) '     status = ', cdstat
1792            WRITE(kout,*) '     form   = ', cdform
1793            WRITE(kout,*) '     access = ', cdacce
1794            WRITE(kout,*)
1795         ENDIF
1796      ENDIF
1797100   CONTINUE
1798      IF( iost /= 0 ) THEN
1799         IF(ldwp) THEN
1800            WRITE(kout,*)
[10358]1801            WRITE(kout,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
[2715]1802            WRITE(kout,*) ' =======   ===  '
1803            WRITE(kout,*) '           unit   = ', knum
1804            WRITE(kout,*) '           status = ', cdstat
1805            WRITE(kout,*) '           form   = ', cdform
1806            WRITE(kout,*) '           access = ', cdacce
1807            WRITE(kout,*) '           iostat = ', iost
1808            WRITE(kout,*) '           we stop. verify the file '
1809            WRITE(kout,*)
[9438]1810         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
1811            WRITE(*,*)
[10358]1812            WRITE(*,*) ' ===>>>> : bad opening file: ', TRIM(clfile)
[9438]1813            WRITE(*,*) ' =======   ===  '
1814            WRITE(*,*) '           unit   = ', knum
1815            WRITE(*,*) '           status = ', cdstat
1816            WRITE(*,*) '           form   = ', cdform
1817            WRITE(*,*) '           access = ', cdacce
1818            WRITE(*,*) '           iostat = ', iost
1819            WRITE(*,*) '           we stop. verify the file '
1820            WRITE(*,*)
[2715]1821         ENDIF
[9019]1822         CALL FLUSH( kout ) 
[2715]1823         STOP 'ctl_opn bad opening'
1824      ENDIF
[5836]1825      !
[2715]1826   END SUBROUTINE ctl_opn
1827
[5836]1828
[4147]1829   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
1830      !!----------------------------------------------------------------------
1831      !!                  ***  ROUTINE ctl_nam  ***
1832      !!
1833      !! ** Purpose :   Informations when error while reading a namelist
1834      !!
1835      !! ** Method  :   Fortan open
1836      !!----------------------------------------------------------------------
[5836]1837      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
1838      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
[7646]1839      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
[5836]1840      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
[4147]1841      !!----------------------------------------------------------------------
[5836]1842      !
[7646]1843      WRITE (clios, '(I5.0)')   kios
[4147]1844      IF( kios < 0 ) THEN         
[5836]1845         CALL ctl_warn( 'end of record or file while reading namelist '   &
1846            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1847      ENDIF
[5836]1848      !
[4147]1849      IF( kios > 0 ) THEN
[5836]1850         CALL ctl_stop( 'misspelled variable in namelist '   &
1851            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
[4147]1852      ENDIF
1853      kios = 0
1854      RETURN
[5836]1855      !
[4147]1856   END SUBROUTINE ctl_nam
1857
[5836]1858
[2715]1859   INTEGER FUNCTION get_unit()
1860      !!----------------------------------------------------------------------
1861      !!                  ***  FUNCTION  get_unit  ***
1862      !!
1863      !! ** Purpose :   return the index of an unused logical unit
1864      !!----------------------------------------------------------------------
[3764]1865      LOGICAL :: llopn
[2715]1866      !!----------------------------------------------------------------------
1867      !
1868      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1869      llopn = .TRUE.
1870      DO WHILE( (get_unit < 998) .AND. llopn )
1871         get_unit = get_unit + 1
1872         INQUIRE( unit = get_unit, opened = llopn )
1873      END DO
1874      IF( (get_unit == 999) .AND. llopn ) THEN
1875         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
1876         get_unit = -1
1877      ENDIF
1878      !
1879   END FUNCTION get_unit
1880
1881   !!----------------------------------------------------------------------
[3]1882END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.