Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 73 additions & 55 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ serde_repr = {version = "0.1"}
serde_yaml = "0.9"
sha2 = {version="0.10", optional = true}
simple_logger = {version = "5", default-features=false}
sqlx = { version = "0.8", features = ["sqlite", "runtime-tokio", "chrono"], optional = true}
sqlx = { version = "0.8", features = ["sqlite", "runtime-tokio", "chrono"], optional = true, default-features = false}
sqlx-type = {version = "0.4", optional = true}
tempfile = {version="3", optional=true}
tokio = { version = "1", default-features = false, features=['rt', 'net', 'fs', 'sync', 'macros', 'time', 'process', 'signal', 'io-std', 'rt-multi-thread'] }
tokio-rustls = "0.26"
tokio-tasks = {version = "0.3", optional=true}
tokio-tasks = {version = "0.5", optional=true}
tokio-tungstenite = { version = "0.27", features=['rustls-tls-webpki-roots']}
tokio-util = {version = "0.7", features = ["io"], optional = true}
totp-rs= {version="5", features = ["otpauth"], optional = true}
Expand Down
36 changes: 19 additions & 17 deletions src/bin/sadmin/client_daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ use tokio::{
time::timeout,
};
use tokio_rustls::{TlsConnector, client::TlsStream, rustls};
use tokio_tasks::{CancelledError, RunToken, Task, TaskBase, TaskBuilder, cancelable};
use tokio_tasks::{
CancelledError, RunToken, Task, TaskBase, TaskBuilder, cancelable, set_location,
};

use sadmin2::client_message::{
ClientHostMessage, CommandSpawnMessage, DataMessage, DataSource, DeployServiceMessage,
Expand Down Expand Up @@ -216,9 +218,9 @@ impl Client {
}
cmd.stdin(Stdio::null());
cmd.kill_on_drop(true);
run_token.set_location(file!(), line!());
set_location!(run_token);
let output = cmd.output().await?;
run_token.set_location(file!(), line!());
set_location!(run_token);
if !output.status.success() {
let code = output
.status
Expand Down Expand Up @@ -263,7 +265,7 @@ impl Client {
) -> Result<()> {
debug!("Start instant command {}: {}", msg.id, msg.name);
let id = msg.id;
run_token.set_location(file!(), line!());
set_location!(run_token);
let m = match self.handle_run_instant_inner(&run_token, msg).await {
Ok(v) => v,
Err(e) => {
Expand All @@ -276,9 +278,9 @@ impl Client {
})
}
};
run_token.set_location(file!(), line!());
set_location!(run_token);
self.send_message(m).await;
run_token.set_location(file!(), line!());
set_location!(run_token);
self.command_tasks.lock().unwrap().remove(&id);
debug!("Finished instant command {id}");
Ok(())
Expand Down Expand Up @@ -1387,7 +1389,7 @@ impl Client {
let notifier = SdNotify::from_env().ok();
let mut first = true;
loop {
run_token.set_location(file!(), line!());
set_location!(run_token);
let mut read = match cancelable(
&run_token,
timeout(Duration::from_secs(60), self.connect_to_upstream()),
Expand All @@ -1406,12 +1408,12 @@ impl Client {
}
Ok(Ok(Err(e))) => {
info!("Error connecting to upstream: {e:?}");
if let Some(notifier) = &notifier {
if first {
notifier.notify_ready()?;
notifier.set_status("Disconnected".to_string())?;
first = false;
}
if let Some(notifier) = &notifier
&& first
{
notifier.notify_ready()?;
notifier.set_status("Disconnected".to_string())?;
first = false;
}

if cancelable(&run_token, tokio::time::sleep(Duration::from_millis(1234)))
Expand All @@ -1428,7 +1430,7 @@ impl Client {
}
Err(_) => return Ok(()),
};
run_token.set_location(file!(), line!());
set_location!(run_token);
info!("Connected to server");
let mut last_ping_time = Instant::now();
let mut buffer = BytesMut::with_capacity(40960);
Expand All @@ -1445,7 +1447,7 @@ impl Client {
let read = read.read_buf(&mut buffer);
let send_failure = self.send_failure_notify.notified();
let sleep = tokio::time::sleep(Duration::from_secs(120));
run_token.set_location(file!(), line!());
set_location!(run_token);
tokio::select! {
val = read => {
match val {
Expand Down Expand Up @@ -1498,7 +1500,7 @@ impl Client {
}
}
info!("Trying to take sender for disconnect");
run_token.set_location(file!(), line!());
set_location!(run_token);
{
let f = async {
loop {
Expand All @@ -1514,7 +1516,7 @@ impl Client {
() = f => {panic!()}
}
}
run_token.set_location(file!(), line!());
set_location!(run_token);
info!("Took sender for disconnect");
if let Some(notifier) = &notifier {
notifier.set_status("Disconnected".to_string())?;
Expand Down
76 changes: 49 additions & 27 deletions src/bin/sadmin/client_daemon_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,12 @@ impl RemoteLogTarget<'_> {
}
}

async fn run_script(name: String, src: &String, log: &mut RemoteLogTarget<'_>) -> Result<()> {
async fn run_script(
name: String,
src: &String,
log: &mut RemoteLogTarget<'_>,
envs: &[(&str, &str)],
) -> Result<()> {
let (first, _) = src
.split_once('\n')
.with_context(|| format!("Expected two lines in script {name}"))?;
Expand All @@ -471,13 +476,14 @@ async fn run_script(name: String, src: &String, log: &mut RemoteLogTarget<'_>) -
let mut f = tempfile::Builder::new().prefix(&name).tempfile()?;
f.write_all(src.as_bytes())?;
f.flush()?;
let result = forward_command(
tokio::process::Command::new(interperter).arg(f.path()),
&None,
log,
)
.await
.context("Failed running script")?;
let mut cmd = tokio::process::Command::new(interperter);
cmd.arg(f.path());
for (k, v) in envs {
cmd.env(k, v);
}
let result = forward_command(&mut cmd, &None, log)
.await
.context("Failed running script")?;
if !result.success() {
bail!("Error running script {}: {:?}", name, result);
}
Expand Down Expand Up @@ -1247,23 +1253,23 @@ impl Service {
}

// Enable linger if required
if let Some(user) = &user {
if desc.enable_linger == Some(true) {
forward_command(
tokio::process::Command::new("/usr/bin/loginctl")
.arg("enable-linger")
.arg(&user.name),
&None,
log,
if let Some(user) = &user
&& desc.enable_linger == Some(true)
{
forward_command(
tokio::process::Command::new("/usr/bin/loginctl")
.arg("enable-linger")
.arg(&user.name),
&None,
log,
)
.await
.with_context(|| {
format!(
"Failed running /usr/bin/loginctl enable-linger {}",
user.name
)
.await
.with_context(|| {
format!(
"Failed running /usr/bin/loginctl enable-linger {}",
user.name
)
})?;
}
})?;
}

let t = tempfile::TempDir::new()?;
Expand Down Expand Up @@ -1316,9 +1322,17 @@ impl Service {
}
}

let mut env = Vec::new();
for (k, v) in &extra_env {
env.push((k.as_str(), v.as_str()));
}
if let Some(image) = &image {
env.push(("image", image.as_str()));
}

// Run pre_deploy
for (idx, src) in desc.pre_deploy.iter().enumerate() {
run_script(format!("predeploy {idx}"), src, log)
run_script(format!("predeploy {idx}"), src, log, &env)
.await
.with_context(|| format!("Failed running predeploy script {idx}"))?;
}
Expand Down Expand Up @@ -1800,9 +1814,17 @@ It will be hard killed in {:?} if it does not stop before that. ",
.build(Box::new(cgroups_rs::hierarchies::V2::new()));
}

let mut script_env = Vec::new();
for (k, v) in extra_env {
script_env.push((k.as_str(), v.as_str()));
}
let image_for_env = image.clone();
if let Some(image) = &image_for_env {
script_env.push(("image", image.as_str()));
}
// Run run prestart
for (idx, src) in desc.pre_start.iter().enumerate() {
run_script(format!("prestart {idx}"), src, log).await?;
run_script(format!("prestart {idx}"), src, log, &script_env).await?;
}

let dir = format!("/run/simpleadmin/services/{}/{}", desc.name, instance_id);
Expand Down Expand Up @@ -2226,7 +2248,7 @@ It will be hard killed in {:?} if it does not stop before that. ",
} else {
src.clone()
};
run_script(format!("poststart {idx}"), &src, log).await?;
run_script(format!("poststart {idx}"), &src, log, &script_env).await?;
}

Ok((instance, status.into_inner().unwrap()))
Expand Down
8 changes: 4 additions & 4 deletions src/bin/sadmin/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ impl ConnectionSend2 {
self.response_handlers.lock().unwrap().insert(msg_id, s);
self.send.lock().await.send_message_str(m).await?;
let r = r.await.context("r failed")?;
if let IServerAction::Response(r) = &r {
if let Some(e) = &r.error {
bail!("Remote error: {}", e);
}
if let IServerAction::Response(r) = &r
&& let Some(e) = &r.error
{
bail!("Remote error: {}", e);
}
Ok(r)
}
Expand Down
16 changes: 8 additions & 8 deletions src/bin/sadmin/list_deployments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,15 +429,15 @@ pub async fn list_deployments(config: Config, args: ListDeployments) -> Result<(
if !host_names.contains_key(&d.host) {
return false;
}
if let Some(image) = &args.image {
if &d.image != image {
return false;
}
if let Some(image) = &args.image
&& &d.image != image
{
return false;
}
if let Some(container) = &args.container {
if &d.name != container {
return false;
}
if let Some(container) = &args.container
&& &d.name != container
{
return false;
}
true
});
Expand Down
10 changes: 4 additions & 6 deletions src/bin/sadmin/list_images.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,10 @@ pub async fn list_images(config: Config, args: ListImages) -> Result<()> {
if let Some(image) = &args.image {
images.retain(|i| &i.image == image);
}
if full {
if let Some(tail) = args.tail.as_ref().and_then(|l| l.last().cloned()) {
images.reverse();
images.truncate(tail);
images.reverse();
}
if full && let Some(tail) = args.tail.as_ref().and_then(|l| l.last().cloned()) {
images.reverse();
images.truncate(tail);
images.reverse();
}

let mut stdout = std::io::stdout();
Expand Down
16 changes: 8 additions & 8 deletions src/bin/sadmin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,14 @@ async fn get_secret(config: Config, args: GetSecret) -> Result<()> {
}))
.await?;
loop {
if let IServerAction::GetSecretRes(r) = con.recv().await? {
if r.name == args.secret {
if let Some(v) = r.value {
println!("{v}");
return Ok(());
} else {
bail!("Secret {} not found", args.secret);
}
if let IServerAction::GetSecretRes(r) = con.recv().await?
&& r.name == args.secret
{
if let Some(v) = r.value {
println!("{v}");
return Ok(());
} else {
bail!("Secret {} not found", args.secret);
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/bin/sadmin/port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,10 @@ pub async fn proxy(config: Config, cmd: ProxySocket) -> Result<()> {
}
}
r = writer_shutdown_r.recv() => {
if let Some((socket_id, read_half)) = r {
if let Some(write_half) = socket_write_halfs.remove(&socket_id) {
if let Some((socket_id, read_half)) = r
&& let Some(write_half) = socket_write_halfs.remove(&socket_id) {
read_half.reunite(write_half)?.shutdown().await?;
}
}
}
_ = interval.tick() => {
send.ping().await?;
Expand Down
Loading
Loading