diff --git a/Cargo.lock b/Cargo.lock index f7b9f27b1f3..e594a1b6e69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4062,7 +4062,7 @@ dependencies = [ "colored", "dlpi", "libc", - "num_enum 0.5.11", + "num_enum", "nvpair", "nvpair-sys", "rusty-doors", diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index e29ed21192c..539294ddbd1 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -58,7 +58,7 @@ use sled_agent_client::types::InstancePutStateBody; use std::matches; use std::net::SocketAddr; use std::sync::Arc; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use uuid::Uuid; type SledAgentClientError = @@ -1710,7 +1710,7 @@ impl super::Nexus { instance_lookup: &lookup::Instance<'_>, params: ¶ms::InstanceSerialConsoleStreamRequest, ) -> Result<(), Error> { - let client_addr = match self + let propolis_addr = match self .propolis_addr_for_instance( opctx, instance_lookup, @@ -1737,7 +1737,7 @@ impl super::Nexus { }; match propolis_client::support::InstanceSerialConsoleHelper::new( - client_addr, + propolis_addr, offset, Some(self.log.clone()), ) @@ -1765,6 +1765,58 @@ impl super::Nexus { } } + pub(crate) async fn instance_vnc_stream( + &self, + opctx: &OpContext, + mut client_stream: WebSocketStream< + impl AsyncRead + AsyncWrite + Unpin + Send + 'static, + >, + instance_lookup: &lookup::Instance<'_>, + ) -> Result<(), Error> { + const VNC_PORT: u16 = 5900; + + let propolis_ip = match self + .propolis_addr_for_instance( + opctx, + instance_lookup, + authz::Action::Modify, + ) + .await + { + Ok(x) => x.ip(), + Err(e) => { + let _ = client_stream + .close(Some(CloseFrame { + code: CloseCode::Error, + reason: e.to_string().into(), + })) + .await + .is_ok(); + return Err(e); + } + }; + + match tokio::net::TcpStream::connect((propolis_ip, VNC_PORT)).await { + Ok(propolis_conn) => { + Self::proxy_tcp_socket_ws(client_stream, propolis_conn) + .await + .map_err(|e| Error::internal_error(&format!("{}", e))) + } + Err(e) => { + let message = + format!("socket connection to instance VNC failed: {}", e); + let _ = client_stream + .close(Some(CloseFrame { + code: CloseCode::Error, + reason: message.clone().into(), + })) + .await + .is_ok(); + Err(Error::internal_error(&message)) + } + } + } + async fn propolis_addr_for_instance( &self, opctx: &OpContext, @@ -1803,8 +1855,7 @@ impl super::Nexus { } } else { Err(Error::invalid_request(format!( - "instance is {} and has no active serial console \ - server", + "instance is {} and has no active console server", instance.runtime().nexus_state ))) } @@ -1976,6 +2027,113 @@ impl super::Nexus { Ok(()) } + /// Trivially pack data read from a TcpStream into binary websocket frames, + /// and unpack those received from the client accordingly. + /// NoVNC (a web VNC client) calls their version of this "websockify". + async fn proxy_tcp_socket_ws( + client_stream: WebSocketStream< + impl AsyncRead + AsyncWrite + Unpin + Send + 'static, + >, + propolis_conn: tokio::net::TcpStream, + ) -> Result<(), propolis_client::support::tungstenite::Error> { + let (mut nexus_sink, mut nexus_stream) = client_stream.split(); + let (mut propolis_reader, mut propolis_writer) = + propolis_conn.into_split(); + let (closed_tx, mut closed_rx) = tokio::sync::oneshot::channel::<()>(); + + let mut jh = tokio::spawn(async move { + // big enough for 1024x768 32bpp and then some + let mut read_buffer = vec![0u8; 4 * 1024 * 1024]; + loop { + tokio::select! { + _ = &mut closed_rx => break, + num_bytes_res = propolis_reader.read(&mut read_buffer) => { + let Ok(num_bytes) = num_bytes_res else { + let _ = nexus_sink.send(WebSocketMessage::Close(None)).await.is_ok(); + break; + }; + let data = Vec::from(&read_buffer[..num_bytes]); + match nexus_sink.send(WebSocketMessage::Binary(data)).await { + Ok(_) => {} + Err(_e) => break, + } + } + } + } + Ok::<_, Error>(nexus_sink) + }); + + let mut close_frame = None; + let mut task_joined = false; + loop { + tokio::select! { + res = &mut jh => { + task_joined = true; + if let Ok(Ok(mut nexus_sink)) = res { + // .take() here avoids borrow collision in the cleanup code + // below the loop where we also join the task if it hasn't been + let _ = nexus_sink + .send(WebSocketMessage::Close(close_frame.take())) + .await + .is_ok(); + } + break; + } + msg = nexus_stream.next() => { + match msg { + None => { + // websocket connection to nexus client closed unexpectedly + break; + } + Some(Err(e)) => { + // error in websocket connection to nexus client + return Err(e); + } + Some(Ok(WebSocketMessage::Close(_details))) => { + // websocket connection to nexus client closed normally + break; + } + Some(Ok(WebSocketMessage::Text(_text))) => { + // TODO: json payloads specifying client-sent metadata? + } + Some(Ok(WebSocketMessage::Binary(data))) => { + let mut start = 0; + while start < data.len() { + match propolis_writer.write(&data[start..]).await { + Ok(num_bytes) => { + start += num_bytes; + } + Err(e) => { + close_frame = Some(CloseFrame { + code: CloseCode::Error, + reason: e.to_string().into(), + }); + break; + } + } + } + } + // Frame won't exist at this level, and ping reply is handled by tungstenite + Some(Ok(WebSocketMessage::Frame(_) | WebSocketMessage::Ping(_) | WebSocketMessage::Pong(_))) => {} + } + } + } + } + + // double-joining a task handle is a panic + if !task_joined { + let _ = closed_tx.send(()).is_ok(); + if let Ok(Ok(mut nexus_sink)) = jh.await { + let _ = nexus_sink + .send(WebSocketMessage::Close(close_frame)) + .await + .is_ok(); + } + } + + Ok(()) + } + /// Attach an ephemeral IP to an instance. pub(crate) async fn instance_attach_ephemeral_ip( self: &Arc, @@ -2099,6 +2257,7 @@ mod tests { InstanceSerialConsoleHelper, WSClientOffset, }; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; #[tokio::test] async fn test_serial_console_stream_proxying() { @@ -2191,4 +2350,96 @@ mod tests { .expect("proxy task exited successfully"); logctx.cleanup_successful(); } + + #[tokio::test] + async fn test_tcp_stream_proxying() { + let logctx = test_setup_log("test_tcp_stream_proxying"); + let (nexus_client_conn, nexus_server_conn) = tokio::io::duplex(1024); + let propolis_listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("couldn't make TcpListener"); + + let addr = propolis_listener.local_addr().unwrap(); + + let jh = tokio::spawn(async move { propolis_listener.accept().await }); + + let propolis_client_conn = tokio::net::TcpStream::connect(addr) + .await + .expect("couldn't open TcpStream connection to TcpListener"); + + let mut propolis_server_conn = jh + .await + .expect("couldn't join") + .expect("couldn't accept client connection from TcpListener") + .0; + + let jh = tokio::spawn(async move { + let nexus_client_stream = WebSocketStream::from_raw_socket( + nexus_server_conn, + Role::Server, + None, + ) + .await; + Nexus::proxy_tcp_socket_ws( + nexus_client_stream, + propolis_client_conn, + ) + .await + }); + let mut nexus_client_ws = WebSocketStream::from_raw_socket( + nexus_client_conn, + Role::Client, + None, + ) + .await; + + slog::info!(logctx.log, "sending messages to nexus client"); + let sent1 = WebSocketMessage::Binary(vec![1, 2, 3, 42, 5]); + nexus_client_ws.send(sent1.clone()).await.unwrap(); + let sent2 = WebSocketMessage::Binary(vec![5, 42, 3, 2, 1]); + nexus_client_ws.send(sent2.clone()).await.unwrap(); + slog::info!( + logctx.log, + "messages sent, receiving them via propolis server" + ); + let received = + tokio::time::timeout(std::time::Duration::from_secs(10), async { + let mut buf = [0u8; 1024]; + let mut received = Vec::::new(); + while received.len() < 10 { + let bytes = + propolis_server_conn.read(&mut buf).await.unwrap(); + received.extend(&buf[..bytes]); + } + received + }) + .await + .expect("timed out receiving"); + assert_eq!(received, vec![1, 2, 3, 42, 5, 5, 42, 3, 2, 1]); + + slog::info!(logctx.log, "sending data to propolis server"); + let sent3 = vec![6, 7, 8, 90, 90, 8, 7, 6]; + propolis_server_conn.write_all(&sent3).await.unwrap(); + slog::info!(logctx.log, "data sent, receiving it via nexus client"); + let received3 = nexus_client_ws.next().await.unwrap().unwrap(); + assert_eq!(WebSocketMessage::Binary(sent3), received3); + + slog::info!(logctx.log, "sending close message to nexus client"); + let sent = WebSocketMessage::Close(Some(CloseFrame { + code: CloseCode::Normal, + reason: std::borrow::Cow::from("test done"), + })); + nexus_client_ws.send(sent.clone()).await.unwrap(); + slog::info!( + logctx.log, + "sent close message, waiting \ + 1s for proxy task to shut down" + ); + tokio::time::timeout(Duration::from_secs(1), jh) + .await + .expect("proxy task shut down within 1s") + .expect("task successfully completed") + .expect("proxy task exited successfully"); + logctx.cleanup_successful(); + } } diff --git a/nexus/src/external_api/http_entrypoints.rs b/nexus/src/external_api/http_entrypoints.rs index 551ef00817e..c423a65d367 100644 --- a/nexus/src/external_api/http_entrypoints.rs +++ b/nexus/src/external_api/http_entrypoints.rs @@ -175,6 +175,7 @@ pub(crate) fn external_api() -> NexusApiDescription { api.register(instance_disk_detach)?; api.register(instance_serial_console)?; api.register(instance_serial_console_stream)?; + api.register(instance_vnc)?; api.register(instance_ssh_public_key_list)?; api.register(image_list)?; @@ -2787,6 +2788,53 @@ async fn instance_serial_console_stream( } } +/// Stream instance VNC framebuffer +#[channel { + protocol = WEBSOCKETS, + path = "/v1/instances/{instance}/vnc", + tags = ["instances"], +}] +async fn instance_vnc( + rqctx: RequestContext>, + path_params: Path, + query_params: Query, + conn: WebsocketConnection, +) -> WebsocketChannelResult { + let apictx = rqctx.context(); + let nexus = &apictx.nexus; + let path = path_params.into_inner(); + let query = query_params.into_inner(); + let opctx = crate::context::op_context_for_external_api(&rqctx).await?; + let instance_selector = params::InstanceSelector { + project: query.project.clone(), + instance: path.instance, + }; + let mut client_stream = WebSocketStream::from_raw_socket( + conn.into_inner(), + WebSocketRole::Server, + None, + ) + .await; + match nexus.instance_lookup(&opctx, instance_selector) { + Ok(instance_lookup) => { + nexus + .instance_vnc_stream(&opctx, client_stream, &instance_lookup) + .await?; + Ok(()) + } + Err(e) => { + let _ = client_stream + .close(Some(CloseFrame { + code: CloseCode::Error, + reason: e.to_string().into(), + })) + .await + .is_ok(); + Err(e.into()) + } + } +} + /// List SSH public keys for instance /// /// List SSH public keys injected via cloud-init during instance creation. Note diff --git a/nexus/tests/output/nexus_tags.txt b/nexus/tests/output/nexus_tags.txt index 64413f396ea..7caf12a2693 100644 --- a/nexus/tests/output/nexus_tags.txt +++ b/nexus/tests/output/nexus_tags.txt @@ -64,6 +64,7 @@ instance_ssh_public_key_list GET /v1/instances/{instance}/ssh-p instance_start POST /v1/instances/{instance}/start instance_stop POST /v1/instances/{instance}/stop instance_view GET /v1/instances/{instance} +instance_vnc GET /v1/instances/{instance}/vnc API operations found with tag "login" OPERATION ID METHOD URL PATH diff --git a/openapi/nexus.json b/openapi/nexus.json index 7d236de7a33..6813b5e54e9 100644 --- a/openapi/nexus.json +++ b/openapi/nexus.json @@ -2676,6 +2676,45 @@ } } }, + "/v1/instances/{instance}/vnc": { + "get": { + "tags": [ + "instances" + ], + "summary": "Stream instance VNC framebuffer", + "operationId": "instance_vnc", + "parameters": [ + { + "in": "path", + "name": "instance", + "description": "Name or ID of the instance", + "required": true, + "schema": { + "$ref": "#/components/schemas/NameOrId" + } + }, + { + "in": "query", + "name": "project", + "description": "Name or ID of the project", + "schema": { + "$ref": "#/components/schemas/NameOrId" + } + } + ], + "responses": { + "default": { + "description": "", + "content": { + "*/*": { + "schema": {} + } + } + } + }, + "x-dropshot-websocket": {} + } + }, "/v1/ip-pools": { "get": { "tags": [