From 2f708c4c5fd07e5d322f5d5b5b2d2770b201acc4 Mon Sep 17 00:00:00 2001 From: sanghyeok_boo Date: Sat, 15 Mar 2025 23:20:57 +0900 Subject: [PATCH 1/2] =?UTF-8?q?ETL,=20ELT=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/.env | 6 +- airflow/dags/ELT_eventsim_song_count_DAG.py | 154 ++++++++++++++++++ ...ventsim_ETL_DAG.py => ETL_eventsim_DAG.py} | 2 +- airflow/dags/eventsim_song_count_ELT.py | 50 ------ airflow/dags/plugins/spark_utils.py | 75 +++++++-- ...m_ETL_script.py => ETL_eventsim_script.py} | 57 ++++--- 6 files changed, 253 insertions(+), 91 deletions(-) create mode 100644 airflow/dags/ELT_eventsim_song_count_DAG.py rename airflow/dags/{eventsim_ETL_DAG.py => ETL_eventsim_DAG.py} (96%) delete mode 100644 airflow/dags/eventsim_song_count_ELT.py rename airflow/dags/scripts/{eventsim_ETL_script.py => ETL_eventsim_script.py} (68%) diff --git a/airflow/.env b/airflow/.env index e983c51..cbc0216 100644 --- a/airflow/.env +++ b/airflow/.env @@ -18,7 +18,11 @@ AWS_ACCESS_KEY_ID=AKIA4RRVVY55VT3PCIZU AWS_SECRET_ACCESS_KEY=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 AWS_DEFAULT_REGION=ap-northeast-2 # 사용 지역 설정 - +# BSH SNOWFLAKE CONN INFO +SNOWFLAKE_USER_BSH= BSH +SNOWFLAKE_PASSWORD_BSH= BSH1234! +SNOWFLAKE_SILVER_SCHEMA = RAW_DATA +SNOWFLAKE_GOLD_SCHEMA = ANALYTICS #S3_Spark_SnowFlake_ELT SNOWFLAKE_USER= BY diff --git a/airflow/dags/ELT_eventsim_song_count_DAG.py b/airflow/dags/ELT_eventsim_song_count_DAG.py new file mode 100644 index 0000000..05a1742 --- /dev/null +++ b/airflow/dags/ELT_eventsim_song_count_DAG.py @@ -0,0 +1,154 @@ +import os +from datetime import timedelta +from dotenv import load_dotenv +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.utils.dates import days_ago +from airflow.exceptions import AirflowFailException +import snowflake.connector +import pandas as pd +from spark_utils import execute_snowflake_query + +load_dotenv() + +# SNOWFLAKE 설정 +SNOWFLAKE_SOURCE_TABLE = 'EVENTSIM_LOG' +SNOWFLAKE_SOURCE_SCHEMA = os.environ.get("SNOWFLAKE_SILVER_SCHMEA", "RAW_DATA") +SNOWFLAKE_TARGET_SONG_TABLE = "EVENTSIM_SONG_COUNTS" +SNOWFLAKE_TARGET_ARTIST_TABLE = "EVENTSIM_ARTIST_COUNTS" +SNOWFLAKE_TARGET_SCHEMA = os.environ.get("SNOWFLAKE_GOLD_SCHEMA", "ANALYTICS") + +SNOWFLAKE_PROPERTIES = { + "user": os.environ.get("SNOWFLAKE_USER_BSH"), + "password": os.environ.get("SNOWFLAKE_PASSWORD_BSH"), + "account": os.environ.get("SNOWFLAKE_ACCOUNT"), + "db": os.environ.get("SNOWFLAKE_DB", "S4TIFY"), + "warehouse": os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"), + "role": os.environ.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"), +} + +def extract_data_from_snowflake(): + """ + Snowflake에서 데이터를 읽어오는 함수 + """ + SNOWFLAKE_PROPERTIES['schema'] = SNOWFLAKE_SOURCE_SCHEMA + + query = f""" + SELECT SONG, ARTIST, LOCATION, SESSIONID, USERID, TS + FROM {SNOWFLAKE_SOURCE_TABLE} + """ + try: + conn = snowflake.connector.connect( + user=SNOWFLAKE_PROPERTIES["user"], + password=SNOWFLAKE_PROPERTIES["password"], + account=SNOWFLAKE_PROPERTIES["account"], + database=SNOWFLAKE_PROPERTIES["db"], + schema=SNOWFLAKE_PROPERTIES["schema"], + warehouse=SNOWFLAKE_PROPERTIES["warehouse"], + role=SNOWFLAKE_PROPERTIES["role"], + ) + cur = conn.cursor() + rows = cur.execute(query).fetchall() + columns = [desc[0] for desc in cur.description] + + df = pd.DataFrame(rows, columns=columns) + print(df.head(5)) + print("Data successfully extracted from Snowflake!") + return df + + except Exception as e: + print(f"Error extracting data from Snowflake: {e}") + return None + + finally: + cur.close() + conn.close() + +def process_song_counts(**kwargs): + SNOWFLAKE_PROPERTIES['schema'] = SNOWFLAKE_TARGET_SCHEMA + ti = kwargs['ti'] + df = ti.xcom_pull(task_ids='extract_data') + + if df is None or df.empty: + raise AirflowFailException("No data available for processing song counts.") + + # 노래 카운트 계산 + song_counts = df.groupby(["SONG", "ARTIST"]).size().reset_index(name="song_count") + + # 테이블 비우기 + execute_snowflake_query(f"TRUNCATE TABLE {SNOWFLAKE_TARGET_SONG_TABLE};", SNOWFLAKE_PROPERTIES) + + # SQL 쿼리 (executemany 사용) + insert_query = f""" + INSERT INTO {SNOWFLAKE_TARGET_SONG_TABLE} (SONG, ARTIST, song_count) + VALUES (%s, %s, %s) + """ + + # DataFrame을 리스트로 변환 후 executemany 실행 + execute_snowflake_query(insert_query, SNOWFLAKE_PROPERTIES, data=song_counts.values.tolist()) + + print(f"Data written to {SNOWFLAKE_TARGET_SONG_TABLE} in Snowflake.") + +def process_artist_counts(**kwargs): + SNOWFLAKE_PROPERTIES['schema'] = SNOWFLAKE_TARGET_SCHEMA + ti = kwargs['ti'] + df = ti.xcom_pull(task_ids='extract_data') + if df is None or df.empty: + raise AirflowFailException("No data available for processing artist counts.") + + artist_counts = df.groupby("ARTIST").size().reset_index(name="artist_count") + execute_snowflake_query(f"TRUNCATE TABLE {SNOWFLAKE_TARGET_ARTIST_TABLE};", SNOWFLAKE_PROPERTIES) + insert_query = f""" + INSERT INTO {SNOWFLAKE_TARGET_ARTIST_TABLE} (ARTIST, artist_count) + VALUES (%s, %s) + """ + execute_snowflake_query(insert_query, SNOWFLAKE_PROPERTIES, artist_counts.values.tolist()) + print(f"Data written to {SNOWFLAKE_TARGET_ARTIST_TABLE} in Snowflake.") + +# DAG 설정 +default_args = { + "owner": "sanghyoek_boo", + "retries": 1, + "retry_delay": timedelta(minutes=5), + "start_date": days_ago(1), +} + +dag = DAG( + dag_id="ELT_eventsim_song_artist_count", + default_args=default_args, + schedule_interval="@daily", + catchup=False, + tags=["ELT", "Eventsim"], +) + +trigger_dag_task = TriggerDagRunOperator( + task_id="trigger_eventsim_etl", + trigger_dag_id="eventsim_ETL", # 실행할 대상 DAG ID + wait_for_completion=True, # 완료될 때까지 대기 + poke_interval=10, # DAG 상태 체크 주기 (초 단위) + dag=dag, +) + +extract_data_task = PythonOperator( + task_id="extract_data", + python_callable=extract_data_from_snowflake, + provide_context=True, + dag=dag, +) + +process_song_task = PythonOperator( + task_id="process_song_counts", + python_callable=process_song_counts, + provide_context=True, + dag=dag, +) + +process_artist_task = PythonOperator( + task_id="process_artist_counts", + python_callable=process_artist_counts, + provide_context=True, + dag=dag, +) + +trigger_dag_task >> extract_data_task >> [process_song_task, process_artist_task] diff --git a/airflow/dags/eventsim_ETL_DAG.py b/airflow/dags/ETL_eventsim_DAG.py similarity index 96% rename from airflow/dags/eventsim_ETL_DAG.py rename to airflow/dags/ETL_eventsim_DAG.py index baf5058..668dbc8 100644 --- a/airflow/dags/eventsim_ETL_DAG.py +++ b/airflow/dags/ETL_eventsim_DAG.py @@ -41,7 +41,7 @@ # SparkSubmitOperator: Spark에서 S3 데이터를 처리하고 Snowflake에 MERGE spark_job = SparkSubmitOperator( task_id="spark_process_s3_upsert", - application="/opt/airflow/dags/scripts/eventsim_ETL_script.py", + application="/opt/airflow/dags/scripts/ETL_eventsim_script.py", conn_id="spark_conn", application_args=[ S3_BUCKET, diff --git a/airflow/dags/eventsim_song_count_ELT.py b/airflow/dags/eventsim_song_count_ELT.py deleted file mode 100644 index 68f7dc9..0000000 --- a/airflow/dags/eventsim_song_count_ELT.py +++ /dev/null @@ -1,50 +0,0 @@ -from datetime import timedelta - -from airflow import DAG -from airflow.operators.trigger_dagrun import TriggerDagRunOperator -from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator -from airflow.utils.dates import days_ago - -SOURCE_TABLE = "RAW_DATA.EVENTSIM_LOG" -TARGET_TABLE = "ANALYTICS.EVENTSIM_SONG_COUNTS" - -SQL_QUERY = f""" - CREATE OR REPLACE TABLE {TARGET_TABLE} AS - SELECT SONG, ARTIST, COUNT(*) AS song_count - FROM {SOURCE_TABLE} - GROUP BY SONG, ARTIST - ORDER BY song_count DESC; -""" - -default_args = { - "owner": "sanghyoek_boo", - "retries": 1, - "retry_delay": timedelta(minutes=5), - "start_date": days_ago(1), -} - -dag = DAG( - dag_id="evetnsim_song_count_ELT", - default_args=default_args, - schedule_interval="@daily", - catchup=False, - tags=["ELT", "Eventsim", "Song Count"], -) - -trigger_dag_task = TriggerDagRunOperator( - task_id="trigger_task", - trigger_dag_id="eventsim_ETL", - wait_for_completion=True, - poke_interval=10, - dag=dag, -) - -run_snowflake_query = SnowflakeOperator( - task_id="aggregate_song_counts", - sql=SQL_QUERY, - snowflake_conn_id="snowflake_conn", - dag=dag, -) - -if __name__ == "__main__": - trigger_dag_task >> run_snowflake_query diff --git a/airflow/dags/plugins/spark_utils.py b/airflow/dags/plugins/spark_utils.py index 6c1a461..697f547 100644 --- a/airflow/dags/plugins/spark_utils.py +++ b/airflow/dags/plugins/spark_utils.py @@ -2,18 +2,22 @@ import snowflake.connector from pyspark.sql import SparkSession - -from airflow.models import connection +import pandas as pd +from airflow.exceptions import AirflowFailException # Spark JARs 설정 -SPARK_HOME = "/opt/spark/" -SPARK_JARS = ",".join( - [ - os.path.join(SPARK_HOME, "jars", "snowflake-jdbc-3.9.2.jar"), - os.path.join(SPARK_HOME, "jars", "hadoop-aws-3.3.4.jar"), - os.path.join(SPARK_HOME, "jars", "aws-java-sdk-bundle-1.12.262.jar"), - ] -) +# SPARK_HOME 설정 +SPARK_HOME = "/opt/spark" +os.environ["SPARK_HOME"] = SPARK_HOME + +# JAR 경로 설정 +SPARK_JARS_DIR = os.path.join(SPARK_HOME, "jars") +SPARK_JARS_LIST = [ + "snowflake-jdbc-3.9.2.jar", + "hadoop-aws-3.3.4.jar", + "aws-java-sdk-bundle-1.12.262.jar" +] +SPARK_JARS = ",".join([os.path.join(SPARK_JARS_DIR, jar) for jar in SPARK_JARS_LIST]) # Spark Session builder @@ -29,14 +33,27 @@ def spark_session_builder(app_name: str) -> SparkSession: return ( SparkSession.builder.appName(f"{app_name}") .config("spark.jars", SPARK_JARS) + .config("spark.driver.extraClassPath", "/opt/spark/jars/snowflake-jdbc-3.9.2.jar") + .config("spark.executor.extraClassPath", SPARK_JARS) .getOrCreate() ) -def execute_snowflake_query(query, snowflake_options): +def execute_snowflake_query(query: str, snowflake_options: dict, data=None, fetch=False): """ - Snowflake에서 SQL 쿼리를 실행하는 함수 + Snowflake에서 SQL 쿼리를 실행하거나 데이터를 조회하는 함수 + + Args: + query (str): 실행할 SQL 쿼리 + snowflake_options (dict): Snowflake 접속 정보 + data (list, optional): executemany를 사용할 경우 전달할 데이터 리스트 + fetch (bool, optional): SELECT 쿼리 실행 후 데이터를 반환할지 여부 + + Returns: + pd.DataFrame | None: fetch=True인 경우 DataFrame 반환, 그렇지 않으면 None 반환 """ + print(f"snowflake_opt: {snowflake_options}") + try: conn = snowflake.connector.connect( user=snowflake_options["user"], @@ -45,12 +62,42 @@ def execute_snowflake_query(query, snowflake_options): database=snowflake_options["db"], schema=snowflake_options["schema"], warehouse=snowflake_options["warehouse"], + role=snowflake_options["role"], ) cur = conn.cursor() - cur.execute(query) - conn.commit() + + if data: + if isinstance(data, list): + cur.executemany(query, data) + else: + cur.execute(query, data) + conn.commit() + + elif not fetch: + cur.execute(query) + conn.commit() + + if fetch: + result = cur.fetchall() # 데이터 가져오기 + print('result: ' + result) + if cur.description: # 컬럼 정보가 존재할 경우에만 DataFrame 생성 + df = pd.DataFrame(result, columns=[desc[0] for desc in cur.description]) + else: + df = pd.DataFrame() # 빈 DataFrame 반환 + cur.close() + conn.close() + return df + cur.close() conn.close() print("Query executed successfully.") except Exception as e: print(f"Execute_snowflake_query Error: {e}") + print(f'Query: {query}') + print(f'Data: {data}') + raise AirflowFailException("execute query error") + +def escape_quotes(value): + if value is None: + return "NULL" + return "'{}'".format(value.replace("'", "''")) \ No newline at end of file diff --git a/airflow/dags/scripts/eventsim_ETL_script.py b/airflow/dags/scripts/ETL_eventsim_script.py similarity index 68% rename from airflow/dags/scripts/eventsim_ETL_script.py rename to airflow/dags/scripts/ETL_eventsim_script.py index 35cb843..0f223f8 100644 --- a/airflow/dags/scripts/eventsim_ETL_script.py +++ b/airflow/dags/scripts/ETL_eventsim_script.py @@ -5,9 +5,7 @@ from dotenv import load_dotenv from pyspark.sql.types import (IntegerType, LongType, StringType, StructField, StructType) -from spark_utils import execute_snowflake_query, spark_session_builder - -from airflow.models import Variable +from spark_utils import execute_snowflake_query, spark_session_builder, escape_quotes load_dotenv() @@ -16,8 +14,8 @@ SNOWFLAKE_TEMP_TABLE = "EVENTS_TABLE_TEMP" SNOWFLAKE_SCHEMA = "RAW_DATA" SNOWFLAKE_PROPERTIES = { - "user": os.environ.get("SNOWFLAKE_USER"), - "password": os.environ.get("SNOWFLAKE_PASSWORD"), + "user": os.environ.get("SNOWFLAKE_USER_BSH"), + "password": os.environ.get("SNOWFLAKE_PASSWORD_BSH"), "account": os.environ.get("SNOWFLAKE_ACCOUNT"), "db": os.environ.get("SNOWFLAKE_DB", "S4TIFY"), "warehouse": os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"), @@ -37,26 +35,19 @@ # ------------------------------------------------- spark = spark_session_builder("app") -schema = StructType( - [ - StructField("song", StringType(), True), - StructField("artist", StringType(), True), - StructField("location", StringType(), True), - StructField("sessionId", IntegerType(), True), - StructField("userId", IntegerType(), True), - # Snowflake의 BIGINT에 맞게 LongType 사용 - StructField("ts", LongType(), True), - ] -) - # S3에서 데이터 읽어오기 df = spark.read.json( f"{S3_BUCKET}/topics/eventsim_music_streaming/year={year}/month={month}/day={day}/*.json" ) -df_clean = df.dropna(subset=["song", "artist"]) +df_clean = df.select("song", "artist", "location", "sessionId", "userId", "ts") \ + .filter((df.song.isNotNull()) & (df.artist.isNotNull()) & (df.page != "Home"))\ + .fillna("NULL") # df_clean = df.wehre("song IS NOT NULL AND artist IS NOT NULL") +print(df_clean.show(5)) +print(f"Data count: {df_clean.count()}") + # -------------------CREATE TABLE-------------------- # 테이블 생성 create_table_sql = f""" @@ -87,26 +78,42 @@ execute_snowflake_query(create_temp_table_sql, SNOWFLAKE_PROPERTIES) print("TEMP 테이블 확인 완료") +# TEMP 테이블에 데이터 INSERT +# data_to_insert = [tuple(row) for row in df_clean.collect()] +# for row in data_to_insert: +# insert_temp_table_sql = f""" +# INSERT INTO {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE} (song, artist, location, sessionId, userId, ts) +# VALUES (%s, %s, %s, %s, %s, %s) +# """ + +# execute_snowflake_query(insert_temp_table_sql, SNOWFLAKE_PROPERTIES, data=row) +df_clean.write\ + .format("jdbc")\ + .options(**SNOWFLAKE_PROPERTIES)\ + .option("dbtable", f"{SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE}")\ + .mode("overwrite").save() +print("TEMP 테이블 적재 완료") + # Snowflake에서 MERGE 수행 merge_sql = f""" MERGE INTO {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TABLE} AS target USING {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE} AS s - ON target.USERID = s."userId" - AND target.TS = s."ts" - AND target.SONG = s."song" + ON target.USERID = s.userId + AND target.TS = s.ts + AND target.SONG = s.song WHEN MATCHED THEN UPDATE SET - target.LOCATION = s."location", - target.SESSIONID = s."sessionId" + target.LOCATION = s.location, + target.SESSIONID = s.sessionId WHEN NOT MATCHED THEN INSERT ("SONG", "ARTIST", "LOCATION", "SESSIONID", "USERID", "TS") - VALUES (s."song", s."artist", s."location", s."sessionId", s."userId", s."ts"); + VALUES (s.song, s.artist, s.location, s.sessionId, s.userId, s.ts); """ execute_snowflake_query(merge_sql, SNOWFLAKE_PROPERTIES) print("Merge 완료") # -------------------DROP TABLE-------------------- -# 임시 테이블 삭제제 +# 임시 테이블 삭제 drop_table_sql = f""" DROP TABLE {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE}; """ From 855b60e34c0bb113d3960c83b0a48aef80500d2a Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Sat, 15 Mar 2025 14:22:14 +0000 Subject: [PATCH 2/2] Automated format fixes --- airflow/dags/ELT_eventsim_song_count_DAG.py | 71 +++++++++++++-------- airflow/dags/plugins/spark_utils.py | 50 +++++++++------ airflow/dags/scripts/ETL_eventsim_script.py | 25 +++++--- 3 files changed, 90 insertions(+), 56 deletions(-) diff --git a/airflow/dags/ELT_eventsim_song_count_DAG.py b/airflow/dags/ELT_eventsim_song_count_DAG.py index 05a1742..03f0a91 100644 --- a/airflow/dags/ELT_eventsim_song_count_DAG.py +++ b/airflow/dags/ELT_eventsim_song_count_DAG.py @@ -1,19 +1,21 @@ import os from datetime import timedelta + +import pandas as pd +import snowflake.connector from dotenv import load_dotenv +from spark_utils import execute_snowflake_query + from airflow import DAG +from airflow.exceptions import AirflowFailException from airflow.operators.python import PythonOperator from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.utils.dates import days_ago -from airflow.exceptions import AirflowFailException -import snowflake.connector -import pandas as pd -from spark_utils import execute_snowflake_query load_dotenv() # SNOWFLAKE 설정 -SNOWFLAKE_SOURCE_TABLE = 'EVENTSIM_LOG' +SNOWFLAKE_SOURCE_TABLE = "EVENTSIM_LOG" SNOWFLAKE_SOURCE_SCHEMA = os.environ.get("SNOWFLAKE_SILVER_SCHMEA", "RAW_DATA") SNOWFLAKE_TARGET_SONG_TABLE = "EVENTSIM_SONG_COUNTS" SNOWFLAKE_TARGET_ARTIST_TABLE = "EVENTSIM_ARTIST_COUNTS" @@ -28,11 +30,12 @@ "role": os.environ.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"), } + def extract_data_from_snowflake(): """ Snowflake에서 데이터를 읽어오는 함수 """ - SNOWFLAKE_PROPERTIES['schema'] = SNOWFLAKE_SOURCE_SCHEMA + SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_SOURCE_SCHEMA query = f""" SELECT SONG, ARTIST, LOCATION, SESSIONID, USERID, TS @@ -56,28 +59,33 @@ def extract_data_from_snowflake(): print(df.head(5)) print("Data successfully extracted from Snowflake!") return df - + except Exception as e: print(f"Error extracting data from Snowflake: {e}") return None - + finally: cur.close() conn.close() + def process_song_counts(**kwargs): - SNOWFLAKE_PROPERTIES['schema'] = SNOWFLAKE_TARGET_SCHEMA - ti = kwargs['ti'] - df = ti.xcom_pull(task_ids='extract_data') + SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_TARGET_SCHEMA + ti = kwargs["ti"] + df = ti.xcom_pull(task_ids="extract_data") if df is None or df.empty: - raise AirflowFailException("No data available for processing song counts.") - + raise AirflowFailException( + "No data available for processing song counts.") + # 노래 카운트 계산 - song_counts = df.groupby(["SONG", "ARTIST"]).size().reset_index(name="song_count") - + song_counts = df.groupby( + ["SONG", "ARTIST"]).size().reset_index(name="song_count") + # 테이블 비우기 - execute_snowflake_query(f"TRUNCATE TABLE {SNOWFLAKE_TARGET_SONG_TABLE};", SNOWFLAKE_PROPERTIES) + execute_snowflake_query( + f"TRUNCATE TABLE {SNOWFLAKE_TARGET_SONG_TABLE};", SNOWFLAKE_PROPERTIES + ) # SQL 쿼리 (executemany 사용) insert_query = f""" @@ -86,26 +94,36 @@ def process_song_counts(**kwargs): """ # DataFrame을 리스트로 변환 후 executemany 실행 - execute_snowflake_query(insert_query, SNOWFLAKE_PROPERTIES, data=song_counts.values.tolist()) + execute_snowflake_query( + insert_query, SNOWFLAKE_PROPERTIES, data=song_counts.values.tolist() + ) print(f"Data written to {SNOWFLAKE_TARGET_SONG_TABLE} in Snowflake.") + def process_artist_counts(**kwargs): - SNOWFLAKE_PROPERTIES['schema'] = SNOWFLAKE_TARGET_SCHEMA - ti = kwargs['ti'] - df = ti.xcom_pull(task_ids='extract_data') + SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_TARGET_SCHEMA + ti = kwargs["ti"] + df = ti.xcom_pull(task_ids="extract_data") if df is None or df.empty: - raise AirflowFailException("No data available for processing artist counts.") - - artist_counts = df.groupby("ARTIST").size().reset_index(name="artist_count") - execute_snowflake_query(f"TRUNCATE TABLE {SNOWFLAKE_TARGET_ARTIST_TABLE};", SNOWFLAKE_PROPERTIES) + raise AirflowFailException( + "No data available for processing artist counts.") + + artist_counts = df.groupby( + "ARTIST").size().reset_index(name="artist_count") + execute_snowflake_query( + f"TRUNCATE TABLE {SNOWFLAKE_TARGET_ARTIST_TABLE};", + SNOWFLAKE_PROPERTIES) insert_query = f""" INSERT INTO {SNOWFLAKE_TARGET_ARTIST_TABLE} (ARTIST, artist_count) VALUES (%s, %s) """ - execute_snowflake_query(insert_query, SNOWFLAKE_PROPERTIES, artist_counts.values.tolist()) + execute_snowflake_query( + insert_query, SNOWFLAKE_PROPERTIES, artist_counts.values.tolist() + ) print(f"Data written to {SNOWFLAKE_TARGET_ARTIST_TABLE} in Snowflake.") + # DAG 설정 default_args = { "owner": "sanghyoek_boo", @@ -151,4 +169,5 @@ def process_artist_counts(**kwargs): dag=dag, ) -trigger_dag_task >> extract_data_task >> [process_song_task, process_artist_task] +trigger_dag_task >> extract_data_task >> [ + process_song_task, process_artist_task] diff --git a/airflow/dags/plugins/spark_utils.py b/airflow/dags/plugins/spark_utils.py index 697f547..4261ee7 100644 --- a/airflow/dags/plugins/spark_utils.py +++ b/airflow/dags/plugins/spark_utils.py @@ -1,8 +1,9 @@ import os +import pandas as pd import snowflake.connector from pyspark.sql import SparkSession -import pandas as pd + from airflow.exceptions import AirflowFailException # Spark JARs 설정 @@ -15,9 +16,10 @@ SPARK_JARS_LIST = [ "snowflake-jdbc-3.9.2.jar", "hadoop-aws-3.3.4.jar", - "aws-java-sdk-bundle-1.12.262.jar" + "aws-java-sdk-bundle-1.12.262.jar", ] -SPARK_JARS = ",".join([os.path.join(SPARK_JARS_DIR, jar) for jar in SPARK_JARS_LIST]) +SPARK_JARS = ",".join([os.path.join(SPARK_JARS_DIR, jar) + for jar in SPARK_JARS_LIST]) # Spark Session builder @@ -31,24 +33,27 @@ def spark_session_builder(app_name: str) -> SparkSession: SparkSession """ return ( - SparkSession.builder.appName(f"{app_name}") - .config("spark.jars", SPARK_JARS) - .config("spark.driver.extraClassPath", "/opt/spark/jars/snowflake-jdbc-3.9.2.jar") - .config("spark.executor.extraClassPath", SPARK_JARS) - .getOrCreate() - ) + SparkSession.builder.appName(f"{app_name}") .config( + "spark.jars", + SPARK_JARS) .config( + "spark.driver.extraClassPath", + "/opt/spark/jars/snowflake-jdbc-3.9.2.jar") .config( + "spark.executor.extraClassPath", + SPARK_JARS) .getOrCreate()) -def execute_snowflake_query(query: str, snowflake_options: dict, data=None, fetch=False): +def execute_snowflake_query( + query: str, snowflake_options: dict, data=None, fetch=False +): """ Snowflake에서 SQL 쿼리를 실행하거나 데이터를 조회하는 함수 - + Args: query (str): 실행할 SQL 쿼리 snowflake_options (dict): Snowflake 접속 정보 data (list, optional): executemany를 사용할 경우 전달할 데이터 리스트 fetch (bool, optional): SELECT 쿼리 실행 후 데이터를 반환할지 여부 - + Returns: pd.DataFrame | None: fetch=True인 경우 DataFrame 반환, 그렇지 않으면 None 반환 """ @@ -65,39 +70,42 @@ def execute_snowflake_query(query: str, snowflake_options: dict, data=None, fetc role=snowflake_options["role"], ) cur = conn.cursor() - + if data: if isinstance(data, list): cur.executemany(query, data) else: cur.execute(query, data) conn.commit() - + elif not fetch: cur.execute(query) conn.commit() - + if fetch: result = cur.fetchall() # 데이터 가져오기 - print('result: ' + result) + print("result: " + result) if cur.description: # 컬럼 정보가 존재할 경우에만 DataFrame 생성 - df = pd.DataFrame(result, columns=[desc[0] for desc in cur.description]) + df = pd.DataFrame( + result, columns=[ + desc[0] for desc in cur.description]) else: df = pd.DataFrame() # 빈 DataFrame 반환 cur.close() conn.close() return df - + cur.close() conn.close() print("Query executed successfully.") except Exception as e: print(f"Execute_snowflake_query Error: {e}") - print(f'Query: {query}') - print(f'Data: {data}') + print(f"Query: {query}") + print(f"Data: {data}") raise AirflowFailException("execute query error") + def escape_quotes(value): if value is None: return "NULL" - return "'{}'".format(value.replace("'", "''")) \ No newline at end of file + return "'{}'".format(value.replace("'", "''")) diff --git a/airflow/dags/scripts/ETL_eventsim_script.py b/airflow/dags/scripts/ETL_eventsim_script.py index 0f223f8..3005552 100644 --- a/airflow/dags/scripts/ETL_eventsim_script.py +++ b/airflow/dags/scripts/ETL_eventsim_script.py @@ -5,7 +5,8 @@ from dotenv import load_dotenv from pyspark.sql.types import (IntegerType, LongType, StringType, StructField, StructType) -from spark_utils import execute_snowflake_query, spark_session_builder, escape_quotes +from spark_utils import (escape_quotes, execute_snowflake_query, + spark_session_builder) load_dotenv() @@ -40,9 +41,17 @@ f"{S3_BUCKET}/topics/eventsim_music_streaming/year={year}/month={month}/day={day}/*.json" ) -df_clean = df.select("song", "artist", "location", "sessionId", "userId", "ts") \ - .filter((df.song.isNotNull()) & (df.artist.isNotNull()) & (df.page != "Home"))\ - .fillna("NULL") +df_clean = ( + df.select( + "song", + "artist", + "location", + "sessionId", + "userId", + "ts") .filter( + (df.song.isNotNull()) & ( + df.artist.isNotNull()) & ( + df.page != "Home")) .fillna("NULL")) # df_clean = df.wehre("song IS NOT NULL AND artist IS NOT NULL") print(df_clean.show(5)) @@ -87,11 +96,9 @@ # """ # execute_snowflake_query(insert_temp_table_sql, SNOWFLAKE_PROPERTIES, data=row) -df_clean.write\ - .format("jdbc")\ - .options(**SNOWFLAKE_PROPERTIES)\ - .option("dbtable", f"{SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE}")\ - .mode("overwrite").save() +df_clean.write.format("jdbc").options(**SNOWFLAKE_PROPERTIES).option( + "dbtable", f"{SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE}" +).mode("overwrite").save() print("TEMP 테이블 적재 완료") # Snowflake에서 MERGE 수행