# Clear workspace
rm(list = ls())

# Load packages
suppressPackageStartupMessages({
  library("Matrix")
  library("MASS")
  library("OpenMx")
  library("metaSEM")
  library("here")
  library("parallel")
  library("pbapply")
})

1 Simulation functions

The function sim_wrapper() simulates a single meta-analysis given the set of provided arguments. All required functions for the simulation are specified as internal functions within sim_wrapper() and are indicated by a leading “.”.

The entire simulation of multiple meta-analyses (i.e., for multiple replications) given a set of arguments is invoked by sim_run() and is run in parallel on multiple cores.

The SEM model specifies either a simple mediation model with a single mediator (M), a multiple mediator model with two parallel mediators (M, W), or a sequential mediator model with two successive mediators (M, W). All models include a single independent variable (X) and a single outcome (Y).

#
# Simulate a single meta-analysis
#
# @param rid       A numeric identifier for the replication
# @param MONX      A standardized regression weight of M on X
# @param YONX      A standardized regression weight of Y on X
# @param YONM      A standardized regression weight of Y on M (optional)
# @param WONX      A standardized regression weight of W on X (optional)
# @param YONW      A standardized regression weight of Y on W (optional)
# @param WONM      A standardized regression weight of W on M (optional)
# @param rxx       A vector with a minimum and maximum reliability 
#                    for X (optional)
# @param rmm       A vector with a minimum and maximum reliability
#                    for M (optional)
# @param rww       A vector with a minimum and maximum reliability
#                    for W (optional)
# @param ryy       A vector with a minimum and maximum reliability
#                    for Y (optional)
# @param tau       A random effect (SD) for the population 
#                    correlations (optional)
#                    Note: The same random effect is used for
#                          all correlations. Covariances between
#                          random effects are set to 0.
# @param k         A vector with a minimum and maximum number of samples 
#                    in the meta-analysis (optional)
# @param n         A vector with a minimum and maximum sample size 
#                    for each sample in the meta-analysis (optional)
# @param tau_max   Maximum random effect (SD) (optional)
# @return          A vector with the simulated SEM parameters,
#                    error codes, and summaries of the used
#                    settings
# @require         MASS, Matrix, OpenMx, metaSEM
#
# If optional parameters are not set, they are randomly drawn from
# from reasonable distributions.
#
sim_wrapper <- function(rid, MONX, YONX, YONM = NULL, 
                        WONX = NULL, YONW = NULL, WONM = NULL,
                        rxx = NULL, rmm = NULL, rww = NULL, ryy = NULL,
                        tau = NULL, k = NULL, n = NULL,
                        tau_max = .50) {

  #
  # Generate the implied correlation matrix from 
  #   standardized regression weights of a mediation model
  #
  # Assumes a simple mediation model, unless WONX, YONW, or 
  #   YONW are specified. If WONX and YONW are given, a two 
  #   variable parallel mediation is assumed. If YONW and WONM 
  #   are specified, a two variable sequential mediation is assumed 
  #
  # @param MONX     A standardized regression weight of M on X
  # @param YONX     A standardized regression weight of Y on X
  # @param YONM     A standardized regression weight of Y on M (optional)
  # @param WONX     A standardized regression weight of W on X (optional)
  # @param YONW     A standardized regression weight of Y on W (optional)
  # @param WONM     A standardized regression weight of W on M (optional)
  # @return         A 3x3 or 4x4 correlation matrix
  # @require        metaSEM
  #
  .beta_to_cor <- function(MONX, YONX, YONM = NULL, 
                           WONX = NULL, YONW = NULL, 
                           WONM = NULL) {
    
    # Single mediator model
    if (is.null(WONX) & is.null(YONW) & is.null(WONM) & 
        !is.null(YONM)) {
      
      Amat <- matrix(c(0,    0,    0, 
                       MONX, 0,    0, 
                       YONX, YONM, 0), 3, 3, byrow = TRUE) 
      colnames(Amat) <- rownames(Amat) <- c("X", "M", "Y")
      Smat <- matrix(c(1, 0, 0,
                       0, 1 - MONX^2, 0,
                       0, 0, 1 - YONM^2 - YONX^2 - 2 * MONX * YONM * YONX), 
                     3, 3, byrow = TRUE) 
      Imat <- diag(3)
      
    # Two parallel mediators model
    } else if (!is.null(WONX) & !is.null(YONM) & !is.null(YONW) & 
               is.null(WONM)) {
      
      Amat <- matrix(c(0,    0,    0,    0, 
                       MONX, 0,    0,    0, 
                       WONX, 0,    0,    0,
                       YONX, YONM, YONW, 0), 4, 4, byrow = TRUE) 
      colnames(Amat) <- rownames(Amat) <- c("X", "M", "W", "Y")
      Smat <- matrix(c(1, 0, 0, 0,
                       0, 1 - MONX^2, 0, 0,
                       0, 0, 1 - WONX^2, 0,
                       0, 0, 0, 1 - YONM^2 - YONW^2 - YONX^2 - 
                                  2 * MONX * YONM * YONX - 
                                  2 * WONX * YONW * YONX - 
                                  2 * MONX * WONX * YONM * YONW), 
                     4, 4, byrow = TRUE) 
      Imat <- diag(4)
      
    # Two sequential mediators model
    } else if (is.null(WONX) & is.null(YONM) & 
               !is.null(YONW) & !is.null(WONM)) {
      
      Amat <- matrix(c(0,    0,    0,    0, 
                       MONX, 0,    0,    0, 
                       0,    WONM, 0,    0,
                       YONX, 0,    YONW, 0), 4, 4, byrow = TRUE) 
      colnames(Amat) <- rownames(Amat) <- c("X", "M", "W", "Y")
      Smat <- matrix(c(1, 0, 0, 0,
                       0, 1 - MONX^2, 0, 0,
                       0, 0, 1 - WONM^2, 0,
                       0, 0, 0, 1 - YONW^2 - YONX^2 - 
                         2 * MONX * WONM * YONW * YONX), 
                     4, 4, byrow = TRUE) 
      Imat <- diag(4)
      
    } else {
      
      stop("Please specify a valid set of arguments!")
      
    }
    
    rmat <- solve(Imat - Amat) %*% Smat %*% t(solve(Imat - Amat))
    if (!metaSEM::is.pd(rmat))
      stop("Correlation matrix is not positive definite. Check your arguments.")
    
    return(rmat)
    
  }

  
  #
  # Generate random heterogeneity estimates (SD)
  #
  # @param tau      A fixed heterogeneity estimated (optional)
  # @param times    The number of heterogeneity estimates (optional)
  # @param tau_max  A maximum heterogeneity estimate (optional)
  # @return         A numeric vector
  #
  .get_tau <- function(tau = NULL, times = 1, tau_max = .50) {
    
    if (!is.null(tau)) return(rep(tau[1], times))
    tau <- abs(rnorm(times, 0, .20))
    repeat {
      tau[tau > tau_max] <- tau[tau > tau_max] / 2
      if (all(tau <= tau_max)) break
    }
    return(round(tau, 2))
    
  }
  
  
  #
  # Generate random sample sizes
  #
  # @param n      A vector with a minimum and maximum sample size (optional)
  # @param times  The number of sample sizes (optional)
  # @return       A numeric vector
  #
  .get_n <- function(n = NULL, times = 1) {
    
    if (is.null(n)) n <- c(50, 900)
    if (length(n) == 1) n <- c(n, n)
    n <- round(n)
    ns <- rgamma(times, 1.3, 1 / 250) + n[1]
    repeat {
      ns[ns > n[2]] <- round(ns[ns > n[2]] / 2)
      ns[ns < n[1]] <- n[1]
      if (all(ns <= n[2])) break
    }
    return(round(ns))
    
  }
  
  
  #
  # Generate random numbers of samples
  #
  # @param k      A vector with a minimum and maximum number of samples (optional)
  # @param times  The number of number of samples (optional)
  # @return       A numeric vector
  #
  .get_k <- function(k = NULL, times = 1) {
    
    if (is.null(k)) k <- c(5, 50)
    if (length(k) == 1) k <- c(k, k)
    k <- round(k)
    return(round(runif(times, k[1], k[2])))

  }
  
  
  #
  # Generate random reliability estimates
  #
  # @param rxx       A vector with a minimum and maximum reliability (optional)
  # @param times     The number of reliabilities (optional)
  # @return          A numeric vector
  #
  .get_rel <- function(rxx = NULL, times = 1) {
    
    if (is.null(rxx)) rxx <- c(.65, .95)
    if (length(rxx) == 1) rxx <- c(rxx, rxx)
    return(round(runif(times, rxx[1], rxx[2]), 2))
    
  }
  
  
  #
  # Generate a RAM specification for a mediation model
  #   for metaSEM::tssem2() and metaSEM::osmasem()
  #
  # @param type   Specify type of mediation model:
  #                 - "simple" for simple mediation model
  #                 - "parallel" for parallel mediation model
  #                 - "sequential" for sequential mediation model
  # @return       A list with the RAM specification
  # @require      metaSEM
  #
  .get_model <- function(type = "simple") {
    
    if (type == "sequential") {
      
      return(metaSEM::lavaan2RAM(
        "M ~ X; W ~ M; Y ~ W + X; X ~~ 1*X", 
        obs.variables = c("X", "M", "W", "Y")
      ))
      
    } else if (type == "parallel") {
        
        return(metaSEM::lavaan2RAM(
          "M ~ X; W ~ X; Y ~ M + W + X; X ~~ 1*X", 
          obs.variables = c("X", "M", "W", "Y")
        ))
        
    } else if (type == "simple") {
    
      return(metaSEM::lavaan2RAM(
        "M ~ X; Y ~ M + X; X ~~ 1*X", 
        obs.variables = c("X", "M", "Y")
      ))
      
    } else {
      
      stop("Unknown model!")
      
    }
    
  }
  
  
  # 
  # Define OpenMx matrices for indirect effects
  #
  # @param model   A list with the RAM specification of the mediation model 
  #                  as returned by .get_model()
  # @return        A list with OpenMx matrices
  # @require       OpenMx
  #
  .get_INDmatrix <- function(model) {
    
    # Simple mediation
    if (ncol(model$A) == 3) {
      
      out <- list(
        IND = OpenMx::mxAlgebra(Amatrix[2, 1] * Amatrix[3, 2], name = "IND")
      )
      
    # Sequential mediation
    } else if (ncol(model$A) == 4 & model$A["W", "X"] == "0") { 
      
      out <- list(
        IND = OpenMx::mxAlgebra(Amatrix[2, 1] * Amatrix[3, 2] * Amatrix[4, 3], 
                                name = "IND")
      )
      
    # Parallel mediation
    } else {
      
      out <- list(
        IND1 = OpenMx::mxAlgebra(Amatrix[2, 1] * Amatrix[4, 2],
                                 name = "IND1"),
        IND2 = OpenMx::mxAlgebra(Amatrix[3, 1] * Amatrix[4, 3], 
                                 name = "IND2"),
        IND = OpenMx::mxAlgebra(Amatrix[2, 1] * Amatrix[4, 2] + 
                                  Amatrix[3, 1] * Amatrix[4, 3], 
                                name = "IND")
      )
      
    }

    return(out)
  }


  #
  # Draw a random population correlation matrix
  #  given an average population correlation matrix 
  #  and a random effect
  #
  #  Note: A non-positive definite correlation
  #        matrix is discarded and replaced by 
  #        a valid correlation matrix.
  #
  # @param avg_mat   The average population correlation 
  #                    matrix
  # @param tau       The random effect (SD)
  # @return          The population correlation matrix
  # @require         MASS, metaSEM, OpenMx
  #
  .sim_pop_mat <- function(avg_mat, tau) {
    
    p <- (nrow(avg_mat)^2 - nrow(avg_mat)) / 2
    Sigma <- diag(tau^2, p, p)
    mu <- OpenMx::vechs(avg_mat)
    repeat {
      rho_vec <- MASS::mvrnorm(1, mu = mu, Sigma = Sigma)
      rho_mat <- metaSEM::vec2symMat(rho_vec, diag = FALSE)
      if (metaSEM::is.pd(rho_mat)) break # & all(abs(rho_vec) < .95)
    }
    return(rho_mat)
    
  }

  
  #
  # (Dis)Attentuate correlations / (co)variances
  #
  # @param r           A matrix of correlations / (co)variances
  # @param rxx         A vector of reliability estimates for X
  # @param rmm         A vector of reliability estimates for M
  # @param rww         A vector of reliability estimates for W (optional)
  # @param ryy         A vector of reliability estimates for Y
  # @param adj         A list with logical indicators for each variable
  #                      showing whether to adjust it for unreliability (optional)
  # @param invert      A logical value indicating whether to
  #                      disattenuate (FALSE) or attenuate (TRUE) 
  #                      the correlations
  # @param return_att  Return the attenuation factors instead of the 
  #                      correlations
  # @param vcov        A logical indicating whether to disattenuate the 
  #                      correlations (FALSE) or their (co)variances (TRUE)
  # @return            A matrix of (dis)attenuated correlations / (co)variances
  #
  .disattenuate <- function(x, rxx, rmm, ryy, rww = NULL, 
                            adj = NULL, invert = FALSE,
                            return_att = FALSE, vcov = FALSE) {
    
    # Determine which correlations to adjust for unreliability
    if (is.null(adj) | !is.list(adj)) adj <- list()
    for (i in c("rxx", "rmm", "rww", "ryy")) {
      if (!(i %in% names(adj))) adj[[i]] <- TRUE
    }
    
    # Correlations / (co)variances
    if (!is.matrix(x)) x <- rbind(x)
    
    # Reliabilities
    if ((!vcov && ncol(x) == 3) | (vcov && ncol(x) == 6)) {
      rel_vecs <- cbind(rxx, rmm, ryy)
    } else {
      rel_vecs <- cbind(rxx, rmm, rww, ryy)
    }
    if (!adj$rxx) rel_vecs[, "rxx"] <- 1
    if (!adj$rmm) rel_vecs[, "rmm"] <- 1
    if (!adj$rww & ncol(rel_vecs) == 4) rel_vecs[, "rww"] <- 1
    if (!adj$ryy) rel_vecs[, "ryy"] <- 1
    
    # Adjustment factors
    adj_vecs <- t(apply(rel_vecs, 1, \(y) { 
      a <- OpenMx::vechs(cbind(y^.5) %*% y^.5)
      if (vcov) a <- OpenMx::vech(cbind(a) %*% a)
      a
    }))
    if (invert) adj_vecs <- 1 / adj_vecs # attenuate correlations
    
    # Adjust correlations
    if (!return_att) {
      
      out <- t(apply(x / adj_vecs, 1, \(y) {
        y <- metaSEM::vec2symMat(y, diag = vcov)
        if (!metaSEM::is.pd(y)) y <- Matrix::nearPD(y, corr = !vcov)$mat
        if (vcov) {
          OpenMx::vech(y)
        } else {
          OpenMx::vechs(y)
        }
      }))
      dimnames(out) <- dimnames(x)
      
    # Return attenuation factor
    } else {
      out <- adj_vecs
    }
    
    return(out)
    
  } 
  
  .attenuate <- function(...) { .disattenuate(invert = TRUE, ...)  }
  
  
  #
  # Draw a random sample correlation matrix
  #   with attenuation given an average
  #   population correlation matrix and a 
  #   random effect
  #
  #  Note: A non-positive definite correlation
  #        matrix is discarded and replaced by 
  #        a valid correlation matrix.
  #
  # @param avg_mat     The average population correlation matrix
  # @param tau         A random effect (SD)
  # @param n           A vector with a minimum and maximum sample size (optional)
  # @param rxx         A vector with a minimum and maximum reliability 
  #                      for X (optional)
  # @param rmm         A vector with a minimum and maximum reliability
  #                      for M (optional)
  # @param rww         A vector with a minimum and maximum reliability
  #                      for W (optional)
  # @param ryy         A vector with a minimum and maximum reliability
  #                      for Y (optional)
  # @return            A numeric vector with the lower half of
  #                      the sample correlation matrix, 
  #                      reliabilities, and a sample size
  # @require           MASS, metaSEM, OpenMx
  #
  .sim_r <- function(avg_mat, tau, n = NULL, 
                     rxx = NULL, rmm = NULL,
                     rww = NULL, ryy = NULL) {
    
    # Draw population correlation matrix for present sample
    rho_mat <- .sim_pop_mat(avg_mat, tau)
    
    # Get sample size
    n <- .get_n(n)
    
    # Get reliabilities
    rxx <- .get_rel(rxx)
    rmm <- .get_rel(rmm)
    rww <- .get_rel(rww)
    ryy <- .get_rel(ryy)
    
    # Repeat draw for non-positive definite matrices
    iter <- 0
    repeat {
      
      # Correlation for random sample
      resp <- MASS::mvrnorm(n, mu = rep(0, nrow(rho_mat)), Sigma = rho_mat)
      r_vec <- OpenMx::vechs(cor(resp))
      if (nrow(rho_mat) == 3) {
        names(r_vec) <- paste0("r_", c("XM", "XY", "MY"))
      } else if (nrow(rho_mat) == 4) {
        names(r_vec) <- paste0("r_", c("XM", "XW", "XY", 
                                       "MW", "MY", "WY"))
      }
      r_vec <- .attenuate(r_vec, rxx = rxx, rmm = rmm, 
                          ryy = ryy, rww = rww)
      
      # Check if matrix is positive definite
      r_mat <- metaSEM::vec2symMat(r_vec, diag = FALSE)
      if (metaSEM::is.pd(r_mat)) break
      iter <- iter + 1
      
    }
    
    out <- c(r_vec[1, ], n = n, 
             rxx = rxx, rmm = rmm, ryy = ryy,
             iter = iter)
    if (nrow(rho_mat) > 3) out <- c(out, rww = rww)
    return(out)
    
  }
  
  
  #
  # Simulate correlations for multiple samples 
  #   in a meta-analysis
  #
  # @param MONX     A standardized regression weight of M on X
  # @param YONX     A standardized regression weight of Y on X
  # @param YONM     A standardized regression weight of Y on M (optional)
  # @param WONX     A standardized regression weight of W on X (optional)
  # @param YONW     A standardized regression weight of Y on W (optional)
  # @param WONM     A standardized regression weight of W on M (optional)
  # @param k        A vector with a minimum and maximum number of samples (optional)
  # @param n        A vector with a minimum and maximum sample size (optional)
  # @param tau      A random heterogeneity estimate (optional)
  # @param rxx      A vector with a minimum and maximum reliability 
  #                   for X (optional)
  # @param rmm      A vector with a minimum and maximum reliability
  #                   for M (optional)
  # @param rww      A vector with a minimum and maximum reliability
  #                   for W (optional)
  # @param ryy      A vector with a minimum and maximum reliability
  #                   for Y (optional)
  # @param tau_max  Maximum random effet (SD) (optional)
  # @return         A matrix with correlations
  #
  .sim_samples <- function(MONX, YONX, YONM = NULL, 
                           WONX = NULL, YONW = NULL, WONM = NULL, 
                           k = NULL, n = NULL, tau = NULL,
                           rxx = NULL, rmm = NULL, rww = NULL, 
                           ryy = NULL, tau_max = .50) {
    
    # Number of samples
    k <- .get_k(k)
    
    # Random effect for correlations
    tau <- .get_tau(tau, tau_max = tau_max)
    
    # Average population correlation matrix
    avg_mat <- .beta_to_cor(MONX = MONX, YONX = YONX, YONM = YONM,
                            WONX = WONX, YONW = YONW, WONM = WONM)
    
    # Simulate correlations for samples
    out <- replicate(k, .sim_r(avg_mat = avg_mat, tau = tau, n = n,
                               rxx = rxx, rmm = rmm, rww = rww,
                               ryy = ryy))
    out <- cbind(sid = seq_len(k), t(out), tau = tau)
    
    return(out)
    
  }
  
  
  # 
  # First stage in two-stage MASEM
  #
  # The sample correlations are adjusted individually 
  # for unreliability before pooling them.
  #
  # @param obj    A matrix with sample statistics for multiple samples
  #                 as returned by .sim_samples()
  # @param adj    A list with logical indicators for each variable
  #                 showing whether to adjust it for unreliability (optional)
  # @return       A list including the pooled correlation
  #                 matrix, its variance-covariance matrix,
  #                 the total sample size, the number of studies,
  #                 and the mean reliabilities
  # @require      metaSEM
  #
  .sim_tsmasem1 <- function(obj, adj = NULL) {
    
    # Sample correlations
    r_mats <- list()
    for (i in seq_len(nrow(obj))) {
      r_mats[[i]] <- metaSEM::vec2symMat(
        obj[i, grepl("^r_", colnames(obj))], 
        diag = FALSE
      )
    }
    r_vecs <- metaSEM::list2matrix(r_mats, diag = FALSE)
    
    # Sampling (co)variances of correlations
    V_vecs <- metaSEM::asyCov(
      x = r_mats, 
      n = obj[, "n"], 
      cor.analysis = TRUE, 
      as.matrix = TRUE,
      acov = "weighted"
    )
    
    # Disattenuate sample correlations
    rww <- obj[, grepl("^rww$", colnames(obj))]
    if (length(rww) == 0) rww <- NULL
    r_vecs <- .disattenuate(r_vecs, 
                            rxx = obj[, "rxx"], rmm = obj[, "rmm"],
                            ryy = obj[, "ryy"], rww = rww,
                            adj = adj, vcov = FALSE)
    
    # Disattentuate sampling (co)variances
    V_vecs <- .disattenuate(V_vecs,
                            rxx = obj[, "rxx"], rmm = obj[, "rmm"],
                            ryy = obj[, "ryy"], rww = rww,
                            adj = adj, vcov = TRUE)

    # Pool correlations
    fit1 <- 
      try(
        suppressWarnings(
          metaSEM::meta(
            y = r_vecs, 
            v = V_vecs,
            RE.constraints = Diag(rep(paste0(0.1, "*Tau2"), ncol(r_vecs))), 
            RE.lbound = 1e-10, 
            silent = TRUE
          )
        ), 
        silent = TRUE
      )

    # Rerun in case of convergence failures
    if (!inherits(fit1, "try-error") & !is.null(fit1$mx.fit$output$status$code)) {
      i <- 0
      while (!(fit1$mx.fit$output$status$code %in% c(0, 1)) & (i < 5)) {
        fit1 <- 
          try(
            suppressMessages(
              metaSEM::rerun(fit1, autofixtau2 = TRUE, extraTries = 10, 
                             silent = TRUE)
            )
          )
        i <- i + 1
      } 
    }
      
    # Return pooled results
    if (inherits(fit1, "try-error") |
        is.null(fit1$mx.fit$output$status$code)) {
      out <- list(status = 9)
    } else {
      pooledS <- coef(fit1, select = "fixed")
      pooledS <- metaSEM::vec2symMat(pooledS, diag = FALSE)
      aCov <- suppressWarnings(vcov(fit1, select = "fixed"))
      out <- list(
        pooledS = pooledS,
        aCov = aCov,
        n = sum(obj[, "n"]),
        k = nrow(obj),
        rxx = weighted.mean(obj[, "rxx"], obj[, "n"]),
        rmm = weighted.mean(obj[, "rmm"], obj[, "n"]),
        ryy = weighted.mean(obj[, "ryy"], obj[, "n"]),
        iter_sample = sum(obj[, "iter"]),
        status = fit1$mx.fit$output$status$code
      )
      if (!is.null(rww))
        out$rww <- weighted.mean(obj[, "rww"], obj[, "n"])
    }
    return(out)

  }
  
  
  # 
  # Second stage in two-stage MASEM
  #
  # The pooled correlations and their asymptotic (co)variances
  # are adjusted for unreliability before fitting the SEM.
  #
  # @param obj    The results of the first step of MASEM
  #                 as returned by .sim_tsmasem1()
  # @param model  The RAM specification for the SEM model
  #                 as returned by metaSEM::lavaan2RAM()
  # @param adj    A list with logical indicators for each variable
  #                 showing whether to adjust it for unreliability (optional)
  # @return       A data.frame with the parameter estimates
  # @require      metaSEM, OpenMx
  #
  .sim_tsmasem2 <- function(obj, model, adj = NULL) {
    
    # Disattenuate pooled correlations
    pooledS <- .disattenuate(OpenMx::vechs(obj$pooledS), 
                             rxx = obj$rxx, rmm = obj$rmm,
                             ryy = obj$ryy, rww = obj$rww,
                             adj = adj, vcov = FALSE)
    pooledS <- metaSEM::vec2symMat(pooledS, diag = FALSE)

    # Disattenuate asymptotic (co)variances
    aCov <- .disattenuate(OpenMx::vech(obj$aCov), 
                          rxx = obj$rxx, rmm = obj$rmm,
                          ryy = obj$ryy, rww = obj$rww,
                          adj = adj, vcov = TRUE)
    aCov <- metaSEM::vec2symMat(aCov, diag = TRUE)
    
    # OpenMx matrices for indirect effects
    ind <- .get_INDmatrix(model)
    
    # Fit SEM
    fit2 <- 
      try(
        suppressWarnings(
          metaSEM::wls(
            Cov = pooledS,
            aCov = aCov,
            n = obj$n,
            RAM = model,
            cor.analysis = TRUE,
            intervals.type = "LB",
            mx.algebras = ind
          )
        ), 
        silent = TRUE
      )
    
    # Rerun in case of convergence failures
    if (!inherits(fit2, "try-error") & !is.null(fit2$mx.fit$output$status$code)) {
      i <- 0
      while (!(fit2$mx.fit$output$status$code %in% c(0, 1)) & (i < 5)) {
        fit2 <- suppressMessages(
          metaSEM::rerun(fit2, extraTries = 10, intervals = TRUE, silent = TRUE)
        ) 
        i <- i + 1
      } 
    }
    
    # Return SEM results
    if (inherits(fit2, "try-error") |
        is.null(fit2$mx.fit$output$status$code) |
        is.null(fit2$mx.fit$output$confidenceIntervals)) {
      out <- c(status = 9)
    } else {
      pars <- fit2$mx.fit$output$confidenceIntervals
      if (length(ind) == 3) 
        rownames(pars)[nrow(pars) - c(2, 1)] <- c("IND1", "IND2")
      rownames(pars)[nrow(pars)] <- "IND"
      out <- c(status = fit2$mx.fit$output$status$code,
               iter_sample = obj$iter_sample)
      for (i in rownames(pars)) {
        out[paste0(i, "_est")] <- pars[i, "estimate"]
        out[paste0(i, "_lb")] <- pars[i, "lbound"]
        out[paste0(i, "_ub")] <- pars[i, "ubound"]
      }
    }
    return(out)
    
  }
  
  
  # 
  # One-stage MASEM with adjustments for unreliability
  #
  #
  # @param obj    A matrix with sample statistics for multiple samples
  #                 as returned by .sim_samples()
  # @param model  The RAM specification for the SEM model
  #                 as returned by metaSEM::lavaan2RAM()
  # @param adj    A list with logical indicators for each variable
  #                 showing whether to adjust it for unreliability (optional)
  # @param individual A logical indicating whether to adjust
  #                   the correlations individually (TRUE) or adjust the 
  #                   model implied correlation matrix (FALSE)
  # @return       A data.frame with the parameter estimates
  # @require      metaSEM, OpenMx
  #
  .sim_osmasem <- function(obj, model, adj = NULL, individual = TRUE) {
    
    # Sample correlations
    r_mats <- list()
    for (i in seq_len(nrow(obj))) {
      r_mats[[i]] <- metaSEM::vec2symMat(
        obj[i, grepl("^r_", colnames(obj))], 
        diag = FALSE
      )
    }
    r_vecs <- metaSEM::list2matrix(r_mats, diag = FALSE)
    
    # Sampling (co)variances of correlations
    V_vecs <- metaSEM::asyCov(
      x = r_mats, 
      n = obj[, "n"], 
      cor.analysis = TRUE, 
      as.matrix = TRUE,
      acov = "weighted"
    )
    
    # Adjust correlations individually
    rww <- obj[, grepl("^rww$", colnames(obj))]
    if (length(rww) == 0) rww <- NULL
    if (individual) {
      
      # Disattenuate sample correlations
      r_vecs <- .disattenuate(r_vecs, 
                              rxx = obj[, "rxx"], rmm = obj[, "rmm"],
                              ryy = obj[, "ryy"], rww = rww,
                              adj = adj, vcov = FALSE)
      
      # Disattentuate sampling (co)variances
      V_vecs <- .disattenuate(V_vecs,
                              rxx = obj[, "rxx"], rmm = obj[, "rmm"],
                              ryy = obj[, "ryy"], rww = rww,
                              adj = adj, vcov = TRUE)
      
    # Disattenuation factors for adjustments of implied correlations
    } else {
      
      att_vecs <- .disattenuate(r_vecs, 
                                rxx = weighted.mean(obj[, "rxx"], obj[, "n"]),
                                rmm = weighted.mean(obj[, "rmm"], obj[, "n"]),
                                ryy = weighted.mean(obj[, "ryy"], obj[, "n"]),
                                rww = weighted.mean(rww, obj[, "n"]),
                                adj = adj, vcov = FALSE, return_att = TRUE)
      att_mat <- metaSEM::vec2symMat(att_vecs, diag = FALSE)
      
    }

    # OpenMx matrices for indirect effects
    ind <- .get_INDmatrix(model)
    ind$CI <- OpenMx::mxCI(names(ind))

    # Generate OpenMx model
    mx.model <- metaSEM::osmasem(
      RAM = model,
      RE.type = "Diag", 
      data = list(
        data = cbind(r_vecs, V_vecs),
        n = obj[, "n"],
        obslabels = NULL,
        ylabels = colnames(r_vecs),
        vlabels = colnames(V_vecs)
      ),
      intervals.type = "LB",
      mxModel.Args = ind, 
      run = FALSE
    )
    
    # Constrain random effects
    mx.model$vecTau1$labels <- cbind(rep("Tau1", length(mx.model$vecTau1$labels)))
    
    # Adjust model implied correlations
    if (!individual) {
      mx.model$G <- metaSEM::as.mxMatrix(att_mat, name = "G")
      mx.model$impliedR <- OpenMx::mxAlgebra(
        ((Fmatrix %*% solve(Iden - Amatrix)) %&% Smatrix) * G, 
        name = "impliedR"
      )
    }
    
    # Fit SEM
    fit <- list(mx.fit = 
      try(
        suppressWarnings(
          OpenMx::mxRun(mx.model, intervals = TRUE, silent = TRUE, 
                        suppressWarnings = TRUE)
        ), 
        silent = TRUE
      )
    )
    class(fit) <- "osmasem"
    
    # Rerun in case of convergence failures
    if (!inherits(fit$mx.fit, "try-error") & !is.null(fit$mx.fit$output$status$code)) {
      i <- 0
      while (!(fit$mx.fit$output$status$code %in% c(0, 1)) & (i < 5)) {
        fit <- suppressMessages(
          metaSEM::rerun(fit, autofixtau2 = TRUE, extraTries = 10, 
                         intervals = TRUE, silent = TRUE)
        ) 
        i <- i + 1
      } 
    }
    
    # Return SEM results
    if (inherits(fit$mx.fit, "try-error") |
        is.null(fit$mx.fit$output$status$code) |
        is.null(fit$mx.fit$output$confidenceIntervals)) {
      out <- c(status = 9)
    } else {
      pars <- fit$mx.fit$output$confidenceIntervals
      pars <- pars[grepl("^osmasem.(Amatrix|IND)", rownames(pars)), ]
      lbl <- c("X", "M", "W", "Y")
      if (ncol(r_vecs) < 4) lbl <- lbl[-3]
      rownames(pars) <- c(paste0(rep(lbl, length(lbl)), "ON",
                                 rep(lbl, each = length(lbl))),
                          names(ind)[-length(ind)])
      pars <- pars[apply(pars, 1, \(x) { !all(x %in% c(0, NA)) }), ]
      out <- c(status = fit$mx.fit$output$status$code,
               iter_sample = sum(obj[, "iter"]))
      for (i in rownames(pars)) {
        out[paste0(i, "_est")] <- pars[i, "estimate"]
        out[paste0(i, "_lb")] <- pars[i, "lbound"]
        out[paste0(i, "_ub")] <- pars[i, "ubound"]
      }
    }
    return(out)
    
  }
  
  
  #########################################################

  
  # RAM specification for SEM model
  type <- ifelse(!is.null(WONM),
                 "sequential",
                 ifelse(!is.null(WONX), "parallel", "simple"))
  model <- .get_model(type)
  
  # Number of errors
  status <- list(masem1a = 0, masem2a = 0, 
                 masem1b = 0, masem2b = 0, 
                 masem1c = 0, masem2c = 0, 
                 masem0d = 0, masem0e = 0,
                 masem0f = 0)

  # Reestimate models until all have converged
  repeat {
    
    # Calculate correlation matrices from randomly
    #   generated samples in the meta-analysis
    simdat <- .sim_samples(MONX = MONX, YONM = YONM, YONX = YONX,
                           WONX = WONX, YONW = YONW, WONM = WONM,
                           k = k, n = n, tau = tau, rxx = rxx,
                           rmm = rmm, rww = rww, ryy = ryy,
                           tau_max = tau_max)
      
    out <- c()
    stage1 <- list()
    
    # The meta-analysis is estimated six times:
    #  (a) TSMASEM without adjustments
    #  (b) TSMASEM with adjustments for all variables
    #        at the first stage
    #  (c) TSMASEM with adjustments for all variables
    #        at the second stage
    #  (d) OSMASEM without adjustments
    #  (e) OSMASEM with individual adjustments for all variables
    #  (f) OSMASEM with adjustments of implied correlations
    #       for all variables
    # If a model results in an error (e.g., failed to converge),
    # the entire estimation restarts with new random draws.
    
    # Estimation error occurred
    e <- lapply(status, \(x) 0)
    
    # (a) TSMASEM without adjustments
    stage1$masem_a <- .sim_tsmasem1(simdat, 
                                    adj = list(rxx = FALSE, rmm = FALSE,
                                               rww = FALSE, ryy = FALSE))
    e$masem1a <- stage1$masem_a$status
    if (stage1$masem_a$status <= 1) {
      out$masem_a <- .sim_tsmasem2(stage1$masem_a, model,
                                   adj = list(rxx = FALSE, rmm = FALSE,
                                              rww = FALSE, ryy = FALSE))
      e$masem2a <- out$masem_a["status"]
    }
    
    # (b) TSMASEM with adjustments for all variables at the first stage
    stage1$masem_b <- .sim_tsmasem1(simdat, 
                                    adj = list(rxx = TRUE, rmm = TRUE, 
                                               rww = TRUE, ryy = TRUE))
    e$masem1b <- stage1$masem_b$status
    if (stage1$masem_b$status <= 1) {
      out$masem_b <- .sim_tsmasem2(stage1$masem_b, model,
                                   adj = list(rxx = FALSE, rmm = FALSE,
                                              rww = FALSE, ryy = FALSE))
      e$masem2b <- out$masem_b["status"]
    }
    
    # (c) TSMASEM with adjustments for all variables at the second stage
    e$masem1c <- e$masem1a 
    if (stage1$masem_a$status <= 1) {
      out$masem_c <- .sim_tsmasem2(stage1$masem_a, model,
                                   adj = list(rxx = TRUE, rmm = TRUE, 
                                              rww = TRUE, ryy = TRUE))
      e$masem2c <- out$masem_c["status"]
    }
    
    # (d) OSMASEM without adjustments
    out$masem_d <- .sim_osmasem(simdat, model,
                                adj = list(rxx = FALSE, rmm = FALSE, 
                                           rww = FALSE, ryy = FALSE))
    e$masem0d <- out$masem_d["status"]
    
    # (i) OSMASEM with individual adjustments for all variables
    out$masem_e <- .sim_osmasem(simdat, model,
                                adj = list(rxx = TRUE, rmm = TRUE,
                                           rww = TRUE, ryy = TRUE),
                                individual = TRUE)
    e$masem0e <- out$masem_e["status"]
    
    # (j) OSMASEM with adjustments of implied correlations
    #       for all variables
    out$masem_f <- .sim_osmasem(simdat, model,
                                adj = list(rxx = TRUE, rmm = TRUE,
                                           rww = TRUE, ryy = TRUE),
                                individual = FALSE)
    e$masem0f <- out$masem_f["status"]
    
    # Record errors
    for (i in names(e)) {
      if (e[[i]] %in% 2:9) status[[i]] <- e[[i]]
    }

    # Repeat meta-analysis unless converged
    if (all(sapply(e, \(x) (x %in% 0:1))))
      break

  }

  # True indirect effect
  IND <- switch(type,
                simple = c(IND = MONX * YONM),
                parallel = c(IND1 = MONX * YONM,
                             IND2 = WONX * YONW,
                             IND = MONX * YONM + WONX * YONW),
                sequential = c(IND = MONX * WONM * YONW))
  
  # Return results
  status <- unlist(status)
  names(status) <- paste0(names(status), ".status")
  out <- lapply(out, \(o) { o[-c(1, 2)] })
  out <- c(id = rid, 
           MONX = MONX, YONM = YONM, YONX = YONX, 
           WONX = WONX, YONW = YONW, WONM = WONM,
           IND,
           simdat[1, "tau"], 
           k = stage1$masem_a$k, 
           n = stage1$masem_a$n,
           rxx = stage1$masem_a$rxx,
           rmm = stage1$masem_a$rmm,
           rww = stage1$masem_a$rww,
           ryy = stage1$masem_a$ryy,
           unlist(out), 
           iter_samp = stage1$masem_a$iter_samp, 
           status) |>
         unlist()
  return(out)
  
}


#
# Run simulation for a given condition
#
# @param R         The number of replications (optional)
# @param cores     The number of cores to use for parallel processing 
#                    (optional)
# @param seed      A random seed (optional)
# @param ...       Arguments passed to sim_wrapper()
# @require         parallel, pbapply
#
sim_run <- function(R = 1000, cores = parallel::detectCores(), 
                    seed = 9789, ...) {
  
  # Don't run the simulation
  if (isTRUE(TEST)) return(NULL)
  
  # Create clusters
  cl <- parallel::makeCluster(cores)

  # Set seed
  parallel::clusterSetRNGStream(cl = cl, iseed = seed)
  
  # Run simulation
  parallel::clusterExport(cl, "sim_wrapper")
  invisible(parallel::clusterEvalQ(
    cl, 
    { library("OpenMx"); library("metaSEM"); 
      library("MASS"); library("Matrix") }
  ))
  data <- 
    pbapply::pbsapply(
      X = 1:R,
      FUN = \(i) { sim_wrapper(rid = i, ...) },
      cl = cl
    )
  data <- as.data.frame(t(data))
  
  # Stop clusters
  parallel::stopCluster(cl)
  
  return(data)
  
}


TEST <- TRUE

2 Simple mediation

simdat <- list()

# Low reliability
simdat$simple1 <- sim_run(MONX = .30, YONM = .30, YONX = .00,
                          rmm = c(.65, .75),
                          R = 1000, seed = 4312)

# High reliability
simdat$simple2 <- sim_run(MONX = .30, YONM = .30, YONX = .00,
                          rmm = c(.85, .95),
                          R = 1000, seed = 4312)

# Small indirect effect
simdat$simple3 <- sim_run(MONX = .30, YONM = .075, YONX = .00,
                          R = 1000, seed = 8214)

# Large indirect effect
simdat$simple4 <- sim_run(MONX = .30, YONM = .675, YONX = .00,
                          R = 1000, seed = 8214)

# Small a-path
simdat$simple5 <- sim_run(MONX = .15, YONM = .60, YONX = .00,
                          R = 1000, seed = 3419)

# Large a-path
simdat$simple6 <- sim_run(MONX = .60, YONM = .15, YONX = .00,
                          R = 1000, seed = 3419)

3 Parallel mediation

# Low reliability
simdat$parallel1 <- sim_run(MONX = .30, YONM = .30, YONX = .00,
                            WONX = .30, YONW = .30,
                            rmm = c(.65, .75),
                            rww = c(.65, .75),
                            R = 1000, seed = 4312)

# High reliability
simdat$parallel2 <- sim_run(MONX = .30, YONM = .30, YONX = .00,
                            WONX = .30, YONW = .30,
                            rmm = c(.85, .95),
                            rww = c(.85, .95),
                            R = 1000, seed = 4312)

# Small indirect effect
simdat$parallel3 <- sim_run(MONX = .30, YONM = .075, YONX = .00,
                            WONX = .30, YONW = .075,
                            R = 1000, seed = 8214)

# Large indirect effect
simdat$parallel4 <- sim_run(MONX = .30, YONM = .675, YONX = .00,
                            WONX = .30, YONW = .675,
                            R = 1000, seed = 8214)

# Small a-path
simdat$parallel5 <- sim_run(MONX = .15, YONM = .60, YONX = .00,
                            WONX = .15, YONW = .60,
                            R = 1000, seed = 3419)

# Large a-path
simdat$parallel6 <- sim_run(MONX = .60, YONM = .15, YONX = .00,
                            WONX = .60, YONW = .15,
                            R = 1000, seed = 3419)

4 Sequential mediation

# Low reliability
simdat$sequential1 <- sim_run(MONX = .30, YONX = .00,
                              WONM = .44, YONW = .30,
                              rmm = c(.65, .75),
                              rww = c(.65, .75),
                              R = 1000, seed = 4312)

# High reliability
simdat$sequential2 <- sim_run(MONX = .30, YONX = .00,
                              WONM = .44, YONW = .30,
                              rmm = c(.85, .95),
                              rww = c(.85, .95),
                              R = 1000, seed = 4312)

# Small indirect effect
simdat$sequential3 <- sim_run(MONX = .30, YONX = .00,
                              WONM = .44, YONW = .075,
                              R = 1000, seed = 8214)

# Large indirect effect
simdat$sequential4 <- sim_run(MONX = .30, YONX = .00,
                              WONM = .44, YONW = .675,
                              R = 1000, seed = 8214)

# Small a-path
simdat$sequential5 <- sim_run(MONX = .15, YONX = .00,
                              WONM = .44, YONW = .60,
                              R = 1000, seed = 3419)

# Large a-path
simdat$sequential6 <- sim_run(MONX = .60, YONX = .00,
                              WONM = .44, YONW = .15,
                              R = 1000, seed = 3419)

5 Save results

if (isFALSE(TEST))
  saveRDS(simdat, file = here::here("data", "simdat.rds"))