From 0a23f5a7d306036093a78e0e9a56c176b84d5cbc Mon Sep 17 00:00:00 2001 From: YEERRin Date: Sun, 16 Mar 2025 22:59:00 +0900 Subject: [PATCH 1/4] =?UTF-8?q?[feat]=20spotify=20=EC=B0=A8=ED=8A=B8=20?= =?UTF-8?q?=EC=9D=B8=20=EB=85=B8=EB=9E=98=20=EC=9E=A5=EB=A5=B4=20count=20E?= =?UTF-8?q?LT=20DAG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/plugins/spark_snowflake_conn.py | 12 ++++ .../scripts/ELT_artist_info_globalTop50.py | 25 +++++++- airflow/dags/scripts/ELT_chart_genre_count.py | 58 +++++++++++++++++++ airflow/dags/spotify_ELT_DAG.py | 10 +++- 4 files changed, 101 insertions(+), 4 deletions(-) create mode 100644 airflow/dags/scripts/ELT_chart_genre_count.py diff --git a/airflow/dags/plugins/spark_snowflake_conn.py b/airflow/dags/plugins/spark_snowflake_conn.py index 1bf43b6..eb1f447 100644 --- a/airflow/dags/plugins/spark_snowflake_conn.py +++ b/airflow/dags/plugins/spark_snowflake_conn.py @@ -61,3 +61,15 @@ def write_snowflake_spark_dataframe(table_name, df): df.write.format("snowflake").options(**snowflake_options).option( "dbtable", f"{table_name}" ).mode("append").save() + + +def read_snowflake_spark_dataframe(spark, table_name): + + df = spark.read \ + .format("snowflake") \ + .options(**snowflake_options) \ + .option("dbtable", table_name) \ + .load() + + + return df \ No newline at end of file diff --git a/airflow/dags/scripts/ELT_artist_info_globalTop50.py b/airflow/dags/scripts/ELT_artist_info_globalTop50.py index 4f05dd1..f5c2204 100644 --- a/airflow/dags/scripts/ELT_artist_info_globalTop50.py +++ b/airflow/dags/scripts/ELT_artist_info_globalTop50.py @@ -5,8 +5,8 @@ import snowflake.connector from plugins.spark_snowflake_conn import * from pyspark.sql.functions import (col, current_date, explode, lit, - regexp_replace, split, when) -from pyspark.sql.types import IntegerType, StringType, StructField, StructType + regexp_replace, split, udf) +from pyspark.sql.types import IntegerType, StringType, StructField, StructType, ArrayType LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY") @@ -27,7 +27,8 @@ def load(): artist VARCHAR(100), artist_name VARCHAR(100), artist_genre ARRAY, - date_time DATE + date_time DATE, + song_genre ARRAY ) """ @@ -76,9 +77,27 @@ def transformation(): artist_info_top50_df = artist_info_top50_df.withColumn( "date_time", current_date()) + + artist_info_top50_df = artist_info_top50_df.withColumn("song_genre", add_song_genre_udf(col("artist_name"), col("title"))) return artist_info_top50_df +def add_song_genre(artist, track): + + url = f"https://ws.audioscrobbler.com/2.0/?method=track.getInfo&api_key={LAST_FM_API_KEY}&artist={artist}&track={track}&format=json" + print(url) + + try: + response = requests.get(url).json() + return [genre['name'] for genre in response.get('track', {}).get('toptags', {}).get('tag', [])] + except requests.exceptions.RequestException as e: + print(f"API 요청 오류: {e}") + return ["API Error"] + except KeyError: + return ["Unknown"] + +add_song_genre_udf = udf(add_song_genre, ArrayType(StringType())) + def extract(file_name, schema): diff --git a/airflow/dags/scripts/ELT_chart_genre_count.py b/airflow/dags/scripts/ELT_chart_genre_count.py new file mode 100644 index 0000000..35ba3f5 --- /dev/null +++ b/airflow/dags/scripts/ELT_chart_genre_count.py @@ -0,0 +1,58 @@ +from pyspark.sql.functions import col, explode, from_json, count, current_date, desc +from pyspark.sql.types import StringType, ArrayType + +import requests +import snowflake.connector +from datetime import datetime + +from plugins.spark_snowflake_conn import * + + +TODAY = datetime.today().strftime("%Y-%m-%d") + +def load(): + + sql = """ + CREATE TABLE IF NOT EXISTS spotify_genre_count( + song_genre VARCHAR(100), + genre_count int, + date_time DATE + ) + """ + create_snowflake_table(sql) + transform_df = transformation() + write_snowflake_spark_dataframe('spotify_genre_count', transform_df) + + + +def transformation(): + + artist_info_table = ( + extract() + .filter(col('date_time')==TODAY) + .dropDuplicates(['title']) + .withColumn("song_genre", from_json(col("song_genre"), ArrayType(StringType()))) + .withColumn("song_genre",explode(col("song_genre"))) + ) + + spotify_genre_count = artist_info_table.groupBy("song_genre").agg(count("song_genre").alias("genre_count")) + + spotify_genre_count = ( + artist_info_table + .groupBy("song_genre").agg(count("song_genre").alias("genre_count")) + .withColumn("date_time", current_date()) + .orderBy(desc("genre_count")) + ) + + return spotify_genre_count + +def extract(): + + spark = create_spark_session("chart_genre_count_table") + table = read_snowflake_spark_dataframe(spark, 'artist_info_globalTop50') + + return table + + +if __name__ == "__main__": + load() \ No newline at end of file diff --git a/airflow/dags/spotify_ELT_DAG.py b/airflow/dags/spotify_ELT_DAG.py index 8c3fb96..3f619d6 100644 --- a/airflow/dags/spotify_ELT_DAG.py +++ b/airflow/dags/spotify_ELT_DAG.py @@ -51,5 +51,13 @@ jars=SPARK_JARS, dag=dag, ) + + spotify_genre_count_table = SparkSubmitOperator( + task_id = 'spotify_genre_count_table', + application='dags/scripts/ELT_chart_genre_count.py', + conn_id = 'spark_conn', + jars=SPARK_JARS, + dag=dag + ) - [artist_info_Top10_table, artist_info_globalTop50_table] + [artist_info_Top10_table, artist_info_globalTop50_table] >> spotify_genre_count_table From 2655395ed0936d75a0b711a78236576bf8ab1f13 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Sun, 16 Mar 2025 14:01:41 +0000 Subject: [PATCH 2/4] Automated format fixes --- airflow/dags/plugins/spark_snowflake_conn.py | 16 ++--- .../scripts/ELT_artist_info_globalTop50.py | 54 ++++++++++------ airflow/dags/scripts/ELT_chart_genre_count.py | 61 +++++++++++-------- airflow/dags/scripts/crawling_spotify_data.py | 14 ++--- airflow/dags/spotify_ELT_DAG.py | 15 +++-- 5 files changed, 94 insertions(+), 66 deletions(-) diff --git a/airflow/dags/plugins/spark_snowflake_conn.py b/airflow/dags/plugins/spark_snowflake_conn.py index eb1f447..3f686b3 100644 --- a/airflow/dags/plugins/spark_snowflake_conn.py +++ b/airflow/dags/plugins/spark_snowflake_conn.py @@ -64,12 +64,12 @@ def write_snowflake_spark_dataframe(table_name, df): def read_snowflake_spark_dataframe(spark, table_name): - - df = spark.read \ - .format("snowflake") \ - .options(**snowflake_options) \ - .option("dbtable", table_name) \ + + df = ( + spark.read.format("snowflake") + .options(**snowflake_options) + .option("dbtable", table_name) .load() - - - return df \ No newline at end of file + ) + + return df diff --git a/airflow/dags/scripts/ELT_artist_info_globalTop50.py b/airflow/dags/scripts/ELT_artist_info_globalTop50.py index f5c2204..f090d71 100644 --- a/airflow/dags/scripts/ELT_artist_info_globalTop50.py +++ b/airflow/dags/scripts/ELT_artist_info_globalTop50.py @@ -6,7 +6,8 @@ from plugins.spark_snowflake_conn import * from pyspark.sql.functions import (col, current_date, explode, lit, regexp_replace, split, udf) -from pyspark.sql.types import IntegerType, StringType, StructField, StructType, ArrayType +from pyspark.sql.types import (ArrayType, IntegerType, StringType, StructField, + StructType) LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY") @@ -77,25 +78,36 @@ def transformation(): artist_info_top50_df = artist_info_top50_df.withColumn( "date_time", current_date()) - - artist_info_top50_df = artist_info_top50_df.withColumn("song_genre", add_song_genre_udf(col("artist_name"), col("title"))) + + artist_info_top50_df = artist_info_top50_df.withColumn( + "song_genre", add_song_genre_udf(col("artist_name"), col("title")) + ) return artist_info_top50_df + def add_song_genre(artist, track): - + url = f"https://ws.audioscrobbler.com/2.0/?method=track.getInfo&api_key={LAST_FM_API_KEY}&artist={artist}&track={track}&format=json" print(url) - + try: response = requests.get(url).json() - return [genre['name'] for genre in response.get('track', {}).get('toptags', {}).get('tag', [])] + return [ + genre["name"] for genre in response.get( + "track", + {}).get( + "toptags", + {}).get( + "tag", + [])] except requests.exceptions.RequestException as e: print(f"API 요청 오류: {e}") return ["API Error"] except KeyError: return ["Unknown"] - + + add_song_genre_udf = udf(add_song_genre, ArrayType(StringType())) @@ -110,14 +122,22 @@ def extract(file_name, schema): ) if file_name == "crawling_data": - df = ( - df.withColumn( - "artist", split( - regexp_replace( - col("artist"), r"[\[\]']", ""), ", ")) .withColumn( - "artist_id", split( - regexp_replace( - col("artist_id"), r"[\[\]']", ""), ", "), ) ) + df = df.withColumn( + "artist", + split( + regexp_replace( + col("artist"), + r"[\[\]']", + ""), + ", ")).withColumn( + "artist_id", + split( + regexp_replace( + col("artist_id"), + r"[\[\]']", + ""), + ", "), + ) if file_name == "artist_info": df = df.withColumn( "artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "") @@ -126,9 +146,7 @@ def extract(file_name, schema): "artist_genre", split(df["artist_genre"], ", ") ) # 쉼표 기준으로 배열 변환 df = df.withColumnRenamed("artist", "artist_name") - - - + df.show() return df diff --git a/airflow/dags/scripts/ELT_chart_genre_count.py b/airflow/dags/scripts/ELT_chart_genre_count.py index 35ba3f5..4a32e05 100644 --- a/airflow/dags/scripts/ELT_chart_genre_count.py +++ b/airflow/dags/scripts/ELT_chart_genre_count.py @@ -1,17 +1,17 @@ -from pyspark.sql.functions import col, explode, from_json, count, current_date, desc -from pyspark.sql.types import StringType, ArrayType +from datetime import datetime import requests import snowflake.connector -from datetime import datetime - -from plugins.spark_snowflake_conn import * - +from plugins.spark_snowflake_conn import * +from pyspark.sql.functions import (col, count, current_date, desc, explode, + from_json) +from pyspark.sql.types import ArrayType, StringType TODAY = datetime.today().strftime("%Y-%m-%d") + def load(): - + sql = """ CREATE TABLE IF NOT EXISTS spotify_genre_count( song_genre VARCHAR(100), @@ -21,38 +21,45 @@ def load(): """ create_snowflake_table(sql) transform_df = transformation() - write_snowflake_spark_dataframe('spotify_genre_count', transform_df) - + write_snowflake_spark_dataframe("spotify_genre_count", transform_df) def transformation(): - + artist_info_table = ( - extract() - .filter(col('date_time')==TODAY) - .dropDuplicates(['title']) - .withColumn("song_genre", from_json(col("song_genre"), ArrayType(StringType()))) - .withColumn("song_genre",explode(col("song_genre"))) - ) - - spotify_genre_count = artist_info_table.groupBy("song_genre").agg(count("song_genre").alias("genre_count")) - + extract() .filter( + col("date_time") == TODAY) .dropDuplicates( + ["title"]) .withColumn( + "song_genre", + from_json( + col("song_genre"), + ArrayType( + StringType()))) .withColumn( + "song_genre", + explode( + col("song_genre")))) + + spotify_genre_count = artist_info_table.groupBy("song_genre").agg( + count("song_genre").alias("genre_count") + ) + spotify_genre_count = ( - artist_info_table - .groupBy("song_genre").agg(count("song_genre").alias("genre_count")) + artist_info_table.groupBy("song_genre") + .agg(count("song_genre").alias("genre_count")) .withColumn("date_time", current_date()) .orderBy(desc("genre_count")) ) return spotify_genre_count - + + def extract(): - + spark = create_spark_session("chart_genre_count_table") - table = read_snowflake_spark_dataframe(spark, 'artist_info_globalTop50') - + table = read_snowflake_spark_dataframe(spark, "artist_info_globalTop50") + return table -if __name__ == "__main__": - load() \ No newline at end of file +if __name__ == "__main__": + load() diff --git a/airflow/dags/scripts/crawling_spotify_data.py b/airflow/dags/scripts/crawling_spotify_data.py index 71724ba..d323f13 100644 --- a/airflow/dags/scripts/crawling_spotify_data.py +++ b/airflow/dags/scripts/crawling_spotify_data.py @@ -28,9 +28,9 @@ def save_as_csv_file(df, logical_date): dir_path = "crawling_data" file_path = f"data/spotify_crawling_data_{TODAY}.csv" - - df.index.name = 'rank' - df.to_csv(file_path, encoding="utf-8", mode="w", header=True, index=True) + + df.index.name = "rank" + df.to_csv(file_path, encoding="utf-8", mode="w", header=True, index=True) load_s3_bucket(dir_path, f"spotify_crawling_data_{logical_date}.csv") @@ -53,7 +53,6 @@ def data_crawling(logical_date): with webdriver.Chrome(service=Service(), options=chrome_options) as driver: print("크롤링 시작") - driver.get(url) driver.implicitly_wait(500) @@ -72,13 +71,14 @@ def data_crawling(logical_date): ) # 페이지 로딩 대기 driver.implicitly_wait(30) - driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") - + driver.execute_script( + "window.scrollTo(0, document.body.scrollHeight);") + time.sleep(2) song_lists = driver.find_elements( By.XPATH, '//*[@id="main"]//div[@role="row"]' ) - + print(len(song_lists)) for i in range(1, len(song_lists)): diff --git a/airflow/dags/spotify_ELT_DAG.py b/airflow/dags/spotify_ELT_DAG.py index 3f619d6..8cb81c3 100644 --- a/airflow/dags/spotify_ELT_DAG.py +++ b/airflow/dags/spotify_ELT_DAG.py @@ -51,13 +51,16 @@ jars=SPARK_JARS, dag=dag, ) - + spotify_genre_count_table = SparkSubmitOperator( - task_id = 'spotify_genre_count_table', - application='dags/scripts/ELT_chart_genre_count.py', - conn_id = 'spark_conn', + task_id="spotify_genre_count_table", + application="dags/scripts/ELT_chart_genre_count.py", + conn_id="spark_conn", jars=SPARK_JARS, - dag=dag + dag=dag, ) - [artist_info_Top10_table, artist_info_globalTop50_table] >> spotify_genre_count_table + [ + artist_info_Top10_table, + artist_info_globalTop50_table, + ] >> spotify_genre_count_table From 9d73c9b5e8d5c3443d97228b577bd9935be4e3af Mon Sep 17 00:00:00 2001 From: YEERRin Date: Tue, 18 Mar 2025 07:12:06 +0900 Subject: [PATCH 3/4] =?UTF-8?q?[fix]=20=ED=86=A0=ED=81=B0=20=EC=9A=94?= =?UTF-8?q?=EC=B2=AD=20=EC=8B=9C=20401=EC=97=90=EB=9F=AC=20=EC=A4=91?= =?UTF-8?q?=EB=B3=B5=20=EB=B0=9C=EC=83=9D=20=EC=97=90=EB=9F=AC=20=EC=88=98?= =?UTF-8?q?=EC=A0=95=20=EB=B0=8F=20ELT=20DAG=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/plugins/spark_snowflake_conn.py | 7 ++- airflow/dags/scripts/ELT_chart_genre_count.py | 48 ++++++++++++++----- airflow/dags/scripts/request_spotify_api.py | 3 ++ 3 files changed, 45 insertions(+), 13 deletions(-) diff --git a/airflow/dags/plugins/spark_snowflake_conn.py b/airflow/dags/plugins/spark_snowflake_conn.py index eb1f447..f11ad51 100644 --- a/airflow/dags/plugins/spark_snowflake_conn.py +++ b/airflow/dags/plugins/spark_snowflake_conn.py @@ -56,9 +56,12 @@ def create_snowflake_table(sql): def write_snowflake_spark_dataframe(table_name, df): - df.show() + snowflake_opts = snowflake_options.copy() + + if table_name in ['spotify_genre_count', 'artist_genre_count']: + snowflake_opts["sfSchema"] = 'ANALYTICS' - df.write.format("snowflake").options(**snowflake_options).option( + df.write.format("snowflake").options(**snowflake_opts).option( "dbtable", f"{table_name}" ).mode("append").save() diff --git a/airflow/dags/scripts/ELT_chart_genre_count.py b/airflow/dags/scripts/ELT_chart_genre_count.py index 35ba3f5..2831010 100644 --- a/airflow/dags/scripts/ELT_chart_genre_count.py +++ b/airflow/dags/scripts/ELT_chart_genre_count.py @@ -10,22 +10,39 @@ TODAY = datetime.today().strftime("%Y-%m-%d") -def load(): +def load(sql, df, table_name): + + create_snowflake_table(sql) + write_snowflake_spark_dataframe(table_name, df) + + +def transformation_artist_genre_count(): + artist_info_table = ( + extract() + .filter(col('date_time')==TODAY) + .dropDuplicates(['artist_id']) + .withColumn("artist_genre",explode(col("artist_genre"))) + ) + + artist_genre_count = ( + artist_info_table + .groupBy("artist_genre").agg(count("artist_genre").alias("count")) + .withColumn("date_time", current_date()) + ) sql = """ - CREATE TABLE IF NOT EXISTS spotify_genre_count( - song_genre VARCHAR(100), + CREATE TABLE IF NOT EXISTS artist_genre_count( + artist_genre VARCHAR(100), genre_count int, - date_time DATE + date_tiem DATE ) """ - create_snowflake_table(sql) - transform_df = transformation() - write_snowflake_spark_dataframe('spotify_genre_count', transform_df) + + load(sql, artist_genre_count, 'artist_genre_count') -def transformation(): +def transformation_genre_count(): artist_info_table = ( extract() @@ -43,8 +60,16 @@ def transformation(): .withColumn("date_time", current_date()) .orderBy(desc("genre_count")) ) - - return spotify_genre_count + + sql = """ + CREATE TABLE IF NOT EXISTS spotify_genre_count( + song_genre VARCHAR(100), + genre_count int, + date_time DATE + ) + """ + + load(sql, spotify_genre_count, 'spotify_genre_count') def extract(): @@ -55,4 +80,5 @@ def extract(): if __name__ == "__main__": - load() \ No newline at end of file + transformation_genre_count() + transformation_artist_genre_count() \ No newline at end of file diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py index e6863b9..014dd7d 100644 --- a/airflow/dags/scripts/request_spotify_api.py +++ b/airflow/dags/scripts/request_spotify_api.py @@ -146,6 +146,9 @@ def extract(url: str) -> Optional[Dict[str, Any]]: ): # 토큰 만료시 재요청 time.sleep(3) get_token() # Variable에 저장된 token 변경 + headers = { + "Authorization": f"Bearer {access_token}", + } response = requests.get(url, headers=headers) result = response.json() From d1ad694915f44c3096fbc95dec4e008aac07ba2f Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Mon, 17 Mar 2025 22:28:07 +0000 Subject: [PATCH 4/4] Automated format fixes --- airflow/dags/plugins/spark_snowflake_conn.py | 6 +-- airflow/dags/scripts/ELT_chart_genre_count.py | 41 ++++++++++--------- airflow/dags/scripts/request_spotify_api.py | 2 +- 3 files changed, 25 insertions(+), 24 deletions(-) diff --git a/airflow/dags/plugins/spark_snowflake_conn.py b/airflow/dags/plugins/spark_snowflake_conn.py index f2d2c92..69ae3d7 100644 --- a/airflow/dags/plugins/spark_snowflake_conn.py +++ b/airflow/dags/plugins/spark_snowflake_conn.py @@ -57,9 +57,9 @@ def create_snowflake_table(sql): def write_snowflake_spark_dataframe(table_name, df): snowflake_opts = snowflake_options.copy() - - if table_name in ['spotify_genre_count', 'artist_genre_count']: - snowflake_opts["sfSchema"] = 'ANALYTICS' + + if table_name in ["spotify_genre_count", "artist_genre_count"]: + snowflake_opts["sfSchema"] = "ANALYTICS" df.write.format("snowflake").options(**snowflake_opts).option( "dbtable", f"{table_name}" diff --git a/airflow/dags/scripts/ELT_chart_genre_count.py b/airflow/dags/scripts/ELT_chart_genre_count.py index b6bb012..e8aa604 100644 --- a/airflow/dags/scripts/ELT_chart_genre_count.py +++ b/airflow/dags/scripts/ELT_chart_genre_count.py @@ -11,26 +11,26 @@ def load(sql, df, table_name): - + create_snowflake_table(sql) write_snowflake_spark_dataframe(table_name, df) - + def transformation_artist_genre_count(): - + artist_info_table = ( extract() - .filter(col('date_time')==TODAY) - .dropDuplicates(['artist_id']) - .withColumn("artist_genre",explode(col("artist_genre"))) - ) - + .filter(col("date_time") == TODAY) + .dropDuplicates(["artist_id"]) + .withColumn("artist_genre", explode(col("artist_genre"))) + ) + artist_genre_count = ( - artist_info_table - .groupBy("artist_genre").agg(count("artist_genre").alias("count")) + artist_info_table.groupBy("artist_genre") + .agg(count("artist_genre").alias("count")) .withColumn("date_time", current_date()) ) - + sql = """ CREATE TABLE IF NOT EXISTS artist_genre_count( artist_genre VARCHAR(100), @@ -38,13 +38,12 @@ def transformation_artist_genre_count(): date_tiem DATE ) """ - - load(sql, artist_genre_count, 'artist_genre_count') - + + load(sql, artist_genre_count, "artist_genre_count") def transformation_genre_count(): - + artist_info_table = ( extract() .filter( col("date_time") == TODAY) .dropDuplicates( @@ -64,7 +63,7 @@ def transformation_genre_count(): .withColumn("date_time", current_date()) .orderBy(desc("genre_count")) ) - + sql = """ CREATE TABLE IF NOT EXISTS spotify_genre_count( song_genre VARCHAR(100), @@ -72,9 +71,10 @@ def transformation_genre_count(): date_time DATE ) """ - - load(sql, spotify_genre_count, 'spotify_genre_count') - + + load(sql, spotify_genre_count, "spotify_genre_count") + + def extract(): spark = create_spark_session("chart_genre_count_table") @@ -82,6 +82,7 @@ def extract(): return table -if __name__ == "__main__": + +if __name__ == "__main__": transformation_genre_count() transformation_artist_genre_count() diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py index 014dd7d..2d2040a 100644 --- a/airflow/dags/scripts/request_spotify_api.py +++ b/airflow/dags/scripts/request_spotify_api.py @@ -147,7 +147,7 @@ def extract(url: str) -> Optional[Dict[str, Any]]: time.sleep(3) get_token() # Variable에 저장된 token 변경 headers = { - "Authorization": f"Bearer {access_token}", + "Authorization": f"Bearer {access_token}", } response = requests.get(url, headers=headers) result = response.json()