1 change: 1 addition & 0 deletions .gitignore
@@ -16,3 +16,4 @@ test/tmp
test/version_tmp
tmp
test.conf
vendor
2 changes: 1 addition & 1 deletion Gemfile
@@ -1,3 +1,3 @@
source :rubygems
source 'https://rubygems.org'

gemspec
169 changes: 75 additions & 94 deletions README.md
@@ -7,133 +7,114 @@
- `gem install fluent-plugin-postgres` or
- `/usr/lib/fluent/ruby/bin/fluent-gem install fluent-plugin-postgres`

## Changes from mysql:
## About:

- We currently don't support json format
- You need to specify a SQL query
- Placeholders are numbered (yeah, I know).
This plugin is an adaptation of the MySQL plugin.

Other than that, just bear in mind that it's Postgres SQL.
- Does not currently support json format (see [fluent-plugin-pgjson](https://github.com/fluent-plugins-nursery/fluent-plugin-pgjson))
- Placeholders are numbered.

### Quick example

<match output.by.sql.*>
type postgres
host master.db.service.local
# port 3306 # default
database application_logs
username myuser
password mypass
key_names status,bytes,vhost,path,rhost,agent,referer
sql INSERT INTO accesslog (status,bytes,vhost,path,rhost,agent,referer) VALUES ($1,$2,$3,$4,$5,$6,$7)
```
<match output.by.sql.*>
  @type postgres
  host master.db.service.local
  # port 5432 # default
  database application_logs
  username myuser
  password mypass
  key_names status,bytes,vhost,path,rhost,agent,referer
  sql INSERT INTO accesslog (status,bytes,vhost,path,rhost,agent,referer) VALUES ($1,$2,$3,$4,$5,$6,$7)
  <buffer>
    flush_interval 5s
  </buffer>
</match>
```

## Configuration

### Option Parameters

## Component
#### Connection Parameters

### PostgresOutput
* host
* port
* database
* username
* password

Plugin to store Postgres tables over SQL, to each columns per values, or to single column as json.
The standard information needed to connect to the database, as for any SQL application. The port defaults to 5432.

## Configuration
#### key_names

### MysqlOutput
A comma-separated list of the key names from the Fluentd record that will be written to the database.

MysqlOutput needs MySQL server's host/port/database/username/password, and INSERT format as SQL, or as table name and columns.
The *key_names* do not need to match the column names in your database, but their order must match. Each key's value is bound to the corresponding numbered placeholder ($1, $2, ...) in the insert.
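For illustration (the table and column names here are hypothetical), a record key can feed a differently named column as long as the positions line up:

```
key_names status,bytes
sql INSERT INTO accesslog (http_status, body_bytes) VALUES ($1, $2)
```

Here the value of `status` is bound to `$1` and the value of `bytes` to `$2`.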

<match output.by.sql.*>
type mysql
host master.db.service.local
# port 3306 # default
database application_logs
username myuser
password mypass
key_names status,bytes,vhost,path,rhost,agent,referer
sql INSERT INTO accesslog (status,bytes,vhost,path,rhost,agent,referer) VALUES (?,?,?,?,?,?,?)
flush_intervals 5s
</match>

<match output.by.names.*>
type mysql
host master.db.service.local
database application_logs
username myuser
password mypass
key_names status,bytes,vhost,path,rhost,agent,referer
table accesslog
# 'columns' names order must be same with 'key_names'
columns status,bytes,vhost,path,rhost,agent,referer
flush_intervals 5s
</match>

Or, insert json into single column.

<match output.as.json.*>
type mysql
host master.db.service.local
database application_logs
username root
table accesslog
columns jsondata
format json
flush_intervals 5s
</match>
#### sql

A SQL query used to insert each record. This is a standard insert statement; the Postgres numbered placeholder format ($1, $2, ...) is required. You must provide either *sql* or the *table* and *columns* parameters; with the latter, the plugin generates the insert query for you.

For simple inserts it is easier to use *table* and *columns*; *sql* permits the use of a function or other custom SQL (see the sketch below).

An exception will be raised if both *columns* and *sql* are provided.
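Because the statement is passed through as-is, it can also call SQL functions for values that are not part of the record. A minimal sketch, assuming a hypothetical `accesslog` table with a `logged_at` column:

```
<match output.by.sql.*>
  @type postgres
  host master.db.service.local
  database application_logs
  username myuser
  password mypass
  key_names status,bytes,path
  # $1-$3 are bound from key_names in order; now() is evaluated by Postgres
  sql INSERT INTO accesslog (status, bytes, path, logged_at) VALUES ($1, $2, $3, now())
</match>
```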

To include time/tag into output, use `include_time_key` and `include_tag_key`, like this:
#### columns

<match output.with.tag.and.time.*>
type mysql
host my.mysql.local
database anydatabase
username yourusername
password secret
By providing a comma-separated list of *columns* together with *table*, the plugin will generate the insert query for *sql* (the first block under More Examples below shows the statement this produces).

include_time_key yes
### default `time_format` is ISO-8601
# time_format %Y%m%d-%H%M%S
#### table

The table name, required with *columns*, ignored with *sql*.

## More Examples

```

<match output.by.names.*>
  @type postgres
  ...
  table accesslog
  key_names status,bytes,vhost,path,rhost,agent,referer
  # 'columns' order must match 'key_names' order
  columns status,bytes,vhost,path,rhost,agent,referer
  ...
</match>
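# With table and columns set (and no sql), the plugin builds the insert
# statement itself -- roughly equivalent to:
#   sql INSERT INTO accesslog (status,bytes,vhost,path,rhost,agent,referer) VALUES ($1,$2,$3,$4,$5,$6,$7)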

<match output.with.tag.and.time.*>
  @type postgres
  ...
  <inject>
    time_type string
    time_format "%Y-%m-%d %H:%M:%S.%L"
    ### default `time_key` is 'time'
    # time_key timekey
    ### default `tag_key` is 'tag'
    # tag_key tagkey
  </inject>
  key_names time,tag,field1,field2,field3,field4
  sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)
  ...
</match>
```

Or, for json:
## Troubleshooting

<match output.with.tag.and.time.as.json.*>
type mysql
host database.local
database foo
username root
The default output level of fluentd will not display important errors generated by this plugin. Use the `-v` or `-vv` flags to see them.

include_time_key yes
utc # with UTC timezome output (default: localtime)
time_format %Y%m%d-%H%M%S
time_key timeattr

include_tag_key yes
tag_key tagattr
table accesslog
columns jsondata
format json
</match>
#=> inserted json data into column 'jsondata' with addtional attribute 'timeattr' and 'tagattr'
The `--dry-run` flag will attempt to parse the config file without starting fluentd.
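For example (the configuration path below is just a placeholder for your own):

```
# Run with verbose logging so errors from this plugin are visible
fluentd -c /etc/fluent/fluent.conf -vv

# Parse the configuration without starting fluentd
fluentd --dry-run -c /etc/fluent/fluent.conf
```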

## TODO

* implement json support
* implement 'tag_mapped'
* dynamic tag based table selection

## Copyright

* Copyright
  * Copyright 2013-2020 Uken Games
* License
  * Apache License, Version 2.0
17 changes: 14 additions & 3 deletions lib/fluent/plugin/out_postgres.rb
@@ -7,7 +7,7 @@ class Fluent::Plugin::PostgresOutput < Fluent::Plugin::Output
helpers :inject, :compat_parameters

config_param :host, :string
config_param :port, :integer, :default => nil
config_param :port, :integer, :default => 5432
config_param :database, :string
config_param :username, :string
config_param :password, :string, :default => ''
@@ -17,6 +17,7 @@ class Fluent::Plugin::PostgresOutput < Fluent::Plugin::Output
config_param :table, :string, :default => nil
config_param :columns, :string, :default => nil

# Currently unimplemented
config_param :format, :string, :default => "raw" # or json

attr_accessor :handler
@@ -34,10 +35,20 @@ def configure(conf)
end

if @columns.nil? and @sql.nil?
raise Fluent::ConfigError, "columns or sql MUST be specified, but missing"
raise Fluent::ConfigError, "postgres plugin -- columns or sql MUST be specified, but missing"
end
if @columns and @sql
raise Fluent::ConfigError, "both of columns and sql are specified, but specify one of them"
raise Fluent::ConfigError, "postgres plugin -- both of columns and sql are specified, but specify one of them"
end
if @columns
unless @table
raise Fluent::ConfigError, "postgres plugin -- columns is specified but table is missing"
end
placeholders = []
for i in 1..@columns.split(',').count
placeholders.push("$#{i}")
end
@sql = "INSERT INTO #{@table} (#{@columns}) VALUES (#{placeholders.join(',')})"
end
end

56 changes: 49 additions & 7 deletions test/plugin/test_out_postgres.rb
@@ -12,7 +12,7 @@ def setup
username bar
password mogera
key_names field1,field2,field3
sql INSERT INTO baz (col1,col2,col3,col4) VALUES (?,?,?,?)
sql INSERT INTO baz (col1,col2,col3,col4) VALUES ($1,$2,$3,$4)
]

def create_driver(conf=CONFIG)
@@ -39,7 +39,7 @@ def test_configure_fails_if_both_cols_and_sql_specified
username bar
password mogera
key_names field1,field2,field3
sql INSERT INTO baz (col1,col2,col3,col4) VALUES (?,?,?,?)
sql INSERT INTO baz (col1,col2,col3,col4) VALUES ($1,$2,$3,$4)
columns col1,col2,col3,col4
]
}
@@ -65,7 +65,7 @@ def test_key_names_with_spaces
password mogera
table baz
key_names time, tag, field1, field2, field3, field4
sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES (?,?,?,?,?,?)
sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)
]
assert_equal ["time", "tag", "field1", "field2", "field3", "field4"], d.instance.key_names
end
@@ -81,9 +81,9 @@ def test_time_and_tag_key
include_tag_key yes
table baz
key_names time,tag,field1,field2,field3,field4
sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES (?,?,?,?,?,?)
sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)
]
assert_equal 'INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES (?,?,?,?,?,?)', d.instance.sql
assert_equal 'INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)', d.instance.sql

time = event_time('2012-12-17 01:23:45 UTC')
record = {'field1'=>'value1','field2'=>'value2','field3'=>'value3','field4'=>'value4'}
@@ -107,9 +107,9 @@ def test_time_and_tag_key_complex
tag_key tagkey
table baz
key_names timekey,tagkey,field1,field2,field3,field4
sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES (?,?,?,?,?,?)
sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)
]
assert_equal 'INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES (?,?,?,?,?,?)', d.instance.sql
assert_equal 'INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)', d.instance.sql

time = event_time('2012-12-17 09:23:45 +0900')
record = {'field1'=>'value1','field2'=>'value2','field3'=>'value3','field4'=>'value4'}
@@ -118,4 +118,46 @@ end
end
assert_equal ['test', time, ['20121217-002345','test','value1','value2','value3','value4']].to_msgpack, d.formatted[0]
end

def test_throw_when_table_missing
assert_raise(Fluent::ConfigError) {
create_driver %[
host database.local
database foo
username bar
password mogera
key_names field1,field2,field3
columns field1,field2,field3
]
}
end

def test_sql_from_columns
e = create_driver %[
host database.local
database foo
username bar
password mogera
include_time_key yes
utc
include_tag_key yes
table baz
key_names time,tag,field1,field2,field3,field4
columns time,tag,field1,field2,field3,field4
# sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)
]
# assert_equal 'INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)', e.instance.sql
# puts e.instance.sql
assert_equal 'INSERT INTO baz (time,tag,field1,field2,field3,field4) VALUES ($1,$2,$3,$4,$5,$6)', e.instance.sql
assert_equal '1', '1'
end

end
# # class TestSimpleNumber < Test::Unit::TestCase

# def test_simple
# assert_equal(4, 2+2 )
# assert_equal(6, 4+2 )
# end

# # end