Skip to content

Commit 99496a1

Browse files
authored
Merge pull request #138 from djoreilly/rsyslog-shipper
Add plugin and artifact for sending client events to rsyslog
2 parents 87ee357 + be4c407 commit 99496a1

File tree

4 files changed

+246
-0
lines changed

4 files changed

+246
-0
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
name: RsyslogShipper.Events.Clients
2+
3+
description: |
4+
This server side event monitoring artifact will watch a selection of client
5+
monitoring artifacts for new events and push them rsyslog.
6+
7+
NOTE: You must ensure you are collecting these artifacts from the
8+
clients by adding them to the "Client Events" GUI.
9+
10+
type: SERVER_EVENT
11+
12+
parameters:
13+
- name: UnixSocket
14+
description: Path to the Unix Domain Socket to send events to
15+
type: string
16+
default: /tmp/velo-socket
17+
18+
- name: Threads
19+
description: Number of threads to start up to post events
20+
type: int
21+
default: 2
22+
23+
- name: Artifacts
24+
type: artifactset
25+
artifact_type: CLIENT_EVENT
26+
description: Client artifacts to monitor
27+
28+
sources:
29+
- query: |
30+
LET _ <= SELECT log(message="ERROR: parameter Artifacts cannot be empty!") FROM scope() WHERE len(list=Artifacts) = 0
31+
32+
LET artifacts_to_watch = SELECT Artifact FROM Artifacts
33+
WHERE log(message="Sending events from client artifact " + Artifact + " to rsyslog")
34+
35+
LET events = SELECT * FROM foreach(
36+
row=artifacts_to_watch,
37+
async=TRUE, // Required for event queries in foreach()
38+
query={
39+
SELECT *, Artifact, timestamp(epoch=now()) AS timestamp
40+
FROM watch_monitoring(artifact=Artifact)
41+
})
42+
43+
SELECT * FROM rsyslog_upload(query=events, unix_domain=UnixSocket, threads=Threads)
44+

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ require (
198198
github.com/klauspost/compress v1.17.8 // indirect
199199
github.com/klauspost/pgzip v1.2.5 // indirect
200200
github.com/kr/fs v0.1.0 // indirect
201+
github.com/leodido/go-syslog v1.0.1 // indirect
201202
github.com/lestrrat-go/strftime v1.0.5 // indirect
202203
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
203204
github.com/mattermost/xml-roundtrip-validator v0.1.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
402402
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
403403
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
404404
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
405+
github.com/leodido/go-syslog v1.0.1 h1:/I6CcKOIT/Auo/vjvemIaQYnSNFmcv6Xd6kqXaKHZyM=
406+
github.com/leodido/go-syslog v1.0.1/go.mod h1:iGQLav8eZdt0+DaWcqmGKurRtDPyxwD1Dvc4DG5GMoU=
405407
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8=
406408
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is=
407409
github.com/lestrrat-go/strftime v1.0.5 h1:A7H3tT8DhTz8u65w+JRpiBxM4dINQhUXAZnhBa2xeOE=

vql/server/rsyslog.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"io"
6+
"net"
7+
"os"
8+
"strconv"
9+
"sync"
10+
"time"
11+
12+
"github.com/Velocidex/ordereddict"
13+
"github.com/leodido/go-syslog/rfc5424"
14+
"golang.org/x/time/rate"
15+
"www.velocidex.com/golang/velociraptor/acls"
16+
"www.velocidex.com/golang/velociraptor/file_store/api"
17+
"www.velocidex.com/golang/velociraptor/file_store/directory"
18+
"www.velocidex.com/golang/velociraptor/utils"
19+
"www.velocidex.com/golang/velociraptor/vql"
20+
"www.velocidex.com/golang/vfilter"
21+
"www.velocidex.com/golang/vfilter/arg_parser"
22+
)
23+
24+
const pluginName = "rsyslog_upload"
25+
26+
type rsyslogUploadPluginArgs struct {
27+
Query vfilter.StoredQuery `vfilter:"required,field=query,doc=Source for rows to upload."`
28+
UnixDomain string `vfilter:"required,field=unix_domain,doc=path to unix domain socket rsyslog listens on"`
29+
Threads int `vfilter:"optional,field=threads,doc=How many threads to use to send events."`
30+
}
31+
32+
type rsyslogUploadPlugin struct{}
33+
34+
func (r rsyslogUploadPlugin) Info(
35+
scope vfilter.Scope, typeMap *vfilter.TypeMap,
36+
) *vfilter.PluginInfo {
37+
return &vfilter.PluginInfo{
38+
Name: pluginName,
39+
Doc: "Upload rows to rsyslog",
40+
ArgType: typeMap.AddType(scope, &rsyslogUploadPluginArgs{}),
41+
Metadata: vql.VQLMetadata().Permissions(acls.COLLECT_SERVER).Build(),
42+
}
43+
}
44+
45+
func (r rsyslogUploadPlugin) Call(
46+
ctx context.Context, scope vfilter.Scope, args *ordereddict.Dict,
47+
) <-chan vfilter.Row {
48+
// this plugin does not send anything to its output channel
49+
outputCh := make(chan vfilter.Row)
50+
51+
go func() {
52+
defer close(outputCh)
53+
54+
err := vql.CheckAccess(scope, acls.COLLECT_SERVER)
55+
if err != nil {
56+
scope.Log("%s: check access failed: %v", pluginName, err)
57+
return
58+
}
59+
60+
arg := rsyslogUploadPluginArgs{}
61+
err = arg_parser.ExtractArgsWithContext(ctx, scope, args, &arg)
62+
if err != nil {
63+
scope.Log("%s: parsing args: %v", pluginName, err)
64+
return
65+
}
66+
if arg.UnixDomain == "" {
67+
scope.Log("%s: parameter UnixDomain must be set", pluginName)
68+
return
69+
}
70+
if arg.Threads == 0 {
71+
arg.Threads = 1
72+
}
73+
74+
configObj, ok := vql.GetServerConfig(scope)
75+
if !ok {
76+
scope.Log("%s: could not get config from scope", pluginName)
77+
return
78+
}
79+
80+
options := api.QueueOptions{
81+
DisableFileBuffering: false,
82+
FileBufferLeaseSize: 100,
83+
OwnerName: pluginName,
84+
}
85+
86+
listenerCtx, cancelListener := context.WithCancel(context.Background())
87+
defer cancelListener()
88+
89+
listener, err := directory.NewListener(configObj, listenerCtx, pluginName, options)
90+
if err != nil {
91+
scope.Log("%s: could not create listener: %v", pluginName, err)
92+
return
93+
}
94+
95+
scope.Log("%s: starting %d worker threads", pluginName, arg.Threads)
96+
wg := sync.WaitGroup{}
97+
for i := 0; i < arg.Threads; i++ {
98+
wg.Add(1)
99+
go rsyslogSender(ctx, &wg, arg.UnixDomain, scope, listener.Output())
100+
}
101+
102+
rowCh := arg.Query.Eval(ctx, scope)
103+
104+
quitLoop := false
105+
for !quitLoop {
106+
select {
107+
case <-ctx.Done():
108+
listener.Close()
109+
quitLoop = true
110+
case row, ok := <-rowCh:
111+
if !ok {
112+
continue
113+
}
114+
listener.Send(vfilter.RowToDict(ctx, scope, row))
115+
}
116+
}
117+
118+
// the workers will return when they detect that
119+
// the listener had closed its output channel
120+
wg.Wait()
121+
}()
122+
return outputCh
123+
}
124+
125+
func rsyslogSender(
126+
ctx context.Context, wg *sync.WaitGroup, address string,
127+
scope vfilter.Scope, rowCh <-chan *ordereddict.Dict,
128+
) {
129+
defer func() {
130+
scope.Log("%s: worker done", pluginName)
131+
wg.Done()
132+
}()
133+
134+
scope.Log("%s: worker started", pluginName)
135+
var (
136+
pid = strconv.Itoa(os.Getpid())
137+
conn net.Conn
138+
message string
139+
rrDialLog = rate.Sometimes{Interval: time.Minute}
140+
rrWriteLog = rate.Sometimes{Interval: time.Minute}
141+
)
142+
for {
143+
if conn == nil {
144+
var err error
145+
conn, err = net.DialTimeout("unixgram", address, time.Second)
146+
if err != nil {
147+
rrDialLog.Do(func() { scope.Log("%s: dialing: %v", pluginName, err) })
148+
utils.SleepWithCtx(ctx, time.Second)
149+
if ctx.Err() != nil {
150+
// avoid spinning here if rsyslogd is not
151+
// listening when the plugin is shutting down.
152+
return
153+
}
154+
conn = nil // probably not needed, but no harm.
155+
continue // retry dial
156+
}
157+
scope.Log("%s: worker connected!", pluginName)
158+
}
159+
if message == "" {
160+
row, ok := <-rowCh
161+
if !ok {
162+
// the listener closed its channel
163+
return
164+
}
165+
var err error
166+
message, err = rowToRsyslogString(row, pid)
167+
if err != nil {
168+
scope.Log("%s: creating rsyslog message: %v", pluginName, err)
169+
return
170+
}
171+
}
172+
conn.SetWriteDeadline(time.Now().Add(time.Second))
173+
_, err := io.WriteString(conn, message)
174+
if err != nil {
175+
rrWriteLog.Do(func() { scope.Log("%s: writing to conn: %v", pluginName, err) })
176+
conn.Close()
177+
conn = nil // conn is an interface!
178+
continue // Retry sending the same message on the next iteration.
179+
}
180+
181+
// the message was sent successfully.
182+
message = ""
183+
}
184+
}
185+
186+
func rowToRsyslogString(row *ordereddict.Dict, pid string) (string, error) {
187+
message := rfc5424.SyslogMessage{}
188+
message.SetPriority(0)
189+
message.SetVersion(1)
190+
message.SetAppname("velociraptor")
191+
message.SetProcID(pid)
192+
message.SetMessage(row.String()) // json
193+
194+
return message.String()
195+
}
196+
197+
func init() {
198+
vql.RegisterPlugin(&rsyslogUploadPlugin{})
199+
}

0 commit comments

Comments
 (0)