diff --git a/.gitignore b/.gitignore
index cdd14aa..dfaae9d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,3 +16,4 @@ test/tmp
 test/version_tmp
 tmp
 test.conf
+vendor
\ No newline at end of file
diff --git a/Gemfile b/Gemfile
index a1b93f3..fa75df1 100644
--- a/Gemfile
+++ b/Gemfile
@@ -1,3 +1,3 @@
-source :rubygems
+source 'https://rubygems.org'
 
 gemspec
diff --git a/README.md b/README.md
index d0c9e06..9792de3 100644
--- a/README.md
+++ b/README.md
@@ -7,133 +7,114 @@
 - `gem install fluent-plugin-postgres`
 
 or
 
 - `/usr/lib/fluent/ruby/bin/fluent-gem install fluent-plugin-postgres`
 
-## Changes from mysql:
+## About:
 
-- We currently don't support json format
-- You need to specify a SQL query
-- Placeholders are numbered (yeah, I know).
+This plugin is an adaptation of the MySQL plugin.
 
-Other than that, just bear in mind that it's Postgres SQL.
+- Does not currently support json format (see [fluent-plugin-pgjson](https://github.com/fluent-plugins-nursery/fluent-plugin-pgjson))
+- Placeholders are numbered.
 
 ### Quick example
 
-<match **>
-  type postgres
-  host master.db.service.local
-  # port 3306 # default
-  database application_logs
-  username myuser
-  password mypass
-  key_names status,bytes,vhost,path,rhost,agent,referer
-  sql INSERT INTO accesslog (status,bytes,vhost,path,rhost,agent,referer) VALUES ($1,$2,$3,$4,$5,$6,$7)
+```
+<match **>
+  @type postgres
+  host master.db.service.local
+  # port 5432 # default
+  database application_logs
+  username myuser
+  password mypass
+  key_names status,bytes,vhost,path,rhost,agent,referer
+  sql INSERT INTO accesslog (status,bytes,vhost,path,rhost,agent,referer) VALUES ($1,$2,$3,$4,$5,$6,$7)
+  flush_interval 5s
-</match>
+</match>
+
+```
+## Configuration
+### Option Parameters
-## Component
+#### Connection Parameters
-### PostgresOutput
+* host
+* port
+* database
+* username
+* password
-Plugin to store Postgres tables over SQL, to each columns per values, or to single column as json.
+The standard information needed to connect to the database, as for any SQL application. Port defaults to 5432.
-## Configuration
+#### key_names
-### MysqlOutput
+A comma-separated list of the key names from the Fluentd record that will be written to the database.
-MysqlOutput needs MySQL server's host/port/database/username/password, and INSERT format as SQL, or as table name and columns.
+The *key_names* do not need to match the column names of your database, but the order does need to match. Each key is substituted into the insert values through the corresponding numbered placeholder ($1, $2, ...).
-<match **>
-  type mysql
-  host master.db.service.local
-  # port 3306 # default
-  database application_logs
-  username myuser
-  password mypass
-  key_names status,bytes,vhost,path,rhost,agent,referer
-  sql INSERT INTO accesslog (status,bytes,vhost,path,rhost,agent,referer) VALUES (?,?,?,?,?,?,?)
-  flush_intervals 5s
-</match>
-<match **>
-  type mysql
-  host master.db.service.local
-  database application_logs
-  username myuser
-  password mypass
-  key_names status,bytes,vhost,path,rhost,agent,referer
-  table accesslog
-  # 'columns' names order must be same with 'key_names'
-  columns status,bytes,vhost,path,rhost,agent,referer
-  flush_intervals 5s
-</match>
-Or, insert json into single column.
-<match **>
-  type mysql
-  host master.db.service.local
-  database application_logs
-  username root
-  table accesslog
-  columns jsondata
-  format json
-  flush_intervals 5s
-</match>
+#### sql
+
+A SQL query used to insert each record. This is a standard INSERT query; the Postgres numbered placeholder format ($1, $2, ...) is required.
+You must provide either this query, or the *table* and *columns* parameters, in which case the plugin will generate the insert query for you.
+
+For simple inserts it is easier to use *table* and *columns*; *sql* permits the use of a function or other custom SQL.
+
+An exception will be raised if both *columns* and *sql* are provided.
 
-To include time/tag into output, use `include_time_key` and `include_tag_key`, like this:
+#### columns
-<match **>
-  type mysql
-  host my.mysql.local
-  database anydatabase
-  username yourusername
-  password secret
+By providing a comma-separated list of *columns* and a *table*, the plugin will generate the insert query for *sql*.
 
-  include_time_key yes
-  ### default `time_format` is ISO-8601
-  # time_format %Y%m%d-%H%M%S
+#### table
+
+The table name; required with *columns*, ignored with *sql*.
+
+## More Examples
+
+```
+
+<match **>
+  @type postgres
+  ...
+  table accesslog
+  key_names status,bytes,vhost,path,rhost,agent,referer
+  # 'columns' names order must be the same as 'key_names'
+  columns status,bytes,vhost,path,rhost,agent,referer
+  ...
+</match>
+
+<match **>
+  @type postgres
+  ...
+  <inject>
+    time_type string
+    time_format "%Y-%m-%d %H:%M:%S.%L"
 ### default `time_key` is 'time'
 # time_key timekey
-  include_tag_key yes
 ### default `tag_key` is 'tag'
 # tag_key tagkey
+  </inject>
-  table anydata
-  key_names time,tag,field1,field2,field3,field4
-  sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES (?,?,?,?,?,?)
-
+  key_names time,tag,field1,field2,field3,field4
+  sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)
+  ...
+</match>
+```
 
-Or, for json:
 ## Troubleshooting
 
-<match **>
-  type mysql
-  host database.local
-  database foo
-  username root
+The default log level of fluentd will not display some important errors generated by this plugin. Use the -v or -vv flags to see them.
 
-  include_time_key yes
-  utc # with UTC timezome output (default: localtime)
-  time_format %Y%m%d-%H%M%S
-  time_key timeattr
-
-  include_tag_key yes
-  tag_key tagattr
-  table accesslog
-  columns jsondata
-  format json
-</match>
-
-#=> inserted json data into column 'jsondata' with addtional attribute 'timeattr' and 'tagattr'
+The --dry-run flag will attempt to parse the config file without starting fluentd.
 
 ## TODO
 
+* implement json support
 * implement 'tag_mapped'
-  * dynamic tag based table selection
+* dynamic tag based table selection
 
 ## Copyright
 
 * Copyright
-  * Copyright 2013 Uken Games
+  * Copyright 2013-2020 Uken Games
 * License
   * Apache License, Version 2.0
diff --git a/lib/fluent/plugin/out_postgres.rb b/lib/fluent/plugin/out_postgres.rb
index e8f8449..c391352 100644
--- a/lib/fluent/plugin/out_postgres.rb
+++ b/lib/fluent/plugin/out_postgres.rb
@@ -7,7 +7,7 @@ class Fluent::Plugin::PostgresOutput < Fluent::Plugin::Output
   helpers :inject, :compat_parameters
 
   config_param :host, :string
-  config_param :port, :integer, :default => nil
+  config_param :port, :integer, :default => 5432
   config_param :database, :string
   config_param :username, :string
   config_param :password, :string, :default => ''
@@ -17,6 +17,7 @@ class Fluent::Plugin::PostgresOutput < Fluent::Plugin::Output
 
   config_param :table, :string, :default => nil
   config_param :columns, :string, :default => nil
+  # Currently unimplemented
   config_param :format, :string, :default => "raw" # or json
 
   attr_accessor :handler
@@ -34,10 +35,20 @@ def configure(conf)
     end
 
     if @columns.nil? and @sql.nil?
-      raise Fluent::ConfigError, "columns or sql MUST be specified, but missing"
+      raise Fluent::ConfigError, "postgres plugin -- columns or sql MUST be specified, but missing"
     end
     if @columns and @sql
-      raise Fluent::ConfigError, "both of columns and sql are specified, but specify one of them"
+      raise Fluent::ConfigError, "postgres plugin -- both of columns and sql are specified, but specify one of them"
+    end
+    if @columns
+      unless @table
+        raise Fluent::ConfigError, "postgres plugin -- columns is specified but table is missing"
+      end
+      placeholders = []
+      for i in 1..@columns.split(',').count
+        placeholders.push("$#{i}")
+      end
+      @sql = "INSERT INTO #{@table} (#{@columns}) VALUES (#{placeholders.join(',')})"
     end
   end
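Reviewer note: a minimal standalone sketch of the query generation that the new `configure` branch above performs when *table* and *columns* are given instead of *sql*. The `build_insert_sql` helper name is illustrative only; the plugin builds the string inline and does not expose such a method.

```
# Sketch of the placeholder/SQL generation added in configure above.
# Splitting the comma-separated columns string yields one numbered
# placeholder ($1, $2, ...) per column, in order.
def build_insert_sql(table, columns)
  placeholders = (1..columns.split(',').count).map { |i| "$#{i}" }
  "INSERT INTO #{table} (#{columns}) VALUES (#{placeholders.join(',')})"
end

puts build_insert_sql('accesslog', 'status,bytes,vhost')
# => INSERT INTO accesslog (status,bytes,vhost) VALUES ($1,$2,$3)
```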
diff --git a/test/plugin/test_out_postgres.rb b/test/plugin/test_out_postgres.rb
index a4aff07..00d708b 100644
--- a/test/plugin/test_out_postgres.rb
+++ b/test/plugin/test_out_postgres.rb
@@ -12,7 +12,7 @@ def setup
 username bar
 password mogera
 key_names field1,field2,field3
-sql INSERT INTO baz (col1,col2,col3,col4) VALUES (?,?,?,?)
+sql INSERT INTO baz (col1,col2,col3,col4) VALUES ($1,$2,$3,$4)
 ]
 
   def create_driver(conf=CONFIG)
@@ -39,7 +39,7 @@ def test_configure_fails_if_both_cols_and_sql_specified
 username bar
 password mogera
 key_names field1,field2,field3
-sql INSERT INTO baz (col1,col2,col3,col4) VALUES (?,?,?,?)
+sql INSERT INTO baz (col1,col2,col3,col4) VALUES ($1,$2,$3,$4)
 columns col1,col2,col3,col4
 ]
     }
@@ -65,7 +65,7 @@ def test_key_names_with_spaces
 password mogera
 table baz
 key_names time, tag, field1, field2, field3, field4
-sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES (?,?,?,?,?,?)
+sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)
 ]
     assert_equal ["time", "tag", "field1", "field2", "field3", "field4"], d.instance.key_names
   end
@@ -81,9 +81,9 @@ def test_time_and_tag_key
 include_tag_key yes
 table baz
 key_names time,tag,field1,field2,field3,field4
-sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES (?,?,?,?,?,?)
+sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)
 ]
-    assert_equal 'INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES (?,?,?,?,?,?)', d.instance.sql
+    assert_equal 'INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)', d.instance.sql
 
     time = event_time('2012-12-17 01:23:45 UTC')
     record = {'field1'=>'value1','field2'=>'value2','field3'=>'value3','field4'=>'value4'}
@@ -107,9 +107,9 @@ def test_time_and_tag_key_complex
 tag_key tagkey
 table baz
 key_names timekey,tagkey,field1,field2,field3,field4
-sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES (?,?,?,?,?,?)
+sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)
 ]
-    assert_equal 'INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES (?,?,?,?,?,?)', d.instance.sql
+    assert_equal 'INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)', d.instance.sql
 
     time = event_time('2012-12-17 09:23:45 +0900')
     record = {'field1'=>'value1','field2'=>'value2','field3'=>'value3','field4'=>'value4'}
@@ -118,4 +118,46 @@ def test_time_and_tag_key_complex
     end
     assert_equal ['test', time, ['20121217-002345','test','value1','value2','value3','value4']].to_msgpack, d.formatted[0]
   end
+
+  def test_throw_when_table_missing
+    assert_raise(Fluent::ConfigError) {
+      create_driver %[
+host database.local
+database foo
+username bar
+password mogera
+key_names field1,field2,field3
+columns field1,field2,field3
+      ]
+    }
+  end
+
+  def test_sql_from_columns
+    e = create_driver %[
+host database.local
+database foo
+username bar
+password mogera
+include_time_key yes
+utc
+include_tag_key yes
+table baz
+key_names time,tag,field1,field2,field3,field4
+columns time,tag,field1,field2,field3,field4
+# sql INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)
+    ]
+    # assert_equal 'INSERT INTO baz (coltime,coltag,col1,col2,col3,col4) VALUES ($1,$2,$3,$4,$5,$6)', e.instance.sql
+    # puts e.instance.sql
+    assert_equal 'INSERT INTO baz (time,tag,field1,field2,field3,field4) VALUES ($1,$2,$3,$4,$5,$6)', e.instance.sql
+    assert_equal '1', '1'
+  end
+
 end
+# # class TestSimpleNumber < Test::Unit::TestCase
+
+# def test_simple
+#   assert_equal(4, 2+2 )
+#   assert_equal(6, 4+2 )
+# end
+
+# # end
\ No newline at end of file
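Reviewer note: for completeness, a small sketch of how the *key_names* ordering lines up with the numbered placeholders when a row is actually written, using the `pg` client gem directly rather than the plugin. The connection details, table, and record below are illustrative (taken from the README example); they are not part of this repository's test setup.

```
require 'pg'

# Hypothetical record and configuration, mirroring the README example.
record    = { 'status' => 200, 'bytes' => 512, 'vhost' => 'example.com' }
key_names = %w[status bytes vhost]
sql       = 'INSERT INTO accesslog (status,bytes,vhost) VALUES ($1,$2,$3)'

conn = PG.connect(host: 'localhost', dbname: 'application_logs',
                  user: 'myuser', password: 'mypass')
# Values are supplied in key_names order, so $1..$3 line up with the columns.
conn.exec_params(sql, key_names.map { |k| record[k] })
conn.close
```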