Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion airflow/.env
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ AWS_ACCESS_KEY_ID=AKIA4RRVVY55VT3PCIZU
AWS_SECRET_ACCESS_KEY=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
AWS_DEFAULT_REGION=ap-northeast-2 # 사용 지역 설정


# BSH SNOWFLAKE CONN INFO
SNOWFLAKE_USER_BSH=BSH
SNOWFLAKE_PASSWORD_BSH=BSH1234!
SNOWFLAKE_SILVER_SCHEMA=RAW_DATA
SNOWFLAKE_GOLD_SCHEMA=ANALYTICS

#S3_Spark_SnowFlake_ELT
SNOWFLAKE_USER= BY
Expand Down
173 changes: 173 additions & 0 deletions airflow/dags/ELT_eventsim_song_count_DAG.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import os
from datetime import timedelta

import pandas as pd
import snowflake.connector
from dotenv import load_dotenv
from spark_utils import execute_snowflake_query

from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

load_dotenv()

# --- Snowflake configuration -------------------------------------------------
SNOWFLAKE_SOURCE_TABLE = "EVENTSIM_LOG"
# BUG FIX: the env var name was misspelled "SNOWFLAKE_SILVER_SCHMEA", so this
# lookup always fell back to the default; corrected to match the key declared
# in airflow/.env (SNOWFLAKE_SILVER_SCHEMA).
SNOWFLAKE_SOURCE_SCHEMA = os.environ.get("SNOWFLAKE_SILVER_SCHEMA", "RAW_DATA")
SNOWFLAKE_TARGET_SONG_TABLE = "EVENTSIM_SONG_COUNTS"
SNOWFLAKE_TARGET_ARTIST_TABLE = "EVENTSIM_ARTIST_COUNTS"
SNOWFLAKE_TARGET_SCHEMA = os.environ.get("SNOWFLAKE_GOLD_SCHEMA", "ANALYTICS")

# Shared connection options for every task in this DAG.
# NOTE: "schema" is intentionally absent here — each task fills it in
# (source vs. target schema) right before connecting.
SNOWFLAKE_PROPERTIES = {
    "user": os.environ.get("SNOWFLAKE_USER_BSH"),
    "password": os.environ.get("SNOWFLAKE_PASSWORD_BSH"),
    "account": os.environ.get("SNOWFLAKE_ACCOUNT"),
    "db": os.environ.get("SNOWFLAKE_DB", "S4TIFY"),
    "warehouse": os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"),
    "role": os.environ.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"),
}


def extract_data_from_snowflake():
    """Read all eventsim log rows from Snowflake into a pandas DataFrame.

    Selects SONG, ARTIST, LOCATION, SESSIONID, USERID, TS from the source
    table in the silver (RAW_DATA) schema.

    Returns:
        pd.DataFrame | None: the extracted rows, or None on any failure
        (downstream tasks treat None as "no data" and fail explicitly).

    NOTE(review): the DataFrame is handed to downstream tasks via XCom,
    which requires XCom pickling to be enabled — confirm Airflow config.
    """
    SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_SOURCE_SCHEMA

    query = f"""
    SELECT SONG, ARTIST, LOCATION, SESSIONID, USERID, TS
    FROM {SNOWFLAKE_SOURCE_TABLE}
    """
    # BUG FIX: cur/conn were referenced unconditionally in `finally`; if
    # connect() (or cursor()) raised, the cleanup itself raised
    # UnboundLocalError and masked the original error. Initialize to None
    # and guard the close calls.
    conn = None
    cur = None
    try:
        conn = snowflake.connector.connect(
            user=SNOWFLAKE_PROPERTIES["user"],
            password=SNOWFLAKE_PROPERTIES["password"],
            account=SNOWFLAKE_PROPERTIES["account"],
            database=SNOWFLAKE_PROPERTIES["db"],
            schema=SNOWFLAKE_PROPERTIES["schema"],
            warehouse=SNOWFLAKE_PROPERTIES["warehouse"],
            role=SNOWFLAKE_PROPERTIES["role"],
        )
        cur = conn.cursor()
        rows = cur.execute(query).fetchall()
        columns = [desc[0] for desc in cur.description]

        df = pd.DataFrame(rows, columns=columns)
        print(df.head(5))
        print("Data successfully extracted from Snowflake!")
        return df

    except Exception as e:
        # Preserve original contract: log and return None rather than raise;
        # downstream tasks convert None into an AirflowFailException.
        print(f"Error extracting data from Snowflake: {e}")
        return None

    finally:
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()


def process_song_counts(**kwargs):
    """Aggregate play counts per (song, artist) and fully refresh the target table.

    Pulls the extracted DataFrame from the `extract_data` task via XCom,
    truncates EVENTSIM_SONG_COUNTS, then bulk-inserts the fresh counts.

    Raises:
        AirflowFailException: when the upstream extract produced no data.
    """
    SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_TARGET_SCHEMA
    frame = kwargs["ti"].xcom_pull(task_ids="extract_data")

    if frame is None or frame.empty:
        raise AirflowFailException(
            "No data available for processing song counts.")

    # One row per (song, artist) pair with its occurrence count.
    counts = (
        frame.groupby(["SONG", "ARTIST"])
        .size()
        .reset_index(name="song_count")
    )

    # Full refresh: empty the table before reloading it.
    execute_snowflake_query(
        f"TRUNCATE TABLE {SNOWFLAKE_TARGET_SONG_TABLE};", SNOWFLAKE_PROPERTIES
    )

    # Bulk insert via executemany inside execute_snowflake_query.
    insert_sql = f"""
    INSERT INTO {SNOWFLAKE_TARGET_SONG_TABLE} (SONG, ARTIST, song_count)
    VALUES (%s, %s, %s)
    """
    execute_snowflake_query(
        insert_sql, SNOWFLAKE_PROPERTIES, data=counts.values.tolist()
    )

    print(f"Data written to {SNOWFLAKE_TARGET_SONG_TABLE} in Snowflake.")


def process_artist_counts(**kwargs):
    """Aggregate play counts per artist and fully refresh the artist table.

    Pulls the extracted DataFrame from the `extract_data` task via XCom,
    truncates EVENTSIM_ARTIST_COUNTS, then bulk-inserts the fresh counts.

    Raises:
        AirflowFailException: when the upstream extract produced no data.
    """
    SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_TARGET_SCHEMA
    frame = kwargs["ti"].xcom_pull(task_ids="extract_data")

    if frame is None or frame.empty:
        raise AirflowFailException(
            "No data available for processing artist counts.")

    # One row per artist with its occurrence count.
    per_artist = frame.groupby("ARTIST").size().reset_index(name="artist_count")

    # Full refresh: empty the table before reloading it.
    execute_snowflake_query(
        f"TRUNCATE TABLE {SNOWFLAKE_TARGET_ARTIST_TABLE};",
        SNOWFLAKE_PROPERTIES)

    statement = f"""
    INSERT INTO {SNOWFLAKE_TARGET_ARTIST_TABLE} (ARTIST, artist_count)
    VALUES (%s, %s)
    """
    execute_snowflake_query(
        statement, SNOWFLAKE_PROPERTIES, per_artist.values.tolist()
    )
    print(f"Data written to {SNOWFLAKE_TARGET_ARTIST_TABLE} in Snowflake.")


# --- DAG wiring ---------------------------------------------------------------
default_args = {
    "owner": "sanghyoek_boo",
    "retries": 1,  # retry each failed task once
    "retry_delay": timedelta(minutes=5),
    "start_date": days_ago(1),
}

# Daily ELT: waits for the upstream eventsim ETL DAG, then extracts the log
# table and fans out into the two aggregation tasks.
dag = DAG(
    dag_id="ELT_eventsim_song_artist_count",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
    tags=["ELT", "Eventsim"],
)

# Runs the upstream ETL first so the source table is fresh before extraction.
trigger_dag_task = TriggerDagRunOperator(
    task_id="trigger_eventsim_etl",
    trigger_dag_id="eventsim_ETL",  # DAG ID of the upstream ETL to run
    wait_for_completion=True,  # block until that DAG run finishes
    poke_interval=10,  # how often to poll its state, in seconds
    dag=dag,
)

# Pushes the extracted DataFrame to XCom for the two processing tasks.
# NOTE(review): provide_context=True is a deprecated no-op in Airflow 2.x —
# confirm the Airflow version in use.
extract_data_task = PythonOperator(
    task_id="extract_data",
    python_callable=extract_data_from_snowflake,
    provide_context=True,
    dag=dag,
)

process_song_task = PythonOperator(
    task_id="process_song_counts",
    python_callable=process_song_counts,
    provide_context=True,
    dag=dag,
)

process_artist_task = PythonOperator(
    task_id="process_artist_counts",
    python_callable=process_artist_counts,
    provide_context=True,
    dag=dag,
)

# trigger ETL -> extract -> (song counts, artist counts) in parallel
trigger_dag_task >> extract_data_task >> [
    process_song_task, process_artist_task]
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
# SparkSubmitOperator: Spark에서 S3 데이터를 처리하고 Snowflake에 MERGE
spark_job = SparkSubmitOperator(
task_id="spark_process_s3_upsert",
application="/opt/airflow/dags/scripts/eventsim_ETL_script.py",
application="/opt/airflow/dags/scripts/ETL_eventsim_script.py",
conn_id="spark_conn",
application_args=[
S3_BUCKET,
Expand Down
50 changes: 0 additions & 50 deletions airflow/dags/eventsim_song_count_ELT.py

This file was deleted.

89 changes: 72 additions & 17 deletions airflow/dags/plugins/spark_utils.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
import os

import pandas as pd
import snowflake.connector
from pyspark.sql import SparkSession

from airflow.models import connection
from airflow.exceptions import AirflowFailException

# Spark JARs 설정
SPARK_HOME = "/opt/spark/"
SPARK_JARS = ",".join(
[
os.path.join(SPARK_HOME, "jars", "snowflake-jdbc-3.9.2.jar"),
os.path.join(SPARK_HOME, "jars", "hadoop-aws-3.3.4.jar"),
os.path.join(SPARK_HOME, "jars", "aws-java-sdk-bundle-1.12.262.jar"),
]
)
# SPARK_HOME 설정
SPARK_HOME = "/opt/spark"
os.environ["SPARK_HOME"] = SPARK_HOME

# JAR 경로 설정
SPARK_JARS_DIR = os.path.join(SPARK_HOME, "jars")
SPARK_JARS_LIST = [
"snowflake-jdbc-3.9.2.jar",
"hadoop-aws-3.3.4.jar",
"aws-java-sdk-bundle-1.12.262.jar",
]
SPARK_JARS = ",".join([os.path.join(SPARK_JARS_DIR, jar)
for jar in SPARK_JARS_LIST])


# Spark Session builder
Expand All @@ -27,16 +33,32 @@ def spark_session_builder(app_name: str) -> SparkSession:
SparkSession
"""
return (
SparkSession.builder.appName(f"{app_name}")
.config("spark.jars", SPARK_JARS)
.getOrCreate()
)
SparkSession.builder.appName(f"{app_name}") .config(
"spark.jars",
SPARK_JARS) .config(
"spark.driver.extraClassPath",
"/opt/spark/jars/snowflake-jdbc-3.9.2.jar") .config(
"spark.executor.extraClassPath",
SPARK_JARS) .getOrCreate())


def execute_snowflake_query(query, snowflake_options):
def execute_snowflake_query(
query: str, snowflake_options: dict, data=None, fetch=False
):
"""
Snowflake에서 SQL 쿼리를 실행하는 함수
Snowflake에서 SQL 쿼리를 실행하거나 데이터를 조회하는 함수

Args:
query (str): 실행할 SQL 쿼리
snowflake_options (dict): Snowflake 접속 정보
data (list, optional): executemany를 사용할 경우 전달할 데이터 리스트
fetch (bool, optional): SELECT 쿼리 실행 후 데이터를 반환할지 여부

Returns:
pd.DataFrame | None: fetch=True인 경우 DataFrame 반환, 그렇지 않으면 None 반환
"""
print(f"snowflake_opt: {snowflake_options}")

try:
conn = snowflake.connector.connect(
user=snowflake_options["user"],
Expand All @@ -45,12 +67,45 @@ def execute_snowflake_query(query, snowflake_options):
database=snowflake_options["db"],
schema=snowflake_options["schema"],
warehouse=snowflake_options["warehouse"],
role=snowflake_options["role"],
)
cur = conn.cursor()
cur.execute(query)
conn.commit()

if data:
if isinstance(data, list):
cur.executemany(query, data)
else:
cur.execute(query, data)
conn.commit()

elif not fetch:
cur.execute(query)
conn.commit()

if fetch:
result = cur.fetchall() # 데이터 가져오기
print("result: " + result)
if cur.description: # 컬럼 정보가 존재할 경우에만 DataFrame 생성
df = pd.DataFrame(
result, columns=[
desc[0] for desc in cur.description])
else:
df = pd.DataFrame() # 빈 DataFrame 반환
cur.close()
conn.close()
return df

cur.close()
conn.close()
print("Query executed successfully.")
except Exception as e:
print(f"Execute_snowflake_query Error: {e}")
print(f"Query: {query}")
print(f"Data: {data}")
raise AirflowFailException("execute query error")


def escape_quotes(value):
    """Render *value* as a single-quoted SQL string literal.

    Returns the unquoted keyword "NULL" when value is None; otherwise doubles
    any embedded single quotes and wraps the result in single quotes.
    """
    if value is None:
        return "NULL"
    doubled = value.replace("'", "''")
    return f"'{doubled}'"
Loading