Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions airflow/dags/Bugs_DAG.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import csv
import io
import json
from datetime import datetime, timedelta

Expand Down Expand Up @@ -53,24 +54,36 @@ def fetch_bugs_chart():
def convert_json_to_csv(**kwargs):
ti = kwargs["ti"]
data = ti.xcom_pull(task_ids="fetch_bugs_chart")
csv_data = [["rank", "title", "artist",
"lastPos", "peakPos", "image", "genre"]]

output = io.StringIO()
writer = csv.writer(
output, quoting=csv.QUOTE_ALL
) # ✅ 모든 필드를 자동으로 따옴표 처리

# 헤더 추가
writer.writerow(["rank", "title", "artist", "lastPos",
"peakPos", "image", "genre"])

# 데이터 추가
for entry in data["entries"]:
csv_data.append(
genres = json.dumps(
entry["genres"], ensure_ascii=False
) # 리스트를 문자열로 변환
# 이중 따옴표가 포함되면 한번만 보이도록 처리
genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈
writer.writerow(
[
entry["rank"],
entry["title"],
entry["artist"],
entry["lastPos"],
entry["peakPos"],
entry["image"],
json.dumps(
entry["genres"], ensure_ascii=False
), # ✅ 리스트를 문자열로 변환하여 저장
genres,
]
)
csv_string = "\n".join(",".join(map(str, row)) for row in csv_data)
return csv_string

return output.getvalue()


# 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수)
Expand All @@ -83,7 +96,7 @@ def save_csv_locally(csv_string):
def upload_to_s3(**kwargs):
ti = kwargs["ti"]
csv_string = ti.xcom_pull(task_ids="convert_json_to_csv")
# save_csv_locally(csv_string) # 테스트용 로컬 저장
save_csv_locally(csv_string) # 테스트용 로컬 저장
s3_hook = S3Hook(aws_conn_id="S4tify_S3")
s3_hook.load_string(
csv_string,
Expand All @@ -106,7 +119,7 @@ def upload_to_s3(**kwargs):
"bugs_chart_dag",
default_args=default_args,
schedule_interval="10 0 * * *", # 매일 00:10 실행
catchup=False,
catchup=True,
) as dag:

get_spotify_token_task = PythonOperator(
Expand Down
41 changes: 30 additions & 11 deletions airflow/dags/Flo_DAG.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import csv
import io
import json
from datetime import datetime, timedelta

Expand Down Expand Up @@ -26,10 +27,12 @@ def fetch_flo_chart():
chart = ChartData(fetch=True)
chart_data = {"date": chart.date.strftime(
"%Y-%m-%d %H:%M:%S"), "entries": []}

for entry in chart.entries:
print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}")
artist_id = search_artist_id(entry.artist)
genre = get_artist_genre(artist_id)

chart_data["entries"].append(
{
"rank": entry.rank,
Expand All @@ -41,42 +44,58 @@ def fetch_flo_chart():
"genres": genre.split(", ") if genre else [],
}
)

return chart_data


# 2. JSON → CSV 변환
# 2. JSON → CSV 변환 (쉼표 포함된 데이터도 깨지지 않도록 수정)
def convert_json_to_csv(**kwargs):
ti = kwargs["ti"]
data = ti.xcom_pull(task_ids="fetch_flo_chart")
csv_data = [["rank", "title", "artist",
"lastPos", "isNew", "image", "genre"]]

output = io.StringIO()
writer = csv.writer(
output, quoting=csv.QUOTE_ALL
) # ✅ 모든 필드를 자동으로 따옴표 처리

# 헤더 추가
writer.writerow(["rank", "title", "artist",
"lastPos", "isNew", "image", "genre"])

# 데이터 추가
for entry in data["entries"]:
csv_data.append(
genres = json.dumps(
entry["genres"], ensure_ascii=False
) # 리스트를 문자열로 변환
# 이중 따옴표가 포함되면 한번만 보이도록 처리
genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈
writer.writerow(
[
entry["rank"],
entry["title"],
entry["artist"],
entry["lastPos"],
entry["isNew"],
entry["image"],
json.dumps(entry["genres"], ensure_ascii=False),
genres,
]
)
csv_string = "\n".join(",".join(map(str, row)) for row in csv_data)
return csv_string

return output.getvalue()

# 3. 로컬에 CSV 저장 (테스트용)

# 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수)
def save_csv_locally(csv_string):
with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f:
f.write(csv_string)


# 4. AWS S3 업로드
# 3. AWS S3 업로드
def upload_to_s3(**kwargs):
ti = kwargs["ti"]
csv_string = ti.xcom_pull(task_ids="convert_json_to_csv")
# save_csv_locally(csv_string) # 테스트용 로컬 저장
save_csv_locally(csv_string) # 테스트용 로컬 저장

s3_hook = S3Hook(aws_conn_id="S4tify_S3")
s3_hook.load_string(
csv_string,
Expand All @@ -99,7 +118,7 @@ def upload_to_s3(**kwargs):
"flo_chart_dag",
default_args=default_args,
schedule_interval="20 0 * * *", # 매일 00:20 실행
catchup=False,
catchup=True,
) as dag:

get_spotify_token_task = PythonOperator(
Expand Down
41 changes: 30 additions & 11 deletions airflow/dags/Genie_DAG.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import csv
import io
import json
from datetime import datetime, timedelta

Expand Down Expand Up @@ -26,10 +27,12 @@ def fetch_genie_chart():
chart = ChartData(chartPeriod=GenieChartPeriod.Realtime, fetch=True)
chart_data = {"date": chart.date.strftime(
"%Y-%m-%d %H:%M:%S"), "entries": []}

for entry in chart.entries:
print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}")
artist_id = search_artist_id(entry.artist)
genre = get_artist_genre(artist_id)

chart_data["entries"].append(
{
"rank": entry.rank,
Expand All @@ -41,42 +44,58 @@ def fetch_genie_chart():
"genres": genre.split(", ") if genre else [],
}
)

return chart_data


# 2. JSON → CSV 변환
# 2. JSON → CSV 변환 (쉼표 포함된 데이터도 깨지지 않도록 수정)
def convert_json_to_csv(**kwargs):
ti = kwargs["ti"]
data = ti.xcom_pull(task_ids="fetch_genie_chart")
csv_data = [["rank", "title", "artist",
"peakPos", "lastPos", "image", "genre"]]

output = io.StringIO()
writer = csv.writer(
output, quoting=csv.QUOTE_ALL
) # ✅ 모든 필드를 자동으로 따옴표 처리

# 헤더 추가
writer.writerow(["rank", "title", "artist", "peakPos",
"lastPos", "image", "genre"])

# 데이터 추가
for entry in data["entries"]:
csv_data.append(
genres = json.dumps(
entry["genres"], ensure_ascii=False
) # 리스트를 문자열로 변환
# 이중 따옴표가 포함되면 한번만 보이도록 처리
genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈
writer.writerow(
[
entry["rank"],
entry["title"],
entry["artist"],
entry["peakPos"],
entry["lastPos"],
entry["image"],
json.dumps(entry["genres"], ensure_ascii=False),
genres,
]
)
csv_string = "\n".join(",".join(map(str, row)) for row in csv_data)
return csv_string

return output.getvalue()

# 3. 로컬에 CSV 저장 (테스트용)

# 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수)
def save_csv_locally(csv_string):
with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f:
f.write(csv_string)


# 4. AWS S3 업로드
# 3. AWS S3 업로드
def upload_to_s3(**kwargs):
ti = kwargs["ti"]
csv_string = ti.xcom_pull(task_ids="convert_json_to_csv")
# save_csv_locally(csv_string) # 테스트용 로컬 저장
save_csv_locally(csv_string) # 테스트용 로컬 저장

s3_hook = S3Hook(aws_conn_id="S4tify_S3")
s3_hook.load_string(
csv_string,
Expand All @@ -99,7 +118,7 @@ def upload_to_s3(**kwargs):
"genie_chart_dag",
default_args=default_args,
schedule_interval="30 0 * * *", # 매일 00:30 실행
catchup=False,
catchup=True,
) as dag:

get_spotify_token_task = PythonOperator(
Expand Down
37 changes: 28 additions & 9 deletions airflow/dags/Melon_DAG.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import csv
import io
import json
from datetime import datetime, timedelta

Expand Down Expand Up @@ -26,10 +27,12 @@ def fetch_melon_chart():
chart = ChartData(fetch=True)
chart_data = {"date": chart.date.strftime(
"%Y-%m-%d %H:%M:%S"), "entries": []}

for entry in chart.entries:
print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}")
artist_id = search_artist_id(entry.artist)
genre = get_artist_genre(artist_id)

chart_data["entries"].append(
{
"rank": entry.rank,
Expand All @@ -48,25 +51,40 @@ def fetch_melon_chart():
def convert_json_to_csv(**kwargs):
ti = kwargs["ti"]
data = ti.xcom_pull(task_ids="fetch_melon_chart")
csv_data = [["rank", "title", "artist",
"lastPos", "isNew", "image", "genre"]]

output = io.StringIO()
writer = csv.writer(
output, quoting=csv.QUOTE_ALL
) # ✅ 모든 필드를 자동으로 따옴표 처리

# 헤더 추가
writer.writerow(["rank", "title", "artist",
"lastPos", "isNew", "image", "genre"])

# 데이터 추가
for entry in data["entries"]:
csv_data.append(
genres = json.dumps(
entry["genres"], ensure_ascii=False
) # 리스트를 문자열로 변환
# 이중 따옴표가 포함되면 한번만 보이도록 처리
genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈

writer.writerow(
[
entry["rank"],
entry["title"],
entry["artist"],
entry["lastPos"],
entry["isNew"],
entry["image"],
json.dumps(entry["genres"], ensure_ascii=False),
genres,
]
)
csv_string = "\n".join(",".join(map(str, row)) for row in csv_data)
return csv_string

return output.getvalue()

# 3. 로컬에 CSV 저장 (테스트용)

# 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수)
def save_csv_locally(csv_string):
with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f:
f.write(csv_string)
Expand All @@ -76,7 +94,8 @@ def save_csv_locally(csv_string):
def upload_to_s3(**kwargs):
ti = kwargs["ti"]
csv_string = ti.xcom_pull(task_ids="convert_json_to_csv")
# save_csv_locally(csv_string) # 테스트용 로컬 저장
save_csv_locally(csv_string) # 테스트용 로컬 저장

s3_hook = S3Hook(aws_conn_id="S4tify_S3")
s3_hook.load_string(
csv_string,
Expand All @@ -99,7 +118,7 @@ def upload_to_s3(**kwargs):
"melon_chart_dag",
default_args=default_args,
schedule_interval="0 1 * * *", # 매일 01:00 실행
catchup=False,
catchup=True,
) as dag:

get_spotify_token_task = PythonOperator(
Expand Down
Loading