Skip to content

Commit 3453073

Browse files
committed
feat: process notifications from MQ and persist to storage
1 parent 6120086 commit 3453073

File tree

162 files changed

+6158
-684
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

162 files changed

+6158
-684
lines changed

app/content/internal/biz/article.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ import (
66
userv1 "common/api/user/v1"
77
"common/pkg/client"
88
"common/pkg/constant"
9+
"common/pkg/cutil/base"
10+
"common/pkg/cutil/base/str"
11+
dict2 "common/pkg/cutil/collections/dict"
12+
"common/pkg/cutil/collections/set"
913
commonModel "common/pkg/model"
1014
"common/pkg/util"
11-
"common/pkg/util/base"
12-
"common/pkg/util/collections/dict"
13-
"common/pkg/util/collections/set"
1415
"content/internal/biz/model"
1516
"content/internal/biz/repo"
1617
"content/internal/data/ent"
@@ -32,7 +33,7 @@ type ArticleDomain struct {
3233
}
3334

3435
func NewArticleDomain(base *BaseDomain, articleRepo repo.ArticleRepo, postscriptRepo repo.ArticlePostscriptRepo, actionRecordRepo repo.ArticleActionRecordRepo, commentRepo repo.CommentRepo, tagRepo repo.TagRepo, domainRepo repo.DomainRepo) (*ArticleDomain, error) {
35-
sf, err := util.NewSonyflake()
36+
sf, err := str.NewSonyflake()
3637
if err != nil {
3738
return nil, err
3839
}
@@ -246,13 +247,13 @@ func (d *ArticleDomain) Page(ctx context.Context, page *cv1.PageRequest, req *re
246247
userIds.Add(*item.CreatedBy)
247248
}
248249

249-
lastCommentMap := dict.New[int64, *model.Comment](0)
250+
lastCommentMap := dict2.New[int64, *model.Comment](0)
250251
if articleIds.Len() > 0 {
251252
lastCommentMap, err = d.commentRepo.GetArticleLastComments(ctx, d.db, &repo.CommentGetReq{ArticleIds: articleIds.ToSlice()})
252253
if err != nil {
253254
return nil, nil, err
254255
}
255-
lastCommentMap.Foreach(func(e *dict.Entry[int64, *model.Comment]) bool {
256+
lastCommentMap.Foreach(func(e *dict2.Entry[int64, *model.Comment]) bool {
256257
userIds.Add(*e.Value.CreatedBy)
257258
return true
258259
})

app/content/internal/biz/comment.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import (
66
userv1 "common/api/user/v1"
77
"common/pkg/client"
88
"common/pkg/constant"
9-
"common/pkg/util/base"
10-
"common/pkg/util/collections/set"
9+
"common/pkg/cutil/base"
10+
"common/pkg/cutil/collections/set"
1111
"content/internal/biz/model"
1212
"content/internal/biz/repo"
1313
"content/internal/data/ent"

app/content/internal/biz/model/article.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ package model
33
import (
44
v1 "common/api/content/v1"
55
userv1 "common/api/user/v1"
6+
"common/pkg/cutil/base"
7+
set2 "common/pkg/cutil/collections/set"
68
"common/pkg/util"
7-
"common/pkg/util/base"
8-
"common/pkg/util/collections/set"
99
"content/internal/data/ent/gen"
1010
"fmt"
1111
"time"
@@ -29,11 +29,6 @@ type Article struct {
2929
IsSummary bool `json:"-"`
3030
}
3131

32-
func NewArticle(model *gen.Article) *Article {
33-
a := &Article{Article: model}
34-
return a
35-
}
36-
3732
// Summary 文章摘要
3833
func (a *Article) Summary() {
3934
r := []rune(a.Content)
@@ -48,8 +43,8 @@ func (a *Article) FormatContent() {
4843
}
4944

5045
// ParseContent 解析文章内容
51-
func (a *Article) ParseContent() (atUserNames set.Set[string]) {
52-
atUserNames = set.New[string](0)
46+
func (a *Article) ParseContent() (atUserNames set2.Set[string]) {
47+
atUserNames = set2.New[string](0)
5348
tree := parse.Parse(fmt.Sprintf("%s_%d", "article_content", a.ID), []byte(a.Content), parse.NewOptions())
5449
ast.Walk(tree.Root, func(n *ast.Node, entering bool) ast.WalkStatus {
5550
return util.ParseNodeLinkAtUsernames(n, entering, atUserNames)
@@ -72,8 +67,8 @@ func (a *Article) FormatRewardContent() {
7267
}
7368

7469
// ParseRewardContent 解析文章打赏区内容
75-
func (a *Article) ParseRewardContent() (atUserNames set.Set[string]) {
76-
atUserNames = set.New[string](0)
70+
func (a *Article) ParseRewardContent() (atUserNames set2.Set[string]) {
71+
atUserNames = set2.New[string](0)
7772
if a.RewardContent != nil {
7873
tree := parse.Parse(fmt.Sprintf("%s_%d", "article_reward_content", a.ID), []byte(*a.RewardContent), parse.NewOptions())
7974
ast.Walk(tree.Root, func(n *ast.Node, entering bool) ast.WalkStatus {
@@ -136,7 +131,7 @@ func (a *Article) ConvertToRpc() *v1.Article {
136131
}
137132
if len(a.Edges.Tags) > 0 {
138133
for _, tag := range a.Edges.Tags {
139-
article.Tags = append(article.Tags, (*Tag)(tag).ConvertToRpc())
134+
article.Tags = append(article.Tags, (&Tag{Tag: tag}).ConvertToRpc())
140135
}
141136
}
142137
return article

app/content/internal/biz/model/article_postscript.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package model
22

33
import (
44
v1 "common/api/content/v1"
5+
set2 "common/pkg/cutil/collections/set"
56
"common/pkg/util"
6-
"common/pkg/util/collections/set"
77
"content/internal/data/ent/gen"
88
"fmt"
99

@@ -23,8 +23,8 @@ func (p *ArticlePostscript) FormatContent() {
2323
}
2424

2525
// ParseContent 解析文章内容
26-
func (p *ArticlePostscript) ParseContent() (atUserNames set.Set[string]) {
27-
atUserNames = set.New[string](0)
26+
func (p *ArticlePostscript) ParseContent() (atUserNames set2.Set[string]) {
27+
atUserNames = set2.New[string](0)
2828
tree := parse.Parse(fmt.Sprintf("article_postscript_%d", p.ID), []byte(p.Content), parse.NewOptions())
2929
ast.Walk(tree.Root, func(n *ast.Node, entering bool) ast.WalkStatus {
3030
return util.ParseNodeLinkAtUsernames(n, entering, atUserNames)

app/content/internal/biz/model/comment.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package model
33
import (
44
v1 "common/api/content/v1"
55
userv1 "common/api/user/v1"
6+
set2 "common/pkg/cutil/collections/set"
67
"common/pkg/util"
7-
"common/pkg/util/collections/set"
88
"content/internal/data/ent/gen"
99
"fmt"
1010

@@ -29,8 +29,8 @@ func (c *Comment) FormatContent() {
2929
}
3030

3131
// ParseContent 解析评论内容
32-
func (c *Comment) ParseContent() (atUserNames set.Set[string]) {
33-
atUserNames = set.New[string](0)
32+
func (c *Comment) ParseContent() (atUserNames set2.Set[string]) {
33+
atUserNames = set2.New[string](0)
3434
tree := parse.Parse(fmt.Sprintf("article_postscript_%d", c.ID), []byte(c.Content), parse.NewOptions())
3535
ast.Walk(tree.Root, func(n *ast.Node, entering bool) ast.WalkStatus {
3636
return util.ParseNodeLinkAtUsernames(n, entering, atUserNames)

app/content/internal/biz/model/domain.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import (
77
"google.golang.org/protobuf/types/known/timestamppb"
88
)
99

10-
type Domain gen.Domain
10+
type Domain struct {
11+
*gen.Domain
12+
}
1113

1214
func (t *Domain) ConvertToRpc() *v1.Domain {
1315
domain := &v1.Domain{
@@ -24,10 +26,9 @@ func (t *Domain) ConvertToRpc() *v1.Domain {
2426
TagCount: t.TagCount,
2527
IsNav: t.IsNav,
2628
}
27-
entDomain := (*gen.Domain)(t)
28-
if len(entDomain.Edges.Tags) > 0 {
29-
for _, tag := range entDomain.Edges.Tags {
30-
domain.Tags = append(domain.Tags, (*Tag)(tag).ConvertToRpc())
29+
if len(t.Domain.Edges.Tags) > 0 {
30+
for _, tag := range t.Domain.Edges.Tags {
31+
domain.Tags = append(domain.Tags, (&Tag{Tag: tag}).ConvertToRpc())
3132
}
3233
}
3334
return domain

app/content/internal/biz/model/tag.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ package model
22

33
import (
44
v1 "common/api/content/v1"
5-
"common/pkg/util/base"
5+
"common/pkg/cutil/base"
66
"content/internal/data/ent/gen"
77

88
"google.golang.org/protobuf/types/known/timestamppb"
99
)
1010

11-
type Tag gen.Tag
11+
type Tag struct {
12+
*gen.Tag
13+
}
1214

1315
func (t *Tag) ConvertToRpc() *v1.Tag {
1416
return &v1.Tag{

app/content/internal/biz/repo/comment.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package repo
33
import (
44
cv1 "common/api/common/v1"
55
v1 "common/api/content/v1"
6-
"common/pkg/util/collections/dict"
6+
"common/pkg/cutil/collections/dict"
77
"content/internal/biz/model"
88
"content/internal/data/ent/gen"
99
"context"

app/content/internal/biz/repo/tag.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
type TagRepo interface {
1313
Save(ctx context.Context, tx *gen.Client, tag *model.Tag) (*model.Tag, error)
1414
Saves(ctx context.Context, tx *gen.Client, tags []*model.Tag) ([]*model.Tag, error)
15-
Update(ctx context.Context, db *gen.Client, tag *model.Tag) (*model.Tag, error)
15+
Update(ctx context.Context, tx *gen.Client, tag *model.Tag) (*model.Tag, error)
1616

1717
GetOne(ctx context.Context, tx *gen.Client, req *TagGetReq) (*model.Tag, error)
1818
GetList(ctx context.Context, tx *gen.Client, req *TagGetReq) ([]*model.Tag, error)

app/content/internal/data/article.go

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,24 @@ package data
33
import (
44
cv1 "common/api/common/v1"
55
v1 "common/api/content/v1"
6+
notifyv1 "common/api/notify/v1"
67
"common/pkg/constant"
7-
"common/pkg/util/base"
8-
"common/pkg/util/collections/set"
8+
"common/pkg/cutil/base"
9+
"common/pkg/cutil/collections/set"
10+
commonModel "common/pkg/model"
11+
"common/pkg/util"
912
"content/internal/biz/model"
1013
"content/internal/biz/repo"
1114
"content/internal/data/ent/gen"
1215
"content/internal/data/ent/gen/article"
1316
"content/internal/data/ent/gen/articlepostscript"
1417
"content/internal/data/ent/gen/tag"
1518
"context"
19+
"encoding/json"
1620
"time"
1721

1822
"entgo.io/ent/dialect/sql"
23+
"github.com/google/uuid"
1924
)
2025

2126
type ArticleRepo struct {
@@ -182,6 +187,11 @@ func (r *ArticleRepo) UpdateStat(ctx context.Context, tx *gen.Client, articleId
182187
}
183188

184189
func (r *ArticleRepo) Publish(ctx context.Context, tx *gen.Client, articleId int64) error {
190+
user, ok := util.GetContextValue[*commonModel.User](ctx, constant.CtxUserInfo)
191+
if !ok {
192+
return cv1.ErrorUnauthorized("user not login")
193+
}
194+
185195
first, err := r.GetOne(ctx, tx, &repo.ArticleGetReq{ArticleId: base.Ptr(articleId)})
186196
if err != nil {
187197
return err
@@ -196,22 +206,25 @@ func (r *ArticleRepo) Publish(ctx context.Context, tx *gen.Client, articleId int
196206
return err
197207
}
198208

199-
return nil
209+
//return nil
200210

201-
//// Todo 广播添加文章事件
202-
//publish := &v1.ArticleEventPublish{}
203-
//err = copier.Copy(&publish, first)
204-
//if err != nil {
205-
// return err
206-
//}
207-
//marshal, err := json.Marshal(publish)
208-
//if err != nil {
209-
// return err
210-
//}
211-
//err = r.rabbitmq.Publish(constant.ExchangeContent.String(), constant.RoutingKeyArticleCreate.String(), marshal)
212-
//if err != nil {
213-
// return err
214-
//}
211+
// Todo 广播添加文章事件
212+
publish := &commonModel.Notification{
213+
UUID: uuid.New().String(),
214+
Type: base.Ptr(notifyv1.NotificationType_NotificationTypeArticlePublish),
215+
SenderId: user.ID,
216+
Channels: []*notifyv1.NotificationChannel{base.Ptr(notifyv1.NotificationChannel_NotificationChannelWebSite)},
217+
Meta: map[string]any{"user": commonModel.User{Name: user.Name}, "article": first},
218+
Status: notifyv1.NotificationStatus_NotificationStatusNormal,
219+
}
220+
marshal, err := json.Marshal(publish)
221+
if err != nil {
222+
return err
223+
}
224+
err = r.rabbitmq.Publish(constant.ExchangeContent.String(), constant.RoutingKeyContentArticlePublish.String(), marshal)
225+
if err != nil {
226+
return err
227+
}
215228
//// Todo 广播@用户通知
216229
//atUserNames := first.ParseContent()
217230
//
@@ -221,8 +234,8 @@ func (r *ArticleRepo) Publish(ctx context.Context, tx *gen.Client, articleId int
221234
// return err
222235
//}
223236
//_ = atUserList
224-
//
225-
//return nil
237+
238+
return nil
226239
}
227240

228241
func (r *ArticleRepo) Delete(ctx context.Context, tx *gen.Client, articleId int64) error {

0 commit comments

Comments
 (0)