diff --git a/airflow/dags/plugins/spark_snowflake_conn.py b/airflow/dags/plugins/spark_snowflake_conn.py index 1bf43b6..69ae3d7 100644 --- a/airflow/dags/plugins/spark_snowflake_conn.py +++ b/airflow/dags/plugins/spark_snowflake_conn.py @@ -56,8 +56,23 @@ def create_snowflake_table(sql): def write_snowflake_spark_dataframe(table_name, df): - df.show() + snowflake_opts = snowflake_options.copy() - df.write.format("snowflake").options(**snowflake_options).option( + if table_name in ["spotify_genre_count", "artist_genre_count"]: + snowflake_opts["sfSchema"] = "ANALYTICS" + + df.write.format("snowflake").options(**snowflake_opts).option( "dbtable", f"{table_name}" ).mode("append").save() + + +def read_snowflake_spark_dataframe(spark, table_name): + + df = ( + spark.read.format("snowflake") + .options(**snowflake_options) + .option("dbtable", table_name) + .load() + ) + + return df diff --git a/airflow/dags/scripts/ELT_artist_info_globalTop50.py b/airflow/dags/scripts/ELT_artist_info_globalTop50.py index 4f05dd1..f090d71 100644 --- a/airflow/dags/scripts/ELT_artist_info_globalTop50.py +++ b/airflow/dags/scripts/ELT_artist_info_globalTop50.py @@ -5,8 +5,9 @@ import snowflake.connector from plugins.spark_snowflake_conn import * from pyspark.sql.functions import (col, current_date, explode, lit, - regexp_replace, split, when) -from pyspark.sql.types import IntegerType, StringType, StructField, StructType + regexp_replace, split, udf) +from pyspark.sql.types import (ArrayType, IntegerType, StringType, StructField, + StructType) LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY") @@ -27,7 +28,8 @@ def load(): artist VARCHAR(100), artist_name VARCHAR(100), artist_genre ARRAY, - date_time DATE + date_time DATE, + song_genre ARRAY ) """ @@ -77,9 +79,38 @@ def transformation(): artist_info_top50_df = artist_info_top50_df.withColumn( "date_time", current_date()) + artist_info_top50_df = artist_info_top50_df.withColumn( + "song_genre", add_song_genre_udf(col("artist_name"), 
col("title")) + ) + return artist_info_top50_df +def add_song_genre(artist, track): + + url = f"https://ws.audioscrobbler.com/2.0/?method=track.getInfo&api_key={LAST_FM_API_KEY}&artist={artist}&track={track}&format=json" + print(url) + + try: + response = requests.get(url).json() + return [ + genre["name"] for genre in response.get( + "track", + {}).get( + "toptags", + {}).get( + "tag", + [])] + except requests.exceptions.RequestException as e: + print(f"API 요청 오류: {e}") + return ["API Error"] + except KeyError: + return ["Unknown"] + + +add_song_genre_udf = udf(add_song_genre, ArrayType(StringType())) + + def extract(file_name, schema): spark = create_spark_session("artist_global_table") @@ -91,14 +122,22 @@ def extract(file_name, schema): ) if file_name == "crawling_data": - df = ( - df.withColumn( - "artist", split( - regexp_replace( - col("artist"), r"[\[\]']", ""), ", ")) .withColumn( - "artist_id", split( - regexp_replace( - col("artist_id"), r"[\[\]']", ""), ", "), ) ) + df = df.withColumn( + "artist", + split( + regexp_replace( + col("artist"), + r"[\[\]']", + ""), + ", ")).withColumn( + "artist_id", + split( + regexp_replace( + col("artist_id"), + r"[\[\]']", + ""), + ", "), + ) if file_name == "artist_info": df = df.withColumn( "artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "") @@ -107,9 +146,7 @@ def extract(file_name, schema): "artist_genre", split(df["artist_genre"], ", ") ) # 쉼표 기준으로 배열 변환 df = df.withColumnRenamed("artist", "artist_name") - - - + df.show() return df diff --git a/airflow/dags/scripts/ELT_chart_genre_count.py b/airflow/dags/scripts/ELT_chart_genre_count.py new file mode 100644 index 0000000..e8aa604 --- /dev/null +++ b/airflow/dags/scripts/ELT_chart_genre_count.py @@ -0,0 +1,88 @@ +from datetime import datetime + +import requests +import snowflake.connector +from plugins.spark_snowflake_conn import * +from pyspark.sql.functions import (col, count, current_date, desc, explode, + from_json) +from pyspark.sql.types import 
ArrayType, StringType +TODAY = datetime.today().strftime("%Y-%m-%d") + + +def load(sql, df, table_name): + + create_snowflake_table(sql) + write_snowflake_spark_dataframe(table_name, df) + + +def transformation_artist_genre_count(): + + artist_info_table = ( + extract() + .filter(col("date_time") == TODAY) + .dropDuplicates(["artist_id"]) + .withColumn("artist_genre", explode(col("artist_genre"))) + ) + + artist_genre_count = ( + artist_info_table.groupBy("artist_genre") + .agg(count("artist_genre").alias("genre_count")) + .withColumn("date_time", current_date()) + ) + + sql = """ + CREATE TABLE IF NOT EXISTS artist_genre_count( + artist_genre VARCHAR(100), + genre_count int, + date_time DATE + ) + """ + + load(sql, artist_genre_count, "artist_genre_count") + + +def transformation_genre_count(): + + artist_info_table = ( + extract() .filter( + col("date_time") == TODAY) .dropDuplicates( + ["title"]) .withColumn( + "song_genre", + from_json( + col("song_genre"), + ArrayType( + StringType()))) .withColumn( + "song_genre", + explode( + col("song_genre")))) + + spotify_genre_count = ( + artist_info_table.groupBy("song_genre") + .agg(count("song_genre").alias("genre_count")) + .withColumn("date_time", current_date()) + .orderBy(desc("genre_count")) + ) + + sql = """ + CREATE TABLE IF NOT EXISTS spotify_genre_count( + song_genre VARCHAR(100), + genre_count int, + date_time DATE + ) + """ + + load(sql, spotify_genre_count, "spotify_genre_count") + + +def extract(): + + spark = create_spark_session("chart_genre_count_table") + table = read_snowflake_spark_dataframe(spark, "artist_info_globalTop50") + + return table + + +if __name__ == "__main__": + transformation_genre_count() + transformation_artist_genre_count() diff --git a/airflow/dags/scripts/crawling_spotify_data.py b/airflow/dags/scripts/crawling_spotify_data.py index 71724ba..d323f13 100644 --- a/airflow/dags/scripts/crawling_spotify_data.py +++ b/airflow/dags/scripts/crawling_spotify_data.py @@ -28,9 +28,9 @@ def 
save_as_csv_file(df, logical_date): dir_path = "crawling_data" file_path = f"data/spotify_crawling_data_{TODAY}.csv" - - df.index.name = 'rank' - df.to_csv(file_path, encoding="utf-8", mode="w", header=True, index=True) + + df.index.name = "rank" + df.to_csv(file_path, encoding="utf-8", mode="w", header=True, index=True) load_s3_bucket(dir_path, f"spotify_crawling_data_{logical_date}.csv") @@ -53,7 +53,6 @@ def data_crawling(logical_date): with webdriver.Chrome(service=Service(), options=chrome_options) as driver: print("크롤링 시작") - driver.get(url) driver.implicitly_wait(500) @@ -72,13 +71,14 @@ def data_crawling(logical_date): ) # 페이지 로딩 대기 driver.implicitly_wait(30) - driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") - + driver.execute_script( + "window.scrollTo(0, document.body.scrollHeight);") + time.sleep(2) song_lists = driver.find_elements( By.XPATH, '//*[@id="main"]//div[@role="row"]' ) - + print(len(song_lists)) for i in range(1, len(song_lists)): diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py index e6863b9..2d2040a 100644 --- a/airflow/dags/scripts/request_spotify_api.py +++ b/airflow/dags/scripts/request_spotify_api.py @@ -146,6 +146,9 @@ def extract(url: str) -> Optional[Dict[str, Any]]: ): # 토큰 만료시 재요청 time.sleep(3) get_token() # Variable에 저장된 token 변경 + headers = { + "Authorization": f"Bearer {access_token}", + } response = requests.get(url, headers=headers) result = response.json() diff --git a/airflow/dags/spotify_ELT_DAG.py b/airflow/dags/spotify_ELT_DAG.py index 8c3fb96..8cb81c3 100644 --- a/airflow/dags/spotify_ELT_DAG.py +++ b/airflow/dags/spotify_ELT_DAG.py @@ -52,4 +52,15 @@ dag=dag, ) - [artist_info_Top10_table, artist_info_globalTop50_table] + spotify_genre_count_table = SparkSubmitOperator( + task_id="spotify_genre_count_table", + application="dags/scripts/ELT_chart_genre_count.py", + conn_id="spark_conn", + jars=SPARK_JARS, + dag=dag, + ) + + [ + 
artist_info_Top10_table, + artist_info_globalTop50_table, + ] >> spotify_genre_count_table