Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[[bin]] # Binary for the event-polling process
name = "event"
path = "src/components/event.rs"

[[bin]] # Binary for the task-execution process
name = "task"
path = "src/components/task.rs"

[[bin]] # Binary for the CLI entry point
name = "cli"
path = "src/cli.rs"

[dependencies]
serde_json = { version = "1.0.104", features = ["preserve_order"] }
clap = { version = "4.3.5", features = ["derive"] }
Expand All @@ -24,4 +36,14 @@ diesel = { version = "2.1.0", features = ["chrono", "postgres"] }
diesel_migrations = { version = "2.1.0", features = ["postgres"] }
tracing = "0.1.37"
prettytable-rs = "^0.10.0"
pnet = "0.34.0"
pnet = "0.34.0"
tonic = "0.9.2"
prost = "0.11.9"
tokio = { version = "1.32", features = ["rt-multi-thread", "macros", "sync", "time"] }
# futures-util-preview = "0.2.2"
rand = "0.8.5"
async-stream = "0.3.5"
tokio-stream = "0.1.14"

[build-dependencies]
tonic-build = "0.9"
4 changes: 4 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Build script: generate the Rust gRPC bindings for proto/grpc.proto
// via tonic-build; the output is pulled in with `tonic::include_proto!`.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("proto/grpc.proto")?;
    Ok(())
}
14 changes: 14 additions & 0 deletions proto/grpc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";
package grpc;

// Streams captured process output (logs) to CLI clients.
service OutputStreaming {
  // Server-streaming RPC: the client sends a single chunk as the request
  // and receives a stream of Response messages back.
  rpc StreamOutput(OutputChunk) returns ( stream Response);
}

// A single piece of captured output sent by the client.
message OutputChunk {
  string content = 1;
}

// One streamed log/output message sent by the server.
message Response {
  string message = 1;
}
87 changes: 72 additions & 15 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{anyhow, Error as AnyError, Result};
use anyhow::{anyhow, Error as AnyError, Ok, Result};
use clap::{Parser, Subcommand};
use diesel::{ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, SelectableHelper};
use dotenv::dotenv;
Expand All @@ -9,8 +9,7 @@ use std::env;
use std::fs::File;
use std::process::Command;
use tracing::field;
use workflow::engine::{create_new_engine_entry, handle_stop, run_event_process};
use workflow::engine::{run_task_process, update_engine_status};
use workflow::engine_utils::{create_new_engine_entry, handle_stop, update_engine_status};
use workflow::models::{Engine, EngineStatus, Event, Task};
use workflow::parser::process_yaml_file;
use workflow::utils::establish_pg_connection;
Expand Down Expand Up @@ -41,6 +40,10 @@ enum Commands {
StartEventProcess {
engine_uid: i32,
},
Logs {
#[clap(subcommand)]
subcommand: LogsSubcommands,
},
Stop {},
/// Adds workflow to the queue
Add {
Expand Down Expand Up @@ -96,6 +99,16 @@ enum ShowSubcommands {
Engine { uid: i32 },
}

#[derive(Subcommand)]
enum LogsSubcommands {
    // Stream the logs for the task with this uid
    Task { uid: i32 },
    // Stream the logs for the event with this uid
    Event { uid: i32 },
    // Engine-level logs are not implemented yet
    // Engine { uid: i32 },
}

#[derive(PartialEq)]
enum ProcessType {
Task,
Expand All @@ -109,7 +122,7 @@ fn create_and_clear_log_file(file_path: &str) -> Result<File, AnyError> {
}

fn start_process(
subcommand_name: &str,
binary_name: &str,
process_type: ProcessType,
engine_uid: i32,
) -> Result<(), AnyError> {
Expand Down Expand Up @@ -141,8 +154,8 @@ fn start_process(
}

let command = binding
.arg(subcommand_name)
.arg("--")
.arg("--bin")
.arg(binary_name)
.arg(engine_uid.to_string())
.stdout(stdout)
.stderr(stderr);
Expand All @@ -151,7 +164,7 @@ fn start_process(
Ok(())
}

pub fn cli() {
pub async fn cli() {
let cli = Cli::parse();

match &cli.command {
Expand All @@ -174,15 +187,22 @@ pub fn cli() {
}
Commands::StartEventProcess { engine_uid } => {
println!("StartEventProcess");
if let Err(e) = run_event_process(*engine_uid) {
println!("Failed to start event process, {}", e);
std::process::exit(1);
};
// if let Err(e) = run_event_process(*engine_uid) {
// println!("Failed to start event process, {}", e);
// std::process::exit(1);
// };
}
Commands::StartTaskProcess { engine_uid } => {
println!("StartTaskProcess");
if let Err(e) = run_task_process(*engine_uid) {
println!("Failed to start task process, {}", e);
// if let Err(e) = run_task_process(*engine_uid) {
// println!("Failed to start task process, {}", e);
// std::process::exit(1);
// };
}
Commands::Logs { subcommand } => {
println!("Logs");
if let Err(e) = process_log_command(subcommand).await {
println!("Failed to stop the engine, {}", e);
std::process::exit(1);
};
}
Expand Down Expand Up @@ -229,6 +249,36 @@ pub fn cli() {
std::process::exit(0);
}

/// Dispatch a `logs` subcommand to the gRPC log-stream viewer.
///
/// Both variants currently stream from the same source, so the arms are
/// merged into a single or-pattern.
async fn process_log_command(subcommand: &LogsSubcommands) -> Result<(), AnyError> {
    match subcommand {
        LogsSubcommands::Task { uid } | LogsSubcommands::Event { uid } => {
            show_log(uid.to_string()).await?;
        }
    }
    Ok(())
}
// Generated gRPC client/server types for the `grpc` proto package
// (produced by build.rs via tonic-build).
pub mod grpc {
    tonic::include_proto!("grpc");
}
use grpc::output_streaming_client::OutputStreamingClient;
use grpc::{OutputChunk, Response as GrpcResponse};

use std::error::Error;
use tonic::transport::Channel;

/// Connect to the output-streaming gRPC server and print every streamed
/// message until the server closes the stream.
///
/// NOTE(review): `server_id` is accepted but never used — presumably the
/// connection address or target should be derived from it; confirm intent.
/// NOTE(review): this dials port 10000, but the event component serves on
/// `[::1]:10001` — verify which port is intended.
async fn show_log(server_id: String) -> Result<(), AnyError> {
    let mut client = OutputStreamingClient::connect("http://[::1]:10000").await?;

    // Server-streaming RPC: send one (empty) request, then read the
    // response stream to completion.
    let mut stream = client
        .stream_output(OutputChunk::default())
        .await?
        .into_inner();

    // `message()` yields Some(msg) per item and None when the server ends
    // the stream.
    while let Some(log_message) = stream.message().await? {
        println!("NOTE = {:?}", log_message);
    }
    Ok(())
}

fn get_system_ip_address() -> Result<String, AnyError> {
// Get a vector with all network interfaces found
let all_interfaces = interfaces();
Expand Down Expand Up @@ -262,13 +312,13 @@ fn process_start_command() -> Result<(), AnyError> {
)?;
println!("created new engine entry with uid: {}", engine_uid);

if let Err(e) = start_process("start-event-process", ProcessType::Event, engine_uid) {
if let Err(e) = start_process("event", ProcessType::Event, engine_uid) {
eprintln!("Failed to start Event process: {}", e);
eprintln!("exiting...");
std::process::exit(1);
}

if let Err(e) = start_process("start-task-process", ProcessType::Task, engine_uid) {
if let Err(e) = start_process("task", ProcessType::Task, engine_uid) {
eprintln!("Failed to start Task process: {}", e);
eprintln!("exiting...");
std::process::exit(1);
Expand Down Expand Up @@ -416,6 +466,13 @@ fn list_items<T: serde::ser::Serialize>(items: Vec<T>) -> Result<(), AnyError> {
Ok(())
}

/// Binary entry point: start the tokio runtime and hand control to the
/// async CLI dispatcher.
#[tokio::main]
async fn main() -> Result<(), AnyError> {
    cli().await;
    Ok(())
}

// fn is_redis_running() -> bool {
// let redis_result = create_redis_connection();
// if let Err(e) = redis_result {
Expand Down
89 changes: 82 additions & 7 deletions src/engine/event.rs → src/components/event.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,61 @@
use crate::models::{EventStatus, LightEvent, LightTask, ProcessStatus};
use crate::schema;
use crate::utils::{establish_pg_connection, push_tasks_to_queue};
use anyhow::Error as AnyError;
use diesel::prelude::*;
use std::path::Path;
use std::process::Command as ShellCommand;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::{str, thread};
use std::{env, str, thread};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream};
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};
use workflow::engine_utils::run_process;
use workflow::models::{EventStatus, LightEvent, LightTask, ProcessStatus};
use workflow::schema;
use workflow::utils::{establish_pg_connection, push_tasks_to_queue};

pub mod grpc {
tonic::include_proto!("grpc");
}
use grpc::output_streaming_server::{OutputStreaming, OutputStreamingServer};
use grpc::{OutputChunk, Response as GrpcResponse};

#[derive(Debug)]
pub struct OutputStreamer {
    // Buffered output messages replayed to every connecting client;
    // wrapped in Arc so the per-request sender tasks can share them.
    features: Arc<Vec<GrpcResponse>>,
}

#[tonic::async_trait]
impl OutputStreaming for OutputStreamer {
    type StreamOutputStream = ReceiverStream<Result<GrpcResponse, Status>>;

    /// Server-streaming RPC: replay every buffered response to the caller.
    ///
    /// The request payload is currently ignored; the full `features` buffer
    /// is streamed to each client.
    async fn stream_output(
        &self,
        _request: Request<OutputChunk>,
    ) -> Result<Response<Self::StreamOutputStream>, Status> {
        // `tx` is moved into the task below, so it does not need `mut`.
        let (tx, rx) = mpsc::channel(4);
        let features = self.features.clone();

        // Send from a background task so the bounded channel's backpressure
        // (capacity 4) cannot block this handler.
        tokio::spawn(async move {
            for feature in features.iter() {
                // `send` fails only when the receiver (client) is gone;
                // stop quietly instead of panicking the spawned task.
                if tx.send(Ok(feature.clone())).await.is_err() {
                    break;
                }
            }
        });

        Ok(Response::new(ReceiverStream::new(rx)))
    }
}

pub fn poll_events(running: Arc<AtomicBool>, engine_uid: i32) -> Result<(), AnyError> {
let mut event_uids: Vec<i32> = Vec::new();
let pg_conn = &mut establish_pg_connection();

use crate::schema::engines::dsl::*;
use workflow::schema::engines::dsl::*;

diesel::update(engines)
.filter(uid.eq(engine_uid))
Expand Down Expand Up @@ -91,14 +132,14 @@ fn execute_event(event: LightEvent) -> Result<(), AnyError> {
.expect("failed to execute process");

// if shell command return 0, then the event was triggered successfully
use crate::schema::events::dsl::*;
use workflow::schema::events::dsl::*;
if output.status.code().unwrap() == 0 {
diesel::update(events.find(event.uid))
.set(status.eq(EventStatus::Succeeded.to_string()))
.execute(conn)?;

{
use crate::schema::tasks::dsl::*;
use workflow::schema::tasks::dsl::*;
let light_tasks: Vec<LightTask> = tasks
.select(LightTask::as_select())
.filter(event_uid.eq(event.uid))
Expand Down Expand Up @@ -132,3 +173,37 @@ fn execute_event(event: LightEvent) -> Result<(), AnyError> {
println!("##############################################");
Ok(())
}

/// Run the event polling loop for engine `engine_uid` until it stops.
/// Thin wrapper delegating to the shared `run_process` harness with the
/// "Event" label and `poll_events` as the worker function.
pub fn run_event_process(engine_uid: i32) -> Result<(), AnyError> {
    run_process("Event", poll_events, engine_uid)
}

/// Event-component binary: starts the event polling loop and serves the
/// output-streaming gRPC endpoint on `[::1]:10001`.
///
/// Expects the engine uid as the first positional argument (passed by the
/// CLI's `start_process` via `cargo run --bin event <uid>`).
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = env::args().collect();
    println!("args: {:?}", args);

    // Fail with a clear error instead of panicking on `args[1]` when the
    // uid is missing or not an integer.
    let engine_uid = args
        .get(1)
        .ok_or("missing engine uid argument")?
        .parse::<i32>()?;
    println!("engine_uid: {}", engine_uid);

    // `run_event_process` is a blocking poll loop; run it on the blocking
    // thread pool so it does not stall a tokio worker thread.
    tokio::task::spawn_blocking(move || {
        if let Err(e) = run_event_process(engine_uid) {
            println!("Failed to start event process, {}", e);
            std::process::exit(1);
        }
    });

    let addr = "[::1]:10001".parse().unwrap();

    // Placeholder stream contents until real process output is wired in.
    let stream = OutputStreamer {
        features: Arc::new(vec![GrpcResponse {
            message: "Hello from event".into(),
        }]),
    };

    let svc = OutputStreamingServer::new(stream);

    Server::builder().add_service(svc).serve(addr).await?;

    Ok(())
}
Loading