From 4c61b3531d3c42aad6e7b75b9648f8d661f589c2 Mon Sep 17 00:00:00 2001 From: Chris Stasiak Date: Wed, 6 May 2015 16:29:47 +0200 Subject: [PATCH 01/10] Implemented scheduler --- elasticfeed/elasticfeed.go | 1 + elasticfeed/model/event.go | 4 ++++ event/manager.go | 31 +++++++++++++++++++++-------- workflow/manager.go | 40 ++++++++++++++++++++++++++++---------- 4 files changed, 58 insertions(+), 18 deletions(-) diff --git a/elasticfeed/elasticfeed.go b/elasticfeed/elasticfeed.go index 7a2a9f0..5628668 100644 --- a/elasticfeed/elasticfeed.go +++ b/elasticfeed/elasticfeed.go @@ -48,6 +48,7 @@ func (this *Elasticfeed) Run() { this.GetResourceManager().Init() this.GetServiceManager().Init() this.GetWorkflowManager().Init() + this.GetEventManager().Init() feedify.SetStaticPath("/static", "public") feedify.Run() diff --git a/elasticfeed/model/event.go b/elasticfeed/model/event.go index 6915a8d..30c0376 100644 --- a/elasticfeed/model/event.go +++ b/elasticfeed/model/event.go @@ -1,4 +1,8 @@ package model type EventManager interface { + + Init() + + InstallSchedule(string, string, func() error) error } diff --git a/event/manager.go b/event/manager.go index ea7762f..42d8eda 100644 --- a/event/manager.go +++ b/event/manager.go @@ -1,21 +1,23 @@ package event import ( + "github.com/astaxie/beego/toolbox" + "github.com/feedlabs/elasticfeed/elasticfeed/model" ) const ( - EVENT_STORING = "storing" - EVENT_PROCESSING = "processing" + EVENT_STORING = "storing" + EVENT_PROCESSING = "processing" EVENT_DISTRIBUTING = "distributing" - EVENT_LEARNING = "learning" + EVENT_LEARNING = "learning" - EVENT_STORING_CREATE_ENTRY = "create-entry" - EVENT_STORING_CREATE_VIEWER = "create-viewer" + EVENT_STORING_CREATE_ENTRY = "create-entry" + EVENT_STORING_CREATE_VIEWER = "create-viewer" EVENT_PROCESSING_FEED_MAINTAINER = "feed-maintainer" - EVENT_PROCESSING_SENSOR_UPDATE = "sensor-update" - EVENT_DISTRIBUTING_PUSH_ENTRY = "push-entry" - EVENT_LEARNING_CREATE_METRIC = "create-metric" + 
EVENT_PROCESSING_SENSOR_UPDATE = "sensor-update" + EVENT_DISTRIBUTING_PUSH_ENTRY = "push-entry" + EVENT_LEARNING_CREATE_METRIC = "create-metric" ) /** @@ -39,6 +41,11 @@ type EventManager struct { events map[string]interface{} } +func (this *EventManager) Init() { + toolbox.StartTask() +// defer toolbox.StopTask() +} + func (this *EventManager) On(name string, callback func(event *Event)) { this.events[name] = callback } @@ -63,6 +70,14 @@ func (this *EventManager) GetEventsMap() map[string]interface{} { } } +func (this *EventManager) InstallSchedule(name string, spec string, cb func() error) error { + t := toolbox.NewTask(name, spec, cb) + + toolbox.AddTask(name, t) + + return nil +} + func NewEventManager(engine model.Elasticfeed) model.EventManager { e := make(map[string]interface{}) return &EventManager{engine, e} diff --git a/workflow/manager.go b/workflow/manager.go index 62dce53..4edafb2 100644 --- a/workflow/manager.go +++ b/workflow/manager.go @@ -1,6 +1,8 @@ package workflow import ( + "fmt" + "encoding/json" "time" "math/rand" @@ -34,6 +36,9 @@ type WorkflowManager struct { func (this *WorkflowManager) Init() { this.BindServiceEvents() + + this.InstallSensorsSchedule() + this.InstallFeedMaintenanceSchedule() } /** @@ -69,7 +74,7 @@ func (this *WorkflowManager) GetEngine() emodel.Elasticfeed { } func (this *WorkflowManager) InitTemplate(t interface{}) { - // verify event availability into EventsManger + // verify event availability into EventsManager // verify hooks workflows this.template = t } @@ -179,7 +184,7 @@ func (this *WorkflowManager) ResourcePipelineRound(socketEvent smodel.SocketEven select { - // IF PIPE TAKES TOO MUCH TIME, DATA DELAYED + // IF PIPE TAKES TOO MUCH TIME, DATA DELAYED case <-timeout: event := room.NewFeedEvent(room.FEED_ENTRY_NEW, socketEvent.FeedId, "{Content:\"tiemout\"}") @@ -195,16 +200,16 @@ func (this *WorkflowManager) ResourcePipelineRound(socketEvent smodel.SocketEven socketEvent.Ch <- data } - // IF DATA ARRIVES 
WITHOUT DELAY + // IF DATA ARRIVES WITHOUT DELAY case list := <-results: - // ********************************************************************* - // register socket handler - // needs to send notiffication to long pooling + ws - // join should generate uniqe ID and client should use it - // maybe sessionID could be as uniqeID ? - // room.FeedSubscribers[socketEvent.FeedId][channelID] = socketEvent - // ********************************************************************* + // ********************************************************************* + // register socket handler + // needs to send notiffication to long pooling + ws + // join should generate uniqe ID and client should use it + // maybe sessionID could be as uniqeID ? + // room.FeedSubscribers[socketEvent.FeedId][channelID] = socketEvent + // ********************************************************************* d, _ := json.Marshal(list) event := room.NewFeedEvent(room.FEED_ENTRY_INIT, socketEvent.FeedId, string(d)) @@ -219,7 +224,22 @@ func (this *WorkflowManager) ResourcePipelineRound(socketEvent smodel.SocketEven } } +} + +func (this *WorkflowManager) InstallFeedMaintenanceSchedule() { + e := this.GetEngine().GetEventManager() + _ = e.InstallSchedule("feed", "0 12 * * * *", func() error { + fmt.Println("hello feed schedule") + return nil + }) +} +func (this *WorkflowManager) InstallSensorsSchedule() { + e := this.GetEngine().GetEventManager() + _ = e.InstallSchedule("sensor", "0 18 * * * *", func() error { + fmt.Println("hello sensor schedule") + return nil + }) } func NewWorkflowManager(engine emodel.Elasticfeed) *WorkflowManager { From e7a44f440d0bd6ce12d70b91ff02634d986e53de Mon Sep 17 00:00:00 2001 From: Chris Stasiak Date: Thu, 7 May 2015 13:01:46 +0200 Subject: [PATCH 02/10] General thoughts; Added new service "predict" --- event/event.go | 5 ++- resource/viewer.go | 16 +++++++ service/predict/predict.go | 17 +++++++ service/predict/v1/controller/default.go | 24 ++++++++++ 
service/predict/v1/controller/status.go | 17 +++++++ service/predict/v1/router/predict.go | 10 +++++ service/predict/v1/router/status.go | 10 +++++ service/predict/v1/router/train.go | 10 +++++ service/predict/v1/template/const.go | 12 +++++ service/predict/v1/template/default.go | 29 ++++++++++++ service/predict/v1/template/request.go | 9 ++++ service/predict/v1/template/response.go | 56 ++++++++++++++++++++++++ service/service.go | 10 ++++- 13 files changed, 223 insertions(+), 2 deletions(-) create mode 100644 service/predict/predict.go create mode 100644 service/predict/v1/controller/default.go create mode 100644 service/predict/v1/controller/status.go create mode 100644 service/predict/v1/router/predict.go create mode 100644 service/predict/v1/router/status.go create mode 100644 service/predict/v1/router/train.go create mode 100644 service/predict/v1/template/const.go create mode 100644 service/predict/v1/template/default.go create mode 100644 service/predict/v1/template/request.go create mode 100644 service/predict/v1/template/response.go diff --git a/event/event.go b/event/event.go index 6831249..c72410a 100644 --- a/event/event.go +++ b/event/event.go @@ -5,8 +5,11 @@ type Event struct { eventGroup string eventName string + + parent string + target string } func NewEvent(data interface{}) *Event { - return &Event{data, "", ""} + return &Event{data, "", "", "", ""} } diff --git a/resource/viewer.go b/resource/viewer.go index 958e354..a6cd7fa 100644 --- a/resource/viewer.go +++ b/resource/viewer.go @@ -1 +1,17 @@ package resource + +func (this *Viewer) GetStorage() {} + +func (this *Viewer) GetProfile() {} + +func (this *Viewer) GetNN() {} + +func (this *Viewer) GetId() {} + +func (this *Viewer) GetPluginStorageList() {} + +func (this *Viewer) GetIdentity() {} + +func (this *Viewer) GetScope() {} + +func (this *Viewer) GetContext() {} diff --git a/service/predict/predict.go b/service/predict/predict.go new file mode 100644 index 0000000..749e994 --- /dev/null 
+++ b/service/predict/predict.go @@ -0,0 +1,17 @@ +package predict + +import ( + "github.com/feedlabs/elasticfeed/service/predict/v1/router" +) + +type PredictService struct {} + +func (this *PredictService) Init() { + router.InitStatusRouters() + router.InitPredictRouters() + router.InitTrainRouters() +} + +func NewPredictService() *PredictService { + return &PredictService{} +} diff --git a/service/predict/v1/controller/default.go b/service/predict/v1/controller/default.go new file mode 100644 index 0000000..fe583aa --- /dev/null +++ b/service/predict/v1/controller/default.go @@ -0,0 +1,24 @@ +package controller + +import ( + "github.com/feedlabs/feedify" +) + +type DefaultController struct { + feedify.Controller +} + +func (this *DefaultController) Get() { + this.Data["json"] = map[string]string{"succes": "ok"} + this.Controller.ServeJson() +} + +func (this *DefaultController) ServeJson(data interface{}, status int) { + this.Data["json"] = data + this.SetResponseStatusCode(status) + this.Controller.ServeJson() +} + +func (this *DefaultController) SetResponseStatusCode(code int) { + this.Controller.Ctx.Output.SetStatus(code) +} diff --git a/service/predict/v1/controller/status.go b/service/predict/v1/controller/status.go new file mode 100644 index 0000000..ce3a067 --- /dev/null +++ b/service/predict/v1/controller/status.go @@ -0,0 +1,17 @@ +package controller + +import ( + "github.com/feedlabs/feedify" +) + +type StatusController struct { + feedify.Controller +} + +func (this *StatusController) Get() { + this.Data["json"] = map[string]interface{}{ + "enabled": true, + } + + this.Controller.ServeJson() +} diff --git a/service/predict/v1/router/predict.go b/service/predict/v1/router/predict.go new file mode 100644 index 0000000..7f2cb26 --- /dev/null +++ b/service/predict/v1/router/predict.go @@ -0,0 +1,10 @@ +package router + +import ( + "github.com/feedlabs/feedify" + "github.com/feedlabs/elasticfeed/service/predict/v1/controller" +) + +func InitPredictRouters() 
{ + feedify.Router("/v1/predict/predict", &controller.DefaultController{}, "get:Get") +} diff --git a/service/predict/v1/router/status.go b/service/predict/v1/router/status.go new file mode 100644 index 0000000..2da594e --- /dev/null +++ b/service/predict/v1/router/status.go @@ -0,0 +1,10 @@ +package router + +import ( + "github.com/feedlabs/feedify" + "github.com/feedlabs/elasticfeed/service/predict/v1/controller" +) + +func InitStatusRouters() { + feedify.Router("/v1/predict/status", &controller.StatusController{}, "get:Get") +} diff --git a/service/predict/v1/router/train.go b/service/predict/v1/router/train.go new file mode 100644 index 0000000..adec61c --- /dev/null +++ b/service/predict/v1/router/train.go @@ -0,0 +1,10 @@ +package router + +import ( + "github.com/feedlabs/feedify" + "github.com/feedlabs/elasticfeed/service/predict/v1/controller" +) + +func InitTrainRouters() { + feedify.Router("/v1/predict/train", &controller.DefaultController{}, "get:Get") +} diff --git a/service/predict/v1/template/const.go b/service/predict/v1/template/const.go new file mode 100644 index 0000000..8e811ae --- /dev/null +++ b/service/predict/v1/template/const.go @@ -0,0 +1,12 @@ +package template + +const HTTP_CODE_VALID_REQUEST = 200 +const HTTP_CODE_ENTITY_CREATED = 201 +const HTTP_CODE_ENTITY_NOEXIST = 404 +const HTTP_CODE_ENTITY_CONFLICT = 409 +const HTTP_CODE_INVALID_REQUEST = 400 +const HTTP_CODE_ACCESS_UNAUTHORIZED = 401 +const HTTP_CODE_ACCESS_FORBIDDEN = 403 +const HTTP_CODE_NOALLOWED_REQUEST = 405 +const HTTP_CODE_TOOMANY_REQUEST = 429 +const HTTP_CODE_SERVER_ERROR = 500 diff --git a/service/predict/v1/template/default.go b/service/predict/v1/template/default.go new file mode 100644 index 0000000..eeac146 --- /dev/null +++ b/service/predict/v1/template/default.go @@ -0,0 +1,29 @@ +package template + +func Error(err error) (entry map[string]interface{}, code int) { + entry = make(map[string]interface{}) + entry["result"] = err.Error() + entry["status"] = "error" + + 
return entry, HTTP_CODE_ENTITY_NOEXIST +} + +func Success(msg string) (entry map[string]string, code int) { + return map[string]string{"result": msg, "status": "ok"}, HTTP_CODE_VALID_REQUEST +} + +func GetOK() int { + return HTTP_CODE_VALID_REQUEST +} + +func PostOK() int { + return HTTP_CODE_ENTITY_CREATED +} + +func PutOK() int { + return HTTP_CODE_ENTITY_CREATED +} + +func DeleteOK() int { + return HTTP_CODE_VALID_REQUEST +} diff --git a/service/predict/v1/template/request.go b/service/predict/v1/template/request.go new file mode 100644 index 0000000..52a8849 --- /dev/null +++ b/service/predict/v1/template/request.go @@ -0,0 +1,9 @@ +package template + +import ( + net "net/url" +) + +func QueryParamsCount(url *net.URL) int { + return len(url.Query()) +} diff --git a/service/predict/v1/template/response.go b/service/predict/v1/template/response.go new file mode 100644 index 0000000..4e3fe81 --- /dev/null +++ b/service/predict/v1/template/response.go @@ -0,0 +1,56 @@ +package template + +import ( + "strconv" + "github.com/feedlabs/feedify/context" +) + +type ResponseDefinition struct { + orderby string + orderdir string + page int + limit int +} + +func (this *ResponseDefinition) GetOrderBy() string { + return this.orderby +} + +func (this *ResponseDefinition) GetOrderDir() string { + return this.orderdir +} + +func (this *ResponseDefinition) GetPage() int { + return this.page +} + +func (this *ResponseDefinition) GetLimit() int { + return this.limit +} + +func NewResponseDefinition(input *context.Input) *ResponseDefinition { + orderby := input.Request.URL.Query().Get("orderby") + if orderby == "" { + orderby = "id" + } + + orderdir := input.Request.URL.Query().Get("orderdir") + if orderdir == "" { + orderdir = "asc" + } + + page := input.Request.URL.Query().Get("page") + if page == "" { + page = "0" + } + + limit := input.Request.URL.Query().Get("limit") + if limit == "" { + limit = "100" + } + + pageInt, _ := strconv.Atoi(page) + limitInt, _ := 
strconv.Atoi(limit) + + return &ResponseDefinition{orderby, orderdir, pageInt, limitInt} +} diff --git a/service/service.go b/service/service.go index 582d5dd..830dfd1 100644 --- a/service/service.go +++ b/service/service.go @@ -4,6 +4,7 @@ import ( "github.com/feedlabs/elasticfeed/service/store" "github.com/feedlabs/elasticfeed/service/stream" "github.com/feedlabs/elasticfeed/service/system" + "github.com/feedlabs/elasticfeed/service/predict" "github.com/feedlabs/elasticfeed/elasticfeed/model" ) @@ -16,12 +17,14 @@ type ServiceManager struct { store *store.DbService stream *stream.StreamService system *system.SystemService + predict *predict.PredictService } func (this *ServiceManager) Init() { this.system.Init() this.stream.Init() this.store.Init() + this.predict.Init() } func (this *ServiceManager) GetDbService() *store.DbService { @@ -36,11 +39,16 @@ func (this *ServiceManager) GetSystemService() *system.SystemService { return this.system } +func (this *ServiceManager) GetPredictService() *predict.PredictService { + return this.predict +} + func NewServiceManager(engine model.Elasticfeed) *ServiceManager { store := store.NewDbService() stream := stream.NewStreamService() system := system.NewSystemService() + predict := predict.NewPredictService() - return &ServiceManager{engine, store, stream, system} + return &ServiceManager{engine, store, stream, system, predict} } From 67cdd2b7e7360abb04b1727187ce2df01fbb7ae8 Mon Sep 17 00:00:00 2001 From: Chris Stasiak Date: Sat, 16 May 2015 16:56:46 +0200 Subject: [PATCH 03/10] Refactoring partially the packer template logic into elasticfeed; some comments. 
--- ai/framework.go | 24 +++ workflow/manager.go | 63 ++++++++ workflow/template.go | 363 +++++++++++++++++++++++-------------------- 3 files changed, 285 insertions(+), 165 deletions(-) create mode 100644 ai/framework.go diff --git a/ai/framework.go b/ai/framework.go new file mode 100644 index 0000000..e7f5af6 --- /dev/null +++ b/ai/framework.go @@ -0,0 +1,24 @@ +package ai + +// https://github.com/josephmisiti/awesome-machine-learning#go + +/* + + SHOULD PROVIDE ABSTRACTION FOR ELASTICFEED ENGINE AND SERVER-PLUGIN LOGIC. + + - Support for ANN: new, train, predict, grow, optimise + - Support for GA: new, simulate + + - Recognition: Photo, Songs, Video + - Transformations: FFT, ... + + - Filtering + - Sorting + - Aggregation + + - Compressing + - Export/Import + - Transcoding + */ + +func init() {} diff --git a/workflow/manager.go b/workflow/manager.go index 4edafb2..b178526 100644 --- a/workflow/manager.go +++ b/workflow/manager.go @@ -16,6 +16,9 @@ import ( "github.com/feedlabs/elasticfeed/service/stream/controller/room" "github.com/feedlabs/elasticfeed/service/stream" + + jsonutil "github.com/mitchellh/packer/common/json" + "github.com/mitchellh/mapstructure" ) @@ -35,6 +38,66 @@ type WorkflowManager struct { } func (this *WorkflowManager) Init() { + + sss := ` + { + "Description":"aaaa-aaaa-aaaa", + "storing" : { + "new-entry": { + "indexers": [ + { + "type": "ann" + } + ], + "crawlers": [ + { + "type": "crawler-google" + } + ] + } + } + } + ` + // var data []byte + // copy(data[:], sss) + data := []byte(sss) + + var rawTplInterface interface{} + err := jsonutil.Unmarshal(data, &rawTplInterface) + if err != nil { + fmt.Println(data) + fmt.Println(err) + return + } + + // Decode the raw template interface into the actual rawTemplate + // structure, checking for any extranneous keys along the way. 
+ var md mapstructure.Metadata + var rawTpl rawTemplate + decoderConfig := &mapstructure.DecoderConfig{ + Metadata: &md, + Result: &rawTpl, + } + + decoder, err := mapstructure.NewDecoder(decoderConfig) + if err != nil { + fmt.Println("err2") + fmt.Println(err) + return + } + + err = decoder.Decode(rawTplInterface) + if err != nil { + fmt.Println("err3") + fmt.Println(err) + return + } + + fmt.Println(rawTpl) + fmt.Println(rawTpl.Storing.NewEntryEvent.Indexers[0]["type"]) + fmt.Println(rawTpl.Storing.NewEntryEvent.Crawlers[0]["type"]) + + this.BindServiceEvents() this.InstallSensorsSchedule() diff --git a/workflow/template.go b/workflow/template.go index d59bb24..d178404 100644 --- a/workflow/template.go +++ b/workflow/template.go @@ -17,10 +17,10 @@ import ( // !!!! wrong place for debuging only type ComponentFinder struct { -// Builder BuilderFunc -// Hook HookFunc -// PostProcessor PostProcessorFunc -// Provisioner ProvisionerFunc + // Builder BuilderFunc + // Hook HookFunc + // PostProcessor PostProcessorFunc + // Provisioner ProvisionerFunc } // The rawTemplate struct represents the structure of a template read @@ -28,14 +28,47 @@ type ComponentFinder struct { // "interface{}" pointers since we actually don't know what their contents // are until we read the "type" field. 
type rawTemplate struct { - MinimumPackerVersion string `mapstructure:"min_packer_version"` + MinimumElasticfeedVersion string `mapstructure:"min_elasticfeed_version"` Description string + + Storing struct { + NewEntryEvent struct { + Indexers []map[string]interface{} `mapstructure:"indexers"` + Crawlers []map[string]interface{} `mapstructure:"crawlers"` + } `mapstructure:"new-entry"` + } `mapstructure:"storing"` + + Processing struct { + + FeedMaintainerEvent struct { + Scenarios []map[string]interface{} `mapstructure:"scenarios"` + } `mapstructure:"feed-maintainer"` + + SensorUpdateEvent struct { + Sensors []map[string]interface{} `mapstructure:"sensors"` + } `mapstructure:"sensor-update"` + + } `mapstructure:"processing"` + + Distributing struct { + PushEntryEvent struct { + Pipelines []map[string]interface{} `mapstructure:"pipelines"` + } `mapstructure:"push-entry"` + } `mapstructure:"distributing"` + + Learning struct { + PushEntryEvent struct { + Scenarios []map[string]interface{} `mapstructure:"scenarios"` + } `mapstructure:"new-metric"` + } `mapstructure:"learning"` + Builders []map[string]interface{} Hooks map[string][]string Push PushConfig PostProcessors []interface{} `mapstructure:"post-processors"` Provisioners []map[string]interface{} + Variables map[string]interface{} } @@ -141,14 +174,14 @@ func ParseTemplate(data []byte, vars map[string]string) (t *Template, err error) return } - if rawTpl.MinimumPackerVersion != "" { + if rawTpl.MinimumElasticfeedVersion != "" { // TODO: NOPE! 
Replace this Version := "1.0" vCur, err := version.NewVersion(Version) if err != nil { panic(err) } - vReq, err := version.NewVersion(rawTpl.MinimumPackerVersion) + vReq, err := version.NewVersion(rawTpl.MinimumElasticfeedVersion) if err != nil { return nil, fmt.Errorf( "'minimum_packer_version' error: %s", err) @@ -157,7 +190,7 @@ func ParseTemplate(data []byte, vars map[string]string) (t *Template, err error) if vCur.LessThan(vReq) { return nil, fmt.Errorf( "Template requires Packer version %s. "+ - "Running version is %s.", + "Running version is %s.", vReq, vCur) } } @@ -420,26 +453,26 @@ func ParseTemplateFile(path string, vars map[string]string) (*Template, error) { func parsePostProcessor(i int, rawV interface{}) (result []map[string]interface{}, errors []error) { switch v := rawV.(type) { - case string: + case string: result = []map[string]interface{}{ {"type": v}, } - case map[string]interface{}: + case map[string]interface{}: result = []map[string]interface{}{v} - case []interface{}: + case []interface{}: result = make([]map[string]interface{}, len(v)) errors = make([]error, 0) for j, innerRawV := range v { switch innerV := innerRawV.(type) { - case string: + case string: result[j] = map[string]interface{}{"type": innerV} - case map[string]interface{}: + case map[string]interface{}: result[j] = innerV - case []interface{}: + case []interface{}: errors = append( errors, fmt.Errorf("Post-processor %d.%d: sequences not allowed to be nested in sequences", i+1, j+1)) - default: + default: errors = append(errors, fmt.Errorf("Post-processor %d.%d is in a bad format.", i+1, j+1)) } } @@ -447,7 +480,7 @@ func parsePostProcessor(i int, rawV interface{}) (result []map[string]interface{ if len(errors) == 0 { errors = nil } - default: + default: result = nil errors = []error{fmt.Errorf("Post-processor %d is in a bad format.", i+1)} } @@ -472,155 +505,155 @@ func (t *Template) BuildNames() []string { // returned. 
//func (t *Template) Build(name string, components *ComponentFinder) (b Build, err error) { func (t *Template) Build(name string, components *ComponentFinder) (b interface {}, err error) { -// // Setup the Builder -// builderConfig, ok := t.Builders[name] -// if !ok { -// err = fmt.Errorf("No such build found in template: %s", name) -// return -// } -// -// // We panic if there is no builder function because this is really -// // an internal bug that always needs to be fixed, not an error. -// if components.Builder == nil { -// panic("no builder function") -// } -// -// // Panic if there are provisioners on the template but no provisioner -// // component finder. This is always an internal error, so we panic. -// if len(t.Provisioners) > 0 && components.Provisioner == nil { -// panic("no provisioner function") -// } -// -// builder, err := components.Builder(builderConfig.Type) -// if err != nil { -// return -// } -// -// if builder == nil { -// err = fmt.Errorf("Builder type not found: %s", builderConfig.Type) -// return -// } -// -// // Process the name -// tpl, variables, err := t.NewConfigTemplate() -// if err != nil { -// return nil, err -// } -// -// rawName := name -// name, err = tpl.Process(name, nil) -// if err != nil { -// return nil, err -// } -// -// // Gather the Hooks -// hooks := make(map[string][]Hook) -// for tplEvent, tplHooks := range t.Hooks { -// curHooks := make([]Hook, 0, len(tplHooks)) -// -// for _, hookName := range tplHooks { -// var hook Hook -// hook, err = components.Hook(hookName) -// if err != nil { -// return -// } -// -// if hook == nil { -// err = fmt.Errorf("Hook not found: %s", hookName) -// return -// } -// -// curHooks = append(curHooks, hook) -// } -// -// hooks[tplEvent] = curHooks -// } -// -// // Prepare the post-processors -// postProcessors := make([][]coreBuildPostProcessor, 0, len(t.PostProcessors)) -// for _, rawPPs := range t.PostProcessors { -// current := make([]coreBuildPostProcessor, 0, len(rawPPs)) -// for _, 
rawPP := range rawPPs { -// if rawPP.TemplateOnlyExcept.Skip(rawName) { -// continue -// } -// -// pp, err := components.PostProcessor(rawPP.Type) -// if err != nil { -// return nil, err -// } -// -// if pp == nil { -// return nil, fmt.Errorf("PostProcessor type not found: %s", rawPP.Type) -// } -// -// current = append(current, coreBuildPostProcessor{ -// processor: pp, -// processorType: rawPP.Type, -// config: rawPP.RawConfig, -// keepInputArtifact: rawPP.KeepInputArtifact, -// }) -// } -// -// // If we have no post-processors in this chain, just continue. -// // This can happen if the post-processors skip certain builds. -// if len(current) == 0 { -// continue -// } -// -// postProcessors = append(postProcessors, current) -// } -// -// // Prepare the provisioners -// provisioners := make([]coreBuildProvisioner, 0, len(t.Provisioners)) -// for _, rawProvisioner := range t.Provisioners { -// if rawProvisioner.TemplateOnlyExcept.Skip(rawName) { -// continue -// } -// -// var provisioner Provisioner -// provisioner, err = components.Provisioner(rawProvisioner.Type) -// if err != nil { -// return -// } -// -// if provisioner == nil { -// err = fmt.Errorf("Provisioner type not found: %s", rawProvisioner.Type) -// return -// } -// -// configs := make([]interface{}, 1, 2) -// configs[0] = rawProvisioner.RawConfig -// -// if rawProvisioner.Override != nil { -// if override, ok := rawProvisioner.Override[name]; ok { -// configs = append(configs, override) -// } -// } -// -// if rawProvisioner.pauseBefore > 0 { -// provisioner = &PausedProvisioner{ -// PauseBefore: rawProvisioner.pauseBefore, -// Provisioner: provisioner, -// } -// } -// -// coreProv := coreBuildProvisioner{provisioner, configs} -// provisioners = append(provisioners, coreProv) -// } -// -// b = &coreBuild{ -// name: name, -// builder: builder, -// builderConfig: builderConfig.RawConfig, -// builderType: builderConfig.Type, -// hooks: hooks, -// postProcessors: postProcessors, -// provisioners: 
provisioners, -// variables: variables, -// } -// -// return + // // Setup the Builder + // builderConfig, ok := t.Builders[name] + // if !ok { + // err = fmt.Errorf("No such build found in template: %s", name) + // return + // } + // + // // We panic if there is no builder function because this is really + // // an internal bug that always needs to be fixed, not an error. + // if components.Builder == nil { + // panic("no builder function") + // } + // + // // Panic if there are provisioners on the template but no provisioner + // // component finder. This is always an internal error, so we panic. + // if len(t.Provisioners) > 0 && components.Provisioner == nil { + // panic("no provisioner function") + // } + // + // builder, err := components.Builder(builderConfig.Type) + // if err != nil { + // return + // } + // + // if builder == nil { + // err = fmt.Errorf("Builder type not found: %s", builderConfig.Type) + // return + // } + // + // // Process the name + // tpl, variables, err := t.NewConfigTemplate() + // if err != nil { + // return nil, err + // } + // + // rawName := name + // name, err = tpl.Process(name, nil) + // if err != nil { + // return nil, err + // } + // + // // Gather the Hooks + // hooks := make(map[string][]Hook) + // for tplEvent, tplHooks := range t.Hooks { + // curHooks := make([]Hook, 0, len(tplHooks)) + // + // for _, hookName := range tplHooks { + // var hook Hook + // hook, err = components.Hook(hookName) + // if err != nil { + // return + // } + // + // if hook == nil { + // err = fmt.Errorf("Hook not found: %s", hookName) + // return + // } + // + // curHooks = append(curHooks, hook) + // } + // + // hooks[tplEvent] = curHooks + // } + // + // // Prepare the post-processors + // postProcessors := make([][]coreBuildPostProcessor, 0, len(t.PostProcessors)) + // for _, rawPPs := range t.PostProcessors { + // current := make([]coreBuildPostProcessor, 0, len(rawPPs)) + // for _, rawPP := range rawPPs { + // if 
rawPP.TemplateOnlyExcept.Skip(rawName) { + // continue + // } + // + // pp, err := components.PostProcessor(rawPP.Type) + // if err != nil { + // return nil, err + // } + // + // if pp == nil { + // return nil, fmt.Errorf("PostProcessor type not found: %s", rawPP.Type) + // } + // + // current = append(current, coreBuildPostProcessor{ + // processor: pp, + // processorType: rawPP.Type, + // config: rawPP.RawConfig, + // keepInputArtifact: rawPP.KeepInputArtifact, + // }) + // } + // + // // If we have no post-processors in this chain, just continue. + // // This can happen if the post-processors skip certain builds. + // if len(current) == 0 { + // continue + // } + // + // postProcessors = append(postProcessors, current) + // } + // + // // Prepare the provisioners + // provisioners := make([]coreBuildProvisioner, 0, len(t.Provisioners)) + // for _, rawProvisioner := range t.Provisioners { + // if rawProvisioner.TemplateOnlyExcept.Skip(rawName) { + // continue + // } + // + // var provisioner Provisioner + // provisioner, err = components.Provisioner(rawProvisioner.Type) + // if err != nil { + // return + // } + // + // if provisioner == nil { + // err = fmt.Errorf("Provisioner type not found: %s", rawProvisioner.Type) + // return + // } + // + // configs := make([]interface{}, 1, 2) + // configs[0] = rawProvisioner.RawConfig + // + // if rawProvisioner.Override != nil { + // if override, ok := rawProvisioner.Override[name]; ok { + // configs = append(configs, override) + // } + // } + // + // if rawProvisioner.pauseBefore > 0 { + // provisioner = &PausedProvisioner{ + // PauseBefore: rawProvisioner.pauseBefore, + // Provisioner: provisioner, + // } + // } + // + // coreProv := coreBuildProvisioner{provisioner, configs} + // provisioners = append(provisioners, coreProv) + // } + // + // b = &coreBuild{ + // name: name, + // builder: builder, + // builderConfig: builderConfig.RawConfig, + // builderType: builderConfig.Type, + // hooks: hooks, + // postProcessors: 
postProcessors, + // provisioners: provisioners, + // variables: variables, + // } + // + // return return nil, nil } From 2aaa294e91f930d4fb0c745273b05e2493ccb739 Mon Sep 17 00:00:00 2001 From: Chris Stasiak Date: Thu, 28 May 2015 22:55:20 +0200 Subject: [PATCH 04/10] added TODOs --- TODO | 30 ++++++++++++++++++++++++++++++ ai/framework.go | 1 + ai/gpu/gpu.go | 1 + plugin/rpc_data.go | 19 +++++++++++++++++++ service/predict/predict.go | 9 +++++++++ workflow/manager.go | 24 ++++++++++++++---------- workflow/workflow.go | 1 + 7 files changed, 75 insertions(+), 10 deletions(-) create mode 100644 TODO create mode 100644 ai/gpu/gpu.go create mode 100644 plugin/rpc_data.go diff --git a/TODO b/TODO new file mode 100644 index 0000000..721e462 --- /dev/null +++ b/TODO @@ -0,0 +1,30 @@ + +TODO: + +- workflow/template +- metrics/stream + +- plugin ANN/pipeline ANN/scenario Location+weather/sensor PHOTO/indexer + +- exclude metric from streaming by re-integration + +- load Workflowfile by FeedID +- parse Workflowfile with templating +- load WorkflowController with Workflowfile +- add WorkflowController to WorkflowManager +- implement SystemEvents +- bind Workflow to SystemEvents + +- metrics manager: send from UI, store + +- entry logic: metrics +- viewer logic: metrics-for-entry + +- upload and run plugin + +- later needs pass as argument + - user/viewer context (storage, analytics, brains) + - internal api / communicator for accessing API resources + - artifacts (for plugins in chain) + - cache + - plugin manager (for metrics) diff --git a/ai/framework.go b/ai/framework.go index e7f5af6..dae1f34 100644 --- a/ai/framework.go +++ b/ai/framework.go @@ -3,6 +3,7 @@ package ai // https://github.com/josephmisiti/awesome-machine-learning#go /* + TODO: SHOULD PROVIDE ABSTRACTION FOR ELASTICFEED ENGINE AND SERVER-PLUGIN LOGIC. 
diff --git a/ai/gpu/gpu.go b/ai/gpu/gpu.go new file mode 100644 index 0000000..33eb92b --- /dev/null +++ b/ai/gpu/gpu.go @@ -0,0 +1 @@ +package gpu diff --git a/plugin/rpc_data.go b/plugin/rpc_data.go new file mode 100644 index 0000000..4af435e --- /dev/null +++ b/plugin/rpc_data.go @@ -0,0 +1,19 @@ +package plugin + +import ( + "net/rpc" + "github.com/feedlabs/elasticfeed/plugin/model" +) + +// An implementation of packer.Cache where the RpcData is actually executed +// over an RPC connection. +type RpcData struct { + client *rpc.Client + endpoint string +} + +// CacheRpcServer wraps a packer.Cache implementation and makes it exportable +// as part of a Golang RPC server. +type DataRpcServer struct { + data model.Data +} diff --git a/service/predict/predict.go b/service/predict/predict.go index 749e994..bc21844 100644 --- a/service/predict/predict.go +++ b/service/predict/predict.go @@ -4,6 +4,15 @@ import ( "github.com/feedlabs/elasticfeed/service/predict/v1/router" ) +/* + TODO: + - THIS SERVICE SHOULD ALLOW FOR WORKFLOW files + - WORKFLOW SHOULD be: + - SCENARIO -> YES (learning) + - PIPELINE -> YES (prediction) + - SENSOR/INDEXER/CRAWLER/HELPER -> NO (no pre-learning, no environment) + */ + type PredictService struct {} func (this *PredictService) Init() { diff --git a/workflow/manager.go b/workflow/manager.go index b178526..25033b7 100644 --- a/workflow/manager.go +++ b/workflow/manager.go @@ -41,7 +41,7 @@ func (this *WorkflowManager) Init() { sss := ` { - "Description":"aaaa-aaaa-aaaa", + "description":"aaaa-aaaa-aaaa", "storing" : { "new-entry": { "indexers": [ @@ -105,6 +105,7 @@ func (this *WorkflowManager) Init() { } /** + TODO: MAYBE COULD BIND TO "SYSTEM EVENT MANAGER" - COULD BIND TO RESOURCE EVENTS: NEW ENTRY, NEW METRIC, NEW VIEWER - COULD BIND TO CRON JOBS: FEED MAINTAINER, SENSORS UPDATE @@ -113,19 +114,20 @@ func (this *WorkflowManager) Init() { */ /** - - IMPLEMENT EVENTS TRIGGERS + TODO: - - IMPLEMENT STREAM SERVICE EVENT/HOOKS BINDING (LISTEN 
TO EVENTS AND HOOKS ON STREAM SERVICE) - - IMPLEMENT STORE SERVICE EVENT/HOOKS BINDING (SHOULD BE DONE BY "SYSTEM EVENTS MANAGER") + - IMPLEMENT EVENTS TRIGGERS - - IMPLEMENT LOCAL CRON JOB FOR ("SYSTEM EVENTS MANAGER") - - SENSOR REFRESH EVENT - - FEED MAINTAINER EVENT + - IMPLEMENT STREAM SERVICE EVENT/HOOKS BINDING (LISTEN TO EVENTS AND HOOKS ON STREAM SERVICE) + - IMPLEMENT STORE SERVICE EVENT/HOOKS BINDING (SHOULD BE DONE BY "SYSTEM EVENTS MANAGER") - - IMPLEMENT RESOURCE API WHICH - - CAN BE PASSED TO PLUGINS - - CAN PROVIDE/CREATE DATA + - IMPLEMENT LOCAL CRON JOB FOR ("SYSTEM EVENTS MANAGER") + - SENSOR REFRESH EVENT + - FEED MAINTAINER EVENT + - IMPLEMENT RESOURCE API WHICH + - CAN BE PASSED TO PLUGINS + - CAN PROVIDE/CREATE DATA */ func (this *WorkflowManager) GetStreamService() *stream.StreamService { @@ -174,6 +176,8 @@ func (this *WorkflowManager) BindStreamServiceEvents() { func (this *WorkflowManager) ResourcePipelineRound(socketEvent smodel.SocketEvent) { + // TODO: + // will run WorkflowManager with Pipeline plugins // ******************************************************************* diff --git a/workflow/workflow.go b/workflow/workflow.go index 13957a1..80f2f6a 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -31,6 +31,7 @@ func (this *WorkflowController) GetProfiler() *model.Profiler { } func (this *WorkflowController) Init() { + // TODO: // verify Feed.Workflowfile stricture; does match WorkflowManager Templating // verify plugins availability: this.manager.findPlugin() // run Plugins if require specific Profiler From 940180fc0f992647d22828c49732e3887940c8d7 Mon Sep 17 00:00:00 2001 From: Chris Stasiak Date: Sat, 6 Jun 2015 17:43:40 +0200 Subject: [PATCH 05/10] Added concept of REST client and Scraper for plugins --- common/rest/client.go | 13 +++++++++++++ common/scraper/scraper.go | 10 ++++++++++ 2 files changed, 23 insertions(+) create mode 100644 common/rest/client.go create mode 100644 common/scraper/scraper.go diff --git 
a/common/rest/client.go b/common/rest/client.go new file mode 100644 index 0000000..538da77 --- /dev/null +++ b/common/rest/client.go @@ -0,0 +1,13 @@ +package rest + +/* + SHOULD ALLOW FOR PLUGINS TO HANDLE RESTFUL APIs + + e.g + + - photo API by imagga.com + - weather API by worldweatheronline.com + - space weather by nasa.org + - crawling API by import.io + - prediction API by prediction.io +*/ diff --git a/common/scraper/scraper.go b/common/scraper/scraper.go new file mode 100644 index 0000000..f97f370 --- /dev/null +++ b/common/scraper/scraper.go @@ -0,0 +1,10 @@ +package scraper + +/* + SHOULD ALLOW FOR EASY SCRAPING OF WEBSITES + + e.g. + - biorhythm + - horoscope + - event in location/city +*/ From 79d087e8851738f026c0dbd3636a457796a6fbb8 Mon Sep 17 00:00:00 2001 From: Chris Stasiak Date: Sat, 6 Jun 2015 19:43:23 +0200 Subject: [PATCH 06/10] Added fpga concept to AI framework --- ai/fpga/fpga.go | 5 +++++ ai/gpu/gpu.go | 4 ++++ common/rest/client.go | 7 ++++++- 3 files changed, 15 insertions(+), 1 deletion(-) create mode 100644 ai/fpga/fpga.go diff --git a/ai/fpga/fpga.go b/ai/fpga/fpga.go new file mode 100644 index 0000000..7b37bbf --- /dev/null +++ b/ai/fpga/fpga.go @@ -0,0 +1,5 @@ +package fpga + +/* + field-programmable gate array + */ diff --git a/ai/gpu/gpu.go b/ai/gpu/gpu.go index 33eb92b..c6c06d0 100644 --- a/ai/gpu/gpu.go +++ b/ai/gpu/gpu.go @@ -1 +1,5 @@ package gpu + +/* + graphics processing unit + */ diff --git a/common/rest/client.go b/common/rest/client.go index 538da77..99ecb58 100644 --- a/common/rest/client.go +++ b/common/rest/client.go @@ -7,7 +7,12 @@ package rest - photo API by imagga.com - weather API by worldweatheronline.com - - space weather by nasa.org + - space weather by data.nasa.gov - crawling API by import.io - prediction API by prediction.io + - http://predictthesky.org/ Predict the Sky + - Earth temperature anomalies GET https://api.nasa.gov/planetary/earth/temperature/address GET 
https://api.nasa.gov/planetary/earth/temperature/coords + - space Sounds https://api.nasa.gov/planetary/sounds?q=apollo&api_key=DEMO_KEY + - https://earthdata.nasa.gov + - https://earthdata.nasa.gov/user-resources/remote-sensors */ From b3d79a1a61b92351d5d4761cf443c6b208fe5712 Mon Sep 17 00:00:00 2001 From: Chris Stasiak Date: Sat, 6 Jun 2015 19:56:16 +0200 Subject: [PATCH 07/10] Added DSP concept --- ai/dsp/dsp.go | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 ai/dsp/dsp.go diff --git a/ai/dsp/dsp.go b/ai/dsp/dsp.go new file mode 100644 index 0000000..a85df08 --- /dev/null +++ b/ai/dsp/dsp.go @@ -0,0 +1,5 @@ +package dsp + +/* + digital signal processor + */ From 1e2d17c806395e8b0b77286f9afac94f7119242a Mon Sep 17 00:00:00 2001 From: Chris Stasiak Date: Sun, 21 Jun 2015 17:27:22 +0200 Subject: [PATCH 08/10] - small updated - improvements - comments - ideas --- TODO | 54 +++++++++---- resource/workflow.go | 9 +++ service/system/v1/template/plugin/response.go | 9 ++- workflow/manager.go | 75 ++++--------------- workflow/workflow.go | 68 +++++++++++++++++ 5 files changed, 138 insertions(+), 77 deletions(-) diff --git a/TODO b/TODO index 721e462..83fde7f 100644 --- a/TODO +++ b/TODO @@ -1,13 +1,21 @@ TODO: +http://app.ganttpro.com/#!/app/home +Services +-------- - workflow/template - metrics/stream -- plugin ANN/pipeline ANN/scenario Location+weather/sensor PHOTO/indexer - -- exclude metric from streaming by re-integration +Plugins +------- +ANN/pipeline ANN/scenario Location+weather/sensor PHOTO/indexer +- weather: https://github.com/schachmat/wego +- imagga goclient +- NOX73/go-neural or fxsjy/gonn +Workflow +-------- - load Workflowfile by FeedID - parse Workflowfile with templating - load WorkflowController with Workflowfile @@ -15,16 +23,34 @@ TODO: - implement SystemEvents - bind Workflow to SystemEvents -- metrics manager: send from UI, store - -- entry logic: metrics -- viewer logic: metrics-for-entry -- upload and run plugin -- later needs pass 
as argument - - user/viewer context (storage, analytics, brains) - - internal api / communicator for accessing API resources - - artifacts (for plugins in chain) - - cache - - plugin manager (for metrics) +More than 70 experts will present examples, standards, methods, strategies, +and tools needed to deliver the right content, to the right people, +at the right place and time, on any device. + +WHAT IS INTELLIGENT CONTENT? + +Simply put, ‘intelligent content’ is content which is not limited to one purpose, +technology or output. It’s content that is structurally rich and semantically aware, +and is therefore discoverable, reusable, reconfigurable and adaptable. It’s +content that helps you and your customers get the job done. It’s content that +works for you and it’s limited only by your imagination. Learn more about intelligent +content and how it can benefit your organization. + +TOPICS TO BE COVERED + +Content Strategy +Content Engineering +Content Marketing +Digital Publishing +eBooks, Apps, and Mobile +Structured Authoring +Adaptive Content +Language and Culture +Personalized Content +Dynamic Publishing +Content Reuse +Translation Automation +Information Visualization +Big Data and Analytics diff --git a/resource/workflow.go b/resource/workflow.go index 2c33f7e..31c2b87 100644 --- a/resource/workflow.go +++ b/resource/workflow.go @@ -12,6 +12,15 @@ func (this *Workflow) GetRawData() map[string]interface{} { } func (this *Workflow) GetProfilerRawData() map[string]string { + + // workaround - first GetRawData() needs to return real data + data := make(map[string]string) + data["default-profiler"] = "true" + data["mem"] = "256" + data["cpus"] = "4" + data["bandwidth"] = "1000" + return data + return this.GetRawData()["profiler"].(map[string]string) } diff --git a/service/system/v1/template/plugin/response.go b/service/system/v1/template/plugin/response.go index 29b2b4b..88d5a15 100644 --- a/service/system/v1/template/plugin/response.go +++ 
b/service/system/v1/template/plugin/response.go @@ -19,10 +19,17 @@ func GetEntry(plugin *resource.Plugin) (entry map[string]interface{}) { entry["status"] = "error" entry["errors"] = "File path is missing" } else { - entry["status"] = "running" + entry["status"] = "runable" entry["errors"] = "no errors" } + runtime := make(map[string]interface{}) + runtime["workflowBinded"] = 0 + runtime["workflowCurrent"] = 0 + runtime["workflowCrashed"] = 0 + + entry["runtime"] = runtime + return entry } diff --git a/workflow/manager.go b/workflow/manager.go index 25033b7..047b1ba 100644 --- a/workflow/manager.go +++ b/workflow/manager.go @@ -16,9 +16,6 @@ import ( "github.com/feedlabs/elasticfeed/service/stream/controller/room" "github.com/feedlabs/elasticfeed/service/stream" - - jsonutil "github.com/mitchellh/packer/common/json" - "github.com/mitchellh/mapstructure" ) @@ -39,65 +36,13 @@ type WorkflowManager struct { func (this *WorkflowManager) Init() { - sss := ` - { - "description":"aaaa-aaaa-aaaa", - "storing" : { - "new-entry": { - "indexers": [ - { - "type": "ann" - } - ], - "crawlers": [ - { - "type": "crawler-google" - } - ] - } - } - } - ` - // var data []byte - // copy(data[:], sss) - data := []byte(sss) - - var rawTplInterface interface{} - err := jsonutil.Unmarshal(data, &rawTplInterface) - if err != nil { - fmt.Println(data) - fmt.Println(err) - return - } - - // Decode the raw template interface into the actual rawTemplate - // structure, checking for any extranneous keys along the way. 
- var md mapstructure.Metadata - var rawTpl rawTemplate - decoderConfig := &mapstructure.DecoderConfig{ - Metadata: &md, - Result: &rawTpl, - } - - decoder, err := mapstructure.NewDecoder(decoderConfig) - if err != nil { - fmt.Println("err2") - fmt.Println(err) - return - } - - err = decoder.Decode(rawTplInterface) - if err != nil { - fmt.Println("err3") - fmt.Println(err) - return - } - - fmt.Println(rawTpl) - fmt.Println(rawTpl.Storing.NewEntryEvent.Indexers[0]["type"]) - fmt.Println(rawTpl.Storing.NewEntryEvent.Crawlers[0]["type"]) - + // Testing: + org := &resource.Org{"0", "org", "org", 0, 0, 0} + app := &resource.Application{"1", org, "", 1} + feed := resource.NewFeed("15", app, "", 1, 1) + this.CreateFeedWorkflow(feed) + // Testing: bind to ANN plugin with RPS communicator this.BindServiceEvents() this.InstallSensorsSchedule() @@ -146,7 +91,6 @@ func (this *WorkflowManager) InitTemplate(t interface{}) { func (this *WorkflowManager) CreateFeedWorkflow(feed *resource.Feed) *WorkflowController { w := NewWorkflowController(feed, this) - w.Init() this.workflows = append(this.workflows, w) return w } @@ -159,6 +103,13 @@ func (this *WorkflowManager) BindServiceEvents() { func (this *WorkflowManager) BindStreamServiceEvents() { + /* + - HANDLE API EVENTS LIKE NEW-ENTRY... OR.. + - BIND TO this.GetStreamService().GetFeedRoomManager().ResourceEvent FOR ANY POSSIBLE EVENTS? + + - MAYBE BIND TO SYSTEM EVENTS? 
+ */ + for { select { case socketEvent := <-this.GetStreamService().GetFeedRoomManager().ResourceEvent: diff --git a/workflow/workflow.go b/workflow/workflow.go index 80f2f6a..72b6b1c 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -1,8 +1,13 @@ package workflow import ( + "fmt" + "github.com/feedlabs/elasticfeed/plugin/model" "github.com/feedlabs/elasticfeed/resource" + + jsonutil "github.com/mitchellh/packer/common/json" + "github.com/mitchellh/mapstructure" ) type WorkflowController struct { @@ -31,11 +36,74 @@ func (this *WorkflowController) GetProfiler() *model.Profiler { } func (this *WorkflowController) Init() { + // TODO: // verify Feed.Workflowfile stricture; does match WorkflowManager Templating // verify plugins availability: this.manager.findPlugin() // run Plugins if require specific Profiler // bind Feed to system Events: this.manager.BindToSystemEvents() + + data := []byte(` + { + "description":"aaaa-aaaa-aaaa", + "storing" : { + "new-entry": { + "indexers": [ + { + "type": "ann" + } + ], + "crawlers": [ + { + "type": "crawler-google" + } + ] + } + } + } + `) + + var rawTplInterface interface{} + err := jsonutil.Unmarshal(data, &rawTplInterface) + if err != nil { + fmt.Println(data) + fmt.Println(err) + return + } + + // Decode the raw template interface into the actual rawTemplate + // structure, checking for any extranneous keys along the way. 
+ var md mapstructure.Metadata + var rawTpl rawTemplate + decoderConfig := &mapstructure.DecoderConfig{ + Metadata: &md, + Result: &rawTpl, + } + + decoder, err := mapstructure.NewDecoder(decoderConfig) + if err != nil { + fmt.Println("err2") + fmt.Println(err) + return + } + + err = decoder.Decode(rawTplInterface) + if err != nil { + fmt.Println("err3") + fmt.Println(err) + return + } + + + /* + - WE NEED PLUGINS UPLOADED AND INSTALLED + - PLUGINS SHOULD BE ABLE TO RUN FOR SPECIFIC WORKFLOW + */ + + fmt.Println(rawTpl) + fmt.Println(rawTpl.Storing.NewEntryEvent.Indexers[0]["type"]) + fmt.Println(rawTpl.Storing.NewEntryEvent.Crawlers[0]["type"]) + } func (this *WorkflowController) DispatchIndexerHook(data interface{}) interface{} { From 6e99d3eb86b3bce361f8ad24836788227498fbfa Mon Sep 17 00:00:00 2001 From: Chris Stasiak Date: Mon, 22 Jun 2015 16:29:02 +0200 Subject: [PATCH 09/10] Refactored WorkflowController to work with WorkflowManager --- workflow/workflow.go | 127 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 121 insertions(+), 6 deletions(-) diff --git a/workflow/workflow.go b/workflow/workflow.go index 72b6b1c..a53ea43 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -3,11 +3,19 @@ package workflow import ( "fmt" + "encoding/json" + "time" + "math/rand" + "github.com/feedlabs/elasticfeed/plugin/model" "github.com/feedlabs/elasticfeed/resource" jsonutil "github.com/mitchellh/packer/common/json" "github.com/mitchellh/mapstructure" + + smodel "github.com/feedlabs/elasticfeed/service/stream/model" + + "github.com/feedlabs/elasticfeed/service/stream/controller/room" ) type WorkflowController struct { @@ -106,18 +114,125 @@ func (this *WorkflowController) Init() { } -func (this *WorkflowController) DispatchIndexerHook(data interface{}) interface{} { - return data -} +func (this *WorkflowController) ExecutePipelineChain(socketEvent smodel.SocketEvent) { + + // ******************************************************************* + // REAL 
CONTENT IMPROVEMENT + // based on connected user (viewer) or users (audience)!: habits, behaviours, stats etc. + // WORKFLOW PIPE: filtering, customization + // WORKFLOW SCENARIO-ENGINE: scenarios SHOULD BE IMPLEMENTED ON METRIC SERVICE + // ******************************************************************* + + // ******************************************************************* + // SCENARIO AND RULES/METRICS + // should use go routine with time limit to query filter rules + // if in specific time there is no rules the results should be sent + // client feed. After this the next package should be sent with + // rules which entries should be remove/hidden from the view! + // ******************************************************************* + + timeout := make(chan bool, 1) + results := make(chan []*resource.Entry, 1) + + // COLLECTING ENTRIES + if entryListCache == nil { + entryListCache = make(map[string][]*resource.Entry) + } + + if entryListCache[socketEvent.FeedId] == nil { + entryListCache[socketEvent.FeedId], _ = resource.GetEntryList(socketEvent.FeedId, socketEvent.AppId, socketEvent.OrgId) + } + + // WORKFLOW TIMEOUT + // !! SHOULD BE CONFIGURABLE OVER RUNTIME SETTING + // !! 
DEFAULT VALUE SHOULD BE IN CONFIG FILE + go func() { + amt := time.Duration(this.PipelineTimeout) + time.Sleep(amt * time.Millisecond) + timeout <- true + }() + + // WORKFLOW PIPELINE + go func(list []*resource.Entry, socketEvent smodel.SocketEvent) { + + if pluginManagerAnn == nil { + pluginManagerAnn, _ = this.manager.engine.GetPluginManager().LoadPipeline("ann") + pluginManagerAnn.Prepare() + } + + newList, _ := pluginManagerAnn.Run(list) + + var newEntryList []*resource.Entry + + for _, v := range newList.([]interface{}) { + Id := "" + Data := "" + for k, vv := range v.(map[interface{}]interface{}) { + if k == "Id" { + Id = vv.(string) + } + if k == "Data" { + Data = vv.(string) + } + } + if Id != "" && Data != "" { + newEntryList = append(newEntryList, &resource.Entry{Id, nil, Data}) + } + } -func (this *WorkflowController) DispatchPipelineHook(data interface{}) interface{} { - return data + list = newEntryList + + results <- list + }(entryListCache[socketEvent.FeedId], socketEvent) + + select { + + // IF PIPE TAKES TOO MUCH TIME, DATA DELAYED + case <-timeout: + + event := room.NewFeedEvent(room.FEED_ENTRY_NEW, socketEvent.FeedId, "{Content:\"tiemout\"}") + data, _ := json.Marshal(event) + + if socketEvent.Ws != nil { + amt := time.Duration(rand.Intn(500)) * 1000 + time.Sleep(amt * time.Microsecond) + socketEvent.Ws.WriteMessage(1, data) + } + + if socketEvent.Ch != nil { + socketEvent.Ch <- data + } + + // IF DATA ARRIVES WITHOUT DELAY + case list := <-results: + + // ********************************************************************* + // register socket handler + // needs to send notiffication to long pooling + ws + // join should generate uniqe ID and client should use it + // maybe sessionID could be as uniqeID ? 
+ // room.FeedSubscribers[socketEvent.FeedId][channelID] = socketEvent + // ********************************************************************* + + d, _ := json.Marshal(list) + event := room.NewFeedEvent(room.FEED_ENTRY_INIT, socketEvent.FeedId, string(d)) + data, _ := json.Marshal(event) + + if socketEvent.Ws != nil { + socketEvent.Ws.WriteMessage(1, data) + } + + if socketEvent.Ch != nil { + socketEvent.Ch <- data + } + + } } func NewWorkflowController(feed *resource.Feed, wm *WorkflowManager) *WorkflowController { data := feed.GetWorkflow().GetProfilerRawData() p := model.NewProfiler(data) - w := &WorkflowController{feed, wm, p, 100, 100, 100, 100, 100} + w := &WorkflowController{feed, wm, p, 100, 100, 100, 50, 100} w.Init() From c8b727dc9aab28dd4a10abde1d5cace0a15624bf Mon Sep 17 00:00:00 2001 From: Chris Stasiak Date: Mon, 22 Jun 2015 17:58:51 +0200 Subject: [PATCH 10/10] Auto create of WorkflowController for Feed --- workflow/manager.go | 151 +++++++++----------------------------------- 1 file changed, 29 insertions(+), 122 deletions(-) diff --git a/workflow/manager.go b/workflow/manager.go index 047b1ba..fcac5a4 100644 --- a/workflow/manager.go +++ b/workflow/manager.go @@ -3,10 +3,6 @@ package workflow import ( "fmt" - "encoding/json" - "time" - "math/rand" - "github.com/feedlabs/elasticfeed/elasticfeed/model" "github.com/feedlabs/elasticfeed/resource" @@ -18,7 +14,6 @@ import ( "github.com/feedlabs/elasticfeed/service/stream" ) - var ( pluginManagerAnn pmodel.Pipeline entryListCache map[string][]*resource.Entry @@ -30,19 +25,14 @@ type WorkflowManager struct { pManager model.PluginManager eManager model.EventManager - workflows []*WorkflowController + workflows map[string]*WorkflowController template interface{} } func (this *WorkflowManager) Init() { - // Testing: - org := &resource.Org{"0", "org", "org", 0, 0, 0} - app := &resource.Application{"1", org, "", 1} - feed := resource.NewFeed("15", app, "", 1, 1) - this.CreateFeedWorkflow(feed) + 
this.workflows = make(map[string]*WorkflowController) - // Testing: bind to ANN plugin with RPS communicator this.BindServiceEvents() this.InstallSensorsSchedule() @@ -89,10 +79,9 @@ func (this *WorkflowManager) InitTemplate(t interface{}) { this.template = t } -func (this *WorkflowManager) CreateFeedWorkflow(feed *resource.Feed) *WorkflowController { - w := NewWorkflowController(feed, this) - this.workflows = append(this.workflows, w) - return w +func (this *WorkflowManager) CreateWorkflowFeed(feed *resource.Feed) *WorkflowController { + this.workflows[feed.Id] = NewWorkflowController(feed, this) + return this.workflows[feed.Id] } func (this *WorkflowManager) BindServiceEvents() { @@ -125,123 +114,41 @@ func (this *WorkflowManager) BindStreamServiceEvents() { } } -func (this *WorkflowManager) ResourcePipelineRound(socketEvent smodel.SocketEvent) { - - // TODO: +func (this *WorkflowManager) ResourceIndexerRound(socketEvent smodel.SocketEvent) { +} - // will run WorkflowManager with Pipeline plugins +func (this *WorkflowManager) ResourceCrawlerRound(socketEvent smodel.SocketEvent) { +} - // ******************************************************************* - // REAL CONTENT IMPROVEMENT - // based on connected user (viewer) or users (audience)!: habits, behaviours, stats etc. - // WORKFLOW PIPE: filtering, customization - // WORKFLOW SCENARIO-ENGINE: scenarios SHOULD BE IMPLEMENTED ON METRIC SERVICE - // ******************************************************************* +func (this *WorkflowManager) ResourceSensorRound(socketEvent smodel.SocketEvent) { +} - // ******************************************************************* - // SCENARIO AND RULES/METRICS - // should use go routine with time limit to query filter rules - // if in specific time there is no rules the results should be sent - // client feed. After this the next package should be sent with - // rules which entries should be remove/hidden from the view! 
- // ******************************************************************* +func (this *WorkflowManager) ResourceScenarioRound(socketEvent smodel.SocketEvent) { +} - timeout := make(chan bool, 1) - results := make(chan []*resource.Entry, 1) +func (this *WorkflowManager) ResourcePipelineRound(socketEvent smodel.SocketEvent) { - // COLLECTING ENTRIES - if entryListCache == nil { - entryListCache = make(map[string][]*resource.Entry) + workflow, err := this.FindWorkflowByFeedId(socketEvent.FeedId) + // create workflow for feed if not existing yet + if err != nil { + feed, _ := resource.GetFeed(socketEvent.FeedId, socketEvent.AppId, socketEvent.OrgId) + this.CreateWorkflowFeed(feed) } - if entryListCache[socketEvent.FeedId] == nil { - entryListCache[socketEvent.FeedId], _ = resource.GetEntryList(socketEvent.FeedId, socketEvent.AppId, socketEvent.OrgId) + workflow, err = this.FindWorkflowByFeedId(socketEvent.FeedId) + if err != nil { + fmt.Println("Cannot find Workflow for Feed ID: " + socketEvent.FeedId) + return } - // WORKFLOW TIMEOUT - // !! SHOULD BE CONFIGURABLE OVER RUNTIME SETTING - // !! 
DEFAULT VALUE SHOULD BE IN CONFIG FILE - go func() { - amt := time.Duration(100) - time.Sleep(amt * time.Millisecond) - timeout <- true - }() - - // WORKFLOW PIPELINE - go func(list []*resource.Entry, socketEvent smodel.SocketEvent) { - - if pluginManagerAnn == nil { - pluginManagerAnn, _ = this.engine.GetPluginManager().LoadPipeline("ann") - pluginManagerAnn.Prepare() - } - - newList, _ := pluginManagerAnn.Run(list) - - var newEntryList []*resource.Entry - - for _, v := range newList.([]interface{}) { - Id := "" - Data := "" - for k, vv := range v.(map[interface{}]interface{}) { - if k == "Id" { - Id = vv.(string) - } - if k == "Data" { - Data = vv.(string) - } - } - if Id != "" && Data != "" { - newEntryList = append(newEntryList, &resource.Entry{Id, nil, Data}) - } - } - - list = newEntryList - - results <- list - }(entryListCache[socketEvent.FeedId], socketEvent) - - select { - - // IF PIPE TAKES TOO MUCH TIME, DATA DELAYED - case <-timeout: - - event := room.NewFeedEvent(room.FEED_ENTRY_NEW, socketEvent.FeedId, "{Content:\"tiemout\"}") - data, _ := json.Marshal(event) - - if socketEvent.Ws != nil { - amt := time.Duration(rand.Intn(500)) * 1000 - time.Sleep(amt * time.Microsecond) - socketEvent.Ws.WriteMessage(1, data) - } - - if socketEvent.Ch != nil { - socketEvent.Ch <- data - } - - // IF DATA ARRIVES WITHOUT DELAY - case list := <-results: - - // ********************************************************************* - // register socket handler - // needs to send notiffication to long pooling + ws - // join should generate uniqe ID and client should use it - // maybe sessionID could be as uniqeID ? 
- // room.FeedSubscribers[socketEvent.FeedId][channelID] = socketEvent - // ********************************************************************* - - d, _ := json.Marshal(list) - event := room.NewFeedEvent(room.FEED_ENTRY_INIT, socketEvent.FeedId, string(d)) - data, _ := json.Marshal(event) - - if socketEvent.Ws != nil { - socketEvent.Ws.WriteMessage(1, data) - } - - if socketEvent.Ch != nil { - socketEvent.Ch <- data - } + workflow.ExecutePipelineChain(socketEvent) +} +func (this *WorkflowManager) FindWorkflowByFeedId(id string) (workflow *WorkflowController, err error) { + if this.workflows[id] == nil { + return nil, fmt.Errorf("Workflow for feedID %s does not exist!", id) } + return this.workflows[id], nil } func (this *WorkflowManager) InstallFeedMaintenanceSchedule() {