Skip to content

Commit 672b7b4

Browse files
committed
letsplay_runner_core: Slowly bring up rpc transport
as well as detail outstanding additions to the client main to make it speak RPC properly
1 parent 9a672b9 commit 672b7b4

File tree

9 files changed

+147
-66
lines changed

9 files changed

+147
-66
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ clap = { version = "4.5.6", features = ["cargo"] }
4545
# Async stuff
4646
tokio = { version = "1.37.0", features = [ "full" ] }
4747
tokio-util = { version = "0.7.13" }
48+
futures = "0.3.31"
4849
futures-util = "0.3.31"
4950

5051
# Tracing

crates/letsplay_retro_frontend/src/frontend/frontend_impl.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,10 @@ impl Frontend {
8181
/// # Notes
8282
///
8383
/// As mentioned before, only one frontend can be created at once in a application.
84-
///
84+
///
8585
/// The provided [FrontendInterface] implementation *must* last at least as long as the
8686
/// frontend itself.
87-
///
87+
///
8888
/// The returned [Box] *must* be held until the frontend is no longer used.
8989
pub fn new(interface: *mut dyn FrontendInterface) -> Box<Self> {
9090
let mut boxed = Box::new(Self {
@@ -132,6 +132,16 @@ impl Frontend {
132132
self.core_library.is_some() && self.core_api.is_some()
133133
}
134134

135+
pub fn game_loaded(&self) -> bool {
136+
// It doesn't exactly make sense for a game to be considered
137+
// loaded if we don't have a core.
138+
if !self.core_loaded() {
139+
return false;
140+
}
141+
142+
self.game_loaded
143+
}
144+
135145
/// Plugs in an input device to the specified port.
136146
pub fn plug_input_device(&mut self, port: u32, device: *mut dyn InputDevice) {
137147
if self.core_loaded() {

crates/letsplay_runner_core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ bytes.workspace = true
1717
tokio.workspace = true
1818
tokio-util = { workspace = true, features = ["codec"] }
1919
futures-util = { workspace = true, features = [ "sink" ] }
20+
futures.workspace = true
2021

2122
tracing.workspace = true
2223
tracing-subscriber.workspace = true
@@ -47,3 +48,6 @@ client = [ ]
4748

4849
# Build runner server code
4950
server = [ ]
51+
52+
# bleh
53+
temp = []

crates/letsplay_runner_core/src/client/game.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,13 @@ use letsplay_av_ffmpeg::encoder_thread::Control;
33
use super::GraphicsContexts;
44
use std::time::Instant;
55

6+
#[repr(u32)]
7+
#[derive(Eq, PartialEq, PartialOrd, Ord)]
8+
pub enum ConfigurationState {
9+
ConfigurationNeeded,
10+
ConfigurationComplete
11+
}
12+
613
/// A game that should be run by the runner core.
714
///
815
/// # Implementation notes
@@ -18,6 +25,9 @@ pub trait Game {
1825
// Not needed per se since we will just exit after shutdown,
1926
// but cleaning up after ourselves isn't bad programming practice
2027

28+
/// Gets the configuration state of the game.
29+
fn get_configuration_state(&mut self) -> ConfigurationState;
30+
2131
/// Set a named property. Failable.
2232
fn set_property(&mut self, key: &str, value: &str) -> anyhow::Result<()>;
2333

crates/letsplay_runner_core/src/client/game_thread.rs

Lines changed: 59 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use tokio::sync::{
1111
oneshot,
1212
};
1313

14+
use crate::client::ConfigurationState;
15+
1416
use super::{Game, GraphicsContexts};
1517

1618
use letsplay_av_ffmpeg::encoder_thread;
@@ -59,56 +61,70 @@ fn main(mut rx: mpsc::UnboundedReceiver<GameThreadMessage>, mut game: Box<dyn Ga
5961

6062
game.init(&contexts, &video_encoder_control);
6163

62-
// TEMP CODE: This accepts a single tcp connection and broadcasts packets to it
63-
// this is temporary as all hell
64-
// yes I know sync io but its running on another thread anyways so its fine.
65-
let server = std::net::TcpListener::bind("0.0.0.0:6040").expect("rrr");
66-
let mut clients = Vec::new();
67-
68-
while clients.len() != 1 {
69-
let client = server.accept().expect("baned");
70-
clients.push(client.0);
71-
}
64+
#[cfg(feature = "temp")]
65+
{
66+
// TEMP CODE: This accepts a single tcp connection and broadcasts NALU packets to it
67+
// this is temporary as all hell
68+
// yes I know sync io but its running on another thread anyways so its fine.
69+
let server = std::net::TcpListener::bind("0.0.0.0:6040").expect("rrr");
70+
let mut clients = Vec::new();
71+
72+
while clients.len() != 1 {
73+
let client = server.accept().expect("baned");
74+
clients.push(client.0);
75+
}
7276

73-
tracing::info!("all clients accepted - unblocking and completing intialization");
77+
tracing::info!("all clients accepted - unblocking and completing intialization");
7478

75-
// Helper thread
76-
std::thread::spawn(move || loop {
77-
let frame = packet_waiter.wait_for_packet();
78-
for client in &mut clients {
79-
let _ = client.write_all(frame.data().unwrap());
80-
}
81-
});
79+
// Helper thread
80+
std::thread::spawn(move || loop {
81+
let frame = packet_waiter.wait_for_packet();
82+
for client in &mut clients {
83+
let _ = client.write_all(frame.data().unwrap());
84+
}
85+
});
86+
}
8287

8388
loop {
8489
match rx.try_recv() {
85-
Ok(message) => match message {
86-
GameThreadMessage::Shutdown => break,
87-
GameThreadMessage::Suspend { suspend, tx } => {
88-
if suspended != suspend {
89-
suspended = suspend
90-
}
90+
Ok(message) => {
91+
match message {
92+
GameThreadMessage::Shutdown => break,
93+
GameThreadMessage::Suspend { suspend, tx } => {
94+
if suspended != suspend {
95+
// TODO: This is temporary, since we probably should instead shutdown or something
96+
// since a unconfigured game indicates a JSON misconfiguration.
97+
if suspend == false
98+
&& game.get_configuration_state()
99+
== ConfigurationState::ConfigurationNeeded
100+
{
101+
tracing::error!("Attempting to unsuspend a game that hasn't been fully configured!");
102+
continue;
103+
}
104+
suspended = suspend
105+
}
91106

92-
let _ = tx.send(());
93-
}
107+
let _ = tx.send(());
108+
}
94109

95-
GameThreadMessage::Reset { tx } => {
96-
game.reset();
97-
let _ = tx.send(());
98-
}
110+
GameThreadMessage::Reset { tx } => {
111+
game.reset();
112+
let _ = tx.send(());
113+
}
99114

100-
GameThreadMessage::SetProperty { key, value, tx } => {
101-
// TODO: we should send the error result to the given tx
102-
// so that we can propegate errors to the main thread
103-
match game.set_property(&key, &value) {
104-
Ok(_) => {}
105-
Err(err) => {
106-
tracing::error!("Error setting property {key} to {value}: {}", err);
107-
}
108-
};
109-
let _ = tx.send(());
115+
GameThreadMessage::SetProperty { key, value, tx } => {
116+
// TODO: we should send the error result to the given tx
117+
// so that we can propegate errors to the main thread
118+
match game.set_property(&key, &value) {
119+
Ok(_) => {}
120+
Err(err) => {
121+
tracing::error!("Error setting property {key} to {value}: {}", err);
122+
}
123+
};
124+
let _ = tx.send(());
125+
}
110126
}
111-
},
127+
}
112128

113129
Err(TryRecvError::Empty) => {}
114130
Err(TryRecvError::Disconnected) => break,
@@ -197,8 +213,8 @@ impl GameThread {
197213
let _ = rx.await;
198214
}
199215

200-
/// Shuts down the game thread.
201-
pub async fn shutdown(self) {
216+
/// Shuts down and waits for the game thread to exit.
217+
pub fn shutdown(self) {
202218
let _ = self.tx.send(GameThreadMessage::Shutdown);
203219
self.join_handle.join().expect("Failed to join game thread");
204220
}

crates/letsplay_runner_core/src/client/mod.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,24 @@ use tracing_subscriber::FmtSubscriber;
1717

1818
use thiserror::Error;
1919

20+
// TODO: Use this
2021
#[derive(Error, Debug)]
2122
pub enum RunnerError {}
2223

2324
/// The main Let's Play runners using the letsplay_runner_core crate utilize.
24-
pub async fn main(game: Box<dyn Game + Send>) -> Result<(), RunnerError> {
25+
pub async fn main(game: Box<dyn Game + Send>) -> anyhow::Result<()> {
2526
let subscriber = FmtSubscriber::builder()
2627
.with_max_level(Level::INFO)
2728
.with_thread_names(true)
2829
.finish();
2930

3031
tracing::subscriber::set_global_default(subscriber).unwrap();
3132

32-
let game_thread = GameThread::spawn(game);
33+
// DOGFOOD:
34+
// - Implement JSON configuration
35+
// - Implement RPC client (including both local and remote modes)
3336

34-
// just spin forever for now
35-
//
36-
// TODO: implement RPC client
37+
let game_thread = GameThread::spawn(game);
3738

3839
// TEMP: Just for testing libretro runner bringup
3940
game_thread
@@ -49,14 +50,15 @@ pub async fn main(game: Box<dyn Game + Send>) -> Result<(), RunnerError> {
4950
)
5051
.await;
5152

53+
// FIXME: Remove this when RPC client is implemented
54+
// (this is temporary for bringup)
5255
game_thread.set_suspend(false).await;
5356

5457
loop {
5558
time::sleep(Duration::from_secs(1)).await;
5659
}
5760

58-
// Once we exit the loop, we should shutdown the game thread
59-
// and all our resources
61+
game_thread.shutdown();
6062

6163
Ok(())
6264
}

crates/letsplay_runner_core/src/shared/rpc_transport.rs

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
1+
use bytes::{Bytes, BytesMut};
2+
use futures::{
3+
stream::{SplitSink, SplitStream},
4+
Stream,
5+
};
16
use letsplay_core::si_unit::MB;
7+
use protobuf::{Message, Serialize};
28
use tokio::io::{AsyncReadExt, AsyncWriteExt};
39
use tokio_util::codec::{Framed, LengthDelimitedCodec};
410

511
use crate::shared::proto::{ClientMessage, ServerMessage};
612

713
use futures_util::{SinkExt, StreamExt};
814

9-
// FIXME(s):
15+
// FIXME(s):
1016
// - DO NOT USE ANYHOW! DO NOT! SIMPLY DO NOT
1117
// - Instead of providing a static read_xxx helper
12-
// I wonder if it would be slightly more ergonomic to
18+
// I wonder if it would be slightly more ergonomic to
1319
// return a futures map() or whatever which parses
1420
// the message. It would allow us to handle hangup
1521
// a BIT easier, and tackle the first fixme too.
@@ -23,27 +29,50 @@ pub struct RpcTransport<RW>
2329
where
2430
RW: AsyncReadExt + AsyncWriteExt + Unpin,
2531
{
26-
transport: Framed<RW, LengthDelimitedCodec>,
32+
read: SplitStream<Framed<RW, LengthDelimitedCodec>>,
33+
write: SplitSink<Framed<RW, LengthDelimitedCodec>, Bytes>,
2734
}
2835

2936
impl<RW> RpcTransport<RW>
3037
where
3138
RW: AsyncReadExt + AsyncWriteExt + Unpin,
3239
{
3340
pub fn from_stream(stream: RW) -> Self {
34-
Self {
35-
transport: LengthDelimitedCodec::builder()
36-
.length_field_type::<u32>()
37-
.max_frame_length(MAX_FRAME_SIZE)
38-
.new_framed(stream),
39-
}
41+
let framed = LengthDelimitedCodec::builder()
42+
.length_field_type::<u32>()
43+
.max_frame_length(MAX_FRAME_SIZE)
44+
.new_framed(stream);
45+
46+
let (write, read) = framed.split();
47+
Self { read, write }
4048
}
4149

42-
#[cfg(feature = "client")]
43-
pub async fn read_server_message(&mut self) -> anyhow::Result<ServerMessage> {
44-
if let Some(framed) = self.transport.next().await {
45-
let bytes = framed?;
50+
/* this MIGHT be better but doesnt compile
51+
pub async fn sink_server_messages(&self) -> impl Stream<Item = anyhow::Result<ServerMessage>> {
52+
self.read.map(|res| {
53+
let bytes = res?;
4654
Ok(ServerMessage::parse(&bytes[..])?)
55+
})
56+
}
57+
*/
58+
59+
/// Writes a single protobuf message.
60+
pub async fn write_message<T: Message>(
61+
&mut self,
62+
message: &T,
63+
) -> anyhow::Result<()> {
64+
let bytes = Bytes::from(message.serialize()?);
65+
self.write.send(bytes).await?;
66+
Ok(())
67+
}
68+
69+
/// Read a protobuf message.
70+
pub async fn read_message<T: Message>(
71+
&mut self
72+
) -> anyhow::Result<T> {
73+
if let Some(framed) = self.read.next().await {
74+
let bytes = framed?;
75+
Ok(T::parse(&bytes[..])?)
4776
} else {
4877
Err(anyhow::anyhow!("End of stream"))
4978
}

crates/letsplay_runner_retro/src/game.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{
44
};
55

66
use anyhow::Context;
7-
use client::GraphicsContexts;
7+
use client::{ConfigurationState, GraphicsContexts};
88
use letsplay_core::sleep;
99
use letsplay_gpu::{self as gpu, GlFramebuffer};
1010
use letsplay_retro_frontend::{
@@ -84,6 +84,14 @@ impl client::Game for RetroGame {
8484
self.get_frontend().reset();
8585
}
8686

87+
fn get_configuration_state(&mut self) -> ConfigurationState {
88+
if self.get_frontend().game_loaded() {
89+
return ConfigurationState::ConfigurationComplete;
90+
}
91+
92+
ConfigurationState::ConfigurationNeeded
93+
}
94+
8795
fn set_property(&mut self, key: &str, value: &str) -> anyhow::Result<()> {
8896
match key {
8997
"libretro.core" => {

0 commit comments

Comments
 (0)