[Hindsight] how to change heka files to hindsight

黄文彪 huangwenbiao at 126.com
Fri Apr 7 03:32:30 UTC 2017


As shown below, my Heka input config uses a decoder to process the log file.
In the decoder Lua file's process_message function there is no need to use io functions to open/close the file, but in Hindsight there is no decoder module. Can someone tell me how a Hindsight input module is supposed to deal with the log file?
Having to use io functions to deal with the file is very uncomfortable.
thx!


in heka:
1、cat /etc/heka.toml
### Input #####################################################################

# Sandbox decoder section; it points at the Lua script quoted further down
# (coremail_da_log.lua).
[da_decoder]
type = "SandboxDecoder"
script_type = "lua"
filename = "lua_decoders/coremail_da_log.lua"

   # Settings the Lua script fetches with read_config() inside
   # process_message(): "type" becomes the Type of ordinary decoded messages,
   # "platform" is copied into Fields.platform, and the raw line is only kept
   # as Payload when "payload_keep" is true.
   [da_decoder.config]
   type = "da"
   platform = "xxx"
   expire_sec = 600  # Skip log lines older than 10 minutes (600 s)
   payload_keep = false

# Log file input; "decoder" names the da_decoder section, so records read here
# are handed to that decoder.
[da_input]
type = "LogstreamerInput"
decoder = "da_decoder"
log_directory = "/home/coremail/logs/"
# Matches date-suffixed files such as deliveragent.log.2017-04-07. The named
# groups Year/Month/Day are referenced by differentiator and priority below.
file_match = 'deliveragent.log.(?P<Year>\d+)-(?P<Month>\d+)-(?P<Day>\d+)'
differentiator = ["Year", "Month", "Day"]
priority = ["Year", "Month", "Day"]
# NOTE(review): presumably files older than 24h are ignored -- confirm against
# the Heka Logstreamer documentation.
oldest_duration = "24h"

### Output ####################################################################
# Encoder section referenced by elastic_output (encoder = "ESJsonEncoder").
# NOTE(review): the %{Type} / %{%Y.%m.%d} placeholders presumably yield one
# index per message Type per day -- confirm against the Heka ESJsonEncoder docs.
[ESJsonEncoder]
type_name = "%{Type}"
es_index_from_timestamp = true
index = "%{Type}-%{%Y.%m.%d}"
   # Renames message fields in the encoded output.
   [ESJsonEncoder.field_mappings]
   Timestamp = "@timestamp"
   Severity = "level"

# Output section: sends every message except the two Heka-internal report
# types excluded by message_matcher, encoded with ESJsonEncoder.
[elastic_output]
type = "ElasticSearchOutput"
server = "http://ip:19200"
message_matcher = "Type != 'heka.all-report' && Type != 'heka.memstat'"
encoder = "ESJsonEncoder"
# NOTE(review): credentials are stored in plain text in this file; anything
# that has been shared publicly should be rotated.
username = "heka"
password = "tzLLNFNGUTmw3z9W"
# flush_interval = 1000  # Default=1000, 1s
#flush_count = 50  # Default=10
# use_buffering = false  # Default=true

   # Buffering limits for this output; full_action is set to "block"
   # (the alternatives listed by the author are shutdown and drop).
   [elastic_output.buffering]
   max_file_size = 16777216  # 16MB
   max_buffer_size = 2147483648 # changed 2048MB
   full_action = "block" #shutdown,drop,block
   # cursor_update_count = 100
### END ####################################################################


2、cat coremail_da_log.lua
require "os"
local string = require "string"
local coremail_log = require "lib_coremail_log"
local l = require "lpeg"
local C, P, S = l.C, l.P, l.S
l.locale(l)


-- Short aliases for the LPeg capture constructors used in this section.
local Cf, Cg, Cs, Ct = l.Cf, l.Cg, l.Cs, l.Ct

-- "key=value" pairs separated by commas.
--   key   : one or more alphanumerics or "-", "_", ".", lower-cased
--   value : zero or more characters as defined by
--           coremail_log.backslash_escaped_char(sep)
--           (presumably anything up to an unescaped separator -- confirm)
local sep = S(",")
local key = Cs((l.alnum + S("-_.")) ^ 1) / string.lower
local value_char = coremail_log.backslash_escaped_char(sep)
local value = Cs(value_char ^ 0)
-- Anonymous group yielding (key, value); the trailing separator is optional.
local key_value = Cg(key * P("=") * value) * sep ^ -1

-- Header shared by both line formats, up to and including the message id.
local da_msg_id_char = l.alnum + S("-_.+")
local line_header =
    P("T:") * Cg(l.digit ^ 1, "tid") *                -- tid
    P("(") * coremail_log.hh_mm_ss_obj * P(")") *     -- time
    (1 - l.space) ^ 1 * l.space ^ 1 *                 -- skip [S:F50265][da:Info]
    Cg(da_msg_id_char ^ 1, "da_msg_id")               -- da_msg_id

-- Ordinary lines end the message id with ":", CAC error lines with a space.
local init_table = Ct(line_header * P(":"))
local init_table_cac = Ct(line_header * P(" "))

-- grammar     : header table with every key/value pair folded in via rawset,
--               followed by a second capture holding the rest of the line.
-- grammar_cac : header table only.
local grammar = Cf(init_table * key_value ^ 0, rawset) * C(P(1) ^ 0)
local grammar_cac = Cf(init_table_cac, rawset)




-- Fields that decode() converts with tonumber().
local number_keys = {
    delay = true,
    optime = true,
    size = true,
    tid = true,
}

-- Fields that decode() removes from the result.
local delete_keys = {
    id = true,
    rcpttype = true,
    coremailsmtp = true,
}

-- Lines whose "state" maps to true here are dropped by decode().
local ignore_states = {
    defer = false,
    scaned = true,
}

-- Address suffixes checked against the from/to fields by decode();
-- a hit drops the line.
local ignore_domains = {
    "@coremail.cn>",
    "@mailtech.cn>",
}


-- Return: fields, timestamp
--- Parse one log line into a field table.
--
-- The line is dropped (nil, 0 is returned) when it does not match `grammar`,
-- when its from/to address contains one of `ignore_domains`, or when its
-- "state" field is missing or flagged in `ignore_states`.
--
-- @param line  raw log line
-- @param date  passed to coremail_log.get_timestamp together with the
--              line's parsed time
-- @return fields     table of decoded fields, or nil when the line is dropped
-- @return timestamp  value from coremail_log.get_timestamp, or 0 when dropped
local function decode(line, date)
    local fields, failed_msg = grammar:match(line)
    if not fields then
        return nil, 0
    end

    local timestamp = coremail_log.get_timestamp(date, fields["time"])
    fields["time"] = nil

    local from = fields["from"]
    local to = fields["to"]
    if from then
        for _, domain in ipairs(ignore_domains) do
            -- Plain (non-pattern) search: the "." in a domain must only match
            -- a literal dot. `to` may be absent, so guard it before searching.
            if from:find(domain, 1, true)
                    or (to and to:find(domain, 1, true)) then
                return nil, 0
            end
        end
        -- "<>" (empty sender) is kept verbatim; other addresses lose brackets.
        if from ~= '<>' then
            fields["from"] = from:gsub("[<>]", '')
        end
        if to then
            fields["to"] = to:gsub("[<>]", '')
        end
    end

    local state = fields["state"]
    if state == nil or ignore_states[state] then
        return nil, 0
    end

    -- Walk the small lookup tables rather than `fields`, so `fields` is never
    -- modified while it is being traversed.
    for name in pairs(number_keys) do
        if fields[name] ~= nil then
            fields[name] = tonumber(fields[name])
        end
    end
    for name in pairs(delete_keys) do
        fields[name] = nil
    end

    local proxy = fields["proxy"]
    if proxy ~= nil then
        -- SDN rule id if present, otherwise the literal "Transport" when the
        -- line reports a proxy transport match.
        fields["proxy_rule"] = proxy:match("Match SDN RuleID:(%d+)")
        if fields["proxy_rule"] == nil and proxy:match(
                "Match Proxy Transport: ([0-9.]+)") then
            fields["proxy_rule"] = "Transport"
        end

        -- First dotted-quad found in the proxy text.
        fields["proxy_server"] = proxy:match("(%d+[.]%d+[.]%d+[.]%d+)")
    end

    -- Whatever the key=value grammar could not consume (may be "").
    fields["failed_msg"] = failed_msg

    return fields, timestamp
end


--
-- Heka interface
--


--- Tell whether a decoded timestamp falls outside the accepted age window.
-- @param timestamp   message timestamp; divided by 1e9 before comparing with
--                    os.time(), i.e. treated as nanoseconds
-- @param expire_sec  maximum accepted age in seconds; nil disables the check
-- @return true when the message is older than expire_sec
local function is_expired(timestamp, expire_sec)
    if expire_sec == nil then
        return false
    end
    local age = os.time() - coremail_log.timezone_offset - (timestamp / 1e9)
    return age > expire_sec
end

--- Heka sandbox entry point: decode the current message's payload and inject
-- a structured message.
--
-- Ordinary lines are decoded with decode() and injected with the configured
-- Type. Lines that decode() rejects but that contain "CAC Check message
-- error, " are injected with Type "cacerror". Everything else is rejected.
--
-- @return 0 when a message was injected, -1 when the line was rejected
function process_message()
    -- Read config and payload
    local payload_keep = read_config("payload_keep")
    local msg_type = read_config("type")  -- not named `type`: keeps the builtin usable
    local expire_sec = read_config("expire_sec")
    local platform = read_config("platform")
    local line = read_message("Payload")
    local date = read_message("Logger")

    -- Nothing to decode without a payload.
    if line == nil then
        return -1
    end

    local payload = nil
    if payload_keep then
        payload = line
    end

    -- Ordinary log line
    local fields, timestamp = decode(line, date)
    if fields then
        if is_expired(timestamp, expire_sec) then
            return -1
        end
        fields["platform"] = platform
        inject_message({
            Timestamp = timestamp,
            Type = msg_type,
            Payload = payload,
            Fields = fields
        })
        return 0
    end

    -- CAC error line
    local cac_error = line:match("CAC Check message error, .*")
    if not cac_error then
        return -1
    end
    local cac_fields = grammar_cac:match(line)
    if not cac_fields then
        return -1
    end

    timestamp = coremail_log.get_timestamp(date, cac_fields["time"])
    if is_expired(timestamp, expire_sec) then
        return -1
    end

    -- NOTE(review): unlike decode(), the parsed "time" entry is left in the
    -- field table here -- confirm whether that is intended.
    cac_fields["platform"] = platform
    cac_fields["iscacerror"] = 1
    cac_fields["error_msg"] = cac_error
    inject_message({
        Timestamp = timestamp,
        Type = "cacerror",
        Payload = payload,
        -- NOTE(review): top-level "Platform" is only set for cacerror
        -- messages; kept as in the original.
        Platform = platform,
        Fields = cac_fields
    })
    return 0
end
############################END#################
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.mozilla.org/pipermail/hindsight/attachments/20170407/c6654366/attachment-0001.html>


More information about the Hindsight mailing list