/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.eks.tpcds;

import com.amazonaws.eks.tpcds.TpcdsDdlStatements$;
import java.io.Serializable;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.util.Try$;

public final class CreateIcebergTablesForWrites$ {
    public static CreateIcebergTablesForWrites$ MODULE$;

    static {
        new CreateIcebergTablesForWrites$();
    }

    public void main(String[] args) {
        String lstBenchDataDir = args[0];
        String deltaMergeSourcesDir = args[1];
        String s3PathPrefixDestinationDir = args[2];
        String format = (String)Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> args[3]).getOrElse((Function0 & Serializable & scala.Serializable)() -> "parquet");
        String scaleFactor = (String)Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> args[4]).getOrElse((Function0 & Serializable & scala.Serializable)() -> "1");
        boolean onlyWarn = BoxesRunTime.unboxToBoolean((Object)Try$.MODULE$.apply((Function0)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString(args[5])).toBoolean()).getOrElse((Function0)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> false));
        String databaseName = (String)Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> args[6]).getOrElse((Function0 & Serializable & scala.Serializable)() -> "tpcds_writes_db_iceberg");
        Predef$.MODULE$.println((Object)new StringBuilder(22).append("LST BENCH DATA DIR is ").append(lstBenchDataDir).toString());
        Predef$.MODULE$.println((Object)new StringBuilder(27).append("DELTA MERGE SOURCES DIR is ").append(deltaMergeSourcesDir).toString());
        String tempDatabaseName = new StringBuilder(10).append(databaseName).append("_temp_hive").toString();
        SparkSession spark = SparkSession$.MODULE$.builder().appName(new StringBuilder(80).append("TPCDS SQL Benchmark create tables for write benchmark with scale ").append(scaleFactor).append(" GB and ").append(format).append(" format").toString()).enableHiveSupport().getOrCreate();
        if (onlyWarn) {
            Predef$.MODULE$.println((Object)"Only WARN");
            LogManager.getLogger((String)"org").setLevel(Level.WARN);
        }
        Seq inputs = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"web_returns_file005_match000_notmatch005", "web_returns_file005_match000_notmatch010", "web_returns_file005_match000_notmatch050", "web_returns_file005_match000_notmatch100", "web_returns_file005_match001_notmatch010", "web_returns_file005_match005_notmatch000", "web_returns_file005_match010_notmatch000", "web_returns_file005_match100_notmatch001", "web_returns_file005_match010_notmatch010", "web_returns_file005_match050_notmatch001", "web_returns_file005_match099_notmatch001", "web_returns_file050_match001_notmatch0001", "web_returns_file100_match001_notmatch0001"}));
        Seq lstSourceToCopy = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)"catalog_returns", (Object)TpcdsDdlStatements$.MODULE$.catalog_returns_ddl()), new Tuple2((Object)"catalog_sales", (Object)TpcdsDdlStatements$.MODULE$.catalog_sales_ddl()), new Tuple2((Object)"inventory", (Object)TpcdsDdlStatements$.MODULE$.inventory_ddl()), new Tuple2((Object)"store_returns", (Object)TpcdsDdlStatements$.MODULE$.store_returns_ddl()), new Tuple2((Object)"store_sales", (Object)TpcdsDdlStatements$.MODULE$.store_sales_ddl()), new Tuple2((Object)"web_returns", (Object)TpcdsDdlStatements$.MODULE$.web_returns_ddl()), new Tuple2((Object)"web_sales", (Object)TpcdsDdlStatements$.MODULE$.web_sales_ddl()), new Tuple2((Object)"date_dim", (Object)TpcdsDdlStatements$.MODULE$.date_dim_ddl())}));
        Seq deltaCPS3Command = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"s3-dist-cp", "--src", deltaMergeSourcesDir, "--dest", new StringBuilder(1).append(s3PathPrefixDestinationDir).append("/").append(tempDatabaseName).toString()}));
        int deltaCPOutput = scala.sys.process.package$.MODULE$.stringSeqToProcess(deltaCPS3Command).$bang();
        if (deltaCPOutput != 0) {
            Predef$.MODULE$.println((Object)"Failed to copy over delta merge sources");
            throw scala.sys.package$.MODULE$.exit(-1);
        }
        spark.sql(new StringBuilder(30).append("CREATE DATABASE IF NOT EXISTS ").append(tempDatabaseName).toString());
        spark.sql(new StringBuilder(4).append("USE ").append(tempDatabaseName).toString());
        spark.sql("show tables").show();
        inputs.foreach((Function1 & Serializable & scala.Serializable)tableName -> {
            CreateIcebergTablesForWrites$.$anonfun$main$9(s3PathPrefixDestinationDir, tempDatabaseName, spark, tableName);
            return BoxedUnit.UNIT;
        });
        Seq tasks = (Seq)lstSourceToCopy.withFilter((Function1 & Serializable & scala.Serializable)check$ifrefutable$1 -> BoxesRunTime.boxToBoolean((boolean)CreateIcebergTablesForWrites$.$anonfun$main$10(check$ifrefutable$1))).map((Function1 & Serializable & scala.Serializable)x$1 -> {
            Tuple2 tuple2 = x$1;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            String tableName = (String)tuple2._1();
            String ddl = (String)tuple2._2();
            Future future = Future$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                Seq s3CPCommand = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"s3-dist-cp", "--src", new StringBuilder(1).append(lstBenchDataDir).append("/").append(tableName).toString(), "--dest", new StringBuilder(2).append(s3PathPrefixDestinationDir).append("/").append(tempDatabaseName).append("/").append(tableName).toString()}));
                int output = scala.sys.process.package$.MODULE$.stringSeqToProcess(s3CPCommand).$bang();
                if (output != 0) {
                    Predef$.MODULE$.println((Object)"Failed to copy over LST data");
                    throw scala.sys.package$.MODULE$.exit(-1);
                }
                String sqlForTable = ddl.replace("s3PathPrefix", s3PathPrefixDestinationDir).replace("s3PathTargetDatabase", tempDatabaseName);
                spark.sql(sqlForTable).show();
                if (!tableName.contains("date_dim")) {
                    spark.sql(new StringBuilder(18).append("MSCK REPAIR TABLE ").append(tableName).toString()).show();
                }
                Predef$.MODULE$.println((Object)new StringBuilder(8).append("Finish: ").append(tableName).toString());
            }, ExecutionContext.Implicits$.MODULE$.global());
            return future;
        }, Seq$.MODULE$.canBuildFrom());
        Future allTasks = Future$.MODULE$.sequence((TraversableOnce)tasks, Seq$.MODULE$.canBuildFrom(), ExecutionContext.Implicits$.MODULE$.global());
        Predef$.MODULE$.println((Object)"Awaiting creation...");
        Await$.MODULE$.result((Awaitable)allTasks, (Duration)new package.DurationInt(package$.MODULE$.DurationInt(36000)).seconds());
        IndexedSeq deltaMergeTargetTasks = (IndexedSeq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 16).map((Function1 & Serializable & scala.Serializable)index -> Future$.MODULE$.apply((Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> {
            Seq s3Command = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"s3-dist-cp", "--src", new StringBuilder(14).append(s3PathPrefixDestinationDir$1).append("/").append(tempDatabaseName$1).append("/web_returns/").toString(), "--dest", new StringBuilder(16).append(s3PathPrefixDestinationDir$1).append("/").append(tempDatabaseName$1).append("/web_returns_q").append(index).append("/").toString()}));
            scala.sys.process.package$.MODULE$.stringSeqToProcess(s3Command).$bang$bang();
            String sqlForTable = TpcdsDdlStatements$.MODULE$.createDeltaMergeTargetTableSql().replace("tableName", new StringBuilder(13).append("web_returns_q").append(index).toString()).replace("s3PathPrefix", s3PathPrefixDestinationDir$1).replace("s3PathTargetDatabase", tempDatabaseName$1);
            spark$1.sql(sqlForTable);
            spark$1.sql(new StringBuilder(31).append("MSCK REPAIR TABLE web_returns_q").append(index).toString());
            Predef$.MODULE$.println((Object)new StringBuilder(21).append("Finish: web_returns_q").append(index).toString());
            return index;
        }, ExecutionContext.Implicits$.MODULE$.global()), IndexedSeq$.MODULE$.canBuildFrom());
        Future allDeltaMergeTargetTasks = Future$.MODULE$.sequence((TraversableOnce)deltaMergeTargetTasks, IndexedSeq$.MODULE$.canBuildFrom(), ExecutionContext.Implicits$.MODULE$.global());
        Predef$.MODULE$.println((Object)"Awaiting creation...");
        Await$.MODULE$.result((Awaitable)allDeltaMergeTargetTasks, (Duration)new package.DurationInt(package$.MODULE$.DurationInt(36000)).seconds());
        Predef$.MODULE$.println((Object)"Finished creation...");
        spark.sql("show tables").show();
        spark.sql(new StringBuilder(45).append("CREATE DATABASE IF NOT EXISTS hadoop_catalog.").append(databaseName).toString());
        Dataset tables = spark.sql("show tables");
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])tables.collect())).map((Function1 & Serializable & scala.Serializable)r -> r.apply(1).toString(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).foreach((Function1 & Serializable & scala.Serializable)t -> {
            CreateIcebergTablesForWrites$.$anonfun$main$16(tempDatabaseName, databaseName, spark, t);
            return BoxedUnit.UNIT;
        });
        spark.sql(new StringBuilder(32).append("show tables from hadoop_catalog.").append(databaseName).toString()).show();
        throw scala.sys.package$.MODULE$.exit(0);
    }

    public static final /* synthetic */ void $anonfun$main$9(String s3PathPrefixDestinationDir$1, String tempDatabaseName$1, SparkSession spark$1, String tableName) {
        String sqlForTable = TpcdsDdlStatements$.MODULE$.createDeltaMergeSourceSql().replace("tableName", tableName).replace("s3PathPrefix", s3PathPrefixDestinationDir$1).replace("s3PathTargetDatabase", tempDatabaseName$1);
        spark$1.sql(sqlForTable).show();
    }

    public static final /* synthetic */ boolean $anonfun$main$10(Tuple2 check$ifrefutable$1) {
        Tuple2 tuple2 = check$ifrefutable$1;
        boolean bl = tuple2 != null;
        return bl;
    }

    public static final /* synthetic */ void $anonfun$main$16(String tempDatabaseName$1, String databaseName$1, SparkSession spark$1, String t) {
        String source_table = new StringBuilder(35).append("source_table => 'spark_catalog.").append(tempDatabaseName$1).append(".").append(t).append("', ").toString();
        String target_table = new StringBuilder(27).append("table => 'hadoop_catalog.").append(databaseName$1).append(".").append(t).append("'").toString();
        String command = new StringBuilder(37).append("CALL hadoop_catalog.system.snapshot(").append(source_table).append(target_table).append(")").toString();
        spark$1.sql(command);
        Thread.sleep(64000L);
    }

    private CreateIcebergTablesForWrites$() {
        MODULE$ = this;
    }
}

