亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

k8s與日志--采用golang實(shí)現(xiàn)Fluent Bit的output插件

岳光 / 3885人閱讀

摘要:采用實(shí)現(xiàn)的插件前言目前社區(qū)日志采集和處理的組件不少,之前方案中的,社區(qū)中的,方案中的以及大數(shù)據(jù)用到比較多的。適合采用的方案,實(shí)現(xiàn)日志中心化收集的方案。主要負(fù)責(zé)采集,負(fù)責(zé)處理和傳送。

采用golang實(shí)現(xiàn)Fluent Bit的output插件 前言

目前社區(qū)日志采集和處理的組件不少,之前elk方案中的logstash,cncf社區(qū)中的fluentd,efk方案中的filebeat,以及大數(shù)據(jù)用到比較多的flume。而Fluent Bit是一款用c語言編寫的高性能的日志收集組件,整個架構(gòu)源于fluentd。官方比較數(shù)據(jù)如下:

Fluentd Fluent Bit
Scope Containers / Servers Containers / Servers
Language C & Ruby C
Memory ~40MB ~450KB
Performance High Performance High Performance
Dependencies Built as a Ruby Gem, it requires a certain number of gems. Zero dependencies, unless some special plugin requires them.
Plugins More than 650 plugins available Around 35 plugins available
License Apache License v2.0 Apache License v2.0

通過數(shù)據(jù)可以看出,fluent bit 占用資源更少。適合采用fluent bit + fluentd 的方案,實(shí)現(xiàn)日志中心化收集的方案。fluent bit主要負(fù)責(zé)采集,fluentd負(fù)責(zé)處理和傳送。

擴(kuò)展output插件

fluent bit 本身是C語言編寫,擴(kuò)展插件有一定的難度??赡芄俜娇紤]到這一點(diǎn),實(shí)現(xiàn)了fluent-bit-go,可以實(shí)現(xiàn)采用go語言來編寫插件,目前只支持output的編寫。
fluent-bit-go其實(shí)就是利用cgo,封裝了c接口。代碼比較簡單,主要分析其中一個關(guān)鍵文件

package output

/*
#include 
#include "flb_plugin.h"
#include "flb_output.h"
*/
import "C"
import "fmt"
import "unsafe"

// Define constants matching Fluent Bit core
const FLB_ERROR               =  C.FLB_ERROR
const FLB_OK                  =  C.FLB_OK
const FLB_RETRY               =  C.FLB_RETRY

const FLB_PROXY_OUTPUT_PLUGIN =  C.FLB_PROXY_OUTPUT_PLUGIN
const FLB_PROXY_GOLANG        =  C.FLB_PROXY_GOLANG

// Local type to define a plugin definition
type FLBPlugin C.struct_flb_plugin_proxy
type FLBOutPlugin C.struct_flbgo_output_plugin

// When the FLBPluginInit is triggered by Fluent Bit, a plugin context
// is passed and the next step is to invoke this FLBPluginRegister() function
// to fill the required information: type, proxy type, flags name and
// description.
func FLBPluginRegister(ctx unsafe.Pointer, name string, desc string) int {
    p := (*FLBPlugin) (unsafe.Pointer(ctx))
    p._type = FLB_PROXY_OUTPUT_PLUGIN
    p.proxy = FLB_PROXY_GOLANG
    p.flags = 0
    p.name  = C.CString(name)
    p.description = C.CString(desc)
    return 0
}

// Release resources allocated by the plugin initialization
func FLBPluginUnregister(ctx unsafe.Pointer) {
    p := (*FLBPlugin) (unsafe.Pointer(ctx))
    fmt.Printf("[flbgo] unregistering %v
", p)
    C.free(unsafe.Pointer(p.name))
    C.free(unsafe.Pointer(p.description))
}

func FLBPluginConfigKey(ctx unsafe.Pointer, key string) string {
    _key := C.CString(key)
    return C.GoString(C.output_get_property(_key, unsafe.Pointer(ctx)))
}

主要是定義了一些編寫插件需要用到的變量和方法,例如FLBPluginRegister注冊組件,F(xiàn)LBPluginConfigKey獲取配置文件設(shè)定參數(shù)等。
PS
實(shí)際上用golang調(diào)用fluent-bit-go,再加一些實(shí)際的業(yè)務(wù)邏輯實(shí)現(xiàn),最終編譯成一個c-share的.so動態(tài)鏈接庫。

定制fluent-bit-kafka-ouput插件

實(shí)際上,fluent-bit v0.13版本以后就提供了kafka output的插件,但是實(shí)際項(xiàng)目中,并不滿足我們的需求,必須定制化。
當(dāng)然接下來的代碼主要是作為一個demo,講清楚如何編寫一個output插件。

代碼編寫和分析

先上代碼:

package main

import (
    "C"
    "fmt"
    "io"
    "log"
    "reflect"
    "strconv"
    "strings"
    "time"
    "unsafe"

    "github.com/Shopify/sarama"
    "github.com/fluent/fluent-bit-go/output"
    "github.com/ugorji/go/codec"
)

var (
    brokers    []string
    producer   sarama.SyncProducer
    timeout    = 0 * time.Minute
    topic      string
    module     string
    messageKey string
)

//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
    return output.FLBPluginRegister(ctx, "out_kafka", "Kafka Output Plugin.!")
}

//export FLBPluginInit
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {

    if bs := output.FLBPluginConfigKey(ctx, "brokers"); bs != "" {
        brokers = strings.Split(bs, ",")
    } else {
        log.Printf("you must set brokers")
        return output.FLB_ERROR
    }

    if tp := output.FLBPluginConfigKey(ctx, "topics"); tp != "" {
        topic = tp
    } else {
        log.Printf("you must set topics")
        return output.FLB_ERROR
    }

    if mo := output.FLBPluginConfigKey(ctx, "module"); mo != "" {
        module = mo
    } else {
        log.Printf("you must set module")
        return output.FLB_ERROR
    }

    if key := output.FLBPluginConfigKey(ctx, "message_key"); key != "" {
        messageKey = key
    } else {
        log.Printf("you must set message_key")
        return output.FLB_ERROR
    }

    config := sarama.NewConfig()
    config.Producer.Return.Successes = true

    if required_acks := output.FLBPluginConfigKey(ctx, "required_acks"); required_acks != "" {
        if acks, err := strconv.Atoi(required_acks); err == nil {
            config.Producer.RequiredAcks = sarama.RequiredAcks(acks)
        }
    }

    if compression_codec := output.FLBPluginConfigKey(ctx, "compression_codec"); compression_codec != "" {
        if codec, err := strconv.Atoi(compression_codec); err == nil {
            config.Producer.Compression = sarama.CompressionCodec(codec)
        }
    }

    if max_retry := output.FLBPluginConfigKey(ctx, "max_retry"); max_retry != "" {
        if max_retry, err := strconv.Atoi(max_retry); err == nil {
            config.Producer.Retry.Max = max_retry
        }
    }

    if timeout == 0 {
        timeout = 5 * time.Minute
    }
    // If Kafka is not running on init, wait to connect
    deadline := time.Now().Add(timeout)
    for tries := 0; time.Now().Before(deadline); tries++ {
        var err error
        if producer == nil {
            producer, err = sarama.NewSyncProducer(brokers, config)
        }
        if err == nil {
            return output.FLB_OK
        }
        log.Printf("Cannot connect to Kafka: (%s) retrying...", err)
        time.Sleep(time.Second * 30)
    }

    log.Printf("Kafka failed to respond after %s", timeout)
    return output.FLB_ERROR

}

//export FLBPluginFlush
// FLBPluginFlush is called from fluent-bit when data need to be sent. is called from fluent-bit when data need to be sent.
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
    var h codec.MsgpackHandle

    var b []byte
    var m interface{}
    var err error

    b = C.GoBytes(data, length)
    dec := codec.NewDecoderBytes(b, &h)

    // Iterate the original MessagePack array
    var msgs []*sarama.ProducerMessage
    for {
        // decode the msgpack data
        err = dec.Decode(&m)
        if err != nil {
            if err == io.EOF {
                break
            }
            log.Printf("Failed to decode msgpack data: %v
", err)
            return output.FLB_ERROR
        }

        // Get a slice and their two entries: timestamp and map
        slice := reflect.ValueOf(m)
        data := slice.Index(1)

        // Convert slice data to a real map and iterate
        mapData := data.Interface().(map[interface{}]interface{})
        flattenData, err := Flatten(mapData, "", UnderscoreStyle)
        if err != nil {
            break
        }

        message := ""
        host := ""

        for k, v := range flattenData {
            value := ""
            switch t := v.(type) {
            case string:
                value = t
            case []byte:
                value = string(t)
            default:
                value = fmt.Sprintf("%v", v)
            }

            if k == "pod_name" {
                host = value
            }

            if k == messageKey {
                message = value
            }

        }
        if message == "" || host == "" {
            break
        }

        m := &sarama.ProducerMessage{
            Topic: topic,
            Key:   sarama.StringEncoder(fmt.Sprintf("host=%s|module=%s", host, module)),
            Value: sarama.ByteEncoder(message),
        }
        msgs = append(msgs, m)

    }

    err = producer.SendMessages(msgs)
    if err != nil {
        log.Printf("FAILED to send kafka message: %s
", err)
        return output.FLB_ERROR
    }
    return output.FLB_OK
}

//export FLBPluginExit
func FLBPluginExit() int {
    producer.Close()
    return output.FLB_OK
}

func main() {
}

FLBPluginExit 插件退出的時候需要執(zhí)行的一些方法,比如關(guān)閉連接。

FLBPluginRegister 注冊插件

FLBPluginInit 插件初始化

FLBPluginFlush flush到數(shù)據(jù)到output

FLBPluginConfigKey 獲取配置文件中參數(shù)

PS
當(dāng)然除了FLBPluginConfigKey之外,也可以通過獲取環(huán)境變量來獲得設(shè)置參數(shù)。
ctx相當(dāng)于一個上下文,負(fù)責(zé)之間的數(shù)據(jù)的傳遞。

編譯和執(zhí)行

編譯的時候

go build -buildmode=c-shared -o out_kafka.so .

生成out_kafka.so

執(zhí)行的時候

/fluent-bit/bin/fluent-bit" -c /fluent-bit/etc/fluent-bit.conf -e /fluent-bit/out_kafka.so
總結(jié)

采用類似的編寫結(jié)構(gòu),就可以定制化自己的輸出插件了。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/33069.html

相關(guān)文章

  • k8s日志--采用golang實(shí)現(xiàn)Fluent Bitoutput插件

    摘要:采用實(shí)現(xiàn)的插件前言目前社區(qū)日志采集和處理的組件不少,之前方案中的,社區(qū)中的,方案中的以及大數(shù)據(jù)用到比較多的。適合采用的方案,實(shí)現(xiàn)日志中心化收集的方案。主要負(fù)責(zé)采集,負(fù)責(zé)處理和傳送。 采用golang實(shí)現(xiàn)Fluent Bit的output插件 前言 目前社區(qū)日志采集和處理的組件不少,之前elk方案中的logstash,cncf社區(qū)中的fluentd,efk方案中的filebeat,以及大...

    binta 評論0 收藏0
  • k8slog--利用fluent bit收集k8s日志

    摘要:是一個開源和多平臺的,它允許您從不同的來源收集數(shù)據(jù)日志,統(tǒng)一并將它們發(fā)送到多個目的地。例如日志收集日志分析主要講部署的集群。日志主要有和的日志,一般采用部署,自然而然就是要支持格式日志的采集。業(yè)務(wù)落盤的日志。部署方案采取部署。 前言 收集日志的組件多不勝數(shù),有ELK久負(fù)盛名組合中的logstash, 也有EFK組合中的filebeat,更有cncf新貴fluentd,另外還有大數(shù)據(jù)領(lǐng)域...

    betacat 評論0 收藏0
  • k8slog--利用fluent bit收集k8s日志

    摘要:是一個開源和多平臺的,它允許您從不同的來源收集數(shù)據(jù)日志,統(tǒng)一并將它們發(fā)送到多個目的地。例如日志收集日志分析主要講部署的集群。日志主要有和的日志,一般采用部署,自然而然就是要支持格式日志的采集。業(yè)務(wù)落盤的日志。部署方案采取部署。 前言 收集日志的組件多不勝數(shù),有ELK久負(fù)盛名組合中的logstash, 也有EFK組合中的filebeat,更有cncf新貴fluentd,另外還有大數(shù)據(jù)領(lǐng)域...

    CoffeX 評論0 收藏0
  • 關(guān)于k8s集群容器日志收集總結(jié)

    摘要:我推薦你使用進(jìn)行日志收集,將作為的出口。集群目前暫時沒有提供日志查看機(jī)制。以如下的形式啟動容器,容器日志將發(fā)往配置的。 【作者barnett】本文介紹了k8s官方提供的日志收集方法,并介紹了Fluentd日志收集器并與其他產(chǎn)品做了比較。最后介紹了好雨云幫如何對k8s進(jìn)行改造并使用ZeroMQ以消息的形式將日志傳輸?shù)浇y(tǒng)一的日志處理中心。 容器日志存在形式 目前容器日志有兩種輸出形式: ...

    jeffrey_up 評論0 收藏0
  • 關(guān)于k8s集群容器日志收集總結(jié)

    摘要:我推薦你使用進(jìn)行日志收集,將作為的出口。集群目前暫時沒有提供日志查看機(jī)制。以如下的形式啟動容器,容器日志將發(fā)往配置的。 【作者barnett】本文介紹了k8s官方提供的日志收集方法,并介紹了Fluentd日志收集器并與其他產(chǎn)品做了比較。最后介紹了好雨云幫如何對k8s進(jìn)行改造并使用ZeroMQ以消息的形式將日志傳輸?shù)浇y(tǒng)一的日志處理中心。 容器日志存在形式 目前容器日志有兩種輸出形式: ...

    or0fun 評論0 收藏0

發(fā)表評論

0條評論

岳光

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<