Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/examples/feed.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,23 @@ Subscribe
}
})
```

Reload
------
```
curl -u john:hello 127.0.0.1:10100/v1/application/30/feed/89/reload --digest
{
"result": "Feed reloaded",
"status": "ok"
}
```

Empty
-----
```
curl -u john:hello 127.0.0.1:10100/v1/application/30/feed/86/empty --digest
{
"result": "Feed empty done.",
"status": "ok"
}
```
39 changes: 39 additions & 0 deletions docs/examples/javascript.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
### JS client

#### Usage
```
window.onload = function() {
elasticfeed.init({
channel: {
url: 'ws://localhost:80',
transport: 'ws'
}
});

feed = elasticfeed.initFeed('000001', {
outputContainerId: 'my-elastic-feed-1',
stylerFunction: function(data) {
return '<div style="height:50px; border:1px dotted; border-color: blue;">' + data + '</div>';
}
});

feed.channel.on('join', function(chid, ts) {
feed1.addEntry(chid + " joined the chat room");
});

feed.channel.on('leave', function(chid, ts) {
feed1.addEntry(chid + " left the chat room");
});

window['socket'] = feed1.socket;
}
```

#### Test data broadcast
```
// single feed
socket.send({Type:1, Timestamp:1111111, Content: {Type:3, Timestamp:22222, Id: "000001", Content: "data-examples"}})

// all feeds in the view on the chanel
socket.send({Type:1, Timestamp:1111111, Content: {Type:3, Timestamp:22222, Id: "*", Content: "data-examples"}})
```
72 changes: 11 additions & 61 deletions resource/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,60 +8,8 @@ import (
"github.com/feedlabs/feedify/graph"

"github.com/feedlabs/elasticfeed/service/stream/controller/room"
"github.com/feedlabs/elasticfeed/service/stream/model"
)

// user_feed_token = channel_id + feed_id => e.g aabbccddee + aabbcc
// for private feeds there will be 1 websocket connection
// for public company feeds will be 1 websocket connection
// basically for each channel is 1 websocket connection
// private and public channel will stream through multiple feed-pages events
//
// channel => channel_id
// event => 'feed:' + feed_id
// data => [{ // action object
// id => string // entryId
// tags => strings... // array of strings
// action => string // add/delete/update
// data => string // entry data as content; string e.g. json.stringify
// }, {}, {}]

const BODY_HEADER = `{
"channel": "iO5wshd5fFE5YXxJ/hfyKQ==:17",
"event": "CM_Action_Abstract:SEND:31",
"data": {
"action": {
"actor": {
"_type": 33,
"_id": {
"id": "1"
},
"id": 1,
"displayName": "user1",
"visible": true,
"_class": "Feed_Model_User"
},
"verb": 13,
"type": 31,
"_class": "Feed_Action_Feed"
},
"model": {
"_type": 33,
"_id": {
"id": "1"
},
"id": 1,
"displayName": "user1",
"visible": true,
"_class": "Feed_Model_User"
},
"data": {`

const BODY_BOTTOM = `
}
}
}`

func GetEntryList(FeedId string, ApplicationId string, OrgId string) (feedEntries []*Entry, err error) {
feed, err := GetFeed(FeedId, ApplicationId, OrgId)

Expand Down Expand Up @@ -136,14 +84,11 @@ func AddEntry(feedEntry Entry, FeedId string, ApplicationId string, OrgId string
return "0", err
}

// _data := BODY_HEADER + `"Id": "` + feedEntry.Id + `", "Action": "add", "Tag": {}, "Data": ` + strconv.Quote(feedEntry.Data) + BODY_BOTTOM
// message.Publish(_data)

feedEntry.Id = strconv.Itoa(entry.Id)

// notify
data, _ := json.Marshal(entry)
room.Publish <- room.NewEntryEvent(model.EVENT_MESSAGE, "system", string(data))
d, _ := json.Marshal(feedEntry)
room.Publish <- room.NewFeedEvent(room.FEED_ENTRY_NEW, feed.Id, string(d))

return feedEntry.Id, nil
}
Expand All @@ -155,8 +100,12 @@ func UpdateEntry(id string, FeedId string, ApplicationId string, OrgId string, d
return err
}

_data := BODY_HEADER + `"Id": "` + entry.Id + `", "Action": "update", "Tag": {}, "Data": ` + strconv.Quote(data) + BODY_BOTTOM
message.Publish(_data)
// update entry
entry.Data = data

// notify
d, _ := json.Marshal(entry)
room.Publish <- room.NewEntryEvent(room.ENTRY_UPDATE, entry.Id, string(d))

_id, _ := strconv.Atoi(entry.Id)
return storage.SetPropertyNode(_id, "data", data)
Expand All @@ -176,8 +125,9 @@ func DeleteEntry(id string, FeedId string, ApplicationId string, OrgId string) (
storage.DeleteRelation(rel.Id)
}

_data := BODY_HEADER + `"Id": "` + entry.Id + `", "Action": "remove"` + BODY_BOTTOM
message.Publish(_data)
// notify
d, _ := json.Marshal(entry)
room.Publish <- room.NewEntryEvent(room.ENTRY_DELETE, entry.Id, string(d))

return storage.DeleteNode(_id)
}
14 changes: 8 additions & 6 deletions resource/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package resource
import (
"errors"
"strconv"
"encoding/json"

"github.com/feedlabs/feedify/graph"

"github.com/feedlabs/elasticfeed/service/stream/controller/room"
"github.com/feedlabs/elasticfeed/service/stream/model"
)

func (this *Feed) AddEntry(entry Entry) (EntryId string, err error) {
Expand Down Expand Up @@ -93,10 +91,6 @@ func AddFeed(feed Feed, applicationId string, orgId string) (id string, err erro

feed.Id = strconv.Itoa(_feed.Id)

// notify
data, _ := json.Marshal(feed)
room.Publish <- room.NewFeedEvent(model.EVENT_MESSAGE, "system", string(data))

return feed.Id, nil
}

Expand All @@ -110,6 +104,14 @@ func DeleteFeed(id string) (error) {
return storage.DeleteNode(_id)
}

func ActionReloadFeed(id string) {
room.Publish <- room.NewFeedEvent(room.FEED_RELOAD, id, "reload")
}

func ActionEmptyFeed(id string) {
room.Publish <- room.NewFeedEvent(room.FEED_EMPTY, id, "empty")
}

func init() {
Feeds = make(map[string]*Feed)
}
18 changes: 18 additions & 0 deletions service/db/v1/controller/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (

"github.com/feedlabs/elasticfeed/resource"
"github.com/feedlabs/elasticfeed/service/db/v1/template/feed"

"github.com/feedlabs/elasticfeed/service/db/v1/template"
)

type FeedController struct {
Expand Down Expand Up @@ -152,3 +154,19 @@ func (this *FeedController) Delete() {
feed.ResponseDelete()
this.Controller.ServeJson()
}

func (this *FeedController) ActionReload() {
feedId := this.Ctx.Input.Params[":feedId"]

resource.ActionReloadFeed(feedId)

this.ServeJson(template.Success("Feed reloaded."))
}

func (this *FeedController) ActionEmpty() {
feedId := this.Ctx.Input.Params[":feedId"]

resource.ActionEmptyFeed(feedId)

this.ServeJson(template.Success("Feed empty done."))
}
2 changes: 1 addition & 1 deletion service/db/v1/router/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ func init() {
feedify.Router("/v1/application/:applicationId:string/entry/:feedEntryId:int", &controller.EntryController{}, "get:Get;delete:Delete;put:Put")

feedify.Router("/v1/application/:applicationId:string/feed/:feedId:int/entry", &controller.EntryController{}, "get:GetListByFeed;post:PostToFeed")
feedify.Router("/v1/application/:applicationId:string/feed/:feedId:int/entry/:feedEntryId:int", &controller.EntryController{}, "get:Get;delete:Remove")
feedify.Router("/v1/application/:applicationId:string/feed/:feedId:int/entry/:feedEntryId:int", &controller.EntryController{}, "get:Get;delete:Remove;put:Put")
}
3 changes: 3 additions & 0 deletions service/db/v1/router/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ import (
func init() {
feedify.Router("/v1/application/:applicationId:string/feed", &controller.FeedController{}, "get:GetList;post:Post")
feedify.Router("/v1/application/:applicationId:string/feed/:feedId:int", &controller.FeedController{}, "get:Get;delete:Delete;put:Put")

feedify.Router("/v1/application/:applicationId:string/feed/:feedId:int/reload", &controller.FeedController{}, "get:ActionReload")
feedify.Router("/v1/application/:applicationId:string/feed/:feedId:int/empty", &controller.FeedController{}, "get:ActionEmpty")
}
2 changes: 1 addition & 1 deletion service/stream/controller/channel/long_pooling.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (this *LongPollingController) Post() {
// should be executed and returned directly to user
// lastReceived time should not be changed in that case

room.Publish <- room.NewSystemEvent(model.EVENT_MESSAGE, chid, data)
room.Publish <- room.NewSystemEvent(room.CHANNEL_MESSAGE, chid, data)
}

func (this *LongPollingController) Fetch() {
Expand Down
3 changes: 1 addition & 2 deletions service/stream/controller/channel/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/feedlabs/feedify"
"github.com/gorilla/websocket"

"github.com/feedlabs/elasticfeed/service/stream/model"
"github.com/feedlabs/elasticfeed/service/stream/controller/room"
)

Expand Down Expand Up @@ -43,7 +42,7 @@ func (this *WebSocketController) Join() {
return
}

room.Publish <- room.NewSystemEvent(model.EVENT_MESSAGE, chid, string(p))
room.Publish <- room.NewSystemEvent(room.CHANNEL_MESSAGE, chid, string(p))
room.P2P <- ws
}
}
64 changes: 45 additions & 19 deletions service/stream/controller/room/feed.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,41 @@
package room

import (
"strconv"
"time"
"container/list"
"encoding/json"

"github.com/feedlabs/feedify"
"github.com/gorilla/websocket"

"github.com/astaxie/beego/session"

"github.com/feedlabs/elasticfeed/service/stream/model"
)

const (
FEED_ADD = iota
FEED_DELETE
FEED_UPDATE
FEED_RESET
FEED_RELOAD

ENTRY_ADD
ENTRY_DELETE
ENTRY_UPDATE
ENTRY_RELOAD
CHANNEL_JOIN = 0
CHANNEL_LEAVE = 1
CHANNEL_MESSAGE = 2

SYSTEM_FEED_MESSAGE = 1

FEED_RELOAD = 1
FEED_EMPTY = 2
FEED_ENTRY_NEW = 3
FEED_ENTRY_INIT = 4
FEED_ENTRY_MORE = 5
FEED_HIDE = 6
FEED_SHOW = 7
FEED_ENTRY_MESSAGE = 8
FEED_AUTHENTICATED = 100
FEED_AUTHENTICATION_REQUIRED = 101
FEED_AUTHENTICATION_FAILED = 102
FEED_LOGGED_OUT = 103

ENTRY_UPDATE = 1
ENTRY_DELETE = 2
ENTRY_SHOW = 3
ENTRY_HIDE = 4
)

var (
Expand All @@ -49,23 +61,37 @@ type Subscriber struct {
}

func NewEvent(ep model.EventType, user, msg string) model.Event {
return model.Event{ep, user, time.Now().UnixNano(), msg}
ts := time.Now().UnixNano()
return model.Event{ep, user, ts, strconv.Itoa(int(ts)), msg}
}

func NewChannelEvent(ep model.EventType, user, msg string) model.Event {
return NewEvent(ep, user, msg)
}

func NewSystemEvent(ep model.EventType, user, msg string) model.Event {
return NewChannelEvent(ep, user, msg)
event := NewEvent(ep, user, msg)
data, _ := json.Marshal(event)

return NewChannelEvent(CHANNEL_MESSAGE, user, string(data))
}

func NewFeedEvent(ep model.EventType, user, msg string) model.Event {
return NewSystemEvent(ep, user, msg)
// "msg" is a feed action; can contain entry specific event
event := NewEvent(ep, user, msg)
data, _ := json.Marshal(event)

// "user" is and feed-id; "*" means all feeds on the client site
return NewSystemEvent(SYSTEM_FEED_MESSAGE, user, string(data))
}

func NewEntryEvent(ep model.EventType, user, msg string) model.Event {
return NewSystemEvent(ep, user, msg)
// "msg" is a feed entry data as a string
event := NewEvent(ep, user, msg)
data, _ := json.Marshal(event)

// "*" all feeds on client site will receive this message
return NewFeedEvent(FEED_ENTRY_MESSAGE, "*", string(data))
}

func Join(user string, ws *websocket.Conn) {
Expand All @@ -82,10 +108,10 @@ func FeedManager() {

case sub := <-Subscribe:
Subscribers.PushBack(sub)
Publish <- NewChannelEvent(model.EVENT_JOIN, sub.Name, "")
Publish <- NewChannelEvent(CHANNEL_JOIN, sub.Name, "")

case client := <-P2P:
data, _ := json.Marshal(NewSystemEvent(model.EVENT_MESSAGE, "system", "ok"))
data, _ := json.Marshal(NewSystemEvent(CHANNEL_MESSAGE, "system", "ok"))
client.WriteMessage(websocket.TextMessage, data)

case event := <-Publish:
Expand All @@ -108,7 +134,7 @@ func FeedManager() {
ws.Close()
feedify.Error("WebSocket closed:", unsub)
}
Publish <- NewChannelEvent(model.EVENT_LEAVE, unsub, "")
Publish <- NewChannelEvent(CHANNEL_LEAVE, unsub, "")
break
}
}
Expand Down
Loading