Description
Versions:
2018-11-19 19:07:06 +0000 [info]: gem 'fluent-plugin-elasticsearch' version '2.11.11'
2018-11-19 19:07:06 +0000 [info]: gem 'fluent-plugin-flowcounter' version '1.3.0'
2018-11-19 19:07:06 +0000 [info]: gem 'fluent-plugin-kafka' version '0.7.9'
2018-11-19 19:07:06 +0000 [info]: gem 'fluent-plugin-multiprocess' version '0.2.2'
2018-11-19 19:07:06 +0000 [info]: gem 'fluent-plugin-prometheus' version '1.0.1'
2018-11-19 19:07:06 +0000 [info]: gem 'fluent-plugin-record-modifier' version '1.1.0'
2018-11-19 19:07:06 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '2.1.0'
2018-11-19 19:07:06 +0000 [info]: gem 'fluent-plugin-s3' version '1.1.6'
2018-11-19 19:07:06 +0000 [info]: gem 'fluent-plugin-string-scrub' version '1.0.0'
2018-11-19 19:07:06 +0000 [info]: gem 'fluent-plugin-td' version '1.0.0'
2018-11-19 19:07:06 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.4'
2018-11-19 19:07:06 +0000 [info]: gem 'fluent-plugin-webhdfs' version '1.2.3'
2018-11-19 19:07:06 +0000 [info]: gem 'fluentd' version '1.2.6'
Server config:
<source>
  @type forward
  @label @RAW
  tag forwarded
  port 24224
  bind 0.0.0.0
  source_address_key forwarder_ip
  source_hostname_key forwarder_hostname
  <transport tls>
    ca_cert_path /etc/td-agent/server.pem
    ca_private_key_path /etc/td-agent/server.key
  </transport>
  <security>
    shared_key key
    self_hostname harvester-staging-gce-be-1.logs.fqdn
  </security>
</source>

<label @RAW>
  <filter **>
    @type string_scrub
    replace_char BORKED
  </filter>
  <filter **>
    @type record_transformer
    enable_ruby true
    <record>
      pushed_by harvester-staging-gce-be-1.logs.fqdn:1
      target_index ${if record['project'] and record['team']; sprintf("%s_%s--%s", record['team'], record['project'], Time.new.strftime('%Y.%m.%d')); elsif record['project']; sprintf('%s--%s',record['project'],Time.new.strftime('%Y.%m.%d')); else sprintf('fallback--%s',Time.new.strftime('%Y.%m.%d')); end}
      severity ${record["severity"].downcase if record["severity"]}
    </record>
  </filter>
  <match **>
    @type copy
    <store>
      @type elasticsearch
      host localhost
      port 9200
      target_type_key @target_key
      type_name fluentd
      target_index_key target_index
      logstash_format true
      logstash_prefix invalid--
      time_key @timestamp
      include_timestamp true
      reconnect_on_error true
      reload_on_failure true
      reload_connections false
      request_timeout 120s
      <buffer>
        @type file
        flush_interval 10s
        retry_type periodic
        retry_forever true
        retry_wait 10s
        chunk_limit_size 16Mb
        queue_limit_length 4096
        total_limit_size 15Gb
        path /var/lib/td-agent/buffers/output_elasticsearch-1
      </buffer>
    </store>
  </match>
</label>

<label @ERROR>
  <match **>
    @type file
    path /var/log/td-agent/es_errors/child_1.log.%Y%m%d
    add_path_suffix false
    append true
    <buffer>
      @type memory
      flush_mode interval
      flush_interval 10s
    </buffer>
  </match>
</label>
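For reference, the inline Ruby in the record_transformer's target_index field boils down to the standalone sketch below. The sample record is made up (shaped like the one in the es_error log further down), just to show which index name each branch produces:

# Standalone sketch of the target_index logic from the record_transformer above.
# The record here is a hypothetical example, not the actual event.
record = { 'team' => 'ops', 'project' => 'foo' }
date   = Time.new.strftime('%Y.%m.%d')

target_index =
  if record['project'] && record['team']
    sprintf('%s_%s--%s', record['team'], record['project'], date)
  elsif record['project']
    sprintf('%s--%s', record['project'], date)
  else
    sprintf('fallback--%s', date)
  end

puts target_index # e.g. "ops_foo--2018.11.19"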
On the client host, I run this:
printf '\xFF\xADfoo' | logger
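\xFF\xAD is intentionally not valid UTF-8. Assuming fluent-plugin-string-scrub behaves roughly like Ruby's String#scrub, this is what those bytes look like before and after scrubbing:

# Sketch only: assumes string_scrub is roughly equivalent to String#scrub.
msg = "\xFF\xADfoo".force_encoding('UTF-8')

msg.valid_encoding?   # => false, \xFF and \xAD are not valid UTF-8 sequences
msg.scrub('BORKED')   # => "BORKEDBORKEDfoo" (one replacement per invalid byte)
msg.scrub             # => "\uFFFD\uFFFDfoo" (default U+FFFD, renders as ��)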
Server td-agent log:
2018-11-19 19:07:36 +0000 [warn]: #0 send an error event to @ERROR: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch" location=nil tag="syslog.user.notice" time=2018-11-19 19:07:16.000000000 +0000
es_error log:
2018-11-19T19:07:16+00:00 syslog.user.notice {"host":"client","ident":"myusername","message":"��foo","severity":"notice","syslog_facility":"user","fqdn":"client.fqdn","hostname":"client.fqdn","project":"foo","cluster":"dev","team":"ops","original_tag":"syslog.user.notice","forwarder_ip":"1.2.3.4","forwarder_hostname":"barbaz","pushed_by":"harvester-staging-gce-be-1.logs.fqdn:1","target_index":"ops_client--2018.11.19"}
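Note that the "��" in the pasted line is ambiguous once it has gone through a terminal copy/paste: it could be two U+FFFD replacement characters or the raw \xFF\xAD bytes. One way to check, as a hedged sketch (the filename is a guess based on the path in the @ERROR match above):

# Dump the bytes around "message" in the dumped record to see whether it holds
# U+FFFD (ef bf bd ...) or the original invalid bytes (ff ad ...).
line = File.open('/var/log/td-agent/es_errors/child_1.log.20181119', 'rb', &:read)
idx  = line.index('"message":"')
puts line[idx, 24].bytes.map { |b| format('%02x', b) }.join(' ')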