[Hindsight] how to change heka files to hindsight

Michael Trinkala mtrinkala at mozilla.com
Fri Apr 7 14:38:43 UTC 2017


The LogstreamerInput plugin was not ported to Hindsight. The closest equivalent
available in Hindsight is the 'tail.lua' input sandbox:
https://mozilla-services.github.io/lua_sandbox_extensions/lfs/sandboxes/heka/input/tail.html

Hindsight does have decoders, but they are Lua modules rather than sandboxes, as
described here:
https://github.com/mozilla-services/lua_sandbox_extensions#decoder-api-convention
For example, the Nginx access.log decoder:
https://mozilla-services.github.io/lua_sandbox_extensions/lpeg/io_modules/decoders/nginx/access.html

Putting it all together to read an Nginx access log produces an input cfg
like the following:

--- nginx_access.cfg
filename = "tail.lua"
ticker_interval = 1
input_filename = "/var/log/nginx/access.log"
follow = "name"

decoder_module = "decoders.nginx.access"
decoders_nginx_access = {
  log_format = '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"'
}
---

That should provide a good working model of how inputs and decoders are
used in Hindsight.
Trink

On Thu, Apr 6, 2017 at 8:32 PM, 黄文彪 <huangwenbiao at 126.com> wrote:

> As shown below, my Heka input config uses a decoder to process the log file.
> In the decoder Lua file's process_message function, there is no need to
> use io functions to open/close the file, but in Hindsight there is no decoder
> module. Can someone tell me how Hindsight's input plugins deal with
> the log file?
> Using io functions to handle the file myself is very uncomfortable.
> Thanks!
>
> in heka:
> 1、cat /etc/heka.toml
> ### Input ############################################################
> #########
> [da_decoder]
> type = "SandboxDecoder"
> script_type = "lua"
> filename = "lua_decoders/coremail_da_log.lua"
>
>    [da_decoder.config]
>    type = "da"
>    platform = "xxx"
>    expire_sec = 600  # Skip logs earlier than 10 minutes
>    payload_keep = false
>
> [da_input]
> type = "LogstreamerInput"
> decoder = "da_decoder"
> log_directory = "/home/coremail/logs/"
> file_match = 'deliveragent.log.(?P<Year>\d+)-(?P<Month>\d+)-(?P<Day>\d+)'
> differentiator = ["Year", "Month", "Day"]
> priority = ["Year", "Month", "Day"]
> oldest_duration = "24h"
> ### Output ############################################################
> ########
> [ESJsonEncoder]
> type_name = "%{Type}"
> es_index_from_timestamp = true
> index = "%{Type}-%{%Y.%m.%d}"
>    [ESJsonEncoder.field_mappings]
>    Timestamp = "@timestamp"
>    Severity = "level"
>
> [elastic_output]
> type = "ElasticSearchOutput"
> server = "http://ip:19200"
> message_matcher = "Type != 'heka.all-report' && Type != 'heka.memstat'"
> encoder = "ESJsonEncoder"
> username = "heka"
> password = "tzLLNFNGUTmw3z9W"
> # flush_interval = 1000  # Default=1000, 1s
> #flush_count = 50  # Default=10
> # use_buffering = false  # Default=true
>
>    [elastic_output.buffering]
>    max_file_size = 16777216  # 16MB
>    max_buffer_size = 2147483648  # changed 2048MB
>    full_action = "block" #shutdown,drop,block
>    # cursor_update_count = 100
> ### END ############################################################
> ########
>
> 2、cat coremail_da_log.lua
> require "os"
> local string = require "string"
> local coremail_log = require "lib_coremail_log"
> local l = require "lpeg"
> local C, P, S = l.C, l.P, l.S
> l.locale(l)
>
> local key = l.Cs((l.alnum + S("-_.")) ^ 1) / string.lower
> local sep = S(",")
> local value_char = coremail_log.backslash_escaped_char(sep)
> local value = l.Cs(value_char ^ 0)
> local key_value = l.Cg(key * P("=") * value) * sep ^ -1
>
> local da_msg_id_char = l.alnum + S("-_.+")
> local init_table = l.Ct(
>     P("T:") * l.Cg(l.digit ^ 1, "tid") *  -- tid
>     P("(") * coremail_log.hh_mm_ss_obj * P(")") * -- time
>     (1 - l.space) ^ 1 * l.space ^ 1 * -- skip [S:F50265][da:Info]
>     l.Cg(da_msg_id_char ^ 1, "da_msg_id") * P(":") -- da_msg_id
> )
> local init_table_cac = l.Ct(
>     P("T:") * l.Cg(l.digit ^ 1, "tid") *  -- tid
>     P("(") * coremail_log.hh_mm_ss_obj * P(")") * -- time
>     (1 - l.space) ^ 1 * l.space ^ 1 * -- skip [S:F50265][da:Info]
>     l.Cg(da_msg_id_char ^ 1, "da_msg_id") * P(" ") -- da_msg_id
> )
>
> local grammar = l.Cf(init_table * key_value ^ 0, rawset) * C(P(1) ^ 0)
> local grammar_cac = l.Cf(init_table_cac, rawset)
>
>
> local number_keys = {delay=true, optime=true, size=true, tid=true}
> local delete_keys = {id=true, rcpttype=true, coremailsmtp=true}
> local ignore_states = {defer=false, scaned=true}
> local ignore_domains = {"@coremail.cn>", "@mailtech.cn>"}
>
> -- Return: fields, timestamp
> local function decode(line, date)
>     local fields, failed_msg = grammar:match(line)
>     if not fields then
>         return nil, 0
>     end
>
>     local timestamp = coremail_log.get_timestamp(date, fields["time"])
>     fields["time"] = nil
>     local from = fields["from"]
>     local to = fields["to"]
>     if from then
>         for key,value in pairs(ignore_domains) do
>             if string.match(from, value) or string.match(to, value) then
>                 return nil, 0
>             end
>         end
>         if from ~= '<>' then
>             fields["from"] = from:gsub("[<>]", '')
>         end
>         fields["to"] = to:gsub("[<>]", '')
>     end
>
>     local state = fields["state"]
>     if state == nil or ignore_states[state] then
>         return nil, 0
>     end
>
>     for key, value in pairs(fields) do
>         if number_keys[key] then
>             fields[key] = tonumber(fields[key])
>         end
>         if delete_keys[key] then
>             fields[key] = nil
>         end
>     end
>
>     local proxy = fields["proxy"]
>     if proxy ~= nil then
>         -- SDN Rule?
>         fields["proxy_rule"] = proxy:match("Match SDN RuleID:(%d+)")
>         if fields["proxy_rule"] == nil and proxy:match(
>                 "Match Proxy Transport: ([0-9.]+)") then
>             fields["proxy_rule"] = "Transport"
>         end
>
>         -- Proxy server
>         fields["proxy_server"] = proxy:match("(%d+[.]%d+[.]%d+[.]%d+)")
>     end
>
>     fields["failed_msg"] = failed_msg
>
>     return fields, timestamp
> end
>
> --
> -- Heka interface
> --
>
> function process_message()
>     -- Read config and payload
>     local payload_keep = read_config("payload_keep")
>     local type = read_config("type")
>     local expire_sec = read_config("expire_sec")
>     local platform = read_config("platform")
>     local line = read_message("Payload")
>     local date = read_message("Logger")
>     local payload = nil
>     if payload_keep then
>         payload = line
>     end
>
>     -- Decode
>     local fields, timestamp = decode(line, date)
>     if not fields then
>         local string_cac = line:match("CAC Check message error, .*")
>         if string_cac then
>             --local fields_cac = {}
>             local fields_cac = grammar_cac:match(line)
>             if not fields_cac then
>                 return -1
>             end
>             --local time = string.match(line, "%d+:%d+:%d+")
>             timestamp = coremail_log.get_timestamp(date,
> fields_cac["time"])
>             local age = os.time() - coremail_log.timezone_offset -
> (timestamp / 1e9)
>             if age > expire_sec then
>                 return -1
>             end
>             fields_cac["platform"] = platform
>             fields_cac["iscacerror"] = 1
>             fields_cac["error_msg"] = string_cac
>             local msg = {
>                 Timestamp = timestamp,
>                 Type = "cacerror",
>                 Payload = payload,
>                 Platform = platform,
>                 Fields = fields_cac
>             }
>             inject_message(msg)
>             return 0
>         else
>             return -1
>         end
>     end
>
>     local age = os.time() - coremail_log.timezone_offset - (timestamp /
> 1e9)
>     if age > expire_sec then
>         return -1
>     end
>
>     fields["platform"] = platform
>     local msg = {
>         Timestamp = timestamp,
>         Type = type,
>         Payload = payload,
>         Fields = fields
>     }
>     inject_message(msg)
>     return 0
> end
> ############################END#################
>
>
>
>
> _______________________________________________
> Hindsight mailing list
> Hindsight at mozilla.org
> https://mail.mozilla.org/listinfo/hindsight
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.mozilla.org/pipermail/hindsight/attachments/20170407/fec5ac5b/attachment-0001.html>


More information about the Hindsight mailing list