Skip to content

Commit c9cebae

Browse files
committed
chunked transfer with buffer
1 parent de4794d commit c9cebae

File tree

5 files changed

+245
-91
lines changed

5 files changed

+245
-91
lines changed

config/dev.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ config :pling, dev_routes: true
6464

6565
config :logger, :console,
6666
format: "[$level] $message $metadata\n",
67-
metadata: [:playlist_id, :error, :chunk, :track_id]
67+
metadata: [:playlist_id, :error, :chunk, :track_id],
68+
level: :debug
6869

6970
# Set a higher stacktrace during development. Avoid configuring such
7071
# in production as building large stacktraces may be expensive.

lib/pling/playlists/music_library.ex

Lines changed: 71 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,17 @@ defmodule Pling.Playlists.MusicLibrary do
5959
Gets a playlist by Spotify ID, fetching it from Spotify if it doesn't exist locally.
6060
Returns the playlist as soon as it has basic info and at least one track.
6161
"""
62-
def get_or_fetch_playlist(spotify_id, timeout \\ 5000) do
62+
def get_or_fetch_playlist(spotify_id) do
6363
case Repo.get(Playlist, spotify_id) do
6464
nil ->
6565
ref = make_ref()
6666
parent = self()
67-
notification_agent = Process.spawn(fn -> receive do: (_ -> :ok) end, [])
67+
Logger.debug("Starting playlist fetch", spotify_id: spotify_id, ref: ref)
6868

6969
callback = fn
7070
%{"playlist_info" => info} ->
71+
Logger.debug("Processing playlist info", name: info["name"])
72+
7173
playlist = %Playlist{
7274
spotify_id: spotify_id,
7375
name: info["name"],
@@ -77,63 +79,78 @@ defmodule Pling.Playlists.MusicLibrary do
7779
}
7880

7981
{:ok, saved_playlist} = Repo.insert(playlist)
80-
saved_playlist
82+
:ok
8183

8284
%{"track" => track} ->
83-
result =
84-
Repo.transaction(fn ->
85-
track_entry = %Track{
86-
uri: track["uri"],
87-
title: track["name"],
88-
artists: Enum.map(track["artists"], & &1["name"]),
89-
popularity: track["popularity"],
90-
album: track["album"]["name"]
91-
}
92-
93-
# First ensure the track exists
94-
{:ok, saved_track} = Repo.insert(track_entry, on_conflict: :nothing)
95-
96-
# Then create the association only if track was saved
97-
if saved_track.uri do
98-
{1, _} =
99-
Repo.insert_all(
100-
"playlist_tracks",
101-
[
102-
%{
103-
track_uri: saved_track.uri,
104-
playlist_spotify_id: spotify_id,
105-
inserted_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second),
106-
updated_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
107-
}
108-
],
109-
on_conflict: :nothing
110-
)
111-
end
112-
113-
# Notify parent process of first track
114-
if not Process.get(@notification_key) do
115-
Process.put(@notification_key, true)
116-
send(parent, {:first_track_saved, ref, spotify_id})
117-
end
118-
119-
saved_track
120-
end)
121-
122-
# Try to notify of first track - only the first message will be received
123-
try do
124-
send(notification_agent, :notified)
125-
send(parent, {:first_track_saved, ref, spotify_id})
126-
catch
127-
# Process already notified
128-
:error, :badarg -> :ok
85+
Logger.debug("Processing track", title: track["name"], uri: track["uri"])
86+
87+
case Repo.transaction(fn ->
88+
track_entry = %Track{
89+
uri: track["uri"],
90+
title: track["name"],
91+
artists: Enum.map(track["artists"], & &1["name"]),
92+
popularity: track["popularity"],
93+
album: track["album"]["name"]
94+
}
95+
96+
# First ensure the track exists
97+
{:ok, saved_track} = Repo.insert(track_entry, on_conflict: :nothing)
98+
Logger.debug("Track saved", uri: saved_track.uri)
99+
100+
# Then create the association only if track was saved
101+
if saved_track.uri do
102+
{1, _} =
103+
Repo.insert_all(
104+
"playlist_tracks",
105+
[
106+
%{
107+
track_uri: saved_track.uri,
108+
playlist_spotify_id: spotify_id,
109+
inserted_at:
110+
NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second),
111+
updated_at:
112+
NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
113+
}
114+
],
115+
on_conflict: :nothing
116+
)
117+
118+
Logger.debug("Playlist-track association created",
119+
track_uri: saved_track.uri,
120+
playlist_id: spotify_id
121+
)
122+
end
123+
124+
# Notify parent process of first track
125+
notified = Process.get(@notification_key)
126+
127+
Logger.debug("Checking notification status",
128+
already_notified: notified,
129+
notification_key: @notification_key
130+
)
131+
132+
if notified == nil do
133+
Process.put(@notification_key, true)
134+
Logger.debug("Sending first track notification", ref: ref)
135+
send(parent, {:first_track_saved, ref, spotify_id})
136+
end
137+
138+
saved_track
139+
end) do
140+
{:ok, saved_track} ->
141+
Logger.debug("Transaction completed successfully", track_uri: saved_track.uri)
142+
:ok
143+
144+
{:error, reason} ->
145+
Logger.warning("Transaction failed", error: inspect(reason))
146+
:ok
129147
end
130-
131-
result
132148
end
133149

134150
Task.start(fn ->
135151
case Pling.Services.Spotify.stream_playlist(spotify_id, callback) do
136152
:ok ->
153+
Logger.debug("Playlist stream completed successfully", spotify_id: spotify_id)
137154
:ok
138155

139156
{:error, reason} ->
@@ -145,19 +162,16 @@ defmodule Pling.Playlists.MusicLibrary do
145162

146163
receive do
147164
{:first_track_saved, ^ref, playlist_id} ->
165+
Logger.debug("Received first track notification", playlist_id: playlist_id)
166+
148167
case Repo.get(Playlist, playlist_id) do
149168
nil -> {:error, :playlist_fetch_failed}
150169
playlist -> {:ok, :first_track_saved, playlist}
151170
end
152171

153172
{:playlist_error, ^ref, reason} ->
173+
Logger.debug("Received playlist error", error: inspect(reason))
154174
{:error, reason}
155-
after
156-
timeout ->
157-
case Repo.get(Playlist, spotify_id) do
158-
nil -> {:error, :playlist_fetch_timeout}
159-
playlist -> {:ok, :timeout, playlist}
160-
end
161175
end
162176

163177
playlist ->
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
defmodule Pling.Services.NDJsonStream do
  @moduledoc """
  Handles streaming of newline-delimited JSON (NDJSON) data, processing complete objects
  as they arrive and buffering partial lines across chunks.
  """
  require Logger

  @doc """
  Creates a streaming reducer that processes NDJSON data and calls `callback`
  for each complete JSON object.

  The returned 2-arity function takes a `{:data, binary}` or `{:done, binary | nil}`
  message plus the current string buffer, and always returns `{:cont, new_buffer}`
  so it can drive a chunked HTTP `into:` reducer.

  The function handles:
  - Complete JSON objects
  - Multiple objects in one chunk
  - Partial objects across chunks
  - Invalid JSON gracefully (logged and skipped)
  - Final buffer processing on `{:done, _}`
  """
  def stream_fn(callback) when is_function(callback, 1) do
    fn
      {:data, data}, buffer ->
        process_chunk(buffer <> data, false, callback)

      {:done, data}, buffer ->
        # :done may carry a trailing payload or nil; flush everything buffered.
        process_chunk(buffer <> (data || ""), true, callback)

      _message, buffer ->
        # Ignore any other stream messages, keeping the buffer intact.
        {:cont, buffer}
    end
  end

  # Splits buffered data on newlines and dispatches complete lines to the callback.
  #
  # On the final chunk every part is flushed and the buffer is reset; otherwise the
  # trailing part (a possibly incomplete JSON line) is kept as the new buffer.
  #
  # Always returns `{:cont, buffer}` — the return value of the callback must not
  # leak into the reducer contract (a callback returning nil/false previously
  # short-circuited a `&&` here and broke the stream).
  defp process_chunk(data, final?, callback) do
    # String.split/2 always yields at least one element, even for "".
    parts = String.split(data, "\n")

    if final? do
      Enum.each(parts, &process_json(&1, callback))
      {:cont, ""}
    else
      # Everything before the last newline is complete; the remainder is buffered.
      {complete, [partial]} = Enum.split(parts, -1)
      Enum.each(complete, &process_json(&1, callback))
      {:cont, partial}
    end
  end

  # Empty lines (e.g. between "\n\n" or a trailing newline) are silently skipped.
  defp process_json("", _callback), do: :ok

  # Decodes one NDJSON line; invalid JSON is logged and dropped rather than raised,
  # so one bad line cannot abort the whole stream.
  defp process_json(line, callback) do
    case Jason.decode(line) do
      {:ok, decoded} ->
        callback.(decoded)

      {:error, error} ->
        Logger.warning(
          "Failed to decode JSON line: #{inspect(error)}\nLine content: #{inspect(line)}"
        )
    end
  end
end

lib/pling/services/spotify.ex

Lines changed: 27 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
defmodule Pling.Services.Spotify do
22
require Logger
3+
alias Pling.Services.NDJsonStream
34

45
@base_url "https://playlist-fetcher-1031294514094.europe-north1.run.app"
56
@api_key "1367a6ef2cb6f3cd2bfb3f73978292fdae3bc9a5e885ca7632d4eb3fc15eb4d4"
@@ -13,41 +14,34 @@ defmodule Pling.Services.Spotify do
1314
headers: [{"x-api-key", @api_key}]
1415
)
1516

16-
case Req.get(req,
17-
url: "/playlist/#{playlist_id}",
18-
params: [stream: true],
19-
into: fn {:data, data}, {req, resp} ->
20-
String.split(data, "\n", trim: true)
21-
|> Enum.each(fn line ->
22-
case Jason.decode(line) do
23-
{:ok, decoded} ->
24-
Logger.debug("Processing playlist item",
25-
playlist_id: playlist_id,
26-
track_id: decoded["id"]
27-
)
28-
29-
callback.(decoded)
30-
31-
{:error, error} ->
32-
Logger.error("Failed to decode chunk",
33-
playlist_id: playlist_id,
34-
error: inspect(error),
35-
chunk: line
36-
)
37-
end
38-
end)
39-
40-
{:cont, {req, resp}}
41-
end
42-
) do
43-
{:ok, _response} ->
44-
Logger.info("Completed playlist stream", playlist_id: playlist_id)
45-
:ok
46-
47-
{:error, error} ->
17+
stream_fn = NDJsonStream.stream_fn(callback)
18+
buffer = ""
19+
20+
try do
21+
Req.get!(req,
22+
url: "/playlist/#{playlist_id}",
23+
params: [stream: true],
24+
into: fn
25+
{:data, data}, {req, resp} ->
26+
{_, new_buffer} = stream_fn.({:data, data}, buffer)
27+
{:cont, {req, resp}}
28+
29+
{:done, data}, {req, resp} ->
30+
stream_fn.({:done, data}, buffer)
31+
{:cont, {req, resp}}
32+
33+
_, state ->
34+
{:cont, state}
35+
end
36+
)
37+
38+
Logger.info("Completed playlist stream", playlist_id: playlist_id)
39+
:ok
40+
catch
41+
kind, error ->
4842
Logger.error("Failed to stream playlist",
4943
playlist_id: playlist_id,
50-
error: inspect(error)
44+
error: Exception.format(kind, error, __STACKTRACE__)
5145
)
5246

5347
{:error, error}

0 commit comments

Comments
 (0)