From d5d3c22765f87f96e278c9d8e4edd7cd72e7d8a8 Mon Sep 17 00:00:00 2001 From: gland78 Date: Sat, 15 Mar 2025 01:09:21 +0900 Subject: [PATCH 1/6] =?UTF-8?q?=EA=B5=AD=EB=82=B4=20=EC=B0=A8=ED=8A=B8=20E?= =?UTF-8?q?LT=5Fdag=20=EC=9E=91=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/domestic_music_chart_elt.py | 189 +++++++++++++++++++++++ snowflake_ELT.sql | 80 ++++++++++ 2 files changed, 269 insertions(+) create mode 100644 airflow/dags/domestic_music_chart_elt.py create mode 100644 snowflake_ELT.sql diff --git a/airflow/dags/domestic_music_chart_elt.py b/airflow/dags/domestic_music_chart_elt.py new file mode 100644 index 0000000..f2390f9 --- /dev/null +++ b/airflow/dags/domestic_music_chart_elt.py @@ -0,0 +1,189 @@ +from airflow import DAG +from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator +from airflow.operators.dummy import DummyOperator +from airflow.operators.python import PythonOperator +from airflow.utils.dates import days_ago +from datetime import timedelta +import time + +def wait_one_minute(): + time.sleep(60) + +default_args = { + 'owner': 'airflow', + 'start_date': days_ago(1), + 'retries': 1, + 'retry_delay': timedelta(minutes=5) +} + +with DAG( + 'domestic_music_chart_dashboard_elt', + default_args=default_args, + schedule_interval='@daily', + catchup=False +) as dag: + + # 시작 Dummy 태스크 + start = DummyOperator(task_id='start') + + # Step 1: 최신 데이터만 추출하여 adhoc 스키마에 저장 (컬럼 "DATE" 사용) + clean_music_chart = SnowflakeOperator( + task_id='clean_music_chart', + snowflake_conn_id='snowflake_conn', + sql=""" + CREATE OR REPLACE TABLE s4tify.adhoc.music_chart_cleaned AS + SELECT + rank, title, artist, genre, lastpos, image, peakpos, isnew, source, "DATE" AS time_date + FROM s4tify.raw_data.music_charts + WHERE "DATE" = (SELECT MAX("DATE") FROM s4tify.raw_data.music_charts); + """ + ) + + # Step 2: 1분 대기 (데이터 저장 후 안정적 처리 위해) + wait_task = PythonOperator( + task_id='wait_one_minute', + python_callable=wait_one_minute, + ) + + # 추가: 대시보드 스키마 생성 (존재하지 않을 경우) + create_dashboard_schema = SnowflakeOperator( + task_id='create_dashboard_schema', + snowflake_conn_id='snowflake_conn', + sql=""" + CREATE SCHEMA IF NOT EXISTS s4tify.analytics; + """ + ) + + # Step 3: 대시보드용 ELT SQL 태스크들 + + ## 3-1. 장르별 인기곡 트렌드 분석 (수정됨) + genre_trend_analysis = SnowflakeOperator( + task_id='genre_trend_analysis', + snowflake_conn_id='snowflake_conn', + sql=""" + CREATE OR REPLACE TABLE s4tify.analytics.genre_trend_analysis AS + SELECT + genre_flattened.value::string AS genre, + COUNT(DISTINCT title) AS total_songs, + AVG(rank) AS avg_rank + FROM s4tify.adhoc.music_chart_cleaned, + LATERAL FLATTEN(input => parse_json(REPLACE(genre, '''', '\"'))) AS genre_flattened + GROUP BY genre_flattened.value + ORDER BY avg_rank; + """ + ) + + ## 3-2. 아티스트별 최고 순위 및 평균 순위 분석 + artist_performance = SnowflakeOperator( + task_id='artist_performance', + snowflake_conn_id='snowflake_conn', + sql=""" + CREATE OR REPLACE TABLE s4tify.analytics.artist_performance AS + SELECT + artist, + COUNT(title) AS total_songs, + MIN(rank) AS best_rank, + AVG(rank) AS avg_rank + FROM s4tify.adhoc.music_chart_cleaned + GROUP BY artist + ORDER BY best_rank; + """ + ) + + ## 3-3. 신곡(NEW) 현황 분석 + new_songs_analysis = SnowflakeOperator( + task_id='new_songs_analysis', + snowflake_conn_id='snowflake_conn', + sql=""" + CREATE OR REPLACE TABLE s4tify.analytics.new_songs_analysis AS + SELECT + COUNT(*) AS total_songs, + SUM(CASE WHEN isnew = TRUE THEN 1 ELSE 0 END) AS new_songs, + ROUND(100 * SUM(CASE WHEN isnew = TRUE THEN 1 ELSE 0 END) / COUNT(*), 2) AS new_song_percentage + FROM s4tify.adhoc.music_chart_cleaned; + """ + ) + + ## 3-4. TOP 10 곡의 안정성 분석 (평균/최대 유지 기간) + top10_stability = SnowflakeOperator( + task_id='top10_stability', + snowflake_conn_id='snowflake_conn', + sql=""" + CREATE OR REPLACE TABLE s4tify.analytics.top10_stability AS + WITH top10 AS ( + SELECT title, COUNT(*) AS weeks_on_chart + FROM s4tify.raw_data.music_charts + WHERE rank <= 10 + GROUP BY title + ) + SELECT + AVG(weeks_on_chart) AS avg_weeks_top10, + MAX(weeks_on_chart) AS max_weeks_top10 + FROM top10; + """ + ) + + ## 3-5. 차트 1위 곡의 주간 유지 기간 분석 + no1_song_duration = SnowflakeOperator( + task_id='no1_song_duration', + snowflake_conn_id='snowflake_conn', + sql=""" + CREATE OR REPLACE TABLE s4tify.analytics.no1_song_duration AS + WITH no1_songs AS ( + SELECT title, COUNT(*) AS weeks_at_no1 + FROM s4tify.raw_data.music_charts + WHERE rank = 1 + GROUP BY title + ) + SELECT + title, + weeks_at_no1 + FROM no1_songs + ORDER BY weeks_at_no1 DESC; + """ + ) + + ## 3-6. 랭킹 상승/하락 곡 분석 + rank_change_analysis = SnowflakeOperator( + task_id='rank_change_analysis', + snowflake_conn_id='snowflake_conn', + sql=""" + CREATE OR REPLACE TABLE s4tify.analytics.rank_change_analysis AS + WITH prev_chart AS ( + SELECT * + FROM s4tify.raw_data.music_charts + WHERE "DATE" = ( + SELECT MAX("DATE") FROM s4tify.raw_data.music_charts + WHERE "DATE" < (SELECT MAX("DATE") FROM s4tify.raw_data.music_charts) + ) + ), + latest_chart AS ( + SELECT * + FROM s4tify.raw_data.music_charts + WHERE "DATE" = (SELECT MAX("DATE") FROM s4tify.raw_data.music_charts) + ) + SELECT + l.title, + l.artist, + p.rank AS prev_rank, + l.rank AS current_rank, + (p.rank - l.rank) AS rank_change + FROM latest_chart l + LEFT JOIN prev_chart p ON l.title = p.title + ORDER BY rank_change DESC; + """ + ) + + # 종료 Dummy 태스크 + end = DummyOperator(task_id='end') + + # DAG 실행 순서 + start >> clean_music_chart >> wait_task + wait_task >> create_dashboard_schema >> [ + genre_trend_analysis, + artist_performance, + new_songs_analysis, + top10_stability, + no1_song_duration, + rank_change_analysis + ] >> end diff --git a/snowflake_ELT.sql b/snowflake_ELT.sql new file mode 100644 index 0000000..0c81094 --- /dev/null +++ b/snowflake_ELT.sql @@ -0,0 +1,80 @@ +-- Step 1: 데이터 정리 및 중복 제거 +CREATE OR REPLACE TABLE s4tify.adhoc.music_chart_cleaned AS +SELECT DISTINCT + RANK AS chart_rank, + TITLE AS song_title, + ARTIST AS artist_name, + GENRE AS genre, + LASTPOS AS last_position, + IMAGE AS album_image, + PEAKPOS AS peak_position, + ISNEW AS is_new, + SOURCE AS platform +FROM s4tify.raw_data.music_charts; + +-- Step 2: 아티스트별 성과 비교 +CREATE OR REPLACE TABLE s4tify.adhoc.artist_performance AS +SELECT + artist_name, + COUNT(DISTINCT song_title) AS total_songs, + AVG(chart_rank) AS avg_rank, + MIN(chart_rank) AS best_rank +FROM s4tify.adhoc.music_chart_cleaned +GROUP BY artist_name; + +-- Step 3: 플랫폼별 인기곡 분석 +CREATE OR REPLACE TABLE s4tify.adhoc.platform_popularity AS +SELECT + platform, + COUNT(DISTINCT song_title) AS total_songs, + AVG(chart_rank) AS avg_rank +FROM s4tify.adhoc.music_chart_cleaned +GROUP BY platform; + +-- Step 4: 장르별 트렌드 분석 +CREATE OR REPLACE TABLE s4tify.adhoc.genre_trends AS +SELECT + genre, + COUNT(DISTINCT song_title) AS total_songs, + AVG(chart_rank) AS avg_rank +FROM s4tify.adhoc.music_chart_cleaned +GROUP BY genre; + +-- Step 5: 신곡과 기존 곡의 성과 비교 +CREATE OR REPLACE TABLE s4tify.adhoc.new_vs_old_songs AS +SELECT + CASE WHEN is_new = 'true' THEN 'New' ELSE 'Old' END AS song_type, + COUNT(*) AS total_songs, + AVG(chart_rank) AS avg_rank +FROM s4tify.adhoc.music_chart_cleaned +GROUP BY song_type; + +-- Step 6: 최고 순위 기록 비교 +CREATE OR REPLACE TABLE s4tify.adhoc.peak_rank_analysis AS +SELECT + artist_name, + song_title, + MIN(chart_rank) AS best_rank, + COUNT(*) AS weeks_on_chart +FROM s4tify.adhoc.music_chart_cleaned +GROUP BY artist_name, song_title; + +-- Step 7: 플랫폼별 최고 순위 기록 +CREATE OR REPLACE TABLE s4tify.adhoc.platform_peak_rank AS +SELECT + platform, + artist_name, + song_title, + MIN(chart_rank) AS best_rank +FROM s4tify.adhoc.music_chart_cleaned +GROUP BY platform, artist_name, song_title; + +-- Step 8: 아티스트별 플랫폼 편중 분석 +CREATE OR REPLACE TABLE s4tify.adhoc.artist_platform_focus AS +SELECT + artist_name, + platform, + COUNT(DISTINCT song_title) AS total_songs +FROM s4tify.adhoc.music_chart_cleaned +GROUP BY artist_name, platform +ORDER BY artist_name, total_songs DESC; From 9bcd38ae772aa5436064a760c13e86f873539ad8 Mon Sep 17 00:00:00 2001 From: gland78 Date: Sun, 16 Mar 2025 21:54:31 +0900 Subject: [PATCH 2/6] =?UTF-8?q?catchup=20False=20=EB=B3=80=EA=B2=BD=20?= =?UTF-8?q?=EB=B0=8F=20ELT=20SQL=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/Bugs_DAG.py | 2 +- airflow/dags/Flo_DAG.py | 2 +- airflow/dags/Genie_DAG.py | 2 +- airflow/dags/Melon_DAG.py | 2 +- airflow/dags/Vibe_DAG.py | 2 +- airflow/dags/domestic_music_chart_elt.py | 11 +++++++---- 6 files changed, 12 insertions(+), 9 deletions(-) diff --git a/airflow/dags/Bugs_DAG.py b/airflow/dags/Bugs_DAG.py index 3e9dc9e..7882a72 100644 --- a/airflow/dags/Bugs_DAG.py +++ b/airflow/dags/Bugs_DAG.py @@ -119,7 +119,7 @@ def upload_to_s3(**kwargs): "bugs_chart_dag", default_args=default_args, schedule_interval="10 0 * * *", # 매일 00:10 실행 - catchup=True, + catchup=False, ) as dag: get_spotify_token_task = PythonOperator( diff --git a/airflow/dags/Flo_DAG.py b/airflow/dags/Flo_DAG.py index 769513a..63a7b20 100644 --- a/airflow/dags/Flo_DAG.py +++ b/airflow/dags/Flo_DAG.py @@ -118,7 +118,7 @@ def upload_to_s3(**kwargs): "flo_chart_dag", default_args=default_args, schedule_interval="20 0 * * *", # 매일 00:20 실행 - catchup=True, + catchup=False, ) as dag: get_spotify_token_task = PythonOperator( diff --git a/airflow/dags/Genie_DAG.py b/airflow/dags/Genie_DAG.py index ae92f55..c62ff25 100644 --- a/airflow/dags/Genie_DAG.py +++ b/airflow/dags/Genie_DAG.py @@ -118,7 +118,7 @@ def upload_to_s3(**kwargs): "genie_chart_dag", default_args=default_args, schedule_interval="30 0 * * *", # 매일 00:30 실행 - catchup=True, + catchup=False, ) as dag: get_spotify_token_task = PythonOperator( diff --git a/airflow/dags/Melon_DAG.py b/airflow/dags/Melon_DAG.py index c7e4fce..da250bb 100644 --- a/airflow/dags/Melon_DAG.py +++ b/airflow/dags/Melon_DAG.py @@ -117,7 +117,7 @@ def upload_to_s3(**kwargs): "melon_chart_dag", default_args=default_args, schedule_interval="0 1 * * *", # 매일 01:00 실행 - catchup=True, + catchup=False, ) as dag: get_spotify_token_task = PythonOperator( diff --git a/airflow/dags/Vibe_DAG.py b/airflow/dags/Vibe_DAG.py index de30610..4b35f41 100644 --- a/airflow/dags/Vibe_DAG.py +++ b/airflow/dags/Vibe_DAG.py @@ -114,7 +114,7 @@ def upload_to_s3(**kwargs): "vibe_chart_dag", default_args=default_args, schedule_interval="45 0 * * *", # 매일 00:45 실행 - catchup=True, + catchup=False, ) as dag: get_spotify_token_task = PythonOperator( diff --git a/airflow/dags/domestic_music_chart_elt.py b/airflow/dags/domestic_music_chart_elt.py index f2390f9..3df639d 100644 --- a/airflow/dags/domestic_music_chart_elt.py +++ b/airflow/dags/domestic_music_chart_elt.py @@ -81,10 +81,11 @@ def wait_one_minute(): CREATE OR REPLACE TABLE s4tify.analytics.artist_performance AS SELECT artist, - COUNT(title) AS total_songs, + COUNT(DISTINCT title) AS total_songs, -- 중복 제거 MIN(rank) AS best_rank, AVG(rank) AS avg_rank FROM s4tify.adhoc.music_chart_cleaned + WHERE time_date = (SELECT MAX(time_date) FROM s4tify.adhoc.music_chart_cleaned) -- 최신 데이터만 사용 GROUP BY artist ORDER BY best_rank; """ @@ -130,15 +131,17 @@ def wait_one_minute(): sql=""" CREATE OR REPLACE TABLE s4tify.analytics.no1_song_duration AS WITH no1_songs AS ( - SELECT title, COUNT(*) AS weeks_at_no1 + SELECT LOWER(title) AS title, COUNT(*) AS weeks_at_no1 FROM s4tify.raw_data.music_charts WHERE rank = 1 - GROUP BY title + GROUP BY LOWER(title) ) SELECT title, - weeks_at_no1 + SUM(weeks_at_no1) AS weeks_at_no1 FROM no1_songs + GROUP BY title + HAVING SUM(weeks_at_no1) >= 2 ORDER BY weeks_at_no1 DESC; """ ) From 5da6fcc2fba13feedab15842fd5f53dc1faf42b3 Mon Sep 17 00:00:00 2001 From: gland78 Date: Sun, 16 Mar 2025 22:46:03 +0900 Subject: [PATCH 3/6] =?UTF-8?q?=20=EC=B5=9C=EC=A2=85=20elt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/domestic_music_chart_elt.py | 51 ++++++++++++++++-------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/airflow/dags/domestic_music_chart_elt.py b/airflow/dags/domestic_music_chart_elt.py index 3df639d..2cbdee0 100644 --- a/airflow/dags/domestic_music_chart_elt.py +++ b/airflow/dags/domestic_music_chart_elt.py @@ -131,18 +131,24 @@ def wait_one_minute(): sql=""" CREATE OR REPLACE TABLE s4tify.analytics.no1_song_duration AS WITH no1_songs AS ( - SELECT LOWER(title) AS title, COUNT(*) AS weeks_at_no1 - FROM s4tify.raw_data.music_charts - WHERE rank = 1 - GROUP BY LOWER(title) + SELECT + LOWER(title) AS title, + YEAR("DATE") AS year, + WEEKOFYEAR("DATE") AS week + FROM ( + SELECT DISTINCT LOWER(title) as title, "DATE" + FROM s4tify.raw_data.music_charts + WHERE rank = 1 + ) unique_no1 + GROUP BY LOWER(title), YEAR("DATE"), WEEKOFYEAR("DATE") ) SELECT title, - SUM(weeks_at_no1) AS weeks_at_no1 + COUNT(*) AS total_weeks_at_no1 FROM no1_songs GROUP BY title - HAVING SUM(weeks_at_no1) >= 2 - ORDER BY weeks_at_no1 DESC; + HAVING COUNT(*) >= 2 + ORDER BY total_weeks_at_no1 DESC; """ ) @@ -153,17 +159,27 @@ def wait_one_minute(): sql=""" CREATE OR REPLACE TABLE s4tify.analytics.rank_change_analysis AS WITH prev_chart AS ( - SELECT * - FROM s4tify.raw_data.music_charts - WHERE "DATE" = ( - SELECT MAX("DATE") FROM s4tify.raw_data.music_charts - WHERE "DATE" < (SELECT MAX("DATE") FROM s4tify.raw_data.music_charts) - ) + SELECT title, artist, rank + FROM ( + SELECT title, artist, rank, "DATE", + ROW_NUMBER() OVER (PARTITION BY title, artist ORDER BY "DATE" DESC) AS rn + FROM s4tify.raw_data.music_charts + WHERE "DATE" = ( + SELECT MAX("DATE") FROM s4tify.raw_data.music_charts + WHERE "DATE" < (SELECT MAX("DATE") FROM s4tify.raw_data.music_charts) + ) + ) + WHERE rn = 1 -- 중복 제거: 가장 최근 데이터만 선택 ), latest_chart AS ( - SELECT * - FROM s4tify.raw_data.music_charts - WHERE "DATE" = (SELECT MAX("DATE") FROM s4tify.raw_data.music_charts) + SELECT title, artist, rank + FROM ( + SELECT title, artist, rank, "DATE", + ROW_NUMBER() OVER (PARTITION BY title, artist ORDER BY "DATE" DESC) AS rn + FROM s4tify.raw_data.music_charts + WHERE "DATE" = (SELECT MAX("DATE") FROM s4tify.raw_data.music_charts) + ) + WHERE rn = 1 -- 중복 제거: 가장 최근 데이터만 선택 ) SELECT l.title, @@ -172,7 +188,8 @@ def wait_one_minute(): l.rank AS current_rank, (p.rank - l.rank) AS rank_change FROM latest_chart l - LEFT JOIN prev_chart p ON l.title = p.title + LEFT JOIN prev_chart p ON l.title = p.title AND l.artist = p.artist + WHERE p.rank IS NOT NULL AND l.rank IS NOT NULL -- NULL 값 제거 ORDER BY rank_change DESC; """ ) From 8d1958cda8b09708c8ac73dd0e1edef767fb4157 Mon Sep 17 00:00:00 2001 From: gland78 Date: Sun, 16 Mar 2025 23:57:51 +0900 Subject: [PATCH 4/6] =?UTF-8?q?create=5Fdashboard=5Fschema=20=ED=83=9C?= =?UTF-8?q?=EC=8A=A4=ED=81=AC=20=EC=82=AD=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/domestic_music_chart_elt.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/dags/domestic_music_chart_elt.py b/airflow/dags/domestic_music_chart_elt.py index 2cbdee0..ee4577a 100644 --- a/airflow/dags/domestic_music_chart_elt.py +++ b/airflow/dags/domestic_music_chart_elt.py @@ -199,7 +199,8 @@ def wait_one_minute(): # DAG 실행 순서 start >> clean_music_chart >> wait_task - wait_task >> create_dashboard_schema >> [ + #wait_task >> create_dashboard_schema >> [ + wait_task >>[ genre_trend_analysis, artist_performance, new_songs_analysis, From d5fc382c421979b1ae77a43dc161b12f7d0bd7e3 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Sun, 16 Mar 2025 15:00:44 +0000 Subject: [PATCH 5/6] Automated format fixes --- airflow/dags/domestic_music_chart_elt.py | 153 ++++++++++++----------- 1 file changed, 80 insertions(+), 73 deletions(-) diff --git a/airflow/dags/domestic_music_chart_elt.py b/airflow/dags/domestic_music_chart_elt.py index ee4577a..bd33eed 100644 --- a/airflow/dags/domestic_music_chart_elt.py +++ b/airflow/dags/domestic_music_chart_elt.py @@ -1,68 +1,71 @@ +import time +from datetime import timedelta + from airflow import DAG -from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator from airflow.operators.dummy import DummyOperator from airflow.operators.python import PythonOperator +from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator from airflow.utils.dates import days_ago -from datetime import timedelta -import time + def wait_one_minute(): time.sleep(60) + default_args = { - 'owner': 'airflow', - 'start_date': days_ago(1), - 'retries': 1, - 'retry_delay': timedelta(minutes=5) + "owner": "airflow", + "start_date": days_ago(1), + "retries": 1, + "retry_delay": timedelta(minutes=5), } with DAG( - 'domestic_music_chart_dashboard_elt', + "domestic_music_chart_dashboard_elt", default_args=default_args, - schedule_interval='@daily', - catchup=False + schedule_interval="@daily", + catchup=False, ) as dag: # 시작 Dummy 태스크 - start = DummyOperator(task_id='start') + start = DummyOperator(task_id="start") # Step 1: 최신 데이터만 추출하여 adhoc 스키마에 저장 (컬럼 "DATE" 사용) clean_music_chart = SnowflakeOperator( - task_id='clean_music_chart', - snowflake_conn_id='snowflake_conn', + task_id="clean_music_chart", + snowflake_conn_id="snowflake_conn", sql=""" CREATE OR REPLACE TABLE s4tify.adhoc.music_chart_cleaned AS - SELECT + SELECT rank, title, artist, genre, lastpos, image, peakpos, isnew, source, "DATE" AS time_date FROM s4tify.raw_data.music_charts WHERE "DATE" = (SELECT MAX("DATE") FROM s4tify.raw_data.music_charts); - """ + """, ) # Step 2: 1분 대기 (데이터 저장 후 안정적 처리 위해) wait_task = PythonOperator( - task_id='wait_one_minute', + task_id="wait_one_minute", python_callable=wait_one_minute, ) # 추가: 대시보드 스키마 생성 (존재하지 않을 경우) create_dashboard_schema = SnowflakeOperator( - task_id='create_dashboard_schema', - snowflake_conn_id='snowflake_conn', + task_id="create_dashboard_schema", + snowflake_conn_id="snowflake_conn", sql=""" CREATE SCHEMA IF NOT EXISTS s4tify.analytics; - """ + """, ) # Step 3: 대시보드용 ELT SQL 태스크들 - ## 3-1. 장르별 인기곡 트렌드 분석 (수정됨) + # 3-1. 장르별 인기곡 트렌드 분석 (수정됨) genre_trend_analysis = SnowflakeOperator( - task_id='genre_trend_analysis', - snowflake_conn_id='snowflake_conn', + task_id="genre_trend_analysis", + snowflake_conn_id="snowflake_conn", sql=""" CREATE OR REPLACE TABLE s4tify.analytics.genre_trend_analysis AS - SELECT + SELECT genre_flattened.value::string AS genre, COUNT(DISTINCT title) AS total_songs, AVG(rank) AS avg_rank @@ -70,45 +73,45 @@ def wait_one_minute(): LATERAL FLATTEN(input => parse_json(REPLACE(genre, '''', '\"'))) AS genre_flattened GROUP BY genre_flattened.value ORDER BY avg_rank; - """ + """, ) - ## 3-2. 아티스트별 최고 순위 및 평균 순위 분석 + # 3-2. 아티스트별 최고 순위 및 평균 순위 분석 artist_performance = SnowflakeOperator( - task_id='artist_performance', - snowflake_conn_id='snowflake_conn', + task_id="artist_performance", + snowflake_conn_id="snowflake_conn", sql=""" CREATE OR REPLACE TABLE s4tify.analytics.artist_performance AS - SELECT - artist, + SELECT + artist, COUNT(DISTINCT title) AS total_songs, -- 중복 제거 - MIN(rank) AS best_rank, + MIN(rank) AS best_rank, AVG(rank) AS avg_rank FROM s4tify.adhoc.music_chart_cleaned WHERE time_date = (SELECT MAX(time_date) FROM s4tify.adhoc.music_chart_cleaned) -- 최신 데이터만 사용 GROUP BY artist ORDER BY best_rank; - """ + """, ) - ## 3-3. 신곡(NEW) 현황 분석 + # 3-3. 신곡(NEW) 현황 분석 new_songs_analysis = SnowflakeOperator( - task_id='new_songs_analysis', - snowflake_conn_id='snowflake_conn', + task_id="new_songs_analysis", + snowflake_conn_id="snowflake_conn", sql=""" CREATE OR REPLACE TABLE s4tify.analytics.new_songs_analysis AS - SELECT + SELECT COUNT(*) AS total_songs, SUM(CASE WHEN isnew = TRUE THEN 1 ELSE 0 END) AS new_songs, ROUND(100 * SUM(CASE WHEN isnew = TRUE THEN 1 ELSE 0 END) / COUNT(*), 2) AS new_song_percentage FROM s4tify.adhoc.music_chart_cleaned; - """ + """, ) - ## 3-4. TOP 10 곡의 안정성 분석 (평균/최대 유지 기간) + # 3-4. TOP 10 곡의 안정성 분석 (평균/최대 유지 기간) top10_stability = SnowflakeOperator( - task_id='top10_stability', - snowflake_conn_id='snowflake_conn', + task_id="top10_stability", + snowflake_conn_id="snowflake_conn", sql=""" CREATE OR REPLACE TABLE s4tify.analytics.top10_stability AS WITH top10 AS ( @@ -117,23 +120,23 @@ def wait_one_minute(): WHERE rank <= 10 GROUP BY title ) - SELECT + SELECT AVG(weeks_on_chart) AS avg_weeks_top10, MAX(weeks_on_chart) AS max_weeks_top10 FROM top10; - """ + """, ) - ## 3-5. 차트 1위 곡의 주간 유지 기간 분석 + # 3-5. 차트 1위 곡의 주간 유지 기간 분석 no1_song_duration = SnowflakeOperator( - task_id='no1_song_duration', - snowflake_conn_id='snowflake_conn', + task_id="no1_song_duration", + snowflake_conn_id="snowflake_conn", sql=""" CREATE OR REPLACE TABLE s4tify.analytics.no1_song_duration AS WITH no1_songs AS ( - SELECT - LOWER(title) AS title, - YEAR("DATE") AS year, + SELECT + LOWER(title) AS title, + YEAR("DATE") AS year, WEEKOFYEAR("DATE") AS week FROM ( SELECT DISTINCT LOWER(title) as title, "DATE" @@ -142,69 +145,73 @@ def wait_one_minute(): ) unique_no1 GROUP BY LOWER(title), YEAR("DATE"), WEEKOFYEAR("DATE") ) - SELECT - title, + SELECT + title, COUNT(*) AS total_weeks_at_no1 FROM no1_songs GROUP BY title HAVING COUNT(*) >= 2 ORDER BY total_weeks_at_no1 DESC; - """ + """, ) - ## 3-6. 랭킹 상승/하락 곡 분석 + # 3-6. 랭킹 상승/하락 곡 분석 rank_change_analysis = SnowflakeOperator( - task_id='rank_change_analysis', - snowflake_conn_id='snowflake_conn', + task_id="rank_change_analysis", + snowflake_conn_id="snowflake_conn", sql=""" CREATE OR REPLACE TABLE s4tify.analytics.rank_change_analysis AS WITH prev_chart AS ( - SELECT title, artist, rank + SELECT title, artist, rank FROM ( SELECT title, artist, rank, "DATE", ROW_NUMBER() OVER (PARTITION BY title, artist ORDER BY "DATE" DESC) AS rn FROM s4tify.raw_data.music_charts WHERE "DATE" = ( - SELECT MAX("DATE") FROM s4tify.raw_data.music_charts + SELECT MAX("DATE") FROM s4tify.raw_data.music_charts WHERE "DATE" < (SELECT MAX("DATE") FROM s4tify.raw_data.music_charts) ) - ) + ) WHERE rn = 1 -- 중복 제거: 가장 최근 데이터만 선택 ), latest_chart AS ( - SELECT title, artist, rank + SELECT title, artist, rank FROM ( SELECT title, artist, rank, "DATE", ROW_NUMBER() OVER (PARTITION BY title, artist ORDER BY "DATE" DESC) AS rn FROM s4tify.raw_data.music_charts WHERE "DATE" = (SELECT MAX("DATE") FROM s4tify.raw_data.music_charts) - ) + ) WHERE rn = 1 -- 중복 제거: 가장 최근 데이터만 선택 ) - SELECT - l.title, - l.artist, - p.rank AS prev_rank, + SELECT + l.title, + l.artist, + p.rank AS prev_rank, l.rank AS current_rank, (p.rank - l.rank) AS rank_change FROM latest_chart l LEFT JOIN prev_chart p ON l.title = p.title AND l.artist = p.artist WHERE p.rank IS NOT NULL AND l.rank IS NOT NULL -- NULL 값 제거 ORDER BY rank_change DESC; - """ + """, ) # 종료 Dummy 태스크 - end = DummyOperator(task_id='end') + end = DummyOperator(task_id="end") # DAG 실행 순서 start >> clean_music_chart >> wait_task - #wait_task >> create_dashboard_schema >> [ - wait_task >>[ - genre_trend_analysis, - artist_performance, - new_songs_analysis, - top10_stability, - no1_song_duration, - rank_change_analysis - ] >> end + # wait_task >> create_dashboard_schema >> [ + ( + wait_task + >> [ + genre_trend_analysis, + artist_performance, + new_songs_analysis, + top10_stability, + no1_song_duration, + rank_change_analysis, + ] + >> end + ) From 94d369cc416aeba28b69cf001daec706e6328613 Mon Sep 17 00:00:00 2001 From: gland78 Date: Mon, 17 Mar 2025 00:42:19 +0900 Subject: [PATCH 6/6] =?UTF-8?q?snowflake=5FELT.sql=20=EC=82=AD=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- snowflake_ELT.sql | 80 ----------------------------------------------- 1 file changed, 80 deletions(-) delete mode 100644 snowflake_ELT.sql diff --git a/snowflake_ELT.sql b/snowflake_ELT.sql deleted file mode 100644 index 0c81094..0000000 --- a/snowflake_ELT.sql +++ /dev/null @@ -1,80 +0,0 @@ --- Step 1: 데이터 정리 및 중복 제거 -CREATE OR REPLACE TABLE s4tify.adhoc.music_chart_cleaned AS -SELECT DISTINCT - RANK AS chart_rank, - TITLE AS song_title, - ARTIST AS artist_name, - GENRE AS genre, - LASTPOS AS last_position, - IMAGE AS album_image, - PEAKPOS AS peak_position, - ISNEW AS is_new, - SOURCE AS platform -FROM s4tify.raw_data.music_charts; - --- Step 2: 아티스트별 성과 비교 -CREATE OR REPLACE TABLE s4tify.adhoc.artist_performance AS -SELECT - artist_name, - COUNT(DISTINCT song_title) AS total_songs, - AVG(chart_rank) AS avg_rank, - MIN(chart_rank) AS best_rank -FROM s4tify.adhoc.music_chart_cleaned -GROUP BY artist_name; - --- Step 3: 플랫폼별 인기곡 분석 -CREATE OR REPLACE TABLE s4tify.adhoc.platform_popularity AS -SELECT - platform, - COUNT(DISTINCT song_title) AS total_songs, - AVG(chart_rank) AS avg_rank -FROM s4tify.adhoc.music_chart_cleaned -GROUP BY platform; - --- Step 4: 장르별 트렌드 분석 -CREATE OR REPLACE TABLE s4tify.adhoc.genre_trends AS -SELECT - genre, - COUNT(DISTINCT song_title) AS total_songs, - AVG(chart_rank) AS avg_rank -FROM s4tify.adhoc.music_chart_cleaned -GROUP BY genre; - --- Step 5: 신곡과 기존 곡의 성과 비교 -CREATE OR REPLACE TABLE s4tify.adhoc.new_vs_old_songs AS -SELECT - CASE WHEN is_new = 'true' THEN 'New' ELSE 'Old' END AS song_type, - COUNT(*) AS total_songs, - AVG(chart_rank) AS avg_rank -FROM s4tify.adhoc.music_chart_cleaned -GROUP BY song_type; - --- Step 6: 최고 순위 기록 비교 -CREATE OR REPLACE TABLE s4tify.adhoc.peak_rank_analysis AS -SELECT - artist_name, - song_title, - MIN(chart_rank) AS best_rank, - COUNT(*) AS weeks_on_chart -FROM s4tify.adhoc.music_chart_cleaned -GROUP BY artist_name, song_title; - --- Step 7: 플랫폼별 최고 순위 기록 -CREATE OR REPLACE TABLE s4tify.adhoc.platform_peak_rank AS -SELECT - platform, - artist_name, - song_title, - MIN(chart_rank) AS best_rank -FROM s4tify.adhoc.music_chart_cleaned -GROUP BY platform, artist_name, song_title; - --- Step 8: 아티스트별 플랫폼 편중 분석 -CREATE OR REPLACE TABLE s4tify.adhoc.artist_platform_focus AS -SELECT - artist_name, - platform, - COUNT(DISTINCT song_title) AS total_songs -FROM s4tify.adhoc.music_chart_cleaned -GROUP BY artist_name, platform -ORDER BY artist_name, total_songs DESC;