
Processors

Processors are the main workers of LogHub. They take events and transform them.

They are defined using the syntax:

ClassName {
    attribute: value;
    ...
}

Field processors

Many of the processors transform a single field and return a value. For those kinds of field processors, there are a few shortcuts that allow easier management.

The first use case is simply the declaration:

| loghub.processors.Someone {
    field: "message",
    ...
}

Many fields can also be given using an array. In this case, globs are used; for more details, see Pattern Matching Notation. For each matched field, a processing step is added, using the same processor instance.

For example:

| loghub.processors.Someone {
    fields: ["message", "sub*"],
    ...
}

This processor will work on the fields message and subMessage, but not on others.

Events can be structured and contain maps of keys. It's not always convenient to repeat or define the full path, so a processor can use a path prefix. All the other fields will then be relative to this path, and path components are separated using a dot. If a field name starts with a ., it refers to the root.

For example:

| loghub.processors.Someone {
    path: "query",
    fields: [".message", "param"],
    ...
}

This processor will work on the fields message and query.param.

The result will be stored in the same field by default. To store a copy somewhere else instead, use the bean destination. A pattern can be used that has access to the field variable.

Some field processors generate a new hash of values. For example, the processor loghub.processors.SyslogPriority transforms a single numerical value into a hash of severity and facility. By default, the field is replaced with the hash. But if the attribute inPlace is set to true, the field is kept and the hash items are added to the event.

For example the configuration:

loghub.processors.SyslogPriority {
     field: [syslog_pri],
     resolve: false,
 }

will transform an event containing the value {syslog_pri: 38} to {syslog_pri: {severity: 6, facility: 4}}.

The configuration

loghub.processors.SyslogPriority {
     field: [syslog_pri],
     resolve: false,
     inPlace: true,
 }

The event will be transformed to {syslog_pri: 38, severity: 6, facility: 4}.

If the field to be processed is iterable (an array or a Collection), each of its elements will be processed individually, unless the attribute iterate is set to false.

Conditions

The processing execution can be managed using four attributes.

The attribute if can be used to control the execution of a processor. It's an expression; if it returns false, the processor will not be executed.

The attribute success refers to a pipe element that will be executed if the processor succeeded.

The attribute failure refers to a pipe element that will be executed if the processor failed by returning a false value.

The attribute exception refers to a pipe element that will be executed if the processor failed by throwing a ProcessorException.

Unmanaged exceptions can't be handled; they are stored in the JMX bean loghub:type=stats/Exceptions and the event is dropped.
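
For example, a minimal sketch combining these attributes (the processor, field names and expression are illustrative):

loghub.processors.DateParser {
    field: [timestamp],
    if: [type] == "syslog",
    failure: [#date_failed] = true,
}

Here the processor only runs on events whose type is "syslog", and a failed parsing marks the event in the meta #date_failed.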

Processor as pipe clause

Some processors are not used directly, but are parsed using a custom construct. For example, to drop a single event, instead of writing

    | loghub.processors.Drop {}

it’s easier to write it as

    | drop

Processors list

loghub.processors.AnsiClean

Attributes

  • remplacement, a string that can be used to replace ANSI formatting.

loghub.processors.Cidr

Given an IP or a hostname, it identifies to which of the previously listed networks it belongs. Networks are enumerated using CIDR notation.

It’s a field processor.

Attributes

  • networks, an array of CIDR masks, using the notation IP/mask length.
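
A minimal sketch (the field name and networks are illustrative):

loghub.processors.Cidr {
    field: [source_ip],
    destination: [network],
    networks: ["10.0.0.0/8", "192.168.0.0/16"],
}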

loghub.processors.Convert

A processor that takes a String field and transforms it to any object that can take a String as a constructor argument. Empty strings are ignored. The given class can be an Enum type.

It can also be used to parse IP address given a byte array or a String to a java.net.InetAddress.

A custom type loghub.types.MacAddress can be used to normalize MAC addresses using the EUI-48/64 format, e.g. 01-23-45-67-89-AB.

A custom type loghub.types.Dn can be used to normalize DNs, by removing superfluous spaces.

A custom type loghub.types.MimeType can be used to normalize MIME types, as used in HTTP's Content-Type header.

It can also parse byte array to the following types:

  • java.lang.Character
  • java.lang.Byte
  • java.lang.Short
  • java.lang.Integer
  • java.lang.Long
  • java.lang.Float
  • java.lang.Double

It’s a field processor.

Clause

It can be used with the compact syntax:

(java.class.Name) [variable path]

Attributes

If used as an explicit processor, some fields can be used:

  • className, the expected type to use, defaults to java.lang.String.
  • charset, the charset to use if a byte array is to be converted to a String, defaults to UTF-8.
  • byteOrder, the byte array order, from BIG_ENDIAN, LITTLE_ENDIAN; defaults to the native order.
  • encoding, if the input is a byte array encoded using a known binary encoding, it will be decoded first. Known decoders are BASE64, BASE64MIME, BASE64URL and Z85.
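
For example, converting a string port number to an integer with the compact clause (the field name is illustrative):

(java.lang.Integer) [port]

which should be equivalent to the explicit form:

loghub.processors.Convert {
    field: [port],
    className: "java.lang.Integer",
}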

loghub.processors.Crlf

Used to transform a multi-line field into a single-line one, needed when the event will be sent using a line-separated sender, like Syslog over TCP or CSV.

It can also be used to unify the line separator to a single one, with or without escaping it.

It’s a field processor.

Attributes

  • format, which format to use, from the values CRLF, CR, LF, KEEP; defaults to KEEP.
  • escape, a boolean to ensure that line feeds are escaped.
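
A minimal sketch that would unify line separators to an escaped LF, assuming the format value is given as a string (the field name is illustrative):

loghub.processors.Crlf {
    field: [message],
    format: "LF",
    escape: true,
}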

loghub.processors.DateParser

Used to parse a date in a string and store the result as a java.time.Instant object. It uses Axibase patterns for parsing. If the destination field is @timestamp, the event's timestamp will be changed.

Some predefined named patterns are defined, comparison is case-insensitive.

  • ISO
  • ISO_DATE_TIME
  • ISO_INSTANT
  • RFC_822_WEEK_DAY
  • RFC_822_SHORT
  • RFC_3164
  • NANOSECONDS
  • MILLISECONDS
  • SECONDS
  • ISO8601
  • UNIX
  • UNIX_MS
  • UNIX_NS

It can also parse numerical values, using the Unix epoch as a reference. If the pattern is seconds, it can be an integer or a floating-point number of seconds. If it's milliseconds, it's an integer number of milliseconds. If it's nanoseconds, it's an integer number of nanoseconds.

It’s a field processor.

Attributes

  • timezone, the time zone of the parsed value, used if it can't be deduced from the string.
  • locale, the locale used for parsing, defaults to en.
  • patterns, an array of patterns to try until one matches. Defaults to ISO, RFC_822_WEEK_DAY, RFC_822_SHORT, RFC_3164, MILLISECONDS.
  • pattern, a single pattern to use for parsing.
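
A minimal sketch trying two of the predefined named patterns (the field name is illustrative):

loghub.processors.DateParser {
    field: [date],
    destination: [@timestamp],
    patterns: ["ISO", "UNIX_MS"],
    timezone: "UTC",
}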

loghub.processors.DecodeUrl

Used to decode strings encoded for URLs.

It's a field processor.

Attributes

  • encoding, the encoding used for the decoding, defaults to "UTF-8".
  • loop, loop until no more decoding is done, defaults to false.

loghub.processors.Dissect

An implementation of the dissect specification from Elastic, described at Dissect specification.

It adds automatic type casting when requested, similar to Grok, using the :typename syntax, which must follow the key name and be declared before modifiers.

Attributes

  • pattern, a dissect pattern.
  • appendSeparator, the format used to join appended keys.
  • inPlace, the default is to return a map of values that will be stored in the destination field. If set to true, keys can use the path syntax (a.b or #meta) and the value will be stored at the provided path.

Example

The examples given in the specification link apply directly. For the new features, the pattern %{#a:int} will parse the string 1 as an int value and store it in the meta a, if inPlace is set to true.

Type resolvers

The managed type resolvers for keys are:

  • byte
  • boolean
  • short
  • int, or integer
  • long
  • float
  • double
  • ip
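
Putting it together, a minimal sketch (the field name and pattern are illustrative):

loghub.processors.Dissect {
    field: [message],
    pattern: "%{source:ip} %{verb} %{status:int}",
}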

loghub.processors.Drop

Drop a single event.

Clause

drop

loghub.processors.DurationConvert

This processor takes a number or a Duration as the input and converts it to a known duration scale.

The values for the duration can be NANO, MICRO, MILLI, CENTI, DECI, SECOND, SECOND_FLOAT, DURATION, STRING.

Both SECOND and SECOND_FLOAT can take an integer or a float as the input. But when used as the output, SECOND will output only the integer part of the duration, in seconds, and SECOND_FLOAT will output the duration as a floating-point number of seconds.

If the STRING value is used, the value will be resolved using the ISO-8601 duration format PnDTnHnMn.nS.

When in is set to DURATION, the field value must be a java.time.Duration or it will be ignored. When DURATION is used for out, the parsed duration will be forwarded as is.

This processor can take a number, or a number formatted as a string, as the input value.

Attributes

  • in, the expected time unit for the input value.
  • out, the time unit that will be used for the output.
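
A minimal sketch converting a millisecond count to a floating-point number of seconds, assuming the scale values are given as strings (the field name is illustrative):

loghub.processors.DurationConvert {
    field: [elapsed],
    in: "MILLI",
    out: "SECOND_FLOAT",
}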

loghub.processors.Encoder

loghub.processors.Forker

Clause

+$destinationpipe

loghub.processors.Forwarder

loghub.processors.Geoip2

It uses the GeoIP2 API and databases (.mmdb). The path to the database can be given in the global geoip2data property, so all instances use the same one, or it can be defined for each instance.

It's a field processor.

Attributes

  • geoipdb, the path to the GeoIP database. Defaults to the value of the global property geoip2data; otherwise it searches for GeoLite2-City.mmdb in the class path. It can be loaded from any valid URL, using for example the file or https scheme.
  • required, if false, the processor will not fail at startup and will retry at each refresh interval.
  • types, an array of values to extract from the database; can take the values country, registredcountry, representedcountry, city, location, continent, postal and subdivision.
  • locale, GeoIP2 databases contain names in many locales; this indicates which one to use, defaults to "en".
  • cacheSize, the size of the cache used by the Maxmind library.
  • refresh, how often to refresh the loaded database; defined using a duration pattern, as defined in ISO-8601 durations, with the format PnDTnHnMn.nS.

The destination attribute defines where the map of resolved values will be stored. Fully resolved, it will be filled with the following structure:

country:
   code:
   name:
represented_country:
   code:
   name:
registred_country:
   code:
   name:
city:
location:
    latitude:
    longitude:
    timezone:
    accuray_radius:
    metro_code:
    average_income:
    population_density:
continent:
postal:
subdivisions:
    -   code:
        name:
    ...

Custom-built mmdb files can also be used; their content will be added directly to the event.

Some examples of durations (copied from the javadoc):

    "PT20.345S" -- parses as "20.345 seconds"
    "PT15M"     -- parses as "15 minutes" (where a minute is 60 seconds)
    "PT10H"     -- parses as "10 hours" (where an hour is 3600 seconds)
    "P2D"       -- parses as "2 days" (where a day is 24 hours or 86400 seconds)
    "P2DT3H4M"  -- parses as "2 days, 3 hours and 4 minutes"
    "PT-6H3M"    -- parses as "-6 hours and +3 minutes"
    "-PT6H3M"    -- parses as "-6 hours and -3 minutes"
    "-PT-6H+3M"  -- parses as "+6 hours and -3 minutes"

loghub.processors.Flatten

If the field is an iterable (collection, stream or array), all nested iterables will be flattened. The resulting value preserves the original type, except for arrays, which will be transformed into a List. If the value is not iterable, it is returned unchanged.
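
A minimal sketch (the field name is illustrative):

loghub.processors.Flatten {
    field: [tags],
}

An event containing {tags: [[1, 2], [3]]} would then become {tags: [1, 2, 3]}.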

loghub.processors.Grok

Uses Grok to parse a field. It uses Java Grok.

All patterns are found inside the patterns folder of the class path, so they can be stored under the plugins property of the configuration file.

The destination can be a full LogHub path, like %{PATTERN:my.path} or %{PATTERN:.event.code}.

Automatic typing can also be used with the syntax %{PATTERN:destination:type}. The separator can be either a : or a ;.

The following types are supported:

  • byte
  • boolean
  • short
  • integer, int
  • float
  • double
  • datetime, date
  • string, text

If the type is datetime or date, it can be followed by a date pattern matching the rules from DateTimeFormatter, for example: %{HTTPDATE:timestamp;date;dd/MMM/yyyy:HH:mm:ss Z}.

If, in a pattern, the destination name is ., the result will be put back in the original field. It allows writing an expression like

   | loghub.processors.Grok {
      pattern: "%{HOSTNAME:.}\\.mydomain\\.com",
      fields: ["remote_address", "host", "local_address"],
   }

This will remove .mydomain.com from each of the fields remote_address, host and local_address, only if it contains it, and keep the hostname part.

Patterns can be tested on the command line. When run with the argument -g or --grok, it specifies the pattern folder. It then reads stdin: the first line is the pattern to check (the pattern bean), and each following line is checked against it. If the parsing fails, the line is printed back prefixed with ** failing **. If it succeeds, the found properties are printed.

It's a field processor.

Attributes

  • pattern: grok pattern to use.
  • patterns: an array of patterns to try; will stop at the first valid.
  • customPatterns: some patterns to add to the default set.

loghub.processors.Hierarchical

Transforms a set of attributes written as flat values into a hierarchical structure. For example, it will transform {"a.b": 1} into {"a": {"b": 1}}.

Attributes

  • destination: a path where to store the extracted variables, defaults to the current level.
  • fields: an array of globs to filter the fields to extract.
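
A minimal sketch (the glob and destination are illustrative):

loghub.processors.Hierarchical {
    destination: [parsed],
    fields: ["http.*"],
}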

loghub.processors.Log

loghub.processors.Log4JExtract

loghub.processors.Mapper

loghub.processors.Merge

loghub.processors.NamedSubPipeline

loghub.processors.NameResolver

Used to resolve names from a field. It uses JNDI for that.

It's a field processor.

Attributes

  • ttl, the TTL of queries in the ehcache,
  • cacheSize, the ehcache size,
  • resolver, the IP of a resolver; if not given, it uses the system's one,
  • timeout, the timeout of queries, in seconds.

loghub.processors.NettyNameResolver

Used to resolve names from a field. It uses the Netty asynchronous resolver, which allows waiting requests not to hold a processor thread.

It’s a field processor.

Attributes

  • resolver, the IP of a resolver; if not given, it uses the system's one.
  • resolvers, an array of IPs or hostnames of resolvers; if not given, it uses the system's one.
  • resolutionMode, when multiple resolvers are given, how to use them; can be SEQUENTIAL or PARALLEL.
  • timeout, the timeout of queries, in seconds.
  • cacheSize, the entry cache size.
  • poller, the Netty poller to use.
  • rcvBuf, the UDP socket receive buffer.
  • sndBuf, the UDP socket send buffer.
  • queueDepth, the number of simultaneous requests allowed; -1 defaults to the queueDepth property, 0 means no restriction.
  • failureCaching, how long failed queries are cached, in seconds.

The purpose of resolutionMode is to increase the reliability of name resolution when the load is high. It ensures that the DNS cache is always hot, to improve performance when it's restarted. And as DNS resolution always uses UDP, it also helps protect against packet loss. The TCP socket buffer size is not configurable; it relies on TCP autotuning.
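
A minimal sketch using two parallel resolvers (the field names and resolver IPs are illustrative):

loghub.processors.NettyNameResolver {
    field: [source_ip],
    destination: [source_host],
    resolvers: ["192.0.2.53", "198.51.100.53"],
    resolutionMode: "PARALLEL",
    timeout: 2,
}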

loghub.processors.OnigurumaRegex

loghub.processors.ParseCef

CEF (Common Event Format) is a syslog-like message format defined by ArcSight; the current version is 25.

As this message format is quite unusual, parsing it using a regex is not really possible, so a dedicated parser is provided. This parser only resolves the message part. So, from a message being:

Sep 19 08:26:10 host CEF:0|Security|threatmanager|1.0|100|worm successfully stopped|10|src=10.0.0.1 dst=2.1.2.2 spt=1232

it will be used to resolve the message CEF:0|Security|threatmanager|1.0|100|worm successfully stopped|10|src=10.0.0.1 dst=2.1.2.2 spt=1232, so any transport or source format can be used; it just needs to be cleaned. The fields are extracted and put in the following event fields:

  • version
  • device_vendor
  • device_product
  • device_version
  • device_event_class_id
  • name
  • severity
  • extensions

A standard usage, implementing the Elastic Common Schema, could be:

pipeline[cef] {
    loghub.processors.Grok {
        pattern: "%{TIMESTAMP:timestamp} %{WORD:host.hostname} %{CEFDATA:cef.message}",
        source: [message],
        customPatterns: {
            "TIMESTAMP": "%{MONTH} %{MONTHDAY} (%{YEAR} )?%{TIME}",
            "CEFDATA": "CEF:%{GREEDYDATA}",
        },
        success: (
            [event original]< [message] |
            loghub.processors.DateParser {
                field: [timestamp],
                destination: [@timestamp],
                patterns: ["MMM dd YYYY HH:mm:ss"],
                success: [timestamp]-,
            } |
            path[cef] (
                loghub.processors.ParseCef {
                    field: [message],
                    success: (
                        [message]- |
                        [device_vendor] == "vendor1" ? path[extensions] ($vendor1) |
                        [device_vendor] == "vendor2" ? path[extensions] ($vendor2) |
                            [. event code] < [device_event_class_id] |
                            [. observer vendor] < [device_vendor] |
                            [. observer product] < [device_product] |
                            [. observer version] < [device_version] |
                            [. message] < [name] |
                            [. event severity] < [severity]
                        ) |
                        path[extensions] (
                            [. message] < [msg] |
                            [. source interface alias] < [deviceInboundInterface] |
                            [. destination interface alias] < [deviceOutboundInterface] |
                            [. destination mac] < [dmac] |
                            [. source mac] < [smac] |
                            [. destination bytes] < [out] |
                            [. source bytes] < [in] |
                            (java.lang.Integer) [spt] | [. source port] < [spt] |
                            (java.lang.Integer) [dpt] | [. destination port] < [dpt]
                        )
                    )
                }
            )
        ),
    }
}

loghub.processors.ParseCsv

loghub.processors.ParseJson

Uses com.fasterxml.jackson to parse a JSON string and store the result in the field.

It's a field processor.

loghub.processors.ParseXml

Transform a field that contains bytes or a String to a DOM object. If it’s already a DOM node, it’s left untouched.

Attributes

  • nameSpaceAware, whether the parsing keeps the namespaces.

It's a field processor.

loghub.processors.ScanBinary

This processor parses a string or a number as a bit field. It can manage single-bit (flag) fields, or split the value into subvalues with variable-length subfields.

It's a field processor.

Attributes

  • bitsNames, an array of strings that gives the names of the individual bits. It's mandatory.
  • fieldsLength, an array of integers; if given, the input value is split into subfields of the given lengths.
  • asMap, a boolean; a flags bit field can either be returned as an array of set bits or as a map of all flags with their values.

For example, for the following configuration:

loghub.processors.ScanBinary {
    bitsNames: ["PF_PROT", "PF_WRITE", "PF_USER", "PF_RSVD", "PF_INSTR"],
    field: "error_code",
    destination: "errors"
}

If given an event that contains the field "error_code": 13, it will put ["PF_PROT", "PF_USER", "PF_RSVD"] in errors, because 13 is 1101 in binary. Each bit is matched to the associated entry in the array: the first bit maps to the first entry, and so on. Only bits set to 1 keep their name. If the attribute asMap is set to true, the processor will instead return the map {"PF_PROT": 1, "PF_WRITE": 0, "PF_USER": 1, "PF_RSVD": 1, "PF_INSTR": 0}.

loghub.processors.Split

loghub.processors.SyslogPriority

It's used to resolve an integer as a syslog priority field.

It's a field processor.

The fields used for storage depend on the values of the attributes ecs, resolve and inPlace.

The default configuration outputs two attributes, [facility] and [severity], that contain the decoded priority as string values, or as numerical values if resolve is set to false.

If ecs is set to true, the priority is decoded and put in specific attributes, and the attribute resolve is ignored:

  • [. log syslog severity name] for the severity name ;
  • [. log syslog severity code] for the severity numerical value ;
  • [. log syslog facility name] for the facility name ;
  • [. log syslog facility code] for the facility numerical value.

If both ecs and inPlace are set to true, the following fields are added to the current event:

  • [severity name] for the severity name ;
  • [severity code] for the severity numerical value ;
  • [facility name] for the facility name ;
  • [facility code] for the facility numerical value.

Attributes

  • ecs, use the Elastic Common Schema fields to store the results.
  • resolve, whether facility and severity numbers are resolved to strings, defaults to true.
  • facilities, an ordered array of strings to use for facility values, defaults to [ "kernel", "user-level", "mail", "daemon", "security/authorization", "syslogd", "line printer", "network news", "uucp", "clock", "security/authorization", "ftp", "ntp", "log audit", "log alert", "clock", "local0", "local1", "local2", "local3", "local4", "local5", "local6", "local7" ]
  • severities, an ordered array of strings to use for severity values, defaults to [ "emergency", "alert", "critical", "error", "warning", "notice", "informational", "debug" ]
  • inPlace, if set to true, store the result in the event, not in the original field.

loghub.processors.Test

Used to write tests; depending on the value, it processes a true or a false clause.

Clause

test ? trueClause : falseClause

loghub.processors.UrlParser

Parse a field as a URI and extract the different parts. The following fields can be generated if they are resolved:

  • scheme;
  • user_info;
  • username;
  • password;
  • domain;
  • port;
  • path;
  • extension;
  • query;
  • fragment;
  • specific, for opaque URIs.

If the URL fails to parse, the processor will return a failure that can be handled for specific processing. For example, to be compliant with the Elastic Common Schema:

path[url] (
    loghub.processors.UrlParser {
        field: [original],
        inPlace: true,
        failure: [#badurl] = true,
    }
)

It’s a field processor.

Attributes

  • reference, if the URI is not absolute, it will be resolved against the given URI.
  • inPlace, if set to true, store the result in the event, not in the original field.

loghub.processors.UserAgent

Parse a user agent field, using the ua-parser/uap-java library and yaml files extracted from ua-parser/uap-core. It looks for the file ua_parser/regexes.yaml in the class path, so an alternate file can be given. Results are cached.

It’s a field processor.

Attributes

  • cacheSize, the cache size to use.
  • inPlace, if set to true, store the result in the event, not the original field
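
A minimal sketch (the field and destination names are illustrative):

loghub.processors.UserAgent {
    field: [user_agent],
    destination: [user_agent_details],
    cacheSize: 1000,
}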

loghub.processors.VarExtractor

It iterates over a field, using a pattern to extract a set of name/value pairs. The pattern is a common Java regex that must define the two named capturing groups name and value. After the parsing, the content that was not matched is put back in the original field. If nothing is left, that field is removed.

It's a field processor.

Attributes

  • parser, the parser used to extract values; it defaults to (?<name>\\p{Alnum}+)\\p{Space}?[=:]\\p{Space}?(?<value>[^;,:]+)[;,:]?, which should match common cases.
  • collision, how to handle multiple occurrences of the same key; KEEP_FIRST keeps the first value, KEEP_LAST keeps the last one, and AS_LIST transforms the value into a list and adds each new value.
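
A minimal sketch, assuming the collision value is given as a string (the field name is illustrative):

loghub.processors.VarExtractor {
    field: [message],
    collision: "AS_LIST",
}

With the default parser, an event like {message: "user=john; status=ok; click"} should roughly become {user: "john", status: "ok", message: "click"}.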

loghub.processors.XPathExtractor

Apply an XPath to an already parsed DOM field.

Attributes

  • xpath, the XPath expression to apply.
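
A minimal sketch (the field, destination and XPath expression are illustrative):

loghub.processors.XPathExtractor {
    field: [xml],
    destination: [event_id],
    xpath: "/event/@id",
}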

loghub.processors.X509Parser

Given a value, it parses it as an X.509 certificate and extracts its content. It can take a PEM file, a DER-encoded file or even an X509Certificate object. The result matches the Elastic Common Schema recommendation for x509 Certificate Fields. It can use the inPlace attribute.

The common use case should be:

[... x509 certificate] = "-----BEGIN CERTIFICATE-----\n..." |
path[... x509] (
    loghub.processors.X509Parser {
      field: [certificate],
      inPlace: true,
      success: [certificate]-,
    }
)

Attributes

  • withExtensions, whether the certificate extensions are exported. They are not parsed or resolved, but exported as raw BER-encoded data.
