from pyspark.sql import SparkSession
import sys
from awsglue.utils import getResolvedOptions
# Resolve the named job parameters from the command line.
args = getResolvedOptions(
    sys.argv,
    ['JOB_NAME', 'output_bucket', 'input_data_path', 'database', 'table'],
)
database = args['database']
table = args['table']
s3_input = args['input_data_path']
# Output prefix is "<output_bucket>/<table>"; the URI scheme is added later
# when the catalog warehouse location is configured.
s3_output = f"{args['output_bucket']}/{table}"
# Build (or reuse) a Spark session with the Iceberg SQL extensions enabled and
# a catalog named "glue_catalog" configured with the Iceberg GlueCatalog
# implementation and S3FileIO for file access.
#
# The warehouse location uses plain `{s3_output}` interpolation: a shell-style
# `${s3_output}` inside an f-string leaves a literal `$` in the path
# ("s3://$bucket/..."), which is not a valid S3 location.
# NOTE(review): the "s3://" scheme is prepended here, so s3_output is assumed
# to be "<bucket>/<table>" without a scheme -- confirm against the job args.
spark = (
    SparkSession.builder.appName("create_iceberg_table_on_s3")
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", f"s3://{s3_output}/")
    .config(
        "spark.sql.catalog.glue_catalog.catalog-impl",
        "org.apache.iceberg.aws.glue.GlueCatalog",
    )
    .config(
        "spark.sql.catalog.glue_catalog.io-impl",
        "org.apache.iceberg.aws.s3.S3FileIO",
    )
    .getOrCreate()
)
# Load the parquet input and expose it to Spark SQL as the temp view "tmp_taxi".
dataframe = spark.read.parquet(s3_input)
dataframe.createOrReplaceTempView("tmp_taxi")

# Columns copied into the new table unchanged.
passthrough_columns = (
    "VendorID",
    "passenger_count",
    "store_and_fwd_flag",
    "PULocationID",
    "DOLocationID",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "Airport_fee",
    "trip_distance",
    "RatecodeID",
)
# Columns cast to timestamp while keeping their original names.
timestamp_columns = ("tpep_pickup_datetime", "tpep_dropoff_datetime")

select_list = ",".join(
    passthrough_columns
    + tuple(f"cast({name} as timestamp) {name}" for name in timestamp_columns)
)

# CREATE TABLE ... AS SELECT into the glue_catalog catalog as an Iceberg
# format-version 2 table, populated from the temp view.
query = f"""
CREATE TABLE glue_catalog.{database}.{table}
USING iceberg
TBLPROPERTIES ("format-version"="2")
AS SELECT {select_list}
   FROM tmp_taxi
"""
spark.sql(query)
