diff --git a/airflow/.env b/airflow/.env index e983c51..cbc0216 100644 --- a/airflow/.env +++ b/airflow/.env @@ -18,7 +18,11 @@ AWS_ACCESS_KEY_ID=AKIA4RRVVY55VT3PCIZU AWS_SECRET_ACCESS_KEY=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 AWS_DEFAULT_REGION=ap-northeast-2 # 사용 지역 설정 - +# BSH SNOWFLAKE CONN INFO +SNOWFLAKE_USER_BSH= BSH +SNOWFLAKE_PASSWORD_BSH= BSH1234! +SNOWFLAKE_SILVER_SCHEMA = RAW_DATA +SNOWFLAKE_GOLD_SCHEMA = ANALYTICS #S3_Spark_SnowFlake_ELT SNOWFLAKE_USER= BY diff --git a/airflow/dags/ELT_eventsim_song_count_DAG.py b/airflow/dags/ELT_eventsim_song_count_DAG.py new file mode 100644 index 0000000..03f0a91 --- /dev/null +++ b/airflow/dags/ELT_eventsim_song_count_DAG.py @@ -0,0 +1,173 @@ +import os +from datetime import timedelta + +import pandas as pd +import snowflake.connector +from dotenv import load_dotenv +from spark_utils import execute_snowflake_query + +from airflow import DAG +from airflow.exceptions import AirflowFailException +from airflow.operators.python import PythonOperator +from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.utils.dates import days_ago + +load_dotenv() + +# SNOWFLAKE 설정 +SNOWFLAKE_SOURCE_TABLE = "EVENTSIM_LOG" +SNOWFLAKE_SOURCE_SCHEMA = os.environ.get("SNOWFLAKE_SILVER_SCHEMA", "RAW_DATA") +SNOWFLAKE_TARGET_SONG_TABLE = "EVENTSIM_SONG_COUNTS" +SNOWFLAKE_TARGET_ARTIST_TABLE = "EVENTSIM_ARTIST_COUNTS" +SNOWFLAKE_TARGET_SCHEMA = os.environ.get("SNOWFLAKE_GOLD_SCHEMA", "ANALYTICS") + +SNOWFLAKE_PROPERTIES = { + "user": os.environ.get("SNOWFLAKE_USER_BSH"), + "password": os.environ.get("SNOWFLAKE_PASSWORD_BSH"), + "account": os.environ.get("SNOWFLAKE_ACCOUNT"), + "db": os.environ.get("SNOWFLAKE_DB", "S4TIFY"), + "warehouse": os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"), + "role": os.environ.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"), +} + + +def extract_data_from_snowflake(): + """ + Snowflake에서 데이터를 읽어오는 함수 + """ + SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_SOURCE_SCHEMA + + query = f""" + SELECT 
SONG, ARTIST, LOCATION, SESSIONID, USERID, TS + FROM {SNOWFLAKE_SOURCE_TABLE} + """ + try: + conn = snowflake.connector.connect( + user=SNOWFLAKE_PROPERTIES["user"], + password=SNOWFLAKE_PROPERTIES["password"], + account=SNOWFLAKE_PROPERTIES["account"], + database=SNOWFLAKE_PROPERTIES["db"], + schema=SNOWFLAKE_PROPERTIES["schema"], + warehouse=SNOWFLAKE_PROPERTIES["warehouse"], + role=SNOWFLAKE_PROPERTIES["role"], + ) + cur = conn.cursor() + rows = cur.execute(query).fetchall() + columns = [desc[0] for desc in cur.description] + + df = pd.DataFrame(rows, columns=columns) + print(df.head(5)) + print("Data successfully extracted from Snowflake!") + return df + + except Exception as e: + print(f"Error extracting data from Snowflake: {e}") + return None + + finally: + cur.close() + conn.close() + + +def process_song_counts(**kwargs): + SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_TARGET_SCHEMA + ti = kwargs["ti"] + df = ti.xcom_pull(task_ids="extract_data") + + if df is None or df.empty: + raise AirflowFailException( + "No data available for processing song counts.") + + # 노래 카운트 계산 + song_counts = df.groupby( + ["SONG", "ARTIST"]).size().reset_index(name="song_count") + + # 테이블 비우기 + execute_snowflake_query( + f"TRUNCATE TABLE {SNOWFLAKE_TARGET_SONG_TABLE};", SNOWFLAKE_PROPERTIES + ) + + # SQL 쿼리 (executemany 사용) + insert_query = f""" + INSERT INTO {SNOWFLAKE_TARGET_SONG_TABLE} (SONG, ARTIST, song_count) + VALUES (%s, %s, %s) + """ + + # DataFrame을 리스트로 변환 후 executemany 실행 + execute_snowflake_query( + insert_query, SNOWFLAKE_PROPERTIES, data=song_counts.values.tolist() + ) + + print(f"Data written to {SNOWFLAKE_TARGET_SONG_TABLE} in Snowflake.") + + +def process_artist_counts(**kwargs): + SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_TARGET_SCHEMA + ti = kwargs["ti"] + df = ti.xcom_pull(task_ids="extract_data") + if df is None or df.empty: + raise AirflowFailException( + "No data available for processing artist counts.") + + artist_counts = df.groupby( + 
"ARTIST").size().reset_index(name="artist_count") + execute_snowflake_query( + f"TRUNCATE TABLE {SNOWFLAKE_TARGET_ARTIST_TABLE};", + SNOWFLAKE_PROPERTIES) + insert_query = f""" + INSERT INTO {SNOWFLAKE_TARGET_ARTIST_TABLE} (ARTIST, artist_count) + VALUES (%s, %s) + """ + execute_snowflake_query( + insert_query, SNOWFLAKE_PROPERTIES, artist_counts.values.tolist() + ) + print(f"Data written to {SNOWFLAKE_TARGET_ARTIST_TABLE} in Snowflake.") + + +# DAG 설정 +default_args = { + "owner": "sanghyoek_boo", + "retries": 1, + "retry_delay": timedelta(minutes=5), + "start_date": days_ago(1), +} + +dag = DAG( + dag_id="ELT_eventsim_song_artist_count", + default_args=default_args, + schedule_interval="@daily", + catchup=False, + tags=["ELT", "Eventsim"], +) + +trigger_dag_task = TriggerDagRunOperator( + task_id="trigger_eventsim_etl", + trigger_dag_id="eventsim_ETL", # 실행할 대상 DAG ID + wait_for_completion=True, # 완료될 때까지 대기 + poke_interval=10, # DAG 상태 체크 주기 (초 단위) + dag=dag, +) + +extract_data_task = PythonOperator( + task_id="extract_data", + python_callable=extract_data_from_snowflake, + provide_context=True, + dag=dag, +) + +process_song_task = PythonOperator( + task_id="process_song_counts", + python_callable=process_song_counts, + provide_context=True, + dag=dag, +) + +process_artist_task = PythonOperator( + task_id="process_artist_counts", + python_callable=process_artist_counts, + provide_context=True, + dag=dag, +) + +trigger_dag_task >> extract_data_task >> [ + process_song_task, process_artist_task] diff --git a/airflow/dags/eventsim_ETL_DAG.py b/airflow/dags/ETL_eventsim_DAG.py similarity index 96% rename from airflow/dags/eventsim_ETL_DAG.py rename to airflow/dags/ETL_eventsim_DAG.py index baf5058..668dbc8 100644 --- a/airflow/dags/eventsim_ETL_DAG.py +++ b/airflow/dags/ETL_eventsim_DAG.py @@ -41,7 +41,7 @@ # SparkSubmitOperator: Spark에서 S3 데이터를 처리하고 Snowflake에 MERGE spark_job = SparkSubmitOperator( task_id="spark_process_s3_upsert", - 
application="/opt/airflow/dags/scripts/eventsim_ETL_script.py", + application="/opt/airflow/dags/scripts/ETL_eventsim_script.py", conn_id="spark_conn", application_args=[ S3_BUCKET, diff --git a/airflow/dags/eventsim_song_count_ELT.py b/airflow/dags/eventsim_song_count_ELT.py deleted file mode 100644 index 68f7dc9..0000000 --- a/airflow/dags/eventsim_song_count_ELT.py +++ /dev/null @@ -1,50 +0,0 @@ -from datetime import timedelta - -from airflow import DAG -from airflow.operators.trigger_dagrun import TriggerDagRunOperator -from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator -from airflow.utils.dates import days_ago - -SOURCE_TABLE = "RAW_DATA.EVENTSIM_LOG" -TARGET_TABLE = "ANALYTICS.EVENTSIM_SONG_COUNTS" - -SQL_QUERY = f""" - CREATE OR REPLACE TABLE {TARGET_TABLE} AS - SELECT SONG, ARTIST, COUNT(*) AS song_count - FROM {SOURCE_TABLE} - GROUP BY SONG, ARTIST - ORDER BY song_count DESC; -""" - -default_args = { - "owner": "sanghyoek_boo", - "retries": 1, - "retry_delay": timedelta(minutes=5), - "start_date": days_ago(1), -} - -dag = DAG( - dag_id="evetnsim_song_count_ELT", - default_args=default_args, - schedule_interval="@daily", - catchup=False, - tags=["ELT", "Eventsim", "Song Count"], -) - -trigger_dag_task = TriggerDagRunOperator( - task_id="trigger_task", - trigger_dag_id="eventsim_ETL", - wait_for_completion=True, - poke_interval=10, - dag=dag, -) - -run_snowflake_query = SnowflakeOperator( - task_id="aggregate_song_counts", - sql=SQL_QUERY, - snowflake_conn_id="snowflake_conn", - dag=dag, -) - -if __name__ == "__main__": - trigger_dag_task >> run_snowflake_query diff --git a/airflow/dags/plugins/spark_utils.py b/airflow/dags/plugins/spark_utils.py index 6c1a461..4261ee7 100644 --- a/airflow/dags/plugins/spark_utils.py +++ b/airflow/dags/plugins/spark_utils.py @@ -1,19 +1,25 @@ import os +import pandas as pd import snowflake.connector from pyspark.sql import SparkSession -from airflow.models import connection +from airflow.exceptions 
import AirflowFailException # Spark JARs 설정 -SPARK_HOME = "/opt/spark/" -SPARK_JARS = ",".join( - [ - os.path.join(SPARK_HOME, "jars", "snowflake-jdbc-3.9.2.jar"), - os.path.join(SPARK_HOME, "jars", "hadoop-aws-3.3.4.jar"), - os.path.join(SPARK_HOME, "jars", "aws-java-sdk-bundle-1.12.262.jar"), - ] -) +# SPARK_HOME 설정 +SPARK_HOME = "/opt/spark" +os.environ["SPARK_HOME"] = SPARK_HOME + +# JAR 경로 설정 +SPARK_JARS_DIR = os.path.join(SPARK_HOME, "jars") +SPARK_JARS_LIST = [ + "snowflake-jdbc-3.9.2.jar", + "hadoop-aws-3.3.4.jar", + "aws-java-sdk-bundle-1.12.262.jar", +] +SPARK_JARS = ",".join([os.path.join(SPARK_JARS_DIR, jar) + for jar in SPARK_JARS_LIST]) # Spark Session builder @@ -27,16 +33,32 @@ def spark_session_builder(app_name: str) -> SparkSession: SparkSession """ return ( - SparkSession.builder.appName(f"{app_name}") - .config("spark.jars", SPARK_JARS) - .getOrCreate() - ) + SparkSession.builder.appName(f"{app_name}") .config( + "spark.jars", + SPARK_JARS) .config( + "spark.driver.extraClassPath", + "/opt/spark/jars/snowflake-jdbc-3.9.2.jar") .config( + "spark.executor.extraClassPath", + SPARK_JARS) .getOrCreate()) -def execute_snowflake_query(query, snowflake_options): +def execute_snowflake_query( + query: str, snowflake_options: dict, data=None, fetch=False +): """ - Snowflake에서 SQL 쿼리를 실행하는 함수 + Snowflake에서 SQL 쿼리를 실행하거나 데이터를 조회하는 함수 + + Args: + query (str): 실행할 SQL 쿼리 + snowflake_options (dict): Snowflake 접속 정보 + data (list, optional): executemany를 사용할 경우 전달할 데이터 리스트 + fetch (bool, optional): SELECT 쿼리 실행 후 데이터를 반환할지 여부 + + Returns: + pd.DataFrame | None: fetch=True인 경우 DataFrame 반환, 그렇지 않으면 None 반환 """ + print(f"snowflake_opt: {snowflake_options}") + try: conn = snowflake.connector.connect( user=snowflake_options["user"], @@ -45,12 +67,45 @@ def execute_snowflake_query(query, snowflake_options): database=snowflake_options["db"], schema=snowflake_options["schema"], warehouse=snowflake_options["warehouse"], + role=snowflake_options["role"], ) cur = 
conn.cursor() - cur.execute(query) - conn.commit() + + if data: + if isinstance(data, list): + cur.executemany(query, data) + else: + cur.execute(query, data) + conn.commit() + + elif not fetch: + cur.execute(query) + conn.commit() + + if fetch: + result = cur.fetchall() # 데이터 가져오기 + print(f"result: {result}") + if cur.description: # 컬럼 정보가 존재할 경우에만 DataFrame 생성 + df = pd.DataFrame( + result, columns=[ + desc[0] for desc in cur.description]) + else: + df = pd.DataFrame() # 빈 DataFrame 반환 + cur.close() + conn.close() + return df + + cur.close() + conn.close() + print("Query executed successfully.") except Exception as e: print(f"Execute_snowflake_query Error: {e}") + print(f"Query: {query}") + print(f"Data: {data}") + raise AirflowFailException("execute query error") + + +def escape_quotes(value): + if value is None: + return "NULL" + return "'{}'".format(value.replace("'", "''")) diff --git a/airflow/dags/scripts/eventsim_ETL_script.py b/airflow/dags/scripts/ETL_eventsim_script.py similarity index 66% rename from airflow/dags/scripts/eventsim_ETL_script.py rename to airflow/dags/scripts/ETL_eventsim_script.py index 35cb843..3005552 100644 --- a/airflow/dags/scripts/eventsim_ETL_script.py +++ b/airflow/dags/scripts/ETL_eventsim_script.py @@ -5,9 +5,8 @@ from dotenv import load_dotenv from pyspark.sql.types import (IntegerType, LongType, StringType, StructField, StructType) -from spark_utils import execute_snowflake_query, spark_session_builder - -from airflow.models import Variable +from spark_utils import (escape_quotes, execute_snowflake_query, + spark_session_builder) load_dotenv() @@ -16,8 +15,8 @@ SNOWFLAKE_TEMP_TABLE = "EVENTS_TABLE_TEMP" SNOWFLAKE_SCHEMA = "RAW_DATA" SNOWFLAKE_PROPERTIES = { - "user": os.environ.get("SNOWFLAKE_USER"), - "password": os.environ.get("SNOWFLAKE_PASSWORD"), + "user": os.environ.get("SNOWFLAKE_USER_BSH"), + "password": os.environ.get("SNOWFLAKE_PASSWORD_BSH"), "account": os.environ.get("SNOWFLAKE_ACCOUNT"), "db": 
os.environ.get("SNOWFLAKE_DB", "S4TIFY"), "warehouse": os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"), @@ -37,26 +36,27 @@ # ------------------------------------------------- spark = spark_session_builder("app") -schema = StructType( - [ - StructField("song", StringType(), True), - StructField("artist", StringType(), True), - StructField("location", StringType(), True), - StructField("sessionId", IntegerType(), True), - StructField("userId", IntegerType(), True), - # Snowflake의 BIGINT에 맞게 LongType 사용 - StructField("ts", LongType(), True), - ] -) - # S3에서 데이터 읽어오기 df = spark.read.json( f"{S3_BUCKET}/topics/eventsim_music_streaming/year={year}/month={month}/day={day}/*.json" ) -df_clean = df.dropna(subset=["song", "artist"]) +df_clean = ( + df.select( + "song", + "artist", + "location", + "sessionId", + "userId", + "ts") .filter( + (df.song.isNotNull()) & ( + df.artist.isNotNull()) & ( + df.page != "Home")) .fillna("NULL")) # df_clean = df.wehre("song IS NOT NULL AND artist IS NOT NULL") +print(df_clean.show(5)) +print(f"Data count: {df_clean.count()}") + # -------------------CREATE TABLE-------------------- # 테이블 생성 create_table_sql = f""" @@ -87,26 +87,40 @@ execute_snowflake_query(create_temp_table_sql, SNOWFLAKE_PROPERTIES) print("TEMP 테이블 확인 완료") +# TEMP 테이블에 데이터 INSERT +# data_to_insert = [tuple(row) for row in df_clean.collect()] +# for row in data_to_insert: +# insert_temp_table_sql = f""" +# INSERT INTO {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE} (song, artist, location, sessionId, userId, ts) +# VALUES (%s, %s, %s, %s, %s, %s) +# """ + +# execute_snowflake_query(insert_temp_table_sql, SNOWFLAKE_PROPERTIES, data=row) +df_clean.write.format("jdbc").options(**SNOWFLAKE_PROPERTIES).option( + "dbtable", f"{SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE}" +).mode("overwrite").save() +print("TEMP 테이블 적재 완료") + # Snowflake에서 MERGE 수행 merge_sql = f""" MERGE INTO {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TABLE} AS target USING {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE} AS s - ON 
target.USERID = s."userId" - AND target.TS = s."ts" - AND target.SONG = s."song" + ON target.USERID = s.userId + AND target.TS = s.ts + AND target.SONG = s.song WHEN MATCHED THEN UPDATE SET - target.LOCATION = s."location", - target.SESSIONID = s."sessionId" + target.LOCATION = s.location, + target.SESSIONID = s.sessionId WHEN NOT MATCHED THEN INSERT ("SONG", "ARTIST", "LOCATION", "SESSIONID", "USERID", "TS") - VALUES (s."song", s."artist", s."location", s."sessionId", s."userId", s."ts"); + VALUES (s.song, s.artist, s.location, s.sessionId, s.userId, s.ts); """ execute_snowflake_query(merge_sql, SNOWFLAKE_PROPERTIES) print("Merge 완료") # -------------------DROP TABLE-------------------- -# 임시 테이블 삭제제 +# 임시 테이블 삭제 drop_table_sql = f""" DROP TABLE {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE}; """