diff --git a/airflow/dags/Bugs_DAG.py b/airflow/dags/Bugs_DAG.py index a597e33..34fc3bd 100644 --- a/airflow/dags/Bugs_DAG.py +++ b/airflow/dags/Bugs_DAG.py @@ -1,4 +1,5 @@ import csv +import io import json from datetime import datetime, timedelta @@ -53,10 +54,24 @@ def fetch_bugs_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_bugs_chart") - csv_data = [["rank", "title", "artist", - "lastPos", "peakPos", "image", "genre"]] + + output = io.StringIO() + writer = csv.writer( + output, quoting=csv.QUOTE_ALL + ) # ✅ 모든 필드를 자동으로 따옴표 처리 + + # 헤더 추가 + writer.writerow(["rank", "title", "artist", "lastPos", + "peakPos", "image", "genre"]) + + # 데이터 추가 for entry in data["entries"]: - csv_data.append( + genres = json.dumps( + entry["genres"], ensure_ascii=False + ) # 리스트를 문자열로 변환 + # 이중 따옴표가 포함되면 한번만 보이도록 처리 + genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + writer.writerow( [ entry["rank"], entry["title"], @@ -64,13 +79,11 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["peakPos"], entry["image"], - json.dumps( - entry["genres"], ensure_ascii=False - ), # ✅ 리스트를 문자열로 변환하여 저장 + genres, ] ) - csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) - return csv_string + + return output.getvalue() # 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) @@ -83,7 +96,7 @@ def save_csv_locally(csv_string): def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - # save_csv_locally(csv_string) # 테스트용 로컬 저장 + save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, @@ -106,7 +119,7 @@ def upload_to_s3(**kwargs): "bugs_chart_dag", default_args=default_args, schedule_interval="10 0 * * *", # 매일 00:10 실행 - catchup=False, + catchup=True, ) as dag: get_spotify_token_task = PythonOperator( diff --git a/airflow/dags/Flo_DAG.py b/airflow/dags/Flo_DAG.py index d72a328..5c596e4 100644 --- a/airflow/dags/Flo_DAG.py +++ b/airflow/dags/Flo_DAG.py @@ -1,4 +1,5 @@ import csv +import io import json from datetime import datetime, timedelta @@ -26,10 +27,12 @@ def fetch_flo_chart(): chart = ChartData(fetch=True) chart_data = {"date": chart.date.strftime( "%Y-%m-%d %H:%M:%S"), "entries": []} + for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") artist_id = search_artist_id(entry.artist) genre = get_artist_genre(artist_id) + chart_data["entries"].append( { "rank": entry.rank, @@ -41,17 +44,32 @@ def fetch_flo_chart(): "genres": genre.split(", ") if genre else [], } ) + return chart_data -# 2. JSON → CSV 변환 +# 2. JSON → CSV 변환 (쉼표 포함된 데이터도 깨지지 않도록 수정) def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_flo_chart") - csv_data = [["rank", "title", "artist", - "lastPos", "isNew", "image", "genre"]] + + output = io.StringIO() + writer = csv.writer( + output, quoting=csv.QUOTE_ALL + ) # ✅ 모든 필드를 자동으로 따옴표 처리 + + # 헤더 추가 + writer.writerow(["rank", "title", "artist", + "lastPos", "isNew", "image", "genre"]) + + # 데이터 추가 for entry in data["entries"]: - csv_data.append( + genres = json.dumps( + entry["genres"], ensure_ascii=False + ) # 리스트를 문자열로 변환 + # 이중 따옴표가 포함되면 한번만 보이도록 처리 + genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + writer.writerow( [ entry["rank"], entry["title"], @@ -59,24 +77,25 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["isNew"], entry["image"], - json.dumps(entry["genres"], ensure_ascii=False), + genres, ] ) - csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) - return csv_string + return output.getvalue() -# 3. 로컬에 CSV 저장 (테스트용) + +# 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: f.write(csv_string) -# 4. AWS S3 업로드 +# 3. AWS S3 업로드 def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - # save_csv_locally(csv_string) # 테스트용 로컬 저장 + save_csv_locally(csv_string) # 테스트용 로컬 저장 + s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, @@ -99,7 +118,7 @@ def upload_to_s3(**kwargs): "flo_chart_dag", default_args=default_args, schedule_interval="20 0 * * *", # 매일 00:20 실행 - catchup=False, + catchup=True, ) as dag: get_spotify_token_task = PythonOperator( diff --git a/airflow/dags/Genie_DAG.py b/airflow/dags/Genie_DAG.py index 0ad279c..e0dd127 100644 --- a/airflow/dags/Genie_DAG.py +++ b/airflow/dags/Genie_DAG.py @@ -1,4 +1,5 @@ import csv +import io import json from datetime import datetime, timedelta @@ -26,10 +27,12 @@ def fetch_genie_chart(): chart = ChartData(chartPeriod=GenieChartPeriod.Realtime, fetch=True) chart_data = {"date": chart.date.strftime( "%Y-%m-%d %H:%M:%S"), "entries": []} + for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") artist_id = search_artist_id(entry.artist) genre = get_artist_genre(artist_id) + chart_data["entries"].append( { "rank": entry.rank, @@ -41,17 +44,32 @@ def fetch_genie_chart(): "genres": genre.split(", ") if genre else [], } ) + return chart_data -# 2. JSON → CSV 변환 +# 2. JSON → CSV 변환 (쉼표 포함된 데이터도 깨지지 않도록 수정) def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_genie_chart") - csv_data = [["rank", "title", "artist", - "peakPos", "lastPos", "image", "genre"]] + + output = io.StringIO() + writer = csv.writer( + output, quoting=csv.QUOTE_ALL + ) # ✅ 모든 필드를 자동으로 따옴표 처리 + + # 헤더 추가 + writer.writerow(["rank", "title", "artist", "peakPos", + "lastPos", "image", "genre"]) + + # 데이터 추가 for entry in data["entries"]: - csv_data.append( + genres = json.dumps( + entry["genres"], ensure_ascii=False + ) # 리스트를 문자열로 변환 + # 이중 따옴표가 포함되면 한번만 보이도록 처리 + genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + writer.writerow( [ entry["rank"], entry["title"], @@ -59,24 +77,25 @@ def convert_json_to_csv(**kwargs): entry["peakPos"], entry["lastPos"], entry["image"], - json.dumps(entry["genres"], ensure_ascii=False), + genres, ] ) - csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) - return csv_string + return output.getvalue() -# 3. 로컬에 CSV 저장 (테스트용) + +# 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: f.write(csv_string) -# 4. AWS S3 업로드 +# 3. AWS S3 업로드 def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - # save_csv_locally(csv_string) # 테스트용 로컬 저장 + save_csv_locally(csv_string) # 테스트용 로컬 저장 + s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, @@ -99,7 +118,7 @@ def upload_to_s3(**kwargs): "genie_chart_dag", default_args=default_args, schedule_interval="30 0 * * *", # 매일 00:30 실행 - catchup=False, + catchup=True, ) as dag: get_spotify_token_task = PythonOperator( diff --git a/airflow/dags/Melon_DAG.py b/airflow/dags/Melon_DAG.py index 6622984..8d89066 100644 --- a/airflow/dags/Melon_DAG.py +++ b/airflow/dags/Melon_DAG.py @@ -1,4 +1,5 @@ import csv +import io import json from datetime import datetime, timedelta @@ -26,10 +27,12 @@ def fetch_melon_chart(): chart = ChartData(fetch=True) chart_data = {"date": chart.date.strftime( "%Y-%m-%d %H:%M:%S"), "entries": []} + for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") artist_id = search_artist_id(entry.artist) genre = get_artist_genre(artist_id) + chart_data["entries"].append( { "rank": entry.rank, @@ -48,10 +51,25 @@ def fetch_melon_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_melon_chart") - csv_data = [["rank", "title", "artist", - "lastPos", "isNew", "image", "genre"]] + + output = io.StringIO() + writer = csv.writer( + output, quoting=csv.QUOTE_ALL + ) # ✅ 모든 필드를 자동으로 따옴표 처리 + + # 헤더 추가 + writer.writerow(["rank", "title", "artist", + "lastPos", "isNew", "image", "genre"]) + + # 데이터 추가 for entry in data["entries"]: - csv_data.append( + genres = json.dumps( + entry["genres"], ensure_ascii=False + ) # 리스트를 문자열로 변환 + # 이중 따옴표가 포함되면 한번만 보이도록 처리 + genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + + writer.writerow( [ entry["rank"], entry["title"], @@ -59,14 +77,14 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["isNew"], entry["image"], - json.dumps(entry["genres"], ensure_ascii=False), + genres, ] ) - csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) - return csv_string + return output.getvalue() -# 3. 로컬에 CSV 저장 (테스트용) + +# 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: f.write(csv_string) @@ -76,7 +94,8 @@ def save_csv_locally(csv_string): def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - # save_csv_locally(csv_string) # 테스트용 로컬 저장 + save_csv_locally(csv_string) # 테스트용 로컬 저장 + s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, @@ -99,7 +118,7 @@ def upload_to_s3(**kwargs): "melon_chart_dag", default_args=default_args, schedule_interval="0 1 * * *", # 매일 01:00 실행 - catchup=False, + catchup=True, ) as dag: get_spotify_token_task = PythonOperator( diff --git a/airflow/dags/Vibe_DAG.py b/airflow/dags/Vibe_DAG.py index 9b5958d..612a477 100644 --- a/airflow/dags/Vibe_DAG.py +++ b/airflow/dags/Vibe_DAG.py @@ -1,4 +1,5 @@ import csv +import io import json from datetime import datetime, timedelta @@ -48,10 +49,24 @@ def fetch_vibe_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_vibe_chart") - csv_data = [["rank", "title", "artist", - "lastPos", "isNew", "image", "genre"]] + + output = io.StringIO() + writer = csv.writer( + output, quoting=csv.QUOTE_ALL + ) # ✅ 모든 필드를 자동으로 따옴표 처리 + + # 헤더 추가 + writer.writerow(["rank", "title", "artist", + "lastPos", "isNew", "image", "genre"]) + + # 데이터 추가 for entry in data["entries"]: - csv_data.append( + genres = json.dumps( + entry["genres"], ensure_ascii=False + ) # 리스트를 문자열로 변환 + # 이중 따옴표가 포함되면 한번만 보이도록 처리 + genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + writer.writerow( [ entry["rank"], entry["title"], @@ -59,11 +74,11 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["isNew"], entry["image"], - json.dumps(entry["genres"], ensure_ascii=False), + genres, ] ) - csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) - return csv_string + + return output.getvalue() # 3. 로컬에 CSV 저장 (테스트용) @@ -76,7 +91,7 @@ def save_csv_locally(csv_string): def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - # save_csv_locally(csv_string) # 테스트용 로컬 저장 + save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, @@ -99,7 +114,7 @@ def upload_to_s3(**kwargs): "vibe_chart_dag", default_args=default_args, schedule_interval="45 0 * * *", # 매일 00:45 실행 - catchup=False, + catchup=True, ) as dag: get_spotify_token_task = PythonOperator( diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index e5a54ba..eca84fb 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -24,11 +24,8 @@ "account": Variable.get("SNOWFLAKE_ACCOUNT"), "db": Variable.get("SNOWFLAKE_DB", "S4TIFY"), "warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"), - "schema": ( - Variable.get("SNOWFLAKE_SCHEMA") - if Variable.get("SNOWFLAKE_SCHEMA") - else "raw_data" - ), + # "schema": (Variable.get("SNOWFLAKE_SCHEMA")if Variable.get("SNOWFLAKE_SCHEMA")else "raw_data"), + "schema": "RAW_DATA", "role": "ACCOUNTADMIN", "driver": "net.snowflake.client.jdbc.SnowflakeDriver", "url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', @@ -70,22 +67,29 @@ def check_and_create_table(): cur = conn.cursor() # 테이블 존재 여부 확인 - cur.execute(f"SHOW TABLES LIKE 'music_charts'") + cur.execute( + f""" + SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_SCHEMA = '{SNOWFLAKE_OPTIONS["schema"]}' + AND UPPER(TABLE_NAME) = 'MUSIC_CHARTS' + """ + ) result = cur.fetchone() if result is None: - # 테이블이 존재하지 않으면 생성 - create_table_query = """ - CREATE OR REPLACE TABLE ADHOC.music_charts ( + # 테이블이 없으면 생성 + create_table_query = f""" + CREATE TABLE IF NOT EXISTS {SNOWFLAKE_OPTIONS['schema']}.music_charts ( rank INT, title STRING, artist STRING, + genre STRING, -- 🎵 genre 컬럼 추가 lastPos INT, image STRING, peakPos INT, isNew BOOLEAN, source STRING - ); + ) """ cur.execute(create_table_query) print("✅ music_charts 테이블 생성 완료.") @@ -123,35 +127,32 @@ def insert_data_into_snowflake(df, table_name): ) cur = conn.cursor() - # DataFrame을 순회하며 INSERT 쿼리 실행 for row in df.collect(): - # None 값을 NULL로 처리하고, 문자열 값은 작은따옴표로 감쌈 - rank = f"NULL" if row["rank"] is None else row["rank"] + rank = "NULL" if row["rank"] is None else row["rank"] title = escape_quotes( - f"NULL" if row["title"] is None else f"'{row['title']}'" - ) - artist = escape_quotes( - f"NULL" if row["artist"] is None else f"'{row['artist']}'" - ) - lastPos = f"NULL" if row["lastPos"] is None else row["lastPos"] + row["title"]) if row["title"] is not None else "NULL" + artist = ( + escape_quotes( + row["artist"]) if row["artist"] is not None else "NULL") + genre = ( + escape_quotes( + row["genre"]) if row["genre"] is not None else "NULL") # 🎵 genre 추가 + lastPos = "NULL" if row["lastPos"] is None else row["lastPos"] image = escape_quotes( - f"NULL" if row["image"] is None else f"'{row['image']}'" - ) - peakPos = f"NULL" if row["peakPos"] is None else row["peakPos"] - # isNew 값은 TRUE/FALSE로 처리하고 NULL은 그대로 처리 + row["image"]) if row["image"] is not None else "NULL" + peakPos = "NULL" if row["peakPos"] is None else row["peakPos"] isNew = ( - f"NULL" + "NULL" if row["isNew"] is None - else ("True" if row["isNew"] else "FALSE") - ) - source = escape_quotes( - f"NULL" if row["source"] is None else f"'{row['source']}'" + else ("TRUE" if row["isNew"] else "FALSE") ) + source = ( + escape_quotes( + row["source"]) if row["source"] is not None else "NULL") - # 삽입할 쿼리 (컬럼 이름은 큰따옴표 없이) query = f""" - INSERT INTO {table_name} (rank, title, artist, lastPos, image, peakPos, isNew, source) - VALUES ({rank}, {title}, {artist}, {lastPos}, {image}, {peakPos}, {isNew}, {source}) + INSERT INTO {table_name} (rank, title, artist, genre, lastPos, image, peakPos, isNew, source) + VALUES ({rank}, {title}, {artist}, {genre}, {lastPos}, {image}, {peakPos}, {isNew}, {source}) """ cur.execute(query) @@ -159,6 +160,7 @@ def insert_data_into_snowflake(df, table_name): cur.close() conn.close() print("✅ Data inserted into Snowflake successfully.") + except Exception as e: print(query) print(f"⚠️ Error inserting data into Snowflake: {e}") @@ -187,6 +189,7 @@ def read_chart_data(source, path): .option("inferSchema", True) .load(path) ) + df.printSchema() # ✅ 데이터 스키마 출력해서 `genre` 확인 return df.withColumn("source", lit(source)) except Exception as e: print(f"⚠️ {source} 데이터 로드 실패: {e}") @@ -206,21 +209,21 @@ def read_chart_data(source, path): merged_df = merged_df.unionByName(df, allowMissingColumns=True) final_df = merged_df.select( - when( - col("rank").rlike("^[0-9]+$"), - col("rank").cast("int")).alias("rank"), + when(col("rank").rlike("^[0-9]+$"), + col("rank").cast("int")).alias("rank"), col("title"), col("artist"), - when( - col("lastPos").rlike("^[0-9]+$"), - col("lastPos").cast("int")).alias("lastPos"), + col("genre"), # ✅ genre 컬럼 추가 + when(col("lastPos").rlike("^[0-9]+$"), col("lastPos").cast("int")).alias( + "lastPos" + ), col("image"), - when( - col("peakPos").rlike("^[0-9]+$"), - col("peakPos").cast("int")).alias("peakPos"), - when( - col("isNew").rlike("^(true|false)$"), - col("isNew").cast("boolean")).alias("isNew"), + when(col("peakPos").rlike("^[0-9]+$"), col("peakPos").cast("int")).alias( + "peakPos" + ), + when(col("isNew").rlike("^(true|false)$"), col("isNew").cast("boolean")).alias( + "isNew" + ), col("source"), ) diff --git a/airflow/dags/scripts/crawling_spotify_data.py b/airflow/dags/scripts/crawling_spotify_data.py index 24a477c..2ff2a8b 100644 --- a/airflow/dags/scripts/crawling_spotify_data.py +++ b/airflow/dags/scripts/crawling_spotify_data.py @@ -55,10 +55,16 @@ def data_crawling(): try: # top50 리스트 가져오기 - scroll_element = driver.find_element(By.XPATH, '//*[@id="main"]/div/div[2]/div[5]/div/div[2]/div[2]/div/main/section/div[2]/div[3]/div/div[1]/div/div[2]/div[2]') - driver.execute_script(""" - arguments[0].scrollIntoView({behavior: 'smooth', block: 'end'}); - """, scroll_element) + scroll_element = driver.find_element( + By.XPATH, + '//*[@id="main"]/div/div[2]/div[5]/div/div[2]/div[2]/div/main/section/div[2]/div[3]/div/div[1]/div/div[2]/div[2]', + ) + driver.execute_script( + """ + arguments[0].scrollIntoView({behavior: 'smooth', block: 'end'}); + """, + scroll_element, + ) # 페이지 로딩 대기 driver.implicitly_wait(30) song_lists = driver.find_elements( diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py index b9088df..8df83dd 100644 --- a/airflow/dags/scripts/request_spotify_api.py +++ b/airflow/dags/scripts/request_spotify_api.py @@ -66,10 +66,9 @@ def get_arti_top10(logical_date, **kwargs): index=False) try: load_s3_bucket(object_name) - os.remove(f'data/{object_name}') + os.remove(f"data/{object_name}") except Exception as e: - print(f'error: {e}') - + print(f"error: {e}") # 아티스트 정보 가져오기 @@ -111,10 +110,9 @@ def get_artist_info(logical_date, **kwargs): index=False) try: load_s3_bucket(object_name) - os.remove(f'data/{object_name}') + os.remove(f"data/{object_name}") except Exception as e: - print(f'error: {e}') - + print(f"error: {e}") # 크롤링 데이터 읽어오는 함수 diff --git a/airflow/dags/spotify_data_dag.py b/airflow/dags/spotify_data_dag.py index 6ae5738..561bbee 100644 --- a/airflow/dags/spotify_data_dag.py +++ b/airflow/dags/spotify_data_dag.py @@ -4,8 +4,8 @@ from scripts.request_spotify_api import * from airflow import DAG -from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator +from airflow.operators.python import PythonOperator default_args = { "depends_on_past": False, @@ -40,13 +40,15 @@ python_callable=get_arti_top10, op_kwargs={"logical_date": "{{ ds }}"}, ) - + remove_crawling_data = BashOperator( - task_id = 'remove_crawling_data', - bash_command='rm -f /opt/airflow/data/spotify_crawling_data_{{ ds }}.csv', - dag=dag + task_id="remove_crawling_data", + bash_command="rm -f /opt/airflow/data/spotify_crawling_data_{{ ds }}.csv", + dag=dag, ) - extract_globalTop50_data >> [ - extract_artistInfo_data, - extract_artistTop10_data] >> remove_crawling_data + ( + extract_globalTop50_data + >> [extract_artistInfo_data, extract_artistTop10_data] + >> remove_crawling_data + )