diff --git a/cmd/mesg-cli/orchestrator.go b/cmd/mesg-cli/orchestrator.go
index 9adea22ed..1d82ae23c 100644
--- a/cmd/mesg-cli/orchestrator.go
+++ b/cmd/mesg-cli/orchestrator.go
@@ -78,12 +78,15 @@ func startOrchestratorCmd(cdc *codec.Codec) *cobra.Command {
 				return err
 			}
 
+			// init cosmos store
+			store := cosmos.NewStore(rpc, logger)
+
 			// init event publisher
-			ep := publisher.New(rpc)
+			ep := publisher.New(store)
 
 			// orchestrator
 			logger.Info("Starting orchestrator")
-			orch := orchestrator.New(rpc, ep, logger)
+			orch := orchestrator.New(store, ep, logger)
 			defer func() {
 				logger.Info("Stopping orchestrator")
 				orch.Stop()
@@ -97,7 +100,7 @@ func startOrchestratorCmd(cdc *codec.Codec) *cobra.Command {
 
 			// init gRPC server.
 			logger.Info("Starting gRPC server")
-			server := grpc.New(rpc, ep, logger, viper.GetStringSlice(flagAuthorizedPubKeys))
+			server := grpc.New(store, ep, logger, cdc, viper.GetStringSlice(flagAuthorizedPubKeys))
 			defer func() {
 				logger.Info("Stopping gRPC server")
 				server.Close()
diff --git a/cosmos/store.go b/cosmos/store.go
new file mode 100644
index 000000000..ea7871fa6
--- /dev/null
+++ b/cosmos/store.go
@@ -0,0 +1,380 @@
+package cosmos
+
+import (
+	"context"
+	"fmt"
+
+	sdk "github.com/cosmos/cosmos-sdk/types"
+	"github.com/mesg-foundation/engine/execution"
+	"github.com/mesg-foundation/engine/ext/xstrings"
+	"github.com/mesg-foundation/engine/hash"
+	"github.com/mesg-foundation/engine/instance"
+	"github.com/mesg-foundation/engine/process"
+	"github.com/mesg-foundation/engine/protobuf/types"
+	"github.com/mesg-foundation/engine/runner"
+	"github.com/mesg-foundation/engine/service"
+	executionmodule "github.com/mesg-foundation/engine/x/execution"
+	instancemodule "github.com/mesg-foundation/engine/x/instance"
+	processmodule "github.com/mesg-foundation/engine/x/process"
+	runnermodule "github.com/mesg-foundation/engine/x/runner"
+	servicemodule "github.com/mesg-foundation/engine/x/service"
+	tmlog "github.com/tendermint/tendermint/libs/log"
+)
+
+// Store is an implementation of the orchestrator.Store interface using cosmos rpc.
+type Store struct {
+	rpc    *RPC
+	logger tmlog.Logger
+}
+
+// NewStore returns a new implementation of orchestrator.Store using cosmos rpc.
+func NewStore(rpc *RPC, logger tmlog.Logger) *Store {
+	return &Store{
+		rpc:    rpc,
+		logger: logger,
+	}
+}
+
+// FetchProcesses returns all processes.
+func (s *Store) FetchProcesses(ctx context.Context) ([]*process.Process, error) {
+	var processes []*process.Process
+	route := fmt.Sprintf("custom/%s/%s", processmodule.QuerierRoute, processmodule.QueryList)
+	if err := s.rpc.QueryJSON(route, nil, &processes); err != nil {
+		return nil, err
+	}
+	return processes, nil
+}
+
+// FetchExecution returns an execution from its hash.
+func (s *Store) FetchExecution(ctx context.Context, hash hash.Hash) (*execution.Execution, error) {
+	var exec *execution.Execution
+	route := fmt.Sprintf("custom/%s/%s/%s", executionmodule.QuerierRoute, executionmodule.QueryGet, hash)
+	if err := s.rpc.QueryJSON(route, nil, &exec); err != nil {
+		return nil, err
+	}
+	return exec, nil
+}
+
+// FetchService returns a service from its hash.
+func (s *Store) FetchService(ctx context.Context, hash hash.Hash) (*service.Service, error) {
+	var srv *service.Service
+	route := fmt.Sprintf("custom/%s/%s/%s", servicemodule.QuerierRoute, servicemodule.QueryGet, hash)
+	if err := s.rpc.QueryJSON(route, nil, &srv); err != nil {
+		return nil, err
+	}
+	return srv, nil
+}
+
+// FetchInstance returns an instance from its hash.
+func (s *Store) FetchInstance(ctx context.Context, hash hash.Hash) (*instance.Instance, error) {
+	var inst *instance.Instance
+	route := fmt.Sprintf("custom/%s/%s/%s", instancemodule.QuerierRoute, instancemodule.QueryGet, hash)
+	if err := s.rpc.QueryJSON(route, nil, &inst); err != nil {
+		return nil, err
+	}
+	return inst, nil
+}
+
+// FetchRunner returns a runner from its hash.
+func (s *Store) FetchRunner(ctx context.Context, hash hash.Hash) (*runner.Runner, error) {
+	var run *runner.Runner
+	route := fmt.Sprintf("custom/%s/%s/%s", runnermodule.QuerierRoute, runnermodule.QueryGet, hash)
+	if err := s.rpc.QueryJSON(route, nil, &run); err != nil {
+		return nil, err
+	}
+	return run, nil
+}
+
+// FetchRunners returns all runners of an instance.
+func (s *Store) FetchRunners(ctx context.Context, instanceHash hash.Hash) ([]*runner.Runner, error) {
+	var runners []*runner.Runner
+	route := fmt.Sprintf("custom/%s/%s", runnermodule.QuerierRoute, runnermodule.QueryList)
+	if err := s.rpc.QueryJSON(route, nil, &runners); err != nil {
+		return nil, err
+	}
+	executors := make([]*runner.Runner, 0)
+	for _, run := range runners {
+		if run.InstanceHash.Equal(instanceHash) {
+			executors = append(executors, run)
+		}
+	}
+	return executors, nil
+}
+
+// CreateExecution creates an execution.
+func (s *Store) CreateExecution(ctx context.Context, taskKey string, inputs *types.Struct, tags []string, parentHash hash.Hash, eventHash hash.Hash, processHash hash.Hash, nodeKey string, executorHash hash.Hash) (hash.Hash, error) {
+	acc, err := s.rpc.GetAccount()
+	if err != nil {
+		return nil, err
+	}
+	msg := executionmodule.MsgCreate{
+		Signer:       acc.GetAddress(),
+		ProcessHash:  processHash,
+		EventHash:    eventHash,
+		ParentHash:   parentHash,
+		NodeKey:      nodeKey,
+		TaskKey:      taskKey,
+		Inputs:       inputs,
+		ExecutorHash: executorHash,
+		Tags:         tags,
+	}
+	res, err := s.rpc.BuildAndBroadcastMsg(msg)
+	if err != nil {
+		return nil, err
+	}
+	return hash.DecodeFromBytes(res.Data)
+}
+
+// UpdateExecution update an execution.
+func (s *Store) UpdateExecution(ctx context.Context, execHash hash.Hash, start int64, stop int64, outputs *types.Struct, outputErr string) error {
+	acc, err := s.rpc.GetAccount()
+	if err != nil {
+		return err
+	}
+	msg := executionmodule.MsgUpdate{
+		Executor: acc.GetAddress(),
+		Hash:     execHash,
+		Start:    start,
+		Stop:     stop,
+	}
+	if outputs != nil {
+		msg.Result = &executionmodule.MsgUpdateOutputs{
+			Outputs: outputs,
+		}
+	} else {
+		msg.Result = &executionmodule.MsgUpdateError{
+			Error: outputErr,
+		}
+	}
+	if _, err := s.rpc.BuildAndBroadcastMsg(msg); err != nil {
+		return err
+	}
+	return nil
+}
+
+// SubscribeToNewCompletedExecutions returns a chan that will contain newly completed execution.
+func (s *Store) SubscribeToNewCompletedExecutions(ctx context.Context) (<-chan *execution.Execution, error) {
+	subscriber := xstrings.RandASCIILetters(8)
+	query := fmt.Sprintf("%s.%s EXISTS AND %s.%s='%s'",
+		executionmodule.EventType, executionmodule.AttributeKeyHash,
+		executionmodule.EventType, sdk.AttributeKeyAction, executionmodule.AttributeActionCompleted,
+	)
+	eventStream, err := s.rpc.Subscribe(ctx, subscriber, query, 0)
+	if err != nil {
+		return nil, err
+	}
+	executionStream := make(chan *execution.Execution)
+	go func() {
+	loop:
+		for {
+			select {
+			case event := <-eventStream:
+				// get the index of the action=completed attributes
+				attrKeyActionCreated := fmt.Sprintf("%s.%s", executionmodule.EventType, sdk.AttributeKeyAction)
+				attrIndexes := make([]int, 0)
+				for index, attr := range event.Events[attrKeyActionCreated] {
+					if attr == executionmodule.AttributeActionCompleted {
+						attrIndexes = append(attrIndexes, index)
+					}
+				}
+				// iterate only on the index of attribute hash where action=completed
+				attrKeyHash := fmt.Sprintf("%s.%s", executionmodule.EventType, executionmodule.AttributeKeyHash)
+				for _, index := range attrIndexes {
+					attr := event.Events[attrKeyHash][index]
+					hash, err := hash.Decode(attr)
+					if err != nil {
+						s.logger.Error(err.Error())
+						continue
+					}
+					var exec *execution.Execution
+					route := fmt.Sprintf("custom/%s/%s/%s", executionmodule.QuerierRoute, executionmodule.QueryGet, hash)
+					if err := s.rpc.QueryJSON(route, nil, &exec); err != nil {
+						s.logger.Error(err.Error())
+						continue
+					}
+					executionStream <- exec
+				}
+			case <-ctx.Done():
+				break loop
+			}
+		}
+		if err := s.rpc.Unsubscribe(context.Background(), subscriber, query); err != nil {
+			s.logger.Error(err.Error())
+		}
+	}()
+	return executionStream, nil
+}
+
+// SubscribeToExecutions returns a chan that will contain executions that have been created, updated, or anything.
+func (s *Store) SubscribeToExecutions(ctx context.Context) (<-chan *execution.Execution, error) {
+	subscriber := xstrings.RandASCIILetters(8)
+	query := fmt.Sprintf("%s.%s EXISTS", executionmodule.EventType, executionmodule.AttributeKeyHash)
+	eventStream, err := s.rpc.Subscribe(ctx, subscriber, query, 0)
+	if err != nil {
+		return nil, err
+	}
+	executionStream := make(chan *execution.Execution)
+	// listen to event stream
+	go func() {
+	loop:
+		for {
+			select {
+			case event := <-eventStream:
+				attrHash := fmt.Sprintf("%s.%s", executionmodule.EventType, executionmodule.AttributeKeyHash)
+				attrs := event.Events[attrHash]
+				alreadySeeHashes := make(map[string]bool)
+				for _, attr := range attrs {
+					// skip already see hash. it deduplicate same execution in multiple event.
+					if alreadySeeHashes[attr] {
+						continue
+					}
+					alreadySeeHashes[attr] = true
+					hash, err := hash.Decode(attr)
+					if err != nil {
+						s.logger.Error(err.Error())
+						continue
+					}
+					var exec *execution.Execution
+					route := fmt.Sprintf("custom/%s/%s/%s", executionmodule.QuerierRoute, executionmodule.QueryGet, hash)
+					if err := s.rpc.QueryJSON(route, nil, &exec); err != nil {
+						s.logger.Error(err.Error())
+						continue
+					}
+					executionStream <- exec
+				}
+			case <-ctx.Done():
+				break loop
+			}
+		}
+		if err := s.rpc.Unsubscribe(context.Background(), subscriber, query); err != nil {
+			s.logger.Error(err.Error())
+		}
+	}()
+	return executionStream, nil
+}
+
+// SubscribeToExecutionsForRunner returns a chan that will contain executions that a specific runner must execute.
+func (s *Store) SubscribeToExecutionsForRunner(ctx context.Context, runnerHash hash.Hash) (<-chan *execution.Execution, error) {
+	subscriber := xstrings.RandASCIILetters(8)
+	query := fmt.Sprintf("%s.%s='%s' AND %s.%s='%s'",
+		executionmodule.EventType, executionmodule.AttributeKeyExecutor, runnerHash.String(),
+		executionmodule.EventType, sdk.AttributeKeyAction, executionmodule.AttributeActionCreated,
+	)
+	eventStream, err := s.rpc.Subscribe(ctx, subscriber, query, 0)
+	if err != nil {
+		return nil, err
+	}
+	executionStream := make(chan *execution.Execution)
+	// listen to event stream
+	go func() {
+	loop:
+		for {
+			select {
+			case event := <-eventStream:
+				// get the index of the action=created attributes
+				attrKeyActionCreated := fmt.Sprintf("%s.%s", executionmodule.EventType, sdk.AttributeKeyAction)
+				attrIndexes := make([]int, 0)
+				for index, attr := range event.Events[attrKeyActionCreated] {
+					if attr == executionmodule.AttributeActionCreated {
+						attrIndexes = append(attrIndexes, index)
+					}
+				}
+				// iterate only on the index of attribute hash where action=created
+				attrKeyHash := fmt.Sprintf("%s.%s", executionmodule.EventType, executionmodule.AttributeKeyHash)
+				for _, index := range attrIndexes {
+					attr := event.Events[attrKeyHash][index]
+					hash, err := hash.Decode(attr)
+					if err != nil {
+						s.logger.Error(err.Error())
+						continue
+					}
+					var exec *execution.Execution
+					route := fmt.Sprintf("custom/%s/%s/%s", executionmodule.QuerierRoute, executionmodule.QueryGet, hash)
+					if err := s.rpc.QueryJSON(route, nil, &exec); err != nil {
+						s.logger.Error(err.Error())
+						continue
+					}
+					executionStream <- exec
+				}
+			case <-ctx.Done():
+				break loop
+			}
+		}
+		if err := s.rpc.Unsubscribe(context.Background(), subscriber, query); err != nil {
+			s.logger.Error(err.Error())
+		}
+	}()
+	return executionStream, nil
+}
+
+// RegisterRunner registers a new or existing runner.
+func (s *Store) RegisterRunner(ctx context.Context, serviceHash hash.Hash, envHash hash.Hash) (hash.Hash, error) {
+	// get engine account
+	acc, err := s.rpc.GetAccount()
+	if err != nil {
+		return nil, err
+	}
+
+	// calculate runner hash
+	inst, err := instance.New(serviceHash, envHash)
+	if err != nil {
+		return nil, err
+	}
+	run, err := runner.New(acc.GetAddress().String(), inst.Hash)
+	if err != nil {
+		return nil, err
+	}
+	runnerHash := run.Hash
+
+	// check that runner doesn't already exist
+	var runnerExist bool
+	route := fmt.Sprintf("custom/%s/%s/%s", runnermodule.QuerierRoute, runnermodule.QueryExist, runnerHash)
+	if err := s.rpc.QueryJSON(route, nil, &runnerExist); err != nil {
+		return nil, err
+	}
+
+	// only broadcast if runner doesn't exist
+	if !runnerExist {
+		tx, err := s.rpc.BuildAndBroadcastMsg(runnermodule.MsgCreate{
+			Owner:       acc.GetAddress(),
+			ServiceHash: serviceHash,
+			EnvHash:     envHash,
+		})
+		if err != nil {
+			return nil, err
+		}
+		runnerHashCreated, err := hash.DecodeFromBytes(tx.Data)
+		if err != nil {
+			return nil, err
+		}
+		if !runnerHashCreated.Equal(runnerHash) {
+			// delete wrong runner
+			_, err := s.rpc.BuildAndBroadcastMsg(runnermodule.MsgDelete{
+				Owner: acc.GetAddress(),
+				Hash:  runnerHashCreated,
+			})
+			if err != nil {
+				return nil, err
+			}
+			return nil, fmt.Errorf("runner hash created is not expected: got %q, expect %q", runnerHashCreated, runnerHash)
+		}
+	}
+
+	return runnerHash, nil
+}
+
+// DeleteRunner deletes an existing runner.
+func (s *Store) DeleteRunner(ctx context.Context, runnerHash hash.Hash) error {
+	acc, err := s.rpc.GetAccount()
+	if err != nil {
+		return err
+	}
+	msg := runnermodule.MsgDelete{
+		Owner: acc.GetAddress(),
+		Hash:  runnerHash,
+	}
+	if _, err := s.rpc.BuildAndBroadcastMsg(msg); err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/event/publisher/publisher.go b/event/publisher/publisher.go
index d32eae446..f1193ef5b 100644
--- a/event/publisher/publisher.go
+++ b/event/publisher/publisher.go
@@ -1,19 +1,24 @@
 package publisher
 
 import (
-	"fmt"
+	"context"
 
 	"github.com/cskr/pubsub"
-	"github.com/mesg-foundation/engine/cosmos"
 	"github.com/mesg-foundation/engine/event"
 	"github.com/mesg-foundation/engine/hash"
 	"github.com/mesg-foundation/engine/instance"
 	"github.com/mesg-foundation/engine/protobuf/types"
 	"github.com/mesg-foundation/engine/service"
-	instancemodule "github.com/mesg-foundation/engine/x/instance"
-	servicemodule "github.com/mesg-foundation/engine/x/service"
 )
 
+// Store is the interface to implement to fetch data.
+type Store interface {
+	// FetchService returns a service from its hash.
+	FetchService(ctx context.Context, hash hash.Hash) (*service.Service, error)
+	// FetchInstance returns an instance from its hash.
+	FetchInstance(ctx context.Context, hash hash.Hash) (*instance.Instance, error)
+}
+
 const (
 	// streamTopic is topic used to broadcast events.
 	streamTopic = "event-stream"
@@ -21,29 +26,27 @@ const (
 
 // EventPublisher exposes event APIs of MESG.
 type EventPublisher struct {
-	ps  *pubsub.PubSub
-	rpc *cosmos.RPC
+	ps    *pubsub.PubSub
+	store Store
 }
 
 // New creates a new Event SDK with given options.
-func New(rpc *cosmos.RPC) *EventPublisher {
+func New(store Store) *EventPublisher {
 	return &EventPublisher{
-		ps:  pubsub.New(0),
-		rpc: rpc,
+		ps:    pubsub.New(0),
+		store: store,
 	}
 }
 
 // Publish a MESG event eventKey with eventData for service token.
func (ep *EventPublisher) Publish(instanceHash hash.Hash, eventKey string, eventData *types.Struct) (*event.Event, error) { - var i *instance.Instance - route := fmt.Sprintf("custom/%s/%s/%s", instancemodule.QuerierRoute, instancemodule.QueryGet, instanceHash) - if err := ep.rpc.QueryJSON(route, nil, &i); err != nil { + i, err := ep.store.FetchInstance(context.Background(), instanceHash) + if err != nil { return nil, err } - var s *service.Service - route = fmt.Sprintf("custom/%s/%s/%s", servicemodule.QuerierRoute, servicemodule.QueryGet, i.ServiceHash) - if err := ep.rpc.QueryJSON(route, nil, &s); err != nil { + s, err := ep.store.FetchService(context.Background(), i.ServiceHash) + if err != nil { return nil, err } diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index 8ae7b0392..08887ea3e 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -6,20 +6,14 @@ import ( "fmt" "math/rand" - sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/mesg-foundation/engine/cosmos" "github.com/mesg-foundation/engine/event" "github.com/mesg-foundation/engine/event/publisher" "github.com/mesg-foundation/engine/execution" "github.com/mesg-foundation/engine/ext/xrand" - "github.com/mesg-foundation/engine/ext/xstrings" "github.com/mesg-foundation/engine/hash" "github.com/mesg-foundation/engine/process" "github.com/mesg-foundation/engine/protobuf/types" "github.com/mesg-foundation/engine/runner" - executionmodule "github.com/mesg-foundation/engine/x/execution" - processmodule "github.com/mesg-foundation/engine/x/process" - runnermodule "github.com/mesg-foundation/engine/x/runner" tmlog "github.com/tendermint/tendermint/libs/log" ) @@ -27,23 +21,43 @@ func init() { xrand.SeedInit() } +// Store is an interface for fetching all data the orchestrator needs. +type Store interface { + // FetchProcesses returns all processes. + FetchProcesses(ctx context.Context) ([]*process.Process, error) + + // FetchExecution returns one execution from its hash. + FetchExecution(ctx context.Context, hash hash.Hash) (*execution.Execution, error) + + // FetchRunners returns all runners of an instance. + FetchRunners(ctx context.Context, instanceHash hash.Hash) ([]*runner.Runner, error) + + // CreateExecution creates an execution. + CreateExecution(ctx context.Context, taskKey string, inputs *types.Struct, tags []string, parentHash hash.Hash, eventHash hash.Hash, processHash hash.Hash, nodeKey string, executorHash hash.Hash) (hash.Hash, error) + + // SubscribeToNewCompletedExecutions returns a chan that will contain newly completed execution. 
+ SubscribeToNewCompletedExecutions(ctx context.Context) (<-chan *execution.Execution, error) +} + // Orchestrator manages the executions based on the definition of the processes type Orchestrator struct { - rpc *cosmos.RPC - ep *publisher.EventPublisher + store Store + ep *publisher.EventPublisher + logger tmlog.Logger + eventStream *event.Listener - executionStream chan *execution.Execution + executionStream <-chan *execution.Execution stopC chan bool - logger tmlog.Logger } // New creates a new Process instance -func New(rpc *cosmos.RPC, ep *publisher.EventPublisher, logger tmlog.Logger) *Orchestrator { +func New(store Store, ep *publisher.EventPublisher, logger tmlog.Logger) *Orchestrator { return &Orchestrator{ - rpc: rpc, + store: store, ep: ep, - stopC: make(chan bool), logger: logger.With("module", "orchestrator"), + + stopC: make(chan bool), } } @@ -125,9 +139,8 @@ func (s *Orchestrator) findNodes(wf *process.Process, filter func(wf *process.Pr } func (s *Orchestrator) execute(filter func(wf *process.Process, node *process.Process_Node) (bool, error), exec *execution.Execution, event *event.Event, data *types.Struct) { - var processes []*process.Process - route := fmt.Sprintf("custom/%s/%s", processmodule.QuerierRoute, processmodule.QueryList) - if err := s.rpc.QueryJSON(route, nil, &processes); err != nil { + processes, err := s.store.FetchProcesses(context.Background()) + if err != nil { s.logger.Error(err.Error()) return } @@ -305,9 +318,8 @@ func (s *Orchestrator) resolveInput(wfHash hash.Hash, exec *execution.Execution, return path.Resolve(exec.Outputs) } // get parentExec and do a recursive call - var parentExec *execution.Execution - route := fmt.Sprintf("custom/%s/%s/%s", executionmodule.QuerierRoute, executionmodule.QueryGet, exec.ParentHash) - if err := s.rpc.QueryJSON(route, nil, &parentExec); err != nil { + parentExec, err := s.store.FetchExecution(context.Background(), exec.ParentHash) + if err != nil { return nil, err } return s.resolveInput(wfHash, parentExec, refNodeKey, path) @@ -322,97 +334,34 @@ func (s *Orchestrator) processTask(node *process.Process_Node, task *process.Pro if exec != nil { execHash = exec.Hash } - var runners []*runner.Runner - route := fmt.Sprintf("custom/%s/%s", runnermodule.QuerierRoute, runnermodule.QueryList) - if err := s.rpc.QueryJSON(route, nil, &runners); err != nil { + executors, err := s.store.FetchRunners(context.Background(), task.InstanceHash) + if err != nil { return nil, err } - executors := make([]*runner.Runner, 0) - for _, run := range runners { - if run.InstanceHash.Equal(task.InstanceHash) { - executors = append(executors, run) - } - } if len(executors) == 0 { return nil, fmt.Errorf("no runner is running instance %q", task.InstanceHash) } executor := executors[rand.Intn(len(executors))] // create execution - acc, err := s.rpc.GetAccount() - if err != nil { - return nil, err - } - msg := executionmodule.MsgCreate{ - Signer: acc.GetAddress(), - ProcessHash: wf.Hash, - EventHash: eventHash, - ParentHash: execHash, - NodeKey: node.Key, - TaskKey: task.TaskKey, - Inputs: data, - ExecutorHash: executor.Hash, - Tags: nil, - } - res, err := s.rpc.BuildAndBroadcastMsg(msg) - if err != nil { - return nil, err - } - return hash.DecodeFromBytes(res.Data) + return s.store.CreateExecution( + context.Background(), + task.TaskKey, + data, + nil, + execHash, + eventHash, + wf.Hash, + node.Key, + executor.Hash, + ) } // startExecutionStream returns execution that matches given hash. 
func (s *Orchestrator) startExecutionStream(ctx context.Context) error { - subscriber := xstrings.RandASCIILetters(8) - query := fmt.Sprintf("%s.%s EXISTS AND %s.%s='%s'", - executionmodule.EventType, executionmodule.AttributeKeyHash, - executionmodule.EventType, sdk.AttributeKeyAction, executionmodule.AttributeActionCompleted, - ) - eventStream, err := s.rpc.Subscribe(ctx, subscriber, query, 0) - if err != nil { - return err - } - - s.executionStream = make(chan *execution.Execution) - go func() { - loop: - for { - select { - case event := <-eventStream: - // get the index of the action=completed attributes - attrKeyActionCreated := fmt.Sprintf("%s.%s", executionmodule.EventType, sdk.AttributeKeyAction) - attrIndexes := make([]int, 0) - for index, attr := range event.Events[attrKeyActionCreated] { - if attr == executionmodule.AttributeActionCompleted { - attrIndexes = append(attrIndexes, index) - } - } - // iterate only on the index of attribute hash where action=completed - attrKeyHash := fmt.Sprintf("%s.%s", executionmodule.EventType, executionmodule.AttributeKeyHash) - for _, index := range attrIndexes { - attr := event.Events[attrKeyHash][index] - hash, err := hash.Decode(attr) - if err != nil { - s.logger.Error(err.Error()) - continue - } - var exec *execution.Execution - route := fmt.Sprintf("custom/%s/%s/%s", executionmodule.QuerierRoute, executionmodule.QueryGet, hash) - if err := s.rpc.QueryJSON(route, nil, &exec); err != nil { - s.logger.Error(err.Error()) - continue - } - s.executionStream <- exec - } - case <-ctx.Done(): - break loop - } - } - if err := s.rpc.Unsubscribe(context.Background(), subscriber, query); err != nil { - s.logger.Error(err.Error()) - } - }() - return nil + var err error + s.executionStream, err = s.store.SubscribeToNewCompletedExecutions(ctx) + return err } func keyvals(proc *process.Process, node *process.Process_Node, parentExec *execution.Execution, event *event.Event, data *types.Struct) []interface{} { diff --git a/orchestrator/orchestrator_event_task_test.go b/orchestrator/orchestrator_event_task_test.go new file mode 100644 index 000000000..5eb98194e --- /dev/null +++ b/orchestrator/orchestrator_event_task_test.go @@ -0,0 +1,84 @@ +package orchestrator + +import ( + "context" + "testing" + "time" + + "github.com/mesg-foundation/engine/execution" + "github.com/mesg-foundation/engine/hash" + "github.com/mesg-foundation/engine/process" + "github.com/mesg-foundation/engine/protobuf/types" + "github.com/stretchr/testify/require" +) + +func TestOrchestratorEventTask(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + orch, ep, store, _, testInstanceHash, _, _, execChan := newTestOrchestrator(ctx, t) + defer orch.Stop() + + var ( + testProcessHash hash.Hash + err error + ) + t.Run("create process", func(t *testing.T) { + testProcessHash, err = store.CreateProcess( + context.Background(), + "result-task-process", + []*process.Process_Node{ + { + Key: "n0", + Type: &process.Process_Node_Event_{ + Event: &process.Process_Node_Event{ + InstanceHash: testInstanceHash, + EventKey: "event_trigger", + }, + }, + }, + { + Key: "n1", + Type: &process.Process_Node_Task_{ + Task: &process.Process_Node_Task{ + InstanceHash: testInstanceHash, + TaskKey: "task1", + }, + }, + }, + }, []*process.Process_Edge{ + {Src: "n0", Dst: "n1"}, + }, + ) + require.NoError(t, err) + }) + t.Run("trigger process", func(t *testing.T) { + _, err := ep.Publish( + testInstanceHash, + "event_trigger", + &types.Struct{ + Fields: 
map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "foo_1", + }, + }, + "timestamp": { + Kind: &types.Value_NumberValue{ + NumberValue: float64(time.Now().Unix()), + }, + }, + }, + }, + ) + require.NoError(t, err) + }) + t.Run("check created execution", func(t *testing.T) { + exec := <-execChan + require.NoError(t, err) + require.Equal(t, "task1", exec.TaskKey) + require.Equal(t, "n1", exec.NodeKey) + require.True(t, testProcessHash.Equal(exec.ProcessHash)) + require.Equal(t, execution.Status_InProgress, exec.Status) + require.Equal(t, "foo_1", exec.Inputs.Fields["msg"].GetStringValue()) + }) +} diff --git a/orchestrator/orchestrator_filter_path_nested_test.go b/orchestrator/orchestrator_filter_path_nested_test.go new file mode 100644 index 000000000..119d01d4d --- /dev/null +++ b/orchestrator/orchestrator_filter_path_nested_test.go @@ -0,0 +1,254 @@ +package orchestrator + +import ( + "context" + "testing" + "time" + + "github.com/mesg-foundation/engine/execution" + "github.com/mesg-foundation/engine/hash" + "github.com/mesg-foundation/engine/process" + "github.com/mesg-foundation/engine/protobuf/types" + "github.com/stretchr/testify/require" +) + +func TestOrchestratorFilterPathNested(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + orch, ep, store, _, testInstanceHash, _, _, execChan := newTestOrchestrator(ctx, t) + defer orch.Stop() + + var ( + testProcessHash hash.Hash + err error + ) + t.Run("create process", func(t *testing.T) { + testProcessHash, err = store.CreateProcess( + context.Background(), + "filter", + []*process.Process_Node{ + { + Key: "n0", + Type: &process.Process_Node_Event_{ + Event: &process.Process_Node_Event{ + InstanceHash: testInstanceHash, + EventKey: "event_complex_trigger", + }, + }, + }, + { + Key: "n1", + Type: &process.Process_Node_Filter_{ + Filter: &process.Process_Node_Filter{ + Conditions: []process.Process_Node_Filter_Condition{ + { + Ref: &process.Process_Node_Reference{ + NodeKey: "n0", + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "msg", + }, + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "msg", + }, + }, + }, + }, + Predicate: process.Process_Node_Filter_Condition_EQ, + Value: &types.Value{ + Kind: &types.Value_StringValue{ + StringValue: "shouldMatch", + }, + }, + }, + { + Ref: &process.Process_Node_Reference{ + NodeKey: "n0", + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "msg", + }, + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "timestamp", + }, + }, + }, + }, + Predicate: process.Process_Node_Filter_Condition_GT, + Value: &types.Value{ + Kind: &types.Value_NumberValue{ + NumberValue: 10, + }, + }, + }, + { + Ref: &process.Process_Node_Reference{ + NodeKey: "n0", + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "msg", + }, + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "array", + }, + }, + }, + }, + Predicate: process.Process_Node_Filter_Condition_EQ, + Value: &types.Value{ + Kind: &types.Value_ListValue{ + ListValue: &types.ListValue{ + Values: []*types.Value{ + { + Kind: &types.Value_StringValue{ + StringValue: "one", + }, + }, + { + Kind: &types.Value_StringValue{ + StringValue: "two", + }, + }, + }, + }, + }, + 
}, + }, + }, + }, + }, + }, + { + Key: "n2", + Type: &process.Process_Node_Task_{ + Task: &process.Process_Node_Task{ + InstanceHash: testInstanceHash, + TaskKey: "task_complex", + }, + }, + }, + }, + []*process.Process_Edge{ + {Src: "n0", Dst: "n1"}, + {Src: "n1", Dst: "n2"}, + }, + ) + require.NoError(t, err) + }) + t.Run("pass filter", func(t *testing.T) { + t.Run("trigger process", func(t *testing.T) { + _, err := ep.Publish( + testInstanceHash, + "event_complex_trigger", + &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StructValue{ + StructValue: &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "shouldMatch", + }, + }, + "timestamp": { + Kind: &types.Value_NumberValue{ + NumberValue: float64(time.Now().Unix()), + }, + }, + "array": { + Kind: &types.Value_ListValue{ + ListValue: &types.ListValue{ + Values: []*types.Value{ + { + Kind: &types.Value_StringValue{ + StringValue: "one", + }, + }, + { + Kind: &types.Value_StringValue{ + StringValue: "two", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + ) + require.NoError(t, err) + }) + t.Run("check created execution", func(t *testing.T) { + exec := <-execChan + require.Equal(t, "task_complex", exec.TaskKey) + require.Equal(t, "n2", exec.NodeKey) + require.True(t, testProcessHash.Equal(exec.ProcessHash)) + require.Equal(t, execution.Status_InProgress, exec.Status) + require.Equal(t, "shouldMatch", exec.Inputs.Fields["msg"].GetStructValue().Fields["msg"].GetStringValue()) + }) + }) + t.Run("stop at filter", func(t *testing.T) { + t.Run("trigger process", func(t *testing.T) { + _, err := ep.Publish( + testInstanceHash, + "event_complex_trigger", + &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StructValue{ + StructValue: &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "shouldNotMatch", + }, + }, + "timestamp": { + Kind: &types.Value_NumberValue{ + NumberValue: float64(time.Now().Unix()), + }, + }, + "array": { + Kind: &types.Value_ListValue{ + ListValue: &types.ListValue{ + Values: []*types.Value{ + { + Kind: &types.Value_StringValue{ + StringValue: "one", + }, + }, + { + Kind: &types.Value_StringValue{ + StringValue: "two", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + ) + require.NoError(t, err) + }) + t.Run("wait timeout to check execution is not created", func(t *testing.T) { + select { + case <-time.After(time.Second): + case <-execChan: + require.FailNow(t, "should timeout") + } + }) + }) +} diff --git a/orchestrator/orchestrator_filter_test.go b/orchestrator/orchestrator_filter_test.go new file mode 100644 index 000000000..9fa312671 --- /dev/null +++ b/orchestrator/orchestrator_filter_test.go @@ -0,0 +1,158 @@ +package orchestrator + +import ( + "context" + "testing" + "time" + + "github.com/mesg-foundation/engine/execution" + "github.com/mesg-foundation/engine/hash" + "github.com/mesg-foundation/engine/process" + "github.com/mesg-foundation/engine/protobuf/types" + "github.com/stretchr/testify/require" +) + +func TestOrchestratorFilterTask(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + orch, ep, store, _, testInstanceHash, _, _, execChan := newTestOrchestrator(ctx, t) + defer orch.Stop() + + var ( + testProcessHash hash.Hash + err error + ) + t.Run("create process", func(t *testing.T) { + testProcessHash, err = store.CreateProcess( + context.Background(), + "filter", + 
[]*process.Process_Node{ + { + Key: "n0", + Type: &process.Process_Node_Event_{ + Event: &process.Process_Node_Event{ + InstanceHash: testInstanceHash, + EventKey: "event_trigger", + }, + }, + }, + { + Key: "n1", + Type: &process.Process_Node_Filter_{ + Filter: &process.Process_Node_Filter{ + Conditions: []process.Process_Node_Filter_Condition{ + { + Ref: &process.Process_Node_Reference{ + NodeKey: "n0", + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "msg", + }, + }, + }, + Predicate: process.Process_Node_Filter_Condition_EQ, + Value: &types.Value{ + Kind: &types.Value_StringValue{ + StringValue: "shouldMatch", + }, + }, + }, + { + Ref: &process.Process_Node_Reference{ + NodeKey: "n0", + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "timestamp", + }, + }, + }, + Predicate: process.Process_Node_Filter_Condition_GT, + Value: &types.Value{ + Kind: &types.Value_NumberValue{ + NumberValue: 10, + }, + }, + }, + }, + }, + }, + }, + { + Key: "n2", + Type: &process.Process_Node_Task_{ + Task: &process.Process_Node_Task{ + InstanceHash: testInstanceHash, + TaskKey: "task1", + }, + }, + }, + }, + []*process.Process_Edge{ + {Src: "n0", Dst: "n1"}, + {Src: "n1", Dst: "n2"}, + }, + ) + require.NoError(t, err) + }) + t.Run("pass filter", func(t *testing.T) { + t.Run("trigger process", func(t *testing.T) { + _, err := ep.Publish( + testInstanceHash, + "event_trigger", + &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "shouldMatch", + }, + }, + "timestamp": { + Kind: &types.Value_NumberValue{ + NumberValue: float64(time.Now().Unix()), + }, + }, + }, + }, + ) + require.NoError(t, err) + }) + t.Run("check created execution", func(t *testing.T) { + exec := <-execChan + require.Equal(t, "task1", exec.TaskKey) + require.Equal(t, "n2", exec.NodeKey) + require.True(t, testProcessHash.Equal(exec.ProcessHash)) + require.Equal(t, execution.Status_InProgress, exec.Status) + require.Equal(t, "shouldMatch", exec.Inputs.Fields["msg"].GetStringValue()) + }) + }) + t.Run("stop at filter", func(t *testing.T) { + t.Run("trigger process", func(t *testing.T) { + _, err := ep.Publish( + testInstanceHash, + "event_trigger", + &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "shouldNotMatch", + }, + }, + "timestamp": { + Kind: &types.Value_NumberValue{ + NumberValue: float64(time.Now().Unix()), + }, + }, + }, + }, + ) + require.NoError(t, err) + }) + t.Run("wait timeout to check execution is not created", func(t *testing.T) { + select { + case <-time.After(time.Second): + case <-execChan: + require.FailNow(t, "should timeout") + } + }) + }) +} diff --git a/orchestrator/orchestrator_map_const_test.go b/orchestrator/orchestrator_map_const_test.go new file mode 100644 index 000000000..ec37d54ee --- /dev/null +++ b/orchestrator/orchestrator_map_const_test.go @@ -0,0 +1,99 @@ +package orchestrator + +import ( + "context" + "testing" + "time" + + "github.com/mesg-foundation/engine/execution" + "github.com/mesg-foundation/engine/hash" + "github.com/mesg-foundation/engine/process" + "github.com/mesg-foundation/engine/protobuf/types" + "github.com/stretchr/testify/require" +) + +func TestOrchestratorMapConst(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + orch, ep, store, _, testInstanceHash, _, _, execChan := newTestOrchestrator(ctx, t) + defer 
orch.Stop() + + var ( + testProcessHash hash.Hash + err error + ) + t.Run("create process", func(t *testing.T) { + testProcessHash, err = store.CreateProcess( + context.Background(), + "map-const", + []*process.Process_Node{ + { + Key: "n0", + Type: &process.Process_Node_Event_{ + Event: &process.Process_Node_Event{ + InstanceHash: testInstanceHash, + EventKey: "event_trigger", + }, + }, + }, + { + Key: "n1", + Type: &process.Process_Node_Map_{ + Map: &process.Process_Node_Map{ + Outputs: map[string]*process.Process_Node_Map_Output{ + "msg": { + Value: &process.Process_Node_Map_Output_StringConst{ + StringConst: "itsAConstant", + }, + }, + }, + }, + }, + }, + { + Key: "n2", + Type: &process.Process_Node_Task_{ + Task: &process.Process_Node_Task{ + InstanceHash: testInstanceHash, + TaskKey: "task1", + }, + }, + }, + }, + []*process.Process_Edge{ + {Src: "n0", Dst: "n1"}, + {Src: "n1", Dst: "n2"}, + }, + ) + require.NoError(t, err) + }) + t.Run("trigger process", func(t *testing.T) { + _, err := ep.Publish( + testInstanceHash, + "event_trigger", + &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "whatever", + }, + }, + "timestamp": { + Kind: &types.Value_NumberValue{ + NumberValue: float64(time.Now().Unix()), + }, + }, + }, + }, + ) + require.NoError(t, err) + }) + t.Run("check created execution", func(t *testing.T) { + exec := <-execChan + require.Equal(t, "task1", exec.TaskKey) + require.Equal(t, "n2", exec.NodeKey) + require.True(t, testProcessHash.Equal(exec.ProcessHash)) + require.Equal(t, execution.Status_InProgress, exec.Status) + require.Equal(t, "itsAConstant", exec.Inputs.Fields["msg"].GetStringValue()) + }) +} diff --git a/orchestrator/orchestrator_nested_data_test.go b/orchestrator/orchestrator_nested_data_test.go new file mode 100644 index 000000000..72681700c --- /dev/null +++ b/orchestrator/orchestrator_nested_data_test.go @@ -0,0 +1,105 @@ +package orchestrator + +import ( + "context" + "testing" + "time" + + "github.com/mesg-foundation/engine/execution" + "github.com/mesg-foundation/engine/hash" + "github.com/mesg-foundation/engine/process" + "github.com/mesg-foundation/engine/protobuf/types" + "github.com/stretchr/testify/require" +) + +func TestOrchestratorNestedData(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + orch, ep, store, _, testInstanceHash, _, _, execChan := newTestOrchestrator(ctx, t) + defer orch.Stop() + + var ( + testProcessHash hash.Hash + err error + ) + t.Run("create process", func(t *testing.T) { + testProcessHash, err = store.CreateProcess( + context.Background(), + "nested-data", + []*process.Process_Node{ + { + Key: "n0", + Type: &process.Process_Node_Event_{ + Event: &process.Process_Node_Event{ + InstanceHash: testInstanceHash, + EventKey: "event_complex_trigger", + }, + }, + }, + { + Key: "n1", + Type: &process.Process_Node_Task_{ + Task: &process.Process_Node_Task{ + InstanceHash: testInstanceHash, + TaskKey: "task_complex", + }, + }, + }, + }, + []*process.Process_Edge{ + {Src: "n0", Dst: "n1"}, + }, + ) + require.NoError(t, err) + }) + t.Run("trigger process", func(t *testing.T) { + _, err := ep.Publish( + testInstanceHash, + "event_complex_trigger", + &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StructValue{ + StructValue: &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "complex", + }, + }, + "timestamp": { + Kind: &types.Value_NumberValue{ + 
NumberValue: float64(time.Now().Unix()), + }, + }, + "array": { + Kind: &types.Value_ListValue{ + ListValue: &types.ListValue{Values: []*types.Value{ + {Kind: &types.Value_StringValue{StringValue: "first"}}, + {Kind: &types.Value_StringValue{StringValue: "second"}}, + {Kind: &types.Value_StringValue{StringValue: "third"}}, + }}, + }, + }, + }, + }, + }, + }, + }, + }, + ) + require.NoError(t, err) + }) + t.Run("check created execution", func(t *testing.T) { + exec := <-execChan + require.Equal(t, "task_complex", exec.TaskKey) + require.Equal(t, "n1", exec.NodeKey) + require.True(t, testProcessHash.Equal(exec.ProcessHash)) + require.Equal(t, execution.Status_InProgress, exec.Status) + require.Equal(t, "complex", exec.Inputs.Fields["msg"].GetStructValue().Fields["msg"].GetStringValue()) + require.Len(t, exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values, 3) + require.Equal(t, "first", exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[0].GetStringValue()) + require.Equal(t, "second", exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[1].GetStringValue()) + require.Equal(t, "third", exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[2].GetStringValue()) + }) +} diff --git a/orchestrator/orchestrator_nested_map_test.go b/orchestrator/orchestrator_nested_map_test.go new file mode 100644 index 000000000..c91aba2ee --- /dev/null +++ b/orchestrator/orchestrator_nested_map_test.go @@ -0,0 +1,132 @@ +package orchestrator + +import ( + "context" + "testing" + + "github.com/mesg-foundation/engine/execution" + "github.com/mesg-foundation/engine/hash" + "github.com/mesg-foundation/engine/process" + "github.com/mesg-foundation/engine/protobuf/types" + "github.com/stretchr/testify/require" +) + +func TestOrchestratorNestedMap(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + orch, ep, store, _, testInstanceHash, _, _, execChan := newTestOrchestrator(ctx, t) + defer orch.Stop() + + var ( + testProcessHash hash.Hash + err error + ) + t.Run("create process", func(t *testing.T) { + testProcessHash, err = store.CreateProcess( + context.Background(), + "nested-map", + []*process.Process_Node{ + { + Key: "n0", + Type: &process.Process_Node_Event_{ + Event: &process.Process_Node_Event{ + InstanceHash: testInstanceHash, + EventKey: "event_complex_trigger", + }, + }, + }, + { + Key: "n1", + Type: &process.Process_Node_Map_{ + Map: &process.Process_Node_Map{ + Outputs: map[string]*process.Process_Node_Map_Output{ + "msg": { + Value: &process.Process_Node_Map_Output_Map_{ + Map: &process.Process_Node_Map_Output_Map{Outputs: map[string]*process.Process_Node_Map_Output{ + "msg": {Value: &process.Process_Node_Map_Output_StringConst{ + StringConst: "isAConstant", + }}, + "array": {Value: &process.Process_Node_Map_Output_List_{ + List: &process.Process_Node_Map_Output_List{Outputs: []*process.Process_Node_Map_Output{ + {Value: &process.Process_Node_Map_Output_StringConst{StringConst: "first-constant"}}, + {Value: &process.Process_Node_Map_Output_StringConst{StringConst: "second-constant"}}, + {Value: &process.Process_Node_Map_Output_StringConst{StringConst: "third-constant"}}, + {Value: &process.Process_Node_Map_Output_StringConst{StringConst: "fourth-constant"}}, + }}, + }}, + }}, + }, + }, + }, + }, + }, + }, + { + Key: "n2", + Type: &process.Process_Node_Task_{ + Task: &process.Process_Node_Task{ + InstanceHash: testInstanceHash, + TaskKey: "task_complex", + 
}, + }, + }, + }, + []*process.Process_Edge{ + {Src: "n0", Dst: "n1"}, + {Src: "n1", Dst: "n2"}, + }, + ) + require.NoError(t, err) + }) + t.Run("trigger process", func(t *testing.T) { + _, err := ep.Publish( + testInstanceHash, + "event_complex_trigger", + &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StructValue{ + StructValue: &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "complex", + }, + }, + "timestamp": { + Kind: &types.Value_NumberValue{ + NumberValue: 101, + }, + }, + "array": { + Kind: &types.Value_ListValue{ + ListValue: &types.ListValue{Values: []*types.Value{ + {Kind: &types.Value_StringValue{StringValue: "first"}}, + {Kind: &types.Value_StringValue{StringValue: "second"}}, + {Kind: &types.Value_StringValue{StringValue: "third"}}, + }}, + }, + }, + }, + }, + }, + }, + }, + }, + ) + require.NoError(t, err) + }) + t.Run("check created execution", func(t *testing.T) { + exec := <-execChan + require.Equal(t, "task_complex", exec.TaskKey) + require.Equal(t, "n2", exec.NodeKey) + require.True(t, testProcessHash.Equal(exec.ProcessHash)) + require.Equal(t, execution.Status_InProgress, exec.Status) + require.Equal(t, "isAConstant", exec.Inputs.Fields["msg"].GetStructValue().Fields["msg"].GetStringValue()) + require.Len(t, exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values, 4) + require.Equal(t, "first-constant", exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[0].GetStringValue()) + require.Equal(t, "second-constant", exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[1].GetStringValue()) + require.Equal(t, "third-constant", exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[2].GetStringValue()) + require.Equal(t, "fourth-constant", exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[3].GetStringValue()) + }) +} diff --git a/orchestrator/orchestrator_ref_grand_parent_task_test.go b/orchestrator/orchestrator_ref_grand_parent_task_test.go new file mode 100644 index 000000000..db37b0c53 --- /dev/null +++ b/orchestrator/orchestrator_ref_grand_parent_task_test.go @@ -0,0 +1,232 @@ +package orchestrator + +import ( + "context" + "testing" + "time" + + "github.com/mesg-foundation/engine/execution" + "github.com/mesg-foundation/engine/hash" + "github.com/mesg-foundation/engine/process" + "github.com/mesg-foundation/engine/protobuf/types" + "github.com/stretchr/testify/require" +) + +func TestOrchestratorRefGrandParentTask(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + orch, ep, store, _, testInstanceHash, _, _, execChan := newTestOrchestrator(ctx, t) + defer orch.Stop() + + var ( + testProcessHash hash.Hash + err error + ) + t.Run("create process", func(t *testing.T) { + testProcessHash, err = store.CreateProcess( + context.Background(), + "nested-map", + []*process.Process_Node{ + { + Key: "n0", + Type: &process.Process_Node_Event_{ + Event: &process.Process_Node_Event{ + InstanceHash: testInstanceHash, + EventKey: "event_trigger", + }, + }, + }, + { + Key: "n1", + Type: &process.Process_Node_Task_{ + Task: &process.Process_Node_Task{ + InstanceHash: testInstanceHash, + TaskKey: "task1", + }, + }, + }, + { + Key: "n2", + Type: &process.Process_Node_Map_{ + Map: &process.Process_Node_Map{ + Outputs: map[string]*process.Process_Node_Map_Output{ + "msg": { + Value: 
&process.Process_Node_Map_Output_StringConst{ + StringConst: "itsAConstant", + }, + }, + }, + }, + }, + }, + { + Key: "n3", + Type: &process.Process_Node_Task_{ + Task: &process.Process_Node_Task{ + InstanceHash: testInstanceHash, + TaskKey: "task1", + }, + }, + }, + { + Key: "n4", + Type: &process.Process_Node_Map_{ + Map: &process.Process_Node_Map{ + Outputs: map[string]*process.Process_Node_Map_Output{ + "msg": { + Value: &process.Process_Node_Map_Output_Ref{ + Ref: &process.Process_Node_Reference{ + NodeKey: "n1", + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "msg", + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + Key: "n5", + Type: &process.Process_Node_Task_{ + Task: &process.Process_Node_Task{ + InstanceHash: testInstanceHash, + TaskKey: "task1", + }, + }, + }, + }, + []*process.Process_Edge{ + {Src: "n0", Dst: "n1"}, + {Src: "n1", Dst: "n2"}, + {Src: "n2", Dst: "n3"}, + {Src: "n3", Dst: "n4"}, + {Src: "n4", Dst: "n5"}, + }, + ) + require.NoError(t, err) + }) + t.Run("trigger process", func(t *testing.T) { + _, err := ep.Publish( + testInstanceHash, + "event_trigger", + &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "foo_event", + }, + }, + "timestamp": { + Kind: &types.Value_NumberValue{ + NumberValue: float64(time.Now().Unix()), + }, + }, + }, + }, + ) + require.NoError(t, err) + }) + t.Run("check first task", func(t *testing.T) { + var exec *execution.Execution + t.Run("check created execution", func(t *testing.T) { + exec = <-execChan + require.Equal(t, "n1", exec.NodeKey) + require.Equal(t, "task1", exec.TaskKey) + require.True(t, testProcessHash.Equal(exec.ProcessHash)) + require.Equal(t, execution.Status_InProgress, exec.Status) + require.Equal(t, "foo_event", exec.Inputs.Fields["msg"].GetStringValue()) + }) + t.Run("update exec", func(t *testing.T) { + err := store.UpdateExecution( + context.Background(), + exec.Hash, + time.Now().UnixNano(), + time.Now().UnixNano(), + &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "foo_event", + }, + }, + "timestamp": { + Kind: &types.Value_NumberValue{ + NumberValue: float64(time.Now().Unix()), + }, + }, + }, + }, + "", + ) + require.NoError(t, err) + }) + t.Run("check completed execution", func(t *testing.T) { + exec := <-execChan + require.Equal(t, "n1", exec.NodeKey) + require.Equal(t, "task1", exec.TaskKey) + require.True(t, testProcessHash.Equal(exec.ProcessHash)) + require.Equal(t, execution.Status_Completed, exec.Status) + require.Equal(t, "foo_event", exec.Outputs.Fields["msg"].GetStringValue()) + require.NotEmpty(t, exec.Outputs.Fields["timestamp"].GetNumberValue()) + }) + }) + t.Run("check second task", func(t *testing.T) { + var exec *execution.Execution + t.Run("check created execution", func(t *testing.T) { + exec = <-execChan + require.Equal(t, "n3", exec.NodeKey) + require.Equal(t, "task1", exec.TaskKey) + require.True(t, testProcessHash.Equal(exec.ProcessHash)) + require.Equal(t, execution.Status_InProgress, exec.Status) + require.Equal(t, "itsAConstant", exec.Inputs.Fields["msg"].GetStringValue()) + }) + t.Run("update exec", func(t *testing.T) { + err := store.UpdateExecution( + context.Background(), + exec.Hash, + time.Now().UnixNano(), + time.Now().UnixNano(), + &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "itsAConstant", + }, + }, + "timestamp": { + Kind: 
&types.Value_NumberValue{ + NumberValue: float64(time.Now().Unix()), + }, + }, + }, + }, + "", + ) + require.NoError(t, err) + }) + t.Run("check completed execution", func(t *testing.T) { + exec := <-execChan + require.Equal(t, "n3", exec.NodeKey) + require.Equal(t, "task1", exec.TaskKey) + require.True(t, testProcessHash.Equal(exec.ProcessHash)) + require.Equal(t, execution.Status_Completed, exec.Status) + require.Equal(t, "itsAConstant", exec.Outputs.Fields["msg"].GetStringValue()) + require.NotEmpty(t, exec.Outputs.Fields["timestamp"].GetNumberValue()) + }) + }) + t.Run("check third task", func(t *testing.T) { + var exec *execution.Execution + t.Run("check created execution", func(t *testing.T) { + exec = <-execChan + require.Equal(t, "n5", exec.NodeKey) + require.Equal(t, "task1", exec.TaskKey) + require.True(t, testProcessHash.Equal(exec.ProcessHash)) + require.Equal(t, execution.Status_InProgress, exec.Status) + require.Equal(t, "foo_event", exec.Inputs.Fields["msg"].GetStringValue()) + }) + }) +} diff --git a/orchestrator/orchestrator_ref_path_nested_test.go b/orchestrator/orchestrator_ref_path_nested_test.go new file mode 100644 index 000000000..adf902dd2 --- /dev/null +++ b/orchestrator/orchestrator_ref_path_nested_test.go @@ -0,0 +1,319 @@ +package orchestrator + +import ( + "context" + "testing" + "time" + + "github.com/mesg-foundation/engine/execution" + "github.com/mesg-foundation/engine/hash" + "github.com/mesg-foundation/engine/process" + "github.com/mesg-foundation/engine/protobuf/types" + "github.com/stretchr/testify/require" +) + +func TestOrchestratorRefPathNested(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + orch, ep, store, _, testInstanceHash, _, _, execChan := newTestOrchestrator(ctx, t) + defer orch.Stop() + + var ( + testProcessHash hash.Hash + err error + ) + t.Run("create process", func(t *testing.T) { + testProcessHash, err = store.CreateProcess( + context.Background(), + "nested-path-data", + []*process.Process_Node{ + { + Key: "n0", + Type: &process.Process_Node_Event_{ + Event: &process.Process_Node_Event{ + InstanceHash: testInstanceHash, + EventKey: "event_complex_trigger", + }, + }, + }, + { + Key: "n1", + Type: &process.Process_Node_Map_{ + Map: &process.Process_Node_Map{ + Outputs: map[string]*process.Process_Node_Map_Output{ + "msg": { + Value: &process.Process_Node_Map_Output_Map_{ + Map: &process.Process_Node_Map_Output_Map{ + Outputs: map[string]*process.Process_Node_Map_Output{ + "msg": { + Value: &process.Process_Node_Map_Output_Ref{ + Ref: &process.Process_Node_Reference{ + NodeKey: "n0", + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "msg", + }, + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "msg", + }, + }, + }, + }, + }, + }, + "array": { + Value: &process.Process_Node_Map_Output_List_{ + List: &process.Process_Node_Map_Output_List{ + Outputs: []*process.Process_Node_Map_Output{ + { + Value: &process.Process_Node_Map_Output_Ref{ + Ref: &process.Process_Node_Reference{ + NodeKey: "n0", + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "msg", + }, + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "array", + }, + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Index{ + Index: 2, + }, + }, + }, + }, + }, + }, + }, + { + Value: 
&process.Process_Node_Map_Output_Ref{ + Ref: &process.Process_Node_Reference{ + NodeKey: "n0", + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "msg", + }, + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "array", + }, + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Index{ + Index: 1, + }, + }, + }, + }, + }, + }, + }, + { + Value: &process.Process_Node_Map_Output_Ref{ + Ref: &process.Process_Node_Reference{ + NodeKey: "n0", + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "msg", + }, + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "array", + }, + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Index{ + Index: 0, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + Key: "n2", + Type: &process.Process_Node_Task_{ + Task: &process.Process_Node_Task{ + InstanceHash: testInstanceHash, + TaskKey: "task_complex", + }, + }, + }, + { + Key: "n3", + Type: &process.Process_Node_Map_{ + Map: &process.Process_Node_Map{ + Outputs: map[string]*process.Process_Node_Map_Output{ + "msg": { + Value: &process.Process_Node_Map_Output_Ref{ + Ref: &process.Process_Node_Reference{ + NodeKey: "n2", + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "msg", + }, + Path: &process.Process_Node_Reference_Path{ + Selector: &process.Process_Node_Reference_Path_Key{ + Key: "msg", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + Key: "n4", + Type: &process.Process_Node_Task_{ + Task: &process.Process_Node_Task{ + InstanceHash: testInstanceHash, + TaskKey: "task1", + }, + }, + }, + }, + []*process.Process_Edge{ + {Src: "n0", Dst: "n1"}, + {Src: "n1", Dst: "n2"}, + {Src: "n2", Dst: "n3"}, + {Src: "n3", Dst: "n4"}, + }, + ) + require.NoError(t, err) + }) + t.Run("trigger process", func(t *testing.T) { + _, err := ep.Publish( + testInstanceHash, + "event_complex_trigger", + &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StructValue{ + StructValue: &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "complex", + }, + }, + "timestamp": { + Kind: &types.Value_NumberValue{ + NumberValue: float64(time.Now().Unix()), + }, + }, + "array": { + Kind: &types.Value_ListValue{ + ListValue: &types.ListValue{Values: []*types.Value{ + {Kind: &types.Value_StringValue{StringValue: "first"}}, + {Kind: &types.Value_StringValue{StringValue: "second"}}, + {Kind: &types.Value_StringValue{StringValue: "third"}}, + }}, + }, + }, + }, + }, + }, + }, + }, + }, + ) + require.NoError(t, err) + }) + t.Run("first ref", func(t *testing.T) { + var exec *execution.Execution + t.Run("check created execution", func(t *testing.T) { + exec = <-execChan + require.Equal(t, "task_complex", exec.TaskKey) + require.Equal(t, "n2", exec.NodeKey) + require.True(t, testProcessHash.Equal(exec.ProcessHash)) + require.Equal(t, execution.Status_InProgress, exec.Status) + require.Equal(t, "complex", exec.Inputs.Fields["msg"].GetStructValue().Fields["msg"].GetStringValue()) + require.Len(t, exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values, 3) + require.Equal(t, "third", 
exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[0].GetStringValue()) + require.Equal(t, "second", exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[1].GetStringValue()) + require.Equal(t, "first", exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[2].GetStringValue()) + }) + t.Run("update exec", func(t *testing.T) { + err := store.UpdateExecution( + context.Background(), + exec.Hash, + time.Now().UnixNano(), + time.Now().UnixNano(), + &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StructValue{ + StructValue: &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "complex", + }, + }, + "timestamp": { + Kind: &types.Value_NumberValue{ + NumberValue: float64(time.Now().Unix()), + }, + }, + "array": { + Kind: &types.Value_ListValue{ + ListValue: &types.ListValue{Values: []*types.Value{ + {Kind: &types.Value_StringValue{StringValue: "third"}}, + {Kind: &types.Value_StringValue{StringValue: "second"}}, + {Kind: &types.Value_StringValue{StringValue: "first"}}, + }}, + }, + }, + }, + }, + }, + }, + }, + }, + "", + ) + require.NoError(t, err) + }) + t.Run("check completed execution", func(t *testing.T) { + exec := <-execChan + require.Equal(t, "task_complex", exec.TaskKey) + require.Equal(t, "n2", exec.NodeKey) + require.True(t, testProcessHash.Equal(exec.ProcessHash)) + require.Equal(t, execution.Status_Completed, exec.Status) + require.Equal(t, "complex", exec.Outputs.Fields["msg"].GetStructValue().Fields["msg"].GetStringValue()) + require.Len(t, exec.Outputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values, 3) + require.Equal(t, "third", exec.Outputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[0].GetStringValue()) + require.Equal(t, "second", exec.Outputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[1].GetStringValue()) + require.Equal(t, "first", exec.Outputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[2].GetStringValue()) + require.NotEmpty(t, exec.Outputs.Fields["msg"].GetStructValue().Fields["timestamp"].GetNumberValue()) + }) + }) + t.Run("second ref", func(t *testing.T) { + var exec *execution.Execution + t.Run("check created execution", func(t *testing.T) { + exec = <-execChan + require.Equal(t, "task1", exec.TaskKey) + require.Equal(t, "n4", exec.NodeKey) + require.True(t, testProcessHash.Equal(exec.ProcessHash)) + require.Equal(t, execution.Status_InProgress, exec.Status) + require.Equal(t, "complex", exec.Inputs.Fields["msg"].GetStringValue()) + }) + }) +} diff --git a/orchestrator/orchestrator_result_task_test.go b/orchestrator/orchestrator_result_task_test.go new file mode 100644 index 000000000..60a82b582 --- /dev/null +++ b/orchestrator/orchestrator_result_task_test.go @@ -0,0 +1,141 @@ +package orchestrator + +import ( + "context" + "testing" + "time" + + "github.com/mesg-foundation/engine/execution" + "github.com/mesg-foundation/engine/hash" + "github.com/mesg-foundation/engine/process" + "github.com/mesg-foundation/engine/protobuf/types" + "github.com/stretchr/testify/require" +) + +func TestOrchestratorResultTask(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + orch, _, store, _, testInstanceHash, testRunnerHash, _, execChan := newTestOrchestrator(ctx, t) + defer orch.Stop() + + var ( + testProcessHash hash.Hash + triggerExecHash hash.Hash + err error + ) + 
t.Run("create process", func(t *testing.T) { + testProcessHash, err = store.CreateProcess( + context.Background(), + "result-task-process", + []*process.Process_Node{ + { + Key: "n0", + Type: &process.Process_Node_Result_{ + Result: &process.Process_Node_Result{ + InstanceHash: testInstanceHash, + TaskKey: "task1", + }, + }, + }, + { + Key: "n1", + Type: &process.Process_Node_Task_{ + Task: &process.Process_Node_Task{ + InstanceHash: testInstanceHash, + TaskKey: "task2", + }, + }, + }, + }, []*process.Process_Edge{ + {Src: "n0", Dst: "n1"}, + }, + ) + require.NoError(t, err) + }) + t.Run("trigger process", func(t *testing.T) { + eventHash, err := hash.Random() + require.NoError(t, err) + triggerExecHash, err = store.CreateExecution( + context.Background(), + "task1", + &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "foo_2", + }, + }, + }, + }, + nil, + nil, + eventHash, + testProcessHash, + "", + testRunnerHash, + ) + require.NoError(t, err) + }) + t.Run("check trigger process execution", func(t *testing.T) { + t.Run("in progress", func(t *testing.T) { + exec := <-execChan + require.NoError(t, err) + require.Equal(t, triggerExecHash, exec.Hash) + require.Equal(t, "task1", exec.TaskKey) + require.Equal(t, "", exec.NodeKey) + require.Equal(t, execution.Status_InProgress, exec.Status) + require.True(t, exec.Inputs.Equal(&types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "foo_2", + }, + }, + }, + })) + }) + t.Run("update exec", func(t *testing.T) { + err := store.UpdateExecution( + context.Background(), + triggerExecHash, + time.Now().UnixNano(), + time.Now().UnixNano(), + &types.Struct{ + Fields: map[string]*types.Value{ + "msg": { + Kind: &types.Value_StringValue{ + StringValue: "foo_2", + }, + }, + "timestamp": { + Kind: &types.Value_NumberValue{ + NumberValue: float64(time.Now().Unix()), + }, + }, + }, + }, + "", + ) + require.NoError(t, err) + }) + t.Run("completed", func(t *testing.T) { + exec := <-execChan + require.NoError(t, err) + require.Equal(t, triggerExecHash, exec.Hash) + require.Equal(t, "task1", exec.TaskKey) + require.Equal(t, "", exec.NodeKey) + require.Equal(t, execution.Status_Completed, exec.Status) + require.Equal(t, "foo_2", exec.Outputs.Fields["msg"].GetStringValue()) + require.NotEmpty(t, exec.Outputs.Fields["timestamp"].GetNumberValue()) + }) + }) + t.Run("check created execution", func(t *testing.T) { + exec := <-execChan + require.NoError(t, err) + require.Equal(t, "task2", exec.TaskKey) + require.Equal(t, "n1", exec.NodeKey) + require.Equal(t, testProcessHash, exec.ProcessHash) + require.Equal(t, execution.Status_InProgress, exec.Status) + require.Equal(t, "foo_2", exec.Inputs.Fields["msg"].GetStringValue()) + }) +} diff --git a/orchestrator/orchestrator_srv_test.go b/orchestrator/orchestrator_srv_test.go new file mode 100644 index 000000000..1b42521b6 --- /dev/null +++ b/orchestrator/orchestrator_srv_test.go @@ -0,0 +1,221 @@ +package orchestrator + +import ( + "context" + + sdktypes "github.com/cosmos/cosmos-sdk/types" + "github.com/mesg-foundation/engine/hash" + "github.com/mesg-foundation/engine/service" +) + +func createTestService(store *storeTest) (hash.Hash, error) { + taskPrice := &service.Service_Task_Price{ + PerCall: sdktypes.NewInt(0), + PerKB: sdktypes.NewInt(0), + PerSec: sdktypes.NewInt(0), + } + task1Price := &service.Service_Task_Price{ + PerCall: sdktypes.NewInt(1000), + PerKB: sdktypes.NewInt(1000), + PerSec: 
sdktypes.NewInt(30000), + } + return store.CreateService(context.Background(), + "test-service", + "test-service", + "", + service.Service_Configuration{ + Env: []string{"FOO=1", "BAR=2", "REQUIRED"}, + }, []*service.Service_Task{ + { + Key: "task_trigger", + Price: taskPrice, + Inputs: []*service.Service_Parameter{ + { + Key: "msg", + Type: "String", + }, + }, + Outputs: []*service.Service_Parameter{ + { + Key: "msg", + Type: "String", + }, + { + Key: "timestamp", + Type: "Number", + }, + }, + }, + { + Key: "task1", + Price: task1Price, + Inputs: []*service.Service_Parameter{ + { + Key: "msg", + Type: "String", + }, + }, + Outputs: []*service.Service_Parameter{ + { + Key: "msg", + Type: "String", + }, + { + Key: "timestamp", + Type: "Number", + }, + }, + }, + { + Key: "task2", + Price: taskPrice, + Inputs: []*service.Service_Parameter{ + { + Key: "msg", + Type: "String", + }, + }, + Outputs: []*service.Service_Parameter{ + { + Key: "msg", + Type: "String", + }, + { + Key: "timestamp", + Type: "Number", + }, + }, + }, + { + Key: "task_complex", + Price: taskPrice, + Inputs: []*service.Service_Parameter{ + { + Key: "msg", + Type: "Object", + Object: []*service.Service_Parameter{ + { + Key: "msg", + Type: "String", + }, + { + Key: "array", + Type: "String", + Repeated: true, + Optional: true, + }, + }, + }, + }, + Outputs: []*service.Service_Parameter{ + { + Key: "msg", + Type: "Object", + Object: []*service.Service_Parameter{ + { + Key: "msg", + Type: "String", + }, + { + Key: "timestamp", + Type: "Number", + }, + { + Key: "array", + Type: "String", + Repeated: true, + Optional: true, + }, + }, + }, + }, + }, + { + Key: "task_complex_trigger", + Price: taskPrice, + Inputs: []*service.Service_Parameter{ + { + Key: "msg", + Type: "Object", + Object: []*service.Service_Parameter{ + { + Key: "msg", + Type: "String", + }, + { + Key: "array", + Type: "String", + Repeated: true, + Optional: true, + }, + }, + }, + }, + Outputs: []*service.Service_Parameter{ + { + Key: "msg", + Type: "Object", + Object: []*service.Service_Parameter{ + { + Key: "msg", + Type: "String", + }, + { + Key: "timestamp", + Type: "Number", + }, + { + Key: "array", + Type: "String", + Repeated: true, + Optional: true, + }, + }, + }, + }, + }}, []*service.Service_Event{ + { + Key: "service_ready", + }, + { + Key: "event_trigger", + Data: []*service.Service_Parameter{ + { + Key: "msg", + Type: "String", + }, + { + Key: "timestamp", + Type: "Number", + }, + }, + }, + { + Key: "event_complex_trigger", + Data: []*service.Service_Parameter{ + { + Key: "msg", + Type: "Object", + Object: []*service.Service_Parameter{ + { + Key: "msg", + Type: "String", + }, + { + Key: "timestamp", + Type: "Number", + }, + { + Key: "array", + Type: "String", + Repeated: true, + }, + }, + }, + }, + }}, + nil, + "", + "NOT_NEEDED", + ) +} diff --git a/orchestrator/orchestrator_test.go b/orchestrator/orchestrator_test.go index f260cc3a8..eee700e87 100644 --- a/orchestrator/orchestrator_test.go +++ b/orchestrator/orchestrator_test.go @@ -1,5 +1,65 @@ package orchestrator +import ( + "context" + "os" + "testing" + + "github.com/mesg-foundation/engine/event/publisher" + "github.com/mesg-foundation/engine/execution" + "github.com/mesg-foundation/engine/hash" + "github.com/mesg-foundation/engine/instance" + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/libs/log" +) + +func newTestOrchestrator(ctx context.Context, t *testing.T) ( + orch *Orchestrator, + ep *publisher.EventPublisher, + store *storeTest, + serviceHash hash.Hash, + 
instanceHash hash.Hash, + runnerHash hash.Hash, + instanceEnvHash hash.Hash, + execChan <-chan *execution.Execution, +) { + var err error + logger := log.NewTMJSONLogger(log.NewSyncWriter(os.Stdout)) + store, err = newStoreTest() + require.NoError(t, err) + ep = publisher.New(store) + orch = New(store, ep, logger) + + go func() { + require.NoError(t, orch.Start()) + }() + + t.Run("create service", func(t *testing.T) { + serviceHash, err = createTestService(store) + require.NoError(t, err) + }) + + t.Run("create runner", func(t *testing.T) { + instanceEnvHash, err = hash.Random() + require.NoError(t, err) + runnerHash, err = store.RegisterRunner(context.Background(), serviceHash, instanceEnvHash) + require.NoError(t, err) + }) + + t.Run("generate instance hash", func(t *testing.T) { + inst, err := instance.New(serviceHash, instanceEnvHash) + require.NoError(t, err) + instanceHash = inst.Hash + }) + + t.Run("init execution subscriber", func(t *testing.T) { + execChan, err = store.SubscribeToExecutions(ctx) + require.NoError(t, err) + }) + + return +} + // XXX: comment test because they were using sdk mocks. // we don't have sdk now so for now keeping it commented // TODO: add them later. diff --git a/orchestrator/store_test.go b/orchestrator/store_test.go new file mode 100644 index 000000000..a268cdd13 --- /dev/null +++ b/orchestrator/store_test.go @@ -0,0 +1,287 @@ +package orchestrator + +import ( + "context" + "errors" + "fmt" + + sdktypes "github.com/cosmos/cosmos-sdk/types" + "github.com/cskr/pubsub" + "github.com/mesg-foundation/engine/cosmos" + "github.com/mesg-foundation/engine/execution" + "github.com/mesg-foundation/engine/hash" + "github.com/mesg-foundation/engine/instance" + "github.com/mesg-foundation/engine/process" + "github.com/mesg-foundation/engine/protobuf/types" + "github.com/mesg-foundation/engine/runner" + "github.com/mesg-foundation/engine/service" +) + +const ( + pubsubExecTopic = "exec" +) + +type storeTest struct { + processes []*process.Process + executions []*execution.Execution + services []*service.Service + instances []*instance.Instance + runners []*runner.Runner + + pubsub *pubsub.PubSub + runnerOwner string + procPaymentAddress sdktypes.AccAddress +} + +func init() { + cosmos.InitConfig() +} + +func newStoreTest() (*storeTest, error) { + procPaymentAddress, err := sdktypes.AccAddressFromBech32("mesg1t9h20sn3lk2jdnak5eea4lkqxkpwyfaadtqk4t") + if err != nil { + return nil, err + } + return &storeTest{ + processes: make([]*process.Process, 0), + executions: make([]*execution.Execution, 0), + services: make([]*service.Service, 0), + instances: make([]*instance.Instance, 0), + runners: make([]*runner.Runner, 0), + + pubsub: pubsub.New(0), + runnerOwner: "mesg1s6mqusxaq93d70jeekqehg7aepwt7zs306ctq7", + procPaymentAddress: procPaymentAddress, + }, nil +} + +func (s *storeTest) CreateProcess(ctx context.Context, name string, nodes []*process.Process_Node, edges []*process.Process_Edge) (hash.Hash, error) { + proc, err := process.New(name, nodes, edges, s.procPaymentAddress) + if err != nil { + return nil, err + } + s.processes = append(s.processes, proc) + return proc.Hash, nil +} + +func (s *storeTest) CreateService(ctx context.Context, sid, name, description string, configuration service.Service_Configuration, tasks []*service.Service_Task, events []*service.Service_Event, dependencies []*service.Service_Dependency, repository, source string) (hash.Hash, error) { + srv, err := service.New(sid, name, description, configuration, tasks, events, dependencies, 
repository, source) + if err != nil { + return nil, err + } + s.services = append(s.services, srv) + return srv.Hash, nil +} + +func (s *storeTest) FetchProcesses(ctx context.Context) ([]*process.Process, error) { + return s.processes, nil +} + +// FetchExecution returns an execution from its hash. +func (s *storeTest) FetchExecution(ctx context.Context, hash hash.Hash) (*execution.Execution, error) { + for _, exec := range s.executions { + if exec.Hash.Equal(hash) { + return exec, nil + } + } + return nil, fmt.Errorf("execution %q not found", hash) +} + +// FetchService returns a service from its hash. +func (s *storeTest) FetchService(ctx context.Context, hash hash.Hash) (*service.Service, error) { + for _, srv := range s.services { + if srv.Hash.Equal(hash) { + return srv, nil + } + } + return nil, fmt.Errorf("service %q not found", hash) +} + +// FetchInstance returns an instance from its hash. +func (s *storeTest) FetchInstance(ctx context.Context, hash hash.Hash) (*instance.Instance, error) { + for _, inst := range s.instances { + if inst.Hash.Equal(hash) { + return inst, nil + } + } + return nil, fmt.Errorf("instance %q not found", hash) +} + +// FetchRunner returns a runner from its hash. +func (s *storeTest) FetchRunner(ctx context.Context, hash hash.Hash) (*runner.Runner, error) { + for _, run := range s.runners { + if run.Hash.Equal(hash) { + return run, nil + } + } + return nil, fmt.Errorf("runner %q not found", hash) +} + +// FetchRunners returns all runners of an instance. +func (s *storeTest) FetchRunners(ctx context.Context, instanceHash hash.Hash) ([]*runner.Runner, error) { + executors := make([]*runner.Runner, 0) + for _, run := range s.runners { + if run.InstanceHash.Equal(instanceHash) { + executors = append(executors, run) + } + } + return executors, nil +} + +// CreateExecution creates an execution. +func (s *storeTest) CreateExecution(ctx context.Context, taskKey string, inputs *types.Struct, tags []string, parentHash hash.Hash, eventHash hash.Hash, processHash hash.Hash, nodeKey string, executorHash hash.Hash) (hash.Hash, error) { + run, err := s.FetchRunner(ctx, executorHash) + if err != nil { + return nil, err + } + exec, err := execution.New( + processHash, + run.InstanceHash, + parentHash, + eventHash, + nodeKey, + taskKey, + inputs, + tags, + executorHash, + ) + if err != nil { + return nil, err + } + if execExist, _ := s.FetchExecution(ctx, exec.Hash); execExist != nil { + return nil, fmt.Errorf("execution %q already exists", exec.Hash) + } + if err := exec.Execute(); err != nil { + return nil, err + } + s.executions = append(s.executions, exec) + s.pubsub.Pub(exec, pubsubExecTopic) + return exec.Hash, nil +} + +// UpdateExecution update an execution. +func (s *storeTest) UpdateExecution(ctx context.Context, execHash hash.Hash, start int64, stop int64, outputs *types.Struct, outputErr string) error { + exec, err := s.FetchExecution(ctx, execHash) + if err != nil { + return err + } + exec.Start = start + exec.Stop = stop + exec.Price = "10" + if outputs != nil { + if err := exec.Complete(outputs); err != nil { + return err + } + } else { + if err := exec.Fail(errors.New(outputErr)); err != nil { + return err + } + } + s.pubsub.Pub(exec, pubsubExecTopic) + return nil +} + +// SubscribeToNewCompletedExecutions returns a chan that will contain newly completed execution. 
+func (s *storeTest) SubscribeToNewCompletedExecutions(ctx context.Context) (<-chan *execution.Execution, error) { + execChan := make(chan *execution.Execution) + c := s.pubsub.Sub(pubsubExecTopic) + go func() { + for { + select { + case v := <-c: + if exec, ok := v.(*execution.Execution); ok { + if exec.Status == execution.Status_Completed { + execChan <- exec + } + } + case <-ctx.Done(): + s.pubsub.Unsub(c, pubsubExecTopic) + close(execChan) + return + } + } + }() + return execChan, nil +} + +// SubscribeToExecutions returns a chan that will contain executions that have been created, updated, or anything. +func (s *storeTest) SubscribeToExecutions(ctx context.Context) (<-chan *execution.Execution, error) { + execChan := make(chan *execution.Execution) + c := s.pubsub.Sub(pubsubExecTopic) + go func() { + for { + select { + case v := <-c: + if exec, ok := v.(*execution.Execution); ok { + execChan <- exec + } + case <-ctx.Done(): + s.pubsub.Unsub(c, pubsubExecTopic) + close(execChan) + return + } + } + }() + return execChan, nil +} + +// SubscribeToExecutionsForRunner returns a chan that will contain executions that a specific runner must execute. +func (s *storeTest) SubscribeToExecutionsForRunner(ctx context.Context, runnerHash hash.Hash) (<-chan *execution.Execution, error) { + execChan := make(chan *execution.Execution) + c := s.pubsub.Sub(pubsubExecTopic) + go func() { + for { + select { + case v := <-c: + if exec, ok := v.(*execution.Execution); ok { + if exec.Status == execution.Status_InProgress && exec.ExecutorHash.Equal(runnerHash) { + execChan <- exec + } + } + case <-ctx.Done(): + s.pubsub.Unsub(c, pubsubExecTopic) + close(execChan) + return + } + } + }() + return execChan, nil + +} + +// RegisterRunner registers a new or existing runner. +func (s *storeTest) RegisterRunner(ctx context.Context, serviceHash hash.Hash, envHash hash.Hash) (hash.Hash, error) { + inst, err := instance.New(serviceHash, envHash) + if err != nil { + return nil, err + } + if instExist, _ := s.FetchInstance(ctx, inst.Hash); instExist == nil { + s.instances = append(s.instances, inst) + } + + run, err := runner.New(s.runnerOwner, inst.Hash) + if err != nil { + return nil, err + } + if runExist, _ := s.FetchRunner(ctx, run.Hash); runExist != nil { + return nil, fmt.Errorf("runner %q already exists", run.Hash) + } + s.runners = append(s.runners, run) + return run.Hash, nil +} + +// DeleteRunner deletes an existing runner. +func (s *storeTest) DeleteRunner(ctx context.Context, runnerHash hash.Hash) error { + index := -1 + for i, run := range s.runners { + if run.Hash.Equal(runnerHash) { + index = i + break + } + } + if index == -1 { + return fmt.Errorf("runner %q not found", runnerHash) + } + s.runners = append(s.runners[:index], s.runners[index+1:]...) + return nil +} diff --git a/server/grpc/orchestrator/execution.go b/server/grpc/orchestrator/execution.go index aa9eef220..fd9a9fdbc 100644 --- a/server/grpc/orchestrator/execution.go +++ b/server/grpc/orchestrator/execution.go @@ -2,26 +2,33 @@ package orchestrator import ( "context" - fmt "fmt" - "github.com/mesg-foundation/engine/cosmos" "github.com/mesg-foundation/engine/execution" "github.com/mesg-foundation/engine/ext/xstrings" "github.com/mesg-foundation/engine/hash" "github.com/mesg-foundation/engine/protobuf/acknowledgement" - executionmodule "github.com/mesg-foundation/engine/x/execution" + types "github.com/mesg-foundation/engine/protobuf/types" ) +// executionStore is the interface to implement to fetch data. 
+type executionStore interface { + // CreateExecution creates an execution. + CreateExecution(ctx context.Context, taskKey string, inputs *types.Struct, tags []string, parentHash hash.Hash, eventHash hash.Hash, processHash hash.Hash, nodeKey string, executorHash hash.Hash) (hash.Hash, error) + + // SubscribeToExecutions returns a chan that will contain executions that have been created, updated, or anything. + SubscribeToExecutions(ctx context.Context) (<-chan *execution.Execution, error) +} + type executionServer struct { - rpc *cosmos.RPC - auth *Authorizer + store executionStore + auth *Authorizer } // NewExecutionServer creates a new Execution Server. -func NewExecutionServer(rpc *cosmos.RPC, auth *Authorizer) ExecutionServer { +func NewExecutionServer(store executionStore, auth *Authorizer) ExecutionServer { return &executionServer{ - rpc: rpc, - auth: auth, + store: store, + auth: auth, } } @@ -33,28 +40,26 @@ func (s *executionServer) Create(ctx context.Context, req *ExecutionCreateReques } // create execution - acc, err := s.rpc.GetAccount() - if err != nil { - return nil, err - } eventHash, err := hash.Random() if err != nil { return nil, err } - msg := executionmodule.MsgCreate{ - Signer: acc.GetAddress(), - EventHash: eventHash, - ExecutorHash: req.ExecutorHash, - Inputs: req.Inputs, - Tags: req.Tags, - TaskKey: req.TaskKey, - } - tx, err := s.rpc.BuildAndBroadcastMsg(msg) + execHash, err := s.store.CreateExecution( + ctx, + req.TaskKey, + req.Inputs, + req.Tags, + nil, + eventHash, + nil, + "", + req.ExecutorHash, + ) if err != nil { return nil, err } return &ExecutionCreateResponse{ - Hash: tx.Data, + Hash: execHash, }, nil } @@ -65,13 +70,10 @@ func (s *executionServer) Stream(req *ExecutionStreamRequest, stream Execution_S return err } - // create rpc event stream + // create event stream ctx, cancel := context.WithCancel(stream.Context()) defer cancel() - subscriber := xstrings.RandASCIILetters(8) - query := fmt.Sprintf("%s.%s EXISTS", executionmodule.EventType, executionmodule.AttributeKeyHash) - eventStream, err := s.rpc.Subscribe(ctx, subscriber, query, 0) - defer s.rpc.Unsubscribe(context.Background(), subscriber, query) + executionStream, err := s.store.SubscribeToExecutions(ctx) if err != nil { return err } @@ -82,31 +84,12 @@ func (s *executionServer) Stream(req *ExecutionStreamRequest, stream Execution_S // listen to event stream for { select { - case event := <-eventStream: - attrHash := fmt.Sprintf("%s.%s", executionmodule.EventType, executionmodule.AttributeKeyHash) - attrs := event.Events[attrHash] - alreadySeeHashes := make(map[string]bool) - for _, attr := range attrs { - // skip already see hash. it deduplicate same execution in multiple event. 
- if alreadySeeHashes[attr] { - continue - } - alreadySeeHashes[attr] = true - hash, err := hash.Decode(attr) - if err != nil { - return err - } - var exec *execution.Execution - route := fmt.Sprintf("custom/%s/%s/%s", executionmodule.QuerierRoute, executionmodule.QueryGet, hash) - if err := s.rpc.QueryJSON(route, nil, &exec); err != nil { - return err - } - if !req.Filter.Match(exec) { - continue - } - if err := stream.Send(exec); err != nil { - return err - } + case exec := <-executionStream: + if !req.Filter.Match(exec) { + continue + } + if err := stream.Send(exec); err != nil { + return err } case <-ctx.Done(): return ctx.Err() diff --git a/server/grpc/orchestrator/runner.go b/server/grpc/orchestrator/runner.go index 3a96260e7..4371863c4 100644 --- a/server/grpc/orchestrator/runner.go +++ b/server/grpc/orchestrator/runner.go @@ -2,26 +2,29 @@ package orchestrator import ( "context" - fmt "fmt" "sync" - "github.com/mesg-foundation/engine/cosmos" "github.com/mesg-foundation/engine/hash" - "github.com/mesg-foundation/engine/instance" - "github.com/mesg-foundation/engine/runner" - runnermodule "github.com/mesg-foundation/engine/x/runner" ) +type runnerStore interface { + // RegisterRunner registers a new or existing runner. + RegisterRunner(ctx context.Context, serviceHash hash.Hash, envHash hash.Hash) (hash.Hash, error) + + // DeleteRunner deletes an existing runner. + DeleteRunner(ctx context.Context, runnerHash hash.Hash) error +} + type runnerServer struct { - rpc *cosmos.RPC + store runnerStore tokenToRunnerHash *sync.Map auth *Authorizer } // NewRunnerServer creates a new Runner Server. -func NewRunnerServer(rpc *cosmos.RPC, tokenToRunnerHash *sync.Map, auth *Authorizer) RunnerServer { +func NewRunnerServer(store runnerStore, tokenToRunnerHash *sync.Map, auth *Authorizer) RunnerServer { return &runnerServer{ - rpc: rpc, + store: store, tokenToRunnerHash: tokenToRunnerHash, auth: auth, } @@ -34,57 +37,12 @@ func (s *runnerServer) Register(ctx context.Context, req *RunnerRegisterRequest) return nil, err } - // get engine account - acc, err := s.rpc.GetAccount() + // register runner + runnerHash, err := s.store.RegisterRunner(ctx, req.ServiceHash, req.EnvHash) if err != nil { return nil, err } - // calculate runner hash - inst, err := instance.New(req.ServiceHash, req.EnvHash) - if err != nil { - return nil, err - } - run, err := runner.New(acc.GetAddress().String(), inst.Hash) - if err != nil { - return nil, err - } - runnerHash := run.Hash - - // check that runner doesn't already exist - var runnerExist bool - route := fmt.Sprintf("custom/%s/%s/%s", runnermodule.QuerierRoute, runnermodule.QueryExist, runnerHash) - if err := s.rpc.QueryJSON(route, nil, &runnerExist); err != nil { - return nil, err - } - - // only broadcast if runner doesn't exist - if !runnerExist { - tx, err := s.rpc.BuildAndBroadcastMsg(runnermodule.MsgCreate{ - Owner: acc.GetAddress(), - ServiceHash: req.ServiceHash, - EnvHash: req.EnvHash, - }) - if err != nil { - return nil, err - } - runnerHashCreated, err := hash.DecodeFromBytes(tx.Data) - if err != nil { - return nil, err - } - if !runnerHashCreated.Equal(runnerHash) { - // delete wrong runner - _, err := s.rpc.BuildAndBroadcastMsg(runnermodule.MsgDelete{ - Owner: acc.GetAddress(), - Hash: runnerHashCreated, - }) - if err != nil { - return nil, err - } - return nil, fmt.Errorf("runner hash created is not expected: got %q, expect %q", runnerHashCreated, runnerHash) - } - } - // delete any other token corresponding to runnerHash 
s.tokenToRunnerHash.Range(func(key, value interface{}) bool { savedRunnerHash := value.(hash.Hash) @@ -115,16 +73,8 @@ func (s *runnerServer) Delete(ctx context.Context, req *RunnerDeleteRequest) (*R return nil, err } - // create execution - acc, err := s.rpc.GetAccount() - if err != nil { - return nil, err - } - msg := runnermodule.MsgDelete{ - Owner: acc.GetAddress(), - Hash: req.RunnerHash, - } - if _, err := s.rpc.BuildAndBroadcastMsg(msg); err != nil { + // delete runner + if err := s.store.DeleteRunner(ctx, req.RunnerHash); err != nil { return nil, err } return &RunnerDeleteResponse{}, nil diff --git a/server/grpc/runner/runner.go b/server/grpc/runner/runner.go index 2e1db000f..e000da200 100644 --- a/server/grpc/runner/runner.go +++ b/server/grpc/runner/runner.go @@ -6,26 +6,37 @@ import ( "sync" "time" - sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/mesg-foundation/engine/cosmos" "github.com/mesg-foundation/engine/event/publisher" "github.com/mesg-foundation/engine/execution" - "github.com/mesg-foundation/engine/ext/xstrings" "github.com/mesg-foundation/engine/hash" "github.com/mesg-foundation/engine/protobuf/acknowledgement" + types "github.com/mesg-foundation/engine/protobuf/types" "github.com/mesg-foundation/engine/runner" - executionmodule "github.com/mesg-foundation/engine/x/execution" - runnermodule "github.com/mesg-foundation/engine/x/runner" tmlog "github.com/tendermint/tendermint/libs/log" "google.golang.org/grpc/metadata" ) +// Store is the interface to implement to fetch data. +type Store interface { + // SubscribeToExecutionsForRunner returns a chan that will contain executions that a specific runner must execute. + SubscribeToExecutionsForRunner(ctx context.Context, runnerHash hash.Hash) (<-chan *execution.Execution, error) + + // FetchExecution returns one execution from its hash. + FetchExecution(ctx context.Context, hash hash.Hash) (*execution.Execution, error) + + // UpdateExecution update an execution. + UpdateExecution(ctx context.Context, execHash hash.Hash, start int64, stop int64, outputs *types.Struct, err string) error + + // FetchRunner returns a runner from its hash. + FetchRunner(ctx context.Context, hash hash.Hash) (*runner.Runner, error) +} + // CredentialToken is the name to use in the gRPC metadata to set and read the credential token. const CredentialToken = "mesg_credential_token" // Server is the type to aggregate all Runner APIs. type Server struct { - rpc *cosmos.RPC + store Store eventPublisher *publisher.EventPublisher tokenToRunnerHash *sync.Map execInProgress *sync.Map @@ -33,9 +44,9 @@ type Server struct { } // NewServer creates a new Server. 
-func NewServer(rpc *cosmos.RPC, eventPublisher *publisher.EventPublisher, tokenToRunnerHash *sync.Map, logger tmlog.Logger) *Server { +func NewServer(store Store, eventPublisher *publisher.EventPublisher, tokenToRunnerHash *sync.Map, logger tmlog.Logger) *Server { return &Server{ - rpc: rpc, + store: store, eventPublisher: eventPublisher, tokenToRunnerHash: tokenToRunnerHash, execInProgress: &sync.Map{}, @@ -68,16 +79,10 @@ func (s *Server) Execution(req *ExecutionRequest, stream Runner_ExecutionServer) return err } - // create rpc event stream + // create event stream ctx, cancel := context.WithCancel(stream.Context()) defer cancel() - subscriber := xstrings.RandASCIILetters(8) - query := fmt.Sprintf("%s.%s='%s' AND %s.%s='%s'", - executionmodule.EventType, executionmodule.AttributeKeyExecutor, runnerHash.String(), - executionmodule.EventType, sdk.AttributeKeyAction, executionmodule.AttributeActionCreated, - ) - eventStream, err := s.rpc.Subscribe(ctx, subscriber, query, 0) - defer s.rpc.Unsubscribe(context.Background(), subscriber, query) + executionStream, err := s.store.SubscribeToExecutionsForRunner(ctx, runnerHash) if err != nil { return err } @@ -88,32 +93,10 @@ func (s *Server) Execution(req *ExecutionRequest, stream Runner_ExecutionServer) // listen to event stream for { select { - case event := <-eventStream: - // get the index of the action=created attributes - attrKeyActionCreated := fmt.Sprintf("%s.%s", executionmodule.EventType, sdk.AttributeKeyAction) - attrIndexes := make([]int, 0) - for index, attr := range event.Events[attrKeyActionCreated] { - if attr == executionmodule.AttributeActionCreated { - attrIndexes = append(attrIndexes, index) - } - } - // iterate only on the index of attribute hash where action=created - attrKeyHash := fmt.Sprintf("%s.%s", executionmodule.EventType, executionmodule.AttributeKeyHash) - for _, index := range attrIndexes { - attr := event.Events[attrKeyHash][index] - hash, err := hash.Decode(attr) - if err != nil { - return err - } - var exec *execution.Execution - route := fmt.Sprintf("custom/%s/%s/%s", executionmodule.QuerierRoute, executionmodule.QueryGet, hash) - if err := s.rpc.QueryJSON(route, nil, &exec); err != nil { - return err - } - s.execInProgress.Store(hash.String(), time.Now().UnixNano()) - if err := stream.Send(exec); err != nil { - return err - } + case exec := <-executionStream: + s.execInProgress.Store(exec.Hash.String(), time.Now().UnixNano()) + if err := stream.Send(exec); err != nil { + return err } case <-ctx.Done(): return ctx.Err() @@ -132,9 +115,8 @@ func (s *Server) Result(ctx context.Context, req *ResultRequest) (*ResultRespons } // make sure runner is allowed to update this execution - var exec *execution.Execution - route := fmt.Sprintf("custom/%s/%s/%s", executionmodule.QuerierRoute, executionmodule.QueryGet, req.ExecutionHash) - if err := s.rpc.QueryJSON(route, nil, &exec); err != nil { + exec, err := s.store.FetchExecution(ctx, req.ExecutionHash) + if err != nil { return nil, err } if !exec.ExecutorHash.Equal(runnerHash) { @@ -142,32 +124,19 @@ func (s *Server) Result(ctx context.Context, req *ResultRequest) (*ResultRespons } // update execution - acc, err := s.rpc.GetAccount() - if err != nil { - return nil, err - } start, ok := s.execInProgress.Load(req.ExecutionHash.String()) if !ok { s.logger.Error(fmt.Sprintf("execution %q should be in memory", req.ExecutionHash.String())) start = time.Now().UnixNano() } - msg := executionmodule.MsgUpdate{ - Executor: acc.GetAddress(), - Hash: req.ExecutionHash, - Start: 
start.(int64), - Stop: time.Now().UnixNano(), - } - switch result := req.Result.(type) { - case *ResultRequest_Outputs: - msg.Result = &executionmodule.MsgUpdateOutputs{ - Outputs: result.Outputs, - } - case *ResultRequest_Error: - msg.Result = &executionmodule.MsgUpdateError{ - Error: result.Error, - } - } - if _, err := s.rpc.BuildAndBroadcastMsg(msg); err != nil { + if err := s.store.UpdateExecution( + ctx, + req.ExecutionHash, + start.(int64), + time.Now().UnixNano(), + req.GetOutputs(), + req.GetError(), + ); err != nil { return nil, err } s.execInProgress.Delete(req.ExecutionHash.String()) @@ -183,11 +152,10 @@ func (s *Server) Event(ctx context.Context, req *EventRequest) (*EventResponse, } // get runner to access instance hash - var run *runner.Runner - route := fmt.Sprintf("custom/%s/%s/%s", runnermodule.QuerierRoute, runnermodule.QueryGet, runnerHash) - if err := s.rpc.QueryJSON(route, nil, &run); err != nil { - return nil, err - } + run, err := s.store.FetchRunner(ctx, runnerHash) + if err != nil { + return nil, err + } + // publish event if _, err := s.eventPublisher.Publish(run.InstanceHash, req.Key, req.Data); err != nil { diff --git a/server/grpc/server.go b/server/grpc/server.go index 3579bfd6b..94c125b02 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -6,36 +6,77 @@ import ( "sync" "time" + "github.com/cosmos/cosmos-sdk/codec" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_kit "github.com/grpc-ecosystem/go-grpc-middleware/logging/kit" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "github.com/mesg-foundation/engine/cosmos" "github.com/mesg-foundation/engine/event/publisher" + "github.com/mesg-foundation/engine/execution" "github.com/mesg-foundation/engine/ext/xvalidator" + "github.com/mesg-foundation/engine/hash" + "github.com/mesg-foundation/engine/instance" + types "github.com/mesg-foundation/engine/protobuf/types" + "github.com/mesg-foundation/engine/runner" "github.com/mesg-foundation/engine/server/grpc/orchestrator" - "github.com/mesg-foundation/engine/server/grpc/runner" + grpcrunner "github.com/mesg-foundation/engine/server/grpc/runner" + "github.com/mesg-foundation/engine/service" tmlog "github.com/tendermint/tendermint/libs/log" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/reflection" ) +// Store is the interface to implement to fetch data. +type Store interface { + // FetchService returns a service from its hash. + FetchService(ctx context.Context, hash hash.Hash) (*service.Service, error) + + // FetchInstance returns an instance from its hash. + FetchInstance(ctx context.Context, hash hash.Hash) (*instance.Instance, error) + + // CreateExecution creates an execution. + CreateExecution(ctx context.Context, taskKey string, inputs *types.Struct, tags []string, parentHash hash.Hash, eventHash hash.Hash, processHash hash.Hash, nodeKey string, executorHash hash.Hash) (hash.Hash, error) + + // SubscribeToExecutions returns a chan that will contain executions that have been created, updated, or anything. + SubscribeToExecutions(ctx context.Context) (<-chan *execution.Execution, error) + + // SubscribeToExecutionsForRunner returns a chan that will contain executions that a specific runner must execute. + SubscribeToExecutionsForRunner(ctx context.Context, runnerHash hash.Hash) (<-chan *execution.Execution, error) + + // FetchExecution returns one execution from its hash. + FetchExecution(ctx context.Context, hash hash.Hash) (*execution.Execution, error) + + // UpdateExecution update an execution.
+ UpdateExecution(ctx context.Context, execHash hash.Hash, start int64, stop int64, outputs *types.Struct, err string) error + + // FetchRunner returns a runner from its hash. + FetchRunner(ctx context.Context, hash hash.Hash) (*runner.Runner, error) + + // RegisterRunner registers a new or existing runner. + RegisterRunner(ctx context.Context, serviceHash hash.Hash, envHash hash.Hash) (hash.Hash, error) + + // DeleteRunner deletes an existing runner. + DeleteRunner(ctx context.Context, runnerHash hash.Hash) error +} + // Server contains the server config. type Server struct { instance *grpc.Server - rpc *cosmos.RPC + store Store ep *publisher.EventPublisher logger tmlog.Logger authorizedPubKeys []string + cdc *codec.Codec } // New returns a new gRPC server. -func New(rpc *cosmos.RPC, ep *publisher.EventPublisher, logger tmlog.Logger, authorizedPubKeys []string) *Server { +func New(store Store, ep *publisher.EventPublisher, logger tmlog.Logger, cdc *codec.Codec, authorizedPubKeys []string) *Server { return &Server{ - rpc: rpc, + store: store, ep: ep, logger: logger.With("module", "grpc"), authorizedPubKeys: authorizedPubKeys, + cdc: cdc, } } @@ -88,15 +129,15 @@ func validateInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryS func (s *Server) register() error { tokenToRunnerHash := &sync.Map{} - runner.RegisterRunnerServer(s.instance, runner.NewServer(s.rpc, s.ep, tokenToRunnerHash, s.logger)) + grpcrunner.RegisterRunnerServer(s.instance, grpcrunner.NewServer(s.store, s.ep, tokenToRunnerHash, s.logger)) - authorizer, err := orchestrator.NewAuthorizer(s.rpc.Codec(), s.authorizedPubKeys) + authorizer, err := orchestrator.NewAuthorizer(s.cdc, s.authorizedPubKeys) if err != nil { return err } orchestrator.RegisterEventServer(s.instance, orchestrator.NewEventServer(s.ep, authorizer)) - orchestrator.RegisterExecutionServer(s.instance, orchestrator.NewExecutionServer(s.rpc, authorizer)) - orchestrator.RegisterRunnerServer(s.instance, orchestrator.NewRunnerServer(s.rpc, tokenToRunnerHash, authorizer)) + orchestrator.RegisterExecutionServer(s.instance, orchestrator.NewExecutionServer(s.store, authorizer)) + orchestrator.RegisterRunnerServer(s.instance, orchestrator.NewRunnerServer(s.store, tokenToRunnerHash, authorizer)) reflection.Register(s.instance)
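Not part of the patch above: a minimal sketch of compile-time interface guards for the in-memory test store. storeTest in orchestrator/store_test.go happens to cover every method of the Store interfaces declared in server/grpc/server.go and server/grpc/runner/runner.go, so blank-identifier assertions like the ones below would make the build fail as soon as the mock drifts from those interfaces, instead of failing later inside a test run. The file name store_guards_test.go and the import aliases are assumptions; all other identifiers come from this diff.

package orchestrator

import (
	enginegrpc "github.com/mesg-foundation/engine/server/grpc"
	grpcrunner "github.com/mesg-foundation/engine/server/grpc/runner"
)

// Compile-time checks: assigning a nil *storeTest to each interface type
// forces the compiler to verify that the method sets still match.
var (
	_ enginegrpc.Store = (*storeTest)(nil)
	_ grpcrunner.Store = (*storeTest)(nil)
)

Go only checks interface satisfaction where a value is actually assigned to the interface type; since the tests above hand storeTest to the orchestrator and the event publisher but never to the gRPC constructors, nothing currently forces it to keep matching the gRPC-side interfaces, and these guards would close that gap.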