19 changes: 17 additions & 2 deletions airflow/dags/plugins/spark_snowflake_conn.py
@@ -56,8 +56,23 @@ def create_snowflake_table(sql):

def write_snowflake_spark_dataframe(table_name, df):

df.show()
snowflake_opts = snowflake_options.copy()

df.write.format("snowflake").options(**snowflake_options).option(
if table_name in ["spotify_genre_count", "artist_genre_count"]:
snowflake_opts["sfSchema"] = "ANALYTICS"

df.write.format("snowflake").options(**snowflake_opts).option(
"dbtable", f"{table_name}"
).mode("append").save()


def read_snowflake_spark_dataframe(spark, table_name):

df = (
spark.read.format("snowflake")
.options(**snowflake_options)
.option("dbtable", table_name)
.load()
)

return df
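
A minimal sketch of what snowflake_options is assumed to contain and how the routing above sends the two genre-count tables to the ANALYTICS schema. Every key and value below is a placeholder, not this module's actual configuration; the option names follow the standard Spark-Snowflake connector settings.

import os

# Hypothetical illustration only: the real snowflake_options is defined elsewhere
# in spark_snowflake_conn.py. Option names are the standard connector keys.
snowflake_options = {
    "sfURL": "<account>.snowflakecomputing.com",
    "sfUser": os.getenv("SNOWFLAKE_USER"),
    "sfPassword": os.getenv("SNOWFLAKE_PASSWORD"),
    "sfDatabase": "SPOTIFY",      # assumed database name
    "sfSchema": "RAW_DATA",       # default schema; overridden for the genre-count tables
    "sfWarehouse": "COMPUTE_WH",  # assumed warehouse name
}

# With the change above:
#   write_snowflake_spark_dataframe("spotify_genre_count", genre_df)      -> ANALYTICS schema
#   write_snowflake_spark_dataframe("artist_info_globalTop50", chart_df)  -> default schema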
65 changes: 51 additions & 14 deletions airflow/dags/scripts/ELT_artist_info_globalTop50.py
@@ -5,8 +5,9 @@
import snowflake.connector
from plugins.spark_snowflake_conn import *
from pyspark.sql.functions import (col, current_date, explode, lit,
regexp_replace, split, when)
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
regexp_replace, split, udf)
from pyspark.sql.types import (ArrayType, IntegerType, StringType, StructField,
StructType)

LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY")

@@ -27,7 +28,8 @@ def load():
artist VARCHAR(100),
artist_name VARCHAR(100),
artist_genre ARRAY,
date_time DATE
date_time DATE,
song_genre ARRAY
)
"""

@@ -77,9 +79,38 @@ def transformation():
artist_info_top50_df = artist_info_top50_df.withColumn(
"date_time", current_date())

artist_info_top50_df = artist_info_top50_df.withColumn(
"song_genre", add_song_genre_udf(col("artist_name"), col("title"))
)

return artist_info_top50_df


def add_song_genre(artist, track):

url = f"https://ws.audioscrobbler.com/2.0/?method=track.getInfo&api_key={LAST_FM_API_KEY}&artist={artist}&track={track}&format=json"
print(url)

try:
response = requests.get(url).json()
return [
genre["name"]
for genre in response.get("track", {}).get("toptags", {}).get("tag", [])
]
except requests.exceptions.RequestException as e:
print(f"API 요청 오류: {e}")
return ["API Error"]
except KeyError:
return ["Unknown"]


add_song_genre_udf = udf(add_song_genre, ArrayType(StringType()))


def extract(file_name, schema):

spark = create_spark_session("artist_global_table")
@@ -91,14 +122,22 @@ def extract(file_name, schema):
)

if file_name == "crawling_data":
df = (
df.withColumn(
"artist", split(
regexp_replace(
col("artist"), r"[\[\]']", ""), ", ")) .withColumn(
"artist_id", split(
regexp_replace(
col("artist_id"), r"[\[\]']", ""), ", "), ) )
df = df.withColumn(
"artist", split(regexp_replace(col("artist"), r"[\[\]']", ""), ", ")
).withColumn(
"artist_id", split(regexp_replace(col("artist_id"), r"[\[\]']", ""), ", ")
)
if file_name == "artist_info":
df = df.withColumn(
"artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "")
@@ -107,9 +146,7 @@
"artist_genre", split(df["artist_genre"], ", ")
) # split on commas into an array
df = df.withColumnRenamed("artist", "artist_name")




df.show()

return df
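
For reference, a trimmed example of the Last.fm track.getInfo payload that add_song_genre parses, using the same .get() chain as the function above. The track and tag values are invented.

# Hypothetical payload; only the fields read by add_song_genre are shown.
sample_response = {
    "track": {
        "name": "Blinding Lights",  # made-up track
        "toptags": {
            "tag": [
                {"name": "pop", "url": "https://www.last.fm/tag/pop"},
                {"name": "synthpop", "url": "https://www.last.fm/tag/synthpop"},
            ]
        },
    }
}

genres = [
    genre["name"]
    for genre in sample_response.get("track", {}).get("toptags", {}).get("tag", [])
]
# genres == ["pop", "synthpop"]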
88 changes: 88 additions & 0 deletions airflow/dags/scripts/ELT_chart_genre_count.py
@@ -0,0 +1,88 @@
from datetime import datetime

import requests
import snowflake.connector
from plugins.spark_snowflake_conn import *
from pyspark.sql.functions import (col, count, current_date, desc, explode,
from_json)
from pyspark.sql.types import ArrayType, StringType

TODAY = datetime.today().strftime("%Y-%m-%d")


def load(sql, df, table_name):

create_snowflake_table(sql)
write_snowflake_spark_dataframe(table_name, df)


def transformation_artist_genre_count():

artist_info_table = (
extract()
.filter(col("date_time") == TODAY)
.dropDuplicates(["artist_id"])
.withColumn("artist_genre", explode(col("artist_genre")))
)

artist_genre_count = (
artist_info_table.groupBy("artist_genre")
.agg(count("artist_genre").alias("genre_count"))
.withColumn("date_time", current_date())
)

sql = """
CREATE TABLE IF NOT EXISTS artist_genre_count(
artist_genre VARCHAR(100),
genre_count int,
date_time DATE
)
"""

load(sql, artist_genre_count, "artist_genre_count")


def transformation_genre_count():

artist_info_table = (
extract()
.filter(col("date_time") == TODAY)
.dropDuplicates(["title"])
.withColumn("song_genre", from_json(col("song_genre"), ArrayType(StringType())))
.withColumn("song_genre", explode(col("song_genre")))
)

spotify_genre_count = (
artist_info_table.groupBy("song_genre")
.agg(count("song_genre").alias("genre_count"))
.withColumn("date_time", current_date())
.orderBy(desc("genre_count"))
)

sql = """
CREATE TABLE IF NOT EXISTS spotify_genre_count(
song_genre VARCHAR(100),
genre_count int,
date_time DATE
)
"""

load(sql, spotify_genre_count, "spotify_genre_count")


def extract():

spark = create_spark_session("chart_genre_count_table")
table = read_snowflake_spark_dataframe(spark, "artist_info_globalTop50")

return table


if __name__ == "__main__":
transformation_genre_count()
transformation_artist_genre_count()
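
A self-contained sketch of the explode / groupBy / count pattern that both transformations rely on, run against an in-memory DataFrame. Column names mirror the job; the rows are invented and the printed counts are only illustrative.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, explode

spark = SparkSession.builder.master("local[*]").appName("genre_count_demo").getOrCreate()

df = spark.createDataFrame(
    [("song A", ["pop", "dance"]), ("song B", ["pop"])],
    ["title", "song_genre"],
)

(
    df.withColumn("song_genre", explode(col("song_genre")))  # one row per genre
    .groupBy("song_genre")
    .agg(count("song_genre").alias("genre_count"))
    .show()
)
# Expected output (row order may vary):
# +----------+-----------+
# |song_genre|genre_count|
# +----------+-----------+
# |       pop|          2|
# |     dance|          1|
# +----------+-----------+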
14 changes: 7 additions & 7 deletions airflow/dags/scripts/crawling_spotify_data.py
@@ -28,9 +28,9 @@ def save_as_csv_file(df, logical_date):

dir_path = "crawling_data"
file_path = f"data/spotify_crawling_data_{TODAY}.csv"
df.index.name = 'rank'
df.to_csv(file_path, encoding="utf-8", mode="w", header=True, index=True)

df.index.name = "rank"
df.to_csv(file_path, encoding="utf-8", mode="w", header=True, index=True)
load_s3_bucket(dir_path, f"spotify_crawling_data_{logical_date}.csv")


@@ -53,7 +53,6 @@ def data_crawling(logical_date):
with webdriver.Chrome(service=Service(), options=chrome_options) as driver:

print("크롤링 시작")


driver.get(url)
driver.implicitly_wait(500)
@@ -72,13 +71,14 @@
)
# 페이지 로딩 대기
driver.implicitly_wait(30)
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

driver.execute_script(
"window.scrollTo(0, document.body.scrollHeight);")

time.sleep(2)
song_lists = driver.find_elements(
By.XPATH, '//*[@id="main"]//div[@role="row"]'
)

print(len(song_lists))

for i in range(1, len(song_lists)):
3 changes: 3 additions & 0 deletions airflow/dags/scripts/request_spotify_api.py
@@ -146,6 +146,9 @@ def extract(url: str) -> Optional[Dict[str, Any]]:
): # re-request when the token has expired
time.sleep(3)
get_token() # refresh the token stored in the Airflow Variable
headers = {
"Authorization": f"Bearer {access_token}",
}
response = requests.get(url, headers=headers)
result = response.json()
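
The retry above only works if the refreshed token actually reaches the rebuilt headers dict. A minimal sketch of the refresh-and-retry pattern, assuming a get_token() helper that returns the new token; the names here are illustrative, not this module's actual API.

import requests

def get_with_refresh(url, access_token, get_token):
    # First attempt with the current token.
    response = requests.get(url, headers={"Authorization": f"Bearer {access_token}"})
    if response.status_code == 401:      # token expired
        access_token = get_token()       # assumed to return a fresh token
        response = requests.get(url, headers={"Authorization": f"Bearer {access_token}"})
    return response.json()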

13 changes: 12 additions & 1 deletion airflow/dags/spotify_ELT_DAG.py
@@ -52,4 +52,15 @@
dag=dag,
)

[artist_info_Top10_table, artist_info_globalTop50_table]
spotify_genre_count_table = SparkSubmitOperator(
task_id="spotify_genre_count_table",
application="dags/scripts/ELT_chart_genre_count.py",
conn_id="spark_conn",
jars=SPARK_JARS,
dag=dag,
)

[
artist_info_Top10_table,
artist_info_globalTop50_table,
] >> spotify_genre_count_table
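
For clarity, the list dependency above is equivalent to wiring each upstream task individually: with Airflow's default all_success trigger rule, spotify_genre_count_table runs only after both chart-table tasks have succeeded.

# Equivalent to the list form above:
artist_info_Top10_table >> spotify_genre_count_table
artist_info_globalTop50_table >> spotify_genre_count_table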